First cut of connection-deduplication
This commit is contained in:
parent
1fec260185
commit
17871cade7
31
cmd/main.go
31
cmd/main.go
|
@ -37,6 +37,20 @@ func (ea SimpleApp) Init(connection *tapir.Connection) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CheckConnection is a simple test that GetConnection is working.
|
||||||
|
func CheckConnection(service tapir.Service, hostname string) {
|
||||||
|
for {
|
||||||
|
_, err := service.GetConnection(hostname)
|
||||||
|
if err == nil {
|
||||||
|
log.Infof("Authed!")
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
log.Errorf("Error %v", err)
|
||||||
|
}
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
|
||||||
log.SetLevel(log.LevelDebug)
|
log.SetLevel(log.LevelDebug)
|
||||||
|
@ -53,17 +67,18 @@ func main() {
|
||||||
id := identity.InitializeV3("server", &sk, &pk)
|
id := identity.InitializeV3("server", &sk, &pk)
|
||||||
|
|
||||||
// Init a Client to Connect to the Server
|
// Init a Client to Connect to the Server
|
||||||
go client(acn, pubkey)
|
client, clienthostname := genclient(acn)
|
||||||
|
go connectclient(client, pubkey)
|
||||||
|
|
||||||
// Init the Server running the Simple App.
|
// Init the Server running the Simple App.
|
||||||
var service tapir.Service
|
var service tapir.Service
|
||||||
service = new(tor.BaseOnionService)
|
service = new(tor.BaseOnionService)
|
||||||
service.Init(acn, sk, id)
|
service.Init(acn, sk, id)
|
||||||
|
go CheckConnection(service, clienthostname)
|
||||||
service.Listen(SimpleApp{})
|
service.Listen(SimpleApp{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Client will Connect and launch it's own Echo App goroutine.
|
func genclient(acn connectivity.ACN) (tapir.Service, string) {
|
||||||
func client(acn connectivity.ACN, key ed25519.PublicKey) {
|
|
||||||
pubkey, privateKey, _ := ed25519.GenerateKey(rand.Reader)
|
pubkey, privateKey, _ := ed25519.GenerateKey(rand.Reader)
|
||||||
sk := ed25519.PrivateKey(privateKey)
|
sk := ed25519.PrivateKey(privateKey)
|
||||||
pk := ed25519.PublicKey(pubkey)
|
pk := ed25519.PublicKey(pubkey)
|
||||||
|
@ -71,13 +86,19 @@ func client(acn connectivity.ACN, key ed25519.PublicKey) {
|
||||||
var client tapir.Service
|
var client tapir.Service
|
||||||
client = new(tor.BaseOnionService)
|
client = new(tor.BaseOnionService)
|
||||||
client.Init(acn, sk, id)
|
client.Init(acn, sk, id)
|
||||||
cid, _ := client.Connect(utils.GetTorV3Hostname(key), SimpleApp{})
|
return client, utils.GetTorV3Hostname(pk)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Client will Connect and launch it's own Echo App goroutine.
|
||||||
|
func connectclient(client tapir.Service, key ed25519.PublicKey) {
|
||||||
|
|
||||||
|
client.Connect(utils.GetTorV3Hostname(key), SimpleApp{})
|
||||||
|
|
||||||
// Once connected, it shouldn't take long to authenticate and run the application. So for the purposes of this demo
|
// Once connected, it shouldn't take long to authenticate and run the application. So for the purposes of this demo
|
||||||
// we will wait a little while then exit.
|
// we will wait a little while then exit.
|
||||||
time.Sleep(time.Second * 5)
|
time.Sleep(time.Second * 5)
|
||||||
|
|
||||||
conn, _ := client.GetConnection(cid)
|
conn, _ := client.GetConnection(utils.GetTorV3Hostname(key))
|
||||||
log.Debugf("Client has Auth: %v", conn.HasCapability(applications.AuthCapability))
|
log.Debugf("Client has Auth: %v", conn.HasCapability(applications.AuthCapability))
|
||||||
|
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
|
|
|
@ -50,23 +50,51 @@ func (s *BaseOnionService) WaitForCapabilityOrClose(cid string, name string) (*t
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetConnection returns a connection for a given hostname.
|
// GetConnection returns a connection for a given hostname.
|
||||||
func (s *BaseOnionService) GetConnection(connectionID string) (*tapir.Connection, error) {
|
func (s *BaseOnionService) GetConnection(hostname string) (*tapir.Connection, error) {
|
||||||
conn, ok := s.connections.Load(connectionID)
|
var conn *tapir.Connection
|
||||||
if !ok {
|
s.connections.Range(func(key, value interface{}) bool {
|
||||||
|
connection := value.(*tapir.Connection)
|
||||||
|
if connection.Hostname == hostname {
|
||||||
|
if !connection.Closed {
|
||||||
|
conn = connection
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
if conn == nil {
|
||||||
return nil, errors.New("no connection found")
|
return nil, errors.New("no connection found")
|
||||||
}
|
}
|
||||||
connection := conn.(*tapir.Connection)
|
return conn, nil
|
||||||
return connection, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connect initializes a new outbound connection to the given peer, using the defined Application
|
// Connect initializes a new outbound connection to the given peer, using the defined Application
|
||||||
func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (string, error) {
|
func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (string, error) {
|
||||||
|
_, err := s.GetConnection(hostname)
|
||||||
|
if err == nil {
|
||||||
|
// Note: This check is not 100% reliable. And we may end up with two connections between peers
|
||||||
|
// This can happen when a client connects to a server as the server is connecting to the client
|
||||||
|
// Because at the start of the connection the server cannot derive the true hostname of the client until it
|
||||||
|
// has auth'd
|
||||||
|
// We mitigate this by performing multiple checks when Connect'ing
|
||||||
|
return "", errors.New("already connected to " + hostname)
|
||||||
|
}
|
||||||
// connects to a remote server
|
// connects to a remote server
|
||||||
// spins off to a connection struct
|
// spins off to a connection struct
|
||||||
log.Debugf("Connecting to %v", hostname)
|
log.Debugf("Connecting to %v", hostname)
|
||||||
conn, _, err := s.acn.Open(hostname)
|
conn, _, err := s.acn.Open(hostname)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
connectionID := s.getNewConnectionID()
|
connectionID := s.getNewConnectionID()
|
||||||
|
|
||||||
|
// Second check. If we didn't catch a double connection attempt before the Open we *should* catch it now because
|
||||||
|
// the auth protocol is quick and Open over onion connections can take some time.
|
||||||
|
// Again this isn't 100% reliable.
|
||||||
|
_, err := s.GetConnection(hostname)
|
||||||
|
if err == nil {
|
||||||
|
conn.Close()
|
||||||
|
return "", errors.New("already connected to " + hostname)
|
||||||
|
}
|
||||||
|
|
||||||
log.Debugf("Connected to %v [%v]", hostname, connectionID)
|
log.Debugf("Connected to %v [%v]", hostname, connectionID)
|
||||||
s.connections.Store(connectionID, tapir.NewConnection(s.id, hostname, true, conn, app.NewInstance()))
|
s.connections.Store(connectionID, tapir.NewConnection(s.id, hostname, true, conn, app.NewInstance()))
|
||||||
return connectionID, nil
|
return connectionID, nil
|
||||||
|
|
16
service.go
16
service.go
|
@ -24,7 +24,7 @@ type Service interface {
|
||||||
|
|
||||||
// Connection defines a Tapir Connection
|
// Connection defines a Tapir Connection
|
||||||
type Connection struct {
|
type Connection struct {
|
||||||
hostname string
|
Hostname string
|
||||||
conn net.Conn
|
conn net.Conn
|
||||||
capabilities sync.Map
|
capabilities sync.Map
|
||||||
encrypted bool
|
encrypted bool
|
||||||
|
@ -39,7 +39,7 @@ type Connection struct {
|
||||||
// NewConnection creates a new Connection
|
// NewConnection creates a new Connection
|
||||||
func NewConnection(id identity.Identity, hostname string, outbound bool, conn net.Conn, app Application) *Connection {
|
func NewConnection(id identity.Identity, hostname string, outbound bool, conn net.Conn, app Application) *Connection {
|
||||||
connection := new(Connection)
|
connection := new(Connection)
|
||||||
connection.hostname = hostname
|
connection.Hostname = hostname
|
||||||
connection.conn = conn
|
connection.conn = conn
|
||||||
connection.App = app
|
connection.App = app
|
||||||
connection.ID = id
|
connection.ID = id
|
||||||
|
@ -51,13 +51,13 @@ func NewConnection(id identity.Identity, hostname string, outbound bool, conn ne
|
||||||
|
|
||||||
// SetHostname sets the hostname on the connection
|
// SetHostname sets the hostname on the connection
|
||||||
func (c *Connection) SetHostname(hostname string) {
|
func (c *Connection) SetHostname(hostname string) {
|
||||||
log.Debugf("[%v -- %v] Asserting Remote Hostname: %v", c.ID.Hostname(), c.hostname, hostname)
|
log.Debugf("[%v -- %v] Asserting Remote Hostname: %v", c.ID.Hostname(), c.Hostname, hostname)
|
||||||
c.hostname = hostname
|
c.Hostname = hostname
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetCapability sets a capability on the connection
|
// SetCapability sets a capability on the connection
|
||||||
func (c *Connection) SetCapability(name string) {
|
func (c *Connection) SetCapability(name string) {
|
||||||
log.Debugf("[%v -- %v] Setting Capability %v", c.ID.Hostname(), c.hostname, name)
|
log.Debugf("[%v -- %v] Setting Capability %v", c.ID.Hostname(), c.Hostname, name)
|
||||||
c.capabilities.Store(name, true)
|
c.capabilities.Store(name, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,7 +78,7 @@ func (c *Connection) Expect() []byte {
|
||||||
n, err := io.ReadFull(c.conn, buffer)
|
n, err := io.ReadFull(c.conn, buffer)
|
||||||
|
|
||||||
if n != c.MaxLength || err != nil {
|
if n != c.MaxLength || err != nil {
|
||||||
log.Errorf("[%v -> %v] Wire Error Reading, Read %d bytes, Error: %v", c.hostname, c.ID.Hostname(), n, err)
|
log.Errorf("[%v -> %v] Wire Error Reading, Read %d bytes, Error: %v", c.Hostname, c.ID.Hostname(), n, err)
|
||||||
c.conn.Close()
|
c.conn.Close()
|
||||||
c.Closed = true
|
c.Closed = true
|
||||||
return []byte{}
|
return []byte{}
|
||||||
|
@ -91,7 +91,7 @@ func (c *Connection) Expect() []byte {
|
||||||
if ok {
|
if ok {
|
||||||
copy(buffer, decrypted)
|
copy(buffer, decrypted)
|
||||||
} else {
|
} else {
|
||||||
log.Errorf("[%v -> %v] Error Decrypting Message On Wire", c.hostname, c.ID.Hostname())
|
log.Errorf("[%v -> %v] Error Decrypting Message On Wire", c.Hostname, c.ID.Hostname())
|
||||||
c.conn.Close()
|
c.conn.Close()
|
||||||
c.Closed = true
|
c.Closed = true
|
||||||
return []byte{}
|
return []byte{}
|
||||||
|
@ -126,7 +126,7 @@ func (c *Connection) Send(message []byte) {
|
||||||
encrypted := secretbox.Seal(nonce[:], buffer[0:c.MaxLength-40], &nonce, &c.key)
|
encrypted := secretbox.Seal(nonce[:], buffer[0:c.MaxLength-40], &nonce, &c.key)
|
||||||
copy(buffer, encrypted[0:c.MaxLength])
|
copy(buffer, encrypted[0:c.MaxLength])
|
||||||
}
|
}
|
||||||
log.Debugf("[%v -> %v] Wire Send %x", c.ID.Hostname(), c.hostname, buffer)
|
log.Debugf("[%v -> %v] Wire Send %x", c.ID.Hostname(), c.Hostname, buffer)
|
||||||
_, err := c.conn.Write(buffer)
|
_, err := c.conn.Write(buffer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.conn.Close()
|
c.conn.Close()
|
||||||
|
|
Loading…
Reference in New Issue