commit 192c97f6f5ef7c01cbd8325df2f2dc9d554eff4f Author: Sarah Jamie Lewis Date: Fri May 7 11:36:34 2021 -0700 Initial Copy over from Cwtch diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d2d532c --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +tokens.db +serverConfig.json +coverage.out +tordir +.idea/ +messages \ No newline at end of file diff --git a/app/main.go b/app/main.go new file mode 100644 index 0000000..e77f72d --- /dev/null +++ b/app/main.go @@ -0,0 +1,102 @@ +package main + +import ( + "crypto/rand" + "cwtch.im/cwtch/model" + "encoding/base64" + cwtchserver "git.openprivacy.ca/cwtch.im/server" + "git.openprivacy.ca/cwtch.im/tapir/primitives" + "git.openprivacy.ca/openprivacy/connectivity/tor" + "git.openprivacy.ca/openprivacy/log" + mrand "math/rand" + "os" + "os/signal" + "syscall" + "time" +) + +const ( + serverConfigFile = "serverConfig.json" +) + +func main() { + log.AddEverythingFromPattern("server/app/main") + log.AddEverythingFromPattern("server/server") + log.ExcludeFromPattern("service.go") + log.SetLevel(log.LevelDebug) + configDir := os.Getenv("CWTCH_CONFIG_DIR") + + if len(os.Args) == 2 && os.Args[1] == "gen1" { + config := new(cwtchserver.Config) + id, pk := primitives.InitializeEphemeralIdentity() + tid, tpk := primitives.InitializeEphemeralIdentity() + config.PrivateKey = pk + config.PublicKey = id.PublicKey() + config.TokenServerPrivateKey = tpk + config.TokenServerPublicKey = tid.PublicKey() + config.MaxBufferLines = 100000 + config.ServerReporting = cwtchserver.Reporting{ + LogMetricsToFile: true, + ReportingGroupID: "", + ReportingServerAddr: "", + } + config.Save(".", "serverConfig.json") + return + } + + serverConfig := cwtchserver.LoadConfig(configDir, serverConfigFile) + + // we don't need real randomness for the port, just to avoid a possible conflict... + mrand.Seed(int64(time.Now().Nanosecond())) + controlPort := mrand.Intn(1000) + 9052 + + // generate a random password + key := make([]byte, 64) + _, err := rand.Read(key) + if err != nil { + panic(err) + } + + os.MkdirAll("tordir/tor", 0700) + tor.NewTorrc().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("./tordir/tor/torrc") + acn, err := tor.NewTorACNWithAuth("tordir", "", controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)}) + + if err != nil { + log.Errorf("\nError connecting to Tor: %v\n", err) + os.Exit(1) + } + defer acn.Close() + + server := new(cwtchserver.Server) + log.Infoln("starting cwtch server...") + + server.Setup(serverConfig) + + // TODO create a random group for testing + group, _ := model.NewGroup(tor.GetTorV3Hostname(serverConfig.PublicKey)) + group.SignGroup([]byte{}) + invite, err := group.Invite() + if err != nil { + panic(err) + } + + bundle := server.KeyBundle().Serialize() + log.Infof("Server Config: server:%s", base64.StdEncoding.EncodeToString(bundle)) + + log.Infof("Server Tofu Bundle: tofubundle:server:%s||%s", base64.StdEncoding.EncodeToString(bundle), invite) + + // Graceful Shutdown + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + acn.Close() + server.Close() + os.Exit(1) + }() + + server.Run(acn) + for { + time.Sleep(time.Second) + } +} diff --git a/docker/docker-entrypoint b/docker/docker-entrypoint new file mode 100755 index 0000000..1e55c91 --- /dev/null +++ b/docker/docker-entrypoint @@ -0,0 +1,33 @@ +#!/bin/sh +set -o errexit + +chmod_files() { find $2 -type f -exec chmod -v $1 {} \; +} +chmod_dirs() { find $2 -type d -exec chmod -v $1 {} \; +} + +chown ${TOR_USER}:${TOR_USER} /run/tor/ +chmod 770 /run/tor + +chown -Rv ${TOR_USER}:${TOR_USER} /var/lib/tor +chmod_dirs 700 /var/lib/tor +chmod_files 600 /var/lib/tor + +echo -e "\n========================================================" +# Display OS version, Tor version & torrc in log +echo -e "Alpine Version: \c" && cat /etc/alpine-release +tor --version +#cat /etc/tor/torrc +echo -e "========================================================\n" + +tor -f /etc/tor/torrc + +#Cwtch will crash and burn if 9051 isn't ready +sleep 15 + +if [ -z "${CWTCH_CONFIG_DIR}" ]; then + CWTCH_CONFIG_DIR=/etc/cwtch/ +fi + +#Run cwtch (or whatever the user passed) +CWTCH_CONFIG_DIR=$CWTCH_CONFIG_DIR exec "$@" diff --git a/docker/torrc b/docker/torrc new file mode 100644 index 0000000..6277a45 --- /dev/null +++ b/docker/torrc @@ -0,0 +1,27 @@ +User _tor +DataDirectory /var/lib/tor + +ORPort 0 +ExitRelay 0 +IPv6Exit 0 + +#We need this running in the background as the server doesn't launch it itself +RunAsDaemon 1 + +ClientOnly 1 +SocksPort 9050 + +ControlPort 9051 +ControlSocket /run/tor/control +ControlSocketsGroupWritable 1 +CookieAuthentication 1 +CookieAuthFile /run/tor/control.authcookie +CookieAuthFileGroupReadable 1 +#HashedControlPassword 16:B4C8EE980C085EE460AEA9094350DAA9C2B5F841400E9BBA247368400A + +# Run as a relay only (change policy to enable exit node) +ExitPolicy reject *:* # no exits allowed +ExitPolicy reject6 *:* + +# Additional config built by the entrypoint will go here + diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..02a8c8e --- /dev/null +++ b/go.mod @@ -0,0 +1,13 @@ +module git.openprivacy.ca/cwtch.im/server + +go 1.14 + +require ( + cwtch.im/cwtch v0.7.3 + git.openprivacy.ca/cwtch.im/tapir v0.4.0 + git.openprivacy.ca/openprivacy/connectivity v1.4.3 + git.openprivacy.ca/openprivacy/log v1.0.2 + github.com/gtank/ristretto255 v0.1.2 + github.com/struCoder/pidusage v0.1.3 + golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..4d5f4db --- /dev/null +++ b/go.sum @@ -0,0 +1,60 @@ +cwtch.im/cwtch v0.7.3 h1:3f2Q2XFgNtq8hVbEz7K++exS1gDsO+zmSO7X7Fqz8Mo= +cwtch.im/cwtch v0.7.3/go.mod h1:S0JRLSTwFM+kRrpOPKgUQn/loKe/fc6lLDnOFopRKq8= +git.openprivacy.ca/cwtch.im/tapir v0.4.0 h1:clG8uORt0NKEhT4P+Dpw1pzyUuYzYBMevGqn2pciKk8= +git.openprivacy.ca/cwtch.im/tapir v0.4.0/go.mod h1:eH6dZxXrhW0C4KZX18ksUa6XJCrEvtg8cJJ/Fy6gv+E= +git.openprivacy.ca/openprivacy/bine v0.0.4 h1:CO7EkGyz+jegZ4ap8g5NWRuDHA/56KKvGySR6OBPW+c= +git.openprivacy.ca/openprivacy/bine v0.0.4/go.mod h1:13ZqhKyqakDsN/ZkQkIGNULsmLyqtXc46XBcnuXm/mU= +git.openprivacy.ca/openprivacy/connectivity v1.4.3 h1:i2Ad/U9FlL9dKr2bhRck7lJ8NoWyGtoEfUwoCyMT0fU= +git.openprivacy.ca/openprivacy/connectivity v1.4.3/go.mod h1:bR0Myx9nm2YzWtsThRelkNMV4Pp7sPDa123O1qsAbVo= +git.openprivacy.ca/openprivacy/log v1.0.1/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw= +git.openprivacy.ca/openprivacy/log v1.0.2 h1:HLP4wsw4ljczFAelYnbObIs821z+jgMPCe8uODPnGQM= +git.openprivacy.ca/openprivacy/log v1.0.2/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gtank/merlin v0.1.1 h1:eQ90iG7K9pOhtereWsmyRJ6RAwcP4tHTDBHXNg+u5is= +github.com/gtank/merlin v0.1.1/go.mod h1:T86dnYJhcGOh5BjZFCJWTDeTK7XW8uE+E21Cy/bIQ+s= +github.com/gtank/ristretto255 v0.1.2 h1:JEqUCPA1NvLq5DwYtuzigd7ss8fwbYay9fi4/5uMzcc= +github.com/gtank/ristretto255 v0.1.2/go.mod h1:Ph5OpO6c7xKUGROZfWVLiJf9icMDwUeIvY4OmlYW69o= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/mimoo/StrobeGo v0.0.0-20181016162300-f8f6d4d2b643 h1:hLDRPB66XQT/8+wG9WsDpiCvZf1yKO7sz7scAjSlBa0= +github.com/mimoo/StrobeGo v0.0.0-20181016162300-f8f6d4d2b643/go.mod h1:43+3pMjjKimDBf5Kr4ZFNGbLql1zKkbImw+fZbw3geM= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/struCoder/pidusage v0.1.3 h1:pZcSa6asBE38TJtW0Nui6GeCjLTpaT/jAnNP7dUTLSQ= +github.com/struCoder/pidusage v0.1.3/go.mod h1:pWBlW3YuSwRl6h7R5KbvA4N8oOqe9LjaKW5CwT1SPjI= +go.etcd.io/bbolt v1.3.4 h1:hi1bXHMVrlQh6WwxAy+qZCV/SYIlqo+Ushwdpa4tAKg= +go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee h1:4yd7jl+vXjalO5ztz6Vc1VADv+S/80LGJmyl1ROJ2AI= +golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44 h1:Bli41pIlzTzf3KEY06n+xnzK/BESIg2ze4Pgfh/aI8c= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 0000000..9d89190 --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,249 @@ +package metrics + +import ( + "bufio" + "fmt" + "strings" + "sync" + "sync/atomic" + "time" +) + +type counter struct { + startTime time.Time + count uint64 + total uint64 +} + +// Counter providers a threadsafe counter to use for storing long running counts +type Counter interface { + Add(unit int) + Reset() + + Count() int + GetStarttime() time.Time +} + +// NewCounter initializes a counter starting at time.Now() and a count of 0 and returns it +func NewCounter() Counter { + c := &counter{startTime: time.Now(), count: 0, total: 0} + return c +} + +// Add add a count of unit to the counter +func (c *counter) Add(unit int) { + atomic.AddUint64(&c.count, uint64(unit)) +} + +// Count returns the count since Start +func (c *counter) Count() int { + return int(atomic.LoadUint64(&c.count)) +} + +func (c *counter) Reset() { + atomic.StoreUint64(&c.count, 0) + c.startTime = time.Now() +} + +// GetStarttime returns the starttime of the counter +func (c *counter) GetStarttime() time.Time { + return c.startTime +} + +// MonitorType controls how the monitor will report itself +type MonitorType int + +const ( + // Count indicates the monitor should report in interger format + Count MonitorType = iota + // Percent indicates the monitor should report in decimal format with 2 places + Percent + // MegaBytes indicates the monitor should transform the raw number into MBs + MegaBytes +) + +// MonitorAccumulation controls how monitor data is accumulated over time into larger summary buckets +type MonitorAccumulation int + +const ( + // Cumulative values with sum over time + Cumulative MonitorAccumulation = iota + // Average values will average over time + Average +) + +type monitorHistory struct { + monitorType MonitorType + monitorAccumulation MonitorAccumulation + + starttime time.Time + perMinutePerHour [60]float64 + timeLastHourRotate time.Time + perHourForDay [24]float64 + timeLastDayRotate time.Time + perDayForWeek [7]float64 + timeLastWeekRotate time.Time + perWeekForMonth [4]float64 + timeLastMonthRotate time.Time + perMonthForYear [12]float64 + + monitor func() float64 + + breakChannel chan bool + lock sync.Mutex +} + +// MonitorHistory runs a monitor every minute and rotates and averages the results out across time +type MonitorHistory interface { + Start() + Stop() + + Minutes() []float64 + Hours() []float64 + Days() []float64 + Weeks() []float64 + Months() []float64 + + Report(w *bufio.Writer) +} + +// NewMonitorHistory returns a new MonitorHistory with starttime of time.Now and Started running with supplied monitor +func NewMonitorHistory(t MonitorType, a MonitorAccumulation, monitor func() float64) MonitorHistory { + mh := &monitorHistory{monitorType: t, monitorAccumulation: a, starttime: time.Now(), monitor: monitor, breakChannel: make(chan bool), + timeLastHourRotate: time.Now(), timeLastDayRotate: time.Now(), timeLastWeekRotate: time.Now(), + timeLastMonthRotate: time.Now()} + mh.Start() + return mh +} + +// Start starts a monitorHistory go rountine to run the monitor at intervals and rotate history +func (mh *monitorHistory) Start() { + go mh.monitorThread() +} + +// Stop stops a monitorHistory go routine +func (mh *monitorHistory) Stop() { + mh.breakChannel <- true +} + +// Minutes returns the last 60 minute monitoring results +func (mh *monitorHistory) Minutes() []float64 { + return mh.returnCopy(mh.perMinutePerHour[:]) +} + +// Hours returns the last 24 hourly averages of monitor results +func (mh *monitorHistory) Hours() []float64 { + return mh.returnCopy(mh.perHourForDay[:]) +} + +// Days returns the last 7 day averages of monitor results +func (mh *monitorHistory) Days() []float64 { + return mh.returnCopy(mh.perDayForWeek[:]) +} + +// Weeks returns the last 4 weeks of averages of monitor results +func (mh *monitorHistory) Weeks() []float64 { + return mh.returnCopy(mh.perWeekForMonth[:]) +} + +// Months returns the last 12 months of averages of monitor results +func (mh *monitorHistory) Months() []float64 { + return mh.returnCopy(mh.perMonthForYear[:]) +} + +func (mh *monitorHistory) Report(w *bufio.Writer) { + mh.lock.Lock() + fmt.Fprintln(w, "Minutes:", reportLine(mh.monitorType, mh.perMinutePerHour[:])) + fmt.Fprintln(w, "Hours: ", reportLine(mh.monitorType, mh.perHourForDay[:])) + fmt.Fprintln(w, "Days: ", reportLine(mh.monitorType, mh.perDayForWeek[:])) + fmt.Fprintln(w, "Weeks: ", reportLine(mh.monitorType, mh.perWeekForMonth[:])) + fmt.Fprintln(w, "Months: ", reportLine(mh.monitorType, mh.perMonthForYear[:])) + mh.lock.Unlock() +} + +func reportLine(t MonitorType, array []float64) string { + switch t { + case Count: + return strings.Trim(strings.Join(strings.Fields(fmt.Sprintf("%.0f", array)), " "), "[]") + case Percent: + return strings.Trim(strings.Join(strings.Fields(fmt.Sprintf("%.2f", array)), " "), "[]") + case MegaBytes: + mbs := make([]int, len(array)) + for i, b := range array { + mbs[i] = int(b) / 1024 / 1024 + } + return strings.Trim(strings.Join(strings.Fields(fmt.Sprintf("%d", mbs)), "MBs "), "[]") + "MBs" + } + return "" +} + +func (mh *monitorHistory) returnCopy(slice []float64) []float64 { + retSlice := make([]float64, len(slice)) + mh.lock.Lock() + for i, v := range slice { + retSlice[i] = v + } + mh.lock.Unlock() + return retSlice +} + +func rotateAndAccumulate(array []float64, newVal float64, acc MonitorAccumulation) float64 { + total := float64(0.0) + for i := len(array) - 1; i > 0; i-- { + array[i] = array[i-1] + total += array[i] + } + array[0] = newVal + total += newVal + if acc == Cumulative { + return total + } + return total / float64(len(array)) +} +func accumulate(array []float64, acc MonitorAccumulation) float64 { + total := float64(0) + for _, x := range array { + total += x + } + if acc == Cumulative { + return total + } + return total / float64(len(array)) +} + +// monitorThread is the goroutine in a monitorHistory that does per minute monitoring and rotation +func (mh *monitorHistory) monitorThread() { + for { + select { + case <-time.After(time.Minute): + mh.lock.Lock() + + minuteAvg := rotateAndAccumulate(mh.perMinutePerHour[:], mh.monitor(), mh.monitorAccumulation) + + if time.Now().Sub(mh.timeLastHourRotate) > time.Hour { + rotateAndAccumulate(mh.perHourForDay[:], minuteAvg, mh.monitorAccumulation) + mh.timeLastHourRotate = time.Now() + } + + if time.Now().Sub(mh.timeLastDayRotate) > time.Hour*24 { + rotateAndAccumulate(mh.perDayForWeek[:], accumulate(mh.perHourForDay[:], mh.monitorAccumulation), mh.monitorAccumulation) + mh.timeLastDayRotate = time.Now() + } + + if time.Now().Sub(mh.timeLastWeekRotate) > time.Hour*24*7 { + rotateAndAccumulate(mh.perWeekForMonth[:], accumulate(mh.perDayForWeek[:], mh.monitorAccumulation), mh.monitorAccumulation) + mh.timeLastWeekRotate = time.Now() + } + + if time.Now().Sub(mh.timeLastMonthRotate) > time.Hour*24*7*4 { + rotateAndAccumulate(mh.perMonthForYear[:], accumulate(mh.perWeekForMonth[:], mh.monitorAccumulation), mh.monitorAccumulation) + mh.timeLastMonthRotate = time.Now() + } + + mh.lock.Unlock() + + case <-mh.breakChannel: + return + } + } +} diff --git a/metrics/metrics_test.go b/metrics/metrics_test.go new file mode 100644 index 0000000..c17e4d0 --- /dev/null +++ b/metrics/metrics_test.go @@ -0,0 +1,37 @@ +package metrics + +import ( + "testing" + "time" +) + +func TestCounter(t *testing.T) { + starttime := time.Now() + c := NewCounter() + + max := 100 + done := make(chan bool, max) + + // slightly stress test atomic nature of metric by flooding with threads Add()ing + for i := 0; i < max; i++ { + go func() { + c.Add(1) + done <- true + }() + } + + for i := 0; i < max; i++ { + <-done + } + + val := c.Count() + if val != 100 { + t.Errorf("counter count was not 100") + } + + counterStart := c.GetStarttime() + + if counterStart.Sub(starttime) > time.Millisecond { + t.Errorf("counter's starttime was innaccurate %v", counterStart.Sub(starttime)) + } +} diff --git a/metrics/monitors.go b/metrics/monitors.go new file mode 100644 index 0000000..c0da8a2 --- /dev/null +++ b/metrics/monitors.go @@ -0,0 +1,119 @@ +package metrics + +import ( + "bufio" + "fmt" + "git.openprivacy.ca/cwtch.im/tapir" + "git.openprivacy.ca/openprivacy/log" + "github.com/struCoder/pidusage" + "os" + "path" + "sync" + "time" +) + +const ( + reportFile = "serverMonitorReport.txt" +) + +// Monitors is a package of metrics for a Cwtch Server including message count, CPU, Mem, and conns +type Monitors struct { + MessageCounter Counter + TotalMessageCounter Counter + Messages MonitorHistory + CPU MonitorHistory + Memory MonitorHistory + ClientConns MonitorHistory + starttime time.Time + breakChannel chan bool + log bool + configDir string +} + +// Start initializes a Monitors's monitors +func (mp *Monitors) Start(ts tapir.Service, configDir string, log bool) { + mp.log = log + mp.configDir = configDir + mp.starttime = time.Now() + mp.breakChannel = make(chan bool) + mp.MessageCounter = NewCounter() + + // Maintain a count of total messages + mp.TotalMessageCounter = NewCounter() + mp.Messages = NewMonitorHistory(Count, Cumulative, func() (c float64) { + c = float64(mp.MessageCounter.Count()) + mp.TotalMessageCounter.Add(int(c)) + mp.MessageCounter.Reset() + return + }) + + var pidUsageLock sync.Mutex + mp.CPU = NewMonitorHistory(Percent, Average, func() float64 { + pidUsageLock.Lock() + defer pidUsageLock.Unlock() + sysInfo, _ := pidusage.GetStat(os.Getpid()) + return float64(sysInfo.CPU) + }) + mp.Memory = NewMonitorHistory(MegaBytes, Average, func() float64 { + pidUsageLock.Lock() + defer pidUsageLock.Unlock() + sysInfo, _ := pidusage.GetStat(os.Getpid()) + return float64(sysInfo.Memory) + }) + + // TODO: replace with ts. + mp.ClientConns = NewMonitorHistory(Count, Average, func() float64 { return float64(ts.Metrics().ConnectionCount) }) + + if mp.log { + go mp.run() + } +} + +func (mp *Monitors) run() { + for { + select { + case <-time.After(time.Minute): + mp.report() + case <-mp.breakChannel: + return + } + } +} + +func (mp *Monitors) report() { + f, err := os.Create(path.Join(mp.configDir, reportFile)) + if err != nil { + log.Errorf("Could not open monitor reporting file: %v", err) + return + } + defer f.Close() + + w := bufio.NewWriter(f) + + fmt.Fprintf(w, "Uptime: %v\n\n", time.Now().Sub(mp.starttime)) + + fmt.Fprintln(w, "messages:") + mp.Messages.Report(w) + + fmt.Fprintln(w, "\nClient Connections:") + mp.ClientConns.Report(w) + + fmt.Fprintln(w, "\nCPU:") + mp.CPU.Report(w) + + fmt.Fprintln(w, "\nMemory:") + mp.Memory.Report(w) + + w.Flush() +} + +// Stop stops all the monitors in a Monitors +func (mp *Monitors) Stop() { + if mp.log { + mp.breakChannel <- true + } + mp.Messages.Stop() + mp.CPU.Stop() + mp.Memory.Stop() + mp.ClientConns.Stop() +} diff --git a/server.go b/server.go new file mode 100644 index 0000000..4a6c031 --- /dev/null +++ b/server.go @@ -0,0 +1,161 @@ +package server + +import ( + "crypto/ed25519" + "cwtch.im/cwtch/model" + "cwtch.im/cwtch/server/metrics" + "cwtch.im/cwtch/server/storage" + "fmt" + "git.openprivacy.ca/cwtch.im/tapir" + "git.openprivacy.ca/cwtch.im/tapir/applications" + tor2 "git.openprivacy.ca/cwtch.im/tapir/networks/tor" + "git.openprivacy.ca/cwtch.im/tapir/persistence" + "git.openprivacy.ca/cwtch.im/tapir/primitives" + "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" + "git.openprivacy.ca/openprivacy/connectivity" + "git.openprivacy.ca/openprivacy/connectivity/tor" + "git.openprivacy.ca/openprivacy/log" + "path" + "sync" +) + +// Server encapsulates a complete, compliant Cwtch server. +type Server struct { + service tapir.Service + config Config + metricsPack metrics.Monitors + tokenTapirService tapir.Service + tokenServer *privacypass.TokenServer + tokenService primitives.Identity + tokenServicePrivKey ed25519.PrivateKey + tokenServiceStopped bool + onionServiceStopped bool + running bool + existingMessageCount int + lock sync.RWMutex +} + +// Setup initialized a server from a given configuration +func (s *Server) Setup(serverConfig Config) { + s.config = serverConfig + bs := new(persistence.BoltPersistence) + bs.Open(path.Join(serverConfig.ConfigDir, "tokens.db")) + s.tokenServer = privacypass.NewTokenServerFromStore(&serverConfig.TokenServiceK, bs) + log.Infof("Y: %v", s.tokenServer.Y) + s.tokenService = s.config.TokenServiceIdentity() + s.tokenServicePrivKey = s.config.TokenServerPrivateKey +} + +// Identity returns the main onion identity of the server +func (s *Server) Identity() primitives.Identity { + return s.config.Identity() +} + +// Run starts a server with the given privateKey +func (s *Server) Run(acn connectivity.ACN) error { + addressIdentity := tor.GetTorV3Hostname(s.config.PublicKey) + identity := primitives.InitializeIdentity("", &s.config.PrivateKey, &s.config.PublicKey) + var service tapir.Service + service = new(tor2.BaseOnionService) + service.Init(acn, s.config.PrivateKey, &identity) + s.service = service + log.Infof("cwtch server running on cwtch:%s\n", addressIdentity+".onion:") + s.metricsPack.Start(service, s.config.ConfigDir, s.config.ServerReporting.LogMetricsToFile) + + ms := new(storage.MessageStore) + err := ms.Init(s.config.ConfigDir, s.config.MaxBufferLines, s.metricsPack.MessageCounter) + if err != nil { + return err + } + + // Needed because we only collect metrics on a per-session basis + // TODO fix metrics so they persist across sessions? + s.existingMessageCount = len(ms.FetchMessages()) + + s.tokenTapirService = new(tor2.BaseOnionService) + s.tokenTapirService.Init(acn, s.tokenServicePrivKey, &s.tokenService) + tokenApplication := new(applications.TokenApplication) + tokenApplication.TokenService = s.tokenServer + powTokenApp := new(applications.ApplicationChain). + ChainApplication(new(applications.ProofOfWorkApplication), applications.SuccessfulProofOfWorkCapability). + ChainApplication(tokenApplication, applications.HasTokensCapability) + go func() { + s.tokenTapirService.Listen(powTokenApp) + s.tokenServiceStopped = true + }() + go func() { + s.service.Listen(NewTokenBoardServer(ms, s.tokenServer)) + s.onionServiceStopped = true + }() + + s.lock.Lock() + s.running = true + s.lock.Unlock() + return nil +} + +// KeyBundle provides the signed keybundle of the server +func (s *Server) KeyBundle() *model.KeyBundle { + kb := model.NewKeyBundle() + identity := s.config.Identity() + kb.Keys[model.KeyTypeServerOnion] = model.Key(identity.Hostname()) + kb.Keys[model.KeyTypeTokenOnion] = model.Key(s.tokenService.Hostname()) + kb.Keys[model.KeyTypePrivacyPass] = model.Key(s.tokenServer.Y.String()) + kb.Sign(identity) + return kb +} + +// CheckStatus returns true if the server is running and/or an error if any part of the server needs to be restarted. +func (s *Server) CheckStatus() (bool, error) { + s.lock.RLock() + defer s.lock.RUnlock() + if s.onionServiceStopped == true || s.tokenServiceStopped == true { + return s.running, fmt.Errorf("one of more server components are down: onion:%v token service: %v", s.onionServiceStopped, s.tokenServiceStopped) + } + return s.running, nil +} + +// Shutdown kills the app closing all connections and freeing all goroutines +func (s *Server) Shutdown() { + s.lock.Lock() + defer s.lock.Unlock() + s.service.Shutdown() + s.tokenTapirService.Shutdown() + s.metricsPack.Stop() + s.running = true + +} + +// Statistics is an encapsulation of information about the server that an operator might want to know at a glance. +type Statistics struct { + TotalMessages int +} + +// GetStatistics is a stub method for providing some high level information about +// the server operation to bundling applications (e.g. the UI) +func (s *Server) GetStatistics() Statistics { + // TODO Statistics from Metrics is very awkward. Metrics needs an overhaul to make safe + total := s.existingMessageCount + if s.metricsPack.TotalMessageCounter != nil { + total += s.metricsPack.TotalMessageCounter.Count() + } + + return Statistics{ + TotalMessages: total, + } +} + +// ConfigureAutostart sets whether this server should autostart (in the Cwtch UI/bundling application) +func (s *Server) ConfigureAutostart(autostart bool) { + s.config.AutoStart = autostart + s.config.Save(s.config.ConfigDir, s.config.FilePath) +} + +// Close shuts down the cwtch server in a safe way. +func (s *Server) Close() { + log.Infof("Shutting down server") + s.lock.Lock() + defer s.lock.Unlock() + log.Infof("Closing Token Server Database...") + s.tokenServer.Close() +} diff --git a/serverConfig.go b/serverConfig.go new file mode 100644 index 0000000..b305db3 --- /dev/null +++ b/serverConfig.go @@ -0,0 +1,98 @@ +package server + +import ( + "crypto/rand" + "encoding/json" + "git.openprivacy.ca/cwtch.im/tapir/primitives" + "git.openprivacy.ca/openprivacy/log" + "github.com/gtank/ristretto255" + "golang.org/x/crypto/ed25519" + "io/ioutil" + "path" +) + +// Reporting is a struct for storing a the config a server needs to be a peer, and connect to a group to report +type Reporting struct { + LogMetricsToFile bool `json:"logMetricsToFile"` + ReportingGroupID string `json:"reportingGroupId"` + ReportingServerAddr string `json:"reportingServerAddr"` +} + +// Config is a struct for storing basic server configuration +type Config struct { + ConfigDir string `json:"-"` + FilePath string `json:"-"` + MaxBufferLines int `json:"maxBufferLines"` + + PublicKey ed25519.PublicKey `json:"publicKey"` + PrivateKey ed25519.PrivateKey `json:"privateKey"` + + TokenServerPublicKey ed25519.PublicKey `json:"tokenServerPublicKey"` + TokenServerPrivateKey ed25519.PrivateKey `json:"tokenServerPrivateKey"` + + TokenServiceK ristretto255.Scalar `json:"tokenServiceK"` + + ServerReporting Reporting `json:"serverReporting"` + AutoStart bool `json:"autostart"` +} + +// Identity returns an encapsulation of the servers keys +func (config *Config) Identity() primitives.Identity { + return primitives.InitializeIdentity("", &config.PrivateKey, &config.PublicKey) +} + +// TokenServiceIdentity returns an encapsulation of the servers token server (experimental) +func (config *Config) TokenServiceIdentity() primitives.Identity { + return primitives.InitializeIdentity("", &config.TokenServerPrivateKey, &config.TokenServerPublicKey) +} + +// Save dumps the latest version of the config to a json file given by filename +func (config *Config) Save(dir, filename string) { + log.Infof("Saving config to %s\n", path.Join(dir, filename)) + bytes, _ := json.MarshalIndent(config, "", "\t") + ioutil.WriteFile(path.Join(dir, filename), bytes, 0600) +} + +// LoadConfig loads a Config from a json file specified by filename +func LoadConfig(configDir, filename string) Config { + log.Infof("Loading config from %s\n", path.Join(configDir, filename)) + config := Config{} + + id, pk := primitives.InitializeEphemeralIdentity() + tid, tpk := primitives.InitializeEphemeralIdentity() + config.PrivateKey = pk + config.PublicKey = id.PublicKey() + config.TokenServerPrivateKey = tpk + config.TokenServerPublicKey = tid.PublicKey() + config.MaxBufferLines = 100000 + config.ServerReporting = Reporting{ + LogMetricsToFile: true, + ReportingGroupID: "", + ReportingServerAddr: "", + } + config.AutoStart = false + config.ConfigDir = configDir + config.FilePath = filename + + k := new(ristretto255.Scalar) + b := make([]byte, 64) + _, err := rand.Read(b) + if err != nil { + // unable to generate secure random numbers + panic("unable to generate secure random numbers") + } + k.FromUniformBytes(b) + config.TokenServiceK = *k + + raw, err := ioutil.ReadFile(path.Join(configDir, filename)) + if err == nil { + err = json.Unmarshal(raw, &config) + + if err != nil { + log.Errorf("reading config: %v", err) + } + } + // Always save (first time generation, new version with new variables populated) + config.Save(configDir, filename) + return config +} diff --git a/server_tokenboard.go b/server_tokenboard.go new file mode 100644 index 0000000..1b14272 --- /dev/null +++ b/server_tokenboard.go @@ -0,0 +1,119 @@ +package server + +import ( + "cwtch.im/cwtch/protocol/groups" + "cwtch.im/cwtch/server/storage" + "encoding/json" + "git.openprivacy.ca/cwtch.im/tapir" + "git.openprivacy.ca/cwtch.im/tapir/applications" + "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" + "git.openprivacy.ca/openprivacy/log" +) + +// NewTokenBoardServer generates new Server for Token Board +func NewTokenBoardServer(store storage.MessageStoreInterface, tokenService *privacypass.TokenServer) tapir.Application { + tba := new(TokenboardServer) + tba.TokenService = tokenService + tba.LegacyMessageStore = store + return tba +} + +// TokenboardServer defines the token board server +type TokenboardServer struct { + applications.AuthApp + connection tapir.Connection + TokenService *privacypass.TokenServer + LegacyMessageStore storage.MessageStoreInterface +} + +// NewInstance creates a new TokenBoardApp +func (ta *TokenboardServer) NewInstance() tapir.Application { + tba := new(TokenboardServer) + tba.TokenService = ta.TokenService + tba.LegacyMessageStore = ta.LegacyMessageStore + return tba +} + +// Init initializes the cryptographic TokenBoardApp +func (ta *TokenboardServer) Init(connection tapir.Connection) { + ta.AuthApp.Init(connection) + if connection.HasCapability(applications.AuthCapability) { + ta.connection = connection + go ta.Listen() + } else { + connection.Close() + } +} + +// Listen processes the messages for this application +func (ta *TokenboardServer) Listen() { + for { + data := ta.connection.Expect() + if len(data) == 0 { + log.Debugf("Server Closing Connection") + ta.connection.Close() + return // connection is closed + } + + var message groups.Message + if err := json.Unmarshal(data, &message); err != nil { + log.Debugf("Server Closing Connection Because of Malformed Client Packet %v", err) + ta.connection.Close() + return // connection is closed + } + + switch message.MessageType { + case groups.PostRequestMessage: + if message.PostRequest != nil { + postrequest := *message.PostRequest + log.Debugf("Received a Post Message Request: %v", ta.connection.Hostname()) + ta.postMessageRequest(postrequest) + } else { + log.Debugf("Server Closing Connection Because of PostRequestMessage Client Packet") + ta.connection.Close() + return // connection is closed + } + case groups.ReplayRequestMessage: + if message.ReplayRequest != nil { + log.Debugf("Received Replay Request %v", message.ReplayRequest) + messages := ta.LegacyMessageStore.FetchMessages() + response, _ := json.Marshal(groups.Message{MessageType: groups.ReplayResultMessage, ReplayResult: &groups.ReplayResult{NumMessages: len(messages)}}) + log.Debugf("Sending Replay Response %v", groups.ReplayResult{NumMessages: len(messages)}) + ta.connection.Send(response) + for _, message := range messages { + data, _ = json.Marshal(message) + ta.connection.Send(data) + } + log.Debugf("Finished Requested Sync") + // Set sync and then send any new messages that might have happened while we were syncing + ta.connection.SetCapability(groups.CwtchServerSyncedCapability) + newMessages := ta.LegacyMessageStore.FetchMessages() + if len(newMessages) > len(messages) { + for _, message := range newMessages[len(messages):] { + data, _ = json.Marshal(groups.Message{MessageType: groups.NewMessageMessage, NewMessage: &groups.NewMessage{EGM: *message}}) + ta.connection.Send(data) + } + } + } else { + log.Debugf("Server Closing Connection Because of Malformed ReplayRequestMessage Packet") + ta.connection.Close() + return // connection is closed + } + } + } +} + +func (ta *TokenboardServer) postMessageRequest(pr groups.PostRequest) { + if err := ta.TokenService.SpendToken(pr.Token, append(pr.EGM.ToBytes(), ta.connection.ID().Hostname()...)); err == nil { + log.Debugf("Token is valid") + ta.LegacyMessageStore.AddMessage(pr.EGM) + data, _ := json.Marshal(groups.Message{MessageType: groups.PostResultMessage, PostResult: &groups.PostResult{Success: true}}) + ta.connection.Send(data) + data, _ = json.Marshal(groups.Message{MessageType: groups.NewMessageMessage, NewMessage: &groups.NewMessage{EGM: pr.EGM}}) + ta.connection.Broadcast(data, groups.CwtchServerSyncedCapability) + } else { + log.Debugf("Attempt to spend an invalid token: %v", err) + data, _ := json.Marshal(groups.Message{MessageType: groups.PostResultMessage, PostResult: &groups.PostResult{Success: false}}) + ta.connection.Send(data) + } +} diff --git a/storage/message_store.go b/storage/message_store.go new file mode 100644 index 0000000..2fb1c93 --- /dev/null +++ b/storage/message_store.go @@ -0,0 +1,152 @@ +package storage + +import ( + "bufio" + "cwtch.im/cwtch/protocol/groups" + "cwtch.im/cwtch/server/metrics" + "encoding/json" + "fmt" + "git.openprivacy.ca/openprivacy/log" + "os" + "path" + "sync" +) + +const ( + fileStorePartitions = 10 + fileStoreFilename = "cwtch.messages" + directory = "messages" +) + +// MessageStoreInterface defines an interface to interact with a store of cwtch messages. +type MessageStoreInterface interface { + AddMessage(groups.EncryptedGroupMessage) + FetchMessages() []*groups.EncryptedGroupMessage +} + +// MessageStore is a file-backed implementation of MessageStoreInterface +type MessageStore struct { + activeLogFile *os.File + filePos int + storeDirectory string + lock sync.Mutex + messages []*groups.EncryptedGroupMessage + messageCounter metrics.Counter + maxBufferLines int + bufferPos int + bufferRotated bool +} + +// Close closes the message store and underlying resources. +func (ms *MessageStore) Close() { + ms.lock.Lock() + ms.messages = nil + ms.activeLogFile.Close() + ms.lock.Unlock() +} + +func (ms *MessageStore) updateBuffer(gm *groups.EncryptedGroupMessage) { + ms.messages[ms.bufferPos] = gm + ms.bufferPos++ + if ms.bufferPos == ms.maxBufferLines { + ms.bufferPos = 0 + ms.bufferRotated = true + } +} + +func (ms *MessageStore) initAndLoadFiles() error { + ms.activeLogFile = nil + for i := fileStorePartitions - 1; i >= 0; i-- { + ms.filePos = 0 + filename := path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, i)) + f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600) + if err != nil { + log.Errorf("MessageStore could not open: %v: %v", filename, err) + continue + } + ms.activeLogFile = f + + scanner := bufio.NewScanner(f) + for scanner.Scan() { + gms := scanner.Text() + ms.filePos++ + gm := &groups.EncryptedGroupMessage{} + err := json.Unmarshal([]byte(gms), gm) + if err == nil { + ms.updateBuffer(gm) + } + } + } + if ms.activeLogFile == nil { + return fmt.Errorf("Could not create log file to write to in %s", ms.storeDirectory) + } + return nil +} + +func (ms *MessageStore) updateFile(gm *groups.EncryptedGroupMessage) { + s, err := json.Marshal(gm) + if err != nil { + log.Errorf("Failed to unmarshal group message %v\n", err) + } + fmt.Fprintf(ms.activeLogFile, "%s\n", s) + ms.filePos++ + if ms.filePos >= ms.maxBufferLines/fileStorePartitions { + ms.rotateFileStore() + } +} + +func (ms *MessageStore) rotateFileStore() { + ms.activeLogFile.Close() + os.Remove(path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, fileStorePartitions-1))) + + for i := fileStorePartitions - 2; i >= 0; i-- { + os.Rename(path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, i)), path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, i+1))) + } + + f, err := os.OpenFile(path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, 0)), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600) + if err != nil { + log.Errorf("Could not open new message store file in: %s", ms.storeDirectory) + } + ms.filePos = 0 + ms.activeLogFile = f +} + +// Init sets up a MessageStore of size maxBufferLines (# of messages) backed by filename +func (ms *MessageStore) Init(appDirectory string, maxBufferLines int, messageCounter metrics.Counter) error { + ms.storeDirectory = path.Join(appDirectory, directory) + os.Mkdir(ms.storeDirectory, 0700) + + ms.bufferPos = 0 + ms.maxBufferLines = maxBufferLines + ms.messages = make([]*groups.EncryptedGroupMessage, maxBufferLines) + ms.bufferRotated = false + ms.messageCounter = messageCounter + + err := ms.initAndLoadFiles() + return err +} + +// FetchMessages returns all messages from the backing file. +func (ms *MessageStore) FetchMessages() (messages []*groups.EncryptedGroupMessage) { + ms.lock.Lock() + if !ms.bufferRotated { + messages = make([]*groups.EncryptedGroupMessage, ms.bufferPos) + copy(messages, ms.messages[0:ms.bufferPos]) + } else { + messages = make([]*groups.EncryptedGroupMessage, ms.maxBufferLines) + copy(messages, ms.messages[ms.bufferPos:ms.maxBufferLines]) + copy(messages[ms.bufferPos:], ms.messages[0:ms.bufferPos]) + } + ms.lock.Unlock() + return +} + +// AddMessage adds a GroupMessage to the store +func (ms *MessageStore) AddMessage(gm groups.EncryptedGroupMessage) { + ms.messageCounter.Add(1) + ms.lock.Lock() + ms.updateBuffer(&gm) + ms.updateFile(&gm) + + ms.lock.Unlock() +} diff --git a/storage/message_store_test.go b/storage/message_store_test.go new file mode 100644 index 0000000..d6ce82e --- /dev/null +++ b/storage/message_store_test.go @@ -0,0 +1,54 @@ +package storage + +import ( + "cwtch.im/cwtch/protocol/groups" + "cwtch.im/cwtch/server/metrics" + "os" + "strconv" + "testing" +) + +func TestMessageStore(t *testing.T) { + os.Remove("ms.test") + ms := new(MessageStore) + counter := metrics.NewCounter() + ms.Init("./", 1000, counter) + for i := 0; i < 499; i++ { + gm := groups.EncryptedGroupMessage{ + Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)), + } + ms.AddMessage(gm) + } + if counter.Count() != 499 { + t.Errorf("Counter should be at 499 was %v", counter.Count()) + } + ms.Close() + ms.Init("./", 1000, counter) + m := ms.FetchMessages() + if len(m) != 499 { + t.Errorf("Should have been 499 was %v", len(m)) + } + + counter.Reset() + + for i := 0; i < 1000; i++ { + gm := groups.EncryptedGroupMessage{ + Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)), + } + ms.AddMessage(gm) + } + + m = ms.FetchMessages() + if len(m) != 1000 { + t.Errorf("Should have been 1000 was %v", len(m)) + } + ms.Close() + ms.Init("./", 1000, counter) + m = ms.FetchMessages() + if len(m) != 999 { + t.Errorf("Should have been 999 was %v", len(m)) + } + ms.Close() + + os.RemoveAll("./messages") +} diff --git a/testing/quality.sh b/testing/quality.sh new file mode 100755 index 0000000..c913c92 --- /dev/null +++ b/testing/quality.sh @@ -0,0 +1,24 @@ +#!/bin/sh + +echo "Checking code quality (you want to see no output here)" +echo "" + +echo "Vetting:" +go list ./... | xargs go vet + +echo "" +echo "Linting:" + +go list ./... | xargs golint + + +echo "Time to format" +gofmt -l -s -w . + +# ineffassign (https://github.com/gordonklaus/ineffassign) +echo "Checking for ineffectual assignment of errors (unchecked errors...)" +ineffassign . + +# misspell (https://github.com/client9/misspell/cmd/misspell) +echo "Checking for misspelled words..." +misspell . | grep -v "vendor/" | grep -v "go.sum" | grep -v ".idea" diff --git a/testing/tests.sh b/testing/tests.sh new file mode 100755 index 0000000..962e34e --- /dev/null +++ b/testing/tests.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +set -e +pwd +GORACE="haltonerror=1" +go test -race ${1} -coverprofile=server.metrics.cover.out -v ./metrics +go test -race ${1} -coverprofile=server.metrics.cover.out -v ./storage +echo "mode: set" > coverage.out && cat *.cover.out | grep -v mode: | sort -r | \ +awk '{if($1 != last) {print $0;last=$1}}' >> coverage.out +rm -rf *.cover.out