Official cwtch.im peer and server implementations. https://cwtch.im
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

361 lines
8.9KB

  1. // +build !windows
  2. package bridge
  3. import (
  4. "cwtch.im/cwtch/event"
  5. "cwtch.im/cwtch/protocol/connections"
  6. "encoding/base64"
  7. "encoding/binary"
  8. "encoding/json"
  9. "git.openprivacy.ca/openprivacy/libricochet-go/log"
  10. "os"
  11. "sync"
  12. "syscall"
  13. "time"
  14. )
  15. /* pipeBridge creates a pair of named pipes
  16. Needs a call to new client and service to fully successfully open
  17. */
  18. const maxBufferSize = 1000
  19. const serviceName = "service"
  20. const clientName = "client"
  21. const syn = "SYN"
  22. const synack = "SYNACK"
  23. const ack = "ACK"
  24. type pipeBridge struct {
  25. infile, outfile string
  26. in, out *os.File
  27. read chan event.IPCMessage
  28. write *InfiniteChannel
  29. closedChan chan bool
  30. state connections.ConnectionState
  31. lock sync.Mutex
  32. threeShake func() bool
  33. // For logging / debugging purposes
  34. name string
  35. }
  36. func newPipeBridge(inFilename, outFilename string) *pipeBridge {
  37. syscall.Mkfifo(inFilename, 0600)
  38. syscall.Mkfifo(outFilename, 0600)
  39. pb := &pipeBridge{infile: inFilename, outfile: outFilename, state: connections.DISCONNECTED}
  40. pb.read = make(chan event.IPCMessage, maxBufferSize)
  41. pb.write = newInfiniteChannel() //make(chan event.IPCMessage, maxBufferSize)
  42. return pb
  43. }
  44. // NewPipeBridgeClient returns a pipe backed IPCBridge for a client
  45. func NewPipeBridgeClient(inFilename, outFilename string) event.IPCBridge {
  46. log.Debugf("Making new PipeBridge Client...\n")
  47. pb := newPipeBridge(inFilename, outFilename)
  48. pb.name = clientName
  49. pb.threeShake = pb.threeShakeClient
  50. go pb.connectionManager()
  51. return pb
  52. }
  53. // NewPipeBridgeService returns a pipe backed IPCBridge for a service
  54. func NewPipeBridgeService(inFilename, outFilename string) event.IPCBridge {
  55. log.Debugf("Making new PipeBridge Service...\n")
  56. pb := newPipeBridge(inFilename, outFilename)
  57. pb.name = serviceName
  58. pb.threeShake = pb.threeShakeService
  59. go pb.connectionManager()
  60. log.Debugf("Successfully created new PipeBridge Service!\n")
  61. return pb
  62. }
  63. func (pb *pipeBridge) setState(state connections.ConnectionState) {
  64. pb.lock.Lock()
  65. defer pb.lock.Unlock()
  66. pb.state = state
  67. }
  68. func (pb *pipeBridge) getState() connections.ConnectionState {
  69. pb.lock.Lock()
  70. defer pb.lock.Unlock()
  71. return pb.state
  72. }
  73. func (pb *pipeBridge) connectionManager() {
  74. for pb.getState() != connections.KILLED {
  75. log.Debugf("clientConnManager loop start init\n")
  76. pb.setState(connections.CONNECTING)
  77. var err error
  78. log.Debugf("%v open file infile\n", pb.name)
  79. pb.in, err = os.OpenFile(pb.infile, os.O_RDWR, 0600)
  80. if err != nil {
  81. pb.setState(connections.DISCONNECTED)
  82. continue
  83. }
  84. log.Debugf("%v open file outfile\n", pb.name)
  85. pb.out, err = os.OpenFile(pb.outfile, os.O_RDWR, 0600)
  86. if err != nil {
  87. pb.setState(connections.DISCONNECTED)
  88. continue
  89. }
  90. log.Debugf("Successfully connected PipeBridge %v!\n", pb.name)
  91. pb.handleConns()
  92. }
  93. log.Debugf("exiting %v ConnectionManager\n", pb.name)
  94. }
  95. // threeShake performs a 3way handshake sync up
  96. func (pb *pipeBridge) threeShakeService() bool {
  97. synacked := false
  98. for {
  99. resp, err := pb.readString()
  100. if err != nil {
  101. return false
  102. }
  103. if string(resp) == syn {
  104. if !synacked {
  105. err = pb.writeString([]byte(synack))
  106. if err != nil {
  107. return false
  108. }
  109. synacked = true
  110. }
  111. } else if string(resp) == ack {
  112. return true
  113. }
  114. }
  115. }
  116. func (pb *pipeBridge) synLoop(stop chan bool) {
  117. delay := time.Duration(0)
  118. for {
  119. select {
  120. case <-time.After(delay):
  121. err := pb.writeString([]byte(syn))
  122. if err != nil {
  123. return
  124. }
  125. delay = time.Second
  126. case <-stop:
  127. return
  128. }
  129. }
  130. }
  131. func (pb *pipeBridge) threeShakeClient() bool {
  132. stop := make(chan bool)
  133. go pb.synLoop(stop)
  134. for {
  135. resp, err := pb.readString()
  136. if err != nil {
  137. return false
  138. }
  139. if string(resp) == synack {
  140. stop <- true
  141. err := pb.writeString([]byte(ack))
  142. if err != nil {
  143. return false
  144. }
  145. return true
  146. }
  147. }
  148. }
  149. func (pb *pipeBridge) handleConns() {
  150. if !pb.threeShake() {
  151. pb.setState(connections.FAILED)
  152. pb.closeReset()
  153. return
  154. }
  155. pb.setState(connections.AUTHENTICATED)
  156. pb.closedChan = make(chan bool, 5)
  157. log.Debugf("handleConns authed, %v 2xgo\n", pb.name)
  158. go pb.handleRead()
  159. go pb.handleWrite()
  160. <-pb.closedChan
  161. log.Debugf("handleConns <-closedChan (%v)\n", pb.name)
  162. if pb.getState() != connections.KILLED {
  163. pb.setState(connections.FAILED)
  164. }
  165. pb.closeReset()
  166. log.Debugf("handleConns done for %v, exit\n", pb.name)
  167. }
  168. func (pb *pipeBridge) closeReset() {
  169. pb.in.Close()
  170. pb.out.Close()
  171. close(pb.read)
  172. pb.write.Close()
  173. if pb.getState() != connections.KILLED {
  174. pb.read = make(chan event.IPCMessage, maxBufferSize)
  175. pb.write = newInfiniteChannel()
  176. }
  177. }
  178. func (pb *pipeBridge) handleWrite() {
  179. log.Debugf("handleWrite() %v\n", pb.name)
  180. defer log.Debugf("exiting handleWrite() %v\n", pb.name)
  181. for {
  182. select {
  183. case messageInf := <-pb.write.output:
  184. if messageInf == nil {
  185. pb.closedChan <- true
  186. return
  187. }
  188. message := messageInf.(event.IPCMessage)
  189. if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
  190. log.Debugf("handleWrite <- message: %v %v ...\n", message.Dest, message.Message.EventType)
  191. } else {
  192. log.Debugf("handleWrite <- message: %v\n", message)
  193. }
  194. if pb.getState() == connections.AUTHENTICATED {
  195. encMessage := &event.IPCMessage{Dest: message.Dest, Message: event.Event{EventType: message.Message.EventType, EventID: message.Message.EventID, Data: make(map[event.Field]string)}}
  196. for k, v := range message.Message.Data {
  197. encMessage.Message.Data[k] = base64.StdEncoding.EncodeToString([]byte(v))
  198. }
  199. messageJSON, _ := json.Marshal(encMessage)
  200. err := pb.writeString(messageJSON)
  201. if err != nil {
  202. pb.closedChan <- true
  203. return
  204. }
  205. } else {
  206. return
  207. }
  208. }
  209. }
  210. }
  211. func (pb *pipeBridge) handleRead() {
  212. log.Debugf("handleRead() %v\n", pb.name)
  213. defer log.Debugf("exiting handleRead() %v", pb.name)
  214. for {
  215. log.Debugf("Waiting to handleRead()...\n")
  216. buffer, err := pb.readString()
  217. if err != nil {
  218. pb.closedChan <- true
  219. return
  220. }
  221. var message event.IPCMessage
  222. err = json.Unmarshal(buffer, &message)
  223. if err != nil {
  224. log.Errorf("Read error: '%v', value: '%v'", err, buffer)
  225. pb.closedChan <- true
  226. return // probably new connection trying to initialize
  227. }
  228. for k, v := range message.Message.Data {
  229. val, _ := base64.StdEncoding.DecodeString(v)
  230. message.Message.Data[k] = string(val)
  231. }
  232. if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
  233. log.Debugf("handleRead read<-: %v %v ...\n", message.Dest, message.Message.EventType)
  234. } else {
  235. log.Debugf("handleRead read<-: %v\n", message)
  236. }
  237. pb.read <- message
  238. log.Debugf("handleRead wrote\n")
  239. }
  240. }
  241. func (pb *pipeBridge) Read() (*event.IPCMessage, bool) {
  242. log.Debugf("Read() %v...\n", pb.name)
  243. var ok = false
  244. var message event.IPCMessage
  245. for !ok && pb.getState() != connections.KILLED {
  246. message, ok = <-pb.read
  247. if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
  248. log.Debugf("Read %v: %v %v ...\n", pb.name, message.Dest, message.Message.EventType)
  249. } else {
  250. log.Debugf("Read %v: %v\n", pb.name, message)
  251. }
  252. }
  253. return &message, pb.getState() != connections.KILLED
  254. }
  255. func (pb *pipeBridge) Write(message *event.IPCMessage) {
  256. if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
  257. log.Debugf("Write %v: %v %v ...\n", pb.name, message.Dest, message.Message.EventType)
  258. } else {
  259. log.Debugf("Write %v: %v\n", pb.name, message)
  260. }
  261. pb.write.input <- *message
  262. log.Debugf("Wrote\n")
  263. }
  264. func (pb *pipeBridge) Shutdown() {
  265. log.Debugf("pb.Shutdown() for %v currently in state: %v\n", pb.name, connections.ConnectionStateName[pb.getState()])
  266. pb.state = connections.KILLED
  267. pb.closedChan <- true
  268. log.Debugf("Done Shutdown for %v\n", pb.name)
  269. }
  270. func (pb *pipeBridge) writeString(message []byte) error {
  271. size := make([]byte, 2)
  272. binary.LittleEndian.PutUint16(size, uint16(len(message)))
  273. pb.out.Write(size)
  274. for pos := 0; pos < len(message); {
  275. n, err := pb.out.Write(message[pos:])
  276. if err != nil {
  277. log.Errorf("Writing out on pipeBridge: %v\n", err)
  278. return err
  279. }
  280. pos += n
  281. }
  282. return nil
  283. }
  284. func (pb *pipeBridge) readString() ([]byte, error) {
  285. var n int
  286. size := make([]byte, 2)
  287. var err error
  288. n, err = pb.in.Read(size)
  289. if err != nil || n != 2 {
  290. log.Errorf("Could not read len int from stream: %v\n", err)
  291. return nil, err
  292. }
  293. n = int(binary.LittleEndian.Uint16(size))
  294. pos := 0
  295. buffer := make([]byte, n)
  296. for n > 0 {
  297. m, err := pb.in.Read(buffer[pos:])
  298. if err != nil {
  299. log.Errorf("Reading into buffer from pipe: %v\n", err)
  300. return nil, err
  301. }
  302. n -= m
  303. pos += m
  304. }
  305. return buffer, nil
  306. }