Merge pull request 'New Cwtch Tool App, Extract MakePayment into a standalone function, Fix race condition in engine.' (#468) from indents_and_tools into master
continuous-integration/drone/push Build is pending Details

Reviewed-on: #468
This commit is contained in:
Dan Ballard 2022-10-03 21:03:47 +00:00
commit 8a1f9376e2
7 changed files with 468 additions and 305 deletions

5
.gitignore vendored
View File

@ -28,4 +28,7 @@ tokens1.db
arch/ arch/
testing/encryptedstorage/encrypted_storage_profiles testing/encryptedstorage/encrypted_storage_profiles
testing/encryptedstorage/tordir testing/encryptedstorage/tordir
*.tar.gz *.tar.gz
data-dir-cwtchtool/
tokens
tordir/

View File

@ -1,57 +1,57 @@
package app package app
import ( import (
"cwtch.im/cwtch/app/plugins" "cwtch.im/cwtch/app/plugins"
"cwtch.im/cwtch/event" "cwtch.im/cwtch/event"
"cwtch.im/cwtch/model" "cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants" "cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer" "cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/storage" "cwtch.im/cwtch/storage"
"git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/log" "git.openprivacy.ca/openprivacy/log"
"os" "os"
path "path/filepath" path "path/filepath"
"strconv" "strconv"
"sync" "sync"
) )
type application struct { type application struct {
eventBuses map[string]event.Manager eventBuses map[string]event.Manager
directory string directory string
peerLock sync.Mutex peerLock sync.Mutex
peers map[string]peer.CwtchPeer peers map[string]peer.CwtchPeer
acn connectivity.ACN acn connectivity.ACN
plugins sync.Map //map[string] []plugins.Plugin plugins sync.Map //map[string] []plugins.Plugin
engines map[string]connections.Engine engines map[string]connections.Engine
appBus event.Manager appBus event.Manager
appmutex sync.Mutex appmutex sync.Mutex
} }
// Application is a full cwtch peer application. It allows management, usage and storage of multiple peers // Application is a full cwtch peer application. It allows management, usage and storage of multiple peers
type Application interface { type Application interface {
LoadProfiles(password string) LoadProfiles(password string)
CreateTaggedPeer(name string, password string, tag string) CreateTaggedPeer(name string, password string, tag string)
ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error) ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error)
DeletePeer(onion string, currentPassword string) DeletePeer(onion string, currentPassword string)
AddPeerPlugin(onion string, pluginID plugins.PluginID) AddPeerPlugin(onion string, pluginID plugins.PluginID)
GetPrimaryBus() event.Manager GetPrimaryBus() event.Manager
GetEventBus(onion string) event.Manager GetEventBus(onion string) event.Manager
QueryACNStatus() QueryACNStatus()
QueryACNVersion() QueryACNVersion()
ActivePeerEngine(onion string, doListen, doPeers, doServers bool) ActivePeerEngine(onion string, doListen, doPeers, doServers bool)
DeactivatePeerEngine(onion string) DeactivatePeerEngine(onion string)
ShutdownPeer(string) ShutdownPeer(string)
Shutdown() Shutdown()
GetPeer(onion string) peer.CwtchPeer GetPeer(onion string) peer.CwtchPeer
ListProfiles() []string ListProfiles() []string
} }
// LoadProfileFn is the function signature for a function in an app that loads a profile // LoadProfileFn is the function signature for a function in an app that loads a profile
@ -59,295 +59,295 @@ type LoadProfileFn func(profile peer.CwtchPeer)
// NewApp creates a new app with some environment awareness and initializes a Tor Manager // NewApp creates a new app with some environment awareness and initializes a Tor Manager
func NewApp(acn connectivity.ACN, appDirectory string) Application { func NewApp(acn connectivity.ACN, appDirectory string) Application {
log.Debugf("NewApp(%v)\n", appDirectory) log.Debugf("NewApp(%v)\n", appDirectory)
os.MkdirAll(path.Join(appDirectory, "profiles"), 0700) os.MkdirAll(path.Join(appDirectory, "profiles"), 0700)
app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager()} app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager()}
app.peers = make(map[string]peer.CwtchPeer) app.peers = make(map[string]peer.CwtchPeer)
app.acn = acn app.acn = acn
statusHandler := app.getACNStatusHandler() statusHandler := app.getACNStatusHandler()
acn.SetStatusCallback(statusHandler) acn.SetStatusCallback(statusHandler)
acn.SetVersionCallback(app.getACNVersionHandler()) acn.SetVersionCallback(app.getACNVersionHandler())
prog, status := acn.GetBootstrapStatus() prog, status := acn.GetBootstrapStatus()
statusHandler(prog, status) statusHandler(prog, status)
return app return app
} }
// ListProfiles returns a map of onions to their profile's Name // ListProfiles returns a map of onions to their profile's Name
func (app *application) ListProfiles() []string { func (app *application) ListProfiles() []string {
var keys []string var keys []string
app.peerLock.Lock() app.peerLock.Lock()
defer app.peerLock.Unlock() defer app.peerLock.Unlock()
for handle := range app.peers { for handle := range app.peers {
keys = append(keys, handle) keys = append(keys, handle)
} }
return keys return keys
} }
// GetPeer returns a cwtchPeer for a given onion address // GetPeer returns a cwtchPeer for a given onion address
func (app *application) GetPeer(onion string) peer.CwtchPeer { func (app *application) GetPeer(onion string) peer.CwtchPeer {
if peer, ok := app.peers[onion]; ok { if peer, ok := app.peers[onion]; ok {
return peer return peer
} }
return nil return nil
} }
func (ap *application) AddPlugin(peerid string, id plugins.PluginID, bus event.Manager, acn connectivity.ACN) { func (ap *application) AddPlugin(peerid string, id plugins.PluginID, bus event.Manager, acn connectivity.ACN) {
if _, exists := ap.plugins.Load(peerid); !exists { if _, exists := ap.plugins.Load(peerid); !exists {
ap.plugins.Store(peerid, []plugins.Plugin{}) ap.plugins.Store(peerid, []plugins.Plugin{})
} }
pluginsinf, _ := ap.plugins.Load(peerid) pluginsinf, _ := ap.plugins.Load(peerid)
peerPlugins := pluginsinf.([]plugins.Plugin) peerPlugins := pluginsinf.([]plugins.Plugin)
newp, err := plugins.Get(id, bus, acn, peerid) newp, err := plugins.Get(id, bus, acn, peerid)
if err == nil { if err == nil {
newp.Start() newp.Start()
peerPlugins = append(peerPlugins, newp) peerPlugins = append(peerPlugins, newp)
log.Debugf("storing plugin for %v %v", peerid, peerPlugins) log.Debugf("storing plugin for %v %v", peerid, peerPlugins)
ap.plugins.Store(peerid, peerPlugins) ap.plugins.Store(peerid, peerPlugins)
} else { } else {
log.Errorf("error adding plugin: %v", err) log.Errorf("error adding plugin: %v", err)
} }
} }
func (app *application) CreateTaggedPeer(name string, password string, tag string) { func (app *application) CreateTaggedPeer(name string, password string, tag string) {
app.appmutex.Lock() app.appmutex.Lock()
defer app.appmutex.Unlock() defer app.appmutex.Unlock()
profileDirectory := path.Join(app.directory, "profiles", model.GenerateRandomID()) profileDirectory := path.Join(app.directory, "profiles", model.GenerateRandomID())
profile, err := peer.CreateEncryptedStorePeer(profileDirectory, name, password) profile, err := peer.CreateEncryptedStorePeer(profileDirectory, name, password)
if err != nil { if err != nil {
log.Errorf("Error Creating Peer: %v", err) log.Errorf("Error Creating Peer: %v", err)
app.appBus.Publish(event.NewEventList(event.PeerError, event.Error, err.Error())) app.appBus.Publish(event.NewEventList(event.PeerError, event.Error, err.Error()))
return return
} }
eventBus := event.NewEventManager() eventBus := event.NewEventManager()
app.eventBuses[profile.GetOnion()] = eventBus app.eventBuses[profile.GetOnion()] = eventBus
profile.Init(app.eventBuses[profile.GetOnion()]) profile.Init(app.eventBuses[profile.GetOnion()])
app.peers[profile.GetOnion()] = profile app.peers[profile.GetOnion()] = profile
if tag != "" { if tag != "" {
profile.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag) profile.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag)
} }
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.True})) app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.True}))
} }
func (app *application) DeletePeer(onion string, password string) { func (app *application) DeletePeer(onion string, password string) {
log.Infof("DeletePeer called on %v\n", onion) log.Infof("DeletePeer called on %v\n", onion)
app.appmutex.Lock() app.appmutex.Lock()
defer app.appmutex.Unlock() defer app.appmutex.Unlock()
if app.peers[onion].CheckPassword(password) { if app.peers[onion].CheckPassword(password) {
app.shutdownPeer(onion) app.shutdownPeer(onion)
app.peers[onion].Delete() app.peers[onion].Delete()
// Shutdown and Remove the Engine // Shutdown and Remove the Engine
log.Debugf("Delete peer for %v Done\n", onion) log.Debugf("Delete peer for %v Done\n", onion)
app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion)) app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion))
return return
} }
app.appBus.Publish(event.NewEventList(event.AppError, event.Error, event.PasswordMatchError, event.Identity, onion)) app.appBus.Publish(event.NewEventList(event.AppError, event.Error, event.PasswordMatchError, event.Identity, onion))
} }
func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) { func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) {
app.AddPlugin(onion, pluginID, app.eventBuses[onion], app.acn) app.AddPlugin(onion, pluginID, app.eventBuses[onion], app.acn)
} }
func (app *application) ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error) { func (app *application) ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error) {
profileDirectory := path.Join(app.directory, "profiles") profileDirectory := path.Join(app.directory, "profiles")
profile, err := peer.ImportProfile(exportedCwtchFile, profileDirectory, password) profile, err := peer.ImportProfile(exportedCwtchFile, profileDirectory, password)
if profile != nil || err == nil { if profile != nil || err == nil {
app.installProfile(profile) app.installProfile(profile)
} }
return profile, err return profile, err
} }
// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them // LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
func (app *application) LoadProfiles(password string) { func (app *application) LoadProfiles(password string) {
count := 0 count := 0
migrating := false migrating := false
files, err := os.ReadDir(path.Join(app.directory, "profiles")) files, err := os.ReadDir(path.Join(app.directory, "profiles"))
if err != nil { if err != nil {
log.Errorf("error: cannot read profiles directory: %v", err) log.Errorf("error: cannot read profiles directory: %v", err)
return return
} }
for _, file := range files { for _, file := range files {
// Attempt to load an encrypted database // Attempt to load an encrypted database
profileDirectory := path.Join(app.directory, "profiles", file.Name()) profileDirectory := path.Join(app.directory, "profiles", file.Name())
profile, err := peer.FromEncryptedDatabase(profileDirectory, password) profile, err := peer.FromEncryptedDatabase(profileDirectory, password)
loaded := false loaded := false
if err == nil { if err == nil {
// return the load the profile... // return the load the profile...
log.Infof("loading profile from new-type storage database...") log.Infof("loading profile from new-type storage database...")
loaded = app.installProfile(profile) loaded = app.installProfile(profile)
} else { // On failure attempt to load a legacy profile } else { // On failure attempt to load a legacy profile
profileStore, err := storage.LoadProfileWriterStore(profileDirectory, password) profileStore, err := storage.LoadProfileWriterStore(profileDirectory, password)
if err != nil { if err != nil {
continue continue
} }
log.Infof("found legacy profile. importing to new database structure...") log.Infof("found legacy profile. importing to new database structure...")
legacyProfile := profileStore.GetProfileCopy(true) legacyProfile := profileStore.GetProfileCopy(true)
if !migrating { if !migrating {
migrating = true migrating = true
app.appBus.Publish(event.NewEventList(event.StartingStorageMiragtion)) app.appBus.Publish(event.NewEventList(event.StartingStorageMiragtion))
} }
cps, err := peer.CreateEncryptedStore(profileDirectory, password) cps, err := peer.CreateEncryptedStore(profileDirectory, password)
if err != nil { if err != nil {
log.Errorf("error creating encrypted store: %v", err) log.Errorf("error creating encrypted store: %v", err)
} }
profile := peer.ImportLegacyProfile(legacyProfile, cps) profile := peer.ImportLegacyProfile(legacyProfile, cps)
loaded = app.installProfile(profile) loaded = app.installProfile(profile)
} }
if loaded { if loaded {
count++ count++
} }
} }
if count == 0 { if count == 0 {
message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0) message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0)
app.appBus.Publish(message) app.appBus.Publish(message)
} }
if migrating { if migrating {
app.appBus.Publish(event.NewEventList(event.DoneStorageMigration)) app.appBus.Publish(event.NewEventList(event.DoneStorageMigration))
} }
} }
// installProfile takes a profile and if it isn't loaded in the app, installs it and returns true // installProfile takes a profile and if it isn't loaded in the app, installs it and returns true
func (app *application) installProfile(profile peer.CwtchPeer) bool { func (app *application) installProfile(profile peer.CwtchPeer) bool {
app.appmutex.Lock() app.appmutex.Lock()
defer app.appmutex.Unlock() defer app.appmutex.Unlock()
// Only attempt to finalize the profile if we don't have one loaded... // Only attempt to finalize the profile if we don't have one loaded...
if app.peers[profile.GetOnion()] == nil { if app.peers[profile.GetOnion()] == nil {
eventBus := event.NewEventManager() eventBus := event.NewEventManager()
app.eventBuses[profile.GetOnion()] = eventBus app.eventBuses[profile.GetOnion()] = eventBus
profile.Init(app.eventBuses[profile.GetOnion()]) profile.Init(app.eventBuses[profile.GetOnion()])
app.peers[profile.GetOnion()] = profile app.peers[profile.GetOnion()] = profile
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False})) app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False}))
return true return true
} }
// Otherwise shutdown the connections // Otherwise shutdown the connections
profile.Shutdown() profile.Shutdown()
return false return false
} }
// ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online // ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online
func (app *application) ActivePeerEngine(onion string, doListen, doPeers, doServers bool) { func (app *application) ActivePeerEngine(onion string, doListen, doPeers, doServers bool) {
profile := app.GetPeer(onion) profile := app.GetPeer(onion)
if profile != nil { if profile != nil {
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
if doListen { if doListen {
profile.Listen() profile.Listen()
} }
if doPeers { if doPeers {
profile.StartPeersConnections() profile.StartPeersConnections()
} }
if doServers { if doServers {
profile.StartServerConnections() profile.StartServerConnections()
} }
} }
} }
// DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline // DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline
func (app *application) DeactivatePeerEngine(onion string) { func (app *application) DeactivatePeerEngine(onion string) {
if engine, exists := app.engines[onion]; exists { if engine, exists := app.engines[onion]; exists {
engine.Shutdown() engine.Shutdown()
delete(app.engines, onion) delete(app.engines, onion)
} }
} }
// GetPrimaryBus returns the bus the Application uses for events that aren't peer specific // GetPrimaryBus returns the bus the Application uses for events that aren't peer specific
func (app *application) GetPrimaryBus() event.Manager { func (app *application) GetPrimaryBus() event.Manager {
return app.appBus return app.appBus
} }
// GetEventBus returns a cwtchPeer's event bus // GetEventBus returns a cwtchPeer's event bus
func (app *application) GetEventBus(onion string) event.Manager { func (app *application) GetEventBus(onion string) event.Manager {
if manager, ok := app.eventBuses[onion]; ok { if manager, ok := app.eventBuses[onion]; ok {
return manager return manager
} }
return nil return nil
} }
func (app *application) getACNStatusHandler() func(int, string) { func (app *application) getACNStatusHandler() func(int, string) {
return func(progress int, status string) { return func(progress int, status string) {
progStr := strconv.Itoa(progress) progStr := strconv.Itoa(progress)
app.appmutex.Lock() app.appmutex.Lock()
app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status)) app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status))
for _, bus := range app.eventBuses { for _, bus := range app.eventBuses {
bus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status)) bus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status))
} }
app.appmutex.Unlock() app.appmutex.Unlock()
} }
} }
func (app *application) getACNVersionHandler() func(string) { func (app *application) getACNVersionHandler() func(string) {
return func(version string) { return func(version string) {
app.appmutex.Lock() app.appmutex.Lock()
defer app.appmutex.Unlock() defer app.appmutex.Unlock()
app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version)) app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version))
} }
} }
func (app *application) QueryACNStatus() { func (app *application) QueryACNStatus() {
prog, status := app.acn.GetBootstrapStatus() prog, status := app.acn.GetBootstrapStatus()
app.getACNStatusHandler()(prog, status) app.getACNStatusHandler()(prog, status)
} }
func (app *application) QueryACNVersion() { func (app *application) QueryACNVersion() {
version := app.acn.GetVersion() version := app.acn.GetVersion()
app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version)) app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version))
} }
// ShutdownPeer shuts down a peer and removes it from the app's management // ShutdownPeer shuts down a peer and removes it from the app's management
func (app *application) ShutdownPeer(onion string) { func (app *application) ShutdownPeer(onion string) {
app.appmutex.Lock() app.appmutex.Lock()
defer app.appmutex.Unlock() defer app.appmutex.Unlock()
app.shutdownPeer(onion) app.shutdownPeer(onion)
} }
// shutdownPeer mutex unlocked helper shutdown peer // shutdownPeer mutex unlocked helper shutdown peer
func (app *application) shutdownPeer(onion string) { func (app *application) shutdownPeer(onion string) {
app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion)) app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion))
app.eventBuses[onion].Shutdown() app.eventBuses[onion].Shutdown()
delete(app.eventBuses, onion) delete(app.eventBuses, onion)
app.peers[onion].Shutdown() app.peers[onion].Shutdown()
delete(app.peers, onion) delete(app.peers, onion)
if _, ok := app.engines[onion]; ok { if _, ok := app.engines[onion]; ok {
app.engines[onion].Shutdown() app.engines[onion].Shutdown()
delete(app.engines, onion) delete(app.engines, onion)
} }
log.Debugf("shutting down plugins for %v", onion) log.Debugf("shutting down plugins for %v", onion)
pluginsI, ok := app.plugins.Load(onion) pluginsI, ok := app.plugins.Load(onion)
if ok { if ok {
plugins := pluginsI.([]plugins.Plugin) plugins := pluginsI.([]plugins.Plugin)
for _, plugin := range plugins { for _, plugin := range plugins {
log.Debugf("shutting down plugin: %v", plugin) log.Debugf("shutting down plugin: %v", plugin)
plugin.Shutdown() plugin.Shutdown()
} }
} }
app.plugins.Delete(onion) app.plugins.Delete(onion)
} }
// Shutdown shutsdown all peers of an app // Shutdown shutsdown all peers of an app
func (app *application) Shutdown() { func (app *application) Shutdown() {
app.appmutex.Lock() app.appmutex.Lock()
defer app.appmutex.Unlock() defer app.appmutex.Unlock()
for id := range app.peers { for id := range app.peers {
log.Debugf("Shutting Down Peer %v", id) log.Debugf("Shutting Down Peer %v", id)
app.shutdownPeer(id) app.shutdownPeer(id)
} }
log.Debugf("Shutting Down App") log.Debugf("Shutting Down App")
app.appBus.Shutdown() app.appBus.Shutdown()
log.Debugf("Shut Down Complete") log.Debugf("Shut Down Complete")
} }

View File

@ -7,6 +7,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"cwtch.im/cwtch/event" "cwtch.im/cwtch/event"
@ -63,7 +64,7 @@ type engine struct {
tokenManagers sync.Map // [tokenService][]TokenManager tokenManagers sync.Map // [tokenService][]TokenManager
shuttingDown bool shuttingDown atomic.Bool
} }
// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections. // Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
@ -139,7 +140,7 @@ func (e *engine) eventHandler() {
for { for {
ev := e.queue.Next() ev := e.queue.Next()
// optimistic shutdown... // optimistic shutdown...
if e.shuttingDown { if e.shuttingDown.Load() {
return return
} }
switch ev.EventType { switch ev.EventType {
@ -297,7 +298,7 @@ func (e *engine) createPeerTemplate() *PeerApp {
// Listen sets up an onion listener to process incoming cwtch messages // Listen sets up an onion listener to process incoming cwtch messages
func (e *engine) listenFn() { func (e *engine) listenFn() {
err := e.service.Listen(e.createPeerTemplate()) err := e.service.Listen(e.createPeerTemplate())
if !e.shuttingDown { if !e.shuttingDown.Load() {
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname(), event.Error: err.Error()})) e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname(), event.Error: err.Error()}))
} }
} }
@ -307,7 +308,8 @@ func (e *engine) Shutdown() {
// don't accept any more events... // don't accept any more events...
e.queue.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{})) e.queue.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{}))
e.service.Shutdown() e.service.Shutdown()
e.shuttingDown = true
e.shuttingDown.Store(true)
e.ephemeralServicesLock.Lock() e.ephemeralServicesLock.Lock()
defer e.ephemeralServicesLock.Unlock() defer e.ephemeralServicesLock.Unlock()
@ -368,7 +370,7 @@ func (e *engine) makeAntispamPayment(onion string) {
tokenManager := tokenManagerPointer.(*TokenManager) tokenManager := tokenManagerPointer.(*TokenManager)
log.Debugf("checking antispam tokens %v", tokenManager.NumTokens()) log.Debugf("checking antispam tokens %v", tokenManager.NumTokens())
if tokenManager.NumTokens() < 5 { if tokenManager.NumTokens() < 5 {
go tokenApp.MakePayment() go tokenApp.PurchaseTokens()
} }
} }
} }
@ -431,7 +433,7 @@ func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, toke
func (e *engine) ignoreOnShutdown(f func(string)) func(string) { func (e *engine) ignoreOnShutdown(f func(string)) func(string) {
return func(x string) { return func(x string) {
if !e.shuttingDown { if !e.shuttingDown.Load() {
f(x) f(x)
} }
} }
@ -439,7 +441,7 @@ func (e *engine) ignoreOnShutdown(f func(string)) func(string) {
func (e *engine) ignoreOnShutdown2(f func(string, string)) func(string, string) { func (e *engine) ignoreOnShutdown2(f func(string, string)) func(string, string) {
return func(x, y string) { return func(x, y string) {
if !e.shuttingDown { if !e.shuttingDown.Load() {
f(x, y) f(x, y)
} }
} }
@ -615,7 +617,7 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
if ok { if ok {
if spent, numtokens := tokenApp.Post(ct, sig); !spent { if spent, numtokens := tokenApp.Post(ct, sig); !spent {
// we failed to post, probably because we ran out of tokens... so make a payment // we failed to post, probably because we ran out of tokens... so make a payment
go tokenApp.MakePayment() go tokenApp.PurchaseTokens()
// backoff // backoff
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
// try again // try again
@ -623,7 +625,7 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
e.sendMessageToGroup(groupID, server, ct, sig, attempts+1) e.sendMessageToGroup(groupID, server, ct, sig, attempts+1)
} else { } else {
if numtokens < 5 { if numtokens < 5 {
go tokenApp.MakePayment() go tokenApp.PurchaseTokens()
} }
} }
// regardless we return.... // regardless we return....
@ -696,7 +698,7 @@ func (e *engine) handlePeerMessage(hostname string, eventID string, context stri
e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeerEngine, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)})) e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeerEngine, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)}))
// Send an explicit acknowledgement // Send an explicit acknowledgement
// Every other protocol should have a explicit acknowledgement message e.g. value lookups have responses, and file handling has an explicit flow // Every other protocol should have an explicit acknowledgement message e.g. value lookups have responses, and file handling has an explicit flow
if err := e.sendPeerMessage(hostname, pmodel.PeerMessage{ID: eventID, Context: event.ContextAck, Data: []byte{}}); err != nil { if err := e.sendPeerMessage(hostname, pmodel.PeerMessage{ID: eventID, Context: event.ContextAck, Data: []byte{}}); err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: hostname, event.EventID: eventID, event.Error: err.Error()})) e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: hostname, event.EventID: eventID, event.Error: err.Error()}))
} }

View File

@ -0,0 +1,59 @@
package connections
import (
"cwtch.im/cwtch/utils"
"git.openprivacy.ca/cwtch.im/tapir/applications"
"git.openprivacy.ca/cwtch.im/tapir/networks/tor"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/log"
"reflect"
"time"
)
// MakePayment uses the PoW based token protocol to obtain more tokens
func MakePayment(tokenServiceOnion string, tokenService *privacypass.TokenServer, acn connectivity.ACN, handler TokenBoardHandler) error {
log.Debugf("making a payment")
id, sk := primitives.InitializeEphemeralIdentity()
client := new(tor.BaseOnionService)
client.Init(acn, sk, &id)
defer client.Shutdown()
tokenApplication := new(applications.TokenApplication)
tokenApplication.TokenService = tokenService
powTokenApp := new(applications.ApplicationChain).
ChainApplication(new(applications.ProofOfWorkApplication), applications.SuccessfulProofOfWorkCapability).
ChainApplication(tokenApplication, applications.HasTokensCapability)
log.Debugf("waiting for successful PoW auth...")
tp := utils.TimeoutPolicy(time.Second * 30)
err := tp.ExecuteAction(func() error {
connected, err := client.Connect(tokenServiceOnion, powTokenApp)
if connected && err == nil {
log.Debugf("waiting for successful token acquisition...")
conn, err := client.WaitForCapabilityOrClose(tokenServiceOnion, applications.HasTokensCapability)
if err == nil {
powtapp, ok := conn.App().(*applications.TokenApplication)
if ok {
log.Debugf("updating tokens")
handler.NewTokenHandler(tokenServiceOnion, powtapp.Tokens)
log.Debugf("transcript: %v", powtapp.Transcript().OutputTranscriptToAudit())
conn.Close()
return nil
}
log.Errorf("invalid cast of powapp. this should never happen %v %v", powtapp, reflect.TypeOf(conn.App()))
return nil
}
return nil
}
return err
})
// we timed out
if err != nil {
log.Debugf("make payment timeout...")
return err
}
return err
}

View File

@ -2,18 +2,13 @@ package connections
import ( import (
"cwtch.im/cwtch/protocol/groups" "cwtch.im/cwtch/protocol/groups"
"cwtch.im/cwtch/utils"
"encoding/json" "encoding/json"
"git.openprivacy.ca/cwtch.im/tapir" "git.openprivacy.ca/cwtch.im/tapir"
"git.openprivacy.ca/cwtch.im/tapir/applications" "git.openprivacy.ca/cwtch.im/tapir/applications"
"git.openprivacy.ca/cwtch.im/tapir/networks/tor"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/log" "git.openprivacy.ca/openprivacy/log"
"github.com/gtank/ristretto255" "github.com/gtank/ristretto255"
"reflect"
"time"
) )
// TokenBoardHandler encapsulates all the various handlers a client needs to interact with a token board // TokenBoardHandler encapsulates all the various handlers a client needs to interact with a token board
@ -73,8 +68,8 @@ func (ta *TokenBoardClient) Init(connection tapir.Connection) {
log.Debugf("Successfully Initialized Connection to %v", connection.Hostname()) log.Debugf("Successfully Initialized Connection to %v", connection.Hostname())
go ta.Listen() go ta.Listen()
// Optimistically acquire many tokens for this server... // Optimistically acquire many tokens for this server...
go ta.MakePayment() go ta.PurchaseTokens()
go ta.MakePayment() go ta.PurchaseTokens()
ta.Replay() ta.Replay()
} else { } else {
connection.Close() connection.Close()
@ -152,7 +147,7 @@ func (ta *TokenBoardClient) Replay() {
// PurchaseTokens purchases the given number of tokens from the server (using the provided payment handler) // PurchaseTokens purchases the given number of tokens from the server (using the provided payment handler)
func (ta *TokenBoardClient) PurchaseTokens() { func (ta *TokenBoardClient) PurchaseTokens() {
ta.MakePayment() MakePayment(ta.tokenServiceOnion, ta.tokenService, ta.acn, ta.tokenBoardHandler)
} }
// Post sends a Post Request to the server // Post sends a Post Request to the server
@ -172,51 +167,6 @@ func (ta *TokenBoardClient) Post(ct []byte, sig []byte) (bool, int) {
return false, numTokens return false, numTokens
} }
// MakePayment uses the PoW based token protocol to obtain more tokens
func (ta *TokenBoardClient) MakePayment() error {
log.Debugf("Making a Payment")
id, sk := primitives.InitializeEphemeralIdentity()
client := new(tor.BaseOnionService)
client.Init(ta.acn, sk, &id)
defer client.Shutdown()
tokenApplication := new(applications.TokenApplication)
tokenApplication.TokenService = ta.tokenService
powTokenApp := new(applications.ApplicationChain).
ChainApplication(new(applications.ProofOfWorkApplication), applications.SuccessfulProofOfWorkCapability).
ChainApplication(tokenApplication, applications.HasTokensCapability)
log.Debugf("Waiting for successful PoW Auth...")
connected, err := client.Connect(ta.tokenServiceOnion, powTokenApp)
if connected && err == nil {
log.Debugf("Waiting for successful Token Acquisition...")
tp := utils.TimeoutPolicy(time.Second * 30)
err := tp.ExecuteAction(func() error {
conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability)
if err == nil {
powtapp, ok := conn.App().(*applications.TokenApplication)
if ok {
log.Debugf("Updating Tokens")
ta.tokenBoardHandler.NewTokenHandler(ta.tokenServiceOnion, powtapp.Tokens)
log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit())
conn.Close()
return nil
}
log.Errorf("invalid cast of powapp. this should never happen %v %v", powtapp, reflect.TypeOf(conn.App()))
return nil
}
return nil
})
// we timed out
if err != nil {
ta.connection.Close()
}
}
return err
}
// NextToken retrieves the next token // NextToken retrieves the next token
func (ta *TokenBoardClient) NextToken(data []byte, hostname string) (privacypass.SpentToken, int, error) { func (ta *TokenBoardClient) NextToken(data []byte, hostname string) (privacypass.SpentToken, int, error) {
token, numtokens, err := ta.tokenBoardHandler.FetchToken(ta.tokenServiceOnion) token, numtokens, err := ta.tokenBoardHandler.FetchToken(ta.tokenServiceOnion)

150
tools/cwtch_tools.go Normal file
View File

@ -0,0 +1,150 @@
package main
import (
"crypto/rand"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/protocol/groups"
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
"github.com/gtank/ristretto255"
mrand "math/rand"
"os"
path "path/filepath"
"strings"
"time"
)
var tool = flag.String("tool", "", "the tool to use")
var bundle = flag.String("bundle", "", "a server bundle to parse")
func main() {
flag.Parse()
if *tool == "" {
fmt.Fprintf(os.Stderr, "usage: cwtch_tools -tool <tool>\n")
flag.PrintDefaults()
}
switch *tool {
case "bundleparse":
bundle, err := bundleParse(*bundle)
if err == nil {
fmt.Printf("bundle: %s\n", bundle.Serialize())
} else {
fmt.Printf("error parsing bundle: %v", err)
}
case "gettokens":
getTokens(*bundle)
default:
fmt.Fprintf(os.Stderr, "unknown tool: %s \n", *tool)
}
}
func bundleParse(bundle string) (*model.KeyBundle, error) {
if strings.HasPrefix(bundle, constants.ServerPrefix) {
bundle, err := base64.StdEncoding.DecodeString(bundle[len(constants.ServerPrefix):])
if err != nil {
fmt.Printf("invalid server bundle: %v\n", err)
}
keyBundle, err := model.DeserializeAndVerify([]byte(bundle))
return keyBundle, err
} else {
return nil, fmt.Errorf("unknown bundle prefix")
}
}
func getTokens(bundle string) {
log.SetLevel(log.LevelDebug)
keyBundle, err := bundleParse(bundle)
if err != nil {
log.Errorf("error parsing keybundle: %v", err)
return
}
privacyPassKey, err := keyBundle.GetKey(model.KeyTypePrivacyPass)
if err != nil {
log.Errorf("error parsing keybundle: %v", err)
return
}
tokenServiceKey, err := keyBundle.GetKey(model.KeyTypeTokenOnion)
if err != nil {
log.Errorf("error parsing keybundle: %v", err)
return
}
os.Mkdir("tordir", 0700)
dataDir := path.Join("tordir", "tor")
os.MkdirAll(dataDir, 0700)
// we don't need real randomness for the port, just to avoid a possible conflict...
mrand.Seed(int64(time.Now().Nanosecond()))
socksPort := mrand.Intn(1000) + 9051
controlPort := mrand.Intn(1000) + 9052
// generate a random password
key := make([]byte, 64)
_, err = rand.Read(key)
if err != nil {
log.Errorf("insufficient randomness: %v", err)
return
}
torDataDir := "./data-dir-cwtchtool"
if err = os.MkdirAll(torDataDir, 0700); err != nil {
log.Errorf("could not create data dir")
return
}
tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc")
acn, err := tor.NewTorACNWithAuth("./tordir", path.Join("..", "tor"), torDataDir, controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)})
if err != nil {
log.Errorf("Could not start Tor: %v", err)
return
}
acn.WaitTillBootstrapped()
defer acn.Close()
Y := new(ristretto255.Element)
Y.UnmarshalText([]byte(privacyPassKey))
tokenServer := privacypass.NewTokenServer()
tokenServer.Y = Y
err = connections.MakePayment(string(tokenServiceKey), tokenServer, acn, Handler{})
if err != nil {
log.Errorf("timed out trying to get payments")
}
}
type Handler struct {
}
func (h Handler) GroupMessageHandler(server string, gm *groups.EncryptedGroupMessage) {
}
func (h Handler) ServerAuthedHandler(server string) {
}
func (h Handler) ServerSyncedHandler(server string) {
}
func (h Handler) ServerClosedHandler(server string) {
}
func (h Handler) NewTokenHandler(tokenService string, tokens []*privacypass.Token) {
data, _ := json.Marshal(tokens)
os.WriteFile("tokens", data, 0600)
}
func (h Handler) FetchToken(tokenService string) (*privacypass.Token, int, error) {
return nil, 0, nil
}

View File

@ -12,7 +12,6 @@ type TimeoutPolicy time.Duration
// ExecuteAction runs a function and returns an error if it hasn't returned // ExecuteAction runs a function and returns an error if it hasn't returned
// by the time specified by TimeoutPolicy // by the time specified by TimeoutPolicy
func (tp *TimeoutPolicy) ExecuteAction(action func() error) error { func (tp *TimeoutPolicy) ExecuteAction(action func() error) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(*tp)) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(*tp))
defer cancel() defer cancel()