// Source file: cwtch/peer/cwtchprofilestorage.go

package peer
import (
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"database/sql"
"errors"
"fmt"
"git.openprivacy.ca/openprivacy/log"
)
// StorageKeyType is the string tag stored in the KeyType column of the
// profile key/value table; it distinguishes the kind of value held under a key.
type StorageKeyType string

// Storage key types known to the profile store.
const (
	// TypeAttribute tags profile scoped and zoned attributes.
	TypeAttribute StorageKeyType = "Attribute"

	// TypePrivateKey tags profile private keys.
	TypePrivateKey StorageKeyType = "PrivateKey"

	// TypePublicKey tags profile public keys.
	TypePublicKey StorageKeyType = "PublicKey"
)
// CwtchProfileStorage wraps the datastore requests used by a cwtch profile so
// that the main profile struct does not need any database knowledge.
//
// Prepared statements (*sql.Stmt) are safe for concurrent use. The per-channel
// statement maps are plain Go maps and are not guarded by any lock held in
// this struct.
type CwtchProfileStorage struct {
	// db is the underlying database handle all statements are prepared on.
	db *sql.DB

	// Profile key/value statements.
	insertProfileKeyValueStmt *sql.Stmt
	selectProfileKeyValueStmt *sql.Stmt

	// Conversation table statements.
	insertConversationStmt         *sql.Stmt
	fetchAllConversationsStmt      *sql.Stmt
	selectConversationStmt         *sql.Stmt
	selectConversationByHandleStmt *sql.Stmt
	acceptConversationStmt         *sql.Stmt
	deleteConversationStmt         *sql.Stmt
	setConversationAttributesStmt  *sql.Stmt
	setConversationACLStmt         *sql.Stmt

	// Per-channel statement caches, keyed by (conversation, channel). Each
	// channel has its own message table, so statements are prepared per
	// channel and kept here for reuse.
	channelInsertStmts                map[ChannelID]*sql.Stmt
	channelUpdateMessageStmts         map[ChannelID]*sql.Stmt
	channelGetMessageStmts            map[ChannelID]*sql.Stmt
	channelGetMessageBySignatureStmts map[ChannelID]*sql.Stmt
	channelGetCountStmts              map[ChannelID]*sql.Stmt
	channelGetMostRecentMessagesStmts map[ChannelID]*sql.Stmt
}
// ChannelID identifies a single channel within a conversation. It is a
// comparable value type and is used as the key of the per-channel statement
// caches.
type ChannelID struct {
	// Conversation is the numeric ID of the owning conversation.
	Conversation int
	// Channel is the channel number within that conversation.
	Channel int
}
// SQL for the profile key/value table.
const (
	// insertProfileKeySQLStmt inserts a typed key/value pair.
	insertProfileKeySQLStmt = `insert into profile_kv(KeyType, KeyName, KeyValue) values(?,?,?);`
	// selectProfileKeySQLStmt fetches the value stored under a (type, name) pair.
	selectProfileKeySQLStmt = `select KeyValue from profile_kv where KeyType=(?) and KeyName=(?);`
)

// SQL for the conversations table.
const (
	// insertConversationSQLStmt inserts a new conversation row.
	insertConversationSQLStmt = `insert into conversations(Handle, Attributes, ACL, Accepted) values(?,?,?,?);`
	// fetchAllConversationsSQLStmt selects every conversation row.
	fetchAllConversationsSQLStmt = `select ID, Handle, Attributes, ACL, Accepted from conversations;`
	// selectConversationSQLStmt selects a conversation by ID.
	selectConversationSQLStmt = `select ID, Handle, Attributes, ACL, Accepted from conversations where ID=(?);`
	// selectConversationByHandleSQLStmt selects a conversation by handle.
	selectConversationByHandleSQLStmt = `select ID, Handle, Attributes, ACL, Accepted from conversations where Handle=(?);`
	// acceptedConversationSQLStmt marks a conversation as accepted.
	acceptedConversationSQLStmt = `update conversations set Accepted=true where ID=(?);`
	// setConversationAttributesSQLStmt replaces the serialized attributes of a conversation.
	setConversationAttributesSQLStmt = `update conversations set Attributes=(?) where ID=(?) ;`
	// setConversationACLSQLStmt replaces the serialized ACL of a conversation.
	setConversationACLSQLStmt = `update conversations set ACL=(?) where ID=(?) ;`
	// deleteConversationSQLStmt deletes a conversation row by ID.
	deleteConversationSQLStmt = `delete from conversations where ID=(?);`
)

// SQL templates for the per-channel message tables. Each is a fmt template:
// the %d verbs are filled in with the conversation ID (and, where there are
// two, the channel number) before the statement is prepared or executed.
const (
	// createTableConversationMessagesSQLStmt is a template for creating the
	// message table of channel 0 of a conversation (one %d: the conversation ID).
	createTableConversationMessagesSQLStmt = `create table if not exists channel_%d_0_chat (ID integer unique primary key autoincrement, Body text, Attributes []byte, Expiry datetime, Signature text unique, ContentHash blob text);`
	// insertMessageIntoConversationSQLStmt is a template for inserting a message into a channel.
	insertMessageIntoConversationSQLStmt = `insert into channel_%d_%d_chat (Body, Attributes, Signature, ContentHash) values(?,?,?,?);`
	// updateMessageIntoConversationSQLStmt is a template for updating attributes of a message in a conversation
	updateMessageIntoConversationSQLStmt = `update channel_%d_%d_chat set Attributes=(?) where ID=(?);`
	// getMessageFromConversationSQLStmt is a template for fetching a message by ID from a conversation
	getMessageFromConversationSQLStmt = `select Body, Attributes from channel_%d_%d_chat where ID=(?);`
	// getMessageBySignatureFromConversationSQLStmt is a template for looking up a message ID by signature.
	getMessageBySignatureFromConversationSQLStmt = `select ID from channel_%d_%d_chat where Signature=(?);`
	// getMessageByContentHashFromConversationSQLStmt is a template for looking up a message ID by content hash.
	getMessageByContentHashFromConversationSQLStmt = `select ID from channel_%d_%d_chat where ContentHash=(?);`
	// getMessageCountFromConversationSqlStmt is a template for counting the messages in a channel.
	getMessageCountFromConversationSqlStmt = `select count(*) from channel_%d_%d_chat;`
	// getMostRecentMessagesSqlStmt is a template for fetching a page of messages,
	// newest first; its two placeholders are the limit followed by the offset.
	getMostRecentMessagesSqlStmt = `select ID, Body, Attributes, Signature, ContentHash from channel_%d_%d_chat order by ID desc limit (?) offset (?);`
)
// NewCwtchProfileStorage constructs a new CwtchProfileStorage from a database. It is also responsible for
// preparing the commonly used profile and conversation SQL statements; the per-channel statement
// caches start out empty.
//
// It returns an error if db is nil or if any statement fails to prepare. In the latter case every
// statement that was already prepared is closed again before returning, so a failed construction does
// not leak prepared statements.
func NewCwtchProfileStorage(db *sql.DB) (*CwtchProfileStorage, error) {
	if db == nil {
		return nil, errors.New("cannot construct cwtch profile storage with a nil database")
	}

	cps := &CwtchProfileStorage{
		db:                                db,
		channelInsertStmts:                map[ChannelID]*sql.Stmt{},
		channelUpdateMessageStmts:         map[ChannelID]*sql.Stmt{},
		channelGetMessageStmts:            map[ChannelID]*sql.Stmt{},
		channelGetMessageBySignatureStmts: map[ChannelID]*sql.Stmt{},
		channelGetMostRecentMessagesStmts: map[ChannelID]*sql.Stmt{},
		channelGetCountStmts:              map[ChannelID]*sql.Stmt{},
	}

	// Each entry pairs a query with the struct field that receives its
	// prepared statement. The order matches the original preparation order.
	toPrepare := []struct {
		query string
		dest  **sql.Stmt
	}{
		{insertProfileKeySQLStmt, &cps.insertProfileKeyValueStmt},
		{selectProfileKeySQLStmt, &cps.selectProfileKeyValueStmt},
		{insertConversationSQLStmt, &cps.insertConversationStmt},
		{fetchAllConversationsSQLStmt, &cps.fetchAllConversationsStmt},
		{selectConversationSQLStmt, &cps.selectConversationStmt},
		{selectConversationByHandleSQLStmt, &cps.selectConversationByHandleStmt},
		{acceptedConversationSQLStmt, &cps.acceptConversationStmt},
		{deleteConversationSQLStmt, &cps.deleteConversationStmt},
		{setConversationAttributesSQLStmt, &cps.setConversationAttributesStmt},
		{setConversationACLSQLStmt, &cps.setConversationACLStmt},
	}

	prepared := make([]*sql.Stmt, 0, len(toPrepare))
	for _, entry := range toPrepare {
		stmt, err := db.Prepare(entry.query)
		if err != nil {
			log.Errorf("error preparing query: %v %v", entry.query, err)
			// Release whatever was prepared so far; the prepare error is the
			// one worth reporting, so close failures are deliberately ignored.
			for _, done := range prepared {
				_ = done.Close()
			}
			return nil, err
		}
		*entry.dest = stmt
		prepared = append(prepared, stmt)
	}

	return cps, nil
}
// StoreProfileKeyValue inserts a typed key/value pair into the profile
// key/value table. Any error from executing the insert is logged and returned.
func (cps *CwtchProfileStorage) StoreProfileKeyValue(keyType StorageKeyType, key string, value []byte) error {
	if _, execErr := cps.insertProfileKeyValueStmt.Exec(keyType, key, value); execErr != nil {
		log.Errorf("error executing query: %v", execErr)
		return execErr
	}
	return nil
}
// LoadProfileKeyValue allows fetching of typed values via a known Key from the Storage Engine.
//
// It returns the stored value for the (keyType, key) pair. If no row matches, a
// "no result found" error is returned; if the query, row iteration or scan
// fails, that underlying error is logged and returned instead.
func (cps *CwtchProfileStorage) LoadProfileKeyValue(keyType StorageKeyType, key string) ([]byte, error) {
	rows, err := cps.selectProfileKeyValueStmt.Query(keyType, key)
	if err != nil {
		log.Errorf("error executing query: %v", err)
		return nil, err
	}
	// Close is idempotent, so a single deferred call covers every exit path.
	defer rows.Close()

	if !rows.Next() {
		// Next returns false both for "no rows" and for an iteration failure;
		// only Err tells the two apart.
		if err := rows.Err(); err != nil {
			log.Errorf("error fetching rows: %v", err)
			return nil, err
		}
		return nil, errors.New("no result found")
	}

	var keyValue []byte
	if err := rows.Scan(&keyValue); err != nil {
		log.Errorf("error fetching rows: %v", err)
		return nil, err
	}
	return keyValue, nil
}
// NewConversation stores a new conversation in the data store.
//
// Inside a single transaction it inserts the conversation row (with serialized
// attributes and ACL) and creates the message table for channel 0 of the new
// conversation. On success it returns the ID of the inserted conversation row.
//
// On any failure it returns -1 together with the error that caused the failure.
// The transaction is rolled back; a failure of the rollback itself is only
// logged, so it can never mask the original error or turn a failed call into a
// nil-error return.
func (cps *CwtchProfileStorage) NewConversation(handle string, attributes model.Attributes, acl model.AccessControlList, accepted bool) (int, error) {
	tx, err := cps.db.Begin()
	if err != nil {
		log.Errorf("error executing transaction: %v", err)
		return -1, err
	}

	// abort rolls the transaction back and reports the original cause.
	abort := func(cause error) (int, error) {
		log.Errorf("error executing transaction: %v", cause)
		if rollbackErr := tx.Rollback(); rollbackErr != nil {
			log.Errorf("error rolling back transaction: %v", rollbackErr)
		}
		return -1, cause
	}

	result, err := tx.Stmt(cps.insertConversationStmt).Exec(handle, attributes.Serialize(), acl.Serialize(), accepted)
	if err != nil {
		return abort(err)
	}

	// The row ID of the insert is the conversation ID; it also names the
	// message table created below.
	id, err := result.LastInsertId()
	if err != nil {
		return abort(err)
	}

	if _, err = tx.Exec(fmt.Sprintf(createTableConversationMessagesSQLStmt, id)); err != nil {
		return abort(err)
	}

	// After Commit returns, the transaction is finished either way, so no
	// rollback is attempted on a commit failure.
	if err = tx.Commit(); err != nil {
		log.Errorf("error executing transaction: %v", err)
		return -1, err
	}
	return int(id), nil
}
// GetConversationByHandle is a convenience method to fetch an active conversation by a handle.
// Usage Notes: This should **only** be used to look up p2p conversations by convention.
// Ideally this function should not exist, and all lookups should happen by ID (this is currently
// unavoidable in some circumstances because the event bus references conversations by handle, not by id)
//
// It returns the first matching conversation. If no row matches, a "no result
// found" error is returned; query, iteration and scan failures are logged and
// returned as-is.
func (cps *CwtchProfileStorage) GetConversationByHandle(handle string) (*model.Conversation, error) {
	rows, err := cps.selectConversationByHandleStmt.Query(handle)
	if err != nil {
		log.Errorf("error executing query: %v", err)
		return nil, err
	}
	defer rows.Close()

	if !rows.Next() {
		// Distinguish "no such handle" from a failure while fetching the row.
		if err := rows.Err(); err != nil {
			log.Errorf("error fetching rows: %v", err)
			return nil, err
		}
		return nil, errors.New("no result found")
	}

	var (
		id         int
		acl        []byte
		attributes []byte
		accepted   bool
	)
	// The handle is re-read from the row, so the returned conversation carries
	// the stored value.
	if err := rows.Scan(&id, &handle, &attributes, &acl, &accepted); err != nil {
		log.Errorf("error fetching rows: %v", err)
		return nil, err
	}

	return &model.Conversation{
		ID:         id,
		Handle:     handle,
		ACL:        model.DeserializeAccessControlList(acl),
		Attributes: model.DeserializeAttributes(attributes),
		Accepted:   accepted,
	}, nil
}
// FetchConversations returns *all* active conversations. This method should only be called
// on app start up to build a summary of conversations for the UI. Any further updates should be integrated
// through the event bus.
//
// It returns nil and an error if the query, a row scan or the row iteration
// fails, so a truncated list is never returned as if it were complete.
func (cps *CwtchProfileStorage) FetchConversations() ([]*model.Conversation, error) {
	rows, err := cps.fetchAllConversationsStmt.Query()
	if err != nil {
		log.Errorf("error executing query: %v", err)
		return nil, err
	}
	defer rows.Close()

	var conversations []*model.Conversation
	for rows.Next() {
		var (
			id         int
			handle     string
			acl        []byte
			attributes []byte
			accepted   bool
		)
		if err := rows.Scan(&id, &handle, &attributes, &acl, &accepted); err != nil {
			log.Errorf("error fetching rows: %v", err)
			return nil, err
		}
		conversations = append(conversations, &model.Conversation{
			ID:         id,
			Handle:     handle,
			ACL:        model.DeserializeAccessControlList(acl),
			Attributes: model.DeserializeAttributes(attributes),
			Accepted:   accepted,
		})
	}

	// Next also returns false when iteration fails part-way through.
	if err := rows.Err(); err != nil {
		log.Errorf("error fetching rows: %v", err)
		return nil, err
	}
	return conversations, nil
}
// GetConversation looks up a particular conversation by its numeric ID.
//
// If no row matches, a "no result found" error is returned; query, iteration
// and scan failures are logged and returned as-is.
func (cps *CwtchProfileStorage) GetConversation(id int) (*model.Conversation, error) {
	rows, err := cps.selectConversationStmt.Query(id)
	if err != nil {
		log.Errorf("error executing query: %v", err)
		return nil, err
	}
	defer rows.Close()

	if !rows.Next() {
		// Distinguish "no such conversation" from a failure while fetching the row.
		if err := rows.Err(); err != nil {
			log.Errorf("error fetching rows: %v", err)
			return nil, err
		}
		return nil, errors.New("no result found")
	}

	var (
		handle     string
		acl        []byte
		attributes []byte
		accepted   bool
	)
	if err := rows.Scan(&id, &handle, &attributes, &acl, &accepted); err != nil {
		log.Errorf("error fetching rows: %v", err)
		return nil, err
	}

	return &model.Conversation{
		ID:         id,
		Handle:     handle,
		ACL:        model.DeserializeAccessControlList(acl),
		Attributes: model.DeserializeAttributes(attributes),
		Accepted:   accepted,
	}, nil
}
// AcceptConversation marks the conversation with the given ID as accepted in
// the backing datastore. Any error from executing the update is logged and
// returned.
func (cps *CwtchProfileStorage) AcceptConversation(id int) error {
	if _, execErr := cps.acceptConversationStmt.Exec(id); execErr != nil {
		log.Errorf("error executing query: %v", execErr)
		return execErr
	}
	return nil
}
// DeleteConversation removes the row with the given ID from the conversations
// table. Any error from executing the delete is logged and returned.
//
// NOTE(review): only the conversations row is deleted here; the per-channel
// message tables and any cached per-channel statements are not touched by this
// method. Confirm whether message history is purged elsewhere.
func (cps *CwtchProfileStorage) DeleteConversation(id int) error {
	if _, execErr := cps.deleteConversationStmt.Exec(id); execErr != nil {
		log.Errorf("error executing query: %v", execErr)
		return execErr
	}
	return nil
}
// SetConversationACL sets a new ACL on a given conversation.
//
// The ACL is stored in its serialized form, the same representation
// NewConversation writes to the ACL column. Any error from executing the update
// is logged and returned.
func (cps *CwtchProfileStorage) SetConversationACL(id int, acl model.AccessControlList) error {
	// Bind the serialized ACL, not the in-memory value: the ACL column holds
	// serialized data everywhere else in this file.
	if _, err := cps.setConversationACLStmt.Exec(acl.Serialize(), id); err != nil {
		log.Errorf("error executing query: %v", err)
		return err
	}
	return nil
}
// SetConversationAttribute stores value under the given scoped/zoned path in
// the attributes of the conversation with the given ID.
//
// The conversation is loaded first, the attribute is set on the loaded
// attribute set, and the whole set is serialized and written back. A failure
// to load the conversation is returned unchanged; a failure of the update is
// logged and returned.
func (cps *CwtchProfileStorage) SetConversationAttribute(id int, path attr.ScopedZonedPath, value string) error {
	conversation, loadErr := cps.GetConversation(id)
	if loadErr != nil {
		return loadErr
	}

	conversation.Attributes[path.ToString()] = value
	serialized := conversation.Attributes.Serialize()

	if _, execErr := cps.setConversationAttributesStmt.Exec(serialized, id); execErr != nil {
		log.Errorf("error executing query: %v", execErr)
		return execErr
	}
	return nil
}
// InsertMessage appends a message to a conversation channel, with a given set of attributes.
//
// The insert statement for the (conversation, channel) pair is prepared on
// first use and cached for later calls. Preparation and execution errors are
// logged and returned; a successful insert is logged at info level.
func (cps *CwtchProfileStorage) InsertMessage(conversation int, channel int, body string, attributes model.Attributes, signature string, contentHash string) error {
	key := ChannelID{Conversation: conversation, Channel: channel}

	insertStmt, cached := cps.channelInsertStmts[key]
	if !cached {
		prepared, prepErr := cps.db.Prepare(fmt.Sprintf(insertMessageIntoConversationSQLStmt, conversation, channel))
		if prepErr != nil {
			log.Errorf("error executing transaction: %v", prepErr)
			return prepErr
		}
		cps.channelInsertStmts[key] = prepared
		insertStmt = prepared
	}

	if _, execErr := insertStmt.Exec(body, attributes.Serialize(), signature, contentHash); execErr != nil {
		log.Errorf("error inserting message: %v %v", signature, execErr)
		return execErr
	}

	log.Infof("inserted message with signature: %v", signature)
	return nil
}
// UpdateMessageAttributes replaces the stored attributes of the message with
// the given ID in the given conversation channel.
//
// The update statement for the (conversation, channel) pair is prepared on
// first use and cached for later calls. Preparation and execution errors are
// logged and returned.
func (cps *CwtchProfileStorage) UpdateMessageAttributes(conversation int, channel int, messageID int, attributes model.Attributes) error {
	key := ChannelID{Conversation: conversation, Channel: channel}

	updateStmt, cached := cps.channelUpdateMessageStmts[key]
	if !cached {
		prepared, prepErr := cps.db.Prepare(fmt.Sprintf(updateMessageIntoConversationSQLStmt, conversation, channel))
		if prepErr != nil {
			log.Errorf("error executing transaction: %v", prepErr)
			return prepErr
		}
		cps.channelUpdateMessageStmts[key] = prepared
		updateStmt = prepared
	}

	if _, execErr := updateStmt.Exec(attributes.Serialize(), messageID); execErr != nil {
		log.Errorf("error updating message: %v", execErr)
		return execErr
	}
	return nil
}
// GetChannelMessageBySignature looks up a conversation message by signature instead of identifier. Both are unique but
// signatures are common between conversation participants (in groups) and so are a more useful message to index.
//
// It returns the message ID on success. On failure it returns -1 and an error:
// "no result found" when no message carries the signature, or the underlying
// prepare, query, iteration or scan error (which is also logged). The lookup
// statement is prepared per (conversation, channel) on first use and cached.
func (cps *CwtchProfileStorage) GetChannelMessageBySignature(conversation int, channel int, signature string) (int, error) {
	channelID := ChannelID{Conversation: conversation, Channel: channel}

	stmt, exists := cps.channelGetMessageBySignatureStmts[channelID]
	if !exists {
		prepared, err := cps.db.Prepare(fmt.Sprintf(getMessageBySignatureFromConversationSQLStmt, conversation, channel))
		if err != nil {
			log.Errorf("error executing transaction: %v", err)
			return -1, err
		}
		cps.channelGetMessageBySignatureStmts[channelID] = prepared
		stmt = prepared
	}

	rows, err := stmt.Query(signature)
	if err != nil {
		log.Errorf("error executing query: %v", err)
		return -1, err
	}
	defer rows.Close()

	if !rows.Next() {
		// Distinguish "no such signature" from a failure while fetching the row.
		if err := rows.Err(); err != nil {
			log.Errorf("error fetching rows: %v", err)
			return -1, err
		}
		return -1, errors.New("no result found")
	}

	var id int
	if err := rows.Scan(&id); err != nil {
		log.Errorf("error fetching rows: %v", err)
		return -1, err
	}
	return id, nil
}
// GetChannelMessage looks up a channel message by conversation, channel and message id. On success it
// returns the message body and the attributes associated with the message. Otherwise an error is returned:
// "no result found" when no message has the given ID, or the underlying prepare, query, iteration or
// scan error (which is also logged). The lookup statement is prepared per (conversation, channel) on
// first use and cached.
func (cps *CwtchProfileStorage) GetChannelMessage(conversation int, channel int, messageID int) (string, model.Attributes, error) {
	channelID := ChannelID{Conversation: conversation, Channel: channel}

	stmt, exists := cps.channelGetMessageStmts[channelID]
	if !exists {
		prepared, err := cps.db.Prepare(fmt.Sprintf(getMessageFromConversationSQLStmt, conversation, channel))
		if err != nil {
			log.Errorf("error executing transaction: %v", err)
			return "", nil, err
		}
		cps.channelGetMessageStmts[channelID] = prepared
		stmt = prepared
	}

	rows, err := stmt.Query(messageID)
	if err != nil {
		log.Errorf("error executing query: %v", err)
		return "", nil, err
	}
	defer rows.Close()

	if !rows.Next() {
		// Distinguish "no such message" from a failure while fetching the row.
		if err := rows.Err(); err != nil {
			log.Errorf("error fetching rows: %v", err)
			return "", nil, err
		}
		return "", nil, errors.New("no result found")
	}

	// Deserialize the Row
	var body string
	var attributes []byte
	if err := rows.Scan(&body, &attributes); err != nil {
		log.Errorf("error fetching rows: %v", err)
		return "", nil, err
	}
	return body, model.DeserializeAttributes(attributes), nil
}
// GetChannelMessageCount reports how many messages are stored in the given
// conversation channel.
//
// The count statement for the (conversation, channel) pair is prepared on
// first use and cached for later calls. On a preparation or query error the
// error is logged and returned together with -1.
func (cps *CwtchProfileStorage) GetChannelMessageCount(conversation int, channel int) (int, error) {
	key := ChannelID{Conversation: conversation, Channel: channel}

	countStmt, cached := cps.channelGetCountStmts[key]
	if !cached {
		prepared, prepErr := cps.db.Prepare(fmt.Sprintf(getMessageCountFromConversationSqlStmt, conversation, channel))
		if prepErr != nil {
			log.Errorf("error executing transaction: %v", prepErr)
			return -1, prepErr
		}
		cps.channelGetCountStmts[key] = prepared
		countStmt = prepared
	}

	var total int
	if scanErr := countStmt.QueryRow().Scan(&total); scanErr != nil {
		log.Errorf("error executing query: %v", scanErr)
		return -1, scanErr
	}
	return total, nil
}
// GetMostRecentMessages returns a page of messages from a conversation channel, newest first
// (ordered by descending message ID).
//
// offset is the number of most-recent messages to skip and limit is the maximum number of messages
// to return. The statement for the (conversation, channel) pair is prepared on first use and cached.
//
// Preparation and query errors return nil and the error. If scanning a row or iterating the result
// set fails, the error is logged and returned together with the messages read so far; callers must
// check the error before treating the slice as complete.
func (cps *CwtchProfileStorage) GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) {
	channelID := ChannelID{Conversation: conversation, Channel: channel}

	stmt, exists := cps.channelGetMostRecentMessagesStmts[channelID]
	if !exists {
		prepared, err := cps.db.Prepare(fmt.Sprintf(getMostRecentMessagesSqlStmt, conversation, channel))
		if err != nil {
			log.Errorf("error executing transaction: %v", err)
			return nil, err
		}
		cps.channelGetMostRecentMessagesStmts[channelID] = prepared
		stmt = prepared
	}

	// The statement's placeholders are "limit (?) offset (?)", so limit is bound first.
	rows, err := stmt.Query(limit, offset)
	if err != nil {
		log.Errorf("error executing query: %v", err)
		return nil, err
	}
	defer rows.Close()

	var conversationMessages []model.ConversationMessage
	for rows.Next() {
		var (
			id          int
			body        string
			attributes  []byte
			sig         string
			contenthash string
		)
		if err := rows.Scan(&id, &body, &attributes, &sig, &contenthash); err != nil {
			log.Errorf("error fetching rows: %v", err)
			return conversationMessages, err
		}
		conversationMessages = append(conversationMessages, model.ConversationMessage{
			ID:          id,
			Body:        body,
			Attr:        model.DeserializeAttributes(attributes),
			Signature:   sig,
			ContentHash: contenthash,
		})
	}

	// Next also returns false when iteration fails part-way through; surface
	// that instead of presenting a truncated page as a success.
	if err := rows.Err(); err != nil {
		log.Errorf("error fetching rows: %v", err)
		return conversationMessages, err
	}
	return conversationMessages, nil
}
// Close closes every prepared statement held by the storage (the fixed profile
// and conversation statements as well as all cached per-channel statements) and
// then closes the underlying database.
//
// It does nothing when no database is attached. Close failures are logged
// rather than returned, because the method has no error result.
func (cps *CwtchProfileStorage) Close() {
	if cps.db == nil {
		return
	}

	// closeStmt tolerates nil so a partially initialised storage can still be closed.
	closeStmt := func(stmt *sql.Stmt) {
		if stmt == nil {
			return
		}
		if err := stmt.Close(); err != nil {
			log.Errorf("error closing statement: %v", err)
		}
	}

	for _, stmt := range []*sql.Stmt{
		cps.insertProfileKeyValueStmt,
		cps.selectProfileKeyValueStmt,
		cps.insertConversationStmt,
		cps.fetchAllConversationsStmt,
		cps.selectConversationStmt,
		cps.selectConversationByHandleStmt,
		cps.acceptConversationStmt,
		cps.deleteConversationStmt,
		cps.setConversationAttributesStmt,
		cps.setConversationACLStmt,
	} {
		closeStmt(stmt)
	}

	for _, cache := range []map[ChannelID]*sql.Stmt{
		cps.channelInsertStmts,
		cps.channelUpdateMessageStmts,
		cps.channelGetMessageStmts,
		cps.channelGetMessageBySignatureStmts,
		cps.channelGetCountStmts,
		cps.channelGetMostRecentMessagesStmts,
	} {
		for _, stmt := range cache {
			closeStmt(stmt)
		}
	}

	if err := cps.db.Close(); err != nil {
		log.Errorf("error closing database: %v", err)
	}
}