Support Appear Offline / Disconnect from Server/Peer #531

Merged
sarah merged 4 commits from stable-blockers into master 2023-09-13 18:49:10 +00:00
11 changed files with 102 additions and 56 deletions

View File

@ -63,7 +63,7 @@ type Application interface {
QueryACNStatus() QueryACNStatus()
QueryACNVersion() QueryACNVersion()
ActivateEngines(doListn, doPeers, doServers bool) ConfigureConnections(onion string, doListn, doPeers, doServers bool)
Review

do we want to drop ActivatePeerEngine(onion string) then? or make this activatePeerEngine?

do we want to drop `ActivatePeerEngine(onion string)` then? or make this activatePeerEngine?
Review

ActivatePeerEngine initializes the actual engine, then configure the connections operates over that.

ActivatePeerEngine initializes the actual engine, then configure the connections operates over that.
ActivatePeerEngine(onion string) ActivatePeerEngine(onion string)
DeactivatePeerEngine(onion string) DeactivatePeerEngine(onion string)
@ -385,31 +385,6 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool {
return false return false
} }
// ActivateEngines launches all peer engines
func (app *application) ActivateEngines(doListen, doPeers, doServers bool) {
log.Debugf("ActivateEngines")
for _, profile := range app.peers {
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()], app.engineHooks)
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated))
}
app.QueryACNStatus()
if doListen {
for _, profile := range app.peers {
log.Debugf(" Listen for %v", profile.GetOnion())
profile.Listen()
}
}
if doPeers || doServers {
for _, profile := range app.peers {
log.Debugf(" Start Connections for %v doPeers:%v doServers:%v", profile.GetOnion(), doPeers, doServers)
profile.StartConnections(doPeers, doServers)
}
}
}
// ActivatePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online // ActivatePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online
func (app *application) ActivatePeerEngine(onion string) { func (app *application) ActivatePeerEngine(onion string) {
profile := app.GetPeer(onion) profile := app.GetPeer(onion)
@ -419,14 +394,24 @@ func (app *application) ActivatePeerEngine(onion string) {
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()], app.engineHooks) app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()], app.engineHooks)
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated)) app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated))
app.QueryACNStatus() app.QueryACNStatus()
if true {
profile.Listen()
}
profile.StartConnections(true, true)
} }
} }
} }
// ConfigureConnections autostarts the given kinds of connections.
func (app *application) ConfigureConnections(onion string, listen bool, peers bool, servers bool) {
profile := app.GetPeer(onion)
if profile != nil {
// enable the engine if it doesn't exist...
// note: this function is idempotent
app.ActivatePeerEngine(onion)
if listen {
profile.Listen()
}
profile.StartConnections(peers, servers)
}
}
// DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline // DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline
func (app *application) DeactivatePeerEngine(onion string) { func (app *application) DeactivatePeerEngine(onion string) {
if engine, exists := app.engines[onion]; exists { if engine, exists := app.engines[onion]; exists {
@ -494,9 +479,17 @@ func (app *application) eventHandler() {
profile := app.GetPeer(onion) profile := app.GetPeer(onion)
if profile != nil { if profile != nil {
autostart, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAutostart) autostart, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAutostart)
appearOffline, appearOfflineExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAppearOffline)
if !exists || autostart == "true" { if !exists || autostart == "true" {
app.ActivatePeerEngine(onion) app.ActivatePeerEngine(onion)
if appearOfflineExists && appearOffline == "true" {
// don't configure any connections...
log.Infof("peer appearing offline, not launching listen threads or connecting jobs")
} else {
app.ConfigureConnections(onion, true, true, true)
}
} }
} }
} }
} }

View File

@ -177,6 +177,8 @@ func (cr *contactRetry) run() {
cr.bus.Subscribe(event.ServerStateChange, cr.queue) cr.bus.Subscribe(event.ServerStateChange, cr.queue)
cr.bus.Subscribe(event.QueuePeerRequest, cr.queue) cr.bus.Subscribe(event.QueuePeerRequest, cr.queue)
cr.bus.Subscribe(event.QueueJoinServer, cr.queue) cr.bus.Subscribe(event.QueueJoinServer, cr.queue)
cr.bus.Subscribe(event.DisconnectPeerRequest, cr.queue)
cr.bus.Subscribe(event.DisconnectServerRequest, cr.queue)
cr.bus.Subscribe(event.ProtocolEngineShutdown, cr.queue) cr.bus.Subscribe(event.ProtocolEngineShutdown, cr.queue)
cr.bus.Subscribe(event.ProtocolEngineCreated, cr.queue) cr.bus.Subscribe(event.ProtocolEngineCreated, cr.queue)
cr.bus.Subscribe(event.DeleteContact, cr.queue) cr.bus.Subscribe(event.DeleteContact, cr.queue)
@ -229,6 +231,12 @@ func (cr *contactRetry) run() {
select { select {
case e := <-cr.queue.OutChan(): case e := <-cr.queue.OutChan():
switch e.EventType { switch e.EventType {
case event.DisconnectPeerRequest:
peer := e.Data[event.RemotePeer]
cr.authorizedPeers.Delete(peer)
case event.DisconnectServerRequest:
peer := e.Data[event.GroupServer]
cr.authorizedPeers.Delete(peer)
case event.DeleteContact: case event.DeleteContact:
// this case covers both servers and peers (servers are peers, and go through the // this case covers both servers and peers (servers are peers, and go through the
// same delete conversation flow) // same delete conversation flow)

View File

@ -33,13 +33,13 @@ func TestContactRetryQueue(t *testing.T) {
// This is the worst part of this test setup. Ideally we would sleep, or some other yielding, but // This is the worst part of this test setup. Ideally we would sleep, or some other yielding, but
// go test scheduling doesn't like that and even sleeping long periods won't cause the event thread to make // go test scheduling doesn't like that and even sleeping long periods won't cause the event thread to make
// progress... // progress...
setup := false; setup := false
for !setup { for !setup {
if pinf, exists := cr.connections.Load(testOnion); exists { if pinf, exists := cr.connections.Load(testOnion); exists {
if pinf.(*contact).queued { if pinf.(*contact).queued {
if _, exists := cr.authorizedPeers.Load(testOnion); exists { if _, exists := cr.authorizedPeers.Load(testOnion); exists {
t.Logf("authorized") t.Logf("authorized")
setup = true; setup = true
} }
} }
} }

View File

@ -25,6 +25,11 @@ const (
// GroupServer // GroupServer
QueuePeerRequest = Type("QueuePeerRequest") QueuePeerRequest = Type("QueuePeerRequest")
// Disconnect*Request
// Close active connections and prevent new connections
DisconnectPeerRequest = Type("DisconnectPeerRequest")
DisconnectServerRequest = Type("DisconnectServerRequest")
// RetryServerRequest // RetryServerRequest
// Asks CwtchPeer to retry a server connection... // Asks CwtchPeer to retry a server connection...
// GroupServer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd" // GroupServer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"

View File

@ -57,6 +57,7 @@ const SyncMostRecentMessageTime = "SyncMostRecentMessageTime"
const AttrLastConnectionTime = "last-connection-time" const AttrLastConnectionTime = "last-connection-time"
const PeerAutostart = "autostart" const PeerAutostart = "autostart"
const PeerAppearOffline = "appear-offline"
const Archived = "archived" const Archived = "archived"
const ProfileStatus = "profile-status" const ProfileStatus = "profile-status"

View File

@ -1042,6 +1042,14 @@ func (cp *cwtchPeer) PeerWithOnion(onion string) {
cp.eventBus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: onion, event.LastSeen: lastSeen.Format(time.RFC3339Nano)})) cp.eventBus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: onion, event.LastSeen: lastSeen.Format(time.RFC3339Nano)}))
} }
func (cp *cwtchPeer) DisconnectFromPeer(onion string) {
cp.eventBus.Publish(event.NewEvent(event.DisconnectPeerRequest, map[event.Field]string{event.RemotePeer: onion}))
}
func (cp *cwtchPeer) DisconnectFromServer(onion string) {
cp.eventBus.Publish(event.NewEvent(event.DisconnectServerRequest, map[event.Field]string{event.GroupServer: onion}))
}
// QueuePeeringWithOnion sends the request to peer with an onion directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests // QueuePeeringWithOnion sends the request to peer with an onion directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests
// Status: Ready for 1.10 // Status: Ready for 1.10
func (cp *cwtchPeer) QueuePeeringWithOnion(handle string) { func (cp *cwtchPeer) QueuePeeringWithOnion(handle string) {
@ -1179,28 +1187,38 @@ func (cp *cwtchPeer) ImportBundle(importString string) error {
// JoinServer manages a new server connection with the given onion address // JoinServer manages a new server connection with the given onion address
func (cp *cwtchPeer) JoinServer(onion string) error { func (cp *cwtchPeer) JoinServer(onion string) error {
ci, err := cp.FetchConversationInfo(onion)
if ci == nil || err != nil { // only connect to servers if the group experiment is enabled.
// note: there are additional checks throughout the app that minimize server interaction
// regardless, and we can only reach this point if groups experiment was at one point enabled
// TODO: this really belongs in an extension, but for legacy reasons groups are more tightly
// integrated into Cwtch. At some point, probably during hybrid groups implementation this
// API should be deprecated in favor of one with much stronger protections.
if cp.IsFeatureEnabled(constants.GroupsExperiment) {
ci, err := cp.FetchConversationInfo(onion)
if ci == nil || err != nil {
return errors.New("no keys found for server connection")
}
//if cp.GetContact(onion) != nil {
tokenY, yExists := ci.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.KeyTypePrivacyPass))).ToString()]
tokenOnion, onionExists := ci.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.KeyTypeTokenOnion))).ToString()]
if yExists && onionExists {
signature, exists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(lastReceivedSignature)).ToString()]
if !exists {
signature = base64.StdEncoding.EncodeToString([]byte{})
}
cachedTokensJson, hasCachedTokens := ci.GetAttribute(attr.LocalScope, attr.ServerZone, "tokens")
if hasCachedTokens {
log.Debugf("using cached tokens for %v", ci.Handle)
}
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature, event.CachedTokens: cachedTokensJson}))
return nil
}
return errors.New("no keys found for server connection") return errors.New("no keys found for server connection")
} }
return errors.New("group experiment is not enabled")
//if cp.GetContact(onion) != nil {
tokenY, yExists := ci.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.KeyTypePrivacyPass))).ToString()]
tokenOnion, onionExists := ci.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.KeyTypeTokenOnion))).ToString()]
if yExists && onionExists {
signature, exists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(lastReceivedSignature)).ToString()]
if !exists {
signature = base64.StdEncoding.EncodeToString([]byte{})
}
cachedTokensJson, hasCachedTokens := ci.GetAttribute(attr.LocalScope, attr.ServerZone, "tokens")
if hasCachedTokens {
log.Debugf("using cached tokens for %v", ci.Handle)
}
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature, event.CachedTokens: cachedTokensJson}))
return nil
}
return errors.New("no keys found for server connection")
} }
// MakeAntispamPayment allows a peer to retrigger antispam, important if the initial connection somehow fails... // MakeAntispamPayment allows a peer to retrigger antispam, important if the initial connection somehow fails...
@ -1316,7 +1334,8 @@ func (cp *cwtchPeer) StartConnections(doPeers, doServers bool) {
byRecent := cp.getConnectionsSortedByLastSeen(doPeers, doServers) byRecent := cp.getConnectionsSortedByLastSeen(doPeers, doServers)
log.Infof("StartConnections for %v", cp.GetOnion()) log.Infof("StartConnections for %v", cp.GetOnion())
for _, conversation := range byRecent { for _, conversation := range byRecent {
if conversation.model.IsServer() { // only bother tracking servers if the experiment is enabled...
if conversation.model.IsServer() && cp.IsFeatureEnabled(constants.GroupsExperiment) {
log.Debugf(" QueueJoinServer(%v)", conversation.model.Handle) log.Debugf(" QueueJoinServer(%v)", conversation.model.Handle)
cp.QueueJoinServer(conversation.model.Handle) cp.QueueJoinServer(conversation.model.Handle)
} else { } else {

View File

@ -20,7 +20,9 @@ type ModifyPeeringState interface {
BlockUnknownConnections() BlockUnknownConnections()
AllowUnknownConnections() AllowUnknownConnections()
PeerWithOnion(string) PeerWithOnion(string)
JoinServer(string) error QueueJoinServer(string)
DisconnectFromPeer(string)
DisconnectFromServer(string)
} }
// ModifyContactsAndPeers is a meta-interface intended to restrict a call to reading and modifying contacts // ModifyContactsAndPeers is a meta-interface intended to restrict a call to reading and modifying contacts

View File

@ -122,6 +122,8 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
engine.eventManager.Subscribe(event.UpdateConversationAuthorization, engine.queue) engine.eventManager.Subscribe(event.UpdateConversationAuthorization, engine.queue)
engine.eventManager.Subscribe(event.BlockUnknownPeers, engine.queue) engine.eventManager.Subscribe(event.BlockUnknownPeers, engine.queue)
engine.eventManager.Subscribe(event.AllowUnknownPeers, engine.queue) engine.eventManager.Subscribe(event.AllowUnknownPeers, engine.queue)
engine.eventManager.Subscribe(event.DisconnectPeerRequest, engine.queue)
engine.eventManager.Subscribe(event.DisconnectServerRequest, engine.queue)
// File Handling // File Handling
engine.eventManager.Subscribe(event.ShareManifest, engine.queue) engine.eventManager.Subscribe(event.ShareManifest, engine.queue)
@ -194,6 +196,10 @@ func (e *engine) eventHandler() {
// We remove this peer from out blocklist which will prevent them from contacting us if we have "block unknown peers" turned on. // We remove this peer from out blocklist which will prevent them from contacting us if we have "block unknown peers" turned on.
e.authorizations.Delete(ev.Data[event.RemotePeer]) e.authorizations.Delete(ev.Data[event.RemotePeer])
e.deleteConnection(onion) e.deleteConnection(onion)
case event.DisconnectPeerRequest:
e.deleteConnection(ev.Data[event.RemotePeer])
case event.DisconnectServerRequest:
e.leaveServer(ev.Data[event.GroupServer])
case event.SendMessageToGroup: case event.SendMessageToGroup:
ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])

View File

@ -114,9 +114,11 @@ func TestFileSharing(t *testing.T) {
t.Logf("** Waiting for Alice, Bob...") t.Logf("** Waiting for Alice, Bob...")
alice := app2.WaitGetPeer(app, "alice") alice := app2.WaitGetPeer(app, "alice")
app.ActivatePeerEngine(alice.GetOnion()) app.ActivatePeerEngine(alice.GetOnion())
app.ConfigureConnections(alice.GetOnion(), true, true, true);
bob := app2.WaitGetPeer(app, "bob") bob := app2.WaitGetPeer(app, "bob")
app.ActivatePeerEngine(bob.GetOnion()) app.ActivatePeerEngine(bob.GetOnion())
app.ConfigureConnections(bob.GetOnion(), true, true, true);
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer}) alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer})
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer}) bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer})

View File

@ -150,6 +150,12 @@ func TestCwtchPeerIntegration(t *testing.T) {
numGoRoutinesPostAppStart := runtime.NumGoroutine() numGoRoutinesPostAppStart := runtime.NumGoroutine()
// ***** cwtchPeer setup ***** // ***** cwtchPeer setup *****
// Turn on Groups Experiment...
settings := app.ReadSettings()
settings.ExperimentsEnabled = true
settings.Experiments[constants.GroupsExperiment] = true
app.UpdateSettings(settings)
log.Infoln("Creating Alice...") log.Infoln("Creating Alice...")
app.CreateProfile("Alice", "asdfasdf", true) app.CreateProfile("Alice", "asdfasdf", true)
@ -163,6 +169,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
alice := app2.WaitGetPeer(app, "Alice") alice := app2.WaitGetPeer(app, "Alice")
aliceBus := app.GetEventBus(alice.GetOnion()) aliceBus := app.GetEventBus(alice.GetOnion())
app.ActivatePeerEngine(alice.GetOnion()) app.ActivatePeerEngine(alice.GetOnion())
app.ConfigureConnections(alice.GetOnion(), true, true, true);
log.Infoln("Alice created:", alice.GetOnion()) log.Infoln("Alice created:", alice.GetOnion())
// alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity // alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
@ -170,6 +177,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
bob := app2.WaitGetPeer(app, "Bob") bob := app2.WaitGetPeer(app, "Bob")
bobBus := app.GetEventBus(bob.GetOnion()) bobBus := app.GetEventBus(bob.GetOnion())
app.ActivatePeerEngine(bob.GetOnion()) app.ActivatePeerEngine(bob.GetOnion())
app.ConfigureConnections(bob.GetOnion(), true, true, true);
log.Infoln("Bob created:", bob.GetOnion()) log.Infoln("Bob created:", bob.GetOnion())
// bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity // bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
@ -177,6 +185,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
carol := app2.WaitGetPeer(app, "Carol") carol := app2.WaitGetPeer(app, "Carol")
carolBus := app.GetEventBus(carol.GetOnion()) carolBus := app.GetEventBus(carol.GetOnion())
app.ActivatePeerEngine(carol.GetOnion()) app.ActivatePeerEngine(carol.GetOnion())
app.ConfigureConnections(carol.GetOnion(), true, true, true);
log.Infoln("Carol created:", carol.GetOnion()) log.Infoln("Carol created:", carol.GetOnion())
// carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity // carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity
carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})

View File

@ -116,9 +116,10 @@ func TestFileSharing(t *testing.T) {
t.Logf("** Waiting for Alice, Bob...") t.Logf("** Waiting for Alice, Bob...")
alice := app2.WaitGetPeer(app, "alice") alice := app2.WaitGetPeer(app, "alice")
app.ActivatePeerEngine(alice.GetOnion()) app.ActivatePeerEngine(alice.GetOnion())
app.ConfigureConnections(alice.GetOnion(), true, true, true);
bob := app2.WaitGetPeer(app, "bob") bob := app2.WaitGetPeer(app, "bob")
app.ActivatePeerEngine(bob.GetOnion()) app.ActivatePeerEngine(bob.GetOnion())
app.ConfigureConnections(bob.GetOnion(), true, true, true);
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer}) alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer})
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer}) bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer})