From 422b0d5debce4790fe8298f1334285133d2f4437 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Wed, 8 Jul 2020 11:29:33 -0700 Subject: [PATCH] Allow Peers to Store History --- .gitignore | 1 + event/bridge/pipeBridge_test.go | 4 +- event/common.go | 15 ++++ model/attr/scope.go | 16 ++-- model/profile.go | 6 +- peer/cwtch_peer.go | 3 + storage/v0/stream_store.go | 2 +- storage/v1/profile_store.go | 86 +++++++++++++++++-- storage/v1/stream_store.go | 2 +- testing/cwtch_peer_server_integration_test.go | 2 +- 10 files changed, 118 insertions(+), 19 deletions(-) diff --git a/.gitignore b/.gitignore index e5b23c2..c85bdd3 100644 --- a/.gitignore +++ b/.gitignore @@ -9,5 +9,6 @@ server/app/messages .reviewboardrc /vendor/ /testing/tor/ +/storage/*/testing/ /storage/testing/ /testing/storage/ diff --git a/event/bridge/pipeBridge_test.go b/event/bridge/pipeBridge_test.go index 46fd7f1..637f3a6 100644 --- a/event/bridge/pipeBridge_test.go +++ b/event/bridge/pipeBridge_test.go @@ -93,7 +93,7 @@ func stableService(t *testing.T, in, out string, done chan bool) { return } if message1.Message.EventType != event.NewPeer { - t.Errorf("Wrong message recieved, expected NewPeer\n") + t.Errorf("Wrong message received, expected NewPeer\n") done <- true return } @@ -107,7 +107,7 @@ func stableService(t *testing.T, in, out string, done chan bool) { return } if message2.Message.EventType != event.DeleteContact { - t.Errorf("Wrong message recieved, expected DeleteContact, got %v\n", message2) + t.Errorf("Wrong message received, expected DeleteContact, got %v\n", message2) done <- true return } diff --git a/event/common.go b/event/common.go index 6e299d9..ab950a8 100644 --- a/event/common.go +++ b/event/common.go @@ -266,3 +266,18 @@ const ( ContextGetVal = "im.cwtch.getVal" ContextRetVal = "im.cwtch.retVal" ) + +// Define Default Attribute Keys +const ( + SaveHistoryKey = "SavePeerHistory" +) + +// Define Default Attribute Values +const ( + // Save History has 3 distinct states. By default we don't save history (DefaultDeleteHistory), if the peer confirms this + // we change to DeleteHistoryConfirmed, if they confirm they want to save then this becomes SaveHistoryConfirmed + // We use this distinction between default and confirmed to drive UI + DeleteHistoryDefault = "DefaultDeleteHistory" + SaveHistoryConfirmed = "SaveHistory" + DeleteHistoryConfirmed = "DeleteHistoryConfirmed" +) diff --git a/model/attr/scope.go b/model/attr/scope.go index 5bf8eb9..60d40cc 100644 --- a/model/attr/scope.go +++ b/model/attr/scope.go @@ -11,7 +11,7 @@ A local peer "Alice" has a PublicScope that is queryable by getVal requests. By default, for now, other scopes are private, of which we here define SettingsScope Alice's peer structs of remote peers such as "Bob" keep the queried -PublicScope values in the PeerScope, which can be overriden by the same named +PublicScope values in the PeerScope, which can be overridden by the same named values stored in the LocalScope. */ @@ -27,32 +27,32 @@ const ( SettingsScope = "settings" ) -// Seperator for scope and the rest of path -const Seperator = "." +// Separator for scope and the rest of path +const Separator = "." // GetPublicScope takes a path and attaches the pubic scope to it func GetPublicScope(path string) string { - return PublicScope + Seperator + path + return PublicScope + Separator + path } // GetSettingsScope takes a path and attaches the settings scope to it func GetSettingsScope(path string) string { - return SettingsScope + Seperator + path + return SettingsScope + Separator + path } // GetLocalScope takes a path and attaches the local scope to it func GetLocalScope(path string) string { - return LocalScope + Seperator + path + return LocalScope + Separator + path } // GetPeerScope takes a path and attaches the peer scope to it func GetPeerScope(path string) string { - return PeerScope + Seperator + path + return PeerScope + Separator + path } // GetScopePath take a full path and returns the scope and the scope-less path func GetScopePath(fullPath string) (string, string) { - parts := strings.SplitN(fullPath, Seperator, 1) + parts := strings.SplitN(fullPath, Separator, 1) if len(parts) != 2 { return "", "" } diff --git a/model/profile.go b/model/profile.go index 38bf513..0fb2c69 100644 --- a/model/profile.go +++ b/model/profile.go @@ -22,7 +22,7 @@ import ( type Authorization string const ( - // AuthUnknown is an inital state for a new unseen peer + // AuthUnknown is an initial state for a new unseen peer AuthUnknown Authorization = "unknown" // AuthApproved means the client has approved the peer, it can send messages to us, perform GetVals, etc AuthApproved Authorization = "approved" @@ -469,6 +469,10 @@ func (p *Profile) GetCopy(timeline bool) *Profile { for groupID := range newp.Groups { newp.Groups[groupID].Timeline = *p.Groups[groupID].Timeline.GetCopy() } + + for peerID := range newp.Contacts { + newp.Contacts[peerID].Timeline = *p.Contacts[peerID].Timeline.GetCopy() + } } return newp diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index d52bd3e..24566e5 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -199,6 +199,9 @@ func (cp *cwtchPeer) AddContact(nick, onion string, authorization model.Authoriz event.RemotePeer: onion, })) cp.eventBus.Publish(event.NewEventList(event.SetPeerAuthorization, event.RemotePeer, onion, event.Authorization, string(authorization))) + + // Default to Deleting Peer History + cp.eventBus.Publish(event.NewEventList(event.SetPeerAttribute, event.RemotePeer, onion, event.SaveHistoryKey, event.DeleteHistoryDefault)) } // GetContacts returns an unordered list of onions diff --git a/storage/v0/stream_store.go b/storage/v0/stream_store.go index e6d0dd2..a3c2da5 100644 --- a/storage/v0/stream_store.go +++ b/storage/v0/stream_store.go @@ -46,7 +46,7 @@ func NewStreamStore(directory string, filenameBase string, password string) (sto return ss } -// Read returns all messages from the backing file (not the buffer, which is jsut for writing to the current file) +// Read returns all messages from the backing file (not the buffer, for writing to the current file) func (ss *streamStore) Read() (messages []model.Message) { ss.lock.Lock() defer ss.lock.Unlock() diff --git a/storage/v1/profile_store.go b/storage/v1/profile_store.go index cfae079..f15337e 100644 --- a/storage/v1/profile_store.go +++ b/storage/v1/profile_store.go @@ -76,6 +76,8 @@ func (ps *ProfileStoreV1) initProfileWriterStore() { ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue) ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue) ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue) + ps.eventManager.Subscribe(event.SendMessageToPeer, ps.queue) + ps.eventManager.Subscribe(event.NewMessageFromPeer, ps.queue) ps.eventManager.Subscribe(event.PeerStateChange, ps.queue) ps.eventManager.Subscribe(event.ServerStateChange, ps.queue) ps.eventManager.Subscribe(event.DeleteContact, ps.queue) @@ -258,6 +260,18 @@ func (ps *ProfileStoreV1) load() error { contact.Authorization = model.AuthApproved } } + + // Check if there is any saved history... + saveHistory, keyExists := contact.GetAttribute(event.SaveHistoryKey) + if !keyExists { + contact.SetAttribute(event.SaveHistoryKey, event.DeleteHistoryDefault) + } + + if saveHistory == event.SaveHistoryConfirmed { + ss := NewStreamStore(ps.directory, contact.LocalID, ps.key) + cp.Contacts[contact.Onion].Timeline.SetMessages(ss.Read()) + ps.streamStores[contact.Onion] = ss + } } for gid, group := range cp.Groups { @@ -266,6 +280,7 @@ func (ps *ProfileStoreV1) load() error { cp.Groups[gid].Timeline.SetMessages(ss.Read()) ps.streamStores[group.GroupID] = ss } + } return err @@ -292,11 +307,6 @@ func (ps *ProfileStoreV1) eventHandler() { var pp *model.PublicProfile json.Unmarshal([]byte(ev.Data[event.Data]), &pp) ps.profile.AddContact(ev.Data[event.RemotePeer], pp) - // TODO: configure - allow peers to be configured to turn on limited storage - /*ss := NewStreamStore(ps.directory, pp.LocalID, ps.password) - pp.Timeline.SetMessages(ss.Read()) - ps.streamStores[pp.Onion] = ss - ps.save()*/ case event.GroupCreated: var group *model.Group json.Unmarshal([]byte(ev.Data[event.Data]), &group) @@ -315,6 +325,27 @@ func (ps *ProfileStoreV1) eventHandler() { if exists { contact.SetAttribute(ev.Data[event.Key], ev.Data[event.Data]) ps.save() + + switch ev.Data[event.Key] { + case event.SaveHistoryKey: + if event.DeleteHistoryConfirmed == ev.Data[event.Data] { + ss, exists := ps.streamStores[ev.Data[event.RemotePeer]] + if exists { + ss.Delete() + delete(ps.streamStores, ev.Data[event.RemotePeer]) + } + } else if event.SaveHistoryConfirmed == ev.Data[event.Data] { + _, exists := ps.streamStores[ev.Data[event.RemotePeer]] + if !exists { + ss := NewStreamStore(ps.directory, contact.LocalID, ps.key) + ps.streamStores[ev.Data[event.RemotePeer]] = ss + } + } + default: + { + } + } + } else { log.Errorf("error setting attribute on peer %v peer does not exist", ev) } @@ -343,6 +374,10 @@ func (ps *ProfileStoreV1) eventHandler() { } else { log.Errorf("error storing new group invite: %v (%v)", err, ev) } + case event.SendMessageToPeer: // We need this to be able to save the outgoing messages to a peer + ps.attemptSavePeerMessage(ev, false) + case event.NewMessageFromPeer: + ps.attemptSavePeerMessage(ev, true) case event.NewMessageFromGroup: groupid := ev.Data[event.GroupID] received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) @@ -368,6 +403,11 @@ func (ps *ProfileStoreV1) eventHandler() { onion := ev.Data[event.RemotePeer] ps.profile.DeleteContact(onion) ps.save() + ss, exists := ps.streamStores[onion] + if exists { + ss.Delete() + delete(ps.streamStores, onion) + } case event.DeleteGroup: groupID := ev.Data[event.GroupID] ps.profile.DeleteGroup(groupID) @@ -388,6 +428,42 @@ func (ps *ProfileStoreV1) eventHandler() { } } +// attemptSavePeerMessage checks if the peer has been configured to save history from this peer +// and if so the peer saves the message into history. fromPeer is used to control if the message is saved +// as coming from the remote peer or if it was sent by out profile. +func (ps *ProfileStoreV1) attemptSavePeerMessage(ev *event.Event, fromPeer bool) { + contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer]) + if exists { + val, _ := contact.GetAttribute(event.SaveHistoryKey) + switch val { + case event.SaveHistoryConfirmed: + { + peerID := ev.Data[event.RemotePeer] + var received time.Time + var message model.Message + if fromPeer { + received, _ = time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) + message = model.Message{Received: received, Timestamp: received, Message: ev.Data[event.Data], PeerID: peerID, Signature: []byte{}, PreviousMessageSig: []byte{}} + } else { + received := time.Now() + message = model.Message{Received: received, Timestamp: received, Message: ev.Data[event.Data], PeerID: ps.profile.Onion, Signature: []byte{}, PreviousMessageSig: []byte{}} + } + ss, exists := ps.streamStores[peerID] + if exists { + ss.Write(message) + } else { + log.Errorf("error storing new peer message: %v stream store does not exist", ev) + } + } + default: + { + } + } + } else { + log.Errorf("error saving message for peer that doesn't exist: %v", ev) + } +} + // Shutdown shuts down the queue / thread func (ps *ProfileStoreV1) Shutdown() { if ps.queue != nil { diff --git a/storage/v1/stream_store.go b/storage/v1/stream_store.go index 14d82bd..a160e23 100644 --- a/storage/v1/stream_store.go +++ b/storage/v1/stream_store.go @@ -108,7 +108,7 @@ func (ss *streamStore) Delete() { } } -// Read returns all messages from the backing file (not the buffer, which is jsut for writing to the current file) +// Read returns all messages from the backing file (not the buffer, for writing to the current file) func (ss *streamStore) Read() (messages []model.Message) { ss.lock.Lock() defer ss.lock.Unlock() diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index f3ab4fc..32493ef 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -63,7 +63,7 @@ func serverCheck(t *testing.T, serverAddr string) bool { func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, groupID string) { for { - fmt.Printf("%v checking group conection...\n", peer.GetName()) + fmt.Printf("%v checking group connection...\n", peer.GetName()) state, ok := peer.GetGroupState(groupID) if ok { fmt.Printf("Waiting for Peer %v to join group %v - state: %v\n", peer.GetName(), groupID, state)