forked from cwtch.im/cwtch
Merge branch 'server-metrics' of dan/cwtch into master
This commit is contained in:
commit
b4d984cf20
|
@ -96,7 +96,7 @@ func (m *Manager) AttemptReconnections() {
|
|||
|
||||
for {
|
||||
select {
|
||||
case <-time.After(timeout * time.Second):
|
||||
case <-time.After(timeout):
|
||||
m.lock.Lock()
|
||||
for _, ppc := range m.peerConnections {
|
||||
if ppc.GetState() == FAILED {
|
||||
|
@ -114,7 +114,7 @@ func (m *Manager) AttemptReconnections() {
|
|||
m.lock.Unlock()
|
||||
|
||||
// Launch Another Run In 30 Seconds
|
||||
timeout = time.Duration(30)
|
||||
timeout = time.Duration(30 * time.Second)
|
||||
case <-m.breakChannel:
|
||||
return
|
||||
}
|
||||
|
|
|
@ -0,0 +1,156 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type counter struct {
|
||||
startTime time.Time
|
||||
count uint64
|
||||
}
|
||||
|
||||
// Counter providers a threadsafe counter to use for storing long running counts
|
||||
type Counter interface {
|
||||
Add(unit uint64)
|
||||
|
||||
Count() uint64
|
||||
GetStarttime() time.Time
|
||||
}
|
||||
|
||||
// NewCounter initializes a counter starting at time.Now() and a count of 0 and returns it
|
||||
func NewCounter() Counter {
|
||||
c := &counter{startTime: time.Now(), count: 0}
|
||||
return c
|
||||
}
|
||||
|
||||
// Add add a count of unit to the counter
|
||||
func (c *counter) Add(unit uint64) {
|
||||
atomic.AddUint64(&c.count, unit)
|
||||
}
|
||||
|
||||
// Count returns the count since Start
|
||||
func (c *counter) Count() uint64 {
|
||||
return atomic.LoadUint64(&c.count)
|
||||
}
|
||||
|
||||
// GetStarttime returns the starttime of the counter
|
||||
func (c *counter) GetStarttime() time.Time {
|
||||
return c.startTime
|
||||
}
|
||||
|
||||
type monitorHistory struct {
|
||||
starttime time.Time
|
||||
perMinutePerHour [60]int
|
||||
perHourForDay [24]int
|
||||
perDayForWeek [7]int
|
||||
perWeekForMonth [4]int
|
||||
perMonthForYear [12]int
|
||||
|
||||
monitor func() int
|
||||
|
||||
breakChannel chan bool
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// MonitorHistory runs a monitor every minute and rotates and averages the results out across time
|
||||
type MonitorHistory interface {
|
||||
Start()
|
||||
Stop()
|
||||
|
||||
Minutes() []int
|
||||
Hours() []int
|
||||
Days() []int
|
||||
Weeks() []int
|
||||
Months() []int
|
||||
}
|
||||
|
||||
// NewMonitorHistory returns a new MonitorHistory with starttime of time.Now and Started running with supplied monitor
|
||||
func NewMonitorHistory(monitor func() int) MonitorHistory {
|
||||
mh := &monitorHistory{starttime: time.Now(), monitor: monitor, breakChannel: make(chan bool)}
|
||||
mh.Start()
|
||||
return mh
|
||||
}
|
||||
|
||||
// Start starts a monitorHistory go rountine to run the monitor at intervals and rotate history
|
||||
func (mh *monitorHistory) Start() {
|
||||
go mh.monitorThread()
|
||||
}
|
||||
|
||||
// Stop stops a monitorHistory go routine
|
||||
func (mh *monitorHistory) Stop() {
|
||||
mh.breakChannel <- true
|
||||
}
|
||||
|
||||
// Minutes returns the last 60 minute monitoring results
|
||||
func (mh *monitorHistory) Minutes() []int {
|
||||
return mh.returnCopy(mh.perMinutePerHour[:])
|
||||
}
|
||||
|
||||
// Hours returns the last 24 hourly averages of monitor results
|
||||
func (mh *monitorHistory) Hours() []int {
|
||||
return mh.returnCopy(mh.perHourForDay[:])
|
||||
}
|
||||
|
||||
// Days returns the last 7 day averages of monitor results
|
||||
func (mh *monitorHistory) Days() []int {
|
||||
return mh.returnCopy(mh.perDayForWeek[:])
|
||||
}
|
||||
|
||||
// Weeks returns the last 4 weeks of averages of monitor results
|
||||
func (mh *monitorHistory) Weeks() []int {
|
||||
return mh.returnCopy(mh.perWeekForMonth[:])
|
||||
}
|
||||
|
||||
// Months returns the last 12 months of averages of monitor results
|
||||
func (mh *monitorHistory) Months() []int {
|
||||
return mh.returnCopy(mh.perMonthForYear[:])
|
||||
}
|
||||
|
||||
func (mh *monitorHistory) returnCopy(slice []int) []int {
|
||||
retSlice := make([]int, len(slice))
|
||||
mh.lock.Lock()
|
||||
for i, v := range slice {
|
||||
retSlice[i] = v
|
||||
}
|
||||
mh.lock.Unlock()
|
||||
return retSlice
|
||||
}
|
||||
|
||||
func rotateAndAvg(array []int, newVal int) int {
|
||||
total := 0
|
||||
for i := len(array) - 1; i > 0; i-- {
|
||||
array[i] = array[i-1]
|
||||
total += array[i]
|
||||
}
|
||||
array[0] = newVal
|
||||
total += newVal
|
||||
return total / len(array)
|
||||
}
|
||||
|
||||
// monitorThread is the goroutine in a monitorHistory that does per minute monitoring and rotation
|
||||
func (mh *monitorHistory) monitorThread() {
|
||||
timeout := time.Duration(0) // first pass right away
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-time.After(timeout):
|
||||
mh.lock.Lock()
|
||||
|
||||
minuteAvg := rotateAndAvg(mh.perMinutePerHour[:], mh.monitor())
|
||||
hourAvg := rotateAndAvg(mh.perHourForDay[:], minuteAvg)
|
||||
dayAvg := rotateAndAvg(mh.perDayForWeek[:], hourAvg)
|
||||
weekAvg := rotateAndAvg(mh.perWeekForMonth[:], dayAvg)
|
||||
rotateAndAvg(mh.perMonthForYear[:], weekAvg)
|
||||
|
||||
mh.lock.Unlock()
|
||||
|
||||
// Repeat every minute
|
||||
timeout = time.Duration(time.Minute)
|
||||
|
||||
case <-mh.breakChannel:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,69 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestCounter(t *testing.T) {
|
||||
starttime := time.Now()
|
||||
c := NewCounter()
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
go func() {
|
||||
c.Add(1)
|
||||
}()
|
||||
}
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
val := c.Count()
|
||||
if val != 100 {
|
||||
t.Errorf("counter count was not 100")
|
||||
}
|
||||
|
||||
counterStart := c.GetStarttime()
|
||||
|
||||
if counterStart.Sub(starttime) > time.Millisecond {
|
||||
t.Error("counter's starttime was innaccurate")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMonitorHistory(t *testing.T) {
|
||||
value := 60 * 24 * 7 * 4 * 12
|
||||
numGoRoutinesStart := runtime.NumGoroutine()
|
||||
mh := NewMonitorHistory(func() int { return value })
|
||||
time.Sleep(1 * time.Second)
|
||||
mh.Stop()
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
minutes := mh.Minutes()
|
||||
if minutes[0] != value {
|
||||
t.Errorf("Monitor minutes's first minutes slot had the wrong value. Expected: %d Actual: %d", value, minutes[0])
|
||||
}
|
||||
|
||||
hours := mh.Hours()
|
||||
if hours[0] != value/60 {
|
||||
t.Errorf("Monitor hour's first hour slot had the wrong value. Expected: %d Actual: %d", value, hours[0])
|
||||
}
|
||||
|
||||
days := mh.Days()
|
||||
if days[0] != value/60/24 {
|
||||
t.Errorf("Monitor day's first day slot had the wrong value. Expected: %d Actual: %d", value/24, days[0])
|
||||
}
|
||||
|
||||
weeks := mh.Weeks()
|
||||
if weeks[0] != value/60/24/7 {
|
||||
t.Errorf("Monitor week's first day slot had the wrong value. Expected: %d Actual: %d", value/24/7, weeks[0])
|
||||
}
|
||||
|
||||
months := mh.Months()
|
||||
if months[0] != 12 {
|
||||
t.Errorf("Monitor months's first day slot had the wrong value. Expected: %d Actual: %d", 12, months[0])
|
||||
}
|
||||
|
||||
if numGoRoutinesStart != runtime.NumGoroutine() {
|
||||
t.Errorf("Monitor hour did not stop")
|
||||
}
|
||||
}
|
|
@ -14,6 +14,7 @@ go test ${1} -coverprofile=peer.cover.out -v ./peer
|
|||
go test ${1} -coverprofile=server.fetch.cover.out -v ./server/fetch
|
||||
go test ${1} -coverprofile=server.listen.cover.out -v ./server/listen
|
||||
go test ${1} -coverprofile=server.send.cover.out -v ./server/send
|
||||
go test ${1} -coverprofile=server.metrics.cover.out -v ./server/metrics
|
||||
go test ${1} -coverprofile=server.cover.out -v ./server
|
||||
echo "mode: set" > coverage.out && cat *.cover.out | grep -v mode: | sort -r | \
|
||||
awk '{if($1 != last) {print $0;last=$1}}' >> coverage.out
|
||||
|
|
Loading…
Reference in New Issue