Tapir provides a framework for building Anonymous / metadata resistant Services
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

222 lines
6.3 KiB

  1. package tapir
  2. import (
  3. "crypto/rand"
  4. "cwtch.im/tapir/primitives"
  5. "encoding/binary"
  6. "git.openprivacy.ca/openprivacy/connectivity"
  7. "git.openprivacy.ca/openprivacy/log"
  8. "golang.org/x/crypto/ed25519"
  9. "golang.org/x/crypto/nacl/secretbox"
  10. "io"
  11. "sync"
  12. )
  13. // ServiceMetrics outlines higher level information about the service e.g. counts of connections
  14. type ServiceMetrics struct {
  15. ConnectionCount int
  16. }
  17. // Service defines the interface for a Tapir Service
  18. type Service interface {
  19. Init(acn connectivity.ACN, privateKey ed25519.PrivateKey, identity *primitives.Identity)
  20. Connect(hostname string, application Application) (bool, error)
  21. Listen(application Application) error
  22. GetConnection(connectionID string) (Connection, error)
  23. Metrics() ServiceMetrics
  24. Broadcast(message []byte, capability Capability) error
  25. WaitForCapabilityOrClose(connectionID string, capability Capability) (Connection, error)
  26. Shutdown()
  27. }
  28. // Connection Interface
  29. type Connection interface {
  30. Hostname() string
  31. IsOutbound() bool
  32. ID() *primitives.Identity
  33. Expect() []byte
  34. SetHostname(hostname string)
  35. HasCapability(name Capability) bool
  36. SetCapability(name Capability)
  37. SetEncryptionKey(key [32]byte)
  38. Send(message []byte)
  39. Close()
  40. App() Application
  41. SetApp(application Application)
  42. IsClosed() bool
  43. Broadcast(message []byte, capability Capability) error
  44. }
  45. // Connection defines a Tapir Connection
  46. type connection struct {
  47. hostname string
  48. conn io.ReadWriteCloser
  49. capabilities sync.Map
  50. encrypted bool
  51. key [32]byte
  52. app Application
  53. identity *primitives.Identity
  54. outbound bool
  55. closed bool
  56. MaxLength int
  57. lock sync.Mutex
  58. service Service
  59. }
  60. // NewConnection creates a new Connection
  61. func NewConnection(service Service, id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection {
  62. connection := new(connection)
  63. connection.hostname = hostname
  64. connection.conn = conn
  65. connection.app = app
  66. connection.identity = id
  67. connection.outbound = outbound
  68. connection.MaxLength = 8192
  69. connection.service = service
  70. go connection.app.Init(connection)
  71. return connection
  72. }
  73. // ID returns an identity.Identity encapsulation (for the purposes of cryptographic protocols)
  74. func (c *connection) ID() *primitives.Identity {
  75. return c.identity
  76. }
  77. // App returns the overarching application using this Connection.
  78. func (c *connection) App() Application {
  79. c.lock.Lock()
  80. defer c.lock.Unlock()
  81. return c.app
  82. }
  83. // App returns the overarching application using this Connection.
  84. func (c *connection) SetApp(application Application) {
  85. c.lock.Lock()
  86. defer c.lock.Unlock()
  87. c.app = application
  88. }
  89. // Hostname returns the hostname of the connection (if the connection has not been authorized it will return the
  90. // temporary hostname identifier)
  91. func (c *connection) Hostname() string {
  92. c.lock.Lock()
  93. defer c.lock.Unlock()
  94. return c.hostname
  95. }
  96. // IsOutbound returns true if this caller was the originator of the connection (i.e. the connection was started
  97. // by calling Connect() rather than Accept()
  98. func (c *connection) IsOutbound() bool {
  99. return c.outbound
  100. }
  101. // IsClosed returns true if the connection is closed (connections cannot be reopened)
  102. func (c *connection) IsClosed() bool {
  103. c.lock.Lock()
  104. defer c.lock.Unlock()
  105. return c.closed
  106. }
  107. // SetHostname sets the hostname on the connection
  108. func (c *connection) SetHostname(hostname string) {
  109. c.lock.Lock()
  110. defer c.lock.Unlock()
  111. log.Debugf("[%v -- %v] Asserting Remote Hostname: %v", c.identity.Hostname(), c.hostname, hostname)
  112. c.hostname = hostname
  113. }
  114. // SetCapability sets a capability on the connection
  115. func (c *connection) SetCapability(name Capability) {
  116. log.Debugf("[%v -- %v] Setting Capability %v", c.identity.Hostname(), c.hostname, name)
  117. c.capabilities.Store(name, true)
  118. }
  119. // HasCapability checks if the connection has a given capability
  120. func (c *connection) HasCapability(name Capability) bool {
  121. _, ok := c.capabilities.Load(name)
  122. return ok
  123. }
  124. // Close forcibly closes the connection
  125. func (c *connection) Close() {
  126. c.lock.Lock()
  127. defer c.lock.Unlock()
  128. c.closed = true
  129. c.conn.Close()
  130. }
  131. // Expect blocks and reads a single Tapir packet , from the connection.
  132. func (c *connection) Expect() []byte {
  133. buffer := make([]byte, c.MaxLength)
  134. n, err := io.ReadFull(c.conn, buffer)
  135. if n != c.MaxLength || err != nil {
  136. log.Debugf("[%v -> %v] Wire Error Reading, Read %d bytes, Error: %v", c.hostname, c.identity.Hostname(), n, err)
  137. c.conn.Close()
  138. c.closed = true
  139. return []byte{}
  140. }
  141. c.lock.Lock()
  142. defer c.lock.Unlock()
  143. if c.encrypted {
  144. var decryptNonce [24]byte
  145. copy(decryptNonce[:], buffer[:24])
  146. decrypted, ok := secretbox.Open(nil, buffer[24:], &decryptNonce, &c.key)
  147. if ok {
  148. copy(buffer, decrypted)
  149. } else {
  150. log.Errorf("[%v -> %v] Error Decrypting Message On Wire", c.hostname, c.identity.Hostname())
  151. c.conn.Close()
  152. c.closed = true
  153. return []byte{}
  154. }
  155. }
  156. length, _ := binary.Uvarint(buffer[0:2])
  157. if length+2 >= uint64(c.MaxLength) {
  158. return []byte{}
  159. }
  160. //cplog.Debugf("[%v -> %v] Wire Receive: (%d) %x", c.hostname, c.ID.Hostname(), len, buffer)
  161. return buffer[2 : length+2]
  162. }
  163. // SetEncryptionKey turns on application-level encryption on the connection using the given key.
  164. func (c *connection) SetEncryptionKey(key [32]byte) {
  165. c.lock.Lock()
  166. defer c.lock.Unlock()
  167. c.key = key
  168. c.encrypted = true
  169. }
  170. // Send writes a given message to a Tapir packet (of 1024 bytes in length).
  171. func (c *connection) Send(message []byte) {
  172. buffer := make([]byte, c.MaxLength)
  173. binary.PutUvarint(buffer[0:2], uint64(len(message)))
  174. copy(buffer[2:], message)
  175. c.lock.Lock()
  176. defer c.lock.Unlock()
  177. if c.encrypted {
  178. var nonce [24]byte
  179. if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil {
  180. log.Errorf("Could not read sufficient randomness %v. Closing connection", err)
  181. c.conn.Close()
  182. c.closed = true
  183. }
  184. // MaxLength - 40 = MaxLength - 24 nonce bytes and 16 auth tag.
  185. encrypted := secretbox.Seal(nonce[:], buffer[0:c.MaxLength-40], &nonce, &c.key)
  186. copy(buffer, encrypted[0:c.MaxLength])
  187. }
  188. log.Debugf("[%v -> %v] Wire Send %x", c.identity.Hostname(), c.hostname, buffer)
  189. _, err := c.conn.Write(buffer)
  190. if err != nil {
  191. c.conn.Close()
  192. c.closed = true
  193. }
  194. }
  195. // Broadcast sends a message to all active service connections with a given capability
  196. func (c *connection) Broadcast(message []byte, capability Capability) error {
  197. return c.service.Broadcast(message, capability)
  198. }