From 77d26d3877503ac5d35c297cf160ce588e3405ef Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Fri, 18 Oct 2019 16:56:10 -0700 Subject: [PATCH] profile and peer messaging refactor. Profiles once again store timelines for peers, should be used as canonical timeline by frontend UI --- app/appClient.go | 1 - go.mod | 2 +- go.sum | 2 + model/group.go | 23 ++++++++ model/message.go | 3 + model/profile.go | 98 +++++++++++++++++++++++++++------ peer/cwtch_peer.go | 23 +++++++- protocol/connections/engine.go | 2 +- protocol/connections/peerapp.go | 1 - storage/stream_store.go | 6 ++ 10 files changed, 137 insertions(+), 24 deletions(-) diff --git a/app/appClient.go b/app/appClient.go index 5664880..cfe901d 100644 --- a/app/appClient.go +++ b/app/appClient.go @@ -86,7 +86,6 @@ func (ac *applicationClient) newPeer(localID, password string, reload bool) { if reload { ac.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ReloadPeer, event.Identity, profile.Onion)}) } - } // CreatePeer messages the service to create a new Peer with the given name diff --git a/go.mod b/go.mod index c441eee..a1f5094 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,7 @@ module cwtch.im/cwtch require ( - cwtch.im/tapir v0.1.10 + cwtch.im/tapir v0.1.11 git.openprivacy.ca/openprivacy/libricochet-go v1.0.6 github.com/c-bata/go-prompt v0.2.3 github.com/golang/protobuf v1.3.2 diff --git a/go.sum b/go.sum index 62d9ed1..54100c9 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ cwtch.im/tapir v0.1.10 h1:V+TkmwXNd6gySZqlVw468wMYEkmDwMSyvhkkpOfUw7w= cwtch.im/tapir v0.1.10/go.mod h1:EuRYdVrwijeaGBQ4OijDDRHf7R2MDSypqHkSl5DxI34= +cwtch.im/tapir v0.1.11 h1:JLm1MIYq4VXKzhj68+P8OuVPllAU9U6G0DtUor2fbc4= +cwtch.im/tapir v0.1.11/go.mod h1:EuRYdVrwijeaGBQ4OijDDRHf7R2MDSypqHkSl5DxI34= git.openprivacy.ca/openprivacy/libricochet-go v1.0.4 h1:GWLMJ5jBSIC/gFXzdbbeVz7fIAn2FTgW8+wBci6/3Ek= git.openprivacy.ca/openprivacy/libricochet-go v1.0.4/go.mod h1:yMSG1gBaP4f1U+RMZXN85d29D39OK5s8aTpyVRoH5FY= git.openprivacy.ca/openprivacy/libricochet-go v1.0.6 h1:5o4K2qn3otEE1InC5v5CzU0yL7Wl7DhVp4s8H3K6mXY= diff --git a/model/group.go b/model/group.go index ed49d8d..377badc 100644 --- a/model/group.go +++ b/model/group.go @@ -112,11 +112,32 @@ func (g *Group) AddSentMessage(message *protocol.DecryptedGroupMessage, sig []by Signature: sig, PeerID: message.GetOnion(), PreviousMessageSig: message.GetPreviousMessageSig(), + ReceivedByServer: false, } g.unacknowledgedMessages = append(g.unacknowledgedMessages, timelineMessage) return timelineMessage } +// ErrorSentMessage removes a sent message from the unacknowledged list and sets its error flag if found, otherwise returns false +func (g *Group) ErrorSentMessage(sig []byte, error string) bool { + g.lock.Lock() + defer g.lock.Unlock() + var message *Message + + // Delete the message from the unack'd buffer if it exists + for i, unAckedMessage := range g.unacknowledgedMessages { + if compareSignatures(unAckedMessage.Signature, sig) { + message = &unAckedMessage + g.unacknowledgedMessages = append(g.unacknowledgedMessages[:i], g.unacknowledgedMessages[i+1:]...) + + message.Error = error + g.Timeline.Insert(message) + return true + } + } + return false +} + // AddMessage takes a DecryptedGroupMessage and adds it to the Groups Timeline func (g *Group) AddMessage(message *protocol.DecryptedGroupMessage, sig []byte) (*Message, bool) { @@ -138,6 +159,8 @@ func (g *Group) AddMessage(message *protocol.DecryptedGroupMessage, sig []byte) Signature: sig, PeerID: message.GetOnion(), PreviousMessageSig: message.GetPreviousMessageSig(), + ReceivedByServer: true, + Error: "", } seen := g.Timeline.Insert(timelineMessage) diff --git a/model/message.go b/model/message.go index e5b2f4a..4bcd378 100644 --- a/model/message.go +++ b/model/message.go @@ -23,6 +23,9 @@ type Message struct { Message string Signature []byte PreviousMessageSig []byte + ReceivedByServer bool // messages sent to a server + Acknowledged bool // peer to peer + Error string `json:",omitempty"` } // MessageBaseSize is a rough estimate of the base number of bytes the struct uses before strings are populated diff --git a/model/profile.go b/model/profile.go index b2e1719..f434d8d 100644 --- a/model/profile.go +++ b/model/profile.go @@ -19,16 +19,17 @@ import ( // PublicProfile is a local copy of a CwtchIdentity type PublicProfile struct { - Name string - Ed25519PublicKey ed25519.PublicKey - Trusted bool - Blocked bool - Onion string - Attributes map[string]string - //Timeline Timeline `json:"-"` // TODO: cache recent messages for client - LocalID string // used by storage engine - State string `json:"-"` - lock sync.Mutex + Name string + Ed25519PublicKey ed25519.PublicKey + Trusted bool + Blocked bool + Onion string + Attributes map[string]string + Timeline Timeline `json:"-"` + LocalID string // used by storage engine + State string `json:"-"` + lock sync.Mutex + unacknowledgedMessages map[string]Message } // Profile encapsulates all the attributes necessary to be a Cwtch Peer. @@ -53,6 +54,7 @@ func (p *PublicProfile) init() { if p.Attributes == nil { p.Attributes = make(map[string]string) } + p.unacknowledgedMessages = make(map[string]Message) p.LocalID = generateRandomID() } @@ -119,25 +121,84 @@ func (p *Profile) RejectInvite(groupID string) { p.lock.Unlock() } -/* +// AddSentMessageToContactTimeline allows the saving of a message sent via a direct connection chat to the profile. +func (p *Profile) AddSentMessageToContactTimeline(onion string, messageTxt string, sent time.Time, eventID string) *Message { + p.lock.Lock() + defer p.lock.Unlock() + + contact, ok := p.Contacts[onion] + if ok { + now := time.Now() + sig := p.SignMessage(onion + messageTxt + sent.String() + now.String()) + + message := &Message{PeerID: p.Onion, Message: messageTxt, Timestamp: sent, Received: now, Signature: sig, Acknowledged: false} + if contact.unacknowledgedMessages == nil { + contact.unacknowledgedMessages = make(map[string]Message) + } + contact.unacknowledgedMessages[eventID] = *message + return message + } + return nil +} + // AddMessageToContactTimeline allows the saving of a message sent via a direct connection chat to the profile. -func (p *Profile) AddMessageToContactTimeline(onion string, fromMe bool, message string, sent time.Time) { +func (p *Profile) AddMessageToContactTimeline(onion string, messageTxt string, sent time.Time) (message *Message) { p.lock.Lock() defer p.lock.Unlock() contact, ok := p.Contacts[onion] // We don't really need a Signature here, but we use it to maintain order now := time.Now() - sig := p.SignMessage(onion + message + sent.String() + now.String()) + sig := p.SignMessage(onion + messageTxt + sent.String() + now.String()) if ok { - if fromMe { - contact.Timeline.Insert(&Message{PeerID: p.Onion, Message: message, Timestamp: sent, Received: now, Signature: sig}) - } else { - contact.Timeline.Insert(&Message{PeerID: onion, Message: message, Timestamp: sent, Received: now, Signature: sig}) + message = &Message{PeerID: onion, Message: messageTxt, Timestamp: sent, Received: now, Signature: sig, Acknowledged: true} + contact.Timeline.Insert(message) + } + return +} + +// ErrorSentMessageToPeer sets a sent message's error message and removes it from the unacknowledged list +func (p *Profile) ErrorSentMessageToPeer(onion string, eventID string, error string) { + p.lock.Lock() + defer p.lock.Unlock() + + contact, ok := p.Contacts[onion] + if ok { + message, ok := contact.unacknowledgedMessages[eventID] + if ok { + message.Error = error + contact.Timeline.Insert(&message) // TODO: do we want a non timeline.Insert way to handle errors + delete(contact.unacknowledgedMessages, eventID) + } + } +} + +// AckSentMessageToPeer sets mesage to a peer as acknowledged +func (p *Profile) AckSentMessageToPeer(onion string, eventID string) { + p.lock.Lock() + defer p.lock.Unlock() + + contact, ok := p.Contacts[onion] + if ok { + message, ok := contact.unacknowledgedMessages[eventID] + if ok { + message.Acknowledged = true + contact.Timeline.Insert(&message) + delete(contact.unacknowledgedMessages, eventID) + } + } +} + +// AddGroupSentMessageError searches matching groups for the message by sig and marks it as an error +func (p *Profile) AddGroupSentMessageError(groupServer string, signature string, error string) { + for _, group := range p.Groups { + if group.GroupServer == groupServer { + if group.ErrorSentMessage([]byte(signature), error) { + break + } } } } -*/ // AcceptInvite accepts a group invite func (p *Profile) AcceptInvite(groupID string) (err error) { @@ -335,6 +396,7 @@ func (p *Profile) AddGroup(group *Group) { } // AttemptDecryption takes a ciphertext and signature and attempts to decrypt it under known groups. +// If successful, adds the message to the group's timeline func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, string, *Message, bool) { for _, group := range p.Groups { success, dgm := group.DecryptMessage(ciphertext) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 1930917..44cd4b6 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -14,7 +14,9 @@ import ( "time" ) -var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true, event.ServerStateChange: true, event.NewGroupInvite: true} +var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true, + event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeer: true, + event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true} // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer type cwtchPeer struct { @@ -90,7 +92,7 @@ func (cp *cwtchPeer) Init(eventBus event.Manager) { go cp.eventHandler() cp.eventBus = eventBus - cp.AutoHandleEvents([]event.Type{event.EncryptedGroupMessage}) + cp.AutoHandleEvents([]event.Type{event.EncryptedGroupMessage, event.NewMessageFromPeer, event.PeerAcknowledgement, event.PeerError, event.SendMessageToGroupError}) } // AutoHandleEvents sets an event (if able) to be handled by this peer @@ -262,6 +264,9 @@ func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) ( func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string { event := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: onion, event.Data: message}) cp.eventBus.Publish(event) + + cp.Profile.AddSentMessageToContactTimeline(onion, message, time.Now(), event.EventID) + return event.EventID } @@ -346,11 +351,25 @@ func (cp *cwtchPeer) eventHandler() { /***** Default auto handled events *****/ case event.EncryptedGroupMessage: + // If successful, a side effect is the message is added to the group's timeline ok, groupID, message, seen := cp.Profile.AttemptDecryption([]byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature])) if ok && !seen { cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: string(message.Signature), event.PreviousSignature: string(message.PreviousMessageSig), event.RemotePeer: message.PeerID})) } + case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data + ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) + cp.Profile.AddMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], ts) + + case event.PeerAcknowledgement: + cp.Profile.AckSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID]) + + case event.SendMessageToGroupError: + cp.Profile.AddGroupSentMessageError(ev.Data[event.GroupServer], ev.Data[event.Signature], ev.Data[event.Error]) + + case event.SendMessageToPeerError: + cp.Profile.ErrorSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID], ev.Data[event.Error]) + /***** Non default but requestable handlable events *****/ case event.NewGroupInvite: diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 6eb71d3..b5a3026 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -142,7 +142,7 @@ func (e *engine) eventHandler() { } err := e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], context, []byte(ev.Data[event.Data])) if err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"})) + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"})) } case event.UnblockPeer: // We simply remove the peer from our blocklist diff --git a/protocol/connections/peerapp.go b/protocol/connections/peerapp.go index b9e0800..c795fa4 100644 --- a/protocol/connections/peerapp.go +++ b/protocol/connections/peerapp.go @@ -78,7 +78,6 @@ func (pa PeerApp) listen() { pa.MessageHandler(pa.connection.Hostname(), peerMessage.Context, peerMessage.Data) // Acknowledge the message - // TODO Should this be in the ui? pa.SendMessage(PeerMessage{peerMessage.ID, event.ContextAck, []byte{}}) } } else { diff --git a/storage/stream_store.go b/storage/stream_store.go index 706314c..845037d 100644 --- a/storage/stream_store.go +++ b/storage/stream_store.go @@ -129,6 +129,12 @@ func (ss *streamStore) Read() (messages []model.Message) { resp = append(resp, msgs...) } + // 2019.10.10 "Acknowledged" & "ReceivedByServer" are added to the struct, populate it as true for old ones without + for i := 0; i < len(resp) && (resp[i].Acknowledged == false && resp[i].ReceivedByServer == false); i++ { + resp[i].Acknowledged = true + resp[i].ReceivedByServer = true + } + return resp }