diff --git a/applications/auth_test.go b/applications/auth_test.go index bd5129a..ef5f08f 100644 --- a/applications/auth_test.go +++ b/applications/auth_test.go @@ -80,6 +80,10 @@ func (MockConnection) IsClosed() bool { panic("implement me") } +func (MockConnection) Broadcast(message []byte, capability tapir.Capability) error { + panic("implement me") +} + func TestAuthApp_Failed(t *testing.T) { var authApp AuthApp ai := authApp.NewInstance() diff --git a/networks/tor/BaseOnionService.go b/networks/tor/BaseOnionService.go index f9e0080..fe9c2e5 100644 --- a/networks/tor/BaseOnionService.go +++ b/networks/tor/BaseOnionService.go @@ -106,7 +106,7 @@ func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool } log.Debugf("Connected to %v [%v]", hostname, connectionID) - s.connections.Store(connectionID, tapir.NewConnection(s.id, hostname, true, conn, app.NewInstance())) + s.connections.Store(connectionID, tapir.NewConnection(s, s.id, hostname, true, conn, app.NewInstance())) return true, nil } log.Debugf("Error connecting to %v %v", hostname, err) @@ -137,7 +137,7 @@ func (s *BaseOnionService) Listen(app tapir.Application) error { if err == nil { tempHostname := s.getNewConnectionID() log.Debugf("Accepted connection from %v", tempHostname) - s.connections.Store(tempHostname, tapir.NewConnection(s.id, tempHostname, false, conn, app.NewInstance())) + s.connections.Store(tempHostname, tapir.NewConnection(s, s.id, tempHostname, false, conn, app.NewInstance())) } else { log.Debugf("Error accepting connection %v", err) return err @@ -159,3 +159,17 @@ func (s *BaseOnionService) Shutdown() { return true }) } + +// Broadcast sends a message to all connections who possess the given capability +func (s *BaseOnionService) Broadcast(message []byte, capability tapir.Capability) error { + s.lock.Lock() + defer s.lock.Unlock() + s.connections.Range(func(key, value interface{}) bool { + connection := value.(tapir.Connection) + if connection.HasCapability(capability) { + connection.Send(message) + } + return true + }) + return nil +} diff --git a/persistence/test.dbgi b/persistence/test.dbgi new file mode 100644 index 0000000..040d2d0 Binary files /dev/null and b/persistence/test.dbgi differ diff --git a/service.go b/service.go index 80da505..4534644 100644 --- a/service.go +++ b/service.go @@ -18,6 +18,7 @@ type Service interface { Connect(hostname string, application Application) (bool, error) Listen(application Application) error GetConnection(connectionID string) (Connection, error) + Broadcast(message []byte, capability Capability) error WaitForCapabilityOrClose(connectionID string, capability Capability) (Connection, error) Shutdown() } @@ -37,6 +38,7 @@ type Connection interface { App() Application SetApp(application Application) IsClosed() bool + Broadcast(message []byte, capability Capability) error } // Connection defines a Tapir Connection @@ -52,17 +54,19 @@ type connection struct { closed bool MaxLength int lock sync.Mutex + service Service } // NewConnection creates a new Connection -func NewConnection(id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection { +func NewConnection(service Service, id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection { connection := new(connection) connection.hostname = hostname connection.conn = conn connection.app = app connection.identity = id connection.outbound = outbound - connection.MaxLength = 1024 + connection.MaxLength = 8192 + connection.service = service go connection.app.Init(connection) return connection } @@ -204,3 +208,8 @@ func (c *connection) Send(message []byte) { c.closed = true } } + +// Broadcast sends a message to all active service connections with a given capability +func (c *connection) Broadcast(message []byte, capability Capability) error { + return c.service.Broadcast(message, capability) +} diff --git a/testing/tapir_malicious_remote_integration_test.go b/testing/tapir_malicious_remote_integration_test.go index 5e4b8cb..e80ab95 100644 --- a/testing/tapir_malicious_remote_integration_test.go +++ b/testing/tapir_malicious_remote_integration_test.go @@ -64,7 +64,7 @@ func TestTapirMaliciousRemote(t *testing.T) { log.Infof("closing ACN...") acn.Close() sg.Wait() - time.Sleep(time.Second * 2) + time.Sleep(time.Second * 5) // wait for goroutines to finish... log.Infof("Number of goroutines open at close: %d", runtime.NumGoroutine()) if numRoutinesStart != runtime.NumGoroutine() { t.Errorf("Potential goroutine leak: Num Start:%v NumEnd: %v", numRoutinesStart, runtime.NumGoroutine())