forked from cwtch.im/cwtch
Merge branch 'racefuzz' of cwtch.im/cwtch into master
This commit is contained in:
commit
057b695a22
|
@ -1,21 +1,27 @@
|
|||
package event
|
||||
|
||||
import "sync"
|
||||
|
||||
type queue struct {
|
||||
infChan infiniteChannel
|
||||
lock sync.Mutex
|
||||
closed bool
|
||||
}
|
||||
|
||||
type simpleQueue struct {
|
||||
eventChannel chan Event
|
||||
lock sync.Mutex
|
||||
closed bool
|
||||
}
|
||||
|
||||
// Queue is a wrapper around a channel for handling Events in a consistent way across subsystems.
|
||||
// The expectation is that each subsystem in Cwtch will manage a given an event.Queue fed from
|
||||
// the event.Manager.
|
||||
type Queue interface {
|
||||
InChan() chan<- Event
|
||||
OutChan() <-chan Event
|
||||
Publish(event Event)
|
||||
Next() *Event
|
||||
Shutdown()
|
||||
OutChan() <-chan Event
|
||||
Len() int
|
||||
}
|
||||
|
||||
|
@ -32,7 +38,7 @@ func NewSimpleQueue(buffer int) Queue {
|
|||
return queue
|
||||
}
|
||||
|
||||
func (sq *simpleQueue) InChan() chan<- Event {
|
||||
func (sq *simpleQueue) inChan() chan<- Event {
|
||||
return sq.eventChannel
|
||||
}
|
||||
|
||||
|
@ -53,10 +59,22 @@ func (sq *simpleQueue) Next() *Event {
|
|||
|
||||
// Shutdown closes our eventChannel
|
||||
func (sq *simpleQueue) Shutdown() {
|
||||
sq.lock.Lock()
|
||||
sq.closed = true
|
||||
close(sq.eventChannel)
|
||||
sq.lock.Unlock()
|
||||
}
|
||||
|
||||
func (iq *queue) InChan() chan<- Event {
|
||||
// Shutdown closes our eventChannel
|
||||
func (sq *simpleQueue) Publish(event Event) {
|
||||
sq.lock.Lock()
|
||||
if !sq.closed {
|
||||
sq.inChan() <- event
|
||||
}
|
||||
sq.lock.Unlock()
|
||||
}
|
||||
|
||||
func (iq *queue) inChan() chan<- Event {
|
||||
return iq.infChan.In()
|
||||
}
|
||||
|
||||
|
@ -76,5 +94,17 @@ func (iq *queue) Len() int {
|
|||
|
||||
// Shutdown closes our eventChannel
|
||||
func (iq *queue) Shutdown() {
|
||||
iq.lock.Lock()
|
||||
iq.closed = true
|
||||
iq.infChan.Close()
|
||||
iq.lock.Unlock()
|
||||
|
||||
}
|
||||
|
||||
func (iq *queue) Publish(event Event) {
|
||||
iq.lock.Lock()
|
||||
if !iq.closed {
|
||||
iq.inChan() <- event
|
||||
}
|
||||
iq.lock.Unlock()
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ func NewEventList(eventType Type, args ...interface{}) Event {
|
|||
|
||||
// Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others.
|
||||
type manager struct {
|
||||
subscribers map[Type][]chan<- Event
|
||||
subscribers map[Type][]Queue
|
||||
events chan Event
|
||||
mapMutex sync.Mutex
|
||||
internal chan bool
|
||||
|
@ -57,7 +57,7 @@ func NewEventManager() Manager {
|
|||
|
||||
// Initialize sets up the Manager.
|
||||
func (em *manager) initialize() {
|
||||
em.subscribers = make(map[Type][]chan<- Event)
|
||||
em.subscribers = make(map[Type][]Queue)
|
||||
em.events = make(chan Event)
|
||||
em.internal = make(chan bool)
|
||||
em.closed = false
|
||||
|
@ -69,7 +69,7 @@ func (em *manager) initialize() {
|
|||
func (em *manager) Subscribe(eventType Type, queue Queue) {
|
||||
em.mapMutex.Lock()
|
||||
defer em.mapMutex.Unlock()
|
||||
em.subscribers[eventType] = append(em.subscribers[eventType], queue.InChan())
|
||||
em.subscribers[eventType] = append(em.subscribers[eventType], queue)
|
||||
}
|
||||
|
||||
// Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers
|
||||
|
@ -101,7 +101,7 @@ func (em *manager) eventBus() {
|
|||
|
||||
// Send the event to any subscribers to that event type
|
||||
for _, subscriber := range subscribers {
|
||||
subscriber <- event
|
||||
subscriber.Publish(event)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package event
|
|||
|
||||
import (
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
@ -12,7 +13,7 @@ func TestEventManager(t *testing.T) {
|
|||
|
||||
// We need to make this buffer at least 1, otherwise we will log an error!
|
||||
testChan := make(chan Event, 1)
|
||||
simpleQueue := &simpleQueue{testChan}
|
||||
simpleQueue := &simpleQueue{testChan, sync.Mutex{}, false}
|
||||
eventManager.Subscribe("TEST", simpleQueue)
|
||||
eventManager.Publish(Event{EventType: "TEST", Data: map[Field]string{"Value": "Hello World"}})
|
||||
|
||||
|
@ -32,7 +33,7 @@ func TestEventManagerOverflow(t *testing.T) {
|
|||
|
||||
// Explicitly setting this to 0 log an error!
|
||||
testChan := make(chan Event)
|
||||
simpleQueue := &simpleQueue{testChan}
|
||||
simpleQueue := &simpleQueue{testChan, sync.Mutex{}, false}
|
||||
eventManager.Subscribe("TEST", simpleQueue)
|
||||
eventManager.Publish(Event{EventType: "TEST"})
|
||||
}
|
||||
|
|
|
@ -24,7 +24,6 @@ func newInfiniteChannel() *infiniteChannel {
|
|||
go ch.infiniteBuffer()
|
||||
return ch
|
||||
}
|
||||
|
||||
func (ch *infiniteChannel) In() chan<- Event {
|
||||
return ch.input
|
||||
}
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
// Known race issue with event bus channel closure
|
||||
// +build !race
|
||||
|
||||
package storage
|
||||
|
||||
|
|
|
@ -1,6 +1,3 @@
|
|||
// FIXME currently we have channel related races inside the event bus which cause this to trigger. Remove once fixed
|
||||
// +build !race
|
||||
|
||||
package testing
|
||||
|
||||
import (
|
||||
|
|
|
@ -5,7 +5,7 @@ pwd
|
|||
GORACE="haltonerror=1"
|
||||
go test -race ${1} -coverprofile=model.cover.out -v ./model
|
||||
go test -race ${1} -coverprofile=event.cover.out -v ./event
|
||||
go test ${1} -coverprofile=storage.cover.out -v ./storage
|
||||
go test -race ${1} -coverprofile=storage.cover.out -v ./storage
|
||||
go test -race ${1} -coverprofile=peer.connections.cover.out -v ./protocol/connections
|
||||
go test -race ${1} -coverprofile=protocol.spam.cover.out -v ./protocol/connections/spam
|
||||
go test -race ${1} -coverprofile=peer.fetch.cover.out -v ./protocol/connections/fetch
|
||||
|
|
Loading…
Reference in New Issue