Allow Peers to Store History
the build was successful Details

This commit is contained in:
Sarah Jamie Lewis 2020-07-08 11:29:33 -07:00
parent 3529e21b0e
commit db94a7d596
9 changed files with 79 additions and 19 deletions

1
.gitignore vendored
View File

@ -9,5 +9,6 @@ server/app/messages
.reviewboardrc .reviewboardrc
/vendor/ /vendor/
/testing/tor/ /testing/tor/
/storage/*/testing/
/storage/testing/ /storage/testing/
/testing/storage/ /testing/storage/

View File

@ -93,7 +93,7 @@ func stableService(t *testing.T, in, out string, done chan bool) {
return return
} }
if message1.Message.EventType != event.NewPeer { if message1.Message.EventType != event.NewPeer {
t.Errorf("Wrong message recieved, expected NewPeer\n") t.Errorf("Wrong message received, expected NewPeer\n")
done <- true done <- true
return return
} }
@ -107,7 +107,7 @@ func stableService(t *testing.T, in, out string, done chan bool) {
return return
} }
if message2.Message.EventType != event.DeleteContact { if message2.Message.EventType != event.DeleteContact {
t.Errorf("Wrong message recieved, expected DeleteContact, got %v\n", message2) t.Errorf("Wrong message received, expected DeleteContact, got %v\n", message2)
done <- true done <- true
return return
} }

View File

@ -266,3 +266,18 @@ const (
ContextGetVal = "im.cwtch.getVal" ContextGetVal = "im.cwtch.getVal"
ContextRetVal = "im.cwtch.retVal" ContextRetVal = "im.cwtch.retVal"
) )
// Define Default Attribute Keys
const (
SaveHistoryKey = "SavePeerHistory"
)
// Define Default Attribute Values
const (
// Save History has 3 distinct states. By default we don't save history (DontSaveHistoryDefault), if the peer confirms this
// we change to DontSaveHistoryConfirmed, if they confirm they want to save then this becomes SaveHistoryConfirmed
// We use this distinction between default and confirmed to drive UI
DontSaveHistoryDefault = "DefaultDontSaveHistory"
SaveHistoryConfirmed = "SaveHistory"
DontSaveHistoryConfirmed = "DontSaveHistory"
)

View File

@ -11,7 +11,7 @@ A local peer "Alice" has a PublicScope that is queryable by getVal requests.
By default, for now, other scopes are private, of which we here define SettingsScope By default, for now, other scopes are private, of which we here define SettingsScope
Alice's peer structs of remote peers such as "Bob" keep the queried Alice's peer structs of remote peers such as "Bob" keep the queried
PublicScope values in the PeerScope, which can be overriden by the same named PublicScope values in the PeerScope, which can be overridden by the same named
values stored in the LocalScope. values stored in the LocalScope.
*/ */
@ -27,32 +27,32 @@ const (
SettingsScope = "settings" SettingsScope = "settings"
) )
// Seperator for scope and the rest of path // Separator for scope and the rest of path
const Seperator = "." const Separator = "."
// GetPublicScope takes a path and attaches the pubic scope to it // GetPublicScope takes a path and attaches the pubic scope to it
func GetPublicScope(path string) string { func GetPublicScope(path string) string {
return PublicScope + Seperator + path return PublicScope + Separator + path
} }
// GetSettingsScope takes a path and attaches the settings scope to it // GetSettingsScope takes a path and attaches the settings scope to it
func GetSettingsScope(path string) string { func GetSettingsScope(path string) string {
return SettingsScope + Seperator + path return SettingsScope + Separator + path
} }
// GetLocalScope takes a path and attaches the local scope to it // GetLocalScope takes a path and attaches the local scope to it
func GetLocalScope(path string) string { func GetLocalScope(path string) string {
return LocalScope + Seperator + path return LocalScope + Separator + path
} }
// GetPeerScope takes a path and attaches the peer scope to it // GetPeerScope takes a path and attaches the peer scope to it
func GetPeerScope(path string) string { func GetPeerScope(path string) string {
return PeerScope + Seperator + path return PeerScope + Separator + path
} }
// GetScopePath take a full path and returns the scope and the scope-less path // GetScopePath take a full path and returns the scope and the scope-less path
func GetScopePath(fullPath string) (string, string) { func GetScopePath(fullPath string) (string, string) {
parts := strings.SplitN(fullPath, Seperator, 1) parts := strings.SplitN(fullPath, Separator, 1)
if len(parts) != 2 { if len(parts) != 2 {
return "", "" return "", ""
} }

View File

@ -22,7 +22,7 @@ import (
type Authorization string type Authorization string
const ( const (
// AuthUnknown is an inital state for a new unseen peer // AuthUnknown is an initial state for a new unseen peer
AuthUnknown Authorization = "unknown" AuthUnknown Authorization = "unknown"
// AuthApproved means the client has approved the peer, it can send messages to us, perform GetVals, etc // AuthApproved means the client has approved the peer, it can send messages to us, perform GetVals, etc
AuthApproved Authorization = "approved" AuthApproved Authorization = "approved"
@ -469,6 +469,10 @@ func (p *Profile) GetCopy(timeline bool) *Profile {
for groupID := range newp.Groups { for groupID := range newp.Groups {
newp.Groups[groupID].Timeline = *p.Groups[groupID].Timeline.GetCopy() newp.Groups[groupID].Timeline = *p.Groups[groupID].Timeline.GetCopy()
} }
for peerID := range newp.Contacts {
newp.Contacts[peerID].Timeline = *p.Contacts[peerID].Timeline.GetCopy()
}
} }
return newp return newp

View File

@ -46,7 +46,7 @@ func NewStreamStore(directory string, filenameBase string, password string) (sto
return ss return ss
} }
// Read returns all messages from the backing file (not the buffer, which is jsut for writing to the current file) // Read returns all messages from the backing file (not the buffer, for writing to the current file)
func (ss *streamStore) Read() (messages []model.Message) { func (ss *streamStore) Read() (messages []model.Message) {
ss.lock.Lock() ss.lock.Lock()
defer ss.lock.Unlock() defer ss.lock.Unlock()

View File

@ -258,6 +258,10 @@ func (ps *ProfileStoreV1) load() error {
contact.Authorization = model.AuthApproved contact.Authorization = model.AuthApproved
} }
} }
ss := NewStreamStore(ps.directory, contact.LocalID, ps.key)
cp.Contacts[contact.Onion].Timeline.SetMessages(ss.Read())
ps.streamStores[contact.Onion] = ss
} }
for gid, group := range cp.Groups { for gid, group := range cp.Groups {
@ -266,6 +270,7 @@ func (ps *ProfileStoreV1) load() error {
cp.Groups[gid].Timeline.SetMessages(ss.Read()) cp.Groups[gid].Timeline.SetMessages(ss.Read())
ps.streamStores[group.GroupID] = ss ps.streamStores[group.GroupID] = ss
} }
} }
return err return err
@ -292,11 +297,6 @@ func (ps *ProfileStoreV1) eventHandler() {
var pp *model.PublicProfile var pp *model.PublicProfile
json.Unmarshal([]byte(ev.Data[event.Data]), &pp) json.Unmarshal([]byte(ev.Data[event.Data]), &pp)
ps.profile.AddContact(ev.Data[event.RemotePeer], pp) ps.profile.AddContact(ev.Data[event.RemotePeer], pp)
// TODO: configure - allow peers to be configured to turn on limited storage
/*ss := NewStreamStore(ps.directory, pp.LocalID, ps.password)
pp.Timeline.SetMessages(ss.Read())
ps.streamStores[pp.Onion] = ss
ps.save()*/
case event.GroupCreated: case event.GroupCreated:
var group *model.Group var group *model.Group
json.Unmarshal([]byte(ev.Data[event.Data]), &group) json.Unmarshal([]byte(ev.Data[event.Data]), &group)
@ -343,6 +343,41 @@ func (ps *ProfileStoreV1) eventHandler() {
} else { } else {
log.Errorf("error storing new group invite: %v (%v)", err, ev) log.Errorf("error storing new group invite: %v (%v)", err, ev)
} }
case event.NewMessageFromPeer:
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
if exists {
val, ok := contact.GetAttribute(event.SaveHistoryKey)
if !ok {
contact.SetAttribute(event.SaveHistoryKey, event.DontSaveHistoryDefault)
} else {
switch val {
case event.SaveHistoryConfirmed:
{
peerID := ev.Data[event.RemotePeer]
received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
message := model.Message{Received: received, Timestamp: received, Message: ev.Data[event.Data], PeerID: peerID, Signature: []byte{}, PreviousMessageSig: []byte{}}
ss, exists := ps.streamStores[peerID]
if exists {
ss.Write(message)
} else {
log.Errorf("error storing new peer message: %v stream store does not exist", ev)
ss := NewStreamStore(ps.directory, contact.LocalID, ps.key)
ps.streamStores[peerID] = ss
ss.Write(message)
}
}
case event.DontSaveHistoryDefault:
{
}
case event.DontSaveHistoryConfirmed:
{
}
}
}
ps.save()
} else {
log.Errorf("error setting attribute on peer %v peer does not exist", ev)
}
case event.NewMessageFromGroup: case event.NewMessageFromGroup:
groupid := ev.Data[event.GroupID] groupid := ev.Data[event.GroupID]
received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
@ -368,6 +403,11 @@ func (ps *ProfileStoreV1) eventHandler() {
onion := ev.Data[event.RemotePeer] onion := ev.Data[event.RemotePeer]
ps.profile.DeleteContact(onion) ps.profile.DeleteContact(onion)
ps.save() ps.save()
ss, exists := ps.streamStores[onion]
if exists {
ss.Delete()
delete(ps.streamStores, onion)
}
case event.DeleteGroup: case event.DeleteGroup:
groupID := ev.Data[event.GroupID] groupID := ev.Data[event.GroupID]
ps.profile.DeleteGroup(groupID) ps.profile.DeleteGroup(groupID)

View File

@ -108,7 +108,7 @@ func (ss *streamStore) Delete() {
} }
} }
// Read returns all messages from the backing file (not the buffer, which is jsut for writing to the current file) // Read returns all messages from the backing file (not the buffer, for writing to the current file)
func (ss *streamStore) Read() (messages []model.Message) { func (ss *streamStore) Read() (messages []model.Message) {
ss.lock.Lock() ss.lock.Lock()
defer ss.lock.Unlock() defer ss.lock.Unlock()

View File

@ -63,7 +63,7 @@ func serverCheck(t *testing.T, serverAddr string) bool {
func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, groupID string) { func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, groupID string) {
for { for {
fmt.Printf("%v checking group conection...\n", peer.GetName()) fmt.Printf("%v checking group connection...\n", peer.GetName())
state, ok := peer.GetGroupState(groupID) state, ok := peer.GetGroupState(groupID)
if ok { if ok {
fmt.Printf("Waiting for Peer %v to join group %v - state: %v\n", peer.GetName(), groupID, state) fmt.Printf("Waiting for Peer %v to join group %v - state: %v\n", peer.GetName(), groupID, state)