Browse Source

profile and peer messaging refactor. Profiles once again store timelines for peers, should be used as canonical timeline by frontend UI

pull/278/head
Dan Ballard 1 month ago
parent
commit
77d26d3877
10 changed files with 137 additions and 24 deletions
  1. 0
    1
      app/appClient.go
  2. 1
    1
      go.mod
  3. 2
    0
      go.sum
  4. 23
    0
      model/group.go
  5. 3
    0
      model/message.go
  6. 80
    18
      model/profile.go
  7. 21
    2
      peer/cwtch_peer.go
  8. 1
    1
      protocol/connections/engine.go
  9. 0
    1
      protocol/connections/peerapp.go
  10. 6
    0
      storage/stream_store.go

+ 0
- 1
app/appClient.go View File

@@ -86,7 +86,6 @@ func (ac *applicationClient) newPeer(localID, password string, reload bool) {
if reload {
ac.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ReloadPeer, event.Identity, profile.Onion)})
}

}

// CreatePeer messages the service to create a new Peer with the given name

+ 1
- 1
go.mod View File

@@ -1,7 +1,7 @@
module cwtch.im/cwtch

require (
cwtch.im/tapir v0.1.10
cwtch.im/tapir v0.1.11
git.openprivacy.ca/openprivacy/libricochet-go v1.0.6
github.com/c-bata/go-prompt v0.2.3
github.com/golang/protobuf v1.3.2

+ 2
- 0
go.sum View File

@@ -1,5 +1,7 @@
cwtch.im/tapir v0.1.10 h1:V+TkmwXNd6gySZqlVw468wMYEkmDwMSyvhkkpOfUw7w=
cwtch.im/tapir v0.1.10/go.mod h1:EuRYdVrwijeaGBQ4OijDDRHf7R2MDSypqHkSl5DxI34=
cwtch.im/tapir v0.1.11 h1:JLm1MIYq4VXKzhj68+P8OuVPllAU9U6G0DtUor2fbc4=
cwtch.im/tapir v0.1.11/go.mod h1:EuRYdVrwijeaGBQ4OijDDRHf7R2MDSypqHkSl5DxI34=
git.openprivacy.ca/openprivacy/libricochet-go v1.0.4 h1:GWLMJ5jBSIC/gFXzdbbeVz7fIAn2FTgW8+wBci6/3Ek=
git.openprivacy.ca/openprivacy/libricochet-go v1.0.4/go.mod h1:yMSG1gBaP4f1U+RMZXN85d29D39OK5s8aTpyVRoH5FY=
git.openprivacy.ca/openprivacy/libricochet-go v1.0.6 h1:5o4K2qn3otEE1InC5v5CzU0yL7Wl7DhVp4s8H3K6mXY=

+ 23
- 0
model/group.go View File

@@ -112,11 +112,32 @@ func (g *Group) AddSentMessage(message *protocol.DecryptedGroupMessage, sig []by
Signature: sig,
PeerID: message.GetOnion(),
PreviousMessageSig: message.GetPreviousMessageSig(),
ReceivedByServer: false,
}
g.unacknowledgedMessages = append(g.unacknowledgedMessages, timelineMessage)
return timelineMessage
}

// ErrorSentMessage removes a sent message from the unacknowledged list and sets its error flag if found, otherwise returns false
func (g *Group) ErrorSentMessage(sig []byte, error string) bool {
g.lock.Lock()
defer g.lock.Unlock()
var message *Message

// Delete the message from the unack'd buffer if it exists
for i, unAckedMessage := range g.unacknowledgedMessages {
if compareSignatures(unAckedMessage.Signature, sig) {
message = &unAckedMessage
g.unacknowledgedMessages = append(g.unacknowledgedMessages[:i], g.unacknowledgedMessages[i+1:]...)

message.Error = error
g.Timeline.Insert(message)
return true
}
}
return false
}

// AddMessage takes a DecryptedGroupMessage and adds it to the Groups Timeline
func (g *Group) AddMessage(message *protocol.DecryptedGroupMessage, sig []byte) (*Message, bool) {

@@ -138,6 +159,8 @@ func (g *Group) AddMessage(message *protocol.DecryptedGroupMessage, sig []byte)
Signature: sig,
PeerID: message.GetOnion(),
PreviousMessageSig: message.GetPreviousMessageSig(),
ReceivedByServer: true,
Error: "",
}
seen := g.Timeline.Insert(timelineMessage)


+ 3
- 0
model/message.go View File

@@ -23,6 +23,9 @@ type Message struct {
Message string
Signature []byte
PreviousMessageSig []byte
ReceivedByServer bool // messages sent to a server
Acknowledged bool // peer to peer
Error string `json:",omitempty"`
}

// MessageBaseSize is a rough estimate of the base number of bytes the struct uses before strings are populated

+ 80
- 18
model/profile.go View File

@@ -19,16 +19,17 @@ import (

// PublicProfile is a local copy of a CwtchIdentity
type PublicProfile struct {
Name string
Ed25519PublicKey ed25519.PublicKey
Trusted bool
Blocked bool
Onion string
Attributes map[string]string
//Timeline Timeline `json:"-"` // TODO: cache recent messages for client
LocalID string // used by storage engine
State string `json:"-"`
lock sync.Mutex
Name string
Ed25519PublicKey ed25519.PublicKey
Trusted bool
Blocked bool
Onion string
Attributes map[string]string
Timeline Timeline `json:"-"`
LocalID string // used by storage engine
State string `json:"-"`
lock sync.Mutex
unacknowledgedMessages map[string]Message
}

// Profile encapsulates all the attributes necessary to be a Cwtch Peer.
@@ -53,6 +54,7 @@ func (p *PublicProfile) init() {
if p.Attributes == nil {
p.Attributes = make(map[string]string)
}
p.unacknowledgedMessages = make(map[string]Message)
p.LocalID = generateRandomID()
}

@@ -119,25 +121,84 @@ func (p *Profile) RejectInvite(groupID string) {
p.lock.Unlock()
}

/*
// AddSentMessageToContactTimeline allows the saving of a message sent via a direct connection chat to the profile.
func (p *Profile) AddSentMessageToContactTimeline(onion string, messageTxt string, sent time.Time, eventID string) *Message {
p.lock.Lock()
defer p.lock.Unlock()

contact, ok := p.Contacts[onion]
if ok {
now := time.Now()
sig := p.SignMessage(onion + messageTxt + sent.String() + now.String())

message := &Message{PeerID: p.Onion, Message: messageTxt, Timestamp: sent, Received: now, Signature: sig, Acknowledged: false}
if contact.unacknowledgedMessages == nil {
contact.unacknowledgedMessages = make(map[string]Message)
}
contact.unacknowledgedMessages[eventID] = *message
return message
}
return nil
}

// AddMessageToContactTimeline allows the saving of a message sent via a direct connection chat to the profile.
func (p *Profile) AddMessageToContactTimeline(onion string, fromMe bool, message string, sent time.Time) {
func (p *Profile) AddMessageToContactTimeline(onion string, messageTxt string, sent time.Time) (message *Message) {
p.lock.Lock()
defer p.lock.Unlock()
contact, ok := p.Contacts[onion]

// We don't really need a Signature here, but we use it to maintain order
now := time.Now()
sig := p.SignMessage(onion + message + sent.String() + now.String())
sig := p.SignMessage(onion + messageTxt + sent.String() + now.String())
if ok {
if fromMe {
contact.Timeline.Insert(&Message{PeerID: p.Onion, Message: message, Timestamp: sent, Received: now, Signature: sig})
} else {
contact.Timeline.Insert(&Message{PeerID: onion, Message: message, Timestamp: sent, Received: now, Signature: sig})
message = &Message{PeerID: onion, Message: messageTxt, Timestamp: sent, Received: now, Signature: sig, Acknowledged: true}
contact.Timeline.Insert(message)
}
return
}

// ErrorSentMessageToPeer sets a sent message's error message and removes it from the unacknowledged list
func (p *Profile) ErrorSentMessageToPeer(onion string, eventID string, error string) {
p.lock.Lock()
defer p.lock.Unlock()

contact, ok := p.Contacts[onion]
if ok {
message, ok := contact.unacknowledgedMessages[eventID]
if ok {
message.Error = error
contact.Timeline.Insert(&message) // TODO: do we want a non timeline.Insert way to handle errors
delete(contact.unacknowledgedMessages, eventID)
}
}
}

// AckSentMessageToPeer sets mesage to a peer as acknowledged
func (p *Profile) AckSentMessageToPeer(onion string, eventID string) {
p.lock.Lock()
defer p.lock.Unlock()

contact, ok := p.Contacts[onion]
if ok {
message, ok := contact.unacknowledgedMessages[eventID]
if ok {
message.Acknowledged = true
contact.Timeline.Insert(&message)
delete(contact.unacknowledgedMessages, eventID)
}
}
}

// AddGroupSentMessageError searches matching groups for the message by sig and marks it as an error
func (p *Profile) AddGroupSentMessageError(groupServer string, signature string, error string) {
for _, group := range p.Groups {
if group.GroupServer == groupServer {
if group.ErrorSentMessage([]byte(signature), error) {
break
}
}
}
}
*/

// AcceptInvite accepts a group invite
func (p *Profile) AcceptInvite(groupID string) (err error) {
@@ -335,6 +396,7 @@ func (p *Profile) AddGroup(group *Group) {
}

// AttemptDecryption takes a ciphertext and signature and attempts to decrypt it under known groups.
// If successful, adds the message to the group's timeline
func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, string, *Message, bool) {
for _, group := range p.Groups {
success, dgm := group.DecryptMessage(ciphertext)

+ 21
- 2
peer/cwtch_peer.go View File

@@ -14,7 +14,9 @@ import (
"time"
)

var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true, event.ServerStateChange: true, event.NewGroupInvite: true}
var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true,
event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeer: true,
event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true}

// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
type cwtchPeer struct {
@@ -90,7 +92,7 @@ func (cp *cwtchPeer) Init(eventBus event.Manager) {
go cp.eventHandler()

cp.eventBus = eventBus
cp.AutoHandleEvents([]event.Type{event.EncryptedGroupMessage})
cp.AutoHandleEvents([]event.Type{event.EncryptedGroupMessage, event.NewMessageFromPeer, event.PeerAcknowledgement, event.PeerError, event.SendMessageToGroupError})
}

// AutoHandleEvents sets an event (if able) to be handled by this peer
@@ -262,6 +264,9 @@ func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) (
func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string {
event := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: onion, event.Data: message})
cp.eventBus.Publish(event)

cp.Profile.AddSentMessageToContactTimeline(onion, message, time.Now(), event.EventID)

return event.EventID
}

@@ -346,11 +351,25 @@ func (cp *cwtchPeer) eventHandler() {
/***** Default auto handled events *****/

case event.EncryptedGroupMessage:
// If successful, a side effect is the message is added to the group's timeline
ok, groupID, message, seen := cp.Profile.AttemptDecryption([]byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
if ok && !seen {
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: string(message.Signature), event.PreviousSignature: string(message.PreviousMessageSig), event.RemotePeer: message.PeerID}))
}

case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data
ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
cp.Profile.AddMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], ts)

case event.PeerAcknowledgement:
cp.Profile.AckSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID])

case event.SendMessageToGroupError:
cp.Profile.AddGroupSentMessageError(ev.Data[event.GroupServer], ev.Data[event.Signature], ev.Data[event.Error])

case event.SendMessageToPeerError:
cp.Profile.ErrorSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID], ev.Data[event.Error])

/***** Non default but requestable handlable events *****/

case event.NewGroupInvite:

+ 1
- 1
protocol/connections/engine.go View File

@@ -142,7 +142,7 @@ func (e *engine) eventHandler() {
}
err := e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], context, []byte(ev.Data[event.Data]))
if err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
}
case event.UnblockPeer:
// We simply remove the peer from our blocklist

+ 0
- 1
protocol/connections/peerapp.go View File

@@ -78,7 +78,6 @@ func (pa PeerApp) listen() {
pa.MessageHandler(pa.connection.Hostname(), peerMessage.Context, peerMessage.Data)

// Acknowledge the message
// TODO Should this be in the ui?
pa.SendMessage(PeerMessage{peerMessage.ID, event.ContextAck, []byte{}})
}
} else {

+ 6
- 0
storage/stream_store.go View File

@@ -129,6 +129,12 @@ func (ss *streamStore) Read() (messages []model.Message) {
resp = append(resp, msgs...)
}

// 2019.10.10 "Acknowledged" & "ReceivedByServer" are added to the struct, populate it as true for old ones without
for i := 0; i < len(resp) && (resp[i].Acknowledged == false && resp[i].ReceivedByServer == false); i++ {
resp[i].Acknowledged = true
resp[i].ReceivedByServer = true
}

return resp
}


Loading…
Cancel
Save