From 6a169472e7858e3e73a45a8f77abfbb9b945a39e Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Thu, 5 Sep 2019 10:59:55 -0700 Subject: [PATCH] ipc bridge does 3way handshake to sync new connections; a few other minor fixes to the flow --- event/bridge/infinite_chan.go | 72 +++++++++++ event/bridge/infinite_queue.go | 105 ++++++++++++++++ event/bridge/pipeBridge.go | 206 ++++++++++++++++++++++++-------- event/bridge/pipeBridge_test.go | 74 ++++++++++++ 4 files changed, 407 insertions(+), 50 deletions(-) create mode 100644 event/bridge/infinite_chan.go create mode 100644 event/bridge/infinite_queue.go diff --git a/event/bridge/infinite_chan.go b/event/bridge/infinite_chan.go new file mode 100644 index 0000000..688d7f6 --- /dev/null +++ b/event/bridge/infinite_chan.go @@ -0,0 +1,72 @@ +package bridge + +/* Todo: When go generics ships, refactor this and event.infiniteChannel into one */ + +// InfiniteChannel implements the Channel interface with an infinite buffer between the input and the output. +type InfiniteChannel struct { + input, output chan interface{} + length chan int + buffer *InfiniteQueue +} + +func newInfiniteChannel() *InfiniteChannel { + ch := &InfiniteChannel{ + input: make(chan interface{}), + output: make(chan interface{}), + length: make(chan int), + buffer: newInfiniteQueue(), + } + go ch.infiniteBuffer() + return ch +} + +// In returns the input channel +func (ch *InfiniteChannel) In() chan<- interface{} { + return ch.input +} + +// Out returns the output channel +func (ch *InfiniteChannel) Out() <-chan interface{} { + return ch.output +} + +// Len returns the length of items in queue +func (ch *InfiniteChannel) Len() int { + return <-ch.length +} + +// Close closes the InfiniteChanel +func (ch *InfiniteChannel) Close() { + close(ch.input) +} + +func (ch *InfiniteChannel) infiniteBuffer() { + var input, output chan interface{} + var next interface{} + input = ch.input + + for input != nil || output != nil { + select { + case elem, open := <-input: + if open { + ch.buffer.Add(elem) + } else { + input = nil + } + case output <- next: + ch.buffer.Remove() + case ch.length <- ch.buffer.Length(): + } + + if ch.buffer.Length() > 0 { + output = ch.output + next = ch.buffer.Peek() + } else { + output = nil + next = nil + } + } + + close(ch.output) + close(ch.length) +} diff --git a/event/bridge/infinite_queue.go b/event/bridge/infinite_queue.go new file mode 100644 index 0000000..5ce1289 --- /dev/null +++ b/event/bridge/infinite_queue.go @@ -0,0 +1,105 @@ +package bridge + +/* Todo: When go generics ships, refactor this and event.infinitQueue channel into one */ + +/* +Package queue provides a fast, ring-buffer queue based on the version suggested by Dariusz Górecki. +Using this instead of other, simpler, queue implementations (slice+append or linked list) provides +substantial memory and time benefits, and fewer GC pauses. + +The queue implemented here is as fast as it is for an additional reason: it is *not* thread-safe. +*/ + +// minQueueLen is smallest capacity that queue may have. +// Must be power of 2 for bitwise modulus: x % n == x & (n - 1). +const minQueueLen = 16 + +// InfiniteQueue represents a single instance of the queue data structure. +type InfiniteQueue struct { + buf []interface{} + head, tail, count int +} + +// New constructs and returns a new Queue. +func newInfiniteQueue() *InfiniteQueue { + return &InfiniteQueue{ + buf: make([]interface{}, minQueueLen), + } +} + +// Length returns the number of elements currently stored in the queue. +func (q *InfiniteQueue) Length() int { + return q.count +} + +// resizes the queue to fit exactly twice its current contents +// this can result in shrinking if the queue is less than half-full +func (q *InfiniteQueue) resize() { + newBuf := make([]interface{}, q.count<<1) + + if q.tail > q.head { + copy(newBuf, q.buf[q.head:q.tail]) + } else { + n := copy(newBuf, q.buf[q.head:]) + copy(newBuf[n:], q.buf[:q.tail]) + } + + q.head = 0 + q.tail = q.count + q.buf = newBuf +} + +// Add puts an element on the end of the queue. +func (q *InfiniteQueue) Add(elem interface{}) { + if q.count == len(q.buf) { + q.resize() + } + + q.buf[q.tail] = elem + // bitwise modulus + q.tail = (q.tail + 1) & (len(q.buf) - 1) + q.count++ +} + +// Peek returns the element at the head of the queue. This call panics +// if the queue is empty. +func (q *InfiniteQueue) Peek() interface{} { + if q.count <= 0 { + panic("queue: Peek() called on empty queue") + } + return q.buf[q.head] +} + +// Get returns the element at index i in the queue. If the index is +// invalid, the call will panic. This method accepts both positive and +// negative index values. Index 0 refers to the first element, and +// index -1 refers to the last. +func (q *InfiniteQueue) Get(i int) interface{} { + // If indexing backwards, convert to positive index. + if i < 0 { + i += q.count + } + if i < 0 || i >= q.count { + panic("queue: Get() called with index out of range") + } + // bitwise modulus + return q.buf[(q.head+i)&(len(q.buf)-1)] +} + +// Remove removes and returns the element from the front of the queue. If the +// queue is empty, the call will panic. +func (q *InfiniteQueue) Remove() interface{} { + if q.count <= 0 { + panic("queue: Remove() called on empty queue") + } + ret := q.buf[q.head] + q.buf[q.head] = nil + // bitwise modulus + q.head = (q.head + 1) & (len(q.buf) - 1) + q.count-- + // Resize down if buffer 1/4 full. + if len(q.buf) > minQueueLen && (q.count<<2) == len(q.buf) { + q.resize() + } + return ret +} diff --git a/event/bridge/pipeBridge.go b/event/bridge/pipeBridge.go index 03c5bb9..bbe8383 100644 --- a/event/bridge/pipeBridge.go +++ b/event/bridge/pipeBridge.go @@ -8,6 +8,7 @@ import ( "encoding/binary" "git.openprivacy.ca/openprivacy/libricochet-go/log" "syscall" + "time" "cwtch.im/cwtch/event" "encoding/json" @@ -21,13 +22,22 @@ import ( const maxBufferSize = 1000 +const serviceName = "service" +const clientName = "client" + +const syn = "SYN" +const synack = "SYNACK" +const ack = "ACK" + type pipeBridge struct { infile, outfile string in, out *os.File - read, write chan event.IPCMessage + read chan event.IPCMessage + write *InfiniteChannel closedChan chan bool state connections.ConnectionState lock sync.Mutex + threeShake func () bool // For logging / debugging purposes name string @@ -38,7 +48,7 @@ func newPipeBridge(inFilename, outFilename string) *pipeBridge { syscall.Mkfifo(outFilename, 0600) pb := &pipeBridge{infile: inFilename, outfile: outFilename, state: connections.DISCONNECTED} pb.read = make(chan event.IPCMessage, maxBufferSize) - pb.write = make(chan event.IPCMessage, maxBufferSize) + pb.write = newInfiniteChannel() //make(chan event.IPCMessage, maxBufferSize) return pb } @@ -46,7 +56,8 @@ func newPipeBridge(inFilename, outFilename string) *pipeBridge { func NewPipeBridgeClient(inFilename, outFilename string) event.IPCBridge { log.Debugf("Making new PipeBridge Client...\n") pb := newPipeBridge(inFilename, outFilename) - pb.name = "client" + pb.name = clientName + pb.threeShake = pb.threeShakeClient go pb.connectionManager() return pb @@ -56,7 +67,8 @@ func NewPipeBridgeClient(inFilename, outFilename string) event.IPCBridge { func NewPipeBridgeService(inFilename, outFilename string) event.IPCBridge { log.Debugf("Making new PipeBridge Service...\n") pb := newPipeBridge(inFilename, outFilename) - pb.name = "service" + pb.name = serviceName + pb.threeShake = pb.threeShakeService go pb.connectionManager() @@ -92,9 +104,74 @@ func (pb *pipeBridge) connectionManager() { } +// threeShake performs a 3way handshake sync up +func (pb *pipeBridge) threeShakeService() bool { + synacked := false + + for { + resp, err := pb.readString() + if err != nil { + return false + } + + if string(resp) == syn { + if !synacked { + err = pb.writeString([]byte(synack)) + if err != nil { + return false + } + synacked = true + } + } else if string(resp) == ack { + return true + } + } +} + +func (pb *pipeBridge) synLoop(stop chan bool) { + delay := time.Duration(0) + for { + select { + case <-time.After(delay): + err := pb.writeString([]byte(syn)) + if err != nil { + return + } + delay = time.Second + case <- stop: + return + } + } +} + +func (pb *pipeBridge) threeShakeClient() bool { + stop := make(chan bool) + go pb.synLoop(stop) + for { + resp, err := pb.readString() + if err != nil { + return false + } + + if string(resp) == synack { + stop <- true + err := pb.writeString([]byte(ack)) + if err != nil { + return false + } + return true + } + } +} + func (pb *pipeBridge) handleConns() { - // auth? + if !pb.threeShake() { + pb.state = connections.FAILED + pb.closeReset() + return + } + pb.state = connections.AUTHENTICATED pb.closedChan = make(chan bool, 5) @@ -116,10 +193,13 @@ func (pb *pipeBridge) handleConns() { func (pb *pipeBridge) closeReset() { pb.in.Close() pb.out.Close() - close(pb.write) close(pb.read) - pb.read = make(chan event.IPCMessage, maxBufferSize) - pb.write = make(chan event.IPCMessage, maxBufferSize) + pb.write.Close() + + if pb.state != connections.KILLED { + pb.read = make(chan event.IPCMessage, maxBufferSize) + pb.write = newInfiniteChannel() + } } func (pb *pipeBridge) handleWrite() { @@ -128,7 +208,12 @@ func (pb *pipeBridge) handleWrite() { for { select { - case message := <-pb.write: + case messageInf := <-pb.write.output: + if messageInf == nil { + pb.closedChan <- true + return + } + message := messageInf.(event.IPCMessage) if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup { log.Debugf("handleWrite <- message: %v %v ...\n", message.Dest, message.Message.EventType) } else { @@ -141,18 +226,10 @@ func (pb *pipeBridge) handleWrite() { } messageJSON, _ := json.Marshal(encMessage) - size := make([]byte, 2) - binary.LittleEndian.PutUint16(size, uint16(len(messageJSON))) - pb.out.Write(size) - - for pos := 0; pos < len(messageJSON); { - n, err := pb.out.Write(messageJSON) - if err != nil { - log.Errorf("Writing out on pipeBridge: %v\n", err) - pb.closedChan <- true - return - } - pos += n + err := pb.writeString(messageJSON) + if err != nil { + pb.closedChan <- true + return } } else { return @@ -165,37 +242,21 @@ func (pb *pipeBridge) handleRead() { log.Debugf("handleRead() %v\n", pb.name) defer log.Debugf("exiting handleRead() %v", pb.name) - var n int - size := make([]byte, 2) - var err error for { log.Debugf("Waiting to handleRead()...\n") - n, err = pb.in.Read(size) - if err != nil || n != 2 { - log.Errorf("Could not read len int from stream: %v\n", err) + + buffer, err := pb.readString() + if err != nil { pb.closedChan <- true return } - n = int(binary.LittleEndian.Uint16(size)) - pos := 0 - buffer := make([]byte, n) - for n > 0 { - m, err := pb.in.Read(buffer[pos:]) - if err != nil { - log.Errorf("Reading into buffer from pipe: %v\n", err) - pb.closedChan <- true - return - } - n -= m - pos += m - } - var message event.IPCMessage err = json.Unmarshal(buffer, &message) if err != nil { - log.Errorf("Read error: %v --value: %v", err, message) - continue // signal error? + log.Errorf("Read error: '%v', value: '%v'", err, buffer) + pb.closedChan <- true + return // probably new connection trying to initialize } for k, v := range message.Message.Data { val, _ := base64.StdEncoding.DecodeString(v) @@ -213,12 +274,15 @@ func (pb *pipeBridge) handleRead() { func (pb *pipeBridge) Read() (*event.IPCMessage, bool) { log.Debugf("Read() %v...\n", pb.name) - - message := <-pb.read - if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup { - log.Debugf("Read %v: %v %v ...\n", pb.name, message.Dest, message.Message.EventType) - } else { - log.Debugf("Read %v: %v\n", pb.name, message) + var ok = false + var message event.IPCMessage + for !ok && pb.state != connections.KILLED { + message, ok = <-pb.read + if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup { + log.Debugf("Read %v: %v %v ...\n", pb.name, message.Dest, message.Message.EventType) + } else { + log.Debugf("Read %v: %v\n", pb.name, message) + } } return &message, pb.state != connections.KILLED } @@ -229,7 +293,7 @@ func (pb *pipeBridge) Write(message *event.IPCMessage) { } else { log.Debugf("Write %v: %v\n", pb.name, message) } - pb.write <- *message + pb.write.input <- *message log.Debugf("Wrote\n") } @@ -239,3 +303,45 @@ func (pb *pipeBridge) Shutdown() { pb.closedChan <- true log.Debugf("Done Shutdown for %v\n", pb.name) } + +func (pb *pipeBridge) writeString(message []byte) error { + size := make([]byte, 2) + binary.LittleEndian.PutUint16(size, uint16(len(message))) + pb.out.Write(size) + + for pos := 0; pos < len(message); { + n, err := pb.out.Write(message[pos:]) + if err != nil { + log.Errorf("Writing out on pipeBridge: %v\n", err) + return err + } + pos += n + } + return nil +} + +func (pb *pipeBridge) readString() ([]byte, error) { + var n int + size := make([]byte, 2) + var err error + + n, err = pb.in.Read(size) + if err != nil || n != 2 { + log.Errorf("Could not read len int from stream: %v\n", err) + return nil, err + } + + n = int(binary.LittleEndian.Uint16(size)) + pos := 0 + buffer := make([]byte, n) + for n > 0 { + m, err := pb.in.Read(buffer[pos:]) + if err != nil { + log.Errorf("Reading into buffer from pipe: %v\n", err) + return nil, err + } + n -= m + pos += m + } + return buffer, nil +} diff --git a/event/bridge/pipeBridge_test.go b/event/bridge/pipeBridge_test.go index da0c500..1c5bbb7 100644 --- a/event/bridge/pipeBridge_test.go +++ b/event/bridge/pipeBridge_test.go @@ -2,7 +2,10 @@ package bridge import ( "cwtch.im/cwtch/event" + "git.openprivacy.ca/openprivacy/libricochet-go/log" + "os" "testing" + "time" ) var ( @@ -44,6 +47,8 @@ func serviceHelper(t *testing.T, in, out string, messageOrig *event.IPCMessage, } func TestPipeBridge(t *testing.T) { + os.Remove(servicePipe) + os.Remove(clientPipe) messageOrig := &event.IPCMessage{Dest: "ABC", Message: event.NewEventList(event.NewPeer, event.Identity, "It is I")} serviceDone := make(chan bool) @@ -55,3 +60,72 @@ func TestPipeBridge(t *testing.T) { <-serviceDone <-clientDone } + +func restartingClient(t *testing.T, in, out string, done chan bool) { + client := NewPipeBridgeClient(in, out) + + message1 := &event.IPCMessage{Dest: "ABC", Message: event.NewEventList(event.NewPeer)} + log.Infoln("client writing message 1") + client.Write(message1) + + time.Sleep(100 * time.Millisecond) + log.Infoln("client shutdown") + client.Shutdown() + + log.Infoln("client new client") + client = NewPipeBridgeClient(in, out) + message2 := &event.IPCMessage{Dest: "ABC", Message: event.NewEventList(event.DeleteContact)} + log.Infoln("client2 write message2") + client.Write(message2) + + done <- true +} + +func stableService(t *testing.T, in, out string, done chan bool) { + service := NewPipeBridgeService(in, out) + + log.Infoln("service wait read 1") + message1, ok := service.Read() + log.Infof("service read 1 %v ok:%v\n", message1, ok) + if !ok { + t.Errorf("Reading from client IPCBridge 1st time failed") + done <- true + return + } + if message1.Message.EventType != event.NewPeer { + t.Errorf("Wrong message recieved, expected NewPeer\n") + done <- true + return + } + + log.Infoln("service wait read 2") + message2, ok := service.Read() + log.Infof("service read 2 got %v ok:%v\n", message2, ok) + if !ok { + t.Errorf("Reading from client IPCBridge 2nd time failed") + done <- true + return + } + if message2.Message.EventType != event.DeleteContact { + t.Errorf("Wrong message recieved, expected DeleteContact, got %v\n", message2) + done <- true + return + } + + done <- true +} + +func TestReconnect(t *testing.T) { + log.Infoln("TestReconnect") + os.Remove(servicePipe) + os.Remove(clientPipe) + + serviceDone := make(chan bool) + clientDone := make(chan bool) + + go restartingClient(t, clientPipe, servicePipe, clientDone) + go stableService(t, servicePipe, clientPipe, serviceDone) + + <-serviceDone + <-clientDone +}