package bridge

import (
	"encoding/base64"
	"encoding/binary"
	"encoding/json"
	"io"
	"os"
	"sync"
	"syscall"

	"cwtch.im/cwtch/event"
	"git.openprivacy.ca/openprivacy/libricochet-go/log"
)

// maxPipeMessageSize is the largest JSON payload that can be described by the
// 2-byte little-endian length prefix that frames every message on the pipe.
const maxPipeMessageSize = 1<<16 - 1

/*
pipeBridge is an IPC bridge backed by a pair of named pipes.
Needs a call to both the new client and the new service constructor to fully
and successfully open.
*/
type pipeBridge struct {
	in, out    *os.File
	closedChan chan bool
	closed     bool // set by Shutdown; guarded by lock
	lock       sync.Mutex
}

// openPipe creates a FIFO at filename (mode 0600) if needed and opens it with
// the given flag. A path that already exists is not treated as an error, so a
// FIFO created by the peer or by an earlier run is reused; any other mkfifo
// failure is returned to the caller instead of being silently dropped.
func openPipe(filename string, flag int) (*os.File, error) {
	if err := syscall.Mkfifo(filename, 0600); err != nil && err != syscall.EEXIST {
		return nil, err
	}
	return os.OpenFile(filename, flag, 0600)
}

// NewPipeBridgeClient returns a pipe backed IPCBridge for a client.
//
// The client opens its read pipe (inFilename) first and its write pipe
// (outFilename) second; NewPipeBridgeService opens its pipes in the opposite
// order. Keep that ordering: opening a FIFO normally blocks until the other
// end is opened, so presumably the two sides must pair up pipe by pipe.
func NewPipeBridgeClient(inFilename, outFilename string) (event.IPCBridge, error) {
	log.Debugf("Making new PipeBridge Client...\n")

	in, err := openPipe(inFilename, os.O_RDONLY)
	if err != nil {
		return nil, err
	}

	out, err := openPipe(outFilename, os.O_WRONLY)
	if err != nil {
		// Do not leak the already opened read end.
		in.Close()
		return nil, err
	}

	pb := &pipeBridge{in: in, out: out, closedChan: make(chan bool), closed: false}
	log.Debugf("Successfully created new PipeBridge Client!\n")
	return pb, nil
}

// NewPipeBridgeService returns a pipe backed IPCBridge for a service.
//
// The service opens its write pipe (outFilename) first and its read pipe
// (inFilename) second, mirroring the order used by NewPipeBridgeClient.
func NewPipeBridgeService(inFilename, outFilename string) (event.IPCBridge, error) {
	log.Debugf("Making new PipeBridge Service...\n")

	out, err := openPipe(outFilename, os.O_WRONLY)
	if err != nil {
		return nil, err
	}

	in, err := openPipe(inFilename, os.O_RDONLY)
	if err != nil {
		// Do not leak the already opened write end.
		out.Close()
		return nil, err
	}

	pb := &pipeBridge{in: in, out: out, closedChan: make(chan bool), closed: false}
	log.Debugf("Successfully created new PipeBridge Service!\n")
	return pb, nil
}

// Read blocks until one complete message has been read from the in pipe.
//
// Wire format: a 2-byte little-endian payload length followed by that many
// bytes of JSON encoding an event.IPCMessage whose Data values are base64
// encoded. The returned message has its Data values decoded again.
//
// ok is false when the pipe could not be read (including EOF or a closed
// pipe) or when the payload is not valid JSON; the error is logged.
func (pb *pipeBridge) Read() (message event.IPCMessage, ok bool) {
	// io.ReadFull retries short reads, which are legal on a pipe, so a length
	// prefix delivered one byte at a time is not mistaken for a failure.
	size := make([]byte, 2)
	if _, err := io.ReadFull(pb.in, size); err != nil {
		log.Errorf("Could not read len int from stream: %v\n", err)
		return message, false
	}

	buffer := make([]byte, binary.LittleEndian.Uint16(size))
	if _, err := io.ReadFull(pb.in, buffer); err != nil {
		log.Errorf("Reading into buffer from pipe: %v\n", err)
		return message, false
	}

	if err := json.Unmarshal(buffer, &message); err != nil {
		log.Errorf("Read error: %v --value: %v", err, message)
		return event.IPCMessage{}, false
	}

	for k, v := range message.Message.Data {
		val, err := base64.StdEncoding.DecodeString(v)
		if err != nil {
			// Best effort: report the problem but keep whatever was decoded so
			// a single bad field does not drop the whole message.
			log.Errorf("Decoding field %v read from pipe: %v\n", k, err)
		}
		message.Message.Data[k] = string(val)
	}
	return message, true
}

// Write sends message over the out pipe using the framing described on Read.
//
// The caller's message is not modified: a copy with base64 encoded Data
// values is serialized instead. Write is a no-op once Shutdown has been
// called. Failures are logged; nothing is returned to the caller.
func (pb *pipeBridge) Write(message *event.IPCMessage) {
	pb.lock.Lock()
	defer pb.lock.Unlock()
	if pb.closed {
		return
	}

	encMessage := &event.IPCMessage{
		Dest: message.Dest,
		Message: event.Event{
			EventType: message.Message.EventType,
			EventID:   message.Message.EventID,
			Data:      make(map[event.Field]string, len(message.Message.Data)),
		},
	}
	for k, v := range message.Message.Data {
		encMessage.Message.Data[k] = base64.StdEncoding.EncodeToString([]byte(v))
	}

	messageJSON, err := json.Marshal(encMessage)
	if err != nil {
		log.Errorf("Marshalling message for pipeBridge: %v\n", err)
		return
	}

	// A longer payload would have its length silently truncated by the uint16
	// prefix, desynchronizing the reader for every following message.
	if len(messageJSON) > maxPipeMessageSize {
		log.Errorf("Message too large for pipeBridge: %v bytes (max %v)\n", len(messageJSON), maxPipeMessageSize)
		return
	}

	// Send prefix and payload as one buffer. (*os.File).Write returns a
	// non-nil error whenever it writes fewer bytes than requested, so no
	// manual retry loop is needed.
	frame := make([]byte, 2+len(messageJSON))
	binary.LittleEndian.PutUint16(frame, uint16(len(messageJSON)))
	copy(frame[2:], messageJSON)
	if _, err := pb.out.Write(frame); err != nil {
		log.Errorf("Writing out on pipeBridge: %v\n", err)
	}
}

// Shutdown closes both pipes and marks the bridge closed so that later Write
// calls become no-ops. Calling it more than once is harmless.
func (pb *pipeBridge) Shutdown() {
	pb.lock.Lock()
	defer pb.lock.Unlock()
	if pb.closed {
		return
	}
	// Close errors are deliberately ignored: the bridge is being torn down and
	// there is nothing useful a caller could do with them.
	pb.in.Close()
	pb.out.Close()
	pb.closed = true
}