Allow Peers to Store History #316
|
@ -9,5 +9,6 @@ server/app/messages
|
|||
.reviewboardrc
|
||||
/vendor/
|
||||
/testing/tor/
|
||||
/storage/*/testing/
|
||||
/storage/testing/
|
||||
/testing/storage/
|
||||
|
|
|
@ -93,7 +93,7 @@ func stableService(t *testing.T, in, out string, done chan bool) {
|
|||
return
|
||||
}
|
||||
if message1.Message.EventType != event.NewPeer {
|
||||
t.Errorf("Wrong message recieved, expected NewPeer\n")
|
||||
t.Errorf("Wrong message received, expected NewPeer\n")
|
||||
done <- true
|
||||
return
|
||||
}
|
||||
|
@ -107,7 +107,7 @@ func stableService(t *testing.T, in, out string, done chan bool) {
|
|||
return
|
||||
}
|
||||
if message2.Message.EventType != event.DeleteContact {
|
||||
t.Errorf("Wrong message recieved, expected DeleteContact, got %v\n", message2)
|
||||
t.Errorf("Wrong message received, expected DeleteContact, got %v\n", message2)
|
||||
done <- true
|
||||
return
|
||||
}
|
||||
|
|
|
@ -266,3 +266,18 @@ const (
|
|||
ContextGetVal = "im.cwtch.getVal"
|
||||
ContextRetVal = "im.cwtch.retVal"
|
||||
)
|
||||
|
||||
// Define Default Attribute Keys
|
||||
const (
|
||||
SaveHistoryKey = "SavePeerHistory"
|
||||
)
|
||||
|
||||
// Define Default Attribute Values
|
||||
const (
|
||||
// Save History has 3 distinct states. By default we don't save history (DefaultDeleteHistory), if the peer confirms this
|
||||
// we change to DeleteHistoryConfirmed, if they confirm they want to save then this becomes SaveHistoryConfirmed
|
||||
// We use this distinction between default and confirmed to drive UI
|
||||
DeleteHistoryDefault = "DefaultDeleteHistory"
|
||||
SaveHistoryConfirmed = "SaveHistory"
|
||||
DeleteHistoryConfirmed = "DeleteHistoryConfirmed"
|
||||
)
|
||||
|
|
|
@ -11,7 +11,7 @@ A local peer "Alice" has a PublicScope that is queryable by getVal requests.
|
|||
By default, for now, other scopes are private, of which we here define SettingsScope
|
||||
|
||||
Alice's peer structs of remote peers such as "Bob" keep the queried
|
||||
PublicScope values in the PeerScope, which can be overriden by the same named
|
||||
PublicScope values in the PeerScope, which can be overridden by the same named
|
||||
values stored in the LocalScope.
|
||||
|
||||
*/
|
||||
|
@ -27,32 +27,32 @@ const (
|
|||
SettingsScope = "settings"
|
||||
)
|
||||
|
||||
// Seperator for scope and the rest of path
|
||||
const Seperator = "."
|
||||
// Separator for scope and the rest of path
|
||||
const Separator = "."
|
||||
|
||||
// GetPublicScope takes a path and attaches the pubic scope to it
|
||||
func GetPublicScope(path string) string {
|
||||
return PublicScope + Seperator + path
|
||||
return PublicScope + Separator + path
|
||||
}
|
||||
|
||||
// GetSettingsScope takes a path and attaches the settings scope to it
|
||||
func GetSettingsScope(path string) string {
|
||||
return SettingsScope + Seperator + path
|
||||
return SettingsScope + Separator + path
|
||||
}
|
||||
|
||||
// GetLocalScope takes a path and attaches the local scope to it
|
||||
func GetLocalScope(path string) string {
|
||||
return LocalScope + Seperator + path
|
||||
return LocalScope + Separator + path
|
||||
}
|
||||
|
||||
// GetPeerScope takes a path and attaches the peer scope to it
|
||||
func GetPeerScope(path string) string {
|
||||
return PeerScope + Seperator + path
|
||||
return PeerScope + Separator + path
|
||||
}
|
||||
|
||||
// GetScopePath take a full path and returns the scope and the scope-less path
|
||||
func GetScopePath(fullPath string) (string, string) {
|
||||
parts := strings.SplitN(fullPath, Seperator, 1)
|
||||
parts := strings.SplitN(fullPath, Separator, 1)
|
||||
if len(parts) != 2 {
|
||||
return "", ""
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ import (
|
|||
type Authorization string
|
||||
|
||||
const (
|
||||
// AuthUnknown is an inital state for a new unseen peer
|
||||
// AuthUnknown is an initial state for a new unseen peer
|
||||
AuthUnknown Authorization = "unknown"
|
||||
// AuthApproved means the client has approved the peer, it can send messages to us, perform GetVals, etc
|
||||
AuthApproved Authorization = "approved"
|
||||
|
@ -469,6 +469,10 @@ func (p *Profile) GetCopy(timeline bool) *Profile {
|
|||
for groupID := range newp.Groups {
|
||||
newp.Groups[groupID].Timeline = *p.Groups[groupID].Timeline.GetCopy()
|
||||
}
|
||||
|
||||
for peerID := range newp.Contacts {
|
||||
newp.Contacts[peerID].Timeline = *p.Contacts[peerID].Timeline.GetCopy()
|
||||
}
|
||||
}
|
||||
|
||||
return newp
|
||||
|
|
|
@ -199,6 +199,9 @@ func (cp *cwtchPeer) AddContact(nick, onion string, authorization model.Authoriz
|
|||
event.RemotePeer: onion,
|
||||
}))
|
||||
cp.eventBus.Publish(event.NewEventList(event.SetPeerAuthorization, event.RemotePeer, onion, event.Authorization, string(authorization)))
|
||||
|
||||
// Default to Deleting Peer History
|
||||
cp.eventBus.Publish(event.NewEventList(event.SetPeerAttribute, event.RemotePeer, onion, event.SaveHistoryKey, event.DeleteHistoryDefault))
|
||||
}
|
||||
|
||||
// GetContacts returns an unordered list of onions
|
||||
|
|
|
@ -46,7 +46,7 @@ func NewStreamStore(directory string, filenameBase string, password string) (sto
|
|||
return ss
|
||||
}
|
||||
|
||||
// Read returns all messages from the backing file (not the buffer, which is jsut for writing to the current file)
|
||||
// Read returns all messages from the backing file (not the buffer, for writing to the current file)
|
||||
func (ss *streamStore) Read() (messages []model.Message) {
|
||||
ss.lock.Lock()
|
||||
defer ss.lock.Unlock()
|
||||
|
|
|
@ -76,6 +76,8 @@ func (ps *ProfileStoreV1) initProfileWriterStore() {
|
|||
ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue)
|
||||
ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue)
|
||||
ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue)
|
||||
ps.eventManager.Subscribe(event.SendMessageToPeer, ps.queue)
|
||||
ps.eventManager.Subscribe(event.NewMessageFromPeer, ps.queue)
|
||||
ps.eventManager.Subscribe(event.PeerStateChange, ps.queue)
|
||||
ps.eventManager.Subscribe(event.ServerStateChange, ps.queue)
|
||||
ps.eventManager.Subscribe(event.DeleteContact, ps.queue)
|
||||
|
@ -258,6 +260,18 @@ func (ps *ProfileStoreV1) load() error {
|
|||
contact.Authorization = model.AuthApproved
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Check if there is any saved history...
|
||||
saveHistory, keyExists := contact.GetAttribute(event.SaveHistoryKey)
|
||||
if !keyExists {
|
||||
contact.SetAttribute(event.SaveHistoryKey, event.DeleteHistoryDefault)
|
||||
}
|
||||
|
||||
if saveHistory == event.SaveHistoryConfirmed {
|
||||
ss := NewStreamStore(ps.directory, contact.LocalID, ps.key)
|
||||
cp.Contacts[contact.Onion].Timeline.SetMessages(ss.Read())
|
||||
ps.streamStores[contact.Onion] = ss
|
||||
}
|
||||
}
|
||||
|
||||
for gid, group := range cp.Groups {
|
||||
|
@ -266,6 +280,7 @@ func (ps *ProfileStoreV1) load() error {
|
|||
cp.Groups[gid].Timeline.SetMessages(ss.Read())
|
||||
ps.streamStores[group.GroupID] = ss
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return err
|
||||
|
@ -292,11 +307,6 @@ func (ps *ProfileStoreV1) eventHandler() {
|
|||
var pp *model.PublicProfile
|
||||
json.Unmarshal([]byte(ev.Data[event.Data]), &pp)
|
||||
ps.profile.AddContact(ev.Data[event.RemotePeer], pp)
|
||||
// TODO: configure - allow peers to be configured to turn on limited storage
|
||||
/*ss := NewStreamStore(ps.directory, pp.LocalID, ps.password)
|
||||
pp.Timeline.SetMessages(ss.Read())
|
||||
ps.streamStores[pp.Onion] = ss
|
||||
ps.save()*/
|
||||
case event.GroupCreated:
|
||||
var group *model.Group
|
||||
json.Unmarshal([]byte(ev.Data[event.Data]), &group)
|
||||
|
@ -315,6 +325,27 @@ func (ps *ProfileStoreV1) eventHandler() {
|
|||
if exists {
|
||||
contact.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
|
||||
ps.save()
|
||||
|
||||
switch ev.Data[event.Key] {
|
||||
case event.SaveHistoryKey:
|
||||
if event.DeleteHistoryConfirmed == ev.Data[event.Data] {
|
||||
ss, exists := ps.streamStores[ev.Data[event.RemotePeer]]
|
||||
if exists {
|
||||
ss.Delete()
|
||||
delete(ps.streamStores, ev.Data[event.RemotePeer])
|
||||
}
|
||||
} else if event.SaveHistoryConfirmed == ev.Data[event.Data] {
|
||||
_, exists := ps.streamStores[ev.Data[event.RemotePeer]]
|
||||
if !exists {
|
||||
ss := NewStreamStore(ps.directory, contact.LocalID, ps.key)
|
||||
ps.streamStores[ev.Data[event.RemotePeer]] = ss
|
||||
}
|
||||
}
|
||||
default:
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
log.Errorf("error setting attribute on peer %v peer does not exist", ev)
|
||||
}
|
||||
|
@ -343,6 +374,10 @@ func (ps *ProfileStoreV1) eventHandler() {
|
|||
} else {
|
||||
log.Errorf("error storing new group invite: %v (%v)", err, ev)
|
||||
}
|
||||
case event.SendMessageToPeer: // We need this to be able to save the outgoing messages to a peer
|
||||
ps.attemptSavePeerMessage(ev, false)
|
||||
case event.NewMessageFromPeer:
|
||||
dan
commented
shouldn't need this if we aren't setting a default here shouldn't need this if we aren't setting a default here
|
||||
ps.attemptSavePeerMessage(ev, true)
|
||||
case event.NewMessageFromGroup:
|
||||
groupid := ev.Data[event.GroupID]
|
||||
received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
|
||||
|
@ -368,6 +403,11 @@ func (ps *ProfileStoreV1) eventHandler() {
|
|||
onion := ev.Data[event.RemotePeer]
|
||||
ps.profile.DeleteContact(onion)
|
||||
ps.save()
|
||||
ss, exists := ps.streamStores[onion]
|
||||
if exists {
|
||||
ss.Delete()
|
||||
delete(ps.streamStores, onion)
|
||||
}
|
||||
case event.DeleteGroup:
|
||||
groupID := ev.Data[event.GroupID]
|
||||
ps.profile.DeleteGroup(groupID)
|
||||
|
@ -388,6 +428,42 @@ func (ps *ProfileStoreV1) eventHandler() {
|
|||
}
|
||||
}
|
||||
|
||||
// attemptSavePeerMessage checks if the peer has been configured to save history from this peer
|
||||
// and if so the peer saves the message into history. fromPeer is used to control if the message is saved
|
||||
// as coming from the remote peer or if it was sent by out profile.
|
||||
func (ps *ProfileStoreV1) attemptSavePeerMessage(ev *event.Event, fromPeer bool) {
|
||||
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
|
||||
if exists {
|
||||
val, _ := contact.GetAttribute(event.SaveHistoryKey)
|
||||
switch val {
|
||||
case event.SaveHistoryConfirmed:
|
||||
{
|
||||
peerID := ev.Data[event.RemotePeer]
|
||||
var received time.Time
|
||||
var message model.Message
|
||||
if fromPeer {
|
||||
received, _ = time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
|
||||
message = model.Message{Received: received, Timestamp: received, Message: ev.Data[event.Data], PeerID: peerID, Signature: []byte{}, PreviousMessageSig: []byte{}}
|
||||
} else {
|
||||
received := time.Now()
|
||||
message = model.Message{Received: received, Timestamp: received, Message: ev.Data[event.Data], PeerID: ps.profile.Onion, Signature: []byte{}, PreviousMessageSig: []byte{}}
|
||||
}
|
||||
ss, exists := ps.streamStores[peerID]
|
||||
if exists {
|
||||
ss.Write(message)
|
||||
} else {
|
||||
log.Errorf("error storing new peer message: %v stream store does not exist", ev)
|
||||
}
|
||||
}
|
||||
default:
|
||||
{
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.Errorf("error saving message for peer that doesn't exist: %v", ev)
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown shuts down the queue / thread
|
||||
func (ps *ProfileStoreV1) Shutdown() {
|
||||
if ps.queue != nil {
|
||||
|
|
|
@ -108,7 +108,7 @@ func (ss *streamStore) Delete() {
|
|||
}
|
||||
}
|
||||
|
||||
// Read returns all messages from the backing file (not the buffer, which is jsut for writing to the current file)
|
||||
// Read returns all messages from the backing file (not the buffer, for writing to the current file)
|
||||
func (ss *streamStore) Read() (messages []model.Message) {
|
||||
ss.lock.Lock()
|
||||
defer ss.lock.Unlock()
|
||||
|
|
|
@ -63,7 +63,7 @@ func serverCheck(t *testing.T, serverAddr string) bool {
|
|||
|
||||
func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, groupID string) {
|
||||
for {
|
||||
fmt.Printf("%v checking group conection...\n", peer.GetName())
|
||||
fmt.Printf("%v checking group connection...\n", peer.GetName())
|
||||
state, ok := peer.GetGroupState(groupID)
|
||||
if ok {
|
||||
fmt.Printf("Waiting for Peer %v to join group %v - state: %v\n", peer.GetName(), groupID, state)
|
||||
|
|
Loading…
Reference in New Issue
like above, prolly better to check at load if the attribute is set or not and set it accordingly instead of the double initializer for it you have in the event handler below. Also since this is a required attribute, should also be added to cwtchPeer.AddContact, line 194 has the attributes being initialized.
then we can create a stream store only where needed, then the event handler can just reach for the stream store and if there is one, use it, if not, not use it, simplifying that code a bunch in the event handler
BUT it does mean then doing processing of the SavePeerHistory event and creating or deleting stream stores there.
but that's actually good as right now we don't capture that, so we never delete history when its changed? which means when someone turns it off, the existing history sits there, forever being loaded, but never updated, which isn't great, so that event does need handling regardless
also prolly want a ps.save after the loop to capture and save any new defaults added