change locking on engine.ephermeralServices; logify integ test; delete unused events
continuous-integration/drone/push Build is passing Details
continuous-integration/drone/pr Build is passing Details

This commit is contained in:
Dan Ballard 2021-12-16 19:07:08 -05:00
parent 3efacc889d
commit ac05caf009
4 changed files with 86 additions and 135 deletions

View File

@ -55,10 +55,6 @@ const (
// GroupID: groupID (allows them to fetch from the peer) // GroupID: groupID (allows them to fetch from the peer)
NewGroup = Type("NewGroup") NewGroup = Type("NewGroup")
// GroupID
AcceptGroupInvite = Type("AcceptGroupInvite")
RejectGroupInvite = Type("RejectGroupInvite")
SendMessageToGroup = Type("SendMessagetoGroup") SendMessageToGroup = Type("SendMessagetoGroup")
//Ciphertext, Signature: //Ciphertext, Signature:
@ -111,12 +107,6 @@ const (
// RemotePeer: The peer associated with the acknowledgement // RemotePeer: The peer associated with the acknowledgement
IndexedFailure = Type("IndexedFailure") IndexedFailure = Type("IndexedFailure")
// UpdateMessageFlags will change the flags associated with a given message.
// Handle
// Message Index
// Flags
UpdateMessageFlags = Type("UpdateMessageFlags")
// attributes: // attributes:
// RemotePeer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"] // RemotePeer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"]
// Error: string describing the error // Error: string describing the error
@ -132,12 +122,6 @@ const (
// Password, NewPassword // Password, NewPassword
ChangePassword = Type("ChangePassword") ChangePassword = Type("ChangePassword")
// Error(err), EventID
ChangePasswordError = Type("ChangePasswordError")
// EventID
ChangePasswordSuccess = Type("ChangePasswordSuccess")
// a group has been successfully added or newly created // a group has been successfully added or newly created
// attributes: // attributes:
// Data [serialized *model.Group] // Data [serialized *model.Group]
@ -149,26 +133,6 @@ const (
// GroupID // GroupID
DeleteGroup = Type("DeleteGroup") DeleteGroup = Type("DeleteGroup")
// request to store a profile-wide attribute (good for e.g. per-profile settings like theme prefs)
// attributes:
// Key [eg "fontcolor"]
// Data [eg "red"]
SetAttribute = Type("SetAttribute")
// request to store a per-contact attribute (e.g. display names for a peer)
// attributes:
// RemotePeer [eg ""]
// Key [eg "nick"]
// Data [eg "erinn"]
SetPeerAttribute = Type("SetPeerAttribute")
// request to store a per-cwtch-group attribute (e.g. display name for a group)
// attributes:
// GroupID [eg ""]
// Key [eg "nick"]
// Data [eg "open privacy board"]
SetGroupAttribute = Type("SetGroupAttribute")
// PeerStateChange servers as a new incoming connection message as well, and can/is consumed by frontends to alert of new p2p connections // PeerStateChange servers as a new incoming connection message as well, and can/is consumed by frontends to alert of new p2p connections
// RemotePeer // RemotePeer
// ConnectionState // ConnectionState
@ -180,9 +144,6 @@ const (
/***** Application client / service messages *****/ /***** Application client / service messages *****/
// ProfileName, Password, Data(tag)
CreatePeer = Type("CreatePeer")
// app: Identity(onion), Created(bool) // app: Identity(onion), Created(bool)
// service -> client: Identity(localId), Password, [Status(new/default=blank || from reload='running')], Created(bool) // service -> client: Identity(localId), Password, [Status(new/default=blank || from reload='running')], Created(bool)
NewPeer = Type("NewPeer") NewPeer = Type("NewPeer")
@ -192,20 +153,6 @@ const (
// Identity(onion) // Identity(onion)
PeerDeleted = Type("PeerDeleted") PeerDeleted = Type("PeerDeleted")
// Identity(onion), Data(pluginID)
AddPeerPlugin = Type("AddPeerPlugin")
// Password
LoadProfiles = Type("LoadProfiles")
// Client has reloaded, triggers NewPeer s then ReloadDone
ReloadClient = Type("ReloadClient")
ReloadDone = Type("ReloadDone")
// Identity - Ask service to resend all connection states
ReloadPeer = Type("ReloadPeer")
// Identity(onion) // Identity(onion)
ShutdownPeer = Type("ShutdownPeer") ShutdownPeer = Type("ShutdownPeer")
@ -218,9 +165,6 @@ const (
// Error(err) // Error(err)
AppError = Type("AppError") AppError = Type("AppError")
GetACNStatus = Type("GetACNStatus")
GetACNVersion = Type("GetACNVersion")
// Progress, Status // Progress, Status
ACNStatus = Type("ACNStatus") ACNStatus = Type("ACNStatus")
@ -233,10 +177,6 @@ const (
// Onion: the local onion we attempt to check // Onion: the local onion we attempt to check
NetworkStatus = Type("NetworkError") NetworkStatus = Type("NetworkError")
// Notify the UI that a Server has been added
// Onion = Server Onion
ServerCreated = Type("ServerAdded")
// For debugging. Allows test to emit a Syn and get a response Ack(eventID) when the subsystem is done processing a queue // For debugging. Allows test to emit a Syn and get a response Ack(eventID) when the subsystem is done processing a queue
Syn = Type("Syn") Syn = Type("Syn")
Ack = Type("Ack") Ack = Type("Ack")

View File

@ -434,7 +434,7 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) {
cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), gci.ServerHost) cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), gci.ServerHost)
cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(gci.SharedKey)) cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(gci.SharedKey))
cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)), gci.GroupName) cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)), gci.GroupName)
cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(groupConversationID), event.GroupServer: gci.ServerHost, event.GroupInvite: exportedInvite})) cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(groupConversationID), event.GroupServer: gci.ServerHost, event.GroupInvite: exportedInvite, event.GroupName: gci.GroupName}))
cp.JoinServer(gci.ServerHost) cp.JoinServer(gci.ServerHost)
} }
return groupConversationID, err return groupConversationID, err

View File

@ -46,7 +46,8 @@ type engine struct {
getValRequests sync.Map // [string]string eventID:Data getValRequests sync.Map // [string]string eventID:Data
// Nextgen Tapir Service // Nextgen Tapir Service
ephemeralServices sync.Map // string(onion) => tapir.Service ephemeralServices map[string]tapir.Service //sync.Map // string(onion) => tapir.Service
ephemeralServicesLock sync.Mutex
// Required for listen(), inaccessible from identity // Required for listen(), inaccessible from identity
privateKey ed25519.PrivateKey privateKey ed25519.PrivateKey
@ -72,6 +73,7 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
engine := new(engine) engine := new(engine)
engine.identity = identity engine.identity = identity
engine.privateKey = privateKey engine.privateKey = privateKey
engine.ephemeralServices = make(map[string]tapir.Service)
engine.queue = event.NewQueue() engine.queue = event.NewQueue()
go engine.eventHandler() go engine.eventHandler()
@ -150,7 +152,9 @@ func (e *engine) eventHandler() {
} }
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature) go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
case event.LeaveServer: case event.LeaveServer:
e.ephemeralServicesLock.Lock()
e.leaveServer(ev.Data[event.GroupServer]) e.leaveServer(ev.Data[event.GroupServer])
e.ephemeralServicesLock.Unlock()
case event.DeleteContact: case event.DeleteContact:
onion := ev.Data[event.RemotePeer] onion := ev.Data[event.RemotePeer]
// We remove this peer from out blocklist which will prevent them from contacting us if we have "block unknown peers" turned on. // We remove this peer from out blocklist which will prevent them from contacting us if we have "block unknown peers" turned on.
@ -275,14 +279,12 @@ func (e *engine) Shutdown() {
e.shuttingDown = true e.shuttingDown = true
e.service.Shutdown() e.service.Shutdown()
e.ephemeralServices.Range(func(_, service interface{}) bool { e.ephemeralServicesLock.Lock()
connection, ok := service.(*tor.BaseOnionService) defer e.ephemeralServicesLock.Unlock()
if ok { for _, connection := range e.ephemeralServices {
log.Infof("shutting down ephemeral service") log.Infof("shutting down ephemeral service")
connection.Shutdown() connection.Shutdown()
} }
return true
})
e.queue.Shutdown() e.queue.Shutdown()
} }
@ -316,10 +318,10 @@ func (e *engine) peerWithOnion(onion string) {
// peerWithTokenServer is the entry point for cwtchPeer - server relationships // peerWithTokenServer is the entry point for cwtchPeer - server relationships
// needs to be run in a goroutine as will block on Open. // needs to be run in a goroutine as will block on Open.
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) { func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) {
e.ephemeralServicesLock.Lock()
service, exists := e.ephemeralServices.Load(onion) defer e.ephemeralServicesLock.Unlock()
connection, exists := e.ephemeralServices[onion]
if exists { if exists {
connection := service.(*tor.BaseOnionService)
if conn, err := connection.GetConnection(onion); err == nil { if conn, err := connection.GetConnection(onion); err == nil {
// We are already peered and synced so return... // We are already peered and synced so return...
// This will only not-trigger it lastKnownSignature has been wiped, which only happens when ResyncServer is called // This will only not-trigger it lastKnownSignature has been wiped, which only happens when ResyncServer is called
@ -343,7 +345,7 @@ func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, toke
Y := ristretto255.NewElement() Y := ristretto255.NewElement()
Y.UnmarshalText([]byte(tokenServerY)) Y.UnmarshalText([]byte(tokenServerY))
connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverAuthed, e.serverSynced, e.ignoreOnShutdown(e.serverDisconnected))) connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverAuthed, e.serverSynced, e.ignoreOnShutdown(e.serverDisconnected)))
e.ephemeralServices.Store(onion, ephemeralService) e.ephemeralServices[onion] = ephemeralService
// If we are already connected...check if we are authed and issue an auth event // If we are already connected...check if we are authed and issue an auth event
// (This allows the ui to be stateless) // (This allows the ui to be stateless)
if connected && err != nil { if connected && err != nil {
@ -486,7 +488,6 @@ func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMes
// sendMessageToGroup attempts to sent the given message to the given group id. // sendMessageToGroup attempts to sent the given message to the given group id.
func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte, attempts int) { func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte, attempts int) {
// sending to groups can fail for a few reasons (slow server, not enough tokens, etc.) // sending to groups can fail for a few reasons (slow server, not enough tokens, etc.)
// rather than trying to keep all that logic in method we simply back-off and try again // rather than trying to keep all that logic in method we simply back-off and try again
// but if we fail more than 5 times then we report back to the client so they can investigate other options. // but if we fail more than 5 times then we report back to the client so they can investigate other options.
@ -498,12 +499,14 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
return return
} }
es, ok := e.ephemeralServices.Load(server) e.ephemeralServicesLock.Lock()
if es == nil || !ok { ephemeralService, ok := e.ephemeralServices[server]
e.ephemeralServicesLock.Unlock()
if ephemeralService == nil || !ok {
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "server-not-found", event.Signature: base64.StdEncoding.EncodeToString(sig)})) e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "server-not-found", event.Signature: base64.StdEncoding.EncodeToString(sig)}))
return return
} }
ephemeralService := es.(tapir.Service)
conn, err := ephemeralService.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability) conn, err := ephemeralService.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability)
if err == nil { if err == nil {
@ -615,12 +618,15 @@ func (e *engine) handlePeerRetVal(hostname string, getValData, retValData []byte
e.eventManager.Publish(event.NewEventList(event.NewRetValMessageFromPeer, event.RemotePeer, hostname, event.Scope, getVal.Scope, event.Path, getVal.Path, event.Exists, strconv.FormatBool(retVal.Exists), event.Data, retVal.Val)) e.eventManager.Publish(event.NewEventList(event.NewRetValMessageFromPeer, event.RemotePeer, hostname, event.Scope, getVal.Scope, event.Path, getVal.Path, event.Exists, strconv.FormatBool(retVal.Exists), event.Data, retVal.Val))
} }
// leaveServer disconnects from a server and deletes the ephemeral service
// REQUIREMENTS: must be called inside a block with e.ephemeralServicesLock.Lock()
// can't do it iself because is called from inside peerWithTokenServer which holds the lock
func (e *engine) leaveServer(server string) { func (e *engine) leaveServer(server string) {
es, ok := e.ephemeralServices.Load(server) es, ok := e.ephemeralServices[server]
if ok { if ok {
ephemeralService := es.(tapir.Service) ephemeralService := es.(tapir.Service)
ephemeralService.Shutdown() ephemeralService.Shutdown()
e.ephemeralServices.Delete(server) delete(e.ephemeralServices, server)
} }
} }

View File

@ -13,7 +13,6 @@ import (
"cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/protocol/connections"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"fmt"
"git.openprivacy.ca/openprivacy/connectivity/tor" "git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log" "git.openprivacy.ca/openprivacy/log"
_ "github.com/mutecomm/go-sqlcipher/v4" _ "github.com/mutecomm/go-sqlcipher/v4"
@ -36,18 +35,18 @@ var (
func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target connections.ConnectionState) { func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target connections.ConnectionState) {
peerName, _ := peer.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name) peerName, _ := peer.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
for { for {
fmt.Printf("%v checking connection...\n", peerName) log.Infof("%v checking connection...\n", peerName)
state := peer.GetPeerState(addr) state := peer.GetPeerState(addr)
fmt.Printf("Waiting for Peer %v to %v - state: %v\n", peerName, addr, state) log.Infof("Waiting for Peer %v to %v - state: %v\n", peerName, addr, state)
if state == connections.FAILED { if state == connections.FAILED {
t.Fatalf("%v could not connect to %v", peer.GetOnion(), addr) t.Fatalf("%v could not connect to %v", peer.GetOnion(), addr)
} }
if state != target { if state != target {
fmt.Printf("peer %v %v waiting connect %v, currently: %v\n", peerName, peer.GetOnion(), addr, connections.ConnectionStateName[state]) log.Infof("peer %v %v waiting connect %v, currently: %v\n", peerName, peer.GetOnion(), addr, connections.ConnectionStateName[state])
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
continue continue
} else { } else {
fmt.Printf("peer %v %v CONNECTED to %v\n", peerName, peer.GetOnion(), addr) log.Infof("peer %v %v CONNECTED to %v\n", peerName, peer.GetOnion(), addr)
break break
} }
} }
@ -106,41 +105,41 @@ func TestCwtchPeerIntegration(t *testing.T) {
// ***** cwtchPeer setup ***** // ***** cwtchPeer setup *****
fmt.Println("Creating Alice...") log.Infoln("Creating Alice...")
app.CreateTaggedPeer("Alice", "asdfasdf", "test") app.CreateTaggedPeer("Alice", "asdfasdf", "test")
fmt.Println("Creating Bob...") log.Infoln("Creating Bob...")
app.CreateTaggedPeer("Bob", "asdfasdf", "test") app.CreateTaggedPeer("Bob", "asdfasdf", "test")
fmt.Println("Creating Carol...") log.Infoln("Creating Carol...")
app.CreateTaggedPeer("Carol", "asdfasdf", "test") app.CreateTaggedPeer("Carol", "asdfasdf", "test")
alice := utils.WaitGetPeer(app, "Alice") alice := utils.WaitGetPeer(app, "Alice")
fmt.Println("Alice created:", alice.GetOnion()) log.Infoln("Alice created:", alice.GetOnion())
alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice") alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice")
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
bob := utils.WaitGetPeer(app, "Bob") bob := utils.WaitGetPeer(app, "Bob")
fmt.Println("Bob created:", bob.GetOnion()) log.Infoln("Bob created:", bob.GetOnion())
bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob")
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
carol := utils.WaitGetPeer(app, "Carol") carol := utils.WaitGetPeer(app, "Carol")
fmt.Println("Carol created:", carol.GetOnion()) log.Infoln("Carol created:", carol.GetOnion())
carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol") carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol")
carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
app.LaunchPeers() app.LaunchPeers()
waitTime := time.Duration(60) * time.Second waitTime := time.Duration(60) * time.Second
t.Logf("** Waiting for Alice, Bob, and Carol to connect with onion network... (%v)\n", waitTime) log.Infof("** Waiting for Alice, Bob, and Carol to connect with onion network... (%v)\n", waitTime)
time.Sleep(waitTime) time.Sleep(waitTime)
numGoRoutinesPostPeerStart := runtime.NumGoroutine() numGoRoutinesPostPeerStart := runtime.NumGoroutine()
fmt.Println("** Wait Done!") log.Infof("** Wait Done!")
// ***** Peering, server joining, group creation / invite ***** // ***** Peering, server joining, group creation / invite *****
fmt.Println("Alice peering with Bob...") log.Infoln("Alice peering with Bob...")
// Simulate Alice Adding Bob // Simulate Alice Adding Bob
alice2bobConversationID, err := alice.NewContactConversation(bob.GetOnion(), model.DefaultP2PAccessControl(), true) alice2bobConversationID, err := alice.NewContactConversation(bob.GetOnion(), model.DefaultP2PAccessControl(), true)
if err != nil { if err != nil {
@ -151,7 +150,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
t.Fatalf("error adding conversaiton %v", bob2aliceConversationID) t.Fatalf("error adding conversaiton %v", bob2aliceConversationID)
} }
t.Logf("Alice peering with Carol...") log.Infof("Alice peering with Carol...")
// Simulate Alice Adding Carol // Simulate Alice Adding Carol
alice2carolConversationID, err := alice.NewContactConversation(carol.GetOnion(), model.DefaultP2PAccessControl(), true) alice2carolConversationID, err := alice.NewContactConversation(carol.GetOnion(), model.DefaultP2PAccessControl(), true)
if err != nil { if err != nil {
@ -170,7 +169,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
waitForConnection(t, bob, alice.GetOnion(), connections.AUTHENTICATED) waitForConnection(t, bob, alice.GetOnion(), connections.AUTHENTICATED)
waitForConnection(t, carol, alice.GetOnion(), connections.AUTHENTICATED) waitForConnection(t, carol, alice.GetOnion(), connections.AUTHENTICATED)
t.Logf("Alice and Bob getVal public.name...") log.Infof("Alice and Bob getVal public.name...")
alice.SendScopedZonedGetValToContact(alice2bobConversationID, attr.PublicScope, attr.ProfileZone, constants.Name) alice.SendScopedZonedGetValToContact(alice2bobConversationID, attr.PublicScope, attr.ProfileZone, constants.Name)
bob.SendScopedZonedGetValToContact(bob2aliceConversationID, attr.PublicScope, attr.ProfileZone, constants.Name) bob.SendScopedZonedGetValToContact(bob2aliceConversationID, attr.PublicScope, attr.ProfileZone, constants.Name)
@ -186,13 +185,13 @@ func TestCwtchPeerIntegration(t *testing.T) {
if err != nil || aliceName != "Alice" { if err != nil || aliceName != "Alice" {
t.Fatalf("Bob: alice GetKeyVal error on alice peer.name %v: %v\n", aliceName, err) t.Fatalf("Bob: alice GetKeyVal error on alice peer.name %v: %v\n", aliceName, err)
} }
fmt.Printf("Bob has alice's name as '%v'\n", aliceName) log.Infof("Bob has alice's name as '%v'\n", aliceName)
bobName, err := alice.GetConversationAttribute(alice2bobConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) bobName, err := alice.GetConversationAttribute(alice2bobConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
if err != nil || bobName != "Bob" { if err != nil || bobName != "Bob" {
t.Fatalf("Alice: bob GetKeyVal error on bob peer.name %v: %v \n", bobName, err) t.Fatalf("Alice: bob GetKeyVal error on bob peer.name %v: %v \n", bobName, err)
} }
fmt.Printf("Alice has bob's name as '%v'\n", bobName) log.Infof("Alice has bob's name as '%v'\n", bobName)
aliceName, err = carol.GetConversationAttribute(carol2aliceConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) aliceName, err = carol.GetConversationAttribute(carol2aliceConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
if err != nil || aliceName != "Alice" { if err != nil || aliceName != "Alice" {
@ -203,37 +202,33 @@ func TestCwtchPeerIntegration(t *testing.T) {
if err != nil || carolName != "Carol" { if err != nil || carolName != "Carol" {
t.Fatalf("alice GetKeyVal error, carol peer.name: %v: %v\n", carolName, err) t.Fatalf("alice GetKeyVal error, carol peer.name: %v: %v\n", carolName, err)
} }
fmt.Printf("Alice has carol's name as '%v'\n", carolName) log.Infof("Alice has carol's name as '%v'\n", carolName)
// Group Testing // Group Testing
// Simulate Alice Creating a Group // Simulate Alice Creating a Group
fmt.Println("Alice joining server...") log.Infoln("Alice joining server...")
if _, err := alice.AddServer(string(serverKeyBundle)); err != nil { if _, err := alice.AddServer(string(serverKeyBundle)); err != nil {
t.Fatalf("Failed to Add Server Bundle %v", err) t.Fatalf("Failed to Add Server Bundle %v", err)
} }
bob.AddServer(string(serverKeyBundle)) // Ading here will require resync
carol.AddServer(string(serverKeyBundle)) carol.AddServer(string(serverKeyBundle))
t.Logf("Waiting for alice to join server...") log.Infof("Waiting for alice to join server...")
err = alice.JoinServer(ServerAddr)
if err != nil {
t.Fatalf("alice cannot join server %v %v", ServerAddr, err)
}
waitForConnection(t, alice, ServerAddr, connections.SYNCED) waitForConnection(t, alice, ServerAddr, connections.SYNCED)
// Creating a Group // Creating a Group
t.Logf("Creating group on %v...", ServerAddr) log.Infof("Creating group on %v...", ServerAddr)
aliceGroupConversationID, err := alice.StartGroup("Our Cool Testing Group", ServerAddr) aliceGroupConversationID, err := alice.StartGroup("Our Cool Testing Group", ServerAddr)
t.Logf("Created group: %v!\n", aliceGroupConversationID) log.Infof("Created group: %v!\n", aliceGroupConversationID)
if err != nil { if err != nil {
t.Errorf("Failed to init group: %v", err) t.Errorf("Failed to init group: %v", err)
return return
} }
// Invites // Invites
fmt.Println("Alice inviting Bob to group...") log.Infoln("Alice inviting Bob to group...")
err = alice.SendInviteToConversation(alice2bobConversationID, aliceGroupConversationID) err = alice.SendInviteToConversation(alice2bobConversationID, aliceGroupConversationID)
if err != nil { if err != nil {
t.Fatalf("Error for Alice inviting Bob to group: %v", err) t.Fatalf("Error for Alice inviting Bob to group: %v", err)
@ -242,25 +237,22 @@ func TestCwtchPeerIntegration(t *testing.T) {
// Alice invites Bob to the Group... // Alice invites Bob to the Group...
message, _, err := bob.GetChannelMessage(bob2aliceConversationID, 0, 1) message, _, err := bob.GetChannelMessage(bob2aliceConversationID, 0, 1)
t.Logf("Alice message to Bob %v %v", message, err) log.Infof("Alice message to Bob %v %v", message, err)
var overlayMessage model.MessageWrapper var overlayMessage model.MessageWrapper
json.Unmarshal([]byte(message), &overlayMessage) json.Unmarshal([]byte(message), &overlayMessage)
t.Logf("Parsed Overlay Message: %v", overlayMessage) log.Infof("Parsed Overlay Message: %v", overlayMessage)
err = bob.ImportBundle(overlayMessage.Data) err = bob.ImportBundle(overlayMessage.Data)
t.Logf("Result of Bob Importing the Bundle from Alice: %v", err) log.Infof("Result of Bob Importing the Bundle from Alice: %v", err)
t.Logf("Waiting for Bob to join connect to group server...") log.Infof("Waiting for Bob to join connect to group server...")
err = bob.JoinServer(ServerAddr) // for some unrealism we skip "discovering the server from the event bus time.Sleep(2 * time.Second)
if err != nil {
t.Fatalf("alice cannot join server %v %v", ServerAddr, err)
}
bobGroupConversationID := 3 bobGroupConversationID := 3
waitForConnection(t, bob, ServerAddr, connections.SYNCED) waitForConnection(t, bob, ServerAddr, connections.SYNCED)
numGoRoutinesPostServerConnect := runtime.NumGoroutine() numGoRoutinesPostServerConnect := runtime.NumGoroutine()
// ***** Conversation ***** // ***** Conversation *****
t.Logf("Starting conversation in group...") log.Infof("Starting conversation in group...")
checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[0]) checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[0])
checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[0]) checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[0])
checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[1]) checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[1])
@ -268,23 +260,25 @@ func TestCwtchPeerIntegration(t *testing.T) {
// Alice invites Bob to the Group... // Alice invites Bob to the Group...
message, _, err = carol.GetChannelMessage(carol2aliceConversationID, 0, 1) message, _, err = carol.GetChannelMessage(carol2aliceConversationID, 0, 1)
t.Logf("Alice message to Carol %v %v", message, err) log.Infof("Alice message to Carol %v %v", message, err)
json.Unmarshal([]byte(message), &overlayMessage) json.Unmarshal([]byte(message), &overlayMessage)
t.Logf("Parsed Overlay Message: %v", overlayMessage) log.Infof("Parsed Overlay Message: %v", overlayMessage)
err = carol.ImportBundle(overlayMessage.Data) err = carol.ImportBundle(overlayMessage.Data)
t.Logf("Result of Carol Importing the Bundle from Alice: %v", err) log.Infof("Result of Carol Importing the Bundle from Alice: %v", err)
t.Logf("Waiting for Carol to join connect to group server...") log.Infof("Waiting for Carol to join connect to group server...")
err = carol.JoinServer(ServerAddr) // for some unrealism we skip "discovering the server from the event bus carol.ResyncServer(ServerAddr)
if err != nil { time.Sleep(2 * time.Second)
t.Fatalf("carol cannot join server %v %v", ServerAddr, err)
}
carolGroupConversationID := 3 carolGroupConversationID := 3
waitForConnection(t, carol, ServerAddr, connections.SYNCED) waitForConnection(t, carol, ServerAddr, connections.SYNCED)
numGoRoutinesPostCarolConnect := runtime.NumGoroutine() numGoRoutinesPostCarolConnect := runtime.NumGoroutine()
t.Logf("Shutting down Alice...") messages, err := alice.GetMostRecentMessages(aliceGroupConversationID, 0, 0, 20)
log.Infof("Alice messages")
for i, msg := range messages {
log.Infof("alice message %v: %v", i, msg)
}
// Check Alice Timeline // Check Alice Timeline
checkMessage(t, alice, aliceGroupConversationID, 1, aliceLines[0]) checkMessage(t, alice, aliceGroupConversationID, 1, aliceLines[0])
@ -292,6 +286,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
checkMessage(t, alice, aliceGroupConversationID, 3, aliceLines[1]) checkMessage(t, alice, aliceGroupConversationID, 3, aliceLines[1])
checkMessage(t, alice, aliceGroupConversationID, 4, bobLines[1]) checkMessage(t, alice, aliceGroupConversationID, 4, bobLines[1])
log.Infof("Shutting down Alice...")
app.ShutdownPeer(alice.GetOnion()) app.ShutdownPeer(alice.GetOnion())
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
numGoRoutinesPostAlice := runtime.NumGoroutine() numGoRoutinesPostAlice := runtime.NumGoroutine()
@ -300,6 +295,11 @@ func TestCwtchPeerIntegration(t *testing.T) {
checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[2]) checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[2])
time.Sleep(time.Second * 30) time.Sleep(time.Second * 30)
messages, err = bob.GetMostRecentMessages(aliceGroupConversationID, 0, 0, 20)
log.Infof("Bob messages")
for i, msg := range messages {
log.Infof("bob message %v: %v", i, msg)
}
// Check Bob Timeline // Check Bob Timeline
checkMessage(t, bob, bobGroupConversationID, 1, aliceLines[0]) checkMessage(t, bob, bobGroupConversationID, 1, aliceLines[0])
checkMessage(t, bob, bobGroupConversationID, 2, bobLines[0]) checkMessage(t, bob, bobGroupConversationID, 2, bobLines[0])
@ -309,6 +309,11 @@ func TestCwtchPeerIntegration(t *testing.T) {
checkMessage(t, bob, bobGroupConversationID, 6, bobLines[2]) checkMessage(t, bob, bobGroupConversationID, 6, bobLines[2])
// Check Carol Timeline // Check Carol Timeline
messages, err = carol.GetMostRecentMessages(aliceGroupConversationID, 0, 0, 20)
log.Infof("Carol messages")
for i, msg := range messages {
log.Infof("carol message %v: %v", i, msg)
}
checkMessage(t, carol, carolGroupConversationID, 1, aliceLines[0]) checkMessage(t, carol, carolGroupConversationID, 1, aliceLines[0])
checkMessage(t, carol, carolGroupConversationID, 2, bobLines[0]) checkMessage(t, carol, carolGroupConversationID, 2, bobLines[0])
checkMessage(t, carol, carolGroupConversationID, 3, aliceLines[1]) checkMessage(t, carol, carolGroupConversationID, 3, aliceLines[1])
@ -316,25 +321,25 @@ func TestCwtchPeerIntegration(t *testing.T) {
checkMessage(t, carol, carolGroupConversationID, 5, carolLines[0]) checkMessage(t, carol, carolGroupConversationID, 5, carolLines[0])
checkMessage(t, carol, carolGroupConversationID, 6, bobLines[2]) checkMessage(t, carol, carolGroupConversationID, 6, bobLines[2])
t.Logf("Shutting down Bob...") log.Infof("Shutting down Bob...")
app.ShutdownPeer(bob.GetOnion()) app.ShutdownPeer(bob.GetOnion())
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
numGoRoutinesPostBob := runtime.NumGoroutine() numGoRoutinesPostBob := runtime.NumGoroutine()
t.Logf("Shutting down Carol...") log.Infof("Shutting down Carol...")
app.ShutdownPeer(carol.GetOnion()) app.ShutdownPeer(carol.GetOnion())
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
numGoRoutinesPostCarol := runtime.NumGoroutine() numGoRoutinesPostCarol := runtime.NumGoroutine()
t.Logf("Shutting down apps...") log.Infof("Shutting down apps...")
fmt.Printf("app Shutdown: %v\n", runtime.NumGoroutine()) log.Infof("app Shutdown: %v\n", runtime.NumGoroutine())
app.Shutdown() app.Shutdown()
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
t.Logf("Done shutdown: %v\n", runtime.NumGoroutine()) log.Infof("Done shutdown: %v\n", runtime.NumGoroutine())
t.Logf("Shutting down ACN...") log.Infof("Shutting down ACN...")
acn.Restart() // kill all active tor connections... acn.Restart() // kill all active tor connections...
// acn.Close() TODO: ACN Now gets closed automatically with defer...attempting to close twice results in a dead lock... // acn.Close() TODO: ACN Now gets closed automatically with defer...attempting to close twice results in a dead lock...
time.Sleep(time.Second * 30) // the network status plugin might keep goroutines alive for a minute before killing them time.Sleep(time.Second * 30) // the network status plugin might keep goroutines alive for a minute before killing them
@ -345,7 +350,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
// Very useful if we are leaking any. // Very useful if we are leaking any.
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
t.Logf("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+ log.Infof("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+
"numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v", "numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v",
numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect, numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect,
numGoRoutinesPostAlice, numGoRoutinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown) numGoRoutinesPostAlice, numGoRoutinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown)
@ -359,7 +364,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
// Utility function for sending a message from a peer to a group // Utility function for sending a message from a peer to a group
func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, id int, message string) { func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, id int, message string) {
name, _ := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name) name, _ := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
t.Logf("%v> %v\n", name, message) log.Infof("%v> %v\n", name, message)
err := profile.SendMessage(id, message) err := profile.SendMessage(id, message)
if err != nil { if err != nil {
t.Fatalf("Alice failed to send a message to the group: %v", err) t.Fatalf("Alice failed to send a message to the group: %v", err)