merge app/applets; remove engine init from create/load flow; add ability to turn on/off engine
continuous-integration/drone/push Build is pending Details
continuous-integration/drone/pr Build is pending Details

This commit is contained in:
Dan Ballard 2022-08-28 18:19:32 -07:00
parent 82346e399f
commit cce35fd86e
2 changed files with 96 additions and 152 deletions

View File

@ -21,10 +21,12 @@ import (
type application struct {
eventBuses map[string]event.Manager
directory string
coremutex sync.Mutex
appletPeers
appletACN
appletPlugins
peerLock sync.Mutex
peers map[string]peer.CwtchPeer
acn connectivity.ACN
plugins sync.Map //map[string] []plugins.Plugin
engines map[string]connections.Engine
appBus event.Manager
appmutex sync.Mutex
@ -37,13 +39,15 @@ type Application interface {
ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error)
DeletePeer(onion string, currentPassword string)
AddPeerPlugin(onion string, pluginID plugins.PluginID)
LaunchPeers()
GetPrimaryBus() event.Manager
GetEventBus(onion string) event.Manager
QueryACNStatus()
QueryACNVersion()
ActivePeerEngine(onion string)
DeactivatePeerEngine(onion string)
ShutdownPeer(string)
Shutdown()
@ -60,12 +64,57 @@ func NewApp(acn connectivity.ACN, appDirectory string) Application {
os.MkdirAll(path.Join(appDirectory, "profiles"), 0700)
app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager()}
app.appletPeers.init()
app.peers = make(map[string]peer.CwtchPeer)
app.acn = acn
statusHandler := app.getACNStatusHandler()
acn.SetStatusCallback(statusHandler)
acn.SetVersionCallback(app.getACNVersionHandler())
prog, status := acn.GetBootstrapStatus()
statusHandler(prog, status)
app.appletACN.init(acn, app.getACNStatusHandler(), app.getACNVersionHandler())
return app
}
// ListProfiles returns a map of onions to their profile's Name
func (app *application) ListProfiles() []string {
var keys []string
app.peerLock.Lock()
defer app.peerLock.Unlock()
for handle := range app.peers {
keys = append(keys, handle)
}
return keys
}
// GetPeer returns a cwtchPeer for a given onion address
func (app *application) GetPeer(onion string) peer.CwtchPeer {
if peer, ok := app.peers[onion]; ok {
return peer
}
return nil
}
func (ap *application) AddPlugin(peerid string, id plugins.PluginID, bus event.Manager, acn connectivity.ACN) {
if _, exists := ap.plugins.Load(peerid); !exists {
ap.plugins.Store(peerid, []plugins.Plugin{})
}
pluginsinf, _ := ap.plugins.Load(peerid)
peerPlugins := pluginsinf.([]plugins.Plugin)
newp, err := plugins.Get(id, bus, acn, peerid)
if err == nil {
newp.Start()
peerPlugins = append(peerPlugins, newp)
log.Debugf("storing plugin for %v %v", peerid, peerPlugins)
ap.plugins.Store(peerid, peerPlugins)
} else {
log.Errorf("error adding plugin: %v", err)
}
}
func (app *application) CreateTaggedPeer(name string, password string, tag string) {
app.appmutex.Lock()
defer app.appmutex.Unlock()
@ -83,7 +132,6 @@ func (app *application) CreateTaggedPeer(name string, password string, tag strin
app.eventBuses[profile.GetOnion()] = eventBus
profile.Init(app.eventBuses[profile.GetOnion()])
app.peers[profile.GetOnion()] = profile
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
if tag != "" {
profile.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag)
@ -98,22 +146,10 @@ func (app *application) DeletePeer(onion string, password string) {
defer app.appmutex.Unlock()
if app.peers[onion].CheckPassword(password) {
app.appletPlugins.ShutdownPeer(onion)
app.plugins.Delete(onion)
app.shutdownPeer(onion)
app.peers[onion].Delete()
// Shutdown and Remove the Engine
app.engines[onion].Shutdown()
delete(app.engines, onion)
app.peers[onion].Shutdown()
app.peers[onion].Delete()
delete(app.peers, onion)
app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion))
app.coremutex.Lock()
defer app.coremutex.Unlock()
app.eventBuses[onion].Shutdown()
delete(app.eventBuses, onion)
log.Debugf("Delete peer for %v Done\n", onion)
app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion))
@ -198,7 +234,6 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool {
app.eventBuses[profile.GetOnion()] = eventBus
profile.Init(app.eventBuses[profile.GetOnion()])
app.peers[profile.GetOnion()] = profile
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False}))
return true
}
@ -207,6 +242,22 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool {
return false
}
/// ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online
func (app *application) ActivePeerEngine(onion string) {
profile := app.GetPeer(onion)
if profile != nil {
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
}
}
/// DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline
func (app *application) DeactivatePeerEngine(onion string) {
if engine, exists := app.engines[onion]; exists {
engine.Shutdown()
delete(app.engines, onion)
}
}
// GetPrimaryBus returns the bus the Application uses for events that aren't peer specific
func (app *application) GetPrimaryBus() event.Manager {
return app.appBus
@ -254,22 +305,41 @@ func (app *application) QueryACNVersion() {
func (app *application) ShutdownPeer(onion string) {
app.appmutex.Lock()
defer app.appmutex.Unlock()
app.shutdownPeer(onion)
}
/// shutdownPeer unlocked helper shutdown peer
func (app *application) shutdownPeer(onion string) {
app.eventBuses[onion].Shutdown()
delete(app.eventBuses, onion)
app.peers[onion].Shutdown()
delete(app.peers, onion)
app.engines[onion].Shutdown()
delete(app.engines, onion)
app.appletPlugins.Shutdown()
log.Debugf("shutting down plugins for %v", onion)
pluginsI, ok := app.plugins.Load(onion)
if ok {
plugins := pluginsI.([]plugins.Plugin)
for _, plugin := range plugins {
log.Debugf("shutting down plugin: %v", plugin)
plugin.Shutdown()
}
}
app.plugins.Delete(onion)
app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion))
}
// Shutdown shutsdown all peers of an app and then the tormanager
// Shutdown shutsdown all peers of an app
func (app *application) Shutdown() {
for id, peer := range app.peers {
log.Debugf("Shutting Down Peer %v", id)
peer.Shutdown()
log.Debugf("Shutting Down Plugins for %v", id)
app.appletPlugins.ShutdownPeer(id)
app.plugins.Range(func(k, v interface{}) bool {
log.Debugf("shutting down plugins for %v", k)
app.ShutdownPeer(k.(string))
return true
})
log.Debugf("Shutting Down Engines for %v", id)
app.engines[id].Shutdown()
log.Debugf("Shutting Down Bus for %v", id)

View File

@ -1,126 +0,0 @@
package app
import (
"cwtch.im/cwtch/event"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/log"
"sync"
"cwtch.im/cwtch/app/plugins"
"cwtch.im/cwtch/peer"
)
type appletPeers struct {
peerLock sync.Mutex
peers map[string]peer.CwtchPeer
launched bool // bit hacky, place holder while we transition to full multi peer support and a better api
}
type appletACN struct {
acn connectivity.ACN
}
type appletPlugins struct {
plugins sync.Map //map[string] []plugins.Plugin
}
// ***** applet ACN
func (a *appletACN) init(acn connectivity.ACN, publishStatus func(int, string), publishVersion func(string)) {
a.acn = acn
acn.SetStatusCallback(publishStatus)
acn.SetVersionCallback(publishVersion)
prog, status := acn.GetBootstrapStatus()
publishStatus(prog, status)
}
func (a *appletACN) Shutdown() {
a.acn.Close()
}
// ***** appletPeers
func (ap *appletPeers) init() {
ap.peers = make(map[string]peer.CwtchPeer)
ap.launched = false
}
// LaunchPeers starts each peer Listening and connecting to peers and groups
func (ap *appletPeers) LaunchPeers() {
log.Debugf("appletPeers LaunchPeers\n")
ap.peerLock.Lock()
defer ap.peerLock.Unlock()
if ap.launched {
return
}
for pid, p := range ap.peers {
log.Debugf("Launching %v\n", pid)
p.Listen()
log.Debugf("done Listen() for %v\n", pid)
p.StartPeersConnections()
log.Debugf("done StartPeersConnections() for %v\n", pid)
}
ap.launched = true
}
// ListProfiles returns a map of onions to their profile's Name
func (ap *appletPeers) ListProfiles() []string {
var keys []string
ap.peerLock.Lock()
defer ap.peerLock.Unlock()
for handle := range ap.peers {
keys = append(keys, handle)
}
return keys
}
// GetPeer returns a cwtchPeer for a given onion address
func (ap *appletPeers) GetPeer(onion string) peer.CwtchPeer {
if peer, ok := ap.peers[onion]; ok {
return peer
}
return nil
}
// ***** applet Plugins
func (ap *appletPlugins) Shutdown() {
log.Debugf("shutting down applet plugins...")
ap.plugins.Range(func(k, v interface{}) bool {
log.Debugf("shutting down plugins for %v", k)
ap.ShutdownPeer(k.(string))
return true
})
}
func (ap *appletPlugins) ShutdownPeer(peerid string) {
log.Debugf("shutting down plugins for %v", peerid)
pluginsI, ok := ap.plugins.Load(peerid)
if ok {
plugins := pluginsI.([]plugins.Plugin)
for _, plugin := range plugins {
log.Debugf("shutting down plugin: %v", plugin)
plugin.Shutdown()
}
}
}
func (ap *appletPlugins) AddPlugin(peerid string, id plugins.PluginID, bus event.Manager, acn connectivity.ACN) {
if _, exists := ap.plugins.Load(peerid); !exists {
ap.plugins.Store(peerid, []plugins.Plugin{})
}
pluginsinf, _ := ap.plugins.Load(peerid)
peerPlugins := pluginsinf.([]plugins.Plugin)
newp, err := plugins.Get(id, bus, acn, peerid)
if err == nil {
newp.Start()
peerPlugins = append(peerPlugins, newp)
log.Debugf("storing plugin for %v %v", peerid, peerPlugins)
ap.plugins.Store(peerid, peerPlugins)
} else {
log.Errorf("error adding plugin: %v", err)
}
}