From b144bc34cc636c81ab927b870a19907d60fa749e Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Mon, 25 Apr 2022 17:31:46 -0700 Subject: [PATCH 1/3] add config and storage support for a message cap --- server.go | 17 ++++++-- serverConfig.go | 31 +++++++++++++ storage/message_store.go | 82 +++++++++++++++++++++++++++++++---- storage/message_store_test.go | 2 +- 4 files changed, 119 insertions(+), 13 deletions(-) diff --git a/server.go b/server.go index 56e7247..854b2b8 100644 --- a/server.go +++ b/server.go @@ -113,7 +113,7 @@ func (s *server) Run(acn connectivity.ACN) error { } var err error - s.messageStore, err = storage.InitializeSqliteMessageStore(path.Join(s.config.ConfigDir, "cwtch.messages"), s.incMessageCount) + s.messageStore, err = storage.InitializeSqliteMessageStore(path.Join(s.config.ConfigDir, "cwtch.messages"), s.config.GetMaxMessages(), s.incMessageCount) if err != nil { return fmt.Errorf("could not open database: %v", err) } @@ -160,7 +160,7 @@ func (s *server) CheckStatus() (bool, error) { } // Stop turns off the server so it cannot receive connections and frees most resourses. -// The server is still in a reRunable state and tokenServer still has an active persistance +// The server is still in a reRunable state and tokenServer still has an active persistence func (s *server) Stop() { log.Infof("Shutting down server") s.lock.Lock() @@ -176,7 +176,7 @@ func (s *server) Stop() { } } -// Destroy frees the last of the resources the server has active (toklenServer persistance) leaving it un-re-runable and completely shutdown +// Destroy frees the last of the resources the server has active (toklenServer persistence) leaving it un-re-runable and completely shutdown func (s *server) Destroy() { s.Stop() s.lock.Lock() @@ -246,6 +246,17 @@ func (s *server) SetAttribute(key, val string) { s.config.SetAttribute(key, val) } +// GetMessageCap gets a server's MaxStorageMBs value +func (s *server) GetMaxStoreageMBs() int { + return s.config.GetMaxMessageMBs() +} + +// SetMaxStoreageMBs sets a server's MaxStoreageMBs and sets MaxMessages for storage (which can trigger a prune) +func (s *server) SetMaxStoreageMBs(val int) { + s.config.SetMaxMessageMBs(val) + s.messageStore.SetMessageCap(s.config.GetMaxMessages()) +} + // SetMonitorLogging turns on or off the monitor logging suite, and logging to a file in the server dir func (s *server) SetMonitorLogging(do bool) { s.config.ServerReporting.LogMetricsToFile = do diff --git a/serverConfig.go b/serverConfig.go index 6f9b124..eaee95b 100644 --- a/serverConfig.go +++ b/serverConfig.go @@ -42,6 +42,9 @@ type Reporting struct { LogMetricsToFile bool `json:"logMetricsToFile"` } +// messages are ~4kb of storage +const MessagesPerMB = 250 + // Config is a struct for storing basic server configuration type Config struct { ConfigDir string `json:"-"` @@ -62,6 +65,10 @@ type Config struct { Attributes map[string]string `json:"attributes"` + // messages are ~4kb of storage + // -1 == infinite + MaxStorageMBs int `json:"maxStorageMBs"` + lock sync.Mutex encFileStore storage.FileStore } @@ -90,6 +97,7 @@ func initDefaultConfig(configDir, filename string, encrypted bool) *Config { LogMetricsToFile: false, } config.Attributes[AttrAutostart] = "false" + config.MaxStorageMBs = -1 k := new(ristretto255.Scalar) b := make([]byte, 64) @@ -212,3 +220,26 @@ func (config *Config) GetAttribute(key string) string { defer config.lock.Unlock() return config.Attributes[key] } + +// GetMaxMessages returns the config setting for Max messages converting from MaxMB to messages +// or -1 for infinite +func (config *Config) GetMaxMessages() int { + config.lock.Lock() + defer config.lock.Unlock() + if config.MaxStorageMBs == -1 { + return -1 + } + return config.MaxStorageMBs * MessagesPerMB +} + +func (config *Config) GetMaxMessageMBs() int { + config.lock.Lock() + defer config.lock.Unlock() + return config.MaxStorageMBs +} + +func (config *Config) SetMaxMessageMBs(newval int) { + config.lock.Lock() + defer config.lock.Unlock() + config.MaxStorageMBs = newval +} diff --git a/storage/message_store.go b/storage/message_store.go index 4f90734..d393f99 100644 --- a/storage/message_store.go +++ b/storage/message_store.go @@ -6,6 +6,7 @@ import ( "encoding/base64" "fmt" "git.openprivacy.ca/openprivacy/log" + "sync" ) // MessageStoreInterface defines an interface to interact with a store of cwtch messages. @@ -14,17 +15,26 @@ type MessageStoreInterface interface { FetchMessages() []*groups.EncryptedGroupMessage MessagesCount() int FetchMessagesFrom(signature []byte) []*groups.EncryptedGroupMessage + SetMessageCap(newcap int) Close() } // SqliteMessageStore is an sqlite3 backed message store type SqliteMessageStore struct { incMessageCounterFn func() - database *sql.DB + messageCap int + + messageCount int + countLock sync.Mutex + + database *sql.DB // Some prepared queries... preparedInsertStatement *sql.Stmt // A Stmt is safe for concurrent use by multiple goroutines. preparedFetchFromQuery *sql.Stmt + preparedFetchQuery *sql.Stmt + preparedCountQuery *sql.Stmt + preparedPruneStatement *sql.Stmt } // Close closes the underlying sqlite3 database to further changes @@ -34,6 +44,13 @@ func (s *SqliteMessageStore) Close() { s.database.Close() } +func (s *SqliteMessageStore) SetMessageCap(newcap int) { + s.countLock.Lock() + defer s.countLock.Unlock() + s.messageCap = newcap + s.checkPruneMessages() +} + // AddMessage implements the MessageStoreInterface AddMessage for sqlite message store func (s *SqliteMessageStore) AddMessage(message groups.EncryptedGroupMessage) { if s.incMessageCounterFn != nil { @@ -49,10 +66,29 @@ func (s *SqliteMessageStore) AddMessage(message groups.EncryptedGroupMessage) { log.Errorf("%v %q", stmt, err) return } + + s.countLock.Lock() + defer s.countLock.Unlock() + s.messageCount++ + s.checkPruneMessages() } -func (s SqliteMessageStore) MessagesCount() int { - rows, err := s.database.Query("SELECT COUNT(*) from messages") +func (s *SqliteMessageStore) checkPruneMessages() { + if s.messageCap != -1 && s.messageCount > s.messageCap { + log.Debugf("Message Count: %d / Message Cap: %d, message cap exceeded, pruning oldest 10%...", s.messageCount, s.messageCap) + // Delete 10% of messages + delCount := s.messageCap / 10 + stmt, err := s.preparedPruneStatement.Exec(s.messageCap / 10) + if err != nil { + log.Errorf("%v %q", stmt, err) + } + s.messageCount -= delCount + } +} + +func (s *SqliteMessageStore) MessagesCount() int { + rows, err := s.preparedCountQuery.Query() + if err != nil { log.Errorf("%v", err) return -1 @@ -75,8 +111,8 @@ func (s SqliteMessageStore) MessagesCount() int { } // FetchMessages implements the MessageStoreInterface FetchMessages for sqlite message store -func (s SqliteMessageStore) FetchMessages() []*groups.EncryptedGroupMessage { - rows, err := s.database.Query("SELECT id, signature,ciphertext from messages") +func (s *SqliteMessageStore) FetchMessages() []*groups.EncryptedGroupMessage { + rows, err := s.preparedFetchQuery.Query() if err != nil { log.Errorf("%v", err) return nil @@ -86,7 +122,7 @@ func (s SqliteMessageStore) FetchMessages() []*groups.EncryptedGroupMessage { } // FetchMessagesFrom implements the MessageStoreInterface FetchMessagesFrom for sqlite message store -func (s SqliteMessageStore) FetchMessagesFrom(signature []byte) []*groups.EncryptedGroupMessage { +func (s *SqliteMessageStore) FetchMessagesFrom(signature []byte) []*groups.EncryptedGroupMessage { // If signature is empty then treat this as a complete sync request if len(signature) == 0 { @@ -132,7 +168,7 @@ func (s *SqliteMessageStore) compileRows(rows *sql.Rows) []*groups.EncryptedGrou // InitializeSqliteMessageStore creates a database `dbfile` with the necessary tables (if it doesn't already exist) // and returns an open database -func InitializeSqliteMessageStore(dbfile string, incMessageCounterFn func()) (*SqliteMessageStore, error) { +func InitializeSqliteMessageStore(dbfile string, messageCap int, incMessageCounterFn func()) (*SqliteMessageStore, error) { db, err := sql.Open("sqlite3", dbfile) if err != nil { log.Errorf("database %v cannot be created or opened %v", dbfile, err) @@ -149,6 +185,7 @@ func InitializeSqliteMessageStore(dbfile string, incMessageCounterFn func()) (*S slms := new(SqliteMessageStore) slms.database = db slms.incMessageCounterFn = incMessageCounterFn + slms.messageCap = messageCap sqlStmt = `INSERT INTO messages(signature, ciphertext) values (?,?);` stmt, err := slms.database.Prepare(sqlStmt) @@ -158,12 +195,39 @@ func InitializeSqliteMessageStore(dbfile string, incMessageCounterFn func()) (*S } slms.preparedInsertStatement = stmt - query, err := slms.database.Prepare("SELECT id, signature,ciphertext FROM messages WHERE id>=(SELECT id FROM messages WHERE signature=(?));") + sqlStmt = "SELECT id, signature,ciphertext from messages" + query, err := slms.database.Prepare(sqlStmt) if err != nil { - log.Errorf("%v", err) + log.Errorf("%q: %s", err, sqlStmt) + return nil, fmt.Errorf("%s: %q", sqlStmt, err) + } + slms.preparedFetchQuery = query + + sqlStmt = "SELECT id, signature,ciphertext FROM messages WHERE id>=(SELECT id FROM messages WHERE signature=(?));" + query, err = slms.database.Prepare(sqlStmt) + if err != nil { + log.Errorf("%q: %s", err, sqlStmt) return nil, fmt.Errorf("%s: %q", sqlStmt, err) } slms.preparedFetchFromQuery = query + sqlStmt = "SELECT COUNT(*) from messages" + stmt, err = slms.database.Prepare(sqlStmt) + if err != nil { + log.Errorf("%q: %s", err, sqlStmt) + return nil, fmt.Errorf("%s: %q", sqlStmt, err) + } + slms.preparedCountQuery = stmt + + sqlStmt = "DELETE FROM messages WHERE id IN (SELECT id FROM messages ORDER BY id ASC LIMIT (?))" + stmt, err = slms.database.Prepare(sqlStmt) + if err != nil { + log.Errorf("%q: %s", err, sqlStmt) + return nil, fmt.Errorf("%s: %q", sqlStmt, err) + } + slms.preparedPruneStatement = stmt + + slms.messageCount = slms.MessagesCount() + return slms, nil } diff --git a/storage/message_store_test.go b/storage/message_store_test.go index 619632b..71a1a1a 100644 --- a/storage/message_store_test.go +++ b/storage/message_store_test.go @@ -16,7 +16,7 @@ func TestMessageStore(t *testing.T) { os.Remove(filename) log.SetLevel(log.LevelDebug) counter := metrics.NewCounter() - db, err := InitializeSqliteMessageStore(filename, func() { counter.Add(1) }) + db, err := InitializeSqliteMessageStore(filename, -1, func() { counter.Add(1) }) if err != nil { t.Fatalf("Error: %v", err) } From a8ef9fe00131a016e89563b17a4d81803f0388c3 Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Mon, 25 Apr 2022 18:34:57 -0700 Subject: [PATCH 2/3] deps version bump --- app/main.go | 2 +- go.mod | 6 +++--- go.sum | 6 ++++++ 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/app/main.go b/app/main.go index 9a72810..123e7b3 100644 --- a/app/main.go +++ b/app/main.go @@ -80,7 +80,7 @@ func main() { os.MkdirAll("tordir/tor", 0700) tor.NewTorrc().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("./tordir/tor/torrc") - acn, err := tor.NewTorACNWithAuth("tordir", "", controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)}) + acn, err := tor.NewTorACNWithAuth("tordir", "", "tordir/tor", controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)}) if err != nil { log.Errorf("\nError connecting to Tor: %v\n", err) diff --git a/go.mod b/go.mod index a4f9529..4daafe3 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,9 @@ module git.openprivacy.ca/cwtch.im/server go 1.14 require ( - cwtch.im/cwtch v0.14.9 - git.openprivacy.ca/cwtch.im/tapir v0.4.9 - git.openprivacy.ca/openprivacy/connectivity v1.5.0 + cwtch.im/cwtch v0.16.8 + git.openprivacy.ca/cwtch.im/tapir v0.5.4 + git.openprivacy.ca/openprivacy/connectivity v1.8.3 git.openprivacy.ca/openprivacy/log v1.0.3 github.com/gtank/ristretto255 v0.1.2 github.com/mattn/go-sqlite3 v1.14.7 diff --git a/go.sum b/go.sum index 66d6689..f9e599f 100644 --- a/go.sum +++ b/go.sum @@ -12,15 +12,21 @@ cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2k cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw= cwtch.im/cwtch v0.14.9 h1:VYXbQG6f41fCoLpLEYDAeiJSG+9Gxstl1DOk1Hv4tjM= cwtch.im/cwtch v0.14.9/go.mod h1:/fLuoYLY/7JHw6RojFojpd245CiOcU24QpWqzh9FRDI= +cwtch.im/cwtch v0.16.8 h1:/RNFVM9DnLtJ1nexvc136X65mabESJd1JVmh0cklP/k= +cwtch.im/cwtch v0.16.8/go.mod h1:Lxn88WAdGJotH3VECLL/uHIFnvf9RfX11YEeoZqmjcw= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= filippo.io/edwards25519 v1.0.0-rc.1 h1:m0VOOB23frXZvAOK44usCgLWvtsxIoMCTBGJZlpmGfU= filippo.io/edwards25519 v1.0.0-rc.1/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns= git.openprivacy.ca/cwtch.im/tapir v0.4.9 h1:LXonlztwvI1F1++0IyomIcDH1/Bxzo+oN8YjGonNvjM= git.openprivacy.ca/cwtch.im/tapir v0.4.9/go.mod h1:p4bHo3DAO8wwimU6JAeZXbfPQ4jnoA2bV+4YvknWTNQ= +git.openprivacy.ca/cwtch.im/tapir v0.5.4 h1:CUcRVsM82Zx/pfcGqIviycavZcC50wXm67TiQ3mx6WY= +git.openprivacy.ca/cwtch.im/tapir v0.5.4/go.mod h1:VJitTBzerc+WO53c5XY30P2JD2Nx9mgxuII1FBVwW8E= git.openprivacy.ca/openprivacy/bine v0.0.4 h1:CO7EkGyz+jegZ4ap8g5NWRuDHA/56KKvGySR6OBPW+c= git.openprivacy.ca/openprivacy/bine v0.0.4/go.mod h1:13ZqhKyqakDsN/ZkQkIGNULsmLyqtXc46XBcnuXm/mU= git.openprivacy.ca/openprivacy/connectivity v1.5.0 h1:ZxsR/ZaVKXIkD2x6FlajZn62ciNQjamrI4i/5xIpdoQ= git.openprivacy.ca/openprivacy/connectivity v1.5.0/go.mod h1:UjQiGBnWbotmBzIw59B8H6efwDadjkKzm3RPT1UaIRw= +git.openprivacy.ca/openprivacy/connectivity v1.8.3 h1:bWM8aQHqHIpobYQcLQ9OsNPoIl+H+4JFWbYGdG0nHlg= +git.openprivacy.ca/openprivacy/connectivity v1.8.3/go.mod h1:UjQiGBnWbotmBzIw59B8H6efwDadjkKzm3RPT1UaIRw= git.openprivacy.ca/openprivacy/log v1.0.2/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw= git.openprivacy.ca/openprivacy/log v1.0.3 h1:E/PMm4LY+Q9s3aDpfySfEDq/vYQontlvNj/scrPaga0= git.openprivacy.ca/openprivacy/log v1.0.3/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw= From eba86577c8bd4b54d19fe3a3082ad1dd02ac3f19 Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Mon, 25 Apr 2022 18:47:24 -0700 Subject: [PATCH 3/3] spelling --- server.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server.go b/server.go index 854b2b8..c07408c 100644 --- a/server.go +++ b/server.go @@ -176,7 +176,7 @@ func (s *server) Stop() { } } -// Destroy frees the last of the resources the server has active (toklenServer persistence) leaving it un-re-runable and completely shutdown +// Destroy frees the last of the resources the server has active (tokenServer persistence) leaving it un-re-runable and completely shutdown func (s *server) Destroy() { s.Stop() s.lock.Lock() @@ -247,12 +247,12 @@ func (s *server) SetAttribute(key, val string) { } // GetMessageCap gets a server's MaxStorageMBs value -func (s *server) GetMaxStoreageMBs() int { +func (s *server) GetMaxStorageMBs() int { return s.config.GetMaxMessageMBs() } -// SetMaxStoreageMBs sets a server's MaxStoreageMBs and sets MaxMessages for storage (which can trigger a prune) -func (s *server) SetMaxStoreageMBs(val int) { +// SetMaxStorageMBs sets a server's MaxStorageMBs and sets MaxMessages for storage (which can trigger a prune) +func (s *server) SetMaxStorageMBs(val int) { s.config.SetMaxMessageMBs(val) s.messageStore.SetMessageCap(s.config.GetMaxMessages()) }