diff --git a/.drone.yml b/.drone.yml index 52a0828..4ee878e 100644 --- a/.drone.yml +++ b/.drone.yml @@ -39,7 +39,7 @@ steps: path: /go commands: - export PATH=`pwd`:$PATH - - go test -race -v cwtch.im/cwtch/testing/ + - go test -timeout=30m -race -v cwtch.im/cwtch/testing/ - name: filesharing-integ-test image: golang:1.19.1 volumes: @@ -47,7 +47,7 @@ steps: path: /go commands: - export PATH=`pwd`:$PATH - - go test -race -v cwtch.im/cwtch/testing/filesharing + - go test -timeout=20m -race -v cwtch.im/cwtch/testing/filesharing - name: notify-gogs image: openpriv/drone-gogs pull: if-not-exists diff --git a/app/app.go b/app/app.go index 734db56..92ae38a 100644 --- a/app/app.go +++ b/app/app.go @@ -282,13 +282,16 @@ func (app *application) ActivateEngines(doListen, doPeers, doServers bool) { func (app *application) ActivatePeerEngine(onion string, doListen, doPeers, doServers bool) { profile := app.GetPeer(onion) if profile != nil { - app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) - app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated)) - app.QueryACNStatus() - if doListen { - profile.Listen() + if _, exists := app.engines[onion]; !exists { + app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) + + app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated)) + app.QueryACNStatus() + if doListen { + profile.Listen() + } + profile.StartConnections(doPeers, doServers) } - profile.StartConnections(doPeers, doServers) } } diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index dcfd06d..97e34ce 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -327,7 +327,7 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState pinf, _ := cr.connections.Load(id) p := pinf.(*contact) - log.Infof(" managing state change for %v %v to %v by self %v", id, connections.ConnectionStateName[p.state], connections.ConnectionStateName[state], cr.onion) + log.Debugf(" managing state change for %v %v to %v by self %v", id, connections.ConnectionStateName[p.state], connections.ConnectionStateName[state], cr.onion) if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED { if p.state == connections.SYNCED || p.state == connections.AUTHENTICATED { p.lastSeen = time.Now() diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 3d7ed6c..d5a9321 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -1015,7 +1015,7 @@ func (cp *cwtchPeer) Listen() { cp.mutex.Lock() defer cp.mutex.Unlock() if !cp.listenStatus { - log.Infof("cwtchPeer Listen sending ProtocolEngineStartListen\n") + log.Debugf("cwtchPeer Listen sending ProtocolEngineStartListen\n") cp.listenStatus = true onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString()) cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{event.Onion: string(onion)})) diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index 1fbe98f..9845b81 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -1,7 +1,6 @@ package testing import ( - // Import SQL Cipher "crypto/rand" app2 "cwtch.im/cwtch/app" "cwtch.im/cwtch/event" @@ -12,6 +11,7 @@ import ( "cwtch.im/cwtch/protocol/connections" "encoding/base64" "encoding/json" + "fmt" "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" "git.openprivacy.ca/openprivacy/connectivity/tor" "git.openprivacy.ca/openprivacy/log" @@ -23,6 +23,7 @@ import ( "path/filepath" "runtime" "runtime/pprof" + "strconv" "testing" "time" ) @@ -53,6 +54,16 @@ func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target co } } +func waitForRetVal(peer peer.CwtchPeer, convId int, szp attr.ScopedZonedPath) { + for { + _, err := peer.GetConversationAttribute(convId, szp) + if err == nil { + return + } + time.Sleep(time.Second * 5) + } +} + func checkAndLoadTokens() []*privacypass.Token { var tokens []*privacypass.Token data, err := os.ReadFile("../tokens") @@ -150,18 +161,21 @@ func TestCwtchPeerIntegration(t *testing.T) { app.CreateTaggedPeer("Carol", "asdfasdf", "test") alice := app2.WaitGetPeer(app, "Alice") + aliceBus := app.GetEventBus(alice.GetOnion()) app.ActivatePeerEngine(alice.GetOnion(), true, true, true) log.Infoln("Alice created:", alice.GetOnion()) alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice") alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) bob := app2.WaitGetPeer(app, "Bob") + bobBus := app.GetEventBus(bob.GetOnion()) app.ActivatePeerEngine(bob.GetOnion(), true, true, true) log.Infoln("Bob created:", bob.GetOnion()) bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) carol := app2.WaitGetPeer(app, "Carol") + carolBus := app.GetEventBus(carol.GetOnion()) app.ActivatePeerEngine(carol.GetOnion(), true, true, true) log.Infoln("Carol created:", carol.GetOnion()) carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol") @@ -232,23 +246,27 @@ func TestCwtchPeerIntegration(t *testing.T) { // Probably related to latency/throughput problems in the underlying tor network. time.Sleep(30 * time.Second) + waitForRetVal(bob, bob2aliceConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) aliceName, err := bob.GetConversationAttribute(bob2aliceConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) if err != nil || aliceName != "Alice" { t.Fatalf("Bob: alice GetKeyVal error on alice peer.name %v: %v\n", aliceName, err) } log.Infof("Bob has alice's name as '%v'\n", aliceName) + waitForRetVal(alice, alice2bobConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) bobName, err := alice.GetConversationAttribute(alice2bobConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) if err != nil || bobName != "Bob" { t.Fatalf("Alice: bob GetKeyVal error on bob peer.name %v: %v \n", bobName, err) } log.Infof("Alice has bob's name as '%v'\n", bobName) + waitForRetVal(carol, carol2aliceConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) aliceName, err = carol.GetConversationAttribute(carol2aliceConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) if err != nil || aliceName != "Alice" { t.Fatalf("carol GetKeyVal error for alice peer.name %v: %v\n", aliceName, err) } + waitForRetVal(alice, alice2carolConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) carolName, err := alice.GetConversationAttribute(alice2carolConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) if err != nil || carolName != "Carol" { t.Fatalf("alice GetKeyVal error, carol peer.name: %v: %v\n", carolName, err) @@ -267,9 +285,6 @@ func TestCwtchPeerIntegration(t *testing.T) { t.Fatalf("Failed to Add Server Bundle %v", err) } - log.Infof("Waiting for alice to join server...") - waitForConnection(t, alice, ServerAddr, connections.SYNCED) - // Creating a Group log.Infof("Creating group on %v...", ServerAddr) aliceGroupConversationID, err := alice.StartGroup("Our Cool Testing Group", ServerAddr) @@ -300,6 +315,8 @@ func TestCwtchPeerIntegration(t *testing.T) { usedTokens += len(bobLines) } + log.Infof("Waiting for alice to join server...") + waitForConnection(t, alice, ServerAddr, connections.SYNCED) log.Infof("Waiting for Bob to join connect to group server...") waitForConnection(t, bob, ServerAddr, connections.SYNCED) @@ -312,10 +329,10 @@ func TestCwtchPeerIntegration(t *testing.T) { // ***** Conversation ***** log.Infof("Starting conversation in group...") - checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[0]) - checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[0]) - checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[1]) - checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[1]) + checkSendMessageToGroup(t, alice, aliceBus, aliceGroupConversationID, aliceLines[0]) + checkSendMessageToGroup(t, bob, bobBus, bobGroupConversationID, bobLines[0]) + checkSendMessageToGroup(t, alice, aliceBus, aliceGroupConversationID, aliceLines[1]) + checkSendMessageToGroup(t, bob, bobBus, bobGroupConversationID, bobLines[1]) // Pretend that Carol Acquires the Overlay Message through some other means... json.Unmarshal([]byte(message), &overlayMessage) @@ -342,8 +359,8 @@ func TestCwtchPeerIntegration(t *testing.T) { time.Sleep(time.Second * 3) numGoRoutinesPostAlice := runtime.NumGoroutine() - checkSendMessageToGroup(t, carol, carolGroupConversationID, carolLines[0]) - checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[2]) + checkSendMessageToGroup(t, carol, carolBus, carolGroupConversationID, carolLines[0]) + checkSendMessageToGroup(t, bob, bobBus, bobGroupConversationID, bobLines[2]) // Time to Sync time.Sleep(time.Second * 10) @@ -393,7 +410,7 @@ func TestCwtchPeerIntegration(t *testing.T) { // Printing out the current goroutines // Very useful if we are leaking any. pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) - + fmt.Println("") log.Infof("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+ "numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v", numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect, @@ -405,14 +422,26 @@ func TestCwtchPeerIntegration(t *testing.T) { } // Utility function for sending a message from a peer to a group -func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, id int, message string) { +func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, bus event.Manager, id int, message string) { name, _ := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name) log.Infof("%v> %v\n", name, message) - _, err := profile.SendMessage(id, message) + queue := event.NewQueue() + bus.Subscribe(event.IndexedAcknowledgement, queue) + mid, err := profile.SendMessage(id, message) if err != nil { log.Errorf("Alice failed to send a message to the group: %v", err) t.Fatalf("Alice failed to send a message to the group: %v\n", err) } + log.Infof("Sent message with mid: %v, waiting for ack...", mid) + ev := queue.Next() + switch ev.EventType { + case event.IndexedAcknowledgement: + if evid, err := strconv.Atoi(ev.Data[event.Index]); err == nil && evid == mid { + log.Infof("Message mid acked!") + break + } + } + queue.Shutdown() time.Sleep(time.Second * 10) } diff --git a/testing/filesharing/file_sharing_integration_test.go b/testing/filesharing/file_sharing_integration_test.go index e3006dd..b266847 100644 --- a/testing/filesharing/file_sharing_integration_test.go +++ b/testing/filesharing/file_sharing_integration_test.go @@ -215,7 +215,7 @@ func testBobDownloadFile(t *testing.T, bob peer.CwtchPeer, filesharingFunctional } // Wait for the file downloaded event - ClientTimeout := utils2.TimeoutPolicy(time.Second * 60) + ClientTimeout := utils2.TimeoutPolicy(time.Second * 120) err = ClientTimeout.ExecuteAction(func() error { ev := queueOracle.Next() if ev.EventType != event.FileDownloaded {