From 6d8f31773ea7f59cea8e6951c8e7937a77f61139 Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Sun, 25 Sep 2022 18:28:04 -0700 Subject: [PATCH] add activateEngine to app to handle multiple profiles a little more gracefully; lauchPeerConnections sorts based on last message time; contactRetry slow downs and partial state tracking of circuit queue for adaptive slow downs --- app/app.go | 38 +++++++++++++++++++---- app/plugins/contactRetry.go | 55 ++++++++++++++++++++++++++++------ peer/cwtch_peer.go | 52 +++++++++++++++++++++++++------- protocol/connections/engine.go | 9 +++++- 4 files changed, 129 insertions(+), 25 deletions(-) diff --git a/app/app.go b/app/app.go index a4adc46..b56be0e 100644 --- a/app/app.go +++ b/app/app.go @@ -44,7 +44,8 @@ type Application interface { QueryACNStatus() QueryACNVersion() - ActivePeerEngine(onion string, doListen, doPeers, doServers bool) + ActivateEngine(doListn, doPeers, doServers bool) + ActivatePeerEngine(onion string, doListen, doPeers, doServers bool) DeactivatePeerEngine(onion string) ShutdownPeer(string) @@ -244,20 +245,47 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool { return false } +func (app *application) ActivateEngine(doListen, doPeers, doServers bool) { + log.Infof("ActivateEngine") + + for _, profile := range app.peers { + app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) + } + + if doListen { + for _, profile := range app.peers { + log.Infof(" Listen for %v", profile.GetOnion()) + profile.Listen() + } + } + if doServers { + for _, profile := range app.peers { + log.Infof(" StartServerCons for %v", profile.GetOnion()) + profile.StartServerConnections() + } + } + if doPeers { + for _, profile := range app.peers { + log.Infof(" StartPeerCons for %v", profile.GetOnion()) + profile.StartPeersConnections() + } + } +} + // ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online -func (app *application) ActivePeerEngine(onion string, doListen, doPeers, doServers bool) { +func (app *application) ActivatePeerEngine(onion string, doListen, doPeers, doServers bool) { profile := app.GetPeer(onion) if profile != nil { app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) if doListen { profile.Listen() } - if doPeers { - profile.StartPeersConnections() - } if doServers { profile.StartServerConnections() } + if doPeers { + profile.StartPeersConnections() + } } } diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index 08d7211..e3b31f3 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -4,12 +4,18 @@ import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/protocol/connections" "git.openprivacy.ca/openprivacy/log" + "math" "sync" + "sync/atomic" "time" ) -const tickTime = 30 * time.Second -const maxBackoff int = 10 // 320 seconds or ~5 min +const tickTimeSec = 300 //120 +const tickTime = tickTimeSec * time.Second +const baseMaxBackoffPeer int = 6 // 5min * 6 = 30min +// Servers don't reach out, we can assume other peers coming online will ideally contact us, servers will not +const baseMaxBackoffServer int = 3 // 5min * 3 = 15min +const circutTimeoutMins float64 = 2 type connectionType int @@ -36,12 +42,13 @@ type contactRetry struct { onion string lastCheck time.Time - connections sync.Map //[string]*contact + connectingCount int64 + connections sync.Map //[string]*contact } // NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing func NewConnectionRetry(bus event.Manager, onion string) Plugin { - cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, networkUp: false, onion: onion} + cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, networkUp: false, onion: onion, connectingCount: 0} return cr } @@ -108,20 +115,38 @@ func (cr *contactRetry) run() { } } +func calcPendingMultiplier(connectingCount int) int { + throughPutPerMin := (int)(math.Floor(connections.TorMaxPendingConns / circutTimeoutMins)) + minsToClear := connectingCount / throughPutPerMin + baseMaxServerTime := baseMaxBackoffServer * (tickTimeSec / 60) // the lower of the two queues * the tick time in min + multiplier := minsToClear / baseMaxServerTime + if multiplier < 1 { + return 1 + } + return multiplier +} + func (cr *contactRetry) retryDisconnected() { + var retryCount int64 = 0 cr.connections.Range(func(k, v interface{}) bool { p := v.(*contact) - + pendingMultiplier := calcPendingMultiplier((int)(atomic.LoadInt64(&cr.connectingCount) + retryCount)) if p.state == connections.DISCONNECTED { p.ticks++ - if p.ticks >= p.backoff { + log.Infof("Retrying on disconnected connection, with pendingmult: %v", pendingMultiplier) + if p.ticks >= (p.backoff * pendingMultiplier) { + retryCount++ p.ticks = 0 if cr.networkUp { if p.ctype == peerConn { - cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) + go func(id string) { + cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: id})) + }(p.id) } if p.ctype == serverConn { - cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id})) + go func(id string) { + cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id})) + }(p.id) } } } @@ -140,16 +165,18 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState if _, exists := cr.connections.Load(id); !exists { p := &contact{id: id, state: state, backoff: 0, ticks: 0, ctype: ctype} cr.connections.Store(id, p) + cr.manageStateChange(state, connections.DISCONNECTED) return } pinf, _ := cr.connections.Load(id) p := pinf.(*contact) + cr.manageStateChange(state, p.state) if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED { p.state = connections.DISCONNECTED if p.backoff == 0 { p.backoff = 1 - } else if p.backoff < maxBackoff { + } else if p.backoff < baseMaxBackoffPeer { p.backoff *= 2 } p.ticks = 0 @@ -161,6 +188,16 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState } } +func (cr *contactRetry) manageStateChange(state, prevState connections.ConnectionState) { + if state == connections.CONNECTING { + atomic.AddInt64(&cr.connectingCount, 1) + log.Infof("New connecting, connectingCount: %v", atomic.LoadInt64(&cr.connectingCount)) + } else if prevState == connections.CONNECTING { + atomic.AddInt64(&cr.connectingCount, -1) + log.Infof("Failed or Connected, connectingCount: %v", atomic.LoadInt64(&cr.connectingCount)) + } +} + func (cr *contactRetry) Shutdown() { cr.breakChan <- true } diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 2911630..75f227a 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -15,10 +15,10 @@ import ( "git.openprivacy.ca/openprivacy/connectivity/tor" "golang.org/x/crypto/ed25519" "math/bits" - rand2 "math/rand" "os" path "path/filepath" "runtime" + "sort" "strconv" "strings" "sync" @@ -809,13 +809,12 @@ func (cp *cwtchPeer) GetPeerState(handle string) connections.ConnectionState { // PeerWithOnion initiates a request to the Protocol Engine to set up Cwtch Session with a given tor v3 onion // address. func (cp *cwtchPeer) PeerWithOnion(onion string) { - go func() { - // wait a random number of seconds before triggering - // this cuts down on contention in the evengit st bus - randWait := time.Duration(rand2.Int() % 60) - time.Sleep(randWait * time.Second) - cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion})) - }() + log.Infof("PeerWithOnion go") + //go func() { + // time.Sleep(10 * time.Second) + log.Infof(" peerWithOnion wake, Pub") + cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion})) + //}() } // SendInviteToConversation kicks off the invite process @@ -944,12 +943,12 @@ func (cp *cwtchPeer) JoinServer(onion string) error { if !exists { signature = base64.StdEncoding.EncodeToString([]byte{}) } - cachedTokensJson, hasCachedTokens := ci.GetAttribute(attr.LocalScope, attr.ServerZone, "tokens") if hasCachedTokens { log.Debugf("using cached tokens for %v", ci.Handle) } + log.Infof("Publish JoinServer Event") cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature, event.CachedTokens: cachedTokensJson})) return nil } @@ -1004,28 +1003,61 @@ func (cp *cwtchPeer) Listen() { } +type RecentConversation struct { + model *model.Conversation + lastMessageTime int +} + // StartPeersConnections attempts to connect to peer connections // Status: Ready for 1.5 func (cp *cwtchPeer) StartPeersConnections() { conversations, _ := cp.FetchConversations() + byRecent := []*RecentConversation{} + log.Infof("StartPeerConnections") + for _, conversation := range conversations { if conversation.Accepted && !conversation.IsGroup() && !conversation.IsServer() { - cp.PeerWithOnion(conversation.Handle) + lastMessageTime := 0 + lastMessage, _ := cp.GetMostRecentMessages(conversation.ID, 0, 0, 1) + if len(lastMessage) != 0 { + time, err := time.Parse(time.RFC3339Nano, lastMessage[0].Attr[constants.AttrSentTimestamp]) + if err == nil { + lastMessageTime = int(time.Unix()) + } + } + byRecent = append(byRecent, &RecentConversation{conversation, lastMessageTime}) } } + + sort.Slice(byRecent, func(i, j int) bool { + return byRecent[i].lastMessageTime > byRecent[j].lastMessageTime + }) + + for _, conversation := range byRecent { + log.Infof(" PeerWithOnion(%v)", conversation.model.Handle) + //go func(contact string) { + cp.PeerWithOnion(conversation.model.Handle) + //}(conversation.model.Handle) + } } // StartServerConnections attempts to connect to all server connections // Status: Ready for 1.5 func (cp *cwtchPeer) StartServerConnections() { + log.Infof("StartServerConections") conversations, _ := cp.FetchConversations() for _, conversation := range conversations { if conversation.IsServer() { + log.Infof(" joinServer(%v)", conversation.Handle) + //go func(server string) { + // time.Sleep(5 * time.Second) err := cp.JoinServer(conversation.Handle) if err != nil { // Almost certainly a programming error so print it.. log.Errorf("error joining server %v", err) } + //}(conversation.Handle) + } } } diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 57b6fc3..48564ed 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -27,6 +27,8 @@ import ( "golang.org/x/crypto/ed25519" ) +const TorMaxPendingConns = 32 // tor/src/app/config/config.c MaxClientCircuitsPending + type connectionLockedService struct { service tapir.Service connectingLock sync.Mutex @@ -138,6 +140,7 @@ func (e *engine) EventManager() event.Manager { // eventHandler process events from other subsystems func (e *engine) eventHandler() { + log.Infof("engine.EventHandler()...") for { ev := e.queue.Next() // optimistic shutdown... @@ -338,11 +341,15 @@ func (e *engine) Shutdown() { // peerWithOnion is the entry point for cwtchPeer relationships // needs to be run in a goroutine as will block on Open. func (e *engine) peerWithOnion(onion string) { - + log.Infof("engine.peerWithOnion(%v)", onion) log.Debugf("Called PeerWithOnion for %v", onion) if !e.isBlocked(onion) { e.ignoreOnShutdown(e.peerConnecting)(onion) + log.Infof(" service.Connect(%v)", onion) connected, err := e.service.Connect(onion, e.createPeerTemplate()) + if err != nil { + log.Errorf("peerWithOnion err form Connect: %v\n", err) + } // If we are already connected...check if we are authed and issue an auth event // (This allows the ui to be stateless)