Browse Source

peer and server tear down; connectionManager tear down; integ test tracks goRoutines and errors if not cleaned up properly

pull/30/head
Dan Ballard 2 years ago
parent
commit
30b4a2068e
6 changed files with 107 additions and 32 deletions
  1. +47
    -23
      peer/connections/connectionsmanager.go
  2. +3
    -2
      peer/connections/peerpeerconnection.go
  3. +5
    -0
      peer/connections/peerserverconnection.go
  4. +8
    -2
      peer/cwtch_peer.go
  5. +6
    -0
      server/server.go
  6. +38
    -5
      testing/cwtch_peer_server_intergration_test.go

+ 47
- 23
peer/connections/connectionsmanager.go View File

@@ -12,6 +12,7 @@ type Manager struct {
peerConnections map[string]*PeerPeerConnection
serverConnections map[string]*PeerServerConnection
lock sync.Mutex
breakChannel chan bool
}

// NewConnectionsManager creates a new instance of Manager.
@@ -19,6 +20,7 @@ func NewConnectionsManager() *Manager {
m := new(Manager)
m.peerConnections = make(map[string]*PeerPeerConnection)
m.serverConnections = make(map[string]*PeerServerConnection)
m.breakChannel = make(chan bool)
return m
}

@@ -50,17 +52,6 @@ func (m *Manager) ManageServerConnection(host string, handler func(string, *prot
m.lock.Unlock()
}

// TearDownPeerConnection closes an existing peer connection
func (m *Manager) TearDownPeerConnection(onion string) {
m.lock.Lock()
pc, ok := m.peerConnections[onion]
if ok {
pc.Kill()
delete(m.peerConnections, onion)
}
m.lock.Unlock()
}

// GetPeers returns a map of all peer connections with their state
func (m *Manager) GetPeers() map[string]ConnectionState {
rm := make(map[string]ConnectionState)
@@ -101,23 +92,56 @@ func (m *Manager) GetPeerServerConnectionForOnion(host string) (psc *PeerServerC

// AttemptReconnections repeatedly attempts to reconnect with failed peers and servers.
func (m *Manager) AttemptReconnections() {
m.lock.Lock()
for _, ppc := range m.peerConnections {
if ppc.GetState() == FAILED {
go ppc.Run()
timeout := time.Duration(0) // first pass right away

for {
select {
case <-time.After(timeout * time.Second):
m.lock.Lock()
for _, ppc := range m.peerConnections {
if ppc.GetState() == FAILED {
go ppc.Run()
}
}
m.lock.Unlock()

m.lock.Lock()
for _, psc := range m.serverConnections {
if psc.GetState() == FAILED {
go psc.Run()
}
}
m.lock.Unlock()

// Launch Another Run In 30 Seconds
timeout = time.Duration(30)
case <-m.breakChannel:
return
}
}
m.lock.Unlock()
}

// ClosePeerConnection closes an existing peer connection
func (m *Manager) ClosePeerConnection(onion string) {
m.lock.Lock()
for _, psc := range m.serverConnections {
if psc.GetState() == FAILED {
go psc.Run()
}
pc, ok := m.peerConnections[onion]
if ok {
pc.Close()
delete(m.peerConnections, onion)
}
m.lock.Unlock()
}

// Launch Another Run In 30 Seconds
time.Sleep(time.Second * 30)
go m.AttemptReconnections()
func (m *Manager) Shutdown() {
m.breakChannel <- true
m.lock.Lock()
for onion, ppc := range m.peerConnections {
ppc.Close()
delete(m.peerConnections, onion)
}
for onion, psc := range m.serverConnections {
psc.Close()
delete(m.serverConnections, onion)
}
m.lock.Unlock()
}

+ 3
- 2
peer/connections/peerpeerconnection.go View File

@@ -104,9 +104,10 @@ func (ppc *PeerPeerConnection) Run() error {
return err
}

// Kill closes the connection
func (ppc *PeerPeerConnection) Kill() {
// Close closes the connection
func (ppc *PeerPeerConnection) Close() {
ppc.state = KILLED
ppc.connection.Break()
// TODO We should kill the connection outright, but we need to add that to libricochet-go
}


+ 5
- 0
peer/connections/peerserverconnection.go View File

@@ -112,6 +112,11 @@ func (psc *PeerServerConnection) SendGroupMessage(gm *protocol.GroupMessage) err
return err
}

func (psc *PeerServerConnection) Close() {
psc.state = KILLED
psc.connection.Conn.Close()
}

// HandleGroupMessage passes the given group message back to the profile.
func (psc *PeerServerConnection) HandleGroupMessage(gm *protocol.GroupMessage) {
log.Printf("Received Group Message: %v", gm)


+ 8
- 2
peer/cwtch_peer.go View File

@@ -21,6 +21,7 @@ import (
type CwtchPeer struct {
connection.AutoConnectionHandler
Profile *model.Profile
app *application.RicochetApplication
mutex sync.Mutex
Log chan string `json:"-"`
connectionsManager *connections.Manager
@@ -158,7 +159,7 @@ func (cp *CwtchPeer) TrustPeer(peer string) error {
// BlockPeer blocks an existing peer relationship.
func (cp *CwtchPeer) BlockPeer(peer string) error {
err := cp.Profile.BlockPeer(peer)
cp.connectionsManager.TearDownPeerConnection(peer)
cp.connectionsManager.ClosePeerConnection(peer)
return err
}

@@ -203,13 +204,18 @@ func (cp *CwtchPeer) Listen() error {
return cpc
}
})

cwtchpeer.Init(cp.Profile.Name, cp.Profile.OnionPrivateKey, af, cp)
log.Printf("Running cwtch peer on %v", l.Addr().String())
cp.app = cwtchpeer
cwtchpeer.Run(l)
return nil
}

func (cp *CwtchPeer) Shutdown() {
cp.connectionsManager.Shutdown()
cp.app.Shutdown()
}

// CwtchPeerInstance encapsulates incoming peer connections
type CwtchPeerInstance struct {
rai *application.ApplicationInstance


+ 6
- 0
server/server.go View File

@@ -13,6 +13,7 @@ import (

// Server encapsulates a complete, compliant Cwtch server.
type Server struct {
app *application.RicochetApplication
}

// Run s
@@ -68,5 +69,10 @@ func (s *Server) Run(privateKeyFile string) {

cwtchserver.Init("cwtch server for "+l.Addr().String()[0:16], pk, af, new(application.AcceptAllContactManager))
log.Printf("cwtch server running on cwtch:%s", l.Addr().String()[0:16])
s.app = cwtchserver
cwtchserver.Run(l)
}

func (s *Server) Shutdown() {
s.app.Shutdown()
}

+ 38
- 5
testing/cwtch_peer_server_intergration_test.go View File

@@ -10,6 +10,7 @@ import (
"io/ioutil"
"log"
"os"
"runtime"
"testing"
"time"
)
@@ -51,8 +52,10 @@ func printAndVerifyTimeline(t *testing.T, timeline []model.Message) error {

func serverCheck(serverAddr string) bool {
rc, err := goricochet.Open(serverAddr)
// Won't actaully free thread because rc.start() will wait for the uncalled rc.Process to read from errorChannel...
rc.Conn.Close()

if err == nil {
rc.Conn.Close()
return true
}
return false
@@ -61,9 +64,10 @@ func serverCheck(serverAddr string) bool {
func TestCwtchPeerIntegration(t *testing.T) {
// Hide logging "noise"
log.SetOutput(ioutil.Discard)
// Todo: coung goroutines at beginning middle and end after shutdown
numGoRoutinesStart := runtime.NumGoroutine()

// ***** Cwtch Server managment *****
var server *cwtchserver.Server = nil
generatedKey := checkAndGenPrivateKey(keyfile)

serverKey, err := utils.LoadPrivateKeyFromFile(keyfile)
@@ -80,7 +84,7 @@ func TestCwtchPeerIntegration(t *testing.T) {

if !serverOnline {
// launch app
server := new(cwtchserver.Server)
server = new(cwtchserver.Server)
fmt.Printf("No server found\nStarting cwtch server...\n")
go server.Run(keyfile)

@@ -90,6 +94,9 @@ func TestCwtchPeerIntegration(t *testing.T) {
fmt.Printf("Found existing cwtch server %v, using for tests...\n", serverAddr)
}

time.Sleep(time.Second * 2)
numGoRoutinesPostServer := runtime.NumGoroutine()

// ***** Peer setup *****

fmt.Println("Creating Alice...")
@@ -104,6 +111,7 @@ func TestCwtchPeerIntegration(t *testing.T) {

fmt.Println("Waiting for alice and bob to connection with onion network...")
time.Sleep(time.Second * 60)
numGoRoutinesPostPeerStart := runtime.NumGoroutine()

// ***** Peering and group creation / invite *****

@@ -138,6 +146,8 @@ func TestCwtchPeerIntegration(t *testing.T) {
}
time.Sleep(time.Second * 3)

numGoRoutinesPostPeer := runtime.NumGoroutine()

fmt.Println("Alice joining server...")
alice.JoinServer(serverAddr)

@@ -145,7 +155,8 @@ func TestCwtchPeerIntegration(t *testing.T) {
bob.JoinServer(serverAddr)

// Wait for them to join the server
time.Sleep(time.Second * 30)
time.Sleep(time.Second * 40)
numGouRoutinesPostServerConnect := runtime.NumGoroutine()

// ***** Conversation *****

@@ -186,7 +197,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
// ***** Verify Test *****

// final syncing time...
time.Sleep(time.Second * 10)
time.Sleep(time.Second * 15)

alicesGroup := alice.Profile.GetGroupByGroupID(groupId)
fmt.Printf("alice Groups:\n")
@@ -226,4 +237,26 @@ func TestCwtchPeerIntegration(t *testing.T) {
}

// Todo: shutdown users and server
fmt.Println("Shutting down Bob...")
bob.Shutdown()
time.Sleep(time.Second * 3)
numGoRoutinesPostBob := runtime.NumGoroutine()
if server != nil {
fmt.Println("Shutting down server...")
server.Shutdown()
time.Sleep(time.Second * 3)
}
numGoRoutinesPostServerShutdown := runtime.NumGoroutine()
fmt.Println("Shutting down Alice...")
alice.Shutdown()
time.Sleep(time.Second * 3)
numGoRoutinesPostAlice := runtime.NumGoroutine()

fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostServer: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeer: %v\nnumGouRoutinesPostServerConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostServerShutdown: %v\nnumGoRoutinesPostAlice: %v\n",
numGoRoutinesStart, numGoRoutinesPostServer, numGoRoutinesPostPeerStart, numGoRoutinesPostPeer, numGouRoutinesPostServerConnect,
numGoRoutinesPostBob, numGoRoutinesPostServerShutdown, numGoRoutinesPostAlice)

if numGoRoutinesPostServer != numGoRoutinesPostAlice {
t.Errorf("Number of GoRoutines once server checks were completed (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, leak detected!", numGoRoutinesPostServer, numGoRoutinesPostAlice)
}
}

Loading…
Cancel
Save