diff --git a/event/EventQueue.go b/event/EventQueue.go new file mode 100644 index 0000000..acd2ce6 --- /dev/null +++ b/event/EventQueue.go @@ -0,0 +1,31 @@ +package event + +// Queue is a wrapper around a channel for handling Events in a consistent way across subsystems. +// The expectation is that each subsystem in Cwtch will manage a given an event.Queue fed from +// the event.Manager. +type Queue struct { + EventChannel chan Event +} + +// NewEventQueue initializes an event.Queue of the given buffer size. +func NewEventQueue(buffer int) *Queue { + queue := new(Queue) + queue.EventChannel = make(chan Event, buffer) + return queue +} + +// Backlog returns the length of the queue backlog +func (eq *Queue) Backlog() int { + return len(eq.EventChannel) +} + +// Next returns the next available event from the front of the queue +func (eq *Queue) Next() (event Event) { + event = <-eq.EventChannel + return +} + +// Shutdown closes our EventChannel +func (eq *Queue) Shutdown() { + close(eq.EventChannel) +} diff --git a/event/eventmanager.go b/event/eventmanager.go new file mode 100644 index 0000000..8521526 --- /dev/null +++ b/event/eventmanager.go @@ -0,0 +1,83 @@ +package event + +import ( + "git.openprivacy.ca/openprivacy/libricochet-go/log" + "sync" +) + +// Event is a structure which binds a given set of data to an EventType +type Event struct { + EventType string + Data []byte +} + +// Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others. +type Manager struct { + subscribers map[string][]chan Event + events chan Event + mapMutex sync.Mutex + internal chan bool +} + +// Initialize sets up the Manager. +func (em *Manager) Initialize() { + em.subscribers = make(map[string][]chan Event) + em.events = make(chan Event) + em.internal = make(chan bool) + go em.eventBus() +} + +// Subscribe takes an eventType and an Channel and associates them in the eventBus. All future events of that type +// will be sent to the eventChannel. +func (em *Manager) Subscribe(eventType string, eventChannel chan Event) { + em.mapMutex.Lock() + defer em.mapMutex.Unlock() + em.subscribers[eventType] = append(em.subscribers[eventType], eventChannel) +} + +// Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers +func (em *Manager) Publish(event Event) { + if event.EventType != "" { + em.events <- event + } +} + +// eventBus is an internal function that is used to distribute events to all subscribers +func (em *Manager) eventBus() { + for { + event := <-em.events + + // In the case on an empty event. Teardown the Queue + if event.EventType == "" { + break + } + + // maps aren't thread safe + em.mapMutex.Lock() + subscribers := em.subscribers[event.EventType] + em.mapMutex.Unlock() + + // Send the event to any subscribers to that event type + for _, subscriber := range subscribers { + select { + case subscriber <- event: + log.Debugf("Sending %v to %v", event.EventType, subscriber) + default: + log.Errorf("Failed to send %v to %v. The subsystem might be running too slow!", event.EventType, subscriber) + } + + } + } + + // We are about to exit the eventbus thread, fire off an event internally + em.internal <- true +} + +// Shutdown triggers, and waits for, the internal eventBus goroutine to finish +func (em *Manager) Shutdown() { + em.events <- Event{"", []byte{}} + // wait for eventBus to finish + <-em.internal + close(em.events) + close(em.internal) +} diff --git a/event/eventmanager_test.go b/event/eventmanager_test.go new file mode 100644 index 0000000..7d24bee --- /dev/null +++ b/event/eventmanager_test.go @@ -0,0 +1,104 @@ +package event + +import ( + "git.openprivacy.ca/openprivacy/libricochet-go/log" + "testing" + "time" +) + +// Most basic Manager Test, Initialize, Subscribe, Publish, Receive +func TestEventManager(t *testing.T) { + eventManager := new(Manager) + eventManager.Initialize() + + // We need to make this buffer at least 1, otherwise we will log an error! + testChan := make(chan Event, 1) + eventManager.Subscribe("TEST", testChan) + eventManager.Publish(Event{EventType: "TEST", Data: []byte("Hello World")}) + + event := <-testChan + if event.EventType == "TEST" && string(event.Data) == "Hello World" { + + } else { + t.Errorf("Received Invalid Event") + } + + eventManager.Shutdown() +} + +// Most basic Manager Test, Initialize, Subscribe, Publish, Receive +func TestEventManagerOverflow(t *testing.T) { + eventManager := new(Manager) + eventManager.Initialize() + + // Explicitly setting this to 0 log an error! + testChan := make(chan Event) + eventManager.Subscribe("TEST", testChan) + eventManager.Publish(Event{EventType: "TEST", Data: []byte("Hello World")}) +} + +func TestEventManagerMultiple(t *testing.T) { + log.SetLevel(log.LevelDebug) + eventManager := new(Manager) + eventManager.Initialize() + + groupEventQueue := NewEventQueue(10) + peerEventQueue := NewEventQueue(10) + allEventQueue := NewEventQueue(10) + + + eventManager.Subscribe("PeerEvent", peerEventQueue.EventChannel) + eventManager.Subscribe("GroupEvent", groupEventQueue.EventChannel) + eventManager.Subscribe("PeerEvent", allEventQueue.EventChannel) + eventManager.Subscribe("GroupEvent", allEventQueue.EventChannel) + eventManager.Subscribe("ErrorEvent", allEventQueue.EventChannel) + + eventManager.Publish(Event{EventType: "PeerEvent", Data: []byte("Hello World Peer")}) + eventManager.Publish(Event{EventType: "GroupEvent", Data: []byte("Hello World Group")}) + eventManager.Publish(Event{EventType: "PeerEvent", Data: []byte("Hello World Peer 2")}) + eventManager.Publish(Event{EventType: "ErrorEvent", Data: []byte("Hello World Error")}) + + assertLength := func(len int, expected int, label string) { + if len != expected { + t.Errorf("Expected %s to be %v was %v", label, expected, len) + } + } + + time.Sleep(time.Second) + + assertLength(groupEventQueue.Backlog(), 1, "Group Event Queue Length") + assertLength(peerEventQueue.Backlog(), 2, "Peer Event Queue Length") + assertLength(allEventQueue.Backlog(), 4, "All Event Queue Length") + + checkEvent := func(eventType string, expected string, label string) { + if eventType != expected { + t.Errorf("Expected %s to be %v was %v", label, expected, eventType) + } + } + + event := groupEventQueue.Next() + checkEvent(event.EventType, "GroupEvent", "First Group Event") + + event = peerEventQueue.Next() + checkEvent(event.EventType, "PeerEvent", "First Peer Event") + event = peerEventQueue.Next() + checkEvent(event.EventType, "PeerEvent", "Second Peer Event") + + event = allEventQueue.Next() + checkEvent(event.EventType, "PeerEvent", "ALL: First Peer Event") + event = allEventQueue.Next() + checkEvent(event.EventType, "GroupEvent", "ALL: First Group Event") + event = allEventQueue.Next() + checkEvent(event.EventType, "PeerEvent", "ALL: Second Peer Event") + event = allEventQueue.Next() + checkEvent(event.EventType, "ErrorEvent", "ALL: First Error Event") + + eventManager.Shutdown() + groupEventQueue.Shutdown() + peerEventQueue.Shutdown() + allEventQueue.Shutdown() + + // Reading from a closed queue should result in an instant return and an empty event + event = groupEventQueue.Next() + checkEvent(event.EventType, "", "Test Next() on Empty Queue") +} diff --git a/server/metrics/monitors.go b/server/metrics/monitors.go index d0aa1f0..47381a1 100644 --- a/server/metrics/monitors.go +++ b/server/metrics/monitors.go @@ -59,7 +59,7 @@ func (mp *Monitors) run() { func (mp *Monitors) report() { f, err := os.Create(path.Join(mp.configDir, reportFile)) if err != nil { - log.Errorf("Could not open monitor reporting file: ", err) + log.Errorf("Could not open monitor reporting file: %v", err) return } defer f.Close() diff --git a/server/serverConfig.go b/server/serverConfig.go index 4fa02e4..4d9bcca 100644 --- a/server/serverConfig.go +++ b/server/serverConfig.go @@ -50,7 +50,7 @@ func LoadConfig(configDir, filename string) Config { err = json.Unmarshal(raw, &config) if err != nil { - log.Errorf("reading config: ", err) + log.Errorf("reading config: %v", err) } } diff --git a/testing/tests.sh b/testing/tests.sh index 2ce493c..8213cfb 100755 --- a/testing/tests.sh +++ b/testing/tests.sh @@ -3,6 +3,7 @@ set -e pwd go test ${1} -coverprofile=model.cover.out -v ./model +go test ${1} -coverprofile=event.cover.out -v ./event go test ${1} -coverprofile=protocol.spam.cover.out -v ./protocol/spam go test ${1} -coverprofile=storage.cover.out -v ./storage go test ${1} -coverprofile=peer.connections.cover.out -v ./peer/connections