forked from cwtch.im/cwtch
1
0
Fork 0
cwtch/storage/v0/stream_store.go

146 lines
3.6 KiB
Go

package v0
import (
"cwtch.im/cwtch/model"
"encoding/json"
"fmt"
"git.openprivacy.ca/openprivacy/log"
"io/ioutil"
"os"
"path"
"sync"
)
const (
fileStorePartitions = 16
bytesPerFile = 15 * 1024
)
// streamStore is a file-backed implementation of StreamStore using an in memory buffer of ~16KB and a rotating set of files
type streamStore struct {
password string
storeDirectory string
filenameBase string
lock sync.Mutex
// Buffer is used just for current file to write to
messages []model.Message
bufferByteCount int
}
// StreamStore provides a stream like interface to encrypted storage
type StreamStore interface {
Read() []model.Message
Write(m model.Message)
}
// NewStreamStore returns an initialized StreamStore ready for reading and writing
func NewStreamStore(directory string, filenameBase string, password string) (store StreamStore) {
ss := &streamStore{storeDirectory: directory, filenameBase: filenameBase, password: password}
os.Mkdir(ss.storeDirectory, 0700)
ss.initBuffer()
return ss
}
// Read returns all messages from the backing file (not the buffer, for writing to the current file)
func (ss *streamStore) Read() (messages []model.Message) {
ss.lock.Lock()
defer ss.lock.Unlock()
resp := []model.Message{}
for i := fileStorePartitions - 1; i >= 0; i-- {
filename := fmt.Sprintf("%s.%d", ss.filenameBase, i)
bytes, err := readEncryptedFile(ss.storeDirectory, filename, ss.password)
if err != nil {
continue
}
msgs := []model.Message{}
json.Unmarshal([]byte(bytes), &msgs)
resp = append(resp, msgs...)
}
// 2019.10.10 "Acknowledged" & "ReceivedByServer" are added to the struct, populate it as true for old ones without
for i := 0; i < len(resp) && (resp[i].Acknowledged == false && resp[i].ReceivedByServer == false); i++ {
resp[i].Acknowledged = true
resp[i].ReceivedByServer = true
}
return resp
}
// ****** Writing *******/
func (ss *streamStore) WriteN(messages []model.Message) {
ss.lock.Lock()
defer ss.lock.Unlock()
for _, m := range messages {
ss.updateBuffer(m)
if ss.bufferByteCount > bytesPerFile {
ss.updateFile()
log.Debugf("rotating log file")
ss.rotateFileStore()
ss.initBuffer()
}
}
}
// Write adds a GroupMessage to the store
func (ss *streamStore) Write(m model.Message) {
ss.lock.Lock()
defer ss.lock.Unlock()
ss.updateBuffer(m)
ss.updateFile()
if ss.bufferByteCount > bytesPerFile {
log.Debugf("rotating log file")
ss.rotateFileStore()
ss.initBuffer()
}
}
func (ss *streamStore) initBuffer() {
ss.messages = []model.Message{}
ss.bufferByteCount = 0
}
func (ss *streamStore) updateBuffer(m model.Message) {
ss.messages = append(ss.messages, m)
ss.bufferByteCount += (model.MessageBaseSize * 1.5) + len(m.Message)
}
func (ss *streamStore) updateFile() error {
msgs, err := json.Marshal(ss.messages)
if err != nil {
log.Errorf("Failed to marshal group messages %v\n", err)
}
// ENCRYPT
key, salt, _ := createKey(ss.password)
encryptedMsgs, err := encryptFileData(msgs, key)
if err != nil {
log.Errorf("Failed to encrypt messages: %v\n", err)
return err
}
encryptedMsgs = append(salt[:], encryptedMsgs...)
ioutil.WriteFile(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, 0)), encryptedMsgs, 0700)
return nil
}
func (ss *streamStore) rotateFileStore() {
os.Remove(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, fileStorePartitions-1)))
for i := fileStorePartitions - 2; i >= 0; i-- {
os.Rename(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i)), path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i+1)))
}
}