Negotiate Lower Bandwidth / Higher Density Packets for Peers #428
|
@ -300,6 +300,7 @@ const (
|
|||
ContextInvite = "im.cwtch.invite"
|
||||
ContextRaw = "im.cwtch.raw"
|
||||
ContextGetVal = "im.cwtch.getVal"
|
||||
ContextVersion = "im.cwtch.version"
|
||||
ContextRetVal = "im.cwtch.retVal"
|
||||
ContextRequestManifest = "im.cwtch.file.request.manifest"
|
||||
ContextSendManifest = "im.cwtch.file.send.manifest"
|
||||
|
|
|
@ -221,6 +221,7 @@ func (e *engine) eventHandler() {
|
|||
key := ev.Data[event.FileKey]
|
||||
size, _ := strconv.Atoi(ev.Data[event.ManifestSize])
|
||||
if err := e.sendPeerMessage(handle, e.filesharingSubSystem.FetchManifest(key, uint64(size))); err != nil {
|
||||
log.Errorf("error sending manifest: %v", err)
|
||||
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
|
||||
}
|
||||
case event.ManifestSaved:
|
||||
|
|
|
@ -1,11 +1,13 @@
|
|||
package connections
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
model2 "cwtch.im/cwtch/protocol/model"
|
||||
"encoding/json"
|
||||
"git.openprivacy.ca/cwtch.im/tapir"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/applications"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
const cwtchCapability = tapir.Capability("cwtchCapability")
|
||||
|
@ -21,6 +23,7 @@ type PeerApp struct {
|
|||
OnAuth func(string)
|
||||
OnClose func(string)
|
||||
OnConnecting func(string)
|
||||
version atomic.Value
|
||||
}
|
||||
|
||||
type peerGetVal struct {
|
||||
|
@ -32,6 +35,9 @@ type peerRetVal struct {
|
|||
Exists bool
|
||||
}
|
||||
|
||||
const Version1 = 0x01
|
||||
const Version2 = 0x02
|
||||
|
||||
// NewInstance should always return a new instantiation of the application.
|
||||
func (pa *PeerApp) NewInstance() tapir.Application {
|
||||
newApp := new(PeerApp)
|
||||
|
@ -42,6 +48,7 @@ func (pa *PeerApp) NewInstance() tapir.Application {
|
|||
newApp.OnAuth = pa.OnAuth
|
||||
newApp.OnClose = pa.OnClose
|
||||
newApp.OnConnecting = pa.OnConnecting
|
||||
sarah marked this conversation as resolved
Outdated
|
||||
newApp.version.Store(Version1)
|
||||
sarah marked this conversation as resolved
Outdated
dan
commented
Store(Version1) Store(Version1)
|
||||
return newApp
|
||||
}
|
||||
|
||||
|
@ -59,6 +66,21 @@ func (pa *PeerApp) Init(connection tapir.Connection) {
|
|||
pa.connection.Close()
|
||||
pa.OnClose(connection.Hostname())
|
||||
} else {
|
||||
|
||||
// we are authenticated
|
||||
// attempt to negotiate a more efficient packet format...
|
||||
// we are abusing the context here slightly by sending a "malformed" GetVal request.
|
||||
// as a rule cwtch ignores getval requests that it cannot deserialize so older clients will ignore this
|
||||
// message.
|
||||
// version *must* be the first message sent to prevent race conditions for other events fired after-auth
|
||||
sarah marked this conversation as resolved
Outdated
dan
commented
for 'aut' events? for 'aut' events?
|
||||
// (e.g. getVal requests)
|
||||
// as such, we send this message before we update the rest of the system
|
||||
pa.SendMessage(model2.PeerMessage{
|
||||
ID: event.ContextVersion,
|
||||
sarah marked this conversation as resolved
Outdated
dan
commented
constant? constant?
|
||||
Context: event.ContextGetVal,
|
||||
sarah marked this conversation as resolved
Outdated
dan
commented
version2? version2?
|
||||
Data: []byte{Version2},
|
||||
})
|
||||
|
||||
pa.OnAuth(connection.Hostname())
|
||||
go pa.listen()
|
||||
}
|
||||
|
@ -76,11 +98,39 @@ func (pa *PeerApp) listen() {
|
|||
pa.OnClose(pa.connection.Hostname())
|
||||
return
|
||||
}
|
||||
var peerMessage model2.PeerMessage
|
||||
err := json.Unmarshal(message, &peerMessage)
|
||||
|
||||
var packet model2.PeerMessage
|
||||
var err error
|
||||
|
||||
if pa.version.Load() == Version1 {
|
||||
err = json.Unmarshal(message, &packet)
|
||||
} else if pa.version.Load() == Version2 {
|
||||
sarah marked this conversation as resolved
Outdated
dan
commented
this is a big chunk, might be best moved to a function? this is a big chunk, might be best moved to a function?
|
||||
parsePacket, parseErr := model2.ParsePeerMessage(message)
|
||||
// if all else fails...attempt to process this message as a version 1 message
|
||||
if parseErr != nil {
|
||||
err = json.Unmarshal(message, &packet)
|
||||
sarah marked this conversation as resolved
Outdated
dan
commented
I'm a little nervous mixing unspecified Ids, event IDs are strings, even tho they should be populated by int -> strings only? its not type specified anywhere so someone could make it binary or strings and either could generate a '|', contexts are strings that right now use '.' as seperators... could we use Pascal style strings here? neither id nor context should exceed 256 bytes so just have each lead with a byte of length to parse? I'm a little nervous mixing unspecified Ids, event IDs are strings, even tho they should be populated by int -> strings only? its not type specified anywhere so someone could make it binary or strings and either could generate a '|', contexts are strings that right now use '.' as seperators...
we have a lot of string manipulation based on not well documented seperators and we're mixing it with binary data.
could we use Pascal style strings here? neither id nor context should exceed 256 bytes so just have each lead with a byte of length to parse?
|
||||
} else {
|
||||
packet = *parsePacket
|
||||
}
|
||||
|
||||
} else {
|
||||
log.Errorf("invalid version")
|
||||
pa.OnClose(pa.connection.Hostname())
|
||||
return
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
if pa.IsAllowed(pa.connection.Hostname()) {
|
||||
pa.MessageHandler(pa.connection.Hostname(), peerMessage.ID, peerMessage.Context, peerMessage.Data)
|
||||
// we don't expose im.cwtch.version messages outside of PeerApp (ideally at some point in the future we
|
||||
// can remove this check all together)
|
||||
if packet.ID == event.ContextVersion {
|
||||
if pa.version.Load() == Version1 && len(packet.Data) == 1 && packet.Data[0] == Version2 {
|
||||
log.Debugf("switching to protocol version 2")
|
||||
pa.version.Store(Version2)
|
||||
}
|
||||
} else {
|
||||
pa.MessageHandler(pa.connection.Hostname(), packet.ID, packet.Context, []byte(packet.Data))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.Errorf("Error unmarshalling PeerMessage package: %x %v", message, err)
|
||||
|
@ -91,7 +141,17 @@ func (pa *PeerApp) listen() {
|
|||
// SendMessage sends the peer a preformatted message
|
||||
// NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol
|
||||
func (pa *PeerApp) SendMessage(message model2.PeerMessage) error {
|
||||
serialized, err := json.Marshal(message)
|
||||
|
||||
var serialized []byte
|
||||
var err error
|
||||
|
||||
if pa.version.Load() == Version2 {
|
||||
// treat data as a pre-serialized string, not as a byte array (which will be base64 encoded and bloat the packet size)
|
||||
serialized = message.Serialize()
|
||||
} else {
|
||||
sarah marked this conversation as resolved
Outdated
dan
commented
switching what to 2 :P switching what to 2 :P
|
||||
serialized, err = json.Marshal(message)
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
return pa.connection.Send(serialized)
|
||||
}
|
||||
|
|
|
@ -1,8 +1,53 @@
|
|||
package model
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
)
|
||||
|
||||
// PeerMessage is an encapsulation that can be used by higher level applications
|
||||
type PeerMessage struct {
|
||||
ID string // A unique Message ID (primarily used for acknowledgments)
|
||||
// ID **must** only contain alphanumeric characters separated by period.
|
||||
ID string // A unique Message ID (primarily used for acknowledgments)
|
||||
|
||||
// Context **must** only contain alphanumeric characters separated by period.
|
||||
Context string // A unique context identifier i.e. im.cwtch.chat
|
||||
Data []byte // The serialized data packet.
|
||||
|
||||
// Data can contain anything
|
||||
Data []byte // A data packet.
|
||||
}
|
||||
|
||||
// Serialize constructs an efficient serialized representation
|
||||
// Format: [ID String] | [Context String] | Binary Data
|
||||
func (m *PeerMessage) Serialize() []byte {
|
||||
return append(append([]byte(m.ID+"|"), []byte(m.Context+"|")...), m.Data...)
|
||||
}
|
||||
|
||||
// ParsePeerMessage returns either a deserialized PeerMessage or an error if it is malformed
|
||||
func ParsePeerMessage(message []byte) (*PeerMessage, error) {
|
||||
|
||||
// find the identifier prefix
|
||||
idTerminator := bytes.IndexByte(message, '|')
|
||||
if idTerminator != -1 && idTerminator+1 < len(message) {
|
||||
// find the context terminator prefix
|
||||
contextbegin := idTerminator + 1
|
||||
contextTerminator := bytes.IndexByte(message[contextbegin:], '|')
|
||||
if contextTerminator != -1 {
|
||||
|
||||
// check that we have data
|
||||
dataBegin := contextbegin + contextTerminator + 1
|
||||
var data []byte
|
||||
if dataBegin < len(message) {
|
||||
data = message[dataBegin:]
|
||||
}
|
||||
|
||||
// compile the message
|
||||
return &PeerMessage{
|
||||
ID: string(message[0:idTerminator]),
|
||||
Context: string(message[contextbegin : contextbegin+contextTerminator]),
|
||||
Data: data,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
return nil, errors.New("invalid message")
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ func TestFileSharing(t *testing.T) {
|
|||
os.RemoveAll("cwtch.out.png")
|
||||
os.RemoveAll("cwtch.out.png.manifest")
|
||||
|
||||
log.SetLevel(log.LevelDebug)
|
||||
log.SetLevel(log.LevelInfo)
|
||||
|
||||
os.Mkdir("tordir", 0700)
|
||||
dataDir := path.Join("tordir", "tor")
|
||||
|
|
Loading…
Reference in New Issue
constant?