Browse Source

stream storage for timelines, wired into profile store

pull/202/head
Dan Ballard 10 months ago
parent
commit
a0dab022ad

+ 3
- 21
app/app.go View File

@@ -1,11 +1,9 @@
package app

import (
"crypto/rand"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/storage"
"encoding/hex"
"fmt"

"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
@@ -13,7 +11,6 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"
"sync"
)

@@ -52,25 +49,15 @@ func NewApp(acn connectivity.ACN, appDirectory string) Application {
return app
}

func generateRandomFilename() string {
randBytes := make([]byte, 16)
rand.Read(randBytes)
return filepath.Join(hex.EncodeToString(randBytes))
}

// NewProfile creates a new cwtchPeer with a given name.
func (app *application) CreatePeer(name string, password string) (peer.CwtchPeer, error) {
log.Debugf("CreatePeer(%v)\n", name)

randomFileName := generateRandomFilename()
// TODO: eventBus per profile
profileStore, err := storage.NewProfileStore(app.eventBus, path.Join(app.directory, "profiles", randomFileName), password)
profileStore.Init(name)
p := peer.NewCwtchPeer(name)

app.eventBus.Publish(event.NewEvent(event.SetProfileName, map[event.Field]string{
event.ProfileName: name,
}))
profileStore := storage.NewProfileStore(app.eventBus, path.Join(app.directory, "profiles", p.GetProfile().LocalID), password)
err := profileStore.Init(name)

if err != nil {
return nil, err
@@ -98,12 +85,7 @@ func (app *application) LoadProfiles(password string) error {
for _, file := range files {

// TODO: Per profile eventBus
profileStore, err := storage.NewProfileStore(app.eventBus, path.Join(app.directory, "profiles", file.Name()), password)

if err != nil {
continue
}

profileStore := storage.NewProfileStore(app.eventBus, path.Join(app.directory, "profiles", file.Name()), password)
err = profileStore.Load()
if err != nil {
continue

+ 1
- 1
app/cli/main.go View File

@@ -512,7 +512,7 @@ func main() {
group = g

fmt.Printf("--------------- %v ---------------\n", group.GroupID)
gms := group.Timeline.Messages
gms := group.Timeline.GetMessages()
max := 20
if len(gms) < max {
max = len(gms)

+ 2
- 2
app/cwtchutil/main.go View File

@@ -38,7 +38,7 @@ func convertTorFile(filename string, password string) error {
peer.GetProfile().Ed25519PublicKey = pk
peer.GetProfile().Onion = string(onion)
fileStore := storage2.NewFileStore(filename, password)
err = fileStore.Save(peer)
err = fileStore.save(peer)
if err != nil {
return err
}
@@ -147,7 +147,7 @@ func main() {

fileStore2, _ := storage.NewProfileStore(nil, os.Args[2], newpw1)
// No way to copy, populate this method
err = fileStore2.Save(peer)
err = fileStore2.save(peer)
if err != nil {
log.Errorln(err)
os.Exit(1)

+ 1
- 1
event/common.go View File

@@ -72,7 +72,7 @@ const (

ProfileName = Field("ProfileName")

Key = Field("Key")
Key = Field("Key")
Data = Field("Data")

Error = Field("Error")

+ 2
- 0
model/group.go View File

@@ -29,11 +29,13 @@ type Group struct {
Attributes map[string]string
lock sync.Mutex
NewMessage chan Message `json:"-"`
LocalID string
}

// NewGroup initializes a new group associated with a given CwtchServer
func NewGroup(server string) (*Group, error) {
group := new(Group)
group.LocalID = generateRandomID()

if utils.IsValidHostname(server) == false {
return nil, errors.New("Server is not a valid v3 onion")

+ 21
- 11
model/message.go View File

@@ -9,7 +9,7 @@ import (
// Timeline encapsulates a collection of ordered messages, and a mechanism to access them
// in a threadsafe manner.
type Timeline struct {
Messages []Message
messages []Message
SignedGroupID []byte
lock sync.Mutex
}
@@ -24,6 +24,9 @@ type Message struct {
PreviousMessageSig []byte
}

// MessageBaseSize is a rough estimate of the base number of bytes the struct uses before strings are populated
const MessageBaseSize = 104

func compareSignatures(a []byte, b []byte) bool {
if len(a) != len(b) {
return false
@@ -39,39 +42,46 @@ func compareSignatures(a []byte, b []byte) bool {
// GetMessages returns a copy of the entire timeline
func (t *Timeline) GetMessages() []Message {
t.lock.Lock()
messages := make([]Message, len(t.Messages))
copy(messages[:], t.Messages[:])
messages := make([]Message, len(t.messages))
copy(messages[:], t.messages[:])
t.lock.Unlock()
return messages
}

// SetMessages sets the messages of this timeline. Only to be used in loading/initialization
func (t *Timeline) SetMessages(messages []Message) {
t.lock.Lock()
defer t.lock.Unlock()
t.messages = messages
}

// Len gets the length of the timeline
func (t *Timeline) Len() int {
return len(t.Messages)
return len(t.messages)
}

// Swap swaps 2 messages on the timeline.
func (t *Timeline) Swap(i, j int) {
t.Messages[i], t.Messages[j] = t.Messages[j], t.Messages[i]
t.messages[i], t.messages[j] = t.messages[j], t.messages[i]
}

// Less checks 2 messages (i and j) in the timeline and returns true if i occurred before j, else false
func (t *Timeline) Less(i, j int) bool {

if t.Messages[i].Timestamp.Before(t.Messages[j].Timestamp) {
if t.messages[i].Timestamp.Before(t.messages[j].Timestamp) {
return true
}

// Short circuit false if j is before i, signature checks will give a wrong order in this case.
if t.Messages[j].Timestamp.Before(t.Messages[i].Timestamp) {
if t.messages[j].Timestamp.Before(t.messages[i].Timestamp) {
return false
}

if compareSignatures(t.Messages[i].PreviousMessageSig, t.SignedGroupID) {
if compareSignatures(t.messages[i].PreviousMessageSig, t.SignedGroupID) {
return true
}

if compareSignatures(t.Messages[i].Signature, t.Messages[j].PreviousMessageSig) {
if compareSignatures(t.messages[i].Signature, t.messages[j].PreviousMessageSig) {
return true
}

@@ -91,14 +101,14 @@ func (t *Timeline) Insert(mi *Message) bool {
t.lock.Lock()
defer t.lock.Unlock()

for _, m := range t.Messages {
for _, m := range t.messages {
// If the message already exists, then we don't add it
if compareSignatures(m.Signature, mi.Signature) {
return true
}
}

t.Messages = append(t.Messages, *mi)
t.messages = append(t.messages, *mi)
sort.Sort(t)
return false
}

+ 1
- 1
model/message_test.go View File

@@ -99,6 +99,6 @@ func TestTranscriptConsistency(t *testing.T) {
t.Fatalf("Timeline Out of Order!: %v %v", i, m)
}

t.Logf("Messages %v: %v %x %x", i, m.Message, m.Signature, m.PreviousMessageSig)
t.Logf("messages %v: %v %x %x", i, m.Message, m.Signature, m.PreviousMessageSig)
}
}

+ 12
- 2
model/profile.go View File

@@ -4,12 +4,14 @@ import (
"crypto/rand"
"cwtch.im/cwtch/protocol"
"encoding/base32"
"encoding/hex"
"encoding/json"
"errors"
"git.openprivacy.ca/openprivacy/libricochet-go/utils"
"github.com/golang/protobuf/proto"
"golang.org/x/crypto/ed25519"
"io"
"path/filepath"
"strings"
"sync"
"time"
@@ -24,6 +26,7 @@ type PublicProfile struct {
Onion string
Attributes map[string]string
Timeline Timeline
LocalID string // used by storage engine
lock sync.Mutex
}

@@ -39,8 +42,15 @@ type Profile struct {
// TODO: Should this be per server?
const MaxGroupMessageLength = 1800

func generateRandomID() string {
randBytes := make([]byte, 16)
rand.Read(randBytes)
return filepath.Join(hex.EncodeToString(randBytes))
}

func (p *PublicProfile) init() {
p.Attributes = make(map[string]string)
p.LocalID = generateRandomID()
}

// SetAttribute allows applications to store arbitrary configuration info at the profile level.
@@ -327,8 +337,8 @@ func (p *Profile) EncryptMessageToGroup(message string, groupID string) ([]byte,
timestamp := time.Now().Unix()

var prevSig []byte
if len(group.Timeline.Messages) > 0 {
prevSig = group.Timeline.Messages[len(group.Timeline.Messages)-1].Signature
if len(group.Timeline.messages) > 0 {
prevSig = group.Timeline.messages[len(group.Timeline.messages)-1].Signature
} else {
prevSig = group.SignedGroupID
}

+ 1
- 1
server/metrics/monitors.go View File

@@ -68,7 +68,7 @@ func (mp *Monitors) report() {

fmt.Fprintf(w, "Uptime: %v\n\n", time.Now().Sub(mp.starttime))

fmt.Fprintln(w, "Messages:")
fmt.Fprintln(w, "messages:")
mp.Messages.Report(w)

fmt.Fprintln(w, "\nClient Connections:")

+ 1
- 1
server/server_instance_test.go View File

@@ -27,7 +27,7 @@ func TestServerInstance(t *testing.T) {
res := si.HandleFetchRequest()

if len(res) != 1 {
t.Errorf("Expected 1 Group Messages Instead got %v", res)
t.Errorf("Expected 1 Group messages Instead got %v", res)
}

// ra.HandleApplicationInstance(ai)

+ 47
- 0
storage/file_enc.go View File

@@ -2,10 +2,14 @@ package storage

import (
"crypto/rand"
"errors"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"golang.org/x/crypto/nacl/secretbox"
"golang.org/x/crypto/pbkdf2"
"golang.org/x/crypto/sha3"
"io"
"io/ioutil"
"path"
)

// createKey derives a key from a password
@@ -21,3 +25,46 @@ func createKey(password string) ([32]byte, [128]byte, error) {
copy(dkr[:], dk)
return dkr, salt, nil
}

//encryptFileData encrypts the cwtchPeer via the specified key.
func encryptFileData(data []byte, key [32]byte) ([]byte, error) {
var nonce [24]byte

if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil {
log.Errorf("Cannot read from random: %v\n", err)
return nil, err
}

encrypted := secretbox.Seal(nonce[:], data, &nonce, &key)
return encrypted, nil
}

//decryptFile decrypts the passed ciphertext into a cwtchPeer via the specified key.
func decryptFile(ciphertext []byte, key [32]byte) ([]byte, error) {
var decryptNonce [24]byte
copy(decryptNonce[:], ciphertext[:24])
decrypted, ok := secretbox.Open(nil, ciphertext[24:], &decryptNonce, &key)
if ok {
return decrypted, nil
}
return nil, errors.New("Failed to decrypt")
}

// Load instantiates a cwtchPeer from the file store
func readEncryptedFile(directory, filename, password string) ([]byte, error) {
encryptedbytes, err := ioutil.ReadFile(path.Join(directory, filename))
if err == nil {
var dkr [32]byte
//Separate the salt from the encrypted bytes, then generate the derived key
salt, encryptedbytes := encryptedbytes[0:128], encryptedbytes[128:]
dk := pbkdf2.Key([]byte(password), salt, 4096, 32, sha3.New512)
copy(dkr[:], dk)

data, err := decryptFile(encryptedbytes, dkr)
if err == nil {
return data, nil
}
return nil, err
}
return nil, err
}

+ 9
- 52
storage/file_store.go View File

@@ -1,20 +1,15 @@
package storage

import (
"crypto/rand"
"fmt"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"golang.org/x/crypto/nacl/secretbox"
"golang.org/x/crypto/pbkdf2"
"golang.org/x/crypto/sha3"
"io"
"io/ioutil"
"path"
)

// fileStore stores a cwtchPeer in an encrypted file
type fileStore struct {
filename string
password string
directory string
filename string
password string
}

// FileStore is a primitive around storing encrypted files
@@ -24,14 +19,15 @@ type FileStore interface {
}

// NewFileStore instantiates a fileStore given a filename and a password
func NewFileStore(filename string, password string) FileStore {
func NewFileStore(directory string, filename string, password string) FileStore {
filestore := new(fileStore)
filestore.password = password
filestore.filename = filename
filestore.directory = directory
return filestore
}

// Save serializes a cwtchPeer to a file
// save serializes a cwtchPeer to a file
func (fps *fileStore) Save(data []byte) error {
key, salt, _ := createKey(fps.password)
encryptedbytes, err := encryptFileData(data, key)
@@ -41,50 +37,11 @@ func (fps *fileStore) Save(data []byte) error {

// the salt for the derived key is appended to the front of the file
encryptedbytes = append(salt[:], encryptedbytes...)
err = ioutil.WriteFile(fps.filename, encryptedbytes, 0600)
err = ioutil.WriteFile(path.Join(fps.directory, fps.filename), encryptedbytes, 0600)
return err

}

//encryptFileData encrypts the cwtchPeer via the specified key.
func encryptFileData(data []byte, key [32]byte) ([]byte, error) {
var nonce [24]byte

if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil {
log.Errorf("Cannot read from random: %v\n", err)
return nil, err
}

encrypted := secretbox.Seal(nonce[:], data, &nonce, &key)
return encrypted, nil
}

//decryptFile decrypts the passed ciphertext into a cwtchPeer via the specified key.
func decryptFile(ciphertext []byte, key [32]byte) ([]byte, error) {
var decryptNonce [24]byte
copy(decryptNonce[:], ciphertext[:24])
decrypted, ok := secretbox.Open(nil, ciphertext[24:], &decryptNonce, &key)
if ok {
return decrypted, nil
}
return nil, fmt.Errorf("Failed to decrypt")
}

// Load instantiates a cwtchPeer from the file store
func (fps *fileStore) Load() ([]byte, error) {
encryptedbytes, err := ioutil.ReadFile(fps.filename)
if err == nil {
var dkr [32]byte
//Separate the salt from the encrypted bytes, then generate the derived key
salt, encryptedbytes := encryptedbytes[0:128], encryptedbytes[128:]
dk := pbkdf2.Key([]byte(fps.password), salt, 4096, 32, sha3.New512)
copy(dkr[:], dk)

data, err := decryptFile(encryptedbytes, dkr)
if err == nil {
return data, nil
}
return nil, err
}
return nil, err
return readEncryptedFile(fps.directory, fps.filename, fps.password)
}

+ 79
- 15
storage/profile_store.go View File

@@ -3,11 +3,20 @@ package storage
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/protocol"
"encoding/json"
"github.com/golang/protobuf/proto"
"os"
"time"
)

const profileFilename = "profile"

type profileStore struct {
fs FileStore
streamStores map[string]StreamStore
directory string
password string
profile *model.Profile
eventManager *event.Manager
queue *event.Queue
@@ -15,32 +24,37 @@ type profileStore struct {

// ProfileStore is an interface to managing the storage of Cwtch Profiles
type ProfileStore interface {
Save() error
Init(name string)
Init(name string) error
Load() error
Shutdown()
GetProfileCopy() *model.Profile
}

// NewProfileStore returns a profile store backed by a filestore listening for events and saving them
func NewProfileStore(eventManager *event.Manager, filename string, password string) (ProfileStore, error) {
ps := &profileStore{fs: NewFileStore(filename, password), profile: nil, eventManager: eventManager}
err := ps.Load()
if err == nil {
ps.queue = event.NewEventQueue(100)
go ps.eventHandler()
// directory should be $appDir/profiles/$rand
func NewProfileStore(eventManager *event.Manager, directory, password string) ProfileStore {
os.Mkdir(directory, 0700)
ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: nil, eventManager: eventManager, streamStores: map[string]StreamStore{}}
ps.queue = event.NewEventQueue(100)
go ps.eventHandler()

ps.eventManager.Subscribe(event.BlockPeer, ps.queue.EventChannel)
}
return ps, err
ps.eventManager.Subscribe(event.BlockPeer, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.SetProfileName, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.SetAttribute, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue.EventChannel)

return ps
}

func (ps *profileStore) Init(name string) {
func (ps *profileStore) Init(name string) error {
ps.profile = model.GenerateNewProfile(name)
ps.Save()
return ps.save()
}

func (ps *profileStore) Save() error {
func (ps *profileStore) save() error {
bytes, _ := json.Marshal(ps.profile)
return ps.fs.Save(bytes)
}
@@ -57,6 +71,19 @@ func (ps *profileStore) Load() error {
ps.profile = cp
return nil
}

for _, profile := range cp.Contacts {
ss := NewStreamStore(ps.directory, profile.LocalID, ps.password)
profile.Timeline.SetMessages(ss.Read())
ps.streamStores[profile.Onion] = ss
}

for _, group := range cp.Groups {
ss := NewStreamStore(ps.directory, group.LocalID, ps.password)
group.Timeline.SetMessages(ss.Read())
ps.streamStores[group.GroupID] = ss
}

return err
}

@@ -72,12 +99,49 @@ func (ps *profileStore) eventHandler() {
contact, exists := ps.profile.GetContact(ev.Data["Onion"])
if exists {
contact.Blocked = true
ps.save()
}
case event.SetProfileName:
ps.profile.Name = ev.Data[event.ProfileName]
ps.profile.SetAttribute("name", ev.Data[event.ProfileName])
ps.save()

case event.SetAttribute:
ps.profile.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
ps.save()
case event.SetPeerAttribute:
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
if exists {
contact.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
ps.save()
}
case event.SetGroupAttribute:
group, exists := ps.profile.Groups[ev.Data[event.GroupID]]
if exists {
group.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
ps.save()
}
case event.NewGroupInvite:
var gci protocol.CwtchPeerPacket //protocol.GroupChatInvite
proto.Unmarshal([]byte(ev.Data["GroupInvite"]), &gci)
groupInvite := gci.GroupChatInvite
ps.profile.ProcessInvite(groupInvite, ev.Data[event.RemotePeer])
ps.save()
group := ps.profile.Groups[groupInvite.GetGroupName()]
ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password)

case event.NewMessageFromGroup:
groupid := ev.Data[event.GroupID]
received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent])
// TODO: Sig, prev message Sig
message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer]}
//ps.profile.Groups[groupid].AddMessage(message) <- wants protocol.DecryptedGroupMessage so group.Timeline will drift here from launch when it's initialized
ps.streamStores[groupid].Write(message)
default:
return
}

ps.Save()
}
}


+ 72
- 0
storage/profile_store_test.go View File

@@ -0,0 +1,72 @@
package storage

import (
"cwtch.im/cwtch/event"
"os"
"testing"
"time"
)

const testProfileName = "Alice"
const testKey = "key"
const testVal = "value"
const testInitialMessage = "howdy"
const testMessage = "Hello from storage"

func TestProfileStoreWriteRead(t *testing.T) {
os.RemoveAll(testingDir)
eventBus := new(event.Manager)
eventBus.Initialize()
ps1 := NewProfileStore(eventBus, testingDir, password)

err := ps1.Init(testProfileName)
if err != nil {
t.Errorf("Error initializing profileStore: %v\n", err)
}

eventBus.Publish(event.NewEvent(event.SetAttribute, map[event.Field]string{event.Key: testKey, event.Data: testVal}))
time.Sleep(1 * time.Second)

groupid, invite, err := ps1.GetProfileCopy().StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
if err != nil {
t.Errorf("Creating group: %v\n", err)
}
if err != nil {
t.Errorf("Creating group invite: %v\n", err)
}
eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy().Onion, event.GroupInvite: string(invite)}))
time.Sleep(1 * time.Second)

eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{
event.GroupID: groupid,
event.TimestampSent: time.Now().Format(time.RFC3339Nano),
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
event.RemotePeer: ps1.GetProfileCopy().Onion,
event.Data: testMessage,
}))
time.Sleep(1 * time.Second)

ps1.Shutdown()

ps2 := NewProfileStore(eventBus, testingDir, password)
err = ps2.Load()
if err != nil {
t.Errorf("Error createing profileStore: %v\n", err)
}

profile := ps2.GetProfileCopy()
if profile.Name != testProfileName {
t.Errorf("Profile name from loaded profile incorrect. Expected: '%v' Actual: '%v'\n", testProfileName, profile.Name)
}

v, _ := profile.GetAttribute(testKey)
if v != testVal {
t.Errorf("Profile attribute '%v' inccorect. Expected: '%v' Actual: '%v'\n", testKey, testVal, v)
}

group2 := ps2.GetProfileCopy().Groups[groupid]
if group2 == nil {
t.Errorf("Group not loaded\n")
}

}

+ 145
- 0
storage/stream_store.go View File

@@ -0,0 +1,145 @@
package storage

import (
"bufio"
"cwtch.im/cwtch/model"
"encoding/json"
"fmt"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"io/ioutil"
"os"
"path"
"sync"
)

const (
fileStorePartitions = 16
bytesPerFile = 15 * 1024
)

// streamStore is a file-backed implementation of StreamStore using an in memory buffer of ~16KB and a rotating set of files
type streamStore struct {
password string

storeDirectory string
filenameBase string

messages []model.Message
bufferByteCount int

lock sync.Mutex
}

// StreamStore provides a stream like interface to encrypted storage
type StreamStore interface {
Write(message model.Message)
Read() []model.Message
}

// NewStreamStore returns an initialized StreamStore ready for reading and writing
func NewStreamStore(directory string, filenameBase string, password string) (store StreamStore) {
ss := &streamStore{storeDirectory: directory, filenameBase: filenameBase, password: password}
os.Mkdir(ss.storeDirectory, 0700)

ss.initBuffer()
ss.initBufferFromStorage()

return ss
}

func (ss *streamStore) initBuffer() {
ss.messages = []model.Message{}
ss.bufferByteCount = 0
}

func (ss *streamStore) initBufferFromStorage() error {
filename := path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, 0))
f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600)
if err != nil {
log.Errorf("StreamStore could not open: %v: %v", filename, err)
return err
}
defer f.Close()

scanner := bufio.NewScanner(f)
for scanner.Scan() {
mText := scanner.Text()
m := model.Message{}
err := json.Unmarshal([]byte(mText), &m)
if err == nil {
ss.updateBuffer(m)
}
}

return nil
}

func (ss *streamStore) updateBuffer(m model.Message) {
ss.messages = append(ss.messages, m)
ss.bufferByteCount += (model.MessageBaseSize * 1.5) + len(m.Message)
}

func (ss *streamStore) updateFile() error {
msgs, err := json.Marshal(ss.messages)
if err != nil {
log.Errorf("Failed to marshal group messages %v\n", err)
}

// ENCRYPT
key, salt, _ := createKey(ss.password)
encryptedMsgs, err := encryptFileData(msgs, key)
if err != nil {
log.Errorf("Failed to encrypt messages: %v\n", err)
return err
}
encryptedMsgs = append(salt[:], encryptedMsgs...)

ioutil.WriteFile(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, 0)), encryptedMsgs, 0700)
return nil
}

func (ss *streamStore) rotateFileStore() {
os.Remove(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, fileStorePartitions-1)))

for i := fileStorePartitions - 2; i >= 0; i-- {
os.Rename(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i)), path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i+1)))
}
}

// FetchMessages returns all messages from the backing file.
func (ss *streamStore) Read() (messages []model.Message) {
ss.lock.Lock()
defer ss.lock.Unlock()

resp := []model.Message{}

for i := fileStorePartitions - 1; i >= 0; i-- {
filename := fmt.Sprintf("%s.%d", ss.filenameBase, i)

bytes, err := readEncryptedFile(ss.storeDirectory, filename, ss.password)
if err != nil {
continue
}

msgs := []model.Message{}
err = json.Unmarshal([]byte(bytes), &msgs)
if err == nil {
resp = append(resp, msgs...)
}
}

return resp
}

// AddMessage adds a GroupMessage to the store
func (ss *streamStore) Write(m model.Message) {
ss.lock.Lock()
defer ss.lock.Unlock()
ss.updateBuffer(m)
ss.updateFile()

if ss.bufferByteCount > bytesPerFile {
ss.rotateFileStore()
ss.initBuffer()
}
}

+ 29
- 0
storage/stream_store_test.go View File

@@ -0,0 +1,29 @@
package storage

import (
"cwtch.im/cwtch/model"
"os"
"testing"
)

const testingDir = "./testing"
const filenameBase = "testStream"
const password = "asdfqwer"
const line1 = "Hello from storage!"

func TestStreamStoreWriteRead(t *testing.T) {
os.RemoveAll(testingDir)
os.Mkdir(testingDir, 0777)
ss1 := NewStreamStore(testingDir, filenameBase, password)
m := model.Message{Message: line1}
ss1.Write(m)

ss2 := NewStreamStore(testingDir, filenameBase, password)
messages := ss2.Read()
if len(messages) != 1 {
t.Errorf("Read messages has wrong length. Expected: 1 Actual: %d\n", len(messages))
}
if messages[0].Message != line1 {
t.Errorf("Read message has wrong content. Expected: '%v' Actual: '%v'\n", line1, messages[0].Message)
}
}

+ 1
- 1
testing/quality.sh View File

@@ -19,6 +19,6 @@ gofmt -l -s -w .
echo "Checking for ineffectual assignment of errors (unchecked errors...)"
ineffassign .

# misspell (https://github.com/client9/misspell)
# misspell (https://github.com/client9/misspell/cmd/misspell)
echo "Checking for misspelled words..."
misspell . | grep -v "vendor/" | grep -v "go.sum" | grep -v ".idea"

Loading…
Cancel
Save