Fixups for Integration Test
continuous-integration/drone/push Build is pending Details
continuous-integration/drone/pr Build is failing Details

This commit is contained in:
Sarah Jamie Lewis 2021-11-19 14:04:43 -08:00
parent 847b04e4fc
commit 4f5b1fa106
3 changed files with 81 additions and 138 deletions

View File

@ -320,7 +320,7 @@ func ImportLegacyProfile(profile *model.Profile, cps *CwtchProfileStorage) Cwtch
case lastKnownSignature:
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(lastReceivedSignature)), value)
default:
log.Errorf("could not import conversation attribute %v %v", key, value)
log.Errorf("could not import conversation attribute %v", key)
}
}
@ -487,7 +487,6 @@ func (cp *cwtchPeer) DeleteConversation(id int) error {
func (cp *cwtchPeer) SetConversationAttribute(id int, path attr.ScopedZonedPath, value string) error {
cp.mutex.Lock()
defer cp.mutex.Unlock()
log.Debugf("setting %v %v on conversation %v", path, value, id)
return cp.storage.SetConversationAttribute(id, path, value)
}
@ -529,6 +528,8 @@ func (cp *cwtchPeer) GetMostRecentMessages(conversation int, channel int, offset
return cp.storage.GetMostRecentMessages(conversation, channel, offset, limit)
}
// UpdateMessageAttribute sets a given key/value attribute on the message in the given conversation/channel
// errors if the message doesn't exist, or for underlying daabase issues.
func (cp *cwtchPeer) UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error {
_, attr, err := cp.GetChannelMessage(conversation, channel, id)
if err == nil {
@ -626,10 +627,17 @@ func (cp *cwtchPeer) AddServer(serverSpecification string) (string, error) {
return "", model.InconsistentKeyBundleError
}
// GetServers returns an unordered list of servers
// Status: TODO
// GetServers returns an unordered list of server handles
func (cp *cwtchPeer) GetServers() []string {
var servers []string
conversations, err := cp.FetchConversations()
if err == nil {
for _, conversationInfo := range conversations {
if conversationInfo.IsServer() {
servers = append(servers, conversationInfo.Handle)
}
}
}
return servers
}
@ -1203,6 +1211,8 @@ func (cp *cwtchPeer) GetChannelMessageByContentHash(conversationID int, channelI
return cp.storage.GetChannelMessageByContentHash(conversationID, channelID, contenthash)
}
// constructGroupFromConversation returns a model.Group wrapper around a database back groups. Useful for
// encrypting / decrypting messages to/from the group.
func (cp *cwtchPeer) constructGroupFromConversation(conversationInfo *model.Conversation) (*model.Group, error) {
key := conversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)).ToString()]
groupKey, err := base64.StdEncoding.DecodeString(key)

View File

@ -274,6 +274,15 @@ func (e *engine) listenFn() {
func (e *engine) Shutdown() {
e.shuttingDown = true
e.service.Shutdown()
e.ephemeralServices.Range(func(_, service interface{}) bool {
connection, ok := service.(*tor.BaseOnionService)
if ok {
log.Infof("shutting down ephemeral service")
connection.Shutdown()
}
return true
})
e.queue.Shutdown()
}

View File

@ -54,7 +54,6 @@ func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target co
}
func TestCwtchPeerIntegration(t *testing.T) {
numGoRoutinesStart := runtime.NumGoroutine()
log.AddEverythingFromPattern("connectivity")
log.SetLevel(log.LevelDebug)
@ -83,10 +82,12 @@ func TestCwtchPeerIntegration(t *testing.T) {
if err != nil {
t.Fatalf("Could not start Tor: %v", err)
}
pid, _ := acn.GetPID()
t.Logf("Tor pid: %v", pid)
acn.WaitTillBootstrapped()
defer acn.Close()
// We don't include ACN in our routine calculations anymore
numGoRoutinesStart := runtime.NumGoroutine()
// ***** Cwtch Server management *****
const ServerKeyBundleBase64 = "eyJLZXlzIjp7ImJ1bGxldGluX2JvYXJkX29uaW9uIjoibmZoeHp2enhpbnJpcGdkaDR0Mm00eGN5M2NyZjZwNGNiaGVjdGdja3VqM2lkc2pzYW90Z293YWQiLCJwcml2YWN5X3Bhc3NfcHVibGljX2tleSI6IjVwd2hQRGJ0c0EvdFI3ZHlUVUkzakpZZnM1L3Jaai9iQ1ZWZEpTc0Jtbk09IiwidG9rZW5fc2VydmljZV9vbmlvbiI6ImVvd25mcTRsNTZxMmU0NWs0bW03MjdsanJod3Z0aDZ5ZWN0dWV1bXB4emJ5cWxnbXVhZm1qdXFkIn0sIlNpZ25hdHVyZSI6IlY5R3NPMHNZWFJ1bGZxdzdmbGdtclVxSTBXS0JlSFIzNjIvR3hGbWZPekpEZjJaRks2ck9jNVRRR1ZxVWIrbXIwV2xId0pwdXh0UW1JRU9KNkplYkNRPT0ifQ=="
@ -215,35 +216,33 @@ func TestCwtchPeerIntegration(t *testing.T) {
bob.AddServer(string(serverKeyBundle))
carol.AddServer(string(serverKeyBundle))
t.Logf("Waiting for alice to join server...")
err = alice.JoinServer(ServerAddr)
if err != nil {
t.Fatalf("alice cannot join server %v %v", ServerAddr, err)
}
waitForConnection(t, alice, ServerAddr, connections.AUTHENTICATED)
waitForConnection(t, alice, ServerAddr, connections.SYNCED)
// Creating a Group
fmt.Println("Creating group on ", ServerAddr, "...")
t.Logf("Creating group on %v...", ServerAddr)
aliceGroupConversationID, err := alice.StartGroup("Our Cool Testing Group", ServerAddr)
fmt.Printf("Created group: %v!\n", aliceGroupConversationID)
t.Logf("Created group: %v!\n", aliceGroupConversationID)
if err != nil {
t.Errorf("Failed to init group: %v", err)
return
}
fmt.Println("Waiting for alice to join server...")
// Invites
fmt.Println("Alice inviting Bob to group...")
err = alice.SendInviteToConversation(alice2bobConversationID, aliceGroupConversationID)
if err != nil {
t.Fatalf("Error for Alice inviting Bob to group: %v", err)
}
time.Sleep(time.Second * 5)
// Alice invites Bob to the Group...
message, _, err := alice.GetChannelMessage(alice2bobConversationID, 0, 1)
t.Logf("Alice message from Bob %v %v", message, err)
message, _, err := bob.GetChannelMessage(bob2aliceConversationID, 0, 1)
t.Logf("Alice message to Bob %v %v", message, err)
var overlayMessage model.MessageWrapper
json.Unmarshal([]byte(message), &overlayMessage)
t.Logf("Parsed Overlay Message: %v", overlayMessage)
@ -267,172 +266,96 @@ func TestCwtchPeerIntegration(t *testing.T) {
checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[1])
checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[1])
//fmt.Println("Alice inviting Carol to group...")
//err = alice.InviteOnionToGroup(carol.GetOnion(), groupID)
//if err != nil {
// t.Fatalf("Error for Alice inviting Carol to group: %v", err)
//}
//time.Sleep(time.Second * 60) // Account for some token acquisition in Alice and Bob flows.
//fmt.Println("Carol examining groups and accepting invites...")
//for _, message := range carol.GetContact(alice.GetOnion()).Timeline.GetMessages() {
// fmt.Printf("Found message from Alice: %v", message.Message)
// if strings.HasPrefix(message.Message, "torv3") {
// gid, err := carol.ImportGroup(message.Message)
// if err == nil {
// fmt.Printf("Carol found invite...now accepting %v...", gid)
// carol.AcceptInvite(gid)
// } else {
// t.Fatalf("Carol could not accept invite...%v", gid)
// }
// }
//}
//
//fmt.Println("Shutting down Alice...")
//app.ShutdownPeer(alice.GetOnion())
//time.Sleep(time.Second * 5)
numGoRoutinesPostAlice := runtime.NumGoroutine()
//
//fmt.Println("Carol joining server...")
//carol.JoinServer(ServerAddr)
//waitForPeerGroupConnection(t, carol, groupID)
numGoRoutinesPostCarolConnect := runtime.NumGoroutine()
//
//fmt.Printf("%v> %v", bobName, bobLines[2])
//bob.SendMessage(groupID, bobLines[2])
//// Bob should have enough tokens so we don't need to account for
//// token acquisition here...
//
//fmt.Printf("%v> %v", carolName, carolLines[0])
//carol.SendMessage(groupID, carolLines[0])
//time.Sleep(time.Second * 30) // we need to account for spam-based token acquisition, but everything should
//// be warmed-up and delays should be pretty small.
//
//// ***** Verify Test *****
//
//fmt.Println("Final syncing time...")
//time.Sleep(time.Second * 30)
//
//alicesGroup := alice.GetGroup(groupID)
//if alicesGroup == nil {
// t.Error("aliceGroup == nil")
// return
//}
//
//fmt.Printf("Alice's TimeLine:\n")
//aliceVerified := printAndCountVerifedTimeline(t, alicesGroup.GetTimeline())
//if aliceVerified != 4 {
// t.Errorf("Alice did not have 4 verified messages")
//}
//
//bobsGroup := bob.GetGroup(groupID)
//if bobsGroup == nil {
// t.Error("bobGroup == nil")
// return
//}
//fmt.Printf("Bob's TimeLine:\n")
//bobVerified := printAndCountVerifedTimeline(t, bobsGroup.GetTimeline())
//if bobVerified != 6 {
// t.Errorf("Bob did not have 6 verified messages")
//}
//
//carolsGroup := carol.GetGroup(groupID)
//fmt.Printf("Carol's TimeLine:\n")
//carolVerified := printAndCountVerifedTimeline(t, carolsGroup.GetTimeline())
//if carolVerified != 6 {
// t.Errorf("Carol did not have 6 verified messages")
//}
//
//if len(alicesGroup.GetTimeline()) != 4 {
// t.Errorf("Alice's timeline does not have all messages")
//} else {
// // check message 0,1,2,3
// alicesGroup.Timeline.Sort()
// aliceGroupTimeline := alicesGroup.GetTimeline()
// if aliceGroupTimeline[0].Message != aliceLines[0] || aliceGroupTimeline[1].Message != bobLines[0] ||
// aliceGroupTimeline[2].Message != aliceLines[1] || aliceGroupTimeline[3].Message != bobLines[1] {
// t.Errorf("Some of Alice's timeline messages did not have the expected content!")
// }
// Alice invites Bob to the Group...
message, _, err = carol.GetChannelMessage(carol2aliceConversationID, 0, 1)
t.Logf("Alice message to Carol %v %v", message, err)
json.Unmarshal([]byte(message), &overlayMessage)
t.Logf("Parsed Overlay Message: %v", overlayMessage)
err = carol.ImportBundle(overlayMessage.Data)
t.Logf("Result of Carol Importing the Bundle from Alice: %v", err)
t.Logf("Waiting for Carol to join connect to group server...")
err = carol.JoinServer(ServerAddr) // for some unrealism we skip "discovering the server from the event bus
if err != nil {
t.Fatalf("carol cannot join server %v %v", ServerAddr, err)
}
carolGroupConversationID := 3
waitForConnection(t, carol, ServerAddr, connections.SYNCED)
numGoRoutinesPostCarolConnect := runtime.NumGoroutine()
t.Logf("Shutting down Alice...")
// Check Alice Timeline
checkMessage(t, alice, aliceGroupConversationID, 1, aliceLines[0])
checkMessage(t, alice, aliceGroupConversationID, 2, bobLines[0])
checkMessage(t, alice, aliceGroupConversationID, 3, aliceLines[1])
checkMessage(t, alice, aliceGroupConversationID, 4, bobLines[1])
app.ShutdownPeer(alice.GetOnion())
time.Sleep(time.Second * 3)
numGoRoutinesPostAlice := runtime.NumGoroutine()
checkSendMessageToGroup(t, carol, carolGroupConversationID, carolLines[0])
checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[2])
time.Sleep(time.Second * 30)
// Check Bob Timeline
checkMessage(t, bob, bobGroupConversationID, 1, aliceLines[0])
checkMessage(t, bob, bobGroupConversationID, 2, bobLines[0])
checkMessage(t, bob, bobGroupConversationID, 3, aliceLines[1])
checkMessage(t, bob, bobGroupConversationID, 4, bobLines[1])
checkMessage(t, bob, bobGroupConversationID, 5, carolLines[0])
checkMessage(t, bob, bobGroupConversationID, 6, bobLines[2])
//}
//
//if len(bobsGroup.GetTimeline()) != 6 {
// t.Errorf("Bob's timeline does not have all messages")
//} else {
// // check message 0,1,2,3,4,5
// bobsGroup.Timeline.Sort()
// bobGroupTimeline := bobsGroup.GetTimeline()
// if bobGroupTimeline[0].Message != aliceLines[0] || bobGroupTimeline[1].Message != bobLines[0] ||
// bobGroupTimeline[2].Message != aliceLines[1] || bobGroupTimeline[3].Message != bobLines[1] ||
// bobGroupTimeline[4].Message != bobLines[2] || bobGroupTimeline[5].Message != carolLines[0] {
// t.Errorf("Some of Bob's timeline messages did not have the expected content!")
// }
//}
//
//if len(carolsGroup.GetTimeline()) != 6 {
// t.Errorf("Carol's timeline does not have all messages")
//} else {
// // check message 0,1,2,3,4,5
// carolsGroup.Timeline.Sort()
// carolGroupTimeline := carolsGroup.GetTimeline()
// if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] ||
// carolGroupTimeline[2].Message != aliceLines[1] || carolGroupTimeline[3].Message != bobLines[1] ||
// carolGroupTimeline[4].Message != carolLines[0] || carolGroupTimeline[5].Message != bobLines[2] {
// t.Errorf("Some of Carol's timeline messages did not have the expected content!")
// }
//}
// Check Carol Timeline
checkMessage(t, carol, carolGroupConversationID, 1, aliceLines[0])
checkMessage(t, carol, carolGroupConversationID, 2, bobLines[0])
checkMessage(t, carol, carolGroupConversationID, 3, aliceLines[1])
checkMessage(t, carol, carolGroupConversationID, 4, bobLines[1])
checkMessage(t, carol, carolGroupConversationID, 5, carolLines[0])
checkMessage(t, carol, carolGroupConversationID, 6, bobLines[2])
fmt.Println("Shutting down Bob...")
t.Logf("Shutting down Bob...")
app.ShutdownPeer(bob.GetOnion())
time.Sleep(time.Second * 3)
numGoRoutinesPostBob := runtime.NumGoroutine()
fmt.Println("Shutting down Carol...")
t.Logf("Shutting down Carol...")
app.ShutdownPeer(carol.GetOnion())
time.Sleep(time.Second * 3)
numGoRoutinesPostCarol := runtime.NumGoroutine()
fmt.Println("Shutting down apps...")
t.Logf("Shutting down apps...")
fmt.Printf("app Shutdown: %v\n", runtime.NumGoroutine())
app.Shutdown()
time.Sleep(2 * time.Second)
fmt.Printf("Done shutdown: %v\n", runtime.NumGoroutine())
t.Logf("Done shutdown: %v\n", runtime.NumGoroutine())
numGoRoutinesPostAppShutdown := runtime.NumGoroutine()
fmt.Println("Shutting down ACN...")
t.Logf("Shutting down ACN...")
// acn.Close() TODO: ACN Now gets closed automatically with defer...attempting to close twice results in a dead lock...
time.Sleep(time.Second * 2) // Server ^^ has a 5 second loop attempting reconnect before exiting
time.Sleep(time.Second * 30) // the network status plugin might keep goroutines alive for a minute before killing them
numGoRoutinesPostACN := runtime.NumGoroutine()
// Printing out the current goroutines
// Very useful if we are leaking any.
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+
"numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v\nnumGoRoutinesPostACN: %v\n",
t.Logf("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+
"numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v",
numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect,
numGoRoutinesPostAlice, numGoRoutinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown, numGoRoutinesPostACN)
numGoRoutinesPostAlice, numGoRoutinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown)
if numGoRoutinesStart != numGoRoutinesPostACN {
t.Errorf("Number of GoRoutines at start (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, leak detected!", numGoRoutinesStart, numGoRoutinesPostACN)
if numGoRoutinesStart != numGoRoutinesPostAppShutdown {
t.Errorf("Number of GoRoutines at start (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, v detected!", numGoRoutinesStart, numGoRoutinesPostAppShutdown)
}
}
// Utility function for sending a message from a peer to a group
func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, id int, message string) {
name, _ := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
t.Logf("%v> %v\n", name, message)
@ -443,6 +366,7 @@ func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, id int, messa
time.Sleep(time.Second * 10)
}
// Utility function for testing that a message in a conversation is as expected
func checkMessage(t *testing.T, profile peer.CwtchPeer, id int, messageID int, expected string) {
message, _, err := profile.GetChannelMessage(id, 0, messageID)
if err != nil {