commit
192c97f6f5
16 changed files with 1264 additions and 0 deletions
@ -0,0 +1,6 @@ |
|||
tokens.db |
|||
serverConfig.json |
|||
coverage.out |
|||
tordir |
|||
.idea/ |
|||
messages |
@ -0,0 +1,102 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"crypto/rand" |
|||
"cwtch.im/cwtch/model" |
|||
"encoding/base64" |
|||
cwtchserver "git.openprivacy.ca/cwtch.im/server" |
|||
"git.openprivacy.ca/cwtch.im/tapir/primitives" |
|||
"git.openprivacy.ca/openprivacy/connectivity/tor" |
|||
"git.openprivacy.ca/openprivacy/log" |
|||
mrand "math/rand" |
|||
"os" |
|||
"os/signal" |
|||
"syscall" |
|||
"time" |
|||
) |
|||
|
|||
const ( |
|||
serverConfigFile = "serverConfig.json" |
|||
) |
|||
|
|||
func main() { |
|||
log.AddEverythingFromPattern("server/app/main") |
|||
log.AddEverythingFromPattern("server/server") |
|||
log.ExcludeFromPattern("service.go") |
|||
log.SetLevel(log.LevelDebug) |
|||
configDir := os.Getenv("CWTCH_CONFIG_DIR") |
|||
|
|||
if len(os.Args) == 2 && os.Args[1] == "gen1" { |
|||
config := new(cwtchserver.Config) |
|||
id, pk := primitives.InitializeEphemeralIdentity() |
|||
tid, tpk := primitives.InitializeEphemeralIdentity() |
|||
config.PrivateKey = pk |
|||
config.PublicKey = id.PublicKey() |
|||
config.TokenServerPrivateKey = tpk |
|||
config.TokenServerPublicKey = tid.PublicKey() |
|||
config.MaxBufferLines = 100000 |
|||
config.ServerReporting = cwtchserver.Reporting{ |
|||
LogMetricsToFile: true, |
|||
ReportingGroupID: "", |
|||
ReportingServerAddr: "", |
|||
} |
|||
config.Save(".", "serverConfig.json") |
|||
return |
|||
} |
|||
|
|||
serverConfig := cwtchserver.LoadConfig(configDir, serverConfigFile) |
|||
|
|||
// we don't need real randomness for the port, just to avoid a possible conflict...
|
|||
mrand.Seed(int64(time.Now().Nanosecond())) |
|||
controlPort := mrand.Intn(1000) + 9052 |
|||
|
|||
// generate a random password
|
|||
key := make([]byte, 64) |
|||
_, err := rand.Read(key) |
|||
if err != nil { |
|||
panic(err) |
|||
} |
|||
|
|||
os.MkdirAll("tordir/tor", 0700) |
|||
tor.NewTorrc().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("./tordir/tor/torrc") |
|||
acn, err := tor.NewTorACNWithAuth("tordir", "", controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)}) |
|||
|
|||
if err != nil { |
|||
log.Errorf("\nError connecting to Tor: %v\n", err) |
|||
os.Exit(1) |
|||
} |
|||
defer acn.Close() |
|||
|
|||
server := new(cwtchserver.Server) |
|||
log.Infoln("starting cwtch server...") |
|||
|
|||
server.Setup(serverConfig) |
|||
|
|||
// TODO create a random group for testing
|
|||
group, _ := model.NewGroup(tor.GetTorV3Hostname(serverConfig.PublicKey)) |
|||
group.SignGroup([]byte{}) |
|||
invite, err := group.Invite() |
|||
if err != nil { |
|||
panic(err) |
|||
} |
|||
|
|||
bundle := server.KeyBundle().Serialize() |
|||
log.Infof("Server Config: server:%s", base64.StdEncoding.EncodeToString(bundle)) |
|||
|
|||
log.Infof("Server Tofu Bundle: tofubundle:server:%s||%s", base64.StdEncoding.EncodeToString(bundle), invite) |
|||
|
|||
// Graceful Shutdown
|
|||
c := make(chan os.Signal, 1) |
|||
signal.Notify(c, os.Interrupt, syscall.SIGTERM) |
|||
go func() { |
|||
<-c |
|||
acn.Close() |
|||
server.Close() |
|||
os.Exit(1) |
|||
}() |
|||
|
|||
server.Run(acn) |
|||
for { |
|||
time.Sleep(time.Second) |
|||
} |
|||
} |
@ -0,0 +1,33 @@ |
|||
#!/bin/sh |
|||
set -o errexit |
|||
|
|||
chmod_files() { find $2 -type f -exec chmod -v $1 {} \; |
|||
} |
|||
chmod_dirs() { find $2 -type d -exec chmod -v $1 {} \; |
|||
} |
|||
|
|||
chown ${TOR_USER}:${TOR_USER} /run/tor/ |
|||
chmod 770 /run/tor |
|||
|
|||
chown -Rv ${TOR_USER}:${TOR_USER} /var/lib/tor |
|||
chmod_dirs 700 /var/lib/tor |
|||
chmod_files 600 /var/lib/tor |
|||
|
|||
echo -e "\n========================================================" |
|||
# Display OS version, Tor version & torrc in log |
|||
echo -e "Alpine Version: \c" && cat /etc/alpine-release |
|||
tor --version |
|||
#cat /etc/tor/torrc |
|||
echo -e "========================================================\n" |
|||
|
|||
tor -f /etc/tor/torrc |
|||
|
|||
#Cwtch will crash and burn if 9051 isn't ready |
|||
sleep 15 |
|||
|
|||
if [ -z "${CWTCH_CONFIG_DIR}" ]; then |
|||
CWTCH_CONFIG_DIR=/etc/cwtch/ |
|||
fi |
|||
|
|||
#Run cwtch (or whatever the user passed) |
|||
CWTCH_CONFIG_DIR=$CWTCH_CONFIG_DIR exec "$@" |
@ -0,0 +1,27 @@ |
|||
User _tor |
|||
DataDirectory /var/lib/tor |
|||
|
|||
ORPort 0 |
|||
ExitRelay 0 |
|||
IPv6Exit 0 |
|||
|
|||
#We need this running in the background as the server doesn't launch it itself |
|||
RunAsDaemon 1 |
|||
|
|||
ClientOnly 1 |
|||
SocksPort 9050 |
|||
|
|||
ControlPort 9051 |
|||
ControlSocket /run/tor/control |
|||
ControlSocketsGroupWritable 1 |
|||
CookieAuthentication 1 |
|||
CookieAuthFile /run/tor/control.authcookie |
|||
CookieAuthFileGroupReadable 1 |
|||
#HashedControlPassword 16:B4C8EE980C085EE460AEA9094350DAA9C2B5F841400E9BBA247368400A |
|||
|
|||
# Run as a relay only (change policy to enable exit node) |
|||
ExitPolicy reject *:* # no exits allowed |
|||
ExitPolicy reject6 *:* |
|||
|
|||
# Additional config built by the entrypoint will go here |
|||
|
@ -0,0 +1,13 @@ |
|||
module git.openprivacy.ca/cwtch.im/server |
|||
|
|||
go 1.14 |
|||
|
|||
require ( |
|||
cwtch.im/cwtch v0.7.3 |
|||
git.openprivacy.ca/cwtch.im/tapir v0.4.0 |
|||
git.openprivacy.ca/openprivacy/connectivity v1.4.3 |
|||
git.openprivacy.ca/openprivacy/log v1.0.2 |
|||
github.com/gtank/ristretto255 v0.1.2 |
|||
github.com/struCoder/pidusage v0.1.3 |
|||
golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee |
|||
) |
@ -0,0 +1,60 @@ |
|||
cwtch.im/cwtch v0.7.3 h1:3f2Q2XFgNtq8hVbEz7K++exS1gDsO+zmSO7X7Fqz8Mo= |
|||
cwtch.im/cwtch v0.7.3/go.mod h1:S0JRLSTwFM+kRrpOPKgUQn/loKe/fc6lLDnOFopRKq8= |
|||
git.openprivacy.ca/cwtch.im/tapir v0.4.0 h1:clG8uORt0NKEhT4P+Dpw1pzyUuYzYBMevGqn2pciKk8= |
|||
git.openprivacy.ca/cwtch.im/tapir v0.4.0/go.mod h1:eH6dZxXrhW0C4KZX18ksUa6XJCrEvtg8cJJ/Fy6gv+E= |
|||
git.openprivacy.ca/openprivacy/bine v0.0.4 h1:CO7EkGyz+jegZ4ap8g5NWRuDHA/56KKvGySR6OBPW+c= |
|||
git.openprivacy.ca/openprivacy/bine v0.0.4/go.mod h1:13ZqhKyqakDsN/ZkQkIGNULsmLyqtXc46XBcnuXm/mU= |
|||
git.openprivacy.ca/openprivacy/connectivity v1.4.3 h1:i2Ad/U9FlL9dKr2bhRck7lJ8NoWyGtoEfUwoCyMT0fU= |
|||
git.openprivacy.ca/openprivacy/connectivity v1.4.3/go.mod h1:bR0Myx9nm2YzWtsThRelkNMV4Pp7sPDa123O1qsAbVo= |
|||
git.openprivacy.ca/openprivacy/log v1.0.1/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw= |
|||
git.openprivacy.ca/openprivacy/log v1.0.2 h1:HLP4wsw4ljczFAelYnbObIs821z+jgMPCe8uODPnGQM= |
|||
git.openprivacy.ca/openprivacy/log v1.0.2/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw= |
|||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= |
|||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= |
|||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= |
|||
github.com/gtank/merlin v0.1.1 h1:eQ90iG7K9pOhtereWsmyRJ6RAwcP4tHTDBHXNg+u5is= |
|||
github.com/gtank/merlin v0.1.1/go.mod h1:T86dnYJhcGOh5BjZFCJWTDeTK7XW8uE+E21Cy/bIQ+s= |
|||
github.com/gtank/ristretto255 v0.1.2 h1:JEqUCPA1NvLq5DwYtuzigd7ss8fwbYay9fi4/5uMzcc= |
|||
github.com/gtank/ristretto255 v0.1.2/go.mod h1:Ph5OpO6c7xKUGROZfWVLiJf9icMDwUeIvY4OmlYW69o= |
|||
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= |
|||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= |
|||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= |
|||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= |
|||
github.com/mimoo/StrobeGo v0.0.0-20181016162300-f8f6d4d2b643 h1:hLDRPB66XQT/8+wG9WsDpiCvZf1yKO7sz7scAjSlBa0= |
|||
github.com/mimoo/StrobeGo v0.0.0-20181016162300-f8f6d4d2b643/go.mod h1:43+3pMjjKimDBf5Kr4ZFNGbLql1zKkbImw+fZbw3geM= |
|||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= |
|||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= |
|||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= |
|||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= |
|||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= |
|||
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= |
|||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= |
|||
github.com/struCoder/pidusage v0.1.3 h1:pZcSa6asBE38TJtW0Nui6GeCjLTpaT/jAnNP7dUTLSQ= |
|||
github.com/struCoder/pidusage v0.1.3/go.mod h1:pWBlW3YuSwRl6h7R5KbvA4N8oOqe9LjaKW5CwT1SPjI= |
|||
go.etcd.io/bbolt v1.3.4 h1:hi1bXHMVrlQh6WwxAy+qZCV/SYIlqo+Ushwdpa4tAKg= |
|||
go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= |
|||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= |
|||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= |
|||
golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee h1:4yd7jl+vXjalO5ztz6Vc1VADv+S/80LGJmyl1ROJ2AI= |
|||
golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= |
|||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= |
|||
golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= |
|||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0= |
|||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= |
|||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= |
|||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
|||
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
|||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
|||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
|||
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44 h1:Bli41pIlzTzf3KEY06n+xnzK/BESIg2ze4Pgfh/aI8c= |
|||
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
|||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= |
|||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= |
|||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= |
|||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= |
|||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= |
|||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= |
|||
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= |
|||
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= |
|||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= |
|||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= |
@ -0,0 +1,249 @@ |
|||
package metrics |
|||
|
|||
import ( |
|||
"bufio" |
|||
"fmt" |
|||
"strings" |
|||
"sync" |
|||
"sync/atomic" |
|||
"time" |
|||
) |
|||
|
|||
type counter struct { |
|||
startTime time.Time |
|||
count uint64 |
|||
total uint64 |
|||
} |
|||
|
|||
// Counter providers a threadsafe counter to use for storing long running counts
|
|||
type Counter interface { |
|||
Add(unit int) |
|||
Reset() |
|||
|
|||
Count() int |
|||
GetStarttime() time.Time |
|||
} |
|||
|
|||
// NewCounter initializes a counter starting at time.Now() and a count of 0 and returns it
|
|||
func NewCounter() Counter { |
|||
c := &counter{startTime: time.Now(), count: 0, total: 0} |
|||
return c |
|||
} |
|||
|
|||
// Add add a count of unit to the counter
|
|||
func (c *counter) Add(unit int) { |
|||
atomic.AddUint64(&c.count, uint64(unit)) |
|||
} |
|||
|
|||
// Count returns the count since Start
|
|||
func (c *counter) Count() int { |
|||
return int(atomic.LoadUint64(&c.count)) |
|||
} |
|||
|
|||
func (c *counter) Reset() { |
|||
atomic.StoreUint64(&c.count, 0) |
|||
c.startTime = time.Now() |
|||
} |
|||
|
|||
// GetStarttime returns the starttime of the counter
|
|||
func (c *counter) GetStarttime() time.Time { |
|||
return c.startTime |
|||
} |
|||
|
|||
// MonitorType controls how the monitor will report itself
|
|||
type MonitorType int |
|||
|
|||
const ( |
|||
// Count indicates the monitor should report in interger format
|
|||
Count MonitorType = iota |
|||
// Percent indicates the monitor should report in decimal format with 2 places
|
|||
Percent |
|||
// MegaBytes indicates the monitor should transform the raw number into MBs
|
|||
MegaBytes |
|||
) |
|||
|
|||
// MonitorAccumulation controls how monitor data is accumulated over time into larger summary buckets
|
|||
type MonitorAccumulation int |
|||
|
|||
const ( |
|||
// Cumulative values with sum over time
|
|||
Cumulative MonitorAccumulation = iota |
|||
// Average values will average over time
|
|||
Average |
|||
) |
|||
|
|||
type monitorHistory struct { |
|||
monitorType MonitorType |
|||
monitorAccumulation MonitorAccumulation |
|||
|
|||
starttime time.Time |
|||
perMinutePerHour [60]float64 |
|||
timeLastHourRotate time.Time |
|||
perHourForDay [24]float64 |
|||
timeLastDayRotate time.Time |
|||
perDayForWeek [7]float64 |
|||
timeLastWeekRotate time.Time |
|||
perWeekForMonth [4]float64 |
|||
timeLastMonthRotate time.Time |
|||
perMonthForYear [12]float64 |
|||
|
|||
monitor func() float64 |
|||
|
|||
breakChannel chan bool |
|||
lock sync.Mutex |
|||
} |
|||
|
|||
// MonitorHistory runs a monitor every minute and rotates and averages the results out across time
|
|||
type MonitorHistory interface { |
|||
Start() |
|||
Stop() |
|||
|
|||
Minutes() []float64 |
|||
Hours() []float64 |
|||
Days() []float64 |
|||
Weeks() []float64 |
|||
Months() []float64 |
|||
|
|||
Report(w *bufio.Writer) |
|||
} |
|||
|
|||
// NewMonitorHistory returns a new MonitorHistory with starttime of time.Now and Started running with supplied monitor
|
|||
func NewMonitorHistory(t MonitorType, a MonitorAccumulation, monitor func() float64) MonitorHistory { |
|||
mh := &monitorHistory{monitorType: t, monitorAccumulation: a, starttime: time.Now(), monitor: monitor, breakChannel: make(chan bool), |
|||
timeLastHourRotate: time.Now(), timeLastDayRotate: time.Now(), timeLastWeekRotate: time.Now(), |
|||
timeLastMonthRotate: time.Now()} |
|||
mh.Start() |
|||
return mh |
|||
} |
|||
|
|||
// Start starts a monitorHistory go rountine to run the monitor at intervals and rotate history
|
|||
func (mh *monitorHistory) Start() { |
|||
go mh.monitorThread() |
|||
} |
|||
|
|||
// Stop stops a monitorHistory go routine
|
|||
func (mh *monitorHistory) Stop() { |
|||
mh.breakChannel <- true |
|||
} |
|||
|
|||
// Minutes returns the last 60 minute monitoring results
|
|||
func (mh *monitorHistory) Minutes() []float64 { |
|||
return mh.returnCopy(mh.perMinutePerHour[:]) |
|||
} |
|||
|
|||
// Hours returns the last 24 hourly averages of monitor results
|
|||
func (mh *monitorHistory) Hours() []float64 { |
|||
return mh.returnCopy(mh.perHourForDay[:]) |
|||
} |
|||
|
|||
// Days returns the last 7 day averages of monitor results
|
|||
func (mh *monitorHistory) Days() []float64 { |
|||
return mh.returnCopy(mh.perDayForWeek[:]) |
|||
} |
|||
|
|||
// Weeks returns the last 4 weeks of averages of monitor results
|
|||
func (mh *monitorHistory) Weeks() []float64 { |
|||
return mh.returnCopy(mh.perWeekForMonth[:]) |
|||
} |
|||
|
|||
// Months returns the last 12 months of averages of monitor results
|
|||
func (mh *monitorHistory) Months() []float64 { |
|||
return mh.returnCopy(mh.perMonthForYear[:]) |
|||
} |
|||
|
|||
func (mh *monitorHistory) Report(w *bufio.Writer) { |
|||
mh.lock.Lock() |
|||
fmt.Fprintln(w, "Minutes:", reportLine(mh.monitorType, mh.perMinutePerHour[:])) |
|||
fmt.Fprintln(w, "Hours: ", reportLine(mh.monitorType, mh.perHourForDay[:])) |
|||
fmt.Fprintln(w, "Days: ", reportLine(mh.monitorType, mh.perDayForWeek[:])) |
|||
fmt.Fprintln(w, "Weeks: ", reportLine(mh.monitorType, mh.perWeekForMonth[:])) |
|||
fmt.Fprintln(w, "Months: ", reportLine(mh.monitorType, mh.perMonthForYear[:])) |
|||
mh.lock.Unlock() |
|||
} |
|||
|
|||
func reportLine(t MonitorType, array []float64) string { |
|||
switch t { |
|||
case Count: |
|||
return strings.Trim(strings.Join(strings.Fields(fmt.Sprintf("%.0f", array)), " "), "[]") |
|||
case Percent: |
|||
return strings.Trim(strings.Join(strings.Fields(fmt.Sprintf("%.2f", array)), " "), "[]") |
|||
case MegaBytes: |
|||
mbs := make([]int, len(array)) |
|||
for i, b := range array { |
|||
mbs[i] = int(b) / 1024 / 1024 |
|||
} |
|||
return strings.Trim(strings.Join(strings.Fields(fmt.Sprintf("%d", mbs)), "MBs "), "[]") + "MBs" |
|||
} |
|||
return "" |
|||
} |
|||
|
|||
func (mh *monitorHistory) returnCopy(slice []float64) []float64 { |
|||
retSlice := make([]float64, len(slice)) |
|||
mh.lock.Lock() |
|||
for i, v := range slice { |
|||
retSlice[i] = v |
|||
} |
|||
mh.lock.Unlock() |
|||
return retSlice |
|||
} |
|||
|
|||
func rotateAndAccumulate(array []float64, newVal float64, acc MonitorAccumulation) float64 { |
|||
total := float64(0.0) |
|||
for i := len(array) - 1; i > 0; i-- { |
|||
array[i] = array[i-1] |
|||
total += array[i] |
|||
} |
|||
array[0] = newVal |
|||
total += newVal |
|||
if acc == Cumulative { |
|||
return total |
|||
} |
|||
return total / float64(len(array)) |
|||
} |
|||
func accumulate(array []float64, acc MonitorAccumulation) float64 { |
|||
total := float64(0) |
|||
for _, x := range array { |
|||
total += x |
|||
} |
|||
if acc == Cumulative { |
|||
return total |
|||
} |
|||
return total / float64(len(array)) |
|||
} |
|||
|
|||
// monitorThread is the goroutine in a monitorHistory that does per minute monitoring and rotation
|
|||
func (mh *monitorHistory) monitorThread() { |
|||
for { |
|||
select { |
|||
case <-time.After(time.Minute): |
|||
mh.lock.Lock() |
|||
|
|||
minuteAvg := rotateAndAccumulate(mh.perMinutePerHour[:], mh.monitor(), mh.monitorAccumulation) |
|||
|
|||
if time.Now().Sub(mh.timeLastHourRotate) > time.Hour { |
|||
rotateAndAccumulate(mh.perHourForDay[:], minuteAvg, mh.monitorAccumulation) |
|||
mh.timeLastHourRotate = time.Now() |
|||
} |
|||
|
|||
if time.Now().Sub(mh.timeLastDayRotate) > time.Hour*24 { |
|||
rotateAndAccumulate(mh.perDayForWeek[:], accumulate(mh.perHourForDay[:], mh.monitorAccumulation), mh.monitorAccumulation) |
|||
mh.timeLastDayRotate = time.Now() |
|||
} |
|||
|
|||
if time.Now().Sub(mh.timeLastWeekRotate) > time.Hour*24*7 { |
|||
rotateAndAccumulate(mh.perWeekForMonth[:], accumulate(mh.perDayForWeek[:], mh.monitorAccumulation), mh.monitorAccumulation) |
|||
mh.timeLastWeekRotate = time.Now() |
|||
} |
|||
|
|||
if time.Now().Sub(mh.timeLastMonthRotate) > time.Hour*24*7*4 { |
|||
rotateAndAccumulate(mh.perMonthForYear[:], accumulate(mh.perWeekForMonth[:], mh.monitorAccumulation), mh.monitorAccumulation) |
|||
mh.timeLastMonthRotate = time.Now() |
|||
} |
|||
|
|||
mh.lock.Unlock() |
|||
|
|||
case <-mh.breakChannel: |
|||
return |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,37 @@ |
|||
package metrics |
|||
|
|||
import ( |
|||
"testing" |
|||
"time" |
|||
) |
|||
|
|||
func TestCounter(t *testing.T) { |
|||
starttime := time.Now() |
|||
c := NewCounter() |
|||
|
|||
max := 100 |
|||
done := make(chan bool, max) |
|||
|
|||
// slightly stress test atomic nature of metric by flooding with threads Add()ing
|
|||
for i := 0; i < max; i++ { |
|||
go func() { |
|||
c.Add(1) |
|||
done <- true |
|||
}() |
|||
} |
|||
|
|||
for i := 0; i < max; i++ { |
|||
<-done |
|||
} |
|||
|
|||
val := c.Count() |
|||
if val != 100 { |
|||
t.Errorf("counter count was not 100") |
|||
} |
|||
|
|||
counterStart := c.GetStarttime() |
|||
|
|||
if counterStart.Sub(starttime) > time.Millisecond { |
|||
t.Errorf("counter's starttime was innaccurate %v", counterStart.Sub(starttime)) |
|||
} |
|||
} |
@ -0,0 +1,119 @@ |
|||
package metrics |
|||
|
|||
import ( |
|||
"bufio" |
|||
"fmt" |
|||
"git.openprivacy.ca/cwtch.im/tapir" |
|||
"git.openprivacy.ca/openprivacy/log" |
|||
"github.com/struCoder/pidusage" |
|||
"os" |
|||
"path" |
|||
"sync" |
|||
"time" |
|||
) |
|||
|
|||
const ( |
|||
reportFile = "serverMonitorReport.txt" |
|||
) |
|||
|
|||
// Monitors is a package of metrics for a Cwtch Server including message count, CPU, Mem, and conns
|
|||
type Monitors struct { |
|||
MessageCounter Counter |
|||
TotalMessageCounter Counter |
|||
Messages MonitorHistory |
|||
CPU MonitorHistory |
|||
Memory MonitorHistory |
|||
ClientConns MonitorHistory |
|||
starttime time.Time |
|||
breakChannel chan bool |
|||
log bool |
|||
configDir string |
|||
} |
|||
|
|||
// Start initializes a Monitors's monitors
|
|||
func (mp *Monitors) Start(ts tapir.Service, configDir string, log bool) { |
|||
mp.log = log |
|||
mp.configDir = configDir |
|||
mp.starttime = time.Now() |
|||
mp.breakChannel = make(chan bool) |
|||
mp.MessageCounter = NewCounter() |
|||
|
|||
// Maintain a count of total messages
|
|||
mp.TotalMessageCounter = NewCounter() |
|||
mp.Messages = NewMonitorHistory(Count, Cumulative, func() (c float64) { |
|||
c = float64(mp.MessageCounter.Count()) |
|||
mp.TotalMessageCounter.Add(int(c)) |
|||
mp.MessageCounter.Reset() |
|||
return |
|||
}) |
|||
|
|||
var pidUsageLock sync.Mutex |
|||
mp.CPU = NewMonitorHistory(Percent, Average, func() float64 { |
|||
pidUsageLock.Lock() |
|||
defer pidUsageLock.Unlock() |
|||
sysInfo, _ := pidusage.GetStat(os.Getpid()) |
|||
return float64(sysInfo.CPU) |
|||
}) |
|||
mp.Memory = NewMonitorHistory(MegaBytes, Average, func() float64 { |
|||
pidUsageLock.Lock() |
|||
defer pidUsageLock.Unlock() |
|||
sysInfo, _ := pidusage.GetStat(os.Getpid()) |
|||
return float64(sysInfo.Memory) |
|||
}) |
|||
|
|||
// TODO: replace with ts.
|
|||
mp.ClientConns = NewMonitorHistory(Count, Average, func() float64 { return float64(ts.Metrics().ConnectionCount) }) |
|||
|
|||
if mp.log { |
|||
go mp.run() |
|||
} |
|||
} |
|||
|
|||
func (mp *Monitors) run() { |
|||
for { |
|||
select { |
|||
case <-time.After(time.Minute): |
|||
mp.report() |
|||
case <-mp.breakChannel: |
|||
return |
|||
} |
|||
} |
|||
} |
|||
|
|||
func (mp *Monitors) report() { |
|||
f, err := os.Create(path.Join(mp.configDir, reportFile)) |
|||
if err != nil { |
|||
log.Errorf("Could not open monitor reporting file: %v", err) |
|||
return |
|||
} |
|||
defer f.Close() |
|||
|
|||
w := bufio.NewWriter(f) |
|||
|
|||
fmt.Fprintf(w, "Uptime: %v\n\n", time.Now().Sub(mp.starttime)) |
|||
|
|||
fmt.Fprintln(w, "messages:") |
|||
mp.Messages.Report(w) |
|||
|
|||
fmt.Fprintln(w, "\nClient Connections:") |
|||
mp.ClientConns.Report(w) |
|||
|
|||
fmt.Fprintln(w, "\nCPU:") |
|||
mp.CPU.Report(w) |
|||
|
|||
fmt.Fprintln(w, "\nMemory:") |
|||
mp.Memory.Report(w) |
|||
|
|||
w.Flush() |
|||
} |
|||
|
|||
// Stop stops all the monitors in a Monitors
|
|||
func (mp *Monitors) Stop() { |
|||
if mp.log { |
|||
mp.breakChannel <- true |
|||
} |
|||
mp.Messages.Stop() |
|||
mp.CPU.Stop() |
|||
mp.Memory.Stop() |
|||
mp.ClientConns.Stop() |
|||
} |
@ -0,0 +1,161 @@ |
|||
package server |
|||
|
|||
import ( |
|||
"crypto/ed25519" |
|||
"cwtch.im/cwtch/model" |
|||
"cwtch.im/cwtch/server/metrics" |
|||
"cwtch.im/cwtch/server/storage" |
|||
"fmt" |
|||
"git.openprivacy.ca/cwtch.im/tapir" |
|||
"git.openprivacy.ca/cwtch.im/tapir/applications" |
|||
tor2 "git.openprivacy.ca/cwtch.im/tapir/networks/tor" |
|||
"git.openprivacy.ca/cwtch.im/tapir/persistence" |
|||
"git.openprivacy.ca/cwtch.im/tapir/primitives" |
|||
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" |
|||
"git.openprivacy.ca/openprivacy/connectivity" |
|||
"git.openprivacy.ca/openprivacy/connectivity/tor" |
|||
"git.openprivacy.ca/openprivacy/log" |
|||
"path" |
|||
"sync" |
|||
) |
|||
|
|||
// Server encapsulates a complete, compliant Cwtch server.
|
|||
type Server struct { |
|||
service tapir.Service |
|||
config Config |
|||
metricsPack metrics.Monitors |
|||
tokenTapirService tapir.Service |
|||
tokenServer *privacypass.TokenServer |
|||
tokenService primitives.Identity |
|||
tokenServicePrivKey ed25519.PrivateKey |
|||
tokenServiceStopped bool |
|||
onionServiceStopped bool |
|||
running bool |
|||
existingMessageCount int |
|||
lock sync.RWMutex |
|||
} |
|||
|
|||
// Setup initialized a server from a given configuration
|
|||
func (s *Server) Setup(serverConfig Config) { |
|||
s.config = serverConfig |
|||
bs := new(persistence.BoltPersistence) |
|||
bs.Open(path.Join(serverConfig.ConfigDir, "tokens.db")) |
|||
s.tokenServer = privacypass.NewTokenServerFromStore(&serverConfig.TokenServiceK, bs) |
|||
log.Infof("Y: %v", s.tokenServer.Y) |
|||
s.tokenService = s.config.TokenServiceIdentity() |
|||
s.tokenServicePrivKey = s.config.TokenServerPrivateKey |
|||
} |
|||
|
|||
// Identity returns the main onion identity of the server
|
|||
func (s *Server) Identity() primitives.Identity { |
|||
return s.config.Identity() |
|||
} |
|||
|
|||
// Run starts a server with the given privateKey
|
|||
func (s *Server) Run(acn connectivity.ACN) error { |
|||
addressIdentity := tor.GetTorV3Hostname(s.config.PublicKey) |
|||
identity := primitives.InitializeIdentity("", &s.config.PrivateKey, &s.config.PublicKey) |
|||
var service tapir.Service |
|||
service = new(tor2.BaseOnionService) |
|||
service.Init(acn, s.config.PrivateKey, &identity) |
|||
s.service = service |
|||
log.Infof("cwtch server running on cwtch:%s\n", addressIdentity+".onion:") |
|||
s.metricsPack.Start(service, s.config.ConfigDir, s.config.ServerReporting.LogMetricsToFile) |
|||
|
|||
ms := new(storage.MessageStore) |
|||
err := ms.Init(s.config.ConfigDir, s.config.MaxBufferLines, s.metricsPack.MessageCounter) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
// Needed because we only collect metrics on a per-session basis
|
|||
// TODO fix metrics so they persist across sessions?
|
|||
s.existingMessageCount = len(ms.FetchMessages()) |
|||
|
|||
s.tokenTapirService = new(tor2.BaseOnionService) |
|||
s.tokenTapirService.Init(acn, s.tokenServicePrivKey, &s.tokenService) |
|||
tokenApplication := new(applications.TokenApplication) |
|||
tokenApplication.TokenService = s.tokenServer |
|||
powTokenApp := new(applications.ApplicationChain). |
|||
ChainApplication(new(applications.ProofOfWorkApplication), applications.SuccessfulProofOfWorkCapability). |
|||
ChainApplication(tokenApplication, applications.HasTokensCapability) |
|||
go func() { |
|||
s.tokenTapirService.Listen(powTokenApp) |
|||
s.tokenServiceStopped = true |
|||
}() |
|||
go func() { |
|||
s.service.Listen(NewTokenBoardServer(ms, s.tokenServer)) |
|||
s.onionServiceStopped = true |
|||
}() |
|||
|
|||
s.lock.Lock() |
|||
s.running = true |
|||
s.lock.Unlock() |
|||
return nil |
|||
} |
|||
|
|||
// KeyBundle provides the signed keybundle of the server
|
|||
func (s *Server) KeyBundle() *model.KeyBundle { |
|||
kb := model.NewKeyBundle() |
|||
identity := s.config.Identity() |
|||
kb.Keys[model.KeyTypeServerOnion] = model.Key(identity.Hostname()) |
|||
kb.Keys[model.KeyTypeTokenOnion] = model.Key(s.tokenService.Hostname()) |
|||
kb.Keys[model.KeyTypePrivacyPass] = model.Key(s.tokenServer.Y.String()) |
|||
kb.Sign(identity) |
|||
return kb |
|||
} |
|||
|
|||
// CheckStatus returns true if the server is running and/or an error if any part of the server needs to be restarted.
|
|||
func (s *Server) CheckStatus() (bool, error) { |
|||
s.lock.RLock() |
|||
defer s.lock.RUnlock() |
|||
if s.onionServiceStopped == true || s.tokenServiceStopped == true { |
|||
return s.running, fmt.Errorf("one of more server components are down: onion:%v token service: %v", s.onionServiceStopped, s.tokenServiceStopped) |
|||
} |
|||
return s.running, nil |
|||
} |
|||
|
|||
// Shutdown kills the app closing all connections and freeing all goroutines
|
|||
func (s *Server) Shutdown() { |
|||
s.lock.Lock() |
|||
defer s.lock.Unlock() |
|||
s.service.Shutdown() |
|||
s.tokenTapirService.Shutdown() |
|||
s.metricsPack.Stop() |
|||
s.running = true |
|||
|
|||
} |
|||
|
|||
// Statistics is an encapsulation of information about the server that an operator might want to know at a glance.
|
|||
type Statistics struct { |
|||
TotalMessages int |
|||
} |
|||
|
|||
// GetStatistics is a stub method for providing some high level information about
|
|||
// the server operation to bundling applications (e.g. the UI)
|
|||
func (s *Server) GetStatistics() Statistics { |
|||
// TODO Statistics from Metrics is very awkward. Metrics needs an overhaul to make safe
|
|||
total := s.existingMessageCount |
|||
if s.metricsPack.TotalMessageCounter != nil { |
|||
total += s.metricsPack.TotalMessageCounter.Count() |
|||
} |
|||
|
|||
return Statistics{ |
|||
TotalMessages: total, |
|||
} |
|||
} |
|||
|
|||
// ConfigureAutostart sets whether this server should autostart (in the Cwtch UI/bundling application)
|
|||
func (s *Server) ConfigureAutostart(autostart bool) { |
|||
s.config.AutoStart = autostart |
|||
s.config.Save(s.config.ConfigDir, s.config.FilePath) |
|||
} |
|||
|
|||
// Close shuts down the cwtch server in a safe way.
|
|||
func (s *Server) Close() { |
|||
log.Infof("Shutting down server") |
|||
s.lock.Lock() |
|||
defer s.lock.Unlock() |
|||
log.Infof("Closing Token Server Database...") |
|||
s.tokenServer.Close() |
|||
} |
@ -0,0 +1,98 @@ |
|||
package server |
|||
|
|||
import ( |
|||
"crypto/rand" |
|||
"encoding/json" |
|||
"git.openprivacy.ca/cwtch.im/tapir/primitives" |
|||
"git.openprivacy.ca/openprivacy/log" |
|||
"github.com/gtank/ristretto255" |
|||
"golang.org/x/crypto/ed25519" |
|||
"io/ioutil" |
|||
"path" |
|||
) |
|||
|
|||
// Reporting is a struct for storing a the config a server needs to be a peer, and connect to a group to report
|
|||
type Reporting struct { |
|||
LogMetricsToFile bool `json:"logMetricsToFile"` |
|||
ReportingGroupID string `json:"reportingGroupId"` |
|||
ReportingServerAddr string `json:"reportingServerAddr"` |
|||
} |
|||
|
|||
// Config is a struct for storing basic server configuration
|
|||
type Config struct { |
|||
ConfigDir string `json:"-"` |
|||
FilePath string `json:"-"` |
|||
MaxBufferLines int `json:"maxBufferLines"` |
|||
|
|||
PublicKey ed25519.PublicKey `json:"publicKey"` |
|||
PrivateKey ed25519.PrivateKey `json:"privateKey"` |
|||
|
|||
TokenServerPublicKey ed25519.PublicKey `json:"tokenServerPublicKey"` |
|||
TokenServerPrivateKey ed25519.PrivateKey `json:"tokenServerPrivateKey"` |
|||
|
|||
TokenServiceK ristretto255.Scalar `json:"tokenServiceK"` |
|||
|
|||
ServerReporting Reporting `json:"serverReporting"` |
|||
AutoStart bool `json:"autostart"` |
|||
} |
|||
|
|||
// Identity returns an encapsulation of the servers keys
|
|||
func (config *Config) Identity() primitives.Identity { |
|||
return primitives.InitializeIdentity("", &config.PrivateKey, &config.PublicKey) |
|||
} |
|||
|
|||
// TokenServiceIdentity returns an encapsulation of the servers token server (experimental)
|
|||
func (config *Config) TokenServiceIdentity() primitives.Identity { |
|||
return primitives.InitializeIdentity("", &config.TokenServerPrivateKey, &config.TokenServerPublicKey) |
|||
} |
|||
|
|||
// Save dumps the latest version of the config to a json file given by filename
|
|||
func (config *Config) Save(dir, filename string) { |
|||
log.Infof("Saving config to %s\n", path.Join(dir, filename)) |
|||
bytes, _ := json.MarshalIndent(config, "", "\t") |
|||
ioutil.WriteFile(path.Join(dir, filename), bytes, 0600) |
|||
} |
|||
|
|||
// LoadConfig loads a Config from a json file specified by filename
|
|||
func LoadConfig(configDir, filename string) Config { |
|||
log.Infof("Loading config from %s\n", path.Join(configDir, filename)) |
|||
config := Config{} |
|||
|
|||
id, pk := primitives.InitializeEphemeralIdentity() |
|||
tid, tpk := primitives.InitializeEphemeralIdentity() |
|||
config.PrivateKey = pk |
|||
config.PublicKey = id.PublicKey() |
|||
config.TokenServerPrivateKey = tpk |
|||
config.TokenServerPublicKey = tid.PublicKey() |
|||
config.MaxBufferLines = 100000 |
|||
config.ServerReporting = Reporting{ |
|||
LogMetricsToFile: true, |
|||
ReportingGroupID: "", |
|||
ReportingServerAddr: "", |
|||
} |
|||
config.AutoStart = false |
|||
config.ConfigDir = configDir |
|||
config.FilePath = filename |
|||
|
|||
k := new(ristretto255.Scalar) |
|||
b := make([]byte, 64) |
|||
_, err := rand.Read(b) |
|||
if err != nil { |
|||
// unable to generate secure random numbers
|
|||
panic("unable to generate secure random numbers") |
|||
} |
|||
k.FromUniformBytes(b) |
|||
config.TokenServiceK = *k |
|||
|
|||
raw, err := ioutil.ReadFile(path.Join(configDir, filename)) |
|||
if err == nil { |
|||
err = json.Unmarshal(raw, &config) |
|||
|
|||
if err != nil { |
|||
log.Errorf("reading config: %v", err) |
|||
} |
|||
} |
|||
// Always save (first time generation, new version with new variables populated)
|
|||
config.Save(configDir, filename) |
|||
return config |
|||
} |
@ -0,0 +1,119 @@ |
|||
package server |
|||
|
|||
import ( |
|||
"cwtch.im/cwtch/protocol/groups" |
|||
"cwtch.im/cwtch/server/storage" |
|||
"encoding/json" |
|||
"git.openprivacy.ca/cwtch.im/tapir" |
|||
"git.openprivacy.ca/cwtch.im/tapir/applications" |
|||
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" |
|||
"git.openprivacy.ca/openprivacy/log" |
|||
) |
|||
|
|||
// NewTokenBoardServer generates new Server for Token Board
|
|||
func NewTokenBoardServer(store storage.MessageStoreInterface, tokenService *privacypass.TokenServer) tapir.Application { |
|||
tba := new(TokenboardServer) |
|||
tba.TokenService = tokenService |
|||
tba.LegacyMessageStore = store |
|||
return tba |
|||
} |
|||
|
|||
// TokenboardServer defines the token board server
|
|||
type TokenboardServer struct { |
|||
applications.AuthApp |
|||
connection tapir.Connection |
|||
TokenService *privacypass.TokenServer |
|||
LegacyMessageStore storage.MessageStoreInterface |
|||
} |
|||
|
|||
// NewInstance creates a new TokenBoardApp
|
|||
func (ta *TokenboardServer) NewInstance() tapir.Application { |
|||
tba := new(TokenboardServer) |
|||
tba.TokenService = ta.TokenService |
|||
tba.LegacyMessageStore = ta.LegacyMessageStore |
|||
return tba |
|||
} |
|||
|
|||
// Init initializes the cryptographic TokenBoardApp
|
|||
func (ta *TokenboardServer) Init(connection tapir.Connection) { |
|||
ta.AuthApp.Init(connection) |
|||
if connection.HasCapability(applications.AuthCapability) { |
|||
ta.connection = connection |
|||
go ta.Listen() |
|||
} else { |
|||
connection.Close() |
|||
} |
|||
} |
|||
|
|||
// Listen processes the messages for this application
|
|||
func (ta *TokenboardServer) Listen() { |
|||
for { |
|||
data := ta.connection.Expect() |
|||
if len(data) == 0 { |
|||
log.Debugf("Server Closing Connection") |
|||
ta.connection.Close() |
|||
return // connection is closed
|
|||
} |
|||
|
|||
var message groups.Message |
|||
if err := json.Unmarshal(data, &message); err != nil { |
|||
log.Debugf("Server Closing Connection Because of Malformed Client Packet %v", err) |
|||
ta.connection.Close() |
|||
return // connection is closed
|
|||
} |
|||
|
|||
switch message.MessageType { |
|||
case groups.PostRequestMessage: |
|||
if message.PostRequest != nil { |
|||
postrequest := *message.PostRequest |
|||
log.Debugf("Received a Post Message Request: %v", ta.connection.Hostname()) |
|||
ta.postMessageRequest(postrequest) |
|||
} else { |
|||
log.Debugf("Server Closing Connection Because of PostRequestMessage Client Packet") |
|||
ta.connection.Close() |
|||
return // connection is closed
|
|||
} |
|||
case groups.ReplayRequestMessage: |
|||
if message.ReplayRequest != nil { |
|||
log.Debugf("Received Replay Request %v", message.ReplayRequest) |
|||
messages := ta.LegacyMessageStore.FetchMessages() |
|||
response, _ := json.Marshal(groups.Message{MessageType: groups.ReplayResultMessage, ReplayResult: &groups.ReplayResult{NumMessages: len(messages)}}) |
|||
log.Debugf("Sending Replay Response %v", groups.ReplayResult{NumMessages: len(messages)}) |
|||
ta.connection.Send(response) |
|||
for _, message := range messages { |
|||
data, _ = json.Marshal(message) |
|||
ta.connection.Send(data) |
|||
} |
|||
log.Debugf("Finished Requested Sync") |
|||
// Set sync and then send any new messages that might have happened while we were syncing
|
|||
ta.connection.SetCapability(groups.CwtchServerSyncedCapability) |
|||
newMessages := ta.LegacyMessageStore.FetchMessages() |
|||
if len(newMessages) > len(messages) { |
|||
for _, message := range newMessages[len(messages):] { |
|||
data, _ = json.Marshal(groups.Message{MessageType: groups.NewMessageMessage, NewMessage: &groups.NewMessage{EGM: *message}}) |
|||
ta.connection.Send(data) |
|||
} |
|||
} |
|||
} else { |
|||
log.Debugf("Server Closing Connection Because of Malformed ReplayRequestMessage Packet") |
|||
ta.connection.Close() |
|||
return // connection is closed
|
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
func (ta *TokenboardServer) postMessageRequest(pr groups.PostRequest) { |
|||
if err := ta.TokenService.SpendToken(pr.Token, append(pr.EGM.ToBytes(), ta.connection.ID().Hostname()...)); err == nil { |
|||
log.Debugf("Token is valid") |
|||
ta.LegacyMessageStore.AddMessage(pr.EGM) |
|||
data, _ := json.Marshal(groups.Message{MessageType: groups.PostResultMessage, PostResult: &groups.PostResult{Success: true}}) |
|||
ta.connection.Send(data) |
|||
data, _ = json.Marshal(groups.Message{MessageType: groups.NewMessageMessage, NewMessage: &groups.NewMessage{EGM: pr.EGM}}) |
|||
ta.connection.Broadcast(data, groups.CwtchServerSyncedCapability) |
|||
} else { |
|||
log.Debugf("Attempt to spend an invalid token: %v", err) |
|||
data, _ := json.Marshal(groups.Message{MessageType: groups.PostResultMessage, PostResult: &groups.PostResult{Success: false}}) |
|||
ta.connection.Send(data) |
|||
} |
|||
} |
@ -0,0 +1,152 @@ |
|||
package storage |
|||
|
|||
import ( |
|||
"bufio" |
|||
"cwtch.im/cwtch/protocol/groups" |
|||
"cwtch.im/cwtch/server/metrics" |
|||
"encoding/json" |
|||
"fmt" |
|||
"git.openprivacy.ca/openprivacy/log" |
|||
"os" |
|||
"path" |
|||
"sync" |
|||
) |
|||
|
|||
const ( |
|||
fileStorePartitions = 10 |
|||
fileStoreFilename = "cwtch.messages" |
|||
directory = "messages" |
|||
) |
|||
|
|||
// MessageStoreInterface defines an interface to interact with a store of cwtch messages.
|
|||
type MessageStoreInterface interface { |
|||
AddMessage(groups.EncryptedGroupMessage) |
|||
FetchMessages() []*groups.EncryptedGroupMessage |
|||
} |
|||
|
|||
// MessageStore is a file-backed implementation of MessageStoreInterface
|
|||
type MessageStore struct { |
|||
activeLogFile *os.File |
|||
filePos int |
|||
storeDirectory string |
|||
lock sync.Mutex |
|||
messages []*groups.EncryptedGroupMessage |
|||
messageCounter metrics.Counter |
|||
maxBufferLines int |
|||
bufferPos int |
|||
bufferRotated bool |
|||
} |
|||
|
|||
// Close closes the message store and underlying resources.
|
|||
func (ms *MessageStore) Close() { |
|||
ms.lock.Lock() |
|||
ms.messages = nil |
|||
ms.activeLogFile.Close() |
|||
ms.lock.Unlock() |
|||
} |
|||
|
|||
func (ms *MessageStore) updateBuffer(gm *groups.EncryptedGroupMessage) { |
|||
ms.messages[ms.bufferPos] = gm |
|||
ms.bufferPos++ |
|||
if ms.bufferPos == ms.maxBufferLines { |
|||
ms.bufferPos = 0 |
|||
ms.bufferRotated = true |
|||
} |
|||
} |
|||
|
|||
func (ms *MessageStore) initAndLoadFiles() error { |
|||
ms.activeLogFile = nil |
|||
for i := fileStorePartitions - 1; i >= 0; i-- { |
|||
ms.filePos = 0 |
|||
filename := path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, i)) |
|||
f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600) |
|||
if err != nil { |
|||
log.Errorf("MessageStore could not open: %v: %v", filename, err) |
|||
continue |
|||
} |
|||
ms.activeLogFile = f |
|||
|
|||
scanner := bufio.NewScanner(f) |
|||
for scanner.Scan() { |
|||
gms := scanner.Text() |
|||
ms.filePos++ |
|||
gm := &groups.EncryptedGroupMessage{} |
|||
err := json.Unmarshal([]byte(gms), gm) |
|||
if err == nil { |
|||
ms.updateBuffer(gm) |
|||
} |
|||
} |
|||
} |
|||
if ms.activeLogFile == nil { |
|||
return fmt.Errorf("Could not create log file to write to in %s", ms.storeDirectory) |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (ms *MessageStore) updateFile(gm *groups.EncryptedGroupMessage) { |
|||
s, err := json.Marshal(gm) |
|||
if err != nil { |
|||
log.Errorf("Failed to unmarshal group message %v\n", err) |
|||
} |
|||
fmt.Fprintf(ms.activeLogFile, "%s\n", s) |
|||
ms.filePos++ |
|||
if ms.filePos >= ms.maxBufferLines/fileStorePartitions { |
|||
ms.rotateFileStore() |
|||
} |
|||
} |
|||
|
|||
func (ms *MessageStore) rotateFileStore() { |
|||
ms.activeLogFile.Close() |
|||
os.Remove(path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, fileStorePartitions-1))) |
|||
|
|||
for i := fileStorePartitions - 2; i >= 0; i-- { |
|||
os.Rename(path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, i)), path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, i+1))) |
|||
} |
|||
|
|||
f, err := os.OpenFile(path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, 0)), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600) |
|||
if err != nil { |
|||
log.Errorf("Could not open new message store file in: %s", ms.storeDirectory) |
|||
} |
|||
ms.filePos = 0 |
|||
ms.activeLogFile = f |
|||
} |
|||
|
|||
// Init sets up a MessageStore of size maxBufferLines (# of messages) backed by filename
|
|||
func (ms *MessageStore) Init(appDirectory string, maxBufferLines int, messageCounter metrics.Counter) error { |
|||
ms.storeDirectory = path.Join(appDirectory, directory) |
|||
os.Mkdir(ms.storeDirectory, 0700) |
|||
|
|||
ms.bufferPos = 0 |
|||
ms.maxBufferLines = maxBufferLines |
|||
ms.messages = make([]*groups.EncryptedGroupMessage, maxBufferLines) |
|||
ms.bufferRotated = false |
|||
ms.messageCounter = messageCounter |
|||
|
|||
err := ms.initAndLoadFiles() |
|||
return err |
|||
} |
|||
|
|||
// FetchMessages returns all messages from the backing file.
|
|||
func (ms *MessageStore) FetchMessages() (messages []*groups.EncryptedGroupMessage) { |
|||
ms.lock.Lock() |
|||
if !ms.bufferRotated { |
|||
messages = make([]*groups.EncryptedGroupMessage, ms.bufferPos) |
|||
copy(messages, ms.messages[0:ms.bufferPos]) |
|||
} else { |
|||
messages = make([]*groups.EncryptedGroupMessage, ms.maxBufferLines) |
|||
copy(messages, ms.messages[ms.bufferPos:ms.maxBufferLines]) |
|||
copy(messages[ms.bufferPos:], ms.messages[0:ms.bufferPos]) |
|||
} |
|||
ms.lock.Unlock() |
|||
return |
|||
} |
|||
|
|||
// AddMessage adds a GroupMessage to the store
|
|||
func (ms *MessageStore) AddMessage(gm groups.EncryptedGroupMessage) { |
|||
ms.messageCounter.Add(1) |
|||
ms.lock.Lock() |
|||
ms.updateBuffer(&gm) |
|||
ms.updateFile(&gm) |
|||
|
|||
ms.lock.Unlock() |
|||
} |
@ -0,0 +1,54 @@ |
|||
package storage |
|||
|
|||
import ( |
|||
"cwtch.im/cwtch/protocol/groups" |
|||
"cwtch.im/cwtch/server/metrics" |
|||
"os" |
|||
"strconv" |
|||
"testing" |
|||
) |
|||
|
|||
func TestMessageStore(t *testing.T) { |
|||
os.Remove("ms.test") |
|||
ms := new(MessageStore) |
|||
counter := metrics.NewCounter() |
|||
ms.Init("./", 1000, counter) |
|||
for i := 0; i < 499; i++ { |
|||
gm := groups.EncryptedGroupMessage{ |
|||
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)), |
|||
} |
|||
ms.AddMessage(gm) |
|||
} |
|||
if counter.Count() != 499 { |
|||
t.Errorf("Counter should be at 499 was %v", counter.Count()) |
|||
} |
|||
ms.Close() |
|||
ms.Init("./", 1000, counter) |
|||
m := ms.FetchMessages() |
|||
if len(m) != 499 { |
|||
t.Errorf("Should have been 499 was %v", len(m)) |
|||
} |
|||
|
|||
counter.Reset() |
|||
|
|||
for i := 0; i < 1000; i++ { |
|||
gm := groups.EncryptedGroupMessage{ |
|||
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)), |
|||
} |
|||
ms.AddMessage(gm) |
|||
} |
|||
|
|||
m = ms.FetchMessages() |
|||
if len(m) != 1000 { |
|||
t.Errorf("Should have been 1000 was %v", len(m)) |
|||
} |
|||
ms.Close() |
|||
ms.Init("./", 1000, counter) |
|||
m = ms.FetchMessages() |
|||
if len(m) != 999 { |
|||
t.Errorf("Should have been 999 was %v", len(m)) |
|||
} |
|||
ms.Close() |
|||
|
|||
os.RemoveAll("./messages") |
|||
} |
@ -0,0 +1,24 @@ |
|||
#!/bin/sh |
|||
|
|||
echo "Checking code quality (you want to see no output here)" |
|||
echo "" |
|||
|
|||
echo "Vetting:" |
|||
go list ./... | xargs go vet |
|||
|
|||
echo "" |
|||
echo "Linting:" |
|||
|
|||
go list ./... | xargs golint |
|||
|
|||
|
|||
echo "Time to format" |
|||
gofmt -l -s -w . |
|||
|
|||
# ineffassign (https://github.com/gordonklaus/ineffassign) |
|||
echo "Checking for ineffectual assignment of errors (unchecked errors...)" |
|||
ineffassign . |
|||
|
|||
# misspell (https://github.com/client9/misspell/cmd/misspell) |
|||
echo "Checking for misspelled words..." |
|||
misspell . | grep -v "vendor/" | grep -v "go.sum" | grep -v ".idea" |
@ -0,0 +1,10 @@ |
|||
#!/bin/bash |
|||
|
|||
set -e |
|||
pwd |
|||
GORACE="haltonerror=1" |
|||
go test -race ${1} -coverprofile=server.metrics.cover.out -v ./metrics |
|||
go test -race ${1} -coverprofile=server.metrics.cover.out -v ./storage |
|||
echo "mode: set" > coverage.out && cat *.cover.out | grep -v mode: | sort -r | \ |
|||
awk '{if($1 != last) {print $0;last=$1}}' >> coverage.out |
|||
rm -rf *.cover.out |
Loading…
Reference in new issue