@ -1,21 +1,12 @@
package storage
import (
"bufio"
"cwtch.im/cwtch/protocol/groups"
" cwtch.im/cwtch/server/metrics "
"encoding/ json "
" database/sql "
"encoding/ base64 "
"fmt"
"git.openprivacy.ca/openprivacy/log"
"os"
"path"
"sync"
)
const (
fileStorePartitions = 10
fileStoreFilename = "cwtch.messages"
directory = "messages"
_ "github.com/mattn/go-sqlite3" // sqlite3 driver
)
// MessageStoreInterface defines an interface to interact with a store of cwtch messages.
@ -24,129 +15,82 @@ type MessageStoreInterface interface {
FetchMessages ( ) [ ] * groups . EncryptedGroupMessage
}
// MessageStore is a file-backed implementation of MessageStoreInterface
type MessageStore struct {
activeLogFile * os . File
filePos int
storeDirectory string
lock sync . Mutex
messages [ ] * groups . EncryptedGroupMessage
messageCounter metrics . Counter
maxBufferLines int
bufferPos int
bufferRotated bool
// SqliteMessageStore is an sqlite3 backed message store
type SqliteMessageStore struct {
database * sql . DB
}
// Close closes the message store and underlying resources.
func ( ms * MessageStore ) Close ( ) {
ms . lock . Lock ( )
ms . messages = nil
ms . activeLogFile . Close ( )
ms . lock . Unlock ( )
// Close closes the underlying sqlite3 database to further changes
func ( s * SqliteMessageStore ) Close ( ) {
s . database . Close ( )
}
func ( ms * MessageStore ) updateBuffer ( gm * groups . EncryptedGroupMessage ) {
ms . messages [ ms . bufferPos ] = gm
ms. bufferPos ++
if ms. bufferPos == ms . maxBufferLines {
ms. bufferPos = 0
ms . bufferRotated = true
// AddMessage implements the MessageStoreInterface AddMessage for sqlite message store
func ( s * SqliteMessageStore ) AddMessage ( message groups . EncryptedGroupMessage ) {
tx, err := s . database . Begin ( )
if err != nil {
log. Errorf ( "%q" , err )
return
}
}
func ( ms * MessageStore ) initAndLoadFiles ( ) error {
ms . activeLogFile = nil
for i := fileStorePartitions - 1 ; i >= 0 ; i -- {
ms . filePos = 0
filename := path . Join ( ms . storeDirectory , fmt . Sprintf ( "%s.%d" , fileStoreFilename , i ) )
f , err := os . OpenFile ( filename , os . O_CREATE | os . O_APPEND | os . O_RDWR , 0600 )
if err != nil {
log . Errorf ( "MessageStore could not open: %v: %v" , filename , err )
continue
}
ms . activeLogFile = f
scanner := bufio . NewScanner ( f )
for scanner . Scan ( ) {
gms := scanner . Text ( )
ms . filePos ++
gm := & groups . EncryptedGroupMessage { }
err := json . Unmarshal ( [ ] byte ( gms ) , gm )
if err == nil {
ms . updateBuffer ( gm )
}
}
sqlStmt := ` INSERT INTO messages(signature, ciphertext) values (?,?); `
stmt , err := s . database . Prepare ( sqlStmt )
if err != nil {
log . Errorf ( "%q: %s" , err , sqlStmt )
return
}
if ms . activeLogFile == nil {
return fmt . Errorf ( "Could not create log file to write to in %s" , ms . storeDirectory )
defer stmt . Close ( )
_ , err = stmt . Exec ( base64 . StdEncoding . EncodeToString ( message . Signature ) , base64 . StdEncoding . EncodeToString ( message . Ciphertext ) )
if err != nil {
log . Errorf ( "%q: %s\n" , err , sqlStmt )
return
}
return nil
tx . Commit ( )
}
func ( ms * MessageStore ) updateFile ( gm * groups . EncryptedGroupMessage ) {
s , err := json . Marshal ( gm )
// FetchMessages implements the MessageStoreInterface FetchMessages for sqlite message store
func ( s SqliteMessageStore ) FetchMessages ( ) [ ] * groups . EncryptedGroupMessage {
rows , err := s . database . Query ( "SELECT id, signature,ciphertext from messages" )
if err != nil {
log . Errorf ( "Failed to unmarshal group message %v\n" , err )
log . Errorf ( "%v" , err )
return nil
}
fmt . Fprintf ( ms . activeLogFile , "%s\n" , s )
ms . filePos ++
if ms . filePos >= ms . maxBufferLines / fileStorePartitions {
ms . rotateFileStore ( )
defer rows . Close ( )
var messages [ ] * groups . EncryptedGroupMessage
for rows . Next ( ) {
var id int
var signature string
var ciphertext string
err = rows . Scan ( & id , & signature , & ciphertext )
if err != nil {
log . Errorf ( "Error fetching row %v" , err )
}
rawSignature , _ := base64 . StdEncoding . DecodeString ( signature )
rawCiphertext , _ := base64 . StdEncoding . DecodeString ( ciphertext )
messages = append ( messages , & groups . EncryptedGroupMessage {
Signature : rawSignature ,
Ciphertext : rawCiphertext ,
} )
}
return messages
}
func ( ms * MessageStore ) rotateFileStore ( ) {
ms . activeLogFile . Close ( )
os . Remove ( path . Join ( ms . storeDirectory , fmt . Sprintf ( "%s.%d" , fileStoreFilename , fileStorePartitions - 1 ) ) )
for i := fileStorePartitions - 2 ; i >= 0 ; i -- {
os . Rename ( path . Join ( ms . storeDirectory , fmt . Sprintf ( "%s.%d" , fileStoreFilename , i ) ) , path . Join ( ms . storeDirectory , fmt . Sprintf ( "%s.%d" , fileStoreFilename , i + 1 ) ) )
}
f , err := os . OpenFile ( path . Join ( ms . storeDirectory , fmt . Sprintf ( "%s.%d" , fileStoreFilename , 0 ) ) , os . O_CREATE | os . O_APPEND | os . O_RDWR , 0600 )
// InitializeSqliteMessageStore creates a database `dbfile` with the necessary tables (if it doesn't already exist)
// and returns an open database
func InitializeSqliteMessageStore ( dbfile string ) ( * SqliteMessageStore , error ) {
db , err := sql . Open ( "sqlite3" , dbfile )
if err != nil {
log . Errorf ( "Could not open new message store file in: %s" , ms . storeDirectory )
log . Errorf ( "database %v cannot be created or opened %v" , dbfile , err )
return nil , fmt . Errorf ( "database %v cannot be created or opened: %v" , dbfile , err )
}
ms . filePos = 0
ms . activeLogFile = f
}
// Init sets up a MessageStore of size maxBufferLines (# of messages) backed by filename
func ( ms * MessageStore ) Init ( appDirectory string , maxBufferLines int , messageCounter metrics . Counter ) error {
ms . storeDirectory = path . Join ( appDirectory , directory )
os . Mkdir ( ms . storeDirectory , 0700 )
ms . bufferPos = 0
ms . maxBufferLines = maxBufferLines
ms . messages = make ( [ ] * groups . EncryptedGroupMessage , maxBufferLines )
ms . bufferRotated = false
ms . messageCounter = messageCounter
err := ms . initAndLoadFiles ( )
return err
}
// FetchMessages returns all messages from the backing file.
func ( ms * MessageStore ) FetchMessages ( ) ( messages [ ] * groups . EncryptedGroupMessage ) {
ms . lock . Lock ( )
if ! ms . bufferRotated {
messages = make ( [ ] * groups . EncryptedGroupMessage , ms . bufferPos )
copy ( messages , ms . messages [ 0 : ms . bufferPos ] )
} else {
messages = make ( [ ] * groups . EncryptedGroupMessage , ms . maxBufferLines )
copy ( messages , ms . messages [ ms . bufferPos : ms . maxBufferLines ] )
copy ( messages [ ms . bufferPos : ] , ms . messages [ 0 : ms . bufferPos ] )
sqlStmt := ` CREATE TABLE IF NOT EXISTS messages (id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, signature TEXT UNIQUE NOT NULL, ciphertext TEXT NOT NULL); `
_ , err = db . Exec ( sqlStmt )
if err != nil {
db . Close ( )
log . Errorf ( "%q: %s" , err , sqlStmt )
return nil , fmt . Errorf ( "%s: %q" , sqlStmt , err )
}
ms . lock . Unlock ( )
return
}
// AddMessage adds a GroupMessage to the store
func ( ms * MessageStore ) AddMessage ( gm groups . EncryptedGroupMessage ) {
ms . messageCounter . Add ( 1 )
ms . lock . Lock ( )
ms . updateBuffer ( & gm )
ms . updateFile ( & gm )
ms . lock . Unlock ( )
log . Infof ( "Database Initialized" )
slms := new ( SqliteMessageStore )
slms . database = db
return slms , nil
}