Add Broadcast #23
|
@ -80,6 +80,10 @@ func (MockConnection) IsClosed() bool {
|
|||
panic("implement me")
|
||||
}
|
||||
|
||||
func (MockConnection) Broadcast(message []byte, capability tapir.Capability) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func TestAuthApp_Failed(t *testing.T) {
|
||||
var authApp AuthApp
|
||||
ai := authApp.NewInstance()
|
||||
|
|
|
@ -106,7 +106,7 @@ func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool
|
|||
}
|
||||
|
||||
log.Debugf("Connected to %v [%v]", hostname, connectionID)
|
||||
s.connections.Store(connectionID, tapir.NewConnection(s.id, hostname, true, conn, app.NewInstance()))
|
||||
s.connections.Store(connectionID, tapir.NewConnection(s, s.id, hostname, true, conn, app.NewInstance()))
|
||||
return true, nil
|
||||
}
|
||||
log.Debugf("Error connecting to %v %v", hostname, err)
|
||||
|
@ -137,7 +137,7 @@ func (s *BaseOnionService) Listen(app tapir.Application) error {
|
|||
if err == nil {
|
||||
tempHostname := s.getNewConnectionID()
|
||||
log.Debugf("Accepted connection from %v", tempHostname)
|
||||
s.connections.Store(tempHostname, tapir.NewConnection(s.id, tempHostname, false, conn, app.NewInstance()))
|
||||
s.connections.Store(tempHostname, tapir.NewConnection(s, s.id, tempHostname, false, conn, app.NewInstance()))
|
||||
} else {
|
||||
log.Debugf("Error accepting connection %v", err)
|
||||
return err
|
||||
|
@ -159,3 +159,17 @@ func (s *BaseOnionService) Shutdown() {
|
|||
return true
|
||||
})
|
||||
}
|
||||
|
||||
// Broadcast sends a message to all connections who possess the given capability
|
||||
func (s *BaseOnionService) Broadcast(message []byte, capability tapir.Capability) error {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
s.connections.Range(func(key, value interface{}) bool {
|
||||
connection := value.(tapir.Connection)
|
||||
if connection.HasCapability(capability) {
|
||||
connection.Send(message)
|
||||
}
|
||||
return true
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
|
Binary file not shown.
13
service.go
13
service.go
|
@ -18,6 +18,7 @@ type Service interface {
|
|||
Connect(hostname string, application Application) (bool, error)
|
||||
Listen(application Application) error
|
||||
GetConnection(connectionID string) (Connection, error)
|
||||
Broadcast(message []byte, capability Capability) error
|
||||
WaitForCapabilityOrClose(connectionID string, capability Capability) (Connection, error)
|
||||
Shutdown()
|
||||
}
|
||||
|
@ -37,6 +38,7 @@ type Connection interface {
|
|||
App() Application
|
||||
SetApp(application Application)
|
||||
IsClosed() bool
|
||||
Broadcast(message []byte, capability Capability) error
|
||||
}
|
||||
|
||||
// Connection defines a Tapir Connection
|
||||
|
@ -52,17 +54,19 @@ type connection struct {
|
|||
closed bool
|
||||
MaxLength int
|
||||
lock sync.Mutex
|
||||
service Service
|
||||
}
|
||||
|
||||
// NewConnection creates a new Connection
|
||||
func NewConnection(id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection {
|
||||
func NewConnection(service Service, id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection {
|
||||
|
||||
connection := new(connection)
|
||||
connection.hostname = hostname
|
||||
connection.conn = conn
|
||||
connection.app = app
|
||||
connection.identity = id
|
||||
connection.outbound = outbound
|
||||
connection.MaxLength = 1024
|
||||
connection.MaxLength = 8192
|
||||
dan
commented
Could be a top of file const, or not, just a thought Could be a top of file const, or not, just a thought
sarah
commented
Will be addressing this #22 Will be addressing this https://git.openprivacy.ca/cwtch.im/tapir/issues/22
|
||||
connection.service = service
|
||||
go connection.app.Init(connection)
|
||||
return connection
|
||||
}
|
||||
|
@ -204,3 +208,8 @@ func (c *connection) Send(message []byte) {
|
|||
c.closed = true
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast sends a message to all active service connections with a given capability
|
||||
func (c *connection) Broadcast(message []byte, capability Capability) error {
|
||||
return c.service.Broadcast(message, capability)
|
||||
}
|
||||
|
|
|
@ -64,7 +64,7 @@ func TestTapirMaliciousRemote(t *testing.T) {
|
|||
log.Infof("closing ACN...")
|
||||
acn.Close()
|
||||
sg.Wait()
|
||||
time.Sleep(time.Second * 2)
|
||||
time.Sleep(time.Second * 5) // wait for goroutines to finish...
|
||||
log.Infof("Number of goroutines open at close: %d", runtime.NumGoroutine())
|
||||
if numRoutinesStart != runtime.NumGoroutine() {
|
||||
t.Errorf("Potential goroutine leak: Num Start:%v NumEnd: %v", numRoutinesStart, runtime.NumGoroutine())
|
||||
|
|
Loading…
Reference in New Issue
can we change the sig to
NewConnection(service Service, hostname string, outbound bool, conn io.ReadWriteCloser, app Application)
and either not need to store .identity or just pull it from service.id or service.getNewConnectionID() (we seem to use the former in connect and the latter in listen)
Fixed the signature, can't get rid of id yet (see: #20)