Handle Server Disconnections, and Partial Syncing
continuous-integration/drone/push Build is passing Details
continuous-integration/drone/pr Build is passing Details

This commit is contained in:
Sarah Jamie Lewis 2021-05-07 13:54:12 -07:00
parent 924be178af
commit 678c820db2
4 changed files with 70 additions and 31 deletions

View File

@ -34,6 +34,7 @@ type contactRetry struct {
running bool
breakChan chan bool
onion string
lastCheck time.Time
connections sync.Map //[string]*contact
}
@ -59,6 +60,10 @@ func (cr *contactRetry) run() {
cr.bus.Subscribe(event.ServerStateChange, cr.queue)
for {
if time.Since(cr.lastCheck) > tickTime {
cr.retryDisconnected()
cr.lastCheck = time.Now()
}
select {
case e := <-cr.queue.OutChan():
switch e.EventType {
@ -94,22 +99,7 @@ func (cr *contactRetry) run() {
}
case <-time.After(tickTime):
cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact)
if p.state == connections.DISCONNECTED {
p.ticks++
if p.ticks == p.backoff {
p.ticks = 0
if cr.networkUp {
if p.ctype == peerConn {
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
}
}
}
}
return true
})
continue
case <-cr.breakChan:
cr.running = false
@ -118,6 +108,28 @@ func (cr *contactRetry) run() {
}
}
func (cr *contactRetry) retryDisconnected() {
cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact)
if p.state == connections.DISCONNECTED {
p.ticks++
if p.ticks == p.backoff {
p.ticks = 0
if cr.networkUp {
if p.ctype == peerConn {
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
}
if p.ctype == serverConn {
cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id}))
}
}
}
}
return true
})
}
func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) {
if _, exists := cr.connections.Load(id); !exists {
p := &contact{id: id, state: connections.DISCONNECTED, backoff: 1, ticks: 0, ctype: ctype}

View File

@ -456,7 +456,15 @@ func (cp *cwtchPeer) JoinServer(onion string) error {
tokenY, yExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypePrivacyPass))
tokenOnion, onionExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypeTokenOnion))
if yExists && onionExists {
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion}))
messages := cp.GetContact(onion).Timeline.GetMessages()
// by default we do a full sync
signature := base64.StdEncoding.EncodeToString([]byte{})
// if we know of messages on the server, we only ask to receive newer messages
if len(messages) > 0 {
lastMessageIndex := len(messages) - 1
signature = base64.StdEncoding.EncodeToString(messages[lastMessageIndex].Signature)
}
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature}))
return nil
}
}
@ -703,7 +711,7 @@ func (cp *cwtchPeer) eventHandler() {
cp.mutex.Unlock()
case event.RetryServerRequest:
// Automated Join Server Request triggered by a plugin.
log.Debugf("profile received an automated retry event for %v", ev.Data[event.GroupServer])
log.Infof("profile received an automated retry event for %v", ev.Data[event.GroupServer])
cp.JoinServer(ev.Data[event.GroupServer])
case event.NewGetValMessageFromPeer:
onion := ev.Data[event.RemotePeer]

View File

@ -128,7 +128,12 @@ func (e *engine) eventHandler() {
case event.InvitePeerToGroup:
e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], event.ContextInvite, []byte(ev.Data[event.GroupInvite]))
case event.JoinServer:
e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY])
signature, err := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
if err != nil {
// will result in a full sync
signature = []byte{}
}
e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
case event.LeaveServer:
es, ok := e.ephemeralServices.Load(ev.Data[event.GroupServer])
if ok {
@ -270,7 +275,7 @@ func (e *engine) peerWithOnion(onion string) {
// peerWithTokenServer is the entry point for cwtchPeer - server relationships
// needs to be run in a goroutine as will block on Open.
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string) {
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) {
service, exists := e.ephemeralServices.Load(onion)
if exists {
@ -292,7 +297,7 @@ func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, toke
Y := ristretto255.NewElement()
Y.UnmarshalText([]byte(tokenServerY))
connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, e.receiveGroupMessage, e.serverSynced))
connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverSynced, e.serverDisconnected))
e.ephemeralServices.Store(onion, ephemeralService)
// If we are already connected...check if we are authed and issue an auth event
// (This allows the ui to be stateless)
@ -361,7 +366,6 @@ func (e *engine) serverConnected(onion string) {
}
func (e *engine) serverSynced(onion string) {
log.Debugf("SERVER SYNCED: %v", onion)
e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{
event.GroupServer: onion,
event.ConnectionState: ConnectionStateName[SYNCED],

View File

@ -15,7 +15,7 @@ import (
)
// NewTokenBoardClient generates a new Client for Token Board
func NewTokenBoardClient(acn connectivity.ACN, Y *ristretto255.Element, tokenServiceOnion string, groupMessageHandler func(server string, gm *groups.EncryptedGroupMessage), serverSyncedHandler func(server string)) tapir.Application {
func NewTokenBoardClient(acn connectivity.ACN, Y *ristretto255.Element, tokenServiceOnion string, lastKnownSignature []byte, groupMessageHandler func(server string, gm *groups.EncryptedGroupMessage), serverSyncedHandler func(server string), serverClosedHandler func(server string)) tapir.Application {
tba := new(TokenBoardClient)
tba.acn = acn
tba.tokenService = privacypass.NewTokenServer()
@ -23,6 +23,8 @@ func NewTokenBoardClient(acn connectivity.ACN, Y *ristretto255.Element, tokenSer
tba.tokenServiceOnion = tokenServiceOnion
tba.receiveGroupMessageHandler = groupMessageHandler
tba.serverSyncedHandler = serverSyncedHandler
tba.serverClosedHandler = serverClosedHandler
tba.lastKnownSignature = lastKnownSignature
return tba
}
@ -32,18 +34,21 @@ type TokenBoardClient struct {
connection tapir.Connection
receiveGroupMessageHandler func(server string, gm *groups.EncryptedGroupMessage)
serverSyncedHandler func(server string)
serverClosedHandler func(server string)
// Token service handling
acn connectivity.ACN
tokens []*privacypass.Token
tokenService *privacypass.TokenServer
tokenServiceOnion string
acn connectivity.ACN
tokens []*privacypass.Token
tokenService *privacypass.TokenServer
tokenServiceOnion string
lastKnownSignature []byte
}
// NewInstance Client a new TokenBoardApp
func (ta *TokenBoardClient) NewInstance() tapir.Application {
tba := new(TokenBoardClient)
tba.serverSyncedHandler = ta.serverSyncedHandler
tba.serverClosedHandler = ta.serverClosedHandler
tba.receiveGroupMessageHandler = ta.receiveGroupMessageHandler
tba.acn = ta.acn
tba.tokenService = ta.tokenService
@ -72,6 +77,7 @@ func (ta *TokenBoardClient) Listen() {
data := ta.connection.Expect()
if len(data) == 0 {
log.Debugf("Server closed the connection...")
ta.serverClosedHandler(ta.connection.Hostname())
return // connection is closed
}
@ -80,6 +86,7 @@ func (ta *TokenBoardClient) Listen() {
var message groups.Message
if err := json.Unmarshal(data, &message); err != nil {
log.Debugf("Server sent an unexpected message, closing the connection: %v", err)
ta.serverClosedHandler(ta.connection.Hostname())
ta.connection.Close()
return
}
@ -89,8 +96,8 @@ func (ta *TokenBoardClient) Listen() {
if message.NewMessage != nil {
ta.receiveGroupMessageHandler(ta.connection.Hostname(), &message.NewMessage.EGM)
} else {
// TODO: Send this error to the UI
log.Debugf("Server sent an unexpected NewMessage, closing the connection: %s", data)
ta.serverClosedHandler(ta.connection.Hostname())
ta.connection.Close()
return
}
@ -101,12 +108,21 @@ func (ta *TokenBoardClient) Listen() {
log.Debugf("Replaying %v Messages...", message.ReplayResult.NumMessages)
for i := 0; i < message.ReplayResult.NumMessages; i++ {
data := ta.connection.Expect()
if len(data) == 0 {
log.Debugf("Server sent an unexpected EncryptedGroupMessage, closing the connection")
ta.serverClosedHandler(ta.connection.Hostname())
ta.connection.Close()
return
}
egm := &groups.EncryptedGroupMessage{}
if err := json.Unmarshal(data, egm); err == nil {
ta.receiveGroupMessageHandler(ta.connection.Hostname(), egm)
ta.lastKnownSignature = egm.Signature
} else {
// TODO: Send this error to the UI
log.Debugf("Server sent an unexpected EncryptedGroupMessage, closing the connection: %v", err)
ta.serverClosedHandler(ta.connection.Hostname())
ta.connection.Close()
return
}
@ -119,8 +135,7 @@ func (ta *TokenBoardClient) Listen() {
// Replay posts a Replay Message to the server.
func (ta *TokenBoardClient) Replay() {
// TODO - Allow configurable ranges
data, _ := json.Marshal(groups.Message{MessageType: groups.ReplayRequestMessage, ReplayRequest: &groups.ReplayRequest{LastCommit: []byte{}}})
data, _ := json.Marshal(groups.Message{MessageType: groups.ReplayRequestMessage, ReplayRequest: &groups.ReplayRequest{LastCommit: ta.lastKnownSignature}})
ta.connection.Send(data)
}