2019-05-15 19:45:45 +00:00
|
|
|
package tapir
|
|
|
|
|
|
|
|
import (
|
|
|
|
"crypto/rand"
|
|
|
|
"encoding/binary"
|
2022-01-24 20:28:55 +00:00
|
|
|
"errors"
|
2021-04-09 00:55:17 +00:00
|
|
|
"git.openprivacy.ca/cwtch.im/tapir/primitives"
|
2020-02-06 23:54:13 +00:00
|
|
|
"git.openprivacy.ca/openprivacy/connectivity"
|
|
|
|
"git.openprivacy.ca/openprivacy/log"
|
2019-05-15 19:45:45 +00:00
|
|
|
"golang.org/x/crypto/ed25519"
|
|
|
|
"golang.org/x/crypto/nacl/secretbox"
|
|
|
|
"io"
|
|
|
|
"sync"
|
|
|
|
)
|
|
|
|
|
2020-07-14 23:35:21 +00:00
|
|
|
// ServiceMetrics carries higher-level information about a service, e.g.
// counts of connections.
type ServiceMetrics struct {
	// ConnectionCount is the connection count reported by the service.
	ConnectionCount int
}
|
|
|
|
|
2019-05-21 18:28:10 +00:00
|
|
|
// Service defines the interface for a Tapir Service
|
2019-05-15 19:45:45 +00:00
|
|
|
type Service interface {
|
2019-08-08 18:11:31 +00:00
|
|
|
Init(acn connectivity.ACN, privateKey ed25519.PrivateKey, identity *primitives.Identity)
|
2019-07-30 23:43:07 +00:00
|
|
|
Connect(hostname string, application Application) (bool, error)
|
2019-05-15 19:45:45 +00:00
|
|
|
Listen(application Application) error
|
2019-08-07 20:08:02 +00:00
|
|
|
GetConnection(connectionID string) (Connection, error)
|
2020-07-14 23:35:21 +00:00
|
|
|
Metrics() ServiceMetrics
|
2020-07-14 21:23:27 +00:00
|
|
|
Broadcast(message []byte, capability Capability) error
|
2019-09-15 21:20:05 +00:00
|
|
|
WaitForCapabilityOrClose(connectionID string, capability Capability) (Connection, error)
|
2019-07-17 18:44:08 +00:00
|
|
|
Shutdown()
|
2019-05-15 19:45:45 +00:00
|
|
|
}
|
|
|
|
|
2019-08-07 20:08:02 +00:00
|
|
|
// Connection Interface
|
|
|
|
type Connection interface {
|
|
|
|
Hostname() string
|
|
|
|
IsOutbound() bool
|
2019-08-08 18:11:31 +00:00
|
|
|
ID() *primitives.Identity
|
2019-08-07 20:08:02 +00:00
|
|
|
Expect() []byte
|
|
|
|
SetHostname(hostname string)
|
2019-09-15 21:20:05 +00:00
|
|
|
HasCapability(name Capability) bool
|
|
|
|
SetCapability(name Capability)
|
2019-08-07 20:08:02 +00:00
|
|
|
SetEncryptionKey(key [32]byte)
|
2022-01-24 20:28:55 +00:00
|
|
|
Send(message []byte) error
|
2019-08-07 20:08:02 +00:00
|
|
|
Close()
|
2019-08-08 18:28:55 +00:00
|
|
|
App() Application
|
2019-09-15 21:20:05 +00:00
|
|
|
SetApp(application Application)
|
2019-08-07 20:08:02 +00:00
|
|
|
IsClosed() bool
|
2020-07-14 21:23:27 +00:00
|
|
|
Broadcast(message []byte, capability Capability) error
|
2019-08-07 20:08:02 +00:00
|
|
|
}
|
|
|
|
|
2019-05-21 18:28:10 +00:00
|
|
|
// Connection defines a Tapir Connection
|
2019-08-07 20:08:02 +00:00
|
|
|
type connection struct {
|
|
|
|
hostname string
|
2019-09-15 21:20:05 +00:00
|
|
|
conn io.ReadWriteCloser
|
2019-05-15 19:45:45 +00:00
|
|
|
capabilities sync.Map
|
|
|
|
encrypted bool
|
|
|
|
key [32]byte
|
2019-08-08 18:28:55 +00:00
|
|
|
app Application
|
2019-08-08 18:11:31 +00:00
|
|
|
identity *primitives.Identity
|
2019-08-07 20:08:02 +00:00
|
|
|
outbound bool
|
|
|
|
closed bool
|
2019-07-16 20:22:30 +00:00
|
|
|
MaxLength int
|
2019-11-26 21:10:09 +00:00
|
|
|
lock sync.Mutex
|
2020-07-14 21:23:27 +00:00
|
|
|
service Service
|
2019-05-15 19:45:45 +00:00
|
|
|
}
|
|
|
|
|
2019-05-21 18:28:10 +00:00
|
|
|
// NewConnection creates a new Connection
|
2020-07-14 21:59:08 +00:00
|
|
|
func NewConnection(service Service, id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection {
|
2019-08-07 20:08:02 +00:00
|
|
|
connection := new(connection)
|
|
|
|
connection.hostname = hostname
|
2019-05-15 19:45:45 +00:00
|
|
|
connection.conn = conn
|
2019-08-08 18:28:55 +00:00
|
|
|
connection.app = app
|
2019-08-08 18:11:31 +00:00
|
|
|
connection.identity = id
|
2019-08-07 20:08:02 +00:00
|
|
|
connection.outbound = outbound
|
2020-07-14 21:23:27 +00:00
|
|
|
connection.MaxLength = 8192
|
|
|
|
connection.service = service
|
2021-04-09 01:44:45 +00:00
|
|
|
|
2019-08-08 18:28:55 +00:00
|
|
|
go connection.app.Init(connection)
|
2019-05-15 19:45:45 +00:00
|
|
|
return connection
|
|
|
|
}
|
|
|
|
|
2019-08-07 20:08:02 +00:00
|
|
|
// ID returns an identity.Identity encapsulation (for the purposes of cryptographic protocols)
|
2019-08-08 18:11:31 +00:00
|
|
|
func (c *connection) ID() *primitives.Identity {
|
2019-08-07 20:08:02 +00:00
|
|
|
return c.identity
|
|
|
|
}
|
|
|
|
|
2019-08-08 18:28:55 +00:00
|
|
|
// App returns the overarching application using this Connection.
|
|
|
|
func (c *connection) App() Application {
|
2019-11-26 21:10:09 +00:00
|
|
|
c.lock.Lock()
|
|
|
|
defer c.lock.Unlock()
|
2019-08-08 18:28:55 +00:00
|
|
|
return c.app
|
|
|
|
}
|
|
|
|
|
2019-09-15 21:20:05 +00:00
|
|
|
// App returns the overarching application using this Connection.
|
|
|
|
func (c *connection) SetApp(application Application) {
|
2019-11-26 21:10:09 +00:00
|
|
|
c.lock.Lock()
|
|
|
|
defer c.lock.Unlock()
|
2019-09-15 21:20:05 +00:00
|
|
|
c.app = application
|
|
|
|
}
|
|
|
|
|
2019-08-07 20:08:02 +00:00
|
|
|
// Hostname returns the hostname of the connection (if the connection has not been authorized it will return the
|
|
|
|
// temporary hostname identifier)
|
|
|
|
func (c *connection) Hostname() string {
|
2019-11-26 21:10:09 +00:00
|
|
|
c.lock.Lock()
|
|
|
|
defer c.lock.Unlock()
|
2019-08-07 20:08:02 +00:00
|
|
|
return c.hostname
|
|
|
|
}
|
|
|
|
|
|
|
|
// IsOutbound returns true if this caller was the originator of the connection (i.e. the connection was started
|
|
|
|
// by calling Connect() rather than Accept()
|
|
|
|
func (c *connection) IsOutbound() bool {
|
|
|
|
return c.outbound
|
|
|
|
}
|
|
|
|
|
|
|
|
// IsClosed returns true if the connection is closed (connections cannot be reopened)
|
|
|
|
func (c *connection) IsClosed() bool {
|
2019-11-26 21:10:09 +00:00
|
|
|
c.lock.Lock()
|
|
|
|
defer c.lock.Unlock()
|
2019-08-07 20:08:02 +00:00
|
|
|
return c.closed
|
|
|
|
}
|
|
|
|
|
2019-05-21 18:28:10 +00:00
|
|
|
// SetHostname sets the hostname on the connection
|
2019-08-07 20:08:02 +00:00
|
|
|
func (c *connection) SetHostname(hostname string) {
|
2019-11-26 21:10:09 +00:00
|
|
|
c.lock.Lock()
|
|
|
|
defer c.lock.Unlock()
|
2019-08-07 20:08:02 +00:00
|
|
|
log.Debugf("[%v -- %v] Asserting Remote Hostname: %v", c.identity.Hostname(), c.hostname, hostname)
|
|
|
|
c.hostname = hostname
|
2019-05-15 19:45:45 +00:00
|
|
|
}
|
|
|
|
|
2019-05-21 18:28:10 +00:00
|
|
|
// SetCapability sets a capability on the connection
|
2019-09-15 21:20:05 +00:00
|
|
|
func (c *connection) SetCapability(name Capability) {
|
2019-08-07 20:08:02 +00:00
|
|
|
log.Debugf("[%v -- %v] Setting Capability %v", c.identity.Hostname(), c.hostname, name)
|
2019-05-15 19:45:45 +00:00
|
|
|
c.capabilities.Store(name, true)
|
|
|
|
}
|
|
|
|
|
2019-05-21 18:28:10 +00:00
|
|
|
// HasCapability checks if the connection has a given capability
|
2019-09-15 21:20:05 +00:00
|
|
|
func (c *connection) HasCapability(name Capability) bool {
|
2019-05-15 19:45:45 +00:00
|
|
|
_, ok := c.capabilities.Load(name)
|
|
|
|
return ok
|
|
|
|
}
|
|
|
|
|
2019-06-05 19:02:03 +00:00
|
|
|
// Close forcibly closes the connection
|
2019-08-07 20:08:02 +00:00
|
|
|
func (c *connection) Close() {
|
2019-11-26 21:10:09 +00:00
|
|
|
c.lock.Lock()
|
|
|
|
defer c.lock.Unlock()
|
2021-04-09 01:44:45 +00:00
|
|
|
c.closeInner()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *connection) closeInner() {
|
2019-08-12 20:04:39 +00:00
|
|
|
c.closed = true
|
2019-06-05 19:02:03 +00:00
|
|
|
c.conn.Close()
|
|
|
|
}
|
2019-06-05 18:44:26 +00:00
|
|
|
|
2019-07-16 19:14:42 +00:00
|
|
|
// Expect blocks and reads a single Tapir packet , from the connection.
|
2019-08-07 20:08:02 +00:00
|
|
|
func (c *connection) Expect() []byte {
|
2019-07-16 19:14:42 +00:00
|
|
|
buffer := make([]byte, c.MaxLength)
|
2021-04-09 01:44:45 +00:00
|
|
|
// Multiple goroutines may invoke methods on a Conn simultaneously.
|
|
|
|
// As such we don't need to mutex around closed.
|
2019-05-21 18:28:10 +00:00
|
|
|
n, err := io.ReadFull(c.conn, buffer)
|
|
|
|
|
2019-07-16 19:14:42 +00:00
|
|
|
if n != c.MaxLength || err != nil {
|
2020-07-07 18:38:33 +00:00
|
|
|
log.Debugf("[%v -> %v] Wire Error Reading, Read %d bytes, Error: %v", c.hostname, c.identity.Hostname(), n, err)
|
2021-04-09 01:44:45 +00:00
|
|
|
c.Close() // use the full close function which acquires a lock for the connection state...
|
2019-05-21 18:28:10 +00:00
|
|
|
return []byte{}
|
|
|
|
}
|
2019-11-26 21:10:09 +00:00
|
|
|
c.lock.Lock()
|
|
|
|
defer c.lock.Unlock()
|
2019-05-15 19:45:45 +00:00
|
|
|
if c.encrypted {
|
|
|
|
var decryptNonce [24]byte
|
|
|
|
copy(decryptNonce[:], buffer[:24])
|
|
|
|
decrypted, ok := secretbox.Open(nil, buffer[24:], &decryptNonce, &c.key)
|
|
|
|
if ok {
|
|
|
|
copy(buffer, decrypted)
|
|
|
|
} else {
|
2019-08-07 20:08:02 +00:00
|
|
|
log.Errorf("[%v -> %v] Error Decrypting Message On Wire", c.hostname, c.identity.Hostname())
|
2021-04-09 01:44:45 +00:00
|
|
|
c.closeInner()
|
2019-05-15 19:45:45 +00:00
|
|
|
return []byte{}
|
|
|
|
}
|
|
|
|
}
|
2019-09-15 21:20:05 +00:00
|
|
|
length, _ := binary.Uvarint(buffer[0:2])
|
|
|
|
if length+2 >= uint64(c.MaxLength) {
|
|
|
|
return []byte{}
|
|
|
|
}
|
2019-07-30 23:43:07 +00:00
|
|
|
//cplog.Debugf("[%v -> %v] Wire Receive: (%d) %x", c.hostname, c.ID.Hostname(), len, buffer)
|
2019-09-15 21:20:05 +00:00
|
|
|
return buffer[2 : length+2]
|
2019-05-15 19:45:45 +00:00
|
|
|
}
|
|
|
|
|
2019-05-21 18:28:10 +00:00
|
|
|
// SetEncryptionKey turns on application-level encryption on the connection using the given key.
|
2019-08-07 20:08:02 +00:00
|
|
|
func (c *connection) SetEncryptionKey(key [32]byte) {
|
2019-11-26 21:10:09 +00:00
|
|
|
c.lock.Lock()
|
|
|
|
defer c.lock.Unlock()
|
2019-05-15 19:45:45 +00:00
|
|
|
c.key = key
|
|
|
|
c.encrypted = true
|
|
|
|
}
|
|
|
|
|
2019-05-21 18:28:10 +00:00
|
|
|
// Send writes a given message to a Tapir packet (of 1024 bytes in length).
|
2022-01-24 20:28:55 +00:00
|
|
|
func (c *connection) Send(message []byte) error {
|
|
|
|
|
|
|
|
// We can only encode messages up to maxLength
|
|
|
|
if len(message) >= c.MaxLength {
|
|
|
|
log.Errorf("attempting to send a message that is too big")
|
|
|
|
return errors.New("message too long")
|
|
|
|
}
|
2019-05-15 19:45:45 +00:00
|
|
|
|
2019-07-16 19:14:42 +00:00
|
|
|
buffer := make([]byte, c.MaxLength)
|
2019-05-15 19:45:45 +00:00
|
|
|
binary.PutUvarint(buffer[0:2], uint64(len(message)))
|
|
|
|
copy(buffer[2:], message)
|
|
|
|
|
2019-11-26 21:10:09 +00:00
|
|
|
c.lock.Lock()
|
|
|
|
defer c.lock.Unlock()
|
2019-05-15 19:45:45 +00:00
|
|
|
if c.encrypted {
|
|
|
|
var nonce [24]byte
|
|
|
|
if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil {
|
2019-08-12 20:04:39 +00:00
|
|
|
log.Errorf("Could not read sufficient randomness %v. Closing connection", err)
|
2021-04-09 01:44:45 +00:00
|
|
|
c.closeInner()
|
2022-01-24 20:28:55 +00:00
|
|
|
return errors.New("could not read random")
|
2019-05-15 19:45:45 +00:00
|
|
|
}
|
|
|
|
// MaxLength - 40 = MaxLength - 24 nonce bytes and 16 auth tag.
|
2019-07-16 19:14:42 +00:00
|
|
|
encrypted := secretbox.Seal(nonce[:], buffer[0:c.MaxLength-40], &nonce, &c.key)
|
|
|
|
copy(buffer, encrypted[0:c.MaxLength])
|
2019-06-05 18:44:26 +00:00
|
|
|
}
|
2019-08-07 20:08:02 +00:00
|
|
|
log.Debugf("[%v -> %v] Wire Send %x", c.identity.Hostname(), c.hostname, buffer)
|
2019-06-05 18:44:26 +00:00
|
|
|
_, err := c.conn.Write(buffer)
|
|
|
|
if err != nil {
|
2021-04-09 01:44:45 +00:00
|
|
|
c.closeInner()
|
2019-05-15 19:45:45 +00:00
|
|
|
}
|
2022-01-24 20:28:55 +00:00
|
|
|
return err
|
2019-05-15 19:45:45 +00:00
|
|
|
}
|
2020-07-14 21:23:27 +00:00
|
|
|
|
|
|
|
// Broadcast sends a message to all active service connections with a given capability
|
|
|
|
func (c *connection) Broadcast(message []byte, capability Capability) error {
|
|
|
|
return c.service.Broadcast(message, capability)
|
|
|
|
}
|