From ad72ce6e7a677e4e36a6e111b7e7008de652e31a Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Fri, 2 Dec 2022 16:23:11 -0800 Subject: [PATCH] add to app ActivatePeerEngine; add to peer StartConnections; order connection attempts by lastseend (track); massive connection retry rework --- app/app.go | 34 +- app/plugins/antispam.go | 4 + app/plugins/contactRetry.go | 309 +++++++++++++----- app/plugins/networkCheck.go | 4 + app/plugins/plugin.go | 1 + event/common.go | 16 +- model/constants/attributes.go | 2 + peer/cwtch_peer.go | 179 +++++++--- peer/profile_interface.go | 4 + protocol/connections/engine.go | 13 +- testing/cwtch_peer_server_integration_test.go | 14 +- .../file_sharing_integration_test.go | 4 +- 12 files changed, 422 insertions(+), 162 deletions(-) diff --git a/app/app.go b/app/app.go index b56be0e..8ded4ee 100644 --- a/app/app.go +++ b/app/app.go @@ -104,6 +104,13 @@ func (ap *application) AddPlugin(peerid string, id plugins.PluginID, bus event.M pluginsinf, _ := ap.plugins.Load(peerid) peerPlugins := pluginsinf.([]plugins.Plugin) + for _, plugin := range peerPlugins { + if plugin.Id() == id { + log.Errorf("trying to add second instance of plugin %v to peer %v", id, peerid) + return + } + } + newp, err := plugins.Get(id, bus, acn, peerid) if err == nil { newp.Start() @@ -136,7 +143,7 @@ func (app *application) CreateTaggedPeer(name string, password string, tag strin if tag != "" { profile.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag) } - + app.AddPeerPlugin(profile.GetOnion(), plugins.CONNECTIONRETRY) // Now Mandatory app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.True})) } @@ -237,6 +244,7 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool { app.eventBuses[profile.GetOnion()] = eventBus profile.Init(app.eventBuses[profile.GetOnion()]) app.peers[profile.GetOnion()] = profile + app.AddPeerPlugin(profile.GetOnion(), plugins.CONNECTIONRETRY) // Now Mandatory app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False})) return true } @@ -246,7 +254,7 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool { } func (app *application) ActivateEngine(doListen, doPeers, doServers bool) { - log.Infof("ActivateEngine") + log.Debugf("ActivateEngine") for _, profile := range app.peers { app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) @@ -254,20 +262,15 @@ func (app *application) ActivateEngine(doListen, doPeers, doServers bool) { if doListen { for _, profile := range app.peers { - log.Infof(" Listen for %v", profile.GetOnion()) + log.Debugf(" Listen for %v", profile.GetOnion()) profile.Listen() } } - if doServers { + + if doPeers || doServers { for _, profile := range app.peers { - log.Infof(" StartServerCons for %v", profile.GetOnion()) - profile.StartServerConnections() - } - } - if doPeers { - for _, profile := range app.peers { - log.Infof(" StartPeerCons for %v", profile.GetOnion()) - profile.StartPeersConnections() + log.Debugf(" Start Connections for %v doPeers:%v doServers:%v", profile.GetOnion(), doPeers, doServers) + profile.StartConnections(doPeers, doServers) } } } @@ -280,12 +283,7 @@ func (app *application) ActivatePeerEngine(onion string, doListen, doPeers, doSe if doListen { profile.Listen() } - if doServers { - profile.StartServerConnections() - } - if doPeers { - profile.StartPeersConnections() - } + profile.StartConnections(doPeers, doServers) } } diff --git a/app/plugins/antispam.go b/app/plugins/antispam.go index 467137d..6fb1195 100644 --- a/app/plugins/antispam.go +++ b/app/plugins/antispam.go @@ -18,6 +18,10 @@ func (a *antispam) Start() { go a.run() } +func (cr *antispam) Id() PluginID { + return ANTISPAM +} + func (a antispam) Shutdown() { a.breakChan <- true } diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index e3b31f3..efd83fe 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -10,12 +10,13 @@ import ( "time" ) -const tickTimeSec = 300 //120 +const tickTimeSec = 30 const tickTime = tickTimeSec * time.Second -const baseMaxBackoffPeer int = 6 // 5min * 6 = 30min -// Servers don't reach out, we can assume other peers coming online will ideally contact us, servers will not -const baseMaxBackoffServer int = 3 // 5min * 3 = 15min -const circutTimeoutMins float64 = 2 + +const circutTimeoutSecs int = 120 + +const MaxBaseTimeoutSec = 5 * 60 // a max base time out of 5 min +const maxFailedBackoff = 6 // 2^6 = 64 -> 64 * [2m to 5m] = 2h8m to 5h20m type connectionType int @@ -29,29 +30,126 @@ type contact struct { state connections.ConnectionState ctype connectionType - ticks int - backoff int + lastAttempt time.Time + failedCount int + + lastSeen time.Time + queued bool +} + +// compare a to b +// returns -1 if a < b +// +// 0 if a == b +// +1 if a > b +// +// algo: sort by failedCount first favouring less attempts, then sort by lastSeen time favouring more recent connections +func (a *contact) compare(b *contact) int { + if a.failedCount < b.failedCount { + return -1 + } else if a.failedCount > b.failedCount { + return +1 + } + + if a.lastSeen.After(b.lastSeen) { + return -1 + } else if a.lastSeen.Before(b.lastSeen) { + return +1 + } + + return 0 +} + +type connectionQueue struct { + queue []*contact + lock sync.Mutex +} + +func newConnectionQueue() *connectionQueue { + return &connectionQueue{queue: []*contact{}} +} + +func (cq *connectionQueue) insert(c *contact) { + cq.lock.Lock() + defer cq.lock.Unlock() + // find loc + i := 0 + var b *contact + for i, b = range cq.queue { + if c.compare(b) >= 0 { + break + } + } + + // insert + if len(cq.queue) == i { // nil or empty slice or after last element + cq.queue = append(cq.queue, c) + } else { + cq.queue = append(cq.queue[:i+1], cq.queue[i:]...) // index < len(a) + cq.queue[i] = c + } + + c.queued = true +} + +func (cq *connectionQueue) dequeue() *contact { + cq.lock.Lock() + defer cq.lock.Unlock() + if len(cq.queue) == 0 { + return nil + } + c := cq.queue[0] + cq.queue = cq.queue[1:] + c.queued = false + return c } type contactRetry struct { - bus event.Manager - queue event.Queue - networkUp bool - running bool - breakChan chan bool - onion string - lastCheck time.Time + bus event.Manager + queue event.Queue + networkUp bool + networkUpTime time.Time + running bool + breakChan chan bool + onion string + lastCheck time.Time - connectingCount int64 - connections sync.Map //[string]*contact + connections sync.Map //[string]*contact + connCount int64 + pendingQueue *connectionQueue } -// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing +// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing func NewConnectionRetry(bus event.Manager, onion string) Plugin { - cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, networkUp: false, onion: onion, connectingCount: 0} + cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, connCount: 0, networkUp: false, networkUpTime: time.Now(), onion: onion, pendingQueue: newConnectionQueue()} return cr } +// maxTorCircuitsPending a function to throttle access to tor network during start up +func (cr *contactRetry) maxTorCircuitsPending() int { + timeSinceStart := time.Now().Sub(cr.networkUpTime) + if timeSinceStart < 30*time.Second { + return 4 + } else if timeSinceStart < 4*time.Minute { + return 8 + } else if timeSinceStart < 8*time.Minute { + return 16 + } + return connections.TorMaxPendingConns +} + +func (cr *contactRetry) connectingCount() int { + connecting := 0 + cr.connections.Range(func(k, v interface{}) bool { + conn := v.(*contact) + if conn.state == connections.CONNECTING { + connecting++ + } + return true + }) + return connecting +} + func (cr *contactRetry) Start() { if !cr.running { go cr.run() @@ -60,17 +158,34 @@ func (cr *contactRetry) Start() { } } +func (cr *contactRetry) Id() PluginID { + return CONNECTIONRETRY +} + func (cr *contactRetry) run() { cr.running = true cr.bus.Subscribe(event.PeerStateChange, cr.queue) cr.bus.Subscribe(event.ACNStatus, cr.queue) cr.bus.Subscribe(event.ServerStateChange, cr.queue) + cr.bus.Subscribe(event.PeerRequest, cr.queue) + cr.bus.Subscribe(event.QueuePeerRequest, cr.queue) + cr.bus.Subscribe(event.QueueJoinServer, cr.queue) for { - if time.Since(cr.lastCheck) > tickTime { - cr.retryDisconnected() - cr.lastCheck = time.Now() + cr.requeueReady() + connectingCount := cr.connectingCount() + connCount := atomic.LoadInt64(&cr.connCount) + log.Debugf("checking queue (len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.pendingQueue.queue), connCount, connectingCount) + for connectingCount < cr.maxTorCircuitsPending() && len(cr.pendingQueue.queue) > 0 { + contact := cr.pendingQueue.dequeue() + // could have received incoming connection while in queue, make sure still disconnected before trying + if contact.state == connections.DISCONNECTED { + cr.publishConnectionRequest(contact) + connectingCount++ + } } + cr.lastCheck = time.Now() + select { case e := <-cr.queue.OutChan(): switch e.EventType { @@ -84,20 +199,38 @@ func (cr *contactRetry) run() { server := e.Data[event.GroupServer] cr.handleEvent(server, state, serverConn) + case event.QueueJoinServer: + fallthrough + case event.QueuePeerRequest: + lastSeen, err := time.Parse(e.Data[event.LastSeen], time.RFC3339Nano) + if err != nil { + lastSeen = event.CwtchEpoch + } + + id := "" + if peer, exists := e.Data[event.RemotePeer]; exists { + id = peer + cr.addConnection(peer, connections.DISCONNECTED, peerConn, lastSeen) + } else if server, exists := e.Data[event.GroupServer]; exists { + id = server + cr.addConnection(server, connections.DISCONNECTED, serverConn, lastSeen) + } + + if c, ok := cr.connections.Load(id); ok { + contact := c.(*contact) + if contact.state == connections.DISCONNECTED && !contact.queued { + cr.pendingQueue.insert(contact) + } + } + case event.ACNStatus: prog := e.Data[event.Progress] if prog == "100" && !cr.networkUp { cr.networkUp = true + cr.networkUpTime = time.Now() cr.connections.Range(func(k, v interface{}) bool { p := v.(*contact) - p.ticks = 0 - p.backoff = 1 - if p.ctype == peerConn { - cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) - } - if p.ctype == serverConn { - cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id})) - } + p.failedCount = 0 return true }) } else if prog != "100" { @@ -115,47 +248,68 @@ func (cr *contactRetry) run() { } } -func calcPendingMultiplier(connectingCount int) int { - throughPutPerMin := (int)(math.Floor(connections.TorMaxPendingConns / circutTimeoutMins)) - minsToClear := connectingCount / throughPutPerMin - baseMaxServerTime := baseMaxBackoffServer * (tickTimeSec / 60) // the lower of the two queues * the tick time in min - multiplier := minsToClear / baseMaxServerTime - if multiplier < 1 { - return 1 - } - return multiplier +func ticksToSec(ticks int) int { + return ticks * tickTimeSec } -func (cr *contactRetry) retryDisconnected() { - var retryCount int64 = 0 +func (cr *contactRetry) requeueReady() { + if !cr.networkUp { + return + } + + retryable := []*contact{} + count := atomic.LoadInt64(&cr.connCount) + + throughPutPerMin := cr.maxTorCircuitsPending() / (circutTimeoutSecs / 60) + adjustedBaseTimeout := int(count) / throughPutPerMin * 60 + if adjustedBaseTimeout < circutTimeoutSecs { + adjustedBaseTimeout = circutTimeoutSecs + } else if adjustedBaseTimeout > MaxBaseTimeoutSec { + adjustedBaseTimeout = MaxBaseTimeoutSec + } + cr.connections.Range(func(k, v interface{}) bool { p := v.(*contact) - pendingMultiplier := calcPendingMultiplier((int)(atomic.LoadInt64(&cr.connectingCount) + retryCount)) - if p.state == connections.DISCONNECTED { - p.ticks++ - log.Infof("Retrying on disconnected connection, with pendingmult: %v", pendingMultiplier) - if p.ticks >= (p.backoff * pendingMultiplier) { - retryCount++ - p.ticks = 0 - if cr.networkUp { - if p.ctype == peerConn { - go func(id string) { - cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: id})) - }(p.id) - } - if p.ctype == serverConn { - go func(id string) { - cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id})) - }(p.id) - } - } + if p.state == connections.DISCONNECTED && !p.queued { + timeout := time.Duration((math.Pow(2, float64(p.failedCount)))*float64(adjustedBaseTimeout /*baseTimeoutSec*/)) * time.Second + if time.Now().Sub(p.lastAttempt) > timeout { + retryable = append(retryable, p) } } return true }) + for _, contact := range retryable { + cr.pendingQueue.insert(contact) + } +} + +func (cr *contactRetry) publishConnectionRequest(contact *contact) { + if contact.ctype == peerConn { + cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: contact.id})) + } + if contact.ctype == serverConn { + cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: contact.id})) + } + contact.state = connections.CONNECTING // Hacky but needed so we don't over flood waiting for PeerStateChange from engine + contact.lastAttempt = time.Now() +} + +func (cr *contactRetry) addConnection(id string, state connections.ConnectionState, ctype connectionType, lastSeen time.Time) { + // don't handle contact retries for ourselves + if id == cr.onion { + return + } + + if _, exists := cr.connections.Load(id); !exists { + p := &contact{id: id, state: state, failedCount: 0, lastAttempt: event.CwtchEpoch, ctype: ctype, lastSeen: lastSeen, queued: false} + cr.connections.Store(id, p) + atomic.AddInt64(&cr.connCount, 1) + return + } } func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) { + log.Debugf("cr.handleEvent state to %v on id %v", connections.ConnectionStateName[state], id) // don't handle contact retries for ourselves if id == cr.onion { @@ -163,41 +317,34 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState } if _, exists := cr.connections.Load(id); !exists { - p := &contact{id: id, state: state, backoff: 0, ticks: 0, ctype: ctype} - cr.connections.Store(id, p) - cr.manageStateChange(state, connections.DISCONNECTED) + cr.addConnection(id, state, ctype, event.CwtchEpoch) return } pinf, _ := cr.connections.Load(id) p := pinf.(*contact) - cr.manageStateChange(state, p.state) + log.Infof(" managing state change for %v %v to %v by self %v", id, connections.ConnectionStateName[p.state], connections.ConnectionStateName[state], cr.onion) if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED { - p.state = connections.DISCONNECTED - if p.backoff == 0 { - p.backoff = 1 - } else if p.backoff < baseMaxBackoffPeer { - p.backoff *= 2 + if p.state == connections.SYNCED || p.state == connections.AUTHENTICATED { + p.lastSeen = time.Now() + } else { + p.failedCount += 1 + } + p.state = connections.DISCONNECTED + p.lastAttempt = time.Now() + if p.failedCount > maxFailedBackoff { + p.failedCount = maxFailedBackoff } - p.ticks = 0 } else if state == connections.CONNECTING || state == connections.CONNECTED { p.state = state - } else if state == connections.AUTHENTICATED { + } else if state == connections.AUTHENTICATED || state == connections.SYNCED { p.state = state - p.backoff = 0 - } -} - -func (cr *contactRetry) manageStateChange(state, prevState connections.ConnectionState) { - if state == connections.CONNECTING { - atomic.AddInt64(&cr.connectingCount, 1) - log.Infof("New connecting, connectingCount: %v", atomic.LoadInt64(&cr.connectingCount)) - } else if prevState == connections.CONNECTING { - atomic.AddInt64(&cr.connectingCount, -1) - log.Infof("Failed or Connected, connectingCount: %v", atomic.LoadInt64(&cr.connectingCount)) + p.lastSeen = time.Now() + p.failedCount = 0 } } func (cr *contactRetry) Shutdown() { cr.breakChan <- true + cr.queue.Shutdown() } diff --git a/app/plugins/networkCheck.go b/app/plugins/networkCheck.go index eec11a5..75a967e 100644 --- a/app/plugins/networkCheck.go +++ b/app/plugins/networkCheck.go @@ -40,6 +40,10 @@ func (nc *networkCheck) Start() { go nc.run() } +func (cr *networkCheck) Id() PluginID { + return NETWORKCHECK +} + func (nc *networkCheck) run() { nc.running = true nc.offline = true diff --git a/app/plugins/plugin.go b/app/plugins/plugin.go index 7ebf773..9141b1a 100644 --- a/app/plugins/plugin.go +++ b/app/plugins/plugin.go @@ -20,6 +20,7 @@ const ( type Plugin interface { Start() Shutdown() + Id() PluginID } // Get is a plugin factory for the requested plugin diff --git a/event/common.go b/event/common.go index 5756314..51bc7fd 100644 --- a/event/common.go +++ b/event/common.go @@ -1,5 +1,9 @@ package event +import "time" + +var CwtchEpoch = time.Date(2020, 6, 1, 0, 0, 0, 0, time.UTC) + // Type captures the definition of many common Cwtch application events type Type string @@ -13,6 +17,14 @@ const ( // RemotePeer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd" PeerRequest = Type("PeerRequest") + // QueuePeerRequest + // When peer has too many peers to try and wants to ease off Tor throttling, use this to notify ContactRetry plugin to schedule a peer for later try + // LastSeen: last seen time of the contact + // And one of + // RemotePeer + // GroupServer + QueuePeerRequest = Type("QueuePeerRequest") + // RetryPeerRequest // Identical to PeerRequest, but allows Engine to make decisions regarding blocked peers // attributes: @@ -35,7 +47,8 @@ const ( AllowUnknownPeers = Type("AllowUnknownPeers") // GroupServer - JoinServer = Type("JoinServer") + QueueJoinServer = Type("QueueJoinServer") + JoinServer = Type("JoinServer") // attributes GroupServer - the onion of the server to leave LeaveServer = Type("LeaveServer") @@ -217,6 +230,7 @@ const ( Onion = Field("Onion") RemotePeer = Field("RemotePeer") + LastSeen = Field("LastSeen") Ciphertext = Field("Ciphertext") Signature = Field("Signature") CachedTokens = Field("CachedTokens") diff --git a/model/constants/attributes.go b/model/constants/attributes.go index 8cf6f9a..9e6f4f6 100644 --- a/model/constants/attributes.go +++ b/model/constants/attributes.go @@ -54,3 +54,5 @@ const CustomProfileImageKey = "custom-profile-image" const SyncPreLastMessageTime = "SyncPreLastMessageTime" const SyncMostRecentMessageTime = "SyncMostRecentMessageTime" + +const AttrLastConnectionTime = "last-connection-time" diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 75f227a..2799416 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -809,12 +809,34 @@ func (cp *cwtchPeer) GetPeerState(handle string) connections.ConnectionState { // PeerWithOnion initiates a request to the Protocol Engine to set up Cwtch Session with a given tor v3 onion // address. func (cp *cwtchPeer) PeerWithOnion(onion string) { - log.Infof("PeerWithOnion go") - //go func() { - // time.Sleep(10 * time.Second) - log.Infof(" peerWithOnion wake, Pub") - cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion})) - //}() + lastSeen := event.CwtchEpoch + ci, err := cp.FetchConversationInfo(onion) + if err == nil { + lastSeen = cp.GetConversationLastSeenTime(ci.ID) + } + cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion, event.LastSeen: lastSeen.Format(time.RFC3339Nano)})) +} + +// QueuePeeringWithOnion sends the request to peer with an onion directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests +// Status: Ready for 1.10 +func (cp *cwtchPeer) QueuePeeringWithOnion(handle string) { + lastSeen := event.CwtchEpoch + ci, err := cp.FetchConversationInfo(handle) + if err == nil { + lastSeen = cp.GetConversationLastSeenTime(ci.ID) + } + cp.eventBus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: handle, event.LastSeen: lastSeen.Format(time.RFC3339Nano)})) +} + +// QueueJoinServer sends the request to join a server directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests +// Status: Ready for 1.10 +func (cp *cwtchPeer) QueueJoinServer(handle string) { + lastSeen := event.CwtchEpoch + ci, err := cp.FetchConversationInfo(handle) + if err == nil { + lastSeen = cp.GetConversationLastSeenTime(ci.ID) + } + cp.eventBus.Publish(event.NewEvent(event.QueueJoinServer, map[event.Field]string{event.GroupServer: handle, event.LastSeen: lastSeen.Format(time.RFC3339Nano)})) } // SendInviteToConversation kicks off the invite process @@ -948,7 +970,6 @@ func (cp *cwtchPeer) JoinServer(onion string) error { log.Debugf("using cached tokens for %v", ci.Handle) } - log.Infof("Publish JoinServer Event") cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature, event.CachedTokens: cachedTokensJson})) return nil } @@ -1003,61 +1024,97 @@ func (cp *cwtchPeer) Listen() { } -type RecentConversation struct { - model *model.Conversation - lastMessageTime int +type LastSeenConversation struct { + model *model.Conversation + lastSeen time.Time +} + +func (cp *cwtchPeer) GetConversationLastSeenTime(conversationId int) time.Time { + timestamp, err := cp.GetConversationAttribute(conversationId, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime))) + if err == nil { + if time, err := time.Parse(time.RFC3339Nano, timestamp); err == nil { + return time + } + } + + lastMessage, _ := cp.GetMostRecentMessages(conversationId, 0, 0, 1) + if len(lastMessage) != 0 { + time, err := time.Parse(time.RFC3339Nano, lastMessage[0].Attr[constants.AttrSentTimestamp]) + if err == nil { + return time + } + } + + // Cwtch launch date + return event.CwtchEpoch + +} + +func (cp *cwtchPeer) getConnectionsSortedByLastSeen(doPeers, doServers bool) []*LastSeenConversation { + conversations, _ := cp.FetchConversations() + byRecent := []*LastSeenConversation{} + + for _, conversation := range conversations { + if conversation.Accepted && !conversation.IsGroup() { + if conversation.IsServer() { + if !doServers { + continue + } + } else { + if !doPeers { + continue + } + } + + byRecent = append(byRecent, &LastSeenConversation{conversation, cp.GetConversationLastSeenTime(conversation.ID)}) + } + } + sort.Slice(byRecent, func(i, j int) bool { + return byRecent[i].lastSeen.After(byRecent[j].lastSeen) + }) + return byRecent +} + +func (cp *cwtchPeer) StartConnections(doPeers, doServers bool) { + byRecent := cp.getConnectionsSortedByLastSeen(doPeers, doServers) + log.Infof("StartConnections for %v", cp.GetOnion()) + for _, conversation := range byRecent { + if conversation.model.IsServer() { + log.Infof(" QueueJoinServer(%v)", conversation.model.Handle) + cp.QueueJoinServer(conversation.model.Handle) + } else { + log.Infof(" QueuePeerWithOnion(%v)", conversation.model.Handle) + cp.QueuePeeringWithOnion(conversation.model.Handle) + } + time.Sleep(50 * time.Millisecond) + } } // StartPeersConnections attempts to connect to peer connections // Status: Ready for 1.5 +// Deprecated: for 1.10 use StartConnections func (cp *cwtchPeer) StartPeersConnections() { - conversations, _ := cp.FetchConversations() - byRecent := []*RecentConversation{} log.Infof("StartPeerConnections") - for _, conversation := range conversations { - if conversation.Accepted && !conversation.IsGroup() && !conversation.IsServer() { - lastMessageTime := 0 - lastMessage, _ := cp.GetMostRecentMessages(conversation.ID, 0, 0, 1) - if len(lastMessage) != 0 { - time, err := time.Parse(time.RFC3339Nano, lastMessage[0].Attr[constants.AttrSentTimestamp]) - if err == nil { - lastMessageTime = int(time.Unix()) - } - } - byRecent = append(byRecent, &RecentConversation{conversation, lastMessageTime}) - } - } - - sort.Slice(byRecent, func(i, j int) bool { - return byRecent[i].lastMessageTime > byRecent[j].lastMessageTime - }) + byRecent := cp.getConnectionsSortedByLastSeen(true, false) for _, conversation := range byRecent { log.Infof(" PeerWithOnion(%v)", conversation.model.Handle) - //go func(contact string) { - cp.PeerWithOnion(conversation.model.Handle) - //}(conversation.model.Handle) + cp.QueuePeeringWithOnion(conversation.model.Handle) } } // StartServerConnections attempts to connect to all server connections // Status: Ready for 1.5 +// Deprecated: for 1.10 use StartConnections func (cp *cwtchPeer) StartServerConnections() { log.Infof("StartServerConections") - conversations, _ := cp.FetchConversations() - for _, conversation := range conversations { - if conversation.IsServer() { - log.Infof(" joinServer(%v)", conversation.Handle) - //go func(server string) { - // time.Sleep(5 * time.Second) - err := cp.JoinServer(conversation.Handle) - if err != nil { - // Almost certainly a programming error so print it.. - log.Errorf("error joining server %v", err) - } - //}(conversation.Handle) + byRecent := cp.getConnectionsSortedByLastSeen(false, true) + for _, conversation := range byRecent { + if conversation.model.IsServer() { + log.Infof(" QueueJoinServer(%v)", conversation.model.Handle) + cp.QueueJoinServer(conversation.model.Handle) } } } @@ -1342,9 +1399,27 @@ func (cp *cwtchPeer) eventHandler() { case event.PeerStateChange: handle := ev.Data[event.RemotePeer] if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED { - _, err := cp.FetchConversationInfo(handle) + ci, err := cp.FetchConversationInfo(handle) + var cid int if err != nil { - cp.NewContactConversation(handle, model.DefaultP2PAccessControl(), false) + // if it's a newly authenticated connection with no conversation storage, init + cid, err = cp.NewContactConversation(handle, model.DefaultP2PAccessControl(), false) + } else { + cid = ci.ID + } + + timestamp := time.Now().Format(time.RFC3339Nano) + cp.SetConversationAttribute(cid, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), timestamp) + } else if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.DISCONNECTED { + ci, err := cp.FetchConversationInfo(handle) + if err == nil { + cp.mutex.Lock() + if cp.state[ev.Data[event.RemotePeer]] == connections.AUTHENTICATED { + // If peer went offline, set last seen time to now + timestamp := time.Now().Format(time.RFC3339Nano) + cp.SetConversationAttribute(ci.ID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), timestamp) + } + cp.mutex.Unlock() } } cp.mutex.Lock() @@ -1352,6 +1427,7 @@ func (cp *cwtchPeer) eventHandler() { cp.mutex.Unlock() case event.ServerStateChange: cp.mutex.Lock() + prevState := cp.state[ev.Data[event.GroupServer]] state := connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] cp.state[ev.Data[event.GroupServer]] = state cp.mutex.Unlock() @@ -1359,7 +1435,7 @@ func (cp *cwtchPeer) eventHandler() { // If starting to sync, determine last message from known groups on server so we can calculate sync progress if state == connections.AUTHENTICATED { conversations, err := cp.FetchConversations() - mostRecentTime := time.Date(2020, 6, 1, 0, 0, 0, 0, time.UTC) + mostRecentTime := event.CwtchEpoch if err == nil { for _, conversationInfo := range conversations { if server, exists := conversationInfo.GetAttribute(attr.LocalScope, attr.LegacyGroupZone, constants.GroupServer); exists && server == ev.Data[event.GroupServer] { @@ -1381,6 +1457,15 @@ func (cp *cwtchPeer) eventHandler() { if err == nil { cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncPreLastMessageTime)), mostRecentTime.Format(time.RFC3339Nano)) cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), mostRecentTime.Format(time.RFC3339Nano)) + + if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED { + timestamp := time.Now().Format(time.RFC3339Nano) + cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), timestamp) + } else if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.DISCONNECTED && prevState == connections.AUTHENTICATED { + // If peer went offline, set last seen time to now + timestamp := time.Now().Format(time.RFC3339Nano) + cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), timestamp) + } } } case event.TriggerAntispamCheck: diff --git a/peer/profile_interface.go b/peer/profile_interface.go index 51d6c38..493bc3e 100644 --- a/peer/profile_interface.go +++ b/peer/profile_interface.go @@ -64,8 +64,12 @@ type CwtchPeer interface { AutoHandleEvents(events []event.Type) Listen() + StartConnections(doPeers, doServers bool) + // Deprecated in 1.10 StartPeersConnections() + // Deprecated in 1.10 StartServerConnections() + Shutdown() // GetOnion is deprecated. If you find yourself needing to rely on this method it is time diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 48564ed..419a381 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -27,7 +27,12 @@ import ( "golang.org/x/crypto/ed25519" ) -const TorMaxPendingConns = 32 // tor/src/app/config/config.c MaxClientCircuitsPending +// 32 from tor/src/app/config/config.c MaxClientCircuitsPending +// we lower a bit because there's a lot of spillage +// - just cus we get a SOCKS timeout doesn't mean tor has stopped trying as a huge sorce +// - potential multiple profiles as a huge source +// - second order connections like token service's second servers aren't tracked in our system adding a few extra periodically +const TorMaxPendingConns = 28 type connectionLockedService struct { service tapir.Service @@ -140,7 +145,6 @@ func (e *engine) EventManager() event.Manager { // eventHandler process events from other subsystems func (e *engine) eventHandler() { - log.Infof("engine.EventHandler()...") for { ev := e.queue.Next() // optimistic shutdown... @@ -341,15 +345,10 @@ func (e *engine) Shutdown() { // peerWithOnion is the entry point for cwtchPeer relationships // needs to be run in a goroutine as will block on Open. func (e *engine) peerWithOnion(onion string) { - log.Infof("engine.peerWithOnion(%v)", onion) log.Debugf("Called PeerWithOnion for %v", onion) if !e.isBlocked(onion) { e.ignoreOnShutdown(e.peerConnecting)(onion) - log.Infof(" service.Connect(%v)", onion) connected, err := e.service.Connect(onion, e.createPeerTemplate()) - if err != nil { - log.Errorf("peerWithOnion err form Connect: %v\n", err) - } // If we are already connected...check if we are authed and issue an auth event // (This allows the ui to be stateless) diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index e1f95ed..1fbe98f 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -38,7 +38,7 @@ func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target co for { log.Infof("%v checking connection...\n", peerName) state := peer.GetPeerState(addr) - log.Infof("Waiting for Peer %v to %v - state: %v\n", peerName, addr, state) + log.Infof("Waiting for Peer %v to %v - state: %v\n", peerName, addr, connections.ConnectionStateName[state]) if state == connections.FAILED { t.Fatalf("%v could not connect to %v", peer.GetOnion(), addr) } @@ -118,6 +118,7 @@ func TestCwtchPeerIntegration(t *testing.T) { if err != nil { t.Fatalf("Could not start Tor: %v", err) } + log.Infof("Waiting for tor to bootstrap...") acn.WaitTillBootstrapped() defer acn.Close() @@ -149,25 +150,25 @@ func TestCwtchPeerIntegration(t *testing.T) { app.CreateTaggedPeer("Carol", "asdfasdf", "test") alice := app2.WaitGetPeer(app, "Alice") - app.ActivePeerEngine(alice.GetOnion(), true, true, true) + app.ActivatePeerEngine(alice.GetOnion(), true, true, true) log.Infoln("Alice created:", alice.GetOnion()) alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice") alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) bob := app2.WaitGetPeer(app, "Bob") - app.ActivePeerEngine(bob.GetOnion(), true, true, true) + app.ActivatePeerEngine(bob.GetOnion(), true, true, true) log.Infoln("Bob created:", bob.GetOnion()) bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) carol := app2.WaitGetPeer(app, "Carol") - app.ActivePeerEngine(carol.GetOnion(), true, true, true) + app.ActivatePeerEngine(carol.GetOnion(), true, true, true) log.Infoln("Carol created:", carol.GetOnion()) carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol") carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) waitTime := time.Duration(60) * time.Second - log.Infof("** Waiting for Alice, Bob, and Carol to connect with onion network... (%v)\n", waitTime) + log.Infof("** Waiting for Alice, Bob, and Carol to register their onion hidden service on the network... (%v)\n", waitTime) time.Sleep(waitTime) numGoRoutinesPostPeerStart := runtime.NumGoroutine() log.Infof("** Wait Done!") @@ -394,7 +395,7 @@ func TestCwtchPeerIntegration(t *testing.T) { pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) log.Infof("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+ - "numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v", + "numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v", numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect, numGoRoutinesPostAlice, numGoRoutinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown) @@ -418,6 +419,7 @@ func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, id int, messa // Utility function for testing that a message in a conversation is as expected func checkMessage(t *testing.T, profile peer.CwtchPeer, id int, messageID int, expected string) { message, _, err := profile.GetChannelMessage(id, 0, messageID) + log.Debugf(" checking if expected: %v is actual: %v", expected, message) if err != nil { log.Errorf("unexpected message %v expected: %v got error: %v", profile.GetOnion(), expected, err) t.Fatalf("unexpected message %v expected: %v got error: %v\n", profile.GetOnion(), expected, err) diff --git a/testing/filesharing/file_sharing_integration_test.go b/testing/filesharing/file_sharing_integration_test.go index b84280b..2539011 100644 --- a/testing/filesharing/file_sharing_integration_test.go +++ b/testing/filesharing/file_sharing_integration_test.go @@ -103,9 +103,9 @@ func TestFileSharing(t *testing.T) { t.Logf("** Waiting for Alice, Bob...") alice := app2.WaitGetPeer(app, "alice") - app.ActivePeerEngine(alice.GetOnion(), true, true, true) + app.ActivatePeerEngine(alice.GetOnion(), true, true, true) bob := app2.WaitGetPeer(app, "bob") - app.ActivePeerEngine(bob.GetOnion(), true, true, true) + app.ActivatePeerEngine(bob.GetOnion(), true, true, true) alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer}) bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer, event.ManifestReceived})