Official cwtch.im peer and server implementations. https://cwtch.im
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

infinitechannel.go 1.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package event
  2. /*
  3. This package is taken from https://github.com/eapache/channels
  4. as per their suggestion we are not importing the entire package and instead cherry picking and adapting what is needed
  5. It is covered by the MIT License https://github.com/eapache/channels/blob/master/LICENSE
  6. */
  7. // infiniteChannel implements the Channel interface with an infinite buffer between the input and the output.
  8. type infiniteChannel struct {
  9. input, output chan Event
  10. length chan int
  11. buffer *infiniteQueue
  12. }
  13. func newInfiniteChannel() *infiniteChannel {
  14. ch := &infiniteChannel{
  15. input: make(chan Event),
  16. output: make(chan Event),
  17. length: make(chan int),
  18. buffer: newInfinitQueue(),
  19. }
  20. go ch.infiniteBuffer()
  21. return ch
  22. }
  23. func (ch *infiniteChannel) In() chan<- Event {
  24. return ch.input
  25. }
  26. func (ch *infiniteChannel) Out() <-chan Event {
  27. return ch.output
  28. }
  29. func (ch *infiniteChannel) Len() int {
  30. return <-ch.length
  31. }
  32. func (ch *infiniteChannel) Close() {
  33. close(ch.input)
  34. }
  35. func (ch *infiniteChannel) infiniteBuffer() {
  36. var input, output chan Event
  37. var next Event
  38. input = ch.input
  39. for input != nil || output != nil {
  40. select {
  41. case elem, open := <-input:
  42. if open {
  43. ch.buffer.Add(elem)
  44. } else {
  45. input = nil
  46. }
  47. case output <- next:
  48. ch.buffer.Remove()
  49. case ch.length <- ch.buffer.Length():
  50. }
  51. if ch.buffer.Length() > 0 {
  52. output = ch.output
  53. next = ch.buffer.Peek()
  54. } else {
  55. output = nil
  56. //next = nil
  57. }
  58. }
  59. close(ch.output)
  60. close(ch.length)
  61. }