Official cwtch.im peer and server implementations. https://cwtch.im
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

299 lines
10KB

  1. package connections
  2. import (
  3. "cwtch.im/cwtch/event"
  4. "cwtch.im/cwtch/protocol"
  5. "cwtch.im/tapir"
  6. "cwtch.im/tapir/applications"
  7. "cwtch.im/tapir/networks/tor"
  8. "errors"
  9. "git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
  10. "git.openprivacy.ca/openprivacy/libricochet-go/identity"
  11. "git.openprivacy.ca/openprivacy/libricochet-go/log"
  12. "github.com/golang/protobuf/proto"
  13. "golang.org/x/crypto/ed25519"
  14. "sync"
  15. "time"
  16. )
  17. type engine struct {
  18. queue *event.Queue
  19. connectionsManager *Manager
  20. // Engine Attributes
  21. identity identity.Identity
  22. acn connectivity.ACN
  23. // Engine State
  24. started bool
  25. // Blocklist
  26. blocked sync.Map
  27. // Pointer to the Global Event Manager
  28. eventManager event.Manager
  29. // Nextgen Tapir Service
  30. service tapir.Service
  31. // Required for listen(), inaccessible from identity
  32. privateKey ed25519.PrivateKey
  33. shuttingDown bool
  34. }
  35. // Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
  36. // Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages
  37. type Engine interface {
  38. Identity() identity.Identity
  39. ACN() connectivity.ACN
  40. EventManager() event.Manager
  41. Shutdown()
  42. }
  43. // NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters
  44. func NewProtocolEngine(identity identity.Identity, privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager event.Manager, blockedPeers []string) Engine {
  45. engine := new(engine)
  46. engine.identity = identity
  47. engine.privateKey = privateKey
  48. engine.queue = event.NewEventQueue(100)
  49. go engine.eventHandler()
  50. engine.acn = acn
  51. engine.connectionsManager = NewConnectionsManager(engine.acn)
  52. go engine.connectionsManager.AttemptReconnections()
  53. // Init the Server running the Simple App.
  54. engine.service = new(tor.BaseOnionService)
  55. engine.service.Init(acn, privateKey, identity)
  56. engine.eventManager = eventManager
  57. engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue.EventChannel)
  58. engine.eventManager.Subscribe(event.PeerRequest, engine.queue.EventChannel)
  59. engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue.EventChannel)
  60. engine.eventManager.Subscribe(event.JoinServer, engine.queue.EventChannel)
  61. engine.eventManager.Subscribe(event.SendMessageToGroup, engine.queue.EventChannel)
  62. engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue.EventChannel)
  63. engine.eventManager.Subscribe(event.BlockPeer, engine.queue.EventChannel)
  64. for _, peer := range blockedPeers {
  65. engine.blocked.Store(peer, true)
  66. }
  67. return engine
  68. }
  69. func (e *engine) ACN() connectivity.ACN {
  70. return e.acn
  71. }
  72. func (e *engine) Identity() identity.Identity {
  73. return e.identity
  74. }
  75. func (e *engine) EventManager() event.Manager {
  76. return e.eventManager
  77. }
  78. // eventHandler process events from other subsystems
  79. func (e *engine) eventHandler() {
  80. for {
  81. ev := e.queue.Next()
  82. switch ev.EventType {
  83. case event.StatusRequest:
  84. e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID})
  85. case event.PeerRequest:
  86. go e.peerWithOnion(ev.Data[event.RemotePeer])
  87. case event.InvitePeerToGroup:
  88. e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], event.ContextInvite, []byte(ev.Data[event.GroupInvite]))
  89. case event.JoinServer:
  90. e.joinServer(ev.Data[event.GroupServer])
  91. case event.SendMessageToGroup:
  92. e.sendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
  93. case event.SendMessageToPeer:
  94. // TODO: remove this passthrough once the UI is integrated.
  95. context, ok := ev.Data[event.EventContext]
  96. if !ok {
  97. context = event.ContextRaw
  98. }
  99. err := e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], context, []byte(ev.Data[event.Data]))
  100. if err != nil {
  101. e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
  102. }
  103. case event.BlockPeer:
  104. e.blocked.Store(ev.Data[event.RemotePeer], true)
  105. connection, err := e.service.GetConnection(ev.Data[event.RemotePeer])
  106. if connection != nil && err == nil {
  107. connection.Close()
  108. }
  109. // Explicitly send a disconnected event (if we don't do this here then the UI can wait for a while before
  110. // an ongoing Open() connection fails and so the user will see a blocked peer as still connecting (because
  111. // there isn't an active connection and we are stuck waiting for tor to time out)
  112. e.peerDisconnected(ev.Data[event.RemotePeer])
  113. case event.ProtocolEngineStartListen:
  114. go e.listenFn()
  115. default:
  116. return
  117. }
  118. }
  119. }
  120. func (e *engine) createPeerTemplate() *PeerApp {
  121. peerAppTemplate := new(PeerApp)
  122. peerAppTemplate.IsBlocked = func(onion string) bool {
  123. _, blocked := e.blocked.Load(onion)
  124. return blocked
  125. }
  126. peerAppTemplate.MessageHandler = e.handlePeerMessage
  127. peerAppTemplate.OnAcknowledgement = e.ignoreOnShutdown2(e.peerAck)
  128. peerAppTemplate.OnAuth = e.ignoreOnShutdown(e.peerAuthed)
  129. peerAppTemplate.OnConnecting = e.ignoreOnShutdown(e.peerConnecting)
  130. peerAppTemplate.OnClose = e.ignoreOnShutdown(e.peerDisconnected)
  131. return peerAppTemplate
  132. }
  133. // Listen sets up an onion listener to process incoming cwtch messages
  134. func (e *engine) listenFn() {
  135. err := e.service.Listen(e.createPeerTemplate())
  136. if !e.shuttingDown {
  137. e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname(), event.Error: err.Error()}))
  138. }
  139. return
  140. }
  141. // Shutdown tears down the eventHandler goroutine
  142. func (e *engine) Shutdown() {
  143. e.shuttingDown = true
  144. e.connectionsManager.Shutdown()
  145. e.service.Shutdown()
  146. e.queue.Shutdown()
  147. }
  148. // peerWithOnion is the entry point for cwtchPeer relationships
  149. // needs to be run in a goroutine as will block on Open.
  150. func (e *engine) peerWithOnion(onion string) {
  151. _, blocked := e.blocked.Load(onion)
  152. if !blocked {
  153. e.ignoreOnShutdown(e.peerConnecting)(onion)
  154. connected, err := e.service.Connect(onion, e.createPeerTemplate())
  155. // If we are already connected...check if we are authed and issue an auth event
  156. // (This allows the ui to be stateless)
  157. if connected && err != nil {
  158. conn, err := e.service.GetConnection(onion)
  159. if err == nil {
  160. if conn.HasCapability(applications.AuthCapability) {
  161. e.ignoreOnShutdown(e.peerAuthed)(onion)
  162. return
  163. }
  164. }
  165. }
  166. // Only issue a disconnected error if we are disconnected (Connect will fail if a connection already exists)
  167. if !connected && err != nil {
  168. e.ignoreOnShutdown(e.peerDisconnected)(onion)
  169. }
  170. }
  171. }
  172. func (e *engine) ignoreOnShutdown(f func(string)) func(string) {
  173. return func(x string) {
  174. if !e.shuttingDown {
  175. f(x)
  176. }
  177. }
  178. }
  179. func (e *engine) ignoreOnShutdown2(f func(string, string)) func(string, string) {
  180. return func(x, y string) {
  181. if !e.shuttingDown {
  182. f(x, y)
  183. }
  184. }
  185. }
  186. func (e *engine) peerAuthed(onion string) {
  187. e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
  188. event.RemotePeer: string(onion),
  189. event.ConnectionState: ConnectionStateName[AUTHENTICATED],
  190. }))
  191. }
  192. func (e *engine) peerConnecting(onion string) {
  193. e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
  194. event.RemotePeer: string(onion),
  195. event.ConnectionState: ConnectionStateName[CONNECTING],
  196. }))
  197. }
  198. func (e *engine) peerAck(onion string, eventID string) {
  199. e.eventManager.Publish(event.NewEvent(event.PeerAcknowledgement, map[event.Field]string{
  200. event.EventID: eventID,
  201. event.RemotePeer: onion,
  202. }))
  203. }
  204. func (e *engine) peerDisconnected(onion string) {
  205. e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
  206. event.RemotePeer: string(onion),
  207. event.ConnectionState: ConnectionStateName[DISCONNECTED],
  208. }))
  209. }
  210. // sendMessageToPeer sends a message to a peer under a given context
  211. func (e *engine) sendMessageToPeer(eventID string, onion string, context string, message []byte) error {
  212. conn, err := e.service.GetConnection(onion)
  213. if err == nil {
  214. peerApp, ok := conn.App.(*PeerApp)
  215. if ok {
  216. peerApp.SendMessage(PeerMessage{eventID, context, message})
  217. return nil
  218. }
  219. return errors.New("failed type assertion conn.App != PeerApp")
  220. }
  221. return err
  222. }
  223. // receiveGroupMessage is a callback function that processes GroupMessages from a given server
  224. func (e *engine) receiveGroupMessage(server string, gm *protocol.GroupMessage) {
  225. // Publish Event so that a Profile Engine can deal with it.
  226. // Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
  227. e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: string(gm.GetCiphertext()), event.Signature: string(gm.GetSignature())}))
  228. }
  229. // joinServer manages a new server connection with the given onion address
  230. func (e *engine) joinServer(onion string) {
  231. e.connectionsManager.ManageServerConnection(onion, e, e.receiveGroupMessage)
  232. }
  233. // sendMessageToGroup attempts to sent the given message to the given group id.
  234. func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) {
  235. psc := e.connectionsManager.GetPeerServerConnectionForOnion(server)
  236. if psc == nil {
  237. e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupServer: server, event.Signature: string(sig), event.Error: "server is offline or the connection has yet to finalize"}))
  238. }
  239. gm := &protocol.GroupMessage{
  240. Ciphertext: ct,
  241. Signature: sig,
  242. }
  243. err := psc.SendGroupMessage(gm)
  244. if err != nil {
  245. e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupServer: server, event.Signature: string(sig), event.Error: err.Error()}))
  246. }
  247. }
  248. func (e *engine) handlePeerMessage(hostname string, context string, message []byte) {
  249. log.Debugf("New message from peer: %v %v", hostname, context)
  250. if context == event.ContextInvite {
  251. cpp := &protocol.CwtchPeerPacket{}
  252. err := proto.Unmarshal(message, cpp)
  253. if err == nil && cpp.GetGroupChatInvite() != nil {
  254. marshal, _ := proto.Marshal(cpp.GetGroupChatInvite())
  255. e.eventManager.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.GroupInvite: string(marshal)}))
  256. }
  257. } else {
  258. e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)}))
  259. }
  260. }