diff --git a/tor/torProvider.go b/tor/torProvider.go index d1050ce..a8aaf21 100644 --- a/tor/torProvider.go +++ b/tor/torProvider.go @@ -38,6 +38,8 @@ const ( networkUp = 0 ) +const TorMaxPendingConns = 32 // tor/src/app/config/config.c MaxClientCircuitsPending + // NoTorrcError is a typed error thrown to indicate start could not complete due to lack of a torrc file type NoTorrcError struct { path string @@ -60,6 +62,60 @@ type onionListenService struct { tp *torProvider } +// LockQueue provides a blocking fifo queue that allows up to a specified number of entrants through non blocking +// before `freeMax` is hit, and then new entrants are placed in a blocking queue +type LockQueue struct { + lock sync.Mutex + freeMax int + count int + queue []*sync.Mutex +} + +// NewLockQueue returns a LockQueue that allows up to `freeMax` entrants before blocking starts +func NewLockQueue(freeMax int) *LockQueue { + return &LockQueue{freeMax: freeMax, count: 0, queue: []*sync.Mutex{}} +} + +// Enter entres a caller into the queue +// if there are less than `freeMax` others in the lock than it is non blocking and returns immediately +// if there are more than `freeMax` others in the lock, caller is placed on a queue, the call blocks until they are popped out of the queue +func (lq *LockQueue) Enter(skipToFront bool) { + lq.lock.Lock() + lq.count++ + if lq.count >= lq.freeMax { + myMutex := &sync.Mutex{} + if skipToFront { + lq.queue = append([]*sync.Mutex{myMutex}, lq.queue...) + } else { + lq.queue = append(lq.queue, myMutex) + } + lq.lock.Unlock() + myMutex.Lock() + myMutex.Lock() + time.Sleep(500 * time.Millisecond) + } else { + lq.lock.Unlock() + } +} + +// Exit takes a caller out of the lock covered code +// reduced count of callers in the covered code +// if there is a queue waiting to enter, unblocks another caller +func (lq *LockQueue) Exit() { + lq.lock.Lock() + lq.count-- + if len(lq.queue) > 0 { + head := lq.queue[0] + if len(lq.queue) == 1 { + lq.queue = []*sync.Mutex{} + } else { + lq.queue = lq.queue[1:] + } + head.Unlock() + } + lq.lock.Unlock() +} + type torProvider struct { controlPort int t *tor.Tor @@ -77,6 +133,8 @@ type torProvider struct { dataDir string version string bootProgress int + startTime time.Time + openLockQueue *LockQueue } func (ols *onionListenService) AddressFull() string { @@ -274,7 +332,9 @@ func (tp *torProvider) Listen(identity connectivity.PrivateKey, port int) (conne } conf := &tor.ListenConf{NoWait: true, Version3: true, Key: identity, RemotePorts: []int{port}, Detach: true, DiscardKey: true, LocalListener: localListener} + tp.openLockQueue.Enter(true) os, err := tp.t.Listen(context.TODO(), conf) + tp.openLockQueue.Exit() // Reattach to the old local listener... // Note: this code probably shouldn't be hit in Cwtch anymore because we purge torrc on restart. @@ -359,7 +419,14 @@ func (tp *torProvider) Open(hostname string) (net.Conn, string, error) { addrParts := strings.Split(hostname, ":") resolvedHostname = addrParts[1] } + + tp.openLockQueue.Enter(false) conn, err := tp.dialer.Dial("tcp", resolvedHostname+".onion:9878") + tp.openLockQueue.Exit() + + if err != nil { + log.Debugf("Open() error in tor.dailer.Dail: %v\n", err) + } return conn, resolvedHostname, err } @@ -477,7 +544,7 @@ func startTor(appDirectory string, bundledTorPath string, dataDir string, contro os.MkdirAll(torDir, 0700) - tp := &torProvider{authenticator: authenticator, controlPort: controlPort, appDirectory: appDirectory, bundeledTorPath: bundledTorPath, childListeners: make(map[string]*onionListenService), breakChan: make(chan bool, 1), statusCallback: nil, versionCallback: nil, lastRestartTime: time.Now().Add(-restartCooldown), bootProgress: -1} + tp := &torProvider{authenticator: authenticator, controlPort: controlPort, appDirectory: appDirectory, bundeledTorPath: bundledTorPath, childListeners: make(map[string]*onionListenService), breakChan: make(chan bool, 1), statusCallback: nil, versionCallback: nil, lastRestartTime: time.Now().Add(-restartCooldown), bootProgress: -1, startTime: time.Now(), openLockQueue: NewLockQueue(TorMaxPendingConns - 1)} log.Debugf("checking if there is a running system tor") if version, err := tp.checkVersion(); err == nil { diff --git a/tor/torrcBuilder.go b/tor/torrcBuilder.go index dc04c04..667ad79 100644 --- a/tor/torrcBuilder.go +++ b/tor/torrcBuilder.go @@ -42,12 +42,24 @@ func (tb *TorrcBuilder) WithControlPort(port int) *TorrcBuilder { return tb } -// WithLog sets the Log to file directive to the specified with with the specified log level +// WithLog sets the Log to file directive to the specified file with the specified log level func (tb *TorrcBuilder) WithLog(logfile string, level TorLogLevel) *TorrcBuilder { tb.lines = append(tb.lines, fmt.Sprintf("Log %v file %v", level, logfile)) return tb } +func (tb *TorrcBuilder) WithSocksTimeout(timeOutSecs int) *TorrcBuilder { + tb.lines = append(tb.lines, fmt.Sprintf("SocksTimeout %v", timeOutSecs)) + return tb +} + +// IncludeCustom appends to the torrc builder and allows the client to set any option they want, while benefiting +// from other configuration options. +func (tb *TorrcBuilder) IncludeCustom(lines []string) *TorrcBuilder { + tb.lines = append(tb.lines, lines...) + return tb +} + // WithCustom clobbers the torrc builder and allows the client to set any option they want, while benefiting // from other configuration options. func (tb *TorrcBuilder) WithCustom(lines []string) *TorrcBuilder {