From 872bf30b1b87a71f4e041982c47a8f0378110c33 Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Wed, 24 Nov 2021 17:08:02 -0800 Subject: [PATCH] refactor monitors, remove pidusage, fix logging optionality, improve fetch stats, improve reporting --- app/main.go | 7 +-- go.mod | 3 +- go.sum | 43 ------------- metrics/metrics.go | 25 ++++++-- metrics/monitors.go | 115 +++++++++++++++++----------------- metrics/monitors_test.go | 5 +- server.go | 78 +++++++++++++++-------- serverConfig.go | 15 ++--- servers.go | 3 +- storage/message_store.go | 39 +++++++++--- storage/message_store_test.go | 3 +- 11 files changed, 174 insertions(+), 162 deletions(-) diff --git a/app/main.go b/app/main.go index 8909d49..3421a37 100644 --- a/app/main.go +++ b/app/main.go @@ -8,6 +8,7 @@ import ( "git.openprivacy.ca/cwtch.im/tapir/primitives" "git.openprivacy.ca/openprivacy/connectivity/tor" "git.openprivacy.ca/openprivacy/log" + _ "github.com/mattn/go-sqlite3" // sqlite3 driver "io/ioutil" mrand "math/rand" "os" @@ -42,9 +43,7 @@ func main() { config.TokenServerPublicKey = tid.PublicKey() config.MaxBufferLines = 100000 config.ServerReporting = cwtchserver.Reporting{ - LogMetricsToFile: true, - ReportingGroupID: "", - ReportingServerAddr: "", + LogMetricsToFile: true, } config.ConfigDir = "." config.FilePath = cwtchserver.ServerConfigFile @@ -53,7 +52,7 @@ func main() { return } - serverConfig, err := cwtchserver.LoadCreateDefaultConfigFile(configDir, cwtchserver.ServerConfigFile, false, "") + serverConfig, err := cwtchserver.LoadCreateDefaultConfigFile(configDir, cwtchserver.ServerConfigFile, false, "", true) if err != nil { log.Errorf("Could not load/create config file: %s\n", err) return diff --git a/go.mod b/go.mod index e48b5e6..8d0cf50 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,5 @@ require ( git.openprivacy.ca/openprivacy/log v1.0.3 github.com/gtank/ristretto255 v0.1.2 github.com/mattn/go-sqlite3 v1.14.7 - github.com/struCoder/pidusage v0.2.1 golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee -) \ No newline at end of file +) diff --git a/go.sum b/go.sum index a6605ec..82ba41e 100644 --- a/go.sum +++ b/go.sum @@ -1,32 +1,13 @@ -cwtch.im/cwtch v0.7.3 h1:3f2Q2XFgNtq8hVbEz7K++exS1gDsO+zmSO7X7Fqz8Mo= -cwtch.im/cwtch v0.7.3/go.mod h1:S0JRLSTwFM+kRrpOPKgUQn/loKe/fc6lLDnOFopRKq8= -cwtch.im/cwtch v0.8.0 h1:QDRaDBTXefFRPZPqUMtxoNhOcgXv0rl0bGjysSOmJX0= -cwtch.im/cwtch v0.8.0/go.mod h1:+SY/4ueF1U7mK+CX8hZFbtd+GC1lx/cReo110KgtQAw= -cwtch.im/cwtch v0.8.5 h1:W67jAF2oRwqWytbZEv1UeCqW0cU2x69tgUw8iy27xFA= -cwtch.im/cwtch v0.8.5/go.mod h1:5GHxaaeVnKeXSU64IvtCKzkqhU8DRiLoVM+tiBT8kkc= cwtch.im/cwtch v0.12.2 h1:I+ndKadCRCITw4SPbd+1cpRv+z/7iHjjTUv8OzRwTrE= cwtch.im/cwtch v0.12.2/go.mod h1:QpTkQK7MqNt0dQK9/pBk5VpkvFhy6xuoxJIn401B8fM= filippo.io/edwards25519 v1.0.0-rc.1 h1:m0VOOB23frXZvAOK44usCgLWvtsxIoMCTBGJZlpmGfU= filippo.io/edwards25519 v1.0.0-rc.1/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns= -git.openprivacy.ca/cwtch.im/tapir v0.4.0 h1:clG8uORt0NKEhT4P+Dpw1pzyUuYzYBMevGqn2pciKk8= -git.openprivacy.ca/cwtch.im/tapir v0.4.0/go.mod h1:eH6dZxXrhW0C4KZX18ksUa6XJCrEvtg8cJJ/Fy6gv+E= -git.openprivacy.ca/cwtch.im/tapir v0.4.1 h1:9LMpQX41IzecNNlRc1FZKXHg6wlFss679tFsa3vzb3Y= -git.openprivacy.ca/cwtch.im/tapir v0.4.1/go.mod h1:eH6dZxXrhW0C4KZX18ksUa6XJCrEvtg8cJJ/Fy6gv+E= -git.openprivacy.ca/cwtch.im/tapir v0.4.2 h1:bxMWZnVJXX4dqqOFS7ELW4iFkVL4GS8wiRkjRv5rJe8= -git.openprivacy.ca/cwtch.im/tapir v0.4.2/go.mod h1:eH6dZxXrhW0C4KZX18ksUa6XJCrEvtg8cJJ/Fy6gv+E= -git.openprivacy.ca/cwtch.im/tapir v0.4.4 h1:KyuTVmr9GYptTCeR7JDODjmhBBbnIBf9V3NSC4+6bHc= -git.openprivacy.ca/cwtch.im/tapir v0.4.4/go.mod h1:qMFTdmDZITc1BLP1jSW0gVpLmvpg+Zjsh5ek8StwbFE= git.openprivacy.ca/cwtch.im/tapir v0.4.9 h1:LXonlztwvI1F1++0IyomIcDH1/Bxzo+oN8YjGonNvjM= git.openprivacy.ca/cwtch.im/tapir v0.4.9/go.mod h1:p4bHo3DAO8wwimU6JAeZXbfPQ4jnoA2bV+4YvknWTNQ= git.openprivacy.ca/openprivacy/bine v0.0.4 h1:CO7EkGyz+jegZ4ap8g5NWRuDHA/56KKvGySR6OBPW+c= git.openprivacy.ca/openprivacy/bine v0.0.4/go.mod h1:13ZqhKyqakDsN/ZkQkIGNULsmLyqtXc46XBcnuXm/mU= -git.openprivacy.ca/openprivacy/connectivity v1.4.3 h1:i2Ad/U9FlL9dKr2bhRck7lJ8NoWyGtoEfUwoCyMT0fU= -git.openprivacy.ca/openprivacy/connectivity v1.4.3/go.mod h1:bR0Myx9nm2YzWtsThRelkNMV4Pp7sPDa123O1qsAbVo= -git.openprivacy.ca/openprivacy/connectivity v1.4.5 h1:UYMdCWPzEAP7LbqdMXGNXmfKjWlvfnKdmewBtnbgQRI= -git.openprivacy.ca/openprivacy/connectivity v1.4.5/go.mod h1:JVRCIdL+lAG6ohBFWiKeC/MN42nnC0sfFszR9XG6vPQ= git.openprivacy.ca/openprivacy/connectivity v1.5.0 h1:ZxsR/ZaVKXIkD2x6FlajZn62ciNQjamrI4i/5xIpdoQ= git.openprivacy.ca/openprivacy/connectivity v1.5.0/go.mod h1:UjQiGBnWbotmBzIw59B8H6efwDadjkKzm3RPT1UaIRw= -git.openprivacy.ca/openprivacy/log v1.0.1/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw= git.openprivacy.ca/openprivacy/log v1.0.2 h1:HLP4wsw4ljczFAelYnbObIs821z+jgMPCe8uODPnGQM= git.openprivacy.ca/openprivacy/log v1.0.2/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw= git.openprivacy.ca/openprivacy/log v1.0.3 h1:E/PMm4LY+Q9s3aDpfySfEDq/vYQontlvNj/scrPaga0= @@ -56,32 +37,16 @@ github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/struCoder/pidusage v0.1.3 h1:pZcSa6asBE38TJtW0Nui6GeCjLTpaT/jAnNP7dUTLSQ= -github.com/struCoder/pidusage v0.1.3/go.mod h1:pWBlW3YuSwRl6h7R5KbvA4N8oOqe9LjaKW5CwT1SPjI= -github.com/struCoder/pidusage v0.2.1 h1:dFiEgUDkubeIj0XA1NpQ6+8LQmKrLi7NiIQl86E6BoY= -github.com/struCoder/pidusage v0.2.1/go.mod h1:bewtP2KUA1TBUyza5+/PCpSQ6sc/H6jJbIKAzqW86BA= -github.com/yuin/goldmark v1.3.5 h1:dPmz1Snjq0kmkz159iL7S6WzdahUTHnHB5M56WFVifs= -github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.etcd.io/bbolt v1.3.4 h1:hi1bXHMVrlQh6WwxAy+qZCV/SYIlqo+Ushwdpa4tAKg= go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee h1:4yd7jl+vXjalO5ztz6Vc1VADv+S/80LGJmyl1ROJ2AI= golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug= -golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= -golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= -golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo= -golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -98,14 +63,6 @@ golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e h1:FDhOuMEY4JVRztM/gsbk+IKUQ8kj74bxZrgw87eMMVc= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= -golang.org/x/tools v0.1.2 h1:kRBLX7v7Af8W7Gdbbc908OJcdgtK8bOz9Uaj8/F1ACA= -golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= diff --git a/metrics/metrics.go b/metrics/metrics.go index 589c909..551d340 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -71,6 +71,7 @@ const ( Average ) +// TODO port to SQLite for persistence between runs? type monitorHistory struct { monitorType MonitorType monitorAccumulation MonitorAccumulation @@ -150,13 +151,25 @@ func (mh *monitorHistory) Months() []float64 { return mh.returnCopy(mh.perMonthForYear[:]) } +const timeDay = time.Hour * 24 +const timeWeek = timeDay * 7 +const timeMonth = timeDay * 28 + func (mh *monitorHistory) Report(w *bufio.Writer) { mh.lock.Lock() fmt.Fprintln(w, "Minutes:", reportLine(mh.monitorType, mh.perMinutePerHour[:])) - fmt.Fprintln(w, "Hours: ", reportLine(mh.monitorType, mh.perHourForDay[:])) - fmt.Fprintln(w, "Days: ", reportLine(mh.monitorType, mh.perDayForWeek[:])) - fmt.Fprintln(w, "Weeks: ", reportLine(mh.monitorType, mh.perWeekForMonth[:])) - fmt.Fprintln(w, "Months: ", reportLine(mh.monitorType, mh.perMonthForYear[:])) + if time.Since(mh.starttime) >= time.Hour { + fmt.Fprintln(w, "Hours: ", reportLine(mh.monitorType, mh.perHourForDay[:])) + } + if time.Since(mh.starttime) >= timeDay { + fmt.Fprintln(w, "Days: ", reportLine(mh.monitorType, mh.perDayForWeek[:])) + } + if time.Since(mh.starttime) >= timeWeek { + fmt.Fprintln(w, "Weeks: ", reportLine(mh.monitorType, mh.perWeekForMonth[:])) + } + if time.Since(mh.starttime) >= timeMonth { + fmt.Fprintln(w, "Months: ", reportLine(mh.monitorType, mh.perMonthForYear[:])) + } mh.lock.Unlock() } @@ -215,10 +228,10 @@ func (mh *monitorHistory) monitorThread() { case <-time.After(time.Minute): mh.lock.Lock() - minuteAvg := rotateAndAccumulate(mh.perMinutePerHour[:], mh.monitor(), mh.monitorAccumulation) + minuteAcc := rotateAndAccumulate(mh.perMinutePerHour[:], mh.monitor(), mh.monitorAccumulation) if time.Since(mh.timeLastHourRotate) > time.Hour { - rotateAndAccumulate(mh.perHourForDay[:], minuteAvg, mh.monitorAccumulation) + rotateAndAccumulate(mh.perHourForDay[:], minuteAcc, mh.monitorAccumulation) mh.timeLastHourRotate = time.Now() } diff --git a/metrics/monitors.go b/metrics/monitors.go index e445531..61fb3ec 100644 --- a/metrics/monitors.go +++ b/metrics/monitors.go @@ -5,7 +5,6 @@ import ( "fmt" "git.openprivacy.ca/cwtch.im/tapir" "git.openprivacy.ca/openprivacy/log" - "github.com/struCoder/pidusage" "os" "path" "runtime" @@ -17,70 +16,48 @@ const ( reportFile = "serverMonitorReport.txt" ) +type MessageCountFn func() int + // Monitors is a package of metrics for a Cwtch Server including message count, CPU, Mem, and conns type Monitors struct { - MessageCounter Counter - TotalMessageCounter Counter - Messages MonitorHistory - CPU MonitorHistory - Memory MonitorHistory - ClientConns MonitorHistory - starttime time.Time - breakChannel chan bool - log bool - configDir string + MessageCounter Counter + Messages MonitorHistory + Memory MonitorHistory + ClientConns MonitorHistory + messageCountFn MessageCountFn + starttime time.Time + breakChannel chan bool + log bool + configDir string + running bool + lock sync.Mutex +} + +func bToMb(b uint64) uint64 { + return b / 1024 / 1024 } // Start initializes a Monitors's monitors -func (mp *Monitors) Start(ts tapir.Service, configDir string, doLogging bool) { +func (mp *Monitors) Start(ts tapir.Service, mcfn MessageCountFn, configDir string, doLogging bool) { mp.log = doLogging mp.configDir = configDir mp.starttime = time.Now() mp.breakChannel = make(chan bool) mp.MessageCounter = NewCounter() + mp.messageCountFn = mcfn - // Maintain a count of total messages - mp.TotalMessageCounter = NewCounter() mp.Messages = NewMonitorHistory(Count, Cumulative, func() (c float64) { c = float64(mp.MessageCounter.Count()) - mp.TotalMessageCounter.Add(int(c)) mp.MessageCounter.Reset() return }) - var pidUsageLock sync.Mutex - // pidusage doesn't support windows - if runtime.GOOS != "windows" { - mp.CPU = NewMonitorHistory(Percent, Average, func() float64 { - pidUsageLock.Lock() - defer pidUsageLock.Unlock() - sysInfo, err := pidusage.GetStat(os.Getpid()) - if err != nil { - log.Errorf("pidusage.GetStat failed with: %s", err) - return 0.0 - } - return float64(sysInfo.CPU) - }) - mp.Memory = NewMonitorHistory(MegaBytes, Average, func() float64 { - pidUsageLock.Lock() - defer pidUsageLock.Unlock() - sysInfo, err := pidusage.GetStat(os.Getpid()) - if err != nil { - log.Errorf("pidusage.GetStat failed with: %s", err) - return 0.0 - } - return float64(sysInfo.Memory) - }) - } else { - mp.CPU = NewMonitorHistory(Percent, Average, func() float64 { - return 0.0 - }) - mp.Memory = NewMonitorHistory(MegaBytes, Average, func() float64 { - return 0.0 - }) - } + mp.Memory = NewMonitorHistory(MegaBytes, Average, func() float64 { + var m runtime.MemStats + runtime.ReadMemStats(&m) + return float64(bToMb(m.Sys)) + }) - // TODO: replace with ts. mp.ClientConns = NewMonitorHistory(Count, Average, func() float64 { return float64(ts.Metrics().ConnectionCount) }) if mp.log { @@ -89,16 +66,34 @@ func (mp *Monitors) Start(ts tapir.Service, configDir string, doLogging bool) { } func (mp *Monitors) run() { + mp.running = true for { select { case <-time.After(time.Minute): + mp.lock.Lock() mp.report() + mp.lock.Unlock() case <-mp.breakChannel: + mp.lock.Lock() + mp.running = false + mp.lock.Unlock() return } } } +func FormatDuration(ts time.Duration) string { + const ( + Day = 24 * time.Hour + ) + d := ts / Day + ts = ts % Day + h := ts / time.Hour + ts = ts % time.Hour + m := ts / time.Minute + return fmt.Sprintf("%dd%dh%dm", d, h, m) +} + func (mp *Monitors) report() { f, err := os.Create(path.Join(mp.configDir, reportFile)) if err != nil { @@ -109,18 +104,16 @@ func (mp *Monitors) report() { w := bufio.NewWriter(f) - fmt.Fprintf(w, "Uptime: %v\n\n", time.Since(mp.starttime)) + fmt.Fprintf(w, "Uptime: %v \n", FormatDuration(time.Since(mp.starttime))) + fmt.Fprintf(w, "Total Messages: %v \n\n", mp.messageCountFn()) - fmt.Fprintln(w, "messages:") + fmt.Fprintln(w, "Messages:") mp.Messages.Report(w) fmt.Fprintln(w, "\nClient Connections:") mp.ClientConns.Report(w) - fmt.Fprintln(w, "\nCPU:") - mp.CPU.Report(w) - - fmt.Fprintln(w, "\nMemory:") + fmt.Fprintln(w, "\nSys Memory:") mp.Memory.Report(w) w.Flush() @@ -128,11 +121,15 @@ func (mp *Monitors) report() { // Stop stops all the monitors in a Monitors func (mp *Monitors) Stop() { - if mp.log { - mp.breakChannel <- true + mp.lock.Lock() + running := mp.running + mp.lock.Unlock() + if running { + if mp.log { + mp.breakChannel <- true + } + mp.Messages.Stop() + mp.Memory.Stop() + mp.ClientConns.Stop() } - mp.Messages.Stop() - mp.CPU.Stop() - mp.Memory.Stop() - mp.ClientConns.Stop() } diff --git a/metrics/monitors_test.go b/metrics/monitors_test.go index 1da9c26..a587552 100644 --- a/metrics/monitors_test.go +++ b/metrics/monitors_test.go @@ -15,13 +15,14 @@ func TestMonitors(t *testing.T) { os.Mkdir("testLog", 0700) service := new(tor2.BaseOnionService) mp := Monitors{} - mp.Start(service, "testLog", true) + mp.Start(service, func() int { return 1 }, "testLog", true) + mp.MessageCounter.Add(1) log.Infof("sleeping for minute to give to for monitors to trigger...") // wait a minute for it to trigger time.Sleep(62 * time.Second) // it didn't segfault? that's good, did it create a log file? - if _, err := os.Stat(filepath.Join("testLog", "serverMonitorReport.txt")); err != nil { + if _, err := os.Stat(filepath.Join("testLog", "serverMonitorReport.txt")); err != nil { t.Errorf("serverMonitorReport.txt not generated") } diff --git a/server.go b/server.go index e03d3bf..82bf1c1 100644 --- a/server.go +++ b/server.go @@ -42,21 +42,22 @@ type Server interface { TofuBundle() string GetAttribute(string) string SetAttribute(string, string) + SetMonitorLogging(bool) } type server struct { - service tapir.Service - config *Config - metricsPack metrics.Monitors - tokenTapirService tapir.Service - tokenServer *privacypass.TokenServer - tokenService primitives.Identity - tokenServicePrivKey ed25519.PrivateKey - tokenServiceStopped bool - onionServiceStopped bool - running bool - existingMessageCount int - lock sync.RWMutex + config *Config + service tapir.Service + messageStore storage.MessageStoreInterface + metricsPack metrics.Monitors + tokenTapirService tapir.Service + tokenServer *privacypass.TokenServer + tokenService primitives.Identity + tokenServicePrivKey ed25519.PrivateKey + tokenServiceStopped bool + onionServiceStopped bool + running bool + lock sync.RWMutex } // NewServer creates and configures a new server based on the supplied configuration @@ -78,6 +79,21 @@ func (s *server) Identity() primitives.Identity { return s.config.Identity() } +// helper fn to pass to metrics +func (s *server) getStorageTotalMessageCount() int { + if s.messageStore != nil { + return s.messageStore.MessagesCount() + } + return 0 +} + +// helper fn to pass to storage +func (s *server) incMessageCount() { + if s.metricsPack.MessageCounter != nil { + s.metricsPack.MessageCounter.Add(1) + } +} + // Run starts a server with the given privateKey func (s *server) Run(acn connectivity.ACN) error { s.lock.Lock() @@ -91,17 +107,17 @@ func (s *server) Run(acn connectivity.ACN) error { service.Init(acn, s.config.PrivateKey, &identity) s.service = service log.Infof("cwtch server running on cwtch:%s\n", s.Onion()) - s.metricsPack.Start(service, s.config.ConfigDir, s.config.ServerReporting.LogMetricsToFile) - ms, err := storage.InitializeSqliteMessageStore(path.Join(s.config.ConfigDir, "cwtch.messages"), s.metricsPack.MessageCounter) + if s.config.ServerReporting.LogMetricsToFile { + s.metricsPack.Start(service, s.getStorageTotalMessageCount, s.config.ConfigDir, s.config.ServerReporting.LogMetricsToFile) + } + + var err error + s.messageStore, err = storage.InitializeSqliteMessageStore(path.Join(s.config.ConfigDir, "cwtch.messages"), s.incMessageCount) if err != nil { return fmt.Errorf("could not open database: %v", err) } - // Needed because we only collect metrics on a per-session basis - // TODO fix metrics so they persist across sessions? - s.existingMessageCount = len(ms.FetchMessages()) - s.tokenTapirService = new(tor2.BaseOnionService) s.tokenTapirService.Init(acn, s.tokenServicePrivKey, &s.tokenService) tokenApplication := new(applications.TokenApplication) @@ -114,7 +130,7 @@ func (s *server) Run(acn connectivity.ACN) error { s.tokenServiceStopped = true }() go func() { - s.service.Listen(NewTokenBoardServer(ms, s.tokenServer)) + s.service.Listen(NewTokenBoardServer(s.messageStore, s.tokenServer)) s.onionServiceStopped = true }() @@ -151,6 +167,7 @@ func (s *server) Stop() { defer s.lock.Unlock() if s.running { s.service.Shutdown() + s.messageStore.Close() s.tokenTapirService.Shutdown() log.Infof("Closing Token server Database...") @@ -169,20 +186,16 @@ func (s *server) Destroy() { // Statistics is an encapsulation of information about the server that an operator might want to know at a glance. type Statistics struct { - TotalMessages int + TotalMessages int + TotalConnections int } // GetStatistics is a stub method for providing some high level information about // the server operation to bundling applications (e.g. the UI) func (s *server) GetStatistics() Statistics { - // TODO Statistics from Metrics is very awkward. Metrics needs an overhaul to make safe - total := s.existingMessageCount - if s.metricsPack.TotalMessageCounter != nil { - total += s.metricsPack.TotalMessageCounter.Count() - } - return Statistics{ - TotalMessages: total, + TotalMessages: s.messageStore.MessagesCount(), + TotalConnections: s.service.Metrics().ConnectionCount, } } @@ -229,3 +242,14 @@ func (s *server) GetAttribute(key string) string { func (s *server) SetAttribute(key, val string) { s.config.SetAttribute(key, val) } + +// SetMonitorLogging turns on or off the monitor logging suite, and logging to a file in the server dir +func (s *server) SetMonitorLogging(do bool) { + s.config.ServerReporting.LogMetricsToFile = do + s.config.Save() + if do { + s.metricsPack.Start(s.service, s.getStorageTotalMessageCount, s.config.ConfigDir, s.config.ServerReporting.LogMetricsToFile) + } else { + s.metricsPack.Stop() + } +} diff --git a/serverConfig.go b/serverConfig.go index 0786b9d..5743cb3 100644 --- a/serverConfig.go +++ b/serverConfig.go @@ -42,9 +42,7 @@ const ( // Reporting is a struct for storing a the config a server needs to be a peer, and connect to a group to report type Reporting struct { - LogMetricsToFile bool `json:"logMetricsToFile"` - ReportingGroupID string `json:"reportingGroupId"` - ReportingServerAddr string `json:"reportingServerAddr"` + LogMetricsToFile bool `json:"logMetricsToFile"` } // Config is a struct for storing basic server configuration @@ -92,9 +90,7 @@ func initDefaultConfig(configDir, filename string, encrypted bool) *Config { config.TokenServerPublicKey = tid.PublicKey() config.MaxBufferLines = 100000 config.ServerReporting = Reporting{ - LogMetricsToFile: true, - ReportingGroupID: "", - ReportingServerAddr: "", + LogMetricsToFile: false, } config.Attributes[AttrAutostart] = "false" @@ -112,19 +108,20 @@ func initDefaultConfig(configDir, filename string, encrypted bool) *Config { // LoadCreateDefaultConfigFile loads a Config from or creates a default config and saves it to a json file specified by filename // if the encrypted flag is true the config is store encrypted by password -func LoadCreateDefaultConfigFile(configDir, filename string, encrypted bool, password string) (*Config, error) { +func LoadCreateDefaultConfigFile(configDir, filename string, encrypted bool, password string, defaultLogToFile bool) (*Config, error) { if _, err := os.Stat(path.Join(configDir, filename)); os.IsNotExist(err) { - return CreateConfig(configDir, filename, encrypted, password) + return CreateConfig(configDir, filename, encrypted, password, defaultLogToFile) } return LoadConfig(configDir, filename, encrypted, password) } // CreateConfig creates a default config and saves it to a json file specified by filename // if the encrypted flag is true the config is store encrypted by password -func CreateConfig(configDir, filename string, encrypted bool, password string) (*Config, error) { +func CreateConfig(configDir, filename string, encrypted bool, password string, defaultLogToFile bool) (*Config, error) { log.Debugf("CreateConfig for server with configDir: %s\n", configDir) os.MkdirAll(configDir, 0700) config := initDefaultConfig(configDir, filename, encrypted) + config.ServerReporting.LogMetricsToFile = defaultLogToFile if encrypted { key, _, err := v1.InitV1Directory(configDir, password) if err != nil { diff --git a/servers.go b/servers.go index e48c942..4ae895d 100644 --- a/servers.go +++ b/servers.go @@ -63,7 +63,6 @@ func (s *servers) LoadServers(password string) ([]string, error) { } } } - log.Infof("LoadServers returning: %s\n", loadedServers) return loadedServers, nil } @@ -71,7 +70,7 @@ func (s *servers) LoadServers(password string) ([]string, error) { func (s *servers) CreateServer(password string) (Server, error) { newLocalID := model.GenerateRandomID() directory := path.Join(s.directory, newLocalID) - config, err := CreateConfig(directory, ServerConfigFile, true, password) + config, err := CreateConfig(directory, ServerConfigFile, true, password, false) if err != nil { return nil, err } diff --git a/storage/message_store.go b/storage/message_store.go index aa3d459..4f90734 100644 --- a/storage/message_store.go +++ b/storage/message_store.go @@ -5,22 +5,22 @@ import ( "database/sql" "encoding/base64" "fmt" - "git.openprivacy.ca/cwtch.im/server/metrics" "git.openprivacy.ca/openprivacy/log" - _ "github.com/mattn/go-sqlite3" // sqlite3 driver ) // MessageStoreInterface defines an interface to interact with a store of cwtch messages. type MessageStoreInterface interface { AddMessage(groups.EncryptedGroupMessage) FetchMessages() []*groups.EncryptedGroupMessage + MessagesCount() int FetchMessagesFrom(signature []byte) []*groups.EncryptedGroupMessage + Close() } // SqliteMessageStore is an sqlite3 backed message store type SqliteMessageStore struct { - messageCounter metrics.Counter - database *sql.DB + incMessageCounterFn func() + database *sql.DB // Some prepared queries... preparedInsertStatement *sql.Stmt // A Stmt is safe for concurrent use by multiple goroutines. @@ -36,7 +36,9 @@ func (s *SqliteMessageStore) Close() { // AddMessage implements the MessageStoreInterface AddMessage for sqlite message store func (s *SqliteMessageStore) AddMessage(message groups.EncryptedGroupMessage) { - s.messageCounter.Add(1) + if s.incMessageCounterFn != nil { + s.incMessageCounterFn() + } // ignore this clearly invalid message... if len(message.Signature) == 0 { return @@ -49,6 +51,29 @@ func (s *SqliteMessageStore) AddMessage(message groups.EncryptedGroupMessage) { } } +func (s SqliteMessageStore) MessagesCount() int { + rows, err := s.database.Query("SELECT COUNT(*) from messages") + if err != nil { + log.Errorf("%v", err) + return -1 + } + defer rows.Close() + + result := rows.Next() + if !result { + return -1 + } + + var rownum int + err = rows.Scan(&rownum) + if err != nil { + log.Errorf("error fetching rows: %v", err) + return -1 + } + + return rownum +} + // FetchMessages implements the MessageStoreInterface FetchMessages for sqlite message store func (s SqliteMessageStore) FetchMessages() []*groups.EncryptedGroupMessage { rows, err := s.database.Query("SELECT id, signature,ciphertext from messages") @@ -107,7 +132,7 @@ func (s *SqliteMessageStore) compileRows(rows *sql.Rows) []*groups.EncryptedGrou // InitializeSqliteMessageStore creates a database `dbfile` with the necessary tables (if it doesn't already exist) // and returns an open database -func InitializeSqliteMessageStore(dbfile string, messageCounter metrics.Counter) (*SqliteMessageStore, error) { +func InitializeSqliteMessageStore(dbfile string, incMessageCounterFn func()) (*SqliteMessageStore, error) { db, err := sql.Open("sqlite3", dbfile) if err != nil { log.Errorf("database %v cannot be created or opened %v", dbfile, err) @@ -123,7 +148,7 @@ func InitializeSqliteMessageStore(dbfile string, messageCounter metrics.Counter) log.Infof("Database Initialized") slms := new(SqliteMessageStore) slms.database = db - slms.messageCounter = messageCounter + slms.incMessageCounterFn = incMessageCounterFn sqlStmt = `INSERT INTO messages(signature, ciphertext) values (?,?);` stmt, err := slms.database.Prepare(sqlStmt) diff --git a/storage/message_store_test.go b/storage/message_store_test.go index 54dd364..619632b 100644 --- a/storage/message_store_test.go +++ b/storage/message_store_test.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "git.openprivacy.ca/cwtch.im/server/metrics" "git.openprivacy.ca/openprivacy/log" + _ "github.com/mattn/go-sqlite3" // sqlite3 driver "os" "testing" "time" @@ -15,7 +16,7 @@ func TestMessageStore(t *testing.T) { os.Remove(filename) log.SetLevel(log.LevelDebug) counter := metrics.NewCounter() - db, err := InitializeSqliteMessageStore(filename, counter) + db, err := InitializeSqliteMessageStore(filename, func() { counter.Add(1) }) if err != nil { t.Fatalf("Error: %v", err) }