Move Cached Token Loading into Server Join (from SMTG)
continuous-integration/drone/pr Build is passing Details

This commit is contained in:
Sarah Jamie Lewis 2022-11-23 08:01:22 -08:00
parent 4324ffae03
commit 0ba45cd59a
4 changed files with 27 additions and 29 deletions

View File

@ -303,16 +303,9 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error)
log.Debugf("sending message to group: %v", conversationInfo.ID)
id, err := cp.storage.InsertMessage(conversationInfo.ID, 0, dm.Text, model.Attributes{constants.AttrAck: constants.False, "PreviousSignature": base64.StdEncoding.EncodeToString(dm.PreviousMessageSig), constants.AttrAuthor: dm.Onion, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, base64.StdEncoding.EncodeToString(sig), model.CalculateContentHash(dm.Onion, dm.Text))
if err == nil {
serverCI, err := cp.FetchConversationInfo(group.GroupServer)
if err == nil {
cachedTokensJson, hasCachedTokens := serverCI.GetAttribute(attr.LocalScope, attr.ServerZone, "tokens")
if hasCachedTokens {
log.Debugf("using cached tokens for %v", conversationInfo.Handle)
}
ev := event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.GroupID: conversationInfo.Handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig), event.CachedTokens: cachedTokensJson})
cp.eventBus.Publish(ev)
return id, nil
}
ev := event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.GroupID: conversationInfo.Handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)})
cp.eventBus.Publish(ev)
return id, nil
}
return -1, err
}
@ -951,7 +944,13 @@ func (cp *cwtchPeer) JoinServer(onion string) error {
if !exists {
signature = base64.StdEncoding.EncodeToString([]byte{})
}
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature}))
cachedTokensJson, hasCachedTokens := ci.GetAttribute(attr.LocalScope, attr.ServerZone, "tokens")
if hasCachedTokens {
log.Debugf("using cached tokens for %v", ci.Handle)
}
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature, event.CachedTokens: cachedTokensJson}))
return nil
}
return errors.New("no keys found for server connection")

View File

@ -169,7 +169,16 @@ func (e *engine) eventHandler() {
// will result in a full sync
signature = []byte{}
}
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
// if we have been sent cached tokens, also deserialize them
cachedTokensJson := ev.Data[event.CachedTokens]
var cachedTokens []*privacypass.Token
if len(cachedTokensJson) != 0 {
json.Unmarshal([]byte(cachedTokensJson), &cachedTokens)
}
// create a new token handler...
e.NewTokenHandler(ev.Data[event.ServerTokenOnion], cachedTokens)
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature, cachedTokens)
case event.MakeAntispamPayment:
go e.makeAntispamPayment(ev.Data[event.GroupServer])
case event.LeaveServer:
@ -183,15 +192,8 @@ func (e *engine) eventHandler() {
ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
// if we have been sent cached tokens, also deserialize them
cachedTokensJson := ev.Data[event.CachedTokens]
var cachedTokens []*privacypass.Token
if len(cachedTokensJson) != 0 {
json.Unmarshal([]byte(cachedTokensJson), &cachedTokens)
}
// launch a goroutine to post to the server
go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature, 0, cachedTokens)
go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature, 0)
case event.SendMessageToPeer:
// TODO: remove this passthrough once the UI is integrated.
context, ok := ev.Data[event.EventContext]
@ -388,7 +390,7 @@ func (e *engine) makeAntispamPayment(onion string) {
// peerWithTokenServer is the entry point for cwtchPeer - server relationships
// needs to be run in a goroutine as will block on Open.
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) {
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte, cachedTokens []*privacypass.Token) {
e.ephemeralServicesLock.Lock()
_, exists := e.ephemeralServices[onion]
@ -599,7 +601,7 @@ func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMes
}
// sendMessageToGroup attempts to sent the given message to the given group id.
func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte, attempts int, cachedTokens []*privacypass.Token) {
func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte, attempts int) {
// sending to groups can fail for a few reasons (slow server, not enough tokens, etc.)
// rather than trying to keep all that logic in method we simply back-off and try again
// but if we fail more than 5 times then we report back to the client so they can investigate other options.
@ -624,9 +626,6 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
conn, err := ephemeralService.service.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability)
if err == nil {
tokenApp, ok := (conn.App()).(*TokenBoardClient)
tokenApp.tokenBoardHandler.NewTokenHandler(tokenApp.tokenServiceOnion, cachedTokens)
if ok {
if spent, numtokens := tokenApp.Post(groupID, ct, sig); !spent {
// we failed to post, probably because we ran out of tokens... so make a payment
@ -635,7 +634,7 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
time.Sleep(time.Second * 5)
// try again
log.Debugf("sending message to group error attempt: %v", attempts)
e.sendMessageToGroup(groupID, server, ct, sig, attempts+1, []*privacypass.Token{})
e.sendMessageToGroup(groupID, server, ct, sig, attempts+1)
} else {
if numtokens < 5 {
go tokenApp.PurchaseTokens()

View File

@ -39,7 +39,7 @@ func (e *engine) ServerClosedHandler(server string) {
func (e *engine) NewTokenHandler(tokenService string, tokens []*privacypass.Token) {
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager())
tokenManager := tokenManagerPointer.(*TokenManager)
tokenManager.NewTokens(tokens)
tokenManager.StoreNewTokens(tokens)
e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(tokenManager.NumTokens())}))
}

View File

@ -20,8 +20,8 @@ func NewTokenManager() *TokenManager {
return tm
}
// NewTokens adds tokens to the internal list managed by this TokenManager
func (tm *TokenManager) NewTokens(tokens []*privacypass.Token) {
// StoreNewTokens adds tokens to the internal list managed by this TokenManager
func (tm *TokenManager) StoreNewTokens(tokens []*privacypass.Token) {
tm.lock.Lock()
defer tm.lock.Unlock()
log.Debugf("acquired %v new tokens", tokens)