diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index 3167146..725d091 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -34,6 +34,7 @@ type contactRetry struct { running bool breakChan chan bool onion string + lastCheck time.Time connections sync.Map //[string]*contact } @@ -59,6 +60,10 @@ func (cr *contactRetry) run() { cr.bus.Subscribe(event.ServerStateChange, cr.queue) for { + if time.Since(cr.lastCheck) > tickTime { + cr.retryDisconnected() + cr.lastCheck = time.Now() + } select { case e := <-cr.queue.OutChan(): switch e.EventType { @@ -94,22 +99,7 @@ func (cr *contactRetry) run() { } case <-time.After(tickTime): - cr.connections.Range(func(k, v interface{}) bool { - p := v.(*contact) - - if p.state == connections.DISCONNECTED { - p.ticks++ - if p.ticks == p.backoff { - p.ticks = 0 - if cr.networkUp { - if p.ctype == peerConn { - cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) - } - } - } - } - return true - }) + continue case <-cr.breakChan: cr.running = false @@ -118,6 +108,28 @@ func (cr *contactRetry) run() { } } +func (cr *contactRetry) retryDisconnected() { + cr.connections.Range(func(k, v interface{}) bool { + p := v.(*contact) + + if p.state == connections.DISCONNECTED { + p.ticks++ + if p.ticks == p.backoff { + p.ticks = 0 + if cr.networkUp { + if p.ctype == peerConn { + cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) + } + if p.ctype == serverConn { + cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id})) + } + } + } + } + return true + }) +} + func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) { if _, exists := cr.connections.Load(id); !exists { p := &contact{id: id, state: connections.DISCONNECTED, backoff: 1, ticks: 0, ctype: ctype} diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index a013c94..b5319f0 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -456,7 +456,15 @@ func (cp *cwtchPeer) JoinServer(onion string) error { tokenY, yExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypePrivacyPass)) tokenOnion, onionExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypeTokenOnion)) if yExists && onionExists { - cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion})) + messages := cp.GetContact(onion).Timeline.GetMessages() + // by default we do a full sync + signature := base64.StdEncoding.EncodeToString([]byte{}) + // if we know of messages on the server, we only ask to receive newer messages + if len(messages) > 0 { + lastMessageIndex := len(messages) - 1 + signature = base64.StdEncoding.EncodeToString(messages[lastMessageIndex].Signature) + } + cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature})) return nil } } @@ -703,7 +711,7 @@ func (cp *cwtchPeer) eventHandler() { cp.mutex.Unlock() case event.RetryServerRequest: // Automated Join Server Request triggered by a plugin. - log.Debugf("profile received an automated retry event for %v", ev.Data[event.GroupServer]) + log.Infof("profile received an automated retry event for %v", ev.Data[event.GroupServer]) cp.JoinServer(ev.Data[event.GroupServer]) case event.NewGetValMessageFromPeer: onion := ev.Data[event.RemotePeer] diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 5419ea8..a6e5b5d 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -128,7 +128,12 @@ func (e *engine) eventHandler() { case event.InvitePeerToGroup: e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], event.ContextInvite, []byte(ev.Data[event.GroupInvite])) case event.JoinServer: - e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY]) + signature, err := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) + if err != nil { + // will result in a full sync + signature = []byte{} + } + e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature) case event.LeaveServer: es, ok := e.ephemeralServices.Load(ev.Data[event.GroupServer]) if ok { @@ -270,7 +275,7 @@ func (e *engine) peerWithOnion(onion string) { // peerWithTokenServer is the entry point for cwtchPeer - server relationships // needs to be run in a goroutine as will block on Open. -func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string) { +func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) { service, exists := e.ephemeralServices.Load(onion) if exists { @@ -292,7 +297,7 @@ func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, toke Y := ristretto255.NewElement() Y.UnmarshalText([]byte(tokenServerY)) - connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, e.receiveGroupMessage, e.serverSynced)) + connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverSynced, e.serverDisconnected)) e.ephemeralServices.Store(onion, ephemeralService) // If we are already connected...check if we are authed and issue an auth event // (This allows the ui to be stateless) @@ -361,7 +366,6 @@ func (e *engine) serverConnected(onion string) { } func (e *engine) serverSynced(onion string) { - log.Debugf("SERVER SYNCED: %v", onion) e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{ event.GroupServer: onion, event.ConnectionState: ConnectionStateName[SYNCED], diff --git a/protocol/connections/tokenboardclientapp.go b/protocol/connections/tokenboardclientapp.go index 6a28852..1fdf6c4 100644 --- a/protocol/connections/tokenboardclientapp.go +++ b/protocol/connections/tokenboardclientapp.go @@ -15,7 +15,7 @@ import ( ) // NewTokenBoardClient generates a new Client for Token Board -func NewTokenBoardClient(acn connectivity.ACN, Y *ristretto255.Element, tokenServiceOnion string, groupMessageHandler func(server string, gm *groups.EncryptedGroupMessage), serverSyncedHandler func(server string)) tapir.Application { +func NewTokenBoardClient(acn connectivity.ACN, Y *ristretto255.Element, tokenServiceOnion string, lastKnownSignature []byte, groupMessageHandler func(server string, gm *groups.EncryptedGroupMessage), serverSyncedHandler func(server string), serverClosedHandler func(server string)) tapir.Application { tba := new(TokenBoardClient) tba.acn = acn tba.tokenService = privacypass.NewTokenServer() @@ -23,6 +23,8 @@ func NewTokenBoardClient(acn connectivity.ACN, Y *ristretto255.Element, tokenSer tba.tokenServiceOnion = tokenServiceOnion tba.receiveGroupMessageHandler = groupMessageHandler tba.serverSyncedHandler = serverSyncedHandler + tba.serverClosedHandler = serverClosedHandler + tba.lastKnownSignature = lastKnownSignature return tba } @@ -32,18 +34,21 @@ type TokenBoardClient struct { connection tapir.Connection receiveGroupMessageHandler func(server string, gm *groups.EncryptedGroupMessage) serverSyncedHandler func(server string) + serverClosedHandler func(server string) // Token service handling - acn connectivity.ACN - tokens []*privacypass.Token - tokenService *privacypass.TokenServer - tokenServiceOnion string + acn connectivity.ACN + tokens []*privacypass.Token + tokenService *privacypass.TokenServer + tokenServiceOnion string + lastKnownSignature []byte } // NewInstance Client a new TokenBoardApp func (ta *TokenBoardClient) NewInstance() tapir.Application { tba := new(TokenBoardClient) tba.serverSyncedHandler = ta.serverSyncedHandler + tba.serverClosedHandler = ta.serverClosedHandler tba.receiveGroupMessageHandler = ta.receiveGroupMessageHandler tba.acn = ta.acn tba.tokenService = ta.tokenService @@ -72,6 +77,7 @@ func (ta *TokenBoardClient) Listen() { data := ta.connection.Expect() if len(data) == 0 { log.Debugf("Server closed the connection...") + ta.serverClosedHandler(ta.connection.Hostname()) return // connection is closed } @@ -80,6 +86,7 @@ func (ta *TokenBoardClient) Listen() { var message groups.Message if err := json.Unmarshal(data, &message); err != nil { log.Debugf("Server sent an unexpected message, closing the connection: %v", err) + ta.serverClosedHandler(ta.connection.Hostname()) ta.connection.Close() return } @@ -89,8 +96,8 @@ func (ta *TokenBoardClient) Listen() { if message.NewMessage != nil { ta.receiveGroupMessageHandler(ta.connection.Hostname(), &message.NewMessage.EGM) } else { - // TODO: Send this error to the UI log.Debugf("Server sent an unexpected NewMessage, closing the connection: %s", data) + ta.serverClosedHandler(ta.connection.Hostname()) ta.connection.Close() return } @@ -101,12 +108,21 @@ func (ta *TokenBoardClient) Listen() { log.Debugf("Replaying %v Messages...", message.ReplayResult.NumMessages) for i := 0; i < message.ReplayResult.NumMessages; i++ { data := ta.connection.Expect() + + if len(data) == 0 { + log.Debugf("Server sent an unexpected EncryptedGroupMessage, closing the connection") + ta.serverClosedHandler(ta.connection.Hostname()) + ta.connection.Close() + return + } + egm := &groups.EncryptedGroupMessage{} if err := json.Unmarshal(data, egm); err == nil { ta.receiveGroupMessageHandler(ta.connection.Hostname(), egm) + ta.lastKnownSignature = egm.Signature } else { - // TODO: Send this error to the UI log.Debugf("Server sent an unexpected EncryptedGroupMessage, closing the connection: %v", err) + ta.serverClosedHandler(ta.connection.Hostname()) ta.connection.Close() return } @@ -119,8 +135,7 @@ func (ta *TokenBoardClient) Listen() { // Replay posts a Replay Message to the server. func (ta *TokenBoardClient) Replay() { - // TODO - Allow configurable ranges - data, _ := json.Marshal(groups.Message{MessageType: groups.ReplayRequestMessage, ReplayRequest: &groups.ReplayRequest{LastCommit: []byte{}}}) + data, _ := json.Marshal(groups.Message{MessageType: groups.ReplayRequestMessage, ReplayRequest: &groups.ReplayRequest{LastCommit: ta.lastKnownSignature}}) ta.connection.Send(data) }