messageStore now uses log like rotation on files

This commit is contained in:
Dan Ballard 2018-09-20 14:01:46 -07:00
parent f217533044
commit 4e95427f44
5 changed files with 114 additions and 60 deletions

View File

@ -1,2 +0,0 @@
{"ciphertext":"SGVsbG8gdGhpcyBpcyBhIGZhaXJseSBhdmVyYWdlIGxlbmd0aCBtZXNzYWdlIHRoYXQgd2UgYXJlIHdyaXRpbmcgaGVyZS4="}
{"ciphertext":"SGVsbG8gdGhpcyBpcyBhIGZhaXJseSBhdmVyYWdlIGxlbmd0aCBtZXNzYWdlIHRoYXQgd2UgYXJlIHdyaXRpbmcgaGVyZS4="}

View File

@ -34,7 +34,10 @@ func (s *Server) Run(serverConfig *Config) {
af := application.ApplicationInstanceFactory{}
af.Init()
ms := new(storage.MessageStore)
ms.Init("cwtch.messages", s.config.MaxBufferLines, s.metricsPack.MessageCounter)
err = ms.Init(".", s.config.MaxBufferLines, s.metricsPack.MessageCounter)
if err != nil {
log.Fatal(err)
}
af.AddHandler("im.cwtch.server.listen", func(rai *application.ApplicationInstance) func() channels.Handler {
return func() channels.Handler {
cslc := new(listen.CwtchServerListenChannel)

View File

@ -15,8 +15,8 @@ func TestServerInstance(t *testing.T) {
ai := new(application.ApplicationInstance)
ra := new(application.RicochetApplication)
msi := new(storage.MessageStore)
os.Remove("ms.test")
msi.Init("ms.test", 5, metrics.NewCounter())
os.RemoveAll("messages")
msi.Init(".", 5, metrics.NewCounter())
gm := protocol.GroupMessage{
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here."),
Spamguard: []byte{},

View File

@ -11,6 +11,12 @@ import (
"sync"
)
const (
fileStorePartitions = 10
fileStoreFilename = "cwtch.messages"
directory = "messages"
)
// MessageStoreInterface defines an interface to interact with a store of cwtch messages.
type MessageStoreInterface interface {
AddMessage(protocol.GroupMessage)
@ -19,72 +25,116 @@ type MessageStoreInterface interface {
// MessageStore is a file-backed implementation of MessageStoreInterface
type MessageStore struct {
file *os.File
activeLogFile *os.File
filePos int
storeDirectory string
lock sync.Mutex
messages []*protocol.GroupMessage
messageCounter metrics.Counter
bufferSize int
pos int
rotated bool
maxBufferLines int
bufferPos int
bufferRotated bool
}
// Close closes the message store and underlying resources.
func (ms *MessageStore) Close() {
ms.lock.Lock()
ms.messages = nil
ms.file.Close()
ms.activeLogFile.Close()
ms.lock.Unlock()
}
func (ms *MessageStore) updateBuffer(gm *protocol.GroupMessage) {
ms.messages[ms.pos] = gm
ms.pos++
if ms.pos == ms.bufferSize {
ms.pos = 0
ms.rotated = true
ms.messages[ms.bufferPos] = gm
ms.bufferPos++
if ms.bufferPos == ms.maxBufferLines {
ms.bufferPos = 0
ms.bufferRotated = true
}
}
// Init sets up a MessageStore of size bufferSize backed by filename
func (ms *MessageStore) Init(filename string, bufferSize int, messageCounter metrics.Counter) {
f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600)
if err != nil {
panic(err)
}
ms.file = f
ms.pos = 0
ms.bufferSize = bufferSize
ms.messages = make([]*protocol.GroupMessage, bufferSize)
ms.rotated = false
ms.messageCounter = messageCounter
func (ms *MessageStore) initAndLoadFiles() error {
ms.activeLogFile = nil
for i := fileStorePartitions - 1; i >= 0; i-- {
ms.filePos = 0
filename := fmt.Sprintf("%s%s.%d", ms.storeDirectory, fileStoreFilename, i)
f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600)
if err != nil {
log.Printf("Error: MessageStore could not open: %v: %v", filename, err)
continue
}
ms.activeLogFile = f
scanner := bufio.NewScanner(f)
for scanner.Scan() {
gms := scanner.Text()
gm := &protocol.GroupMessage{}
err := json.Unmarshal([]byte(gms), gm)
if err == nil {
ms.updateBuffer(gm)
} else {
panic(err)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
gms := scanner.Text()
ms.filePos++
gm := &protocol.GroupMessage{}
err := json.Unmarshal([]byte(gms), gm)
if err == nil {
ms.updateBuffer(gm)
}
}
}
if ms.activeLogFile == nil {
return fmt.Errorf("Could not create log file to write to in %s", ms.storeDirectory)
}
return nil
}
if err := scanner.Err(); err != nil {
panic(err)
func (ms *MessageStore) updateFile(gm *protocol.GroupMessage) {
s, err := json.Marshal(gm)
if err != nil {
log.Printf("[ERROR] Failed to unmarshal group message %v\n", err)
}
fmt.Fprintf(ms.activeLogFile, "%s\n", s)
ms.filePos++
if ms.filePos >= ms.maxBufferLines/fileStorePartitions {
ms.rotateFileStore()
}
}
func (ms *MessageStore) rotateFileStore() {
ms.activeLogFile.Close()
os.Remove(fmt.Sprintf("%s%s.%d", ms.storeDirectory, fileStoreFilename, fileStorePartitions-1))
for i := fileStorePartitions - 2; i >= 0; i-- {
os.Rename(fmt.Sprintf("%s%s.%d", ms.storeDirectory, fileStoreFilename, i), fmt.Sprintf("%s%s.%d", ms.storeDirectory, fileStoreFilename, i+1))
}
f, err := os.OpenFile(fmt.Sprintf("%s%s.%d", ms.storeDirectory, fileStoreFilename, 0), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600)
if err != nil {
log.Printf("ERROR: Could not open new message store file in: %s", ms.storeDirectory)
}
ms.filePos = 0
ms.activeLogFile = f
}
// Init sets up a MessageStore of size maxBufferLines (# of messages) backed by filename
func (ms *MessageStore) Init(appDirectory string, maxBufferLines int, messageCounter metrics.Counter) error {
ms.storeDirectory = appDirectory + "/" + directory + "/"
os.Mkdir(ms.storeDirectory, 0700)
ms.bufferPos = 0
ms.maxBufferLines = maxBufferLines
ms.messages = make([]*protocol.GroupMessage, maxBufferLines)
ms.bufferRotated = false
ms.messageCounter = messageCounter
err := ms.initAndLoadFiles()
return err
}
// FetchMessages returns all messages from the backing file.
func (ms *MessageStore) FetchMessages() (messages []*protocol.GroupMessage) {
ms.lock.Lock()
if !ms.rotated {
messages = make([]*protocol.GroupMessage, ms.pos)
copy(messages, ms.messages[0:ms.pos])
if !ms.bufferRotated {
messages = make([]*protocol.GroupMessage, ms.bufferPos)
copy(messages, ms.messages[0:ms.bufferPos])
} else {
messages = make([]*protocol.GroupMessage, ms.bufferSize)
copy(messages, ms.messages)
messages = make([]*protocol.GroupMessage, ms.maxBufferLines)
copy(messages, ms.messages[ms.bufferPos:ms.maxBufferLines])
copy(messages[ms.bufferPos:], ms.messages[0:ms.bufferPos])
}
ms.lock.Unlock()
return
@ -95,10 +145,7 @@ func (ms *MessageStore) AddMessage(gm protocol.GroupMessage) {
ms.messageCounter.Add(1)
ms.lock.Lock()
ms.updateBuffer(&gm)
s, err := json.Marshal(gm)
if err != nil {
log.Printf("[ERROR] Failed to unmarshal group message %v\n", err)
}
fmt.Fprintf(ms.file, "%s\n", s)
ms.updateFile(&gm)
ms.lock.Unlock()
}

View File

@ -12,27 +12,27 @@ func TestMessageStore(t *testing.T) {
os.Remove("ms.test")
ms := new(MessageStore)
counter := metrics.NewCounter()
ms.Init("ms.test", 100000, counter)
for i := 0; i < 50000; i++ {
ms.Init("./", 1000, counter)
for i := 0; i < 499; i++ {
gm := protocol.GroupMessage{
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)),
Spamguard: []byte{},
}
ms.AddMessage(gm)
}
if counter.Count() != 50000 {
t.Errorf("Counter should be at 50000 was %v", counter.Count())
if counter.Count() != 499 {
t.Errorf("Counter should be at 499 was %v", counter.Count())
}
ms.Close()
ms.Init("ms.test", 100000, counter)
ms.Init("./", 1000, counter)
m := ms.FetchMessages()
if len(m) != 50000 {
t.Errorf("Should have been 50000 was %v", len(m))
if len(m) != 499 {
t.Errorf("Should have been 499 was %v", len(m))
}
counter.Reset()
for i := 0; i < 100000; i++ {
for i := 0; i < 1000; i++ {
gm := protocol.GroupMessage{
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)),
Spamguard: []byte{},
@ -41,10 +41,16 @@ func TestMessageStore(t *testing.T) {
}
m = ms.FetchMessages()
if len(m) != 100000 {
t.Errorf("Should have been 100000 was %v", len(m))
if len(m) != 1000 {
t.Errorf("Should have been 1000 was %v", len(m))
}
ms.Close()
os.Remove("ms.test")
ms.Init("./", 1000, counter)
m = ms.FetchMessages()
if len(m) != 999 {
t.Errorf("Should have been 999 was %v", len(m))
}
ms.Close()
os.RemoveAll("./messages")
}