From 6bb510e39e6baa524044457102850898cd9b9939 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 25 Jan 2022 12:24:39 -0800 Subject: [PATCH 1/5] Negotiate Lower Bandwidth / Higher Density Packets for Peers --- event/common.go | 1 + protocol/connections/engine.go | 1 + protocol/connections/peerapp.go | 91 ++++++++++++++++++- protocol/model/peermessage.go.go | 2 +- .../file_sharing_integration_test.go | 2 +- 5 files changed, 91 insertions(+), 6 deletions(-) diff --git a/event/common.go b/event/common.go index 9a0430f..83b83fe 100644 --- a/event/common.go +++ b/event/common.go @@ -300,6 +300,7 @@ const ( ContextInvite = "im.cwtch.invite" ContextRaw = "im.cwtch.raw" ContextGetVal = "im.cwtch.getVal" + ContextVersion = "im.cwtch.version" ContextRetVal = "im.cwtch.retVal" ContextRequestManifest = "im.cwtch.file.request.manifest" ContextSendManifest = "im.cwtch.file.send.manifest" diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 11af969..52d8f18 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -221,6 +221,7 @@ func (e *engine) eventHandler() { key := ev.Data[event.FileKey] size, _ := strconv.Atoi(ev.Data[event.ManifestSize]) if err := e.sendPeerMessage(handle, e.filesharingSubSystem.FetchManifest(key, uint64(size))); err != nil { + log.Errorf("error sending manifest: %v", err) e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) } case event.ManifestSaved: diff --git a/protocol/connections/peerapp.go b/protocol/connections/peerapp.go index 5a6734a..7d6492e 100644 --- a/protocol/connections/peerapp.go +++ b/protocol/connections/peerapp.go @@ -1,11 +1,15 @@ package connections import ( + "bytes" + "cwtch.im/cwtch/event" model2 "cwtch.im/cwtch/protocol/model" "encoding/json" + "errors" "git.openprivacy.ca/cwtch.im/tapir" "git.openprivacy.ca/cwtch.im/tapir/applications" "git.openprivacy.ca/openprivacy/log" + "sync/atomic" ) const cwtchCapability = tapir.Capability("cwtchCapability") @@ -21,6 +25,7 @@ type PeerApp struct { OnAuth func(string) OnClose func(string) OnConnecting func(string) + version atomic.Value } type peerGetVal struct { @@ -42,6 +47,7 @@ func (pa *PeerApp) NewInstance() tapir.Application { newApp.OnAuth = pa.OnAuth newApp.OnClose = pa.OnClose newApp.OnConnecting = pa.OnConnecting + newApp.version.Store(0x01) return newApp } @@ -59,6 +65,20 @@ func (pa *PeerApp) Init(connection tapir.Connection) { pa.connection.Close() pa.OnClose(connection.Hostname()) } else { + + // we are authenticated + // attempt to negotiate a more efficient packet format... + // we are abusing the context here slightly by sending a "malformed" GetVal request. + // as a rule cwtch ignores getval requests that it cannot deserialize so older clients will ignore this + // message. + // version *must* be the first message sent to prevent race conditions for aut events fired on auth + // we send the message before we update the rest of the system + pa.SendMessage(model2.PeerMessage{ + ID: event.ContextVersion, + Context: event.ContextGetVal, + Data: []byte{0x02}, + }) + pa.OnAuth(connection.Hostname()) go pa.listen() } @@ -76,11 +96,64 @@ func (pa *PeerApp) listen() { pa.OnClose(pa.connection.Hostname()) return } - var peerMessage model2.PeerMessage - err := json.Unmarshal(message, &peerMessage) + + var packet model2.PeerMessage + var err error + + if pa.version.Load() == 0x01 { + err = json.Unmarshal(message, &packet) + } else if pa.version.Load() == 0x02 { + + // assume the encoding is invalid + err = errors.New("invalid message") + + // find the identifier prefix + idTerminator := bytes.IndexByte(message, '|') + if idTerminator != -1 && idTerminator+1 < len(message) { + // find the context terminator prefix + contextbegin := idTerminator + 1 + contextTerminator := bytes.IndexByte(message[contextbegin:], '|') + if contextTerminator != -1 { + err = nil + + // check that we have data + dataBegin := contextbegin + contextTerminator + 1 + var data []byte + if dataBegin < len(message) { + data = message[dataBegin:] + } + + // compile the message + packet = model2.PeerMessage{ + ID: string(message[0:idTerminator]), + Context: string(message[contextbegin : contextbegin+contextTerminator]), + Data: data, + } + } + } + + // if all else fails...attempt to process this message as a version 1 message + if err != nil { + err = json.Unmarshal(message, &packet) + } + } else { + log.Errorf("invalid version") + pa.OnClose(pa.connection.Hostname()) + return + } + if err == nil { if pa.IsAllowed(pa.connection.Hostname()) { - pa.MessageHandler(pa.connection.Hostname(), peerMessage.ID, peerMessage.Context, peerMessage.Data) + // we don't expose im.cwtch.version messages outside of PeerApp (ideally at some point in the future we + // can remove this check all together) + if packet.ID == event.ContextVersion { + if pa.version.Load() == 0x01 && len(packet.Data) == 1 && packet.Data[0] == 0x02 { + log.Debugf("switching to 2") + pa.version.Store(0x02) + } + } else { + pa.MessageHandler(pa.connection.Hostname(), packet.ID, packet.Context, []byte(packet.Data)) + } } } else { log.Errorf("Error unmarshalling PeerMessage package: %x %v", message, err) @@ -91,7 +164,17 @@ func (pa *PeerApp) listen() { // SendMessage sends the peer a preformatted message // NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol func (pa *PeerApp) SendMessage(message model2.PeerMessage) error { - serialized, err := json.Marshal(message) + + var serialized []byte + var err error + + if pa.version.Load() == 0x02 { + // treat data as a pre-serialized string, not as a byte array (which will be base64 encoded and bloat the packet size) + serialized = append(append([]byte(message.ID+"|"), []byte(message.Context+"|")...), message.Data...) + } else { + serialized, err = json.Marshal(message) + } + if err == nil { return pa.connection.Send(serialized) } diff --git a/protocol/model/peermessage.go.go b/protocol/model/peermessage.go.go index 6d3bfa2..58be2ed 100644 --- a/protocol/model/peermessage.go.go +++ b/protocol/model/peermessage.go.go @@ -4,5 +4,5 @@ package model type PeerMessage struct { ID string // A unique Message ID (primarily used for acknowledgments) Context string // A unique context identifier i.e. im.cwtch.chat - Data []byte // The serialized data packet. + Data []byte // A data packet. } diff --git a/testing/filesharing/file_sharing_integration_test.go b/testing/filesharing/file_sharing_integration_test.go index a7d3f2e..7eddc64 100644 --- a/testing/filesharing/file_sharing_integration_test.go +++ b/testing/filesharing/file_sharing_integration_test.go @@ -58,7 +58,7 @@ func TestFileSharing(t *testing.T) { os.RemoveAll("cwtch.out.png") os.RemoveAll("cwtch.out.png.manifest") - log.SetLevel(log.LevelDebug) + log.SetLevel(log.LevelInfo) os.Mkdir("tordir", 0700) dataDir := path.Join("tordir", "tor") From ff4249e2bc4c05001f5f7d929bad10148ad99400 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 25 Jan 2022 15:39:23 -0800 Subject: [PATCH 2/5] Factor out serialization/parsing code into protocol.Model --- protocol/connections/peerapp.go | 39 ++++---------------------- protocol/model/peermessage.go.go | 48 ++++++++++++++++++++++++++++++-- 2 files changed, 52 insertions(+), 35 deletions(-) diff --git a/protocol/connections/peerapp.go b/protocol/connections/peerapp.go index 7d6492e..b4e3158 100644 --- a/protocol/connections/peerapp.go +++ b/protocol/connections/peerapp.go @@ -1,11 +1,9 @@ package connections import ( - "bytes" "cwtch.im/cwtch/event" model2 "cwtch.im/cwtch/protocol/model" "encoding/json" - "errors" "git.openprivacy.ca/cwtch.im/tapir" "git.openprivacy.ca/cwtch.im/tapir/applications" "git.openprivacy.ca/openprivacy/log" @@ -103,39 +101,14 @@ func (pa *PeerApp) listen() { if pa.version.Load() == 0x01 { err = json.Unmarshal(message, &packet) } else if pa.version.Load() == 0x02 { - - // assume the encoding is invalid - err = errors.New("invalid message") - - // find the identifier prefix - idTerminator := bytes.IndexByte(message, '|') - if idTerminator != -1 && idTerminator+1 < len(message) { - // find the context terminator prefix - contextbegin := idTerminator + 1 - contextTerminator := bytes.IndexByte(message[contextbegin:], '|') - if contextTerminator != -1 { - err = nil - - // check that we have data - dataBegin := contextbegin + contextTerminator + 1 - var data []byte - if dataBegin < len(message) { - data = message[dataBegin:] - } - - // compile the message - packet = model2.PeerMessage{ - ID: string(message[0:idTerminator]), - Context: string(message[contextbegin : contextbegin+contextTerminator]), - Data: data, - } - } - } - + parsePacket, parseErr := model2.ParsePeerMessage(message) // if all else fails...attempt to process this message as a version 1 message - if err != nil { + if parseErr != nil { err = json.Unmarshal(message, &packet) + } else { + packet = *parsePacket } + } else { log.Errorf("invalid version") pa.OnClose(pa.connection.Hostname()) @@ -170,7 +143,7 @@ func (pa *PeerApp) SendMessage(message model2.PeerMessage) error { if pa.version.Load() == 0x02 { // treat data as a pre-serialized string, not as a byte array (which will be base64 encoded and bloat the packet size) - serialized = append(append([]byte(message.ID+"|"), []byte(message.Context+"|")...), message.Data...) + serialized = message.Serialize() } else { serialized, err = json.Marshal(message) } diff --git a/protocol/model/peermessage.go.go b/protocol/model/peermessage.go.go index 58be2ed..b62a41d 100644 --- a/protocol/model/peermessage.go.go +++ b/protocol/model/peermessage.go.go @@ -1,8 +1,52 @@ package model +import ( + "bytes" + "errors" +) + // PeerMessage is an encapsulation that can be used by higher level applications type PeerMessage struct { - ID string // A unique Message ID (primarily used for acknowledgments) + // ID **must** only contain alphanumeric characters separated by period. + ID string // A unique Message ID (primarily used for acknowledgments) + + // Context **must** only contain alphanumeric characters separated by period. Context string // A unique context identifier i.e. im.cwtch.chat - Data []byte // A data packet. + + // Data can contain anything + Data []byte // A data packet. +} + +// Serialize constructs an efficient serialized representation +func (m *PeerMessage) Serialize() []byte { + return append(append([]byte(m.ID+"|"), []byte(m.Context+"|")...), m.Data...) +} + +// ParsePeerMessage returns either a deserialized PeerMessage or an error if it is malformed +func ParsePeerMessage(message []byte) (*PeerMessage, error) { + + // find the identifier prefix + idTerminator := bytes.IndexByte(message, '|') + if idTerminator != -1 && idTerminator+1 < len(message) { + // find the context terminator prefix + contextbegin := idTerminator + 1 + contextTerminator := bytes.IndexByte(message[contextbegin:], '|') + if contextTerminator != -1 { + + // check that we have data + dataBegin := contextbegin + contextTerminator + 1 + var data []byte + if dataBegin < len(message) { + data = message[dataBegin:] + } + + // compile the message + return &PeerMessage{ + ID: string(message[0:idTerminator]), + Context: string(message[contextbegin : contextbegin+contextTerminator]), + Data: data, + }, nil + } + } + return nil, errors.New("invalid message") } From ea9cf5ca872d036ba4f180a958e14e088be9e7aa Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 25 Jan 2022 15:42:52 -0800 Subject: [PATCH 3/5] Make Version Strings Constant --- protocol/connections/peerapp.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/protocol/connections/peerapp.go b/protocol/connections/peerapp.go index b4e3158..bb9b9b0 100644 --- a/protocol/connections/peerapp.go +++ b/protocol/connections/peerapp.go @@ -35,6 +35,9 @@ type peerRetVal struct { Exists bool } +const Version1 = 0x01 +const Version2 = 0x02 + // NewInstance should always return a new instantiation of the application. func (pa *PeerApp) NewInstance() tapir.Application { newApp := new(PeerApp) @@ -74,7 +77,7 @@ func (pa *PeerApp) Init(connection tapir.Connection) { pa.SendMessage(model2.PeerMessage{ ID: event.ContextVersion, Context: event.ContextGetVal, - Data: []byte{0x02}, + Data: []byte{Version1}, }) pa.OnAuth(connection.Hostname()) @@ -98,9 +101,9 @@ func (pa *PeerApp) listen() { var packet model2.PeerMessage var err error - if pa.version.Load() == 0x01 { + if pa.version.Load() == Version1 { err = json.Unmarshal(message, &packet) - } else if pa.version.Load() == 0x02 { + } else if pa.version.Load() == Version2 { parsePacket, parseErr := model2.ParsePeerMessage(message) // if all else fails...attempt to process this message as a version 1 message if parseErr != nil { @@ -120,9 +123,9 @@ func (pa *PeerApp) listen() { // we don't expose im.cwtch.version messages outside of PeerApp (ideally at some point in the future we // can remove this check all together) if packet.ID == event.ContextVersion { - if pa.version.Load() == 0x01 && len(packet.Data) == 1 && packet.Data[0] == 0x02 { - log.Debugf("switching to 2") - pa.version.Store(0x02) + if pa.version.Load() == Version1 && len(packet.Data) == 1 && packet.Data[0] == Version2 { + log.Debugf("switching to protocol version 2") + pa.version.Store(Version2) } } else { pa.MessageHandler(pa.connection.Hostname(), packet.ID, packet.Context, []byte(packet.Data)) @@ -141,7 +144,7 @@ func (pa *PeerApp) SendMessage(message model2.PeerMessage) error { var serialized []byte var err error - if pa.version.Load() == 0x02 { + if pa.version.Load() == Version2 { // treat data as a pre-serialized string, not as a byte array (which will be base64 encoded and bloat the packet size) serialized = message.Serialize() } else { From a088e588b1a8c08484397461394b1e17027161e7 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 25 Jan 2022 15:44:37 -0800 Subject: [PATCH 4/5] Comment on Serialization Format --- protocol/model/peermessage.go.go | 1 + 1 file changed, 1 insertion(+) diff --git a/protocol/model/peermessage.go.go b/protocol/model/peermessage.go.go index b62a41d..3ffed0a 100644 --- a/protocol/model/peermessage.go.go +++ b/protocol/model/peermessage.go.go @@ -18,6 +18,7 @@ type PeerMessage struct { } // Serialize constructs an efficient serialized representation +// Format: [ID String] | [Context String] | Binary Data func (m *PeerMessage) Serialize() []byte { return append(append([]byte(m.ID+"|"), []byte(m.Context+"|")...), m.Data...) } From ec6e025284f97c37770ae39e5b679885d32ad185 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 25 Jan 2022 15:55:38 -0800 Subject: [PATCH 5/5] Version Fixups --- protocol/connections/peerapp.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/protocol/connections/peerapp.go b/protocol/connections/peerapp.go index bb9b9b0..bfc2c3e 100644 --- a/protocol/connections/peerapp.go +++ b/protocol/connections/peerapp.go @@ -48,7 +48,7 @@ func (pa *PeerApp) NewInstance() tapir.Application { newApp.OnAuth = pa.OnAuth newApp.OnClose = pa.OnClose newApp.OnConnecting = pa.OnConnecting - newApp.version.Store(0x01) + newApp.version.Store(Version1) return newApp } @@ -72,12 +72,13 @@ func (pa *PeerApp) Init(connection tapir.Connection) { // we are abusing the context here slightly by sending a "malformed" GetVal request. // as a rule cwtch ignores getval requests that it cannot deserialize so older clients will ignore this // message. - // version *must* be the first message sent to prevent race conditions for aut events fired on auth - // we send the message before we update the rest of the system + // version *must* be the first message sent to prevent race conditions for other events fired after-auth + // (e.g. getVal requests) + // as such, we send this message before we update the rest of the system pa.SendMessage(model2.PeerMessage{ ID: event.ContextVersion, Context: event.ContextGetVal, - Data: []byte{Version1}, + Data: []byte{Version2}, }) pa.OnAuth(connection.Hostname())