engine: add more granular locking around ephemeral token services

This commit is contained in:
Dan Ballard 2021-12-17 20:27:56 -05:00
parent 1d220381eb
commit ff012313be
1 changed files with 26 additions and 11 deletions

View File

@ -24,6 +24,11 @@ import (
"golang.org/x/crypto/ed25519" "golang.org/x/crypto/ed25519"
) )
type connectionLockedService struct {
service tapir.Service
connectingLock sync.Mutex
}
type engine struct { type engine struct {
queue event.Queue queue event.Queue
@ -46,7 +51,7 @@ type engine struct {
getValRequests sync.Map // [string]string eventID:Data getValRequests sync.Map // [string]string eventID:Data
// Nextgen Tapir Service // Nextgen Tapir Service
ephemeralServices map[string]tapir.Service //sync.Map // string(onion) => tapir.Service ephemeralServices map[string]*connectionLockedService //sync.Map // string(onion) => tapir.Service
ephemeralServicesLock sync.Mutex ephemeralServicesLock sync.Mutex
// Required for listen(), inaccessible from identity // Required for listen(), inaccessible from identity
@ -73,7 +78,7 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
engine := new(engine) engine := new(engine)
engine.identity = identity engine.identity = identity
engine.privateKey = privateKey engine.privateKey = privateKey
engine.ephemeralServices = make(map[string]tapir.Service) engine.ephemeralServices = make(map[string]*connectionLockedService)
engine.queue = event.NewQueue() engine.queue = event.NewQueue()
go engine.eventHandler() go engine.eventHandler()
@ -283,7 +288,7 @@ func (e *engine) Shutdown() {
defer e.ephemeralServicesLock.Unlock() defer e.ephemeralServicesLock.Unlock()
for _, connection := range e.ephemeralServices { for _, connection := range e.ephemeralServices {
log.Infof("shutting down ephemeral service") log.Infof("shutting down ephemeral service")
connection.Shutdown() connection.service.Shutdown()
} }
e.queue.Shutdown() e.queue.Shutdown()
} }
@ -319,14 +324,15 @@ func (e *engine) peerWithOnion(onion string) {
// needs to be run in a goroutine as will block on Open. // needs to be run in a goroutine as will block on Open.
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) { func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) {
e.ephemeralServicesLock.Lock() e.ephemeralServicesLock.Lock()
defer e.ephemeralServicesLock.Unlock() connectionService, exists := e.ephemeralServices[onion]
connection, exists := e.ephemeralServices[onion]
if exists { if exists && connectionService.service != nil {
if conn, err := connection.GetConnection(onion); err == nil { if conn, err := connectionService.service.GetConnection(onion); err == nil {
// We are already peered and synced so return... // We are already peered and synced so return...
// This will only not-trigger it lastKnownSignature has been wiped, which only happens when ResyncServer is called // This will only not-trigger if lastKnownSignature has been wiped, which only happens when ResyncServer is called
// in CwtchPeer. // in CwtchPeer.
if !conn.IsClosed() && len(lastKnownSignature) != 0 { if !conn.IsClosed() && len(lastKnownSignature) != 0 {
e.ephemeralServicesLock.Unlock()
return return
} }
// Otherwise...we are going to rebuild the connection(which will result in a bandwidth heavy resync)... // Otherwise...we are going to rebuild the connection(which will result in a bandwidth heavy resync)...
@ -335,6 +341,13 @@ func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, toke
// Otherwise...let's reconnect // Otherwise...let's reconnect
} }
connectionService = &connectionLockedService{}
e.ephemeralServices[onion] = connectionService
connectionService.connectingLock.Lock()
defer connectionService.connectingLock.Unlock()
e.ephemeralServicesLock.Unlock()
log.Debugf("Peering with Token Server %v %v", onion, tokenServerOnion) log.Debugf("Peering with Token Server %v %v", onion, tokenServerOnion)
e.ignoreOnShutdown(e.serverConnecting)(onion) e.ignoreOnShutdown(e.serverConnecting)(onion)
// Create a new ephemeral service for this connection // Create a new ephemeral service for this connection
@ -345,7 +358,9 @@ func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, toke
Y := ristretto255.NewElement() Y := ristretto255.NewElement()
Y.UnmarshalText([]byte(tokenServerY)) Y.UnmarshalText([]byte(tokenServerY))
connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverAuthed, e.serverSynced, e.ignoreOnShutdown(e.serverDisconnected))) connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverAuthed, e.serverSynced, e.ignoreOnShutdown(e.serverDisconnected)))
e.ephemeralServices[onion] = ephemeralService e.ephemeralServicesLock.Lock()
e.ephemeralServices[onion].service = ephemeralService
e.ephemeralServicesLock.Unlock()
// If we are already connected...check if we are authed and issue an auth event // If we are already connected...check if we are authed and issue an auth event
// (This allows the ui to be stateless) // (This allows the ui to be stateless)
if connected && err != nil { if connected && err != nil {
@ -508,7 +523,7 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
return return
} }
conn, err := ephemeralService.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability) conn, err := ephemeralService.service.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability)
if err == nil { if err == nil {
tokenApp, ok := (conn.App()).(*TokenBoardClient) tokenApp, ok := (conn.App()).(*TokenBoardClient)
if ok { if ok {
@ -624,7 +639,7 @@ func (e *engine) handlePeerRetVal(hostname string, getValData, retValData []byte
func (e *engine) leaveServer(server string) { func (e *engine) leaveServer(server string) {
ephemeralService, ok := e.ephemeralServices[server] ephemeralService, ok := e.ephemeralServices[server]
if ok { if ok {
ephemeralService.Shutdown() ephemeralService.service.Shutdown()
delete(e.ephemeralServices, server) delete(e.ephemeralServices, server)
} }
} }