Merge pull request 'Handle Server Disconnections, and Partial Syncing' (#354) from servers into master
Reviewed-on: #354
This commit is contained in:
commit
014252f4b5
|
@ -34,6 +34,7 @@ type contactRetry struct {
|
||||||
running bool
|
running bool
|
||||||
breakChan chan bool
|
breakChan chan bool
|
||||||
onion string
|
onion string
|
||||||
|
lastCheck time.Time
|
||||||
|
|
||||||
connections sync.Map //[string]*contact
|
connections sync.Map //[string]*contact
|
||||||
}
|
}
|
||||||
|
@ -59,6 +60,10 @@ func (cr *contactRetry) run() {
|
||||||
cr.bus.Subscribe(event.ServerStateChange, cr.queue)
|
cr.bus.Subscribe(event.ServerStateChange, cr.queue)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
if time.Since(cr.lastCheck) > tickTime {
|
||||||
|
cr.retryDisconnected()
|
||||||
|
cr.lastCheck = time.Now()
|
||||||
|
}
|
||||||
select {
|
select {
|
||||||
case e := <-cr.queue.OutChan():
|
case e := <-cr.queue.OutChan():
|
||||||
switch e.EventType {
|
switch e.EventType {
|
||||||
|
@ -94,22 +99,7 @@ func (cr *contactRetry) run() {
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-time.After(tickTime):
|
case <-time.After(tickTime):
|
||||||
cr.connections.Range(func(k, v interface{}) bool {
|
continue
|
||||||
p := v.(*contact)
|
|
||||||
|
|
||||||
if p.state == connections.DISCONNECTED {
|
|
||||||
p.ticks++
|
|
||||||
if p.ticks == p.backoff {
|
|
||||||
p.ticks = 0
|
|
||||||
if cr.networkUp {
|
|
||||||
if p.ctype == peerConn {
|
|
||||||
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
|
|
||||||
case <-cr.breakChan:
|
case <-cr.breakChan:
|
||||||
cr.running = false
|
cr.running = false
|
||||||
|
@ -118,9 +108,31 @@ func (cr *contactRetry) run() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cr *contactRetry) retryDisconnected() {
|
||||||
|
cr.connections.Range(func(k, v interface{}) bool {
|
||||||
|
p := v.(*contact)
|
||||||
|
|
||||||
|
if p.state == connections.DISCONNECTED {
|
||||||
|
p.ticks++
|
||||||
|
if p.ticks >= p.backoff {
|
||||||
|
p.ticks = 0
|
||||||
|
if cr.networkUp {
|
||||||
|
if p.ctype == peerConn {
|
||||||
|
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
|
||||||
|
}
|
||||||
|
if p.ctype == serverConn {
|
||||||
|
cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) {
|
func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) {
|
||||||
if _, exists := cr.connections.Load(id); !exists {
|
if _, exists := cr.connections.Load(id); !exists {
|
||||||
p := &contact{id: id, state: connections.DISCONNECTED, backoff: 1, ticks: 0, ctype: ctype}
|
p := &contact{id: id, state: connections.DISCONNECTED, backoff: 0, ticks: 0, ctype: ctype}
|
||||||
cr.connections.Store(id, p)
|
cr.connections.Store(id, p)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -129,7 +141,9 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState
|
||||||
p := pinf.(*contact)
|
p := pinf.(*contact)
|
||||||
if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED {
|
if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED {
|
||||||
p.state = connections.DISCONNECTED
|
p.state = connections.DISCONNECTED
|
||||||
if p.backoff < maxBakoff {
|
if p.backoff == 0 {
|
||||||
|
p.backoff = 1
|
||||||
|
} else if p.backoff < maxBakoff {
|
||||||
p.backoff *= 2
|
p.backoff *= 2
|
||||||
}
|
}
|
||||||
p.ticks = 0
|
p.ticks = 0
|
||||||
|
@ -137,7 +151,7 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState
|
||||||
p.state = state
|
p.state = state
|
||||||
} else if state == connections.AUTHENTICATED {
|
} else if state == connections.AUTHENTICATED {
|
||||||
p.state = state
|
p.state = state
|
||||||
p.backoff = 1
|
p.backoff = 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,8 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const lastKnownSignature = "LastKnowSignature"
|
||||||
|
|
||||||
var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true,
|
var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true,
|
||||||
event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeer: true,
|
event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeer: true,
|
||||||
event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true,
|
event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true,
|
||||||
|
@ -456,7 +458,11 @@ func (cp *cwtchPeer) JoinServer(onion string) error {
|
||||||
tokenY, yExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypePrivacyPass))
|
tokenY, yExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypePrivacyPass))
|
||||||
tokenOnion, onionExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypeTokenOnion))
|
tokenOnion, onionExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypeTokenOnion))
|
||||||
if yExists && onionExists {
|
if yExists && onionExists {
|
||||||
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion}))
|
signature, exists := cp.GetContactAttribute(onion, lastKnownSignature)
|
||||||
|
if !exists {
|
||||||
|
signature = base64.StdEncoding.EncodeToString([]byte{})
|
||||||
|
}
|
||||||
|
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature}))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -671,9 +677,11 @@ func (cp *cwtchPeer) eventHandler() {
|
||||||
cp.mutex.Unlock()
|
cp.mutex.Unlock()
|
||||||
case event.EncryptedGroupMessage:
|
case event.EncryptedGroupMessage:
|
||||||
// If successful, a side effect is the message is added to the group's timeline
|
// If successful, a side effect is the message is added to the group's timeline
|
||||||
cp.mutex.Lock()
|
|
||||||
ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
|
ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
|
||||||
signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
|
signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
|
||||||
|
cp.SetContactAttribute(ev.Data[event.GroupServer], lastKnownSignature, ev.Data[event.Signature])
|
||||||
|
cp.mutex.Lock()
|
||||||
ok, groupID, message, seen := cp.Profile.AttemptDecryption(ciphertext, signature)
|
ok, groupID, message, seen := cp.Profile.AttemptDecryption(ciphertext, signature)
|
||||||
cp.mutex.Unlock()
|
cp.mutex.Unlock()
|
||||||
if ok && !seen {
|
if ok && !seen {
|
||||||
|
|
|
@ -128,7 +128,12 @@ func (e *engine) eventHandler() {
|
||||||
case event.InvitePeerToGroup:
|
case event.InvitePeerToGroup:
|
||||||
e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], event.ContextInvite, []byte(ev.Data[event.GroupInvite]))
|
e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], event.ContextInvite, []byte(ev.Data[event.GroupInvite]))
|
||||||
case event.JoinServer:
|
case event.JoinServer:
|
||||||
e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY])
|
signature, err := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
|
||||||
|
if err != nil {
|
||||||
|
// will result in a full sync
|
||||||
|
signature = []byte{}
|
||||||
|
}
|
||||||
|
e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
|
||||||
case event.LeaveServer:
|
case event.LeaveServer:
|
||||||
es, ok := e.ephemeralServices.Load(ev.Data[event.GroupServer])
|
es, ok := e.ephemeralServices.Load(ev.Data[event.GroupServer])
|
||||||
if ok {
|
if ok {
|
||||||
|
@ -270,7 +275,7 @@ func (e *engine) peerWithOnion(onion string) {
|
||||||
|
|
||||||
// peerWithTokenServer is the entry point for cwtchPeer - server relationships
|
// peerWithTokenServer is the entry point for cwtchPeer - server relationships
|
||||||
// needs to be run in a goroutine as will block on Open.
|
// needs to be run in a goroutine as will block on Open.
|
||||||
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string) {
|
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) {
|
||||||
|
|
||||||
service, exists := e.ephemeralServices.Load(onion)
|
service, exists := e.ephemeralServices.Load(onion)
|
||||||
if exists {
|
if exists {
|
||||||
|
@ -292,7 +297,7 @@ func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, toke
|
||||||
|
|
||||||
Y := ristretto255.NewElement()
|
Y := ristretto255.NewElement()
|
||||||
Y.UnmarshalText([]byte(tokenServerY))
|
Y.UnmarshalText([]byte(tokenServerY))
|
||||||
connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, e.receiveGroupMessage, e.serverSynced))
|
connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverSynced, e.serverDisconnected))
|
||||||
e.ephemeralServices.Store(onion, ephemeralService)
|
e.ephemeralServices.Store(onion, ephemeralService)
|
||||||
// If we are already connected...check if we are authed and issue an auth event
|
// If we are already connected...check if we are authed and issue an auth event
|
||||||
// (This allows the ui to be stateless)
|
// (This allows the ui to be stateless)
|
||||||
|
@ -361,7 +366,6 @@ func (e *engine) serverConnected(onion string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *engine) serverSynced(onion string) {
|
func (e *engine) serverSynced(onion string) {
|
||||||
log.Debugf("SERVER SYNCED: %v", onion)
|
|
||||||
e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{
|
e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{
|
||||||
event.GroupServer: onion,
|
event.GroupServer: onion,
|
||||||
event.ConnectionState: ConnectionStateName[SYNCED],
|
event.ConnectionState: ConnectionStateName[SYNCED],
|
||||||
|
@ -435,7 +439,7 @@ func (e *engine) deleteConnection(id string) {
|
||||||
func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMessage) {
|
func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMessage) {
|
||||||
// Publish Event so that a Profile Engine can deal with it.
|
// Publish Event so that a Profile Engine can deal with it.
|
||||||
// Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
|
// Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
|
||||||
e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: base64.StdEncoding.EncodeToString(gm.Ciphertext), event.Signature: base64.StdEncoding.EncodeToString(gm.Signature)}))
|
e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.GroupServer: server, event.Ciphertext: base64.StdEncoding.EncodeToString(gm.Ciphertext), event.Signature: base64.StdEncoding.EncodeToString(gm.Signature)}))
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendMessageToGroup attempts to sent the given message to the given group id.
|
// sendMessageToGroup attempts to sent the given message to the given group id.
|
||||||
|
|
|
@ -15,7 +15,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewTokenBoardClient generates a new Client for Token Board
|
// NewTokenBoardClient generates a new Client for Token Board
|
||||||
func NewTokenBoardClient(acn connectivity.ACN, Y *ristretto255.Element, tokenServiceOnion string, groupMessageHandler func(server string, gm *groups.EncryptedGroupMessage), serverSyncedHandler func(server string)) tapir.Application {
|
func NewTokenBoardClient(acn connectivity.ACN, Y *ristretto255.Element, tokenServiceOnion string, lastKnownSignature []byte, groupMessageHandler func(server string, gm *groups.EncryptedGroupMessage), serverSyncedHandler func(server string), serverClosedHandler func(server string)) tapir.Application {
|
||||||
tba := new(TokenBoardClient)
|
tba := new(TokenBoardClient)
|
||||||
tba.acn = acn
|
tba.acn = acn
|
||||||
tba.tokenService = privacypass.NewTokenServer()
|
tba.tokenService = privacypass.NewTokenServer()
|
||||||
|
@ -23,6 +23,8 @@ func NewTokenBoardClient(acn connectivity.ACN, Y *ristretto255.Element, tokenSer
|
||||||
tba.tokenServiceOnion = tokenServiceOnion
|
tba.tokenServiceOnion = tokenServiceOnion
|
||||||
tba.receiveGroupMessageHandler = groupMessageHandler
|
tba.receiveGroupMessageHandler = groupMessageHandler
|
||||||
tba.serverSyncedHandler = serverSyncedHandler
|
tba.serverSyncedHandler = serverSyncedHandler
|
||||||
|
tba.serverClosedHandler = serverClosedHandler
|
||||||
|
tba.lastKnownSignature = lastKnownSignature
|
||||||
return tba
|
return tba
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -32,22 +34,26 @@ type TokenBoardClient struct {
|
||||||
connection tapir.Connection
|
connection tapir.Connection
|
||||||
receiveGroupMessageHandler func(server string, gm *groups.EncryptedGroupMessage)
|
receiveGroupMessageHandler func(server string, gm *groups.EncryptedGroupMessage)
|
||||||
serverSyncedHandler func(server string)
|
serverSyncedHandler func(server string)
|
||||||
|
serverClosedHandler func(server string)
|
||||||
|
|
||||||
// Token service handling
|
// Token service handling
|
||||||
acn connectivity.ACN
|
acn connectivity.ACN
|
||||||
tokens []*privacypass.Token
|
tokens []*privacypass.Token
|
||||||
tokenService *privacypass.TokenServer
|
tokenService *privacypass.TokenServer
|
||||||
tokenServiceOnion string
|
tokenServiceOnion string
|
||||||
|
lastKnownSignature []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewInstance Client a new TokenBoardApp
|
// NewInstance Client a new TokenBoardApp
|
||||||
func (ta *TokenBoardClient) NewInstance() tapir.Application {
|
func (ta *TokenBoardClient) NewInstance() tapir.Application {
|
||||||
tba := new(TokenBoardClient)
|
tba := new(TokenBoardClient)
|
||||||
tba.serverSyncedHandler = ta.serverSyncedHandler
|
tba.serverSyncedHandler = ta.serverSyncedHandler
|
||||||
|
tba.serverClosedHandler = ta.serverClosedHandler
|
||||||
tba.receiveGroupMessageHandler = ta.receiveGroupMessageHandler
|
tba.receiveGroupMessageHandler = ta.receiveGroupMessageHandler
|
||||||
tba.acn = ta.acn
|
tba.acn = ta.acn
|
||||||
tba.tokenService = ta.tokenService
|
tba.tokenService = ta.tokenService
|
||||||
tba.tokenServiceOnion = ta.tokenServiceOnion
|
tba.tokenServiceOnion = ta.tokenServiceOnion
|
||||||
|
tba.lastKnownSignature = ta.lastKnownSignature
|
||||||
return tba
|
return tba
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -72,14 +78,15 @@ func (ta *TokenBoardClient) Listen() {
|
||||||
data := ta.connection.Expect()
|
data := ta.connection.Expect()
|
||||||
if len(data) == 0 {
|
if len(data) == 0 {
|
||||||
log.Debugf("Server closed the connection...")
|
log.Debugf("Server closed the connection...")
|
||||||
|
ta.serverClosedHandler(ta.connection.Hostname())
|
||||||
return // connection is closed
|
return // connection is closed
|
||||||
}
|
}
|
||||||
|
|
||||||
// We always expect the server to follow protocol, and the second it doesn't we close the connection
|
// We always expect the server to follow protocol, and the second it doesn't we close the connection
|
||||||
// TODO issue an error so the client is aware
|
|
||||||
var message groups.Message
|
var message groups.Message
|
||||||
if err := json.Unmarshal(data, &message); err != nil {
|
if err := json.Unmarshal(data, &message); err != nil {
|
||||||
log.Debugf("Server sent an unexpected message, closing the connection: %v", err)
|
log.Debugf("Server sent an unexpected message, closing the connection: %v", err)
|
||||||
|
ta.serverClosedHandler(ta.connection.Hostname())
|
||||||
ta.connection.Close()
|
ta.connection.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -89,8 +96,8 @@ func (ta *TokenBoardClient) Listen() {
|
||||||
if message.NewMessage != nil {
|
if message.NewMessage != nil {
|
||||||
ta.receiveGroupMessageHandler(ta.connection.Hostname(), &message.NewMessage.EGM)
|
ta.receiveGroupMessageHandler(ta.connection.Hostname(), &message.NewMessage.EGM)
|
||||||
} else {
|
} else {
|
||||||
// TODO: Send this error to the UI
|
|
||||||
log.Debugf("Server sent an unexpected NewMessage, closing the connection: %s", data)
|
log.Debugf("Server sent an unexpected NewMessage, closing the connection: %s", data)
|
||||||
|
ta.serverClosedHandler(ta.connection.Hostname())
|
||||||
ta.connection.Close()
|
ta.connection.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -101,12 +108,21 @@ func (ta *TokenBoardClient) Listen() {
|
||||||
log.Debugf("Replaying %v Messages...", message.ReplayResult.NumMessages)
|
log.Debugf("Replaying %v Messages...", message.ReplayResult.NumMessages)
|
||||||
for i := 0; i < message.ReplayResult.NumMessages; i++ {
|
for i := 0; i < message.ReplayResult.NumMessages; i++ {
|
||||||
data := ta.connection.Expect()
|
data := ta.connection.Expect()
|
||||||
|
|
||||||
|
if len(data) == 0 {
|
||||||
|
log.Debugf("Server sent an unexpected EncryptedGroupMessage, closing the connection")
|
||||||
|
ta.serverClosedHandler(ta.connection.Hostname())
|
||||||
|
ta.connection.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
egm := &groups.EncryptedGroupMessage{}
|
egm := &groups.EncryptedGroupMessage{}
|
||||||
if err := json.Unmarshal(data, egm); err == nil {
|
if err := json.Unmarshal(data, egm); err == nil {
|
||||||
ta.receiveGroupMessageHandler(ta.connection.Hostname(), egm)
|
ta.receiveGroupMessageHandler(ta.connection.Hostname(), egm)
|
||||||
|
ta.lastKnownSignature = egm.Signature
|
||||||
} else {
|
} else {
|
||||||
// TODO: Send this error to the UI
|
|
||||||
log.Debugf("Server sent an unexpected EncryptedGroupMessage, closing the connection: %v", err)
|
log.Debugf("Server sent an unexpected EncryptedGroupMessage, closing the connection: %v", err)
|
||||||
|
ta.serverClosedHandler(ta.connection.Hostname())
|
||||||
ta.connection.Close()
|
ta.connection.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -119,8 +135,7 @@ func (ta *TokenBoardClient) Listen() {
|
||||||
|
|
||||||
// Replay posts a Replay Message to the server.
|
// Replay posts a Replay Message to the server.
|
||||||
func (ta *TokenBoardClient) Replay() {
|
func (ta *TokenBoardClient) Replay() {
|
||||||
// TODO - Allow configurable ranges
|
data, _ := json.Marshal(groups.Message{MessageType: groups.ReplayRequestMessage, ReplayRequest: &groups.ReplayRequest{LastCommit: ta.lastKnownSignature}})
|
||||||
data, _ := json.Marshal(groups.Message{MessageType: groups.ReplayRequestMessage, ReplayRequest: &groups.ReplayRequest{LastCommit: []byte{}}})
|
|
||||||
ta.connection.Send(data)
|
ta.connection.Send(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue