234 lines
6.9 KiB

package tapir
import (
// ServiceMetrics outlines higher-level information about the service, e.g.
// counts of connections.
type ServiceMetrics struct {
	// ConnectionCount is the number of connections reported by the service.
	ConnectionCount int
}
// Service defines the interface for a Tapir Service
type Service interface {
Init(acn connectivity.ACN, privateKey ed25519.PrivateKey, identity *primitives.Identity)
Connect(hostname string, application Application) (bool, error)
Listen(application Application) error
GetConnection(connectionID string) (Connection, error)
Metrics() ServiceMetrics
Broadcast(message []byte, capability Capability) error
WaitForCapabilityOrClose(connectionID string, capability Capability) (Connection, error)
// Connection Interface
type Connection interface {
Hostname() string
IsOutbound() bool
ID() *primitives.Identity
Expect() []byte
SetHostname(hostname string)
HasCapability(name Capability) bool
SetCapability(name Capability)
SetEncryptionKey(key [32]byte)
Send(message []byte) error
App() Application
SetApp(application Application)
IsClosed() bool
Broadcast(message []byte, capability Capability) error
// Connection defines a Tapir Connection
type connection struct {
hostname string
conn io.ReadWriteCloser
capabilities sync.Map
encrypted bool
key [32]byte
app Application
identity *primitives.Identity
outbound bool
closed bool
MaxLength int
lock sync.Mutex
service Service
expectBuffer []byte
// NewConnection creates a new Connection
func NewConnection(service Service, id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection {
connection := new(connection)
connection.hostname = hostname
connection.conn = conn
connection.app = app
connection.identity = id
connection.outbound = outbound
connection.MaxLength = 8192
connection.service = service
connection.expectBuffer = make([]byte, 8192)
go connection.app.Init(connection)
return connection
// ID returns an identity.Identity encapsulation (for the purposes of cryptographic protocols)
func (c *connection) ID() *primitives.Identity {
return c.identity
// App returns the overarching application using this Connection.
func (c *connection) App() Application {
defer c.lock.Unlock()
return c.app
// App returns the overarching application using this Connection.
func (c *connection) SetApp(application Application) {
defer c.lock.Unlock()
c.app = application
// Hostname returns the hostname of the connection (if the connection has not been authorized it will return the
// temporary hostname identifier)
func (c *connection) Hostname() string {
defer c.lock.Unlock()
return c.hostname
// IsOutbound returns true if this caller was the originator of the connection (i.e. the connection was started
// by calling Connect() rather than Accept()
func (c *connection) IsOutbound() bool {
return c.outbound
// IsClosed returns true if the connection is closed (connections cannot be reopened)
func (c *connection) IsClosed() bool {
defer c.lock.Unlock()
return c.closed
// SetHostname sets the hostname on the connection
func (c *connection) SetHostname(hostname string) {
defer c.lock.Unlock()
log.Debugf("[%v -- %v] Asserting Remote Hostname: %v", c.identity.Hostname(), c.hostname, hostname)
c.hostname = hostname
// SetCapability sets a capability on the connection
func (c *connection) SetCapability(name Capability) {
log.Debugf("[%v -- %v] Setting Capability %v", c.identity.Hostname(), c.hostname, name)
c.capabilities.Store(name, true)
// HasCapability checks if the connection has a given capability
func (c *connection) HasCapability(name Capability) bool {
_, ok := c.capabilities.Load(name)
return ok
// Close forcibly closes the connection
func (c *connection) Close() {
defer c.lock.Unlock()
func (c *connection) closeInner() {
c.closed = true
// Expect blocks and reads a single Tapir packet , from the connection.
func (c *connection) Expect() []byte {
// Multiple goroutines may invoke methods on a Conn simultaneously.
// As such we don't need to mutex around closed.
n, err := io.ReadFull(c.conn, c.expectBuffer)
if n != c.MaxLength || err != nil {
log.Debugf("[%v -> %v] Wire Error Reading, Read %d bytes, Error: %v", c.hostname, c.identity.Hostname(), n, err)
c.Close() // use the full close function which acquires a lock for the connection state...
return []byte{}
defer c.lock.Unlock()
if c.encrypted {
var decryptNonce [24]byte
copy(decryptNonce[:], c.expectBuffer[:24])
decrypted, ok := secretbox.Open(nil, c.expectBuffer[24:], &decryptNonce, &c.key)
if ok {
copy(c.expectBuffer, decrypted)
} else {
log.Errorf("[%v -> %v] Error Decrypting Message On Wire", c.hostname, c.identity.Hostname())
return []byte{}
length, _ := binary.Uvarint(c.expectBuffer[0:2])
if length+2 >= uint64(c.MaxLength) {
return []byte{}
//cplog.Debugf("[%v -> %v] Wire Receive: (%d) %x", c.hostname, c.ID.Hostname(), len, buffer)
return c.expectBuffer[2 : length+2]
// SetEncryptionKey turns on application-level encryption on the connection using the given key.
func (c *connection) SetEncryptionKey(key [32]byte) {
defer c.lock.Unlock()
c.key = key
c.encrypted = true
// Send writes a given message to a Tapir packet (of 1024 bytes in length).
func (c *connection) Send(message []byte) error {
// We can only encode messages up to maxLength
if len(message) >= c.MaxLength {
log.Errorf("attempting to send a message that is too big")
return errors.New("message too long")
buffer := make([]byte, c.MaxLength)
binary.PutUvarint(buffer[0:2], uint64(len(message)))
copy(buffer[2:], message)
defer c.lock.Unlock()
if c.encrypted {
var nonce [24]byte
if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil {
log.Errorf("Could not read sufficient randomness %v. Closing connection", err)
return errors.New("could not read random")
// MaxLength - 40 = MaxLength - 24 nonce bytes and 16 auth tag.
encrypted := secretbox.Seal(nonce[:], buffer[0:c.MaxLength-40], &nonce, &c.key)
copy(buffer, encrypted[0:c.MaxLength])
log.Debugf("[%v -> %v] Wire Send %x", c.identity.Hostname(), c.hostname, buffer)
_, err := c.conn.Write(buffer)
if err != nil {
return err
// Broadcast sends a message to all active service connections with a given capability
func (c *connection) Broadcast(message []byte, capability Capability) error {
return c.service.Broadcast(message, capability)