Merge pull request 'Allow force restarting of file shares regardless of timestamp.' (#530) from stable-blockers into master

Reviewed-on: #530
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
Sarah Jamie Lewis 2023-08-31 18:51:40 +00:00
commit 7464e3922d
6 changed files with 78 additions and 36 deletions


@@ -3,6 +3,7 @@ package plugins
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol/connections"
+ "git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
"math"
"strconv"
@@ -120,14 +121,15 @@ type contactRetry struct {
lastCheck time.Time
acnProgress int
connections sync.Map //[string]*contact
pendingQueue *connectionQueue
priorityQueue *connectionQueue
+ authorizedPeers sync.Map
}
// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing
func NewConnectionRetry(bus event.Manager, onion string) Plugin {
- cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue(), priorityQueue: newConnectionQueue()}
+ cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), authorizedPeers: sync.Map{}, connections: sync.Map{}, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue(), priorityQueue: newConnectionQueue()}
return cr
}
@@ -177,7 +179,8 @@ func (cr *contactRetry) run() {
cr.bus.Subscribe(event.QueueJoinServer, cr.queue)
cr.bus.Subscribe(event.ProtocolEngineShutdown, cr.queue)
cr.bus.Subscribe(event.ProtocolEngineCreated, cr.queue)
cr.bus.Subscribe(event.DeleteContact, cr.queue)
+ cr.bus.Subscribe(event.UpdateConversationAuthorization, cr.queue)
for {
// Only attempt connection if both the ACN and the Protocol Engines are Online...
log.Debugf("restartFlow checking state")
@@ -226,16 +229,33 @@ func (cr *contactRetry) run() {
select {
case e := <-cr.queue.OutChan():
switch e.EventType {
+ case event.DeleteContact:
+ // this case covers both servers and peers (servers are peers, and go through the
+ // same delete conversation flow)
+ peer := e.Data[event.RemotePeer]
+ cr.authorizedPeers.Delete(peer)
+ case event.UpdateConversationAuthorization:
+ // if we update the conversation authorization then we need to check if
+ // we need to remove blocked conversations from the regular flow.
+ peer := e.Data[event.RemotePeer]
+ blocked := e.Data[event.Blocked]
+ if blocked == "true" {
+ cr.authorizedPeers.Delete(peer)
+ }
case event.PeerStateChange:
state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]]
peer := e.Data[event.RemotePeer]
- cr.handleEvent(peer, state, peerConn)
+ // only handle state change events from pre-authorized peers
+ if _, exists := cr.authorizedPeers.Load(peer); exists {
+ cr.handleEvent(peer, state, peerConn)
+ }
case event.ServerStateChange:
state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]]
server := e.Data[event.GroupServer]
- cr.handleEvent(server, state, serverConn)
+ // only handle state change events from pre-authorized servers
+ if _, exists := cr.authorizedPeers.Load(server); exists {
+ cr.handleEvent(server, state, serverConn)
+ }
case event.QueueJoinServer:
fallthrough
case event.QueuePeerRequest:
@@ -252,7 +272,9 @@ func (cr *contactRetry) run() {
id = server
cr.addConnection(server, connections.DISCONNECTED, serverConn, lastSeen)
}
+ // this was an authorized event, and so we store this peer.
+ log.Debugf("authorizing id: %v", id)
+ cr.authorizedPeers.Store(id, true)
if c, ok := cr.connections.Load(id); ok {
contact := c.(*contact)
if contact.state == connections.DISCONNECTED && !contact.queued {
@@ -423,6 +445,12 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState
return
}
+ // reject events that contain invalid hostnames...we cannot connect to them
+ // and they could result in spurious connection attempts...
+ if !tor.IsValidHostname(id) {
+ return
+ }
if _, exists := cr.connections.Load(id); !exists {
// We have an event for something we don't know about...
// The only reason this should happen is if a *new* Peer/Server connection has changed.
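The net effect in this file: the retry plugin now keeps an explicit allow-list (authorizedPeers) that is populated only by explicit QueuePeerRequest/QueueJoinServer events, pruned on deletion or blocking, and consulted before any state-change event is acted on, with invalid hostnames rejected outright. A minimal, self-contained sketch of that gating pattern using the same sync.Map approach (authGate and its method names are illustrative, not Cwtch APIs):

package examples

import (
	"fmt"
	"sync"
)

// authGate mimics the allow-list the retry plugin now keeps.
type authGate struct {
	authorizedPeers sync.Map // set of peer/server ids allowed into the handler
}

// Authorize records an id after an explicit queue request
// (the analogue of the QueuePeerRequest/QueueJoinServer handling above).
func (g *authGate) Authorize(id string) { g.authorizedPeers.Store(id, true) }

// Revoke removes an id on deletion or blocking (the analogue of the
// DeleteContact/UpdateConversationAuthorization handling above).
func (g *authGate) Revoke(id string) { g.authorizedPeers.Delete(id) }

// HandleStateChange drops events for ids that were never authorized,
// exactly as the retry plugin now does for peers and servers.
func (g *authGate) HandleStateChange(id string, state string) {
	if _, exists := g.authorizedPeers.Load(id); !exists {
		return // never authorized, or since deleted/blocked: drop the event
	}
	fmt.Printf("handling %s for %s\n", state, id)
}

Using sync.Map keeps the gate safe to consult from the event loop while other goroutines authorize or revoke peers concurrently.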


@@ -23,32 +23,38 @@ func TestContactRetryQueue(t *testing.T) {
cr.protocolEngine = true // fake protocol engine
go cr.run()
+ testOnion := "2wgvbza2mbuc72a4u6r6k4hc2blcvrmk4q26bfvlwbqxv2yq5k52fcqd"
t.Logf("contact plugin up and running..sending peer connection...")
// Assert that there is a peer connection identified as "test"
- bus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: "test", event.LastSeen: "test"}))
+ bus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: testOnion, event.LastSeen: "test"}))
// Wait until the test actually exists, and is queued
// This is the worst part of this test setup. Ideally we would sleep, or some other yielding, but
// go test scheduling doesn't like that and even sleeping long periods won't cause the event thread to make
// progress...
- for {
- if pinf, exists := cr.connections.Load("test"); exists {
+ setup := false
+ for !setup {
+ if pinf, exists := cr.connections.Load(testOnion); exists {
if pinf.(*contact).queued {
- break
+ if _, exists := cr.authorizedPeers.Load(testOnion); exists {
+ t.Logf("authorized")
+ setup = true
+ }
}
}
}
- pinf, _ := cr.connections.Load("test")
+ pinf, _ := cr.connections.Load(testOnion)
if pinf.(*contact).queued == false {
t.Fatalf("test connection should be queued, actually: %v", pinf.(*contact).queued)
}
// Assert that the test peer is authenticated
- cr.handleEvent("test", connections.AUTHENTICATED, peerConn)
+ cr.handleEvent(testOnion, connections.AUTHENTICATED, peerConn)
// Assert that "test has a valid state"
pinf, _ = cr.connections.Load("test")
pinf, _ = cr.connections.Load(testOnion)
if pinf.(*contact).state != 3 {
t.Fatalf("test connection should be in authenticated after update, actually: %v", pinf.(*contact).state)
}
@@ -62,11 +68,11 @@ func TestContactRetryQueue(t *testing.T) {
}
// Publish a new peer request...
- bus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: "test"}))
+ bus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: testOnion}))
time.Sleep(time.Second) // yield for a second so the event can catch up...
// Peer test should be forced to queue....
- pinf, _ = cr.connections.Load("test")
+ pinf, _ = cr.connections.Load(testOnion)
if pinf.(*contact).queued != true {
t.Fatalf("test connection should be forced to queue after new queue peer request")
}


@@ -45,6 +45,8 @@ func (f *Functionality) ExperimentsToRegister() []string {
func (f *Functionality) OnEvent(ev event.Event, profile peer.CwtchPeer) {
if profile.IsFeatureEnabled(constants.FileSharingExperiment) {
switch ev.EventType {
+ case event.ProtocolEngineCreated:
+ f.ReShareFiles(profile)
case event.ManifestReceived:
log.Debugf("Manifest Received Event!: %v", ev)
handle := ev.Data[event.Handle]
@@ -294,9 +296,10 @@ func (f *Functionality) DownloadFile(profile peer.CwtchPeer, conversationID int,
}
// startFileShare is a private method used to finalize a file share and publish it to the protocol engine for processing.
- func (f *Functionality) startFileShare(profile peer.CwtchPeer, filekey string, manifest string) error {
+ // if force is set to true, this function will ignore timestamp checks...
+ func (f *Functionality) startFileShare(profile peer.CwtchPeer, filekey string, manifest string, force bool) error {
tsStr, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.ts", filekey))
- if exists {
+ if exists && !force {
ts, err := strconv.ParseInt(tsStr, 10, 64)
if err != nil || ts < time.Now().Unix()-2592000 {
log.Errorf("ignoring request to download a file offered more than 30 days ago")
@@ -311,7 +314,14 @@
}
// RestartFileShare takes in an existing filekey and, assuming the manifest exists, restarts sharing of the manifest
+ // this function always forces a file share, even if the file has timed out.
func (f *Functionality) RestartFileShare(profile peer.CwtchPeer, filekey string) error {
+ return f.restartFileShareAdvanced(profile, filekey, true)
+ }
+
+ // restartFileShareAdvanced takes in an existing filekey and, assuming the manifest exists, restarts sharing of the
+ // manifest, with additional control over whether the share is forced.
+ func (f *Functionality) restartFileShareAdvanced(profile peer.CwtchPeer, filekey string, force bool) error {
// assert that we are allowed to restart filesharing
if !profile.IsFeatureEnabled(constants.FileSharingExperiment) {
@@ -323,7 +333,7 @@ func (f *Functionality) RestartFileShare(profile peer.CwtchPeer, filekey string) error {
if manifestExists {
// everything is in order, so reshare this file with the engine
log.Debugf("restarting file share: %v", filekey)
- return f.startFileShare(profile, filekey, manifest)
+ return f.startFileShare(profile, filekey, manifest, force)
}
return fmt.Errorf("manifest does not exist for filekey: %v", filekey)
}
@@ -357,12 +367,10 @@ func (f *Functionality) ReShareFiles(profile peer.CwtchPeer) error {
filekey := strings.Join(keyparts[:2], ".")
sharedFile, err := f.GetFileShareInfo(profile, filekey)
- // If we haven't explicitly stopped sharing the file AND
- // If fewer than 30 days have passed since we originally shared this file,
- // Then attempt to share this file again...
- // TODO: In the future this would be the point to change the timestamp and reshare the file...
+ // If we haven't explicitly stopped sharing the file then attempt a reshare
if err == nil && sharedFile.Active {
- err := f.RestartFileShare(profile, filekey)
+ // this reshare can fail because we don't force sharing of files older than 30 days...
+ err := f.restartFileShareAdvanced(profile, filekey, false)
if err != nil {
log.Debugf("could not reshare file: %v", err)
}
@@ -456,7 +464,7 @@ func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer) (string, string, error) {
profile.SetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", key), string(serializedManifest))
profile.SetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest.size", key), strconv.Itoa(int(math.Ceil(float64(len(serializedManifest)-lenDiff)/float64(files.DefaultChunkSize)))))
- err = f.startFileShare(profile, key, string(serializedManifest))
+ err = f.startFileShare(profile, key, string(serializedManifest), false)
return key, string(wrapperJSON), err
}
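Taken together, the file sharing surface now distinguishes forced from unforced restarts: RestartFileShare(profile, filekey) always forces a reshare, while ReShareFiles(profile) re-offers every active file and lets the 30-day timestamp check silently skip stale shares. A hedged caller-side sketch (FunctionalityGate(), ReShareFiles, and RestartFileShare are taken from this diff; the import paths and the filekey value are assumptions):

package examples

import (
	"cwtch.im/cwtch/functionality/filesharing" // assumed import path
	"cwtch.im/cwtch/peer"                      // assumed import path

	"git.openprivacy.ca/openprivacy/log"
)

// reshareActiveFiles mirrors the new ProtocolEngineCreated hook above:
// re-offer every active file, letting the 30-day check skip stale shares.
func reshareActiveFiles(profile peer.CwtchPeer) {
	fsf := filesharing.FunctionalityGate()
	if err := fsf.ReShareFiles(profile); err != nil {
		log.Errorf("could not reshare files: %v", err)
	}
}

// forceRestartShare restarts a single share even if its timestamp has
// expired. The filekey format is illustrative.
func forceRestartShare(profile peer.CwtchPeer, filekey string) {
	fsf := filesharing.FunctionalityGate()
	if err := fsf.RestartFileShare(profile, filekey); err != nil {
		log.Errorf("could not force reshare of %v: %v", filekey, err)
	}
}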


@@ -75,10 +75,9 @@ func (i *ImagePreviewsFunctionality) OnEvent(ev event.Event, profile peer.CwtchPeer) {
// we reset the profile image here so that it is always available.
profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.ts", key), strconv.FormatInt(time.Now().Unix(), 10))
log.Debugf("Custom Profile Image: %v %s", key, serializedManifest)
- f := Functionality{}
- f.RestartFileShare(profile, key)
}
+ // If file sharing is enabled then reshare all active files...
+ fsf := FunctionalityGate()
+ _ = fsf.ReShareFiles(profile)
}
}
}


@@ -30,7 +30,7 @@ func (f *Functionality) NotifySettingsUpdate(settings settings.GlobalSettings) {
}
func (f *Functionality) EventsToRegister() []event.Type {
- return []event.Type{event.Heartbeat}
+ return []event.Type{event.QueueJoinServer}
}
func (f *Functionality) ExperimentsToRegister() []string {
@@ -42,8 +42,9 @@ func (f *Functionality) OnEvent(ev event.Event, profile peer.CwtchPeer) {
if profile.IsFeatureEnabled(constants.GroupsExperiment) {
switch ev.EventType {
// keep the UI in sync with the current backend server updates...
- // TODO: do we need a secondary heartbeat for less common updates?
- case event.Heartbeat:
+ // queue join server gets triggered on load and on new servers so it's a nice
+ // low-noise event to hook into...
+ case event.QueueJoinServer:
f.PublishServerUpdate(profile)
}
}


@@ -670,7 +670,7 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) {
cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(gci.SharedKey))
cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)), gci.GroupName)
cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(groupConversationID), event.GroupServer: gci.ServerHost, event.GroupInvite: exportedInvite, event.GroupName: gci.GroupName}))
- cp.JoinServer(gci.ServerHost)
+ cp.QueueJoinServer(gci.ServerHost)
}
return groupConversationID, err
}
@@ -993,7 +993,7 @@ func (cp *cwtchPeer) AddServer(serverSpecification string) (string, error) {
cp.SetConversationAttribute(conversationInfo.ID, attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(k)), v)
}
cp.SetConversationAttribute(conversationInfo.ID, attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.BundleType))), serverSpecification)
- cp.JoinServer(onion)
+ cp.QueueJoinServer(onion)
return onion, err
}
return "", err