Official cwtch.im peer and server implementations. https://cwtch.im
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

348 lines
8.6KB

  1. // +build !windows
  2. package bridge
  3. import (
  4. "cwtch.im/cwtch/protocol/connections"
  5. "encoding/base64"
  6. "encoding/binary"
  7. "git.openprivacy.ca/openprivacy/libricochet-go/log"
  8. "syscall"
  9. "time"
  10. "cwtch.im/cwtch/event"
  11. "encoding/json"
  12. "os"
  13. "sync"
  14. )
/* pipeBridge creates a pair of named pipes.
The pipes are only fully opened once both a new client and a new service
have been created on them.
*/
  18. const maxBufferSize = 1000
  19. const serviceName = "service"
  20. const clientName = "client"
  21. const syn = "SYN"
  22. const synack = "SYNACK"
  23. const ack = "ACK"
  24. type pipeBridge struct {
  25. infile, outfile string
  26. in, out *os.File
  27. read chan event.IPCMessage
  28. write *InfiniteChannel
  29. closedChan chan bool
  30. state connections.ConnectionState
  31. lock sync.Mutex
  32. threeShake func () bool
  33. // For logging / debugging purposes
  34. name string
  35. }
  36. func newPipeBridge(inFilename, outFilename string) *pipeBridge {
  37. syscall.Mkfifo(inFilename, 0600)
  38. syscall.Mkfifo(outFilename, 0600)
  39. pb := &pipeBridge{infile: inFilename, outfile: outFilename, state: connections.DISCONNECTED}
  40. pb.read = make(chan event.IPCMessage, maxBufferSize)
  41. pb.write = newInfiniteChannel() //make(chan event.IPCMessage, maxBufferSize)
  42. return pb
  43. }
  44. // NewPipeBridgeClient returns a pipe backed IPCBridge for a client
  45. func NewPipeBridgeClient(inFilename, outFilename string) event.IPCBridge {
  46. log.Debugf("Making new PipeBridge Client...\n")
  47. pb := newPipeBridge(inFilename, outFilename)
  48. pb.name = clientName
  49. pb.threeShake = pb.threeShakeClient
  50. go pb.connectionManager()
  51. return pb
  52. }
  53. // NewPipeBridgeService returns a pipe backed IPCBridge for a service
  54. func NewPipeBridgeService(inFilename, outFilename string) event.IPCBridge {
  55. log.Debugf("Making new PipeBridge Service...\n")
  56. pb := newPipeBridge(inFilename, outFilename)
  57. pb.name = serviceName
  58. pb.threeShake = pb.threeShakeService
  59. go pb.connectionManager()
  60. log.Debugf("Successfully created new PipeBridge Service!\n")
  61. return pb
  62. }
  63. func (pb *pipeBridge) connectionManager() {
  64. for pb.state != connections.KILLED {
  65. log.Debugf("clientConnManager loop start init\n")
  66. pb.state = connections.CONNECTING
  67. var err error
  68. log.Debugf("%v open file infile\n", pb.name)
  69. pb.in, err = os.OpenFile(pb.infile, os.O_RDWR, 0600)
  70. if err != nil {
  71. pb.state = connections.DISCONNECTED
  72. continue
  73. }
  74. log.Debugf("%v open file outfile\n", pb.name)
  75. pb.out, err = os.OpenFile(pb.outfile, os.O_RDWR, 0600)
  76. if err != nil {
  77. pb.state = connections.DISCONNECTED
  78. continue
  79. }
  80. log.Debugf("Successfully connected PipeBridge %v!\n", pb.name)
  81. pb.handleConns()
  82. }
  83. log.Debugf("exiting %v ConnectionManager\n", pb.name)
  84. }
  85. // threeShake performs a 3way handshake sync up
  86. func (pb *pipeBridge) threeShakeService() bool {
  87. synacked := false
  88. for {
  89. resp, err := pb.readString()
  90. if err != nil {
  91. return false
  92. }
  93. if string(resp) == syn {
  94. if !synacked {
  95. err = pb.writeString([]byte(synack))
  96. if err != nil {
  97. return false
  98. }
  99. synacked = true
  100. }
  101. } else if string(resp) == ack {
  102. return true
  103. }
  104. }
  105. }
  106. func (pb *pipeBridge) synLoop(stop chan bool) {
  107. delay := time.Duration(0)
  108. for {
  109. select {
  110. case <-time.After(delay):
  111. err := pb.writeString([]byte(syn))
  112. if err != nil {
  113. return
  114. }
  115. delay = time.Second
  116. case <- stop:
  117. return
  118. }
  119. }
  120. }
  121. func (pb *pipeBridge) threeShakeClient() bool {
  122. stop := make(chan bool)
  123. go pb.synLoop(stop)
  124. for {
  125. resp, err := pb.readString()
  126. if err != nil {
  127. return false
  128. }
  129. if string(resp) == synack {
  130. stop <- true
  131. err := pb.writeString([]byte(ack))
  132. if err != nil {
  133. return false
  134. }
  135. return true
  136. }
  137. }
  138. }
  139. func (pb *pipeBridge) handleConns() {
  140. if !pb.threeShake() {
  141. pb.state = connections.FAILED
  142. pb.closeReset()
  143. return
  144. }
  145. pb.state = connections.AUTHENTICATED
  146. pb.closedChan = make(chan bool, 5)
  147. log.Debugf("handleConns authed, %v 2xgo\n", pb.name)
  148. go pb.handleRead()
  149. go pb.handleWrite()
  150. <-pb.closedChan
  151. log.Debugf("handleConns <-closedChan (%v)\n", pb.name)
  152. if pb.state != connections.KILLED {
  153. pb.state = connections.FAILED
  154. }
  155. pb.closeReset()
  156. log.Debugf("handleConns done for %v, exit\n", pb.name)
  157. }
  158. func (pb *pipeBridge) closeReset() {
  159. pb.in.Close()
  160. pb.out.Close()
  161. close(pb.read)
  162. pb.write.Close()
  163. if pb.state != connections.KILLED {
  164. pb.read = make(chan event.IPCMessage, maxBufferSize)
  165. pb.write = newInfiniteChannel()
  166. }
  167. }
  168. func (pb *pipeBridge) handleWrite() {
  169. log.Debugf("handleWrite() %v\n", pb.name)
  170. defer log.Debugf("exiting handleWrite() %v\n", pb.name)
  171. for {
  172. select {
  173. case messageInf := <-pb.write.output:
  174. if messageInf == nil {
  175. pb.closedChan <- true
  176. return
  177. }
  178. message := messageInf.(event.IPCMessage)
  179. if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
  180. log.Debugf("handleWrite <- message: %v %v ...\n", message.Dest, message.Message.EventType)
  181. } else {
  182. log.Debugf("handleWrite <- message: %v\n", message)
  183. }
  184. if pb.state == connections.AUTHENTICATED {
  185. encMessage := &event.IPCMessage{Dest: message.Dest, Message: event.Event{EventType: message.Message.EventType, EventID: message.Message.EventID, Data: make(map[event.Field]string)}}
  186. for k, v := range message.Message.Data {
  187. encMessage.Message.Data[k] = base64.StdEncoding.EncodeToString([]byte(v))
  188. }
  189. messageJSON, _ := json.Marshal(encMessage)
  190. err := pb.writeString(messageJSON)
  191. if err != nil {
  192. pb.closedChan <- true
  193. return
  194. }
  195. } else {
  196. return
  197. }
  198. }
  199. }
  200. }
  201. func (pb *pipeBridge) handleRead() {
  202. log.Debugf("handleRead() %v\n", pb.name)
  203. defer log.Debugf("exiting handleRead() %v", pb.name)
  204. for {
  205. log.Debugf("Waiting to handleRead()...\n")
  206. buffer, err := pb.readString()
  207. if err != nil {
  208. pb.closedChan <- true
  209. return
  210. }
  211. var message event.IPCMessage
  212. err = json.Unmarshal(buffer, &message)
  213. if err != nil {
  214. log.Errorf("Read error: '%v', value: '%v'", err, buffer)
  215. pb.closedChan <- true
  216. return // probably new connection trying to initialize
  217. }
  218. for k, v := range message.Message.Data {
  219. val, _ := base64.StdEncoding.DecodeString(v)
  220. message.Message.Data[k] = string(val)
  221. }
  222. if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
  223. log.Debugf("handleRead read<-: %v %v ...\n", message.Dest, message.Message.EventType)
  224. } else {
  225. log.Debugf("handleRead read<-: %v\n", message)
  226. }
  227. pb.read <- message
  228. log.Debugf("handleRead wrote\n")
  229. }
  230. }
  231. func (pb *pipeBridge) Read() (*event.IPCMessage, bool) {
  232. log.Debugf("Read() %v...\n", pb.name)
  233. var ok = false
  234. var message event.IPCMessage
  235. for !ok && pb.state != connections.KILLED {
  236. message, ok = <-pb.read
  237. if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
  238. log.Debugf("Read %v: %v %v ...\n", pb.name, message.Dest, message.Message.EventType)
  239. } else {
  240. log.Debugf("Read %v: %v\n", pb.name, message)
  241. }
  242. }
  243. return &message, pb.state != connections.KILLED
  244. }
  245. func (pb *pipeBridge) Write(message *event.IPCMessage) {
  246. if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
  247. log.Debugf("Write %v: %v %v ...\n", pb.name, message.Dest, message.Message.EventType)
  248. } else {
  249. log.Debugf("Write %v: %v\n", pb.name, message)
  250. }
  251. pb.write.input <- *message
  252. log.Debugf("Wrote\n")
  253. }
  254. func (pb *pipeBridge) Shutdown() {
  255. log.Debugf("pb.Shutdown() for %v currently in state: %v\n", pb.name, connections.ConnectionStateName[pb.state])
  256. pb.state = connections.KILLED
  257. pb.closedChan <- true
  258. log.Debugf("Done Shutdown for %v\n", pb.name)
  259. }
  260. func (pb *pipeBridge) writeString(message []byte) error {
  261. size := make([]byte, 2)
  262. binary.LittleEndian.PutUint16(size, uint16(len(message)))
  263. pb.out.Write(size)
  264. for pos := 0; pos < len(message); {
  265. n, err := pb.out.Write(message[pos:])
  266. if err != nil {
  267. log.Errorf("Writing out on pipeBridge: %v\n", err)
  268. return err
  269. }
  270. pos += n
  271. }
  272. return nil
  273. }
  274. func (pb *pipeBridge) readString() ([]byte, error) {
  275. var n int
  276. size := make([]byte, 2)
  277. var err error
  278. n, err = pb.in.Read(size)
  279. if err != nil || n != 2 {
  280. log.Errorf("Could not read len int from stream: %v\n", err)
  281. return nil, err
  282. }
  283. n = int(binary.LittleEndian.Uint16(size))
  284. pos := 0
  285. buffer := make([]byte, n)
  286. for n > 0 {
  287. m, err := pb.in.Read(buffer[pos:])
  288. if err != nil {
  289. log.Errorf("Reading into buffer from pipe: %v\n", err)
  290. return nil, err
  291. }
  292. n -= m
  293. pos += m
  294. }
  295. return buffer, nil
  296. }