From 0ba45cd59a49e4876ac646d9ecbccb75e2fe96cc Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Wed, 23 Nov 2022 08:01:22 -0800 Subject: [PATCH] Move Cached Token Loading into Server Join (from SMTG) --- peer/cwtch_peer.go | 21 +++++++------- protocol/connections/engine.go | 29 ++++++++++---------- protocol/connections/engine_token_handler.go | 2 +- protocol/connections/token_manager.go | 4 +-- 4 files changed, 27 insertions(+), 29 deletions(-) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 8109bd4..e67d999 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -303,16 +303,9 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error) log.Debugf("sending message to group: %v", conversationInfo.ID) id, err := cp.storage.InsertMessage(conversationInfo.ID, 0, dm.Text, model.Attributes{constants.AttrAck: constants.False, "PreviousSignature": base64.StdEncoding.EncodeToString(dm.PreviousMessageSig), constants.AttrAuthor: dm.Onion, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, base64.StdEncoding.EncodeToString(sig), model.CalculateContentHash(dm.Onion, dm.Text)) if err == nil { - serverCI, err := cp.FetchConversationInfo(group.GroupServer) - if err == nil { - cachedTokensJson, hasCachedTokens := serverCI.GetAttribute(attr.LocalScope, attr.ServerZone, "tokens") - if hasCachedTokens { - log.Debugf("using cached tokens for %v", conversationInfo.Handle) - } - ev := event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.GroupID: conversationInfo.Handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig), event.CachedTokens: cachedTokensJson}) - cp.eventBus.Publish(ev) - return id, nil - } + ev := event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.GroupID: conversationInfo.Handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)}) + cp.eventBus.Publish(ev) + return id, nil } return -1, err } @@ -951,7 +944,13 @@ func (cp *cwtchPeer) JoinServer(onion string) error { if !exists { signature = base64.StdEncoding.EncodeToString([]byte{}) } - cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature})) + + cachedTokensJson, hasCachedTokens := ci.GetAttribute(attr.LocalScope, attr.ServerZone, "tokens") + if hasCachedTokens { + log.Debugf("using cached tokens for %v", ci.Handle) + } + + cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature, event.CachedTokens: cachedTokensJson})) return nil } return errors.New("no keys found for server connection") diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 744750d..57b6fc3 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -169,7 +169,16 @@ func (e *engine) eventHandler() { // will result in a full sync signature = []byte{} } - go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature) + // if we have been sent cached tokens, also deserialize them + cachedTokensJson := ev.Data[event.CachedTokens] + var cachedTokens []*privacypass.Token + if len(cachedTokensJson) != 0 { + json.Unmarshal([]byte(cachedTokensJson), &cachedTokens) + } + + // create a new token handler... + e.NewTokenHandler(ev.Data[event.ServerTokenOnion], cachedTokens) + go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature, cachedTokens) case event.MakeAntispamPayment: go e.makeAntispamPayment(ev.Data[event.GroupServer]) case event.LeaveServer: @@ -183,15 +192,8 @@ func (e *engine) eventHandler() { ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) - // if we have been sent cached tokens, also deserialize them - cachedTokensJson := ev.Data[event.CachedTokens] - var cachedTokens []*privacypass.Token - if len(cachedTokensJson) != 0 { - json.Unmarshal([]byte(cachedTokensJson), &cachedTokens) - } - // launch a goroutine to post to the server - go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature, 0, cachedTokens) + go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature, 0) case event.SendMessageToPeer: // TODO: remove this passthrough once the UI is integrated. context, ok := ev.Data[event.EventContext] @@ -388,7 +390,7 @@ func (e *engine) makeAntispamPayment(onion string) { // peerWithTokenServer is the entry point for cwtchPeer - server relationships // needs to be run in a goroutine as will block on Open. -func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) { +func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte, cachedTokens []*privacypass.Token) { e.ephemeralServicesLock.Lock() _, exists := e.ephemeralServices[onion] @@ -599,7 +601,7 @@ func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMes } // sendMessageToGroup attempts to sent the given message to the given group id. -func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte, attempts int, cachedTokens []*privacypass.Token) { +func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte, attempts int) { // sending to groups can fail for a few reasons (slow server, not enough tokens, etc.) // rather than trying to keep all that logic in method we simply back-off and try again // but if we fail more than 5 times then we report back to the client so they can investigate other options. @@ -624,9 +626,6 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si conn, err := ephemeralService.service.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability) if err == nil { tokenApp, ok := (conn.App()).(*TokenBoardClient) - - tokenApp.tokenBoardHandler.NewTokenHandler(tokenApp.tokenServiceOnion, cachedTokens) - if ok { if spent, numtokens := tokenApp.Post(groupID, ct, sig); !spent { // we failed to post, probably because we ran out of tokens... so make a payment @@ -635,7 +634,7 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si time.Sleep(time.Second * 5) // try again log.Debugf("sending message to group error attempt: %v", attempts) - e.sendMessageToGroup(groupID, server, ct, sig, attempts+1, []*privacypass.Token{}) + e.sendMessageToGroup(groupID, server, ct, sig, attempts+1) } else { if numtokens < 5 { go tokenApp.PurchaseTokens() diff --git a/protocol/connections/engine_token_handler.go b/protocol/connections/engine_token_handler.go index 6585029..ec02adc 100644 --- a/protocol/connections/engine_token_handler.go +++ b/protocol/connections/engine_token_handler.go @@ -39,7 +39,7 @@ func (e *engine) ServerClosedHandler(server string) { func (e *engine) NewTokenHandler(tokenService string, tokens []*privacypass.Token) { tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager()) tokenManager := tokenManagerPointer.(*TokenManager) - tokenManager.NewTokens(tokens) + tokenManager.StoreNewTokens(tokens) e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(tokenManager.NumTokens())})) } diff --git a/protocol/connections/token_manager.go b/protocol/connections/token_manager.go index 12ab0a1..bba540b 100644 --- a/protocol/connections/token_manager.go +++ b/protocol/connections/token_manager.go @@ -20,8 +20,8 @@ func NewTokenManager() *TokenManager { return tm } -// NewTokens adds tokens to the internal list managed by this TokenManager -func (tm *TokenManager) NewTokens(tokens []*privacypass.Token) { +// StoreNewTokens adds tokens to the internal list managed by this TokenManager +func (tm *TokenManager) StoreNewTokens(tokens []*privacypass.Token) { tm.lock.Lock() defer tm.lock.Unlock() log.Debugf("acquired %v new tokens", tokens)