diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 8e66e68..0fdd219 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -24,6 +24,11 @@ import ( "golang.org/x/crypto/ed25519" ) +type connectionLockedService struct { + service tapir.Service + connectingLock sync.Mutex +} + type engine struct { queue event.Queue @@ -46,7 +51,7 @@ type engine struct { getValRequests sync.Map // [string]string eventID:Data // Nextgen Tapir Service - ephemeralServices map[string]tapir.Service //sync.Map // string(onion) => tapir.Service + ephemeralServices map[string]*connectionLockedService //sync.Map // string(onion) => tapir.Service ephemeralServicesLock sync.Mutex // Required for listen(), inaccessible from identity @@ -73,7 +78,7 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK engine := new(engine) engine.identity = identity engine.privateKey = privateKey - engine.ephemeralServices = make(map[string]tapir.Service) + engine.ephemeralServices = make(map[string]*connectionLockedService) engine.queue = event.NewQueue() go engine.eventHandler() @@ -283,7 +288,7 @@ func (e *engine) Shutdown() { defer e.ephemeralServicesLock.Unlock() for _, connection := range e.ephemeralServices { log.Infof("shutting down ephemeral service") - connection.Shutdown() + connection.service.Shutdown() } e.queue.Shutdown() } @@ -319,14 +324,15 @@ func (e *engine) peerWithOnion(onion string) { // needs to be run in a goroutine as will block on Open. func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) { e.ephemeralServicesLock.Lock() - defer e.ephemeralServicesLock.Unlock() - connection, exists := e.ephemeralServices[onion] - if exists { - if conn, err := connection.GetConnection(onion); err == nil { + connectionService, exists := e.ephemeralServices[onion] + + if exists && connectionService.service != nil { + if conn, err := connectionService.service.GetConnection(onion); err == nil { // We are already peered and synced so return... - // This will only not-trigger it lastKnownSignature has been wiped, which only happens when ResyncServer is called + // This will only not-trigger if lastKnownSignature has been wiped, which only happens when ResyncServer is called // in CwtchPeer. if !conn.IsClosed() && len(lastKnownSignature) != 0 { + e.ephemeralServicesLock.Unlock() return } // Otherwise...we are going to rebuild the connection(which will result in a bandwidth heavy resync)... @@ -335,6 +341,13 @@ func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, toke // Otherwise...let's reconnect } + connectionService = &connectionLockedService{} + e.ephemeralServices[onion] = connectionService + + connectionService.connectingLock.Lock() + defer connectionService.connectingLock.Unlock() + e.ephemeralServicesLock.Unlock() + log.Debugf("Peering with Token Server %v %v", onion, tokenServerOnion) e.ignoreOnShutdown(e.serverConnecting)(onion) // Create a new ephemeral service for this connection @@ -345,7 +358,9 @@ func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, toke Y := ristretto255.NewElement() Y.UnmarshalText([]byte(tokenServerY)) connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverAuthed, e.serverSynced, e.ignoreOnShutdown(e.serverDisconnected))) - e.ephemeralServices[onion] = ephemeralService + e.ephemeralServicesLock.Lock() + e.ephemeralServices[onion].service = ephemeralService + e.ephemeralServicesLock.Unlock() // If we are already connected...check if we are authed and issue an auth event // (This allows the ui to be stateless) if connected && err != nil { @@ -508,7 +523,7 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si return } - conn, err := ephemeralService.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability) + conn, err := ephemeralService.service.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability) if err == nil { tokenApp, ok := (conn.App()).(*TokenBoardClient) if ok { @@ -624,7 +639,7 @@ func (e *engine) handlePeerRetVal(hostname string, getValData, retValData []byte func (e *engine) leaveServer(server string) { ephemeralService, ok := e.ephemeralServices[server] if ok { - ephemeralService.Shutdown() + ephemeralService.service.Shutdown() delete(e.ephemeralServices, server) } }