From 1cc60bdfdd6fef74907eabf88d449c77f0605472 Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Tue, 10 Dec 2019 15:45:43 -0800 Subject: [PATCH 1/3] add app delete profile and tag profile --- app/app.go | 49 ++++++++++++++++++++++++++++++++++++++-- app/appClient.go | 25 +++++++++++++++++++- app/appService.go | 35 ++++++++++++++++++++++++++-- event/common.go | 5 +++- storage/file_store.go | 11 ++++++++- storage/profile_store.go | 16 +++++++++++++ 6 files changed, 134 insertions(+), 7 deletions(-) diff --git a/app/app.go b/app/app.go index 4984770..7245529 100644 --- a/app/app.go +++ b/app/app.go @@ -18,6 +18,8 @@ import ( "sync" ) +const AttributeTag = "tag" + type applicationCore struct { eventBuses map[string]event.Manager @@ -39,6 +41,8 @@ type application struct { type Application interface { LoadProfiles(password string) CreatePeer(name string, password string) + CreateTaggedPeer(name string, password string, tag string) + DeletePeer(onion string) AddPeerPlugin(onion string, pluginID plugins.PluginID) LaunchPeers() @@ -92,8 +96,15 @@ func (ac *applicationCore) CreatePeer(name string) (*model.Profile, error) { return profile, nil } -// CreatePeer creates a new Peer with the given name and required accessories (eventbus, storage, protocol engine) -func (app *application) CreatePeer(name string, password string) { +func (ac *applicationCore) DeletePeer(onion string) { + ac.mutex.Lock() + defer ac.mutex.Unlock() + + ac.eventBuses[onion].Shutdown() + delete(ac.eventBuses, onion) +} + +func (app *application) CreateTaggedPeer(name string, password string, tag string) { profile, err := app.applicationCore.CreatePeer(name) if err != nil { app.appBus.Publish(event.NewEventList(event.PeerError, event.Error, err.Error())) @@ -115,9 +126,43 @@ func (app *application) CreatePeer(name string, password string) { app.peers[profile.Onion] = peer app.engines[profile.Onion] = engine + if tag != "" { + peer.SetAttribute(AttributeTag, tag) + } + app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion})) } + +// CreatePeer creates a new Peer with the given name and required accessories (eventbus, storage, protocol engine) +func (app *application) CreatePeer(name string, password string) { + app.CreateTaggedPeer(name, password, "") +} + +func (app *application) DeletePeer(onion string) { + log.Infof("DeletePeer called on %v\n", onion) + app.mutex.Lock() + + app.appletPlugins.ShutdownPeer(onion) + app.plugins.Delete(onion) + + app.peers[onion].Shutdown() + delete(app.peers, onion) + + app.engines[onion].Shutdown() + delete(app.engines, onion) + + app.storage[onion].Shutdown() + app.storage[onion].Delete() + delete(app.storage, onion) + + app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion)) + app.mutex.Unlock() + + app.applicationCore.DeletePeer(onion) + log.Debugf("Delete peer for %v Done\n", onion) +} + func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) { app.AddPlugin(onion, pluginID, app.eventBuses[onion], app.acn) } diff --git a/app/appClient.go b/app/appClient.go index 1117afc..800ba0b 100644 --- a/app/appClient.go +++ b/app/appClient.go @@ -43,6 +43,9 @@ func (ac *applicationClient) handleEvent(ev *event.Event) { password := ev.Data[event.Password] reload := ev.Data[event.Status] == "running" ac.newPeer(localID, password, reload) + case event.DeletePeer: + onion := ev.Data[event.Identity] + ac.handleDeletedPeer(onion) case event.PeerError: ac.appBus.Publish(*ev) case event.AppError: @@ -90,11 +93,31 @@ func (ac *applicationClient) newPeer(localID, password string, reload bool) { // CreatePeer messages the service to create a new Peer with the given name func (ac *applicationClient) CreatePeer(name string, password string) { + ac.CreateTaggedPeer(name, password, "") +} + +func (ac *applicationClient) CreateTaggedPeer(name, password, tag string) { log.Infof("appClient CreatePeer %v\n", name) - message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.CreatePeer, map[event.Field]string{event.ProfileName: name, event.Password: password})} + message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.CreatePeer, map[event.Field]string{event.ProfileName: name, event.Password: password, event.Data: tag})} ac.bridge.Write(&message) } +// DeletePeer messages tehe service to delete a peer +func (ac *applicationClient) DeletePeer(onion string) { + message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.DeletePeer, map[event.Field]string{event.Identity: onion})} + ac.bridge.Write(&message) +} + +func (ac *applicationClient) handleDeletedPeer(onion string) { + ac.mutex.Lock() + ac.peers[onion].Shutdown() + delete(ac.peers, onion) + ac.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion)) + ac.mutex.Unlock() + + ac.applicationCore.DeletePeer(onion) +} + func (ac *applicationClient) AddPeerPlugin(onion string, pluginID plugins.PluginID) { message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.AddPeerPlugin, map[event.Field]string{event.Identity: onion, event.Data: strconv.Itoa(int(pluginID))})} ac.bridge.Write(&message) diff --git a/app/appService.go b/app/appService.go index 03539e8..576d46b 100644 --- a/app/appService.go +++ b/app/appService.go @@ -46,7 +46,14 @@ func (as *applicationService) handleEvent(ev *event.Event) { case event.CreatePeer: profileName := ev.Data[event.ProfileName] password := ev.Data[event.Password] - as.createPeer(profileName, password) + tag := ev.Data[event.Data] + as.createPeer(profileName, password, tag) + case event.DeletePeer: + onion := ev.Data[event.Identity] + as.deletePeer(onion) + + message := event.IPCMessage{Dest: DestApp, Message: *ev} + as.bridge.Write(&message) case event.AddPeerPlugin: onion := ev.Data[event.Identity] pluginID, _ := strconv.Atoi(ev.Data[event.Data]) @@ -79,7 +86,7 @@ func (as *applicationService) handleEvent(ev *event.Event) { } } -func (as *applicationService) createPeer(name, password string) { +func (as *applicationService) createPeer(name, password, tag string) { log.Infof("app Service create peer %v %v\n", name, password) profile, err := as.applicationCore.CreatePeer(name) as.eventBuses[profile.Onion] = event.IPCEventManagerFrom(as.bridge, profile.Onion, as.eventBuses[profile.Onion]) @@ -90,6 +97,10 @@ func (as *applicationService) createPeer(name, password string) { return } + if tag != "" { + profile.SetAttribute(AttributeTag, tag) + } + profileStore := storage.NewProfileWriterStore(as.eventBuses[profile.Onion], path.Join(as.directory, "profiles", profile.LocalID), password, profile) blockedPeers := profile.BlockedPeers() @@ -136,6 +147,26 @@ func (as *applicationService) getACNStatusHandler() func(int, string) { } } +func (as *applicationService) deletePeer(onion string) { + as.mutex.Lock() + + as.appletPlugins.ShutdownPeer(onion) + as.plugins.Delete(onion) + + + + as.engines[onion].Shutdown() + delete(as.engines, onion) + + as.storage[onion].Shutdown() + as.storage[onion].Delete() + delete(as.storage, onion) + + as.mutex.Unlock() + + as.applicationCore.DeletePeer(onion) +} + func (as *applicationService) ShutdownPeer(onion string) { as.engines[onion].Shutdown() delete(as.engines, onion) diff --git a/event/common.go b/event/common.go index 443062a..6d0a475 100644 --- a/event/common.go +++ b/event/common.go @@ -129,13 +129,16 @@ const ( /***** Application client / service messages *****/ - // ProfileName, Password + // ProfileName, Password, Data(tag) CreatePeer = Type("CreatePeer") // service -> client: Identity(localId), Password, [Status(new/default=blank || from reload='running')] // app -> Identity(onion) NewPeer = Type("NewPeer") + // Identity(onion) + DeletePeer = Type("DeletePeer") + // Identity(onion), Data(pluginID) AddPeerPlugin = Type("AddPeerPlugin") diff --git a/storage/file_store.go b/storage/file_store.go index 7398ca5..ee802c6 100644 --- a/storage/file_store.go +++ b/storage/file_store.go @@ -2,6 +2,8 @@ package storage import ( "io/ioutil" + "git.openprivacy.ca/openprivacy/libricochet-go/log" + "os" "path" ) @@ -16,6 +18,7 @@ type fileStore struct { type FileStore interface { Save([]byte) error Load() ([]byte, error) + Delete() } // NewFileStore instantiates a fileStore given a filename and a password @@ -39,9 +42,15 @@ func (fps *fileStore) Save(data []byte) error { encryptedbytes = append(salt[:], encryptedbytes...) err = ioutil.WriteFile(path.Join(fps.directory, fps.filename), encryptedbytes, 0600) return err - } func (fps *fileStore) Load() ([]byte, error) { return readEncryptedFile(fps.directory, fps.filename, fps.password) } + +func (fps *fileStore) Delete() { + err := os.Remove(path.Join(fps.directory, fps.filename)) + if err != nil { + log.Errorf("Deleting file %v\n", err) + } +} \ No newline at end of file diff --git a/storage/profile_store.go b/storage/profile_store.go index a7954f8..27288ba 100644 --- a/storage/profile_store.go +++ b/storage/profile_store.go @@ -26,6 +26,7 @@ type profileStore struct { type ProfileStore interface { Load() error Shutdown() + Delete() GetProfileCopy(timeline bool) *model.Profile GetNewPeerMessage() *event.Event GetStatusMessages() []*event.Event @@ -265,3 +266,18 @@ func (ps *profileStore) Shutdown() { ps.queue.Shutdown() } } + +func (ps *profileStore) Delete() { + log.Debugf("Delete ProfileStore for %v\n", ps.profile.Onion) + + for _, ss := range ps.streamStores { + ss.Delete() + } + + ps.fs.Delete() + + err := os.RemoveAll(ps.directory) + if err != nil { + log.Errorf("ProfileStore Delete error on RemoveAll on %v was %v\n", ps.directory, err) + } +} From 4ecc7c0f2b53c24aab4eac859d4a62c2b1a31ee6 Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Thu, 12 Dec 2019 12:21:14 -0800 Subject: [PATCH 2/3] Change password on a peer and it's storage --- app/app.go | 40 +++++++++-------- app/appClient.go | 5 +++ app/appService.go | 2 - app/cwtchutil/main.go | 2 +- event/common.go | 11 +++++ model/group.go | 2 +- model/profile.go | 7 +-- storage/file_store.go | 19 +++++--- storage/profile_store.go | 66 ++++++++++++++++++++++++++-- storage/profile_store_test.go | 82 +++++++++++++++++++++++++++++++++++ storage/stream_store.go | 22 +++++++++- storage/stream_store_test.go | 6 --- 12 files changed, 222 insertions(+), 42 deletions(-) diff --git a/app/app.go b/app/app.go index 7245529..d31bf11 100644 --- a/app/app.go +++ b/app/app.go @@ -18,13 +18,14 @@ import ( "sync" ) +// AttributeTag is a const name for a peer attribute that can be set at creation time, for example for versioning info const AttributeTag = "tag" type applicationCore struct { eventBuses map[string]event.Manager directory string - mutex sync.Mutex + acmutex sync.Mutex } type application struct { @@ -32,9 +33,10 @@ type application struct { appletPeers appletACN appletPlugins - storage map[string]storage.ProfileStore - engines map[string]connections.Engine - appBus event.Manager + storage map[string]storage.ProfileStore + engines map[string]connections.Engine + appBus event.Manager + appmutex sync.Mutex } // Application is a full cwtch peer application. It allows management, usage and storage of multiple peers @@ -44,6 +46,7 @@ type Application interface { CreateTaggedPeer(name string, password string, tag string) DeletePeer(onion string) AddPeerPlugin(onion string, pluginID plugins.PluginID) + ChangePeerPassword(onion, oldpass, newpass string) LaunchPeers() GetPrimaryBus() event.Manager @@ -82,8 +85,8 @@ func (ac *applicationCore) CreatePeer(name string) (*model.Profile, error) { profile := storage.NewProfile(name) - ac.mutex.Lock() - defer ac.mutex.Unlock() + ac.acmutex.Lock() + defer ac.acmutex.Unlock() _, exists := ac.eventBuses[profile.Onion] if exists { @@ -97,8 +100,8 @@ func (ac *applicationCore) CreatePeer(name string) (*model.Profile, error) { } func (ac *applicationCore) DeletePeer(onion string) { - ac.mutex.Lock() - defer ac.mutex.Unlock() + ac.acmutex.Lock() + defer ac.acmutex.Unlock() ac.eventBuses[onion].Shutdown() delete(ac.eventBuses, onion) @@ -133,7 +136,6 @@ func (app *application) CreateTaggedPeer(name string, password string, tag strin app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion})) } - // CreatePeer creates a new Peer with the given name and required accessories (eventbus, storage, protocol engine) func (app *application) CreatePeer(name string, password string) { app.CreateTaggedPeer(name, password, "") @@ -141,7 +143,8 @@ func (app *application) CreatePeer(name string, password string) { func (app *application) DeletePeer(onion string) { log.Infof("DeletePeer called on %v\n", onion) - app.mutex.Lock() + app.appmutex.Lock() + defer app.appmutex.Lock() app.appletPlugins.ShutdownPeer(onion) app.plugins.Delete(onion) @@ -157,12 +160,15 @@ func (app *application) DeletePeer(onion string) { delete(app.storage, onion) app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion)) - app.mutex.Unlock() app.applicationCore.DeletePeer(onion) log.Debugf("Delete peer for %v Done\n", onion) } +func (app *application) ChangePeerPassword(onion, oldpass, newpass string) { + app.eventBuses[onion].Publish(event.NewEventList(event.ChangePassword, event.Password, oldpass, event.NewPassword, newpass)) +} + func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) { app.AddPlugin(onion, pluginID, app.eventBuses[onion], app.acn) } @@ -192,9 +198,9 @@ func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProf continue } - ac.mutex.Lock() + ac.acmutex.Lock() ac.eventBuses[profile.Onion] = eventBus - ac.mutex.Unlock() + ac.acmutex.Unlock() loadProfileFn(profile, profileStore) } @@ -211,11 +217,11 @@ func (app *application) LoadProfiles(password string) { blockedPeers := profile.BlockedPeers() identity := primitives.InitializeIdentity(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey) engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, app.eventBuses[profile.Onion], profile.GetContacts(), blockedPeers) - app.mutex.Lock() + app.appmutex.Lock() app.peers[profile.Onion] = peer app.storage[profile.Onion] = profileStore app.engines[profile.Onion] = engine - app.mutex.Unlock() + app.appmutex.Unlock() app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion})) count++ }) @@ -255,8 +261,8 @@ func (app *application) QueryACNStatus() { // ShutdownPeer shuts down a peer and removes it from the app's management func (app *application) ShutdownPeer(onion string) { - app.mutex.Lock() - defer app.mutex.Unlock() + app.appmutex.Lock() + defer app.appmutex.Unlock() app.eventBuses[onion].Shutdown() delete(app.eventBuses, onion) app.peers[onion].Shutdown() diff --git a/app/appClient.go b/app/appClient.go index 800ba0b..0d76eab 100644 --- a/app/appClient.go +++ b/app/appClient.go @@ -108,6 +108,11 @@ func (ac *applicationClient) DeletePeer(onion string) { ac.bridge.Write(&message) } +func (ac *applicationClient) ChangePeerPassword(onion, oldpass, newpass string) { + message := event.IPCMessage{Dest: onion, Message: event.NewEventList(event.ChangePassword, event.Password, oldpass, event.NewPassword, newpass)} + ac.bridge.Write(&message) +} + func (ac *applicationClient) handleDeletedPeer(onion string) { ac.mutex.Lock() ac.peers[onion].Shutdown() diff --git a/app/appService.go b/app/appService.go index 576d46b..328756e 100644 --- a/app/appService.go +++ b/app/appService.go @@ -153,8 +153,6 @@ func (as *applicationService) deletePeer(onion string) { as.appletPlugins.ShutdownPeer(onion) as.plugins.Delete(onion) - - as.engines[onion].Shutdown() delete(as.engines, onion) diff --git a/app/cwtchutil/main.go b/app/cwtchutil/main.go index 39343a2..08a13df 100644 --- a/app/cwtchutil/main.go +++ b/app/cwtchutil/main.go @@ -131,7 +131,7 @@ func main() { profileStore, _ := storage.NewProfileStore(nil, os.Args[2], pw) - err = profileStore.Load() + err = profileStore.Read() if err != nil { log.Errorln(err) os.Exit(1) diff --git a/event/common.go b/event/common.go index 6d0a475..688b2cb 100644 --- a/event/common.go +++ b/event/common.go @@ -82,6 +82,15 @@ const ( // RemotePeer [eg ""] PeerCreated = Type("PeerCreated") + // Password, NewPassword + ChangePassword = Type("ChangePassword") + + // Error(err), EventID + ChangePasswordError = Type("ChangePasswordError") + + // EventID + ChangePasswordSuccess = Type("ChangePasswordSuccess") + // a group has been successfully added or newly created // attributes: // Data [serialized *model.Group] @@ -159,6 +168,7 @@ const ( Shutdown = Type("Shutdown") // Error(err) + // Error creating peer PeerError = Type("PeerError") // Error(err) @@ -200,6 +210,7 @@ const ( ProfileName = Field("ProfileName") Password = Field("Password") + NewPassword = Field("NewPassword") ConnectionState = Field("ConnectionState") diff --git a/model/group.go b/model/group.go index d2444a5..b0cd5aa 100644 --- a/model/group.go +++ b/model/group.go @@ -36,7 +36,7 @@ type Group struct { // NewGroup initializes a new group associated with a given CwtchServer func NewGroup(server string) (*Group, error) { group := new(Group) - group.LocalID = generateRandomID() + group.LocalID = GenerateRandomID() if utils.IsValidHostname(server) == false { return nil, errors.New("Server is not a valid v3 onion") diff --git a/model/profile.go b/model/profile.go index 8460c69..d631012 100644 --- a/model/profile.go +++ b/model/profile.go @@ -44,7 +44,8 @@ type Profile struct { // TODO: Should this be per server? const MaxGroupMessageLength = 1800 -func generateRandomID() string { +// GenerateRandomID generates a random 16 byte hex id code +func GenerateRandomID() string { randBytes := make([]byte, 16) rand.Read(randBytes) return filepath.Join(hex.EncodeToString(randBytes)) @@ -55,7 +56,7 @@ func (p *PublicProfile) init() { p.Attributes = make(map[string]string) } p.unacknowledgedMessages = make(map[string]Message) - p.LocalID = generateRandomID() + p.LocalID = GenerateRandomID() } // SetAttribute allows applications to store arbitrary configuration info at the profile level. @@ -371,7 +372,7 @@ func (p *Profile) ProcessInvite(invite string, peerHostname string) (string, err if err == nil { group := new(Group) group.GroupID = gci.GetGroupName() - group.LocalID = generateRandomID() + group.LocalID = GenerateRandomID() group.SignedGroupID = gci.GetSignedGroupId() copy(group.GroupKey[:], gci.GetGroupSharedKey()[:]) group.GroupServer = gci.GetServerHost() diff --git a/storage/file_store.go b/storage/file_store.go index ee802c6..ebd6c23 100644 --- a/storage/file_store.go +++ b/storage/file_store.go @@ -1,8 +1,8 @@ package storage import ( - "io/ioutil" "git.openprivacy.ca/openprivacy/libricochet-go/log" + "io/ioutil" "os" "path" ) @@ -16,9 +16,10 @@ type fileStore struct { // FileStore is a primitive around storing encrypted files type FileStore interface { - Save([]byte) error - Load() ([]byte, error) + Write([]byte) error + Read() ([]byte, error) Delete() + ChangePassword(newpass string) } // NewFileStore instantiates a fileStore given a filename and a password @@ -30,8 +31,8 @@ func NewFileStore(directory string, filename string, password string) FileStore return filestore } -// save serializes a cwtchPeer to a file -func (fps *fileStore) Save(data []byte) error { +// write serializes a cwtchPeer to a file +func (fps *fileStore) Write(data []byte) error { key, salt, _ := createKey(fps.password) encryptedbytes, err := encryptFileData(data, key) if err != nil { @@ -44,7 +45,7 @@ func (fps *fileStore) Save(data []byte) error { return err } -func (fps *fileStore) Load() ([]byte, error) { +func (fps *fileStore) Read() ([]byte, error) { return readEncryptedFile(fps.directory, fps.filename, fps.password) } @@ -53,4 +54,8 @@ func (fps *fileStore) Delete() { if err != nil { log.Errorf("Deleting file %v\n", err) } -} \ No newline at end of file +} + +func (fps *fileStore) ChangePassword(newpass string) { + fps.password = newpass +} diff --git a/storage/profile_store.go b/storage/profile_store.go index 27288ba..9fd371c 100644 --- a/storage/profile_store.go +++ b/storage/profile_store.go @@ -9,11 +9,13 @@ import ( "time" ) +const groupIDLen = 32 +const peerIDLen = 56 const profileFilename = "profile" type profileStore struct { fs FileStore - streamStores map[string]StreamStore + streamStores map[string]StreamStore // map [groupId|onion] StreamStore directory string password string profile *model.Profile @@ -59,6 +61,7 @@ func NewProfileWriterStore(eventManager event.Manager, directory, password strin ps.eventManager.Subscribe(event.ServerStateChange, ps.queue) ps.eventManager.Subscribe(event.DeleteContact, ps.queue) ps.eventManager.Subscribe(event.DeleteGroup, ps.queue) + ps.eventManager.Subscribe(event.ChangePassword, ps.queue) return ps } @@ -116,17 +119,64 @@ func (ps *profileStore) GetStatusMessages() []*event.Event { return messages } +func (ps *profileStore) ChangePassword(oldpass, newpass, eventID string) { + if oldpass != ps.password { + ps.eventManager.Publish(event.NewEventList(event.ChangePasswordError, event.Error, "Supplied current password does not match", event.EventID, eventID)) + return + } + + newStreamStores := map[string]StreamStore{} + idToNewLocalID := map[string]string{} + + // Generate all new StreamStores with the new password and write all the old StreamStore data into these ones + for ssid, ss := range ps.streamStores { + // New ss with new pass and new localID + newlocalID := model.GenerateRandomID() + idToNewLocalID[ssid] = newlocalID + + newSS := NewStreamStore(ps.directory, newlocalID, newpass) + newStreamStores[ssid] = newSS + + // write whole store + messages := ss.Read() + newSS.WriteN(messages) + } + + // Switch over + oldStreamStores := ps.streamStores + ps.streamStores = newStreamStores + for ssid, newLocalID := range idToNewLocalID { + if len(ssid) == groupIDLen { + ps.profile.Groups[ssid].LocalID = newLocalID + } else { + ps.profile.Contacts[ssid].LocalID = newLocalID + } + } + + ps.password = newpass + ps.fs.ChangePassword(newpass) + ps.save() + + // Clean up + for _, oldss := range oldStreamStores { + oldss.Delete() + } + + ps.eventManager.Publish(event.NewEventList(event.ChangePasswordSuccess, event.EventID, eventID)) + return +} + func (ps *profileStore) save() error { if ps.writer { bytes, _ := json.Marshal(ps.profile) - return ps.fs.Save(bytes) + return ps.fs.Write(bytes) } return nil } -// Load instantiates a cwtchPeer from the file store +// Read instantiates a cwtchPeer from the file store func (ps *profileStore) Load() error { - decrypted, err := ps.fs.Load() + decrypted, err := ps.fs.Read() if err != nil { return err } @@ -137,6 +187,7 @@ func (ps *profileStore) Load() error { for gid, group := range cp.Groups { ss := NewStreamStore(ps.directory, group.LocalID, ps.password) + cp.Groups[gid].Timeline.SetMessages(ss.Read()) ps.streamStores[group.GroupID] = ss } @@ -150,8 +201,11 @@ func (ps *profileStore) GetProfileCopy(timeline bool) *model.Profile { } func (ps *profileStore) eventHandler() { + log.Infoln("eventHandler()!") for { ev := ps.queue.Next() + log.Infof("eventHandler event %v\n", ev) + switch ev.EventType { case event.BlockPeer: contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer]) @@ -254,6 +308,10 @@ func (ps *profileStore) eventHandler() { ss.Delete() delete(ps.streamStores, groupID) } + case event.ChangePassword: + oldpass := ev.Data[event.Password] + newpass := ev.Data[event.NewPassword] + ps.ChangePassword(oldpass, newpass, ev.EventID) default: return } diff --git a/storage/profile_store_test.go b/storage/profile_store_test.go index eb8cc72..64afb27 100644 --- a/storage/profile_store_test.go +++ b/storage/profile_store_test.go @@ -4,6 +4,8 @@ package storage import ( "cwtch.im/cwtch/event" + "fmt" + "log" "os" "testing" "time" @@ -16,6 +18,7 @@ const testInitialMessage = "howdy" const testMessage = "Hello from storage" func TestProfileStoreWriteRead(t *testing.T) { + log.Println("profile store test!") os.RemoveAll(testingDir) eventBus := event.NewEventManager() profile := NewProfile(testProfileName) @@ -68,3 +71,82 @@ func TestProfileStoreWriteRead(t *testing.T) { } } + +func TestProfileStoreChangePassword(t *testing.T) { + os.RemoveAll(testingDir) + eventBus := event.NewEventManager() + + queue := event.NewQueue() + eventBus.Subscribe(event.ChangePasswordSuccess, queue) + + profile := NewProfile(testProfileName) + ps1 := NewProfileWriterStore(eventBus, testingDir, password, profile) + + groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") + if err != nil { + t.Errorf("Creating group: %v\n", err) + } + if err != nil { + t.Errorf("Creating group invite: %v\n", err) + } + + eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)})) + time.Sleep(1 * time.Second) + + fmt.Println("Sending 200 messages...") + + for i := 0; i < 200; i++ { + eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{ + event.GroupID: groupid, + event.TimestampSent: time.Now().Format(time.RFC3339Nano), + event.TimestampReceived: time.Now().Format(time.RFC3339Nano), + event.RemotePeer: ps1.GetProfileCopy(true).Onion, + event.Data: testMessage, + })) + } + + newPass := "qwerty123" + + fmt.Println("Sending Change Passwords event...") + eventBus.Publish(event.NewEventList(event.ChangePassword, event.Password, password, event.NewPassword, newPass)) + + ev := queue.Next() + if ev.EventType != event.ChangePasswordSuccess { + t.Errorf("Unexpected event response detected %v\n", ev.EventType) + return + } + + fmt.Println("Sending 10 more messages...") + for i := 0; i < 10; i++ { + eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{ + event.GroupID: groupid, + event.TimestampSent: time.Now().Format(time.RFC3339Nano), + event.TimestampReceived: time.Now().Format(time.RFC3339Nano), + event.RemotePeer: ps1.GetProfileCopy(true).Onion, + event.Data: testMessage, + })) + } + time.Sleep(3 * time.Second) + + fmt.Println("Shutdown profile store...") + ps1.Shutdown() + + fmt.Println("New Profile store...") + ps2 := NewProfileWriterStore(eventBus, testingDir, newPass, nil) + err = ps2.Load() + if err != nil { + t.Errorf("Error createing new profileStore with new password: %v\n", err) + return + } + + profile2 := ps2.GetProfileCopy(true) + + if profile2.Groups[groupid] == nil { + t.Errorf("Failed to load group %v\n", groupid) + return + } + + if len(profile2.Groups[groupid].Timeline.Messages) != 210 { + t.Errorf("Failed to load group's 210 messages, instead got %v\n", len(profile2.Groups[groupid].Timeline.Messages)) + } +} diff --git a/storage/stream_store.go b/storage/stream_store.go index 845037d..4f55605 100644 --- a/storage/stream_store.go +++ b/storage/stream_store.go @@ -33,8 +33,10 @@ type streamStore struct { // StreamStore provides a stream like interface to encrypted storage type StreamStore interface { Write(message model.Message) + WriteN(messages []model.Message) Read() []model.Message Delete() + ChangePassword(newpass string) } // NewStreamStore returns an initialized StreamStore ready for reading and writing @@ -138,7 +140,7 @@ func (ss *streamStore) Read() (messages []model.Message) { return resp } -// AddMessage adds a GroupMessage to the store +// Write adds a GroupMessage to the store func (ss *streamStore) Write(m model.Message) { ss.lock.Lock() defer ss.lock.Unlock() @@ -151,3 +153,21 @@ func (ss *streamStore) Write(m model.Message) { ss.initBuffer() } } + +func (ss *streamStore) WriteN(messages []model.Message) { + ss.lock.Lock() + defer ss.lock.Unlock() + + for _, m := range messages { + ss.updateBuffer(m) + + if ss.bufferByteCount > bytesPerFile { + ss.updateFile() + log.Debugf("rotating log file") + ss.rotateFileStore() + ss.initBuffer() + } + } +} + +func (ss *streamStore) ChangePassword(newpass string) {} diff --git a/storage/stream_store_test.go b/storage/stream_store_test.go index b9a8e2e..53c36a7 100644 --- a/storage/stream_store_test.go +++ b/storage/stream_store_test.go @@ -2,7 +2,6 @@ package storage import ( "cwtch.im/cwtch/model" - "git.openprivacy.ca/openprivacy/libricochet-go/log" "os" "testing" ) @@ -13,9 +12,6 @@ const password = "asdfqwer" const line1 = "Hello from storage!" func TestStreamStoreWriteRead(t *testing.T) { - - log.SetLevel(log.LevelDebug) - os.Remove(".test.json") os.RemoveAll(testingDir) os.Mkdir(testingDir, 0777) @@ -34,8 +30,6 @@ func TestStreamStoreWriteRead(t *testing.T) { } func TestStreamStoreWriteReadRotate(t *testing.T) { - - log.SetLevel(log.LevelDebug) os.Remove(".test.json") os.RemoveAll(testingDir) os.Mkdir(testingDir, 0777) From 2dbab8cfc498c5879452e26abf0ee087c1ad03a8 Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Fri, 13 Dec 2019 11:34:59 -0800 Subject: [PATCH 3/3] composed apps use their own mutexs --- app/app.go | 16 ++++++++-------- app/appClient.go | 14 ++++++++------ app/appService.go | 11 ++++++----- 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/app/app.go b/app/app.go index d31bf11..e4427ec 100644 --- a/app/app.go +++ b/app/app.go @@ -25,7 +25,7 @@ type applicationCore struct { eventBuses map[string]event.Manager directory string - acmutex sync.Mutex + coremutex sync.Mutex } type application struct { @@ -85,8 +85,8 @@ func (ac *applicationCore) CreatePeer(name string) (*model.Profile, error) { profile := storage.NewProfile(name) - ac.acmutex.Lock() - defer ac.acmutex.Unlock() + ac.coremutex.Lock() + defer ac.coremutex.Unlock() _, exists := ac.eventBuses[profile.Onion] if exists { @@ -100,8 +100,8 @@ func (ac *applicationCore) CreatePeer(name string) (*model.Profile, error) { } func (ac *applicationCore) DeletePeer(onion string) { - ac.acmutex.Lock() - defer ac.acmutex.Unlock() + ac.coremutex.Lock() + defer ac.coremutex.Unlock() ac.eventBuses[onion].Shutdown() delete(ac.eventBuses, onion) @@ -144,7 +144,7 @@ func (app *application) CreatePeer(name string, password string) { func (app *application) DeletePeer(onion string) { log.Infof("DeletePeer called on %v\n", onion) app.appmutex.Lock() - defer app.appmutex.Lock() + defer app.appmutex.Unlock() app.appletPlugins.ShutdownPeer(onion) app.plugins.Delete(onion) @@ -198,9 +198,9 @@ func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProf continue } - ac.acmutex.Lock() + ac.coremutex.Lock() ac.eventBuses[profile.Onion] = eventBus - ac.acmutex.Unlock() + ac.coremutex.Unlock() loadProfileFn(profile, profileStore) } diff --git a/app/appClient.go b/app/appClient.go index 0d76eab..8a6bd59 100644 --- a/app/appClient.go +++ b/app/appClient.go @@ -9,6 +9,7 @@ import ( "git.openprivacy.ca/openprivacy/libricochet-go/log" "path" "strconv" + "sync" ) type applicationClient struct { @@ -16,6 +17,7 @@ type applicationClient struct { appletPeers appBus event.Manager + acmutex sync.Mutex } // NewAppClient returns an Application that acts as a client to a AppService, connected by the IPCBridge supplied @@ -76,8 +78,8 @@ func (ac *applicationClient) newPeer(localID, password string, reload bool) { peer := peer.FromProfile(profile) peer.Init(eventBus) - ac.mutex.Lock() - defer ac.mutex.Unlock() + ac.acmutex.Lock() + defer ac.acmutex.Unlock() ac.peers[profile.Onion] = peer ac.eventBuses[profile.Onion] = eventBus npEvent := event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion}) @@ -114,11 +116,11 @@ func (ac *applicationClient) ChangePeerPassword(onion, oldpass, newpass string) } func (ac *applicationClient) handleDeletedPeer(onion string) { - ac.mutex.Lock() + ac.acmutex.Lock() + defer ac.acmutex.Unlock() ac.peers[onion].Shutdown() delete(ac.peers, onion) ac.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion)) - ac.mutex.Unlock() ac.applicationCore.DeletePeer(onion) } @@ -141,8 +143,8 @@ func (ac *applicationClient) QueryACNStatus() { // ShutdownPeer shuts down a peer and removes it from the app's management func (ac *applicationClient) ShutdownPeer(onion string) { - ac.mutex.Lock() - defer ac.mutex.Unlock() + ac.acmutex.Lock() + defer ac.acmutex.Unlock() ac.eventBuses[onion].Shutdown() delete(ac.eventBuses, onion) ac.peers[onion].Shutdown() diff --git a/app/appService.go b/app/appService.go index 328756e..e52ae48 100644 --- a/app/appService.go +++ b/app/appService.go @@ -11,6 +11,7 @@ import ( "git.openprivacy.ca/openprivacy/libricochet-go/log" "path" "strconv" + "sync" ) type applicationService struct { @@ -20,6 +21,7 @@ type applicationService struct { storage map[string]storage.ProfileStore engines map[string]connections.Engine + asmutex sync.Mutex } // ApplicationService is the back end of an application that manages engines and writing storage and communicates to an ApplicationClient by an IPCBridge @@ -123,10 +125,10 @@ func (as *applicationService) loadProfiles(password string) { blockedPeers := profile.BlockedPeers() identity := primitives.InitializeIdentity(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey) engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, as.acn, as.eventBuses[profile.Onion], profile.GetContacts(), blockedPeers) - as.mutex.Lock() + as.asmutex.Lock() as.storage[profile.Onion] = profileStore as.engines[profile.Onion] = engine - as.mutex.Unlock() + as.asmutex.Unlock() message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.LocalID, event.Password: password})} as.bridge.Write(&message) count++ @@ -148,7 +150,8 @@ func (as *applicationService) getACNStatusHandler() func(int, string) { } func (as *applicationService) deletePeer(onion string) { - as.mutex.Lock() + as.asmutex.Lock() + defer as.asmutex.Unlock() as.appletPlugins.ShutdownPeer(onion) as.plugins.Delete(onion) @@ -160,8 +163,6 @@ func (as *applicationService) deletePeer(onion string) { as.storage[onion].Delete() delete(as.storage, onion) - as.mutex.Unlock() - as.applicationCore.DeletePeer(onion) }