From c66561d84fd62ba977b19cdd341feaf309e8d008 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 11 Oct 2022 10:58:10 -0700 Subject: [PATCH] Allow using cached tokens for local integ testing (also new TORCACHE env for integ testing to speed up bootstrapping locally) --- event/common.go | 1 + peer/cwtch_peer.go | 26 ++++++++-- peer/profile_interface.go | 9 ++++ protocol/connections/engine.go | 23 ++++++-- protocol/connections/engine_token_handler.go | 10 +++- protocol/connections/token_manager.go | 24 +++++++-- protocol/connections/tokenboardclientapp.go | 24 ++++++++- protocol/groups/common.go | 6 +++ testing/cwtch_peer_server_integration_test.go | 52 ++++++++++++++++--- tools/cwtch_tools.go | 4 ++ 10 files changed, 156 insertions(+), 23 deletions(-) diff --git a/event/common.go b/event/common.go index d78d861..5756314 100644 --- a/event/common.go +++ b/event/common.go @@ -219,6 +219,7 @@ const ( RemotePeer = Field("RemotePeer") Ciphertext = Field("Ciphertext") Signature = Field("Signature") + CachedTokens = Field("CachedTokens") PreviousSignature = Field("PreviousSignature") TimestampSent = Field("TimestampSent") TimestampReceived = Field("TimestampReceived") diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index e16bf9e..14c85c0 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" "git.openprivacy.ca/cwtch.im/tapir/primitives" + "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" "git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/connectivity/tor" "golang.org/x/crypto/ed25519" @@ -75,6 +76,17 @@ type cwtchPeer struct { eventBus event.Manager } +func (cp *cwtchPeer) LoadCachedTokens(tokenServer string, tokens []*privacypass.Token) { + ci, err := cp.FetchConversationInfo(tokenServer) + if ci != nil && err == nil { + // Overwrite any existing tokens.. + tokenPath := attr.LocalScope.ConstructScopedZonedPath(attr.ServerZone.ConstructZonedPath("tokens")) + data, _ := json.Marshal(tokens) + log.Debugf("storing cached tokens for %v", tokenServer) + cp.SetConversationAttribute(ci.ID, tokenPath, string(data)) + } +} + func (cp *cwtchPeer) Export(file string) error { cp.mutex.Lock() defer cp.mutex.Unlock() @@ -279,9 +291,16 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error) log.Debugf("sending message to group: %v", conversationInfo.ID) id, err := cp.storage.InsertMessage(conversationInfo.ID, 0, dm.Text, model.Attributes{constants.AttrAck: constants.False, "PreviousSignature": base64.StdEncoding.EncodeToString(dm.PreviousMessageSig), constants.AttrAuthor: dm.Onion, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, base64.StdEncoding.EncodeToString(sig), model.CalculateContentHash(dm.Onion, dm.Text)) if err == nil { - ev := event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.GroupID: conversationInfo.Handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)}) - cp.eventBus.Publish(ev) - return id, nil + serverCI, err := cp.FetchConversationInfo(group.GroupServer) + if err == nil { + cachedTokensJson, hasCachedTokens := serverCI.GetAttribute(attr.LocalScope, attr.ServerZone, "tokens") + if hasCachedTokens { + log.Debugf("using cached tokens for %v", conversationInfo.Handle) + } + ev := event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.GroupID: conversationInfo.Handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig), event.CachedTokens: cachedTokensJson}) + cp.eventBus.Publish(ev) + return id, nil + } } return -1, err } @@ -1395,6 +1414,7 @@ func (cp *cwtchPeer) attemptAcknowledgeP2PConversation(handle string, signature // by the given handle and attempts to mark the message as errored. returns error on failure // to either find the contact or the associated message func (cp *cwtchPeer) attemptErrorConversationMessage(handle string, signature string, error string) error { + ci, err := cp.FetchConversationInfo(handle) // We should *never* received an error for a conversation that doesn't exist... if ci != nil && err == nil { diff --git a/peer/profile_interface.go b/peer/profile_interface.go index 6b12310..9e95a56 100644 --- a/peer/profile_interface.go +++ b/peer/profile_interface.go @@ -5,6 +5,7 @@ import ( "cwtch.im/cwtch/model" "cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/protocol/connections" + "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" "git.openprivacy.ca/openprivacy/connectivity" ) @@ -116,9 +117,17 @@ type CwtchPeer interface { GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error + // File Sharing APIS + // TODO move these to feature protected interfaces ShareFile(fileKey string, serializedManifest string) StopFileShare(fileKey string) StopAllFileShares() + + // Server Token APIS + // TODO move these to feature protected interfaces + LoadCachedTokens(tokenServer string, tokens []*privacypass.Token) + + // Profile Management CheckPassword(password string) bool ChangePassword(oldpassword string, newpassword string, newpasswordAgain string) error Export(file string) error diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 860ab02..1335a73 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -4,6 +4,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" "strconv" "strings" "sync" @@ -181,7 +182,16 @@ func (e *engine) eventHandler() { case event.SendMessageToGroup: ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) - go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature, 0) + + // if we have been sent cached tokens, also deserialize them + cachedTokensJson := ev.Data[event.CachedTokens] + var cachedTokens []*privacypass.Token + if len(cachedTokensJson) != 0 { + json.Unmarshal([]byte(cachedTokensJson), &cachedTokens) + } + + // launch a goroutine to post to the server + go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature, 0, cachedTokens) case event.SendMessageToPeer: // TODO: remove this passthrough once the UI is integrated. context, ok := ev.Data[event.EventContext] @@ -366,7 +376,7 @@ func (e *engine) makeAntispamPayment(onion string) { if err == nil { tokenApp, ok := (conn.App()).(*TokenBoardClient) if ok { - tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, new(TokenManager)) + tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, NewTokenManager()) tokenManager := tokenManagerPointer.(*TokenManager) log.Debugf("checking antispam tokens %v", tokenManager.NumTokens()) if tokenManager.NumTokens() < 5 { @@ -589,7 +599,7 @@ func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMes } // sendMessageToGroup attempts to sent the given message to the given group id. -func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte, attempts int) { +func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte, attempts int, cachedTokens []*privacypass.Token) { // sending to groups can fail for a few reasons (slow server, not enough tokens, etc.) // rather than trying to keep all that logic in method we simply back-off and try again // but if we fail more than 5 times then we report back to the client so they can investigate other options. @@ -614,15 +624,18 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si conn, err := ephemeralService.service.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability) if err == nil { tokenApp, ok := (conn.App()).(*TokenBoardClient) + + tokenApp.tokenBoardHandler.NewTokenHandler(tokenApp.tokenServiceOnion, cachedTokens) + if ok { - if spent, numtokens := tokenApp.Post(ct, sig); !spent { + if spent, numtokens := tokenApp.Post(groupID, ct, sig); !spent { // we failed to post, probably because we ran out of tokens... so make a payment go tokenApp.PurchaseTokens() // backoff time.Sleep(time.Second * 5) // try again log.Debugf("sending message to group error attempt: %v", attempts) - e.sendMessageToGroup(groupID, server, ct, sig, attempts+1) + e.sendMessageToGroup(groupID, server, ct, sig, attempts+1, []*privacypass.Token{}) } else { if numtokens < 5 { go tokenApp.PurchaseTokens() diff --git a/protocol/connections/engine_token_handler.go b/protocol/connections/engine_token_handler.go index 604f3b0..6585029 100644 --- a/protocol/connections/engine_token_handler.go +++ b/protocol/connections/engine_token_handler.go @@ -3,6 +3,7 @@ package connections import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/protocol/groups" + "encoding/base64" "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" "strconv" ) @@ -14,6 +15,11 @@ func (e *engine) GroupMessageHandler(server string, gm *groups.EncryptedGroupMes e.receiveGroupMessage(server, gm) } +// PostingFailed notifies a peer that a message failed to post +func (e *engine) PostingFailed(group string, sig []byte) { + e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: group, event.Error: "failed to post message", event.Signature: base64.StdEncoding.EncodeToString(sig)})) +} + // ServerAuthedHandler is notified when a server has successfully authed func (e *engine) ServerAuthedHandler(server string) { e.serverAuthed(server) @@ -31,7 +37,7 @@ func (e *engine) ServerClosedHandler(server string) { // NewTokenHandler is notified after a successful token acquisition func (e *engine) NewTokenHandler(tokenService string, tokens []*privacypass.Token) { - tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, new(TokenManager)) + tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager()) tokenManager := tokenManagerPointer.(*TokenManager) tokenManager.NewTokens(tokens) e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(tokenManager.NumTokens())})) @@ -39,7 +45,7 @@ func (e *engine) NewTokenHandler(tokenService string, tokens []*privacypass.Toke // FetchToken is notified when a server requires a new token from the client func (e *engine) FetchToken(tokenService string) (*privacypass.Token, int, error) { - tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, new(TokenManager)) + tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager()) tokenManager := tokenManagerPointer.(*TokenManager) token, numTokens, err := tokenManager.FetchToken() e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(numTokens)})) diff --git a/protocol/connections/token_manager.go b/protocol/connections/token_manager.go index 3c1c61d..4e0ed7f 100644 --- a/protocol/connections/token_manager.go +++ b/protocol/connections/token_manager.go @@ -1,6 +1,7 @@ package connections import ( + "encoding/json" "errors" "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" "sync" @@ -9,14 +10,23 @@ import ( // TokenManager maintains a list of tokens associated with a single TokenServer type TokenManager struct { lock sync.Mutex - tokens []*privacypass.Token + tokens map[string]bool +} + +func NewTokenManager() *TokenManager { + tm := new(TokenManager) + tm.tokens = make(map[string]bool) + return tm } // NewTokens adds tokens to the internal list managed by this TokenManager func (tm *TokenManager) NewTokens(tokens []*privacypass.Token) { tm.lock.Lock() defer tm.lock.Unlock() - tm.tokens = append(tm.tokens, tokens...) + for _, token := range tokens { + serialized, _ := json.Marshal(token) + tm.tokens[string(serialized)] = true + } } // NumTokens returns the current number of tokens @@ -34,7 +44,11 @@ func (tm *TokenManager) FetchToken() (*privacypass.Token, int, error) { if len(tm.tokens) == 0 { return nil, 0, errors.New("no more tokens") } - token := tm.tokens[0] - tm.tokens = tm.tokens[1:] - return token, len(tm.tokens), nil + for serializedToken := range tm.tokens { + delete(tm.tokens, serializedToken) + token := new(privacypass.Token) + json.Unmarshal([]byte(serializedToken), token) + return token, len(tm.tokens), nil + } + return nil, 0, errors.New("no more tokens") } diff --git a/protocol/connections/tokenboardclientapp.go b/protocol/connections/tokenboardclientapp.go index 9704421..0b300ba 100644 --- a/protocol/connections/tokenboardclientapp.go +++ b/protocol/connections/tokenboardclientapp.go @@ -9,6 +9,7 @@ import ( "git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/log" "github.com/gtank/ristretto255" + "sync" ) // TokenBoardHandler encapsulates all the various handlers a client needs to interact with a token board @@ -19,6 +20,7 @@ type TokenBoardHandler interface { ServerSyncedHandler(server string) ServerClosedHandler(server string) NewTokenHandler(tokenService string, tokens []*privacypass.Token) + PostingFailed(server string, sig []byte) FetchToken(tokenService string) (*privacypass.Token, int, error) } @@ -46,6 +48,9 @@ type TokenBoardClient struct { tokenService *privacypass.TokenServer tokenServiceOnion string lastKnownSignature []byte + + postLock sync.Mutex + postQueue []groups.CachedEncryptedGroupMessage } // NewInstance Client a new TokenBoardApp @@ -107,7 +112,18 @@ func (ta *TokenBoardClient) Listen() { return } case groups.PostResultMessage: - // TODO handle failure + ta.postLock.Lock() + egm := ta.postQueue[0] + ta.postQueue = ta.postQueue[1:] + ta.postLock.Unlock() + if !message.PostResult.Success { + // Retry using another token + posted, _ := ta.Post(egm.Group, egm.Ciphertext, egm.Signature) + // if posting failed... + if !posted { + ta.tokenBoardHandler.PostingFailed(egm.Group, egm.Signature) + } + } case groups.ReplayResultMessage: if message.ReplayResult != nil { log.Debugf("Replaying %v Messages...", message.ReplayResult.NumMessages) @@ -151,13 +167,17 @@ func (ta *TokenBoardClient) PurchaseTokens() { } // Post sends a Post Request to the server -func (ta *TokenBoardClient) Post(ct []byte, sig []byte) (bool, int) { +func (ta *TokenBoardClient) Post(group string, ct []byte, sig []byte) (bool, int) { egm := groups.EncryptedGroupMessage{Ciphertext: ct, Signature: sig} token, numTokens, err := ta.NextToken(egm.ToBytes(), ta.connection.Hostname()) if err == nil { data, _ := json.Marshal(groups.Message{MessageType: groups.PostRequestMessage, PostRequest: &groups.PostRequest{EGM: egm, Token: token}}) + ta.postLock.Lock() + // ONLY put group in the EGM as a cache / for error reporting... + ta.postQueue = append(ta.postQueue, groups.CachedEncryptedGroupMessage{Group: group, EncryptedGroupMessage: egm}) log.Debugf("Message Length: %s %v", data, len(data)) err := ta.connection.Send(data) + ta.postLock.Unlock() if err != nil { return false, numTokens } diff --git a/protocol/groups/common.go b/protocol/groups/common.go index 718765c..5d4a804 100644 --- a/protocol/groups/common.go +++ b/protocol/groups/common.go @@ -39,6 +39,12 @@ type EncryptedGroupMessage struct { Signature []byte } +// CachedEncryptedGroupMessage provides an encapsulation of the encrypted group message for local caching / error reporting +type CachedEncryptedGroupMessage struct { + EncryptedGroupMessage + Group string +} + // ToBytes converts the encrypted group message to a set of bytes for serialization func (egm EncryptedGroupMessage) ToBytes() []byte { data, _ := json.Marshal(egm) diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index e066ede..ce554ca 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -12,6 +12,7 @@ import ( "cwtch.im/cwtch/protocol/connections" "encoding/base64" "encoding/json" + "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" "git.openprivacy.ca/openprivacy/connectivity/tor" "git.openprivacy.ca/openprivacy/log" _ "github.com/mutecomm/go-sqlcipher/v4" @@ -19,6 +20,7 @@ import ( "os" "os/user" "path" + "path/filepath" "runtime" "runtime/pprof" "testing" @@ -51,6 +53,18 @@ func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target co } } +func checkAndLoadTokens() []*privacypass.Token { + var tokens []*privacypass.Token + data, err := os.ReadFile("../tokens") + if err == nil { + err := json.Unmarshal(data, &tokens) + if err != nil { + log.Errorf("could not load tokens from file") + } + } + return tokens +} + func TestCwtchPeerIntegration(t *testing.T) { // Goroutine Monitoring Start.. @@ -62,6 +76,13 @@ func TestCwtchPeerIntegration(t *testing.T) { log.ExcludeFromPattern("outbound/3dhauthchannel") log.ExcludeFromPattern("event/eventmanager") log.ExcludeFromPattern("tapir") + + // checking if we should use the token cache + cachedTokens := checkAndLoadTokens() + if len(cachedTokens) > 7 { + log.Infof("using cached tokens") + } + os.Mkdir("tordir", 0700) dataDir := path.Join("tordir", "tor") os.MkdirAll(dataDir, 0700) @@ -78,9 +99,18 @@ func TestCwtchPeerIntegration(t *testing.T) { panic(err) } + useCache := os.Getenv("TORCACHE") == "true" + torDataDir := "" - if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil { - t.Fatalf("could not create data dir") + if useCache { + log.Infof("using tor cache") + torDataDir = filepath.Join(dataDir, "data-dir-torcache") + os.MkdirAll(torDataDir, 0700) + } else { + log.Infof("using clean tor data dir") + if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil { + t.Fatalf("could not create data dir") + } } tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc") @@ -225,10 +255,14 @@ func TestCwtchPeerIntegration(t *testing.T) { log.Infof("Alice has carol's name as '%v'\n", carolName) // Group Testing - + usedTokens := len(aliceLines) // Simulate Alice Creating a Group log.Infoln("Alice joining server...") - if _, err := alice.AddServer(string(serverKeyBundle)); err != nil { + if serverOnion, err := alice.AddServer(string(serverKeyBundle)); err != nil { + if len(cachedTokens) > len(aliceLines) { + alice.LoadCachedTokens(serverOnion, cachedTokens[0:len(aliceLines)]) + } + t.Fatalf("Failed to Add Server Bundle %v", err) } @@ -260,6 +294,10 @@ func TestCwtchPeerIntegration(t *testing.T) { log.Infof("Parsed Overlay Message: %v", overlayMessage) err = bob.ImportBundle(overlayMessage.Data) log.Infof("Result of Bob Importing the Bundle from Alice: %v", err) + if len(cachedTokens) > (usedTokens + len(bobLines)) { + bob.LoadCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(bobLines)]) + usedTokens += len(bobLines) + } log.Infof("Waiting for Bob to join connect to group server...") waitForConnection(t, bob, ServerAddr, connections.SYNCED) @@ -278,15 +316,17 @@ func TestCwtchPeerIntegration(t *testing.T) { checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[1]) checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[1]) - // Pretend that Carol Aquires the Overlay Message through some other means... + // Pretend that Carol Acquires the Overlay Message through some other means... json.Unmarshal([]byte(message), &overlayMessage) log.Infof("Parsed Overlay Message: %v", overlayMessage) err = carol.ImportBundle(overlayMessage.Data) log.Infof("Result of Carol Importing the Bundle from Alice: %v", err) log.Infof("Waiting for Carol to join connect to group server...") carolGroupConversationID := 3 + if len(cachedTokens) > (usedTokens + len(carolLines)) { + carol.LoadCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(carolLines)]) + } waitForConnection(t, carol, ServerAddr, connections.SYNCED) - numGoRoutinesPostCarolConnect := runtime.NumGoroutine() // Check Alice Timeline diff --git a/tools/cwtch_tools.go b/tools/cwtch_tools.go index 9375227..de94f05 100644 --- a/tools/cwtch_tools.go +++ b/tools/cwtch_tools.go @@ -128,6 +128,10 @@ func getTokens(bundle string) { type Handler struct { } +func (h Handler) PostingFailed(server string, sig []byte) { + +} + func (h Handler) GroupMessageHandler(server string, gm *groups.EncryptedGroupMessage) { }