2021-11-09 23:47:33 +00:00
package peer
import (
2022-03-08 21:45:26 +00:00
"archive/tar"
"compress/gzip"
2021-11-23 20:17:11 +00:00
"cwtch.im/cwtch/event"
2021-11-09 23:47:33 +00:00
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"database/sql"
"errors"
"fmt"
"git.openprivacy.ca/openprivacy/log"
2022-03-08 21:45:26 +00:00
"io"
2021-11-19 19:49:04 +00:00
"os"
2022-03-08 21:45:26 +00:00
"path/filepath"
"strings"
2022-08-26 20:54:48 +00:00
"sync"
2021-11-09 23:47:33 +00:00
)
// StorageKeyType is an interface wrapper around storage key types
type StorageKeyType string
const (
// TypeAttribute for Profile Scoped and Zoned Attributes
TypeAttribute = StorageKeyType ( "Attribute" )
// TypePrivateKey for Profile Private Keys
TypePrivateKey = StorageKeyType ( "PrivateKey" )
// TypePublicKey for Profile Public Keys
TypePublicKey = StorageKeyType ( "PublicKey" )
)
// CwtchProfileStorage encapsulates common datastore requests so as to not pollute the main cwtch profile
// struct with database knowledge
type CwtchProfileStorage struct {
// Note: Statements are thread safe..
2022-08-26 20:54:48 +00:00
mutex sync . Mutex
2021-11-09 23:47:33 +00:00
// Profile related statements
insertProfileKeyValueStmt * sql . Stmt
selectProfileKeyValueStmt * sql . Stmt
2022-07-05 22:31:44 +00:00
findProfileKeySQLStmt * sql . Stmt
2021-11-09 23:47:33 +00:00
// Conversation related statements
2021-11-10 22:28:52 +00:00
insertConversationStmt * sql . Stmt
fetchAllConversationsStmt * sql . Stmt
selectConversationStmt * sql . Stmt
selectConversationByHandleStmt * sql . Stmt
acceptConversationStmt * sql . Stmt
deleteConversationStmt * sql . Stmt
setConversationAttributesStmt * sql . Stmt
2021-11-17 22:34:13 +00:00
setConversationACLStmt * sql . Stmt
2021-11-09 23:47:33 +00:00
2021-11-18 23:43:58 +00:00
channelInsertStmts map [ ChannelID ] * sql . Stmt
channelUpdateMessageStmts map [ ChannelID ] * sql . Stmt
channelGetMessageStmts map [ ChannelID ] * sql . Stmt
channelGetMessageBySignatureStmts map [ ChannelID ] * sql . Stmt
channelGetCountStmts map [ ChannelID ] * sql . Stmt
channelGetMostRecentMessagesStmts map [ ChannelID ] * sql . Stmt
channelGetMessageByContentHashStmts map [ ChannelID ] * sql . Stmt
2021-11-23 22:26:11 +00:00
channelRowNumberStmts map [ ChannelID ] * sql . Stmt
2021-11-19 19:49:04 +00:00
ProfileDirectory string
db * sql . DB
2021-11-09 23:47:33 +00:00
}
2021-11-11 00:41:43 +00:00
// ChannelID encapsulates the data necessary to reference a channel structure.
2021-11-09 23:47:33 +00:00
type ChannelID struct {
Conversation int
Channel int
}
2021-11-25 22:34:47 +00:00
const insertProfileKeySQLStmt = ` insert or replace into profile_kv(KeyType, KeyName, KeyValue) values(?,?,?); `
2021-11-09 23:47:33 +00:00
const selectProfileKeySQLStmt = ` select KeyValue from profile_kv where KeyType=(?) and KeyName=(?); `
2022-07-05 22:31:44 +00:00
const findProfileKeySQLStmt = ` select KeyName from profile_kv where KeyType=(?) and KeyName LIKE (?); `
2021-11-09 23:47:33 +00:00
const insertConversationSQLStmt = ` insert into conversations(Handle, Attributes, ACL, Accepted) values(?,?,?,?); `
2021-11-10 22:28:52 +00:00
const fetchAllConversationsSQLStmt = ` select ID, Handle, Attributes, ACL, Accepted from conversations; `
const selectConversationSQLStmt = ` select ID, Handle, Attributes, ACL, Accepted from conversations where ID=(?); `
const selectConversationByHandleSQLStmt = ` select ID, Handle, Attributes, ACL, Accepted from conversations where Handle=(?); `
2021-11-23 20:17:11 +00:00
const acceptConversationSQLStmt = ` update conversations set Accepted=true where ID=(?); `
2021-11-10 22:28:52 +00:00
const setConversationAttributesSQLStmt = ` update conversations set Attributes=(?) where ID=(?) ; `
2021-11-17 22:34:13 +00:00
const setConversationACLSQLStmt = ` update conversations set ACL=(?) where ID=(?) ; `
2021-11-10 22:28:52 +00:00
const deleteConversationSQLStmt = ` delete from conversations where ID=(?); `
2021-11-09 23:47:33 +00:00
// createTableConversationMessagesSQLStmt is a template for creating conversation based tables...
2021-11-16 23:06:30 +00:00
const createTableConversationMessagesSQLStmt = ` create table if not exists channel_%d_0_chat (ID integer unique primary key autoincrement, Body text, Attributes []byte, Expiry datetime, Signature text unique, ContentHash blob text); `
2021-11-09 23:47:33 +00:00
// insertMessageIntoConversationSQLStmt is a template for creating conversation based tables...
2021-11-16 23:06:30 +00:00
const insertMessageIntoConversationSQLStmt = ` insert into channel_%d_%d_chat (Body, Attributes, Signature, ContentHash) values(?,?,?,?); `
2021-11-09 23:47:33 +00:00
2021-11-16 23:06:30 +00:00
// updateMessageIntoConversationSQLStmt is a template for updating attributes of a message in a conversation
const updateMessageIntoConversationSQLStmt = ` update channel_%d_%d_chat set Attributes=(?) where ID=(?); `
2021-11-23 20:17:11 +00:00
// purgeMessagesFromConversationSQLStmt is a template for updating attributes of a message in a conversation
const purgeMessagesFromConversationSQLStmt = ` delete from channel_%d_%d_chat; `
2021-11-16 23:06:30 +00:00
// getMessageFromConversationSQLStmt is a template for fetching a message by ID from a conversation
2021-11-09 23:47:33 +00:00
const getMessageFromConversationSQLStmt = ` select Body, Attributes from channel_%d_%d_chat where ID=(?); `
2021-11-17 23:59:52 +00:00
// getMessageBySignatureFromConversationSQLStmt is a template for selecting conversation messages by signature
2021-11-16 23:06:30 +00:00
const getMessageBySignatureFromConversationSQLStmt = ` select ID from channel_%d_%d_chat where Signature=(?); `
2021-11-17 23:59:52 +00:00
// getMessageByContentHashFromConversationSQLStmt is a template for selecting conversation messages by content hash
2021-11-18 23:43:58 +00:00
const getMessageByContentHashFromConversationSQLStmt = ` select ID from channel_%d_%d_chat where ContentHash=(?) order by ID desc limit 1; `
2021-11-16 23:06:30 +00:00
2021-11-23 23:00:16 +00:00
// getLocalIndexOfMessageIDSQLStmt is a template for fetching the offset of a message from the bottom of the database.
const getLocalIndexOfMessageIDSQLStmt = ` select count (*) from channel_%d_%d_chat where ID >= (?) order by ID desc; `
2021-11-23 22:26:11 +00:00
2021-11-17 23:59:52 +00:00
// getMessageCountFromConversationSQLStmt is a template for fetching the count of a messages in a conversation channel
const getMessageCountFromConversationSQLStmt = ` select count(*) from channel_%d_%d_chat; `
2021-11-17 22:34:13 +00:00
2021-11-17 23:59:52 +00:00
// getMostRecentMessagesSQLStmt is a template for fetching the most recent N messages in a conversation channel
const getMostRecentMessagesSQLStmt = ` select ID, Body, Attributes, Signature, ContentHash from channel_%d_%d_chat order by ID desc limit (?) offset (?); `
2021-11-17 22:34:13 +00:00
2021-11-09 23:47:33 +00:00
// NewCwtchProfileStorage constructs a new CwtchProfileStorage from a database. It is also responsible for
// Preparing commonly used SQL Statements
2021-11-19 19:49:04 +00:00
func NewCwtchProfileStorage ( db * sql . DB , profileDirectory string ) ( * CwtchProfileStorage , error ) {
2021-11-09 23:47:33 +00:00
if db == nil {
return nil , errors . New ( "cannot construct cwtch profile storage with a nil database" )
}
insertProfileKeyValueStmt , err := db . Prepare ( insertProfileKeySQLStmt )
if err != nil {
2022-10-25 21:05:08 +00:00
db . Close ( )
2023-01-25 20:32:26 +00:00
// note: this is debug because we expect failure here when opening an encrypted database with an
// incorrect password. The rest are errors because failure is not expected.
log . Debugf ( "error preparing query: %v %v" , insertProfileKeySQLStmt , err )
2021-11-09 23:47:33 +00:00
return nil , err
}
selectProfileKeyStmt , err := db . Prepare ( selectProfileKeySQLStmt )
if err != nil {
2022-10-25 21:05:08 +00:00
db . Close ( )
2021-11-09 23:47:33 +00:00
log . Errorf ( "error preparing query: %v %v" , selectProfileKeySQLStmt , err )
return nil , err
}
2022-07-05 22:31:44 +00:00
findProfileKeyStmt , err := db . Prepare ( findProfileKeySQLStmt )
if err != nil {
2022-10-25 21:05:08 +00:00
db . Close ( )
2022-07-05 22:31:44 +00:00
log . Errorf ( "error preparing query: %v %v" , findProfileKeySQLStmt , err )
return nil , err
}
2021-11-09 23:47:33 +00:00
insertConversationStmt , err := db . Prepare ( insertConversationSQLStmt )
if err != nil {
2022-10-25 21:05:08 +00:00
db . Close ( )
2021-11-09 23:47:33 +00:00
log . Errorf ( "error preparing query: %v %v" , insertConversationSQLStmt , err )
return nil , err
}
2021-11-10 22:28:52 +00:00
fetchAllConversationsStmt , err := db . Prepare ( fetchAllConversationsSQLStmt )
if err != nil {
2022-10-25 21:05:08 +00:00
db . Close ( )
2021-11-10 22:28:52 +00:00
log . Errorf ( "error preparing query: %v %v" , fetchAllConversationsSQLStmt , err )
return nil , err
}
2021-11-09 23:47:33 +00:00
selectConversationStmt , err := db . Prepare ( selectConversationSQLStmt )
if err != nil {
2022-10-25 21:05:08 +00:00
db . Close ( )
2021-11-09 23:47:33 +00:00
log . Errorf ( "error preparing query: %v %v" , selectConversationSQLStmt , err )
return nil , err
}
2021-11-10 22:28:52 +00:00
selectConversationByHandleStmt , err := db . Prepare ( selectConversationByHandleSQLStmt )
if err != nil {
2022-10-25 21:05:08 +00:00
db . Close ( )
2021-11-10 22:28:52 +00:00
log . Errorf ( "error preparing query: %v %v" , selectConversationByHandleSQLStmt , err )
return nil , err
}
2021-11-23 20:17:11 +00:00
acceptConversationStmt , err := db . Prepare ( acceptConversationSQLStmt )
2021-11-09 23:47:33 +00:00
if err != nil {
2022-10-25 21:05:08 +00:00
db . Close ( )
2021-11-23 20:17:11 +00:00
log . Errorf ( "error preparing query: %v %v" , acceptConversationSQLStmt , err )
2021-11-09 23:47:33 +00:00
return nil , err
}
deleteConversationStmt , err := db . Prepare ( deleteConversationSQLStmt )
if err != nil {
2022-10-25 21:05:08 +00:00
db . Close ( )
2021-11-09 23:47:33 +00:00
log . Errorf ( "error preparing query: %v %v" , deleteConversationSQLStmt , err )
return nil , err
}
setConversationAttributesStmt , err := db . Prepare ( setConversationAttributesSQLStmt )
if err != nil {
2022-10-25 21:05:08 +00:00
db . Close ( )
2021-11-09 23:47:33 +00:00
log . Errorf ( "error preparing query: %v %v" , setConversationAttributesSQLStmt , err )
return nil , err
}
2021-11-17 22:34:13 +00:00
setConversationACLStmt , err := db . Prepare ( setConversationACLSQLStmt )
if err != nil {
2022-10-25 21:05:08 +00:00
db . Close ( )
2021-11-17 22:34:13 +00:00
log . Errorf ( "error preparing query: %v %v" , setConversationACLSQLStmt , err )
return nil , err
}
2021-11-10 22:28:52 +00:00
return & CwtchProfileStorage { db : db ,
2021-11-19 19:49:04 +00:00
ProfileDirectory : profileDirectory ,
2021-11-18 23:43:58 +00:00
insertProfileKeyValueStmt : insertProfileKeyValueStmt ,
selectProfileKeyValueStmt : selectProfileKeyStmt ,
2022-07-05 22:31:44 +00:00
findProfileKeySQLStmt : findProfileKeyStmt ,
2021-11-18 23:43:58 +00:00
fetchAllConversationsStmt : fetchAllConversationsStmt ,
insertConversationStmt : insertConversationStmt ,
selectConversationStmt : selectConversationStmt ,
selectConversationByHandleStmt : selectConversationByHandleStmt ,
acceptConversationStmt : acceptConversationStmt ,
deleteConversationStmt : deleteConversationStmt ,
setConversationAttributesStmt : setConversationAttributesStmt ,
setConversationACLStmt : setConversationACLStmt ,
channelInsertStmts : map [ ChannelID ] * sql . Stmt { } ,
channelUpdateMessageStmts : map [ ChannelID ] * sql . Stmt { } ,
channelGetMessageStmts : map [ ChannelID ] * sql . Stmt { } ,
channelGetMessageBySignatureStmts : map [ ChannelID ] * sql . Stmt { } ,
channelGetMessageByContentHashStmts : map [ ChannelID ] * sql . Stmt { } ,
channelGetMostRecentMessagesStmts : map [ ChannelID ] * sql . Stmt { } ,
2021-11-23 22:26:11 +00:00
channelGetCountStmts : map [ ChannelID ] * sql . Stmt { } ,
channelRowNumberStmts : map [ ChannelID ] * sql . Stmt { } ,
} ,
2021-11-16 23:06:30 +00:00
nil
2021-11-09 23:47:33 +00:00
}
// StoreProfileKeyValue allows storing of typed Key/Value attribute in the Storage Engine
func ( cps * CwtchProfileStorage ) StoreProfileKeyValue ( keyType StorageKeyType , key string , value [ ] byte ) error {
_ , err := cps . insertProfileKeyValueStmt . Exec ( keyType , key , value )
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return err
}
return nil
}
2022-07-05 22:31:44 +00:00
// FindProfileKeysByPrefix allows fetching of typed values via a known Key from the Storage Engine
func ( cps * CwtchProfileStorage ) FindProfileKeysByPrefix ( keyType StorageKeyType , prefix string ) ( [ ] string , error ) {
rows , err := cps . findProfileKeySQLStmt . Query ( keyType , prefix + "%" )
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return nil , err
}
var keys [ ] string
defer rows . Close ( )
for {
result := rows . Next ( )
if ! result {
return keys , nil
}
var key [ ] byte
err = rows . Scan ( & key )
if err != nil {
log . Errorf ( "error fetching rows: %v" , err )
rows . Close ( )
return nil , err
}
keys = append ( keys , string ( key ) )
}
}
2021-11-09 23:47:33 +00:00
// LoadProfileKeyValue allows fetching of typed values via a known Key from the Storage Engine
func ( cps * CwtchProfileStorage ) LoadProfileKeyValue ( keyType StorageKeyType , key string ) ( [ ] byte , error ) {
rows , err := cps . selectProfileKeyValueStmt . Query ( keyType , key )
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return nil , err
}
result := rows . Next ( )
if ! result {
return nil , errors . New ( "no result found" )
}
var keyValue [ ] byte
err = rows . Scan ( & keyValue )
if err != nil {
log . Errorf ( "error fetching rows: %v" , err )
rows . Close ( )
return nil , err
}
rows . Close ( )
return keyValue , nil
}
// NewConversation stores a new conversation in the data store
2021-11-11 00:41:43 +00:00
func ( cps * CwtchProfileStorage ) NewConversation ( handle string , attributes model . Attributes , acl model . AccessControlList , accepted bool ) ( int , error ) {
2021-11-09 23:47:33 +00:00
tx , err := cps . db . Begin ( )
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
2021-11-11 00:41:43 +00:00
return - 1 , err
2021-11-09 23:47:33 +00:00
}
result , err := tx . Stmt ( cps . insertConversationStmt ) . Exec ( handle , attributes . Serialize ( ) , acl . Serialize ( ) , accepted )
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
2021-11-11 00:41:43 +00:00
return - 1 , tx . Rollback ( )
2021-11-09 23:47:33 +00:00
}
id , err := result . LastInsertId ( )
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
2021-11-11 00:41:43 +00:00
return - 1 , tx . Rollback ( )
}
result , err = tx . Exec ( fmt . Sprintf ( createTableConversationMessagesSQLStmt , id ) )
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
return - 1 , tx . Rollback ( )
}
conversationID , err := result . LastInsertId ( )
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
return - 1 , tx . Rollback ( )
2021-11-09 23:47:33 +00:00
}
2021-11-11 00:41:43 +00:00
err = tx . Commit ( )
2021-11-09 23:47:33 +00:00
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
2021-11-11 00:41:43 +00:00
return - 1 , tx . Rollback ( )
2021-11-09 23:47:33 +00:00
}
2021-11-11 00:41:43 +00:00
return int ( conversationID ) , nil
2021-11-09 23:47:33 +00:00
}
2021-11-23 20:17:11 +00:00
// GetConversationByHandle is a convenience method to fetch an active conversation by a handle
2021-11-11 00:41:43 +00:00
// Usage Notes: This should **only** be used to look up p2p conversations by convention.
// Ideally this function should not exist, and all lookups should happen by ID (this is currently
// unavoidable in some circumstances because the event bus references conversations by handle, not by id)
2021-11-10 22:28:52 +00:00
func ( cps * CwtchProfileStorage ) GetConversationByHandle ( handle string ) ( * model . Conversation , error ) {
rows , err := cps . selectConversationByHandleStmt . Query ( handle )
2021-11-09 23:47:33 +00:00
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return nil , err
}
result := rows . Next ( )
if ! result {
return nil , errors . New ( "no result found" )
}
var id int
var acl [ ] byte
var attributes [ ] byte
var accepted bool
2021-11-10 22:28:52 +00:00
err = rows . Scan ( & id , & handle , & attributes , & acl , & accepted )
if err != nil {
log . Errorf ( "error fetching rows: %v" , err )
rows . Close ( )
return nil , err
}
rows . Close ( )
return & model . Conversation { ID : id , Handle : handle , ACL : model . DeserializeAccessControlList ( acl ) , Attributes : model . DeserializeAttributes ( attributes ) , Accepted : accepted } , nil
}
2021-11-11 00:41:43 +00:00
// FetchConversations returns *all* active conversations. This method should only be called
// on app start up to build a summary of conversations for the UI. Any further updates should be integrated
// through the event bus.
2021-11-10 22:28:52 +00:00
func ( cps * CwtchProfileStorage ) FetchConversations ( ) ( [ ] * model . Conversation , error ) {
rows , err := cps . fetchAllConversationsStmt . Query ( )
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return nil , err
}
var conversations [ ] * model . Conversation
defer rows . Close ( )
for {
result := rows . Next ( )
if ! result {
return conversations , nil
}
var id int
var handle string
var acl [ ] byte
var attributes [ ] byte
var accepted bool
err = rows . Scan ( & id , & handle , & attributes , & acl , & accepted )
if err != nil {
log . Errorf ( "error fetching rows: %v" , err )
rows . Close ( )
return nil , err
}
conversations = append ( conversations , & model . Conversation { ID : id , Handle : handle , ACL : model . DeserializeAccessControlList ( acl ) , Attributes : model . DeserializeAttributes ( attributes ) , Accepted : accepted } )
}
}
2021-11-23 20:17:11 +00:00
// GetConversation looks up a particular conversation by id
2021-11-10 22:28:52 +00:00
func ( cps * CwtchProfileStorage ) GetConversation ( id int ) ( * model . Conversation , error ) {
rows , err := cps . selectConversationStmt . Query ( id )
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return nil , err
}
result := rows . Next ( )
if ! result {
return nil , errors . New ( "no result found" )
}
var handle string
var acl [ ] byte
var attributes [ ] byte
var accepted bool
err = rows . Scan ( & id , & handle , & attributes , & acl , & accepted )
2021-11-09 23:47:33 +00:00
if err != nil {
log . Errorf ( "error fetching rows: %v" , err )
rows . Close ( )
return nil , err
}
rows . Close ( )
2021-11-10 22:28:52 +00:00
return & model . Conversation { ID : id , Handle : handle , ACL : model . DeserializeAccessControlList ( acl ) , Attributes : model . DeserializeAttributes ( attributes ) , Accepted : accepted } , nil
2021-11-09 23:47:33 +00:00
}
// AcceptConversation sets the accepted status of a conversation to true in the backing datastore
2021-11-10 22:28:52 +00:00
func ( cps * CwtchProfileStorage ) AcceptConversation ( id int ) error {
_ , err := cps . acceptConversationStmt . Exec ( id )
2021-11-09 23:47:33 +00:00
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return err
}
return nil
}
// DeleteConversation purges the conversation and any associated message history from the conversation store.
2021-11-10 22:28:52 +00:00
func ( cps * CwtchProfileStorage ) DeleteConversation ( id int ) error {
_ , err := cps . deleteConversationStmt . Exec ( id )
2021-11-09 23:47:33 +00:00
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return err
}
return nil
}
2021-11-17 22:34:13 +00:00
// SetConversationACL sets a new ACL on a given conversation.
func ( cps * CwtchProfileStorage ) SetConversationACL ( id int , acl model . AccessControlList ) error {
2022-01-06 17:55:26 +00:00
_ , err := cps . setConversationACLStmt . Exec ( acl . Serialize ( ) , id )
2021-11-17 22:34:13 +00:00
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return err
}
return nil
}
2021-11-11 00:41:43 +00:00
// SetConversationAttribute sets a new attribute on a given conversation.
2021-11-10 22:28:52 +00:00
func ( cps * CwtchProfileStorage ) SetConversationAttribute ( id int , path attr . ScopedZonedPath , value string ) error {
ci , err := cps . GetConversation ( id )
2021-11-09 23:47:33 +00:00
if err != nil {
return err
}
ci . Attributes [ path . ToString ( ) ] = value
2021-11-10 22:28:52 +00:00
_ , err = cps . setConversationAttributesStmt . Exec ( ci . Attributes . Serialize ( ) , id )
2021-11-09 23:47:33 +00:00
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return err
}
return nil
}
2021-11-11 00:41:43 +00:00
// InsertMessage appends a message to a conversation channel, with a given set of attributes
2021-12-06 20:20:38 +00:00
func ( cps * CwtchProfileStorage ) InsertMessage ( conversation int , channel int , body string , attributes model . Attributes , signature string , contentHash string ) ( int , error ) {
2021-11-09 23:47:33 +00:00
channelID := ChannelID { Conversation : conversation , Channel : channel }
2022-08-26 20:54:48 +00:00
cps . mutex . Lock ( )
defer cps . mutex . Unlock ( )
2021-11-09 23:47:33 +00:00
_ , exists := cps . channelInsertStmts [ channelID ]
if ! exists {
conversationStmt , err := cps . db . Prepare ( fmt . Sprintf ( insertMessageIntoConversationSQLStmt , conversation , channel ) )
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
2021-12-06 20:20:38 +00:00
return - 1 , err
2021-11-09 23:47:33 +00:00
}
cps . channelInsertStmts [ channelID ] = conversationStmt
}
2021-12-06 20:20:38 +00:00
result , err := cps . channelInsertStmts [ channelID ] . Exec ( body , attributes . Serialize ( ) , signature , contentHash )
2021-11-16 23:06:30 +00:00
if err != nil {
log . Errorf ( "error inserting message: %v %v" , signature , err )
2021-12-06 20:20:38 +00:00
return - 1 , err
2021-11-16 23:06:30 +00:00
}
2021-12-06 20:20:38 +00:00
id , err := result . LastInsertId ( )
return int ( id ) , err
2021-11-16 23:06:30 +00:00
}
// UpdateMessageAttributes updates the attributes associated with a message of a given conversation
func ( cps * CwtchProfileStorage ) UpdateMessageAttributes ( conversation int , channel int , messageID int , attributes model . Attributes ) error {
channelID := ChannelID { Conversation : conversation , Channel : channel }
2022-08-26 20:54:48 +00:00
cps . mutex . Lock ( )
defer cps . mutex . Unlock ( )
2021-11-16 23:06:30 +00:00
_ , exists := cps . channelUpdateMessageStmts [ channelID ]
if ! exists {
conversationStmt , err := cps . db . Prepare ( fmt . Sprintf ( updateMessageIntoConversationSQLStmt , conversation , channel ) )
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
return err
}
cps . channelUpdateMessageStmts [ channelID ] = conversationStmt
}
_ , err := cps . channelUpdateMessageStmts [ channelID ] . Exec ( attributes . Serialize ( ) , messageID )
2021-11-09 23:47:33 +00:00
if err != nil {
2021-11-16 23:06:30 +00:00
log . Errorf ( "error updating message: %v" , err )
2021-11-09 23:47:33 +00:00
return err
}
return nil
}
2021-11-16 23:06:30 +00:00
// GetChannelMessageBySignature looks up a conversation message by signature instead of identifier. Both are unique but
// signatures are common between conversation participants (in groups) and so are a more useful message to index.
func ( cps * CwtchProfileStorage ) GetChannelMessageBySignature ( conversation int , channel int , signature string ) ( int , error ) {
channelID := ChannelID { Conversation : conversation , Channel : channel }
2022-08-26 20:54:48 +00:00
cps . mutex . Lock ( )
defer cps . mutex . Unlock ( )
2021-11-16 23:06:30 +00:00
_ , exists := cps . channelGetMessageBySignatureStmts [ channelID ]
if ! exists {
conversationStmt , err := cps . db . Prepare ( fmt . Sprintf ( getMessageBySignatureFromConversationSQLStmt , conversation , channel ) )
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
return - 1 , err
}
cps . channelGetMessageBySignatureStmts [ channelID ] = conversationStmt
}
rows , err := cps . channelGetMessageBySignatureStmts [ channelID ] . Query ( signature )
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return - 1 , err
}
result := rows . Next ( )
if ! result {
return - 1 , errors . New ( "no result found" )
}
var id int
err = rows . Scan ( & id )
if err != nil {
log . Errorf ( "error fetching rows: %v" , err )
rows . Close ( )
return - 1 , err
}
rows . Close ( )
return id , nil
}
2021-11-18 23:43:58 +00:00
// GetChannelMessageByContentHash looks up a conversation message by hash instead of identifier.
func ( cps * CwtchProfileStorage ) GetChannelMessageByContentHash ( conversation int , channel int , hash string ) ( int , error ) {
channelID := ChannelID { Conversation : conversation , Channel : channel }
2022-08-26 20:54:48 +00:00
cps . mutex . Lock ( )
defer cps . mutex . Unlock ( )
2021-11-18 23:43:58 +00:00
_ , exists := cps . channelGetMessageByContentHashStmts [ channelID ]
if ! exists {
conversationStmt , err := cps . db . Prepare ( fmt . Sprintf ( getMessageByContentHashFromConversationSQLStmt , conversation , channel ) )
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
return - 1 , err
}
cps . channelGetMessageByContentHashStmts [ channelID ] = conversationStmt
}
rows , err := cps . channelGetMessageByContentHashStmts [ channelID ] . Query ( hash )
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return - 1 , err
}
result := rows . Next ( )
if ! result {
return - 1 , errors . New ( "no result found" )
}
var id int
err = rows . Scan ( & id )
if err != nil {
log . Errorf ( "error fetching rows: %v" , err )
rows . Close ( )
return - 1 , err
}
rows . Close ( )
2021-11-23 22:26:11 +00:00
2021-11-26 00:16:05 +00:00
return id , nil
2021-11-18 23:43:58 +00:00
}
2021-11-23 22:26:11 +00:00
// GetRowNumberByMessageID looks up the row number of a message by the message ID
func ( cps * CwtchProfileStorage ) GetRowNumberByMessageID ( conversation int , channel int , id int ) ( int , error ) {
channelID := ChannelID { Conversation : conversation , Channel : channel }
2022-08-26 20:54:48 +00:00
cps . mutex . Lock ( )
defer cps . mutex . Unlock ( )
2021-11-23 22:26:11 +00:00
_ , exists := cps . channelRowNumberStmts [ channelID ]
if ! exists {
2021-11-23 23:00:16 +00:00
conversationStmt , err := cps . db . Prepare ( fmt . Sprintf ( getLocalIndexOfMessageIDSQLStmt , conversation , channel ) )
2021-11-23 22:26:11 +00:00
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
return - 1 , err
}
cps . channelRowNumberStmts [ channelID ] = conversationStmt
}
rows , err := cps . channelRowNumberStmts [ channelID ] . Query ( id )
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return - 1 , err
}
result := rows . Next ( )
if ! result {
return - 1 , errors . New ( "no result found" )
}
var rownum int
err = rows . Scan ( & rownum )
if err != nil {
log . Errorf ( "error fetching rows: %v" , err )
rows . Close ( )
return - 1 , err
}
rows . Close ( )
2021-11-26 00:16:05 +00:00
// Return the offset **not** the count
return rownum - 1 , nil
2021-11-23 22:26:11 +00:00
}
2021-11-11 00:41:43 +00:00
// GetChannelMessage looks up a channel message by conversation, channel and message id. On success it
// returns the message body and the attributes associated with the message. Otherwise an error is returned.
2021-11-09 23:47:33 +00:00
func ( cps * CwtchProfileStorage ) GetChannelMessage ( conversation int , channel int , messageID int ) ( string , model . Attributes , error ) {
channelID := ChannelID { Conversation : conversation , Channel : channel }
2022-08-26 20:54:48 +00:00
cps . mutex . Lock ( )
defer cps . mutex . Unlock ( )
2021-11-09 23:47:33 +00:00
_ , exists := cps . channelGetMessageStmts [ channelID ]
if ! exists {
conversationStmt , err := cps . db . Prepare ( fmt . Sprintf ( getMessageFromConversationSQLStmt , conversation , channel ) )
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
return "" , nil , err
}
cps . channelGetMessageStmts [ channelID ] = conversationStmt
}
rows , err := cps . channelGetMessageStmts [ channelID ] . Query ( messageID )
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return "" , nil , err
}
result := rows . Next ( )
if ! result {
return "" , nil , errors . New ( "no result found" )
}
// Deserialize the Row
var body string
var attributes [ ] byte
err = rows . Scan ( & body , & attributes )
if err != nil {
log . Errorf ( "error fetching rows: %v" , err )
rows . Close ( )
return "" , nil , err
}
rows . Close ( )
return body , model . DeserializeAttributes ( attributes ) , nil
}
2021-11-17 22:34:13 +00:00
// GetChannelMessageCount returns the number of messages in a channel
func ( cps * CwtchProfileStorage ) GetChannelMessageCount ( conversation int , channel int ) ( int , error ) {
channelID := ChannelID { Conversation : conversation , Channel : channel }
2022-08-26 20:54:48 +00:00
cps . mutex . Lock ( )
defer cps . mutex . Unlock ( )
2021-11-17 22:34:13 +00:00
_ , exists := cps . channelGetCountStmts [ channelID ]
if ! exists {
2021-11-17 23:59:52 +00:00
conversationStmt , err := cps . db . Prepare ( fmt . Sprintf ( getMessageCountFromConversationSQLStmt , conversation , channel ) )
2021-11-17 22:34:13 +00:00
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
return - 1 , err
}
cps . channelGetCountStmts [ channelID ] = conversationStmt
}
var count int
err := cps . channelGetCountStmts [ channelID ] . QueryRow ( ) . Scan ( & count )
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return - 1 , err
}
return count , nil
}
2021-11-17 23:59:52 +00:00
// GetMostRecentMessages returns the most recent messages in a channel up to a given limit at a given offset
2021-11-17 22:34:13 +00:00
func ( cps * CwtchProfileStorage ) GetMostRecentMessages ( conversation int , channel int , offset int , limit int ) ( [ ] model . ConversationMessage , error ) {
channelID := ChannelID { Conversation : conversation , Channel : channel }
2022-08-26 20:54:48 +00:00
cps . mutex . Lock ( )
defer cps . mutex . Unlock ( )
2021-11-17 22:34:13 +00:00
_ , exists := cps . channelGetMostRecentMessagesStmts [ channelID ]
if ! exists {
2021-11-17 23:59:52 +00:00
conversationStmt , err := cps . db . Prepare ( fmt . Sprintf ( getMostRecentMessagesSQLStmt , conversation , channel ) )
2021-11-17 22:34:13 +00:00
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
return nil , err
}
cps . channelGetMostRecentMessagesStmts [ channelID ] = conversationStmt
}
rows , err := cps . channelGetMostRecentMessagesStmts [ channelID ] . Query ( limit , offset )
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return nil , err
}
var conversationMessages [ ] model . ConversationMessage
defer rows . Close ( )
for {
result := rows . Next ( )
if ! result {
return conversationMessages , nil
}
var id int
var body string
var attributes [ ] byte
var sig string
var contenthash string
err = rows . Scan ( & id , & body , & attributes , & sig , & contenthash )
if err != nil {
return conversationMessages , err
}
conversationMessages = append ( conversationMessages , model . ConversationMessage { ID : id , Body : body , Attr : model . DeserializeAttributes ( attributes ) , Signature : sig , ContentHash : contenthash } )
}
}
2021-11-23 22:54:06 +00:00
// PurgeConversationChannel deletes all message for a conversation channel.
2021-11-23 22:26:11 +00:00
func ( cps * CwtchProfileStorage ) PurgeConversationChannel ( conversation int , channel int ) error {
conversationStmt , err := cps . db . Prepare ( fmt . Sprintf ( purgeMessagesFromConversationSQLStmt , conversation , channel ) )
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
return err
}
conversationStmt . Exec ( )
return conversationStmt . Close ( )
}
2021-11-25 23:39:08 +00:00
// PurgeNonSavedMessages deletes all message conversations that are not explicitly set to saved.
func ( cps * CwtchProfileStorage ) PurgeNonSavedMessages ( ) {
// Purge Messages that are not stored...
ci , err := cps . FetchConversations ( )
if err == nil {
for _ , conversation := range ci {
if ! conversation . IsGroup ( ) && ! conversation . IsServer ( ) {
if conversation . Attributes [ attr . LocalScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( event . SaveHistoryKey ) ) . ToString ( ) ] != event . SaveHistoryConfirmed {
2023-01-25 20:32:26 +00:00
log . Debugf ( "purging conversation..." )
2021-11-25 23:39:08 +00:00
// TODO: At some point in the future this needs to iterate over channels and make a decision for each on..
cps . PurgeConversationChannel ( conversation . ID , 0 )
}
}
}
}
}
2021-11-09 23:47:33 +00:00
// Close closes the underlying database and prepared statements
2022-10-25 20:59:05 +00:00
func ( cps * CwtchProfileStorage ) Close ( purgeAllNonSavedMessages bool ) {
2022-08-26 20:54:48 +00:00
cps . mutex . Lock ( )
defer cps . mutex . Unlock ( )
2021-11-09 23:47:33 +00:00
if cps . db != nil {
2022-10-25 20:59:05 +00:00
if purgeAllNonSavedMessages {
cps . PurgeNonSavedMessages ( )
}
2021-11-23 20:17:11 +00:00
2021-11-09 23:47:33 +00:00
cps . insertProfileKeyValueStmt . Close ( )
cps . selectProfileKeyValueStmt . Close ( )
2021-11-19 19:49:04 +00:00
cps . insertConversationStmt . Close ( )
cps . fetchAllConversationsStmt . Close ( )
cps . selectConversationStmt . Close ( )
cps . selectConversationByHandleStmt . Close ( )
cps . acceptConversationStmt . Close ( )
cps . deleteConversationStmt . Close ( )
cps . setConversationAttributesStmt . Close ( )
cps . setConversationACLStmt . Close ( )
for _ , v := range cps . channelInsertStmts {
v . Close ( )
}
for _ , v := range cps . channelUpdateMessageStmts {
v . Close ( )
}
for _ , v := range cps . channelGetMessageStmts {
v . Close ( )
}
for _ , v := range cps . channelGetMessageBySignatureStmts {
v . Close ( )
}
for _ , v := range cps . channelGetCountStmts {
v . Close ( )
}
for _ , v := range cps . channelGetMostRecentMessagesStmts {
v . Close ( )
}
for _ , v := range cps . channelGetMessageByContentHashStmts {
v . Close ( )
}
2021-11-09 23:47:33 +00:00
cps . db . Close ( )
}
}
2021-11-19 19:49:04 +00:00
// Delete unconditionally destroys the profile directory associated with the store.
// This is unrecoverable.
func ( cps * CwtchProfileStorage ) Delete ( ) {
err := os . RemoveAll ( cps . ProfileDirectory )
if err != nil {
log . Errorf ( "error deleting profile directory" , err )
}
}
2021-12-17 21:58:54 +00:00
2021-12-19 00:55:14 +00:00
// Rekey re-encrypts the datastore with the new key.
// **note* this is technically a very dangerous API and should only be called after
// checks on the current password and the derived new password.
2021-12-17 21:58:54 +00:00
func ( cps * CwtchProfileStorage ) Rekey ( newkey [ 32 ] byte ) error {
2022-08-26 20:54:48 +00:00
cps . mutex . Lock ( )
defer cps . mutex . Unlock ( )
2021-12-17 21:58:54 +00:00
// PRAGMA queries don't allow subs...
_ , err := cps . db . Exec ( fmt . Sprintf ( ` PRAGMA rekey="x'%x'"; ` , newkey ) )
return err
}
2022-03-08 21:45:26 +00:00
// Export takes in a file name and creates an exported cwtch profile file (which in reality is a compressed tarball).
func ( cps * CwtchProfileStorage ) Export ( filename string ) error {
profileDB := filepath . Join ( cps . ProfileDirectory , dbFile )
profileSalt := filepath . Join ( cps . ProfileDirectory , saltFile )
profileVersion := filepath . Join ( cps . ProfileDirectory , versionFile )
file , err := os . Create ( filename )
if err != nil {
return fmt . Errorf ( "could not create tarball file '%s', got error '%s'" , filename , err . Error ( ) )
}
defer file . Close ( )
gzipWriter := gzip . NewWriter ( file )
defer gzipWriter . Close ( )
tarWriter := tar . NewWriter ( gzipWriter )
defer tarWriter . Close ( )
// We need to know the base directory so we can import it later (and prevent duplicates)...
2022-07-05 22:31:44 +00:00
profilePath := filepath . Base ( cps . ProfileDirectory )
2022-03-08 21:45:26 +00:00
err = addFileToTarWriter ( profilePath , profileDB , tarWriter )
if err != nil {
return fmt . Errorf ( "could not add file '%s', to tarball, got error '%s'" , profileDB , err . Error ( ) )
}
err = addFileToTarWriter ( profilePath , profileSalt , tarWriter )
if err != nil {
return fmt . Errorf ( "could not add file '%s', to tarball, got error '%s'" , profileDB , err . Error ( ) )
}
err = addFileToTarWriter ( profilePath , profileVersion , tarWriter )
if err != nil {
return fmt . Errorf ( "could not add file '%s', to tarball, got error '%s'" , profileDB , err . Error ( ) )
}
return nil
}
func addFileToTarWriter ( profilePath string , filePath string , tarWriter * tar . Writer ) error {
file , err := os . Open ( filePath )
if err != nil {
return fmt . Errorf ( "could not open file '%s', got error '%s'" , filePath , err . Error ( ) )
}
defer file . Close ( )
stat , err := file . Stat ( )
if err != nil {
return fmt . Errorf ( "could not get stat for file '%s', got error '%s'" , filePath , err . Error ( ) )
}
header := & tar . Header {
// Note: we are using strings.Join here deliberately so that we can import the profile
// in a cross platform way (e.g. using filepath here would result in different names on Windows v.s Linux)
Name : strings . Join ( [ ] string { profilePath , stat . Name ( ) } , "/" ) ,
Size : stat . Size ( ) ,
Mode : int64 ( stat . Mode ( ) ) ,
ModTime : stat . ModTime ( ) ,
}
err = tarWriter . WriteHeader ( header )
if err != nil {
return fmt . Errorf ( "could not write header for file '%s', got error '%s'" , filePath , err . Error ( ) )
}
_ , err = io . Copy ( tarWriter , file )
if err != nil {
return fmt . Errorf ( "could not copy the file '%s' data to the tarball, got error '%s'" , filePath , err . Error ( ) )
}
return nil
}