Fixup ProtocolEngine Shutdown
continuous-integration/drone/pr Build is failing Details

This commit is contained in:
Sarah Jamie Lewis 2022-09-10 11:54:45 -07:00
parent 35ca930628
commit 0b72a90b1f
3 changed files with 12 additions and 2 deletions

View File

@ -40,6 +40,7 @@ const (
// attributes GroupServer - the onion of the server to leave
LeaveServer = Type("LeaveServer")
ProtocolEngineShutdown = Type("ProtocolEngineShutdown")
ProtocolEngineStartListen = Type("ProtocolEngineStartListen")
ProtocolEngineStopped = Type("ProtocolEngineStopped")

View File

@ -787,7 +787,7 @@ func (cp *cwtchPeer) GetPeerState(handle string) connections.ConnectionState {
func (cp *cwtchPeer) PeerWithOnion(onion string) {
go func() {
// wait a random number of seconds before triggering
// this cuts down on contention in the event
// this cuts down on contention in the evengit st bus
randWait := time.Duration(rand2.Int() % 60)
time.Sleep(randWait * time.Second)
cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion}))

View File

@ -94,6 +94,7 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
engine.eventManager = eventManager
engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue)
engine.eventManager.Subscribe(event.ProtocolEngineShutdown, engine.queue)
engine.eventManager.Subscribe(event.PeerRequest, engine.queue)
engine.eventManager.Subscribe(event.RetryPeerRequest, engine.queue)
engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue)
@ -137,6 +138,10 @@ func (e *engine) EventManager() event.Manager {
func (e *engine) eventHandler() {
for {
ev := e.queue.Next()
// optimistic shutdown...
if e.shuttingDown {
return
}
switch ev.EventType {
case event.StatusRequest:
e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID})
@ -248,6 +253,8 @@ func (e *engine) eventHandler() {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
}
}
case event.ProtocolEngineShutdown:
return
default:
return
}
@ -297,8 +304,10 @@ func (e *engine) listenFn() {
// Shutdown tears down the eventHandler goroutine
func (e *engine) Shutdown() {
e.shuttingDown = true
// don't accept any more events...
e.queue.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{}))
e.service.Shutdown()
e.shuttingDown = true
e.ephemeralServicesLock.Lock()
defer e.ephemeralServicesLock.Unlock()