Negotiate Lower Bandwidth / Higher Density Packets for Peers

This commit is contained in:
Sarah Jamie Lewis 2022-01-25 12:24:39 -08:00
parent c6805149fa
commit 6bb510e39e
5 changed files with 91 additions and 6 deletions

View File

@ -300,6 +300,7 @@ const (
ContextInvite = "im.cwtch.invite"
ContextRaw = "im.cwtch.raw"
ContextGetVal = "im.cwtch.getVal"
ContextVersion = "im.cwtch.version"
ContextRetVal = "im.cwtch.retVal"
ContextRequestManifest = "im.cwtch.file.request.manifest"
ContextSendManifest = "im.cwtch.file.send.manifest"

View File

@ -221,6 +221,7 @@ func (e *engine) eventHandler() {
key := ev.Data[event.FileKey]
size, _ := strconv.Atoi(ev.Data[event.ManifestSize])
if err := e.sendPeerMessage(handle, e.filesharingSubSystem.FetchManifest(key, uint64(size))); err != nil {
log.Errorf("error sending manifest: %v", err)
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
}
case event.ManifestSaved:

View File

@ -1,11 +1,15 @@
package connections
import (
"bytes"
"cwtch.im/cwtch/event"
model2 "cwtch.im/cwtch/protocol/model"
"encoding/json"
"errors"
"git.openprivacy.ca/cwtch.im/tapir"
"git.openprivacy.ca/cwtch.im/tapir/applications"
"git.openprivacy.ca/openprivacy/log"
"sync/atomic"
)
const cwtchCapability = tapir.Capability("cwtchCapability")
@ -21,6 +25,7 @@ type PeerApp struct {
OnAuth func(string)
OnClose func(string)
OnConnecting func(string)
version atomic.Value
}
type peerGetVal struct {
@ -42,6 +47,7 @@ func (pa *PeerApp) NewInstance() tapir.Application {
newApp.OnAuth = pa.OnAuth
newApp.OnClose = pa.OnClose
newApp.OnConnecting = pa.OnConnecting
newApp.version.Store(0x01)
return newApp
}
@ -59,6 +65,20 @@ func (pa *PeerApp) Init(connection tapir.Connection) {
pa.connection.Close()
pa.OnClose(connection.Hostname())
} else {
// we are authenticated
// attempt to negotiate a more efficient packet format...
// we are abusing the context here slightly by sending a "malformed" GetVal request.
// as a rule cwtch ignores getval requests that it cannot deserialize so older clients will ignore this
// message.
// version *must* be the first message sent to prevent race conditions for aut events fired on auth
// we send the message before we update the rest of the system
pa.SendMessage(model2.PeerMessage{
ID: event.ContextVersion,
Context: event.ContextGetVal,
Data: []byte{0x02},
})
pa.OnAuth(connection.Hostname())
go pa.listen()
}
@ -76,11 +96,64 @@ func (pa *PeerApp) listen() {
pa.OnClose(pa.connection.Hostname())
return
}
var peerMessage model2.PeerMessage
err := json.Unmarshal(message, &peerMessage)
var packet model2.PeerMessage
var err error
if pa.version.Load() == 0x01 {
err = json.Unmarshal(message, &packet)
} else if pa.version.Load() == 0x02 {
// assume the encoding is invalid
err = errors.New("invalid message")
// find the identifier prefix
idTerminator := bytes.IndexByte(message, '|')
if idTerminator != -1 && idTerminator+1 < len(message) {
// find the context terminator prefix
contextbegin := idTerminator + 1
contextTerminator := bytes.IndexByte(message[contextbegin:], '|')
if contextTerminator != -1 {
err = nil
// check that we have data
dataBegin := contextbegin + contextTerminator + 1
var data []byte
if dataBegin < len(message) {
data = message[dataBegin:]
}
// compile the message
packet = model2.PeerMessage{
ID: string(message[0:idTerminator]),
Context: string(message[contextbegin : contextbegin+contextTerminator]),
Data: data,
}
}
}
// if all else fails...attempt to process this message as a version 1 message
if err != nil {
err = json.Unmarshal(message, &packet)
}
} else {
log.Errorf("invalid version")
pa.OnClose(pa.connection.Hostname())
return
}
if err == nil {
if pa.IsAllowed(pa.connection.Hostname()) {
pa.MessageHandler(pa.connection.Hostname(), peerMessage.ID, peerMessage.Context, peerMessage.Data)
// we don't expose im.cwtch.version messages outside of PeerApp (ideally at some point in the future we
// can remove this check all together)
if packet.ID == event.ContextVersion {
if pa.version.Load() == 0x01 && len(packet.Data) == 1 && packet.Data[0] == 0x02 {
log.Debugf("switching to 2")
pa.version.Store(0x02)
}
} else {
pa.MessageHandler(pa.connection.Hostname(), packet.ID, packet.Context, []byte(packet.Data))
}
}
} else {
log.Errorf("Error unmarshalling PeerMessage package: %x %v", message, err)
@ -91,7 +164,17 @@ func (pa *PeerApp) listen() {
// SendMessage sends the peer a preformatted message
// NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol
func (pa *PeerApp) SendMessage(message model2.PeerMessage) error {
serialized, err := json.Marshal(message)
var serialized []byte
var err error
if pa.version.Load() == 0x02 {
// treat data as a pre-serialized string, not as a byte array (which will be base64 encoded and bloat the packet size)
serialized = append(append([]byte(message.ID+"|"), []byte(message.Context+"|")...), message.Data...)
} else {
serialized, err = json.Marshal(message)
}
if err == nil {
return pa.connection.Send(serialized)
}

View File

@ -4,5 +4,5 @@ package model
type PeerMessage struct {
ID string // A unique Message ID (primarily used for acknowledgments)
Context string // A unique context identifier i.e. im.cwtch.chat
Data []byte // The serialized data packet.
Data []byte // A data packet.
}

View File

@ -58,7 +58,7 @@ func TestFileSharing(t *testing.T) {
os.RemoveAll("cwtch.out.png")
os.RemoveAll("cwtch.out.png.manifest")
log.SetLevel(log.LevelDebug)
log.SetLevel(log.LevelInfo)
os.Mkdir("tordir", 0700)
dataDir := path.Join("tordir", "tor")