Merge pull request 'timeout_fixes_tokens' (#460) from timeout_fixes_tokens into master
continuous-integration/drone/push Build is pending Details

Reviewed-on: #460
This commit is contained in:
Sarah Jamie Lewis 2022-09-10 18:43:58 +00:00
commit 35ca930628
9 changed files with 372 additions and 267 deletions

View File

@ -1,57 +1,57 @@
package app package app
import ( import (
"cwtch.im/cwtch/app/plugins" "cwtch.im/cwtch/app/plugins"
"cwtch.im/cwtch/event" "cwtch.im/cwtch/event"
"cwtch.im/cwtch/model" "cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants" "cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer" "cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/storage" "cwtch.im/cwtch/storage"
"git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/log" "git.openprivacy.ca/openprivacy/log"
"os" "os"
path "path/filepath" path "path/filepath"
"strconv" "strconv"
"sync" "sync"
) )
type application struct { type application struct {
eventBuses map[string]event.Manager eventBuses map[string]event.Manager
directory string directory string
peerLock sync.Mutex peerLock sync.Mutex
peers map[string]peer.CwtchPeer peers map[string]peer.CwtchPeer
acn connectivity.ACN acn connectivity.ACN
plugins sync.Map //map[string] []plugins.Plugin plugins sync.Map //map[string] []plugins.Plugin
engines map[string]connections.Engine engines map[string]connections.Engine
appBus event.Manager appBus event.Manager
appmutex sync.Mutex appmutex sync.Mutex
} }
// Application is a full cwtch peer application. It allows management, usage and storage of multiple peers // Application is a full cwtch peer application. It allows management, usage and storage of multiple peers
type Application interface { type Application interface {
LoadProfiles(password string) LoadProfiles(password string)
CreateTaggedPeer(name string, password string, tag string) CreateTaggedPeer(name string, password string, tag string)
ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error) ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error)
DeletePeer(onion string, currentPassword string) DeletePeer(onion string, currentPassword string)
AddPeerPlugin(onion string, pluginID plugins.PluginID) AddPeerPlugin(onion string, pluginID plugins.PluginID)
GetPrimaryBus() event.Manager GetPrimaryBus() event.Manager
GetEventBus(onion string) event.Manager GetEventBus(onion string) event.Manager
QueryACNStatus() QueryACNStatus()
QueryACNVersion() QueryACNVersion()
ActivePeerEngine(onion string, doListen, doPeers, doServers bool) ActivePeerEngine(onion string, doListen, doPeers, doServers bool)
DeactivatePeerEngine(onion string) DeactivatePeerEngine(onion string)
ShutdownPeer(string) ShutdownPeer(string)
Shutdown() Shutdown()
GetPeer(onion string) peer.CwtchPeer GetPeer(onion string) peer.CwtchPeer
ListProfiles() []string ListProfiles() []string
} }
// LoadProfileFn is the function signature for a function in an app that loads a profile // LoadProfileFn is the function signature for a function in an app that loads a profile
@ -59,295 +59,295 @@ type LoadProfileFn func(profile peer.CwtchPeer)
// NewApp creates a new app with some environment awareness and initializes a Tor Manager // NewApp creates a new app with some environment awareness and initializes a Tor Manager
func NewApp(acn connectivity.ACN, appDirectory string) Application { func NewApp(acn connectivity.ACN, appDirectory string) Application {
log.Debugf("NewApp(%v)\n", appDirectory) log.Debugf("NewApp(%v)\n", appDirectory)
os.MkdirAll(path.Join(appDirectory, "profiles"), 0700) os.MkdirAll(path.Join(appDirectory, "profiles"), 0700)
app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager()} app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager()}
app.peers = make(map[string]peer.CwtchPeer) app.peers = make(map[string]peer.CwtchPeer)
app.acn = acn app.acn = acn
statusHandler := app.getACNStatusHandler() statusHandler := app.getACNStatusHandler()
acn.SetStatusCallback(statusHandler) acn.SetStatusCallback(statusHandler)
acn.SetVersionCallback(app.getACNVersionHandler()) acn.SetVersionCallback(app.getACNVersionHandler())
prog, status := acn.GetBootstrapStatus() prog, status := acn.GetBootstrapStatus()
statusHandler(prog, status) statusHandler(prog, status)
return app return app
} }
// ListProfiles returns a map of onions to their profile's Name // ListProfiles returns a map of onions to their profile's Name
func (app *application) ListProfiles() []string { func (app *application) ListProfiles() []string {
var keys []string var keys []string
app.peerLock.Lock() app.peerLock.Lock()
defer app.peerLock.Unlock() defer app.peerLock.Unlock()
for handle := range app.peers { for handle := range app.peers {
keys = append(keys, handle) keys = append(keys, handle)
} }
return keys return keys
} }
// GetPeer returns a cwtchPeer for a given onion address // GetPeer returns a cwtchPeer for a given onion address
func (app *application) GetPeer(onion string) peer.CwtchPeer { func (app *application) GetPeer(onion string) peer.CwtchPeer {
if peer, ok := app.peers[onion]; ok { if peer, ok := app.peers[onion]; ok {
return peer return peer
} }
return nil return nil
} }
func (ap *application) AddPlugin(peerid string, id plugins.PluginID, bus event.Manager, acn connectivity.ACN) { func (ap *application) AddPlugin(peerid string, id plugins.PluginID, bus event.Manager, acn connectivity.ACN) {
if _, exists := ap.plugins.Load(peerid); !exists { if _, exists := ap.plugins.Load(peerid); !exists {
ap.plugins.Store(peerid, []plugins.Plugin{}) ap.plugins.Store(peerid, []plugins.Plugin{})
} }
pluginsinf, _ := ap.plugins.Load(peerid) pluginsinf, _ := ap.plugins.Load(peerid)
peerPlugins := pluginsinf.([]plugins.Plugin) peerPlugins := pluginsinf.([]plugins.Plugin)
newp, err := plugins.Get(id, bus, acn, peerid) newp, err := plugins.Get(id, bus, acn, peerid)
if err == nil { if err == nil {
newp.Start() newp.Start()
peerPlugins = append(peerPlugins, newp) peerPlugins = append(peerPlugins, newp)
log.Debugf("storing plugin for %v %v", peerid, peerPlugins) log.Debugf("storing plugin for %v %v", peerid, peerPlugins)
ap.plugins.Store(peerid, peerPlugins) ap.plugins.Store(peerid, peerPlugins)
} else { } else {
log.Errorf("error adding plugin: %v", err) log.Errorf("error adding plugin: %v", err)
} }
} }
func (app *application) CreateTaggedPeer(name string, password string, tag string) { func (app *application) CreateTaggedPeer(name string, password string, tag string) {
app.appmutex.Lock() app.appmutex.Lock()
defer app.appmutex.Unlock() defer app.appmutex.Unlock()
profileDirectory := path.Join(app.directory, "profiles", model.GenerateRandomID()) profileDirectory := path.Join(app.directory, "profiles", model.GenerateRandomID())
profile, err := peer.CreateEncryptedStorePeer(profileDirectory, name, password) profile, err := peer.CreateEncryptedStorePeer(profileDirectory, name, password)
if err != nil { if err != nil {
log.Errorf("Error Creating Peer: %v", err) log.Errorf("Error Creating Peer: %v", err)
app.appBus.Publish(event.NewEventList(event.PeerError, event.Error, err.Error())) app.appBus.Publish(event.NewEventList(event.PeerError, event.Error, err.Error()))
return return
} }
eventBus := event.NewEventManager() eventBus := event.NewEventManager()
app.eventBuses[profile.GetOnion()] = eventBus app.eventBuses[profile.GetOnion()] = eventBus
profile.Init(app.eventBuses[profile.GetOnion()]) profile.Init(app.eventBuses[profile.GetOnion()])
app.peers[profile.GetOnion()] = profile app.peers[profile.GetOnion()] = profile
if tag != "" { if tag != "" {
profile.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag) profile.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag)
} }
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.True})) app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.True}))
} }
func (app *application) DeletePeer(onion string, password string) { func (app *application) DeletePeer(onion string, password string) {
log.Infof("DeletePeer called on %v\n", onion) log.Infof("DeletePeer called on %v\n", onion)
app.appmutex.Lock() app.appmutex.Lock()
defer app.appmutex.Unlock() defer app.appmutex.Unlock()
if app.peers[onion].CheckPassword(password) { if app.peers[onion].CheckPassword(password) {
app.shutdownPeer(onion) app.shutdownPeer(onion)
app.peers[onion].Delete() app.peers[onion].Delete()
// Shutdown and Remove the Engine // Shutdown and Remove the Engine
log.Debugf("Delete peer for %v Done\n", onion) log.Debugf("Delete peer for %v Done\n", onion)
app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion)) app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion))
return return
} }
app.appBus.Publish(event.NewEventList(event.AppError, event.Error, event.PasswordMatchError, event.Identity, onion)) app.appBus.Publish(event.NewEventList(event.AppError, event.Error, event.PasswordMatchError, event.Identity, onion))
} }
func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) { func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) {
app.AddPlugin(onion, pluginID, app.eventBuses[onion], app.acn) app.AddPlugin(onion, pluginID, app.eventBuses[onion], app.acn)
} }
func (app *application) ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error) { func (app *application) ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error) {
profileDirectory := path.Join(app.directory, "profiles") profileDirectory := path.Join(app.directory, "profiles")
profile, err := peer.ImportProfile(exportedCwtchFile, profileDirectory, password) profile, err := peer.ImportProfile(exportedCwtchFile, profileDirectory, password)
if profile != nil || err == nil { if profile != nil || err == nil {
app.installProfile(profile) app.installProfile(profile)
} }
return profile, err return profile, err
} }
// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them // LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
func (app *application) LoadProfiles(password string) { func (app *application) LoadProfiles(password string) {
count := 0 count := 0
migrating := false migrating := false
files, err := os.ReadDir(path.Join(app.directory, "profiles")) files, err := os.ReadDir(path.Join(app.directory, "profiles"))
if err != nil { if err != nil {
log.Errorf("error: cannot read profiles directory: %v", err) log.Errorf("error: cannot read profiles directory: %v", err)
return return
} }
for _, file := range files { for _, file := range files {
// Attempt to load an encrypted database // Attempt to load an encrypted database
profileDirectory := path.Join(app.directory, "profiles", file.Name()) profileDirectory := path.Join(app.directory, "profiles", file.Name())
profile, err := peer.FromEncryptedDatabase(profileDirectory, password) profile, err := peer.FromEncryptedDatabase(profileDirectory, password)
loaded := false loaded := false
if err == nil { if err == nil {
// return the load the profile... // return the load the profile...
log.Infof("loading profile from new-type storage database...") log.Infof("loading profile from new-type storage database...")
loaded = app.installProfile(profile) loaded = app.installProfile(profile)
} else { // On failure attempt to load a legacy profile } else { // On failure attempt to load a legacy profile
profileStore, err := storage.LoadProfileWriterStore(profileDirectory, password) profileStore, err := storage.LoadProfileWriterStore(profileDirectory, password)
if err != nil { if err != nil {
continue continue
} }
log.Infof("found legacy profile. importing to new database structure...") log.Infof("found legacy profile. importing to new database structure...")
legacyProfile := profileStore.GetProfileCopy(true) legacyProfile := profileStore.GetProfileCopy(true)
if !migrating { if !migrating {
migrating = true migrating = true
app.appBus.Publish(event.NewEventList(event.StartingStorageMiragtion)) app.appBus.Publish(event.NewEventList(event.StartingStorageMiragtion))
} }
cps, err := peer.CreateEncryptedStore(profileDirectory, password) cps, err := peer.CreateEncryptedStore(profileDirectory, password)
if err != nil { if err != nil {
log.Errorf("error creating encrypted store: %v", err) log.Errorf("error creating encrypted store: %v", err)
} }
profile := peer.ImportLegacyProfile(legacyProfile, cps) profile := peer.ImportLegacyProfile(legacyProfile, cps)
loaded = app.installProfile(profile) loaded = app.installProfile(profile)
} }
if loaded { if loaded {
count++ count++
} }
} }
if count == 0 { if count == 0 {
message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0) message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0)
app.appBus.Publish(message) app.appBus.Publish(message)
} }
if migrating { if migrating {
app.appBus.Publish(event.NewEventList(event.DoneStorageMigration)) app.appBus.Publish(event.NewEventList(event.DoneStorageMigration))
} }
} }
// installProfile takes a profile and if it isn't loaded in the app, installs it and returns true // installProfile takes a profile and if it isn't loaded in the app, installs it and returns true
func (app *application) installProfile(profile peer.CwtchPeer) bool { func (app *application) installProfile(profile peer.CwtchPeer) bool {
app.appmutex.Lock() app.appmutex.Lock()
defer app.appmutex.Unlock() defer app.appmutex.Unlock()
// Only attempt to finalize the profile if we don't have one loaded... // Only attempt to finalize the profile if we don't have one loaded...
if app.peers[profile.GetOnion()] == nil { if app.peers[profile.GetOnion()] == nil {
eventBus := event.NewEventManager() eventBus := event.NewEventManager()
app.eventBuses[profile.GetOnion()] = eventBus app.eventBuses[profile.GetOnion()] = eventBus
profile.Init(app.eventBuses[profile.GetOnion()]) profile.Init(app.eventBuses[profile.GetOnion()])
app.peers[profile.GetOnion()] = profile app.peers[profile.GetOnion()] = profile
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False})) app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False}))
return true return true
} }
// Otherwise shutdown the connections // Otherwise shutdown the connections
profile.Shutdown() profile.Shutdown()
return false return false
} }
/// ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online // ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online
func (app *application) ActivePeerEngine(onion string, doListen, doPeers, doServers bool) { func (app *application) ActivePeerEngine(onion string, doListen, doPeers, doServers bool) {
profile := app.GetPeer(onion) profile := app.GetPeer(onion)
if profile != nil { if profile != nil {
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
if doListen { if doListen {
profile.Listen() profile.Listen()
} }
if doPeers { if doPeers {
profile.StartPeersConnections() profile.StartPeersConnections()
} }
if doServers { if doServers {
profile.StartServerConnections() profile.StartServerConnections()
} }
} }
} }
/// DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline // DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline
func (app *application) DeactivatePeerEngine(onion string) { func (app *application) DeactivatePeerEngine(onion string) {
if engine, exists := app.engines[onion]; exists { if engine, exists := app.engines[onion]; exists {
engine.Shutdown() engine.Shutdown()
delete(app.engines, onion) delete(app.engines, onion)
} }
} }
// GetPrimaryBus returns the bus the Application uses for events that aren't peer specific // GetPrimaryBus returns the bus the Application uses for events that aren't peer specific
func (app *application) GetPrimaryBus() event.Manager { func (app *application) GetPrimaryBus() event.Manager {
return app.appBus return app.appBus
} }
// GetEventBus returns a cwtchPeer's event bus // GetEventBus returns a cwtchPeer's event bus
func (app *application) GetEventBus(onion string) event.Manager { func (app *application) GetEventBus(onion string) event.Manager {
if manager, ok := app.eventBuses[onion]; ok { if manager, ok := app.eventBuses[onion]; ok {
return manager return manager
} }
return nil return nil
} }
func (app *application) getACNStatusHandler() func(int, string) { func (app *application) getACNStatusHandler() func(int, string) {
return func(progress int, status string) { return func(progress int, status string) {
progStr := strconv.Itoa(progress) progStr := strconv.Itoa(progress)
app.appmutex.Lock() app.appmutex.Lock()
app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status)) app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status))
for _, bus := range app.eventBuses { for _, bus := range app.eventBuses {
bus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status)) bus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status))
} }
app.appmutex.Unlock() app.appmutex.Unlock()
} }
} }
func (app *application) getACNVersionHandler() func(string) { func (app *application) getACNVersionHandler() func(string) {
return func(version string) { return func(version string) {
app.appmutex.Lock() app.appmutex.Lock()
defer app.appmutex.Unlock() defer app.appmutex.Unlock()
app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version)) app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version))
} }
} }
func (app *application) QueryACNStatus() { func (app *application) QueryACNStatus() {
prog, status := app.acn.GetBootstrapStatus() prog, status := app.acn.GetBootstrapStatus()
app.getACNStatusHandler()(prog, status) app.getACNStatusHandler()(prog, status)
} }
func (app *application) QueryACNVersion() { func (app *application) QueryACNVersion() {
version := app.acn.GetVersion() version := app.acn.GetVersion()
app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version)) app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version))
} }
// ShutdownPeer shuts down a peer and removes it from the app's management // ShutdownPeer shuts down a peer and removes it from the app's management
func (app *application) ShutdownPeer(onion string) { func (app *application) ShutdownPeer(onion string) {
app.appmutex.Lock() app.appmutex.Lock()
defer app.appmutex.Unlock() defer app.appmutex.Unlock()
app.shutdownPeer(onion) app.shutdownPeer(onion)
} }
/// shutdownPeer mutex unlocked helper shutdown peer // shutdownPeer mutex unlocked helper shutdown peer
func (app *application) shutdownPeer(onion string) { func (app *application) shutdownPeer(onion string) {
app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion)) app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion))
app.eventBuses[onion].Shutdown() app.eventBuses[onion].Shutdown()
delete(app.eventBuses, onion) delete(app.eventBuses, onion)
app.peers[onion].Shutdown() app.peers[onion].Shutdown()
delete(app.peers, onion) delete(app.peers, onion)
if _, ok := app.engines[onion]; ok { if _, ok := app.engines[onion]; ok {
app.engines[onion].Shutdown() app.engines[onion].Shutdown()
delete(app.engines, onion) delete(app.engines, onion)
} }
log.Debugf("shutting down plugins for %v", onion) log.Debugf("shutting down plugins for %v", onion)
pluginsI, ok := app.plugins.Load(onion) pluginsI, ok := app.plugins.Load(onion)
if ok { if ok {
plugins := pluginsI.([]plugins.Plugin) plugins := pluginsI.([]plugins.Plugin)
for _, plugin := range plugins { for _, plugin := range plugins {
log.Debugf("shutting down plugin: %v", plugin) log.Debugf("shutting down plugin: %v", plugin)
plugin.Shutdown() plugin.Shutdown()
} }
} }
app.plugins.Delete(onion) app.plugins.Delete(onion)
} }
// Shutdown shutsdown all peers of an app // Shutdown shutsdown all peers of an app
func (app *application) Shutdown() { func (app *application) Shutdown() {
app.appmutex.Lock() app.appmutex.Lock()
defer app.appmutex.Unlock() defer app.appmutex.Unlock()
for id := range app.peers { for id := range app.peers {
log.Debugf("Shutting Down Peer %v", id) log.Debugf("Shutting Down Peer %v", id)
app.shutdownPeer(id) app.shutdownPeer(id)
} }
log.Debugf("Shutting Down App") log.Debugf("Shutting Down App")
app.appBus.Shutdown() app.appBus.Shutdown()
log.Debugf("Shut Down Complete") log.Debugf("Shut Down Complete")
} }

43
app/plugins/antispam.go Normal file
View File

@ -0,0 +1,43 @@
package plugins
import (
"cwtch.im/cwtch/event"
"git.openprivacy.ca/openprivacy/log"
"time"
)
const antispamTickTime = 30 * time.Second
type antispam struct {
bus event.Manager
queue event.Queue
breakChan chan bool
}
func (a *antispam) Start() {
go a.run()
}
func (a antispam) Shutdown() {
a.breakChan <- true
}
func (a *antispam) run() {
log.Infof("running antispam trigger plugin")
for {
select {
case <-time.After(antispamTickTime):
// no fuss, just trigger the check. Downstream will filter out superfluous actions
a.bus.Publish(event.NewEvent(event.TriggerAntispamCheck, map[event.Field]string{}))
continue
case <-a.breakChan:
return
}
}
}
// NewAntiSpam returns a Plugin that when started will trigger antispam payments on a regular interval
func NewAntiSpam(bus event.Manager) Plugin {
cr := &antispam{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1)}
return cr
}

View File

@ -8,8 +8,8 @@ import (
"time" "time"
) )
const tickTime = 5 * time.Second const tickTime = 30 * time.Second
const maxBackoff int = 64 // 320 seconds or ~5 min const maxBackoff int = 10 // 320 seconds or ~5 min
type connectionType int type connectionType int

View File

@ -13,6 +13,7 @@ type PluginID int
const ( const (
CONNECTIONRETRY PluginID = iota CONNECTIONRETRY PluginID = iota
NETWORKCHECK NETWORKCHECK
ANTISPAM
) )
// Plugin is the interface for a plugin // Plugin is the interface for a plugin
@ -28,6 +29,8 @@ func Get(id PluginID, bus event.Manager, acn connectivity.ACN, onion string) (Pl
return NewConnectionRetry(bus, onion), nil return NewConnectionRetry(bus, onion), nil
case NETWORKCHECK: case NETWORKCHECK:
return NewNetworkCheck(onion, bus, acn), nil return NewNetworkCheck(onion, bus, acn), nil
case ANTISPAM:
return NewAntiSpam(bus), nil
} }
return nil, fmt.Errorf("plugin not defined %v", id) return nil, fmt.Errorf("plugin not defined %v", id)

View File

@ -201,7 +201,9 @@ const (
StartingStorageMiragtion = Type("StartingStorageMigration") StartingStorageMiragtion = Type("StartingStorageMigration")
DoneStorageMigration = Type("DoneStorageMigration") DoneStorageMigration = Type("DoneStorageMigration")
TokenManagerInfo = Type("TokenManagerInfo") TokenManagerInfo = Type("TokenManagerInfo")
TriggerAntispamCheck = Type("TriggerAntispamCheck")
MakeAntispamPayment = Type("MakeAntispamPayment")
) )
// Field defines common event attributes // Field defines common event attributes

View File

@ -14,6 +14,7 @@ import (
"git.openprivacy.ca/openprivacy/connectivity/tor" "git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519" "golang.org/x/crypto/ed25519"
"math/bits" "math/bits"
rand2 "math/rand"
"os" "os"
path "path/filepath" path "path/filepath"
"runtime" "runtime"
@ -37,7 +38,7 @@ var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true
event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeerEngine: true, event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeerEngine: true,
event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToPeerError: true, event.SendMessageToGroupError: true, event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToPeerError: true, event.SendMessageToGroupError: true,
event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true, event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true,
event.ManifestSizeReceived: true, event.ManifestReceived: true, event.FileDownloaded: true} event.ManifestSizeReceived: true, event.ManifestReceived: true, event.FileDownloaded: true, event.TriggerAntispamCheck: true}
// DefaultEventsToHandle specifies which events will be subscribed to // DefaultEventsToHandle specifies which events will be subscribed to
// //
@ -58,6 +59,7 @@ var DefaultEventsToHandle = []event.Type{
event.NewRetValMessageFromPeer, event.NewRetValMessageFromPeer,
event.ManifestReceived, event.ManifestReceived,
event.FileDownloaded, event.FileDownloaded,
event.TriggerAntispamCheck,
} }
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
@ -783,7 +785,13 @@ func (cp *cwtchPeer) GetPeerState(handle string) connections.ConnectionState {
// PeerWithOnion initiates a request to the Protocol Engine to set up Cwtch Session with a given tor v3 onion // PeerWithOnion initiates a request to the Protocol Engine to set up Cwtch Session with a given tor v3 onion
// address. // address.
func (cp *cwtchPeer) PeerWithOnion(onion string) { func (cp *cwtchPeer) PeerWithOnion(onion string) {
cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion})) go func() {
// wait a random number of seconds before triggering
// this cuts down on contention in the event
randWait := time.Duration(rand2.Int() % 60)
time.Sleep(randWait * time.Second)
cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion}))
}()
} }
// SendInviteToConversation kicks off the invite process // SendInviteToConversation kicks off the invite process
@ -918,6 +926,14 @@ func (cp *cwtchPeer) JoinServer(onion string) error {
return errors.New("no keys found for server connection") return errors.New("no keys found for server connection")
} }
// MakeAntispamPayment allows a peer to retrigger antispam, important if the initial connection somehow fails...
// TODO in the future we might want to expose this in CwtchPeer interface
// Additionally we may want to add extra checks here to deduplicate groups from tokenservers to cut down
// on the number of events (right now it should be minimal)
func (cp *cwtchPeer) MakeAntispamPayment(server string) {
cp.eventBus.Publish(event.NewEvent(event.MakeAntispamPayment, map[event.Field]string{event.GroupServer: server}))
}
// ResyncServer completely tears down and resyncs a new server connection with the given handle // ResyncServer completely tears down and resyncs a new server connection with the given handle
func (cp *cwtchPeer) ResyncServer(handle string) error { func (cp *cwtchPeer) ResyncServer(handle string) error {
ci, err := cp.FetchConversationInfo(handle) ci, err := cp.FetchConversationInfo(handle)
@ -1305,6 +1321,13 @@ func (cp *cwtchPeer) eventHandler() {
cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), mostRecentTime.Format(time.RFC3339Nano)) cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), mostRecentTime.Format(time.RFC3339Nano))
} }
} }
case event.TriggerAntispamCheck:
conversations, _ := cp.FetchConversations()
for _, conversation := range conversations {
if conversation.IsServer() {
cp.MakeAntispamPayment(conversation.Handle)
}
}
default: default:
if ev.EventType != "" { if ev.EventType != "" {
log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType) log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType)

View File

@ -116,6 +116,9 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
engine.eventManager.Subscribe(event.ManifestSizeReceived, engine.queue) engine.eventManager.Subscribe(event.ManifestSizeReceived, engine.queue)
engine.eventManager.Subscribe(event.ManifestSaved, engine.queue) engine.eventManager.Subscribe(event.ManifestSaved, engine.queue)
// Token Server
engine.eventManager.Subscribe(event.MakeAntispamPayment, engine.queue)
for peer, authorization := range peerAuthorizations { for peer, authorization := range peerAuthorizations {
engine.authorizations.Store(peer, authorization) engine.authorizations.Store(peer, authorization)
} }
@ -160,6 +163,8 @@ func (e *engine) eventHandler() {
signature = []byte{} signature = []byte{}
} }
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature) go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
case event.MakeAntispamPayment:
go e.makeAntispamPayment(ev.Data[event.GroupServer])
case event.LeaveServer: case event.LeaveServer:
e.leaveServer(ev.Data[event.GroupServer]) e.leaveServer(ev.Data[event.GroupServer])
case event.DeleteContact: case event.DeleteContact:
@ -335,6 +340,31 @@ func (e *engine) peerWithOnion(onion string) {
} }
} }
func (e *engine) makeAntispamPayment(onion string) {
log.Debugf("making antispam payment")
e.ephemeralServicesLock.Lock()
ephemeralService, ok := e.ephemeralServices[onion]
e.ephemeralServicesLock.Unlock()
if ephemeralService == nil || !ok {
log.Debugf("could not find associated group for antispam payment")
return
}
conn, err := ephemeralService.service.GetConnection(onion)
if err == nil {
tokenApp, ok := (conn.App()).(*TokenBoardClient)
if ok {
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, new(TokenManager))
tokenManager := tokenManagerPointer.(*TokenManager)
log.Debugf("checking antispam tokens %v", tokenManager.NumTokens())
if tokenManager.NumTokens() < 5 {
go tokenApp.MakePayment()
}
}
}
}
// peerWithTokenServer is the entry point for cwtchPeer - server relationships // peerWithTokenServer is the entry point for cwtchPeer - server relationships
// needs to be run in a goroutine as will block on Open. // needs to be run in a goroutine as will block on Open.
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) { func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) {

View File

@ -86,6 +86,8 @@ func (pa *PeerApp) Init(connection tapir.Connection) {
} }
} else { } else {
// The auth protocol wasn't completed, we can safely shutdown the connection // The auth protocol wasn't completed, we can safely shutdown the connection
// send an onclose here because we *may* have triggered this and we want to retry later...
pa.OnClose(connection.Hostname())
connection.Close() connection.Close()
} }
} }

View File

@ -2,8 +2,8 @@ package connections
import ( import (
"cwtch.im/cwtch/protocol/groups" "cwtch.im/cwtch/protocol/groups"
"cwtch.im/cwtch/utils"
"encoding/json" "encoding/json"
"errors"
"git.openprivacy.ca/cwtch.im/tapir" "git.openprivacy.ca/cwtch.im/tapir"
"git.openprivacy.ca/cwtch.im/tapir/applications" "git.openprivacy.ca/cwtch.im/tapir/applications"
"git.openprivacy.ca/cwtch.im/tapir/networks/tor" "git.openprivacy.ca/cwtch.im/tapir/networks/tor"
@ -191,28 +191,30 @@ func (ta *TokenBoardClient) MakePayment() error {
connected, err := client.Connect(ta.tokenServiceOnion, powTokenApp) connected, err := client.Connect(ta.tokenServiceOnion, powTokenApp)
if connected && err == nil { if connected && err == nil {
log.Debugf("Waiting for successful Token Acquisition...") log.Debugf("Waiting for successful Token Acquisition...")
conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability) tp := utils.TimeoutPolicy(time.Second * 30)
if err == nil { err := tp.ExecuteAction(func() error {
powtapp, ok := conn.App().(*applications.TokenApplication) conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability)
if ok { if err == nil {
log.Debugf("Updating Tokens") powtapp, ok := conn.App().(*applications.TokenApplication)
ta.tokenBoardHandler.NewTokenHandler(ta.tokenServiceOnion, powtapp.Tokens) if ok {
log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit()) log.Debugf("Updating Tokens")
conn.Close() ta.tokenBoardHandler.NewTokenHandler(ta.tokenServiceOnion, powtapp.Tokens)
log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit())
conn.Close()
return nil
}
log.Errorf("invalid cast of powapp. this should never happen %v %v", powtapp, reflect.TypeOf(conn.App()))
return nil return nil
} }
log.Errorf("invalid cast of powapp. this should never happen %v %v", powtapp, reflect.TypeOf(conn.App())) return nil
return errors.New("invalid cast of powapp. this should never happen") })
// we timed out
if err != nil {
ta.connection.Close()
} }
log.Debugf("could not connect to payment server..trying again: %v", err)
return ta.MakePayment()
} else if connected && err != nil {
log.Debugf("inexplicable error: %v", err)
} }
log.Debugf("failed to make a connection. trying again...") return err
// it doesn't actually take that long to make a payment, so waiting a small amount of time should suffice
time.Sleep(time.Second)
return ta.MakePayment()
} }
// NextToken retrieves the next token // NextToken retrieves the next token