146 lines
3.5 KiB
Go
146 lines
3.5 KiB
Go
|
package storage
|
||
|
|
||
|
import (
|
||
|
"bufio"
|
||
|
"cwtch.im/cwtch/model"
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||
|
"io/ioutil"
|
||
|
"os"
|
||
|
"path"
|
||
|
"sync"
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
fileStorePartitions = 16
|
||
|
bytesPerFile = 15 * 1024
|
||
|
)
|
||
|
|
||
|
// streamStore is a file-backed implementation of StreamStore using an in memory buffer of ~16KB and a rotating set of files
|
||
|
type streamStore struct {
|
||
|
password string
|
||
|
|
||
|
storeDirectory string
|
||
|
filenameBase string
|
||
|
|
||
|
messages []model.Message
|
||
|
bufferByteCount int
|
||
|
|
||
|
lock sync.Mutex
|
||
|
}
|
||
|
|
||
|
// StreamStore provides a stream like interface to encrypted storage
|
||
|
type StreamStore interface {
|
||
|
Write(message model.Message)
|
||
|
Read() []model.Message
|
||
|
}
|
||
|
|
||
|
// NewStreamStore returns an initialized StreamStore ready for reading and writing
|
||
|
func NewStreamStore(directory string, filenameBase string, password string) (store StreamStore) {
|
||
|
ss := &streamStore{storeDirectory: directory, filenameBase: filenameBase, password: password}
|
||
|
os.Mkdir(ss.storeDirectory, 0700)
|
||
|
|
||
|
ss.initBuffer()
|
||
|
ss.initBufferFromStorage()
|
||
|
|
||
|
return ss
|
||
|
}
|
||
|
|
||
|
func (ss *streamStore) initBuffer() {
|
||
|
ss.messages = []model.Message{}
|
||
|
ss.bufferByteCount = 0
|
||
|
}
|
||
|
|
||
|
func (ss *streamStore) initBufferFromStorage() error {
|
||
|
filename := path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, 0))
|
||
|
f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600)
|
||
|
if err != nil {
|
||
|
log.Errorf("StreamStore could not open: %v: %v", filename, err)
|
||
|
return err
|
||
|
}
|
||
|
defer f.Close()
|
||
|
|
||
|
scanner := bufio.NewScanner(f)
|
||
|
for scanner.Scan() {
|
||
|
mText := scanner.Text()
|
||
|
m := model.Message{}
|
||
|
err := json.Unmarshal([]byte(mText), &m)
|
||
|
if err == nil {
|
||
|
ss.updateBuffer(m)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ss *streamStore) updateBuffer(m model.Message) {
|
||
|
ss.messages = append(ss.messages, m)
|
||
|
ss.bufferByteCount += (model.MessageBaseSize * 1.5) + len(m.Message)
|
||
|
}
|
||
|
|
||
|
func (ss *streamStore) updateFile() error {
|
||
|
msgs, err := json.Marshal(ss.messages)
|
||
|
if err != nil {
|
||
|
log.Errorf("Failed to marshal group messages %v\n", err)
|
||
|
}
|
||
|
|
||
|
// ENCRYPT
|
||
|
key, salt, _ := createKey(ss.password)
|
||
|
encryptedMsgs, err := encryptFileData(msgs, key)
|
||
|
if err != nil {
|
||
|
log.Errorf("Failed to encrypt messages: %v\n", err)
|
||
|
return err
|
||
|
}
|
||
|
encryptedMsgs = append(salt[:], encryptedMsgs...)
|
||
|
|
||
|
ioutil.WriteFile(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, 0)), encryptedMsgs, 0700)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ss *streamStore) rotateFileStore() {
|
||
|
os.Remove(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, fileStorePartitions-1)))
|
||
|
|
||
|
for i := fileStorePartitions - 2; i >= 0; i-- {
|
||
|
os.Rename(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i)), path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i+1)))
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// FetchMessages returns all messages from the backing file.
|
||
|
func (ss *streamStore) Read() (messages []model.Message) {
|
||
|
ss.lock.Lock()
|
||
|
defer ss.lock.Unlock()
|
||
|
|
||
|
resp := []model.Message{}
|
||
|
|
||
|
for i := fileStorePartitions - 1; i >= 0; i-- {
|
||
|
filename := fmt.Sprintf("%s.%d", ss.filenameBase, i)
|
||
|
|
||
|
bytes, err := readEncryptedFile(ss.storeDirectory, filename, ss.password)
|
||
|
if err != nil {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
msgs := []model.Message{}
|
||
|
err = json.Unmarshal([]byte(bytes), &msgs)
|
||
|
if err == nil {
|
||
|
resp = append(resp, msgs...)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return resp
|
||
|
}
|
||
|
|
||
|
// AddMessage adds a GroupMessage to the store
|
||
|
func (ss *streamStore) Write(m model.Message) {
|
||
|
ss.lock.Lock()
|
||
|
defer ss.lock.Unlock()
|
||
|
ss.updateBuffer(m)
|
||
|
ss.updateFile()
|
||
|
|
||
|
if ss.bufferByteCount > bytesPerFile {
|
||
|
ss.rotateFileStore()
|
||
|
ss.initBuffer()
|
||
|
}
|
||
|
}
|