cwtch/app/app.go

599 rivejä
19 KiB
Go

package app
import (
"cwtch.im/cwtch/app/plugins"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/extensions"
"cwtch.im/cwtch/functionality/filesharing"
"cwtch.im/cwtch/functionality/servers"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/settings"
"cwtch.im/cwtch/storage"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/log"
"os"
path "path/filepath"
"strconv"
"sync"
)
// application is the concrete implementation of the Application interface.
// It keeps the loaded profiles together with their per-profile event bus,
// plugins and protocol engine, all keyed by the profile's onion address.
type application struct {
// eventBuses holds one event bus per loaded profile, keyed by onion.
eventBuses map[string]event.Manager
// directory is the application root; profiles are stored under directory/profiles.
directory string
// peers holds the loaded profiles, keyed by onion.
peers map[string]peer.CwtchPeer
// acn is the connectivity provider handed to NewApp; it is passed on to plugins and protocol engines.
acn connectivity.ACN
plugins sync.Map // onion (string) -> []plugins.Plugin
// engines holds the protocol engine of every activated profile, keyed by onion.
engines map[string]connections.Engine
// appBus carries events that are not specific to a single profile.
appBus event.Manager
// eventQueue is subscribed to ACNStatus events on appBus and drained by eventHandler.
eventQueue event.Queue
// appmutex is taken by most methods that touch peers, eventBuses, settings or engineHooks.
appmutex sync.Mutex
// engineHooks is passed to GenerateProtocolEngine when a profile's engine is created.
engineHooks connections.EngineHooks
// settings is the global settings file read and written by ReadSettings / UpdateSettings.
settings *settings.GlobalSettingsFile
}
// IsFeatureEnabled reports whether the named experiment is switched on.
// It is true only when experiments are enabled globally and the experiment
// itself is present in the settings with a true value.
func (app *application) IsFeatureEnabled(experiment string) bool {
	current := app.ReadSettings()
	if !current.ExperimentsEnabled {
		return false
	}
	// A missing key yields the zero value (false), matching "not enabled".
	return current.Experiments[experiment]
}
// Application is a full cwtch peer application. It allows management, usage and storage of multiple peers.
// Profiles are identified throughout by their onion address.
type Application interface {
// LoadProfiles attempts to load every profile found in the profiles directory using password.
LoadProfiles(password string)
// CreateProfile creates a new profile with the given name and password and records its autostart preference.
CreateProfile(name string, password string, autostart bool)
// InstallEngineHooks replaces the hooks handed to protocol engines created afterwards.
InstallEngineHooks(engineHooks connections.EngineHooks)
// ImportProfile imports an exported profile file into the profiles directory and installs it.
ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error)
// EnhancedImportProfile is ImportProfile returning the error text, or "" on success.
EnhancedImportProfile(exportedCwtchFile string, password string) string
// DeleteProfile deletes a loaded profile after checking currentPassword; a blank password selects the default one.
DeleteProfile(onion string, currentPassword string)
// AddPeerPlugin starts the given plugin for the profile, using the profile's event bus.
AddPeerPlugin(onion string, pluginID plugins.PluginID)
// GetPrimaryBus returns the bus used for events that are not peer specific.
GetPrimaryBus() event.Manager
// GetEventBus returns the profile's event bus, or nil if there is none.
GetEventBus(onion string) event.Manager
// QueryACNStatus publishes the current ACN bootstrap status to the application bus and every profile bus.
QueryACNStatus()
// QueryACNVersion publishes the ACN version to the application bus.
QueryACNVersion()
// ConfigureConnections activates the profile's engine and starts the selected kinds of connections.
ConfigureConnections(onion string, doListn, doPeers, doServers bool)
// ActivatePeerEngine creates the profile's protocol engine if it does not already exist.
ActivatePeerEngine(onion string)
// DeactivatePeerEngine shuts down and removes the profile's protocol engine, if any.
DeactivatePeerEngine(onion string)
// ReadSettings returns the current global settings.
ReadSettings() settings.GlobalSettings
// UpdateSettings writes the global settings and pushes them to every loaded profile.
UpdateSettings(settings settings.GlobalSettings)
// IsFeatureEnabled reports whether the named experiment is enabled.
IsFeatureEnabled(experiment string) bool
// ShutdownPeer shuts down a profile and removes it from the application.
ShutdownPeer(string)
// Shutdown shuts down every profile, then the application's event queue and bus.
Shutdown()
// GetPeer returns the loaded profile for onion, or nil.
GetPeer(onion string) peer.CwtchPeer
// ListProfiles returns the onion addresses of all loaded profiles.
ListProfiles() []string
}
// LoadProfileFn is the function signature for a function in an app that loads a profile.
// It receives the profile (a peer.CwtchPeer) and returns nothing.
type LoadProfileFn func(profile peer.CwtchPeer)
// LoadAppSettings prepares the application directory and opens the global
// settings file stored in it.
//
// It makes sure appDirectory/profiles exists (mode 0700) and then initializes
// the global settings file in appDirectory with the default password used for
// unencrypted profiles. Failures are logged, not returned: whatever
// settings.InitGlobalSettingsFile hands back is returned as-is, even when it
// also reported an error.
func LoadAppSettings(appDirectory string) *settings.GlobalSettingsFile {
	log.Debugf("NewApp(%v)\n", appDirectory)

	// Report a failure to create the profiles directory instead of dropping it
	// silently; without that directory no profile can be created or loaded.
	if err := os.MkdirAll(path.Join(appDirectory, "profiles"), 0700); err != nil {
		log.Errorf("error creating profiles directory in %v: %v", appDirectory, err)
	}

	// Note: we basically presume this doesn't fail. If the file doesn't exist we create it, and as such the
	// only plausible error conditions are related to file create e.g. low disk space. If that is the case then
	// many other parts of Cwtch are likely to fail also.
	globalSettingsFile, err := settings.InitGlobalSettingsFile(appDirectory, DefactoPasswordForUnencryptedProfiles)
	if err != nil {
		log.Errorf("error initializing global globalSettingsFile file %s. Global globalSettingsFile might not be loaded or saved", err)
	}
	return globalSettingsFile
}
// NewApp builds an Application bound to the given ACN, application directory
// and global settings file.
//
// It registers status and version callbacks on the ACN, publishes the ACN's
// current bootstrap status once, subscribes the internal queue to ACNStatus
// events on the application bus and starts the background event handler.
func NewApp(acn connectivity.ACN, appDirectory string, settings *settings.GlobalSettingsFile) Application {
	app := &application{
		engines:     make(map[string]connections.Engine),
		eventBuses:  make(map[string]event.Manager),
		peers:       make(map[string]peer.CwtchPeer),
		directory:   appDirectory,
		appBus:      event.NewEventManager(),
		settings:    settings,
		eventQueue:  event.NewQueue(),
		engineHooks: connections.DefaultEngineHooks{},
		acn:         acn,
	}

	// Wire the ACN callbacks, then report the status the ACN has right now.
	onStatus := app.getACNStatusHandler()
	acn.SetStatusCallback(onStatus)
	acn.SetVersionCallback(app.getACNVersionHandler())
	progress, status := acn.GetBootstrapStatus()
	onStatus(progress, status)

	// From here on ACN status changes are consumed by eventHandler.
	app.appBus.Subscribe(event.ACNStatus, app.eventQueue)
	go app.eventHandler()
	return app
}
// InstallEngineHooks replaces the engine hooks stored on the application.
// The stored hooks are the ones ActivatePeerEngine passes to GenerateProtocolEngine.
func (app *application) InstallEngineHooks(engineHooks connections.EngineHooks) {
// take the application mutex while swapping the hooks
app.appmutex.Lock()
defer app.appmutex.Unlock()
app.engineHooks = engineHooks
}
// ReadSettings returns the global settings as read from the application's
// settings file. The read happens while holding the application mutex.
func (app *application) ReadSettings() settings.GlobalSettings {
app.appmutex.Lock()
defer app.appmutex.Unlock()
return app.settings.ReadGlobalSettings()
}
// UpdateSettings writes the given global settings to the settings file and
// then applies them to every loaded profile: experiment state, the
// block/allow policy for unknown connections, and a settings-update
// notification. The application mutex is held for the whole update.
func (app *application) UpdateSettings(settings settings.GlobalSettings) {
	// Keep every other application change out while the update is in flight.
	app.appmutex.Lock()
	defer app.appmutex.Unlock()

	app.settings.WriteGlobalSettings(settings)

	for _, loaded := range app.peers {
		loaded.UpdateExperiments(settings.ExperimentsEnabled, settings.Experiments)

		// Loaded profiles need the unknown-connection policy toggled explicitly.
		if !settings.BlockUnknownConnections {
			loaded.AllowUnknownConnections()
		} else {
			loaded.BlockUnknownConnections()
		}

		loaded.NotifySettingsUpdate(settings)
	}
}
// ListProfiles returns the onion addresses of all currently loaded profiles.
// The order is unspecified (map iteration order); the result is nil when no
// profile is loaded.
func (app *application) ListProfiles() []string {
	app.appmutex.Lock()
	defer app.appmutex.Unlock()

	var onions []string
	for onion := range app.peers {
		onions = append(onions, onion)
	}
	return onions
}
// GetPeer returns the loaded profile registered under onion, or nil when no
// such profile is loaded. The lookup is done under the application mutex.
func (app *application) GetPeer(onion string) peer.CwtchPeer {
	app.appmutex.Lock()
	defer app.appmutex.Unlock()
	// Indexing a missing key yields the zero value, i.e. nil.
	return app.peers[onion]
}
// AddPlugin creates and starts the plugin identified by id for the profile
// peerid, wiring it to the given event bus and ACN, and records it in the
// application's per-profile plugin list.
//
// A plugin id may only be added once per profile; a second attempt is logged
// and ignored. If the plugin cannot be constructed the error is logged and
// the plugin list is left unchanged.
func (app *application) AddPlugin(peerid string, id plugins.PluginID, bus event.Manager, acn connectivity.ACN) {
	// LoadOrStore creates the empty list atomically when the profile has none
	// yet, so a concurrent caller's freshly stored list is never overwritten
	// by a separate "check, then store empty" step.
	pluginsinf, _ := app.plugins.LoadOrStore(peerid, []plugins.Plugin{})
	peerPlugins := pluginsinf.([]plugins.Plugin)

	for _, plugin := range peerPlugins {
		if plugin.Id() == id {
			log.Errorf("trying to add second instance of plugin %v to peer %v", id, peerid)
			return
		}
	}

	newp, err := plugins.Get(id, bus, acn, peerid)
	if err != nil {
		log.Errorf("error adding plugin: %v", err)
		return
	}

	newp.Start()
	peerPlugins = append(peerPlugins, newp)
	log.Debugf("storing plugin for %v %v", peerid, peerPlugins)
	app.plugins.Store(peerid, peerPlugins)
}
// CreateProfile creates a new profile with the given name and password.
//
// Two profile-zone attributes are set on it: a tag recording whether the
// profile uses the default (unencrypted-profile) password or a real one, and
// the autostart preference.
func (app *application) CreateProfile(name string, password string, autostart bool) {
	autostartVal, tagVal := constants.True, constants.ProfileTypeV1Password
	if !autostart {
		autostartVal = constants.False
	}
	if password == DefactoPasswordForUnencryptedProfiles {
		tagVal = constants.ProfileTypeV1DefaultPassword
	}

	initialAttributes := map[attr.ZonedPath]string{
		attr.ProfileZone.ConstructZonedPath(constants.Tag):           tagVal,
		attr.ProfileZone.ConstructZonedPath(constants.PeerAutostart): autostartVal,
	}
	app.CreatePeer(name, password, initialAttributes)
}
// setupPeer registers a profile with the application: it creates the
// profile's event bus, records bus and profile under the profile's onion,
// initializes the profile on that bus, pushes the current experiment state,
// registers the standard hooks and starts the mandatory plugins.
//
// It does not take the application mutex itself; the callers in this file
// (CreatePeer, installProfile) already hold it.
func (app *application) setupPeer(profile peer.CwtchPeer) {
	bus := event.NewEventManager()
	onion := profile.GetOnion()
	app.eventBuses[onion] = bus
	app.peers[onion] = profile

	// The profile is initialized on its own, freshly created bus.
	profile.Init(bus)

	// Bring the profile up to date with the current experiment settings.
	current := app.settings.ReadGlobalSettings()
	profile.UpdateExperiments(current.ExperimentsEnabled, current.Experiments)
	app.registerHooks(profile)

	// Connection retry and heartbeat are mandatory for every profile.
	for _, mandatory := range []plugins.PluginID{plugins.CONNECTIONRETRY, plugins.HEARTBEAT} {
		app.AddPeerPlugin(profile.GetOnion(), mandatory)
	}
}
// CreatePeer creates a new encrypted profile in a randomly named directory
// under <app directory>/profiles, registers it with the application and
// stores the supplied attributes on it in the local scope.
//
// On success a NewPeer event (Created = true) is published on the application
// bus; on failure a PeerError event carrying the error text is published
// instead and nothing is registered.
func (app *application) CreatePeer(name string, password string, attributes map[attr.ZonedPath]string) {
	app.appmutex.Lock()
	defer app.appmutex.Unlock()

	storeDirectory := path.Join(app.directory, "profiles", model.GenerateRandomID())
	profile, err := peer.CreateEncryptedStorePeer(storeDirectory, name, password)
	if err != nil {
		log.Errorf("Error Creating Peer: %v", err)
		app.appBus.Publish(event.NewEventList(event.PeerError, event.Error, err.Error()))
		return
	}

	app.setupPeer(profile)

	for zonedPath, value := range attributes {
		zone, key := attr.ParseZone(zonedPath.ToString())
		profile.SetScopedZonedAttribute(attr.LocalScope, zone, key, value)
	}

	created := event.NewEvent(event.NewPeer, map[event.Field]string{
		event.Identity: profile.GetOnion(),
		event.Created:  event.True,
	})
	app.appBus.Publish(created)
}
// DeleteProfile permanently removes the loaded profile identified by onion.
//
// The password must match the profile's password; an empty password is
// treated as the default password used for "unencrypted" profiles. On success
// the profile is shut down, its storage deleted, it is removed from the
// application and a PeerDeleted event is published on the application bus.
// On a password mismatch an AppError event (PasswordMatchError) is published
// and the profile is left untouched. An unknown onion is logged and ignored.
func (app *application) DeleteProfile(onion string, password string) {
	log.Debugf("DeleteProfile called on %v\n", onion)
	app.appmutex.Lock()
	defer app.appmutex.Unlock()

	// short circuit to prevent nil-pointer panic if this function is called twice (or incorrectly)
	// (the local is deliberately not called "peer" so it does not shadow the peer package)
	profile := app.peers[onion]
	if profile == nil {
		log.Errorf("DeleteProfile called with invalid onion %v", onion)
		return
	}

	// allow a blank password to delete "unencrypted" accounts...
	if password == "" {
		password = DefactoPasswordForUnencryptedProfiles
	}

	if !profile.CheckPassword(password) {
		app.appBus.Publish(event.NewEventList(event.AppError, event.Error, event.PasswordMatchError, event.Identity, onion))
		return
	}

	// soft-shutdown
	profile.Shutdown()
	// delete the underlying storage
	profile.Delete()
	// hard shutdown / remove from app (this also shuts down and removes the engine)
	app.shutdownPeer(onion)
	log.Debugf("Delete peer for %v Done\n", onion)
	app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion))
}
// AddPeerPlugin starts the plugin pluginID for the profile onion, handing it
// the profile's event bus and the application's ACN. The bus is looked up
// without taking the application mutex (setupPeer calls this while already
// holding it).
func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) {
	profileBus := app.eventBuses[onion]
	app.AddPlugin(onion, pluginID, profileBus, app.acn)
}
// ImportProfile imports an exported profile file into the application's
// profiles directory using the given password and, when the import
// succeeded, installs the resulting profile in the application.
//
// The profile and error returned by peer.ImportProfile are passed back
// unchanged.
func (app *application) ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error) {
	profileDirectory := path.Join(app.directory, "profiles")
	profile, err := peer.ImportProfile(exportedCwtchFile, profileDirectory, password)
	// Install only a real profile from a successful import. Both conditions
	// must hold: installing a nil profile would panic in installProfile, and a
	// profile that came back alongside an error must not be registered.
	if profile != nil && err == nil {
		app.installProfile(profile)
	}
	return profile, err
}
// EnhancedImportProfile runs ImportProfile and reports the outcome as a
// string: the error text on failure, or the empty string on success.
func (app *application) EnhancedImportProfile(exportedCwtchFile string, password string) string {
	if _, err := app.ImportProfile(exportedCwtchFile, password); err != nil {
		return err.Error()
	}
	return ""
}
// LoadProfiles walks the profiles directory and tries to open every entry
// with the given password, installing each profile that opens.
//
// Each entry is first tried as an encrypted database. If that fails it is
// tried as a legacy profile store, which is then migrated into a new
// encrypted store; StartingStorageMiragtion is published before the first
// migration and DoneStorageMigration at the end. If no profile was installed
// an AppError (AppErrLoaded0) is published on the application bus.
func (app *application) LoadProfiles(password string) {
	entries, err := os.ReadDir(path.Join(app.directory, "profiles"))
	if err != nil {
		log.Errorf("error: cannot read profiles directory: %v", err)
		return
	}

	installed := 0
	migrating := false

	for _, entry := range entries {
		profileDirectory := path.Join(app.directory, "profiles", entry.Name())

		// First choice: the current encrypted database format.
		profile, err := peer.FromEncryptedDatabase(profileDirectory, password)
		if err == nil {
			log.Infof("loading profile from new-type storage database...")
			if app.installProfile(profile) {
				installed++
			}
			continue
		}

		// Fallback: a legacy profile store; entries that open as neither are skipped.
		legacyStore, err := storage.LoadProfileWriterStore(profileDirectory, password)
		if err != nil {
			continue
		}
		log.Infof("found legacy profile. importing to new database structure...")
		legacyProfile := legacyStore.GetProfileCopy(true)

		// Announce the migration once, on the first legacy profile found.
		if !migrating {
			migrating = true
			app.appBus.Publish(event.NewEventList(event.StartingStorageMiragtion))
		}

		cps, err := peer.CreateEncryptedStore(profileDirectory, password)
		if err != nil {
			log.Errorf("error creating encrypted store: %v", err)
			continue
		}

		if app.installProfile(peer.ImportLegacyProfile(legacyProfile, cps)) {
			installed++
		}
	}

	if installed == 0 {
		app.appBus.Publish(event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0))
	}
	if migrating {
		app.appBus.Publish(event.NewEventList(event.DoneStorageMigration))
	}
}
// registerHooks registers the standard hooks on a profile (profile value
// extension, file sharing, image previews and servers) and then notifies the
// profile of the current global settings.
func (app *application) registerHooks(profile peer.CwtchPeer) {
	profile.RegisterHook(extensions.ProfileValueExtension{})

	fileSharing := new(filesharing.Functionality)
	profile.RegisterHook(fileSharing)

	imagePreviews := new(filesharing.ImagePreviewsFunctionality)
	profile.RegisterHook(imagePreviews)

	serverSupport := new(servers.Functionality)
	profile.RegisterHook(serverSupport)

	// Hand the freshly hooked profile the latest settings.
	latest := app.settings.ReadGlobalSettings()
	profile.NotifySettingsUpdate(latest)
}
// installProfile adds a profile to the application unless a profile with the
// same onion is already loaded.
//
// It returns true when the profile was installed; a NewPeer event
// (Created = false) is then published on the application bus. When the onion
// is already loaded, the supplied profile is shut down and false is returned.
func (app *application) installProfile(profile peer.CwtchPeer) bool {
	app.appmutex.Lock()
	defer app.appmutex.Unlock()

	// Already loaded: drop the duplicate rather than replacing the live one.
	if app.peers[profile.GetOnion()] != nil {
		profile.Shutdown()
		return false
	}

	app.setupPeer(profile)

	// Tell any interfaces that a (pre-existing) profile is now available.
	loadedEvent := event.NewEvent(event.NewPeer, map[event.Field]string{
		event.Identity: profile.GetOnion(),
		event.Created:  event.False,
	})
	app.appBus.Publish(loadedEvent)
	return true
}
// ActivatePeerEngine creates the protocol engine for a loaded profile so it
// can be used with the ACN; it should be called once the underlying ACN is
// online.
//
// Nothing happens when the profile is not loaded or already has an engine.
// On success the engine is recorded, ProtocolEngineCreated is published on
// the profile's bus and the current ACN status is re-published. A missing
// event bus or a failure to generate the engine is logged.
func (app *application) ActivatePeerEngine(onion string) {
	profile := app.GetPeer(onion)
	if profile == nil {
		return
	}
	if _, active := app.engines[onion]; active {
		return
	}

	profileBus, found := app.eventBuses[profile.GetOnion()]
	if !found {
		// todo handle this case?
		log.Errorf("cannot activate peer engine without an event bus")
		return
	}

	engine, err := profile.GenerateProtocolEngine(app.acn, profileBus, app.engineHooks)
	if err != nil {
		log.Errorf("corrupted profile detected for %v", onion)
		return
	}

	log.Debugf("restartFlow: Creating a New Protocol Engine...")
	app.engines[profile.GetOnion()] = engine
	profileBus.Publish(event.NewEventList(event.ProtocolEngineCreated))
	app.QueryACNStatus()
}
// ConfigureConnections autostarts the requested kinds of connections for a
// loaded profile.
//
// listen controls whether the profile starts listening; peers and servers
// select which outbound connections are started. If either of those is
// false, PurgeRetries is published on the profile's bus first. The profile's
// engine is activated if needed, ResumeRetries is published, and the
// connections are started in a background goroutine. An unknown onion is
// logged; a profile without an event bus is silently ignored.
func (app *application) ConfigureConnections(onion string, listen bool, peers bool, servers bool) {
	profile := app.GetPeer(onion)
	if profile == nil {
		log.Errorf("profile does not exist %v", onion)
		return
	}

	profileBus, found := app.eventBuses[profile.GetOnion()]
	if !found {
		return
	}

	// If some connection kind is being skipped, clear pending retries first.
	if !(peers && servers) {
		profileBus.Publish(event.NewEventList(event.PurgeRetries))
	}

	// Activation is a no-op when the engine already exists.
	app.ActivatePeerEngine(onion)

	if listen {
		profile.Listen()
	}
	profileBus.Publish(event.NewEventList(event.ResumeRetries))

	// Large contact lists can take a long time, so connect in the background.
	go profile.StartConnections(peers, servers)
}
// DeactivatePeerEngine shuts down the profile's protocol engine and forgets
// it; it should be called when the underlying ACN goes offline. It does
// nothing when the profile has no engine.
func (app *application) DeactivatePeerEngine(onion string) {
	engine, active := app.engines[onion]
	if !active {
		return
	}
	engine.Shutdown()
	delete(app.engines, onion)
}
// GetPrimaryBus returns the bus the Application uses for events that aren't peer specific.
// This is the application-level bus created in NewApp.
func (app *application) GetPrimaryBus() event.Manager {
return app.appBus
}
// GetEventBus returns the event bus registered for the profile onion, or nil
// when there is none. The lookup is done without taking the application mutex.
func (app *application) GetEventBus(onion string) event.Manager {
	// Indexing a missing key yields the zero value, i.e. nil.
	return app.eventBuses[onion]
}
// getACNStatusHandler returns a callback that broadcasts an ACN status
// update. Each invocation publishes an ACNStatus event (progress as a decimal
// string plus the status text) on the application bus and on every profile
// bus, all while holding the application mutex.
func (app *application) getACNStatusHandler() func(int, string) {
	return func(progress int, status string) {
		progressText := strconv.Itoa(progress)

		app.appmutex.Lock()
		defer app.appmutex.Unlock()

		app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progressText, event.Status, status))
		// Every bus gets its own freshly built event.
		for _, profileBus := range app.eventBuses {
			profileBus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progressText, event.Status, status))
		}
	}
}
// getACNVersionHandler returns a callback that publishes the reported ACN
// version as an ACNVersion event on the application bus, holding the
// application mutex while it does so.
func (app *application) getACNVersionHandler() func(string) {
	publishVersion := func(version string) {
		app.appmutex.Lock()
		defer app.appmutex.Unlock()
		versionEvent := event.NewEventList(event.ACNVersion, event.Data, version)
		app.appBus.Publish(versionEvent)
	}
	return publishVersion
}
// QueryACNStatus asks the ACN for its current bootstrap status and
// broadcasts it through the same handler used for ACN status callbacks.
func (app *application) QueryACNStatus() {
	progress, status := app.acn.GetBootstrapStatus()
	broadcast := app.getACNStatusHandler()
	broadcast(progress, status)
}
// QueryACNVersion asks the ACN for its version and publishes it as an
// ACNVersion event on the application bus.
func (app *application) QueryACNVersion() {
	app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, app.acn.GetVersion()))
}
// eventHandler is the application's background loop. It consumes events from
// the application queue and reacts to ACN status transitions:
//
//   - when progress reaches 100 from any other value, every loaded profile
//     whose autostart attribute is absent or "true" has its connections
//     configured (all off if the profile is set to appear offline, all on
//     otherwise);
//   - when progress leaves 100, every profile's engine is deactivated.
//
// Events whose progress cannot be parsed are ignored. An event with an empty
// type signals shutdown and ends the loop.
func (app *application) eventHandler() {
	lastProgress := -1
	for {
		e := app.eventQueue.Next()

		if e.EventType != event.ACNStatus {
			// invalid event, signifies shutdown
			if e.EventType == "" {
				return
			}
			continue
		}

		progress, err := strconv.Atoi(e.Data[event.Progress])
		if err != nil {
			// Unparseable progress: leave the remembered status untouched.
			continue
		}

		switch {
		case progress == 100 && lastProgress != 100:
			// The ACN just came online.
			for _, onion := range app.ListProfiles() {
				profile := app.GetPeer(onion)
				if profile == nil {
					continue
				}
				autostart, autostartSet := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAutostart)
				appearOffline, appearOfflineSet := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAppearOffline)
				if autostartSet && autostart != "true" {
					continue
				}
				if appearOfflineSet && appearOffline == "true" {
					// don't configure any connections...
					log.Infof("peer appearing offline, not launching listen threads or connecting jobs")
					app.ConfigureConnections(onion, false, false, false)
					continue
				}
				app.ConfigureConnections(onion, true, true, true)
			}
		case progress != 100 && lastProgress == 100:
			// The ACN just fell offline.
			for _, onion := range app.ListProfiles() {
				app.DeactivatePeerEngine(onion)
			}
		}

		lastProgress = progress
	}
}
// ShutdownPeer shuts down a peer and removes it from the app's management.
// It takes the application mutex and delegates to shutdownPeer, which expects
// the mutex to be held already.
func (app *application) ShutdownPeer(onion string) {
app.appmutex.Lock()
defer app.appmutex.Unlock()
app.shutdownPeer(onion)
}
// shutdownPeer tears down everything the application holds for a profile:
// its event bus, the profile itself, its protocol engine (if any) and its
// plugins. It does not take the application mutex; callers hold it.
// An onion without both a bus and a profile is logged and ignored.
//
//nolint:nilaway
func (app *application) shutdownPeer(onion string) {
	// Bail out early so a repeated (or mistaken) call cannot hit a nil pointer.
	profileBus := app.eventBuses[onion]
	profile := app.peers[onion]
	if profileBus == nil || profile == nil {
		log.Errorf("shutdownPeer called with invalid onion %v", onion)
		return
	}

	// Announce the shutdown on the profile's bus before closing the bus.
	profileBus.Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion))
	profileBus.Shutdown()
	delete(app.eventBuses, onion)

	profile.Shutdown()
	delete(app.peers, onion)

	if engine, active := app.engines[onion]; active {
		engine.Shutdown()
		delete(app.engines, onion)
	}

	log.Debugf("shutting down plugins for %v", onion)
	if stored, found := app.plugins.Load(onion); found {
		for _, plugin := range stored.([]plugins.Plugin) {
			plugin.Shutdown()
		}
	}
	app.plugins.Delete(onion)
}
// Shutdown stops the whole application: every loaded profile is shut down
// and removed, then the application's event queue and bus are shut down.
// The application mutex is held throughout.
func (app *application) Shutdown() {
	app.appmutex.Lock()
	defer app.appmutex.Unlock()

	// shutdownPeer removes the entry from app.peers as the loop goes.
	for onion := range app.peers {
		log.Debugf("Shutting Down Peer %v", onion)
		app.shutdownPeer(onion)
	}

	log.Debugf("Shutting Down App")
	app.eventQueue.Shutdown()
	app.appBus.Shutdown()
	log.Debugf("Shut Down Complete")
}