Add Broadcast #23

Merged
dan merged 4 commits from broadcast into master 2020-07-14 22:05:54 +00:00
5 changed files with 32 additions and 5 deletions

View File

@ -80,6 +80,10 @@ func (MockConnection) IsClosed() bool {
panic("implement me")
}
func (MockConnection) Broadcast(message []byte, capability tapir.Capability) error {
panic("implement me")
}
func TestAuthApp_Failed(t *testing.T) {
var authApp AuthApp
ai := authApp.NewInstance()

View File

@ -106,7 +106,7 @@ func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool
}
log.Debugf("Connected to %v [%v]", hostname, connectionID)
s.connections.Store(connectionID, tapir.NewConnection(s.id, hostname, true, conn, app.NewInstance()))
s.connections.Store(connectionID, tapir.NewConnection(s, s.id, hostname, true, conn, app.NewInstance()))
return true, nil
}
log.Debugf("Error connecting to %v %v", hostname, err)
@ -137,7 +137,7 @@ func (s *BaseOnionService) Listen(app tapir.Application) error {
if err == nil {
tempHostname := s.getNewConnectionID()
log.Debugf("Accepted connection from %v", tempHostname)
s.connections.Store(tempHostname, tapir.NewConnection(s.id, tempHostname, false, conn, app.NewInstance()))
s.connections.Store(tempHostname, tapir.NewConnection(s, s.id, tempHostname, false, conn, app.NewInstance()))
} else {
log.Debugf("Error accepting connection %v", err)
return err
@ -159,3 +159,17 @@ func (s *BaseOnionService) Shutdown() {
return true
})
}
// Broadcast sends a message to all connections who possess the given capability
func (s *BaseOnionService) Broadcast(message []byte, capability tapir.Capability) error {
s.lock.Lock()
defer s.lock.Unlock()
s.connections.Range(func(key, value interface{}) bool {
connection := value.(tapir.Connection)
if connection.HasCapability(capability) {
connection.Send(message)
}
return true
})
return nil
}

BIN
persistence/test.dbgi Normal file

Binary file not shown.

View File

@ -18,6 +18,7 @@ type Service interface {
Connect(hostname string, application Application) (bool, error)
Listen(application Application) error
GetConnection(connectionID string) (Connection, error)
Broadcast(message []byte, capability Capability) error
WaitForCapabilityOrClose(connectionID string, capability Capability) (Connection, error)
Shutdown()
}
@ -37,6 +38,7 @@ type Connection interface {
App() Application
SetApp(application Application)
IsClosed() bool
Broadcast(message []byte, capability Capability) error
}
// Connection defines a Tapir Connection
@ -52,17 +54,19 @@ type connection struct {
closed bool
MaxLength int
lock sync.Mutex
service Service
}
// NewConnection creates a new Connection
func NewConnection(id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection {
func NewConnection(service Service, id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection {
Outdated
Review

can we change the sig to

NewConnection(service Service, hostname string, outbound bool, conn io.ReadWriteCloser, app Application)

and either not need to store .identity or just pull it from service.id or service.getNewConnectionID() (we seem to use the former in connect and the latter in listen)

can we change the sig to ```NewConnection(service Service, hostname string, outbound bool, conn io.ReadWriteCloser, app Application)``` and either not need to store .identity or just pull it from service.id or service.getNewConnectionID() (we seem to use the former in connect and the latter in listen)
Review

Fixed the signature, can't get rid of id yet (see: #20)

Fixed the signature, can't get rid of id yet (see: https://git.openprivacy.ca/cwtch.im/tapir/issues/20)
connection := new(connection)
connection.hostname = hostname
connection.conn = conn
connection.app = app
connection.identity = id
connection.outbound = outbound
connection.MaxLength = 1024
connection.MaxLength = 8192
Review

Could be a top of file const, or not, just a thought

Could be a top of file const, or not, just a thought
Review

Will be addressing this #22

Will be addressing this https://git.openprivacy.ca/cwtch.im/tapir/issues/22
connection.service = service
go connection.app.Init(connection)
return connection
}
@ -204,3 +208,8 @@ func (c *connection) Send(message []byte) {
c.closed = true
}
}
// Broadcast sends a message to all active service connections with a given capability
func (c *connection) Broadcast(message []byte, capability Capability) error {
return c.service.Broadcast(message, capability)
}

View File

@ -64,7 +64,7 @@ func TestTapirMaliciousRemote(t *testing.T) {
log.Infof("closing ACN...")
acn.Close()
sg.Wait()
time.Sleep(time.Second * 2)
time.Sleep(time.Second * 5) // wait for goroutines to finish...
log.Infof("Number of goroutines open at close: %d", runtime.NumGoroutine())
if numRoutinesStart != runtime.NumGoroutine() {
t.Errorf("Potential goroutine leak: Num Start:%v NumEnd: %v", numRoutinesStart, runtime.NumGoroutine())