forked from cwtch.im/cwtch
Merge pull request 'Properly manage contact retries during mode switching' (#533) from stable-blockers into master
Reviewed-on: cwtch.im/cwtch#533 Reviewed-by: Dan Ballard <dan@openprivacy.ca>
This commit is contained in:
commit
44856003d6
|
@ -408,6 +408,12 @@ func (app *application) ConfigureConnections(onion string, listen bool, peers bo
|
||||||
if listen {
|
if listen {
|
||||||
profile.Listen()
|
profile.Listen()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if we are making a decision to ignore
|
||||||
|
if !peers || !servers {
|
||||||
|
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.PurgeRetries))
|
||||||
|
}
|
||||||
|
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ResumeRetries))
|
||||||
profile.StartConnections(peers, servers)
|
profile.StartConnections(peers, servers)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -481,10 +487,10 @@ func (app *application) eventHandler() {
|
||||||
autostart, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAutostart)
|
autostart, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAutostart)
|
||||||
appearOffline, appearOfflineExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAppearOffline)
|
appearOffline, appearOfflineExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAppearOffline)
|
||||||
if !exists || autostart == "true" {
|
if !exists || autostart == "true" {
|
||||||
app.ActivatePeerEngine(onion)
|
|
||||||
if appearOfflineExists && appearOffline == "true" {
|
if appearOfflineExists && appearOffline == "true" {
|
||||||
// don't configure any connections...
|
// don't configure any connections...
|
||||||
log.Infof("peer appearing offline, not launching listen threads or connecting jobs")
|
log.Infof("peer appearing offline, not launching listen threads or connecting jobs")
|
||||||
|
app.ConfigureConnections(onion, false, false, false)
|
||||||
} else {
|
} else {
|
||||||
app.ConfigureConnections(onion, true, true, true)
|
app.ConfigureConnections(onion, true, true, true)
|
||||||
}
|
}
|
||||||
|
|
|
@ -125,11 +125,12 @@ type contactRetry struct {
|
||||||
pendingQueue *connectionQueue
|
pendingQueue *connectionQueue
|
||||||
priorityQueue *connectionQueue
|
priorityQueue *connectionQueue
|
||||||
authorizedPeers sync.Map
|
authorizedPeers sync.Map
|
||||||
|
stallRetries bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing
|
// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing
|
||||||
func NewConnectionRetry(bus event.Manager, onion string) Plugin {
|
func NewConnectionRetry(bus event.Manager, onion string) Plugin {
|
||||||
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), authorizedPeers: sync.Map{}, connections: sync.Map{}, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue(), priorityQueue: newConnectionQueue()}
|
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), authorizedPeers: sync.Map{}, connections: sync.Map{}, stallRetries: true, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue(), priorityQueue: newConnectionQueue()}
|
||||||
return cr
|
return cr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -183,10 +184,12 @@ func (cr *contactRetry) run() {
|
||||||
cr.bus.Subscribe(event.ProtocolEngineCreated, cr.queue)
|
cr.bus.Subscribe(event.ProtocolEngineCreated, cr.queue)
|
||||||
cr.bus.Subscribe(event.DeleteContact, cr.queue)
|
cr.bus.Subscribe(event.DeleteContact, cr.queue)
|
||||||
cr.bus.Subscribe(event.UpdateConversationAuthorization, cr.queue)
|
cr.bus.Subscribe(event.UpdateConversationAuthorization, cr.queue)
|
||||||
|
cr.bus.Subscribe(event.PurgeRetries, cr.queue)
|
||||||
|
cr.bus.Subscribe(event.ResumeRetries, cr.queue)
|
||||||
for {
|
for {
|
||||||
// Only attempt connection if both the ACN and the Protocol Engines are Online...
|
// Only attempt connection if both the ACN and the Protocol Engines are Online...
|
||||||
log.Debugf("restartFlow checking state")
|
log.Debugf("restartFlow checking state")
|
||||||
if cr.ACNUp && cr.protocolEngine {
|
if cr.ACNUp && cr.protocolEngine && !cr.stallRetries {
|
||||||
log.Debugf("restartFlow time to queue!!")
|
log.Debugf("restartFlow time to queue!!")
|
||||||
cr.requeueReady()
|
cr.requeueReady()
|
||||||
connectingCount := cr.connectingCount()
|
connectingCount := cr.connectingCount()
|
||||||
|
@ -231,6 +234,20 @@ func (cr *contactRetry) run() {
|
||||||
select {
|
select {
|
||||||
case e := <-cr.queue.OutChan():
|
case e := <-cr.queue.OutChan():
|
||||||
switch e.EventType {
|
switch e.EventType {
|
||||||
|
case event.PurgeRetries:
|
||||||
|
// Purge All Authorized Peers
|
||||||
|
cr.authorizedPeers.Range(func(key interface{}, value interface{}) bool {
|
||||||
|
cr.authorizedPeers.Delete(key)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
// Purge All Connection States
|
||||||
|
cr.connections.Range(func(key interface{}, value interface{}) bool {
|
||||||
|
cr.connections.Delete(key)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
case event.ResumeRetries:
|
||||||
|
log.Infof("resuming retries...")
|
||||||
|
cr.stallRetries = false
|
||||||
case event.DisconnectPeerRequest:
|
case event.DisconnectPeerRequest:
|
||||||
peer := e.Data[event.RemotePeer]
|
peer := e.Data[event.RemotePeer]
|
||||||
cr.authorizedPeers.Delete(peer)
|
cr.authorizedPeers.Delete(peer)
|
||||||
|
@ -299,7 +316,7 @@ func (cr *contactRetry) run() {
|
||||||
case event.ProtocolEngineShutdown:
|
case event.ProtocolEngineShutdown:
|
||||||
cr.ACNUp = false
|
cr.ACNUp = false
|
||||||
cr.protocolEngine = false
|
cr.protocolEngine = false
|
||||||
|
cr.stallRetries = true
|
||||||
cr.connections.Range(func(k, v interface{}) bool {
|
cr.connections.Range(func(k, v interface{}) bool {
|
||||||
p := v.(*contact)
|
p := v.(*contact)
|
||||||
if p.state == connections.AUTHENTICATED || p.state == connections.SYNCED {
|
if p.state == connections.AUTHENTICATED || p.state == connections.SYNCED {
|
||||||
|
|
|
@ -21,6 +21,7 @@ func TestContactRetryQueue(t *testing.T) {
|
||||||
cr := NewConnectionRetry(bus, "").(*contactRetry)
|
cr := NewConnectionRetry(bus, "").(*contactRetry)
|
||||||
cr.ACNUp = true // fake an ACN connection...
|
cr.ACNUp = true // fake an ACN connection...
|
||||||
cr.protocolEngine = true // fake protocol engine
|
cr.protocolEngine = true // fake protocol engine
|
||||||
|
cr.stallRetries = false // fake not being in offline mode...
|
||||||
go cr.run()
|
go cr.run()
|
||||||
|
|
||||||
testOnion := "2wgvbza2mbuc72a4u6r6k4hc2blcvrmk4q26bfvlwbqxv2yq5k52fcqd"
|
testOnion := "2wgvbza2mbuc72a4u6r6k4hc2blcvrmk4q26bfvlwbqxv2yq5k52fcqd"
|
||||||
|
|
|
@ -30,6 +30,10 @@ const (
|
||||||
DisconnectPeerRequest = Type("DisconnectPeerRequest")
|
DisconnectPeerRequest = Type("DisconnectPeerRequest")
|
||||||
DisconnectServerRequest = Type("DisconnectServerRequest")
|
DisconnectServerRequest = Type("DisconnectServerRequest")
|
||||||
|
|
||||||
|
// Events to Manage Retry Contacts
|
||||||
|
PurgeRetries = Type("PurgeRetries")
|
||||||
|
ResumeRetries = Type("ResumeRetries")
|
||||||
|
|
||||||
// RetryServerRequest
|
// RetryServerRequest
|
||||||
// Asks CwtchPeer to retry a server connection...
|
// Asks CwtchPeer to retry a server connection...
|
||||||
// GroupServer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"
|
// GroupServer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"
|
||||||
|
|
|
@ -210,7 +210,7 @@ func (f *Functionality) VerifyOrResumeDownload(profile peer.CwtchPeer, conversat
|
||||||
return errors.New("file download metadata does not exist, or is corrupted")
|
return errors.New("file download metadata does not exist, or is corrupted")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *Functionality) CheckDownloadStatus(profile peer.CwtchPeer, fileKey string) {
|
func (f *Functionality) CheckDownloadStatus(profile peer.CwtchPeer, fileKey string) error {
|
||||||
path, _ := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.path", fileKey))
|
path, _ := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.path", fileKey))
|
||||||
if value, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey)); exists && value == event.True {
|
if value, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey)); exists && value == event.True {
|
||||||
profile.PublishEvent(event.NewEvent(event.FileDownloaded, map[event.Field]string{
|
profile.PublishEvent(event.NewEvent(event.FileDownloaded, map[event.Field]string{
|
||||||
|
@ -229,6 +229,7 @@ func (f *Functionality) CheckDownloadStatus(profile peer.CwtchPeer, fileKey stri
|
||||||
event.FilePath: path,
|
event.FilePath: path,
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
return nil // cannot fail
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *Functionality) EnhancedShareFile(profile peer.CwtchPeer, conversationID int, sharefilepath string) string {
|
func (f *Functionality) EnhancedShareFile(profile peer.CwtchPeer, conversationID int, sharefilepath string) string {
|
||||||
|
@ -309,6 +310,9 @@ func (f *Functionality) startFileShare(profile peer.CwtchPeer, filekey string, m
|
||||||
|
|
||||||
// set the filekey status to active
|
// set the filekey status to active
|
||||||
profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", filekey), constants.True)
|
profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", filekey), constants.True)
|
||||||
|
// reset the timestamp...
|
||||||
|
profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.ts", filekey), strconv.FormatInt(time.Now().Unix(), 10))
|
||||||
|
// share the manifest
|
||||||
profile.PublishEvent(event.NewEvent(event.ShareManifest, map[event.Field]string{event.FileKey: filekey, event.SerializedManifest: manifest}))
|
profile.PublishEvent(event.NewEvent(event.ShareManifest, map[event.Field]string{event.FileKey: filekey, event.SerializedManifest: manifest}))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -566,11 +570,12 @@ func GenerateDownloadPath(basePath, fileName string, overwrite bool) (filePath,
|
||||||
}
|
}
|
||||||
|
|
||||||
// StopFileShare sends a message to the ProtocolEngine to cease sharing a particular file
|
// StopFileShare sends a message to the ProtocolEngine to cease sharing a particular file
|
||||||
func (f *Functionality) StopFileShare(profile peer.CwtchPeer, fileKey string) {
|
func (f *Functionality) StopFileShare(profile peer.CwtchPeer, fileKey string) error {
|
||||||
// Note we do not do a permissions check here, as we are *always* permitted to stop sharing files.
|
// Note we do not do a permissions check here, as we are *always* permitted to stop sharing files.
|
||||||
// set the filekey status to inactive
|
// set the filekey status to inactive
|
||||||
profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", fileKey), constants.False)
|
profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", fileKey), constants.False)
|
||||||
profile.PublishEvent(event.NewEvent(event.StopFileShare, map[event.Field]string{event.FileKey: fileKey}))
|
profile.PublishEvent(event.NewEvent(event.StopFileShare, map[event.Field]string{event.FileKey: fileKey}))
|
||||||
|
return nil // cannot fail
|
||||||
}
|
}
|
||||||
|
|
||||||
// StopAllFileShares sends a message to the ProtocolEngine to cease sharing all files
|
// StopAllFileShares sends a message to the ProtocolEngine to cease sharing all files
|
||||||
|
|
|
@ -167,7 +167,7 @@ func TestFileSharing(t *testing.T) {
|
||||||
|
|
||||||
if _, err := os.Stat(path.Join(settings.DownloadPath, "cwtch.png")); errors.Is(err, os.ErrNotExist) {
|
if _, err := os.Stat(path.Join(settings.DownloadPath, "cwtch.png")); errors.Is(err, os.ErrNotExist) {
|
||||||
// path/to/whatever does not exist
|
// path/to/whatever does not exist
|
||||||
t.Fatalf("cwthc.png should have been automatically downloadeded...")
|
t.Fatalf("cwtch.png should have been automatically downloaded...")
|
||||||
}
|
}
|
||||||
|
|
||||||
app.Shutdown()
|
app.Shutdown()
|
||||||
|
|
Loading…
Reference in New Issue