diff --git a/app/app.go b/app/app.go index 7245529..d31bf11 100644 --- a/app/app.go +++ b/app/app.go @@ -18,13 +18,14 @@ import ( "sync" ) +// AttributeTag is a const name for a peer attribute that can be set at creation time, for example for versioning info const AttributeTag = "tag" type applicationCore struct { eventBuses map[string]event.Manager directory string - mutex sync.Mutex + acmutex sync.Mutex } type application struct { @@ -32,9 +33,10 @@ type application struct { appletPeers appletACN appletPlugins - storage map[string]storage.ProfileStore - engines map[string]connections.Engine - appBus event.Manager + storage map[string]storage.ProfileStore + engines map[string]connections.Engine + appBus event.Manager + appmutex sync.Mutex } // Application is a full cwtch peer application. It allows management, usage and storage of multiple peers @@ -44,6 +46,7 @@ type Application interface { CreateTaggedPeer(name string, password string, tag string) DeletePeer(onion string) AddPeerPlugin(onion string, pluginID plugins.PluginID) + ChangePeerPassword(onion, oldpass, newpass string) LaunchPeers() GetPrimaryBus() event.Manager @@ -82,8 +85,8 @@ func (ac *applicationCore) CreatePeer(name string) (*model.Profile, error) { profile := storage.NewProfile(name) - ac.mutex.Lock() - defer ac.mutex.Unlock() + ac.acmutex.Lock() + defer ac.acmutex.Unlock() _, exists := ac.eventBuses[profile.Onion] if exists { @@ -97,8 +100,8 @@ func (ac *applicationCore) CreatePeer(name string) (*model.Profile, error) { } func (ac *applicationCore) DeletePeer(onion string) { - ac.mutex.Lock() - defer ac.mutex.Unlock() + ac.acmutex.Lock() + defer ac.acmutex.Unlock() ac.eventBuses[onion].Shutdown() delete(ac.eventBuses, onion) @@ -133,7 +136,6 @@ func (app *application) CreateTaggedPeer(name string, password string, tag strin app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion})) } - // CreatePeer creates a new Peer with the given name and required accessories (eventbus, storage, protocol engine) func (app *application) CreatePeer(name string, password string) { app.CreateTaggedPeer(name, password, "") @@ -141,7 +143,8 @@ func (app *application) CreatePeer(name string, password string) { func (app *application) DeletePeer(onion string) { log.Infof("DeletePeer called on %v\n", onion) - app.mutex.Lock() + app.appmutex.Lock() + defer app.appmutex.Lock() app.appletPlugins.ShutdownPeer(onion) app.plugins.Delete(onion) @@ -157,12 +160,15 @@ func (app *application) DeletePeer(onion string) { delete(app.storage, onion) app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion)) - app.mutex.Unlock() app.applicationCore.DeletePeer(onion) log.Debugf("Delete peer for %v Done\n", onion) } +func (app *application) ChangePeerPassword(onion, oldpass, newpass string) { + app.eventBuses[onion].Publish(event.NewEventList(event.ChangePassword, event.Password, oldpass, event.NewPassword, newpass)) +} + func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) { app.AddPlugin(onion, pluginID, app.eventBuses[onion], app.acn) } @@ -192,9 +198,9 @@ func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProf continue } - ac.mutex.Lock() + ac.acmutex.Lock() ac.eventBuses[profile.Onion] = eventBus - ac.mutex.Unlock() + ac.acmutex.Unlock() loadProfileFn(profile, profileStore) } @@ -211,11 +217,11 @@ func (app *application) LoadProfiles(password string) { blockedPeers := profile.BlockedPeers() identity := primitives.InitializeIdentity(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey) engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, app.eventBuses[profile.Onion], profile.GetContacts(), blockedPeers) - app.mutex.Lock() + app.appmutex.Lock() app.peers[profile.Onion] = peer app.storage[profile.Onion] = profileStore app.engines[profile.Onion] = engine - app.mutex.Unlock() + app.appmutex.Unlock() app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion})) count++ }) @@ -255,8 +261,8 @@ func (app *application) QueryACNStatus() { // ShutdownPeer shuts down a peer and removes it from the app's management func (app *application) ShutdownPeer(onion string) { - app.mutex.Lock() - defer app.mutex.Unlock() + app.appmutex.Lock() + defer app.appmutex.Unlock() app.eventBuses[onion].Shutdown() delete(app.eventBuses, onion) app.peers[onion].Shutdown() diff --git a/app/appClient.go b/app/appClient.go index 800ba0b..0d76eab 100644 --- a/app/appClient.go +++ b/app/appClient.go @@ -108,6 +108,11 @@ func (ac *applicationClient) DeletePeer(onion string) { ac.bridge.Write(&message) } +func (ac *applicationClient) ChangePeerPassword(onion, oldpass, newpass string) { + message := event.IPCMessage{Dest: onion, Message: event.NewEventList(event.ChangePassword, event.Password, oldpass, event.NewPassword, newpass)} + ac.bridge.Write(&message) +} + func (ac *applicationClient) handleDeletedPeer(onion string) { ac.mutex.Lock() ac.peers[onion].Shutdown() diff --git a/app/appService.go b/app/appService.go index 576d46b..328756e 100644 --- a/app/appService.go +++ b/app/appService.go @@ -153,8 +153,6 @@ func (as *applicationService) deletePeer(onion string) { as.appletPlugins.ShutdownPeer(onion) as.plugins.Delete(onion) - - as.engines[onion].Shutdown() delete(as.engines, onion) diff --git a/app/cwtchutil/main.go b/app/cwtchutil/main.go index 39343a2..08a13df 100644 --- a/app/cwtchutil/main.go +++ b/app/cwtchutil/main.go @@ -131,7 +131,7 @@ func main() { profileStore, _ := storage.NewProfileStore(nil, os.Args[2], pw) - err = profileStore.Load() + err = profileStore.Read() if err != nil { log.Errorln(err) os.Exit(1) diff --git a/event/common.go b/event/common.go index 6d0a475..688b2cb 100644 --- a/event/common.go +++ b/event/common.go @@ -82,6 +82,15 @@ const ( // RemotePeer [eg ""] PeerCreated = Type("PeerCreated") + // Password, NewPassword + ChangePassword = Type("ChangePassword") + + // Error(err), EventID + ChangePasswordError = Type("ChangePasswordError") + + // EventID + ChangePasswordSuccess = Type("ChangePasswordSuccess") + // a group has been successfully added or newly created // attributes: // Data [serialized *model.Group] @@ -159,6 +168,7 @@ const ( Shutdown = Type("Shutdown") // Error(err) + // Error creating peer PeerError = Type("PeerError") // Error(err) @@ -200,6 +210,7 @@ const ( ProfileName = Field("ProfileName") Password = Field("Password") + NewPassword = Field("NewPassword") ConnectionState = Field("ConnectionState") diff --git a/model/group.go b/model/group.go index d2444a5..b0cd5aa 100644 --- a/model/group.go +++ b/model/group.go @@ -36,7 +36,7 @@ type Group struct { // NewGroup initializes a new group associated with a given CwtchServer func NewGroup(server string) (*Group, error) { group := new(Group) - group.LocalID = generateRandomID() + group.LocalID = GenerateRandomID() if utils.IsValidHostname(server) == false { return nil, errors.New("Server is not a valid v3 onion") diff --git a/model/profile.go b/model/profile.go index 8460c69..d631012 100644 --- a/model/profile.go +++ b/model/profile.go @@ -44,7 +44,8 @@ type Profile struct { // TODO: Should this be per server? const MaxGroupMessageLength = 1800 -func generateRandomID() string { +// GenerateRandomID generates a random 16 byte hex id code +func GenerateRandomID() string { randBytes := make([]byte, 16) rand.Read(randBytes) return filepath.Join(hex.EncodeToString(randBytes)) @@ -55,7 +56,7 @@ func (p *PublicProfile) init() { p.Attributes = make(map[string]string) } p.unacknowledgedMessages = make(map[string]Message) - p.LocalID = generateRandomID() + p.LocalID = GenerateRandomID() } // SetAttribute allows applications to store arbitrary configuration info at the profile level. @@ -371,7 +372,7 @@ func (p *Profile) ProcessInvite(invite string, peerHostname string) (string, err if err == nil { group := new(Group) group.GroupID = gci.GetGroupName() - group.LocalID = generateRandomID() + group.LocalID = GenerateRandomID() group.SignedGroupID = gci.GetSignedGroupId() copy(group.GroupKey[:], gci.GetGroupSharedKey()[:]) group.GroupServer = gci.GetServerHost() diff --git a/storage/file_store.go b/storage/file_store.go index ee802c6..ebd6c23 100644 --- a/storage/file_store.go +++ b/storage/file_store.go @@ -1,8 +1,8 @@ package storage import ( - "io/ioutil" "git.openprivacy.ca/openprivacy/libricochet-go/log" + "io/ioutil" "os" "path" ) @@ -16,9 +16,10 @@ type fileStore struct { // FileStore is a primitive around storing encrypted files type FileStore interface { - Save([]byte) error - Load() ([]byte, error) + Write([]byte) error + Read() ([]byte, error) Delete() + ChangePassword(newpass string) } // NewFileStore instantiates a fileStore given a filename and a password @@ -30,8 +31,8 @@ func NewFileStore(directory string, filename string, password string) FileStore return filestore } -// save serializes a cwtchPeer to a file -func (fps *fileStore) Save(data []byte) error { +// write serializes a cwtchPeer to a file +func (fps *fileStore) Write(data []byte) error { key, salt, _ := createKey(fps.password) encryptedbytes, err := encryptFileData(data, key) if err != nil { @@ -44,7 +45,7 @@ func (fps *fileStore) Save(data []byte) error { return err } -func (fps *fileStore) Load() ([]byte, error) { +func (fps *fileStore) Read() ([]byte, error) { return readEncryptedFile(fps.directory, fps.filename, fps.password) } @@ -53,4 +54,8 @@ func (fps *fileStore) Delete() { if err != nil { log.Errorf("Deleting file %v\n", err) } -} \ No newline at end of file +} + +func (fps *fileStore) ChangePassword(newpass string) { + fps.password = newpass +} diff --git a/storage/profile_store.go b/storage/profile_store.go index 27288ba..9fd371c 100644 --- a/storage/profile_store.go +++ b/storage/profile_store.go @@ -9,11 +9,13 @@ import ( "time" ) +const groupIDLen = 32 +const peerIDLen = 56 const profileFilename = "profile" type profileStore struct { fs FileStore - streamStores map[string]StreamStore + streamStores map[string]StreamStore // map [groupId|onion] StreamStore directory string password string profile *model.Profile @@ -59,6 +61,7 @@ func NewProfileWriterStore(eventManager event.Manager, directory, password strin ps.eventManager.Subscribe(event.ServerStateChange, ps.queue) ps.eventManager.Subscribe(event.DeleteContact, ps.queue) ps.eventManager.Subscribe(event.DeleteGroup, ps.queue) + ps.eventManager.Subscribe(event.ChangePassword, ps.queue) return ps } @@ -116,17 +119,64 @@ func (ps *profileStore) GetStatusMessages() []*event.Event { return messages } +func (ps *profileStore) ChangePassword(oldpass, newpass, eventID string) { + if oldpass != ps.password { + ps.eventManager.Publish(event.NewEventList(event.ChangePasswordError, event.Error, "Supplied current password does not match", event.EventID, eventID)) + return + } + + newStreamStores := map[string]StreamStore{} + idToNewLocalID := map[string]string{} + + // Generate all new StreamStores with the new password and write all the old StreamStore data into these ones + for ssid, ss := range ps.streamStores { + // New ss with new pass and new localID + newlocalID := model.GenerateRandomID() + idToNewLocalID[ssid] = newlocalID + + newSS := NewStreamStore(ps.directory, newlocalID, newpass) + newStreamStores[ssid] = newSS + + // write whole store + messages := ss.Read() + newSS.WriteN(messages) + } + + // Switch over + oldStreamStores := ps.streamStores + ps.streamStores = newStreamStores + for ssid, newLocalID := range idToNewLocalID { + if len(ssid) == groupIDLen { + ps.profile.Groups[ssid].LocalID = newLocalID + } else { + ps.profile.Contacts[ssid].LocalID = newLocalID + } + } + + ps.password = newpass + ps.fs.ChangePassword(newpass) + ps.save() + + // Clean up + for _, oldss := range oldStreamStores { + oldss.Delete() + } + + ps.eventManager.Publish(event.NewEventList(event.ChangePasswordSuccess, event.EventID, eventID)) + return +} + func (ps *profileStore) save() error { if ps.writer { bytes, _ := json.Marshal(ps.profile) - return ps.fs.Save(bytes) + return ps.fs.Write(bytes) } return nil } -// Load instantiates a cwtchPeer from the file store +// Read instantiates a cwtchPeer from the file store func (ps *profileStore) Load() error { - decrypted, err := ps.fs.Load() + decrypted, err := ps.fs.Read() if err != nil { return err } @@ -137,6 +187,7 @@ func (ps *profileStore) Load() error { for gid, group := range cp.Groups { ss := NewStreamStore(ps.directory, group.LocalID, ps.password) + cp.Groups[gid].Timeline.SetMessages(ss.Read()) ps.streamStores[group.GroupID] = ss } @@ -150,8 +201,11 @@ func (ps *profileStore) GetProfileCopy(timeline bool) *model.Profile { } func (ps *profileStore) eventHandler() { + log.Infoln("eventHandler()!") for { ev := ps.queue.Next() + log.Infof("eventHandler event %v\n", ev) + switch ev.EventType { case event.BlockPeer: contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer]) @@ -254,6 +308,10 @@ func (ps *profileStore) eventHandler() { ss.Delete() delete(ps.streamStores, groupID) } + case event.ChangePassword: + oldpass := ev.Data[event.Password] + newpass := ev.Data[event.NewPassword] + ps.ChangePassword(oldpass, newpass, ev.EventID) default: return } diff --git a/storage/profile_store_test.go b/storage/profile_store_test.go index eb8cc72..64afb27 100644 --- a/storage/profile_store_test.go +++ b/storage/profile_store_test.go @@ -4,6 +4,8 @@ package storage import ( "cwtch.im/cwtch/event" + "fmt" + "log" "os" "testing" "time" @@ -16,6 +18,7 @@ const testInitialMessage = "howdy" const testMessage = "Hello from storage" func TestProfileStoreWriteRead(t *testing.T) { + log.Println("profile store test!") os.RemoveAll(testingDir) eventBus := event.NewEventManager() profile := NewProfile(testProfileName) @@ -68,3 +71,82 @@ func TestProfileStoreWriteRead(t *testing.T) { } } + +func TestProfileStoreChangePassword(t *testing.T) { + os.RemoveAll(testingDir) + eventBus := event.NewEventManager() + + queue := event.NewQueue() + eventBus.Subscribe(event.ChangePasswordSuccess, queue) + + profile := NewProfile(testProfileName) + ps1 := NewProfileWriterStore(eventBus, testingDir, password, profile) + + groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") + if err != nil { + t.Errorf("Creating group: %v\n", err) + } + if err != nil { + t.Errorf("Creating group invite: %v\n", err) + } + + eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)})) + time.Sleep(1 * time.Second) + + fmt.Println("Sending 200 messages...") + + for i := 0; i < 200; i++ { + eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{ + event.GroupID: groupid, + event.TimestampSent: time.Now().Format(time.RFC3339Nano), + event.TimestampReceived: time.Now().Format(time.RFC3339Nano), + event.RemotePeer: ps1.GetProfileCopy(true).Onion, + event.Data: testMessage, + })) + } + + newPass := "qwerty123" + + fmt.Println("Sending Change Passwords event...") + eventBus.Publish(event.NewEventList(event.ChangePassword, event.Password, password, event.NewPassword, newPass)) + + ev := queue.Next() + if ev.EventType != event.ChangePasswordSuccess { + t.Errorf("Unexpected event response detected %v\n", ev.EventType) + return + } + + fmt.Println("Sending 10 more messages...") + for i := 0; i < 10; i++ { + eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{ + event.GroupID: groupid, + event.TimestampSent: time.Now().Format(time.RFC3339Nano), + event.TimestampReceived: time.Now().Format(time.RFC3339Nano), + event.RemotePeer: ps1.GetProfileCopy(true).Onion, + event.Data: testMessage, + })) + } + time.Sleep(3 * time.Second) + + fmt.Println("Shutdown profile store...") + ps1.Shutdown() + + fmt.Println("New Profile store...") + ps2 := NewProfileWriterStore(eventBus, testingDir, newPass, nil) + err = ps2.Load() + if err != nil { + t.Errorf("Error createing new profileStore with new password: %v\n", err) + return + } + + profile2 := ps2.GetProfileCopy(true) + + if profile2.Groups[groupid] == nil { + t.Errorf("Failed to load group %v\n", groupid) + return + } + + if len(profile2.Groups[groupid].Timeline.Messages) != 210 { + t.Errorf("Failed to load group's 210 messages, instead got %v\n", len(profile2.Groups[groupid].Timeline.Messages)) + } +} diff --git a/storage/stream_store.go b/storage/stream_store.go index 845037d..4f55605 100644 --- a/storage/stream_store.go +++ b/storage/stream_store.go @@ -33,8 +33,10 @@ type streamStore struct { // StreamStore provides a stream like interface to encrypted storage type StreamStore interface { Write(message model.Message) + WriteN(messages []model.Message) Read() []model.Message Delete() + ChangePassword(newpass string) } // NewStreamStore returns an initialized StreamStore ready for reading and writing @@ -138,7 +140,7 @@ func (ss *streamStore) Read() (messages []model.Message) { return resp } -// AddMessage adds a GroupMessage to the store +// Write adds a GroupMessage to the store func (ss *streamStore) Write(m model.Message) { ss.lock.Lock() defer ss.lock.Unlock() @@ -151,3 +153,21 @@ func (ss *streamStore) Write(m model.Message) { ss.initBuffer() } } + +func (ss *streamStore) WriteN(messages []model.Message) { + ss.lock.Lock() + defer ss.lock.Unlock() + + for _, m := range messages { + ss.updateBuffer(m) + + if ss.bufferByteCount > bytesPerFile { + ss.updateFile() + log.Debugf("rotating log file") + ss.rotateFileStore() + ss.initBuffer() + } + } +} + +func (ss *streamStore) ChangePassword(newpass string) {} diff --git a/storage/stream_store_test.go b/storage/stream_store_test.go index b9a8e2e..53c36a7 100644 --- a/storage/stream_store_test.go +++ b/storage/stream_store_test.go @@ -2,7 +2,6 @@ package storage import ( "cwtch.im/cwtch/model" - "git.openprivacy.ca/openprivacy/libricochet-go/log" "os" "testing" ) @@ -13,9 +12,6 @@ const password = "asdfqwer" const line1 = "Hello from storage!" func TestStreamStoreWriteRead(t *testing.T) { - - log.SetLevel(log.LevelDebug) - os.Remove(".test.json") os.RemoveAll(testingDir) os.Mkdir(testingDir, 0777) @@ -34,8 +30,6 @@ func TestStreamStoreWriteRead(t *testing.T) { } func TestStreamStoreWriteReadRotate(t *testing.T) { - - log.SetLevel(log.LevelDebug) os.Remove(".test.json") os.RemoveAll(testingDir) os.Mkdir(testingDir, 0777)