diff --git a/model/group.go b/model/group.go index 04d04e8..fddc7b7 100644 --- a/model/group.go +++ b/model/group.go @@ -171,6 +171,7 @@ func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (* PreviousMessageSig: message.PreviousMessageSig, ReceivedByServer: true, Error: "", + Acknowledged: true, } seen := g.Timeline.Insert(timelineMessage) diff --git a/storage/v1/profile_store.go b/storage/v1/profile_store.go index dab50fd..b26b485 100644 --- a/storage/v1/profile_store.go +++ b/storage/v1/profile_store.go @@ -77,6 +77,7 @@ func (ps *ProfileStoreV1) initProfileWriterStore() { ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue) ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue) ps.eventManager.Subscribe(event.SendMessageToPeer, ps.queue) + ps.eventManager.Subscribe(event.PeerAcknowledgement, ps.queue) ps.eventManager.Subscribe(event.NewMessageFromPeer, ps.queue) ps.eventManager.Subscribe(event.PeerStateChange, ps.queue) ps.eventManager.Subscribe(event.ServerStateChange, ps.queue) @@ -381,15 +382,28 @@ func (ps *ProfileStoreV1) eventHandler() { } else { log.Errorf("error storing new group invite: %v (%v)", err, ev) } - case event.SendMessageToPeer: // We need this to be able to save the outgoing messages to a peer - ps.attemptSavePeerMessage(ev, false) + case event.SendMessageToPeer: // cache the message till an ack, then it's given to stream store. + // stream store doesn't support updates, so we don't want to commit it till ack'd + ps.profile.AddSentMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], time.Now(), ev.EventID) case event.NewMessageFromPeer: - ps.attemptSavePeerMessage(ev, true) + ps.attemptSavePeerMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.TimestampReceived], true) + case event.PeerAcknowledgement: + onion := ev.Data[event.RemotePeer] + eventID := ev.Data[event.EventID] + contact, ok := ps.profile.Contacts[onion] + if ok { + mIdx, ok := contact.UnacknowledgedMessages[eventID] + if ok { + message := contact.Timeline.Messages[mIdx] + ps.attemptSavePeerMessage(onion, message.Message, message.Timestamp.Format(time.RFC3339Nano), false) + } + } + ps.profile.AckSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID]) case event.NewMessageFromGroup: groupid := ev.Data[event.GroupID] received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent]) - message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: []byte(ev.Data[event.Signature]), PreviousMessageSig: []byte(ev.Data[event.PreviousSignature])} + message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: []byte(ev.Data[event.Signature]), PreviousMessageSig: []byte(ev.Data[event.PreviousSignature]), Acknowledged: true} ss, exists := ps.streamStores[groupid] if exists { ss.Write(message) @@ -438,28 +452,29 @@ func (ps *ProfileStoreV1) eventHandler() { // attemptSavePeerMessage checks if the peer has been configured to save history from this peer // and if so the peer saves the message into history. fromPeer is used to control if the message is saved // as coming from the remote peer or if it was sent by out profile. -func (ps *ProfileStoreV1) attemptSavePeerMessage(ev *event.Event, fromPeer bool) { - contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer]) +func (ps *ProfileStoreV1) attemptSavePeerMessage(peerID, messageData, timestampeReceived string, fromPeer bool) { + contact, exists := ps.profile.GetContact(peerID) if exists { val, _ := contact.GetAttribute(event.SaveHistoryKey) switch val { case event.SaveHistoryConfirmed: { - peerID := ev.Data[event.RemotePeer] + peerID := peerID var received time.Time var message model.Message + if fromPeer { - received, _ = time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) - message = model.Message{Received: received, Timestamp: received, Message: ev.Data[event.Data], PeerID: peerID, Signature: []byte{}, PreviousMessageSig: []byte{}} + received, _ = time.Parse(time.RFC3339Nano, timestampeReceived) + message = model.Message{Received: received, Timestamp: received, Message: messageData, PeerID: peerID, Signature: []byte{}, PreviousMessageSig: []byte{}} } else { received := time.Now() - message = model.Message{Received: received, Timestamp: received, Message: ev.Data[event.Data], PeerID: ps.profile.Onion, Signature: []byte{}, PreviousMessageSig: []byte{}} + message = model.Message{Received: received, Timestamp: received, Message: messageData, PeerID: ps.profile.Onion, Signature: []byte{}, PreviousMessageSig: []byte{}, Acknowledged: true} } ss, exists := ps.streamStores[peerID] if exists { ss.Write(message) } else { - log.Errorf("error storing new peer message: %v stream store does not exist", ev) + log.Errorf("error storing new peer message: %v stream store does not exist", peerID) } } default: @@ -467,7 +482,7 @@ func (ps *ProfileStoreV1) attemptSavePeerMessage(ev *event.Event, fromPeer bool) } } } else { - log.Errorf("error saving message for peer that doesn't exist: %v", ev) + log.Errorf("error saving message for peer that doesn't exist: %v", peerID) } }