From 04dd8fa89ca2666de4b599e72abc8aa38b80d4b8 Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Wed, 5 Jun 2019 13:40:55 -0700 Subject: [PATCH] App Client/Service: new IPCBridge type and test gochan impl; new IPC using eventManager; new App Client and Service; some app api changes and a few more events (NewPeer) and errors (Loading errors) --- app/app.go | 185 +++++++++++------- app/appBridge.go | 34 ++++ app/appClient.go | 107 ++++++++++ app/appService.go | 110 +++++++++++ app/bots/servermon/main.go | 2 +- app/cli/main.go | 48 +++-- app/peer/alice/alice.go | 3 +- app/peer/bob/bob.go | 3 +- event/common.go | 35 +++- event/eventmanager.go | 45 ++++- event/eventmanager_test.go | 9 +- event/eventmanageripc.go | 38 ++++ event/ipc.go | 71 +++++++ peer/cwtch_peer.go | 6 +- protocol/connections/engine.go | 10 +- .../connections/peerpeerconnection_test.go | 3 +- .../connections/peerserverconnection_test.go | 3 +- storage/profile_store.go | 19 +- storage/profile_store_test.go | 3 +- testing/cwtch_peer_server_integration_test.go | 67 +++++-- 20 files changed, 652 insertions(+), 149 deletions(-) create mode 100644 app/appBridge.go create mode 100644 app/appClient.go create mode 100644 app/appService.go create mode 100644 event/eventmanageripc.go create mode 100644 event/ipc.go diff --git a/app/app.go b/app/app.go index dbe0177..5cfe688 100644 --- a/app/app.go +++ b/app/app.go @@ -2,6 +2,7 @@ package app import ( "cwtch.im/cwtch/event" + "cwtch.im/cwtch/model" "cwtch.im/cwtch/peer" "cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/storage" @@ -16,86 +17,114 @@ import ( "sync" ) +type applicationCore struct { + eventBuses map[string]event.Manager + + acn connectivity.ACN + directory string + mutex sync.Mutex +} + +type applicationPeers struct { + peers map[string]peer.CwtchPeer +} + type application struct { - peers map[string]peer.CwtchPeer - storage map[string]storage.ProfileStore - engines map[string]connections.Engine - eventBuses map[string]*event.Manager - acn connectivity.ACN - directory string - mutex sync.Mutex - primaryonion string + applicationCore + applicationPeers + storage map[string]storage.ProfileStore + engines map[string]connections.Engine + appBus event.Manager } // Application is a full cwtch peer application. It allows management, usage and storage of multiple peers type Application interface { - LoadProfiles(password string) error - CreatePeer(name string, password string) (peer.CwtchPeer, error) - - PrimaryIdentity() peer.CwtchPeer - GetPeer(onion string) peer.CwtchPeer - ListPeers() map[string]string - GetEventBus(onion string) *event.Manager + LoadProfiles(password string) + CreatePeer(name string, password string) LaunchPeers() + GetPrimaryBus() event.Manager + GetEventBus(onion string) event.Manager + ShutdownPeer(string) Shutdown() + + GetPeer(onion string) peer.CwtchPeer + ListPeers() map[string]string +} + +// LoadProfileFn is the function signature for a function in an app that loads a profile +type LoadProfileFn func(profile *model.Profile, store storage.ProfileStore) + +func newAppCore(acn connectivity.ACN, appDirectory string) *applicationCore { + appCore := &applicationCore{eventBuses: make(map[string]event.Manager), directory: appDirectory, acn: acn} + os.MkdirAll(path.Join(appCore.directory, "profiles"), 0700) + return appCore } // NewApp creates a new app with some environment awareness and initializes a Tor Manager func NewApp(acn connectivity.ACN, appDirectory string) Application { log.Debugf("NewApp(%v)\n", appDirectory) - app := &application{peers: make(map[string]peer.CwtchPeer), storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), eventBuses: make(map[string]*event.Manager), directory: appDirectory, acn: acn} - os.Mkdir(path.Join(app.directory, "profiles"), 0700) + app := &application{storage: make(map[string]storage.ProfileStore), applicationPeers: applicationPeers{peers: make(map[string]peer.CwtchPeer)}, engines: make(map[string]connections.Engine), applicationCore: *newAppCore(acn, appDirectory), appBus: event.NewEventManager()} return app } -// NewProfile creates a new cwtchPeer with a given name. -func (app *application) CreatePeer(name string, password string) (peer.CwtchPeer, error) { +// CreatePeer creates a new Peer with a given name and core required accessories (eventbus) +func (ac *applicationCore) CreatePeer(name string, password string) (*model.Profile, error) { log.Debugf("CreatePeer(%v)\n", name) - eventBus := new(event.Manager) - eventBus.Initialize() - profile := storage.NewProfile(name) - profileStore := storage.NewProfileWriterStore(eventBus, path.Join(app.directory, "profiles", profile.LocalID), password, profile) - pc := profileStore.GetProfileCopy() - p := peer.FromProfile(pc) - _, exists := app.peers[p.GetProfile().Onion] + + ac.mutex.Lock() + defer ac.mutex.Unlock() + + _, exists := ac.eventBuses[profile.Onion] if exists { - profileStore.Shutdown() - eventBus.Shutdown() - return nil, fmt.Errorf("Error: profile for onion %v already exists", p.GetProfile().Onion) + return nil, fmt.Errorf("Error: profile for onion %v already exists", profile.Onion) } - p.Init(app.acn, eventBus) + + eventBus := event.NewEventManager() + ac.eventBuses[profile.Onion] = eventBus + + return profile, nil +} + +// CreatePeer creates a new Peer with the given name and required accessories (eventbus, storage, protocol engine) +func (app *application) CreatePeer(name string, password string) { + profile, err := app.applicationCore.CreatePeer(name, password) + if err != nil { + app.appBus.Publish(event.NewEventList(event.PeerError, event.Error, err.Error())) + return + } + + profileStore := storage.NewProfileWriterStore(app.eventBuses[profile.Onion], path.Join(app.directory, "profiles", profile.LocalID), password, profile) + app.storage[profile.Onion] = profileStore + + pc := app.storage[profile.Onion].GetProfileCopy() + peer := peer.FromProfile(pc) + peer.Init(app.acn, app.eventBuses[profile.Onion]) blockedPeers := profile.BlockedPeers() // TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key. identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey) - engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, eventBus, blockedPeers) + engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, app.eventBuses[profile.Onion], blockedPeers) - app.mutex.Lock() - app.peers[p.GetProfile().Onion] = p - app.storage[p.GetProfile().Onion] = profileStore - app.engines[p.GetProfile().Onion] = engine - app.eventBuses[p.GetProfile().Onion] = eventBus - app.mutex.Unlock() + app.peers[profile.Onion] = peer + app.engines[profile.Onion] = engine - return p, nil + app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion})) } -func (app *application) LoadProfiles(password string) error { - files, err := ioutil.ReadDir(path.Join(app.directory, "profiles")) +// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them +func (ac *applicationCore) LoadProfiles(password string, loadProfileFn LoadProfileFn) error { + files, err := ioutil.ReadDir(path.Join(ac.directory, "profiles")) if err != nil { return fmt.Errorf("Error: cannot read profiles directory: %v", err) } for _, file := range files { - - eventBus := new(event.Manager) - eventBus.Initialize() - - profileStore := storage.NewProfileWriterStore(eventBus, path.Join(app.directory, "profiles", file.Name()), password, nil) + eventBus := event.NewEventManager() + profileStore := storage.NewProfileWriterStore(eventBus, path.Join(ac.directory, "profiles", file.Name()), password, nil) err = profileStore.Load() if err != nil { continue @@ -103,7 +132,7 @@ func (app *application) LoadProfiles(password string) error { profile := profileStore.GetProfileCopy() - _, exists := app.peers[profile.Onion] + _, exists := ac.eventBuses[profile.Onion] if exists { profileStore.Shutdown() eventBus.Shutdown() @@ -111,29 +140,41 @@ func (app *application) LoadProfiles(password string) error { continue } - peer := peer.FromProfile(profile) - peer.Init(app.acn, eventBus) + ac.mutex.Lock() + ac.eventBuses[profile.Onion] = eventBus + ac.mutex.Unlock() - blockedPeers := profile.BlockedPeers() - identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey) - engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, eventBus, blockedPeers) - - app.mutex.Lock() - app.peers[profile.Onion] = peer - app.storage[profile.Onion] = profileStore - app.engines[profile.Onion] = engine - app.eventBuses[profile.Onion] = eventBus - if app.primaryonion == "" { - app.primaryonion = profile.Onion - } - app.mutex.Unlock() + loadProfileFn(profile, profileStore) } return nil } +// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them +func (app *application) LoadProfiles(password string) { + app.applicationCore.LoadProfiles(password, func(profile *model.Profile, profileStore storage.ProfileStore) { + peer := peer.FromProfile(profile) + peer.Init(app.acn, app.eventBuses[profile.Onion]) + + blockedPeers := profile.BlockedPeers() + identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey) + engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, app.eventBuses[profile.Onion], blockedPeers) + app.mutex.Lock() + app.peers[profile.Onion] = peer + app.storage[profile.Onion] = profileStore + app.engines[profile.Onion] = engine + app.mutex.Unlock() + app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion})) + }) +} + +// GetPrimaryBus returns the bus the Application uses for events that aren't peer specific +func (app *application) GetPrimaryBus() event.Manager { + return app.appBus +} + // LaunchPeers starts each peer Listening and connecting to peers and groups -func (app *application) LaunchPeers() { - for _, p := range app.peers { +func (appPeers *applicationPeers) LaunchPeers() { + for _, p := range appPeers.peers { if !p.IsStarted() { p.Listen() p.StartPeersConnections() @@ -143,30 +184,25 @@ func (app *application) LaunchPeers() { } // ListPeers returns a map of onions to their profile's Name -func (app *application) ListPeers() map[string]string { +func (appPeers *applicationPeers) ListPeers() map[string]string { keys := map[string]string{} - for k, p := range app.peers { + for k, p := range appPeers.peers { keys[k] = p.GetProfile().Name } return keys } -// PrimaryIdentity returns a cwtchPeer for a given onion address -func (app *application) PrimaryIdentity() peer.CwtchPeer { - return app.peers[app.primaryonion] -} - // GetPeer returns a cwtchPeer for a given onion address -func (app *application) GetPeer(onion string) peer.CwtchPeer { - if peer, ok := app.peers[onion]; ok { +func (appPeers *applicationPeers) GetPeer(onion string) peer.CwtchPeer { + if peer, ok := appPeers.peers[onion]; ok { return peer } return nil } // GetEventBus returns a cwtchPeer's event bus -func (app *application) GetEventBus(onion string) *event.Manager { - if manager, ok := app.eventBuses[onion]; ok { +func (ac *applicationCore) GetEventBus(onion string) event.Manager { + if manager, ok := ac.eventBuses[onion]; ok { return manager } return nil @@ -194,4 +230,5 @@ func (app *application) Shutdown() { app.storage[id].Shutdown() app.eventBuses[id].Shutdown() } + app.appBus.Shutdown() } diff --git a/app/appBridge.go b/app/appBridge.go new file mode 100644 index 0000000..07bcd43 --- /dev/null +++ b/app/appBridge.go @@ -0,0 +1,34 @@ +package app + +import "cwtch.im/cwtch/event" +import "git.openprivacy.ca/openprivacy/libricochet-go/log" + +type applicationBridge struct { + applicationCore + + bridge event.IPCBridge + handle func(*event.Event) +} + +func (ab *applicationBridge) listen() { + log.Infoln("ab.listen()") + for { + ipcMessage, ok := ab.bridge.Read() + log.Infof("listen() got %v\n", ipcMessage) + if !ok { + return + } + + if ipcMessage.Dest == DestApp { + ab.handle(&ipcMessage.Message) + } else { + if eventBus, exists := ab.eventBuses[ipcMessage.Dest]; exists { + eventBus.PublishLocal(ipcMessage.Message) + } + } + } +} + +func (ab *applicationBridge) Shutdown() { + ab.bridge.Shutdown() +} diff --git a/app/appClient.go b/app/appClient.go new file mode 100644 index 0000000..da8ad78 --- /dev/null +++ b/app/appClient.go @@ -0,0 +1,107 @@ +package app + +import ( + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/peer" + "cwtch.im/cwtch/storage" + "fmt" + "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" + "git.openprivacy.ca/openprivacy/libricochet-go/log" + + "path" +) + +type applicationClient struct { + applicationBridge + applicationPeers + + appBus event.Manager +} + +// NewAppClient returns an Application that acts as a client to a AppService, connected by the IPCBridge supplied +func NewAppClient(acn connectivity.ACN, appDirectory string, bridge event.IPCBridge) Application { + appClient := &applicationClient{applicationPeers: applicationPeers{peers: make(map[string]peer.CwtchPeer)}, applicationBridge: applicationBridge{applicationCore: *newAppCore(acn, appDirectory), bridge: bridge}, appBus: event.NewEventManager()} + appClient.handle = appClient.handleEvent + + go appClient.listen() + + return appClient +} + +// GetPrimaryBus returns the bus the Application uses for events that aren't peer specific +func (ac *applicationClient) GetPrimaryBus() event.Manager { + return ac.appBus +} + +func (ac *applicationClient) handleEvent(ev *event.Event) { + switch ev.EventType { + case event.NewPeer: + localID := ev.Data[event.Identity] + password := ev.Data[event.Password] + ac.newPeer(localID, password) + case event.PeerError: + ac.appBus.Publish(*ev) + case event.AppError: + ac.appBus.Publish(*ev) + } +} + +func (ac *applicationClient) newPeer(localID, password string) { + profile, err := storage.ReadProfile(path.Join(ac.directory, "profiles", localID), password) + if err != nil { + log.Errorf("Could not read profile for NewPeer event: %v\n", err) + ac.appBus.Publish(event.NewEventList(event.PeerError, event.Error, fmt.Sprintf("Could not read profile for NewPeer event: %v\n", err))) + return + } + + _, exists := ac.peers[profile.Onion] + if exists { + log.Errorf("profile for onion %v already exists", profile.Onion) + ac.appBus.Publish(event.NewEventList(event.PeerError, event.Error, fmt.Sprintf("profile for onion %v already exists", profile.Onion))) + return + } + + eventBus := event.NewIPCEventManager(ac.bridge, profile.Onion) + peer := peer.FromProfile(profile) + peer.Init(ac.acn, eventBus) + + ac.mutex.Lock() + defer ac.mutex.Unlock() + ac.peers[profile.Onion] = peer + ac.eventBuses[profile.Onion] = eventBus + ac.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion})) +} + +// CreatePeer messages the service to create a new Peer with the given name +func (ac *applicationClient) CreatePeer(name string, password string) { + log.Infof("appClient CreatePeer %v\n", name) + message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.CreatePeer, map[event.Field]string{event.ProfileName: name, event.Password: password})} + ac.bridge.Write(&message) +} + +// LoadProfiles messages the service to load any profiles for the given password +func (ac *applicationClient) LoadProfiles(password string) { + message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.LoadProfiles, map[event.Field]string{event.Password: password})} + ac.bridge.Write(&message) +} + +// ShutdownPeer shuts down a peer and removes it from the app's management +func (ac *applicationClient) ShutdownPeer(onion string) { + ac.mutex.Lock() + defer ac.mutex.Unlock() + ac.eventBuses[onion].Shutdown() + delete(ac.eventBuses, onion) + ac.peers[onion].Shutdown() + delete(ac.peers, onion) + message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.ShutdownPeer, map[event.Field]string{event.Identity: onion})} + ac.bridge.Write(&message) +} + +// Shutdown shuts down the application lcienr and all front end peer components +func (ac *applicationClient) Shutdown() { + for id := range ac.peers { + ac.ShutdownPeer(id) + } + ac.applicationBridge.Shutdown() + ac.appBus.Shutdown() +} diff --git a/app/appService.go b/app/appService.go new file mode 100644 index 0000000..78a7594 --- /dev/null +++ b/app/appService.go @@ -0,0 +1,110 @@ +package app + +import ( + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/model" + "cwtch.im/cwtch/protocol/connections" + "cwtch.im/cwtch/storage" + "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" + "git.openprivacy.ca/openprivacy/libricochet-go/identity" + "git.openprivacy.ca/openprivacy/libricochet-go/log" + "path" +) + +const ( + // DestApp should be used as a destination for IPC messages that are for the application itself an not a peer + DestApp = "app" +) + +type applicationService struct { + applicationBridge + + storage map[string]storage.ProfileStore + engines map[string]connections.Engine +} + +// ApplicationService is the back end of an application that manages engines and writing storage and communicates to an ApplicationClient by an IPCBridge +type ApplicationService interface { + Shutdown() +} + +// NewAppService returns an ApplicationService that runs the backend of an app and communicates with a client by the supplied IPCBridge +func NewAppService(acn connectivity.ACN, appDirectory string, bridge event.IPCBridge) ApplicationService { + appService := &applicationService{storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationBridge: applicationBridge{applicationCore: *newAppCore(acn, appDirectory), bridge: bridge}} + appService.handle = appService.handleEvent + + // Set up IPC + + // attach to listener + go appService.listen() + + return appService +} + +func (as *applicationService) handleEvent(ev *event.Event) { + log.Infof("app Service handleEvent %v\n", ev.EventType) + switch ev.EventType { + case event.CreatePeer: + profileName := ev.Data[event.ProfileName] + password := ev.Data[event.Password] + as.createPeer(profileName, password) + case event.LoadProfiles: + password := ev.Data[event.Password] + as.loadProfiles(password) + } +} + +func (as *applicationService) createPeer(name, password string) { + log.Infof("app Service create peer %v %v\n", name, password) + profile, err := as.applicationCore.CreatePeer(name, password) + as.eventBuses[profile.Onion] = event.IPCEventManagerFrom(as.bridge, profile.Onion, as.eventBuses[profile.Onion]) + if err != nil { + log.Errorf("Could not create Peer: %v\n", err) + message := event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.PeerError, event.Error, err.Error())} + as.bridge.Write(&message) + return + } + + profileStore := storage.NewProfileWriterStore(as.eventBuses[profile.Onion], path.Join(as.directory, "profiles", profile.LocalID), password, profile) + + blockedPeers := profile.BlockedPeers() + // TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key. + identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey) + engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, as.acn, as.eventBuses[profile.Onion], blockedPeers) + + as.storage[profile.Onion] = profileStore + as.engines[profile.Onion] = engine + + message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.LocalID, event.Password: password})} + as.bridge.Write(&message) +} + +func (as *applicationService) loadProfiles(password string) { + count := 0 + as.applicationCore.LoadProfiles(password, func(profile *model.Profile, profileStore storage.ProfileStore) { + blockedPeers := profile.BlockedPeers() + identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey) + engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, as.acn, as.eventBuses[profile.Onion], blockedPeers) + as.mutex.Lock() + as.storage[profile.Onion] = profileStore + as.engines[profile.Onion] = engine + as.mutex.Unlock() + message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.LocalID, event.Password: password})} + as.bridge.Write(&message) + count++ + }) + if count == 0 { + message := event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0)} + as.bridge.Write(&message) + + } +} + +// Shutdown shuts down the application Service and all peer related backend parts +func (as *applicationService) Shutdown() { + for id := range as.engines { + as.engines[id].Shutdown() + as.storage[id].Shutdown() + as.eventBuses[id].Shutdown() + } +} diff --git a/app/bots/servermon/main.go b/app/bots/servermon/main.go index e82d2e5..58a47cd 100644 --- a/app/bots/servermon/main.go +++ b/app/bots/servermon/main.go @@ -47,7 +47,7 @@ func main() { } botPeer := peer.NewCwtchPeer("servermon") - botPeer.Init(acn, new(event.Manager)) + botPeer.Init(acn, event.NewEventManager()) fmt.Printf("Connecting to %v...\n", serverAddr) botPeer.JoinServer(serverAddr) diff --git a/app/cli/main.go b/app/cli/main.go index 3fed1d5..d4dc549 100644 --- a/app/cli/main.go +++ b/app/cli/main.go @@ -2,6 +2,7 @@ package main import ( app2 "cwtch.im/cwtch/app" + "cwtch.im/cwtch/event" peer2 "cwtch.im/cwtch/peer" "bytes" @@ -207,6 +208,32 @@ func completer(d prompt.Document) []prompt.Suggest { return s } +func handleAppEvents(em event.Manager) { + queue := event.NewEventQueue(100) + em.Subscribe(event.NewPeer, queue.EventChannel) + em.Subscribe(event.PeerError, queue.EventChannel) + + for { + ev := queue.Next() + switch ev.EventType { + case event.NewPeer: + onion := ev.Data[event.Identity] + p := app.GetPeer(onion) + app.LaunchPeers() + + fmt.Printf("\nLoaded profile %v (%v)\n", p.GetProfile().Name, p.GetProfile().Onion) + suggestions = append(suggestionsBase, suggestionsSelectedProfile...) + + profiles := app.ListPeers() + fmt.Printf("\n%v profiles active now\n", len(profiles)) + fmt.Printf("You should run `select-profile` to use a profile or `list-profiles` to view loaded profiles\n") + case event.PeerError: + err := ev.Data[event.Error] + fmt.Printf("\nError creating profile: %v\n", err) + } + } +} + func main() { cwtch := @@ -260,6 +287,7 @@ func main() { os.Exit(1) } app = app2.NewApp(acn, path.Join(usr.HomeDir, ".cwtch")) + go handleAppEvents(app.GetPrimaryBus()) if err != nil { log.Errorf("Error initializing application: %v", err) os.Exit(1) @@ -340,17 +368,7 @@ func main() { if failcount >= 3 { fmt.Printf("Error creating profile for %v: Your password entries must match!\n", name) } else { - p, err := app.CreatePeer(name, password) - app.LaunchPeers() - if err == nil { - stopGroupFollow() - fmt.Printf("\nNew profile created for %v\n", name) - peer = p - suggestions = append(suggestionsBase, suggestionsSelectedProfile...) - - } else { - fmt.Printf("\nError creating profile for %v: %v\n", name, err) - } + app.CreatePeer(name, password) } } else { fmt.Printf("Error creating New Profile, usage: %s\n", usages[commands[0]]) @@ -364,12 +382,10 @@ func main() { continue } - err = app.LoadProfiles(string(bytePassword)) + app.LoadProfiles(string(bytePassword)) + if err == nil { - app.LaunchPeers() - profiles := app.ListPeers() - fmt.Printf("\n%v profiles active now\n", len(profiles)) - fmt.Printf("You should run `select-profile` to use a profile or `list-profiles` to view loaded profiles\n") + } else { fmt.Printf("\nError loading profiles: %v\n", err) } diff --git a/app/peer/alice/alice.go b/app/peer/alice/alice.go index 9d4998f..348ebb2 100644 --- a/app/peer/alice/alice.go +++ b/app/peer/alice/alice.go @@ -22,8 +22,7 @@ func main() { } // Setup the Event Bus to Listen for Data Packets - eventBus := new(event.Manager) - eventBus.Initialize() + eventBus := event.NewEventManager() queue := event.NewEventQueue(100) eventBus.Subscribe(event.NewMessageFromPeer, queue.EventChannel) diff --git a/app/peer/bob/bob.go b/app/peer/bob/bob.go index f663f95..7df4aef 100644 --- a/app/peer/bob/bob.go +++ b/app/peer/bob/bob.go @@ -22,8 +22,7 @@ func main() { } // Set up the Event Buss and Initialize the Peer - eventBus := new(event.Manager) - eventBus.Initialize() + eventBus := event.NewEventManager() bob := peer.NewCwtchPeer("bob") bob.Init(acn, eventBus) diff --git a/event/common.go b/event/common.go index 7e87e37..55d8331 100644 --- a/event/common.go +++ b/event/common.go @@ -33,10 +33,10 @@ const ( EncryptedGroupMessage = Type("EncryptedGroupMessage") NewMessageFromGroup = Type("NewMessageFromGroup") - // an error was encountered trying to send a particular message to a group + // an error was encountered trying to send a particular Message to a group // attributes: - // GroupServer: The server the message was sent to - // Signature: The signature of the message that failed to send + // GroupServer: The server the Message was sent to + // Signature: The signature of the Message that failed to send // Error: string describing the error SendMessageToGroupError = Type("SendMessageToGroupError") @@ -92,6 +92,29 @@ const ( // GroupServer // ConnectionState ServerStateChange = Type("GroupStateChange") + + /***** Application client / service messages *****/ + + // ProfileName, Password + CreatePeer = Type("CreatePeer") + + // service -> client: Identity(localId), Password + // app -> Identity(onion) + NewPeer = Type("NewPeer") + + // Password + LoadProfiles = Type("LoadProfiles") + + // Identity(onion) + ShutdownPeer = Type("ShutdownPeer") + + Shutdown = Type("Shutdown") + + // Error(err) + PeerError = Type("PeerError") + + // Error(err) + AppError = Type("AppError") ) // Field defines common event attributes @@ -113,6 +136,7 @@ const ( GroupInvite = Field("GroupInvite") ProfileName = Field("ProfileName") + Password = Field("Password") ConnectionState = Field("ConnectionState") @@ -121,3 +145,8 @@ const ( Error = Field("Error") ) + +// Defining Common errors +const ( + AppErrLoaded0 = "Loaded 0 profiles" +) diff --git a/event/eventmanager.go b/event/eventmanager.go index 3334226..c62d837 100644 --- a/event/eventmanager.go +++ b/event/eventmanager.go @@ -18,8 +18,21 @@ func NewEvent(eventType Type, data map[Field]string) Event { return Event{EventType: eventType, EventID: utils.GetRandNumber().String(), Data: data} } +// NewEventList creates a new event object with a unique ID and the given type and data supplied in a list format and composed into a map of Type:string +func NewEventList(eventType Type, args ...interface{}) Event { + data := map[Field]string{} + for i := 0; i < len(args); i += 2 { + key, kok := args[i].(Field) + val, vok := args[i+1].(string) + if kok && vok { + data[key] = val + } + } + return Event{EventType: eventType, EventID: utils.GetRandNumber().String(), Data: data} +} + // Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others. -type Manager struct { +type manager struct { subscribers map[Type][]chan Event events chan Event mapMutex sync.Mutex @@ -27,8 +40,23 @@ type Manager struct { closed bool } +// Manager is an interface for an event bus +type Manager interface { + Subscribe(Type, chan Event) + Publish(Event) + PublishLocal(Event) + Shutdown() +} + +// NewEventManager returns an initialized EventManager +func NewEventManager() Manager { + em := &manager{} + em.initialize() + return em +} + // Initialize sets up the Manager. -func (em *Manager) Initialize() { +func (em *manager) initialize() { em.subscribers = make(map[Type][]chan Event) em.events = make(chan Event) em.internal = make(chan bool) @@ -38,21 +66,26 @@ func (em *Manager) Initialize() { // Subscribe takes an eventType and an Channel and associates them in the eventBus. All future events of that type // will be sent to the eventChannel. -func (em *Manager) Subscribe(eventType Type, eventChannel chan Event) { +func (em *manager) Subscribe(eventType Type, eventChannel chan Event) { em.mapMutex.Lock() defer em.mapMutex.Unlock() em.subscribers[eventType] = append(em.subscribers[eventType], eventChannel) } // Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers -func (em *Manager) Publish(event Event) { +func (em *manager) Publish(event Event) { if event.EventType != "" && em.closed != true { em.events <- event } } +// Publish an event only locally, not going over an IPC bridge if there is one +func (em *manager) PublishLocal(event Event) { + em.Publish(event) +} + // eventBus is an internal function that is used to distribute events to all subscribers -func (em *Manager) eventBus() { +func (em *manager) eventBus() { for { event := <-em.events @@ -83,7 +116,7 @@ func (em *Manager) eventBus() { } // Shutdown triggers, and waits for, the internal eventBus goroutine to finish -func (em *Manager) Shutdown() { +func (em *manager) Shutdown() { em.events <- Event{} em.closed = true // wait for eventBus to finish diff --git a/event/eventmanager_test.go b/event/eventmanager_test.go index 8828272..cf8e2fb 100644 --- a/event/eventmanager_test.go +++ b/event/eventmanager_test.go @@ -8,8 +8,7 @@ import ( // Most basic Manager Test, Initialize, Subscribe, Publish, Receive func TestEventManager(t *testing.T) { - eventManager := new(Manager) - eventManager.Initialize() + eventManager := NewEventManager() // We need to make this buffer at least 1, otherwise we will log an error! testChan := make(chan Event, 1) @@ -28,8 +27,7 @@ func TestEventManager(t *testing.T) { // Most basic Manager Test, Initialize, Subscribe, Publish, Receive func TestEventManagerOverflow(t *testing.T) { - eventManager := new(Manager) - eventManager.Initialize() + eventManager := NewEventManager() // Explicitly setting this to 0 log an error! testChan := make(chan Event) @@ -39,8 +37,7 @@ func TestEventManagerOverflow(t *testing.T) { func TestEventManagerMultiple(t *testing.T) { log.SetLevel(log.LevelDebug) - eventManager := new(Manager) - eventManager.Initialize() + eventManager := NewEventManager() groupEventQueue := NewEventQueue(10) peerEventQueue := NewEventQueue(10) diff --git a/event/eventmanageripc.go b/event/eventmanageripc.go new file mode 100644 index 0000000..dda3589 --- /dev/null +++ b/event/eventmanageripc.go @@ -0,0 +1,38 @@ +package event + +type ipcManager struct { + manager Manager + + onion string + ipcBridge IPCBridge +} + +// NewIPCEventManager returns an EvenetManager that also pipes events over and supplied IPCBridge +func NewIPCEventManager(bridge IPCBridge, onion string) Manager { + em := &ipcManager{onion: onion, ipcBridge: bridge, manager: NewEventManager()} + return em +} + +// IPCEventManagerFrom returns an IPCEventManger from the supplied manager and IPCBridge +func IPCEventManagerFrom(bridge IPCBridge, onion string, manager Manager) Manager { + em := &ipcManager{onion: onion, ipcBridge: bridge, manager: manager} + return em +} + +func (ipcm *ipcManager) Publish(ev Event) { + ipcm.manager.Publish(ev) + message := &IPCMessage{Dest: ipcm.onion, Message: ev} + ipcm.ipcBridge.Write(message) +} + +func (ipcm *ipcManager) PublishLocal(ev Event) { + ipcm.manager.Publish(ev) +} + +func (ipcm *ipcManager) Subscribe(eventType Type, eventChan chan Event) { + ipcm.manager.Subscribe(eventType, eventChan) +} + +func (ipcm *ipcManager) Shutdown() { + ipcm.manager.Shutdown() +} diff --git a/event/ipc.go b/event/ipc.go new file mode 100644 index 0000000..9357c05 --- /dev/null +++ b/event/ipc.go @@ -0,0 +1,71 @@ +package event + +import ( + "git.openprivacy.ca/openprivacy/libricochet-go/log" + "sync" +) + +// IPCMessage is a wrapper for a regular eventMessage with a destination (onion|AppDest) so the other side of the bridge can route appropriately +type IPCMessage struct { + Dest string + Message Event +} + +type pipeBridge struct { + in chan IPCMessage + out chan IPCMessage + closedChan chan bool + closed bool + lock sync.Mutex +} + +// IPCBridge is an interface to a IPC construct used to communicate IPCMessages +type IPCBridge interface { + Read() (IPCMessage, bool) + Write(message *IPCMessage) + Shutdown() +} + +// MakePipeBridge returns a simple testing IPCBridge made from inprocess go channels +func MakePipeBridge() (b1, b2 IPCBridge) { + chan1 := make(chan IPCMessage) + chan2 := make(chan IPCMessage) + closed := make(chan bool) + + a := &pipeBridge{in: chan1, out: chan2, closedChan: closed, closed: false} + b := &pipeBridge{in: chan2, out: chan1, closedChan: closed, closed: false} + + go monitor(a, b) + + return a, b +} + +func monitor(a, b *pipeBridge) { + <-a.closedChan + a.closed = true + b.closed = true + a.closedChan <- true +} + +func (pb *pipeBridge) Read() (message IPCMessage, ok bool) { + message, ok = <-pb.in + return +} + +func (pb *pipeBridge) Write(message *IPCMessage) { + pb.lock.Lock() + defer pb.lock.Unlock() + log.Infof("pb.Write: %v\n", message) + if !pb.closed { + pb.out <- *message + } +} + +func (pb *pipeBridge) Shutdown() { + if !pb.closed { + close(pb.in) + close(pb.out) + pb.closedChan <- true + <-pb.closedChan + } +} diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 05475d5..8a83dc3 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -25,13 +25,13 @@ type cwtchPeer struct { started bool queue *event.Queue - eventBus *event.Manager + eventBus event.Manager } // CwtchPeer provides us with a way of testing systems built on top of cwtch without having to // directly implement a cwtchPeer. type CwtchPeer interface { - Init(connectivity.ACN, *event.Manager) + Init(connectivity.ACN, event.Manager) PeerWithOnion(string) *connections.PeerPeerConnection InviteOnionToGroup(string, string) error SendMessageToPeer(string, string) string @@ -83,7 +83,7 @@ func FromProfile(profile *model.Profile) CwtchPeer { } // Init instantiates a cwtchPeer -func (cp *cwtchPeer) Init(acn connectivity.ACN, eventBus *event.Manager) { +func (cp *cwtchPeer) Init(acn connectivity.ACN, eventBus event.Manager) { cp.queue = event.NewEventQueue(100) go cp.eventHandler() diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 62728fe..02695c4 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -34,7 +34,7 @@ type engine struct { blocked sync.Map // Pointer to the Global Event Manager - eventManager *event.Manager + eventManager event.Manager // Required for listen(), inaccessible from identity privateKey ed25519.PrivateKey @@ -45,7 +45,7 @@ type engine struct { type Engine interface { Identity() identity.Identity ACN() connectivity.ACN - EventManager() *event.Manager + EventManager() event.Manager GetPeerHandler(string) *CwtchPeerHandler ContactRequest(string, string) string @@ -57,7 +57,7 @@ type Engine interface { } // NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters -func NewProtocolEngine(identity identity.Identity, privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager *event.Manager, blockedPeers []string) Engine { +func NewProtocolEngine(identity identity.Identity, privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager event.Manager, blockedPeers []string) Engine { engine := new(engine) engine.identity = identity engine.privateKey = privateKey @@ -92,7 +92,7 @@ func (e *engine) Identity() identity.Identity { return e.identity } -func (e *engine) EventManager() *event.Manager { +func (e *engine) EventManager() event.Manager { return e.eventManager } @@ -281,7 +281,7 @@ func (cpi *CwtchPeerInstance) Init(rai *application.Instance, ra *application.Ri // CwtchPeerHandler encapsulates handling of incoming CwtchPackets type CwtchPeerHandler struct { Onion string - EventBus *event.Manager + EventBus event.Manager DataHandler func(string, []byte) []byte } diff --git a/protocol/connections/peerpeerconnection_test.go b/protocol/connections/peerpeerconnection_test.go index 735d0f9..8cd583f 100644 --- a/protocol/connections/peerpeerconnection_test.go +++ b/protocol/connections/peerpeerconnection_test.go @@ -61,8 +61,7 @@ func TestPeerPeerConnection(t *testing.T) { profile := model.GenerateNewProfile("alice") hostname := identity.Hostname() - manager := &event.Manager{} - manager.Initialize() + manager := event.NewEventManager() engine := NewProtocolEngine(identity, priv, connectivity.LocalProvider(), manager, nil) ppc := NewPeerPeerConnection("127.0.0.1:5452|"+hostname, engine) diff --git a/protocol/connections/peerserverconnection_test.go b/protocol/connections/peerserverconnection_test.go index 13b9fe6..cb0fb6c 100644 --- a/protocol/connections/peerserverconnection_test.go +++ b/protocol/connections/peerserverconnection_test.go @@ -76,8 +76,7 @@ func TestPeerServerConnection(t *testing.T) { <-listenChan onionAddr := identity.Hostname() - manager := &event.Manager{} - manager.Initialize() + manager := event.NewEventManager() engine := NewProtocolEngine(identity, priv, connectivity.LocalProvider(), manager, nil) psc := NewPeerServerConnection(engine, "127.0.0.1:5451|"+onionAddr) diff --git a/storage/profile_store.go b/storage/profile_store.go index 0247be1..7d9fe05 100644 --- a/storage/profile_store.go +++ b/storage/profile_store.go @@ -19,7 +19,7 @@ type profileStore struct { directory string password string profile *model.Profile - eventManager *event.Manager + eventManager event.Manager queue *event.Queue writer bool } @@ -33,7 +33,7 @@ type ProfileStore interface { // NewProfileWriterStore returns a profile store backed by a filestore listening for events and saving them // directory should be $appDir/profiles/$rand -func NewProfileWriterStore(eventManager *event.Manager, directory, password string, profile *model.Profile) ProfileStore { +func NewProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) ProfileStore { os.Mkdir(directory, 0700) ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true} ps.queue = event.NewEventQueue(100) @@ -56,13 +56,20 @@ func NewProfileWriterStore(eventManager *event.Manager, directory, password stri return ps } -// NewProfileReaderStore returns a profile store backed by a filestore +// ReadProfile reads a profile from storqage and returns the profile // directory should be $appDir/profiles/$rand -func NewProfileReaderStore(directory, password string, profile *model.Profile) ProfileStore { +func ReadProfile(directory, password string) (*model.Profile, error) { os.Mkdir(directory, 0700) - ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true} + ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: nil, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true} - return ps + err := ps.Load() + if err != nil { + return nil, err + } + + profile := ps.GetProfileCopy() + + return profile, nil } // NewProfile creates a new profile for use in the profile store. diff --git a/storage/profile_store_test.go b/storage/profile_store_test.go index 3466b8b..dbb6479 100644 --- a/storage/profile_store_test.go +++ b/storage/profile_store_test.go @@ -17,8 +17,7 @@ const testMessage = "Hello from storage" func TestProfileStoreWriteRead(t *testing.T) { os.RemoveAll(testingDir) - eventBus := new(event.Manager) - eventBus.Initialize() + eventBus := event.NewEventManager() profile := NewProfile(testProfileName) ps1 := NewProfileWriterStore(eventBus, testingDir, password, profile) diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index bdcbe44..2e9c841 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -2,6 +2,7 @@ package testing import ( app2 "cwtch.im/cwtch/app" + "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" "cwtch.im/cwtch/peer" "cwtch.im/cwtch/protocol/connections" @@ -100,11 +101,23 @@ func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.Cw return } +func waitGetPeer(app app2.Application, name string) peer.CwtchPeer { + for true { + for id, n := range app.ListPeers() { + if n == name { + return app.GetPeer(id) + } + } + time.Sleep(100 * time.Millisecond) + } + return nil +} + func TestCwtchPeerIntegration(t *testing.T) { - // Hide logging "noise" numGoRoutinesStart := runtime.NumGoroutine() log.AddEverythingFromPattern("connectivity") + //log.SetLevel(log.LevelDebug) acn, err := connectivity.StartTor(".", "") if err != nil { t.Fatalf("Could not start Tor: %v", err) @@ -137,29 +150,36 @@ func TestCwtchPeerIntegration(t *testing.T) { app := app2.NewApp(acn, "./storage") + bridge1, bridge2 := event.MakePipeBridge() + appClient := app2.NewAppClient(acn, "./storage", bridge1) + appService := app2.NewAppService(acn, "./storage", bridge2) + + numGoRoutinesPostAppStart := runtime.NumGoroutine() + // ***** cwtchPeer setup ***** - // It's important that each Peer have their own EventBus - /*aliceEventBus := new(event.Manager) - aliceEventBus.Initialize() - bobEventBus := new(event.Manager) - bobEventBus.Initialize() - carolEventBus := new(event.Manager) - carolEventBus.Initialize()*/ - fmt.Println("Creating Alice...") - alice, _ := app.CreatePeer("alice", "asdfasdf") - fmt.Println("Alice created:", alice.GetProfile().Onion) + app.CreatePeer("alice", "asdfasdf") fmt.Println("Creating Bob...") - bob, _ := app.CreatePeer("bob", "asdfasdf") - fmt.Println("Bob created:", bob.GetProfile().Onion) + app.CreatePeer("bob", "asdfasdf") fmt.Println("Creating Carol...") - carol, _ := app.CreatePeer("Carol", "asdfasdf") + appClient.CreatePeer("carol", "asdfasdf") + + alice := waitGetPeer(app, "alice") + fmt.Println("Alice created:", alice.GetProfile().Onion) + + bob := waitGetPeer(app, "bob") + fmt.Println("Bob created:", bob.GetProfile().Onion) + + carol := waitGetPeer(appClient, "carol") fmt.Println("Carol created:", carol.GetProfile().Onion) + //fmt.Println("Carol created:", carol.GetProfile().Onion) + app.LaunchPeers() + appClient.LaunchPeers() fmt.Println("Waiting for Alice, Bob, and Carol to connect with onion network...") time.Sleep(time.Second * 90) @@ -367,10 +387,19 @@ func TestCwtchPeerIntegration(t *testing.T) { numGoRoutinesPostServerShutdown := runtime.NumGoroutine() fmt.Println("Shutting down Carol...") - app.ShutdownPeer(carol.GetProfile().Onion) + appClient.ShutdownPeer(carol.GetProfile().Onion) time.Sleep(time.Second * 3) numGoRoutinesPostCarol := runtime.NumGoroutine() + fmt.Println("Shutting down apps...") + app.Shutdown() + appClient.Shutdown() + appService.Shutdown() + + bridge1.Shutdown() + bridge2.Shutdown() + numGoRoutinesPostAppShutdown := runtime.NumGoroutine() + fmt.Println("Shutting down ACN...") acn.Close() time.Sleep(time.Second * 2) // Server ^^ has a 5 second loop attempting reconnect before exiting @@ -380,10 +409,10 @@ func TestCwtchPeerIntegration(t *testing.T) { // Very useful if we are leaking any. pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) - fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostServer: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+ - "numGoRoutinesPostAlice: %v\nnumGoRotinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostServerShutdown: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostACN: %v\n", - numGoRoutinesStart, numGoRoutinesPostServer, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect, - numGoRoutinesPostAlice, numGoRotinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostServerShutdown, numGoRoutinesPostCarol, numGoRoutinesPostACN) + fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostServer: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+ + "numGoRoutinesPostAlice: %v\nnumGoRotinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostServerShutdown: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v\nnumGoRoutinesPostACN: %v\n", + numGoRoutinesStart, numGoRoutinesPostServer, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect, + numGoRoutinesPostAlice, numGoRotinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostServerShutdown, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown, numGoRoutinesPostACN) if numGoRoutinesStart != numGoRoutinesPostACN { t.Errorf("Number of GoRoutines at start (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, leak detected!", numGoRoutinesStart, numGoRoutinesPostACN)