From a0dab022ada78b10b3aa74065ea964766742680f Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Tue, 29 Jan 2019 12:56:59 -0800 Subject: [PATCH] stream storage for timelines, wired into profile store --- app/app.go | 24 +----- app/cli/main.go | 2 +- app/cwtchutil/main.go | 4 +- event/common.go | 2 +- model/group.go | 2 + model/message.go | 32 +++++--- model/message_test.go | 2 +- model/profile.go | 14 +++- server/metrics/monitors.go | 2 +- server/server_instance_test.go | 2 +- storage/file_enc.go | 47 +++++++++++ storage/file_store.go | 61 ++------------ storage/profile_store.go | 94 +++++++++++++++++---- storage/profile_store_test.go | 72 ++++++++++++++++ storage/stream_store.go | 145 +++++++++++++++++++++++++++++++++ storage/stream_store_test.go | 29 +++++++ testing/quality.sh | 2 +- 17 files changed, 427 insertions(+), 109 deletions(-) create mode 100644 storage/profile_store_test.go create mode 100644 storage/stream_store.go create mode 100644 storage/stream_store_test.go diff --git a/app/app.go b/app/app.go index 85d89ce..b95f58c 100644 --- a/app/app.go +++ b/app/app.go @@ -1,11 +1,9 @@ package app import ( - "crypto/rand" "cwtch.im/cwtch/event" "cwtch.im/cwtch/peer" "cwtch.im/cwtch/storage" - "encoding/hex" "fmt" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" @@ -13,7 +11,6 @@ import ( "io/ioutil" "os" "path" - "path/filepath" "sync" ) @@ -52,25 +49,15 @@ func NewApp(acn connectivity.ACN, appDirectory string) Application { return app } -func generateRandomFilename() string { - randBytes := make([]byte, 16) - rand.Read(randBytes) - return filepath.Join(hex.EncodeToString(randBytes)) -} - // NewProfile creates a new cwtchPeer with a given name. func (app *application) CreatePeer(name string, password string) (peer.CwtchPeer, error) { log.Debugf("CreatePeer(%v)\n", name) - randomFileName := generateRandomFilename() // TODO: eventBus per profile - profileStore, err := storage.NewProfileStore(app.eventBus, path.Join(app.directory, "profiles", randomFileName), password) - profileStore.Init(name) p := peer.NewCwtchPeer(name) - app.eventBus.Publish(event.NewEvent(event.SetProfileName, map[event.Field]string{ - event.ProfileName: name, - })) + profileStore := storage.NewProfileStore(app.eventBus, path.Join(app.directory, "profiles", p.GetProfile().LocalID), password) + err := profileStore.Init(name) if err != nil { return nil, err @@ -98,12 +85,7 @@ func (app *application) LoadProfiles(password string) error { for _, file := range files { // TODO: Per profile eventBus - profileStore, err := storage.NewProfileStore(app.eventBus, path.Join(app.directory, "profiles", file.Name()), password) - - if err != nil { - continue - } - + profileStore := storage.NewProfileStore(app.eventBus, path.Join(app.directory, "profiles", file.Name()), password) err = profileStore.Load() if err != nil { continue diff --git a/app/cli/main.go b/app/cli/main.go index 1e1f28e..a0cbc7c 100644 --- a/app/cli/main.go +++ b/app/cli/main.go @@ -512,7 +512,7 @@ func main() { group = g fmt.Printf("--------------- %v ---------------\n", group.GroupID) - gms := group.Timeline.Messages + gms := group.Timeline.GetMessages() max := 20 if len(gms) < max { max = len(gms) diff --git a/app/cwtchutil/main.go b/app/cwtchutil/main.go index f4aab6f..39343a2 100644 --- a/app/cwtchutil/main.go +++ b/app/cwtchutil/main.go @@ -38,7 +38,7 @@ func convertTorFile(filename string, password string) error { peer.GetProfile().Ed25519PublicKey = pk peer.GetProfile().Onion = string(onion) fileStore := storage2.NewFileStore(filename, password) - err = fileStore.Save(peer) + err = fileStore.save(peer) if err != nil { return err } @@ -147,7 +147,7 @@ func main() { fileStore2, _ := storage.NewProfileStore(nil, os.Args[2], newpw1) // No way to copy, populate this method - err = fileStore2.Save(peer) + err = fileStore2.save(peer) if err != nil { log.Errorln(err) os.Exit(1) diff --git a/event/common.go b/event/common.go index e6221eb..208968d 100644 --- a/event/common.go +++ b/event/common.go @@ -72,7 +72,7 @@ const ( ProfileName = Field("ProfileName") - Key = Field("Key") + Key = Field("Key") Data = Field("Data") Error = Field("Error") diff --git a/model/group.go b/model/group.go index 6c98fa1..a801124 100644 --- a/model/group.go +++ b/model/group.go @@ -29,11 +29,13 @@ type Group struct { Attributes map[string]string lock sync.Mutex NewMessage chan Message `json:"-"` + LocalID string } // NewGroup initializes a new group associated with a given CwtchServer func NewGroup(server string) (*Group, error) { group := new(Group) + group.LocalID = generateRandomID() if utils.IsValidHostname(server) == false { return nil, errors.New("Server is not a valid v3 onion") diff --git a/model/message.go b/model/message.go index 065fc71..0afda1e 100644 --- a/model/message.go +++ b/model/message.go @@ -9,7 +9,7 @@ import ( // Timeline encapsulates a collection of ordered messages, and a mechanism to access them // in a threadsafe manner. type Timeline struct { - Messages []Message + messages []Message SignedGroupID []byte lock sync.Mutex } @@ -24,6 +24,9 @@ type Message struct { PreviousMessageSig []byte } +// MessageBaseSize is a rough estimate of the base number of bytes the struct uses before strings are populated +const MessageBaseSize = 104 + func compareSignatures(a []byte, b []byte) bool { if len(a) != len(b) { return false @@ -39,39 +42,46 @@ func compareSignatures(a []byte, b []byte) bool { // GetMessages returns a copy of the entire timeline func (t *Timeline) GetMessages() []Message { t.lock.Lock() - messages := make([]Message, len(t.Messages)) - copy(messages[:], t.Messages[:]) + messages := make([]Message, len(t.messages)) + copy(messages[:], t.messages[:]) t.lock.Unlock() return messages } +// SetMessages sets the messages of this timeline. Only to be used in loading/initialization +func (t *Timeline) SetMessages(messages []Message) { + t.lock.Lock() + defer t.lock.Unlock() + t.messages = messages +} + // Len gets the length of the timeline func (t *Timeline) Len() int { - return len(t.Messages) + return len(t.messages) } // Swap swaps 2 messages on the timeline. func (t *Timeline) Swap(i, j int) { - t.Messages[i], t.Messages[j] = t.Messages[j], t.Messages[i] + t.messages[i], t.messages[j] = t.messages[j], t.messages[i] } // Less checks 2 messages (i and j) in the timeline and returns true if i occurred before j, else false func (t *Timeline) Less(i, j int) bool { - if t.Messages[i].Timestamp.Before(t.Messages[j].Timestamp) { + if t.messages[i].Timestamp.Before(t.messages[j].Timestamp) { return true } // Short circuit false if j is before i, signature checks will give a wrong order in this case. - if t.Messages[j].Timestamp.Before(t.Messages[i].Timestamp) { + if t.messages[j].Timestamp.Before(t.messages[i].Timestamp) { return false } - if compareSignatures(t.Messages[i].PreviousMessageSig, t.SignedGroupID) { + if compareSignatures(t.messages[i].PreviousMessageSig, t.SignedGroupID) { return true } - if compareSignatures(t.Messages[i].Signature, t.Messages[j].PreviousMessageSig) { + if compareSignatures(t.messages[i].Signature, t.messages[j].PreviousMessageSig) { return true } @@ -91,14 +101,14 @@ func (t *Timeline) Insert(mi *Message) bool { t.lock.Lock() defer t.lock.Unlock() - for _, m := range t.Messages { + for _, m := range t.messages { // If the message already exists, then we don't add it if compareSignatures(m.Signature, mi.Signature) { return true } } - t.Messages = append(t.Messages, *mi) + t.messages = append(t.messages, *mi) sort.Sort(t) return false } diff --git a/model/message_test.go b/model/message_test.go index 775f849..30c7807 100644 --- a/model/message_test.go +++ b/model/message_test.go @@ -99,6 +99,6 @@ func TestTranscriptConsistency(t *testing.T) { t.Fatalf("Timeline Out of Order!: %v %v", i, m) } - t.Logf("Messages %v: %v %x %x", i, m.Message, m.Signature, m.PreviousMessageSig) + t.Logf("messages %v: %v %x %x", i, m.Message, m.Signature, m.PreviousMessageSig) } } diff --git a/model/profile.go b/model/profile.go index b4a5356..9868952 100644 --- a/model/profile.go +++ b/model/profile.go @@ -4,12 +4,14 @@ import ( "crypto/rand" "cwtch.im/cwtch/protocol" "encoding/base32" + "encoding/hex" "encoding/json" "errors" "git.openprivacy.ca/openprivacy/libricochet-go/utils" "github.com/golang/protobuf/proto" "golang.org/x/crypto/ed25519" "io" + "path/filepath" "strings" "sync" "time" @@ -24,6 +26,7 @@ type PublicProfile struct { Onion string Attributes map[string]string Timeline Timeline + LocalID string // used by storage engine lock sync.Mutex } @@ -39,8 +42,15 @@ type Profile struct { // TODO: Should this be per server? const MaxGroupMessageLength = 1800 +func generateRandomID() string { + randBytes := make([]byte, 16) + rand.Read(randBytes) + return filepath.Join(hex.EncodeToString(randBytes)) +} + func (p *PublicProfile) init() { p.Attributes = make(map[string]string) + p.LocalID = generateRandomID() } // SetAttribute allows applications to store arbitrary configuration info at the profile level. @@ -327,8 +337,8 @@ func (p *Profile) EncryptMessageToGroup(message string, groupID string) ([]byte, timestamp := time.Now().Unix() var prevSig []byte - if len(group.Timeline.Messages) > 0 { - prevSig = group.Timeline.Messages[len(group.Timeline.Messages)-1].Signature + if len(group.Timeline.messages) > 0 { + prevSig = group.Timeline.messages[len(group.Timeline.messages)-1].Signature } else { prevSig = group.SignedGroupID } diff --git a/server/metrics/monitors.go b/server/metrics/monitors.go index 47381a1..c63983b 100644 --- a/server/metrics/monitors.go +++ b/server/metrics/monitors.go @@ -68,7 +68,7 @@ func (mp *Monitors) report() { fmt.Fprintf(w, "Uptime: %v\n\n", time.Now().Sub(mp.starttime)) - fmt.Fprintln(w, "Messages:") + fmt.Fprintln(w, "messages:") mp.Messages.Report(w) fmt.Fprintln(w, "\nClient Connections:") diff --git a/server/server_instance_test.go b/server/server_instance_test.go index 40589cc..950bab3 100644 --- a/server/server_instance_test.go +++ b/server/server_instance_test.go @@ -27,7 +27,7 @@ func TestServerInstance(t *testing.T) { res := si.HandleFetchRequest() if len(res) != 1 { - t.Errorf("Expected 1 Group Messages Instead got %v", res) + t.Errorf("Expected 1 Group messages Instead got %v", res) } // ra.HandleApplicationInstance(ai) diff --git a/storage/file_enc.go b/storage/file_enc.go index e6d377b..a9521dd 100644 --- a/storage/file_enc.go +++ b/storage/file_enc.go @@ -2,10 +2,14 @@ package storage import ( "crypto/rand" + "errors" "git.openprivacy.ca/openprivacy/libricochet-go/log" + "golang.org/x/crypto/nacl/secretbox" "golang.org/x/crypto/pbkdf2" "golang.org/x/crypto/sha3" "io" + "io/ioutil" + "path" ) // createKey derives a key from a password @@ -21,3 +25,46 @@ func createKey(password string) ([32]byte, [128]byte, error) { copy(dkr[:], dk) return dkr, salt, nil } + +//encryptFileData encrypts the cwtchPeer via the specified key. +func encryptFileData(data []byte, key [32]byte) ([]byte, error) { + var nonce [24]byte + + if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil { + log.Errorf("Cannot read from random: %v\n", err) + return nil, err + } + + encrypted := secretbox.Seal(nonce[:], data, &nonce, &key) + return encrypted, nil +} + +//decryptFile decrypts the passed ciphertext into a cwtchPeer via the specified key. +func decryptFile(ciphertext []byte, key [32]byte) ([]byte, error) { + var decryptNonce [24]byte + copy(decryptNonce[:], ciphertext[:24]) + decrypted, ok := secretbox.Open(nil, ciphertext[24:], &decryptNonce, &key) + if ok { + return decrypted, nil + } + return nil, errors.New("Failed to decrypt") +} + +// Load instantiates a cwtchPeer from the file store +func readEncryptedFile(directory, filename, password string) ([]byte, error) { + encryptedbytes, err := ioutil.ReadFile(path.Join(directory, filename)) + if err == nil { + var dkr [32]byte + //Separate the salt from the encrypted bytes, then generate the derived key + salt, encryptedbytes := encryptedbytes[0:128], encryptedbytes[128:] + dk := pbkdf2.Key([]byte(password), salt, 4096, 32, sha3.New512) + copy(dkr[:], dk) + + data, err := decryptFile(encryptedbytes, dkr) + if err == nil { + return data, nil + } + return nil, err + } + return nil, err +} diff --git a/storage/file_store.go b/storage/file_store.go index 2060112..7398ca5 100644 --- a/storage/file_store.go +++ b/storage/file_store.go @@ -1,20 +1,15 @@ package storage import ( - "crypto/rand" - "fmt" - "git.openprivacy.ca/openprivacy/libricochet-go/log" - "golang.org/x/crypto/nacl/secretbox" - "golang.org/x/crypto/pbkdf2" - "golang.org/x/crypto/sha3" - "io" "io/ioutil" + "path" ) // fileStore stores a cwtchPeer in an encrypted file type fileStore struct { - filename string - password string + directory string + filename string + password string } // FileStore is a primitive around storing encrypted files @@ -24,14 +19,15 @@ type FileStore interface { } // NewFileStore instantiates a fileStore given a filename and a password -func NewFileStore(filename string, password string) FileStore { +func NewFileStore(directory string, filename string, password string) FileStore { filestore := new(fileStore) filestore.password = password filestore.filename = filename + filestore.directory = directory return filestore } -// Save serializes a cwtchPeer to a file +// save serializes a cwtchPeer to a file func (fps *fileStore) Save(data []byte) error { key, salt, _ := createKey(fps.password) encryptedbytes, err := encryptFileData(data, key) @@ -41,50 +37,11 @@ func (fps *fileStore) Save(data []byte) error { // the salt for the derived key is appended to the front of the file encryptedbytes = append(salt[:], encryptedbytes...) - err = ioutil.WriteFile(fps.filename, encryptedbytes, 0600) + err = ioutil.WriteFile(path.Join(fps.directory, fps.filename), encryptedbytes, 0600) return err } -//encryptFileData encrypts the cwtchPeer via the specified key. -func encryptFileData(data []byte, key [32]byte) ([]byte, error) { - var nonce [24]byte - - if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil { - log.Errorf("Cannot read from random: %v\n", err) - return nil, err - } - - encrypted := secretbox.Seal(nonce[:], data, &nonce, &key) - return encrypted, nil -} - -//decryptFile decrypts the passed ciphertext into a cwtchPeer via the specified key. -func decryptFile(ciphertext []byte, key [32]byte) ([]byte, error) { - var decryptNonce [24]byte - copy(decryptNonce[:], ciphertext[:24]) - decrypted, ok := secretbox.Open(nil, ciphertext[24:], &decryptNonce, &key) - if ok { - return decrypted, nil - } - return nil, fmt.Errorf("Failed to decrypt") -} - -// Load instantiates a cwtchPeer from the file store func (fps *fileStore) Load() ([]byte, error) { - encryptedbytes, err := ioutil.ReadFile(fps.filename) - if err == nil { - var dkr [32]byte - //Separate the salt from the encrypted bytes, then generate the derived key - salt, encryptedbytes := encryptedbytes[0:128], encryptedbytes[128:] - dk := pbkdf2.Key([]byte(fps.password), salt, 4096, 32, sha3.New512) - copy(dkr[:], dk) - - data, err := decryptFile(encryptedbytes, dkr) - if err == nil { - return data, nil - } - return nil, err - } - return nil, err + return readEncryptedFile(fps.directory, fps.filename, fps.password) } diff --git a/storage/profile_store.go b/storage/profile_store.go index ffbb0c2..1200bc4 100644 --- a/storage/profile_store.go +++ b/storage/profile_store.go @@ -3,11 +3,20 @@ package storage import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" + "cwtch.im/cwtch/protocol" "encoding/json" + "github.com/golang/protobuf/proto" + "os" + "time" ) +const profileFilename = "profile" + type profileStore struct { fs FileStore + streamStores map[string]StreamStore + directory string + password string profile *model.Profile eventManager *event.Manager queue *event.Queue @@ -15,32 +24,37 @@ type profileStore struct { // ProfileStore is an interface to managing the storage of Cwtch Profiles type ProfileStore interface { - Save() error - Init(name string) + Init(name string) error Load() error Shutdown() GetProfileCopy() *model.Profile } // NewProfileStore returns a profile store backed by a filestore listening for events and saving them -func NewProfileStore(eventManager *event.Manager, filename string, password string) (ProfileStore, error) { - ps := &profileStore{fs: NewFileStore(filename, password), profile: nil, eventManager: eventManager} - err := ps.Load() - if err == nil { - ps.queue = event.NewEventQueue(100) - go ps.eventHandler() +// directory should be $appDir/profiles/$rand +func NewProfileStore(eventManager *event.Manager, directory, password string) ProfileStore { + os.Mkdir(directory, 0700) + ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: nil, eventManager: eventManager, streamStores: map[string]StreamStore{}} + ps.queue = event.NewEventQueue(100) + go ps.eventHandler() - ps.eventManager.Subscribe(event.BlockPeer, ps.queue.EventChannel) - } - return ps, err + ps.eventManager.Subscribe(event.BlockPeer, ps.queue.EventChannel) + ps.eventManager.Subscribe(event.SetProfileName, ps.queue.EventChannel) + ps.eventManager.Subscribe(event.SetAttribute, ps.queue.EventChannel) + ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue.EventChannel) + ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue.EventChannel) + ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue.EventChannel) + ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue.EventChannel) + + return ps } -func (ps *profileStore) Init(name string) { +func (ps *profileStore) Init(name string) error { ps.profile = model.GenerateNewProfile(name) - ps.Save() + return ps.save() } -func (ps *profileStore) Save() error { +func (ps *profileStore) save() error { bytes, _ := json.Marshal(ps.profile) return ps.fs.Save(bytes) } @@ -57,6 +71,19 @@ func (ps *profileStore) Load() error { ps.profile = cp return nil } + + for _, profile := range cp.Contacts { + ss := NewStreamStore(ps.directory, profile.LocalID, ps.password) + profile.Timeline.SetMessages(ss.Read()) + ps.streamStores[profile.Onion] = ss + } + + for _, group := range cp.Groups { + ss := NewStreamStore(ps.directory, group.LocalID, ps.password) + group.Timeline.SetMessages(ss.Read()) + ps.streamStores[group.GroupID] = ss + } + return err } @@ -72,12 +99,49 @@ func (ps *profileStore) eventHandler() { contact, exists := ps.profile.GetContact(ev.Data["Onion"]) if exists { contact.Blocked = true + ps.save() } + case event.SetProfileName: + ps.profile.Name = ev.Data[event.ProfileName] + ps.profile.SetAttribute("name", ev.Data[event.ProfileName]) + ps.save() + + case event.SetAttribute: + ps.profile.SetAttribute(ev.Data[event.Key], ev.Data[event.Data]) + ps.save() + case event.SetPeerAttribute: + contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer]) + if exists { + contact.SetAttribute(ev.Data[event.Key], ev.Data[event.Data]) + ps.save() + } + case event.SetGroupAttribute: + group, exists := ps.profile.Groups[ev.Data[event.GroupID]] + if exists { + group.SetAttribute(ev.Data[event.Key], ev.Data[event.Data]) + ps.save() + } + case event.NewGroupInvite: + var gci protocol.CwtchPeerPacket //protocol.GroupChatInvite + proto.Unmarshal([]byte(ev.Data["GroupInvite"]), &gci) + groupInvite := gci.GroupChatInvite + ps.profile.ProcessInvite(groupInvite, ev.Data[event.RemotePeer]) + ps.save() + group := ps.profile.Groups[groupInvite.GetGroupName()] + ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password) + + case event.NewMessageFromGroup: + groupid := ev.Data[event.GroupID] + received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) + sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent]) + // TODO: Sig, prev message Sig + message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer]} + //ps.profile.Groups[groupid].AddMessage(message) <- wants protocol.DecryptedGroupMessage so group.Timeline will drift here from launch when it's initialized + ps.streamStores[groupid].Write(message) default: return } - ps.Save() } } diff --git a/storage/profile_store_test.go b/storage/profile_store_test.go new file mode 100644 index 0000000..9d304cb --- /dev/null +++ b/storage/profile_store_test.go @@ -0,0 +1,72 @@ +package storage + +import ( + "cwtch.im/cwtch/event" + "os" + "testing" + "time" +) + +const testProfileName = "Alice" +const testKey = "key" +const testVal = "value" +const testInitialMessage = "howdy" +const testMessage = "Hello from storage" + +func TestProfileStoreWriteRead(t *testing.T) { + os.RemoveAll(testingDir) + eventBus := new(event.Manager) + eventBus.Initialize() + ps1 := NewProfileStore(eventBus, testingDir, password) + + err := ps1.Init(testProfileName) + if err != nil { + t.Errorf("Error initializing profileStore: %v\n", err) + } + + eventBus.Publish(event.NewEvent(event.SetAttribute, map[event.Field]string{event.Key: testKey, event.Data: testVal})) + time.Sleep(1 * time.Second) + + groupid, invite, err := ps1.GetProfileCopy().StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") + if err != nil { + t.Errorf("Creating group: %v\n", err) + } + if err != nil { + t.Errorf("Creating group invite: %v\n", err) + } + eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy().Onion, event.GroupInvite: string(invite)})) + time.Sleep(1 * time.Second) + + eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{ + event.GroupID: groupid, + event.TimestampSent: time.Now().Format(time.RFC3339Nano), + event.TimestampReceived: time.Now().Format(time.RFC3339Nano), + event.RemotePeer: ps1.GetProfileCopy().Onion, + event.Data: testMessage, + })) + time.Sleep(1 * time.Second) + + ps1.Shutdown() + + ps2 := NewProfileStore(eventBus, testingDir, password) + err = ps2.Load() + if err != nil { + t.Errorf("Error createing profileStore: %v\n", err) + } + + profile := ps2.GetProfileCopy() + if profile.Name != testProfileName { + t.Errorf("Profile name from loaded profile incorrect. Expected: '%v' Actual: '%v'\n", testProfileName, profile.Name) + } + + v, _ := profile.GetAttribute(testKey) + if v != testVal { + t.Errorf("Profile attribute '%v' inccorect. Expected: '%v' Actual: '%v'\n", testKey, testVal, v) + } + + group2 := ps2.GetProfileCopy().Groups[groupid] + if group2 == nil { + t.Errorf("Group not loaded\n") + } + +} diff --git a/storage/stream_store.go b/storage/stream_store.go new file mode 100644 index 0000000..36fbbbb --- /dev/null +++ b/storage/stream_store.go @@ -0,0 +1,145 @@ +package storage + +import ( + "bufio" + "cwtch.im/cwtch/model" + "encoding/json" + "fmt" + "git.openprivacy.ca/openprivacy/libricochet-go/log" + "io/ioutil" + "os" + "path" + "sync" +) + +const ( + fileStorePartitions = 16 + bytesPerFile = 15 * 1024 +) + +// streamStore is a file-backed implementation of StreamStore using an in memory buffer of ~16KB and a rotating set of files +type streamStore struct { + password string + + storeDirectory string + filenameBase string + + messages []model.Message + bufferByteCount int + + lock sync.Mutex +} + +// StreamStore provides a stream like interface to encrypted storage +type StreamStore interface { + Write(message model.Message) + Read() []model.Message +} + +// NewStreamStore returns an initialized StreamStore ready for reading and writing +func NewStreamStore(directory string, filenameBase string, password string) (store StreamStore) { + ss := &streamStore{storeDirectory: directory, filenameBase: filenameBase, password: password} + os.Mkdir(ss.storeDirectory, 0700) + + ss.initBuffer() + ss.initBufferFromStorage() + + return ss +} + +func (ss *streamStore) initBuffer() { + ss.messages = []model.Message{} + ss.bufferByteCount = 0 +} + +func (ss *streamStore) initBufferFromStorage() error { + filename := path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, 0)) + f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600) + if err != nil { + log.Errorf("StreamStore could not open: %v: %v", filename, err) + return err + } + defer f.Close() + + scanner := bufio.NewScanner(f) + for scanner.Scan() { + mText := scanner.Text() + m := model.Message{} + err := json.Unmarshal([]byte(mText), &m) + if err == nil { + ss.updateBuffer(m) + } + } + + return nil +} + +func (ss *streamStore) updateBuffer(m model.Message) { + ss.messages = append(ss.messages, m) + ss.bufferByteCount += (model.MessageBaseSize * 1.5) + len(m.Message) +} + +func (ss *streamStore) updateFile() error { + msgs, err := json.Marshal(ss.messages) + if err != nil { + log.Errorf("Failed to marshal group messages %v\n", err) + } + + // ENCRYPT + key, salt, _ := createKey(ss.password) + encryptedMsgs, err := encryptFileData(msgs, key) + if err != nil { + log.Errorf("Failed to encrypt messages: %v\n", err) + return err + } + encryptedMsgs = append(salt[:], encryptedMsgs...) + + ioutil.WriteFile(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, 0)), encryptedMsgs, 0700) + return nil +} + +func (ss *streamStore) rotateFileStore() { + os.Remove(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, fileStorePartitions-1))) + + for i := fileStorePartitions - 2; i >= 0; i-- { + os.Rename(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i)), path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i+1))) + } +} + +// FetchMessages returns all messages from the backing file. +func (ss *streamStore) Read() (messages []model.Message) { + ss.lock.Lock() + defer ss.lock.Unlock() + + resp := []model.Message{} + + for i := fileStorePartitions - 1; i >= 0; i-- { + filename := fmt.Sprintf("%s.%d", ss.filenameBase, i) + + bytes, err := readEncryptedFile(ss.storeDirectory, filename, ss.password) + if err != nil { + continue + } + + msgs := []model.Message{} + err = json.Unmarshal([]byte(bytes), &msgs) + if err == nil { + resp = append(resp, msgs...) + } + } + + return resp +} + +// AddMessage adds a GroupMessage to the store +func (ss *streamStore) Write(m model.Message) { + ss.lock.Lock() + defer ss.lock.Unlock() + ss.updateBuffer(m) + ss.updateFile() + + if ss.bufferByteCount > bytesPerFile { + ss.rotateFileStore() + ss.initBuffer() + } +} diff --git a/storage/stream_store_test.go b/storage/stream_store_test.go new file mode 100644 index 0000000..54aeb56 --- /dev/null +++ b/storage/stream_store_test.go @@ -0,0 +1,29 @@ +package storage + +import ( + "cwtch.im/cwtch/model" + "os" + "testing" +) + +const testingDir = "./testing" +const filenameBase = "testStream" +const password = "asdfqwer" +const line1 = "Hello from storage!" + +func TestStreamStoreWriteRead(t *testing.T) { + os.RemoveAll(testingDir) + os.Mkdir(testingDir, 0777) + ss1 := NewStreamStore(testingDir, filenameBase, password) + m := model.Message{Message: line1} + ss1.Write(m) + + ss2 := NewStreamStore(testingDir, filenameBase, password) + messages := ss2.Read() + if len(messages) != 1 { + t.Errorf("Read messages has wrong length. Expected: 1 Actual: %d\n", len(messages)) + } + if messages[0].Message != line1 { + t.Errorf("Read message has wrong content. Expected: '%v' Actual: '%v'\n", line1, messages[0].Message) + } +} diff --git a/testing/quality.sh b/testing/quality.sh index 0d2741c..c913c92 100755 --- a/testing/quality.sh +++ b/testing/quality.sh @@ -19,6 +19,6 @@ gofmt -l -s -w . echo "Checking for ineffectual assignment of errors (unchecked errors...)" ineffassign . -# misspell (https://github.com/client9/misspell) +# misspell (https://github.com/client9/misspell/cmd/misspell) echo "Checking for misspelled words..." misspell . | grep -v "vendor/" | grep -v "go.sum" | grep -v ".idea"