2018-03-09 20:44:13 +00:00
|
|
|
package storage
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bufio"
|
2018-05-28 18:05:06 +00:00
|
|
|
"cwtch.im/cwtch/protocol"
|
2018-06-27 15:14:59 +00:00
|
|
|
"cwtch.im/cwtch/server/metrics"
|
2018-03-09 20:44:13 +00:00
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
2018-05-09 19:09:00 +00:00
|
|
|
"log"
|
2018-03-09 20:44:13 +00:00
|
|
|
"os"
|
|
|
|
"sync"
|
|
|
|
)
|
|
|
|
|
2018-09-20 21:01:46 +00:00
|
|
|
// Layout of the on-disk message log.
const (
	// fileStorePartitions is the number of rotating log files kept on disk.
	fileStorePartitions = 10

	// fileStoreFilename is the base name shared by all log files; the
	// partition index is appended as a ".N" suffix.
	fileStoreFilename = "cwtch.messages"

	// directory is the sub-directory of the application directory in which
	// the log files are stored.
	directory = "messages"
)
|
|
|
|
|
2018-05-16 21:11:04 +00:00
|
|
|
// MessageStoreInterface defines an interface to interact with a store of cwtch messages.
|
2018-03-09 20:44:13 +00:00
|
|
|
type MessageStoreInterface interface {
|
|
|
|
AddMessage(protocol.GroupMessage)
|
|
|
|
FetchMessages() []*protocol.GroupMessage
|
|
|
|
}
|
|
|
|
|
2018-05-16 21:11:04 +00:00
|
|
|
// MessageStore is a file-backed implementation of MessageStoreInterface
|
2018-03-09 20:44:13 +00:00
|
|
|
type MessageStore struct {
|
2018-09-20 21:01:46 +00:00
|
|
|
activeLogFile *os.File
|
|
|
|
filePos int
|
|
|
|
storeDirectory string
|
2018-06-27 15:14:59 +00:00
|
|
|
lock sync.Mutex
|
|
|
|
messages []*protocol.GroupMessage
|
|
|
|
messageCounter metrics.Counter
|
2018-09-20 21:01:46 +00:00
|
|
|
maxBufferLines int
|
|
|
|
bufferPos int
|
|
|
|
bufferRotated bool
|
2018-03-09 20:44:13 +00:00
|
|
|
}
|
|
|
|
|
2018-05-16 21:11:04 +00:00
|
|
|
// Close closes the message store and underlying resources.
|
2018-03-09 20:44:13 +00:00
|
|
|
func (ms *MessageStore) Close() {
|
2018-06-03 19:02:42 +00:00
|
|
|
ms.lock.Lock()
|
2018-03-09 20:44:13 +00:00
|
|
|
ms.messages = nil
|
2018-09-20 21:01:46 +00:00
|
|
|
ms.activeLogFile.Close()
|
2018-06-03 19:02:42 +00:00
|
|
|
ms.lock.Unlock()
|
2018-03-09 20:44:13 +00:00
|
|
|
}
|
|
|
|
|
2018-06-03 19:36:20 +00:00
|
|
|
func (ms *MessageStore) updateBuffer(gm *protocol.GroupMessage) {
|
2018-09-20 21:01:46 +00:00
|
|
|
ms.messages[ms.bufferPos] = gm
|
|
|
|
ms.bufferPos++
|
|
|
|
if ms.bufferPos == ms.maxBufferLines {
|
|
|
|
ms.bufferPos = 0
|
|
|
|
ms.bufferRotated = true
|
2018-06-03 19:36:20 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-09-20 21:01:46 +00:00
|
|
|
func (ms *MessageStore) initAndLoadFiles() error {
|
|
|
|
ms.activeLogFile = nil
|
|
|
|
for i := fileStorePartitions - 1; i >= 0; i-- {
|
|
|
|
ms.filePos = 0
|
|
|
|
filename := fmt.Sprintf("%s%s.%d", ms.storeDirectory, fileStoreFilename, i)
|
|
|
|
f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("Error: MessageStore could not open: %v: %v", filename, err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
ms.activeLogFile = f
|
|
|
|
|
|
|
|
scanner := bufio.NewScanner(f)
|
|
|
|
for scanner.Scan() {
|
|
|
|
gms := scanner.Text()
|
|
|
|
ms.filePos++
|
|
|
|
gm := &protocol.GroupMessage{}
|
|
|
|
err := json.Unmarshal([]byte(gms), gm)
|
|
|
|
if err == nil {
|
|
|
|
ms.updateBuffer(gm)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if ms.activeLogFile == nil {
|
|
|
|
return fmt.Errorf("Could not create log file to write to in %s", ms.storeDirectory)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ms *MessageStore) updateFile(gm *protocol.GroupMessage) {
|
|
|
|
s, err := json.Marshal(gm)
|
2018-03-09 20:44:13 +00:00
|
|
|
if err != nil {
|
2018-09-20 21:01:46 +00:00
|
|
|
log.Printf("[ERROR] Failed to unmarshal group message %v\n", err)
|
2018-03-09 20:44:13 +00:00
|
|
|
}
|
2018-09-20 21:01:46 +00:00
|
|
|
fmt.Fprintf(ms.activeLogFile, "%s\n", s)
|
|
|
|
ms.filePos++
|
|
|
|
if ms.filePos >= ms.maxBufferLines/fileStorePartitions {
|
|
|
|
ms.rotateFileStore()
|
|
|
|
}
|
|
|
|
}
|
2018-03-09 20:44:13 +00:00
|
|
|
|
2018-09-20 21:01:46 +00:00
|
|
|
func (ms *MessageStore) rotateFileStore() {
|
|
|
|
ms.activeLogFile.Close()
|
|
|
|
os.Remove(fmt.Sprintf("%s%s.%d", ms.storeDirectory, fileStoreFilename, fileStorePartitions-1))
|
|
|
|
|
|
|
|
for i := fileStorePartitions - 2; i >= 0; i-- {
|
|
|
|
os.Rename(fmt.Sprintf("%s%s.%d", ms.storeDirectory, fileStoreFilename, i), fmt.Sprintf("%s%s.%d", ms.storeDirectory, fileStoreFilename, i+1))
|
2018-03-09 20:44:13 +00:00
|
|
|
}
|
|
|
|
|
2018-09-20 21:01:46 +00:00
|
|
|
f, err := os.OpenFile(fmt.Sprintf("%s%s.%d", ms.storeDirectory, fileStoreFilename, 0), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("ERROR: Could not open new message store file in: %s", ms.storeDirectory)
|
2018-03-09 20:44:13 +00:00
|
|
|
}
|
2018-09-20 21:01:46 +00:00
|
|
|
ms.filePos = 0
|
|
|
|
ms.activeLogFile = f
|
|
|
|
}
|
|
|
|
|
|
|
|
// Init sets up a MessageStore of size maxBufferLines (# of messages) backed by filename
|
|
|
|
func (ms *MessageStore) Init(appDirectory string, maxBufferLines int, messageCounter metrics.Counter) error {
|
|
|
|
ms.storeDirectory = appDirectory + "/" + directory + "/"
|
|
|
|
os.Mkdir(ms.storeDirectory, 0700)
|
2018-03-09 20:44:13 +00:00
|
|
|
|
2018-09-20 21:01:46 +00:00
|
|
|
ms.bufferPos = 0
|
|
|
|
ms.maxBufferLines = maxBufferLines
|
|
|
|
ms.messages = make([]*protocol.GroupMessage, maxBufferLines)
|
|
|
|
ms.bufferRotated = false
|
|
|
|
ms.messageCounter = messageCounter
|
|
|
|
|
|
|
|
err := ms.initAndLoadFiles()
|
|
|
|
return err
|
2018-03-09 20:44:13 +00:00
|
|
|
}
|
|
|
|
|
2018-05-16 21:11:04 +00:00
|
|
|
// FetchMessages returns all messages from the backing file.
|
2018-03-09 20:44:13 +00:00
|
|
|
func (ms *MessageStore) FetchMessages() (messages []*protocol.GroupMessage) {
|
|
|
|
ms.lock.Lock()
|
2018-09-20 21:01:46 +00:00
|
|
|
if !ms.bufferRotated {
|
|
|
|
messages = make([]*protocol.GroupMessage, ms.bufferPos)
|
|
|
|
copy(messages, ms.messages[0:ms.bufferPos])
|
2018-06-03 19:36:20 +00:00
|
|
|
} else {
|
2018-09-20 21:01:46 +00:00
|
|
|
messages = make([]*protocol.GroupMessage, ms.maxBufferLines)
|
|
|
|
copy(messages, ms.messages[ms.bufferPos:ms.maxBufferLines])
|
|
|
|
copy(messages[ms.bufferPos:], ms.messages[0:ms.bufferPos])
|
2018-06-03 19:36:20 +00:00
|
|
|
}
|
2018-03-09 20:44:13 +00:00
|
|
|
ms.lock.Unlock()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2018-05-16 21:11:04 +00:00
|
|
|
// AddMessage adds a GroupMessage to the store
|
2018-03-09 20:44:13 +00:00
|
|
|
func (ms *MessageStore) AddMessage(gm protocol.GroupMessage) {
|
2018-06-27 15:14:59 +00:00
|
|
|
ms.messageCounter.Add(1)
|
2018-03-09 20:44:13 +00:00
|
|
|
ms.lock.Lock()
|
2018-06-03 19:36:20 +00:00
|
|
|
ms.updateBuffer(&gm)
|
2018-09-20 21:01:46 +00:00
|
|
|
ms.updateFile(&gm)
|
|
|
|
|
2018-03-09 20:44:13 +00:00
|
|
|
ms.lock.Unlock()
|
|
|
|
}
|