diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index b3cef2c..b36bba0 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -3,6 +3,7 @@ package plugins import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/protocol/connections" + "git.openprivacy.ca/openprivacy/connectivity/tor" "git.openprivacy.ca/openprivacy/log" "math" "strconv" @@ -120,14 +121,15 @@ type contactRetry struct { lastCheck time.Time acnProgress int - connections sync.Map //[string]*contact - pendingQueue *connectionQueue - priorityQueue *connectionQueue + connections sync.Map //[string]*contact + pendingQueue *connectionQueue + priorityQueue *connectionQueue + authorizedPeers sync.Map } // NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing func NewConnectionRetry(bus event.Manager, onion string) Plugin { - cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue(), priorityQueue: newConnectionQueue()} + cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), authorizedPeers: sync.Map{}, connections: sync.Map{}, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue(), priorityQueue: newConnectionQueue()} return cr } @@ -177,7 +179,8 @@ func (cr *contactRetry) run() { cr.bus.Subscribe(event.QueueJoinServer, cr.queue) cr.bus.Subscribe(event.ProtocolEngineShutdown, cr.queue) cr.bus.Subscribe(event.ProtocolEngineCreated, cr.queue) - + cr.bus.Subscribe(event.DeleteContact, cr.queue) + cr.bus.Subscribe(event.UpdateConversationAuthorization, cr.queue) for { // Only attempt connection if both the ACN and the Protocol Engines are Online... log.Debugf("restartFlow checking state") @@ -226,16 +229,33 @@ func (cr *contactRetry) run() { select { case e := <-cr.queue.OutChan(): switch e.EventType { + case event.DeleteContact: + // this case covers both servers and peers (servers are peers, and go through the + // same delete conversation flow) + peer := e.Data[event.RemotePeer] + cr.authorizedPeers.Delete(peer) + case event.UpdateConversationAuthorization: + // if we update the conversation authorization then we need to check if + // we need to remove blocked conversations from the regular flow. + peer := e.Data[event.RemotePeer] + blocked := e.Data[event.Blocked] + if blocked == "true" { + cr.authorizedPeers.Delete(peer) + } case event.PeerStateChange: state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]] peer := e.Data[event.RemotePeer] - cr.handleEvent(peer, state, peerConn) - + // only handle state change events from pre-authorized peers; + if _, exists := cr.authorizedPeers.Load(peer); exists { + cr.handleEvent(peer, state, peerConn) + } case event.ServerStateChange: state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]] server := e.Data[event.GroupServer] - cr.handleEvent(server, state, serverConn) - + // only handle state change events from pre-authorized servers; + if _, exists := cr.authorizedPeers.Load(server); exists { + cr.handleEvent(server, state, serverConn) + } case event.QueueJoinServer: fallthrough case event.QueuePeerRequest: @@ -252,7 +272,9 @@ func (cr *contactRetry) run() { id = server cr.addConnection(server, connections.DISCONNECTED, serverConn, lastSeen) } - + // this was an authorized event, and so we store this peer. + log.Debugf("authorizing id: %v", id) + cr.authorizedPeers.Store(id, true) if c, ok := cr.connections.Load(id); ok { contact := c.(*contact) if contact.state == connections.DISCONNECTED && !contact.queued { @@ -423,6 +445,12 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState return } + // reject events that contain invalid hostnames...we cannot connect to them + // and they could result in spurious connection attempts... + if !tor.IsValidHostname(id) { + return + } + if _, exists := cr.connections.Load(id); !exists { // We have an event for something we don't know about... // The only reason this should happen is if a *new* Peer/Server connection has changed. diff --git a/app/plugins/contactRetry_test.go b/app/plugins/contactRetry_test.go index adf2604..ab55a98 100644 --- a/app/plugins/contactRetry_test.go +++ b/app/plugins/contactRetry_test.go @@ -23,32 +23,38 @@ func TestContactRetryQueue(t *testing.T) { cr.protocolEngine = true // fake protocol engine go cr.run() + testOnion := "2wgvbza2mbuc72a4u6r6k4hc2blcvrmk4q26bfvlwbqxv2yq5k52fcqd" + t.Logf("contact plugin up and running..sending peer connection...") // Assert that there is a peer connection identified as "test" - bus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: "test", event.LastSeen: "test"})) + bus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: testOnion, event.LastSeen: "test"})) // Wait until the test actually exists, and is queued // This is the worst part of this test setup. Ideally we would sleep, or some other yielding, but // go test scheduling doesn't like that and even sleeping long periods won't cause the event thread to make // progress... - for { - if pinf, exists := cr.connections.Load("test"); exists { + setup := false; + for !setup { + if pinf, exists := cr.connections.Load(testOnion); exists { if pinf.(*contact).queued { - break + if _, exists := cr.authorizedPeers.Load(testOnion); exists { + t.Logf("authorized") + setup = true; + } } } } - pinf, _ := cr.connections.Load("test") + pinf, _ := cr.connections.Load(testOnion) if pinf.(*contact).queued == false { t.Fatalf("test connection should be queued, actually: %v", pinf.(*contact).queued) } // Asset that "test" is authenticated - cr.handleEvent("test", connections.AUTHENTICATED, peerConn) + cr.handleEvent(testOnion, connections.AUTHENTICATED, peerConn) // Assert that "test has a valid state" - pinf, _ = cr.connections.Load("test") + pinf, _ = cr.connections.Load(testOnion) if pinf.(*contact).state != 3 { t.Fatalf("test connection should be in authenticated after update, actually: %v", pinf.(*contact).state) } @@ -62,11 +68,11 @@ func TestContactRetryQueue(t *testing.T) { } // Publish a new peer request... - bus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: "test"})) + bus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: testOnion})) time.Sleep(time.Second) // yield for a second so the event can catch up... // Peer test should be forced to queue.... - pinf, _ = cr.connections.Load("test") + pinf, _ = cr.connections.Load(testOnion) if pinf.(*contact).queued != true { t.Fatalf("test connection should be forced to queue after new queue peer request") } diff --git a/functionality/filesharing/filesharing_functionality.go b/functionality/filesharing/filesharing_functionality.go index 22986cc..7c29d25 100644 --- a/functionality/filesharing/filesharing_functionality.go +++ b/functionality/filesharing/filesharing_functionality.go @@ -45,6 +45,8 @@ func (f *Functionality) ExperimentsToRegister() []string { func (f *Functionality) OnEvent(ev event.Event, profile peer.CwtchPeer) { if profile.IsFeatureEnabled(constants.FileSharingExperiment) { switch ev.EventType { + case event.ProtocolEngineCreated: + f.ReShareFiles(profile) case event.ManifestReceived: log.Debugf("Manifest Received Event!: %v", ev) handle := ev.Data[event.Handle] @@ -294,9 +296,10 @@ func (f *Functionality) DownloadFile(profile peer.CwtchPeer, conversationID int, } // startFileShare is a private method used to finalize a file share and publish it to the protocol engine for processing. -func (f *Functionality) startFileShare(profile peer.CwtchPeer, filekey string, manifest string) error { +// if force is set to true, this function will ignore timestamp checks... +func (f *Functionality) startFileShare(profile peer.CwtchPeer, filekey string, manifest string, force bool) error { tsStr, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.ts", filekey)) - if exists { + if exists && !force { ts, err := strconv.ParseInt(tsStr, 10, 64) if err != nil || ts < time.Now().Unix()-2592000 { log.Errorf("ignoring request to download a file offered more than 30 days ago") @@ -311,7 +314,14 @@ func (f *Functionality) startFileShare(profile peer.CwtchPeer, filekey string, m } // RestartFileShare takes in an existing filekey and, assuming the manifest exists, restarts sharing of the manifest +// by default this function always forces a file share, even if the file has timed out. func (f *Functionality) RestartFileShare(profile peer.CwtchPeer, filekey string) error { + return f.restartFileShareAdvanced(profile, filekey, true) +} + +// RestartFileShareAdvanced takes in an existing filekey and, assuming the manifest exists, restarts sharing of the manifest in addition +// to a set of parameters +func (f *Functionality) restartFileShareAdvanced(profile peer.CwtchPeer, filekey string, force bool) error { // assert that we are allowed to restart filesharing if !profile.IsFeatureEnabled(constants.FileSharingExperiment) { @@ -323,7 +333,7 @@ func (f *Functionality) RestartFileShare(profile peer.CwtchPeer, filekey string) if manifestExists { // everything is in order, so reshare this file with the engine log.Debugf("restarting file share: %v", filekey) - return f.startFileShare(profile, filekey, manifest) + return f.startFileShare(profile, filekey, manifest, force) } return fmt.Errorf("manifest does not exist for filekey: %v", filekey) } @@ -357,12 +367,10 @@ func (f *Functionality) ReShareFiles(profile peer.CwtchPeer) error { filekey := strings.Join(keyparts[:2], ".") sharedFile, err := f.GetFileShareInfo(profile, filekey) - // If we haven't explicitly stopped sharing the file AND - // If fewer than 30 days have passed since we originally shared this file, - // Then attempt to share this file again... - // TODO: In the future this would be the point to change the timestamp and reshare the file... + // If we haven't explicitly stopped sharing the file then attempt a reshare if err == nil && sharedFile.Active { - err := f.RestartFileShare(profile, filekey) + // this reshare can fail because we don't force sharing of files older than 30 days... + err := f.restartFileShareAdvanced(profile, filekey, false) if err != nil { log.Debugf("could not reshare file: %v", err) } @@ -456,7 +464,7 @@ func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer) (stri profile.SetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", key), string(serializedManifest)) profile.SetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest.size", key), strconv.Itoa(int(math.Ceil(float64(len(serializedManifest)-lenDiff)/float64(files.DefaultChunkSize))))) - err = f.startFileShare(profile, key, string(serializedManifest)) + err = f.startFileShare(profile, key, string(serializedManifest), false) return key, string(wrapperJSON), err } diff --git a/functionality/filesharing/image_previews.go b/functionality/filesharing/image_previews.go index ad509a2..6bbc794 100644 --- a/functionality/filesharing/image_previews.go +++ b/functionality/filesharing/image_previews.go @@ -75,10 +75,9 @@ func (i *ImagePreviewsFunctionality) OnEvent(ev event.Event, profile peer.CwtchP // we reset the profile image here so that it is always available. profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.ts", key), strconv.FormatInt(time.Now().Unix(), 10)) log.Debugf("Custom Profile Image: %v %s", key, serializedManifest) + f := Functionality{} + f.RestartFileShare(profile, key) } - // If file sharing is enabled then reshare all active files... - fsf := FunctionalityGate() - _ = fsf.ReShareFiles(profile) } } } diff --git a/functionality/servers/servers_functionality.go b/functionality/servers/servers_functionality.go index 4a039be..a1a3bfc 100644 --- a/functionality/servers/servers_functionality.go +++ b/functionality/servers/servers_functionality.go @@ -30,7 +30,7 @@ func (f *Functionality) NotifySettingsUpdate(settings settings.GlobalSettings) { } func (f *Functionality) EventsToRegister() []event.Type { - return []event.Type{event.Heartbeat} + return []event.Type{event.QueueJoinServer} } func (f *Functionality) ExperimentsToRegister() []string { @@ -42,8 +42,9 @@ func (f *Functionality) OnEvent(ev event.Event, profile peer.CwtchPeer) { if profile.IsFeatureEnabled(constants.GroupsExperiment) { switch ev.EventType { // keep the UI in sync with the current backend server updates... - // TODO: do we need a secondary heartbeat for less common updates? - case event.Heartbeat: + // queue join server gets triggered on load and on new servers so it's a nice + // low-noise event to hook into... + case event.QueueJoinServer: f.PublishServerUpdate(profile) } } diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 3edbacb..50f8c33 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -670,7 +670,7 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) { cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(gci.SharedKey)) cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)), gci.GroupName) cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(groupConversationID), event.GroupServer: gci.ServerHost, event.GroupInvite: exportedInvite, event.GroupName: gci.GroupName})) - cp.JoinServer(gci.ServerHost) + cp.QueueJoinServer(gci.ServerHost) } return groupConversationID, err } @@ -993,7 +993,7 @@ func (cp *cwtchPeer) AddServer(serverSpecification string) (string, error) { cp.SetConversationAttribute(conversationInfo.ID, attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(k)), v) } cp.SetConversationAttribute(conversationInfo.ID, attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.BundleType))), serverSpecification) - cp.JoinServer(onion) + cp.QueueJoinServer(onion) return onion, err } return "", err