Official cwtch.im peer and server implementations. https://cwtch.im
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

313 lines
11 KiB

  1. package connections
  2. import (
  3. "cwtch.im/cwtch/event"
  4. "cwtch.im/cwtch/protocol"
  5. "cwtch.im/tapir"
  6. "cwtch.im/tapir/applications"
  7. "cwtch.im/tapir/networks/tor"
  8. "errors"
  9. "git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
  10. "git.openprivacy.ca/openprivacy/libricochet-go/identity"
  11. "git.openprivacy.ca/openprivacy/libricochet-go/log"
  12. "github.com/golang/protobuf/proto"
  13. "golang.org/x/crypto/ed25519"
  14. "sync"
  15. "time"
  16. )
  17. type engine struct {
  18. queue *event.Queue
  19. connectionsManager *Manager
  20. // Engine Attributes
  21. identity identity.Identity
  22. acn connectivity.ACN
  23. // Engine State
  24. started bool
  25. // Blocklist
  26. blocked sync.Map
  27. // Pointer to the Global Event Manager
  28. eventManager event.Manager
  29. // Nextgen Tapir Service
  30. service tapir.Service
  31. // Required for listen(), inaccessible from identity
  32. privateKey ed25519.PrivateKey
  33. shuttingDown bool
  34. }
  35. // Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
  36. // Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages
  37. type Engine interface {
  38. Identity() identity.Identity
  39. ACN() connectivity.ACN
  40. EventManager() event.Manager
  41. Shutdown()
  42. }
  43. // NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters
  44. func NewProtocolEngine(identity identity.Identity, privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager event.Manager, blockedPeers []string) Engine {
  45. engine := new(engine)
  46. engine.identity = identity
  47. engine.privateKey = privateKey
  48. engine.queue = event.NewEventQueue(100)
  49. go engine.eventHandler()
  50. engine.acn = acn
  51. engine.connectionsManager = NewConnectionsManager(engine.acn)
  52. go engine.connectionsManager.AttemptReconnections()
  53. // Init the Server running the Simple App.
  54. engine.service = new(tor.BaseOnionService)
  55. engine.service.Init(acn, privateKey, identity)
  56. engine.eventManager = eventManager
  57. engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue.EventChannel)
  58. engine.eventManager.Subscribe(event.PeerRequest, engine.queue.EventChannel)
  59. engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue.EventChannel)
  60. engine.eventManager.Subscribe(event.JoinServer, engine.queue.EventChannel)
  61. engine.eventManager.Subscribe(event.SendMessageToGroup, engine.queue.EventChannel)
  62. engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue.EventChannel)
  63. engine.eventManager.Subscribe(event.DeleteContact, engine.queue.EventChannel)
  64. engine.eventManager.Subscribe(event.DeleteGroup, engine.queue.EventChannel)
  65. engine.eventManager.Subscribe(event.BlockPeer, engine.queue.EventChannel)
  66. for _, peer := range blockedPeers {
  67. engine.blocked.Store(peer, true)
  68. }
  69. return engine
  70. }
  71. func (e *engine) ACN() connectivity.ACN {
  72. return e.acn
  73. }
  74. func (e *engine) Identity() identity.Identity {
  75. return e.identity
  76. }
  77. func (e *engine) EventManager() event.Manager {
  78. return e.eventManager
  79. }
  80. // eventHandler process events from other subsystems
  81. func (e *engine) eventHandler() {
  82. for {
  83. ev := e.queue.Next()
  84. switch ev.EventType {
  85. case event.StatusRequest:
  86. e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID})
  87. case event.PeerRequest:
  88. go e.peerWithOnion(ev.Data[event.RemotePeer])
  89. case event.InvitePeerToGroup:
  90. e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], event.ContextInvite, []byte(ev.Data[event.GroupInvite]))
  91. case event.JoinServer:
  92. e.joinServer(ev.Data[event.GroupServer])
  93. case event.DeleteContact:
  94. onion := ev.Data[event.RemotePeer]
  95. e.deleteConnection(onion)
  96. case event.DeleteGroup:
  97. // TODO: There isn't a way here to determine if other Groups are using a server connection...
  98. case event.SendMessageToGroup:
  99. e.sendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
  100. case event.SendMessageToPeer:
  101. // TODO: remove this passthrough once the UI is integrated.
  102. context, ok := ev.Data[event.EventContext]
  103. if !ok {
  104. context = event.ContextRaw
  105. }
  106. err := e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], context, []byte(ev.Data[event.Data]))
  107. if err != nil {
  108. e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
  109. }
  110. case event.BlockPeer:
  111. e.blocked.Store(ev.Data[event.RemotePeer], true)
  112. connection, err := e.service.GetConnection(ev.Data[event.RemotePeer])
  113. if connection != nil && err == nil {
  114. connection.Close()
  115. }
  116. // Explicitly send a disconnected event (if we don't do this here then the UI can wait for a while before
  117. // an ongoing Open() connection fails and so the user will see a blocked peer as still connecting (because
  118. // there isn't an active connection and we are stuck waiting for tor to time out)
  119. e.peerDisconnected(ev.Data[event.RemotePeer])
  120. case event.ProtocolEngineStartListen:
  121. go e.listenFn()
  122. default:
  123. return
  124. }
  125. }
  126. }
  127. func (e *engine) createPeerTemplate() *PeerApp {
  128. peerAppTemplate := new(PeerApp)
  129. peerAppTemplate.IsBlocked = func(onion string) bool {
  130. _, blocked := e.blocked.Load(onion)
  131. return blocked
  132. }
  133. peerAppTemplate.MessageHandler = e.handlePeerMessage
  134. peerAppTemplate.OnAcknowledgement = e.ignoreOnShutdown2(e.peerAck)
  135. peerAppTemplate.OnAuth = e.ignoreOnShutdown(e.peerAuthed)
  136. peerAppTemplate.OnConnecting = e.ignoreOnShutdown(e.peerConnecting)
  137. peerAppTemplate.OnClose = e.ignoreOnShutdown(e.peerDisconnected)
  138. return peerAppTemplate
  139. }
  140. // Listen sets up an onion listener to process incoming cwtch messages
  141. func (e *engine) listenFn() {
  142. err := e.service.Listen(e.createPeerTemplate())
  143. if !e.shuttingDown {
  144. e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname(), event.Error: err.Error()}))
  145. }
  146. return
  147. }
  148. // Shutdown tears down the eventHandler goroutine
  149. func (e *engine) Shutdown() {
  150. e.shuttingDown = true
  151. e.connectionsManager.Shutdown()
  152. e.service.Shutdown()
  153. e.queue.Shutdown()
  154. }
  155. // peerWithOnion is the entry point for cwtchPeer relationships
  156. // needs to be run in a goroutine as will block on Open.
  157. func (e *engine) peerWithOnion(onion string) {
  158. _, blocked := e.blocked.Load(onion)
  159. if !blocked {
  160. e.ignoreOnShutdown(e.peerConnecting)(onion)
  161. connected, err := e.service.Connect(onion, e.createPeerTemplate())
  162. // If we are already connected...check if we are authed and issue an auth event
  163. // (This allows the ui to be stateless)
  164. if connected && err != nil {
  165. conn, err := e.service.GetConnection(onion)
  166. if err == nil {
  167. if conn.HasCapability(applications.AuthCapability) {
  168. e.ignoreOnShutdown(e.peerAuthed)(onion)
  169. return
  170. }
  171. }
  172. }
  173. // Only issue a disconnected error if we are disconnected (Connect will fail if a connection already exists)
  174. if !connected && err != nil {
  175. e.ignoreOnShutdown(e.peerDisconnected)(onion)
  176. }
  177. }
  178. }
  179. func (e *engine) ignoreOnShutdown(f func(string)) func(string) {
  180. return func(x string) {
  181. if !e.shuttingDown {
  182. f(x)
  183. }
  184. }
  185. }
  186. func (e *engine) ignoreOnShutdown2(f func(string, string)) func(string, string) {
  187. return func(x, y string) {
  188. if !e.shuttingDown {
  189. f(x, y)
  190. }
  191. }
  192. }
  193. func (e *engine) peerAuthed(onion string) {
  194. e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
  195. event.RemotePeer: string(onion),
  196. event.ConnectionState: ConnectionStateName[AUTHENTICATED],
  197. }))
  198. }
  199. func (e *engine) peerConnecting(onion string) {
  200. e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
  201. event.RemotePeer: string(onion),
  202. event.ConnectionState: ConnectionStateName[CONNECTING],
  203. }))
  204. }
  205. func (e *engine) peerAck(onion string, eventID string) {
  206. e.eventManager.Publish(event.NewEvent(event.PeerAcknowledgement, map[event.Field]string{
  207. event.EventID: eventID,
  208. event.RemotePeer: onion,
  209. }))
  210. }
  211. func (e *engine) peerDisconnected(onion string) {
  212. e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
  213. event.RemotePeer: string(onion),
  214. event.ConnectionState: ConnectionStateName[DISCONNECTED],
  215. }))
  216. }
  217. // sendMessageToPeer sends a message to a peer under a given context
  218. func (e *engine) sendMessageToPeer(eventID string, onion string, context string, message []byte) error {
  219. conn, err := e.service.GetConnection(onion)
  220. if err == nil {
  221. peerApp, ok := conn.App.(*PeerApp)
  222. if ok {
  223. peerApp.SendMessage(PeerMessage{eventID, context, message})
  224. return nil
  225. }
  226. return errors.New("failed type assertion conn.App != PeerApp")
  227. }
  228. return err
  229. }
  230. func (e *engine) deleteConnection(id string) {
  231. conn, err := e.service.GetConnection(id)
  232. if err == nil {
  233. conn.Close()
  234. }
  235. }
  236. // receiveGroupMessage is a callback function that processes GroupMessages from a given server
  237. func (e *engine) receiveGroupMessage(server string, gm *protocol.GroupMessage) {
  238. // Publish Event so that a Profile Engine can deal with it.
  239. // Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
  240. e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: string(gm.GetCiphertext()), event.Signature: string(gm.GetSignature())}))
  241. }
  242. // joinServer manages a new server connection with the given onion address
  243. func (e *engine) joinServer(onion string) {
  244. e.connectionsManager.ManageServerConnection(onion, e, e.receiveGroupMessage)
  245. }
  246. // sendMessageToGroup attempts to sent the given message to the given group id.
  247. func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) {
  248. psc := e.connectionsManager.GetPeerServerConnectionForOnion(server)
  249. if psc == nil {
  250. e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupServer: server, event.Signature: string(sig), event.Error: "server is offline or the connection has yet to finalize"}))
  251. }
  252. gm := &protocol.GroupMessage{
  253. Ciphertext: ct,
  254. Signature: sig,
  255. }
  256. err := psc.SendGroupMessage(gm)
  257. if err != nil {
  258. e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupServer: server, event.Signature: string(sig), event.Error: err.Error()}))
  259. }
  260. }
  261. func (e *engine) handlePeerMessage(hostname string, context string, message []byte) {
  262. log.Debugf("New message from peer: %v %v", hostname, context)
  263. if context == event.ContextInvite {
  264. cpp := &protocol.CwtchPeerPacket{}
  265. err := proto.Unmarshal(message, cpp)
  266. if err == nil && cpp.GetGroupChatInvite() != nil {
  267. marshal, _ := proto.Marshal(cpp.GetGroupChatInvite())
  268. e.eventManager.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.GroupInvite: string(marshal)}))
  269. }
  270. } else {
  271. e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)}))
  272. }
  273. }