Merge pull request 'Introduce Channel/Overlay Mappings' (#549) from overlays into master
continuous-integration/drone/push Build is pending Details

Reviewed-on: #549
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
This commit is contained in:
Sarah Jamie Lewis 2024-02-11 23:10:59 +00:00
commit 0907af57d5
6 changed files with 102 additions and 15 deletions

3
.gitignore vendored
View File

@ -33,4 +33,5 @@ data-dir-cwtchtool/
tokens
tordir/
testing/autodownload/download_dir
testing/autodownload/storage
testing/autodownload/storage
*.swp

View File

@ -3,6 +3,7 @@ package model
import (
"crypto/sha256"
"encoding/base64"
"encoding/json"
)
// CalculateContentHash derives a hash using the author and the message body. It is intended to be
@ -12,3 +13,13 @@ func CalculateContentHash(author string, messageBody string) string {
contentBasedHash := sha256.Sum256(content)
return base64.StdEncoding.EncodeToString(contentBasedHash[:])
}
func DeserializeMessage(message string) (*MessageWrapper, error) {
var cm MessageWrapper
err := json.Unmarshal([]byte(message), &cm)
if err != nil {
return nil, err
}
return &cm, err
}

View File

@ -1,9 +1,40 @@
package model
import (
"time"
)
// MessageWrapper is the canonical Cwtch overlay wrapper
type MessageWrapper struct {
Overlay int `json:"o"`
Data string `json:"d"`
// when the data was assembled
SendTime *time.Time `json:"s,omitempty"`
// when the data was transmitted (by protocol engine e.g. over Tor)
TransitTime *time.Time `json:"t,omitempty"`
// when the data was received
RecvTime *time.Time `json:"r,omitempty"`
}
// Channel is defined as being the last 3 bits of the overlay id
// Channel 0 is reserved for the main conversation
// Channel 2 is reserved for conversation admin (managed groups)
// Channel 7 is reserved for streams (no ack, no store)
func (mw MessageWrapper) Channel() int {
if mw.Overlay > 1024 {
return mw.Overlay & 0x07
}
// for backward compatibilty all overlays less than 0x400 i.e. 1024 are
// mapped to channel 0 regardless of their channel status.
return 0
}
// If Overlay is a Stream Message it should not be ackd, or stored.
func (mw MessageWrapper) IsStream() bool {
return mw.Channel() == 0x07
}
// OverlayChat is the canonical identifier for chat overlays

View File

@ -3,11 +3,20 @@ package peer
import (
"context"
"crypto/rand"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"cwtch.im/cwtch/settings"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
"os"
path "path/filepath"
"sort"
@ -16,17 +25,7 @@ import (
"sync"
"time"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"cwtch.im/cwtch/settings"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/openprivacy/log"
@ -435,12 +434,19 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error)
if tor.IsValidHostname(conversationInfo.Handle) {
ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.RemotePeer: conversationInfo.Handle, event.Data: message})
onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
id := -1
// For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks
id, err := cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message))
if err != nil {
return -1, err
// check if we should store this message locally...
if cm, err := model.DeserializeMessage(message); err == nil {
if !cm.IsStream() {
// For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks
id, err = cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message))
if err != nil {
return -1, err
}
}
}
cp.eventBus.Publish(ev)
return id, nil
} else {
@ -1482,6 +1488,15 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time)
}
}
// Don't store messages in channel 7
if cm, err := model.DeserializeMessage(message); err == nil {
if cm.IsStream() {
return -1, nil
}
} else {
return -1, err
}
// Generate a random number and use it as the signature
signature := event.GetRandNumber().String()
return cp.storage.InsertMessage(ci.ID, 0, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message))

View File

@ -749,6 +749,16 @@ func (e *engine) handlePeerMessage(hostname string, eventID string, context stri
// Fall through handler for the default text conversation.
e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeerEngine, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)}))
// Don't ack messages in channel 7
// Note: this code explictly doesn't care about malformed messages, we deal with them
// later on...we still want to ack the original send...(as some "malformed" messages
// may be future-ok)
if cm, err := model.DeserializeMessage(string(message)); err == nil {
if cm.IsStream() {
return
}
}
// Send an explicit acknowledgement
// Every other protocol should have an explicit acknowledgement message e.g. value lookups have responses, and file handling has an explicit flow
if err := e.sendPeerMessage(hostname, pmodel.PeerMessage{ID: eventID, Context: event.ContextAck, Data: []byte{}}); err != nil {

View File

@ -2,12 +2,14 @@ package connections
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
model2 "cwtch.im/cwtch/protocol/model"
"encoding/json"
"git.openprivacy.ca/cwtch.im/tapir"
"git.openprivacy.ca/cwtch.im/tapir/applications"
"git.openprivacy.ca/openprivacy/log"
"sync/atomic"
"time"
)
const cwtchCapability = tapir.Capability("cwtchCapability")
@ -133,6 +135,14 @@ func (pa *PeerApp) listen() {
pa.version.Store(Version2)
}
} else {
if cm, err := model.DeserializeMessage(string(packet.Data)); err == nil {
if cm.TransitTime != nil {
rt := time.Now().UTC()
cm.RecvTime = &rt
data, _ := json.Marshal(cm)
packet.Data = data
}
}
pa.MessageHandler(pa.connection.Hostname(), packet.ID, packet.Context, packet.Data)
}
}
@ -148,6 +158,15 @@ func (pa *PeerApp) SendMessage(message model2.PeerMessage) error {
var serialized []byte
var err error
if cm, err := model.DeserializeMessage(string(message.Data)); err == nil {
if cm.SendTime != nil {
tt := time.Now().UTC()
cm.TransitTime = &tt
data, _ := json.Marshal(cm)
message.Data = data
}
}
if pa.version.Load() == Version2 {
// treat data as a pre-serialized string, not as a byte array (which will be base64 encoded and bloat the packet size)
serialized = message.Serialize()