creating a new v1 storage system with a shared salt and only the key in memory;

also make the server/metrics test deterministic rather than sleep-based
Dan Ballard 2020-01-13 14:11:00 -08:00
parent 884e658305
commit 639ea560d5
20 changed files with 1205 additions and 450 deletions
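
For orientation: the v1 scheme (storage/v1/file_enc.go below) writes a random 128-byte salt to disk once, derives a 32-byte key from the password and that salt with PBKDF2 (4096 iterations over SHA3-512), and from then on keeps only the key in memory. A minimal standalone sketch of that derivation, using the same primitives as the new code (the password value is illustrative only):

package main

import (
    "crypto/rand"
    "fmt"
    "io"

    "golang.org/x/crypto/pbkdf2"
    "golang.org/x/crypto/sha3"
)

func main() {
    password := "example-password" // placeholder

    // Generate the salt once and persist it (v1 keeps it in the SALT file).
    var salt [128]byte
    if _, err := io.ReadFull(rand.Reader, salt[:]); err != nil {
        panic(err)
    }

    // Derive the 32-byte key; only this needs to stay in memory.
    var key [32]byte
    copy(key[:], pbkdf2.Key([]byte(password), salt[:], 4096, 32, sha3.New512))

    // Reloading later re-derives the same key from the password plus the stored salt.
    var key2 [32]byte
    copy(key2[:], pbkdf2.Key([]byte(password), salt[:], 4096, 32, sha3.New512))
    fmt.Println("keys match:", key == key2)
}

Because the salt is persisted next to the profile, the password itself never needs to be cached to reopen the store; re-deriving the key on load is enough.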

View File

@ -114,7 +114,7 @@ func (app *application) CreateTaggedPeer(name string, password string, tag strin
return
}
profileStore := storage.NewProfileWriterStore(app.eventBuses[profile.Onion], path.Join(app.directory, "profiles", profile.LocalID), password, profile)
profileStore := storage.CreateProfileWriterStore(app.eventBuses[profile.Onion], path.Join(app.directory, "profiles", profile.LocalID), password, profile)
app.storage[profile.Onion] = profileStore
pc := app.storage[profile.Onion].GetProfileCopy(true)
@ -182,8 +182,7 @@ func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProf
for _, file := range files {
eventBus := event.NewEventManager()
profileStore := storage.NewProfileWriterStore(eventBus, path.Join(ac.directory, "profiles", file.Name()), password, nil)
err = profileStore.Load()
profileStore, err := storage.LoadProfileWriterStore(eventBus, path.Join(ac.directory, "profiles", file.Name()), password)
if err != nil {
continue
}

View File

@ -16,7 +16,7 @@ type applicationClient struct {
applicationBridge
appletPeers
appBus event.Manager
appBus event.Manager
acmutex sync.Mutex
}
@ -42,9 +42,10 @@ func (ac *applicationClient) handleEvent(ev *event.Event) {
switch ev.EventType {
case event.NewPeer:
localID := ev.Data[event.Identity]
password := ev.Data[event.Password]
reload := ev.Data[event.Status] == "running"
ac.newPeer(localID, password, reload)
key := ev.Data[event.Key]
salt := ev.Data[event.Salt]
reload := ev.Data[event.Status] == event.StorageRunning
ac.newPeer(localID, key, salt, reload)
case event.DeletePeer:
onion := ev.Data[event.Identity]
ac.handleDeletedPeer(onion)
@ -59,8 +60,12 @@ func (ac *applicationClient) handleEvent(ev *event.Event) {
}
}
func (ac *applicationClient) newPeer(localID, password string, reload bool) {
profile, err := storage.ReadProfile(path.Join(ac.directory, "profiles", localID), password)
func (ac *applicationClient) newPeer(localID, key, salt string, reload bool) {
var keyBytes [32]byte
var saltBytes [128]byte
copy(keyBytes[:], key)
copy(saltBytes[:], salt)
profile, err := storage.ReadProfile(path.Join(ac.directory, "profiles", localID), keyBytes, saltBytes)
if err != nil {
log.Errorf("Could not read profile for NewPeer event: %v\n", err)
ac.appBus.Publish(event.NewEventList(event.PeerError, event.Error, fmt.Sprintf("Could not read profile for NewPeer event: %v\n", err)))

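The key and salt arrive here as ordinary Go strings on the event bus (GetNewPeerMessage packs the raw bytes with string(ps.key[:]) and string(ps.salt[:]), as shown later in this diff), so newPeer just copies them back into fixed-size arrays. A tiny sketch of that assumption, with a made-up key value:

package main

import "fmt"

func main() {
    // v1 ships the key and salt over the event bus as strings holding raw bytes.
    var key [32]byte
    for i := range key {
        key[i] = byte(i) // made-up stand-in for a real derived key
    }
    wire := string(key[:]) // what GetNewPeerMessage stores under event.Key

    // The client copies the string back into a fixed-size array, as newPeer does above.
    var keyBytes [32]byte
    copy(keyBytes[:], wire)
    fmt.Println("survives the round trip:", keyBytes == key)
}
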
View File

@ -65,7 +65,9 @@ func (as *applicationService) handleEvent(ev *event.Event) {
as.loadProfiles(password)
case event.ReloadClient:
for _, storage := range as.storage {
message := event.IPCMessage{Dest: DestApp, Message: *storage.GetNewPeerMessage()}
peerMsg := *storage.GetNewPeerMessage()
peerMsg.Data[event.Status] = event.StorageRunning
message := event.IPCMessage{Dest: DestApp, Message: peerMsg}
as.bridge.Write(&message)
}
@ -103,7 +105,7 @@ func (as *applicationService) createPeer(name, password, tag string) {
profile.SetAttribute(AttributeTag, tag)
}
profileStore := storage.NewProfileWriterStore(as.eventBuses[profile.Onion], path.Join(as.directory, "profiles", profile.LocalID), password, profile)
profileStore := storage.CreateProfileWriterStore(as.eventBuses[profile.Onion], path.Join(as.directory, "profiles", profile.LocalID), password, profile)
blockedPeers := profile.BlockedPeers()
// TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key.
@ -113,7 +115,9 @@ func (as *applicationService) createPeer(name, password, tag string) {
as.storage[profile.Onion] = profileStore
as.engines[profile.Onion] = engine
message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.LocalID, event.Password: password})}
peerMsg := *profileStore.GetNewPeerMessage()
peerMsg.Data[event.Status] = event.StorageNew
message := event.IPCMessage{Dest: DestApp, Message: peerMsg}
as.bridge.Write(&message)
}
@ -129,7 +133,10 @@ func (as *applicationService) loadProfiles(password string) {
as.storage[profile.Onion] = profileStore
as.engines[profile.Onion] = engine
as.asmutex.Unlock()
message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.LocalID, event.Password: password})}
peerMsg := *profileStore.GetNewPeerMessage()
peerMsg.Data[event.Status] = event.StorageNew
message := event.IPCMessage{Dest: DestApp, Message: peerMsg}
as.bridge.Write(&message)
count++
})

View File

@ -142,7 +142,7 @@ const (
CreatePeer = Type("CreatePeer")
// service -> client: Identity(localId), Password, [Status(new/default=blank || from reload='running')]
// app -> Identity(onion)
// app -> Key, Salt
NewPeer = Type("NewPeer")
// Identity(onion)
@ -184,6 +184,10 @@ const (
// Error: Description of the Error
// Onion: the local onion we attempt to check
NetworkStatus = Type("NetworkError")
// For debugging. Allows a test to emit a Syn and get an Ack(eventID) response once the subsystem has finished processing its queue
Syn = Type("Syn")
Ack = Type("Ack")
)
// Field defines common event attributes
@ -217,6 +221,8 @@ const (
Key = Field("Key")
Data = Field("Data")
Salt = Field("Salt")
Error = Field("Error")
Progreess = Field("Progress")
@ -230,6 +236,12 @@ const (
AppErrLoaded0 = "Loaded 0 profiles"
)
// Values to be supplied in event.NewPeer for Status
const (
StorageRunning = "running"
StorageNew = "new"
)
// Defining Protocol Contexts
const (
ContextAck = "im.cwtch.acknowledgement"

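Putting the new fields and status values together, a NewPeer event now carries the raw key and salt instead of the password, plus StorageNew or StorageRunning. A hedged sketch of how such an event could be built with this package's helpers (newPeerMessage and the example localID are hypothetical; the real call sites are GetNewPeerMessage in storage/v1 and the app service handlers in this commit):

package main

import (
    "fmt"

    "cwtch.im/cwtch/event"
)

// newPeerMessage mirrors GetNewPeerMessage from this commit: key and salt ride
// in the event as raw-byte strings, and Status distinguishes new vs reloaded peers.
func newPeerMessage(localID string, key [32]byte, salt [128]byte, reloaded bool) event.Event {
    msg := event.NewEventList(event.NewPeer,
        event.Identity, localID,
        event.Key, string(key[:]),
        event.Salt, string(salt[:]),
    )
    if reloaded {
        msg.Data[event.Status] = event.StorageRunning
    } else {
        msg.Data[event.Status] = event.StorageNew
    }
    return msg
}

func main() {
    var key [32]byte
    var salt [128]byte
    msg := newPeerMessage("example-local-id", key, salt, true) // placeholder localID
    fmt.Println(msg.EventType, msg.Data[event.Status])
}
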
View File

@ -9,13 +9,20 @@ func TestCounter(t *testing.T) {
starttime := time.Now()
c := NewCounter()
for i := 0; i < 100; i++ {
max := 100
done := make(chan bool, max)
// slightly stress test the atomic nature of the metric by flooding it with goroutines calling Add()
for i := 0; i < max; i++ {
go func() {
c.Add(1)
done <- true
}()
}
time.Sleep(2 * time.Second)
for i := 0; i < max; i++ {
<- done
}
val := c.Count()
if val != 100 {

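The rewritten test above waits on a buffered done channel instead of sleeping for two seconds, so it finishes as soon as all 100 goroutines have called Add. For comparison only (not what this commit uses), the same determinism can be had with sync.WaitGroup; a self-contained sketch with an atomic int standing in for the metrics Counter:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    const max = 100
    var counter int64 // stands in for the metrics Counter
    var wg sync.WaitGroup

    for i := 0; i < max; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            atomic.AddInt64(&counter, 1) // analogous to c.Add(1)
        }()
    }
    wg.Wait() // returns once every goroutine has finished; no sleep required

    fmt.Println("count:", atomic.LoadInt64(&counter))
}
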
View File

@ -3,30 +3,20 @@ package storage
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"encoding/json"
"cwtch.im/cwtch/storage/v0"
"cwtch.im/cwtch/storage/v1"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"os"
"time"
"io/ioutil"
"path"
"strconv"
)
const groupIDLen = 32
const peerIDLen = 56
const profileFilename = "profile"
type profileStore struct {
fs FileStore
streamStores map[string]StreamStore // map [groupId|onion] StreamStore
directory string
password string
profile *model.Profile
eventManager event.Manager
queue event.Queue
writer bool
}
const versionFile = "VERSION"
const currentVersion = 1
// ProfileStore is an interface to managing the storage of Cwtch Profiles
type ProfileStore interface {
Load() error
Shutdown()
Delete()
GetProfileCopy(timeline bool) *model.Profile
@ -34,52 +24,25 @@ type ProfileStore interface {
GetStatusMessages() []*event.Event
}
// NewProfileWriterStore returns a profile store backed by a filestore listening for events and saving them
// CreateProfileWriterStore creates a profile store backed by a filestore listening for events and saving them
// directory should be $appDir/profiles/$rand
func NewProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) ProfileStore {
os.Mkdir(directory, 0700)
ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true}
//ps.queue = event.NewQueue(100)
ps.queue = event.NewQueue()
if profile != nil {
ps.save()
}
go ps.eventHandler()
ps.eventManager.Subscribe(event.BlockPeer, ps.queue)
ps.eventManager.Subscribe(event.UnblockPeer, ps.queue)
ps.eventManager.Subscribe(event.PeerCreated, ps.queue)
ps.eventManager.Subscribe(event.GroupCreated, ps.queue)
ps.eventManager.Subscribe(event.SetProfileName, ps.queue)
ps.eventManager.Subscribe(event.SetAttribute, ps.queue)
ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue)
ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue)
ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue)
ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue)
ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue)
ps.eventManager.Subscribe(event.PeerStateChange, ps.queue)
ps.eventManager.Subscribe(event.ServerStateChange, ps.queue)
ps.eventManager.Subscribe(event.DeleteContact, ps.queue)
ps.eventManager.Subscribe(event.DeleteGroup, ps.queue)
ps.eventManager.Subscribe(event.ChangePassword, ps.queue)
return ps
func CreateProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) ProfileStore {
return v1.CreateProfileWriterStore(eventManager, directory, password, profile)
}
// ReadProfile reads a profile from storage and returns the profile
// LoadProfileWriterStore loads a profile store from filestore listening for events and saving them
// directory should be $appDir/profiles/$rand
func ReadProfile(directory, password string) (*model.Profile, error) {
os.Mkdir(directory, 0700)
ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: nil, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true}
func LoadProfileWriterStore(eventManager event.Manager, directory, password string) (ProfileStore, error) {
versionCheckUpgrade(directory, password)
err := ps.Load()
if err != nil {
return nil, err
}
return v1.LoadProfileWriterStore(eventManager, directory, password)
}
profile := ps.GetProfileCopy(true)
return profile, nil
// ReadProfile reads a profile from storage and returns the profile
// Should only be called for cache refresh of the profile after a ProfileWriterStore has opened
// (and upgraded) the store, and thus supplied the key/salt
func ReadProfile(directory string, key [32]byte, salt [128]byte) (*model.Profile, error) {
return v1.ReadProfile(directory, key, salt)
}
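
The storage package is now just this thin facade over the versioned stores: CreateProfileWriterStore for brand-new profiles, LoadProfileWriterStore for existing ones (which runs versionCheckUpgrade first), and ReadProfile for key/salt-based rereads. A hedged usage sketch; the directory, password, and profile name are placeholders:

package main

import (
    "log"
    "path"

    "cwtch.im/cwtch/event"
    "cwtch.im/cwtch/storage"
)

func main() {
    eventBus := event.NewEventManager()
    dir := path.Join("./example-app-dir", "profiles", "example-local-id") // placeholder path
    password := "example-password"                                        // placeholder

    // Brand-new profile: CreateProfileWriterStore writes the v1 VERSION and SALT files and saves the profile.
    profile := storage.NewProfile("Alice")
    created := storage.CreateProfileWriterStore(eventBus, dir, password, profile)
    created.Shutdown()

    // Existing profile: LoadProfileWriterStore upgrades a v0 store in place if needed, then opens it as v1.
    loaded, err := storage.LoadProfileWriterStore(eventBus, dir, password)
    if err != nil {
        log.Fatalf("could not load profile store: %v", err)
    }
    log.Printf("loaded profile %v", loaded.GetProfileCopy(false).Name)
}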
// NewProfile creates a new profile for use in the profile store.
@ -88,254 +51,43 @@ func NewProfile(name string) *model.Profile {
return profile
}
// GetNewPeerMessage is for AppService to call on Reload events, to reseed the AppClient with the loaded peers
func (ps *profileStore) GetNewPeerMessage() *event.Event {
message := event.NewEventList(event.NewPeer, event.Identity, ps.profile.LocalID, event.Password, ps.password, event.Status, "running")
return &message
// ********* Versioning and upgrade **********
func detectVersion(directory string) int {
vnumberStr, err := ioutil.ReadFile(path.Join(directory, versionFile))
if err != nil {
return 0
}
vnumber, err := strconv.Atoi(string(vnumberStr))
if err != nil {
log.Errorf("Could not parse VERSION file contents: '%v' - %v\n", vnumber, err)
return -1
}
return vnumber
}
func (ps *profileStore) GetStatusMessages() []*event.Event {
messages := []*event.Event{}
for _, contact := range ps.profile.Contacts {
message := event.NewEvent(event.PeerStateChange, map[event.Field]string{
event.RemotePeer: string(contact.Onion),
event.ConnectionState: contact.State,
})
messages = append(messages, &message)
}
doneServers := make(map[string]bool)
for _, group := range ps.profile.Groups {
if _, exists := doneServers[group.GroupServer]; !exists {
message := event.NewEvent(event.ServerStateChange, map[event.Field]string{
event.GroupServer: string(group.GroupServer),
event.ConnectionState: group.State,
})
messages = append(messages, &message)
doneServers[group.GroupServer] = true
}
}
return messages
}
func (ps *profileStore) ChangePassword(oldpass, newpass, eventID string) {
if oldpass != ps.password {
ps.eventManager.Publish(event.NewEventList(event.ChangePasswordError, event.Error, "Supplied current password does not match", event.EventID, eventID))
return
}
newStreamStores := map[string]StreamStore{}
idToNewLocalID := map[string]string{}
// Generate all new StreamStores with the new password and write all the old StreamStore data into these ones
for ssid, ss := range ps.streamStores {
// New ss with new pass and new localID
newlocalID := model.GenerateRandomID()
idToNewLocalID[ssid] = newlocalID
newSS := NewStreamStore(ps.directory, newlocalID, newpass)
newStreamStores[ssid] = newSS
// write whole store
messages := ss.Read()
newSS.WriteN(messages)
}
// Switch over
oldStreamStores := ps.streamStores
ps.streamStores = newStreamStores
for ssid, newLocalID := range idToNewLocalID {
if len(ssid) == groupIDLen {
ps.profile.Groups[ssid].LocalID = newLocalID
} else {
ps.profile.Contacts[ssid].LocalID = newLocalID
}
}
ps.password = newpass
ps.fs.ChangePassword(newpass)
ps.save()
// Clean up
for _, oldss := range oldStreamStores {
oldss.Delete()
}
ps.eventManager.Publish(event.NewEventList(event.ChangePasswordSuccess, event.EventID, eventID))
return
}
func (ps *profileStore) save() error {
if ps.writer {
bytes, _ := json.Marshal(ps.profile)
return ps.fs.Write(bytes)
}
return nil
}
// Read instantiates a cwtchPeer from the file store
func (ps *profileStore) Load() error {
decrypted, err := ps.fs.Read()
func upgradeV0ToV1(directory, password string) error {
log.Debugln("Attempting storage v0 to v1: Reading v0 profile...")
profile, err := v0.ReadProfile(directory, password)
if err != nil {
return err
}
cp := new(model.Profile)
err = json.Unmarshal(decrypted, &cp)
if err == nil {
ps.profile = cp
for gid, group := range cp.Groups {
ss := NewStreamStore(ps.directory, group.LocalID, ps.password)
log.Debugln("Attempting storage v0 to v1: Writing v1 profile...")
return v1.UpgradeV0Profile(profile, directory, password)
}
cp.Groups[gid].Timeline.SetMessages(ss.Read())
ps.streamStores[group.GroupID] = ss
}
func versionCheckUpgrade(directory, password string) {
version := detectVersion(directory)
log.Infof("versionCheck: %v\n", version)
if version == -1 {
return
}
return err
}
func (ps *profileStore) GetProfileCopy(timeline bool) *model.Profile {
return ps.profile.GetCopy(timeline)
}
func (ps *profileStore) eventHandler() {
log.Debugln("eventHandler()!")
for {
ev := ps.queue.Next()
log.Debugf("eventHandler event %v %v\n", ev.EventType, ev.EventID)
switch ev.EventType {
case event.BlockPeer:
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
if exists {
contact.Blocked = true
ps.save()
}
case event.UnblockPeer:
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
if exists {
contact.Blocked = false
ps.save()
}
case event.PeerCreated:
var pp *model.PublicProfile
json.Unmarshal([]byte(ev.Data[event.Data]), &pp)
ps.profile.AddContact(ev.Data[event.RemotePeer], pp)
// TODO: configure - allow peers to be configured to turn on limited storage
/*ss := NewStreamStore(ps.directory, pp.LocalID, ps.password)
pp.Timeline.SetMessages(ss.Read())
ps.streamStores[pp.Onion] = ss
ps.save()*/
case event.GroupCreated:
var group *model.Group
json.Unmarshal([]byte(ev.Data[event.Data]), &group)
ps.profile.AddGroup(group)
ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password)
ps.save()
case event.SetProfileName:
ps.profile.Name = ev.Data[event.ProfileName]
ps.profile.SetAttribute("name", ev.Data[event.ProfileName])
ps.save()
case event.SetAttribute:
ps.profile.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
ps.save()
case event.SetPeerAttribute:
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
if exists {
contact.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
ps.save()
} else {
log.Errorf("error setting attribute on peer %v peer does not exist", ev)
}
case event.SetGroupAttribute:
group := ps.profile.GetGroup(ev.Data[event.GroupID])
if group != nil {
group.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
ps.save()
} else {
log.Errorf("error setting attribute on group %v group does not exist", ev)
}
case event.AcceptGroupInvite:
err := ps.profile.AcceptInvite(ev.Data[event.GroupID])
if err == nil {
ps.save()
} else {
log.Errorf("error accepting group invite")
}
case event.NewGroupInvite:
gid, err := ps.profile.ProcessInvite(ev.Data[event.GroupInvite], ev.Data[event.RemotePeer])
log.Errorf("gid: %v err:%v\n", gid, err)
if err == nil {
ps.save()
group := ps.profile.Groups[gid]
ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password)
} else {
log.Errorf("error storing new group invite: %v (%v)", err, ev)
}
case event.NewMessageFromGroup:
groupid := ev.Data[event.GroupID]
received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent])
message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: []byte(ev.Data[event.Signature]), PreviousMessageSig: []byte(ev.Data[event.PreviousSignature])}
ss, exists := ps.streamStores[groupid]
if exists {
ss.Write(message)
} else {
log.Errorf("error storing new group message: %v stream store does not exist", ev)
}
case event.PeerStateChange:
if _, exists := ps.profile.Contacts[ev.Data[event.RemotePeer]]; exists {
ps.profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState]
}
case event.ServerStateChange:
for _, group := range ps.profile.Groups {
if group.GroupServer == ev.Data[event.GroupServer] {
group.State = ev.Data[event.ConnectionState]
}
}
case event.DeleteContact:
onion := ev.Data[event.RemotePeer]
ps.profile.DeleteContact(onion)
ps.save()
case event.DeleteGroup:
groupID := ev.Data[event.GroupID]
ps.profile.DeleteGroup(groupID)
ps.save()
ss, exists := ps.streamStores[groupID]
if exists {
ss.Delete()
delete(ps.streamStores, groupID)
}
case event.ChangePassword:
oldpass := ev.Data[event.Password]
newpass := ev.Data[event.NewPassword]
ps.ChangePassword(oldpass, newpass, ev.EventID)
default:
if version == 0 {
err := upgradeV0ToV1(directory, password)
if err != nil {
return
}
}
}
func (ps *profileStore) Shutdown() {
if ps.queue != nil {
ps.queue.Shutdown()
}
}
func (ps *profileStore) Delete() {
log.Debugf("Delete ProfileStore for %v\n", ps.profile.Onion)
for _, ss := range ps.streamStores {
ss.Delete()
}
ps.fs.Delete()
err := os.RemoveAll(ps.directory)
if err != nil {
log.Errorf("ProfileStore Delete error on RemoveAll on %v was %v\n", ps.directory, err)
//version = 1
}
}
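
Because this hunk interleaves the removed v0 logic with the new version plumbing, the upgrade decision is easier to see in isolation: a store with no VERSION file is treated as v0 and rewritten via v0.ReadProfile plus v1.UpgradeV0Profile, a VERSION of 1 is current, and anything unparseable is left alone. A compact sketch of that on-disk convention (the directory path is hypothetical):

package main

import (
    "fmt"
    "io/ioutil"
    "path"
    "strconv"
)

// detect mirrors detectVersion from this commit: no VERSION file means v0,
// an unreadable number means "don't touch it" (-1), otherwise the stored version.
func detect(directory string) int {
    raw, err := ioutil.ReadFile(path.Join(directory, "VERSION"))
    if err != nil {
        return 0
    }
    v, err := strconv.Atoi(string(raw))
    if err != nil {
        return -1
    }
    return v
}

func main() {
    dir := "./example-profile-dir" // placeholder
    switch detect(dir) {
    case 0:
        fmt.Println("v0 store: read with the password, rewrite as v1 (key + SALT file)")
    case 1:
        fmt.Println("v1 store: derive the key from the password and the SALT file")
    default:
        fmt.Println("unknown version: leave the store alone")
    }
}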

View File

@ -4,83 +4,35 @@ package storage
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/storage/v0"
"fmt"
"log"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"os"
"testing"
"time"
)
const testingDir = "./testing"
const filenameBase = "testStream"
const password = "asdfqwer"
const line1 = "Hello from storage!"
const testProfileName = "Alice"
const testKey = "key"
const testVal = "value"
const testInitialMessage = "howdy"
const testMessage = "Hello from storage"
func TestProfileStoreWriteRead(t *testing.T) {
log.Println("profile store test!")
os.RemoveAll(testingDir)
eventBus := event.NewEventManager()
profile := NewProfile(testProfileName)
ps1 := NewProfileWriterStore(eventBus, testingDir, password, profile)
eventBus.Publish(event.NewEvent(event.SetAttribute, map[event.Field]string{event.Key: testKey, event.Data: testVal}))
time.Sleep(1 * time.Second)
groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
if err != nil {
t.Errorf("Creating group: %v\n", err)
}
if err != nil {
t.Errorf("Creating group invite: %v\n", err)
}
eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)}))
time.Sleep(1 * time.Second)
eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{
event.GroupID: groupid,
event.TimestampSent: time.Now().Format(time.RFC3339Nano),
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
event.RemotePeer: ps1.GetProfileCopy(true).Onion,
event.Data: testMessage,
}))
time.Sleep(1 * time.Second)
ps1.Shutdown()
ps2 := NewProfileWriterStore(eventBus, testingDir, password, nil)
err = ps2.Load()
if err != nil {
t.Errorf("Error creating profileStore: %v\n", err)
}
profile = ps2.GetProfileCopy(true)
if profile.Name != testProfileName {
t.Errorf("Profile name from loaded profile incorrect. Expected: '%v' Actual: '%v'\n", testProfileName, profile.Name)
}
v, _ := profile.GetAttribute(testKey)
if v != testVal {
t.Errorf("Profile attribute '%v' incorrect. Expected: '%v' Actual: '%v'\n", testKey, testVal, v)
}
group2 := ps2.GetProfileCopy(true).Groups[groupid]
if group2 == nil {
t.Errorf("Group not loaded\n")
}
}
func TestProfileStoreChangePassword(t *testing.T) {
func TestProfileStoreUpgradeV0toV1(t *testing.T) {
log.SetLevel(log.LevelDebug)
os.RemoveAll(testingDir)
eventBus := event.NewEventManager()
queue := event.NewQueue()
eventBus.Subscribe(event.ChangePasswordSuccess, queue)
fmt.Println("Creating and initializing v0 profile and store...")
profile := NewProfile(testProfileName)
ps1 := NewProfileWriterStore(eventBus, testingDir, password, profile)
ps1 := v0.NewProfileWriterStore(eventBus, testingDir, password, profile)
groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
if err != nil {
@ -90,50 +42,19 @@ func TestProfileStoreChangePassword(t *testing.T) {
t.Errorf("Creating group invite: %v\n", err)
}
eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)}))
time.Sleep(1 * time.Second)
ps1.AddGroup(invite, profile.Onion)
fmt.Println("Sending 200 messages...")
for i := 0; i < 200; i++ {
eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{
event.GroupID: groupid,
event.TimestampSent: time.Now().Format(time.RFC3339Nano),
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
event.RemotePeer: ps1.GetProfileCopy(true).Onion,
event.Data: testMessage,
}))
ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), profile.Onion, testMessage)
}
newPass := "qwerty123"
fmt.Println("Sending Change Passwords event...")
eventBus.Publish(event.NewEventList(event.ChangePassword, event.Password, password, event.NewPassword, newPass))
ev := queue.Next()
if ev.EventType != event.ChangePasswordSuccess {
t.Errorf("Unexpected event response detected %v\n", ev.EventType)
return
}
fmt.Println("Sending 10 more messages...")
for i := 0; i < 10; i++ {
eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{
event.GroupID: groupid,
event.TimestampSent: time.Now().Format(time.RFC3339Nano),
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
event.RemotePeer: ps1.GetProfileCopy(true).Onion,
event.Data: testMessage,
}))
}
time.Sleep(3 * time.Second)
fmt.Println("Shutdown profile store...")
fmt.Println("Shutdown v0 profile store...")
ps1.Shutdown()
fmt.Println("New Profile store...")
ps2 := NewProfileWriterStore(eventBus, testingDir, newPass, nil)
err = ps2.Load()
fmt.Println("New v1 Profile store...")
ps2, err := LoadProfileWriterStore(eventBus, testingDir, password)
if err != nil {
t.Errorf("Error creating new profileStore with new password: %v\n", err)
return
@ -146,7 +67,7 @@ func TestProfileStoreChangePassword(t *testing.T) {
return
}
if len(profile2.Groups[groupid].Timeline.Messages) != 210 {
t.Errorf("Failed to load group's 210 messages, instead got %v\n", len(profile2.Groups[groupid].Timeline.Messages))
if len(profile2.Groups[groupid].Timeline.Messages) != 200 {
t.Errorf("Failed to load group's 200 messages, instead got %v\n", len(profile2.Groups[groupid].Timeline.Messages))
}
}

View File

@ -1,4 +1,4 @@
package storage
package v0
import (
"crypto/rand"

View File

@ -1,9 +1,7 @@
package storage
package v0
import (
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"io/ioutil"
"os"
"path"
)
@ -16,10 +14,8 @@ type fileStore struct {
// FileStore is a primitive around storing encrypted files
type FileStore interface {
Write([]byte) error
Read() ([]byte, error)
Delete()
ChangePassword(newpass string)
Write(data []byte) error
}
// NewFileStore instantiates a fileStore given a filename and a password
@ -31,6 +27,10 @@ func NewFileStore(directory string, filename string, password string) FileStore
return filestore
}
func (fps *fileStore) Read() ([]byte, error) {
return readEncryptedFile(fps.directory, fps.filename, fps.password)
}
// write serializes a cwtchPeer to a file
func (fps *fileStore) Write(data []byte) error {
key, salt, _ := createKey(fps.password)
@ -44,18 +44,3 @@ func (fps *fileStore) Write(data []byte) error {
err = ioutil.WriteFile(path.Join(fps.directory, fps.filename), encryptedbytes, 0600)
return err
}
func (fps *fileStore) Read() ([]byte, error) {
return readEncryptedFile(fps.directory, fps.filename, fps.password)
}
func (fps *fileStore) Delete() {
err := os.Remove(path.Join(fps.directory, fps.filename))
if err != nil {
log.Errorf("Deleting file %v\n", err)
}
}
func (fps *fileStore) ChangePassword(newpass string) {
fps.password = newpass
}

storage/v0/profile_store.go (new file, 120 lines)
View File

@ -0,0 +1,120 @@
package v0
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"encoding/json"
"fmt"
"os"
"time"
)
const groupIDLen = 32
const peerIDLen = 56
const profileFilename = "profile"
// ProfileStoreV0 is a legacy profile store used now for upgrading legacy profile stores to newer versions
type ProfileStoreV0 struct {
fs FileStore
streamStores map[string]StreamStore // map [groupId|onion] StreamStore
directory string
password string
profile *model.Profile
}
// NewProfileWriterStore returns a profile store backed by a filestore listening for events and saving them
// directory should be $appDir/profiles/$rand
func NewProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) *ProfileStoreV0 {
os.Mkdir(directory, 0700)
ps := &ProfileStoreV0{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, streamStores: map[string]StreamStore{}}
if profile != nil {
ps.save()
}
return ps
}
// ReadProfile reads a profile from storage and returns the profile
// directory should be $appDir/profiles/$rand
func ReadProfile(directory, password string) (*model.Profile, error) {
os.Mkdir(directory, 0700)
ps := &ProfileStoreV0{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: nil, streamStores: map[string]StreamStore{}}
err := ps.Load()
if err != nil {
return nil, err
}
profile := ps.getProfileCopy(true)
return profile, nil
}
/********************************************************************************************/
// AddGroup For testing, adds a group to the profile (and starts a stream store)
func (ps *ProfileStoreV0) AddGroup(invite []byte, peer string) {
gid, err := ps.profile.ProcessInvite(string(invite), peer)
if err == nil {
ps.save()
group := ps.profile.Groups[gid]
ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password)
}
}
// AddGroupMessage for testing, adds a group message
func (ps *ProfileStoreV0) AddGroupMessage(groupid string, timeSent, timeRecvied string, remotePeer, data string) {
received, _ := time.Parse(time.RFC3339Nano, timeRecvied)
sent, _ := time.Parse(time.RFC3339Nano, timeSent)
message := model.Message{Received: received, Timestamp: sent, Message: data, PeerID: remotePeer, Signature: []byte("signature"), PreviousMessageSig: []byte("PreviousSignature")}
ss, exists := ps.streamStores[groupid]
if exists {
ss.Write(message)
} else {
fmt.Println("ERROR")
}
}
// GetNewPeerMessage is for AppService to call on Reload events, to reseed the AppClient with the loaded peers
func (ps *ProfileStoreV0) GetNewPeerMessage() *event.Event {
message := event.NewEventList(event.NewPeer, event.Identity, ps.profile.LocalID, event.Password, ps.password, event.Status, "running")
return &message
}
// Load instantiates a cwtchPeer from the file store
func (ps *ProfileStoreV0) Load() error {
decrypted, err := ps.fs.Read()
if err != nil {
return err
}
cp := new(model.Profile)
err = json.Unmarshal(decrypted, &cp)
if err == nil {
ps.profile = cp
for gid, group := range cp.Groups {
ss := NewStreamStore(ps.directory, group.LocalID, ps.password)
cp.Groups[gid].Timeline.SetMessages(ss.Read())
ps.streamStores[group.GroupID] = ss
}
}
return err
}
func (ps *ProfileStoreV0) getProfileCopy(timeline bool) *model.Profile {
return ps.profile.GetCopy(timeline)
}
// Shutdown saves the storage system
func (ps *ProfileStoreV0) Shutdown() {
ps.save()
}
/************* Writing *************/
func (ps *ProfileStoreV0) save() error {
bytes, _ := json.Marshal(ps.profile)
return ps.fs.Write(bytes)
}

View File

@ -0,0 +1,70 @@
// Known race issue with event bus channel closure
package v0
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"log"
"os"
"testing"
"time"
)
const testProfileName = "Alice"
const testKey = "key"
const testVal = "value"
const testInitialMessage = "howdy"
const testMessage = "Hello from storage"
// NewProfile creates a new profile for use in the profile store.
func NewProfile(name string) *model.Profile {
profile := model.GenerateNewProfile(name)
return profile
}
func TestProfileStoreWriteRead(t *testing.T) {
log.Println("profile store test!")
os.RemoveAll(testingDir)
eventBus := event.NewEventManager()
profile := NewProfile(testProfileName)
ps1 := NewProfileWriterStore(eventBus, testingDir, password, profile)
profile.SetAttribute(testKey, testVal)
groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
if err != nil {
t.Errorf("Creating group: %v\n", err)
}
if err != nil {
t.Errorf("Creating group invite: %v\n", err)
}
ps1.AddGroup(invite, profile.Onion)
ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), ps1.getProfileCopy(true).Onion, testMessage)
ps1.Shutdown()
ps2 := NewProfileWriterStore(eventBus, testingDir, password, nil)
err = ps2.Load()
if err != nil {
t.Errorf("Error creating ProfileStoreV0: %v\n", err)
}
profile = ps2.getProfileCopy(true)
if profile.Name != testProfileName {
t.Errorf("Profile name from loaded profile incorrect. Expected: '%v' Actual: '%v'\n", testProfileName, profile.Name)
}
v, _ := profile.GetAttribute(testKey)
if v != testVal {
t.Errorf("Profile attribute '%v' incorrect. Expected: '%v' Actual: '%v'\n", testKey, testVal, v)
}
group2 := ps2.getProfileCopy(true).Groups[groupid]
if group2 == nil {
t.Errorf("Group not loaded\n")
}
}

storage/v0/stream_store.go (new file, 145 lines)
View File

@ -0,0 +1,145 @@
package v0
import (
"cwtch.im/cwtch/model"
"encoding/json"
"fmt"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"io/ioutil"
"os"
"path"
"sync"
)
const (
fileStorePartitions = 16
bytesPerFile = 15 * 1024
)
// streamStore is a file-backed implementation of StreamStore using an in memory buffer of ~16KB and a rotating set of files
type streamStore struct {
password string
storeDirectory string
filenameBase string
lock sync.Mutex
// Buffer is used just for current file to write to
messages []model.Message
bufferByteCount int
}
// StreamStore provides a stream like interface to encrypted storage
type StreamStore interface {
Read() []model.Message
Write(m model.Message)
}
// NewStreamStore returns an initialized StreamStore ready for reading and writing
func NewStreamStore(directory string, filenameBase string, password string) (store StreamStore) {
ss := &streamStore{storeDirectory: directory, filenameBase: filenameBase, password: password}
os.Mkdir(ss.storeDirectory, 0700)
ss.initBuffer()
return ss
}
// Read returns all messages from the backing file (not the buffer, which is just for writing to the current file)
func (ss *streamStore) Read() (messages []model.Message) {
ss.lock.Lock()
defer ss.lock.Unlock()
resp := []model.Message{}
for i := fileStorePartitions - 1; i >= 0; i-- {
filename := fmt.Sprintf("%s.%d", ss.filenameBase, i)
bytes, err := readEncryptedFile(ss.storeDirectory, filename, ss.password)
if err != nil {
continue
}
msgs := []model.Message{}
json.Unmarshal([]byte(bytes), &msgs)
resp = append(resp, msgs...)
}
// 2019.10.10 "Acknowledged" & "ReceivedByServer" are added to the struct, populate it as true for old ones without
for i := 0; i < len(resp) && (resp[i].Acknowledged == false && resp[i].ReceivedByServer == false); i++ {
resp[i].Acknowledged = true
resp[i].ReceivedByServer = true
}
return resp
}
// ****** Writing *******/
func (ss *streamStore) WriteN(messages []model.Message) {
ss.lock.Lock()
defer ss.lock.Unlock()
for _, m := range messages {
ss.updateBuffer(m)
if ss.bufferByteCount > bytesPerFile {
ss.updateFile()
log.Debugf("rotating log file")
ss.rotateFileStore()
ss.initBuffer()
}
}
}
// Write adds a GroupMessage to the store
func (ss *streamStore) Write(m model.Message) {
ss.lock.Lock()
defer ss.lock.Unlock()
ss.updateBuffer(m)
ss.updateFile()
if ss.bufferByteCount > bytesPerFile {
log.Debugf("rotating log file")
ss.rotateFileStore()
ss.initBuffer()
}
}
func (ss *streamStore) initBuffer() {
ss.messages = []model.Message{}
ss.bufferByteCount = 0
}
func (ss *streamStore) updateBuffer(m model.Message) {
ss.messages = append(ss.messages, m)
ss.bufferByteCount += (model.MessageBaseSize * 1.5) + len(m.Message)
}
func (ss *streamStore) updateFile() error {
msgs, err := json.Marshal(ss.messages)
if err != nil {
log.Errorf("Failed to marshal group messages %v\n", err)
}
// ENCRYPT
key, salt, _ := createKey(ss.password)
encryptedMsgs, err := encryptFileData(msgs, key)
if err != nil {
log.Errorf("Failed to encrypt messages: %v\n", err)
return err
}
encryptedMsgs = append(salt[:], encryptedMsgs...)
ioutil.WriteFile(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, 0)), encryptedMsgs, 0700)
return nil
}
func (ss *streamStore) rotateFileStore() {
os.Remove(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, fileStorePartitions-1)))
for i := fileStorePartitions - 2; i >= 0; i-- {
os.Rename(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i)), path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i+1)))
}
}

View File

@ -1,4 +1,4 @@
package storage
package v0
import (
"cwtch.im/cwtch/model"

storage/v1/file_enc.go (new file, 72 lines)
View File

@ -0,0 +1,72 @@
package v1
import (
"crypto/rand"
"errors"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"golang.org/x/crypto/nacl/secretbox"
"golang.org/x/crypto/pbkdf2"
"golang.org/x/crypto/sha3"
"io"
"io/ioutil"
"path"
)
// createKeySalt derives a key from a password: returns key, salt, err
func createKeySalt(password string) ([32]byte, [128]byte, error) {
var salt [128]byte
if _, err := io.ReadFull(rand.Reader, salt[:]); err != nil {
log.Errorf("Cannot read from random: %v\n", err)
return [32]byte{}, salt, err
}
dk := pbkdf2.Key([]byte(password), salt[:], 4096, 32, sha3.New512)
var dkr [32]byte
copy(dkr[:], dk)
return dkr, salt, nil
}
func createKey(password string, salt []byte) [32]byte {
dk := pbkdf2.Key([]byte(password), salt, 4096, 32, sha3.New512)
var dkr [32]byte
copy(dkr[:], dk)
return dkr
}
//encryptFileData encrypts the given data with the specified key.
func encryptFileData(data []byte, key [32]byte) ([]byte, error) {
var nonce [24]byte
if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil {
log.Errorf("Cannot read from random: %v\n", err)
return nil, err
}
encrypted := secretbox.Seal(nonce[:], data, &nonce, &key)
return encrypted, nil
}
//decryptFile decrypts the passed ciphertext with the specified key.
func decryptFile(ciphertext []byte, key [32]byte) ([]byte, error) {
var decryptNonce [24]byte
copy(decryptNonce[:], ciphertext[:24])
decrypted, ok := secretbox.Open(nil, ciphertext[24:], &decryptNonce, &key)
if ok {
return decrypted, nil
}
return nil, errors.New("Failed to decrypt")
}
// readEncryptedFile reads and decrypts the named file from the file store
func readEncryptedFile(directory, filename string, key [32]byte) ([]byte, error) {
encryptedbytes, err := ioutil.ReadFile(path.Join(directory, filename))
if err == nil {
data, err := decryptFile(encryptedbytes, key)
if err == nil {
return data, nil
}
return nil, err
}
return nil, err
}
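
These helpers pair up: encryptFileData prepends the 24-byte random nonce that secretbox.Seal needs, decryptFile peels it back off, and createKey re-derives the same key from the password plus the persisted salt. A round-trip sketch built only from the functions above (roundTripSketch is a hypothetical helper that would sit alongside them in this package):

package v1

import "fmt"

// roundTripSketch is a hypothetical helper showing how the functions above fit together.
func roundTripSketch(password string, plaintext []byte) error {
    key, salt, err := createKeySalt(password)
    if err != nil {
        return err
    }
    ciphertext, err := encryptFileData(plaintext, key)
    if err != nil {
        return err
    }
    // Anyone holding the password and the persisted salt can re-derive the same key.
    recovered, err := decryptFile(ciphertext, createKey(password, salt[:]))
    if err != nil {
        return err
    }
    fmt.Printf("recovered %d bytes, intact: %v\n", len(recovered), string(recovered) == string(plaintext))
    return nil
}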

storage/v1/file_store.go (new file, 58 lines)
View File

@ -0,0 +1,58 @@
package v1
import (
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"io/ioutil"
"os"
"path"
)
// fileStore stores a cwtchPeer in an encrypted file
type fileStore struct {
directory string
filename string
key [32]byte
}
// FileStore is a primitive around storing encrypted files
type FileStore interface {
Write([]byte) error
Read() ([]byte, error)
Delete()
ChangeKey(newkey [32]byte)
}
// NewFileStore instantiates a fileStore given a filename and a password
func NewFileStore(directory string, filename string, key [32]byte) FileStore {
filestore := new(fileStore)
filestore.key = key
filestore.filename = filename
filestore.directory = directory
return filestore
}
// write serializes a cwtchPeer to a file
func (fps *fileStore) Write(data []byte) error {
encryptedbytes, err := encryptFileData(data, fps.key)
if err != nil {
return err
}
err = ioutil.WriteFile(path.Join(fps.directory, fps.filename), encryptedbytes, 0600)
return err
}
func (fps *fileStore) Read() ([]byte, error) {
return readEncryptedFile(fps.directory, fps.filename, fps.key)
}
func (fps *fileStore) Delete() {
err := os.Remove(path.Join(fps.directory, fps.filename))
if err != nil {
log.Errorf("Deleting file %v\n", err)
}
}
func (fps *fileStore) ChangeKey(newkey [32]byte) {
fps.key = newkey
}

storage/v1/profile_store.go (new file, 406 lines)
View File

@ -0,0 +1,406 @@
package v1
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"encoding/json"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"io/ioutil"
"os"
"path"
"time"
)
const groupIDLen = 32
const peerIDLen = 56
const profileFilename = "profile"
const version = "1"
const versionFile = "VERSION"
const saltFile = "SALT"
//ProfileStoreV1 storage for profiles and message streams that uses in memory key and fs stored salt instead of in memory password
type ProfileStoreV1 struct {
fs FileStore
streamStores map[string]StreamStore // map [groupId|onion] StreamStore
directory string
profile *model.Profile
key [32]byte
salt [128]byte
eventManager event.Manager
queue event.Queue
writer bool
}
func initV1Directory(directory, password string) ([32]byte, [128]byte, error) {
key, salt, err := createKeySalt(password)
if err != nil {
log.Errorf("Could not create key for profile store from password: %v\n", err)
return [32]byte{}, [128]byte{}, err
}
ioutil.WriteFile(path.Join(directory, versionFile), []byte(version), 0600)
ioutil.WriteFile(path.Join(directory, saltFile), salt[:], 0600)
return key, salt, nil
}
// CreateProfileWriterStore creates a profile store backed by a filestore listening for events and saving them
// directory should be $appDir/profiles/$rand
func CreateProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) *ProfileStoreV1 {
os.Mkdir(directory, 0700)
key, salt, err := initV1Directory(directory, password)
if err != nil {
return nil
}
ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true}
ps.save()
ps.initProfileWriterStore()
return ps
}
func (ps *ProfileStoreV1) initProfileWriterStore() {
ps.queue = event.NewQueue()
go ps.eventHandler()
ps.eventManager.Subscribe(event.BlockPeer, ps.queue)
ps.eventManager.Subscribe(event.UnblockPeer, ps.queue)
ps.eventManager.Subscribe(event.PeerCreated, ps.queue)
ps.eventManager.Subscribe(event.GroupCreated, ps.queue)
ps.eventManager.Subscribe(event.SetProfileName, ps.queue)
ps.eventManager.Subscribe(event.SetAttribute, ps.queue)
ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue)
ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue)
ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue)
ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue)
ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue)
ps.eventManager.Subscribe(event.PeerStateChange, ps.queue)
ps.eventManager.Subscribe(event.ServerStateChange, ps.queue)
ps.eventManager.Subscribe(event.DeleteContact, ps.queue)
ps.eventManager.Subscribe(event.DeleteGroup, ps.queue)
ps.eventManager.Subscribe(event.ChangePassword, ps.queue)
}
// LoadProfileWriterStore loads a profile store from filestore listening for events and saving them
// directory should be $appDir/profiles/$rand
func LoadProfileWriterStore(eventManager event.Manager, directory, password string) (*ProfileStoreV1, error) {
salt, err := ioutil.ReadFile(path.Join(directory, saltFile))
if err != nil {
return nil, err
}
key := createKey(password, salt)
ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, directory: directory, profile: nil, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true}
copy(ps.salt[:], salt)
ps.initProfileWriterStore()
ps.load()
return ps, nil
}
// ReadProfile reads a profile from storage and returns the profile
// directory should be $appDir/profiles/$rand
func ReadProfile(directory string, key [32]byte, salt [128]byte) (*model.Profile, error) {
os.Mkdir(directory, 0700)
ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: nil, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true}
err := ps.load()
if err != nil {
return nil, err
}
profile := ps.GetProfileCopy(true)
return profile, nil
}
// UpgradeV0Profile takes a profile (presumably from a V0 store) and creates and writes a V1 store
func UpgradeV0Profile(profile *model.Profile, directory, password string) error {
key, salt, err := initV1Directory(directory, password)
if err != nil {
return err
}
ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: profile, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true}
ps.save()
for gid, group := range ps.profile.Groups {
ss := NewStreamStore(ps.directory, group.LocalID, ps.key)
ss.WriteN(ps.profile.Groups[gid].Timeline.Messages)
}
return nil
}
// NewProfile creates a new profile for use in the profile store.
func NewProfile(name string) *model.Profile {
profile := model.GenerateNewProfile(name)
return profile
}
// GetNewPeerMessage is for AppService to call on Reload events, to reseed the AppClient with the loaded peers
func (ps *ProfileStoreV1) GetNewPeerMessage() *event.Event {
message := event.NewEventList(event.NewPeer, event.Identity, ps.profile.LocalID, event.Key, string(ps.key[:]), event.Salt, string(ps.salt[:]))
return &message
}
// GetStatusMessages creates an array of status messages for all peers and group servers from current information
func (ps *ProfileStoreV1) GetStatusMessages() []*event.Event {
messages := []*event.Event{}
for _, contact := range ps.profile.Contacts {
message := event.NewEvent(event.PeerStateChange, map[event.Field]string{
event.RemotePeer: string(contact.Onion),
event.ConnectionState: contact.State,
})
messages = append(messages, &message)
}
doneServers := make(map[string]bool)
for _, group := range ps.profile.Groups {
if _, exists := doneServers[group.GroupServer]; !exists {
message := event.NewEvent(event.ServerStateChange, map[event.Field]string{
event.GroupServer: string(group.GroupServer),
event.ConnectionState: group.State,
})
messages = append(messages, &message)
doneServers[group.GroupServer] = true
}
}
return messages
}
// ChangePassword restores all data under a new password's encryption
func (ps *ProfileStoreV1) ChangePassword(oldpass, newpass, eventID string) {
oldkey := createKey(oldpass, ps.salt[:])
if oldkey != ps.key {
ps.eventManager.Publish(event.NewEventList(event.ChangePasswordError, event.Error, "Supplied current password does not match", event.EventID, eventID))
return
}
newkey := createKey(newpass, ps.salt[:])
newStreamStores := map[string]StreamStore{}
idToNewLocalID := map[string]string{}
// Generate all new StreamStores with the new password and write all the old StreamStore data into these ones
for ssid, ss := range ps.streamStores {
// New ss with new pass and new localID
newlocalID := model.GenerateRandomID()
idToNewLocalID[ssid] = newlocalID
newSS := NewStreamStore(ps.directory, newlocalID, newkey)
newStreamStores[ssid] = newSS
// write whole store
messages := ss.Read()
newSS.WriteN(messages)
}
// Switch over
oldStreamStores := ps.streamStores
ps.streamStores = newStreamStores
for ssid, newLocalID := range idToNewLocalID {
if len(ssid) == groupIDLen {
ps.profile.Groups[ssid].LocalID = newLocalID
} else {
ps.profile.Contacts[ssid].LocalID = newLocalID
}
}
ps.key = newkey
ps.fs.ChangeKey(newkey)
ps.save()
// Clean up
for _, oldss := range oldStreamStores {
oldss.Delete()
}
ps.eventManager.Publish(event.NewEventList(event.ChangePasswordSuccess, event.EventID, eventID))
return
}
func (ps *ProfileStoreV1) save() error {
if ps.writer {
bytes, _ := json.Marshal(ps.profile)
return ps.fs.Write(bytes)
}
return nil
}
// load instantiates a cwtchPeer from the file store
func (ps *ProfileStoreV1) load() error {
decrypted, err := ps.fs.Read()
if err != nil {
return err
}
cp := new(model.Profile)
err = json.Unmarshal(decrypted, &cp)
if err == nil {
ps.profile = cp
for gid, group := range cp.Groups {
ss := NewStreamStore(ps.directory, group.LocalID, ps.key)
cp.Groups[gid].Timeline.SetMessages(ss.Read())
ps.streamStores[group.GroupID] = ss
}
}
return err
}
// GetProfileCopy returns a copy of the stored profile
func (ps *ProfileStoreV1) GetProfileCopy(timeline bool) *model.Profile {
return ps.profile.GetCopy(timeline)
}
func (ps *ProfileStoreV1) eventHandler() {
log.Debugln("eventHandler()!")
for {
ev := ps.queue.Next()
log.Debugf("eventHandler event %v %v\n", ev.EventType, ev.EventID)
switch ev.EventType {
case event.BlockPeer:
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
if exists {
contact.Blocked = true
ps.save()
}
case event.UnblockPeer:
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
if exists {
contact.Blocked = false
ps.save()
}
case event.PeerCreated:
var pp *model.PublicProfile
json.Unmarshal([]byte(ev.Data[event.Data]), &pp)
ps.profile.AddContact(ev.Data[event.RemotePeer], pp)
// TODO: configure - allow peers to be configured to turn on limited storage
/*ss := NewStreamStore(ps.directory, pp.LocalID, ps.password)
pp.Timeline.SetMessages(ss.Read())
ps.streamStores[pp.Onion] = ss
ps.save()*/
case event.GroupCreated:
var group *model.Group
json.Unmarshal([]byte(ev.Data[event.Data]), &group)
ps.profile.AddGroup(group)
ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.key)
ps.save()
case event.SetProfileName:
ps.profile.Name = ev.Data[event.ProfileName]
ps.profile.SetAttribute("name", ev.Data[event.ProfileName])
ps.save()
case event.SetAttribute:
ps.profile.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
ps.save()
case event.SetPeerAttribute:
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
if exists {
contact.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
ps.save()
} else {
log.Errorf("error setting attribute on peer %v peer does not exist", ev)
}
case event.SetGroupAttribute:
group := ps.profile.GetGroup(ev.Data[event.GroupID])
if group != nil {
group.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
ps.save()
} else {
log.Errorf("error setting attribute on group %v group does not exist", ev)
}
case event.AcceptGroupInvite:
err := ps.profile.AcceptInvite(ev.Data[event.GroupID])
if err == nil {
ps.save()
} else {
log.Errorf("error accepting group invite")
}
case event.NewGroupInvite:
gid, err := ps.profile.ProcessInvite(ev.Data[event.GroupInvite], ev.Data[event.RemotePeer])
log.Errorf("gid: %v err:%v\n", gid, err)
if err == nil {
ps.save()
group := ps.profile.Groups[gid]
ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.key)
} else {
log.Errorf("error storing new group invite: %v (%v)", err, ev)
}
case event.NewMessageFromGroup:
groupid := ev.Data[event.GroupID]
received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent])
message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: []byte(ev.Data[event.Signature]), PreviousMessageSig: []byte(ev.Data[event.PreviousSignature])}
ss, exists := ps.streamStores[groupid]
if exists {
ss.Write(message)
} else {
log.Errorf("error storing new group message: %v stream store does not exist", ev)
}
case event.PeerStateChange:
if _, exists := ps.profile.Contacts[ev.Data[event.RemotePeer]]; exists {
ps.profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState]
}
case event.ServerStateChange:
for _, group := range ps.profile.Groups {
if group.GroupServer == ev.Data[event.GroupServer] {
group.State = ev.Data[event.ConnectionState]
}
}
case event.DeleteContact:
onion := ev.Data[event.RemotePeer]
ps.profile.DeleteContact(onion)
ps.save()
case event.DeleteGroup:
groupID := ev.Data[event.GroupID]
ps.profile.DeleteGroup(groupID)
ps.save()
ss, exists := ps.streamStores[groupID]
if exists {
ss.Delete()
delete(ps.streamStores, groupID)
}
case event.ChangePassword:
oldpass := ev.Data[event.Password]
newpass := ev.Data[event.NewPassword]
ps.ChangePassword(oldpass, newpass, ev.EventID)
default:
return
}
}
}
// Shutdown shuts down the queue / thread
func (ps *ProfileStoreV1) Shutdown() {
if ps.queue != nil {
ps.queue.Shutdown()
}
}
// Delete removes all stored files for this stored profile
func (ps *ProfileStoreV1) Delete() {
log.Debugf("Delete ProfileStore for %v\n", ps.profile.Onion)
for _, ss := range ps.streamStores {
ss.Delete()
}
ps.fs.Delete()
err := os.RemoveAll(ps.directory)
if err != nil {
log.Errorf("ProfileStore Delete error on RemoveAll on %v was %v\n", ps.directory, err)
}
}
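
Password changes still travel over the event bus: publish ChangePassword with the old and new passwords and wait for a ChangePasswordSuccess or ChangePasswordError reply. A hedged sketch of that handshake, assuming the event API as used elsewhere in this diff (the password strings are placeholders, and a store created with CreateProfileWriterStore must already be subscribed for the reply to arrive):

package main

import (
    "fmt"

    "cwtch.im/cwtch/event"
)

func main() {
    eventBus := event.NewEventManager()
    queue := event.NewQueue()
    eventBus.Subscribe(event.ChangePasswordSuccess, queue)
    eventBus.Subscribe(event.ChangePasswordError, queue)

    // ...a ProfileStoreV1 created via CreateProfileWriterStore(eventBus, ...) is listening for ChangePassword...

    eventBus.Publish(event.NewEventList(event.ChangePassword,
        event.Password, "old-password", // placeholder
        event.NewPassword, "new-password", // placeholder
    ))

    ev := queue.Next() // blocks until the store has re-encrypted its stream stores
    if ev.EventType == event.ChangePasswordSuccess {
        fmt.Println("password changed; stores rewritten under the new key")
    } else {
        fmt.Println("password change failed:", ev.Data[event.Error])
    }
}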

View File

@ -0,0 +1,150 @@
// Known race issue with event bus channel closure
package v1
import (
"cwtch.im/cwtch/event"
"fmt"
"log"
"os"
"testing"
"time"
)
const testProfileName = "Alice"
const testKey = "key"
const testVal = "value"
const testInitialMessage = "howdy"
const testMessage = "Hello from storage"
func TestProfileStoreWriteRead(t *testing.T) {
log.Println("profile store test!")
os.RemoveAll(testingDir)
eventBus := event.NewEventManager()
profile := NewProfile(testProfileName)
ps1 := CreateProfileWriterStore(eventBus, testingDir, password, profile)
eventBus.Publish(event.NewEvent(event.SetAttribute, map[event.Field]string{event.Key: testKey, event.Data: testVal}))
time.Sleep(1 * time.Second)
groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
if err != nil {
t.Errorf("Creating group: %v\n", err)
}
if err != nil {
t.Errorf("Creating group invite: %v\n", err)
}
eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)}))
time.Sleep(1 * time.Second)
eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{
event.GroupID: groupid,
event.TimestampSent: time.Now().Format(time.RFC3339Nano),
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
event.RemotePeer: ps1.GetProfileCopy(true).Onion,
event.Data: testMessage,
}))
time.Sleep(1 * time.Second)
ps1.Shutdown()
ps2, err := LoadProfileWriterStore(eventBus, testingDir, password)
if err != nil {
t.Errorf("Error creating ProfileStoreV1: %v\n", err)
}
profile = ps2.GetProfileCopy(true)
if profile.Name != testProfileName {
t.Errorf("Profile name from loaded profile incorrect. Expected: '%v' Actual: '%v'\n", testProfileName, profile.Name)
}
v, _ := profile.GetAttribute(testKey)
if v != testVal {
t.Errorf("Profile attribute '%v' incorrect. Expected: '%v' Actual: '%v'\n", testKey, testVal, v)
}
group2 := ps2.GetProfileCopy(true).Groups[groupid]
if group2 == nil {
t.Errorf("Group not loaded\n")
}
}
func TestProfileStoreChangePassword(t *testing.T) {
os.RemoveAll(testingDir)
eventBus := event.NewEventManager()
queue := event.NewQueue()
eventBus.Subscribe(event.ChangePasswordSuccess, queue)
profile := NewProfile(testProfileName)
ps1 := CreateProfileWriterStore(eventBus, testingDir, password, profile)
groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
if err != nil {
t.Errorf("Creating group: %v\n", err)
}
if err != nil {
t.Errorf("Creating group invite: %v\n", err)
}
eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)}))
time.Sleep(1 * time.Second)
fmt.Println("Sending 200 messages...")
for i := 0; i < 200; i++ {
eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{
event.GroupID: groupid,
event.TimestampSent: time.Now().Format(time.RFC3339Nano),
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
event.RemotePeer: profile.Onion,
event.Data: testMessage,
}))
}
newPass := "qwerty123"
fmt.Println("Sending Change Passwords event...")
eventBus.Publish(event.NewEventList(event.ChangePassword, event.Password, password, event.NewPassword, newPass))
ev := queue.Next()
if ev.EventType != event.ChangePasswordSuccess {
t.Errorf("Unexpected event response detected %v\n", ev.EventType)
return
}
fmt.Println("Sending 10 more messages...")
for i := 0; i < 10; i++ {
eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{
event.GroupID: groupid,
event.TimestampSent: time.Now().Format(time.RFC3339Nano),
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
event.RemotePeer: profile.Onion,
event.Data: testMessage,
}))
}
time.Sleep(3 * time.Second)
fmt.Println("Shutdown profile store...")
ps1.Shutdown()
fmt.Println("New Profile store...")
ps2, err := LoadProfileWriterStore(eventBus, testingDir, newPass)
if err != nil {
t.Errorf("Error creating new ProfileStoreV1 with new password: %v\n", err)
return
}
profile2 := ps2.GetProfileCopy(true)
if profile2.Groups[groupid] == nil {
t.Errorf("Failed to load group %v\n", groupid)
return
}
if len(profile2.Groups[groupid].Timeline.Messages) != 210 {
t.Errorf("Failed to load group's 210 messages, instead got %v\n", len(profile2.Groups[groupid].Timeline.Messages))
}
}

View File

@ -1,4 +1,4 @@
package storage
package v1
import (
"cwtch.im/cwtch/model"
@ -18,7 +18,7 @@ const (
// streamStore is a file-backed implementation of StreamStore using an in memory buffer of ~16KB and a rotating set of files
type streamStore struct {
password string
key [32]byte
storeDirectory string
filenameBase string
@ -36,12 +36,11 @@ type StreamStore interface {
WriteN(messages []model.Message)
Read() []model.Message
Delete()
ChangePassword(newpass string)
}
// NewStreamStore returns an initialized StreamStore ready for reading and writing
func NewStreamStore(directory string, filenameBase string, password string) (store StreamStore) {
ss := &streamStore{storeDirectory: directory, filenameBase: filenameBase, password: password}
func NewStreamStore(directory string, filenameBase string, key [32]byte) (store StreamStore) {
ss := &streamStore{storeDirectory: directory, filenameBase: filenameBase, key: key}
os.Mkdir(ss.storeDirectory, 0700)
ss.initBuffer()
@ -58,7 +57,7 @@ func (ss *streamStore) initBuffer() {
func (ss *streamStore) initBufferFromStorage() error {
filename := fmt.Sprintf("%s.%d", ss.filenameBase, 0)
bytes, _ := readEncryptedFile(ss.storeDirectory, filename, ss.password)
bytes, _ := readEncryptedFile(ss.storeDirectory, filename, ss.key)
msgs := []model.Message{}
err := json.Unmarshal([]byte(bytes), &msgs)
@ -84,13 +83,11 @@ func (ss *streamStore) updateFile() error {
}
// ENCRYPT
key, salt, _ := createKey(ss.password)
encryptedMsgs, err := encryptFileData(msgs, key)
encryptedMsgs, err := encryptFileData(msgs, ss.key)
if err != nil {
log.Errorf("Failed to encrypt messages: %v\n", err)
return err
}
encryptedMsgs = append(salt[:], encryptedMsgs...)
ioutil.WriteFile(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, 0)), encryptedMsgs, 0700)
return nil
@ -121,7 +118,7 @@ func (ss *streamStore) Read() (messages []model.Message) {
for i := fileStorePartitions - 1; i >= 0; i-- {
filename := fmt.Sprintf("%s.%d", ss.filenameBase, i)
bytes, err := readEncryptedFile(ss.storeDirectory, filename, ss.password)
bytes, err := readEncryptedFile(ss.storeDirectory, filename, ss.key)
if err != nil {
continue
}
@ -131,12 +128,6 @@ func (ss *streamStore) Read() (messages []model.Message) {
resp = append(resp, msgs...)
}
// 2019.10.10 "Acknowledged" & "ReceivedByServer" are added to the struct, populate it as true for old ones without
for i := 0; i < len(resp) && (resp[i].Acknowledged == false && resp[i].ReceivedByServer == false); i++ {
resp[i].Acknowledged = true
resp[i].ReceivedByServer = true
}
return resp
}
@ -158,16 +149,17 @@ func (ss *streamStore) WriteN(messages []model.Message) {
ss.lock.Lock()
defer ss.lock.Unlock()
log.Infof("WriteN %v messages\n", len(messages))
i := 0
for _, m := range messages {
ss.updateBuffer(m)
i++
if ss.bufferByteCount > bytesPerFile {
ss.updateFile()
log.Debugf("rotating log file")
ss.rotateFileStore()
ss.initBuffer()
}
}
ss.updateFile()
}
func (ss *streamStore) ChangePassword(newpass string) {}

View File

@ -0,0 +1,52 @@
package v1
import (
"cwtch.im/cwtch/model"
"os"
"testing"
)
const testingDir = "./testing"
const filenameBase = "testStream"
const password = "asdfqwer"
const line1 = "Hello from storage!"
func TestStreamStoreWriteRead(t *testing.T) {
os.Remove(".test.json")
os.RemoveAll(testingDir)
os.Mkdir(testingDir, 0777)
key, _, _ := createKeySalt(password)
ss1 := NewStreamStore(testingDir, filenameBase, key)
m := model.Message{Message: line1}
ss1.Write(m)
ss2 := NewStreamStore(testingDir, filenameBase, key)
messages := ss2.Read()
if len(messages) != 1 {
t.Errorf("Read messages has wrong length. Expected: 1 Actual: %d\n", len(messages))
}
if messages[0].Message != line1 {
t.Errorf("Read message has wrong content. Expected: '%v' Actual: '%v'\n", line1, messages[0].Message)
}
}
func TestStreamStoreWriteReadRotate(t *testing.T) {
os.Remove(".test.json")
os.RemoveAll(testingDir)
os.Mkdir(testingDir, 0777)
key, _, _ := createKeySalt(password)
ss1 := NewStreamStore(testingDir, filenameBase, key)
m := model.Message{Message: line1}
for i := 0; i < 400; i++ {
ss1.Write(m)
}
ss2 := NewStreamStore(testingDir, filenameBase, key)
messages := ss2.Read()
if len(messages) != 400 {
t.Errorf("Read messages has wrong length. Expected: 400 Actual: %d\n", len(messages))
}
if messages[0].Message != line1 {
t.Errorf("Read message has wrong content. Expected: '%v' Actual: '%v'\n", line1, messages[0].Message)
}
}

View File

@ -5,6 +5,8 @@ pwd
GORACE="haltonerror=1"
go test -race ${1} -coverprofile=model.cover.out -v ./model
go test -race ${1} -coverprofile=event.cover.out -v ./event
go test -race ${1} -coverprofile=storage.v0.cover.out -v ./storage/v0
go test -race ${1} -coverprofile=storage.v1.cover.out -v ./storage/v1
go test -race ${1} -coverprofile=storage.cover.out -v ./storage
go test -race ${1} -coverprofile=peer.connections.cover.out -v ./protocol/connections
go test -race ${1} -coverprofile=protocol.spam.cover.out -v ./protocol/connections/spam