Merge branch 'ipcConnManager' of dan/cwtch into master
This commit is contained in:
commit
ae3c3ace4b
53
app/app.go
53
app/app.go
|
@ -8,6 +8,7 @@ import (
|
||||||
"cwtch.im/cwtch/storage"
|
"cwtch.im/cwtch/storage"
|
||||||
"fmt"
|
"fmt"
|
||||||
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
|
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
|
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
|
||||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||||
|
@ -24,14 +25,18 @@ type applicationCore struct {
|
||||||
mutex sync.Mutex
|
mutex sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
type applicationPeers struct {
|
type appletPeers struct {
|
||||||
peers map[string]peer.CwtchPeer
|
peers map[string]peer.CwtchPeer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type appletACN struct {
|
||||||
|
acn connectivity.ACN
|
||||||
|
}
|
||||||
|
|
||||||
type application struct {
|
type application struct {
|
||||||
applicationCore
|
applicationCore
|
||||||
applicationPeers
|
appletPeers
|
||||||
acn connectivity.ACN
|
appletACN
|
||||||
storage map[string]storage.ProfileStore
|
storage map[string]storage.ProfileStore
|
||||||
engines map[string]connections.Engine
|
engines map[string]connections.Engine
|
||||||
appBus event.Manager
|
appBus event.Manager
|
||||||
|
@ -62,10 +67,32 @@ func newAppCore(appDirectory string) *applicationCore {
|
||||||
return appCore
|
return appCore
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ap *appletPeers) init() {
|
||||||
|
ap.peers = make(map[string]peer.CwtchPeer)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *appletACN) init(acn connectivity.ACN, publish func(int, string)) {
|
||||||
|
a.acn = acn
|
||||||
|
acn.SetStatusCallback(publish)
|
||||||
|
prog, status := acn.GetBootstrapStatus()
|
||||||
|
publish(prog, status)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *appletACN) Shutdown() {
|
||||||
|
a.acn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
// NewApp creates a new app with some environment awareness and initializes a Tor Manager
|
// NewApp creates a new app with some environment awareness and initializes a Tor Manager
|
||||||
func NewApp(acn connectivity.ACN, appDirectory string) Application {
|
func NewApp(acn connectivity.ACN, appDirectory string) Application {
|
||||||
log.Debugf("NewApp(%v)\n", appDirectory)
|
log.Debugf("NewApp(%v)\n", appDirectory)
|
||||||
app := &application{acn: acn, storage: make(map[string]storage.ProfileStore), applicationPeers: applicationPeers{peers: make(map[string]peer.CwtchPeer)}, engines: make(map[string]connections.Engine), applicationCore: *newAppCore(appDirectory), appBus: event.NewEventManager()}
|
app := &application{storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationCore: *newAppCore(appDirectory), appBus: event.NewEventManager()}
|
||||||
|
app.appletPeers.init()
|
||||||
|
|
||||||
|
fn := func(progress int, status string) {
|
||||||
|
progStr := strconv.Itoa(progress)
|
||||||
|
app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status))
|
||||||
|
}
|
||||||
|
app.appletACN.init(acn, fn)
|
||||||
return app
|
return app
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -151,6 +178,7 @@ func (ac *applicationCore) LoadProfiles(password string, loadProfileFn LoadProfi
|
||||||
|
|
||||||
// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
|
// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
|
||||||
func (app *application) LoadProfiles(password string) {
|
func (app *application) LoadProfiles(password string) {
|
||||||
|
count := 0
|
||||||
app.applicationCore.LoadProfiles(password, func(profile *model.Profile, profileStore storage.ProfileStore) {
|
app.applicationCore.LoadProfiles(password, func(profile *model.Profile, profileStore storage.ProfileStore) {
|
||||||
peer := peer.FromProfile(profile)
|
peer := peer.FromProfile(profile)
|
||||||
peer.Init(app.eventBuses[profile.Onion])
|
peer.Init(app.eventBuses[profile.Onion])
|
||||||
|
@ -164,7 +192,12 @@ func (app *application) LoadProfiles(password string) {
|
||||||
app.engines[profile.Onion] = engine
|
app.engines[profile.Onion] = engine
|
||||||
app.mutex.Unlock()
|
app.mutex.Unlock()
|
||||||
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion}))
|
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion}))
|
||||||
|
count++
|
||||||
})
|
})
|
||||||
|
if count == 0 {
|
||||||
|
message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0)
|
||||||
|
app.appBus.Publish(message)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPrimaryBus returns the bus the Application uses for events that aren't peer specific
|
// GetPrimaryBus returns the bus the Application uses for events that aren't peer specific
|
||||||
|
@ -173,8 +206,8 @@ func (app *application) GetPrimaryBus() event.Manager {
|
||||||
}
|
}
|
||||||
|
|
||||||
// LaunchPeers starts each peer Listening and connecting to peers and groups
|
// LaunchPeers starts each peer Listening and connecting to peers and groups
|
||||||
func (appPeers *applicationPeers) LaunchPeers() {
|
func (ap *appletPeers) LaunchPeers() {
|
||||||
for _, p := range appPeers.peers {
|
for _, p := range ap.peers {
|
||||||
if !p.IsStarted() {
|
if !p.IsStarted() {
|
||||||
p.Listen()
|
p.Listen()
|
||||||
p.StartPeersConnections()
|
p.StartPeersConnections()
|
||||||
|
@ -184,17 +217,17 @@ func (appPeers *applicationPeers) LaunchPeers() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListPeers returns a map of onions to their profile's Name
|
// ListPeers returns a map of onions to their profile's Name
|
||||||
func (appPeers *applicationPeers) ListPeers() map[string]string {
|
func (ap *appletPeers) ListPeers() map[string]string {
|
||||||
keys := map[string]string{}
|
keys := map[string]string{}
|
||||||
for k, p := range appPeers.peers {
|
for k, p := range ap.peers {
|
||||||
keys[k] = p.GetProfile().Name
|
keys[k] = p.GetProfile().Name
|
||||||
}
|
}
|
||||||
return keys
|
return keys
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPeer returns a cwtchPeer for a given onion address
|
// GetPeer returns a cwtchPeer for a given onion address
|
||||||
func (appPeers *applicationPeers) GetPeer(onion string) peer.CwtchPeer {
|
func (ap *appletPeers) GetPeer(onion string) peer.CwtchPeer {
|
||||||
if peer, ok := appPeers.peers[onion]; ok {
|
if peer, ok := ap.peers[onion]; ok {
|
||||||
return peer
|
return peer
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -14,7 +14,7 @@ func (ab *applicationBridge) listen() {
|
||||||
log.Infoln("ab.listen()")
|
log.Infoln("ab.listen()")
|
||||||
for {
|
for {
|
||||||
ipcMessage, ok := ab.bridge.Read()
|
ipcMessage, ok := ab.bridge.Read()
|
||||||
log.Infof("listen() got %v for %v\n", ipcMessage.Message.EventType, ipcMessage.Dest)
|
log.Debugf("listen() got %v for %v\n", ipcMessage.Message.EventType, ipcMessage.Dest)
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,14 +12,14 @@ import (
|
||||||
|
|
||||||
type applicationClient struct {
|
type applicationClient struct {
|
||||||
applicationBridge
|
applicationBridge
|
||||||
applicationPeers
|
appletPeers
|
||||||
|
|
||||||
appBus event.Manager
|
appBus event.Manager
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAppClient returns an Application that acts as a client to a AppService, connected by the IPCBridge supplied
|
// NewAppClient returns an Application that acts as a client to a AppService, connected by the IPCBridge supplied
|
||||||
func NewAppClient(appDirectory string, bridge event.IPCBridge) Application {
|
func NewAppClient(appDirectory string, bridge event.IPCBridge) Application {
|
||||||
appClient := &applicationClient{applicationPeers: applicationPeers{peers: make(map[string]peer.CwtchPeer)}, applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}, appBus: event.NewEventManager()}
|
appClient := &applicationClient{appletPeers: appletPeers{peers: make(map[string]peer.CwtchPeer)}, applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}, appBus: event.NewEventManager()}
|
||||||
appClient.handle = appClient.handleEvent
|
appClient.handle = appClient.handleEvent
|
||||||
|
|
||||||
go appClient.listen()
|
go appClient.listen()
|
||||||
|
@ -43,6 +43,8 @@ func (ac *applicationClient) handleEvent(ev *event.Event) {
|
||||||
ac.appBus.Publish(*ev)
|
ac.appBus.Publish(*ev)
|
||||||
case event.AppError:
|
case event.AppError:
|
||||||
ac.appBus.Publish(*ev)
|
ac.appBus.Publish(*ev)
|
||||||
|
case event.ACNStatus:
|
||||||
|
ac.appBus.Publish(*ev)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
|
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
|
||||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||||
"path"
|
"path"
|
||||||
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -18,8 +19,8 @@ const (
|
||||||
|
|
||||||
type applicationService struct {
|
type applicationService struct {
|
||||||
applicationBridge
|
applicationBridge
|
||||||
|
appletACN
|
||||||
|
|
||||||
acn connectivity.ACN
|
|
||||||
storage map[string]storage.ProfileStore
|
storage map[string]storage.ProfileStore
|
||||||
engines map[string]connections.Engine
|
engines map[string]connections.Engine
|
||||||
}
|
}
|
||||||
|
@ -31,7 +32,12 @@ type ApplicationService interface {
|
||||||
|
|
||||||
// NewAppService returns an ApplicationService that runs the backend of an app and communicates with a client by the supplied IPCBridge
|
// NewAppService returns an ApplicationService that runs the backend of an app and communicates with a client by the supplied IPCBridge
|
||||||
func NewAppService(acn connectivity.ACN, appDirectory string, bridge event.IPCBridge) ApplicationService {
|
func NewAppService(acn connectivity.ACN, appDirectory string, bridge event.IPCBridge) ApplicationService {
|
||||||
appService := &applicationService{acn: acn, storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}}
|
appService := &applicationService{storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}}
|
||||||
|
fn := func(progress int, status string) {
|
||||||
|
progStr := strconv.Itoa(progress)
|
||||||
|
appService.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)})
|
||||||
|
}
|
||||||
|
appService.appletACN.init(acn, fn)
|
||||||
appService.handle = appService.handleEvent
|
appService.handle = appService.handleEvent
|
||||||
|
|
||||||
go appService.listen()
|
go appService.listen()
|
||||||
|
@ -96,7 +102,6 @@ func (as *applicationService) loadProfiles(password string) {
|
||||||
if count == 0 {
|
if count == 0 {
|
||||||
message := event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0)}
|
message := event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0)}
|
||||||
as.bridge.Write(&message)
|
as.bridge.Write(&message)
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -34,9 +34,9 @@ func monitor(a, b *goChanBridge) {
|
||||||
a.closedChan <- true
|
a.closedChan <- true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pb *goChanBridge) Read() (message event.IPCMessage, ok bool) {
|
func (pb *goChanBridge) Read() (*event.IPCMessage, bool) {
|
||||||
message, ok = <-pb.in
|
message, ok := <-pb.in
|
||||||
return
|
return &message, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pb *goChanBridge) Write(message *event.IPCMessage) {
|
func (pb *goChanBridge) Write(message *event.IPCMessage) {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package bridge
|
package bridge
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"cwtch.im/cwtch/protocol/connections"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||||
|
@ -16,119 +17,207 @@ import (
|
||||||
Needs a call to new client and service to fully successfully open
|
Needs a call to new client and service to fully successfully open
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
const maxBufferSize = 1000
|
||||||
|
|
||||||
type pipeBridge struct {
|
type pipeBridge struct {
|
||||||
in, out *os.File
|
infile, outfile string
|
||||||
closedChan chan bool
|
in, out *os.File
|
||||||
closed bool
|
read, write chan event.IPCMessage
|
||||||
lock sync.Mutex
|
closedChan chan bool
|
||||||
|
state connections.ConnectionState
|
||||||
|
lock sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPipeBridge(inFilename, outFilename string) *pipeBridge {
|
||||||
|
syscall.Mkfifo(inFilename, 0600)
|
||||||
|
syscall.Mkfifo(outFilename, 0600)
|
||||||
|
pb := &pipeBridge{infile: inFilename, outfile: outFilename, state: connections.DISCONNECTED}
|
||||||
|
pb.read = make(chan event.IPCMessage, maxBufferSize)
|
||||||
|
pb.write = make(chan event.IPCMessage, maxBufferSize)
|
||||||
|
return pb
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPipeBridgeClient returns a pipe backed IPCBridge for a client
|
// NewPipeBridgeClient returns a pipe backed IPCBridge for a client
|
||||||
func NewPipeBridgeClient(inFilename, outFilename string) (event.IPCBridge, error) {
|
func NewPipeBridgeClient(inFilename, outFilename string) event.IPCBridge {
|
||||||
log.Debugf("Making new PipeBridge Client...\n")
|
log.Debugf("Making new PipeBridge Client...\n")
|
||||||
syscall.Mkfifo(inFilename, 0600)
|
pb := newPipeBridge(inFilename, outFilename)
|
||||||
in, err := os.OpenFile(inFilename, os.O_RDONLY, 0600)
|
go pb.clientConnectionManager()
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
syscall.Mkfifo(outFilename, 0600)
|
return pb
|
||||||
out, err := os.OpenFile(outFilename, os.O_WRONLY, 0600)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
pb := &pipeBridge{in: in, out: out, closedChan: make(chan bool), closed: false}
|
|
||||||
log.Debugf("Successfully created new PipeBridge Client!\n")
|
|
||||||
return pb, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPipeBridgeService returns a pipe backed IPCBridge for a service
|
// NewPipeBridgeService returns a pipe backed IPCBridge for a service
|
||||||
func NewPipeBridgeService(inFilename, outFilename string) (event.IPCBridge, error) {
|
func NewPipeBridgeService(inFilename, outFilename string) (event.IPCBridge, error) {
|
||||||
log.Debugf("Making new PipeBridge Service...\n")
|
log.Debugf("Making new PipeBridge Service...\n")
|
||||||
|
pb := newPipeBridge(inFilename, outFilename)
|
||||||
|
|
||||||
syscall.Mkfifo(outFilename, 0600)
|
go pb.serviceConnectionManager()
|
||||||
out, err := os.OpenFile(outFilename, os.O_WRONLY, 0600)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
syscall.Mkfifo(inFilename, 0600)
|
|
||||||
in, err := os.OpenFile(inFilename, os.O_RDONLY, 0600)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
pb := &pipeBridge{in: in, out: out, closedChan: make(chan bool), closed: false}
|
|
||||||
log.Debugf("Successfully created new PipeBridge Service!\n")
|
log.Debugf("Successfully created new PipeBridge Service!\n")
|
||||||
return pb, nil
|
return pb, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pb *pipeBridge) Read() (message event.IPCMessage, ok bool) {
|
func (pb *pipeBridge) clientConnectionManager() {
|
||||||
|
for pb.state != connections.KILLED {
|
||||||
|
pb.state = connections.CONNECTING
|
||||||
|
|
||||||
|
var err error
|
||||||
|
pb.in, err = os.OpenFile(pb.infile, os.O_RDONLY, 0600)
|
||||||
|
if err != nil {
|
||||||
|
pb.state = connections.DISCONNECTED
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
pb.out, err = os.OpenFile(pb.outfile, os.O_WRONLY, 0600)
|
||||||
|
if err != nil {
|
||||||
|
pb.state = connections.DISCONNECTED
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("Successfully connected PipeBridge Client!\n")
|
||||||
|
|
||||||
|
pb.handleConns()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pb *pipeBridge) serviceConnectionManager() {
|
||||||
|
for pb.state != connections.KILLED {
|
||||||
|
pb.state = connections.CONNECTING
|
||||||
|
|
||||||
|
var err error
|
||||||
|
pb.out, err = os.OpenFile(pb.outfile, os.O_WRONLY, 0600)
|
||||||
|
if err != nil {
|
||||||
|
pb.state = connections.DISCONNECTED
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
pb.in, err = os.OpenFile(pb.infile, os.O_RDONLY, 0600)
|
||||||
|
if err != nil {
|
||||||
|
pb.state = connections.DISCONNECTED
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("Successfully connected PipeBridge Service!\n")
|
||||||
|
|
||||||
|
pb.handleConns()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pb *pipeBridge) handleConns() {
|
||||||
|
|
||||||
|
// auth?
|
||||||
|
pb.state = connections.AUTHENTICATED
|
||||||
|
|
||||||
|
pb.closedChan = make(chan bool, 5)
|
||||||
|
|
||||||
|
log.Debugf("handleConns authed, 2xgo\n")
|
||||||
|
|
||||||
|
go pb.handleRead()
|
||||||
|
go pb.handleWrite()
|
||||||
|
|
||||||
|
<-pb.closedChan
|
||||||
|
log.Debugf("handleConns CLOSEDCHAN!!!!\n")
|
||||||
|
if pb.state != connections.KILLED {
|
||||||
|
pb.state = connections.FAILED
|
||||||
|
}
|
||||||
|
pb.in.Close()
|
||||||
|
pb.out.Close()
|
||||||
|
close(pb.write)
|
||||||
|
close(pb.read)
|
||||||
|
pb.read = make(chan event.IPCMessage, maxBufferSize)
|
||||||
|
pb.write = make(chan event.IPCMessage, maxBufferSize)
|
||||||
|
log.Debugf("handleConns done, exit\n")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pb *pipeBridge) handleWrite() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case message := <-pb.write:
|
||||||
|
log.Debugf("handleWrite <- message: %v\n", message)
|
||||||
|
if pb.state == connections.AUTHENTICATED {
|
||||||
|
encMessage := &event.IPCMessage{Dest: message.Dest, Message: event.Event{EventType: message.Message.EventType, EventID: message.Message.EventID, Data: make(map[event.Field]string)}}
|
||||||
|
for k, v := range message.Message.Data {
|
||||||
|
encMessage.Message.Data[k] = base64.StdEncoding.EncodeToString([]byte(v))
|
||||||
|
}
|
||||||
|
|
||||||
|
messageJSON, _ := json.Marshal(encMessage)
|
||||||
|
size := make([]byte, 2)
|
||||||
|
binary.LittleEndian.PutUint16(size, uint16(len(messageJSON)))
|
||||||
|
pb.out.Write(size)
|
||||||
|
|
||||||
|
for pos := 0; pos < len(messageJSON); {
|
||||||
|
n, err := pb.out.Write(messageJSON)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Writing out on pipeBridge: %v\n", err)
|
||||||
|
pb.closedChan <- true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
pos += n
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pb *pipeBridge) handleRead() {
|
||||||
var n int
|
var n int
|
||||||
size := make([]byte, 2)
|
size := make([]byte, 2)
|
||||||
n, err := pb.in.Read(size)
|
var err error
|
||||||
if err != nil || n != 2 {
|
for {
|
||||||
log.Errorf("Could not read len int from stream: %v\n", err)
|
log.Debugf("Waiting to handleRead()...\n")
|
||||||
return message, false
|
n, err = pb.in.Read(size)
|
||||||
}
|
if err != nil || n != 2 {
|
||||||
|
log.Errorf("Could not read len int from stream: %v\n", err)
|
||||||
n = int(binary.LittleEndian.Uint16(size))
|
pb.closedChan <- true
|
||||||
pos := 0
|
return
|
||||||
buffer := make([]byte, n)
|
|
||||||
for n > 0 {
|
|
||||||
m, err := pb.in.Read(buffer[pos:])
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Reading into buffer from pipe: %v\n", err)
|
|
||||||
return message, false
|
|
||||||
}
|
}
|
||||||
n -= m
|
|
||||||
pos += m
|
|
||||||
}
|
|
||||||
|
|
||||||
err = json.Unmarshal(buffer, &message)
|
n = int(binary.LittleEndian.Uint16(size))
|
||||||
if err != nil {
|
pos := 0
|
||||||
log.Errorf("Read error: %v --value: %v", err, message)
|
buffer := make([]byte, n)
|
||||||
return event.IPCMessage{}, false
|
for n > 0 {
|
||||||
|
m, err := pb.in.Read(buffer[pos:])
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Reading into buffer from pipe: %v\n", err)
|
||||||
|
pb.closedChan <- true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
n -= m
|
||||||
|
pos += m
|
||||||
|
}
|
||||||
|
|
||||||
|
var message event.IPCMessage
|
||||||
|
err = json.Unmarshal(buffer, &message)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Read error: %v --value: %v", err, message)
|
||||||
|
continue // signal error?
|
||||||
|
}
|
||||||
|
for k, v := range message.Message.Data {
|
||||||
|
val, _ := base64.StdEncoding.DecodeString(v)
|
||||||
|
message.Message.Data[k] = string(val)
|
||||||
|
}
|
||||||
|
log.Debugf("handleRead read<-: %v\n", message)
|
||||||
|
pb.read <- message
|
||||||
|
log.Debugf("handleRead wrote\n")
|
||||||
}
|
}
|
||||||
for k, v := range message.Message.Data {
|
}
|
||||||
val, _ := base64.StdEncoding.DecodeString(v)
|
|
||||||
message.Message.Data[k] = string(val)
|
func (pb *pipeBridge) Read() (*event.IPCMessage, bool) {
|
||||||
}
|
log.Debugf("Read()...\n")
|
||||||
return message, true
|
message := <-pb.read
|
||||||
|
log.Debugf("Read: %v\n", message)
|
||||||
|
return &message, true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pb *pipeBridge) Write(message *event.IPCMessage) {
|
func (pb *pipeBridge) Write(message *event.IPCMessage) {
|
||||||
pb.lock.Lock()
|
log.Debugf("Write: %v\n", message)
|
||||||
defer pb.lock.Unlock()
|
pb.write <- *message
|
||||||
if !pb.closed {
|
log.Debugf("Wrote\n")
|
||||||
encMessage := &event.IPCMessage{Dest: message.Dest, Message: event.Event{EventType: message.Message.EventType, EventID: message.Message.EventID, Data: make(map[event.Field]string)}}
|
|
||||||
for k, v := range message.Message.Data {
|
|
||||||
encMessage.Message.Data[k] = base64.StdEncoding.EncodeToString([]byte(v))
|
|
||||||
}
|
|
||||||
|
|
||||||
messageJSON, _ := json.Marshal(encMessage)
|
|
||||||
size := make([]byte, 2)
|
|
||||||
binary.LittleEndian.PutUint16(size, uint16(len(messageJSON)))
|
|
||||||
pb.out.Write(size)
|
|
||||||
|
|
||||||
for pos := 0; pos < len(messageJSON); {
|
|
||||||
n, err := pb.out.Write(messageJSON)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Writing out on pipeBridge: %v\n", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
pos += n
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pb *pipeBridge) Shutdown() {
|
func (pb *pipeBridge) Shutdown() {
|
||||||
pb.lock.Lock()
|
pb.state = connections.KILLED
|
||||||
defer pb.lock.Unlock()
|
pb.closedChan <- true
|
||||||
if !pb.closed {
|
|
||||||
pb.in.Close()
|
|
||||||
pb.out.Close()
|
|
||||||
pb.closed = true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,12 +11,7 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
func clientHelper(t *testing.T, in, out string, messageOrig *event.IPCMessage, done chan bool) {
|
func clientHelper(t *testing.T, in, out string, messageOrig *event.IPCMessage, done chan bool) {
|
||||||
client, err := NewPipeBridgeClient(in, out)
|
client := NewPipeBridgeClient(in, out)
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Error opening %v pipe: %v", in, err)
|
|
||||||
done <- true
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
messageAfter, ok := client.Read()
|
messageAfter, ok := client.Read()
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|
|
@ -115,6 +115,9 @@ const (
|
||||||
|
|
||||||
// Error(err)
|
// Error(err)
|
||||||
AppError = Type("AppError")
|
AppError = Type("AppError")
|
||||||
|
|
||||||
|
// Progress, Status
|
||||||
|
ACNStatus = Type("ACNStatus")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Field defines common event attributes
|
// Field defines common event attributes
|
||||||
|
@ -144,6 +147,9 @@ const (
|
||||||
Data = Field("Data")
|
Data = Field("Data")
|
||||||
|
|
||||||
Error = Field("Error")
|
Error = Field("Error")
|
||||||
|
|
||||||
|
Progreess = Field("Progress")
|
||||||
|
Status = Field("Status")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Defining Common errors
|
// Defining Common errors
|
||||||
|
|
|
@ -8,7 +8,7 @@ type IPCMessage struct {
|
||||||
|
|
||||||
// IPCBridge is an interface to a IPC construct used to communicate IPCMessages
|
// IPCBridge is an interface to a IPC construct used to communicate IPCMessages
|
||||||
type IPCBridge interface {
|
type IPCBridge interface {
|
||||||
Read() (IPCMessage, bool)
|
Read() (*IPCMessage, bool)
|
||||||
Write(message *IPCMessage)
|
Write(message *IPCMessage)
|
||||||
Shutdown()
|
Shutdown()
|
||||||
}
|
}
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -1,7 +1,7 @@
|
||||||
module cwtch.im/cwtch
|
module cwtch.im/cwtch
|
||||||
|
|
||||||
require (
|
require (
|
||||||
git.openprivacy.ca/openprivacy/libricochet-go v1.0.3
|
git.openprivacy.ca/openprivacy/libricochet-go v1.0.4
|
||||||
github.com/c-bata/go-prompt v0.2.3
|
github.com/c-bata/go-prompt v0.2.3
|
||||||
github.com/golang/protobuf v1.2.0
|
github.com/golang/protobuf v1.2.0
|
||||||
github.com/mattn/go-colorable v0.0.9 // indirect
|
github.com/mattn/go-colorable v0.0.9 // indirect
|
||||||
|
|
1
go.sum
1
go.sum
|
@ -2,6 +2,7 @@ git.openprivacy.ca/openprivacy/libricochet-go v1.0.2 h1:U5tufewB3O0L2EKjUyHXxJkv
|
||||||
git.openprivacy.ca/openprivacy/libricochet-go v1.0.2/go.mod h1:yMSG1gBaP4f1U+RMZXN85d29D39OK5s8aTpyVRoH5FY=
|
git.openprivacy.ca/openprivacy/libricochet-go v1.0.2/go.mod h1:yMSG1gBaP4f1U+RMZXN85d29D39OK5s8aTpyVRoH5FY=
|
||||||
git.openprivacy.ca/openprivacy/libricochet-go v1.0.3 h1:LHnhK9hzkqMY+iEE3TZ0FjZsYal05YDiamKmxDOXuts=
|
git.openprivacy.ca/openprivacy/libricochet-go v1.0.3 h1:LHnhK9hzkqMY+iEE3TZ0FjZsYal05YDiamKmxDOXuts=
|
||||||
git.openprivacy.ca/openprivacy/libricochet-go v1.0.3/go.mod h1:yMSG1gBaP4f1U+RMZXN85d29D39OK5s8aTpyVRoH5FY=
|
git.openprivacy.ca/openprivacy/libricochet-go v1.0.3/go.mod h1:yMSG1gBaP4f1U+RMZXN85d29D39OK5s8aTpyVRoH5FY=
|
||||||
|
git.openprivacy.ca/openprivacy/libricochet-go v1.0.4/go.mod h1:yMSG1gBaP4f1U+RMZXN85d29D39OK5s8aTpyVRoH5FY=
|
||||||
github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 h1:w1UutsfOrms1J05zt7ISrnJIXKzwaspym5BTKGx93EI=
|
github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 h1:w1UutsfOrms1J05zt7ISrnJIXKzwaspym5BTKGx93EI=
|
||||||
github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412/go.mod h1:WPjqKcmVOxf0XSf3YxCJs6N6AOSrOx3obionmG7T0y0=
|
github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412/go.mod h1:WPjqKcmVOxf0XSf3YxCJs6N6AOSrOx3obionmG7T0y0=
|
||||||
github.com/c-bata/go-prompt v0.2.3 h1:jjCS+QhG/sULBhAaBdjb2PlMRVaKXQgn+4yzaauvs2s=
|
github.com/c-bata/go-prompt v0.2.3 h1:jjCS+QhG/sULBhAaBdjb2PlMRVaKXQgn+4yzaauvs2s=
|
||||||
|
|
Loading…
Reference in New Issue