cwtch/protocol/connections/peerapp.go

156 lines
4.4 KiB
Go
Raw Normal View History

2019-07-17 19:10:52 +00:00
package connections
import (
"cwtch.im/cwtch/event"
model2 "cwtch.im/cwtch/protocol/model"
"encoding/json"
2021-04-09 01:22:08 +00:00
"git.openprivacy.ca/cwtch.im/tapir"
"git.openprivacy.ca/cwtch.im/tapir/applications"
"git.openprivacy.ca/openprivacy/log"
"sync/atomic"
2019-07-17 19:10:52 +00:00
)
// cwtchCapability is the capability set on a connection by PeerApp.Init once
// the connection has completed authentication.
const cwtchCapability tapir.Capability = "cwtchCapability"
2019-07-17 19:10:52 +00:00
// PeerApp encapsulates the behaviour of a Cwtch Peer
type PeerApp struct {
applications.AuthApp
2019-08-08 18:39:38 +00:00
connection tapir.Connection
MessageHandler func(string, string, string, []byte)
IsBlocked func(string) bool
IsAllowed func(string) bool
2019-07-29 20:21:58 +00:00
OnAcknowledgement func(string, string)
OnAuth func(string)
OnClose func(string)
2019-07-23 17:57:04 +00:00
OnConnecting func(string)
version atomic.Value
2019-07-17 19:10:52 +00:00
}
// peerGetVal identifies a value by scope and path.
// NOTE(review): presumably the payload of a get-value request; it is not
// referenced elsewhere in this file - confirm against callers.
type peerGetVal struct {
	Scope string
	Path  string
}
// peerRetVal carries a value together with a flag recording whether it exists.
// NOTE(review): presumably the response to a peerGetVal request; it is not
// referenced elsewhere in this file - confirm against callers.
type peerRetVal struct {
	Val    string
	Exists bool
}
2019-07-17 19:10:52 +00:00
// NewInstance should always return a new instantiation of the application.
func (pa *PeerApp) NewInstance() tapir.Application {
2019-07-17 19:10:52 +00:00
newApp := new(PeerApp)
newApp.MessageHandler = pa.MessageHandler
newApp.IsBlocked = pa.IsBlocked
newApp.IsAllowed = pa.IsAllowed
2019-07-23 17:57:04 +00:00
newApp.OnAcknowledgement = pa.OnAcknowledgement
2019-07-17 19:10:52 +00:00
newApp.OnAuth = pa.OnAuth
newApp.OnClose = pa.OnClose
2019-07-23 17:57:04 +00:00
newApp.OnConnecting = pa.OnConnecting
newApp.version.Store(0x01)
2019-07-17 19:10:52 +00:00
return newApp
}
// Init is run when the connection is first started.
2019-08-08 18:39:38 +00:00
func (pa *PeerApp) Init(connection tapir.Connection) {
2019-07-17 19:10:52 +00:00
// First run the Authentication App
pa.AuthApp.Init(connection)
if connection.HasCapability(applications.AuthCapability) {
2019-07-17 19:10:52 +00:00
pa.connection = connection
connection.SetCapability(cwtchCapability)
2019-08-08 18:39:38 +00:00
if pa.IsBlocked(connection.Hostname()) {
pa.connection.Close()
2019-08-08 18:39:38 +00:00
pa.OnClose(connection.Hostname())
} else {
// we are authenticated
// attempt to negotiate a more efficient packet format...
// we are abusing the context here slightly by sending a "malformed" GetVal request.
// as a rule cwtch ignores getval requests that it cannot deserialize so older clients will ignore this
// message.
// version *must* be the first message sent to prevent race conditions for aut events fired on auth
// we send the message before we update the rest of the system
pa.SendMessage(model2.PeerMessage{
ID: event.ContextVersion,
Context: event.ContextGetVal,
Data: []byte{0x02},
})
2019-08-08 18:39:38 +00:00
pa.OnAuth(connection.Hostname())
go pa.listen()
}
2019-07-17 19:10:52 +00:00
} else {
// The auth protocol wasn't completed, we can safely shutdown the connection
connection.Close()
2019-07-17 19:10:52 +00:00
}
}
func (pa *PeerApp) listen() {
2019-07-17 19:10:52 +00:00
for {
message := pa.connection.Expect()
if len(message) == 0 {
2020-07-14 00:46:05 +00:00
log.Debugf("0 byte read, socket has likely failed. Closing the listen goroutine")
2019-08-08 18:39:38 +00:00
pa.OnClose(pa.connection.Hostname())
2019-07-17 19:10:52 +00:00
return
}
var packet model2.PeerMessage
var err error
if pa.version.Load() == 0x01 {
err = json.Unmarshal(message, &packet)
} else if pa.version.Load() == 0x02 {
parsePacket, parseErr := model2.ParsePeerMessage(message)
// if all else fails...attempt to process this message as a version 1 message
if parseErr != nil {
err = json.Unmarshal(message, &packet)
} else {
packet = *parsePacket
}
} else {
log.Errorf("invalid version")
pa.OnClose(pa.connection.Hostname())
return
}
if err == nil {
if pa.IsAllowed(pa.connection.Hostname()) {
// we don't expose im.cwtch.version messages outside of PeerApp (ideally at some point in the future we
// can remove this check all together)
if packet.ID == event.ContextVersion {
if pa.version.Load() == 0x01 && len(packet.Data) == 1 && packet.Data[0] == 0x02 {
log.Debugf("switching to 2")
pa.version.Store(0x02)
}
} else {
pa.MessageHandler(pa.connection.Hostname(), packet.ID, packet.Context, []byte(packet.Data))
}
}
} else {
log.Errorf("Error unmarshalling PeerMessage package: %x %v", message, err)
}
2019-07-17 19:10:52 +00:00
}
}
// SendMessage sends the peer a preformatted message
// NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol
func (pa *PeerApp) SendMessage(message model2.PeerMessage) error {
var serialized []byte
var err error
if pa.version.Load() == 0x02 {
// treat data as a pre-serialized string, not as a byte array (which will be base64 encoded and bloat the packet size)
serialized = message.Serialize()
} else {
serialized, err = json.Marshal(message)
}
if err == nil {
return pa.connection.Send(serialized)
}
return err
2019-07-17 19:10:52 +00:00
}