Official peer and server implementations.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

126 lines
3.5 KiB

package event
import (
// Event is a structure which binds a given set of data to an Type
type Event struct {
EventType Type
EventID string
Data map[Field]string
// NewEvent creates a new event object with a unique ID and the given type and data.
func NewEvent(eventType Type, data map[Field]string) Event {
return Event{EventType: eventType, EventID: utils.GetRandNumber().String(), Data: data}
// NewEventList creates a new event object with a unique ID and the given type and data supplied in a list format and composed into a map of Type:string
func NewEventList(eventType Type, args ...interface{}) Event {
data := map[Field]string{}
for i := 0; i < len(args); i += 2 {
key, kok := args[i].(Field)
val, vok := args[i+1].(string)
if kok && vok {
data[key] = val
return Event{EventType: eventType, EventID: utils.GetRandNumber().String(), Data: data}
// Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others.
type manager struct {
subscribers map[Type][]chan Event
events chan Event
mapMutex sync.Mutex
internal chan bool
closed bool
// Manager is an interface for an event bus
type Manager interface {
Subscribe(Type, chan Event)
// NewEventManager returns an initialized EventManager
func NewEventManager() Manager {
em := &manager{}
return em
// Initialize sets up the Manager.
func (em *manager) initialize() {
em.subscribers = make(map[Type][]chan Event) = make(chan Event)
em.internal = make(chan bool)
em.closed = false
go em.eventBus()
// Subscribe takes an eventType and an Channel and associates them in the eventBus. All future events of that type
// will be sent to the eventChannel.
func (em *manager) Subscribe(eventType Type, eventChannel chan Event) {
defer em.mapMutex.Unlock()
em.subscribers[eventType] = append(em.subscribers[eventType], eventChannel)
// Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers
func (em *manager) Publish(event Event) {
if event.EventType != "" && em.closed != true { <- event
// Publish an event only locally, not going over an IPC bridge if there is one
func (em *manager) PublishLocal(event Event) {
// eventBus is an internal function that is used to distribute events to all subscribers
func (em *manager) eventBus() {
for {
event := <
// In the case on an empty event. Teardown the Queue
if event.EventType == "" {
// maps aren't thread safe
subscribers := em.subscribers[event.EventType]
// Send the event to any subscribers to that event type
for _, subscriber := range subscribers {
select {
case subscriber <- event:
log.Debugf("Sending %v to %v", event.EventType, subscriber)
log.Errorf("Failed to send %v to %v. The subsystem might be running too slow!", event.EventType, subscriber)
// We are about to exit the eventbus thread, fire off an event internally
em.internal <- true
// Shutdown triggers, and waits for, the internal eventBus goroutine to finish
func (em *manager) Shutdown() { <- Event{}
em.closed = true
// wait for eventBus to finish