Merge pull request 'tweak reconnect plugin to have faster intervals; add group sync progress state to peer' (#433) from state into master
continuous-integration/drone/push Build is pending Details
continuous-integration/drone/tag Build is pending Details

Reviewed-on: #433
This commit is contained in:
Sarah Jamie Lewis 2022-03-04 00:27:50 +00:00
commit a9ab91688b
5 changed files with 74 additions and 7 deletions

View File

@ -248,13 +248,15 @@ func (app *application) ShutdownPeer(onion string) {
// Shutdown shutsdown all peers of an app and then the tormanager
func (app *application) Shutdown() {
for id, peer := range app.peers {
peer.Shutdown()
log.Debugf("Shutting Down Peer %v", id)
peer.Shutdown()
log.Debugf("Shutting Down Plugins for %v", id)
app.appletPlugins.ShutdownPeer(id)
log.Debugf("Shutting Down Engines for %v", id)
app.engines[id].Shutdown()
log.Debugf("Shutting Down Bus for %v", id)
app.eventBuses[id].Shutdown()
log.Debugf("Done Shutdown for peer %v", id)
}
log.Debugf("Shutting Down App")
app.appBus.Shutdown()

View File

@ -8,8 +8,8 @@ import (
"time"
)
const tickTime = 10 * time.Second
const maxBackoff int = 32 // 320 seconds or ~5 min
const tickTime = 5 * time.Second
const maxBackoff int = 64 // 320 seconds or ~5 min
type connectionType int
@ -132,7 +132,7 @@ func (cr *contactRetry) retryDisconnected() {
func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) {
if _, exists := cr.connections.Load(id); !exists {
p := &contact{id: id, state: connections.DISCONNECTED, backoff: 0, ticks: 0, ctype: ctype}
p := &contact{id: id, state: state, backoff: 0, ticks: 0, ctype: ctype}
cr.connections.Store(id, p)
return
}

View File

@ -51,3 +51,6 @@ const AttrRejected = "rejected-invite"
const AttrDownloaded = "file-downloaded"
const CustomProfileImageKey = "custom-profile-image"
const SyncPreLastMessageTime = "SyncPreLastMessageTime"
const SyncMostRecentMessageTime = "SyncMostRecentMessageTime"

View File

@ -4,6 +4,7 @@ import (
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"encoding/json"
"time"
)
// AccessControl is a type determining client assigned authorization to a peer
@ -86,6 +87,30 @@ func (ci *Conversation) IsServer() bool {
return false
}
// ServerSyncProgress is only valid during a server being in the AUTHENTICATED state and therefor in the syncing process
// it returns a double (0-1) representing the estimated progress of the syncing
func (ci *Conversation) ServerSyncProgress() float64 {
startTimeStr, startExists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncPreLastMessageTime)).ToString()]
recentTimeStr, recentExists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)).ToString()]
if !startExists || !recentExists {
return 0.0
}
startTime, err := time.Parse(startTimeStr, time.RFC3339Nano)
if err != nil {
return 0.0
}
recentTime, err := time.Parse(recentTimeStr, time.RFC3339Nano)
if err != nil {
return 0.0
}
syncRange := time.Since(startTime)
pointFromStart := startTime.Sub(recentTime)
return pointFromStart.Seconds() / syncRange.Seconds()
}
// ConversationMessage bundles an instance of a conversation message row
type ConversationMessage struct {
ID int

View File

@ -51,6 +51,12 @@ var DefaultEventsToHandle = []event.Type{
event.NewGetValMessageFromPeer,
event.ProtocolEngineStopped,
event.RetryServerRequest,
event.PeerStateChange,
event.ServerStateChange,
event.SendMessageToPeerError,
event.NewRetValMessageFromPeer,
event.ManifestReceived,
event.FileDownloaded,
}
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
@ -1051,6 +1057,10 @@ func (cp *cwtchPeer) eventHandler() {
// Time to either acknowledge the message or insert a new message
// Re-encode signature to base64
cp.attemptInsertOrAcknowledgeLegacyGroupConversation(conversationInfo.ID, base64.StdEncoding.EncodeToString(signature), dgm)
if serverState, exists := cp.state[ev.Data[event.GroupServer]]; exists && serverState == connections.AUTHENTICATED {
// server is syncing, update it's most recent sync message time
cp.SetConversationAttribute(ci.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), time.Unix(int64(dgm.Timestamp), 0).Format(time.RFC3339Nano))
}
break
}
}
@ -1127,8 +1137,6 @@ func (cp *cwtchPeer) eventHandler() {
}
}
/***** Non default but requestable handleable events *****/
case event.ManifestReceived:
log.Debugf("Manifest Received Event!: %v", ev)
handle := ev.Data[event.Handle]
@ -1238,8 +1246,37 @@ func (cp *cwtchPeer) eventHandler() {
cp.mutex.Unlock()
case event.ServerStateChange:
cp.mutex.Lock()
cp.state[ev.Data[event.GroupServer]] = connections.ConnectionStateToType()[ev.Data[event.ConnectionState]]
state := connections.ConnectionStateToType()[ev.Data[event.ConnectionState]]
cp.state[ev.Data[event.GroupServer]] = state
cp.mutex.Unlock()
// If starting to sync, determine last message from known groups on server so we can calculate sync progress
if state == connections.AUTHENTICATED {
conversations, err := cp.FetchConversations()
mostRecentTime := time.Date(2020, 6, 1, 0, 0, 0, 0, time.UTC)
if err == nil {
for _, conversationInfo := range conversations {
if server, exists := conversationInfo.GetAttribute(attr.LocalScope, attr.LegacyGroupZone, constants.GroupServer); exists && server == ev.Data[event.GroupServer] {
lastMessage, _ := cp.GetMostRecentMessages(conversationInfo.ID, 0, 0, 1)
if len(lastMessage) == 0 {
continue
}
lastGroupMsgTime, err := time.Parse(time.RFC3339Nano, lastMessage[0].Attr[constants.AttrSentTimestamp])
if err != nil {
continue
}
if lastGroupMsgTime.After(mostRecentTime) {
mostRecentTime = lastGroupMsgTime
}
}
}
}
serverInfo, err := cp.FetchConversationInfo(ev.Data[event.GroupServer])
if err == nil {
cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncPreLastMessageTime)), mostRecentTime.Format(time.RFC3339Nano))
cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), mostRecentTime.Format(time.RFC3339Nano))
}
}
default:
if ev.EventType != "" {
log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType)