Merge branch 'master' into ipreview
continuous-integration/drone/push Build is passing Details
continuous-integration/drone/pr Build is failing Details

This commit is contained in:
Sarah Jamie Lewis 2021-12-19 00:16:09 +00:00
commit fa1729d08d
5 changed files with 150 additions and 199 deletions

View File

@ -9,7 +9,6 @@ import (
"cwtch.im/cwtch/peer" "cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/storage" "cwtch.im/cwtch/storage"
"fmt"
"git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/log" "git.openprivacy.ca/openprivacy/log"
"io/ioutil" "io/ioutil"
@ -19,15 +18,10 @@ import (
"sync" "sync"
) )
type applicationCore struct {
eventBuses map[string]event.Manager
directory string
coremutex sync.Mutex
}
type application struct { type application struct {
applicationCore eventBuses map[string]event.Manager
directory string
coremutex sync.Mutex
appletPeers appletPeers
appletACN appletACN
appletPlugins appletPlugins
@ -60,30 +54,18 @@ type Application interface {
// LoadProfileFn is the function signature for a function in an app that loads a profile // LoadProfileFn is the function signature for a function in an app that loads a profile
type LoadProfileFn func(profile peer.CwtchPeer) type LoadProfileFn func(profile peer.CwtchPeer)
func newAppCore(appDirectory string) *applicationCore {
appCore := &applicationCore{eventBuses: make(map[string]event.Manager), directory: appDirectory}
os.MkdirAll(path.Join(appCore.directory, "profiles"), 0700)
return appCore
}
// NewApp creates a new app with some environment awareness and initializes a Tor Manager // NewApp creates a new app with some environment awareness and initializes a Tor Manager
func NewApp(acn connectivity.ACN, appDirectory string) Application { func NewApp(acn connectivity.ACN, appDirectory string) Application {
log.Debugf("NewApp(%v)\n", appDirectory) log.Debugf("NewApp(%v)\n", appDirectory)
app := &application{engines: make(map[string]connections.Engine), applicationCore: *newAppCore(appDirectory), appBus: event.NewEventManager()} os.MkdirAll(path.Join(appDirectory, "profiles"), 0700)
app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager()}
app.appletPeers.init() app.appletPeers.init()
app.appletACN.init(acn, app.getACNStatusHandler()) app.appletACN.init(acn, app.getACNStatusHandler())
return app return app
} }
func (ac *applicationCore) DeletePeer(onion string) {
ac.coremutex.Lock()
defer ac.coremutex.Unlock()
ac.eventBuses[onion].Shutdown()
delete(ac.eventBuses, onion)
}
func (app *application) CreateTaggedPeer(name string, password string, tag string) { func (app *application) CreateTaggedPeer(name string, password string, tag string) {
app.appmutex.Lock() app.appmutex.Lock()
defer app.appmutex.Unlock() defer app.appmutex.Unlock()
@ -127,7 +109,11 @@ func (app *application) DeletePeer(onion string, password string) {
app.peers[onion].Delete() app.peers[onion].Delete()
delete(app.peers, onion) delete(app.peers, onion)
app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion)) app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion))
app.applicationCore.DeletePeer(onion)
app.coremutex.Lock()
defer app.coremutex.Unlock()
app.eventBuses[onion].Shutdown()
delete(app.eventBuses, onion)
log.Debugf("Delete peer for %v Done\n", onion) log.Debugf("Delete peer for %v Done\n", onion)
app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion)) app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion))
@ -145,63 +131,75 @@ func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) {
} }
// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them // LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProfileFn LoadProfileFn) error { func (app *application) LoadProfiles(password string) {
files, err := ioutil.ReadDir(path.Join(ac.directory, "profiles")) count := 0
migrating := false
files, err := ioutil.ReadDir(path.Join(app.directory, "profiles"))
if err != nil { if err != nil {
return fmt.Errorf("error: cannot read profiles directory: %v", err) log.Errorf("error: cannot read profiles directory: %v", err)
return
} }
for _, file := range files { for _, file := range files {
// Attempt to load an encrypted database // Attempt to load an encrypted database
profileDirectory := path.Join(ac.directory, "profiles", file.Name()) profileDirectory := path.Join(app.directory, "profiles", file.Name())
profile, err := peer.FromEncryptedDatabase(profileDirectory, password) profile, err := peer.FromEncryptedDatabase(profileDirectory, password)
loaded := false
if err == nil { if err == nil {
// return the load the profile... // return the load the profile...
log.Infof("loading profile from new-type storage database...") log.Infof("loading profile from new-type storage database...")
loadProfileFn(profile) loaded = app.installProfile(profile)
} else { // On failure attempt to load a legacy profile } else { // On failure attempt to load a legacy profile
profileStore, err := storage.LoadProfileWriterStore(profileDirectory, password) profileStore, err := storage.LoadProfileWriterStore(profileDirectory, password)
if err != nil { if err != nil {
continue continue
} }
log.Infof("found legacy profile. importing to new database structure...") log.Infof("found legacy profile. importing to new database structure...")
legacyProfile := profileStore.GetProfileCopy(timeline) legacyProfile := profileStore.GetProfileCopy(true)
if !migrating {
migrating = true
app.appBus.Publish(event.NewEventList(event.StartingStorageMiragtion))
}
cps, err := peer.CreateEncryptedStore(profileDirectory, password) cps, err := peer.CreateEncryptedStore(profileDirectory, password)
if err != nil { if err != nil {
log.Errorf("error creating encrypted store: %v", err) log.Errorf("error creating encrypted store: %v", err)
} }
profile := peer.ImportLegacyProfile(legacyProfile, cps) profile := peer.ImportLegacyProfile(legacyProfile, cps)
loadProfileFn(profile) loaded = app.installProfile(profile)
}
if loaded {
count++
} }
} }
return nil
}
// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
func (app *application) LoadProfiles(password string) {
count := 0
app.applicationCore.LoadProfiles(password, true, func(profile peer.CwtchPeer) {
app.appmutex.Lock()
// Only attempt to finalize the profile if we don't have one loaded...
if app.peers[profile.GetOnion()] == nil {
eventBus := event.NewEventManager()
app.eventBuses[profile.GetOnion()] = eventBus
profile.Init(app.eventBuses[profile.GetOnion()])
app.peers[profile.GetOnion()] = profile
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False}))
count++
} else {
// Otherwise shutdown the connections
profile.Shutdown()
}
app.appmutex.Unlock()
})
if count == 0 { if count == 0 {
message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0) message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0)
app.appBus.Publish(message) app.appBus.Publish(message)
} }
if migrating {
app.appBus.Publish(event.NewEventList(event.DoneStorageMigration))
}
}
// installProfile takes a profile and if it isn't loaded in the app, installs it and returns true
func (app *application) installProfile(profile peer.CwtchPeer) bool {
app.appmutex.Lock()
defer app.appmutex.Unlock()
// Only attempt to finalize the profile if we don't have one loaded...
if app.peers[profile.GetOnion()] == nil {
eventBus := event.NewEventManager()
app.eventBuses[profile.GetOnion()] = eventBus
profile.Init(app.eventBuses[profile.GetOnion()])
app.peers[profile.GetOnion()] = profile
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False}))
return true
}
// Otherwise shutdown the connections
profile.Shutdown()
return false
} }
// GetPrimaryBus returns the bus the Application uses for events that aren't peer specific // GetPrimaryBus returns the bus the Application uses for events that aren't peer specific
@ -210,8 +208,8 @@ func (app *application) GetPrimaryBus() event.Manager {
} }
// GetEventBus returns a cwtchPeer's event bus // GetEventBus returns a cwtchPeer's event bus
func (ac *applicationCore) GetEventBus(onion string) event.Manager { func (app *application) GetEventBus(onion string) event.Manager {
if manager, ok := ac.eventBuses[onion]; ok { if manager, ok := app.eventBuses[onion]; ok {
return manager return manager
} }
return nil return nil

View File

@ -55,10 +55,6 @@ const (
// GroupID: groupID (allows them to fetch from the peer) // GroupID: groupID (allows them to fetch from the peer)
NewGroup = Type("NewGroup") NewGroup = Type("NewGroup")
// GroupID
AcceptGroupInvite = Type("AcceptGroupInvite")
RejectGroupInvite = Type("RejectGroupInvite")
SendMessageToGroup = Type("SendMessagetoGroup") SendMessageToGroup = Type("SendMessagetoGroup")
//Ciphertext, Signature: //Ciphertext, Signature:
@ -111,12 +107,6 @@ const (
// RemotePeer: The peer associated with the acknowledgement // RemotePeer: The peer associated with the acknowledgement
IndexedFailure = Type("IndexedFailure") IndexedFailure = Type("IndexedFailure")
// UpdateMessageFlags will change the flags associated with a given message.
// Handle
// Message Index
// Flags
UpdateMessageFlags = Type("UpdateMessageFlags")
// attributes: // attributes:
// RemotePeer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"] // RemotePeer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"]
// Error: string describing the error // Error: string describing the error
@ -149,26 +139,6 @@ const (
// GroupID // GroupID
DeleteGroup = Type("DeleteGroup") DeleteGroup = Type("DeleteGroup")
// request to store a profile-wide attribute (good for e.g. per-profile settings like theme prefs)
// attributes:
// Key [eg "fontcolor"]
// Data [eg "red"]
SetAttribute = Type("SetAttribute")
// request to store a per-contact attribute (e.g. display names for a peer)
// attributes:
// RemotePeer [eg ""]
// Key [eg "nick"]
// Data [eg "erinn"]
SetPeerAttribute = Type("SetPeerAttribute")
// request to store a per-cwtch-group attribute (e.g. display name for a group)
// attributes:
// GroupID [eg ""]
// Key [eg "nick"]
// Data [eg "open privacy board"]
SetGroupAttribute = Type("SetGroupAttribute")
// PeerStateChange servers as a new incoming connection message as well, and can/is consumed by frontends to alert of new p2p connections // PeerStateChange servers as a new incoming connection message as well, and can/is consumed by frontends to alert of new p2p connections
// RemotePeer // RemotePeer
// ConnectionState // ConnectionState
@ -180,9 +150,6 @@ const (
/***** Application client / service messages *****/ /***** Application client / service messages *****/
// ProfileName, Password, Data(tag)
CreatePeer = Type("CreatePeer")
// app: Identity(onion), Created(bool) // app: Identity(onion), Created(bool)
// service -> client: Identity(localId), Password, [Status(new/default=blank || from reload='running')], Created(bool) // service -> client: Identity(localId), Password, [Status(new/default=blank || from reload='running')], Created(bool)
NewPeer = Type("NewPeer") NewPeer = Type("NewPeer")
@ -192,20 +159,6 @@ const (
// Identity(onion) // Identity(onion)
PeerDeleted = Type("PeerDeleted") PeerDeleted = Type("PeerDeleted")
// Identity(onion), Data(pluginID)
AddPeerPlugin = Type("AddPeerPlugin")
// Password
LoadProfiles = Type("LoadProfiles")
// Client has reloaded, triggers NewPeer s then ReloadDone
ReloadClient = Type("ReloadClient")
ReloadDone = Type("ReloadDone")
// Identity - Ask service to resend all connection states
ReloadPeer = Type("ReloadPeer")
// Identity(onion) // Identity(onion)
ShutdownPeer = Type("ShutdownPeer") ShutdownPeer = Type("ShutdownPeer")
@ -218,9 +171,6 @@ const (
// Error(err) // Error(err)
AppError = Type("AppError") AppError = Type("AppError")
GetACNStatus = Type("GetACNStatus")
GetACNVersion = Type("GetACNVersion")
// Progress, Status // Progress, Status
ACNStatus = Type("ACNStatus") ACNStatus = Type("ACNStatus")
@ -233,10 +183,6 @@ const (
// Onion: the local onion we attempt to check // Onion: the local onion we attempt to check
NetworkStatus = Type("NetworkError") NetworkStatus = Type("NetworkError")
// Notify the UI that a Server has been added
// Onion = Server Onion
ServerCreated = Type("ServerAdded")
// For debugging. Allows test to emit a Syn and get a response Ack(eventID) when the subsystem is done processing a queue // For debugging. Allows test to emit a Syn and get a response Ack(eventID) when the subsystem is done processing a queue
Syn = Type("Syn") Syn = Type("Syn")
Ack = Type("Ack") Ack = Type("Ack")
@ -256,6 +202,9 @@ const (
// Profile Attribute Event // Profile Attribute Event
UpdatedProfileAttribute = Type("UpdatedProfileAttribute") UpdatedProfileAttribute = Type("UpdatedProfileAttribute")
StartingStorageMiragtion = Type("StartingStorageMigration")
DoneStorageMigration = Type("DoneStorageMigration")
) )
// Field defines common event attributes // Field defines common event attributes
@ -279,6 +228,7 @@ const (
ConversationID = Field("ConversationID") ConversationID = Field("ConversationID")
GroupID = Field("GroupID") GroupID = Field("GroupID")
GroupServer = Field("GroupServer") GroupServer = Field("GroupServer")
GroupName = Field("GroupName")
ServerTokenY = Field("ServerTokenY") ServerTokenY = Field("ServerTokenY")
ServerTokenOnion = Field("ServerTokenOnion") ServerTokenOnion = Field("ServerTokenOnion")
GroupInvite = Field("GroupInvite") GroupInvite = Field("GroupInvite")
@ -337,12 +287,6 @@ const (
PasswordMatchError = "Password did not match" PasswordMatchError = "Password did not match"
) )
// Values to be suplied in event.NewPeer for Status
const (
StorageRunning = "running"
StorageNew = "new"
)
// Defining Protocol Contexts // Defining Protocol Contexts
const ( const (
ContextAck = "im.cwtch.acknowledgement" ContextAck = "im.cwtch.acknowledgement"

View File

@ -434,7 +434,7 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) {
cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), gci.ServerHost) cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), gci.ServerHost)
cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(gci.SharedKey)) cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(gci.SharedKey))
cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)), gci.GroupName) cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)), gci.GroupName)
cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(groupConversationID), event.GroupServer: gci.ServerHost, event.GroupInvite: exportedInvite})) cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(groupConversationID), event.GroupServer: gci.ServerHost, event.GroupInvite: exportedInvite, event.GroupName: gci.GroupName}))
cp.JoinServer(gci.ServerHost) cp.JoinServer(gci.ServerHost)
} }
return groupConversationID, err return groupConversationID, err
@ -588,6 +588,7 @@ func (cp *cwtchPeer) StartGroup(name string, server string) (int, error) {
event.ConversationID: strconv.Itoa(conversationID), event.ConversationID: strconv.Itoa(conversationID),
event.GroupID: group.GroupID, event.GroupID: group.GroupID,
event.GroupServer: group.GroupServer, event.GroupServer: group.GroupServer,
event.GroupName: name,
})) }))
return conversationID, nil return conversationID, nil
} }
@ -652,6 +653,7 @@ func (cp *cwtchPeer) AddServer(serverSpecification string) (string, error) {
cp.SetConversationAttribute(conversationInfo.ID, attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(k)), v) cp.SetConversationAttribute(conversationInfo.ID, attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(k)), v)
} }
cp.SetConversationAttribute(conversationInfo.ID, attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.BundleType))), serverSpecification) cp.SetConversationAttribute(conversationInfo.ID, attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.BundleType))), serverSpecification)
cp.JoinServer(onion)
return onion, err return onion, err
} }
return "", err return "", err

View File

@ -24,6 +24,11 @@ import (
"golang.org/x/crypto/ed25519" "golang.org/x/crypto/ed25519"
) )
type connectionLockedService struct {
service tapir.Service
connectingLock sync.Mutex
}
type engine struct { type engine struct {
queue event.Queue queue event.Queue
@ -46,7 +51,8 @@ type engine struct {
getValRequests sync.Map // [string]string eventID:Data getValRequests sync.Map // [string]string eventID:Data
// Nextgen Tapir Service // Nextgen Tapir Service
ephemeralServices sync.Map // string(onion) => tapir.Service ephemeralServices map[string]*connectionLockedService //sync.Map // string(onion) => tapir.Service
ephemeralServicesLock sync.Mutex
// Required for listen(), inaccessible from identity // Required for listen(), inaccessible from identity
privateKey ed25519.PrivateKey privateKey ed25519.PrivateKey
@ -72,6 +78,7 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
engine := new(engine) engine := new(engine)
engine.identity = identity engine.identity = identity
engine.privateKey = privateKey engine.privateKey = privateKey
engine.ephemeralServices = make(map[string]*connectionLockedService)
engine.queue = event.NewQueue() engine.queue = event.NewQueue()
go engine.eventHandler() go engine.eventHandler()
@ -275,14 +282,12 @@ func (e *engine) Shutdown() {
e.shuttingDown = true e.shuttingDown = true
e.service.Shutdown() e.service.Shutdown()
e.ephemeralServices.Range(func(_, service interface{}) bool { e.ephemeralServicesLock.Lock()
connection, ok := service.(*tor.BaseOnionService) defer e.ephemeralServicesLock.Unlock()
if ok { for _, connection := range e.ephemeralServices {
log.Infof("shutting down ephemeral service") log.Infof("shutting down ephemeral service")
connection.Shutdown() connection.service.Shutdown()
} }
return true
})
e.queue.Shutdown() e.queue.Shutdown()
} }
@ -316,23 +321,31 @@ func (e *engine) peerWithOnion(onion string) {
// peerWithTokenServer is the entry point for cwtchPeer - server relationships // peerWithTokenServer is the entry point for cwtchPeer - server relationships
// needs to be run in a goroutine as will block on Open. // needs to be run in a goroutine as will block on Open.
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) { func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) {
e.ephemeralServicesLock.Lock()
connectionService, exists := e.ephemeralServices[onion]
service, exists := e.ephemeralServices.Load(onion) if exists && connectionService.service != nil {
if exists { if conn, err := connectionService.service.GetConnection(onion); err == nil {
connection := service.(*tor.BaseOnionService)
if conn, err := connection.GetConnection(onion); err == nil {
// We are already peered and synced so return... // We are already peered and synced so return...
// This will only not-trigger it lastKnownSignature has been wiped, which only happens when ResyncServer is called // This will only not-trigger if lastKnownSignature has been wiped, which only happens when ResyncServer is called
// in CwtchPeer. // in CwtchPeer.
if !conn.IsClosed() && len(lastKnownSignature) != 0 { if !conn.IsClosed() && len(lastKnownSignature) != 0 {
e.ephemeralServicesLock.Unlock()
return return
} }
// Otherwise...we are going to rebuild the connection(which will result in a bandwidth heavy resync)... // Otherwise...we are going to rebuild the connection(which will result in a bandwidth heavy resync)...
e.leaveServer(onion) connectionService.service.Shutdown()
} }
// Otherwise...let's reconnect // Otherwise...let's reconnect
} }
connectionService = &connectionLockedService{}
e.ephemeralServices[onion] = connectionService
connectionService.connectingLock.Lock()
defer connectionService.connectingLock.Unlock()
e.ephemeralServicesLock.Unlock()
log.Debugf("Peering with Token Server %v %v", onion, tokenServerOnion) log.Debugf("Peering with Token Server %v %v", onion, tokenServerOnion)
e.ignoreOnShutdown(e.serverConnecting)(onion) e.ignoreOnShutdown(e.serverConnecting)(onion)
// Create a new ephemeral service for this connection // Create a new ephemeral service for this connection
@ -343,7 +356,9 @@ func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, toke
Y := ristretto255.NewElement() Y := ristretto255.NewElement()
Y.UnmarshalText([]byte(tokenServerY)) Y.UnmarshalText([]byte(tokenServerY))
connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverAuthed, e.serverSynced, e.ignoreOnShutdown(e.serverDisconnected))) connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverAuthed, e.serverSynced, e.ignoreOnShutdown(e.serverDisconnected)))
e.ephemeralServices.Store(onion, ephemeralService) e.ephemeralServicesLock.Lock()
e.ephemeralServices[onion].service = ephemeralService
e.ephemeralServicesLock.Unlock()
// If we are already connected...check if we are authed and issue an auth event // If we are already connected...check if we are authed and issue an auth event
// (This allows the ui to be stateless) // (This allows the ui to be stateless)
if connected && err != nil { if connected && err != nil {
@ -486,7 +501,6 @@ func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMes
// sendMessageToGroup attempts to sent the given message to the given group id. // sendMessageToGroup attempts to sent the given message to the given group id.
func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte, attempts int) { func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte, attempts int) {
// sending to groups can fail for a few reasons (slow server, not enough tokens, etc.) // sending to groups can fail for a few reasons (slow server, not enough tokens, etc.)
// rather than trying to keep all that logic in method we simply back-off and try again // rather than trying to keep all that logic in method we simply back-off and try again
// but if we fail more than 5 times then we report back to the client so they can investigate other options. // but if we fail more than 5 times then we report back to the client so they can investigate other options.
@ -498,14 +512,16 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
return return
} }
es, ok := e.ephemeralServices.Load(server) e.ephemeralServicesLock.Lock()
if es == nil || !ok { ephemeralService, ok := e.ephemeralServices[server]
e.ephemeralServicesLock.Unlock()
if ephemeralService == nil || !ok {
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "server-not-found", event.Signature: base64.StdEncoding.EncodeToString(sig)})) e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "server-not-found", event.Signature: base64.StdEncoding.EncodeToString(sig)}))
return return
} }
ephemeralService := es.(tapir.Service)
conn, err := ephemeralService.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability) conn, err := ephemeralService.service.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability)
if err == nil { if err == nil {
tokenApp, ok := (conn.App()).(*TokenBoardClient) tokenApp, ok := (conn.App()).(*TokenBoardClient)
if ok { if ok {
@ -615,12 +631,14 @@ func (e *engine) handlePeerRetVal(hostname string, getValData, retValData []byte
e.eventManager.Publish(event.NewEventList(event.NewRetValMessageFromPeer, event.RemotePeer, hostname, event.Scope, getVal.Scope, event.Path, getVal.Path, event.Exists, strconv.FormatBool(retVal.Exists), event.Data, retVal.Val)) e.eventManager.Publish(event.NewEventList(event.NewRetValMessageFromPeer, event.RemotePeer, hostname, event.Scope, getVal.Scope, event.Path, getVal.Path, event.Exists, strconv.FormatBool(retVal.Exists), event.Data, retVal.Val))
} }
// leaveServer disconnects from a server and deletes the ephemeral service
func (e *engine) leaveServer(server string) { func (e *engine) leaveServer(server string) {
es, ok := e.ephemeralServices.Load(server) e.ephemeralServicesLock.Lock()
defer e.ephemeralServicesLock.Unlock()
ephemeralService, ok := e.ephemeralServices[server]
if ok { if ok {
ephemeralService := es.(tapir.Service) ephemeralService.service.Shutdown()
ephemeralService.Shutdown() delete(e.ephemeralServices, server)
e.ephemeralServices.Delete(server)
} }
} }

View File

@ -13,7 +13,6 @@ import (
"cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/protocol/connections"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"fmt"
"git.openprivacy.ca/openprivacy/connectivity/tor" "git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log" "git.openprivacy.ca/openprivacy/log"
_ "github.com/mutecomm/go-sqlcipher/v4" _ "github.com/mutecomm/go-sqlcipher/v4"
@ -36,18 +35,18 @@ var (
func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target connections.ConnectionState) { func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target connections.ConnectionState) {
peerName, _ := peer.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name) peerName, _ := peer.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
for { for {
fmt.Printf("%v checking connection...\n", peerName) log.Infof("%v checking connection...\n", peerName)
state := peer.GetPeerState(addr) state := peer.GetPeerState(addr)
fmt.Printf("Waiting for Peer %v to %v - state: %v\n", peerName, addr, state) log.Infof("Waiting for Peer %v to %v - state: %v\n", peerName, addr, state)
if state == connections.FAILED { if state == connections.FAILED {
t.Fatalf("%v could not connect to %v", peer.GetOnion(), addr) t.Fatalf("%v could not connect to %v", peer.GetOnion(), addr)
} }
if state != target { if state != target {
fmt.Printf("peer %v %v waiting connect %v, currently: %v\n", peerName, peer.GetOnion(), addr, connections.ConnectionStateName[state]) log.Infof("peer %v %v waiting connect %v, currently: %v\n", peerName, peer.GetOnion(), addr, connections.ConnectionStateName[state])
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
continue continue
} else { } else {
fmt.Printf("peer %v %v CONNECTED to %v\n", peerName, peer.GetOnion(), addr) log.Infof("peer %v %v CONNECTED to %v\n", peerName, peer.GetOnion(), addr)
break break
} }
} }
@ -106,41 +105,41 @@ func TestCwtchPeerIntegration(t *testing.T) {
// ***** cwtchPeer setup ***** // ***** cwtchPeer setup *****
fmt.Println("Creating Alice...") log.Infoln("Creating Alice...")
app.CreateTaggedPeer("Alice", "asdfasdf", "test") app.CreateTaggedPeer("Alice", "asdfasdf", "test")
fmt.Println("Creating Bob...") log.Infoln("Creating Bob...")
app.CreateTaggedPeer("Bob", "asdfasdf", "test") app.CreateTaggedPeer("Bob", "asdfasdf", "test")
fmt.Println("Creating Carol...") log.Infoln("Creating Carol...")
app.CreateTaggedPeer("Carol", "asdfasdf", "test") app.CreateTaggedPeer("Carol", "asdfasdf", "test")
alice := utils.WaitGetPeer(app, "Alice") alice := utils.WaitGetPeer(app, "Alice")
fmt.Println("Alice created:", alice.GetOnion()) log.Infoln("Alice created:", alice.GetOnion())
alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice") alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice")
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
bob := utils.WaitGetPeer(app, "Bob") bob := utils.WaitGetPeer(app, "Bob")
fmt.Println("Bob created:", bob.GetOnion()) log.Infoln("Bob created:", bob.GetOnion())
bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob")
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
carol := utils.WaitGetPeer(app, "Carol") carol := utils.WaitGetPeer(app, "Carol")
fmt.Println("Carol created:", carol.GetOnion()) log.Infoln("Carol created:", carol.GetOnion())
carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol") carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol")
carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
app.LaunchPeers() app.LaunchPeers()
waitTime := time.Duration(60) * time.Second waitTime := time.Duration(60) * time.Second
t.Logf("** Waiting for Alice, Bob, and Carol to connect with onion network... (%v)\n", waitTime) log.Infof("** Waiting for Alice, Bob, and Carol to connect with onion network... (%v)\n", waitTime)
time.Sleep(waitTime) time.Sleep(waitTime)
numGoRoutinesPostPeerStart := runtime.NumGoroutine() numGoRoutinesPostPeerStart := runtime.NumGoroutine()
fmt.Println("** Wait Done!") log.Infof("** Wait Done!")
// ***** Peering, server joining, group creation / invite ***** // ***** Peering, server joining, group creation / invite *****
fmt.Println("Alice peering with Bob...") log.Infoln("Alice peering with Bob...")
// Simulate Alice Adding Bob // Simulate Alice Adding Bob
alice2bobConversationID, err := alice.NewContactConversation(bob.GetOnion(), model.DefaultP2PAccessControl(), true) alice2bobConversationID, err := alice.NewContactConversation(bob.GetOnion(), model.DefaultP2PAccessControl(), true)
if err != nil { if err != nil {
@ -151,7 +150,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
t.Fatalf("error adding conversaiton %v", bob2aliceConversationID) t.Fatalf("error adding conversaiton %v", bob2aliceConversationID)
} }
t.Logf("Alice peering with Carol...") log.Infof("Alice peering with Carol...")
// Simulate Alice Adding Carol // Simulate Alice Adding Carol
alice2carolConversationID, err := alice.NewContactConversation(carol.GetOnion(), model.DefaultP2PAccessControl(), true) alice2carolConversationID, err := alice.NewContactConversation(carol.GetOnion(), model.DefaultP2PAccessControl(), true)
if err != nil { if err != nil {
@ -170,7 +169,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
waitForConnection(t, bob, alice.GetOnion(), connections.AUTHENTICATED) waitForConnection(t, bob, alice.GetOnion(), connections.AUTHENTICATED)
waitForConnection(t, carol, alice.GetOnion(), connections.AUTHENTICATED) waitForConnection(t, carol, alice.GetOnion(), connections.AUTHENTICATED)
t.Logf("Alice and Bob getVal public.name...") log.Infof("Alice and Bob getVal public.name...")
alice.SendScopedZonedGetValToContact(alice2bobConversationID, attr.PublicScope, attr.ProfileZone, constants.Name) alice.SendScopedZonedGetValToContact(alice2bobConversationID, attr.PublicScope, attr.ProfileZone, constants.Name)
bob.SendScopedZonedGetValToContact(bob2aliceConversationID, attr.PublicScope, attr.ProfileZone, constants.Name) bob.SendScopedZonedGetValToContact(bob2aliceConversationID, attr.PublicScope, attr.ProfileZone, constants.Name)
@ -186,13 +185,13 @@ func TestCwtchPeerIntegration(t *testing.T) {
if err != nil || aliceName != "Alice" { if err != nil || aliceName != "Alice" {
t.Fatalf("Bob: alice GetKeyVal error on alice peer.name %v: %v\n", aliceName, err) t.Fatalf("Bob: alice GetKeyVal error on alice peer.name %v: %v\n", aliceName, err)
} }
fmt.Printf("Bob has alice's name as '%v'\n", aliceName) log.Infof("Bob has alice's name as '%v'\n", aliceName)
bobName, err := alice.GetConversationAttribute(alice2bobConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) bobName, err := alice.GetConversationAttribute(alice2bobConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
if err != nil || bobName != "Bob" { if err != nil || bobName != "Bob" {
t.Fatalf("Alice: bob GetKeyVal error on bob peer.name %v: %v \n", bobName, err) t.Fatalf("Alice: bob GetKeyVal error on bob peer.name %v: %v \n", bobName, err)
} }
fmt.Printf("Alice has bob's name as '%v'\n", bobName) log.Infof("Alice has bob's name as '%v'\n", bobName)
aliceName, err = carol.GetConversationAttribute(carol2aliceConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) aliceName, err = carol.GetConversationAttribute(carol2aliceConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
if err != nil || aliceName != "Alice" { if err != nil || aliceName != "Alice" {
@ -203,37 +202,33 @@ func TestCwtchPeerIntegration(t *testing.T) {
if err != nil || carolName != "Carol" { if err != nil || carolName != "Carol" {
t.Fatalf("alice GetKeyVal error, carol peer.name: %v: %v\n", carolName, err) t.Fatalf("alice GetKeyVal error, carol peer.name: %v: %v\n", carolName, err)
} }
fmt.Printf("Alice has carol's name as '%v'\n", carolName) log.Infof("Alice has carol's name as '%v'\n", carolName)
// Group Testing // Group Testing
// Simulate Alice Creating a Group // Simulate Alice Creating a Group
fmt.Println("Alice joining server...") log.Infoln("Alice joining server...")
if _, err := alice.AddServer(string(serverKeyBundle)); err != nil { if _, err := alice.AddServer(string(serverKeyBundle)); err != nil {
t.Fatalf("Failed to Add Server Bundle %v", err) t.Fatalf("Failed to Add Server Bundle %v", err)
} }
bob.AddServer(string(serverKeyBundle)) // Ading here will require resync
carol.AddServer(string(serverKeyBundle)) carol.AddServer(string(serverKeyBundle))
t.Logf("Waiting for alice to join server...") log.Infof("Waiting for alice to join server...")
err = alice.JoinServer(ServerAddr)
if err != nil {
t.Fatalf("alice cannot join server %v %v", ServerAddr, err)
}
waitForConnection(t, alice, ServerAddr, connections.SYNCED) waitForConnection(t, alice, ServerAddr, connections.SYNCED)
// Creating a Group // Creating a Group
t.Logf("Creating group on %v...", ServerAddr) log.Infof("Creating group on %v...", ServerAddr)
aliceGroupConversationID, err := alice.StartGroup("Our Cool Testing Group", ServerAddr) aliceGroupConversationID, err := alice.StartGroup("Our Cool Testing Group", ServerAddr)
t.Logf("Created group: %v!\n", aliceGroupConversationID) log.Infof("Created group: %v!\n", aliceGroupConversationID)
if err != nil { if err != nil {
t.Errorf("Failed to init group: %v", err) t.Errorf("Failed to init group: %v", err)
return return
} }
// Invites // Invites
fmt.Println("Alice inviting Bob to group...") log.Infoln("Alice inviting Bob to group...")
err = alice.SendInviteToConversation(alice2bobConversationID, aliceGroupConversationID) err = alice.SendInviteToConversation(alice2bobConversationID, aliceGroupConversationID)
if err != nil { if err != nil {
t.Fatalf("Error for Alice inviting Bob to group: %v", err) t.Fatalf("Error for Alice inviting Bob to group: %v", err)
@ -242,25 +237,22 @@ func TestCwtchPeerIntegration(t *testing.T) {
// Alice invites Bob to the Group... // Alice invites Bob to the Group...
message, _, err := bob.GetChannelMessage(bob2aliceConversationID, 0, 1) message, _, err := bob.GetChannelMessage(bob2aliceConversationID, 0, 1)
t.Logf("Alice message to Bob %v %v", message, err) log.Infof("Alice message to Bob %v %v", message, err)
var overlayMessage model.MessageWrapper var overlayMessage model.MessageWrapper
json.Unmarshal([]byte(message), &overlayMessage) json.Unmarshal([]byte(message), &overlayMessage)
t.Logf("Parsed Overlay Message: %v", overlayMessage) log.Infof("Parsed Overlay Message: %v", overlayMessage)
err = bob.ImportBundle(overlayMessage.Data) err = bob.ImportBundle(overlayMessage.Data)
t.Logf("Result of Bob Importing the Bundle from Alice: %v", err) log.Infof("Result of Bob Importing the Bundle from Alice: %v", err)
t.Logf("Waiting for Bob to join connect to group server...") log.Infof("Waiting for Bob to join connect to group server...")
err = bob.JoinServer(ServerAddr) // for some unrealism we skip "discovering the server from the event bus time.Sleep(2 * time.Second)
if err != nil {
t.Fatalf("alice cannot join server %v %v", ServerAddr, err)
}
bobGroupConversationID := 3 bobGroupConversationID := 3
waitForConnection(t, bob, ServerAddr, connections.SYNCED) waitForConnection(t, bob, ServerAddr, connections.SYNCED)
numGoRoutinesPostServerConnect := runtime.NumGoroutine() numGoRoutinesPostServerConnect := runtime.NumGoroutine()
// ***** Conversation ***** // ***** Conversation *****
t.Logf("Starting conversation in group...") log.Infof("Starting conversation in group...")
checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[0]) checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[0])
checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[0]) checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[0])
checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[1]) checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[1])
@ -268,30 +260,27 @@ func TestCwtchPeerIntegration(t *testing.T) {
// Alice invites Bob to the Group... // Alice invites Bob to the Group...
message, _, err = carol.GetChannelMessage(carol2aliceConversationID, 0, 1) message, _, err = carol.GetChannelMessage(carol2aliceConversationID, 0, 1)
t.Logf("Alice message to Carol %v %v", message, err) log.Infof("Alice message to Carol %v %v", message, err)
json.Unmarshal([]byte(message), &overlayMessage) json.Unmarshal([]byte(message), &overlayMessage)
t.Logf("Parsed Overlay Message: %v", overlayMessage) log.Infof("Parsed Overlay Message: %v", overlayMessage)
err = carol.ImportBundle(overlayMessage.Data) err = carol.ImportBundle(overlayMessage.Data)
t.Logf("Result of Carol Importing the Bundle from Alice: %v", err) log.Infof("Result of Carol Importing the Bundle from Alice: %v", err)
t.Logf("Waiting for Carol to join connect to group server...") log.Infof("Waiting for Carol to join connect to group server...")
err = carol.JoinServer(ServerAddr) // for some unrealism we skip "discovering the server from the event bus carol.ResyncServer(ServerAddr)
if err != nil { time.Sleep(2 * time.Second)
t.Fatalf("carol cannot join server %v %v", ServerAddr, err)
}
carolGroupConversationID := 3 carolGroupConversationID := 3
waitForConnection(t, carol, ServerAddr, connections.SYNCED) waitForConnection(t, carol, ServerAddr, connections.SYNCED)
numGoRoutinesPostCarolConnect := runtime.NumGoroutine() numGoRoutinesPostCarolConnect := runtime.NumGoroutine()
t.Logf("Shutting down Alice...")
// Check Alice Timeline // Check Alice Timeline
checkMessage(t, alice, aliceGroupConversationID, 1, aliceLines[0]) checkMessage(t, alice, aliceGroupConversationID, 1, aliceLines[0])
checkMessage(t, alice, aliceGroupConversationID, 2, bobLines[0]) checkMessage(t, alice, aliceGroupConversationID, 2, bobLines[0])
checkMessage(t, alice, aliceGroupConversationID, 3, aliceLines[1]) checkMessage(t, alice, aliceGroupConversationID, 3, aliceLines[1])
checkMessage(t, alice, aliceGroupConversationID, 4, bobLines[1]) checkMessage(t, alice, aliceGroupConversationID, 4, bobLines[1])
log.Infof("Shutting down Alice...")
app.ShutdownPeer(alice.GetOnion()) app.ShutdownPeer(alice.GetOnion())
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
numGoRoutinesPostAlice := runtime.NumGoroutine() numGoRoutinesPostAlice := runtime.NumGoroutine()
@ -316,25 +305,25 @@ func TestCwtchPeerIntegration(t *testing.T) {
checkMessage(t, carol, carolGroupConversationID, 5, carolLines[0]) checkMessage(t, carol, carolGroupConversationID, 5, carolLines[0])
checkMessage(t, carol, carolGroupConversationID, 6, bobLines[2]) checkMessage(t, carol, carolGroupConversationID, 6, bobLines[2])
t.Logf("Shutting down Bob...") log.Infof("Shutting down Bob...")
app.ShutdownPeer(bob.GetOnion()) app.ShutdownPeer(bob.GetOnion())
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
numGoRoutinesPostBob := runtime.NumGoroutine() numGoRoutinesPostBob := runtime.NumGoroutine()
t.Logf("Shutting down Carol...") log.Infof("Shutting down Carol...")
app.ShutdownPeer(carol.GetOnion()) app.ShutdownPeer(carol.GetOnion())
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
numGoRoutinesPostCarol := runtime.NumGoroutine() numGoRoutinesPostCarol := runtime.NumGoroutine()
t.Logf("Shutting down apps...") log.Infof("Shutting down apps...")
fmt.Printf("app Shutdown: %v\n", runtime.NumGoroutine()) log.Infof("app Shutdown: %v\n", runtime.NumGoroutine())
app.Shutdown() app.Shutdown()
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
t.Logf("Done shutdown: %v\n", runtime.NumGoroutine()) log.Infof("Done shutdown: %v\n", runtime.NumGoroutine())
t.Logf("Shutting down ACN...") log.Infof("Shutting down ACN...")
acn.Restart() // kill all active tor connections... acn.Restart() // kill all active tor connections...
// acn.Close() TODO: ACN Now gets closed automatically with defer...attempting to close twice results in a dead lock... // acn.Close() TODO: ACN Now gets closed automatically with defer...attempting to close twice results in a dead lock...
time.Sleep(time.Second * 30) // the network status plugin might keep goroutines alive for a minute before killing them time.Sleep(time.Second * 30) // the network status plugin might keep goroutines alive for a minute before killing them
@ -345,7 +334,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
// Very useful if we are leaking any. // Very useful if we are leaking any.
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
t.Logf("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+ log.Infof("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+
"numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v", "numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v",
numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect, numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect,
numGoRoutinesPostAlice, numGoRoutinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown) numGoRoutinesPostAlice, numGoRoutinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown)
@ -359,7 +348,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
// Utility function for sending a message from a peer to a group // Utility function for sending a message from a peer to a group
func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, id int, message string) { func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, id int, message string) {
name, _ := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name) name, _ := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
t.Logf("%v> %v\n", name, message) log.Infof("%v> %v\n", name, message)
err := profile.SendMessage(id, message) err := profile.SendMessage(id, message)
if err != nil { if err != nil {
t.Fatalf("Alice failed to send a message to the group: %v", err) t.Fatalf("Alice failed to send a message to the group: %v", err)