diff --git a/app/app.go b/app/app.go index e4099ba..dd02147 100644 --- a/app/app.go +++ b/app/app.go @@ -8,6 +8,7 @@ import ( "cwtch.im/cwtch/storage" "fmt" "git.openprivacy.ca/openprivacy/libricochet-go/identity" + "strconv" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/log" @@ -24,14 +25,18 @@ type applicationCore struct { mutex sync.Mutex } -type applicationPeers struct { +type appletPeers struct { peers map[string]peer.CwtchPeer } +type appletACN struct { + acn connectivity.ACN +} + type application struct { applicationCore - applicationPeers - acn connectivity.ACN + appletPeers + appletACN storage map[string]storage.ProfileStore engines map[string]connections.Engine appBus event.Manager @@ -62,10 +67,32 @@ func newAppCore(appDirectory string) *applicationCore { return appCore } +func (ap *appletPeers) init() { + ap.peers = make(map[string]peer.CwtchPeer) +} + +func (a *appletACN) init(acn connectivity.ACN, publish func(int, string)) { + a.acn = acn + acn.SetStatusCallback(publish) + prog, status := acn.GetBootstrapStatus() + publish(prog, status) +} + +func (a *appletACN) Shutdown() { + a.acn.Close() +} + // NewApp creates a new app with some environment awareness and initializes a Tor Manager func NewApp(acn connectivity.ACN, appDirectory string) Application { log.Debugf("NewApp(%v)\n", appDirectory) - app := &application{acn: acn, storage: make(map[string]storage.ProfileStore), applicationPeers: applicationPeers{peers: make(map[string]peer.CwtchPeer)}, engines: make(map[string]connections.Engine), applicationCore: *newAppCore(appDirectory), appBus: event.NewEventManager()} + app := &application{storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationCore: *newAppCore(appDirectory), appBus: event.NewEventManager()} + app.appletPeers.init() + + fn := func(progress int, status string) { + progStr := strconv.Itoa(progress) + app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)) + } + app.appletACN.init(acn, fn) return app } @@ -151,6 +178,7 @@ func (ac *applicationCore) LoadProfiles(password string, loadProfileFn LoadProfi // LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them func (app *application) LoadProfiles(password string) { + count := 0 app.applicationCore.LoadProfiles(password, func(profile *model.Profile, profileStore storage.ProfileStore) { peer := peer.FromProfile(profile) peer.Init(app.eventBuses[profile.Onion]) @@ -164,7 +192,12 @@ func (app *application) LoadProfiles(password string) { app.engines[profile.Onion] = engine app.mutex.Unlock() app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion})) + count++ }) + if count == 0 { + message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0) + app.appBus.Publish(message) + } } // GetPrimaryBus returns the bus the Application uses for events that aren't peer specific @@ -173,8 +206,8 @@ func (app *application) GetPrimaryBus() event.Manager { } // LaunchPeers starts each peer Listening and connecting to peers and groups -func (appPeers *applicationPeers) LaunchPeers() { - for _, p := range appPeers.peers { +func (ap *appletPeers) LaunchPeers() { + for _, p := range ap.peers { if !p.IsStarted() { p.Listen() p.StartPeersConnections() @@ -184,17 +217,17 @@ func (appPeers *applicationPeers) LaunchPeers() { } // ListPeers returns a map of onions to their profile's Name -func (appPeers *applicationPeers) ListPeers() map[string]string { +func (ap *appletPeers) ListPeers() map[string]string { keys := map[string]string{} - for k, p := range appPeers.peers { + for k, p := range ap.peers { keys[k] = p.GetProfile().Name } return keys } // GetPeer returns a cwtchPeer for a given onion address -func (appPeers *applicationPeers) GetPeer(onion string) peer.CwtchPeer { - if peer, ok := appPeers.peers[onion]; ok { +func (ap *appletPeers) GetPeer(onion string) peer.CwtchPeer { + if peer, ok := ap.peers[onion]; ok { return peer } return nil diff --git a/app/appBridge.go b/app/appBridge.go index 4ba05b1..6a68945 100644 --- a/app/appBridge.go +++ b/app/appBridge.go @@ -14,7 +14,7 @@ func (ab *applicationBridge) listen() { log.Infoln("ab.listen()") for { ipcMessage, ok := ab.bridge.Read() - log.Infof("listen() got %v for %v\n", ipcMessage.Message.EventType, ipcMessage.Dest) + log.Debugf("listen() got %v for %v\n", ipcMessage.Message.EventType, ipcMessage.Dest) if !ok { return } diff --git a/app/appClient.go b/app/appClient.go index 5390480..9b0da8c 100644 --- a/app/appClient.go +++ b/app/appClient.go @@ -12,14 +12,14 @@ import ( type applicationClient struct { applicationBridge - applicationPeers + appletPeers appBus event.Manager } // NewAppClient returns an Application that acts as a client to a AppService, connected by the IPCBridge supplied func NewAppClient(appDirectory string, bridge event.IPCBridge) Application { - appClient := &applicationClient{applicationPeers: applicationPeers{peers: make(map[string]peer.CwtchPeer)}, applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}, appBus: event.NewEventManager()} + appClient := &applicationClient{appletPeers: appletPeers{peers: make(map[string]peer.CwtchPeer)}, applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}, appBus: event.NewEventManager()} appClient.handle = appClient.handleEvent go appClient.listen() @@ -43,6 +43,8 @@ func (ac *applicationClient) handleEvent(ev *event.Event) { ac.appBus.Publish(*ev) case event.AppError: ac.appBus.Publish(*ev) + case event.ACNStatus: + ac.appBus.Publish(*ev) } } diff --git a/app/appService.go b/app/appService.go index e6be603..13b1028 100644 --- a/app/appService.go +++ b/app/appService.go @@ -9,6 +9,7 @@ import ( "git.openprivacy.ca/openprivacy/libricochet-go/identity" "git.openprivacy.ca/openprivacy/libricochet-go/log" "path" + "strconv" ) const ( @@ -18,8 +19,8 @@ const ( type applicationService struct { applicationBridge + appletACN - acn connectivity.ACN storage map[string]storage.ProfileStore engines map[string]connections.Engine } @@ -31,7 +32,12 @@ type ApplicationService interface { // NewAppService returns an ApplicationService that runs the backend of an app and communicates with a client by the supplied IPCBridge func NewAppService(acn connectivity.ACN, appDirectory string, bridge event.IPCBridge) ApplicationService { - appService := &applicationService{acn: acn, storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}} + appService := &applicationService{storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}} + fn := func(progress int, status string) { + progStr := strconv.Itoa(progress) + appService.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)}) + } + appService.appletACN.init(acn, fn) appService.handle = appService.handleEvent go appService.listen() @@ -96,7 +102,6 @@ func (as *applicationService) loadProfiles(password string) { if count == 0 { message := event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0)} as.bridge.Write(&message) - } } diff --git a/event/bridge/goChanBridge.go b/event/bridge/goChanBridge.go index 7745092..ce9f67e 100644 --- a/event/bridge/goChanBridge.go +++ b/event/bridge/goChanBridge.go @@ -34,9 +34,9 @@ func monitor(a, b *goChanBridge) { a.closedChan <- true } -func (pb *goChanBridge) Read() (message event.IPCMessage, ok bool) { - message, ok = <-pb.in - return +func (pb *goChanBridge) Read() (*event.IPCMessage, bool) { + message, ok := <-pb.in + return &message, ok } func (pb *goChanBridge) Write(message *event.IPCMessage) { diff --git a/event/bridge/pipeBridge.go b/event/bridge/pipeBridge.go index 63c3ea9..6db91e8 100644 --- a/event/bridge/pipeBridge.go +++ b/event/bridge/pipeBridge.go @@ -1,6 +1,7 @@ package bridge import ( + "cwtch.im/cwtch/protocol/connections" "encoding/base64" "encoding/binary" "git.openprivacy.ca/openprivacy/libricochet-go/log" @@ -16,119 +17,207 @@ import ( Needs a call to new client and service to fully successfully open */ +const maxBufferSize = 1000 + type pipeBridge struct { - in, out *os.File - closedChan chan bool - closed bool - lock sync.Mutex + infile, outfile string + in, out *os.File + read, write chan event.IPCMessage + closedChan chan bool + state connections.ConnectionState + lock sync.Mutex +} + +func newPipeBridge(inFilename, outFilename string) *pipeBridge { + syscall.Mkfifo(inFilename, 0600) + syscall.Mkfifo(outFilename, 0600) + pb := &pipeBridge{infile: inFilename, outfile: outFilename, state: connections.DISCONNECTED} + pb.read = make(chan event.IPCMessage, maxBufferSize) + pb.write = make(chan event.IPCMessage, maxBufferSize) + return pb } // NewPipeBridgeClient returns a pipe backed IPCBridge for a client -func NewPipeBridgeClient(inFilename, outFilename string) (event.IPCBridge, error) { +func NewPipeBridgeClient(inFilename, outFilename string) event.IPCBridge { log.Debugf("Making new PipeBridge Client...\n") - syscall.Mkfifo(inFilename, 0600) - in, err := os.OpenFile(inFilename, os.O_RDONLY, 0600) - if err != nil { - return nil, err - } + pb := newPipeBridge(inFilename, outFilename) + go pb.clientConnectionManager() - syscall.Mkfifo(outFilename, 0600) - out, err := os.OpenFile(outFilename, os.O_WRONLY, 0600) - if err != nil { - return nil, err - } - - pb := &pipeBridge{in: in, out: out, closedChan: make(chan bool), closed: false} - log.Debugf("Successfully created new PipeBridge Client!\n") - return pb, nil + return pb } // NewPipeBridgeService returns a pipe backed IPCBridge for a service func NewPipeBridgeService(inFilename, outFilename string) (event.IPCBridge, error) { log.Debugf("Making new PipeBridge Service...\n") + pb := newPipeBridge(inFilename, outFilename) - syscall.Mkfifo(outFilename, 0600) - out, err := os.OpenFile(outFilename, os.O_WRONLY, 0600) - if err != nil { - return nil, err - } + go pb.serviceConnectionManager() - syscall.Mkfifo(inFilename, 0600) - in, err := os.OpenFile(inFilename, os.O_RDONLY, 0600) - if err != nil { - return nil, err - } - - pb := &pipeBridge{in: in, out: out, closedChan: make(chan bool), closed: false} log.Debugf("Successfully created new PipeBridge Service!\n") return pb, nil } -func (pb *pipeBridge) Read() (message event.IPCMessage, ok bool) { +func (pb *pipeBridge) clientConnectionManager() { + for pb.state != connections.KILLED { + pb.state = connections.CONNECTING + + var err error + pb.in, err = os.OpenFile(pb.infile, os.O_RDONLY, 0600) + if err != nil { + pb.state = connections.DISCONNECTED + continue + } + + pb.out, err = os.OpenFile(pb.outfile, os.O_WRONLY, 0600) + if err != nil { + pb.state = connections.DISCONNECTED + continue + } + + log.Debugf("Successfully connected PipeBridge Client!\n") + + pb.handleConns() + } +} + +func (pb *pipeBridge) serviceConnectionManager() { + for pb.state != connections.KILLED { + pb.state = connections.CONNECTING + + var err error + pb.out, err = os.OpenFile(pb.outfile, os.O_WRONLY, 0600) + if err != nil { + pb.state = connections.DISCONNECTED + continue + } + + pb.in, err = os.OpenFile(pb.infile, os.O_RDONLY, 0600) + if err != nil { + pb.state = connections.DISCONNECTED + continue + } + + log.Debugf("Successfully connected PipeBridge Service!\n") + + pb.handleConns() + } +} + +func (pb *pipeBridge) handleConns() { + + // auth? + pb.state = connections.AUTHENTICATED + + pb.closedChan = make(chan bool, 5) + + log.Debugf("handleConns authed, 2xgo\n") + + go pb.handleRead() + go pb.handleWrite() + + <-pb.closedChan + log.Debugf("handleConns CLOSEDCHAN!!!!\n") + if pb.state != connections.KILLED { + pb.state = connections.FAILED + } + pb.in.Close() + pb.out.Close() + close(pb.write) + close(pb.read) + pb.read = make(chan event.IPCMessage, maxBufferSize) + pb.write = make(chan event.IPCMessage, maxBufferSize) + log.Debugf("handleConns done, exit\n") +} + +func (pb *pipeBridge) handleWrite() { + for { + select { + case message := <-pb.write: + log.Debugf("handleWrite <- message: %v\n", message) + if pb.state == connections.AUTHENTICATED { + encMessage := &event.IPCMessage{Dest: message.Dest, Message: event.Event{EventType: message.Message.EventType, EventID: message.Message.EventID, Data: make(map[event.Field]string)}} + for k, v := range message.Message.Data { + encMessage.Message.Data[k] = base64.StdEncoding.EncodeToString([]byte(v)) + } + + messageJSON, _ := json.Marshal(encMessage) + size := make([]byte, 2) + binary.LittleEndian.PutUint16(size, uint16(len(messageJSON))) + pb.out.Write(size) + + for pos := 0; pos < len(messageJSON); { + n, err := pb.out.Write(messageJSON) + if err != nil { + log.Errorf("Writing out on pipeBridge: %v\n", err) + pb.closedChan <- true + return + } + pos += n + } + } else { + return + } + } + } +} + +func (pb *pipeBridge) handleRead() { var n int size := make([]byte, 2) - n, err := pb.in.Read(size) - if err != nil || n != 2 { - log.Errorf("Could not read len int from stream: %v\n", err) - return message, false - } - - n = int(binary.LittleEndian.Uint16(size)) - pos := 0 - buffer := make([]byte, n) - for n > 0 { - m, err := pb.in.Read(buffer[pos:]) - if err != nil { - log.Errorf("Reading into buffer from pipe: %v\n", err) - return message, false + var err error + for { + log.Debugf("Waiting to handleRead()...\n") + n, err = pb.in.Read(size) + if err != nil || n != 2 { + log.Errorf("Could not read len int from stream: %v\n", err) + pb.closedChan <- true + return } - n -= m - pos += m - } - err = json.Unmarshal(buffer, &message) - if err != nil { - log.Errorf("Read error: %v --value: %v", err, message) - return event.IPCMessage{}, false + n = int(binary.LittleEndian.Uint16(size)) + pos := 0 + buffer := make([]byte, n) + for n > 0 { + m, err := pb.in.Read(buffer[pos:]) + if err != nil { + log.Errorf("Reading into buffer from pipe: %v\n", err) + pb.closedChan <- true + return + } + n -= m + pos += m + } + + var message event.IPCMessage + err = json.Unmarshal(buffer, &message) + if err != nil { + log.Errorf("Read error: %v --value: %v", err, message) + continue // signal error? + } + for k, v := range message.Message.Data { + val, _ := base64.StdEncoding.DecodeString(v) + message.Message.Data[k] = string(val) + } + log.Debugf("handleRead read<-: %v\n", message) + pb.read <- message + log.Debugf("handleRead wrote\n") } - for k, v := range message.Message.Data { - val, _ := base64.StdEncoding.DecodeString(v) - message.Message.Data[k] = string(val) - } - return message, true +} + +func (pb *pipeBridge) Read() (*event.IPCMessage, bool) { + log.Debugf("Read()...\n") + message := <-pb.read + log.Debugf("Read: %v\n", message) + return &message, true } func (pb *pipeBridge) Write(message *event.IPCMessage) { - pb.lock.Lock() - defer pb.lock.Unlock() - if !pb.closed { - encMessage := &event.IPCMessage{Dest: message.Dest, Message: event.Event{EventType: message.Message.EventType, EventID: message.Message.EventID, Data: make(map[event.Field]string)}} - for k, v := range message.Message.Data { - encMessage.Message.Data[k] = base64.StdEncoding.EncodeToString([]byte(v)) - } - - messageJSON, _ := json.Marshal(encMessage) - size := make([]byte, 2) - binary.LittleEndian.PutUint16(size, uint16(len(messageJSON))) - pb.out.Write(size) - - for pos := 0; pos < len(messageJSON); { - n, err := pb.out.Write(messageJSON) - if err != nil { - log.Errorf("Writing out on pipeBridge: %v\n", err) - return - } - pos += n - } - } + log.Debugf("Write: %v\n", message) + pb.write <- *message + log.Debugf("Wrote\n") } func (pb *pipeBridge) Shutdown() { - pb.lock.Lock() - defer pb.lock.Unlock() - if !pb.closed { - pb.in.Close() - pb.out.Close() - pb.closed = true - } + pb.state = connections.KILLED + pb.closedChan <- true } diff --git a/event/bridge/pipeBridge_test.go b/event/bridge/pipeBridge_test.go index 1f42274..6317196 100644 --- a/event/bridge/pipeBridge_test.go +++ b/event/bridge/pipeBridge_test.go @@ -11,12 +11,7 @@ var ( ) func clientHelper(t *testing.T, in, out string, messageOrig *event.IPCMessage, done chan bool) { - client, err := NewPipeBridgeClient(in, out) - if err != nil { - t.Errorf("Error opening %v pipe: %v", in, err) - done <- true - return - } + client := NewPipeBridgeClient(in, out) messageAfter, ok := client.Read() if !ok { diff --git a/event/common.go b/event/common.go index 55d8331..2c9f275 100644 --- a/event/common.go +++ b/event/common.go @@ -115,6 +115,9 @@ const ( // Error(err) AppError = Type("AppError") + + // Progress, Status + ACNStatus = Type("ACNStatus") ) // Field defines common event attributes @@ -144,6 +147,9 @@ const ( Data = Field("Data") Error = Field("Error") + + Progreess = Field("Progress") + Status = Field("Status") ) // Defining Common errors diff --git a/event/ipc.go b/event/ipc.go index 57244bb..8abae3e 100644 --- a/event/ipc.go +++ b/event/ipc.go @@ -8,7 +8,7 @@ type IPCMessage struct { // IPCBridge is an interface to a IPC construct used to communicate IPCMessages type IPCBridge interface { - Read() (IPCMessage, bool) + Read() (*IPCMessage, bool) Write(message *IPCMessage) Shutdown() } diff --git a/go.mod b/go.mod index 76421a7..50f7de3 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,7 @@ module cwtch.im/cwtch require ( - git.openprivacy.ca/openprivacy/libricochet-go v1.0.3 + git.openprivacy.ca/openprivacy/libricochet-go v1.0.4 github.com/c-bata/go-prompt v0.2.3 github.com/golang/protobuf v1.2.0 github.com/mattn/go-colorable v0.0.9 // indirect diff --git a/go.sum b/go.sum index 57b7a85..0d0cd82 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,7 @@ git.openprivacy.ca/openprivacy/libricochet-go v1.0.2 h1:U5tufewB3O0L2EKjUyHXxJkv git.openprivacy.ca/openprivacy/libricochet-go v1.0.2/go.mod h1:yMSG1gBaP4f1U+RMZXN85d29D39OK5s8aTpyVRoH5FY= git.openprivacy.ca/openprivacy/libricochet-go v1.0.3 h1:LHnhK9hzkqMY+iEE3TZ0FjZsYal05YDiamKmxDOXuts= git.openprivacy.ca/openprivacy/libricochet-go v1.0.3/go.mod h1:yMSG1gBaP4f1U+RMZXN85d29D39OK5s8aTpyVRoH5FY= +git.openprivacy.ca/openprivacy/libricochet-go v1.0.4/go.mod h1:yMSG1gBaP4f1U+RMZXN85d29D39OK5s8aTpyVRoH5FY= github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 h1:w1UutsfOrms1J05zt7ISrnJIXKzwaspym5BTKGx93EI= github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412/go.mod h1:WPjqKcmVOxf0XSf3YxCJs6N6AOSrOx3obionmG7T0y0= github.com/c-bata/go-prompt v0.2.3 h1:jjCS+QhG/sULBhAaBdjb2PlMRVaKXQgn+4yzaauvs2s=