Support Appear Offline / Disconnect from Server/Peer
continuous-integration/drone/pr Build was killed Details

This commit is contained in:
Sarah Jamie Lewis 2023-09-13 10:01:00 -07:00
parent 7464e3922d
commit e311301d72
8 changed files with 88 additions and 54 deletions

View File

@ -63,7 +63,7 @@ type Application interface {
QueryACNStatus()
QueryACNVersion()
ActivateEngines(doListn, doPeers, doServers bool)
ConfigureConnections(onion string, doListn, doPeers, doServers bool)
ActivatePeerEngine(onion string)
DeactivatePeerEngine(onion string)
@ -385,31 +385,6 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool {
return false
}
// ActivateEngines launches all peer engines
func (app *application) ActivateEngines(doListen, doPeers, doServers bool) {
log.Debugf("ActivateEngines")
for _, profile := range app.peers {
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()], app.engineHooks)
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated))
}
app.QueryACNStatus()
if doListen {
for _, profile := range app.peers {
log.Debugf(" Listen for %v", profile.GetOnion())
profile.Listen()
}
}
if doPeers || doServers {
for _, profile := range app.peers {
log.Debugf(" Start Connections for %v doPeers:%v doServers:%v", profile.GetOnion(), doPeers, doServers)
profile.StartConnections(doPeers, doServers)
}
}
}
// ActivatePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online
func (app *application) ActivatePeerEngine(onion string) {
profile := app.GetPeer(onion)
@ -419,14 +394,24 @@ func (app *application) ActivatePeerEngine(onion string) {
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()], app.engineHooks)
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated))
app.QueryACNStatus()
if true {
profile.Listen()
}
profile.StartConnections(true, true)
}
}
}
// ConfigureConnections autostarts the given kinds of connections.
func (app *application) ConfigureConnections(onion string, listen bool, peers bool, servers bool) {
profile := app.GetPeer(onion)
if profile != nil {
// enable the engine if it doesn't exist...
// note: this function is idempotent
app.ActivatePeerEngine(onion)
if listen {
profile.Listen()
}
profile.StartConnections(peers, servers)
}
}
// DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline
func (app *application) DeactivatePeerEngine(onion string) {
if engine, exists := app.engines[onion]; exists {
@ -494,9 +479,17 @@ func (app *application) eventHandler() {
profile := app.GetPeer(onion)
if profile != nil {
autostart, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAutostart)
appearOffline, appearOfflineExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAppearOffline)
if !exists || autostart == "true" {
app.ActivatePeerEngine(onion)
if appearOfflineExists && appearOffline == "true" {
// don't configure any connections...
log.Infof("peer appearing offline, not launching listen threads or connecting jobs")
} else {
app.ConfigureConnections(onion, true, true, true)
}
}
}
}
}

View File

@ -177,6 +177,8 @@ func (cr *contactRetry) run() {
cr.bus.Subscribe(event.ServerStateChange, cr.queue)
cr.bus.Subscribe(event.QueuePeerRequest, cr.queue)
cr.bus.Subscribe(event.QueueJoinServer, cr.queue)
cr.bus.Subscribe(event.DisconnectPeerRequest, cr.queue)
cr.bus.Subscribe(event.DisconnectServerRequest, cr.queue)
cr.bus.Subscribe(event.ProtocolEngineShutdown, cr.queue)
cr.bus.Subscribe(event.ProtocolEngineCreated, cr.queue)
cr.bus.Subscribe(event.DeleteContact, cr.queue)
@ -229,6 +231,12 @@ func (cr *contactRetry) run() {
select {
case e := <-cr.queue.OutChan():
switch e.EventType {
case event.DisconnectPeerRequest:
peer := e.Data[event.RemotePeer]
cr.authorizedPeers.Delete(peer)
case event.DisconnectServerRequest:
peer := e.Data[event.GroupServer]
cr.authorizedPeers.Delete(peer)
case event.DeleteContact:
// this case covers both servers and peers (servers are peers, and go through the
// same delete conversation flow)

View File

@ -33,13 +33,13 @@ func TestContactRetryQueue(t *testing.T) {
// This is the worst part of this test setup. Ideally we would sleep, or some other yielding, but
// go test scheduling doesn't like that and even sleeping long periods won't cause the event thread to make
// progress...
setup := false;
setup := false
for !setup {
if pinf, exists := cr.connections.Load(testOnion); exists {
if pinf.(*contact).queued {
if _, exists := cr.authorizedPeers.Load(testOnion); exists {
t.Logf("authorized")
setup = true;
setup = true
}
}
}

View File

@ -25,6 +25,11 @@ const (
// GroupServer
QueuePeerRequest = Type("QueuePeerRequest")
// Disconnect*Request
// Close active connections and prevent new connections
DisconnectPeerRequest = Type("DisconnectPeerRequest")
DisconnectServerRequest = Type("DisconnectServerRequest")
// RetryServerRequest
// Asks CwtchPeer to retry a server connection...
// GroupServer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"

View File

@ -57,6 +57,7 @@ const SyncMostRecentMessageTime = "SyncMostRecentMessageTime"
const AttrLastConnectionTime = "last-connection-time"
const PeerAutostart = "autostart"
const PeerAppearOffline = "appear-offline"
const Archived = "archived"
const ProfileStatus = "profile-status"

View File

@ -1042,6 +1042,14 @@ func (cp *cwtchPeer) PeerWithOnion(onion string) {
cp.eventBus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: onion, event.LastSeen: lastSeen.Format(time.RFC3339Nano)}))
}
func (cp *cwtchPeer) DisconnectFromPeer(onion string) {
cp.eventBus.Publish(event.NewEvent(event.DisconnectPeerRequest, map[event.Field]string{event.RemotePeer: onion}))
}
func (cp *cwtchPeer) DisconnectFromServer(onion string) {
cp.eventBus.Publish(event.NewEvent(event.DisconnectServerRequest, map[event.Field]string{event.GroupServer: onion}))
}
// QueuePeeringWithOnion sends the request to peer with an onion directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests
// Status: Ready for 1.10
func (cp *cwtchPeer) QueuePeeringWithOnion(handle string) {
@ -1179,28 +1187,38 @@ func (cp *cwtchPeer) ImportBundle(importString string) error {
// JoinServer manages a new server connection with the given onion address
func (cp *cwtchPeer) JoinServer(onion string) error {
ci, err := cp.FetchConversationInfo(onion)
if ci == nil || err != nil {
// only connect to servers if the group experiment is enabled.
// note: there are additional checks throughout the app that minimize server interaction
// regardless, and we can only reach this point if groups experiment was at one point enabled
// TODO: this really belongs in an extension, but for legacy reasons groups are more tightly
// integrated into Cwtch. At some point, probably during hybrid groups implementation this
// API should be deprecated in favor of one with much stronger protections.
if cp.IsFeatureEnabled(constants.GroupsExperiment) {
ci, err := cp.FetchConversationInfo(onion)
if ci == nil || err != nil {
return errors.New("no keys found for server connection")
}
//if cp.GetContact(onion) != nil {
tokenY, yExists := ci.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.KeyTypePrivacyPass))).ToString()]
tokenOnion, onionExists := ci.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.KeyTypeTokenOnion))).ToString()]
if yExists && onionExists {
signature, exists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(lastReceivedSignature)).ToString()]
if !exists {
signature = base64.StdEncoding.EncodeToString([]byte{})
}
cachedTokensJson, hasCachedTokens := ci.GetAttribute(attr.LocalScope, attr.ServerZone, "tokens")
if hasCachedTokens {
log.Debugf("using cached tokens for %v", ci.Handle)
}
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature, event.CachedTokens: cachedTokensJson}))
return nil
}
return errors.New("no keys found for server connection")
}
//if cp.GetContact(onion) != nil {
tokenY, yExists := ci.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.KeyTypePrivacyPass))).ToString()]
tokenOnion, onionExists := ci.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.KeyTypeTokenOnion))).ToString()]
if yExists && onionExists {
signature, exists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(lastReceivedSignature)).ToString()]
if !exists {
signature = base64.StdEncoding.EncodeToString([]byte{})
}
cachedTokensJson, hasCachedTokens := ci.GetAttribute(attr.LocalScope, attr.ServerZone, "tokens")
if hasCachedTokens {
log.Debugf("using cached tokens for %v", ci.Handle)
}
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature, event.CachedTokens: cachedTokensJson}))
return nil
}
return errors.New("no keys found for server connection")
return errors.New("group experiment is not enabled")
}
// MakeAntispamPayment allows a peer to retrigger antispam, important if the initial connection somehow fails...
@ -1316,7 +1334,8 @@ func (cp *cwtchPeer) StartConnections(doPeers, doServers bool) {
byRecent := cp.getConnectionsSortedByLastSeen(doPeers, doServers)
log.Infof("StartConnections for %v", cp.GetOnion())
for _, conversation := range byRecent {
if conversation.model.IsServer() {
// only bother tracking servers if the experiment is enabled...
if conversation.model.IsServer() && cp.IsFeatureEnabled(constants.GroupsExperiment) {
log.Debugf(" QueueJoinServer(%v)", conversation.model.Handle)
cp.QueueJoinServer(conversation.model.Handle)
} else {

View File

@ -20,7 +20,9 @@ type ModifyPeeringState interface {
BlockUnknownConnections()
AllowUnknownConnections()
PeerWithOnion(string)
JoinServer(string) error
QueueJoinServer(string)
DisconnectFromPeer(string)
DisconnectFromServer(string)
}
// ModifyContactsAndPeers is a meta-interface intended to restrict a call to reading and modifying contacts

View File

@ -122,6 +122,8 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
engine.eventManager.Subscribe(event.UpdateConversationAuthorization, engine.queue)
engine.eventManager.Subscribe(event.BlockUnknownPeers, engine.queue)
engine.eventManager.Subscribe(event.AllowUnknownPeers, engine.queue)
engine.eventManager.Subscribe(event.DisconnectPeerRequest, engine.queue)
engine.eventManager.Subscribe(event.DisconnectServerRequest, engine.queue)
// File Handling
engine.eventManager.Subscribe(event.ShareManifest, engine.queue)
@ -194,6 +196,10 @@ func (e *engine) eventHandler() {
// We remove this peer from out blocklist which will prevent them from contacting us if we have "block unknown peers" turned on.
e.authorizations.Delete(ev.Data[event.RemotePeer])
e.deleteConnection(onion)
case event.DisconnectPeerRequest:
e.deleteConnection(ev.Data[event.RemotePeer])
case event.DisconnectServerRequest:
e.leaveServer(ev.Data[event.GroupServer])
case event.SendMessageToGroup:
ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])