engine shutdown now puts potentially long blocking service.close()s in goroutine; contact retry more smartly handles protocolengine start in case last ACNstatus == 100 message comes first
continuous-integration/drone/pr Build is pending Details

This commit is contained in:
Dan Ballard 2023-04-27 15:16:24 -06:00
parent 70c335df81
commit 12b89966de
2 changed files with 35 additions and 22 deletions

View File

@ -5,6 +5,7 @@ import (
"cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/openprivacy/log" "git.openprivacy.ca/openprivacy/log"
"math" "math"
"strconv"
"sync" "sync"
"time" "time"
) )
@ -113,6 +114,7 @@ type contactRetry struct {
breakChan chan bool breakChan chan bool
onion string onion string
lastCheck time.Time lastCheck time.Time
acnProgress int
connections sync.Map //[string]*contact connections sync.Map //[string]*contact
connCount int connCount int
@ -249,8 +251,6 @@ func (cr *contactRetry) run() {
} }
} }
} }
case event.ProtocolEngineCreated:
cr.protocolEngine = true
case event.ProtocolEngineShutdown: case event.ProtocolEngineShutdown:
cr.ACNUp = false cr.ACNUp = false
@ -265,22 +265,15 @@ func (cr *contactRetry) run() {
p.failedCount = 0 p.failedCount = 0
return true return true
}) })
case event.ProtocolEngineCreated:
cr.protocolEngine = true
cr.processStatus()
case event.ACNStatus: case event.ACNStatus:
prog := e.Data[event.Progress] progData := e.Data[event.Progress]
if !cr.protocolEngine { if prog, err := strconv.Atoi(progData); err == nil {
continue cr.acnProgress = prog
} cr.processStatus()
if prog == "100" && !cr.ACNUp {
cr.ACNUp = true
cr.ACNUpTime = time.Now()
cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact)
p.failedCount = 0
return true
})
} else if prog != "100" {
cr.ACNUp = false
} }
} }
@ -294,6 +287,24 @@ func (cr *contactRetry) run() {
} }
} }
func (cr *contactRetry) processStatus() {
if !cr.protocolEngine {
cr.ACNUp = false
return
}
if cr.acnProgress == 100 && !cr.ACNUp {
cr.ACNUp = true
cr.ACNUpTime = time.Now()
cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact)
p.failedCount = 0
return true
})
} else if cr.acnProgress != 100 {
cr.ACNUp = false
}
}
func (cr *contactRetry) requeueReady() { func (cr *contactRetry) requeueReady() {
if !cr.ACNUp { if !cr.ACNUp {
return return

View File

@ -342,18 +342,20 @@ func (e *engine) Shutdown() {
// don't accept any more events... // don't accept any more events...
e.queue.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{})) e.queue.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{}))
e.service.Shutdown() e.service.Shutdown()
e.shuttingDown.Store(true) e.shuttingDown.Store(true)
e.ephemeralServicesLock.Lock() e.ephemeralServicesLock.Lock()
defer e.ephemeralServicesLock.Unlock() defer e.ephemeralServicesLock.Unlock()
for _, connection := range e.ephemeralServices { for _, connection := range e.ephemeralServices {
log.Infof("shutting down ephemeral service") log.Infof("shutting down ephemeral service")
connection.connectingLock.Lock() // work around: service.shutdown() can block for a long time if it is Open()ing a new connection, putting it in a
connection.service.Shutdown() // goroutine means we can perform this operation and let the per service shutdown in their own time or until the app exits
connection.connectingLock.Unlock() go func() {
} connection.connectingLock.Lock()
connection.service.Shutdown()
connection.connectingLock.Unlock()
}()
}
e.queue.Shutdown() e.queue.Shutdown()
} }