// Package servers manages the user operated servers that are hosted from within the UI.
package servers
|
|
|
|
import (
|
|
"crypto/rand"
|
|
"cwtch.im/cwtch/event"
|
|
"cwtch.im/cwtch/model"
|
|
"cwtch.im/cwtch/server"
|
|
"cwtch.im/ui/go/constants"
|
|
"cwtch.im/ui/go/the"
|
|
"cwtch.im/ui/go/ui"
|
|
"encoding/base64"
|
|
"fmt"
|
|
"git.openprivacy.ca/openprivacy/connectivity"
|
|
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
|
"git.openprivacy.ca/openprivacy/log"
|
|
"io/ioutil"
|
|
"math"
|
|
"math/big"
|
|
"os"
|
|
"path"
|
|
"sync"
|
|
)
|
|
|
|
// ServerManager is responsible for managing user operated servers
|
|
type ServerManager struct {
|
|
servers sync.Map
|
|
configDir string
|
|
acn connectivity.ACN
|
|
}
|
|
|
|
// serverStatusCache is the per-server snapshot that ServerManager keeps in its
// servers map and reports to the UI when listing servers.
type serverStatusCache struct {
	// online is set once a StartServer event has been handled for the server and
	// cleared again when a StopServer event has been handled.
	online bool
	// autostart mirrors the server's AutoStart configuration; it is refreshed by
	// AutoStart events.
	autostart bool
	// messages is the total message count last read from the server's statistics.
	messages uint64
	// bundle is the serialized key bundle of the server.
	bundle []byte
}
|
|
|
|
// LaunchServiceManager is responsible for setting up everything relating to managing servers in the UI.
|
|
func LaunchServiceManager(gcd *ui.GrandCentralDispatcher, acn connectivity.ACN, configDir string) {
|
|
sm := new(ServerManager)
|
|
sm.configDir = configDir
|
|
sm.acn = acn
|
|
sm.Init(gcd)
|
|
}
|
|
|
|
// initializeServerCache sets up a new cache based on the config. Notably it stores a newly signed keybundle that
|
|
// the ui uses to allow people to share server key bundles.
|
|
func initializeServerCache(config server.Config) (*server.Server, serverStatusCache) {
|
|
newServer := new(server.Server)
|
|
newServer.Setup(config)
|
|
newServer.KeyBundle()
|
|
return newServer, serverStatusCache{
|
|
false,
|
|
config.AutoStart,
|
|
0,
|
|
newServer.KeyBundle().Serialize(),
|
|
}
|
|
|
|
}
|
|
|
|
func (sm *ServerManager) Init(gcd *ui.GrandCentralDispatcher) {
|
|
q := event.NewQueue()
|
|
the.AppBus.Subscribe(constants.NewServer, q)
|
|
the.AppBus.Subscribe(constants.ListServers, q)
|
|
the.AppBus.Subscribe(constants.ServerStatusUpdate, q)
|
|
the.AppBus.Subscribe(event.Shutdown, q)
|
|
|
|
// NOTE: Servers don't (yet?) benefit from any kind of encryption of their config (unlike peer profiles)
|
|
// FIXME: We assume servers can be malicious, a compromised server will not be able to derive any additional
|
|
// metadata from a hosted group (assuming that the server isn't also a group participant) - this logic doesn't
|
|
// really hold if the server being hosted as part of the UI - however a large pool of self hosted servers also mitigates
|
|
// many of risks e.g lack of server diversification / availability of servers.
|
|
// Like many parts of the metadata resistant risk model, it is a compromise
|
|
log.Debugf("Reading server directory: %v", sm.configDir)
|
|
os.MkdirAll(sm.configDir, 0700)
|
|
items, _ := ioutil.ReadDir(sm.configDir)
|
|
for _, item := range items {
|
|
if item.IsDir() {
|
|
log.Debugf("Found Server Directory %v / %v", sm.configDir, item.Name())
|
|
config := server.LoadConfig(path.Join(sm.configDir, item.Name()), "serverconfig")
|
|
identity := config.Identity()
|
|
log.Debugf("Launching Server goroutine for %v", identity.Hostname())
|
|
s, cache := initializeServerCache(config)
|
|
sm.servers.Store(identity.Hostname(), cache)
|
|
go sm.runServer(s)
|
|
} else {
|
|
log.Debugf("Found non server directory: %v", item.Name())
|
|
}
|
|
}
|
|
|
|
// Update the UI with all know servers
|
|
sm.ListServers(gcd)
|
|
|
|
for {
|
|
e := q.Next()
|
|
switch e.EventType {
|
|
case constants.NewServer:
|
|
sm.NewServer()
|
|
sm.ListServers(gcd)
|
|
case constants.ListServers:
|
|
sm.ListServers(gcd)
|
|
case constants.ServerStatusUpdate:
|
|
sm.ListServers(gcd)
|
|
case event.Shutdown:
|
|
// we don't need to do anything else here, all the server goroutines will also subscribe to the
|
|
// shutdown event and return nicely.
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// NewServer createa a new server
|
|
func (sm *ServerManager) NewServer() {
|
|
log.Debugf("Adding a new Server")
|
|
num, err := rand.Int(rand.Reader, big.NewInt(math.MaxUint32))
|
|
if err == nil {
|
|
serverDir := path.Join(sm.configDir, num.String())
|
|
os.MkdirAll(serverDir, 0700)
|
|
config := server.LoadConfig(serverDir, "serverconfig")
|
|
identity := config.Identity()
|
|
s, cache := initializeServerCache(config)
|
|
sm.servers.Store(identity.Hostname(), cache)
|
|
go sm.runServer(s)
|
|
}
|
|
}
|
|
|
|
// runServer sets up an event queue per server to allow them to manage their own state.
|
|
func (sm *ServerManager) runServer(s *server.Server) {
|
|
q := event.NewQueue()
|
|
the.AppBus.Subscribe(constants.StartServer, q)
|
|
the.AppBus.Subscribe(constants.StopServer, q)
|
|
the.AppBus.Subscribe(constants.CheckServerStatus, q)
|
|
the.AppBus.Subscribe(constants.AutoStart, q)
|
|
the.AppBus.Subscribe(event.Shutdown, q)
|
|
|
|
identity := s.Identity()
|
|
|
|
cache, ok := sm.servers.Load(identity.Hostname())
|
|
if ok {
|
|
serverStatusCache := cache.(serverStatusCache)
|
|
if serverStatusCache.autostart {
|
|
the.AppBus.Publish(event.NewEvent(constants.StartServer, map[event.Field]string{event.Onion: identity.Hostname()}))
|
|
}
|
|
}
|
|
|
|
log.Debugf("Launching Server %v", identity.Hostname())
|
|
log.Debugf("Launching Event Bus for Server %v", identity.Hostname())
|
|
for {
|
|
e := q.Next()
|
|
|
|
switch e.EventType {
|
|
case constants.StartServer:
|
|
onion := e.Data[event.Onion]
|
|
if onion == identity.Hostname() {
|
|
if running, _ := s.CheckStatus(); running {
|
|
// we are already running
|
|
log.Debugf("Server %v Already Running", onion)
|
|
continue
|
|
}
|
|
log.Debugf("Running Server %v", onion)
|
|
s.Run(sm.acn)
|
|
|
|
// TODO Remove this whole blob once we can actually create groups in the UI
|
|
// This is for developers who want to test their newly created server with a group.
|
|
group, _ := model.NewGroup(tor.GetTorV3Hostname(identity.PublicKey()))
|
|
group.SignGroup([]byte{})
|
|
invite, err := group.Invite([]byte{})
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
fmt.Printf("Secret Debugging Group for Testing: %v\n", "torv3"+base64.StdEncoding.EncodeToString(invite))
|
|
cache, ok := sm.servers.Load(onion)
|
|
if ok {
|
|
serverStatusCache := cache.(serverStatusCache)
|
|
serverStatusCache.online = true
|
|
sm.servers.Store(onion, serverStatusCache)
|
|
the.AppBus.Publish(event.NewEvent(constants.ServerStatusUpdate, map[event.Field]string{}))
|
|
}
|
|
}
|
|
case constants.StopServer:
|
|
onion := e.Data[event.Onion]
|
|
if onion == identity.Hostname() {
|
|
s.Shutdown()
|
|
cache, ok := sm.servers.Load(onion)
|
|
if ok {
|
|
serverStatusCache := cache.(serverStatusCache)
|
|
serverStatusCache.online = false
|
|
sm.servers.Store(onion, serverStatusCache)
|
|
the.AppBus.Publish(event.NewEvent(constants.ServerStatusUpdate, map[event.Field]string{}))
|
|
}
|
|
}
|
|
case constants.CheckServerStatus:
|
|
onion := e.Data[event.Onion]
|
|
if onion == identity.Hostname() {
|
|
// Kick off a restart
|
|
if _, err := s.CheckStatus(); err != nil {
|
|
s.Shutdown()
|
|
s.Run(sm.acn)
|
|
}
|
|
cache, ok := sm.servers.Load(onion)
|
|
if ok {
|
|
serverStatusCache := cache.(serverStatusCache)
|
|
serverStatusCache.messages = uint64(s.GetStatistics().TotalMessages)
|
|
log.Debugf("Server Statistics %v %v", onion, serverStatusCache.messages)
|
|
sm.servers.Store(onion, serverStatusCache)
|
|
the.AppBus.Publish(event.NewEvent(constants.ServerStatusUpdate, map[event.Field]string{}))
|
|
}
|
|
}
|
|
case constants.AutoStart:
|
|
onion := e.Data[event.Onion]
|
|
if onion == identity.Hostname() {
|
|
autostart := e.Data[constants.AutoStartEnabled] == event.True
|
|
s.ConfigureAutostart(autostart)
|
|
cache, ok := sm.servers.Load(onion)
|
|
if ok {
|
|
serverStatusCache := cache.(serverStatusCache)
|
|
serverStatusCache.autostart = autostart
|
|
sm.servers.Store(onion, serverStatusCache)
|
|
the.AppBus.Publish(event.NewEvent(constants.ServerStatusUpdate, map[event.Field]string{}))
|
|
}
|
|
}
|
|
case event.Shutdown:
|
|
s.Shutdown()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// ListServers iterates through the current collection of servers and their associated
|
|
// cache and sends an update to the UI.
|
|
func (sm *ServerManager) ListServers(gcd *ui.GrandCentralDispatcher) {
|
|
log.Debugf("Listing Servers...")
|
|
sm.servers.Range(func(k interface{}, v interface{}) bool {
|
|
serverOnion := k.(string)
|
|
statusCache := v.(serverStatusCache)
|
|
status := 0
|
|
if statusCache.online {
|
|
status = 1
|
|
}
|
|
|
|
// TODO this doesn't allow for an expansion of key types, this whole flow needs rethinking...
|
|
key_types := []model.KeyType{model.KeyTypeServerOnion, model.KeyTypeTokenOnion, model.KeyTypePrivacyPass}
|
|
var keyNames []string
|
|
var keys []string
|
|
|
|
keybundle, _ := model.DeserializeAndVerify(statusCache.bundle)
|
|
|
|
for _, key_type := range key_types {
|
|
log.Debugf("Looking up %v %v", key_type, keyNames)
|
|
if keybundle.HasKeyType(key_type) {
|
|
key, _ := keybundle.GetKey(key_type)
|
|
keyNames = append(keyNames, string(key_type))
|
|
keys = append(keys, string(key))
|
|
}
|
|
}
|
|
|
|
log.Debugf("Updating Server %v %v %v", serverOnion, status, statusCache.messages)
|
|
gcd.AddServer(serverOnion, serverOnion, serverOnion, status, statusCache.autostart, "server:"+base64.StdEncoding.EncodeToString(statusCache.bundle), int(statusCache.messages), keyNames, keys)
|
|
return true
|
|
})
|
|
}
|