Merge pull request 'refactor monitors, remove pidusage, fix logging optionality, improve fetch stats, improve reporting' (#25) from monitorsSql into trunk
continuous-integration/drone/push Build is passing Details

Reviewed-on: #25
This commit is contained in:
Sarah Jamie Lewis 2021-11-27 00:11:41 +00:00
commit 49a69bdfec
11 changed files with 174 additions and 162 deletions

View File

@ -8,6 +8,7 @@ import (
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
_ "github.com/mattn/go-sqlite3" // sqlite3 driver
"io/ioutil"
mrand "math/rand"
"os"
@ -43,8 +44,6 @@ func main() {
config.MaxBufferLines = 100000
config.ServerReporting = cwtchserver.Reporting{
LogMetricsToFile: true,
ReportingGroupID: "",
ReportingServerAddr: "",
}
config.ConfigDir = "."
config.FilePath = cwtchserver.ServerConfigFile
@ -53,7 +52,7 @@ func main() {
return
}
serverConfig, err := cwtchserver.LoadCreateDefaultConfigFile(configDir, cwtchserver.ServerConfigFile, false, "")
serverConfig, err := cwtchserver.LoadCreateDefaultConfigFile(configDir, cwtchserver.ServerConfigFile, false, "", true)
if err != nil {
log.Errorf("Could not load/create config file: %s\n", err)
return

1
go.mod
View File

@ -9,6 +9,5 @@ require (
git.openprivacy.ca/openprivacy/log v1.0.3
github.com/gtank/ristretto255 v0.1.2
github.com/mattn/go-sqlite3 v1.14.7
github.com/struCoder/pidusage v0.2.1
golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee
)

43
go.sum
View File

@ -1,32 +1,13 @@
cwtch.im/cwtch v0.7.3 h1:3f2Q2XFgNtq8hVbEz7K++exS1gDsO+zmSO7X7Fqz8Mo=
cwtch.im/cwtch v0.7.3/go.mod h1:S0JRLSTwFM+kRrpOPKgUQn/loKe/fc6lLDnOFopRKq8=
cwtch.im/cwtch v0.8.0 h1:QDRaDBTXefFRPZPqUMtxoNhOcgXv0rl0bGjysSOmJX0=
cwtch.im/cwtch v0.8.0/go.mod h1:+SY/4ueF1U7mK+CX8hZFbtd+GC1lx/cReo110KgtQAw=
cwtch.im/cwtch v0.8.5 h1:W67jAF2oRwqWytbZEv1UeCqW0cU2x69tgUw8iy27xFA=
cwtch.im/cwtch v0.8.5/go.mod h1:5GHxaaeVnKeXSU64IvtCKzkqhU8DRiLoVM+tiBT8kkc=
cwtch.im/cwtch v0.12.2 h1:I+ndKadCRCITw4SPbd+1cpRv+z/7iHjjTUv8OzRwTrE=
cwtch.im/cwtch v0.12.2/go.mod h1:QpTkQK7MqNt0dQK9/pBk5VpkvFhy6xuoxJIn401B8fM=
filippo.io/edwards25519 v1.0.0-rc.1 h1:m0VOOB23frXZvAOK44usCgLWvtsxIoMCTBGJZlpmGfU=
filippo.io/edwards25519 v1.0.0-rc.1/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns=
git.openprivacy.ca/cwtch.im/tapir v0.4.0 h1:clG8uORt0NKEhT4P+Dpw1pzyUuYzYBMevGqn2pciKk8=
git.openprivacy.ca/cwtch.im/tapir v0.4.0/go.mod h1:eH6dZxXrhW0C4KZX18ksUa6XJCrEvtg8cJJ/Fy6gv+E=
git.openprivacy.ca/cwtch.im/tapir v0.4.1 h1:9LMpQX41IzecNNlRc1FZKXHg6wlFss679tFsa3vzb3Y=
git.openprivacy.ca/cwtch.im/tapir v0.4.1/go.mod h1:eH6dZxXrhW0C4KZX18ksUa6XJCrEvtg8cJJ/Fy6gv+E=
git.openprivacy.ca/cwtch.im/tapir v0.4.2 h1:bxMWZnVJXX4dqqOFS7ELW4iFkVL4GS8wiRkjRv5rJe8=
git.openprivacy.ca/cwtch.im/tapir v0.4.2/go.mod h1:eH6dZxXrhW0C4KZX18ksUa6XJCrEvtg8cJJ/Fy6gv+E=
git.openprivacy.ca/cwtch.im/tapir v0.4.4 h1:KyuTVmr9GYptTCeR7JDODjmhBBbnIBf9V3NSC4+6bHc=
git.openprivacy.ca/cwtch.im/tapir v0.4.4/go.mod h1:qMFTdmDZITc1BLP1jSW0gVpLmvpg+Zjsh5ek8StwbFE=
git.openprivacy.ca/cwtch.im/tapir v0.4.9 h1:LXonlztwvI1F1++0IyomIcDH1/Bxzo+oN8YjGonNvjM=
git.openprivacy.ca/cwtch.im/tapir v0.4.9/go.mod h1:p4bHo3DAO8wwimU6JAeZXbfPQ4jnoA2bV+4YvknWTNQ=
git.openprivacy.ca/openprivacy/bine v0.0.4 h1:CO7EkGyz+jegZ4ap8g5NWRuDHA/56KKvGySR6OBPW+c=
git.openprivacy.ca/openprivacy/bine v0.0.4/go.mod h1:13ZqhKyqakDsN/ZkQkIGNULsmLyqtXc46XBcnuXm/mU=
git.openprivacy.ca/openprivacy/connectivity v1.4.3 h1:i2Ad/U9FlL9dKr2bhRck7lJ8NoWyGtoEfUwoCyMT0fU=
git.openprivacy.ca/openprivacy/connectivity v1.4.3/go.mod h1:bR0Myx9nm2YzWtsThRelkNMV4Pp7sPDa123O1qsAbVo=
git.openprivacy.ca/openprivacy/connectivity v1.4.5 h1:UYMdCWPzEAP7LbqdMXGNXmfKjWlvfnKdmewBtnbgQRI=
git.openprivacy.ca/openprivacy/connectivity v1.4.5/go.mod h1:JVRCIdL+lAG6ohBFWiKeC/MN42nnC0sfFszR9XG6vPQ=
git.openprivacy.ca/openprivacy/connectivity v1.5.0 h1:ZxsR/ZaVKXIkD2x6FlajZn62ciNQjamrI4i/5xIpdoQ=
git.openprivacy.ca/openprivacy/connectivity v1.5.0/go.mod h1:UjQiGBnWbotmBzIw59B8H6efwDadjkKzm3RPT1UaIRw=
git.openprivacy.ca/openprivacy/log v1.0.1/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw=
git.openprivacy.ca/openprivacy/log v1.0.2 h1:HLP4wsw4ljczFAelYnbObIs821z+jgMPCe8uODPnGQM=
git.openprivacy.ca/openprivacy/log v1.0.2/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw=
git.openprivacy.ca/openprivacy/log v1.0.3 h1:E/PMm4LY+Q9s3aDpfySfEDq/vYQontlvNj/scrPaga0=
@ -56,32 +37,16 @@ github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/struCoder/pidusage v0.1.3 h1:pZcSa6asBE38TJtW0Nui6GeCjLTpaT/jAnNP7dUTLSQ=
github.com/struCoder/pidusage v0.1.3/go.mod h1:pWBlW3YuSwRl6h7R5KbvA4N8oOqe9LjaKW5CwT1SPjI=
github.com/struCoder/pidusage v0.2.1 h1:dFiEgUDkubeIj0XA1NpQ6+8LQmKrLi7NiIQl86E6BoY=
github.com/struCoder/pidusage v0.2.1/go.mod h1:bewtP2KUA1TBUyza5+/PCpSQ6sc/H6jJbIKAzqW86BA=
github.com/yuin/goldmark v1.3.5 h1:dPmz1Snjq0kmkz159iL7S6WzdahUTHnHB5M56WFVifs=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.etcd.io/bbolt v1.3.4 h1:hi1bXHMVrlQh6WwxAy+qZCV/SYIlqo+Ushwdpa4tAKg=
go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee h1:4yd7jl+vXjalO5ztz6Vc1VADv+S/80LGJmyl1ROJ2AI=
golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -98,14 +63,6 @@ golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e h1:FDhOuMEY4JVRztM/gsbk+IKUQ8kj74bxZrgw87eMMVc=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.1.2 h1:kRBLX7v7Af8W7Gdbbc908OJcdgtK8bOz9Uaj8/F1ACA=
golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=

View File

@ -71,6 +71,7 @@ const (
Average
)
// TODO port to SQLite for persistence between runs?
type monitorHistory struct {
monitorType MonitorType
monitorAccumulation MonitorAccumulation
@ -150,13 +151,25 @@ func (mh *monitorHistory) Months() []float64 {
return mh.returnCopy(mh.perMonthForYear[:])
}
const timeDay = time.Hour * 24
const timeWeek = timeDay * 7
const timeMonth = timeDay * 28
func (mh *monitorHistory) Report(w *bufio.Writer) {
mh.lock.Lock()
fmt.Fprintln(w, "Minutes:", reportLine(mh.monitorType, mh.perMinutePerHour[:]))
if time.Since(mh.starttime) >= time.Hour {
fmt.Fprintln(w, "Hours: ", reportLine(mh.monitorType, mh.perHourForDay[:]))
}
if time.Since(mh.starttime) >= timeDay {
fmt.Fprintln(w, "Days: ", reportLine(mh.monitorType, mh.perDayForWeek[:]))
}
if time.Since(mh.starttime) >= timeWeek {
fmt.Fprintln(w, "Weeks: ", reportLine(mh.monitorType, mh.perWeekForMonth[:]))
}
if time.Since(mh.starttime) >= timeMonth {
fmt.Fprintln(w, "Months: ", reportLine(mh.monitorType, mh.perMonthForYear[:]))
}
mh.lock.Unlock()
}
@ -215,10 +228,10 @@ func (mh *monitorHistory) monitorThread() {
case <-time.After(time.Minute):
mh.lock.Lock()
minuteAvg := rotateAndAccumulate(mh.perMinutePerHour[:], mh.monitor(), mh.monitorAccumulation)
minuteAcc := rotateAndAccumulate(mh.perMinutePerHour[:], mh.monitor(), mh.monitorAccumulation)
if time.Since(mh.timeLastHourRotate) > time.Hour {
rotateAndAccumulate(mh.perHourForDay[:], minuteAvg, mh.monitorAccumulation)
rotateAndAccumulate(mh.perHourForDay[:], minuteAcc, mh.monitorAccumulation)
mh.timeLastHourRotate = time.Now()
}

View File

@ -5,7 +5,6 @@ import (
"fmt"
"git.openprivacy.ca/cwtch.im/tapir"
"git.openprivacy.ca/openprivacy/log"
"github.com/struCoder/pidusage"
"os"
"path"
"runtime"
@ -17,70 +16,48 @@ const (
reportFile = "serverMonitorReport.txt"
)
type MessageCountFn func() int
// Monitors is a package of metrics for a Cwtch Server including message count, CPU, Mem, and conns
type Monitors struct {
MessageCounter Counter
TotalMessageCounter Counter
Messages MonitorHistory
CPU MonitorHistory
Memory MonitorHistory
ClientConns MonitorHistory
messageCountFn MessageCountFn
starttime time.Time
breakChannel chan bool
log bool
configDir string
running bool
lock sync.Mutex
}
func bToMb(b uint64) uint64 {
return b / 1024 / 1024
}
// Start initializes a Monitors's monitors
func (mp *Monitors) Start(ts tapir.Service, configDir string, doLogging bool) {
func (mp *Monitors) Start(ts tapir.Service, mcfn MessageCountFn, configDir string, doLogging bool) {
mp.log = doLogging
mp.configDir = configDir
mp.starttime = time.Now()
mp.breakChannel = make(chan bool)
mp.MessageCounter = NewCounter()
mp.messageCountFn = mcfn
// Maintain a count of total messages
mp.TotalMessageCounter = NewCounter()
mp.Messages = NewMonitorHistory(Count, Cumulative, func() (c float64) {
c = float64(mp.MessageCounter.Count())
mp.TotalMessageCounter.Add(int(c))
mp.MessageCounter.Reset()
return
})
var pidUsageLock sync.Mutex
// pidusage doesn't support windows
if runtime.GOOS != "windows" {
mp.CPU = NewMonitorHistory(Percent, Average, func() float64 {
pidUsageLock.Lock()
defer pidUsageLock.Unlock()
sysInfo, err := pidusage.GetStat(os.Getpid())
if err != nil {
log.Errorf("pidusage.GetStat failed with: %s", err)
return 0.0
}
return float64(sysInfo.CPU)
})
mp.Memory = NewMonitorHistory(MegaBytes, Average, func() float64 {
pidUsageLock.Lock()
defer pidUsageLock.Unlock()
sysInfo, err := pidusage.GetStat(os.Getpid())
if err != nil {
log.Errorf("pidusage.GetStat failed with: %s", err)
return 0.0
}
return float64(sysInfo.Memory)
var m runtime.MemStats
runtime.ReadMemStats(&m)
return float64(bToMb(m.Sys))
})
} else {
mp.CPU = NewMonitorHistory(Percent, Average, func() float64 {
return 0.0
})
mp.Memory = NewMonitorHistory(MegaBytes, Average, func() float64 {
return 0.0
})
}
// TODO: replace with ts.
mp.ClientConns = NewMonitorHistory(Count, Average, func() float64 { return float64(ts.Metrics().ConnectionCount) })
if mp.log {
@ -89,16 +66,34 @@ func (mp *Monitors) Start(ts tapir.Service, configDir string, doLogging bool) {
}
func (mp *Monitors) run() {
mp.running = true
for {
select {
case <-time.After(time.Minute):
mp.lock.Lock()
mp.report()
mp.lock.Unlock()
case <-mp.breakChannel:
mp.lock.Lock()
mp.running = false
mp.lock.Unlock()
return
}
}
}
func FormatDuration(ts time.Duration) string {
const (
Day = 24 * time.Hour
)
d := ts / Day
ts = ts % Day
h := ts / time.Hour
ts = ts % time.Hour
m := ts / time.Minute
return fmt.Sprintf("%dd%dh%dm", d, h, m)
}
func (mp *Monitors) report() {
f, err := os.Create(path.Join(mp.configDir, reportFile))
if err != nil {
@ -109,18 +104,16 @@ func (mp *Monitors) report() {
w := bufio.NewWriter(f)
fmt.Fprintf(w, "Uptime: %v\n\n", time.Since(mp.starttime))
fmt.Fprintf(w, "Uptime: %v \n", FormatDuration(time.Since(mp.starttime)))
fmt.Fprintf(w, "Total Messages: %v \n\n", mp.messageCountFn())
fmt.Fprintln(w, "messages:")
fmt.Fprintln(w, "Messages:")
mp.Messages.Report(w)
fmt.Fprintln(w, "\nClient Connections:")
mp.ClientConns.Report(w)
fmt.Fprintln(w, "\nCPU:")
mp.CPU.Report(w)
fmt.Fprintln(w, "\nMemory:")
fmt.Fprintln(w, "\nSys Memory:")
mp.Memory.Report(w)
w.Flush()
@ -128,11 +121,15 @@ func (mp *Monitors) report() {
// Stop stops all the monitors in a Monitors
func (mp *Monitors) Stop() {
mp.lock.Lock()
running := mp.running
mp.lock.Unlock()
if running {
if mp.log {
mp.breakChannel <- true
}
mp.Messages.Stop()
mp.CPU.Stop()
mp.Memory.Stop()
mp.ClientConns.Stop()
}
}

View File

@ -15,7 +15,8 @@ func TestMonitors(t *testing.T) {
os.Mkdir("testLog", 0700)
service := new(tor2.BaseOnionService)
mp := Monitors{}
mp.Start(service, "testLog", true)
mp.Start(service, func() int { return 1 }, "testLog", true)
mp.MessageCounter.Add(1)
log.Infof("sleeping for minute to give to for monitors to trigger...")
// wait a minute for it to trigger
time.Sleep(62 * time.Second)

View File

@ -42,11 +42,13 @@ type Server interface {
TofuBundle() string
GetAttribute(string) string
SetAttribute(string, string)
SetMonitorLogging(bool)
}
type server struct {
service tapir.Service
config *Config
service tapir.Service
messageStore storage.MessageStoreInterface
metricsPack metrics.Monitors
tokenTapirService tapir.Service
tokenServer *privacypass.TokenServer
@ -55,7 +57,6 @@ type server struct {
tokenServiceStopped bool
onionServiceStopped bool
running bool
existingMessageCount int
lock sync.RWMutex
}
@ -78,6 +79,21 @@ func (s *server) Identity() primitives.Identity {
return s.config.Identity()
}
// helper fn to pass to metrics
func (s *server) getStorageTotalMessageCount() int {
if s.messageStore != nil {
return s.messageStore.MessagesCount()
}
return 0
}
// helper fn to pass to storage
func (s *server) incMessageCount() {
if s.metricsPack.MessageCounter != nil {
s.metricsPack.MessageCounter.Add(1)
}
}
// Run starts a server with the given privateKey
func (s *server) Run(acn connectivity.ACN) error {
s.lock.Lock()
@ -91,17 +107,17 @@ func (s *server) Run(acn connectivity.ACN) error {
service.Init(acn, s.config.PrivateKey, &identity)
s.service = service
log.Infof("cwtch server running on cwtch:%s\n", s.Onion())
s.metricsPack.Start(service, s.config.ConfigDir, s.config.ServerReporting.LogMetricsToFile)
ms, err := storage.InitializeSqliteMessageStore(path.Join(s.config.ConfigDir, "cwtch.messages"), s.metricsPack.MessageCounter)
if s.config.ServerReporting.LogMetricsToFile {
s.metricsPack.Start(service, s.getStorageTotalMessageCount, s.config.ConfigDir, s.config.ServerReporting.LogMetricsToFile)
}
var err error
s.messageStore, err = storage.InitializeSqliteMessageStore(path.Join(s.config.ConfigDir, "cwtch.messages"), s.incMessageCount)
if err != nil {
return fmt.Errorf("could not open database: %v", err)
}
// Needed because we only collect metrics on a per-session basis
// TODO fix metrics so they persist across sessions?
s.existingMessageCount = len(ms.FetchMessages())
s.tokenTapirService = new(tor2.BaseOnionService)
s.tokenTapirService.Init(acn, s.tokenServicePrivKey, &s.tokenService)
tokenApplication := new(applications.TokenApplication)
@ -114,7 +130,7 @@ func (s *server) Run(acn connectivity.ACN) error {
s.tokenServiceStopped = true
}()
go func() {
s.service.Listen(NewTokenBoardServer(ms, s.tokenServer))
s.service.Listen(NewTokenBoardServer(s.messageStore, s.tokenServer))
s.onionServiceStopped = true
}()
@ -151,6 +167,7 @@ func (s *server) Stop() {
defer s.lock.Unlock()
if s.running {
s.service.Shutdown()
s.messageStore.Close()
s.tokenTapirService.Shutdown()
log.Infof("Closing Token server Database...")
@ -170,19 +187,15 @@ func (s *server) Destroy() {
// Statistics is an encapsulation of information about the server that an operator might want to know at a glance.
type Statistics struct {
TotalMessages int
TotalConnections int
}
// GetStatistics is a stub method for providing some high level information about
// the server operation to bundling applications (e.g. the UI)
func (s *server) GetStatistics() Statistics {
// TODO Statistics from Metrics is very awkward. Metrics needs an overhaul to make safe
total := s.existingMessageCount
if s.metricsPack.TotalMessageCounter != nil {
total += s.metricsPack.TotalMessageCounter.Count()
}
return Statistics{
TotalMessages: total,
TotalMessages: s.messageStore.MessagesCount(),
TotalConnections: s.service.Metrics().ConnectionCount,
}
}
@ -229,3 +242,14 @@ func (s *server) GetAttribute(key string) string {
func (s *server) SetAttribute(key, val string) {
s.config.SetAttribute(key, val)
}
// SetMonitorLogging turns on or off the monitor logging suite, and logging to a file in the server dir
func (s *server) SetMonitorLogging(do bool) {
s.config.ServerReporting.LogMetricsToFile = do
s.config.Save()
if do {
s.metricsPack.Start(s.service, s.getStorageTotalMessageCount, s.config.ConfigDir, s.config.ServerReporting.LogMetricsToFile)
} else {
s.metricsPack.Stop()
}
}

View File

@ -43,8 +43,6 @@ const (
// Reporting is a struct for storing a the config a server needs to be a peer, and connect to a group to report
type Reporting struct {
LogMetricsToFile bool `json:"logMetricsToFile"`
ReportingGroupID string `json:"reportingGroupId"`
ReportingServerAddr string `json:"reportingServerAddr"`
}
// Config is a struct for storing basic server configuration
@ -92,9 +90,7 @@ func initDefaultConfig(configDir, filename string, encrypted bool) *Config {
config.TokenServerPublicKey = tid.PublicKey()
config.MaxBufferLines = 100000
config.ServerReporting = Reporting{
LogMetricsToFile: true,
ReportingGroupID: "",
ReportingServerAddr: "",
LogMetricsToFile: false,
}
config.Attributes[AttrAutostart] = "false"
@ -112,19 +108,20 @@ func initDefaultConfig(configDir, filename string, encrypted bool) *Config {
// LoadCreateDefaultConfigFile loads a Config from or creates a default config and saves it to a json file specified by filename
// if the encrypted flag is true the config is store encrypted by password
func LoadCreateDefaultConfigFile(configDir, filename string, encrypted bool, password string) (*Config, error) {
func LoadCreateDefaultConfigFile(configDir, filename string, encrypted bool, password string, defaultLogToFile bool) (*Config, error) {
if _, err := os.Stat(path.Join(configDir, filename)); os.IsNotExist(err) {
return CreateConfig(configDir, filename, encrypted, password)
return CreateConfig(configDir, filename, encrypted, password, defaultLogToFile)
}
return LoadConfig(configDir, filename, encrypted, password)
}
// CreateConfig creates a default config and saves it to a json file specified by filename
// if the encrypted flag is true the config is store encrypted by password
func CreateConfig(configDir, filename string, encrypted bool, password string) (*Config, error) {
func CreateConfig(configDir, filename string, encrypted bool, password string, defaultLogToFile bool) (*Config, error) {
log.Debugf("CreateConfig for server with configDir: %s\n", configDir)
os.MkdirAll(configDir, 0700)
config := initDefaultConfig(configDir, filename, encrypted)
config.ServerReporting.LogMetricsToFile = defaultLogToFile
if encrypted {
key, _, err := v1.InitV1Directory(configDir, password)
if err != nil {

View File

@ -63,7 +63,6 @@ func (s *servers) LoadServers(password string) ([]string, error) {
}
}
}
log.Infof("LoadServers returning: %s\n", loadedServers)
return loadedServers, nil
}
@ -71,7 +70,7 @@ func (s *servers) LoadServers(password string) ([]string, error) {
func (s *servers) CreateServer(password string) (Server, error) {
newLocalID := model.GenerateRandomID()
directory := path.Join(s.directory, newLocalID)
config, err := CreateConfig(directory, ServerConfigFile, true, password)
config, err := CreateConfig(directory, ServerConfigFile, true, password, false)
if err != nil {
return nil, err
}

View File

@ -5,21 +5,21 @@ import (
"database/sql"
"encoding/base64"
"fmt"
"git.openprivacy.ca/cwtch.im/server/metrics"
"git.openprivacy.ca/openprivacy/log"
_ "github.com/mattn/go-sqlite3" // sqlite3 driver
)
// MessageStoreInterface defines an interface to interact with a store of cwtch messages.
type MessageStoreInterface interface {
AddMessage(groups.EncryptedGroupMessage)
FetchMessages() []*groups.EncryptedGroupMessage
MessagesCount() int
FetchMessagesFrom(signature []byte) []*groups.EncryptedGroupMessage
Close()
}
// SqliteMessageStore is an sqlite3 backed message store
type SqliteMessageStore struct {
messageCounter metrics.Counter
incMessageCounterFn func()
database *sql.DB
// Some prepared queries...
@ -36,7 +36,9 @@ func (s *SqliteMessageStore) Close() {
// AddMessage implements the MessageStoreInterface AddMessage for sqlite message store
func (s *SqliteMessageStore) AddMessage(message groups.EncryptedGroupMessage) {
s.messageCounter.Add(1)
if s.incMessageCounterFn != nil {
s.incMessageCounterFn()
}
// ignore this clearly invalid message...
if len(message.Signature) == 0 {
return
@ -49,6 +51,29 @@ func (s *SqliteMessageStore) AddMessage(message groups.EncryptedGroupMessage) {
}
}
func (s SqliteMessageStore) MessagesCount() int {
rows, err := s.database.Query("SELECT COUNT(*) from messages")
if err != nil {
log.Errorf("%v", err)
return -1
}
defer rows.Close()
result := rows.Next()
if !result {
return -1
}
var rownum int
err = rows.Scan(&rownum)
if err != nil {
log.Errorf("error fetching rows: %v", err)
return -1
}
return rownum
}
// FetchMessages implements the MessageStoreInterface FetchMessages for sqlite message store
func (s SqliteMessageStore) FetchMessages() []*groups.EncryptedGroupMessage {
rows, err := s.database.Query("SELECT id, signature,ciphertext from messages")
@ -107,7 +132,7 @@ func (s *SqliteMessageStore) compileRows(rows *sql.Rows) []*groups.EncryptedGrou
// InitializeSqliteMessageStore creates a database `dbfile` with the necessary tables (if it doesn't already exist)
// and returns an open database
func InitializeSqliteMessageStore(dbfile string, messageCounter metrics.Counter) (*SqliteMessageStore, error) {
func InitializeSqliteMessageStore(dbfile string, incMessageCounterFn func()) (*SqliteMessageStore, error) {
db, err := sql.Open("sqlite3", dbfile)
if err != nil {
log.Errorf("database %v cannot be created or opened %v", dbfile, err)
@ -123,7 +148,7 @@ func InitializeSqliteMessageStore(dbfile string, messageCounter metrics.Counter)
log.Infof("Database Initialized")
slms := new(SqliteMessageStore)
slms.database = db
slms.messageCounter = messageCounter
slms.incMessageCounterFn = incMessageCounterFn
sqlStmt = `INSERT INTO messages(signature, ciphertext) values (?,?);`
stmt, err := slms.database.Prepare(sqlStmt)

View File

@ -5,6 +5,7 @@ import (
"encoding/binary"
"git.openprivacy.ca/cwtch.im/server/metrics"
"git.openprivacy.ca/openprivacy/log"
_ "github.com/mattn/go-sqlite3" // sqlite3 driver
"os"
"testing"
"time"
@ -15,7 +16,7 @@ func TestMessageStore(t *testing.T) {
os.Remove(filename)
log.SetLevel(log.LevelDebug)
counter := metrics.NewCounter()
db, err := InitializeSqliteMessageStore(filename, counter)
db, err := InitializeSqliteMessageStore(filename, func() { counter.Add(1) })
if err != nil {
t.Fatalf("Error: %v", err)
}