tweak reconnect plugin to have faster intervals; add group sync progress state to peer
continuous-integration/drone/push Build is passing Details
continuous-integration/drone/pr Build is passing Details

This commit is contained in:
Dan Ballard 2022-03-03 15:58:41 -08:00
parent d6a34258be
commit 93e2a25673
5 changed files with 74 additions and 7 deletions

View File

@ -248,13 +248,15 @@ func (app *application) ShutdownPeer(onion string) {
// Shutdown shutsdown all peers of an app and then the tormanager // Shutdown shutsdown all peers of an app and then the tormanager
func (app *application) Shutdown() { func (app *application) Shutdown() {
for id, peer := range app.peers { for id, peer := range app.peers {
peer.Shutdown()
log.Debugf("Shutting Down Peer %v", id) log.Debugf("Shutting Down Peer %v", id)
peer.Shutdown()
log.Debugf("Shutting Down Plugins for %v", id)
app.appletPlugins.ShutdownPeer(id) app.appletPlugins.ShutdownPeer(id)
log.Debugf("Shutting Down Engines for %v", id) log.Debugf("Shutting Down Engines for %v", id)
app.engines[id].Shutdown() app.engines[id].Shutdown()
log.Debugf("Shutting Down Bus for %v", id) log.Debugf("Shutting Down Bus for %v", id)
app.eventBuses[id].Shutdown() app.eventBuses[id].Shutdown()
log.Debugf("Done Shutdown for peer %v", id)
} }
log.Debugf("Shutting Down App") log.Debugf("Shutting Down App")
app.appBus.Shutdown() app.appBus.Shutdown()

View File

@ -8,8 +8,8 @@ import (
"time" "time"
) )
const tickTime = 10 * time.Second const tickTime = 5 * time.Second
const maxBackoff int = 32 // 320 seconds or ~5 min const maxBackoff int = 64 // 320 seconds or ~5 min
type connectionType int type connectionType int
@ -132,7 +132,7 @@ func (cr *contactRetry) retryDisconnected() {
func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) { func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) {
if _, exists := cr.connections.Load(id); !exists { if _, exists := cr.connections.Load(id); !exists {
p := &contact{id: id, state: connections.DISCONNECTED, backoff: 0, ticks: 0, ctype: ctype} p := &contact{id: id, state: state, backoff: 0, ticks: 0, ctype: ctype}
cr.connections.Store(id, p) cr.connections.Store(id, p)
return return
} }

View File

@ -51,3 +51,6 @@ const AttrRejected = "rejected-invite"
const AttrDownloaded = "file-downloaded" const AttrDownloaded = "file-downloaded"
const CustomProfileImageKey = "custom-profile-image" const CustomProfileImageKey = "custom-profile-image"
const SyncPreLastMessageTime = "SyncPreLastMessageTime"
const SyncMostRecentMessageTime = "SyncMostRecentMessageTime"

View File

@ -4,6 +4,7 @@ import (
"cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants" "cwtch.im/cwtch/model/constants"
"encoding/json" "encoding/json"
"time"
) )
// AccessControl is a type determining client assigned authorization to a peer // AccessControl is a type determining client assigned authorization to a peer
@ -86,6 +87,30 @@ func (ci *Conversation) IsServer() bool {
return false return false
} }
// ServerSyncProgress is only valid during a server being in the AUTHENTICATED state and therefor in the syncing process
// it returns a double (0-1) representing the estimated progress of the syncing
func (ci *Conversation) ServerSyncProgress() float64 {
startTimeStr, startExists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncPreLastMessageTime)).ToString()]
recentTimeStr, recentExists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)).ToString()]
if !startExists || !recentExists {
return 0.0
}
startTime, err := time.Parse(startTimeStr, time.RFC3339Nano)
if err != nil {
return 0.0
}
recentTime, err := time.Parse(recentTimeStr, time.RFC3339Nano)
if err != nil {
return 0.0
}
syncRange := time.Since(startTime)
pointFromStart := startTime.Sub(recentTime)
return pointFromStart.Seconds() / syncRange.Seconds()
}
// ConversationMessage bundles an instance of a conversation message row // ConversationMessage bundles an instance of a conversation message row
type ConversationMessage struct { type ConversationMessage struct {
ID int ID int

View File

@ -51,6 +51,12 @@ var DefaultEventsToHandle = []event.Type{
event.NewGetValMessageFromPeer, event.NewGetValMessageFromPeer,
event.ProtocolEngineStopped, event.ProtocolEngineStopped,
event.RetryServerRequest, event.RetryServerRequest,
event.PeerStateChange,
event.ServerStateChange,
event.SendMessageToPeerError,
event.NewRetValMessageFromPeer,
event.ManifestReceived,
event.FileDownloaded,
} }
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
@ -1051,6 +1057,10 @@ func (cp *cwtchPeer) eventHandler() {
// Time to either acknowledge the message or insert a new message // Time to either acknowledge the message or insert a new message
// Re-encode signature to base64 // Re-encode signature to base64
cp.attemptInsertOrAcknowledgeLegacyGroupConversation(conversationInfo.ID, base64.StdEncoding.EncodeToString(signature), dgm) cp.attemptInsertOrAcknowledgeLegacyGroupConversation(conversationInfo.ID, base64.StdEncoding.EncodeToString(signature), dgm)
if serverState, exists := cp.state[ev.Data[event.GroupServer]]; exists && serverState == connections.AUTHENTICATED {
// server is syncing, update it's most recent sync message time
cp.SetConversationAttribute(ci.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), time.Unix(int64(dgm.Timestamp), 0).Format(time.RFC3339Nano))
}
break break
} }
} }
@ -1127,8 +1137,6 @@ func (cp *cwtchPeer) eventHandler() {
} }
} }
/***** Non default but requestable handleable events *****/
case event.ManifestReceived: case event.ManifestReceived:
log.Debugf("Manifest Received Event!: %v", ev) log.Debugf("Manifest Received Event!: %v", ev)
handle := ev.Data[event.Handle] handle := ev.Data[event.Handle]
@ -1238,8 +1246,37 @@ func (cp *cwtchPeer) eventHandler() {
cp.mutex.Unlock() cp.mutex.Unlock()
case event.ServerStateChange: case event.ServerStateChange:
cp.mutex.Lock() cp.mutex.Lock()
cp.state[ev.Data[event.GroupServer]] = connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] state := connections.ConnectionStateToType()[ev.Data[event.ConnectionState]]
cp.state[ev.Data[event.GroupServer]] = state
cp.mutex.Unlock() cp.mutex.Unlock()
// If starting to sync, determine last message from known groups on server so we can calculate sync progress
if state == connections.AUTHENTICATED {
conversations, err := cp.FetchConversations()
mostRecentTime := time.Date(2020, 6, 1, 0, 0, 0, 0, time.UTC)
if err == nil {
for _, conversationInfo := range conversations {
if server, exists := conversationInfo.GetAttribute(attr.LocalScope, attr.LegacyGroupZone, constants.GroupServer); exists && server == ev.Data[event.GroupServer] {
lastMessage, _ := cp.GetMostRecentMessages(conversationInfo.ID, 0, 0, 1)
if len(lastMessage) == 0 {
continue
}
lastGroupMsgTime, err := time.Parse(time.RFC3339Nano, lastMessage[0].Attr[constants.AttrSentTimestamp])
if err != nil {
continue
}
if lastGroupMsgTime.After(mostRecentTime) {
mostRecentTime = lastGroupMsgTime
}
}
}
}
serverInfo, err := cp.FetchConversationInfo(ev.Data[event.GroupServer])
if err == nil {
cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncPreLastMessageTime)), mostRecentTime.Format(time.RFC3339Nano))
cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), mostRecentTime.Format(time.RFC3339Nano))
}
}
default: default:
if ev.EventType != "" { if ev.EventType != "" {
log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType) log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType)