From 32d81046cfa8d6ea939e460706a45da9e915622d Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Fri, 7 May 2021 12:29:31 -0700 Subject: [PATCH 1/6] Replace legacy message store with sqlite3 message store --- .gitignore | 5 +- go.mod | 1 + go.sum | 8 ++ server.go | 9 +- storage/message_store.go | 186 ++++++++++++---------------------- storage/message_store_test.go | 58 ++++------- 6 files changed, 102 insertions(+), 165 deletions(-) diff --git a/.gitignore b/.gitignore index d2d532c..84752fb 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,7 @@ serverConfig.json coverage.out tordir .idea/ -messages \ No newline at end of file +messages +cwtch.messages +serverMonitorReport.txt +testcwtchmessages.db \ No newline at end of file diff --git a/go.mod b/go.mod index 02a8c8e..cfbe583 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( git.openprivacy.ca/openprivacy/connectivity v1.4.3 git.openprivacy.ca/openprivacy/log v1.0.2 github.com/gtank/ristretto255 v0.1.2 + github.com/mattn/go-sqlite3 v1.14.7 github.com/struCoder/pidusage v0.1.3 golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee ) diff --git a/go.sum b/go.sum index 4d5f4db..7031215 100644 --- a/go.sum +++ b/go.sum @@ -16,16 +16,21 @@ github.com/gtank/merlin v0.1.1 h1:eQ90iG7K9pOhtereWsmyRJ6RAwcP4tHTDBHXNg+u5is= github.com/gtank/merlin v0.1.1/go.mod h1:T86dnYJhcGOh5BjZFCJWTDeTK7XW8uE+E21Cy/bIQ+s= github.com/gtank/ristretto255 v0.1.2 h1:JEqUCPA1NvLq5DwYtuzigd7ss8fwbYay9fi4/5uMzcc= github.com/gtank/ristretto255 v0.1.2/go.mod h1:Ph5OpO6c7xKUGROZfWVLiJf9icMDwUeIvY4OmlYW69o= +github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1 h1:VkoXIwSboBpnk99O/KFauAEILuNHv5DVFKZMBN/gUgw= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/mattn/go-sqlite3 v1.14.7 h1:fxWBnXkxfM6sRiuH3bqJ4CfzZojMOLVc0UTsTglEghA= +github.com/mattn/go-sqlite3 v1.14.7/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mimoo/StrobeGo v0.0.0-20181016162300-f8f6d4d2b643 h1:hLDRPB66XQT/8+wG9WsDpiCvZf1yKO7sz7scAjSlBa0= github.com/mimoo/StrobeGo v0.0.0-20181016162300-f8f6d4d2b643/go.mod h1:43+3pMjjKimDBf5Kr4ZFNGbLql1zKkbImw+fZbw3geM= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -48,9 +53,12 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44 h1:Bli41pIlzTzf3KEY06n+xnzK/BESIg2ze4Pgfh/aI8c= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e h1:FDhOuMEY4JVRztM/gsbk+IKUQ8kj74bxZrgw87eMMVc= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/server.go b/server.go index 4a6c031..11e66d9 100644 --- a/server.go +++ b/server.go @@ -3,9 +3,9 @@ package server import ( "crypto/ed25519" "cwtch.im/cwtch/model" - "cwtch.im/cwtch/server/metrics" - "cwtch.im/cwtch/server/storage" "fmt" + "git.openprivacy.ca/cwtch.im/server/metrics" + "git.openprivacy.ca/cwtch.im/server/storage" "git.openprivacy.ca/cwtch.im/tapir" "git.openprivacy.ca/cwtch.im/tapir/applications" tor2 "git.openprivacy.ca/cwtch.im/tapir/networks/tor" @@ -62,10 +62,9 @@ func (s *Server) Run(acn connectivity.ACN) error { log.Infof("cwtch server running on cwtch:%s\n", addressIdentity+".onion:") s.metricsPack.Start(service, s.config.ConfigDir, s.config.ServerReporting.LogMetricsToFile) - ms := new(storage.MessageStore) - err := ms.Init(s.config.ConfigDir, s.config.MaxBufferLines, s.metricsPack.MessageCounter) + ms, err := storage.InitializeSqliteMessageStore("cwtch.messages") if err != nil { - return err + return fmt.Errorf("could not open database: %v", err) } // Needed because we only collect metrics on a per-session basis diff --git a/storage/message_store.go b/storage/message_store.go index 2fb1c93..66303b1 100644 --- a/storage/message_store.go +++ b/storage/message_store.go @@ -1,21 +1,12 @@ package storage import ( - "bufio" "cwtch.im/cwtch/protocol/groups" - "cwtch.im/cwtch/server/metrics" - "encoding/json" + "database/sql" + "encoding/base64" "fmt" "git.openprivacy.ca/openprivacy/log" - "os" - "path" - "sync" -) - -const ( - fileStorePartitions = 10 - fileStoreFilename = "cwtch.messages" - directory = "messages" + _ "github.com/mattn/go-sqlite3" // sqlite3 driver ) // MessageStoreInterface defines an interface to interact with a store of cwtch messages. @@ -24,129 +15,82 @@ type MessageStoreInterface interface { FetchMessages() []*groups.EncryptedGroupMessage } -// MessageStore is a file-backed implementation of MessageStoreInterface -type MessageStore struct { - activeLogFile *os.File - filePos int - storeDirectory string - lock sync.Mutex - messages []*groups.EncryptedGroupMessage - messageCounter metrics.Counter - maxBufferLines int - bufferPos int - bufferRotated bool +// SqliteMessageStore is an sqlite3 backed message store +type SqliteMessageStore struct { + database *sql.DB } -// Close closes the message store and underlying resources. -func (ms *MessageStore) Close() { - ms.lock.Lock() - ms.messages = nil - ms.activeLogFile.Close() - ms.lock.Unlock() +// Close closes the underlying sqlite3 database to further changes +func (s *SqliteMessageStore) Close() { + s.database.Close() } -func (ms *MessageStore) updateBuffer(gm *groups.EncryptedGroupMessage) { - ms.messages[ms.bufferPos] = gm - ms.bufferPos++ - if ms.bufferPos == ms.maxBufferLines { - ms.bufferPos = 0 - ms.bufferRotated = true +// AddMessage implements the MessageStoreInterface AddMessage for sqlite message store +func (s *SqliteMessageStore) AddMessage(message groups.EncryptedGroupMessage) { + tx, err := s.database.Begin() + if err != nil { + log.Errorf("%q", err) + return } + sqlStmt := `INSERT INTO messages(signature, ciphertext) values (?,?);` + stmt, err := s.database.Prepare(sqlStmt) + if err != nil { + log.Errorf("%q: %s", err, sqlStmt) + return + } + defer stmt.Close() + _, err = stmt.Exec(base64.StdEncoding.EncodeToString(message.Signature), base64.StdEncoding.EncodeToString(message.Ciphertext)) + if err != nil { + log.Errorf("%q: %s\n", err, sqlStmt) + return + } + tx.Commit() } -func (ms *MessageStore) initAndLoadFiles() error { - ms.activeLogFile = nil - for i := fileStorePartitions - 1; i >= 0; i-- { - ms.filePos = 0 - filename := path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, i)) - f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600) +// FetchMessages implements the MessageStoreInterface FetchMessages for sqlite message store +func (s SqliteMessageStore) FetchMessages() []*groups.EncryptedGroupMessage { + rows, err := s.database.Query("SELECT id, signature,ciphertext from messages") + if err != nil { + log.Errorf("%v", err) + return nil + } + defer rows.Close() + var messages []*groups.EncryptedGroupMessage + for rows.Next() { + var id int + var signature string + var ciphertext string + err = rows.Scan(&id, &signature, &ciphertext) if err != nil { - log.Errorf("MessageStore could not open: %v: %v", filename, err) - continue - } - ms.activeLogFile = f - - scanner := bufio.NewScanner(f) - for scanner.Scan() { - gms := scanner.Text() - ms.filePos++ - gm := &groups.EncryptedGroupMessage{} - err := json.Unmarshal([]byte(gms), gm) - if err == nil { - ms.updateBuffer(gm) - } + log.Errorf("Error fetching row %v", err) } + rawSignature, _ := base64.StdEncoding.DecodeString(signature) + rawCiphertext, _ := base64.StdEncoding.DecodeString(ciphertext) + messages = append(messages, &groups.EncryptedGroupMessage{ + Signature: rawSignature, + Ciphertext: rawCiphertext, + }) } - if ms.activeLogFile == nil { - return fmt.Errorf("Could not create log file to write to in %s", ms.storeDirectory) - } - return nil + return messages } -func (ms *MessageStore) updateFile(gm *groups.EncryptedGroupMessage) { - s, err := json.Marshal(gm) +// InitializeSqliteMessageStore creates a database `dbfile` with the necessary tables (if it doesn't already exist) +// and returns an open database +func InitializeSqliteMessageStore(dbfile string) (*SqliteMessageStore, error) { + db, err := sql.Open("sqlite3", dbfile) if err != nil { - log.Errorf("Failed to unmarshal group message %v\n", err) + log.Errorf("database %v cannot be created or opened %v", dbfile, err) + return nil, fmt.Errorf("database %v cannot be created or opened: %v", dbfile, err) } - fmt.Fprintf(ms.activeLogFile, "%s\n", s) - ms.filePos++ - if ms.filePos >= ms.maxBufferLines/fileStorePartitions { - ms.rotateFileStore() - } -} - -func (ms *MessageStore) rotateFileStore() { - ms.activeLogFile.Close() - os.Remove(path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, fileStorePartitions-1))) - - for i := fileStorePartitions - 2; i >= 0; i-- { - os.Rename(path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, i)), path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, i+1))) - } - - f, err := os.OpenFile(path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, 0)), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600) + sqlStmt := `CREATE TABLE IF NOT EXISTS messages (id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, signature TEXT UNIQUE NOT NULL, ciphertext TEXT NOT NULL);` + _, err = db.Exec(sqlStmt) if err != nil { - log.Errorf("Could not open new message store file in: %s", ms.storeDirectory) + db.Close() + log.Errorf("%q: %s", err, sqlStmt) + return nil, fmt.Errorf("%s: %q", sqlStmt, err) } - ms.filePos = 0 - ms.activeLogFile = f -} - -// Init sets up a MessageStore of size maxBufferLines (# of messages) backed by filename -func (ms *MessageStore) Init(appDirectory string, maxBufferLines int, messageCounter metrics.Counter) error { - ms.storeDirectory = path.Join(appDirectory, directory) - os.Mkdir(ms.storeDirectory, 0700) - - ms.bufferPos = 0 - ms.maxBufferLines = maxBufferLines - ms.messages = make([]*groups.EncryptedGroupMessage, maxBufferLines) - ms.bufferRotated = false - ms.messageCounter = messageCounter - - err := ms.initAndLoadFiles() - return err -} - -// FetchMessages returns all messages from the backing file. -func (ms *MessageStore) FetchMessages() (messages []*groups.EncryptedGroupMessage) { - ms.lock.Lock() - if !ms.bufferRotated { - messages = make([]*groups.EncryptedGroupMessage, ms.bufferPos) - copy(messages, ms.messages[0:ms.bufferPos]) - } else { - messages = make([]*groups.EncryptedGroupMessage, ms.maxBufferLines) - copy(messages, ms.messages[ms.bufferPos:ms.maxBufferLines]) - copy(messages[ms.bufferPos:], ms.messages[0:ms.bufferPos]) - } - ms.lock.Unlock() - return -} - -// AddMessage adds a GroupMessage to the store -func (ms *MessageStore) AddMessage(gm groups.EncryptedGroupMessage) { - ms.messageCounter.Add(1) - ms.lock.Lock() - ms.updateBuffer(&gm) - ms.updateFile(&gm) - - ms.lock.Unlock() + log.Infof("Database Initialized") + slms := new(SqliteMessageStore) + slms.database = db + return slms, nil } diff --git a/storage/message_store_test.go b/storage/message_store_test.go index d6ce82e..dd5bd9a 100644 --- a/storage/message_store_test.go +++ b/storage/message_store_test.go @@ -2,53 +2,35 @@ package storage import ( "cwtch.im/cwtch/protocol/groups" - "cwtch.im/cwtch/server/metrics" + "git.openprivacy.ca/openprivacy/log" "os" - "strconv" "testing" ) func TestMessageStore(t *testing.T) { - os.Remove("ms.test") - ms := new(MessageStore) - counter := metrics.NewCounter() - ms.Init("./", 1000, counter) - for i := 0; i < 499; i++ { - gm := groups.EncryptedGroupMessage{ - Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)), - } - ms.AddMessage(gm) - } - if counter.Count() != 499 { - t.Errorf("Counter should be at 499 was %v", counter.Count()) - } - ms.Close() - ms.Init("./", 1000, counter) - m := ms.FetchMessages() - if len(m) != 499 { - t.Errorf("Should have been 499 was %v", len(m)) + os.Remove("../testcwtchmessages.db") + log.SetLevel(log.LevelDebug) + db, err := InitializeSqliteMessageStore("../testcwtchmessages.db") + if err != nil { + t.Fatalf("Error: %v", err) } - counter.Reset() + db.AddMessage(groups.EncryptedGroupMessage{ + Signature: []byte("Hello world 2"), + Ciphertext: []byte("Hello world"), + }) - for i := 0; i < 1000; i++ { - gm := groups.EncryptedGroupMessage{ - Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)), - } - ms.AddMessage(gm) - } + db.AddMessage(groups.EncryptedGroupMessage{ + Signature: []byte("Hello world 1"), + Ciphertext: []byte("Hello world"), + }) - m = ms.FetchMessages() - if len(m) != 1000 { - t.Errorf("Should have been 1000 was %v", len(m)) + messages := db.FetchMessages() + for _, message := range messages { + t.Logf("Message: %v", message) } - ms.Close() - ms.Init("./", 1000, counter) - m = ms.FetchMessages() - if len(m) != 999 { - t.Errorf("Should have been 999 was %v", len(m)) + if len(messages) != 2 { + t.Fatalf("Incorrect number of messages returned") } - ms.Close() - - os.RemoveAll("./messages") + db.Close() } From cc4d5ee4285ace839ab56997bde064c8efb193eb Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Fri, 7 May 2021 13:43:15 -0700 Subject: [PATCH 2/6] Add FetchMessagesFrom --- server_tokenboard.go | 18 +++++++----- storage/message_store.go | 55 ++++++++++++++++++++++++++++------- storage/message_store_test.go | 39 +++++++++++++++++++------ 3 files changed, 85 insertions(+), 27 deletions(-) diff --git a/server_tokenboard.go b/server_tokenboard.go index 1b14272..f30eb4b 100644 --- a/server_tokenboard.go +++ b/server_tokenboard.go @@ -2,8 +2,8 @@ package server import ( "cwtch.im/cwtch/protocol/groups" - "cwtch.im/cwtch/server/storage" "encoding/json" + "git.openprivacy.ca/cwtch.im/server/storage" "git.openprivacy.ca/cwtch.im/tapir" "git.openprivacy.ca/cwtch.im/tapir/applications" "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" @@ -76,23 +76,25 @@ func (ta *TokenboardServer) Listen() { case groups.ReplayRequestMessage: if message.ReplayRequest != nil { log.Debugf("Received Replay Request %v", message.ReplayRequest) - messages := ta.LegacyMessageStore.FetchMessages() + messages := ta.LegacyMessageStore.FetchMessagesFrom(message.ReplayRequest.LastCommit) response, _ := json.Marshal(groups.Message{MessageType: groups.ReplayResultMessage, ReplayResult: &groups.ReplayResult{NumMessages: len(messages)}}) log.Debugf("Sending Replay Response %v", groups.ReplayResult{NumMessages: len(messages)}) ta.connection.Send(response) + lastSignature := message.ReplayRequest.LastCommit for _, message := range messages { + lastSignature = message.Signature data, _ = json.Marshal(message) ta.connection.Send(data) } log.Debugf("Finished Requested Sync") // Set sync and then send any new messages that might have happened while we were syncing ta.connection.SetCapability(groups.CwtchServerSyncedCapability) - newMessages := ta.LegacyMessageStore.FetchMessages() - if len(newMessages) > len(messages) { - for _, message := range newMessages[len(messages):] { - data, _ = json.Marshal(groups.Message{MessageType: groups.NewMessageMessage, NewMessage: &groups.NewMessage{EGM: *message}}) - ta.connection.Send(data) - } + // Because we have set the sync capability any new messages that arrive after this point will just + // need to do a basic lookup from the last seen message + newMessages := ta.LegacyMessageStore.FetchMessagesFrom(lastSignature) + for _, message := range newMessages[len(messages):] { + data, _ = json.Marshal(groups.Message{MessageType: groups.NewMessageMessage, NewMessage: &groups.NewMessage{EGM: *message}}) + ta.connection.Send(data) } } else { log.Debugf("Server Closing Connection Because of Malformed ReplayRequestMessage Packet") diff --git a/storage/message_store.go b/storage/message_store.go index 66303b1..41e675f 100644 --- a/storage/message_store.go +++ b/storage/message_store.go @@ -13,15 +13,22 @@ import ( type MessageStoreInterface interface { AddMessage(groups.EncryptedGroupMessage) FetchMessages() []*groups.EncryptedGroupMessage + FetchMessagesFrom(signature []byte) []*groups.EncryptedGroupMessage } // SqliteMessageStore is an sqlite3 backed message store type SqliteMessageStore struct { database *sql.DB + + // Some prepared queries... + preparedInsertStatement *sql.Stmt // A Stmt is safe for concurrent use by multiple goroutines. + preparedFetchFromQuery *sql.Stmt } // Close closes the underlying sqlite3 database to further changes func (s *SqliteMessageStore) Close() { + s.preparedInsertStatement.Close() + s.preparedFetchFromQuery.Close() s.database.Close() } @@ -32,16 +39,9 @@ func (s *SqliteMessageStore) AddMessage(message groups.EncryptedGroupMessage) { log.Errorf("%q", err) return } - sqlStmt := `INSERT INTO messages(signature, ciphertext) values (?,?);` - stmt, err := s.database.Prepare(sqlStmt) + stmt, err := s.preparedInsertStatement.Exec(base64.StdEncoding.EncodeToString(message.Signature), base64.StdEncoding.EncodeToString(message.Ciphertext)) if err != nil { - log.Errorf("%q: %s", err, sqlStmt) - return - } - defer stmt.Close() - _, err = stmt.Exec(base64.StdEncoding.EncodeToString(message.Signature), base64.StdEncoding.EncodeToString(message.Ciphertext)) - if err != nil { - log.Errorf("%q: %s\n", err, sqlStmt) + log.Errorf("%v %q", stmt, err) return } tx.Commit() @@ -55,12 +55,31 @@ func (s SqliteMessageStore) FetchMessages() []*groups.EncryptedGroupMessage { return nil } defer rows.Close() + return s.compileRows(rows) +} + +// FetchMessagesFrom implements the MessageStoreInterface FetchMessagesFrom for sqlite message store +func (s SqliteMessageStore) FetchMessagesFrom(signature []byte) []*groups.EncryptedGroupMessage { + if signature == nil { + return s.FetchMessages() + } + + rows, err := s.preparedFetchFromQuery.Query(base64.StdEncoding.EncodeToString(signature)) + if err != nil { + log.Errorf("%v", err) + return nil + } + defer rows.Close() + return s.compileRows(rows) +} + +func (s *SqliteMessageStore) compileRows(rows *sql.Rows) []*groups.EncryptedGroupMessage { var messages []*groups.EncryptedGroupMessage for rows.Next() { var id int var signature string var ciphertext string - err = rows.Scan(&id, &signature, &ciphertext) + err := rows.Scan(&id, &signature, &ciphertext) if err != nil { log.Errorf("Error fetching row %v", err) } @@ -92,5 +111,21 @@ func InitializeSqliteMessageStore(dbfile string) (*SqliteMessageStore, error) { log.Infof("Database Initialized") slms := new(SqliteMessageStore) slms.database = db + + sqlStmt = `INSERT INTO messages(signature, ciphertext) values (?,?);` + stmt, err := slms.database.Prepare(sqlStmt) + if err != nil { + log.Errorf("%q: %s", err, sqlStmt) + return nil, fmt.Errorf("%s: %q", sqlStmt, err) + } + slms.preparedInsertStatement = stmt + + query, err := slms.database.Prepare("SELECT id, signature,ciphertext FROM messages WHERE id>=(SELECT id FROM messages WHERE signature=(?));") + if err != nil { + log.Errorf("%v", err) + return nil, fmt.Errorf("%s: %q", sqlStmt, err) + } + slms.preparedFetchFromQuery = query + return slms, nil } diff --git a/storage/message_store_test.go b/storage/message_store_test.go index dd5bd9a..eb343a0 100644 --- a/storage/message_store_test.go +++ b/storage/message_store_test.go @@ -2,6 +2,7 @@ package storage import ( "cwtch.im/cwtch/protocol/groups" + "encoding/binary" "git.openprivacy.ca/openprivacy/log" "os" "testing" @@ -15,22 +16,42 @@ func TestMessageStore(t *testing.T) { t.Fatalf("Error: %v", err) } - db.AddMessage(groups.EncryptedGroupMessage{ - Signature: []byte("Hello world 2"), - Ciphertext: []byte("Hello world"), - }) + numMessages := 100 - db.AddMessage(groups.EncryptedGroupMessage{ - Signature: []byte("Hello world 1"), - Ciphertext: []byte("Hello world"), - }) + t.Logf("Populating Database") + for i := 0; i < numMessages; i++ { + buf := make([]byte, 4) + binary.PutUvarint(buf, uint64(i)) + db.AddMessage(groups.EncryptedGroupMessage{ + Signature: append([]byte("Hello world"), buf...), + Ciphertext: []byte("Hello world"), + }) + t.Logf("Inserted %v", i) + } + // Wait for inserts to complete.. messages := db.FetchMessages() for _, message := range messages { t.Logf("Message: %v", message) } - if len(messages) != 2 { + if len(messages) != numMessages { t.Fatalf("Incorrect number of messages returned") } + + t.Logf("Testing FetchMessagesFrom...") + + numToFetch := numMessages / 2 + + buf := make([]byte, 4) + binary.PutUvarint(buf, uint64(numToFetch)) + sig := append([]byte("Hello world"), buf...) + messages = db.FetchMessagesFrom(sig) + for _, message := range messages { + t.Logf("Message: %v", message) + } + if len(messages) != numToFetch { + t.Fatalf("Incorrect number of messages returned : %v", len(messages)) + } + db.Close() } From dfbcd473b39365801ee856c35723ca60027f32bf Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Fri, 7 May 2021 14:21:18 -0700 Subject: [PATCH 3/6] Testing Performance --- server_tokenboard.go | 2 +- storage/message_store.go | 6 ------ storage/message_store_test.go | 37 ++++++++++++++++++++++------------- 3 files changed, 24 insertions(+), 21 deletions(-) diff --git a/server_tokenboard.go b/server_tokenboard.go index f30eb4b..cfd59f6 100644 --- a/server_tokenboard.go +++ b/server_tokenboard.go @@ -92,7 +92,7 @@ func (ta *TokenboardServer) Listen() { // Because we have set the sync capability any new messages that arrive after this point will just // need to do a basic lookup from the last seen message newMessages := ta.LegacyMessageStore.FetchMessagesFrom(lastSignature) - for _, message := range newMessages[len(messages):] { + for _, message := range newMessages { data, _ = json.Marshal(groups.Message{MessageType: groups.NewMessageMessage, NewMessage: &groups.NewMessage{EGM: *message}}) ta.connection.Send(data) } diff --git a/storage/message_store.go b/storage/message_store.go index 41e675f..99d70de 100644 --- a/storage/message_store.go +++ b/storage/message_store.go @@ -34,17 +34,11 @@ func (s *SqliteMessageStore) Close() { // AddMessage implements the MessageStoreInterface AddMessage for sqlite message store func (s *SqliteMessageStore) AddMessage(message groups.EncryptedGroupMessage) { - tx, err := s.database.Begin() - if err != nil { - log.Errorf("%q", err) - return - } stmt, err := s.preparedInsertStatement.Exec(base64.StdEncoding.EncodeToString(message.Signature), base64.StdEncoding.EncodeToString(message.Ciphertext)) if err != nil { log.Errorf("%v %q", stmt, err) return } - tx.Commit() } // FetchMessages implements the MessageStoreInterface FetchMessages for sqlite message store diff --git a/storage/message_store_test.go b/storage/message_store_test.go index eb343a0..0646753 100644 --- a/storage/message_store_test.go +++ b/storage/message_store_test.go @@ -6,35 +6,44 @@ import ( "git.openprivacy.ca/openprivacy/log" "os" "testing" + "time" ) func TestMessageStore(t *testing.T) { - os.Remove("../testcwtchmessages.db") + filename := "/home/sarah/testcwtchmessages.db" + os.Remove(filename) log.SetLevel(log.LevelDebug) - db, err := InitializeSqliteMessageStore("../testcwtchmessages.db") + db, err := InitializeSqliteMessageStore(filename) if err != nil { t.Fatalf("Error: %v", err) } numMessages := 100 - t.Logf("Populating Database") + t.Logf("Generating Data...") + var messages []groups.EncryptedGroupMessage for i := 0; i < numMessages; i++ { buf := make([]byte, 4) binary.PutUvarint(buf, uint64(i)) - db.AddMessage(groups.EncryptedGroupMessage{ + messages = append(messages, groups.EncryptedGroupMessage{ Signature: append([]byte("Hello world"), buf...), Ciphertext: []byte("Hello world"), }) - t.Logf("Inserted %v", i) } - // Wait for inserts to complete.. - messages := db.FetchMessages() + t.Logf("Populating Database") + start := time.Now() for _, message := range messages { - t.Logf("Message: %v", message) + db.AddMessage(message) } - if len(messages) != numMessages { + t.Logf("Time to Insert: %v", time.Since(start)) + + // Wait for inserts to complete.. + fetchedMessages := db.FetchMessages() + //for _, message := range fetchedMessages { + //t.Logf("Message: %v", message) + //} + if len(fetchedMessages) != numMessages { t.Fatalf("Incorrect number of messages returned") } @@ -45,11 +54,11 @@ func TestMessageStore(t *testing.T) { buf := make([]byte, 4) binary.PutUvarint(buf, uint64(numToFetch)) sig := append([]byte("Hello world"), buf...) - messages = db.FetchMessagesFrom(sig) - for _, message := range messages { - t.Logf("Message: %v", message) - } - if len(messages) != numToFetch { + fetchedMessages = db.FetchMessagesFrom(sig) + //for _, message := range fetchedMessages { + // t.Logf("Message: %v", message) + //} + if len(fetchedMessages) != numToFetch { t.Fatalf("Incorrect number of messages returned : %v", len(messages)) } From e1f0b43eb6411cb08cc1f50ae2b389d6a5a5c949 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Fri, 7 May 2021 14:33:45 -0700 Subject: [PATCH 4/6] Update DB Name --- storage/message_store_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/message_store_test.go b/storage/message_store_test.go index 0646753..6d9c9a4 100644 --- a/storage/message_store_test.go +++ b/storage/message_store_test.go @@ -10,7 +10,7 @@ import ( ) func TestMessageStore(t *testing.T) { - filename := "/home/sarah/testcwtchmessages.db" + filename := "../testcwtchmessages.db" os.Remove(filename) log.SetLevel(log.LevelDebug) db, err := InitializeSqliteMessageStore(filename) From 6c57555b81adf880c0730161814554a44ab0e6e7 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Fri, 7 May 2021 15:58:15 -0700 Subject: [PATCH 5/6] Fix Sync Bug --- storage/message_store.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/storage/message_store.go b/storage/message_store.go index 99d70de..75de602 100644 --- a/storage/message_store.go +++ b/storage/message_store.go @@ -54,7 +54,9 @@ func (s SqliteMessageStore) FetchMessages() []*groups.EncryptedGroupMessage { // FetchMessagesFrom implements the MessageStoreInterface FetchMessagesFrom for sqlite message store func (s SqliteMessageStore) FetchMessagesFrom(signature []byte) []*groups.EncryptedGroupMessage { - if signature == nil { + + // If signature is empty then treat this as a complete sync request + if len(signature) == 0 || signature == nil { return s.FetchMessages() } From 4687ea8d79081dcd4b796c33eb585b8247c1d1c0 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Fri, 7 May 2021 16:33:07 -0700 Subject: [PATCH 6/6] Treat no returned messages on FetchMessagesFrom as a compelte sync --- storage/message_store.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/storage/message_store.go b/storage/message_store.go index 75de602..b53ea09 100644 --- a/storage/message_store.go +++ b/storage/message_store.go @@ -56,7 +56,7 @@ func (s SqliteMessageStore) FetchMessages() []*groups.EncryptedGroupMessage { func (s SqliteMessageStore) FetchMessagesFrom(signature []byte) []*groups.EncryptedGroupMessage { // If signature is empty then treat this as a complete sync request - if len(signature) == 0 || signature == nil { + if signature == nil || len(signature) == 0 { return s.FetchMessages() } @@ -86,6 +86,13 @@ func (s *SqliteMessageStore) compileRows(rows *sql.Rows) []*groups.EncryptedGrou Ciphertext: rawCiphertext, }) } + + // if we don't have *any* messages then either the signature next existed + // or the server purged it...either way treat this as a full sync... + if len(messages) < 1 { + return s.FetchMessages() + } + return messages }