Official cwtch.im peer and server implementations. https://cwtch.im
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

engine.go 9.7KB


  1. package connections
  2. import (
  3. "crypto/rsa"
  4. "cwtch.im/cwtch/event"
  5. "cwtch.im/cwtch/protocol"
  6. "cwtch.im/cwtch/protocol/connections/peer"
  7. "errors"
  8. "git.openprivacy.ca/openprivacy/libricochet-go/application"
  9. "git.openprivacy.ca/openprivacy/libricochet-go/channels"
  10. "git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
  11. "git.openprivacy.ca/openprivacy/libricochet-go/identity"
  12. "git.openprivacy.ca/openprivacy/libricochet-go/log"
  13. "github.com/golang/protobuf/proto"
  14. "golang.org/x/crypto/ed25519"
  15. "sync"
  16. "time"
  17. )
  18. // Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
  19. // Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages
  20. type Engine struct {
  21. queue *event.Queue
  22. connectionsManager *Manager
  23. // Engine Attributes
  24. Identity identity.Identity
  25. ACN connectivity.ACN
  26. app *application.RicochetApplication
  27. // Engine State
  28. started bool
  29. // Blocklist
  30. blocked sync.Map
  31. // Pointer to the Global Event Manager
  32. eventManager *event.Manager
  33. privateKey ed25519.PrivateKey
  34. }
  35. // NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters
  36. func NewProtocolEngine(privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager *event.Manager, blockedPeers []string) *Engine {
  37. engine := new(Engine)
  38. engine.privateKey = privateKey
  39. engine.queue = event.NewEventQueue(100)
  40. go engine.eventHandler()
  41. engine.ACN = acn
  42. engine.connectionsManager = NewConnectionsManager(engine.ACN)
  43. go engine.connectionsManager.AttemptReconnections()
  44. engine.eventManager = eventManager
  45. engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue.EventChannel)
  46. engine.eventManager.Subscribe(event.PeerRequest, engine.queue.EventChannel)
  47. engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue.EventChannel)
  48. engine.eventManager.Subscribe(event.JoinServer, engine.queue.EventChannel)
  49. engine.eventManager.Subscribe(event.SendMessageToGroup, engine.queue.EventChannel)
  50. engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue.EventChannel)
  51. engine.eventManager.Subscribe(event.BlockPeer, engine.queue.EventChannel)
  52. for _, peer := range blockedPeers {
  53. engine.blocked.Store(peer, true)
  54. }
  55. return engine
  56. }
  57. // eventHandler process events from other subsystems
  58. func (e *Engine) eventHandler() {
  59. for {
  60. ev := e.queue.Next()
  61. switch ev.EventType {
  62. case event.StatusRequest:
  63. e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID})
  64. case event.PeerRequest:
  65. e.PeerWithOnion(ev.Data[event.RemotePeer])
  66. case event.InvitePeerToGroup:
  67. e.InviteOnionToGroup(ev.Data[event.RemotePeer], []byte(ev.Data[event.GroupInvite]))
  68. case event.JoinServer:
  69. e.JoinServer(ev.Data[event.GroupServer])
  70. case event.SendMessageToGroup:
  71. e.SendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
  72. case event.SendMessageToPeer:
  73. log.Debugf("Sending Message to Peer.....")
  74. ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(ev.Data[event.RemotePeer])
  75. if ppc != nil {
  76. // TODO this will block.
  77. ppc.SendPacket([]byte(ev.Data[event.Data]))
  78. }
  79. case event.BlockPeer:
  80. e.blocked.Store(ev.Data[event.RemotePeer], true)
  81. ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(ev.Data[event.RemotePeer])
  82. if ppc != nil {
  83. ppc.Close()
  84. }
  85. e.app.Close(ev.Data[event.RemotePeer])
  86. case event.ProtocolEngineStartListen:
  87. go e.listenFn()
  88. default:
  89. return
  90. }
  91. }
  92. }
  93. // GetPeerHandler is an external interface function that allows callers access to a CwtchPeerHandler
  94. // TODO: There is likely a slightly better way to encapsulate this behavior
  95. func (e *Engine) GetPeerHandler(remotePeerHostname string) *CwtchPeerHandler {
  96. return &CwtchPeerHandler{Onion: remotePeerHostname, EventBus: e.eventManager}
  97. }
  98. // Listen sets up an onion listener to process incoming cwtch messages
  99. func (e *Engine) listenFn() {
  100. ra := new(application.RicochetApplication)
  101. onionService, err := e.ACN.Listen(e.privateKey, application.RicochetPort)
  102. if err != nil /*&& fmt.Sprintf("%v", err) != "550 Unspecified Tor error: Onion address collision"*/ {
  103. e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.Identity.Hostname(), event.Error: err.Error()}))
  104. return
  105. }
  106. af := application.InstanceFactory{}
  107. af.Init()
  108. af.AddHandler("im.cwtch.peer", func(rai *application.Instance) func() channels.Handler {
  109. cpi := new(CwtchPeerInstance)
  110. cpi.Init(rai, ra)
  111. return func() channels.Handler {
  112. cpc := new(peer.CwtchPeerChannel)
  113. cpc.Handler = e.GetPeerHandler(rai.RemoteHostname)
  114. return cpc
  115. }
  116. })
  117. af.AddHandler("im.cwtch.peer.data", func(rai *application.Instance) func() channels.Handler {
  118. cpi := new(CwtchPeerInstance)
  119. cpi.Init(rai, ra)
  120. return func() channels.Handler {
  121. cpc := new(peer.CwtchPeerDataChannel)
  122. cpc.Handler = e.GetPeerHandler(rai.RemoteHostname)
  123. return cpc
  124. }
  125. })
  126. ra.Init(e.ACN, e.Identity.Name, e.Identity, af, e)
  127. log.Infof("Running cwtch peer on %v", onionService.AddressFull())
  128. e.started = true
  129. e.app = ra
  130. ra.Run(onionService)
  131. e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.Identity.Hostname()}))
  132. return
  133. }
  134. // LookupContact is a V2 API Call, we want to reject all V2 Peers
  135. // TODO Deprecate
  136. func (e *Engine) LookupContact(hostname string, publicKey rsa.PublicKey) (allowed, known bool) {
  137. return false, false
  138. }
  139. // ContactRequest is a V2 API Call needed to implement ContactRequestHandler Interface
  140. // TODO Deprecate
  141. func (e *Engine) ContactRequest(name string, message string) string {
  142. return "Rejected"
  143. }
  144. // LookupContactV3 returns that a contact is known and allowed to communicate for all cases.
  145. func (e *Engine) LookupContactV3(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) {
  146. // TODO: We want to autoblock those that are blocked, The known parameter has no use anymore and should be
  147. // disregarded by peers, so we set it to false.
  148. if _, blocked := e.blocked.Load(hostname); blocked {
  149. return false, false
  150. }
  151. return true, false
  152. }
  153. // Shutdown tears down the eventHandler goroutine
  154. func (e *Engine) Shutdown() {
  155. e.connectionsManager.Shutdown()
  156. e.app.Shutdown()
  157. e.queue.Shutdown()
  158. }
  159. // PeerWithOnion is the entry point for cwtchPeer relationships
  160. func (e *Engine) PeerWithOnion(onion string) *PeerPeerConnection {
  161. return e.connectionsManager.ManagePeerConnection(onion, e)
  162. }
  163. // InviteOnionToGroup kicks off the invite process
  164. func (e *Engine) InviteOnionToGroup(onion string, invite []byte) error {
  165. ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(onion)
  166. if ppc == nil {
  167. return errors.New("peer connection not setup for onion. peers must be trusted before sending")
  168. }
  169. if ppc.GetState() == AUTHENTICATED {
  170. log.Infof("Got connection for group: %v - Sending Invite\n", ppc)
  171. ppc.SendGroupInvite(invite)
  172. } else {
  173. return errors.New("cannot send invite to onion: peer connection is not ready")
  174. }
  175. return nil
  176. }
  177. // ReceiveGroupMessage is a callback function that processes GroupMessages from a given server
  178. func (e *Engine) ReceiveGroupMessage(server string, gm *protocol.GroupMessage) {
  179. // Publish Event so that a Profile Engine can deal with it.
  180. // Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
  181. e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: string(gm.GetCiphertext()), event.Signature: string(gm.GetSignature())}))
  182. }
  183. // JoinServer manages a new server connection with the given onion address
  184. func (e *Engine) JoinServer(onion string) {
  185. e.connectionsManager.ManageServerConnection(onion, e.ReceiveGroupMessage)
  186. }
  187. // SendMessageToGroup attemps to sent the given message to the given group id.
  188. func (e *Engine) SendMessageToGroup(server string, ct []byte, sig []byte) error {
  189. psc := e.connectionsManager.GetPeerServerConnectionForOnion(server)
  190. if psc == nil {
  191. return errors.New("could not find server connection to send message to")
  192. }
  193. gm := &protocol.GroupMessage{
  194. Ciphertext: ct,
  195. Signature: sig,
  196. }
  197. err := psc.SendGroupMessage(gm)
  198. return err
  199. }
  200. // GetPeers returns a list of peer connections.
  201. func (e *Engine) GetPeers() map[string]ConnectionState {
  202. return e.connectionsManager.GetPeers()
  203. }
  204. // GetServers returns a list of server connections
  205. func (e *Engine) GetServers() map[string]ConnectionState {
  206. return e.connectionsManager.GetServers()
  207. }
  208. // CwtchPeerInstance encapsulates incoming peer connections
  209. type CwtchPeerInstance struct {
  210. rai *application.Instance
  211. ra *application.RicochetApplication
  212. }
  213. // Init sets up a CwtchPeerInstance
  214. func (cpi *CwtchPeerInstance) Init(rai *application.Instance, ra *application.RicochetApplication) {
  215. cpi.rai = rai
  216. cpi.ra = ra
  217. }
  218. // CwtchPeerHandler encapsulates handling of incoming CwtchPackets
  219. type CwtchPeerHandler struct {
  220. Onion string
  221. EventBus *event.Manager
  222. DataHandler func(string, []byte) []byte
  223. }
  224. // HandleGroupInvite handles incoming GroupInvites
  225. func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) {
  226. log.Debugf("Received GroupID from %v %v\n", cph.Onion, gci.String())
  227. marshal, err := proto.Marshal(gci)
  228. if err == nil {
  229. cph.EventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().String(), event.RemotePeer: cph.Onion, event.GroupInvite: string(marshal)}))
  230. }
  231. }
  232. // HandlePacket handles the Cwtch cwtchPeer Data Channel
  233. func (cph *CwtchPeerHandler) HandlePacket(data []byte) []byte {
  234. cph.EventBus.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().String(), event.RemotePeer: cph.Onion, event.Data: string(data)}))
  235. return []byte{} // TODO remove this
  236. }