diff --git a/app/app.go b/app/app.go index 2a2e43d..db17edb 100644 --- a/app/app.go +++ b/app/app.go @@ -408,6 +408,12 @@ func (app *application) ConfigureConnections(onion string, listen bool, peers bo if listen { profile.Listen() } + + // if we are making a decision to ignore + if !peers || !servers { + app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.PurgeRetries)) + } + app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ResumeRetries)) profile.StartConnections(peers, servers) } } @@ -481,10 +487,10 @@ func (app *application) eventHandler() { autostart, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAutostart) appearOffline, appearOfflineExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAppearOffline) if !exists || autostart == "true" { - app.ActivatePeerEngine(onion) if appearOfflineExists && appearOffline == "true" { // don't configure any connections... log.Infof("peer appearing offline, not launching listen threads or connecting jobs") + app.ConfigureConnections(onion, false, false, false) } else { app.ConfigureConnections(onion, true, true, true) } diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index 5379c8c..1cb0cab 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -125,11 +125,12 @@ type contactRetry struct { pendingQueue *connectionQueue priorityQueue *connectionQueue authorizedPeers sync.Map + stallRetries bool } // NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing func NewConnectionRetry(bus event.Manager, onion string) Plugin { - cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), authorizedPeers: sync.Map{}, connections: sync.Map{}, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue(), priorityQueue: newConnectionQueue()} + cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), authorizedPeers: sync.Map{}, connections: sync.Map{}, stallRetries: true, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue(), priorityQueue: newConnectionQueue()} return cr } @@ -183,10 +184,12 @@ func (cr *contactRetry) run() { cr.bus.Subscribe(event.ProtocolEngineCreated, cr.queue) cr.bus.Subscribe(event.DeleteContact, cr.queue) cr.bus.Subscribe(event.UpdateConversationAuthorization, cr.queue) + cr.bus.Subscribe(event.PurgeRetries, cr.queue) + cr.bus.Subscribe(event.ResumeRetries, cr.queue) for { // Only attempt connection if both the ACN and the Protocol Engines are Online... log.Debugf("restartFlow checking state") - if cr.ACNUp && cr.protocolEngine { + if cr.ACNUp && cr.protocolEngine && !cr.stallRetries { log.Debugf("restartFlow time to queue!!") cr.requeueReady() connectingCount := cr.connectingCount() @@ -231,6 +234,20 @@ func (cr *contactRetry) run() { select { case e := <-cr.queue.OutChan(): switch e.EventType { + case event.PurgeRetries: + // Purge All Authorized Peers + cr.authorizedPeers.Range(func(key interface{}, value interface{}) bool { + cr.authorizedPeers.Delete(key) + return true + }) + // Purge All Connection States + cr.connections.Range(func(key interface{}, value interface{}) bool { + cr.connections.Delete(key) + return true + }) + case event.ResumeRetries: + log.Infof("resuming retries...") + cr.stallRetries = false case event.DisconnectPeerRequest: peer := e.Data[event.RemotePeer] cr.authorizedPeers.Delete(peer) @@ -299,7 +316,7 @@ func (cr *contactRetry) run() { case event.ProtocolEngineShutdown: cr.ACNUp = false cr.protocolEngine = false - + cr.stallRetries = true cr.connections.Range(func(k, v interface{}) bool { p := v.(*contact) if p.state == connections.AUTHENTICATED || p.state == connections.SYNCED { diff --git a/app/plugins/contactRetry_test.go b/app/plugins/contactRetry_test.go index af5e19c..946ced8 100644 --- a/app/plugins/contactRetry_test.go +++ b/app/plugins/contactRetry_test.go @@ -21,6 +21,7 @@ func TestContactRetryQueue(t *testing.T) { cr := NewConnectionRetry(bus, "").(*contactRetry) cr.ACNUp = true // fake an ACN connection... cr.protocolEngine = true // fake protocol engine + cr.stallRetries = false // fake not being in offline mode... go cr.run() testOnion := "2wgvbza2mbuc72a4u6r6k4hc2blcvrmk4q26bfvlwbqxv2yq5k52fcqd" diff --git a/event/common.go b/event/common.go index 3b9b0bd..c35f6da 100644 --- a/event/common.go +++ b/event/common.go @@ -30,6 +30,10 @@ const ( DisconnectPeerRequest = Type("DisconnectPeerRequest") DisconnectServerRequest = Type("DisconnectServerRequest") + // Events to Manage Retry Contacts + PurgeRetries = Type("PurgeRetries") + ResumeRetries = Type("ResumeRetries") + // RetryServerRequest // Asks CwtchPeer to retry a server connection... // GroupServer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd" diff --git a/functionality/filesharing/filesharing_functionality.go b/functionality/filesharing/filesharing_functionality.go index 7c29d25..e3b7f42 100644 --- a/functionality/filesharing/filesharing_functionality.go +++ b/functionality/filesharing/filesharing_functionality.go @@ -210,7 +210,7 @@ func (f *Functionality) VerifyOrResumeDownload(profile peer.CwtchPeer, conversat return errors.New("file download metadata does not exist, or is corrupted") } -func (f *Functionality) CheckDownloadStatus(profile peer.CwtchPeer, fileKey string) { +func (f *Functionality) CheckDownloadStatus(profile peer.CwtchPeer, fileKey string) error { path, _ := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.path", fileKey)) if value, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey)); exists && value == event.True { profile.PublishEvent(event.NewEvent(event.FileDownloaded, map[event.Field]string{ @@ -229,6 +229,7 @@ func (f *Functionality) CheckDownloadStatus(profile peer.CwtchPeer, fileKey stri event.FilePath: path, })) } + return nil // cannot fail } func (f *Functionality) EnhancedShareFile(profile peer.CwtchPeer, conversationID int, sharefilepath string) string { @@ -309,6 +310,9 @@ func (f *Functionality) startFileShare(profile peer.CwtchPeer, filekey string, m // set the filekey status to active profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", filekey), constants.True) + // reset the timestamp... + profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.ts", filekey), strconv.FormatInt(time.Now().Unix(), 10)) + // share the manifest profile.PublishEvent(event.NewEvent(event.ShareManifest, map[event.Field]string{event.FileKey: filekey, event.SerializedManifest: manifest})) return nil } @@ -566,11 +570,12 @@ func GenerateDownloadPath(basePath, fileName string, overwrite bool) (filePath, } // StopFileShare sends a message to the ProtocolEngine to cease sharing a particular file -func (f *Functionality) StopFileShare(profile peer.CwtchPeer, fileKey string) { +func (f *Functionality) StopFileShare(profile peer.CwtchPeer, fileKey string) error { // Note we do not do a permissions check here, as we are *always* permitted to stop sharing files. // set the filekey status to inactive profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", fileKey), constants.False) profile.PublishEvent(event.NewEvent(event.StopFileShare, map[event.Field]string{event.FileKey: fileKey})) + return nil // cannot fail } // StopAllFileShares sends a message to the ProtocolEngine to cease sharing all files diff --git a/testing/autodownload/file_sharing_integration_test.go b/testing/autodownload/file_sharing_integration_test.go index 7d39bc8..38afeca 100644 --- a/testing/autodownload/file_sharing_integration_test.go +++ b/testing/autodownload/file_sharing_integration_test.go @@ -167,7 +167,7 @@ func TestFileSharing(t *testing.T) { if _, err := os.Stat(path.Join(settings.DownloadPath, "cwtch.png")); errors.Is(err, os.ErrNotExist) { // path/to/whatever does not exist - t.Fatalf("cwthc.png should have been automatically downloadeded...") + t.Fatalf("cwtch.png should have been automatically downloaded...") } app.Shutdown()