forked from cwtch.im/cwtch
Merge branch 'metrics-total' of dan/cwtch into master
This commit is contained in:
commit
8d2a1aaf32
|
@ -61,8 +61,19 @@ const (
|
|||
MegaBytes
|
||||
)
|
||||
|
||||
// MonitorAccumulation controls how monitor data is accumulated over time into larger summary buckets
|
||||
type MonitorAccumulation int
|
||||
|
||||
const (
|
||||
// Cumulative values with sum over time
|
||||
Cumulative MonitorAccumulation = iota
|
||||
// Average values will average over time
|
||||
Average
|
||||
)
|
||||
|
||||
type monitorHistory struct {
|
||||
monitorType MonitorType
|
||||
monitorType MonitorType
|
||||
monitorAccumulation MonitorAccumulation
|
||||
|
||||
starttime time.Time
|
||||
perMinutePerHour [60]float64
|
||||
|
@ -96,8 +107,10 @@ type MonitorHistory interface {
|
|||
}
|
||||
|
||||
// NewMonitorHistory returns a new MonitorHistory with starttime of time.Now and Started running with supplied monitor
|
||||
func NewMonitorHistory(t MonitorType, monitor func() float64) MonitorHistory {
|
||||
mh := &monitorHistory{monitorType: t, starttime: time.Now(), monitor: monitor, breakChannel: make(chan bool)}
|
||||
func NewMonitorHistory(t MonitorType, a MonitorAccumulation, monitor func() float64) MonitorHistory {
|
||||
mh := &monitorHistory{monitorType: t, monitorAccumulation: a, starttime: time.Now(), monitor: monitor, breakChannel: make(chan bool),
|
||||
timeLastHourRotate: time.Now(), timeLastDayRotate: time.Now(), timeLastWeekRotate: time.Now(),
|
||||
timeLastMonthRotate: time.Now()}
|
||||
mh.Start()
|
||||
return mh
|
||||
}
|
||||
|
@ -148,7 +161,7 @@ func (mh *monitorHistory) Report(w *bufio.Writer) {
|
|||
func reportLine(t MonitorType, array []float64) string {
|
||||
switch t {
|
||||
case Count:
|
||||
return strings.Trim(strings.Join(strings.Fields(fmt.Sprint(array)), " "), "[]")
|
||||
return strings.Trim(strings.Join(strings.Fields(fmt.Sprintf("%.0f", array)), " "), "[]")
|
||||
case Percent:
|
||||
return strings.Trim(strings.Join(strings.Fields(fmt.Sprintf("%.2f", array)), " "), "[]")
|
||||
case MegaBytes:
|
||||
|
@ -171,7 +184,7 @@ func (mh *monitorHistory) returnCopy(slice []float64) []float64 {
|
|||
return retSlice
|
||||
}
|
||||
|
||||
func rotateAndAvg(array []float64, newVal float64) float64 {
|
||||
func rotateAndAccumulate(array []float64, newVal float64, acc MonitorAccumulation) float64 {
|
||||
total := float64(0.0)
|
||||
for i := len(array) - 1; i > 0; i-- {
|
||||
array[i] = array[i-1]
|
||||
|
@ -179,52 +192,53 @@ func rotateAndAvg(array []float64, newVal float64) float64 {
|
|||
}
|
||||
array[0] = newVal
|
||||
total += newVal
|
||||
if acc == Cumulative {
|
||||
return total
|
||||
}
|
||||
return total / float64(len(array))
|
||||
}
|
||||
func average(array []float64) float64 {
|
||||
func accumulate(array []float64, acc MonitorAccumulation) float64 {
|
||||
total := float64(0)
|
||||
for _, x := range array {
|
||||
total += x
|
||||
}
|
||||
if acc == Cumulative {
|
||||
return total
|
||||
}
|
||||
return total / float64(len(array))
|
||||
}
|
||||
|
||||
// monitorThread is the goroutine in a monitorHistory that does per minute monitoring and rotation
|
||||
func (mh *monitorHistory) monitorThread() {
|
||||
timeout := time.Duration(0) // first pass right away
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-time.After(timeout):
|
||||
case <-time.After(time.Minute):
|
||||
mh.lock.Lock()
|
||||
|
||||
minuteAvg := rotateAndAvg(mh.perMinutePerHour[:], mh.monitor())
|
||||
minuteAvg := rotateAndAccumulate(mh.perMinutePerHour[:], mh.monitor(), mh.monitorAccumulation)
|
||||
|
||||
if time.Now().Sub(mh.timeLastHourRotate) > time.Hour {
|
||||
rotateAndAvg(mh.perHourForDay[:], minuteAvg)
|
||||
rotateAndAccumulate(mh.perHourForDay[:], minuteAvg, mh.monitorAccumulation)
|
||||
mh.timeLastHourRotate = time.Now()
|
||||
}
|
||||
|
||||
if time.Now().Sub(mh.timeLastDayRotate) > time.Hour*24 {
|
||||
rotateAndAvg(mh.perDayForWeek[:], average(mh.perHourForDay[:]))
|
||||
rotateAndAccumulate(mh.perDayForWeek[:], accumulate(mh.perHourForDay[:], mh.monitorAccumulation), mh.monitorAccumulation)
|
||||
mh.timeLastDayRotate = time.Now()
|
||||
}
|
||||
|
||||
if time.Now().Sub(mh.timeLastWeekRotate) > time.Hour*24*7 {
|
||||
rotateAndAvg(mh.perWeekForMonth[:], average(mh.perDayForWeek[:]))
|
||||
rotateAndAccumulate(mh.perWeekForMonth[:], accumulate(mh.perDayForWeek[:], mh.monitorAccumulation), mh.monitorAccumulation)
|
||||
mh.timeLastWeekRotate = time.Now()
|
||||
}
|
||||
|
||||
if time.Now().Sub(mh.timeLastMonthRotate) > time.Hour*24*7*4 {
|
||||
rotateAndAvg(mh.perMonthForYear[:], average(mh.perWeekForMonth[:]))
|
||||
rotateAndAccumulate(mh.perMonthForYear[:], accumulate(mh.perWeekForMonth[:], mh.monitorAccumulation), mh.monitorAccumulation)
|
||||
mh.timeLastMonthRotate = time.Now()
|
||||
}
|
||||
|
||||
mh.lock.Unlock()
|
||||
|
||||
// Repeat every minute
|
||||
timeout = time.Duration(time.Minute)
|
||||
|
||||
case <-mh.breakChannel:
|
||||
return
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
@ -29,41 +28,3 @@ func TestCounter(t *testing.T) {
|
|||
t.Error("counter's starttime was innaccurate")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMonitorHistory(t *testing.T) {
|
||||
value := float64(60 * 24 * 7 * 4 * 12)
|
||||
numGoRoutinesStart := runtime.NumGoroutine()
|
||||
mh := NewMonitorHistory(Count, func() float64 { return value })
|
||||
time.Sleep(1 * time.Second)
|
||||
mh.Stop()
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
minutes := mh.Minutes()
|
||||
if minutes[0] != value {
|
||||
t.Errorf("Monitor minutes's first minutes slot had the wrong value. Expected: %f Actual: %f", value, minutes[0])
|
||||
}
|
||||
|
||||
hours := mh.Hours()
|
||||
if hours[0] != value/60 {
|
||||
t.Errorf("Monitor hour's first hour slot had the wrong value. Expected: %f Actual: %f", value, hours[0])
|
||||
}
|
||||
|
||||
days := mh.Days()
|
||||
if days[0] != value/60/24 {
|
||||
t.Errorf("Monitor day's first day slot had the wrong value. Expected: %f Actual: %f", value/24, days[0])
|
||||
}
|
||||
|
||||
weeks := mh.Weeks()
|
||||
if weeks[0] != value/60/24/7 {
|
||||
t.Errorf("Monitor week's first day slot had the wrong value. Expected: %f Actual: %f", value/24/7, weeks[0])
|
||||
}
|
||||
|
||||
months := mh.Months()
|
||||
if months[0] != 12 {
|
||||
t.Errorf("Monitor months's first day slot had the wrong value. Expected: %d Actual: %f", 12, months[0])
|
||||
}
|
||||
|
||||
if numGoRoutinesStart != runtime.NumGoroutine() {
|
||||
t.Errorf("Monitor hour did not stop")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,10 +32,10 @@ func (mp *Monitors) Start(ra *application.RicochetApplication, log bool) {
|
|||
mp.starttime = time.Now()
|
||||
mp.breakChannel = make(chan bool)
|
||||
mp.MessageCounter = NewCounter()
|
||||
mp.Messages = NewMonitorHistory(Count, func() (c float64) { c = float64(mp.MessageCounter.Count()); mp.MessageCounter.Reset(); return })
|
||||
mp.CPU = NewMonitorHistory(Percent, func() float64 { sysInfo, _ := pidusage.GetStat(os.Getpid()); return float64(sysInfo.CPU) })
|
||||
mp.Memory = NewMonitorHistory(MegaBytes, func() float64 { sysInfo, _ := pidusage.GetStat(os.Getpid()); return float64(sysInfo.Memory) })
|
||||
mp.ClientConns = NewMonitorHistory(Count, func() float64 { return float64(ra.ConnectionCount()) })
|
||||
mp.Messages = NewMonitorHistory(Count, Cumulative, func() (c float64) { c = float64(mp.MessageCounter.Count()); mp.MessageCounter.Reset(); return })
|
||||
mp.CPU = NewMonitorHistory(Percent, Average, func() float64 { sysInfo, _ := pidusage.GetStat(os.Getpid()); return float64(sysInfo.CPU) })
|
||||
mp.Memory = NewMonitorHistory(MegaBytes, Average, func() float64 { sysInfo, _ := pidusage.GetStat(os.Getpid()); return float64(sysInfo.Memory) })
|
||||
mp.ClientConns = NewMonitorHistory(Count, Average, func() float64 { return float64(ra.ConnectionCount()) })
|
||||
|
||||
if mp.log {
|
||||
go mp.run()
|
||||
|
@ -63,7 +63,7 @@ func (mp *Monitors) report() {
|
|||
|
||||
w := bufio.NewWriter(f)
|
||||
|
||||
fmt.Fprintf(w, "Started: %v\n\n", mp.starttime)
|
||||
fmt.Fprintf(w, "Uptime: %v\n\n", time.Now().Sub(mp.starttime))
|
||||
|
||||
fmt.Fprintln(w, "Messages:")
|
||||
mp.Messages.Report(w)
|
||||
|
|
Loading…
Reference in New Issue