fix race condition in engine
This commit is contained in:
parent
9c65ad4af3
commit
cf036bdee4
|
@ -7,6 +7,7 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"cwtch.im/cwtch/event"
|
"cwtch.im/cwtch/event"
|
||||||
|
@ -63,7 +64,7 @@ type engine struct {
|
||||||
|
|
||||||
tokenManagers sync.Map // [tokenService][]TokenManager
|
tokenManagers sync.Map // [tokenService][]TokenManager
|
||||||
|
|
||||||
shuttingDown bool
|
shuttingDown atomic.Bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
|
// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
|
||||||
|
@ -139,7 +140,7 @@ func (e *engine) eventHandler() {
|
||||||
for {
|
for {
|
||||||
ev := e.queue.Next()
|
ev := e.queue.Next()
|
||||||
// optimistic shutdown...
|
// optimistic shutdown...
|
||||||
if e.shuttingDown {
|
if e.shuttingDown.Load() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
switch ev.EventType {
|
switch ev.EventType {
|
||||||
|
@ -297,7 +298,7 @@ func (e *engine) createPeerTemplate() *PeerApp {
|
||||||
// Listen sets up an onion listener to process incoming cwtch messages
|
// Listen sets up an onion listener to process incoming cwtch messages
|
||||||
func (e *engine) listenFn() {
|
func (e *engine) listenFn() {
|
||||||
err := e.service.Listen(e.createPeerTemplate())
|
err := e.service.Listen(e.createPeerTemplate())
|
||||||
if !e.shuttingDown {
|
if !e.shuttingDown.Load() {
|
||||||
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname(), event.Error: err.Error()}))
|
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname(), event.Error: err.Error()}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -307,7 +308,8 @@ func (e *engine) Shutdown() {
|
||||||
// don't accept any more events...
|
// don't accept any more events...
|
||||||
e.queue.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{}))
|
e.queue.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{}))
|
||||||
e.service.Shutdown()
|
e.service.Shutdown()
|
||||||
e.shuttingDown = true
|
|
||||||
|
e.shuttingDown.Store(true)
|
||||||
|
|
||||||
e.ephemeralServicesLock.Lock()
|
e.ephemeralServicesLock.Lock()
|
||||||
defer e.ephemeralServicesLock.Unlock()
|
defer e.ephemeralServicesLock.Unlock()
|
||||||
|
@ -368,7 +370,7 @@ func (e *engine) makeAntispamPayment(onion string) {
|
||||||
tokenManager := tokenManagerPointer.(*TokenManager)
|
tokenManager := tokenManagerPointer.(*TokenManager)
|
||||||
log.Debugf("checking antispam tokens %v", tokenManager.NumTokens())
|
log.Debugf("checking antispam tokens %v", tokenManager.NumTokens())
|
||||||
if tokenManager.NumTokens() < 5 {
|
if tokenManager.NumTokens() < 5 {
|
||||||
go tokenApp.MakePayment()
|
go tokenApp.PurchaseTokens()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -431,7 +433,7 @@ func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, toke
|
||||||
|
|
||||||
func (e *engine) ignoreOnShutdown(f func(string)) func(string) {
|
func (e *engine) ignoreOnShutdown(f func(string)) func(string) {
|
||||||
return func(x string) {
|
return func(x string) {
|
||||||
if !e.shuttingDown {
|
if !e.shuttingDown.Load() {
|
||||||
f(x)
|
f(x)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -439,7 +441,7 @@ func (e *engine) ignoreOnShutdown(f func(string)) func(string) {
|
||||||
|
|
||||||
func (e *engine) ignoreOnShutdown2(f func(string, string)) func(string, string) {
|
func (e *engine) ignoreOnShutdown2(f func(string, string)) func(string, string) {
|
||||||
return func(x, y string) {
|
return func(x, y string) {
|
||||||
if !e.shuttingDown {
|
if !e.shuttingDown.Load() {
|
||||||
f(x, y)
|
f(x, y)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -615,7 +617,7 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
|
||||||
if ok {
|
if ok {
|
||||||
if spent, numtokens := tokenApp.Post(ct, sig); !spent {
|
if spent, numtokens := tokenApp.Post(ct, sig); !spent {
|
||||||
// we failed to post, probably because we ran out of tokens... so make a payment
|
// we failed to post, probably because we ran out of tokens... so make a payment
|
||||||
go tokenApp.MakePayment()
|
go tokenApp.PurchaseTokens()
|
||||||
// backoff
|
// backoff
|
||||||
time.Sleep(time.Second * 5)
|
time.Sleep(time.Second * 5)
|
||||||
// try again
|
// try again
|
||||||
|
@ -623,7 +625,7 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
|
||||||
e.sendMessageToGroup(groupID, server, ct, sig, attempts+1)
|
e.sendMessageToGroup(groupID, server, ct, sig, attempts+1)
|
||||||
} else {
|
} else {
|
||||||
if numtokens < 5 {
|
if numtokens < 5 {
|
||||||
go tokenApp.MakePayment()
|
go tokenApp.PurchaseTokens()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// regardless we return....
|
// regardless we return....
|
||||||
|
|
|
@ -2,18 +2,13 @@ package connections
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"cwtch.im/cwtch/protocol/groups"
|
"cwtch.im/cwtch/protocol/groups"
|
||||||
"cwtch.im/cwtch/utils"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"git.openprivacy.ca/cwtch.im/tapir"
|
"git.openprivacy.ca/cwtch.im/tapir"
|
||||||
"git.openprivacy.ca/cwtch.im/tapir/applications"
|
"git.openprivacy.ca/cwtch.im/tapir/applications"
|
||||||
"git.openprivacy.ca/cwtch.im/tapir/networks/tor"
|
|
||||||
"git.openprivacy.ca/cwtch.im/tapir/primitives"
|
|
||||||
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
|
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
|
||||||
"git.openprivacy.ca/openprivacy/connectivity"
|
"git.openprivacy.ca/openprivacy/connectivity"
|
||||||
"git.openprivacy.ca/openprivacy/log"
|
"git.openprivacy.ca/openprivacy/log"
|
||||||
"github.com/gtank/ristretto255"
|
"github.com/gtank/ristretto255"
|
||||||
"reflect"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// TokenBoardHandler encapsulates all the various handlers a client needs to interact with a token board
|
// TokenBoardHandler encapsulates all the various handlers a client needs to interact with a token board
|
||||||
|
@ -73,8 +68,8 @@ func (ta *TokenBoardClient) Init(connection tapir.Connection) {
|
||||||
log.Debugf("Successfully Initialized Connection to %v", connection.Hostname())
|
log.Debugf("Successfully Initialized Connection to %v", connection.Hostname())
|
||||||
go ta.Listen()
|
go ta.Listen()
|
||||||
// Optimistically acquire many tokens for this server...
|
// Optimistically acquire many tokens for this server...
|
||||||
go ta.MakePayment()
|
go ta.PurchaseTokens()
|
||||||
go ta.MakePayment()
|
go ta.PurchaseTokens()
|
||||||
ta.Replay()
|
ta.Replay()
|
||||||
} else {
|
} else {
|
||||||
connection.Close()
|
connection.Close()
|
||||||
|
@ -152,7 +147,7 @@ func (ta *TokenBoardClient) Replay() {
|
||||||
|
|
||||||
// PurchaseTokens purchases the given number of tokens from the server (using the provided payment handler)
|
// PurchaseTokens purchases the given number of tokens from the server (using the provided payment handler)
|
||||||
func (ta *TokenBoardClient) PurchaseTokens() {
|
func (ta *TokenBoardClient) PurchaseTokens() {
|
||||||
ta.MakePayment()
|
MakePayment(ta.tokenServiceOnion, ta.tokenService, ta.acn, ta.tokenBoardHandler)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Post sends a Post Request to the server
|
// Post sends a Post Request to the server
|
||||||
|
@ -172,51 +167,6 @@ func (ta *TokenBoardClient) Post(ct []byte, sig []byte) (bool, int) {
|
||||||
return false, numTokens
|
return false, numTokens
|
||||||
}
|
}
|
||||||
|
|
||||||
// MakePayment uses the PoW based token protocol to obtain more tokens
|
|
||||||
func (ta *TokenBoardClient) MakePayment() error {
|
|
||||||
log.Debugf("Making a Payment")
|
|
||||||
id, sk := primitives.InitializeEphemeralIdentity()
|
|
||||||
client := new(tor.BaseOnionService)
|
|
||||||
client.Init(ta.acn, sk, &id)
|
|
||||||
defer client.Shutdown()
|
|
||||||
|
|
||||||
tokenApplication := new(applications.TokenApplication)
|
|
||||||
tokenApplication.TokenService = ta.tokenService
|
|
||||||
powTokenApp := new(applications.ApplicationChain).
|
|
||||||
ChainApplication(new(applications.ProofOfWorkApplication), applications.SuccessfulProofOfWorkCapability).
|
|
||||||
ChainApplication(tokenApplication, applications.HasTokensCapability)
|
|
||||||
|
|
||||||
log.Debugf("Waiting for successful PoW Auth...")
|
|
||||||
|
|
||||||
connected, err := client.Connect(ta.tokenServiceOnion, powTokenApp)
|
|
||||||
if connected && err == nil {
|
|
||||||
log.Debugf("Waiting for successful Token Acquisition...")
|
|
||||||
tp := utils.TimeoutPolicy(time.Second * 30)
|
|
||||||
err := tp.ExecuteAction(func() error {
|
|
||||||
conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability)
|
|
||||||
if err == nil {
|
|
||||||
powtapp, ok := conn.App().(*applications.TokenApplication)
|
|
||||||
if ok {
|
|
||||||
log.Debugf("Updating Tokens")
|
|
||||||
ta.tokenBoardHandler.NewTokenHandler(ta.tokenServiceOnion, powtapp.Tokens)
|
|
||||||
log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit())
|
|
||||||
conn.Close()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
log.Errorf("invalid cast of powapp. this should never happen %v %v", powtapp, reflect.TypeOf(conn.App()))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
// we timed out
|
|
||||||
if err != nil {
|
|
||||||
ta.connection.Close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// NextToken retrieves the next token
|
// NextToken retrieves the next token
|
||||||
func (ta *TokenBoardClient) NextToken(data []byte, hostname string) (privacypass.SpentToken, int, error) {
|
func (ta *TokenBoardClient) NextToken(data []byte, hostname string) (privacypass.SpentToken, int, error) {
|
||||||
token, numtokens, err := ta.tokenBoardHandler.FetchToken(ta.tokenServiceOnion)
|
token, numtokens, err := ta.tokenBoardHandler.FetchToken(ta.tokenServiceOnion)
|
||||||
|
|
Loading…
Reference in New Issue