cwtch_peer.go 12 KB


  1. package peer
  2. import (
  3. "cwtch.im/cwtch/event"
  4. "cwtch.im/cwtch/model"
  5. "cwtch.im/cwtch/protocol"
  6. "cwtch.im/cwtch/protocol/connections"
  7. "encoding/base32"
  8. "encoding/base64"
  9. "encoding/json"
  10. "errors"
  11. "git.openprivacy.ca/openprivacy/libricochet-go/log"
  12. "github.com/golang/protobuf/proto"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
  18. type cwtchPeer struct {
  19. Profile *model.Profile
  20. mutex sync.Mutex
  21. shutdown bool
  22. started bool
  23. queue *event.Queue
  24. eventBus event.Manager
  25. }
  26. // CwtchPeer provides us with a way of testing systems built on top of cwtch without having to
  27. // directly implement a cwtchPeer.
  28. type CwtchPeer interface {
  29. Init(event.Manager)
  30. PeerWithOnion(string) *connections.PeerPeerConnection
  31. InviteOnionToGroup(string, string) error
  32. SendMessageToPeer(string, string) string
  33. TrustPeer(string) error
  34. BlockPeer(string) error
  35. AcceptInvite(string) error
  36. RejectInvite(string)
  37. JoinServer(string)
  38. SendMessageToGroup(string, string) error
  39. SendMessageToGroupTracked(string, string) (string, error)
  40. GetProfile() *model.Profile
  41. GetPeerState(string) connections.ConnectionState
  42. StartGroup(string) (string, []byte, error)
  43. ImportGroup(string) (string, error)
  44. ExportGroup(string) (string, error)
  45. GetGroup(string) *model.Group
  46. GetGroupState(string) connections.ConnectionState
  47. GetGroups() []string
  48. AddContact(nick, onion string, trusted bool)
  49. GetContacts() []string
  50. GetContact(string) *model.PublicProfile
  51. IsStarted() bool
  52. Listen()
  53. StartPeersConnections()
  54. StartGroupConnections()
  55. Shutdown()
  56. }
  57. // NewCwtchPeer creates and returns a new cwtchPeer with the given name.
  58. func NewCwtchPeer(name string) CwtchPeer {
  59. cp := new(cwtchPeer)
  60. cp.Profile = model.GenerateNewProfile(name)
  61. cp.shutdown = false
  62. return cp
  63. }
  64. // FromProfile generates a new peer from a profile.
  65. func FromProfile(profile *model.Profile) CwtchPeer {
  66. cp := new(cwtchPeer)
  67. cp.Profile = profile
  68. return cp
  69. }
  70. // Init instantiates a cwtchPeer
  71. func (cp *cwtchPeer) Init(eventBus event.Manager) {
  72. cp.queue = event.NewEventQueue(100)
  73. go cp.eventHandler()
  74. cp.eventBus = eventBus
  75. cp.eventBus.Subscribe(event.EncryptedGroupMessage, cp.queue.EventChannel)
  76. cp.eventBus.Subscribe(event.NewGroupInvite, cp.queue.EventChannel)
  77. cp.eventBus.Subscribe(event.ServerStateChange, cp.queue.EventChannel)
  78. cp.eventBus.Subscribe(event.PeerStateChange, cp.queue.EventChannel)
  79. }
  80. // ImportGroup intializes a group from an imported source rather than a peer invite
  81. func (cp *cwtchPeer) ImportGroup(exportedInvite string) (groupID string, err error) {
  82. if strings.HasPrefix(exportedInvite, "torv3") {
  83. data, err := base64.StdEncoding.DecodeString(exportedInvite[5+44:])
  84. if err == nil {
  85. cpp := &protocol.CwtchPeerPacket{}
  86. err = proto.Unmarshal(data, cpp)
  87. if err == nil {
  88. jsobj, err := proto.Marshal(cpp.GetGroupChatInvite())
  89. if err == nil {
  90. cp.eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{
  91. event.GroupInvite: string(jsobj),
  92. }))
  93. } else {
  94. log.Errorf("error serializing group: %v", err)
  95. }
  96. return cpp.GroupChatInvite.GetGroupName(), nil
  97. }
  98. }
  99. } else {
  100. err = errors.New("unsupported exported group type")
  101. }
  102. return
  103. }
  104. // ExportGroup serializes a group invite so it can be given offline
  105. func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) {
  106. group := cp.Profile.GetGroupByGroupID(groupID)
  107. if group != nil {
  108. invite, err := group.Invite(group.GetInitialMessage())
  109. if err == nil {
  110. exportedInvite := "torv3" + base64.StdEncoding.EncodeToString(cp.Profile.Ed25519PublicKey) + base64.StdEncoding.EncodeToString(invite)
  111. return exportedInvite, err
  112. }
  113. }
  114. return "", errors.New("group id could not be found")
  115. }
  116. // StartGroup create a new group linked to the given server and returns the group ID, an invite or an error.
  117. func (cp *cwtchPeer) StartGroup(server string) (string, []byte, error) {
  118. return cp.StartGroupWithMessage(server, []byte{})
  119. }
  120. // StartGroupWithMessage create a new group linked to the given server and returns the group ID, an invite or an error.
  121. func (cp *cwtchPeer) StartGroupWithMessage(server string, initialMessage []byte) (groupID string, invite []byte, err error) {
  122. groupID, invite, err = cp.Profile.StartGroupWithMessage(server, initialMessage)
  123. if err == nil {
  124. group := cp.GetGroup(groupID)
  125. jsobj, err := json.Marshal(group)
  126. if err == nil {
  127. cp.eventBus.Publish(event.NewEvent(event.GroupCreated, map[event.Field]string{
  128. event.Data: string(jsobj),
  129. }))
  130. }
  131. } else {
  132. log.Errorf("error creating group: %v", err)
  133. }
  134. return
  135. }
  136. // GetGroups returns an unordered list of all group IDs.
  137. func (cp *cwtchPeer) GetGroups() []string {
  138. return cp.Profile.GetGroups()
  139. }
  140. // GetGroup returns a pointer to a specific group, nil if no group exists.
  141. func (cp *cwtchPeer) GetGroup(groupID string) *model.Group {
  142. return cp.Profile.GetGroupByGroupID(groupID)
  143. }
  144. func (cp *cwtchPeer) AddContact(nick, onion string, trusted bool) {
  145. decodedPub, _ := base32.StdEncoding.DecodeString(strings.ToUpper(onion))
  146. pp := &model.PublicProfile{Name: nick, Ed25519PublicKey: decodedPub, Trusted: trusted, Blocked: false, Onion: onion, Attributes: map[string]string{"nick": nick}}
  147. cp.Profile.AddContact(onion, pp)
  148. pd, _ := json.Marshal(pp)
  149. cp.eventBus.Publish(event.NewEvent(event.PeerCreated, map[event.Field]string{
  150. event.Data: string(pd),
  151. event.RemotePeer: onion,
  152. }))
  153. }
  154. // GetContacts returns an unordered list of onions
  155. func (cp *cwtchPeer) GetContacts() []string {
  156. return cp.Profile.GetContacts()
  157. }
  158. // GetContact returns a given contact, nil is no such contact exists
  159. func (cp *cwtchPeer) GetContact(onion string) *model.PublicProfile {
  160. contact, _ := cp.Profile.GetContact(onion)
  161. return contact
  162. }
  163. // GetProfile returns the profile associated with this cwtchPeer.
  164. func (cp *cwtchPeer) GetProfile() *model.Profile {
  165. return cp.Profile
  166. }
  167. func (cp *cwtchPeer) GetPeerState(onion string) connections.ConnectionState {
  168. return connections.ConnectionStateType[cp.Profile.Contacts[onion].State]
  169. }
  170. func (cp *cwtchPeer) GetGroupState(groupid string) connections.ConnectionState {
  171. return connections.ConnectionStateType[cp.Profile.Groups[groupid].State]
  172. }
  173. // PeerWithOnion is the entry point for cwtchPeer relationships
  174. func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection {
  175. cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion}))
  176. return nil
  177. }
  178. // InviteOnionToGroup kicks off the invite process
  179. func (cp *cwtchPeer) InviteOnionToGroup(onion string, groupid string) error {
  180. group := cp.Profile.GetGroupByGroupID(groupid)
  181. if group == nil {
  182. return errors.New("invalid group id")
  183. }
  184. invite, err := group.Invite(group.InitialMessage)
  185. if err == nil {
  186. cp.eventBus.Publish(event.NewEvent(event.InvitePeerToGroup, map[event.Field]string{event.RemotePeer: onion, event.GroupInvite: string(invite)}))
  187. }
  188. return err
  189. }
  190. // JoinServer manages a new server connection with the given onion address
  191. func (cp *cwtchPeer) JoinServer(onion string) {
  192. cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion}))
  193. }
  194. // SendMessageToGroup attempts to sent the given message to the given group id.
  195. // TODO: Deprecate in favour of SendMessageToGroupTracked
  196. func (cp *cwtchPeer) SendMessageToGroup(groupid string, message string) error {
  197. _, err := cp.SendMessageToGroupTracked(groupid, message)
  198. return err
  199. }
  200. // SendMessageToGroup attempts to sent the given message to the given group id.
  201. // It returns the signature of the message which can be used to identify it in any UX layer.
  202. func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) (string, error) {
  203. group := cp.Profile.GetGroupByGroupID(groupid)
  204. if group == nil {
  205. return "", errors.New("invalid group id")
  206. }
  207. ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid)
  208. if err == nil {
  209. cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupServer: group.GroupServer, event.Ciphertext: string(ct), event.Signature: string(sig)}))
  210. }
  211. return string(sig), err
  212. }
  213. func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string {
  214. event := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: onion, event.Data: message})
  215. cp.eventBus.Publish(event)
  216. return event.EventID
  217. }
  218. // TrustPeer sets an existing peer relationship to trusted
  219. func (cp *cwtchPeer) TrustPeer(peer string) error {
  220. err := cp.Profile.TrustPeer(peer)
  221. if err == nil {
  222. cp.PeerWithOnion(peer)
  223. }
  224. return err
  225. }
  226. // BlockPeer blocks an existing peer relationship.
  227. func (cp *cwtchPeer) BlockPeer(peer string) error {
  228. err := cp.Profile.BlockPeer(peer)
  229. cp.eventBus.Publish(event.NewEvent(event.BlockPeer, map[event.Field]string{event.RemotePeer: peer}))
  230. return err
  231. }
  232. // AcceptInvite accepts a given existing group invite
  233. func (cp *cwtchPeer) AcceptInvite(groupID string) error {
  234. err := cp.Profile.AcceptInvite(groupID)
  235. if err != nil {
  236. return err
  237. }
  238. cp.eventBus.Publish(event.NewEvent(event.AcceptGroupInvite, map[event.Field]string{event.GroupID: groupID}))
  239. cp.JoinServer(cp.Profile.Groups[groupID].GroupServer)
  240. return nil
  241. }
  242. // RejectInvite rejects a given group invite.
  243. func (cp *cwtchPeer) RejectInvite(groupID string) {
  244. cp.Profile.RejectInvite(groupID)
  245. }
  246. // Listen makes the peer open a listening port to accept incoming connections (and be detactably online)
  247. func (cp *cwtchPeer) Listen() {
  248. cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{}))
  249. }
  250. // StartGroupConnections attempts to connect to all group servers (thus initiating reconnect attempts in the conectionsmanager)
  251. func (cp *cwtchPeer) StartPeersConnections() {
  252. for _, contact := range cp.GetContacts() {
  253. cp.PeerWithOnion(contact)
  254. }
  255. }
  256. // StartPeerConnections attempts to connect to all peers (thus initiating reconnect attempts in the conectionsmanager)
  257. func (cp *cwtchPeer) StartGroupConnections() {
  258. joinedServers := map[string]bool{}
  259. for _, groupID := range cp.GetGroups() {
  260. // Only send a join server packet if we haven't joined this server yet...
  261. group := cp.GetGroup(groupID)
  262. if joined := joinedServers[groupID]; group.Accepted && !joined {
  263. log.Infof("Join Server %v (%v)\n", group.GroupServer, joined)
  264. cp.JoinServer(group.GroupServer)
  265. joinedServers[group.GroupServer] = true
  266. }
  267. }
  268. }
  269. // Shutdown kills all connections and cleans up all goroutines for the peer
  270. func (cp *cwtchPeer) Shutdown() {
  271. cp.shutdown = true
  272. cp.queue.Shutdown()
  273. }
  274. // IsStarted returns true if Listen() has successfully been run before on this connection (ever). TODO: we will need to properly unset this flag on error if we want to support resumption in the future
  275. func (cp *cwtchPeer) IsStarted() bool {
  276. return cp.started
  277. }
  278. // eventHandler process events from other subsystems
  279. func (cp *cwtchPeer) eventHandler() {
  280. for {
  281. ev := cp.queue.Next()
  282. switch ev.EventType {
  283. case event.EncryptedGroupMessage:
  284. ok, groupID, message, seen := cp.Profile.AttemptDecryption([]byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
  285. if ok && !seen {
  286. cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: string(message.Signature), event.PreviousSignature: string(message.PreviousMessageSig), event.RemotePeer: message.PeerID}))
  287. }
  288. case event.NewGroupInvite:
  289. var groupInvite protocol.GroupChatInvite
  290. err := proto.Unmarshal([]byte(ev.Data[event.GroupInvite]), &groupInvite)
  291. if err != nil {
  292. log.Errorf("NewGroupInvite could not json decode invite: %v\n", err)
  293. }
  294. cp.Profile.ProcessInvite(&groupInvite, ev.Data[event.RemotePeer])
  295. case event.PeerStateChange:
  296. if _, exists := cp.Profile.Contacts[ev.Data[event.RemotePeer]]; exists {
  297. cp.Profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState]
  298. }
  299. case event.ServerStateChange:
  300. for _, group := range cp.Profile.Groups {
  301. if group.GroupServer == ev.Data[event.GroupServer] {
  302. group.State = ev.Data[event.ConnectionState]
  303. }
  304. }
  305. default:
  306. if ev.EventType != "" {
  307. log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType)
  308. }
  309. return
  310. }
  311. }
  312. }