|
- package tor
-
- import (
- "crypto/rand"
- "cwtch.im/tapir"
- "cwtch.im/tapir/primitives"
- "encoding/base64"
- "errors"
- "git.openprivacy.ca/openprivacy/connectivity"
- "git.openprivacy.ca/openprivacy/log"
- "golang.org/x/crypto/ed25519"
- "sync"
- "time"
- )
-
- // BaseOnionService is a concrete implementation of the service interface over Tor onion services.
- type BaseOnionService struct {
- connections sync.Map
- acn connectivity.ACN
- id *primitives.Identity
- privateKey ed25519.PrivateKey
- ls connectivity.ListenService
- lock sync.Mutex
- }
-
- // Metrics provides a report of useful information about the status of the service e.g. the number of active
- // connections
- func (s *BaseOnionService) Metrics() tapir.ServiceMetrics {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- count := 0
- s.connections.Range(func(key, value interface{}) bool {
- connection := value.(tapir.Connection)
- if !connection.IsClosed() {
- count++
- }
- return true
- })
-
- return tapir.ServiceMetrics{
- ConnectionCount: count,
- }
- }
-
- // Init initializes a BaseOnionService with a given private key and identity
- // The private key is needed to initialize the Onion listen socket, ideally we could just pass an Identity in here.
- func (s *BaseOnionService) Init(acn connectivity.ACN, sk ed25519.PrivateKey, id *primitives.Identity) {
- // run add onion
- // get listen context
- s.acn = acn
- s.id = id
- s.privateKey = sk
- }
-
- // WaitForCapabilityOrClose blocks until the connection has the given capability or the underlying connection is closed
- // (through error or user action)
- func (s *BaseOnionService) WaitForCapabilityOrClose(cid string, name tapir.Capability) (tapir.Connection, error) {
- conn, err := s.GetConnection(cid)
- // If there is at least one active connection, then check the one that is returned and use it
- for conn != nil {
- if conn.HasCapability(name) {
- return conn, nil
- }
- time.Sleep(time.Millisecond * 200)
- // If we received both a Connection and an Error from GetConnection that means we have multiple connections
- // If that was the case we refresh from the connection store as the likely case is one of them has been
- // closed and the other one has the capabilities we need.
- if err != nil {
- conn, err = s.GetConnection(cid)
- }
- }
- // There are no active connections with that hostname
- return nil, err
- }
-
- // GetConnection returns a connection for a given hostname.
- func (s *BaseOnionService) GetConnection(hostname string) (tapir.Connection, error) {
- conn := make([]tapir.Connection, 0)
- s.connections.Range(func(key, value interface{}) bool {
- connection := value.(tapir.Connection)
- if connection.Hostname() == hostname {
- if !connection.IsClosed() {
- conn = append(conn, connection)
- return true
- }
- }
- return true
- })
- if len(conn) == 0 {
- return nil, errors.New("no connection found")
- }
- if len(conn) > 1 {
- // If there are multiple connections we return the first one but also notify the user (this should be a
- // temporary edgecase (see Connect))
- return conn[0], errors.New("multiple connections found")
- }
- return conn[0], nil
- }
-
- // Connect initializes a new outbound connection to the given peer, using the defined Application
- func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool, error) {
- _, err := s.GetConnection(hostname)
- if err == nil {
- // Note: This check is not 100% reliable. And we may end up with two connections between peers
- // This can happen when a client connects to a server as the server is connecting to the client
- // Because at the start of the connection the server cannot derive the true hostname of the client until it
- // has auth'd
- // We mitigate this by performing multiple checks when Connect'ing
- return true, errors.New("already connected to " + hostname)
- }
- // connects to a remote server
- // spins off to a connection struct
- log.Debugf("Connecting to %v", hostname)
- conn, _, err := s.acn.Open(hostname)
- if err == nil {
- connectionID := s.getNewConnectionID()
-
- // Second check. If we didn't catch a double connection attempt before the Open we *should* catch it now because
- // the auth protocol is quick and Open over onion connections can take some time.
- // Again this isn't 100% reliable.
- _, err := s.GetConnection(hostname)
- if err == nil {
- conn.Close()
- return true, errors.New("already connected to " + hostname)
- }
-
- log.Debugf("Connected to %v [%v]", hostname, connectionID)
- s.connections.Store(connectionID, tapir.NewConnection(s, s.id, hostname, true, conn, app.NewInstance()))
- return true, nil
- }
- log.Debugf("Error connecting to %v %v", hostname, err)
- return false, err
- }
-
- func (s *BaseOnionService) getNewConnectionID() string {
- id := make([]byte, 10)
- rand.Read(id)
- connectionID := "connection-" + base64.StdEncoding.EncodeToString(id)
- return connectionID
- }
-
- // Listen starts a blocking routine that waits for incoming connections and initializes connections with them based
- // on the given Application.
- func (s *BaseOnionService) Listen(app tapir.Application) error {
- // accepts a new connection
- // spins off to a connection struct
- s.lock.Lock()
- ls, err := s.acn.Listen(s.privateKey, 9878)
- s.ls = ls
- log.Debugf("Starting a service on %v ", ls.AddressFull())
- s.lock.Unlock()
-
- if err == nil {
- for {
- conn, err := s.ls.Accept()
- if err == nil {
- tempHostname := s.getNewConnectionID()
- log.Debugf("Accepted connection from %v", tempHostname)
- s.connections.Store(tempHostname, tapir.NewConnection(s, s.id, tempHostname, false, conn, app.NewInstance()))
- } else {
- log.Debugf("Error accepting connection %v", err)
- return err
- }
- }
- }
- log.Debugf("Error listening to connection %v", err)
- return err
- }
-
- // Shutdown closes the service and ensures that any connections are closed.
- func (s *BaseOnionService) Shutdown() {
- s.lock.Lock()
- defer s.lock.Unlock()
- if s.ls != nil {
- s.ls.Close()
- }
- s.connections.Range(func(key, value interface{}) bool {
- connection := value.(tapir.Connection)
- connection.Close()
- return true
- })
- }
-
- // Broadcast sends a message to all connections who possess the given capability
- func (s *BaseOnionService) Broadcast(message []byte, capability tapir.Capability) error {
- s.lock.Lock()
- defer s.lock.Unlock()
- s.connections.Range(func(key, value interface{}) bool {
- connection := value.(tapir.Connection)
- if connection.HasCapability(capability) {
- connection.Send(message)
- }
- return true
- })
- return nil
- }
|