store acks; group ack messages #337
|
@ -171,6 +171,7 @@ func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (*
|
||||||
PreviousMessageSig: message.PreviousMessageSig,
|
PreviousMessageSig: message.PreviousMessageSig,
|
||||||
ReceivedByServer: true,
|
ReceivedByServer: true,
|
||||||
Error: "",
|
Error: "",
|
||||||
|
Acknowledged: true,
|
||||||
}
|
}
|
||||||
seen := g.Timeline.Insert(timelineMessage)
|
seen := g.Timeline.Insert(timelineMessage)
|
||||||
|
|
||||||
|
|
|
@ -77,6 +77,7 @@ func (ps *ProfileStoreV1) initProfileWriterStore() {
|
||||||
ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue)
|
ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue)
|
||||||
ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue)
|
ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue)
|
||||||
ps.eventManager.Subscribe(event.SendMessageToPeer, ps.queue)
|
ps.eventManager.Subscribe(event.SendMessageToPeer, ps.queue)
|
||||||
|
ps.eventManager.Subscribe(event.PeerAcknowledgement, ps.queue)
|
||||||
ps.eventManager.Subscribe(event.NewMessageFromPeer, ps.queue)
|
ps.eventManager.Subscribe(event.NewMessageFromPeer, ps.queue)
|
||||||
ps.eventManager.Subscribe(event.PeerStateChange, ps.queue)
|
ps.eventManager.Subscribe(event.PeerStateChange, ps.queue)
|
||||||
ps.eventManager.Subscribe(event.ServerStateChange, ps.queue)
|
ps.eventManager.Subscribe(event.ServerStateChange, ps.queue)
|
||||||
|
@ -381,15 +382,28 @@ func (ps *ProfileStoreV1) eventHandler() {
|
||||||
} else {
|
} else {
|
||||||
log.Errorf("error storing new group invite: %v (%v)", err, ev)
|
log.Errorf("error storing new group invite: %v (%v)", err, ev)
|
||||||
}
|
}
|
||||||
case event.SendMessageToPeer: // We need this to be able to save the outgoing messages to a peer
|
case event.SendMessageToPeer: // cache the message till an ack, then it's given to stream store.
|
||||||
ps.attemptSavePeerMessage(ev, false)
|
// stream store doesn't support updates, so we don't want to commit it till ack'd
|
||||||
|
ps.profile.AddSentMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], time.Now(), ev.EventID)
|
||||||
case event.NewMessageFromPeer:
|
case event.NewMessageFromPeer:
|
||||||
ps.attemptSavePeerMessage(ev, true)
|
ps.attemptSavePeerMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.TimestampReceived], true)
|
||||||
|
case event.PeerAcknowledgement:
|
||||||
|
onion := ev.Data[event.RemotePeer]
|
||||||
|
eventID := ev.Data[event.EventID]
|
||||||
|
contact, ok := ps.profile.Contacts[onion]
|
||||||
|
if ok {
|
||||||
|
mIdx, ok := contact.UnacknowledgedMessages[eventID]
|
||||||
|
if ok {
|
||||||
|
message := contact.Timeline.Messages[mIdx]
|
||||||
|
ps.attemptSavePeerMessage(onion, message.Message, message.Timestamp.Format(time.RFC3339Nano), false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ps.profile.AckSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID])
|
||||||
case event.NewMessageFromGroup:
|
case event.NewMessageFromGroup:
|
||||||
groupid := ev.Data[event.GroupID]
|
groupid := ev.Data[event.GroupID]
|
||||||
received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
|
received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
|
||||||
sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent])
|
sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent])
|
||||||
message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: []byte(ev.Data[event.Signature]), PreviousMessageSig: []byte(ev.Data[event.PreviousSignature])}
|
message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: []byte(ev.Data[event.Signature]), PreviousMessageSig: []byte(ev.Data[event.PreviousSignature]), Acknowledged: true}
|
||||||
ss, exists := ps.streamStores[groupid]
|
ss, exists := ps.streamStores[groupid]
|
||||||
if exists {
|
if exists {
|
||||||
ss.Write(message)
|
ss.Write(message)
|
||||||
|
@ -438,28 +452,29 @@ func (ps *ProfileStoreV1) eventHandler() {
|
||||||
// attemptSavePeerMessage checks if the peer has been configured to save history from this peer
|
// attemptSavePeerMessage checks if the peer has been configured to save history from this peer
|
||||||
// and if so the peer saves the message into history. fromPeer is used to control if the message is saved
|
// and if so the peer saves the message into history. fromPeer is used to control if the message is saved
|
||||||
// as coming from the remote peer or if it was sent by out profile.
|
// as coming from the remote peer or if it was sent by out profile.
|
||||||
func (ps *ProfileStoreV1) attemptSavePeerMessage(ev *event.Event, fromPeer bool) {
|
func (ps *ProfileStoreV1) attemptSavePeerMessage(peerID, messageData, timestampeReceived string, fromPeer bool) {
|
||||||
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
|
contact, exists := ps.profile.GetContact(peerID)
|
||||||
if exists {
|
if exists {
|
||||||
val, _ := contact.GetAttribute(event.SaveHistoryKey)
|
val, _ := contact.GetAttribute(event.SaveHistoryKey)
|
||||||
switch val {
|
switch val {
|
||||||
case event.SaveHistoryConfirmed:
|
case event.SaveHistoryConfirmed:
|
||||||
{
|
{
|
||||||
peerID := ev.Data[event.RemotePeer]
|
peerID := peerID
|
||||||
var received time.Time
|
var received time.Time
|
||||||
var message model.Message
|
var message model.Message
|
||||||
|
|
||||||
if fromPeer {
|
if fromPeer {
|
||||||
received, _ = time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
|
received, _ = time.Parse(time.RFC3339Nano, timestampeReceived)
|
||||||
message = model.Message{Received: received, Timestamp: received, Message: ev.Data[event.Data], PeerID: peerID, Signature: []byte{}, PreviousMessageSig: []byte{}}
|
message = model.Message{Received: received, Timestamp: received, Message: messageData, PeerID: peerID, Signature: []byte{}, PreviousMessageSig: []byte{}}
|
||||||
} else {
|
} else {
|
||||||
received := time.Now()
|
received := time.Now()
|
||||||
message = model.Message{Received: received, Timestamp: received, Message: ev.Data[event.Data], PeerID: ps.profile.Onion, Signature: []byte{}, PreviousMessageSig: []byte{}}
|
message = model.Message{Received: received, Timestamp: received, Message: messageData, PeerID: ps.profile.Onion, Signature: []byte{}, PreviousMessageSig: []byte{}, Acknowledged: true}
|
||||||
}
|
}
|
||||||
ss, exists := ps.streamStores[peerID]
|
ss, exists := ps.streamStores[peerID]
|
||||||
if exists {
|
if exists {
|
||||||
ss.Write(message)
|
ss.Write(message)
|
||||||
} else {
|
} else {
|
||||||
log.Errorf("error storing new peer message: %v stream store does not exist", ev)
|
log.Errorf("error storing new peer message: %v stream store does not exist", peerID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
|
@ -467,7 +482,7 @@ func (ps *ProfileStoreV1) attemptSavePeerMessage(ev *event.Event, fromPeer bool)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Errorf("error saving message for peer that doesn't exist: %v", ev)
|
log.Errorf("error saving message for peer that doesn't exist: %v", peerID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue