diff --git a/peer/connections/connectionsmanager.go b/peer/connections/connectionsmanager.go index eb148ee..a4438ad 100644 --- a/peer/connections/connectionsmanager.go +++ b/peer/connections/connectionsmanager.go @@ -96,7 +96,7 @@ func (m *Manager) AttemptReconnections() { for { select { - case <-time.After(timeout * time.Second): + case <-time.After(timeout): m.lock.Lock() for _, ppc := range m.peerConnections { if ppc.GetState() == FAILED { @@ -114,7 +114,7 @@ func (m *Manager) AttemptReconnections() { m.lock.Unlock() // Launch Another Run In 30 Seconds - timeout = time.Duration(30) + timeout = time.Duration(30 * time.Second) case <-m.breakChannel: return } diff --git a/server/metrics/metrics.go b/server/metrics/metrics.go new file mode 100644 index 0000000..13a53e9 --- /dev/null +++ b/server/metrics/metrics.go @@ -0,0 +1,156 @@ +package metrics + +import ( + "sync" + "sync/atomic" + "time" +) + +type counter struct { + startTime time.Time + count uint64 +} + +// Counter provides a threadsafe counter to use for storing long running counts +type Counter interface { + Add(unit uint64) + + Count() uint64 + GetStarttime() time.Time +} + +// NewCounter initializes a counter starting at time.Now() and a count of 0 and returns it +func NewCounter() Counter { + c := &counter{startTime: time.Now(), count: 0} + return c +} + +// Add atomically adds unit to the counter's count +func (c *counter) Add(unit uint64) { + atomic.AddUint64(&c.count, unit) +} + +// Count returns the current count, read atomically +func (c *counter) Count() uint64 { + return atomic.LoadUint64(&c.count) +} + +// GetStarttime returns the time recorded when NewCounter created the counter +func (c *counter) GetStarttime() time.Time { + return c.startTime +} + +type monitorHistory struct { + starttime time.Time + perMinutePerHour [60]int + perHourForDay [24]int + perDayForWeek [7]int + perWeekForMonth [4]int + perMonthForYear [12]int + + monitor func() int + + breakChannel chan bool + lock sync.Mutex +} + +// MonitorHistory runs a monitor every 
minute and rotates and averages the results out across time +type MonitorHistory interface { + Start() + Stop() + + Minutes() []int + Hours() []int + Days() []int + Weeks() []int + Months() []int +} + +// NewMonitorHistory returns a new MonitorHistory with starttime of time.Now that is already started and running the supplied monitor +func NewMonitorHistory(monitor func() int) MonitorHistory { + mh := &monitorHistory{starttime: time.Now(), monitor: monitor, breakChannel: make(chan bool)} + mh.Start() + return mh +} + +// Start launches a monitorHistory goroutine that runs the monitor at intervals and rotates history; NewMonitorHistory already calls it once and every further call launches another goroutine +func (mh *monitorHistory) Start() { + go mh.monitorThread() +} + +// Stop stops a monitorHistory goroutine by sending on the unbuffered break channel, so it blocks until a running goroutine receives the signal +func (mh *monitorHistory) Stop() { + mh.breakChannel <- true +} + +// Minutes returns a copy of the 60 most recent monitor results, newest first +func (mh *monitorHistory) Minutes() []int { + return mh.returnCopy(mh.perMinutePerHour[:]) +} + +// Hours returns a copy of the 24 hour slots, each an average over the minute slots, newest first. NOTE(review): monitorThread rotates this array on every pass, not once per hour +func (mh *monitorHistory) Hours() []int { + return mh.returnCopy(mh.perHourForDay[:]) +} + +// Days returns a copy of the 7 day slots, each an average over the hour slots, newest first +func (mh *monitorHistory) Days() []int { + return mh.returnCopy(mh.perDayForWeek[:]) +} + +// Weeks returns a copy of the 4 week slots, each an average over the day slots, newest first +func (mh *monitorHistory) Weeks() []int { + return mh.returnCopy(mh.perWeekForMonth[:]) +} + +// Months returns a copy of the 12 month slots, each an average over the week slots, newest first +func (mh *monitorHistory) Months() []int { + return mh.returnCopy(mh.perMonthForYear[:]) +} + +func (mh *monitorHistory) returnCopy(slice []int) []int { + retSlice := make([]int, len(slice)) + mh.lock.Lock() + for i, v := range slice { + retSlice[i] = v + } + mh.lock.Unlock() + return retSlice +} + +func rotateAndAvg(array []int, newVal int) int { + total := 0 + for i := len(array) - 1; i > 0; i-- { + array[i] = array[i-1] + total += array[i] + } + array[0] = newVal + total += newVal + return total / len(array) +} + +// 
monitorThread is the goroutine in a monitorHistory that does per minute monitoring and rotation +func (mh *monitorHistory) monitorThread() { + timeout := time.Duration(0) // zero delay so the first pass runs right away + + for { + select { + case <-time.After(timeout): + mh.lock.Lock() // held while mh.monitor() runs and the histories rotate + + minuteAvg := rotateAndAvg(mh.perMinutePerHour[:], mh.monitor()) + hourAvg := rotateAndAvg(mh.perHourForDay[:], minuteAvg) + dayAvg := rotateAndAvg(mh.perDayForWeek[:], hourAvg) + weekAvg := rotateAndAvg(mh.perWeekForMonth[:], dayAvg) + rotateAndAvg(mh.perMonthForYear[:], weekAvg) + + mh.lock.Unlock() + + // Repeat every minute; NOTE(review): each pass rotates all five histories, not only the minute one + timeout = time.Duration(time.Minute) + + case <-mh.breakChannel: + return + } + } +} diff --git a/server/metrics/metrics_test.go b/server/metrics/metrics_test.go new file mode 100644 index 0000000..f6bec06 --- /dev/null +++ b/server/metrics/metrics_test.go @@ -0,0 +1,69 @@ +package metrics + +import ( + "runtime" + "testing" + "time" +) + +func TestCounter(t *testing.T) { + starttime := time.Now() + c := NewCounter() + + for i := 0; i < 100; i++ { + go func() { + c.Add(1) + }() + } + + time.Sleep(2 * time.Second) + + val := c.Count() + if val != 100 { + t.Errorf("counter count was not 100") + } + + counterStart := c.GetStarttime() + + if counterStart.Sub(starttime) > time.Millisecond { + t.Error("counter's starttime was innaccurate") + } +} + +func TestMonitorHistory(t *testing.T) { + value := 60 * 24 * 7 * 4 * 12 + numGoRoutinesStart := runtime.NumGoroutine() + mh := NewMonitorHistory(func() int { return value }) + time.Sleep(1 * time.Second) + mh.Stop() + time.Sleep(1 * time.Second) + + minutes := mh.Minutes() + if minutes[0] != value { + t.Errorf("Monitor minutes's first minutes slot had the wrong value. Expected: %d Actual: %d", value, minutes[0]) + } + + hours := mh.Hours() + if hours[0] != value/60 { + t.Errorf("Monitor hour's first hour slot had the wrong value. 
Expected: %d Actual: %d", value/60, hours[0]) + } + + days := mh.Days() + if days[0] != value/60/24 { // assumes a single pass ran: value/60 averaged again over the 24 hour slots + t.Errorf("Monitor day's first day slot had the wrong value. Expected: %d Actual: %d", value/60/24, days[0]) + } + + weeks := mh.Weeks() + if weeks[0] != value/60/24/7 { + t.Errorf("Monitor week's first day slot had the wrong value. Expected: %d Actual: %d", value/60/24/7, weeks[0]) + } + + months := mh.Months() + if months[0] != 12 { // value/60/24/7/4 with value = 60*24*7*4*12 + t.Errorf("Monitor months's first day slot had the wrong value. Expected: %d Actual: %d", 12, months[0]) + } + + if numGoRoutinesStart != runtime.NumGoroutine() { // the monitor goroutine should have exited after Stop + t.Errorf("Monitor hour did not stop") + } +} diff --git a/testing/tests.sh b/testing/tests.sh index 25e238f..2ce493c 100755 --- a/testing/tests.sh +++ b/testing/tests.sh @@ -14,6 +14,7 @@ go test ${1} -coverprofile=peer.cover.out -v ./peer go test ${1} -coverprofile=server.fetch.cover.out -v ./server/fetch go test ${1} -coverprofile=server.listen.cover.out -v ./server/listen go test ${1} -coverprofile=server.send.cover.out -v ./server/send +go test ${1} -coverprofile=server.metrics.cover.out -v ./server/metrics go test ${1} -coverprofile=server.cover.out -v ./server echo "mode: set" > coverage.out && cat *.cover.out | grep -v mode: | sort -r | \ awk '{if($1 != last) {print $0;last=$1}}' >> coverage.out