Tapir provides a framework for building Anonymous / metadata resistant Services
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

205 lines
6.5 KiB

  1. package tor
  2. import (
  3. "crypto/rand"
  4. "encoding/base64"
  5. "errors"
  6. "git.openprivacy.ca/cwtch.im/tapir"
  7. "git.openprivacy.ca/cwtch.im/tapir/primitives"
  8. "git.openprivacy.ca/openprivacy/connectivity"
  9. "git.openprivacy.ca/openprivacy/log"
  10. "golang.org/x/crypto/ed25519"
  11. "sync"
  12. "time"
  13. )
  14. // BaseOnionService is a concrete implementation of the service interface over Tor onion services.
  15. type BaseOnionService struct {
  16. connections sync.Map
  17. acn connectivity.ACN
  18. id *primitives.Identity
  19. privateKey ed25519.PrivateKey
  20. ls connectivity.ListenService
  21. lock sync.Mutex
  22. port int
  23. }
  24. // Metrics provides a report of useful information about the status of the service e.g. the number of active
  25. // connections
  26. func (s *BaseOnionService) Metrics() tapir.ServiceMetrics {
  27. s.lock.Lock()
  28. defer s.lock.Unlock()
  29. count := 0
  30. s.connections.Range(func(key, value interface{}) bool {
  31. connection := value.(tapir.Connection)
  32. if !connection.IsClosed() {
  33. count++
  34. }
  35. return true
  36. })
  37. return tapir.ServiceMetrics{
  38. ConnectionCount: count,
  39. }
  40. }
  41. // Init initializes a BaseOnionService with a given private key and identity
  42. // The private key is needed to initialize the Onion listen socket, ideally we could just pass an Identity in here.
  43. func (s *BaseOnionService) Init(acn connectivity.ACN, sk ed25519.PrivateKey, id *primitives.Identity) {
  44. // run add onion
  45. // get listen context
  46. s.acn = acn
  47. s.id = id
  48. s.privateKey = sk
  49. s.port = 9878
  50. }
  51. // SetPort configures the port that the service uses.
  52. func (s *BaseOnionService) SetPort(port int) {
  53. s.port = port
  54. }
  55. // WaitForCapabilityOrClose blocks until the connection has the given capability or the underlying connection is closed
  56. // (through error or user action)
  57. func (s *BaseOnionService) WaitForCapabilityOrClose(cid string, name tapir.Capability) (tapir.Connection, error) {
  58. conn, err := s.GetConnection(cid)
  59. // If there is at least one active connection, then check the one that is returned and use it
  60. for conn != nil {
  61. if conn.HasCapability(name) {
  62. return conn, nil
  63. }
  64. time.Sleep(time.Millisecond * 200)
  65. // If we received both a Connection and an Error from GetConnection that means we have multiple connections
  66. // If that was the case we refresh from the connection store as the likely case is one of them has been
  67. // closed and the other one has the capabilities we need.
  68. if err != nil {
  69. conn, err = s.GetConnection(cid)
  70. }
  71. }
  72. // There are no active connections with that hostname
  73. return nil, err
  74. }
  75. // GetConnection returns a connection for a given hostname.
  76. func (s *BaseOnionService) GetConnection(hostname string) (tapir.Connection, error) {
  77. conn := make([]tapir.Connection, 0)
  78. s.connections.Range(func(key, value interface{}) bool {
  79. connection := value.(tapir.Connection)
  80. if connection.Hostname() == hostname {
  81. if !connection.IsClosed() {
  82. conn = append(conn, connection)
  83. return true
  84. }
  85. }
  86. return true
  87. })
  88. if len(conn) == 0 {
  89. return nil, errors.New("no connection found")
  90. }
  91. if len(conn) > 1 {
  92. // If there are multiple connections we return the first one but also notify the user (this should be a
  93. // temporary edgecase (see Connect))
  94. return conn[0], errors.New("multiple connections found")
  95. }
  96. return conn[0], nil
  97. }
  98. // Connect initializes a new outbound connection to the given peer, using the defined Application
  99. func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool, error) {
  100. _, err := s.GetConnection(hostname)
  101. if err == nil {
  102. // Note: This check is not 100% reliable. And we may end up with two connections between peers
  103. // This can happen when a client connects to a server as the server is connecting to the client
  104. // Because at the start of the connection the server cannot derive the true hostname of the client until it
  105. // has auth'd
  106. // We mitigate this by performing multiple checks when Connect'ing
  107. return true, errors.New("already connected to " + hostname)
  108. }
  109. // connects to a remote server
  110. // spins off to a connection struct
  111. log.Debugf("Connecting to %v", hostname)
  112. conn, _, err := s.acn.Open(hostname)
  113. if err == nil {
  114. connectionID := s.getNewConnectionID()
  115. // Second check. If we didn't catch a double connection attempt before the Open we *should* catch it now because
  116. // the auth protocol is quick and Open over onion connections can take some time.
  117. // Again this isn't 100% reliable.
  118. _, err := s.GetConnection(hostname)
  119. if err == nil {
  120. conn.Close()
  121. return true, errors.New("already connected to " + hostname)
  122. }
  123. log.Debugf("Connected to %v [%v]", hostname, connectionID)
  124. s.connections.Store(connectionID, tapir.NewConnection(s, s.id, hostname, true, conn, app.NewInstance()))
  125. return true, nil
  126. }
  127. log.Debugf("Error connecting to %v %v", hostname, err)
  128. return false, err
  129. }
  130. func (s *BaseOnionService) getNewConnectionID() string {
  131. id := make([]byte, 10)
  132. rand.Read(id)
  133. connectionID := "connection-" + base64.StdEncoding.EncodeToString(id)
  134. return connectionID
  135. }
  136. // Listen starts a blocking routine that waits for incoming connections and initializes connections with them based
  137. // on the given Application.
  138. func (s *BaseOnionService) Listen(app tapir.Application) error {
  139. // accepts a new connection
  140. // spins off to a connection struct
  141. s.lock.Lock()
  142. ls, err := s.acn.Listen(s.privateKey, s.port)
  143. s.ls = ls
  144. log.Debugf("Starting a service on %v ", ls.AddressFull())
  145. s.lock.Unlock()
  146. if err == nil {
  147. for {
  148. conn, err := s.ls.Accept()
  149. if err == nil {
  150. tempHostname := s.getNewConnectionID()
  151. log.Debugf("Accepted connection from %v", tempHostname)
  152. s.connections.Store(tempHostname, tapir.NewConnection(s, s.id, tempHostname, false, conn, app.NewInstance()))
  153. } else {
  154. log.Debugf("Error accepting connection %v", err)
  155. return err
  156. }
  157. }
  158. }
  159. log.Debugf("Error listening to connection %v", err)
  160. return err
  161. }
  162. // Shutdown closes the service and ensures that any connections are closed.
  163. func (s *BaseOnionService) Shutdown() {
  164. s.lock.Lock()
  165. defer s.lock.Unlock()
  166. if s.ls != nil {
  167. s.ls.Close()
  168. }
  169. s.connections.Range(func(key, value interface{}) bool {
  170. connection := value.(tapir.Connection)
  171. connection.Close()
  172. return true
  173. })
  174. }
  175. // Broadcast sends a message to all connections who possess the given capability
  176. func (s *BaseOnionService) Broadcast(message []byte, capability tapir.Capability) error {
  177. s.lock.Lock()
  178. defer s.lock.Unlock()
  179. s.connections.Range(func(key, value interface{}) bool {
  180. connection := value.(tapir.Connection)
  181. if connection.HasCapability(capability) {
  182. connection.Send(message)
  183. }
  184. return true
  185. })
  186. return nil
  187. }