diff --git a/event/common.go b/event/common.go index 9b9f975..d78d861 100644 --- a/event/common.go +++ b/event/common.go @@ -40,6 +40,7 @@ const ( // attributes GroupServer - the onion of the server to leave LeaveServer = Type("LeaveServer") + ProtocolEngineShutdown = Type("ProtocolEngineShutdown") ProtocolEngineStartListen = Type("ProtocolEngineStartListen") ProtocolEngineStopped = Type("ProtocolEngineStopped") diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 5d8771f..e16bf9e 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -787,7 +787,7 @@ func (cp *cwtchPeer) GetPeerState(handle string) connections.ConnectionState { func (cp *cwtchPeer) PeerWithOnion(onion string) { go func() { // wait a random number of seconds before triggering - // this cuts down on contention in the event + // this cuts down on contention in the evengit st bus randWait := time.Duration(rand2.Int() % 60) time.Sleep(randWait * time.Second) cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion})) diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 0d1f8d3..864e5f6 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -94,6 +94,7 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK engine.eventManager = eventManager engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue) + engine.eventManager.Subscribe(event.ProtocolEngineShutdown, engine.queue) engine.eventManager.Subscribe(event.PeerRequest, engine.queue) engine.eventManager.Subscribe(event.RetryPeerRequest, engine.queue) engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue) @@ -137,6 +138,10 @@ func (e *engine) EventManager() event.Manager { func (e *engine) eventHandler() { for { ev := e.queue.Next() + // optimistic shutdown... + if e.shuttingDown { + return + } switch ev.EventType { case event.StatusRequest: e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID}) @@ -248,6 +253,8 @@ func (e *engine) eventHandler() { e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) } } + case event.ProtocolEngineShutdown: + return default: return } @@ -297,8 +304,10 @@ func (e *engine) listenFn() { // Shutdown tears down the eventHandler goroutine func (e *engine) Shutdown() { - e.shuttingDown = true + // don't accept any more events... + e.queue.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{})) e.service.Shutdown() + e.shuttingDown = true e.ephemeralServicesLock.Lock() defer e.ephemeralServicesLock.Unlock()