Allow using cached tokens for local integ testing #470

Merged
sarah merged 5 commits from cached_tokens into master 2022-11-30 16:51:55 +00:00
14 changed files with 224 additions and 59 deletions

View File

@ -145,11 +145,14 @@ func (app *application) DeletePeer(onion string, password string) {
defer app.appmutex.Unlock()
if app.peers[onion].CheckPassword(password) {
app.shutdownPeer(onion)
// soft-shutdown
app.peers[onion].Shutdown()
// delete the underlying storage
app.peers[onion].Delete()
// hard shutdown / remove from app
app.shutdownPeer(onion)
// Shutdown and Remove the Engine
log.Debugf("Delete peer for %v Done\n", onion)
app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion))
return

View File

@ -219,6 +219,7 @@ const (
RemotePeer = Field("RemotePeer")
Ciphertext = Field("Ciphertext")
Signature = Field("Signature")
CachedTokens = Field("CachedTokens")
PreviousSignature = Field("PreviousSignature")
TimestampSent = Field("TimestampSent")
TimestampReceived = Field("TimestampReceived")

2
go.mod
View File

@ -3,7 +3,7 @@ module cwtch.im/cwtch
go 1.17
require (
git.openprivacy.ca/cwtch.im/tapir v0.5.5
git.openprivacy.ca/cwtch.im/tapir v0.6.0
git.openprivacy.ca/openprivacy/connectivity v1.8.6
git.openprivacy.ca/openprivacy/log v1.0.3
github.com/gtank/ristretto255 v0.1.3-0.20210930101514-6bb39798585c

4
go.sum
View File

@ -1,8 +1,8 @@
filippo.io/edwards25519 v1.0.0-rc.1/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns=
filippo.io/edwards25519 v1.0.0 h1:0wAIcmJUqRdI8IJ/3eGi5/HwXZWPujYXXlkrQogz0Ek=
filippo.io/edwards25519 v1.0.0/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns=
git.openprivacy.ca/cwtch.im/tapir v0.5.5 h1:km6UDrLYH/GCEn2s+S299/TiRHhxKCIAipYr9GbG3Hk=
git.openprivacy.ca/cwtch.im/tapir v0.5.5/go.mod h1:bWWHrDYBtHvxMri59RwIB/w7Eg1aC0BrQ/ycKlnbB5k=
git.openprivacy.ca/cwtch.im/tapir v0.6.0 h1:TtnKjxitkIDMM7Qn0n/u+mOHRLJzuQUYjYRu5n0/QFY=
git.openprivacy.ca/cwtch.im/tapir v0.6.0/go.mod h1:iQIq4y7N+DuP3CxyG66WNEC/d6vzh+wXvvOmelB+KoY=
git.openprivacy.ca/openprivacy/bine v0.0.4 h1:CO7EkGyz+jegZ4ap8g5NWRuDHA/56KKvGySR6OBPW+c=
git.openprivacy.ca/openprivacy/bine v0.0.4/go.mod h1:13ZqhKyqakDsN/ZkQkIGNULsmLyqtXc46XBcnuXm/mU=
git.openprivacy.ca/openprivacy/connectivity v1.8.6 h1:g74PyDGvpMZ3+K0dXy3mlTJh+e0rcwNk0XF8owzkmOA=

View File

@ -10,6 +10,7 @@ import (
"errors"
"fmt"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
@ -75,6 +76,17 @@ type cwtchPeer struct {
eventBus event.Manager
}
func (cp *cwtchPeer) StoreCachedTokens(tokenServer string, tokens []*privacypass.Token) {
ci, err := cp.FetchConversationInfo(tokenServer)
if ci != nil && err == nil {
// Overwrite any existing tokens..
tokenPath := attr.LocalScope.ConstructScopedZonedPath(attr.ServerZone.ConstructZonedPath("tokens"))
data, _ := json.Marshal(tokens)
log.Debugf("storing cached tokens for %v", tokenServer)
cp.SetConversationAttribute(ci.ID, tokenPath, string(data))
}
}
func (cp *cwtchPeer) Export(file string) error {
cp.mutex.Lock()
defer cp.mutex.Unlock()
@ -87,42 +99,54 @@ func (cp *cwtchPeer) Delete() {
cp.storage.Delete()
}
// CheckPassword returns true if the given password can be used to derive the key that encrypts the underlying
// cwtch storage database. Returns false otherwise.
func (cp *cwtchPeer) CheckPassword(password string) bool {
// this lock is not really needed, but because we directly access cp.storage.ProfileDirectory
// we keep it here.
cp.mutex.Lock()
defer cp.mutex.Unlock()
// open *our* database with the given password (set createIfNotExists to false)
db, err := openEncryptedDatabase(cp.storage.ProfileDirectory, password, false)
if db == nil || err != nil {
// this will only fail in the rare cases that ProfileDirectory has been moved or deleted
// it is actually a critical error, but far beyond the scope of Cwtch to deal with.
return false
}
db.Close()
// check that the storage object is valid (this will fail if the DB key is incorrect)
cps, err := NewCwtchProfileStorage(db, cp.storage.ProfileDirectory)
if err != nil {
// this will error if any SQL queries fail, which will be the case if the profile is invalid.
return false
}
// we have a valid database, close the storage (but don't purge as we may be using those conversations...)
cps.Close(false)
// success!
return true
}
func (cp *cwtchPeer) ChangePassword(password string, newpassword string, newpasswordAgain string) error {
cp.mutex.Lock()
defer cp.mutex.Unlock()
db, err := openEncryptedDatabase(cp.storage.ProfileDirectory, password, false)
if db == nil || err != nil {
return errors.New(constants.InvalidPasswordError)
}
cps, err := NewCwtchProfileStorage(db, cp.storage.ProfileDirectory)
if err != nil {
return errors.New(constants.InvalidPasswordError)
}
cps.Close()
if cp.CheckPassword(password) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
salt, err := os.ReadFile(path.Join(cp.storage.ProfileDirectory, saltFile))
if err != nil {
return err
}
salt, err := os.ReadFile(path.Join(cp.storage.ProfileDirectory, saltFile))
if err != nil {
return err
}
// probably redundant but we like api safety
if newpassword == newpasswordAgain {
rekey := createKey(newpassword, salt)
log.Infof("rekeying database...")
return cp.storage.Rekey(rekey)
// probably redundant but we like api safety
if newpassword == newpasswordAgain {
rekey := createKey(newpassword, salt)
log.Infof("rekeying database...")
return cp.storage.Rekey(rekey)
}
return errors.New(constants.PasswordsDoNotMatchError)
}
return errors.New(constants.PasswordsDoNotMatchError)
return errors.New(constants.InvalidPasswordError)
}
// GenerateProtocolEngine
@ -920,7 +944,13 @@ func (cp *cwtchPeer) JoinServer(onion string) error {
if !exists {
signature = base64.StdEncoding.EncodeToString([]byte{})
}
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature}))
cachedTokensJson, hasCachedTokens := ci.GetAttribute(attr.LocalScope, attr.ServerZone, "tokens")
if hasCachedTokens {
log.Debugf("using cached tokens for %v", ci.Handle)
}
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature, event.CachedTokens: cachedTokensJson}))
return nil
}
return errors.New("no keys found for server connection")
@ -1008,7 +1038,7 @@ func (cp *cwtchPeer) Shutdown() {
cp.shutdown = true
cp.queue.Shutdown()
if cp.storage != nil {
cp.storage.Close()
cp.storage.Close(true)
}
}
@ -1395,6 +1425,7 @@ func (cp *cwtchPeer) attemptAcknowledgeP2PConversation(handle string, signature
// by the given handle and attempts to mark the message as errored. returns error on failure
// to either find the contact or the associated message
func (cp *cwtchPeer) attemptErrorConversationMessage(handle string, signature string, error string) error {
ci, err := cp.FetchConversationInfo(handle)
// We should *never* received an error for a conversation that doesn't exist...
if ci != nil && err == nil {

View File

@ -124,66 +124,77 @@ func NewCwtchProfileStorage(db *sql.DB, profileDirectory string) (*CwtchProfileS
insertProfileKeyValueStmt, err := db.Prepare(insertProfileKeySQLStmt)
if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", insertProfileKeySQLStmt, err)
return nil, err
}
selectProfileKeyStmt, err := db.Prepare(selectProfileKeySQLStmt)
if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", selectProfileKeySQLStmt, err)
return nil, err
}
findProfileKeyStmt, err := db.Prepare(findProfileKeySQLStmt)
if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", findProfileKeySQLStmt, err)
return nil, err
}
insertConversationStmt, err := db.Prepare(insertConversationSQLStmt)
if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", insertConversationSQLStmt, err)
return nil, err
}
fetchAllConversationsStmt, err := db.Prepare(fetchAllConversationsSQLStmt)
if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", fetchAllConversationsSQLStmt, err)
return nil, err
}
selectConversationStmt, err := db.Prepare(selectConversationSQLStmt)
if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", selectConversationSQLStmt, err)
return nil, err
}
selectConversationByHandleStmt, err := db.Prepare(selectConversationByHandleSQLStmt)
if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", selectConversationByHandleSQLStmt, err)
return nil, err
}
acceptConversationStmt, err := db.Prepare(acceptConversationSQLStmt)
if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", acceptConversationSQLStmt, err)
return nil, err
}
deleteConversationStmt, err := db.Prepare(deleteConversationSQLStmt)
if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", deleteConversationSQLStmt, err)
return nil, err
}
setConversationAttributesStmt, err := db.Prepare(setConversationAttributesSQLStmt)
if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", setConversationAttributesSQLStmt, err)
return nil, err
}
setConversationACLStmt, err := db.Prepare(setConversationACLSQLStmt)
if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", setConversationACLSQLStmt, err)
return nil, err
}
@ -771,12 +782,13 @@ func (cps *CwtchProfileStorage) PurgeNonSavedMessages() {
}
// Close closes the underlying database and prepared statements
func (cps *CwtchProfileStorage) Close() {
func (cps *CwtchProfileStorage) Close(purgeAllNonSavedMessages bool) {
dan marked this conversation as resolved
Review

just asking, whats the future use of this planned to be?

just asking, whats the future use of this planned to be?
Review

it's used now to differentiate between closing a concurrent database (after opening it to check the password - where we don't want to purge all temp messages) v.s. closing it to shutdown cwtch (where we do)

it's used now to differentiate between closing a concurrent database (after opening it to check the password - where we don't want to purge all temp messages) v.s. closing it to shutdown cwtch (where we do)
cps.mutex.Lock()
defer cps.mutex.Unlock()
if cps.db != nil {
cps.PurgeNonSavedMessages()
if purgeAllNonSavedMessages {
cps.PurgeNonSavedMessages()
}
cps.insertProfileKeyValueStmt.Close()
cps.selectProfileKeyValueStmt.Close()

View File

@ -5,6 +5,7 @@ import (
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
)
@ -116,9 +117,17 @@ type CwtchPeer interface {
GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error)
UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error
// File Sharing APIS
// TODO move these to feature protected interfaces
ShareFile(fileKey string, serializedManifest string)
StopFileShare(fileKey string)
StopAllFileShares()
// Server Token APIS
// TODO move these to feature protected interfaces
StoreCachedTokens(tokenServer string, tokens []*privacypass.Token)
// Profile Management
CheckPassword(password string) bool
ChangePassword(oldpassword string, newpassword string, newpasswordAgain string) error
Export(file string) error

View File

@ -4,6 +4,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"strconv"
"strings"
"sync"
@ -168,7 +169,16 @@ func (e *engine) eventHandler() {
// will result in a full sync
signature = []byte{}
}
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
// if we have been sent cached tokens, also deserialize them
cachedTokensJson := ev.Data[event.CachedTokens]
var cachedTokens []*privacypass.Token
if len(cachedTokensJson) != 0 {
json.Unmarshal([]byte(cachedTokensJson), &cachedTokens)
}
// create a new token handler...
e.NewTokenHandler(ev.Data[event.ServerTokenOnion], cachedTokens)
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature, cachedTokens)
case event.MakeAntispamPayment:
go e.makeAntispamPayment(ev.Data[event.GroupServer])
case event.LeaveServer:
@ -181,6 +191,8 @@ func (e *engine) eventHandler() {
case event.SendMessageToGroup:
ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
// launch a goroutine to post to the server
go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature, 0)
case event.SendMessageToPeer:
// TODO: remove this passthrough once the UI is integrated.
@ -366,7 +378,7 @@ func (e *engine) makeAntispamPayment(onion string) {
if err == nil {
tokenApp, ok := (conn.App()).(*TokenBoardClient)
if ok {
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, new(TokenManager))
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, NewTokenManager())
tokenManager := tokenManagerPointer.(*TokenManager)
log.Debugf("checking antispam tokens %v", tokenManager.NumTokens())
if tokenManager.NumTokens() < 5 {
@ -378,7 +390,7 @@ func (e *engine) makeAntispamPayment(onion string) {
// peerWithTokenServer is the entry point for cwtchPeer - server relationships
// needs to be run in a goroutine as will block on Open.
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) {
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte, cachedTokens []*privacypass.Token) {
e.ephemeralServicesLock.Lock()
_, exists := e.ephemeralServices[onion]
@ -480,14 +492,14 @@ func (e *engine) peerAuthed(onion string) {
func (e *engine) peerConnecting(onion string) {
e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
event.RemotePeer: string(onion),
event.RemotePeer: onion,
event.ConnectionState: ConnectionStateName[CONNECTING],
}))
}
func (e *engine) serverConnecting(onion string) {
e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{
event.GroupServer: string(onion),
event.GroupServer: onion,
event.ConnectionState: ConnectionStateName[CONNECTING],
}))
}
@ -615,7 +627,7 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
if err == nil {
tokenApp, ok := (conn.App()).(*TokenBoardClient)
if ok {
if spent, numtokens := tokenApp.Post(ct, sig); !spent {
if spent, numtokens := tokenApp.Post(groupID, ct, sig); !spent {
// we failed to post, probably because we ran out of tokens... so make a payment
go tokenApp.PurchaseTokens()
// backoff

View File

@ -3,6 +3,7 @@ package connections
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol/groups"
"encoding/base64"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"strconv"
)
@ -14,6 +15,11 @@ func (e *engine) GroupMessageHandler(server string, gm *groups.EncryptedGroupMes
e.receiveGroupMessage(server, gm)
}
// PostingFailed notifies a peer that a message failed to post
func (e *engine) PostingFailed(group string, sig []byte) {
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: group, event.Error: "failed to post message", event.Signature: base64.StdEncoding.EncodeToString(sig)}))
}
// ServerAuthedHandler is notified when a server has successfully authed
func (e *engine) ServerAuthedHandler(server string) {
e.serverAuthed(server)
@ -31,15 +37,15 @@ func (e *engine) ServerClosedHandler(server string) {
// NewTokenHandler is notified after a successful token acquisition
func (e *engine) NewTokenHandler(tokenService string, tokens []*privacypass.Token) {
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, new(TokenManager))
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager())
tokenManager := tokenManagerPointer.(*TokenManager)
tokenManager.NewTokens(tokens)
tokenManager.StoreNewTokens(tokens)
e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(tokenManager.NumTokens())}))
}
// FetchToken is notified when a server requires a new token from the client
func (e *engine) FetchToken(tokenService string) (*privacypass.Token, int, error) {
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, new(TokenManager))
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager())
tokenManager := tokenManagerPointer.(*TokenManager)
token, numTokens, err := tokenManager.FetchToken()
e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(numTokens)}))

View File

@ -1,22 +1,34 @@
package connections
import (
"encoding/json"
"errors"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/log"
"sync"
)
// TokenManager maintains a list of tokens associated with a single TokenServer
type TokenManager struct {
lock sync.Mutex
tokens []*privacypass.Token
tokens map[string]*privacypass.Token
}
// NewTokens adds tokens to the internal list managed by this TokenManager
func (tm *TokenManager) NewTokens(tokens []*privacypass.Token) {
func NewTokenManager() *TokenManager {
tm := new(TokenManager)
tm.tokens = make(map[string]*privacypass.Token)
return tm
}
// StoreNewTokens adds tokens to the internal list managed by this TokenManager
func (tm *TokenManager) StoreNewTokens(tokens []*privacypass.Token) {
Review

it's called 'store' here

it's called 'store' here
tm.lock.Lock()
defer tm.lock.Unlock()
tm.tokens = append(tm.tokens, tokens...)
log.Debugf("acquired %v new tokens", tokens)
for _, token := range tokens {
serialized, _ := json.Marshal(token)
tm.tokens[string(serialized)] = token
}
}
// NumTokens returns the current number of tokens
@ -34,7 +46,9 @@ func (tm *TokenManager) FetchToken() (*privacypass.Token, int, error) {
if len(tm.tokens) == 0 {
return nil, 0, errors.New("no more tokens")
}
token := tm.tokens[0]
tm.tokens = tm.tokens[1:]
return token, len(tm.tokens), nil
for serializedToken, token := range tm.tokens {
dan marked this conversation as resolved
Review

wnh the change to this for range loop? the above code had a len == 0 check, curious what was tripping up here

wnh the change to this for range loop? the above code had a len == 0 check, curious what was tripping up here
Review

changed from a list to a map so we can preserve keys (and delete by key), can't access a map by an index (since all elements have a random index in go by convention)

changed from a list to a map so we can preserve keys (and delete by key), can't access a map by an index (since all elements have a random index in go by convention)
delete(tm.tokens, serializedToken)
return token, len(tm.tokens), nil
}
return nil, 0, errors.New("no more tokens")
}

View File

@ -9,6 +9,7 @@ import (
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/log"
"github.com/gtank/ristretto255"
"sync"
)
// TokenBoardHandler encapsulates all the various handlers a client needs to interact with a token board
@ -19,6 +20,7 @@ type TokenBoardHandler interface {
ServerSyncedHandler(server string)
ServerClosedHandler(server string)
NewTokenHandler(tokenService string, tokens []*privacypass.Token)
PostingFailed(server string, sig []byte)
FetchToken(tokenService string) (*privacypass.Token, int, error)
}
@ -46,6 +48,9 @@ type TokenBoardClient struct {
tokenService *privacypass.TokenServer
tokenServiceOnion string
lastKnownSignature []byte
postLock sync.Mutex
postQueue []groups.CachedEncryptedGroupMessage
}
// NewInstance Client a new TokenBoardApp
@ -61,17 +66,22 @@ func (ta *TokenBoardClient) NewInstance() tapir.Application {
// Init initializes the cryptographic TokenBoardApp
func (ta *TokenBoardClient) Init(connection tapir.Connection) {
// connection.Hostname is always valid because we are ALWAYS the initiating party
log.Debugf("connecting to server: %v", connection.Hostname())
ta.AuthApp.Init(connection)
log.Debugf("server protocol complete: %v", connection.Hostname())
if connection.HasCapability(applications.AuthCapability) {
ta.connection = connection
ta.tokenBoardHandler.ServerAuthedHandler(ta.connection.Hostname())
log.Debugf("Successfully Initialized Connection to %v", connection.Hostname())
ta.connection = connection
ta.tokenBoardHandler.ServerAuthedHandler(connection.Hostname())
go ta.Listen()
// Optimistically acquire many tokens for this server...
go ta.PurchaseTokens()
go ta.PurchaseTokens()
ta.Replay()
} else {
log.Debugf("Error Connecting to %v", connection.Hostname())
ta.tokenBoardHandler.ServerClosedHandler(connection.Hostname())
connection.Close()
}
}
@ -107,7 +117,20 @@ func (ta *TokenBoardClient) Listen() {
return
}
case groups.PostResultMessage:
// TODO handle failure
ta.postLock.Lock()
egm := ta.postQueue[0]
ta.postQueue = ta.postQueue[1:]
ta.postLock.Unlock()
if !message.PostResult.Success {
log.Debugf("post result message: %v", message.PostResult)
// Retry using another token
posted, _ := ta.Post(egm.Group, egm.Ciphertext, egm.Signature)
// if posting failed...
if !posted {
log.Errorf("error posting message")
ta.tokenBoardHandler.PostingFailed(egm.Group, egm.Signature)
}
}
case groups.ReplayResultMessage:
if message.ReplayResult != nil {
log.Debugf("Replaying %v Messages...", message.ReplayResult.NumMessages)
@ -151,13 +174,17 @@ func (ta *TokenBoardClient) PurchaseTokens() {
}
// Post sends a Post Request to the server
func (ta *TokenBoardClient) Post(ct []byte, sig []byte) (bool, int) {
func (ta *TokenBoardClient) Post(group string, ct []byte, sig []byte) (bool, int) {
egm := groups.EncryptedGroupMessage{Ciphertext: ct, Signature: sig}
token, numTokens, err := ta.NextToken(egm.ToBytes(), ta.connection.Hostname())
if err == nil {
data, _ := json.Marshal(groups.Message{MessageType: groups.PostRequestMessage, PostRequest: &groups.PostRequest{EGM: egm, Token: token}})
ta.postLock.Lock()
// ONLY put group in the EGM as a cache / for error reporting...
ta.postQueue = append(ta.postQueue, groups.CachedEncryptedGroupMessage{Group: group, EncryptedGroupMessage: egm})
log.Debugf("Message Length: %s %v", data, len(data))
err := ta.connection.Send(data)
ta.postLock.Unlock()
if err != nil {
return false, numTokens
}

View File

@ -39,6 +39,12 @@ type EncryptedGroupMessage struct {
Signature []byte
}
// CachedEncryptedGroupMessage provides an encapsulation of the encrypted group message for local caching / error reporting
type CachedEncryptedGroupMessage struct {
EncryptedGroupMessage
Group string
}
// ToBytes converts the encrypted group message to a set of bytes for serialization
func (egm EncryptedGroupMessage) ToBytes() []byte {
data, _ := json.Marshal(egm)

View File

@ -12,6 +12,7 @@ import (
"cwtch.im/cwtch/protocol/connections"
"encoding/base64"
"encoding/json"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
_ "github.com/mutecomm/go-sqlcipher/v4"
@ -19,6 +20,7 @@ import (
"os"
"os/user"
"path"
"path/filepath"
"runtime"
"runtime/pprof"
"testing"
@ -51,6 +53,18 @@ func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target co
}
}
func checkAndLoadTokens() []*privacypass.Token {
var tokens []*privacypass.Token
data, err := os.ReadFile("../tokens")
if err == nil {
err := json.Unmarshal(data, &tokens)
if err != nil {
log.Errorf("could not load tokens from file")
}
}
return tokens
}
func TestCwtchPeerIntegration(t *testing.T) {
// Goroutine Monitoring Start..
@ -62,6 +76,13 @@ func TestCwtchPeerIntegration(t *testing.T) {
log.ExcludeFromPattern("outbound/3dhauthchannel")
log.ExcludeFromPattern("event/eventmanager")
log.ExcludeFromPattern("tapir")
// checking if we should use the token cache
cachedTokens := checkAndLoadTokens()
if len(cachedTokens) > 7 {
log.Infof("using cached tokens")
}
os.Mkdir("tordir", 0700)
dataDir := path.Join("tordir", "tor")
os.MkdirAll(dataDir, 0700)
@ -78,9 +99,18 @@ func TestCwtchPeerIntegration(t *testing.T) {
panic(err)
}
useCache := os.Getenv("TORCACHE") == "true"
torDataDir := ""
if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil {
t.Fatalf("could not create data dir")
if useCache {
log.Infof("using tor cache")
torDataDir = filepath.Join(dataDir, "data-dir-torcache")
os.MkdirAll(torDataDir, 0700)
} else {
log.Infof("using clean tor data dir")
if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil {
t.Fatalf("could not create data dir")
}
}
tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc")
@ -225,10 +255,14 @@ func TestCwtchPeerIntegration(t *testing.T) {
log.Infof("Alice has carol's name as '%v'\n", carolName)
// Group Testing
usedTokens := len(aliceLines)
// Simulate Alice Creating a Group
log.Infoln("Alice joining server...")
if _, err := alice.AddServer(string(serverKeyBundle)); err != nil {
if serverOnion, err := alice.AddServer(string(serverKeyBundle)); err != nil {
if len(cachedTokens) > len(aliceLines) {
alice.StoreCachedTokens(serverOnion, cachedTokens[0:len(aliceLines)])
}
t.Fatalf("Failed to Add Server Bundle %v", err)
}
@ -260,6 +294,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
log.Infof("Parsed Overlay Message: %v", overlayMessage)
err = bob.ImportBundle(overlayMessage.Data)
log.Infof("Result of Bob Importing the Bundle from Alice: %v", err)
if len(cachedTokens) > (usedTokens + len(bobLines)) {
bob.StoreCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(bobLines)])
usedTokens += len(bobLines)
}
log.Infof("Waiting for Bob to join connect to group server...")
waitForConnection(t, bob, ServerAddr, connections.SYNCED)
@ -278,15 +316,17 @@ func TestCwtchPeerIntegration(t *testing.T) {
checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[1])
checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[1])
// Pretend that Carol Aquires the Overlay Message through some other means...
// Pretend that Carol Acquires the Overlay Message through some other means...
json.Unmarshal([]byte(message), &overlayMessage)
log.Infof("Parsed Overlay Message: %v", overlayMessage)
err = carol.ImportBundle(overlayMessage.Data)
log.Infof("Result of Carol Importing the Bundle from Alice: %v", err)
log.Infof("Waiting for Carol to join connect to group server...")
carolGroupConversationID := 3
if len(cachedTokens) > (usedTokens + len(carolLines)) {
carol.StoreCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(carolLines)])
}
waitForConnection(t, carol, ServerAddr, connections.SYNCED)
numGoRoutinesPostCarolConnect := runtime.NumGoroutine()
// Check Alice Timeline
@ -354,7 +394,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
log.Infof("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+
"numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v",
"numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v",
numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect,
numGoRoutinesPostAlice, numGoRoutinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown)

View File

@ -128,6 +128,10 @@ func getTokens(bundle string) {
type Handler struct {
}
func (h Handler) PostingFailed(server string, sig []byte) {
}
func (h Handler) GroupMessageHandler(server string, gm *groups.EncryptedGroupMessage) {
}