Fix Various Bugs Associated with Profile Start Up / Restart #515
|
@ -413,8 +413,8 @@ func (app *application) ActivatePeerEngine(onion string) {
|
|||
profile := app.GetPeer(onion)
|
||||
if profile != nil {
|
||||
if _, exists := app.engines[onion]; !exists {
|
||||
log.Debugf("restartFlow: Creating a New Protocol Engine...")
|
||||
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()], app.engineHooks)
|
||||
|
||||
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated))
|
||||
app.QueryACNStatus()
|
||||
if true {
|
||||
|
|
|
@ -176,7 +176,10 @@ func (cr *contactRetry) run() {
|
|||
cr.bus.Subscribe(event.ProtocolEngineCreated, cr.queue)
|
||||
|
||||
for {
|
||||
if cr.ACNUp {
|
||||
// Only attempt connection if both the ACN and the Protocol Engines are Online...
|
||||
log.Debugf("restartFlow checking state")
|
||||
if cr.ACNUp && cr.protocolEngine {
|
||||
log.Debugf("restartFlow time to queue!!")
|
||||
cr.requeueReady()
|
||||
connectingCount := cr.connectingCount()
|
||||
log.Debugf("checking queues (priority len: %v) (pending len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.priorityQueue.queue), len(cr.pendingQueue.queue), cr.connCount, connectingCount)
|
||||
|
@ -292,15 +295,41 @@ func (cr *contactRetry) processStatus() {
|
|||
return
|
||||
}
|
||||
if cr.acnProgress == 100 && !cr.ACNUp {
|
||||
// ACN is up...at this point we need to completely reset our state
|
||||
// as there is no guarantee that the tor daemon shares our state anymore...
|
||||
cr.ACNUp = true
|
||||
cr.ACNUpTime = time.Now()
|
||||
|
||||
// reset all of the queues...
|
||||
cr.connCount = 0
|
||||
cr.priorityQueue = newConnectionQueue()
|
||||
cr.pendingQueue = newConnectionQueue()
|
||||
|
||||
// Loop through connections. Reset state, and requeue...
|
||||
cr.connections.Range(func(k, v interface{}) bool {
|
||||
p := v.(*contact)
|
||||
p.queued = true
|
||||
|
||||
// prioritize connections made recently...
|
||||
log.Debugf("adding %v to queue", p.id)
|
||||
if time.Since(p.lastSeen).Hours() < PriorityQueueTimeSinceQualifierHours {
|
||||
cr.priorityQueue.insert(p)
|
||||
} else {
|
||||
cr.pendingQueue.insert(p)
|
||||
}
|
||||
|
||||
return true
|
||||
})
|
||||
|
||||
} else if cr.acnProgress != 100 {
|
||||
cr.ACNUp = false
|
||||
cr.connections.Range(func(k, v interface{}) bool {
|
||||
p := v.(*contact)
|
||||
p.failedCount = 0
|
||||
p.queued = false
|
||||
p.state = connections.DISCONNECTED
|
||||
return true
|
||||
})
|
||||
} else if cr.acnProgress != 100 {
|
||||
cr.ACNUp = false
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -339,6 +368,7 @@ func (cr *contactRetry) requeueReady() {
|
|||
}
|
||||
|
||||
func (cr *contactRetry) publishConnectionRequest(contact *contact) {
|
||||
log.Debugf("RestartFlow Publish Connection Request listener %v", contact)
|
||||
if contact.ctype == peerConn {
|
||||
cr.bus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: contact.id}))
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ func TestContactRetryQueue(t *testing.T) {
|
|||
bus := event.NewEventManager()
|
||||
cr := NewConnectionRetry(bus, "").(*contactRetry)
|
||||
cr.ACNUp = true // fake an ACN connection...
|
||||
cr.protocolEngine = true // fake protocol engine
|
||||
go cr.run()
|
||||
|
||||
t.Logf("contact plugin up and running..sending peer connection...")
|
||||
|
|
|
@ -193,6 +193,9 @@ func (f *Functionality) VerifyOrResumeDownload(profile peer.CwtchPeer, conversat
|
|||
// Assert the filename...this is technically not necessary, but is here for completeness
|
||||
manifest.FileName = downloadfilepath
|
||||
if manifest.VerifyFile() == nil {
|
||||
// Send a FileDownloaded Event. Usually when VerifyOrResumeDownload is triggered it's because some UI is awaiting the results of a
|
||||
// Download.
|
||||
profile.PublishEvent(event.NewEvent(event.FileDownloaded, map[event.Field]string{event.FileKey: fileKey, event.FilePath: downloadfilepath, event.TempFile: downloadfilepath}))
|
||||
// File is verified and there is nothing else to do...
|
||||
return nil
|
||||
} else {
|
||||
|
|
|
@ -102,6 +102,7 @@ func (i *ImagePreviewsFunctionality) OnContactReceiveValue(profile peer.CwtchPee
|
|||
if value, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey)); exists && value == event.True {
|
||||
if _, err := os.Stat(fp); err == nil {
|
||||
// file is marked as completed downloaded and exists...
|
||||
// Note: this will also resend the FileDownloaded event if successful...
|
||||
if fsf.VerifyOrResumeDownload(profile, conversation.ID, fileKey, constants.ImagePreviewMaxSizeInBytes) == nil {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -149,6 +149,7 @@ func (e *engine) EventManager() event.Manager {
|
|||
|
||||
// eventHandler process events from other subsystems
|
||||
func (e *engine) eventHandler() {
|
||||
log.Debugf("restartFlow Launching ProtocolEngine listener")
|
||||
for {
|
||||
ev := e.queue.Next()
|
||||
// optimistic shutdown...
|
||||
|
@ -159,6 +160,7 @@ func (e *engine) eventHandler() {
|
|||
case event.StatusRequest:
|
||||
e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID})
|
||||
case event.PeerRequest:
|
||||
log.Debugf("restartFlow Handling Peer Request")
|
||||
if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) {
|
||||
go e.peerWithOnion(ev.Data[event.RemotePeer])
|
||||
}
|
||||
|
@ -333,6 +335,7 @@ func (e *engine) listenFn() {
|
|||
func (e *engine) Shutdown() {
|
||||
// don't accept any more events...
|
||||
e.queue.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{}))
|
||||
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{}))
|
||||
e.service.Shutdown()
|
||||
e.shuttingDown.Store(true)
|
||||
e.ephemeralServicesLock.Lock()
|
||||
|
@ -358,22 +361,23 @@ func (e *engine) peerWithOnion(onion string) {
|
|||
if !e.isBlocked(onion) {
|
||||
e.ignoreOnShutdown(e.peerConnecting)(onion)
|
||||
connected, err := e.service.Connect(onion, e.createPeerTemplate())
|
||||
|
||||
// If we are already connected...check if we are authed and issue an auth event
|
||||
// (This allows the ui to be stateless)
|
||||
if connected && err != nil {
|
||||
conn, err := e.service.GetConnection(onion)
|
||||
conn, err := e.service.WaitForCapabilityOrClose(onion, cwtchCapability)
|
||||
if err == nil {
|
||||
if conn.HasCapability(cwtchCapability) {
|
||||
e.ignoreOnShutdown(e.peerAuthed)(onion)
|
||||
return
|
||||
}
|
||||
log.Errorf("PeerWithOnion something went very wrong...%v %v", onion, err)
|
||||
if conn != nil {
|
||||
conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Only issue a disconnected error if we are disconnected (Connect will fail if a connection already exists)
|
||||
if !connected && err != nil {
|
||||
e.ignoreOnShutdown(e.peerDisconnected)(onion)
|
||||
} else {
|
||||
e.ignoreOnShutdown(e.peerDisconnected)(onion)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue