Introduce Channel/Overlay Mappings
continuous-integration/drone/pr Build is failing Details

- Map channel 7 to ephemeral / no ack
- Create model methods
- Introduce optional latency measurements into Cwtch
This commit is contained in:
Sarah Jamie Lewis 2024-02-11 12:14:07 -08:00
parent 3e0680943a
commit 792e79dceb
6 changed files with 82 additions and 15 deletions

3
.gitignore vendored
View File

@ -33,4 +33,5 @@ data-dir-cwtchtool/
tokens
tordir/
testing/autodownload/download_dir
testing/autodownload/storage
testing/autodownload/storage
*.swp

View File

@ -3,6 +3,7 @@ package model
import (
"crypto/sha256"
"encoding/base64"
"encoding/json"
)
// CalculateContentHash derives a hash using the author and the message body. It is intended to be
@ -12,3 +13,13 @@ func CalculateContentHash(author string, messageBody string) string {
contentBasedHash := sha256.Sum256(content)
return base64.StdEncoding.EncodeToString(contentBasedHash[:])
}
func DeserializeMessage(message string) (*MessageWrapper, error) {
var cm MessageWrapper
err := json.Unmarshal([]byte(message), &cm)
if err != nil {
return nil, err
}
return &cm, err
}

View File

@ -1,9 +1,22 @@
package model
import (
"time"
)
// MessageWrapper is the canonical Cwtch overlay wrapper
type MessageWrapper struct {
Overlay int `json:"o"`
Data string `json:"d"`
// when the data was assembled
SendTime time.Time `json:"s,omitempty"`
// when the data was transmitted (by protocol engine e.g. over Tor)
TransitTime time.Time `json:"t,omitempty"`
// when the data was received
RecvTime time.Time `json:"r,omitempty"`
}
// OverlayChat is the canonical identifier for chat overlays

View File

@ -3,11 +3,20 @@ package peer
import (
"context"
"crypto/rand"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"cwtch.im/cwtch/settings"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
"os"
path "path/filepath"
"sort"
@ -16,17 +25,7 @@ import (
"sync"
"time"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"cwtch.im/cwtch/settings"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/openprivacy/log"
@ -435,12 +434,19 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error)
if tor.IsValidHostname(conversationInfo.Handle) {
ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.RemotePeer: conversationInfo.Handle, event.Data: message})
onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
id := -1
// For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks
id, err := cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message))
if err != nil {
return -1, err
// check if we should store this message locally...
if cm, err := model.DeserializeMessage(message); err == nil {
if cm.Overlay < 1024 || cm.Overlay&0x7 != 0x7 {
// For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks
id, err = cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message))
if err != nil {
return -1, err
}
}
}
cp.eventBus.Publish(ev)
return id, nil
} else {
@ -1482,6 +1488,15 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time)
}
}
// Don't store messages in channel 7
if cm, err := model.DeserializeMessage(message); err == nil {
if cm.Overlay > 1024 && cm.Overlay&0x7 == 0x7 {
return -1, nil
}
} else {
return -1, err
}
// Generate a random number and use it as the signature
signature := event.GetRandNumber().String()
return cp.storage.InsertMessage(ci.ID, 0, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message))

View File

@ -749,6 +749,16 @@ func (e *engine) handlePeerMessage(hostname string, eventID string, context stri
// Fall through handler for the default text conversation.
e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeerEngine, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)}))
// Don't ack messages in channel 7
// Note: this code explictly doesn't care about malformed messages, we deal with them
// later on...we still want to ack the original send...(as some "malformed" messages
// may be future-ok)
if cm, err := model.DeserializeMessage(string(message)); err == nil {
if cm.Overlay > 1024 && cm.Overlay&0x7 == 0x7 {
return
}
}
// Send an explicit acknowledgement
// Every other protocol should have an explicit acknowledgement message e.g. value lookups have responses, and file handling has an explicit flow
if err := e.sendPeerMessage(hostname, pmodel.PeerMessage{ID: eventID, Context: event.ContextAck, Data: []byte{}}); err != nil {

View File

@ -2,12 +2,14 @@ package connections
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
model2 "cwtch.im/cwtch/protocol/model"
"encoding/json"
"git.openprivacy.ca/cwtch.im/tapir"
"git.openprivacy.ca/cwtch.im/tapir/applications"
"git.openprivacy.ca/openprivacy/log"
"sync/atomic"
"time"
)
const cwtchCapability = tapir.Capability("cwtchCapability")
@ -133,6 +135,13 @@ func (pa *PeerApp) listen() {
pa.version.Store(Version2)
}
} else {
if cm, err := model.DeserializeMessage(string(packet.Data)); err == nil {
if !cm.TransitTime.IsZero() {
cm.RecvTime = time.Now().UTC()
data, _ := json.Marshal(cm)
packet.Data = data
}
}
pa.MessageHandler(pa.connection.Hostname(), packet.ID, packet.Context, packet.Data)
}
}
@ -148,6 +157,14 @@ func (pa *PeerApp) SendMessage(message model2.PeerMessage) error {
var serialized []byte
var err error
if cm, err := model.DeserializeMessage(string(message.Data)); err == nil {
if !cm.SendTime.IsZero() {
cm.TransitTime = time.Now().UTC()
data, _ := json.Marshal(cm)
message.Data = data
}
}
if pa.version.Load() == Version2 {
// treat data as a pre-serialized string, not as a byte array (which will be base64 encoded and bloat the packet size)
serialized = message.Serialize()