Browse Source

Fixing Data Races in Event Bus

pull/290/head
Sarah Jamie Lewis 3 weeks ago
parent
commit
a7fd359233

+ 34
- 4
event/eventQueue.go View File

@@ -1,21 +1,27 @@
package event

import "sync"

type queue struct {
infChan infiniteChannel
lock sync.Mutex
closed bool
}

type simpleQueue struct {
eventChannel chan Event
lock sync.Mutex
closed bool
}

// Queue is a wrapper around a channel for handling Events in a consistent way across subsystems.
// The expectation is that each subsystem in Cwtch will manage a given an event.Queue fed from
// the event.Manager.
type Queue interface {
InChan() chan<- Event
OutChan() <-chan Event
Publish(event Event)
Next() *Event
Shutdown()
OutChan() <-chan Event
Len() int
}

@@ -32,7 +38,7 @@ func NewSimpleQueue(buffer int) Queue {
return queue
}

func (sq *simpleQueue) InChan() chan<- Event {
func (sq *simpleQueue) inChan() chan<- Event {
return sq.eventChannel
}

@@ -53,10 +59,22 @@ func (sq *simpleQueue) Next() *Event {

// Shutdown closes our eventChannel
func (sq *simpleQueue) Shutdown() {
sq.lock.Lock()
sq.closed = true
close(sq.eventChannel)
sq.lock.Unlock()
}

func (iq *queue) InChan() chan<- Event {
// Shutdown closes our eventChannel
func (sq *simpleQueue) Publish(event Event) {
sq.lock.Lock()
if !sq.closed {
sq.inChan() <- event
}
sq.lock.Unlock()
}

func (iq *queue) inChan() chan<- Event {
return iq.infChan.In()
}

@@ -76,5 +94,17 @@ func (iq *queue) Len() int {

// Shutdown closes our eventChannel
func (iq *queue) Shutdown() {
iq.lock.Lock()
iq.closed = true
iq.infChan.Close()
iq.lock.Unlock()

}

func (iq *queue) Publish(event Event) {
iq.lock.Lock()
if !iq.closed {
iq.inChan() <- event
}
iq.lock.Unlock()
}

+ 4
- 4
event/eventmanager.go View File

@@ -32,7 +32,7 @@ func NewEventList(eventType Type, args ...interface{}) Event {

// Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others.
type manager struct {
subscribers map[Type][]chan<- Event
subscribers map[Type][]Queue
events chan Event
mapMutex sync.Mutex
internal chan bool
@@ -57,7 +57,7 @@ func NewEventManager() Manager {

// Initialize sets up the Manager.
func (em *manager) initialize() {
em.subscribers = make(map[Type][]chan<- Event)
em.subscribers = make(map[Type][]Queue)
em.events = make(chan Event)
em.internal = make(chan bool)
em.closed = false
@@ -69,7 +69,7 @@ func (em *manager) initialize() {
func (em *manager) Subscribe(eventType Type, queue Queue) {
em.mapMutex.Lock()
defer em.mapMutex.Unlock()
em.subscribers[eventType] = append(em.subscribers[eventType], queue.InChan())
em.subscribers[eventType] = append(em.subscribers[eventType], queue)
}

// Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers
@@ -101,7 +101,7 @@ func (em *manager) eventBus() {

// Send the event to any subscribers to that event type
for _, subscriber := range subscribers {
subscriber <- event
subscriber.Publish(event)
}
}


+ 3
- 2
event/eventmanager_test.go View File

@@ -2,6 +2,7 @@ package event

import (
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"sync"
"testing"
"time"
)
@@ -12,7 +13,7 @@ func TestEventManager(t *testing.T) {

// We need to make this buffer at least 1, otherwise we will log an error!
testChan := make(chan Event, 1)
simpleQueue := &simpleQueue{testChan}
simpleQueue := &simpleQueue{testChan, sync.Mutex{}, false}
eventManager.Subscribe("TEST", simpleQueue)
eventManager.Publish(Event{EventType: "TEST", Data: map[Field]string{"Value": "Hello World"}})

@@ -32,7 +33,7 @@ func TestEventManagerOverflow(t *testing.T) {

// Explicitly setting this to 0 log an error!
testChan := make(chan Event)
simpleQueue := &simpleQueue{testChan}
simpleQueue := &simpleQueue{testChan, sync.Mutex{}, false}
eventManager.Subscribe("TEST", simpleQueue)
eventManager.Publish(Event{EventType: "TEST"})
}

+ 0
- 1
event/infinitechannel.go View File

@@ -24,7 +24,6 @@ func newInfiniteChannel() *infiniteChannel {
go ch.infiniteBuffer()
return ch
}

func (ch *infiniteChannel) In() chan<- Event {
return ch.input
}

+ 0
- 1
storage/profile_store_test.go View File

@@ -1,5 +1,4 @@
// Known race issue with event bus channel closure
// +build !race

package storage


+ 0
- 3
testing/cwtch_peer_server_integration_test.go View File

@@ -1,6 +1,3 @@
// FIXME currently we have channel related races inside the event bus which cause this to trigger. Remove once fixed
// +build !race

package testing

import (

+ 1
- 1
testing/tests.sh View File

@@ -5,7 +5,7 @@ pwd
GORACE="haltonerror=1"
go test -race ${1} -coverprofile=model.cover.out -v ./model
go test -race ${1} -coverprofile=event.cover.out -v ./event
go test ${1} -coverprofile=storage.cover.out -v ./storage
go test -race ${1} -coverprofile=storage.cover.out -v ./storage
go test -race ${1} -coverprofile=peer.connections.cover.out -v ./protocol/connections
go test -race ${1} -coverprofile=protocol.spam.cover.out -v ./protocol/connections/spam
go test -race ${1} -coverprofile=peer.fetch.cover.out -v ./protocol/connections/fetch

Loading…
Cancel
Save