Official cwtch.im peer and server implementations. https://cwtch.im
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

cwtch_peer.go 16KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459
  1. package peer
  2. import (
  3. "cwtch.im/cwtch/event"
  4. "cwtch.im/cwtch/model"
  5. "cwtch.im/cwtch/protocol/connections"
  6. "encoding/base32"
  7. "encoding/base64"
  8. "encoding/json"
  9. "errors"
  10. "git.openprivacy.ca/openprivacy/libricochet-go/log"
  11. "strings"
  12. "sync"
  13. "time"
  14. )
  15. var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true,
  16. event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeer: true,
  17. event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true}
  18. // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
  19. type cwtchPeer struct {
  20. Profile *model.Profile
  21. mutex sync.Mutex
  22. shutdown bool
  23. queue event.Queue
  24. eventBus event.Manager
  25. }
  26. // CwtchPeer provides us with a way of testing systems built on top of cwtch without having to
  27. // directly implement a cwtchPeer.
  28. type CwtchPeer interface {
  29. Init(event.Manager)
  30. AutoHandleEvents(events []event.Type)
  31. PeerWithOnion(string)
  32. InviteOnionToGroup(string, string) error
  33. SendMessageToPeer(string, string) string
  34. TrustPeer(string) error
  35. BlockPeer(string) error
  36. UnblockPeer(string) error
  37. AcceptInvite(string) error
  38. RejectInvite(string)
  39. DeleteContact(string)
  40. DeleteGroup(string)
  41. JoinServer(string)
  42. SendMessageToGroup(string, string) error
  43. SendMessageToGroupTracked(string, string) (string, error)
  44. GetProfile() *model.Profile
  45. GetPeerState(string) connections.ConnectionState
  46. StartGroup(string) (string, []byte, error)
  47. ImportGroup(string) error
  48. ExportGroup(string) (string, error)
  49. GetGroup(string) *model.Group
  50. GetGroupState(string) connections.ConnectionState
  51. GetGroups() []string
  52. AddContact(nick, onion string, trusted bool)
  53. GetContacts() []string
  54. GetContact(string) *model.PublicProfile
  55. SetAttribute(string, string)
  56. GetAttribute(string) (string, bool)
  57. SetContactAttribute(string, string, string)
  58. GetContactAttribute(string, string) (string, bool)
  59. SetGroupAttribute(string, string, string)
  60. GetGroupAttribute(string, string) (string, bool)
  61. Listen()
  62. StartPeersConnections()
  63. StartGroupConnections()
  64. Shutdown()
  65. }
  66. // NewCwtchPeer creates and returns a new cwtchPeer with the given name.
  67. func NewCwtchPeer(name string) CwtchPeer {
  68. cp := new(cwtchPeer)
  69. cp.Profile = model.GenerateNewProfile(name)
  70. cp.shutdown = false
  71. return cp
  72. }
  73. // FromProfile generates a new peer from a profile.
  74. func FromProfile(profile *model.Profile) CwtchPeer {
  75. cp := new(cwtchPeer)
  76. cp.Profile = profile
  77. cp.shutdown = false
  78. return cp
  79. }
  80. // Init instantiates a cwtchPeer
  81. func (cp *cwtchPeer) Init(eventBus event.Manager) {
  82. cp.queue = event.NewQueue()
  83. go cp.eventHandler()
  84. cp.eventBus = eventBus
  85. cp.AutoHandleEvents([]event.Type{event.EncryptedGroupMessage, event.NewMessageFromPeer, event.PeerAcknowledgement, event.PeerError, event.SendMessageToGroupError})
  86. }
  87. // AutoHandleEvents sets an event (if able) to be handled by this peer
  88. func (cp *cwtchPeer) AutoHandleEvents(events []event.Type) {
  89. for _, ev := range events {
  90. if _, exists := autoHandleableEvents[ev]; exists {
  91. cp.eventBus.Subscribe(ev, cp.queue)
  92. } else {
  93. log.Errorf("Peer asked to autohandle event it cannot: %v\n", ev)
  94. }
  95. }
  96. }
  97. // ImportGroup intializes a group from an imported source rather than a peer invite
  98. func (cp *cwtchPeer) ImportGroup(exportedInvite string) (err error) {
  99. if strings.HasPrefix(exportedInvite, "torv3") {
  100. data, err := base64.StdEncoding.DecodeString(exportedInvite[5:])
  101. if err == nil {
  102. cp.eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{
  103. event.GroupInvite: string(data),
  104. }))
  105. } else {
  106. log.Errorf("error decoding group invite: %v", err)
  107. }
  108. return nil
  109. }
  110. return errors.New("unsupported exported group type")
  111. }
  112. // ExportGroup serializes a group invite so it can be given offline
  113. func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) {
  114. group := cp.Profile.GetGroup(groupID)
  115. if group != nil {
  116. invite, err := group.Invite(group.GetInitialMessage())
  117. if err == nil {
  118. exportedInvite := "torv3" + base64.StdEncoding.EncodeToString(invite)
  119. return exportedInvite, err
  120. }
  121. }
  122. return "", errors.New("group id could not be found")
  123. }
  124. // StartGroup create a new group linked to the given server and returns the group ID, an invite or an error.
  125. func (cp *cwtchPeer) StartGroup(server string) (string, []byte, error) {
  126. return cp.StartGroupWithMessage(server, []byte{})
  127. }
  128. // StartGroupWithMessage create a new group linked to the given server and returns the group ID, an invite or an error.
  129. func (cp *cwtchPeer) StartGroupWithMessage(server string, initialMessage []byte) (groupID string, invite []byte, err error) {
  130. groupID, invite, err = cp.Profile.StartGroupWithMessage(server, initialMessage)
  131. if err == nil {
  132. group := cp.GetGroup(groupID)
  133. jsobj, err := json.Marshal(group)
  134. if err == nil {
  135. cp.eventBus.Publish(event.NewEvent(event.GroupCreated, map[event.Field]string{
  136. event.Data: string(jsobj),
  137. }))
  138. }
  139. } else {
  140. log.Errorf("error creating group: %v", err)
  141. }
  142. return
  143. }
  144. // GetGroups returns an unordered list of all group IDs.
  145. func (cp *cwtchPeer) GetGroups() []string {
  146. return cp.Profile.GetGroups()
  147. }
  148. // GetGroup returns a pointer to a specific group, nil if no group exists.
  149. func (cp *cwtchPeer) GetGroup(groupID string) *model.Group {
  150. return cp.Profile.GetGroup(groupID)
  151. }
  152. func (cp *cwtchPeer) AddContact(nick, onion string, trusted bool) {
  153. decodedPub, _ := base32.StdEncoding.DecodeString(strings.ToUpper(onion))
  154. pp := &model.PublicProfile{Name: nick, Ed25519PublicKey: decodedPub, Trusted: trusted, Blocked: false, Onion: onion, Attributes: map[string]string{"nick": nick}}
  155. cp.Profile.AddContact(onion, pp)
  156. pd, _ := json.Marshal(pp)
  157. cp.eventBus.Publish(event.NewEvent(event.PeerCreated, map[event.Field]string{
  158. event.Data: string(pd),
  159. event.RemotePeer: onion,
  160. }))
  161. }
  162. // GetContacts returns an unordered list of onions
  163. func (cp *cwtchPeer) GetContacts() []string {
  164. return cp.Profile.GetContacts()
  165. }
  166. // GetContact returns a given contact, nil is no such contact exists
  167. func (cp *cwtchPeer) GetContact(onion string) *model.PublicProfile {
  168. contact, _ := cp.Profile.GetContact(onion)
  169. return contact
  170. }
  171. // GetProfile returns the profile associated with this cwtchPeer.
  172. func (cp *cwtchPeer) GetProfile() *model.Profile {
  173. return cp.Profile
  174. }
  175. func (cp *cwtchPeer) GetPeerState(onion string) connections.ConnectionState {
  176. return connections.ConnectionStateToType[cp.Profile.Contacts[onion].State]
  177. }
  178. func (cp *cwtchPeer) GetGroupState(groupid string) connections.ConnectionState {
  179. return connections.ConnectionStateToType[cp.Profile.Groups[groupid].State]
  180. }
  181. // PeerWithOnion is the entry point for cwtchPeer relationships
  182. func (cp *cwtchPeer) PeerWithOnion(onion string) {
  183. if _, exists := cp.Profile.GetContact(onion); !exists {
  184. cp.AddContact(onion, onion, false)
  185. }
  186. cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion}))
  187. }
  188. // DeleteContact deletes a peer from the profile, storage, and handling
  189. func (cp *cwtchPeer) DeleteContact(onion string) {
  190. cp.Profile.DeleteContact(onion)
  191. cp.eventBus.Publish(event.NewEventList(event.DeleteContact, event.RemotePeer, onion))
  192. }
  193. // DeleteGroup deletes a Group from the profile, storage, and handling
  194. func (cp *cwtchPeer) DeleteGroup(groupID string) {
  195. cp.Profile.DeleteGroup(groupID)
  196. cp.eventBus.Publish(event.NewEventList(event.DeleteGroup, event.GroupID, groupID))
  197. }
  198. // InviteOnionToGroup kicks off the invite process
  199. func (cp *cwtchPeer) InviteOnionToGroup(onion string, groupid string) error {
  200. group := cp.Profile.GetGroup(groupid)
  201. if group == nil {
  202. return errors.New("invalid group id")
  203. }
  204. invite, err := group.Invite(group.InitialMessage)
  205. if err == nil {
  206. cp.eventBus.Publish(event.NewEvent(event.InvitePeerToGroup, map[event.Field]string{event.RemotePeer: onion, event.GroupInvite: string(invite)}))
  207. }
  208. return err
  209. }
  210. // JoinServer manages a new server connection with the given onion address
  211. func (cp *cwtchPeer) JoinServer(onion string) {
  212. cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion}))
  213. }
  214. // SendMessageToGroup attempts to sent the given message to the given group id.
  215. // TODO: Deprecate in favour of SendMessageToGroupTracked
  216. func (cp *cwtchPeer) SendMessageToGroup(groupid string, message string) error {
  217. _, err := cp.SendMessageToGroupTracked(groupid, message)
  218. return err
  219. }
  220. // SendMessageToGroup attempts to sent the given message to the given group id.
  221. // It returns the signature of the message which can be used to identify it in any UX layer.
  222. func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) (string, error) {
  223. group := cp.Profile.GetGroup(groupid)
  224. if group == nil {
  225. return "", errors.New("invalid group id")
  226. }
  227. ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid)
  228. if err == nil {
  229. cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupServer: group.GroupServer, event.Ciphertext: string(ct), event.Signature: string(sig)}))
  230. }
  231. return string(sig), err
  232. }
  233. func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string {
  234. event := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: onion, event.Data: message})
  235. cp.eventBus.Publish(event)
  236. cp.Profile.AddSentMessageToContactTimeline(onion, message, time.Now(), event.EventID)
  237. return event.EventID
  238. }
  239. // TrustPeer sets an existing peer relationship to trusted
  240. func (cp *cwtchPeer) TrustPeer(peer string) error {
  241. err := cp.Profile.TrustPeer(peer)
  242. if err == nil {
  243. cp.PeerWithOnion(peer)
  244. }
  245. return err
  246. }
  247. // BlockPeer blocks an existing peer relationship.
  248. func (cp *cwtchPeer) BlockPeer(peer string) error {
  249. err := cp.Profile.BlockPeer(peer)
  250. cp.eventBus.Publish(event.NewEvent(event.BlockPeer, map[event.Field]string{event.RemotePeer: peer}))
  251. return err
  252. }
  253. // UnblockPeer blocks an existing peer relationship.
  254. func (cp *cwtchPeer) UnblockPeer(peer string) error {
  255. err := cp.Profile.UnblockPeer(peer)
  256. cp.eventBus.Publish(event.NewEvent(event.UnblockPeer, map[event.Field]string{event.RemotePeer: peer}))
  257. return err
  258. }
  259. // AcceptInvite accepts a given existing group invite
  260. func (cp *cwtchPeer) AcceptInvite(groupID string) error {
  261. err := cp.Profile.AcceptInvite(groupID)
  262. if err != nil {
  263. return err
  264. }
  265. cp.eventBus.Publish(event.NewEvent(event.AcceptGroupInvite, map[event.Field]string{event.GroupID: groupID}))
  266. cp.JoinServer(cp.Profile.Groups[groupID].GroupServer)
  267. return nil
  268. }
  269. // RejectInvite rejects a given group invite.
  270. func (cp *cwtchPeer) RejectInvite(groupID string) {
  271. cp.Profile.RejectInvite(groupID)
  272. }
  273. // Listen makes the peer open a listening port to accept incoming connections (and be detactably online)
  274. func (cp *cwtchPeer) Listen() {
  275. log.Debugf("cwtchPeer Listen sending ProtocolEngineStartListen\n")
  276. cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{event.Onion: cp.Profile.Onion}))
  277. }
  278. // StartGroupConnections attempts to connect to all group servers (thus initiating reconnect attempts in the conectionsmanager)
  279. func (cp *cwtchPeer) StartPeersConnections() {
  280. for _, contact := range cp.GetContacts() {
  281. cp.PeerWithOnion(contact)
  282. }
  283. }
  284. // StartPeerConnections attempts to connect to all peers (thus initiating reconnect attempts in the conectionsmanager)
  285. func (cp *cwtchPeer) StartGroupConnections() {
  286. joinedServers := map[string]bool{}
  287. for _, groupID := range cp.GetGroups() {
  288. // Only send a join server packet if we haven't joined this server yet...
  289. group := cp.GetGroup(groupID)
  290. if joined := joinedServers[groupID]; group.Accepted && !joined {
  291. log.Infof("Join Server %v (%v)\n", group.GroupServer, joined)
  292. cp.JoinServer(group.GroupServer)
  293. joinedServers[group.GroupServer] = true
  294. }
  295. }
  296. }
  297. // SetAttribute sets an attribute for this profile and emits an event
  298. func (cp *cwtchPeer) SetAttribute(key string, val string) {
  299. cp.Profile.SetAttribute(key, val)
  300. cp.eventBus.Publish(event.NewEvent(event.SetAttribute, map[event.Field]string{
  301. event.Key: key,
  302. event.Data: val,
  303. }))
  304. }
  305. // GetAttribute gets an attribute for the profile
  306. func (cp *cwtchPeer) GetAttribute(key string) (string, bool) {
  307. return cp.Profile.GetAttribute(key)
  308. }
  309. // SetContactAttribute sets an attribute for the indicated contact and emits an event
  310. func (cp *cwtchPeer) SetContactAttribute(onion string, key string, val string) {
  311. if contact, ok := cp.Profile.GetContact(onion); ok {
  312. contact.SetAttribute(key, val)
  313. cp.eventBus.Publish(event.NewEvent(event.SetPeerAttribute, map[event.Field]string{
  314. event.RemotePeer: onion,
  315. event.Key: key,
  316. event.Data: val,
  317. }))
  318. }
  319. }
  320. // GetContactAttribute gets an attribute for the indicated contact
  321. func (cp *cwtchPeer) GetContactAttribute(onion string, key string) (string, bool) {
  322. if contact, ok := cp.Profile.GetContact(onion); ok {
  323. return contact.GetAttribute(key)
  324. }
  325. return "", false
  326. }
  327. // SetGroupAttribute sets an attribute for the indicated group and emits an event
  328. func (cp *cwtchPeer) SetGroupAttribute(gid string, key string, val string) {
  329. if group := cp.Profile.GetGroup(gid); group != nil {
  330. group.SetAttribute(key, val)
  331. cp.eventBus.Publish(event.NewEvent(event.SetGroupAttribute, map[event.Field]string{
  332. event.GroupID: gid,
  333. event.Key: key,
  334. event.Data: val,
  335. }))
  336. }
  337. }
  338. // GetGroupAttribute gets an attribute for the indicated group
  339. func (cp *cwtchPeer) GetGroupAttribute(gid string, key string) (string, bool) {
  340. if group := cp.Profile.GetGroup(gid); group != nil {
  341. return group.GetAttribute(key)
  342. }
  343. return "", false
  344. }
  345. // Shutdown kills all connections and cleans up all goroutines for the peer
  346. func (cp *cwtchPeer) Shutdown() {
  347. cp.shutdown = true
  348. cp.queue.Shutdown()
  349. }
  350. // eventHandler process events from other subsystems
  351. func (cp *cwtchPeer) eventHandler() {
  352. for {
  353. ev := cp.queue.Next()
  354. switch ev.EventType {
  355. /***** Default auto handled events *****/
  356. case event.EncryptedGroupMessage:
  357. // If successful, a side effect is the message is added to the group's timeline
  358. ok, groupID, message, seen := cp.Profile.AttemptDecryption([]byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
  359. if ok && !seen {
  360. cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: string(message.Signature), event.PreviousSignature: string(message.PreviousMessageSig), event.RemotePeer: message.PeerID}))
  361. }
  362. case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data
  363. ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
  364. cp.Profile.AddMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], ts)
  365. case event.PeerAcknowledgement:
  366. cp.Profile.AckSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID])
  367. case event.SendMessageToGroupError:
  368. cp.Profile.AddGroupSentMessageError(ev.Data[event.GroupServer], ev.Data[event.Signature], ev.Data[event.Error])
  369. case event.SendMessageToPeerError:
  370. cp.Profile.ErrorSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID], ev.Data[event.Error])
  371. /***** Non default but requestable handlable events *****/
  372. case event.NewGroupInvite:
  373. cp.Profile.ProcessInvite(ev.Data[event.GroupInvite], ev.Data[event.RemotePeer])
  374. case event.PeerStateChange:
  375. if _, exists := cp.Profile.Contacts[ev.Data[event.RemotePeer]]; exists {
  376. cp.Profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState]
  377. }
  378. case event.ServerStateChange:
  379. for _, group := range cp.Profile.Groups {
  380. if group.GroupServer == ev.Data[event.GroupServer] {
  381. group.State = ev.Data[event.ConnectionState]
  382. }
  383. }
  384. default:
  385. if ev.EventType != "" {
  386. log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType)
  387. }
  388. return
  389. }
  390. }
  391. }