Fix Various Bugs Associated with Profile Start Up / Restart

Sarah Jamie Lewis 2023-05-16 15:41:08 -07:00 committed by Gitea
parent 50cca925de
commit 8fd6d5ead2
5 changed files with 49 additions and 10 deletions

View File

@@ -413,8 +413,8 @@ func (app *application) ActivatePeerEngine(onion string) {
profile := app.GetPeer(onion)
if profile != nil {
if _, exists := app.engines[onion]; !exists {
log.Debugf("restartFlow: Creating a New Protocol Engine...")
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()], app.engineHooks)
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated))
app.QueryACNStatus()
if true {
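Note: this hunk re-creates a profile's protocol engine if one is missing and announces it via ProtocolEngineCreated, so plugins (like the contact retry plugin in the next file) can resume work. A minimal usage sketch, assuming an app.ListProfiles() accessor that is not part of this diff:

```go
// reactivateAll is a hypothetical restart helper: ListProfiles is an
// assumption (not shown in this diff), while ActivatePeerEngine is the
// method patched above; its exists-check makes re-activation idempotent.
func reactivateAll(app *application) {
	for _, onion := range app.ListProfiles() {
		app.ActivatePeerEngine(onion)
	}
}
```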

View File

@@ -176,7 +176,10 @@ func (cr *contactRetry) run() {
cr.bus.Subscribe(event.ProtocolEngineCreated, cr.queue)
for {
if cr.ACNUp {
// Only attempt a connection if both the ACN and the Protocol Engine are online...
log.Debugf("restartFlow checking state")
if cr.ACNUp && cr.protocolEngine {
log.Debugf("restartFlow time to queue!!")
cr.requeueReady()
connectingCount := cr.connectingCount()
log.Debugf("checking queues (priority len: %v) (pending len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.priorityQueue.queue), len(cr.pendingQueue.queue), cr.connCount, connectingCount)
@@ -292,15 +295,41 @@ func (cr *contactRetry) processStatus() {
return
}
if cr.acnProgress == 100 && !cr.ACNUp {
// ACN is up...at this point we need to completely reset our state
// as there is no guarantee that the tor daemon shares our state anymore...
cr.ACNUp = true
cr.ACNUpTime = time.Now()
// reset all of the queues...
cr.connCount = 0
cr.priorityQueue = newConnectionQueue()
cr.pendingQueue = newConnectionQueue()
// Loop through connections. Reset state, and requeue...
cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact)
p.queued = true
// prioritize connections made recently...
log.Debugf("adding %v to queue", p.id)
if time.Since(p.lastSeen).Hours() < PriorityQueueTimeSinceQualifierHours {
cr.priorityQueue.insert(p)
} else {
cr.pendingQueue.insert(p)
}
return true
})
} else if cr.acnProgress != 100 {
cr.ACNUp = false
cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact)
p.failedCount = 0
p.queued = false
p.state = connections.DISCONNECTED
return true
})
} else if cr.acnProgress != 100 {
cr.ACNUp = false
}
}
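Note: the reset-and-requeue pass above splits contacts by recency: anyone seen within PriorityQueueTimeSinceQualifierHours goes to the priority queue, the rest to pending. A standalone sketch of that policy (the threshold value here is assumed; the real constant is defined elsewhere in the plugin):

```go
package main

import "time"

// Assumed threshold; the plugin's real constant lives elsewhere.
const priorityQueueTimeSinceQualifierHours = 24.0

// chooseQueue distills the requeue policy: contacts seen recently are
// retried first after an ACN reset, everyone else waits in pending.
func chooseQueue(lastSeen time.Time) string {
	if time.Since(lastSeen).Hours() < priorityQueueTimeSinceQualifierHours {
		return "priority"
	}
	return "pending"
}
```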
@@ -339,6 +368,7 @@ func (cr *contactRetry) requeueReady() {
}
func (cr *contactRetry) publishConnectionRequest(contact *contact) {
log.Debugf("RestartFlow Publish Connection Request listener %v", contact)
if contact.ctype == peerConn {
cr.bus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: contact.id}))
}

View File

@@ -193,6 +193,9 @@ func (f *Functionality) VerifyOrResumeDownload(profile peer.CwtchPeer, conversat
// Assert the filename...this is technically not necessary, but is here for completeness
manifest.FileName = downloadfilepath
if manifest.VerifyFile() == nil {
// Send a FileDownloaded Event. Usually when VerifyOrResumeDownload is triggered it's because some UI is awaiting
// the results of a download.
profile.PublishEvent(event.NewEvent(event.FileDownloaded, map[event.Field]string{event.FileKey: fileKey, event.FilePath: downloadfilepath, event.TempFile: downloadfilepath}))
// File is verified and there is nothing else to do...
return nil
} else {
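Note: re-publishing FileDownloaded on the already-verified path means a UI that kicked off VerifyOrResumeDownload gets an answer even when no bytes need to be fetched. A sketch of the waiting side, assuming an event.Manager `bus` and a `fileKey` already in scope:

```go
// Hypothetical UI-side wait that the re-published event unblocks: filter
// the shared FileDownloaded stream down to the one file we asked about.
queue := event.NewQueue()
bus.Subscribe(event.FileDownloaded, queue)
for {
	ev := queue.Next()
	if ev.Data[event.FileKey] == fileKey {
		log.Debugf("file verified at %v", ev.Data[event.FilePath])
		break
	}
}
```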

View File

@@ -102,6 +102,7 @@ func (i *ImagePreviewsFunctionality) OnContactReceiveValue(profile peer.CwtchPeer
if value, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey)); exists && value == event.True {
if _, err := os.Stat(fp); err == nil {
// file is marked as completely downloaded and exists...
// Note: this will also resend the FileDownloaded event if successful...
if fsf.VerifyOrResumeDownload(profile, conversation.ID, fileKey, constants.ImagePreviewMaxSizeInBytes) == nil {
return
}
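Note: this guard trusts a per-file `%s.complete` attribute plus an os.Stat check before re-verifying. For context, a sketch of the write that presumably sets that marker when a download finishes; the call site is an assumption, not part of this diff:

```go
// markDownloadComplete is a hypothetical helper mirroring the marker the
// guard above reads (SetScopedZonedAttribute per peer.CwtchPeer).
func markDownloadComplete(profile peer.CwtchPeer, fileKey string) {
	profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone,
		fmt.Sprintf("%s.complete", fileKey), event.True)
}
```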

View File

@@ -149,6 +149,7 @@ func (e *engine) EventManager() event.Manager {
// eventHandler processes events from other subsystems
func (e *engine) eventHandler() {
log.Debugf("restartFlow Launching ProtocolEngine listener")
for {
ev := e.queue.Next()
// optimistic shutdown...
@@ -159,6 +160,7 @@ func (e *engine) eventHandler() {
case event.StatusRequest:
e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID})
case event.PeerRequest:
log.Debugf("restartFlow Handling Peer Request")
if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) {
go e.peerWithOnion(ev.Data[event.RemotePeer])
}
@@ -333,6 +335,7 @@ func (e *engine) listenFn() {
func (e *engine) Shutdown() {
// don't accept any more events...
e.queue.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{}))
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{}))
e.service.Shutdown()
e.shuttingDown.Store(true)
e.ephemeralServicesLock.Lock()
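Note: Shutdown now publishes ProtocolEngineShutdown on the engine's external eventManager as well as its internal queue, so subscribers outside the engine also learn it is going away. A sketch of a listener that exits cleanly on that signal, assuming `manager` is the engine's event.Manager:

```go
// Hypothetical external listener: before this change only the engine's
// internal queue saw the shutdown; the eventManager broadcast now
// reaches subscribers like this one.
queue := event.NewQueue()
manager.Subscribe(event.ProtocolEngineShutdown, queue)
go func() {
	for {
		if queue.Next().EventType == event.ProtocolEngineShutdown {
			queue.Shutdown() // stop consuming; the engine is gone
			return
		}
	}
}()
```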
@@ -357,23 +360,25 @@ func (e *engine) peerWithOnion(onion string) {
log.Debugf("Called PeerWithOnion for %v", onion)
if !e.isBlocked(onion) {
e.ignoreOnShutdown(e.peerConnecting)(onion)
log.Debugf("PeerWithOnion Connecting %v", onion)
connected, err := e.service.Connect(onion, e.createPeerTemplate())
log.Debugf("PeerWithOnion %v %v", onion, err)
// If we are already connected...check if we are authed and issue an auth event
// (This allows the UI to be stateless)
if connected && err != nil {
conn, err := e.service.GetConnection(onion)
conn, err := e.service.WaitForCapabilityOrClose(onion, cwtchCapability)
if err == nil {
if conn.HasCapability(cwtchCapability) {
e.ignoreOnShutdown(e.peerAuthed)(onion)
return
}
}
}
// Only issue a disconnected error if we are disconnected (Connect will fail if a connection already exists)
if !connected && err != nil {
log.Debugf("PeerWithOnion something went very wrong...%v %v", onion, err)
e.ignoreOnShutdown(e.peerDisconnected)(onion)
} else {
log.Debugf("PeerWithOnion %v %v", onion, err)
e.ignoreOnShutdown(e.peerDisconnected)(onion)
}
}
}
}
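Note: the rework leans on the service.Connect contract: connected == true with a non-nil error means a connection already exists, in which case the engine now blocks in WaitForCapabilityOrClose instead of polling GetConnection for a capability that may not have been negotiated yet. A condensed sketch of the three outcomes, with `service` and `template` assumed in scope:

```go
// Sketch of the outcome handling above, not the literal implementation.
connected, err := service.Connect(onion, template)
switch {
case connected && err == nil:
	// fresh outbound connection; authentication proceeds normally
case connected && err != nil:
	// a connection already exists: wait for the cwtch capability (or the
	// close) via WaitForCapabilityOrClose instead of reporting a failure
case !connected && err != nil:
	// genuine failure; surface peerDisconnected so the UI can react
}
```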