Merge pull request 'Add Broadcast' (#23) from broadcast into master
the build was successful Details

Reviewed-on: #23
This commit is contained in:
Dan Ballard 2020-07-14 15:05:53 -07:00
commit 28b2f8212f
5 changed files with 32 additions and 5 deletions

View File

@ -80,6 +80,10 @@ func (MockConnection) IsClosed() bool {
panic("implement me") panic("implement me")
} }
func (MockConnection) Broadcast(message []byte, capability tapir.Capability) error {
panic("implement me")
}
func TestAuthApp_Failed(t *testing.T) { func TestAuthApp_Failed(t *testing.T) {
var authApp AuthApp var authApp AuthApp
ai := authApp.NewInstance() ai := authApp.NewInstance()

View File

@ -106,7 +106,7 @@ func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool
} }
log.Debugf("Connected to %v [%v]", hostname, connectionID) log.Debugf("Connected to %v [%v]", hostname, connectionID)
s.connections.Store(connectionID, tapir.NewConnection(s.id, hostname, true, conn, app.NewInstance())) s.connections.Store(connectionID, tapir.NewConnection(s, s.id, hostname, true, conn, app.NewInstance()))
return true, nil return true, nil
} }
log.Debugf("Error connecting to %v %v", hostname, err) log.Debugf("Error connecting to %v %v", hostname, err)
@ -137,7 +137,7 @@ func (s *BaseOnionService) Listen(app tapir.Application) error {
if err == nil { if err == nil {
tempHostname := s.getNewConnectionID() tempHostname := s.getNewConnectionID()
log.Debugf("Accepted connection from %v", tempHostname) log.Debugf("Accepted connection from %v", tempHostname)
s.connections.Store(tempHostname, tapir.NewConnection(s.id, tempHostname, false, conn, app.NewInstance())) s.connections.Store(tempHostname, tapir.NewConnection(s, s.id, tempHostname, false, conn, app.NewInstance()))
} else { } else {
log.Debugf("Error accepting connection %v", err) log.Debugf("Error accepting connection %v", err)
return err return err
@ -159,3 +159,17 @@ func (s *BaseOnionService) Shutdown() {
return true return true
}) })
} }
// Broadcast sends a message to all connections who possess the given capability
func (s *BaseOnionService) Broadcast(message []byte, capability tapir.Capability) error {
s.lock.Lock()
defer s.lock.Unlock()
s.connections.Range(func(key, value interface{}) bool {
connection := value.(tapir.Connection)
if connection.HasCapability(capability) {
connection.Send(message)
}
return true
})
return nil
}

BIN
persistence/test.dbgi Normal file

Binary file not shown.

View File

@ -18,6 +18,7 @@ type Service interface {
Connect(hostname string, application Application) (bool, error) Connect(hostname string, application Application) (bool, error)
Listen(application Application) error Listen(application Application) error
GetConnection(connectionID string) (Connection, error) GetConnection(connectionID string) (Connection, error)
Broadcast(message []byte, capability Capability) error
WaitForCapabilityOrClose(connectionID string, capability Capability) (Connection, error) WaitForCapabilityOrClose(connectionID string, capability Capability) (Connection, error)
Shutdown() Shutdown()
} }
@ -37,6 +38,7 @@ type Connection interface {
App() Application App() Application
SetApp(application Application) SetApp(application Application)
IsClosed() bool IsClosed() bool
Broadcast(message []byte, capability Capability) error
} }
// Connection defines a Tapir Connection // Connection defines a Tapir Connection
@ -52,17 +54,19 @@ type connection struct {
closed bool closed bool
MaxLength int MaxLength int
lock sync.Mutex lock sync.Mutex
service Service
} }
// NewConnection creates a new Connection // NewConnection creates a new Connection
func NewConnection(id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection { func NewConnection(service Service, id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection {
connection := new(connection) connection := new(connection)
connection.hostname = hostname connection.hostname = hostname
connection.conn = conn connection.conn = conn
connection.app = app connection.app = app
connection.identity = id connection.identity = id
connection.outbound = outbound connection.outbound = outbound
connection.MaxLength = 1024 connection.MaxLength = 8192
connection.service = service
go connection.app.Init(connection) go connection.app.Init(connection)
return connection return connection
} }
@ -204,3 +208,8 @@ func (c *connection) Send(message []byte) {
c.closed = true c.closed = true
} }
} }
// Broadcast sends a message to all active service connections with a given capability
func (c *connection) Broadcast(message []byte, capability Capability) error {
return c.service.Broadcast(message, capability)
}

View File

@ -64,7 +64,7 @@ func TestTapirMaliciousRemote(t *testing.T) {
log.Infof("closing ACN...") log.Infof("closing ACN...")
acn.Close() acn.Close()
sg.Wait() sg.Wait()
time.Sleep(time.Second * 2) time.Sleep(time.Second * 5) // wait for goroutines to finish...
log.Infof("Number of goroutines open at close: %d", runtime.NumGoroutine()) log.Infof("Number of goroutines open at close: %d", runtime.NumGoroutine())
if numRoutinesStart != runtime.NumGoroutine() { if numRoutinesStart != runtime.NumGoroutine() {
t.Errorf("Potential goroutine leak: Num Start:%v NumEnd: %v", numRoutinesStart, runtime.NumGoroutine()) t.Errorf("Potential goroutine leak: Num Start:%v NumEnd: %v", numRoutinesStart, runtime.NumGoroutine())