forked from cwtch.im/cwtch
Merge pull request 'Support Appear Offline / Disconnect from Server/Peer' (#531) from stable-blockers into master
Reviewed-on: cwtch.im/cwtch#531 Reviewed-by: Dan Ballard <dan@openprivacy.ca>
This commit is contained in:
commit
45d6d76a7d
51
app/app.go
51
app/app.go
|
@ -63,7 +63,7 @@ type Application interface {
|
|||
QueryACNStatus()
|
||||
QueryACNVersion()
|
||||
|
||||
ActivateEngines(doListn, doPeers, doServers bool)
|
||||
ConfigureConnections(onion string, doListn, doPeers, doServers bool)
|
||||
ActivatePeerEngine(onion string)
|
||||
DeactivatePeerEngine(onion string)
|
||||
|
||||
|
@ -385,31 +385,6 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// ActivateEngines launches all peer engines
|
||||
func (app *application) ActivateEngines(doListen, doPeers, doServers bool) {
|
||||
log.Debugf("ActivateEngines")
|
||||
|
||||
for _, profile := range app.peers {
|
||||
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()], app.engineHooks)
|
||||
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated))
|
||||
}
|
||||
app.QueryACNStatus()
|
||||
|
||||
if doListen {
|
||||
for _, profile := range app.peers {
|
||||
log.Debugf(" Listen for %v", profile.GetOnion())
|
||||
profile.Listen()
|
||||
}
|
||||
}
|
||||
|
||||
if doPeers || doServers {
|
||||
for _, profile := range app.peers {
|
||||
log.Debugf(" Start Connections for %v doPeers:%v doServers:%v", profile.GetOnion(), doPeers, doServers)
|
||||
profile.StartConnections(doPeers, doServers)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ActivatePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online
|
||||
func (app *application) ActivatePeerEngine(onion string) {
|
||||
profile := app.GetPeer(onion)
|
||||
|
@ -419,11 +394,21 @@ func (app *application) ActivatePeerEngine(onion string) {
|
|||
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()], app.engineHooks)
|
||||
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated))
|
||||
app.QueryACNStatus()
|
||||
if true {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ConfigureConnections autostarts the given kinds of connections.
|
||||
func (app *application) ConfigureConnections(onion string, listen bool, peers bool, servers bool) {
|
||||
profile := app.GetPeer(onion)
|
||||
if profile != nil {
|
||||
// enable the engine if it doesn't exist...
|
||||
// note: this function is idempotent
|
||||
app.ActivatePeerEngine(onion)
|
||||
if listen {
|
||||
profile.Listen()
|
||||
}
|
||||
profile.StartConnections(true, true)
|
||||
}
|
||||
profile.StartConnections(peers, servers)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -494,10 +479,18 @@ func (app *application) eventHandler() {
|
|||
profile := app.GetPeer(onion)
|
||||
if profile != nil {
|
||||
autostart, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAutostart)
|
||||
appearOffline, appearOfflineExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAppearOffline)
|
||||
if !exists || autostart == "true" {
|
||||
app.ActivatePeerEngine(onion)
|
||||
if appearOfflineExists && appearOffline == "true" {
|
||||
// don't configure any connections...
|
||||
log.Infof("peer appearing offline, not launching listen threads or connecting jobs")
|
||||
} else {
|
||||
app.ConfigureConnections(onion, true, true, true)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -177,6 +177,8 @@ func (cr *contactRetry) run() {
|
|||
cr.bus.Subscribe(event.ServerStateChange, cr.queue)
|
||||
cr.bus.Subscribe(event.QueuePeerRequest, cr.queue)
|
||||
cr.bus.Subscribe(event.QueueJoinServer, cr.queue)
|
||||
cr.bus.Subscribe(event.DisconnectPeerRequest, cr.queue)
|
||||
cr.bus.Subscribe(event.DisconnectServerRequest, cr.queue)
|
||||
cr.bus.Subscribe(event.ProtocolEngineShutdown, cr.queue)
|
||||
cr.bus.Subscribe(event.ProtocolEngineCreated, cr.queue)
|
||||
cr.bus.Subscribe(event.DeleteContact, cr.queue)
|
||||
|
@ -229,6 +231,12 @@ func (cr *contactRetry) run() {
|
|||
select {
|
||||
case e := <-cr.queue.OutChan():
|
||||
switch e.EventType {
|
||||
case event.DisconnectPeerRequest:
|
||||
peer := e.Data[event.RemotePeer]
|
||||
cr.authorizedPeers.Delete(peer)
|
||||
case event.DisconnectServerRequest:
|
||||
peer := e.Data[event.GroupServer]
|
||||
cr.authorizedPeers.Delete(peer)
|
||||
case event.DeleteContact:
|
||||
// this case covers both servers and peers (servers are peers, and go through the
|
||||
// same delete conversation flow)
|
||||
|
|
|
@ -33,13 +33,13 @@ func TestContactRetryQueue(t *testing.T) {
|
|||
// This is the worst part of this test setup. Ideally we would sleep, or some other yielding, but
|
||||
// go test scheduling doesn't like that and even sleeping long periods won't cause the event thread to make
|
||||
// progress...
|
||||
setup := false;
|
||||
setup := false
|
||||
for !setup {
|
||||
if pinf, exists := cr.connections.Load(testOnion); exists {
|
||||
if pinf.(*contact).queued {
|
||||
if _, exists := cr.authorizedPeers.Load(testOnion); exists {
|
||||
t.Logf("authorized")
|
||||
setup = true;
|
||||
setup = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,11 @@ const (
|
|||
// GroupServer
|
||||
QueuePeerRequest = Type("QueuePeerRequest")
|
||||
|
||||
// Disconnect*Request
|
||||
// Close active connections and prevent new connections
|
||||
DisconnectPeerRequest = Type("DisconnectPeerRequest")
|
||||
DisconnectServerRequest = Type("DisconnectServerRequest")
|
||||
|
||||
// RetryServerRequest
|
||||
// Asks CwtchPeer to retry a server connection...
|
||||
// GroupServer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"
|
||||
|
|
|
@ -57,6 +57,7 @@ const SyncMostRecentMessageTime = "SyncMostRecentMessageTime"
|
|||
|
||||
const AttrLastConnectionTime = "last-connection-time"
|
||||
const PeerAutostart = "autostart"
|
||||
const PeerAppearOffline = "appear-offline"
|
||||
const Archived = "archived"
|
||||
|
||||
const ProfileStatus = "profile-status"
|
||||
|
|
|
@ -1042,6 +1042,14 @@ func (cp *cwtchPeer) PeerWithOnion(onion string) {
|
|||
cp.eventBus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: onion, event.LastSeen: lastSeen.Format(time.RFC3339Nano)}))
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) DisconnectFromPeer(onion string) {
|
||||
cp.eventBus.Publish(event.NewEvent(event.DisconnectPeerRequest, map[event.Field]string{event.RemotePeer: onion}))
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) DisconnectFromServer(onion string) {
|
||||
cp.eventBus.Publish(event.NewEvent(event.DisconnectServerRequest, map[event.Field]string{event.GroupServer: onion}))
|
||||
}
|
||||
|
||||
// QueuePeeringWithOnion sends the request to peer with an onion directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests
|
||||
// Status: Ready for 1.10
|
||||
func (cp *cwtchPeer) QueuePeeringWithOnion(handle string) {
|
||||
|
@ -1179,6 +1187,14 @@ func (cp *cwtchPeer) ImportBundle(importString string) error {
|
|||
|
||||
// JoinServer manages a new server connection with the given onion address
|
||||
func (cp *cwtchPeer) JoinServer(onion string) error {
|
||||
|
||||
// only connect to servers if the group experiment is enabled.
|
||||
// note: there are additional checks throughout the app that minimize server interaction
|
||||
// regardless, and we can only reach this point if groups experiment was at one point enabled
|
||||
// TODO: this really belongs in an extension, but for legacy reasons groups are more tightly
|
||||
// integrated into Cwtch. At some point, probably during hybrid groups implementation this
|
||||
// API should be deprecated in favor of one with much stronger protections.
|
||||
if cp.IsFeatureEnabled(constants.GroupsExperiment) {
|
||||
ci, err := cp.FetchConversationInfo(onion)
|
||||
if ci == nil || err != nil {
|
||||
return errors.New("no keys found for server connection")
|
||||
|
@ -1201,6 +1217,8 @@ func (cp *cwtchPeer) JoinServer(onion string) error {
|
|||
return nil
|
||||
}
|
||||
return errors.New("no keys found for server connection")
|
||||
}
|
||||
return errors.New("group experiment is not enabled")
|
||||
}
|
||||
|
||||
// MakeAntispamPayment allows a peer to retrigger antispam, important if the initial connection somehow fails...
|
||||
|
@ -1316,7 +1334,8 @@ func (cp *cwtchPeer) StartConnections(doPeers, doServers bool) {
|
|||
byRecent := cp.getConnectionsSortedByLastSeen(doPeers, doServers)
|
||||
log.Infof("StartConnections for %v", cp.GetOnion())
|
||||
for _, conversation := range byRecent {
|
||||
if conversation.model.IsServer() {
|
||||
// only bother tracking servers if the experiment is enabled...
|
||||
if conversation.model.IsServer() && cp.IsFeatureEnabled(constants.GroupsExperiment) {
|
||||
log.Debugf(" QueueJoinServer(%v)", conversation.model.Handle)
|
||||
cp.QueueJoinServer(conversation.model.Handle)
|
||||
} else {
|
||||
|
|
|
@ -20,7 +20,9 @@ type ModifyPeeringState interface {
|
|||
BlockUnknownConnections()
|
||||
AllowUnknownConnections()
|
||||
PeerWithOnion(string)
|
||||
JoinServer(string) error
|
||||
QueueJoinServer(string)
|
||||
DisconnectFromPeer(string)
|
||||
DisconnectFromServer(string)
|
||||
}
|
||||
|
||||
// ModifyContactsAndPeers is a meta-interface intended to restrict a call to reading and modifying contacts
|
||||
|
|
|
@ -122,6 +122,8 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
|
|||
engine.eventManager.Subscribe(event.UpdateConversationAuthorization, engine.queue)
|
||||
engine.eventManager.Subscribe(event.BlockUnknownPeers, engine.queue)
|
||||
engine.eventManager.Subscribe(event.AllowUnknownPeers, engine.queue)
|
||||
engine.eventManager.Subscribe(event.DisconnectPeerRequest, engine.queue)
|
||||
engine.eventManager.Subscribe(event.DisconnectServerRequest, engine.queue)
|
||||
|
||||
// File Handling
|
||||
engine.eventManager.Subscribe(event.ShareManifest, engine.queue)
|
||||
|
@ -194,6 +196,10 @@ func (e *engine) eventHandler() {
|
|||
// We remove this peer from out blocklist which will prevent them from contacting us if we have "block unknown peers" turned on.
|
||||
e.authorizations.Delete(ev.Data[event.RemotePeer])
|
||||
e.deleteConnection(onion)
|
||||
case event.DisconnectPeerRequest:
|
||||
e.deleteConnection(ev.Data[event.RemotePeer])
|
||||
case event.DisconnectServerRequest:
|
||||
e.leaveServer(ev.Data[event.GroupServer])
|
||||
case event.SendMessageToGroup:
|
||||
ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
|
||||
signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
|
||||
|
|
|
@ -114,8 +114,10 @@ func TestFileSharing(t *testing.T) {
|
|||
t.Logf("** Waiting for Alice, Bob...")
|
||||
alice := app2.WaitGetPeer(app, "alice")
|
||||
app.ActivatePeerEngine(alice.GetOnion())
|
||||
app.ConfigureConnections(alice.GetOnion(), true, true, true);
|
||||
bob := app2.WaitGetPeer(app, "bob")
|
||||
app.ActivatePeerEngine(bob.GetOnion())
|
||||
app.ConfigureConnections(bob.GetOnion(), true, true, true);
|
||||
|
||||
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer})
|
||||
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer})
|
||||
|
|
|
@ -150,6 +150,12 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
numGoRoutinesPostAppStart := runtime.NumGoroutine()
|
||||
|
||||
// ***** cwtchPeer setup *****
|
||||
// Turn on Groups Experiment...
|
||||
settings := app.ReadSettings()
|
||||
settings.ExperimentsEnabled = true
|
||||
settings.Experiments[constants.GroupsExperiment] = true
|
||||
app.UpdateSettings(settings)
|
||||
|
||||
|
||||
log.Infoln("Creating Alice...")
|
||||
app.CreateProfile("Alice", "asdfasdf", true)
|
||||
|
@ -163,6 +169,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
alice := app2.WaitGetPeer(app, "Alice")
|
||||
aliceBus := app.GetEventBus(alice.GetOnion())
|
||||
app.ActivatePeerEngine(alice.GetOnion())
|
||||
app.ConfigureConnections(alice.GetOnion(), true, true, true);
|
||||
log.Infoln("Alice created:", alice.GetOnion())
|
||||
// alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity
|
||||
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
|
||||
|
@ -170,6 +177,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
bob := app2.WaitGetPeer(app, "Bob")
|
||||
bobBus := app.GetEventBus(bob.GetOnion())
|
||||
app.ActivatePeerEngine(bob.GetOnion())
|
||||
app.ConfigureConnections(bob.GetOnion(), true, true, true);
|
||||
log.Infoln("Bob created:", bob.GetOnion())
|
||||
// bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity
|
||||
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
|
||||
|
@ -177,6 +185,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
carol := app2.WaitGetPeer(app, "Carol")
|
||||
carolBus := app.GetEventBus(carol.GetOnion())
|
||||
app.ActivatePeerEngine(carol.GetOnion())
|
||||
app.ConfigureConnections(carol.GetOnion(), true, true, true);
|
||||
log.Infoln("Carol created:", carol.GetOnion())
|
||||
// carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity
|
||||
carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
|
||||
|
|
|
@ -116,9 +116,10 @@ func TestFileSharing(t *testing.T) {
|
|||
t.Logf("** Waiting for Alice, Bob...")
|
||||
alice := app2.WaitGetPeer(app, "alice")
|
||||
app.ActivatePeerEngine(alice.GetOnion())
|
||||
app.ConfigureConnections(alice.GetOnion(), true, true, true);
|
||||
bob := app2.WaitGetPeer(app, "bob")
|
||||
app.ActivatePeerEngine(bob.GetOnion())
|
||||
|
||||
app.ConfigureConnections(bob.GetOnion(), true, true, true);
|
||||
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer})
|
||||
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer})
|
||||
|
||||
|
|
Loading…
Reference in New Issue