From 12b89966de418a3317a23264b224d4eaadbacf96 Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Thu, 27 Apr 2023 15:16:24 -0600 Subject: [PATCH 1/4] engine shutdown now puts potentially long blocking service.close()s in goroutine; contact retry more smartly handles protocolengine start in case last ACNstatus == 100 message comes first --- app/plugins/contactRetry.go | 43 +++++++++++++++++++++------------- protocol/connections/engine.go | 14 ++++++----- 2 files changed, 35 insertions(+), 22 deletions(-) diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index f12c09f..6a4f345 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -5,6 +5,7 @@ import ( "cwtch.im/cwtch/protocol/connections" "git.openprivacy.ca/openprivacy/log" "math" + "strconv" "sync" "time" ) @@ -113,6 +114,7 @@ type contactRetry struct { breakChan chan bool onion string lastCheck time.Time + acnProgress int connections sync.Map //[string]*contact connCount int @@ -249,8 +251,6 @@ func (cr *contactRetry) run() { } } } - case event.ProtocolEngineCreated: - cr.protocolEngine = true case event.ProtocolEngineShutdown: cr.ACNUp = false @@ -265,22 +265,15 @@ func (cr *contactRetry) run() { p.failedCount = 0 return true }) + case event.ProtocolEngineCreated: + cr.protocolEngine = true + cr.processStatus() case event.ACNStatus: - prog := e.Data[event.Progress] - if !cr.protocolEngine { - continue - } - if prog == "100" && !cr.ACNUp { - cr.ACNUp = true - cr.ACNUpTime = time.Now() - cr.connections.Range(func(k, v interface{}) bool { - p := v.(*contact) - p.failedCount = 0 - return true - }) - } else if prog != "100" { - cr.ACNUp = false + progData := e.Data[event.Progress] + if prog, err := strconv.Atoi(progData); err == nil { + cr.acnProgress = prog + cr.processStatus() } } @@ -294,6 +287,24 @@ func (cr *contactRetry) run() { } } +func (cr *contactRetry) processStatus() { + if !cr.protocolEngine { + cr.ACNUp = false + return + } + if cr.acnProgress == 100 && !cr.ACNUp { + cr.ACNUp = true + cr.ACNUpTime = time.Now() + cr.connections.Range(func(k, v interface{}) bool { + p := v.(*contact) + p.failedCount = 0 + return true + }) + } else if cr.acnProgress != 100 { + cr.ACNUp = false + } +} + func (cr *contactRetry) requeueReady() { if !cr.ACNUp { return diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 9cd0872..a264f47 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -342,18 +342,20 @@ func (e *engine) Shutdown() { // don't accept any more events... e.queue.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{})) e.service.Shutdown() - e.shuttingDown.Store(true) - e.ephemeralServicesLock.Lock() defer e.ephemeralServicesLock.Unlock() for _, connection := range e.ephemeralServices { log.Infof("shutting down ephemeral service") - connection.connectingLock.Lock() - connection.service.Shutdown() - connection.connectingLock.Unlock() - } + // work around: service.shutdown() can block for a long time if it is Open()ing a new connection, putting it in a + // goroutine means we can perform this operation and let the per service shutdown in their own time or until the app exits + go func() { + connection.connectingLock.Lock() + connection.service.Shutdown() + connection.connectingLock.Unlock() + }() + } e.queue.Shutdown() } -- 2.25.1 From 440b7f422c45423e9763b653bfede9033d26cc21 Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Fri, 28 Apr 2023 15:00:15 -0600 Subject: [PATCH 2/4] move event handling for AcnStatus engine reboot from lcg into app --- app/app.go | 49 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/app/app.go b/app/app.go index 634f245..29aef9a 100644 --- a/app/app.go +++ b/app/app.go @@ -31,6 +31,7 @@ type application struct { engines map[string]connections.Engine appBus event.Manager + eventQueue event.Queue appmutex sync.Mutex engineHooks connections.EngineHooks @@ -97,7 +98,7 @@ func LoadAppSettings(appDirectory string) *settings.GlobalSettingsFile { // NewApp creates a new app with some environment awareness and initializes a Tor Manager func NewApp(acn connectivity.ACN, appDirectory string, settings *settings.GlobalSettingsFile) Application { - app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager(), settings: settings} + app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager(), settings: settings, eventQueue: event.NewQueue()} app.peers = make(map[string]peer.CwtchPeer) app.engineHooks = connections.DefaultEngineHooks{} app.acn = acn @@ -107,6 +108,9 @@ func NewApp(acn connectivity.ACN, appDirectory string, settings *settings.Global prog, status := acn.GetBootstrapStatus() statusHandler(prog, status) + app.GetPrimaryBus().Subscribe(event.ACNStatus, app.eventQueue) + go app.eventHandler() + return app } @@ -474,6 +478,48 @@ func (app *application) QueryACNVersion() { app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version)) } +func (app *application) eventHandler() { + acnStatus := -1 + for { + e := app.eventQueue.Next() + switch e.EventType { + case event.ACNStatus: + newAcnStatus, err := strconv.Atoi(e.Data[event.Progress]) + if err != nil { + break + } + if newAcnStatus == 100 { + if acnStatus != 100 { + for _, onion := range app.ListProfiles() { + profile := app.GetPeer(onion) + if profile != nil { + autostart, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAutostart) + if !exists || autostart == "true" { + app.ActivatePeerEngine(onion) + } + } + } + } + } else { + if acnStatus == 100 { + // just fell offline + for _, onion := range app.ListProfiles() { + app.DeactivatePeerEngine(onion) + } + } + } + acnStatus = newAcnStatus + + default: + // invalid event, signifies shutdown + if e.EventType == "" { + return + } + + } + } +} + // ShutdownPeer shuts down a peer and removes it from the app's management func (app *application) ShutdownPeer(onion string) { app.appmutex.Lock() @@ -519,6 +565,7 @@ func (app *application) Shutdown() { app.shutdownPeer(id) } log.Debugf("Shutting Down App") + app.eventQueue.Shutdown() app.appBus.Shutdown() log.Debugf("Shut Down Complete") } -- 2.25.1 From e9e2a18678b75baefa3be559c87ee9b9d8b2fc75 Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Fri, 28 Apr 2023 15:00:23 -0600 Subject: [PATCH 3/4] fix? --- peer/cwtch_peer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 1f793be..2d6b363 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -1336,7 +1336,7 @@ func (cp *cwtchPeer) eventHandler() { ci, err := cp.FetchConversationInfo(ev.Data[event.GroupServer]) if ci == nil || err != nil { log.Errorf("no server connection count") - return + continue } cp.SetConversationAttribute(ci.ID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(lastReceivedSignature)), ev.Data[event.Signature]) conversations, err := cp.FetchConversations() -- 2.25.1 From 7053f4a31bb1eb2e3516e0588d5f27c15f8cbd21 Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Mon, 1 May 2023 16:13:39 -0500 Subject: [PATCH 4/4] remove peerlock probably left over from peerapp seperation --- app/app.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/app/app.go b/app/app.go index 29aef9a..ecef3ef 100644 --- a/app/app.go +++ b/app/app.go @@ -24,10 +24,9 @@ type application struct { eventBuses map[string]event.Manager directory string - peerLock sync.Mutex - peers map[string]peer.CwtchPeer - acn connectivity.ACN - plugins sync.Map //map[string] []plugins.Plugin + peers map[string]peer.CwtchPeer + acn connectivity.ACN + plugins sync.Map //map[string] []plugins.Plugin engines map[string]connections.Engine appBus event.Manager @@ -132,9 +131,6 @@ func (app *application) UpdateSettings(settings settings.GlobalSettings) { defer app.appmutex.Unlock() app.settings.WriteGlobalSettings(settings) - // we now need to propagate changes to all peers - app.peerLock.Lock() - defer app.peerLock.Unlock() for _, profile := range app.peers { profile.UpdateExperiments(settings.ExperimentsEnabled, settings.Experiments) @@ -154,8 +150,8 @@ func (app *application) UpdateSettings(settings settings.GlobalSettings) { func (app *application) ListProfiles() []string { var keys []string - app.peerLock.Lock() - defer app.peerLock.Unlock() + app.appmutex.Lock() + defer app.appmutex.Unlock() for handle := range app.peers { keys = append(keys, handle) } -- 2.25.1