Merge pull request 'info->debug fixes; rearrange integ test wait for connections for hopeful speed improvement' (#479) from connectionLogic into master
continuous-integration/drone/push Build is pending Details

Reviewed-on: #479
Reviewed-by: Sarah Jamie Lewis <sarah@openprivacy.ca>
This commit is contained in:
Sarah Jamie Lewis 2022-12-05 16:30:14 +00:00
commit f630dedab6
6 changed files with 56 additions and 24 deletions

View File

@ -39,7 +39,7 @@ steps:
path: /go path: /go
commands: commands:
- export PATH=`pwd`:$PATH - export PATH=`pwd`:$PATH
- go test -race -v cwtch.im/cwtch/testing/ - go test -timeout=30m -race -v cwtch.im/cwtch/testing/
- name: filesharing-integ-test - name: filesharing-integ-test
image: golang:1.19.1 image: golang:1.19.1
volumes: volumes:
@ -47,7 +47,7 @@ steps:
path: /go path: /go
commands: commands:
- export PATH=`pwd`:$PATH - export PATH=`pwd`:$PATH
- go test -race -v cwtch.im/cwtch/testing/filesharing - go test -timeout=20m -race -v cwtch.im/cwtch/testing/filesharing
- name: notify-gogs - name: notify-gogs
image: openpriv/drone-gogs image: openpriv/drone-gogs
pull: if-not-exists pull: if-not-exists

View File

@ -282,13 +282,16 @@ func (app *application) ActivateEngines(doListen, doPeers, doServers bool) {
func (app *application) ActivatePeerEngine(onion string, doListen, doPeers, doServers bool) { func (app *application) ActivatePeerEngine(onion string, doListen, doPeers, doServers bool) {
profile := app.GetPeer(onion) profile := app.GetPeer(onion)
if profile != nil { if profile != nil {
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) if _, exists := app.engines[onion]; !exists {
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated)) app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
app.QueryACNStatus()
if doListen { app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated))
profile.Listen() app.QueryACNStatus()
if doListen {
profile.Listen()
}
profile.StartConnections(doPeers, doServers)
} }
profile.StartConnections(doPeers, doServers)
} }
} }

View File

@ -327,7 +327,7 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState
pinf, _ := cr.connections.Load(id) pinf, _ := cr.connections.Load(id)
p := pinf.(*contact) p := pinf.(*contact)
log.Infof(" managing state change for %v %v to %v by self %v", id, connections.ConnectionStateName[p.state], connections.ConnectionStateName[state], cr.onion) log.Debugf(" managing state change for %v %v to %v by self %v", id, connections.ConnectionStateName[p.state], connections.ConnectionStateName[state], cr.onion)
if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED { if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED {
if p.state == connections.SYNCED || p.state == connections.AUTHENTICATED { if p.state == connections.SYNCED || p.state == connections.AUTHENTICATED {
p.lastSeen = time.Now() p.lastSeen = time.Now()

View File

@ -1015,7 +1015,7 @@ func (cp *cwtchPeer) Listen() {
cp.mutex.Lock() cp.mutex.Lock()
defer cp.mutex.Unlock() defer cp.mutex.Unlock()
if !cp.listenStatus { if !cp.listenStatus {
log.Infof("cwtchPeer Listen sending ProtocolEngineStartListen\n") log.Debugf("cwtchPeer Listen sending ProtocolEngineStartListen\n")
cp.listenStatus = true cp.listenStatus = true
onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString()) onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{event.Onion: string(onion)})) cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{event.Onion: string(onion)}))

View File

@ -1,7 +1,6 @@
package testing package testing
import ( import (
// Import SQL Cipher
"crypto/rand" "crypto/rand"
app2 "cwtch.im/cwtch/app" app2 "cwtch.im/cwtch/app"
"cwtch.im/cwtch/event" "cwtch.im/cwtch/event"
@ -12,6 +11,7 @@ import (
"cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/protocol/connections"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"fmt"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity/tor" "git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log" "git.openprivacy.ca/openprivacy/log"
@ -23,6 +23,7 @@ import (
"path/filepath" "path/filepath"
"runtime" "runtime"
"runtime/pprof" "runtime/pprof"
"strconv"
"testing" "testing"
"time" "time"
) )
@ -53,6 +54,16 @@ func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target co
} }
} }
func waitForRetVal(peer peer.CwtchPeer, convId int, szp attr.ScopedZonedPath) {
for {
_, err := peer.GetConversationAttribute(convId, szp)
if err == nil {
return
}
time.Sleep(time.Second * 5)
}
}
func checkAndLoadTokens() []*privacypass.Token { func checkAndLoadTokens() []*privacypass.Token {
var tokens []*privacypass.Token var tokens []*privacypass.Token
data, err := os.ReadFile("../tokens") data, err := os.ReadFile("../tokens")
@ -150,18 +161,21 @@ func TestCwtchPeerIntegration(t *testing.T) {
app.CreateTaggedPeer("Carol", "asdfasdf", "test") app.CreateTaggedPeer("Carol", "asdfasdf", "test")
alice := app2.WaitGetPeer(app, "Alice") alice := app2.WaitGetPeer(app, "Alice")
aliceBus := app.GetEventBus(alice.GetOnion())
app.ActivatePeerEngine(alice.GetOnion(), true, true, true) app.ActivatePeerEngine(alice.GetOnion(), true, true, true)
log.Infoln("Alice created:", alice.GetOnion()) log.Infoln("Alice created:", alice.GetOnion())
alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice") alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice")
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
bob := app2.WaitGetPeer(app, "Bob") bob := app2.WaitGetPeer(app, "Bob")
bobBus := app.GetEventBus(bob.GetOnion())
app.ActivatePeerEngine(bob.GetOnion(), true, true, true) app.ActivatePeerEngine(bob.GetOnion(), true, true, true)
log.Infoln("Bob created:", bob.GetOnion()) log.Infoln("Bob created:", bob.GetOnion())
bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob")
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
carol := app2.WaitGetPeer(app, "Carol") carol := app2.WaitGetPeer(app, "Carol")
carolBus := app.GetEventBus(carol.GetOnion())
app.ActivatePeerEngine(carol.GetOnion(), true, true, true) app.ActivatePeerEngine(carol.GetOnion(), true, true, true)
log.Infoln("Carol created:", carol.GetOnion()) log.Infoln("Carol created:", carol.GetOnion())
carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol") carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol")
@ -232,23 +246,27 @@ func TestCwtchPeerIntegration(t *testing.T) {
// Probably related to latency/throughput problems in the underlying tor network. // Probably related to latency/throughput problems in the underlying tor network.
time.Sleep(30 * time.Second) time.Sleep(30 * time.Second)
waitForRetVal(bob, bob2aliceConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
aliceName, err := bob.GetConversationAttribute(bob2aliceConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) aliceName, err := bob.GetConversationAttribute(bob2aliceConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
if err != nil || aliceName != "Alice" { if err != nil || aliceName != "Alice" {
t.Fatalf("Bob: alice GetKeyVal error on alice peer.name %v: %v\n", aliceName, err) t.Fatalf("Bob: alice GetKeyVal error on alice peer.name %v: %v\n", aliceName, err)
} }
log.Infof("Bob has alice's name as '%v'\n", aliceName) log.Infof("Bob has alice's name as '%v'\n", aliceName)
waitForRetVal(alice, alice2bobConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
bobName, err := alice.GetConversationAttribute(alice2bobConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) bobName, err := alice.GetConversationAttribute(alice2bobConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
if err != nil || bobName != "Bob" { if err != nil || bobName != "Bob" {
t.Fatalf("Alice: bob GetKeyVal error on bob peer.name %v: %v \n", bobName, err) t.Fatalf("Alice: bob GetKeyVal error on bob peer.name %v: %v \n", bobName, err)
} }
log.Infof("Alice has bob's name as '%v'\n", bobName) log.Infof("Alice has bob's name as '%v'\n", bobName)
waitForRetVal(carol, carol2aliceConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
aliceName, err = carol.GetConversationAttribute(carol2aliceConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) aliceName, err = carol.GetConversationAttribute(carol2aliceConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
if err != nil || aliceName != "Alice" { if err != nil || aliceName != "Alice" {
t.Fatalf("carol GetKeyVal error for alice peer.name %v: %v\n", aliceName, err) t.Fatalf("carol GetKeyVal error for alice peer.name %v: %v\n", aliceName, err)
} }
waitForRetVal(alice, alice2carolConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
carolName, err := alice.GetConversationAttribute(alice2carolConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) carolName, err := alice.GetConversationAttribute(alice2carolConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
if err != nil || carolName != "Carol" { if err != nil || carolName != "Carol" {
t.Fatalf("alice GetKeyVal error, carol peer.name: %v: %v\n", carolName, err) t.Fatalf("alice GetKeyVal error, carol peer.name: %v: %v\n", carolName, err)
@ -267,9 +285,6 @@ func TestCwtchPeerIntegration(t *testing.T) {
t.Fatalf("Failed to Add Server Bundle %v", err) t.Fatalf("Failed to Add Server Bundle %v", err)
} }
log.Infof("Waiting for alice to join server...")
waitForConnection(t, alice, ServerAddr, connections.SYNCED)
// Creating a Group // Creating a Group
log.Infof("Creating group on %v...", ServerAddr) log.Infof("Creating group on %v...", ServerAddr)
aliceGroupConversationID, err := alice.StartGroup("Our Cool Testing Group", ServerAddr) aliceGroupConversationID, err := alice.StartGroup("Our Cool Testing Group", ServerAddr)
@ -300,6 +315,8 @@ func TestCwtchPeerIntegration(t *testing.T) {
usedTokens += len(bobLines) usedTokens += len(bobLines)
} }
log.Infof("Waiting for alice to join server...")
waitForConnection(t, alice, ServerAddr, connections.SYNCED)
log.Infof("Waiting for Bob to join connect to group server...") log.Infof("Waiting for Bob to join connect to group server...")
waitForConnection(t, bob, ServerAddr, connections.SYNCED) waitForConnection(t, bob, ServerAddr, connections.SYNCED)
@ -312,10 +329,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
// ***** Conversation ***** // ***** Conversation *****
log.Infof("Starting conversation in group...") log.Infof("Starting conversation in group...")
checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[0]) checkSendMessageToGroup(t, alice, aliceBus, aliceGroupConversationID, aliceLines[0])
checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[0]) checkSendMessageToGroup(t, bob, bobBus, bobGroupConversationID, bobLines[0])
checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[1]) checkSendMessageToGroup(t, alice, aliceBus, aliceGroupConversationID, aliceLines[1])
checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[1]) checkSendMessageToGroup(t, bob, bobBus, bobGroupConversationID, bobLines[1])
// Pretend that Carol Acquires the Overlay Message through some other means... // Pretend that Carol Acquires the Overlay Message through some other means...
json.Unmarshal([]byte(message), &overlayMessage) json.Unmarshal([]byte(message), &overlayMessage)
@ -342,8 +359,8 @@ func TestCwtchPeerIntegration(t *testing.T) {
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
numGoRoutinesPostAlice := runtime.NumGoroutine() numGoRoutinesPostAlice := runtime.NumGoroutine()
checkSendMessageToGroup(t, carol, carolGroupConversationID, carolLines[0]) checkSendMessageToGroup(t, carol, carolBus, carolGroupConversationID, carolLines[0])
checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[2]) checkSendMessageToGroup(t, bob, bobBus, bobGroupConversationID, bobLines[2])
// Time to Sync // Time to Sync
time.Sleep(time.Second * 10) time.Sleep(time.Second * 10)
@ -393,7 +410,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
// Printing out the current goroutines // Printing out the current goroutines
// Very useful if we are leaking any. // Very useful if we are leaking any.
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
fmt.Println("")
log.Infof("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+ log.Infof("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+
"numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v", "numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v",
numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect, numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect,
@ -405,14 +422,26 @@ func TestCwtchPeerIntegration(t *testing.T) {
} }
// Utility function for sending a message from a peer to a group // Utility function for sending a message from a peer to a group
func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, id int, message string) { func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, bus event.Manager, id int, message string) {
name, _ := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name) name, _ := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
log.Infof("%v> %v\n", name, message) log.Infof("%v> %v\n", name, message)
_, err := profile.SendMessage(id, message) queue := event.NewQueue()
bus.Subscribe(event.IndexedAcknowledgement, queue)
mid, err := profile.SendMessage(id, message)
if err != nil { if err != nil {
log.Errorf("Alice failed to send a message to the group: %v", err) log.Errorf("Alice failed to send a message to the group: %v", err)
t.Fatalf("Alice failed to send a message to the group: %v\n", err) t.Fatalf("Alice failed to send a message to the group: %v\n", err)
} }
log.Infof("Sent message with mid: %v, waiting for ack...", mid)
ev := queue.Next()
switch ev.EventType {
case event.IndexedAcknowledgement:
if evid, err := strconv.Atoi(ev.Data[event.Index]); err == nil && evid == mid {
log.Infof("Message mid acked!")
break
}
}
queue.Shutdown()
time.Sleep(time.Second * 10) time.Sleep(time.Second * 10)
} }

View File

@ -215,7 +215,7 @@ func testBobDownloadFile(t *testing.T, bob peer.CwtchPeer, filesharingFunctional
} }
// Wait for the file downloaded event // Wait for the file downloaded event
ClientTimeout := utils2.TimeoutPolicy(time.Second * 60) ClientTimeout := utils2.TimeoutPolicy(time.Second * 120)
err = ClientTimeout.ExecuteAction(func() error { err = ClientTimeout.ExecuteAction(func() error {
ev := queueOracle.Next() ev := queueOracle.Next()
if ev.EventType != event.FileDownloaded { if ev.EventType != event.FileDownloaded {