New Storage Refactor #404
|
@ -1193,7 +1193,11 @@ func (cp *cwtchPeer) GetChannelMessageBySignature(conversationID int, channelID
|
|||
func (cp *cwtchPeer) GetChannelMessageByContentHash(conversationID int, channelID int, contenthash string) (int, error) {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
return cp.storage.GetChannelMessageByContentHash(conversationID, channelID, contenthash)
|
||||
messageID, err := cp.storage.GetChannelMessageByContentHash(conversationID, channelID, contenthash)
|
||||
if err == nil {
|
||||
return cp.storage.GetRowNumberByMessageID(conversationID, channelID, messageID)
|
||||
}
|
||||
return -1, err
|
||||
}
|
||||
|
||||
// constructGroupFromConversation returns a model.Group wrapper around a database back groups. Useful for
|
||||
|
|
|
@ -52,6 +52,7 @@ type CwtchProfileStorage struct {
|
|||
channelGetCountStmts map[ChannelID]*sql.Stmt
|
||||
channelGetMostRecentMessagesStmts map[ChannelID]*sql.Stmt
|
||||
sarah marked this conversation as resolved
|
||||
channelGetMessageByContentHashStmts map[ChannelID]*sql.Stmt
|
||||
channelRowNumberStmts map[ChannelID]*sql.Stmt
|
||||
ProfileDirectory string
|
||||
db *sql.DB
|
||||
}
|
||||
|
@ -95,6 +96,9 @@ const getMessageBySignatureFromConversationSQLStmt = `select ID from channel_%d_
|
|||
// getMessageByContentHashFromConversationSQLStmt is a template for selecting conversation messages by content hash
|
||||
const getMessageByContentHashFromConversationSQLStmt = `select ID from channel_%d_%d_chat where ContentHash=(?) order by ID desc limit 1;`
|
||||
|
||||
// getLocalIndexOfMessageIdSQLStmt is a template for fetching the offset of a message from the bottom of the database.
|
||||
const getLocalIndexOfMessageIdSQLStmt = `select count (*) from channel_%d_%d_chat where ID >= (?) order by ID desc;`
|
||||
|
||||
// getMessageCountFromConversationSQLStmt is a template for fetching the count of a messages in a conversation channel
|
||||
const getMessageCountFromConversationSQLStmt = `select count(*) from channel_%d_%d_chat;`
|
||||
|
||||
|
@ -187,7 +191,9 @@ func NewCwtchProfileStorage(db *sql.DB, profileDirectory string) (*CwtchProfileS
|
|||
channelGetMessageBySignatureStmts: map[ChannelID]*sql.Stmt{},
|
||||
channelGetMessageByContentHashStmts: map[ChannelID]*sql.Stmt{},
|
||||
channelGetMostRecentMessagesStmts: map[ChannelID]*sql.Stmt{},
|
||||
channelGetCountStmts: map[ChannelID]*sql.Stmt{}},
|
||||
channelGetCountStmts: map[ChannelID]*sql.Stmt{},
|
||||
channelRowNumberStmts: map[ChannelID]*sql.Stmt{},
|
||||
},
|
||||
nil
|
||||
}
|
||||
|
||||
|
@ -530,9 +536,48 @@ func (cps *CwtchProfileStorage) GetChannelMessageByContentHash(conversation int,
|
|||
return -1, err
|
||||
}
|
||||
rows.Close()
|
||||
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// GetRowNumberByMessageID looks up the row number of a message by the message ID
|
||||
func (cps *CwtchProfileStorage) GetRowNumberByMessageID(conversation int, channel int, id int) (int, error) {
|
||||
channelID := ChannelID{Conversation: conversation, Channel: channel}
|
||||
|
||||
_, exists := cps.channelRowNumberStmts[channelID]
|
||||
if !exists {
|
||||
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getLocalIndexOfMessageIdSQLStmt, conversation, channel))
|
||||
if err != nil {
|
||||
log.Errorf("error executing transaction: %v", err)
|
||||
return -1, err
|
||||
}
|
||||
cps.channelRowNumberStmts[channelID] = conversationStmt
|
||||
}
|
||||
|
||||
rows, err := cps.channelRowNumberStmts[channelID].Query(id)
|
||||
if err != nil {
|
||||
log.Errorf("error executing query: %v", err)
|
||||
return -1, err
|
||||
}
|
||||
|
||||
result := rows.Next()
|
||||
|
||||
if !result {
|
||||
return -1, errors.New("no result found")
|
||||
}
|
||||
|
||||
var rownum int
|
||||
err = rows.Scan(&rownum)
|
||||
if err != nil {
|
||||
log.Errorf("error fetching rows: %v", err)
|
||||
rows.Close()
|
||||
return -1, err
|
||||
}
|
||||
rows.Close()
|
||||
|
||||
return rownum, nil
|
||||
}
|
||||
|
||||
// GetChannelMessage looks up a channel message by conversation, channel and message id. On success it
|
||||
// returns the message body and the attributes associated with the message. Otherwise an error is returned.
|
||||
func (cps *CwtchProfileStorage) GetChannelMessage(conversation int, channel int, messageID int) (string, model.Attributes, error) {
|
||||
|
@ -636,6 +681,16 @@ func (cps *CwtchProfileStorage) GetMostRecentMessages(conversation int, channel
|
|||
}
|
||||
}
|
||||
|
||||
func (cps *CwtchProfileStorage) PurgeConversationChannel(conversation int, channel int) error {
|
||||
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(purgeMessagesFromConversationSQLStmt, conversation, channel))
|
||||
if err != nil {
|
||||
log.Errorf("error executing transaction: %v", err)
|
||||
return err
|
||||
}
|
||||
conversationStmt.Exec()
|
||||
return conversationStmt.Close()
|
||||
}
|
||||
|
||||
// Close closes the underlying database and prepared statements
|
||||
func (cps *CwtchProfileStorage) Close() {
|
||||
if cps.db != nil {
|
||||
|
@ -648,11 +703,7 @@ func (cps *CwtchProfileStorage) Close() {
|
|||
if conversation.Attributes[event.SaveHistoryKey] != event.SaveHistoryConfirmed {
|
||||
log.Infof("purging conversation...")
|
||||
// TODO: At some point in the future this needs to iterate over channels and make a decision for each on..
|
||||
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(purgeMessagesFromConversationSQLStmt, conversation.ID, 0))
|
||||
if err != nil {
|
||||
log.Errorf("error executing transaction: %v", err)
|
||||
}
|
||||
conversationStmt.Exec()
|
||||
cps.PurgeConversationChannel(conversation.ID, 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
statements are threadsafe but these maps are not