Handle Server Disconnections, and Partial Syncing #354
|
@ -34,6 +34,7 @@ type contactRetry struct {
|
|||
running bool
|
||||
breakChan chan bool
|
||||
onion string
|
||||
lastCheck time.Time
|
||||
|
||||
connections sync.Map //[string]*contact
|
||||
}
|
||||
|
@ -59,6 +60,10 @@ func (cr *contactRetry) run() {
|
|||
cr.bus.Subscribe(event.ServerStateChange, cr.queue)
|
||||
|
||||
for {
|
||||
if time.Since(cr.lastCheck) > tickTime {
|
||||
cr.retryDisconnected()
|
||||
cr.lastCheck = time.Now()
|
||||
}
|
||||
select {
|
||||
case e := <-cr.queue.OutChan():
|
||||
switch e.EventType {
|
||||
|
@ -94,22 +99,7 @@ func (cr *contactRetry) run() {
|
|||
}
|
||||
|
||||
case <-time.After(tickTime):
|
||||
cr.connections.Range(func(k, v interface{}) bool {
|
||||
p := v.(*contact)
|
||||
|
||||
if p.state == connections.DISCONNECTED {
|
||||
p.ticks++
|
||||
if p.ticks == p.backoff {
|
||||
p.ticks = 0
|
||||
if cr.networkUp {
|
||||
if p.ctype == peerConn {
|
||||
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
continue
|
||||
|
||||
case <-cr.breakChan:
|
||||
cr.running = false
|
||||
|
@ -118,9 +108,31 @@ func (cr *contactRetry) run() {
|
|||
}
|
||||
}
|
||||
|
||||
func (cr *contactRetry) retryDisconnected() {
|
||||
cr.connections.Range(func(k, v interface{}) bool {
|
||||
p := v.(*contact)
|
||||
|
||||
if p.state == connections.DISCONNECTED {
|
||||
p.ticks++
|
||||
if p.ticks >= p.backoff {
|
||||
p.ticks = 0
|
||||
if cr.networkUp {
|
||||
if p.ctype == peerConn {
|
||||
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
|
||||
}
|
||||
if p.ctype == serverConn {
|
||||
cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id}))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) {
|
||||
if _, exists := cr.connections.Load(id); !exists {
|
||||
p := &contact{id: id, state: connections.DISCONNECTED, backoff: 1, ticks: 0, ctype: ctype}
|
||||
p := &contact{id: id, state: connections.DISCONNECTED, backoff: 0, ticks: 0, ctype: ctype}
|
||||
cr.connections.Store(id, p)
|
||||
return
|
||||
}
|
||||
|
@ -129,7 +141,9 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState
|
|||
p := pinf.(*contact)
|
||||
if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED {
|
||||
p.state = connections.DISCONNECTED
|
||||
if p.backoff < maxBakoff {
|
||||
if p.backoff == 0 {
|
||||
p.backoff = 1
|
||||
} else if p.backoff < maxBakoff {
|
||||
p.backoff *= 2
|
||||
}
|
||||
p.ticks = 0
|
||||
|
@ -137,7 +151,7 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState
|
|||
p.state = state
|
||||
} else if state == connections.AUTHENTICATED {
|
||||
p.state = state
|
||||
p.backoff = 1
|
||||
p.backoff = 0
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -16,6 +16,8 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
const lastKnownSignature = "LastKnowSignature"
|
||||
|
||||
var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true,
|
||||
event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeer: true,
|
||||
event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true,
|
||||
|
@ -456,7 +458,11 @@ func (cp *cwtchPeer) JoinServer(onion string) error {
|
|||
tokenY, yExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypePrivacyPass))
|
||||
tokenOnion, onionExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypeTokenOnion))
|
||||
if yExists && onionExists {
|
||||
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion}))
|
||||
signature, exists := cp.GetContactAttribute(onion, lastKnownSignature)
|
||||
if !exists {
|
||||
signature = base64.StdEncoding.EncodeToString([]byte{})
|
||||
}
|
||||
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature}))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@ -671,9 +677,11 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
cp.mutex.Unlock()
|
||||
case event.EncryptedGroupMessage:
|
||||
// If successful, a side effect is the message is added to the group's timeline
|
||||
cp.mutex.Lock()
|
||||
|
||||
ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
|
||||
signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
|
||||
cp.SetContactAttribute(ev.Data[event.GroupServer], lastKnownSignature, ev.Data[event.Signature])
|
||||
cp.mutex.Lock()
|
||||
ok, groupID, message, seen := cp.Profile.AttemptDecryption(ciphertext, signature)
|
||||
cp.mutex.Unlock()
|
||||
if ok && !seen {
|
||||
|
|
|
@ -128,7 +128,12 @@ func (e *engine) eventHandler() {
|
|||
case event.InvitePeerToGroup:
|
||||
e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], event.ContextInvite, []byte(ev.Data[event.GroupInvite]))
|
||||
case event.JoinServer:
|
||||
e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY])
|
||||
signature, err := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
|
||||
if err != nil {
|
||||
// will result in a full sync
|
||||
signature = []byte{}
|
||||
}
|
||||
e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
|
||||
case event.LeaveServer:
|
||||
es, ok := e.ephemeralServices.Load(ev.Data[event.GroupServer])
|
||||
if ok {
|
||||
|
@ -270,7 +275,7 @@ func (e *engine) peerWithOnion(onion string) {
|
|||
|
||||
// peerWithTokenServer is the entry point for cwtchPeer - server relationships
|
||||
// needs to be run in a goroutine as will block on Open.
|
||||
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string) {
|
||||
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) {
|
||||
|
||||
service, exists := e.ephemeralServices.Load(onion)
|
||||
if exists {
|
||||
|
@ -292,7 +297,7 @@ func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, toke
|
|||
|
||||
Y := ristretto255.NewElement()
|
||||
Y.UnmarshalText([]byte(tokenServerY))
|
||||
connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, e.receiveGroupMessage, e.serverSynced))
|
||||
connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverSynced, e.serverDisconnected))
|
||||
e.ephemeralServices.Store(onion, ephemeralService)
|
||||
// If we are already connected...check if we are authed and issue an auth event
|
||||
// (This allows the ui to be stateless)
|
||||
|
@ -361,7 +366,6 @@ func (e *engine) serverConnected(onion string) {
|
|||
}
|
||||
|
||||
func (e *engine) serverSynced(onion string) {
|
||||
log.Debugf("SERVER SYNCED: %v", onion)
|
||||
e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{
|
||||
event.GroupServer: onion,
|
||||
event.ConnectionState: ConnectionStateName[SYNCED],
|
||||
|
@ -435,7 +439,7 @@ func (e *engine) deleteConnection(id string) {
|
|||
func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMessage) {
|
||||
// Publish Event so that a Profile Engine can deal with it.
|
||||
// Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
|
||||
e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: base64.StdEncoding.EncodeToString(gm.Ciphertext), event.Signature: base64.StdEncoding.EncodeToString(gm.Signature)}))
|
||||
e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.GroupServer: server, event.Ciphertext: base64.StdEncoding.EncodeToString(gm.Ciphertext), event.Signature: base64.StdEncoding.EncodeToString(gm.Signature)}))
|
||||
}
|
||||
|
||||
// sendMessageToGroup attempts to sent the given message to the given group id.
|
||||
|
|
|
@ -15,7 +15,7 @@ import (
|
|||
)
|
||||
|
||||
// NewTokenBoardClient generates a new Client for Token Board
|
||||
func NewTokenBoardClient(acn connectivity.ACN, Y *ristretto255.Element, tokenServiceOnion string, groupMessageHandler func(server string, gm *groups.EncryptedGroupMessage), serverSyncedHandler func(server string)) tapir.Application {
|
||||
func NewTokenBoardClient(acn connectivity.ACN, Y *ristretto255.Element, tokenServiceOnion string, lastKnownSignature []byte, groupMessageHandler func(server string, gm *groups.EncryptedGroupMessage), serverSyncedHandler func(server string), serverClosedHandler func(server string)) tapir.Application {
|
||||
tba := new(TokenBoardClient)
|
||||
tba.acn = acn
|
||||
tba.tokenService = privacypass.NewTokenServer()
|
||||
|
@ -23,6 +23,8 @@ func NewTokenBoardClient(acn connectivity.ACN, Y *ristretto255.Element, tokenSer
|
|||
tba.tokenServiceOnion = tokenServiceOnion
|
||||
tba.receiveGroupMessageHandler = groupMessageHandler
|
||||
tba.serverSyncedHandler = serverSyncedHandler
|
||||
tba.serverClosedHandler = serverClosedHandler
|
||||
tba.lastKnownSignature = lastKnownSignature
|
||||
return tba
|
||||
}
|
||||
|
||||
|
@ -32,22 +34,26 @@ type TokenBoardClient struct {
|
|||
connection tapir.Connection
|
||||
receiveGroupMessageHandler func(server string, gm *groups.EncryptedGroupMessage)
|
||||
serverSyncedHandler func(server string)
|
||||
serverClosedHandler func(server string)
|
||||
|
||||
// Token service handling
|
||||
acn connectivity.ACN
|
||||
tokens []*privacypass.Token
|
||||
tokenService *privacypass.TokenServer
|
||||
tokenServiceOnion string
|
||||
acn connectivity.ACN
|
||||
tokens []*privacypass.Token
|
||||
tokenService *privacypass.TokenServer
|
||||
tokenServiceOnion string
|
||||
lastKnownSignature []byte
|
||||
}
|
||||
|
||||
// NewInstance Client a new TokenBoardApp
|
||||
func (ta *TokenBoardClient) NewInstance() tapir.Application {
|
||||
tba := new(TokenBoardClient)
|
||||
tba.serverSyncedHandler = ta.serverSyncedHandler
|
||||
tba.serverClosedHandler = ta.serverClosedHandler
|
||||
tba.receiveGroupMessageHandler = ta.receiveGroupMessageHandler
|
||||
tba.acn = ta.acn
|
||||
tba.tokenService = ta.tokenService
|
||||
tba.tokenServiceOnion = ta.tokenServiceOnion
|
||||
tba.lastKnownSignature = ta.lastKnownSignature
|
||||
return tba
|
||||
}
|
||||
|
||||
|
@ -72,14 +78,15 @@ func (ta *TokenBoardClient) Listen() {
|
|||
data := ta.connection.Expect()
|
||||
if len(data) == 0 {
|
||||
log.Debugf("Server closed the connection...")
|
||||
ta.serverClosedHandler(ta.connection.Hostname())
|
||||
return // connection is closed
|
||||
}
|
||||
|
||||
// We always expect the server to follow protocol, and the second it doesn't we close the connection
|
||||
// TODO issue an error so the client is aware
|
||||
var message groups.Message
|
||||
if err := json.Unmarshal(data, &message); err != nil {
|
||||
log.Debugf("Server sent an unexpected message, closing the connection: %v", err)
|
||||
ta.serverClosedHandler(ta.connection.Hostname())
|
||||
ta.connection.Close()
|
||||
return
|
||||
}
|
||||
|
@ -89,8 +96,8 @@ func (ta *TokenBoardClient) Listen() {
|
|||
if message.NewMessage != nil {
|
||||
ta.receiveGroupMessageHandler(ta.connection.Hostname(), &message.NewMessage.EGM)
|
||||
} else {
|
||||
// TODO: Send this error to the UI
|
||||
log.Debugf("Server sent an unexpected NewMessage, closing the connection: %s", data)
|
||||
ta.serverClosedHandler(ta.connection.Hostname())
|
||||
ta.connection.Close()
|
||||
return
|
||||
}
|
||||
|
@ -101,12 +108,21 @@ func (ta *TokenBoardClient) Listen() {
|
|||
log.Debugf("Replaying %v Messages...", message.ReplayResult.NumMessages)
|
||||
for i := 0; i < message.ReplayResult.NumMessages; i++ {
|
||||
data := ta.connection.Expect()
|
||||
|
||||
if len(data) == 0 {
|
||||
log.Debugf("Server sent an unexpected EncryptedGroupMessage, closing the connection")
|
||||
ta.serverClosedHandler(ta.connection.Hostname())
|
||||
ta.connection.Close()
|
||||
return
|
||||
}
|
||||
|
||||
egm := &groups.EncryptedGroupMessage{}
|
||||
if err := json.Unmarshal(data, egm); err == nil {
|
||||
ta.receiveGroupMessageHandler(ta.connection.Hostname(), egm)
|
||||
ta.lastKnownSignature = egm.Signature
|
||||
} else {
|
||||
// TODO: Send this error to the UI
|
||||
log.Debugf("Server sent an unexpected EncryptedGroupMessage, closing the connection: %v", err)
|
||||
ta.serverClosedHandler(ta.connection.Hostname())
|
||||
ta.connection.Close()
|
||||
return
|
||||
}
|
||||
|
@ -119,8 +135,7 @@ func (ta *TokenBoardClient) Listen() {
|
|||
|
||||
// Replay posts a Replay Message to the server.
|
||||
func (ta *TokenBoardClient) Replay() {
|
||||
// TODO - Allow configurable ranges
|
||||
data, _ := json.Marshal(groups.Message{MessageType: groups.ReplayRequestMessage, ReplayRequest: &groups.ReplayRequest{LastCommit: []byte{}}})
|
||||
data, _ := json.Marshal(groups.Message{MessageType: groups.ReplayRequestMessage, ReplayRequest: &groups.ReplayRequest{LastCommit: ta.lastKnownSignature}})
|
||||
ta.connection.Send(data)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue