Merge branch 'test-teardown' of dan/cwtch into master

This commit is contained in:
Sarah Jamie Lewis 2018-06-03 19:07:13 +00:00 committed by Gogs
commit 381a0d743d
6 changed files with 112 additions and 37 deletions

View File

@ -12,6 +12,7 @@ type Manager struct {
peerConnections map[string]*PeerPeerConnection
serverConnections map[string]*PeerServerConnection
lock sync.Mutex
breakChannel chan bool
}
// NewConnectionsManager creates a new instance of Manager.
@ -19,6 +20,7 @@ func NewConnectionsManager() *Manager {
m := new(Manager)
m.peerConnections = make(map[string]*PeerPeerConnection)
m.serverConnections = make(map[string]*PeerServerConnection)
m.breakChannel = make(chan bool)
return m
}
@ -50,17 +52,6 @@ func (m *Manager) ManageServerConnection(host string, handler func(string, *prot
m.lock.Unlock()
}
// TearDownPeerConnection closes an existing peer connection
func (m *Manager) TearDownPeerConnection(onion string) {
m.lock.Lock()
pc, ok := m.peerConnections[onion]
if ok {
pc.Kill()
delete(m.peerConnections, onion)
}
m.lock.Unlock()
}
// GetPeers returns a map of all peer connections with their state
func (m *Manager) GetPeers() map[string]ConnectionState {
rm := make(map[string]ConnectionState)
@ -101,23 +92,56 @@ func (m *Manager) GetPeerServerConnectionForOnion(host string) (psc *PeerServerC
// AttemptReconnections repeatedly attempts to reconnect with failed peers and servers.
func (m *Manager) AttemptReconnections() {
m.lock.Lock()
for _, ppc := range m.peerConnections {
if ppc.GetState() == FAILED {
go ppc.Run()
timeout := time.Duration(0) // first pass right away
for {
select {
case <-time.After(timeout * time.Second):
m.lock.Lock()
for _, ppc := range m.peerConnections {
if ppc.GetState() == FAILED {
go ppc.Run()
}
}
m.lock.Unlock()
m.lock.Lock()
for _, psc := range m.serverConnections {
if psc.GetState() == FAILED {
go psc.Run()
}
}
m.lock.Unlock()
// Launch Another Run In 30 Seconds
timeout = time.Duration(30)
case <-m.breakChannel:
return
}
}
m.lock.Unlock()
m.lock.Lock()
for _, psc := range m.serverConnections {
if psc.GetState() == FAILED {
go psc.Run()
}
}
m.lock.Unlock()
// Launch Another Run In 30 Seconds
time.Sleep(time.Second * 30)
go m.AttemptReconnections()
}
// ClosePeerConnection closes an existing peer connection
func (m *Manager) ClosePeerConnection(onion string) {
m.lock.Lock()
pc, ok := m.peerConnections[onion]
if ok {
pc.Close()
delete(m.peerConnections, onion)
}
m.lock.Unlock()
}
func (m *Manager) Shutdown() {
m.breakChannel <- true
m.lock.Lock()
for onion, ppc := range m.peerConnections {
ppc.Close()
delete(m.peerConnections, onion)
}
for onion, psc := range m.serverConnections {
psc.Close()
delete(m.serverConnections, onion)
}
m.lock.Unlock()
}

View File

@ -104,9 +104,10 @@ func (ppc *PeerPeerConnection) Run() error {
return err
}
// Kill closes the connection
func (ppc *PeerPeerConnection) Kill() {
// Close closes the connection
func (ppc *PeerPeerConnection) Close() {
ppc.state = KILLED
ppc.connection.Break()
// TODO We should kill the connection outright, but we need to add that to libricochet-go
}

View File

@ -112,6 +112,11 @@ func (psc *PeerServerConnection) SendGroupMessage(gm *protocol.GroupMessage) err
return err
}
func (psc *PeerServerConnection) Close() {
psc.state = KILLED
psc.connection.Conn.Close()
}
// HandleGroupMessage passes the given group message back to the profile.
func (psc *PeerServerConnection) HandleGroupMessage(gm *protocol.GroupMessage) {
log.Printf("Received Group Message: %v", gm)

View File

@ -21,6 +21,7 @@ import (
type CwtchPeer struct {
connection.AutoConnectionHandler
Profile *model.Profile
app *application.RicochetApplication
mutex sync.Mutex
Log chan string `json:"-"`
connectionsManager *connections.Manager
@ -158,7 +159,7 @@ func (cp *CwtchPeer) TrustPeer(peer string) error {
// BlockPeer blocks an existing peer relationship.
func (cp *CwtchPeer) BlockPeer(peer string) error {
err := cp.Profile.BlockPeer(peer)
cp.connectionsManager.TearDownPeerConnection(peer)
cp.connectionsManager.ClosePeerConnection(peer)
return err
}
@ -203,13 +204,18 @@ func (cp *CwtchPeer) Listen() error {
return cpc
}
})
cwtchpeer.Init(cp.Profile.Name, cp.Profile.OnionPrivateKey, af, cp)
log.Printf("Running cwtch peer on %v", l.Addr().String())
cp.app = cwtchpeer
cwtchpeer.Run(l)
return nil
}
func (cp *CwtchPeer) Shutdown() {
cp.connectionsManager.Shutdown()
cp.app.Shutdown()
}
// CwtchPeerInstance encapsulates incoming peer connections
type CwtchPeerInstance struct {
rai *application.ApplicationInstance

View File

@ -13,6 +13,7 @@ import (
// Server encapsulates a complete, compliant Cwtch server.
type Server struct {
app *application.RicochetApplication
}
// Run s
@ -68,5 +69,10 @@ func (s *Server) Run(privateKeyFile string) {
cwtchserver.Init("cwtch server for "+l.Addr().String()[0:16], pk, af, new(application.AcceptAllContactManager))
log.Printf("cwtch server running on cwtch:%s", l.Addr().String()[0:16])
s.app = cwtchserver
cwtchserver.Run(l)
}
func (s *Server) Shutdown() {
s.app.Shutdown()
}

View File

@ -10,6 +10,7 @@ import (
"io/ioutil"
"log"
"os"
"runtime"
"testing"
"time"
)
@ -51,8 +52,10 @@ func printAndVerifyTimeline(t *testing.T, timeline []model.Message) error {
func serverCheck(serverAddr string) bool {
rc, err := goricochet.Open(serverAddr)
// Won't actaully free thread because rc.start() will wait for the uncalled rc.Process to read from errorChannel...
rc.Conn.Close()
if err == nil {
rc.Conn.Close()
return true
}
return false
@ -61,9 +64,10 @@ func serverCheck(serverAddr string) bool {
func TestCwtchPeerIntegration(t *testing.T) {
// Hide logging "noise"
log.SetOutput(ioutil.Discard)
// Todo: coung goroutines at beginning middle and end after shutdown
numGoRoutinesStart := runtime.NumGoroutine()
// ***** Cwtch Server managment *****
var server *cwtchserver.Server = nil
generatedKey := checkAndGenPrivateKey(keyfile)
serverKey, err := utils.LoadPrivateKeyFromFile(keyfile)
@ -80,7 +84,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
if !serverOnline {
// launch app
server := new(cwtchserver.Server)
server = new(cwtchserver.Server)
fmt.Printf("No server found\nStarting cwtch server...\n")
go server.Run(keyfile)
@ -90,6 +94,9 @@ func TestCwtchPeerIntegration(t *testing.T) {
fmt.Printf("Found existing cwtch server %v, using for tests...\n", serverAddr)
}
time.Sleep(time.Second * 2)
numGoRoutinesPostServer := runtime.NumGoroutine()
// ***** Peer setup *****
fmt.Println("Creating Alice...")
@ -104,6 +111,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
fmt.Println("Waiting for alice and bob to connection with onion network...")
time.Sleep(time.Second * 60)
numGoRoutinesPostPeerStart := runtime.NumGoroutine()
// ***** Peering and group creation / invite *****
@ -138,6 +146,8 @@ func TestCwtchPeerIntegration(t *testing.T) {
}
time.Sleep(time.Second * 3)
numGoRoutinesPostPeer := runtime.NumGoroutine()
fmt.Println("Alice joining server...")
alice.JoinServer(serverAddr)
@ -145,7 +155,8 @@ func TestCwtchPeerIntegration(t *testing.T) {
bob.JoinServer(serverAddr)
// Wait for them to join the server
time.Sleep(time.Second * 30)
time.Sleep(time.Second * 40)
numGouRoutinesPostServerConnect := runtime.NumGoroutine()
// ***** Conversation *****
@ -186,7 +197,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
// ***** Verify Test *****
// final syncing time...
time.Sleep(time.Second * 10)
time.Sleep(time.Second * 15)
alicesGroup := alice.Profile.GetGroupByGroupID(groupId)
fmt.Printf("alice Groups:\n")
@ -226,4 +237,26 @@ func TestCwtchPeerIntegration(t *testing.T) {
}
// Todo: shutdown users and server
fmt.Println("Shutting down Bob...")
bob.Shutdown()
time.Sleep(time.Second * 3)
numGoRoutinesPostBob := runtime.NumGoroutine()
if server != nil {
fmt.Println("Shutting down server...")
server.Shutdown()
time.Sleep(time.Second * 3)
}
numGoRoutinesPostServerShutdown := runtime.NumGoroutine()
fmt.Println("Shutting down Alice...")
alice.Shutdown()
time.Sleep(time.Second * 3)
numGoRoutinesPostAlice := runtime.NumGoroutine()
fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostServer: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeer: %v\nnumGouRoutinesPostServerConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostServerShutdown: %v\nnumGoRoutinesPostAlice: %v\n",
numGoRoutinesStart, numGoRoutinesPostServer, numGoRoutinesPostPeerStart, numGoRoutinesPostPeer, numGouRoutinesPostServerConnect,
numGoRoutinesPostBob, numGoRoutinesPostServerShutdown, numGoRoutinesPostAlice)
if numGoRoutinesPostServer != numGoRoutinesPostAlice {
t.Errorf("Number of GoRoutines once server checks were completed (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, leak detected!", numGoRoutinesPostServer, numGoRoutinesPostAlice)
}
}