From 66d6b0b51eeedb16d14c911b079f50102e393cd0 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 14 Jul 2020 14:23:27 -0700 Subject: [PATCH 1/2] Add Broacast --- applications/auth_test.go | 4 ++++ networks/tor/BaseOnionService.go | 18 ++++++++++++++++-- service.go | 13 +++++++++++-- 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/applications/auth_test.go b/applications/auth_test.go index bd5129a..ef5f08f 100644 --- a/applications/auth_test.go +++ b/applications/auth_test.go @@ -80,6 +80,10 @@ func (MockConnection) IsClosed() bool { panic("implement me") } +func (MockConnection) Broadcast(message []byte, capability tapir.Capability) error { + panic("implement me") +} + func TestAuthApp_Failed(t *testing.T) { var authApp AuthApp ai := authApp.NewInstance() diff --git a/networks/tor/BaseOnionService.go b/networks/tor/BaseOnionService.go index f9e0080..98ed5ee 100644 --- a/networks/tor/BaseOnionService.go +++ b/networks/tor/BaseOnionService.go @@ -106,7 +106,7 @@ func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool } log.Debugf("Connected to %v [%v]", hostname, connectionID) - s.connections.Store(connectionID, tapir.NewConnection(s.id, hostname, true, conn, app.NewInstance())) + s.connections.Store(connectionID, tapir.NewConnection(s.id, hostname, true, conn, app.NewInstance(), s)) return true, nil } log.Debugf("Error connecting to %v %v", hostname, err) @@ -137,7 +137,7 @@ func (s *BaseOnionService) Listen(app tapir.Application) error { if err == nil { tempHostname := s.getNewConnectionID() log.Debugf("Accepted connection from %v", tempHostname) - s.connections.Store(tempHostname, tapir.NewConnection(s.id, tempHostname, false, conn, app.NewInstance())) + s.connections.Store(tempHostname, tapir.NewConnection(s.id, tempHostname, false, conn, app.NewInstance(), s)) } else { log.Debugf("Error accepting connection %v", err) return err @@ -159,3 +159,17 @@ func (s *BaseOnionService) Shutdown() { return true }) } + +// Broadcast sends a message to all connections who possess the given capability +func (s *BaseOnionService) Broadcast(message []byte, capability tapir.Capability) error { + s.lock.Lock() + defer s.lock.Unlock() + s.connections.Range(func(key, value interface{}) bool { + connection := value.(tapir.Connection) + if connection.HasCapability(capability) { + connection.Send(message) + } + return true + }) + return nil +} diff --git a/service.go b/service.go index 80da505..22452b2 100644 --- a/service.go +++ b/service.go @@ -18,6 +18,7 @@ type Service interface { Connect(hostname string, application Application) (bool, error) Listen(application Application) error GetConnection(connectionID string) (Connection, error) + Broadcast(message []byte, capability Capability) error WaitForCapabilityOrClose(connectionID string, capability Capability) (Connection, error) Shutdown() } @@ -37,6 +38,7 @@ type Connection interface { App() Application SetApp(application Application) IsClosed() bool + Broadcast(message []byte, capability Capability) error } // Connection defines a Tapir Connection @@ -52,17 +54,19 @@ type connection struct { closed bool MaxLength int lock sync.Mutex + service Service } // NewConnection creates a new Connection -func NewConnection(id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection { +func NewConnection(id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application, service Service) Connection { connection := new(connection) connection.hostname = hostname connection.conn = conn connection.app = app connection.identity = id connection.outbound = outbound - connection.MaxLength = 1024 + connection.MaxLength = 8192 + connection.service = service go connection.app.Init(connection) return connection } @@ -204,3 +208,8 @@ func (c *connection) Send(message []byte) { c.closed = true } } + +// Broadcast sends a message to all active service connections with a given capability +func (c *connection) Broadcast(message []byte, capability Capability) error { + return c.service.Broadcast(message, capability) +} From 8eeea02997bde3844e173949f36fde33d939472b Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 14 Jul 2020 14:59:08 -0700 Subject: [PATCH 2/2] Update Signature --- networks/tor/BaseOnionService.go | 4 ++-- persistence/test.dbgi | Bin 0 -> 32768 bytes service.go | 2 +- .../tapir_malicious_remote_integration_test.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) create mode 100644 persistence/test.dbgi diff --git a/networks/tor/BaseOnionService.go b/networks/tor/BaseOnionService.go index 98ed5ee..fe9c2e5 100644 --- a/networks/tor/BaseOnionService.go +++ b/networks/tor/BaseOnionService.go @@ -106,7 +106,7 @@ func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool } log.Debugf("Connected to %v [%v]", hostname, connectionID) - s.connections.Store(connectionID, tapir.NewConnection(s.id, hostname, true, conn, app.NewInstance(), s)) + s.connections.Store(connectionID, tapir.NewConnection(s, s.id, hostname, true, conn, app.NewInstance())) return true, nil } log.Debugf("Error connecting to %v %v", hostname, err) @@ -137,7 +137,7 @@ func (s *BaseOnionService) Listen(app tapir.Application) error { if err == nil { tempHostname := s.getNewConnectionID() log.Debugf("Accepted connection from %v", tempHostname) - s.connections.Store(tempHostname, tapir.NewConnection(s.id, tempHostname, false, conn, app.NewInstance(), s)) + s.connections.Store(tempHostname, tapir.NewConnection(s, s.id, tempHostname, false, conn, app.NewInstance())) } else { log.Debugf("Error accepting connection %v", err) return err diff --git a/persistence/test.dbgi b/persistence/test.dbgi new file mode 100644 index 0000000000000000000000000000000000000000..040d2d0043705f54f315a0f018f4c338fc1d22c0 GIT binary patch literal 32768 zcmeI(Jx;?g6aZjTex^!vXJJNcOe~0lAfZZRf~Hbw8G4W&jysf*sv9=(lRyH&(5f9i zONpJtiLGyYeojhVIjZ&YwZH6@_Tc;d`Zyie#!0O|`{s6gb#^%)t>2=bng9U;1PBly zK!5-N0t5&U*dKv|I+fqz|A_Bn#)q{L@qb*O-YwoQR@XPtPfdUT0RjXF5FkK+009C7 z2<(qQ#P_|_tpt>FN&T1j{x%=e@nh3I=f;mE-YO5vTm5{Lm4wN#eHf4KUxrV!W;&TQ z`IbU}009C72oNAZfB*pk1PJU*AoBNv)U5=>K7iQgzq5%Mm;eC+1PBlyK!5-N0t5&U z*qA`X`;p22n*Tq~qx_Zs-`GTahyVcs1PBlyK!5-N0t5&U*qT7(`6K@y{{zInfUOPe z!vqKrAV7cs0RjXF5FkK+009C72oNAZfB*pk1PBlyK!5-N0t5&UAV7cs0RjXF5FkK+ Iz`q4P04I|f@Bjb+ literal 0 HcmV?d00001 diff --git a/service.go b/service.go index 22452b2..4534644 100644 --- a/service.go +++ b/service.go @@ -58,7 +58,7 @@ type connection struct { } // NewConnection creates a new Connection -func NewConnection(id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application, service Service) Connection { +func NewConnection(service Service, id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection { connection := new(connection) connection.hostname = hostname connection.conn = conn diff --git a/testing/tapir_malicious_remote_integration_test.go b/testing/tapir_malicious_remote_integration_test.go index 5e4b8cb..e80ab95 100644 --- a/testing/tapir_malicious_remote_integration_test.go +++ b/testing/tapir_malicious_remote_integration_test.go @@ -64,7 +64,7 @@ func TestTapirMaliciousRemote(t *testing.T) { log.Infof("closing ACN...") acn.Close() sg.Wait() - time.Sleep(time.Second * 2) + time.Sleep(time.Second * 5) // wait for goroutines to finish... log.Infof("Number of goroutines open at close: %d", runtime.NumGoroutine()) if numRoutinesStart != runtime.NumGoroutine() { t.Errorf("Potential goroutine leak: Num Start:%v NumEnd: %v", numRoutinesStart, runtime.NumGoroutine())