Distinguish between Authenticated and Synced for Server Connections #379
78
Dockerfile
78
Dockerfile
|
@ -1,78 +0,0 @@
|
|||
FROM golang as server-build-stage
|
||||
ENV CGO_ENABLED=0 GOOS=linux
|
||||
|
||||
WORKDIR /go/src/cwtch.im/cwtch
|
||||
COPY . .
|
||||
|
||||
RUN go get -d -v ./...
|
||||
#RUN go install -v ./...
|
||||
WORKDIR /go/src/cwtch.im/cwtch/server/app/
|
||||
RUN go build -ldflags "-extldflags '-static'"
|
||||
|
||||
|
||||
|
||||
#----------------------------------------------
|
||||
FROM alpine:latest as tor-build-stage
|
||||
|
||||
# Install prerequisites
|
||||
RUN apk --no-cache add --update \
|
||||
gnupg \
|
||||
build-base \
|
||||
libevent \
|
||||
libevent-dev \
|
||||
libressl \
|
||||
libressl-dev \
|
||||
xz-libs \
|
||||
xz-dev \
|
||||
zlib \
|
||||
zlib-dev \
|
||||
zstd \
|
||||
zstd-dev \
|
||||
&& wget -q https://www.torproject.org/dist/tor-0.3.5.3-alpha.tar.gz \
|
||||
&& tar xf tor-0.3.5.3-alpha.tar.gz \
|
||||
&& cd tor-0.3.5.3-alpha \
|
||||
&& ./configure \
|
||||
&& make install \
|
||||
&& ls -R /usr/local/
|
||||
|
||||
FROM alpine:latest
|
||||
MAINTAINER Ablative Hosting <support@ablative.hosting>
|
||||
|
||||
#BSD habits die hard
|
||||
ENV TOR_USER=_tor
|
||||
|
||||
# Installing dependencies of Tor and pwgen
|
||||
RUN apk --no-cache add --update \
|
||||
libevent \
|
||||
libressl \
|
||||
xz-libs \
|
||||
zlib \
|
||||
zstd \
|
||||
zstd-dev \
|
||||
pwgen
|
||||
|
||||
# Copy Tor
|
||||
COPY --from=tor-build-stage /usr/local/ /usr/local/
|
||||
|
||||
# Create an unprivileged tor user
|
||||
RUN addgroup -S $TOR_USER && adduser -G $TOR_USER -S $TOR_USER && adduser -G _tor -S cwtchd && mkdir /run/tor
|
||||
|
||||
# Copy Tor configuration file
|
||||
COPY ./server/docker/torrc /etc/tor/torrc
|
||||
|
||||
# Copy docker-entrypoint
|
||||
COPY ./server/docker/docker-entrypoint /usr/local/bin/
|
||||
|
||||
# Copy across cwtch
|
||||
COPY --from=server-build-stage /go/src/cwtch.im/cwtch/server/app/app /usr/local/bin/cwtch_server
|
||||
|
||||
# Persist data
|
||||
VOLUME /etc/tor /var/lib/tor /etc/cwtch
|
||||
|
||||
ENTRYPOINT ["docker-entrypoint"]
|
||||
|
||||
#cwtchd is in the _tor group so can access the socket but that's it
|
||||
#USER cwtchd
|
||||
|
||||
#Launches the cwtchd daemon
|
||||
CMD ["/usr/local/bin/cwtch_server"]
|
|
@ -813,6 +813,7 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
case event.ServerStateChange:
|
||||
cp.mutex.Lock()
|
||||
// We update both the server contact status, as well as the groups the server belongs to
|
||||
log.Debugf("Got Server State Change %v", ev)
|
||||
cp.Profile.Contacts[ev.Data[event.GroupServer]].State = ev.Data[event.ConnectionState]
|
||||
|
||||
// TODO deprecate this, the UI should consult the server contact entry instead (it's far more efficient)
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"encoding/json"
|
||||
"errors"
|
||||
"git.openprivacy.ca/cwtch.im/tapir"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/applications"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/networks/tor"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/primitives"
|
||||
"git.openprivacy.ca/openprivacy/connectivity"
|
||||
|
@ -291,15 +292,24 @@ func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, toke
|
|||
|
||||
Y := ristretto255.NewElement()
|
||||
Y.UnmarshalText([]byte(tokenServerY))
|
||||
connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverSynced, e.serverDisconnected))
|
||||
connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverAuthed, e.serverSynced, e.ignoreOnShutdown(e.serverDisconnected)))
|
||||
e.ephemeralServices.Store(onion, ephemeralService)
|
||||
// If we are already connected...check if we are authed and issue an auth event
|
||||
// (This allows the ui to be stateless)
|
||||
if connected && err != nil {
|
||||
conn, err := ephemeralService.GetConnection(onion)
|
||||
if err == nil {
|
||||
|
||||
// If the server is synced, resend the synced status update
|
||||
if conn.HasCapability(groups.CwtchServerSyncedCapability) {
|
||||
e.ignoreOnShutdown(e.serverConnected)(onion)
|
||||
e.ignoreOnShutdown(e.serverSynced)(onion)
|
||||
return
|
||||
}
|
||||
|
||||
// If the server is authed, resend the auth status update
|
||||
if conn.HasCapability(applications.AuthCapability) {
|
||||
// Resend the authed event...
|
||||
e.ignoreOnShutdown(e.serverAuthed)(onion)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -359,6 +369,13 @@ func (e *engine) serverConnected(onion string) {
|
|||
}))
|
||||
}
|
||||
|
||||
func (e *engine) serverAuthed(onion string) {
|
||||
e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{
|
||||
event.GroupServer: onion,
|
||||
event.ConnectionState: ConnectionStateName[AUTHENTICATED],
|
||||
}))
|
||||
}
|
||||
|
||||
func (e *engine) serverSynced(onion string) {
|
||||
e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{
|
||||
event.GroupServer: onion,
|
||||
|
|
|
@ -16,13 +16,14 @@ import (
|
|||
)
|
||||
|
||||
// NewTokenBoardClient generates a new Client for Token Board
|
||||
func NewTokenBoardClient(acn connectivity.ACN, Y *ristretto255.Element, tokenServiceOnion string, lastKnownSignature []byte, groupMessageHandler func(server string, gm *groups.EncryptedGroupMessage), serverSyncedHandler func(server string), serverClosedHandler func(server string)) tapir.Application {
|
||||
func NewTokenBoardClient(acn connectivity.ACN, Y *ristretto255.Element, tokenServiceOnion string, lastKnownSignature []byte, groupMessageHandler func(server string, gm *groups.EncryptedGroupMessage), serverAuthedHandler func(server string), serverSyncedHandler func(server string), serverClosedHandler func(server string)) tapir.Application {
|
||||
tba := new(TokenBoardClient)
|
||||
tba.acn = acn
|
||||
tba.tokenService = privacypass.NewTokenServer()
|
||||
tba.tokenService.Y = Y
|
||||
tba.tokenServiceOnion = tokenServiceOnion
|
||||
tba.receiveGroupMessageHandler = groupMessageHandler
|
||||
tba.serverAuthedHandler = serverAuthedHandler
|
||||
tba.serverSyncedHandler = serverSyncedHandler
|
||||
tba.serverClosedHandler = serverClosedHandler
|
||||
tba.lastKnownSignature = lastKnownSignature
|
||||
|
@ -34,6 +35,7 @@ type TokenBoardClient struct {
|
|||
applications.AuthApp
|
||||
connection tapir.Connection
|
||||
receiveGroupMessageHandler func(server string, gm *groups.EncryptedGroupMessage)
|
||||
serverAuthedHandler func(server string)
|
||||
serverSyncedHandler func(server string)
|
||||
serverClosedHandler func(server string)
|
||||
|
||||
|
@ -49,6 +51,7 @@ type TokenBoardClient struct {
|
|||
// NewInstance Client a new TokenBoardApp
|
||||
func (ta *TokenBoardClient) NewInstance() tapir.Application {
|
||||
tba := new(TokenBoardClient)
|
||||
tba.serverAuthedHandler = ta.serverAuthedHandler
|
||||
tba.serverSyncedHandler = ta.serverSyncedHandler
|
||||
tba.serverClosedHandler = ta.serverClosedHandler
|
||||
tba.receiveGroupMessageHandler = ta.receiveGroupMessageHandler
|
||||
|
@ -64,7 +67,7 @@ func (ta *TokenBoardClient) Init(connection tapir.Connection) {
|
|||
ta.AuthApp.Init(connection)
|
||||
if connection.HasCapability(applications.AuthCapability) {
|
||||
ta.connection = connection
|
||||
ta.connection.SetCapability(groups.CwtchServerSyncedCapability)
|
||||
ta.serverAuthedHandler(ta.connection.Hostname())
|
||||
log.Debugf("Successfully Initialized Connection")
|
||||
go ta.Listen()
|
||||
// Optimistically acquire many tokens for this server...
|
||||
|
@ -133,6 +136,7 @@ func (ta *TokenBoardClient) Listen() {
|
|||
}
|
||||
}
|
||||
ta.serverSyncedHandler(ta.connection.Hostname())
|
||||
ta.connection.SetCapability(groups.CwtchServerSyncedCapability)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,100 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"cwtch.im/cwtch/model"
|
||||
cwtchserver "cwtch.im/cwtch/server"
|
||||
"encoding/base64"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/primitives"
|
||||
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
mrand "math/rand"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
serverConfigFile = "serverConfig.json"
|
||||
)
|
||||
|
||||
func main() {
|
||||
log.AddEverythingFromPattern("server/app/main")
|
||||
log.AddEverythingFromPattern("server/server")
|
||||
log.SetLevel(log.LevelDebug)
|
||||
configDir := os.Getenv("CWTCH_CONFIG_DIR")
|
||||
|
||||
if len(os.Args) == 2 && os.Args[1] == "gen1" {
|
||||
config := new(cwtchserver.Config)
|
||||
id, pk := primitives.InitializeEphemeralIdentity()
|
||||
tid, tpk := primitives.InitializeEphemeralIdentity()
|
||||
config.PrivateKey = pk
|
||||
config.PublicKey = id.PublicKey()
|
||||
config.TokenServerPrivateKey = tpk
|
||||
config.TokenServerPublicKey = tid.PublicKey()
|
||||
config.MaxBufferLines = 100000
|
||||
config.ServerReporting = cwtchserver.Reporting{
|
||||
LogMetricsToFile: true,
|
||||
ReportingGroupID: "",
|
||||
ReportingServerAddr: "",
|
||||
}
|
||||
config.Save(".", "serverConfig.json")
|
||||
return
|
||||
}
|
||||
|
||||
serverConfig := cwtchserver.LoadConfig(configDir, serverConfigFile)
|
||||
|
||||
// we don't need real randomness for the port, just to avoid a possible conflict...
|
||||
mrand.Seed(int64(time.Now().Nanosecond()))
|
||||
controlPort := mrand.Intn(1000) + 9052
|
||||
|
||||
// generate a random password
|
||||
key := make([]byte, 64)
|
||||
_, err := rand.Read(key)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
os.MkdirAll("tordir/tor", 0700)
|
||||
tor.NewTorrc().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("./tordir/tor/torrc")
|
||||
acn, err := tor.NewTorACNWithAuth("tordir", "", controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)})
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("\nError connecting to Tor: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer acn.Close()
|
||||
|
||||
server := new(cwtchserver.Server)
|
||||
log.Infoln("starting cwtch server...")
|
||||
|
||||
server.Setup(serverConfig)
|
||||
|
||||
// TODO create a random group for testing
|
||||
group, _ := model.NewGroup(tor.GetTorV3Hostname(serverConfig.PublicKey))
|
||||
invite, err := group.Invite()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
bundle := server.KeyBundle().Serialize()
|
||||
log.Infof("Server Config: server:%s", base64.StdEncoding.EncodeToString(bundle))
|
||||
|
||||
log.Infof("Server Tofu Bundle: tofubundle:server:%s||%s", base64.StdEncoding.EncodeToString(bundle), invite)
|
||||
|
||||
// Graceful Shutdown
|
||||
c := make(chan os.Signal, 1)
|
||||
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
|
||||
go func() {
|
||||
<-c
|
||||
acn.Close()
|
||||
server.Close()
|
||||
os.Exit(1)
|
||||
}()
|
||||
|
||||
server.Run(acn)
|
||||
for {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
|
@ -1,33 +0,0 @@
|
|||
#!/bin/sh
|
||||
set -o errexit
|
||||
|
||||
chmod_files() { find $2 -type f -exec chmod -v $1 {} \;
|
||||
}
|
||||
chmod_dirs() { find $2 -type d -exec chmod -v $1 {} \;
|
||||
}
|
||||
|
||||
chown ${TOR_USER}:${TOR_USER} /run/tor/
|
||||
chmod 770 /run/tor
|
||||
|
||||
chown -Rv ${TOR_USER}:${TOR_USER} /var/lib/tor
|
||||
chmod_dirs 700 /var/lib/tor
|
||||
chmod_files 600 /var/lib/tor
|
||||
|
||||
echo -e "\n========================================================"
|
||||
# Display OS version, Tor version & torrc in log
|
||||
echo -e "Alpine Version: \c" && cat /etc/alpine-release
|
||||
tor --version
|
||||
#cat /etc/tor/torrc
|
||||
echo -e "========================================================\n"
|
||||
|
||||
tor -f /etc/tor/torrc
|
||||
|
||||
#Cwtch will crash and burn if 9051 isn't ready
|
||||
sleep 15
|
||||
|
||||
if [ -z "${CWTCH_CONFIG_DIR}" ]; then
|
||||
CWTCH_CONFIG_DIR=/etc/cwtch/
|
||||
fi
|
||||
|
||||
#Run cwtch (or whatever the user passed)
|
||||
CWTCH_CONFIG_DIR=$CWTCH_CONFIG_DIR exec "$@"
|
|
@ -1,27 +0,0 @@
|
|||
User _tor
|
||||
DataDirectory /var/lib/tor
|
||||
|
||||
ORPort 0
|
||||
ExitRelay 0
|
||||
IPv6Exit 0
|
||||
|
||||
#We need this running in the background as the server doesn't launch it itself
|
||||
RunAsDaemon 1
|
||||
|
||||
ClientOnly 1
|
||||
SocksPort 9050
|
||||
|
||||
ControlPort 9051
|
||||
ControlSocket /run/tor/control
|
||||
ControlSocketsGroupWritable 1
|
||||
CookieAuthentication 1
|
||||
CookieAuthFile /run/tor/control.authcookie
|
||||
CookieAuthFileGroupReadable 1
|
||||
#HashedControlPassword 16:B4C8EE980C085EE460AEA9094350DAA9C2B5F841400E9BBA247368400A
|
||||
|
||||
# Run as a relay only (change policy to enable exit node)
|
||||
ExitPolicy reject *:* # no exits allowed
|
||||
ExitPolicy reject6 *:*
|
||||
|
||||
# Additional config built by the entrypoint will go here
|
||||
|
|
@ -1,249 +0,0 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type counter struct {
|
||||
startTime time.Time
|
||||
count uint64
|
||||
total uint64
|
||||
}
|
||||
|
||||
// Counter providers a threadsafe counter to use for storing long running counts
|
||||
type Counter interface {
|
||||
Add(unit int)
|
||||
Reset()
|
||||
|
||||
Count() int
|
||||
GetStarttime() time.Time
|
||||
}
|
||||
|
||||
// NewCounter initializes a counter starting at time.Now() and a count of 0 and returns it
|
||||
func NewCounter() Counter {
|
||||
c := &counter{startTime: time.Now(), count: 0, total: 0}
|
||||
return c
|
||||
}
|
||||
|
||||
// Add add a count of unit to the counter
|
||||
func (c *counter) Add(unit int) {
|
||||
atomic.AddUint64(&c.count, uint64(unit))
|
||||
}
|
||||
|
||||
// Count returns the count since Start
|
||||
func (c *counter) Count() int {
|
||||
return int(atomic.LoadUint64(&c.count))
|
||||
}
|
||||
|
||||
func (c *counter) Reset() {
|
||||
atomic.StoreUint64(&c.count, 0)
|
||||
c.startTime = time.Now()
|
||||
}
|
||||
|
||||
// GetStarttime returns the starttime of the counter
|
||||
func (c *counter) GetStarttime() time.Time {
|
||||
return c.startTime
|
||||
}
|
||||
|
||||
// MonitorType controls how the monitor will report itself
|
||||
type MonitorType int
|
||||
|
||||
const (
|
||||
// Count indicates the monitor should report in interger format
|
||||
Count MonitorType = iota
|
||||
// Percent indicates the monitor should report in decimal format with 2 places
|
||||
Percent
|
||||
// MegaBytes indicates the monitor should transform the raw number into MBs
|
||||
MegaBytes
|
||||
)
|
||||
|
||||
// MonitorAccumulation controls how monitor data is accumulated over time into larger summary buckets
|
||||
type MonitorAccumulation int
|
||||
|
||||
const (
|
||||
// Cumulative values with sum over time
|
||||
Cumulative MonitorAccumulation = iota
|
||||
// Average values will average over time
|
||||
Average
|
||||
)
|
||||
|
||||
type monitorHistory struct {
|
||||
monitorType MonitorType
|
||||
monitorAccumulation MonitorAccumulation
|
||||
|
||||
starttime time.Time
|
||||
perMinutePerHour [60]float64
|
||||
timeLastHourRotate time.Time
|
||||
perHourForDay [24]float64
|
||||
timeLastDayRotate time.Time
|
||||
perDayForWeek [7]float64
|
||||
timeLastWeekRotate time.Time
|
||||
perWeekForMonth [4]float64
|
||||
timeLastMonthRotate time.Time
|
||||
perMonthForYear [12]float64
|
||||
|
||||
monitor func() float64
|
||||
|
||||
breakChannel chan bool
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// MonitorHistory runs a monitor every minute and rotates and averages the results out across time
|
||||
type MonitorHistory interface {
|
||||
Start()
|
||||
Stop()
|
||||
|
||||
Minutes() []float64
|
||||
Hours() []float64
|
||||
Days() []float64
|
||||
Weeks() []float64
|
||||
Months() []float64
|
||||
|
||||
Report(w *bufio.Writer)
|
||||
}
|
||||
|
||||
// NewMonitorHistory returns a new MonitorHistory with starttime of time.Now and Started running with supplied monitor
|
||||
func NewMonitorHistory(t MonitorType, a MonitorAccumulation, monitor func() float64) MonitorHistory {
|
||||
mh := &monitorHistory{monitorType: t, monitorAccumulation: a, starttime: time.Now(), monitor: monitor, breakChannel: make(chan bool),
|
||||
timeLastHourRotate: time.Now(), timeLastDayRotate: time.Now(), timeLastWeekRotate: time.Now(),
|
||||
timeLastMonthRotate: time.Now()}
|
||||
mh.Start()
|
||||
return mh
|
||||
}
|
||||
|
||||
// Start starts a monitorHistory go rountine to run the monitor at intervals and rotate history
|
||||
func (mh *monitorHistory) Start() {
|
||||
go mh.monitorThread()
|
||||
}
|
||||
|
||||
// Stop stops a monitorHistory go routine
|
||||
func (mh *monitorHistory) Stop() {
|
||||
mh.breakChannel <- true
|
||||
}
|
||||
|
||||
// Minutes returns the last 60 minute monitoring results
|
||||
func (mh *monitorHistory) Minutes() []float64 {
|
||||
return mh.returnCopy(mh.perMinutePerHour[:])
|
||||
}
|
||||
|
||||
// Hours returns the last 24 hourly averages of monitor results
|
||||
func (mh *monitorHistory) Hours() []float64 {
|
||||
return mh.returnCopy(mh.perHourForDay[:])
|
||||
}
|
||||
|
||||
// Days returns the last 7 day averages of monitor results
|
||||
func (mh *monitorHistory) Days() []float64 {
|
||||
return mh.returnCopy(mh.perDayForWeek[:])
|
||||
}
|
||||
|
||||
// Weeks returns the last 4 weeks of averages of monitor results
|
||||
func (mh *monitorHistory) Weeks() []float64 {
|
||||
return mh.returnCopy(mh.perWeekForMonth[:])
|
||||
}
|
||||
|
||||
// Months returns the last 12 months of averages of monitor results
|
||||
func (mh *monitorHistory) Months() []float64 {
|
||||
return mh.returnCopy(mh.perMonthForYear[:])
|
||||
}
|
||||
|
||||
func (mh *monitorHistory) Report(w *bufio.Writer) {
|
||||
mh.lock.Lock()
|
||||
fmt.Fprintln(w, "Minutes:", reportLine(mh.monitorType, mh.perMinutePerHour[:]))
|
||||
fmt.Fprintln(w, "Hours: ", reportLine(mh.monitorType, mh.perHourForDay[:]))
|
||||
fmt.Fprintln(w, "Days: ", reportLine(mh.monitorType, mh.perDayForWeek[:]))
|
||||
fmt.Fprintln(w, "Weeks: ", reportLine(mh.monitorType, mh.perWeekForMonth[:]))
|
||||
fmt.Fprintln(w, "Months: ", reportLine(mh.monitorType, mh.perMonthForYear[:]))
|
||||
mh.lock.Unlock()
|
||||
}
|
||||
|
||||
func reportLine(t MonitorType, array []float64) string {
|
||||
switch t {
|
||||
case Count:
|
||||
return strings.Trim(strings.Join(strings.Fields(fmt.Sprintf("%.0f", array)), " "), "[]")
|
||||
case Percent:
|
||||
return strings.Trim(strings.Join(strings.Fields(fmt.Sprintf("%.2f", array)), " "), "[]")
|
||||
case MegaBytes:
|
||||
mbs := make([]int, len(array))
|
||||
for i, b := range array {
|
||||
mbs[i] = int(b) / 1024 / 1024
|
||||
}
|
||||
return strings.Trim(strings.Join(strings.Fields(fmt.Sprintf("%d", mbs)), "MBs "), "[]") + "MBs"
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (mh *monitorHistory) returnCopy(slice []float64) []float64 {
|
||||
retSlice := make([]float64, len(slice))
|
||||
mh.lock.Lock()
|
||||
for i, v := range slice {
|
||||
retSlice[i] = v
|
||||
}
|
||||
mh.lock.Unlock()
|
||||
return retSlice
|
||||
}
|
||||
|
||||
func rotateAndAccumulate(array []float64, newVal float64, acc MonitorAccumulation) float64 {
|
||||
total := float64(0.0)
|
||||
for i := len(array) - 1; i > 0; i-- {
|
||||
array[i] = array[i-1]
|
||||
total += array[i]
|
||||
}
|
||||
array[0] = newVal
|
||||
total += newVal
|
||||
if acc == Cumulative {
|
||||
return total
|
||||
}
|
||||
return total / float64(len(array))
|
||||
}
|
||||
func accumulate(array []float64, acc MonitorAccumulation) float64 {
|
||||
total := float64(0)
|
||||
for _, x := range array {
|
||||
total += x
|
||||
}
|
||||
if acc == Cumulative {
|
||||
return total
|
||||
}
|
||||
return total / float64(len(array))
|
||||
}
|
||||
|
||||
// monitorThread is the goroutine in a monitorHistory that does per minute monitoring and rotation
|
||||
func (mh *monitorHistory) monitorThread() {
|
||||
for {
|
||||
select {
|
||||
case <-time.After(time.Minute):
|
||||
mh.lock.Lock()
|
||||
|
||||
minuteAvg := rotateAndAccumulate(mh.perMinutePerHour[:], mh.monitor(), mh.monitorAccumulation)
|
||||
|
||||
if time.Now().Sub(mh.timeLastHourRotate) > time.Hour {
|
||||
rotateAndAccumulate(mh.perHourForDay[:], minuteAvg, mh.monitorAccumulation)
|
||||
mh.timeLastHourRotate = time.Now()
|
||||
}
|
||||
|
||||
if time.Now().Sub(mh.timeLastDayRotate) > time.Hour*24 {
|
||||
rotateAndAccumulate(mh.perDayForWeek[:], accumulate(mh.perHourForDay[:], mh.monitorAccumulation), mh.monitorAccumulation)
|
||||
mh.timeLastDayRotate = time.Now()
|
||||
}
|
||||
|
||||
if time.Now().Sub(mh.timeLastWeekRotate) > time.Hour*24*7 {
|
||||
rotateAndAccumulate(mh.perWeekForMonth[:], accumulate(mh.perDayForWeek[:], mh.monitorAccumulation), mh.monitorAccumulation)
|
||||
mh.timeLastWeekRotate = time.Now()
|
||||
}
|
||||
|
||||
if time.Now().Sub(mh.timeLastMonthRotate) > time.Hour*24*7*4 {
|
||||
rotateAndAccumulate(mh.perMonthForYear[:], accumulate(mh.perWeekForMonth[:], mh.monitorAccumulation), mh.monitorAccumulation)
|
||||
mh.timeLastMonthRotate = time.Now()
|
||||
}
|
||||
|
||||
mh.lock.Unlock()
|
||||
|
||||
case <-mh.breakChannel:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,37 +0,0 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestCounter(t *testing.T) {
|
||||
starttime := time.Now()
|
||||
c := NewCounter()
|
||||
|
||||
max := 100
|
||||
done := make(chan bool, max)
|
||||
|
||||
// slightly stress test atomic nature of metric by flooding with threads Add()ing
|
||||
for i := 0; i < max; i++ {
|
||||
go func() {
|
||||
c.Add(1)
|
||||
done <- true
|
||||
}()
|
||||
}
|
||||
|
||||
for i := 0; i < max; i++ {
|
||||
<-done
|
||||
}
|
||||
|
||||
val := c.Count()
|
||||
if val != 100 {
|
||||
t.Errorf("counter count was not 100")
|
||||
}
|
||||
|
||||
counterStart := c.GetStarttime()
|
||||
|
||||
if counterStart.Sub(starttime) > time.Millisecond {
|
||||
t.Errorf("counter's starttime was innaccurate %v", counterStart.Sub(starttime))
|
||||
}
|
||||
}
|
|
@ -1,119 +0,0 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/cwtch.im/tapir"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"github.com/struCoder/pidusage"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
reportFile = "serverMonitorReport.txt"
|
||||
)
|
||||
|
||||
// Monitors is a package of metrics for a Cwtch Server including message count, CPU, Mem, and conns
|
||||
type Monitors struct {
|
||||
MessageCounter Counter
|
||||
TotalMessageCounter Counter
|
||||
Messages MonitorHistory
|
||||
CPU MonitorHistory
|
||||
Memory MonitorHistory
|
||||
ClientConns MonitorHistory
|
||||
starttime time.Time
|
||||
breakChannel chan bool
|
||||
log bool
|
||||
configDir string
|
||||
}
|
||||
|
||||
// Start initializes a Monitors's monitors
|
||||
func (mp *Monitors) Start(ts tapir.Service, configDir string, log bool) {
|
||||
mp.log = log
|
||||
mp.configDir = configDir
|
||||
mp.starttime = time.Now()
|
||||
mp.breakChannel = make(chan bool)
|
||||
mp.MessageCounter = NewCounter()
|
||||
|
||||
// Maintain a count of total messages
|
||||
mp.TotalMessageCounter = NewCounter()
|
||||
mp.Messages = NewMonitorHistory(Count, Cumulative, func() (c float64) {
|
||||
c = float64(mp.MessageCounter.Count())
|
||||
mp.TotalMessageCounter.Add(int(c))
|
||||
mp.MessageCounter.Reset()
|
||||
return
|
||||
})
|
||||
|
||||
var pidUsageLock sync.Mutex
|
||||
mp.CPU = NewMonitorHistory(Percent, Average, func() float64 {
|
||||
pidUsageLock.Lock()
|
||||
defer pidUsageLock.Unlock()
|
||||
sysInfo, _ := pidusage.GetStat(os.Getpid())
|
||||
return float64(sysInfo.CPU)
|
||||
})
|
||||
mp.Memory = NewMonitorHistory(MegaBytes, Average, func() float64 {
|
||||
pidUsageLock.Lock()
|
||||
defer pidUsageLock.Unlock()
|
||||
sysInfo, _ := pidusage.GetStat(os.Getpid())
|
||||
return float64(sysInfo.Memory)
|
||||
})
|
||||
|
||||
// TODO: replace with ts.
|
||||
mp.ClientConns = NewMonitorHistory(Count, Average, func() float64 { return float64(ts.Metrics().ConnectionCount) })
|
||||
|
||||
if mp.log {
|
||||
go mp.run()
|
||||
}
|
||||
}
|
||||
|
||||
func (mp *Monitors) run() {
|
||||
for {
|
||||
select {
|
||||
case <-time.After(time.Minute):
|
||||
mp.report()
|
||||
case <-mp.breakChannel:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (mp *Monitors) report() {
|
||||
f, err := os.Create(path.Join(mp.configDir, reportFile))
|
||||
if err != nil {
|
||||
log.Errorf("Could not open monitor reporting file: %v", err)
|
||||
return
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
w := bufio.NewWriter(f)
|
||||
|
||||
fmt.Fprintf(w, "Uptime: %v\n\n", time.Now().Sub(mp.starttime))
|
||||
|
||||
fmt.Fprintln(w, "messages:")
|
||||
mp.Messages.Report(w)
|
||||
|
||||
fmt.Fprintln(w, "\nClient Connections:")
|
||||
mp.ClientConns.Report(w)
|
||||
|
||||
fmt.Fprintln(w, "\nCPU:")
|
||||
mp.CPU.Report(w)
|
||||
|
||||
fmt.Fprintln(w, "\nMemory:")
|
||||
mp.Memory.Report(w)
|
||||
|
||||
w.Flush()
|
||||
}
|
||||
|
||||
// Stop stops all the monitors in a Monitors
|
||||
func (mp *Monitors) Stop() {
|
||||
if mp.log {
|
||||
mp.breakChannel <- true
|
||||
}
|
||||
mp.Messages.Stop()
|
||||
mp.CPU.Stop()
|
||||
mp.Memory.Stop()
|
||||
mp.ClientConns.Stop()
|
||||
}
|
161
server/server.go
161
server/server.go
|
@ -1,161 +0,0 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"crypto/ed25519"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/server/metrics"
|
||||
"cwtch.im/cwtch/server/storage"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/cwtch.im/tapir"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/applications"
|
||||
tor2 "git.openprivacy.ca/cwtch.im/tapir/networks/tor"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/persistence"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/primitives"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
|
||||
"git.openprivacy.ca/openprivacy/connectivity"
|
||||
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"path"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Server encapsulates a complete, compliant Cwtch server.
|
||||
type Server struct {
|
||||
service tapir.Service
|
||||
config Config
|
||||
metricsPack metrics.Monitors
|
||||
tokenTapirService tapir.Service
|
||||
tokenServer *privacypass.TokenServer
|
||||
tokenService primitives.Identity
|
||||
tokenServicePrivKey ed25519.PrivateKey
|
||||
tokenServiceStopped bool
|
||||
onionServiceStopped bool
|
||||
running bool
|
||||
existingMessageCount int
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
// Setup initialized a server from a given configuration
|
||||
func (s *Server) Setup(serverConfig Config) {
|
||||
s.config = serverConfig
|
||||
bs := new(persistence.BoltPersistence)
|
||||
bs.Open(path.Join(serverConfig.ConfigDir, "tokens.db"))
|
||||
s.tokenServer = privacypass.NewTokenServerFromStore(&serverConfig.TokenServiceK, bs)
|
||||
log.Infof("Y: %v", s.tokenServer.Y)
|
||||
s.tokenService = s.config.TokenServiceIdentity()
|
||||
s.tokenServicePrivKey = s.config.TokenServerPrivateKey
|
||||
}
|
||||
|
||||
// Identity returns the main onion identity of the server
|
||||
func (s *Server) Identity() primitives.Identity {
|
||||
return s.config.Identity()
|
||||
}
|
||||
|
||||
// Run starts a server with the given privateKey
|
||||
func (s *Server) Run(acn connectivity.ACN) error {
|
||||
addressIdentity := tor.GetTorV3Hostname(s.config.PublicKey)
|
||||
identity := primitives.InitializeIdentity("", &s.config.PrivateKey, &s.config.PublicKey)
|
||||
var service tapir.Service
|
||||
service = new(tor2.BaseOnionService)
|
||||
service.Init(acn, s.config.PrivateKey, &identity)
|
||||
s.service = service
|
||||
log.Infof("cwtch server running on cwtch:%s\n", addressIdentity+".onion:")
|
||||
s.metricsPack.Start(service, s.config.ConfigDir, s.config.ServerReporting.LogMetricsToFile)
|
||||
|
||||
ms := new(storage.MessageStore)
|
||||
err := ms.Init(s.config.ConfigDir, s.config.MaxBufferLines, s.metricsPack.MessageCounter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Needed because we only collect metrics on a per-session basis
|
||||
// TODO fix metrics so they persist across sessions?
|
||||
s.existingMessageCount = len(ms.FetchMessages())
|
||||
|
||||
s.tokenTapirService = new(tor2.BaseOnionService)
|
||||
s.tokenTapirService.Init(acn, s.tokenServicePrivKey, &s.tokenService)
|
||||
tokenApplication := new(applications.TokenApplication)
|
||||
tokenApplication.TokenService = s.tokenServer
|
||||
powTokenApp := new(applications.ApplicationChain).
|
||||
ChainApplication(new(applications.ProofOfWorkApplication), applications.SuccessfulProofOfWorkCapability).
|
||||
ChainApplication(tokenApplication, applications.HasTokensCapability)
|
||||
go func() {
|
||||
s.tokenTapirService.Listen(powTokenApp)
|
||||
s.tokenServiceStopped = true
|
||||
}()
|
||||
go func() {
|
||||
s.service.Listen(NewTokenBoardServer(ms, s.tokenServer))
|
||||
s.onionServiceStopped = true
|
||||
}()
|
||||
|
||||
s.lock.Lock()
|
||||
s.running = true
|
||||
s.lock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// KeyBundle provides the signed keybundle of the server
|
||||
func (s *Server) KeyBundle() *model.KeyBundle {
|
||||
kb := model.NewKeyBundle()
|
||||
identity := s.config.Identity()
|
||||
kb.Keys[model.KeyTypeServerOnion] = model.Key(identity.Hostname())
|
||||
kb.Keys[model.KeyTypeTokenOnion] = model.Key(s.tokenService.Hostname())
|
||||
kb.Keys[model.KeyTypePrivacyPass] = model.Key(s.tokenServer.Y.String())
|
||||
kb.Sign(identity)
|
||||
return kb
|
||||
}
|
||||
|
||||
// CheckStatus returns true if the server is running and/or an error if any part of the server needs to be restarted.
|
||||
func (s *Server) CheckStatus() (bool, error) {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
if s.onionServiceStopped == true || s.tokenServiceStopped == true {
|
||||
return s.running, fmt.Errorf("one of more server components are down: onion:%v token service: %v", s.onionServiceStopped, s.tokenServiceStopped)
|
||||
}
|
||||
return s.running, nil
|
||||
}
|
||||
|
||||
// Shutdown kills the app closing all connections and freeing all goroutines
|
||||
func (s *Server) Shutdown() {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
s.service.Shutdown()
|
||||
s.tokenTapirService.Shutdown()
|
||||
s.metricsPack.Stop()
|
||||
s.running = true
|
||||
|
||||
}
|
||||
|
||||
// Statistics is an encapsulation of information about the server that an operator might want to know at a glance.
|
||||
type Statistics struct {
|
||||
TotalMessages int
|
||||
}
|
||||
|
||||
// GetStatistics is a stub method for providing some high level information about
|
||||
// the server operation to bundling applications (e.g. the UI)
|
||||
func (s *Server) GetStatistics() Statistics {
|
||||
// TODO Statistics from Metrics is very awkward. Metrics needs an overhaul to make safe
|
||||
total := s.existingMessageCount
|
||||
if s.metricsPack.TotalMessageCounter != nil {
|
||||
total += s.metricsPack.TotalMessageCounter.Count()
|
||||
}
|
||||
|
||||
return Statistics{
|
||||
TotalMessages: total,
|
||||
}
|
||||
}
|
||||
|
||||
// ConfigureAutostart sets whether this server should autostart (in the Cwtch UI/bundling application)
|
||||
func (s *Server) ConfigureAutostart(autostart bool) {
|
||||
s.config.AutoStart = autostart
|
||||
s.config.Save(s.config.ConfigDir, s.config.FilePath)
|
||||
}
|
||||
|
||||
// Close shuts down the cwtch server in a safe way.
|
||||
func (s *Server) Close() {
|
||||
log.Infof("Shutting down server")
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
log.Infof("Closing Token Server Database...")
|
||||
s.tokenServer.Close()
|
||||
}
|
|
@ -1,98 +0,0 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/json"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/primitives"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"github.com/gtank/ristretto255"
|
||||
"golang.org/x/crypto/ed25519"
|
||||
"io/ioutil"
|
||||
"path"
|
||||
)
|
||||
|
||||
// Reporting is a struct for storing the config a server needs to be a peer, and connect to a group to report
type Reporting struct {
	// LogMetricsToFile enables writing metrics to a local file.
	LogMetricsToFile bool `json:"logMetricsToFile"`
	// ReportingGroupID is the group to which usage reports are posted (empty disables reporting).
	ReportingGroupID string `json:"reportingGroupId"`
	// ReportingServerAddr is the onion address of the server hosting the reporting group.
	ReportingServerAddr string `json:"reportingServerAddr"`
}
|
||||
|
||||
// Config is a struct for storing basic server configuration
type Config struct {
	// ConfigDir and FilePath locate the on-disk JSON config; they are set at
	// load time and never serialized.
	ConfigDir string `json:"-"`
	FilePath  string `json:"-"`
	// MaxBufferLines is the maximum number of messages kept in the in-memory ring buffer.
	MaxBufferLines int `json:"maxBufferLines"`

	// PublicKey/PrivateKey are the server's onion service identity keys.
	PublicKey  ed25519.PublicKey  `json:"publicKey"`
	PrivateKey ed25519.PrivateKey `json:"privateKey"`

	// TokenServerPublicKey/TokenServerPrivateKey identify the (experimental) token service.
	TokenServerPublicKey  ed25519.PublicKey  `json:"tokenServerPublicKey"`
	TokenServerPrivateKey ed25519.PrivateKey `json:"tokenServerPrivateKey"`

	// TokenServiceK is the privacy-pass signing scalar for the token service.
	TokenServiceK ristretto255.Scalar `json:"tokenServiceK"`

	// ServerReporting configures optional usage reporting.
	ServerReporting Reporting `json:"serverReporting"`
	// AutoStart indicates whether a bundling application should start this server automatically.
	AutoStart bool `json:"autostart"`
}
|
||||
|
||||
// Identity returns an encapsulation of the servers keys
|
||||
func (config *Config) Identity() primitives.Identity {
|
||||
return primitives.InitializeIdentity("", &config.PrivateKey, &config.PublicKey)
|
||||
}
|
||||
|
||||
// TokenServiceIdentity returns an encapsulation of the servers token server (experimental)
|
||||
func (config *Config) TokenServiceIdentity() primitives.Identity {
|
||||
return primitives.InitializeIdentity("", &config.TokenServerPrivateKey, &config.TokenServerPublicKey)
|
||||
}
|
||||
|
||||
// Save dumps the latest version of the config to a json file given by filename
|
||||
func (config *Config) Save(dir, filename string) {
|
||||
log.Infof("Saving config to %s\n", path.Join(dir, filename))
|
||||
bytes, _ := json.MarshalIndent(config, "", "\t")
|
||||
ioutil.WriteFile(path.Join(dir, filename), bytes, 0600)
|
||||
}
|
||||
|
||||
// LoadConfig loads a Config from a json file specified by filename
// under configDir. Fresh keys and defaults are always generated first; any
// values present in an existing file then override them on unmarshal. The
// result is always re-saved so newly added fields get persisted.
func LoadConfig(configDir, filename string) Config {
	log.Infof("Loading config from %s\n", path.Join(configDir, filename))
	config := Config{}

	// Generate fresh identities up front. If the file exists these are
	// replaced below by json.Unmarshal; order matters here.
	id, pk := primitives.InitializeEphemeralIdentity()
	tid, tpk := primitives.InitializeEphemeralIdentity()
	config.PrivateKey = pk
	config.PublicKey = id.PublicKey()
	config.TokenServerPrivateKey = tpk
	config.TokenServerPublicKey = tid.PublicKey()
	config.MaxBufferLines = 100000
	config.ServerReporting = Reporting{
		LogMetricsToFile:    true,
		ReportingGroupID:    "",
		ReportingServerAddr: "",
	}
	config.AutoStart = false
	config.ConfigDir = configDir
	config.FilePath = filename

	// Generate a fresh token-service scalar from 64 uniform random bytes.
	k := new(ristretto255.Scalar)
	b := make([]byte, 64)
	_, err := rand.Read(b)
	if err != nil {
		// unable to generate secure random numbers
		panic("unable to generate secure random numbers")
	}
	k.FromUniformBytes(b)
	config.TokenServiceK = *k

	// Overlay any previously saved configuration; a missing file is not an
	// error (first run), but a malformed file is logged and ignored.
	raw, err := ioutil.ReadFile(path.Join(configDir, filename))
	if err == nil {
		err = json.Unmarshal(raw, &config)

		if err != nil {
			log.Errorf("reading config: %v", err)
		}
	}
	// Always save (first time generation, new version with new variables populated)
	config.Save(configDir, filename)
	return config
}
|
|
@ -1,118 +0,0 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/protocol/groups"
|
||||
"cwtch.im/cwtch/server/storage"
|
||||
"encoding/json"
|
||||
"git.openprivacy.ca/cwtch.im/tapir"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/applications"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
)
|
||||
|
||||
// NewTokenBoardServer generates new Server for Token Board
|
||||
func NewTokenBoardServer(store storage.MessageStoreInterface, tokenService *privacypass.TokenServer) tapir.Application {
|
||||
tba := new(TokenboardServer)
|
||||
tba.TokenService = tokenService
|
||||
tba.LegacyMessageStore = store
|
||||
return tba
|
||||
}
|
||||
|
||||
// TokenboardServer defines the token board server
type TokenboardServer struct {
	// AuthApp provides the authentication handshake; see Init.
	applications.AuthApp
	// connection is the authenticated client connection this instance serves.
	connection tapir.Connection
	// TokenService validates and spends privacy-pass tokens for posting.
	TokenService *privacypass.TokenServer
	// LegacyMessageStore is the backing message store used for posts and replays.
	LegacyMessageStore storage.MessageStoreInterface
}
|
||||
|
||||
// NewInstance creates a new TokenBoardApp
|
||||
func (ta *TokenboardServer) NewInstance() tapir.Application {
|
||||
tba := new(TokenboardServer)
|
||||
tba.TokenService = ta.TokenService
|
||||
tba.LegacyMessageStore = ta.LegacyMessageStore
|
||||
return tba
|
||||
}
|
||||
|
||||
// Init initializes the cryptographic TokenBoardApp
|
||||
func (ta *TokenboardServer) Init(connection tapir.Connection) {
|
||||
ta.AuthApp.Init(connection)
|
||||
if connection.HasCapability(applications.AuthCapability) {
|
||||
ta.connection = connection
|
||||
go ta.Listen()
|
||||
} else {
|
||||
connection.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Listen processes the messages for this application, one connection per
// goroutine. It loops reading packets until the connection closes or a
// malformed packet is received.
func (ta *TokenboardServer) Listen() {
	for {
		data := ta.connection.Expect()
		if len(data) == 0 {
			log.Debugf("Server Closing Connection")
			ta.connection.Close()
			return // connection is closed
		}

		var message groups.Message
		if err := json.Unmarshal(data, &message); err != nil {
			log.Debugf("Server Closing Connection Because of Malformed Client Packet %v", err)
			ta.connection.Close()
			return // connection is closed
		}

		switch message.MessageType {
		case groups.PostRequestMessage:
			// A post must carry a valid privacy-pass token; validation happens
			// in postMessageRequest.
			if message.PostRequest != nil {
				postrequest := *message.PostRequest
				log.Debugf("Received a Post Message Request: %v", ta.connection.Hostname())
				ta.postMessageRequest(postrequest)
			} else {
				log.Debugf("Server Closing Connection Because of PostRequestMessage Client Packet")
				ta.connection.Close()
				return // connection is closed
			}
		case groups.ReplayRequestMessage:
			if message.ReplayRequest != nil {
				log.Debugf("Received Replay Request %v", message.ReplayRequest)
				// Snapshot the store, tell the client how many messages to
				// expect, then stream them all.
				messages := ta.LegacyMessageStore.FetchMessages()
				response, _ := json.Marshal(groups.Message{MessageType: groups.ReplayResultMessage, ReplayResult: &groups.ReplayResult{NumMessages: len(messages)}})
				log.Debugf("Sending Replay Response %v", groups.ReplayResult{NumMessages: len(messages)})
				ta.connection.Send(response)
				for _, message := range messages {
					data, _ = json.Marshal(message)
					ta.connection.Send(data)
				}
				// Set sync and then send any new messages that might have happened while we were syncing
				// (only connections with the Synced capability receive broadcasts of new posts).
				ta.connection.SetCapability(groups.CwtchServerSyncedCapability)
				newMessages := ta.LegacyMessageStore.FetchMessages()
				if len(newMessages) > len(messages) {
					for _, message := range newMessages[len(messages):] {
						data, _ = json.Marshal(groups.Message{MessageType: groups.NewMessageMessage, NewMessage: &groups.NewMessage{EGM: *message}})
						ta.connection.Send(data)
					}
				}
			} else {
				log.Debugf("Server Closing Connection Because of Malformed ReplayRequestMessage Packet")
				ta.connection.Close()
				return // connection is closed
			}
		}
	}
}
|
||||
|
||||
func (ta *TokenboardServer) postMessageRequest(pr groups.PostRequest) {
|
||||
if err := ta.TokenService.SpendToken(pr.Token, append(pr.EGM.ToBytes(), ta.connection.ID().Hostname()...)); err == nil {
|
||||
log.Debugf("Token is valid")
|
||||
ta.LegacyMessageStore.AddMessage(pr.EGM)
|
||||
data, _ := json.Marshal(groups.Message{MessageType: groups.PostResultMessage, PostResult: &groups.PostResult{Success: true}})
|
||||
ta.connection.Send(data)
|
||||
data, _ = json.Marshal(groups.Message{MessageType: groups.NewMessageMessage, NewMessage: &groups.NewMessage{EGM: pr.EGM}})
|
||||
ta.connection.Broadcast(data, groups.CwtchServerSyncedCapability)
|
||||
} else {
|
||||
log.Debugf("Attempt to spend an invalid token: %v", err)
|
||||
data, _ := json.Marshal(groups.Message{MessageType: groups.PostResultMessage, PostResult: &groups.PostResult{Success: false}})
|
||||
ta.connection.Send(data)
|
||||
}
|
||||
}
|
|
@ -1,152 +0,0 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"cwtch.im/cwtch/protocol/groups"
|
||||
"cwtch.im/cwtch/server/metrics"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
	// fileStorePartitions is the number of rotating log files the store is spread across.
	fileStorePartitions = 10
	// fileStoreFilename is the base name of each partition file (suffixed ".0" through ".9").
	fileStoreFilename = "cwtch.messages"
	// directory is the subdirectory (under the app directory) that holds the message logs.
	directory = "messages"
)
|
||||
|
||||
// MessageStoreInterface defines an interface to interact with a store of cwtch messages.
type MessageStoreInterface interface {
	// AddMessage appends a message to the store.
	AddMessage(groups.EncryptedGroupMessage)
	// FetchMessages returns the currently buffered messages, oldest first.
	FetchMessages() []*groups.EncryptedGroupMessage
}
|
||||
|
||||
// MessageStore is a file-backed implementation of MessageStoreInterface.
// Messages live in a fixed-size in-memory ring buffer and are appended to a
// set of rotating log files for persistence across restarts.
type MessageStore struct {
	// activeLogFile is partition 0, the file currently being appended to.
	activeLogFile *os.File
	// filePos counts lines written to the active log file; triggers rotation.
	filePos int
	// storeDirectory is the on-disk directory holding the partition files.
	storeDirectory string
	// lock guards messages, bufferPos, bufferRotated and file state.
	lock sync.Mutex
	// messages is the ring buffer of the most recent maxBufferLines messages.
	messages []*groups.EncryptedGroupMessage
	// messageCounter tracks the number of messages added (metrics).
	messageCounter metrics.Counter
	// maxBufferLines is the capacity of the ring buffer.
	maxBufferLines int
	// bufferPos is the next write index into messages.
	bufferPos int
	// bufferRotated records whether the ring buffer has wrapped at least once.
	bufferRotated bool
}
|
||||
|
||||
// Close closes the message store and underlying resources.
|
||||
func (ms *MessageStore) Close() {
|
||||
ms.lock.Lock()
|
||||
ms.messages = nil
|
||||
ms.activeLogFile.Close()
|
||||
ms.lock.Unlock()
|
||||
}
|
||||
|
||||
func (ms *MessageStore) updateBuffer(gm *groups.EncryptedGroupMessage) {
|
||||
ms.messages[ms.bufferPos] = gm
|
||||
ms.bufferPos++
|
||||
if ms.bufferPos == ms.maxBufferLines {
|
||||
ms.bufferPos = 0
|
||||
ms.bufferRotated = true
|
||||
}
|
||||
}
|
||||
|
||||
func (ms *MessageStore) initAndLoadFiles() error {
|
||||
ms.activeLogFile = nil
|
||||
for i := fileStorePartitions - 1; i >= 0; i-- {
|
||||
ms.filePos = 0
|
||||
filename := path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, i))
|
||||
f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600)
|
||||
if err != nil {
|
||||
log.Errorf("MessageStore could not open: %v: %v", filename, err)
|
||||
continue
|
||||
}
|
||||
ms.activeLogFile = f
|
||||
|
||||
scanner := bufio.NewScanner(f)
|
||||
for scanner.Scan() {
|
||||
gms := scanner.Text()
|
||||
ms.filePos++
|
||||
gm := &groups.EncryptedGroupMessage{}
|
||||
err := json.Unmarshal([]byte(gms), gm)
|
||||
if err == nil {
|
||||
ms.updateBuffer(gm)
|
||||
}
|
||||
}
|
||||
}
|
||||
if ms.activeLogFile == nil {
|
||||
return fmt.Errorf("Could not create log file to write to in %s", ms.storeDirectory)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ms *MessageStore) updateFile(gm *groups.EncryptedGroupMessage) {
|
||||
s, err := json.Marshal(gm)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to unmarshal group message %v\n", err)
|
||||
}
|
||||
fmt.Fprintf(ms.activeLogFile, "%s\n", s)
|
||||
ms.filePos++
|
||||
if ms.filePos >= ms.maxBufferLines/fileStorePartitions {
|
||||
ms.rotateFileStore()
|
||||
}
|
||||
}
|
||||
|
||||
func (ms *MessageStore) rotateFileStore() {
|
||||
ms.activeLogFile.Close()
|
||||
os.Remove(path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, fileStorePartitions-1)))
|
||||
|
||||
for i := fileStorePartitions - 2; i >= 0; i-- {
|
||||
os.Rename(path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, i)), path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, i+1)))
|
||||
}
|
||||
|
||||
f, err := os.OpenFile(path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, 0)), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600)
|
||||
if err != nil {
|
||||
log.Errorf("Could not open new message store file in: %s", ms.storeDirectory)
|
||||
}
|
||||
ms.filePos = 0
|
||||
ms.activeLogFile = f
|
||||
}
|
||||
|
||||
// Init sets up a MessageStore of size maxBufferLines (# of messages) backed by filename
|
||||
func (ms *MessageStore) Init(appDirectory string, maxBufferLines int, messageCounter metrics.Counter) error {
|
||||
ms.storeDirectory = path.Join(appDirectory, directory)
|
||||
os.Mkdir(ms.storeDirectory, 0700)
|
||||
|
||||
ms.bufferPos = 0
|
||||
ms.maxBufferLines = maxBufferLines
|
||||
ms.messages = make([]*groups.EncryptedGroupMessage, maxBufferLines)
|
||||
ms.bufferRotated = false
|
||||
ms.messageCounter = messageCounter
|
||||
|
||||
err := ms.initAndLoadFiles()
|
||||
return err
|
||||
}
|
||||
|
||||
// FetchMessages returns all messages from the backing file.
|
||||
func (ms *MessageStore) FetchMessages() (messages []*groups.EncryptedGroupMessage) {
|
||||
ms.lock.Lock()
|
||||
if !ms.bufferRotated {
|
||||
messages = make([]*groups.EncryptedGroupMessage, ms.bufferPos)
|
||||
copy(messages, ms.messages[0:ms.bufferPos])
|
||||
} else {
|
||||
messages = make([]*groups.EncryptedGroupMessage, ms.maxBufferLines)
|
||||
copy(messages, ms.messages[ms.bufferPos:ms.maxBufferLines])
|
||||
copy(messages[ms.bufferPos:], ms.messages[0:ms.bufferPos])
|
||||
}
|
||||
ms.lock.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// AddMessage adds a GroupMessage to the store
|
||||
func (ms *MessageStore) AddMessage(gm groups.EncryptedGroupMessage) {
|
||||
ms.messageCounter.Add(1)
|
||||
ms.lock.Lock()
|
||||
ms.updateBuffer(&gm)
|
||||
ms.updateFile(&gm)
|
||||
|
||||
ms.lock.Unlock()
|
||||
}
|
|
@ -1,54 +0,0 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/protocol/groups"
|
||||
"cwtch.im/cwtch/server/metrics"
|
||||
"os"
|
||||
"strconv"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestMessageStore exercises the file-backed MessageStore: writing, counting,
// reload-from-disk, and ring-buffer wrap behavior.
func TestMessageStore(t *testing.T) {
	// NOTE(review): "ms.test" does not appear to be created anywhere in this
	// test; this looks like a leftover from an older single-file store — confirm.
	os.Remove("ms.test")
	ms := new(MessageStore)
	counter := metrics.NewCounter()
	ms.Init("./", 1000, counter)
	// Write 499 messages (under half the 1000-line buffer).
	for i := 0; i < 499; i++ {
		gm := groups.EncryptedGroupMessage{
			Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)),
		}
		ms.AddMessage(gm)
	}
	if counter.Count() != 499 {
		t.Errorf("Counter should be at 499 was %v", counter.Count())
	}
	// Reload from disk: all 499 messages should come back.
	ms.Close()
	ms.Init("./", 1000, counter)
	m := ms.FetchMessages()
	if len(m) != 499 {
		t.Errorf("Should have been 499 was %v", len(m))
	}

	counter.Reset()

	// Push 1000 more messages so the ring buffer wraps past its capacity.
	for i := 0; i < 1000; i++ {
		gm := groups.EncryptedGroupMessage{
			Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)),
		}
		ms.AddMessage(gm)
	}

	m = ms.FetchMessages()
	if len(m) != 1000 {
		t.Errorf("Should have been 1000 was %v", len(m))
	}
	// NOTE(review): after a reload only 999 messages are expected — presumably
	// one message is lost at a partition-rotation boundary; confirm this is
	// intended rotation semantics rather than a store bug.
	ms.Close()
	ms.Init("./", 1000, counter)
	m = ms.FetchMessages()
	if len(m) != 999 {
		t.Errorf("Should have been 999 was %v", len(m))
	}
	ms.Close()

	os.RemoveAll("./messages")
}
|
|
@ -10,13 +10,10 @@ import (
|
|||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/peer"
|
||||
"cwtch.im/cwtch/protocol/connections"
|
||||
cwtchserver "cwtch.im/cwtch/server"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"golang.org/x/net/proxy"
|
||||
mrand "math/rand"
|
||||
"os"
|
||||
"os/user"
|
||||
|
@ -28,11 +25,6 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
serverKeyfile = "./../server/app/private_key"
|
||||
localKeyfile = "./private_key"
|
||||
)
|
||||
|
||||
var (
|
||||
aliceLines = []string{"Hello, I'm Alice", "bye"}
|
||||
bobLines = []string{"Hi, my name is Bob.", "toodles", "welcome"}
|
||||
|
@ -48,24 +40,6 @@ func printAndCountVerifedTimeline(t *testing.T, timeline []model.Message) int {
|
|||
return numVerified
|
||||
}
|
||||
|
||||
func serverCheck(t *testing.T, serverAddr string) bool {
|
||||
torDialer, err := proxy.SOCKS5("tcp", "127.0.0.1:9050", nil, proxy.Direct)
|
||||
if err != nil {
|
||||
t.Logf("Could not get SOCKS5 proxy: %v", err)
|
||||
return false
|
||||
}
|
||||
|
||||
// Doesn't seem to be a way to turn the default timeout of 2 minutes down
|
||||
conn, err := torDialer.Dial("tcp", serverAddr+".onion:9878")
|
||||
if err != nil {
|
||||
t.Logf("Could not dial %v: %v", serverAddr, err)
|
||||
return false
|
||||
}
|
||||
|
||||
conn.Close()
|
||||
return true
|
||||
}
|
||||
|
||||
func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, groupID string) {
|
||||
peerName, _ := peer.GetAttribute(attr.GetLocalScope("name"))
|
||||
for {
|
||||
|
@ -148,32 +122,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
t.Logf("Tor pid: %v", pid)
|
||||
|
||||
// ***** Cwtch Server management *****
|
||||
var server *cwtchserver.Server
|
||||
|
||||
serverOnline := false
|
||||
var serverAddr string
|
||||
var serverKeyBundle []byte
|
||||
if !serverOnline {
|
||||
// launch app with new key
|
||||
fmt.Println("No server found!")
|
||||
server = new(cwtchserver.Server)
|
||||
fmt.Println("Starting cwtch server...")
|
||||
os.Remove("server-test.json")
|
||||
config := cwtchserver.LoadConfig(".", "server-test.json")
|
||||
identity := config.Identity()
|
||||
serverAddr = identity.Hostname()
|
||||
server.Setup(config)
|
||||
serverKeyBundle, _ = json.Marshal(server.KeyBundle())
|
||||
log.Debugf("server key bundle %s", serverKeyBundle)
|
||||
go server.Run(acn)
|
||||
|
||||
// let tor get established
|
||||
fmt.Printf("Establishing Tor hidden service: %v...\n", serverAddr)
|
||||
} else {
|
||||
fmt.Printf("Found existing cwtch server %v, using for tests...\n", serverAddr)
|
||||
}
|
||||
|
||||
numGoRoutinesPostServer := runtime.NumGoroutine()
|
||||
const ServerKeyBundleBase64 = "eyJLZXlzIjp7ImJ1bGxldGluX2JvYXJkX29uaW9uIjoibmZoeHp2enhpbnJpcGdkaDR0Mm00eGN5M2NyZjZwNGNiaGVjdGdja3VqM2lkc2pzYW90Z293YWQiLCJwcml2YWN5X3Bhc3NfcHVibGljX2tleSI6IjVwd2hQRGJ0c0EvdFI3ZHlUVUkzakpZZnM1L3Jaai9iQ1ZWZEpTc0Jtbk09IiwidG9rZW5fc2VydmljZV9vbmlvbiI6ImVvd25mcTRsNTZxMmU0NWs0bW03MjdsanJod3Z0aDZ5ZWN0dWV1bXB4emJ5cWxnbXVhZm1qdXFkIn0sIlNpZ25hdHVyZSI6IlY5R3NPMHNZWFJ1bGZxdzdmbGdtclVxSTBXS0JlSFIzNjIvR3hGbWZPekpEZjJaRks2ck9jNVRRR1ZxVWIrbXIwV2xId0pwdXh0UW1JRU9KNkplYkNRPT0ifQ=="
|
||||
const ServerAddr = "nfhxzvzxinripgdh4t2m4xcy3crf6p4cbhectgckuj3idsjsaotgowad"
|
||||
serverKeyBundle, _ := base64.StdEncoding.DecodeString(ServerKeyBundleBase64)
|
||||
|
||||
app := app2.NewApp(acn, "./storage")
|
||||
|
||||
|
@ -230,7 +182,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
if err := alice.AddServer(string(serverKeyBundle)); err != nil {
|
||||
t.Fatalf("Failed to Add Server Bundle %v", err)
|
||||
}
|
||||
alice.JoinServer(serverAddr)
|
||||
alice.JoinServer(ServerAddr)
|
||||
|
||||
fmt.Println("Alice peering with Bob...")
|
||||
alice.PeerWithOnion(bob.GetOnion())
|
||||
|
@ -238,8 +190,8 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
fmt.Println("Alice peering with Carol...")
|
||||
alice.PeerWithOnion(carol.GetOnion())
|
||||
|
||||
fmt.Println("Creating group on ", serverAddr, "...")
|
||||
groupID, _, err := alice.StartGroup(serverAddr)
|
||||
fmt.Println("Creating group on ", ServerAddr, "...")
|
||||
groupID, _, err := alice.StartGroup(ServerAddr)
|
||||
fmt.Printf("Created group: %v!\n", groupID)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to init group: %v", err)
|
||||
|
@ -375,7 +327,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
numGoRoutinesPostAlice := runtime.NumGoroutine()
|
||||
|
||||
fmt.Println("Carol joining server...")
|
||||
carol.JoinServer(serverAddr)
|
||||
carol.JoinServer(ServerAddr)
|
||||
waitForPeerGroupConnection(t, carol, groupID)
|
||||
numGoRotinesPostCarolConnect := runtime.NumGoroutine()
|
||||
|
||||
|
@ -463,12 +415,6 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
app.ShutdownPeer(bob.GetOnion())
|
||||
time.Sleep(time.Second * 3)
|
||||
numGoRoutinesPostBob := runtime.NumGoroutine()
|
||||
if server != nil {
|
||||
fmt.Println("Shutting down server...")
|
||||
server.Shutdown()
|
||||
time.Sleep(time.Second * 3)
|
||||
}
|
||||
numGoRoutinesPostServerShutdown := runtime.NumGoroutine()
|
||||
|
||||
fmt.Println("Shutting down Carol...")
|
||||
appClient.ShutdownPeer(carol.GetOnion())
|
||||
|
@ -504,10 +450,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
// Very useful if we are leaking any.
|
||||
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
|
||||
fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostServer: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+
|
||||
"numGoRoutinesPostAlice: %v\nnumGoRotinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostServerShutdown: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v\nnumGoRoutinesPostACN: %v\n",
|
||||
numGoRoutinesStart, numGoRoutinesPostServer, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect,
|
||||
numGoRoutinesPostAlice, numGoRotinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostServerShutdown, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown, numGoRoutinesPostACN)
|
||||
fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+
|
||||
"numGoRoutinesPostAlice: %v\nnumGoRotinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v\nnumGoRoutinesPostACN: %v\n",
|
||||
numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect,
|
||||
numGoRoutinesPostAlice, numGoRotinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown, numGoRoutinesPostACN)
|
||||
|
||||
if numGoRoutinesStart != numGoRoutinesPostACN {
|
||||
t.Errorf("Number of GoRoutines at start (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, leak detected!", numGoRoutinesStart, numGoRoutinesPostACN)
|
||||
|
|
|
@ -10,7 +10,6 @@ go test -race ${1} -coverprofile=storage.v1.cover.out -v ./storage/v1
|
|||
go test -race ${1} -coverprofile=storage.cover.out -v ./storage
|
||||
go test -race ${1} -coverprofile=peer.connections.cover.out -v ./protocol/connections
|
||||
go test -race ${1} -coverprofile=peer.cover.out -v ./peer
|
||||
go test -race ${1} -coverprofile=server.metrics.cover.out -v ./server/metrics
|
||||
echo "mode: set" > coverage.out && cat *.cover.out | grep -v mode: | sort -r | \
|
||||
awk '{if($1 != last) {print $0;last=$1}}' >> coverage.out
|
||||
rm -rf *.cover.out
|
||||
|
|
Loading…
Reference in New Issue