diff --git a/event/eventQueue.go b/event/eventQueue.go index 6bb72e0..f65ec77 100644 --- a/event/eventQueue.go +++ b/event/eventQueue.go @@ -1,21 +1,27 @@ package event +import "sync" + type queue struct { infChan infiniteChannel + lock sync.Mutex + closed bool } type simpleQueue struct { eventChannel chan Event + lock sync.Mutex + closed bool } // Queue is a wrapper around a channel for handling Events in a consistent way across subsystems. // The expectation is that each subsystem in Cwtch will manage a given an event.Queue fed from // the event.Manager. type Queue interface { - InChan() chan<- Event - OutChan() <-chan Event + Publish(event Event) Next() *Event Shutdown() + OutChan() <-chan Event Len() int } @@ -32,7 +38,7 @@ func NewSimpleQueue(buffer int) Queue { return queue } -func (sq *simpleQueue) InChan() chan<- Event { +func (sq *simpleQueue) inChan() chan<- Event { return sq.eventChannel } @@ -53,10 +59,22 @@ func (sq *simpleQueue) Next() *Event { // Shutdown closes our eventChannel func (sq *simpleQueue) Shutdown() { + sq.lock.Lock() + sq.closed = true close(sq.eventChannel) + sq.lock.Unlock() } -func (iq *queue) InChan() chan<- Event { +// Shutdown closes our eventChannel +func (sq *simpleQueue) Publish(event Event) { + sq.lock.Lock() + if !sq.closed { + sq.inChan() <- event + } + sq.lock.Unlock() +} + +func (iq *queue) inChan() chan<- Event { return iq.infChan.In() } @@ -76,5 +94,17 @@ func (iq *queue) Len() int { // Shutdown closes our eventChannel func (iq *queue) Shutdown() { + iq.lock.Lock() + iq.closed = true iq.infChan.Close() + iq.lock.Unlock() + +} + +func (iq *queue) Publish(event Event) { + iq.lock.Lock() + if !iq.closed { + iq.inChan() <- event + } + iq.lock.Unlock() } diff --git a/event/eventmanager.go b/event/eventmanager.go index 2785b36..f4305ba 100644 --- a/event/eventmanager.go +++ b/event/eventmanager.go @@ -32,7 +32,7 @@ func NewEventList(eventType Type, args ...interface{}) Event { // Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others. type manager struct { - subscribers map[Type][]chan<- Event + subscribers map[Type][]Queue events chan Event mapMutex sync.Mutex internal chan bool @@ -57,7 +57,7 @@ func NewEventManager() Manager { // Initialize sets up the Manager. func (em *manager) initialize() { - em.subscribers = make(map[Type][]chan<- Event) + em.subscribers = make(map[Type][]Queue) em.events = make(chan Event) em.internal = make(chan bool) em.closed = false @@ -69,7 +69,7 @@ func (em *manager) initialize() { func (em *manager) Subscribe(eventType Type, queue Queue) { em.mapMutex.Lock() defer em.mapMutex.Unlock() - em.subscribers[eventType] = append(em.subscribers[eventType], queue.InChan()) + em.subscribers[eventType] = append(em.subscribers[eventType], queue) } // Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers @@ -101,7 +101,7 @@ func (em *manager) eventBus() { // Send the event to any subscribers to that event type for _, subscriber := range subscribers { - subscriber <- event + subscriber.Publish(event) } } diff --git a/event/eventmanager_test.go b/event/eventmanager_test.go index 9db3db7..337082e 100644 --- a/event/eventmanager_test.go +++ b/event/eventmanager_test.go @@ -2,6 +2,7 @@ package event import ( "git.openprivacy.ca/openprivacy/libricochet-go/log" + "sync" "testing" "time" ) @@ -12,7 +13,7 @@ func TestEventManager(t *testing.T) { // We need to make this buffer at least 1, otherwise we will log an error! testChan := make(chan Event, 1) - simpleQueue := &simpleQueue{testChan} + simpleQueue := &simpleQueue{testChan, sync.Mutex{}, false} eventManager.Subscribe("TEST", simpleQueue) eventManager.Publish(Event{EventType: "TEST", Data: map[Field]string{"Value": "Hello World"}}) @@ -32,7 +33,7 @@ func TestEventManagerOverflow(t *testing.T) { // Explicitly setting this to 0 log an error! testChan := make(chan Event) - simpleQueue := &simpleQueue{testChan} + simpleQueue := &simpleQueue{testChan, sync.Mutex{}, false} eventManager.Subscribe("TEST", simpleQueue) eventManager.Publish(Event{EventType: "TEST"}) } diff --git a/event/infinitechannel.go b/event/infinitechannel.go index 630da8c..376de5a 100644 --- a/event/infinitechannel.go +++ b/event/infinitechannel.go @@ -24,7 +24,6 @@ func newInfiniteChannel() *infiniteChannel { go ch.infiniteBuffer() return ch } - func (ch *infiniteChannel) In() chan<- Event { return ch.input } diff --git a/storage/profile_store_test.go b/storage/profile_store_test.go index f4ef4b2..eb8cc72 100644 --- a/storage/profile_store_test.go +++ b/storage/profile_store_test.go @@ -1,5 +1,4 @@ // Known race issue with event bus channel closure -// +build !race package storage diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index 6d58163..184d095 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -1,6 +1,3 @@ -// FIXME currently we have channel related races inside the event bus which cause this to trigger. Remove once fixed -// +build !race - package testing import ( diff --git a/testing/tests.sh b/testing/tests.sh index 52ff8f8..a5d571b 100755 --- a/testing/tests.sh +++ b/testing/tests.sh @@ -5,7 +5,7 @@ pwd GORACE="haltonerror=1" go test -race ${1} -coverprofile=model.cover.out -v ./model go test -race ${1} -coverprofile=event.cover.out -v ./event -go test ${1} -coverprofile=storage.cover.out -v ./storage +go test -race ${1} -coverprofile=storage.cover.out -v ./storage go test -race ${1} -coverprofile=peer.connections.cover.out -v ./protocol/connections go test -race ${1} -coverprofile=protocol.spam.cover.out -v ./protocol/connections/spam go test -race ${1} -coverprofile=peer.fetch.cover.out -v ./protocol/connections/fetch