Negotiate Lower Bandwidth / Higher Density Packets for Peers #428

Merged
erinn merged 5 commits from fastercwtch into master 2022-01-26 20:02:50 +00:00
5 changed files with 114 additions and 7 deletions

View File

@ -300,6 +300,7 @@ const (
ContextInvite = "im.cwtch.invite"
ContextRaw = "im.cwtch.raw"
ContextGetVal = "im.cwtch.getVal"
ContextVersion = "im.cwtch.version"
ContextRetVal = "im.cwtch.retVal"
ContextRequestManifest = "im.cwtch.file.request.manifest"
ContextSendManifest = "im.cwtch.file.send.manifest"

View File

@ -221,6 +221,7 @@ func (e *engine) eventHandler() {
key := ev.Data[event.FileKey]
size, _ := strconv.Atoi(ev.Data[event.ManifestSize])
if err := e.sendPeerMessage(handle, e.filesharingSubSystem.FetchManifest(key, uint64(size))); err != nil {
log.Errorf("error sending manifest: %v", err)
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
}
case event.ManifestSaved:

View File

@ -1,11 +1,13 @@
package connections
import (
"cwtch.im/cwtch/event"
model2 "cwtch.im/cwtch/protocol/model"
"encoding/json"
"git.openprivacy.ca/cwtch.im/tapir"
"git.openprivacy.ca/cwtch.im/tapir/applications"
"git.openprivacy.ca/openprivacy/log"
"sync/atomic"
)
const cwtchCapability = tapir.Capability("cwtchCapability")
@ -21,6 +23,7 @@ type PeerApp struct {
OnAuth func(string)
OnClose func(string)
OnConnecting func(string)
version atomic.Value
}
type peerGetVal struct {
@ -32,6 +35,9 @@ type peerRetVal struct {
Exists bool
}
const Version1 = 0x01
const Version2 = 0x02
// NewInstance should always return a new instantiation of the application.
func (pa *PeerApp) NewInstance() tapir.Application {
newApp := new(PeerApp)
@ -42,6 +48,7 @@ func (pa *PeerApp) NewInstance() tapir.Application {
newApp.OnAuth = pa.OnAuth
newApp.OnClose = pa.OnClose
newApp.OnConnecting = pa.OnConnecting
sarah marked this conversation as resolved Outdated
Outdated
Review

constant?

constant?
newApp.version.Store(Version1)
sarah marked this conversation as resolved Outdated
Outdated
Review

Store(Version1)

Store(Version1)
return newApp
}
@ -59,6 +66,21 @@ func (pa *PeerApp) Init(connection tapir.Connection) {
pa.connection.Close()
pa.OnClose(connection.Hostname())
} else {
// we are authenticated
// attempt to negotiate a more efficient packet format...
// we are abusing the context here slightly by sending a "malformed" GetVal request.
// as a rule cwtch ignores getval requests that it cannot deserialize so older clients will ignore this
// message.
// version *must* be the first message sent to prevent race conditions for other events fired after-auth
sarah marked this conversation as resolved Outdated
Outdated
Review

for 'aut' events?

for 'aut' events?
// (e.g. getVal requests)
// as such, we send this message before we update the rest of the system
pa.SendMessage(model2.PeerMessage{
ID: event.ContextVersion,
sarah marked this conversation as resolved Outdated
Outdated
Review

constant?

constant?
Context: event.ContextGetVal,
sarah marked this conversation as resolved Outdated
Outdated
Review

version2?

version2?
Data: []byte{Version2},
})
pa.OnAuth(connection.Hostname())
go pa.listen()
}
@ -76,11 +98,39 @@ func (pa *PeerApp) listen() {
pa.OnClose(pa.connection.Hostname())
return
}
var peerMessage model2.PeerMessage
err := json.Unmarshal(message, &peerMessage)
var packet model2.PeerMessage
var err error
if pa.version.Load() == Version1 {
err = json.Unmarshal(message, &packet)
} else if pa.version.Load() == Version2 {
sarah marked this conversation as resolved Outdated
Outdated
Review

this is a big chunk, might be best moved to a function?

this is a big chunk, might be best moved to a function?
parsePacket, parseErr := model2.ParsePeerMessage(message)
// if all else fails...attempt to process this message as a version 1 message
if parseErr != nil {
err = json.Unmarshal(message, &packet)
sarah marked this conversation as resolved Outdated
Outdated
Review

I'm a little nervous mixing unspecified Ids, event IDs are strings, even tho they should be populated by int -> strings only? its not type specified anywhere so someone could make it binary or strings and either could generate a '|', contexts are strings that right now use '.' as seperators...
we have a lot of string manipulation based on not well documented seperators and we're mixing it with binary data.

could we use Pascal style strings here? neither id nor context should exceed 256 bytes so just have each lead with a byte of length to parse?

I'm a little nervous mixing unspecified Ids, event IDs are strings, even tho they should be populated by int -> strings only? its not type specified anywhere so someone could make it binary or strings and either could generate a '|', contexts are strings that right now use '.' as seperators... we have a lot of string manipulation based on not well documented seperators and we're mixing it with binary data. could we use Pascal style strings here? neither id nor context should exceed 256 bytes so just have each lead with a byte of length to parse?
} else {
packet = *parsePacket
}
} else {
log.Errorf("invalid version")
pa.OnClose(pa.connection.Hostname())
return
}
if err == nil {
if pa.IsAllowed(pa.connection.Hostname()) {
pa.MessageHandler(pa.connection.Hostname(), peerMessage.ID, peerMessage.Context, peerMessage.Data)
// we don't expose im.cwtch.version messages outside of PeerApp (ideally at some point in the future we
// can remove this check all together)
if packet.ID == event.ContextVersion {
if pa.version.Load() == Version1 && len(packet.Data) == 1 && packet.Data[0] == Version2 {
log.Debugf("switching to protocol version 2")
pa.version.Store(Version2)
}
} else {
pa.MessageHandler(pa.connection.Hostname(), packet.ID, packet.Context, []byte(packet.Data))
}
}
} else {
log.Errorf("Error unmarshalling PeerMessage package: %x %v", message, err)
@ -91,7 +141,17 @@ func (pa *PeerApp) listen() {
// SendMessage sends the peer a preformatted message
// NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol
func (pa *PeerApp) SendMessage(message model2.PeerMessage) error {
serialized, err := json.Marshal(message)
var serialized []byte
var err error
if pa.version.Load() == Version2 {
// treat data as a pre-serialized string, not as a byte array (which will be base64 encoded and bloat the packet size)
serialized = message.Serialize()
} else {
sarah marked this conversation as resolved Outdated
Outdated
Review

switching what to 2 :P

switching what to 2 :P
serialized, err = json.Marshal(message)
}
if err == nil {
return pa.connection.Send(serialized)
}

View File

@ -1,8 +1,53 @@
package model
import (
"bytes"
"errors"
)
// PeerMessage is an encapsulation that can be used by higher level applications
type PeerMessage struct {
ID string // A unique Message ID (primarily used for acknowledgments)
// ID **must** only contain alphanumeric characters separated by period.
ID string // A unique Message ID (primarily used for acknowledgments)
// Context **must** only contain alphanumeric characters separated by period.
Context string // A unique context identifier i.e. im.cwtch.chat
Data []byte // The serialized data packet.
// Data can contain anything
Data []byte // A data packet.
}
// Serialize constructs an efficient serialized representation
// Format: [ID String] | [Context String] | Binary Data
func (m *PeerMessage) Serialize() []byte {
return append(append([]byte(m.ID+"|"), []byte(m.Context+"|")...), m.Data...)
}
// ParsePeerMessage returns either a deserialized PeerMessage or an error if it is malformed
func ParsePeerMessage(message []byte) (*PeerMessage, error) {
// find the identifier prefix
idTerminator := bytes.IndexByte(message, '|')
if idTerminator != -1 && idTerminator+1 < len(message) {
// find the context terminator prefix
contextbegin := idTerminator + 1
contextTerminator := bytes.IndexByte(message[contextbegin:], '|')
if contextTerminator != -1 {
// check that we have data
dataBegin := contextbegin + contextTerminator + 1
var data []byte
if dataBegin < len(message) {
data = message[dataBegin:]
}
// compile the message
return &PeerMessage{
ID: string(message[0:idTerminator]),
Context: string(message[contextbegin : contextbegin+contextTerminator]),
Data: data,
}, nil
}
}
return nil, errors.New("invalid message")
}

View File

@ -58,7 +58,7 @@ func TestFileSharing(t *testing.T) {
os.RemoveAll("cwtch.out.png")
os.RemoveAll("cwtch.out.png.manifest")
log.SetLevel(log.LevelDebug)
log.SetLevel(log.LevelInfo)
os.Mkdir("tordir", 0700)
dataDir := path.Join("tordir", "tor")