This repository has been archived on 2021-06-24. You can view files and clone it, but cannot push or open issues or pull requests.
ui/go/features/servers/server_manager.go

262 lines
8.5 KiB
Go

package servers
import (
"crypto/rand"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/server"
"cwtch.im/ui/go/constants"
"cwtch.im/ui/go/the"
"cwtch.im/ui/go/ui"
"encoding/base64"
"fmt"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
"io/ioutil"
"math"
"math/big"
"os"
"path"
"sync"
)
// ServerManager is responsible for managing user operated servers
type ServerManager struct {
	// servers maps a server's hostname (string) to its serverStatusCache value.
	servers sync.Map
	// configDir is the directory scanned for, and populated with, one subdirectory per server.
	configDir string
	// acn is the connectivity.ACN passed to a server whenever it is run.
	acn connectivity.ACN
}
// serverStatusCache is the per-server snapshot kept in ServerManager.servers and reported to the UI
// by ListServers.
type serverStatusCache struct {
	// online is set to true after the server has been run and to false after it has been stopped.
	online bool
	// autostart mirrors the server's autostart setting.
	autostart bool
	// messages is the total message count last copied from the server's statistics.
	messages uint64
	// bundle is the serialized key bundle of the server.
	bundle []byte
}
// LaunchServiceManager builds a ServerManager for the given ACN and server config directory and
// hands control to its Init method. Init runs the manager's event loop, so this function only
// returns once that loop has exited.
func LaunchServiceManager(gcd *ui.GrandCentralDispatcher, acn connectivity.ACN, configDir string) {
	manager := &ServerManager{
		configDir: configDir,
		acn:       acn,
	}
	manager.Init(gcd)
}
// initializeServerCache creates a server from config and builds the matching status cache entry.
// The cache starts offline with a zero message count, takes its autostart flag from the config and
// stores the serialized key bundle that the ui uses to allow people to share server key bundles.
func initializeServerCache(config server.Config) (*server.Server, serverStatusCache) {
	srv := new(server.Server)
	srv.Setup(config)
	// NOTE(review): the result of this call is discarded; it is unclear from here whether
	// KeyBundle has side effects that make the extra call necessary - confirm before removing.
	srv.KeyBundle()
	cache := serverStatusCache{
		online:    false,
		autostart: config.AutoStart,
		messages:  0,
		bundle:    srv.KeyBundle().Serialize(),
	}
	return srv, cache
}
// Init subscribes the manager to the server management events on the application bus, launches a
// goroutine for every server directory found under configDir, sends the initial server list to the
// UI and then blocks, processing events until a Shutdown event arrives.
func (sm *ServerManager) Init(gcd *ui.GrandCentralDispatcher) {
	q := event.NewQueue()
	the.AppBus.Subscribe(constants.NewServer, q)
	the.AppBus.Subscribe(constants.ListServers, q)
	the.AppBus.Subscribe(constants.ServerStatusUpdate, q)
	the.AppBus.Subscribe(event.Shutdown, q)

	// NOTE: Servers don't (yet?) benefit from any kind of encryption of their config (unlike peer profiles)
	// FIXME: We assume servers can be malicious; a compromised server will not be able to derive any additional
	// metadata from a hosted group (assuming that the server isn't also a group participant) - this logic doesn't
	// really hold if the server is being hosted as part of the UI - however a large pool of self hosted servers also
	// mitigates many of the risks, e.g. lack of server diversification / availability of servers.
	// Like many parts of the metadata resistant risk model, it is a compromise.
	log.Debugf("Reading server directory: %v", sm.configDir)
	if err := os.MkdirAll(sm.configDir, 0700); err != nil {
		// Keep going: the directory listing below reports its own failure and the UI still
		// receives a (possibly empty) server list.
		log.Errorf("could not create server directory %v: %v", sm.configDir, err)
	}
	items, err := ioutil.ReadDir(sm.configDir)
	if err != nil {
		log.Errorf("could not read server directory %v: %v", sm.configDir, err)
	}
	for _, item := range items {
		if !item.IsDir() {
			log.Debugf("Found non server directory: %v", item.Name())
			continue
		}
		log.Debugf("Found Server Directory %v / %v", sm.configDir, item.Name())
		config := server.LoadConfig(path.Join(sm.configDir, item.Name()), "serverconfig")
		identity := config.Identity()
		log.Debugf("Launching Server goroutine for %v", identity.Hostname())
		s, cache := initializeServerCache(config)
		sm.servers.Store(identity.Hostname(), cache)
		go sm.runServer(s)
	}

	// Update the UI with all known servers
	sm.ListServers(gcd)

	for {
		e := q.Next()
		switch e.EventType {
		case constants.NewServer:
			sm.NewServer()
			sm.ListServers(gcd)
		case constants.ListServers, constants.ServerStatusUpdate:
			sm.ListServers(gcd)
		case event.Shutdown:
			// we don't need to do anything else here, all the server goroutines will also subscribe to the
			// shutdown event and return nicely.
			return
		}
	}
}
// NewServer creates a new server. The server's config lives in a subdirectory of configDir named
// after a random number; once the config has been loaded from that directory the server's status
// cache is stored and its goroutine is launched. If the random name cannot be generated or the
// directory cannot be created, the failure is logged and no server is added.
func (sm *ServerManager) NewServer() {
	log.Debugf("Adding a new Server")
	num, err := rand.Int(rand.Reader, big.NewInt(math.MaxUint32))
	if err != nil {
		log.Errorf("could not generate a directory name for the new server: %v", err)
		return
	}
	serverDir := path.Join(sm.configDir, num.String())
	if err := os.MkdirAll(serverDir, 0700); err != nil {
		// Without a directory the server's config has nowhere to live, so do not start it.
		log.Errorf("could not create server directory %v: %v", serverDir, err)
		return
	}
	config := server.LoadConfig(serverDir, "serverconfig")
	identity := config.Identity()
	s, cache := initializeServerCache(config)
	sm.servers.Store(identity.Hostname(), cache)
	go sm.runServer(s)
}
// runServer sets up an event queue per server to allow them to manage their own state.
//
// It subscribes to the start, stop, status check, autostart and shutdown events, publishes a
// StartServer event for itself when the cached autostart flag is set, and then processes events
// until a Shutdown event arrives. Every event other than Shutdown is ignored unless its Onion
// field matches this server's hostname.
func (sm *ServerManager) runServer(s *server.Server) {
	q := event.NewQueue()
	the.AppBus.Subscribe(constants.StartServer, q)
	the.AppBus.Subscribe(constants.StopServer, q)
	the.AppBus.Subscribe(constants.CheckServerStatus, q)
	the.AppBus.Subscribe(constants.AutoStart, q)
	the.AppBus.Subscribe(event.Shutdown, q)

	identity := s.Identity()
	hostname := identity.Hostname()

	if entry, ok := sm.servers.Load(hostname); ok && entry.(serverStatusCache).autostart {
		the.AppBus.Publish(event.NewEvent(constants.StartServer, map[event.Field]string{event.Onion: hostname}))
	}

	log.Debugf("Launching Server %v", hostname)
	log.Debugf("Launching Event Bus for Server %v", hostname)

	for {
		e := q.Next()
		if e.EventType == event.Shutdown {
			s.Shutdown()
			return
		}

		// Everything else is addressed to one specific server; skip events meant for others.
		onion := e.Data[event.Onion]
		if onion != hostname {
			continue
		}

		switch e.EventType {
		case constants.StartServer:
			if running, _ := s.CheckStatus(); running {
				// we are already running
				log.Debugf("Server %v Already Running", onion)
				continue
			}
			log.Debugf("Running Server %v", onion)
			s.Run(sm.acn)

			// TODO Remove this whole blob once we can actually create groups in the UI
			// This is for developers who want to test their newly created server with a group.
			// A failure here is only logged: a debugging aid must not take the application down.
			group, err := model.NewGroup(tor.GetTorV3Hostname(identity.PublicKey()))
			if err != nil {
				log.Errorf("could not create debugging group for server %v: %v", onion, err)
			} else {
				group.SignGroup([]byte{})
				invite, err := group.Invite([]byte{})
				if err != nil {
					log.Errorf("could not create debugging group invite for server %v: %v", onion, err)
				} else {
					fmt.Printf("Secret Debugging Group for Testing: %v\n", "torv3"+base64.StdEncoding.EncodeToString(invite))
				}
			}

			sm.updateServerCache(onion, func(cache *serverStatusCache) {
				cache.online = true
			})

		case constants.StopServer:
			s.Shutdown()
			sm.updateServerCache(onion, func(cache *serverStatusCache) {
				cache.online = false
			})

		case constants.CheckServerStatus:
			// Kick off a restart
			if _, err := s.CheckStatus(); err != nil {
				s.Shutdown()
				s.Run(sm.acn)
			}
			sm.updateServerCache(onion, func(cache *serverStatusCache) {
				cache.messages = uint64(s.GetStatistics().TotalMessages)
				log.Debugf("Server Statistics %v %v", onion, cache.messages)
			})

		case constants.AutoStart:
			autostart := e.Data[constants.AutoStartEnabled] == event.True
			s.ConfigureAutostart(autostart)
			sm.updateServerCache(onion, func(cache *serverStatusCache) {
				cache.autostart = autostart
			})
		}
	}
}

// updateServerCache applies mutate to a copy of the status cache stored under onion, stores the
// modified copy back and publishes a ServerStatusUpdate event. It does nothing (and does not call
// mutate) when no cache entry exists for onion.
func (sm *ServerManager) updateServerCache(onion string, mutate func(cache *serverStatusCache)) {
	entry, ok := sm.servers.Load(onion)
	if !ok {
		return
	}
	cache := entry.(serverStatusCache)
	mutate(&cache)
	sm.servers.Store(onion, cache)
	the.AppBus.Publish(event.NewEvent(constants.ServerStatusUpdate, map[event.Field]string{}))
}
// ListServers iterates through the current collection of servers and their associated
// cache and sends an update to the UI.
//
// For every server it reports the hostname, an online status (1 when online, 0 otherwise), the
// autostart flag, the base64 encoded key bundle prefixed with "server:", the message count and the
// names and values of the known key types found in the bundle. If a cached bundle cannot be
// deserialized the error is logged and the server is still reported, without any keys.
func (sm *ServerManager) ListServers(gcd *ui.GrandCentralDispatcher) {
	log.Debugf("Listing Servers...")

	// TODO this doesn't allow for an expansion of key types, this whole flow needs rethinking...
	keyTypes := []model.KeyType{model.KeyTypeServerOnion, model.KeyTypeTokenOnion, model.KeyTypePrivacyPass}

	sm.servers.Range(func(k interface{}, v interface{}) bool {
		serverOnion := k.(string)
		statusCache := v.(serverStatusCache)

		status := 0
		if statusCache.online {
			status = 1
		}

		var keyNames []string
		var keys []string
		keybundle, err := model.DeserializeAndVerify(statusCache.bundle)
		if err != nil {
			// Do not touch the returned bundle on failure; list the server without keys instead.
			log.Errorf("could not deserialize key bundle for server %v: %v", serverOnion, err)
		} else {
			for _, keyType := range keyTypes {
				log.Debugf("Looking up %v %v", keyType, keyNames)
				if keybundle.HasKeyType(keyType) {
					// The second result is ignored because HasKeyType has just reported the key as present.
					key, _ := keybundle.GetKey(keyType)
					keyNames = append(keyNames, string(keyType))
					keys = append(keys, string(key))
				}
			}
		}

		log.Debugf("Updating Server %v %v %v", serverOnion, status, statusCache.messages)
		gcd.AddServer(serverOnion, serverOnion, serverOnion, status, statusCache.autostart, "server:"+base64.StdEncoding.EncodeToString(statusCache.bundle), int(statusCache.messages), keyNames, keys)
		return true
	})
}