Add Broacast
the build was successful Details

This commit is contained in:
Sarah Jamie Lewis 2020-07-14 14:23:27 -07:00
parent b6eddf79b1
commit 66d6b0b51e
3 changed files with 31 additions and 4 deletions

View File

@ -80,6 +80,10 @@ func (MockConnection) IsClosed() bool {
panic("implement me")
}
func (MockConnection) Broadcast(message []byte, capability tapir.Capability) error {
panic("implement me")
}
func TestAuthApp_Failed(t *testing.T) {
var authApp AuthApp
ai := authApp.NewInstance()

View File

@ -106,7 +106,7 @@ func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool
}
log.Debugf("Connected to %v [%v]", hostname, connectionID)
s.connections.Store(connectionID, tapir.NewConnection(s.id, hostname, true, conn, app.NewInstance()))
s.connections.Store(connectionID, tapir.NewConnection(s.id, hostname, true, conn, app.NewInstance(), s))
return true, nil
}
log.Debugf("Error connecting to %v %v", hostname, err)
@ -137,7 +137,7 @@ func (s *BaseOnionService) Listen(app tapir.Application) error {
if err == nil {
tempHostname := s.getNewConnectionID()
log.Debugf("Accepted connection from %v", tempHostname)
s.connections.Store(tempHostname, tapir.NewConnection(s.id, tempHostname, false, conn, app.NewInstance()))
s.connections.Store(tempHostname, tapir.NewConnection(s.id, tempHostname, false, conn, app.NewInstance(), s))
} else {
log.Debugf("Error accepting connection %v", err)
return err
@ -159,3 +159,17 @@ func (s *BaseOnionService) Shutdown() {
return true
})
}
// Broadcast sends a message to all connections who possess the given capability
func (s *BaseOnionService) Broadcast(message []byte, capability tapir.Capability) error {
s.lock.Lock()
defer s.lock.Unlock()
s.connections.Range(func(key, value interface{}) bool {
connection := value.(tapir.Connection)
if connection.HasCapability(capability) {
connection.Send(message)
}
return true
})
return nil
}

View File

@ -18,6 +18,7 @@ type Service interface {
Connect(hostname string, application Application) (bool, error)
Listen(application Application) error
GetConnection(connectionID string) (Connection, error)
Broadcast(message []byte, capability Capability) error
WaitForCapabilityOrClose(connectionID string, capability Capability) (Connection, error)
Shutdown()
}
@ -37,6 +38,7 @@ type Connection interface {
App() Application
SetApp(application Application)
IsClosed() bool
Broadcast(message []byte, capability Capability) error
}
// Connection defines a Tapir Connection
@ -52,17 +54,19 @@ type connection struct {
closed bool
MaxLength int
lock sync.Mutex
service Service
}
// NewConnection creates a new Connection
func NewConnection(id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application) Connection {
func NewConnection(id *primitives.Identity, hostname string, outbound bool, conn io.ReadWriteCloser, app Application, service Service) Connection {
connection := new(connection)
connection.hostname = hostname
connection.conn = conn
connection.app = app
connection.identity = id
connection.outbound = outbound
connection.MaxLength = 1024
connection.MaxLength = 8192
connection.service = service
go connection.app.Init(connection)
return connection
}
@ -204,3 +208,8 @@ func (c *connection) Send(message []byte) {
c.closed = true
}
}
// Broadcast sends a message to all active service connections with a given capability
func (c *connection) Broadcast(message []byte, capability Capability) error {
return c.service.Broadcast(message, capability)
}