App Client/Service: new IPCBridge type and test gochan impl; new IPC using eventManager; new App Client and Service; some app api changes and a few more events (NewPeer) and errors (Loading errors)

This commit is contained in:
Dan Ballard 2019-06-05 13:40:55 -07:00
parent 5429cc6deb
commit 04dd8fa89c
20 changed files with 652 additions and 149 deletions

View File

@ -2,6 +2,7 @@ package app
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/storage"
@ -16,86 +17,114 @@ import (
"sync"
)
type applicationCore struct {
eventBuses map[string]event.Manager
acn connectivity.ACN
directory string
mutex sync.Mutex
}
type applicationPeers struct {
peers map[string]peer.CwtchPeer
}
type application struct {
peers map[string]peer.CwtchPeer
storage map[string]storage.ProfileStore
engines map[string]connections.Engine
eventBuses map[string]*event.Manager
acn connectivity.ACN
directory string
mutex sync.Mutex
primaryonion string
applicationCore
applicationPeers
storage map[string]storage.ProfileStore
engines map[string]connections.Engine
appBus event.Manager
}
// Application is a full cwtch peer application. It allows management, usage and storage of multiple peers
type Application interface {
LoadProfiles(password string) error
CreatePeer(name string, password string) (peer.CwtchPeer, error)
PrimaryIdentity() peer.CwtchPeer
GetPeer(onion string) peer.CwtchPeer
ListPeers() map[string]string
GetEventBus(onion string) *event.Manager
LoadProfiles(password string)
CreatePeer(name string, password string)
LaunchPeers()
GetPrimaryBus() event.Manager
GetEventBus(onion string) event.Manager
ShutdownPeer(string)
Shutdown()
GetPeer(onion string) peer.CwtchPeer
ListPeers() map[string]string
}
// LoadProfileFn is the function signature for a function in an app that loads a profile
type LoadProfileFn func(profile *model.Profile, store storage.ProfileStore)
func newAppCore(acn connectivity.ACN, appDirectory string) *applicationCore {
appCore := &applicationCore{eventBuses: make(map[string]event.Manager), directory: appDirectory, acn: acn}
os.MkdirAll(path.Join(appCore.directory, "profiles"), 0700)
return appCore
}
// NewApp creates a new app with some environment awareness and initializes a Tor Manager
func NewApp(acn connectivity.ACN, appDirectory string) Application {
log.Debugf("NewApp(%v)\n", appDirectory)
app := &application{peers: make(map[string]peer.CwtchPeer), storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), eventBuses: make(map[string]*event.Manager), directory: appDirectory, acn: acn}
os.Mkdir(path.Join(app.directory, "profiles"), 0700)
app := &application{storage: make(map[string]storage.ProfileStore), applicationPeers: applicationPeers{peers: make(map[string]peer.CwtchPeer)}, engines: make(map[string]connections.Engine), applicationCore: *newAppCore(acn, appDirectory), appBus: event.NewEventManager()}
return app
}
// NewProfile creates a new cwtchPeer with a given name.
func (app *application) CreatePeer(name string, password string) (peer.CwtchPeer, error) {
// CreatePeer creates a new Peer with a given name and core required accessories (eventbus)
func (ac *applicationCore) CreatePeer(name string, password string) (*model.Profile, error) {
log.Debugf("CreatePeer(%v)\n", name)
eventBus := new(event.Manager)
eventBus.Initialize()
profile := storage.NewProfile(name)
profileStore := storage.NewProfileWriterStore(eventBus, path.Join(app.directory, "profiles", profile.LocalID), password, profile)
pc := profileStore.GetProfileCopy()
p := peer.FromProfile(pc)
_, exists := app.peers[p.GetProfile().Onion]
ac.mutex.Lock()
defer ac.mutex.Unlock()
_, exists := ac.eventBuses[profile.Onion]
if exists {
profileStore.Shutdown()
eventBus.Shutdown()
return nil, fmt.Errorf("Error: profile for onion %v already exists", p.GetProfile().Onion)
return nil, fmt.Errorf("Error: profile for onion %v already exists", profile.Onion)
}
p.Init(app.acn, eventBus)
eventBus := event.NewEventManager()
ac.eventBuses[profile.Onion] = eventBus
return profile, nil
}
// CreatePeer creates a new Peer with the given name and required accessories (eventbus, storage, protocol engine)
func (app *application) CreatePeer(name string, password string) {
profile, err := app.applicationCore.CreatePeer(name, password)
if err != nil {
app.appBus.Publish(event.NewEventList(event.PeerError, event.Error, err.Error()))
return
}
profileStore := storage.NewProfileWriterStore(app.eventBuses[profile.Onion], path.Join(app.directory, "profiles", profile.LocalID), password, profile)
app.storage[profile.Onion] = profileStore
pc := app.storage[profile.Onion].GetProfileCopy()
peer := peer.FromProfile(pc)
peer.Init(app.acn, app.eventBuses[profile.Onion])
blockedPeers := profile.BlockedPeers()
// TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key.
identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey)
engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, eventBus, blockedPeers)
engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, app.eventBuses[profile.Onion], blockedPeers)
app.mutex.Lock()
app.peers[p.GetProfile().Onion] = p
app.storage[p.GetProfile().Onion] = profileStore
app.engines[p.GetProfile().Onion] = engine
app.eventBuses[p.GetProfile().Onion] = eventBus
app.mutex.Unlock()
app.peers[profile.Onion] = peer
app.engines[profile.Onion] = engine
return p, nil
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion}))
}
func (app *application) LoadProfiles(password string) error {
files, err := ioutil.ReadDir(path.Join(app.directory, "profiles"))
// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
func (ac *applicationCore) LoadProfiles(password string, loadProfileFn LoadProfileFn) error {
files, err := ioutil.ReadDir(path.Join(ac.directory, "profiles"))
if err != nil {
return fmt.Errorf("Error: cannot read profiles directory: %v", err)
}
for _, file := range files {
eventBus := new(event.Manager)
eventBus.Initialize()
profileStore := storage.NewProfileWriterStore(eventBus, path.Join(app.directory, "profiles", file.Name()), password, nil)
eventBus := event.NewEventManager()
profileStore := storage.NewProfileWriterStore(eventBus, path.Join(ac.directory, "profiles", file.Name()), password, nil)
err = profileStore.Load()
if err != nil {
continue
@ -103,7 +132,7 @@ func (app *application) LoadProfiles(password string) error {
profile := profileStore.GetProfileCopy()
_, exists := app.peers[profile.Onion]
_, exists := ac.eventBuses[profile.Onion]
if exists {
profileStore.Shutdown()
eventBus.Shutdown()
@ -111,29 +140,41 @@ func (app *application) LoadProfiles(password string) error {
continue
}
peer := peer.FromProfile(profile)
peer.Init(app.acn, eventBus)
ac.mutex.Lock()
ac.eventBuses[profile.Onion] = eventBus
ac.mutex.Unlock()
blockedPeers := profile.BlockedPeers()
identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey)
engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, eventBus, blockedPeers)
app.mutex.Lock()
app.peers[profile.Onion] = peer
app.storage[profile.Onion] = profileStore
app.engines[profile.Onion] = engine
app.eventBuses[profile.Onion] = eventBus
if app.primaryonion == "" {
app.primaryonion = profile.Onion
}
app.mutex.Unlock()
loadProfileFn(profile, profileStore)
}
return nil
}
// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
func (app *application) LoadProfiles(password string) {
app.applicationCore.LoadProfiles(password, func(profile *model.Profile, profileStore storage.ProfileStore) {
peer := peer.FromProfile(profile)
peer.Init(app.acn, app.eventBuses[profile.Onion])
blockedPeers := profile.BlockedPeers()
identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey)
engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, app.eventBuses[profile.Onion], blockedPeers)
app.mutex.Lock()
app.peers[profile.Onion] = peer
app.storage[profile.Onion] = profileStore
app.engines[profile.Onion] = engine
app.mutex.Unlock()
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion}))
})
}
// GetPrimaryBus returns the bus the Application uses for events that aren't peer specific
func (app *application) GetPrimaryBus() event.Manager {
return app.appBus
}
// LaunchPeers starts each peer Listening and connecting to peers and groups
func (app *application) LaunchPeers() {
for _, p := range app.peers {
func (appPeers *applicationPeers) LaunchPeers() {
for _, p := range appPeers.peers {
if !p.IsStarted() {
p.Listen()
p.StartPeersConnections()
@ -143,30 +184,25 @@ func (app *application) LaunchPeers() {
}
// ListPeers returns a map of onions to their profile's Name
func (app *application) ListPeers() map[string]string {
func (appPeers *applicationPeers) ListPeers() map[string]string {
keys := map[string]string{}
for k, p := range app.peers {
for k, p := range appPeers.peers {
keys[k] = p.GetProfile().Name
}
return keys
}
// PrimaryIdentity returns a cwtchPeer for a given onion address
func (app *application) PrimaryIdentity() peer.CwtchPeer {
return app.peers[app.primaryonion]
}
// GetPeer returns a cwtchPeer for a given onion address
func (app *application) GetPeer(onion string) peer.CwtchPeer {
if peer, ok := app.peers[onion]; ok {
func (appPeers *applicationPeers) GetPeer(onion string) peer.CwtchPeer {
if peer, ok := appPeers.peers[onion]; ok {
return peer
}
return nil
}
// GetEventBus returns a cwtchPeer's event bus
func (app *application) GetEventBus(onion string) *event.Manager {
if manager, ok := app.eventBuses[onion]; ok {
func (ac *applicationCore) GetEventBus(onion string) event.Manager {
if manager, ok := ac.eventBuses[onion]; ok {
return manager
}
return nil
@ -194,4 +230,5 @@ func (app *application) Shutdown() {
app.storage[id].Shutdown()
app.eventBuses[id].Shutdown()
}
app.appBus.Shutdown()
}

34
app/appBridge.go Normal file
View File

@ -0,0 +1,34 @@
package app
import "cwtch.im/cwtch/event"
import "git.openprivacy.ca/openprivacy/libricochet-go/log"
type applicationBridge struct {
applicationCore
bridge event.IPCBridge
handle func(*event.Event)
}
func (ab *applicationBridge) listen() {
log.Infoln("ab.listen()")
for {
ipcMessage, ok := ab.bridge.Read()
log.Infof("listen() got %v\n", ipcMessage)
if !ok {
return
}
if ipcMessage.Dest == DestApp {
ab.handle(&ipcMessage.Message)
} else {
if eventBus, exists := ab.eventBuses[ipcMessage.Dest]; exists {
eventBus.PublishLocal(ipcMessage.Message)
}
}
}
}
func (ab *applicationBridge) Shutdown() {
ab.bridge.Shutdown()
}

107
app/appClient.go Normal file
View File

@ -0,0 +1,107 @@
package app
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/storage"
"fmt"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"path"
)
type applicationClient struct {
applicationBridge
applicationPeers
appBus event.Manager
}
// NewAppClient returns an Application that acts as a client to a AppService, connected by the IPCBridge supplied
func NewAppClient(acn connectivity.ACN, appDirectory string, bridge event.IPCBridge) Application {
appClient := &applicationClient{applicationPeers: applicationPeers{peers: make(map[string]peer.CwtchPeer)}, applicationBridge: applicationBridge{applicationCore: *newAppCore(acn, appDirectory), bridge: bridge}, appBus: event.NewEventManager()}
appClient.handle = appClient.handleEvent
go appClient.listen()
return appClient
}
// GetPrimaryBus returns the bus the Application uses for events that aren't peer specific
func (ac *applicationClient) GetPrimaryBus() event.Manager {
return ac.appBus
}
func (ac *applicationClient) handleEvent(ev *event.Event) {
switch ev.EventType {
case event.NewPeer:
localID := ev.Data[event.Identity]
password := ev.Data[event.Password]
ac.newPeer(localID, password)
case event.PeerError:
ac.appBus.Publish(*ev)
case event.AppError:
ac.appBus.Publish(*ev)
}
}
func (ac *applicationClient) newPeer(localID, password string) {
profile, err := storage.ReadProfile(path.Join(ac.directory, "profiles", localID), password)
if err != nil {
log.Errorf("Could not read profile for NewPeer event: %v\n", err)
ac.appBus.Publish(event.NewEventList(event.PeerError, event.Error, fmt.Sprintf("Could not read profile for NewPeer event: %v\n", err)))
return
}
_, exists := ac.peers[profile.Onion]
if exists {
log.Errorf("profile for onion %v already exists", profile.Onion)
ac.appBus.Publish(event.NewEventList(event.PeerError, event.Error, fmt.Sprintf("profile for onion %v already exists", profile.Onion)))
return
}
eventBus := event.NewIPCEventManager(ac.bridge, profile.Onion)
peer := peer.FromProfile(profile)
peer.Init(ac.acn, eventBus)
ac.mutex.Lock()
defer ac.mutex.Unlock()
ac.peers[profile.Onion] = peer
ac.eventBuses[profile.Onion] = eventBus
ac.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion}))
}
// CreatePeer messages the service to create a new Peer with the given name
func (ac *applicationClient) CreatePeer(name string, password string) {
log.Infof("appClient CreatePeer %v\n", name)
message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.CreatePeer, map[event.Field]string{event.ProfileName: name, event.Password: password})}
ac.bridge.Write(&message)
}
// LoadProfiles messages the service to load any profiles for the given password
func (ac *applicationClient) LoadProfiles(password string) {
message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.LoadProfiles, map[event.Field]string{event.Password: password})}
ac.bridge.Write(&message)
}
// ShutdownPeer shuts down a peer and removes it from the app's management
func (ac *applicationClient) ShutdownPeer(onion string) {
ac.mutex.Lock()
defer ac.mutex.Unlock()
ac.eventBuses[onion].Shutdown()
delete(ac.eventBuses, onion)
ac.peers[onion].Shutdown()
delete(ac.peers, onion)
message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.ShutdownPeer, map[event.Field]string{event.Identity: onion})}
ac.bridge.Write(&message)
}
// Shutdown shuts down the application lcienr and all front end peer components
func (ac *applicationClient) Shutdown() {
for id := range ac.peers {
ac.ShutdownPeer(id)
}
ac.applicationBridge.Shutdown()
ac.appBus.Shutdown()
}

110
app/appService.go Normal file
View File

@ -0,0 +1,110 @@
package app
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/storage"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"path"
)
const (
// DestApp should be used as a destination for IPC messages that are for the application itself an not a peer
DestApp = "app"
)
type applicationService struct {
applicationBridge
storage map[string]storage.ProfileStore
engines map[string]connections.Engine
}
// ApplicationService is the back end of an application that manages engines and writing storage and communicates to an ApplicationClient by an IPCBridge
type ApplicationService interface {
Shutdown()
}
// NewAppService returns an ApplicationService that runs the backend of an app and communicates with a client by the supplied IPCBridge
func NewAppService(acn connectivity.ACN, appDirectory string, bridge event.IPCBridge) ApplicationService {
appService := &applicationService{storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationBridge: applicationBridge{applicationCore: *newAppCore(acn, appDirectory), bridge: bridge}}
appService.handle = appService.handleEvent
// Set up IPC
// attach to listener
go appService.listen()
return appService
}
func (as *applicationService) handleEvent(ev *event.Event) {
log.Infof("app Service handleEvent %v\n", ev.EventType)
switch ev.EventType {
case event.CreatePeer:
profileName := ev.Data[event.ProfileName]
password := ev.Data[event.Password]
as.createPeer(profileName, password)
case event.LoadProfiles:
password := ev.Data[event.Password]
as.loadProfiles(password)
}
}
func (as *applicationService) createPeer(name, password string) {
log.Infof("app Service create peer %v %v\n", name, password)
profile, err := as.applicationCore.CreatePeer(name, password)
as.eventBuses[profile.Onion] = event.IPCEventManagerFrom(as.bridge, profile.Onion, as.eventBuses[profile.Onion])
if err != nil {
log.Errorf("Could not create Peer: %v\n", err)
message := event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.PeerError, event.Error, err.Error())}
as.bridge.Write(&message)
return
}
profileStore := storage.NewProfileWriterStore(as.eventBuses[profile.Onion], path.Join(as.directory, "profiles", profile.LocalID), password, profile)
blockedPeers := profile.BlockedPeers()
// TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key.
identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey)
engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, as.acn, as.eventBuses[profile.Onion], blockedPeers)
as.storage[profile.Onion] = profileStore
as.engines[profile.Onion] = engine
message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.LocalID, event.Password: password})}
as.bridge.Write(&message)
}
func (as *applicationService) loadProfiles(password string) {
count := 0
as.applicationCore.LoadProfiles(password, func(profile *model.Profile, profileStore storage.ProfileStore) {
blockedPeers := profile.BlockedPeers()
identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey)
engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, as.acn, as.eventBuses[profile.Onion], blockedPeers)
as.mutex.Lock()
as.storage[profile.Onion] = profileStore
as.engines[profile.Onion] = engine
as.mutex.Unlock()
message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.LocalID, event.Password: password})}
as.bridge.Write(&message)
count++
})
if count == 0 {
message := event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0)}
as.bridge.Write(&message)
}
}
// Shutdown shuts down the application Service and all peer related backend parts
func (as *applicationService) Shutdown() {
for id := range as.engines {
as.engines[id].Shutdown()
as.storage[id].Shutdown()
as.eventBuses[id].Shutdown()
}
}

View File

@ -47,7 +47,7 @@ func main() {
}
botPeer := peer.NewCwtchPeer("servermon")
botPeer.Init(acn, new(event.Manager))
botPeer.Init(acn, event.NewEventManager())
fmt.Printf("Connecting to %v...\n", serverAddr)
botPeer.JoinServer(serverAddr)

View File

@ -2,6 +2,7 @@ package main
import (
app2 "cwtch.im/cwtch/app"
"cwtch.im/cwtch/event"
peer2 "cwtch.im/cwtch/peer"
"bytes"
@ -207,6 +208,32 @@ func completer(d prompt.Document) []prompt.Suggest {
return s
}
func handleAppEvents(em event.Manager) {
queue := event.NewEventQueue(100)
em.Subscribe(event.NewPeer, queue.EventChannel)
em.Subscribe(event.PeerError, queue.EventChannel)
for {
ev := queue.Next()
switch ev.EventType {
case event.NewPeer:
onion := ev.Data[event.Identity]
p := app.GetPeer(onion)
app.LaunchPeers()
fmt.Printf("\nLoaded profile %v (%v)\n", p.GetProfile().Name, p.GetProfile().Onion)
suggestions = append(suggestionsBase, suggestionsSelectedProfile...)
profiles := app.ListPeers()
fmt.Printf("\n%v profiles active now\n", len(profiles))
fmt.Printf("You should run `select-profile` to use a profile or `list-profiles` to view loaded profiles\n")
case event.PeerError:
err := ev.Data[event.Error]
fmt.Printf("\nError creating profile: %v\n", err)
}
}
}
func main() {
cwtch :=
@ -260,6 +287,7 @@ func main() {
os.Exit(1)
}
app = app2.NewApp(acn, path.Join(usr.HomeDir, ".cwtch"))
go handleAppEvents(app.GetPrimaryBus())
if err != nil {
log.Errorf("Error initializing application: %v", err)
os.Exit(1)
@ -340,17 +368,7 @@ func main() {
if failcount >= 3 {
fmt.Printf("Error creating profile for %v: Your password entries must match!\n", name)
} else {
p, err := app.CreatePeer(name, password)
app.LaunchPeers()
if err == nil {
stopGroupFollow()
fmt.Printf("\nNew profile created for %v\n", name)
peer = p
suggestions = append(suggestionsBase, suggestionsSelectedProfile...)
} else {
fmt.Printf("\nError creating profile for %v: %v\n", name, err)
}
app.CreatePeer(name, password)
}
} else {
fmt.Printf("Error creating New Profile, usage: %s\n", usages[commands[0]])
@ -364,12 +382,10 @@ func main() {
continue
}
err = app.LoadProfiles(string(bytePassword))
app.LoadProfiles(string(bytePassword))
if err == nil {
app.LaunchPeers()
profiles := app.ListPeers()
fmt.Printf("\n%v profiles active now\n", len(profiles))
fmt.Printf("You should run `select-profile` to use a profile or `list-profiles` to view loaded profiles\n")
} else {
fmt.Printf("\nError loading profiles: %v\n", err)
}

View File

@ -22,8 +22,7 @@ func main() {
}
// Setup the Event Bus to Listen for Data Packets
eventBus := new(event.Manager)
eventBus.Initialize()
eventBus := event.NewEventManager()
queue := event.NewEventQueue(100)
eventBus.Subscribe(event.NewMessageFromPeer, queue.EventChannel)

View File

@ -22,8 +22,7 @@ func main() {
}
// Set up the Event Buss and Initialize the Peer
eventBus := new(event.Manager)
eventBus.Initialize()
eventBus := event.NewEventManager()
bob := peer.NewCwtchPeer("bob")
bob.Init(acn, eventBus)

View File

@ -33,10 +33,10 @@ const (
EncryptedGroupMessage = Type("EncryptedGroupMessage")
NewMessageFromGroup = Type("NewMessageFromGroup")
// an error was encountered trying to send a particular message to a group
// an error was encountered trying to send a particular Message to a group
// attributes:
// GroupServer: The server the message was sent to
// Signature: The signature of the message that failed to send
// GroupServer: The server the Message was sent to
// Signature: The signature of the Message that failed to send
// Error: string describing the error
SendMessageToGroupError = Type("SendMessageToGroupError")
@ -92,6 +92,29 @@ const (
// GroupServer
// ConnectionState
ServerStateChange = Type("GroupStateChange")
/***** Application client / service messages *****/
// ProfileName, Password
CreatePeer = Type("CreatePeer")
// service -> client: Identity(localId), Password
// app -> Identity(onion)
NewPeer = Type("NewPeer")
// Password
LoadProfiles = Type("LoadProfiles")
// Identity(onion)
ShutdownPeer = Type("ShutdownPeer")
Shutdown = Type("Shutdown")
// Error(err)
PeerError = Type("PeerError")
// Error(err)
AppError = Type("AppError")
)
// Field defines common event attributes
@ -113,6 +136,7 @@ const (
GroupInvite = Field("GroupInvite")
ProfileName = Field("ProfileName")
Password = Field("Password")
ConnectionState = Field("ConnectionState")
@ -121,3 +145,8 @@ const (
Error = Field("Error")
)
// Defining Common errors
const (
AppErrLoaded0 = "Loaded 0 profiles"
)

View File

@ -18,8 +18,21 @@ func NewEvent(eventType Type, data map[Field]string) Event {
return Event{EventType: eventType, EventID: utils.GetRandNumber().String(), Data: data}
}
// NewEventList creates a new event object with a unique ID and the given type and data supplied in a list format and composed into a map of Type:string
func NewEventList(eventType Type, args ...interface{}) Event {
data := map[Field]string{}
for i := 0; i < len(args); i += 2 {
key, kok := args[i].(Field)
val, vok := args[i+1].(string)
if kok && vok {
data[key] = val
}
}
return Event{EventType: eventType, EventID: utils.GetRandNumber().String(), Data: data}
}
// Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others.
type Manager struct {
type manager struct {
subscribers map[Type][]chan Event
events chan Event
mapMutex sync.Mutex
@ -27,8 +40,23 @@ type Manager struct {
closed bool
}
// Manager is an interface for an event bus
type Manager interface {
Subscribe(Type, chan Event)
Publish(Event)
PublishLocal(Event)
Shutdown()
}
// NewEventManager returns an initialized EventManager
func NewEventManager() Manager {
em := &manager{}
em.initialize()
return em
}
// Initialize sets up the Manager.
func (em *Manager) Initialize() {
func (em *manager) initialize() {
em.subscribers = make(map[Type][]chan Event)
em.events = make(chan Event)
em.internal = make(chan bool)
@ -38,21 +66,26 @@ func (em *Manager) Initialize() {
// Subscribe takes an eventType and an Channel and associates them in the eventBus. All future events of that type
// will be sent to the eventChannel.
func (em *Manager) Subscribe(eventType Type, eventChannel chan Event) {
func (em *manager) Subscribe(eventType Type, eventChannel chan Event) {
em.mapMutex.Lock()
defer em.mapMutex.Unlock()
em.subscribers[eventType] = append(em.subscribers[eventType], eventChannel)
}
// Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers
func (em *Manager) Publish(event Event) {
func (em *manager) Publish(event Event) {
if event.EventType != "" && em.closed != true {
em.events <- event
}
}
// Publish an event only locally, not going over an IPC bridge if there is one
func (em *manager) PublishLocal(event Event) {
em.Publish(event)
}
// eventBus is an internal function that is used to distribute events to all subscribers
func (em *Manager) eventBus() {
func (em *manager) eventBus() {
for {
event := <-em.events
@ -83,7 +116,7 @@ func (em *Manager) eventBus() {
}
// Shutdown triggers, and waits for, the internal eventBus goroutine to finish
func (em *Manager) Shutdown() {
func (em *manager) Shutdown() {
em.events <- Event{}
em.closed = true
// wait for eventBus to finish

View File

@ -8,8 +8,7 @@ import (
// Most basic Manager Test, Initialize, Subscribe, Publish, Receive
func TestEventManager(t *testing.T) {
eventManager := new(Manager)
eventManager.Initialize()
eventManager := NewEventManager()
// We need to make this buffer at least 1, otherwise we will log an error!
testChan := make(chan Event, 1)
@ -28,8 +27,7 @@ func TestEventManager(t *testing.T) {
// Most basic Manager Test, Initialize, Subscribe, Publish, Receive
func TestEventManagerOverflow(t *testing.T) {
eventManager := new(Manager)
eventManager.Initialize()
eventManager := NewEventManager()
// Explicitly setting this to 0 log an error!
testChan := make(chan Event)
@ -39,8 +37,7 @@ func TestEventManagerOverflow(t *testing.T) {
func TestEventManagerMultiple(t *testing.T) {
log.SetLevel(log.LevelDebug)
eventManager := new(Manager)
eventManager.Initialize()
eventManager := NewEventManager()
groupEventQueue := NewEventQueue(10)
peerEventQueue := NewEventQueue(10)

38
event/eventmanageripc.go Normal file
View File

@ -0,0 +1,38 @@
package event
type ipcManager struct {
manager Manager
onion string
ipcBridge IPCBridge
}
// NewIPCEventManager returns an EvenetManager that also pipes events over and supplied IPCBridge
func NewIPCEventManager(bridge IPCBridge, onion string) Manager {
em := &ipcManager{onion: onion, ipcBridge: bridge, manager: NewEventManager()}
return em
}
// IPCEventManagerFrom returns an IPCEventManger from the supplied manager and IPCBridge
func IPCEventManagerFrom(bridge IPCBridge, onion string, manager Manager) Manager {
em := &ipcManager{onion: onion, ipcBridge: bridge, manager: manager}
return em
}
func (ipcm *ipcManager) Publish(ev Event) {
ipcm.manager.Publish(ev)
message := &IPCMessage{Dest: ipcm.onion, Message: ev}
ipcm.ipcBridge.Write(message)
}
func (ipcm *ipcManager) PublishLocal(ev Event) {
ipcm.manager.Publish(ev)
}
func (ipcm *ipcManager) Subscribe(eventType Type, eventChan chan Event) {
ipcm.manager.Subscribe(eventType, eventChan)
}
func (ipcm *ipcManager) Shutdown() {
ipcm.manager.Shutdown()
}

71
event/ipc.go Normal file
View File

@ -0,0 +1,71 @@
package event
import (
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"sync"
)
// IPCMessage is a wrapper for a regular eventMessage with a destination (onion|AppDest) so the other side of the bridge can route appropriately
type IPCMessage struct {
Dest string
Message Event
}
type pipeBridge struct {
in chan IPCMessage
out chan IPCMessage
closedChan chan bool
closed bool
lock sync.Mutex
}
// IPCBridge is an interface to a IPC construct used to communicate IPCMessages
type IPCBridge interface {
Read() (IPCMessage, bool)
Write(message *IPCMessage)
Shutdown()
}
// MakePipeBridge returns a simple testing IPCBridge made from inprocess go channels
func MakePipeBridge() (b1, b2 IPCBridge) {
chan1 := make(chan IPCMessage)
chan2 := make(chan IPCMessage)
closed := make(chan bool)
a := &pipeBridge{in: chan1, out: chan2, closedChan: closed, closed: false}
b := &pipeBridge{in: chan2, out: chan1, closedChan: closed, closed: false}
go monitor(a, b)
return a, b
}
func monitor(a, b *pipeBridge) {
<-a.closedChan
a.closed = true
b.closed = true
a.closedChan <- true
}
func (pb *pipeBridge) Read() (message IPCMessage, ok bool) {
message, ok = <-pb.in
return
}
func (pb *pipeBridge) Write(message *IPCMessage) {
pb.lock.Lock()
defer pb.lock.Unlock()
log.Infof("pb.Write: %v\n", message)
if !pb.closed {
pb.out <- *message
}
}
func (pb *pipeBridge) Shutdown() {
if !pb.closed {
close(pb.in)
close(pb.out)
pb.closedChan <- true
<-pb.closedChan
}
}

View File

@ -25,13 +25,13 @@ type cwtchPeer struct {
started bool
queue *event.Queue
eventBus *event.Manager
eventBus event.Manager
}
// CwtchPeer provides us with a way of testing systems built on top of cwtch without having to
// directly implement a cwtchPeer.
type CwtchPeer interface {
Init(connectivity.ACN, *event.Manager)
Init(connectivity.ACN, event.Manager)
PeerWithOnion(string) *connections.PeerPeerConnection
InviteOnionToGroup(string, string) error
SendMessageToPeer(string, string) string
@ -83,7 +83,7 @@ func FromProfile(profile *model.Profile) CwtchPeer {
}
// Init instantiates a cwtchPeer
func (cp *cwtchPeer) Init(acn connectivity.ACN, eventBus *event.Manager) {
func (cp *cwtchPeer) Init(acn connectivity.ACN, eventBus event.Manager) {
cp.queue = event.NewEventQueue(100)
go cp.eventHandler()

View File

@ -34,7 +34,7 @@ type engine struct {
blocked sync.Map
// Pointer to the Global Event Manager
eventManager *event.Manager
eventManager event.Manager
// Required for listen(), inaccessible from identity
privateKey ed25519.PrivateKey
@ -45,7 +45,7 @@ type engine struct {
type Engine interface {
Identity() identity.Identity
ACN() connectivity.ACN
EventManager() *event.Manager
EventManager() event.Manager
GetPeerHandler(string) *CwtchPeerHandler
ContactRequest(string, string) string
@ -57,7 +57,7 @@ type Engine interface {
}
// NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters
func NewProtocolEngine(identity identity.Identity, privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager *event.Manager, blockedPeers []string) Engine {
func NewProtocolEngine(identity identity.Identity, privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager event.Manager, blockedPeers []string) Engine {
engine := new(engine)
engine.identity = identity
engine.privateKey = privateKey
@ -92,7 +92,7 @@ func (e *engine) Identity() identity.Identity {
return e.identity
}
func (e *engine) EventManager() *event.Manager {
func (e *engine) EventManager() event.Manager {
return e.eventManager
}
@ -281,7 +281,7 @@ func (cpi *CwtchPeerInstance) Init(rai *application.Instance, ra *application.Ri
// CwtchPeerHandler encapsulates handling of incoming CwtchPackets
type CwtchPeerHandler struct {
Onion string
EventBus *event.Manager
EventBus event.Manager
DataHandler func(string, []byte) []byte
}

View File

@ -61,8 +61,7 @@ func TestPeerPeerConnection(t *testing.T) {
profile := model.GenerateNewProfile("alice")
hostname := identity.Hostname()
manager := &event.Manager{}
manager.Initialize()
manager := event.NewEventManager()
engine := NewProtocolEngine(identity, priv, connectivity.LocalProvider(), manager, nil)
ppc := NewPeerPeerConnection("127.0.0.1:5452|"+hostname, engine)

View File

@ -76,8 +76,7 @@ func TestPeerServerConnection(t *testing.T) {
<-listenChan
onionAddr := identity.Hostname()
manager := &event.Manager{}
manager.Initialize()
manager := event.NewEventManager()
engine := NewProtocolEngine(identity, priv, connectivity.LocalProvider(), manager, nil)
psc := NewPeerServerConnection(engine, "127.0.0.1:5451|"+onionAddr)

View File

@ -19,7 +19,7 @@ type profileStore struct {
directory string
password string
profile *model.Profile
eventManager *event.Manager
eventManager event.Manager
queue *event.Queue
writer bool
}
@ -33,7 +33,7 @@ type ProfileStore interface {
// NewProfileWriterStore returns a profile store backed by a filestore listening for events and saving them
// directory should be $appDir/profiles/$rand
func NewProfileWriterStore(eventManager *event.Manager, directory, password string, profile *model.Profile) ProfileStore {
func NewProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) ProfileStore {
os.Mkdir(directory, 0700)
ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true}
ps.queue = event.NewEventQueue(100)
@ -56,13 +56,20 @@ func NewProfileWriterStore(eventManager *event.Manager, directory, password stri
return ps
}
// NewProfileReaderStore returns a profile store backed by a filestore
// ReadProfile reads a profile from storqage and returns the profile
// directory should be $appDir/profiles/$rand
func NewProfileReaderStore(directory, password string, profile *model.Profile) ProfileStore {
func ReadProfile(directory, password string) (*model.Profile, error) {
os.Mkdir(directory, 0700)
ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true}
ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: nil, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true}
return ps
err := ps.Load()
if err != nil {
return nil, err
}
profile := ps.GetProfileCopy()
return profile, nil
}
// NewProfile creates a new profile for use in the profile store.

View File

@ -17,8 +17,7 @@ const testMessage = "Hello from storage"
func TestProfileStoreWriteRead(t *testing.T) {
os.RemoveAll(testingDir)
eventBus := new(event.Manager)
eventBus.Initialize()
eventBus := event.NewEventManager()
profile := NewProfile(testProfileName)
ps1 := NewProfileWriterStore(eventBus, testingDir, password, profile)

View File

@ -2,6 +2,7 @@ package testing
import (
app2 "cwtch.im/cwtch/app"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
@ -100,11 +101,23 @@ func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.Cw
return
}
func waitGetPeer(app app2.Application, name string) peer.CwtchPeer {
for true {
for id, n := range app.ListPeers() {
if n == name {
return app.GetPeer(id)
}
}
time.Sleep(100 * time.Millisecond)
}
return nil
}
func TestCwtchPeerIntegration(t *testing.T) {
// Hide logging "noise"
numGoRoutinesStart := runtime.NumGoroutine()
log.AddEverythingFromPattern("connectivity")
//log.SetLevel(log.LevelDebug)
acn, err := connectivity.StartTor(".", "")
if err != nil {
t.Fatalf("Could not start Tor: %v", err)
@ -137,29 +150,36 @@ func TestCwtchPeerIntegration(t *testing.T) {
app := app2.NewApp(acn, "./storage")
bridge1, bridge2 := event.MakePipeBridge()
appClient := app2.NewAppClient(acn, "./storage", bridge1)
appService := app2.NewAppService(acn, "./storage", bridge2)
numGoRoutinesPostAppStart := runtime.NumGoroutine()
// ***** cwtchPeer setup *****
// It's important that each Peer have their own EventBus
/*aliceEventBus := new(event.Manager)
aliceEventBus.Initialize()
bobEventBus := new(event.Manager)
bobEventBus.Initialize()
carolEventBus := new(event.Manager)
carolEventBus.Initialize()*/
fmt.Println("Creating Alice...")
alice, _ := app.CreatePeer("alice", "asdfasdf")
fmt.Println("Alice created:", alice.GetProfile().Onion)
app.CreatePeer("alice", "asdfasdf")
fmt.Println("Creating Bob...")
bob, _ := app.CreatePeer("bob", "asdfasdf")
fmt.Println("Bob created:", bob.GetProfile().Onion)
app.CreatePeer("bob", "asdfasdf")
fmt.Println("Creating Carol...")
carol, _ := app.CreatePeer("Carol", "asdfasdf")
appClient.CreatePeer("carol", "asdfasdf")
alice := waitGetPeer(app, "alice")
fmt.Println("Alice created:", alice.GetProfile().Onion)
bob := waitGetPeer(app, "bob")
fmt.Println("Bob created:", bob.GetProfile().Onion)
carol := waitGetPeer(appClient, "carol")
fmt.Println("Carol created:", carol.GetProfile().Onion)
//fmt.Println("Carol created:", carol.GetProfile().Onion)
app.LaunchPeers()
appClient.LaunchPeers()
fmt.Println("Waiting for Alice, Bob, and Carol to connect with onion network...")
time.Sleep(time.Second * 90)
@ -367,10 +387,19 @@ func TestCwtchPeerIntegration(t *testing.T) {
numGoRoutinesPostServerShutdown := runtime.NumGoroutine()
fmt.Println("Shutting down Carol...")
app.ShutdownPeer(carol.GetProfile().Onion)
appClient.ShutdownPeer(carol.GetProfile().Onion)
time.Sleep(time.Second * 3)
numGoRoutinesPostCarol := runtime.NumGoroutine()
fmt.Println("Shutting down apps...")
app.Shutdown()
appClient.Shutdown()
appService.Shutdown()
bridge1.Shutdown()
bridge2.Shutdown()
numGoRoutinesPostAppShutdown := runtime.NumGoroutine()
fmt.Println("Shutting down ACN...")
acn.Close()
time.Sleep(time.Second * 2) // Server ^^ has a 5 second loop attempting reconnect before exiting
@ -380,10 +409,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
// Very useful if we are leaking any.
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostServer: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+
"numGoRoutinesPostAlice: %v\nnumGoRotinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostServerShutdown: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostACN: %v\n",
numGoRoutinesStart, numGoRoutinesPostServer, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect,
numGoRoutinesPostAlice, numGoRotinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostServerShutdown, numGoRoutinesPostCarol, numGoRoutinesPostACN)
fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostServer: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+
"numGoRoutinesPostAlice: %v\nnumGoRotinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostServerShutdown: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v\nnumGoRoutinesPostACN: %v\n",
numGoRoutinesStart, numGoRoutinesPostServer, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect,
numGoRoutinesPostAlice, numGoRotinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostServerShutdown, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown, numGoRoutinesPostACN)
if numGoRoutinesStart != numGoRoutinesPostACN {
t.Errorf("Number of GoRoutines at start (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, leak detected!", numGoRoutinesStart, numGoRoutinesPostACN)