ability for metrics to total or average; monitorHistory test will have to be integ test; uptime instead of starttime

This commit is contained in:
Dan Ballard 2018-06-28 12:36:50 -07:00
parent bdd8f9a5fd
commit f416a3756e
3 changed files with 36 additions and 61 deletions

View File

@ -61,8 +61,19 @@ const (
MegaBytes
)
// MonitorAccumulation controls how monitor data is accumulated over time into larger summary buckets
type MonitorAccumulation int
const (
// Cumulative values with sum over time
Cumulative MonitorAccumulation = iota
// Average values will average over time
Average
)
type monitorHistory struct {
monitorType MonitorType
monitorType MonitorType
monitorAccumulation MonitorAccumulation
starttime time.Time
perMinutePerHour [60]float64
@ -96,8 +107,10 @@ type MonitorHistory interface {
}
// NewMonitorHistory returns a new MonitorHistory with starttime of time.Now and Started running with supplied monitor
func NewMonitorHistory(t MonitorType, monitor func() float64) MonitorHistory {
mh := &monitorHistory{monitorType: t, starttime: time.Now(), monitor: monitor, breakChannel: make(chan bool)}
func NewMonitorHistory(t MonitorType, a MonitorAccumulation, monitor func() float64) MonitorHistory {
mh := &monitorHistory{monitorType: t, monitorAccumulation: a, starttime: time.Now(), monitor: monitor, breakChannel: make(chan bool),
timeLastHourRotate: time.Now(), timeLastDayRotate: time.Now(), timeLastWeekRotate: time.Now(),
timeLastMonthRotate: time.Now()}
mh.Start()
return mh
}
@ -148,7 +161,7 @@ func (mh *monitorHistory) Report(w *bufio.Writer) {
func reportLine(t MonitorType, array []float64) string {
switch t {
case Count:
return strings.Trim(strings.Join(strings.Fields(fmt.Sprint(array)), " "), "[]")
return strings.Trim(strings.Join(strings.Fields(fmt.Sprintf("%.0f", array)), " "), "[]")
case Percent:
return strings.Trim(strings.Join(strings.Fields(fmt.Sprintf("%.2f", array)), " "), "[]")
case MegaBytes:
@ -171,7 +184,7 @@ func (mh *monitorHistory) returnCopy(slice []float64) []float64 {
return retSlice
}
func rotateAndAvg(array []float64, newVal float64) float64 {
func rotateAndAccumulate(array []float64, newVal float64, acc MonitorAccumulation) float64 {
total := float64(0.0)
for i := len(array) - 1; i > 0; i-- {
array[i] = array[i-1]
@ -179,52 +192,53 @@ func rotateAndAvg(array []float64, newVal float64) float64 {
}
array[0] = newVal
total += newVal
if acc == Cumulative {
return total
}
return total / float64(len(array))
}
func average(array []float64) float64 {
func accumulate(array []float64, acc MonitorAccumulation) float64 {
total := float64(0)
for _, x := range array {
total += x
}
if acc == Cumulative {
return total
}
return total / float64(len(array))
}
// monitorThread is the goroutine in a monitorHistory that does per minute monitoring and rotation
func (mh *monitorHistory) monitorThread() {
timeout := time.Duration(0) // first pass right away
for {
select {
case <-time.After(timeout):
case <-time.After(time.Minute):
mh.lock.Lock()
minuteAvg := rotateAndAvg(mh.perMinutePerHour[:], mh.monitor())
minuteAvg := rotateAndAccumulate(mh.perMinutePerHour[:], mh.monitor(), mh.monitorAccumulation)
if time.Now().Sub(mh.timeLastHourRotate) > time.Hour {
rotateAndAvg(mh.perHourForDay[:], minuteAvg)
rotateAndAccumulate(mh.perHourForDay[:], minuteAvg, mh.monitorAccumulation)
mh.timeLastHourRotate = time.Now()
}
if time.Now().Sub(mh.timeLastDayRotate) > time.Hour*24 {
rotateAndAvg(mh.perDayForWeek[:], average(mh.perHourForDay[:]))
rotateAndAccumulate(mh.perDayForWeek[:], accumulate(mh.perHourForDay[:], mh.monitorAccumulation), mh.monitorAccumulation)
mh.timeLastDayRotate = time.Now()
}
if time.Now().Sub(mh.timeLastWeekRotate) > time.Hour*24*7 {
rotateAndAvg(mh.perWeekForMonth[:], average(mh.perDayForWeek[:]))
rotateAndAccumulate(mh.perWeekForMonth[:], accumulate(mh.perDayForWeek[:], mh.monitorAccumulation), mh.monitorAccumulation)
mh.timeLastWeekRotate = time.Now()
}
if time.Now().Sub(mh.timeLastMonthRotate) > time.Hour*24*7*4 {
rotateAndAvg(mh.perMonthForYear[:], average(mh.perWeekForMonth[:]))
rotateAndAccumulate(mh.perMonthForYear[:], accumulate(mh.perWeekForMonth[:], mh.monitorAccumulation), mh.monitorAccumulation)
mh.timeLastMonthRotate = time.Now()
}
mh.lock.Unlock()
// Repeat every minute
timeout = time.Duration(time.Minute)
case <-mh.breakChannel:
return
}

View File

@ -1,7 +1,6 @@
package metrics
import (
"runtime"
"testing"
"time"
)
@ -29,41 +28,3 @@ func TestCounter(t *testing.T) {
t.Error("counter's starttime was innaccurate")
}
}
func TestMonitorHistory(t *testing.T) {
value := float64(60 * 24 * 7 * 4 * 12)
numGoRoutinesStart := runtime.NumGoroutine()
mh := NewMonitorHistory(Count, func() float64 { return value })
time.Sleep(1 * time.Second)
mh.Stop()
time.Sleep(1 * time.Second)
minutes := mh.Minutes()
if minutes[0] != value {
t.Errorf("Monitor minutes's first minutes slot had the wrong value. Expected: %f Actual: %f", value, minutes[0])
}
hours := mh.Hours()
if hours[0] != value/60 {
t.Errorf("Monitor hour's first hour slot had the wrong value. Expected: %f Actual: %f", value, hours[0])
}
days := mh.Days()
if days[0] != value/60/24 {
t.Errorf("Monitor day's first day slot had the wrong value. Expected: %f Actual: %f", value/24, days[0])
}
weeks := mh.Weeks()
if weeks[0] != value/60/24/7 {
t.Errorf("Monitor week's first day slot had the wrong value. Expected: %f Actual: %f", value/24/7, weeks[0])
}
months := mh.Months()
if months[0] != 12 {
t.Errorf("Monitor months's first day slot had the wrong value. Expected: %d Actual: %f", 12, months[0])
}
if numGoRoutinesStart != runtime.NumGoroutine() {
t.Errorf("Monitor hour did not stop")
}
}

View File

@ -32,10 +32,10 @@ func (mp *Monitors) Start(ra *application.RicochetApplication, log bool) {
mp.starttime = time.Now()
mp.breakChannel = make(chan bool)
mp.MessageCounter = NewCounter()
mp.Messages = NewMonitorHistory(Count, func() (c float64) { c = float64(mp.MessageCounter.Count()); mp.MessageCounter.Reset(); return })
mp.CPU = NewMonitorHistory(Percent, func() float64 { sysInfo, _ := pidusage.GetStat(os.Getpid()); return float64(sysInfo.CPU) })
mp.Memory = NewMonitorHistory(MegaBytes, func() float64 { sysInfo, _ := pidusage.GetStat(os.Getpid()); return float64(sysInfo.Memory) })
mp.ClientConns = NewMonitorHistory(Count, func() float64 { return float64(ra.ConnectionCount()) })
mp.Messages = NewMonitorHistory(Count, Cumulative, func() (c float64) { c = float64(mp.MessageCounter.Count()); mp.MessageCounter.Reset(); return })
mp.CPU = NewMonitorHistory(Percent, Average, func() float64 { sysInfo, _ := pidusage.GetStat(os.Getpid()); return float64(sysInfo.CPU) })
mp.Memory = NewMonitorHistory(MegaBytes, Average, func() float64 { sysInfo, _ := pidusage.GetStat(os.Getpid()); return float64(sysInfo.Memory) })
mp.ClientConns = NewMonitorHistory(Count, Average, func() float64 { return float64(ra.ConnectionCount()) })
if mp.log {
go mp.run()
@ -63,7 +63,7 @@ func (mp *Monitors) report() {
w := bufio.NewWriter(f)
fmt.Fprintf(w, "Started: %v\n\n", mp.starttime)
fmt.Fprintf(w, "Uptime: %v\n\n", time.Now().Sub(mp.starttime))
fmt.Fprintln(w, "Messages:")
mp.Messages.Report(w)