diff --git a/peer/connections/connectionsmanager.go b/peer/connections/connectionsmanager.go index 4d7a960..307c4ff 100644 --- a/peer/connections/connectionsmanager.go +++ b/peer/connections/connectionsmanager.go @@ -12,6 +12,7 @@ type Manager struct { peerConnections map[string]*PeerPeerConnection serverConnections map[string]*PeerServerConnection lock sync.Mutex + breakChannel chan bool } // NewConnectionsManager creates a new instance of Manager. @@ -19,6 +20,7 @@ func NewConnectionsManager() *Manager { m := new(Manager) m.peerConnections = make(map[string]*PeerPeerConnection) m.serverConnections = make(map[string]*PeerServerConnection) + m.breakChannel = make(chan bool) return m } @@ -50,17 +52,6 @@ func (m *Manager) ManageServerConnection(host string, handler func(string, *prot m.lock.Unlock() } -// TearDownPeerConnection closes an existing peer connection -func (m *Manager) TearDownPeerConnection(onion string) { - m.lock.Lock() - pc, ok := m.peerConnections[onion] - if ok { - pc.Kill() - delete(m.peerConnections, onion) - } - m.lock.Unlock() -} - // GetPeers returns a map of all peer connections with their state func (m *Manager) GetPeers() map[string]ConnectionState { rm := make(map[string]ConnectionState) @@ -101,23 +92,56 @@ func (m *Manager) GetPeerServerConnectionForOnion(host string) (psc *PeerServerC // AttemptReconnections repeatedly attempts to reconnect with failed peers and servers. func (m *Manager) AttemptReconnections() { - m.lock.Lock() - for _, ppc := range m.peerConnections { - if ppc.GetState() == FAILED { - go ppc.Run() + timeout := time.Duration(0) // first pass right away + + for { + select { + case <-time.After(timeout * time.Second): + m.lock.Lock() + for _, ppc := range m.peerConnections { + if ppc.GetState() == FAILED { + go ppc.Run() + } + } + m.lock.Unlock() + + m.lock.Lock() + for _, psc := range m.serverConnections { + if psc.GetState() == FAILED { + go psc.Run() + } + } + m.lock.Unlock() + + // Launch Another Run In 30 Seconds + timeout = time.Duration(30) + case <-m.breakChannel: + return } } - m.lock.Unlock() - - m.lock.Lock() - for _, psc := range m.serverConnections { - if psc.GetState() == FAILED { - go psc.Run() - } - } - m.lock.Unlock() - - // Launch Another Run In 30 Seconds - time.Sleep(time.Second * 30) - go m.AttemptReconnections() +} + +// ClosePeerConnection closes an existing peer connection +func (m *Manager) ClosePeerConnection(onion string) { + m.lock.Lock() + pc, ok := m.peerConnections[onion] + if ok { + pc.Close() + delete(m.peerConnections, onion) + } + m.lock.Unlock() +} + +func (m *Manager) Shutdown() { + m.breakChannel <- true + m.lock.Lock() + for onion, ppc := range m.peerConnections { + ppc.Close() + delete(m.peerConnections, onion) + } + for onion, psc := range m.serverConnections { + psc.Close() + delete(m.serverConnections, onion) + } + m.lock.Unlock() } diff --git a/peer/connections/peerpeerconnection.go b/peer/connections/peerpeerconnection.go index 570f9a6..498bfb4 100644 --- a/peer/connections/peerpeerconnection.go +++ b/peer/connections/peerpeerconnection.go @@ -104,9 +104,10 @@ func (ppc *PeerPeerConnection) Run() error { return err } -// Kill closes the connection -func (ppc *PeerPeerConnection) Kill() { +// Close closes the connection +func (ppc *PeerPeerConnection) Close() { ppc.state = KILLED ppc.connection.Break() // TODO We should kill the connection outright, but we need to add that to libricochet-go } + diff --git a/peer/connections/peerserverconnection.go b/peer/connections/peerserverconnection.go index f96d3cf..529a7e2 100644 --- a/peer/connections/peerserverconnection.go +++ b/peer/connections/peerserverconnection.go @@ -112,6 +112,11 @@ func (psc *PeerServerConnection) SendGroupMessage(gm *protocol.GroupMessage) err return err } +func (psc *PeerServerConnection) Close() { + psc.state = KILLED + psc.connection.Conn.Close() +} + // HandleGroupMessage passes the given group message back to the profile. func (psc *PeerServerConnection) HandleGroupMessage(gm *protocol.GroupMessage) { log.Printf("Received Group Message: %v", gm) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 6a49409..8ef46e6 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -21,6 +21,7 @@ import ( type CwtchPeer struct { connection.AutoConnectionHandler Profile *model.Profile + app *application.RicochetApplication mutex sync.Mutex Log chan string `json:"-"` connectionsManager *connections.Manager @@ -158,7 +159,7 @@ func (cp *CwtchPeer) TrustPeer(peer string) error { // BlockPeer blocks an existing peer relationship. func (cp *CwtchPeer) BlockPeer(peer string) error { err := cp.Profile.BlockPeer(peer) - cp.connectionsManager.TearDownPeerConnection(peer) + cp.connectionsManager.ClosePeerConnection(peer) return err } @@ -203,13 +204,18 @@ func (cp *CwtchPeer) Listen() error { return cpc } }) - cwtchpeer.Init(cp.Profile.Name, cp.Profile.OnionPrivateKey, af, cp) log.Printf("Running cwtch peer on %v", l.Addr().String()) + cp.app = cwtchpeer cwtchpeer.Run(l) return nil } +func (cp *CwtchPeer) Shutdown() { + cp.connectionsManager.Shutdown() + cp.app.Shutdown() +} + // CwtchPeerInstance encapsulates incoming peer connections type CwtchPeerInstance struct { rai *application.ApplicationInstance diff --git a/server/server.go b/server/server.go index 5072e04..b0a2246 100644 --- a/server/server.go +++ b/server/server.go @@ -13,6 +13,7 @@ import ( // Server encapsulates a complete, compliant Cwtch server. type Server struct { + app *application.RicochetApplication } // Run s @@ -68,5 +69,10 @@ func (s *Server) Run(privateKeyFile string) { cwtchserver.Init("cwtch server for "+l.Addr().String()[0:16], pk, af, new(application.AcceptAllContactManager)) log.Printf("cwtch server running on cwtch:%s", l.Addr().String()[0:16]) + s.app = cwtchserver cwtchserver.Run(l) } + +func (s *Server) Shutdown() { + s.app.Shutdown() +} diff --git a/testing/cwtch_peer_server_intergration_test.go b/testing/cwtch_peer_server_intergration_test.go index 602b8e0..f9df6f0 100644 --- a/testing/cwtch_peer_server_intergration_test.go +++ b/testing/cwtch_peer_server_intergration_test.go @@ -10,6 +10,7 @@ import ( "io/ioutil" "log" "os" + "runtime" "testing" "time" ) @@ -51,8 +52,10 @@ func printAndVerifyTimeline(t *testing.T, timeline []model.Message) error { func serverCheck(serverAddr string) bool { rc, err := goricochet.Open(serverAddr) + // Won't actaully free thread because rc.start() will wait for the uncalled rc.Process to read from errorChannel... + rc.Conn.Close() + if err == nil { - rc.Conn.Close() return true } return false @@ -61,9 +64,10 @@ func serverCheck(serverAddr string) bool { func TestCwtchPeerIntegration(t *testing.T) { // Hide logging "noise" log.SetOutput(ioutil.Discard) - // Todo: coung goroutines at beginning middle and end after shutdown + numGoRoutinesStart := runtime.NumGoroutine() // ***** Cwtch Server managment ***** + var server *cwtchserver.Server = nil generatedKey := checkAndGenPrivateKey(keyfile) serverKey, err := utils.LoadPrivateKeyFromFile(keyfile) @@ -80,7 +84,7 @@ func TestCwtchPeerIntegration(t *testing.T) { if !serverOnline { // launch app - server := new(cwtchserver.Server) + server = new(cwtchserver.Server) fmt.Printf("No server found\nStarting cwtch server...\n") go server.Run(keyfile) @@ -90,6 +94,9 @@ func TestCwtchPeerIntegration(t *testing.T) { fmt.Printf("Found existing cwtch server %v, using for tests...\n", serverAddr) } + time.Sleep(time.Second * 2) + numGoRoutinesPostServer := runtime.NumGoroutine() + // ***** Peer setup ***** fmt.Println("Creating Alice...") @@ -104,6 +111,7 @@ func TestCwtchPeerIntegration(t *testing.T) { fmt.Println("Waiting for alice and bob to connection with onion network...") time.Sleep(time.Second * 60) + numGoRoutinesPostPeerStart := runtime.NumGoroutine() // ***** Peering and group creation / invite ***** @@ -138,6 +146,8 @@ func TestCwtchPeerIntegration(t *testing.T) { } time.Sleep(time.Second * 3) + numGoRoutinesPostPeer := runtime.NumGoroutine() + fmt.Println("Alice joining server...") alice.JoinServer(serverAddr) @@ -145,7 +155,8 @@ func TestCwtchPeerIntegration(t *testing.T) { bob.JoinServer(serverAddr) // Wait for them to join the server - time.Sleep(time.Second * 30) + time.Sleep(time.Second * 40) + numGouRoutinesPostServerConnect := runtime.NumGoroutine() // ***** Conversation ***** @@ -186,7 +197,7 @@ func TestCwtchPeerIntegration(t *testing.T) { // ***** Verify Test ***** // final syncing time... - time.Sleep(time.Second * 10) + time.Sleep(time.Second * 15) alicesGroup := alice.Profile.GetGroupByGroupID(groupId) fmt.Printf("alice Groups:\n") @@ -226,4 +237,26 @@ func TestCwtchPeerIntegration(t *testing.T) { } // Todo: shutdown users and server + fmt.Println("Shutting down Bob...") + bob.Shutdown() + time.Sleep(time.Second * 3) + numGoRoutinesPostBob := runtime.NumGoroutine() + if server != nil { + fmt.Println("Shutting down server...") + server.Shutdown() + time.Sleep(time.Second * 3) + } + numGoRoutinesPostServerShutdown := runtime.NumGoroutine() + fmt.Println("Shutting down Alice...") + alice.Shutdown() + time.Sleep(time.Second * 3) + numGoRoutinesPostAlice := runtime.NumGoroutine() + + fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostServer: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeer: %v\nnumGouRoutinesPostServerConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostServerShutdown: %v\nnumGoRoutinesPostAlice: %v\n", + numGoRoutinesStart, numGoRoutinesPostServer, numGoRoutinesPostPeerStart, numGoRoutinesPostPeer, numGouRoutinesPostServerConnect, + numGoRoutinesPostBob, numGoRoutinesPostServerShutdown, numGoRoutinesPostAlice) + + if numGoRoutinesPostServer != numGoRoutinesPostAlice { + t.Errorf("Number of GoRoutines once server checks were completed (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, leak detected!", numGoRoutinesPostServer, numGoRoutinesPostAlice) + } }