package tor import ( "context" "crypto/rand" "encoding/base64" "errors" "git.openprivacy.ca/cwtch.im/tapir" "git.openprivacy.ca/cwtch.im/tapir/primitives" "git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/log" "golang.org/x/crypto/ed25519" "sync" "time" ) // BaseOnionService is a concrete implementation of the service interface over Tor onion services. type BaseOnionService struct { connections sync.Map acn connectivity.ACN id *primitives.Identity privateKey ed25519.PrivateKey ls connectivity.ListenService lock sync.Mutex port int shutdownChannel chan bool } // Metrics provides a report of useful information about the status of the service e.g. the number of active // connections func (s *BaseOnionService) Metrics() tapir.ServiceMetrics { s.lock.Lock() defer s.lock.Unlock() count := 0 s.connections.Range(func(key, value interface{}) bool { connection := value.(tapir.Connection) if !connection.IsClosed() { count++ } return true }) return tapir.ServiceMetrics{ ConnectionCount: count, } } // Init initializes a BaseOnionService with a given private key and identity // The private key is needed to initialize the Onion listen socket, ideally we could just pass an Identity in here. func (s *BaseOnionService) Init(acn connectivity.ACN, sk ed25519.PrivateKey, id *primitives.Identity) { // run add onion // get listen context s.acn = acn s.id = id s.privateKey = sk s.port = 9878 // blocking so we can wait on shutdown for closing of this goroutine s.shutdownChannel = make(chan bool) go func() { for s.waitOrTimeout() { s.GarbageCollect() } log.Debugf("closing down garbage collection goroutine") }() } func (s *BaseOnionService) waitOrTimeout() bool { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() select { case <-s.shutdownChannel: return false case <-ctx.Done(): return true } } // SetPort configures the port that the service uses. func (s *BaseOnionService) SetPort(port int) { s.port = port } // WaitForCapabilityOrClose blocks until the connection has the given capability or the underlying connection is closed // (through error or user action) func (s *BaseOnionService) WaitForCapabilityOrClose(cid string, name tapir.Capability) (tapir.Connection, error) { attempts := 0 for { if attempts > 4 { s.connections.Range(func(key, value interface{}) bool { connection := value.(tapir.Connection) if connection.Hostname() == cid { if !connection.IsClosed() { connection.Close() s.connections.Delete(key) } } return true }) log.Debugf("WaitForCapabilityOrClose attempts exceeded for %v, all connections closed", cid) return nil, errors.New("failed to acquire capability after multiple attempts, forcibly closing all connections with the peer") } if attempts > 0 { // Allow connections to be torn down / closed before checking again // There is no point in busy looping... time.Sleep(time.Second * time.Duration(attempts)) } // Increment Attempts attempts++ log.Debugf("Lookup up a connection %v...", cid) // Lookup the connection... conn, err := s.GetConnection(cid) // If there are no active connections then return an error... if conn == nil { log.Debugf("no active connection found for %v", cid) return nil, err } if err == nil { // if we have only one connection and it has the desired capability then return the connection with // no error... if conn.HasCapability(name) { return conn, nil } log.Debugf("Found 1 connections for %v, but it lacks the desired capability %v", cid, name) continue } // If We have 2 connections for the same hostname... if err != nil { log.Debugf("found duplicate connections for %v <-> %v %v", s.id.Hostname(), cid, err) inboundCount := 0 // By convention the lowest lexicographical hostname purges all their outbounds to the higher // hostname // Which should only leave a single connection remaining (as we dedupe on connect too) // This does allow people to attempt to guarantee being the outbound connection but due to the bidirectional // authentication this should never result in an advantage in the protocol. // Close all outbound connections to connection s.connections.Range(func(key, value interface{}) bool { connection := value.(tapir.Connection) if connection.Hostname() == cid { if !connection.IsClosed() && connection.IsOutbound() && s.id.Hostname() < cid { connection.Close() s.connections.Delete(key) } if !connection.IsClosed() && !connection.IsOutbound() { inboundCount++ } } return true }) // If we have more than 1 inbound count then forcibly close all connections... // This shouldn't happen honestly, but if it does then it can cause an infinite check here if inboundCount > 1 { s.connections.Range(func(key, value interface{}) bool { connection := value.(tapir.Connection) if connection.Hostname() == cid { if !connection.IsClosed() { connection.Close() s.connections.Delete(key) } } return true }) return nil, errors.New("multiple inbound connections found and closed; the only resolution to this is to close them all and try connecting again") } } } } // GetConnection returns a connection for a given hostname. func (s *BaseOnionService) GetConnection(hostname string) (tapir.Connection, error) { conn := make([]tapir.Connection, 0) s.connections.Range(func(key, value interface{}) bool { connection := value.(tapir.Connection) if connection.Hostname() == hostname { if !connection.IsClosed() { conn = append(conn, connection) } else { // Delete this Closed Connection s.connections.Delete(key) } } return true }) if len(conn) == 0 { return nil, errors.New("no connection found") } if len(conn) > 1 { // If there are multiple connections we return the first one but also notify the user (this should be a // temporary edgecase (see Connect)) return conn[0], errors.New("multiple connections found") } return conn[0], nil } // GarbageCollect iterates through the connection pool and cleans up any connections that are closed // that haven't been removed from the map. func (s *BaseOnionService) GarbageCollect() { log.Debugf("running garbage collection...") s.connections.Range(func(key, value interface{}) bool { connection := value.(tapir.Connection) if connection.IsClosed() { // Delete this Closed Connection s.connections.Delete(key) } return true }) } // Connect initializes a new outbound connection to the given peer, using the defined Application func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool, error) { currconn, _ := s.GetConnection(hostname) // We already have a connection if currconn != nil { // Note: This check is not 100% reliable. And we may end up with two connections between peers // This can happen when a client connects to a server as the server is connecting to the client // Because at the start of the connection the server cannot derive the true hostname of the client until it // has auth'd // We mitigate this by performing multiple checks when Connect'ing return true, errors.New("already connected to " + hostname) } // connects to a remote server // spins off to a connection struct log.Debugf("Connecting to %v", hostname) conn, _, err := s.acn.Open(hostname) if err == nil { connectionID := s.getNewConnectionID() // Second check. If we didn't catch a double connection attempt before the Open we *should* catch it now because // the auth protocol is quick and Open over onion connections can take some time. // Again this isn't 100% reliable. tconn, _ := s.GetConnection(hostname) if tconn != nil { conn.Close() return true, errors.New("already connected to " + hostname) } log.Debugf("Connected to %v [%v]", hostname, connectionID) s.connections.Store(connectionID, tapir.NewConnection(s, s.id, hostname, true, conn, app.NewInstance())) return true, nil } log.Debugf("Error connecting to %v %v", hostname, err) return false, err } func (s *BaseOnionService) getNewConnectionID() string { id := make([]byte, 10) rand.Read(id) connectionID := "connection-" + base64.StdEncoding.EncodeToString(id) return connectionID } // Listen starts a blocking routine that waits for incoming connections and initializes connections with them based // on the given Application. func (s *BaseOnionService) Listen(app tapir.Application) error { // accepts a new connection // spins off to a connection struct s.lock.Lock() ls, err := s.acn.Listen(s.privateKey, s.port) s.ls = ls s.lock.Unlock() if err == nil { log.Debugf("Starting a service on %v ", s.ls.AddressFull()) for { conn, err := s.ls.Accept() if err == nil { tempHostname := s.getNewConnectionID() log.Debugf("Accepted connection from %v", tempHostname) s.connections.Store(tempHostname, tapir.NewConnection(s, s.id, tempHostname, false, conn, app.NewInstance())) } else { log.Debugf("Error accepting connection %v", err) return err } } } log.Debugf("Error listening to connection %v", err) return err } // Shutdown closes the service and ensures that any connections are closed. func (s *BaseOnionService) Shutdown() { s.lock.Lock() defer s.lock.Unlock() if s.ls != nil { s.ls.Close() } // close all existing connections manually s.connections.Range(func(key, value interface{}) bool { connection := value.(tapir.Connection) connection.Close() return true }) // wait for the return of our garbage collection goroutine s.shutdownChannel <- true } // Broadcast sends a message to all connections who possess the given capability func (s *BaseOnionService) Broadcast(message []byte, capability tapir.Capability) error { s.lock.Lock() defer s.lock.Unlock() s.connections.Range(func(key, value interface{}) bool { connection := value.(tapir.Connection) if connection.HasCapability(capability) { connection.Send(message) } return true }) return nil }