From 93e2a25673b4483041806ef591d98f8c28c4ecac Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Thu, 3 Mar 2022 15:58:41 -0800 Subject: [PATCH] tweak reconnect plugin to have faster intervals; add group sync progress state to peer --- app/app.go | 4 +++- app/plugins/contactRetry.go | 6 ++--- model/constants/attributes.go | 3 +++ model/conversation.go | 25 ++++++++++++++++++++ peer/cwtch_peer.go | 43 ++++++++++++++++++++++++++++++++--- 5 files changed, 74 insertions(+), 7 deletions(-) diff --git a/app/app.go b/app/app.go index fe05c6b..d892aea 100644 --- a/app/app.go +++ b/app/app.go @@ -248,13 +248,15 @@ func (app *application) ShutdownPeer(onion string) { // Shutdown shutsdown all peers of an app and then the tormanager func (app *application) Shutdown() { for id, peer := range app.peers { - peer.Shutdown() log.Debugf("Shutting Down Peer %v", id) + peer.Shutdown() + log.Debugf("Shutting Down Plugins for %v", id) app.appletPlugins.ShutdownPeer(id) log.Debugf("Shutting Down Engines for %v", id) app.engines[id].Shutdown() log.Debugf("Shutting Down Bus for %v", id) app.eventBuses[id].Shutdown() + log.Debugf("Done Shutdown for peer %v", id) } log.Debugf("Shutting Down App") app.appBus.Shutdown() diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index b019c13..d23f031 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -8,8 +8,8 @@ import ( "time" ) -const tickTime = 10 * time.Second -const maxBackoff int = 32 // 320 seconds or ~5 min +const tickTime = 5 * time.Second +const maxBackoff int = 64 // 320 seconds or ~5 min type connectionType int @@ -132,7 +132,7 @@ func (cr *contactRetry) retryDisconnected() { func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) { if _, exists := cr.connections.Load(id); !exists { - p := &contact{id: id, state: connections.DISCONNECTED, backoff: 0, ticks: 0, ctype: ctype} + p := &contact{id: id, state: state, backoff: 0, ticks: 0, ctype: ctype} cr.connections.Store(id, p) return } diff --git a/model/constants/attributes.go b/model/constants/attributes.go index d74dc48..8cf6f9a 100644 --- a/model/constants/attributes.go +++ b/model/constants/attributes.go @@ -51,3 +51,6 @@ const AttrRejected = "rejected-invite" const AttrDownloaded = "file-downloaded" const CustomProfileImageKey = "custom-profile-image" + +const SyncPreLastMessageTime = "SyncPreLastMessageTime" +const SyncMostRecentMessageTime = "SyncMostRecentMessageTime" diff --git a/model/conversation.go b/model/conversation.go index df7e151..02137e6 100644 --- a/model/conversation.go +++ b/model/conversation.go @@ -4,6 +4,7 @@ import ( "cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/model/constants" "encoding/json" + "time" ) // AccessControl is a type determining client assigned authorization to a peer @@ -86,6 +87,30 @@ func (ci *Conversation) IsServer() bool { return false } +// ServerSyncProgress is only valid during a server being in the AUTHENTICATED state and therefor in the syncing process +// it returns a double (0-1) representing the estimated progress of the syncing +func (ci *Conversation) ServerSyncProgress() float64 { + startTimeStr, startExists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncPreLastMessageTime)).ToString()] + recentTimeStr, recentExists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)).ToString()] + + if !startExists || !recentExists { + return 0.0 + } + + startTime, err := time.Parse(startTimeStr, time.RFC3339Nano) + if err != nil { + return 0.0 + } + recentTime, err := time.Parse(recentTimeStr, time.RFC3339Nano) + if err != nil { + return 0.0 + } + + syncRange := time.Since(startTime) + pointFromStart := startTime.Sub(recentTime) + return pointFromStart.Seconds() / syncRange.Seconds() +} + // ConversationMessage bundles an instance of a conversation message row type ConversationMessage struct { ID int diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 186003c..1249551 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -51,6 +51,12 @@ var DefaultEventsToHandle = []event.Type{ event.NewGetValMessageFromPeer, event.ProtocolEngineStopped, event.RetryServerRequest, + event.PeerStateChange, + event.ServerStateChange, + event.SendMessageToPeerError, + event.NewRetValMessageFromPeer, + event.ManifestReceived, + event.FileDownloaded, } // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer @@ -1051,6 +1057,10 @@ func (cp *cwtchPeer) eventHandler() { // Time to either acknowledge the message or insert a new message // Re-encode signature to base64 cp.attemptInsertOrAcknowledgeLegacyGroupConversation(conversationInfo.ID, base64.StdEncoding.EncodeToString(signature), dgm) + if serverState, exists := cp.state[ev.Data[event.GroupServer]]; exists && serverState == connections.AUTHENTICATED { + // server is syncing, update it's most recent sync message time + cp.SetConversationAttribute(ci.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), time.Unix(int64(dgm.Timestamp), 0).Format(time.RFC3339Nano)) + } break } } @@ -1127,8 +1137,6 @@ func (cp *cwtchPeer) eventHandler() { } } - /***** Non default but requestable handleable events *****/ - case event.ManifestReceived: log.Debugf("Manifest Received Event!: %v", ev) handle := ev.Data[event.Handle] @@ -1238,8 +1246,37 @@ func (cp *cwtchPeer) eventHandler() { cp.mutex.Unlock() case event.ServerStateChange: cp.mutex.Lock() - cp.state[ev.Data[event.GroupServer]] = connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] + state := connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] + cp.state[ev.Data[event.GroupServer]] = state cp.mutex.Unlock() + + // If starting to sync, determine last message from known groups on server so we can calculate sync progress + if state == connections.AUTHENTICATED { + conversations, err := cp.FetchConversations() + mostRecentTime := time.Date(2020, 6, 1, 0, 0, 0, 0, time.UTC) + if err == nil { + for _, conversationInfo := range conversations { + if server, exists := conversationInfo.GetAttribute(attr.LocalScope, attr.LegacyGroupZone, constants.GroupServer); exists && server == ev.Data[event.GroupServer] { + lastMessage, _ := cp.GetMostRecentMessages(conversationInfo.ID, 0, 0, 1) + if len(lastMessage) == 0 { + continue + } + lastGroupMsgTime, err := time.Parse(time.RFC3339Nano, lastMessage[0].Attr[constants.AttrSentTimestamp]) + if err != nil { + continue + } + if lastGroupMsgTime.After(mostRecentTime) { + mostRecentTime = lastGroupMsgTime + } + } + } + } + serverInfo, err := cp.FetchConversationInfo(ev.Data[event.GroupServer]) + if err == nil { + cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncPreLastMessageTime)), mostRecentTime.Format(time.RFC3339Nano)) + cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), mostRecentTime.Format(time.RFC3339Nano)) + } + } default: if ev.EventType != "" { log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType)