From c66561d84fd62ba977b19cdd341feaf309e8d008 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 11 Oct 2022 10:58:10 -0700 Subject: [PATCH 1/5] Allow using cached tokens for local integ testing (also new TORCACHE env for integ testing to speed up bootstrapping locally) --- event/common.go | 1 + peer/cwtch_peer.go | 26 ++++++++-- peer/profile_interface.go | 9 ++++ protocol/connections/engine.go | 23 ++++++-- protocol/connections/engine_token_handler.go | 10 +++- protocol/connections/token_manager.go | 24 +++++++-- protocol/connections/tokenboardclientapp.go | 24 ++++++++- protocol/groups/common.go | 6 +++ testing/cwtch_peer_server_integration_test.go | 52 ++++++++++++++++--- tools/cwtch_tools.go | 4 ++ 10 files changed, 156 insertions(+), 23 deletions(-) diff --git a/event/common.go b/event/common.go index d78d861..5756314 100644 --- a/event/common.go +++ b/event/common.go @@ -219,6 +219,7 @@ const ( RemotePeer = Field("RemotePeer") Ciphertext = Field("Ciphertext") Signature = Field("Signature") + CachedTokens = Field("CachedTokens") PreviousSignature = Field("PreviousSignature") TimestampSent = Field("TimestampSent") TimestampReceived = Field("TimestampReceived") diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index e16bf9e..14c85c0 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" "git.openprivacy.ca/cwtch.im/tapir/primitives" + "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" "git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/connectivity/tor" "golang.org/x/crypto/ed25519" @@ -75,6 +76,17 @@ type cwtchPeer struct { eventBus event.Manager } +func (cp *cwtchPeer) LoadCachedTokens(tokenServer string, tokens []*privacypass.Token) { + ci, err := cp.FetchConversationInfo(tokenServer) + if ci != nil && err == nil { + // Overwrite any existing tokens.. + tokenPath := attr.LocalScope.ConstructScopedZonedPath(attr.ServerZone.ConstructZonedPath("tokens")) + data, _ := json.Marshal(tokens) + log.Debugf("storing cached tokens for %v", tokenServer) + cp.SetConversationAttribute(ci.ID, tokenPath, string(data)) + } +} + func (cp *cwtchPeer) Export(file string) error { cp.mutex.Lock() defer cp.mutex.Unlock() @@ -279,9 +291,16 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error) log.Debugf("sending message to group: %v", conversationInfo.ID) id, err := cp.storage.InsertMessage(conversationInfo.ID, 0, dm.Text, model.Attributes{constants.AttrAck: constants.False, "PreviousSignature": base64.StdEncoding.EncodeToString(dm.PreviousMessageSig), constants.AttrAuthor: dm.Onion, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, base64.StdEncoding.EncodeToString(sig), model.CalculateContentHash(dm.Onion, dm.Text)) if err == nil { - ev := event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.GroupID: conversationInfo.Handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)}) - cp.eventBus.Publish(ev) - return id, nil + serverCI, err := cp.FetchConversationInfo(group.GroupServer) + if err == nil { + cachedTokensJson, hasCachedTokens := serverCI.GetAttribute(attr.LocalScope, attr.ServerZone, "tokens") + if hasCachedTokens { + log.Debugf("using cached tokens for %v", conversationInfo.Handle) + } + ev := event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.GroupID: conversationInfo.Handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig), event.CachedTokens: cachedTokensJson}) + cp.eventBus.Publish(ev) + return id, nil + } } return -1, err } @@ -1395,6 +1414,7 @@ func (cp *cwtchPeer) attemptAcknowledgeP2PConversation(handle string, signature // by the given handle and attempts to mark the message as errored. returns error on failure // to either find the contact or the associated message func (cp *cwtchPeer) attemptErrorConversationMessage(handle string, signature string, error string) error { + ci, err := cp.FetchConversationInfo(handle) // We should *never* received an error for a conversation that doesn't exist... if ci != nil && err == nil { diff --git a/peer/profile_interface.go b/peer/profile_interface.go index 6b12310..9e95a56 100644 --- a/peer/profile_interface.go +++ b/peer/profile_interface.go @@ -5,6 +5,7 @@ import ( "cwtch.im/cwtch/model" "cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/protocol/connections" + "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" "git.openprivacy.ca/openprivacy/connectivity" ) @@ -116,9 +117,17 @@ type CwtchPeer interface { GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error + // File Sharing APIS + // TODO move these to feature protected interfaces ShareFile(fileKey string, serializedManifest string) StopFileShare(fileKey string) StopAllFileShares() + + // Server Token APIS + // TODO move these to feature protected interfaces + LoadCachedTokens(tokenServer string, tokens []*privacypass.Token) + + // Profile Management CheckPassword(password string) bool ChangePassword(oldpassword string, newpassword string, newpasswordAgain string) error Export(file string) error diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 860ab02..1335a73 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -4,6 +4,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" "strconv" "strings" "sync" @@ -181,7 +182,16 @@ func (e *engine) eventHandler() { case event.SendMessageToGroup: ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) - go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature, 0) + + // if we have been sent cached tokens, also deserialize them + cachedTokensJson := ev.Data[event.CachedTokens] + var cachedTokens []*privacypass.Token + if len(cachedTokensJson) != 0 { + json.Unmarshal([]byte(cachedTokensJson), &cachedTokens) + } + + // launch a goroutine to post to the server + go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature, 0, cachedTokens) case event.SendMessageToPeer: // TODO: remove this passthrough once the UI is integrated. context, ok := ev.Data[event.EventContext] @@ -366,7 +376,7 @@ func (e *engine) makeAntispamPayment(onion string) { if err == nil { tokenApp, ok := (conn.App()).(*TokenBoardClient) if ok { - tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, new(TokenManager)) + tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, NewTokenManager()) tokenManager := tokenManagerPointer.(*TokenManager) log.Debugf("checking antispam tokens %v", tokenManager.NumTokens()) if tokenManager.NumTokens() < 5 { @@ -589,7 +599,7 @@ func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMes } // sendMessageToGroup attempts to sent the given message to the given group id. -func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte, attempts int) { +func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte, attempts int, cachedTokens []*privacypass.Token) { // sending to groups can fail for a few reasons (slow server, not enough tokens, etc.) // rather than trying to keep all that logic in method we simply back-off and try again // but if we fail more than 5 times then we report back to the client so they can investigate other options. @@ -614,15 +624,18 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si conn, err := ephemeralService.service.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability) if err == nil { tokenApp, ok := (conn.App()).(*TokenBoardClient) + + tokenApp.tokenBoardHandler.NewTokenHandler(tokenApp.tokenServiceOnion, cachedTokens) + if ok { - if spent, numtokens := tokenApp.Post(ct, sig); !spent { + if spent, numtokens := tokenApp.Post(groupID, ct, sig); !spent { // we failed to post, probably because we ran out of tokens... so make a payment go tokenApp.PurchaseTokens() // backoff time.Sleep(time.Second * 5) // try again log.Debugf("sending message to group error attempt: %v", attempts) - e.sendMessageToGroup(groupID, server, ct, sig, attempts+1) + e.sendMessageToGroup(groupID, server, ct, sig, attempts+1, []*privacypass.Token{}) } else { if numtokens < 5 { go tokenApp.PurchaseTokens() diff --git a/protocol/connections/engine_token_handler.go b/protocol/connections/engine_token_handler.go index 604f3b0..6585029 100644 --- a/protocol/connections/engine_token_handler.go +++ b/protocol/connections/engine_token_handler.go @@ -3,6 +3,7 @@ package connections import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/protocol/groups" + "encoding/base64" "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" "strconv" ) @@ -14,6 +15,11 @@ func (e *engine) GroupMessageHandler(server string, gm *groups.EncryptedGroupMes e.receiveGroupMessage(server, gm) } +// PostingFailed notifies a peer that a message failed to post +func (e *engine) PostingFailed(group string, sig []byte) { + e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: group, event.Error: "failed to post message", event.Signature: base64.StdEncoding.EncodeToString(sig)})) +} + // ServerAuthedHandler is notified when a server has successfully authed func (e *engine) ServerAuthedHandler(server string) { e.serverAuthed(server) @@ -31,7 +37,7 @@ func (e *engine) ServerClosedHandler(server string) { // NewTokenHandler is notified after a successful token acquisition func (e *engine) NewTokenHandler(tokenService string, tokens []*privacypass.Token) { - tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, new(TokenManager)) + tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager()) tokenManager := tokenManagerPointer.(*TokenManager) tokenManager.NewTokens(tokens) e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(tokenManager.NumTokens())})) @@ -39,7 +45,7 @@ func (e *engine) NewTokenHandler(tokenService string, tokens []*privacypass.Toke // FetchToken is notified when a server requires a new token from the client func (e *engine) FetchToken(tokenService string) (*privacypass.Token, int, error) { - tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, new(TokenManager)) + tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager()) tokenManager := tokenManagerPointer.(*TokenManager) token, numTokens, err := tokenManager.FetchToken() e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(numTokens)})) diff --git a/protocol/connections/token_manager.go b/protocol/connections/token_manager.go index 3c1c61d..4e0ed7f 100644 --- a/protocol/connections/token_manager.go +++ b/protocol/connections/token_manager.go @@ -1,6 +1,7 @@ package connections import ( + "encoding/json" "errors" "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" "sync" @@ -9,14 +10,23 @@ import ( // TokenManager maintains a list of tokens associated with a single TokenServer type TokenManager struct { lock sync.Mutex - tokens []*privacypass.Token + tokens map[string]bool +} + +func NewTokenManager() *TokenManager { + tm := new(TokenManager) + tm.tokens = make(map[string]bool) + return tm } // NewTokens adds tokens to the internal list managed by this TokenManager func (tm *TokenManager) NewTokens(tokens []*privacypass.Token) { tm.lock.Lock() defer tm.lock.Unlock() - tm.tokens = append(tm.tokens, tokens...) + for _, token := range tokens { + serialized, _ := json.Marshal(token) + tm.tokens[string(serialized)] = true + } } // NumTokens returns the current number of tokens @@ -34,7 +44,11 @@ func (tm *TokenManager) FetchToken() (*privacypass.Token, int, error) { if len(tm.tokens) == 0 { return nil, 0, errors.New("no more tokens") } - token := tm.tokens[0] - tm.tokens = tm.tokens[1:] - return token, len(tm.tokens), nil + for serializedToken := range tm.tokens { + delete(tm.tokens, serializedToken) + token := new(privacypass.Token) + json.Unmarshal([]byte(serializedToken), token) + return token, len(tm.tokens), nil + } + return nil, 0, errors.New("no more tokens") } diff --git a/protocol/connections/tokenboardclientapp.go b/protocol/connections/tokenboardclientapp.go index 9704421..0b300ba 100644 --- a/protocol/connections/tokenboardclientapp.go +++ b/protocol/connections/tokenboardclientapp.go @@ -9,6 +9,7 @@ import ( "git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/log" "github.com/gtank/ristretto255" + "sync" ) // TokenBoardHandler encapsulates all the various handlers a client needs to interact with a token board @@ -19,6 +20,7 @@ type TokenBoardHandler interface { ServerSyncedHandler(server string) ServerClosedHandler(server string) NewTokenHandler(tokenService string, tokens []*privacypass.Token) + PostingFailed(server string, sig []byte) FetchToken(tokenService string) (*privacypass.Token, int, error) } @@ -46,6 +48,9 @@ type TokenBoardClient struct { tokenService *privacypass.TokenServer tokenServiceOnion string lastKnownSignature []byte + + postLock sync.Mutex + postQueue []groups.CachedEncryptedGroupMessage } // NewInstance Client a new TokenBoardApp @@ -107,7 +112,18 @@ func (ta *TokenBoardClient) Listen() { return } case groups.PostResultMessage: - // TODO handle failure + ta.postLock.Lock() + egm := ta.postQueue[0] + ta.postQueue = ta.postQueue[1:] + ta.postLock.Unlock() + if !message.PostResult.Success { + // Retry using another token + posted, _ := ta.Post(egm.Group, egm.Ciphertext, egm.Signature) + // if posting failed... + if !posted { + ta.tokenBoardHandler.PostingFailed(egm.Group, egm.Signature) + } + } case groups.ReplayResultMessage: if message.ReplayResult != nil { log.Debugf("Replaying %v Messages...", message.ReplayResult.NumMessages) @@ -151,13 +167,17 @@ func (ta *TokenBoardClient) PurchaseTokens() { } // Post sends a Post Request to the server -func (ta *TokenBoardClient) Post(ct []byte, sig []byte) (bool, int) { +func (ta *TokenBoardClient) Post(group string, ct []byte, sig []byte) (bool, int) { egm := groups.EncryptedGroupMessage{Ciphertext: ct, Signature: sig} token, numTokens, err := ta.NextToken(egm.ToBytes(), ta.connection.Hostname()) if err == nil { data, _ := json.Marshal(groups.Message{MessageType: groups.PostRequestMessage, PostRequest: &groups.PostRequest{EGM: egm, Token: token}}) + ta.postLock.Lock() + // ONLY put group in the EGM as a cache / for error reporting... + ta.postQueue = append(ta.postQueue, groups.CachedEncryptedGroupMessage{Group: group, EncryptedGroupMessage: egm}) log.Debugf("Message Length: %s %v", data, len(data)) err := ta.connection.Send(data) + ta.postLock.Unlock() if err != nil { return false, numTokens } diff --git a/protocol/groups/common.go b/protocol/groups/common.go index 718765c..5d4a804 100644 --- a/protocol/groups/common.go +++ b/protocol/groups/common.go @@ -39,6 +39,12 @@ type EncryptedGroupMessage struct { Signature []byte } +// CachedEncryptedGroupMessage provides an encapsulation of the encrypted group message for local caching / error reporting +type CachedEncryptedGroupMessage struct { + EncryptedGroupMessage + Group string +} + // ToBytes converts the encrypted group message to a set of bytes for serialization func (egm EncryptedGroupMessage) ToBytes() []byte { data, _ := json.Marshal(egm) diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index e066ede..ce554ca 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -12,6 +12,7 @@ import ( "cwtch.im/cwtch/protocol/connections" "encoding/base64" "encoding/json" + "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" "git.openprivacy.ca/openprivacy/connectivity/tor" "git.openprivacy.ca/openprivacy/log" _ "github.com/mutecomm/go-sqlcipher/v4" @@ -19,6 +20,7 @@ import ( "os" "os/user" "path" + "path/filepath" "runtime" "runtime/pprof" "testing" @@ -51,6 +53,18 @@ func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target co } } +func checkAndLoadTokens() []*privacypass.Token { + var tokens []*privacypass.Token + data, err := os.ReadFile("../tokens") + if err == nil { + err := json.Unmarshal(data, &tokens) + if err != nil { + log.Errorf("could not load tokens from file") + } + } + return tokens +} + func TestCwtchPeerIntegration(t *testing.T) { // Goroutine Monitoring Start.. @@ -62,6 +76,13 @@ func TestCwtchPeerIntegration(t *testing.T) { log.ExcludeFromPattern("outbound/3dhauthchannel") log.ExcludeFromPattern("event/eventmanager") log.ExcludeFromPattern("tapir") + + // checking if we should use the token cache + cachedTokens := checkAndLoadTokens() + if len(cachedTokens) > 7 { + log.Infof("using cached tokens") + } + os.Mkdir("tordir", 0700) dataDir := path.Join("tordir", "tor") os.MkdirAll(dataDir, 0700) @@ -78,9 +99,18 @@ func TestCwtchPeerIntegration(t *testing.T) { panic(err) } + useCache := os.Getenv("TORCACHE") == "true" + torDataDir := "" - if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil { - t.Fatalf("could not create data dir") + if useCache { + log.Infof("using tor cache") + torDataDir = filepath.Join(dataDir, "data-dir-torcache") + os.MkdirAll(torDataDir, 0700) + } else { + log.Infof("using clean tor data dir") + if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil { + t.Fatalf("could not create data dir") + } } tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc") @@ -225,10 +255,14 @@ func TestCwtchPeerIntegration(t *testing.T) { log.Infof("Alice has carol's name as '%v'\n", carolName) // Group Testing - + usedTokens := len(aliceLines) // Simulate Alice Creating a Group log.Infoln("Alice joining server...") - if _, err := alice.AddServer(string(serverKeyBundle)); err != nil { + if serverOnion, err := alice.AddServer(string(serverKeyBundle)); err != nil { + if len(cachedTokens) > len(aliceLines) { + alice.LoadCachedTokens(serverOnion, cachedTokens[0:len(aliceLines)]) + } + t.Fatalf("Failed to Add Server Bundle %v", err) } @@ -260,6 +294,10 @@ func TestCwtchPeerIntegration(t *testing.T) { log.Infof("Parsed Overlay Message: %v", overlayMessage) err = bob.ImportBundle(overlayMessage.Data) log.Infof("Result of Bob Importing the Bundle from Alice: %v", err) + if len(cachedTokens) > (usedTokens + len(bobLines)) { + bob.LoadCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(bobLines)]) + usedTokens += len(bobLines) + } log.Infof("Waiting for Bob to join connect to group server...") waitForConnection(t, bob, ServerAddr, connections.SYNCED) @@ -278,15 +316,17 @@ func TestCwtchPeerIntegration(t *testing.T) { checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[1]) checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[1]) - // Pretend that Carol Aquires the Overlay Message through some other means... + // Pretend that Carol Acquires the Overlay Message through some other means... json.Unmarshal([]byte(message), &overlayMessage) log.Infof("Parsed Overlay Message: %v", overlayMessage) err = carol.ImportBundle(overlayMessage.Data) log.Infof("Result of Carol Importing the Bundle from Alice: %v", err) log.Infof("Waiting for Carol to join connect to group server...") carolGroupConversationID := 3 + if len(cachedTokens) > (usedTokens + len(carolLines)) { + carol.LoadCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(carolLines)]) + } waitForConnection(t, carol, ServerAddr, connections.SYNCED) - numGoRoutinesPostCarolConnect := runtime.NumGoroutine() // Check Alice Timeline diff --git a/tools/cwtch_tools.go b/tools/cwtch_tools.go index 9375227..de94f05 100644 --- a/tools/cwtch_tools.go +++ b/tools/cwtch_tools.go @@ -128,6 +128,10 @@ func getTokens(bundle string) { type Handler struct { } +func (h Handler) PostingFailed(server string, sig []byte) { + +} + func (h Handler) GroupMessageHandler(server string, gm *groups.EncryptedGroupMessage) { } From f2b879a9c497c6f3c5bcba205223159f3f372f42 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 25 Oct 2022 13:59:05 -0700 Subject: [PATCH 2/5] Upgrade Tapir / Fix Token Acquisition --- app/app.go | 7 ++- go.mod | 2 +- go.sum | 4 +- peer/cwtch_peer.go | 58 +++++++++++++-------- peer/cwtchprofilestorage.go | 7 +-- protocol/connections/engine.go | 4 +- protocol/connections/token_manager.go | 12 ++--- protocol/connections/tokenboardclientapp.go | 11 +++- 8 files changed, 64 insertions(+), 41 deletions(-) diff --git a/app/app.go b/app/app.go index af87955..a4adc46 100644 --- a/app/app.go +++ b/app/app.go @@ -145,11 +145,14 @@ func (app *application) DeletePeer(onion string, password string) { defer app.appmutex.Unlock() if app.peers[onion].CheckPassword(password) { - app.shutdownPeer(onion) + // soft-shutdown + app.peers[onion].Shutdown() + // delete the underlying storage app.peers[onion].Delete() + // hard shutdown / remove from app + app.shutdownPeer(onion) // Shutdown and Remove the Engine - log.Debugf("Delete peer for %v Done\n", onion) app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion)) return diff --git a/go.mod b/go.mod index 8c40e1b..22f10aa 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module cwtch.im/cwtch go 1.17 require ( - git.openprivacy.ca/cwtch.im/tapir v0.5.5 + git.openprivacy.ca/cwtch.im/tapir v0.6.0 git.openprivacy.ca/openprivacy/connectivity v1.8.6 git.openprivacy.ca/openprivacy/log v1.0.3 github.com/gtank/ristretto255 v0.1.3-0.20210930101514-6bb39798585c diff --git a/go.sum b/go.sum index b4f5627..d5839b5 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,8 @@ filippo.io/edwards25519 v1.0.0-rc.1/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns= filippo.io/edwards25519 v1.0.0 h1:0wAIcmJUqRdI8IJ/3eGi5/HwXZWPujYXXlkrQogz0Ek= filippo.io/edwards25519 v1.0.0/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns= -git.openprivacy.ca/cwtch.im/tapir v0.5.5 h1:km6UDrLYH/GCEn2s+S299/TiRHhxKCIAipYr9GbG3Hk= -git.openprivacy.ca/cwtch.im/tapir v0.5.5/go.mod h1:bWWHrDYBtHvxMri59RwIB/w7Eg1aC0BrQ/ycKlnbB5k= +git.openprivacy.ca/cwtch.im/tapir v0.6.0 h1:TtnKjxitkIDMM7Qn0n/u+mOHRLJzuQUYjYRu5n0/QFY= +git.openprivacy.ca/cwtch.im/tapir v0.6.0/go.mod h1:iQIq4y7N+DuP3CxyG66WNEC/d6vzh+wXvvOmelB+KoY= git.openprivacy.ca/openprivacy/bine v0.0.4 h1:CO7EkGyz+jegZ4ap8g5NWRuDHA/56KKvGySR6OBPW+c= git.openprivacy.ca/openprivacy/bine v0.0.4/go.mod h1:13ZqhKyqakDsN/ZkQkIGNULsmLyqtXc46XBcnuXm/mU= git.openprivacy.ca/openprivacy/connectivity v1.8.6 h1:g74PyDGvpMZ3+K0dXy3mlTJh+e0rcwNk0XF8owzkmOA= diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 14c85c0..8109bd4 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -99,42 +99,54 @@ func (cp *cwtchPeer) Delete() { cp.storage.Delete() } +// CheckPassword returns true if the given password can be used to derive the key that encrypts the underlying +// cwtch storage database. Returns false otherwise. func (cp *cwtchPeer) CheckPassword(password string) bool { + + // this lock is not really needed, but because we directly access cp.storage.ProfileDirectory + // we keep it here. cp.mutex.Lock() defer cp.mutex.Unlock() + + // open *our* database with the given password (set createIfNotExists to false) db, err := openEncryptedDatabase(cp.storage.ProfileDirectory, password, false) if db == nil || err != nil { + // this will only fail in the rare cases that ProfileDirectory has been moved or deleted + // it is actually a critical error, but far beyond the scope of Cwtch to deal with. return false } - db.Close() + // check that the storage object is valid (this will fail if the DB key is incorrect) + cps, err := NewCwtchProfileStorage(db, cp.storage.ProfileDirectory) + if err != nil { + // this will error if any SQL queries fail, which will be the case if the profile is invalid. + return false + } + // we have a valid database, close the storage (but don't purge as we may be using those conversations...) + cps.Close(false) + + // success! return true } func (cp *cwtchPeer) ChangePassword(password string, newpassword string, newpasswordAgain string) error { - cp.mutex.Lock() - defer cp.mutex.Unlock() - db, err := openEncryptedDatabase(cp.storage.ProfileDirectory, password, false) - if db == nil || err != nil { - return errors.New(constants.InvalidPasswordError) - } - cps, err := NewCwtchProfileStorage(db, cp.storage.ProfileDirectory) - if err != nil { - return errors.New(constants.InvalidPasswordError) - } - cps.Close() + if cp.CheckPassword(password) { + cp.mutex.Lock() + defer cp.mutex.Unlock() - salt, err := os.ReadFile(path.Join(cp.storage.ProfileDirectory, saltFile)) - if err != nil { - return err - } + salt, err := os.ReadFile(path.Join(cp.storage.ProfileDirectory, saltFile)) + if err != nil { + return err + } - // probably redundant but we like api safety - if newpassword == newpasswordAgain { - rekey := createKey(newpassword, salt) - log.Infof("rekeying database...") - return cp.storage.Rekey(rekey) + // probably redundant but we like api safety + if newpassword == newpasswordAgain { + rekey := createKey(newpassword, salt) + log.Infof("rekeying database...") + return cp.storage.Rekey(rekey) + } + return errors.New(constants.PasswordsDoNotMatchError) } - return errors.New(constants.PasswordsDoNotMatchError) + return errors.New(constants.InvalidPasswordError) } // GenerateProtocolEngine @@ -1027,7 +1039,7 @@ func (cp *cwtchPeer) Shutdown() { cp.shutdown = true cp.queue.Shutdown() if cp.storage != nil { - cp.storage.Close() + cp.storage.Close(true) } } diff --git a/peer/cwtchprofilestorage.go b/peer/cwtchprofilestorage.go index 3ba6b81..8769b42 100644 --- a/peer/cwtchprofilestorage.go +++ b/peer/cwtchprofilestorage.go @@ -771,12 +771,13 @@ func (cps *CwtchProfileStorage) PurgeNonSavedMessages() { } // Close closes the underlying database and prepared statements -func (cps *CwtchProfileStorage) Close() { +func (cps *CwtchProfileStorage) Close(purgeAllNonSavedMessages bool) { cps.mutex.Lock() defer cps.mutex.Unlock() if cps.db != nil { - - cps.PurgeNonSavedMessages() + if purgeAllNonSavedMessages { + cps.PurgeNonSavedMessages() + } cps.insertProfileKeyValueStmt.Close() cps.selectProfileKeyValueStmt.Close() diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 1335a73..744750d 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -490,14 +490,14 @@ func (e *engine) peerAuthed(onion string) { func (e *engine) peerConnecting(onion string) { e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{ - event.RemotePeer: string(onion), + event.RemotePeer: onion, event.ConnectionState: ConnectionStateName[CONNECTING], })) } func (e *engine) serverConnecting(onion string) { e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{ - event.GroupServer: string(onion), + event.GroupServer: onion, event.ConnectionState: ConnectionStateName[CONNECTING], })) } diff --git a/protocol/connections/token_manager.go b/protocol/connections/token_manager.go index 4e0ed7f..12ab0a1 100644 --- a/protocol/connections/token_manager.go +++ b/protocol/connections/token_manager.go @@ -4,18 +4,19 @@ import ( "encoding/json" "errors" "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" + "git.openprivacy.ca/openprivacy/log" "sync" ) // TokenManager maintains a list of tokens associated with a single TokenServer type TokenManager struct { lock sync.Mutex - tokens map[string]bool + tokens map[string]*privacypass.Token } func NewTokenManager() *TokenManager { tm := new(TokenManager) - tm.tokens = make(map[string]bool) + tm.tokens = make(map[string]*privacypass.Token) return tm } @@ -23,9 +24,10 @@ func NewTokenManager() *TokenManager { func (tm *TokenManager) NewTokens(tokens []*privacypass.Token) { tm.lock.Lock() defer tm.lock.Unlock() + log.Debugf("acquired %v new tokens", tokens) for _, token := range tokens { serialized, _ := json.Marshal(token) - tm.tokens[string(serialized)] = true + tm.tokens[string(serialized)] = token } } @@ -44,10 +46,8 @@ func (tm *TokenManager) FetchToken() (*privacypass.Token, int, error) { if len(tm.tokens) == 0 { return nil, 0, errors.New("no more tokens") } - for serializedToken := range tm.tokens { + for serializedToken, token := range tm.tokens { delete(tm.tokens, serializedToken) - token := new(privacypass.Token) - json.Unmarshal([]byte(serializedToken), token) return token, len(tm.tokens), nil } return nil, 0, errors.New("no more tokens") diff --git a/protocol/connections/tokenboardclientapp.go b/protocol/connections/tokenboardclientapp.go index 0b300ba..bf37cdf 100644 --- a/protocol/connections/tokenboardclientapp.go +++ b/protocol/connections/tokenboardclientapp.go @@ -66,17 +66,22 @@ func (ta *TokenBoardClient) NewInstance() tapir.Application { // Init initializes the cryptographic TokenBoardApp func (ta *TokenBoardClient) Init(connection tapir.Connection) { + // connection.Hostname is always valid because we are ALWAYS the initiating party + log.Debugf("connecting to server: %v", connection.Hostname()) ta.AuthApp.Init(connection) + log.Debugf("server protocol complete: %v", connection.Hostname()) if connection.HasCapability(applications.AuthCapability) { - ta.connection = connection - ta.tokenBoardHandler.ServerAuthedHandler(ta.connection.Hostname()) log.Debugf("Successfully Initialized Connection to %v", connection.Hostname()) + ta.connection = connection + ta.tokenBoardHandler.ServerAuthedHandler(connection.Hostname()) go ta.Listen() // Optimistically acquire many tokens for this server... go ta.PurchaseTokens() go ta.PurchaseTokens() ta.Replay() } else { + log.Debugf("Error Connecting to %v", connection.Hostname()) + ta.tokenBoardHandler.ServerClosedHandler(connection.Hostname()) connection.Close() } } @@ -117,10 +122,12 @@ func (ta *TokenBoardClient) Listen() { ta.postQueue = ta.postQueue[1:] ta.postLock.Unlock() if !message.PostResult.Success { + log.Debugf("post result message: %v", message.PostResult) // Retry using another token posted, _ := ta.Post(egm.Group, egm.Ciphertext, egm.Signature) // if posting failed... if !posted { + log.Errorf("error posting message") ta.tokenBoardHandler.PostingFailed(egm.Group, egm.Signature) } } From 4324ffae03cf741f5b80dc34803f6bffb44176fd Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 25 Oct 2022 14:05:08 -0700 Subject: [PATCH 3/5] safely close db when cps fails --- peer/cwtchprofilestorage.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/peer/cwtchprofilestorage.go b/peer/cwtchprofilestorage.go index 8769b42..e7cca3f 100644 --- a/peer/cwtchprofilestorage.go +++ b/peer/cwtchprofilestorage.go @@ -124,66 +124,77 @@ func NewCwtchProfileStorage(db *sql.DB, profileDirectory string) (*CwtchProfileS insertProfileKeyValueStmt, err := db.Prepare(insertProfileKeySQLStmt) if err != nil { + db.Close() log.Errorf("error preparing query: %v %v", insertProfileKeySQLStmt, err) return nil, err } selectProfileKeyStmt, err := db.Prepare(selectProfileKeySQLStmt) if err != nil { + db.Close() log.Errorf("error preparing query: %v %v", selectProfileKeySQLStmt, err) return nil, err } findProfileKeyStmt, err := db.Prepare(findProfileKeySQLStmt) if err != nil { + db.Close() log.Errorf("error preparing query: %v %v", findProfileKeySQLStmt, err) return nil, err } insertConversationStmt, err := db.Prepare(insertConversationSQLStmt) if err != nil { + db.Close() log.Errorf("error preparing query: %v %v", insertConversationSQLStmt, err) return nil, err } fetchAllConversationsStmt, err := db.Prepare(fetchAllConversationsSQLStmt) if err != nil { + db.Close() log.Errorf("error preparing query: %v %v", fetchAllConversationsSQLStmt, err) return nil, err } selectConversationStmt, err := db.Prepare(selectConversationSQLStmt) if err != nil { + db.Close() log.Errorf("error preparing query: %v %v", selectConversationSQLStmt, err) return nil, err } selectConversationByHandleStmt, err := db.Prepare(selectConversationByHandleSQLStmt) if err != nil { + db.Close() log.Errorf("error preparing query: %v %v", selectConversationByHandleSQLStmt, err) return nil, err } acceptConversationStmt, err := db.Prepare(acceptConversationSQLStmt) if err != nil { + db.Close() log.Errorf("error preparing query: %v %v", acceptConversationSQLStmt, err) return nil, err } deleteConversationStmt, err := db.Prepare(deleteConversationSQLStmt) if err != nil { + db.Close() log.Errorf("error preparing query: %v %v", deleteConversationSQLStmt, err) return nil, err } setConversationAttributesStmt, err := db.Prepare(setConversationAttributesSQLStmt) if err != nil { + db.Close() log.Errorf("error preparing query: %v %v", setConversationAttributesSQLStmt, err) return nil, err } setConversationACLStmt, err := db.Prepare(setConversationACLSQLStmt) if err != nil { + db.Close() log.Errorf("error preparing query: %v %v", setConversationACLSQLStmt, err) return nil, err } From 0ba45cd59a49e4876ac646d9ecbccb75e2fe96cc Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Wed, 23 Nov 2022 08:01:22 -0800 Subject: [PATCH 4/5] Move Cached Token Loading into Server Join (from SMTG) --- peer/cwtch_peer.go | 21 +++++++------- protocol/connections/engine.go | 29 ++++++++++---------- protocol/connections/engine_token_handler.go | 2 +- protocol/connections/token_manager.go | 4 +-- 4 files changed, 27 insertions(+), 29 deletions(-) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 8109bd4..e67d999 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -303,16 +303,9 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error) log.Debugf("sending message to group: %v", conversationInfo.ID) id, err := cp.storage.InsertMessage(conversationInfo.ID, 0, dm.Text, model.Attributes{constants.AttrAck: constants.False, "PreviousSignature": base64.StdEncoding.EncodeToString(dm.PreviousMessageSig), constants.AttrAuthor: dm.Onion, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, base64.StdEncoding.EncodeToString(sig), model.CalculateContentHash(dm.Onion, dm.Text)) if err == nil { - serverCI, err := cp.FetchConversationInfo(group.GroupServer) - if err == nil { - cachedTokensJson, hasCachedTokens := serverCI.GetAttribute(attr.LocalScope, attr.ServerZone, "tokens") - if hasCachedTokens { - log.Debugf("using cached tokens for %v", conversationInfo.Handle) - } - ev := event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.GroupID: conversationInfo.Handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig), event.CachedTokens: cachedTokensJson}) - cp.eventBus.Publish(ev) - return id, nil - } + ev := event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.GroupID: conversationInfo.Handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)}) + cp.eventBus.Publish(ev) + return id, nil } return -1, err } @@ -951,7 +944,13 @@ func (cp *cwtchPeer) JoinServer(onion string) error { if !exists { signature = base64.StdEncoding.EncodeToString([]byte{}) } - cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature})) + + cachedTokensJson, hasCachedTokens := ci.GetAttribute(attr.LocalScope, attr.ServerZone, "tokens") + if hasCachedTokens { + log.Debugf("using cached tokens for %v", ci.Handle) + } + + cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature, event.CachedTokens: cachedTokensJson})) return nil } return errors.New("no keys found for server connection") diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 744750d..57b6fc3 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -169,7 +169,16 @@ func (e *engine) eventHandler() { // will result in a full sync signature = []byte{} } - go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature) + // if we have been sent cached tokens, also deserialize them + cachedTokensJson := ev.Data[event.CachedTokens] + var cachedTokens []*privacypass.Token + if len(cachedTokensJson) != 0 { + json.Unmarshal([]byte(cachedTokensJson), &cachedTokens) + } + + // create a new token handler... + e.NewTokenHandler(ev.Data[event.ServerTokenOnion], cachedTokens) + go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature, cachedTokens) case event.MakeAntispamPayment: go e.makeAntispamPayment(ev.Data[event.GroupServer]) case event.LeaveServer: @@ -183,15 +192,8 @@ func (e *engine) eventHandler() { ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) - // if we have been sent cached tokens, also deserialize them - cachedTokensJson := ev.Data[event.CachedTokens] - var cachedTokens []*privacypass.Token - if len(cachedTokensJson) != 0 { - json.Unmarshal([]byte(cachedTokensJson), &cachedTokens) - } - // launch a goroutine to post to the server - go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature, 0, cachedTokens) + go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature, 0) case event.SendMessageToPeer: // TODO: remove this passthrough once the UI is integrated. context, ok := ev.Data[event.EventContext] @@ -388,7 +390,7 @@ func (e *engine) makeAntispamPayment(onion string) { // peerWithTokenServer is the entry point for cwtchPeer - server relationships // needs to be run in a goroutine as will block on Open. -func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) { +func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte, cachedTokens []*privacypass.Token) { e.ephemeralServicesLock.Lock() _, exists := e.ephemeralServices[onion] @@ -599,7 +601,7 @@ func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMes } // sendMessageToGroup attempts to sent the given message to the given group id. -func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte, attempts int, cachedTokens []*privacypass.Token) { +func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte, attempts int) { // sending to groups can fail for a few reasons (slow server, not enough tokens, etc.) // rather than trying to keep all that logic in method we simply back-off and try again // but if we fail more than 5 times then we report back to the client so they can investigate other options. @@ -624,9 +626,6 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si conn, err := ephemeralService.service.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability) if err == nil { tokenApp, ok := (conn.App()).(*TokenBoardClient) - - tokenApp.tokenBoardHandler.NewTokenHandler(tokenApp.tokenServiceOnion, cachedTokens) - if ok { if spent, numtokens := tokenApp.Post(groupID, ct, sig); !spent { // we failed to post, probably because we ran out of tokens... so make a payment @@ -635,7 +634,7 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si time.Sleep(time.Second * 5) // try again log.Debugf("sending message to group error attempt: %v", attempts) - e.sendMessageToGroup(groupID, server, ct, sig, attempts+1, []*privacypass.Token{}) + e.sendMessageToGroup(groupID, server, ct, sig, attempts+1) } else { if numtokens < 5 { go tokenApp.PurchaseTokens() diff --git a/protocol/connections/engine_token_handler.go b/protocol/connections/engine_token_handler.go index 6585029..ec02adc 100644 --- a/protocol/connections/engine_token_handler.go +++ b/protocol/connections/engine_token_handler.go @@ -39,7 +39,7 @@ func (e *engine) ServerClosedHandler(server string) { func (e *engine) NewTokenHandler(tokenService string, tokens []*privacypass.Token) { tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager()) tokenManager := tokenManagerPointer.(*TokenManager) - tokenManager.NewTokens(tokens) + tokenManager.StoreNewTokens(tokens) e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(tokenManager.NumTokens())})) } diff --git a/protocol/connections/token_manager.go b/protocol/connections/token_manager.go index 12ab0a1..bba540b 100644 --- a/protocol/connections/token_manager.go +++ b/protocol/connections/token_manager.go @@ -20,8 +20,8 @@ func NewTokenManager() *TokenManager { return tm } -// NewTokens adds tokens to the internal list managed by this TokenManager -func (tm *TokenManager) NewTokens(tokens []*privacypass.Token) { +// StoreNewTokens adds tokens to the internal list managed by this TokenManager +func (tm *TokenManager) StoreNewTokens(tokens []*privacypass.Token) { tm.lock.Lock() defer tm.lock.Unlock() log.Debugf("acquired %v new tokens", tokens) From e319976832794d16f1a79b41476c12b042adf358 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Wed, 30 Nov 2022 07:58:37 -0800 Subject: [PATCH 5/5] Load->StoreCachedTokens --- peer/cwtch_peer.go | 2 +- peer/profile_interface.go | 2 +- testing/cwtch_peer_server_integration_test.go | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index e67d999..2911630 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -76,7 +76,7 @@ type cwtchPeer struct { eventBus event.Manager } -func (cp *cwtchPeer) LoadCachedTokens(tokenServer string, tokens []*privacypass.Token) { +func (cp *cwtchPeer) StoreCachedTokens(tokenServer string, tokens []*privacypass.Token) { ci, err := cp.FetchConversationInfo(tokenServer) if ci != nil && err == nil { // Overwrite any existing tokens.. diff --git a/peer/profile_interface.go b/peer/profile_interface.go index 9e95a56..51d6c38 100644 --- a/peer/profile_interface.go +++ b/peer/profile_interface.go @@ -125,7 +125,7 @@ type CwtchPeer interface { // Server Token APIS // TODO move these to feature protected interfaces - LoadCachedTokens(tokenServer string, tokens []*privacypass.Token) + StoreCachedTokens(tokenServer string, tokens []*privacypass.Token) // Profile Management CheckPassword(password string) bool diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index ce554ca..e1f95ed 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -260,7 +260,7 @@ func TestCwtchPeerIntegration(t *testing.T) { log.Infoln("Alice joining server...") if serverOnion, err := alice.AddServer(string(serverKeyBundle)); err != nil { if len(cachedTokens) > len(aliceLines) { - alice.LoadCachedTokens(serverOnion, cachedTokens[0:len(aliceLines)]) + alice.StoreCachedTokens(serverOnion, cachedTokens[0:len(aliceLines)]) } t.Fatalf("Failed to Add Server Bundle %v", err) @@ -295,7 +295,7 @@ func TestCwtchPeerIntegration(t *testing.T) { err = bob.ImportBundle(overlayMessage.Data) log.Infof("Result of Bob Importing the Bundle from Alice: %v", err) if len(cachedTokens) > (usedTokens + len(bobLines)) { - bob.LoadCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(bobLines)]) + bob.StoreCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(bobLines)]) usedTokens += len(bobLines) } @@ -324,7 +324,7 @@ func TestCwtchPeerIntegration(t *testing.T) { log.Infof("Waiting for Carol to join connect to group server...") carolGroupConversationID := 3 if len(cachedTokens) > (usedTokens + len(carolLines)) { - carol.LoadCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(carolLines)]) + carol.StoreCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(carolLines)]) } waitForConnection(t, carol, ServerAddr, connections.SYNCED) numGoRoutinesPostCarolConnect := runtime.NumGoroutine() @@ -394,7 +394,7 @@ func TestCwtchPeerIntegration(t *testing.T) { pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) log.Infof("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+ - "numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v", + "numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v", numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect, numGoRoutinesPostAlice, numGoRoutinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown)