timeout_fixes_tokens #460
app/app.go
@@ -1,57 +1,57 @@
package app

import (
	"cwtch.im/cwtch/app/plugins"
	"cwtch.im/cwtch/event"
	"cwtch.im/cwtch/model"
	"cwtch.im/cwtch/model/attr"
	"cwtch.im/cwtch/model/constants"
	"cwtch.im/cwtch/peer"
	"cwtch.im/cwtch/protocol/connections"
	"cwtch.im/cwtch/storage"
	"git.openprivacy.ca/openprivacy/connectivity"
	"git.openprivacy.ca/openprivacy/log"
	"os"
	path "path/filepath"
	"strconv"
	"sync"
)

type application struct {
	eventBuses map[string]event.Manager
	directory  string

	peerLock sync.Mutex
	peers    map[string]peer.CwtchPeer
	acn      connectivity.ACN
	plugins  sync.Map //map[string] []plugins.Plugin

	engines  map[string]connections.Engine
	appBus   event.Manager
	appmutex sync.Mutex
}

// Application is a full cwtch peer application. It allows management, usage and storage of multiple peers
type Application interface {
	LoadProfiles(password string)
	CreateTaggedPeer(name string, password string, tag string)
	ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error)
	DeletePeer(onion string, currentPassword string)
	AddPeerPlugin(onion string, pluginID plugins.PluginID)

	GetPrimaryBus() event.Manager
	GetEventBus(onion string) event.Manager
	QueryACNStatus()
	QueryACNVersion()

	ActivePeerEngine(onion string, doListen, doPeers, doServers bool)
	DeactivatePeerEngine(onion string)

	ShutdownPeer(string)
	Shutdown()

	GetPeer(onion string) peer.CwtchPeer
	ListProfiles() []string
}

// LoadProfileFn is the function signature for a function in an app that loads a profile
@@ -59,295 +59,295 @@ type LoadProfileFn func(profile peer.CwtchPeer)

// NewApp creates a new app with some environment awareness and initializes a Tor Manager
func NewApp(acn connectivity.ACN, appDirectory string) Application {
	log.Debugf("NewApp(%v)\n", appDirectory)
	os.MkdirAll(path.Join(appDirectory, "profiles"), 0700)

	app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager()}
	app.peers = make(map[string]peer.CwtchPeer)

	app.acn = acn
	statusHandler := app.getACNStatusHandler()
	acn.SetStatusCallback(statusHandler)
	acn.SetVersionCallback(app.getACNVersionHandler())
	prog, status := acn.GetBootstrapStatus()
	statusHandler(prog, status)

	return app
}

// ListProfiles returns a map of onions to their profile's Name
func (app *application) ListProfiles() []string {
	var keys []string

	app.peerLock.Lock()
	defer app.peerLock.Unlock()
	for handle := range app.peers {
		keys = append(keys, handle)
	}
	return keys
}

// GetPeer returns a cwtchPeer for a given onion address
func (app *application) GetPeer(onion string) peer.CwtchPeer {
	if peer, ok := app.peers[onion]; ok {
		return peer
	}
	return nil
}

func (ap *application) AddPlugin(peerid string, id plugins.PluginID, bus event.Manager, acn connectivity.ACN) {
	if _, exists := ap.plugins.Load(peerid); !exists {
		ap.plugins.Store(peerid, []plugins.Plugin{})
	}

	pluginsinf, _ := ap.plugins.Load(peerid)
	peerPlugins := pluginsinf.([]plugins.Plugin)

	newp, err := plugins.Get(id, bus, acn, peerid)
	if err == nil {
		newp.Start()
		peerPlugins = append(peerPlugins, newp)
		log.Debugf("storing plugin for %v %v", peerid, peerPlugins)
		ap.plugins.Store(peerid, peerPlugins)
	} else {
		log.Errorf("error adding plugin: %v", err)
	}
}

func (app *application) CreateTaggedPeer(name string, password string, tag string) {
	app.appmutex.Lock()
	defer app.appmutex.Unlock()

	profileDirectory := path.Join(app.directory, "profiles", model.GenerateRandomID())

	profile, err := peer.CreateEncryptedStorePeer(profileDirectory, name, password)
	if err != nil {
		log.Errorf("Error Creating Peer: %v", err)
		app.appBus.Publish(event.NewEventList(event.PeerError, event.Error, err.Error()))
		return
	}

	eventBus := event.NewEventManager()
	app.eventBuses[profile.GetOnion()] = eventBus
	profile.Init(app.eventBuses[profile.GetOnion()])
	app.peers[profile.GetOnion()] = profile

	if tag != "" {
		profile.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag)
	}

	app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.True}))
}

func (app *application) DeletePeer(onion string, password string) {
	log.Infof("DeletePeer called on %v\n", onion)
	app.appmutex.Lock()
	defer app.appmutex.Unlock()

	if app.peers[onion].CheckPassword(password) {
		app.shutdownPeer(onion)
		app.peers[onion].Delete()

		// Shutdown and Remove the Engine

		log.Debugf("Delete peer for %v Done\n", onion)
		app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion))
		return
	}
	app.appBus.Publish(event.NewEventList(event.AppError, event.Error, event.PasswordMatchError, event.Identity, onion))
}

func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) {
	app.AddPlugin(onion, pluginID, app.eventBuses[onion], app.acn)
}

func (app *application) ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error) {
	profileDirectory := path.Join(app.directory, "profiles")
	profile, err := peer.ImportProfile(exportedCwtchFile, profileDirectory, password)
	if profile != nil || err == nil {
		app.installProfile(profile)
	}
	return profile, err
}

// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
func (app *application) LoadProfiles(password string) {
	count := 0
	migrating := false

	files, err := os.ReadDir(path.Join(app.directory, "profiles"))
	if err != nil {
		log.Errorf("error: cannot read profiles directory: %v", err)
		return
	}

	for _, file := range files {
		// Attempt to load an encrypted database
		profileDirectory := path.Join(app.directory, "profiles", file.Name())
		profile, err := peer.FromEncryptedDatabase(profileDirectory, password)
		loaded := false
		if err == nil {
			// return the load the profile...
			log.Infof("loading profile from new-type storage database...")
			loaded = app.installProfile(profile)
		} else { // On failure attempt to load a legacy profile
			profileStore, err := storage.LoadProfileWriterStore(profileDirectory, password)
			if err != nil {
				continue
			}
			log.Infof("found legacy profile. importing to new database structure...")
			legacyProfile := profileStore.GetProfileCopy(true)
			if !migrating {
				migrating = true
				app.appBus.Publish(event.NewEventList(event.StartingStorageMiragtion))
			}

			cps, err := peer.CreateEncryptedStore(profileDirectory, password)
			if err != nil {
				log.Errorf("error creating encrypted store: %v", err)
			}
			profile := peer.ImportLegacyProfile(legacyProfile, cps)
			loaded = app.installProfile(profile)
		}
		if loaded {
			count++
		}
	}
	if count == 0 {
		message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0)
		app.appBus.Publish(message)
	}
	if migrating {
		app.appBus.Publish(event.NewEventList(event.DoneStorageMigration))
	}
}

// installProfile takes a profile and if it isn't loaded in the app, installs it and returns true
func (app *application) installProfile(profile peer.CwtchPeer) bool {
	app.appmutex.Lock()
	defer app.appmutex.Unlock()

	// Only attempt to finalize the profile if we don't have one loaded...
	if app.peers[profile.GetOnion()] == nil {
		eventBus := event.NewEventManager()
		app.eventBuses[profile.GetOnion()] = eventBus
		profile.Init(app.eventBuses[profile.GetOnion()])
		app.peers[profile.GetOnion()] = profile
		app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False}))
		return true
	}
	// Otherwise shutdown the connections
	profile.Shutdown()
	return false
}

-/// ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online
+// ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online
func (app *application) ActivePeerEngine(onion string, doListen, doPeers, doServers bool) {
	profile := app.GetPeer(onion)
	if profile != nil {
		app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
		if doListen {
			profile.Listen()
		}
		if doPeers {
			profile.StartPeersConnections()
		}
		if doServers {
			profile.StartServerConnections()
		}
	}
}

-/// DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline
+// DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline
func (app *application) DeactivatePeerEngine(onion string) {
	if engine, exists := app.engines[onion]; exists {
		engine.Shutdown()
		delete(app.engines, onion)
	}
}

// GetPrimaryBus returns the bus the Application uses for events that aren't peer specific
func (app *application) GetPrimaryBus() event.Manager {
	return app.appBus
}

// GetEventBus returns a cwtchPeer's event bus
func (app *application) GetEventBus(onion string) event.Manager {
	if manager, ok := app.eventBuses[onion]; ok {
		return manager
	}
	return nil
}

func (app *application) getACNStatusHandler() func(int, string) {
	return func(progress int, status string) {
		progStr := strconv.Itoa(progress)
		app.appmutex.Lock()
		app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status))
		for _, bus := range app.eventBuses {
			bus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status))
		}
		app.appmutex.Unlock()
	}
}

func (app *application) getACNVersionHandler() func(string) {
	return func(version string) {
		app.appmutex.Lock()
		defer app.appmutex.Unlock()
		app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version))
	}
}

func (app *application) QueryACNStatus() {
	prog, status := app.acn.GetBootstrapStatus()
	app.getACNStatusHandler()(prog, status)
}

func (app *application) QueryACNVersion() {
	version := app.acn.GetVersion()
	app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version))
}

// ShutdownPeer shuts down a peer and removes it from the app's management
func (app *application) ShutdownPeer(onion string) {
	app.appmutex.Lock()
	defer app.appmutex.Unlock()
	app.shutdownPeer(onion)
}

-/// shutdownPeer mutex unlocked helper shutdown peer
+// shutdownPeer mutex unlocked helper shutdown peer
func (app *application) shutdownPeer(onion string) {
	app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion))
	app.eventBuses[onion].Shutdown()
	delete(app.eventBuses, onion)
	app.peers[onion].Shutdown()
	delete(app.peers, onion)
	if _, ok := app.engines[onion]; ok {
		app.engines[onion].Shutdown()
		delete(app.engines, onion)
	}
	log.Debugf("shutting down plugins for %v", onion)
	pluginsI, ok := app.plugins.Load(onion)
	if ok {
		plugins := pluginsI.([]plugins.Plugin)
		for _, plugin := range plugins {
			log.Debugf("shutting down plugin: %v", plugin)
			plugin.Shutdown()
		}
	}
	app.plugins.Delete(onion)
}

// Shutdown shutsdown all peers of an app
func (app *application) Shutdown() {
	app.appmutex.Lock()
	defer app.appmutex.Unlock()
	for id := range app.peers {
		log.Debugf("Shutting Down Peer %v", id)
		app.shutdownPeer(id)
	}
	log.Debugf("Shutting Down App")
	app.appBus.Shutdown()
	log.Debugf("Shut Down Complete")
}
@@ -0,0 +1,43 @@
+package plugins
+
+import (
+	"cwtch.im/cwtch/event"
+	"git.openprivacy.ca/openprivacy/log"
+	"time"
+)
+
+const antispamTickTime = 30 * time.Second
+
+type antispam struct {
+	bus       event.Manager
+	queue     event.Queue
+	breakChan chan bool
+}
+
+func (a *antispam) Start() {
+	go a.run()
+}
+
+func (a antispam) Shutdown() {
+	a.breakChan <- true
+}
+
+func (a *antispam) run() {
+	log.Infof("running antispam trigger plugin")
+	for {
+		select {
+		case <-time.After(antispamTickTime):
+			// no fuss, just trigger the check. Downstream will filter out superfluous actions
+			a.bus.Publish(event.NewEvent(event.TriggerAntispamCheck, map[event.Field]string{}))
+			continue
+		case <-a.breakChan:
+			return
+		}
+	}
+}
+
+// NewAntiSpam returns a Plugin that when started will trigger antispam payments on a regular interval
+func NewAntiSpam(bus event.Manager) Plugin {
+	cr := &antispam{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1)}
+	return cr
+}
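Not part of the diff, but for orientation: with the registry changes below (the new ANTISPAM PluginID and the matching plugins.Get case), a front end could attach this trigger loop to a running profile through the existing Application interface. A sketch, assuming an Application value `app` and a profile onion string `onion` are in scope:

```go
// Hypothetical wiring: AddPeerPlugin routes through plugins.Get, which
// constructs NewAntiSpam(bus) and calls Start(), beginning the
// 30-second TriggerAntispamCheck publish loop for this profile.
app.AddPeerPlugin(onion, plugins.ANTISPAM)
```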
@@ -8,8 +8,8 @@ import (
	"time"
)

-const tickTime = 5 * time.Second
-const maxBackoff int = 64 // 320 seconds or ~5 min
+const tickTime = 30 * time.Second
+const maxBackoff int = 10 // 320 seconds or ~5 min

type connectionType int
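(Arithmetic check on the carried-over comment: the old constants allowed up to 64 ticks × 5 s = 320 s of backoff, and the new ones allow 10 ticks × 30 s = 300 s, still roughly five minutes, while the ticker fires a sixth as often. This assumes the backoff counts ticks linearly, which is what makes the old comment's "320 seconds" come out exactly.)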
@@ -13,6 +13,7 @@ type PluginID int
const (
	CONNECTIONRETRY PluginID = iota
	NETWORKCHECK
+	ANTISPAM
)

// Plugin is the interface for a plugin
@@ -28,6 +29,8 @@ func Get(id PluginID, bus event.Manager, acn connectivity.ACN, onion string) (Pl
		return NewConnectionRetry(bus, onion), nil
	case NETWORKCHECK:
		return NewNetworkCheck(onion, bus, acn), nil
+	case ANTISPAM:
+		return NewAntiSpam(bus), nil
	}

	return nil, fmt.Errorf("plugin not defined %v", id)
@@ -201,7 +201,9 @@ const (
	StartingStorageMiragtion = Type("StartingStorageMigration")
	DoneStorageMigration     = Type("DoneStorageMigration")

-	TokenManagerInfo = Type("TokenManagerInfo")
+	TokenManagerInfo     = Type("TokenManagerInfo")
+	TriggerAntispamCheck = Type("TriggerAntispamCheck")
+	MakeAntispamPayment  = Type("MakeAntispamPayment")
)

// Field defines common event attributes
@@ -14,6 +14,7 @@ import (
	"git.openprivacy.ca/openprivacy/connectivity/tor"
	"golang.org/x/crypto/ed25519"
	"math/bits"
+	rand2 "math/rand"
	"os"
	path "path/filepath"
	"runtime"
@@ -37,7 +38,7 @@ var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true
	event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeerEngine: true,
	event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToPeerError: true, event.SendMessageToGroupError: true,
	event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true,
-	event.ManifestSizeReceived: true, event.ManifestReceived: true, event.FileDownloaded: true}
+	event.ManifestSizeReceived: true, event.ManifestReceived: true, event.FileDownloaded: true, event.TriggerAntispamCheck: true}

// DefaultEventsToHandle specifies which events will be subscribed to
//
@@ -58,6 +59,7 @@ var DefaultEventsToHandle = []event.Type{
	event.NewRetValMessageFromPeer,
	event.ManifestReceived,
	event.FileDownloaded,
+	event.TriggerAntispamCheck,
}

// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
@@ -783,7 +785,13 @@ func (cp *cwtchPeer) GetPeerState(handle string) connections.ConnectionState {
// PeerWithOnion initiates a request to the Protocol Engine to set up Cwtch Session with a given tor v3 onion
// address.
func (cp *cwtchPeer) PeerWithOnion(onion string) {
-	cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion}))
+	go func() {
+		// wait a random number of seconds before triggering
+		// this cuts down on contention in the event
+		randWait := time.Duration(rand2.Int() % 60)
+		time.Sleep(randWait * time.Second)
+		cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion}))
+	}()
}

// SendInviteToConversation kicks off the invite process
@@ -918,6 +926,14 @@ func (cp *cwtchPeer) JoinServer(onion string) error {
	return errors.New("no keys found for server connection")
}

+// MakeAntispamPayment allows a peer to retrigger antispam, important if the initial connection somehow fails...
+// TODO in the future we might want to expose this in CwtchPeer interface
+// Additionally we may want to add extra checks here to deduplicate groups from tokenservers to cut down
+// on the number of events (right now it should be minimal)
+func (cp *cwtchPeer) MakeAntispamPayment(server string) {
+	cp.eventBus.Publish(event.NewEvent(event.MakeAntispamPayment, map[event.Field]string{event.GroupServer: server}))
+}
+
// ResyncServer completely tears down and resyncs a new server connection with the given handle
func (cp *cwtchPeer) ResyncServer(handle string) error {
	ci, err := cp.FetchConversationInfo(handle)
@@ -1305,6 +1321,13 @@ func (cp *cwtchPeer) eventHandler() {
				cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), mostRecentTime.Format(time.RFC3339Nano))
			}
		}
+	case event.TriggerAntispamCheck:
+		conversations, _ := cp.FetchConversations()
+		for _, conversation := range conversations {
+			if conversation.IsServer() {
+				cp.MakeAntispamPayment(conversation.Handle)
+			}
+		}
	default:
		if ev.EventType != "" {
			log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType)
@@ -116,6 +116,9 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
	engine.eventManager.Subscribe(event.ManifestSizeReceived, engine.queue)
	engine.eventManager.Subscribe(event.ManifestSaved, engine.queue)

+	// Token Server
+	engine.eventManager.Subscribe(event.MakeAntispamPayment, engine.queue)
+
	for peer, authorization := range peerAuthorizations {
		engine.authorizations.Store(peer, authorization)
	}
@@ -160,6 +163,8 @@ func (e *engine) eventHandler() {
			signature = []byte{}
		}
		go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
+	case event.MakeAntispamPayment:
+		go e.makeAntispamPayment(ev.Data[event.GroupServer])
	case event.LeaveServer:
		e.leaveServer(ev.Data[event.GroupServer])
	case event.DeleteContact:
@@ -335,6 +340,31 @@ func (e *engine) peerWithOnion(onion string) {
	}
}

+func (e *engine) makeAntispamPayment(onion string) {
+	log.Debugf("making antispam payment")
+	e.ephemeralServicesLock.Lock()
+	ephemeralService, ok := e.ephemeralServices[onion]
+	e.ephemeralServicesLock.Unlock()
+
+	if ephemeralService == nil || !ok {
+		log.Debugf("could not find associated group for antispam payment")
+		return
+	}
+
+	conn, err := ephemeralService.service.GetConnection(onion)
+	if err == nil {
+		tokenApp, ok := (conn.App()).(*TokenBoardClient)
+		if ok {
+			tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, new(TokenManager))
+			tokenManager := tokenManagerPointer.(*TokenManager)
+			log.Debugf("checking antispam tokens %v", tokenManager.NumTokens())
+			if tokenManager.NumTokens() < 5 {
+				go tokenApp.MakePayment()
+			}
+		}
+	}
+}
+
// peerWithTokenServer is the entry point for cwtchPeer - server relationships
// needs to be run in a goroutine as will block on Open.
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) {
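TokenManager itself is not shown in this PR. As a rough sketch of the shape the call sites above imply: a zero value must be usable (it is created via `new(TokenManager)` inside `LoadOrStore`) and `NumTokens` must be safe to call while payments run concurrently. Everything here beyond the `NumTokens` name is an assumption:

```go
package connections

import "sync"

// Sketch only, not the real implementation: the actual manager holds
// privacy-pass tokens for a token server. The element type is elided
// because it is not visible in this diff.
type TokenManager struct {
	lock   sync.Mutex
	tokens []interface{} // stands in for the real token type
}

// NumTokens reports how many unspent tokens are held; used above to
// decide whether another antispam payment is worth making.
func (tm *TokenManager) NumTokens() int {
	tm.lock.Lock()
	defer tm.lock.Unlock()
	return len(tm.tokens)
}
```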
@@ -86,6 +86,8 @@ func (pa *PeerApp) Init(connection tapir.Connection) {
		}
	} else {
		// The auth protocol wasn't completed, we can safely shutdown the connection
+		// send an onclose here because we *may* have triggered this and we want to retry later...
+		pa.OnClose(connection.Hostname())
		connection.Close()
	}
}
@@ -2,8 +2,8 @@ package connections

import (
	"cwtch.im/cwtch/protocol/groups"
+	"cwtch.im/cwtch/utils"
	"encoding/json"
	"errors"
	"git.openprivacy.ca/cwtch.im/tapir"
	"git.openprivacy.ca/cwtch.im/tapir/applications"
	"git.openprivacy.ca/cwtch.im/tapir/networks/tor"
@@ -191,28 +191,30 @@ func (ta *TokenBoardClient) MakePayment() error {
	connected, err := client.Connect(ta.tokenServiceOnion, powTokenApp)
	if connected && err == nil {
		log.Debugf("Waiting for successful Token Acquisition...")
-		conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability)
-		if err == nil {
-			powtapp, ok := conn.App().(*applications.TokenApplication)
-			if ok {
-				log.Debugf("Updating Tokens")
-				ta.tokenBoardHandler.NewTokenHandler(ta.tokenServiceOnion, powtapp.Tokens)
-				log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit())
-				conn.Close()
-				return nil
-			}
-			log.Errorf("invalid cast of powapp. this should never happen %v %v", powtapp, reflect.TypeOf(conn.App()))
-			return errors.New("invalid cast of powapp. this should never happen")
-		}
-		log.Debugf("could not connect to payment server..trying again: %v", err)
-		return ta.MakePayment()
+		tp := utils.TimeoutPolicy(time.Second * 30)
+		err := tp.ExecuteAction(func() error {
+			conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability)
+			if err == nil {
+				powtapp, ok := conn.App().(*applications.TokenApplication)
+				if ok {
+					log.Debugf("Updating Tokens")
+					ta.tokenBoardHandler.NewTokenHandler(ta.tokenServiceOnion, powtapp.Tokens)
+					log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit())
+					conn.Close()
+					return nil
+				}
+				log.Errorf("invalid cast of powapp. this should never happen %v %v", powtapp, reflect.TypeOf(conn.App()))
+				return nil
+			}
+			return nil
+		})
+
+		// we timed out
+		if err != nil {
+			ta.connection.Close()
+		}
+		return err
	} else if connected && err != nil {
		log.Debugf("inexplicable error: %v", err)
	}
	log.Debugf("failed to make a connection. trying again...")
+	// it doesn't actually take that long to make a payment, so waiting a small amount of time should suffice
	time.Sleep(time.Second)
	return ta.MakePayment()
}

// NextToken retrieves the next token
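`utils.TimeoutPolicy` is new in this PR, but its definition is not among the hunks shown. A deadline wrapper along the following lines would satisfy the call site (`utils.TimeoutPolicy(time.Second * 30)` followed by `tp.ExecuteAction(func() error {...})`); every name beyond those two, including the error value, is an assumption:

```go
package utils

import (
	"errors"
	"time"
)

// ErrTimeout is returned when an action outlives its deadline.
// (Assumed name; only TimeoutPolicy and ExecuteAction appear in the diff.)
var ErrTimeout = errors.New("action timed out")

// TimeoutPolicy bounds how long a blocking action may run.
type TimeoutPolicy time.Duration

// ExecuteAction runs action in its own goroutine and returns its error,
// or ErrTimeout once the deadline passes. The buffered channel matters:
// if the action eventually finishes after a timeout, its send still
// succeeds and the goroutine does not stay blocked forever.
func (tp TimeoutPolicy) ExecuteAction(action func() error) error {
	result := make(chan error, 1)
	go func() {
		result <- action()
	}()
	select {
	case err := <-result:
		return err
	case <-time.After(time.Duration(tp)):
		return ErrTimeout
	}
}
```

This matches how MakePayment now uses it: a non-nil error from ExecuteAction is treated as a timeout and the token-server connection is closed, instead of blocking on WaitForCapabilityOrClose indefinitely.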
This logic here will slow down later contact adds. What about adding this inside the loop in StartPeersConnections and StartServerConnections, as that's where our bulk connection problems are, not later one-offs?

Also, have you found that this helps at all? My experiment with it hasn't made any noticeable change.
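To make that suggestion concrete: rather than sleeping inside PeerWithOnion, which now delays every later one-off contact add by up to a minute, the jitter could live only in the bulk startup paths. A hypothetical sketch; StartPeersConnections' real body is not part of this diff, and GetContacts stands in for however the peer actually enumerates its contacts:

```go
// Hypothetical: jitter only the bulk path, so a single contact added
// later connects immediately while startup bursts stay spread out.
func (cp *cwtchPeer) StartPeersConnections() {
	for _, handle := range cp.GetContacts() {
		handle := handle // capture the loop variable for the goroutine
		go func() {
			// spread the initial burst over ~a minute to reduce
			// contention on the event bus
			time.Sleep(time.Duration(rand2.Int()%60) * time.Second)
			cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: handle}))
		}()
	}
}
```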