Merge pull request 'Refactor WaitForCapabilityOrClose' (#43) from wait into master
continuous-integration/drone/push Build is pending Details
continuous-integration/drone/tag Build is pending Details

Reviewed-on: #43
Reviewed-by: erinn <erinn@openprivacy.ca>
This commit is contained in:
erinn 2021-09-27 22:09:35 +00:00
commit aae071a121
2 changed files with 81 additions and 36 deletions

2
go.mod
View File

@ -2,7 +2,7 @@ module git.openprivacy.ca/cwtch.im/tapir
require ( require (
git.openprivacy.ca/openprivacy/connectivity v1.4.5 git.openprivacy.ca/openprivacy/connectivity v1.4.5
git.openprivacy.ca/openprivacy/log v1.0.2 git.openprivacy.ca/openprivacy/log v1.0.3
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gtank/merlin v0.1.1 github.com/gtank/merlin v0.1.1
github.com/gtank/ristretto255 v0.1.2 github.com/gtank/ristretto255 v0.1.2

View File

@ -10,6 +10,7 @@ import (
"git.openprivacy.ca/openprivacy/log" "git.openprivacy.ca/openprivacy/log"
"golang.org/x/crypto/ed25519" "golang.org/x/crypto/ed25519"
"sync" "sync"
"time"
) )
// BaseOnionService is a concrete implementation of the service interface over Tor onion services. // BaseOnionService is a concrete implementation of the service interface over Tor onion services.
@ -62,59 +63,97 @@ func (s *BaseOnionService) SetPort(port int) {
// WaitForCapabilityOrClose blocks until the connection has the given capability or the underlying connection is closed // WaitForCapabilityOrClose blocks until the connection has the given capability or the underlying connection is closed
// (through error or user action) // (through error or user action)
func (s *BaseOnionService) WaitForCapabilityOrClose(cid string, name tapir.Capability) (tapir.Connection, error) { func (s *BaseOnionService) WaitForCapabilityOrClose(cid string, name tapir.Capability) (tapir.Connection, error) {
conn, err := s.GetConnection(cid) attempts := 0
// If there is at least one active connection, then check the one that is returned and use it for {
for conn != nil { if attempts > 4 {
s.connections.Range(func(key, value interface{}) bool {
connection := value.(tapir.Connection)
if connection.Hostname() == cid {
if !connection.IsClosed() {
connection.Close()
s.connections.Delete(key)
}
}
return true
})
log.Debugf("WaitForCapabilityOrClose attempts exceeded for %v, all connections closed", cid)
return nil, errors.New("failed to acquire capability after multiple attempts, forcibly closing all connections with the peer")
}
// if we have only one and it has the right capability then return if attempts > 0 {
if conn.HasCapability(name) && err == nil { // Allow connections to be torn down / closed before checking again
return conn, nil // There is no point in busy looping...
time.Sleep(time.Second * time.Duration(attempts))
}
// Increment Attempts
attempts++
log.Debugf("Lookup up a connection %v...", cid)
// Lookup the connection...
conn, err := s.GetConnection(cid)
// If there are no active connections then return an error...
if conn == nil {
log.Debugf("no active connection found for %v", cid)
return nil, err
}
if err == nil {
// if we have only one connection and it has the desired capability then return the connection with
// no error...
if conn.HasCapability(name) {
return conn, nil
}
log.Debugf("Found 1 connections for %v, but it lacks the desired capability %v", cid, name)
continue
} }
// If We have 2 connections for the same hostname... // If We have 2 connections for the same hostname...
if err != nil { if err != nil {
log.Debugf("found duplicate connections for %v <-> %v %v", s.id.Hostname(), cid, err) log.Debugf("found duplicate connections for %v <-> %v %v", s.id.Hostname(), cid, err)
inboundCount := 0
// By convention the lowest lexicographical hostname purges all their outbounds to the higher // By convention the lowest lexicographical hostname purges all their outbounds to the higher
// hostname // hostname
// Which should only leave a single connection remaining (as we dedupe on connect too) // Which should only leave a single connection remaining (as we dedupe on connect too)
// This does allow people to attempt to guarantee being the outbound connection but due to the bidirectional // This does allow people to attempt to guarantee being the outbound connection but due to the bidirectional
// authentication this should never result in an advantage in the protocol. // authentication this should never result in an advantage in the protocol.
inboundCount := 0 // Close all outbound connections to connection
if s.id.Hostname() < cid { s.connections.Range(func(key, value interface{}) bool {
// Close all outbound connections to connection connection := value.(tapir.Connection)
if connection.Hostname() == cid {
if !connection.IsClosed() && connection.IsOutbound() && s.id.Hostname() < cid {
connection.Close()
s.connections.Delete(key)
}
if !connection.IsClosed() && !connection.IsOutbound() {
inboundCount++
}
}
return true
})
// If we have more than 1 inbound count then forcibly close all connections...
// This shouldn't happen honestly, but if it does then it can cause an infinite check here
if inboundCount > 1 {
s.connections.Range(func(key, value interface{}) bool { s.connections.Range(func(key, value interface{}) bool {
connection := value.(tapir.Connection) connection := value.(tapir.Connection)
if connection.Hostname() == cid { if connection.Hostname() == cid {
if !connection.IsClosed() && connection.IsOutbound() { if !connection.IsClosed() {
connection.Close() connection.Close()
} else if !connection.IsClosed() && !connection.IsOutbound() { s.connections.Delete(key)
inboundCount++
} }
} }
return true return true
}) })
return nil, errors.New("multiple inbound connections found and closed; the only resolution to this is to close them all and try connecting again")
// This shouldn't happen honestly, but if it does then it can cause an infinite check here
if inboundCount > 1 {
s.connections.Range(func(key, value interface{}) bool {
connection := value.(tapir.Connection)
if connection.Hostname() == cid {
if !connection.IsClosed() {
connection.Close()
}
}
return true
})
return nil, errors.New("multiple inbound connections found and closed; the only resolution to this is to close them all and try connecting again")
}
} }
} }
// We had a connection but it didn't have a capability...try again
conn, err = s.GetConnection(cid)
} }
// There are no active connections with that hostname
return nil, err
} }
// GetConnection returns a connection for a given hostname. // GetConnection returns a connection for a given hostname.
@ -125,11 +164,14 @@ func (s *BaseOnionService) GetConnection(hostname string) (tapir.Connection, err
if connection.Hostname() == hostname { if connection.Hostname() == hostname {
if !connection.IsClosed() { if !connection.IsClosed() {
conn = append(conn, connection) conn = append(conn, connection)
return true } else {
// Delete this Closed Connection
s.connections.Delete(key)
} }
} }
return true return true
}) })
if len(conn) == 0 { if len(conn) == 0 {
return nil, errors.New("no connection found") return nil, errors.New("no connection found")
} }
@ -143,7 +185,9 @@ func (s *BaseOnionService) GetConnection(hostname string) (tapir.Connection, err
// Connect initializes a new outbound connection to the given peer, using the defined Application // Connect initializes a new outbound connection to the given peer, using the defined Application
func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool, error) { func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool, error) {
currconn, _ := s.GetConnection(hostname) currconn,_ := s.GetConnection(hostname)
// We already have a connection
if currconn != nil { if currconn != nil {
// Note: This check is not 100% reliable. And we may end up with two connections between peers // Note: This check is not 100% reliable. And we may end up with two connections between peers
// This can happen when a client connects to a server as the server is connecting to the client // This can happen when a client connects to a server as the server is connecting to the client
@ -152,6 +196,7 @@ func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool
// We mitigate this by performing multiple checks when Connect'ing // We mitigate this by performing multiple checks when Connect'ing
return true, errors.New("already connected to " + hostname) return true, errors.New("already connected to " + hostname)
} }
// connects to a remote server // connects to a remote server
// spins off to a connection struct // spins off to a connection struct
log.Debugf("Connecting to %v", hostname) log.Debugf("Connecting to %v", hostname)
@ -162,8 +207,8 @@ func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool
// Second check. If we didn't catch a double connection attempt before the Open we *should* catch it now because // Second check. If we didn't catch a double connection attempt before the Open we *should* catch it now because
// the auth protocol is quick and Open over onion connections can take some time. // the auth protocol is quick and Open over onion connections can take some time.
// Again this isn't 100% reliable. // Again this isn't 100% reliable.
_, err := s.GetConnection(hostname) tconn, _ := s.GetConnection(hostname)
if err == nil { if tconn != nil {
conn.Close() conn.Close()
return true, errors.New("already connected to " + hostname) return true, errors.New("already connected to " + hostname)
} }