From 4e95427f44345c799543b66576a281cf3ce4a7ae Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Thu, 20 Sep 2018 14:01:46 -0700 Subject: [PATCH] messageStore now uses log like rotation on files --- server/ms.test | 2 - server/server.go | 5 +- server/server_instance_test.go | 4 +- storage/message_store.go | 133 ++++++++++++++++++++++----------- storage/message_store_test.go | 30 +++++--- 5 files changed, 114 insertions(+), 60 deletions(-) delete mode 100644 server/ms.test diff --git a/server/ms.test b/server/ms.test deleted file mode 100644 index afe6366..0000000 --- a/server/ms.test +++ /dev/null @@ -1,2 +0,0 @@ -{"ciphertext":"SGVsbG8gdGhpcyBpcyBhIGZhaXJseSBhdmVyYWdlIGxlbmd0aCBtZXNzYWdlIHRoYXQgd2UgYXJlIHdyaXRpbmcgaGVyZS4="} -{"ciphertext":"SGVsbG8gdGhpcyBpcyBhIGZhaXJseSBhdmVyYWdlIGxlbmd0aCBtZXNzYWdlIHRoYXQgd2UgYXJlIHdyaXRpbmcgaGVyZS4="} diff --git a/server/server.go b/server/server.go index 119b486..cdd37f0 100644 --- a/server/server.go +++ b/server/server.go @@ -34,7 +34,10 @@ func (s *Server) Run(serverConfig *Config) { af := application.ApplicationInstanceFactory{} af.Init() ms := new(storage.MessageStore) - ms.Init("cwtch.messages", s.config.MaxBufferLines, s.metricsPack.MessageCounter) + err = ms.Init(".", s.config.MaxBufferLines, s.metricsPack.MessageCounter) + if err != nil { + log.Fatal(err) + } af.AddHandler("im.cwtch.server.listen", func(rai *application.ApplicationInstance) func() channels.Handler { return func() channels.Handler { cslc := new(listen.CwtchServerListenChannel) diff --git a/server/server_instance_test.go b/server/server_instance_test.go index 14afb32..9be7fa4 100644 --- a/server/server_instance_test.go +++ b/server/server_instance_test.go @@ -15,8 +15,8 @@ func TestServerInstance(t *testing.T) { ai := new(application.ApplicationInstance) ra := new(application.RicochetApplication) msi := new(storage.MessageStore) - os.Remove("ms.test") - msi.Init("ms.test", 5, metrics.NewCounter()) + os.RemoveAll("messages") + msi.Init(".", 5, metrics.NewCounter()) gm := protocol.GroupMessage{ Ciphertext: []byte("Hello this is a fairly average length message that we are writing here."), Spamguard: []byte{}, diff --git a/storage/message_store.go b/storage/message_store.go index 41ed151..df95e7a 100644 --- a/storage/message_store.go +++ b/storage/message_store.go @@ -11,6 +11,12 @@ import ( "sync" ) +const ( + fileStorePartitions = 10 + fileStoreFilename = "cwtch.messages" + directory = "messages" +) + // MessageStoreInterface defines an interface to interact with a store of cwtch messages. type MessageStoreInterface interface { AddMessage(protocol.GroupMessage) @@ -19,72 +25,116 @@ type MessageStoreInterface interface { // MessageStore is a file-backed implementation of MessageStoreInterface type MessageStore struct { - file *os.File + activeLogFile *os.File + filePos int + storeDirectory string lock sync.Mutex messages []*protocol.GroupMessage messageCounter metrics.Counter - bufferSize int - pos int - rotated bool + maxBufferLines int + bufferPos int + bufferRotated bool } // Close closes the message store and underlying resources. func (ms *MessageStore) Close() { ms.lock.Lock() ms.messages = nil - ms.file.Close() + ms.activeLogFile.Close() ms.lock.Unlock() } func (ms *MessageStore) updateBuffer(gm *protocol.GroupMessage) { - ms.messages[ms.pos] = gm - ms.pos++ - if ms.pos == ms.bufferSize { - ms.pos = 0 - ms.rotated = true + ms.messages[ms.bufferPos] = gm + ms.bufferPos++ + if ms.bufferPos == ms.maxBufferLines { + ms.bufferPos = 0 + ms.bufferRotated = true } } -// Init sets up a MessageStore of size bufferSize backed by filename -func (ms *MessageStore) Init(filename string, bufferSize int, messageCounter metrics.Counter) { - f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600) - if err != nil { - panic(err) - } - ms.file = f - ms.pos = 0 - ms.bufferSize = bufferSize - ms.messages = make([]*protocol.GroupMessage, bufferSize) - ms.rotated = false - ms.messageCounter = messageCounter +func (ms *MessageStore) initAndLoadFiles() error { + ms.activeLogFile = nil + for i := fileStorePartitions - 1; i >= 0; i-- { + ms.filePos = 0 + filename := fmt.Sprintf("%s%s.%d", ms.storeDirectory, fileStoreFilename, i) + f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600) + if err != nil { + log.Printf("Error: MessageStore could not open: %v: %v", filename, err) + continue + } + ms.activeLogFile = f - scanner := bufio.NewScanner(f) - for scanner.Scan() { - gms := scanner.Text() - gm := &protocol.GroupMessage{} - err := json.Unmarshal([]byte(gms), gm) - if err == nil { - ms.updateBuffer(gm) - } else { - panic(err) + scanner := bufio.NewScanner(f) + for scanner.Scan() { + gms := scanner.Text() + ms.filePos++ + gm := &protocol.GroupMessage{} + err := json.Unmarshal([]byte(gms), gm) + if err == nil { + ms.updateBuffer(gm) + } } } + if ms.activeLogFile == nil { + return fmt.Errorf("Could not create log file to write to in %s", ms.storeDirectory) + } + return nil +} - if err := scanner.Err(); err != nil { - panic(err) +func (ms *MessageStore) updateFile(gm *protocol.GroupMessage) { + s, err := json.Marshal(gm) + if err != nil { + log.Printf("[ERROR] Failed to unmarshal group message %v\n", err) + } + fmt.Fprintf(ms.activeLogFile, "%s\n", s) + ms.filePos++ + if ms.filePos >= ms.maxBufferLines/fileStorePartitions { + ms.rotateFileStore() + } +} + +func (ms *MessageStore) rotateFileStore() { + ms.activeLogFile.Close() + os.Remove(fmt.Sprintf("%s%s.%d", ms.storeDirectory, fileStoreFilename, fileStorePartitions-1)) + + for i := fileStorePartitions - 2; i >= 0; i-- { + os.Rename(fmt.Sprintf("%s%s.%d", ms.storeDirectory, fileStoreFilename, i), fmt.Sprintf("%s%s.%d", ms.storeDirectory, fileStoreFilename, i+1)) } + f, err := os.OpenFile(fmt.Sprintf("%s%s.%d", ms.storeDirectory, fileStoreFilename, 0), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600) + if err != nil { + log.Printf("ERROR: Could not open new message store file in: %s", ms.storeDirectory) + } + ms.filePos = 0 + ms.activeLogFile = f +} + +// Init sets up a MessageStore of size maxBufferLines (# of messages) backed by filename +func (ms *MessageStore) Init(appDirectory string, maxBufferLines int, messageCounter metrics.Counter) error { + ms.storeDirectory = appDirectory + "/" + directory + "/" + os.Mkdir(ms.storeDirectory, 0700) + + ms.bufferPos = 0 + ms.maxBufferLines = maxBufferLines + ms.messages = make([]*protocol.GroupMessage, maxBufferLines) + ms.bufferRotated = false + ms.messageCounter = messageCounter + + err := ms.initAndLoadFiles() + return err } // FetchMessages returns all messages from the backing file. func (ms *MessageStore) FetchMessages() (messages []*protocol.GroupMessage) { ms.lock.Lock() - if !ms.rotated { - messages = make([]*protocol.GroupMessage, ms.pos) - copy(messages, ms.messages[0:ms.pos]) + if !ms.bufferRotated { + messages = make([]*protocol.GroupMessage, ms.bufferPos) + copy(messages, ms.messages[0:ms.bufferPos]) } else { - messages = make([]*protocol.GroupMessage, ms.bufferSize) - copy(messages, ms.messages) + messages = make([]*protocol.GroupMessage, ms.maxBufferLines) + copy(messages, ms.messages[ms.bufferPos:ms.maxBufferLines]) + copy(messages[ms.bufferPos:], ms.messages[0:ms.bufferPos]) } ms.lock.Unlock() return @@ -95,10 +145,7 @@ func (ms *MessageStore) AddMessage(gm protocol.GroupMessage) { ms.messageCounter.Add(1) ms.lock.Lock() ms.updateBuffer(&gm) - s, err := json.Marshal(gm) - if err != nil { - log.Printf("[ERROR] Failed to unmarshal group message %v\n", err) - } - fmt.Fprintf(ms.file, "%s\n", s) + ms.updateFile(&gm) + ms.lock.Unlock() } diff --git a/storage/message_store_test.go b/storage/message_store_test.go index da10bf9..85a9dd1 100644 --- a/storage/message_store_test.go +++ b/storage/message_store_test.go @@ -12,27 +12,27 @@ func TestMessageStore(t *testing.T) { os.Remove("ms.test") ms := new(MessageStore) counter := metrics.NewCounter() - ms.Init("ms.test", 100000, counter) - for i := 0; i < 50000; i++ { + ms.Init("./", 1000, counter) + for i := 0; i < 499; i++ { gm := protocol.GroupMessage{ Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)), Spamguard: []byte{}, } ms.AddMessage(gm) } - if counter.Count() != 50000 { - t.Errorf("Counter should be at 50000 was %v", counter.Count()) + if counter.Count() != 499 { + t.Errorf("Counter should be at 499 was %v", counter.Count()) } ms.Close() - ms.Init("ms.test", 100000, counter) + ms.Init("./", 1000, counter) m := ms.FetchMessages() - if len(m) != 50000 { - t.Errorf("Should have been 50000 was %v", len(m)) + if len(m) != 499 { + t.Errorf("Should have been 499 was %v", len(m)) } counter.Reset() - for i := 0; i < 100000; i++ { + for i := 0; i < 1000; i++ { gm := protocol.GroupMessage{ Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)), Spamguard: []byte{}, @@ -41,10 +41,16 @@ func TestMessageStore(t *testing.T) { } m = ms.FetchMessages() - if len(m) != 100000 { - t.Errorf("Should have been 100000 was %v", len(m)) + if len(m) != 1000 { + t.Errorf("Should have been 1000 was %v", len(m)) } - ms.Close() - os.Remove("ms.test") + ms.Init("./", 1000, counter) + m = ms.FetchMessages() + if len(m) != 999 { + t.Errorf("Should have been 999 was %v", len(m)) + } + ms.Close() + + os.RemoveAll("./messages") }