Make contact retry more responsive in the optimisitic case
continuous-integration/drone/push Build is passing Details
continuous-integration/drone/pr Build is passing Details

This commit is contained in:
Sarah Jamie Lewis 2021-05-07 16:16:22 -07:00
parent 9f20802a1d
commit 0f0b91fc98
3 changed files with 16 additions and 14 deletions

View File

@ -114,7 +114,7 @@ func (cr *contactRetry) retryDisconnected() {
if p.state == connections.DISCONNECTED { if p.state == connections.DISCONNECTED {
p.ticks++ p.ticks++
if p.ticks == p.backoff { if p.ticks >= p.backoff {
p.ticks = 0 p.ticks = 0
if cr.networkUp { if cr.networkUp {
if p.ctype == peerConn { if p.ctype == peerConn {
@ -132,7 +132,7 @@ func (cr *contactRetry) retryDisconnected() {
func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) { func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) {
if _, exists := cr.connections.Load(id); !exists { if _, exists := cr.connections.Load(id); !exists {
p := &contact{id: id, state: connections.DISCONNECTED, backoff: 1, ticks: 0, ctype: ctype} p := &contact{id: id, state: connections.DISCONNECTED, backoff: 0, ticks: 0, ctype: ctype}
cr.connections.Store(id, p) cr.connections.Store(id, p)
return return
} }
@ -141,7 +141,9 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState
p := pinf.(*contact) p := pinf.(*contact)
if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED { if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED {
p.state = connections.DISCONNECTED p.state = connections.DISCONNECTED
if p.backoff < maxBakoff { if p.backoff == 0 {
p.backoff = 1
} else if p.backoff < maxBakoff {
p.backoff *= 2 p.backoff *= 2
} }
p.ticks = 0 p.ticks = 0
@ -149,7 +151,7 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState
p.state = state p.state = state
} else if state == connections.AUTHENTICATED { } else if state == connections.AUTHENTICATED {
p.state = state p.state = state
p.backoff = 1 p.backoff = 0
} }
} }

View File

@ -16,6 +16,8 @@ import (
"time" "time"
) )
const lastKnownSignature = "LastKnowSignature"
var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true, var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true,
event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeer: true, event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeer: true,
event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true, event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true,
@ -456,13 +458,9 @@ func (cp *cwtchPeer) JoinServer(onion string) error {
tokenY, yExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypePrivacyPass)) tokenY, yExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypePrivacyPass))
tokenOnion, onionExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypeTokenOnion)) tokenOnion, onionExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypeTokenOnion))
if yExists && onionExists { if yExists && onionExists {
messages := cp.GetContact(onion).Timeline.GetMessages() signature, exists := cp.GetContactAttribute(onion, lastKnownSignature)
// by default we do a full sync if !exists {
signature := base64.StdEncoding.EncodeToString([]byte{}) signature = base64.StdEncoding.EncodeToString([]byte{})
// if we know of messages on the server, we only ask to receive newer messages
if len(messages) > 0 {
lastMessageIndex := len(messages) - 1
signature = base64.StdEncoding.EncodeToString(messages[lastMessageIndex].Signature)
} }
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature})) cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature}))
return nil return nil
@ -679,9 +677,11 @@ func (cp *cwtchPeer) eventHandler() {
cp.mutex.Unlock() cp.mutex.Unlock()
case event.EncryptedGroupMessage: case event.EncryptedGroupMessage:
// If successful, a side effect is the message is added to the group's timeline // If successful, a side effect is the message is added to the group's timeline
cp.mutex.Lock()
ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
cp.SetContactAttribute(ev.Data[event.GroupServer], lastKnownSignature, ev.Data[event.Signature])
cp.mutex.Lock()
ok, groupID, message, seen := cp.Profile.AttemptDecryption(ciphertext, signature) ok, groupID, message, seen := cp.Profile.AttemptDecryption(ciphertext, signature)
cp.mutex.Unlock() cp.mutex.Unlock()
if ok && !seen { if ok && !seen {
@ -711,7 +711,7 @@ func (cp *cwtchPeer) eventHandler() {
cp.mutex.Unlock() cp.mutex.Unlock()
case event.RetryServerRequest: case event.RetryServerRequest:
// Automated Join Server Request triggered by a plugin. // Automated Join Server Request triggered by a plugin.
log.Infof("profile received an automated retry event for %v", ev.Data[event.GroupServer]) log.Debugf("profile received an automated retry event for %v", ev.Data[event.GroupServer])
cp.JoinServer(ev.Data[event.GroupServer]) cp.JoinServer(ev.Data[event.GroupServer])
case event.NewGetValMessageFromPeer: case event.NewGetValMessageFromPeer:
onion := ev.Data[event.RemotePeer] onion := ev.Data[event.RemotePeer]

View File

@ -439,7 +439,7 @@ func (e *engine) deleteConnection(id string) {
func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMessage) { func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMessage) {
// Publish Event so that a Profile Engine can deal with it. // Publish Event so that a Profile Engine can deal with it.
// Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine! // Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: base64.StdEncoding.EncodeToString(gm.Ciphertext), event.Signature: base64.StdEncoding.EncodeToString(gm.Signature)})) e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.GroupServer: server, event.Ciphertext: base64.StdEncoding.EncodeToString(gm.Ciphertext), event.Signature: base64.StdEncoding.EncodeToString(gm.Signature)}))
} }
// sendMessageToGroup attempts to sent the given message to the given group id. // sendMessageToGroup attempts to sent the given message to the given group id.