Add Broadcast #23

Merged
dan merged 4 commits from broadcast into master 2020-07-14 22:05:54 +00:00
5 changed files with 32 additions and 5 deletions

View File

@ -80,6 +80,10 @@ func (MockConnection) IsClosed() bool {
panic("implement me") panic("implement me")
} }
func (MockConnection) Broadcast(message []byte, capability tapir.Capability) error {
panic("implement me")
}
func TestAuthApp_Failed(t *testing.T) { func TestAuthApp_Failed(t *testing.T) {
var authApp AuthApp var authApp AuthApp
ai := authApp.NewInstance() ai := authApp.NewInstance()

View File

@ -106,7 +106,7 @@ func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool
} }
log.Debugf("Connected to %v [%v]", hostname, connectionID) log.Debugf("Connected to %v [%v]", hostname, connectionID)
s.connections.Store(connectionID, tapir.NewConnection(s.id, hostname, true, conn, app.NewInstance())) s.connections.Store(connectionID, tapir.NewConnection(s, s.id, hostname, true, conn, app.NewInstance()))
return true, nil return true, nil
} }
log.Debugf("Error connecting to %v %v", hostname, err) log.Debugf("Error connecting to %v %v", hostname, err)
@ -137,7 +137,7 @@ func (s *BaseOnionService) Listen(app tapir.Application) error {
if err == nil { if err == nil {
tempHostname := s.getNewConnectionID() tempHostname := s.getNewConnectionID()
log.Debugf("Accepted connection from %v", tempHostname) log.Debugf("Accepted connection from %v", tempHostname)
s.connections.Store(tempHostname, tapir.NewConnection(s.id, tempHostname, false, conn, app.NewInstance())) s.connections.Store(tempHostname, tapir.NewConnection(s, s.id, tempHostname, false, conn, app.NewInstance()))
} else { } else {
log.Debugf("Error accepting connection %v", err) log.Debugf("Error accepting connection %v", err)
return err return err
@ -159,3 +159,17 @@ func (s *BaseOnionService) Shutdown() {
return true return true
}) })
} }
// Broadcast sends a message to all connections who possess the given capability
func (s *BaseOnionService) Broadcast(message []byte, capability tapir.Capability) error {
s.lock.Lock()
defer s.lock.Unlock()
s.connections.Range(func(key, value interface{}) bool {
connection := value.(tapir.Connection)
if connection.HasCapability(capability) {
connection.Send(message)
}
return true
})
return nil
}

BIN
persistence/test.dbgi Normal file

Binary file not shown.

View File

@ -18,6 +18,7 @@ type Service interface {
Connect(hostname string, application Application) (bool, error) Connect(hostname string, application Application) (bool, error)
Listen(application Application) error Listen(application Application) error
GetConnection(connectionID string) (Connection, error) GetConnection(connectionID string) (Connection, error)
Broadcast(message []byte, capability Capability) error
WaitForCapabilityOrClose(connectionID string, capability Capability) (Connection, error) WaitForCapabilityOrClose(connectionID string, capability Capability) (Connection, error)
Shutdown() Shutdown()
} }
@ -37,6 +38,7 @@ type Connection interface {
App() Application App() Application
SetApp(application Application) SetApp(application Application)
IsClosed() bool IsClosed() bool
Broadcast(message []byte, capability Capability) error
} }
// Connection defines a Tapir Connection // Connection defines a Tapir Connection
@ -52,17 +54,19 @@ type connection struct {
closed bool closed bool
MaxLength int MaxLength int
lock sync.Mutex lock sync.Mutex
service Service
} }
// NewConnection creates a new Connection // NewConnection creates a new Connection
func NewConnection(id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection { func NewConnection(service Service, id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection {
Outdated
Review

can we change the sig to

NewConnection(service Service, hostname string, outbound bool, conn io.ReadWriteCloser, app Application)

and either not need to store .identity or just pull it from service.id or service.getNewConnectionID() (we seem to use the former in connect and the latter in listen)

can we change the sig to ```NewConnection(service Service, hostname string, outbound bool, conn io.ReadWriteCloser, app Application)``` and either not need to store .identity or just pull it from service.id or service.getNewConnectionID() (we seem to use the former in connect and the latter in listen)
Review

Fixed the signature, can't get rid of id yet (see: #20)

Fixed the signature, can't get rid of id yet (see: https://git.openprivacy.ca/cwtch.im/tapir/issues/20)
connection := new(connection) connection := new(connection)
connection.hostname = hostname connection.hostname = hostname
connection.conn = conn connection.conn = conn
connection.app = app connection.app = app
connection.identity = id connection.identity = id
connection.outbound = outbound connection.outbound = outbound
connection.MaxLength = 1024 connection.MaxLength = 8192
Review

Could be a top of file const, or not, just a thought

Could be a top of file const, or not, just a thought
Review

Will be addressing this #22

Will be addressing this https://git.openprivacy.ca/cwtch.im/tapir/issues/22
connection.service = service
go connection.app.Init(connection) go connection.app.Init(connection)
return connection return connection
} }
@ -204,3 +208,8 @@ func (c *connection) Send(message []byte) {
c.closed = true c.closed = true
} }
} }
// Broadcast sends a message to all active service connections with a given capability
func (c *connection) Broadcast(message []byte, capability Capability) error {
return c.service.Broadcast(message, capability)
}

View File

@ -64,7 +64,7 @@ func TestTapirMaliciousRemote(t *testing.T) {
log.Infof("closing ACN...") log.Infof("closing ACN...")
acn.Close() acn.Close()
sg.Wait() sg.Wait()
time.Sleep(time.Second * 2) time.Sleep(time.Second * 5) // wait for goroutines to finish...
log.Infof("Number of goroutines open at close: %d", runtime.NumGoroutine()) log.Infof("Number of goroutines open at close: %d", runtime.NumGoroutine())
if numRoutinesStart != runtime.NumGoroutine() { if numRoutinesStart != runtime.NumGoroutine() {
t.Errorf("Potential goroutine leak: Num Start:%v NumEnd: %v", numRoutinesStart, runtime.NumGoroutine()) t.Errorf("Potential goroutine leak: Num Start:%v NumEnd: %v", numRoutinesStart, runtime.NumGoroutine())