cwtch/storage/message_store.go

101 lines
2.2 KiB
Go
Raw Normal View History

2018-03-09 20:44:13 +00:00
package storage
import (
"bufio"
2018-05-28 18:05:06 +00:00
"cwtch.im/cwtch/protocol"
2018-03-09 20:44:13 +00:00
"encoding/json"
"fmt"
"log"
2018-03-09 20:44:13 +00:00
"os"
"sync"
)
2018-05-16 21:11:04 +00:00
// MessageStoreInterface defines an interface to interact with a store of cwtch messages.
2018-03-09 20:44:13 +00:00
type MessageStoreInterface interface {
AddMessage(protocol.GroupMessage)
FetchMessages() []*protocol.GroupMessage
}
2018-05-16 21:11:04 +00:00
// MessageStore is a file-backed implementation of MessageStoreInterface
2018-03-09 20:44:13 +00:00
type MessageStore struct {
file *os.File
lock sync.Mutex
messages []*protocol.GroupMessage
bufferSize int
pos int
rotated bool
2018-03-09 20:44:13 +00:00
}
2018-05-16 21:11:04 +00:00
// Close closes the message store and underlying resources.
2018-03-09 20:44:13 +00:00
func (ms *MessageStore) Close() {
ms.lock.Lock()
2018-03-09 20:44:13 +00:00
ms.messages = nil
ms.file.Close()
ms.lock.Unlock()
2018-03-09 20:44:13 +00:00
}
2018-06-03 19:36:20 +00:00
func (ms *MessageStore) updateBuffer(gm *protocol.GroupMessage) {
ms.messages[ms.pos] = gm
ms.pos++
if ms.pos == ms.bufferSize {
ms.pos = 0
ms.rotated = true
}
}
// Init sets up a MessageStore of size bufferSize backed by filename
func (ms *MessageStore) Init(filename string, bufferSize int) {
2018-03-09 20:44:13 +00:00
f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600)
if err != nil {
panic(err)
}
ms.file = f
2018-06-03 19:36:20 +00:00
ms.pos = 0
ms.bufferSize = bufferSize
ms.messages = make([]*protocol.GroupMessage, bufferSize)
ms.rotated = false
2018-03-09 20:44:13 +00:00
scanner := bufio.NewScanner(f)
for scanner.Scan() {
gms := scanner.Text()
gm := &protocol.GroupMessage{}
err := json.Unmarshal([]byte(gms), gm)
if err == nil {
2018-06-03 19:36:20 +00:00
ms.updateBuffer(gm)
2018-03-09 20:44:13 +00:00
} else {
panic(err)
}
}
if err := scanner.Err(); err != nil {
panic(err)
}
}
2018-05-16 21:11:04 +00:00
// FetchMessages returns all messages from the backing file.
2018-03-09 20:44:13 +00:00
func (ms *MessageStore) FetchMessages() (messages []*protocol.GroupMessage) {
ms.lock.Lock()
2018-06-03 19:36:20 +00:00
if !ms.rotated {
messages = make([]*protocol.GroupMessage, ms.pos)
copy(messages, ms.messages[0:ms.pos])
} else {
messages = make([]*protocol.GroupMessage, ms.bufferSize)
copy(messages, ms.messages)
}
2018-03-09 20:44:13 +00:00
ms.lock.Unlock()
return
}
2018-05-16 21:11:04 +00:00
// AddMessage adds a GroupMessage to the store
2018-03-09 20:44:13 +00:00
func (ms *MessageStore) AddMessage(gm protocol.GroupMessage) {
ms.lock.Lock()
2018-06-03 19:36:20 +00:00
ms.updateBuffer(&gm)
s, err := json.Marshal(gm)
if err != nil {
log.Printf("[ERROR] Failed to unmarshal group message %v\n", err)
}
2018-03-09 20:44:13 +00:00
fmt.Fprintf(ms.file, "%s\n", s)
ms.lock.Unlock()
}