2019-01-02 22:33:56 +00:00
package event
import (
2019-01-04 21:44:21 +00:00
"git.openprivacy.ca/openprivacy/libricochet-go/utils"
2019-01-02 22:33:56 +00:00
"sync"
)
2019-01-04 21:44:21 +00:00
// Event is a structure which binds a given set of data to an Type
2019-01-02 22:33:56 +00:00
type Event struct {
2019-01-04 21:44:21 +00:00
EventType Type
EventID string
2019-01-21 18:47:07 +00:00
Data map [ Field ] string
2019-01-04 21:44:21 +00:00
}
// NewEvent creates a new event object with a unique ID and the given type and data.
2019-01-21 18:47:07 +00:00
func NewEvent ( eventType Type , data map [ Field ] string ) Event {
2019-01-04 21:44:21 +00:00
return Event { EventType : eventType , EventID : utils . GetRandNumber ( ) . String ( ) , Data : data }
2019-01-02 22:33:56 +00:00
}
2019-06-05 20:40:55 +00:00
// NewEventList creates a new event object with a unique ID and the given type and data supplied in a list format and composed into a map of Type:string
func NewEventList ( eventType Type , args ... interface { } ) Event {
data := map [ Field ] string { }
for i := 0 ; i < len ( args ) ; i += 2 {
key , kok := args [ i ] . ( Field )
val , vok := args [ i + 1 ] . ( string )
if kok && vok {
data [ key ] = val
}
}
return Event { EventType : eventType , EventID : utils . GetRandNumber ( ) . String ( ) , Data : data }
}
2019-01-02 22:33:56 +00:00
// Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others.
2019-06-05 20:40:55 +00:00
type manager struct {
2019-11-12 20:39:55 +00:00
subscribers map [ Type ] [ ] Queue
2019-01-02 22:33:56 +00:00
events chan Event
mapMutex sync . Mutex
internal chan bool
2019-05-15 20:12:11 +00:00
closed bool
2019-01-02 22:33:56 +00:00
}
2019-06-05 20:40:55 +00:00
// Manager is an interface for an event bus
2019-11-08 00:39:27 +00:00
// FIXME this interface lends itself to race conditions around channels
2019-06-05 20:40:55 +00:00
type Manager interface {
2019-08-14 20:56:45 +00:00
Subscribe ( Type , Queue )
2019-06-05 20:40:55 +00:00
Publish ( Event )
PublishLocal ( Event )
Shutdown ( )
}
// NewEventManager returns an initialized EventManager
func NewEventManager ( ) Manager {
em := & manager { }
em . initialize ( )
return em
}
2019-01-02 22:33:56 +00:00
// Initialize sets up the Manager.
2019-06-05 20:40:55 +00:00
func ( em * manager ) initialize ( ) {
2019-11-12 20:39:55 +00:00
em . subscribers = make ( map [ Type ] [ ] Queue )
2019-01-02 22:33:56 +00:00
em . events = make ( chan Event )
em . internal = make ( chan bool )
2019-05-15 20:12:11 +00:00
em . closed = false
2019-01-02 22:33:56 +00:00
go em . eventBus ( )
}
// Subscribe takes an eventType and an Channel and associates them in the eventBus. All future events of that type
// will be sent to the eventChannel.
2019-08-14 20:56:45 +00:00
func ( em * manager ) Subscribe ( eventType Type , queue Queue ) {
2019-01-02 22:33:56 +00:00
em . mapMutex . Lock ( )
defer em . mapMutex . Unlock ( )
2019-11-12 20:39:55 +00:00
em . subscribers [ eventType ] = append ( em . subscribers [ eventType ] , queue )
2019-01-02 22:33:56 +00:00
}
// Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers
2019-06-05 20:40:55 +00:00
func ( em * manager ) Publish ( event Event ) {
2019-05-15 20:12:11 +00:00
if event . EventType != "" && em . closed != true {
2019-01-02 22:33:56 +00:00
em . events <- event
}
}
2019-06-05 20:40:55 +00:00
// Publish an event only locally, not going over an IPC bridge if there is one
func ( em * manager ) PublishLocal ( event Event ) {
em . Publish ( event )
}
2019-01-02 22:33:56 +00:00
// eventBus is an internal function that is used to distribute events to all subscribers
2019-06-05 20:40:55 +00:00
func ( em * manager ) eventBus ( ) {
2019-01-02 22:33:56 +00:00
for {
event := <- em . events
// In the case on an empty event. Teardown the Queue
if event . EventType == "" {
break
}
// maps aren't thread safe
em . mapMutex . Lock ( )
subscribers := em . subscribers [ event . EventType ]
em . mapMutex . Unlock ( )
// Send the event to any subscribers to that event type
for _ , subscriber := range subscribers {
2019-11-12 20:39:55 +00:00
subscriber . Publish ( event )
2019-01-02 22:33:56 +00:00
}
}
// We are about to exit the eventbus thread, fire off an event internally
em . internal <- true
}
// Shutdown triggers, and waits for, the internal eventBus goroutine to finish
2019-06-05 20:40:55 +00:00
func ( em * manager ) Shutdown ( ) {
2019-01-04 21:44:21 +00:00
em . events <- Event { }
2019-05-15 20:12:11 +00:00
em . closed = true
2019-01-02 22:33:56 +00:00
// wait for eventBus to finish
<- em . internal
close ( em . events )
close ( em . internal )
}