Official cwtch.im peer and server implementations. https://cwtch.im
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

503 lines
18 KiB

  1. package testing
  2. import (
  3. "crypto/rand"
  4. app2 "cwtch.im/cwtch/app"
  5. "cwtch.im/cwtch/app/utils"
  6. "cwtch.im/cwtch/event"
  7. "cwtch.im/cwtch/event/bridge"
  8. "cwtch.im/cwtch/model"
  9. "cwtch.im/cwtch/model/attr"
  10. "cwtch.im/cwtch/peer"
  11. "cwtch.im/cwtch/protocol/connections"
  12. cwtchserver "cwtch.im/cwtch/server"
  13. "encoding/base64"
  14. "encoding/json"
  15. "fmt"
  16. "git.openprivacy.ca/openprivacy/connectivity/tor"
  17. "git.openprivacy.ca/openprivacy/log"
  18. "golang.org/x/net/proxy"
  19. mrand "math/rand"
  20. "os"
  21. "os/user"
  22. "path"
  23. "runtime"
  24. "runtime/pprof"
  25. "testing"
  26. "time"
  27. )
  // serverKeyfile is a relative path to a private key file under the server
  // app directory. NOTE(review): it is not referenced by the code visible in
  // this file; confirm whether it is still needed.
  const serverKeyfile = "./../server/app/private_key"

  // localKeyfile is a relative path to a private key file in the current
  // directory. NOTE(review): it is not referenced by the code visible in this
  // file; confirm whether it is still needed.
  const localKeyfile = "./private_key"
  // aliceLines holds the group messages Alice sends during the test, in order.
  var aliceLines = []string{"Hello, I'm Alice", "bye"}

  // bobLines holds the group messages Bob sends during the test, in order.
  var bobLines = []string{"Hi, my name is Bob.", "toodles", "welcome"}

  // carolLines holds the group messages Carol sends during the test, in order.
  var carolLines = []string{"Howdy, thanks!"}
  37. func printAndCountVerifedTimeline(t *testing.T, timeline []model.Message) int {
  38. numVerified := 0
  39. for _, message := range timeline {
  40. fmt.Printf("%v %v> %s\n", message.Timestamp, message.PeerID, message.Message)
  41. numVerified++
  42. }
  43. return numVerified
  44. }
  45. func serverCheck(t *testing.T, serverAddr string) bool {
  46. torDialer, err := proxy.SOCKS5("tcp", "127.0.0.1:9050", nil, proxy.Direct)
  47. if err != nil {
  48. t.Logf("Could not get SOCKS5 proxy: %v", err)
  49. return false
  50. }
  51. // Doesn't seem to be a way to turn the default timeout of 2 minutes down
  52. conn, err := torDialer.Dial("tcp", serverAddr+".onion:9878")
  53. if err != nil {
  54. t.Logf("Could not dial %v: %v", serverAddr, err)
  55. return false
  56. }
  57. conn.Close()
  58. return true
  59. }
  60. func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, groupID string) {
  61. for {
  62. fmt.Printf("%v checking group connection...\n", peer.GetName())
  63. state, ok := peer.GetGroupState(groupID)
  64. if ok {
  65. fmt.Printf("Waiting for Peer %v to join group %v - state: %v\n", peer.GetName(), groupID, state)
  66. if state == connections.FAILED {
  67. t.Fatalf("%v could not connect to %v", peer.GetOnion(), groupID)
  68. }
  69. if state != connections.SYNCED {
  70. fmt.Printf("peer %v %v waiting connect to group %v, currently: %v\n", peer.GetName(), peer.GetOnion(), groupID, connections.ConnectionStateName[state])
  71. time.Sleep(time.Second * 5)
  72. continue
  73. } else {
  74. fmt.Printf("peer %v %v CONNECTED to group %v\n", peer.GetName(), peer.GetOnion(), groupID)
  75. break
  76. }
  77. }
  78. }
  79. return
  80. }
  81. func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.CwtchPeer) {
  82. for {
  83. state, ok := peera.GetPeerState(peerb.GetOnion())
  84. if ok {
  85. //log.Infof("Waiting for Peer %v to peer with peer: %v - state: %v\n", peera.GetProfile().Name, peerb.GetProfile().Name, state)
  86. if state == connections.FAILED {
  87. t.Fatalf("%v could not connect to %v", peera.GetOnion(), peerb.GetOnion())
  88. }
  89. if state != connections.AUTHENTICATED {
  90. fmt.Printf("peer %v waiting connect to peer %v, currently: %v\n", peera.GetOnion(), peerb.GetOnion(), connections.ConnectionStateName[state])
  91. time.Sleep(time.Second * 5)
  92. continue
  93. } else {
  94. fmt.Printf("%v CONNECTED and AUTHED to %v\n", peera.GetName(), peerb.GetName())
  95. break
  96. }
  97. }
  98. }
  99. return
  100. }
  101. func TestCwtchPeerIntegration(t *testing.T) {
  102. numGoRoutinesStart := runtime.NumGoroutine()
  103. log.AddEverythingFromPattern("connectivity")
  104. log.SetLevel(log.LevelDebug)
  105. log.ExcludeFromPattern("connection/connection")
  106. log.ExcludeFromPattern("outbound/3dhauthchannel")
  107. log.ExcludeFromPattern("event/eventmanager")
  108. log.ExcludeFromPattern("pipeBridge")
  109. log.ExcludeFromPattern("tapir")
  110. os.Mkdir("tordir",0700)
  111. dataDir := path.Join("tordir", "tor")
  112. os.MkdirAll(dataDir, 0700)
  113. // we don't need real randomness for the port, just to avoid a possible conflict...
  114. mrand.Seed(int64(time.Now().Nanosecond()))
  115. socksPort := mrand.Intn(1000)+9051
  116. controlPort := mrand.Intn(1000)+9052
  117. // generate a random password
  118. key := make([]byte, 64)
  119. _, err := rand.Read(key)
  120. if err != nil {
  121. panic(err)
  122. }
  123. tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc")
  124. acn, err := tor.NewTorACNWithAuth("./tordir", path.Join("..", "tor"), controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)})
  125. if err != nil {
  126. t.Fatalf("Could not start Tor: %v", err)
  127. }
  128. pid,err := acn.GetPID()
  129. t.Logf("Tor pid: %v", pid)
  130. // ***** Cwtch Server management *****
  131. var server *cwtchserver.Server
  132. serverOnline := false
  133. var serverAddr string
  134. var serverKeyBundle []byte
  135. if !serverOnline {
  136. // launch app with new key
  137. fmt.Println("No server found!")
  138. server = new(cwtchserver.Server)
  139. fmt.Println("Starting cwtch server...")
  140. os.Remove("server-test.json")
  141. config := cwtchserver.LoadConfig(".", "server-test.json")
  142. identity := config.Identity()
  143. serverAddr = identity.Hostname()
  144. server.Setup(config)
  145. serverKeyBundle, _ = json.Marshal(server.KeyBundle())
  146. log.Debugf("server key bundle %s", serverKeyBundle)
  147. go server.Run(acn)
  148. // let tor get established
  149. fmt.Printf("Establishing Tor hidden service: %v...\n", serverAddr)
  150. } else {
  151. fmt.Printf("Found existing cwtch server %v, using for tests...\n", serverAddr)
  152. }
  153. numGoRoutinesPostServer := runtime.NumGoroutine()
  154. app := app2.NewApp(acn, "./storage")
  155. usr, _ := user.Current()
  156. cwtchDir := path.Join(usr.HomeDir, ".cwtch")
  157. os.Mkdir(cwtchDir, 0700)
  158. os.RemoveAll(path.Join(cwtchDir, "testing"))
  159. os.Mkdir(path.Join(cwtchDir, "testing"), 0700)
  160. bridgeClient := bridge.NewPipeBridgeClient(path.Join(cwtchDir, "testing/clientPipe"), path.Join(cwtchDir, "testing/servicePipe"))
  161. bridgeService := bridge.NewPipeBridgeService(path.Join(cwtchDir, "testing/servicePipe"), path.Join(cwtchDir, "testing/clientPipe"))
  162. appClient := app2.NewAppClient("./storage", bridgeClient)
  163. appService := app2.NewAppService(acn, "./storage", bridgeService)
  164. numGoRoutinesPostAppStart := runtime.NumGoroutine()
  165. // ***** cwtchPeer setup *****
  166. fmt.Println("Creating Alice...")
  167. app.CreatePeer("alice", "asdfasdf")
  168. fmt.Println("Creating Bob...")
  169. app.CreatePeer("bob", "asdfasdf")
  170. fmt.Println("Creating Carol...")
  171. appClient.CreatePeer("carol", "asdfasdf")
  172. alice := utils.WaitGetPeer(app, "alice")
  173. fmt.Println("Alice created:", alice.GetOnion())
  174. alice.SetAttribute(attr.GetPublicScope("name"), "Alice")
  175. alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
  176. bob := utils.WaitGetPeer(app, "bob")
  177. fmt.Println("Bob created:", bob.GetOnion())
  178. bob.SetAttribute(attr.GetPublicScope("name"), "Bob")
  179. bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
  180. carol := utils.WaitGetPeer(appClient, "carol")
  181. fmt.Println("Carol created:", carol.GetOnion())
  182. carol.SetAttribute(attr.GetPublicScope("name"), "Carol")
  183. carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
  184. app.LaunchPeers()
  185. appClient.LaunchPeers()
  186. waitTime := time.Duration(60) * time.Second
  187. t.Logf("** Waiting for Alice, Bob, and Carol to connect with onion network... (%v)\n", waitTime)
  188. time.Sleep(waitTime)
  189. numGoRoutinesPostPeerStart := runtime.NumGoroutine()
  190. fmt.Println("** Wait Done!")
  191. // ***** Peering, server joining, group creation / invite *****
  192. fmt.Println("Alice joining server...")
  193. if err := alice.AddServer(string(serverKeyBundle)); err != nil {
  194. t.Fatalf("Failed to Add Server Bundle %v", err)
  195. }
  196. alice.JoinServer(serverAddr)
  197. fmt.Println("Alice peering with Bob...")
  198. alice.PeerWithOnion(bob.GetOnion())
  199. fmt.Println("Alice peering with Carol...")
  200. alice.PeerWithOnion(carol.GetOnion())
  201. fmt.Println("Creating group on ", serverAddr, "...")
  202. groupID, _, err := alice.StartGroup(serverAddr)
  203. fmt.Printf("Created group: %v!\n", groupID)
  204. if err != nil {
  205. t.Errorf("Failed to init group: %v", err)
  206. return
  207. }
  208. fmt.Println("Waiting for alice to join server...")
  209. waitForPeerGroupConnection(t, alice, groupID)
  210. fmt.Println("Waiting for alice and Bob to peer...")
  211. waitForPeerPeerConnection(t, alice, bob)
  212. // Need to add contact else SetContactAuth fails on peer peer doesnt exist
  213. // Normal flow would be Bob app monitors for the new connection (a new connection state change to Auth
  214. // and the adds the user to peer, and then approves or blocks it
  215. bob.AddContact("alice?", alice.GetOnion(), model.AuthApproved)
  216. bob.AddServer(string(serverKeyBundle))
  217. bob.SetContactAuthorization(alice.GetOnion(), model.AuthApproved)
  218. waitForPeerPeerConnection(t, alice, carol)
  219. carol.AddContact("alice?", alice.GetOnion(), model.AuthApproved)
  220. carol.AddServer(string(serverKeyBundle))
  221. carol.SetContactAuthorization(alice.GetOnion(), model.AuthApproved)
  222. fmt.Println("Alice and Bob getVal public.name...")
  223. alice.SendGetValToPeer(bob.GetOnion(), attr.PublicScope, "name")
  224. bob.SendGetValToPeer(alice.GetOnion(), attr.PublicScope, "name")
  225. alice.SendGetValToPeer(carol.GetOnion(), attr.PublicScope, "name")
  226. carol.SendGetValToPeer(alice.GetOnion(), attr.PublicScope, "name")
  227. time.Sleep(10 * time.Second)
  228. aliceName, exists := bob.GetContactAttribute(alice.GetOnion(), attr.GetPeerScope("name"))
  229. if !exists || aliceName != "Alice" {
  230. t.Fatalf("Bob: alice GetKeyVal error on alice peer.name %v\n", exists)
  231. }
  232. fmt.Printf("Bob has alice's name as '%v'\n", aliceName)
  233. bobName, exists := alice.GetContactAttribute(bob.GetOnion(), attr.GetPeerScope("name"))
  234. if !exists || bobName != "Bob" {
  235. t.Fatalf("Alice: bob GetKeyVal error on bob peer.name\n")
  236. }
  237. fmt.Printf("Alice has bob's name as '%v'\n", bobName)
  238. aliceName, exists = carol.GetContactAttribute(alice.GetOnion(), attr.GetPeerScope("name"))
  239. if !exists || aliceName != "Alice" {
  240. t.Fatalf("carol GetKeyVal error for alice peer.name %v\n", exists)
  241. }
  242. carolName, exists := alice.GetContactAttribute(carol.GetOnion(), attr.GetPeerScope("name"))
  243. if !exists || carolName != "Carol" {
  244. t.Fatalf("alice GetKeyVal error, carol peer.name\n")
  245. }
  246. fmt.Printf("Alice has carol's name as '%v'\n", carolName)
  247. fmt.Println("Alice inviting Bob to group...")
  248. err = alice.InviteOnionToGroup(bob.GetOnion(), groupID)
  249. if err != nil {
  250. t.Fatalf("Error for Alice inviting Bob to group: %v", err)
  251. }
  252. time.Sleep(time.Second * 5)
  253. fmt.Println("Bob examining groups and accepting invites...")
  254. for _, groupID := range bob.GetGroups() {
  255. group := bob.GetGroup(groupID)
  256. fmt.Printf("Bob group: %v (Accepted: %v)\n", group.GroupID, group.Accepted)
  257. if group.Accepted == false {
  258. fmt.Printf("Bob received and accepting group invite: %v\n", group.GroupID)
  259. bob.AcceptInvite(group.GroupID)
  260. }
  261. }
  262. fmt.Println("Waiting for Bob to join connect to group server...")
  263. waitForPeerGroupConnection(t, bob, groupID)
  264. numGoRoutinesPostServerConnect := runtime.NumGoroutine()
  265. // ***** Conversation *****
  266. fmt.Println("Starting conversation in group...")
  267. // Conversation
  268. fmt.Printf("%v> %v\n", aliceName, aliceLines[0])
  269. err = alice.SendMessageToGroup(groupID, aliceLines[0])
  270. if err != nil {
  271. t.Fatalf("Alice failed to send a message to the group: %v", err)
  272. }
  273. time.Sleep(time.Second * 10)
  274. fmt.Printf("%v> %v\n", bobName, bobLines[0])
  275. err = bob.SendMessageToGroup(groupID, bobLines[0])
  276. if err != nil {
  277. t.Fatalf("Bob failed to send a message to the group: %v", err)
  278. }
  279. time.Sleep(time.Second * 10)
  280. fmt.Printf("%v> %v\n", aliceName, aliceLines[1])
  281. alice.SendMessageToGroup(groupID, aliceLines[1])
  282. time.Sleep(time.Second * 10)
  283. fmt.Printf("%v> %v\n", bobName, bobLines[1])
  284. bob.SendMessageToGroup(groupID, bobLines[1])
  285. time.Sleep(time.Second * 10)
  286. fmt.Println("Alice inviting Carol to group...")
  287. err = alice.InviteOnionToGroup(carol.GetOnion(), groupID)
  288. if err != nil {
  289. t.Fatalf("Error for Alice inviting Carol to group: %v", err)
  290. }
  291. time.Sleep(time.Second * 60) // Account for some token acquisition in Alice and Bob flows.
  292. fmt.Println("Carol examining groups and accepting invites...")
  293. for _, groupID := range carol.GetGroups() {
  294. group := carol.GetGroup(groupID)
  295. fmt.Printf("Carol group: %v (Accepted: %v)\n", group.GroupID, group.Accepted)
  296. if group.Accepted == false {
  297. fmt.Printf("Carol received and accepting group invite: %v\n", group.GroupID)
  298. carol.AcceptInvite(group.GroupID)
  299. }
  300. }
  301. fmt.Println("Shutting down Alice...")
  302. app.ShutdownPeer(alice.GetOnion())
  303. time.Sleep(time.Second * 5)
  304. numGoRoutinesPostAlice := runtime.NumGoroutine()
  305. fmt.Println("Carol joining server...")
  306. carol.JoinServer(serverAddr)
  307. waitForPeerGroupConnection(t, carol, groupID)
  308. numGoRotinesPostCarolConnect := runtime.NumGoroutine()
  309. fmt.Printf("%v> %v", bobName, bobLines[2])
  310. bob.SendMessageToGroup(groupID, bobLines[2])
  311. // Bob should have enough tokens so we don't need to account for
  312. // token acquisition here...
  313. fmt.Printf("%v> %v", carolName, carolLines[0])
  314. carol.SendMessageToGroup(groupID, carolLines[0])
  315. time.Sleep(time.Second * 30) // we need to account for spam-based token acquisition, but everything should
  316. // be warmed-up and delays should be pretty small.
  317. // ***** Verify Test *****
  318. fmt.Println("Final syncing time...")
  319. time.Sleep(time.Second * 30)
  320. alicesGroup := alice.GetGroup(groupID)
  321. if alicesGroup == nil {
  322. t.Error("aliceGroup == nil")
  323. return
  324. }
  325. fmt.Printf("Alice's TimeLine:\n")
  326. aliceVerified := printAndCountVerifedTimeline(t, alicesGroup.GetTimeline())
  327. if aliceVerified != 4 {
  328. t.Errorf("Alice did not have 4 verified messages")
  329. }
  330. bobsGroup := bob.GetGroup(groupID)
  331. if bobsGroup == nil {
  332. t.Error("bobGroup == nil")
  333. return
  334. }
  335. fmt.Printf("Bob's TimeLine:\n")
  336. bobVerified := printAndCountVerifedTimeline(t, bobsGroup.GetTimeline())
  337. if bobVerified != 6 {
  338. t.Errorf("Bob did not have 6 verified messages")
  339. }
  340. carolsGroup := carol.GetGroup(groupID)
  341. fmt.Printf("Carol's TimeLine:\n")
  342. carolVerified := printAndCountVerifedTimeline(t, carolsGroup.GetTimeline())
  343. if carolVerified != 6 {
  344. t.Errorf("Carol did not have 6 verified messages")
  345. }
  346. if len(alicesGroup.GetTimeline()) != 4 {
  347. t.Errorf("Alice's timeline does not have all messages")
  348. } else {
  349. // check message 0,1,2,3
  350. aliceGroupTimeline := alicesGroup.GetTimeline()
  351. if aliceGroupTimeline[0].Message != aliceLines[0] || aliceGroupTimeline[1].Message != bobLines[0] ||
  352. aliceGroupTimeline[2].Message != aliceLines[1] || aliceGroupTimeline[3].Message != bobLines[1] {
  353. t.Errorf("Some of Alice's timeline messages did not have the expected content!")
  354. }
  355. }
  356. if len(bobsGroup.GetTimeline()) != 6 {
  357. t.Errorf("Bob's timeline does not have all messages")
  358. } else {
  359. // check message 0,1,2,3,4,5
  360. bobGroupTimeline := bobsGroup.GetTimeline()
  361. if bobGroupTimeline[0].Message != aliceLines[0] || bobGroupTimeline[1].Message != bobLines[0] ||
  362. bobGroupTimeline[2].Message != aliceLines[1] || bobGroupTimeline[3].Message != bobLines[1] ||
  363. bobGroupTimeline[4].Message != bobLines[2] || bobGroupTimeline[5].Message != carolLines[0] {
  364. t.Errorf("Some of Bob's timeline messages did not have the expected content!")
  365. }
  366. }
  367. if len(carolsGroup.GetTimeline()) != 6 {
  368. t.Errorf("Carol's timeline does not have all messages")
  369. } else {
  370. // check message 0,1,2,3,4,5
  371. carolGroupTimeline := carolsGroup.GetTimeline()
  372. if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] ||
  373. carolGroupTimeline[2].Message != aliceLines[1] || carolGroupTimeline[3].Message != bobLines[1] ||
  374. carolGroupTimeline[4].Message != bobLines[2] || carolGroupTimeline[5].Message != carolLines[0] {
  375. t.Errorf("Some of Carol's timeline messages did not have the expected content!")
  376. }
  377. }
  378. fmt.Println("Shutting down Bob...")
  379. app.ShutdownPeer(bob.GetOnion())
  380. time.Sleep(time.Second * 3)
  381. numGoRoutinesPostBob := runtime.NumGoroutine()
  382. if server != nil {
  383. fmt.Println("Shutting down server...")
  384. server.Shutdown()
  385. time.Sleep(time.Second * 3)
  386. }
  387. numGoRoutinesPostServerShutdown := runtime.NumGoroutine()
  388. fmt.Println("Shutting down Carol...")
  389. appClient.ShutdownPeer(carol.GetOnion())
  390. time.Sleep(time.Second * 3)
  391. numGoRoutinesPostCarol := runtime.NumGoroutine()
  392. fmt.Println("Shutting down apps...")
  393. fmt.Printf("app Shutdown: %v\n", runtime.NumGoroutine())
  394. app.Shutdown()
  395. fmt.Printf("appClientShutdown: %v\n", runtime.NumGoroutine())
  396. appClient.Shutdown()
  397. fmt.Printf("appServiceShutdown: %v\n", runtime.NumGoroutine())
  398. appService.Shutdown()
  399. fmt.Printf("bridgeClientShutdown: %v\n", runtime.NumGoroutine())
  400. bridgeClient.Shutdown()
  401. time.Sleep(2 * time.Second)
  402. fmt.Printf("brideServiceShutdown: %v\n", runtime.NumGoroutine())
  403. bridgeService.Shutdown()
  404. time.Sleep(2 * time.Second)
  405. fmt.Printf("Done shutdown: %v\n", runtime.NumGoroutine())
  406. numGoRoutinesPostAppShutdown := runtime.NumGoroutine()
  407. fmt.Println("Shutting down ACN...")
  408. acn.Close()
  409. time.Sleep(time.Second * 2) // Server ^^ has a 5 second loop attempting reconnect before exiting
  410. time.Sleep(time.Second * 30) // the network status plugin might keep goroutines alive for a minute before killing them
  411. numGoRoutinesPostACN := runtime.NumGoroutine()
  412. // Printing out the current goroutines
  413. // Very useful if we are leaking any.
  414. pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
  415. fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostServer: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+
  416. "numGoRoutinesPostAlice: %v\nnumGoRotinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostServerShutdown: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v\nnumGoRoutinesPostACN: %v\n",
  417. numGoRoutinesStart, numGoRoutinesPostServer, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect,
  418. numGoRoutinesPostAlice, numGoRotinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostServerShutdown, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown, numGoRoutinesPostACN)
  419. if numGoRoutinesStart != numGoRoutinesPostACN {
  420. t.Errorf("Number of GoRoutines at start (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, leak detected!", numGoRoutinesStart, numGoRoutinesPostACN)
  421. }
  422. }