diff --git a/peer/connections/peerpeerconnection_test.go b/peer/connections/peerpeerconnection_test.go index 73ede7d..c03226b 100644 --- a/peer/connections/peerpeerconnection_test.go +++ b/peer/connections/peerpeerconnection_test.go @@ -85,7 +85,7 @@ func TestPeerPeerConnection(t *testing.T) { t.Errorf("Onion address error %v", err) } - profile := model.GenerateNewProfile("alice") + profile := model.GenerateNewProfile("sarah") ppc := NewPeerPeerConnection("127.0.0.1:5452|"+onionAddr, profile) //numcalls := 0 tp := new(TestPeer) diff --git a/server/app/serverConfig.json.sample b/server/app/serverConfig.json.sample index a6b2d13..985da45 100644 --- a/server/app/serverConfig.json.sample +++ b/server/app/serverConfig.json.sample @@ -1,7 +1,7 @@ { - "maxBufferLines": 100000, - "serverReporting": { - "reportingGroupId": "", - "reportingServerAddr": "" + "maxBufferLines": 100000 + serverReporting: { + reportingGroupId: "" + reportingServerAddr: "" } } diff --git a/server/metrics/metrics.go b/server/metrics/metrics.go index 47b81e2..13a53e9 100644 --- a/server/metrics/metrics.go +++ b/server/metrics/metrics.go @@ -13,10 +13,9 @@ type counter struct { // Counter providers a threadsafe counter to use for storing long running counts type Counter interface { - Add(unit int) - Reset() + Add(unit uint64) - Count() int + Count() uint64 GetStarttime() time.Time } @@ -27,18 +26,13 @@ func NewCounter() Counter { } // Add add a count of unit to the counter -func (c *counter) Add(unit int) { - atomic.AddUint64(&c.count, uint64(unit)) +func (c *counter) Add(unit uint64) { + atomic.AddUint64(&c.count, unit) } // Count returns the count since Start -func (c *counter) Count() int { - return int(atomic.LoadUint64(&c.count)) -} - -func (c *counter) Reset() { - atomic.StoreUint64(&c.count, 0) - c.startTime = time.Now() +func (c *counter) Count() uint64 { + return atomic.LoadUint64(&c.count) } // GetStarttime returns the starttime of the counter @@ -47,18 +41,14 @@ func (c *counter) GetStarttime() time.Time { } type monitorHistory struct { - starttime time.Time - perMinutePerHour [60]float64 - timeLastHourRotate time.Time - perHourForDay [24]float64 - timeLastDayRotate time.Time - perDayForWeek [7]float64 - timeLastWeekRotate time.Time - perWeekForMonth [4]float64 - timeLastMonthRotate time.Time - perMonthForYear [12]float64 + starttime time.Time + perMinutePerHour [60]int + perHourForDay [24]int + perDayForWeek [7]int + perWeekForMonth [4]int + perMonthForYear [12]int - monitor func() float64 + monitor func() int breakChannel chan bool lock sync.Mutex @@ -69,15 +59,15 @@ type MonitorHistory interface { Start() Stop() - Minutes() []float64 - Hours() []float64 - Days() []float64 - Weeks() []float64 - Months() []float64 + Minutes() []int + Hours() []int + Days() []int + Weeks() []int + Months() []int } // NewMonitorHistory returns a new MonitorHistory with starttime of time.Now and Started running with supplied monitor -func NewMonitorHistory(monitor func() float64) MonitorHistory { +func NewMonitorHistory(monitor func() int) MonitorHistory { mh := &monitorHistory{starttime: time.Now(), monitor: monitor, breakChannel: make(chan bool)} mh.Start() return mh @@ -94,32 +84,32 @@ func (mh *monitorHistory) Stop() { } // Minutes returns the last 60 minute monitoring results -func (mh *monitorHistory) Minutes() []float64 { +func (mh *monitorHistory) Minutes() []int { return mh.returnCopy(mh.perMinutePerHour[:]) } // Hours returns the last 24 hourly averages of monitor results -func (mh *monitorHistory) Hours() []float64 { +func (mh *monitorHistory) Hours() []int { return mh.returnCopy(mh.perHourForDay[:]) } // Days returns the last 7 day averages of monitor results -func (mh *monitorHistory) Days() []float64 { +func (mh *monitorHistory) Days() []int { return mh.returnCopy(mh.perDayForWeek[:]) } // Weeks returns the last 4 weeks of averages of monitor results -func (mh *monitorHistory) Weeks() []float64 { +func (mh *monitorHistory) Weeks() []int { return mh.returnCopy(mh.perWeekForMonth[:]) } // Months returns the last 12 months of averages of monitor results -func (mh *monitorHistory) Months() []float64 { +func (mh *monitorHistory) Months() []int { return mh.returnCopy(mh.perMonthForYear[:]) } -func (mh *monitorHistory) returnCopy(slice []float64) []float64 { - retSlice := make([]float64, len(slice)) +func (mh *monitorHistory) returnCopy(slice []int) []int { + retSlice := make([]int, len(slice)) mh.lock.Lock() for i, v := range slice { retSlice[i] = v @@ -128,22 +118,15 @@ func (mh *monitorHistory) returnCopy(slice []float64) []float64 { return retSlice } -func rotateAndAvg(array []float64, newVal float64) float64 { - total := float64(0.0) +func rotateAndAvg(array []int, newVal int) int { + total := 0 for i := len(array) - 1; i > 0; i-- { array[i] = array[i-1] total += array[i] } array[0] = newVal total += newVal - return total / float64(len(array)) -} -func average(array []float64) float64 { - total := float64(0) - for _, x := range array { - total += x - } - return total / float64(len(array)) + return total / len(array) } // monitorThread is the goroutine in a monitorHistory that does per minute monitoring and rotation @@ -156,26 +139,10 @@ func (mh *monitorHistory) monitorThread() { mh.lock.Lock() minuteAvg := rotateAndAvg(mh.perMinutePerHour[:], mh.monitor()) - - if time.Now().Sub(mh.timeLastHourRotate) > time.Hour { - rotateAndAvg(mh.perHourForDay[:], minuteAvg) - mh.timeLastHourRotate = time.Now() - } - - if time.Now().Sub(mh.timeLastDayRotate) > time.Hour*24 { - rotateAndAvg(mh.perDayForWeek[:], average(mh.perHourForDay[:])) - mh.timeLastDayRotate = time.Now() - } - - if time.Now().Sub(mh.timeLastWeekRotate) > time.Hour*24*7 { - rotateAndAvg(mh.perWeekForMonth[:], average(mh.perDayForWeek[:])) - mh.timeLastWeekRotate = time.Now() - } - - if time.Now().Sub(mh.timeLastMonthRotate) > time.Hour*24*7*4 { - rotateAndAvg(mh.perMonthForYear[:], average(mh.perWeekForMonth[:])) - mh.timeLastMonthRotate = time.Now() - } + hourAvg := rotateAndAvg(mh.perHourForDay[:], minuteAvg) + dayAvg := rotateAndAvg(mh.perDayForWeek[:], hourAvg) + weekAvg := rotateAndAvg(mh.perWeekForMonth[:], dayAvg) + rotateAndAvg(mh.perMonthForYear[:], weekAvg) mh.lock.Unlock() diff --git a/server/metrics/metrics_test.go b/server/metrics/metrics_test.go index 7ffabac..f6bec06 100644 --- a/server/metrics/metrics_test.go +++ b/server/metrics/metrics_test.go @@ -31,36 +31,36 @@ func TestCounter(t *testing.T) { } func TestMonitorHistory(t *testing.T) { - value := float64(60 * 24 * 7 * 4 * 12) + value := 60 * 24 * 7 * 4 * 12 numGoRoutinesStart := runtime.NumGoroutine() - mh := NewMonitorHistory(func() float64 { return value }) + mh := NewMonitorHistory(func() int { return value }) time.Sleep(1 * time.Second) mh.Stop() time.Sleep(1 * time.Second) minutes := mh.Minutes() if minutes[0] != value { - t.Errorf("Monitor minutes's first minutes slot had the wrong value. Expected: %f Actual: %f", value, minutes[0]) + t.Errorf("Monitor minutes's first minutes slot had the wrong value. Expected: %d Actual: %d", value, minutes[0]) } hours := mh.Hours() if hours[0] != value/60 { - t.Errorf("Monitor hour's first hour slot had the wrong value. Expected: %f Actual: %f", value, hours[0]) + t.Errorf("Monitor hour's first hour slot had the wrong value. Expected: %d Actual: %d", value, hours[0]) } days := mh.Days() if days[0] != value/60/24 { - t.Errorf("Monitor day's first day slot had the wrong value. Expected: %f Actual: %f", value/24, days[0]) + t.Errorf("Monitor day's first day slot had the wrong value. Expected: %d Actual: %d", value/24, days[0]) } weeks := mh.Weeks() if weeks[0] != value/60/24/7 { - t.Errorf("Monitor week's first day slot had the wrong value. Expected: %f Actual: %f", value/24/7, weeks[0]) + t.Errorf("Monitor week's first day slot had the wrong value. Expected: %d Actual: %d", value/24/7, weeks[0]) } months := mh.Months() if months[0] != 12 { - t.Errorf("Monitor months's first day slot had the wrong value. Expected: %d Actual: %f", 12, months[0]) + t.Errorf("Monitor months's first day slot had the wrong value. Expected: %d Actual: %d", 12, months[0]) } if numGoRoutinesStart != runtime.NumGoroutine() { diff --git a/server/metrics/monitors.go b/server/metrics/monitors.go deleted file mode 100644 index 72b0d02..0000000 --- a/server/metrics/monitors.go +++ /dev/null @@ -1,56 +0,0 @@ -package metrics - -import ( - "git.openprivacy.ca/openprivacy/libricochet-go/application" - "github.com/struCoder/pidusage" - "log" - "os" - "time" -) - -// Monitors is a package of metrics for a Cwtch Server including message count, CPU, Mem, and conns -type Monitors struct { - MessageCounter Counter - Messages MonitorHistory - CPU MonitorHistory - Memory MonitorHistory - ClientListenConns MonitorHistory - breakChannel chan bool -} - -// Start initializes a Monitors's monitors -func (mp *Monitors) Start(ra *application.RicochetApplication) { - mp.breakChannel = make(chan bool) - mp.MessageCounter = NewCounter() - mp.Messages = NewMonitorHistory(func() (c float64) { c = float64(mp.MessageCounter.Count()); mp.MessageCounter.Reset(); return }) - mp.CPU = NewMonitorHistory(func() float64 { sysInfo, _ := pidusage.GetStat(os.Getpid()); return float64(sysInfo.CPU) }) - mp.Memory = NewMonitorHistory(func() float64 { sysInfo, _ := pidusage.GetStat(os.Getpid()); return float64(sysInfo.Memory) }) - mp.ClientListenConns = NewMonitorHistory(func() float64 { return float64(ra.ConnectionCount()) }) - - // Todo: replace with proper reporting - go mp.log() -} - -func (mp *Monitors) log() { - for { - select { - case <-time.After(time.Minute): - messageMinutes := mp.Messages.Minutes() - cpuMinutes := mp.CPU.Minutes() - memoryMinutes := mp.Memory.Minutes() - listenConnsMinutes := mp.ClientListenConns.Minutes() - log.Printf("METRICS: Messages: %.0f ClientListenConns: %.0f CPU: %.2f Memory: %.0f\n", messageMinutes[0], listenConnsMinutes[0], cpuMinutes[0], memoryMinutes[0]) - case <-mp.breakChannel: - return - } - } -} - -// Stop stops all the monitors in a Monitors -func (mp *Monitors) Stop() { - mp.breakChannel <- true - mp.Messages.Stop() - mp.CPU.Stop() - mp.Memory.Stop() - mp.ClientListenConns.Stop() -} diff --git a/server/server.go b/server/server.go index e10825b..d79dd02 100644 --- a/server/server.go +++ b/server/server.go @@ -3,7 +3,6 @@ package server import ( "cwtch.im/cwtch/server/fetch" "cwtch.im/cwtch/server/listen" - "cwtch.im/cwtch/server/metrics" "cwtch.im/cwtch/server/send" "cwtch.im/cwtch/storage" "git.openprivacy.ca/openprivacy/libricochet-go/application" @@ -13,9 +12,8 @@ import ( // Server encapsulates a complete, compliant Cwtch server. type Server struct { - app *application.RicochetApplication - config *Config - metricsPack metrics.Monitors + app *application.RicochetApplication + config *Config } // Run starts a server with the given privateKey @@ -23,7 +21,6 @@ type Server struct { func (s *Server) Run(serverConfig *Config) { s.config = serverConfig cwtchserver := new(application.RicochetApplication) - s.metricsPack.Start(cwtchserver) l, err := application.SetupOnion("127.0.0.1:9051", "tcp4", "", s.config.PrivateKey(), 9878) @@ -34,8 +31,10 @@ func (s *Server) Run(serverConfig *Config) { af := application.ApplicationInstanceFactory{} af.Init() ms := new(storage.MessageStore) - ms.Init("cwtch.messages", s.config.MaxBufferLines, s.metricsPack.MessageCounter) + ms.Init("cwtch.messages", s.config.MaxBufferLines) af.AddHandler("im.cwtch.server.listen", func(rai *application.ApplicationInstance) func() channels.Handler { + si := new(Instance) + si.Init(rai, cwtchserver, ms) return func() channels.Handler { cslc := new(listen.CwtchServerListenChannel) return cslc @@ -71,5 +70,4 @@ func (s *Server) Run(serverConfig *Config) { // Shutdown kills the app closing all connections and freeing all goroutines func (s *Server) Shutdown() { s.app.Shutdown() - s.metricsPack.Stop() } diff --git a/server/server_instance_test.go b/server/server_instance_test.go index 14afb32..088fdb1 100644 --- a/server/server_instance_test.go +++ b/server/server_instance_test.go @@ -2,7 +2,6 @@ package server import ( "cwtch.im/cwtch/protocol" - "cwtch.im/cwtch/server/metrics" "cwtch.im/cwtch/storage" "git.openprivacy.ca/openprivacy/libricochet-go/application" "os" @@ -16,7 +15,7 @@ func TestServerInstance(t *testing.T) { ra := new(application.RicochetApplication) msi := new(storage.MessageStore) os.Remove("ms.test") - msi.Init("ms.test", 5, metrics.NewCounter()) + msi.Init("ms.test", 5) gm := protocol.GroupMessage{ Ciphertext: []byte("Hello this is a fairly average length message that we are writing here."), Spamguard: []byte{}, diff --git a/storage/message_store.go b/storage/message_store.go index 41ed151..7841d8a 100644 --- a/storage/message_store.go +++ b/storage/message_store.go @@ -3,7 +3,6 @@ package storage import ( "bufio" "cwtch.im/cwtch/protocol" - "cwtch.im/cwtch/server/metrics" "encoding/json" "fmt" "log" @@ -19,13 +18,12 @@ type MessageStoreInterface interface { // MessageStore is a file-backed implementation of MessageStoreInterface type MessageStore struct { - file *os.File - lock sync.Mutex - messages []*protocol.GroupMessage - messageCounter metrics.Counter - bufferSize int - pos int - rotated bool + file *os.File + lock sync.Mutex + messages []*protocol.GroupMessage + bufferSize int + pos int + rotated bool } // Close closes the message store and underlying resources. @@ -46,7 +44,7 @@ func (ms *MessageStore) updateBuffer(gm *protocol.GroupMessage) { } // Init sets up a MessageStore of size bufferSize backed by filename -func (ms *MessageStore) Init(filename string, bufferSize int, messageCounter metrics.Counter) { +func (ms *MessageStore) Init(filename string, bufferSize int) { f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600) if err != nil { panic(err) @@ -56,7 +54,6 @@ func (ms *MessageStore) Init(filename string, bufferSize int, messageCounter met ms.bufferSize = bufferSize ms.messages = make([]*protocol.GroupMessage, bufferSize) ms.rotated = false - ms.messageCounter = messageCounter scanner := bufio.NewScanner(f) for scanner.Scan() { @@ -92,7 +89,6 @@ func (ms *MessageStore) FetchMessages() (messages []*protocol.GroupMessage) { // AddMessage adds a GroupMessage to the store func (ms *MessageStore) AddMessage(gm protocol.GroupMessage) { - ms.messageCounter.Add(1) ms.lock.Lock() ms.updateBuffer(&gm) s, err := json.Marshal(gm) diff --git a/storage/message_store_test.go b/storage/message_store_test.go index da10bf9..e6b5407 100644 --- a/storage/message_store_test.go +++ b/storage/message_store_test.go @@ -2,7 +2,6 @@ package storage import ( "cwtch.im/cwtch/protocol" - "cwtch.im/cwtch/server/metrics" "os" "strconv" "testing" @@ -11,8 +10,7 @@ import ( func TestMessageStore(t *testing.T) { os.Remove("ms.test") ms := new(MessageStore) - counter := metrics.NewCounter() - ms.Init("ms.test", 100000, counter) + ms.Init("ms.test", 100000) for i := 0; i < 50000; i++ { gm := protocol.GroupMessage{ Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)), @@ -20,18 +18,13 @@ func TestMessageStore(t *testing.T) { } ms.AddMessage(gm) } - if counter.Count() != 50000 { - t.Errorf("Counter should be at 50000 was %v", counter.Count()) - } ms.Close() - ms.Init("ms.test", 100000, counter) + ms.Init("ms.test", 100000) m := ms.FetchMessages() if len(m) != 50000 { - t.Errorf("Should have been 50000 was %v", len(m)) + t.Errorf("Should have been 5000 was %v", len(m)) } - counter.Reset() - for i := 0; i < 100000; i++ { gm := protocol.GroupMessage{ Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)), diff --git a/testing/cwtch_peer_server_intergration_test.go b/testing/cwtch_peer_server_intergration_test.go index f48d642..fe72e90 100644 --- a/testing/cwtch_peer_server_intergration_test.go +++ b/testing/cwtch_peer_server_intergration_test.go @@ -28,7 +28,6 @@ var ( carolLines = []string{"Howdy, thanks!"} ) -// TODO: fix to load private key from server/app/serverConfig.json func loadPrivateKey(t *testing.T) *rsa.PrivateKey { if _, err := os.Stat(serverKeyfile); os.IsNotExist(err) { return nil @@ -87,6 +86,7 @@ func serverCheck(t *testing.T, serverAddr string) bool { func waitForPeerConnection(t *testing.T, peer peer.CwtchPeerInterface, server string) { for { servers := peer.GetServers() + fmt.Println(servers) state, ok := servers[server] if ok { if state == connections.FAILED { @@ -298,8 +298,8 @@ func TestCwtchPeerIntegration(t *testing.T) { // ***** Verify Test ***** - fmt.Println("Final syncing time...") - time.Sleep(time.Second * 30) + // final syncing time... + time.Sleep(time.Second * 15) alicesGroup := alice.GetGroup(groupID) if alicesGroup == nil { @@ -333,37 +333,38 @@ func TestCwtchPeerIntegration(t *testing.T) { if len(alicesGroup.GetTimeline()) != 4 { t.Errorf("Alice's timeline does not have all messages") - } else { - // check message 0,1,2,3 - aliceGroupTimeline := alicesGroup.GetTimeline() - if aliceGroupTimeline[0].Message != aliceLines[0] || aliceGroupTimeline[1].Message != bobLines[0] || - aliceGroupTimeline[2].Message != aliceLines[1] || aliceGroupTimeline[3].Message != bobLines[1] { - t.Errorf("Some of Alice's timeline messages did not have the expected content!") - } + return } if len(bobsGroup.GetTimeline()) != 6 { t.Errorf("Bob's timeline does not have all messages") - } else { - // check message 0,1,2,3,4,5 - bobGroupTimeline := bobsGroup.GetTimeline() - if bobGroupTimeline[0].Message != aliceLines[0] || bobGroupTimeline[1].Message != bobLines[0] || - bobGroupTimeline[2].Message != aliceLines[1] || bobGroupTimeline[3].Message != bobLines[1] || - bobGroupTimeline[4].Message != bobLines[2] || bobGroupTimeline[5].Message != carolLines[0] { - t.Errorf("Some of Bob's timeline messages did not have the expected content!") - } + } + + // check message 0,1,2,3 + aliceGroupTimeline := alicesGroup.GetTimeline() + if aliceGroupTimeline[0].Message != aliceLines[0] || aliceGroupTimeline[1].Message != bobLines[0] || + aliceGroupTimeline[2].Message != aliceLines[1] || aliceGroupTimeline[3].Message != bobLines[1] { + t.Errorf("Some of Alice's timeline messages did not have the expected content!") + } + + // check message 0,1,2,3,4,5 + bobGroupTimeline := bobsGroup.GetTimeline() + if bobGroupTimeline[0].Message != aliceLines[0] || bobGroupTimeline[1].Message != bobLines[0] || + bobGroupTimeline[2].Message != aliceLines[1] || bobGroupTimeline[3].Message != bobLines[1] || + bobGroupTimeline[4].Message != bobLines[2] || bobGroupTimeline[5].Message != carolLines[0] { + t.Errorf("Some of Bob's timeline messages did not have the expected content!") } if len(carolsGroup.GetTimeline()) != 6 { t.Errorf("Carol's timeline does not have all messages") - } else { - // check message 0,1,2,3,4,5 - carolGroupTimeline := carolsGroup.GetTimeline() - if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] || - carolGroupTimeline[2].Message != aliceLines[1] || carolGroupTimeline[3].Message != bobLines[1] || - carolGroupTimeline[4].Message != bobLines[2] || carolGroupTimeline[5].Message != carolLines[0] { - t.Errorf("Some of Carol's timeline messages did not have the expected content!") - } + } + + // check message 0,1,2,3,4,5 + carolGroupTimeline := carolsGroup.GetTimeline() + if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] || + carolGroupTimeline[2].Message != aliceLines[1] || carolGroupTimeline[3].Message != bobLines[1] || + carolGroupTimeline[4].Message != bobLines[2] || carolGroupTimeline[5].Message != carolLines[0] { + t.Errorf("Some of Carol's timeline messages did not have the expected content!") } fmt.Println("Shutting down Bob...")