From d455eb64771b9741010b83612fb7c25829d572c0 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Sat, 10 Sep 2022 10:15:32 -0700 Subject: [PATCH 1/4] Fix Issues with Antispam triggering / Add explicit timeout calls for group servers / token aquisition and optimistic closing for peers --- app/app.go | 6 +-- app/plugins/antispam.go | 43 +++++++++++++++++++++ app/plugins/plugin.go | 3 ++ event/common.go | 4 +- peer/cwtch_peer.go | 27 ++++++++++++- protocol/connections/engine.go | 30 ++++++++++++++ protocol/connections/peerapp.go | 2 + protocol/connections/tokenboardclientapp.go | 40 ++++++++++--------- 8 files changed, 130 insertions(+), 25 deletions(-) create mode 100644 app/plugins/antispam.go diff --git a/app/app.go b/app/app.go index 2d403d3..3e3f9d4 100644 --- a/app/app.go +++ b/app/app.go @@ -241,7 +241,7 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool { return false } -/// ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online +// / ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online func (app *application) ActivePeerEngine(onion string, doListen, doPeers, doServers bool) { profile := app.GetPeer(onion) if profile != nil { @@ -258,7 +258,7 @@ func (app *application) ActivePeerEngine(onion string, doListen, doPeers, doServ } } -/// DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline +// / DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline func (app *application) DeactivatePeerEngine(onion string) { if engine, exists := app.engines[onion]; exists { engine.Shutdown() @@ -316,7 +316,7 @@ func (app *application) ShutdownPeer(onion string) { app.shutdownPeer(onion) } -/// shutdownPeer mutex unlocked helper shutdown peer +// / shutdownPeer mutex unlocked helper shutdown peer func (app *application) shutdownPeer(onion string) { app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion)) app.eventBuses[onion].Shutdown() diff --git a/app/plugins/antispam.go b/app/plugins/antispam.go new file mode 100644 index 0000000..467137d --- /dev/null +++ b/app/plugins/antispam.go @@ -0,0 +1,43 @@ +package plugins + +import ( + "cwtch.im/cwtch/event" + "git.openprivacy.ca/openprivacy/log" + "time" +) + +const antispamTickTime = 30 * time.Second + +type antispam struct { + bus event.Manager + queue event.Queue + breakChan chan bool +} + +func (a *antispam) Start() { + go a.run() +} + +func (a antispam) Shutdown() { + a.breakChan <- true +} + +func (a *antispam) run() { + log.Infof("running antispam trigger plugin") + for { + select { + case <-time.After(antispamTickTime): + // no fuss, just trigger the check. Downstream will filter out superfluous actions + a.bus.Publish(event.NewEvent(event.TriggerAntispamCheck, map[event.Field]string{})) + continue + case <-a.breakChan: + return + } + } +} + +// NewAntiSpam returns a Plugin that when started will trigger antispam payments on a regular interval +func NewAntiSpam(bus event.Manager) Plugin { + cr := &antispam{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1)} + return cr +} diff --git a/app/plugins/plugin.go b/app/plugins/plugin.go index f5a9915..7ebf773 100644 --- a/app/plugins/plugin.go +++ b/app/plugins/plugin.go @@ -13,6 +13,7 @@ type PluginID int const ( CONNECTIONRETRY PluginID = iota NETWORKCHECK + ANTISPAM ) // Plugin is the interface for a plugin @@ -28,6 +29,8 @@ func Get(id PluginID, bus event.Manager, acn connectivity.ACN, onion string) (Pl return NewConnectionRetry(bus, onion), nil case NETWORKCHECK: return NewNetworkCheck(onion, bus, acn), nil + case ANTISPAM: + return NewAntiSpam(bus), nil } return nil, fmt.Errorf("plugin not defined %v", id) diff --git a/event/common.go b/event/common.go index 9fa02bf..9b9f975 100644 --- a/event/common.go +++ b/event/common.go @@ -201,7 +201,9 @@ const ( StartingStorageMiragtion = Type("StartingStorageMigration") DoneStorageMigration = Type("DoneStorageMigration") - TokenManagerInfo = Type("TokenManagerInfo") + TokenManagerInfo = Type("TokenManagerInfo") + TriggerAntispamCheck = Type("TriggerAntispamCheck") + MakeAntispamPayment = Type("MakeAntispamPayment") ) // Field defines common event attributes diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 03b45fb..5d8771f 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -14,6 +14,7 @@ import ( "git.openprivacy.ca/openprivacy/connectivity/tor" "golang.org/x/crypto/ed25519" "math/bits" + rand2 "math/rand" "os" path "path/filepath" "runtime" @@ -37,7 +38,7 @@ var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeerEngine: true, event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToPeerError: true, event.SendMessageToGroupError: true, event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true, - event.ManifestSizeReceived: true, event.ManifestReceived: true, event.FileDownloaded: true} + event.ManifestSizeReceived: true, event.ManifestReceived: true, event.FileDownloaded: true, event.TriggerAntispamCheck: true} // DefaultEventsToHandle specifies which events will be subscribed to // @@ -58,6 +59,7 @@ var DefaultEventsToHandle = []event.Type{ event.NewRetValMessageFromPeer, event.ManifestReceived, event.FileDownloaded, + event.TriggerAntispamCheck, } // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer @@ -783,7 +785,13 @@ func (cp *cwtchPeer) GetPeerState(handle string) connections.ConnectionState { // PeerWithOnion initiates a request to the Protocol Engine to set up Cwtch Session with a given tor v3 onion // address. func (cp *cwtchPeer) PeerWithOnion(onion string) { - cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion})) + go func() { + // wait a random number of seconds before triggering + // this cuts down on contention in the event + randWait := time.Duration(rand2.Int() % 60) + time.Sleep(randWait * time.Second) + cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion})) + }() } // SendInviteToConversation kicks off the invite process @@ -918,6 +926,14 @@ func (cp *cwtchPeer) JoinServer(onion string) error { return errors.New("no keys found for server connection") } +// MakeAntispamPayment allows a peer to retrigger antispam, important if the initial connection somehow fails... +// TODO in the future we might want to expose this in CwtchPeer interface +// Additionally we may want to add extra checks here to deduplicate groups from tokenservers to cut down +// on the number of events (right now it should be minimal) +func (cp *cwtchPeer) MakeAntispamPayment(server string) { + cp.eventBus.Publish(event.NewEvent(event.MakeAntispamPayment, map[event.Field]string{event.GroupServer: server})) +} + // ResyncServer completely tears down and resyncs a new server connection with the given handle func (cp *cwtchPeer) ResyncServer(handle string) error { ci, err := cp.FetchConversationInfo(handle) @@ -1305,6 +1321,13 @@ func (cp *cwtchPeer) eventHandler() { cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), mostRecentTime.Format(time.RFC3339Nano)) } } + case event.TriggerAntispamCheck: + conversations, _ := cp.FetchConversations() + for _, conversation := range conversations { + if conversation.IsServer() { + cp.MakeAntispamPayment(conversation.Handle) + } + } default: if ev.EventType != "" { log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType) diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index fc29319..0d1f8d3 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -116,6 +116,9 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK engine.eventManager.Subscribe(event.ManifestSizeReceived, engine.queue) engine.eventManager.Subscribe(event.ManifestSaved, engine.queue) + // Token Server + engine.eventManager.Subscribe(event.MakeAntispamPayment, engine.queue) + for peer, authorization := range peerAuthorizations { engine.authorizations.Store(peer, authorization) } @@ -160,6 +163,8 @@ func (e *engine) eventHandler() { signature = []byte{} } go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature) + case event.MakeAntispamPayment: + go e.makeAntispamPayment(ev.Data[event.GroupServer]) case event.LeaveServer: e.leaveServer(ev.Data[event.GroupServer]) case event.DeleteContact: @@ -335,6 +340,31 @@ func (e *engine) peerWithOnion(onion string) { } } +func (e *engine) makeAntispamPayment(onion string) { + log.Debugf("making antispam payment") + e.ephemeralServicesLock.Lock() + ephemeralService, ok := e.ephemeralServices[onion] + e.ephemeralServicesLock.Unlock() + + if ephemeralService == nil || !ok { + log.Debugf("could not find associated group for antispam payment") + return + } + + conn, err := ephemeralService.service.GetConnection(onion) + if err == nil { + tokenApp, ok := (conn.App()).(*TokenBoardClient) + if ok { + tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, new(TokenManager)) + tokenManager := tokenManagerPointer.(*TokenManager) + log.Debugf("checking antispam tokens %v", tokenManager.NumTokens()) + if tokenManager.NumTokens() < 5 { + go tokenApp.MakePayment() + } + } + } +} + // peerWithTokenServer is the entry point for cwtchPeer - server relationships // needs to be run in a goroutine as will block on Open. func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) { diff --git a/protocol/connections/peerapp.go b/protocol/connections/peerapp.go index dc73290..edd7932 100644 --- a/protocol/connections/peerapp.go +++ b/protocol/connections/peerapp.go @@ -86,6 +86,8 @@ func (pa *PeerApp) Init(connection tapir.Connection) { } } else { // The auth protocol wasn't completed, we can safely shutdown the connection + // send an onclose here because we *may* have triggered this and we want to retry later... + pa.OnClose(connection.Hostname()) connection.Close() } } diff --git a/protocol/connections/tokenboardclientapp.go b/protocol/connections/tokenboardclientapp.go index 8a12f23..0bbc59a 100644 --- a/protocol/connections/tokenboardclientapp.go +++ b/protocol/connections/tokenboardclientapp.go @@ -2,8 +2,8 @@ package connections import ( "cwtch.im/cwtch/protocol/groups" + "cwtch.im/cwtch/utils" "encoding/json" - "errors" "git.openprivacy.ca/cwtch.im/tapir" "git.openprivacy.ca/cwtch.im/tapir/applications" "git.openprivacy.ca/cwtch.im/tapir/networks/tor" @@ -191,28 +191,30 @@ func (ta *TokenBoardClient) MakePayment() error { connected, err := client.Connect(ta.tokenServiceOnion, powTokenApp) if connected && err == nil { log.Debugf("Waiting for successful Token Acquisition...") - conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability) - if err == nil { - powtapp, ok := conn.App().(*applications.TokenApplication) - if ok { - log.Debugf("Updating Tokens") - ta.tokenBoardHandler.NewTokenHandler(ta.tokenServiceOnion, powtapp.Tokens) - log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit()) - conn.Close() + tp := utils.TimeoutPolicy(time.Second * 30) + err := tp.ExecuteAction(func() error { + conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability) + if err == nil { + powtapp, ok := conn.App().(*applications.TokenApplication) + if ok { + log.Debugf("Updating Tokens") + ta.tokenBoardHandler.NewTokenHandler(ta.tokenServiceOnion, powtapp.Tokens) + log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit()) + conn.Close() + return nil + } + log.Errorf("invalid cast of powapp. this should never happen %v %v", powtapp, reflect.TypeOf(conn.App())) return nil } - log.Errorf("invalid cast of powapp. this should never happen %v %v", powtapp, reflect.TypeOf(conn.App())) - return errors.New("invalid cast of powapp. this should never happen") + return nil + }) + + // we timed out + if err != nil { + ta.connection.Close() } - log.Debugf("could not connect to payment server..trying again: %v", err) - return ta.MakePayment() - } else if connected && err != nil { - log.Debugf("inexplicable error: %v", err) } - log.Debugf("failed to make a connection. trying again...") - // it doesn't actually take that long to make a payment, so waiting a small amount of time should suffice - time.Sleep(time.Second) - return ta.MakePayment() + return err } // NextToken retrieves the next token From 27cec93ad75a5717c9ffb573118b77487c8b2077 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Sat, 10 Sep 2022 10:33:17 -0700 Subject: [PATCH 2/4] Adjust contact retry --- app/plugins/contactRetry.go | 240 ++++++++++++++++++------------------ 1 file changed, 120 insertions(+), 120 deletions(-) diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index b4274ab..0cdeaa9 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -1,166 +1,166 @@ package plugins import ( - "cwtch.im/cwtch/event" - "cwtch.im/cwtch/protocol/connections" - "git.openprivacy.ca/openprivacy/log" - "sync" - "time" + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/protocol/connections" + "git.openprivacy.ca/openprivacy/log" + "sync" + "time" ) -const tickTime = 5 * time.Second -const maxBackoff int = 64 // 320 seconds or ~5 min +const tickTime = 30 * time.Second +const maxBackoff int = 10 // 320 seconds or ~5 min type connectionType int const ( - peerConn connectionType = iota - serverConn + peerConn connectionType = iota + serverConn ) type contact struct { - id string - state connections.ConnectionState - ctype connectionType + id string + state connections.ConnectionState + ctype connectionType - ticks int - backoff int + ticks int + backoff int } type contactRetry struct { - bus event.Manager - queue event.Queue - networkUp bool - running bool - breakChan chan bool - onion string - lastCheck time.Time + bus event.Manager + queue event.Queue + networkUp bool + running bool + breakChan chan bool + onion string + lastCheck time.Time - connections sync.Map //[string]*contact + connections sync.Map //[string]*contact } // NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing func NewConnectionRetry(bus event.Manager, onion string) Plugin { - cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, networkUp: false, onion: onion} - return cr + cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, networkUp: false, onion: onion} + return cr } func (cr *contactRetry) Start() { - if !cr.running { - go cr.run() - } else { - log.Errorf("Attempted to start Contact Retry plugin twice for %v", cr.onion) - } + if !cr.running { + go cr.run() + } else { + log.Errorf("Attempted to start Contact Retry plugin twice for %v", cr.onion) + } } func (cr *contactRetry) run() { - cr.running = true - cr.bus.Subscribe(event.PeerStateChange, cr.queue) - cr.bus.Subscribe(event.ACNStatus, cr.queue) - cr.bus.Subscribe(event.ServerStateChange, cr.queue) + cr.running = true + cr.bus.Subscribe(event.PeerStateChange, cr.queue) + cr.bus.Subscribe(event.ACNStatus, cr.queue) + cr.bus.Subscribe(event.ServerStateChange, cr.queue) - for { - if time.Since(cr.lastCheck) > tickTime { - cr.retryDisconnected() - cr.lastCheck = time.Now() - } - select { - case e := <-cr.queue.OutChan(): - switch e.EventType { - case event.PeerStateChange: - state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]] - peer := e.Data[event.RemotePeer] - cr.handleEvent(peer, state, peerConn) + for { + if time.Since(cr.lastCheck) > tickTime { + cr.retryDisconnected() + cr.lastCheck = time.Now() + } + select { + case e := <-cr.queue.OutChan(): + switch e.EventType { + case event.PeerStateChange: + state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]] + peer := e.Data[event.RemotePeer] + cr.handleEvent(peer, state, peerConn) - case event.ServerStateChange: - state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]] - server := e.Data[event.GroupServer] - cr.handleEvent(server, state, serverConn) + case event.ServerStateChange: + state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]] + server := e.Data[event.GroupServer] + cr.handleEvent(server, state, serverConn) - case event.ACNStatus: - prog := e.Data[event.Progress] - if prog == "100" && !cr.networkUp { - cr.networkUp = true - cr.connections.Range(func(k, v interface{}) bool { - p := v.(*contact) - p.ticks = 0 - p.backoff = 1 - if p.ctype == peerConn { - cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) - } - if p.ctype == serverConn { - cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id})) - } - return true - }) - } else if prog != "100" { - cr.networkUp = false - } - } + case event.ACNStatus: + prog := e.Data[event.Progress] + if prog == "100" && !cr.networkUp { + cr.networkUp = true + cr.connections.Range(func(k, v interface{}) bool { + p := v.(*contact) + p.ticks = 0 + p.backoff = 1 + if p.ctype == peerConn { + cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) + } + if p.ctype == serverConn { + cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id})) + } + return true + }) + } else if prog != "100" { + cr.networkUp = false + } + } - case <-time.After(tickTime): - continue + case <-time.After(tickTime): + continue - case <-cr.breakChan: - cr.running = false - return - } - } + case <-cr.breakChan: + cr.running = false + return + } + } } func (cr *contactRetry) retryDisconnected() { - cr.connections.Range(func(k, v interface{}) bool { - p := v.(*contact) + cr.connections.Range(func(k, v interface{}) bool { + p := v.(*contact) - if p.state == connections.DISCONNECTED { - p.ticks++ - if p.ticks >= p.backoff { - p.ticks = 0 - if cr.networkUp { - if p.ctype == peerConn { - cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) - } - if p.ctype == serverConn { - cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id})) - } - } - } - } - return true - }) + if p.state == connections.DISCONNECTED { + p.ticks++ + if p.ticks >= p.backoff { + p.ticks = 0 + if cr.networkUp { + if p.ctype == peerConn { + cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) + } + if p.ctype == serverConn { + cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id})) + } + } + } + } + return true + }) } func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) { - // don't handle contact retries for ourselves - if id == cr.onion { - return - } + // don't handle contact retries for ourselves + if id == cr.onion { + return + } - if _, exists := cr.connections.Load(id); !exists { - p := &contact{id: id, state: state, backoff: 0, ticks: 0, ctype: ctype} - cr.connections.Store(id, p) - return - } + if _, exists := cr.connections.Load(id); !exists { + p := &contact{id: id, state: state, backoff: 0, ticks: 0, ctype: ctype} + cr.connections.Store(id, p) + return + } - pinf, _ := cr.connections.Load(id) - p := pinf.(*contact) - if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED { - p.state = connections.DISCONNECTED - if p.backoff == 0 { - p.backoff = 1 - } else if p.backoff < maxBackoff { - p.backoff *= 2 - } - p.ticks = 0 - } else if state == connections.CONNECTING || state == connections.CONNECTED { - p.state = state - } else if state == connections.AUTHENTICATED { - p.state = state - p.backoff = 0 - } + pinf, _ := cr.connections.Load(id) + p := pinf.(*contact) + if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED { + p.state = connections.DISCONNECTED + if p.backoff == 0 { + p.backoff = 1 + } else if p.backoff < maxBackoff { + p.backoff *= 2 + } + p.ticks = 0 + } else if state == connections.CONNECTING || state == connections.CONNECTED { + p.state = state + } else if state == connections.AUTHENTICATED { + p.state = state + p.backoff = 0 + } } func (cr *contactRetry) Shutdown() { - cr.breakChan <- true + cr.breakChan <- true } From 0f4c6de2e65b46ec60519e094cbefeb5b4309699 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Sat, 10 Sep 2022 10:34:36 -0700 Subject: [PATCH 3/4] quality --- app/plugins/contactRetry.go | 236 ++++++++++++++++++------------------ 1 file changed, 118 insertions(+), 118 deletions(-) diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index 0cdeaa9..08d7211 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -1,11 +1,11 @@ package plugins import ( - "cwtch.im/cwtch/event" - "cwtch.im/cwtch/protocol/connections" - "git.openprivacy.ca/openprivacy/log" - "sync" - "time" + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/protocol/connections" + "git.openprivacy.ca/openprivacy/log" + "sync" + "time" ) const tickTime = 30 * time.Second @@ -14,153 +14,153 @@ const maxBackoff int = 10 // 320 seconds or ~5 min type connectionType int const ( - peerConn connectionType = iota - serverConn + peerConn connectionType = iota + serverConn ) type contact struct { - id string - state connections.ConnectionState - ctype connectionType + id string + state connections.ConnectionState + ctype connectionType - ticks int - backoff int + ticks int + backoff int } type contactRetry struct { - bus event.Manager - queue event.Queue - networkUp bool - running bool - breakChan chan bool - onion string - lastCheck time.Time + bus event.Manager + queue event.Queue + networkUp bool + running bool + breakChan chan bool + onion string + lastCheck time.Time - connections sync.Map //[string]*contact + connections sync.Map //[string]*contact } // NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing func NewConnectionRetry(bus event.Manager, onion string) Plugin { - cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, networkUp: false, onion: onion} - return cr + cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, networkUp: false, onion: onion} + return cr } func (cr *contactRetry) Start() { - if !cr.running { - go cr.run() - } else { - log.Errorf("Attempted to start Contact Retry plugin twice for %v", cr.onion) - } + if !cr.running { + go cr.run() + } else { + log.Errorf("Attempted to start Contact Retry plugin twice for %v", cr.onion) + } } func (cr *contactRetry) run() { - cr.running = true - cr.bus.Subscribe(event.PeerStateChange, cr.queue) - cr.bus.Subscribe(event.ACNStatus, cr.queue) - cr.bus.Subscribe(event.ServerStateChange, cr.queue) + cr.running = true + cr.bus.Subscribe(event.PeerStateChange, cr.queue) + cr.bus.Subscribe(event.ACNStatus, cr.queue) + cr.bus.Subscribe(event.ServerStateChange, cr.queue) - for { - if time.Since(cr.lastCheck) > tickTime { - cr.retryDisconnected() - cr.lastCheck = time.Now() - } - select { - case e := <-cr.queue.OutChan(): - switch e.EventType { - case event.PeerStateChange: - state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]] - peer := e.Data[event.RemotePeer] - cr.handleEvent(peer, state, peerConn) + for { + if time.Since(cr.lastCheck) > tickTime { + cr.retryDisconnected() + cr.lastCheck = time.Now() + } + select { + case e := <-cr.queue.OutChan(): + switch e.EventType { + case event.PeerStateChange: + state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]] + peer := e.Data[event.RemotePeer] + cr.handleEvent(peer, state, peerConn) - case event.ServerStateChange: - state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]] - server := e.Data[event.GroupServer] - cr.handleEvent(server, state, serverConn) + case event.ServerStateChange: + state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]] + server := e.Data[event.GroupServer] + cr.handleEvent(server, state, serverConn) - case event.ACNStatus: - prog := e.Data[event.Progress] - if prog == "100" && !cr.networkUp { - cr.networkUp = true - cr.connections.Range(func(k, v interface{}) bool { - p := v.(*contact) - p.ticks = 0 - p.backoff = 1 - if p.ctype == peerConn { - cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) - } - if p.ctype == serverConn { - cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id})) - } - return true - }) - } else if prog != "100" { - cr.networkUp = false - } - } + case event.ACNStatus: + prog := e.Data[event.Progress] + if prog == "100" && !cr.networkUp { + cr.networkUp = true + cr.connections.Range(func(k, v interface{}) bool { + p := v.(*contact) + p.ticks = 0 + p.backoff = 1 + if p.ctype == peerConn { + cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) + } + if p.ctype == serverConn { + cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id})) + } + return true + }) + } else if prog != "100" { + cr.networkUp = false + } + } - case <-time.After(tickTime): - continue + case <-time.After(tickTime): + continue - case <-cr.breakChan: - cr.running = false - return - } - } + case <-cr.breakChan: + cr.running = false + return + } + } } func (cr *contactRetry) retryDisconnected() { - cr.connections.Range(func(k, v interface{}) bool { - p := v.(*contact) + cr.connections.Range(func(k, v interface{}) bool { + p := v.(*contact) - if p.state == connections.DISCONNECTED { - p.ticks++ - if p.ticks >= p.backoff { - p.ticks = 0 - if cr.networkUp { - if p.ctype == peerConn { - cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) - } - if p.ctype == serverConn { - cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id})) - } - } - } - } - return true - }) + if p.state == connections.DISCONNECTED { + p.ticks++ + if p.ticks >= p.backoff { + p.ticks = 0 + if cr.networkUp { + if p.ctype == peerConn { + cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) + } + if p.ctype == serverConn { + cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id})) + } + } + } + } + return true + }) } func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) { - // don't handle contact retries for ourselves - if id == cr.onion { - return - } + // don't handle contact retries for ourselves + if id == cr.onion { + return + } - if _, exists := cr.connections.Load(id); !exists { - p := &contact{id: id, state: state, backoff: 0, ticks: 0, ctype: ctype} - cr.connections.Store(id, p) - return - } + if _, exists := cr.connections.Load(id); !exists { + p := &contact{id: id, state: state, backoff: 0, ticks: 0, ctype: ctype} + cr.connections.Store(id, p) + return + } - pinf, _ := cr.connections.Load(id) - p := pinf.(*contact) - if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED { - p.state = connections.DISCONNECTED - if p.backoff == 0 { - p.backoff = 1 - } else if p.backoff < maxBackoff { - p.backoff *= 2 - } - p.ticks = 0 - } else if state == connections.CONNECTING || state == connections.CONNECTED { - p.state = state - } else if state == connections.AUTHENTICATED { - p.state = state - p.backoff = 0 - } + pinf, _ := cr.connections.Load(id) + p := pinf.(*contact) + if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED { + p.state = connections.DISCONNECTED + if p.backoff == 0 { + p.backoff = 1 + } else if p.backoff < maxBackoff { + p.backoff *= 2 + } + p.ticks = 0 + } else if state == connections.CONNECTING || state == connections.CONNECTED { + p.state = state + } else if state == connections.AUTHENTICATED { + p.state = state + p.backoff = 0 + } } func (cr *contactRetry) Shutdown() { - cr.breakChan <- true + cr.breakChan <- true } From 8d2134c4db6c8682ab43deb2a33906588972893f Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Sat, 10 Sep 2022 10:36:28 -0700 Subject: [PATCH 4/4] fix comments --- app/app.go | 486 ++++++++++++++++++++++++++--------------------------- 1 file changed, 243 insertions(+), 243 deletions(-) diff --git a/app/app.go b/app/app.go index 3e3f9d4..a4d9cf1 100644 --- a/app/app.go +++ b/app/app.go @@ -1,57 +1,57 @@ package app import ( - "cwtch.im/cwtch/app/plugins" - "cwtch.im/cwtch/event" - "cwtch.im/cwtch/model" - "cwtch.im/cwtch/model/attr" - "cwtch.im/cwtch/model/constants" - "cwtch.im/cwtch/peer" - "cwtch.im/cwtch/protocol/connections" - "cwtch.im/cwtch/storage" - "git.openprivacy.ca/openprivacy/connectivity" - "git.openprivacy.ca/openprivacy/log" - "os" - path "path/filepath" - "strconv" - "sync" + "cwtch.im/cwtch/app/plugins" + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/model" + "cwtch.im/cwtch/model/attr" + "cwtch.im/cwtch/model/constants" + "cwtch.im/cwtch/peer" + "cwtch.im/cwtch/protocol/connections" + "cwtch.im/cwtch/storage" + "git.openprivacy.ca/openprivacy/connectivity" + "git.openprivacy.ca/openprivacy/log" + "os" + path "path/filepath" + "strconv" + "sync" ) type application struct { - eventBuses map[string]event.Manager - directory string + eventBuses map[string]event.Manager + directory string - peerLock sync.Mutex - peers map[string]peer.CwtchPeer - acn connectivity.ACN - plugins sync.Map //map[string] []plugins.Plugin + peerLock sync.Mutex + peers map[string]peer.CwtchPeer + acn connectivity.ACN + plugins sync.Map //map[string] []plugins.Plugin - engines map[string]connections.Engine - appBus event.Manager - appmutex sync.Mutex + engines map[string]connections.Engine + appBus event.Manager + appmutex sync.Mutex } // Application is a full cwtch peer application. It allows management, usage and storage of multiple peers type Application interface { - LoadProfiles(password string) - CreateTaggedPeer(name string, password string, tag string) - ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error) - DeletePeer(onion string, currentPassword string) - AddPeerPlugin(onion string, pluginID plugins.PluginID) + LoadProfiles(password string) + CreateTaggedPeer(name string, password string, tag string) + ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error) + DeletePeer(onion string, currentPassword string) + AddPeerPlugin(onion string, pluginID plugins.PluginID) - GetPrimaryBus() event.Manager - GetEventBus(onion string) event.Manager - QueryACNStatus() - QueryACNVersion() + GetPrimaryBus() event.Manager + GetEventBus(onion string) event.Manager + QueryACNStatus() + QueryACNVersion() - ActivePeerEngine(onion string, doListen, doPeers, doServers bool) - DeactivatePeerEngine(onion string) + ActivePeerEngine(onion string, doListen, doPeers, doServers bool) + DeactivatePeerEngine(onion string) - ShutdownPeer(string) - Shutdown() + ShutdownPeer(string) + Shutdown() - GetPeer(onion string) peer.CwtchPeer - ListProfiles() []string + GetPeer(onion string) peer.CwtchPeer + ListProfiles() []string } // LoadProfileFn is the function signature for a function in an app that loads a profile @@ -59,295 +59,295 @@ type LoadProfileFn func(profile peer.CwtchPeer) // NewApp creates a new app with some environment awareness and initializes a Tor Manager func NewApp(acn connectivity.ACN, appDirectory string) Application { - log.Debugf("NewApp(%v)\n", appDirectory) - os.MkdirAll(path.Join(appDirectory, "profiles"), 0700) + log.Debugf("NewApp(%v)\n", appDirectory) + os.MkdirAll(path.Join(appDirectory, "profiles"), 0700) - app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager()} - app.peers = make(map[string]peer.CwtchPeer) + app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager()} + app.peers = make(map[string]peer.CwtchPeer) - app.acn = acn - statusHandler := app.getACNStatusHandler() - acn.SetStatusCallback(statusHandler) - acn.SetVersionCallback(app.getACNVersionHandler()) - prog, status := acn.GetBootstrapStatus() - statusHandler(prog, status) + app.acn = acn + statusHandler := app.getACNStatusHandler() + acn.SetStatusCallback(statusHandler) + acn.SetVersionCallback(app.getACNVersionHandler()) + prog, status := acn.GetBootstrapStatus() + statusHandler(prog, status) - return app + return app } // ListProfiles returns a map of onions to their profile's Name func (app *application) ListProfiles() []string { - var keys []string + var keys []string - app.peerLock.Lock() - defer app.peerLock.Unlock() - for handle := range app.peers { - keys = append(keys, handle) - } - return keys + app.peerLock.Lock() + defer app.peerLock.Unlock() + for handle := range app.peers { + keys = append(keys, handle) + } + return keys } // GetPeer returns a cwtchPeer for a given onion address func (app *application) GetPeer(onion string) peer.CwtchPeer { - if peer, ok := app.peers[onion]; ok { - return peer - } - return nil + if peer, ok := app.peers[onion]; ok { + return peer + } + return nil } func (ap *application) AddPlugin(peerid string, id plugins.PluginID, bus event.Manager, acn connectivity.ACN) { - if _, exists := ap.plugins.Load(peerid); !exists { - ap.plugins.Store(peerid, []plugins.Plugin{}) - } + if _, exists := ap.plugins.Load(peerid); !exists { + ap.plugins.Store(peerid, []plugins.Plugin{}) + } - pluginsinf, _ := ap.plugins.Load(peerid) - peerPlugins := pluginsinf.([]plugins.Plugin) + pluginsinf, _ := ap.plugins.Load(peerid) + peerPlugins := pluginsinf.([]plugins.Plugin) - newp, err := plugins.Get(id, bus, acn, peerid) - if err == nil { - newp.Start() - peerPlugins = append(peerPlugins, newp) - log.Debugf("storing plugin for %v %v", peerid, peerPlugins) - ap.plugins.Store(peerid, peerPlugins) - } else { - log.Errorf("error adding plugin: %v", err) - } + newp, err := plugins.Get(id, bus, acn, peerid) + if err == nil { + newp.Start() + peerPlugins = append(peerPlugins, newp) + log.Debugf("storing plugin for %v %v", peerid, peerPlugins) + ap.plugins.Store(peerid, peerPlugins) + } else { + log.Errorf("error adding plugin: %v", err) + } } func (app *application) CreateTaggedPeer(name string, password string, tag string) { - app.appmutex.Lock() - defer app.appmutex.Unlock() + app.appmutex.Lock() + defer app.appmutex.Unlock() - profileDirectory := path.Join(app.directory, "profiles", model.GenerateRandomID()) + profileDirectory := path.Join(app.directory, "profiles", model.GenerateRandomID()) - profile, err := peer.CreateEncryptedStorePeer(profileDirectory, name, password) - if err != nil { - log.Errorf("Error Creating Peer: %v", err) - app.appBus.Publish(event.NewEventList(event.PeerError, event.Error, err.Error())) - return - } + profile, err := peer.CreateEncryptedStorePeer(profileDirectory, name, password) + if err != nil { + log.Errorf("Error Creating Peer: %v", err) + app.appBus.Publish(event.NewEventList(event.PeerError, event.Error, err.Error())) + return + } - eventBus := event.NewEventManager() - app.eventBuses[profile.GetOnion()] = eventBus - profile.Init(app.eventBuses[profile.GetOnion()]) - app.peers[profile.GetOnion()] = profile + eventBus := event.NewEventManager() + app.eventBuses[profile.GetOnion()] = eventBus + profile.Init(app.eventBuses[profile.GetOnion()]) + app.peers[profile.GetOnion()] = profile - if tag != "" { - profile.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag) - } + if tag != "" { + profile.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag) + } - app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.True})) + app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.True})) } func (app *application) DeletePeer(onion string, password string) { - log.Infof("DeletePeer called on %v\n", onion) - app.appmutex.Lock() - defer app.appmutex.Unlock() + log.Infof("DeletePeer called on %v\n", onion) + app.appmutex.Lock() + defer app.appmutex.Unlock() - if app.peers[onion].CheckPassword(password) { - app.shutdownPeer(onion) - app.peers[onion].Delete() + if app.peers[onion].CheckPassword(password) { + app.shutdownPeer(onion) + app.peers[onion].Delete() - // Shutdown and Remove the Engine + // Shutdown and Remove the Engine - log.Debugf("Delete peer for %v Done\n", onion) - app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion)) - return - } - app.appBus.Publish(event.NewEventList(event.AppError, event.Error, event.PasswordMatchError, event.Identity, onion)) + log.Debugf("Delete peer for %v Done\n", onion) + app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion)) + return + } + app.appBus.Publish(event.NewEventList(event.AppError, event.Error, event.PasswordMatchError, event.Identity, onion)) } func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) { - app.AddPlugin(onion, pluginID, app.eventBuses[onion], app.acn) + app.AddPlugin(onion, pluginID, app.eventBuses[onion], app.acn) } func (app *application) ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error) { - profileDirectory := path.Join(app.directory, "profiles") - profile, err := peer.ImportProfile(exportedCwtchFile, profileDirectory, password) - if profile != nil || err == nil { - app.installProfile(profile) - } - return profile, err + profileDirectory := path.Join(app.directory, "profiles") + profile, err := peer.ImportProfile(exportedCwtchFile, profileDirectory, password) + if profile != nil || err == nil { + app.installProfile(profile) + } + return profile, err } // LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them func (app *application) LoadProfiles(password string) { - count := 0 - migrating := false + count := 0 + migrating := false - files, err := os.ReadDir(path.Join(app.directory, "profiles")) - if err != nil { - log.Errorf("error: cannot read profiles directory: %v", err) - return - } + files, err := os.ReadDir(path.Join(app.directory, "profiles")) + if err != nil { + log.Errorf("error: cannot read profiles directory: %v", err) + return + } - for _, file := range files { - // Attempt to load an encrypted database - profileDirectory := path.Join(app.directory, "profiles", file.Name()) - profile, err := peer.FromEncryptedDatabase(profileDirectory, password) - loaded := false - if err == nil { - // return the load the profile... - log.Infof("loading profile from new-type storage database...") - loaded = app.installProfile(profile) - } else { // On failure attempt to load a legacy profile - profileStore, err := storage.LoadProfileWriterStore(profileDirectory, password) - if err != nil { - continue - } - log.Infof("found legacy profile. importing to new database structure...") - legacyProfile := profileStore.GetProfileCopy(true) - if !migrating { - migrating = true - app.appBus.Publish(event.NewEventList(event.StartingStorageMiragtion)) - } + for _, file := range files { + // Attempt to load an encrypted database + profileDirectory := path.Join(app.directory, "profiles", file.Name()) + profile, err := peer.FromEncryptedDatabase(profileDirectory, password) + loaded := false + if err == nil { + // return the load the profile... + log.Infof("loading profile from new-type storage database...") + loaded = app.installProfile(profile) + } else { // On failure attempt to load a legacy profile + profileStore, err := storage.LoadProfileWriterStore(profileDirectory, password) + if err != nil { + continue + } + log.Infof("found legacy profile. importing to new database structure...") + legacyProfile := profileStore.GetProfileCopy(true) + if !migrating { + migrating = true + app.appBus.Publish(event.NewEventList(event.StartingStorageMiragtion)) + } - cps, err := peer.CreateEncryptedStore(profileDirectory, password) - if err != nil { - log.Errorf("error creating encrypted store: %v", err) - } - profile := peer.ImportLegacyProfile(legacyProfile, cps) - loaded = app.installProfile(profile) - } - if loaded { - count++ - } - } - if count == 0 { - message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0) - app.appBus.Publish(message) - } - if migrating { - app.appBus.Publish(event.NewEventList(event.DoneStorageMigration)) - } + cps, err := peer.CreateEncryptedStore(profileDirectory, password) + if err != nil { + log.Errorf("error creating encrypted store: %v", err) + } + profile := peer.ImportLegacyProfile(legacyProfile, cps) + loaded = app.installProfile(profile) + } + if loaded { + count++ + } + } + if count == 0 { + message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0) + app.appBus.Publish(message) + } + if migrating { + app.appBus.Publish(event.NewEventList(event.DoneStorageMigration)) + } } // installProfile takes a profile and if it isn't loaded in the app, installs it and returns true func (app *application) installProfile(profile peer.CwtchPeer) bool { - app.appmutex.Lock() - defer app.appmutex.Unlock() + app.appmutex.Lock() + defer app.appmutex.Unlock() - // Only attempt to finalize the profile if we don't have one loaded... - if app.peers[profile.GetOnion()] == nil { - eventBus := event.NewEventManager() - app.eventBuses[profile.GetOnion()] = eventBus - profile.Init(app.eventBuses[profile.GetOnion()]) - app.peers[profile.GetOnion()] = profile - app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False})) - return true - } - // Otherwise shutdown the connections - profile.Shutdown() - return false + // Only attempt to finalize the profile if we don't have one loaded... + if app.peers[profile.GetOnion()] == nil { + eventBus := event.NewEventManager() + app.eventBuses[profile.GetOnion()] = eventBus + profile.Init(app.eventBuses[profile.GetOnion()]) + app.peers[profile.GetOnion()] = profile + app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False})) + return true + } + // Otherwise shutdown the connections + profile.Shutdown() + return false } -// / ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online +// ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online func (app *application) ActivePeerEngine(onion string, doListen, doPeers, doServers bool) { - profile := app.GetPeer(onion) - if profile != nil { - app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) - if doListen { - profile.Listen() - } - if doPeers { - profile.StartPeersConnections() - } - if doServers { - profile.StartServerConnections() - } - } + profile := app.GetPeer(onion) + if profile != nil { + app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) + if doListen { + profile.Listen() + } + if doPeers { + profile.StartPeersConnections() + } + if doServers { + profile.StartServerConnections() + } + } } -// / DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline +// DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline func (app *application) DeactivatePeerEngine(onion string) { - if engine, exists := app.engines[onion]; exists { - engine.Shutdown() - delete(app.engines, onion) - } + if engine, exists := app.engines[onion]; exists { + engine.Shutdown() + delete(app.engines, onion) + } } // GetPrimaryBus returns the bus the Application uses for events that aren't peer specific func (app *application) GetPrimaryBus() event.Manager { - return app.appBus + return app.appBus } // GetEventBus returns a cwtchPeer's event bus func (app *application) GetEventBus(onion string) event.Manager { - if manager, ok := app.eventBuses[onion]; ok { - return manager - } - return nil + if manager, ok := app.eventBuses[onion]; ok { + return manager + } + return nil } func (app *application) getACNStatusHandler() func(int, string) { - return func(progress int, status string) { - progStr := strconv.Itoa(progress) - app.appmutex.Lock() - app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status)) - for _, bus := range app.eventBuses { - bus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status)) - } - app.appmutex.Unlock() - } + return func(progress int, status string) { + progStr := strconv.Itoa(progress) + app.appmutex.Lock() + app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status)) + for _, bus := range app.eventBuses { + bus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status)) + } + app.appmutex.Unlock() + } } func (app *application) getACNVersionHandler() func(string) { - return func(version string) { - app.appmutex.Lock() - defer app.appmutex.Unlock() - app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version)) - } + return func(version string) { + app.appmutex.Lock() + defer app.appmutex.Unlock() + app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version)) + } } func (app *application) QueryACNStatus() { - prog, status := app.acn.GetBootstrapStatus() - app.getACNStatusHandler()(prog, status) + prog, status := app.acn.GetBootstrapStatus() + app.getACNStatusHandler()(prog, status) } func (app *application) QueryACNVersion() { - version := app.acn.GetVersion() - app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version)) + version := app.acn.GetVersion() + app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version)) } // ShutdownPeer shuts down a peer and removes it from the app's management func (app *application) ShutdownPeer(onion string) { - app.appmutex.Lock() - defer app.appmutex.Unlock() - app.shutdownPeer(onion) + app.appmutex.Lock() + defer app.appmutex.Unlock() + app.shutdownPeer(onion) } -// / shutdownPeer mutex unlocked helper shutdown peer +// shutdownPeer mutex unlocked helper shutdown peer func (app *application) shutdownPeer(onion string) { - app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion)) - app.eventBuses[onion].Shutdown() - delete(app.eventBuses, onion) - app.peers[onion].Shutdown() - delete(app.peers, onion) - if _, ok := app.engines[onion]; ok { - app.engines[onion].Shutdown() - delete(app.engines, onion) - } - log.Debugf("shutting down plugins for %v", onion) - pluginsI, ok := app.plugins.Load(onion) - if ok { - plugins := pluginsI.([]plugins.Plugin) - for _, plugin := range plugins { - log.Debugf("shutting down plugin: %v", plugin) - plugin.Shutdown() - } - } - app.plugins.Delete(onion) + app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion)) + app.eventBuses[onion].Shutdown() + delete(app.eventBuses, onion) + app.peers[onion].Shutdown() + delete(app.peers, onion) + if _, ok := app.engines[onion]; ok { + app.engines[onion].Shutdown() + delete(app.engines, onion) + } + log.Debugf("shutting down plugins for %v", onion) + pluginsI, ok := app.plugins.Load(onion) + if ok { + plugins := pluginsI.([]plugins.Plugin) + for _, plugin := range plugins { + log.Debugf("shutting down plugin: %v", plugin) + plugin.Shutdown() + } + } + app.plugins.Delete(onion) } // Shutdown shutsdown all peers of an app func (app *application) Shutdown() { - app.appmutex.Lock() - defer app.appmutex.Unlock() - for id := range app.peers { - log.Debugf("Shutting Down Peer %v", id) - app.shutdownPeer(id) - } - log.Debugf("Shutting Down App") - app.appBus.Shutdown() - log.Debugf("Shut Down Complete") + app.appmutex.Lock() + defer app.appmutex.Unlock() + for id := range app.peers { + log.Debugf("Shutting Down Peer %v", id) + app.shutdownPeer(id) + } + log.Debugf("Shutting Down App") + app.appBus.Shutdown() + log.Debugf("Shut Down Complete") }