Official cwtch.im peer and server implementations. https://cwtch.im
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

eventQueue.go 2.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package event
  2. import "sync"
  3. type queue struct {
  4. infChan infiniteChannel
  5. lock sync.Mutex
  6. closed bool
  7. }
  8. type simpleQueue struct {
  9. eventChannel chan Event
  10. lock sync.Mutex
  11. closed bool
  12. }
  13. // Queue is a wrapper around a channel for handling Events in a consistent way across subsystems.
  14. // The expectation is that each subsystem in Cwtch will manage a given an event.Queue fed from
  15. // the event.Manager.
  16. type Queue interface {
  17. Publish(event Event)
  18. Next() *Event
  19. Shutdown()
  20. OutChan() <-chan Event
  21. Len() int
  22. }
  23. // NewQueue initializes an event.Queue
  24. func NewQueue() Queue {
  25. queue := &queue{infChan: *newInfiniteChannel()}
  26. return queue
  27. }
  28. // NewSimpleQueue initializes an event.Queue of the given buffer size.
  29. func NewSimpleQueue(buffer int) Queue {
  30. queue := new(simpleQueue)
  31. queue.eventChannel = make(chan Event, buffer)
  32. return queue
  33. }
  34. func (sq *simpleQueue) inChan() chan<- Event {
  35. return sq.eventChannel
  36. }
  37. func (sq *simpleQueue) OutChan() <-chan Event {
  38. return sq.eventChannel
  39. }
  40. // Backlog returns the length of the queue backlog
  41. func (sq *simpleQueue) Len() int {
  42. return len(sq.eventChannel)
  43. }
  44. // Next returns the next available event from the front of the queue
  45. func (sq *simpleQueue) Next() *Event {
  46. event := <-sq.eventChannel
  47. return &event
  48. }
  49. // Shutdown closes our eventChannel
  50. func (sq *simpleQueue) Shutdown() {
  51. sq.lock.Lock()
  52. sq.closed = true
  53. close(sq.eventChannel)
  54. sq.lock.Unlock()
  55. }
  56. // Shutdown closes our eventChannel
  57. func (sq *simpleQueue) Publish(event Event) {
  58. sq.lock.Lock()
  59. if !sq.closed {
  60. sq.inChan() <- event
  61. }
  62. sq.lock.Unlock()
  63. }
  64. func (iq *queue) inChan() chan<- Event {
  65. return iq.infChan.In()
  66. }
  67. func (iq *queue) OutChan() <-chan Event {
  68. return iq.infChan.Out()
  69. }
  70. // Out returns the next available event from the front of the queue
  71. func (iq *queue) Next() *Event {
  72. event := <-iq.infChan.Out()
  73. return &event
  74. }
  75. func (iq *queue) Len() int {
  76. return iq.infChan.Len()
  77. }
  78. // Shutdown closes our eventChannel
  79. func (iq *queue) Shutdown() {
  80. iq.lock.Lock()
  81. iq.closed = true
  82. iq.infChan.Close()
  83. iq.lock.Unlock()
  84. }
  85. func (iq *queue) Publish(event Event) {
  86. iq.lock.Lock()
  87. if !iq.closed {
  88. iq.inChan() <- event
  89. }
  90. iq.lock.Unlock()
  91. }