diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index ab60044..df54dbc 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -320,7 +320,7 @@ func ImportLegacyProfile(profile *model.Profile, cps *CwtchProfileStorage) Cwtch case lastKnownSignature: cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(lastReceivedSignature)), value) default: - log.Errorf("could not import conversation attribute %v %v", key, value) + log.Errorf("could not import conversation attribute %v", key) } } @@ -487,7 +487,6 @@ func (cp *cwtchPeer) DeleteConversation(id int) error { func (cp *cwtchPeer) SetConversationAttribute(id int, path attr.ScopedZonedPath, value string) error { cp.mutex.Lock() defer cp.mutex.Unlock() - log.Debugf("setting %v %v on conversation %v", path, value, id) return cp.storage.SetConversationAttribute(id, path, value) } @@ -529,6 +528,8 @@ func (cp *cwtchPeer) GetMostRecentMessages(conversation int, channel int, offset return cp.storage.GetMostRecentMessages(conversation, channel, offset, limit) } +// UpdateMessageAttribute sets a given key/value attribute on the message in the given conversation/channel +// errors if the message doesn't exist, or for underlying daabase issues. func (cp *cwtchPeer) UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error { _, attr, err := cp.GetChannelMessage(conversation, channel, id) if err == nil { @@ -626,10 +627,17 @@ func (cp *cwtchPeer) AddServer(serverSpecification string) (string, error) { return "", model.InconsistentKeyBundleError } -// GetServers returns an unordered list of servers -// Status: TODO +// GetServers returns an unordered list of server handles func (cp *cwtchPeer) GetServers() []string { var servers []string + conversations, err := cp.FetchConversations() + if err == nil { + for _, conversationInfo := range conversations { + if conversationInfo.IsServer() { + servers = append(servers, conversationInfo.Handle) + } + } + } return servers } @@ -1203,6 +1211,8 @@ func (cp *cwtchPeer) GetChannelMessageByContentHash(conversationID int, channelI return cp.storage.GetChannelMessageByContentHash(conversationID, channelID, contenthash) } +// constructGroupFromConversation returns a model.Group wrapper around a database back groups. Useful for +// encrypting / decrypting messages to/from the group. func (cp *cwtchPeer) constructGroupFromConversation(conversationInfo *model.Conversation) (*model.Group, error) { key := conversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)).ToString()] groupKey, err := base64.StdEncoding.DecodeString(key) diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 10caf80..03b89df 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -274,6 +274,15 @@ func (e *engine) listenFn() { func (e *engine) Shutdown() { e.shuttingDown = true e.service.Shutdown() + + e.ephemeralServices.Range(func(_, service interface{}) bool { + connection, ok := service.(*tor.BaseOnionService) + if ok { + log.Infof("shutting down ephemeral service") + connection.Shutdown() + } + return true + }) e.queue.Shutdown() } diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index adc3bd8..43a708b 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -54,7 +54,6 @@ func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target co } func TestCwtchPeerIntegration(t *testing.T) { - numGoRoutinesStart := runtime.NumGoroutine() log.AddEverythingFromPattern("connectivity") log.SetLevel(log.LevelDebug) @@ -83,10 +82,12 @@ func TestCwtchPeerIntegration(t *testing.T) { if err != nil { t.Fatalf("Could not start Tor: %v", err) } - pid, _ := acn.GetPID() - t.Logf("Tor pid: %v", pid) acn.WaitTillBootstrapped() defer acn.Close() + + // We don't include ACN in our routine calculations anymore + numGoRoutinesStart := runtime.NumGoroutine() + // ***** Cwtch Server management ***** const ServerKeyBundleBase64 = "eyJLZXlzIjp7ImJ1bGxldGluX2JvYXJkX29uaW9uIjoibmZoeHp2enhpbnJpcGdkaDR0Mm00eGN5M2NyZjZwNGNiaGVjdGdja3VqM2lkc2pzYW90Z293YWQiLCJwcml2YWN5X3Bhc3NfcHVibGljX2tleSI6IjVwd2hQRGJ0c0EvdFI3ZHlUVUkzakpZZnM1L3Jaai9iQ1ZWZEpTc0Jtbk09IiwidG9rZW5fc2VydmljZV9vbmlvbiI6ImVvd25mcTRsNTZxMmU0NWs0bW03MjdsanJod3Z0aDZ5ZWN0dWV1bXB4emJ5cWxnbXVhZm1qdXFkIn0sIlNpZ25hdHVyZSI6IlY5R3NPMHNZWFJ1bGZxdzdmbGdtclVxSTBXS0JlSFIzNjIvR3hGbWZPekpEZjJaRks2ck9jNVRRR1ZxVWIrbXIwV2xId0pwdXh0UW1JRU9KNkplYkNRPT0ifQ==" @@ -215,35 +216,33 @@ func TestCwtchPeerIntegration(t *testing.T) { bob.AddServer(string(serverKeyBundle)) carol.AddServer(string(serverKeyBundle)) + t.Logf("Waiting for alice to join server...") err = alice.JoinServer(ServerAddr) if err != nil { t.Fatalf("alice cannot join server %v %v", ServerAddr, err) } - waitForConnection(t, alice, ServerAddr, connections.AUTHENTICATED) + waitForConnection(t, alice, ServerAddr, connections.SYNCED) // Creating a Group - fmt.Println("Creating group on ", ServerAddr, "...") + t.Logf("Creating group on %v...", ServerAddr) aliceGroupConversationID, err := alice.StartGroup("Our Cool Testing Group", ServerAddr) - fmt.Printf("Created group: %v!\n", aliceGroupConversationID) + t.Logf("Created group: %v!\n", aliceGroupConversationID) if err != nil { t.Errorf("Failed to init group: %v", err) return } - fmt.Println("Waiting for alice to join server...") - // Invites fmt.Println("Alice inviting Bob to group...") err = alice.SendInviteToConversation(alice2bobConversationID, aliceGroupConversationID) if err != nil { t.Fatalf("Error for Alice inviting Bob to group: %v", err) } - time.Sleep(time.Second * 5) // Alice invites Bob to the Group... - message, _, err := alice.GetChannelMessage(alice2bobConversationID, 0, 1) - t.Logf("Alice message from Bob %v %v", message, err) + message, _, err := bob.GetChannelMessage(bob2aliceConversationID, 0, 1) + t.Logf("Alice message to Bob %v %v", message, err) var overlayMessage model.MessageWrapper json.Unmarshal([]byte(message), &overlayMessage) t.Logf("Parsed Overlay Message: %v", overlayMessage) @@ -267,172 +266,96 @@ func TestCwtchPeerIntegration(t *testing.T) { checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[1]) checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[1]) - //fmt.Println("Alice inviting Carol to group...") - //err = alice.InviteOnionToGroup(carol.GetOnion(), groupID) - //if err != nil { - // t.Fatalf("Error for Alice inviting Carol to group: %v", err) - //} - //time.Sleep(time.Second * 60) // Account for some token acquisition in Alice and Bob flows. - //fmt.Println("Carol examining groups and accepting invites...") - //for _, message := range carol.GetContact(alice.GetOnion()).Timeline.GetMessages() { - // fmt.Printf("Found message from Alice: %v", message.Message) - // if strings.HasPrefix(message.Message, "torv3") { - // gid, err := carol.ImportGroup(message.Message) - // if err == nil { - // fmt.Printf("Carol found invite...now accepting %v...", gid) - // carol.AcceptInvite(gid) - // } else { - // t.Fatalf("Carol could not accept invite...%v", gid) - // } - // } - //} - // - //fmt.Println("Shutting down Alice...") - //app.ShutdownPeer(alice.GetOnion()) - //time.Sleep(time.Second * 5) - numGoRoutinesPostAlice := runtime.NumGoroutine() - // - //fmt.Println("Carol joining server...") - //carol.JoinServer(ServerAddr) - //waitForPeerGroupConnection(t, carol, groupID) - numGoRoutinesPostCarolConnect := runtime.NumGoroutine() - // - //fmt.Printf("%v> %v", bobName, bobLines[2]) - //bob.SendMessage(groupID, bobLines[2]) - //// Bob should have enough tokens so we don't need to account for - //// token acquisition here... - // - //fmt.Printf("%v> %v", carolName, carolLines[0]) - //carol.SendMessage(groupID, carolLines[0]) - //time.Sleep(time.Second * 30) // we need to account for spam-based token acquisition, but everything should - //// be warmed-up and delays should be pretty small. - // - //// ***** Verify Test ***** - // - //fmt.Println("Final syncing time...") - //time.Sleep(time.Second * 30) - // - //alicesGroup := alice.GetGroup(groupID) - //if alicesGroup == nil { - // t.Error("aliceGroup == nil") - // return - //} - // - //fmt.Printf("Alice's TimeLine:\n") - //aliceVerified := printAndCountVerifedTimeline(t, alicesGroup.GetTimeline()) - //if aliceVerified != 4 { - // t.Errorf("Alice did not have 4 verified messages") - //} - // - //bobsGroup := bob.GetGroup(groupID) - //if bobsGroup == nil { - // t.Error("bobGroup == nil") - // return - //} - //fmt.Printf("Bob's TimeLine:\n") - //bobVerified := printAndCountVerifedTimeline(t, bobsGroup.GetTimeline()) - //if bobVerified != 6 { - // t.Errorf("Bob did not have 6 verified messages") - //} - // - //carolsGroup := carol.GetGroup(groupID) - //fmt.Printf("Carol's TimeLine:\n") - //carolVerified := printAndCountVerifedTimeline(t, carolsGroup.GetTimeline()) - //if carolVerified != 6 { - // t.Errorf("Carol did not have 6 verified messages") - //} - // - //if len(alicesGroup.GetTimeline()) != 4 { - // t.Errorf("Alice's timeline does not have all messages") - //} else { - // // check message 0,1,2,3 - // alicesGroup.Timeline.Sort() - // aliceGroupTimeline := alicesGroup.GetTimeline() - // if aliceGroupTimeline[0].Message != aliceLines[0] || aliceGroupTimeline[1].Message != bobLines[0] || - // aliceGroupTimeline[2].Message != aliceLines[1] || aliceGroupTimeline[3].Message != bobLines[1] { - // t.Errorf("Some of Alice's timeline messages did not have the expected content!") - // } + // Alice invites Bob to the Group... + message, _, err = carol.GetChannelMessage(carol2aliceConversationID, 0, 1) + t.Logf("Alice message to Carol %v %v", message, err) + json.Unmarshal([]byte(message), &overlayMessage) + t.Logf("Parsed Overlay Message: %v", overlayMessage) + err = carol.ImportBundle(overlayMessage.Data) + t.Logf("Result of Carol Importing the Bundle from Alice: %v", err) + t.Logf("Waiting for Carol to join connect to group server...") + err = carol.JoinServer(ServerAddr) // for some unrealism we skip "discovering the server from the event bus + if err != nil { + t.Fatalf("carol cannot join server %v %v", ServerAddr, err) + } + carolGroupConversationID := 3 + waitForConnection(t, carol, ServerAddr, connections.SYNCED) + + numGoRoutinesPostCarolConnect := runtime.NumGoroutine() + + t.Logf("Shutting down Alice...") + + // Check Alice Timeline checkMessage(t, alice, aliceGroupConversationID, 1, aliceLines[0]) checkMessage(t, alice, aliceGroupConversationID, 2, bobLines[0]) checkMessage(t, alice, aliceGroupConversationID, 3, aliceLines[1]) checkMessage(t, alice, aliceGroupConversationID, 4, bobLines[1]) + app.ShutdownPeer(alice.GetOnion()) + time.Sleep(time.Second * 3) + numGoRoutinesPostAlice := runtime.NumGoroutine() + + checkSendMessageToGroup(t, carol, carolGroupConversationID, carolLines[0]) + checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[2]) time.Sleep(time.Second * 30) + // Check Bob Timeline checkMessage(t, bob, bobGroupConversationID, 1, aliceLines[0]) checkMessage(t, bob, bobGroupConversationID, 2, bobLines[0]) checkMessage(t, bob, bobGroupConversationID, 3, aliceLines[1]) checkMessage(t, bob, bobGroupConversationID, 4, bobLines[1]) + checkMessage(t, bob, bobGroupConversationID, 5, carolLines[0]) + checkMessage(t, bob, bobGroupConversationID, 6, bobLines[2]) - //} - // - //if len(bobsGroup.GetTimeline()) != 6 { - // t.Errorf("Bob's timeline does not have all messages") - //} else { - // // check message 0,1,2,3,4,5 - // bobsGroup.Timeline.Sort() - // bobGroupTimeline := bobsGroup.GetTimeline() - // if bobGroupTimeline[0].Message != aliceLines[0] || bobGroupTimeline[1].Message != bobLines[0] || - // bobGroupTimeline[2].Message != aliceLines[1] || bobGroupTimeline[3].Message != bobLines[1] || - // bobGroupTimeline[4].Message != bobLines[2] || bobGroupTimeline[5].Message != carolLines[0] { - // t.Errorf("Some of Bob's timeline messages did not have the expected content!") - // } - //} - // - //if len(carolsGroup.GetTimeline()) != 6 { - // t.Errorf("Carol's timeline does not have all messages") - //} else { - // // check message 0,1,2,3,4,5 - // carolsGroup.Timeline.Sort() - // carolGroupTimeline := carolsGroup.GetTimeline() - // if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] || - // carolGroupTimeline[2].Message != aliceLines[1] || carolGroupTimeline[3].Message != bobLines[1] || - // carolGroupTimeline[4].Message != carolLines[0] || carolGroupTimeline[5].Message != bobLines[2] { - // t.Errorf("Some of Carol's timeline messages did not have the expected content!") - // } - //} + // Check Carol Timeline + checkMessage(t, carol, carolGroupConversationID, 1, aliceLines[0]) + checkMessage(t, carol, carolGroupConversationID, 2, bobLines[0]) + checkMessage(t, carol, carolGroupConversationID, 3, aliceLines[1]) + checkMessage(t, carol, carolGroupConversationID, 4, bobLines[1]) + checkMessage(t, carol, carolGroupConversationID, 5, carolLines[0]) + checkMessage(t, carol, carolGroupConversationID, 6, bobLines[2]) - fmt.Println("Shutting down Bob...") + t.Logf("Shutting down Bob...") app.ShutdownPeer(bob.GetOnion()) time.Sleep(time.Second * 3) numGoRoutinesPostBob := runtime.NumGoroutine() - fmt.Println("Shutting down Carol...") + t.Logf("Shutting down Carol...") app.ShutdownPeer(carol.GetOnion()) time.Sleep(time.Second * 3) numGoRoutinesPostCarol := runtime.NumGoroutine() - fmt.Println("Shutting down apps...") + t.Logf("Shutting down apps...") fmt.Printf("app Shutdown: %v\n", runtime.NumGoroutine()) app.Shutdown() time.Sleep(2 * time.Second) - fmt.Printf("Done shutdown: %v\n", runtime.NumGoroutine()) + t.Logf("Done shutdown: %v\n", runtime.NumGoroutine()) numGoRoutinesPostAppShutdown := runtime.NumGoroutine() - fmt.Println("Shutting down ACN...") + t.Logf("Shutting down ACN...") // acn.Close() TODO: ACN Now gets closed automatically with defer...attempting to close twice results in a dead lock... time.Sleep(time.Second * 2) // Server ^^ has a 5 second loop attempting reconnect before exiting time.Sleep(time.Second * 30) // the network status plugin might keep goroutines alive for a minute before killing them - numGoRoutinesPostACN := runtime.NumGoroutine() // Printing out the current goroutines // Very useful if we are leaking any. pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) - fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+ - "numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v\nnumGoRoutinesPostACN: %v\n", + t.Logf("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+ + "numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v", numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect, - numGoRoutinesPostAlice, numGoRoutinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown, numGoRoutinesPostACN) + numGoRoutinesPostAlice, numGoRoutinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown) - if numGoRoutinesStart != numGoRoutinesPostACN { - t.Errorf("Number of GoRoutines at start (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, leak detected!", numGoRoutinesStart, numGoRoutinesPostACN) + if numGoRoutinesStart != numGoRoutinesPostAppShutdown { + t.Errorf("Number of GoRoutines at start (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, v detected!", numGoRoutinesStart, numGoRoutinesPostAppShutdown) } } +// Utility function for sending a message from a peer to a group func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, id int, message string) { name, _ := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name) t.Logf("%v> %v\n", name, message) @@ -443,6 +366,7 @@ func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, id int, messa time.Sleep(time.Second * 10) } +// Utility function for testing that a message in a conversation is as expected func checkMessage(t *testing.T, profile peer.CwtchPeer, id int, messageID int, expected string) { message, _, err := profile.GetChannelMessage(id, 0, messageID) if err != nil {