2019-01-02 22:33:56 +00:00
package event
import (
2020-09-28 22:07:22 +00:00
"crypto/rand"
2021-03-24 21:42:15 +00:00
"encoding/json"
2021-04-28 19:47:55 +00:00
"fmt"
2021-03-24 21:42:15 +00:00
"git.openprivacy.ca/openprivacy/log"
2020-09-28 22:07:22 +00:00
"math"
"math/big"
2021-04-28 19:47:55 +00:00
"os"
"runtime"
"strings"
2019-01-02 22:33:56 +00:00
"sync"
)
2021-03-24 20:16:50 +00:00
// Event is the core struct type passed around between various subsystems. Events consist of a type which can be
2021-03-24 20:09:07 +00:00
// filtered on, an event ID for tracing and a map of Fields to string values.
2019-01-02 22:33:56 +00:00
type Event struct {
2019-01-04 21:44:21 +00:00
EventType Type
EventID string
2019-01-21 18:47:07 +00:00
Data map [ Field ] string
2019-01-04 21:44:21 +00:00
}
2020-09-28 22:07:22 +00:00
// GetRandNumber is a helper function which returns a random integer, this is
// currently mostly used to generate messageids
func GetRandNumber ( ) * big . Int {
num , err := rand . Int ( rand . Reader , big . NewInt ( math . MaxUint32 ) )
// If we can't generate random numbers then panicking is probably
// the best option.
if err != nil {
panic ( err . Error ( ) )
}
return num
}
2019-01-04 21:44:21 +00:00
// NewEvent creates a new event object with a unique ID and the given type and data.
2019-01-21 18:47:07 +00:00
func NewEvent ( eventType Type , data map [ Field ] string ) Event {
2020-09-28 22:07:22 +00:00
return Event { EventType : eventType , EventID : GetRandNumber ( ) . String ( ) , Data : data }
2019-01-02 22:33:56 +00:00
}
2019-06-05 20:40:55 +00:00
// NewEventList creates a new event object with a unique ID and the given type and data supplied in a list format and composed into a map of Type:string
func NewEventList ( eventType Type , args ... interface { } ) Event {
data := map [ Field ] string { }
for i := 0 ; i < len ( args ) ; i += 2 {
key , kok := args [ i ] . ( Field )
val , vok := args [ i + 1 ] . ( string )
if kok && vok {
data [ key ] = val
}
}
2020-09-28 22:07:22 +00:00
return Event { EventType : eventType , EventID : GetRandNumber ( ) . String ( ) , Data : data }
2019-06-05 20:40:55 +00:00
}
2019-01-02 22:33:56 +00:00
// Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others.
2019-06-05 20:40:55 +00:00
type manager struct {
2019-11-12 20:39:55 +00:00
subscribers map [ Type ] [ ] Queue
2021-03-24 21:42:15 +00:00
events chan [ ] byte
2019-01-02 22:33:56 +00:00
mapMutex sync . Mutex
2022-03-23 21:57:18 +00:00
chanMutex sync . Mutex
2019-01-02 22:33:56 +00:00
internal chan bool
2019-05-15 20:12:11 +00:00
closed bool
2021-05-03 23:32:48 +00:00
trace bool
2019-01-02 22:33:56 +00:00
}
2019-06-05 20:40:55 +00:00
// Manager is an interface for an event bus
type Manager interface {
2019-08-14 20:56:45 +00:00
Subscribe ( Type , Queue )
2019-06-05 20:40:55 +00:00
Publish ( Event )
Shutdown ( )
}
// NewEventManager returns an initialized EventManager
func NewEventManager ( ) Manager {
em := & manager { }
em . initialize ( )
return em
}
2019-01-02 22:33:56 +00:00
// Initialize sets up the Manager.
2019-06-05 20:40:55 +00:00
func ( em * manager ) initialize ( ) {
2019-11-12 20:39:55 +00:00
em . subscribers = make ( map [ Type ] [ ] Queue )
2021-03-24 21:42:15 +00:00
em . events = make ( chan [ ] byte )
2019-01-02 22:33:56 +00:00
em . internal = make ( chan bool )
2019-05-15 20:12:11 +00:00
em . closed = false
2021-04-28 19:47:55 +00:00
_ , em . trace = os . LookupEnv ( "CWTCH_EVENT_SOURCE" )
2019-01-02 22:33:56 +00:00
go em . eventBus ( )
}
// Subscribe takes an eventType and an Channel and associates them in the eventBus. All future events of that type
// will be sent to the eventChannel.
2019-08-14 20:56:45 +00:00
func ( em * manager ) Subscribe ( eventType Type , queue Queue ) {
2019-01-02 22:33:56 +00:00
em . mapMutex . Lock ( )
defer em . mapMutex . Unlock ( )
2019-11-12 20:39:55 +00:00
em . subscribers [ eventType ] = append ( em . subscribers [ eventType ] , queue )
2019-01-02 22:33:56 +00:00
}
// Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers
2019-06-05 20:40:55 +00:00
func ( em * manager ) Publish ( event Event ) {
2022-03-23 21:57:18 +00:00
em . chanMutex . Lock ( )
defer em . chanMutex . Unlock ( )
2021-06-02 18:34:57 +00:00
if event . EventType != "" && ! em . closed {
2021-04-28 19:47:55 +00:00
// Debug Events for Tracing, locked behind an environment variable
// for now.
if em . trace {
pc , _ , _ , _ := runtime . Caller ( 1 )
funcName := runtime . FuncForPC ( pc ) . Name ( )
lastSlash := strings . LastIndexByte ( funcName , '/' )
if lastSlash < 0 {
lastSlash = 0
}
lastDot := strings . LastIndexByte ( funcName [ lastSlash : ] , '.' ) + lastSlash
event . Data [ Source ] = fmt . Sprintf ( "%v.%v" , funcName [ : lastDot ] , funcName [ lastDot + 1 : ] )
}
2021-03-24 21:42:15 +00:00
// Deep Copy the Event...
2021-04-09 01:22:08 +00:00
eventJSON , err := json . Marshal ( event )
2021-03-24 21:42:15 +00:00
if err != nil {
log . Errorf ( "Error serializing event: %v" , event )
}
em . events <- eventJSON
2019-01-02 22:33:56 +00:00
}
}
// eventBus is an internal function that is used to distribute events to all subscribers
2019-06-05 20:40:55 +00:00
func ( em * manager ) eventBus ( ) {
2019-01-02 22:33:56 +00:00
for {
2021-03-24 21:42:15 +00:00
eventJSON := <- em . events
2019-01-02 22:33:56 +00:00
// In the case on an empty event. Teardown the Queue
2021-03-24 21:42:15 +00:00
if len ( eventJSON ) == 0 {
log . Errorf ( "Received zero length event" )
2019-01-02 22:33:56 +00:00
break
}
2021-03-24 21:42:15 +00:00
var event Event
err := json . Unmarshal ( eventJSON , & event )
2021-04-09 01:22:08 +00:00
if err != nil {
2021-03-24 21:42:15 +00:00
log . Errorf ( "Error on Deep Copy: %v %v" , eventJSON , err )
}
2019-01-02 22:33:56 +00:00
// maps aren't thread safe
em . mapMutex . Lock ( )
subscribers := em . subscribers [ event . EventType ]
em . mapMutex . Unlock ( )
// Send the event to any subscribers to that event type
for _ , subscriber := range subscribers {
2021-03-24 21:42:15 +00:00
// Deep Copy for Each Subscriber
var eventCopy Event
json . Unmarshal ( eventJSON , & eventCopy )
subscriber . Publish ( eventCopy )
2019-01-02 22:33:56 +00:00
}
}
// We are about to exit the eventbus thread, fire off an event internally
em . internal <- true
}
// Shutdown triggers, and waits for, the internal eventBus goroutine to finish
2019-06-05 20:40:55 +00:00
func ( em * manager ) Shutdown ( ) {
2021-03-24 21:42:15 +00:00
em . events <- [ ] byte { }
2022-03-23 21:57:18 +00:00
em . chanMutex . Lock ( )
2019-05-15 20:12:11 +00:00
em . closed = true
2022-03-23 21:57:18 +00:00
em . chanMutex . Unlock ( )
2019-01-02 22:33:56 +00:00
// wait for eventBus to finish
<- em . internal
close ( em . events )
close ( em . internal )
}