diff --git a/peer/connections/peerpeerconnection_test.go b/peer/connections/peerpeerconnection_test.go index c03226b..73ede7d 100644 --- a/peer/connections/peerpeerconnection_test.go +++ b/peer/connections/peerpeerconnection_test.go @@ -85,7 +85,7 @@ func TestPeerPeerConnection(t *testing.T) { t.Errorf("Onion address error %v", err) } - profile := model.GenerateNewProfile("sarah") + profile := model.GenerateNewProfile("alice") ppc := NewPeerPeerConnection("127.0.0.1:5452|"+onionAddr, profile) //numcalls := 0 tp := new(TestPeer) diff --git a/server/app/serverConfig.json.sample b/server/app/serverConfig.json.sample index 985da45..a6b2d13 100644 --- a/server/app/serverConfig.json.sample +++ b/server/app/serverConfig.json.sample @@ -1,7 +1,7 @@ { - "maxBufferLines": 100000 - serverReporting: { - reportingGroupId: "" - reportingServerAddr: "" + "maxBufferLines": 100000, + "serverReporting": { + "reportingGroupId": "", + "reportingServerAddr": "" } } diff --git a/server/metrics/metrics.go b/server/metrics/metrics.go index 13a53e9..779dcb8 100644 --- a/server/metrics/metrics.go +++ b/server/metrics/metrics.go @@ -1,6 +1,9 @@ package metrics import ( + "bufio" + "fmt" + "strings" "sync" "sync/atomic" "time" @@ -13,9 +16,10 @@ type counter struct { // Counter providers a threadsafe counter to use for storing long running counts type Counter interface { - Add(unit uint64) + Add(unit int) + Reset() - Count() uint64 + Count() int GetStarttime() time.Time } @@ -26,13 +30,18 @@ func NewCounter() Counter { } // Add add a count of unit to the counter -func (c *counter) Add(unit uint64) { - atomic.AddUint64(&c.count, unit) +func (c *counter) Add(unit int) { + atomic.AddUint64(&c.count, uint64(unit)) } // Count returns the count since Start -func (c *counter) Count() uint64 { - return atomic.LoadUint64(&c.count) +func (c *counter) Count() int { + return int(atomic.LoadUint64(&c.count)) +} + +func (c *counter) Reset() { + atomic.StoreUint64(&c.count, 0) + c.startTime = time.Now() } 
// GetStarttime returns the starttime of the counter @@ -40,15 +49,33 @@ func (c *counter) GetStarttime() time.Time { return c.startTime } -type monitorHistory struct { - starttime time.Time - perMinutePerHour [60]int - perHourForDay [24]int - perDayForWeek [7]int - perWeekForMonth [4]int - perMonthForYear [12]int +// MonitorType controls how the monitor will report itself +type MonitorType int - monitor func() int +const ( + // Count indicates the monitor should report in integer format + Count MonitorType = iota + // Percent indicates the monitor should report in decimal format with 2 places + Percent + // MegaBytes indicates the monitor should transform the raw number into MBs + MegaBytes +) + +type monitorHistory struct { + monitorType MonitorType + + starttime time.Time + perMinutePerHour [60]float64 + timeLastHourRotate time.Time + perHourForDay [24]float64 + timeLastDayRotate time.Time + perDayForWeek [7]float64 + timeLastWeekRotate time.Time + perWeekForMonth [4]float64 + timeLastMonthRotate time.Time + perMonthForYear [12]float64 + + monitor func() float64 breakChannel chan bool lock sync.Mutex @@ -59,16 +86,18 @@ type MonitorHistory interface { Start() Stop() - Minutes() []int - Hours() []int - Days() []int - Weeks() []int - Months() []int + Minutes() []float64 + Hours() []float64 + Days() []float64 + Weeks() []float64 + Months() []float64 + + Report(w *bufio.Writer) } // NewMonitorHistory returns a new MonitorHistory with starttime of time.Now and Started running with supplied monitor -func NewMonitorHistory(monitor func() int) MonitorHistory { - mh := &monitorHistory{starttime: time.Now(), monitor: monitor, breakChannel: make(chan bool)} +func NewMonitorHistory(t MonitorType, monitor func() float64) MonitorHistory { + mh := &monitorHistory{monitorType: t, starttime: time.Now(), monitor: monitor, breakChannel: make(chan bool)} mh.Start() return mh } @@ -84,32 +113,56 @@ func (mh *monitorHistory) Stop() { } // Minutes returns the last 60 minute
monitoring results -func (mh *monitorHistory) Minutes() []int { +func (mh *monitorHistory) Minutes() []float64 { return mh.returnCopy(mh.perMinutePerHour[:]) } // Hours returns the last 24 hourly averages of monitor results -func (mh *monitorHistory) Hours() []int { +func (mh *monitorHistory) Hours() []float64 { return mh.returnCopy(mh.perHourForDay[:]) } // Days returns the last 7 day averages of monitor results -func (mh *monitorHistory) Days() []int { +func (mh *monitorHistory) Days() []float64 { return mh.returnCopy(mh.perDayForWeek[:]) } // Weeks returns the last 4 weeks of averages of monitor results -func (mh *monitorHistory) Weeks() []int { +func (mh *monitorHistory) Weeks() []float64 { return mh.returnCopy(mh.perWeekForMonth[:]) } // Months returns the last 12 months of averages of monitor results -func (mh *monitorHistory) Months() []int { +func (mh *monitorHistory) Months() []float64 { return mh.returnCopy(mh.perMonthForYear[:]) } -func (mh *monitorHistory) returnCopy(slice []int) []int { - retSlice := make([]int, len(slice)) +func (mh *monitorHistory) Report(w *bufio.Writer) { + fmt.Fprintln(w, "Minutes:", reportLine(mh.monitorType, mh.perMinutePerHour[:])) + fmt.Fprintln(w, "Hours: ", reportLine(mh.monitorType, mh.perHourForDay[:])) + fmt.Fprintln(w, "Days: ", reportLine(mh.monitorType, mh.perDayForWeek[:])) + fmt.Fprintln(w, "Weeks: ", reportLine(mh.monitorType, mh.perWeekForMonth[:])) + fmt.Fprintln(w, "Months: ", reportLine(mh.monitorType, mh.perMonthForYear[:])) +} + +func reportLine(t MonitorType, array []float64) string { + switch t { + case Count: + return strings.Trim(strings.Join(strings.Fields(fmt.Sprint(array)), " "), "[]") + case Percent: + return strings.Trim(strings.Join(strings.Fields(fmt.Sprintf("%.2f", array)), " "), "[]") + case MegaBytes: + mbs := make([]int, len(array)) + for i, b := range array { + mbs[i] = int(b) / 1024 / 1024 + } + return strings.Trim(strings.Join(strings.Fields(fmt.Sprintf("%d", mbs)), "MBs "), "[]") + "MBs" + } 
+ return "" +} + +func (mh *monitorHistory) returnCopy(slice []float64) []float64 { + retSlice := make([]float64, len(slice)) mh.lock.Lock() for i, v := range slice { retSlice[i] = v @@ -118,15 +171,22 @@ func (mh *monitorHistory) returnCopy(slice []int) []int { return retSlice } -func rotateAndAvg(array []int, newVal int) int { - total := 0 +func rotateAndAvg(array []float64, newVal float64) float64 { + total := float64(0.0) for i := len(array) - 1; i > 0; i-- { array[i] = array[i-1] total += array[i] } array[0] = newVal total += newVal - return total / len(array) + return total / float64(len(array)) +} +func average(array []float64) float64 { + total := float64(0) + for _, x := range array { + total += x + } + return total / float64(len(array)) } // monitorThread is the goroutine in a monitorHistory that does per minute monitoring and rotation @@ -139,10 +199,26 @@ func (mh *monitorHistory) monitorThread() { mh.lock.Lock() minuteAvg := rotateAndAvg(mh.perMinutePerHour[:], mh.monitor()) - hourAvg := rotateAndAvg(mh.perHourForDay[:], minuteAvg) - dayAvg := rotateAndAvg(mh.perDayForWeek[:], hourAvg) - weekAvg := rotateAndAvg(mh.perWeekForMonth[:], dayAvg) - rotateAndAvg(mh.perMonthForYear[:], weekAvg) + + if time.Now().Sub(mh.timeLastHourRotate) > time.Hour { + rotateAndAvg(mh.perHourForDay[:], minuteAvg) + mh.timeLastHourRotate = time.Now() + } + + if time.Now().Sub(mh.timeLastDayRotate) > time.Hour*24 { + rotateAndAvg(mh.perDayForWeek[:], average(mh.perHourForDay[:])) + mh.timeLastDayRotate = time.Now() + } + + if time.Now().Sub(mh.timeLastWeekRotate) > time.Hour*24*7 { + rotateAndAvg(mh.perWeekForMonth[:], average(mh.perDayForWeek[:])) + mh.timeLastWeekRotate = time.Now() + } + + if time.Now().Sub(mh.timeLastMonthRotate) > time.Hour*24*7*4 { + rotateAndAvg(mh.perMonthForYear[:], average(mh.perWeekForMonth[:])) + mh.timeLastMonthRotate = time.Now() + } mh.lock.Unlock() diff --git a/server/metrics/metrics_test.go b/server/metrics/metrics_test.go index 
f6bec06..b838f33 100644 --- a/server/metrics/metrics_test.go +++ b/server/metrics/metrics_test.go @@ -31,36 +31,36 @@ func TestCounter(t *testing.T) { } func TestMonitorHistory(t *testing.T) { - value := 60 * 24 * 7 * 4 * 12 + value := float64(60 * 24 * 7 * 4 * 12) numGoRoutinesStart := runtime.NumGoroutine() - mh := NewMonitorHistory(func() int { return value }) + mh := NewMonitorHistory(Count, func() float64 { return value }) time.Sleep(1 * time.Second) mh.Stop() time.Sleep(1 * time.Second) minutes := mh.Minutes() if minutes[0] != value { - t.Errorf("Monitor minutes's first minutes slot had the wrong value. Expected: %d Actual: %d", value, minutes[0]) + t.Errorf("Monitor minutes's first minutes slot had the wrong value. Expected: %f Actual: %f", value, minutes[0]) } hours := mh.Hours() if hours[0] != value/60 { - t.Errorf("Monitor hour's first hour slot had the wrong value. Expected: %d Actual: %d", value, hours[0]) + t.Errorf("Monitor hour's first hour slot had the wrong value. Expected: %f Actual: %f", value, hours[0]) } days := mh.Days() if days[0] != value/60/24 { - t.Errorf("Monitor day's first day slot had the wrong value. Expected: %d Actual: %d", value/24, days[0]) + t.Errorf("Monitor day's first day slot had the wrong value. Expected: %f Actual: %f", value/24, days[0]) } weeks := mh.Weeks() if weeks[0] != value/60/24/7 { - t.Errorf("Monitor week's first day slot had the wrong value. Expected: %d Actual: %d", value/24/7, weeks[0]) + t.Errorf("Monitor week's first day slot had the wrong value. Expected: %f Actual: %f", value/24/7, weeks[0]) } months := mh.Months() if months[0] != 12 { - t.Errorf("Monitor months's first day slot had the wrong value. Expected: %d Actual: %d", 12, months[0]) + t.Errorf("Monitor months's first day slot had the wrong value. 
Expected: %d Actual: %f", 12, months[0]) } if numGoRoutinesStart != runtime.NumGoroutine() { diff --git a/server/metrics/monitors.go b/server/metrics/monitors.go new file mode 100644 index 0000000..14fbc8d --- /dev/null +++ b/server/metrics/monitors.go @@ -0,0 +1,93 @@ +package metrics + +import ( + "bufio" + "fmt" + "git.openprivacy.ca/openprivacy/libricochet-go/application" + "github.com/struCoder/pidusage" + "log" + "os" + "time" +) + +const ( + reportFile = "serverMonitorReport.txt" +) + +// Monitors is a package of metrics for a Cwtch Server including message count, CPU, Mem, and conns +type Monitors struct { + MessageCounter Counter + Messages MonitorHistory + CPU MonitorHistory + Memory MonitorHistory + ClientConns MonitorHistory + starttime time.Time + breakChannel chan bool +} + +// Start initializes a Monitors's monitors +func (mp *Monitors) Start(ra *application.RicochetApplication) { + mp.starttime = time.Now() + mp.breakChannel = make(chan bool) + mp.MessageCounter = NewCounter() + mp.Messages = NewMonitorHistory(Count, func() (c float64) { c = float64(mp.MessageCounter.Count()); mp.MessageCounter.Reset(); return }) + mp.CPU = NewMonitorHistory(Percent, func() float64 { sysInfo, _ := pidusage.GetStat(os.Getpid()); return float64(sysInfo.CPU) }) + mp.Memory = NewMonitorHistory(MegaBytes, func() float64 { sysInfo, _ := pidusage.GetStat(os.Getpid()); return float64(sysInfo.Memory) }) + mp.ClientConns = NewMonitorHistory(Count, func() float64 { return float64(ra.ConnectionCount()) }) + + // Todo: replace with proper reporting + go mp.log() +} + +func (mp *Monitors) log() { + for { + select { + case <-time.After(time.Minute): + messageMinutes := mp.Messages.Minutes() + cpuMinutes := mp.CPU.Minutes() + memoryMinutes := mp.Memory.Minutes() + listenConnsMinutes := mp.ClientConns.Minutes() + log.Printf("METRICS: Messages: %.0f ClientConns: %.0f CPU: %.2f Memory: %.0fMBs\n", messageMinutes[0], listenConnsMinutes[0], cpuMinutes[0], memoryMinutes[0]/1024/1024) + 
mp.report() + case <-mp.breakChannel: + return + } + } +} + +func (mp *Monitors) report() { + f, err := os.Create(reportFile) + if err != nil { + log.Println("ERROR: Could not open monitor reporting file: ", err) + return + } + defer f.Close() + + w := bufio.NewWriter(f) + + fmt.Fprintf(w, "Started: %v\n\n", mp.starttime) + + fmt.Fprintln(w, "Messages:") + mp.Messages.Report(w) + + fmt.Fprintln(w, "\nClient Connections:") + mp.ClientConns.Report(w) + + fmt.Fprintln(w, "\nCPU:") + mp.CPU.Report(w) + + fmt.Fprintln(w, "\nMemory:") + mp.Memory.Report(w) + + w.Flush() + +} + +// Stop stops all the monitors in a Monitors +func (mp *Monitors) Stop() { + mp.breakChannel <- true + mp.Messages.Stop() + mp.CPU.Stop() + mp.Memory.Stop() + mp.ClientConns.Stop() +} diff --git a/server/server.go b/server/server.go index d79dd02..e10825b 100644 --- a/server/server.go +++ b/server/server.go @@ -3,6 +3,7 @@ package server import ( "cwtch.im/cwtch/server/fetch" "cwtch.im/cwtch/server/listen" + "cwtch.im/cwtch/server/metrics" "cwtch.im/cwtch/server/send" "cwtch.im/cwtch/storage" "git.openprivacy.ca/openprivacy/libricochet-go/application" @@ -12,8 +13,9 @@ import ( // Server encapsulates a complete, compliant Cwtch server. 
type Server struct { - app *application.RicochetApplication - config *Config + app *application.RicochetApplication + config *Config + metricsPack metrics.Monitors } // Run starts a server with the given privateKey @@ -21,6 +23,7 @@ type Server struct { func (s *Server) Run(serverConfig *Config) { s.config = serverConfig cwtchserver := new(application.RicochetApplication) + s.metricsPack.Start(cwtchserver) l, err := application.SetupOnion("127.0.0.1:9051", "tcp4", "", s.config.PrivateKey(), 9878) @@ -31,10 +34,8 @@ func (s *Server) Run(serverConfig *Config) { af := application.ApplicationInstanceFactory{} af.Init() ms := new(storage.MessageStore) - ms.Init("cwtch.messages", s.config.MaxBufferLines) + ms.Init("cwtch.messages", s.config.MaxBufferLines, s.metricsPack.MessageCounter) af.AddHandler("im.cwtch.server.listen", func(rai *application.ApplicationInstance) func() channels.Handler { - si := new(Instance) - si.Init(rai, cwtchserver, ms) return func() channels.Handler { cslc := new(listen.CwtchServerListenChannel) return cslc @@ -70,4 +71,5 @@ func (s *Server) Run(serverConfig *Config) { // Shutdown kills the app closing all connections and freeing all goroutines func (s *Server) Shutdown() { s.app.Shutdown() + s.metricsPack.Stop() } diff --git a/server/server_instance_test.go b/server/server_instance_test.go index 088fdb1..14afb32 100644 --- a/server/server_instance_test.go +++ b/server/server_instance_test.go @@ -2,6 +2,7 @@ package server import ( "cwtch.im/cwtch/protocol" + "cwtch.im/cwtch/server/metrics" "cwtch.im/cwtch/storage" "git.openprivacy.ca/openprivacy/libricochet-go/application" "os" @@ -15,7 +16,7 @@ func TestServerInstance(t *testing.T) { ra := new(application.RicochetApplication) msi := new(storage.MessageStore) os.Remove("ms.test") - msi.Init("ms.test", 5) + msi.Init("ms.test", 5, metrics.NewCounter()) gm := protocol.GroupMessage{ Ciphertext: []byte("Hello this is a fairly average length message that we are writing here."), Spamguard: []byte{}, 
diff --git a/storage/message_store.go b/storage/message_store.go index 7841d8a..41ed151 100644 --- a/storage/message_store.go +++ b/storage/message_store.go @@ -3,6 +3,7 @@ package storage import ( "bufio" "cwtch.im/cwtch/protocol" + "cwtch.im/cwtch/server/metrics" "encoding/json" "fmt" "log" @@ -18,12 +19,13 @@ type MessageStoreInterface interface { // MessageStore is a file-backed implementation of MessageStoreInterface type MessageStore struct { - file *os.File - lock sync.Mutex - messages []*protocol.GroupMessage - bufferSize int - pos int - rotated bool + file *os.File + lock sync.Mutex + messages []*protocol.GroupMessage + messageCounter metrics.Counter + bufferSize int + pos int + rotated bool } // Close closes the message store and underlying resources. @@ -44,7 +46,7 @@ func (ms *MessageStore) updateBuffer(gm *protocol.GroupMessage) { } // Init sets up a MessageStore of size bufferSize backed by filename -func (ms *MessageStore) Init(filename string, bufferSize int) { +func (ms *MessageStore) Init(filename string, bufferSize int, messageCounter metrics.Counter) { f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600) if err != nil { panic(err) @@ -54,6 +56,7 @@ func (ms *MessageStore) Init(filename string, bufferSize int) { ms.bufferSize = bufferSize ms.messages = make([]*protocol.GroupMessage, bufferSize) ms.rotated = false + ms.messageCounter = messageCounter scanner := bufio.NewScanner(f) for scanner.Scan() { @@ -89,6 +92,7 @@ func (ms *MessageStore) FetchMessages() (messages []*protocol.GroupMessage) { // AddMessage adds a GroupMessage to the store func (ms *MessageStore) AddMessage(gm protocol.GroupMessage) { + ms.messageCounter.Add(1) ms.lock.Lock() ms.updateBuffer(&gm) s, err := json.Marshal(gm) diff --git a/storage/message_store_test.go b/storage/message_store_test.go index e6b5407..da10bf9 100644 --- a/storage/message_store_test.go +++ b/storage/message_store_test.go @@ -2,6 +2,7 @@ package storage import ( 
"cwtch.im/cwtch/protocol" + "cwtch.im/cwtch/server/metrics" "os" "strconv" "testing" @@ -10,7 +11,8 @@ import ( func TestMessageStore(t *testing.T) { os.Remove("ms.test") ms := new(MessageStore) - ms.Init("ms.test", 100000) + counter := metrics.NewCounter() + ms.Init("ms.test", 100000, counter) for i := 0; i < 50000; i++ { gm := protocol.GroupMessage{ Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)), @@ -18,13 +20,18 @@ func TestMessageStore(t *testing.T) { } ms.AddMessage(gm) } + if counter.Count() != 50000 { + t.Errorf("Counter should be at 50000 was %v", counter.Count()) + } ms.Close() - ms.Init("ms.test", 100000) + ms.Init("ms.test", 100000, counter) m := ms.FetchMessages() if len(m) != 50000 { - t.Errorf("Should have been 5000 was %v", len(m)) + t.Errorf("Should have been 50000 was %v", len(m)) } + counter.Reset() + for i := 0; i < 100000; i++ { gm := protocol.GroupMessage{ Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)), diff --git a/testing/cwtch_peer_server_intergration_test.go b/testing/cwtch_peer_server_intergration_test.go index 79b3456..0615de6 100644 --- a/testing/cwtch_peer_server_intergration_test.go +++ b/testing/cwtch_peer_server_intergration_test.go @@ -28,6 +28,7 @@ var ( carolLines = []string{"Howdy, thanks!"} ) +// TODO: fix to load private key from server/app/serverConfig.json func loadPrivateKey(t *testing.T) *rsa.PrivateKey { if _, err := os.Stat(serverKeyfile); os.IsNotExist(err) { return nil @@ -86,7 +87,6 @@ func serverCheck(t *testing.T, serverAddr string) bool { func waitForPeerConnection(t *testing.T, peer peer.CwtchPeerInterface, server string) { for { servers := peer.GetServers() - fmt.Println(servers) state, ok := servers[server] if ok { if state == connections.FAILED { @@ -298,8 +298,8 @@ func TestCwtchPeerIntegration(t *testing.T) { // ***** Verify Test ***** - // final syncing time... 
- time.Sleep(time.Second * 15) + fmt.Println("Final syncing time...") + time.Sleep(time.Second * 30) alicesGroup := alice.GetGroup(groupID) if alicesGroup == nil { @@ -333,38 +333,37 @@ func TestCwtchPeerIntegration(t *testing.T) { if len(alicesGroup.GetTimeline()) != 4 { t.Errorf("Alice's timeline does not have all messages") - return + } else { + // check message 0,1,2,3 + aliceGroupTimeline := alicesGroup.GetTimeline() + if aliceGroupTimeline[0].Message != aliceLines[0] || aliceGroupTimeline[1].Message != bobLines[0] || + aliceGroupTimeline[2].Message != aliceLines[1] || aliceGroupTimeline[3].Message != bobLines[1] { + t.Errorf("Some of Alice's timeline messages did not have the expected content!") + } } if len(bobsGroup.GetTimeline()) != 6 { t.Errorf("Bob's timeline does not have all messages") - } - - // check message 0,1,2,3 - aliceGroupTimeline := alicesGroup.GetTimeline() - if aliceGroupTimeline[0].Message != aliceLines[0] || aliceGroupTimeline[1].Message != bobLines[0] || - aliceGroupTimeline[2].Message != aliceLines[1] || aliceGroupTimeline[3].Message != bobLines[1] { - t.Errorf("Some of Alice's timeline messages did not have the expected content!") - } - - // check message 0,1,2,3,4,5 - bobGroupTimeline := bobsGroup.GetTimeline() - if bobGroupTimeline[0].Message != aliceLines[0] || bobGroupTimeline[1].Message != bobLines[0] || - bobGroupTimeline[2].Message != aliceLines[1] || bobGroupTimeline[3].Message != bobLines[1] || - bobGroupTimeline[4].Message != bobLines[2] || bobGroupTimeline[5].Message != carolLines[0] { - t.Errorf("Some of Bob's timeline messages did not have the expected content!") + } else { + // check message 0,1,2,3,4,5 + bobGroupTimeline := bobsGroup.GetTimeline() + if bobGroupTimeline[0].Message != aliceLines[0] || bobGroupTimeline[1].Message != bobLines[0] || + bobGroupTimeline[2].Message != aliceLines[1] || bobGroupTimeline[3].Message != bobLines[1] || + bobGroupTimeline[4].Message != bobLines[2] || bobGroupTimeline[5].Message != 
carolLines[0] { + t.Errorf("Some of Bob's timeline messages did not have the expected content!") + } } if len(carolsGroup.GetTimeline()) != 6 { t.Errorf("Carol's timeline does not have all messages") - } - - // check message 0,1,2,3,4,5 - carolGroupTimeline := carolsGroup.GetTimeline() - if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] || - carolGroupTimeline[2].Message != aliceLines[1] || carolGroupTimeline[3].Message != bobLines[1] || - carolGroupTimeline[4].Message != bobLines[2] || carolGroupTimeline[5].Message != carolLines[0] { - t.Errorf("Some of Carol's timeline messages did not have the expected content!") + } else { + // check message 0,1,2,3,4,5 + carolGroupTimeline := carolsGroup.GetTimeline() + if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] || + carolGroupTimeline[2].Message != aliceLines[1] || carolGroupTimeline[3].Message != bobLines[1] || + carolGroupTimeline[4].Message != bobLines[2] || carolGroupTimeline[5].Message != carolLines[0] { + t.Errorf("Some of Carol's timeline messages did not have the expected content!") + } } fmt.Println("Shutting down Bob...")