tweak reconnect plugin to have faster intervals; add group sync progress state to peer #433
|
@ -248,13 +248,15 @@ func (app *application) ShutdownPeer(onion string) {
|
||||||
// Shutdown shutsdown all peers of an app and then the tormanager
|
// Shutdown shutsdown all peers of an app and then the tormanager
|
||||||
func (app *application) Shutdown() {
|
func (app *application) Shutdown() {
|
||||||
for id, peer := range app.peers {
|
for id, peer := range app.peers {
|
||||||
peer.Shutdown()
|
|
||||||
log.Debugf("Shutting Down Peer %v", id)
|
log.Debugf("Shutting Down Peer %v", id)
|
||||||
|
peer.Shutdown()
|
||||||
|
log.Debugf("Shutting Down Plugins for %v", id)
|
||||||
app.appletPlugins.ShutdownPeer(id)
|
app.appletPlugins.ShutdownPeer(id)
|
||||||
log.Debugf("Shutting Down Engines for %v", id)
|
log.Debugf("Shutting Down Engines for %v", id)
|
||||||
app.engines[id].Shutdown()
|
app.engines[id].Shutdown()
|
||||||
log.Debugf("Shutting Down Bus for %v", id)
|
log.Debugf("Shutting Down Bus for %v", id)
|
||||||
app.eventBuses[id].Shutdown()
|
app.eventBuses[id].Shutdown()
|
||||||
|
log.Debugf("Done Shutdown for peer %v", id)
|
||||||
}
|
}
|
||||||
log.Debugf("Shutting Down App")
|
log.Debugf("Shutting Down App")
|
||||||
app.appBus.Shutdown()
|
app.appBus.Shutdown()
|
||||||
|
|
|
@ -8,8 +8,8 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const tickTime = 10 * time.Second
|
const tickTime = 5 * time.Second
|
||||||
const maxBackoff int = 32 // 320 seconds or ~5 min
|
const maxBackoff int = 64 // 320 seconds or ~5 min
|
||||||
|
|
||||||
type connectionType int
|
type connectionType int
|
||||||
|
|
||||||
|
@ -132,7 +132,7 @@ func (cr *contactRetry) retryDisconnected() {
|
||||||
|
|
||||||
func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) {
|
func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) {
|
||||||
if _, exists := cr.connections.Load(id); !exists {
|
if _, exists := cr.connections.Load(id); !exists {
|
||||||
p := &contact{id: id, state: connections.DISCONNECTED, backoff: 0, ticks: 0, ctype: ctype}
|
p := &contact{id: id, state: state, backoff: 0, ticks: 0, ctype: ctype}
|
||||||
cr.connections.Store(id, p)
|
cr.connections.Store(id, p)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,3 +51,6 @@ const AttrRejected = "rejected-invite"
|
||||||
const AttrDownloaded = "file-downloaded"
|
const AttrDownloaded = "file-downloaded"
|
||||||
|
|
||||||
const CustomProfileImageKey = "custom-profile-image"
|
const CustomProfileImageKey = "custom-profile-image"
|
||||||
|
|
||||||
|
const SyncPreLastMessageTime = "SyncPreLastMessageTime"
|
||||||
|
const SyncMostRecentMessageTime = "SyncMostRecentMessageTime"
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"cwtch.im/cwtch/model/attr"
|
"cwtch.im/cwtch/model/attr"
|
||||||
"cwtch.im/cwtch/model/constants"
|
"cwtch.im/cwtch/model/constants"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AccessControl is a type determining client assigned authorization to a peer
|
// AccessControl is a type determining client assigned authorization to a peer
|
||||||
|
@ -86,6 +87,30 @@ func (ci *Conversation) IsServer() bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ServerSyncProgress is only valid during a server being in the AUTHENTICATED state and therefor in the syncing process
|
||||||
|
// it returns a double (0-1) representing the estimated progress of the syncing
|
||||||
|
func (ci *Conversation) ServerSyncProgress() float64 {
|
||||||
|
startTimeStr, startExists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncPreLastMessageTime)).ToString()]
|
||||||
|
recentTimeStr, recentExists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)).ToString()]
|
||||||
|
|
||||||
|
if !startExists || !recentExists {
|
||||||
|
return 0.0
|
||||||
|
}
|
||||||
|
|
||||||
|
startTime, err := time.Parse(startTimeStr, time.RFC3339Nano)
|
||||||
|
if err != nil {
|
||||||
|
return 0.0
|
||||||
|
}
|
||||||
|
recentTime, err := time.Parse(recentTimeStr, time.RFC3339Nano)
|
||||||
|
if err != nil {
|
||||||
|
return 0.0
|
||||||
|
}
|
||||||
|
|
||||||
|
syncRange := time.Since(startTime)
|
||||||
|
pointFromStart := startTime.Sub(recentTime)
|
||||||
|
return pointFromStart.Seconds() / syncRange.Seconds()
|
||||||
|
}
|
||||||
|
|
||||||
// ConversationMessage bundles an instance of a conversation message row
|
// ConversationMessage bundles an instance of a conversation message row
|
||||||
type ConversationMessage struct {
|
type ConversationMessage struct {
|
||||||
ID int
|
ID int
|
||||||
|
|
|
@ -51,6 +51,12 @@ var DefaultEventsToHandle = []event.Type{
|
||||||
event.NewGetValMessageFromPeer,
|
event.NewGetValMessageFromPeer,
|
||||||
event.ProtocolEngineStopped,
|
event.ProtocolEngineStopped,
|
||||||
event.RetryServerRequest,
|
event.RetryServerRequest,
|
||||||
|
event.PeerStateChange,
|
||||||
|
event.ServerStateChange,
|
||||||
|
event.SendMessageToPeerError,
|
||||||
|
event.NewRetValMessageFromPeer,
|
||||||
|
event.ManifestReceived,
|
||||||
|
event.FileDownloaded,
|
||||||
}
|
}
|
||||||
|
|
||||||
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
|
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
|
||||||
|
@ -1051,6 +1057,10 @@ func (cp *cwtchPeer) eventHandler() {
|
||||||
// Time to either acknowledge the message or insert a new message
|
// Time to either acknowledge the message or insert a new message
|
||||||
// Re-encode signature to base64
|
// Re-encode signature to base64
|
||||||
cp.attemptInsertOrAcknowledgeLegacyGroupConversation(conversationInfo.ID, base64.StdEncoding.EncodeToString(signature), dgm)
|
cp.attemptInsertOrAcknowledgeLegacyGroupConversation(conversationInfo.ID, base64.StdEncoding.EncodeToString(signature), dgm)
|
||||||
|
if serverState, exists := cp.state[ev.Data[event.GroupServer]]; exists && serverState == connections.AUTHENTICATED {
|
||||||
|
// server is syncing, update it's most recent sync message time
|
||||||
|
cp.SetConversationAttribute(ci.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), time.Unix(int64(dgm.Timestamp), 0).Format(time.RFC3339Nano))
|
||||||
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1127,8 +1137,6 @@ func (cp *cwtchPeer) eventHandler() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/***** Non default but requestable handleable events *****/
|
|
||||||
|
|
||||||
case event.ManifestReceived:
|
case event.ManifestReceived:
|
||||||
log.Debugf("Manifest Received Event!: %v", ev)
|
log.Debugf("Manifest Received Event!: %v", ev)
|
||||||
handle := ev.Data[event.Handle]
|
handle := ev.Data[event.Handle]
|
||||||
|
@ -1238,8 +1246,37 @@ func (cp *cwtchPeer) eventHandler() {
|
||||||
cp.mutex.Unlock()
|
cp.mutex.Unlock()
|
||||||
case event.ServerStateChange:
|
case event.ServerStateChange:
|
||||||
cp.mutex.Lock()
|
cp.mutex.Lock()
|
||||||
cp.state[ev.Data[event.GroupServer]] = connections.ConnectionStateToType()[ev.Data[event.ConnectionState]]
|
state := connections.ConnectionStateToType()[ev.Data[event.ConnectionState]]
|
||||||
|
cp.state[ev.Data[event.GroupServer]] = state
|
||||||
cp.mutex.Unlock()
|
cp.mutex.Unlock()
|
||||||
|
|
||||||
|
// If starting to sync, determine last message from known groups on server so we can calculate sync progress
|
||||||
|
if state == connections.AUTHENTICATED {
|
||||||
|
conversations, err := cp.FetchConversations()
|
||||||
|
mostRecentTime := time.Date(2020, 6, 1, 0, 0, 0, 0, time.UTC)
|
||||||
|
if err == nil {
|
||||||
|
for _, conversationInfo := range conversations {
|
||||||
|
if server, exists := conversationInfo.GetAttribute(attr.LocalScope, attr.LegacyGroupZone, constants.GroupServer); exists && server == ev.Data[event.GroupServer] {
|
||||||
|
lastMessage, _ := cp.GetMostRecentMessages(conversationInfo.ID, 0, 0, 1)
|
||||||
|
if len(lastMessage) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
lastGroupMsgTime, err := time.Parse(time.RFC3339Nano, lastMessage[0].Attr[constants.AttrSentTimestamp])
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if lastGroupMsgTime.After(mostRecentTime) {
|
||||||
|
mostRecentTime = lastGroupMsgTime
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
serverInfo, err := cp.FetchConversationInfo(ev.Data[event.GroupServer])
|
||||||
|
if err == nil {
|
||||||
|
cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncPreLastMessageTime)), mostRecentTime.Format(time.RFC3339Nano))
|
||||||
|
cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), mostRecentTime.Format(time.RFC3339Nano))
|
||||||
|
}
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
if ev.EventType != "" {
|
if ev.EventType != "" {
|
||||||
log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType)
|
log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType)
|
||||||
|
|
Loading…
Reference in New Issue