diff --git a/event/common.go b/event/common.go index 9a0430f..83b83fe 100644 --- a/event/common.go +++ b/event/common.go @@ -300,6 +300,7 @@ const ( ContextInvite = "im.cwtch.invite" ContextRaw = "im.cwtch.raw" ContextGetVal = "im.cwtch.getVal" + ContextVersion = "im.cwtch.version" ContextRetVal = "im.cwtch.retVal" ContextRequestManifest = "im.cwtch.file.request.manifest" ContextSendManifest = "im.cwtch.file.send.manifest" diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 11af969..52d8f18 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -221,6 +221,7 @@ func (e *engine) eventHandler() { key := ev.Data[event.FileKey] size, _ := strconv.Atoi(ev.Data[event.ManifestSize]) if err := e.sendPeerMessage(handle, e.filesharingSubSystem.FetchManifest(key, uint64(size))); err != nil { + log.Errorf("error sending manifest: %v", err) e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) } case event.ManifestSaved: diff --git a/protocol/connections/peerapp.go b/protocol/connections/peerapp.go index 5a6734a..bfc2c3e 100644 --- a/protocol/connections/peerapp.go +++ b/protocol/connections/peerapp.go @@ -1,11 +1,13 @@ package connections import ( + "cwtch.im/cwtch/event" model2 "cwtch.im/cwtch/protocol/model" "encoding/json" "git.openprivacy.ca/cwtch.im/tapir" "git.openprivacy.ca/cwtch.im/tapir/applications" "git.openprivacy.ca/openprivacy/log" + "sync/atomic" ) const cwtchCapability = tapir.Capability("cwtchCapability") @@ -21,6 +23,7 @@ type PeerApp struct { OnAuth func(string) OnClose func(string) OnConnecting func(string) + version atomic.Value } type peerGetVal struct { @@ -32,6 +35,9 @@ type peerRetVal struct { Exists bool } +const Version1 = 0x01 +const Version2 = 0x02 + // NewInstance should always return a new instantiation of the application. func (pa *PeerApp) NewInstance() tapir.Application { newApp := new(PeerApp) @@ -42,6 +48,7 @@ func (pa *PeerApp) NewInstance() tapir.Application { newApp.OnAuth = pa.OnAuth newApp.OnClose = pa.OnClose newApp.OnConnecting = pa.OnConnecting + newApp.version.Store(Version1) return newApp } @@ -59,6 +66,21 @@ func (pa *PeerApp) Init(connection tapir.Connection) { pa.connection.Close() pa.OnClose(connection.Hostname()) } else { + + // we are authenticated + // attempt to negotiate a more efficient packet format... + // we are abusing the context here slightly by sending a "malformed" GetVal request. + // as a rule cwtch ignores getval requests that it cannot deserialize so older clients will ignore this + // message. + // version *must* be the first message sent to prevent race conditions for other events fired after-auth + // (e.g. getVal requests) + // as such, we send this message before we update the rest of the system + pa.SendMessage(model2.PeerMessage{ + ID: event.ContextVersion, + Context: event.ContextGetVal, + Data: []byte{Version2}, + }) + pa.OnAuth(connection.Hostname()) go pa.listen() } @@ -76,11 +98,39 @@ func (pa *PeerApp) listen() { pa.OnClose(pa.connection.Hostname()) return } - var peerMessage model2.PeerMessage - err := json.Unmarshal(message, &peerMessage) + + var packet model2.PeerMessage + var err error + + if pa.version.Load() == Version1 { + err = json.Unmarshal(message, &packet) + } else if pa.version.Load() == Version2 { + parsePacket, parseErr := model2.ParsePeerMessage(message) + // if all else fails...attempt to process this message as a version 1 message + if parseErr != nil { + err = json.Unmarshal(message, &packet) + } else { + packet = *parsePacket + } + + } else { + log.Errorf("invalid version") + pa.OnClose(pa.connection.Hostname()) + return + } + if err == nil { if pa.IsAllowed(pa.connection.Hostname()) { - pa.MessageHandler(pa.connection.Hostname(), peerMessage.ID, peerMessage.Context, peerMessage.Data) + // we don't expose im.cwtch.version messages outside of PeerApp (ideally at some point in the future we + // can remove this check all together) + if packet.ID == event.ContextVersion { + if pa.version.Load() == Version1 && len(packet.Data) == 1 && packet.Data[0] == Version2 { + log.Debugf("switching to protocol version 2") + pa.version.Store(Version2) + } + } else { + pa.MessageHandler(pa.connection.Hostname(), packet.ID, packet.Context, []byte(packet.Data)) + } } } else { log.Errorf("Error unmarshalling PeerMessage package: %x %v", message, err) @@ -91,7 +141,17 @@ func (pa *PeerApp) listen() { // SendMessage sends the peer a preformatted message // NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol func (pa *PeerApp) SendMessage(message model2.PeerMessage) error { - serialized, err := json.Marshal(message) + + var serialized []byte + var err error + + if pa.version.Load() == Version2 { + // treat data as a pre-serialized string, not as a byte array (which will be base64 encoded and bloat the packet size) + serialized = message.Serialize() + } else { + serialized, err = json.Marshal(message) + } + if err == nil { return pa.connection.Send(serialized) } diff --git a/protocol/model/peermessage.go.go b/protocol/model/peermessage.go.go index 6d3bfa2..3ffed0a 100644 --- a/protocol/model/peermessage.go.go +++ b/protocol/model/peermessage.go.go @@ -1,8 +1,53 @@ package model +import ( + "bytes" + "errors" +) + // PeerMessage is an encapsulation that can be used by higher level applications type PeerMessage struct { - ID string // A unique Message ID (primarily used for acknowledgments) + // ID **must** only contain alphanumeric characters separated by period. + ID string // A unique Message ID (primarily used for acknowledgments) + + // Context **must** only contain alphanumeric characters separated by period. Context string // A unique context identifier i.e. im.cwtch.chat - Data []byte // The serialized data packet. + + // Data can contain anything + Data []byte // A data packet. +} + +// Serialize constructs an efficient serialized representation +// Format: [ID String] | [Context String] | Binary Data +func (m *PeerMessage) Serialize() []byte { + return append(append([]byte(m.ID+"|"), []byte(m.Context+"|")...), m.Data...) +} + +// ParsePeerMessage returns either a deserialized PeerMessage or an error if it is malformed +func ParsePeerMessage(message []byte) (*PeerMessage, error) { + + // find the identifier prefix + idTerminator := bytes.IndexByte(message, '|') + if idTerminator != -1 && idTerminator+1 < len(message) { + // find the context terminator prefix + contextbegin := idTerminator + 1 + contextTerminator := bytes.IndexByte(message[contextbegin:], '|') + if contextTerminator != -1 { + + // check that we have data + dataBegin := contextbegin + contextTerminator + 1 + var data []byte + if dataBegin < len(message) { + data = message[dataBegin:] + } + + // compile the message + return &PeerMessage{ + ID: string(message[0:idTerminator]), + Context: string(message[contextbegin : contextbegin+contextTerminator]), + Data: data, + }, nil + } + } + return nil, errors.New("invalid message") } diff --git a/testing/filesharing/file_sharing_integration_test.go b/testing/filesharing/file_sharing_integration_test.go index a7d3f2e..7eddc64 100644 --- a/testing/filesharing/file_sharing_integration_test.go +++ b/testing/filesharing/file_sharing_integration_test.go @@ -58,7 +58,7 @@ func TestFileSharing(t *testing.T) { os.RemoveAll("cwtch.out.png") os.RemoveAll("cwtch.out.png.manifest") - log.SetLevel(log.LevelDebug) + log.SetLevel(log.LevelInfo) os.Mkdir("tordir", 0700) dataDir := path.Join("tordir", "tor")