Allow using cached tokens for local integ testing

(also new TORCACHE env for integ testing to speed up bootstrapping locally)
This commit is contained in:
Sarah Jamie Lewis 2022-10-11 10:58:10 -07:00 committed by Gitea
parent 8a1f9376e2
commit c66561d84f
10 changed files with 156 additions and 23 deletions

View File

@ -219,6 +219,7 @@ const (
RemotePeer = Field("RemotePeer")
Ciphertext = Field("Ciphertext")
Signature = Field("Signature")
CachedTokens = Field("CachedTokens")
PreviousSignature = Field("PreviousSignature")
TimestampSent = Field("TimestampSent")
TimestampReceived = Field("TimestampReceived")

View File

@ -10,6 +10,7 @@ import (
"errors"
"fmt"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
@ -75,6 +76,17 @@ type cwtchPeer struct {
eventBus event.Manager
}
func (cp *cwtchPeer) LoadCachedTokens(tokenServer string, tokens []*privacypass.Token) {
ci, err := cp.FetchConversationInfo(tokenServer)
if ci != nil && err == nil {
// Overwrite any existing tokens..
tokenPath := attr.LocalScope.ConstructScopedZonedPath(attr.ServerZone.ConstructZonedPath("tokens"))
data, _ := json.Marshal(tokens)
log.Debugf("storing cached tokens for %v", tokenServer)
cp.SetConversationAttribute(ci.ID, tokenPath, string(data))
}
}
func (cp *cwtchPeer) Export(file string) error {
cp.mutex.Lock()
defer cp.mutex.Unlock()
@ -279,10 +291,17 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error)
log.Debugf("sending message to group: %v", conversationInfo.ID)
id, err := cp.storage.InsertMessage(conversationInfo.ID, 0, dm.Text, model.Attributes{constants.AttrAck: constants.False, "PreviousSignature": base64.StdEncoding.EncodeToString(dm.PreviousMessageSig), constants.AttrAuthor: dm.Onion, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, base64.StdEncoding.EncodeToString(sig), model.CalculateContentHash(dm.Onion, dm.Text))
if err == nil {
ev := event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.GroupID: conversationInfo.Handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)})
serverCI, err := cp.FetchConversationInfo(group.GroupServer)
if err == nil {
cachedTokensJson, hasCachedTokens := serverCI.GetAttribute(attr.LocalScope, attr.ServerZone, "tokens")
if hasCachedTokens {
log.Debugf("using cached tokens for %v", conversationInfo.Handle)
}
ev := event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.GroupID: conversationInfo.Handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig), event.CachedTokens: cachedTokensJson})
cp.eventBus.Publish(ev)
return id, nil
}
}
return -1, err
}
}
@ -1395,6 +1414,7 @@ func (cp *cwtchPeer) attemptAcknowledgeP2PConversation(handle string, signature
// by the given handle and attempts to mark the message as errored. returns error on failure
// to either find the contact or the associated message
func (cp *cwtchPeer) attemptErrorConversationMessage(handle string, signature string, error string) error {
ci, err := cp.FetchConversationInfo(handle)
// We should *never* received an error for a conversation that doesn't exist...
if ci != nil && err == nil {

View File

@ -5,6 +5,7 @@ import (
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
)
@ -116,9 +117,17 @@ type CwtchPeer interface {
GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error)
UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error
// File Sharing APIS
// TODO move these to feature protected interfaces
ShareFile(fileKey string, serializedManifest string)
StopFileShare(fileKey string)
StopAllFileShares()
// Server Token APIS
// TODO move these to feature protected interfaces
LoadCachedTokens(tokenServer string, tokens []*privacypass.Token)
// Profile Management
CheckPassword(password string) bool
ChangePassword(oldpassword string, newpassword string, newpasswordAgain string) error
Export(file string) error

View File

@ -4,6 +4,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"strconv"
"strings"
"sync"
@ -181,7 +182,16 @@ func (e *engine) eventHandler() {
case event.SendMessageToGroup:
ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature, 0)
// if we have been sent cached tokens, also deserialize them
cachedTokensJson := ev.Data[event.CachedTokens]
var cachedTokens []*privacypass.Token
if len(cachedTokensJson) != 0 {
json.Unmarshal([]byte(cachedTokensJson), &cachedTokens)
}
// launch a goroutine to post to the server
go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature, 0, cachedTokens)
case event.SendMessageToPeer:
// TODO: remove this passthrough once the UI is integrated.
context, ok := ev.Data[event.EventContext]
@ -366,7 +376,7 @@ func (e *engine) makeAntispamPayment(onion string) {
if err == nil {
tokenApp, ok := (conn.App()).(*TokenBoardClient)
if ok {
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, new(TokenManager))
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, NewTokenManager())
tokenManager := tokenManagerPointer.(*TokenManager)
log.Debugf("checking antispam tokens %v", tokenManager.NumTokens())
if tokenManager.NumTokens() < 5 {
@ -589,7 +599,7 @@ func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMes
}
// sendMessageToGroup attempts to sent the given message to the given group id.
func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte, attempts int) {
func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte, attempts int, cachedTokens []*privacypass.Token) {
// sending to groups can fail for a few reasons (slow server, not enough tokens, etc.)
// rather than trying to keep all that logic in method we simply back-off and try again
// but if we fail more than 5 times then we report back to the client so they can investigate other options.
@ -614,15 +624,18 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
conn, err := ephemeralService.service.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability)
if err == nil {
tokenApp, ok := (conn.App()).(*TokenBoardClient)
tokenApp.tokenBoardHandler.NewTokenHandler(tokenApp.tokenServiceOnion, cachedTokens)
if ok {
if spent, numtokens := tokenApp.Post(ct, sig); !spent {
if spent, numtokens := tokenApp.Post(groupID, ct, sig); !spent {
// we failed to post, probably because we ran out of tokens... so make a payment
go tokenApp.PurchaseTokens()
// backoff
time.Sleep(time.Second * 5)
// try again
log.Debugf("sending message to group error attempt: %v", attempts)
e.sendMessageToGroup(groupID, server, ct, sig, attempts+1)
e.sendMessageToGroup(groupID, server, ct, sig, attempts+1, []*privacypass.Token{})
} else {
if numtokens < 5 {
go tokenApp.PurchaseTokens()

View File

@ -3,6 +3,7 @@ package connections
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol/groups"
"encoding/base64"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"strconv"
)
@ -14,6 +15,11 @@ func (e *engine) GroupMessageHandler(server string, gm *groups.EncryptedGroupMes
e.receiveGroupMessage(server, gm)
}
// PostingFailed notifies a peer that a message failed to post
func (e *engine) PostingFailed(group string, sig []byte) {
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: group, event.Error: "failed to post message", event.Signature: base64.StdEncoding.EncodeToString(sig)}))
}
// ServerAuthedHandler is notified when a server has successfully authed
func (e *engine) ServerAuthedHandler(server string) {
e.serverAuthed(server)
@ -31,7 +37,7 @@ func (e *engine) ServerClosedHandler(server string) {
// NewTokenHandler is notified after a successful token acquisition
func (e *engine) NewTokenHandler(tokenService string, tokens []*privacypass.Token) {
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, new(TokenManager))
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager())
tokenManager := tokenManagerPointer.(*TokenManager)
tokenManager.NewTokens(tokens)
e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(tokenManager.NumTokens())}))
@ -39,7 +45,7 @@ func (e *engine) NewTokenHandler(tokenService string, tokens []*privacypass.Toke
// FetchToken is notified when a server requires a new token from the client
func (e *engine) FetchToken(tokenService string) (*privacypass.Token, int, error) {
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, new(TokenManager))
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager())
tokenManager := tokenManagerPointer.(*TokenManager)
token, numTokens, err := tokenManager.FetchToken()
e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(numTokens)}))

View File

@ -1,6 +1,7 @@
package connections
import (
"encoding/json"
"errors"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"sync"
@ -9,14 +10,23 @@ import (
// TokenManager maintains a list of tokens associated with a single TokenServer
type TokenManager struct {
lock sync.Mutex
tokens []*privacypass.Token
tokens map[string]bool
}
func NewTokenManager() *TokenManager {
tm := new(TokenManager)
tm.tokens = make(map[string]bool)
return tm
}
// NewTokens adds tokens to the internal list managed by this TokenManager
func (tm *TokenManager) NewTokens(tokens []*privacypass.Token) {
tm.lock.Lock()
defer tm.lock.Unlock()
tm.tokens = append(tm.tokens, tokens...)
for _, token := range tokens {
serialized, _ := json.Marshal(token)
tm.tokens[string(serialized)] = true
}
}
// NumTokens returns the current number of tokens
@ -34,7 +44,11 @@ func (tm *TokenManager) FetchToken() (*privacypass.Token, int, error) {
if len(tm.tokens) == 0 {
return nil, 0, errors.New("no more tokens")
}
token := tm.tokens[0]
tm.tokens = tm.tokens[1:]
for serializedToken := range tm.tokens {
delete(tm.tokens, serializedToken)
token := new(privacypass.Token)
json.Unmarshal([]byte(serializedToken), token)
return token, len(tm.tokens), nil
}
return nil, 0, errors.New("no more tokens")
}

View File

@ -9,6 +9,7 @@ import (
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/log"
"github.com/gtank/ristretto255"
"sync"
)
// TokenBoardHandler encapsulates all the various handlers a client needs to interact with a token board
@ -19,6 +20,7 @@ type TokenBoardHandler interface {
ServerSyncedHandler(server string)
ServerClosedHandler(server string)
NewTokenHandler(tokenService string, tokens []*privacypass.Token)
PostingFailed(server string, sig []byte)
FetchToken(tokenService string) (*privacypass.Token, int, error)
}
@ -46,6 +48,9 @@ type TokenBoardClient struct {
tokenService *privacypass.TokenServer
tokenServiceOnion string
lastKnownSignature []byte
postLock sync.Mutex
postQueue []groups.CachedEncryptedGroupMessage
}
// NewInstance Client a new TokenBoardApp
@ -107,7 +112,18 @@ func (ta *TokenBoardClient) Listen() {
return
}
case groups.PostResultMessage:
// TODO handle failure
ta.postLock.Lock()
egm := ta.postQueue[0]
ta.postQueue = ta.postQueue[1:]
ta.postLock.Unlock()
if !message.PostResult.Success {
// Retry using another token
posted, _ := ta.Post(egm.Group, egm.Ciphertext, egm.Signature)
// if posting failed...
if !posted {
ta.tokenBoardHandler.PostingFailed(egm.Group, egm.Signature)
}
}
case groups.ReplayResultMessage:
if message.ReplayResult != nil {
log.Debugf("Replaying %v Messages...", message.ReplayResult.NumMessages)
@ -151,13 +167,17 @@ func (ta *TokenBoardClient) PurchaseTokens() {
}
// Post sends a Post Request to the server
func (ta *TokenBoardClient) Post(ct []byte, sig []byte) (bool, int) {
func (ta *TokenBoardClient) Post(group string, ct []byte, sig []byte) (bool, int) {
egm := groups.EncryptedGroupMessage{Ciphertext: ct, Signature: sig}
token, numTokens, err := ta.NextToken(egm.ToBytes(), ta.connection.Hostname())
if err == nil {
data, _ := json.Marshal(groups.Message{MessageType: groups.PostRequestMessage, PostRequest: &groups.PostRequest{EGM: egm, Token: token}})
ta.postLock.Lock()
// ONLY put group in the EGM as a cache / for error reporting...
ta.postQueue = append(ta.postQueue, groups.CachedEncryptedGroupMessage{Group: group, EncryptedGroupMessage: egm})
log.Debugf("Message Length: %s %v", data, len(data))
err := ta.connection.Send(data)
ta.postLock.Unlock()
if err != nil {
return false, numTokens
}

View File

@ -39,6 +39,12 @@ type EncryptedGroupMessage struct {
Signature []byte
}
// CachedEncryptedGroupMessage provides an encapsulation of the encrypted group message for local caching / error reporting
type CachedEncryptedGroupMessage struct {
EncryptedGroupMessage
Group string
}
// ToBytes converts the encrypted group message to a set of bytes for serialization
func (egm EncryptedGroupMessage) ToBytes() []byte {
data, _ := json.Marshal(egm)

View File

@ -12,6 +12,7 @@ import (
"cwtch.im/cwtch/protocol/connections"
"encoding/base64"
"encoding/json"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
_ "github.com/mutecomm/go-sqlcipher/v4"
@ -19,6 +20,7 @@ import (
"os"
"os/user"
"path"
"path/filepath"
"runtime"
"runtime/pprof"
"testing"
@ -51,6 +53,18 @@ func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target co
}
}
func checkAndLoadTokens() []*privacypass.Token {
var tokens []*privacypass.Token
data, err := os.ReadFile("../tokens")
if err == nil {
err := json.Unmarshal(data, &tokens)
if err != nil {
log.Errorf("could not load tokens from file")
}
}
return tokens
}
func TestCwtchPeerIntegration(t *testing.T) {
// Goroutine Monitoring Start..
@ -62,6 +76,13 @@ func TestCwtchPeerIntegration(t *testing.T) {
log.ExcludeFromPattern("outbound/3dhauthchannel")
log.ExcludeFromPattern("event/eventmanager")
log.ExcludeFromPattern("tapir")
// checking if we should use the token cache
cachedTokens := checkAndLoadTokens()
if len(cachedTokens) > 7 {
log.Infof("using cached tokens")
}
os.Mkdir("tordir", 0700)
dataDir := path.Join("tordir", "tor")
os.MkdirAll(dataDir, 0700)
@ -78,10 +99,19 @@ func TestCwtchPeerIntegration(t *testing.T) {
panic(err)
}
useCache := os.Getenv("TORCACHE") == "true"
torDataDir := ""
if useCache {
log.Infof("using tor cache")
torDataDir = filepath.Join(dataDir, "data-dir-torcache")
os.MkdirAll(torDataDir, 0700)
} else {
log.Infof("using clean tor data dir")
if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil {
t.Fatalf("could not create data dir")
}
}
tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc")
acn, err := tor.NewTorACNWithAuth("./tordir", path.Join("..", "tor"), torDataDir, controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)})
@ -225,10 +255,14 @@ func TestCwtchPeerIntegration(t *testing.T) {
log.Infof("Alice has carol's name as '%v'\n", carolName)
// Group Testing
usedTokens := len(aliceLines)
// Simulate Alice Creating a Group
log.Infoln("Alice joining server...")
if _, err := alice.AddServer(string(serverKeyBundle)); err != nil {
if serverOnion, err := alice.AddServer(string(serverKeyBundle)); err != nil {
if len(cachedTokens) > len(aliceLines) {
alice.LoadCachedTokens(serverOnion, cachedTokens[0:len(aliceLines)])
}
t.Fatalf("Failed to Add Server Bundle %v", err)
}
@ -260,6 +294,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
log.Infof("Parsed Overlay Message: %v", overlayMessage)
err = bob.ImportBundle(overlayMessage.Data)
log.Infof("Result of Bob Importing the Bundle from Alice: %v", err)
if len(cachedTokens) > (usedTokens + len(bobLines)) {
bob.LoadCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(bobLines)])
usedTokens += len(bobLines)
}
log.Infof("Waiting for Bob to join connect to group server...")
waitForConnection(t, bob, ServerAddr, connections.SYNCED)
@ -278,15 +316,17 @@ func TestCwtchPeerIntegration(t *testing.T) {
checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[1])
checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[1])
// Pretend that Carol Aquires the Overlay Message through some other means...
// Pretend that Carol Acquires the Overlay Message through some other means...
json.Unmarshal([]byte(message), &overlayMessage)
log.Infof("Parsed Overlay Message: %v", overlayMessage)
err = carol.ImportBundle(overlayMessage.Data)
log.Infof("Result of Carol Importing the Bundle from Alice: %v", err)
log.Infof("Waiting for Carol to join connect to group server...")
carolGroupConversationID := 3
if len(cachedTokens) > (usedTokens + len(carolLines)) {
carol.LoadCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(carolLines)])
}
waitForConnection(t, carol, ServerAddr, connections.SYNCED)
numGoRoutinesPostCarolConnect := runtime.NumGoroutine()
// Check Alice Timeline

View File

@ -128,6 +128,10 @@ func getTokens(bundle string) {
type Handler struct {
}
func (h Handler) PostingFailed(server string, sig []byte) {
}
func (h Handler) GroupMessageHandler(server string, gm *groups.EncryptedGroupMessage) {
}