diff --git a/networks/tor/BaseOnionService.go b/networks/tor/BaseOnionService.go index e58a029..703e22b 100644 --- a/networks/tor/BaseOnionService.go +++ b/networks/tor/BaseOnionService.go @@ -1,6 +1,7 @@ package tor import ( + "context" "crypto/rand" "encoding/base64" "errors" @@ -15,13 +16,14 @@ import ( // BaseOnionService is a concrete implementation of the service interface over Tor onion services. type BaseOnionService struct { - connections sync.Map - acn connectivity.ACN - id *primitives.Identity - privateKey ed25519.PrivateKey - ls connectivity.ListenService - lock sync.Mutex - port int + connections sync.Map + acn connectivity.ACN + id *primitives.Identity + privateKey ed25519.PrivateKey + ls connectivity.ListenService + lock sync.Mutex + port int + shutdownChannel chan bool } // Metrics provides a report of useful information about the status of the service e.g. the number of active @@ -53,6 +55,25 @@ func (s *BaseOnionService) Init(acn connectivity.ACN, sk ed25519.PrivateKey, id s.id = id s.privateKey = sk s.port = 9878 + // blocking so we can wait on shutdown for closing of this goroutine + s.shutdownChannel = make(chan bool) + go func() { + for s.waitOrTimeout() { + s.GarbageCollect() + } + log.Debugf("closing down garbage collection goroutine") + }() +} + +func (s *BaseOnionService) waitOrTimeout() bool { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + select { + case <-s.shutdownChannel: + return false + case <-ctx.Done(): + return true + } } // SetPort configures the port that the service uses. @@ -183,6 +204,20 @@ func (s *BaseOnionService) GetConnection(hostname string) (tapir.Connection, err return conn[0], nil } +// GarbageCollect iterates through the connection pool and cleans up any connections that are closed +// that haven't been removed from the map. +func (s *BaseOnionService) GarbageCollect() { + log.Debugf("running garbage collection...") + s.connections.Range(func(key, value interface{}) bool { + connection := value.(tapir.Connection) + if connection.IsClosed() { + // Delete this Closed Connection + s.connections.Delete(key) + } + return true + }) +} + // Connect initializes a new outbound connection to the given peer, using the defined Application func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool, error) { currconn, _ := s.GetConnection(hostname) @@ -263,11 +298,16 @@ func (s *BaseOnionService) Shutdown() { if s.ls != nil { s.ls.Close() } + + // close all existing connections manually s.connections.Range(func(key, value interface{}) bool { connection := value.(tapir.Connection) connection.Close() return true }) + + // wait for the return of our garbage collection goroutine + s.shutdownChannel <- true } // Broadcast sends a message to all connections who possess the given capability diff --git a/testing/tapir_integration_test.go b/testing/tapir_integration_test.go index 85ab25c..582cb25 100644 --- a/testing/tapir_integration_test.go +++ b/testing/tapir_integration_test.go @@ -12,6 +12,7 @@ import ( "io/ioutil" "os" "runtime" + "runtime/pprof" "sync" "testing" "time" @@ -103,14 +104,19 @@ func TestTapir(t *testing.T) { go connectclient(t, client, id.PublicKey(), wg) CheckConnection(service, clienthostname, wg) wg.Wait() - // Wait for Server to Sync - time.Sleep(time.Second * 2) + // Wait for Garbage Collection... + time.Sleep(time.Second * 60) log.Infof("Closing ACN...") + client.Shutdown() + service.Shutdown() acn.Close() sg.Wait() - time.Sleep(time.Second * 2) + time.Sleep(time.Second * 5) log.Infof("Number of goroutines open at close: %d", runtime.NumGoroutine()) if numRoutinesStart != runtime.NumGoroutine() { + + pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + t.Errorf("Potential goroutine leak: Num Start:%v NumEnd: %v", numRoutinesStart, runtime.NumGoroutine()) } if !AuthSuccess { diff --git a/testing/tapir_malicious_remote_integration_test.go b/testing/tapir_malicious_remote_integration_test.go index 6efaceb..bf8488b 100644 --- a/testing/tapir_malicious_remote_integration_test.go +++ b/testing/tapir_malicious_remote_integration_test.go @@ -71,6 +71,8 @@ func TestTapirMaliciousRemote(t *testing.T) { // Wait for Server to Sync time.Sleep(time.Second * 2) log.Infof("closing ACN...") + client.Shutdown() + service.Shutdown() acn.Close() sg.Wait() time.Sleep(time.Second * 5) // wait for goroutines to finish...