|
- package tapir
-
- import (
- "crypto/rand"
- "encoding/binary"
- "git.openprivacy.ca/cwtch.im/tapir/primitives"
- "git.openprivacy.ca/openprivacy/connectivity"
- "git.openprivacy.ca/openprivacy/log"
- "golang.org/x/crypto/ed25519"
- "golang.org/x/crypto/nacl/secretbox"
- "io"
- "sync"
- )
-
-
// ServiceMetrics outlines higher level information about the service e.g. counts of connections
type ServiceMetrics struct {
	// ConnectionCount is a number of connections reported by Service.Metrics.
	// NOTE(review): which connections are counted (open only, or all ever
	// seen) is decided by the Service implementation, not visible here.
	ConnectionCount int
}
-
- // Service defines the interface for a Tapir Service
- type Service interface {
- Init(acn connectivity.ACN, privateKey ed25519.PrivateKey, identity *primitives.Identity)
- Connect(hostname string, application Application) (bool, error)
- Listen(application Application) error
- GetConnection(connectionID string) (Connection, error)
- Metrics() ServiceMetrics
- Broadcast(message []byte, capability Capability) error
- WaitForCapabilityOrClose(connectionID string, capability Capability) (Connection, error)
- Shutdown()
- }
-
- // Connection Interface
- type Connection interface {
- Hostname() string
- IsOutbound() bool
- ID() *primitives.Identity
- Expect() []byte
- SetHostname(hostname string)
- HasCapability(name Capability) bool
- SetCapability(name Capability)
- SetEncryptionKey(key [32]byte)
- Send(message []byte)
- Close()
- App() Application
- SetApp(application Application)
- IsClosed() bool
- Broadcast(message []byte, capability Capability) error
- }
-
- // Connection defines a Tapir Connection
- type connection struct {
- hostname string
- conn io.ReadWriteCloser
- capabilities sync.Map
- encrypted bool
- key [32]byte
- app Application
- identity *primitives.Identity
- outbound bool
- closed bool
- MaxLength int
- lock sync.Mutex
- service Service
- }
-
- // NewConnection creates a new Connection
- func NewConnection(service Service, id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection {
- connection := new(connection)
- connection.hostname = hostname
- connection.conn = conn
- connection.app = app
- connection.identity = id
- connection.outbound = outbound
- connection.MaxLength = 8192
- connection.service = service
-
- go connection.app.Init(connection)
- return connection
- }
-
// ID returns an identity.Identity encapsulation (for the purposes of cryptographic protocols)
//
// The pointer is the one handed to NewConnection; it is returned without
// taking c.lock, and no method in this file reassigns it.
func (c *connection) ID() *primitives.Identity {
	return c.identity
}
-
- // App returns the overarching application using this Connection.
- func (c *connection) App() Application {
- c.lock.Lock()
- defer c.lock.Unlock()
- return c.app
- }
-
- // App returns the overarching application using this Connection.
- func (c *connection) SetApp(application Application) {
- c.lock.Lock()
- defer c.lock.Unlock()
- c.app = application
- }
-
- // Hostname returns the hostname of the connection (if the connection has not been authorized it will return the
- // temporary hostname identifier)
- func (c *connection) Hostname() string {
- c.lock.Lock()
- defer c.lock.Unlock()
- return c.hostname
- }
-
// IsOutbound returns true if this caller was the originator of the connection (i.e. the connection was started
// by calling Connect() rather than Accept()).
//
// The value is the outbound flag passed to NewConnection; no method in this
// file changes it afterwards.
func (c *connection) IsOutbound() bool {
	return c.outbound
}
-
- // IsClosed returns true if the connection is closed (connections cannot be reopened)
- func (c *connection) IsClosed() bool {
- c.lock.Lock()
- defer c.lock.Unlock()
- return c.closed
- }
-
- // SetHostname sets the hostname on the connection
- func (c *connection) SetHostname(hostname string) {
- c.lock.Lock()
- defer c.lock.Unlock()
- log.Debugf("[%v -- %v] Asserting Remote Hostname: %v", c.identity.Hostname(), c.hostname, hostname)
- c.hostname = hostname
- }
-
- // SetCapability sets a capability on the connection
- func (c *connection) SetCapability(name Capability) {
- log.Debugf("[%v -- %v] Setting Capability %v", c.identity.Hostname(), c.hostname, name)
- c.capabilities.Store(name, true)
- }
-
- // HasCapability checks if the connection has a given capability
- func (c *connection) HasCapability(name Capability) bool {
- _, ok := c.capabilities.Load(name)
- return ok
- }
-
// Close forcibly closes the connection
//
// It takes c.lock and delegates to closeInner, which marks the connection as
// closed and closes the underlying conn. Code that already holds c.lock must
// call closeInner directly instead, since the mutex is not reentrant.
func (c *connection) Close() {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.closeInner()
}
-
- func (c *connection) closeInner() {
- c.closed = true
- c.conn.Close()
- }
-
- // Expect blocks and reads a single Tapir packet , from the connection.
- func (c *connection) Expect() []byte {
- buffer := make([]byte, c.MaxLength)
- // Multiple goroutines may invoke methods on a Conn simultaneously.
- // As such we don't need to mutex around closed.
- n, err := io.ReadFull(c.conn, buffer)
-
- if n != c.MaxLength || err != nil {
- log.Debugf("[%v -> %v] Wire Error Reading, Read %d bytes, Error: %v", c.hostname, c.identity.Hostname(), n, err)
- c.Close() // use the full close function which acquires a lock for the connection state...
- return []byte{}
- }
- c.lock.Lock()
- defer c.lock.Unlock()
- if c.encrypted {
- var decryptNonce [24]byte
- copy(decryptNonce[:], buffer[:24])
- decrypted, ok := secretbox.Open(nil, buffer[24:], &decryptNonce, &c.key)
- if ok {
- copy(buffer, decrypted)
- } else {
- log.Errorf("[%v -> %v] Error Decrypting Message On Wire", c.hostname, c.identity.Hostname())
- c.closeInner()
- return []byte{}
- }
- }
- length, _ := binary.Uvarint(buffer[0:2])
- if length+2 >= uint64(c.MaxLength) {
- return []byte{}
- }
- //cplog.Debugf("[%v -> %v] Wire Receive: (%d) %x", c.hostname, c.ID.Hostname(), len, buffer)
- return buffer[2 : length+2]
- }
-
- // SetEncryptionKey turns on application-level encryption on the connection using the given key.
- func (c *connection) SetEncryptionKey(key [32]byte) {
- c.lock.Lock()
- defer c.lock.Unlock()
- c.key = key
- c.encrypted = true
- }
-
- // Send writes a given message to a Tapir packet (of 1024 bytes in length).
- func (c *connection) Send(message []byte) {
-
- buffer := make([]byte, c.MaxLength)
- binary.PutUvarint(buffer[0:2], uint64(len(message)))
- copy(buffer[2:], message)
-
- c.lock.Lock()
- defer c.lock.Unlock()
- if c.encrypted {
- var nonce [24]byte
- if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil {
- log.Errorf("Could not read sufficient randomness %v. Closing connection", err)
- c.closeInner()
- }
- // MaxLength - 40 = MaxLength - 24 nonce bytes and 16 auth tag.
- encrypted := secretbox.Seal(nonce[:], buffer[0:c.MaxLength-40], &nonce, &c.key)
- copy(buffer, encrypted[0:c.MaxLength])
- }
- log.Debugf("[%v -> %v] Wire Send %x", c.identity.Hostname(), c.hostname, buffer)
- _, err := c.conn.Write(buffer)
- if err != nil {
- c.closeInner()
- }
- }
-
// Broadcast sends a message to all active service connections with a given capability
//
// This is a pure delegation to the Broadcast method of the Service handed to
// NewConnection; the error returned is whatever that call returns.
func (c *connection) Broadcast(message []byte, capability Capability) error {
	return c.service.Broadcast(message, capability)
}
|