2021-11-09 23:47:33 +00:00
package peer
import (
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"database/sql"
"errors"
"fmt"
"git.openprivacy.ca/openprivacy/log"
)
// StorageKeyType is an interface wrapper around storage key types
type StorageKeyType string
const (
// TypeAttribute for Profile Scoped and Zoned Attributes
TypeAttribute = StorageKeyType ( "Attribute" )
// TypePrivateKey for Profile Private Keys
TypePrivateKey = StorageKeyType ( "PrivateKey" )
// TypePublicKey for Profile Public Keys
TypePublicKey = StorageKeyType ( "PublicKey" )
)
// CwtchProfileStorage encapsulates common datastore requests so as to not pollute the main cwtch profile
// struct with database knowledge
type CwtchProfileStorage struct {
// Note: Statements are thread safe..
// Profile related statements
insertProfileKeyValueStmt * sql . Stmt
selectProfileKeyValueStmt * sql . Stmt
// Conversation related statements
2021-11-10 22:28:52 +00:00
insertConversationStmt * sql . Stmt
fetchAllConversationsStmt * sql . Stmt
selectConversationStmt * sql . Stmt
selectConversationByHandleStmt * sql . Stmt
acceptConversationStmt * sql . Stmt
deleteConversationStmt * sql . Stmt
setConversationAttributesStmt * sql . Stmt
2021-11-09 23:47:33 +00:00
2021-11-16 23:06:30 +00:00
channelInsertStmts map [ ChannelID ] * sql . Stmt
channelUpdateMessageStmts map [ ChannelID ] * sql . Stmt
channelGetMessageStmts map [ ChannelID ] * sql . Stmt
channelGetMessageBySignatureStmts map [ ChannelID ] * sql . Stmt
2021-11-09 23:47:33 +00:00
db * sql . DB
}
2021-11-11 00:41:43 +00:00
// ChannelID encapsulates the data necessary to reference a channel structure.
2021-11-09 23:47:33 +00:00
type ChannelID struct {
Conversation int
Channel int
}
const insertProfileKeySQLStmt = ` insert into profile_kv(KeyType, KeyName, KeyValue) values(?,?,?); `
const selectProfileKeySQLStmt = ` select KeyValue from profile_kv where KeyType=(?) and KeyName=(?); `
const insertConversationSQLStmt = ` insert into conversations(Handle, Attributes, ACL, Accepted) values(?,?,?,?); `
2021-11-10 22:28:52 +00:00
const fetchAllConversationsSQLStmt = ` select ID, Handle, Attributes, ACL, Accepted from conversations; `
const selectConversationSQLStmt = ` select ID, Handle, Attributes, ACL, Accepted from conversations where ID=(?); `
const selectConversationByHandleSQLStmt = ` select ID, Handle, Attributes, ACL, Accepted from conversations where Handle=(?); `
const acceptedConversationSQLStmt = ` update conversations set Accepted=true where ID=(?); `
const setConversationAttributesSQLStmt = ` update conversations set Attributes=(?) where ID=(?) ; `
const deleteConversationSQLStmt = ` delete from conversations where ID=(?); `
2021-11-09 23:47:33 +00:00
// createTableConversationMessagesSQLStmt is a template for creating conversation based tables...
2021-11-16 23:06:30 +00:00
const createTableConversationMessagesSQLStmt = ` create table if not exists channel_%d_0_chat (ID integer unique primary key autoincrement, Body text, Attributes []byte, Expiry datetime, Signature text unique, ContentHash blob text); `
2021-11-09 23:47:33 +00:00
// insertMessageIntoConversationSQLStmt is a template for creating conversation based tables...
2021-11-16 23:06:30 +00:00
const insertMessageIntoConversationSQLStmt = ` insert into channel_%d_%d_chat (Body, Attributes, Signature, ContentHash) values(?,?,?,?); `
2021-11-09 23:47:33 +00:00
2021-11-16 23:06:30 +00:00
// updateMessageIntoConversationSQLStmt is a template for updating attributes of a message in a conversation
const updateMessageIntoConversationSQLStmt = ` update channel_%d_%d_chat set Attributes=(?) where ID=(?); `
// getMessageFromConversationSQLStmt is a template for fetching a message by ID from a conversation
2021-11-09 23:47:33 +00:00
const getMessageFromConversationSQLStmt = ` select Body, Attributes from channel_%d_%d_chat where ID=(?); `
2021-11-16 23:06:30 +00:00
// getMessageBySignatureFromConversationSQLStmt is a template for creating conversation based tables...
const getMessageBySignatureFromConversationSQLStmt = ` select ID from channel_%d_%d_chat where Signature=(?); `
// getMessageByContentHashFromConversationSQLStmt is a template for creating conversation based tables...
const getMessageByContentHashFromConversationSQLStmt = ` select ID from channel_%d_%d_chat where ContentHash=(?); `
2021-11-09 23:47:33 +00:00
// NewCwtchProfileStorage constructs a new CwtchProfileStorage from a database. It is also responsible for
// Preparing commonly used SQL Statements
func NewCwtchProfileStorage ( db * sql . DB ) ( * CwtchProfileStorage , error ) {
if db == nil {
return nil , errors . New ( "cannot construct cwtch profile storage with a nil database" )
}
insertProfileKeyValueStmt , err := db . Prepare ( insertProfileKeySQLStmt )
if err != nil {
log . Errorf ( "error preparing query: %v %v" , insertProfileKeySQLStmt , err )
return nil , err
}
selectProfileKeyStmt , err := db . Prepare ( selectProfileKeySQLStmt )
if err != nil {
log . Errorf ( "error preparing query: %v %v" , selectProfileKeySQLStmt , err )
return nil , err
}
insertConversationStmt , err := db . Prepare ( insertConversationSQLStmt )
if err != nil {
log . Errorf ( "error preparing query: %v %v" , insertConversationSQLStmt , err )
return nil , err
}
2021-11-10 22:28:52 +00:00
fetchAllConversationsStmt , err := db . Prepare ( fetchAllConversationsSQLStmt )
if err != nil {
log . Errorf ( "error preparing query: %v %v" , fetchAllConversationsSQLStmt , err )
return nil , err
}
2021-11-09 23:47:33 +00:00
selectConversationStmt , err := db . Prepare ( selectConversationSQLStmt )
if err != nil {
log . Errorf ( "error preparing query: %v %v" , selectConversationSQLStmt , err )
return nil , err
}
2021-11-10 22:28:52 +00:00
selectConversationByHandleStmt , err := db . Prepare ( selectConversationByHandleSQLStmt )
if err != nil {
log . Errorf ( "error preparing query: %v %v" , selectConversationByHandleSQLStmt , err )
return nil , err
}
2021-11-09 23:47:33 +00:00
acceptConversationStmt , err := db . Prepare ( acceptedConversationSQLStmt )
if err != nil {
log . Errorf ( "error preparing query: %v %v" , acceptedConversationSQLStmt , err )
return nil , err
}
deleteConversationStmt , err := db . Prepare ( deleteConversationSQLStmt )
if err != nil {
log . Errorf ( "error preparing query: %v %v" , deleteConversationSQLStmt , err )
return nil , err
}
setConversationAttributesStmt , err := db . Prepare ( setConversationAttributesSQLStmt )
if err != nil {
log . Errorf ( "error preparing query: %v %v" , setConversationAttributesSQLStmt , err )
return nil , err
}
2021-11-10 22:28:52 +00:00
return & CwtchProfileStorage { db : db ,
2021-11-16 23:06:30 +00:00
insertProfileKeyValueStmt : insertProfileKeyValueStmt ,
selectProfileKeyValueStmt : selectProfileKeyStmt ,
fetchAllConversationsStmt : fetchAllConversationsStmt ,
insertConversationStmt : insertConversationStmt ,
selectConversationStmt : selectConversationStmt ,
selectConversationByHandleStmt : selectConversationByHandleStmt ,
acceptConversationStmt : acceptConversationStmt ,
deleteConversationStmt : deleteConversationStmt ,
setConversationAttributesStmt : setConversationAttributesStmt ,
channelInsertStmts : map [ ChannelID ] * sql . Stmt { } ,
channelUpdateMessageStmts : map [ ChannelID ] * sql . Stmt { } ,
channelGetMessageStmts : map [ ChannelID ] * sql . Stmt { } ,
channelGetMessageBySignatureStmts : map [ ChannelID ] * sql . Stmt { } } ,
nil
2021-11-09 23:47:33 +00:00
}
// StoreProfileKeyValue allows storing of typed Key/Value attribute in the Storage Engine
func ( cps * CwtchProfileStorage ) StoreProfileKeyValue ( keyType StorageKeyType , key string , value [ ] byte ) error {
_ , err := cps . insertProfileKeyValueStmt . Exec ( keyType , key , value )
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return err
}
return nil
}
// LoadProfileKeyValue allows fetching of typed values via a known Key from the Storage Engine
func ( cps * CwtchProfileStorage ) LoadProfileKeyValue ( keyType StorageKeyType , key string ) ( [ ] byte , error ) {
rows , err := cps . selectProfileKeyValueStmt . Query ( keyType , key )
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return nil , err
}
result := rows . Next ( )
if ! result {
return nil , errors . New ( "no result found" )
}
var keyValue [ ] byte
err = rows . Scan ( & keyValue )
if err != nil {
log . Errorf ( "error fetching rows: %v" , err )
rows . Close ( )
return nil , err
}
rows . Close ( )
return keyValue , nil
}
// NewConversation stores a new conversation in the data store
2021-11-11 00:41:43 +00:00
func ( cps * CwtchProfileStorage ) NewConversation ( handle string , attributes model . Attributes , acl model . AccessControlList , accepted bool ) ( int , error ) {
2021-11-09 23:47:33 +00:00
tx , err := cps . db . Begin ( )
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
2021-11-11 00:41:43 +00:00
return - 1 , err
2021-11-09 23:47:33 +00:00
}
result , err := tx . Stmt ( cps . insertConversationStmt ) . Exec ( handle , attributes . Serialize ( ) , acl . Serialize ( ) , accepted )
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
2021-11-11 00:41:43 +00:00
return - 1 , tx . Rollback ( )
2021-11-09 23:47:33 +00:00
}
id , err := result . LastInsertId ( )
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
2021-11-11 00:41:43 +00:00
return - 1 , tx . Rollback ( )
}
result , err = tx . Exec ( fmt . Sprintf ( createTableConversationMessagesSQLStmt , id ) )
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
return - 1 , tx . Rollback ( )
}
conversationID , err := result . LastInsertId ( )
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
return - 1 , tx . Rollback ( )
2021-11-09 23:47:33 +00:00
}
2021-11-11 00:41:43 +00:00
err = tx . Commit ( )
2021-11-09 23:47:33 +00:00
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
2021-11-11 00:41:43 +00:00
return - 1 , tx . Rollback ( )
2021-11-09 23:47:33 +00:00
}
2021-11-11 00:41:43 +00:00
return int ( conversationID ) , nil
2021-11-09 23:47:33 +00:00
}
2021-11-11 00:41:43 +00:00
// GetConversationByHandle is a convienance method to fetch an active conversation by a handle
// Usage Notes: This should **only** be used to look up p2p conversations by convention.
// Ideally this function should not exist, and all lookups should happen by ID (this is currently
// unavoidable in some circumstances because the event bus references conversations by handle, not by id)
2021-11-10 22:28:52 +00:00
func ( cps * CwtchProfileStorage ) GetConversationByHandle ( handle string ) ( * model . Conversation , error ) {
rows , err := cps . selectConversationByHandleStmt . Query ( handle )
2021-11-09 23:47:33 +00:00
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return nil , err
}
result := rows . Next ( )
if ! result {
return nil , errors . New ( "no result found" )
}
var id int
var acl [ ] byte
var attributes [ ] byte
var accepted bool
2021-11-10 22:28:52 +00:00
err = rows . Scan ( & id , & handle , & attributes , & acl , & accepted )
if err != nil {
log . Errorf ( "error fetching rows: %v" , err )
rows . Close ( )
return nil , err
}
rows . Close ( )
return & model . Conversation { ID : id , Handle : handle , ACL : model . DeserializeAccessControlList ( acl ) , Attributes : model . DeserializeAttributes ( attributes ) , Accepted : accepted } , nil
}
2021-11-11 00:41:43 +00:00
// FetchConversations returns *all* active conversations. This method should only be called
// on app start up to build a summary of conversations for the UI. Any further updates should be integrated
// through the event bus.
2021-11-10 22:28:52 +00:00
func ( cps * CwtchProfileStorage ) FetchConversations ( ) ( [ ] * model . Conversation , error ) {
rows , err := cps . fetchAllConversationsStmt . Query ( )
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return nil , err
}
var conversations [ ] * model . Conversation
defer rows . Close ( )
for {
result := rows . Next ( )
if ! result {
return conversations , nil
}
var id int
var handle string
var acl [ ] byte
var attributes [ ] byte
var accepted bool
err = rows . Scan ( & id , & handle , & attributes , & acl , & accepted )
if err != nil {
log . Errorf ( "error fetching rows: %v" , err )
rows . Close ( )
return nil , err
}
conversations = append ( conversations , & model . Conversation { ID : id , Handle : handle , ACL : model . DeserializeAccessControlList ( acl ) , Attributes : model . DeserializeAttributes ( attributes ) , Accepted : accepted } )
}
}
// GetConversation looks up a particular conversation by handle
func ( cps * CwtchProfileStorage ) GetConversation ( id int ) ( * model . Conversation , error ) {
rows , err := cps . selectConversationStmt . Query ( id )
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return nil , err
}
result := rows . Next ( )
if ! result {
return nil , errors . New ( "no result found" )
}
var handle string
var acl [ ] byte
var attributes [ ] byte
var accepted bool
err = rows . Scan ( & id , & handle , & attributes , & acl , & accepted )
2021-11-09 23:47:33 +00:00
if err != nil {
log . Errorf ( "error fetching rows: %v" , err )
rows . Close ( )
return nil , err
}
rows . Close ( )
2021-11-10 22:28:52 +00:00
return & model . Conversation { ID : id , Handle : handle , ACL : model . DeserializeAccessControlList ( acl ) , Attributes : model . DeserializeAttributes ( attributes ) , Accepted : accepted } , nil
2021-11-09 23:47:33 +00:00
}
// AcceptConversation sets the accepted status of a conversation to true in the backing datastore
2021-11-10 22:28:52 +00:00
func ( cps * CwtchProfileStorage ) AcceptConversation ( id int ) error {
_ , err := cps . acceptConversationStmt . Exec ( id )
2021-11-09 23:47:33 +00:00
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return err
}
return nil
}
// DeleteConversation purges the conversation and any associated message history from the conversation store.
2021-11-10 22:28:52 +00:00
func ( cps * CwtchProfileStorage ) DeleteConversation ( id int ) error {
_ , err := cps . deleteConversationStmt . Exec ( id )
2021-11-09 23:47:33 +00:00
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return err
}
return nil
}
2021-11-11 00:41:43 +00:00
// SetConversationAttribute sets a new attribute on a given conversation.
2021-11-10 22:28:52 +00:00
func ( cps * CwtchProfileStorage ) SetConversationAttribute ( id int , path attr . ScopedZonedPath , value string ) error {
ci , err := cps . GetConversation ( id )
2021-11-09 23:47:33 +00:00
if err != nil {
return err
}
ci . Attributes [ path . ToString ( ) ] = value
2021-11-10 22:28:52 +00:00
_ , err = cps . setConversationAttributesStmt . Exec ( ci . Attributes . Serialize ( ) , id )
2021-11-09 23:47:33 +00:00
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return err
}
return nil
}
2021-11-11 00:41:43 +00:00
// InsertMessage appends a message to a conversation channel, with a given set of attributes
2021-11-16 23:06:30 +00:00
func ( cps * CwtchProfileStorage ) InsertMessage ( conversation int , channel int , body string , attributes model . Attributes , signature string , contentHash string ) error {
2021-11-09 23:47:33 +00:00
channelID := ChannelID { Conversation : conversation , Channel : channel }
_ , exists := cps . channelInsertStmts [ channelID ]
if ! exists {
conversationStmt , err := cps . db . Prepare ( fmt . Sprintf ( insertMessageIntoConversationSQLStmt , conversation , channel ) )
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
return err
}
cps . channelInsertStmts [ channelID ] = conversationStmt
}
2021-11-16 23:06:30 +00:00
_ , err := cps . channelInsertStmts [ channelID ] . Exec ( body , attributes . Serialize ( ) , signature , contentHash )
if err != nil {
log . Errorf ( "error inserting message: %v %v" , signature , err )
return err
}
log . Infof ( "inserted message with signature: %v" , signature )
return nil
}
// UpdateMessageAttributes updates the attributes associated with a message of a given conversation
func ( cps * CwtchProfileStorage ) UpdateMessageAttributes ( conversation int , channel int , messageID int , attributes model . Attributes ) error {
channelID := ChannelID { Conversation : conversation , Channel : channel }
_ , exists := cps . channelUpdateMessageStmts [ channelID ]
if ! exists {
conversationStmt , err := cps . db . Prepare ( fmt . Sprintf ( updateMessageIntoConversationSQLStmt , conversation , channel ) )
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
return err
}
cps . channelUpdateMessageStmts [ channelID ] = conversationStmt
}
_ , err := cps . channelUpdateMessageStmts [ channelID ] . Exec ( attributes . Serialize ( ) , messageID )
2021-11-09 23:47:33 +00:00
if err != nil {
2021-11-16 23:06:30 +00:00
log . Errorf ( "error updating message: %v" , err )
2021-11-09 23:47:33 +00:00
return err
}
return nil
}
2021-11-16 23:06:30 +00:00
// GetChannelMessageBySignature looks up a conversation message by signature instead of identifier. Both are unique but
// signatures are common between conversation participants (in groups) and so are a more useful message to index.
func ( cps * CwtchProfileStorage ) GetChannelMessageBySignature ( conversation int , channel int , signature string ) ( int , error ) {
channelID := ChannelID { Conversation : conversation , Channel : channel }
_ , exists := cps . channelGetMessageBySignatureStmts [ channelID ]
if ! exists {
conversationStmt , err := cps . db . Prepare ( fmt . Sprintf ( getMessageBySignatureFromConversationSQLStmt , conversation , channel ) )
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
return - 1 , err
}
cps . channelGetMessageBySignatureStmts [ channelID ] = conversationStmt
}
rows , err := cps . channelGetMessageBySignatureStmts [ channelID ] . Query ( signature )
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return - 1 , err
}
result := rows . Next ( )
if ! result {
return - 1 , errors . New ( "no result found" )
}
var id int
err = rows . Scan ( & id )
if err != nil {
log . Errorf ( "error fetching rows: %v" , err )
rows . Close ( )
return - 1 , err
}
rows . Close ( )
return id , nil
}
2021-11-11 00:41:43 +00:00
// GetChannelMessage looks up a channel message by conversation, channel and message id. On success it
// returns the message body and the attributes associated with the message. Otherwise an error is returned.
2021-11-09 23:47:33 +00:00
func ( cps * CwtchProfileStorage ) GetChannelMessage ( conversation int , channel int , messageID int ) ( string , model . Attributes , error ) {
channelID := ChannelID { Conversation : conversation , Channel : channel }
_ , exists := cps . channelGetMessageStmts [ channelID ]
if ! exists {
conversationStmt , err := cps . db . Prepare ( fmt . Sprintf ( getMessageFromConversationSQLStmt , conversation , channel ) )
if err != nil {
log . Errorf ( "error executing transaction: %v" , err )
return "" , nil , err
}
cps . channelGetMessageStmts [ channelID ] = conversationStmt
}
rows , err := cps . channelGetMessageStmts [ channelID ] . Query ( messageID )
if err != nil {
log . Errorf ( "error executing query: %v" , err )
return "" , nil , err
}
result := rows . Next ( )
if ! result {
return "" , nil , errors . New ( "no result found" )
}
// Deserialize the Row
var body string
var attributes [ ] byte
err = rows . Scan ( & body , & attributes )
if err != nil {
log . Errorf ( "error fetching rows: %v" , err )
rows . Close ( )
return "" , nil , err
}
rows . Close ( )
return body , model . DeserializeAttributes ( attributes ) , nil
}
// Close closes the underlying database and prepared statements
func ( cps * CwtchProfileStorage ) Close ( ) {
if cps . db != nil {
cps . insertProfileKeyValueStmt . Close ( )
cps . selectProfileKeyValueStmt . Close ( )
cps . db . Close ( )
}
}