Fix several small goroutine leaks around restart
continuous-integration/drone/pr Build is passing Details
continuous-integration/drone/push Build is passing Details

This commit is contained in:
Sarah Jamie Lewis 2022-04-18 14:49:24 -07:00
parent a30e16354c
commit 8d0ed17e7f
3 changed files with 79 additions and 13 deletions

View File

@ -7,11 +7,14 @@ import (
"math/rand"
"os"
path "path/filepath"
"runtime"
"runtime/pprof"
"testing"
"time"
)
func TestLaunchTor(t *testing.T) {
goRoutineStart := runtime.NumGoroutine()
log.SetLevel(log.LevelDebug)
rand.Seed(int64(time.Now().Nanosecond()))
@ -51,4 +54,15 @@ func TestLaunchTor(t *testing.T) {
t.Log("we have bootstrapped!")
acn.Close()
}
time.Sleep(time.Second * 5)
goRoutineEnd := runtime.NumGoroutine()
if goRoutineEnd != goRoutineStart {
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
t.Fatalf("goroutine leak in ACN: %v %v", goRoutineStart, goRoutineEnd)
}
}

View File

@ -1,6 +1,7 @@
package tor
import (
"bytes"
"context"
"errors"
"fmt"
@ -333,6 +334,7 @@ func (tp *torProvider) restart() {
tp.dialer = newTp.dialer
tp.statusCallback = statusCallback
tp.lastRestartTime = time.Now()
tp.isClosed = false
go tp.monitorRestart()
} else {
log.Errorf("Error restarting Tor process: %v", err)
@ -368,6 +370,8 @@ func (tp *torProvider) Close() {
delete(tp.childListeners, addr)
}
log.Debugf("shutting down acn threads..(is already closed: %v)", tp.isClosed)
if !tp.isClosed {
// Break out of any background checks and close
// the underlying tor connection
@ -404,6 +408,7 @@ func (tp *torProvider) callStatusCallback(prog int, status string) {
func NewTorACNWithAuth(appDirectory string, bundledTorPath string, dataDir string, controlPort int, authenticator tor.Authenticator) (connectivity.ACN, error) {
tp, err := startTor(appDirectory, bundledTorPath, dataDir, controlPort, authenticator)
if err == nil {
tp.isClosed = false
go tp.monitorRestart()
}
return tp, err
@ -450,7 +455,7 @@ func startTor(appDirectory string, bundledTorPath string, dataDir string, contro
os.MkdirAll(torDir, 0700)
tp := &torProvider{authenticator: authenticator, controlPort: controlPort, appDirectory: appDirectory, bundeledTorPath: bundledTorPath, childListeners: make(map[string]*onionListenService), breakChan: make(chan bool), statusCallback: nil, lastRestartTime: time.Now().Add(-restartCooldown)}
tp := &torProvider{authenticator: authenticator, controlPort: controlPort, appDirectory: appDirectory, bundeledTorPath: bundledTorPath, childListeners: make(map[string]*onionListenService), breakChan: make(chan bool, 1), statusCallback: nil, lastRestartTime: time.Now().Add(-restartCooldown)}
log.Debugf("checking if there is a running system tor")
if err := tp.checkVersion(); err == nil {
@ -494,7 +499,7 @@ func startTor(appDirectory string, bundledTorPath string, dataDir string, contro
err := tp.checkVersion()
if err == nil {
tp.t.DeleteDataDirOnClose = false // caller is repsonsible for dealing with cached information...
tp.t.DeleteDataDirOnClose = false // caller is responsible for dealing with cached information...
tp.dialer, err = tp.t.Dialer(context.TODO(), &tor.DialConf{Authenticator: tp.authenticator})
return tp, err
}
@ -567,24 +572,53 @@ func createFromExisting(controlport *control.Conn, datadir string) *tor.Tor {
}
func checkCmdlineTorVersion(torCmd string) bool {
// ideally we would use CommandContext with Timeout here
// but it doesn't work with programs that may launch other processes via scripts e.g. exec
// and the workout is more complex than just implementing the logic ourselves...
cmd := exec.Command(torCmd, "--version")
cmd.SysProcAttr = sysProcAttr
out, err := cmd.CombinedOutput()
var outb bytes.Buffer
cmd.Stdout = &outb
waiting := make(chan error, 1)
// try running the tor process
go func() {
log.Debugf("running tor process: %v", torCmd)
cmd.Run()
waiting <- nil
}()
// timeout function
go func() {
<-time.After(time.Second * 5)
waiting <- errors.New("timeout")
}()
err := <-waiting
if err != nil {
log.Debugf("tor process timed out")
return false
}
re := regexp.MustCompile(`[0-1]\.[0-9]\.[0-9]\.[0-9]`)
sysTorVersion := re.Find(out)
log.Infoln("tor version: " + string(sysTorVersion))
return err == nil && minTorVersionReqs(string(sysTorVersion))
sysTorVersion := re.Find(outb.Bytes())
log.Infof("tor version: %v", string(sysTorVersion))
return minTorVersionReqs(string(sysTorVersion))
}
// returns true if supplied version meets our min requirments
// min requirement: 0.3.5.x
func minTorVersionReqs(torversion string) bool {
torversions := strings.Split(torversion, ".") //eg: 0.3.4.8 or 0.3.5.1-alpha
log.Debugf("torversions: %v", torversions)
tva, _ := strconv.Atoi(torversions[0])
tvb, _ := strconv.Atoi(torversions[1])
tvc, _ := strconv.Atoi(torversions[2])
return tva > 0 || (tva == 0 && (tvb > 3 || (tvb == 3 && tvc >= 5)))
if len(torversions) >= 3 {
log.Debugf("torversions: %v", torversions)
tva, _ := strconv.Atoi(torversions[0])
tvb, _ := strconv.Atoi(torversions[1])
tvc, _ := strconv.Atoi(torversions[2])
return tva > 0 || (tva == 0 && (tvb > 3 || (tvb == 3 && tvc >= 5)))
}
return false
}
func dialControlPort(port int) (*control.Conn, error) {

View File

@ -4,8 +4,12 @@ import (
"fmt"
"git.openprivacy.ca/openprivacy/log"
"io/ioutil"
"os"
path "path/filepath"
"runtime"
"runtime/pprof"
"testing"
"time"
)
func getStatusCallback(progChan chan int) func(int, string) {
@ -16,7 +20,10 @@ func getStatusCallback(progChan chan int) func(int, string) {
}
func TestTorProvider(t *testing.T) {
progChan := make(chan int)
goRoutineStart := runtime.NumGoroutine()
progChan := make(chan int, 10)
log.SetLevel(log.LevelDebug)
torpath := path.Join("..", "tmp/tor")
@ -79,4 +86,15 @@ func TestTorProvider(t *testing.T) {
acn.Restart()
acn.Close()
time.Sleep(time.Second * 5)
goRoutineEnd := runtime.NumGoroutine()
if goRoutineEnd != goRoutineStart {
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
t.Fatalf("goroutine leak in ACN: %v %v", goRoutineStart, goRoutineEnd)
}
}