Merge pull request 'Fix Various Bugs Associated with Profile Start Up / Restart' (#515) from startupbugs into master

Reviewed-on: #515
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
Sarah Jamie Lewis 2023-05-16 23:21:40 +00:00
commit cff2a8cafe
6 changed files with 51 additions and 12 deletions

View File

@@ -413,8 +413,8 @@ func (app *application) ActivatePeerEngine(onion string) {
    profile := app.GetPeer(onion)
    if profile != nil {
        if _, exists := app.engines[onion]; !exists {
+           log.Debugf("restartFlow: Creating a New Protocol Engine...")
            app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()], app.engineHooks)
            app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated))
            app.QueryACNStatus()
            if true {

View File

@@ -176,7 +176,10 @@ func (cr *contactRetry) run() {
    cr.bus.Subscribe(event.ProtocolEngineCreated, cr.queue)
    for {
-       if cr.ACNUp {
+       // Only attempt connection if both the ACN and the Protocol Engines are Online...
+       log.Debugf("restartFlow checking state")
+       if cr.ACNUp && cr.protocolEngine {
+           log.Debugf("restartFlow time to queue!!")
            cr.requeueReady()
            connectingCount := cr.connectingCount()
            log.Debugf("checking queues (priority len: %v) (pending len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.priorityQueue.queue), len(cr.pendingQueue.queue), cr.connCount, connectingCount)
@@ -292,15 +295,41 @@ func (cr *contactRetry) processStatus() {
        return
    }
    if cr.acnProgress == 100 && !cr.ACNUp {
+       // ACN is up...at this point we need to completely reset our state
+       // as there is no guarantee that the tor daemon shares our state anymore...
        cr.ACNUp = true
        cr.ACNUpTime = time.Now()
+       // reset all of the queues...
+       cr.connCount = 0
+       cr.priorityQueue = newConnectionQueue()
+       cr.pendingQueue = newConnectionQueue()
+       // Loop through connections. Reset state, and requeue...
+       cr.connections.Range(func(k, v interface{}) bool {
+           p := v.(*contact)
+           p.queued = true
+           // prioritize connections made recently...
+           log.Debugf("adding %v to queue", p.id)
+           if time.Since(p.lastSeen).Hours() < PriorityQueueTimeSinceQualifierHours {
+               cr.priorityQueue.insert(p)
+           } else {
+               cr.pendingQueue.insert(p)
+           }
+           return true
+       })
+   } else if cr.acnProgress != 100 {
+       cr.ACNUp = false
        cr.connections.Range(func(k, v interface{}) bool {
            p := v.(*contact)
            p.failedCount = 0
+           p.queued = false
+           p.state = connections.DISCONNECTED
            return true
        })
-   } else if cr.acnProgress != 100 {
-       cr.ACNUp = false
    }
}
@@ -339,6 +368,7 @@ func (cr *contactRetry) requeueReady() {
}

func (cr *contactRetry) publishConnectionRequest(contact *contact) {
+   log.Debugf("RestartFlow Publish Connection Request listener %v", contact)
    if contact.ctype == peerConn {
        cr.bus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: contact.id}))
    }

View File

@@ -18,7 +18,8 @@ func TestContactRetryQueue(t *testing.T) {
    log.SetLevel(log.LevelDebug)
    bus := event.NewEventManager()
    cr := NewConnectionRetry(bus, "").(*contactRetry)
    cr.ACNUp = true // fake an ACN connection...
+   cr.protocolEngine = true // fake protocol engine
    go cr.run()
    t.Logf("contact plugin up and running..sending peer connection...")

View File

@@ -193,6 +193,9 @@ func (f *Functionality) VerifyOrResumeDownload(profile peer.CwtchPeer, conversat
    // Assert the filename...this is technically not necessary, but is here for completeness
    manifest.FileName = downloadfilepath
    if manifest.VerifyFile() == nil {
+       // Send a FileDownloaded Event. Usually when VerifyOrResumeDownload is triggered it's because some UI is awaiting the results of a
+       // Download.
+       profile.PublishEvent(event.NewEvent(event.FileDownloaded, map[event.Field]string{event.FileKey: fileKey, event.FilePath: downloadfilepath, event.TempFile: downloadfilepath}))
        // File is verified and there is nothing else to do...
        return nil
    } else {

View File

@@ -102,6 +102,7 @@ func (i *ImagePreviewsFunctionality) OnContactReceiveValue(profile peer.CwtchPee
    if value, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey)); exists && value == event.True {
        if _, err := os.Stat(fp); err == nil {
            // file is marked as completed downloaded and exists...
+           // Note: this will also resend the FileDownloaded event if successful...
            if fsf.VerifyOrResumeDownload(profile, conversation.ID, fileKey, constants.ImagePreviewMaxSizeInBytes) == nil {
                return
            }

View File

@@ -149,6 +149,7 @@ func (e *engine) EventManager() event.Manager {
// eventHandler process events from other subsystems
func (e *engine) eventHandler() {
+   log.Debugf("restartFlow Launching ProtocolEngine listener")
    for {
        ev := e.queue.Next()
        // optimistic shutdown...
@@ -159,6 +160,7 @@ func (e *engine) eventHandler() {
        case event.StatusRequest:
            e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID})
        case event.PeerRequest:
+           log.Debugf("restartFlow Handling Peer Request")
            if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) {
                go e.peerWithOnion(ev.Data[event.RemotePeer])
            }
@@ -333,6 +335,7 @@ func (e *engine) listenFn() {
func (e *engine) Shutdown() {
    // don't accept any more events...
    e.queue.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{}))
+   e.eventManager.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{}))
    e.service.Shutdown()
    e.shuttingDown.Store(true)
    e.ephemeralServicesLock.Lock()
@@ -358,23 +361,24 @@ func (e *engine) peerWithOnion(onion string) {
    if !e.isBlocked(onion) {
        e.ignoreOnShutdown(e.peerConnecting)(onion)
        connected, err := e.service.Connect(onion, e.createPeerTemplate())
        // If we are already connected...check if we are authed and issue an auth event
        // (This allows the ui to be stateless)
        if connected && err != nil {
-           conn, err := e.service.GetConnection(onion)
+           conn, err := e.service.WaitForCapabilityOrClose(onion, cwtchCapability)
            if err == nil {
                if conn.HasCapability(cwtchCapability) {
                    e.ignoreOnShutdown(e.peerAuthed)(onion)
                    return
                }
+               log.Errorf("PeerWithOnion something went very wrong...%v %v", onion, err)
+               if conn != nil {
+                   conn.Close()
+               }
+               e.ignoreOnShutdown(e.peerDisconnected)(onion)
+           } else {
+               e.ignoreOnShutdown(e.peerDisconnected)(onion)
            }
        }
-       // Only issue a disconnected error if we are disconnected (Connect will fail if a connection already exists)
-       if !connected && err != nil {
-           e.ignoreOnShutdown(e.peerDisconnected)(onion)
-       }
    }
}