From 9bf0fdec56d0eeb0c6c44276eb27857b2e0a4cfe Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Tue, 5 Jun 2018 15:06:38 -0700 Subject: [PATCH] adding fetch checks to integration test with late joining user carol. protocol drop 'groupMessages' field peer/sever fetch channel reworked to send series of groupMessage rather than groupMessages blob in order to not blow up a ricochet packet minor doc fixes change a few fmt to log fix class variable name (cplc -> cpsc) in cwtchpeer listen channel PeerPeerConnection.Close close Conn rather than break to catch goroutine leak --- peer/connections/peerpeerconnection.go | 3 +- peer/cwtch_peer.go | 7 +- peer/fetch/peer_fetch_channel.go | 29 ++- peer/fetch/peer_fetch_channel_test.go | 7 +- peer/send/peer_send_channel.go | 42 ++-- protocol/group_message.pb.go | 8 - protocol/group_message.proto | 1 - server/fetch/server_fetch_channel.go | 15 +- server/server_instance.go | 2 +- .../cwtch_peer_server_intergration_test.go | 232 ++++++++++++------ 10 files changed, 218 insertions(+), 128 deletions(-) diff --git a/peer/connections/peerpeerconnection.go b/peer/connections/peerpeerconnection.go index 498bfb4..ecf12dd 100644 --- a/peer/connections/peerpeerconnection.go +++ b/peer/connections/peerpeerconnection.go @@ -107,7 +107,6 @@ func (ppc *PeerPeerConnection) Run() error { // Close closes the connection func (ppc *PeerPeerConnection) Close() { ppc.state = KILLED - ppc.connection.Break() - // TODO We should kill the connection outright, but we need to add that to libricochet-go + ppc.connection.Conn.Close() } diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 8ef46e6..60b616b 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -8,7 +8,6 @@ import ( "cwtch.im/cwtch/protocol" "encoding/json" "errors" - "fmt" "github.com/s-rah/go-ricochet/application" "github.com/s-rah/go-ricochet/channels" "github.com/s-rah/go-ricochet/connection" @@ -86,7 +85,7 @@ func (cp *CwtchPeer) InviteOnionToGroup(onion string, groupid string) error { group := cp.Profile.GetGroupByGroupID(groupid) if group != nil { - fmt.Printf("Constructing invite for group: %v\n", group) + log.Printf("Constructing invite for group: %v\n", group) invite, err := group.Invite() if err != nil { return err @@ -96,7 +95,7 @@ func (cp *CwtchPeer) InviteOnionToGroup(onion string, groupid string) error { return errors.New("peer connection not setup for onion. peers must be trusted before sending") } if ppc.GetState() == connections.AUTHENTICATED { - fmt.Printf("Got connection for group: %v - Sending Invite\n", ppc) + log.Printf("Got connection for group: %v - Sending Invite\n", ppc) ppc.SendGroupInvite(invite) } else { return errors.New("cannot send invite to onion: peer connection is not ready") @@ -120,7 +119,7 @@ func (cp *CwtchPeer) JoinServer(onion string) { func (cp *CwtchPeer) SendMessageToGroup(groupid string, message string) error { group := cp.Profile.GetGroupByGroupID(groupid) if group == nil { - return errors.New("group does not exit") + return errors.New("group does not exist") } psc := cp.connectionsManager.GetPeerServerConnectionForOnion(group.GroupServer) if psc == nil { diff --git a/peer/fetch/peer_fetch_channel.go b/peer/fetch/peer_fetch_channel.go index 3c70a3a..2b934be 100644 --- a/peer/fetch/peer_fetch_channel.go +++ b/peer/fetch/peer_fetch_channel.go @@ -74,13 +74,18 @@ func (cpfc *CwtchPeerFetchChannel) OpenOutboundResult(err error, crm *Protocol_D } // FetchRequest sends a FetchMessage to the Server. -func (cpfc *CwtchPeerFetchChannel) FetchRequest() { - fm := &protocol.FetchMessage{} - csp := &protocol.CwtchServerPacket{ - FetchMessage: fm, +func (cpfc *CwtchPeerFetchChannel) FetchRequest() error { + if cpfc.channel.Pending == false { + fm := &protocol.FetchMessage{} + csp := &protocol.CwtchServerPacket{ + FetchMessage: fm, + } + packet, _ := proto.Marshal(csp) + cpfc.channel.SendMessage(packet) + } else { + return errors.New("channel isn't set up yet") } - packet, _ := proto.Marshal(csp) - cpfc.channel.SendMessage(packet) + return nil } // Packet is called for each raw packet received on this channel. @@ -88,13 +93,11 @@ func (cpfc *CwtchPeerFetchChannel) Packet(data []byte) { csp := &protocol.CwtchServerPacket{} err := proto.Unmarshal(data, csp) if err == nil { - if csp.GetGroupMessages() != nil { - gms := csp.GetGroupMessages() - for _, gm := range gms { - cpfc.Handler.HandleGroupMessage(gm) - } + if csp.GetGroupMessage() != nil { + gm := csp.GetGroupMessage() + // We create a new go routine here to avoid leaking any information about processing time + // TODO Server can probably try to use this to DoS a peer + go cpfc.Handler.HandleGroupMessage(gm) } } - // After a fetch we close the channel. - cpfc.channel.CloseChannel() } diff --git a/peer/fetch/peer_fetch_channel_test.go b/peer/fetch/peer_fetch_channel_test.go index c2a1442..0702053 100644 --- a/peer/fetch/peer_fetch_channel_test.go +++ b/peer/fetch/peer_fetch_channel_test.go @@ -6,6 +6,7 @@ import ( "github.com/s-rah/go-ricochet/channels" "github.com/s-rah/go-ricochet/wire/control" "testing" + "time" ) type TestHandler struct { @@ -72,14 +73,16 @@ func TestPeerFetchChannel(t *testing.T) { } csp := &protocol.CwtchServerPacket{ - GroupMessages: []*protocol.GroupMessage{ - {Ciphertext: []byte("hello"), Spamguard: []byte{}}, + GroupMessage: &protocol.GroupMessage{ + Ciphertext: []byte("hello"), Spamguard: []byte{}, }, } packet, _ := proto.Marshal(csp) pfc.Packet(packet) + time.Sleep(time.Second * 2) + if th.Received != true { t.Errorf("group message should not have been received") } diff --git a/peer/send/peer_send_channel.go b/peer/send/peer_send_channel.go index e0c73eb..538722b 100644 --- a/peer/send/peer_send_channel.go +++ b/peer/send/peer_send_channel.go @@ -18,71 +18,71 @@ type CwtchPeerSendChannel struct { } // Type returns the type string for this channel, e.g. "im.ricochet.server.send". -func (cplc *CwtchPeerSendChannel) Type() string { +func (cpsc *CwtchPeerSendChannel) Type() string { return "im.cwtch.server.send" } // Closed is called when the channel is closed for any reason. -func (cplc *CwtchPeerSendChannel) Closed(err error) { +func (cpsc *CwtchPeerSendChannel) Closed(err error) { } // OnlyClientCanOpen - for Cwtch server channels only peers may open. -func (cplc *CwtchPeerSendChannel) OnlyClientCanOpen() bool { +func (cpsc *CwtchPeerSendChannel) OnlyClientCanOpen() bool { return true } // Singleton - for Cwtch channels there can only be one instance per direction -func (cplc *CwtchPeerSendChannel) Singleton() bool { +func (cpsc *CwtchPeerSendChannel) Singleton() bool { return true } // Bidirectional - for Cwtch channels are not bidrectional -func (cplc *CwtchPeerSendChannel) Bidirectional() bool { +func (cpsc *CwtchPeerSendChannel) Bidirectional() bool { return false } // RequiresAuthentication - Cwtch channels require no auth -func (cplc *CwtchPeerSendChannel) RequiresAuthentication() string { +func (cpsc *CwtchPeerSendChannel) RequiresAuthentication() string { return "none" } // OpenInbound should never be called on peers. -func (cplc *CwtchPeerSendChannel) OpenInbound(channel *channels.Channel, raw *Protocol_Data_Control.OpenChannel) ([]byte, error) { +func (cpsc *CwtchPeerSendChannel) OpenInbound(channel *channels.Channel, raw *Protocol_Data_Control.OpenChannel) ([]byte, error) { return nil, errors.New("client does not receive inbound listen channels") } // OpenOutbound is used to set up a new send channel and initialize spamguard -func (cplc *CwtchPeerSendChannel) OpenOutbound(channel *channels.Channel) ([]byte, error) { - cplc.spamGuard.Difficulty = 2 - cplc.channel = channel +func (cpsc *CwtchPeerSendChannel) OpenOutbound(channel *channels.Channel) ([]byte, error) { + cpsc.spamGuard.Difficulty = 2 + cpsc.channel = channel messageBuilder := new(utils.MessageBuilder) - return messageBuilder.OpenChannel(channel.ID, cplc.Type()), nil + return messageBuilder.OpenChannel(channel.ID, cpsc.Type()), nil } // OpenOutboundResult confirms the open channel request and sets the spamguard challenge -func (cplc *CwtchPeerSendChannel) OpenOutboundResult(err error, crm *Protocol_Data_Control.ChannelResult) { +func (cpsc *CwtchPeerSendChannel) OpenOutboundResult(err error, crm *Protocol_Data_Control.ChannelResult) { if err == nil { if crm.GetOpened() { - cplc.channel.Pending = false + cpsc.channel.Pending = false ce, _ := proto.GetExtension(crm, protocol.E_ServerNonce) - cplc.challenge = ce.([]byte)[:] + cpsc.challenge = ce.([]byte)[:] } } } // SendGroupMessage performs the spamguard proof of work and sends a message. -func (cplc *CwtchPeerSendChannel) SendGroupMessage(gm *protocol.GroupMessage) error { - if cplc.channel.Pending == false { - sgsolve := cplc.spamGuard.SolveChallenge(cplc.challenge, gm.GetCiphertext()) +func (cpsc *CwtchPeerSendChannel) SendGroupMessage(gm *protocol.GroupMessage) error { + if cpsc.channel.Pending == false { + sgsolve := cpsc.spamGuard.SolveChallenge(cpsc.challenge, gm.GetCiphertext()) gm.Spamguard = sgsolve[:] csp := &protocol.CwtchServerPacket{ GroupMessage: gm, } packet, _ := proto.Marshal(csp) - cplc.channel.SendMessage(packet) - cplc.channel.CloseChannel() + cpsc.channel.SendMessage(packet) + cpsc.channel.CloseChannel() } else { return errors.New("channel isn't set up yet") } @@ -90,7 +90,7 @@ func (cplc *CwtchPeerSendChannel) SendGroupMessage(gm *protocol.GroupMessage) er } // Packet should never be -func (cplc *CwtchPeerSendChannel) Packet(data []byte) { +func (cpsc *CwtchPeerSendChannel) Packet(data []byte) { // If we receive a packet on this channel, close the connection - cplc.channel.CloseChannel() + cpsc.channel.CloseChannel() } diff --git a/protocol/group_message.pb.go b/protocol/group_message.pb.go index f1fe74d..596200c 100644 --- a/protocol/group_message.pb.go +++ b/protocol/group_message.pb.go @@ -16,7 +16,6 @@ var _ = math.Inf type CwtchServerPacket struct { GroupMessage *GroupMessage `protobuf:"bytes,1,opt,name=group_message,json=groupMessage" json:"group_message,omitempty"` FetchMessage *FetchMessage `protobuf:"bytes,2,opt,name=fetch_message,json=fetchMessage" json:"fetch_message,omitempty"` - GroupMessages []*GroupMessage `protobuf:"bytes,3,rep,name=group_messages,json=groupMessages" json:"group_messages,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -39,13 +38,6 @@ func (m *CwtchServerPacket) GetFetchMessage() *FetchMessage { return nil } -func (m *CwtchServerPacket) GetGroupMessages() []*GroupMessage { - if m != nil { - return m.GroupMessages - } - return nil -} - type FetchMessage struct { XXX_unrecognized []byte `json:"-"` } diff --git a/protocol/group_message.proto b/protocol/group_message.proto index df6110d..9d3e263 100644 --- a/protocol/group_message.proto +++ b/protocol/group_message.proto @@ -6,7 +6,6 @@ import "ControlChannel.proto"; message CwtchServerPacket { optional GroupMessage group_message = 1; optional FetchMessage fetch_message = 2; - repeated GroupMessage group_messages = 3; } extend protocol.ChannelResult { diff --git a/server/fetch/server_fetch_channel.go b/server/fetch/server_fetch_channel.go index d3b41ef..1e4bf26 100644 --- a/server/fetch/server_fetch_channel.go +++ b/server/fetch/server_fetch_channel.go @@ -75,12 +75,15 @@ func (cc *CwtchServerFetchChannel) OpenOutboundResult(err error, crm *Protocol_D // NOTE: Should never be called } -// SendGroupMessages sends a batch of group messages to the client. -func (cc *CwtchServerFetchChannel) SendGroupMessages(gm []*protocol.GroupMessage) { - csp := &protocol.CwtchServerPacket{} - csp.GroupMessages = gm - packet, _ := proto.Marshal(csp) - cc.channel.SendMessage(packet) +// SendGroupMessages sends a series of group messages to the client. +func (cc *CwtchServerFetchChannel) SendGroupMessages(gms []*protocol.GroupMessage) { + for _, gm := range gms { + csp := &protocol.CwtchServerPacket{ + GroupMessage: gm, + } + packet, _ := proto.Marshal(csp) + cc.channel.SendMessage(packet) + } } // Packet is called for each raw packet received on this channel. diff --git a/server/server_instance.go b/server/server_instance.go index bde9645..07d0e33 100644 --- a/server/server_instance.go +++ b/server/server_instance.go @@ -27,7 +27,7 @@ func (si *Instance) HandleFetchRequest() []*protocol.GroupMessage { return si.msi.FetchMessages() } -// HandleGroupMessage takes ina group message and distributes it to all listening peers +// HandleGroupMessage takes in a group message and distributes it to all listening peers func (si *Instance) HandleGroupMessage(gm *protocol.GroupMessage) { si.msi.AddMessage(*gm) go si.ra.Broadcast(func(rai *application.ApplicationInstance) { diff --git a/testing/cwtch_peer_server_intergration_test.go b/testing/cwtch_peer_server_intergration_test.go index f9df6f0..ce5dcd3 100644 --- a/testing/cwtch_peer_server_intergration_test.go +++ b/testing/cwtch_peer_server_intergration_test.go @@ -7,47 +7,57 @@ import ( "fmt" "github.com/s-rah/go-ricochet" "github.com/s-rah/go-ricochet/utils" - "io/ioutil" - "log" "os" "runtime" "testing" "time" + "crypto/rsa" + "io/ioutil" + "log" ) const ( - keyfile = "./private_key" + serverKeyfile = "./../server/app/private_key" + localKeyfile = "./private_key" ) var ( - aliceLines = []string{"Hello", "My name is Alice", "bye"} - bobLines = []string{"Hi", "My name is Bob.", "toodles", "hello?"} + aliceLines = []string{"Hello, I'm Alice", "bye"} + bobLines = []string{"Hi, my name is Bob.", "toodles", "welcome"} + carolLines = []string{"Howdy, thanks!"} ) -func checkAndGenPrivateKey(privateKeyFile string) (generated bool) { - if _, err := os.Stat(privateKeyFile); os.IsNotExist(err) { +func loadOrGenPrivateKey(t *testing.T) (pk *rsa.PrivateKey, generated bool) { + if _, err := os.Stat(serverKeyfile); os.IsNotExist(err) { fmt.Println("generating new private key...") pk, err := utils.GeneratePrivateKey() if err != nil { - log.Fatalf("error generating new private key: %v\n", err) + t.Fatalf("error generating new private key: %v\n", err) } - err = ioutil.WriteFile(privateKeyFile, []byte(utils.PrivateKeyToString(pk)), 0400) + err = ioutil.WriteFile(localKeyfile, []byte(utils.PrivateKeyToString(pk)), 0400) if err != nil { - log.Fatalf("error writing new private key to file %s: %v\n", privateKeyFile, err) + log.Fatalf("error writing new private key to file %s: %v\n", localKeyfile, err) } - return true + + return pk,true } - return false + fmt.Println("Found server key " + serverKeyfile + ", loading...") + pk, err := utils.LoadPrivateKeyFromFile(serverKeyfile) + if err != nil { + t.Fatalf("Could not load server's key from %v", serverKeyfile) + } + return pk, false } -func printAndVerifyTimeline(t *testing.T, timeline []model.Message) error { +func printAndCountVerifedTimeline(t *testing.T, timeline []model.Message) int { + numVerified := 0 for _, message := range timeline { fmt.Printf("%v %v> %s [%t]\n", message.Timestamp, message.PeerID, message.Message, message.Verified) - if !message.Verified { - t.Errorf("Message '%s' from '%s' not verified!", message.Message, message.PeerID) + if message.Verified { + numVerified++ } } - return nil + return numVerified } func serverCheck(serverAddr string) bool { @@ -68,12 +78,7 @@ func TestCwtchPeerIntegration(t *testing.T) { // ***** Cwtch Server managment ***** var server *cwtchserver.Server = nil - generatedKey := checkAndGenPrivateKey(keyfile) - - serverKey, err := utils.LoadPrivateKeyFromFile(keyfile) - if err != nil { - t.Errorf("Could not load server's key from %v", keyfile) - } + serverKey, generatedKey := loadOrGenPrivateKey(t) serverAddr, _ := utils.GetOnionAddress(serverKey) serverOnline := false @@ -86,7 +91,8 @@ func TestCwtchPeerIntegration(t *testing.T) { // launch app server = new(cwtchserver.Server) fmt.Printf("No server found\nStarting cwtch server...\n") - go server.Run(keyfile) + + go server.Run(localKeyfile) // let tor get established fmt.Printf("Establishing Tor hidden service: %v...\n", serverAddr) @@ -109,24 +115,39 @@ func TestCwtchPeerIntegration(t *testing.T) { go bob.Listen() fmt.Println("Bob created:", bob.Profile.Onion) - fmt.Println("Waiting for alice and bob to connection with onion network...") - time.Sleep(time.Second * 60) + fmt.Println("Creating Carol...") + carol := peer.NewCwtchPeer("Carol") + go carol.Listen() + fmt.Println("Carol created:", carol.Profile.Onion) + + fmt.Println("Waiting for Alice, Bob, and Carol to connection with onion network...") + time.Sleep(time.Second * 70) numGoRoutinesPostPeerStart := runtime.NumGoroutine() - // ***** Peering and group creation / invite ***** + // ***** Peering, server joining, group creation / invite ***** fmt.Println("Creating group on ", serverAddr, "...") groupId, _, err := alice.Profile.StartGroup(serverAddr) fmt.Printf("Created group: %v!\n", groupId) if err != nil { - t.Errorf("Failed to start group: %v", err) + t.Errorf("Failed to init group: %v", err) return } - fmt.Println("Peering Alice peering with bob...") + fmt.Println("Alice peering with Bob...") alice.PeerWithOnion(bob.Profile.Onion) + fmt.Println("Alice peering with Carol...") + alice.PeerWithOnion(carol.Profile.Onion) - time.Sleep(time.Second * 15) + fmt.Println("Alice joining server...") + alice.JoinServer(serverAddr) + + fmt.Println("Bob joining server...") + bob.JoinServer(serverAddr) + + fmt.Println("Waiting for peerings and server joins...") + + time.Sleep(time.Second * 60) fmt.Println("Alice inviting Bob to group...") err = alice.InviteOnionToGroup(bob.Profile.Onion, groupId) @@ -144,19 +165,36 @@ func TestCwtchPeerIntegration(t *testing.T) { bob.AcceptInvite(group.GroupID) } } - time.Sleep(time.Second * 3) + time.Sleep(time.Second * 5) - numGoRoutinesPostPeer := runtime.NumGoroutine() + numGoRoutinesPostServerConnect := runtime.NumGoroutine() - fmt.Println("Alice joining server...") - alice.JoinServer(serverAddr) + // ***** Fill up message history of server ****** - fmt.Println("Bob joining server...") - bob.JoinServer(serverAddr) + /* + // filler group will be used to fill up the servers message history a bit to stress test fetch later for carol + fillerGroupId, _, err := alice.Profile.StartGroup(serverAddr) + if err != nil { + t.Errorf("Failed to init filler group: %v", err) + return + } - // Wait for them to join the server - time.Sleep(time.Second * 40) - numGouRoutinesPostServerConnect := runtime.NumGoroutine() + fmt.Println("Alice filling message history of server...") + for i := 0; i < 100; i++ { + + go func (x int) { + time.Sleep(time.Second * time.Duration(x)) + err := alice.SendMessageToGroup(fillerGroupId, aliceLines[0]) + if err != nil { + fmt.Println("SEND", x, "ERROR:", err) + } else { + fmt.Println("SEND", x, " SUCCESS!") + } + }(i) + } + + time.Sleep(time.Second * 110) + */ // ***** Conversation ***** @@ -176,23 +214,47 @@ func TestCwtchPeerIntegration(t *testing.T) { } time.Sleep(time.Second * 10) - // "instant" - could be either order? fmt.Println("Alice> ", aliceLines[1]) alice.SendMessageToGroup(groupId, aliceLines[1]) + time.Sleep(time.Second * 10) + fmt.Println("Bob> ", bobLines[1]) bob.SendMessageToGroup(groupId, bobLines[1]) time.Sleep(time.Second * 10) - fmt.Println("Alice> ", aliceLines[2]) - alice.SendMessageToGroup(groupId, aliceLines[2]) - // Todo: Alice disconnects + fmt.Println("Alice inviting Carol to group...") + err = alice.InviteOnionToGroup(carol.Profile.Onion, groupId) + if err != nil { + t.Fatalf("Error for Alice inviting Carol to group: %v", err) + } time.Sleep(time.Second * 10) + fmt.Println("Carol examining groups and accepting invites...") + for _, group := range carol.Profile.Groups { + fmt.Printf("Carol group: %v (Accepted: %v)\n", group.GroupID, group.Accepted) + if group.Accepted == false { + fmt.Printf("Carol received and accepting group invite: %v\n", group.GroupID) + carol.AcceptInvite(group.GroupID) + } + } + + fmt.Println("Shutting down Alice...") + alice.Shutdown() + time.Sleep(time.Second * 5) + numGoRoutinesPostAlice := runtime.NumGoroutine() + + fmt.Println("Carol joining server...") + carol.JoinServer(serverAddr) + + time.Sleep(time.Second * 60) + numGoRotinesPostCarolConnect := runtime.NumGoroutine() fmt.Println("Bob> ", bobLines[2]) bob.SendMessageToGroup(groupId, bobLines[2]) time.Sleep(time.Second * 10) - // Todo: Alice reconects, gets missed messages (from bob) + fmt.Println("Carol> ", carolLines[0]) + carol.SendMessageToGroup(groupId, carolLines[0]) + time.Sleep(time.Second * 10) // ***** Verify Test ***** @@ -200,43 +262,71 @@ func TestCwtchPeerIntegration(t *testing.T) { time.Sleep(time.Second * 15) alicesGroup := alice.Profile.GetGroupByGroupID(groupId) - fmt.Printf("alice Groups:\n") - for k := range alice.Profile.Groups { - fmt.Println(" " + k) - } if alicesGroup == nil { t.Error("aliceGroup == nil") return } - fmt.Printf("Alice TimeLine:\n") - printAndVerifyTimeline(t, alicesGroup.GetTimeline()) + fmt.Printf("Alice's TimeLine:\n") + aliceVerified := printAndCountVerifedTimeline(t, alicesGroup.GetTimeline()) + if aliceVerified != 4 { + t.Errorf("Alice did not have 4 verified messages") + } bobsGroup := bob.Profile.GetGroupByGroupID(groupId) - fmt.Printf("bob Groups:\n") - for k := range bob.Profile.Groups { - fmt.Println(" " + k) - } if bobsGroup == nil { t.Error("bobGroup == nil") return } - fmt.Printf("Bob TimeLine:\n") - printAndVerifyTimeline(t, bobsGroup.GetTimeline()) + fmt.Printf("Bob's TimeLine:\n") + bobVerified := printAndCountVerifedTimeline(t, bobsGroup.GetTimeline()) + if bobVerified != 5 { + t.Errorf("Bob did not have 5 verified messages") + } - if len(alicesGroup.Timeline.Messages) != 6 { + carolsGroup := carol.Profile.GetGroupByGroupID(groupId) + fmt.Printf("Carol's TimeLine:\n") + carolVerified := printAndCountVerifedTimeline(t, carolsGroup.GetTimeline()) + if carolVerified != 3 { + t.Errorf("Carol did not have 3 verified messages") + } + + if len(alicesGroup.GetTimeline()) != 4 { t.Errorf("Alice's timeline does not have all messages") return } - // check message 0,1 and 4,5 for content (2,3 could be out of order) - aliceGroupTimeline := alicesGroup.GetTimeline() - if aliceGroupTimeline[0].Message != aliceLines[0] || aliceGroupTimeline[1].Message != bobLines[0] || - aliceGroupTimeline[4].Message != aliceLines[2] || aliceGroupTimeline[5].Message != bobLines[2] { - t.Errorf("Some of the messages did not have the expected content!") + if len(bobsGroup.GetTimeline()) != 6 { + t.Errorf("Bob's timeline does not have all messages") + } + + // check message 0,1,2,3 + aliceGroupTimeline := alicesGroup.GetTimeline() + if aliceGroupTimeline[0].Message != aliceLines[0] || aliceGroupTimeline[1].Message != bobLines[0] || + aliceGroupTimeline[2].Message != aliceLines[1] || aliceGroupTimeline[3].Message != bobLines[1] { + t.Errorf("Some of Alice's timeline messages did not have the expected content!") + } + + // check message 0,1,2,3,4,5 + bobGroupTimeline := bobsGroup.GetTimeline() + if bobGroupTimeline[0].Message != aliceLines[0] || bobGroupTimeline[1].Message != bobLines[0] || + bobGroupTimeline[2].Message != aliceLines[1] || bobGroupTimeline[3].Message != bobLines[1] || + bobGroupTimeline[4].Message != bobLines[2] || bobGroupTimeline[5].Message != carolLines[0] { + t.Errorf("Some of Bob's timeline messages did not have the expected content!") + } + + if len(carolsGroup.GetTimeline()) != 6 { + t.Errorf("Carol's timeline does not have all messages") + } + + // check message 0,1,2,3,4,5 + carolGroupTimeline := carolsGroup.GetTimeline() + if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] || + carolGroupTimeline[2].Message != aliceLines[1] || carolGroupTimeline[3].Message != bobLines[1] || + carolGroupTimeline[4].Message != bobLines[2] || carolGroupTimeline[5].Message != carolLines[0] { + t.Errorf("Some of Carol's timeline messages did not have the expected content!") } - // Todo: shutdown users and server fmt.Println("Shutting down Bob...") bob.Shutdown() time.Sleep(time.Second * 3) @@ -247,16 +337,18 @@ func TestCwtchPeerIntegration(t *testing.T) { time.Sleep(time.Second * 3) } numGoRoutinesPostServerShutdown := runtime.NumGoroutine() - fmt.Println("Shutting down Alice...") - alice.Shutdown() + + fmt.Println("Shuttind down Carol...") + carol.Shutdown() time.Sleep(time.Second * 3) - numGoRoutinesPostAlice := runtime.NumGoroutine() + numGoRoutinesPostCarol := runtime.NumGoroutine() - fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostServer: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeer: %v\nnumGouRoutinesPostServerConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostServerShutdown: %v\nnumGoRoutinesPostAlice: %v\n", - numGoRoutinesStart, numGoRoutinesPostServer, numGoRoutinesPostPeerStart, numGoRoutinesPostPeer, numGouRoutinesPostServerConnect, - numGoRoutinesPostBob, numGoRoutinesPostServerShutdown, numGoRoutinesPostAlice) + fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostServer: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n" + + "numGoRoutinesPostAlice: %v\nnumGoRotinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostServerShutdown: %v\nnumGoRoutinesPostCarol: %v\n", + numGoRoutinesStart, numGoRoutinesPostServer, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect, + numGoRoutinesPostAlice, numGoRotinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostServerShutdown, numGoRoutinesPostCarol) - if numGoRoutinesPostServer != numGoRoutinesPostAlice { - t.Errorf("Number of GoRoutines once server checks were completed (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, leak detected!", numGoRoutinesPostServer, numGoRoutinesPostAlice) + if numGoRoutinesPostServer != numGoRoutinesPostCarol { + t.Errorf("Number of GoRoutines once server checks were completed (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, leak detected!", numGoRoutinesPostServer, numGoRoutinesPostCarol) } }