tweak reconnect plugin to have faster intervals; add group sync progress state to peer #433
|
@ -248,13 +248,15 @@ func (app *application) ShutdownPeer(onion string) {
|
|||
// Shutdown shutsdown all peers of an app and then the tormanager
|
||||
func (app *application) Shutdown() {
|
||||
for id, peer := range app.peers {
|
||||
peer.Shutdown()
|
||||
log.Debugf("Shutting Down Peer %v", id)
|
||||
peer.Shutdown()
|
||||
log.Debugf("Shutting Down Plugins for %v", id)
|
||||
app.appletPlugins.ShutdownPeer(id)
|
||||
log.Debugf("Shutting Down Engines for %v", id)
|
||||
app.engines[id].Shutdown()
|
||||
log.Debugf("Shutting Down Bus for %v", id)
|
||||
app.eventBuses[id].Shutdown()
|
||||
log.Debugf("Done Shutdown for peer %v", id)
|
||||
}
|
||||
log.Debugf("Shutting Down App")
|
||||
app.appBus.Shutdown()
|
||||
|
|
|
@ -8,8 +8,8 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
const tickTime = 10 * time.Second
|
||||
const maxBackoff int = 32 // 320 seconds or ~5 min
|
||||
const tickTime = 5 * time.Second
|
||||
const maxBackoff int = 64 // 320 seconds or ~5 min
|
||||
|
||||
type connectionType int
|
||||
|
||||
|
@ -132,7 +132,7 @@ func (cr *contactRetry) retryDisconnected() {
|
|||
|
||||
func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) {
|
||||
if _, exists := cr.connections.Load(id); !exists {
|
||||
p := &contact{id: id, state: connections.DISCONNECTED, backoff: 0, ticks: 0, ctype: ctype}
|
||||
p := &contact{id: id, state: state, backoff: 0, ticks: 0, ctype: ctype}
|
||||
cr.connections.Store(id, p)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -51,3 +51,6 @@ const AttrRejected = "rejected-invite"
|
|||
const AttrDownloaded = "file-downloaded"
|
||||
|
||||
const CustomProfileImageKey = "custom-profile-image"
|
||||
|
||||
const SyncPreLastMessageTime = "SyncPreLastMessageTime"
|
||||
const SyncMostRecentMessageTime = "SyncMostRecentMessageTime"
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/model/constants"
|
||||
"encoding/json"
|
||||
"time"
|
||||
)
|
||||
|
||||
// AccessControl is a type determining client assigned authorization to a peer
|
||||
|
@ -86,6 +87,30 @@ func (ci *Conversation) IsServer() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// ServerSyncProgress is only valid during a server being in the AUTHENTICATED state and therefor in the syncing process
|
||||
// it returns a double (0-1) representing the estimated progress of the syncing
|
||||
func (ci *Conversation) ServerSyncProgress() float64 {
|
||||
startTimeStr, startExists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncPreLastMessageTime)).ToString()]
|
||||
recentTimeStr, recentExists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)).ToString()]
|
||||
|
||||
if !startExists || !recentExists {
|
||||
return 0.0
|
||||
}
|
||||
|
||||
startTime, err := time.Parse(startTimeStr, time.RFC3339Nano)
|
||||
if err != nil {
|
||||
return 0.0
|
||||
}
|
||||
recentTime, err := time.Parse(recentTimeStr, time.RFC3339Nano)
|
||||
if err != nil {
|
||||
return 0.0
|
||||
}
|
||||
|
||||
syncRange := time.Since(startTime)
|
||||
pointFromStart := startTime.Sub(recentTime)
|
||||
return pointFromStart.Seconds() / syncRange.Seconds()
|
||||
}
|
||||
|
||||
// ConversationMessage bundles an instance of a conversation message row
|
||||
type ConversationMessage struct {
|
||||
ID int
|
||||
|
|
|
@ -51,6 +51,12 @@ var DefaultEventsToHandle = []event.Type{
|
|||
event.NewGetValMessageFromPeer,
|
||||
event.ProtocolEngineStopped,
|
||||
event.RetryServerRequest,
|
||||
event.PeerStateChange,
|
||||
event.ServerStateChange,
|
||||
event.SendMessageToPeerError,
|
||||
event.NewRetValMessageFromPeer,
|
||||
event.ManifestReceived,
|
||||
event.FileDownloaded,
|
||||
}
|
||||
|
||||
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
|
||||
|
@ -1051,6 +1057,10 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
// Time to either acknowledge the message or insert a new message
|
||||
// Re-encode signature to base64
|
||||
cp.attemptInsertOrAcknowledgeLegacyGroupConversation(conversationInfo.ID, base64.StdEncoding.EncodeToString(signature), dgm)
|
||||
if serverState, exists := cp.state[ev.Data[event.GroupServer]]; exists && serverState == connections.AUTHENTICATED {
|
||||
// server is syncing, update it's most recent sync message time
|
||||
cp.SetConversationAttribute(ci.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), time.Unix(int64(dgm.Timestamp), 0).Format(time.RFC3339Nano))
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -1127,8 +1137,6 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
}
|
||||
}
|
||||
|
||||
/***** Non default but requestable handleable events *****/
|
||||
|
||||
case event.ManifestReceived:
|
||||
log.Debugf("Manifest Received Event!: %v", ev)
|
||||
handle := ev.Data[event.Handle]
|
||||
|
@ -1238,8 +1246,37 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
cp.mutex.Unlock()
|
||||
case event.ServerStateChange:
|
||||
cp.mutex.Lock()
|
||||
cp.state[ev.Data[event.GroupServer]] = connections.ConnectionStateToType()[ev.Data[event.ConnectionState]]
|
||||
state := connections.ConnectionStateToType()[ev.Data[event.ConnectionState]]
|
||||
cp.state[ev.Data[event.GroupServer]] = state
|
||||
cp.mutex.Unlock()
|
||||
|
||||
// If starting to sync, determine last message from known groups on server so we can calculate sync progress
|
||||
if state == connections.AUTHENTICATED {
|
||||
conversations, err := cp.FetchConversations()
|
||||
mostRecentTime := time.Date(2020, 6, 1, 0, 0, 0, 0, time.UTC)
|
||||
if err == nil {
|
||||
for _, conversationInfo := range conversations {
|
||||
if server, exists := conversationInfo.GetAttribute(attr.LocalScope, attr.LegacyGroupZone, constants.GroupServer); exists && server == ev.Data[event.GroupServer] {
|
||||
lastMessage, _ := cp.GetMostRecentMessages(conversationInfo.ID, 0, 0, 1)
|
||||
if len(lastMessage) == 0 {
|
||||
continue
|
||||
}
|
||||
lastGroupMsgTime, err := time.Parse(time.RFC3339Nano, lastMessage[0].Attr[constants.AttrSentTimestamp])
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if lastGroupMsgTime.After(mostRecentTime) {
|
||||
mostRecentTime = lastGroupMsgTime
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
serverInfo, err := cp.FetchConversationInfo(ev.Data[event.GroupServer])
|
||||
if err == nil {
|
||||
cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncPreLastMessageTime)), mostRecentTime.Format(time.RFC3339Nano))
|
||||
cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), mostRecentTime.Format(time.RFC3339Nano))
|
||||
}
|
||||
}
|
||||
default:
|
||||
if ev.EventType != "" {
|
||||
log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType)
|
||||
|
|
Loading…
Reference in New Issue