better handle acn/engine isntability #512

Merged
dan merged 4 commits from handleStatus into master 2023-05-01 21:47:57 +00:00
4 changed files with 89 additions and 33 deletions

View File

@ -24,13 +24,13 @@ type application struct {
eventBuses map[string]event.Manager
directory string
peerLock sync.Mutex
peers map[string]peer.CwtchPeer
acn connectivity.ACN
plugins sync.Map //map[string] []plugins.Plugin
peers map[string]peer.CwtchPeer
acn connectivity.ACN
plugins sync.Map //map[string] []plugins.Plugin
engines map[string]connections.Engine
appBus event.Manager
eventQueue event.Queue
appmutex sync.Mutex
engineHooks connections.EngineHooks
@ -97,7 +97,7 @@ func LoadAppSettings(appDirectory string) *settings.GlobalSettingsFile {
// NewApp creates a new app with some environment awareness and initializes a Tor Manager
func NewApp(acn connectivity.ACN, appDirectory string, settings *settings.GlobalSettingsFile) Application {
app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager(), settings: settings}
app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager(), settings: settings, eventQueue: event.NewQueue()}
app.peers = make(map[string]peer.CwtchPeer)
app.engineHooks = connections.DefaultEngineHooks{}
app.acn = acn
@ -107,6 +107,9 @@ func NewApp(acn connectivity.ACN, appDirectory string, settings *settings.Global
prog, status := acn.GetBootstrapStatus()
statusHandler(prog, status)
app.GetPrimaryBus().Subscribe(event.ACNStatus, app.eventQueue)
go app.eventHandler()
return app
}
@ -128,9 +131,6 @@ func (app *application) UpdateSettings(settings settings.GlobalSettings) {
defer app.appmutex.Unlock()
app.settings.WriteGlobalSettings(settings)
// we now need to propagate changes to all peers
app.peerLock.Lock()
defer app.peerLock.Unlock()
for _, profile := range app.peers {
profile.UpdateExperiments(settings.ExperimentsEnabled, settings.Experiments)
@ -150,8 +150,8 @@ func (app *application) UpdateSettings(settings settings.GlobalSettings) {
func (app *application) ListProfiles() []string {
var keys []string
app.peerLock.Lock()
defer app.peerLock.Unlock()
app.appmutex.Lock()
defer app.appmutex.Unlock()
for handle := range app.peers {
keys = append(keys, handle)
}
@ -474,6 +474,48 @@ func (app *application) QueryACNVersion() {
app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version))
}
func (app *application) eventHandler() {
acnStatus := -1
for {
e := app.eventQueue.Next()
switch e.EventType {
case event.ACNStatus:
Review

this should be a struct level member,

this should be a struct level member,
newAcnStatus, err := strconv.Atoi(e.Data[event.Progress])
if err != nil {
break
}
if newAcnStatus == 100 {
if acnStatus != 100 {
for _, onion := range app.ListProfiles() {
profile := app.GetPeer(onion)
if profile != nil {
autostart, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAutostart)
if !exists || autostart == "true" {
app.ActivatePeerEngine(onion)
}
}
}
}
} else {
if acnStatus == 100 {
// just fell offline
for _, onion := range app.ListProfiles() {
app.DeactivatePeerEngine(onion)
}
}
}
acnStatus = newAcnStatus
default:
// invalid event, signifies shutdown
if e.EventType == "" {
return
}
}
}
}
// ShutdownPeer shuts down a peer and removes it from the app's management
func (app *application) ShutdownPeer(onion string) {
app.appmutex.Lock()
@ -519,6 +561,7 @@ func (app *application) Shutdown() {
app.shutdownPeer(id)
}
log.Debugf("Shutting Down App")
app.eventQueue.Shutdown()
app.appBus.Shutdown()
log.Debugf("Shut Down Complete")
}

View File

@ -5,6 +5,7 @@ import (
"cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/openprivacy/log"
"math"
"strconv"
"sync"
"time"
)
@ -113,6 +114,7 @@ type contactRetry struct {
breakChan chan bool
onion string
lastCheck time.Time
acnProgress int
connections sync.Map //[string]*contact
connCount int
@ -249,8 +251,6 @@ func (cr *contactRetry) run() {
}
}
}
case event.ProtocolEngineCreated:
cr.protocolEngine = true
case event.ProtocolEngineShutdown:
cr.ACNUp = false
@ -265,22 +265,15 @@ func (cr *contactRetry) run() {
p.failedCount = 0
return true
})
case event.ProtocolEngineCreated:
cr.protocolEngine = true
cr.processStatus()
case event.ACNStatus:
prog := e.Data[event.Progress]
if !cr.protocolEngine {
continue
}
if prog == "100" && !cr.ACNUp {
cr.ACNUp = true
cr.ACNUpTime = time.Now()
cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact)
p.failedCount = 0
return true
})
} else if prog != "100" {
cr.ACNUp = false
progData := e.Data[event.Progress]
if prog, err := strconv.Atoi(progData); err == nil {
cr.acnProgress = prog
cr.processStatus()
}
}
@ -294,6 +287,24 @@ func (cr *contactRetry) run() {
}
}
func (cr *contactRetry) processStatus() {
if !cr.protocolEngine {
cr.ACNUp = false
return
}
if cr.acnProgress == 100 && !cr.ACNUp {
cr.ACNUp = true
cr.ACNUpTime = time.Now()
cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact)
p.failedCount = 0
return true
})
} else if cr.acnProgress != 100 {
cr.ACNUp = false
}
}
func (cr *contactRetry) requeueReady() {
if !cr.ACNUp {
return

View File

@ -1336,7 +1336,7 @@ func (cp *cwtchPeer) eventHandler() {
ci, err := cp.FetchConversationInfo(ev.Data[event.GroupServer])
if ci == nil || err != nil {
log.Errorf("no server connection count")
return
continue
}
cp.SetConversationAttribute(ci.ID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(lastReceivedSignature)), ev.Data[event.Signature])
conversations, err := cp.FetchConversations()

View File

@ -342,18 +342,20 @@ func (e *engine) Shutdown() {
// don't accept any more events...
e.queue.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{}))
e.service.Shutdown()
e.shuttingDown.Store(true)
e.ephemeralServicesLock.Lock()
defer e.ephemeralServicesLock.Unlock()
for _, connection := range e.ephemeralServices {
log.Infof("shutting down ephemeral service")
connection.connectingLock.Lock()
connection.service.Shutdown()
connection.connectingLock.Unlock()
}
// work around: service.shutdown() can block for a long time if it is Open()ing a new connection, putting it in a
// goroutine means we can perform this operation and let the per service shutdown in their own time or until the app exits
go func() {
connection.connectingLock.Lock()
connection.service.Shutdown()
connection.connectingLock.Unlock()
}()
}
e.queue.Shutdown()
}