package tapir

import (
	"crypto/rand"
	"encoding/binary"
	"errors"
	"io"
	"sync"

	"git.openprivacy.ca/cwtch.im/tapir/primitives"
	"git.openprivacy.ca/openprivacy/connectivity"
	"git.openprivacy.ca/openprivacy/log"
	"golang.org/x/crypto/ed25519"
	"golang.org/x/crypto/nacl/secretbox"
)

// ServiceMetrics outlines higher level information about the service e.g. counts of connections
type ServiceMetrics struct {
	ConnectionCount int
}

// Service defines the interface for a Tapir Service
type Service interface {
	Init(acn connectivity.ACN, privateKey ed25519.PrivateKey, identity *primitives.Identity)
	Connect(hostname string, application Application) (bool, error)
	Listen(application Application) error
	GetConnection(connectionID string) (Connection, error)
	Metrics() ServiceMetrics
	Broadcast(message []byte, capability Capability) error
	WaitForCapabilityOrClose(connectionID string, capability Capability) (Connection, error)
	Shutdown()
}

// Connection is the interface exposed to applications for a single Tapir
// connection. NewConnection returns the implementation defined in this file.
type Connection interface {
	Hostname() string
	IsOutbound() bool
	ID() *primitives.Identity
	Expect() []byte
	SetHostname(hostname string)
	HasCapability(name Capability) bool
	SetCapability(name Capability)
	SetEncryptionKey(key [32]byte)
	Send(message []byte) error
	Close()
	App() Application
	SetApp(application Application)
	IsClosed() bool
	Broadcast(message []byte, capability Capability) error
}

// connection is the concrete Connection implementation. The fields hostname,
// app, closed, encrypted and key are read and written under lock by the
// methods below.
type connection struct {
	hostname string
	conn     io.ReadWriteCloser
	// capabilities is used as a set: a key is present once SetCapability has
	// been called for it.
	capabilities sync.Map
	encrypted    bool
	key          [32]byte
	app          Application
	identity     *primitives.Identity
	outbound     bool
	closed       bool
	// MaxLength is the fixed size, in bytes, of every packet on the wire.
	MaxLength int
	lock      sync.Mutex
	service   Service
	// expectBuffer is the receive buffer reused by every call to Expect.
	expectBuffer []byte
}

// NewConnection creates a new Connection wrapping conn and starts app.Init on
// the new connection in its own goroutine before returning.
func NewConnection(service Service, id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection {
	// The receive buffer must be exactly one packet long, so both values are
	// derived from the same constant rather than repeating the literal.
	const packetLength = 8192

	c := &connection{
		hostname:     hostname,
		conn:         conn,
		app:          app,
		identity:     id,
		outbound:     outbound,
		MaxLength:    packetLength,
		service:      service,
		expectBuffer: make([]byte, packetLength),
	}
	go c.app.Init(c)
	return c
}

// ID returns an identity.Identity encapsulation (for the purposes of cryptographic protocols)
func (c *connection) ID() *primitives.Identity {
	return c.identity
}

// App returns the overarching application using this Connection.
func (c *connection) App() Application {
	c.lock.Lock()
	defer c.lock.Unlock()
	return c.app
}

// SetApp replaces the overarching application using this Connection.
func (c *connection) SetApp(application Application) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.app = application
}

// Hostname returns the hostname of the connection (if the connection has not been authorized it will return the
// temporary hostname identifier)
func (c *connection) Hostname() string {
	c.lock.Lock()
	defer c.lock.Unlock()
	return c.hostname
}

// IsOutbound returns true if this caller was the originator of the connection (i.e. the connection was started
// by calling Connect() rather than Accept())
func (c *connection) IsOutbound() bool {
	return c.outbound
}

// IsClosed returns true if the connection is closed (connections cannot be reopened)
func (c *connection) IsClosed() bool {
	c.lock.Lock()
	defer c.lock.Unlock()
	return c.closed
}

// SetHostname sets the hostname on the connection
func (c *connection) SetHostname(hostname string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	log.Debugf("[%v -- %v] Asserting Remote Hostname: %v", c.identity.Hostname(), c.hostname, hostname)
	c.hostname = hostname
}

// SetCapability sets a capability on the connection
func (c *connection) SetCapability(name Capability) {
	// Read the hostname through the locking accessor: SetHostname may be
	// updating it concurrently.
	log.Debugf("[%v -- %v] Setting Capability %v", c.identity.Hostname(), c.Hostname(), name)
	c.capabilities.Store(name, true)
}

// HasCapability checks if the connection has a given capability
func (c *connection) HasCapability(name Capability) bool {
	_, ok := c.capabilities.Load(name)
	return ok
}

// Close forcibly closes the
connection func (c *connection) Close() { c.lock.Lock() defer c.lock.Unlock() c.closeInner() } func (c *connection) closeInner() { c.closed = true c.conn.Close() } // Expect blocks and reads a single Tapir packet , from the connection. func (c *connection) Expect() []byte { // Multiple goroutines may invoke methods on a Conn simultaneously. // As such we don't need to mutex around closed. n, err := io.ReadFull(c.conn, c.expectBuffer) if n != c.MaxLength || err != nil { log.Debugf("[%v -> %v] Wire Error Reading, Read %d bytes, Error: %v", c.hostname, c.identity.Hostname(), n, err) c.Close() // use the full close function which acquires a lock for the connection state... return []byte{} } c.lock.Lock() defer c.lock.Unlock() if c.encrypted { var decryptNonce [24]byte copy(decryptNonce[:], c.expectBuffer[:24]) decrypted, ok := secretbox.Open(nil, c.expectBuffer[24:], &decryptNonce, &c.key) if ok { copy(c.expectBuffer, decrypted) } else { log.Errorf("[%v -> %v] Error Decrypting Message On Wire", c.hostname, c.identity.Hostname()) c.closeInner() return []byte{} } } length, _ := binary.Uvarint(c.expectBuffer[0:2]) if length+2 >= uint64(c.MaxLength) { return []byte{} } //cplog.Debugf("[%v -> %v] Wire Receive: (%d) %x", c.hostname, c.ID.Hostname(), len, buffer) return c.expectBuffer[2 : length+2] } // SetEncryptionKey turns on application-level encryption on the connection using the given key. func (c *connection) SetEncryptionKey(key [32]byte) { c.lock.Lock() defer c.lock.Unlock() c.key = key c.encrypted = true } // Send writes a given message to a Tapir packet (of 1024 bytes in length). 
func (c *connection) Send(message []byte) error { // We can only encode messages up to maxLength if len(message) >= c.MaxLength { log.Errorf("attempting to send a message that is too big") return errors.New("message too long") } buffer := make([]byte, c.MaxLength) binary.PutUvarint(buffer[0:2], uint64(len(message))) copy(buffer[2:], message) c.lock.Lock() defer c.lock.Unlock() if c.encrypted { var nonce [24]byte if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil { log.Errorf("Could not read sufficient randomness %v. Closing connection", err) c.closeInner() return errors.New("could not read random") } // MaxLength - 40 = MaxLength - 24 nonce bytes and 16 auth tag. encrypted := secretbox.Seal(nonce[:], buffer[0:c.MaxLength-40], &nonce, &c.key) copy(buffer, encrypted[0:c.MaxLength]) } log.Debugf("[%v -> %v] Wire Send %x", c.identity.Hostname(), c.hostname, buffer) _, err := c.conn.Write(buffer) if err != nil { c.closeInner() } return err } // Broadcast sends a message to all active service connections with a given capability func (c *connection) Broadcast(message []byte, capability Capability) error { return c.service.Broadcast(message, capability) }