From 530f2d97738bf864588c376b4d42055a8ba5d8d7 Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Sun, 4 Dec 2022 22:11:32 -0800 Subject: [PATCH] integ test wait for group message to be acked --- testing/cwtch_peer_server_integration_test.go | 32 ++++++++++++++----- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index 198799b..2972723 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -23,6 +23,7 @@ import ( "path/filepath" "runtime" "runtime/pprof" + "strconv" "testing" "time" ) @@ -150,18 +151,21 @@ func TestCwtchPeerIntegration(t *testing.T) { app.CreateTaggedPeer("Carol", "asdfasdf", "test") alice := app2.WaitGetPeer(app, "Alice") + aliceBus := app.GetEventBus(alice.GetOnion()) app.ActivatePeerEngine(alice.GetOnion(), true, true, true) log.Infoln("Alice created:", alice.GetOnion()) alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice") alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) bob := app2.WaitGetPeer(app, "Bob") + bobBus := app.GetEventBus(bob.GetOnion()) app.ActivatePeerEngine(bob.GetOnion(), true, true, true) log.Infoln("Bob created:", bob.GetOnion()) bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) carol := app2.WaitGetPeer(app, "Carol") + carolBus := app.GetEventBus(carol.GetOnion()) app.ActivatePeerEngine(carol.GetOnion(), true, true, true) log.Infoln("Carol created:", carol.GetOnion()) carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol") @@ -311,10 +315,10 @@ func TestCwtchPeerIntegration(t *testing.T) { // ***** Conversation ***** log.Infof("Starting conversation in group...") - checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[0]) - checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[0]) - checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[1]) - checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[1]) + checkSendMessageToGroup(t, alice, aliceBus, aliceGroupConversationID, aliceLines[0]) + checkSendMessageToGroup(t, bob, bobBus, bobGroupConversationID, bobLines[0]) + checkSendMessageToGroup(t, alice, aliceBus, aliceGroupConversationID, aliceLines[1]) + checkSendMessageToGroup(t, bob, bobBus, bobGroupConversationID, bobLines[1]) // Pretend that Carol Acquires the Overlay Message through some other means... json.Unmarshal([]byte(message), &overlayMessage) @@ -341,8 +345,8 @@ func TestCwtchPeerIntegration(t *testing.T) { time.Sleep(time.Second * 3) numGoRoutinesPostAlice := runtime.NumGoroutine() - checkSendMessageToGroup(t, carol, carolGroupConversationID, carolLines[0]) - checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[2]) + checkSendMessageToGroup(t, carol, carolBus, carolGroupConversationID, carolLines[0]) + checkSendMessageToGroup(t, bob, bobBus, bobGroupConversationID, bobLines[2]) // Time to Sync time.Sleep(time.Second * 10) @@ -404,14 +408,26 @@ func TestCwtchPeerIntegration(t *testing.T) { } // Utility function for sending a message from a peer to a group -func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, id int, message string) { +func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, bus event.Manager, id int, message string) { name, _ := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name) log.Infof("%v> %v\n", name, message) - _, err := profile.SendMessage(id, message) + queue := event.NewQueue() + bus.Subscribe(event.IndexedAcknowledgement, queue) + mid, err := profile.SendMessage(id, message) if err != nil { log.Errorf("Alice failed to send a message to the group: %v", err) t.Fatalf("Alice failed to send a message to the group: %v\n", err) } + log.Infof("Sent message with mid: %v, waiting for ack...", mid) + ev := queue.Next() + switch ev.EventType { + case event.IndexedAcknowledgement: + if evid, err := strconv.Atoi(ev.Data[event.Index]); err == nil && evid == mid { + log.Infof("Message mid acked!") + break + } + } + queue.Shutdown() time.Sleep(time.Second * 10) }