adding a few cases of syncmap, removing unused mutex #248
|
@ -33,9 +33,8 @@ func NewEventList(eventType Type, args ...interface{}) Event {
|
||||||
|
|
||||||
// Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others.
|
// Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others.
|
||||||
type manager struct {
|
type manager struct {
|
||||||
subscribers map[Type][]chan Event
|
subscribers sync.Map //[Type][]chan Event
|
||||||
events chan Event
|
events chan Event
|
||||||
mapMutex sync.Mutex
|
|
||||||
internal chan bool
|
internal chan bool
|
||||||
closed bool
|
closed bool
|
||||||
}
|
}
|
||||||
|
@ -57,7 +56,7 @@ func NewEventManager() Manager {
|
||||||
|
|
||||||
// Initialize sets up the Manager.
|
// Initialize sets up the Manager.
|
||||||
func (em *manager) initialize() {
|
func (em *manager) initialize() {
|
||||||
em.subscribers = make(map[Type][]chan Event)
|
em.subscribers = sync.Map{}
|
||||||
em.events = make(chan Event)
|
em.events = make(chan Event)
|
||||||
em.internal = make(chan bool)
|
em.internal = make(chan bool)
|
||||||
em.closed = false
|
em.closed = false
|
||||||
|
@ -67,9 +66,12 @@ func (em *manager) initialize() {
|
||||||
// Subscribe takes an eventType and an Channel and associates them in the eventBus. All future events of that type
|
// Subscribe takes an eventType and an Channel and associates them in the eventBus. All future events of that type
|
||||||
// will be sent to the eventChannel.
|
// will be sent to the eventChannel.
|
||||||
func (em *manager) Subscribe(eventType Type, eventChannel chan Event) {
|
func (em *manager) Subscribe(eventType Type, eventChannel chan Event) {
|
||||||
em.mapMutex.Lock()
|
result, ok := em.subscribers.Load(eventType)
|
||||||
defer em.mapMutex.Unlock()
|
if !ok {
|
||||||
em.subscribers[eventType] = append(em.subscribers[eventType], eventChannel)
|
em.subscribers.Store(eventType, []chan Event{eventChannel})
|
||||||
|
} else {
|
||||||
|
em.subscribers.Store(eventType, append(result.([]chan Event), eventChannel))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers
|
// Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers
|
||||||
|
@ -94,10 +96,11 @@ func (em *manager) eventBus() {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// maps aren't thread safe
|
result, exists := em.subscribers.Load(event.EventType)
|
||||||
em.mapMutex.Lock()
|
if !exists {
|
||||||
subscribers := em.subscribers[event.EventType]
|
continue
|
||||||
em.mapMutex.Unlock()
|
}
|
||||||
|
subscribers := result.([]chan Event)
|
||||||
|
|
||||||
// Send the event to any subscribers to that event type
|
// Send the event to any subscribers to that event type
|
||||||
for _, subscriber := range subscribers {
|
for _, subscriber := range subscribers {
|
||||||
|
|
|
@ -17,6 +17,8 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ****** Since we have a "live" 0.1 in the wild, we have to be careful of type changes breaking saving and loading *****
|
||||||
|
|
||||||
// PublicProfile is a local copy of a CwtchIdentity
|
// PublicProfile is a local copy of a CwtchIdentity
|
||||||
type PublicProfile struct {
|
type PublicProfile struct {
|
||||||
Name string
|
Name string
|
||||||
|
|
|
@ -13,14 +13,12 @@ import (
|
||||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
|
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
|
||||||
type cwtchPeer struct {
|
type cwtchPeer struct {
|
||||||
Profile *model.Profile
|
Profile *model.Profile
|
||||||
mutex sync.Mutex
|
|
||||||
shutdown bool
|
shutdown bool
|
||||||
started bool
|
started bool
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue