Properly manage contact retries during mode switching
continuous-integration/drone/pr Build is passing Details

Fixes a small file shareing management issue where a file was being marked as inactive because the timestamp wasn't updated.
This commit is contained in:
Sarah Jamie Lewis 2023-09-19 12:21:53 -07:00
parent 13583f3e8c
commit f16eeb1922
6 changed files with 40 additions and 7 deletions

View File

@ -408,6 +408,12 @@ func (app *application) ConfigureConnections(onion string, listen bool, peers bo
if listen {
profile.Listen()
}
// if we are making a decision to ignore
if !peers || !servers {
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.PurgeRetries))
}
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ResumeRetries))
profile.StartConnections(peers, servers)
}
}
@ -481,10 +487,10 @@ func (app *application) eventHandler() {
autostart, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAutostart)
appearOffline, appearOfflineExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAppearOffline)
if !exists || autostart == "true" {
app.ActivatePeerEngine(onion)
if appearOfflineExists && appearOffline == "true" {
// don't configure any connections...
log.Infof("peer appearing offline, not launching listen threads or connecting jobs")
app.ConfigureConnections(onion, false, false, false)
} else {
app.ConfigureConnections(onion, true, true, true)
}

View File

@ -125,11 +125,12 @@ type contactRetry struct {
pendingQueue *connectionQueue
priorityQueue *connectionQueue
authorizedPeers sync.Map
stallRetries bool
}
// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing
func NewConnectionRetry(bus event.Manager, onion string) Plugin {
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), authorizedPeers: sync.Map{}, connections: sync.Map{}, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue(), priorityQueue: newConnectionQueue()}
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), authorizedPeers: sync.Map{}, connections: sync.Map{}, stallRetries: true, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue(), priorityQueue: newConnectionQueue()}
return cr
}
@ -183,10 +184,12 @@ func (cr *contactRetry) run() {
cr.bus.Subscribe(event.ProtocolEngineCreated, cr.queue)
cr.bus.Subscribe(event.DeleteContact, cr.queue)
cr.bus.Subscribe(event.UpdateConversationAuthorization, cr.queue)
cr.bus.Subscribe(event.PurgeRetries, cr.queue)
cr.bus.Subscribe(event.ResumeRetries, cr.queue)
for {
// Only attempt connection if both the ACN and the Protocol Engines are Online...
log.Debugf("restartFlow checking state")
if cr.ACNUp && cr.protocolEngine {
if cr.ACNUp && cr.protocolEngine && !cr.stallRetries {
log.Debugf("restartFlow time to queue!!")
cr.requeueReady()
connectingCount := cr.connectingCount()
@ -231,6 +234,20 @@ func (cr *contactRetry) run() {
select {
case e := <-cr.queue.OutChan():
switch e.EventType {
case event.PurgeRetries:
// Purge All Authorized Peers
cr.authorizedPeers.Range(func(key interface{}, value interface{}) bool {
cr.authorizedPeers.Delete(key)
return true
})
// Purge All Connection States
cr.connections.Range(func(key interface{}, value interface{}) bool {
cr.connections.Delete(key)
return true
})
case event.ResumeRetries:
log.Infof("resuming retries...")
cr.stallRetries = false
case event.DisconnectPeerRequest:
peer := e.Data[event.RemotePeer]
cr.authorizedPeers.Delete(peer)
@ -299,7 +316,7 @@ func (cr *contactRetry) run() {
case event.ProtocolEngineShutdown:
cr.ACNUp = false
cr.protocolEngine = false
cr.stallRetries = true
cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact)
if p.state == connections.AUTHENTICATED || p.state == connections.SYNCED {

View File

@ -21,6 +21,7 @@ func TestContactRetryQueue(t *testing.T) {
cr := NewConnectionRetry(bus, "").(*contactRetry)
cr.ACNUp = true // fake an ACN connection...
cr.protocolEngine = true // fake protocol engine
cr.stallRetries = false // fake not being in offline mode...
go cr.run()
testOnion := "2wgvbza2mbuc72a4u6r6k4hc2blcvrmk4q26bfvlwbqxv2yq5k52fcqd"

View File

@ -30,6 +30,10 @@ const (
DisconnectPeerRequest = Type("DisconnectPeerRequest")
DisconnectServerRequest = Type("DisconnectServerRequest")
// Events to Manage Retry Contacts
PurgeRetries = Type("PurgeRetries")
ResumeRetries = Type("ResumeRetries")
// RetryServerRequest
// Asks CwtchPeer to retry a server connection...
// GroupServer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"

View File

@ -210,7 +210,7 @@ func (f *Functionality) VerifyOrResumeDownload(profile peer.CwtchPeer, conversat
return errors.New("file download metadata does not exist, or is corrupted")
}
func (f *Functionality) CheckDownloadStatus(profile peer.CwtchPeer, fileKey string) {
func (f *Functionality) CheckDownloadStatus(profile peer.CwtchPeer, fileKey string) error {
path, _ := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.path", fileKey))
if value, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey)); exists && value == event.True {
profile.PublishEvent(event.NewEvent(event.FileDownloaded, map[event.Field]string{
@ -229,6 +229,7 @@ func (f *Functionality) CheckDownloadStatus(profile peer.CwtchPeer, fileKey stri
event.FilePath: path,
}))
}
return nil // cannot fail
}
func (f *Functionality) EnhancedShareFile(profile peer.CwtchPeer, conversationID int, sharefilepath string) string {
@ -309,6 +310,9 @@ func (f *Functionality) startFileShare(profile peer.CwtchPeer, filekey string, m
// set the filekey status to active
profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", filekey), constants.True)
// reset the timestamp...
profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.ts", filekey), strconv.FormatInt(time.Now().Unix(), 10))
// share the manifest
profile.PublishEvent(event.NewEvent(event.ShareManifest, map[event.Field]string{event.FileKey: filekey, event.SerializedManifest: manifest}))
return nil
}
@ -566,11 +570,12 @@ func GenerateDownloadPath(basePath, fileName string, overwrite bool) (filePath,
}
// StopFileShare sends a message to the ProtocolEngine to cease sharing a particular file
func (f *Functionality) StopFileShare(profile peer.CwtchPeer, fileKey string) {
func (f *Functionality) StopFileShare(profile peer.CwtchPeer, fileKey string) error {
// Note we do not do a permissions check here, as we are *always* permitted to stop sharing files.
// set the filekey status to inactive
profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", fileKey), constants.False)
profile.PublishEvent(event.NewEvent(event.StopFileShare, map[event.Field]string{event.FileKey: fileKey}))
return nil // cannot fail
}
// StopAllFileShares sends a message to the ProtocolEngine to cease sharing all files

View File

@ -167,7 +167,7 @@ func TestFileSharing(t *testing.T) {
if _, err := os.Stat(path.Join(settings.DownloadPath, "cwtch.png")); errors.Is(err, os.ErrNotExist) {
// path/to/whatever does not exist
t.Fatalf("cwthc.png should have been automatically downloadeded...")
t.Fatalf("cwtch.png should have been automatically downloaded...")
}
app.Shutdown()