Purge message history for not-saved conversation on Close + other review comments
continuous-integration/drone/push Build is passing Details
continuous-integration/drone/pr Build is failing Details

This commit is contained in:
Sarah Jamie Lewis 2021-11-23 12:17:11 -08:00
parent 6101e4e031
commit 6ab11fc929
6 changed files with 61 additions and 43 deletions

View File

@ -23,7 +23,8 @@ import (
type Functionality struct {
}
// FunctionalityGate returns contact.Functionality always
// FunctionalityGate returns filesharing if enabled in the given experiment map
// Note: Experiment maps are currently in libcwtch-go
func FunctionalityGate(experimentMap map[string]bool) (*Functionality, error) {
if experimentMap["filesharing"] {
return new(Functionality), nil

View File

@ -0,0 +1,6 @@
package constants
const ServerPrefix = "server:"
const TofuBundlePrefix = "tofubundle:"
const GroupPrefix = "torv3"
const ImportBundlePrefix = "importBundle"

View File

@ -162,7 +162,6 @@ func (cp *cwtchPeer) SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, k
// SendMessage is a higher level that merges sending messages to contacts and group handles
// If you try to send a message to a handle that doesn't exist, malformed or an incorrect type then
// this function will error
// Status: TODO
func (cp *cwtchPeer) SendMessage(conversation int, message string) error {
cp.mutex.Lock()
defer cp.mutex.Unlock()
@ -579,7 +578,8 @@ func (cp *cwtchPeer) AddServer(serverSpecification string) (string, error) {
}
log.Debugf("Got new key bundle %v", keyBundle.Serialize())
// TODO if the key bundle is incomplete then error out. In the future we may allow servers to attest to new
// if the key bundle is incomplete then error out.
// TODO In the future we may allow servers to attest to new
// keys or subsets of keys, but for now they must commit only to a complete set of keys required for Cwtch Groups
// (that way we can be assured that the keybundle we store is a valid one)
if !keyBundle.HasKeyType(model.KeyTypeTokenOnion) || !keyBundle.HasKeyType(model.KeyTypeServerOnion) || !keyBundle.HasKeyType(model.KeyTypePrivacyPass) {
@ -666,8 +666,7 @@ func (cp *cwtchPeer) PeerWithOnion(onion string) {
cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion}))
}
// InviteOnionToGroup kicks off the invite process
// Status: TODO
// SendInviteToConversation kicks off the invite process
func (cp *cwtchPeer) SendInviteToConversation(conversationID int, inviteConversationID int) error {
var invite model.MessageWrapper
@ -739,50 +738,44 @@ func (cp *cwtchPeer) SendInviteToConversation(conversationID int, inviteConversa
return cp.SendMessage(conversationID, string(inviteBytes))
}
const serverPrefix = "server:"
const tofuBundlePrefix = "tofubundle:"
const groupPrefix = "torv3"
const importBundlePrefix = "importBundle"
func (cp *cwtchPeer) ImportBundle(importString string) error {
if tor.IsValidHostname(importString) {
_, err := cp.NewContactConversation(importString, model.DefaultP2PAccessControl(), true)
if err == nil {
return ConstructResponse(importBundlePrefix, "success")
return ConstructResponse(constants.ImportBundlePrefix, "success")
}
return ConstructResponse(importBundlePrefix, err.Error())
} else if strings.HasPrefix(importString, tofuBundlePrefix) {
return ConstructResponse(constants.ImportBundlePrefix, err.Error())
} else if strings.HasPrefix(importString, constants.TofuBundlePrefix) {
bundle := strings.Split(importString, "||")
if len(bundle) == 2 {
err := cp.ImportBundle(bundle[0][len(tofuBundlePrefix):])
err := cp.ImportBundle(bundle[0][len(constants.TofuBundlePrefix):])
// if the server import failed then abort the whole process..
if err != nil && !strings.HasSuffix(err.Error(), "success") {
return ConstructResponse(importBundlePrefix, err.Error())
return ConstructResponse(constants.ImportBundlePrefix, err.Error())
}
return cp.ImportBundle(bundle[1])
}
} else if strings.HasPrefix(importString, serverPrefix) {
} else if strings.HasPrefix(importString, constants.ServerPrefix) {
// Server Key Bundles are prefixed with
bundle, err := base64.StdEncoding.DecodeString(importString[len(serverPrefix):])
bundle, err := base64.StdEncoding.DecodeString(importString[len(constants.ServerPrefix):])
if err == nil {
if _, err = cp.AddServer(string(bundle)); err != nil {
return ConstructResponse(importBundlePrefix, err.Error())
return ConstructResponse(constants.ImportBundlePrefix, err.Error())
}
return ConstructResponse(importBundlePrefix, "success")
return ConstructResponse(constants.ImportBundlePrefix, "success")
}
return ConstructResponse(importBundlePrefix, err.Error())
} else if strings.HasPrefix(importString, groupPrefix) {
return ConstructResponse(constants.ImportBundlePrefix, err.Error())
} else if strings.HasPrefix(importString, constants.GroupPrefix) {
//eg: torv3JFDWkXExBsZLkjvfkkuAxHsiLGZBk0bvoeJID9ItYnU=EsEBCiBhOWJhZDU1OTQ0NWI3YmM2N2YxYTM5YjkzMTNmNTczNRIgpHeNaG+6jy750eDhwLO39UX4f2xs0irK/M3P6mDSYQIaOTJjM2ttb29ibnlnaGoyenc2cHd2N2Q1N3l6bGQ3NTNhdW8zdWdhdWV6enB2ZmFrM2FoYzRiZHlkCiJAdVSSVgsksceIfHe41OJu9ZFHO8Kwv3G6F5OK3Hw4qZ6hn6SiZjtmJlJezoBH0voZlCahOU7jCOg+dsENndZxAA==
if _, err := cp.ImportGroup(importString); err != nil {
return ConstructResponse(importBundlePrefix, err.Error())
return ConstructResponse(constants.ImportBundlePrefix, err.Error())
}
return ConstructResponse(importBundlePrefix, "success")
return ConstructResponse(constants.ImportBundlePrefix, "success")
}
return ConstructResponse(importBundlePrefix, "invalid_group_invite_prefix")
return ConstructResponse(constants.ImportBundlePrefix, "invalid_group_invite_prefix")
}
// JoinServer manages a new server connection with the given onion address
// Status: TODO
func (cp *cwtchPeer) JoinServer(onion string) error {
ci, err := cp.FetchConversationInfo(onion)
if ci == nil || err != nil {
@ -804,18 +797,13 @@ func (cp *cwtchPeer) JoinServer(onion string) error {
}
// ResyncServer completely tears down and resyncs a new server connection with the given onion address
// Status: TODO
func (cp *cwtchPeer) ResyncServer(onion string) error {
//if cp.GetContact(onion) != nil {
// tokenY, yExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypePrivacyPass))
// tokenOnion, onionExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypeTokenOnion))
// if yExists && onionExists {
// signature := base64.StdEncoding.EncodeToString([]byte{})
// cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature}))
// return nil
// }
//}
return errors.New("no keys found for server connection")
ci, err := cp.FetchConversationInfo(onion)
if ci == nil || err != nil {
return errors.New("no keys found for server connection")
}
cp.SetConversationAttribute(ci.ID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(lastReceivedSignature)), base64.StdEncoding.EncodeToString([]byte{}))
return cp.JoinServer(onion)
}
// SendGetValToPeer
@ -878,7 +866,6 @@ func (cp *cwtchPeer) Shutdown() {
}
}
// Status: TODO
func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time) error {
// TODO maybe atomize this?
ci, err := cp.FetchConversationInfo(handle)

View File

@ -1,6 +1,7 @@
package peer
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"database/sql"
@ -68,7 +69,7 @@ const insertConversationSQLStmt = `insert into conversations(Handle, Attributes,
const fetchAllConversationsSQLStmt = `select ID, Handle, Attributes, ACL, Accepted from conversations;`
const selectConversationSQLStmt = `select ID, Handle, Attributes, ACL, Accepted from conversations where ID=(?);`
const selectConversationByHandleSQLStmt = `select ID, Handle, Attributes, ACL, Accepted from conversations where Handle=(?);`
const acceptedConversationSQLStmt = `update conversations set Accepted=true where ID=(?);`
const acceptConversationSQLStmt = `update conversations set Accepted=true where ID=(?);`
const setConversationAttributesSQLStmt = `update conversations set Attributes=(?) where ID=(?) ;`
const setConversationACLSQLStmt = `update conversations set ACL=(?) where ID=(?) ;`
const deleteConversationSQLStmt = `delete from conversations where ID=(?);`
@ -82,6 +83,9 @@ const insertMessageIntoConversationSQLStmt = `insert into channel_%d_%d_chat (Bo
// updateMessageIntoConversationSQLStmt is a template for updating attributes of a message in a conversation
const updateMessageIntoConversationSQLStmt = `update channel_%d_%d_chat set Attributes=(?) where ID=(?);`
// purgeMessagesFromConversationSQLStmt is a template for updating attributes of a message in a conversation
const purgeMessagesFromConversationSQLStmt = `delete from channel_%d_%d_chat;`
// getMessageFromConversationSQLStmt is a template for fetching a message by ID from a conversation
const getMessageFromConversationSQLStmt = `select Body, Attributes from channel_%d_%d_chat where ID=(?);`
@ -141,9 +145,9 @@ func NewCwtchProfileStorage(db *sql.DB, profileDirectory string) (*CwtchProfileS
return nil, err
}
acceptConversationStmt, err := db.Prepare(acceptedConversationSQLStmt)
acceptConversationStmt, err := db.Prepare(acceptConversationSQLStmt)
if err != nil {
log.Errorf("error preparing query: %v %v", acceptedConversationSQLStmt, err)
log.Errorf("error preparing query: %v %v", acceptConversationSQLStmt, err)
return nil, err
}
@ -264,7 +268,7 @@ func (cps *CwtchProfileStorage) NewConversation(handle string, attributes model.
return int(conversationID), nil
}
// GetConversationByHandle is a convienance method to fetch an active conversation by a handle
// GetConversationByHandle is a convenience method to fetch an active conversation by a handle
// Usage Notes: This should **only** be used to look up p2p conversations by convention.
// Ideally this function should not exist, and all lookups should happen by ID (this is currently
// unavoidable in some circumstances because the event bus references conversations by handle, not by id)
@ -332,7 +336,7 @@ func (cps *CwtchProfileStorage) FetchConversations() ([]*model.Conversation, err
}
}
// GetConversation looks up a particular conversation by handle
// GetConversation looks up a particular conversation by id
func (cps *CwtchProfileStorage) GetConversation(id int) (*model.Conversation, error) {
rows, err := cps.selectConversationStmt.Query(id)
if err != nil {
@ -635,6 +639,25 @@ func (cps *CwtchProfileStorage) GetMostRecentMessages(conversation int, channel
// Close closes the underlying database and prepared statements
func (cps *CwtchProfileStorage) Close() {
if cps.db != nil {
// Purge Messages that are not stored...
ci, err := cps.FetchConversations()
if err == nil {
for _, conversation := range ci {
if !conversation.IsGroup() && !conversation.IsServer() {
if conversation.Attributes[event.SaveHistoryKey] != event.SaveHistoryConfirmed {
log.Infof("purging conversation...")
// TODO: At some point in the future this needs to iterate over channels and make a decision for each on..
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(purgeMessagesFromConversationSQLStmt, conversation.ID, 0))
if err != nil {
log.Errorf("error executing transaction: %v", err)
}
conversationStmt.Exec()
}
}
}
}
cps.insertProfileKeyValueStmt.Close()
cps.selectProfileKeyValueStmt.Close()

View File

@ -15,7 +15,6 @@ import (
// This number is larger that the recommend chunk size of libsodium secretbox by an order of magnitude.
// Since this code is not performance-sensitive (and is unlikely to gain any significant performance benefit from
// cache-efficient chunking) this size isnt currently a concern.
// TODO: revise and evaluate better storage options after beta”
const (
fileStorePartitions = 128
bytesPerFile = 128 * 1024

View File

@ -120,6 +120,8 @@ func TestEncryptedStorage(t *testing.T) {
t.Fatalf("expeced GetMostRecentMessages to return 1, instead returned: %v %v", len(messages), messages)
}
app.Shutdown()
}
// Sub Test testing that Alice can add Bob, delete the conversation associated with Bob, and then add Bob again