Official cwtch.im peer and server implementations. https://cwtch.im
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

379 lines
13KB

  1. package peer
  2. import (
  3. "crypto/rsa"
  4. "cwtch.im/cwtch/model"
  5. "cwtch.im/cwtch/peer/connections"
  6. "cwtch.im/cwtch/peer/peer"
  7. "cwtch.im/cwtch/protocol"
  8. "encoding/base64"
  9. "errors"
  10. "fmt"
  11. "git.openprivacy.ca/openprivacy/libricochet-go/application"
  12. "git.openprivacy.ca/openprivacy/libricochet-go/channels"
  13. "git.openprivacy.ca/openprivacy/libricochet-go/connection"
  14. "git.openprivacy.ca/openprivacy/libricochet-go/identity"
  15. "git.openprivacy.ca/openprivacy/libricochet-go/utils"
  16. "github.com/golang/protobuf/proto"
  17. "golang.org/x/crypto/ed25519"
  18. "log"
  19. "strings"
  20. "sync"
  21. )
  22. // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
  23. type cwtchPeer struct {
  24. connection.AutoConnectionHandler
  25. Profile *model.Profile
  26. app *application.RicochetApplication
  27. mutex sync.Mutex
  28. connectionsManager *connections.Manager
  29. dataHandler func(string, []byte) []byte
  30. //handlers map[string]func(*application.ApplicationInstance) func() channels.Handler
  31. aif application.ApplicationInstanceFactory
  32. }
  33. // CwtchPeer provides us with a way of testing systems built on top of cwtch without having to
  34. // directly implement a cwtchPeer.
  35. type CwtchPeer interface {
  36. Init()
  37. PeerWithOnion(string) *connections.PeerPeerConnection
  38. InviteOnionToGroup(string, string) error
  39. TrustPeer(string) error
  40. BlockPeer(string) error
  41. AcceptInvite(string) error
  42. RejectInvite(string)
  43. JoinServer(string)
  44. SendMessageToGroup(string, string) error
  45. GetProfile() *model.Profile
  46. GetPeers() map[string]connections.ConnectionState
  47. GetServers() map[string]connections.ConnectionState
  48. StartGroup(string) (string, []byte, error)
  49. ImportGroup(string) (string, error)
  50. ExportGroup(string) (string, error)
  51. GetGroup(string) *model.Group
  52. GetGroups() []string
  53. GetContacts() []string
  54. GetContact(string) *model.PublicProfile
  55. SetApplicationInstanceFactory(factory application.ApplicationInstanceFactory)
  56. SetPeerDataHandler(func(string, []byte) []byte)
  57. Listen() error
  58. Shutdown()
  59. }
  60. // NewCwtchPeer creates and returns a new cwtchPeer with the given name.
  61. func NewCwtchPeer(name string) CwtchPeer {
  62. cp := new(cwtchPeer)
  63. cp.Profile = model.GenerateNewProfile(name)
  64. cp.Init()
  65. return cp
  66. }
  67. // FromProfile generates a new peer from a profile.
  68. func FromProfile(profile *model.Profile) CwtchPeer {
  69. cp := new(cwtchPeer)
  70. cp.Profile = profile
  71. cp.Init()
  72. return cp
  73. }
  74. // Init instantiates a cwtchPeer
  75. func (cp *cwtchPeer) Init() {
  76. cp.connectionsManager = connections.NewConnectionsManager()
  77. go cp.connectionsManager.AttemptReconnections()
  78. }
  79. // ImportGroup intializes a group from an imported source rather than a peer invite
  80. func (cp *cwtchPeer) ImportGroup(exportedInvite string) (groupID string, err error) {
  81. if strings.HasPrefix(exportedInvite, "torv3") {
  82. data, err := base64.StdEncoding.DecodeString(exportedInvite[5+44:])
  83. if err == nil {
  84. cpp := &protocol.CwtchPeerPacket{}
  85. err := proto.Unmarshal(data, cpp)
  86. if err == nil {
  87. pk, err := base64.StdEncoding.DecodeString(exportedInvite[5 : 5+44])
  88. if err == nil {
  89. edpk := ed25519.PublicKey(pk)
  90. onion := utils.GetTorV3Hostname(edpk)
  91. cp.Profile.AddContact(onion, &model.PublicProfile{Name: "", Ed25519PublicKey: edpk, Trusted: true, Blocked: false, Onion: onion})
  92. cp.Profile.ProcessInvite(cpp.GetGroupChatInvite(), onion)
  93. return cpp.GroupChatInvite.GetGroupName(), nil
  94. }
  95. }
  96. }
  97. } else {
  98. err = errors.New("unsupported exported group type")
  99. }
  100. return
  101. }
  102. // a handler for the optional data handler
  103. // note that the "correct" way to do this would be to AddChannelHandler("im.cwtch.peerdata", ...") but peerdata is such
  104. // a handy channel that it's nice to have this convenience shortcut
  105. // SetPeerDataHandler sets the handler for the (optional) data channel for cwtch peers.
  106. func (cp *cwtchPeer) SetPeerDataHandler(dataHandler func(string, []byte) []byte) {
  107. cp.dataHandler = dataHandler
  108. }
  109. // add extra channel handlers (note that the peer will merge these with the ones necessary to make cwtch work, so you
  110. // are not clobbering the underlying functionality)
  111. func (cp *cwtchPeer) SetApplicationInstanceFactory(aif application.ApplicationInstanceFactory) {
  112. cp.aif = aif
  113. }
  114. // ExportGroup serializes a group invite so it can be given offline
  115. func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) {
  116. group := cp.Profile.GetGroupByGroupID(groupID)
  117. if group != nil {
  118. invite, err := group.Invite(group.GetInitialMessage())
  119. if err == nil {
  120. exportedInvite := "torv3" + base64.StdEncoding.EncodeToString(cp.Profile.Ed25519PublicKey) + base64.StdEncoding.EncodeToString(invite)
  121. return exportedInvite, err
  122. }
  123. }
  124. return "", errors.New("group id could not be found")
  125. }
  126. // StartGroup create a new group linked to the given server and returns the group ID, an invite or an error.
  127. func (cp *cwtchPeer) StartGroup(server string) (string, []byte, error) {
  128. return cp.Profile.StartGroup(server)
  129. }
  130. // StartGroupWithMessage create a new group linked to the given server and returns the group ID, an invite or an error.
  131. func (cp *cwtchPeer) StartGroupWithMessage(server string, initialMessage []byte) (string, []byte, error) {
  132. return cp.Profile.StartGroupWithMessage(server, initialMessage)
  133. }
  134. // GetGroups returns an unordered list of all group IDs.
  135. func (cp *cwtchPeer) GetGroups() []string {
  136. return cp.Profile.GetGroups()
  137. }
  138. // GetGroup returns a pointer to a specific group, nil if no group exists.
  139. func (cp *cwtchPeer) GetGroup(groupID string) *model.Group {
  140. return cp.Profile.GetGroupByGroupID(groupID)
  141. }
  142. // GetContacts returns an unordered list of onions
  143. func (cp *cwtchPeer) GetContacts() []string {
  144. return cp.Profile.GetContacts()
  145. }
  146. // GetContact returns a given contact, nil is no such contact exists
  147. func (cp *cwtchPeer) GetContact(onion string) *model.PublicProfile {
  148. contact, _ := cp.Profile.GetContact(onion)
  149. return contact
  150. }
  151. // GetProfile returns the profile associated with this cwtchPeer.
  152. // TODO While it is probably "safe", it is not really "safe", to call functions on this profile. This only exists to return things like Name and Onion,we should gate these.
  153. func (cp *cwtchPeer) GetProfile() *model.Profile {
  154. return cp.Profile
  155. }
  156. // PeerWithOnion is the entry point for cwtchPeer relationships
  157. func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection {
  158. return cp.connectionsManager.ManagePeerConnection(onion, cp.Profile, cp.dataHandler, cp.aif)
  159. }
  160. // InviteOnionToGroup kicks off the invite process
  161. func (cp *cwtchPeer) InviteOnionToGroup(onion string, groupid string) error {
  162. group := cp.Profile.GetGroupByGroupID(groupid)
  163. if group != nil {
  164. log.Printf("Constructing invite for group: %v\n", group)
  165. invite, err := group.Invite(group.GetInitialMessage())
  166. if err != nil {
  167. return err
  168. }
  169. ppc := cp.connectionsManager.GetPeerPeerConnectionForOnion(onion)
  170. if ppc == nil {
  171. return errors.New("peer connection not setup for onion. peers must be trusted before sending")
  172. }
  173. if ppc.GetState() == connections.AUTHENTICATED {
  174. log.Printf("Got connection for group: %v - Sending Invite\n", ppc)
  175. ppc.SendGroupInvite(invite)
  176. } else {
  177. return errors.New("cannot send invite to onion: peer connection is not ready")
  178. }
  179. return nil
  180. }
  181. return errors.New("group id could not be found")
  182. }
  183. // ReceiveGroupMessage is a callback function that processes GroupMessages from a given server
  184. func (cp *cwtchPeer) ReceiveGroupMessage(server string, gm *protocol.GroupMessage) {
  185. cp.Profile.AttemptDecryption(gm.Ciphertext, gm.Signature)
  186. }
  187. // JoinServer manages a new server connection with the given onion address
  188. func (cp *cwtchPeer) JoinServer(onion string) {
  189. cp.connectionsManager.ManageServerConnection(onion, cp.ReceiveGroupMessage)
  190. }
  191. // SendMessageToGroup attemps to sent the given message to the given group id.
  192. func (cp *cwtchPeer) SendMessageToGroup(groupid string, message string) error {
  193. group := cp.Profile.GetGroupByGroupID(groupid)
  194. if group == nil {
  195. return errors.New("group does not exist")
  196. }
  197. psc := cp.connectionsManager.GetPeerServerConnectionForOnion(group.GroupServer)
  198. if psc == nil {
  199. return errors.New("could not find server connection to send message to")
  200. }
  201. ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid)
  202. if err != nil {
  203. return err
  204. }
  205. gm := &protocol.GroupMessage{
  206. Ciphertext: ct,
  207. Signature: sig,
  208. }
  209. err = psc.SendGroupMessage(gm)
  210. return err
  211. }
  212. // GetPeers returns a list of peer connections.
  213. func (cp *cwtchPeer) GetPeers() map[string]connections.ConnectionState {
  214. return cp.connectionsManager.GetPeers()
  215. }
  216. // GetServers returns a list of server connections
  217. func (cp *cwtchPeer) GetServers() map[string]connections.ConnectionState {
  218. return cp.connectionsManager.GetServers()
  219. }
  220. // TrustPeer sets an existing peer relationship to trusted
  221. func (cp *cwtchPeer) TrustPeer(peer string) error {
  222. err := cp.Profile.TrustPeer(peer)
  223. if err == nil {
  224. cp.PeerWithOnion(peer)
  225. }
  226. return err
  227. }
  228. // BlockPeer blocks an existing peer relationship.
  229. func (cp *cwtchPeer) BlockPeer(peer string) error {
  230. err := cp.Profile.BlockPeer(peer)
  231. cp.connectionsManager.ClosePeerConnection(peer)
  232. return err
  233. }
  234. // AcceptInvite accepts a given existing group invite
  235. func (cp *cwtchPeer) AcceptInvite(groupID string) error {
  236. return cp.Profile.AcceptInvite(groupID)
  237. }
  238. // RejectInvite rejects a given group invite.
  239. func (cp *cwtchPeer) RejectInvite(groupID string) {
  240. cp.Profile.RejectInvite(groupID)
  241. }
  242. // LookupContact returns that a contact is known and allowed to communicate for all cases.
  243. func (cp *cwtchPeer) LookupContact(hostname string, publicKey rsa.PublicKey) (allowed, known bool) {
  244. blocked := cp.Profile.IsBlocked(hostname)
  245. return !blocked, true
  246. }
  247. // LookupContactV3 returns that a contact is known and allowed to communicate for all cases.
  248. func (cp *cwtchPeer) LookupContactV3(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) {
  249. blocked := cp.Profile.IsBlocked(hostname)
  250. return !blocked, true
  251. }
  252. // ContactRequest needed to implement ContactRequestHandler Interface
  253. func (cp *cwtchPeer) ContactRequest(name string, message string) string {
  254. return "Accepted"
  255. }
  256. // Listen sets up an onion listener to process incoming cwtch messages
  257. func (cp *cwtchPeer) Listen() error {
  258. cwtchpeer := new(application.RicochetApplication)
  259. l, err := application.SetupOnionV3("127.0.0.1:9051", "tcp4", "", cp.Profile.Ed25519PrivateKey, cp.GetProfile().Onion, 9878)
  260. if err != nil && fmt.Sprintf("%v", err) != "550 Unspecified Tor error: Onion address collision" {
  261. return err
  262. }
  263. af := application.ApplicationInstanceFactory{}
  264. af.Init()
  265. af.AddHandler("im.cwtch.peer", func(rai *application.ApplicationInstance) func() channels.Handler {
  266. cpi := new(CwtchPeerInstance)
  267. cpi.Init(rai, cwtchpeer)
  268. return func() channels.Handler {
  269. cpc := new(peer.CwtchPeerChannel)
  270. cpc.Handler = &CwtchPeerHandler{Onion: rai.RemoteHostname, Peer: cp}
  271. return cpc
  272. }
  273. })
  274. if cp.dataHandler != nil {
  275. af.AddHandler("im.cwtch.peer.data", func(rai *application.ApplicationInstance) func() channels.Handler {
  276. cpi := new(CwtchPeerInstance)
  277. cpi.Init(rai, cwtchpeer)
  278. return func() channels.Handler {
  279. cpc := new(peer.CwtchPeerDataChannel)
  280. cpc.Handler = &CwtchPeerHandler{Onion: rai.RemoteHostname, Peer: cp, DataHandler: cp.dataHandler}
  281. return cpc
  282. }
  283. })
  284. }
  285. handlers := cp.aif.GetHandlers()
  286. for i := range handlers {
  287. af.AddHandler(handlers[i], cp.aif.GetHandler(handlers[i]))
  288. }
  289. cwtchpeer.InitV3(cp.Profile.Name, identity.InitializeV3(cp.Profile.Name, &cp.Profile.Ed25519PrivateKey, &cp.Profile.Ed25519PublicKey), af, cp)
  290. log.Printf("Running cwtch peer on %v", l.Addr().String())
  291. cp.app = cwtchpeer
  292. cwtchpeer.Run(l)
  293. return nil
  294. }
  295. // Shutdown kills all connections and cleans up all goroutines for the peer
  296. func (cp *cwtchPeer) Shutdown() {
  297. cp.connectionsManager.Shutdown()
  298. if cp.app != nil {
  299. cp.app.Shutdown()
  300. }
  301. }
  302. // CwtchPeerInstance encapsulates incoming peer connections
  303. type CwtchPeerInstance struct {
  304. rai *application.ApplicationInstance
  305. ra *application.RicochetApplication
  306. }
  307. // Init sets up a CwtchPeerInstance
  308. func (cpi *CwtchPeerInstance) Init(rai *application.ApplicationInstance, ra *application.RicochetApplication) {
  309. cpi.rai = rai
  310. cpi.ra = ra
  311. }
  312. // CwtchPeerHandler encapsulates handling of incoming CwtchPackets
  313. type CwtchPeerHandler struct {
  314. Onion string
  315. Peer *cwtchPeer
  316. DataHandler func(string, []byte) []byte
  317. }
  318. // HandleGroupInvite handles incoming GroupInvites
  319. func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) {
  320. log.Printf("Received GroupID from %v %v\n", cph.Onion, gci.String())
  321. cph.Peer.Profile.ProcessInvite(gci, cph.Onion)
  322. }
  323. // HandlePacket handles the Cwtch cwtchPeer Data Channel
  324. func (cph *CwtchPeerHandler) HandlePacket(data []byte) []byte {
  325. return cph.DataHandler(cph.Onion, data)
  326. }