Official cwtch.im peer and server implementations. https://cwtch.im
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

engine.go 12KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. package connections
  2. import (
  3. "cwtch.im/cwtch/event"
  4. "cwtch.im/cwtch/protocol"
  5. "cwtch.im/tapir"
  6. "cwtch.im/tapir/applications"
  7. "cwtch.im/tapir/networks/tor"
  8. "cwtch.im/tapir/primitives"
  9. "errors"
  10. "git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
  11. "git.openprivacy.ca/openprivacy/libricochet-go/log"
  12. "git.openprivacy.ca/openprivacy/libricochet-go/utils"
  13. "golang.org/x/crypto/ed25519"
  14. "sync"
  15. "time"
  16. )
  17. type engine struct {
  18. queue event.Queue
  19. connectionsManager *Manager
  20. // Engine Attributes
  21. identity primitives.Identity
  22. acn connectivity.ACN
  23. // Engine State
  24. started bool
  25. // Blocklist
  26. blocked sync.Map
  27. // Block Unknown Contacts
  28. blockUnknownContacts bool
  29. // Pointer to the Global Event Manager
  30. eventManager event.Manager
  31. // Nextgen Tapir Service
  32. service tapir.Service
  33. // Required for listen(), inaccessible from identity
  34. privateKey ed25519.PrivateKey
  35. shuttingDown bool
  36. }
  37. // Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
  38. // Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages
  39. type Engine interface {
  40. ACN() connectivity.ACN
  41. EventManager() event.Manager
  42. Shutdown()
  43. }
  44. // NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters
  45. func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager event.Manager, knownPeers []string, blockedPeers []string) Engine {
  46. engine := new(engine)
  47. engine.identity = identity
  48. engine.privateKey = privateKey
  49. engine.queue = event.NewQueue()
  50. go engine.eventHandler()
  51. engine.acn = acn
  52. engine.connectionsManager = NewConnectionsManager(engine.acn)
  53. go engine.connectionsManager.AttemptReconnections()
  54. // Init the Server running the Simple App.
  55. engine.service = new(tor.BaseOnionService)
  56. engine.service.Init(acn, privateKey, &identity)
  57. engine.eventManager = eventManager
  58. engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue)
  59. engine.eventManager.Subscribe(event.PeerRequest, engine.queue)
  60. engine.eventManager.Subscribe(event.RetryPeerRequest, engine.queue)
  61. engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue)
  62. engine.eventManager.Subscribe(event.JoinServer, engine.queue)
  63. engine.eventManager.Subscribe(event.SendMessageToGroup, engine.queue)
  64. engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue)
  65. engine.eventManager.Subscribe(event.DeleteContact, engine.queue)
  66. engine.eventManager.Subscribe(event.DeleteGroup, engine.queue)
  67. engine.eventManager.Subscribe(event.BlockPeer, engine.queue)
  68. engine.eventManager.Subscribe(event.UnblockPeer, engine.queue)
  69. engine.eventManager.Subscribe(event.BlockUnknownPeers, engine.queue)
  70. engine.eventManager.Subscribe(event.AllowUnknownPeers, engine.queue)
  71. for _, peer := range knownPeers {
  72. engine.blocked.Store(peer, false)
  73. }
  74. for _, peer := range blockedPeers {
  75. engine.blocked.Store(peer, true)
  76. }
  77. return engine
  78. }
  79. func (e *engine) ACN() connectivity.ACN {
  80. return e.acn
  81. }
  82. func (e *engine) EventManager() event.Manager {
  83. return e.eventManager
  84. }
  85. // eventHandler process events from other subsystems
  86. func (e *engine) eventHandler() {
  87. for {
  88. ev := e.queue.Next()
  89. switch ev.EventType {
  90. case event.StatusRequest:
  91. e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID})
  92. case event.PeerRequest:
  93. if utils.IsValidHostname(ev.Data[event.RemotePeer]) {
  94. e.blocked.Store(ev.Data[event.RemotePeer], false)
  95. go e.peerWithOnion(ev.Data[event.RemotePeer])
  96. }
  97. case event.RetryPeerRequest:
  98. // This event allows engine to treat (automated) retry peering requests differently to user-specified
  99. // peer events
  100. if utils.IsValidHostname(ev.Data[event.RemotePeer]) {
  101. go e.peerWithOnion(ev.Data[event.RemotePeer])
  102. }
  103. case event.InvitePeerToGroup:
  104. e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], event.ContextInvite, []byte(ev.Data[event.GroupInvite]))
  105. case event.JoinServer:
  106. e.joinServer(ev.Data[event.GroupServer])
  107. case event.DeleteContact:
  108. onion := ev.Data[event.RemotePeer]
  109. // We remove this peer from out blocklist which will prevent them from contacting us if we have "block unknown peers" turned on.
  110. e.blocked.Delete(ev.Data[event.RemotePeer])
  111. e.deleteConnection(onion)
  112. case event.DeleteGroup:
  113. // TODO: There isn't a way here to determine if other Groups are using a server connection...
  114. case event.SendMessageToGroup:
  115. e.sendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
  116. case event.SendMessageToPeer:
  117. // TODO: remove this passthrough once the UI is integrated.
  118. context, ok := ev.Data[event.EventContext]
  119. if !ok {
  120. context = event.ContextRaw
  121. }
  122. err := e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], context, []byte(ev.Data[event.Data]))
  123. if err != nil {
  124. e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
  125. }
  126. case event.UnblockPeer:
  127. // We simply remove the peer from our blocklist
  128. // The UI has the responsibility to reinitiate contact with the peer.
  129. // (this should happen periodically in any case)
  130. e.blocked.Store(ev.Data[event.RemotePeer], false)
  131. case event.BlockPeer:
  132. e.blocked.Store(ev.Data[event.RemotePeer], true)
  133. connection, err := e.service.GetConnection(ev.Data[event.RemotePeer])
  134. if connection != nil && err == nil {
  135. connection.Close()
  136. }
  137. // Explicitly send a disconnected event (if we don't do this here then the UI can wait for a while before
  138. // an ongoing Open() connection fails and so the user will see a blocked peer as still connecting (because
  139. // there isn't an active connection and we are stuck waiting for tor to time out)
  140. e.peerDisconnected(ev.Data[event.RemotePeer])
  141. case event.AllowUnknownPeers:
  142. e.blockUnknownContacts = false
  143. case event.BlockUnknownPeers:
  144. e.blockUnknownContacts = true
  145. case event.ProtocolEngineStartListen:
  146. go e.listenFn()
  147. default:
  148. return
  149. }
  150. }
  151. }
  152. func (e *engine) createPeerTemplate() *PeerApp {
  153. peerAppTemplate := new(PeerApp)
  154. peerAppTemplate.IsBlocked = func(onion string) bool {
  155. blocked, known := e.blocked.Load(onion)
  156. if !known {
  157. // if we block unknown peers we will block this contact
  158. return e.blockUnknownContacts
  159. }
  160. return blocked.(bool)
  161. }
  162. peerAppTemplate.MessageHandler = e.handlePeerMessage
  163. peerAppTemplate.OnAcknowledgement = e.ignoreOnShutdown2(e.peerAck)
  164. peerAppTemplate.OnAuth = e.ignoreOnShutdown(e.peerAuthed)
  165. peerAppTemplate.OnConnecting = e.ignoreOnShutdown(e.peerConnecting)
  166. peerAppTemplate.OnClose = e.ignoreOnShutdown(e.peerDisconnected)
  167. return peerAppTemplate
  168. }
  169. // Listen sets up an onion listener to process incoming cwtch messages
  170. func (e *engine) listenFn() {
  171. err := e.service.Listen(e.createPeerTemplate())
  172. if !e.shuttingDown {
  173. e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname(), event.Error: err.Error()}))
  174. }
  175. return
  176. }
  177. // Shutdown tears down the eventHandler goroutine
  178. func (e *engine) Shutdown() {
  179. e.shuttingDown = true
  180. e.connectionsManager.Shutdown()
  181. e.service.Shutdown()
  182. e.queue.Shutdown()
  183. }
  184. // peerWithOnion is the entry point for cwtchPeer relationships
  185. // needs to be run in a goroutine as will block on Open.
  186. func (e *engine) peerWithOnion(onion string) {
  187. blocked, known := e.blocked.Load(onion)
  188. if known && !(blocked.(bool)) {
  189. e.ignoreOnShutdown(e.peerConnecting)(onion)
  190. connected, err := e.service.Connect(onion, e.createPeerTemplate())
  191. // If we are already connected...check if we are authed and issue an auth event
  192. // (This allows the ui to be stateless)
  193. if connected && err != nil {
  194. conn, err := e.service.GetConnection(onion)
  195. if err == nil {
  196. if conn.HasCapability(applications.AuthCapability) {
  197. e.ignoreOnShutdown(e.peerAuthed)(onion)
  198. return
  199. }
  200. }
  201. }
  202. // Only issue a disconnected error if we are disconnected (Connect will fail if a connection already exists)
  203. if !connected && err != nil {
  204. e.ignoreOnShutdown(e.peerDisconnected)(onion)
  205. }
  206. }
  207. }
  208. func (e *engine) ignoreOnShutdown(f func(string)) func(string) {
  209. return func(x string) {
  210. if !e.shuttingDown {
  211. f(x)
  212. }
  213. }
  214. }
  215. func (e *engine) ignoreOnShutdown2(f func(string, string)) func(string, string) {
  216. return func(x, y string) {
  217. if !e.shuttingDown {
  218. f(x, y)
  219. }
  220. }
  221. }
  222. func (e *engine) peerAuthed(onion string) {
  223. e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
  224. event.RemotePeer: string(onion),
  225. event.ConnectionState: ConnectionStateName[AUTHENTICATED],
  226. }))
  227. }
  228. func (e *engine) peerConnecting(onion string) {
  229. e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
  230. event.RemotePeer: string(onion),
  231. event.ConnectionState: ConnectionStateName[CONNECTING],
  232. }))
  233. }
  234. func (e *engine) peerAck(onion string, eventID string) {
  235. e.eventManager.Publish(event.NewEvent(event.PeerAcknowledgement, map[event.Field]string{
  236. event.EventID: eventID,
  237. event.RemotePeer: onion,
  238. }))
  239. }
  240. func (e *engine) peerDisconnected(onion string) {
  241. e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
  242. event.RemotePeer: string(onion),
  243. event.ConnectionState: ConnectionStateName[DISCONNECTED],
  244. }))
  245. }
  246. // sendMessageToPeer sends a message to a peer under a given context
  247. func (e *engine) sendMessageToPeer(eventID string, onion string, context string, message []byte) error {
  248. conn, err := e.service.GetConnection(onion)
  249. if err == nil {
  250. peerApp, ok := (conn.App()).(*PeerApp)
  251. if ok {
  252. peerApp.SendMessage(PeerMessage{eventID, context, message})
  253. return nil
  254. }
  255. return errors.New("failed type assertion conn.App != PeerApp")
  256. }
  257. return err
  258. }
  259. func (e *engine) deleteConnection(id string) {
  260. conn, err := e.service.GetConnection(id)
  261. if err == nil {
  262. conn.Close()
  263. }
  264. }
  265. // receiveGroupMessage is a callback function that processes GroupMessages from a given server
  266. func (e *engine) receiveGroupMessage(server string, gm *protocol.GroupMessage) {
  267. // Publish Event so that a Profile Engine can deal with it.
  268. // Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
  269. e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: string(gm.GetCiphertext()), event.Signature: string(gm.GetSignature())}))
  270. }
  271. // joinServer manages a new server connection with the given onion address
  272. func (e *engine) joinServer(onion string) {
  273. e.connectionsManager.ManageServerConnection(onion, e, e.receiveGroupMessage)
  274. }
  275. // sendMessageToGroup attempts to sent the given message to the given group id.
  276. func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) {
  277. psc := e.connectionsManager.GetPeerServerConnectionForOnion(server)
  278. if psc == nil {
  279. e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupServer: server, event.Signature: string(sig), event.Error: "server is offline or the connection has yet to finalize"}))
  280. }
  281. gm := &protocol.GroupMessage{
  282. Ciphertext: ct,
  283. Signature: sig,
  284. }
  285. err := psc.SendGroupMessage(gm)
  286. if err != nil {
  287. e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupServer: server, event.Signature: string(sig), event.Error: err.Error()}))
  288. }
  289. }
  290. func (e *engine) handlePeerMessage(hostname string, context string, message []byte) {
  291. log.Debugf("New message from peer: %v %v", hostname, context)
  292. if context == event.ContextInvite {
  293. e.eventManager.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.GroupInvite: string(message)}))
  294. } else {
  295. e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)}))
  296. }
  297. }