adding a few cases of syncmap, removing unused mutex #248

Closed
dan wants to merge 1 commits from dan:syncmap into master
3 changed files with 15 additions and 12 deletions

View File

@ -33,9 +33,8 @@ func NewEventList(eventType Type, args ...interface{}) Event {
// Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others. // Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others.
type manager struct { type manager struct {
subscribers map[Type][]chan Event subscribers sync.Map //[Type][]chan Event
events chan Event events chan Event
mapMutex sync.Mutex
internal chan bool internal chan bool
closed bool closed bool
} }
@ -57,7 +56,7 @@ func NewEventManager() Manager {
// Initialize sets up the Manager. // Initialize sets up the Manager.
func (em *manager) initialize() { func (em *manager) initialize() {
em.subscribers = make(map[Type][]chan Event) em.subscribers = sync.Map{}
em.events = make(chan Event) em.events = make(chan Event)
em.internal = make(chan bool) em.internal = make(chan bool)
em.closed = false em.closed = false
@ -67,9 +66,12 @@ func (em *manager) initialize() {
// Subscribe takes an eventType and an Channel and associates them in the eventBus. All future events of that type // Subscribe takes an eventType and an Channel and associates them in the eventBus. All future events of that type
// will be sent to the eventChannel. // will be sent to the eventChannel.
func (em *manager) Subscribe(eventType Type, eventChannel chan Event) { func (em *manager) Subscribe(eventType Type, eventChannel chan Event) {
em.mapMutex.Lock() result, ok := em.subscribers.Load(eventType)
defer em.mapMutex.Unlock() if !ok {
em.subscribers[eventType] = append(em.subscribers[eventType], eventChannel) em.subscribers.Store(eventType, []chan Event{eventChannel})
} else {
em.subscribers.Store(eventType, append(result.([]chan Event), eventChannel))
}
} }
// Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers // Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers
@ -94,10 +96,11 @@ func (em *manager) eventBus() {
break break
} }
// maps aren't thread safe result, exists := em.subscribers.Load(event.EventType)
em.mapMutex.Lock() if !exists {
subscribers := em.subscribers[event.EventType] continue
em.mapMutex.Unlock() }
subscribers := result.([]chan Event)
// Send the event to any subscribers to that event type // Send the event to any subscribers to that event type
for _, subscriber := range subscribers { for _, subscriber := range subscribers {

View File

@ -17,6 +17,8 @@ import (
"time" "time"
) )
// ****** Since we have a "live" 0.1 in the wild, we have to be careful of type changes breaking saving and loading *****
// PublicProfile is a local copy of a CwtchIdentity // PublicProfile is a local copy of a CwtchIdentity
type PublicProfile struct { type PublicProfile struct {
Name string Name string

View File

@ -13,14 +13,12 @@ import (
"git.openprivacy.ca/openprivacy/libricochet-go/log" "git.openprivacy.ca/openprivacy/libricochet-go/log"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"strings" "strings"
"sync"
"time" "time"
) )
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
type cwtchPeer struct { type cwtchPeer struct {
Profile *model.Profile Profile *model.Profile
mutex sync.Mutex
shutdown bool shutdown bool
started bool started bool