forked from cwtch.im/cwtch
ipc bridge does 3way handshake to sync new connections; a few other minor fixes to the flow
This commit is contained in:
parent
9dab8cc877
commit
6a169472e7
|
@ -0,0 +1,72 @@
|
||||||
|
package bridge
|
||||||
|
|
||||||
|
/* Todo: When go generics ships, refactor this and event.infiniteChannel into one */
|
||||||
|
|
||||||
|
// InfiniteChannel implements the Channel interface with an infinite buffer between the input and the output.
|
||||||
|
type InfiniteChannel struct {
|
||||||
|
input, output chan interface{}
|
||||||
|
length chan int
|
||||||
|
buffer *InfiniteQueue
|
||||||
|
}
|
||||||
|
|
||||||
|
func newInfiniteChannel() *InfiniteChannel {
|
||||||
|
ch := &InfiniteChannel{
|
||||||
|
input: make(chan interface{}),
|
||||||
|
output: make(chan interface{}),
|
||||||
|
length: make(chan int),
|
||||||
|
buffer: newInfiniteQueue(),
|
||||||
|
}
|
||||||
|
go ch.infiniteBuffer()
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
// In returns the input channel
|
||||||
|
func (ch *InfiniteChannel) In() chan<- interface{} {
|
||||||
|
return ch.input
|
||||||
|
}
|
||||||
|
|
||||||
|
// Out returns the output channel
|
||||||
|
func (ch *InfiniteChannel) Out() <-chan interface{} {
|
||||||
|
return ch.output
|
||||||
|
}
|
||||||
|
|
||||||
|
// Len returns the length of items in queue
|
||||||
|
func (ch *InfiniteChannel) Len() int {
|
||||||
|
return <-ch.length
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the InfiniteChanel
|
||||||
|
func (ch *InfiniteChannel) Close() {
|
||||||
|
close(ch.input)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ch *InfiniteChannel) infiniteBuffer() {
|
||||||
|
var input, output chan interface{}
|
||||||
|
var next interface{}
|
||||||
|
input = ch.input
|
||||||
|
|
||||||
|
for input != nil || output != nil {
|
||||||
|
select {
|
||||||
|
case elem, open := <-input:
|
||||||
|
if open {
|
||||||
|
ch.buffer.Add(elem)
|
||||||
|
} else {
|
||||||
|
input = nil
|
||||||
|
}
|
||||||
|
case output <- next:
|
||||||
|
ch.buffer.Remove()
|
||||||
|
case ch.length <- ch.buffer.Length():
|
||||||
|
}
|
||||||
|
|
||||||
|
if ch.buffer.Length() > 0 {
|
||||||
|
output = ch.output
|
||||||
|
next = ch.buffer.Peek()
|
||||||
|
} else {
|
||||||
|
output = nil
|
||||||
|
next = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
close(ch.output)
|
||||||
|
close(ch.length)
|
||||||
|
}
|
|
@ -0,0 +1,105 @@
|
||||||
|
package bridge
|
||||||
|
|
||||||
|
/* Todo: When go generics ships, refactor this and event.infinitQueue channel into one */
|
||||||
|
|
||||||
|
/*
|
||||||
|
Package queue provides a fast, ring-buffer queue based on the version suggested by Dariusz Górecki.
|
||||||
|
Using this instead of other, simpler, queue implementations (slice+append or linked list) provides
|
||||||
|
substantial memory and time benefits, and fewer GC pauses.
|
||||||
|
|
||||||
|
The queue implemented here is as fast as it is for an additional reason: it is *not* thread-safe.
|
||||||
|
*/
|
||||||
|
|
||||||
|
// minQueueLen is smallest capacity that queue may have.
|
||||||
|
// Must be power of 2 for bitwise modulus: x % n == x & (n - 1).
|
||||||
|
const minQueueLen = 16
|
||||||
|
|
||||||
|
// InfiniteQueue represents a single instance of the queue data structure.
|
||||||
|
type InfiniteQueue struct {
|
||||||
|
buf []interface{}
|
||||||
|
head, tail, count int
|
||||||
|
}
|
||||||
|
|
||||||
|
// New constructs and returns a new Queue.
|
||||||
|
func newInfiniteQueue() *InfiniteQueue {
|
||||||
|
return &InfiniteQueue{
|
||||||
|
buf: make([]interface{}, minQueueLen),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Length returns the number of elements currently stored in the queue.
|
||||||
|
func (q *InfiniteQueue) Length() int {
|
||||||
|
return q.count
|
||||||
|
}
|
||||||
|
|
||||||
|
// resizes the queue to fit exactly twice its current contents
|
||||||
|
// this can result in shrinking if the queue is less than half-full
|
||||||
|
func (q *InfiniteQueue) resize() {
|
||||||
|
newBuf := make([]interface{}, q.count<<1)
|
||||||
|
|
||||||
|
if q.tail > q.head {
|
||||||
|
copy(newBuf, q.buf[q.head:q.tail])
|
||||||
|
} else {
|
||||||
|
n := copy(newBuf, q.buf[q.head:])
|
||||||
|
copy(newBuf[n:], q.buf[:q.tail])
|
||||||
|
}
|
||||||
|
|
||||||
|
q.head = 0
|
||||||
|
q.tail = q.count
|
||||||
|
q.buf = newBuf
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add puts an element on the end of the queue.
|
||||||
|
func (q *InfiniteQueue) Add(elem interface{}) {
|
||||||
|
if q.count == len(q.buf) {
|
||||||
|
q.resize()
|
||||||
|
}
|
||||||
|
|
||||||
|
q.buf[q.tail] = elem
|
||||||
|
// bitwise modulus
|
||||||
|
q.tail = (q.tail + 1) & (len(q.buf) - 1)
|
||||||
|
q.count++
|
||||||
|
}
|
||||||
|
|
||||||
|
// Peek returns the element at the head of the queue. This call panics
|
||||||
|
// if the queue is empty.
|
||||||
|
func (q *InfiniteQueue) Peek() interface{} {
|
||||||
|
if q.count <= 0 {
|
||||||
|
panic("queue: Peek() called on empty queue")
|
||||||
|
}
|
||||||
|
return q.buf[q.head]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get returns the element at index i in the queue. If the index is
|
||||||
|
// invalid, the call will panic. This method accepts both positive and
|
||||||
|
// negative index values. Index 0 refers to the first element, and
|
||||||
|
// index -1 refers to the last.
|
||||||
|
func (q *InfiniteQueue) Get(i int) interface{} {
|
||||||
|
// If indexing backwards, convert to positive index.
|
||||||
|
if i < 0 {
|
||||||
|
i += q.count
|
||||||
|
}
|
||||||
|
if i < 0 || i >= q.count {
|
||||||
|
panic("queue: Get() called with index out of range")
|
||||||
|
}
|
||||||
|
// bitwise modulus
|
||||||
|
return q.buf[(q.head+i)&(len(q.buf)-1)]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove removes and returns the element from the front of the queue. If the
|
||||||
|
// queue is empty, the call will panic.
|
||||||
|
func (q *InfiniteQueue) Remove() interface{} {
|
||||||
|
if q.count <= 0 {
|
||||||
|
panic("queue: Remove() called on empty queue")
|
||||||
|
}
|
||||||
|
ret := q.buf[q.head]
|
||||||
|
q.buf[q.head] = nil
|
||||||
|
// bitwise modulus
|
||||||
|
q.head = (q.head + 1) & (len(q.buf) - 1)
|
||||||
|
q.count--
|
||||||
|
// Resize down if buffer 1/4 full.
|
||||||
|
if len(q.buf) > minQueueLen && (q.count<<2) == len(q.buf) {
|
||||||
|
q.resize()
|
||||||
|
}
|
||||||
|
return ret
|
||||||
|
}
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
"cwtch.im/cwtch/event"
|
"cwtch.im/cwtch/event"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
@ -21,13 +22,22 @@ import (
|
||||||
|
|
||||||
const maxBufferSize = 1000
|
const maxBufferSize = 1000
|
||||||
|
|
||||||
|
const serviceName = "service"
|
||||||
|
const clientName = "client"
|
||||||
|
|
||||||
|
const syn = "SYN"
|
||||||
|
const synack = "SYNACK"
|
||||||
|
const ack = "ACK"
|
||||||
|
|
||||||
type pipeBridge struct {
|
type pipeBridge struct {
|
||||||
infile, outfile string
|
infile, outfile string
|
||||||
in, out *os.File
|
in, out *os.File
|
||||||
read, write chan event.IPCMessage
|
read chan event.IPCMessage
|
||||||
|
write *InfiniteChannel
|
||||||
closedChan chan bool
|
closedChan chan bool
|
||||||
state connections.ConnectionState
|
state connections.ConnectionState
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
|
threeShake func () bool
|
||||||
|
|
||||||
// For logging / debugging purposes
|
// For logging / debugging purposes
|
||||||
name string
|
name string
|
||||||
|
@ -38,7 +48,7 @@ func newPipeBridge(inFilename, outFilename string) *pipeBridge {
|
||||||
syscall.Mkfifo(outFilename, 0600)
|
syscall.Mkfifo(outFilename, 0600)
|
||||||
pb := &pipeBridge{infile: inFilename, outfile: outFilename, state: connections.DISCONNECTED}
|
pb := &pipeBridge{infile: inFilename, outfile: outFilename, state: connections.DISCONNECTED}
|
||||||
pb.read = make(chan event.IPCMessage, maxBufferSize)
|
pb.read = make(chan event.IPCMessage, maxBufferSize)
|
||||||
pb.write = make(chan event.IPCMessage, maxBufferSize)
|
pb.write = newInfiniteChannel() //make(chan event.IPCMessage, maxBufferSize)
|
||||||
return pb
|
return pb
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,7 +56,8 @@ func newPipeBridge(inFilename, outFilename string) *pipeBridge {
|
||||||
func NewPipeBridgeClient(inFilename, outFilename string) event.IPCBridge {
|
func NewPipeBridgeClient(inFilename, outFilename string) event.IPCBridge {
|
||||||
log.Debugf("Making new PipeBridge Client...\n")
|
log.Debugf("Making new PipeBridge Client...\n")
|
||||||
pb := newPipeBridge(inFilename, outFilename)
|
pb := newPipeBridge(inFilename, outFilename)
|
||||||
pb.name = "client"
|
pb.name = clientName
|
||||||
|
pb.threeShake = pb.threeShakeClient
|
||||||
go pb.connectionManager()
|
go pb.connectionManager()
|
||||||
|
|
||||||
return pb
|
return pb
|
||||||
|
@ -56,7 +67,8 @@ func NewPipeBridgeClient(inFilename, outFilename string) event.IPCBridge {
|
||||||
func NewPipeBridgeService(inFilename, outFilename string) event.IPCBridge {
|
func NewPipeBridgeService(inFilename, outFilename string) event.IPCBridge {
|
||||||
log.Debugf("Making new PipeBridge Service...\n")
|
log.Debugf("Making new PipeBridge Service...\n")
|
||||||
pb := newPipeBridge(inFilename, outFilename)
|
pb := newPipeBridge(inFilename, outFilename)
|
||||||
pb.name = "service"
|
pb.name = serviceName
|
||||||
|
pb.threeShake = pb.threeShakeService
|
||||||
|
|
||||||
go pb.connectionManager()
|
go pb.connectionManager()
|
||||||
|
|
||||||
|
@ -92,9 +104,74 @@ func (pb *pipeBridge) connectionManager() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// threeShake performs a 3way handshake sync up
|
||||||
|
func (pb *pipeBridge) threeShakeService() bool {
|
||||||
|
synacked := false
|
||||||
|
|
||||||
|
for {
|
||||||
|
resp, err := pb.readString()
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if string(resp) == syn {
|
||||||
|
if !synacked {
|
||||||
|
err = pb.writeString([]byte(synack))
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
synacked = true
|
||||||
|
}
|
||||||
|
} else if string(resp) == ack {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pb *pipeBridge) synLoop(stop chan bool) {
|
||||||
|
delay := time.Duration(0)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-time.After(delay):
|
||||||
|
err := pb.writeString([]byte(syn))
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
delay = time.Second
|
||||||
|
case <- stop:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pb *pipeBridge) threeShakeClient() bool {
|
||||||
|
stop := make(chan bool)
|
||||||
|
go pb.synLoop(stop)
|
||||||
|
for {
|
||||||
|
resp, err := pb.readString()
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if string(resp) == synack {
|
||||||
|
stop <- true
|
||||||
|
err := pb.writeString([]byte(ack))
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (pb *pipeBridge) handleConns() {
|
func (pb *pipeBridge) handleConns() {
|
||||||
|
|
||||||
// auth?
|
if !pb.threeShake() {
|
||||||
|
pb.state = connections.FAILED
|
||||||
|
pb.closeReset()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
pb.state = connections.AUTHENTICATED
|
pb.state = connections.AUTHENTICATED
|
||||||
|
|
||||||
pb.closedChan = make(chan bool, 5)
|
pb.closedChan = make(chan bool, 5)
|
||||||
|
@ -116,10 +193,13 @@ func (pb *pipeBridge) handleConns() {
|
||||||
func (pb *pipeBridge) closeReset() {
|
func (pb *pipeBridge) closeReset() {
|
||||||
pb.in.Close()
|
pb.in.Close()
|
||||||
pb.out.Close()
|
pb.out.Close()
|
||||||
close(pb.write)
|
|
||||||
close(pb.read)
|
close(pb.read)
|
||||||
pb.read = make(chan event.IPCMessage, maxBufferSize)
|
pb.write.Close()
|
||||||
pb.write = make(chan event.IPCMessage, maxBufferSize)
|
|
||||||
|
if pb.state != connections.KILLED {
|
||||||
|
pb.read = make(chan event.IPCMessage, maxBufferSize)
|
||||||
|
pb.write = newInfiniteChannel()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pb *pipeBridge) handleWrite() {
|
func (pb *pipeBridge) handleWrite() {
|
||||||
|
@ -128,7 +208,12 @@ func (pb *pipeBridge) handleWrite() {
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case message := <-pb.write:
|
case messageInf := <-pb.write.output:
|
||||||
|
if messageInf == nil {
|
||||||
|
pb.closedChan <- true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
message := messageInf.(event.IPCMessage)
|
||||||
if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
|
if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
|
||||||
log.Debugf("handleWrite <- message: %v %v ...\n", message.Dest, message.Message.EventType)
|
log.Debugf("handleWrite <- message: %v %v ...\n", message.Dest, message.Message.EventType)
|
||||||
} else {
|
} else {
|
||||||
|
@ -141,18 +226,10 @@ func (pb *pipeBridge) handleWrite() {
|
||||||
}
|
}
|
||||||
|
|
||||||
messageJSON, _ := json.Marshal(encMessage)
|
messageJSON, _ := json.Marshal(encMessage)
|
||||||
size := make([]byte, 2)
|
err := pb.writeString(messageJSON)
|
||||||
binary.LittleEndian.PutUint16(size, uint16(len(messageJSON)))
|
if err != nil {
|
||||||
pb.out.Write(size)
|
pb.closedChan <- true
|
||||||
|
return
|
||||||
for pos := 0; pos < len(messageJSON); {
|
|
||||||
n, err := pb.out.Write(messageJSON)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Writing out on pipeBridge: %v\n", err)
|
|
||||||
pb.closedChan <- true
|
|
||||||
return
|
|
||||||
}
|
|
||||||
pos += n
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return
|
return
|
||||||
|
@ -165,37 +242,21 @@ func (pb *pipeBridge) handleRead() {
|
||||||
log.Debugf("handleRead() %v\n", pb.name)
|
log.Debugf("handleRead() %v\n", pb.name)
|
||||||
defer log.Debugf("exiting handleRead() %v", pb.name)
|
defer log.Debugf("exiting handleRead() %v", pb.name)
|
||||||
|
|
||||||
var n int
|
|
||||||
size := make([]byte, 2)
|
|
||||||
var err error
|
|
||||||
for {
|
for {
|
||||||
log.Debugf("Waiting to handleRead()...\n")
|
log.Debugf("Waiting to handleRead()...\n")
|
||||||
n, err = pb.in.Read(size)
|
|
||||||
if err != nil || n != 2 {
|
buffer, err := pb.readString()
|
||||||
log.Errorf("Could not read len int from stream: %v\n", err)
|
if err != nil {
|
||||||
pb.closedChan <- true
|
pb.closedChan <- true
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
n = int(binary.LittleEndian.Uint16(size))
|
|
||||||
pos := 0
|
|
||||||
buffer := make([]byte, n)
|
|
||||||
for n > 0 {
|
|
||||||
m, err := pb.in.Read(buffer[pos:])
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Reading into buffer from pipe: %v\n", err)
|
|
||||||
pb.closedChan <- true
|
|
||||||
return
|
|
||||||
}
|
|
||||||
n -= m
|
|
||||||
pos += m
|
|
||||||
}
|
|
||||||
|
|
||||||
var message event.IPCMessage
|
var message event.IPCMessage
|
||||||
err = json.Unmarshal(buffer, &message)
|
err = json.Unmarshal(buffer, &message)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Read error: %v --value: %v", err, message)
|
log.Errorf("Read error: '%v', value: '%v'", err, buffer)
|
||||||
continue // signal error?
|
pb.closedChan <- true
|
||||||
|
return // probably new connection trying to initialize
|
||||||
}
|
}
|
||||||
for k, v := range message.Message.Data {
|
for k, v := range message.Message.Data {
|
||||||
val, _ := base64.StdEncoding.DecodeString(v)
|
val, _ := base64.StdEncoding.DecodeString(v)
|
||||||
|
@ -213,12 +274,15 @@ func (pb *pipeBridge) handleRead() {
|
||||||
|
|
||||||
func (pb *pipeBridge) Read() (*event.IPCMessage, bool) {
|
func (pb *pipeBridge) Read() (*event.IPCMessage, bool) {
|
||||||
log.Debugf("Read() %v...\n", pb.name)
|
log.Debugf("Read() %v...\n", pb.name)
|
||||||
|
var ok = false
|
||||||
message := <-pb.read
|
var message event.IPCMessage
|
||||||
if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
|
for !ok && pb.state != connections.KILLED {
|
||||||
log.Debugf("Read %v: %v %v ...\n", pb.name, message.Dest, message.Message.EventType)
|
message, ok = <-pb.read
|
||||||
} else {
|
if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
|
||||||
log.Debugf("Read %v: %v\n", pb.name, message)
|
log.Debugf("Read %v: %v %v ...\n", pb.name, message.Dest, message.Message.EventType)
|
||||||
|
} else {
|
||||||
|
log.Debugf("Read %v: %v\n", pb.name, message)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return &message, pb.state != connections.KILLED
|
return &message, pb.state != connections.KILLED
|
||||||
}
|
}
|
||||||
|
@ -229,7 +293,7 @@ func (pb *pipeBridge) Write(message *event.IPCMessage) {
|
||||||
} else {
|
} else {
|
||||||
log.Debugf("Write %v: %v\n", pb.name, message)
|
log.Debugf("Write %v: %v\n", pb.name, message)
|
||||||
}
|
}
|
||||||
pb.write <- *message
|
pb.write.input <- *message
|
||||||
log.Debugf("Wrote\n")
|
log.Debugf("Wrote\n")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -239,3 +303,45 @@ func (pb *pipeBridge) Shutdown() {
|
||||||
pb.closedChan <- true
|
pb.closedChan <- true
|
||||||
log.Debugf("Done Shutdown for %v\n", pb.name)
|
log.Debugf("Done Shutdown for %v\n", pb.name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (pb *pipeBridge) writeString(message []byte) error {
|
||||||
|
size := make([]byte, 2)
|
||||||
|
binary.LittleEndian.PutUint16(size, uint16(len(message)))
|
||||||
|
pb.out.Write(size)
|
||||||
|
|
||||||
|
for pos := 0; pos < len(message); {
|
||||||
|
n, err := pb.out.Write(message[pos:])
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Writing out on pipeBridge: %v\n", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
pos += n
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pb *pipeBridge) readString() ([]byte, error) {
|
||||||
|
var n int
|
||||||
|
size := make([]byte, 2)
|
||||||
|
var err error
|
||||||
|
|
||||||
|
n, err = pb.in.Read(size)
|
||||||
|
if err != nil || n != 2 {
|
||||||
|
log.Errorf("Could not read len int from stream: %v\n", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
n = int(binary.LittleEndian.Uint16(size))
|
||||||
|
pos := 0
|
||||||
|
buffer := make([]byte, n)
|
||||||
|
for n > 0 {
|
||||||
|
m, err := pb.in.Read(buffer[pos:])
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Reading into buffer from pipe: %v\n", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
n -= m
|
||||||
|
pos += m
|
||||||
|
}
|
||||||
|
return buffer, nil
|
||||||
|
}
|
||||||
|
|
|
@ -2,7 +2,10 @@ package bridge
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"cwtch.im/cwtch/event"
|
"cwtch.im/cwtch/event"
|
||||||
|
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||||
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -44,6 +47,8 @@ func serviceHelper(t *testing.T, in, out string, messageOrig *event.IPCMessage,
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPipeBridge(t *testing.T) {
|
func TestPipeBridge(t *testing.T) {
|
||||||
|
os.Remove(servicePipe)
|
||||||
|
os.Remove(clientPipe)
|
||||||
|
|
||||||
messageOrig := &event.IPCMessage{Dest: "ABC", Message: event.NewEventList(event.NewPeer, event.Identity, "It is I")}
|
messageOrig := &event.IPCMessage{Dest: "ABC", Message: event.NewEventList(event.NewPeer, event.Identity, "It is I")}
|
||||||
serviceDone := make(chan bool)
|
serviceDone := make(chan bool)
|
||||||
|
@ -55,3 +60,72 @@ func TestPipeBridge(t *testing.T) {
|
||||||
<-serviceDone
|
<-serviceDone
|
||||||
<-clientDone
|
<-clientDone
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func restartingClient(t *testing.T, in, out string, done chan bool) {
|
||||||
|
client := NewPipeBridgeClient(in, out)
|
||||||
|
|
||||||
|
message1 := &event.IPCMessage{Dest: "ABC", Message: event.NewEventList(event.NewPeer)}
|
||||||
|
log.Infoln("client writing message 1")
|
||||||
|
client.Write(message1)
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
log.Infoln("client shutdown")
|
||||||
|
client.Shutdown()
|
||||||
|
|
||||||
|
log.Infoln("client new client")
|
||||||
|
client = NewPipeBridgeClient(in, out)
|
||||||
|
message2 := &event.IPCMessage{Dest: "ABC", Message: event.NewEventList(event.DeleteContact)}
|
||||||
|
log.Infoln("client2 write message2")
|
||||||
|
client.Write(message2)
|
||||||
|
|
||||||
|
done <- true
|
||||||
|
}
|
||||||
|
|
||||||
|
func stableService(t *testing.T, in, out string, done chan bool) {
|
||||||
|
service := NewPipeBridgeService(in, out)
|
||||||
|
|
||||||
|
log.Infoln("service wait read 1")
|
||||||
|
message1, ok := service.Read()
|
||||||
|
log.Infof("service read 1 %v ok:%v\n", message1, ok)
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("Reading from client IPCBridge 1st time failed")
|
||||||
|
done <- true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if message1.Message.EventType != event.NewPeer {
|
||||||
|
t.Errorf("Wrong message recieved, expected NewPeer\n")
|
||||||
|
done <- true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infoln("service wait read 2")
|
||||||
|
message2, ok := service.Read()
|
||||||
|
log.Infof("service read 2 got %v ok:%v\n", message2, ok)
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("Reading from client IPCBridge 2nd time failed")
|
||||||
|
done <- true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if message2.Message.EventType != event.DeleteContact {
|
||||||
|
t.Errorf("Wrong message recieved, expected DeleteContact, got %v\n", message2)
|
||||||
|
done <- true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
done <- true
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReconnect(t *testing.T) {
|
||||||
|
log.Infoln("TestReconnect")
|
||||||
|
os.Remove(servicePipe)
|
||||||
|
os.Remove(clientPipe)
|
||||||
|
|
||||||
|
serviceDone := make(chan bool)
|
||||||
|
clientDone := make(chan bool)
|
||||||
|
|
||||||
|
go restartingClient(t, clientPipe, servicePipe, clientDone)
|
||||||
|
go stableService(t, servicePipe, clientPipe, serviceDone)
|
||||||
|
|
||||||
|
<-serviceDone
|
||||||
|
<-clientDone
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue