Merge pull request 'Fixup ProtocolEngine Shutdown' (#461) from protocl_engine_shutdown_fix into master
continuous-integration/drone/push Build is pending Details

Reviewed-on: #461
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
This commit is contained in:
Sarah Jamie Lewis 2022-09-10 19:08:27 +00:00
commit 0e10b47c42
3 changed files with 12 additions and 2 deletions

View File

@ -40,6 +40,7 @@ const (
// attributes GroupServer - the onion of the server to leave // attributes GroupServer - the onion of the server to leave
LeaveServer = Type("LeaveServer") LeaveServer = Type("LeaveServer")
ProtocolEngineShutdown = Type("ProtocolEngineShutdown")
ProtocolEngineStartListen = Type("ProtocolEngineStartListen") ProtocolEngineStartListen = Type("ProtocolEngineStartListen")
ProtocolEngineStopped = Type("ProtocolEngineStopped") ProtocolEngineStopped = Type("ProtocolEngineStopped")

View File

@ -787,7 +787,7 @@ func (cp *cwtchPeer) GetPeerState(handle string) connections.ConnectionState {
func (cp *cwtchPeer) PeerWithOnion(onion string) { func (cp *cwtchPeer) PeerWithOnion(onion string) {
go func() { go func() {
// wait a random number of seconds before triggering // wait a random number of seconds before triggering
// this cuts down on contention in the event // this cuts down on contention in the evengit st bus
randWait := time.Duration(rand2.Int() % 60) randWait := time.Duration(rand2.Int() % 60)
time.Sleep(randWait * time.Second) time.Sleep(randWait * time.Second)
cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion})) cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion}))

View File

@ -94,6 +94,7 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
engine.eventManager = eventManager engine.eventManager = eventManager
engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue) engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue)
engine.eventManager.Subscribe(event.ProtocolEngineShutdown, engine.queue)
engine.eventManager.Subscribe(event.PeerRequest, engine.queue) engine.eventManager.Subscribe(event.PeerRequest, engine.queue)
engine.eventManager.Subscribe(event.RetryPeerRequest, engine.queue) engine.eventManager.Subscribe(event.RetryPeerRequest, engine.queue)
engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue) engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue)
@ -137,6 +138,10 @@ func (e *engine) EventManager() event.Manager {
func (e *engine) eventHandler() { func (e *engine) eventHandler() {
for { for {
ev := e.queue.Next() ev := e.queue.Next()
// optimistic shutdown...
if e.shuttingDown {
return
}
switch ev.EventType { switch ev.EventType {
case event.StatusRequest: case event.StatusRequest:
e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID}) e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID})
@ -248,6 +253,8 @@ func (e *engine) eventHandler() {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
} }
} }
case event.ProtocolEngineShutdown:
return
default: default:
return return
} }
@ -297,8 +304,10 @@ func (e *engine) listenFn() {
// Shutdown tears down the eventHandler goroutine // Shutdown tears down the eventHandler goroutine
func (e *engine) Shutdown() { func (e *engine) Shutdown() {
e.shuttingDown = true // don't accept any more events...
e.queue.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{}))
e.service.Shutdown() e.service.Shutdown()
e.shuttingDown = true
e.ephemeralServicesLock.Lock() e.ephemeralServicesLock.Lock()
defer e.ephemeralServicesLock.Unlock() defer e.ephemeralServicesLock.Unlock()