forked from cwtch.im/cwtch
Merge pull request 'Allow Peers to Store History' (#316) from peer_history into master
Reviewed-on: cwtch.im/cwtch#316
This commit is contained in:
commit
2c13feb71e
|
@ -9,5 +9,6 @@ server/app/messages
|
|||
.reviewboardrc
|
||||
/vendor/
|
||||
/testing/tor/
|
||||
/storage/*/testing/
|
||||
/storage/testing/
|
||||
/testing/storage/
|
||||
|
|
|
@ -93,7 +93,7 @@ func stableService(t *testing.T, in, out string, done chan bool) {
|
|||
return
|
||||
}
|
||||
if message1.Message.EventType != event.NewPeer {
|
||||
t.Errorf("Wrong message recieved, expected NewPeer\n")
|
||||
t.Errorf("Wrong message received, expected NewPeer\n")
|
||||
done <- true
|
||||
return
|
||||
}
|
||||
|
@ -107,7 +107,7 @@ func stableService(t *testing.T, in, out string, done chan bool) {
|
|||
return
|
||||
}
|
||||
if message2.Message.EventType != event.DeleteContact {
|
||||
t.Errorf("Wrong message recieved, expected DeleteContact, got %v\n", message2)
|
||||
t.Errorf("Wrong message received, expected DeleteContact, got %v\n", message2)
|
||||
done <- true
|
||||
return
|
||||
}
|
||||
|
|
|
@ -266,3 +266,18 @@ const (
|
|||
ContextGetVal = "im.cwtch.getVal"
|
||||
ContextRetVal = "im.cwtch.retVal"
|
||||
)
|
||||
|
||||
// Define Default Attribute Keys
|
||||
const (
|
||||
SaveHistoryKey = "SavePeerHistory"
|
||||
)
|
||||
|
||||
// Define Default Attribute Values
|
||||
const (
|
||||
// Save History has 3 distinct states. By default we don't save history (DefaultDeleteHistory), if the peer confirms this
|
||||
// we change to DeleteHistoryConfirmed, if they confirm they want to save then this becomes SaveHistoryConfirmed
|
||||
// We use this distinction between default and confirmed to drive UI
|
||||
DeleteHistoryDefault = "DefaultDeleteHistory"
|
||||
SaveHistoryConfirmed = "SaveHistory"
|
||||
DeleteHistoryConfirmed = "DeleteHistoryConfirmed"
|
||||
)
|
||||
|
|
|
@ -11,7 +11,7 @@ A local peer "Alice" has a PublicScope that is queryable by getVal requests.
|
|||
By default, for now, other scopes are private, of which we here define SettingsScope
|
||||
|
||||
Alice's peer structs of remote peers such as "Bob" keep the queried
|
||||
PublicScope values in the PeerScope, which can be overriden by the same named
|
||||
PublicScope values in the PeerScope, which can be overridden by the same named
|
||||
values stored in the LocalScope.
|
||||
|
||||
*/
|
||||
|
@ -27,32 +27,32 @@ const (
|
|||
SettingsScope = "settings"
|
||||
)
|
||||
|
||||
// Seperator for scope and the rest of path
|
||||
const Seperator = "."
|
||||
// Separator for scope and the rest of path
|
||||
const Separator = "."
|
||||
|
||||
// GetPublicScope takes a path and attaches the pubic scope to it
|
||||
func GetPublicScope(path string) string {
|
||||
return PublicScope + Seperator + path
|
||||
return PublicScope + Separator + path
|
||||
}
|
||||
|
||||
// GetSettingsScope takes a path and attaches the settings scope to it
|
||||
func GetSettingsScope(path string) string {
|
||||
return SettingsScope + Seperator + path
|
||||
return SettingsScope + Separator + path
|
||||
}
|
||||
|
||||
// GetLocalScope takes a path and attaches the local scope to it
|
||||
func GetLocalScope(path string) string {
|
||||
return LocalScope + Seperator + path
|
||||
return LocalScope + Separator + path
|
||||
}
|
||||
|
||||
// GetPeerScope takes a path and attaches the peer scope to it
|
||||
func GetPeerScope(path string) string {
|
||||
return PeerScope + Seperator + path
|
||||
return PeerScope + Separator + path
|
||||
}
|
||||
|
||||
// GetScopePath take a full path and returns the scope and the scope-less path
|
||||
func GetScopePath(fullPath string) (string, string) {
|
||||
parts := strings.SplitN(fullPath, Seperator, 1)
|
||||
parts := strings.SplitN(fullPath, Separator, 1)
|
||||
if len(parts) != 2 {
|
||||
return "", ""
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ import (
|
|||
type Authorization string
|
||||
|
||||
const (
|
||||
// AuthUnknown is an inital state for a new unseen peer
|
||||
// AuthUnknown is an initial state for a new unseen peer
|
||||
AuthUnknown Authorization = "unknown"
|
||||
// AuthApproved means the client has approved the peer, it can send messages to us, perform GetVals, etc
|
||||
AuthApproved Authorization = "approved"
|
||||
|
@ -469,6 +469,10 @@ func (p *Profile) GetCopy(timeline bool) *Profile {
|
|||
for groupID := range newp.Groups {
|
||||
newp.Groups[groupID].Timeline = *p.Groups[groupID].Timeline.GetCopy()
|
||||
}
|
||||
|
||||
for peerID := range newp.Contacts {
|
||||
newp.Contacts[peerID].Timeline = *p.Contacts[peerID].Timeline.GetCopy()
|
||||
}
|
||||
}
|
||||
|
||||
return newp
|
||||
|
|
|
@ -199,6 +199,9 @@ func (cp *cwtchPeer) AddContact(nick, onion string, authorization model.Authoriz
|
|||
event.RemotePeer: onion,
|
||||
}))
|
||||
cp.eventBus.Publish(event.NewEventList(event.SetPeerAuthorization, event.RemotePeer, onion, event.Authorization, string(authorization)))
|
||||
|
||||
// Default to Deleting Peer History
|
||||
cp.eventBus.Publish(event.NewEventList(event.SetPeerAttribute, event.RemotePeer, onion, event.SaveHistoryKey, event.DeleteHistoryDefault))
|
||||
}
|
||||
|
||||
// GetContacts returns an unordered list of onions
|
||||
|
|
|
@ -46,7 +46,7 @@ func NewStreamStore(directory string, filenameBase string, password string) (sto
|
|||
return ss
|
||||
}
|
||||
|
||||
// Read returns all messages from the backing file (not the buffer, which is jsut for writing to the current file)
|
||||
// Read returns all messages from the backing file (not the buffer, for writing to the current file)
|
||||
func (ss *streamStore) Read() (messages []model.Message) {
|
||||
ss.lock.Lock()
|
||||
defer ss.lock.Unlock()
|
||||
|
|
|
@ -76,6 +76,8 @@ func (ps *ProfileStoreV1) initProfileWriterStore() {
|
|||
ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue)
|
||||
ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue)
|
||||
ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue)
|
||||
ps.eventManager.Subscribe(event.SendMessageToPeer, ps.queue)
|
||||
ps.eventManager.Subscribe(event.NewMessageFromPeer, ps.queue)
|
||||
ps.eventManager.Subscribe(event.PeerStateChange, ps.queue)
|
||||
ps.eventManager.Subscribe(event.ServerStateChange, ps.queue)
|
||||
ps.eventManager.Subscribe(event.DeleteContact, ps.queue)
|
||||
|
@ -258,6 +260,18 @@ func (ps *ProfileStoreV1) load() error {
|
|||
contact.Authorization = model.AuthApproved
|
||||
}
|
||||
}
|
||||
|
||||
// Check if there is any saved history...
|
||||
saveHistory, keyExists := contact.GetAttribute(event.SaveHistoryKey)
|
||||
if !keyExists {
|
||||
contact.SetAttribute(event.SaveHistoryKey, event.DeleteHistoryDefault)
|
||||
}
|
||||
|
||||
if saveHistory == event.SaveHistoryConfirmed {
|
||||
ss := NewStreamStore(ps.directory, contact.LocalID, ps.key)
|
||||
cp.Contacts[contact.Onion].Timeline.SetMessages(ss.Read())
|
||||
ps.streamStores[contact.Onion] = ss
|
||||
}
|
||||
}
|
||||
|
||||
for gid, group := range cp.Groups {
|
||||
|
@ -266,6 +280,7 @@ func (ps *ProfileStoreV1) load() error {
|
|||
cp.Groups[gid].Timeline.SetMessages(ss.Read())
|
||||
ps.streamStores[group.GroupID] = ss
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return err
|
||||
|
@ -292,11 +307,6 @@ func (ps *ProfileStoreV1) eventHandler() {
|
|||
var pp *model.PublicProfile
|
||||
json.Unmarshal([]byte(ev.Data[event.Data]), &pp)
|
||||
ps.profile.AddContact(ev.Data[event.RemotePeer], pp)
|
||||
// TODO: configure - allow peers to be configured to turn on limited storage
|
||||
/*ss := NewStreamStore(ps.directory, pp.LocalID, ps.password)
|
||||
pp.Timeline.SetMessages(ss.Read())
|
||||
ps.streamStores[pp.Onion] = ss
|
||||
ps.save()*/
|
||||
case event.GroupCreated:
|
||||
var group *model.Group
|
||||
json.Unmarshal([]byte(ev.Data[event.Data]), &group)
|
||||
|
@ -315,6 +325,27 @@ func (ps *ProfileStoreV1) eventHandler() {
|
|||
if exists {
|
||||
contact.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
|
||||
ps.save()
|
||||
|
||||
switch ev.Data[event.Key] {
|
||||
case event.SaveHistoryKey:
|
||||
if event.DeleteHistoryConfirmed == ev.Data[event.Data] {
|
||||
ss, exists := ps.streamStores[ev.Data[event.RemotePeer]]
|
||||
if exists {
|
||||
ss.Delete()
|
||||
delete(ps.streamStores, ev.Data[event.RemotePeer])
|
||||
}
|
||||
} else if event.SaveHistoryConfirmed == ev.Data[event.Data] {
|
||||
_, exists := ps.streamStores[ev.Data[event.RemotePeer]]
|
||||
if !exists {
|
||||
ss := NewStreamStore(ps.directory, contact.LocalID, ps.key)
|
||||
ps.streamStores[ev.Data[event.RemotePeer]] = ss
|
||||
}
|
||||
}
|
||||
default:
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
log.Errorf("error setting attribute on peer %v peer does not exist", ev)
|
||||
}
|
||||
|
@ -343,6 +374,10 @@ func (ps *ProfileStoreV1) eventHandler() {
|
|||
} else {
|
||||
log.Errorf("error storing new group invite: %v (%v)", err, ev)
|
||||
}
|
||||
case event.SendMessageToPeer: // We need this to be able to save the outgoing messages to a peer
|
||||
ps.attemptSavePeerMessage(ev, false)
|
||||
case event.NewMessageFromPeer:
|
||||
ps.attemptSavePeerMessage(ev, true)
|
||||
case event.NewMessageFromGroup:
|
||||
groupid := ev.Data[event.GroupID]
|
||||
received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
|
||||
|
@ -368,6 +403,11 @@ func (ps *ProfileStoreV1) eventHandler() {
|
|||
onion := ev.Data[event.RemotePeer]
|
||||
ps.profile.DeleteContact(onion)
|
||||
ps.save()
|
||||
ss, exists := ps.streamStores[onion]
|
||||
if exists {
|
||||
ss.Delete()
|
||||
delete(ps.streamStores, onion)
|
||||
}
|
||||
case event.DeleteGroup:
|
||||
groupID := ev.Data[event.GroupID]
|
||||
ps.profile.DeleteGroup(groupID)
|
||||
|
@ -388,6 +428,42 @@ func (ps *ProfileStoreV1) eventHandler() {
|
|||
}
|
||||
}
|
||||
|
||||
// attemptSavePeerMessage checks if the peer has been configured to save history from this peer
|
||||
// and if so the peer saves the message into history. fromPeer is used to control if the message is saved
|
||||
// as coming from the remote peer or if it was sent by out profile.
|
||||
func (ps *ProfileStoreV1) attemptSavePeerMessage(ev *event.Event, fromPeer bool) {
|
||||
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
|
||||
if exists {
|
||||
val, _ := contact.GetAttribute(event.SaveHistoryKey)
|
||||
switch val {
|
||||
case event.SaveHistoryConfirmed:
|
||||
{
|
||||
peerID := ev.Data[event.RemotePeer]
|
||||
var received time.Time
|
||||
var message model.Message
|
||||
if fromPeer {
|
||||
received, _ = time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
|
||||
message = model.Message{Received: received, Timestamp: received, Message: ev.Data[event.Data], PeerID: peerID, Signature: []byte{}, PreviousMessageSig: []byte{}}
|
||||
} else {
|
||||
received := time.Now()
|
||||
message = model.Message{Received: received, Timestamp: received, Message: ev.Data[event.Data], PeerID: ps.profile.Onion, Signature: []byte{}, PreviousMessageSig: []byte{}}
|
||||
}
|
||||
ss, exists := ps.streamStores[peerID]
|
||||
if exists {
|
||||
ss.Write(message)
|
||||
} else {
|
||||
log.Errorf("error storing new peer message: %v stream store does not exist", ev)
|
||||
}
|
||||
}
|
||||
default:
|
||||
{
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.Errorf("error saving message for peer that doesn't exist: %v", ev)
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown shuts down the queue / thread
|
||||
func (ps *ProfileStoreV1) Shutdown() {
|
||||
if ps.queue != nil {
|
||||
|
|
|
@ -108,7 +108,7 @@ func (ss *streamStore) Delete() {
|
|||
}
|
||||
}
|
||||
|
||||
// Read returns all messages from the backing file (not the buffer, which is jsut for writing to the current file)
|
||||
// Read returns all messages from the backing file (not the buffer, for writing to the current file)
|
||||
func (ss *streamStore) Read() (messages []model.Message) {
|
||||
ss.lock.Lock()
|
||||
defer ss.lock.Unlock()
|
||||
|
|
|
@ -63,7 +63,7 @@ func serverCheck(t *testing.T, serverAddr string) bool {
|
|||
|
||||
func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, groupID string) {
|
||||
for {
|
||||
fmt.Printf("%v checking group conection...\n", peer.GetName())
|
||||
fmt.Printf("%v checking group connection...\n", peer.GetName())
|
||||
state, ok := peer.GetGroupState(groupID)
|
||||
if ok {
|
||||
fmt.Printf("Waiting for Peer %v to join group %v - state: %v\n", peer.GetName(), groupID, state)
|
||||
|
|
Loading…
Reference in New Issue