Negotiate Lower Bandwidth / Higher Density Packets for Peers #428
|
@ -300,6 +300,7 @@ const (
|
||||||
ContextInvite = "im.cwtch.invite"
|
ContextInvite = "im.cwtch.invite"
|
||||||
ContextRaw = "im.cwtch.raw"
|
ContextRaw = "im.cwtch.raw"
|
||||||
ContextGetVal = "im.cwtch.getVal"
|
ContextGetVal = "im.cwtch.getVal"
|
||||||
|
ContextVersion = "im.cwtch.version"
|
||||||
ContextRetVal = "im.cwtch.retVal"
|
ContextRetVal = "im.cwtch.retVal"
|
||||||
ContextRequestManifest = "im.cwtch.file.request.manifest"
|
ContextRequestManifest = "im.cwtch.file.request.manifest"
|
||||||
ContextSendManifest = "im.cwtch.file.send.manifest"
|
ContextSendManifest = "im.cwtch.file.send.manifest"
|
||||||
|
|
|
@ -221,6 +221,7 @@ func (e *engine) eventHandler() {
|
||||||
key := ev.Data[event.FileKey]
|
key := ev.Data[event.FileKey]
|
||||||
size, _ := strconv.Atoi(ev.Data[event.ManifestSize])
|
size, _ := strconv.Atoi(ev.Data[event.ManifestSize])
|
||||||
if err := e.sendPeerMessage(handle, e.filesharingSubSystem.FetchManifest(key, uint64(size))); err != nil {
|
if err := e.sendPeerMessage(handle, e.filesharingSubSystem.FetchManifest(key, uint64(size))); err != nil {
|
||||||
|
log.Errorf("error sending manifest: %v", err)
|
||||||
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
|
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
|
||||||
}
|
}
|
||||||
case event.ManifestSaved:
|
case event.ManifestSaved:
|
||||||
|
|
|
@ -1,11 +1,13 @@
|
||||||
package connections
|
package connections
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"cwtch.im/cwtch/event"
|
||||||
model2 "cwtch.im/cwtch/protocol/model"
|
model2 "cwtch.im/cwtch/protocol/model"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"git.openprivacy.ca/cwtch.im/tapir"
|
"git.openprivacy.ca/cwtch.im/tapir"
|
||||||
"git.openprivacy.ca/cwtch.im/tapir/applications"
|
"git.openprivacy.ca/cwtch.im/tapir/applications"
|
||||||
"git.openprivacy.ca/openprivacy/log"
|
"git.openprivacy.ca/openprivacy/log"
|
||||||
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
const cwtchCapability = tapir.Capability("cwtchCapability")
|
const cwtchCapability = tapir.Capability("cwtchCapability")
|
||||||
|
@ -21,6 +23,7 @@ type PeerApp struct {
|
||||||
OnAuth func(string)
|
OnAuth func(string)
|
||||||
OnClose func(string)
|
OnClose func(string)
|
||||||
OnConnecting func(string)
|
OnConnecting func(string)
|
||||||
|
version atomic.Value
|
||||||
}
|
}
|
||||||
|
|
||||||
type peerGetVal struct {
|
type peerGetVal struct {
|
||||||
|
@ -32,6 +35,9 @@ type peerRetVal struct {
|
||||||
Exists bool
|
Exists bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const Version1 = 0x01
|
||||||
|
const Version2 = 0x02
|
||||||
|
|
||||||
// NewInstance should always return a new instantiation of the application.
|
// NewInstance should always return a new instantiation of the application.
|
||||||
func (pa *PeerApp) NewInstance() tapir.Application {
|
func (pa *PeerApp) NewInstance() tapir.Application {
|
||||||
newApp := new(PeerApp)
|
newApp := new(PeerApp)
|
||||||
|
@ -42,6 +48,7 @@ func (pa *PeerApp) NewInstance() tapir.Application {
|
||||||
newApp.OnAuth = pa.OnAuth
|
newApp.OnAuth = pa.OnAuth
|
||||||
newApp.OnClose = pa.OnClose
|
newApp.OnClose = pa.OnClose
|
||||||
newApp.OnConnecting = pa.OnConnecting
|
newApp.OnConnecting = pa.OnConnecting
|
||||||
|
newApp.version.Store(Version1)
|
||||||
return newApp
|
return newApp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -59,6 +66,21 @@ func (pa *PeerApp) Init(connection tapir.Connection) {
|
||||||
pa.connection.Close()
|
pa.connection.Close()
|
||||||
pa.OnClose(connection.Hostname())
|
pa.OnClose(connection.Hostname())
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
|
// we are authenticated
|
||||||
|
// attempt to negotiate a more efficient packet format...
|
||||||
|
// we are abusing the context here slightly by sending a "malformed" GetVal request.
|
||||||
|
// as a rule cwtch ignores getval requests that it cannot deserialize so older clients will ignore this
|
||||||
|
// message.
|
||||||
|
// version *must* be the first message sent to prevent race conditions for other events fired after-auth
|
||||||
|
// (e.g. getVal requests)
|
||||||
|
// as such, we send this message before we update the rest of the system
|
||||||
|
pa.SendMessage(model2.PeerMessage{
|
||||||
|
ID: event.ContextVersion,
|
||||||
|
Context: event.ContextGetVal,
|
||||||
|
Data: []byte{Version2},
|
||||||
|
})
|
||||||
|
|
||||||
pa.OnAuth(connection.Hostname())
|
pa.OnAuth(connection.Hostname())
|
||||||
go pa.listen()
|
go pa.listen()
|
||||||
}
|
}
|
||||||
|
@ -76,11 +98,39 @@ func (pa *PeerApp) listen() {
|
||||||
pa.OnClose(pa.connection.Hostname())
|
pa.OnClose(pa.connection.Hostname())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
var peerMessage model2.PeerMessage
|
|
||||||
err := json.Unmarshal(message, &peerMessage)
|
var packet model2.PeerMessage
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if pa.version.Load() == Version1 {
|
||||||
|
err = json.Unmarshal(message, &packet)
|
||||||
|
} else if pa.version.Load() == Version2 {
|
||||||
|
parsePacket, parseErr := model2.ParsePeerMessage(message)
|
||||||
|
// if all else fails...attempt to process this message as a version 1 message
|
||||||
|
if parseErr != nil {
|
||||||
|
err = json.Unmarshal(message, &packet)
|
||||||
|
} else {
|
||||||
|
packet = *parsePacket
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
log.Errorf("invalid version")
|
||||||
|
pa.OnClose(pa.connection.Hostname())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if pa.IsAllowed(pa.connection.Hostname()) {
|
if pa.IsAllowed(pa.connection.Hostname()) {
|
||||||
pa.MessageHandler(pa.connection.Hostname(), peerMessage.ID, peerMessage.Context, peerMessage.Data)
|
// we don't expose im.cwtch.version messages outside of PeerApp (ideally at some point in the future we
|
||||||
|
// can remove this check all together)
|
||||||
|
if packet.ID == event.ContextVersion {
|
||||||
|
if pa.version.Load() == Version1 && len(packet.Data) == 1 && packet.Data[0] == Version2 {
|
||||||
|
log.Debugf("switching to protocol version 2")
|
||||||
|
pa.version.Store(Version2)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pa.MessageHandler(pa.connection.Hostname(), packet.ID, packet.Context, []byte(packet.Data))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Errorf("Error unmarshalling PeerMessage package: %x %v", message, err)
|
log.Errorf("Error unmarshalling PeerMessage package: %x %v", message, err)
|
||||||
|
@ -91,7 +141,17 @@ func (pa *PeerApp) listen() {
|
||||||
// SendMessage sends the peer a preformatted message
|
// SendMessage sends the peer a preformatted message
|
||||||
// NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol
|
// NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol
|
||||||
func (pa *PeerApp) SendMessage(message model2.PeerMessage) error {
|
func (pa *PeerApp) SendMessage(message model2.PeerMessage) error {
|
||||||
serialized, err := json.Marshal(message)
|
|
||||||
|
var serialized []byte
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if pa.version.Load() == Version2 {
|
||||||
|
// treat data as a pre-serialized string, not as a byte array (which will be base64 encoded and bloat the packet size)
|
||||||
|
serialized = message.Serialize()
|
||||||
|
} else {
|
||||||
|
serialized, err = json.Marshal(message)
|
||||||
|
}
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return pa.connection.Send(serialized)
|
return pa.connection.Send(serialized)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,53 @@
|
||||||
package model
|
package model
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"errors"
|
||||||
|
)
|
||||||
|
|
||||||
// PeerMessage is an encapsulation that can be used by higher level applications
|
// PeerMessage is an encapsulation that can be used by higher level applications
|
||||||
type PeerMessage struct {
|
type PeerMessage struct {
|
||||||
|
// ID **must** only contain alphanumeric characters separated by period.
|
||||||
ID string // A unique Message ID (primarily used for acknowledgments)
|
ID string // A unique Message ID (primarily used for acknowledgments)
|
||||||
|
|
||||||
|
// Context **must** only contain alphanumeric characters separated by period.
|
||||||
Context string // A unique context identifier i.e. im.cwtch.chat
|
Context string // A unique context identifier i.e. im.cwtch.chat
|
||||||
Data []byte // The serialized data packet.
|
|
||||||
|
// Data can contain anything
|
||||||
|
Data []byte // A data packet.
|
||||||
|
}
|
||||||
|
|
||||||
|
// Serialize constructs an efficient serialized representation
|
||||||
|
// Format: [ID String] | [Context String] | Binary Data
|
||||||
|
func (m *PeerMessage) Serialize() []byte {
|
||||||
|
return append(append([]byte(m.ID+"|"), []byte(m.Context+"|")...), m.Data...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ParsePeerMessage returns either a deserialized PeerMessage or an error if it is malformed
|
||||||
|
func ParsePeerMessage(message []byte) (*PeerMessage, error) {
|
||||||
|
|
||||||
|
// find the identifier prefix
|
||||||
|
idTerminator := bytes.IndexByte(message, '|')
|
||||||
|
if idTerminator != -1 && idTerminator+1 < len(message) {
|
||||||
|
// find the context terminator prefix
|
||||||
|
contextbegin := idTerminator + 1
|
||||||
|
contextTerminator := bytes.IndexByte(message[contextbegin:], '|')
|
||||||
|
if contextTerminator != -1 {
|
||||||
|
|
||||||
|
// check that we have data
|
||||||
|
dataBegin := contextbegin + contextTerminator + 1
|
||||||
|
var data []byte
|
||||||
|
if dataBegin < len(message) {
|
||||||
|
data = message[dataBegin:]
|
||||||
|
}
|
||||||
|
|
||||||
|
// compile the message
|
||||||
|
return &PeerMessage{
|
||||||
|
ID: string(message[0:idTerminator]),
|
||||||
|
Context: string(message[contextbegin : contextbegin+contextTerminator]),
|
||||||
|
Data: data,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, errors.New("invalid message")
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,7 +58,7 @@ func TestFileSharing(t *testing.T) {
|
||||||
os.RemoveAll("cwtch.out.png")
|
os.RemoveAll("cwtch.out.png")
|
||||||
os.RemoveAll("cwtch.out.png.manifest")
|
os.RemoveAll("cwtch.out.png.manifest")
|
||||||
|
|
||||||
log.SetLevel(log.LevelDebug)
|
log.SetLevel(log.LevelInfo)
|
||||||
|
|
||||||
os.Mkdir("tordir", 0700)
|
os.Mkdir("tordir", 0700)
|
||||||
dataDir := path.Join("tordir", "tor")
|
dataDir := path.Join("tordir", "tor")
|
||||||
|
|
Loading…
Reference in New Issue