Allow using cached tokens for local integ testing #470
|
@ -145,11 +145,14 @@ func (app *application) DeletePeer(onion string, password string) {
|
|||
defer app.appmutex.Unlock()
|
||||
|
||||
if app.peers[onion].CheckPassword(password) {
|
||||
app.shutdownPeer(onion)
|
||||
// soft-shutdown
|
||||
app.peers[onion].Shutdown()
|
||||
// delete the underlying storage
|
||||
app.peers[onion].Delete()
|
||||
// hard shutdown / remove from app
|
||||
app.shutdownPeer(onion)
|
||||
|
||||
// Shutdown and Remove the Engine
|
||||
|
||||
log.Debugf("Delete peer for %v Done\n", onion)
|
||||
app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion))
|
||||
return
|
||||
|
|
|
@ -219,6 +219,7 @@ const (
|
|||
RemotePeer = Field("RemotePeer")
|
||||
Ciphertext = Field("Ciphertext")
|
||||
Signature = Field("Signature")
|
||||
CachedTokens = Field("CachedTokens")
|
||||
PreviousSignature = Field("PreviousSignature")
|
||||
TimestampSent = Field("TimestampSent")
|
||||
TimestampReceived = Field("TimestampReceived")
|
||||
|
|
2
go.mod
2
go.mod
|
@ -3,7 +3,7 @@ module cwtch.im/cwtch
|
|||
go 1.17
|
||||
|
||||
require (
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.5.5
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.6.0
|
||||
git.openprivacy.ca/openprivacy/connectivity v1.8.6
|
||||
git.openprivacy.ca/openprivacy/log v1.0.3
|
||||
github.com/gtank/ristretto255 v0.1.3-0.20210930101514-6bb39798585c
|
||||
|
|
4
go.sum
4
go.sum
|
@ -1,8 +1,8 @@
|
|||
filippo.io/edwards25519 v1.0.0-rc.1/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns=
|
||||
filippo.io/edwards25519 v1.0.0 h1:0wAIcmJUqRdI8IJ/3eGi5/HwXZWPujYXXlkrQogz0Ek=
|
||||
filippo.io/edwards25519 v1.0.0/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns=
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.5.5 h1:km6UDrLYH/GCEn2s+S299/TiRHhxKCIAipYr9GbG3Hk=
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.5.5/go.mod h1:bWWHrDYBtHvxMri59RwIB/w7Eg1aC0BrQ/ycKlnbB5k=
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.6.0 h1:TtnKjxitkIDMM7Qn0n/u+mOHRLJzuQUYjYRu5n0/QFY=
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.6.0/go.mod h1:iQIq4y7N+DuP3CxyG66WNEC/d6vzh+wXvvOmelB+KoY=
|
||||
git.openprivacy.ca/openprivacy/bine v0.0.4 h1:CO7EkGyz+jegZ4ap8g5NWRuDHA/56KKvGySR6OBPW+c=
|
||||
git.openprivacy.ca/openprivacy/bine v0.0.4/go.mod h1:13ZqhKyqakDsN/ZkQkIGNULsmLyqtXc46XBcnuXm/mU=
|
||||
git.openprivacy.ca/openprivacy/connectivity v1.8.6 h1:g74PyDGvpMZ3+K0dXy3mlTJh+e0rcwNk0XF8owzkmOA=
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/primitives"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
|
||||
"git.openprivacy.ca/openprivacy/connectivity"
|
||||
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
||||
"golang.org/x/crypto/ed25519"
|
||||
|
@ -75,6 +76,17 @@ type cwtchPeer struct {
|
|||
eventBus event.Manager
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) StoreCachedTokens(tokenServer string, tokens []*privacypass.Token) {
|
||||
ci, err := cp.FetchConversationInfo(tokenServer)
|
||||
if ci != nil && err == nil {
|
||||
// Overwrite any existing tokens..
|
||||
tokenPath := attr.LocalScope.ConstructScopedZonedPath(attr.ServerZone.ConstructZonedPath("tokens"))
|
||||
data, _ := json.Marshal(tokens)
|
||||
log.Debugf("storing cached tokens for %v", tokenServer)
|
||||
cp.SetConversationAttribute(ci.ID, tokenPath, string(data))
|
||||
}
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) Export(file string) error {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
|
@ -87,42 +99,54 @@ func (cp *cwtchPeer) Delete() {
|
|||
cp.storage.Delete()
|
||||
}
|
||||
|
||||
// CheckPassword returns true if the given password can be used to derive the key that encrypts the underlying
|
||||
// cwtch storage database. Returns false otherwise.
|
||||
func (cp *cwtchPeer) CheckPassword(password string) bool {
|
||||
|
||||
// this lock is not really needed, but because we directly access cp.storage.ProfileDirectory
|
||||
// we keep it here.
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
|
||||
// open *our* database with the given password (set createIfNotExists to false)
|
||||
db, err := openEncryptedDatabase(cp.storage.ProfileDirectory, password, false)
|
||||
if db == nil || err != nil {
|
||||
// this will only fail in the rare cases that ProfileDirectory has been moved or deleted
|
||||
// it is actually a critical error, but far beyond the scope of Cwtch to deal with.
|
||||
return false
|
||||
}
|
||||
db.Close()
|
||||
// check that the storage object is valid (this will fail if the DB key is incorrect)
|
||||
cps, err := NewCwtchProfileStorage(db, cp.storage.ProfileDirectory)
|
||||
if err != nil {
|
||||
// this will error if any SQL queries fail, which will be the case if the profile is invalid.
|
||||
return false
|
||||
}
|
||||
// we have a valid database, close the storage (but don't purge as we may be using those conversations...)
|
||||
cps.Close(false)
|
||||
|
||||
// success!
|
||||
return true
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) ChangePassword(password string, newpassword string, newpasswordAgain string) error {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
db, err := openEncryptedDatabase(cp.storage.ProfileDirectory, password, false)
|
||||
if db == nil || err != nil {
|
||||
return errors.New(constants.InvalidPasswordError)
|
||||
}
|
||||
cps, err := NewCwtchProfileStorage(db, cp.storage.ProfileDirectory)
|
||||
if err != nil {
|
||||
return errors.New(constants.InvalidPasswordError)
|
||||
}
|
||||
cps.Close()
|
||||
if cp.CheckPassword(password) {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
|
||||
salt, err := os.ReadFile(path.Join(cp.storage.ProfileDirectory, saltFile))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
salt, err := os.ReadFile(path.Join(cp.storage.ProfileDirectory, saltFile))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// probably redundant but we like api safety
|
||||
if newpassword == newpasswordAgain {
|
||||
rekey := createKey(newpassword, salt)
|
||||
log.Infof("rekeying database...")
|
||||
return cp.storage.Rekey(rekey)
|
||||
// probably redundant but we like api safety
|
||||
if newpassword == newpasswordAgain {
|
||||
rekey := createKey(newpassword, salt)
|
||||
log.Infof("rekeying database...")
|
||||
return cp.storage.Rekey(rekey)
|
||||
}
|
||||
return errors.New(constants.PasswordsDoNotMatchError)
|
||||
}
|
||||
return errors.New(constants.PasswordsDoNotMatchError)
|
||||
return errors.New(constants.InvalidPasswordError)
|
||||
}
|
||||
|
||||
// GenerateProtocolEngine
|
||||
|
@ -920,7 +944,13 @@ func (cp *cwtchPeer) JoinServer(onion string) error {
|
|||
if !exists {
|
||||
signature = base64.StdEncoding.EncodeToString([]byte{})
|
||||
}
|
||||
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature}))
|
||||
|
||||
cachedTokensJson, hasCachedTokens := ci.GetAttribute(attr.LocalScope, attr.ServerZone, "tokens")
|
||||
if hasCachedTokens {
|
||||
log.Debugf("using cached tokens for %v", ci.Handle)
|
||||
}
|
||||
|
||||
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature, event.CachedTokens: cachedTokensJson}))
|
||||
return nil
|
||||
}
|
||||
return errors.New("no keys found for server connection")
|
||||
|
@ -1008,7 +1038,7 @@ func (cp *cwtchPeer) Shutdown() {
|
|||
cp.shutdown = true
|
||||
cp.queue.Shutdown()
|
||||
if cp.storage != nil {
|
||||
cp.storage.Close()
|
||||
cp.storage.Close(true)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1395,6 +1425,7 @@ func (cp *cwtchPeer) attemptAcknowledgeP2PConversation(handle string, signature
|
|||
// by the given handle and attempts to mark the message as errored. returns error on failure
|
||||
// to either find the contact or the associated message
|
||||
func (cp *cwtchPeer) attemptErrorConversationMessage(handle string, signature string, error string) error {
|
||||
|
||||
ci, err := cp.FetchConversationInfo(handle)
|
||||
// We should *never* received an error for a conversation that doesn't exist...
|
||||
if ci != nil && err == nil {
|
||||
|
|
|
@ -124,66 +124,77 @@ func NewCwtchProfileStorage(db *sql.DB, profileDirectory string) (*CwtchProfileS
|
|||
|
||||
insertProfileKeyValueStmt, err := db.Prepare(insertProfileKeySQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", insertProfileKeySQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
selectProfileKeyStmt, err := db.Prepare(selectProfileKeySQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", selectProfileKeySQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
findProfileKeyStmt, err := db.Prepare(findProfileKeySQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", findProfileKeySQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
insertConversationStmt, err := db.Prepare(insertConversationSQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", insertConversationSQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fetchAllConversationsStmt, err := db.Prepare(fetchAllConversationsSQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", fetchAllConversationsSQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
selectConversationStmt, err := db.Prepare(selectConversationSQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", selectConversationSQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
selectConversationByHandleStmt, err := db.Prepare(selectConversationByHandleSQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", selectConversationByHandleSQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
acceptConversationStmt, err := db.Prepare(acceptConversationSQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", acceptConversationSQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
deleteConversationStmt, err := db.Prepare(deleteConversationSQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", deleteConversationSQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
setConversationAttributesStmt, err := db.Prepare(setConversationAttributesSQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", setConversationAttributesSQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
setConversationACLStmt, err := db.Prepare(setConversationACLSQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", setConversationACLSQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
@ -771,12 +782,13 @@ func (cps *CwtchProfileStorage) PurgeNonSavedMessages() {
|
|||
}
|
||||
|
||||
// Close closes the underlying database and prepared statements
|
||||
func (cps *CwtchProfileStorage) Close() {
|
||||
func (cps *CwtchProfileStorage) Close(purgeAllNonSavedMessages bool) {
|
||||
dan marked this conversation as resolved
|
||||
cps.mutex.Lock()
|
||||
defer cps.mutex.Unlock()
|
||||
if cps.db != nil {
|
||||
|
||||
cps.PurgeNonSavedMessages()
|
||||
if purgeAllNonSavedMessages {
|
||||
cps.PurgeNonSavedMessages()
|
||||
}
|
||||
|
||||
cps.insertProfileKeyValueStmt.Close()
|
||||
cps.selectProfileKeyValueStmt.Close()
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/protocol/connections"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
|
||||
"git.openprivacy.ca/openprivacy/connectivity"
|
||||
)
|
||||
|
||||
|
@ -116,9 +117,17 @@ type CwtchPeer interface {
|
|||
GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error)
|
||||
UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error
|
||||
|
||||
// File Sharing APIS
|
||||
// TODO move these to feature protected interfaces
|
||||
ShareFile(fileKey string, serializedManifest string)
|
||||
StopFileShare(fileKey string)
|
||||
StopAllFileShares()
|
||||
|
||||
// Server Token APIS
|
||||
// TODO move these to feature protected interfaces
|
||||
StoreCachedTokens(tokenServer string, tokens []*privacypass.Token)
|
||||
|
||||
// Profile Management
|
||||
CheckPassword(password string) bool
|
||||
ChangePassword(oldpassword string, newpassword string, newpasswordAgain string) error
|
||||
Export(file string) error
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -168,7 +169,16 @@ func (e *engine) eventHandler() {
|
|||
// will result in a full sync
|
||||
signature = []byte{}
|
||||
}
|
||||
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
|
||||
// if we have been sent cached tokens, also deserialize them
|
||||
cachedTokensJson := ev.Data[event.CachedTokens]
|
||||
var cachedTokens []*privacypass.Token
|
||||
if len(cachedTokensJson) != 0 {
|
||||
json.Unmarshal([]byte(cachedTokensJson), &cachedTokens)
|
||||
}
|
||||
|
||||
// create a new token handler...
|
||||
e.NewTokenHandler(ev.Data[event.ServerTokenOnion], cachedTokens)
|
||||
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature, cachedTokens)
|
||||
case event.MakeAntispamPayment:
|
||||
go e.makeAntispamPayment(ev.Data[event.GroupServer])
|
||||
case event.LeaveServer:
|
||||
|
@ -181,6 +191,8 @@ func (e *engine) eventHandler() {
|
|||
case event.SendMessageToGroup:
|
||||
ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
|
||||
signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
|
||||
|
||||
// launch a goroutine to post to the server
|
||||
go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature, 0)
|
||||
case event.SendMessageToPeer:
|
||||
// TODO: remove this passthrough once the UI is integrated.
|
||||
|
@ -366,7 +378,7 @@ func (e *engine) makeAntispamPayment(onion string) {
|
|||
if err == nil {
|
||||
tokenApp, ok := (conn.App()).(*TokenBoardClient)
|
||||
if ok {
|
||||
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, new(TokenManager))
|
||||
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, NewTokenManager())
|
||||
tokenManager := tokenManagerPointer.(*TokenManager)
|
||||
log.Debugf("checking antispam tokens %v", tokenManager.NumTokens())
|
||||
if tokenManager.NumTokens() < 5 {
|
||||
|
@ -378,7 +390,7 @@ func (e *engine) makeAntispamPayment(onion string) {
|
|||
|
||||
// peerWithTokenServer is the entry point for cwtchPeer - server relationships
|
||||
// needs to be run in a goroutine as will block on Open.
|
||||
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) {
|
||||
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte, cachedTokens []*privacypass.Token) {
|
||||
e.ephemeralServicesLock.Lock()
|
||||
_, exists := e.ephemeralServices[onion]
|
||||
|
||||
|
@ -480,14 +492,14 @@ func (e *engine) peerAuthed(onion string) {
|
|||
|
||||
func (e *engine) peerConnecting(onion string) {
|
||||
e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
|
||||
event.RemotePeer: string(onion),
|
||||
event.RemotePeer: onion,
|
||||
event.ConnectionState: ConnectionStateName[CONNECTING],
|
||||
}))
|
||||
}
|
||||
|
||||
func (e *engine) serverConnecting(onion string) {
|
||||
e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{
|
||||
event.GroupServer: string(onion),
|
||||
event.GroupServer: onion,
|
||||
event.ConnectionState: ConnectionStateName[CONNECTING],
|
||||
}))
|
||||
}
|
||||
|
@ -615,7 +627,7 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
|
|||
if err == nil {
|
||||
tokenApp, ok := (conn.App()).(*TokenBoardClient)
|
||||
if ok {
|
||||
if spent, numtokens := tokenApp.Post(ct, sig); !spent {
|
||||
if spent, numtokens := tokenApp.Post(groupID, ct, sig); !spent {
|
||||
// we failed to post, probably because we ran out of tokens... so make a payment
|
||||
go tokenApp.PurchaseTokens()
|
||||
// backoff
|
||||
|
|
|
@ -3,6 +3,7 @@ package connections
|
|||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/protocol/groups"
|
||||
"encoding/base64"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
|
||||
"strconv"
|
||||
)
|
||||
|
@ -14,6 +15,11 @@ func (e *engine) GroupMessageHandler(server string, gm *groups.EncryptedGroupMes
|
|||
e.receiveGroupMessage(server, gm)
|
||||
}
|
||||
|
||||
// PostingFailed notifies a peer that a message failed to post
|
||||
func (e *engine) PostingFailed(group string, sig []byte) {
|
||||
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: group, event.Error: "failed to post message", event.Signature: base64.StdEncoding.EncodeToString(sig)}))
|
||||
}
|
||||
|
||||
// ServerAuthedHandler is notified when a server has successfully authed
|
||||
func (e *engine) ServerAuthedHandler(server string) {
|
||||
e.serverAuthed(server)
|
||||
|
@ -31,15 +37,15 @@ func (e *engine) ServerClosedHandler(server string) {
|
|||
|
||||
// NewTokenHandler is notified after a successful token acquisition
|
||||
func (e *engine) NewTokenHandler(tokenService string, tokens []*privacypass.Token) {
|
||||
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, new(TokenManager))
|
||||
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager())
|
||||
tokenManager := tokenManagerPointer.(*TokenManager)
|
||||
tokenManager.NewTokens(tokens)
|
||||
tokenManager.StoreNewTokens(tokens)
|
||||
e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(tokenManager.NumTokens())}))
|
||||
}
|
||||
|
||||
// FetchToken is notified when a server requires a new token from the client
|
||||
func (e *engine) FetchToken(tokenService string) (*privacypass.Token, int, error) {
|
||||
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, new(TokenManager))
|
||||
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager())
|
||||
tokenManager := tokenManagerPointer.(*TokenManager)
|
||||
token, numTokens, err := tokenManager.FetchToken()
|
||||
e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(numTokens)}))
|
||||
|
|
|
@ -1,22 +1,34 @@
|
|||
package connections
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// TokenManager maintains a list of tokens associated with a single TokenServer
|
||||
type TokenManager struct {
|
||||
lock sync.Mutex
|
||||
tokens []*privacypass.Token
|
||||
tokens map[string]*privacypass.Token
|
||||
}
|
||||
|
||||
// NewTokens adds tokens to the internal list managed by this TokenManager
|
||||
func (tm *TokenManager) NewTokens(tokens []*privacypass.Token) {
|
||||
func NewTokenManager() *TokenManager {
|
||||
tm := new(TokenManager)
|
||||
tm.tokens = make(map[string]*privacypass.Token)
|
||||
return tm
|
||||
}
|
||||
|
||||
// StoreNewTokens adds tokens to the internal list managed by this TokenManager
|
||||
func (tm *TokenManager) StoreNewTokens(tokens []*privacypass.Token) {
|
||||
dan
commented
it's called 'store' here it's called 'store' here
|
||||
tm.lock.Lock()
|
||||
defer tm.lock.Unlock()
|
||||
tm.tokens = append(tm.tokens, tokens...)
|
||||
log.Debugf("acquired %v new tokens", tokens)
|
||||
for _, token := range tokens {
|
||||
serialized, _ := json.Marshal(token)
|
||||
tm.tokens[string(serialized)] = token
|
||||
}
|
||||
}
|
||||
|
||||
// NumTokens returns the current number of tokens
|
||||
|
@ -34,7 +46,9 @@ func (tm *TokenManager) FetchToken() (*privacypass.Token, int, error) {
|
|||
if len(tm.tokens) == 0 {
|
||||
return nil, 0, errors.New("no more tokens")
|
||||
}
|
||||
token := tm.tokens[0]
|
||||
tm.tokens = tm.tokens[1:]
|
||||
return token, len(tm.tokens), nil
|
||||
for serializedToken, token := range tm.tokens {
|
||||
dan marked this conversation as resolved
dan
commented
wnh the change to this for range loop? the above code had a len == 0 check, curious what was tripping up here wnh the change to this for range loop? the above code had a len == 0 check, curious what was tripping up here
sarah
commented
changed from a list to a map so we can preserve keys (and delete by key), can't access a map by an index (since all elements have a random index in go by convention) changed from a list to a map so we can preserve keys (and delete by key), can't access a map by an index (since all elements have a random index in go by convention)
|
||||
delete(tm.tokens, serializedToken)
|
||||
return token, len(tm.tokens), nil
|
||||
}
|
||||
return nil, 0, errors.New("no more tokens")
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"git.openprivacy.ca/openprivacy/connectivity"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"github.com/gtank/ristretto255"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// TokenBoardHandler encapsulates all the various handlers a client needs to interact with a token board
|
||||
|
@ -19,6 +20,7 @@ type TokenBoardHandler interface {
|
|||
ServerSyncedHandler(server string)
|
||||
ServerClosedHandler(server string)
|
||||
NewTokenHandler(tokenService string, tokens []*privacypass.Token)
|
||||
PostingFailed(server string, sig []byte)
|
||||
FetchToken(tokenService string) (*privacypass.Token, int, error)
|
||||
}
|
||||
|
||||
|
@ -46,6 +48,9 @@ type TokenBoardClient struct {
|
|||
tokenService *privacypass.TokenServer
|
||||
tokenServiceOnion string
|
||||
lastKnownSignature []byte
|
||||
|
||||
postLock sync.Mutex
|
||||
postQueue []groups.CachedEncryptedGroupMessage
|
||||
}
|
||||
|
||||
// NewInstance Client a new TokenBoardApp
|
||||
|
@ -61,17 +66,22 @@ func (ta *TokenBoardClient) NewInstance() tapir.Application {
|
|||
|
||||
// Init initializes the cryptographic TokenBoardApp
|
||||
func (ta *TokenBoardClient) Init(connection tapir.Connection) {
|
||||
// connection.Hostname is always valid because we are ALWAYS the initiating party
|
||||
log.Debugf("connecting to server: %v", connection.Hostname())
|
||||
ta.AuthApp.Init(connection)
|
||||
log.Debugf("server protocol complete: %v", connection.Hostname())
|
||||
if connection.HasCapability(applications.AuthCapability) {
|
||||
ta.connection = connection
|
||||
ta.tokenBoardHandler.ServerAuthedHandler(ta.connection.Hostname())
|
||||
log.Debugf("Successfully Initialized Connection to %v", connection.Hostname())
|
||||
ta.connection = connection
|
||||
ta.tokenBoardHandler.ServerAuthedHandler(connection.Hostname())
|
||||
go ta.Listen()
|
||||
// Optimistically acquire many tokens for this server...
|
||||
go ta.PurchaseTokens()
|
||||
go ta.PurchaseTokens()
|
||||
ta.Replay()
|
||||
} else {
|
||||
log.Debugf("Error Connecting to %v", connection.Hostname())
|
||||
ta.tokenBoardHandler.ServerClosedHandler(connection.Hostname())
|
||||
connection.Close()
|
||||
}
|
||||
}
|
||||
|
@ -107,7 +117,20 @@ func (ta *TokenBoardClient) Listen() {
|
|||
return
|
||||
}
|
||||
case groups.PostResultMessage:
|
||||
// TODO handle failure
|
||||
ta.postLock.Lock()
|
||||
egm := ta.postQueue[0]
|
||||
ta.postQueue = ta.postQueue[1:]
|
||||
ta.postLock.Unlock()
|
||||
if !message.PostResult.Success {
|
||||
log.Debugf("post result message: %v", message.PostResult)
|
||||
// Retry using another token
|
||||
posted, _ := ta.Post(egm.Group, egm.Ciphertext, egm.Signature)
|
||||
// if posting failed...
|
||||
if !posted {
|
||||
log.Errorf("error posting message")
|
||||
ta.tokenBoardHandler.PostingFailed(egm.Group, egm.Signature)
|
||||
}
|
||||
}
|
||||
case groups.ReplayResultMessage:
|
||||
if message.ReplayResult != nil {
|
||||
log.Debugf("Replaying %v Messages...", message.ReplayResult.NumMessages)
|
||||
|
@ -151,13 +174,17 @@ func (ta *TokenBoardClient) PurchaseTokens() {
|
|||
}
|
||||
|
||||
// Post sends a Post Request to the server
|
||||
func (ta *TokenBoardClient) Post(ct []byte, sig []byte) (bool, int) {
|
||||
func (ta *TokenBoardClient) Post(group string, ct []byte, sig []byte) (bool, int) {
|
||||
egm := groups.EncryptedGroupMessage{Ciphertext: ct, Signature: sig}
|
||||
token, numTokens, err := ta.NextToken(egm.ToBytes(), ta.connection.Hostname())
|
||||
if err == nil {
|
||||
data, _ := json.Marshal(groups.Message{MessageType: groups.PostRequestMessage, PostRequest: &groups.PostRequest{EGM: egm, Token: token}})
|
||||
ta.postLock.Lock()
|
||||
// ONLY put group in the EGM as a cache / for error reporting...
|
||||
ta.postQueue = append(ta.postQueue, groups.CachedEncryptedGroupMessage{Group: group, EncryptedGroupMessage: egm})
|
||||
log.Debugf("Message Length: %s %v", data, len(data))
|
||||
err := ta.connection.Send(data)
|
||||
ta.postLock.Unlock()
|
||||
if err != nil {
|
||||
return false, numTokens
|
||||
}
|
||||
|
|
|
@ -39,6 +39,12 @@ type EncryptedGroupMessage struct {
|
|||
Signature []byte
|
||||
}
|
||||
|
||||
// CachedEncryptedGroupMessage provides an encapsulation of the encrypted group message for local caching / error reporting
|
||||
type CachedEncryptedGroupMessage struct {
|
||||
EncryptedGroupMessage
|
||||
Group string
|
||||
}
|
||||
|
||||
// ToBytes converts the encrypted group message to a set of bytes for serialization
|
||||
func (egm EncryptedGroupMessage) ToBytes() []byte {
|
||||
data, _ := json.Marshal(egm)
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"cwtch.im/cwtch/protocol/connections"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
|
||||
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
_ "github.com/mutecomm/go-sqlcipher/v4"
|
||||
|
@ -19,6 +20,7 @@ import (
|
|||
"os"
|
||||
"os/user"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"runtime/pprof"
|
||||
"testing"
|
||||
|
@ -51,6 +53,18 @@ func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target co
|
|||
}
|
||||
}
|
||||
|
||||
func checkAndLoadTokens() []*privacypass.Token {
|
||||
var tokens []*privacypass.Token
|
||||
data, err := os.ReadFile("../tokens")
|
||||
if err == nil {
|
||||
err := json.Unmarshal(data, &tokens)
|
||||
if err != nil {
|
||||
log.Errorf("could not load tokens from file")
|
||||
}
|
||||
}
|
||||
return tokens
|
||||
}
|
||||
|
||||
func TestCwtchPeerIntegration(t *testing.T) {
|
||||
|
||||
// Goroutine Monitoring Start..
|
||||
|
@ -62,6 +76,13 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
log.ExcludeFromPattern("outbound/3dhauthchannel")
|
||||
log.ExcludeFromPattern("event/eventmanager")
|
||||
log.ExcludeFromPattern("tapir")
|
||||
|
||||
// checking if we should use the token cache
|
||||
cachedTokens := checkAndLoadTokens()
|
||||
if len(cachedTokens) > 7 {
|
||||
log.Infof("using cached tokens")
|
||||
}
|
||||
|
||||
os.Mkdir("tordir", 0700)
|
||||
dataDir := path.Join("tordir", "tor")
|
||||
os.MkdirAll(dataDir, 0700)
|
||||
|
@ -78,9 +99,18 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
panic(err)
|
||||
}
|
||||
|
||||
useCache := os.Getenv("TORCACHE") == "true"
|
||||
|
||||
torDataDir := ""
|
||||
if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil {
|
||||
t.Fatalf("could not create data dir")
|
||||
if useCache {
|
||||
log.Infof("using tor cache")
|
||||
torDataDir = filepath.Join(dataDir, "data-dir-torcache")
|
||||
os.MkdirAll(torDataDir, 0700)
|
||||
} else {
|
||||
log.Infof("using clean tor data dir")
|
||||
if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil {
|
||||
t.Fatalf("could not create data dir")
|
||||
}
|
||||
}
|
||||
|
||||
tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc")
|
||||
|
@ -225,10 +255,14 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
log.Infof("Alice has carol's name as '%v'\n", carolName)
|
||||
|
||||
// Group Testing
|
||||
|
||||
usedTokens := len(aliceLines)
|
||||
// Simulate Alice Creating a Group
|
||||
log.Infoln("Alice joining server...")
|
||||
if _, err := alice.AddServer(string(serverKeyBundle)); err != nil {
|
||||
if serverOnion, err := alice.AddServer(string(serverKeyBundle)); err != nil {
|
||||
if len(cachedTokens) > len(aliceLines) {
|
||||
alice.StoreCachedTokens(serverOnion, cachedTokens[0:len(aliceLines)])
|
||||
}
|
||||
|
||||
t.Fatalf("Failed to Add Server Bundle %v", err)
|
||||
}
|
||||
|
||||
|
@ -260,6 +294,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
log.Infof("Parsed Overlay Message: %v", overlayMessage)
|
||||
err = bob.ImportBundle(overlayMessage.Data)
|
||||
log.Infof("Result of Bob Importing the Bundle from Alice: %v", err)
|
||||
if len(cachedTokens) > (usedTokens + len(bobLines)) {
|
||||
bob.StoreCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(bobLines)])
|
||||
usedTokens += len(bobLines)
|
||||
}
|
||||
|
||||
log.Infof("Waiting for Bob to join connect to group server...")
|
||||
waitForConnection(t, bob, ServerAddr, connections.SYNCED)
|
||||
|
@ -278,15 +316,17 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[1])
|
||||
checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[1])
|
||||
|
||||
// Pretend that Carol Aquires the Overlay Message through some other means...
|
||||
// Pretend that Carol Acquires the Overlay Message through some other means...
|
||||
json.Unmarshal([]byte(message), &overlayMessage)
|
||||
log.Infof("Parsed Overlay Message: %v", overlayMessage)
|
||||
err = carol.ImportBundle(overlayMessage.Data)
|
||||
log.Infof("Result of Carol Importing the Bundle from Alice: %v", err)
|
||||
log.Infof("Waiting for Carol to join connect to group server...")
|
||||
carolGroupConversationID := 3
|
||||
if len(cachedTokens) > (usedTokens + len(carolLines)) {
|
||||
carol.StoreCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(carolLines)])
|
||||
}
|
||||
waitForConnection(t, carol, ServerAddr, connections.SYNCED)
|
||||
|
||||
numGoRoutinesPostCarolConnect := runtime.NumGoroutine()
|
||||
|
||||
// Check Alice Timeline
|
||||
|
@ -354,7 +394,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
|
||||
log.Infof("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+
|
||||
"numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v",
|
||||
"numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v",
|
||||
numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect,
|
||||
numGoRoutinesPostAlice, numGoRoutinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown)
|
||||
|
||||
|
|
|
@ -128,6 +128,10 @@ func getTokens(bundle string) {
|
|||
type Handler struct {
|
||||
}
|
||||
|
||||
func (h Handler) PostingFailed(server string, sig []byte) {
|
||||
|
||||
}
|
||||
|
||||
func (h Handler) GroupMessageHandler(server string, gm *groups.EncryptedGroupMessage) {
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue
just asking, whats the future use of this planned to be?
it's used now to differentiate between closing a concurrent database (after opening it to check the password - where we don't want to purge all temp messages) v.s. closing it to shutdown cwtch (where we do)