integ test wait for group message to be acked

This commit is contained in:
Dan Ballard 2022-12-04 22:11:32 -08:00 committed by Gitea
parent bdb4b93f59
commit 530f2d9773
1 changed files with 24 additions and 8 deletions

View File

@ -23,6 +23,7 @@ import (
"path/filepath" "path/filepath"
"runtime" "runtime"
"runtime/pprof" "runtime/pprof"
"strconv"
"testing" "testing"
"time" "time"
) )
@ -150,18 +151,21 @@ func TestCwtchPeerIntegration(t *testing.T) {
app.CreateTaggedPeer("Carol", "asdfasdf", "test") app.CreateTaggedPeer("Carol", "asdfasdf", "test")
alice := app2.WaitGetPeer(app, "Alice") alice := app2.WaitGetPeer(app, "Alice")
aliceBus := app.GetEventBus(alice.GetOnion())
app.ActivatePeerEngine(alice.GetOnion(), true, true, true) app.ActivatePeerEngine(alice.GetOnion(), true, true, true)
log.Infoln("Alice created:", alice.GetOnion()) log.Infoln("Alice created:", alice.GetOnion())
alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice") alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice")
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
bob := app2.WaitGetPeer(app, "Bob") bob := app2.WaitGetPeer(app, "Bob")
bobBus := app.GetEventBus(bob.GetOnion())
app.ActivatePeerEngine(bob.GetOnion(), true, true, true) app.ActivatePeerEngine(bob.GetOnion(), true, true, true)
log.Infoln("Bob created:", bob.GetOnion()) log.Infoln("Bob created:", bob.GetOnion())
bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob")
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
carol := app2.WaitGetPeer(app, "Carol") carol := app2.WaitGetPeer(app, "Carol")
carolBus := app.GetEventBus(carol.GetOnion())
app.ActivatePeerEngine(carol.GetOnion(), true, true, true) app.ActivatePeerEngine(carol.GetOnion(), true, true, true)
log.Infoln("Carol created:", carol.GetOnion()) log.Infoln("Carol created:", carol.GetOnion())
carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol") carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol")
@ -311,10 +315,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
// ***** Conversation ***** // ***** Conversation *****
log.Infof("Starting conversation in group...") log.Infof("Starting conversation in group...")
checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[0]) checkSendMessageToGroup(t, alice, aliceBus, aliceGroupConversationID, aliceLines[0])
checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[0]) checkSendMessageToGroup(t, bob, bobBus, bobGroupConversationID, bobLines[0])
checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[1]) checkSendMessageToGroup(t, alice, aliceBus, aliceGroupConversationID, aliceLines[1])
checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[1]) checkSendMessageToGroup(t, bob, bobBus, bobGroupConversationID, bobLines[1])
// Pretend that Carol Acquires the Overlay Message through some other means... // Pretend that Carol Acquires the Overlay Message through some other means...
json.Unmarshal([]byte(message), &overlayMessage) json.Unmarshal([]byte(message), &overlayMessage)
@ -341,8 +345,8 @@ func TestCwtchPeerIntegration(t *testing.T) {
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
numGoRoutinesPostAlice := runtime.NumGoroutine() numGoRoutinesPostAlice := runtime.NumGoroutine()
checkSendMessageToGroup(t, carol, carolGroupConversationID, carolLines[0]) checkSendMessageToGroup(t, carol, carolBus, carolGroupConversationID, carolLines[0])
checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[2]) checkSendMessageToGroup(t, bob, bobBus, bobGroupConversationID, bobLines[2])
// Time to Sync // Time to Sync
time.Sleep(time.Second * 10) time.Sleep(time.Second * 10)
@ -404,14 +408,26 @@ func TestCwtchPeerIntegration(t *testing.T) {
} }
// Utility function for sending a message from a peer to a group // Utility function for sending a message from a peer to a group
func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, id int, message string) { func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, bus event.Manager, id int, message string) {
name, _ := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name) name, _ := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
log.Infof("%v> %v\n", name, message) log.Infof("%v> %v\n", name, message)
_, err := profile.SendMessage(id, message) queue := event.NewQueue()
bus.Subscribe(event.IndexedAcknowledgement, queue)
mid, err := profile.SendMessage(id, message)
if err != nil { if err != nil {
log.Errorf("Alice failed to send a message to the group: %v", err) log.Errorf("Alice failed to send a message to the group: %v", err)
t.Fatalf("Alice failed to send a message to the group: %v\n", err) t.Fatalf("Alice failed to send a message to the group: %v\n", err)
} }
log.Infof("Sent message with mid: %v, waiting for ack...", mid)
ev := queue.Next()
switch ev.EventType {
case event.IndexedAcknowledgement:
if evid, err := strconv.Atoi(ev.Data[event.Index]); err == nil && evid == mid {
log.Infof("Message mid acked!")
break
}
}
queue.Shutdown()
time.Sleep(time.Second * 10) time.Sleep(time.Second * 10)
} }