diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index 3167146..c3364ba 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -34,6 +34,7 @@ type contactRetry struct { running bool breakChan chan bool onion string + lastCheck time.Time connections sync.Map //[string]*contact } @@ -59,6 +60,10 @@ func (cr *contactRetry) run() { cr.bus.Subscribe(event.ServerStateChange, cr.queue) for { + if time.Since(cr.lastCheck) > tickTime { + cr.retryDisconnected() + cr.lastCheck = time.Now() + } select { case e := <-cr.queue.OutChan(): switch e.EventType { @@ -94,22 +99,7 @@ func (cr *contactRetry) run() { } case <-time.After(tickTime): - cr.connections.Range(func(k, v interface{}) bool { - p := v.(*contact) - - if p.state == connections.DISCONNECTED { - p.ticks++ - if p.ticks == p.backoff { - p.ticks = 0 - if cr.networkUp { - if p.ctype == peerConn { - cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) - } - } - } - } - return true - }) + continue case <-cr.breakChan: cr.running = false @@ -118,9 +108,31 @@ func (cr *contactRetry) run() { } } +func (cr *contactRetry) retryDisconnected() { + cr.connections.Range(func(k, v interface{}) bool { + p := v.(*contact) + + if p.state == connections.DISCONNECTED { + p.ticks++ + if p.ticks >= p.backoff { + p.ticks = 0 + if cr.networkUp { + if p.ctype == peerConn { + cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) + } + if p.ctype == serverConn { + cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id})) + } + } + } + } + return true + }) +} + func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) { if _, exists := cr.connections.Load(id); !exists { - p := &contact{id: id, state: connections.DISCONNECTED, backoff: 1, ticks: 0, ctype: ctype} + p := &contact{id: id, state: connections.DISCONNECTED, backoff: 0, ticks: 0, ctype: ctype} cr.connections.Store(id, p) return } @@ -129,7 +141,9 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState p := pinf.(*contact) if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED { p.state = connections.DISCONNECTED - if p.backoff < maxBakoff { + if p.backoff == 0 { + p.backoff = 1 + } else if p.backoff < maxBakoff { p.backoff *= 2 } p.ticks = 0 @@ -137,7 +151,7 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState p.state = state } else if state == connections.AUTHENTICATED { p.state = state - p.backoff = 1 + p.backoff = 0 } } diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index a013c94..e5abba4 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -16,6 +16,8 @@ import ( "time" ) +const lastKnownSignature = "LastKnowSignature" + var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true, event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeer: true, event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true, @@ -456,7 +458,11 @@ func (cp *cwtchPeer) JoinServer(onion string) error { tokenY, yExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypePrivacyPass)) tokenOnion, onionExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypeTokenOnion)) if yExists && onionExists { - cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion})) + signature, exists := cp.GetContactAttribute(onion, lastKnownSignature) + if !exists { + signature = base64.StdEncoding.EncodeToString([]byte{}) + } + cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature})) return nil } } @@ -671,9 +677,11 @@ func (cp *cwtchPeer) eventHandler() { cp.mutex.Unlock() case event.EncryptedGroupMessage: // If successful, a side effect is the message is added to the group's timeline - cp.mutex.Lock() + ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) + cp.SetContactAttribute(ev.Data[event.GroupServer], lastKnownSignature, ev.Data[event.Signature]) + cp.mutex.Lock() ok, groupID, message, seen := cp.Profile.AttemptDecryption(ciphertext, signature) cp.mutex.Unlock() if ok && !seen { diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 5419ea8..15b43c6 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -128,7 +128,12 @@ func (e *engine) eventHandler() { case event.InvitePeerToGroup: e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], event.ContextInvite, []byte(ev.Data[event.GroupInvite])) case event.JoinServer: - e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY]) + signature, err := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) + if err != nil { + // will result in a full sync + signature = []byte{} + } + e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature) case event.LeaveServer: es, ok := e.ephemeralServices.Load(ev.Data[event.GroupServer]) if ok { @@ -270,7 +275,7 @@ func (e *engine) peerWithOnion(onion string) { // peerWithTokenServer is the entry point for cwtchPeer - server relationships // needs to be run in a goroutine as will block on Open. -func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string) { +func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) { service, exists := e.ephemeralServices.Load(onion) if exists { @@ -292,7 +297,7 @@ func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, toke Y := ristretto255.NewElement() Y.UnmarshalText([]byte(tokenServerY)) - connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, e.receiveGroupMessage, e.serverSynced)) + connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverSynced, e.serverDisconnected)) e.ephemeralServices.Store(onion, ephemeralService) // If we are already connected...check if we are authed and issue an auth event // (This allows the ui to be stateless) @@ -361,7 +366,6 @@ func (e *engine) serverConnected(onion string) { } func (e *engine) serverSynced(onion string) { - log.Debugf("SERVER SYNCED: %v", onion) e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{ event.GroupServer: onion, event.ConnectionState: ConnectionStateName[SYNCED], @@ -435,7 +439,7 @@ func (e *engine) deleteConnection(id string) { func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMessage) { // Publish Event so that a Profile Engine can deal with it. // Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine! - e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: base64.StdEncoding.EncodeToString(gm.Ciphertext), event.Signature: base64.StdEncoding.EncodeToString(gm.Signature)})) + e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.GroupServer: server, event.Ciphertext: base64.StdEncoding.EncodeToString(gm.Ciphertext), event.Signature: base64.StdEncoding.EncodeToString(gm.Signature)})) } // sendMessageToGroup attempts to sent the given message to the given group id. diff --git a/protocol/connections/tokenboardclientapp.go b/protocol/connections/tokenboardclientapp.go index 6a28852..f4b9cb0 100644 --- a/protocol/connections/tokenboardclientapp.go +++ b/protocol/connections/tokenboardclientapp.go @@ -15,7 +15,7 @@ import ( ) // NewTokenBoardClient generates a new Client for Token Board -func NewTokenBoardClient(acn connectivity.ACN, Y *ristretto255.Element, tokenServiceOnion string, groupMessageHandler func(server string, gm *groups.EncryptedGroupMessage), serverSyncedHandler func(server string)) tapir.Application { +func NewTokenBoardClient(acn connectivity.ACN, Y *ristretto255.Element, tokenServiceOnion string, lastKnownSignature []byte, groupMessageHandler func(server string, gm *groups.EncryptedGroupMessage), serverSyncedHandler func(server string), serverClosedHandler func(server string)) tapir.Application { tba := new(TokenBoardClient) tba.acn = acn tba.tokenService = privacypass.NewTokenServer() @@ -23,6 +23,8 @@ func NewTokenBoardClient(acn connectivity.ACN, Y *ristretto255.Element, tokenSer tba.tokenServiceOnion = tokenServiceOnion tba.receiveGroupMessageHandler = groupMessageHandler tba.serverSyncedHandler = serverSyncedHandler + tba.serverClosedHandler = serverClosedHandler + tba.lastKnownSignature = lastKnownSignature return tba } @@ -32,22 +34,26 @@ type TokenBoardClient struct { connection tapir.Connection receiveGroupMessageHandler func(server string, gm *groups.EncryptedGroupMessage) serverSyncedHandler func(server string) + serverClosedHandler func(server string) // Token service handling - acn connectivity.ACN - tokens []*privacypass.Token - tokenService *privacypass.TokenServer - tokenServiceOnion string + acn connectivity.ACN + tokens []*privacypass.Token + tokenService *privacypass.TokenServer + tokenServiceOnion string + lastKnownSignature []byte } // NewInstance Client a new TokenBoardApp func (ta *TokenBoardClient) NewInstance() tapir.Application { tba := new(TokenBoardClient) tba.serverSyncedHandler = ta.serverSyncedHandler + tba.serverClosedHandler = ta.serverClosedHandler tba.receiveGroupMessageHandler = ta.receiveGroupMessageHandler tba.acn = ta.acn tba.tokenService = ta.tokenService tba.tokenServiceOnion = ta.tokenServiceOnion + tba.lastKnownSignature = ta.lastKnownSignature return tba } @@ -72,14 +78,15 @@ func (ta *TokenBoardClient) Listen() { data := ta.connection.Expect() if len(data) == 0 { log.Debugf("Server closed the connection...") + ta.serverClosedHandler(ta.connection.Hostname()) return // connection is closed } // We always expect the server to follow protocol, and the second it doesn't we close the connection - // TODO issue an error so the client is aware var message groups.Message if err := json.Unmarshal(data, &message); err != nil { log.Debugf("Server sent an unexpected message, closing the connection: %v", err) + ta.serverClosedHandler(ta.connection.Hostname()) ta.connection.Close() return } @@ -89,8 +96,8 @@ func (ta *TokenBoardClient) Listen() { if message.NewMessage != nil { ta.receiveGroupMessageHandler(ta.connection.Hostname(), &message.NewMessage.EGM) } else { - // TODO: Send this error to the UI log.Debugf("Server sent an unexpected NewMessage, closing the connection: %s", data) + ta.serverClosedHandler(ta.connection.Hostname()) ta.connection.Close() return } @@ -101,12 +108,21 @@ func (ta *TokenBoardClient) Listen() { log.Debugf("Replaying %v Messages...", message.ReplayResult.NumMessages) for i := 0; i < message.ReplayResult.NumMessages; i++ { data := ta.connection.Expect() + + if len(data) == 0 { + log.Debugf("Server sent an unexpected EncryptedGroupMessage, closing the connection") + ta.serverClosedHandler(ta.connection.Hostname()) + ta.connection.Close() + return + } + egm := &groups.EncryptedGroupMessage{} if err := json.Unmarshal(data, egm); err == nil { ta.receiveGroupMessageHandler(ta.connection.Hostname(), egm) + ta.lastKnownSignature = egm.Signature } else { - // TODO: Send this error to the UI log.Debugf("Server sent an unexpected EncryptedGroupMessage, closing the connection: %v", err) + ta.serverClosedHandler(ta.connection.Hostname()) ta.connection.Close() return } @@ -119,8 +135,7 @@ func (ta *TokenBoardClient) Listen() { // Replay posts a Replay Message to the server. func (ta *TokenBoardClient) Replay() { - // TODO - Allow configurable ranges - data, _ := json.Marshal(groups.Message{MessageType: groups.ReplayRequestMessage, ReplayRequest: &groups.ReplayRequest{LastCommit: []byte{}}}) + data, _ := json.Marshal(groups.Message{MessageType: groups.ReplayRequestMessage, ReplayRequest: &groups.ReplayRequest{LastCommit: ta.lastKnownSignature}}) ta.connection.Send(data) }