Browse Source

Allow Peers to Store History

peer_history
Sarah Jamie Lewis 1 year ago
parent
commit
422b0d5deb
  1. 1
      .gitignore
  2. 4
      event/bridge/pipeBridge_test.go
  3. 15
      event/common.go
  4. 16
      model/attr/scope.go
  5. 6
      model/profile.go
  6. 3
      peer/cwtch_peer.go
  7. 2
      storage/v0/stream_store.go
  8. 86
      storage/v1/profile_store.go
  9. 2
      storage/v1/stream_store.go
  10. 2
      testing/cwtch_peer_server_integration_test.go

1
.gitignore

@ -9,5 +9,6 @@ server/app/messages
.reviewboardrc
/vendor/
/testing/tor/
/storage/*/testing/
/storage/testing/
/testing/storage/

4
event/bridge/pipeBridge_test.go

@ -93,7 +93,7 @@ func stableService(t *testing.T, in, out string, done chan bool) {
return
}
if message1.Message.EventType != event.NewPeer {
t.Errorf("Wrong message recieved, expected NewPeer\n")
t.Errorf("Wrong message received, expected NewPeer\n")
done <- true
return
}
@ -107,7 +107,7 @@ func stableService(t *testing.T, in, out string, done chan bool) {
return
}
if message2.Message.EventType != event.DeleteContact {
t.Errorf("Wrong message recieved, expected DeleteContact, got %v\n", message2)
t.Errorf("Wrong message received, expected DeleteContact, got %v\n", message2)
done <- true
return
}

15
event/common.go

@ -266,3 +266,18 @@ const (
ContextGetVal = "im.cwtch.getVal"
ContextRetVal = "im.cwtch.retVal"
)
// Define Default Attribute Keys
const (
SaveHistoryKey = "SavePeerHistory"
)
// Define Default Attribute Values
const (
// Save History has 3 distinct states. By default we don't save history (DefaultDeleteHistory), if the peer confirms this
// we change to DeleteHistoryConfirmed, if they confirm they want to save then this becomes SaveHistoryConfirmed
// We use this distinction between default and confirmed to drive UI
DeleteHistoryDefault = "DefaultDeleteHistory"
SaveHistoryConfirmed = "SaveHistory"
DeleteHistoryConfirmed = "DeleteHistoryConfirmed"
)

16
model/attr/scope.go

@ -11,7 +11,7 @@ A local peer "Alice" has a PublicScope that is queryable by getVal requests.
By default, for now, other scopes are private, of which we here define SettingsScope
Alice's peer structs of remote peers such as "Bob" keep the queried
PublicScope values in the PeerScope, which can be overriden by the same named
PublicScope values in the PeerScope, which can be overridden by the same named
values stored in the LocalScope.
*/
@ -27,32 +27,32 @@ const (
SettingsScope = "settings"
)
// Seperator for scope and the rest of path
const Seperator = "."
// Separator for scope and the rest of path
const Separator = "."
// GetPublicScope takes a path and attaches the pubic scope to it
func GetPublicScope(path string) string {
return PublicScope + Seperator + path
return PublicScope + Separator + path
}
// GetSettingsScope takes a path and attaches the settings scope to it
func GetSettingsScope(path string) string {
return SettingsScope + Seperator + path
return SettingsScope + Separator + path
}
// GetLocalScope takes a path and attaches the local scope to it
func GetLocalScope(path string) string {
return LocalScope + Seperator + path
return LocalScope + Separator + path
}
// GetPeerScope takes a path and attaches the peer scope to it
func GetPeerScope(path string) string {
return PeerScope + Seperator + path
return PeerScope + Separator + path
}
// GetScopePath take a full path and returns the scope and the scope-less path
func GetScopePath(fullPath string) (string, string) {
parts := strings.SplitN(fullPath, Seperator, 1)
parts := strings.SplitN(fullPath, Separator, 1)
if len(parts) != 2 {
return "", ""
}

6
model/profile.go

@ -22,7 +22,7 @@ import (
type Authorization string
const (
// AuthUnknown is an inital state for a new unseen peer
// AuthUnknown is an initial state for a new unseen peer
AuthUnknown Authorization = "unknown"
// AuthApproved means the client has approved the peer, it can send messages to us, perform GetVals, etc
AuthApproved Authorization = "approved"
@ -469,6 +469,10 @@ func (p *Profile) GetCopy(timeline bool) *Profile {
for groupID := range newp.Groups {
newp.Groups[groupID].Timeline = *p.Groups[groupID].Timeline.GetCopy()
}
for peerID := range newp.Contacts {
newp.Contacts[peerID].Timeline = *p.Contacts[peerID].Timeline.GetCopy()
}
}
return newp

3
peer/cwtch_peer.go

@ -199,6 +199,9 @@ func (cp *cwtchPeer) AddContact(nick, onion string, authorization model.Authoriz
event.RemotePeer: onion,
}))
cp.eventBus.Publish(event.NewEventList(event.SetPeerAuthorization, event.RemotePeer, onion, event.Authorization, string(authorization)))
// Default to Deleting Peer History
cp.eventBus.Publish(event.NewEventList(event.SetPeerAttribute, event.RemotePeer, onion, event.SaveHistoryKey, event.DeleteHistoryDefault))
}
// GetContacts returns an unordered list of onions

2
storage/v0/stream_store.go

@ -46,7 +46,7 @@ func NewStreamStore(directory string, filenameBase string, password string) (sto
return ss
}
// Read returns all messages from the backing file (not the buffer, which is jsut for writing to the current file)
// Read returns all messages from the backing file (not the buffer, for writing to the current file)
func (ss *streamStore) Read() (messages []model.Message) {
ss.lock.Lock()
defer ss.lock.Unlock()

86
storage/v1/profile_store.go

@ -76,6 +76,8 @@ func (ps *ProfileStoreV1) initProfileWriterStore() {
ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue)
ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue)
ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue)
ps.eventManager.Subscribe(event.SendMessageToPeer, ps.queue)
ps.eventManager.Subscribe(event.NewMessageFromPeer, ps.queue)
ps.eventManager.Subscribe(event.PeerStateChange, ps.queue)
ps.eventManager.Subscribe(event.ServerStateChange, ps.queue)
ps.eventManager.Subscribe(event.DeleteContact, ps.queue)
@ -258,6 +260,18 @@ func (ps *ProfileStoreV1) load() error {
contact.Authorization = model.AuthApproved
}
}
// Check if there is any saved history...
saveHistory, keyExists := contact.GetAttribute(event.SaveHistoryKey)
if !keyExists {
contact.SetAttribute(event.SaveHistoryKey, event.DeleteHistoryDefault)
}
if saveHistory == event.SaveHistoryConfirmed {
ss := NewStreamStore(ps.directory, contact.LocalID, ps.key)
cp.Contacts[contact.Onion].Timeline.SetMessages(ss.Read())
ps.streamStores[contact.Onion] = ss
}
}
for gid, group := range cp.Groups {
@ -266,6 +280,7 @@ func (ps *ProfileStoreV1) load() error {
cp.Groups[gid].Timeline.SetMessages(ss.Read())
ps.streamStores[group.GroupID] = ss
}
}
return err
@ -292,11 +307,6 @@ func (ps *ProfileStoreV1) eventHandler() {
var pp *model.PublicProfile
json.Unmarshal([]byte(ev.Data[event.Data]), &pp)
ps.profile.AddContact(ev.Data[event.RemotePeer], pp)
// TODO: configure - allow peers to be configured to turn on limited storage
/*ss := NewStreamStore(ps.directory, pp.LocalID, ps.password)
pp.Timeline.SetMessages(ss.Read())
ps.streamStores[pp.Onion] = ss
ps.save()*/
case event.GroupCreated:
var group *model.Group
json.Unmarshal([]byte(ev.Data[event.Data]), &group)
@ -315,6 +325,27 @@ func (ps *ProfileStoreV1) eventHandler() {
if exists {
contact.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
ps.save()
switch ev.Data[event.Key] {
case event.SaveHistoryKey:
if event.DeleteHistoryConfirmed == ev.Data[event.Data] {
ss, exists := ps.streamStores[ev.Data[event.RemotePeer]]
if exists {
ss.Delete()
delete(ps.streamStores, ev.Data[event.RemotePeer])
}
} else if event.SaveHistoryConfirmed == ev.Data[event.Data] {
_, exists := ps.streamStores[ev.Data[event.RemotePeer]]
if !exists {
ss := NewStreamStore(ps.directory, contact.LocalID, ps.key)
ps.streamStores[ev.Data[event.RemotePeer]] = ss
}
}
default:
{
}
}
} else {
log.Errorf("error setting attribute on peer %v peer does not exist", ev)
}
@ -343,6 +374,10 @@ func (ps *ProfileStoreV1) eventHandler() {
} else {
log.Errorf("error storing new group invite: %v (%v)", err, ev)
}
case event.SendMessageToPeer: // We need this to be able to save the outgoing messages to a peer
ps.attemptSavePeerMessage(ev, false)
case event.NewMessageFromPeer:
ps.attemptSavePeerMessage(ev, true)
case event.NewMessageFromGroup:
groupid := ev.Data[event.GroupID]
received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
@ -368,6 +403,11 @@ func (ps *ProfileStoreV1) eventHandler() {
onion := ev.Data[event.RemotePeer]
ps.profile.DeleteContact(onion)
ps.save()
ss, exists := ps.streamStores[onion]
if exists {
ss.Delete()
delete(ps.streamStores, onion)
}
case event.DeleteGroup:
groupID := ev.Data[event.GroupID]
ps.profile.DeleteGroup(groupID)
@ -388,6 +428,42 @@ func (ps *ProfileStoreV1) eventHandler() {
}
}
// attemptSavePeerMessage checks if the peer has been configured to save history from this peer
// and if so the peer saves the message into history. fromPeer is used to control if the message is saved
// as coming from the remote peer or if it was sent by out profile.
func (ps *ProfileStoreV1) attemptSavePeerMessage(ev *event.Event, fromPeer bool) {
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
if exists {
val, _ := contact.GetAttribute(event.SaveHistoryKey)
switch val {
case event.SaveHistoryConfirmed:
{
peerID := ev.Data[event.RemotePeer]
var received time.Time
var message model.Message
if fromPeer {
received, _ = time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
message = model.Message{Received: received, Timestamp: received, Message: ev.Data[event.Data], PeerID: peerID, Signature: []byte{}, PreviousMessageSig: []byte{}}
} else {
received := time.Now()
message = model.Message{Received: received, Timestamp: received, Message: ev.Data[event.Data], PeerID: ps.profile.Onion, Signature: []byte{}, PreviousMessageSig: []byte{}}
}
ss, exists := ps.streamStores[peerID]
if exists {
ss.Write(message)
} else {
log.Errorf("error storing new peer message: %v stream store does not exist", ev)
}
}
default:
{
}
}
} else {
log.Errorf("error saving message for peer that doesn't exist: %v", ev)
}
}
// Shutdown shuts down the queue / thread
func (ps *ProfileStoreV1) Shutdown() {
if ps.queue != nil {

2
storage/v1/stream_store.go

@ -108,7 +108,7 @@ func (ss *streamStore) Delete() {
}
}
// Read returns all messages from the backing file (not the buffer, which is jsut for writing to the current file)
// Read returns all messages from the backing file (not the buffer, for writing to the current file)
func (ss *streamStore) Read() (messages []model.Message) {
ss.lock.Lock()
defer ss.lock.Unlock()

2
testing/cwtch_peer_server_integration_test.go

@ -63,7 +63,7 @@ func serverCheck(t *testing.T, serverAddr string) bool {
func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, groupID string) {
for {
fmt.Printf("%v checking group conection...\n", peer.GetName())
fmt.Printf("%v checking group connection...\n", peer.GetName())
state, ok := peer.GetGroupState(groupID)
if ok {
fmt.Printf("Waiting for Peer %v to join group %v - state: %v\n", peer.GetName(), groupID, state)

Loading…
Cancel
Save