Allow using cached tokens for local integ testing #470
|
@ -145,11 +145,14 @@ func (app *application) DeletePeer(onion string, password string) {
|
|||
defer app.appmutex.Unlock()
|
||||
|
||||
if app.peers[onion].CheckPassword(password) {
|
||||
app.shutdownPeer(onion)
|
||||
// soft-shutdown
|
||||
app.peers[onion].Shutdown()
|
||||
// delete the underlying storage
|
||||
app.peers[onion].Delete()
|
||||
// hard shutdown / remove from app
|
||||
app.shutdownPeer(onion)
|
||||
|
||||
// Shutdown and Remove the Engine
|
||||
|
||||
log.Debugf("Delete peer for %v Done\n", onion)
|
||||
app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion))
|
||||
return
|
||||
|
|
|
@ -219,6 +219,7 @@ const (
|
|||
RemotePeer = Field("RemotePeer")
|
||||
Ciphertext = Field("Ciphertext")
|
||||
Signature = Field("Signature")
|
||||
CachedTokens = Field("CachedTokens")
|
||||
PreviousSignature = Field("PreviousSignature")
|
||||
TimestampSent = Field("TimestampSent")
|
||||
TimestampReceived = Field("TimestampReceived")
|
||||
|
|
2
go.mod
2
go.mod
|
@ -3,7 +3,7 @@ module cwtch.im/cwtch
|
|||
go 1.17
|
||||
|
||||
require (
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.5.5
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.6.0
|
||||
git.openprivacy.ca/openprivacy/connectivity v1.8.6
|
||||
git.openprivacy.ca/openprivacy/log v1.0.3
|
||||
github.com/gtank/ristretto255 v0.1.3-0.20210930101514-6bb39798585c
|
||||
|
|
4
go.sum
4
go.sum
|
@ -1,8 +1,8 @@
|
|||
filippo.io/edwards25519 v1.0.0-rc.1/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns=
|
||||
filippo.io/edwards25519 v1.0.0 h1:0wAIcmJUqRdI8IJ/3eGi5/HwXZWPujYXXlkrQogz0Ek=
|
||||
filippo.io/edwards25519 v1.0.0/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns=
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.5.5 h1:km6UDrLYH/GCEn2s+S299/TiRHhxKCIAipYr9GbG3Hk=
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.5.5/go.mod h1:bWWHrDYBtHvxMri59RwIB/w7Eg1aC0BrQ/ycKlnbB5k=
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.6.0 h1:TtnKjxitkIDMM7Qn0n/u+mOHRLJzuQUYjYRu5n0/QFY=
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.6.0/go.mod h1:iQIq4y7N+DuP3CxyG66WNEC/d6vzh+wXvvOmelB+KoY=
|
||||
git.openprivacy.ca/openprivacy/bine v0.0.4 h1:CO7EkGyz+jegZ4ap8g5NWRuDHA/56KKvGySR6OBPW+c=
|
||||
git.openprivacy.ca/openprivacy/bine v0.0.4/go.mod h1:13ZqhKyqakDsN/ZkQkIGNULsmLyqtXc46XBcnuXm/mU=
|
||||
git.openprivacy.ca/openprivacy/connectivity v1.8.6 h1:g74PyDGvpMZ3+K0dXy3mlTJh+e0rcwNk0XF8owzkmOA=
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/primitives"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
|
||||
"git.openprivacy.ca/openprivacy/connectivity"
|
||||
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
||||
"golang.org/x/crypto/ed25519"
|
||||
|
@ -75,6 +76,17 @@ type cwtchPeer struct {
|
|||
eventBus event.Manager
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) StoreCachedTokens(tokenServer string, tokens []*privacypass.Token) {
|
||||
|
||||
ci, err := cp.FetchConversationInfo(tokenServer)
|
||||
if ci != nil && err == nil {
|
||||
// Overwrite any existing tokens..
|
||||
tokenPath := attr.LocalScope.ConstructScopedZonedPath(attr.ServerZone.ConstructZonedPath("tokens"))
|
||||
data, _ := json.Marshal(tokens)
|
||||
log.Debugf("storing cached tokens for %v", tokenServer)
|
||||
cp.SetConversationAttribute(ci.ID, tokenPath, string(data))
|
||||
}
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) Export(file string) error {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
|
@ -87,42 +99,54 @@ func (cp *cwtchPeer) Delete() {
|
|||
cp.storage.Delete()
|
||||
}
|
||||
|
||||
// CheckPassword returns true if the given password can be used to derive the key that encrypts the underlying
|
||||
// cwtch storage database. Returns false otherwise.
|
||||
func (cp *cwtchPeer) CheckPassword(password string) bool {
|
||||
|
||||
// this lock is not really needed, but because we directly access cp.storage.ProfileDirectory
|
||||
// we keep it here.
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
|
||||
// open *our* database with the given password (set createIfNotExists to false)
|
||||
db, err := openEncryptedDatabase(cp.storage.ProfileDirectory, password, false)
|
||||
if db == nil || err != nil {
|
||||
// this will only fail in the rare cases that ProfileDirectory has been moved or deleted
|
||||
// it is actually a critical error, but far beyond the scope of Cwtch to deal with.
|
||||
return false
|
||||
}
|
||||
db.Close()
|
||||
// check that the storage object is valid (this will fail if the DB key is incorrect)
|
||||
cps, err := NewCwtchProfileStorage(db, cp.storage.ProfileDirectory)
|
||||
if err != nil {
|
||||
// this will error if any SQL queries fail, which will be the case if the profile is invalid.
|
||||
return false
|
||||
}
|
||||
// we have a valid database, close the storage (but don't purge as we may be using those conversations...)
|
||||
cps.Close(false)
|
||||
|
||||
// success!
|
||||
return true
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) ChangePassword(password string, newpassword string, newpasswordAgain string) error {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
db, err := openEncryptedDatabase(cp.storage.ProfileDirectory, password, false)
|
||||
if db == nil || err != nil {
|
||||
return errors.New(constants.InvalidPasswordError)
|
||||
}
|
||||
cps, err := NewCwtchProfileStorage(db, cp.storage.ProfileDirectory)
|
||||
if err != nil {
|
||||
return errors.New(constants.InvalidPasswordError)
|
||||
}
|
||||
cps.Close()
|
||||
if cp.CheckPassword(password) {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
|
||||
salt, err := os.ReadFile(path.Join(cp.storage.ProfileDirectory, saltFile))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
salt, err := os.ReadFile(path.Join(cp.storage.ProfileDirectory, saltFile))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// probably redundant but we like api safety
|
||||
if newpassword == newpasswordAgain {
|
||||
rekey := createKey(newpassword, salt)
|
||||
log.Infof("rekeying database...")
|
||||
return cp.storage.Rekey(rekey)
|
||||
// probably redundant but we like api safety
|
||||
if newpassword == newpasswordAgain {
|
||||
rekey := createKey(newpassword, salt)
|
||||
log.Infof("rekeying database...")
|
||||
return cp.storage.Rekey(rekey)
|
||||
}
|
||||
return errors.New(constants.PasswordsDoNotMatchError)
|
||||
}
|
||||
return errors.New(constants.PasswordsDoNotMatchError)
|
||||
return errors.New(constants.InvalidPasswordError)
|
||||
}
|
||||
|
||||
// GenerateProtocolEngine
|
||||
|
@ -920,7 +944,13 @@ func (cp *cwtchPeer) JoinServer(onion string) error {
|
|||
if !exists {
|
||||
signature = base64.StdEncoding.EncodeToString([]byte{})
|
||||
}
|
||||
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature}))
|
||||
|
||||
cachedTokensJson, hasCachedTokens := ci.GetAttribute(attr.LocalScope, attr.ServerZone, "tokens")
|
||||
if hasCachedTokens {
|
||||
log.Debugf("using cached tokens for %v", ci.Handle)
|
||||
}
|
||||
|
||||
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature, event.CachedTokens: cachedTokensJson}))
|
||||
return nil
|
||||
}
|
||||
return errors.New("no keys found for server connection")
|
||||
|
@ -1008,7 +1038,7 @@ func (cp *cwtchPeer) Shutdown() {
|
|||
cp.shutdown = true
|
||||
cp.queue.Shutdown()
|
||||
if cp.storage != nil {
|
||||
cp.storage.Close()
|
||||
cp.storage.Close(true)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1395,6 +1425,7 @@ func (cp *cwtchPeer) attemptAcknowledgeP2PConversation(handle string, signature
|
|||
// by the given handle and attempts to mark the message as errored. returns error on failure
|
||||
// to either find the contact or the associated message
|
||||
func (cp *cwtchPeer) attemptErrorConversationMessage(handle string, signature string, error string) error {
|
||||
|
||||
ci, err := cp.FetchConversationInfo(handle)
|
||||
// We should *never* received an error for a conversation that doesn't exist...
|
||||
if ci != nil && err == nil {
|
||||
|
|
|
@ -124,66 +124,77 @@ func NewCwtchProfileStorage(db *sql.DB, profileDirectory string) (*CwtchProfileS
|
|||
|
||||
insertProfileKeyValueStmt, err := db.Prepare(insertProfileKeySQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", insertProfileKeySQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
selectProfileKeyStmt, err := db.Prepare(selectProfileKeySQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", selectProfileKeySQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
findProfileKeyStmt, err := db.Prepare(findProfileKeySQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", findProfileKeySQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
insertConversationStmt, err := db.Prepare(insertConversationSQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", insertConversationSQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fetchAllConversationsStmt, err := db.Prepare(fetchAllConversationsSQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", fetchAllConversationsSQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
selectConversationStmt, err := db.Prepare(selectConversationSQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", selectConversationSQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
selectConversationByHandleStmt, err := db.Prepare(selectConversationByHandleSQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", selectConversationByHandleSQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
acceptConversationStmt, err := db.Prepare(acceptConversationSQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", acceptConversationSQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
deleteConversationStmt, err := db.Prepare(deleteConversationSQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", deleteConversationSQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
setConversationAttributesStmt, err := db.Prepare(setConversationAttributesSQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", setConversationAttributesSQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
setConversationACLStmt, err := db.Prepare(setConversationACLSQLStmt)
|
||||
if err != nil {
|
||||
db.Close()
|
||||
log.Errorf("error preparing query: %v %v", setConversationACLSQLStmt, err)
|
||||
return nil, err
|
||||
}
|
||||
|
@ -771,12 +782,13 @@ func (cps *CwtchProfileStorage) PurgeNonSavedMessages() {
|
|||
}
|
||||
|
||||
// Close closes the underlying database and prepared statements
|
||||
func (cps *CwtchProfileStorage) Close() {
|
||||
func (cps *CwtchProfileStorage) Close(purgeAllNonSavedMessages bool) {
|
||||
dan marked this conversation as resolved
dan
commented
just asking, whats the future use of this planned to be? just asking, whats the future use of this planned to be?
sarah
commented
it's used now to differentiate between closing a concurrent database (after opening it to check the password - where we don't want to purge all temp messages) v.s. closing it to shutdown cwtch (where we do) it's used now to differentiate between closing a concurrent database (after opening it to check the password - where we don't want to purge all temp messages) v.s. closing it to shutdown cwtch (where we do)
|
||||
cps.mutex.Lock()
|
||||
defer cps.mutex.Unlock()
|
||||
if cps.db != nil {
|
||||
|
||||
cps.PurgeNonSavedMessages()
|
||||
if purgeAllNonSavedMessages {
|
||||
cps.PurgeNonSavedMessages()
|
||||
}
|
||||
|
||||
cps.insertProfileKeyValueStmt.Close()
|
||||
cps.selectProfileKeyValueStmt.Close()
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/protocol/connections"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
|
||||
"git.openprivacy.ca/openprivacy/connectivity"
|
||||
)
|
||||
|
||||
|
@ -116,9 +117,17 @@ type CwtchPeer interface {
|
|||
GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error)
|
||||
UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error
|
||||
|
||||
// File Sharing APIS
|
||||
// TODO move these to feature protected interfaces
|
||||
ShareFile(fileKey string, serializedManifest string)
|
||||
StopFileShare(fileKey string)
|
||||
StopAllFileShares()
|
||||
|
||||
// Server Token APIS
|
||||
// TODO move these to feature protected interfaces
|
||||
StoreCachedTokens(tokenServer string, tokens []*privacypass.Token)
|
||||
|
||||
// Profile Management
|
||||
CheckPassword(password string) bool
|
||||
ChangePassword(oldpassword string, newpassword string, newpasswordAgain string) error
|
||||
Export(file string) error
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -168,7 +169,16 @@ func (e *engine) eventHandler() {
|
|||
// will result in a full sync
|
||||
signature = []byte{}
|
||||
}
|
||||
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
|
||||
// if we have been sent cached tokens, also deserialize them
|
||||
cachedTokensJson := ev.Data[event.CachedTokens]
|
||||
var cachedTokens []*privacypass.Token
|
||||
if len(cachedTokensJson) != 0 {
|
||||
json.Unmarshal([]byte(cachedTokensJson), &cachedTokens)
|
||||
}
|
||||
|
||||
// create a new token handler...
|
||||
e.NewTokenHandler(ev.Data[event.ServerTokenOnion], cachedTokens)
|
||||
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature, cachedTokens)
|
||||
case event.MakeAntispamPayment:
|
||||
go e.makeAntispamPayment(ev.Data[event.GroupServer])
|
||||
case event.LeaveServer:
|
||||
|
@ -181,6 +191,8 @@ func (e *engine) eventHandler() {
|
|||
case event.SendMessageToGroup:
|
||||
ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
|
||||
signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
|
||||
|
||||
// launch a goroutine to post to the server
|
||||
go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature, 0)
|
||||
case event.SendMessageToPeer:
|
||||
// TODO: remove this passthrough once the UI is integrated.
|
||||
|
@ -366,7 +378,7 @@ func (e *engine) makeAntispamPayment(onion string) {
|
|||
if err == nil {
|
||||
tokenApp, ok := (conn.App()).(*TokenBoardClient)
|
||||
if ok {
|
||||
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, new(TokenManager))
|
||||
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, NewTokenManager())
|
||||
tokenManager := tokenManagerPointer.(*TokenManager)
|
||||
log.Debugf("checking antispam tokens %v", tokenManager.NumTokens())
|
||||
if tokenManager.NumTokens() < 5 {
|
||||
|
@ -378,7 +390,7 @@ func (e *engine) makeAntispamPayment(onion string) {
|
|||
|
||||
// peerWithTokenServer is the entry point for cwtchPeer - server relationships
|
||||
// needs to be run in a goroutine as will block on Open.
|
||||
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) {
|
||||
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte, cachedTokens []*privacypass.Token) {
|
||||
e.ephemeralServicesLock.Lock()
|
||||
_, exists := e.ephemeralServices[onion]
|
||||
|
||||
|
@ -480,14 +492,14 @@ func (e *engine) peerAuthed(onion string) {
|
|||
|
||||
func (e *engine) peerConnecting(onion string) {
|
||||
e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
|
||||
event.RemotePeer: string(onion),
|
||||
event.RemotePeer: onion,
|
||||
event.ConnectionState: ConnectionStateName[CONNECTING],
|
||||
}))
|
||||
}
|
||||
|
||||
func (e *engine) serverConnecting(onion string) {
|
||||
e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{
|
||||
event.GroupServer: string(onion),
|
||||
event.GroupServer: onion,
|
||||
event.ConnectionState: ConnectionStateName[CONNECTING],
|
||||
}))
|
||||
}
|
||||
|
@ -615,7 +627,7 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
|
|||
if err == nil {
|
||||
tokenApp, ok := (conn.App()).(*TokenBoardClient)
|
||||
if ok {
|
||||
if spent, numtokens := tokenApp.Post(ct, sig); !spent {
|
||||
if spent, numtokens := tokenApp.Post(groupID, ct, sig); !spent {
|
||||
// we failed to post, probably because we ran out of tokens... so make a payment
|
||||
go tokenApp.PurchaseTokens()
|
||||
// backoff
|
||||
|
|
|
@ -3,6 +3,7 @@ package connections
|
|||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/protocol/groups"
|
||||
"encoding/base64"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
|
||||
"strconv"
|
||||
)
|
||||
|
@ -14,6 +15,11 @@ func (e *engine) GroupMessageHandler(server string, gm *groups.EncryptedGroupMes
|
|||
e.receiveGroupMessage(server, gm)
|
||||
}
|
||||
|
||||
// PostingFailed notifies a peer that a message failed to post
|
||||
func (e *engine) PostingFailed(group string, sig []byte) {
|
||||
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: group, event.Error: "failed to post message", event.Signature: base64.StdEncoding.EncodeToString(sig)}))
|
||||
}
|
||||
|
||||
// ServerAuthedHandler is notified when a server has successfully authed
|
||||
func (e *engine) ServerAuthedHandler(server string) {
|
||||
e.serverAuthed(server)
|
||||
|
@ -31,15 +37,15 @@ func (e *engine) ServerClosedHandler(server string) {
|
|||
|
||||
// NewTokenHandler is notified after a successful token acquisition
|
||||
func (e *engine) NewTokenHandler(tokenService string, tokens []*privacypass.Token) {
|
||||
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, new(TokenManager))
|
||||
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager())
|
||||
tokenManager := tokenManagerPointer.(*TokenManager)
|
||||
tokenManager.NewTokens(tokens)
|
||||
tokenManager.StoreNewTokens(tokens)
|
||||
e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(tokenManager.NumTokens())}))
|
||||
}
|
||||
|
||||
// FetchToken is notified when a server requires a new token from the client
|
||||
func (e *engine) FetchToken(tokenService string) (*privacypass.Token, int, error) {
|
||||
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, new(TokenManager))
|
||||
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager())
|
||||
tokenManager := tokenManagerPointer.(*TokenManager)
|
||||
token, numTokens, err := tokenManager.FetchToken()
|
||||
e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(numTokens)}))
|
||||
|
|
|
@ -1,22 +1,34 @@
|
|||
package connections
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// TokenManager maintains a list of tokens associated with a single TokenServer
|
||||
type TokenManager struct {
|
||||
lock sync.Mutex
|
||||
tokens []*privacypass.Token
|
||||
tokens map[string]*privacypass.Token
|
||||
}
|
||||
|
||||
// NewTokens adds tokens to the internal list managed by this TokenManager
|
||||
func (tm *TokenManager) NewTokens(tokens []*privacypass.Token) {
|
||||
func NewTokenManager() *TokenManager {
|
||||
tm := new(TokenManager)
|
||||
tm.tokens = make(map[string]*privacypass.Token)
|
||||
return tm
|
||||
}
|
||||
|
||||
// StoreNewTokens adds tokens to the internal list managed by this TokenManager
|
||||
func (tm *TokenManager) StoreNewTokens(tokens []*privacypass.Token) {
|
||||
dan
commented
it's called 'store' here it's called 'store' here
|
||||
tm.lock.Lock()
|
||||
defer tm.lock.Unlock()
|
||||
tm.tokens = append(tm.tokens, tokens...)
|
||||
log.Debugf("acquired %v new tokens", tokens)
|
||||
for _, token := range tokens {
|
||||
serialized, _ := json.Marshal(token)
|
||||
tm.tokens[string(serialized)] = token
|
||||
}
|
||||
}
|
||||
|
||||
// NumTokens returns the current number of tokens
|
||||
|
@ -34,7 +46,9 @@ func (tm *TokenManager) FetchToken() (*privacypass.Token, int, error) {
|
|||
if len(tm.tokens) == 0 {
|
||||
return nil, 0, errors.New("no more tokens")
|
||||
}
|
||||
token := tm.tokens[0]
|
||||
tm.tokens = tm.tokens[1:]
|
||||
return token, len(tm.tokens), nil
|
||||
for serializedToken, token := range tm.tokens {
|
||||
dan marked this conversation as resolved
dan
commented
wnh the change to this for range loop? the above code had a len == 0 check, curious what was tripping up here wnh the change to this for range loop? the above code had a len == 0 check, curious what was tripping up here
sarah
commented
changed from a list to a map so we can preserve keys (and delete by key), can't access a map by an index (since all elements have a random index in go by convention) changed from a list to a map so we can preserve keys (and delete by key), can't access a map by an index (since all elements have a random index in go by convention)
|
||||
delete(tm.tokens, serializedToken)
|
||||
return token, len(tm.tokens), nil
|
||||
}
|
||||
return nil, 0, errors.New("no more tokens")
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"git.openprivacy.ca/openprivacy/connectivity"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"github.com/gtank/ristretto255"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// TokenBoardHandler encapsulates all the various handlers a client needs to interact with a token board
|
||||
|
@ -19,6 +20,7 @@ type TokenBoardHandler interface {
|
|||
ServerSyncedHandler(server string)
|
||||
ServerClosedHandler(server string)
|
||||
NewTokenHandler(tokenService string, tokens []*privacypass.Token)
|
||||
PostingFailed(server string, sig []byte)
|
||||
FetchToken(tokenService string) (*privacypass.Token, int, error)
|
||||
}
|
||||
|
||||
|
@ -46,6 +48,9 @@ type TokenBoardClient struct {
|
|||
tokenService *privacypass.TokenServer
|
||||
tokenServiceOnion string
|
||||
lastKnownSignature []byte
|
||||
|
||||
postLock sync.Mutex
|
||||
postQueue []groups.CachedEncryptedGroupMessage
|
||||
}
|
||||
|
||||
// NewInstance Client a new TokenBoardApp
|
||||
|
@ -61,17 +66,22 @@ func (ta *TokenBoardClient) NewInstance() tapir.Application {
|
|||
|
||||
// Init initializes the cryptographic TokenBoardApp
|
||||
func (ta *TokenBoardClient) Init(connection tapir.Connection) {
|
||||
// connection.Hostname is always valid because we are ALWAYS the initiating party
|
||||
log.Debugf("connecting to server: %v", connection.Hostname())
|
||||
ta.AuthApp.Init(connection)
|
||||
log.Debugf("server protocol complete: %v", connection.Hostname())
|
||||
if connection.HasCapability(applications.AuthCapability) {
|
||||
ta.connection = connection
|
||||
ta.tokenBoardHandler.ServerAuthedHandler(ta.connection.Hostname())
|
||||
log.Debugf("Successfully Initialized Connection to %v", connection.Hostname())
|
||||
ta.connection = connection
|
||||
ta.tokenBoardHandler.ServerAuthedHandler(connection.Hostname())
|
||||
go ta.Listen()
|
||||
// Optimistically acquire many tokens for this server...
|
||||
go ta.PurchaseTokens()
|
||||
go ta.PurchaseTokens()
|
||||
ta.Replay()
|
||||
} else {
|
||||
log.Debugf("Error Connecting to %v", connection.Hostname())
|
||||
ta.tokenBoardHandler.ServerClosedHandler(connection.Hostname())
|
||||
connection.Close()
|
||||
}
|
||||
}
|
||||
|
@ -107,7 +117,20 @@ func (ta *TokenBoardClient) Listen() {
|
|||
return
|
||||
}
|
||||
case groups.PostResultMessage:
|
||||
// TODO handle failure
|
||||
ta.postLock.Lock()
|
||||
egm := ta.postQueue[0]
|
||||
ta.postQueue = ta.postQueue[1:]
|
||||
ta.postLock.Unlock()
|
||||
if !message.PostResult.Success {
|
||||
log.Debugf("post result message: %v", message.PostResult)
|
||||
// Retry using another token
|
||||
posted, _ := ta.Post(egm.Group, egm.Ciphertext, egm.Signature)
|
||||
// if posting failed...
|
||||
if !posted {
|
||||
log.Errorf("error posting message")
|
||||
ta.tokenBoardHandler.PostingFailed(egm.Group, egm.Signature)
|
||||
}
|
||||
}
|
||||
case groups.ReplayResultMessage:
|
||||
if message.ReplayResult != nil {
|
||||
log.Debugf("Replaying %v Messages...", message.ReplayResult.NumMessages)
|
||||
|
@ -151,13 +174,17 @@ func (ta *TokenBoardClient) PurchaseTokens() {
|
|||
}
|
||||
|
||||
// Post sends a Post Request to the server
|
||||
func (ta *TokenBoardClient) Post(ct []byte, sig []byte) (bool, int) {
|
||||
func (ta *TokenBoardClient) Post(group string, ct []byte, sig []byte) (bool, int) {
|
||||
egm := groups.EncryptedGroupMessage{Ciphertext: ct, Signature: sig}
|
||||
token, numTokens, err := ta.NextToken(egm.ToBytes(), ta.connection.Hostname())
|
||||
if err == nil {
|
||||
data, _ := json.Marshal(groups.Message{MessageType: groups.PostRequestMessage, PostRequest: &groups.PostRequest{EGM: egm, Token: token}})
|
||||
ta.postLock.Lock()
|
||||
// ONLY put group in the EGM as a cache / for error reporting...
|
||||
ta.postQueue = append(ta.postQueue, groups.CachedEncryptedGroupMessage{Group: group, EncryptedGroupMessage: egm})
|
||||
log.Debugf("Message Length: %s %v", data, len(data))
|
||||
err := ta.connection.Send(data)
|
||||
ta.postLock.Unlock()
|
||||
if err != nil {
|
||||
return false, numTokens
|
||||
}
|
||||
|
|
|
@ -39,6 +39,12 @@ type EncryptedGroupMessage struct {
|
|||
Signature []byte
|
||||
}
|
||||
|
||||
// CachedEncryptedGroupMessage provides an encapsulation of the encrypted group message for local caching / error reporting
|
||||
type CachedEncryptedGroupMessage struct {
|
||||
EncryptedGroupMessage
|
||||
Group string
|
||||
}
|
||||
|
||||
// ToBytes converts the encrypted group message to a set of bytes for serialization
|
||||
func (egm EncryptedGroupMessage) ToBytes() []byte {
|
||||
data, _ := json.Marshal(egm)
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"cwtch.im/cwtch/protocol/connections"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
|
||||
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
_ "github.com/mutecomm/go-sqlcipher/v4"
|
||||
|
@ -19,6 +20,7 @@ import (
|
|||
"os"
|
||||
"os/user"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"runtime/pprof"
|
||||
"testing"
|
||||
|
@ -51,6 +53,18 @@ func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target co
|
|||
}
|
||||
}
|
||||
|
||||
func checkAndLoadTokens() []*privacypass.Token {
|
||||
var tokens []*privacypass.Token
|
||||
data, err := os.ReadFile("../tokens")
|
||||
if err == nil {
|
||||
err := json.Unmarshal(data, &tokens)
|
||||
if err != nil {
|
||||
log.Errorf("could not load tokens from file")
|
||||
}
|
||||
}
|
||||
return tokens
|
||||
}
|
||||
|
||||
func TestCwtchPeerIntegration(t *testing.T) {
|
||||
|
||||
// Goroutine Monitoring Start..
|
||||
|
@ -62,6 +76,13 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
log.ExcludeFromPattern("outbound/3dhauthchannel")
|
||||
log.ExcludeFromPattern("event/eventmanager")
|
||||
log.ExcludeFromPattern("tapir")
|
||||
|
||||
// checking if we should use the token cache
|
||||
cachedTokens := checkAndLoadTokens()
|
||||
if len(cachedTokens) > 7 {
|
||||
log.Infof("using cached tokens")
|
||||
}
|
||||
|
||||
os.Mkdir("tordir", 0700)
|
||||
dataDir := path.Join("tordir", "tor")
|
||||
os.MkdirAll(dataDir, 0700)
|
||||
|
@ -78,9 +99,18 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
panic(err)
|
||||
}
|
||||
|
||||
useCache := os.Getenv("TORCACHE") == "true"
|
||||
|
||||
torDataDir := ""
|
||||
if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil {
|
||||
t.Fatalf("could not create data dir")
|
||||
if useCache {
|
||||
log.Infof("using tor cache")
|
||||
torDataDir = filepath.Join(dataDir, "data-dir-torcache")
|
||||
os.MkdirAll(torDataDir, 0700)
|
||||
} else {
|
||||
log.Infof("using clean tor data dir")
|
||||
if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil {
|
||||
t.Fatalf("could not create data dir")
|
||||
}
|
||||
}
|
||||
|
||||
tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc")
|
||||
|
@ -225,10 +255,14 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
log.Infof("Alice has carol's name as '%v'\n", carolName)
|
||||
|
||||
// Group Testing
|
||||
|
||||
usedTokens := len(aliceLines)
|
||||
// Simulate Alice Creating a Group
|
||||
log.Infoln("Alice joining server...")
|
||||
if _, err := alice.AddServer(string(serverKeyBundle)); err != nil {
|
||||
if serverOnion, err := alice.AddServer(string(serverKeyBundle)); err != nil {
|
||||
if len(cachedTokens) > len(aliceLines) {
|
||||
alice.StoreCachedTokens(serverOnion, cachedTokens[0:len(aliceLines)])
|
||||
}
|
||||
|
||||
t.Fatalf("Failed to Add Server Bundle %v", err)
|
||||
}
|
||||
|
||||
|
@ -260,6 +294,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
log.Infof("Parsed Overlay Message: %v", overlayMessage)
|
||||
err = bob.ImportBundle(overlayMessage.Data)
|
||||
log.Infof("Result of Bob Importing the Bundle from Alice: %v", err)
|
||||
if len(cachedTokens) > (usedTokens + len(bobLines)) {
|
||||
bob.StoreCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(bobLines)])
|
||||
usedTokens += len(bobLines)
|
||||
}
|
||||
|
||||
log.Infof("Waiting for Bob to join connect to group server...")
|
||||
waitForConnection(t, bob, ServerAddr, connections.SYNCED)
|
||||
|
@ -278,15 +316,17 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[1])
|
||||
checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[1])
|
||||
|
||||
// Pretend that Carol Aquires the Overlay Message through some other means...
|
||||
// Pretend that Carol Acquires the Overlay Message through some other means...
|
||||
json.Unmarshal([]byte(message), &overlayMessage)
|
||||
log.Infof("Parsed Overlay Message: %v", overlayMessage)
|
||||
err = carol.ImportBundle(overlayMessage.Data)
|
||||
log.Infof("Result of Carol Importing the Bundle from Alice: %v", err)
|
||||
log.Infof("Waiting for Carol to join connect to group server...")
|
||||
carolGroupConversationID := 3
|
||||
if len(cachedTokens) > (usedTokens + len(carolLines)) {
|
||||
carol.StoreCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(carolLines)])
|
||||
}
|
||||
waitForConnection(t, carol, ServerAddr, connections.SYNCED)
|
||||
|
||||
numGoRoutinesPostCarolConnect := runtime.NumGoroutine()
|
||||
|
||||
// Check Alice Timeline
|
||||
|
@ -354,7 +394,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
|
||||
log.Infof("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+
|
||||
"numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v",
|
||||
"numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v",
|
||||
numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect,
|
||||
numGoRoutinesPostAlice, numGoRoutinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown)
|
||||
|
||||
|
|
|
@ -128,6 +128,10 @@ func getTokens(bundle string) {
|
|||
type Handler struct {
|
||||
}
|
||||
|
||||
func (h Handler) PostingFailed(server string, sig []byte) {
|
||||
|
||||
}
|
||||
|
||||
func (h Handler) GroupMessageHandler(server string, gm *groups.EncryptedGroupMessage) {
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue
possibly StoreCachedTokens