diff --git a/app/app.go b/app/app.go index 23960f7..2a2e43d 100644 --- a/app/app.go +++ b/app/app.go @@ -63,7 +63,7 @@ type Application interface { QueryACNStatus() QueryACNVersion() - ActivateEngines(doListn, doPeers, doServers bool) + ConfigureConnections(onion string, doListn, doPeers, doServers bool) ActivatePeerEngine(onion string) DeactivatePeerEngine(onion string) @@ -385,31 +385,6 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool { return false } -// ActivateEngines launches all peer engines -func (app *application) ActivateEngines(doListen, doPeers, doServers bool) { - log.Debugf("ActivateEngines") - - for _, profile := range app.peers { - app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()], app.engineHooks) - app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated)) - } - app.QueryACNStatus() - - if doListen { - for _, profile := range app.peers { - log.Debugf(" Listen for %v", profile.GetOnion()) - profile.Listen() - } - } - - if doPeers || doServers { - for _, profile := range app.peers { - log.Debugf(" Start Connections for %v doPeers:%v doServers:%v", profile.GetOnion(), doPeers, doServers) - profile.StartConnections(doPeers, doServers) - } - } -} - // ActivatePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online func (app *application) ActivatePeerEngine(onion string) { profile := app.GetPeer(onion) @@ -419,14 +394,24 @@ func (app *application) ActivatePeerEngine(onion string) { app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()], app.engineHooks) app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated)) app.QueryACNStatus() - if true { - profile.Listen() - } - profile.StartConnections(true, true) } } } +// ConfigureConnections autostarts the given kinds of connections. +func (app *application) ConfigureConnections(onion string, listen bool, peers bool, servers bool) { + profile := app.GetPeer(onion) + if profile != nil { + // enable the engine if it doesn't exist... + // note: this function is idempotent + app.ActivatePeerEngine(onion) + if listen { + profile.Listen() + } + profile.StartConnections(peers, servers) + } +} + // DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline func (app *application) DeactivatePeerEngine(onion string) { if engine, exists := app.engines[onion]; exists { @@ -494,9 +479,17 @@ func (app *application) eventHandler() { profile := app.GetPeer(onion) if profile != nil { autostart, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAutostart) + appearOffline, appearOfflineExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAppearOffline) if !exists || autostart == "true" { app.ActivatePeerEngine(onion) + if appearOfflineExists && appearOffline == "true" { + // don't configure any connections... + log.Infof("peer appearing offline, not launching listen threads or connecting jobs") + } else { + app.ConfigureConnections(onion, true, true, true) + } } + } } } diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index b36bba0..5bdc510 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -177,6 +177,8 @@ func (cr *contactRetry) run() { cr.bus.Subscribe(event.ServerStateChange, cr.queue) cr.bus.Subscribe(event.QueuePeerRequest, cr.queue) cr.bus.Subscribe(event.QueueJoinServer, cr.queue) + cr.bus.Subscribe(event.DisconnectPeerRequest, cr.queue) + cr.bus.Subscribe(event.DisconnectServerRequest, cr.queue) cr.bus.Subscribe(event.ProtocolEngineShutdown, cr.queue) cr.bus.Subscribe(event.ProtocolEngineCreated, cr.queue) cr.bus.Subscribe(event.DeleteContact, cr.queue) @@ -229,6 +231,12 @@ func (cr *contactRetry) run() { select { case e := <-cr.queue.OutChan(): switch e.EventType { + case event.DisconnectPeerRequest: + peer := e.Data[event.RemotePeer] + cr.authorizedPeers.Delete(peer) + case event.DisconnectServerRequest: + peer := e.Data[event.GroupServer] + cr.authorizedPeers.Delete(peer) case event.DeleteContact: // this case covers both servers and peers (servers are peers, and go through the // same delete conversation flow) diff --git a/app/plugins/contactRetry_test.go b/app/plugins/contactRetry_test.go index ab55a98..af5e19c 100644 --- a/app/plugins/contactRetry_test.go +++ b/app/plugins/contactRetry_test.go @@ -33,13 +33,13 @@ func TestContactRetryQueue(t *testing.T) { // This is the worst part of this test setup. Ideally we would sleep, or some other yielding, but // go test scheduling doesn't like that and even sleeping long periods won't cause the event thread to make // progress... - setup := false; + setup := false for !setup { if pinf, exists := cr.connections.Load(testOnion); exists { if pinf.(*contact).queued { if _, exists := cr.authorizedPeers.Load(testOnion); exists { t.Logf("authorized") - setup = true; + setup = true } } } diff --git a/event/common.go b/event/common.go index 182937a..3b9b0bd 100644 --- a/event/common.go +++ b/event/common.go @@ -25,6 +25,11 @@ const ( // GroupServer QueuePeerRequest = Type("QueuePeerRequest") + // Disconnect*Request + // Close active connections and prevent new connections + DisconnectPeerRequest = Type("DisconnectPeerRequest") + DisconnectServerRequest = Type("DisconnectServerRequest") + // RetryServerRequest // Asks CwtchPeer to retry a server connection... // GroupServer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd" diff --git a/model/constants/attributes.go b/model/constants/attributes.go index 443f747..782ecd3 100644 --- a/model/constants/attributes.go +++ b/model/constants/attributes.go @@ -57,6 +57,7 @@ const SyncMostRecentMessageTime = "SyncMostRecentMessageTime" const AttrLastConnectionTime = "last-connection-time" const PeerAutostart = "autostart" +const PeerAppearOffline = "appear-offline" const Archived = "archived" const ProfileStatus = "profile-status" diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 50f8c33..185c1a1 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -1042,6 +1042,14 @@ func (cp *cwtchPeer) PeerWithOnion(onion string) { cp.eventBus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: onion, event.LastSeen: lastSeen.Format(time.RFC3339Nano)})) } +func (cp *cwtchPeer) DisconnectFromPeer(onion string) { + cp.eventBus.Publish(event.NewEvent(event.DisconnectPeerRequest, map[event.Field]string{event.RemotePeer: onion})) +} + +func (cp *cwtchPeer) DisconnectFromServer(onion string) { + cp.eventBus.Publish(event.NewEvent(event.DisconnectServerRequest, map[event.Field]string{event.GroupServer: onion})) +} + // QueuePeeringWithOnion sends the request to peer with an onion directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests // Status: Ready for 1.10 func (cp *cwtchPeer) QueuePeeringWithOnion(handle string) { @@ -1179,28 +1187,38 @@ func (cp *cwtchPeer) ImportBundle(importString string) error { // JoinServer manages a new server connection with the given onion address func (cp *cwtchPeer) JoinServer(onion string) error { - ci, err := cp.FetchConversationInfo(onion) - if ci == nil || err != nil { + + // only connect to servers if the group experiment is enabled. + // note: there are additional checks throughout the app that minimize server interaction + // regardless, and we can only reach this point if groups experiment was at one point enabled + // TODO: this really belongs in an extension, but for legacy reasons groups are more tightly + // integrated into Cwtch. At some point, probably during hybrid groups implementation this + // API should be deprecated in favor of one with much stronger protections. + if cp.IsFeatureEnabled(constants.GroupsExperiment) { + ci, err := cp.FetchConversationInfo(onion) + if ci == nil || err != nil { + return errors.New("no keys found for server connection") + } + + //if cp.GetContact(onion) != nil { + tokenY, yExists := ci.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.KeyTypePrivacyPass))).ToString()] + tokenOnion, onionExists := ci.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.KeyTypeTokenOnion))).ToString()] + if yExists && onionExists { + signature, exists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(lastReceivedSignature)).ToString()] + if !exists { + signature = base64.StdEncoding.EncodeToString([]byte{}) + } + cachedTokensJson, hasCachedTokens := ci.GetAttribute(attr.LocalScope, attr.ServerZone, "tokens") + if hasCachedTokens { + log.Debugf("using cached tokens for %v", ci.Handle) + } + + cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature, event.CachedTokens: cachedTokensJson})) + return nil + } return errors.New("no keys found for server connection") } - - //if cp.GetContact(onion) != nil { - tokenY, yExists := ci.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.KeyTypePrivacyPass))).ToString()] - tokenOnion, onionExists := ci.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.KeyTypeTokenOnion))).ToString()] - if yExists && onionExists { - signature, exists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(lastReceivedSignature)).ToString()] - if !exists { - signature = base64.StdEncoding.EncodeToString([]byte{}) - } - cachedTokensJson, hasCachedTokens := ci.GetAttribute(attr.LocalScope, attr.ServerZone, "tokens") - if hasCachedTokens { - log.Debugf("using cached tokens for %v", ci.Handle) - } - - cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature, event.CachedTokens: cachedTokensJson})) - return nil - } - return errors.New("no keys found for server connection") + return errors.New("group experiment is not enabled") } // MakeAntispamPayment allows a peer to retrigger antispam, important if the initial connection somehow fails... @@ -1316,7 +1334,8 @@ func (cp *cwtchPeer) StartConnections(doPeers, doServers bool) { byRecent := cp.getConnectionsSortedByLastSeen(doPeers, doServers) log.Infof("StartConnections for %v", cp.GetOnion()) for _, conversation := range byRecent { - if conversation.model.IsServer() { + // only bother tracking servers if the experiment is enabled... + if conversation.model.IsServer() && cp.IsFeatureEnabled(constants.GroupsExperiment) { log.Debugf(" QueueJoinServer(%v)", conversation.model.Handle) cp.QueueJoinServer(conversation.model.Handle) } else { diff --git a/peer/profile_interface.go b/peer/profile_interface.go index eda8130..9d70504 100644 --- a/peer/profile_interface.go +++ b/peer/profile_interface.go @@ -20,7 +20,9 @@ type ModifyPeeringState interface { BlockUnknownConnections() AllowUnknownConnections() PeerWithOnion(string) - JoinServer(string) error + QueueJoinServer(string) + DisconnectFromPeer(string) + DisconnectFromServer(string) } // ModifyContactsAndPeers is a meta-interface intended to restrict a call to reading and modifying contacts diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 1e4b81c..2d58238 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -122,6 +122,8 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK engine.eventManager.Subscribe(event.UpdateConversationAuthorization, engine.queue) engine.eventManager.Subscribe(event.BlockUnknownPeers, engine.queue) engine.eventManager.Subscribe(event.AllowUnknownPeers, engine.queue) + engine.eventManager.Subscribe(event.DisconnectPeerRequest, engine.queue) + engine.eventManager.Subscribe(event.DisconnectServerRequest, engine.queue) // File Handling engine.eventManager.Subscribe(event.ShareManifest, engine.queue) @@ -194,6 +196,10 @@ func (e *engine) eventHandler() { // We remove this peer from out blocklist which will prevent them from contacting us if we have "block unknown peers" turned on. e.authorizations.Delete(ev.Data[event.RemotePeer]) e.deleteConnection(onion) + case event.DisconnectPeerRequest: + e.deleteConnection(ev.Data[event.RemotePeer]) + case event.DisconnectServerRequest: + e.leaveServer(ev.Data[event.GroupServer]) case event.SendMessageToGroup: ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])