From 8fd6d5ead215f4a347d8d65d4aba7dd3203a051e Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 16 May 2023 15:41:08 -0700 Subject: [PATCH] Fix Various Bugs Associated with Profile Start Up / Restart --- app/app.go | 2 +- app/plugins/contactRetry.go | 36 +++++++++++++++++-- .../filesharing/filesharing_functionality.go | 3 ++ functionality/filesharing/image_previews.go | 1 + protocol/connections/engine.go | 17 +++++---- 5 files changed, 49 insertions(+), 10 deletions(-) diff --git a/app/app.go b/app/app.go index 36950d9..01afb99 100644 --- a/app/app.go +++ b/app/app.go @@ -413,8 +413,8 @@ func (app *application) ActivatePeerEngine(onion string) { profile := app.GetPeer(onion) if profile != nil { if _, exists := app.engines[onion]; !exists { + log.Debugf("restartFlow: Creating a New Protocol Engine...") app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()], app.engineHooks) - app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated)) app.QueryACNStatus() if true { diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index cb36c20..e8f67dd 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -176,7 +176,10 @@ func (cr *contactRetry) run() { cr.bus.Subscribe(event.ProtocolEngineCreated, cr.queue) for { - if cr.ACNUp { + // Only attempt connection if both the ACN and the Protocol Engines are Online... + log.Debugf("restartFlow checking state") + if cr.ACNUp && cr.protocolEngine { + log.Debugf("restartFlow time to queue!!") cr.requeueReady() connectingCount := cr.connectingCount() log.Debugf("checking queues (priority len: %v) (pending len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.priorityQueue.queue), len(cr.pendingQueue.queue), cr.connCount, connectingCount) @@ -292,15 +295,41 @@ func (cr *contactRetry) processStatus() { return } if cr.acnProgress == 100 && !cr.ACNUp { + // ACN is up...at this point we need to completely reset our state + // as there is no guarantee that the tor daemon shares our state anymore... cr.ACNUp = true cr.ACNUpTime = time.Now() + + // reset all of the queues... + cr.connCount = 0 + cr.priorityQueue = newConnectionQueue() + cr.pendingQueue = newConnectionQueue() + + // Loop through connections. Reset state, and requeue... + cr.connections.Range(func(k, v interface{}) bool { + p := v.(*contact) + p.queued = true + + // prioritize connections made recently... + log.Debugf("adding %v to queue", p.id) + if time.Since(p.lastSeen).Hours() < PriorityQueueTimeSinceQualifierHours { + cr.priorityQueue.insert(p) + } else { + cr.pendingQueue.insert(p) + } + + return true + }) + + } else if cr.acnProgress != 100 { + cr.ACNUp = false cr.connections.Range(func(k, v interface{}) bool { p := v.(*contact) p.failedCount = 0 + p.queued = false + p.state = connections.DISCONNECTED return true }) - } else if cr.acnProgress != 100 { - cr.ACNUp = false } } @@ -339,6 +368,7 @@ func (cr *contactRetry) requeueReady() { } func (cr *contactRetry) publishConnectionRequest(contact *contact) { + log.Debugf("RestartFlow Publish Connection Request listener %v", contact) if contact.ctype == peerConn { cr.bus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: contact.id})) } diff --git a/functionality/filesharing/filesharing_functionality.go b/functionality/filesharing/filesharing_functionality.go index fa5aa9c..22986cc 100644 --- a/functionality/filesharing/filesharing_functionality.go +++ b/functionality/filesharing/filesharing_functionality.go @@ -193,6 +193,9 @@ func (f *Functionality) VerifyOrResumeDownload(profile peer.CwtchPeer, conversat // Assert the filename...this is technically not necessary, but is here for completeness manifest.FileName = downloadfilepath if manifest.VerifyFile() == nil { + // Send a FileDownloaded Event. Usually when VerifyOrResumeDownload is triggered it's because some UI is awaiting the results of a + // Download. + profile.PublishEvent(event.NewEvent(event.FileDownloaded, map[event.Field]string{event.FileKey: fileKey, event.FilePath: downloadfilepath, event.TempFile: downloadfilepath})) // File is verified and there is nothing else to do... return nil } else { diff --git a/functionality/filesharing/image_previews.go b/functionality/filesharing/image_previews.go index 106d32b..ad509a2 100644 --- a/functionality/filesharing/image_previews.go +++ b/functionality/filesharing/image_previews.go @@ -102,6 +102,7 @@ func (i *ImagePreviewsFunctionality) OnContactReceiveValue(profile peer.CwtchPee if value, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey)); exists && value == event.True { if _, err := os.Stat(fp); err == nil { // file is marked as completed downloaded and exists... + // Note: this will also resend the FileDownloaded event if successful... if fsf.VerifyOrResumeDownload(profile, conversation.ID, fileKey, constants.ImagePreviewMaxSizeInBytes) == nil { return } diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 20e6d63..52b91ba 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -149,6 +149,7 @@ func (e *engine) EventManager() event.Manager { // eventHandler process events from other subsystems func (e *engine) eventHandler() { + log.Debugf("restartFlow Launching ProtocolEngine listener") for { ev := e.queue.Next() // optimistic shutdown... @@ -159,6 +160,7 @@ func (e *engine) eventHandler() { case event.StatusRequest: e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID}) case event.PeerRequest: + log.Debugf("restartFlow Handling Peer Request") if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) { go e.peerWithOnion(ev.Data[event.RemotePeer]) } @@ -333,6 +335,7 @@ func (e *engine) listenFn() { func (e *engine) Shutdown() { // don't accept any more events... e.queue.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{})) + e.eventManager.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{})) e.service.Shutdown() e.shuttingDown.Store(true) e.ephemeralServicesLock.Lock() @@ -357,24 +360,26 @@ func (e *engine) peerWithOnion(onion string) { log.Debugf("Called PeerWithOnion for %v", onion) if !e.isBlocked(onion) { e.ignoreOnShutdown(e.peerConnecting)(onion) + log.Debugf("PeerWithOnion Connecting %v", onion) connected, err := e.service.Connect(onion, e.createPeerTemplate()) + log.Debugf("PeerWithOnion %v %v", onion, err) // If we are already connected...check if we are authed and issue an auth event // (This allows the ui to be stateless) if connected && err != nil { - conn, err := e.service.GetConnection(onion) + conn, err := e.service.WaitForCapabilityOrClose(onion, cwtchCapability) if err == nil { if conn.HasCapability(cwtchCapability) { e.ignoreOnShutdown(e.peerAuthed)(onion) return } + log.Debugf("PeerWithOnion something went very wrong...%v %v", onion, err) + e.ignoreOnShutdown(e.peerDisconnected)(onion) + } else { + log.Debugf("PeerWithOnion %v %v", onion, err) + e.ignoreOnShutdown(e.peerDisconnected)(onion) } } - - // Only issue a disconnected error if we are disconnected (Connect will fail if a connection already exists) - if !connected && err != nil { - e.ignoreOnShutdown(e.peerDisconnected)(onion) - } } }