Add Broadcast #23
|
@ -80,6 +80,10 @@ func (MockConnection) IsClosed() bool {
|
||||||
panic("implement me")
|
panic("implement me")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (MockConnection) Broadcast(message []byte, capability tapir.Capability) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
func TestAuthApp_Failed(t *testing.T) {
|
func TestAuthApp_Failed(t *testing.T) {
|
||||||
var authApp AuthApp
|
var authApp AuthApp
|
||||||
ai := authApp.NewInstance()
|
ai := authApp.NewInstance()
|
||||||
|
|
|
@ -106,7 +106,7 @@ func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("Connected to %v [%v]", hostname, connectionID)
|
log.Debugf("Connected to %v [%v]", hostname, connectionID)
|
||||||
s.connections.Store(connectionID, tapir.NewConnection(s.id, hostname, true, conn, app.NewInstance()))
|
s.connections.Store(connectionID, tapir.NewConnection(s, s.id, hostname, true, conn, app.NewInstance()))
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
log.Debugf("Error connecting to %v %v", hostname, err)
|
log.Debugf("Error connecting to %v %v", hostname, err)
|
||||||
|
@ -137,7 +137,7 @@ func (s *BaseOnionService) Listen(app tapir.Application) error {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
tempHostname := s.getNewConnectionID()
|
tempHostname := s.getNewConnectionID()
|
||||||
log.Debugf("Accepted connection from %v", tempHostname)
|
log.Debugf("Accepted connection from %v", tempHostname)
|
||||||
s.connections.Store(tempHostname, tapir.NewConnection(s.id, tempHostname, false, conn, app.NewInstance()))
|
s.connections.Store(tempHostname, tapir.NewConnection(s, s.id, tempHostname, false, conn, app.NewInstance()))
|
||||||
} else {
|
} else {
|
||||||
log.Debugf("Error accepting connection %v", err)
|
log.Debugf("Error accepting connection %v", err)
|
||||||
return err
|
return err
|
||||||
|
@ -159,3 +159,17 @@ func (s *BaseOnionService) Shutdown() {
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Broadcast sends a message to all connections who possess the given capability
|
||||||
|
func (s *BaseOnionService) Broadcast(message []byte, capability tapir.Capability) error {
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
s.connections.Range(func(key, value interface{}) bool {
|
||||||
|
connection := value.(tapir.Connection)
|
||||||
|
if connection.HasCapability(capability) {
|
||||||
|
connection.Send(message)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
Binary file not shown.
13
service.go
13
service.go
|
@ -18,6 +18,7 @@ type Service interface {
|
||||||
Connect(hostname string, application Application) (bool, error)
|
Connect(hostname string, application Application) (bool, error)
|
||||||
Listen(application Application) error
|
Listen(application Application) error
|
||||||
GetConnection(connectionID string) (Connection, error)
|
GetConnection(connectionID string) (Connection, error)
|
||||||
|
Broadcast(message []byte, capability Capability) error
|
||||||
WaitForCapabilityOrClose(connectionID string, capability Capability) (Connection, error)
|
WaitForCapabilityOrClose(connectionID string, capability Capability) (Connection, error)
|
||||||
Shutdown()
|
Shutdown()
|
||||||
}
|
}
|
||||||
|
@ -37,6 +38,7 @@ type Connection interface {
|
||||||
App() Application
|
App() Application
|
||||||
SetApp(application Application)
|
SetApp(application Application)
|
||||||
IsClosed() bool
|
IsClosed() bool
|
||||||
|
Broadcast(message []byte, capability Capability) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connection defines a Tapir Connection
|
// Connection defines a Tapir Connection
|
||||||
|
@ -52,17 +54,19 @@ type connection struct {
|
||||||
closed bool
|
closed bool
|
||||||
MaxLength int
|
MaxLength int
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
|
service Service
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConnection creates a new Connection
|
// NewConnection creates a new Connection
|
||||||
func NewConnection(id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection {
|
func NewConnection(service Service, id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection {
|
||||||
|
|||||||
connection := new(connection)
|
connection := new(connection)
|
||||||
connection.hostname = hostname
|
connection.hostname = hostname
|
||||||
connection.conn = conn
|
connection.conn = conn
|
||||||
connection.app = app
|
connection.app = app
|
||||||
connection.identity = id
|
connection.identity = id
|
||||||
connection.outbound = outbound
|
connection.outbound = outbound
|
||||||
connection.MaxLength = 1024
|
connection.MaxLength = 8192
|
||||||
dan
commented
Could be a top of file const, or not, just a thought Could be a top of file const, or not, just a thought
sarah
commented
Will be addressing this #22 Will be addressing this https://git.openprivacy.ca/cwtch.im/tapir/issues/22
|
|||||||
|
connection.service = service
|
||||||
go connection.app.Init(connection)
|
go connection.app.Init(connection)
|
||||||
return connection
|
return connection
|
||||||
}
|
}
|
||||||
|
@ -204,3 +208,8 @@ func (c *connection) Send(message []byte) {
|
||||||
c.closed = true
|
c.closed = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Broadcast sends a message to all active service connections with a given capability
|
||||||
|
func (c *connection) Broadcast(message []byte, capability Capability) error {
|
||||||
|
return c.service.Broadcast(message, capability)
|
||||||
|
}
|
||||||
|
|
|
@ -64,7 +64,7 @@ func TestTapirMaliciousRemote(t *testing.T) {
|
||||||
log.Infof("closing ACN...")
|
log.Infof("closing ACN...")
|
||||||
acn.Close()
|
acn.Close()
|
||||||
sg.Wait()
|
sg.Wait()
|
||||||
time.Sleep(time.Second * 2)
|
time.Sleep(time.Second * 5) // wait for goroutines to finish...
|
||||||
log.Infof("Number of goroutines open at close: %d", runtime.NumGoroutine())
|
log.Infof("Number of goroutines open at close: %d", runtime.NumGoroutine())
|
||||||
if numRoutinesStart != runtime.NumGoroutine() {
|
if numRoutinesStart != runtime.NumGoroutine() {
|
||||||
t.Errorf("Potential goroutine leak: Num Start:%v NumEnd: %v", numRoutinesStart, runtime.NumGoroutine())
|
t.Errorf("Potential goroutine leak: Num Start:%v NumEnd: %v", numRoutinesStart, runtime.NumGoroutine())
|
||||||
|
|
Loading…
Reference in New Issue
can we change the sig to
NewConnection(service Service, hostname string, outbound bool, conn io.ReadWriteCloser, app Application)
and either not need to store .identity or just pull it from service.id or service.getNewConnectionID() (we seem to use the former in connect and the latter in listen)
Fixed the signature, can't get rid of id yet (see: #20)