Merge branch 'server-metics-fix' of dan/cwtch into master

This commit is contained in:
Sarah Jamie Lewis 2018-06-27 16:46:00 +00:00 committad av Gogs
förälder 21817f936d b70fdf1af2
incheckning 9ab582750f
10 ändrade filer med 272 tillägg och 90 borttagningar

Visa fil

@ -85,7 +85,7 @@ func TestPeerPeerConnection(t *testing.T) {
t.Errorf("Onion address error %v", err)
}
profile := model.GenerateNewProfile("sarah")
profile := model.GenerateNewProfile("alice")
ppc := NewPeerPeerConnection("127.0.0.1:5452|"+onionAddr, profile)
//numcalls := 0
tp := new(TestPeer)

Visa fil

@ -1,7 +1,7 @@
{
"maxBufferLines": 100000
serverReporting: {
reportingGroupId: ""
reportingServerAddr: ""
"maxBufferLines": 100000,
"serverReporting": {
"reportingGroupId": "",
"reportingServerAddr": ""
}
}

Visa fil

@ -1,6 +1,9 @@
package metrics
import (
"bufio"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
@ -13,9 +16,10 @@ type counter struct {
// Counter provides a threadsafe counter to use for storing long running counts
type Counter interface {
Add(unit uint64)
Add(unit int)
Reset()
Count() uint64
Count() int
GetStarttime() time.Time
}
@ -26,13 +30,18 @@ func NewCounter() Counter {
}
// Add adds a count of unit to the counter
func (c *counter) Add(unit uint64) {
atomic.AddUint64(&c.count, unit)
func (c *counter) Add(unit int) {
atomic.AddUint64(&c.count, uint64(unit))
}
// Count returns the count since Start
func (c *counter) Count() uint64 {
return atomic.LoadUint64(&c.count)
func (c *counter) Count() int {
return int(atomic.LoadUint64(&c.count))
}
// Reset sets the count back to zero and restarts the counter's start time at
// time.Now.
//
// The count is cleared with an atomic store; startTime is a plain assignment.
// NOTE(review): confirm no concurrent GetStarttime caller needs that write to
// be synchronized.
func (c *counter) Reset() {
atomic.StoreUint64(&c.count, 0)
c.startTime = time.Now()
}
// GetStarttime returns the starttime of the counter
@ -40,15 +49,33 @@ func (c *counter) GetStarttime() time.Time {
return c.startTime
}
type monitorHistory struct {
starttime time.Time
perMinutePerHour [60]int
perHourForDay [24]int
perDayForWeek [7]int
perWeekForMonth [4]int
perMonthForYear [12]int
// MonitorType controls how the monitor will report itself
type MonitorType int
monitor func() int
// Reporting formats a monitor can select; reportLine switches on these values.
const (
// Count indicates the monitor should report in integer format
Count MonitorType = iota
// Percent indicates the monitor should report in decimal format with 2 places
Percent
// MegaBytes indicates the monitor should transform the raw number into MBs
// (reportLine divides the raw value by 1024 twice)
MegaBytes
)
type monitorHistory struct {
monitorType MonitorType
starttime time.Time
perMinutePerHour [60]float64
timeLastHourRotate time.Time
perHourForDay [24]float64
timeLastDayRotate time.Time
perDayForWeek [7]float64
timeLastWeekRotate time.Time
perWeekForMonth [4]float64
timeLastMonthRotate time.Time
perMonthForYear [12]float64
monitor func() float64
breakChannel chan bool
lock sync.Mutex
@ -59,16 +86,18 @@ type MonitorHistory interface {
Start()
Stop()
Minutes() []int
Hours() []int
Days() []int
Weeks() []int
Months() []int
Minutes() []float64
Hours() []float64
Days() []float64
Weeks() []float64
Months() []float64
Report(w *bufio.Writer)
}
// NewMonitorHistory returns a new MonitorHistory with starttime of time.Now and Started running with supplied monitor
func NewMonitorHistory(monitor func() int) MonitorHistory {
mh := &monitorHistory{starttime: time.Now(), monitor: monitor, breakChannel: make(chan bool)}
func NewMonitorHistory(t MonitorType, monitor func() float64) MonitorHistory {
mh := &monitorHistory{monitorType: t, starttime: time.Now(), monitor: monitor, breakChannel: make(chan bool)}
mh.Start()
return mh
}
@ -84,32 +113,56 @@ func (mh *monitorHistory) Stop() {
}
// Minutes returns the last 60 minute monitoring results
func (mh *monitorHistory) Minutes() []int {
func (mh *monitorHistory) Minutes() []float64 {
return mh.returnCopy(mh.perMinutePerHour[:])
}
// Hours returns the last 24 hourly averages of monitor results
func (mh *monitorHistory) Hours() []int {
func (mh *monitorHistory) Hours() []float64 {
return mh.returnCopy(mh.perHourForDay[:])
}
// Days returns the last 7 day averages of monitor results
func (mh *monitorHistory) Days() []int {
func (mh *monitorHistory) Days() []float64 {
return mh.returnCopy(mh.perDayForWeek[:])
}
// Weeks returns the last 4 weeks of averages of monitor results
func (mh *monitorHistory) Weeks() []int {
func (mh *monitorHistory) Weeks() []float64 {
return mh.returnCopy(mh.perWeekForMonth[:])
}
// Months returns the last 12 months of averages of monitor results
func (mh *monitorHistory) Months() []int {
func (mh *monitorHistory) Months() []float64 {
return mh.returnCopy(mh.perMonthForYear[:])
}
func (mh *monitorHistory) returnCopy(slice []int) []int {
retSlice := make([]int, len(slice))
// Report writes the monitor's history to w as five labelled lines (minutes,
// hours, days, weeks, months), each formatted according to the monitor's type.
//
// The caller owns w and is responsible for flushing it.
func (mh *monitorHistory) Report(w *bufio.Writer) {
	// Read through the accessor methods instead of the arrays themselves:
	// they return copies made while holding mh.lock, so the report cannot
	// observe a history array in the middle of a rotation by monitorThread.
	fmt.Fprintln(w, "Minutes:", reportLine(mh.monitorType, mh.Minutes()))
	fmt.Fprintln(w, "Hours: ", reportLine(mh.monitorType, mh.Hours()))
	fmt.Fprintln(w, "Days: ", reportLine(mh.monitorType, mh.Days()))
	fmt.Fprintln(w, "Weeks: ", reportLine(mh.monitorType, mh.Weeks()))
	fmt.Fprintln(w, "Months: ", reportLine(mh.monitorType, mh.Months()))
}
// reportLine renders the samples in array as a single space-separated line.
//
// The monitor type t selects the per-value format:
//   - Count:     rounded to a whole number, never in exponent notation
//   - Percent:   fixed two decimal places
//   - MegaBytes: the raw value truncated to an int, divided by 1024*1024 and
//     suffixed with "MBs"
//
// An unknown monitor type or an empty array yields the empty string.
func reportLine(t MonitorType, array []float64) string {
	var format func(v float64) string
	switch t {
	case Count:
		// %.0f keeps large counts readable (1500000, not 1.5e+06) and drops
		// the fractional tail that averaged slots would otherwise carry.
		format = func(v float64) string { return fmt.Sprintf("%.0f", v) }
	case Percent:
		format = func(v float64) string { return fmt.Sprintf("%.2f", v) }
	case MegaBytes:
		format = func(v float64) string { return fmt.Sprintf("%dMBs", int(v)/1024/1024) }
	default:
		return ""
	}

	fields := make([]string, len(array))
	for i, v := range array {
		fields[i] = format(v)
	}
	return strings.Join(fields, " ")
}
func (mh *monitorHistory) returnCopy(slice []float64) []float64 {
retSlice := make([]float64, len(slice))
mh.lock.Lock()
for i, v := range slice {
retSlice[i] = v
@ -118,15 +171,22 @@ func (mh *monitorHistory) returnCopy(slice []int) []int {
return retSlice
}
func rotateAndAvg(array []int, newVal int) int {
total := 0
func rotateAndAvg(array []float64, newVal float64) float64 {
total := float64(0.0)
for i := len(array) - 1; i > 0; i-- {
array[i] = array[i-1]
total += array[i]
}
array[0] = newVal
total += newVal
return total / len(array)
return total / float64(len(array))
}
// average returns the arithmetic mean of the values in array, summing them
// front to back and dividing by the number of elements.
func average(array []float64) float64 {
	var sum float64
	for i := 0; i < len(array); i++ {
		sum += array[i]
	}
	count := float64(len(array))
	return sum / count
}
// monitorThread is the goroutine in a monitorHistory that does per minute monitoring and rotation
@ -139,10 +199,26 @@ func (mh *monitorHistory) monitorThread() {
mh.lock.Lock()
minuteAvg := rotateAndAvg(mh.perMinutePerHour[:], mh.monitor())
hourAvg := rotateAndAvg(mh.perHourForDay[:], minuteAvg)
dayAvg := rotateAndAvg(mh.perDayForWeek[:], hourAvg)
weekAvg := rotateAndAvg(mh.perWeekForMonth[:], dayAvg)
rotateAndAvg(mh.perMonthForYear[:], weekAvg)
if time.Now().Sub(mh.timeLastHourRotate) > time.Hour {
rotateAndAvg(mh.perHourForDay[:], minuteAvg)
mh.timeLastHourRotate = time.Now()
}
if time.Now().Sub(mh.timeLastDayRotate) > time.Hour*24 {
rotateAndAvg(mh.perDayForWeek[:], average(mh.perHourForDay[:]))
mh.timeLastDayRotate = time.Now()
}
if time.Now().Sub(mh.timeLastWeekRotate) > time.Hour*24*7 {
rotateAndAvg(mh.perWeekForMonth[:], average(mh.perDayForWeek[:]))
mh.timeLastWeekRotate = time.Now()
}
if time.Now().Sub(mh.timeLastMonthRotate) > time.Hour*24*7*4 {
rotateAndAvg(mh.perMonthForYear[:], average(mh.perWeekForMonth[:]))
mh.timeLastMonthRotate = time.Now()
}
mh.lock.Unlock()

Visa fil

@ -31,36 +31,36 @@ func TestCounter(t *testing.T) {
}
func TestMonitorHistory(t *testing.T) {
value := 60 * 24 * 7 * 4 * 12
value := float64(60 * 24 * 7 * 4 * 12)
numGoRoutinesStart := runtime.NumGoroutine()
mh := NewMonitorHistory(func() int { return value })
mh := NewMonitorHistory(Count, func() float64 { return value })
time.Sleep(1 * time.Second)
mh.Stop()
time.Sleep(1 * time.Second)
minutes := mh.Minutes()
if minutes[0] != value {
t.Errorf("Monitor minutes's first minutes slot had the wrong value. Expected: %d Actual: %d", value, minutes[0])
t.Errorf("Monitor minutes's first minutes slot had the wrong value. Expected: %f Actual: %f", value, minutes[0])
}
hours := mh.Hours()
if hours[0] != value/60 {
t.Errorf("Monitor hour's first hour slot had the wrong value. Expected: %d Actual: %d", value, hours[0])
t.Errorf("Monitor hour's first hour slot had the wrong value. Expected: %f Actual: %f", value, hours[0])
}
days := mh.Days()
if days[0] != value/60/24 {
t.Errorf("Monitor day's first day slot had the wrong value. Expected: %d Actual: %d", value/24, days[0])
t.Errorf("Monitor day's first day slot had the wrong value. Expected: %f Actual: %f", value/24, days[0])
}
weeks := mh.Weeks()
if weeks[0] != value/60/24/7 {
t.Errorf("Monitor week's first day slot had the wrong value. Expected: %d Actual: %d", value/24/7, weeks[0])
t.Errorf("Monitor week's first day slot had the wrong value. Expected: %f Actual: %f", value/24/7, weeks[0])
}
months := mh.Months()
if months[0] != 12 {
t.Errorf("Monitor months's first day slot had the wrong value. Expected: %d Actual: %d", 12, months[0])
t.Errorf("Monitor months's first day slot had the wrong value. Expected: %d Actual: %f", 12, months[0])
}
if numGoRoutinesStart != runtime.NumGoroutine() {

Visa fil

@ -0,0 +1,93 @@
package metrics
import (
"bufio"
"fmt"
"git.openprivacy.ca/openprivacy/libricochet-go/application"
"github.com/struCoder/pidusage"
"log"
"os"
"time"
)
const (
reportFile = "serverMonitorReport.txt"
)
// Monitors is a package of metrics for a Cwtch Server including message count, CPU, Mem, and conns.
// All fields are populated by Start; Stop shuts the histories and the logging goroutine down.
type Monitors struct {
// MessageCounter is the raw counter created in Start; Start's Messages
// monitor reads and resets it on every sample.
MessageCounter Counter
// Messages is the history of MessageCounter samples (Count format).
Messages MonitorHistory
// CPU is the history of this process's CPU usage as reported by pidusage (Percent format).
CPU MonitorHistory
// Memory is the history of this process's memory usage as reported by pidusage (MegaBytes format).
Memory MonitorHistory
// ClientConns is the history of the application's connection count (Count format).
ClientConns MonitorHistory
// starttime is when Start was called; it is written at the top of the report file.
starttime time.Time
// breakChannel tells the logging goroutine to exit; Stop sends on it.
breakChannel chan bool
}
// Start initializes a Monitors's monitors.
//
// It records the start time, creates the message counter and starts one
// MonitorHistory per metric (messages, CPU, memory, client connections of ra).
// It then launches the logging goroutine, which runs until Stop is called.
func (mp *Monitors) Start(ra *application.RicochetApplication) {
	mp.starttime = time.Now()
	mp.breakChannel = make(chan bool)
	mp.MessageCounter = NewCounter()

	// Each sample reports the messages seen since the previous sample, so the
	// counter is reset after it is read. Messages added between the read and
	// the reset are not counted.
	mp.Messages = NewMonitorHistory(Count, func() float64 {
		c := float64(mp.MessageCounter.Count())
		mp.MessageCounter.Reset()
		return c
	})

	// A failed stat read is logged and recorded as 0 rather than
	// dereferencing a result that may not be valid.
	mp.CPU = NewMonitorHistory(Percent, func() float64 {
		sysInfo, err := pidusage.GetStat(os.Getpid())
		if err != nil {
			log.Println("ERROR: Could not read process CPU usage: ", err)
			return 0
		}
		return float64(sysInfo.CPU)
	})
	mp.Memory = NewMonitorHistory(MegaBytes, func() float64 {
		sysInfo, err := pidusage.GetStat(os.Getpid())
		if err != nil {
			log.Println("ERROR: Could not read process memory usage: ", err)
			return 0
		}
		return float64(sysInfo.Memory)
	})

	mp.ClientConns = NewMonitorHistory(Count, func() float64 { return float64(ra.ConnectionCount()) })

	// Todo: replace with proper reporting
	go mp.log()
}
// log runs until a value arrives on breakChannel. Once a minute it logs the
// newest per-minute sample of every monitor and rewrites the report file.
func (mp *Monitors) log() {
	for {
		tick := time.After(time.Minute)
		select {
		case <-tick:
			// Slot 0 of each Minutes() slice is the sample logged here.
			messages := mp.Messages.Minutes()[0]
			cpu := mp.CPU.Minutes()[0]
			memoryRaw := mp.Memory.Minutes()[0]
			clientConns := mp.ClientConns.Minutes()[0]
			log.Printf("METRICS: Messages: %.0f ClientConns: %.0f CPU: %.2f Memory: %.0fMBs\n", messages, clientConns, cpu, memoryRaw/1024/1024)
			mp.report()
		case <-mp.breakChannel:
			return
		}
	}
}
// report rewrites reportFile with the start time followed by the full history
// of every monitor (messages, client connections, CPU, memory).
//
// Failures to create, write or close the file are logged; none of them is
// returned, since the caller is the periodic logging loop.
func (mp *Monitors) report() {
	f, err := os.Create(reportFile)
	if err != nil {
		log.Println("ERROR: Could not open monitor reporting file: ", err)
		return
	}
	defer func() {
		if err := f.Close(); err != nil {
			log.Println("ERROR: Could not close monitor reporting file: ", err)
		}
	}()

	w := bufio.NewWriter(f)

	fmt.Fprintf(w, "Started: %v\n\n", mp.starttime)

	fmt.Fprintln(w, "Messages:")
	mp.Messages.Report(w)

	fmt.Fprintln(w, "\nClient Connections:")
	mp.ClientConns.Report(w)

	fmt.Fprintln(w, "\nCPU:")
	mp.CPU.Report(w)

	fmt.Fprintln(w, "\nMemory:")
	mp.Memory.Report(w)

	// bufio.Writer remembers the first write error, so checking Flush also
	// surfaces any failure from the writes above.
	if err := w.Flush(); err != nil {
		log.Println("ERROR: Could not write monitor reporting file: ", err)
	}
}
// Stop stops all the monitors in a Monitors: it first signals the logging
// goroutine to exit, then stops each history in turn.
func (mp *Monitors) Stop() {
	mp.breakChannel <- true

	histories := []MonitorHistory{mp.Messages, mp.CPU, mp.Memory, mp.ClientConns}
	for _, history := range histories {
		history.Stop()
	}
}

Visa fil

@ -3,6 +3,7 @@ package server
import (
"cwtch.im/cwtch/server/fetch"
"cwtch.im/cwtch/server/listen"
"cwtch.im/cwtch/server/metrics"
"cwtch.im/cwtch/server/send"
"cwtch.im/cwtch/storage"
"git.openprivacy.ca/openprivacy/libricochet-go/application"
@ -12,8 +13,9 @@ import (
// Server encapsulates a complete, compliant Cwtch server.
type Server struct {
app *application.RicochetApplication
config *Config
app *application.RicochetApplication
config *Config
metricsPack metrics.Monitors
}
// Run starts a server with the given privateKey
@ -21,6 +23,7 @@ type Server struct {
func (s *Server) Run(serverConfig *Config) {
s.config = serverConfig
cwtchserver := new(application.RicochetApplication)
s.metricsPack.Start(cwtchserver)
l, err := application.SetupOnion("127.0.0.1:9051", "tcp4", "", s.config.PrivateKey(), 9878)
@ -31,10 +34,8 @@ func (s *Server) Run(serverConfig *Config) {
af := application.ApplicationInstanceFactory{}
af.Init()
ms := new(storage.MessageStore)
ms.Init("cwtch.messages", s.config.MaxBufferLines)
ms.Init("cwtch.messages", s.config.MaxBufferLines, s.metricsPack.MessageCounter)
af.AddHandler("im.cwtch.server.listen", func(rai *application.ApplicationInstance) func() channels.Handler {
si := new(Instance)
si.Init(rai, cwtchserver, ms)
return func() channels.Handler {
cslc := new(listen.CwtchServerListenChannel)
return cslc
@ -70,4 +71,5 @@ func (s *Server) Run(serverConfig *Config) {
// Shutdown kills the app closing all connections and freeing all goroutines
func (s *Server) Shutdown() {
s.app.Shutdown()
s.metricsPack.Stop()
}

Visa fil

@ -2,6 +2,7 @@ package server
import (
"cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/server/metrics"
"cwtch.im/cwtch/storage"
"git.openprivacy.ca/openprivacy/libricochet-go/application"
"os"
@ -15,7 +16,7 @@ func TestServerInstance(t *testing.T) {
ra := new(application.RicochetApplication)
msi := new(storage.MessageStore)
os.Remove("ms.test")
msi.Init("ms.test", 5)
msi.Init("ms.test", 5, metrics.NewCounter())
gm := protocol.GroupMessage{
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here."),
Spamguard: []byte{},

Visa fil

@ -3,6 +3,7 @@ package storage
import (
"bufio"
"cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/server/metrics"
"encoding/json"
"fmt"
"log"
@ -18,12 +19,13 @@ type MessageStoreInterface interface {
// MessageStore is a file-backed implementation of MessageStoreInterface
type MessageStore struct {
file *os.File
lock sync.Mutex
messages []*protocol.GroupMessage
bufferSize int
pos int
rotated bool
file *os.File
lock sync.Mutex
messages []*protocol.GroupMessage
messageCounter metrics.Counter
bufferSize int
pos int
rotated bool
}
// Close closes the message store and underlying resources.
@ -44,7 +46,7 @@ func (ms *MessageStore) updateBuffer(gm *protocol.GroupMessage) {
}
// Init sets up a MessageStore of size bufferSize backed by filename
func (ms *MessageStore) Init(filename string, bufferSize int) {
func (ms *MessageStore) Init(filename string, bufferSize int, messageCounter metrics.Counter) {
f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600)
if err != nil {
panic(err)
@ -54,6 +56,7 @@ func (ms *MessageStore) Init(filename string, bufferSize int) {
ms.bufferSize = bufferSize
ms.messages = make([]*protocol.GroupMessage, bufferSize)
ms.rotated = false
ms.messageCounter = messageCounter
scanner := bufio.NewScanner(f)
for scanner.Scan() {
@ -89,6 +92,7 @@ func (ms *MessageStore) FetchMessages() (messages []*protocol.GroupMessage) {
// AddMessage adds a GroupMessage to the store
func (ms *MessageStore) AddMessage(gm protocol.GroupMessage) {
ms.messageCounter.Add(1)
ms.lock.Lock()
ms.updateBuffer(&gm)
s, err := json.Marshal(gm)

Visa fil

@ -2,6 +2,7 @@ package storage
import (
"cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/server/metrics"
"os"
"strconv"
"testing"
@ -10,7 +11,8 @@ import (
func TestMessageStore(t *testing.T) {
os.Remove("ms.test")
ms := new(MessageStore)
ms.Init("ms.test", 100000)
counter := metrics.NewCounter()
ms.Init("ms.test", 100000, counter)
for i := 0; i < 50000; i++ {
gm := protocol.GroupMessage{
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)),
@ -18,13 +20,18 @@ func TestMessageStore(t *testing.T) {
}
ms.AddMessage(gm)
}
if counter.Count() != 50000 {
t.Errorf("Counter should be at 50000 was %v", counter.Count())
}
ms.Close()
ms.Init("ms.test", 100000)
ms.Init("ms.test", 100000, counter)
m := ms.FetchMessages()
if len(m) != 50000 {
t.Errorf("Should have been 5000 was %v", len(m))
t.Errorf("Should have been 50000 was %v", len(m))
}
counter.Reset()
for i := 0; i < 100000; i++ {
gm := protocol.GroupMessage{
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)),

Visa fil

@ -28,6 +28,7 @@ var (
carolLines = []string{"Howdy, thanks!"}
)
// TODO: fix to load private key from server/app/serverConfig.json
func loadPrivateKey(t *testing.T) *rsa.PrivateKey {
if _, err := os.Stat(serverKeyfile); os.IsNotExist(err) {
return nil
@ -86,7 +87,6 @@ func serverCheck(t *testing.T, serverAddr string) bool {
func waitForPeerConnection(t *testing.T, peer peer.CwtchPeerInterface, server string) {
for {
servers := peer.GetServers()
fmt.Println(servers)
state, ok := servers[server]
if ok {
if state == connections.FAILED {
@ -298,8 +298,8 @@ func TestCwtchPeerIntegration(t *testing.T) {
// ***** Verify Test *****
// final syncing time...
time.Sleep(time.Second * 15)
fmt.Println("Final syncing time...")
time.Sleep(time.Second * 30)
alicesGroup := alice.GetGroup(groupID)
if alicesGroup == nil {
@ -333,38 +333,37 @@ func TestCwtchPeerIntegration(t *testing.T) {
if len(alicesGroup.GetTimeline()) != 4 {
t.Errorf("Alice's timeline does not have all messages")
return
} else {
// check message 0,1,2,3
aliceGroupTimeline := alicesGroup.GetTimeline()
if aliceGroupTimeline[0].Message != aliceLines[0] || aliceGroupTimeline[1].Message != bobLines[0] ||
aliceGroupTimeline[2].Message != aliceLines[1] || aliceGroupTimeline[3].Message != bobLines[1] {
t.Errorf("Some of Alice's timeline messages did not have the expected content!")
}
}
if len(bobsGroup.GetTimeline()) != 6 {
t.Errorf("Bob's timeline does not have all messages")
}
// check message 0,1,2,3
aliceGroupTimeline := alicesGroup.GetTimeline()
if aliceGroupTimeline[0].Message != aliceLines[0] || aliceGroupTimeline[1].Message != bobLines[0] ||
aliceGroupTimeline[2].Message != aliceLines[1] || aliceGroupTimeline[3].Message != bobLines[1] {
t.Errorf("Some of Alice's timeline messages did not have the expected content!")
}
// check message 0,1,2,3,4,5
bobGroupTimeline := bobsGroup.GetTimeline()
if bobGroupTimeline[0].Message != aliceLines[0] || bobGroupTimeline[1].Message != bobLines[0] ||
bobGroupTimeline[2].Message != aliceLines[1] || bobGroupTimeline[3].Message != bobLines[1] ||
bobGroupTimeline[4].Message != bobLines[2] || bobGroupTimeline[5].Message != carolLines[0] {
t.Errorf("Some of Bob's timeline messages did not have the expected content!")
} else {
// check message 0,1,2,3,4,5
bobGroupTimeline := bobsGroup.GetTimeline()
if bobGroupTimeline[0].Message != aliceLines[0] || bobGroupTimeline[1].Message != bobLines[0] ||
bobGroupTimeline[2].Message != aliceLines[1] || bobGroupTimeline[3].Message != bobLines[1] ||
bobGroupTimeline[4].Message != bobLines[2] || bobGroupTimeline[5].Message != carolLines[0] {
t.Errorf("Some of Bob's timeline messages did not have the expected content!")
}
}
if len(carolsGroup.GetTimeline()) != 6 {
t.Errorf("Carol's timeline does not have all messages")
}
// check message 0,1,2,3,4,5
carolGroupTimeline := carolsGroup.GetTimeline()
if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] ||
carolGroupTimeline[2].Message != aliceLines[1] || carolGroupTimeline[3].Message != bobLines[1] ||
carolGroupTimeline[4].Message != bobLines[2] || carolGroupTimeline[5].Message != carolLines[0] {
t.Errorf("Some of Carol's timeline messages did not have the expected content!")
} else {
// check message 0,1,2,3,4,5
carolGroupTimeline := carolsGroup.GetTimeline()
if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] ||
carolGroupTimeline[2].Message != aliceLines[1] || carolGroupTimeline[3].Message != bobLines[1] ||
carolGroupTimeline[4].Message != bobLines[2] || carolGroupTimeline[5].Message != carolLines[0] {
t.Errorf("Some of Carol's timeline messages did not have the expected content!")
}
}
fmt.Println("Shutting down Bob...")