cwtch/event/bridge/pipeBridge.go

224 lines
5.4 KiB
Go
Raw Normal View History

package bridge
import (
"cwtch.im/cwtch/protocol/connections"
"encoding/base64"
"encoding/binary"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"cwtch.im/cwtch/event"
"encoding/json"
"os"
"sync"
"syscall"
)
/* pipeBridge is an IPC bridge backed by a pair of named pipes.
Both a client bridge and a service bridge must be created on the same
pipes before either side can finish opening them.
*/
const maxBufferSize = 1000 // capacity of the buffered read and write message channels
// pipeBridge holds the state of one end of a named-pipe IPC bridge.
type pipeBridge struct {
	// Filesystem paths of the inbound and outbound named pipes.
	infile  string
	outfile string

	// Open handles on the two pipes; assigned by the connection managers and
	// closed again by handleConns when a connection ends.
	in  *os.File
	out *os.File

	// read queues messages decoded by handleRead until Read collects them;
	// write queues messages passed to Write until handleWrite sends them.
	read  chan event.IPCMessage
	write chan event.IPCMessage

	// closedChan receives a value when the current connection must be torn
	// down: from handleRead/handleWrite on an I/O error, or from Shutdown.
	closedChan chan bool

	// state is the current connection state of this end of the bridge.
	state connections.ConnectionState

	// NOTE(review): no method shown in this file acquires lock.
	lock sync.Mutex
}
// newPipeBridge creates the two named pipes and returns a DISCONNECTED
// pipeBridge with freshly allocated read and write queues.
//
// The client and service constructors both end up here, so a pipe that
// already exists (EEXIST) is expected and ignored. Any other mkfifo failure
// is logged, because the connection managers would otherwise keep failing to
// open the pipe with no indication of why. The bridge is still returned in
// that case, as before.
func newPipeBridge(inFilename, outFilename string) *pipeBridge {
	for _, name := range []string{inFilename, outFilename} {
		if err := syscall.Mkfifo(name, 0600); err != nil && err != syscall.EEXIST {
			log.Errorf("Could not create named pipe %v: %v\n", name, err)
		}
	}

	return &pipeBridge{
		infile:  inFilename,
		outfile: outFilename,
		state:   connections.DISCONNECTED,
		read:    make(chan event.IPCMessage, maxBufferSize),
		write:   make(chan event.IPCMessage, maxBufferSize),
	}
}
// NewPipeBridgeClient builds the client end of a pipe backed IPCBridge on the
// given pipe paths and starts its connection manager in a background
// goroutine before returning.
func NewPipeBridgeClient(inFilename, outFilename string) event.IPCBridge {
	log.Debugf("Making new PipeBridge Client...\n")

	client := newPipeBridge(inFilename, outFilename)
	go client.clientConnectionManager()

	return client
}
// NewPipeBridgeService builds the service end of a pipe backed IPCBridge on
// the given pipe paths and starts its connection manager in a background
// goroutine before returning. The returned error is always nil.
func NewPipeBridgeService(inFilename, outFilename string) (event.IPCBridge, error) {
	log.Debugf("Making new PipeBridge Service...\n")

	service := newPipeBridge(inFilename, outFilename)
	go service.serviceConnectionManager()

	log.Debugf("Successfully created new PipeBridge Service!\n")
	return service, nil
}
// clientConnectionManager keeps the client end of the bridge connected. Until
// the state becomes KILLED it repeatedly opens the inbound pipe for reading,
// then the outbound pipe for writing, and hands the pair to handleConns,
// which returns when that connection ends.
//
// The client opens its read side first and the service opens its write side
// first; the two orders mirror each other.
func (pb *pipeBridge) clientConnectionManager() {
	for pb.state != connections.KILLED {
		pb.state = connections.CONNECTING

		var err error
		pb.in, err = os.OpenFile(pb.infile, os.O_RDONLY, 0600)
		if err != nil {
			pb.state = connections.DISCONNECTED
			continue
		}

		pb.out, err = os.OpenFile(pb.outfile, os.O_WRONLY, 0600)
		if err != nil {
			// Release the inbound handle opened above; the next attempt
			// reopens it, so leaving it open would leak one descriptor per
			// failed retry.
			pb.in.Close()
			pb.state = connections.DISCONNECTED
			continue
		}

		log.Debugf("Successfully connected PipeBridge Client!\n")
		pb.handleConns()
	}
}
// serviceConnectionManager keeps the service end of the bridge connected.
// Until the state becomes KILLED it repeatedly opens the outbound pipe for
// writing, then the inbound pipe for reading, and hands the pair to
// handleConns, which returns when that connection ends.
//
// The service opens its write side first and the client opens its read side
// first; the two orders mirror each other.
func (pb *pipeBridge) serviceConnectionManager() {
	for pb.state != connections.KILLED {
		pb.state = connections.CONNECTING

		var err error
		pb.out, err = os.OpenFile(pb.outfile, os.O_WRONLY, 0600)
		if err != nil {
			pb.state = connections.DISCONNECTED
			continue
		}

		pb.in, err = os.OpenFile(pb.infile, os.O_RDONLY, 0600)
		if err != nil {
			// Release the outbound handle opened above; the next attempt
			// reopens it, so leaving it open would leak one descriptor per
			// failed retry.
			pb.out.Close()
			pb.state = connections.DISCONNECTED
			continue
		}

		log.Debugf("Successfully connected PipeBridge Service!\n")
		pb.handleConns()
	}
}
// handleConns runs the lifetime of one established connection. It marks the
// bridge AUTHENTICATED, starts the reader and writer goroutines, blocks until
// something signals closedChan, and then tears the connection down so the
// calling connection manager can loop and reconnect.
func (pb *pipeBridge) handleConns() {
// No authentication is performed here; the connection is marked
// AUTHENTICATED as soon as both pipes have been opened.
pb.state = connections.AUTHENTICATED
// Buffered so that handleRead, handleWrite and Shutdown can each signal
// without waiting for this goroutine to receive.
pb.closedChan = make(chan bool, 5)
log.Debugf("handleConns authed, 2xgo\n")
go pb.handleRead()
go pb.handleWrite()
// Wait for the first close signal (an I/O error or a Shutdown call).
<-pb.closedChan
log.Debugf("handleConns CLOSEDCHAN!!!!\n")
// Keep KILLED if Shutdown set it; any other close counts as a failure.
if pb.state != connections.KILLED {
pb.state = connections.FAILED
}
pb.in.Close()
pb.out.Close()
// Close the old queues and replace them with fresh ones for the next
// connection.
// NOTE(review): a Write call racing with this close would send on a closed
// channel — confirm callers cannot hit that window.
close(pb.write)
close(pb.read)
pb.read = make(chan event.IPCMessage, maxBufferSize)
pb.write = make(chan event.IPCMessage, maxBufferSize)
log.Debugf("handleConns done, exit\n")
}
// handleWrite takes messages from the write queue and sends them over the
// outbound pipe. It returns when the queue is closed, when the bridge is no
// longer AUTHENTICATED, or when a pipe write fails (after signalling
// closedChan so handleConns tears the connection down).
//
// Wire format: a 2-byte little-endian payload length followed by the JSON
// payload. Every value in the event's Data map is base64 encoded before
// marshalling; handleRead reverses this.
func (pb *pipeBridge) handleWrite() {
	// Largest payload the 2-byte length prefix can describe.
	const maxPayload = 1<<16 - 1

	for {
		message, ok := <-pb.write
		if !ok {
			// Queue closed by handleConns: this connection is over.
			return
		}
		log.Debugf("handleWrite <- message: %v\n", message)
		if pb.state != connections.AUTHENTICATED {
			return
		}

		encMessage := &event.IPCMessage{
			Dest: message.Dest,
			Message: event.Event{
				EventType: message.Message.EventType,
				EventID:   message.Message.EventID,
				Data:      make(map[event.Field]string, len(message.Message.Data)),
			},
		}
		for k, v := range message.Message.Data {
			encMessage.Message.Data[k] = base64.StdEncoding.EncodeToString([]byte(v))
		}

		messageJSON, err := json.Marshal(encMessage)
		if err != nil {
			log.Errorf("Could not marshal message for pipeBridge: %v\n", err)
			continue
		}
		if len(messageJSON) > maxPayload {
			// Sending it anyway would truncate the length prefix and
			// desynchronise the reader for every later message.
			log.Errorf("Dropping message too large for pipeBridge: %v bytes (max %v)\n", len(messageJSON), maxPayload)
			continue
		}

		size := make([]byte, 2)
		binary.LittleEndian.PutUint16(size, uint16(len(messageJSON)))

		// The length prefix and the payload are written separately, as before.
		// os.File.Write reports an error whenever it writes fewer bytes than
		// requested, so one checked call per chunk is sufficient.
		for _, chunk := range [][]byte{size, messageJSON} {
			if _, err := pb.out.Write(chunk); err != nil {
				log.Errorf("Writing out on pipeBridge: %v\n", err)
				pb.closedChan <- true
				return
			}
		}
	}
}
// handleRead reads framed messages from the inbound pipe and queues them on
// pb.read. Each frame is a 2-byte little-endian payload length followed by
// that many bytes of JSON whose Data values are base64 encoded (the format
// produced by handleWrite).
//
// A pipe read error ends the loop after signalling closedChan so handleConns
// tears the connection down. A frame that cannot be unmarshalled or whose
// Data cannot be base64 decoded is logged and dropped, and reading continues
// with the next frame.
func (pb *pipeBridge) handleRead() {
	size := make([]byte, 2)
	for {
		log.Debugf("Waiting to handleRead()...\n")
		if err := pb.readExactly(size); err != nil {
			log.Errorf("Could not read len int from stream: %v\n", err)
			pb.closedChan <- true
			return
		}

		buffer := make([]byte, binary.LittleEndian.Uint16(size))
		if err := pb.readExactly(buffer); err != nil {
			log.Errorf("Reading into buffer from pipe: %v\n", err)
			pb.closedChan <- true
			return
		}

		var message event.IPCMessage
		if err := json.Unmarshal(buffer, &message); err != nil {
			log.Errorf("Read error: %v --value: %v", err, message)
			continue // signal error?
		}

		decoded := true
		for k, v := range message.Message.Data {
			val, err := base64.StdEncoding.DecodeString(v)
			if err != nil {
				log.Errorf("Could not base64 decode field %v: %v\n", k, err)
				decoded = false
				break
			}
			message.Message.Data[k] = string(val)
		}
		if !decoded {
			// Treat like an unparseable frame rather than delivering a
			// message with silently corrupted values.
			continue
		}

		log.Debugf("handleRead read<-: %v\n", message)
		pb.read <- message
		log.Debugf("handleRead wrote\n")
	}
}

// readExactly fills buf completely from the inbound pipe, looping over short
// reads. It returns the read error if the pipe fails before buf is full.
func (pb *pipeBridge) readExactly(buf []byte) error {
	for pos := 0; pos < len(buf); {
		n, err := pb.in.Read(buf[pos:])
		pos += n
		if err != nil && pos < len(buf) {
			return err
		}
	}
	return nil
}
// Read blocks until a message is available on the read queue and returns a
// pointer to a copy of it. The second result is always true.
//
// NOTE(review): a receive from a queue that handleConns has closed yields a
// zero-valued message, still reported with true — confirm callers expect that.
func (pb *pipeBridge) Read() (*event.IPCMessage, bool) {
	log.Debugf("Read()...\n")

	received := new(event.IPCMessage)
	*received = <-pb.read

	log.Debugf("Read: %v\n", *received)
	return received, true
}
// Write queues a copy of message for handleWrite to send over the pipe. It
// blocks while the write queue is full.
func (pb *pipeBridge) Write(message *event.IPCMessage) {
	log.Debugf("Write: %v\n", message)

	queue := pb.write
	queue <- *message

	log.Debugf("Wrote\n")
}
// Shutdown marks the bridge as KILLED and signals the active connection, if
// there is one, to tear itself down. It never blocks the caller.
//
// NOTE(review): a connection manager that is still opening the pipes when
// this is called overwrites the state afterwards, so the KILLED mark can be
// lost in that window.
func (pb *pipeBridge) Shutdown() {
	pb.state = connections.KILLED

	// closedChan is nil until handleConns has run once, and a plain send on a
	// nil channel blocks forever. The buffer can also be full when close
	// signals are already pending. In both cases there is nothing useful to
	// add, so fall through instead of hanging.
	select {
	case pb.closedChan <- true:
	default:
	}
}