diff --git a/functionality/filesharing/filesharing_functionality.go b/functionality/filesharing/filesharing_functionality.go index 4747297..c767352 100644 --- a/functionality/filesharing/filesharing_functionality.go +++ b/functionality/filesharing/filesharing_functionality.go @@ -23,7 +23,8 @@ import ( type Functionality struct { } -// FunctionalityGate returns contact.Functionality always +// FunctionalityGate returns filesharing if enabled in the given experiment map +// Note: Experiment maps are currently in libcwtch-go func FunctionalityGate(experimentMap map[string]bool) (*Functionality, error) { if experimentMap["filesharing"] { return new(Functionality), nil diff --git a/model/constants/bundles.go b/model/constants/bundles.go new file mode 100644 index 0000000..47e911e --- /dev/null +++ b/model/constants/bundles.go @@ -0,0 +1,6 @@ +package constants + +const ServerPrefix = "server:" +const TofuBundlePrefix = "tofubundle:" +const GroupPrefix = "torv3" +const ImportBundlePrefix = "importBundle" diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index b2bb6af..c4c07a5 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -162,7 +162,6 @@ func (cp *cwtchPeer) SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, k // SendMessage is a higher level that merges sending messages to contacts and group handles // If you try to send a message to a handle that doesn't exist, malformed or an incorrect type then // this function will error -// Status: TODO func (cp *cwtchPeer) SendMessage(conversation int, message string) error { cp.mutex.Lock() defer cp.mutex.Unlock() @@ -579,7 +578,8 @@ func (cp *cwtchPeer) AddServer(serverSpecification string) (string, error) { } log.Debugf("Got new key bundle %v", keyBundle.Serialize()) - // TODO if the key bundle is incomplete then error out. In the future we may allow servers to attest to new + // if the key bundle is incomplete then error out. + // TODO In the future we may allow servers to attest to new // keys or subsets of keys, but for now they must commit only to a complete set of keys required for Cwtch Groups // (that way we can be assured that the keybundle we store is a valid one) if !keyBundle.HasKeyType(model.KeyTypeTokenOnion) || !keyBundle.HasKeyType(model.KeyTypeServerOnion) || !keyBundle.HasKeyType(model.KeyTypePrivacyPass) { @@ -666,8 +666,7 @@ func (cp *cwtchPeer) PeerWithOnion(onion string) { cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion})) } -// InviteOnionToGroup kicks off the invite process -// Status: TODO +// SendInviteToConversation kicks off the invite process func (cp *cwtchPeer) SendInviteToConversation(conversationID int, inviteConversationID int) error { var invite model.MessageWrapper @@ -739,50 +738,44 @@ func (cp *cwtchPeer) SendInviteToConversation(conversationID int, inviteConversa return cp.SendMessage(conversationID, string(inviteBytes)) } -const serverPrefix = "server:" -const tofuBundlePrefix = "tofubundle:" -const groupPrefix = "torv3" -const importBundlePrefix = "importBundle" - func (cp *cwtchPeer) ImportBundle(importString string) error { if tor.IsValidHostname(importString) { _, err := cp.NewContactConversation(importString, model.DefaultP2PAccessControl(), true) if err == nil { - return ConstructResponse(importBundlePrefix, "success") + return ConstructResponse(constants.ImportBundlePrefix, "success") } - return ConstructResponse(importBundlePrefix, err.Error()) - } else if strings.HasPrefix(importString, tofuBundlePrefix) { + return ConstructResponse(constants.ImportBundlePrefix, err.Error()) + } else if strings.HasPrefix(importString, constants.TofuBundlePrefix) { bundle := strings.Split(importString, "||") if len(bundle) == 2 { - err := cp.ImportBundle(bundle[0][len(tofuBundlePrefix):]) + err := cp.ImportBundle(bundle[0][len(constants.TofuBundlePrefix):]) // if the server import failed then abort the whole process.. if err != nil && !strings.HasSuffix(err.Error(), "success") { - return ConstructResponse(importBundlePrefix, err.Error()) + return ConstructResponse(constants.ImportBundlePrefix, err.Error()) } return cp.ImportBundle(bundle[1]) } - } else if strings.HasPrefix(importString, serverPrefix) { + } else if strings.HasPrefix(importString, constants.ServerPrefix) { // Server Key Bundles are prefixed with - bundle, err := base64.StdEncoding.DecodeString(importString[len(serverPrefix):]) + bundle, err := base64.StdEncoding.DecodeString(importString[len(constants.ServerPrefix):]) if err == nil { if _, err = cp.AddServer(string(bundle)); err != nil { - return ConstructResponse(importBundlePrefix, err.Error()) + return ConstructResponse(constants.ImportBundlePrefix, err.Error()) } - return ConstructResponse(importBundlePrefix, "success") + return ConstructResponse(constants.ImportBundlePrefix, "success") } - return ConstructResponse(importBundlePrefix, err.Error()) - } else if strings.HasPrefix(importString, groupPrefix) { + return ConstructResponse(constants.ImportBundlePrefix, err.Error()) + } else if strings.HasPrefix(importString, constants.GroupPrefix) { //eg: torv3JFDWkXExBsZLkjvfkkuAxHsiLGZBk0bvoeJID9ItYnU=EsEBCiBhOWJhZDU1OTQ0NWI3YmM2N2YxYTM5YjkzMTNmNTczNRIgpHeNaG+6jy750eDhwLO39UX4f2xs0irK/M3P6mDSYQIaOTJjM2ttb29ibnlnaGoyenc2cHd2N2Q1N3l6bGQ3NTNhdW8zdWdhdWV6enB2ZmFrM2FoYzRiZHlkCiJAdVSSVgsksceIfHe41OJu9ZFHO8Kwv3G6F5OK3Hw4qZ6hn6SiZjtmJlJezoBH0voZlCahOU7jCOg+dsENndZxAA== if _, err := cp.ImportGroup(importString); err != nil { - return ConstructResponse(importBundlePrefix, err.Error()) + return ConstructResponse(constants.ImportBundlePrefix, err.Error()) } - return ConstructResponse(importBundlePrefix, "success") + return ConstructResponse(constants.ImportBundlePrefix, "success") } - return ConstructResponse(importBundlePrefix, "invalid_group_invite_prefix") + return ConstructResponse(constants.ImportBundlePrefix, "invalid_group_invite_prefix") } // JoinServer manages a new server connection with the given onion address -// Status: TODO func (cp *cwtchPeer) JoinServer(onion string) error { ci, err := cp.FetchConversationInfo(onion) if ci == nil || err != nil { @@ -804,18 +797,13 @@ func (cp *cwtchPeer) JoinServer(onion string) error { } // ResyncServer completely tears down and resyncs a new server connection with the given onion address -// Status: TODO func (cp *cwtchPeer) ResyncServer(onion string) error { - //if cp.GetContact(onion) != nil { - // tokenY, yExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypePrivacyPass)) - // tokenOnion, onionExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypeTokenOnion)) - // if yExists && onionExists { - // signature := base64.StdEncoding.EncodeToString([]byte{}) - // cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature})) - // return nil - // } - //} - return errors.New("no keys found for server connection") + ci, err := cp.FetchConversationInfo(onion) + if ci == nil || err != nil { + return errors.New("no keys found for server connection") + } + cp.SetConversationAttribute(ci.ID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(lastReceivedSignature)), base64.StdEncoding.EncodeToString([]byte{})) + return cp.JoinServer(onion) } // SendGetValToPeer @@ -878,7 +866,6 @@ func (cp *cwtchPeer) Shutdown() { } } -// Status: TODO func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time) error { // TODO maybe atomize this? ci, err := cp.FetchConversationInfo(handle) diff --git a/peer/cwtchprofilestorage.go b/peer/cwtchprofilestorage.go index e9e2db6..567a40c 100644 --- a/peer/cwtchprofilestorage.go +++ b/peer/cwtchprofilestorage.go @@ -1,6 +1,7 @@ package peer import ( + "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" "cwtch.im/cwtch/model/attr" "database/sql" @@ -68,7 +69,7 @@ const insertConversationSQLStmt = `insert into conversations(Handle, Attributes, const fetchAllConversationsSQLStmt = `select ID, Handle, Attributes, ACL, Accepted from conversations;` const selectConversationSQLStmt = `select ID, Handle, Attributes, ACL, Accepted from conversations where ID=(?);` const selectConversationByHandleSQLStmt = `select ID, Handle, Attributes, ACL, Accepted from conversations where Handle=(?);` -const acceptedConversationSQLStmt = `update conversations set Accepted=true where ID=(?);` +const acceptConversationSQLStmt = `update conversations set Accepted=true where ID=(?);` const setConversationAttributesSQLStmt = `update conversations set Attributes=(?) where ID=(?) ;` const setConversationACLSQLStmt = `update conversations set ACL=(?) where ID=(?) ;` const deleteConversationSQLStmt = `delete from conversations where ID=(?);` @@ -82,6 +83,9 @@ const insertMessageIntoConversationSQLStmt = `insert into channel_%d_%d_chat (Bo // updateMessageIntoConversationSQLStmt is a template for updating attributes of a message in a conversation const updateMessageIntoConversationSQLStmt = `update channel_%d_%d_chat set Attributes=(?) where ID=(?);` +// purgeMessagesFromConversationSQLStmt is a template for updating attributes of a message in a conversation +const purgeMessagesFromConversationSQLStmt = `delete from channel_%d_%d_chat;` + // getMessageFromConversationSQLStmt is a template for fetching a message by ID from a conversation const getMessageFromConversationSQLStmt = `select Body, Attributes from channel_%d_%d_chat where ID=(?);` @@ -141,9 +145,9 @@ func NewCwtchProfileStorage(db *sql.DB, profileDirectory string) (*CwtchProfileS return nil, err } - acceptConversationStmt, err := db.Prepare(acceptedConversationSQLStmt) + acceptConversationStmt, err := db.Prepare(acceptConversationSQLStmt) if err != nil { - log.Errorf("error preparing query: %v %v", acceptedConversationSQLStmt, err) + log.Errorf("error preparing query: %v %v", acceptConversationSQLStmt, err) return nil, err } @@ -264,7 +268,7 @@ func (cps *CwtchProfileStorage) NewConversation(handle string, attributes model. return int(conversationID), nil } -// GetConversationByHandle is a convienance method to fetch an active conversation by a handle +// GetConversationByHandle is a convenience method to fetch an active conversation by a handle // Usage Notes: This should **only** be used to look up p2p conversations by convention. // Ideally this function should not exist, and all lookups should happen by ID (this is currently // unavoidable in some circumstances because the event bus references conversations by handle, not by id) @@ -332,7 +336,7 @@ func (cps *CwtchProfileStorage) FetchConversations() ([]*model.Conversation, err } } -// GetConversation looks up a particular conversation by handle +// GetConversation looks up a particular conversation by id func (cps *CwtchProfileStorage) GetConversation(id int) (*model.Conversation, error) { rows, err := cps.selectConversationStmt.Query(id) if err != nil { @@ -635,6 +639,25 @@ func (cps *CwtchProfileStorage) GetMostRecentMessages(conversation int, channel // Close closes the underlying database and prepared statements func (cps *CwtchProfileStorage) Close() { if cps.db != nil { + + // Purge Messages that are not stored... + ci, err := cps.FetchConversations() + if err == nil { + for _, conversation := range ci { + if !conversation.IsGroup() && !conversation.IsServer() { + if conversation.Attributes[event.SaveHistoryKey] != event.SaveHistoryConfirmed { + log.Infof("purging conversation...") + // TODO: At some point in the future this needs to iterate over channels and make a decision for each on.. + conversationStmt, err := cps.db.Prepare(fmt.Sprintf(purgeMessagesFromConversationSQLStmt, conversation.ID, 0)) + if err != nil { + log.Errorf("error executing transaction: %v", err) + } + conversationStmt.Exec() + } + } + } + } + cps.insertProfileKeyValueStmt.Close() cps.selectProfileKeyValueStmt.Close() diff --git a/storage/v1/stream_store.go b/storage/v1/stream_store.go index 9ec8e99..75ec293 100644 --- a/storage/v1/stream_store.go +++ b/storage/v1/stream_store.go @@ -15,7 +15,6 @@ import ( // This number is larger that the recommend chunk size of libsodium secretbox by an order of magnitude. // Since this code is not performance-sensitive (and is unlikely to gain any significant performance benefit from // cache-efficient chunking) this size isn’t currently a concern. -// TODO: revise and evaluate better storage options after beta” const ( fileStorePartitions = 128 bytesPerFile = 128 * 1024 diff --git a/testing/encryptedstorage/encrypted_storage_integration_test.go b/testing/encryptedstorage/encrypted_storage_integration_test.go index a2f6682..e63cd54 100644 --- a/testing/encryptedstorage/encrypted_storage_integration_test.go +++ b/testing/encryptedstorage/encrypted_storage_integration_test.go @@ -120,6 +120,8 @@ func TestEncryptedStorage(t *testing.T) { t.Fatalf("expeced GetMostRecentMessages to return 1, instead returned: %v %v", len(messages), messages) } + app.Shutdown() + } // Sub Test testing that Alice can add Bob, delete the conversation associated with Bob, and then add Bob again