store acks; group ack messages
the build was successful Details

This commit is contained in:
Dan Ballard 2020-12-18 17:03:35 -08:00
parent 94b47d213f
commit d06b2e1241
2 changed files with 28 additions and 12 deletions

View File

@ -171,6 +171,7 @@ func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (*
PreviousMessageSig: message.PreviousMessageSig,
ReceivedByServer: true,
Error: "",
Acknowledged: true,
}
seen := g.Timeline.Insert(timelineMessage)

View File

@ -77,6 +77,7 @@ func (ps *ProfileStoreV1) initProfileWriterStore() {
ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue)
ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue)
ps.eventManager.Subscribe(event.SendMessageToPeer, ps.queue)
ps.eventManager.Subscribe(event.PeerAcknowledgement, ps.queue)
ps.eventManager.Subscribe(event.NewMessageFromPeer, ps.queue)
ps.eventManager.Subscribe(event.PeerStateChange, ps.queue)
ps.eventManager.Subscribe(event.ServerStateChange, ps.queue)
@ -381,15 +382,28 @@ func (ps *ProfileStoreV1) eventHandler() {
} else {
log.Errorf("error storing new group invite: %v (%v)", err, ev)
}
case event.SendMessageToPeer: // We need this to be able to save the outgoing messages to a peer
ps.attemptSavePeerMessage(ev, false)
case event.SendMessageToPeer: // cache the message till an ack, then it's given to stream store.
// stream store doesn't support updates, so we don't want to commit it till ack'd
ps.profile.AddSentMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], time.Now(), ev.EventID)
case event.NewMessageFromPeer:
ps.attemptSavePeerMessage(ev, true)
ps.attemptSavePeerMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.TimestampReceived], true)
case event.PeerAcknowledgement:
onion := ev.Data[event.RemotePeer]
eventID := ev.Data[event.EventID]
contact, ok := ps.profile.Contacts[onion]
if ok {
mIdx, ok := contact.UnacknowledgedMessages[eventID]
if ok {
message := contact.Timeline.Messages[mIdx]
ps.attemptSavePeerMessage(onion, message.Message, message.Timestamp.Format(time.RFC3339Nano), false)
}
}
ps.profile.AckSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID])
case event.NewMessageFromGroup:
groupid := ev.Data[event.GroupID]
received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent])
message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: []byte(ev.Data[event.Signature]), PreviousMessageSig: []byte(ev.Data[event.PreviousSignature])}
message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: []byte(ev.Data[event.Signature]), PreviousMessageSig: []byte(ev.Data[event.PreviousSignature]), Acknowledged: true}
ss, exists := ps.streamStores[groupid]
if exists {
ss.Write(message)
@ -438,28 +452,29 @@ func (ps *ProfileStoreV1) eventHandler() {
// attemptSavePeerMessage checks if the peer has been configured to save history from this peer
// and if so the peer saves the message into history. fromPeer is used to control if the message is saved
// as coming from the remote peer or if it was sent by out profile.
func (ps *ProfileStoreV1) attemptSavePeerMessage(ev *event.Event, fromPeer bool) {
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
func (ps *ProfileStoreV1) attemptSavePeerMessage(peerID, messageData, timestampeReceived string, fromPeer bool) {
contact, exists := ps.profile.GetContact(peerID)
if exists {
val, _ := contact.GetAttribute(event.SaveHistoryKey)
switch val {
case event.SaveHistoryConfirmed:
{
peerID := ev.Data[event.RemotePeer]
peerID := peerID
var received time.Time
var message model.Message
if fromPeer {
received, _ = time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
message = model.Message{Received: received, Timestamp: received, Message: ev.Data[event.Data], PeerID: peerID, Signature: []byte{}, PreviousMessageSig: []byte{}}
received, _ = time.Parse(time.RFC3339Nano, timestampeReceived)
message = model.Message{Received: received, Timestamp: received, Message: messageData, PeerID: peerID, Signature: []byte{}, PreviousMessageSig: []byte{}}
} else {
received := time.Now()
message = model.Message{Received: received, Timestamp: received, Message: ev.Data[event.Data], PeerID: ps.profile.Onion, Signature: []byte{}, PreviousMessageSig: []byte{}}
message = model.Message{Received: received, Timestamp: received, Message: messageData, PeerID: ps.profile.Onion, Signature: []byte{}, PreviousMessageSig: []byte{}, Acknowledged: true}
}
ss, exists := ps.streamStores[peerID]
if exists {
ss.Write(message)
} else {
log.Errorf("error storing new peer message: %v stream store does not exist", ev)
log.Errorf("error storing new peer message: %v stream store does not exist", peerID)
}
}
default:
@ -467,7 +482,7 @@ func (ps *ProfileStoreV1) attemptSavePeerMessage(ev *event.Event, fromPeer bool)
}
}
} else {
log.Errorf("error saving message for peer that doesn't exist: %v", ev)
log.Errorf("error saving message for peer that doesn't exist: %v", peerID)
}
}