package v1

import (
	"cwtch.im/cwtch/event"
	"cwtch.im/cwtch/model"
	"encoding/base64"
	"encoding/json"
	"git.openprivacy.ca/openprivacy/log"
	"io/ioutil"
	"os"
	"path"
	"strconv"
	"time"
)

const groupIDLen = 32
const peerIDLen = 56
const profileFilename = "profile"
const version = "1"
const versionFile = "VERSION"
const saltFile = "SALT"

// ProfileStoreV1 is storage for profiles and message streams that uses an in-memory key and a filesystem-stored salt instead of an in-memory password
type ProfileStoreV1 struct {
	fs           FileStore
	streamStores map[string]StreamStore // map [groupId|onion] StreamStore
	directory    string
	profile      *model.Profile
	key          [32]byte
	salt         [128]byte
	eventManager event.Manager
	queue        event.Queue
	writer       bool
}

// CheckPassword returns true if the given password produces the same key as the current stored key, otherwise false.
func (ps *ProfileStoreV1) CheckPassword(checkpass string) bool {
	oldkey := CreateKey(checkpass, ps.salt[:])
	return oldkey == ps.key
}

// InitV1Directory generates a key and salt from a password, writes a SALT and VERSION file and returns the key and salt
func InitV1Directory(directory, password string) ([32]byte, [128]byte, error) {
	os.Mkdir(directory, 0700)

	key, salt, err := CreateKeySalt(password)
	if err != nil {
		log.Errorf("Could not create key for profile store from password: %v\n", err)
		return [32]byte{}, [128]byte{}, err
	}

	if err = ioutil.WriteFile(path.Join(directory, versionFile), []byte(version), 0600); err != nil {
		log.Errorf("Could not write version file: %v", err)
		return [32]byte{}, [128]byte{}, err
	}

	if err = ioutil.WriteFile(path.Join(directory, saltFile), salt[:], 0600); err != nil {
		log.Errorf("Could not write salt file: %v", err)
		return [32]byte{}, [128]byte{}, err
	}

	return key, salt, nil
}

// CreateProfileWriterStore creates a profile store backed by a filestore listening for events and saving them
// directory should be $appDir/profiles/$rand
func CreateProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) *ProfileStoreV1 {
	key, salt, err := InitV1Directory(directory, password)
	if err != nil {
		return nil
	}

	ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true}
	ps.save()

	ps.initProfileWriterStore()
	return ps
}

func (ps *ProfileStoreV1) initProfileWriterStore() {
	ps.queue = event.NewQueue()
	go ps.eventHandler()

	ps.eventManager.Subscribe(event.SetPeerAuthorization, ps.queue)
	ps.eventManager.Subscribe(event.PeerCreated, ps.queue)
	ps.eventManager.Subscribe(event.GroupCreated, ps.queue)
	ps.eventManager.Subscribe(event.SetAttribute, ps.queue)
	ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue)
	ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue)
	ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue)
	ps.eventManager.Subscribe(event.RejectGroupInvite, ps.queue)
	ps.eventManager.Subscribe(event.NewGroup, ps.queue)
	ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue)
	ps.eventManager.Subscribe(event.SendMessageToPeer, ps.queue)
	ps.eventManager.Subscribe(event.PeerAcknowledgement, ps.queue)
	ps.eventManager.Subscribe(event.NewMessageFromPeer, ps.queue)
	ps.eventManager.Subscribe(event.PeerStateChange, ps.queue)
	ps.eventManager.Subscribe(event.ServerStateChange, ps.queue)
	ps.eventManager.Subscribe(event.DeleteContact, ps.queue)
	ps.eventManager.Subscribe(event.DeleteGroup, ps.queue)
	ps.eventManager.Subscribe(event.ChangePassword, ps.queue)
	ps.eventManager.Subscribe(event.UpdateMessageFlags, ps.queue)
}
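// Illustrative usage sketch (not part of the original file): how a writer store might be
// created and torn down. event.NewEventManager and the caller-side variables (appDir,
// password) are assumptions about the surrounding application, not defined in this package.
//
//	em := event.NewEventManager()
//	profile := NewProfile("alice")
//	dir := path.Join(appDir, "profiles", model.GenerateRandomID()) // $appDir/profiles/$rand
//	ps := CreateProfileWriterStore(em, dir, password, profile)
//	if ps != nil {
//		defer ps.Shutdown()
//	}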
// LoadProfileWriterStore loads a profile store from filestore listening for events and saving them
// directory should be $appDir/profiles/$rand
func LoadProfileWriterStore(eventManager event.Manager, directory, password string) (*ProfileStoreV1, error) {
	salt, err := ioutil.ReadFile(path.Join(directory, saltFile))
	if err != nil {
		return nil, err
	}
	key := CreateKey(password, salt)

	ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, directory: directory, profile: nil, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true}
	copy(ps.salt[:], salt)

	err = ps.load()
	if err != nil {
		return nil, err
	}

	ps.initProfileWriterStore()
	return ps, nil
}

// ReadProfile reads a profile from storage and returns the profile
// directory should be $appDir/profiles/$rand
func ReadProfile(directory string, key [32]byte, salt [128]byte) (*model.Profile, error) {
	os.Mkdir(directory, 0700)

	ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: nil, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true}

	err := ps.load()
	if err != nil {
		return nil, err
	}

	profile := ps.GetProfileCopy(true)
	return profile, nil
}

// UpgradeV0Profile takes a profile (presumably from a V0 store) and creates and writes a V1 store
func UpgradeV0Profile(profile *model.Profile, directory, password string) error {
	key, salt, err := InitV1Directory(directory, password)
	if err != nil {
		return err
	}

	ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: profile, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true}
	ps.save()

	for gid, group := range ps.profile.Groups {
		ss := NewStreamStore(ps.directory, group.LocalID, ps.key)
		ss.WriteN(ps.profile.Groups[gid].Timeline.Messages)
	}

	return nil
}
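// Illustrative sketch of reopening an existing store with a password; error handling and
// the caller-side variables (em, dir, password) are assumptions about the caller:
//
//	ps, err := LoadProfileWriterStore(em, dir, password)
//	if err != nil {
//		// a wrong password derives a key that cannot decrypt the profile file
//		return err
//	}
//	profile := ps.GetProfileCopy(true)
//	ok := ps.CheckPassword(password) // true: the password reproduces the key derived from the SALT file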
// NewProfile creates a new profile for use in the profile store.
func NewProfile(name string) *model.Profile {
	profile := model.GenerateNewProfile(name)
	return profile
}

// GetNewPeerMessage is for AppService to call on Reload events, to reseed the AppClient with the loaded peers
func (ps *ProfileStoreV1) GetNewPeerMessage() *event.Event {
	message := event.NewEventList(event.NewPeer, event.Identity, ps.profile.LocalID, event.Key, string(ps.key[:]), event.Salt, string(ps.salt[:]))
	return &message
}

// GetStatusMessages creates an array of status messages for all peers and group servers from current information
func (ps *ProfileStoreV1) GetStatusMessages() []*event.Event {
	messages := []*event.Event{}
	for _, contact := range ps.profile.Contacts {
		message := event.NewEvent(event.PeerStateChange, map[event.Field]string{
			event.RemotePeer:      string(contact.Onion),
			event.ConnectionState: contact.State,
		})
		messages = append(messages, &message)
	}

	doneServers := make(map[string]bool)
	for _, group := range ps.profile.Groups {
		if _, exists := doneServers[group.GroupServer]; !exists {
			message := event.NewEvent(event.ServerStateChange, map[event.Field]string{
				event.GroupServer:     string(group.GroupServer),
				event.ConnectionState: group.State,
			})
			messages = append(messages, &message)
			doneServers[group.GroupServer] = true
		}
	}

	return messages
}
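// The password change can also be driven over the event bus (see the event.ChangePassword
// case in eventHandler below); the field names here are the ones that handler reads, the
// variable names are illustrative:
//
//	em.Publish(event.NewEventList(event.ChangePassword,
//		event.Password, oldPassword,
//		event.NewPassword, newPassword))
//	// the store replies with event.ChangePasswordSuccess, or event.ChangePasswordError
//	// (with event.Error set) if the old password does not match; both replies echo the
//	// EventID of the triggering event.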
// ChangePassword restores all data under a new password's encryption
func (ps *ProfileStoreV1) ChangePassword(oldpass, newpass, eventID string) {
	oldkey := CreateKey(oldpass, ps.salt[:])

	if oldkey != ps.key {
		ps.eventManager.Publish(event.NewEventList(event.ChangePasswordError, event.Error, "Supplied current password does not match", event.EventID, eventID))
		return
	}

	newkey := CreateKey(newpass, ps.salt[:])

	newStreamStores := map[string]StreamStore{}
	idToNewLocalID := map[string]string{}

	// Generate all new StreamStores with the new password and write all the old StreamStore data into these ones
	for ssid, ss := range ps.streamStores {
		// New ss with new pass and new localID
		newlocalID := model.GenerateRandomID()
		idToNewLocalID[ssid] = newlocalID

		newSS := NewStreamStore(ps.directory, newlocalID, newkey)
		newStreamStores[ssid] = newSS

		// write whole store
		messages := ss.Read()
		newSS.WriteN(messages)
	}

	// Switch over
	oldStreamStores := ps.streamStores
	ps.streamStores = newStreamStores

	for ssid, newLocalID := range idToNewLocalID {
		if len(ssid) == groupIDLen {
			ps.profile.Groups[ssid].LocalID = newLocalID
		} else {
			if ps.profile.Contacts[ssid] != nil {
				ps.profile.Contacts[ssid].LocalID = newLocalID
			} else {
				log.Errorf("Unknown Contact: %v. This is probably the result of corrupted development data from fuzzing. This contact will not appear in the new profile.", ssid)
			}
		}
	}

	ps.key = newkey
	ps.fs.ChangeKey(newkey)
	ps.save()

	// Clean up
	for _, oldss := range oldStreamStores {
		oldss.Delete()
	}

	ps.eventManager.Publish(event.NewEventList(event.ChangePasswordSuccess, event.EventID, eventID))
}

// save serializes the profile to the encrypted file store; non-writer (read-only) stores do not persist
func (ps *ProfileStoreV1) save() error {
	if ps.writer {
		bytes, _ := json.Marshal(ps.profile)
		return ps.fs.Write(bytes)
	}
	return nil
}

// regenStreamStore replaces the stream store for the given contact or group with a fresh one
// containing the supplied messages (used after updating a stored message, since stream stores do not support updates)
func (ps *ProfileStoreV1) regenStreamStore(messages []model.Message, contact string) {
	oldss := ps.streamStores[contact]
	newLocalID := model.GenerateRandomID()
	newSS := NewStreamStore(ps.directory, newLocalID, ps.key)
	newSS.WriteN(messages)
	if len(contact) == groupIDLen {
		ps.profile.Groups[contact].LocalID = newLocalID
	} else {
		// We can assume this exists as regen stream store should only happen to *update* a message
		ps.profile.Contacts[contact].LocalID = newLocalID
	}
	ps.streamStores[contact] = newSS
	ps.save()
	oldss.Delete()
}

// load instantiates a cwtchPeer from the file store
func (ps *ProfileStoreV1) load() error {
	decrypted, err := ps.fs.Read()
	if err != nil {
		return err
	}
	cp := new(model.Profile)
	err = json.Unmarshal(decrypted, &cp)
	if err == nil {
		ps.profile = cp

		// TODO 2020.06.09: v1 update, Remove on v2
		// if we already have the contact it can be assumed "approved" unless blocked
		for _, contact := range cp.Contacts {
			if contact.Authorization == "" {
				if contact.DeprecatedBlocked {
					contact.Authorization = model.AuthBlocked
				} else {
					contact.Authorization = model.AuthApproved
				}
			}

			// Check if there is any saved history...
			saveHistory, keyExists := contact.GetAttribute(event.SaveHistoryKey)
			if !keyExists {
				contact.SetAttribute(event.SaveHistoryKey, event.DeleteHistoryDefault)
			}

			if saveHistory == event.SaveHistoryConfirmed {
				ss := NewStreamStore(ps.directory, contact.LocalID, ps.key)
				cp.Contacts[contact.Onion].Timeline.SetMessages(ss.Read())
				ps.streamStores[contact.Onion] = ss
			}
		}

		for gid, group := range cp.Groups {
			if group.Version == 0 {
				log.Infof("group %v is of unsupported version 0. dropping group...\n", group.GroupID)
				delete(cp.Groups, gid)
				continue
			}
			ss := NewStreamStore(ps.directory, group.LocalID, ps.key)
			cp.Groups[gid].Timeline.SetMessages(ss.Read())
			cp.Groups[gid].Timeline.Sort()
			ps.streamStores[group.GroupID] = ss
		}
		ps.save()
	}
	return err
}

// GetProfileCopy returns a copy of the stored profile
func (ps *ProfileStoreV1) GetProfileCopy(timeline bool) *model.Profile {
	return ps.profile.GetCopy(timeline)
}

// eventHandler processes events from the subscribed queue and persists the resulting profile changes until shutdown
func (ps *ProfileStoreV1) eventHandler() {
	for {
		ev := ps.queue.Next()
		log.Debugf("eventHandler event %v %v\n", ev.EventType, ev.EventID)

		switch ev.EventType {
		case event.SetPeerAuthorization:
			err := ps.profile.SetContactAuthorization(ev.Data[event.RemotePeer], model.Authorization(ev.Data[event.Authorization]))
			if err == nil {
				ps.save()
			}
		case event.PeerCreated:
			var pp *model.PublicProfile
			json.Unmarshal([]byte(ev.Data[event.Data]), &pp)
			ps.profile.AddContact(ev.Data[event.RemotePeer], pp)
		case event.GroupCreated:
			var group *model.Group
			json.Unmarshal([]byte(ev.Data[event.Data]), &group)
			ps.profile.AddGroup(group)
			ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.key)
			ps.save()
		case event.SetAttribute:
			ps.profile.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
			ps.save()
		case event.SetPeerAttribute:
			contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
			if exists {
				contact.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
				ps.save()

				switch ev.Data[event.Key] {
				case event.SaveHistoryKey:
					if event.DeleteHistoryConfirmed == ev.Data[event.Data] {
						ss, exists := ps.streamStores[ev.Data[event.RemotePeer]]
						if exists {
							ss.Delete()
							delete(ps.streamStores, ev.Data[event.RemotePeer])
						}
					} else if event.SaveHistoryConfirmed == ev.Data[event.Data] {
						_, exists := ps.streamStores[ev.Data[event.RemotePeer]]
						if !exists {
							ss := NewStreamStore(ps.directory, contact.LocalID, ps.key)
							ps.streamStores[ev.Data[event.RemotePeer]] = ss
						}
					}
				default:
				}
			} else {
				log.Errorf("error setting attribute on peer %v peer does not exist", ev)
			}
		case event.SetGroupAttribute:
			group := ps.profile.GetGroup(ev.Data[event.GroupID])
			if group != nil {
				group.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
				ps.save()
			} else {
				log.Errorf("error setting attribute on group %v group does not exist", ev)
			}
		case event.AcceptGroupInvite:
			err := ps.profile.AcceptInvite(ev.Data[event.GroupID])
			if err == nil {
				ps.save()
			} else {
				log.Errorf("error accepting group invite")
			}
		case event.RejectGroupInvite:
			ps.profile.RejectInvite(ev.Data[event.GroupID])
			ps.save()
		case event.NewGroup:
			gid, err := ps.profile.ProcessInvite(ev.Data[event.GroupInvite])
			if err == nil {
				ps.save()
				group := ps.profile.Groups[gid]
				ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.key)
			} else {
				log.Errorf("error storing new group invite: %v (%v)", err, ev)
			}
		case event.SendMessageToPeer:
			// cache the message till an ack, then it's given to stream store.
			// stream store doesn't support updates, so we don't want to commit it till ack'd
			ps.profile.AddSentMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], time.Now(), ev.EventID)
		case event.NewMessageFromPeer:
			ps.profile.AddMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], time.Now())
			ps.attemptSavePeerMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.TimestampReceived], true)
		case event.PeerAcknowledgement:
			onion := ev.Data[event.RemotePeer]
			eventID := ev.Data[event.EventID]
			contact, ok := ps.profile.Contacts[onion]
			if ok {
				mIdx, ok := contact.UnacknowledgedMessages[eventID]
				if ok {
					message := contact.Timeline.Messages[mIdx]
					ps.attemptSavePeerMessage(onion, message.Message, message.Timestamp.Format(time.RFC3339Nano), false)
				}
			}
			ps.profile.AckSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID])
		case event.NewMessageFromGroup:
			groupid := ev.Data[event.GroupID]
			received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
			sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent])
			sig, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
			prevsig, _ := base64.StdEncoding.DecodeString(ev.Data[event.PreviousSignature])
			message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: sig, PreviousMessageSig: prevsig, Acknowledged: true}

			ss, exists := ps.streamStores[groupid]
			if exists {
				// We need to store a local copy of the message...
				ps.profile.GetGroup(groupid).Timeline.Insert(&message)
				ss.Write(message)
			} else {
				log.Errorf("error storing new group message: %v stream store does not exist", ev)
			}
		case event.PeerStateChange:
			if _, exists := ps.profile.Contacts[ev.Data[event.RemotePeer]]; exists {
				ps.profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState]
			}
		case event.ServerStateChange:
			for _, group := range ps.profile.Groups {
				if group.GroupServer == ev.Data[event.GroupServer] {
					group.State = ev.Data[event.ConnectionState]
				}
			}
		case event.DeleteContact:
			onion := ev.Data[event.RemotePeer]
			ps.profile.DeleteContact(onion)
			ps.save()

			ss, exists := ps.streamStores[onion]
			if exists {
				ss.Delete()
				delete(ps.streamStores, onion)
			}
		case event.DeleteGroup:
			groupID := ev.Data[event.GroupID]
			ps.profile.DeleteGroup(groupID)
			ps.save()

			ss, exists := ps.streamStores[groupID]
			if exists {
				ss.Delete()
				delete(ps.streamStores, groupID)
			}
		case event.ChangePassword:
			oldpass := ev.Data[event.Password]
			newpass := ev.Data[event.NewPassword]
			ps.ChangePassword(oldpass, newpass, ev.EventID)
		case event.UpdateMessageFlags:
			handle := ev.Data[event.Handle]
			mIx, err := strconv.Atoi(ev.Data[event.Index])
			if err != nil {
				log.Errorf("Invalid Message Index: %v", err)
				return
			}
			flags, err := strconv.ParseUint(ev.Data[event.Flags], 2, 64)
			if err != nil {
				log.Errorf("Invalid Message Flags: %v", err)
				return
			}
			ps.profile.UpdateMessageFlags(handle, mIx, flags)
			if len(handle) == groupIDLen {
				ps.regenStreamStore(ps.profile.GetGroup(handle).Timeline.Messages, handle)
			} else if contact, exists := ps.profile.GetContact(handle); exists {
				val, _ := contact.GetAttribute(event.SaveHistoryKey)
				if val == event.SaveHistoryConfirmed {
					ps.regenStreamStore(contact.Timeline.Messages, handle)
				}
			}
		default:
			log.Debugf("shutting down profile store: %v", ev)
			return
		}
	}
}
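// Peer history is opt-in: attemptSavePeerMessage below only writes messages to disk when
// the contact's event.SaveHistoryKey attribute is event.SaveHistoryConfirmed, and the
// event.SetPeerAttribute case above creates or deletes the per-contact StreamStore to match.
// Illustrative sketch of enabling history for a contact over the event bus (em and onion
// are assumed caller-side variables):
//
//	em.Publish(event.NewEvent(event.SetPeerAttribute, map[event.Field]string{
//		event.RemotePeer: onion,
//		event.Key:        event.SaveHistoryKey,
//		event.Data:       event.SaveHistoryConfirmed,
//	}))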
// attemptSavePeerMessage checks if the profile has been configured to save history from this peer
// and, if so, saves the message into history. fromPeer is used to control if the message is saved
// as coming from the remote peer or if it was sent by our profile.
func (ps *ProfileStoreV1) attemptSavePeerMessage(peerID, messageData, timestampReceived string, fromPeer bool) {
	contact, exists := ps.profile.GetContact(peerID)
	if exists {
		val, _ := contact.GetAttribute(event.SaveHistoryKey)
		switch val {
		case event.SaveHistoryConfirmed:
			var received time.Time
			var message model.Message
			if fromPeer {
				received, _ = time.Parse(time.RFC3339Nano, timestampReceived)
				message = model.Message{Received: received, Timestamp: received, Message: messageData, PeerID: peerID, Signature: []byte{}, PreviousMessageSig: []byte{}}
			} else {
				received = time.Now()
				message = model.Message{Received: received, Timestamp: received, Message: messageData, PeerID: ps.profile.Onion, Signature: []byte{}, PreviousMessageSig: []byte{}, Acknowledged: true}
			}
			ss, exists := ps.streamStores[peerID]
			if exists {
				ss.Write(message)
			} else {
				log.Errorf("error storing new peer message: %v stream store does not exist", peerID)
			}
		default:
		}
	} else {
		log.Errorf("error saving message for peer that doesn't exist: %v", peerID)
	}
}

// Shutdown shuts down the queue / thread
func (ps *ProfileStoreV1) Shutdown() {
	if ps.queue != nil {
		ps.queue.Shutdown()
	}
}

// Delete removes all stored files for this stored profile
func (ps *ProfileStoreV1) Delete() {
	log.Debugf("Delete ProfileStore for %v\n", ps.profile.Onion)
	for _, ss := range ps.streamStores {
		ss.Delete()
	}
	ps.fs.Delete()
	err := os.RemoveAll(ps.directory)
	if err != nil {
		log.Errorf("ProfileStore Delete error on RemoveAll on %v was %v\n", ps.directory, err)
	}
}