diff --git a/app/app.go b/app/app.go index 7c487fc..6c68e71 100644 --- a/app/app.go +++ b/app/app.go @@ -9,7 +9,6 @@ import ( "cwtch.im/cwtch/peer" "cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/storage" - "fmt" "git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/log" "io/ioutil" @@ -19,15 +18,10 @@ import ( "sync" ) -type applicationCore struct { - eventBuses map[string]event.Manager - - directory string - coremutex sync.Mutex -} - type application struct { - applicationCore + eventBuses map[string]event.Manager + directory string + coremutex sync.Mutex appletPeers appletACN appletPlugins @@ -60,30 +54,18 @@ type Application interface { // LoadProfileFn is the function signature for a function in an app that loads a profile type LoadProfileFn func(profile peer.CwtchPeer) -func newAppCore(appDirectory string) *applicationCore { - appCore := &applicationCore{eventBuses: make(map[string]event.Manager), directory: appDirectory} - os.MkdirAll(path.Join(appCore.directory, "profiles"), 0700) - return appCore -} - // NewApp creates a new app with some environment awareness and initializes a Tor Manager func NewApp(acn connectivity.ACN, appDirectory string) Application { log.Debugf("NewApp(%v)\n", appDirectory) - app := &application{engines: make(map[string]connections.Engine), applicationCore: *newAppCore(appDirectory), appBus: event.NewEventManager()} + os.MkdirAll(path.Join(appDirectory, "profiles"), 0700) + + app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager()} app.appletPeers.init() app.appletACN.init(acn, app.getACNStatusHandler()) return app } -func (ac *applicationCore) DeletePeer(onion string) { - ac.coremutex.Lock() - defer ac.coremutex.Unlock() - - ac.eventBuses[onion].Shutdown() - delete(ac.eventBuses, onion) -} - func (app *application) CreateTaggedPeer(name string, password string, tag string) { app.appmutex.Lock() defer app.appmutex.Unlock() @@ -127,7 +109,11 @@ func (app *application) DeletePeer(onion string, password string) { app.peers[onion].Delete() delete(app.peers, onion) app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion)) - app.applicationCore.DeletePeer(onion) + + app.coremutex.Lock() + defer app.coremutex.Unlock() + app.eventBuses[onion].Shutdown() + delete(app.eventBuses, onion) log.Debugf("Delete peer for %v Done\n", onion) app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion)) @@ -145,63 +131,75 @@ func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) { } // LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them -func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProfileFn LoadProfileFn) error { - files, err := ioutil.ReadDir(path.Join(ac.directory, "profiles")) +func (app *application) LoadProfiles(password string) { + count := 0 + migrating := false + + files, err := ioutil.ReadDir(path.Join(app.directory, "profiles")) if err != nil { - return fmt.Errorf("error: cannot read profiles directory: %v", err) + log.Errorf("error: cannot read profiles directory: %v", err) + return } for _, file := range files { // Attempt to load an encrypted database - profileDirectory := path.Join(ac.directory, "profiles", file.Name()) + profileDirectory := path.Join(app.directory, "profiles", file.Name()) profile, err := peer.FromEncryptedDatabase(profileDirectory, password) + loaded := false if err == nil { // return the load the profile... log.Infof("loading profile from new-type storage database...") - loadProfileFn(profile) + loaded = app.installProfile(profile) } else { // On failure attempt to load a legacy profile profileStore, err := storage.LoadProfileWriterStore(profileDirectory, password) if err != nil { continue } log.Infof("found legacy profile. importing to new database structure...") - legacyProfile := profileStore.GetProfileCopy(timeline) + legacyProfile := profileStore.GetProfileCopy(true) + if !migrating { + migrating = true + app.appBus.Publish(event.NewEventList(event.StartingStorageMiragtion)) + } cps, err := peer.CreateEncryptedStore(profileDirectory, password) if err != nil { log.Errorf("error creating encrypted store: %v", err) } profile := peer.ImportLegacyProfile(legacyProfile, cps) - loadProfileFn(profile) + loaded = app.installProfile(profile) + } + if loaded { + count++ } } - return nil -} - -// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them -func (app *application) LoadProfiles(password string) { - count := 0 - app.applicationCore.LoadProfiles(password, true, func(profile peer.CwtchPeer) { - app.appmutex.Lock() - // Only attempt to finalize the profile if we don't have one loaded... - if app.peers[profile.GetOnion()] == nil { - eventBus := event.NewEventManager() - app.eventBuses[profile.GetOnion()] = eventBus - profile.Init(app.eventBuses[profile.GetOnion()]) - app.peers[profile.GetOnion()] = profile - app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) - app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False})) - count++ - } else { - // Otherwise shutdown the connections - profile.Shutdown() - } - app.appmutex.Unlock() - }) if count == 0 { message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0) app.appBus.Publish(message) } + if migrating { + app.appBus.Publish(event.NewEventList(event.DoneStorageMigration)) + } +} + +// installProfile takes a profile and if it isn't loaded in the app, installs it and returns true +func (app *application) installProfile(profile peer.CwtchPeer) bool { + app.appmutex.Lock() + defer app.appmutex.Unlock() + + // Only attempt to finalize the profile if we don't have one loaded... + if app.peers[profile.GetOnion()] == nil { + eventBus := event.NewEventManager() + app.eventBuses[profile.GetOnion()] = eventBus + profile.Init(app.eventBuses[profile.GetOnion()]) + app.peers[profile.GetOnion()] = profile + app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) + app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False})) + return true + } + // Otherwise shutdown the connections + profile.Shutdown() + return false } // GetPrimaryBus returns the bus the Application uses for events that aren't peer specific @@ -210,8 +208,8 @@ func (app *application) GetPrimaryBus() event.Manager { } // GetEventBus returns a cwtchPeer's event bus -func (ac *applicationCore) GetEventBus(onion string) event.Manager { - if manager, ok := ac.eventBuses[onion]; ok { +func (app *application) GetEventBus(onion string) event.Manager { + if manager, ok := app.eventBuses[onion]; ok { return manager } return nil diff --git a/event/common.go b/event/common.go index d9dbc19..28eed5a 100644 --- a/event/common.go +++ b/event/common.go @@ -55,10 +55,6 @@ const ( // GroupID: groupID (allows them to fetch from the peer) NewGroup = Type("NewGroup") - // GroupID - AcceptGroupInvite = Type("AcceptGroupInvite") - RejectGroupInvite = Type("RejectGroupInvite") - SendMessageToGroup = Type("SendMessagetoGroup") //Ciphertext, Signature: @@ -111,12 +107,6 @@ const ( // RemotePeer: The peer associated with the acknowledgement IndexedFailure = Type("IndexedFailure") - // UpdateMessageFlags will change the flags associated with a given message. - // Handle - // Message Index - // Flags - UpdateMessageFlags = Type("UpdateMessageFlags") - // attributes: // RemotePeer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"] // Error: string describing the error @@ -149,26 +139,6 @@ const ( // GroupID DeleteGroup = Type("DeleteGroup") - // request to store a profile-wide attribute (good for e.g. per-profile settings like theme prefs) - // attributes: - // Key [eg "fontcolor"] - // Data [eg "red"] - SetAttribute = Type("SetAttribute") - - // request to store a per-contact attribute (e.g. display names for a peer) - // attributes: - // RemotePeer [eg ""] - // Key [eg "nick"] - // Data [eg "erinn"] - SetPeerAttribute = Type("SetPeerAttribute") - - // request to store a per-cwtch-group attribute (e.g. display name for a group) - // attributes: - // GroupID [eg ""] - // Key [eg "nick"] - // Data [eg "open privacy board"] - SetGroupAttribute = Type("SetGroupAttribute") - // PeerStateChange servers as a new incoming connection message as well, and can/is consumed by frontends to alert of new p2p connections // RemotePeer // ConnectionState @@ -180,9 +150,6 @@ const ( /***** Application client / service messages *****/ - // ProfileName, Password, Data(tag) - CreatePeer = Type("CreatePeer") - // app: Identity(onion), Created(bool) // service -> client: Identity(localId), Password, [Status(new/default=blank || from reload='running')], Created(bool) NewPeer = Type("NewPeer") @@ -192,20 +159,6 @@ const ( // Identity(onion) PeerDeleted = Type("PeerDeleted") - // Identity(onion), Data(pluginID) - AddPeerPlugin = Type("AddPeerPlugin") - - // Password - LoadProfiles = Type("LoadProfiles") - - // Client has reloaded, triggers NewPeer s then ReloadDone - ReloadClient = Type("ReloadClient") - - ReloadDone = Type("ReloadDone") - - // Identity - Ask service to resend all connection states - ReloadPeer = Type("ReloadPeer") - // Identity(onion) ShutdownPeer = Type("ShutdownPeer") @@ -218,9 +171,6 @@ const ( // Error(err) AppError = Type("AppError") - GetACNStatus = Type("GetACNStatus") - GetACNVersion = Type("GetACNVersion") - // Progress, Status ACNStatus = Type("ACNStatus") @@ -233,10 +183,6 @@ const ( // Onion: the local onion we attempt to check NetworkStatus = Type("NetworkError") - // Notify the UI that a Server has been added - // Onion = Server Onion - ServerCreated = Type("ServerAdded") - // For debugging. Allows test to emit a Syn and get a response Ack(eventID) when the subsystem is done processing a queue Syn = Type("Syn") Ack = Type("Ack") @@ -256,6 +202,9 @@ const ( // Profile Attribute Event UpdatedProfileAttribute = Type("UpdatedProfileAttribute") + + StartingStorageMiragtion = Type("StartingStorageMigration") + DoneStorageMigration = Type("DoneStorageMigration") ) // Field defines common event attributes @@ -279,6 +228,7 @@ const ( ConversationID = Field("ConversationID") GroupID = Field("GroupID") GroupServer = Field("GroupServer") + GroupName = Field("GroupName") ServerTokenY = Field("ServerTokenY") ServerTokenOnion = Field("ServerTokenOnion") GroupInvite = Field("GroupInvite") @@ -337,12 +287,6 @@ const ( PasswordMatchError = "Password did not match" ) -// Values to be suplied in event.NewPeer for Status -const ( - StorageRunning = "running" - StorageNew = "new" -) - // Defining Protocol Contexts const ( ContextAck = "im.cwtch.acknowledgement" diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 547f132..f394fc5 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -434,7 +434,7 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) { cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), gci.ServerHost) cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(gci.SharedKey)) cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)), gci.GroupName) - cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(groupConversationID), event.GroupServer: gci.ServerHost, event.GroupInvite: exportedInvite})) + cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(groupConversationID), event.GroupServer: gci.ServerHost, event.GroupInvite: exportedInvite, event.GroupName: gci.GroupName})) cp.JoinServer(gci.ServerHost) } return groupConversationID, err @@ -588,6 +588,7 @@ func (cp *cwtchPeer) StartGroup(name string, server string) (int, error) { event.ConversationID: strconv.Itoa(conversationID), event.GroupID: group.GroupID, event.GroupServer: group.GroupServer, + event.GroupName: name, })) return conversationID, nil } @@ -652,6 +653,7 @@ func (cp *cwtchPeer) AddServer(serverSpecification string) (string, error) { cp.SetConversationAttribute(conversationInfo.ID, attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(k)), v) } cp.SetConversationAttribute(conversationInfo.ID, attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.BundleType))), serverSpecification) + cp.JoinServer(onion) return onion, err } return "", err diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 58cc446..067c9f2 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -24,6 +24,11 @@ import ( "golang.org/x/crypto/ed25519" ) +type connectionLockedService struct { + service tapir.Service + connectingLock sync.Mutex +} + type engine struct { queue event.Queue @@ -46,7 +51,8 @@ type engine struct { getValRequests sync.Map // [string]string eventID:Data // Nextgen Tapir Service - ephemeralServices sync.Map // string(onion) => tapir.Service + ephemeralServices map[string]*connectionLockedService //sync.Map // string(onion) => tapir.Service + ephemeralServicesLock sync.Mutex // Required for listen(), inaccessible from identity privateKey ed25519.PrivateKey @@ -72,6 +78,7 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK engine := new(engine) engine.identity = identity engine.privateKey = privateKey + engine.ephemeralServices = make(map[string]*connectionLockedService) engine.queue = event.NewQueue() go engine.eventHandler() @@ -275,14 +282,12 @@ func (e *engine) Shutdown() { e.shuttingDown = true e.service.Shutdown() - e.ephemeralServices.Range(func(_, service interface{}) bool { - connection, ok := service.(*tor.BaseOnionService) - if ok { - log.Infof("shutting down ephemeral service") - connection.Shutdown() - } - return true - }) + e.ephemeralServicesLock.Lock() + defer e.ephemeralServicesLock.Unlock() + for _, connection := range e.ephemeralServices { + log.Infof("shutting down ephemeral service") + connection.service.Shutdown() + } e.queue.Shutdown() } @@ -316,23 +321,31 @@ func (e *engine) peerWithOnion(onion string) { // peerWithTokenServer is the entry point for cwtchPeer - server relationships // needs to be run in a goroutine as will block on Open. func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) { + e.ephemeralServicesLock.Lock() + connectionService, exists := e.ephemeralServices[onion] - service, exists := e.ephemeralServices.Load(onion) - if exists { - connection := service.(*tor.BaseOnionService) - if conn, err := connection.GetConnection(onion); err == nil { + if exists && connectionService.service != nil { + if conn, err := connectionService.service.GetConnection(onion); err == nil { // We are already peered and synced so return... - // This will only not-trigger it lastKnownSignature has been wiped, which only happens when ResyncServer is called + // This will only not-trigger if lastKnownSignature has been wiped, which only happens when ResyncServer is called // in CwtchPeer. if !conn.IsClosed() && len(lastKnownSignature) != 0 { + e.ephemeralServicesLock.Unlock() return } // Otherwise...we are going to rebuild the connection(which will result in a bandwidth heavy resync)... - e.leaveServer(onion) + connectionService.service.Shutdown() } // Otherwise...let's reconnect } + connectionService = &connectionLockedService{} + e.ephemeralServices[onion] = connectionService + + connectionService.connectingLock.Lock() + defer connectionService.connectingLock.Unlock() + e.ephemeralServicesLock.Unlock() + log.Debugf("Peering with Token Server %v %v", onion, tokenServerOnion) e.ignoreOnShutdown(e.serverConnecting)(onion) // Create a new ephemeral service for this connection @@ -343,7 +356,9 @@ func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, toke Y := ristretto255.NewElement() Y.UnmarshalText([]byte(tokenServerY)) connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverAuthed, e.serverSynced, e.ignoreOnShutdown(e.serverDisconnected))) - e.ephemeralServices.Store(onion, ephemeralService) + e.ephemeralServicesLock.Lock() + e.ephemeralServices[onion].service = ephemeralService + e.ephemeralServicesLock.Unlock() // If we are already connected...check if we are authed and issue an auth event // (This allows the ui to be stateless) if connected && err != nil { @@ -486,7 +501,6 @@ func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMes // sendMessageToGroup attempts to sent the given message to the given group id. func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte, attempts int) { - // sending to groups can fail for a few reasons (slow server, not enough tokens, etc.) // rather than trying to keep all that logic in method we simply back-off and try again // but if we fail more than 5 times then we report back to the client so they can investigate other options. @@ -498,14 +512,16 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si return } - es, ok := e.ephemeralServices.Load(server) - if es == nil || !ok { + e.ephemeralServicesLock.Lock() + ephemeralService, ok := e.ephemeralServices[server] + e.ephemeralServicesLock.Unlock() + + if ephemeralService == nil || !ok { e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "server-not-found", event.Signature: base64.StdEncoding.EncodeToString(sig)})) return } - ephemeralService := es.(tapir.Service) - conn, err := ephemeralService.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability) + conn, err := ephemeralService.service.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability) if err == nil { tokenApp, ok := (conn.App()).(*TokenBoardClient) if ok { @@ -615,12 +631,14 @@ func (e *engine) handlePeerRetVal(hostname string, getValData, retValData []byte e.eventManager.Publish(event.NewEventList(event.NewRetValMessageFromPeer, event.RemotePeer, hostname, event.Scope, getVal.Scope, event.Path, getVal.Path, event.Exists, strconv.FormatBool(retVal.Exists), event.Data, retVal.Val)) } +// leaveServer disconnects from a server and deletes the ephemeral service func (e *engine) leaveServer(server string) { - es, ok := e.ephemeralServices.Load(server) + e.ephemeralServicesLock.Lock() + defer e.ephemeralServicesLock.Unlock() + ephemeralService, ok := e.ephemeralServices[server] if ok { - ephemeralService := es.(tapir.Service) - ephemeralService.Shutdown() - e.ephemeralServices.Delete(server) + ephemeralService.service.Shutdown() + delete(e.ephemeralServices, server) } } diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index dddff4f..5b41bd0 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -13,7 +13,6 @@ import ( "cwtch.im/cwtch/protocol/connections" "encoding/base64" "encoding/json" - "fmt" "git.openprivacy.ca/openprivacy/connectivity/tor" "git.openprivacy.ca/openprivacy/log" _ "github.com/mutecomm/go-sqlcipher/v4" @@ -36,18 +35,18 @@ var ( func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target connections.ConnectionState) { peerName, _ := peer.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name) for { - fmt.Printf("%v checking connection...\n", peerName) + log.Infof("%v checking connection...\n", peerName) state := peer.GetPeerState(addr) - fmt.Printf("Waiting for Peer %v to %v - state: %v\n", peerName, addr, state) + log.Infof("Waiting for Peer %v to %v - state: %v\n", peerName, addr, state) if state == connections.FAILED { t.Fatalf("%v could not connect to %v", peer.GetOnion(), addr) } if state != target { - fmt.Printf("peer %v %v waiting connect %v, currently: %v\n", peerName, peer.GetOnion(), addr, connections.ConnectionStateName[state]) + log.Infof("peer %v %v waiting connect %v, currently: %v\n", peerName, peer.GetOnion(), addr, connections.ConnectionStateName[state]) time.Sleep(time.Second * 5) continue } else { - fmt.Printf("peer %v %v CONNECTED to %v\n", peerName, peer.GetOnion(), addr) + log.Infof("peer %v %v CONNECTED to %v\n", peerName, peer.GetOnion(), addr) break } } @@ -106,41 +105,41 @@ func TestCwtchPeerIntegration(t *testing.T) { // ***** cwtchPeer setup ***** - fmt.Println("Creating Alice...") + log.Infoln("Creating Alice...") app.CreateTaggedPeer("Alice", "asdfasdf", "test") - fmt.Println("Creating Bob...") + log.Infoln("Creating Bob...") app.CreateTaggedPeer("Bob", "asdfasdf", "test") - fmt.Println("Creating Carol...") + log.Infoln("Creating Carol...") app.CreateTaggedPeer("Carol", "asdfasdf", "test") alice := utils.WaitGetPeer(app, "Alice") - fmt.Println("Alice created:", alice.GetOnion()) + log.Infoln("Alice created:", alice.GetOnion()) alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice") alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) bob := utils.WaitGetPeer(app, "Bob") - fmt.Println("Bob created:", bob.GetOnion()) + log.Infoln("Bob created:", bob.GetOnion()) bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) carol := utils.WaitGetPeer(app, "Carol") - fmt.Println("Carol created:", carol.GetOnion()) + log.Infoln("Carol created:", carol.GetOnion()) carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol") carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) app.LaunchPeers() waitTime := time.Duration(60) * time.Second - t.Logf("** Waiting for Alice, Bob, and Carol to connect with onion network... (%v)\n", waitTime) + log.Infof("** Waiting for Alice, Bob, and Carol to connect with onion network... (%v)\n", waitTime) time.Sleep(waitTime) numGoRoutinesPostPeerStart := runtime.NumGoroutine() - fmt.Println("** Wait Done!") + log.Infof("** Wait Done!") // ***** Peering, server joining, group creation / invite ***** - fmt.Println("Alice peering with Bob...") + log.Infoln("Alice peering with Bob...") // Simulate Alice Adding Bob alice2bobConversationID, err := alice.NewContactConversation(bob.GetOnion(), model.DefaultP2PAccessControl(), true) if err != nil { @@ -151,7 +150,7 @@ func TestCwtchPeerIntegration(t *testing.T) { t.Fatalf("error adding conversaiton %v", bob2aliceConversationID) } - t.Logf("Alice peering with Carol...") + log.Infof("Alice peering with Carol...") // Simulate Alice Adding Carol alice2carolConversationID, err := alice.NewContactConversation(carol.GetOnion(), model.DefaultP2PAccessControl(), true) if err != nil { @@ -170,7 +169,7 @@ func TestCwtchPeerIntegration(t *testing.T) { waitForConnection(t, bob, alice.GetOnion(), connections.AUTHENTICATED) waitForConnection(t, carol, alice.GetOnion(), connections.AUTHENTICATED) - t.Logf("Alice and Bob getVal public.name...") + log.Infof("Alice and Bob getVal public.name...") alice.SendScopedZonedGetValToContact(alice2bobConversationID, attr.PublicScope, attr.ProfileZone, constants.Name) bob.SendScopedZonedGetValToContact(bob2aliceConversationID, attr.PublicScope, attr.ProfileZone, constants.Name) @@ -186,13 +185,13 @@ func TestCwtchPeerIntegration(t *testing.T) { if err != nil || aliceName != "Alice" { t.Fatalf("Bob: alice GetKeyVal error on alice peer.name %v: %v\n", aliceName, err) } - fmt.Printf("Bob has alice's name as '%v'\n", aliceName) + log.Infof("Bob has alice's name as '%v'\n", aliceName) bobName, err := alice.GetConversationAttribute(alice2bobConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) if err != nil || bobName != "Bob" { t.Fatalf("Alice: bob GetKeyVal error on bob peer.name %v: %v \n", bobName, err) } - fmt.Printf("Alice has bob's name as '%v'\n", bobName) + log.Infof("Alice has bob's name as '%v'\n", bobName) aliceName, err = carol.GetConversationAttribute(carol2aliceConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) if err != nil || aliceName != "Alice" { @@ -203,37 +202,33 @@ func TestCwtchPeerIntegration(t *testing.T) { if err != nil || carolName != "Carol" { t.Fatalf("alice GetKeyVal error, carol peer.name: %v: %v\n", carolName, err) } - fmt.Printf("Alice has carol's name as '%v'\n", carolName) + log.Infof("Alice has carol's name as '%v'\n", carolName) // Group Testing // Simulate Alice Creating a Group - fmt.Println("Alice joining server...") + log.Infoln("Alice joining server...") if _, err := alice.AddServer(string(serverKeyBundle)); err != nil { t.Fatalf("Failed to Add Server Bundle %v", err) } - bob.AddServer(string(serverKeyBundle)) + // Ading here will require resync carol.AddServer(string(serverKeyBundle)) - t.Logf("Waiting for alice to join server...") - err = alice.JoinServer(ServerAddr) - if err != nil { - t.Fatalf("alice cannot join server %v %v", ServerAddr, err) - } + log.Infof("Waiting for alice to join server...") waitForConnection(t, alice, ServerAddr, connections.SYNCED) // Creating a Group - t.Logf("Creating group on %v...", ServerAddr) + log.Infof("Creating group on %v...", ServerAddr) aliceGroupConversationID, err := alice.StartGroup("Our Cool Testing Group", ServerAddr) - t.Logf("Created group: %v!\n", aliceGroupConversationID) + log.Infof("Created group: %v!\n", aliceGroupConversationID) if err != nil { t.Errorf("Failed to init group: %v", err) return } // Invites - fmt.Println("Alice inviting Bob to group...") + log.Infoln("Alice inviting Bob to group...") err = alice.SendInviteToConversation(alice2bobConversationID, aliceGroupConversationID) if err != nil { t.Fatalf("Error for Alice inviting Bob to group: %v", err) @@ -242,25 +237,22 @@ func TestCwtchPeerIntegration(t *testing.T) { // Alice invites Bob to the Group... message, _, err := bob.GetChannelMessage(bob2aliceConversationID, 0, 1) - t.Logf("Alice message to Bob %v %v", message, err) + log.Infof("Alice message to Bob %v %v", message, err) var overlayMessage model.MessageWrapper json.Unmarshal([]byte(message), &overlayMessage) - t.Logf("Parsed Overlay Message: %v", overlayMessage) + log.Infof("Parsed Overlay Message: %v", overlayMessage) err = bob.ImportBundle(overlayMessage.Data) - t.Logf("Result of Bob Importing the Bundle from Alice: %v", err) + log.Infof("Result of Bob Importing the Bundle from Alice: %v", err) - t.Logf("Waiting for Bob to join connect to group server...") - err = bob.JoinServer(ServerAddr) // for some unrealism we skip "discovering the server from the event bus - if err != nil { - t.Fatalf("alice cannot join server %v %v", ServerAddr, err) - } + log.Infof("Waiting for Bob to join connect to group server...") + time.Sleep(2 * time.Second) bobGroupConversationID := 3 waitForConnection(t, bob, ServerAddr, connections.SYNCED) numGoRoutinesPostServerConnect := runtime.NumGoroutine() // ***** Conversation ***** - t.Logf("Starting conversation in group...") + log.Infof("Starting conversation in group...") checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[0]) checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[0]) checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[1]) @@ -268,30 +260,27 @@ func TestCwtchPeerIntegration(t *testing.T) { // Alice invites Bob to the Group... message, _, err = carol.GetChannelMessage(carol2aliceConversationID, 0, 1) - t.Logf("Alice message to Carol %v %v", message, err) + log.Infof("Alice message to Carol %v %v", message, err) json.Unmarshal([]byte(message), &overlayMessage) - t.Logf("Parsed Overlay Message: %v", overlayMessage) + log.Infof("Parsed Overlay Message: %v", overlayMessage) err = carol.ImportBundle(overlayMessage.Data) - t.Logf("Result of Carol Importing the Bundle from Alice: %v", err) + log.Infof("Result of Carol Importing the Bundle from Alice: %v", err) - t.Logf("Waiting for Carol to join connect to group server...") - err = carol.JoinServer(ServerAddr) // for some unrealism we skip "discovering the server from the event bus - if err != nil { - t.Fatalf("carol cannot join server %v %v", ServerAddr, err) - } + log.Infof("Waiting for Carol to join connect to group server...") + carol.ResyncServer(ServerAddr) + time.Sleep(2 * time.Second) carolGroupConversationID := 3 waitForConnection(t, carol, ServerAddr, connections.SYNCED) numGoRoutinesPostCarolConnect := runtime.NumGoroutine() - t.Logf("Shutting down Alice...") - // Check Alice Timeline checkMessage(t, alice, aliceGroupConversationID, 1, aliceLines[0]) checkMessage(t, alice, aliceGroupConversationID, 2, bobLines[0]) checkMessage(t, alice, aliceGroupConversationID, 3, aliceLines[1]) checkMessage(t, alice, aliceGroupConversationID, 4, bobLines[1]) + log.Infof("Shutting down Alice...") app.ShutdownPeer(alice.GetOnion()) time.Sleep(time.Second * 3) numGoRoutinesPostAlice := runtime.NumGoroutine() @@ -316,25 +305,25 @@ func TestCwtchPeerIntegration(t *testing.T) { checkMessage(t, carol, carolGroupConversationID, 5, carolLines[0]) checkMessage(t, carol, carolGroupConversationID, 6, bobLines[2]) - t.Logf("Shutting down Bob...") + log.Infof("Shutting down Bob...") app.ShutdownPeer(bob.GetOnion()) time.Sleep(time.Second * 3) numGoRoutinesPostBob := runtime.NumGoroutine() - t.Logf("Shutting down Carol...") + log.Infof("Shutting down Carol...") app.ShutdownPeer(carol.GetOnion()) time.Sleep(time.Second * 3) numGoRoutinesPostCarol := runtime.NumGoroutine() - t.Logf("Shutting down apps...") - fmt.Printf("app Shutdown: %v\n", runtime.NumGoroutine()) + log.Infof("Shutting down apps...") + log.Infof("app Shutdown: %v\n", runtime.NumGoroutine()) app.Shutdown() time.Sleep(2 * time.Second) - t.Logf("Done shutdown: %v\n", runtime.NumGoroutine()) + log.Infof("Done shutdown: %v\n", runtime.NumGoroutine()) - t.Logf("Shutting down ACN...") + log.Infof("Shutting down ACN...") acn.Restart() // kill all active tor connections... // acn.Close() TODO: ACN Now gets closed automatically with defer...attempting to close twice results in a dead lock... time.Sleep(time.Second * 30) // the network status plugin might keep goroutines alive for a minute before killing them @@ -345,7 +334,7 @@ func TestCwtchPeerIntegration(t *testing.T) { // Very useful if we are leaking any. pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) - t.Logf("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+ + log.Infof("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+ "numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v", numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect, numGoRoutinesPostAlice, numGoRoutinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown) @@ -359,7 +348,7 @@ func TestCwtchPeerIntegration(t *testing.T) { // Utility function for sending a message from a peer to a group func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, id int, message string) { name, _ := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name) - t.Logf("%v> %v\n", name, message) + log.Infof("%v> %v\n", name, message) err := profile.SendMessage(id, message) if err != nil { t.Fatalf("Alice failed to send a message to the group: %v", err)