Allow using cached tokens for local integ testing #470

Merged
sarah merged 5 commits from cached_tokens into master 2022-11-30 16:51:55 +00:00
14 changed files with 224 additions and 59 deletions

View File

@ -145,11 +145,14 @@ func (app *application) DeletePeer(onion string, password string) {
defer app.appmutex.Unlock() defer app.appmutex.Unlock()
if app.peers[onion].CheckPassword(password) { if app.peers[onion].CheckPassword(password) {
app.shutdownPeer(onion) // soft-shutdown
app.peers[onion].Shutdown()
// delete the underlying storage
app.peers[onion].Delete() app.peers[onion].Delete()
// hard shutdown / remove from app
app.shutdownPeer(onion)
// Shutdown and Remove the Engine // Shutdown and Remove the Engine
log.Debugf("Delete peer for %v Done\n", onion) log.Debugf("Delete peer for %v Done\n", onion)
app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion)) app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion))
return return

View File

@ -219,6 +219,7 @@ const (
RemotePeer = Field("RemotePeer") RemotePeer = Field("RemotePeer")
Ciphertext = Field("Ciphertext") Ciphertext = Field("Ciphertext")
Signature = Field("Signature") Signature = Field("Signature")
CachedTokens = Field("CachedTokens")
PreviousSignature = Field("PreviousSignature") PreviousSignature = Field("PreviousSignature")
TimestampSent = Field("TimestampSent") TimestampSent = Field("TimestampSent")
TimestampReceived = Field("TimestampReceived") TimestampReceived = Field("TimestampReceived")

2
go.mod
View File

@ -3,7 +3,7 @@ module cwtch.im/cwtch
go 1.17 go 1.17
require ( require (
git.openprivacy.ca/cwtch.im/tapir v0.5.5 git.openprivacy.ca/cwtch.im/tapir v0.6.0
git.openprivacy.ca/openprivacy/connectivity v1.8.6 git.openprivacy.ca/openprivacy/connectivity v1.8.6
git.openprivacy.ca/openprivacy/log v1.0.3 git.openprivacy.ca/openprivacy/log v1.0.3
github.com/gtank/ristretto255 v0.1.3-0.20210930101514-6bb39798585c github.com/gtank/ristretto255 v0.1.3-0.20210930101514-6bb39798585c

4
go.sum
View File

@ -1,8 +1,8 @@
filippo.io/edwards25519 v1.0.0-rc.1/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns= filippo.io/edwards25519 v1.0.0-rc.1/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns=
filippo.io/edwards25519 v1.0.0 h1:0wAIcmJUqRdI8IJ/3eGi5/HwXZWPujYXXlkrQogz0Ek= filippo.io/edwards25519 v1.0.0 h1:0wAIcmJUqRdI8IJ/3eGi5/HwXZWPujYXXlkrQogz0Ek=
filippo.io/edwards25519 v1.0.0/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns= filippo.io/edwards25519 v1.0.0/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns=
git.openprivacy.ca/cwtch.im/tapir v0.5.5 h1:km6UDrLYH/GCEn2s+S299/TiRHhxKCIAipYr9GbG3Hk= git.openprivacy.ca/cwtch.im/tapir v0.6.0 h1:TtnKjxitkIDMM7Qn0n/u+mOHRLJzuQUYjYRu5n0/QFY=
git.openprivacy.ca/cwtch.im/tapir v0.5.5/go.mod h1:bWWHrDYBtHvxMri59RwIB/w7Eg1aC0BrQ/ycKlnbB5k= git.openprivacy.ca/cwtch.im/tapir v0.6.0/go.mod h1:iQIq4y7N+DuP3CxyG66WNEC/d6vzh+wXvvOmelB+KoY=
git.openprivacy.ca/openprivacy/bine v0.0.4 h1:CO7EkGyz+jegZ4ap8g5NWRuDHA/56KKvGySR6OBPW+c= git.openprivacy.ca/openprivacy/bine v0.0.4 h1:CO7EkGyz+jegZ4ap8g5NWRuDHA/56KKvGySR6OBPW+c=
git.openprivacy.ca/openprivacy/bine v0.0.4/go.mod h1:13ZqhKyqakDsN/ZkQkIGNULsmLyqtXc46XBcnuXm/mU= git.openprivacy.ca/openprivacy/bine v0.0.4/go.mod h1:13ZqhKyqakDsN/ZkQkIGNULsmLyqtXc46XBcnuXm/mU=
git.openprivacy.ca/openprivacy/connectivity v1.8.6 h1:g74PyDGvpMZ3+K0dXy3mlTJh+e0rcwNk0XF8owzkmOA= git.openprivacy.ca/openprivacy/connectivity v1.8.6 h1:g74PyDGvpMZ3+K0dXy3mlTJh+e0rcwNk0XF8owzkmOA=

View File

@ -10,6 +10,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"git.openprivacy.ca/cwtch.im/tapir/primitives" "git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor" "git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519" "golang.org/x/crypto/ed25519"
@ -75,6 +76,17 @@ type cwtchPeer struct {
eventBus event.Manager eventBus event.Manager
} }
func (cp *cwtchPeer) StoreCachedTokens(tokenServer string, tokens []*privacypass.Token) {
ci, err := cp.FetchConversationInfo(tokenServer)
if ci != nil && err == nil {
// Overwrite any existing tokens..
tokenPath := attr.LocalScope.ConstructScopedZonedPath(attr.ServerZone.ConstructZonedPath("tokens"))
data, _ := json.Marshal(tokens)
log.Debugf("storing cached tokens for %v", tokenServer)
cp.SetConversationAttribute(ci.ID, tokenPath, string(data))
}
}
func (cp *cwtchPeer) Export(file string) error { func (cp *cwtchPeer) Export(file string) error {
cp.mutex.Lock() cp.mutex.Lock()
defer cp.mutex.Unlock() defer cp.mutex.Unlock()
@ -87,42 +99,54 @@ func (cp *cwtchPeer) Delete() {
cp.storage.Delete() cp.storage.Delete()
} }
// CheckPassword returns true if the given password can be used to derive the key that encrypts the underlying
// cwtch storage database. Returns false otherwise.
func (cp *cwtchPeer) CheckPassword(password string) bool { func (cp *cwtchPeer) CheckPassword(password string) bool {
// this lock is not really needed, but because we directly access cp.storage.ProfileDirectory
// we keep it here.
cp.mutex.Lock() cp.mutex.Lock()
defer cp.mutex.Unlock() defer cp.mutex.Unlock()
// open *our* database with the given password (set createIfNotExists to false)
db, err := openEncryptedDatabase(cp.storage.ProfileDirectory, password, false) db, err := openEncryptedDatabase(cp.storage.ProfileDirectory, password, false)
if db == nil || err != nil { if db == nil || err != nil {
// this will only fail in the rare cases that ProfileDirectory has been moved or deleted
// it is actually a critical error, but far beyond the scope of Cwtch to deal with.
return false return false
} }
db.Close() // check that the storage object is valid (this will fail if the DB key is incorrect)
cps, err := NewCwtchProfileStorage(db, cp.storage.ProfileDirectory)
if err != nil {
// this will error if any SQL queries fail, which will be the case if the profile is invalid.
return false
}
// we have a valid database, close the storage (but don't purge as we may be using those conversations...)
cps.Close(false)
// success!
return true return true
} }
func (cp *cwtchPeer) ChangePassword(password string, newpassword string, newpasswordAgain string) error { func (cp *cwtchPeer) ChangePassword(password string, newpassword string, newpasswordAgain string) error {
cp.mutex.Lock() if cp.CheckPassword(password) {
defer cp.mutex.Unlock() cp.mutex.Lock()
db, err := openEncryptedDatabase(cp.storage.ProfileDirectory, password, false) defer cp.mutex.Unlock()
if db == nil || err != nil {
return errors.New(constants.InvalidPasswordError)
}
cps, err := NewCwtchProfileStorage(db, cp.storage.ProfileDirectory)
if err != nil {
return errors.New(constants.InvalidPasswordError)
}
cps.Close()
salt, err := os.ReadFile(path.Join(cp.storage.ProfileDirectory, saltFile)) salt, err := os.ReadFile(path.Join(cp.storage.ProfileDirectory, saltFile))
if err != nil { if err != nil {
return err return err
} }
// probably redundant but we like api safety // probably redundant but we like api safety
if newpassword == newpasswordAgain { if newpassword == newpasswordAgain {
rekey := createKey(newpassword, salt) rekey := createKey(newpassword, salt)
log.Infof("rekeying database...") log.Infof("rekeying database...")
return cp.storage.Rekey(rekey) return cp.storage.Rekey(rekey)
}
return errors.New(constants.PasswordsDoNotMatchError)
} }
return errors.New(constants.PasswordsDoNotMatchError) return errors.New(constants.InvalidPasswordError)
} }
// GenerateProtocolEngine // GenerateProtocolEngine
@ -920,7 +944,13 @@ func (cp *cwtchPeer) JoinServer(onion string) error {
if !exists { if !exists {
signature = base64.StdEncoding.EncodeToString([]byte{}) signature = base64.StdEncoding.EncodeToString([]byte{})
} }
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature}))
cachedTokensJson, hasCachedTokens := ci.GetAttribute(attr.LocalScope, attr.ServerZone, "tokens")
if hasCachedTokens {
log.Debugf("using cached tokens for %v", ci.Handle)
}
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature, event.CachedTokens: cachedTokensJson}))
return nil return nil
} }
return errors.New("no keys found for server connection") return errors.New("no keys found for server connection")
@ -1008,7 +1038,7 @@ func (cp *cwtchPeer) Shutdown() {
cp.shutdown = true cp.shutdown = true
cp.queue.Shutdown() cp.queue.Shutdown()
if cp.storage != nil { if cp.storage != nil {
cp.storage.Close() cp.storage.Close(true)
} }
} }
@ -1395,6 +1425,7 @@ func (cp *cwtchPeer) attemptAcknowledgeP2PConversation(handle string, signature
// by the given handle and attempts to mark the message as errored. returns error on failure // by the given handle and attempts to mark the message as errored. returns error on failure
// to either find the contact or the associated message // to either find the contact or the associated message
func (cp *cwtchPeer) attemptErrorConversationMessage(handle string, signature string, error string) error { func (cp *cwtchPeer) attemptErrorConversationMessage(handle string, signature string, error string) error {
ci, err := cp.FetchConversationInfo(handle) ci, err := cp.FetchConversationInfo(handle)
// We should *never* received an error for a conversation that doesn't exist... // We should *never* received an error for a conversation that doesn't exist...
if ci != nil && err == nil { if ci != nil && err == nil {

View File

@ -124,66 +124,77 @@ func NewCwtchProfileStorage(db *sql.DB, profileDirectory string) (*CwtchProfileS
insertProfileKeyValueStmt, err := db.Prepare(insertProfileKeySQLStmt) insertProfileKeyValueStmt, err := db.Prepare(insertProfileKeySQLStmt)
if err != nil { if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", insertProfileKeySQLStmt, err) log.Errorf("error preparing query: %v %v", insertProfileKeySQLStmt, err)
return nil, err return nil, err
} }
selectProfileKeyStmt, err := db.Prepare(selectProfileKeySQLStmt) selectProfileKeyStmt, err := db.Prepare(selectProfileKeySQLStmt)
if err != nil { if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", selectProfileKeySQLStmt, err) log.Errorf("error preparing query: %v %v", selectProfileKeySQLStmt, err)
return nil, err return nil, err
} }
findProfileKeyStmt, err := db.Prepare(findProfileKeySQLStmt) findProfileKeyStmt, err := db.Prepare(findProfileKeySQLStmt)
if err != nil { if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", findProfileKeySQLStmt, err) log.Errorf("error preparing query: %v %v", findProfileKeySQLStmt, err)
return nil, err return nil, err
} }
insertConversationStmt, err := db.Prepare(insertConversationSQLStmt) insertConversationStmt, err := db.Prepare(insertConversationSQLStmt)
if err != nil { if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", insertConversationSQLStmt, err) log.Errorf("error preparing query: %v %v", insertConversationSQLStmt, err)
return nil, err return nil, err
} }
fetchAllConversationsStmt, err := db.Prepare(fetchAllConversationsSQLStmt) fetchAllConversationsStmt, err := db.Prepare(fetchAllConversationsSQLStmt)
if err != nil { if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", fetchAllConversationsSQLStmt, err) log.Errorf("error preparing query: %v %v", fetchAllConversationsSQLStmt, err)
return nil, err return nil, err
} }
selectConversationStmt, err := db.Prepare(selectConversationSQLStmt) selectConversationStmt, err := db.Prepare(selectConversationSQLStmt)
if err != nil { if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", selectConversationSQLStmt, err) log.Errorf("error preparing query: %v %v", selectConversationSQLStmt, err)
return nil, err return nil, err
} }
selectConversationByHandleStmt, err := db.Prepare(selectConversationByHandleSQLStmt) selectConversationByHandleStmt, err := db.Prepare(selectConversationByHandleSQLStmt)
if err != nil { if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", selectConversationByHandleSQLStmt, err) log.Errorf("error preparing query: %v %v", selectConversationByHandleSQLStmt, err)
return nil, err return nil, err
} }
acceptConversationStmt, err := db.Prepare(acceptConversationSQLStmt) acceptConversationStmt, err := db.Prepare(acceptConversationSQLStmt)
if err != nil { if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", acceptConversationSQLStmt, err) log.Errorf("error preparing query: %v %v", acceptConversationSQLStmt, err)
return nil, err return nil, err
} }
deleteConversationStmt, err := db.Prepare(deleteConversationSQLStmt) deleteConversationStmt, err := db.Prepare(deleteConversationSQLStmt)
if err != nil { if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", deleteConversationSQLStmt, err) log.Errorf("error preparing query: %v %v", deleteConversationSQLStmt, err)
return nil, err return nil, err
} }
setConversationAttributesStmt, err := db.Prepare(setConversationAttributesSQLStmt) setConversationAttributesStmt, err := db.Prepare(setConversationAttributesSQLStmt)
if err != nil { if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", setConversationAttributesSQLStmt, err) log.Errorf("error preparing query: %v %v", setConversationAttributesSQLStmt, err)
return nil, err return nil, err
} }
setConversationACLStmt, err := db.Prepare(setConversationACLSQLStmt) setConversationACLStmt, err := db.Prepare(setConversationACLSQLStmt)
if err != nil { if err != nil {
db.Close()
log.Errorf("error preparing query: %v %v", setConversationACLSQLStmt, err) log.Errorf("error preparing query: %v %v", setConversationACLSQLStmt, err)
return nil, err return nil, err
} }
@ -771,12 +782,13 @@ func (cps *CwtchProfileStorage) PurgeNonSavedMessages() {
} }
// Close closes the underlying database and prepared statements // Close closes the underlying database and prepared statements
func (cps *CwtchProfileStorage) Close() { func (cps *CwtchProfileStorage) Close(purgeAllNonSavedMessages bool) {
dan marked this conversation as resolved
Review

just asking, whats the future use of this planned to be?

just asking, whats the future use of this planned to be?
Review

it's used now to differentiate between closing a concurrent database (after opening it to check the password - where we don't want to purge all temp messages) v.s. closing it to shutdown cwtch (where we do)

it's used now to differentiate between closing a concurrent database (after opening it to check the password - where we don't want to purge all temp messages) v.s. closing it to shutdown cwtch (where we do)
cps.mutex.Lock() cps.mutex.Lock()
defer cps.mutex.Unlock() defer cps.mutex.Unlock()
if cps.db != nil { if cps.db != nil {
if purgeAllNonSavedMessages {
cps.PurgeNonSavedMessages() cps.PurgeNonSavedMessages()
}
cps.insertProfileKeyValueStmt.Close() cps.insertProfileKeyValueStmt.Close()
cps.selectProfileKeyValueStmt.Close() cps.selectProfileKeyValueStmt.Close()

View File

@ -5,6 +5,7 @@ import (
"cwtch.im/cwtch/model" "cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/connectivity"
) )
@ -116,9 +117,17 @@ type CwtchPeer interface {
GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error)
UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error
// File Sharing APIS
// TODO move these to feature protected interfaces
ShareFile(fileKey string, serializedManifest string) ShareFile(fileKey string, serializedManifest string)
StopFileShare(fileKey string) StopFileShare(fileKey string)
StopAllFileShares() StopAllFileShares()
// Server Token APIS
// TODO move these to feature protected interfaces
StoreCachedTokens(tokenServer string, tokens []*privacypass.Token)
// Profile Management
CheckPassword(password string) bool CheckPassword(password string) bool
ChangePassword(oldpassword string, newpassword string, newpasswordAgain string) error ChangePassword(oldpassword string, newpassword string, newpasswordAgain string) error
Export(file string) error Export(file string) error

View File

@ -4,6 +4,7 @@ import (
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -168,7 +169,16 @@ func (e *engine) eventHandler() {
// will result in a full sync // will result in a full sync
signature = []byte{} signature = []byte{}
} }
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature) // if we have been sent cached tokens, also deserialize them
cachedTokensJson := ev.Data[event.CachedTokens]
var cachedTokens []*privacypass.Token
if len(cachedTokensJson) != 0 {
json.Unmarshal([]byte(cachedTokensJson), &cachedTokens)
}
// create a new token handler...
e.NewTokenHandler(ev.Data[event.ServerTokenOnion], cachedTokens)
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature, cachedTokens)
case event.MakeAntispamPayment: case event.MakeAntispamPayment:
go e.makeAntispamPayment(ev.Data[event.GroupServer]) go e.makeAntispamPayment(ev.Data[event.GroupServer])
case event.LeaveServer: case event.LeaveServer:
@ -181,6 +191,8 @@ func (e *engine) eventHandler() {
case event.SendMessageToGroup: case event.SendMessageToGroup:
ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
// launch a goroutine to post to the server
go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature, 0) go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature, 0)
case event.SendMessageToPeer: case event.SendMessageToPeer:
// TODO: remove this passthrough once the UI is integrated. // TODO: remove this passthrough once the UI is integrated.
@ -366,7 +378,7 @@ func (e *engine) makeAntispamPayment(onion string) {
if err == nil { if err == nil {
tokenApp, ok := (conn.App()).(*TokenBoardClient) tokenApp, ok := (conn.App()).(*TokenBoardClient)
if ok { if ok {
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, new(TokenManager)) tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, NewTokenManager())
tokenManager := tokenManagerPointer.(*TokenManager) tokenManager := tokenManagerPointer.(*TokenManager)
log.Debugf("checking antispam tokens %v", tokenManager.NumTokens()) log.Debugf("checking antispam tokens %v", tokenManager.NumTokens())
if tokenManager.NumTokens() < 5 { if tokenManager.NumTokens() < 5 {
@ -378,7 +390,7 @@ func (e *engine) makeAntispamPayment(onion string) {
// peerWithTokenServer is the entry point for cwtchPeer - server relationships // peerWithTokenServer is the entry point for cwtchPeer - server relationships
// needs to be run in a goroutine as will block on Open. // needs to be run in a goroutine as will block on Open.
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) { func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte, cachedTokens []*privacypass.Token) {
e.ephemeralServicesLock.Lock() e.ephemeralServicesLock.Lock()
_, exists := e.ephemeralServices[onion] _, exists := e.ephemeralServices[onion]
@ -480,14 +492,14 @@ func (e *engine) peerAuthed(onion string) {
func (e *engine) peerConnecting(onion string) { func (e *engine) peerConnecting(onion string) {
e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{ e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
event.RemotePeer: string(onion), event.RemotePeer: onion,
event.ConnectionState: ConnectionStateName[CONNECTING], event.ConnectionState: ConnectionStateName[CONNECTING],
})) }))
} }
func (e *engine) serverConnecting(onion string) { func (e *engine) serverConnecting(onion string) {
e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{ e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{
event.GroupServer: string(onion), event.GroupServer: onion,
event.ConnectionState: ConnectionStateName[CONNECTING], event.ConnectionState: ConnectionStateName[CONNECTING],
})) }))
} }
@ -615,7 +627,7 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
if err == nil { if err == nil {
tokenApp, ok := (conn.App()).(*TokenBoardClient) tokenApp, ok := (conn.App()).(*TokenBoardClient)
if ok { if ok {
if spent, numtokens := tokenApp.Post(ct, sig); !spent { if spent, numtokens := tokenApp.Post(groupID, ct, sig); !spent {
// we failed to post, probably because we ran out of tokens... so make a payment // we failed to post, probably because we ran out of tokens... so make a payment
go tokenApp.PurchaseTokens() go tokenApp.PurchaseTokens()
// backoff // backoff

View File

@ -3,6 +3,7 @@ package connections
import ( import (
"cwtch.im/cwtch/event" "cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol/groups" "cwtch.im/cwtch/protocol/groups"
"encoding/base64"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"strconv" "strconv"
) )
@ -14,6 +15,11 @@ func (e *engine) GroupMessageHandler(server string, gm *groups.EncryptedGroupMes
e.receiveGroupMessage(server, gm) e.receiveGroupMessage(server, gm)
} }
// PostingFailed notifies a peer that a message failed to post
func (e *engine) PostingFailed(group string, sig []byte) {
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: group, event.Error: "failed to post message", event.Signature: base64.StdEncoding.EncodeToString(sig)}))
}
// ServerAuthedHandler is notified when a server has successfully authed // ServerAuthedHandler is notified when a server has successfully authed
func (e *engine) ServerAuthedHandler(server string) { func (e *engine) ServerAuthedHandler(server string) {
e.serverAuthed(server) e.serverAuthed(server)
@ -31,15 +37,15 @@ func (e *engine) ServerClosedHandler(server string) {
// NewTokenHandler is notified after a successful token acquisition // NewTokenHandler is notified after a successful token acquisition
func (e *engine) NewTokenHandler(tokenService string, tokens []*privacypass.Token) { func (e *engine) NewTokenHandler(tokenService string, tokens []*privacypass.Token) {
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, new(TokenManager)) tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager())
tokenManager := tokenManagerPointer.(*TokenManager) tokenManager := tokenManagerPointer.(*TokenManager)
tokenManager.NewTokens(tokens) tokenManager.StoreNewTokens(tokens)
e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(tokenManager.NumTokens())})) e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(tokenManager.NumTokens())}))
} }
// FetchToken is notified when a server requires a new token from the client // FetchToken is notified when a server requires a new token from the client
func (e *engine) FetchToken(tokenService string) (*privacypass.Token, int, error) { func (e *engine) FetchToken(tokenService string) (*privacypass.Token, int, error) {
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, new(TokenManager)) tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager())
tokenManager := tokenManagerPointer.(*TokenManager) tokenManager := tokenManagerPointer.(*TokenManager)
token, numTokens, err := tokenManager.FetchToken() token, numTokens, err := tokenManager.FetchToken()
e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(numTokens)})) e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(numTokens)}))

View File

@ -1,22 +1,34 @@
package connections package connections
import ( import (
"encoding/json"
"errors" "errors"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/log"
"sync" "sync"
) )
// TokenManager maintains a list of tokens associated with a single TokenServer // TokenManager maintains a list of tokens associated with a single TokenServer
type TokenManager struct { type TokenManager struct {
lock sync.Mutex lock sync.Mutex
tokens []*privacypass.Token tokens map[string]*privacypass.Token
} }
// NewTokens adds tokens to the internal list managed by this TokenManager func NewTokenManager() *TokenManager {
func (tm *TokenManager) NewTokens(tokens []*privacypass.Token) { tm := new(TokenManager)
tm.tokens = make(map[string]*privacypass.Token)
return tm
}
// StoreNewTokens adds tokens to the internal list managed by this TokenManager
func (tm *TokenManager) StoreNewTokens(tokens []*privacypass.Token) {
Review

it's called 'store' here

it's called 'store' here
tm.lock.Lock() tm.lock.Lock()
defer tm.lock.Unlock() defer tm.lock.Unlock()
tm.tokens = append(tm.tokens, tokens...) log.Debugf("acquired %v new tokens", tokens)
for _, token := range tokens {
serialized, _ := json.Marshal(token)
tm.tokens[string(serialized)] = token
}
} }
// NumTokens returns the current number of tokens // NumTokens returns the current number of tokens
@ -34,7 +46,9 @@ func (tm *TokenManager) FetchToken() (*privacypass.Token, int, error) {
if len(tm.tokens) == 0 { if len(tm.tokens) == 0 {
return nil, 0, errors.New("no more tokens") return nil, 0, errors.New("no more tokens")
} }
token := tm.tokens[0] for serializedToken, token := range tm.tokens {
dan marked this conversation as resolved
Review

wnh the change to this for range loop? the above code had a len == 0 check, curious what was tripping up here

wnh the change to this for range loop? the above code had a len == 0 check, curious what was tripping up here
Review

changed from a list to a map so we can preserve keys (and delete by key), can't access a map by an index (since all elements have a random index in go by convention)

changed from a list to a map so we can preserve keys (and delete by key), can't access a map by an index (since all elements have a random index in go by convention)
tm.tokens = tm.tokens[1:] delete(tm.tokens, serializedToken)
return token, len(tm.tokens), nil return token, len(tm.tokens), nil
}
return nil, 0, errors.New("no more tokens")
} }

View File

@ -9,6 +9,7 @@ import (
"git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/log" "git.openprivacy.ca/openprivacy/log"
"github.com/gtank/ristretto255" "github.com/gtank/ristretto255"
"sync"
) )
// TokenBoardHandler encapsulates all the various handlers a client needs to interact with a token board // TokenBoardHandler encapsulates all the various handlers a client needs to interact with a token board
@ -19,6 +20,7 @@ type TokenBoardHandler interface {
ServerSyncedHandler(server string) ServerSyncedHandler(server string)
ServerClosedHandler(server string) ServerClosedHandler(server string)
NewTokenHandler(tokenService string, tokens []*privacypass.Token) NewTokenHandler(tokenService string, tokens []*privacypass.Token)
PostingFailed(server string, sig []byte)
FetchToken(tokenService string) (*privacypass.Token, int, error) FetchToken(tokenService string) (*privacypass.Token, int, error)
} }
@ -46,6 +48,9 @@ type TokenBoardClient struct {
tokenService *privacypass.TokenServer tokenService *privacypass.TokenServer
tokenServiceOnion string tokenServiceOnion string
lastKnownSignature []byte lastKnownSignature []byte
postLock sync.Mutex
postQueue []groups.CachedEncryptedGroupMessage
} }
// NewInstance Client a new TokenBoardApp // NewInstance Client a new TokenBoardApp
@ -61,17 +66,22 @@ func (ta *TokenBoardClient) NewInstance() tapir.Application {
// Init initializes the cryptographic TokenBoardApp // Init initializes the cryptographic TokenBoardApp
func (ta *TokenBoardClient) Init(connection tapir.Connection) { func (ta *TokenBoardClient) Init(connection tapir.Connection) {
// connection.Hostname is always valid because we are ALWAYS the initiating party
log.Debugf("connecting to server: %v", connection.Hostname())
ta.AuthApp.Init(connection) ta.AuthApp.Init(connection)
log.Debugf("server protocol complete: %v", connection.Hostname())
if connection.HasCapability(applications.AuthCapability) { if connection.HasCapability(applications.AuthCapability) {
ta.connection = connection
ta.tokenBoardHandler.ServerAuthedHandler(ta.connection.Hostname())
log.Debugf("Successfully Initialized Connection to %v", connection.Hostname()) log.Debugf("Successfully Initialized Connection to %v", connection.Hostname())
ta.connection = connection
ta.tokenBoardHandler.ServerAuthedHandler(connection.Hostname())
go ta.Listen() go ta.Listen()
// Optimistically acquire many tokens for this server... // Optimistically acquire many tokens for this server...
go ta.PurchaseTokens() go ta.PurchaseTokens()
go ta.PurchaseTokens() go ta.PurchaseTokens()
ta.Replay() ta.Replay()
} else { } else {
log.Debugf("Error Connecting to %v", connection.Hostname())
ta.tokenBoardHandler.ServerClosedHandler(connection.Hostname())
connection.Close() connection.Close()
} }
} }
@ -107,7 +117,20 @@ func (ta *TokenBoardClient) Listen() {
return return
} }
case groups.PostResultMessage: case groups.PostResultMessage:
// TODO handle failure ta.postLock.Lock()
egm := ta.postQueue[0]
ta.postQueue = ta.postQueue[1:]
ta.postLock.Unlock()
if !message.PostResult.Success {
log.Debugf("post result message: %v", message.PostResult)
// Retry using another token
posted, _ := ta.Post(egm.Group, egm.Ciphertext, egm.Signature)
// if posting failed...
if !posted {
log.Errorf("error posting message")
ta.tokenBoardHandler.PostingFailed(egm.Group, egm.Signature)
}
}
case groups.ReplayResultMessage: case groups.ReplayResultMessage:
if message.ReplayResult != nil { if message.ReplayResult != nil {
log.Debugf("Replaying %v Messages...", message.ReplayResult.NumMessages) log.Debugf("Replaying %v Messages...", message.ReplayResult.NumMessages)
@ -151,13 +174,17 @@ func (ta *TokenBoardClient) PurchaseTokens() {
} }
// Post sends a Post Request to the server // Post sends a Post Request to the server
func (ta *TokenBoardClient) Post(ct []byte, sig []byte) (bool, int) { func (ta *TokenBoardClient) Post(group string, ct []byte, sig []byte) (bool, int) {
egm := groups.EncryptedGroupMessage{Ciphertext: ct, Signature: sig} egm := groups.EncryptedGroupMessage{Ciphertext: ct, Signature: sig}
token, numTokens, err := ta.NextToken(egm.ToBytes(), ta.connection.Hostname()) token, numTokens, err := ta.NextToken(egm.ToBytes(), ta.connection.Hostname())
if err == nil { if err == nil {
data, _ := json.Marshal(groups.Message{MessageType: groups.PostRequestMessage, PostRequest: &groups.PostRequest{EGM: egm, Token: token}}) data, _ := json.Marshal(groups.Message{MessageType: groups.PostRequestMessage, PostRequest: &groups.PostRequest{EGM: egm, Token: token}})
ta.postLock.Lock()
// ONLY put group in the EGM as a cache / for error reporting...
ta.postQueue = append(ta.postQueue, groups.CachedEncryptedGroupMessage{Group: group, EncryptedGroupMessage: egm})
log.Debugf("Message Length: %s %v", data, len(data)) log.Debugf("Message Length: %s %v", data, len(data))
err := ta.connection.Send(data) err := ta.connection.Send(data)
ta.postLock.Unlock()
if err != nil { if err != nil {
return false, numTokens return false, numTokens
} }

View File

@ -39,6 +39,12 @@ type EncryptedGroupMessage struct {
Signature []byte Signature []byte
} }
// CachedEncryptedGroupMessage provides an encapsulation of the encrypted group message for local caching / error reporting
type CachedEncryptedGroupMessage struct {
EncryptedGroupMessage
Group string
}
// ToBytes converts the encrypted group message to a set of bytes for serialization // ToBytes converts the encrypted group message to a set of bytes for serialization
func (egm EncryptedGroupMessage) ToBytes() []byte { func (egm EncryptedGroupMessage) ToBytes() []byte {
data, _ := json.Marshal(egm) data, _ := json.Marshal(egm)

View File

@ -12,6 +12,7 @@ import (
"cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/protocol/connections"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity/tor" "git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log" "git.openprivacy.ca/openprivacy/log"
_ "github.com/mutecomm/go-sqlcipher/v4" _ "github.com/mutecomm/go-sqlcipher/v4"
@ -19,6 +20,7 @@ import (
"os" "os"
"os/user" "os/user"
"path" "path"
"path/filepath"
"runtime" "runtime"
"runtime/pprof" "runtime/pprof"
"testing" "testing"
@ -51,6 +53,18 @@ func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target co
} }
} }
func checkAndLoadTokens() []*privacypass.Token {
var tokens []*privacypass.Token
data, err := os.ReadFile("../tokens")
if err == nil {
err := json.Unmarshal(data, &tokens)
if err != nil {
log.Errorf("could not load tokens from file")
}
}
return tokens
}
func TestCwtchPeerIntegration(t *testing.T) { func TestCwtchPeerIntegration(t *testing.T) {
// Goroutine Monitoring Start.. // Goroutine Monitoring Start..
@ -62,6 +76,13 @@ func TestCwtchPeerIntegration(t *testing.T) {
log.ExcludeFromPattern("outbound/3dhauthchannel") log.ExcludeFromPattern("outbound/3dhauthchannel")
log.ExcludeFromPattern("event/eventmanager") log.ExcludeFromPattern("event/eventmanager")
log.ExcludeFromPattern("tapir") log.ExcludeFromPattern("tapir")
// checking if we should use the token cache
cachedTokens := checkAndLoadTokens()
if len(cachedTokens) > 7 {
log.Infof("using cached tokens")
}
os.Mkdir("tordir", 0700) os.Mkdir("tordir", 0700)
dataDir := path.Join("tordir", "tor") dataDir := path.Join("tordir", "tor")
os.MkdirAll(dataDir, 0700) os.MkdirAll(dataDir, 0700)
@ -78,9 +99,18 @@ func TestCwtchPeerIntegration(t *testing.T) {
panic(err) panic(err)
} }
useCache := os.Getenv("TORCACHE") == "true"
torDataDir := "" torDataDir := ""
if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil { if useCache {
t.Fatalf("could not create data dir") log.Infof("using tor cache")
torDataDir = filepath.Join(dataDir, "data-dir-torcache")
os.MkdirAll(torDataDir, 0700)
} else {
log.Infof("using clean tor data dir")
if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil {
t.Fatalf("could not create data dir")
}
} }
tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc") tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc")
@ -225,10 +255,14 @@ func TestCwtchPeerIntegration(t *testing.T) {
log.Infof("Alice has carol's name as '%v'\n", carolName) log.Infof("Alice has carol's name as '%v'\n", carolName)
// Group Testing // Group Testing
usedTokens := len(aliceLines)
// Simulate Alice Creating a Group // Simulate Alice Creating a Group
log.Infoln("Alice joining server...") log.Infoln("Alice joining server...")
if _, err := alice.AddServer(string(serverKeyBundle)); err != nil { if serverOnion, err := alice.AddServer(string(serverKeyBundle)); err != nil {
if len(cachedTokens) > len(aliceLines) {
alice.StoreCachedTokens(serverOnion, cachedTokens[0:len(aliceLines)])
}
t.Fatalf("Failed to Add Server Bundle %v", err) t.Fatalf("Failed to Add Server Bundle %v", err)
} }
@ -260,6 +294,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
log.Infof("Parsed Overlay Message: %v", overlayMessage) log.Infof("Parsed Overlay Message: %v", overlayMessage)
err = bob.ImportBundle(overlayMessage.Data) err = bob.ImportBundle(overlayMessage.Data)
log.Infof("Result of Bob Importing the Bundle from Alice: %v", err) log.Infof("Result of Bob Importing the Bundle from Alice: %v", err)
if len(cachedTokens) > (usedTokens + len(bobLines)) {
bob.StoreCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(bobLines)])
usedTokens += len(bobLines)
}
log.Infof("Waiting for Bob to join connect to group server...") log.Infof("Waiting for Bob to join connect to group server...")
waitForConnection(t, bob, ServerAddr, connections.SYNCED) waitForConnection(t, bob, ServerAddr, connections.SYNCED)
@ -278,15 +316,17 @@ func TestCwtchPeerIntegration(t *testing.T) {
checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[1]) checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[1])
checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[1]) checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[1])
// Pretend that Carol Aquires the Overlay Message through some other means... // Pretend that Carol Acquires the Overlay Message through some other means...
json.Unmarshal([]byte(message), &overlayMessage) json.Unmarshal([]byte(message), &overlayMessage)
log.Infof("Parsed Overlay Message: %v", overlayMessage) log.Infof("Parsed Overlay Message: %v", overlayMessage)
err = carol.ImportBundle(overlayMessage.Data) err = carol.ImportBundle(overlayMessage.Data)
log.Infof("Result of Carol Importing the Bundle from Alice: %v", err) log.Infof("Result of Carol Importing the Bundle from Alice: %v", err)
log.Infof("Waiting for Carol to join connect to group server...") log.Infof("Waiting for Carol to join connect to group server...")
carolGroupConversationID := 3 carolGroupConversationID := 3
if len(cachedTokens) > (usedTokens + len(carolLines)) {
carol.StoreCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(carolLines)])
}
waitForConnection(t, carol, ServerAddr, connections.SYNCED) waitForConnection(t, carol, ServerAddr, connections.SYNCED)
numGoRoutinesPostCarolConnect := runtime.NumGoroutine() numGoRoutinesPostCarolConnect := runtime.NumGoroutine()
// Check Alice Timeline // Check Alice Timeline
@ -354,7 +394,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
log.Infof("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+ log.Infof("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+
"numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v", "numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v",
numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect, numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect,
numGoRoutinesPostAlice, numGoRoutinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown) numGoRoutinesPostAlice, numGoRoutinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown)

View File

@ -128,6 +128,10 @@ func getTokens(bundle string) {
type Handler struct { type Handler struct {
} }
func (h Handler) PostingFailed(server string, sig []byte) {
}
func (h Handler) GroupMessageHandler(server string, gm *groups.EncryptedGroupMessage) { func (h Handler) GroupMessageHandler(server string, gm *groups.EncryptedGroupMessage) {
} }