Tapir provides a framework for building anonymous, metadata-resistant services.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

198 lines
6.3 KiB

  1. package tor
  2. import (
  3. "crypto/rand"
  4. "cwtch.im/tapir"
  5. "cwtch.im/tapir/primitives"
  6. "encoding/base64"
  7. "errors"
  8. "git.openprivacy.ca/openprivacy/connectivity"
  9. "git.openprivacy.ca/openprivacy/log"
  10. "golang.org/x/crypto/ed25519"
  11. "sync"
  12. "time"
  13. )
  14. // BaseOnionService is a concrete implementation of the service interface over Tor onion services.
  15. type BaseOnionService struct {
  16. connections sync.Map
  17. acn connectivity.ACN
  18. id *primitives.Identity
  19. privateKey ed25519.PrivateKey
  20. ls connectivity.ListenService
  21. lock sync.Mutex
  22. }
  23. // Metrics provides a report of useful information about the status of the service e.g. the number of active
  24. // connections
  25. func (s *BaseOnionService) Metrics() tapir.ServiceMetrics {
  26. s.lock.Lock()
  27. defer s.lock.Unlock()
  28. count := 0
  29. s.connections.Range(func(key, value interface{}) bool {
  30. connection := value.(tapir.Connection)
  31. if !connection.IsClosed() {
  32. count++
  33. }
  34. return true
  35. })
  36. return tapir.ServiceMetrics{
  37. ConnectionCount: count,
  38. }
  39. }
  40. // Init initializes a BaseOnionService with a given private key and identity
  41. // The private key is needed to initialize the Onion listen socket, ideally we could just pass an Identity in here.
  42. func (s *BaseOnionService) Init(acn connectivity.ACN, sk ed25519.PrivateKey, id *primitives.Identity) {
  43. // run add onion
  44. // get listen context
  45. s.acn = acn
  46. s.id = id
  47. s.privateKey = sk
  48. }
  49. // WaitForCapabilityOrClose blocks until the connection has the given capability or the underlying connection is closed
  50. // (through error or user action)
  51. func (s *BaseOnionService) WaitForCapabilityOrClose(cid string, name tapir.Capability) (tapir.Connection, error) {
  52. conn, err := s.GetConnection(cid)
  53. // If there is at least one active connection, then check the one that is returned and use it
  54. for conn != nil {
  55. if conn.HasCapability(name) {
  56. return conn, nil
  57. }
  58. time.Sleep(time.Millisecond * 200)
  59. // If we received both a Connection and an Error from GetConnection that means we have multiple connections
  60. // If that was the case we refresh from the connection store as the likely case is one of them has been
  61. // closed and the other one has the capabilities we need.
  62. if err != nil {
  63. conn, err = s.GetConnection(cid)
  64. }
  65. }
  66. // There are no active connections with that hostname
  67. return nil, err
  68. }
  69. // GetConnection returns a connection for a given hostname.
  70. func (s *BaseOnionService) GetConnection(hostname string) (tapir.Connection, error) {
  71. conn := make([]tapir.Connection, 0)
  72. s.connections.Range(func(key, value interface{}) bool {
  73. connection := value.(tapir.Connection)
  74. if connection.Hostname() == hostname {
  75. if !connection.IsClosed() {
  76. conn = append(conn, connection)
  77. return true
  78. }
  79. }
  80. return true
  81. })
  82. if len(conn) == 0 {
  83. return nil, errors.New("no connection found")
  84. }
  85. if len(conn) > 1 {
  86. // If there are multiple connections we return the first one but also notify the user (this should be a
  87. // temporary edgecase (see Connect))
  88. return conn[0], errors.New("multiple connections found")
  89. }
  90. return conn[0], nil
  91. }
  92. // Connect initializes a new outbound connection to the given peer, using the defined Application
  93. func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool, error) {
  94. _, err := s.GetConnection(hostname)
  95. if err == nil {
  96. // Note: This check is not 100% reliable. And we may end up with two connections between peers
  97. // This can happen when a client connects to a server as the server is connecting to the client
  98. // Because at the start of the connection the server cannot derive the true hostname of the client until it
  99. // has auth'd
  100. // We mitigate this by performing multiple checks when Connect'ing
  101. return true, errors.New("already connected to " + hostname)
  102. }
  103. // connects to a remote server
  104. // spins off to a connection struct
  105. log.Debugf("Connecting to %v", hostname)
  106. conn, _, err := s.acn.Open(hostname)
  107. if err == nil {
  108. connectionID := s.getNewConnectionID()
  109. // Second check. If we didn't catch a double connection attempt before the Open we *should* catch it now because
  110. // the auth protocol is quick and Open over onion connections can take some time.
  111. // Again this isn't 100% reliable.
  112. _, err := s.GetConnection(hostname)
  113. if err == nil {
  114. conn.Close()
  115. return true, errors.New("already connected to " + hostname)
  116. }
  117. log.Debugf("Connected to %v [%v]", hostname, connectionID)
  118. s.connections.Store(connectionID, tapir.NewConnection(s, s.id, hostname, true, conn, app.NewInstance()))
  119. return true, nil
  120. }
  121. log.Debugf("Error connecting to %v %v", hostname, err)
  122. return false, err
  123. }
  124. func (s *BaseOnionService) getNewConnectionID() string {
  125. id := make([]byte, 10)
  126. rand.Read(id)
  127. connectionID := "connection-" + base64.StdEncoding.EncodeToString(id)
  128. return connectionID
  129. }
  130. // Listen starts a blocking routine that waits for incoming connections and initializes connections with them based
  131. // on the given Application.
  132. func (s *BaseOnionService) Listen(app tapir.Application) error {
  133. // accepts a new connection
  134. // spins off to a connection struct
  135. s.lock.Lock()
  136. ls, err := s.acn.Listen(s.privateKey, 9878)
  137. s.ls = ls
  138. log.Debugf("Starting a service on %v ", ls.AddressFull())
  139. s.lock.Unlock()
  140. if err == nil {
  141. for {
  142. conn, err := s.ls.Accept()
  143. if err == nil {
  144. tempHostname := s.getNewConnectionID()
  145. log.Debugf("Accepted connection from %v", tempHostname)
  146. s.connections.Store(tempHostname, tapir.NewConnection(s, s.id, tempHostname, false, conn, app.NewInstance()))
  147. } else {
  148. log.Debugf("Error accepting connection %v", err)
  149. return err
  150. }
  151. }
  152. }
  153. log.Debugf("Error listening to connection %v", err)
  154. return err
  155. }
  156. // Shutdown closes the service and ensures that any connections are closed.
  157. func (s *BaseOnionService) Shutdown() {
  158. s.lock.Lock()
  159. defer s.lock.Unlock()
  160. if s.ls != nil {
  161. s.ls.Close()
  162. }
  163. s.connections.Range(func(key, value interface{}) bool {
  164. connection := value.(tapir.Connection)
  165. connection.Close()
  166. return true
  167. })
  168. }
  169. // Broadcast sends a message to all connections who possess the given capability
  170. func (s *BaseOnionService) Broadcast(message []byte, capability tapir.Capability) error {
  171. s.lock.Lock()
  172. defer s.lock.Unlock()
  173. s.connections.Range(func(key, value interface{}) bool {
  174. connection := value.(tapir.Connection)
  175. if connection.HasCapability(capability) {
  176. connection.Send(message)
  177. }
  178. return true
  179. })
  180. return nil
  181. }