Introduce Channel/Overlay Mappings #549

Merged
sarah merged 4 commits from overlays into master 2024-02-11 23:11:00 +00:00
6 changed files with 102 additions and 15 deletions

3
.gitignore vendored
View File

@ -33,4 +33,5 @@ data-dir-cwtchtool/
tokens
tordir/
testing/autodownload/download_dir
testing/autodownload/storage
testing/autodownload/storage
*.swp

View File

@ -3,6 +3,7 @@ package model
import (
"crypto/sha256"
"encoding/base64"
"encoding/json"
)
// CalculateContentHash derives a hash using the author and the message body. It is intended to be
@ -12,3 +13,13 @@ func CalculateContentHash(author string, messageBody string) string {
contentBasedHash := sha256.Sum256(content)
return base64.StdEncoding.EncodeToString(contentBasedHash[:])
}
func DeserializeMessage(message string) (*MessageWrapper, error) {
var cm MessageWrapper
err := json.Unmarshal([]byte(message), &cm)
if err != nil {
return nil, err
}
return &cm, err
}

View File

@ -1,9 +1,40 @@
package model
import (
"time"
)
// MessageWrapper is the canonical Cwtch overlay wrapper
type MessageWrapper struct {
Overlay int `json:"o"`
Data string `json:"d"`
// when the data was assembled
SendTime *time.Time `json:"s,omitempty"`
sarah marked this conversation as resolved Outdated
Outdated
Review

bad news

https://stackoverflow.com/questions/32643815/json-omitempty-with-time-time-field

The omitempty tag option does not work with time.Time as it is a struct. There is a "zero" value for structs, but that is a struct value where all fields have their zero values. This is a "valid" value, so it is not treated as "empty".

But by simply changing it to a pointer: *time.Time, it will work 

confirmed with a quick test

package main

import (
        "time"
        "encoding/json"
        "fmt"
)

type TimeDemo struct {
        Date1 time.Time `json:"s,omitempty"`
        Date2 time.Time `json:"t,omitempty"`
        Date3 time.Time `json:"r,omitempty"`
}


func main() {
        dates := TimeDemo{ Date1: time.Now().UTC() }

        datesJson, _ := json.Marshal(dates)

        fmt.Printf("%v => %v\n", dates, string(datesJson))

        var dates2 TimeDemo 
        json.Unmarshal(datesJson, &dates2)

        fmt.Printf("=> %v\n", dates2)

}

and

> ./timedemo
{2024-02-11 21:44:07.01793423 +0000 UTC 0001-01-01 00:00:00 +0000 UTC 0001-01-01 00:00:00 +0000 UTC} => {"s":"2024-02-11T21:44:07.01793423Z","t":"0001-01-01T00:00:00Z","r":"0001-01-01T00:00:00Z"}
=> {2024-02-11 21:44:07.01793423 +0000 UTC 0001-01-01 00:00:00 +0000 UTC 0001-01-01 00:00:00 +0000 UTC}

we don't want to be padding our messages with 3 time stamps one only filled on receipt

bad news https://stackoverflow.com/questions/32643815/json-omitempty-with-time-time-field ``` The omitempty tag option does not work with time.Time as it is a struct. There is a "zero" value for structs, but that is a struct value where all fields have their zero values. This is a "valid" value, so it is not treated as "empty". But by simply changing it to a pointer: *time.Time, it will work ``` confirmed with a quick test ```go package main import ( "time" "encoding/json" "fmt" ) type TimeDemo struct { Date1 time.Time `json:"s,omitempty"` Date2 time.Time `json:"t,omitempty"` Date3 time.Time `json:"r,omitempty"` } func main() { dates := TimeDemo{ Date1: time.Now().UTC() } datesJson, _ := json.Marshal(dates) fmt.Printf("%v => %v\n", dates, string(datesJson)) var dates2 TimeDemo json.Unmarshal(datesJson, &dates2) fmt.Printf("=> %v\n", dates2) } ``` and ``` > ./timedemo {2024-02-11 21:44:07.01793423 +0000 UTC 0001-01-01 00:00:00 +0000 UTC 0001-01-01 00:00:00 +0000 UTC} => {"s":"2024-02-11T21:44:07.01793423Z","t":"0001-01-01T00:00:00Z","r":"0001-01-01T00:00:00Z"} => {2024-02-11 21:44:07.01793423 +0000 UTC 0001-01-01 00:00:00 +0000 UTC 0001-01-01 00:00:00 +0000 UTC} ``` we don't want to be padding our messages with 3 time stamps one only filled on receipt
// when the data was transmitted (by protocol engine e.g. over Tor)
TransitTime *time.Time `json:"t,omitempty"`
// when the data was received
RecvTime *time.Time `json:"r,omitempty"`
}
// Channel is defined as being the last 3 bits of the overlay id
// Channel 0 is reserved for the main conversation
// Channel 2 is reserved for conversation admin (managed groups)
// Channel 7 is reserved for streams (no ack, no store)
func (mw MessageWrapper) Channel() int {
if mw.Overlay > 1024 {
return mw.Overlay & 0x07
}
// for backward compatibilty all overlays less than 0x400 i.e. 1024 are
// mapped to channel 0 regardless of their channel status.
return 0
}
// If Overlay is a Stream Message it should not be ackd, or stored.
func (mw MessageWrapper) IsStream() bool {
return mw.Channel() == 0x07
}
// OverlayChat is the canonical identifier for chat overlays

View File

@ -3,11 +3,20 @@ package peer
import (
"context"
"crypto/rand"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"cwtch.im/cwtch/settings"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
"os"
path "path/filepath"
"sort"
@ -16,17 +25,7 @@ import (
"sync"
"time"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"cwtch.im/cwtch/settings"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/openprivacy/log"
@ -435,12 +434,19 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error)
if tor.IsValidHostname(conversationInfo.Handle) {
ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.RemotePeer: conversationInfo.Handle, event.Data: message})
onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
id := -1
// For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks
id, err := cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message))
if err != nil {
return -1, err
// check if we should store this message locally...
if cm, err := model.DeserializeMessage(message); err == nil {
if !cm.IsStream() {
dan marked this conversation as resolved Outdated
Outdated
Review

whats up with < 1024 and not 7?

your last pr had things partitioned along 0x300 i believe?

can we make these some constants

whats up with < 1024 and not 7? your last pr had things partitioned along 0x300 i believe? can we make these some constants
// For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks
id, err = cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message))
if err != nil {
return -1, err
}
}
}
cp.eventBus.Publish(ev)
return id, nil
} else {
@ -1482,6 +1488,15 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time)
}
}
// Don't store messages in channel 7
if cm, err := model.DeserializeMessage(message); err == nil {
if cm.IsStream() {
return -1, nil
}
} else {
return -1, err
}
// Generate a random number and use it as the signature
signature := event.GetRandNumber().String()
return cp.storage.InsertMessage(ci.ID, 0, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message))

View File

@ -749,6 +749,16 @@ func (e *engine) handlePeerMessage(hostname string, eventID string, context stri
// Fall through handler for the default text conversation.
e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeerEngine, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)}))
// Don't ack messages in channel 7
// Note: this code explictly doesn't care about malformed messages, we deal with them
// later on...we still want to ack the original send...(as some "malformed" messages
// may be future-ok)
if cm, err := model.DeserializeMessage(string(message)); err == nil {
if cm.IsStream() {
return
}
}
// Send an explicit acknowledgement
// Every other protocol should have an explicit acknowledgement message e.g. value lookups have responses, and file handling has an explicit flow
if err := e.sendPeerMessage(hostname, pmodel.PeerMessage{ID: eventID, Context: event.ContextAck, Data: []byte{}}); err != nil {

View File

@ -2,12 +2,14 @@ package connections
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
model2 "cwtch.im/cwtch/protocol/model"
"encoding/json"
"git.openprivacy.ca/cwtch.im/tapir"
"git.openprivacy.ca/cwtch.im/tapir/applications"
"git.openprivacy.ca/openprivacy/log"
"sync/atomic"
"time"
)
const cwtchCapability = tapir.Capability("cwtchCapability")
@ -133,6 +135,14 @@ func (pa *PeerApp) listen() {
pa.version.Store(Version2)
}
} else {
if cm, err := model.DeserializeMessage(string(packet.Data)); err == nil {
dan marked this conversation as resolved
Review

if cm, err := model.DeserializeMessage(string(packet.Data)); err == nil && cm != nil {

and delete if below

`if cm, err := model.DeserializeMessage(string(packet.Data)); err == nil && cm != nil {` and delete if below
if cm.TransitTime != nil {
rt := time.Now().UTC()
cm.RecvTime = &rt
data, _ := json.Marshal(cm)
packet.Data = data
}
}
pa.MessageHandler(pa.connection.Hostname(), packet.ID, packet.Context, packet.Data)
}
}
@ -148,6 +158,15 @@ func (pa *PeerApp) SendMessage(message model2.PeerMessage) error {
var serialized []byte
var err error
if cm, err := model.DeserializeMessage(string(message.Data)); err == nil {
dan marked this conversation as resolved
Review

if cm, err := model.DeserializeMessage(string(message.Data)); err == nil && cm != nil {

then delete if below

`if cm, err := model.DeserializeMessage(string(message.Data)); err == nil && cm != nil { ` then delete if below
if cm.SendTime != nil {
tt := time.Now().UTC()
cm.TransitTime = &tt
data, _ := json.Marshal(cm)
message.Data = data
}
}
if pa.version.Load() == Version2 {
// treat data as a pre-serialized string, not as a byte array (which will be base64 encoded and bloat the packet size)
serialized = message.Serialize()