From 6bb510e39e6baa524044457102850898cd9b9939 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 25 Jan 2022 12:24:39 -0800 Subject: [PATCH] Negotiate Lower Bandwidth / Higher Density Packets for Peers --- event/common.go | 1 + protocol/connections/engine.go | 1 + protocol/connections/peerapp.go | 91 ++++++++++++++++++- protocol/model/peermessage.go.go | 2 +- .../file_sharing_integration_test.go | 2 +- 5 files changed, 91 insertions(+), 6 deletions(-) diff --git a/event/common.go b/event/common.go index 9a0430f..83b83fe 100644 --- a/event/common.go +++ b/event/common.go @@ -300,6 +300,7 @@ const ( ContextInvite = "im.cwtch.invite" ContextRaw = "im.cwtch.raw" ContextGetVal = "im.cwtch.getVal" + ContextVersion = "im.cwtch.version" ContextRetVal = "im.cwtch.retVal" ContextRequestManifest = "im.cwtch.file.request.manifest" ContextSendManifest = "im.cwtch.file.send.manifest" diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 11af969..52d8f18 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -221,6 +221,7 @@ func (e *engine) eventHandler() { key := ev.Data[event.FileKey] size, _ := strconv.Atoi(ev.Data[event.ManifestSize]) if err := e.sendPeerMessage(handle, e.filesharingSubSystem.FetchManifest(key, uint64(size))); err != nil { + log.Errorf("error sending manifest: %v", err) e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) } case event.ManifestSaved: diff --git a/protocol/connections/peerapp.go b/protocol/connections/peerapp.go index 5a6734a..7d6492e 100644 --- a/protocol/connections/peerapp.go +++ b/protocol/connections/peerapp.go @@ -1,11 +1,15 @@ package connections import ( + "bytes" + "cwtch.im/cwtch/event" model2 "cwtch.im/cwtch/protocol/model" "encoding/json" + "errors" "git.openprivacy.ca/cwtch.im/tapir" "git.openprivacy.ca/cwtch.im/tapir/applications" "git.openprivacy.ca/openprivacy/log" + "sync/atomic" ) const cwtchCapability = tapir.Capability("cwtchCapability") @@ -21,6 +25,7 @@ type PeerApp struct { OnAuth func(string) OnClose func(string) OnConnecting func(string) + version atomic.Value } type peerGetVal struct { @@ -42,6 +47,7 @@ func (pa *PeerApp) NewInstance() tapir.Application { newApp.OnAuth = pa.OnAuth newApp.OnClose = pa.OnClose newApp.OnConnecting = pa.OnConnecting + newApp.version.Store(0x01) return newApp } @@ -59,6 +65,20 @@ func (pa *PeerApp) Init(connection tapir.Connection) { pa.connection.Close() pa.OnClose(connection.Hostname()) } else { + + // we are authenticated + // attempt to negotiate a more efficient packet format... + // we are abusing the context here slightly by sending a "malformed" GetVal request. + // as a rule cwtch ignores getval requests that it cannot deserialize so older clients will ignore this + // message. + // version *must* be the first message sent to prevent race conditions for aut events fired on auth + // we send the message before we update the rest of the system + pa.SendMessage(model2.PeerMessage{ + ID: event.ContextVersion, + Context: event.ContextGetVal, + Data: []byte{0x02}, + }) + pa.OnAuth(connection.Hostname()) go pa.listen() } @@ -76,11 +96,64 @@ func (pa *PeerApp) listen() { pa.OnClose(pa.connection.Hostname()) return } - var peerMessage model2.PeerMessage - err := json.Unmarshal(message, &peerMessage) + + var packet model2.PeerMessage + var err error + + if pa.version.Load() == 0x01 { + err = json.Unmarshal(message, &packet) + } else if pa.version.Load() == 0x02 { + + // assume the encoding is invalid + err = errors.New("invalid message") + + // find the identifier prefix + idTerminator := bytes.IndexByte(message, '|') + if idTerminator != -1 && idTerminator+1 < len(message) { + // find the context terminator prefix + contextbegin := idTerminator + 1 + contextTerminator := bytes.IndexByte(message[contextbegin:], '|') + if contextTerminator != -1 { + err = nil + + // check that we have data + dataBegin := contextbegin + contextTerminator + 1 + var data []byte + if dataBegin < len(message) { + data = message[dataBegin:] + } + + // compile the message + packet = model2.PeerMessage{ + ID: string(message[0:idTerminator]), + Context: string(message[contextbegin : contextbegin+contextTerminator]), + Data: data, + } + } + } + + // if all else fails...attempt to process this message as a version 1 message + if err != nil { + err = json.Unmarshal(message, &packet) + } + } else { + log.Errorf("invalid version") + pa.OnClose(pa.connection.Hostname()) + return + } + if err == nil { if pa.IsAllowed(pa.connection.Hostname()) { - pa.MessageHandler(pa.connection.Hostname(), peerMessage.ID, peerMessage.Context, peerMessage.Data) + // we don't expose im.cwtch.version messages outside of PeerApp (ideally at some point in the future we + // can remove this check all together) + if packet.ID == event.ContextVersion { + if pa.version.Load() == 0x01 && len(packet.Data) == 1 && packet.Data[0] == 0x02 { + log.Debugf("switching to 2") + pa.version.Store(0x02) + } + } else { + pa.MessageHandler(pa.connection.Hostname(), packet.ID, packet.Context, []byte(packet.Data)) + } } } else { log.Errorf("Error unmarshalling PeerMessage package: %x %v", message, err) @@ -91,7 +164,17 @@ func (pa *PeerApp) listen() { // SendMessage sends the peer a preformatted message // NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol func (pa *PeerApp) SendMessage(message model2.PeerMessage) error { - serialized, err := json.Marshal(message) + + var serialized []byte + var err error + + if pa.version.Load() == 0x02 { + // treat data as a pre-serialized string, not as a byte array (which will be base64 encoded and bloat the packet size) + serialized = append(append([]byte(message.ID+"|"), []byte(message.Context+"|")...), message.Data...) + } else { + serialized, err = json.Marshal(message) + } + if err == nil { return pa.connection.Send(serialized) } diff --git a/protocol/model/peermessage.go.go b/protocol/model/peermessage.go.go index 6d3bfa2..58be2ed 100644 --- a/protocol/model/peermessage.go.go +++ b/protocol/model/peermessage.go.go @@ -4,5 +4,5 @@ package model type PeerMessage struct { ID string // A unique Message ID (primarily used for acknowledgments) Context string // A unique context identifier i.e. im.cwtch.chat - Data []byte // The serialized data packet. + Data []byte // A data packet. } diff --git a/testing/filesharing/file_sharing_integration_test.go b/testing/filesharing/file_sharing_integration_test.go index a7d3f2e..7eddc64 100644 --- a/testing/filesharing/file_sharing_integration_test.go +++ b/testing/filesharing/file_sharing_integration_test.go @@ -58,7 +58,7 @@ func TestFileSharing(t *testing.T) { os.RemoveAll("cwtch.out.png") os.RemoveAll("cwtch.out.png.manifest") - log.SetLevel(log.LevelDebug) + log.SetLevel(log.LevelInfo) os.Mkdir("tordir", 0700) dataDir := path.Join("tordir", "tor")