add support for server metrics; delete eventhandler dup fn for push(); make server response to settings change stateful

This commit is contained in:
Dan Ballard 2021-11-25 17:59:00 -08:00
parent 37eb9f681a
commit 69df789355
4 changed files with 104 additions and 37 deletions

View File

@ -10,15 +10,6 @@ import (
"git.openprivacy.ca/cwtch.im/libcwtch-go/constants"
"git.openprivacy.ca/cwtch.im/libcwtch-go/features/servers"
)
<<<<<<< HEAD
"git.openprivacy.ca/cwtch.im/libcwtch-go/features/servers"
=======
"git.openprivacy.ca/cwtch.im/libcwtch-go/features"
"git.openprivacy.ca/cwtch.im/libcwtch-go/features/servers"
"git.openprivacy.ca/openprivacy/log"
"strings"
>>>>>>> minor support to manage profile servers
)
const groupExperiment = "tapir-groups-experiment"

View File

@ -4,13 +4,14 @@ import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model/attr"
"fmt"
"git.openprivacy.ca/cwtch.im/libcwtch-go/constants"
"git.openprivacy.ca/cwtch.im/server"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/log"
"os"
"path/filepath"
"strconv"
"sync"
"time"
)
const serversExperiment = "servers-experiment"
@ -20,10 +21,13 @@ const (
NewServer = event.Type("NewServer")
ServerIntentUpdate = event.Type("ServerIntentUpdate")
ServerDeleted = event.Type("ServerDeleted")
ServerStatsUpdate = event.Type("ServerStatsUpdate")
)
const (
Intent = event.Field("Intent")
Intent = event.Field("Intent")
TotalMessages = event.Field("TotalMessages")
Connections = event.Field("Connections")
)
const (
@ -43,10 +47,15 @@ type ServerInfo struct {
StorageType string
}
type PublishFn func(event.Event)
var lock sync.Mutex
var appServers server.Servers
var publishFn PublishFn
var killStatsUpdate chan bool = make(chan bool, 1)
var enabled bool = false
func InitServers(acn connectivity.ACN, appdir string) {
func InitServers(acn connectivity.ACN, appdir string, pfn PublishFn) {
lock.Lock()
defer lock.Unlock()
if appServers == nil {
@ -56,16 +65,26 @@ func InitServers(acn connectivity.ACN, appdir string) {
log.Errorf("Could not init servers directory: %s", err)
}
appServers = server.NewServers(acn, serversDir)
appServers.LoadServers(constants.DefactoPasswordForUnencryptedProfiles)
publishFn = pfn
}
}
func DeactivateServers() {
func Disable() {
lock.Lock()
defer lock.Unlock()
if appServers != nil {
appServers.Stop()
}
if enabled {
enabled = false
killStatsUpdate <- true
}
}
func Enabled() bool {
lock.Lock()
defer lock.Unlock()
return enabled
}
// ServersFunctionality provides experiment gated server functionality
@ -82,8 +101,23 @@ func ExperimentGate(experimentMap map[string]bool) (*ServersFunctionality, error
return nil, fmt.Errorf("gated by %v", serversExperiment)
}
func (sf *ServersFunctionality) Enable() {
lock.Lock()
defer lock.Unlock()
if appServers != nil && !enabled {
enabled = true
go cacheForwardServerMetricUpdates()
}
}
func (sf *ServersFunctionality) LoadServers(password string) ([]string, error) {
return appServers.LoadServers(password)
servers, err := appServers.LoadServers(password)
// server:1.3/libcwtch-go:1.4 accidentely enabled monitor logging by default. make sure it's turned off
for _, onion := range servers {
server := appServers.GetServer(onion)
server.SetMonitorLogging(false)
}
return servers, err
}
func (sf *ServersFunctionality) CreateServer(password string) (server.Server, error) {
@ -94,6 +128,14 @@ func (sf *ServersFunctionality) GetServer(onion string) server.Server {
return appServers.GetServer(onion)
}
func (sf *ServersFunctionality) GetServerStatistics(onion string) server.Statistics {
s := appServers.GetServer(onion)
if s != nil {
return s.GetStatistics()
}
return server.Statistics{}
}
func (sf *ServersFunctionality) ListServers() []string {
return appServers.ListServers()
}
@ -143,3 +185,31 @@ func (si *ServerInfo) EnrichEvent(e *event.Event) {
e.Data["Running"] = "false"
}
}
// cacheForwardServerMetricUpdates every minute gets metrics for all servers, and if they have changed, sends events to the UI
func cacheForwardServerMetricUpdates() {
var cache map[string]server.Statistics = make(map[string]server.Statistics)
duration := time.Second // allow first load
for {
select {
case <-time.After(duration):
duration = time.Minute
serverList := appServers.ListServers()
for _, serverOnion := range serverList {
server := appServers.GetServer(serverOnion)
newStats := server.GetStatistics()
if stats, ok := cache[serverOnion]; !ok || stats.TotalConnections != newStats.TotalConnections || stats.TotalMessages != newStats.TotalMessages {
cache[serverOnion] = newStats
publishFn(event.NewEventList(ServerStatsUpdate, event.Identity, serverOnion, TotalMessages, strconv.Itoa(newStats.TotalMessages), Connections, strconv.Itoa(newStats.TotalConnections)))
}
}
case <-killStatsUpdate:
return
}
}
}
func Shutdown() {
Disable()
appServers.Destroy()
}

40
lib.go
View File

@ -93,7 +93,6 @@ func StartCwtch(appDir string, torPath string) int {
log.SetUseColor(false)
}
log.SetLevel(log.LevelInfo)
log.AddEverythingFromPattern("libcwtch-go")
if logLevel := os.Getenv("LOG_LEVEL"); strings.ToLower(logLevel) == "debug" {
log.SetLevel(log.LevelDebug)
}
@ -143,7 +142,7 @@ func _startCwtch(appDir string, torPath string) {
err := os.MkdirAll(appDir, 0700)
if err != nil {
log.Errorf("Error creating appDir %v: %v\n", appDir, err)
eventHandler.PublishAppEvent(event.NewEventList(utils.CwtchStartError, event.Error, fmt.Sprintf("Error creating appDir %v: %v", appDir, err)))
eventHandler.Push(event.NewEventList(utils.CwtchStartError, event.Error, fmt.Sprintf("Error creating appDir %v: %v", appDir, err)))
return
}
@ -170,7 +169,7 @@ func _startCwtch(appDir string, torPath string) {
if err != nil {
log.Errorf("error creating tor data directory: %v. Aborting app start up", err)
eventHandler.PublishAppEvent(event.NewEventList(utils.CwtchStartError, event.Error, fmt.Sprintf("Error connecting to Tor: %v", err)))
eventHandler.Push(event.NewEventList(utils.CwtchStartError, event.Error, fmt.Sprintf("Error connecting to Tor: %v", err)))
return
}
@ -178,22 +177,22 @@ func _startCwtch(appDir string, torPath string) {
if err != nil {
log.Errorf("error constructing torrc: %v", err)
eventHandler.PublishAppEvent(event.NewEventList(utils.CwtchStartError, event.Error, fmt.Sprintf("Error connecting to Tor: %v", err)))
eventHandler.Push(event.NewEventList(utils.CwtchStartError, event.Error, fmt.Sprintf("Error connecting to Tor: %v", err)))
return
}
acn, err := tor.NewTorACNWithAuth(appDir, torPath, controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)})
if err != nil {
log.Errorf("Error connecting to Tor replacing with ErrorACN: %v\n", err)
eventHandler.PublishAppEvent(event.NewEventList(utils.CwtchStartError, event.Error, fmt.Sprintf("Error connecting to Tor: %v", err)))
eventHandler.Push(event.NewEventList(utils.CwtchStartError, event.Error, fmt.Sprintf("Error connecting to Tor: %v", err)))
return
}
globalAppDir = appDir
globalACN = acn
newApp := app.NewApp(acn, appDir)
servers.InitServers(acn, appDir)
application = app.NewApp(acn, appDir)
servers.InitServers(acn, appDir, eventHandler.Push)
eventHandler.HandleApp(newApp)
eventHandler.HandleApp(application)
peer.DefaultEventsToHandle = []event.Type{
event.EncryptedGroupMessage,
@ -215,8 +214,12 @@ func _startCwtch(appDir string, torPath string) {
settings := utils.ReadGlobalSettings()
settingsJson, _ := json.Marshal(settings)
newApp.LoadProfiles(constants.DefactoPasswordForUnencryptedProfiles)
application = newApp
application.LoadProfiles(constants.DefactoPasswordForUnencryptedProfiles)
LoadServers(constants.DefactoPasswordForUnencryptedProfiles)
serversHandler, err := servers.ExperimentGate(utils.ReadGlobalSettings().Experiments)
if err == nil {
serversHandler.Enable()
}
publishLoadedServers()
LaunchServers()
@ -272,6 +275,7 @@ func ReconnectCwtchForeground() {
}
}
LoadServers(constants.DefactoPasswordForUnencryptedProfiles)
publishLoadedServers()
settingsJson, _ := json.Marshal(settings)
@ -323,14 +327,17 @@ func SendAppEvent(eventJson string) {
settings := utils.ReadGlobalSettings()
// TODO: Should track state an only launch servers if server experiment re-enabled, not every save of settings
_, err = servers.ExperimentGate(settings.Experiments)
sh, err := servers.ExperimentGate(settings.Experiments)
if err == nil {
servers.InitServers(globalACN, globalAppDir)
publishLoadedServers()
LaunchServers()
servers.InitServers(globalACN, globalAppDir, eventHandler.Push)
LoadServers(constants.DefactoPasswordForUnencryptedProfiles)
if !servers.Enabled() {
sh.Enable()
publishLoadedServers()
LaunchServers()
}
} else {
servers.DeactivateServers()
servers.Disable()
}
// Group Experiment Refresh
@ -884,6 +891,7 @@ func ShutdownCwtch() {
// Kill the isolate
eventHandler.Push(event.NewEvent(event.Shutdown, map[event.Field]string{}))
servers.Shutdown()
// Allow for the shutdown events to go through and then purge everything else...
log.Infof("Shutting Down Application...")
application.Shutdown()

View File

@ -33,12 +33,6 @@ func NewEventHandler() *EventHandler {
return eh
}
// PublishAppEvent is a way for libCwtch-go to publish an event for consumption by a UI before a Cwtch app has been initialized
// Main use: to signal an error before a cwtch app could be created
func (eh *EventHandler) PublishAppEvent(event event.Event) {
eh.appBusQueue.Publish(event)
}
func (eh *EventHandler) HandleApp(application app.Application) {
eh.app = application
application.GetPrimaryBus().Subscribe(event.NewPeer, eh.appBusQueue)
@ -54,6 +48,7 @@ func (eh *EventHandler) HandleApp(application app.Application) {
application.GetPrimaryBus().Subscribe(servers.NewServer, eh.appBusQueue)
application.GetPrimaryBus().Subscribe(servers.ServerIntentUpdate, eh.appBusQueue)
application.GetPrimaryBus().Subscribe(servers.ServerDeleted, eh.appBusQueue)
application.GetPrimaryBus().Subscribe(servers.ServerStatsUpdate, eh.appBusQueue)
}
func (eh *EventHandler) GetNextEvent() string {
@ -447,6 +442,9 @@ func (eh *EventHandler) forwardProfileMessages(onion string, q event.Queue) {
}
}
// Push pushes an event onto the app event bus
// It is also a way for libCwtch-go to publish an event for consumption by a UI before a Cwtch app has been initialized
// use: to signal an error before a cwtch app could be created
func (eh *EventHandler) Push(newEvent event.Event) {
eh.appBusQueue.Publish(newEvent)
}