Remove IPC App Bridge
This commit is contained in:
parent
81bd787a96
commit
e2bba41a9a
|
@ -1,39 +0,0 @@
|
||||||
package app
|
|
||||||
|
|
||||||
import "cwtch.im/cwtch/event"
|
|
||||||
import "git.openprivacy.ca/openprivacy/log"
|
|
||||||
|
|
||||||
const (
|
|
||||||
// DestApp should be used as a destination for IPC messages that are for the application itself an not a peer
|
|
||||||
DestApp = "app"
|
|
||||||
)
|
|
||||||
|
|
||||||
type applicationBridge struct {
|
|
||||||
applicationCore
|
|
||||||
|
|
||||||
bridge event.IPCBridge
|
|
||||||
handle func(*event.Event)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ab *applicationBridge) listen() {
|
|
||||||
log.Infoln("ab.listen()")
|
|
||||||
for {
|
|
||||||
ipcMessage, ok := ab.bridge.Read()
|
|
||||||
log.Debugf("listen() got %v for %v\n", ipcMessage.Message.EventType, ipcMessage.Dest)
|
|
||||||
if !ok {
|
|
||||||
log.Debugln("exiting appBridge.listen()")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if ipcMessage.Dest == DestApp {
|
|
||||||
ab.handle(&ipcMessage.Message)
|
|
||||||
} else {
|
|
||||||
if eventBus, exists := ab.eventBuses[ipcMessage.Dest]; exists {
|
|
||||||
eventBus.PublishLocal(ipcMessage.Message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ab *applicationBridge) Shutdown() {
|
|
||||||
}
|
|
177
app/appClient.go
177
app/appClient.go
|
@ -1,177 +0,0 @@
|
||||||
package app
|
|
||||||
|
|
||||||
import (
|
|
||||||
"cwtch.im/cwtch/app/plugins"
|
|
||||||
"cwtch.im/cwtch/event"
|
|
||||||
"cwtch.im/cwtch/peer"
|
|
||||||
"cwtch.im/cwtch/storage"
|
|
||||||
"fmt"
|
|
||||||
"git.openprivacy.ca/openprivacy/log"
|
|
||||||
"path"
|
|
||||||
"strconv"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
type applicationClient struct {
|
|
||||||
applicationBridge
|
|
||||||
appletPeers
|
|
||||||
|
|
||||||
appBus event.Manager
|
|
||||||
acmutex sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewAppClient returns an Application that acts as a client to a AppService, connected by the IPCBridge supplied
|
|
||||||
func NewAppClient(appDirectory string, bridge event.IPCBridge) Application {
|
|
||||||
appClient := &applicationClient{appletPeers: appletPeers{peers: make(map[string]peer.CwtchPeer)}, applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}, appBus: event.NewEventManager()}
|
|
||||||
appClient.handle = appClient.handleEvent
|
|
||||||
|
|
||||||
go appClient.listen()
|
|
||||||
|
|
||||||
appClient.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ReloadClient)})
|
|
||||||
|
|
||||||
log.Infoln("Created new App Client")
|
|
||||||
return appClient
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetPrimaryBus returns the bus the Application uses for events that aren't peer specific
|
|
||||||
func (ac *applicationClient) GetPrimaryBus() event.Manager {
|
|
||||||
return ac.appBus
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ac *applicationClient) handleEvent(ev *event.Event) {
|
|
||||||
switch ev.EventType {
|
|
||||||
case event.NewPeer:
|
|
||||||
localID := ev.Data[event.Identity]
|
|
||||||
key := ev.Data[event.Key]
|
|
||||||
salt := ev.Data[event.Salt]
|
|
||||||
reload := ev.Data[event.Status] == event.StorageRunning
|
|
||||||
created := ev.Data[event.Created]
|
|
||||||
ac.newPeer(localID, key, salt, reload, created)
|
|
||||||
case event.PeerDeleted:
|
|
||||||
onion := ev.Data[event.Identity]
|
|
||||||
ac.handleDeletedPeer(onion)
|
|
||||||
case event.PeerError:
|
|
||||||
ac.appBus.Publish(*ev)
|
|
||||||
case event.AppError:
|
|
||||||
ac.appBus.Publish(*ev)
|
|
||||||
case event.ACNStatus:
|
|
||||||
ac.appBus.Publish(*ev)
|
|
||||||
case event.ACNVersion:
|
|
||||||
ac.appBus.Publish(*ev)
|
|
||||||
case event.ReloadDone:
|
|
||||||
ac.appBus.Publish(*ev)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ac *applicationClient) newPeer(localID, key, salt string, reload bool, created string) {
|
|
||||||
var keyBytes [32]byte
|
|
||||||
var saltBytes [128]byte
|
|
||||||
copy(keyBytes[:], key)
|
|
||||||
copy(saltBytes[:], salt)
|
|
||||||
profile, err := storage.ReadProfile(path.Join(ac.directory, "profiles", localID), keyBytes, saltBytes)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Could not read profile for NewPeer event: %v\n", err)
|
|
||||||
ac.appBus.Publish(event.NewEventList(event.PeerError, event.Error, fmt.Sprintf("Could not read profile for NewPeer event: %v\n", err)))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
_, exists := ac.peers[profile.Onion]
|
|
||||||
if exists {
|
|
||||||
log.Errorf("profile for onion %v already exists", profile.Onion)
|
|
||||||
ac.appBus.Publish(event.NewEventList(event.PeerError, event.Error, fmt.Sprintf("profile for onion %v already exists", profile.Onion)))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
eventBus := event.NewIPCEventManager(ac.bridge, profile.Onion)
|
|
||||||
peer := peer.FromProfile(profile)
|
|
||||||
peer.Init(eventBus)
|
|
||||||
|
|
||||||
ac.peerLock.Lock()
|
|
||||||
defer ac.peerLock.Unlock()
|
|
||||||
ac.peers[profile.Onion] = peer
|
|
||||||
ac.eventBuses[profile.Onion] = eventBus
|
|
||||||
npEvent := event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion, event.Created: created})
|
|
||||||
if reload {
|
|
||||||
npEvent.Data[event.Status] = event.StorageRunning
|
|
||||||
}
|
|
||||||
ac.appBus.Publish(npEvent)
|
|
||||||
|
|
||||||
if reload {
|
|
||||||
ac.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ReloadPeer, event.Identity, profile.Onion)})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// CreatePeer messages the service to create a new Peer with the given name
|
|
||||||
func (ac *applicationClient) CreatePeer(name string, password string) {
|
|
||||||
ac.CreateTaggedPeer(name, password, "")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ac *applicationClient) CreateTaggedPeer(name, password, tag string) {
|
|
||||||
log.Infof("appClient CreatePeer %v\n", name)
|
|
||||||
message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.CreatePeer, map[event.Field]string{event.ProfileName: name, event.Password: password, event.Data: tag})}
|
|
||||||
ac.bridge.Write(&message)
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeletePeer messages the service to delete a peer
|
|
||||||
func (ac *applicationClient) DeletePeer(onion string, password string) {
|
|
||||||
message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.DeletePeer, map[event.Field]string{event.Identity: onion, event.Password: password})}
|
|
||||||
ac.bridge.Write(&message)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ac *applicationClient) ChangePeerPassword(onion, oldpass, newpass string) {
|
|
||||||
message := event.IPCMessage{Dest: onion, Message: event.NewEventList(event.ChangePassword, event.Password, oldpass, event.NewPassword, newpass)}
|
|
||||||
ac.bridge.Write(&message)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ac *applicationClient) handleDeletedPeer(onion string) {
|
|
||||||
ac.acmutex.Lock()
|
|
||||||
defer ac.acmutex.Unlock()
|
|
||||||
ac.peers[onion].Shutdown()
|
|
||||||
delete(ac.peers, onion)
|
|
||||||
ac.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion))
|
|
||||||
|
|
||||||
ac.applicationCore.DeletePeer(onion)
|
|
||||||
ac.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ac *applicationClient) AddPeerPlugin(onion string, pluginID plugins.PluginID) {
|
|
||||||
message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.AddPeerPlugin, map[event.Field]string{event.Identity: onion, event.Data: strconv.Itoa(int(pluginID))})}
|
|
||||||
ac.bridge.Write(&message)
|
|
||||||
}
|
|
||||||
|
|
||||||
// LoadProfiles messages the service to load any profiles for the given password
|
|
||||||
func (ac *applicationClient) LoadProfiles(password string) {
|
|
||||||
message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.LoadProfiles, map[event.Field]string{event.Password: password})}
|
|
||||||
ac.bridge.Write(&message)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ac *applicationClient) QueryACNStatus() {
|
|
||||||
message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.GetACNStatus, map[event.Field]string{})}
|
|
||||||
ac.bridge.Write(&message)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ac *applicationClient) QueryACNVersion() {
|
|
||||||
message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.GetACNVersion, map[event.Field]string{})}
|
|
||||||
ac.bridge.Write(&message)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ShutdownPeer shuts down a peer and removes it from the app's management
|
|
||||||
func (ac *applicationClient) ShutdownPeer(onion string) {
|
|
||||||
ac.acmutex.Lock()
|
|
||||||
defer ac.acmutex.Unlock()
|
|
||||||
ac.eventBuses[onion].Shutdown()
|
|
||||||
delete(ac.eventBuses, onion)
|
|
||||||
ac.peers[onion].Shutdown()
|
|
||||||
delete(ac.peers, onion)
|
|
||||||
message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.ShutdownPeer, map[event.Field]string{event.Identity: onion})}
|
|
||||||
ac.bridge.Write(&message)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Shutdown shuts down the application client and all front end peer components
|
|
||||||
func (ac *applicationClient) Shutdown() {
|
|
||||||
for id := range ac.peers {
|
|
||||||
ac.ShutdownPeer(id)
|
|
||||||
}
|
|
||||||
ac.applicationBridge.Shutdown()
|
|
||||||
ac.appBus.Shutdown()
|
|
||||||
}
|
|
|
@ -1,209 +0,0 @@
|
||||||
package app
|
|
||||||
|
|
||||||
import (
|
|
||||||
"cwtch.im/cwtch/app/plugins"
|
|
||||||
"cwtch.im/cwtch/event"
|
|
||||||
"cwtch.im/cwtch/model"
|
|
||||||
"cwtch.im/cwtch/protocol/connections"
|
|
||||||
"cwtch.im/cwtch/storage"
|
|
||||||
"git.openprivacy.ca/cwtch.im/tapir/primitives"
|
|
||||||
"git.openprivacy.ca/openprivacy/connectivity"
|
|
||||||
"git.openprivacy.ca/openprivacy/log"
|
|
||||||
path "path/filepath"
|
|
||||||
"strconv"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
type applicationService struct {
|
|
||||||
applicationBridge
|
|
||||||
appletACN
|
|
||||||
appletPlugins
|
|
||||||
|
|
||||||
storage map[string]storage.ProfileStore
|
|
||||||
engines map[string]connections.Engine
|
|
||||||
asmutex sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
// ApplicationService is the back end of an application that manages engines and writing storage and communicates to an ApplicationClient by an IPCBridge
|
|
||||||
type ApplicationService interface {
|
|
||||||
Shutdown()
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewAppService returns an ApplicationService that runs the backend of an app and communicates with a client by the supplied IPCBridge
|
|
||||||
func NewAppService(acn connectivity.ACN, appDirectory string, bridge event.IPCBridge) ApplicationService {
|
|
||||||
appService := &applicationService{storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}}
|
|
||||||
|
|
||||||
appService.appletACN.init(acn, appService.getACNStatusHandler())
|
|
||||||
appService.handle = appService.handleEvent
|
|
||||||
|
|
||||||
go appService.listen()
|
|
||||||
|
|
||||||
log.Infoln("Created new App Service")
|
|
||||||
return appService
|
|
||||||
}
|
|
||||||
|
|
||||||
func (as *applicationService) handleEvent(ev *event.Event) {
|
|
||||||
log.Infof("app Service handleEvent %v\n", ev.EventType)
|
|
||||||
switch ev.EventType {
|
|
||||||
case event.CreatePeer:
|
|
||||||
profileName := ev.Data[event.ProfileName]
|
|
||||||
password := ev.Data[event.Password]
|
|
||||||
tag := ev.Data[event.Data]
|
|
||||||
as.createPeer(profileName, password, tag)
|
|
||||||
case event.DeletePeer:
|
|
||||||
onion := ev.Data[event.Identity]
|
|
||||||
password := ev.Data[event.Password]
|
|
||||||
as.deletePeer(onion, password)
|
|
||||||
|
|
||||||
message := event.IPCMessage{Dest: DestApp, Message: *ev}
|
|
||||||
as.bridge.Write(&message)
|
|
||||||
case event.AddPeerPlugin:
|
|
||||||
onion := ev.Data[event.Identity]
|
|
||||||
pluginID, _ := strconv.Atoi(ev.Data[event.Data])
|
|
||||||
as.AddPlugin(onion, plugins.PluginID(pluginID), as.eventBuses[onion], as.acn)
|
|
||||||
case event.LoadProfiles:
|
|
||||||
password := ev.Data[event.Password]
|
|
||||||
as.loadProfiles(password)
|
|
||||||
case event.ReloadClient:
|
|
||||||
for _, storage := range as.storage {
|
|
||||||
peerMsg := *storage.GetNewPeerMessage()
|
|
||||||
peerMsg.Data[event.Status] = event.StorageRunning
|
|
||||||
peerMsg.Data[event.Created] = event.False
|
|
||||||
message := event.IPCMessage{Dest: DestApp, Message: peerMsg}
|
|
||||||
as.bridge.Write(&message)
|
|
||||||
}
|
|
||||||
|
|
||||||
message := event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ReloadDone)}
|
|
||||||
as.bridge.Write(&message)
|
|
||||||
case event.ReloadPeer:
|
|
||||||
onion := ev.Data[event.Identity]
|
|
||||||
events := as.storage[onion].GetStatusMessages()
|
|
||||||
|
|
||||||
for _, ev := range events {
|
|
||||||
message := event.IPCMessage{Dest: onion, Message: *ev}
|
|
||||||
as.bridge.Write(&message)
|
|
||||||
}
|
|
||||||
case event.GetACNStatus:
|
|
||||||
prog, status := as.acn.GetBootstrapStatus()
|
|
||||||
as.getACNStatusHandler()(prog, status)
|
|
||||||
case event.GetACNVersion:
|
|
||||||
version := as.acn.GetVersion()
|
|
||||||
as.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ACNVersion, event.Data, version)})
|
|
||||||
case event.ShutdownPeer:
|
|
||||||
onion := ev.Data[event.Identity]
|
|
||||||
as.ShutdownPeer(onion)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (as *applicationService) createPeer(name, password, tag string) {
|
|
||||||
log.Infof("app Service create peer %v %v\n", name, password)
|
|
||||||
profile, err := as.applicationCore.CreatePeer(name)
|
|
||||||
as.eventBuses[profile.Onion] = event.IPCEventManagerFrom(as.bridge, profile.Onion, as.eventBuses[profile.Onion])
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Could not create Peer: %v\n", err)
|
|
||||||
message := event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.PeerError, event.Error, err.Error())}
|
|
||||||
as.bridge.Write(&message)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
profileStore := storage.CreateProfileWriterStore(as.eventBuses[profile.Onion], path.Join(as.directory, "profiles", profile.LocalID), password, profile)
|
|
||||||
|
|
||||||
peerAuthorizations := profile.ContactsAuthorizations()
|
|
||||||
// TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key.
|
|
||||||
identity := primitives.InitializeIdentity(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey)
|
|
||||||
engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, as.acn, as.eventBuses[profile.Onion], peerAuthorizations)
|
|
||||||
|
|
||||||
as.storage[profile.Onion] = profileStore
|
|
||||||
as.engines[profile.Onion] = engine
|
|
||||||
|
|
||||||
peerMsg := *profileStore.GetNewPeerMessage()
|
|
||||||
peerMsg.Data[event.Created] = event.True
|
|
||||||
peerMsg.Data[event.Status] = event.StorageNew
|
|
||||||
message := event.IPCMessage{Dest: DestApp, Message: peerMsg}
|
|
||||||
as.bridge.Write(&message)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (as *applicationService) loadProfiles(password string) {
|
|
||||||
count := 0
|
|
||||||
as.applicationCore.LoadProfiles(password, false, func(profile *model.Profile, profileStore storage.ProfileStore) {
|
|
||||||
as.eventBuses[profile.Onion] = event.IPCEventManagerFrom(as.bridge, profile.Onion, as.eventBuses[profile.Onion])
|
|
||||||
|
|
||||||
peerAuthorizations := profile.ContactsAuthorizations()
|
|
||||||
identity := primitives.InitializeIdentity(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey)
|
|
||||||
engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, as.acn, as.eventBuses[profile.Onion], peerAuthorizations)
|
|
||||||
as.asmutex.Lock()
|
|
||||||
as.storage[profile.Onion] = profileStore
|
|
||||||
as.engines[profile.Onion] = engine
|
|
||||||
as.asmutex.Unlock()
|
|
||||||
|
|
||||||
peerMsg := *profileStore.GetNewPeerMessage()
|
|
||||||
peerMsg.Data[event.Created] = event.False
|
|
||||||
peerMsg.Data[event.Status] = event.StorageNew
|
|
||||||
message := event.IPCMessage{Dest: DestApp, Message: peerMsg}
|
|
||||||
as.bridge.Write(&message)
|
|
||||||
count++
|
|
||||||
})
|
|
||||||
if count == 0 {
|
|
||||||
message := event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0)}
|
|
||||||
as.bridge.Write(&message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (as *applicationService) getACNStatusHandler() func(int, string) {
|
|
||||||
return func(progress int, status string) {
|
|
||||||
progStr := strconv.Itoa(progress)
|
|
||||||
as.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status)})
|
|
||||||
as.applicationCore.coremutex.Lock()
|
|
||||||
defer as.applicationCore.coremutex.Unlock()
|
|
||||||
for _, bus := range as.eventBuses {
|
|
||||||
bus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (as *applicationService) deletePeer(onion, password string) {
|
|
||||||
as.asmutex.Lock()
|
|
||||||
defer as.asmutex.Unlock()
|
|
||||||
|
|
||||||
if as.storage[onion].CheckPassword(password) {
|
|
||||||
as.appletPlugins.ShutdownPeer(onion)
|
|
||||||
as.plugins.Delete(onion)
|
|
||||||
|
|
||||||
as.engines[onion].Shutdown()
|
|
||||||
delete(as.engines, onion)
|
|
||||||
|
|
||||||
as.storage[onion].Shutdown()
|
|
||||||
as.storage[onion].Delete()
|
|
||||||
delete(as.storage, onion)
|
|
||||||
|
|
||||||
as.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion))
|
|
||||||
|
|
||||||
as.applicationCore.DeletePeer(onion)
|
|
||||||
log.Debugf("Delete peer for %v Done\n", onion)
|
|
||||||
|
|
||||||
message := event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.PeerDeleted, event.Identity, onion)}
|
|
||||||
as.bridge.Write(&message)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
message := event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.AppError, event.Error, event.PasswordMatchError, event.Identity, onion)}
|
|
||||||
as.bridge.Write(&message)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (as *applicationService) ShutdownPeer(onion string) {
|
|
||||||
as.engines[onion].Shutdown()
|
|
||||||
delete(as.engines, onion)
|
|
||||||
as.storage[onion].Shutdown()
|
|
||||||
delete(as.storage, onion)
|
|
||||||
as.eventBuses[onion].Shutdown()
|
|
||||||
delete(as.eventBuses, onion)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Shutdown shuts down the application Service and all peer related backend parts
|
|
||||||
func (as *applicationService) Shutdown() {
|
|
||||||
log.Debugf("shutting down application service...")
|
|
||||||
as.appletPlugins.Shutdown()
|
|
||||||
for id := range as.engines {
|
|
||||||
log.Debugf("shutting down application service peer engine %v", id)
|
|
||||||
as.ShutdownPeer(id)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,57 +0,0 @@
|
||||||
package bridge
|
|
||||||
|
|
||||||
import (
|
|
||||||
"cwtch.im/cwtch/event"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
type goChanBridge struct {
|
|
||||||
in chan event.IPCMessage
|
|
||||||
out chan event.IPCMessage
|
|
||||||
closedChan chan bool
|
|
||||||
closed bool
|
|
||||||
lock sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
// MakeGoChanBridge returns a simple testing IPCBridge made from inprocess go channels
|
|
||||||
func MakeGoChanBridge() (b1, b2 event.IPCBridge) {
|
|
||||||
chan1 := make(chan event.IPCMessage)
|
|
||||||
chan2 := make(chan event.IPCMessage)
|
|
||||||
closed := make(chan bool)
|
|
||||||
|
|
||||||
a := &goChanBridge{in: chan1, out: chan2, closedChan: closed, closed: false}
|
|
||||||
b := &goChanBridge{in: chan2, out: chan1, closedChan: closed, closed: false}
|
|
||||||
|
|
||||||
go monitor(a, b)
|
|
||||||
|
|
||||||
return a, b
|
|
||||||
}
|
|
||||||
|
|
||||||
func monitor(a, b *goChanBridge) {
|
|
||||||
<-a.closedChan
|
|
||||||
a.closed = true
|
|
||||||
b.closed = true
|
|
||||||
a.closedChan <- true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pb *goChanBridge) Read() (*event.IPCMessage, bool) {
|
|
||||||
message, ok := <-pb.in
|
|
||||||
return &message, ok
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pb *goChanBridge) Write(message *event.IPCMessage) {
|
|
||||||
pb.lock.Lock()
|
|
||||||
defer pb.lock.Unlock()
|
|
||||||
if !pb.closed {
|
|
||||||
pb.out <- *message
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pb *goChanBridge) Shutdown() {
|
|
||||||
if !pb.closed {
|
|
||||||
close(pb.in)
|
|
||||||
close(pb.out)
|
|
||||||
pb.closedChan <- true
|
|
||||||
<-pb.closedChan
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,72 +0,0 @@
|
||||||
package bridge
|
|
||||||
|
|
||||||
/* Todo: When go generics ships, refactor this and event.infiniteChannel into one */
|
|
||||||
|
|
||||||
// InfiniteChannel implements the Channel interface with an infinite buffer between the input and the output.
|
|
||||||
type InfiniteChannel struct {
|
|
||||||
input, output chan interface{}
|
|
||||||
length chan int
|
|
||||||
buffer *InfiniteQueue
|
|
||||||
}
|
|
||||||
|
|
||||||
func newInfiniteChannel() *InfiniteChannel {
|
|
||||||
ch := &InfiniteChannel{
|
|
||||||
input: make(chan interface{}),
|
|
||||||
output: make(chan interface{}),
|
|
||||||
length: make(chan int),
|
|
||||||
buffer: newInfiniteQueue(),
|
|
||||||
}
|
|
||||||
go ch.infiniteBuffer()
|
|
||||||
return ch
|
|
||||||
}
|
|
||||||
|
|
||||||
// In returns the input channel
|
|
||||||
func (ch *InfiniteChannel) In() chan<- interface{} {
|
|
||||||
return ch.input
|
|
||||||
}
|
|
||||||
|
|
||||||
// Out returns the output channel
|
|
||||||
func (ch *InfiniteChannel) Out() <-chan interface{} {
|
|
||||||
return ch.output
|
|
||||||
}
|
|
||||||
|
|
||||||
// Len returns the length of items in queue
|
|
||||||
func (ch *InfiniteChannel) Len() int {
|
|
||||||
return <-ch.length
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close closes the InfiniteChanel
|
|
||||||
func (ch *InfiniteChannel) Close() {
|
|
||||||
close(ch.input)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ch *InfiniteChannel) infiniteBuffer() {
|
|
||||||
var input, output chan interface{}
|
|
||||||
var next interface{}
|
|
||||||
input = ch.input
|
|
||||||
|
|
||||||
for input != nil || output != nil {
|
|
||||||
select {
|
|
||||||
case elem, open := <-input:
|
|
||||||
if open {
|
|
||||||
ch.buffer.Add(elem)
|
|
||||||
} else {
|
|
||||||
input = nil
|
|
||||||
}
|
|
||||||
case output <- next:
|
|
||||||
ch.buffer.Remove()
|
|
||||||
case ch.length <- ch.buffer.Length():
|
|
||||||
}
|
|
||||||
|
|
||||||
if ch.buffer.Length() > 0 {
|
|
||||||
output = ch.output
|
|
||||||
next = ch.buffer.Peek()
|
|
||||||
} else {
|
|
||||||
output = nil
|
|
||||||
next = nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
close(ch.output)
|
|
||||||
close(ch.length)
|
|
||||||
}
|
|
|
@ -1,105 +0,0 @@
|
||||||
package bridge
|
|
||||||
|
|
||||||
/* Todo: When go generics ships, refactor this and event.infinitQueue channel into one */
|
|
||||||
|
|
||||||
/*
|
|
||||||
Package queue provides a fast, ring-buffer queue based on the version suggested by Dariusz Górecki.
|
|
||||||
Using this instead of other, simpler, queue implementations (slice+append or linked list) provides
|
|
||||||
substantial memory and time benefits, and fewer GC pauses.
|
|
||||||
|
|
||||||
The queue implemented here is as fast as it is for an additional reason: it is *not* thread-safe.
|
|
||||||
*/
|
|
||||||
|
|
||||||
// minQueueLen is smallest capacity that queue may have.
|
|
||||||
// Must be power of 2 for bitwise modulus: x % n == x & (n - 1).
|
|
||||||
const minQueueLen = 16
|
|
||||||
|
|
||||||
// InfiniteQueue represents a single instance of the queue data structure.
|
|
||||||
type InfiniteQueue struct {
|
|
||||||
buf []interface{}
|
|
||||||
head, tail, count int
|
|
||||||
}
|
|
||||||
|
|
||||||
// New constructs and returns a new Queue.
|
|
||||||
func newInfiniteQueue() *InfiniteQueue {
|
|
||||||
return &InfiniteQueue{
|
|
||||||
buf: make([]interface{}, minQueueLen),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Length returns the number of elements currently stored in the queue.
|
|
||||||
func (q *InfiniteQueue) Length() int {
|
|
||||||
return q.count
|
|
||||||
}
|
|
||||||
|
|
||||||
// resizes the queue to fit exactly twice its current contents
|
|
||||||
// this can result in shrinking if the queue is less than half-full
|
|
||||||
func (q *InfiniteQueue) resize() {
|
|
||||||
newBuf := make([]interface{}, q.count<<1)
|
|
||||||
|
|
||||||
if q.tail > q.head {
|
|
||||||
copy(newBuf, q.buf[q.head:q.tail])
|
|
||||||
} else {
|
|
||||||
n := copy(newBuf, q.buf[q.head:])
|
|
||||||
copy(newBuf[n:], q.buf[:q.tail])
|
|
||||||
}
|
|
||||||
|
|
||||||
q.head = 0
|
|
||||||
q.tail = q.count
|
|
||||||
q.buf = newBuf
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add puts an element on the end of the queue.
|
|
||||||
func (q *InfiniteQueue) Add(elem interface{}) {
|
|
||||||
if q.count == len(q.buf) {
|
|
||||||
q.resize()
|
|
||||||
}
|
|
||||||
|
|
||||||
q.buf[q.tail] = elem
|
|
||||||
// bitwise modulus
|
|
||||||
q.tail = (q.tail + 1) & (len(q.buf) - 1)
|
|
||||||
q.count++
|
|
||||||
}
|
|
||||||
|
|
||||||
// Peek returns the element at the head of the queue. This call panics
|
|
||||||
// if the queue is empty.
|
|
||||||
func (q *InfiniteQueue) Peek() interface{} {
|
|
||||||
if q.count <= 0 {
|
|
||||||
panic("queue: Peek() called on empty queue")
|
|
||||||
}
|
|
||||||
return q.buf[q.head]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get returns the element at index i in the queue. If the index is
|
|
||||||
// invalid, the call will panic. This method accepts both positive and
|
|
||||||
// negative index values. Index 0 refers to the first element, and
|
|
||||||
// index -1 refers to the last.
|
|
||||||
func (q *InfiniteQueue) Get(i int) interface{} {
|
|
||||||
// If indexing backwards, convert to positive index.
|
|
||||||
if i < 0 {
|
|
||||||
i += q.count
|
|
||||||
}
|
|
||||||
if i < 0 || i >= q.count {
|
|
||||||
panic("queue: Get() called with index out of range")
|
|
||||||
}
|
|
||||||
// bitwise modulus
|
|
||||||
return q.buf[(q.head+i)&(len(q.buf)-1)]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove removes and returns the element from the front of the queue. If the
|
|
||||||
// queue is empty, the call will panic.
|
|
||||||
func (q *InfiniteQueue) Remove() interface{} {
|
|
||||||
if q.count <= 0 {
|
|
||||||
panic("queue: Remove() called on empty queue")
|
|
||||||
}
|
|
||||||
ret := q.buf[q.head]
|
|
||||||
q.buf[q.head] = nil
|
|
||||||
// bitwise modulus
|
|
||||||
q.head = (q.head + 1) & (len(q.buf) - 1)
|
|
||||||
q.count--
|
|
||||||
// Resize down if buffer 1/4 full.
|
|
||||||
if len(q.buf) > minQueueLen && (q.count<<2) == len(q.buf) {
|
|
||||||
q.resize()
|
|
||||||
}
|
|
||||||
return ret
|
|
||||||
}
|
|
|
@ -1,19 +0,0 @@
|
||||||
// +build windows
|
|
||||||
|
|
||||||
package bridge
|
|
||||||
|
|
||||||
import (
|
|
||||||
"cwtch.im/cwtch/event"
|
|
||||||
"log"
|
|
||||||
)
|
|
||||||
|
|
||||||
func NewPipeBridgeClient(inFilename, outFilename string) event.IPCBridge {
|
|
||||||
log.Fatal("Not supported on windows")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewPipeBridgeService returns a pipe backed IPCBridge for a service
|
|
||||||
func NewPipeBridgeService(inFilename, outFilename string) event.IPCBridge {
|
|
||||||
log.Fatal("Not supported on windows")
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,357 +0,0 @@
|
||||||
// +build !windows
|
|
||||||
|
|
||||||
package bridge
|
|
||||||
|
|
||||||
import (
|
|
||||||
"cwtch.im/cwtch/event"
|
|
||||||
"cwtch.im/cwtch/protocol/connections"
|
|
||||||
"encoding/base64"
|
|
||||||
"encoding/binary"
|
|
||||||
"encoding/json"
|
|
||||||
"git.openprivacy.ca/openprivacy/log"
|
|
||||||
"os"
|
|
||||||
"sync"
|
|
||||||
"syscall"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
/* pipeBridge creates a pair of named pipes
|
|
||||||
Needs a call to new client and service to fully successfully open
|
|
||||||
*/
|
|
||||||
|
|
||||||
const maxBufferSize = 1000
|
|
||||||
|
|
||||||
const serviceName = "service"
|
|
||||||
const clientName = "client"
|
|
||||||
|
|
||||||
const syn = "SYN"
|
|
||||||
const synack = "SYNACK"
|
|
||||||
const ack = "ACK"
|
|
||||||
|
|
||||||
type pipeBridge struct {
|
|
||||||
infile, outfile string
|
|
||||||
in, out *os.File
|
|
||||||
read chan event.IPCMessage
|
|
||||||
write *InfiniteChannel
|
|
||||||
closedChan chan bool
|
|
||||||
state connections.ConnectionState
|
|
||||||
lock sync.Mutex
|
|
||||||
threeShake func() bool
|
|
||||||
|
|
||||||
// For logging / debugging purposes
|
|
||||||
name string
|
|
||||||
}
|
|
||||||
|
|
||||||
func newPipeBridge(inFilename, outFilename string) *pipeBridge {
|
|
||||||
syscall.Mkfifo(inFilename, 0600)
|
|
||||||
syscall.Mkfifo(outFilename, 0600)
|
|
||||||
pb := &pipeBridge{infile: inFilename, outfile: outFilename, state: connections.DISCONNECTED}
|
|
||||||
pb.read = make(chan event.IPCMessage, maxBufferSize)
|
|
||||||
pb.write = newInfiniteChannel() //make(chan event.IPCMessage, maxBufferSize)
|
|
||||||
return pb
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewPipeBridgeClient returns a pipe backed IPCBridge for a client
|
|
||||||
func NewPipeBridgeClient(inFilename, outFilename string) event.IPCBridge {
|
|
||||||
log.Debugf("Making new PipeBridge Client...\n")
|
|
||||||
pb := newPipeBridge(inFilename, outFilename)
|
|
||||||
pb.name = clientName
|
|
||||||
pb.threeShake = pb.threeShakeClient
|
|
||||||
go pb.connectionManager()
|
|
||||||
|
|
||||||
return pb
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewPipeBridgeService returns a pipe backed IPCBridge for a service
|
|
||||||
func NewPipeBridgeService(inFilename, outFilename string) event.IPCBridge {
|
|
||||||
log.Debugf("Making new PipeBridge Service...\n")
|
|
||||||
pb := newPipeBridge(inFilename, outFilename)
|
|
||||||
pb.name = serviceName
|
|
||||||
pb.threeShake = pb.threeShakeService
|
|
||||||
|
|
||||||
go pb.connectionManager()
|
|
||||||
|
|
||||||
log.Debugf("Successfully created new PipeBridge Service!\n")
|
|
||||||
return pb
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pb *pipeBridge) setState(state connections.ConnectionState) {
|
|
||||||
pb.lock.Lock()
|
|
||||||
defer pb.lock.Unlock()
|
|
||||||
|
|
||||||
pb.state = state
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pb *pipeBridge) getState() connections.ConnectionState {
|
|
||||||
pb.lock.Lock()
|
|
||||||
defer pb.lock.Unlock()
|
|
||||||
|
|
||||||
return pb.state
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pb *pipeBridge) connectionManager() {
|
|
||||||
for pb.getState() != connections.KILLED {
|
|
||||||
log.Debugf("clientConnManager loop start init\n")
|
|
||||||
pb.setState(connections.CONNECTING)
|
|
||||||
|
|
||||||
var err error
|
|
||||||
log.Debugf("%v open file infile\n", pb.name)
|
|
||||||
pb.in, err = os.OpenFile(pb.infile, os.O_RDWR, 0600)
|
|
||||||
if err != nil {
|
|
||||||
pb.setState(connections.DISCONNECTED)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Debugf("%v open file outfile\n", pb.name)
|
|
||||||
pb.out, err = os.OpenFile(pb.outfile, os.O_RDWR, 0600)
|
|
||||||
if err != nil {
|
|
||||||
pb.setState(connections.DISCONNECTED)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Debugf("Successfully connected PipeBridge %v!\n", pb.name)
|
|
||||||
|
|
||||||
pb.handleConns()
|
|
||||||
}
|
|
||||||
log.Debugf("exiting %v ConnectionManager\n", pb.name)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// threeShake performs a 3way handshake sync up
|
|
||||||
func (pb *pipeBridge) threeShakeService() bool {
|
|
||||||
synacked := false
|
|
||||||
|
|
||||||
for {
|
|
||||||
resp, err := pb.readString()
|
|
||||||
if err != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if string(resp) == syn {
|
|
||||||
if !synacked {
|
|
||||||
err = pb.writeString([]byte(synack))
|
|
||||||
if err != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
synacked = true
|
|
||||||
}
|
|
||||||
} else if string(resp) == ack {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pb *pipeBridge) synLoop(stop chan bool) {
|
|
||||||
delay := time.Duration(0)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-time.After(delay):
|
|
||||||
err := pb.writeString([]byte(syn))
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
delay = time.Second
|
|
||||||
case <-stop:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pb *pipeBridge) threeShakeClient() bool {
|
|
||||||
stop := make(chan bool)
|
|
||||||
go pb.synLoop(stop)
|
|
||||||
for {
|
|
||||||
resp, err := pb.readString()
|
|
||||||
if err != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if string(resp) == synack {
|
|
||||||
stop <- true
|
|
||||||
err := pb.writeString([]byte(ack))
|
|
||||||
return err == nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pb *pipeBridge) handleConns() {
|
|
||||||
|
|
||||||
if !pb.threeShake() {
|
|
||||||
pb.setState(connections.FAILED)
|
|
||||||
pb.closeReset()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
pb.setState(connections.AUTHENTICATED)
|
|
||||||
|
|
||||||
pb.closedChan = make(chan bool, 5)
|
|
||||||
|
|
||||||
log.Debugf("handleConns authed, %v 2xgo\n", pb.name)
|
|
||||||
|
|
||||||
go pb.handleRead()
|
|
||||||
go pb.handleWrite()
|
|
||||||
|
|
||||||
<-pb.closedChan
|
|
||||||
log.Debugf("handleConns <-closedChan (%v)\n", pb.name)
|
|
||||||
if pb.getState() != connections.KILLED {
|
|
||||||
pb.setState(connections.FAILED)
|
|
||||||
}
|
|
||||||
pb.closeReset()
|
|
||||||
log.Debugf("handleConns done for %v, exit\n", pb.name)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pb *pipeBridge) closeReset() {
|
|
||||||
pb.in.Close()
|
|
||||||
pb.out.Close()
|
|
||||||
close(pb.read)
|
|
||||||
pb.write.Close()
|
|
||||||
|
|
||||||
if pb.getState() != connections.KILLED {
|
|
||||||
pb.read = make(chan event.IPCMessage, maxBufferSize)
|
|
||||||
pb.write = newInfiniteChannel()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pb *pipeBridge) handleWrite() {
|
|
||||||
log.Debugf("handleWrite() %v\n", pb.name)
|
|
||||||
defer log.Debugf("exiting handleWrite() %v\n", pb.name)
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case messageInf := <-pb.write.output:
|
|
||||||
if messageInf == nil {
|
|
||||||
pb.closedChan <- true
|
|
||||||
return
|
|
||||||
}
|
|
||||||
message := messageInf.(event.IPCMessage)
|
|
||||||
if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
|
|
||||||
log.Debugf("handleWrite <- message: %v %v ...\n", message.Dest, message.Message.EventType)
|
|
||||||
} else {
|
|
||||||
log.Debugf("handleWrite <- message: %v\n", message)
|
|
||||||
}
|
|
||||||
if pb.getState() == connections.AUTHENTICATED {
|
|
||||||
encMessage := &event.IPCMessage{Dest: message.Dest, Message: event.Event{EventType: message.Message.EventType, EventID: message.Message.EventID, Data: make(map[event.Field]string)}}
|
|
||||||
for k, v := range message.Message.Data {
|
|
||||||
encMessage.Message.Data[k] = base64.StdEncoding.EncodeToString([]byte(v))
|
|
||||||
}
|
|
||||||
|
|
||||||
messageJSON, _ := json.Marshal(encMessage)
|
|
||||||
err := pb.writeString(messageJSON)
|
|
||||||
if err != nil {
|
|
||||||
pb.closedChan <- true
|
|
||||||
return
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pb *pipeBridge) handleRead() {
|
|
||||||
log.Debugf("handleRead() %v\n", pb.name)
|
|
||||||
defer log.Debugf("exiting handleRead() %v", pb.name)
|
|
||||||
|
|
||||||
for {
|
|
||||||
log.Debugf("Waiting to handleRead()...\n")
|
|
||||||
|
|
||||||
buffer, err := pb.readString()
|
|
||||||
if err != nil {
|
|
||||||
pb.closedChan <- true
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var message event.IPCMessage
|
|
||||||
err = json.Unmarshal(buffer, &message)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Read error: '%v', value: '%v'", err, buffer)
|
|
||||||
pb.closedChan <- true
|
|
||||||
return // probably new connection trying to initialize
|
|
||||||
}
|
|
||||||
for k, v := range message.Message.Data {
|
|
||||||
val, _ := base64.StdEncoding.DecodeString(v)
|
|
||||||
message.Message.Data[k] = string(val)
|
|
||||||
}
|
|
||||||
if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
|
|
||||||
log.Debugf("handleRead read<-: %v %v ...\n", message.Dest, message.Message.EventType)
|
|
||||||
} else {
|
|
||||||
log.Debugf("handleRead read<-: %v\n", message)
|
|
||||||
}
|
|
||||||
pb.read <- message
|
|
||||||
log.Debugf("handleRead wrote\n")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pb *pipeBridge) Read() (*event.IPCMessage, bool) {
|
|
||||||
log.Debugf("Read() %v...\n", pb.name)
|
|
||||||
var ok = false
|
|
||||||
var message event.IPCMessage
|
|
||||||
for !ok && pb.getState() != connections.KILLED {
|
|
||||||
message, ok = <-pb.read
|
|
||||||
if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
|
|
||||||
log.Debugf("Read %v: %v %v ...\n", pb.name, message.Dest, message.Message.EventType)
|
|
||||||
} else {
|
|
||||||
log.Debugf("Read %v: %v\n", pb.name, message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return &message, pb.getState() != connections.KILLED
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pb *pipeBridge) Write(message *event.IPCMessage) {
|
|
||||||
if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
|
|
||||||
log.Debugf("Write %v: %v %v ...\n", pb.name, message.Dest, message.Message.EventType)
|
|
||||||
} else {
|
|
||||||
log.Debugf("Write %v: %v\n", pb.name, message)
|
|
||||||
}
|
|
||||||
pb.write.input <- *message
|
|
||||||
log.Debugf("Wrote\n")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pb *pipeBridge) Shutdown() {
|
|
||||||
log.Debugf("pb.Shutdown() for %v currently in state: %v\n", pb.name, connections.ConnectionStateName[pb.getState()])
|
|
||||||
pb.state = connections.KILLED
|
|
||||||
pb.closedChan <- true
|
|
||||||
log.Debugf("Done Shutdown for %v\n", pb.name)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pb *pipeBridge) writeString(message []byte) error {
|
|
||||||
size := make([]byte, 2)
|
|
||||||
binary.LittleEndian.PutUint16(size, uint16(len(message)))
|
|
||||||
pb.out.Write(size)
|
|
||||||
|
|
||||||
for pos := 0; pos < len(message); {
|
|
||||||
n, err := pb.out.Write(message[pos:])
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Writing out on pipeBridge: %v\n", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
pos += n
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pb *pipeBridge) readString() ([]byte, error) {
|
|
||||||
var n int
|
|
||||||
size := make([]byte, 2)
|
|
||||||
var err error
|
|
||||||
|
|
||||||
n, err = pb.in.Read(size)
|
|
||||||
if err != nil || n != 2 {
|
|
||||||
log.Errorf("Could not read len int from stream: %v\n", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
n = int(binary.LittleEndian.Uint16(size))
|
|
||||||
pos := 0
|
|
||||||
buffer := make([]byte, n)
|
|
||||||
for n > 0 {
|
|
||||||
m, err := pb.in.Read(buffer[pos:])
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Reading into buffer from pipe: %v\n", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
n -= m
|
|
||||||
pos += m
|
|
||||||
}
|
|
||||||
return buffer, nil
|
|
||||||
}
|
|
|
@ -1,131 +0,0 @@
|
||||||
package bridge
|
|
||||||
|
|
||||||
import (
|
|
||||||
"cwtch.im/cwtch/event"
|
|
||||||
"git.openprivacy.ca/openprivacy/log"
|
|
||||||
"os"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
clientPipe = "./client"
|
|
||||||
servicePipe = "./service"
|
|
||||||
)
|
|
||||||
|
|
||||||
func clientHelper(t *testing.T, in, out string, messageOrig *event.IPCMessage, done chan bool) {
|
|
||||||
client := NewPipeBridgeClient(in, out)
|
|
||||||
|
|
||||||
messageAfter, ok := client.Read()
|
|
||||||
if !ok {
|
|
||||||
t.Errorf("Reading from client IPCBridge failed")
|
|
||||||
done <- true
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if messageOrig.Dest != messageAfter.Dest {
|
|
||||||
t.Errorf("Dest's value differs expected: %v actaul: %v", messageOrig.Dest, messageAfter.Dest)
|
|
||||||
}
|
|
||||||
|
|
||||||
if messageOrig.Message.EventType != messageAfter.Message.EventType {
|
|
||||||
t.Errorf("EventTypes's value differs expected: %v actaul: %v", messageOrig.Message.EventType, messageAfter.Message.EventType)
|
|
||||||
}
|
|
||||||
|
|
||||||
if messageOrig.Message.Data[event.Identity] != messageAfter.Message.Data[event.Identity] {
|
|
||||||
t.Errorf("Data[Identity]'s value differs expected: %v actaul: %v", messageOrig.Message.Data[event.Identity], messageAfter.Message.Data[event.Identity])
|
|
||||||
}
|
|
||||||
|
|
||||||
done <- true
|
|
||||||
}
|
|
||||||
|
|
||||||
func serviceHelper(t *testing.T, in, out string, messageOrig *event.IPCMessage, done chan bool) {
|
|
||||||
service := NewPipeBridgeService(in, out)
|
|
||||||
|
|
||||||
service.Write(messageOrig)
|
|
||||||
|
|
||||||
done <- true
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPipeBridge(t *testing.T) {
|
|
||||||
os.Remove(servicePipe)
|
|
||||||
os.Remove(clientPipe)
|
|
||||||
|
|
||||||
messageOrig := &event.IPCMessage{Dest: "ABC", Message: event.NewEventList(event.NewPeer, event.Identity, "It is I")}
|
|
||||||
serviceDone := make(chan bool)
|
|
||||||
clientDone := make(chan bool)
|
|
||||||
|
|
||||||
go clientHelper(t, clientPipe, servicePipe, messageOrig, clientDone)
|
|
||||||
go serviceHelper(t, servicePipe, clientPipe, messageOrig, serviceDone)
|
|
||||||
|
|
||||||
<-serviceDone
|
|
||||||
<-clientDone
|
|
||||||
}
|
|
||||||
|
|
||||||
func restartingClient(t *testing.T, in, out string, done chan bool) {
|
|
||||||
client := NewPipeBridgeClient(in, out)
|
|
||||||
|
|
||||||
message1 := &event.IPCMessage{Dest: "ABC", Message: event.NewEventList(event.NewPeer)}
|
|
||||||
log.Infoln("client writing message 1")
|
|
||||||
client.Write(message1)
|
|
||||||
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
log.Infoln("client shutdown")
|
|
||||||
client.Shutdown()
|
|
||||||
|
|
||||||
log.Infoln("client new client")
|
|
||||||
client = NewPipeBridgeClient(in, out)
|
|
||||||
message2 := &event.IPCMessage{Dest: "ABC", Message: event.NewEventList(event.DeleteContact)}
|
|
||||||
log.Infoln("client2 write message2")
|
|
||||||
client.Write(message2)
|
|
||||||
|
|
||||||
done <- true
|
|
||||||
}
|
|
||||||
|
|
||||||
func stableService(t *testing.T, in, out string, done chan bool) {
|
|
||||||
service := NewPipeBridgeService(in, out)
|
|
||||||
|
|
||||||
log.Infoln("service wait read 1")
|
|
||||||
message1, ok := service.Read()
|
|
||||||
log.Infof("service read 1 %v ok:%v\n", message1, ok)
|
|
||||||
if !ok {
|
|
||||||
t.Errorf("Reading from client IPCBridge 1st time failed")
|
|
||||||
done <- true
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if message1.Message.EventType != event.NewPeer {
|
|
||||||
t.Errorf("Wrong message received, expected NewPeer\n")
|
|
||||||
done <- true
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Infoln("service wait read 2")
|
|
||||||
message2, ok := service.Read()
|
|
||||||
log.Infof("service read 2 got %v ok:%v\n", message2, ok)
|
|
||||||
if !ok {
|
|
||||||
t.Errorf("Reading from client IPCBridge 2nd time failed")
|
|
||||||
done <- true
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if message2.Message.EventType != event.DeleteContact {
|
|
||||||
t.Errorf("Wrong message received, expected DeleteContact, got %v\n", message2)
|
|
||||||
done <- true
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
done <- true
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestReconnect(t *testing.T) {
|
|
||||||
log.Infoln("TestReconnect")
|
|
||||||
os.Remove(servicePipe)
|
|
||||||
os.Remove(clientPipe)
|
|
||||||
|
|
||||||
serviceDone := make(chan bool)
|
|
||||||
clientDone := make(chan bool)
|
|
||||||
|
|
||||||
go restartingClient(t, clientPipe, servicePipe, clientDone)
|
|
||||||
go stableService(t, servicePipe, clientPipe, serviceDone)
|
|
||||||
|
|
||||||
<-serviceDone
|
|
||||||
<-clientDone
|
|
||||||
}
|
|
|
@ -62,11 +62,9 @@ type manager struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Manager is an interface for an event bus
|
// Manager is an interface for an event bus
|
||||||
// FIXME this interface lends itself to race conditions around channels
|
|
||||||
type Manager interface {
|
type Manager interface {
|
||||||
Subscribe(Type, Queue)
|
Subscribe(Type, Queue)
|
||||||
Publish(Event)
|
Publish(Event)
|
||||||
PublishLocal(Event)
|
|
||||||
Shutdown()
|
Shutdown()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,11 +121,6 @@ func (em *manager) Publish(event Event) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Publish an event only locally, not going over an IPC bridge if there is one
|
|
||||||
func (em *manager) PublishLocal(event Event) {
|
|
||||||
em.Publish(event)
|
|
||||||
}
|
|
||||||
|
|
||||||
// eventBus is an internal function that is used to distribute events to all subscribers
|
// eventBus is an internal function that is used to distribute events to all subscribers
|
||||||
func (em *manager) eventBus() {
|
func (em *manager) eventBus() {
|
||||||
for {
|
for {
|
||||||
|
|
|
@ -1,38 +0,0 @@
|
||||||
package event
|
|
||||||
|
|
||||||
type ipcManager struct {
|
|
||||||
manager Manager
|
|
||||||
|
|
||||||
onion string
|
|
||||||
ipcBridge IPCBridge
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewIPCEventManager returns an EvenetManager that also pipes events over and supplied IPCBridge
|
|
||||||
func NewIPCEventManager(bridge IPCBridge, onion string) Manager {
|
|
||||||
em := &ipcManager{onion: onion, ipcBridge: bridge, manager: NewEventManager()}
|
|
||||||
return em
|
|
||||||
}
|
|
||||||
|
|
||||||
// IPCEventManagerFrom returns an IPCEventManger from the supplied manager and IPCBridge
|
|
||||||
func IPCEventManagerFrom(bridge IPCBridge, onion string, manager Manager) Manager {
|
|
||||||
em := &ipcManager{onion: onion, ipcBridge: bridge, manager: manager}
|
|
||||||
return em
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ipcm *ipcManager) Publish(ev Event) {
|
|
||||||
ipcm.manager.Publish(ev)
|
|
||||||
message := &IPCMessage{Dest: ipcm.onion, Message: ev}
|
|
||||||
ipcm.ipcBridge.Write(message)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ipcm *ipcManager) PublishLocal(ev Event) {
|
|
||||||
ipcm.manager.Publish(ev)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ipcm *ipcManager) Subscribe(eventType Type, queue Queue) {
|
|
||||||
ipcm.manager.Subscribe(eventType, queue)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ipcm *ipcManager) Shutdown() {
|
|
||||||
ipcm.manager.Shutdown()
|
|
||||||
}
|
|
14
event/ipc.go
14
event/ipc.go
|
@ -1,14 +0,0 @@
|
||||||
package event
|
|
||||||
|
|
||||||
// IPCMessage is a wrapper for a regular eventMessage with a destination (onion|AppDest) so the other side of the bridge can route appropriately
|
|
||||||
type IPCMessage struct {
|
|
||||||
Dest string
|
|
||||||
Message Event
|
|
||||||
}
|
|
||||||
|
|
||||||
// IPCBridge is an interface to a IPC construct used to communicate IPCMessages
|
|
||||||
type IPCBridge interface {
|
|
||||||
Read() (*IPCMessage, bool)
|
|
||||||
Write(message *IPCMessage)
|
|
||||||
Shutdown()
|
|
||||||
}
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
app2 "cwtch.im/cwtch/app"
|
app2 "cwtch.im/cwtch/app"
|
||||||
"cwtch.im/cwtch/app/utils"
|
"cwtch.im/cwtch/app/utils"
|
||||||
"cwtch.im/cwtch/event"
|
"cwtch.im/cwtch/event"
|
||||||
"cwtch.im/cwtch/event/bridge"
|
|
||||||
"cwtch.im/cwtch/model"
|
"cwtch.im/cwtch/model"
|
||||||
"cwtch.im/cwtch/model/attr"
|
"cwtch.im/cwtch/model/attr"
|
||||||
"cwtch.im/cwtch/model/constants"
|
"cwtch.im/cwtch/model/constants"
|
||||||
|
@ -96,7 +95,6 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
||||||
log.ExcludeFromPattern("connection/connection")
|
log.ExcludeFromPattern("connection/connection")
|
||||||
log.ExcludeFromPattern("outbound/3dhauthchannel")
|
log.ExcludeFromPattern("outbound/3dhauthchannel")
|
||||||
log.ExcludeFromPattern("event/eventmanager")
|
log.ExcludeFromPattern("event/eventmanager")
|
||||||
log.ExcludeFromPattern("pipeBridge")
|
|
||||||
log.ExcludeFromPattern("tapir")
|
log.ExcludeFromPattern("tapir")
|
||||||
os.Mkdir("tordir", 0700)
|
os.Mkdir("tordir", 0700)
|
||||||
dataDir := path.Join("tordir", "tor")
|
dataDir := path.Join("tordir", "tor")
|
||||||
|
@ -135,10 +133,6 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
||||||
os.Mkdir(cwtchDir, 0700)
|
os.Mkdir(cwtchDir, 0700)
|
||||||
os.RemoveAll(path.Join(cwtchDir, "testing"))
|
os.RemoveAll(path.Join(cwtchDir, "testing"))
|
||||||
os.Mkdir(path.Join(cwtchDir, "testing"), 0700)
|
os.Mkdir(path.Join(cwtchDir, "testing"), 0700)
|
||||||
bridgeClient := bridge.NewPipeBridgeClient(path.Join(cwtchDir, "testing/clientPipe"), path.Join(cwtchDir, "testing/servicePipe"))
|
|
||||||
bridgeService := bridge.NewPipeBridgeService(path.Join(cwtchDir, "testing/servicePipe"), path.Join(cwtchDir, "testing/clientPipe"))
|
|
||||||
appClient := app2.NewAppClient("./storage", bridgeClient)
|
|
||||||
appService := app2.NewAppService(acn, "./storage", bridgeService)
|
|
||||||
|
|
||||||
numGoRoutinesPostAppStart := runtime.NumGoroutine()
|
numGoRoutinesPostAppStart := runtime.NumGoroutine()
|
||||||
|
|
||||||
|
@ -151,7 +145,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
||||||
app.CreatePeer("bob", "asdfasdf")
|
app.CreatePeer("bob", "asdfasdf")
|
||||||
|
|
||||||
fmt.Println("Creating Carol...")
|
fmt.Println("Creating Carol...")
|
||||||
appClient.CreatePeer("carol", "asdfasdf")
|
app.CreatePeer("carol", "asdfasdf")
|
||||||
|
|
||||||
alice := utils.WaitGetPeer(app, "alice")
|
alice := utils.WaitGetPeer(app, "alice")
|
||||||
fmt.Println("Alice created:", alice.GetOnion())
|
fmt.Println("Alice created:", alice.GetOnion())
|
||||||
|
@ -163,13 +157,12 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
||||||
bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob")
|
bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob")
|
||||||
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
|
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
|
||||||
|
|
||||||
carol := utils.WaitGetPeer(appClient, "carol")
|
carol := utils.WaitGetPeer(app, "carol")
|
||||||
fmt.Println("Carol created:", carol.GetOnion())
|
fmt.Println("Carol created:", carol.GetOnion())
|
||||||
carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol")
|
carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol")
|
||||||
carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
|
carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
|
||||||
|
|
||||||
app.LaunchPeers()
|
app.LaunchPeers()
|
||||||
appClient.LaunchPeers()
|
|
||||||
|
|
||||||
waitTime := time.Duration(60) * time.Second
|
waitTime := time.Duration(60) * time.Second
|
||||||
t.Logf("** Waiting for Alice, Bob, and Carol to connect with onion network... (%v)\n", waitTime)
|
t.Logf("** Waiting for Alice, Bob, and Carol to connect with onion network... (%v)\n", waitTime)
|
||||||
|
@ -423,24 +416,14 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
||||||
numGoRoutinesPostBob := runtime.NumGoroutine()
|
numGoRoutinesPostBob := runtime.NumGoroutine()
|
||||||
|
|
||||||
fmt.Println("Shutting down Carol...")
|
fmt.Println("Shutting down Carol...")
|
||||||
appClient.ShutdownPeer(carol.GetOnion())
|
app.ShutdownPeer(carol.GetOnion())
|
||||||
time.Sleep(time.Second * 3)
|
time.Sleep(time.Second * 3)
|
||||||
numGoRoutinesPostCarol := runtime.NumGoroutine()
|
numGoRoutinesPostCarol := runtime.NumGoroutine()
|
||||||
|
|
||||||
fmt.Println("Shutting down apps...")
|
fmt.Println("Shutting down apps...")
|
||||||
fmt.Printf("app Shutdown: %v\n", runtime.NumGoroutine())
|
fmt.Printf("app Shutdown: %v\n", runtime.NumGoroutine())
|
||||||
app.Shutdown()
|
app.Shutdown()
|
||||||
fmt.Printf("appClientShutdown: %v\n", runtime.NumGoroutine())
|
|
||||||
appClient.Shutdown()
|
|
||||||
fmt.Printf("appServiceShutdown: %v\n", runtime.NumGoroutine())
|
|
||||||
appService.Shutdown()
|
|
||||||
|
|
||||||
fmt.Printf("bridgeClientShutdown: %v\n", runtime.NumGoroutine())
|
|
||||||
bridgeClient.Shutdown()
|
|
||||||
time.Sleep(2 * time.Second)
|
|
||||||
|
|
||||||
fmt.Printf("brideServiceShutdown: %v\n", runtime.NumGoroutine())
|
|
||||||
bridgeService.Shutdown()
|
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
fmt.Printf("Done shutdown: %v\n", runtime.NumGoroutine())
|
fmt.Printf("Done shutdown: %v\n", runtime.NumGoroutine())
|
||||||
|
|
Loading…
Reference in New Issue