Add Broadcast #23
|
@ -80,6 +80,10 @@ func (MockConnection) IsClosed() bool {
|
||||||
panic("implement me")
|
panic("implement me")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (MockConnection) Broadcast(message []byte, capability tapir.Capability) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
func TestAuthApp_Failed(t *testing.T) {
|
func TestAuthApp_Failed(t *testing.T) {
|
||||||
var authApp AuthApp
|
var authApp AuthApp
|
||||||
ai := authApp.NewInstance()
|
ai := authApp.NewInstance()
|
||||||
|
|
|
@ -106,7 +106,7 @@ func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("Connected to %v [%v]", hostname, connectionID)
|
log.Debugf("Connected to %v [%v]", hostname, connectionID)
|
||||||
s.connections.Store(connectionID, tapir.NewConnection(s.id, hostname, true, conn, app.NewInstance()))
|
s.connections.Store(connectionID, tapir.NewConnection(s, s.id, hostname, true, conn, app.NewInstance()))
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
log.Debugf("Error connecting to %v %v", hostname, err)
|
log.Debugf("Error connecting to %v %v", hostname, err)
|
||||||
|
@ -137,7 +137,7 @@ func (s *BaseOnionService) Listen(app tapir.Application) error {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
tempHostname := s.getNewConnectionID()
|
tempHostname := s.getNewConnectionID()
|
||||||
log.Debugf("Accepted connection from %v", tempHostname)
|
log.Debugf("Accepted connection from %v", tempHostname)
|
||||||
s.connections.Store(tempHostname, tapir.NewConnection(s.id, tempHostname, false, conn, app.NewInstance()))
|
s.connections.Store(tempHostname, tapir.NewConnection(s, s.id, tempHostname, false, conn, app.NewInstance()))
|
||||||
} else {
|
} else {
|
||||||
log.Debugf("Error accepting connection %v", err)
|
log.Debugf("Error accepting connection %v", err)
|
||||||
return err
|
return err
|
||||||
|
@ -159,3 +159,17 @@ func (s *BaseOnionService) Shutdown() {
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Broadcast sends a message to all connections who possess the given capability
|
||||||
|
func (s *BaseOnionService) Broadcast(message []byte, capability tapir.Capability) error {
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
s.connections.Range(func(key, value interface{}) bool {
|
||||||
|
connection := value.(tapir.Connection)
|
||||||
|
if connection.HasCapability(capability) {
|
||||||
|
connection.Send(message)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
Binary file not shown.
13
service.go
13
service.go
|
@ -18,6 +18,7 @@ type Service interface {
|
||||||
Connect(hostname string, application Application) (bool, error)
|
Connect(hostname string, application Application) (bool, error)
|
||||||
Listen(application Application) error
|
Listen(application Application) error
|
||||||
GetConnection(connectionID string) (Connection, error)
|
GetConnection(connectionID string) (Connection, error)
|
||||||
|
Broadcast(message []byte, capability Capability) error
|
||||||
WaitForCapabilityOrClose(connectionID string, capability Capability) (Connection, error)
|
WaitForCapabilityOrClose(connectionID string, capability Capability) (Connection, error)
|
||||||
Shutdown()
|
Shutdown()
|
||||||
}
|
}
|
||||||
|
@ -37,6 +38,7 @@ type Connection interface {
|
||||||
App() Application
|
App() Application
|
||||||
SetApp(application Application)
|
SetApp(application Application)
|
||||||
IsClosed() bool
|
IsClosed() bool
|
||||||
|
Broadcast(message []byte, capability Capability) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connection defines a Tapir Connection
|
// Connection defines a Tapir Connection
|
||||||
|
@ -52,17 +54,19 @@ type connection struct {
|
||||||
closed bool
|
closed bool
|
||||||
MaxLength int
|
MaxLength int
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
|
service Service
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConnection creates a new Connection
|
// NewConnection creates a new Connection
|
||||||
func NewConnection(id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection {
|
func NewConnection(service Service, id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection {
|
||||||
|
|||||||
connection := new(connection)
|
connection := new(connection)
|
||||||
connection.hostname = hostname
|
connection.hostname = hostname
|
||||||
connection.conn = conn
|
connection.conn = conn
|
||||||
connection.app = app
|
connection.app = app
|
||||||
connection.identity = id
|
connection.identity = id
|
||||||
connection.outbound = outbound
|
connection.outbound = outbound
|
||||||
connection.MaxLength = 1024
|
connection.MaxLength = 8192
|
||||||
dan
commented
Could be a top of file const, or not, just a thought Could be a top of file const, or not, just a thought
sarah
commented
Will be addressing this #22 Will be addressing this https://git.openprivacy.ca/cwtch.im/tapir/issues/22
|
|||||||
|
connection.service = service
|
||||||
go connection.app.Init(connection)
|
go connection.app.Init(connection)
|
||||||
return connection
|
return connection
|
||||||
}
|
}
|
||||||
|
@ -204,3 +208,8 @@ func (c *connection) Send(message []byte) {
|
||||||
c.closed = true
|
c.closed = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Broadcast sends a message to all active service connections with a given capability
|
||||||
|
func (c *connection) Broadcast(message []byte, capability Capability) error {
|
||||||
|
return c.service.Broadcast(message, capability)
|
||||||
|
}
|
||||||
|
|
|
@ -64,7 +64,7 @@ func TestTapirMaliciousRemote(t *testing.T) {
|
||||||
log.Infof("closing ACN...")
|
log.Infof("closing ACN...")
|
||||||
acn.Close()
|
acn.Close()
|
||||||
sg.Wait()
|
sg.Wait()
|
||||||
time.Sleep(time.Second * 2)
|
time.Sleep(time.Second * 5) // wait for goroutines to finish...
|
||||||
log.Infof("Number of goroutines open at close: %d", runtime.NumGoroutine())
|
log.Infof("Number of goroutines open at close: %d", runtime.NumGoroutine())
|
||||||
if numRoutinesStart != runtime.NumGoroutine() {
|
if numRoutinesStart != runtime.NumGoroutine() {
|
||||||
t.Errorf("Potential goroutine leak: Num Start:%v NumEnd: %v", numRoutinesStart, runtime.NumGoroutine())
|
t.Errorf("Potential goroutine leak: Num Start:%v NumEnd: %v", numRoutinesStart, runtime.NumGoroutine())
|
||||||
|
|
Loading…
Reference in New Issue
Fixed the signature, can't get rid of id yet (see: #20)