forked from cwtch.im/server
refactor monitors, remove pidusage, fix logging optionality, improve fetch stats, improve reporting
This commit is contained in:
parent
e6c184df4e
commit
872bf30b1b
|
@ -8,6 +8,7 @@ import (
|
|||
"git.openprivacy.ca/cwtch.im/tapir/primitives"
|
||||
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
_ "github.com/mattn/go-sqlite3" // sqlite3 driver
|
||||
"io/ioutil"
|
||||
mrand "math/rand"
|
||||
"os"
|
||||
|
@ -42,9 +43,7 @@ func main() {
|
|||
config.TokenServerPublicKey = tid.PublicKey()
|
||||
config.MaxBufferLines = 100000
|
||||
config.ServerReporting = cwtchserver.Reporting{
|
||||
LogMetricsToFile: true,
|
||||
ReportingGroupID: "",
|
||||
ReportingServerAddr: "",
|
||||
LogMetricsToFile: true,
|
||||
}
|
||||
config.ConfigDir = "."
|
||||
config.FilePath = cwtchserver.ServerConfigFile
|
||||
|
@ -53,7 +52,7 @@ func main() {
|
|||
return
|
||||
}
|
||||
|
||||
serverConfig, err := cwtchserver.LoadCreateDefaultConfigFile(configDir, cwtchserver.ServerConfigFile, false, "")
|
||||
serverConfig, err := cwtchserver.LoadCreateDefaultConfigFile(configDir, cwtchserver.ServerConfigFile, false, "", true)
|
||||
if err != nil {
|
||||
log.Errorf("Could not load/create config file: %s\n", err)
|
||||
return
|
||||
|
|
3
go.mod
3
go.mod
|
@ -9,6 +9,5 @@ require (
|
|||
git.openprivacy.ca/openprivacy/log v1.0.3
|
||||
github.com/gtank/ristretto255 v0.1.2
|
||||
github.com/mattn/go-sqlite3 v1.14.7
|
||||
github.com/struCoder/pidusage v0.2.1
|
||||
golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee
|
||||
)
|
||||
)
|
||||
|
|
43
go.sum
43
go.sum
|
@ -1,32 +1,13 @@
|
|||
cwtch.im/cwtch v0.7.3 h1:3f2Q2XFgNtq8hVbEz7K++exS1gDsO+zmSO7X7Fqz8Mo=
|
||||
cwtch.im/cwtch v0.7.3/go.mod h1:S0JRLSTwFM+kRrpOPKgUQn/loKe/fc6lLDnOFopRKq8=
|
||||
cwtch.im/cwtch v0.8.0 h1:QDRaDBTXefFRPZPqUMtxoNhOcgXv0rl0bGjysSOmJX0=
|
||||
cwtch.im/cwtch v0.8.0/go.mod h1:+SY/4ueF1U7mK+CX8hZFbtd+GC1lx/cReo110KgtQAw=
|
||||
cwtch.im/cwtch v0.8.5 h1:W67jAF2oRwqWytbZEv1UeCqW0cU2x69tgUw8iy27xFA=
|
||||
cwtch.im/cwtch v0.8.5/go.mod h1:5GHxaaeVnKeXSU64IvtCKzkqhU8DRiLoVM+tiBT8kkc=
|
||||
cwtch.im/cwtch v0.12.2 h1:I+ndKadCRCITw4SPbd+1cpRv+z/7iHjjTUv8OzRwTrE=
|
||||
cwtch.im/cwtch v0.12.2/go.mod h1:QpTkQK7MqNt0dQK9/pBk5VpkvFhy6xuoxJIn401B8fM=
|
||||
filippo.io/edwards25519 v1.0.0-rc.1 h1:m0VOOB23frXZvAOK44usCgLWvtsxIoMCTBGJZlpmGfU=
|
||||
filippo.io/edwards25519 v1.0.0-rc.1/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns=
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.4.0 h1:clG8uORt0NKEhT4P+Dpw1pzyUuYzYBMevGqn2pciKk8=
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.4.0/go.mod h1:eH6dZxXrhW0C4KZX18ksUa6XJCrEvtg8cJJ/Fy6gv+E=
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.4.1 h1:9LMpQX41IzecNNlRc1FZKXHg6wlFss679tFsa3vzb3Y=
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.4.1/go.mod h1:eH6dZxXrhW0C4KZX18ksUa6XJCrEvtg8cJJ/Fy6gv+E=
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.4.2 h1:bxMWZnVJXX4dqqOFS7ELW4iFkVL4GS8wiRkjRv5rJe8=
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.4.2/go.mod h1:eH6dZxXrhW0C4KZX18ksUa6XJCrEvtg8cJJ/Fy6gv+E=
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.4.4 h1:KyuTVmr9GYptTCeR7JDODjmhBBbnIBf9V3NSC4+6bHc=
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.4.4/go.mod h1:qMFTdmDZITc1BLP1jSW0gVpLmvpg+Zjsh5ek8StwbFE=
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.4.9 h1:LXonlztwvI1F1++0IyomIcDH1/Bxzo+oN8YjGonNvjM=
|
||||
git.openprivacy.ca/cwtch.im/tapir v0.4.9/go.mod h1:p4bHo3DAO8wwimU6JAeZXbfPQ4jnoA2bV+4YvknWTNQ=
|
||||
git.openprivacy.ca/openprivacy/bine v0.0.4 h1:CO7EkGyz+jegZ4ap8g5NWRuDHA/56KKvGySR6OBPW+c=
|
||||
git.openprivacy.ca/openprivacy/bine v0.0.4/go.mod h1:13ZqhKyqakDsN/ZkQkIGNULsmLyqtXc46XBcnuXm/mU=
|
||||
git.openprivacy.ca/openprivacy/connectivity v1.4.3 h1:i2Ad/U9FlL9dKr2bhRck7lJ8NoWyGtoEfUwoCyMT0fU=
|
||||
git.openprivacy.ca/openprivacy/connectivity v1.4.3/go.mod h1:bR0Myx9nm2YzWtsThRelkNMV4Pp7sPDa123O1qsAbVo=
|
||||
git.openprivacy.ca/openprivacy/connectivity v1.4.5 h1:UYMdCWPzEAP7LbqdMXGNXmfKjWlvfnKdmewBtnbgQRI=
|
||||
git.openprivacy.ca/openprivacy/connectivity v1.4.5/go.mod h1:JVRCIdL+lAG6ohBFWiKeC/MN42nnC0sfFszR9XG6vPQ=
|
||||
git.openprivacy.ca/openprivacy/connectivity v1.5.0 h1:ZxsR/ZaVKXIkD2x6FlajZn62ciNQjamrI4i/5xIpdoQ=
|
||||
git.openprivacy.ca/openprivacy/connectivity v1.5.0/go.mod h1:UjQiGBnWbotmBzIw59B8H6efwDadjkKzm3RPT1UaIRw=
|
||||
git.openprivacy.ca/openprivacy/log v1.0.1/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw=
|
||||
git.openprivacy.ca/openprivacy/log v1.0.2 h1:HLP4wsw4ljczFAelYnbObIs821z+jgMPCe8uODPnGQM=
|
||||
git.openprivacy.ca/openprivacy/log v1.0.2/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw=
|
||||
git.openprivacy.ca/openprivacy/log v1.0.3 h1:E/PMm4LY+Q9s3aDpfySfEDq/vYQontlvNj/scrPaga0=
|
||||
|
@ -56,32 +37,16 @@ github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
|
|||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
|
||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/struCoder/pidusage v0.1.3 h1:pZcSa6asBE38TJtW0Nui6GeCjLTpaT/jAnNP7dUTLSQ=
|
||||
github.com/struCoder/pidusage v0.1.3/go.mod h1:pWBlW3YuSwRl6h7R5KbvA4N8oOqe9LjaKW5CwT1SPjI=
|
||||
github.com/struCoder/pidusage v0.2.1 h1:dFiEgUDkubeIj0XA1NpQ6+8LQmKrLi7NiIQl86E6BoY=
|
||||
github.com/struCoder/pidusage v0.2.1/go.mod h1:bewtP2KUA1TBUyza5+/PCpSQ6sc/H6jJbIKAzqW86BA=
|
||||
github.com/yuin/goldmark v1.3.5 h1:dPmz1Snjq0kmkz159iL7S6WzdahUTHnHB5M56WFVifs=
|
||||
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
|
||||
go.etcd.io/bbolt v1.3.4 h1:hi1bXHMVrlQh6WwxAy+qZCV/SYIlqo+Ushwdpa4tAKg=
|
||||
go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee h1:4yd7jl+vXjalO5ztz6Vc1VADv+S/80LGJmyl1ROJ2AI=
|
||||
golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
|
||||
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
|
||||
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
|
||||
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
|
||||
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
|
@ -98,14 +63,6 @@ golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
|
|||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e h1:FDhOuMEY4JVRztM/gsbk+IKUQ8kj74bxZrgw87eMMVc=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
|
||||
golang.org/x/tools v0.1.2 h1:kRBLX7v7Af8W7Gdbbc908OJcdgtK8bOz9Uaj8/F1ACA=
|
||||
golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
|
||||
|
|
|
@ -71,6 +71,7 @@ const (
|
|||
Average
|
||||
)
|
||||
|
||||
// TODO port to SQLite for persistence between runs?
|
||||
type monitorHistory struct {
|
||||
monitorType MonitorType
|
||||
monitorAccumulation MonitorAccumulation
|
||||
|
@ -150,13 +151,25 @@ func (mh *monitorHistory) Months() []float64 {
|
|||
return mh.returnCopy(mh.perMonthForYear[:])
|
||||
}
|
||||
|
||||
const timeDay = time.Hour * 24
|
||||
const timeWeek = timeDay * 7
|
||||
const timeMonth = timeDay * 28
|
||||
|
||||
func (mh *monitorHistory) Report(w *bufio.Writer) {
|
||||
mh.lock.Lock()
|
||||
fmt.Fprintln(w, "Minutes:", reportLine(mh.monitorType, mh.perMinutePerHour[:]))
|
||||
fmt.Fprintln(w, "Hours: ", reportLine(mh.monitorType, mh.perHourForDay[:]))
|
||||
fmt.Fprintln(w, "Days: ", reportLine(mh.monitorType, mh.perDayForWeek[:]))
|
||||
fmt.Fprintln(w, "Weeks: ", reportLine(mh.monitorType, mh.perWeekForMonth[:]))
|
||||
fmt.Fprintln(w, "Months: ", reportLine(mh.monitorType, mh.perMonthForYear[:]))
|
||||
if time.Since(mh.starttime) >= time.Hour {
|
||||
fmt.Fprintln(w, "Hours: ", reportLine(mh.monitorType, mh.perHourForDay[:]))
|
||||
}
|
||||
if time.Since(mh.starttime) >= timeDay {
|
||||
fmt.Fprintln(w, "Days: ", reportLine(mh.monitorType, mh.perDayForWeek[:]))
|
||||
}
|
||||
if time.Since(mh.starttime) >= timeWeek {
|
||||
fmt.Fprintln(w, "Weeks: ", reportLine(mh.monitorType, mh.perWeekForMonth[:]))
|
||||
}
|
||||
if time.Since(mh.starttime) >= timeMonth {
|
||||
fmt.Fprintln(w, "Months: ", reportLine(mh.monitorType, mh.perMonthForYear[:]))
|
||||
}
|
||||
mh.lock.Unlock()
|
||||
}
|
||||
|
||||
|
@ -215,10 +228,10 @@ func (mh *monitorHistory) monitorThread() {
|
|||
case <-time.After(time.Minute):
|
||||
mh.lock.Lock()
|
||||
|
||||
minuteAvg := rotateAndAccumulate(mh.perMinutePerHour[:], mh.monitor(), mh.monitorAccumulation)
|
||||
minuteAcc := rotateAndAccumulate(mh.perMinutePerHour[:], mh.monitor(), mh.monitorAccumulation)
|
||||
|
||||
if time.Since(mh.timeLastHourRotate) > time.Hour {
|
||||
rotateAndAccumulate(mh.perHourForDay[:], minuteAvg, mh.monitorAccumulation)
|
||||
rotateAndAccumulate(mh.perHourForDay[:], minuteAcc, mh.monitorAccumulation)
|
||||
mh.timeLastHourRotate = time.Now()
|
||||
}
|
||||
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"fmt"
|
||||
"git.openprivacy.ca/cwtch.im/tapir"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"github.com/struCoder/pidusage"
|
||||
"os"
|
||||
"path"
|
||||
"runtime"
|
||||
|
@ -17,70 +16,48 @@ const (
|
|||
reportFile = "serverMonitorReport.txt"
|
||||
)
|
||||
|
||||
type MessageCountFn func() int
|
||||
|
||||
// Monitors is a package of metrics for a Cwtch Server including message count, CPU, Mem, and conns
|
||||
type Monitors struct {
|
||||
MessageCounter Counter
|
||||
TotalMessageCounter Counter
|
||||
Messages MonitorHistory
|
||||
CPU MonitorHistory
|
||||
Memory MonitorHistory
|
||||
ClientConns MonitorHistory
|
||||
starttime time.Time
|
||||
breakChannel chan bool
|
||||
log bool
|
||||
configDir string
|
||||
MessageCounter Counter
|
||||
Messages MonitorHistory
|
||||
Memory MonitorHistory
|
||||
ClientConns MonitorHistory
|
||||
messageCountFn MessageCountFn
|
||||
starttime time.Time
|
||||
breakChannel chan bool
|
||||
log bool
|
||||
configDir string
|
||||
running bool
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
func bToMb(b uint64) uint64 {
|
||||
return b / 1024 / 1024
|
||||
}
|
||||
|
||||
// Start initializes a Monitors's monitors
|
||||
func (mp *Monitors) Start(ts tapir.Service, configDir string, doLogging bool) {
|
||||
func (mp *Monitors) Start(ts tapir.Service, mcfn MessageCountFn, configDir string, doLogging bool) {
|
||||
mp.log = doLogging
|
||||
mp.configDir = configDir
|
||||
mp.starttime = time.Now()
|
||||
mp.breakChannel = make(chan bool)
|
||||
mp.MessageCounter = NewCounter()
|
||||
mp.messageCountFn = mcfn
|
||||
|
||||
// Maintain a count of total messages
|
||||
mp.TotalMessageCounter = NewCounter()
|
||||
mp.Messages = NewMonitorHistory(Count, Cumulative, func() (c float64) {
|
||||
c = float64(mp.MessageCounter.Count())
|
||||
mp.TotalMessageCounter.Add(int(c))
|
||||
mp.MessageCounter.Reset()
|
||||
return
|
||||
})
|
||||
|
||||
var pidUsageLock sync.Mutex
|
||||
// pidusage doesn't support windows
|
||||
if runtime.GOOS != "windows" {
|
||||
mp.CPU = NewMonitorHistory(Percent, Average, func() float64 {
|
||||
pidUsageLock.Lock()
|
||||
defer pidUsageLock.Unlock()
|
||||
sysInfo, err := pidusage.GetStat(os.Getpid())
|
||||
if err != nil {
|
||||
log.Errorf("pidusage.GetStat failed with: %s", err)
|
||||
return 0.0
|
||||
}
|
||||
return float64(sysInfo.CPU)
|
||||
})
|
||||
mp.Memory = NewMonitorHistory(MegaBytes, Average, func() float64 {
|
||||
pidUsageLock.Lock()
|
||||
defer pidUsageLock.Unlock()
|
||||
sysInfo, err := pidusage.GetStat(os.Getpid())
|
||||
if err != nil {
|
||||
log.Errorf("pidusage.GetStat failed with: %s", err)
|
||||
return 0.0
|
||||
}
|
||||
return float64(sysInfo.Memory)
|
||||
})
|
||||
} else {
|
||||
mp.CPU = NewMonitorHistory(Percent, Average, func() float64 {
|
||||
return 0.0
|
||||
})
|
||||
mp.Memory = NewMonitorHistory(MegaBytes, Average, func() float64 {
|
||||
return 0.0
|
||||
})
|
||||
}
|
||||
mp.Memory = NewMonitorHistory(MegaBytes, Average, func() float64 {
|
||||
var m runtime.MemStats
|
||||
runtime.ReadMemStats(&m)
|
||||
return float64(bToMb(m.Sys))
|
||||
})
|
||||
|
||||
// TODO: replace with ts.
|
||||
mp.ClientConns = NewMonitorHistory(Count, Average, func() float64 { return float64(ts.Metrics().ConnectionCount) })
|
||||
|
||||
if mp.log {
|
||||
|
@ -89,16 +66,34 @@ func (mp *Monitors) Start(ts tapir.Service, configDir string, doLogging bool) {
|
|||
}
|
||||
|
||||
func (mp *Monitors) run() {
|
||||
mp.running = true
|
||||
for {
|
||||
select {
|
||||
case <-time.After(time.Minute):
|
||||
mp.lock.Lock()
|
||||
mp.report()
|
||||
mp.lock.Unlock()
|
||||
case <-mp.breakChannel:
|
||||
mp.lock.Lock()
|
||||
mp.running = false
|
||||
mp.lock.Unlock()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func FormatDuration(ts time.Duration) string {
|
||||
const (
|
||||
Day = 24 * time.Hour
|
||||
)
|
||||
d := ts / Day
|
||||
ts = ts % Day
|
||||
h := ts / time.Hour
|
||||
ts = ts % time.Hour
|
||||
m := ts / time.Minute
|
||||
return fmt.Sprintf("%dd%dh%dm", d, h, m)
|
||||
}
|
||||
|
||||
func (mp *Monitors) report() {
|
||||
f, err := os.Create(path.Join(mp.configDir, reportFile))
|
||||
if err != nil {
|
||||
|
@ -109,18 +104,16 @@ func (mp *Monitors) report() {
|
|||
|
||||
w := bufio.NewWriter(f)
|
||||
|
||||
fmt.Fprintf(w, "Uptime: %v\n\n", time.Since(mp.starttime))
|
||||
fmt.Fprintf(w, "Uptime: %v \n", FormatDuration(time.Since(mp.starttime)))
|
||||
fmt.Fprintf(w, "Total Messages: %v \n\n", mp.messageCountFn())
|
||||
|
||||
fmt.Fprintln(w, "messages:")
|
||||
fmt.Fprintln(w, "Messages:")
|
||||
mp.Messages.Report(w)
|
||||
|
||||
fmt.Fprintln(w, "\nClient Connections:")
|
||||
mp.ClientConns.Report(w)
|
||||
|
||||
fmt.Fprintln(w, "\nCPU:")
|
||||
mp.CPU.Report(w)
|
||||
|
||||
fmt.Fprintln(w, "\nMemory:")
|
||||
fmt.Fprintln(w, "\nSys Memory:")
|
||||
mp.Memory.Report(w)
|
||||
|
||||
w.Flush()
|
||||
|
@ -128,11 +121,15 @@ func (mp *Monitors) report() {
|
|||
|
||||
// Stop stops all the monitors in a Monitors
|
||||
func (mp *Monitors) Stop() {
|
||||
if mp.log {
|
||||
mp.breakChannel <- true
|
||||
mp.lock.Lock()
|
||||
running := mp.running
|
||||
mp.lock.Unlock()
|
||||
if running {
|
||||
if mp.log {
|
||||
mp.breakChannel <- true
|
||||
}
|
||||
mp.Messages.Stop()
|
||||
mp.Memory.Stop()
|
||||
mp.ClientConns.Stop()
|
||||
}
|
||||
mp.Messages.Stop()
|
||||
mp.CPU.Stop()
|
||||
mp.Memory.Stop()
|
||||
mp.ClientConns.Stop()
|
||||
}
|
||||
|
|
|
@ -15,13 +15,14 @@ func TestMonitors(t *testing.T) {
|
|||
os.Mkdir("testLog", 0700)
|
||||
service := new(tor2.BaseOnionService)
|
||||
mp := Monitors{}
|
||||
mp.Start(service, "testLog", true)
|
||||
mp.Start(service, func() int { return 1 }, "testLog", true)
|
||||
mp.MessageCounter.Add(1)
|
||||
log.Infof("sleeping for minute to give to for monitors to trigger...")
|
||||
// wait a minute for it to trigger
|
||||
time.Sleep(62 * time.Second)
|
||||
|
||||
// it didn't segfault? that's good, did it create a log file?
|
||||
if _, err := os.Stat(filepath.Join("testLog", "serverMonitorReport.txt")); err != nil {
|
||||
if _, err := os.Stat(filepath.Join("testLog", "serverMonitorReport.txt")); err != nil {
|
||||
t.Errorf("serverMonitorReport.txt not generated")
|
||||
}
|
||||
|
||||
|
|
78
server.go
78
server.go
|
@ -42,21 +42,22 @@ type Server interface {
|
|||
TofuBundle() string
|
||||
GetAttribute(string) string
|
||||
SetAttribute(string, string)
|
||||
SetMonitorLogging(bool)
|
||||
}
|
||||
|
||||
type server struct {
|
||||
service tapir.Service
|
||||
config *Config
|
||||
metricsPack metrics.Monitors
|
||||
tokenTapirService tapir.Service
|
||||
tokenServer *privacypass.TokenServer
|
||||
tokenService primitives.Identity
|
||||
tokenServicePrivKey ed25519.PrivateKey
|
||||
tokenServiceStopped bool
|
||||
onionServiceStopped bool
|
||||
running bool
|
||||
existingMessageCount int
|
||||
lock sync.RWMutex
|
||||
config *Config
|
||||
service tapir.Service
|
||||
messageStore storage.MessageStoreInterface
|
||||
metricsPack metrics.Monitors
|
||||
tokenTapirService tapir.Service
|
||||
tokenServer *privacypass.TokenServer
|
||||
tokenService primitives.Identity
|
||||
tokenServicePrivKey ed25519.PrivateKey
|
||||
tokenServiceStopped bool
|
||||
onionServiceStopped bool
|
||||
running bool
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
// NewServer creates and configures a new server based on the supplied configuration
|
||||
|
@ -78,6 +79,21 @@ func (s *server) Identity() primitives.Identity {
|
|||
return s.config.Identity()
|
||||
}
|
||||
|
||||
// helper fn to pass to metrics
|
||||
func (s *server) getStorageTotalMessageCount() int {
|
||||
if s.messageStore != nil {
|
||||
return s.messageStore.MessagesCount()
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// helper fn to pass to storage
|
||||
func (s *server) incMessageCount() {
|
||||
if s.metricsPack.MessageCounter != nil {
|
||||
s.metricsPack.MessageCounter.Add(1)
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts a server with the given privateKey
|
||||
func (s *server) Run(acn connectivity.ACN) error {
|
||||
s.lock.Lock()
|
||||
|
@ -91,17 +107,17 @@ func (s *server) Run(acn connectivity.ACN) error {
|
|||
service.Init(acn, s.config.PrivateKey, &identity)
|
||||
s.service = service
|
||||
log.Infof("cwtch server running on cwtch:%s\n", s.Onion())
|
||||
s.metricsPack.Start(service, s.config.ConfigDir, s.config.ServerReporting.LogMetricsToFile)
|
||||
|
||||
ms, err := storage.InitializeSqliteMessageStore(path.Join(s.config.ConfigDir, "cwtch.messages"), s.metricsPack.MessageCounter)
|
||||
if s.config.ServerReporting.LogMetricsToFile {
|
||||
s.metricsPack.Start(service, s.getStorageTotalMessageCount, s.config.ConfigDir, s.config.ServerReporting.LogMetricsToFile)
|
||||
}
|
||||
|
||||
var err error
|
||||
s.messageStore, err = storage.InitializeSqliteMessageStore(path.Join(s.config.ConfigDir, "cwtch.messages"), s.incMessageCount)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not open database: %v", err)
|
||||
}
|
||||
|
||||
// Needed because we only collect metrics on a per-session basis
|
||||
// TODO fix metrics so they persist across sessions?
|
||||
s.existingMessageCount = len(ms.FetchMessages())
|
||||
|
||||
s.tokenTapirService = new(tor2.BaseOnionService)
|
||||
s.tokenTapirService.Init(acn, s.tokenServicePrivKey, &s.tokenService)
|
||||
tokenApplication := new(applications.TokenApplication)
|
||||
|
@ -114,7 +130,7 @@ func (s *server) Run(acn connectivity.ACN) error {
|
|||
s.tokenServiceStopped = true
|
||||
}()
|
||||
go func() {
|
||||
s.service.Listen(NewTokenBoardServer(ms, s.tokenServer))
|
||||
s.service.Listen(NewTokenBoardServer(s.messageStore, s.tokenServer))
|
||||
s.onionServiceStopped = true
|
||||
}()
|
||||
|
||||
|
@ -151,6 +167,7 @@ func (s *server) Stop() {
|
|||
defer s.lock.Unlock()
|
||||
if s.running {
|
||||
s.service.Shutdown()
|
||||
s.messageStore.Close()
|
||||
s.tokenTapirService.Shutdown()
|
||||
log.Infof("Closing Token server Database...")
|
||||
|
||||
|
@ -169,20 +186,16 @@ func (s *server) Destroy() {
|
|||
|
||||
// Statistics is an encapsulation of information about the server that an operator might want to know at a glance.
|
||||
type Statistics struct {
|
||||
TotalMessages int
|
||||
TotalMessages int
|
||||
TotalConnections int
|
||||
}
|
||||
|
||||
// GetStatistics is a stub method for providing some high level information about
|
||||
// the server operation to bundling applications (e.g. the UI)
|
||||
func (s *server) GetStatistics() Statistics {
|
||||
// TODO Statistics from Metrics is very awkward. Metrics needs an overhaul to make safe
|
||||
total := s.existingMessageCount
|
||||
if s.metricsPack.TotalMessageCounter != nil {
|
||||
total += s.metricsPack.TotalMessageCounter.Count()
|
||||
}
|
||||
|
||||
return Statistics{
|
||||
TotalMessages: total,
|
||||
TotalMessages: s.messageStore.MessagesCount(),
|
||||
TotalConnections: s.service.Metrics().ConnectionCount,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -229,3 +242,14 @@ func (s *server) GetAttribute(key string) string {
|
|||
func (s *server) SetAttribute(key, val string) {
|
||||
s.config.SetAttribute(key, val)
|
||||
}
|
||||
|
||||
// SetMonitorLogging turns on or off the monitor logging suite, and logging to a file in the server dir
|
||||
func (s *server) SetMonitorLogging(do bool) {
|
||||
s.config.ServerReporting.LogMetricsToFile = do
|
||||
s.config.Save()
|
||||
if do {
|
||||
s.metricsPack.Start(s.service, s.getStorageTotalMessageCount, s.config.ConfigDir, s.config.ServerReporting.LogMetricsToFile)
|
||||
} else {
|
||||
s.metricsPack.Stop()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,9 +42,7 @@ const (
|
|||
|
||||
// Reporting is a struct for storing a the config a server needs to be a peer, and connect to a group to report
|
||||
type Reporting struct {
|
||||
LogMetricsToFile bool `json:"logMetricsToFile"`
|
||||
ReportingGroupID string `json:"reportingGroupId"`
|
||||
ReportingServerAddr string `json:"reportingServerAddr"`
|
||||
LogMetricsToFile bool `json:"logMetricsToFile"`
|
||||
}
|
||||
|
||||
// Config is a struct for storing basic server configuration
|
||||
|
@ -92,9 +90,7 @@ func initDefaultConfig(configDir, filename string, encrypted bool) *Config {
|
|||
config.TokenServerPublicKey = tid.PublicKey()
|
||||
config.MaxBufferLines = 100000
|
||||
config.ServerReporting = Reporting{
|
||||
LogMetricsToFile: true,
|
||||
ReportingGroupID: "",
|
||||
ReportingServerAddr: "",
|
||||
LogMetricsToFile: false,
|
||||
}
|
||||
config.Attributes[AttrAutostart] = "false"
|
||||
|
||||
|
@ -112,19 +108,20 @@ func initDefaultConfig(configDir, filename string, encrypted bool) *Config {
|
|||
|
||||
// LoadCreateDefaultConfigFile loads a Config from or creates a default config and saves it to a json file specified by filename
|
||||
// if the encrypted flag is true the config is store encrypted by password
|
||||
func LoadCreateDefaultConfigFile(configDir, filename string, encrypted bool, password string) (*Config, error) {
|
||||
func LoadCreateDefaultConfigFile(configDir, filename string, encrypted bool, password string, defaultLogToFile bool) (*Config, error) {
|
||||
if _, err := os.Stat(path.Join(configDir, filename)); os.IsNotExist(err) {
|
||||
return CreateConfig(configDir, filename, encrypted, password)
|
||||
return CreateConfig(configDir, filename, encrypted, password, defaultLogToFile)
|
||||
}
|
||||
return LoadConfig(configDir, filename, encrypted, password)
|
||||
}
|
||||
|
||||
// CreateConfig creates a default config and saves it to a json file specified by filename
|
||||
// if the encrypted flag is true the config is store encrypted by password
|
||||
func CreateConfig(configDir, filename string, encrypted bool, password string) (*Config, error) {
|
||||
func CreateConfig(configDir, filename string, encrypted bool, password string, defaultLogToFile bool) (*Config, error) {
|
||||
log.Debugf("CreateConfig for server with configDir: %s\n", configDir)
|
||||
os.MkdirAll(configDir, 0700)
|
||||
config := initDefaultConfig(configDir, filename, encrypted)
|
||||
config.ServerReporting.LogMetricsToFile = defaultLogToFile
|
||||
if encrypted {
|
||||
key, _, err := v1.InitV1Directory(configDir, password)
|
||||
if err != nil {
|
||||
|
|
|
@ -63,7 +63,6 @@ func (s *servers) LoadServers(password string) ([]string, error) {
|
|||
}
|
||||
}
|
||||
}
|
||||
log.Infof("LoadServers returning: %s\n", loadedServers)
|
||||
return loadedServers, nil
|
||||
}
|
||||
|
||||
|
@ -71,7 +70,7 @@ func (s *servers) LoadServers(password string) ([]string, error) {
|
|||
func (s *servers) CreateServer(password string) (Server, error) {
|
||||
newLocalID := model.GenerateRandomID()
|
||||
directory := path.Join(s.directory, newLocalID)
|
||||
config, err := CreateConfig(directory, ServerConfigFile, true, password)
|
||||
config, err := CreateConfig(directory, ServerConfigFile, true, password, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -5,22 +5,22 @@ import (
|
|||
"database/sql"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/cwtch.im/server/metrics"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
_ "github.com/mattn/go-sqlite3" // sqlite3 driver
|
||||
)
|
||||
|
||||
// MessageStoreInterface defines an interface to interact with a store of cwtch messages.
|
||||
type MessageStoreInterface interface {
|
||||
AddMessage(groups.EncryptedGroupMessage)
|
||||
FetchMessages() []*groups.EncryptedGroupMessage
|
||||
MessagesCount() int
|
||||
FetchMessagesFrom(signature []byte) []*groups.EncryptedGroupMessage
|
||||
Close()
|
||||
}
|
||||
|
||||
// SqliteMessageStore is an sqlite3 backed message store
|
||||
type SqliteMessageStore struct {
|
||||
messageCounter metrics.Counter
|
||||
database *sql.DB
|
||||
incMessageCounterFn func()
|
||||
database *sql.DB
|
||||
|
||||
// Some prepared queries...
|
||||
preparedInsertStatement *sql.Stmt // A Stmt is safe for concurrent use by multiple goroutines.
|
||||
|
@ -36,7 +36,9 @@ func (s *SqliteMessageStore) Close() {
|
|||
|
||||
// AddMessage implements the MessageStoreInterface AddMessage for sqlite message store
|
||||
func (s *SqliteMessageStore) AddMessage(message groups.EncryptedGroupMessage) {
|
||||
s.messageCounter.Add(1)
|
||||
if s.incMessageCounterFn != nil {
|
||||
s.incMessageCounterFn()
|
||||
}
|
||||
// ignore this clearly invalid message...
|
||||
if len(message.Signature) == 0 {
|
||||
return
|
||||
|
@ -49,6 +51,29 @@ func (s *SqliteMessageStore) AddMessage(message groups.EncryptedGroupMessage) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s SqliteMessageStore) MessagesCount() int {
|
||||
rows, err := s.database.Query("SELECT COUNT(*) from messages")
|
||||
if err != nil {
|
||||
log.Errorf("%v", err)
|
||||
return -1
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
result := rows.Next()
|
||||
if !result {
|
||||
return -1
|
||||
}
|
||||
|
||||
var rownum int
|
||||
err = rows.Scan(&rownum)
|
||||
if err != nil {
|
||||
log.Errorf("error fetching rows: %v", err)
|
||||
return -1
|
||||
}
|
||||
|
||||
return rownum
|
||||
}
|
||||
|
||||
// FetchMessages implements the MessageStoreInterface FetchMessages for sqlite message store
|
||||
func (s SqliteMessageStore) FetchMessages() []*groups.EncryptedGroupMessage {
|
||||
rows, err := s.database.Query("SELECT id, signature,ciphertext from messages")
|
||||
|
@ -107,7 +132,7 @@ func (s *SqliteMessageStore) compileRows(rows *sql.Rows) []*groups.EncryptedGrou
|
|||
|
||||
// InitializeSqliteMessageStore creates a database `dbfile` with the necessary tables (if it doesn't already exist)
|
||||
// and returns an open database
|
||||
func InitializeSqliteMessageStore(dbfile string, messageCounter metrics.Counter) (*SqliteMessageStore, error) {
|
||||
func InitializeSqliteMessageStore(dbfile string, incMessageCounterFn func()) (*SqliteMessageStore, error) {
|
||||
db, err := sql.Open("sqlite3", dbfile)
|
||||
if err != nil {
|
||||
log.Errorf("database %v cannot be created or opened %v", dbfile, err)
|
||||
|
@ -123,7 +148,7 @@ func InitializeSqliteMessageStore(dbfile string, messageCounter metrics.Counter)
|
|||
log.Infof("Database Initialized")
|
||||
slms := new(SqliteMessageStore)
|
||||
slms.database = db
|
||||
slms.messageCounter = messageCounter
|
||||
slms.incMessageCounterFn = incMessageCounterFn
|
||||
|
||||
sqlStmt = `INSERT INTO messages(signature, ciphertext) values (?,?);`
|
||||
stmt, err := slms.database.Prepare(sqlStmt)
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"encoding/binary"
|
||||
"git.openprivacy.ca/cwtch.im/server/metrics"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
_ "github.com/mattn/go-sqlite3" // sqlite3 driver
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -15,7 +16,7 @@ func TestMessageStore(t *testing.T) {
|
|||
os.Remove(filename)
|
||||
log.SetLevel(log.LevelDebug)
|
||||
counter := metrics.NewCounter()
|
||||
db, err := InitializeSqliteMessageStore(filename, counter)
|
||||
db, err := InitializeSqliteMessageStore(filename, func() { counter.Add(1) })
|
||||
if err != nil {
|
||||
t.Fatalf("Error: %v", err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue