package event import ( "git.openprivacy.ca/openprivacy/libricochet-go/log" "sync" ) // Event is a structure which binds a given set of data to an EventType type Event struct { EventType string Data []byte } // Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others. type Manager struct { subscribers map[string][]chan Event events chan Event mapMutex sync.Mutex internal chan bool } // Initialize sets up the Manager. func (em *Manager) Initialize() { em.subscribers = make(map[string][]chan Event) em.events = make(chan Event) em.internal = make(chan bool) go em.eventBus() } // Subscribe takes an eventType and an Channel and associates them in the eventBus. All future events of that type // will be sent to the eventChannel. func (em *Manager) Subscribe(eventType string, eventChannel chan Event) { em.mapMutex.Lock() defer em.mapMutex.Unlock() em.subscribers[eventType] = append(em.subscribers[eventType], eventChannel) } // Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers func (em *Manager) Publish(event Event) { if event.EventType != "" { em.events <- event } } // eventBus is an internal function that is used to distribute events to all subscribers func (em *Manager) eventBus() { for { event := <-em.events // In the case on an empty event. Teardown the Queue if event.EventType == "" { break } // maps aren't thread safe em.mapMutex.Lock() subscribers := em.subscribers[event.EventType] em.mapMutex.Unlock() // Send the event to any subscribers to that event type for _, subscriber := range subscribers { select { case subscriber <- event: log.Debugf("Sending %v to %v", event.EventType, subscriber) default: log.Errorf("Failed to send %v to %v. The subsystem might be running too slow!", event.EventType, subscriber) } } } // We are about to exit the eventbus thread, fire off an event internally em.internal <- true } // Shutdown triggers, and waits for, the internal eventBus goroutine to finish func (em *Manager) Shutdown() { em.events <- Event{"", []byte{}} // wait for eventBus to finish <-em.internal close(em.events) close(em.internal) }