|
|
@ -6,6 +6,7 @@ import ( |
|
|
|
"encoding/base64" |
|
|
|
"fmt" |
|
|
|
"git.openprivacy.ca/openprivacy/log" |
|
|
|
"sync" |
|
|
|
) |
|
|
|
|
|
|
|
// MessageStoreInterface defines an interface to interact with a store of cwtch messages.
|
|
|
@ -14,17 +15,26 @@ type MessageStoreInterface interface { |
|
|
|
FetchMessages() []*groups.EncryptedGroupMessage |
|
|
|
MessagesCount() int |
|
|
|
FetchMessagesFrom(signature []byte) []*groups.EncryptedGroupMessage |
|
|
|
SetMessageCap(newcap int) |
|
|
|
Close() |
|
|
|
} |
|
|
|
|
|
|
|
// SqliteMessageStore is an sqlite3 backed message store
|
|
|
|
type SqliteMessageStore struct { |
|
|
|
incMessageCounterFn func() |
|
|
|
database *sql.DB |
|
|
|
messageCap int |
|
|
|
|
|
|
|
messageCount int |
|
|
|
countLock sync.Mutex |
|
|
|
|
|
|
|
database *sql.DB |
|
|
|
|
|
|
|
// Some prepared queries...
|
|
|
|
preparedInsertStatement *sql.Stmt // A Stmt is safe for concurrent use by multiple goroutines.
|
|
|
|
preparedFetchFromQuery *sql.Stmt |
|
|
|
preparedFetchQuery *sql.Stmt |
|
|
|
preparedCountQuery *sql.Stmt |
|
|
|
preparedPruneStatement *sql.Stmt |
|
|
|
} |
|
|
|
|
|
|
|
// Close closes the underlying sqlite3 database to further changes
|
|
|
@ -34,6 +44,13 @@ func (s *SqliteMessageStore) Close() { |
|
|
|
s.database.Close() |
|
|
|
} |
|
|
|
|
|
|
|
func (s *SqliteMessageStore) SetMessageCap(newcap int) { |
|
|
|
s.countLock.Lock() |
|
|
|
defer s.countLock.Unlock() |
|
|
|
s.messageCap = newcap |
|
|
|
s.checkPruneMessages() |
|
|
|
} |
|
|
|
|
|
|
|
// AddMessage implements the MessageStoreInterface AddMessage for sqlite message store
|
|
|
|
func (s *SqliteMessageStore) AddMessage(message groups.EncryptedGroupMessage) { |
|
|
|
if s.incMessageCounterFn != nil { |
|
|
@ -49,10 +66,29 @@ func (s *SqliteMessageStore) AddMessage(message groups.EncryptedGroupMessage) { |
|
|
|
log.Errorf("%v %q", stmt, err) |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
s.countLock.Lock() |
|
|
|
defer s.countLock.Unlock() |
|
|
|
s.messageCount++ |
|
|
|
s.checkPruneMessages() |
|
|
|
} |
|
|
|
|
|
|
|
func (s SqliteMessageStore) MessagesCount() int { |
|
|
|
rows, err := s.database.Query("SELECT COUNT(*) from messages") |
|
|
|
func (s *SqliteMessageStore) checkPruneMessages() { |
|
|
|
if s.messageCap != -1 && s.messageCount > s.messageCap { |
|
|
|
log.Debugf("Message Count: %d / Message Cap: %d, message cap exceeded, pruning oldest 10%...", s.messageCount, s.messageCap) |
|
|
|
// Delete 10% of messages
|
|
|
|
delCount := s.messageCap / 10 |
|
|
|
stmt, err := s.preparedPruneStatement.Exec(s.messageCap / 10) |
|
|
|
if err != nil { |
|
|
|
log.Errorf("%v %q", stmt, err) |
|
|
|
} |
|
|
|
s.messageCount -= delCount |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (s *SqliteMessageStore) MessagesCount() int { |
|
|
|
rows, err := s.preparedCountQuery.Query() |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
log.Errorf("%v", err) |
|
|
|
return -1 |
|
|
@ -75,8 +111,8 @@ func (s SqliteMessageStore) MessagesCount() int { |
|
|
|
} |
|
|
|
|
|
|
|
// FetchMessages implements the MessageStoreInterface FetchMessages for sqlite message store
|
|
|
|
func (s SqliteMessageStore) FetchMessages() []*groups.EncryptedGroupMessage { |
|
|
|
rows, err := s.database.Query("SELECT id, signature,ciphertext from messages") |
|
|
|
func (s *SqliteMessageStore) FetchMessages() []*groups.EncryptedGroupMessage { |
|
|
|
rows, err := s.preparedFetchQuery.Query() |
|
|
|
if err != nil { |
|
|
|
log.Errorf("%v", err) |
|
|
|
return nil |
|
|
@ -86,7 +122,7 @@ func (s SqliteMessageStore) FetchMessages() []*groups.EncryptedGroupMessage { |
|
|
|
} |
|
|
|
|
|
|
|
// FetchMessagesFrom implements the MessageStoreInterface FetchMessagesFrom for sqlite message store
|
|
|
|
func (s SqliteMessageStore) FetchMessagesFrom(signature []byte) []*groups.EncryptedGroupMessage { |
|
|
|
func (s *SqliteMessageStore) FetchMessagesFrom(signature []byte) []*groups.EncryptedGroupMessage { |
|
|
|
|
|
|
|
// If signature is empty then treat this as a complete sync request
|
|
|
|
if len(signature) == 0 { |
|
|
@ -132,7 +168,7 @@ func (s *SqliteMessageStore) compileRows(rows *sql.Rows) []*groups.EncryptedGrou |
|
|
|
|
|
|
|
// InitializeSqliteMessageStore creates a database `dbfile` with the necessary tables (if it doesn't already exist)
|
|
|
|
// and returns an open database
|
|
|
|
func InitializeSqliteMessageStore(dbfile string, incMessageCounterFn func()) (*SqliteMessageStore, error) { |
|
|
|
func InitializeSqliteMessageStore(dbfile string, messageCap int, incMessageCounterFn func()) (*SqliteMessageStore, error) { |
|
|
|
db, err := sql.Open("sqlite3", dbfile) |
|
|
|
if err != nil { |
|
|
|
log.Errorf("database %v cannot be created or opened %v", dbfile, err) |
|
|
@ -149,6 +185,7 @@ func InitializeSqliteMessageStore(dbfile string, incMessageCounterFn func()) (*S |
|
|
|
slms := new(SqliteMessageStore) |
|
|
|
slms.database = db |
|
|
|
slms.incMessageCounterFn = incMessageCounterFn |
|
|
|
slms.messageCap = messageCap |
|
|
|
|
|
|
|
sqlStmt = `INSERT INTO messages(signature, ciphertext) values (?,?);` |
|
|
|
stmt, err := slms.database.Prepare(sqlStmt) |
|
|
@ -158,12 +195,39 @@ func InitializeSqliteMessageStore(dbfile string, incMessageCounterFn func()) (*S |
|
|
|
} |
|
|
|
slms.preparedInsertStatement = stmt |
|
|
|
|
|
|
|
query, err := slms.database.Prepare("SELECT id, signature,ciphertext FROM messages WHERE id>=(SELECT id FROM messages WHERE signature=(?));") |
|
|
|
sqlStmt = "SELECT id, signature,ciphertext from messages" |
|
|
|
query, err := slms.database.Prepare(sqlStmt) |
|
|
|
if err != nil { |
|
|
|
log.Errorf("%v", err) |
|
|
|
log.Errorf("%q: %s", err, sqlStmt) |
|
|
|
return nil, fmt.Errorf("%s: %q", sqlStmt, err) |
|
|
|
} |
|
|
|
slms.preparedFetchQuery = query |
|
|
|
|
|
|
|
sqlStmt = "SELECT id, signature,ciphertext FROM messages WHERE id>=(SELECT id FROM messages WHERE signature=(?));" |
|
|
|
query, err = slms.database.Prepare(sqlStmt) |
|
|
|
if err != nil { |
|
|
|
log.Errorf("%q: %s", err, sqlStmt) |
|
|
|
return nil, fmt.Errorf("%s: %q", sqlStmt, err) |
|
|
|
} |
|
|
|
slms.preparedFetchFromQuery = query |
|
|
|
|
|
|
|
sqlStmt = "SELECT COUNT(*) from messages" |
|
|
|
stmt, err = slms.database.Prepare(sqlStmt) |
|
|
|
if err != nil { |
|
|
|
log.Errorf("%q: %s", err, sqlStmt) |
|
|
|
return nil, fmt.Errorf("%s: %q", sqlStmt, err) |
|
|
|
} |
|
|
|
slms.preparedCountQuery = stmt |
|
|
|
|
|
|
|
sqlStmt = "DELETE FROM messages WHERE id IN (SELECT id FROM messages ORDER BY id ASC LIMIT (?))" |
|
|
|
stmt, err = slms.database.Prepare(sqlStmt) |
|
|
|
if err != nil { |
|
|
|
log.Errorf("%q: %s", err, sqlStmt) |
|
|
|
return nil, fmt.Errorf("%s: %q", sqlStmt, err) |
|
|
|
} |
|
|
|
slms.preparedPruneStatement = stmt |
|
|
|
|
|
|
|
slms.messageCount = slms.MessagesCount() |
|
|
|
|
|
|
|
return slms, nil |
|
|
|
} |
|
|
|