diff --git a/app/app.go b/app/app.go index e4427ec..2d040be 100644 --- a/app/app.go +++ b/app/app.go @@ -114,7 +114,7 @@ func (app *application) CreateTaggedPeer(name string, password string, tag strin return } - profileStore := storage.NewProfileWriterStore(app.eventBuses[profile.Onion], path.Join(app.directory, "profiles", profile.LocalID), password, profile) + profileStore := storage.CreateProfileWriterStore(app.eventBuses[profile.Onion], path.Join(app.directory, "profiles", profile.LocalID), password, profile) app.storage[profile.Onion] = profileStore pc := app.storage[profile.Onion].GetProfileCopy(true) @@ -182,8 +182,7 @@ func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProf for _, file := range files { eventBus := event.NewEventManager() - profileStore := storage.NewProfileWriterStore(eventBus, path.Join(ac.directory, "profiles", file.Name()), password, nil) - err = profileStore.Load() + profileStore, err := storage.LoadProfileWriterStore(eventBus, path.Join(ac.directory, "profiles", file.Name()), password) if err != nil { continue } diff --git a/app/appClient.go b/app/appClient.go index 8a6bd59..fd016ba 100644 --- a/app/appClient.go +++ b/app/appClient.go @@ -16,7 +16,7 @@ type applicationClient struct { applicationBridge appletPeers - appBus event.Manager + appBus event.Manager acmutex sync.Mutex } @@ -42,9 +42,10 @@ func (ac *applicationClient) handleEvent(ev *event.Event) { switch ev.EventType { case event.NewPeer: localID := ev.Data[event.Identity] - password := ev.Data[event.Password] - reload := ev.Data[event.Status] == "running" - ac.newPeer(localID, password, reload) + key := ev.Data[event.Key] + salt := ev.Data[event.Salt] + reload := ev.Data[event.Status] == event.StorageRunning + ac.newPeer(localID, key, salt, reload) case event.DeletePeer: onion := ev.Data[event.Identity] ac.handleDeletedPeer(onion) @@ -59,8 +60,12 @@ func (ac *applicationClient) handleEvent(ev *event.Event) { } } -func (ac *applicationClient) newPeer(localID, password string, reload bool) { - profile, err := storage.ReadProfile(path.Join(ac.directory, "profiles", localID), password) +func (ac *applicationClient) newPeer(localID, key, salt string, reload bool) { + var keyBytes [32]byte + var saltBytes [128]byte + copy(keyBytes[:], key) + copy(saltBytes[:], salt) + profile, err := storage.ReadProfile(path.Join(ac.directory, "profiles", localID), keyBytes, saltBytes) if err != nil { log.Errorf("Could not read profile for NewPeer event: %v\n", err) ac.appBus.Publish(event.NewEventList(event.PeerError, event.Error, fmt.Sprintf("Could not read profile for NewPeer event: %v\n", err))) diff --git a/app/appService.go b/app/appService.go index e52ae48..03055a5 100644 --- a/app/appService.go +++ b/app/appService.go @@ -65,7 +65,9 @@ func (as *applicationService) handleEvent(ev *event.Event) { as.loadProfiles(password) case event.ReloadClient: for _, storage := range as.storage { - message := event.IPCMessage{Dest: DestApp, Message: *storage.GetNewPeerMessage()} + peerMsg := *storage.GetNewPeerMessage() + peerMsg.Data[event.Status] = event.StorageRunning + message := event.IPCMessage{Dest: DestApp, Message: peerMsg} as.bridge.Write(&message) } @@ -103,7 +105,7 @@ func (as *applicationService) createPeer(name, password, tag string) { profile.SetAttribute(AttributeTag, tag) } - profileStore := storage.NewProfileWriterStore(as.eventBuses[profile.Onion], path.Join(as.directory, "profiles", profile.LocalID), password, profile) + profileStore := storage.CreateProfileWriterStore(as.eventBuses[profile.Onion], path.Join(as.directory, "profiles", profile.LocalID), password, profile) blockedPeers := profile.BlockedPeers() // TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key. @@ -113,7 +115,9 @@ func (as *applicationService) createPeer(name, password, tag string) { as.storage[profile.Onion] = profileStore as.engines[profile.Onion] = engine - message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.LocalID, event.Password: password})} + peerMsg := *profileStore.GetNewPeerMessage() + peerMsg.Data[event.Status] = event.StorageNew + message := event.IPCMessage{Dest: DestApp, Message: peerMsg} as.bridge.Write(&message) } @@ -129,7 +133,10 @@ func (as *applicationService) loadProfiles(password string) { as.storage[profile.Onion] = profileStore as.engines[profile.Onion] = engine as.asmutex.Unlock() - message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.LocalID, event.Password: password})} + + peerMsg := *profileStore.GetNewPeerMessage() + peerMsg.Data[event.Status] = event.StorageNew + message := event.IPCMessage{Dest: DestApp, Message: peerMsg} as.bridge.Write(&message) count++ }) diff --git a/event/common.go b/event/common.go index 688b2cb..e46fe16 100644 --- a/event/common.go +++ b/event/common.go @@ -142,7 +142,7 @@ const ( CreatePeer = Type("CreatePeer") // service -> client: Identity(localId), Password, [Status(new/default=blank || from reload='running')] - // app -> Identity(onion) + // app -> Key, Salt NewPeer = Type("NewPeer") // Identity(onion) @@ -184,6 +184,10 @@ const ( // Error: Description of the Error // Onion: the local onion we attempt to check NetworkStatus = Type("NetworkError") + + // For debugging. Allows test to emit a Syn and get a response Ack(eventID) when the subsystem is done processing a queue + Syn = Type("Syn") + Ack = Type("Ack") ) // Field defines common event attributes @@ -217,6 +221,8 @@ const ( Key = Field("Key") Data = Field("Data") + Salt = Field("Salt") + Error = Field("Error") Progreess = Field("Progress") @@ -230,6 +236,12 @@ const ( AppErrLoaded0 = "Loaded 0 profiles" ) +// Values to be suplied in event.NewPeer for Status +const ( + StorageRunning = "running" + StorageNew = "new" +) + // Defining Protocol Contexts const ( ContextAck = "im.cwtch.acknowledgement" diff --git a/server/metrics/metrics_test.go b/server/metrics/metrics_test.go index 5897d58..b200681 100644 --- a/server/metrics/metrics_test.go +++ b/server/metrics/metrics_test.go @@ -9,13 +9,20 @@ func TestCounter(t *testing.T) { starttime := time.Now() c := NewCounter() - for i := 0; i < 100; i++ { + max := 100 + done := make(chan bool, max) + + // slightly stress test atomic nature of metric by flooding with threads Add()ing + for i := 0; i < max; i++ { go func() { c.Add(1) + done <- true }() } - time.Sleep(2 * time.Second) + for i := 0; i < max; i++ { + <- done + } val := c.Count() if val != 100 { diff --git a/storage/profile_store.go b/storage/profile_store.go index 52affee..3a840d3 100644 --- a/storage/profile_store.go +++ b/storage/profile_store.go @@ -3,30 +3,20 @@ package storage import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" - "encoding/json" + "cwtch.im/cwtch/storage/v0" + "cwtch.im/cwtch/storage/v1" "git.openprivacy.ca/openprivacy/libricochet-go/log" - "os" - "time" + "io/ioutil" + "path" + "strconv" ) -const groupIDLen = 32 -const peerIDLen = 56 const profileFilename = "profile" - -type profileStore struct { - fs FileStore - streamStores map[string]StreamStore // map [groupId|onion] StreamStore - directory string - password string - profile *model.Profile - eventManager event.Manager - queue event.Queue - writer bool -} +const versionFile = "VERSION" +const currentVersion = 1 // ProfileStore is an interface to managing the storage of Cwtch Profiles type ProfileStore interface { - Load() error Shutdown() Delete() GetProfileCopy(timeline bool) *model.Profile @@ -34,52 +24,25 @@ type ProfileStore interface { GetStatusMessages() []*event.Event } -// NewProfileWriterStore returns a profile store backed by a filestore listening for events and saving them +// CreateProfileWriterStore creates a profile store backed by a filestore listening for events and saving them // directory should be $appDir/profiles/$rand -func NewProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) ProfileStore { - os.Mkdir(directory, 0700) - ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true} - //ps.queue = event.NewQueue(100) - ps.queue = event.NewQueue() - if profile != nil { - ps.save() - } - go ps.eventHandler() - - ps.eventManager.Subscribe(event.BlockPeer, ps.queue) - ps.eventManager.Subscribe(event.UnblockPeer, ps.queue) - ps.eventManager.Subscribe(event.PeerCreated, ps.queue) - ps.eventManager.Subscribe(event.GroupCreated, ps.queue) - ps.eventManager.Subscribe(event.SetProfileName, ps.queue) - ps.eventManager.Subscribe(event.SetAttribute, ps.queue) - ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue) - ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue) - ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue) - ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue) - ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue) - ps.eventManager.Subscribe(event.PeerStateChange, ps.queue) - ps.eventManager.Subscribe(event.ServerStateChange, ps.queue) - ps.eventManager.Subscribe(event.DeleteContact, ps.queue) - ps.eventManager.Subscribe(event.DeleteGroup, ps.queue) - ps.eventManager.Subscribe(event.ChangePassword, ps.queue) - - return ps +func CreateProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) ProfileStore { + return v1.CreateProfileWriterStore(eventManager, directory, password, profile) } -// ReadProfile reads a profile from storqage and returns the profile +// LoadProfileWriterStore loads a profile store from filestore listening for events and saving them // directory should be $appDir/profiles/$rand -func ReadProfile(directory, password string) (*model.Profile, error) { - os.Mkdir(directory, 0700) - ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: nil, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true} +func LoadProfileWriterStore(eventManager event.Manager, directory, password string) (ProfileStore, error) { + versionCheckUpgrade(directory, password) - err := ps.Load() - if err != nil { - return nil, err - } + return v1.LoadProfileWriterStore(eventManager, directory, password) +} - profile := ps.GetProfileCopy(true) - - return profile, nil +// ReadProfile reads a profile from storage and returns the profile +// Should only be called for cache refresh of the profile after a ProfileWriterStore has opened +// (and upgraded) the store, and thus supplied the key/salt +func ReadProfile(directory string, key [32]byte, salt [128]byte) (*model.Profile, error) { + return v1.ReadProfile(directory, key, salt) } // NewProfile creates a new profile for use in the profile store. @@ -88,254 +51,43 @@ func NewProfile(name string) *model.Profile { return profile } -// GetNewPeerMessage is for AppService to call on Reload events, to reseed the AppClient with the loaded peers -func (ps *profileStore) GetNewPeerMessage() *event.Event { - message := event.NewEventList(event.NewPeer, event.Identity, ps.profile.LocalID, event.Password, ps.password, event.Status, "running") - return &message +// ********* Versioning and upgrade ********** + +func detectVersion(directory string) int { + vnumberStr, err := ioutil.ReadFile(path.Join(directory, versionFile)) + if err != nil { + return 0 + } + vnumber, err := strconv.Atoi(string(vnumberStr)) + if err != nil { + log.Errorf("Could not parse VERSION file contents: '%v' - %v\n", vnumber, err) + return -1 + } + return vnumber } -func (ps *profileStore) GetStatusMessages() []*event.Event { - messages := []*event.Event{} - for _, contact := range ps.profile.Contacts { - message := event.NewEvent(event.PeerStateChange, map[event.Field]string{ - event.RemotePeer: string(contact.Onion), - event.ConnectionState: contact.State, - }) - messages = append(messages, &message) - } - - doneServers := make(map[string]bool) - for _, group := range ps.profile.Groups { - if _, exists := doneServers[group.GroupServer]; !exists { - message := event.NewEvent(event.ServerStateChange, map[event.Field]string{ - event.GroupServer: string(group.GroupServer), - event.ConnectionState: group.State, - }) - messages = append(messages, &message) - doneServers[group.GroupServer] = true - } - } - - return messages -} - -func (ps *profileStore) ChangePassword(oldpass, newpass, eventID string) { - if oldpass != ps.password { - ps.eventManager.Publish(event.NewEventList(event.ChangePasswordError, event.Error, "Supplied current password does not match", event.EventID, eventID)) - return - } - - newStreamStores := map[string]StreamStore{} - idToNewLocalID := map[string]string{} - - // Generate all new StreamStores with the new password and write all the old StreamStore data into these ones - for ssid, ss := range ps.streamStores { - // New ss with new pass and new localID - newlocalID := model.GenerateRandomID() - idToNewLocalID[ssid] = newlocalID - - newSS := NewStreamStore(ps.directory, newlocalID, newpass) - newStreamStores[ssid] = newSS - - // write whole store - messages := ss.Read() - newSS.WriteN(messages) - } - - // Switch over - oldStreamStores := ps.streamStores - ps.streamStores = newStreamStores - for ssid, newLocalID := range idToNewLocalID { - if len(ssid) == groupIDLen { - ps.profile.Groups[ssid].LocalID = newLocalID - } else { - ps.profile.Contacts[ssid].LocalID = newLocalID - } - } - - ps.password = newpass - ps.fs.ChangePassword(newpass) - ps.save() - - // Clean up - for _, oldss := range oldStreamStores { - oldss.Delete() - } - - ps.eventManager.Publish(event.NewEventList(event.ChangePasswordSuccess, event.EventID, eventID)) - return -} - -func (ps *profileStore) save() error { - if ps.writer { - bytes, _ := json.Marshal(ps.profile) - return ps.fs.Write(bytes) - } - return nil -} - -// Read instantiates a cwtchPeer from the file store -func (ps *profileStore) Load() error { - decrypted, err := ps.fs.Read() +func upgradeV0ToV1(directory, password string) error { + log.Debugln("Attempting storage v0 to v1: Reading v0 profile...") + profile, err := v0.ReadProfile(directory, password) if err != nil { return err } - cp := new(model.Profile) - err = json.Unmarshal(decrypted, &cp) - if err == nil { - ps.profile = cp - for gid, group := range cp.Groups { - ss := NewStreamStore(ps.directory, group.LocalID, ps.password) + log.Debugln("Attempting storage v0 to v1: Writing v1 profile...") + return v1.UpgradeV0Profile(profile, directory, password) +} - cp.Groups[gid].Timeline.SetMessages(ss.Read()) - ps.streamStores[group.GroupID] = ss - } +func versionCheckUpgrade(directory, password string) { + version := detectVersion(directory) + log.Infof("versionCheck: %v\n", version) + if version == -1 { + return } - - return err -} - -func (ps *profileStore) GetProfileCopy(timeline bool) *model.Profile { - return ps.profile.GetCopy(timeline) -} - -func (ps *profileStore) eventHandler() { - log.Debugln("eventHandler()!") - for { - ev := ps.queue.Next() - log.Debugf("eventHandler event %v %v\n", ev.EventType, ev.EventID) - - switch ev.EventType { - case event.BlockPeer: - contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer]) - if exists { - contact.Blocked = true - ps.save() - } - case event.UnblockPeer: - contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer]) - if exists { - contact.Blocked = false - ps.save() - } - case event.PeerCreated: - var pp *model.PublicProfile - json.Unmarshal([]byte(ev.Data[event.Data]), &pp) - ps.profile.AddContact(ev.Data[event.RemotePeer], pp) - // TODO: configure - allow peers to be configured to turn on limited storage - /*ss := NewStreamStore(ps.directory, pp.LocalID, ps.password) - pp.Timeline.SetMessages(ss.Read()) - ps.streamStores[pp.Onion] = ss - ps.save()*/ - case event.GroupCreated: - var group *model.Group - json.Unmarshal([]byte(ev.Data[event.Data]), &group) - ps.profile.AddGroup(group) - ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password) - ps.save() - case event.SetProfileName: - ps.profile.Name = ev.Data[event.ProfileName] - ps.profile.SetAttribute("name", ev.Data[event.ProfileName]) - ps.save() - case event.SetAttribute: - ps.profile.SetAttribute(ev.Data[event.Key], ev.Data[event.Data]) - ps.save() - case event.SetPeerAttribute: - contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer]) - if exists { - contact.SetAttribute(ev.Data[event.Key], ev.Data[event.Data]) - ps.save() - } else { - log.Errorf("error setting attribute on peer %v peer does not exist", ev) - } - case event.SetGroupAttribute: - group := ps.profile.GetGroup(ev.Data[event.GroupID]) - if group != nil { - group.SetAttribute(ev.Data[event.Key], ev.Data[event.Data]) - ps.save() - } else { - log.Errorf("error setting attribute on group %v group does not exist", ev) - } - case event.AcceptGroupInvite: - err := ps.profile.AcceptInvite(ev.Data[event.GroupID]) - if err == nil { - ps.save() - } else { - log.Errorf("error accepting group invite") - } - case event.NewGroupInvite: - gid, err := ps.profile.ProcessInvite(ev.Data[event.GroupInvite], ev.Data[event.RemotePeer]) - log.Errorf("gid: %v err:%v\n", gid, err) - if err == nil { - ps.save() - group := ps.profile.Groups[gid] - ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password) - } else { - log.Errorf("error storing new group invite: %v (%v)", err, ev) - } - case event.NewMessageFromGroup: - groupid := ev.Data[event.GroupID] - received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) - sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent]) - message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: []byte(ev.Data[event.Signature]), PreviousMessageSig: []byte(ev.Data[event.PreviousSignature])} - ss, exists := ps.streamStores[groupid] - if exists { - ss.Write(message) - } else { - log.Errorf("error storing new group message: %v stream store does not exist", ev) - } - case event.PeerStateChange: - if _, exists := ps.profile.Contacts[ev.Data[event.RemotePeer]]; exists { - ps.profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState] - } - case event.ServerStateChange: - for _, group := range ps.profile.Groups { - if group.GroupServer == ev.Data[event.GroupServer] { - group.State = ev.Data[event.ConnectionState] - } - } - case event.DeleteContact: - onion := ev.Data[event.RemotePeer] - ps.profile.DeleteContact(onion) - ps.save() - case event.DeleteGroup: - groupID := ev.Data[event.GroupID] - ps.profile.DeleteGroup(groupID) - ps.save() - ss, exists := ps.streamStores[groupID] - if exists { - ss.Delete() - delete(ps.streamStores, groupID) - } - case event.ChangePassword: - oldpass := ev.Data[event.Password] - newpass := ev.Data[event.NewPassword] - ps.ChangePassword(oldpass, newpass, ev.EventID) - default: + if version == 0 { + err := upgradeV0ToV1(directory, password) + if err != nil { return } - - } -} - -func (ps *profileStore) Shutdown() { - if ps.queue != nil { - ps.queue.Shutdown() - } -} - -func (ps *profileStore) Delete() { - log.Debugf("Delete ProfileStore for %v\n", ps.profile.Onion) - - for _, ss := range ps.streamStores { - ss.Delete() - } - - ps.fs.Delete() - - err := os.RemoveAll(ps.directory) - if err != nil { - log.Errorf("ProfileStore Delete error on RemoveAll on %v was %v\n", ps.directory, err) + //version = 1 } } diff --git a/storage/profile_store_test.go b/storage/profile_store_test.go index 64afb27..afb4658 100644 --- a/storage/profile_store_test.go +++ b/storage/profile_store_test.go @@ -4,83 +4,35 @@ package storage import ( "cwtch.im/cwtch/event" + "cwtch.im/cwtch/storage/v0" "fmt" - "log" + "git.openprivacy.ca/openprivacy/libricochet-go/log" "os" "testing" "time" ) +const testingDir = "./testing" +const filenameBase = "testStream" +const password = "asdfqwer" +const line1 = "Hello from storage!" const testProfileName = "Alice" const testKey = "key" const testVal = "value" const testInitialMessage = "howdy" const testMessage = "Hello from storage" -func TestProfileStoreWriteRead(t *testing.T) { - log.Println("profile store test!") - os.RemoveAll(testingDir) - eventBus := event.NewEventManager() - profile := NewProfile(testProfileName) - ps1 := NewProfileWriterStore(eventBus, testingDir, password, profile) - - eventBus.Publish(event.NewEvent(event.SetAttribute, map[event.Field]string{event.Key: testKey, event.Data: testVal})) - time.Sleep(1 * time.Second) - - groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") - if err != nil { - t.Errorf("Creating group: %v\n", err) - } - if err != nil { - t.Errorf("Creating group invite: %v\n", err) - } - - eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)})) - time.Sleep(1 * time.Second) - - eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{ - event.GroupID: groupid, - event.TimestampSent: time.Now().Format(time.RFC3339Nano), - event.TimestampReceived: time.Now().Format(time.RFC3339Nano), - event.RemotePeer: ps1.GetProfileCopy(true).Onion, - event.Data: testMessage, - })) - time.Sleep(1 * time.Second) - - ps1.Shutdown() - - ps2 := NewProfileWriterStore(eventBus, testingDir, password, nil) - err = ps2.Load() - if err != nil { - t.Errorf("Error createing profileStore: %v\n", err) - } - - profile = ps2.GetProfileCopy(true) - if profile.Name != testProfileName { - t.Errorf("Profile name from loaded profile incorrect. Expected: '%v' Actual: '%v'\n", testProfileName, profile.Name) - } - - v, _ := profile.GetAttribute(testKey) - if v != testVal { - t.Errorf("Profile attribute '%v' inccorect. Expected: '%v' Actual: '%v'\n", testKey, testVal, v) - } - - group2 := ps2.GetProfileCopy(true).Groups[groupid] - if group2 == nil { - t.Errorf("Group not loaded\n") - } - -} - -func TestProfileStoreChangePassword(t *testing.T) { +func TestProfileStoreUpgradeV0toV1(t *testing.T) { + log.SetLevel(log.LevelDebug) os.RemoveAll(testingDir) eventBus := event.NewEventManager() queue := event.NewQueue() eventBus.Subscribe(event.ChangePasswordSuccess, queue) + fmt.Println("Creating and initializing v0 profile and store...") profile := NewProfile(testProfileName) - ps1 := NewProfileWriterStore(eventBus, testingDir, password, profile) + ps1 := v0.NewProfileWriterStore(eventBus, testingDir, password, profile) groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") if err != nil { @@ -90,50 +42,19 @@ func TestProfileStoreChangePassword(t *testing.T) { t.Errorf("Creating group invite: %v\n", err) } - eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)})) - time.Sleep(1 * time.Second) + ps1.AddGroup(invite, profile.Onion) fmt.Println("Sending 200 messages...") for i := 0; i < 200; i++ { - eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{ - event.GroupID: groupid, - event.TimestampSent: time.Now().Format(time.RFC3339Nano), - event.TimestampReceived: time.Now().Format(time.RFC3339Nano), - event.RemotePeer: ps1.GetProfileCopy(true).Onion, - event.Data: testMessage, - })) + ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), profile.Onion, testMessage) } - newPass := "qwerty123" - - fmt.Println("Sending Change Passwords event...") - eventBus.Publish(event.NewEventList(event.ChangePassword, event.Password, password, event.NewPassword, newPass)) - - ev := queue.Next() - if ev.EventType != event.ChangePasswordSuccess { - t.Errorf("Unexpected event response detected %v\n", ev.EventType) - return - } - - fmt.Println("Sending 10 more messages...") - for i := 0; i < 10; i++ { - eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{ - event.GroupID: groupid, - event.TimestampSent: time.Now().Format(time.RFC3339Nano), - event.TimestampReceived: time.Now().Format(time.RFC3339Nano), - event.RemotePeer: ps1.GetProfileCopy(true).Onion, - event.Data: testMessage, - })) - } - time.Sleep(3 * time.Second) - - fmt.Println("Shutdown profile store...") + fmt.Println("Shutdown v0 profile store...") ps1.Shutdown() - fmt.Println("New Profile store...") - ps2 := NewProfileWriterStore(eventBus, testingDir, newPass, nil) - err = ps2.Load() + fmt.Println("New v1 Profile store...") + ps2, err := LoadProfileWriterStore(eventBus, testingDir, password) if err != nil { t.Errorf("Error createing new profileStore with new password: %v\n", err) return @@ -146,7 +67,7 @@ func TestProfileStoreChangePassword(t *testing.T) { return } - if len(profile2.Groups[groupid].Timeline.Messages) != 210 { - t.Errorf("Failed to load group's 210 messages, instead got %v\n", len(profile2.Groups[groupid].Timeline.Messages)) + if len(profile2.Groups[groupid].Timeline.Messages) != 200 { + t.Errorf("Failed to load group's 200 messages, instead got %v\n", len(profile2.Groups[groupid].Timeline.Messages)) } } diff --git a/storage/file_enc.go b/storage/v0/file_enc.go similarity index 99% rename from storage/file_enc.go rename to storage/v0/file_enc.go index 2cad40a..7a12896 100644 --- a/storage/file_enc.go +++ b/storage/v0/file_enc.go @@ -1,4 +1,4 @@ -package storage +package v0 import ( "crypto/rand" diff --git a/storage/file_store.go b/storage/v0/file_store.go similarity index 75% rename from storage/file_store.go rename to storage/v0/file_store.go index ebd6c23..06b6ee1 100644 --- a/storage/file_store.go +++ b/storage/v0/file_store.go @@ -1,9 +1,7 @@ -package storage +package v0 import ( - "git.openprivacy.ca/openprivacy/libricochet-go/log" "io/ioutil" - "os" "path" ) @@ -16,10 +14,8 @@ type fileStore struct { // FileStore is a primitive around storing encrypted files type FileStore interface { - Write([]byte) error Read() ([]byte, error) - Delete() - ChangePassword(newpass string) + Write(data []byte) error } // NewFileStore instantiates a fileStore given a filename and a password @@ -31,6 +27,10 @@ func NewFileStore(directory string, filename string, password string) FileStore return filestore } +func (fps *fileStore) Read() ([]byte, error) { + return readEncryptedFile(fps.directory, fps.filename, fps.password) +} + // write serializes a cwtchPeer to a file func (fps *fileStore) Write(data []byte) error { key, salt, _ := createKey(fps.password) @@ -44,18 +44,3 @@ func (fps *fileStore) Write(data []byte) error { err = ioutil.WriteFile(path.Join(fps.directory, fps.filename), encryptedbytes, 0600) return err } - -func (fps *fileStore) Read() ([]byte, error) { - return readEncryptedFile(fps.directory, fps.filename, fps.password) -} - -func (fps *fileStore) Delete() { - err := os.Remove(path.Join(fps.directory, fps.filename)) - if err != nil { - log.Errorf("Deleting file %v\n", err) - } -} - -func (fps *fileStore) ChangePassword(newpass string) { - fps.password = newpass -} diff --git a/storage/v0/profile_store.go b/storage/v0/profile_store.go new file mode 100644 index 0000000..e7bb165 --- /dev/null +++ b/storage/v0/profile_store.go @@ -0,0 +1,120 @@ +package v0 + +import ( + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/model" + "encoding/json" + "fmt" + "os" + "time" +) + +const groupIDLen = 32 +const peerIDLen = 56 +const profileFilename = "profile" + +// ProfileStoreV0 is a legacy profile store used now for upgrading legacy profile stores to newer versions +type ProfileStoreV0 struct { + fs FileStore + streamStores map[string]StreamStore // map [groupId|onion] StreamStore + directory string + password string + profile *model.Profile +} + +// NewProfileWriterStore returns a profile store backed by a filestore listening for events and saving them +// directory should be $appDir/profiles/$rand +func NewProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) *ProfileStoreV0 { + os.Mkdir(directory, 0700) + ps := &ProfileStoreV0{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, streamStores: map[string]StreamStore{}} + if profile != nil { + ps.save() + } + + return ps +} + +// ReadProfile reads a profile from storqage and returns the profile +// directory should be $appDir/profiles/$rand +func ReadProfile(directory, password string) (*model.Profile, error) { + os.Mkdir(directory, 0700) + ps := &ProfileStoreV0{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: nil, streamStores: map[string]StreamStore{}} + + err := ps.Load() + if err != nil { + return nil, err + } + + profile := ps.getProfileCopy(true) + + return profile, nil +} + +/********************************************************************************************/ + +// AddGroup For testing, adds a group to the profile (and startsa stream store) +func (ps *ProfileStoreV0) AddGroup(invite []byte, peer string) { + gid, err := ps.profile.ProcessInvite(string(invite), peer) + if err == nil { + ps.save() + group := ps.profile.Groups[gid] + ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password) + } +} + +// AddGroupMessage for testing, adds a group message +func (ps *ProfileStoreV0) AddGroupMessage(groupid string, timeSent, timeRecvied string, remotePeer, data string) { + received, _ := time.Parse(time.RFC3339Nano, timeRecvied) + sent, _ := time.Parse(time.RFC3339Nano, timeSent) + message := model.Message{Received: received, Timestamp: sent, Message: data, PeerID: remotePeer, Signature: []byte("signature"), PreviousMessageSig: []byte("PreviousSignature")} + ss, exists := ps.streamStores[groupid] + if exists { + ss.Write(message) + } else { + fmt.Println("ERROR") + } +} + +// GetNewPeerMessage is for AppService to call on Reload events, to reseed the AppClient with the loaded peers +func (ps *ProfileStoreV0) GetNewPeerMessage() *event.Event { + message := event.NewEventList(event.NewPeer, event.Identity, ps.profile.LocalID, event.Password, ps.password, event.Status, "running") + return &message +} + +// Load instantiates a cwtchPeer from the file store +func (ps *ProfileStoreV0) Load() error { + decrypted, err := ps.fs.Read() + if err != nil { + return err + } + cp := new(model.Profile) + err = json.Unmarshal(decrypted, &cp) + if err == nil { + ps.profile = cp + + for gid, group := range cp.Groups { + ss := NewStreamStore(ps.directory, group.LocalID, ps.password) + + cp.Groups[gid].Timeline.SetMessages(ss.Read()) + ps.streamStores[group.GroupID] = ss + } + } + + return err +} + +func (ps *ProfileStoreV0) getProfileCopy(timeline bool) *model.Profile { + return ps.profile.GetCopy(timeline) +} + +// Shutdown saves the storage system +func (ps *ProfileStoreV0) Shutdown() { + ps.save() +} + +/************* Writing *************/ + +func (ps *ProfileStoreV0) save() error { + bytes, _ := json.Marshal(ps.profile) + return ps.fs.Write(bytes) +} diff --git a/storage/v0/profile_store_test.go b/storage/v0/profile_store_test.go new file mode 100644 index 0000000..3d3bc37 --- /dev/null +++ b/storage/v0/profile_store_test.go @@ -0,0 +1,70 @@ +// Known race issue with event bus channel closure + +package v0 + +import ( + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/model" + "log" + "os" + "testing" + "time" +) + +const testProfileName = "Alice" +const testKey = "key" +const testVal = "value" +const testInitialMessage = "howdy" +const testMessage = "Hello from storage" + +// NewProfile creates a new profile for use in the profile store. +func NewProfile(name string) *model.Profile { + profile := model.GenerateNewProfile(name) + return profile +} + +func TestProfileStoreWriteRead(t *testing.T) { + log.Println("profile store test!") + os.RemoveAll(testingDir) + eventBus := event.NewEventManager() + profile := NewProfile(testProfileName) + ps1 := NewProfileWriterStore(eventBus, testingDir, password, profile) + + profile.SetAttribute(testKey, testVal) + + groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") + if err != nil { + t.Errorf("Creating group: %v\n", err) + } + if err != nil { + t.Errorf("Creating group invite: %v\n", err) + } + + ps1.AddGroup(invite, profile.Onion) + + ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), ps1.getProfileCopy(true).Onion, testMessage) + + ps1.Shutdown() + + ps2 := NewProfileWriterStore(eventBus, testingDir, password, nil) + err = ps2.Load() + if err != nil { + t.Errorf("Error createing ProfileStoreV0: %v\n", err) + } + + profile = ps2.getProfileCopy(true) + if profile.Name != testProfileName { + t.Errorf("Profile name from loaded profile incorrect. Expected: '%v' Actual: '%v'\n", testProfileName, profile.Name) + } + + v, _ := profile.GetAttribute(testKey) + if v != testVal { + t.Errorf("Profile attribute '%v' incorrect. Expected: '%v' Actual: '%v'\n", testKey, testVal, v) + } + + group2 := ps2.getProfileCopy(true).Groups[groupid] + if group2 == nil { + t.Errorf("Group not loaded\n") + } + +} diff --git a/storage/v0/stream_store.go b/storage/v0/stream_store.go new file mode 100644 index 0000000..5e2895d --- /dev/null +++ b/storage/v0/stream_store.go @@ -0,0 +1,145 @@ +package v0 + +import ( + "cwtch.im/cwtch/model" + "encoding/json" + "fmt" + "git.openprivacy.ca/openprivacy/libricochet-go/log" + "io/ioutil" + "os" + "path" + "sync" +) + +const ( + fileStorePartitions = 16 + bytesPerFile = 15 * 1024 +) + +// streamStore is a file-backed implementation of StreamStore using an in memory buffer of ~16KB and a rotating set of files +type streamStore struct { + password string + + storeDirectory string + filenameBase string + + lock sync.Mutex + + // Buffer is used just for current file to write to + messages []model.Message + bufferByteCount int +} + +// StreamStore provides a stream like interface to encrypted storage +type StreamStore interface { + Read() []model.Message + Write(m model.Message) +} + +// NewStreamStore returns an initialized StreamStore ready for reading and writing +func NewStreamStore(directory string, filenameBase string, password string) (store StreamStore) { + ss := &streamStore{storeDirectory: directory, filenameBase: filenameBase, password: password} + os.Mkdir(ss.storeDirectory, 0700) + + ss.initBuffer() + + return ss +} + +// Read returns all messages from the backing file (not the buffer, which is jsut for writing to the current file) +func (ss *streamStore) Read() (messages []model.Message) { + ss.lock.Lock() + defer ss.lock.Unlock() + + resp := []model.Message{} + + for i := fileStorePartitions - 1; i >= 0; i-- { + filename := fmt.Sprintf("%s.%d", ss.filenameBase, i) + + bytes, err := readEncryptedFile(ss.storeDirectory, filename, ss.password) + if err != nil { + continue + } + + msgs := []model.Message{} + json.Unmarshal([]byte(bytes), &msgs) + resp = append(resp, msgs...) + } + + // 2019.10.10 "Acknowledged" & "ReceivedByServer" are added to the struct, populate it as true for old ones without + for i := 0; i < len(resp) && (resp[i].Acknowledged == false && resp[i].ReceivedByServer == false); i++ { + resp[i].Acknowledged = true + resp[i].ReceivedByServer = true + } + + return resp +} + +// ****** Writing *******/ + +func (ss *streamStore) WriteN(messages []model.Message) { + ss.lock.Lock() + defer ss.lock.Unlock() + + for _, m := range messages { + ss.updateBuffer(m) + + if ss.bufferByteCount > bytesPerFile { + ss.updateFile() + log.Debugf("rotating log file") + ss.rotateFileStore() + ss.initBuffer() + } + } +} + +// Write adds a GroupMessage to the store +func (ss *streamStore) Write(m model.Message) { + ss.lock.Lock() + defer ss.lock.Unlock() + ss.updateBuffer(m) + ss.updateFile() + + if ss.bufferByteCount > bytesPerFile { + log.Debugf("rotating log file") + ss.rotateFileStore() + ss.initBuffer() + } +} + +func (ss *streamStore) initBuffer() { + ss.messages = []model.Message{} + ss.bufferByteCount = 0 +} + +func (ss *streamStore) updateBuffer(m model.Message) { + ss.messages = append(ss.messages, m) + ss.bufferByteCount += (model.MessageBaseSize * 1.5) + len(m.Message) +} + +func (ss *streamStore) updateFile() error { + msgs, err := json.Marshal(ss.messages) + if err != nil { + log.Errorf("Failed to marshal group messages %v\n", err) + } + + // ENCRYPT + key, salt, _ := createKey(ss.password) + encryptedMsgs, err := encryptFileData(msgs, key) + if err != nil { + log.Errorf("Failed to encrypt messages: %v\n", err) + return err + } + encryptedMsgs = append(salt[:], encryptedMsgs...) + + ioutil.WriteFile(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, 0)), encryptedMsgs, 0700) + return nil +} + +func (ss *streamStore) rotateFileStore() { + os.Remove(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, fileStorePartitions-1))) + + for i := fileStorePartitions - 2; i >= 0; i-- { + os.Rename(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i)), path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i+1))) + } +} diff --git a/storage/stream_store_test.go b/storage/v0/stream_store_test.go similarity index 98% rename from storage/stream_store_test.go rename to storage/v0/stream_store_test.go index 53c36a7..7632f36 100644 --- a/storage/stream_store_test.go +++ b/storage/v0/stream_store_test.go @@ -1,4 +1,4 @@ -package storage +package v0 import ( "cwtch.im/cwtch/model" diff --git a/storage/v1/file_enc.go b/storage/v1/file_enc.go new file mode 100644 index 0000000..592dfc1 --- /dev/null +++ b/storage/v1/file_enc.go @@ -0,0 +1,72 @@ +package v1 + +import ( + "crypto/rand" + "errors" + "git.openprivacy.ca/openprivacy/libricochet-go/log" + "golang.org/x/crypto/nacl/secretbox" + "golang.org/x/crypto/pbkdf2" + "golang.org/x/crypto/sha3" + "io" + "io/ioutil" + "path" +) + +// createKeySalt derives a key from a password: returns key, salt, err +func createKeySalt(password string) ([32]byte, [128]byte, error) { + var salt [128]byte + if _, err := io.ReadFull(rand.Reader, salt[:]); err != nil { + log.Errorf("Cannot read from random: %v\n", err) + return [32]byte{}, salt, err + } + dk := pbkdf2.Key([]byte(password), salt[:], 4096, 32, sha3.New512) + + var dkr [32]byte + copy(dkr[:], dk) + return dkr, salt, nil +} + +func createKey(password string, salt []byte) [32]byte { + dk := pbkdf2.Key([]byte(password), salt, 4096, 32, sha3.New512) + + var dkr [32]byte + copy(dkr[:], dk) + return dkr +} + +//encryptFileData encrypts the cwtchPeer via the specified key. +func encryptFileData(data []byte, key [32]byte) ([]byte, error) { + var nonce [24]byte + + if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil { + log.Errorf("Cannot read from random: %v\n", err) + return nil, err + } + + encrypted := secretbox.Seal(nonce[:], data, &nonce, &key) + return encrypted, nil +} + +//decryptFile decrypts the passed ciphertext into a cwtchPeer via the specified key. +func decryptFile(ciphertext []byte, key [32]byte) ([]byte, error) { + var decryptNonce [24]byte + copy(decryptNonce[:], ciphertext[:24]) + decrypted, ok := secretbox.Open(nil, ciphertext[24:], &decryptNonce, &key) + if ok { + return decrypted, nil + } + return nil, errors.New("Failed to decrypt") +} + +// load instantiates a cwtchPeer from the file store +func readEncryptedFile(directory, filename string, key [32]byte) ([]byte, error) { + encryptedbytes, err := ioutil.ReadFile(path.Join(directory, filename)) + if err == nil { + data, err := decryptFile(encryptedbytes, key) + if err == nil { + return data, nil + } + return nil, err + } + return nil, err +} diff --git a/storage/v1/file_store.go b/storage/v1/file_store.go new file mode 100644 index 0000000..2c3e28d --- /dev/null +++ b/storage/v1/file_store.go @@ -0,0 +1,58 @@ +package v1 + +import ( + "git.openprivacy.ca/openprivacy/libricochet-go/log" + "io/ioutil" + "os" + "path" +) + +// fileStore stores a cwtchPeer in an encrypted file +type fileStore struct { + directory string + filename string + key [32]byte +} + +// FileStore is a primitive around storing encrypted files +type FileStore interface { + Write([]byte) error + Read() ([]byte, error) + Delete() + ChangeKey(newkey [32]byte) +} + +// NewFileStore instantiates a fileStore given a filename and a password +func NewFileStore(directory string, filename string, key [32]byte) FileStore { + filestore := new(fileStore) + filestore.key = key + filestore.filename = filename + filestore.directory = directory + return filestore +} + +// write serializes a cwtchPeer to a file +func (fps *fileStore) Write(data []byte) error { + encryptedbytes, err := encryptFileData(data, fps.key) + if err != nil { + return err + } + + err = ioutil.WriteFile(path.Join(fps.directory, fps.filename), encryptedbytes, 0600) + return err +} + +func (fps *fileStore) Read() ([]byte, error) { + return readEncryptedFile(fps.directory, fps.filename, fps.key) +} + +func (fps *fileStore) Delete() { + err := os.Remove(path.Join(fps.directory, fps.filename)) + if err != nil { + log.Errorf("Deleting file %v\n", err) + } +} + +func (fps *fileStore) ChangeKey(newkey [32]byte) { + fps.key = newkey +} diff --git a/storage/v1/profile_store.go b/storage/v1/profile_store.go new file mode 100644 index 0000000..1d9006c --- /dev/null +++ b/storage/v1/profile_store.go @@ -0,0 +1,406 @@ +package v1 + +import ( + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/model" + "encoding/json" + "git.openprivacy.ca/openprivacy/libricochet-go/log" + "io/ioutil" + "os" + "path" + "time" +) + +const groupIDLen = 32 +const peerIDLen = 56 +const profileFilename = "profile" +const version = "1" +const versionFile = "VERSION" +const saltFile = "SALT" + +//ProfileStoreV1 storage for profiles and message streams that uses in memory key and fs stored salt instead of in memory password +type ProfileStoreV1 struct { + fs FileStore + streamStores map[string]StreamStore // map [groupId|onion] StreamStore + directory string + profile *model.Profile + key [32]byte + salt [128]byte + eventManager event.Manager + queue event.Queue + writer bool +} + +func initV1Directory(directory, password string) ([32]byte, [128]byte, error) { + key, salt, err := createKeySalt(password) + if err != nil { + log.Errorf("Could not create key for profile store from password: %v\n", err) + return [32]byte{}, [128]byte{}, err + } + + ioutil.WriteFile(path.Join(directory, versionFile), []byte(version), 0600) + ioutil.WriteFile(path.Join(directory, saltFile), salt[:], 0600) + + return key, salt, nil +} + +// CreateProfileWriterStore creates a profile store backed by a filestore listening for events and saving them +// directory should be $appDir/profiles/$rand +func CreateProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) *ProfileStoreV1 { + os.Mkdir(directory, 0700) + + key, salt, err := initV1Directory(directory, password) + if err != nil { + return nil + } + + ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true} + ps.save() + + ps.initProfileWriterStore() + + return ps +} + +func (ps *ProfileStoreV1) initProfileWriterStore() { + ps.queue = event.NewQueue() + go ps.eventHandler() + + ps.eventManager.Subscribe(event.BlockPeer, ps.queue) + ps.eventManager.Subscribe(event.UnblockPeer, ps.queue) + ps.eventManager.Subscribe(event.PeerCreated, ps.queue) + ps.eventManager.Subscribe(event.GroupCreated, ps.queue) + ps.eventManager.Subscribe(event.SetProfileName, ps.queue) + ps.eventManager.Subscribe(event.SetAttribute, ps.queue) + ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue) + ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue) + ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue) + ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue) + ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue) + ps.eventManager.Subscribe(event.PeerStateChange, ps.queue) + ps.eventManager.Subscribe(event.ServerStateChange, ps.queue) + ps.eventManager.Subscribe(event.DeleteContact, ps.queue) + ps.eventManager.Subscribe(event.DeleteGroup, ps.queue) + ps.eventManager.Subscribe(event.ChangePassword, ps.queue) +} + +// LoadProfileWriterStore loads a profile store from filestore listening for events and saving them +// directory should be $appDir/profiles/$rand +func LoadProfileWriterStore(eventManager event.Manager, directory, password string) (*ProfileStoreV1, error) { + salt, err := ioutil.ReadFile(path.Join(directory, saltFile)) + if err != nil { + return nil, err + } + + key := createKey(password, salt) + + ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, directory: directory, profile: nil, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true} + copy(ps.salt[:], salt) + + ps.initProfileWriterStore() + + ps.load() + + return ps, nil +} + +// ReadProfile reads a profile from storqage and returns the profile +// directory should be $appDir/profiles/$rand +func ReadProfile(directory string, key [32]byte, salt [128]byte) (*model.Profile, error) { + os.Mkdir(directory, 0700) + ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: nil, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true} + + err := ps.load() + if err != nil { + return nil, err + } + + profile := ps.GetProfileCopy(true) + + return profile, nil +} + +// UpgradeV0Profile takes a profile (presumably from a V0 store) and creates and writes a V1 store +func UpgradeV0Profile(profile *model.Profile, directory, password string) error { + key, salt, err := initV1Directory(directory, password) + if err != nil { + return err + } + + ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: profile, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true} + ps.save() + + for gid, group := range ps.profile.Groups { + ss := NewStreamStore(ps.directory, group.LocalID, ps.key) + ss.WriteN(ps.profile.Groups[gid].Timeline.Messages) + } + + return nil +} + +// NewProfile creates a new profile for use in the profile store. +func NewProfile(name string) *model.Profile { + profile := model.GenerateNewProfile(name) + return profile +} + +// GetNewPeerMessage is for AppService to call on Reload events, to reseed the AppClient with the loaded peers +func (ps *ProfileStoreV1) GetNewPeerMessage() *event.Event { + message := event.NewEventList(event.NewPeer, event.Identity, ps.profile.LocalID, event.Key, string(ps.key[:]), event.Salt, string(ps.salt[:])) + return &message +} + +// GetStatusMessages creates an array of status messages for all peers and group servers from current information +func (ps *ProfileStoreV1) GetStatusMessages() []*event.Event { + messages := []*event.Event{} + for _, contact := range ps.profile.Contacts { + message := event.NewEvent(event.PeerStateChange, map[event.Field]string{ + event.RemotePeer: string(contact.Onion), + event.ConnectionState: contact.State, + }) + messages = append(messages, &message) + } + + doneServers := make(map[string]bool) + for _, group := range ps.profile.Groups { + if _, exists := doneServers[group.GroupServer]; !exists { + message := event.NewEvent(event.ServerStateChange, map[event.Field]string{ + event.GroupServer: string(group.GroupServer), + event.ConnectionState: group.State, + }) + messages = append(messages, &message) + doneServers[group.GroupServer] = true + } + } + + return messages +} + +// ChangePassword restores all data under a new password's encryption +func (ps *ProfileStoreV1) ChangePassword(oldpass, newpass, eventID string) { + oldkey := createKey(oldpass, ps.salt[:]) + + if oldkey != ps.key { + ps.eventManager.Publish(event.NewEventList(event.ChangePasswordError, event.Error, "Supplied current password does not match", event.EventID, eventID)) + return + } + + newkey := createKey(newpass, ps.salt[:]) + + newStreamStores := map[string]StreamStore{} + idToNewLocalID := map[string]string{} + + // Generate all new StreamStores with the new password and write all the old StreamStore data into these ones + for ssid, ss := range ps.streamStores { + // New ss with new pass and new localID + newlocalID := model.GenerateRandomID() + idToNewLocalID[ssid] = newlocalID + + newSS := NewStreamStore(ps.directory, newlocalID, newkey) + newStreamStores[ssid] = newSS + + // write whole store + messages := ss.Read() + newSS.WriteN(messages) + } + + // Switch over + oldStreamStores := ps.streamStores + ps.streamStores = newStreamStores + for ssid, newLocalID := range idToNewLocalID { + if len(ssid) == groupIDLen { + ps.profile.Groups[ssid].LocalID = newLocalID + } else { + ps.profile.Contacts[ssid].LocalID = newLocalID + } + } + + ps.key = newkey + ps.fs.ChangeKey(newkey) + ps.save() + + // Clean up + for _, oldss := range oldStreamStores { + oldss.Delete() + } + + ps.eventManager.Publish(event.NewEventList(event.ChangePasswordSuccess, event.EventID, eventID)) + return +} + +func (ps *ProfileStoreV1) save() error { + if ps.writer { + bytes, _ := json.Marshal(ps.profile) + return ps.fs.Write(bytes) + } + return nil +} + +// load instantiates a cwtchPeer from the file store +func (ps *ProfileStoreV1) load() error { + decrypted, err := ps.fs.Read() + if err != nil { + return err + } + cp := new(model.Profile) + err = json.Unmarshal(decrypted, &cp) + if err == nil { + ps.profile = cp + + for gid, group := range cp.Groups { + ss := NewStreamStore(ps.directory, group.LocalID, ps.key) + + cp.Groups[gid].Timeline.SetMessages(ss.Read()) + ps.streamStores[group.GroupID] = ss + } + } + + return err +} + +// GetProfileCopy returns a copy of the stored profile +func (ps *ProfileStoreV1) GetProfileCopy(timeline bool) *model.Profile { + return ps.profile.GetCopy(timeline) +} + +func (ps *ProfileStoreV1) eventHandler() { + log.Debugln("eventHandler()!") + for { + ev := ps.queue.Next() + log.Debugf("eventHandler event %v %v\n", ev.EventType, ev.EventID) + + switch ev.EventType { + case event.BlockPeer: + contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer]) + if exists { + contact.Blocked = true + ps.save() + } + case event.UnblockPeer: + contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer]) + if exists { + contact.Blocked = false + ps.save() + } + case event.PeerCreated: + var pp *model.PublicProfile + json.Unmarshal([]byte(ev.Data[event.Data]), &pp) + ps.profile.AddContact(ev.Data[event.RemotePeer], pp) + // TODO: configure - allow peers to be configured to turn on limited storage + /*ss := NewStreamStore(ps.directory, pp.LocalID, ps.password) + pp.Timeline.SetMessages(ss.Read()) + ps.streamStores[pp.Onion] = ss + ps.save()*/ + case event.GroupCreated: + var group *model.Group + json.Unmarshal([]byte(ev.Data[event.Data]), &group) + ps.profile.AddGroup(group) + ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.key) + ps.save() + case event.SetProfileName: + ps.profile.Name = ev.Data[event.ProfileName] + ps.profile.SetAttribute("name", ev.Data[event.ProfileName]) + ps.save() + case event.SetAttribute: + ps.profile.SetAttribute(ev.Data[event.Key], ev.Data[event.Data]) + ps.save() + case event.SetPeerAttribute: + contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer]) + if exists { + contact.SetAttribute(ev.Data[event.Key], ev.Data[event.Data]) + ps.save() + } else { + log.Errorf("error setting attribute on peer %v peer does not exist", ev) + } + case event.SetGroupAttribute: + group := ps.profile.GetGroup(ev.Data[event.GroupID]) + if group != nil { + group.SetAttribute(ev.Data[event.Key], ev.Data[event.Data]) + ps.save() + } else { + log.Errorf("error setting attribute on group %v group does not exist", ev) + } + case event.AcceptGroupInvite: + err := ps.profile.AcceptInvite(ev.Data[event.GroupID]) + if err == nil { + ps.save() + } else { + log.Errorf("error accepting group invite") + } + case event.NewGroupInvite: + gid, err := ps.profile.ProcessInvite(ev.Data[event.GroupInvite], ev.Data[event.RemotePeer]) + log.Errorf("gid: %v err:%v\n", gid, err) + if err == nil { + ps.save() + group := ps.profile.Groups[gid] + ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.key) + } else { + log.Errorf("error storing new group invite: %v (%v)", err, ev) + } + case event.NewMessageFromGroup: + groupid := ev.Data[event.GroupID] + received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) + sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent]) + message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: []byte(ev.Data[event.Signature]), PreviousMessageSig: []byte(ev.Data[event.PreviousSignature])} + ss, exists := ps.streamStores[groupid] + if exists { + ss.Write(message) + } else { + log.Errorf("error storing new group message: %v stream store does not exist", ev) + } + case event.PeerStateChange: + if _, exists := ps.profile.Contacts[ev.Data[event.RemotePeer]]; exists { + ps.profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState] + } + case event.ServerStateChange: + for _, group := range ps.profile.Groups { + if group.GroupServer == ev.Data[event.GroupServer] { + group.State = ev.Data[event.ConnectionState] + } + } + case event.DeleteContact: + onion := ev.Data[event.RemotePeer] + ps.profile.DeleteContact(onion) + ps.save() + case event.DeleteGroup: + groupID := ev.Data[event.GroupID] + ps.profile.DeleteGroup(groupID) + ps.save() + ss, exists := ps.streamStores[groupID] + if exists { + ss.Delete() + delete(ps.streamStores, groupID) + } + case event.ChangePassword: + oldpass := ev.Data[event.Password] + newpass := ev.Data[event.NewPassword] + ps.ChangePassword(oldpass, newpass, ev.EventID) + default: + return + } + + } +} + +// Shutdown shuts down the queue / thread +func (ps *ProfileStoreV1) Shutdown() { + if ps.queue != nil { + ps.queue.Shutdown() + } +} + +// Delete removes all stored files for this stored profile +func (ps *ProfileStoreV1) Delete() { + log.Debugf("Delete ProfileStore for %v\n", ps.profile.Onion) + + for _, ss := range ps.streamStores { + ss.Delete() + } + + ps.fs.Delete() + + err := os.RemoveAll(ps.directory) + if err != nil { + log.Errorf("ProfileStore Delete error on RemoveAll on %v was %v\n", ps.directory, err) + } +} diff --git a/storage/v1/profile_store_test.go b/storage/v1/profile_store_test.go new file mode 100644 index 0000000..7b0175a --- /dev/null +++ b/storage/v1/profile_store_test.go @@ -0,0 +1,150 @@ +// Known race issue with event bus channel closure + +package v1 + +import ( + "cwtch.im/cwtch/event" + "fmt" + "log" + "os" + "testing" + "time" +) + +const testProfileName = "Alice" +const testKey = "key" +const testVal = "value" +const testInitialMessage = "howdy" +const testMessage = "Hello from storage" + +func TestProfileStoreWriteRead(t *testing.T) { + log.Println("profile store test!") + os.RemoveAll(testingDir) + eventBus := event.NewEventManager() + profile := NewProfile(testProfileName) + ps1 := CreateProfileWriterStore(eventBus, testingDir, password, profile) + + eventBus.Publish(event.NewEvent(event.SetAttribute, map[event.Field]string{event.Key: testKey, event.Data: testVal})) + time.Sleep(1 * time.Second) + + groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") + if err != nil { + t.Errorf("Creating group: %v\n", err) + } + if err != nil { + t.Errorf("Creating group invite: %v\n", err) + } + + eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)})) + time.Sleep(1 * time.Second) + + eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{ + event.GroupID: groupid, + event.TimestampSent: time.Now().Format(time.RFC3339Nano), + event.TimestampReceived: time.Now().Format(time.RFC3339Nano), + event.RemotePeer: ps1.GetProfileCopy(true).Onion, + event.Data: testMessage, + })) + time.Sleep(1 * time.Second) + + ps1.Shutdown() + + ps2, err := LoadProfileWriterStore(eventBus, testingDir, password) + if err != nil { + t.Errorf("Error createing ProfileStoreV1: %v\n", err) + } + + profile = ps2.GetProfileCopy(true) + if profile.Name != testProfileName { + t.Errorf("Profile name from loaded profile incorrect. Expected: '%v' Actual: '%v'\n", testProfileName, profile.Name) + } + + v, _ := profile.GetAttribute(testKey) + if v != testVal { + t.Errorf("Profile attribute '%v' inccorect. Expected: '%v' Actual: '%v'\n", testKey, testVal, v) + } + + group2 := ps2.GetProfileCopy(true).Groups[groupid] + if group2 == nil { + t.Errorf("Group not loaded\n") + } + +} + +func TestProfileStoreChangePassword(t *testing.T) { + os.RemoveAll(testingDir) + eventBus := event.NewEventManager() + + queue := event.NewQueue() + eventBus.Subscribe(event.ChangePasswordSuccess, queue) + + profile := NewProfile(testProfileName) + ps1 := CreateProfileWriterStore(eventBus, testingDir, password, profile) + + groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") + if err != nil { + t.Errorf("Creating group: %v\n", err) + } + if err != nil { + t.Errorf("Creating group invite: %v\n", err) + } + + eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)})) + time.Sleep(1 * time.Second) + + fmt.Println("Sending 200 messages...") + + for i := 0; i < 200; i++ { + eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{ + event.GroupID: groupid, + event.TimestampSent: time.Now().Format(time.RFC3339Nano), + event.TimestampReceived: time.Now().Format(time.RFC3339Nano), + event.RemotePeer: profile.Onion, + event.Data: testMessage, + })) + } + + newPass := "qwerty123" + + fmt.Println("Sending Change Passwords event...") + eventBus.Publish(event.NewEventList(event.ChangePassword, event.Password, password, event.NewPassword, newPass)) + + ev := queue.Next() + if ev.EventType != event.ChangePasswordSuccess { + t.Errorf("Unexpected event response detected %v\n", ev.EventType) + return + } + + fmt.Println("Sending 10 more messages...") + for i := 0; i < 10; i++ { + eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{ + event.GroupID: groupid, + event.TimestampSent: time.Now().Format(time.RFC3339Nano), + event.TimestampReceived: time.Now().Format(time.RFC3339Nano), + event.RemotePeer: profile.Onion, + event.Data: testMessage, + })) + } + time.Sleep(3 * time.Second) + + fmt.Println("Shutdown profile store...") + ps1.Shutdown() + + fmt.Println("New Profile store...") + ps2, err := LoadProfileWriterStore(eventBus, testingDir, newPass) + if err != nil { + t.Errorf("Error createing new ProfileStoreV1 with new password: %v\n", err) + return + } + + profile2 := ps2.GetProfileCopy(true) + + if profile2.Groups[groupid] == nil { + t.Errorf("Failed to load group %v\n", groupid) + return + } + + if len(profile2.Groups[groupid].Timeline.Messages) != 210 { + t.Errorf("Failed to load group's 210 messages, instead got %v\n", len(profile2.Groups[groupid].Timeline.Messages)) + } +} diff --git a/storage/stream_store.go b/storage/v1/stream_store.go similarity index 81% rename from storage/stream_store.go rename to storage/v1/stream_store.go index 4f55605..96a0f98 100644 --- a/storage/stream_store.go +++ b/storage/v1/stream_store.go @@ -1,4 +1,4 @@ -package storage +package v1 import ( "cwtch.im/cwtch/model" @@ -18,7 +18,7 @@ const ( // streamStore is a file-backed implementation of StreamStore using an in memory buffer of ~16KB and a rotating set of files type streamStore struct { - password string + key [32]byte storeDirectory string filenameBase string @@ -36,12 +36,11 @@ type StreamStore interface { WriteN(messages []model.Message) Read() []model.Message Delete() - ChangePassword(newpass string) } // NewStreamStore returns an initialized StreamStore ready for reading and writing -func NewStreamStore(directory string, filenameBase string, password string) (store StreamStore) { - ss := &streamStore{storeDirectory: directory, filenameBase: filenameBase, password: password} +func NewStreamStore(directory string, filenameBase string, key [32]byte) (store StreamStore) { + ss := &streamStore{storeDirectory: directory, filenameBase: filenameBase, key: key} os.Mkdir(ss.storeDirectory, 0700) ss.initBuffer() @@ -58,7 +57,7 @@ func (ss *streamStore) initBuffer() { func (ss *streamStore) initBufferFromStorage() error { filename := fmt.Sprintf("%s.%d", ss.filenameBase, 0) - bytes, _ := readEncryptedFile(ss.storeDirectory, filename, ss.password) + bytes, _ := readEncryptedFile(ss.storeDirectory, filename, ss.key) msgs := []model.Message{} err := json.Unmarshal([]byte(bytes), &msgs) @@ -84,13 +83,11 @@ func (ss *streamStore) updateFile() error { } // ENCRYPT - key, salt, _ := createKey(ss.password) - encryptedMsgs, err := encryptFileData(msgs, key) + encryptedMsgs, err := encryptFileData(msgs, ss.key) if err != nil { log.Errorf("Failed to encrypt messages: %v\n", err) return err } - encryptedMsgs = append(salt[:], encryptedMsgs...) ioutil.WriteFile(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, 0)), encryptedMsgs, 0700) return nil @@ -121,7 +118,7 @@ func (ss *streamStore) Read() (messages []model.Message) { for i := fileStorePartitions - 1; i >= 0; i-- { filename := fmt.Sprintf("%s.%d", ss.filenameBase, i) - bytes, err := readEncryptedFile(ss.storeDirectory, filename, ss.password) + bytes, err := readEncryptedFile(ss.storeDirectory, filename, ss.key) if err != nil { continue } @@ -131,12 +128,6 @@ func (ss *streamStore) Read() (messages []model.Message) { resp = append(resp, msgs...) } - // 2019.10.10 "Acknowledged" & "ReceivedByServer" are added to the struct, populate it as true for old ones without - for i := 0; i < len(resp) && (resp[i].Acknowledged == false && resp[i].ReceivedByServer == false); i++ { - resp[i].Acknowledged = true - resp[i].ReceivedByServer = true - } - return resp } @@ -158,16 +149,17 @@ func (ss *streamStore) WriteN(messages []model.Message) { ss.lock.Lock() defer ss.lock.Unlock() + log.Infof("WriteN %v messages\n", len(messages)) + i := 0 for _, m := range messages { ss.updateBuffer(m) + i++ if ss.bufferByteCount > bytesPerFile { ss.updateFile() - log.Debugf("rotating log file") ss.rotateFileStore() ss.initBuffer() } } + ss.updateFile() } - -func (ss *streamStore) ChangePassword(newpass string) {} diff --git a/storage/v1/stream_store_test.go b/storage/v1/stream_store_test.go new file mode 100644 index 0000000..b2f73b9 --- /dev/null +++ b/storage/v1/stream_store_test.go @@ -0,0 +1,52 @@ +package v1 + +import ( + "cwtch.im/cwtch/model" + "os" + "testing" +) + +const testingDir = "./testing" +const filenameBase = "testStream" +const password = "asdfqwer" +const line1 = "Hello from storage!" + +func TestStreamStoreWriteRead(t *testing.T) { + os.Remove(".test.json") + os.RemoveAll(testingDir) + os.Mkdir(testingDir, 0777) + key, _, _ := createKeySalt(password) + ss1 := NewStreamStore(testingDir, filenameBase, key) + m := model.Message{Message: line1} + ss1.Write(m) + + ss2 := NewStreamStore(testingDir, filenameBase, key) + messages := ss2.Read() + if len(messages) != 1 { + t.Errorf("Read messages has wrong length. Expected: 1 Actual: %d\n", len(messages)) + } + if messages[0].Message != line1 { + t.Errorf("Read message has wrong content. Expected: '%v' Actual: '%v'\n", line1, messages[0].Message) + } +} + +func TestStreamStoreWriteReadRotate(t *testing.T) { + os.Remove(".test.json") + os.RemoveAll(testingDir) + os.Mkdir(testingDir, 0777) + key, _, _ := createKeySalt(password) + ss1 := NewStreamStore(testingDir, filenameBase, key) + m := model.Message{Message: line1} + for i := 0; i < 400; i++ { + ss1.Write(m) + } + + ss2 := NewStreamStore(testingDir, filenameBase, key) + messages := ss2.Read() + if len(messages) != 400 { + t.Errorf("Read messages has wrong length. Expected: 400 Actual: %d\n", len(messages)) + } + if messages[0].Message != line1 { + t.Errorf("Read message has wrong content. Expected: '%v' Actual: '%v'\n", line1, messages[0].Message) + } +} diff --git a/testing/tests.sh b/testing/tests.sh index a5d571b..f7e704a 100755 --- a/testing/tests.sh +++ b/testing/tests.sh @@ -5,6 +5,8 @@ pwd GORACE="haltonerror=1" go test -race ${1} -coverprofile=model.cover.out -v ./model go test -race ${1} -coverprofile=event.cover.out -v ./event +go test -race ${1} -coverprofile=storage.v0.cover.out -v ./storage/v0 +go test -race ${1} -coverprofile=storage.v1.cover.out -v ./storage/v1 go test -race ${1} -coverprofile=storage.cover.out -v ./storage go test -race ${1} -coverprofile=peer.connections.cover.out -v ./protocol/connections go test -race ${1} -coverprofile=protocol.spam.cover.out -v ./protocol/connections/spam