Add Garbage Collection for Old Connections #50

Merged
dan merged 1 commits from gc into master 4 months ago
  1. 54
      networks/tor/BaseOnionService.go
  2. 12
      testing/tapir_integration_test.go
  3. 2
      testing/tapir_malicious_remote_integration_test.go

54
networks/tor/BaseOnionService.go

@ -1,6 +1,7 @@
package tor
import (
"context"
"crypto/rand"
"encoding/base64"
"errors"
@ -15,13 +16,14 @@ import (
// BaseOnionService is a concrete implementation of the service interface over Tor onion services.
type BaseOnionService struct {
connections sync.Map
acn connectivity.ACN
id *primitives.Identity
privateKey ed25519.PrivateKey
ls connectivity.ListenService
lock sync.Mutex
port int
connections sync.Map
acn connectivity.ACN
id *primitives.Identity
privateKey ed25519.PrivateKey
ls connectivity.ListenService
lock sync.Mutex
port int
shutdownChannel chan bool
}
// Metrics provides a report of useful information about the status of the service e.g. the number of active
@ -53,6 +55,25 @@ func (s *BaseOnionService) Init(acn connectivity.ACN, sk ed25519.PrivateKey, id
s.id = id
s.privateKey = sk
s.port = 9878
// blocking so we can wait on shutdown for closing of this goroutine
s.shutdownChannel = make(chan bool)
go func() {
for s.waitOrTimeout() {
s.GarbageCollect()
}
log.Debugf("closing down garbage collection goroutine")
}()
}
func (s *BaseOnionService) waitOrTimeout() bool {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
select {
case <-s.shutdownChannel:
return false
case <-ctx.Done():
return true
}
}
// SetPort configures the port that the service uses.
@ -183,6 +204,20 @@ func (s *BaseOnionService) GetConnection(hostname string) (tapir.Connection, err
return conn[0], nil
}
// GarbageCollect iterates through the connection pool and cleans up any connections that are closed
// that haven't been removed from the map.
func (s *BaseOnionService) GarbageCollect() {
log.Debugf("running garbage collection...")
s.connections.Range(func(key, value interface{}) bool {
connection := value.(tapir.Connection)
if connection.IsClosed() {
// Delete this Closed Connection
s.connections.Delete(key)
}
return true
})
}
// Connect initializes a new outbound connection to the given peer, using the defined Application
func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (bool, error) {
currconn, _ := s.GetConnection(hostname)
@ -263,11 +298,16 @@ func (s *BaseOnionService) Shutdown() {
if s.ls != nil {
s.ls.Close()
}
// close all existing connections manually
s.connections.Range(func(key, value interface{}) bool {
connection := value.(tapir.Connection)
connection.Close()
return true
})
// wait for the return of our garbage collection goroutine
s.shutdownChannel <- true
}
// Broadcast sends a message to all connections who possess the given capability

12
testing/tapir_integration_test.go

@ -12,6 +12,7 @@ import (
"io/ioutil"
"os"
"runtime"
"runtime/pprof"
"sync"
"testing"
"time"
@ -103,14 +104,19 @@ func TestTapir(t *testing.T) {
go connectclient(t, client, id.PublicKey(), wg)
CheckConnection(service, clienthostname, wg)
wg.Wait()
// Wait for Server to Sync
time.Sleep(time.Second * 2)
// Wait for Garbage Collection...
time.Sleep(time.Second * 60)
log.Infof("Closing ACN...")
client.Shutdown()
service.Shutdown()
acn.Close()
sg.Wait()
time.Sleep(time.Second * 2)
time.Sleep(time.Second * 5)
log.Infof("Number of goroutines open at close: %d", runtime.NumGoroutine())
if numRoutinesStart != runtime.NumGoroutine() {
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
t.Errorf("Potential goroutine leak: Num Start:%v NumEnd: %v", numRoutinesStart, runtime.NumGoroutine())
}
if !AuthSuccess {

2
testing/tapir_malicious_remote_integration_test.go

@ -71,6 +71,8 @@ func TestTapirMaliciousRemote(t *testing.T) {
// Wait for Server to Sync
time.Sleep(time.Second * 2)
log.Infof("closing ACN...")
client.Shutdown()
service.Shutdown()
acn.Close()
sg.Wait()
time.Sleep(time.Second * 5) // wait for goroutines to finish...

Loading…
Cancel
Save