From e2bba41a9ae79d2754ee00297eb68e27746e3035 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Wed, 3 Nov 2021 11:38:42 -0700 Subject: [PATCH] Remove IPC App Bridge --- app/appBridge.go | 39 -- app/appClient.go | 177 --------- app/appService.go | 209 ---------- event/bridge/goChanBridge.go | 57 --- event/bridge/infinite_chan.go | 72 ---- event/bridge/infinite_queue.go | 105 ------ event/bridge/pipeBridge-windows.go | 19 - event/bridge/pipeBridge.go | 357 ------------------ event/bridge/pipeBridge_test.go | 131 ------- event/eventmanager.go | 7 - event/eventmanageripc.go | 38 -- event/ipc.go | 14 - testing/cwtch_peer_server_integration_test.go | 23 +- 13 files changed, 3 insertions(+), 1245 deletions(-) delete mode 100644 app/appBridge.go delete mode 100644 app/appClient.go delete mode 100644 app/appService.go delete mode 100644 event/bridge/goChanBridge.go delete mode 100644 event/bridge/infinite_chan.go delete mode 100644 event/bridge/infinite_queue.go delete mode 100644 event/bridge/pipeBridge-windows.go delete mode 100644 event/bridge/pipeBridge.go delete mode 100644 event/bridge/pipeBridge_test.go delete mode 100644 event/eventmanageripc.go delete mode 100644 event/ipc.go diff --git a/app/appBridge.go b/app/appBridge.go deleted file mode 100644 index 2e661c6..0000000 --- a/app/appBridge.go +++ /dev/null @@ -1,39 +0,0 @@ -package app - -import "cwtch.im/cwtch/event" -import "git.openprivacy.ca/openprivacy/log" - -const ( - // DestApp should be used as a destination for IPC messages that are for the application itself an not a peer - DestApp = "app" -) - -type applicationBridge struct { - applicationCore - - bridge event.IPCBridge - handle func(*event.Event) -} - -func (ab *applicationBridge) listen() { - log.Infoln("ab.listen()") - for { - ipcMessage, ok := ab.bridge.Read() - log.Debugf("listen() got %v for %v\n", ipcMessage.Message.EventType, ipcMessage.Dest) - if !ok { - log.Debugln("exiting appBridge.listen()") - return - } - - if ipcMessage.Dest == DestApp { - ab.handle(&ipcMessage.Message) - } else { - if eventBus, exists := ab.eventBuses[ipcMessage.Dest]; exists { - eventBus.PublishLocal(ipcMessage.Message) - } - } - } -} - -func (ab *applicationBridge) Shutdown() { -} diff --git a/app/appClient.go b/app/appClient.go deleted file mode 100644 index 9dd5c1b..0000000 --- a/app/appClient.go +++ /dev/null @@ -1,177 +0,0 @@ -package app - -import ( - "cwtch.im/cwtch/app/plugins" - "cwtch.im/cwtch/event" - "cwtch.im/cwtch/peer" - "cwtch.im/cwtch/storage" - "fmt" - "git.openprivacy.ca/openprivacy/log" - "path" - "strconv" - "sync" -) - -type applicationClient struct { - applicationBridge - appletPeers - - appBus event.Manager - acmutex sync.Mutex -} - -// NewAppClient returns an Application that acts as a client to a AppService, connected by the IPCBridge supplied -func NewAppClient(appDirectory string, bridge event.IPCBridge) Application { - appClient := &applicationClient{appletPeers: appletPeers{peers: make(map[string]peer.CwtchPeer)}, applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}, appBus: event.NewEventManager()} - appClient.handle = appClient.handleEvent - - go appClient.listen() - - appClient.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ReloadClient)}) - - log.Infoln("Created new App Client") - return appClient -} - -// GetPrimaryBus returns the bus the Application uses for events that aren't peer specific -func (ac *applicationClient) GetPrimaryBus() event.Manager { - return ac.appBus -} - -func (ac *applicationClient) handleEvent(ev *event.Event) { - switch ev.EventType { - case event.NewPeer: - localID := ev.Data[event.Identity] - key := ev.Data[event.Key] - salt := ev.Data[event.Salt] - reload := ev.Data[event.Status] == event.StorageRunning - created := ev.Data[event.Created] - ac.newPeer(localID, key, salt, reload, created) - case event.PeerDeleted: - onion := ev.Data[event.Identity] - ac.handleDeletedPeer(onion) - case event.PeerError: - ac.appBus.Publish(*ev) - case event.AppError: - ac.appBus.Publish(*ev) - case event.ACNStatus: - ac.appBus.Publish(*ev) - case event.ACNVersion: - ac.appBus.Publish(*ev) - case event.ReloadDone: - ac.appBus.Publish(*ev) - } -} - -func (ac *applicationClient) newPeer(localID, key, salt string, reload bool, created string) { - var keyBytes [32]byte - var saltBytes [128]byte - copy(keyBytes[:], key) - copy(saltBytes[:], salt) - profile, err := storage.ReadProfile(path.Join(ac.directory, "profiles", localID), keyBytes, saltBytes) - if err != nil { - log.Errorf("Could not read profile for NewPeer event: %v\n", err) - ac.appBus.Publish(event.NewEventList(event.PeerError, event.Error, fmt.Sprintf("Could not read profile for NewPeer event: %v\n", err))) - return - } - - _, exists := ac.peers[profile.Onion] - if exists { - log.Errorf("profile for onion %v already exists", profile.Onion) - ac.appBus.Publish(event.NewEventList(event.PeerError, event.Error, fmt.Sprintf("profile for onion %v already exists", profile.Onion))) - return - } - - eventBus := event.NewIPCEventManager(ac.bridge, profile.Onion) - peer := peer.FromProfile(profile) - peer.Init(eventBus) - - ac.peerLock.Lock() - defer ac.peerLock.Unlock() - ac.peers[profile.Onion] = peer - ac.eventBuses[profile.Onion] = eventBus - npEvent := event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion, event.Created: created}) - if reload { - npEvent.Data[event.Status] = event.StorageRunning - } - ac.appBus.Publish(npEvent) - - if reload { - ac.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ReloadPeer, event.Identity, profile.Onion)}) - } -} - -// CreatePeer messages the service to create a new Peer with the given name -func (ac *applicationClient) CreatePeer(name string, password string) { - ac.CreateTaggedPeer(name, password, "") -} - -func (ac *applicationClient) CreateTaggedPeer(name, password, tag string) { - log.Infof("appClient CreatePeer %v\n", name) - message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.CreatePeer, map[event.Field]string{event.ProfileName: name, event.Password: password, event.Data: tag})} - ac.bridge.Write(&message) -} - -// DeletePeer messages the service to delete a peer -func (ac *applicationClient) DeletePeer(onion string, password string) { - message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.DeletePeer, map[event.Field]string{event.Identity: onion, event.Password: password})} - ac.bridge.Write(&message) -} - -func (ac *applicationClient) ChangePeerPassword(onion, oldpass, newpass string) { - message := event.IPCMessage{Dest: onion, Message: event.NewEventList(event.ChangePassword, event.Password, oldpass, event.NewPassword, newpass)} - ac.bridge.Write(&message) -} - -func (ac *applicationClient) handleDeletedPeer(onion string) { - ac.acmutex.Lock() - defer ac.acmutex.Unlock() - ac.peers[onion].Shutdown() - delete(ac.peers, onion) - ac.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion)) - - ac.applicationCore.DeletePeer(onion) - ac.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion)) -} - -func (ac *applicationClient) AddPeerPlugin(onion string, pluginID plugins.PluginID) { - message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.AddPeerPlugin, map[event.Field]string{event.Identity: onion, event.Data: strconv.Itoa(int(pluginID))})} - ac.bridge.Write(&message) -} - -// LoadProfiles messages the service to load any profiles for the given password -func (ac *applicationClient) LoadProfiles(password string) { - message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.LoadProfiles, map[event.Field]string{event.Password: password})} - ac.bridge.Write(&message) -} - -func (ac *applicationClient) QueryACNStatus() { - message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.GetACNStatus, map[event.Field]string{})} - ac.bridge.Write(&message) -} - -func (ac *applicationClient) QueryACNVersion() { - message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.GetACNVersion, map[event.Field]string{})} - ac.bridge.Write(&message) -} - -// ShutdownPeer shuts down a peer and removes it from the app's management -func (ac *applicationClient) ShutdownPeer(onion string) { - ac.acmutex.Lock() - defer ac.acmutex.Unlock() - ac.eventBuses[onion].Shutdown() - delete(ac.eventBuses, onion) - ac.peers[onion].Shutdown() - delete(ac.peers, onion) - message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.ShutdownPeer, map[event.Field]string{event.Identity: onion})} - ac.bridge.Write(&message) -} - -// Shutdown shuts down the application client and all front end peer components -func (ac *applicationClient) Shutdown() { - for id := range ac.peers { - ac.ShutdownPeer(id) - } - ac.applicationBridge.Shutdown() - ac.appBus.Shutdown() -} diff --git a/app/appService.go b/app/appService.go deleted file mode 100644 index 6744202..0000000 --- a/app/appService.go +++ /dev/null @@ -1,209 +0,0 @@ -package app - -import ( - "cwtch.im/cwtch/app/plugins" - "cwtch.im/cwtch/event" - "cwtch.im/cwtch/model" - "cwtch.im/cwtch/protocol/connections" - "cwtch.im/cwtch/storage" - "git.openprivacy.ca/cwtch.im/tapir/primitives" - "git.openprivacy.ca/openprivacy/connectivity" - "git.openprivacy.ca/openprivacy/log" - path "path/filepath" - "strconv" - "sync" -) - -type applicationService struct { - applicationBridge - appletACN - appletPlugins - - storage map[string]storage.ProfileStore - engines map[string]connections.Engine - asmutex sync.Mutex -} - -// ApplicationService is the back end of an application that manages engines and writing storage and communicates to an ApplicationClient by an IPCBridge -type ApplicationService interface { - Shutdown() -} - -// NewAppService returns an ApplicationService that runs the backend of an app and communicates with a client by the supplied IPCBridge -func NewAppService(acn connectivity.ACN, appDirectory string, bridge event.IPCBridge) ApplicationService { - appService := &applicationService{storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}} - - appService.appletACN.init(acn, appService.getACNStatusHandler()) - appService.handle = appService.handleEvent - - go appService.listen() - - log.Infoln("Created new App Service") - return appService -} - -func (as *applicationService) handleEvent(ev *event.Event) { - log.Infof("app Service handleEvent %v\n", ev.EventType) - switch ev.EventType { - case event.CreatePeer: - profileName := ev.Data[event.ProfileName] - password := ev.Data[event.Password] - tag := ev.Data[event.Data] - as.createPeer(profileName, password, tag) - case event.DeletePeer: - onion := ev.Data[event.Identity] - password := ev.Data[event.Password] - as.deletePeer(onion, password) - - message := event.IPCMessage{Dest: DestApp, Message: *ev} - as.bridge.Write(&message) - case event.AddPeerPlugin: - onion := ev.Data[event.Identity] - pluginID, _ := strconv.Atoi(ev.Data[event.Data]) - as.AddPlugin(onion, plugins.PluginID(pluginID), as.eventBuses[onion], as.acn) - case event.LoadProfiles: - password := ev.Data[event.Password] - as.loadProfiles(password) - case event.ReloadClient: - for _, storage := range as.storage { - peerMsg := *storage.GetNewPeerMessage() - peerMsg.Data[event.Status] = event.StorageRunning - peerMsg.Data[event.Created] = event.False - message := event.IPCMessage{Dest: DestApp, Message: peerMsg} - as.bridge.Write(&message) - } - - message := event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ReloadDone)} - as.bridge.Write(&message) - case event.ReloadPeer: - onion := ev.Data[event.Identity] - events := as.storage[onion].GetStatusMessages() - - for _, ev := range events { - message := event.IPCMessage{Dest: onion, Message: *ev} - as.bridge.Write(&message) - } - case event.GetACNStatus: - prog, status := as.acn.GetBootstrapStatus() - as.getACNStatusHandler()(prog, status) - case event.GetACNVersion: - version := as.acn.GetVersion() - as.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ACNVersion, event.Data, version)}) - case event.ShutdownPeer: - onion := ev.Data[event.Identity] - as.ShutdownPeer(onion) - } -} - -func (as *applicationService) createPeer(name, password, tag string) { - log.Infof("app Service create peer %v %v\n", name, password) - profile, err := as.applicationCore.CreatePeer(name) - as.eventBuses[profile.Onion] = event.IPCEventManagerFrom(as.bridge, profile.Onion, as.eventBuses[profile.Onion]) - if err != nil { - log.Errorf("Could not create Peer: %v\n", err) - message := event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.PeerError, event.Error, err.Error())} - as.bridge.Write(&message) - return - } - - profileStore := storage.CreateProfileWriterStore(as.eventBuses[profile.Onion], path.Join(as.directory, "profiles", profile.LocalID), password, profile) - - peerAuthorizations := profile.ContactsAuthorizations() - // TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key. - identity := primitives.InitializeIdentity(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey) - engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, as.acn, as.eventBuses[profile.Onion], peerAuthorizations) - - as.storage[profile.Onion] = profileStore - as.engines[profile.Onion] = engine - - peerMsg := *profileStore.GetNewPeerMessage() - peerMsg.Data[event.Created] = event.True - peerMsg.Data[event.Status] = event.StorageNew - message := event.IPCMessage{Dest: DestApp, Message: peerMsg} - as.bridge.Write(&message) -} - -func (as *applicationService) loadProfiles(password string) { - count := 0 - as.applicationCore.LoadProfiles(password, false, func(profile *model.Profile, profileStore storage.ProfileStore) { - as.eventBuses[profile.Onion] = event.IPCEventManagerFrom(as.bridge, profile.Onion, as.eventBuses[profile.Onion]) - - peerAuthorizations := profile.ContactsAuthorizations() - identity := primitives.InitializeIdentity(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey) - engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, as.acn, as.eventBuses[profile.Onion], peerAuthorizations) - as.asmutex.Lock() - as.storage[profile.Onion] = profileStore - as.engines[profile.Onion] = engine - as.asmutex.Unlock() - - peerMsg := *profileStore.GetNewPeerMessage() - peerMsg.Data[event.Created] = event.False - peerMsg.Data[event.Status] = event.StorageNew - message := event.IPCMessage{Dest: DestApp, Message: peerMsg} - as.bridge.Write(&message) - count++ - }) - if count == 0 { - message := event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0)} - as.bridge.Write(&message) - } -} - -func (as *applicationService) getACNStatusHandler() func(int, string) { - return func(progress int, status string) { - progStr := strconv.Itoa(progress) - as.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status)}) - as.applicationCore.coremutex.Lock() - defer as.applicationCore.coremutex.Unlock() - for _, bus := range as.eventBuses { - bus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status)) - } - } -} - -func (as *applicationService) deletePeer(onion, password string) { - as.asmutex.Lock() - defer as.asmutex.Unlock() - - if as.storage[onion].CheckPassword(password) { - as.appletPlugins.ShutdownPeer(onion) - as.plugins.Delete(onion) - - as.engines[onion].Shutdown() - delete(as.engines, onion) - - as.storage[onion].Shutdown() - as.storage[onion].Delete() - delete(as.storage, onion) - - as.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion)) - - as.applicationCore.DeletePeer(onion) - log.Debugf("Delete peer for %v Done\n", onion) - - message := event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.PeerDeleted, event.Identity, onion)} - as.bridge.Write(&message) - return - } - message := event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.AppError, event.Error, event.PasswordMatchError, event.Identity, onion)} - as.bridge.Write(&message) -} - -func (as *applicationService) ShutdownPeer(onion string) { - as.engines[onion].Shutdown() - delete(as.engines, onion) - as.storage[onion].Shutdown() - delete(as.storage, onion) - as.eventBuses[onion].Shutdown() - delete(as.eventBuses, onion) -} - -// Shutdown shuts down the application Service and all peer related backend parts -func (as *applicationService) Shutdown() { - log.Debugf("shutting down application service...") - as.appletPlugins.Shutdown() - for id := range as.engines { - log.Debugf("shutting down application service peer engine %v", id) - as.ShutdownPeer(id) - } -} diff --git a/event/bridge/goChanBridge.go b/event/bridge/goChanBridge.go deleted file mode 100644 index ce9f67e..0000000 --- a/event/bridge/goChanBridge.go +++ /dev/null @@ -1,57 +0,0 @@ -package bridge - -import ( - "cwtch.im/cwtch/event" - "sync" -) - -type goChanBridge struct { - in chan event.IPCMessage - out chan event.IPCMessage - closedChan chan bool - closed bool - lock sync.Mutex -} - -// MakeGoChanBridge returns a simple testing IPCBridge made from inprocess go channels -func MakeGoChanBridge() (b1, b2 event.IPCBridge) { - chan1 := make(chan event.IPCMessage) - chan2 := make(chan event.IPCMessage) - closed := make(chan bool) - - a := &goChanBridge{in: chan1, out: chan2, closedChan: closed, closed: false} - b := &goChanBridge{in: chan2, out: chan1, closedChan: closed, closed: false} - - go monitor(a, b) - - return a, b -} - -func monitor(a, b *goChanBridge) { - <-a.closedChan - a.closed = true - b.closed = true - a.closedChan <- true -} - -func (pb *goChanBridge) Read() (*event.IPCMessage, bool) { - message, ok := <-pb.in - return &message, ok -} - -func (pb *goChanBridge) Write(message *event.IPCMessage) { - pb.lock.Lock() - defer pb.lock.Unlock() - if !pb.closed { - pb.out <- *message - } -} - -func (pb *goChanBridge) Shutdown() { - if !pb.closed { - close(pb.in) - close(pb.out) - pb.closedChan <- true - <-pb.closedChan - } -} diff --git a/event/bridge/infinite_chan.go b/event/bridge/infinite_chan.go deleted file mode 100644 index 688d7f6..0000000 --- a/event/bridge/infinite_chan.go +++ /dev/null @@ -1,72 +0,0 @@ -package bridge - -/* Todo: When go generics ships, refactor this and event.infiniteChannel into one */ - -// InfiniteChannel implements the Channel interface with an infinite buffer between the input and the output. -type InfiniteChannel struct { - input, output chan interface{} - length chan int - buffer *InfiniteQueue -} - -func newInfiniteChannel() *InfiniteChannel { - ch := &InfiniteChannel{ - input: make(chan interface{}), - output: make(chan interface{}), - length: make(chan int), - buffer: newInfiniteQueue(), - } - go ch.infiniteBuffer() - return ch -} - -// In returns the input channel -func (ch *InfiniteChannel) In() chan<- interface{} { - return ch.input -} - -// Out returns the output channel -func (ch *InfiniteChannel) Out() <-chan interface{} { - return ch.output -} - -// Len returns the length of items in queue -func (ch *InfiniteChannel) Len() int { - return <-ch.length -} - -// Close closes the InfiniteChanel -func (ch *InfiniteChannel) Close() { - close(ch.input) -} - -func (ch *InfiniteChannel) infiniteBuffer() { - var input, output chan interface{} - var next interface{} - input = ch.input - - for input != nil || output != nil { - select { - case elem, open := <-input: - if open { - ch.buffer.Add(elem) - } else { - input = nil - } - case output <- next: - ch.buffer.Remove() - case ch.length <- ch.buffer.Length(): - } - - if ch.buffer.Length() > 0 { - output = ch.output - next = ch.buffer.Peek() - } else { - output = nil - next = nil - } - } - - close(ch.output) - close(ch.length) -} diff --git a/event/bridge/infinite_queue.go b/event/bridge/infinite_queue.go deleted file mode 100644 index 5ce1289..0000000 --- a/event/bridge/infinite_queue.go +++ /dev/null @@ -1,105 +0,0 @@ -package bridge - -/* Todo: When go generics ships, refactor this and event.infinitQueue channel into one */ - -/* -Package queue provides a fast, ring-buffer queue based on the version suggested by Dariusz Górecki. -Using this instead of other, simpler, queue implementations (slice+append or linked list) provides -substantial memory and time benefits, and fewer GC pauses. - -The queue implemented here is as fast as it is for an additional reason: it is *not* thread-safe. -*/ - -// minQueueLen is smallest capacity that queue may have. -// Must be power of 2 for bitwise modulus: x % n == x & (n - 1). -const minQueueLen = 16 - -// InfiniteQueue represents a single instance of the queue data structure. -type InfiniteQueue struct { - buf []interface{} - head, tail, count int -} - -// New constructs and returns a new Queue. -func newInfiniteQueue() *InfiniteQueue { - return &InfiniteQueue{ - buf: make([]interface{}, minQueueLen), - } -} - -// Length returns the number of elements currently stored in the queue. -func (q *InfiniteQueue) Length() int { - return q.count -} - -// resizes the queue to fit exactly twice its current contents -// this can result in shrinking if the queue is less than half-full -func (q *InfiniteQueue) resize() { - newBuf := make([]interface{}, q.count<<1) - - if q.tail > q.head { - copy(newBuf, q.buf[q.head:q.tail]) - } else { - n := copy(newBuf, q.buf[q.head:]) - copy(newBuf[n:], q.buf[:q.tail]) - } - - q.head = 0 - q.tail = q.count - q.buf = newBuf -} - -// Add puts an element on the end of the queue. -func (q *InfiniteQueue) Add(elem interface{}) { - if q.count == len(q.buf) { - q.resize() - } - - q.buf[q.tail] = elem - // bitwise modulus - q.tail = (q.tail + 1) & (len(q.buf) - 1) - q.count++ -} - -// Peek returns the element at the head of the queue. This call panics -// if the queue is empty. -func (q *InfiniteQueue) Peek() interface{} { - if q.count <= 0 { - panic("queue: Peek() called on empty queue") - } - return q.buf[q.head] -} - -// Get returns the element at index i in the queue. If the index is -// invalid, the call will panic. This method accepts both positive and -// negative index values. Index 0 refers to the first element, and -// index -1 refers to the last. -func (q *InfiniteQueue) Get(i int) interface{} { - // If indexing backwards, convert to positive index. - if i < 0 { - i += q.count - } - if i < 0 || i >= q.count { - panic("queue: Get() called with index out of range") - } - // bitwise modulus - return q.buf[(q.head+i)&(len(q.buf)-1)] -} - -// Remove removes and returns the element from the front of the queue. If the -// queue is empty, the call will panic. -func (q *InfiniteQueue) Remove() interface{} { - if q.count <= 0 { - panic("queue: Remove() called on empty queue") - } - ret := q.buf[q.head] - q.buf[q.head] = nil - // bitwise modulus - q.head = (q.head + 1) & (len(q.buf) - 1) - q.count-- - // Resize down if buffer 1/4 full. - if len(q.buf) > minQueueLen && (q.count<<2) == len(q.buf) { - q.resize() - } - return ret -} diff --git a/event/bridge/pipeBridge-windows.go b/event/bridge/pipeBridge-windows.go deleted file mode 100644 index 1671ad9..0000000 --- a/event/bridge/pipeBridge-windows.go +++ /dev/null @@ -1,19 +0,0 @@ -// +build windows - -package bridge - -import ( - "cwtch.im/cwtch/event" - "log" -) - -func NewPipeBridgeClient(inFilename, outFilename string) event.IPCBridge { - log.Fatal("Not supported on windows") - return nil -} - -// NewPipeBridgeService returns a pipe backed IPCBridge for a service -func NewPipeBridgeService(inFilename, outFilename string) event.IPCBridge { - log.Fatal("Not supported on windows") - return nil -} diff --git a/event/bridge/pipeBridge.go b/event/bridge/pipeBridge.go deleted file mode 100644 index c7fdc9f..0000000 --- a/event/bridge/pipeBridge.go +++ /dev/null @@ -1,357 +0,0 @@ -// +build !windows - -package bridge - -import ( - "cwtch.im/cwtch/event" - "cwtch.im/cwtch/protocol/connections" - "encoding/base64" - "encoding/binary" - "encoding/json" - "git.openprivacy.ca/openprivacy/log" - "os" - "sync" - "syscall" - "time" -) - -/* pipeBridge creates a pair of named pipes - Needs a call to new client and service to fully successfully open -*/ - -const maxBufferSize = 1000 - -const serviceName = "service" -const clientName = "client" - -const syn = "SYN" -const synack = "SYNACK" -const ack = "ACK" - -type pipeBridge struct { - infile, outfile string - in, out *os.File - read chan event.IPCMessage - write *InfiniteChannel - closedChan chan bool - state connections.ConnectionState - lock sync.Mutex - threeShake func() bool - - // For logging / debugging purposes - name string -} - -func newPipeBridge(inFilename, outFilename string) *pipeBridge { - syscall.Mkfifo(inFilename, 0600) - syscall.Mkfifo(outFilename, 0600) - pb := &pipeBridge{infile: inFilename, outfile: outFilename, state: connections.DISCONNECTED} - pb.read = make(chan event.IPCMessage, maxBufferSize) - pb.write = newInfiniteChannel() //make(chan event.IPCMessage, maxBufferSize) - return pb -} - -// NewPipeBridgeClient returns a pipe backed IPCBridge for a client -func NewPipeBridgeClient(inFilename, outFilename string) event.IPCBridge { - log.Debugf("Making new PipeBridge Client...\n") - pb := newPipeBridge(inFilename, outFilename) - pb.name = clientName - pb.threeShake = pb.threeShakeClient - go pb.connectionManager() - - return pb -} - -// NewPipeBridgeService returns a pipe backed IPCBridge for a service -func NewPipeBridgeService(inFilename, outFilename string) event.IPCBridge { - log.Debugf("Making new PipeBridge Service...\n") - pb := newPipeBridge(inFilename, outFilename) - pb.name = serviceName - pb.threeShake = pb.threeShakeService - - go pb.connectionManager() - - log.Debugf("Successfully created new PipeBridge Service!\n") - return pb -} - -func (pb *pipeBridge) setState(state connections.ConnectionState) { - pb.lock.Lock() - defer pb.lock.Unlock() - - pb.state = state -} - -func (pb *pipeBridge) getState() connections.ConnectionState { - pb.lock.Lock() - defer pb.lock.Unlock() - - return pb.state -} - -func (pb *pipeBridge) connectionManager() { - for pb.getState() != connections.KILLED { - log.Debugf("clientConnManager loop start init\n") - pb.setState(connections.CONNECTING) - - var err error - log.Debugf("%v open file infile\n", pb.name) - pb.in, err = os.OpenFile(pb.infile, os.O_RDWR, 0600) - if err != nil { - pb.setState(connections.DISCONNECTED) - continue - } - - log.Debugf("%v open file outfile\n", pb.name) - pb.out, err = os.OpenFile(pb.outfile, os.O_RDWR, 0600) - if err != nil { - pb.setState(connections.DISCONNECTED) - continue - } - - log.Debugf("Successfully connected PipeBridge %v!\n", pb.name) - - pb.handleConns() - } - log.Debugf("exiting %v ConnectionManager\n", pb.name) - -} - -// threeShake performs a 3way handshake sync up -func (pb *pipeBridge) threeShakeService() bool { - synacked := false - - for { - resp, err := pb.readString() - if err != nil { - return false - } - - if string(resp) == syn { - if !synacked { - err = pb.writeString([]byte(synack)) - if err != nil { - return false - } - synacked = true - } - } else if string(resp) == ack { - return true - } - } -} - -func (pb *pipeBridge) synLoop(stop chan bool) { - delay := time.Duration(0) - for { - select { - case <-time.After(delay): - err := pb.writeString([]byte(syn)) - if err != nil { - return - } - delay = time.Second - case <-stop: - return - } - } -} - -func (pb *pipeBridge) threeShakeClient() bool { - stop := make(chan bool) - go pb.synLoop(stop) - for { - resp, err := pb.readString() - if err != nil { - return false - } - - if string(resp) == synack { - stop <- true - err := pb.writeString([]byte(ack)) - return err == nil - } - } -} - -func (pb *pipeBridge) handleConns() { - - if !pb.threeShake() { - pb.setState(connections.FAILED) - pb.closeReset() - return - } - - pb.setState(connections.AUTHENTICATED) - - pb.closedChan = make(chan bool, 5) - - log.Debugf("handleConns authed, %v 2xgo\n", pb.name) - - go pb.handleRead() - go pb.handleWrite() - - <-pb.closedChan - log.Debugf("handleConns <-closedChan (%v)\n", pb.name) - if pb.getState() != connections.KILLED { - pb.setState(connections.FAILED) - } - pb.closeReset() - log.Debugf("handleConns done for %v, exit\n", pb.name) -} - -func (pb *pipeBridge) closeReset() { - pb.in.Close() - pb.out.Close() - close(pb.read) - pb.write.Close() - - if pb.getState() != connections.KILLED { - pb.read = make(chan event.IPCMessage, maxBufferSize) - pb.write = newInfiniteChannel() - } -} - -func (pb *pipeBridge) handleWrite() { - log.Debugf("handleWrite() %v\n", pb.name) - defer log.Debugf("exiting handleWrite() %v\n", pb.name) - - for { - select { - case messageInf := <-pb.write.output: - if messageInf == nil { - pb.closedChan <- true - return - } - message := messageInf.(event.IPCMessage) - if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup { - log.Debugf("handleWrite <- message: %v %v ...\n", message.Dest, message.Message.EventType) - } else { - log.Debugf("handleWrite <- message: %v\n", message) - } - if pb.getState() == connections.AUTHENTICATED { - encMessage := &event.IPCMessage{Dest: message.Dest, Message: event.Event{EventType: message.Message.EventType, EventID: message.Message.EventID, Data: make(map[event.Field]string)}} - for k, v := range message.Message.Data { - encMessage.Message.Data[k] = base64.StdEncoding.EncodeToString([]byte(v)) - } - - messageJSON, _ := json.Marshal(encMessage) - err := pb.writeString(messageJSON) - if err != nil { - pb.closedChan <- true - return - } - } else { - return - } - } - } -} - -func (pb *pipeBridge) handleRead() { - log.Debugf("handleRead() %v\n", pb.name) - defer log.Debugf("exiting handleRead() %v", pb.name) - - for { - log.Debugf("Waiting to handleRead()...\n") - - buffer, err := pb.readString() - if err != nil { - pb.closedChan <- true - return - } - - var message event.IPCMessage - err = json.Unmarshal(buffer, &message) - if err != nil { - log.Errorf("Read error: '%v', value: '%v'", err, buffer) - pb.closedChan <- true - return // probably new connection trying to initialize - } - for k, v := range message.Message.Data { - val, _ := base64.StdEncoding.DecodeString(v) - message.Message.Data[k] = string(val) - } - if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup { - log.Debugf("handleRead read<-: %v %v ...\n", message.Dest, message.Message.EventType) - } else { - log.Debugf("handleRead read<-: %v\n", message) - } - pb.read <- message - log.Debugf("handleRead wrote\n") - } -} - -func (pb *pipeBridge) Read() (*event.IPCMessage, bool) { - log.Debugf("Read() %v...\n", pb.name) - var ok = false - var message event.IPCMessage - for !ok && pb.getState() != connections.KILLED { - message, ok = <-pb.read - if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup { - log.Debugf("Read %v: %v %v ...\n", pb.name, message.Dest, message.Message.EventType) - } else { - log.Debugf("Read %v: %v\n", pb.name, message) - } - } - return &message, pb.getState() != connections.KILLED -} - -func (pb *pipeBridge) Write(message *event.IPCMessage) { - if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup { - log.Debugf("Write %v: %v %v ...\n", pb.name, message.Dest, message.Message.EventType) - } else { - log.Debugf("Write %v: %v\n", pb.name, message) - } - pb.write.input <- *message - log.Debugf("Wrote\n") -} - -func (pb *pipeBridge) Shutdown() { - log.Debugf("pb.Shutdown() for %v currently in state: %v\n", pb.name, connections.ConnectionStateName[pb.getState()]) - pb.state = connections.KILLED - pb.closedChan <- true - log.Debugf("Done Shutdown for %v\n", pb.name) -} - -func (pb *pipeBridge) writeString(message []byte) error { - size := make([]byte, 2) - binary.LittleEndian.PutUint16(size, uint16(len(message))) - pb.out.Write(size) - - for pos := 0; pos < len(message); { - n, err := pb.out.Write(message[pos:]) - if err != nil { - log.Errorf("Writing out on pipeBridge: %v\n", err) - return err - } - pos += n - } - return nil -} - -func (pb *pipeBridge) readString() ([]byte, error) { - var n int - size := make([]byte, 2) - var err error - - n, err = pb.in.Read(size) - if err != nil || n != 2 { - log.Errorf("Could not read len int from stream: %v\n", err) - return nil, err - } - - n = int(binary.LittleEndian.Uint16(size)) - pos := 0 - buffer := make([]byte, n) - for n > 0 { - m, err := pb.in.Read(buffer[pos:]) - if err != nil { - log.Errorf("Reading into buffer from pipe: %v\n", err) - return nil, err - } - n -= m - pos += m - } - return buffer, nil -} diff --git a/event/bridge/pipeBridge_test.go b/event/bridge/pipeBridge_test.go deleted file mode 100644 index 637f3a6..0000000 --- a/event/bridge/pipeBridge_test.go +++ /dev/null @@ -1,131 +0,0 @@ -package bridge - -import ( - "cwtch.im/cwtch/event" - "git.openprivacy.ca/openprivacy/log" - "os" - "testing" - "time" -) - -var ( - clientPipe = "./client" - servicePipe = "./service" -) - -func clientHelper(t *testing.T, in, out string, messageOrig *event.IPCMessage, done chan bool) { - client := NewPipeBridgeClient(in, out) - - messageAfter, ok := client.Read() - if !ok { - t.Errorf("Reading from client IPCBridge failed") - done <- true - return - } - - if messageOrig.Dest != messageAfter.Dest { - t.Errorf("Dest's value differs expected: %v actaul: %v", messageOrig.Dest, messageAfter.Dest) - } - - if messageOrig.Message.EventType != messageAfter.Message.EventType { - t.Errorf("EventTypes's value differs expected: %v actaul: %v", messageOrig.Message.EventType, messageAfter.Message.EventType) - } - - if messageOrig.Message.Data[event.Identity] != messageAfter.Message.Data[event.Identity] { - t.Errorf("Data[Identity]'s value differs expected: %v actaul: %v", messageOrig.Message.Data[event.Identity], messageAfter.Message.Data[event.Identity]) - } - - done <- true -} - -func serviceHelper(t *testing.T, in, out string, messageOrig *event.IPCMessage, done chan bool) { - service := NewPipeBridgeService(in, out) - - service.Write(messageOrig) - - done <- true -} - -func TestPipeBridge(t *testing.T) { - os.Remove(servicePipe) - os.Remove(clientPipe) - - messageOrig := &event.IPCMessage{Dest: "ABC", Message: event.NewEventList(event.NewPeer, event.Identity, "It is I")} - serviceDone := make(chan bool) - clientDone := make(chan bool) - - go clientHelper(t, clientPipe, servicePipe, messageOrig, clientDone) - go serviceHelper(t, servicePipe, clientPipe, messageOrig, serviceDone) - - <-serviceDone - <-clientDone -} - -func restartingClient(t *testing.T, in, out string, done chan bool) { - client := NewPipeBridgeClient(in, out) - - message1 := &event.IPCMessage{Dest: "ABC", Message: event.NewEventList(event.NewPeer)} - log.Infoln("client writing message 1") - client.Write(message1) - - time.Sleep(100 * time.Millisecond) - log.Infoln("client shutdown") - client.Shutdown() - - log.Infoln("client new client") - client = NewPipeBridgeClient(in, out) - message2 := &event.IPCMessage{Dest: "ABC", Message: event.NewEventList(event.DeleteContact)} - log.Infoln("client2 write message2") - client.Write(message2) - - done <- true -} - -func stableService(t *testing.T, in, out string, done chan bool) { - service := NewPipeBridgeService(in, out) - - log.Infoln("service wait read 1") - message1, ok := service.Read() - log.Infof("service read 1 %v ok:%v\n", message1, ok) - if !ok { - t.Errorf("Reading from client IPCBridge 1st time failed") - done <- true - return - } - if message1.Message.EventType != event.NewPeer { - t.Errorf("Wrong message received, expected NewPeer\n") - done <- true - return - } - - log.Infoln("service wait read 2") - message2, ok := service.Read() - log.Infof("service read 2 got %v ok:%v\n", message2, ok) - if !ok { - t.Errorf("Reading from client IPCBridge 2nd time failed") - done <- true - return - } - if message2.Message.EventType != event.DeleteContact { - t.Errorf("Wrong message received, expected DeleteContact, got %v\n", message2) - done <- true - return - } - - done <- true -} - -func TestReconnect(t *testing.T) { - log.Infoln("TestReconnect") - os.Remove(servicePipe) - os.Remove(clientPipe) - - serviceDone := make(chan bool) - clientDone := make(chan bool) - - go restartingClient(t, clientPipe, servicePipe, clientDone) - go stableService(t, servicePipe, clientPipe, serviceDone) - - <-serviceDone - <-clientDone -} diff --git a/event/eventmanager.go b/event/eventmanager.go index 2575f2e..04c09a2 100644 --- a/event/eventmanager.go +++ b/event/eventmanager.go @@ -62,11 +62,9 @@ type manager struct { } // Manager is an interface for an event bus -// FIXME this interface lends itself to race conditions around channels type Manager interface { Subscribe(Type, Queue) Publish(Event) - PublishLocal(Event) Shutdown() } @@ -123,11 +121,6 @@ func (em *manager) Publish(event Event) { } } -// Publish an event only locally, not going over an IPC bridge if there is one -func (em *manager) PublishLocal(event Event) { - em.Publish(event) -} - // eventBus is an internal function that is used to distribute events to all subscribers func (em *manager) eventBus() { for { diff --git a/event/eventmanageripc.go b/event/eventmanageripc.go deleted file mode 100644 index 4dddcd1..0000000 --- a/event/eventmanageripc.go +++ /dev/null @@ -1,38 +0,0 @@ -package event - -type ipcManager struct { - manager Manager - - onion string - ipcBridge IPCBridge -} - -// NewIPCEventManager returns an EvenetManager that also pipes events over and supplied IPCBridge -func NewIPCEventManager(bridge IPCBridge, onion string) Manager { - em := &ipcManager{onion: onion, ipcBridge: bridge, manager: NewEventManager()} - return em -} - -// IPCEventManagerFrom returns an IPCEventManger from the supplied manager and IPCBridge -func IPCEventManagerFrom(bridge IPCBridge, onion string, manager Manager) Manager { - em := &ipcManager{onion: onion, ipcBridge: bridge, manager: manager} - return em -} - -func (ipcm *ipcManager) Publish(ev Event) { - ipcm.manager.Publish(ev) - message := &IPCMessage{Dest: ipcm.onion, Message: ev} - ipcm.ipcBridge.Write(message) -} - -func (ipcm *ipcManager) PublishLocal(ev Event) { - ipcm.manager.Publish(ev) -} - -func (ipcm *ipcManager) Subscribe(eventType Type, queue Queue) { - ipcm.manager.Subscribe(eventType, queue) -} - -func (ipcm *ipcManager) Shutdown() { - ipcm.manager.Shutdown() -} diff --git a/event/ipc.go b/event/ipc.go deleted file mode 100644 index 8abae3e..0000000 --- a/event/ipc.go +++ /dev/null @@ -1,14 +0,0 @@ -package event - -// IPCMessage is a wrapper for a regular eventMessage with a destination (onion|AppDest) so the other side of the bridge can route appropriately -type IPCMessage struct { - Dest string - Message Event -} - -// IPCBridge is an interface to a IPC construct used to communicate IPCMessages -type IPCBridge interface { - Read() (*IPCMessage, bool) - Write(message *IPCMessage) - Shutdown() -} diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index c95eb8f..6969416 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -5,7 +5,6 @@ import ( app2 "cwtch.im/cwtch/app" "cwtch.im/cwtch/app/utils" "cwtch.im/cwtch/event" - "cwtch.im/cwtch/event/bridge" "cwtch.im/cwtch/model" "cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/model/constants" @@ -96,7 +95,6 @@ func TestCwtchPeerIntegration(t *testing.T) { log.ExcludeFromPattern("connection/connection") log.ExcludeFromPattern("outbound/3dhauthchannel") log.ExcludeFromPattern("event/eventmanager") - log.ExcludeFromPattern("pipeBridge") log.ExcludeFromPattern("tapir") os.Mkdir("tordir", 0700) dataDir := path.Join("tordir", "tor") @@ -135,10 +133,6 @@ func TestCwtchPeerIntegration(t *testing.T) { os.Mkdir(cwtchDir, 0700) os.RemoveAll(path.Join(cwtchDir, "testing")) os.Mkdir(path.Join(cwtchDir, "testing"), 0700) - bridgeClient := bridge.NewPipeBridgeClient(path.Join(cwtchDir, "testing/clientPipe"), path.Join(cwtchDir, "testing/servicePipe")) - bridgeService := bridge.NewPipeBridgeService(path.Join(cwtchDir, "testing/servicePipe"), path.Join(cwtchDir, "testing/clientPipe")) - appClient := app2.NewAppClient("./storage", bridgeClient) - appService := app2.NewAppService(acn, "./storage", bridgeService) numGoRoutinesPostAppStart := runtime.NumGoroutine() @@ -151,7 +145,7 @@ func TestCwtchPeerIntegration(t *testing.T) { app.CreatePeer("bob", "asdfasdf") fmt.Println("Creating Carol...") - appClient.CreatePeer("carol", "asdfasdf") + app.CreatePeer("carol", "asdfasdf") alice := utils.WaitGetPeer(app, "alice") fmt.Println("Alice created:", alice.GetOnion()) @@ -163,13 +157,12 @@ func TestCwtchPeerIntegration(t *testing.T) { bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) - carol := utils.WaitGetPeer(appClient, "carol") + carol := utils.WaitGetPeer(app, "carol") fmt.Println("Carol created:", carol.GetOnion()) carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol") carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) app.LaunchPeers() - appClient.LaunchPeers() waitTime := time.Duration(60) * time.Second t.Logf("** Waiting for Alice, Bob, and Carol to connect with onion network... (%v)\n", waitTime) @@ -423,24 +416,14 @@ func TestCwtchPeerIntegration(t *testing.T) { numGoRoutinesPostBob := runtime.NumGoroutine() fmt.Println("Shutting down Carol...") - appClient.ShutdownPeer(carol.GetOnion()) + app.ShutdownPeer(carol.GetOnion()) time.Sleep(time.Second * 3) numGoRoutinesPostCarol := runtime.NumGoroutine() fmt.Println("Shutting down apps...") fmt.Printf("app Shutdown: %v\n", runtime.NumGoroutine()) app.Shutdown() - fmt.Printf("appClientShutdown: %v\n", runtime.NumGoroutine()) - appClient.Shutdown() - fmt.Printf("appServiceShutdown: %v\n", runtime.NumGoroutine()) - appService.Shutdown() - fmt.Printf("bridgeClientShutdown: %v\n", runtime.NumGoroutine()) - bridgeClient.Shutdown() - time.Sleep(2 * time.Second) - - fmt.Printf("brideServiceShutdown: %v\n", runtime.NumGoroutine()) - bridgeService.Shutdown() time.Sleep(2 * time.Second) fmt.Printf("Done shutdown: %v\n", runtime.NumGoroutine())