store acks; group ack messages #337

Merged
sarah merged 1 commits from dan/cwtch:storeAcks into master 2020-12-19 01:16:18 +00:00
2 changed files with 28 additions and 12 deletions

View File

@ -171,6 +171,7 @@ func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (*
PreviousMessageSig: message.PreviousMessageSig, PreviousMessageSig: message.PreviousMessageSig,
ReceivedByServer: true, ReceivedByServer: true,
Error: "", Error: "",
Acknowledged: true,
} }
seen := g.Timeline.Insert(timelineMessage) seen := g.Timeline.Insert(timelineMessage)

View File

@ -77,6 +77,7 @@ func (ps *ProfileStoreV1) initProfileWriterStore() {
ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue) ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue)
ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue) ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue)
ps.eventManager.Subscribe(event.SendMessageToPeer, ps.queue) ps.eventManager.Subscribe(event.SendMessageToPeer, ps.queue)
ps.eventManager.Subscribe(event.PeerAcknowledgement, ps.queue)
ps.eventManager.Subscribe(event.NewMessageFromPeer, ps.queue) ps.eventManager.Subscribe(event.NewMessageFromPeer, ps.queue)
ps.eventManager.Subscribe(event.PeerStateChange, ps.queue) ps.eventManager.Subscribe(event.PeerStateChange, ps.queue)
ps.eventManager.Subscribe(event.ServerStateChange, ps.queue) ps.eventManager.Subscribe(event.ServerStateChange, ps.queue)
@ -381,15 +382,28 @@ func (ps *ProfileStoreV1) eventHandler() {
} else { } else {
log.Errorf("error storing new group invite: %v (%v)", err, ev) log.Errorf("error storing new group invite: %v (%v)", err, ev)
} }
case event.SendMessageToPeer: // We need this to be able to save the outgoing messages to a peer case event.SendMessageToPeer: // cache the message till an ack, then it's given to stream store.
ps.attemptSavePeerMessage(ev, false) // stream store doesn't support updates, so we don't want to commit it till ack'd
ps.profile.AddSentMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], time.Now(), ev.EventID)
case event.NewMessageFromPeer: case event.NewMessageFromPeer:
ps.attemptSavePeerMessage(ev, true) ps.attemptSavePeerMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.TimestampReceived], true)
case event.PeerAcknowledgement:
onion := ev.Data[event.RemotePeer]
eventID := ev.Data[event.EventID]
contact, ok := ps.profile.Contacts[onion]
if ok {
mIdx, ok := contact.UnacknowledgedMessages[eventID]
if ok {
message := contact.Timeline.Messages[mIdx]
ps.attemptSavePeerMessage(onion, message.Message, message.Timestamp.Format(time.RFC3339Nano), false)
}
}
ps.profile.AckSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID])
case event.NewMessageFromGroup: case event.NewMessageFromGroup:
groupid := ev.Data[event.GroupID] groupid := ev.Data[event.GroupID]
received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent]) sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent])
message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: []byte(ev.Data[event.Signature]), PreviousMessageSig: []byte(ev.Data[event.PreviousSignature])} message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: []byte(ev.Data[event.Signature]), PreviousMessageSig: []byte(ev.Data[event.PreviousSignature]), Acknowledged: true}
ss, exists := ps.streamStores[groupid] ss, exists := ps.streamStores[groupid]
if exists { if exists {
ss.Write(message) ss.Write(message)
@ -438,28 +452,29 @@ func (ps *ProfileStoreV1) eventHandler() {
// attemptSavePeerMessage checks if the peer has been configured to save history from this peer // attemptSavePeerMessage checks if the peer has been configured to save history from this peer
// and if so the peer saves the message into history. fromPeer is used to control if the message is saved // and if so the peer saves the message into history. fromPeer is used to control if the message is saved
// as coming from the remote peer or if it was sent by out profile. // as coming from the remote peer or if it was sent by out profile.
func (ps *ProfileStoreV1) attemptSavePeerMessage(ev *event.Event, fromPeer bool) { func (ps *ProfileStoreV1) attemptSavePeerMessage(peerID, messageData, timestampeReceived string, fromPeer bool) {
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer]) contact, exists := ps.profile.GetContact(peerID)
if exists { if exists {
val, _ := contact.GetAttribute(event.SaveHistoryKey) val, _ := contact.GetAttribute(event.SaveHistoryKey)
switch val { switch val {
case event.SaveHistoryConfirmed: case event.SaveHistoryConfirmed:
{ {
peerID := ev.Data[event.RemotePeer] peerID := peerID
var received time.Time var received time.Time
var message model.Message var message model.Message
if fromPeer { if fromPeer {
received, _ = time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) received, _ = time.Parse(time.RFC3339Nano, timestampeReceived)
message = model.Message{Received: received, Timestamp: received, Message: ev.Data[event.Data], PeerID: peerID, Signature: []byte{}, PreviousMessageSig: []byte{}} message = model.Message{Received: received, Timestamp: received, Message: messageData, PeerID: peerID, Signature: []byte{}, PreviousMessageSig: []byte{}}
} else { } else {
received := time.Now() received := time.Now()
message = model.Message{Received: received, Timestamp: received, Message: ev.Data[event.Data], PeerID: ps.profile.Onion, Signature: []byte{}, PreviousMessageSig: []byte{}} message = model.Message{Received: received, Timestamp: received, Message: messageData, PeerID: ps.profile.Onion, Signature: []byte{}, PreviousMessageSig: []byte{}, Acknowledged: true}
} }
ss, exists := ps.streamStores[peerID] ss, exists := ps.streamStores[peerID]
if exists { if exists {
ss.Write(message) ss.Write(message)
} else { } else {
log.Errorf("error storing new peer message: %v stream store does not exist", ev) log.Errorf("error storing new peer message: %v stream store does not exist", peerID)
} }
} }
default: default:
@ -467,7 +482,7 @@ func (ps *ProfileStoreV1) attemptSavePeerMessage(ev *event.Event, fromPeer bool)
} }
} }
} else { } else {
log.Errorf("error saving message for peer that doesn't exist: %v", ev) log.Errorf("error saving message for peer that doesn't exist: %v", peerID)
} }
} }