Official cwtch.im peer and server implementations. https://cwtch.im
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

121 lines
3.3KB

  1. package event
  2. import (
  3. "git.openprivacy.ca/openprivacy/libricochet-go/utils"
  4. "sync"
  5. )
  6. // Event is a structure which binds a given set of data to an Type
  7. type Event struct {
  8. EventType Type
  9. EventID string
  10. Data map[Field]string
  11. }
  12. // NewEvent creates a new event object with a unique ID and the given type and data.
  13. func NewEvent(eventType Type, data map[Field]string) Event {
  14. return Event{EventType: eventType, EventID: utils.GetRandNumber().String(), Data: data}
  15. }
  16. // NewEventList creates a new event object with a unique ID and the given type and data supplied in a list format and composed into a map of Type:string
  17. func NewEventList(eventType Type, args ...interface{}) Event {
  18. data := map[Field]string{}
  19. for i := 0; i < len(args); i += 2 {
  20. key, kok := args[i].(Field)
  21. val, vok := args[i+1].(string)
  22. if kok && vok {
  23. data[key] = val
  24. }
  25. }
  26. return Event{EventType: eventType, EventID: utils.GetRandNumber().String(), Data: data}
  27. }
  28. // Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others.
  29. type manager struct {
  30. subscribers map[Type][]Queue
  31. events chan Event
  32. mapMutex sync.Mutex
  33. internal chan bool
  34. closed bool
  35. }
  36. // Manager is an interface for an event bus
  37. // FIXME this interface lends itself to race conditions around channels
  38. type Manager interface {
  39. Subscribe(Type, Queue)
  40. Publish(Event)
  41. PublishLocal(Event)
  42. Shutdown()
  43. }
  44. // NewEventManager returns an initialized EventManager
  45. func NewEventManager() Manager {
  46. em := &manager{}
  47. em.initialize()
  48. return em
  49. }
  50. // Initialize sets up the Manager.
  51. func (em *manager) initialize() {
  52. em.subscribers = make(map[Type][]Queue)
  53. em.events = make(chan Event)
  54. em.internal = make(chan bool)
  55. em.closed = false
  56. go em.eventBus()
  57. }
  58. // Subscribe takes an eventType and an Channel and associates them in the eventBus. All future events of that type
  59. // will be sent to the eventChannel.
  60. func (em *manager) Subscribe(eventType Type, queue Queue) {
  61. em.mapMutex.Lock()
  62. defer em.mapMutex.Unlock()
  63. em.subscribers[eventType] = append(em.subscribers[eventType], queue)
  64. }
  65. // Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers
  66. func (em *manager) Publish(event Event) {
  67. if event.EventType != "" && em.closed != true {
  68. em.events <- event
  69. }
  70. }
  71. // Publish an event only locally, not going over an IPC bridge if there is one
  72. func (em *manager) PublishLocal(event Event) {
  73. em.Publish(event)
  74. }
  75. // eventBus is an internal function that is used to distribute events to all subscribers
  76. func (em *manager) eventBus() {
  77. for {
  78. event := <-em.events
  79. // In the case on an empty event. Teardown the Queue
  80. if event.EventType == "" {
  81. break
  82. }
  83. // maps aren't thread safe
  84. em.mapMutex.Lock()
  85. subscribers := em.subscribers[event.EventType]
  86. em.mapMutex.Unlock()
  87. // Send the event to any subscribers to that event type
  88. for _, subscriber := range subscribers {
  89. subscriber.Publish(event)
  90. }
  91. }
  92. // We are about to exit the eventbus thread, fire off an event internally
  93. em.internal <- true
  94. }
  95. // Shutdown triggers, and waits for, the internal eventBus goroutine to finish
  96. func (em *manager) Shutdown() {
  97. em.events <- Event{}
  98. em.closed = true
  99. // wait for eventBus to finish
  100. <-em.internal
  101. close(em.events)
  102. close(em.internal)
  103. }