package peer import ( "cwtch.im/cwtch/model" "cwtch.im/cwtch/model/attr" "database/sql" "errors" "fmt" "git.openprivacy.ca/openprivacy/log" ) // StorageKeyType is an interface wrapper around storage key types type StorageKeyType string const ( // TypeAttribute for Profile Scoped and Zoned Attributes TypeAttribute = StorageKeyType("Attribute") // TypePrivateKey for Profile Private Keys TypePrivateKey = StorageKeyType("PrivateKey") // TypePublicKey for Profile Public Keys TypePublicKey = StorageKeyType("PublicKey") ) // CwtchProfileStorage encapsulates common datastore requests so as to not pollute the main cwtch profile // struct with database knowledge type CwtchProfileStorage struct { // Note: Statements are thread safe.. // Profile related statements insertProfileKeyValueStmt *sql.Stmt selectProfileKeyValueStmt *sql.Stmt // Conversation related statements insertConversationStmt *sql.Stmt selectConversationStmt *sql.Stmt acceptConversationStmt *sql.Stmt deleteConversationStmt *sql.Stmt setConversationAttributesStmt *sql.Stmt channelInsertStmts map[ChannelID]*sql.Stmt channelGetMessageStmts map[ChannelID]*sql.Stmt db *sql.DB } type ChannelID struct { Conversation int Channel int } const insertProfileKeySQLStmt = `insert into profile_kv(KeyType, KeyName, KeyValue) values(?,?,?);` const selectProfileKeySQLStmt = `select KeyValue from profile_kv where KeyType=(?) and KeyName=(?);` const insertConversationSQLStmt = `insert into conversations(Handle, Attributes, ACL, Accepted) values(?,?,?,?);` const selectConversationSQLStmt = `select ID, Handle, Attributes, ACL, Accepted from conversations where Handle=(?);` const acceptedConversationSQLStmt = `update conversations set Accepted=true where Handle=(?);` const setConversationAttributesSQLStmt = `update conversations set Attributes=(?) where Handle=(?) ;` const deleteConversationSQLStmt = `delete from conversations where Handle=(?);` // createTableConversationMessagesSQLStmt is a template for creating conversation based tables... const createTableConversationMessagesSQLStmt = `create table if not exists channel_%d_0_chat (ID integer unique primary key autoincrement, Body text, Attributes []byte, Expiry datetime);` // insertMessageIntoConversationSQLStmt is a template for creating conversation based tables... const insertMessageIntoConversationSQLStmt = `insert into channel_%d_%d_chat (Body, Attributes) values(?,?);` // getMessageFromConversationSQLStmt is a template for creating conversation based tables... const getMessageFromConversationSQLStmt = `select Body, Attributes from channel_%d_%d_chat where ID=(?);` // NewCwtchProfileStorage constructs a new CwtchProfileStorage from a database. It is also responsible for // Preparing commonly used SQL Statements func NewCwtchProfileStorage(db *sql.DB) (*CwtchProfileStorage, error) { if db == nil { return nil, errors.New("cannot construct cwtch profile storage with a nil database") } insertProfileKeyValueStmt, err := db.Prepare(insertProfileKeySQLStmt) if err != nil { log.Errorf("error preparing query: %v %v", insertProfileKeySQLStmt, err) return nil, err } selectProfileKeyStmt, err := db.Prepare(selectProfileKeySQLStmt) if err != nil { log.Errorf("error preparing query: %v %v", selectProfileKeySQLStmt, err) return nil, err } insertConversationStmt, err := db.Prepare(insertConversationSQLStmt) if err != nil { log.Errorf("error preparing query: %v %v", insertConversationSQLStmt, err) return nil, err } selectConversationStmt, err := db.Prepare(selectConversationSQLStmt) if err != nil { log.Errorf("error preparing query: %v %v", selectConversationSQLStmt, err) return nil, err } acceptConversationStmt, err := db.Prepare(acceptedConversationSQLStmt) if err != nil { log.Errorf("error preparing query: %v %v", acceptedConversationSQLStmt, err) return nil, err } deleteConversationStmt, err := db.Prepare(deleteConversationSQLStmt) if err != nil { log.Errorf("error preparing query: %v %v", deleteConversationSQLStmt, err) return nil, err } setConversationAttributesStmt, err := db.Prepare(setConversationAttributesSQLStmt) if err != nil { log.Errorf("error preparing query: %v %v", setConversationAttributesSQLStmt, err) return nil, err } return &CwtchProfileStorage{db: db, insertProfileKeyValueStmt: insertProfileKeyValueStmt, selectProfileKeyValueStmt: selectProfileKeyStmt, insertConversationStmt: insertConversationStmt, selectConversationStmt: selectConversationStmt, acceptConversationStmt: acceptConversationStmt, deleteConversationStmt: deleteConversationStmt, setConversationAttributesStmt: setConversationAttributesStmt, channelInsertStmts: map[ChannelID]*sql.Stmt{}, channelGetMessageStmts: map[ChannelID]*sql.Stmt{}}, nil } // StoreProfileKeyValue allows storing of typed Key/Value attribute in the Storage Engine func (cps *CwtchProfileStorage) StoreProfileKeyValue(keyType StorageKeyType, key string, value []byte) error { _, err := cps.insertProfileKeyValueStmt.Exec(keyType, key, value) if err != nil { log.Errorf("error executing query: %v", err) return err } return nil } // LoadProfileKeyValue allows fetching of typed values via a known Key from the Storage Engine func (cps *CwtchProfileStorage) LoadProfileKeyValue(keyType StorageKeyType, key string) ([]byte, error) { rows, err := cps.selectProfileKeyValueStmt.Query(keyType, key) if err != nil { log.Errorf("error executing query: %v", err) return nil, err } result := rows.Next() if !result { return nil, errors.New("no result found") } var keyValue []byte err = rows.Scan(&keyValue) if err != nil { log.Errorf("error fetching rows: %v", err) rows.Close() return nil, err } rows.Close() return keyValue, nil } // NewConversation stores a new conversation in the data store func (cps *CwtchProfileStorage) NewConversation(handle string, attributes model.Attributes, acl model.AccessControlList, accepted bool) error { tx, err := cps.db.Begin() if err != nil { log.Errorf("error executing transaction: %v", err) return err } result, err := tx.Stmt(cps.insertConversationStmt).Exec(handle, attributes.Serialize(), acl.Serialize(), accepted) if err != nil { log.Errorf("error executing transaction: %v", err) return tx.Rollback() } id, err := result.LastInsertId() if err != nil { log.Errorf("error executing transaction: %v", err) return tx.Rollback() } _, err = tx.Exec(fmt.Sprintf(createTableConversationMessagesSQLStmt, id)) if err != nil { log.Errorf("error executing transaction: %v", err) return tx.Rollback() } return tx.Commit() } // GetConversation looks up a particular conversation by handle func (cps *CwtchProfileStorage) GetConversation(handle string) (*model.Conversation, error) { rows, err := cps.selectConversationStmt.Query(handle) if err != nil { log.Errorf("error executing query: %v", err) return nil, err } result := rows.Next() if !result { return nil, errors.New("no result found") } var id int var rhandle string var acl []byte var attributes []byte var accepted bool err = rows.Scan(&id, &rhandle, &attributes, &acl, &accepted) if err != nil { log.Errorf("error fetching rows: %v", err) rows.Close() return nil, err } rows.Close() return &model.Conversation{ID: id, Handle: rhandle, ACL: model.DeserializeAccessControlList(acl), Attributes: model.DeserializeAttributes(attributes), Accepted: accepted}, nil } // AcceptConversation sets the accepted status of a conversation to true in the backing datastore func (cps *CwtchProfileStorage) AcceptConversation(handle string) error { _, err := cps.acceptConversationStmt.Exec(handle) if err != nil { log.Errorf("error executing query: %v", err) return err } return nil } // DeleteConversation purges the conversation and any associated message history from the conversation store. func (cps *CwtchProfileStorage) DeleteConversation(handle string) error { _, err := cps.deleteConversationStmt.Exec(handle) if err != nil { log.Errorf("error executing query: %v", err) return err } return nil } func (cps *CwtchProfileStorage) SetConversationAttribute(handle string, path attr.ScopedZonedPath, value string) error { ci, err := cps.GetConversation(handle) if err != nil { return err } ci.Attributes[path.ToString()] = value _, err = cps.setConversationAttributesStmt.Exec(ci.Attributes.Serialize(), handle) if err != nil { log.Errorf("error executing query: %v", err) return err } return nil } func (cps *CwtchProfileStorage) InsertMessage(conversation int, channel int, body string, attributes model.Attributes) error { channelID := ChannelID{Conversation: conversation, Channel: channel} _, exists := cps.channelInsertStmts[channelID] if !exists { conversationStmt, err := cps.db.Prepare(fmt.Sprintf(insertMessageIntoConversationSQLStmt, conversation, channel)) if err != nil { log.Errorf("error executing transaction: %v", err) return err } cps.channelInsertStmts[channelID] = conversationStmt } _, err := cps.channelInsertStmts[channelID].Exec(body, attributes.Serialize()) if err != nil { log.Errorf("error inserting message: %v", err) return err } return nil } func (cps *CwtchProfileStorage) GetChannelMessage(conversation int, channel int, messageID int) (string, model.Attributes, error) { channelID := ChannelID{Conversation: conversation, Channel: channel} _, exists := cps.channelGetMessageStmts[channelID] if !exists { conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMessageFromConversationSQLStmt, conversation, channel)) if err != nil { log.Errorf("error executing transaction: %v", err) return "", nil, err } cps.channelGetMessageStmts[channelID] = conversationStmt } rows, err := cps.channelGetMessageStmts[channelID].Query(messageID) if err != nil { log.Errorf("error executing query: %v", err) return "", nil, err } result := rows.Next() if !result { return "", nil, errors.New("no result found") } // Deserialize the Row var body string var attributes []byte err = rows.Scan(&body, &attributes) if err != nil { log.Errorf("error fetching rows: %v", err) rows.Close() return "", nil, err } rows.Close() return body, model.DeserializeAttributes(attributes), nil } // Close closes the underlying database and prepared statements func (cps *CwtchProfileStorage) Close() { if cps.db != nil { cps.insertProfileKeyValueStmt.Close() cps.selectProfileKeyValueStmt.Close() cps.db.Close() } }