Refactor WaitForCapabilityOrClose
continuous-integration/drone/push Build is pending Details
continuous-integration/drone/pr Build is failing Details

This commit is contained in:
Sarah Jamie Lewis 2021-09-27 14:38:01 -07:00
parent f1e3f2ca54
commit 81810ed531
1 changed files with 81 additions and 35 deletions

View File

@ -10,6 +10,7 @@ import (
"git.openprivacy.ca/openprivacy/log" "git.openprivacy.ca/openprivacy/log"
"golang.org/x/crypto/ed25519" "golang.org/x/crypto/ed25519"
"sync" "sync"
"time"
) )
// BaseOnionService is a concrete implementation of the service interface over Tor onion services. // BaseOnionService is a concrete implementation of the service interface over Tor onion services.
@ -62,59 +63,98 @@ func (s *BaseOnionService) SetPort(port int) {
// WaitForCapabilityOrClose blocks until the connection has the given capability or the underlying connection is closed // WaitForCapabilityOrClose blocks until the connection has the given capability or the underlying connection is closed
// (through error or user action) // (through error or user action)
func (s *BaseOnionService) WaitForCapabilityOrClose(cid string, name tapir.Capability) (tapir.Connection, error) { func (s *BaseOnionService) WaitForCapabilityOrClose(cid string, name tapir.Capability) (tapir.Connection, error) {
conn, err := s.GetConnection(cid) attempts := 0
// If there is at least one active connection, then check the one that is returned and use it for {
for conn != nil { if attempts > 4 {
s.connections.Range(func(key, value interface{}) bool {
connection := value.(tapir.Connection)
if connection.Hostname() == cid {
if !connection.IsClosed() {
connection.Close()
s.connections.Delete(key)
}
}
return true
})
log.Debugf("WaitForCapabilityOrClose attempts exceeded for %v all, connections closed", cid)
return nil, errors.New("failed to acquire capability after multiple attempts, forcibly closing all connections with the peer")
}
// if we have only one and it has the right capability then return if attempts > 0 {
if conn.HasCapability(name) && err == nil { // Allow connections to be torn down / closed before checking again
return conn, nil // There is no point in busy looping...
time.Sleep(time.Second * time.Duration(attempts))
}
// Increment Attempts
attempts++
log.Debugf("Lookup up a connection %v...", cid)
// Lookup the connection...
conn, err := s.GetConnection(cid)
log.Debugf("Found %v %v", conn, err)
// If there are no active connections then return an error...
if conn == nil {
log.Debugf("Now active connection found for %v", cid)
return nil, err
}
if err == nil {
// if we have only one connection and it has the desired capability then return the connection with
// no error...
if conn.HasCapability(name) {
return conn, nil
}
log.Debugf("Found 1 connections for %v, but it lacks the desired capability %v", cid, name)
continue
} }
// If We have 2 connections for the same hostname... // If We have 2 connections for the same hostname...
if err != nil { if err != nil {
log.Debugf("found duplicate connections for %v <-> %v %v", s.id.Hostname(), cid, err) log.Debugf("found duplicate connections for %v <-> %v %v", s.id.Hostname(), cid, err)
inboundCount := 0
// By convention the lowest lexicographical hostname purges all their outbounds to the higher // By convention the lowest lexicographical hostname purges all their outbounds to the higher
// hostname // hostname
// Which should only leave a single connection remaining (as we dedupe on connect too) // Which should only leave a single connection remaining (as we dedupe on connect too)
// This does allow people to attempt to guarantee being the outbound connection but due to the bidirectional // This does allow people to attempt to guarantee being the outbound connection but due to the bidirectional
// authentication this should never result in an advantage in the protocol. // authentication this should never result in an advantage in the protocol.
inboundCount := 0 // Close all outbound connections to connection
if s.id.Hostname() < cid { s.connections.Range(func(key, value interface{}) bool {
// Close all outbound connections to connection connection := value.(tapir.Connection)
if connection.Hostname() == cid {
if !connection.IsClosed() && connection.IsOutbound() && s.id.Hostname() < cid {
connection.Close()
s.connections.Delete(key)
}
if !connection.IsClosed() && !connection.IsOutbound() {
inboundCount++
}
}
return true
})
// If we have more than 1 inbound count then forcibly close all connections...
// This shouldn't happen honestly, but if it does then it can cause an infinite check here
if inboundCount > 1 {
s.connections.Range(func(key, value interface{}) bool { s.connections.Range(func(key, value interface{}) bool {
connection := value.(tapir.Connection) connection := value.(tapir.Connection)
if connection.Hostname() == cid { if connection.Hostname() == cid {
if !connection.IsClosed() && connection.IsOutbound() { if !connection.IsClosed() {
connection.Close() connection.Close()
} else if !connection.IsClosed() && !connection.IsOutbound() { s.connections.Delete(key)
inboundCount++
} }
} }
return true return true
}) })
return nil, errors.New("multiple inbound connections found and closed; the only resolution to this is to close them all and try connecting again")
// This shouldn't happen honestly, but if it does then it can cause an infinite check here
if inboundCount > 1 {
s.connections.Range(func(key, value interface{}) bool {
connection := value.(tapir.Connection)
if connection.Hostname() == cid {
if !connection.IsClosed() {
connection.Close()
}
}
return true
})
return nil, errors.New("multiple inbound connections found and closed; the only resolution to this is to close them all and try connecting again")
}
} }
} }
// We had a connection but it didn't have a capability...try again
conn, err = s.GetConnection(cid)
} }
// There are no active connections with that hostname
return nil, err
} }
// GetConnection returns a connection for a given hostname. // GetConnection returns a connection for a given hostname.
@ -125,11 +165,14 @@ func (s *BaseOnionService) GetConnection(hostname string) (tapir.Connection, err
if connection.Hostname() == hostname { if connection.Hostname() == hostname {
if !connection.IsClosed() { if !connection.IsClosed() {
conn = append(conn, connection) conn = append(conn, connection)
return true } else {
// Delete this Closed Connection
s.connections.Delete(key)
} }
} }
return true return true
}) })
if len(conn) == 0 { if len(conn) == 0 {
return nil, errors.New("no connection found") return nil, errors.New("no connection found")
} }
@ -143,7 +186,9 @@ func (s *BaseOnionService) GetConnection(hostname string) (tapir.Connection, err
// Connect initializes a new outbound connection to the given peer, using the defined Application // Connect initializes a new outbound connection to the given peer, using the defined Application
func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool, error) { func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool, error) {
currconn, _ := s.GetConnection(hostname) currconn, err := s.GetConnection(hostname)
// We already have a connection
if currconn != nil { if currconn != nil {
// Note: This check is not 100% reliable. And we may end up with two connections between peers // Note: This check is not 100% reliable. And we may end up with two connections between peers
// This can happen when a client connects to a server as the server is connecting to the client // This can happen when a client connects to a server as the server is connecting to the client
@ -152,6 +197,7 @@ func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool
// We mitigate this by performing multiple checks when Connect'ing // We mitigate this by performing multiple checks when Connect'ing
return true, errors.New("already connected to " + hostname) return true, errors.New("already connected to " + hostname)
} }
// connects to a remote server // connects to a remote server
// spins off to a connection struct // spins off to a connection struct
log.Debugf("Connecting to %v", hostname) log.Debugf("Connecting to %v", hostname)
@ -162,8 +208,8 @@ func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool
// Second check. If we didn't catch a double connection attempt before the Open we *should* catch it now because // Second check. If we didn't catch a double connection attempt before the Open we *should* catch it now because
// the auth protocol is quick and Open over onion connections can take some time. // the auth protocol is quick and Open over onion connections can take some time.
// Again this isn't 100% reliable. // Again this isn't 100% reliable.
_, err := s.GetConnection(hostname) tconn, _ := s.GetConnection(hostname)
if err == nil { if tconn != nil {
conn.Close() conn.Close()
return true, errors.New("already connected to " + hostname) return true, errors.New("already connected to " + hostname)
} }