add a gating queue around Open/Listen based on tor circuit creation limits

Dan Ballard 2022-09-25 17:11:38 -07:00
parent 1f52dc7138
commit 9161cd671e
2 changed files with 81 additions and 2 deletions

@ -38,6 +38,8 @@ const (
networkUp = 0
)
const TorMaxPendingConns = 32 // tor/src/app/config/config.c MaxClientCircuitsPending
// NoTorrcError is a typed error thrown to indicate start could not complete due to lack of a torrc file
type NoTorrcError struct {
path string
@ -60,6 +62,60 @@ type onionListenService struct {
tp *torProvider
}
// LockQueue provides a blocking FIFO queue that lets entrants through without blocking
// until `freeMax` is hit; after that, new entrants are placed on a queue and block until released
type LockQueue struct {
lock sync.Mutex
freeMax int
count int
queue []*sync.Mutex
}
// NewLockQueue returns a LockQueue that allows up to `freeMax` entrants before blocking starts
func NewLockQueue(freeMax int) *LockQueue {
return &LockQueue{freeMax: freeMax, count: 0, queue: []*sync.Mutex{}}
}
// Enter enters a caller into the queue
// if there are fewer than `freeMax` others inside, the call is non-blocking and returns immediately
// otherwise the caller is placed on a queue and the call blocks until Exit pops them off the queue
func (lq *LockQueue) Enter(skipToFront bool) {
lq.lock.Lock()
lq.count++
if lq.count >= lq.freeMax {
myMutex := &sync.Mutex{}
// lock myMutex before queueing it (and before releasing lq.lock) so that
// Exit's Unlock always pairs with a held lock and the second Lock below blocks
myMutex.Lock()
if skipToFront {
lq.queue = append([]*sync.Mutex{myMutex}, lq.queue...)
} else {
lq.queue = append(lq.queue, myMutex)
}
lq.lock.Unlock()
myMutex.Lock() // blocks here until Exit() pops this entry off the queue and unlocks it
time.Sleep(500 * time.Millisecond) // brief pause to space out newly released requests
} else {
lq.lock.Unlock()
}
}
// Exit takes a caller out of the lock-covered code
// reduces the count of callers in the covered code
// if there is a queue of callers waiting to enter, unblocks the next one
func (lq *LockQueue) Exit() {
lq.lock.Lock()
lq.count--
if len(lq.queue) > 0 {
head := lq.queue[0]
if len(lq.queue) == 1 {
lq.queue = []*sync.Mutex{}
} else {
lq.queue = lq.queue[1:]
}
head.Unlock()
}
lq.lock.Unlock()
}
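For illustration only (not part of this commit), a minimal sketch of how a LockQueue gates concurrent work within this package; the limit of 3, the loop size, and the sleep standing in for a slow tor dial are assumptions:

func gateExample() {
	lq := NewLockQueue(3)
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			lq.Enter(false)         // blocks once the free slots are used up
			defer lq.Exit()         // releases the next queued caller, if any
			time.Sleep(time.Second) // stand-in for a slow tor dial or listen
		}()
	}
	wg.Wait()
}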
type torProvider struct {
controlPort int
t *tor.Tor
@ -77,6 +133,8 @@ type torProvider struct {
dataDir string
version string
bootProgress int
startTime time.Time
openLockQueue *LockQueue
}
func (ols *onionListenService) AddressFull() string {
@ -274,7 +332,9 @@ func (tp *torProvider) Listen(identity connectivity.PrivateKey, port int) (conne
}
conf := &tor.ListenConf{NoWait: true, Version3: true, Key: identity, RemotePorts: []int{port}, Detach: true, DiscardKey: true, LocalListener: localListener}
tp.openLockQueue.Enter(true)
os, err := tp.t.Listen(context.TODO(), conf)
tp.openLockQueue.Exit()
// Reattach to the old local listener...
// Note: this code probably shouldn't be hit in Cwtch anymore because we purge torrc on restart.
@ -359,7 +419,14 @@ func (tp *torProvider) Open(hostname string) (net.Conn, string, error) {
addrParts := strings.Split(hostname, ":")
resolvedHostname = addrParts[1]
}
tp.openLockQueue.Enter(false)
conn, err := tp.dialer.Dial("tcp", resolvedHostname+".onion:9878")
tp.openLockQueue.Exit()
if err != nil {
log.Debugf("Open() error in tor.dialer.Dial: %v\n", err)
}
return conn, resolvedHostname, err
}
@ -477,7 +544,7 @@ func startTor(appDirectory string, bundledTorPath string, dataDir string, contro
os.MkdirAll(torDir, 0700)
tp := &torProvider{authenticator: authenticator, controlPort: controlPort, appDirectory: appDirectory, bundeledTorPath: bundledTorPath, childListeners: make(map[string]*onionListenService), breakChan: make(chan bool, 1), statusCallback: nil, versionCallback: nil, lastRestartTime: time.Now().Add(-restartCooldown), bootProgress: -1}
tp := &torProvider{authenticator: authenticator, controlPort: controlPort, appDirectory: appDirectory, bundeledTorPath: bundledTorPath, childListeners: make(map[string]*onionListenService), breakChan: make(chan bool, 1), statusCallback: nil, versionCallback: nil, lastRestartTime: time.Now().Add(-restartCooldown), bootProgress: -1, startTime: time.Now(), openLockQueue: NewLockQueue(TorMaxPendingConns - 1)}
log.Debugf("checking if there is a running system tor")
if version, err := tp.checkVersion(); err == nil {

View File

@ -42,12 +42,24 @@ func (tb *TorrcBuilder) WithControlPort(port int) *TorrcBuilder {
return tb
}
// WithLog sets the Log to file directive to the specified with with the specified log level
// WithLog sets the Log to file directive to the specified file with the specified log level
func (tb *TorrcBuilder) WithLog(logfile string, level TorLogLevel) *TorrcBuilder {
tb.lines = append(tb.lines, fmt.Sprintf("Log %v file %v", level, logfile))
return tb
}
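// WithSocksTimeout sets the SocksTimeout torrc option to the specified number of seconds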
func (tb *TorrcBuilder) WithSocksTimeout(timeOutSecs int) *TorrcBuilder {
tb.lines = append(tb.lines, fmt.Sprintf("SocksTimeout %v", timeOutSecs))
return tb
}
// IncludeCustom appends to the torrc builder and allows the client to set any option they want, while benefiting
// from other configuration options.
func (tb *TorrcBuilder) IncludeCustom(lines []string) *TorrcBuilder {
tb.lines = append(tb.lines, lines...)
return tb
}
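For illustration only (not part of this commit), a sketch of chaining the builder methods shown in this file; the timeout value and the custom torrc line are assumptions:

func exampleTorrc(tb *TorrcBuilder) {
	// 30s socks timeout plus one raw torrc line; both values are illustrative
	tb.WithSocksTimeout(30).
		IncludeCustom([]string{"MaxClientCircuitsPending 32"})
}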
// WithCustom clobbers the torrc builder and allows the client to set any option they want, while benefiting
// from other configuration options.
func (tb *TorrcBuilder) WithCustom(lines []string) *TorrcBuilder {