From 792e79dceb382e2f1772fdacdfec25664a4a5862 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Sun, 11 Feb 2024 12:14:07 -0800 Subject: [PATCH] Introduce Channel/Overlay Mappings - Map channel 7 to ephemeral / no ack - Create model methods - Introduce optional latency measurements into Cwtch --- .gitignore | 3 ++- model/message_utils.go | 11 +++++++++ model/overlay.go | 13 ++++++++++ peer/cwtch_peer.go | 43 ++++++++++++++++++++++----------- protocol/connections/engine.go | 10 ++++++++ protocol/connections/peerapp.go | 17 +++++++++++++ 6 files changed, 82 insertions(+), 15 deletions(-) diff --git a/.gitignore b/.gitignore index 7e47152..24ffb60 100644 --- a/.gitignore +++ b/.gitignore @@ -33,4 +33,5 @@ data-dir-cwtchtool/ tokens tordir/ testing/autodownload/download_dir -testing/autodownload/storage \ No newline at end of file +testing/autodownload/storage +*.swp diff --git a/model/message_utils.go b/model/message_utils.go index ba57ce0..42fa343 100644 --- a/model/message_utils.go +++ b/model/message_utils.go @@ -3,6 +3,7 @@ package model import ( "crypto/sha256" "encoding/base64" + "encoding/json" ) // CalculateContentHash derives a hash using the author and the message body. It is intended to be @@ -12,3 +13,13 @@ func CalculateContentHash(author string, messageBody string) string { contentBasedHash := sha256.Sum256(content) return base64.StdEncoding.EncodeToString(contentBasedHash[:]) } + +func DeserializeMessage(message string) (*MessageWrapper, error) { + var cm MessageWrapper + err := json.Unmarshal([]byte(message), &cm) + + if err != nil { + return nil, err + } + return &cm, err +} diff --git a/model/overlay.go b/model/overlay.go index 4149798..4500445 100644 --- a/model/overlay.go +++ b/model/overlay.go @@ -1,9 +1,22 @@ package model +import ( + "time" +) + // MessageWrapper is the canonical Cwtch overlay wrapper type MessageWrapper struct { Overlay int `json:"o"` Data string `json:"d"` + + // when the data was assembled + SendTime time.Time `json:"s,omitempty"` + + // when the data was transmitted (by protocol engine e.g. over Tor) + TransitTime time.Time `json:"t,omitempty"` + + // when the data was received + RecvTime time.Time `json:"r,omitempty"` } // OverlayChat is the canonical identifier for chat overlays diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index d378dd4..116b1a1 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -3,11 +3,20 @@ package peer import ( "context" "crypto/rand" + "cwtch.im/cwtch/model" + "cwtch.im/cwtch/model/constants" + "cwtch.im/cwtch/protocol/groups" + "cwtch.im/cwtch/settings" "encoding/base64" "encoding/hex" "encoding/json" "errors" "fmt" + "git.openprivacy.ca/cwtch.im/tapir/primitives" + "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" + "git.openprivacy.ca/openprivacy/connectivity" + "git.openprivacy.ca/openprivacy/connectivity/tor" + "golang.org/x/crypto/ed25519" "os" path "path/filepath" "sort" @@ -16,17 +25,7 @@ import ( "sync" "time" - "cwtch.im/cwtch/model/constants" - "cwtch.im/cwtch/protocol/groups" - "cwtch.im/cwtch/settings" - "git.openprivacy.ca/cwtch.im/tapir/primitives" - "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" - "git.openprivacy.ca/openprivacy/connectivity" - "git.openprivacy.ca/openprivacy/connectivity/tor" - "golang.org/x/crypto/ed25519" - "cwtch.im/cwtch/event" - "cwtch.im/cwtch/model" "cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/protocol/connections" "git.openprivacy.ca/openprivacy/log" @@ -435,12 +434,19 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error) if tor.IsValidHostname(conversationInfo.Handle) { ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.RemotePeer: conversationInfo.Handle, event.Data: message}) onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString()) + id := -1 - // For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks - id, err := cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message)) - if err != nil { - return -1, err + // check if we should store this message locally... + if cm, err := model.DeserializeMessage(message); err == nil { + if cm.Overlay < 1024 || cm.Overlay&0x7 != 0x7 { + // For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks + id, err = cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message)) + if err != nil { + return -1, err + } + } } + cp.eventBus.Publish(ev) return id, nil } else { @@ -1482,6 +1488,15 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time) } } + // Don't store messages in channel 7 + if cm, err := model.DeserializeMessage(message); err == nil { + if cm.Overlay > 1024 && cm.Overlay&0x7 == 0x7 { + return -1, nil + } + } else { + return -1, err + } + // Generate a random number and use it as the signature signature := event.GetRandNumber().String() return cp.storage.InsertMessage(ci.ID, 0, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message)) diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 2d58238..c696e50 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -749,6 +749,16 @@ func (e *engine) handlePeerMessage(hostname string, eventID string, context stri // Fall through handler for the default text conversation. e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeerEngine, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)})) + // Don't ack messages in channel 7 + // Note: this code explictly doesn't care about malformed messages, we deal with them + // later on...we still want to ack the original send...(as some "malformed" messages + // may be future-ok) + if cm, err := model.DeserializeMessage(string(message)); err == nil { + if cm.Overlay > 1024 && cm.Overlay&0x7 == 0x7 { + return + } + } + // Send an explicit acknowledgement // Every other protocol should have an explicit acknowledgement message e.g. value lookups have responses, and file handling has an explicit flow if err := e.sendPeerMessage(hostname, pmodel.PeerMessage{ID: eventID, Context: event.ContextAck, Data: []byte{}}); err != nil { diff --git a/protocol/connections/peerapp.go b/protocol/connections/peerapp.go index e2ac4d6..89f82c0 100644 --- a/protocol/connections/peerapp.go +++ b/protocol/connections/peerapp.go @@ -2,12 +2,14 @@ package connections import ( "cwtch.im/cwtch/event" + "cwtch.im/cwtch/model" model2 "cwtch.im/cwtch/protocol/model" "encoding/json" "git.openprivacy.ca/cwtch.im/tapir" "git.openprivacy.ca/cwtch.im/tapir/applications" "git.openprivacy.ca/openprivacy/log" "sync/atomic" + "time" ) const cwtchCapability = tapir.Capability("cwtchCapability") @@ -133,6 +135,13 @@ func (pa *PeerApp) listen() { pa.version.Store(Version2) } } else { + if cm, err := model.DeserializeMessage(string(packet.Data)); err == nil { + if !cm.TransitTime.IsZero() { + cm.RecvTime = time.Now().UTC() + data, _ := json.Marshal(cm) + packet.Data = data + } + } pa.MessageHandler(pa.connection.Hostname(), packet.ID, packet.Context, packet.Data) } } @@ -148,6 +157,14 @@ func (pa *PeerApp) SendMessage(message model2.PeerMessage) error { var serialized []byte var err error + if cm, err := model.DeserializeMessage(string(message.Data)); err == nil { + if !cm.SendTime.IsZero() { + cm.TransitTime = time.Now().UTC() + data, _ := json.Marshal(cm) + message.Data = data + } + } + if pa.version.Load() == Version2 { // treat data as a pre-serialized string, not as a byte array (which will be base64 encoded and bloat the packet size) serialized = message.Serialize()