diff --git a/features/groups/group_functionality.go b/features/groups/group_functionality.go index d4e3089..77be796 100644 --- a/features/groups/group_functionality.go +++ b/features/groups/group_functionality.go @@ -10,15 +10,6 @@ import ( "git.openprivacy.ca/cwtch.im/libcwtch-go/constants" "git.openprivacy.ca/cwtch.im/libcwtch-go/features/servers" ) -<<<<<<< HEAD -"git.openprivacy.ca/cwtch.im/libcwtch-go/features/servers" -======= -"git.openprivacy.ca/cwtch.im/libcwtch-go/features" -"git.openprivacy.ca/cwtch.im/libcwtch-go/features/servers" -"git.openprivacy.ca/openprivacy/log" -"strings" ->>>>>>> minor support to manage profile servers -) const groupExperiment = "tapir-groups-experiment" diff --git a/features/servers/servers_functionality.go b/features/servers/servers_functionality.go index a18a1d4..f15d361 100644 --- a/features/servers/servers_functionality.go +++ b/features/servers/servers_functionality.go @@ -4,13 +4,14 @@ import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/model/attr" "fmt" - "git.openprivacy.ca/cwtch.im/libcwtch-go/constants" "git.openprivacy.ca/cwtch.im/server" "git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/log" "os" "path/filepath" + "strconv" "sync" + "time" ) const serversExperiment = "servers-experiment" @@ -20,10 +21,13 @@ const ( NewServer = event.Type("NewServer") ServerIntentUpdate = event.Type("ServerIntentUpdate") ServerDeleted = event.Type("ServerDeleted") + ServerStatsUpdate = event.Type("ServerStatsUpdate") ) const ( - Intent = event.Field("Intent") + Intent = event.Field("Intent") + TotalMessages = event.Field("TotalMessages") + Connections = event.Field("Connections") ) const ( @@ -43,10 +47,15 @@ type ServerInfo struct { StorageType string } +type PublishFn func(event.Event) + var lock sync.Mutex var appServers server.Servers +var publishFn PublishFn +var killStatsUpdate chan bool = make(chan bool, 1) +var enabled bool = false -func InitServers(acn connectivity.ACN, appdir string) { +func InitServers(acn connectivity.ACN, appdir string, pfn PublishFn) { lock.Lock() defer lock.Unlock() if appServers == nil { @@ -56,16 +65,26 @@ func InitServers(acn connectivity.ACN, appdir string) { log.Errorf("Could not init servers directory: %s", err) } appServers = server.NewServers(acn, serversDir) - appServers.LoadServers(constants.DefactoPasswordForUnencryptedProfiles) + publishFn = pfn } } -func DeactivateServers() { +func Disable() { lock.Lock() defer lock.Unlock() if appServers != nil { appServers.Stop() } + if enabled { + enabled = false + killStatsUpdate <- true + } +} + +func Enabled() bool { + lock.Lock() + defer lock.Unlock() + return enabled } // ServersFunctionality provides experiment gated server functionality @@ -82,8 +101,23 @@ func ExperimentGate(experimentMap map[string]bool) (*ServersFunctionality, error return nil, fmt.Errorf("gated by %v", serversExperiment) } +func (sf *ServersFunctionality) Enable() { + lock.Lock() + defer lock.Unlock() + if appServers != nil && !enabled { + enabled = true + go cacheForwardServerMetricUpdates() + } +} + func (sf *ServersFunctionality) LoadServers(password string) ([]string, error) { - return appServers.LoadServers(password) + servers, err := appServers.LoadServers(password) + // server:1.3/libcwtch-go:1.4 accidentely enabled monitor logging by default. make sure it's turned off + for _, onion := range servers { + server := appServers.GetServer(onion) + server.SetMonitorLogging(false) + } + return servers, err } func (sf *ServersFunctionality) CreateServer(password string) (server.Server, error) { @@ -94,6 +128,14 @@ func (sf *ServersFunctionality) GetServer(onion string) server.Server { return appServers.GetServer(onion) } +func (sf *ServersFunctionality) GetServerStatistics(onion string) server.Statistics { + s := appServers.GetServer(onion) + if s != nil { + return s.GetStatistics() + } + return server.Statistics{} +} + func (sf *ServersFunctionality) ListServers() []string { return appServers.ListServers() } @@ -143,3 +185,31 @@ func (si *ServerInfo) EnrichEvent(e *event.Event) { e.Data["Running"] = "false" } } + +// cacheForwardServerMetricUpdates every minute gets metrics for all servers, and if they have changed, sends events to the UI +func cacheForwardServerMetricUpdates() { + var cache map[string]server.Statistics = make(map[string]server.Statistics) + duration := time.Second // allow first load + for { + select { + case <-time.After(duration): + duration = time.Minute + serverList := appServers.ListServers() + for _, serverOnion := range serverList { + server := appServers.GetServer(serverOnion) + newStats := server.GetStatistics() + if stats, ok := cache[serverOnion]; !ok || stats.TotalConnections != newStats.TotalConnections || stats.TotalMessages != newStats.TotalMessages { + cache[serverOnion] = newStats + publishFn(event.NewEventList(ServerStatsUpdate, event.Identity, serverOnion, TotalMessages, strconv.Itoa(newStats.TotalMessages), Connections, strconv.Itoa(newStats.TotalConnections))) + } + } + case <-killStatsUpdate: + return + } + } +} + +func Shutdown() { + Disable() + appServers.Destroy() +} diff --git a/lib.go b/lib.go index 71fd9b1..3eb5a77 100644 --- a/lib.go +++ b/lib.go @@ -93,7 +93,6 @@ func StartCwtch(appDir string, torPath string) int { log.SetUseColor(false) } log.SetLevel(log.LevelInfo) - log.AddEverythingFromPattern("libcwtch-go") if logLevel := os.Getenv("LOG_LEVEL"); strings.ToLower(logLevel) == "debug" { log.SetLevel(log.LevelDebug) } @@ -143,7 +142,7 @@ func _startCwtch(appDir string, torPath string) { err := os.MkdirAll(appDir, 0700) if err != nil { log.Errorf("Error creating appDir %v: %v\n", appDir, err) - eventHandler.PublishAppEvent(event.NewEventList(utils.CwtchStartError, event.Error, fmt.Sprintf("Error creating appDir %v: %v", appDir, err))) + eventHandler.Push(event.NewEventList(utils.CwtchStartError, event.Error, fmt.Sprintf("Error creating appDir %v: %v", appDir, err))) return } @@ -170,7 +169,7 @@ func _startCwtch(appDir string, torPath string) { if err != nil { log.Errorf("error creating tor data directory: %v. Aborting app start up", err) - eventHandler.PublishAppEvent(event.NewEventList(utils.CwtchStartError, event.Error, fmt.Sprintf("Error connecting to Tor: %v", err))) + eventHandler.Push(event.NewEventList(utils.CwtchStartError, event.Error, fmt.Sprintf("Error connecting to Tor: %v", err))) return } @@ -178,22 +177,22 @@ func _startCwtch(appDir string, torPath string) { if err != nil { log.Errorf("error constructing torrc: %v", err) - eventHandler.PublishAppEvent(event.NewEventList(utils.CwtchStartError, event.Error, fmt.Sprintf("Error connecting to Tor: %v", err))) + eventHandler.Push(event.NewEventList(utils.CwtchStartError, event.Error, fmt.Sprintf("Error connecting to Tor: %v", err))) return } acn, err := tor.NewTorACNWithAuth(appDir, torPath, controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)}) if err != nil { log.Errorf("Error connecting to Tor replacing with ErrorACN: %v\n", err) - eventHandler.PublishAppEvent(event.NewEventList(utils.CwtchStartError, event.Error, fmt.Sprintf("Error connecting to Tor: %v", err))) + eventHandler.Push(event.NewEventList(utils.CwtchStartError, event.Error, fmt.Sprintf("Error connecting to Tor: %v", err))) return } globalAppDir = appDir globalACN = acn - newApp := app.NewApp(acn, appDir) - servers.InitServers(acn, appDir) + application = app.NewApp(acn, appDir) + servers.InitServers(acn, appDir, eventHandler.Push) - eventHandler.HandleApp(newApp) + eventHandler.HandleApp(application) peer.DefaultEventsToHandle = []event.Type{ event.EncryptedGroupMessage, @@ -215,8 +214,12 @@ func _startCwtch(appDir string, torPath string) { settings := utils.ReadGlobalSettings() settingsJson, _ := json.Marshal(settings) - newApp.LoadProfiles(constants.DefactoPasswordForUnencryptedProfiles) - application = newApp + application.LoadProfiles(constants.DefactoPasswordForUnencryptedProfiles) + LoadServers(constants.DefactoPasswordForUnencryptedProfiles) + serversHandler, err := servers.ExperimentGate(utils.ReadGlobalSettings().Experiments) + if err == nil { + serversHandler.Enable() + } publishLoadedServers() LaunchServers() @@ -272,6 +275,7 @@ func ReconnectCwtchForeground() { } } + LoadServers(constants.DefactoPasswordForUnencryptedProfiles) publishLoadedServers() settingsJson, _ := json.Marshal(settings) @@ -323,14 +327,17 @@ func SendAppEvent(eventJson string) { settings := utils.ReadGlobalSettings() - // TODO: Should track state an only launch servers if server experiment re-enabled, not every save of settings - _, err = servers.ExperimentGate(settings.Experiments) + sh, err := servers.ExperimentGate(settings.Experiments) if err == nil { - servers.InitServers(globalACN, globalAppDir) - publishLoadedServers() - LaunchServers() + servers.InitServers(globalACN, globalAppDir, eventHandler.Push) + LoadServers(constants.DefactoPasswordForUnencryptedProfiles) + if !servers.Enabled() { + sh.Enable() + publishLoadedServers() + LaunchServers() + } } else { - servers.DeactivateServers() + servers.Disable() } // Group Experiment Refresh @@ -884,6 +891,7 @@ func ShutdownCwtch() { // Kill the isolate eventHandler.Push(event.NewEvent(event.Shutdown, map[event.Field]string{})) + servers.Shutdown() // Allow for the shutdown events to go through and then purge everything else... log.Infof("Shutting Down Application...") application.Shutdown() diff --git a/utils/eventHandler.go b/utils/eventHandler.go index 53997a4..4cc5784 100644 --- a/utils/eventHandler.go +++ b/utils/eventHandler.go @@ -33,12 +33,6 @@ func NewEventHandler() *EventHandler { return eh } -// PublishAppEvent is a way for libCwtch-go to publish an event for consumption by a UI before a Cwtch app has been initialized -// Main use: to signal an error before a cwtch app could be created -func (eh *EventHandler) PublishAppEvent(event event.Event) { - eh.appBusQueue.Publish(event) -} - func (eh *EventHandler) HandleApp(application app.Application) { eh.app = application application.GetPrimaryBus().Subscribe(event.NewPeer, eh.appBusQueue) @@ -54,6 +48,7 @@ func (eh *EventHandler) HandleApp(application app.Application) { application.GetPrimaryBus().Subscribe(servers.NewServer, eh.appBusQueue) application.GetPrimaryBus().Subscribe(servers.ServerIntentUpdate, eh.appBusQueue) application.GetPrimaryBus().Subscribe(servers.ServerDeleted, eh.appBusQueue) + application.GetPrimaryBus().Subscribe(servers.ServerStatsUpdate, eh.appBusQueue) } func (eh *EventHandler) GetNextEvent() string { @@ -447,6 +442,9 @@ func (eh *EventHandler) forwardProfileMessages(onion string, q event.Queue) { } } +// Push pushes an event onto the app event bus +// It is also a way for libCwtch-go to publish an event for consumption by a UI before a Cwtch app has been initialized +// use: to signal an error before a cwtch app could be created func (eh *EventHandler) Push(newEvent event.Event) { eh.appBusQueue.Publish(newEvent) }