diff --git a/app/app.go b/app/app.go index 2d403d3..a4d9cf1 100644 --- a/app/app.go +++ b/app/app.go @@ -1,57 +1,57 @@ package app import ( - "cwtch.im/cwtch/app/plugins" - "cwtch.im/cwtch/event" - "cwtch.im/cwtch/model" - "cwtch.im/cwtch/model/attr" - "cwtch.im/cwtch/model/constants" - "cwtch.im/cwtch/peer" - "cwtch.im/cwtch/protocol/connections" - "cwtch.im/cwtch/storage" - "git.openprivacy.ca/openprivacy/connectivity" - "git.openprivacy.ca/openprivacy/log" - "os" - path "path/filepath" - "strconv" - "sync" + "cwtch.im/cwtch/app/plugins" + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/model" + "cwtch.im/cwtch/model/attr" + "cwtch.im/cwtch/model/constants" + "cwtch.im/cwtch/peer" + "cwtch.im/cwtch/protocol/connections" + "cwtch.im/cwtch/storage" + "git.openprivacy.ca/openprivacy/connectivity" + "git.openprivacy.ca/openprivacy/log" + "os" + path "path/filepath" + "strconv" + "sync" ) type application struct { - eventBuses map[string]event.Manager - directory string + eventBuses map[string]event.Manager + directory string - peerLock sync.Mutex - peers map[string]peer.CwtchPeer - acn connectivity.ACN - plugins sync.Map //map[string] []plugins.Plugin + peerLock sync.Mutex + peers map[string]peer.CwtchPeer + acn connectivity.ACN + plugins sync.Map //map[string] []plugins.Plugin - engines map[string]connections.Engine - appBus event.Manager - appmutex sync.Mutex + engines map[string]connections.Engine + appBus event.Manager + appmutex sync.Mutex } // Application is a full cwtch peer application. It allows management, usage and storage of multiple peers type Application interface { - LoadProfiles(password string) - CreateTaggedPeer(name string, password string, tag string) - ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error) - DeletePeer(onion string, currentPassword string) - AddPeerPlugin(onion string, pluginID plugins.PluginID) + LoadProfiles(password string) + CreateTaggedPeer(name string, password string, tag string) + ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error) + DeletePeer(onion string, currentPassword string) + AddPeerPlugin(onion string, pluginID plugins.PluginID) - GetPrimaryBus() event.Manager - GetEventBus(onion string) event.Manager - QueryACNStatus() - QueryACNVersion() + GetPrimaryBus() event.Manager + GetEventBus(onion string) event.Manager + QueryACNStatus() + QueryACNVersion() - ActivePeerEngine(onion string, doListen, doPeers, doServers bool) - DeactivatePeerEngine(onion string) + ActivePeerEngine(onion string, doListen, doPeers, doServers bool) + DeactivatePeerEngine(onion string) - ShutdownPeer(string) - Shutdown() + ShutdownPeer(string) + Shutdown() - GetPeer(onion string) peer.CwtchPeer - ListProfiles() []string + GetPeer(onion string) peer.CwtchPeer + ListProfiles() []string } // LoadProfileFn is the function signature for a function in an app that loads a profile @@ -59,295 +59,295 @@ type LoadProfileFn func(profile peer.CwtchPeer) // NewApp creates a new app with some environment awareness and initializes a Tor Manager func NewApp(acn connectivity.ACN, appDirectory string) Application { - log.Debugf("NewApp(%v)\n", appDirectory) - os.MkdirAll(path.Join(appDirectory, "profiles"), 0700) + log.Debugf("NewApp(%v)\n", appDirectory) + os.MkdirAll(path.Join(appDirectory, "profiles"), 0700) - app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager()} - app.peers = make(map[string]peer.CwtchPeer) + app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager()} + app.peers = make(map[string]peer.CwtchPeer) - app.acn = acn - statusHandler := app.getACNStatusHandler() - acn.SetStatusCallback(statusHandler) - acn.SetVersionCallback(app.getACNVersionHandler()) - prog, status := acn.GetBootstrapStatus() - statusHandler(prog, status) + app.acn = acn + statusHandler := app.getACNStatusHandler() + acn.SetStatusCallback(statusHandler) + acn.SetVersionCallback(app.getACNVersionHandler()) + prog, status := acn.GetBootstrapStatus() + statusHandler(prog, status) - return app + return app } // ListProfiles returns a map of onions to their profile's Name func (app *application) ListProfiles() []string { - var keys []string + var keys []string - app.peerLock.Lock() - defer app.peerLock.Unlock() - for handle := range app.peers { - keys = append(keys, handle) - } - return keys + app.peerLock.Lock() + defer app.peerLock.Unlock() + for handle := range app.peers { + keys = append(keys, handle) + } + return keys } // GetPeer returns a cwtchPeer for a given onion address func (app *application) GetPeer(onion string) peer.CwtchPeer { - if peer, ok := app.peers[onion]; ok { - return peer - } - return nil + if peer, ok := app.peers[onion]; ok { + return peer + } + return nil } func (ap *application) AddPlugin(peerid string, id plugins.PluginID, bus event.Manager, acn connectivity.ACN) { - if _, exists := ap.plugins.Load(peerid); !exists { - ap.plugins.Store(peerid, []plugins.Plugin{}) - } + if _, exists := ap.plugins.Load(peerid); !exists { + ap.plugins.Store(peerid, []plugins.Plugin{}) + } - pluginsinf, _ := ap.plugins.Load(peerid) - peerPlugins := pluginsinf.([]plugins.Plugin) + pluginsinf, _ := ap.plugins.Load(peerid) + peerPlugins := pluginsinf.([]plugins.Plugin) - newp, err := plugins.Get(id, bus, acn, peerid) - if err == nil { - newp.Start() - peerPlugins = append(peerPlugins, newp) - log.Debugf("storing plugin for %v %v", peerid, peerPlugins) - ap.plugins.Store(peerid, peerPlugins) - } else { - log.Errorf("error adding plugin: %v", err) - } + newp, err := plugins.Get(id, bus, acn, peerid) + if err == nil { + newp.Start() + peerPlugins = append(peerPlugins, newp) + log.Debugf("storing plugin for %v %v", peerid, peerPlugins) + ap.plugins.Store(peerid, peerPlugins) + } else { + log.Errorf("error adding plugin: %v", err) + } } func (app *application) CreateTaggedPeer(name string, password string, tag string) { - app.appmutex.Lock() - defer app.appmutex.Unlock() + app.appmutex.Lock() + defer app.appmutex.Unlock() - profileDirectory := path.Join(app.directory, "profiles", model.GenerateRandomID()) + profileDirectory := path.Join(app.directory, "profiles", model.GenerateRandomID()) - profile, err := peer.CreateEncryptedStorePeer(profileDirectory, name, password) - if err != nil { - log.Errorf("Error Creating Peer: %v", err) - app.appBus.Publish(event.NewEventList(event.PeerError, event.Error, err.Error())) - return - } + profile, err := peer.CreateEncryptedStorePeer(profileDirectory, name, password) + if err != nil { + log.Errorf("Error Creating Peer: %v", err) + app.appBus.Publish(event.NewEventList(event.PeerError, event.Error, err.Error())) + return + } - eventBus := event.NewEventManager() - app.eventBuses[profile.GetOnion()] = eventBus - profile.Init(app.eventBuses[profile.GetOnion()]) - app.peers[profile.GetOnion()] = profile + eventBus := event.NewEventManager() + app.eventBuses[profile.GetOnion()] = eventBus + profile.Init(app.eventBuses[profile.GetOnion()]) + app.peers[profile.GetOnion()] = profile - if tag != "" { - profile.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag) - } + if tag != "" { + profile.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag) + } - app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.True})) + app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.True})) } func (app *application) DeletePeer(onion string, password string) { - log.Infof("DeletePeer called on %v\n", onion) - app.appmutex.Lock() - defer app.appmutex.Unlock() + log.Infof("DeletePeer called on %v\n", onion) + app.appmutex.Lock() + defer app.appmutex.Unlock() - if app.peers[onion].CheckPassword(password) { - app.shutdownPeer(onion) - app.peers[onion].Delete() + if app.peers[onion].CheckPassword(password) { + app.shutdownPeer(onion) + app.peers[onion].Delete() - // Shutdown and Remove the Engine + // Shutdown and Remove the Engine - log.Debugf("Delete peer for %v Done\n", onion) - app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion)) - return - } - app.appBus.Publish(event.NewEventList(event.AppError, event.Error, event.PasswordMatchError, event.Identity, onion)) + log.Debugf("Delete peer for %v Done\n", onion) + app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion)) + return + } + app.appBus.Publish(event.NewEventList(event.AppError, event.Error, event.PasswordMatchError, event.Identity, onion)) } func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) { - app.AddPlugin(onion, pluginID, app.eventBuses[onion], app.acn) + app.AddPlugin(onion, pluginID, app.eventBuses[onion], app.acn) } func (app *application) ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error) { - profileDirectory := path.Join(app.directory, "profiles") - profile, err := peer.ImportProfile(exportedCwtchFile, profileDirectory, password) - if profile != nil || err == nil { - app.installProfile(profile) - } - return profile, err + profileDirectory := path.Join(app.directory, "profiles") + profile, err := peer.ImportProfile(exportedCwtchFile, profileDirectory, password) + if profile != nil || err == nil { + app.installProfile(profile) + } + return profile, err } // LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them func (app *application) LoadProfiles(password string) { - count := 0 - migrating := false + count := 0 + migrating := false - files, err := os.ReadDir(path.Join(app.directory, "profiles")) - if err != nil { - log.Errorf("error: cannot read profiles directory: %v", err) - return - } + files, err := os.ReadDir(path.Join(app.directory, "profiles")) + if err != nil { + log.Errorf("error: cannot read profiles directory: %v", err) + return + } - for _, file := range files { - // Attempt to load an encrypted database - profileDirectory := path.Join(app.directory, "profiles", file.Name()) - profile, err := peer.FromEncryptedDatabase(profileDirectory, password) - loaded := false - if err == nil { - // return the load the profile... - log.Infof("loading profile from new-type storage database...") - loaded = app.installProfile(profile) - } else { // On failure attempt to load a legacy profile - profileStore, err := storage.LoadProfileWriterStore(profileDirectory, password) - if err != nil { - continue - } - log.Infof("found legacy profile. importing to new database structure...") - legacyProfile := profileStore.GetProfileCopy(true) - if !migrating { - migrating = true - app.appBus.Publish(event.NewEventList(event.StartingStorageMiragtion)) - } + for _, file := range files { + // Attempt to load an encrypted database + profileDirectory := path.Join(app.directory, "profiles", file.Name()) + profile, err := peer.FromEncryptedDatabase(profileDirectory, password) + loaded := false + if err == nil { + // return the load the profile... + log.Infof("loading profile from new-type storage database...") + loaded = app.installProfile(profile) + } else { // On failure attempt to load a legacy profile + profileStore, err := storage.LoadProfileWriterStore(profileDirectory, password) + if err != nil { + continue + } + log.Infof("found legacy profile. importing to new database structure...") + legacyProfile := profileStore.GetProfileCopy(true) + if !migrating { + migrating = true + app.appBus.Publish(event.NewEventList(event.StartingStorageMiragtion)) + } - cps, err := peer.CreateEncryptedStore(profileDirectory, password) - if err != nil { - log.Errorf("error creating encrypted store: %v", err) - } - profile := peer.ImportLegacyProfile(legacyProfile, cps) - loaded = app.installProfile(profile) - } - if loaded { - count++ - } - } - if count == 0 { - message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0) - app.appBus.Publish(message) - } - if migrating { - app.appBus.Publish(event.NewEventList(event.DoneStorageMigration)) - } + cps, err := peer.CreateEncryptedStore(profileDirectory, password) + if err != nil { + log.Errorf("error creating encrypted store: %v", err) + } + profile := peer.ImportLegacyProfile(legacyProfile, cps) + loaded = app.installProfile(profile) + } + if loaded { + count++ + } + } + if count == 0 { + message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0) + app.appBus.Publish(message) + } + if migrating { + app.appBus.Publish(event.NewEventList(event.DoneStorageMigration)) + } } // installProfile takes a profile and if it isn't loaded in the app, installs it and returns true func (app *application) installProfile(profile peer.CwtchPeer) bool { - app.appmutex.Lock() - defer app.appmutex.Unlock() + app.appmutex.Lock() + defer app.appmutex.Unlock() - // Only attempt to finalize the profile if we don't have one loaded... - if app.peers[profile.GetOnion()] == nil { - eventBus := event.NewEventManager() - app.eventBuses[profile.GetOnion()] = eventBus - profile.Init(app.eventBuses[profile.GetOnion()]) - app.peers[profile.GetOnion()] = profile - app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False})) - return true - } - // Otherwise shutdown the connections - profile.Shutdown() - return false + // Only attempt to finalize the profile if we don't have one loaded... + if app.peers[profile.GetOnion()] == nil { + eventBus := event.NewEventManager() + app.eventBuses[profile.GetOnion()] = eventBus + profile.Init(app.eventBuses[profile.GetOnion()]) + app.peers[profile.GetOnion()] = profile + app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False})) + return true + } + // Otherwise shutdown the connections + profile.Shutdown() + return false } -/// ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online +// ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online func (app *application) ActivePeerEngine(onion string, doListen, doPeers, doServers bool) { - profile := app.GetPeer(onion) - if profile != nil { - app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) - if doListen { - profile.Listen() - } - if doPeers { - profile.StartPeersConnections() - } - if doServers { - profile.StartServerConnections() - } - } + profile := app.GetPeer(onion) + if profile != nil { + app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) + if doListen { + profile.Listen() + } + if doPeers { + profile.StartPeersConnections() + } + if doServers { + profile.StartServerConnections() + } + } } -/// DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline +// DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline func (app *application) DeactivatePeerEngine(onion string) { - if engine, exists := app.engines[onion]; exists { - engine.Shutdown() - delete(app.engines, onion) - } + if engine, exists := app.engines[onion]; exists { + engine.Shutdown() + delete(app.engines, onion) + } } // GetPrimaryBus returns the bus the Application uses for events that aren't peer specific func (app *application) GetPrimaryBus() event.Manager { - return app.appBus + return app.appBus } // GetEventBus returns a cwtchPeer's event bus func (app *application) GetEventBus(onion string) event.Manager { - if manager, ok := app.eventBuses[onion]; ok { - return manager - } - return nil + if manager, ok := app.eventBuses[onion]; ok { + return manager + } + return nil } func (app *application) getACNStatusHandler() func(int, string) { - return func(progress int, status string) { - progStr := strconv.Itoa(progress) - app.appmutex.Lock() - app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status)) - for _, bus := range app.eventBuses { - bus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status)) - } - app.appmutex.Unlock() - } + return func(progress int, status string) { + progStr := strconv.Itoa(progress) + app.appmutex.Lock() + app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status)) + for _, bus := range app.eventBuses { + bus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status)) + } + app.appmutex.Unlock() + } } func (app *application) getACNVersionHandler() func(string) { - return func(version string) { - app.appmutex.Lock() - defer app.appmutex.Unlock() - app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version)) - } + return func(version string) { + app.appmutex.Lock() + defer app.appmutex.Unlock() + app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version)) + } } func (app *application) QueryACNStatus() { - prog, status := app.acn.GetBootstrapStatus() - app.getACNStatusHandler()(prog, status) + prog, status := app.acn.GetBootstrapStatus() + app.getACNStatusHandler()(prog, status) } func (app *application) QueryACNVersion() { - version := app.acn.GetVersion() - app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version)) + version := app.acn.GetVersion() + app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version)) } // ShutdownPeer shuts down a peer and removes it from the app's management func (app *application) ShutdownPeer(onion string) { - app.appmutex.Lock() - defer app.appmutex.Unlock() - app.shutdownPeer(onion) + app.appmutex.Lock() + defer app.appmutex.Unlock() + app.shutdownPeer(onion) } -/// shutdownPeer mutex unlocked helper shutdown peer +// shutdownPeer mutex unlocked helper shutdown peer func (app *application) shutdownPeer(onion string) { - app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion)) - app.eventBuses[onion].Shutdown() - delete(app.eventBuses, onion) - app.peers[onion].Shutdown() - delete(app.peers, onion) - if _, ok := app.engines[onion]; ok { - app.engines[onion].Shutdown() - delete(app.engines, onion) - } - log.Debugf("shutting down plugins for %v", onion) - pluginsI, ok := app.plugins.Load(onion) - if ok { - plugins := pluginsI.([]plugins.Plugin) - for _, plugin := range plugins { - log.Debugf("shutting down plugin: %v", plugin) - plugin.Shutdown() - } - } - app.plugins.Delete(onion) + app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion)) + app.eventBuses[onion].Shutdown() + delete(app.eventBuses, onion) + app.peers[onion].Shutdown() + delete(app.peers, onion) + if _, ok := app.engines[onion]; ok { + app.engines[onion].Shutdown() + delete(app.engines, onion) + } + log.Debugf("shutting down plugins for %v", onion) + pluginsI, ok := app.plugins.Load(onion) + if ok { + plugins := pluginsI.([]plugins.Plugin) + for _, plugin := range plugins { + log.Debugf("shutting down plugin: %v", plugin) + plugin.Shutdown() + } + } + app.plugins.Delete(onion) } // Shutdown shutsdown all peers of an app func (app *application) Shutdown() { - app.appmutex.Lock() - defer app.appmutex.Unlock() - for id := range app.peers { - log.Debugf("Shutting Down Peer %v", id) - app.shutdownPeer(id) - } - log.Debugf("Shutting Down App") - app.appBus.Shutdown() - log.Debugf("Shut Down Complete") + app.appmutex.Lock() + defer app.appmutex.Unlock() + for id := range app.peers { + log.Debugf("Shutting Down Peer %v", id) + app.shutdownPeer(id) + } + log.Debugf("Shutting Down App") + app.appBus.Shutdown() + log.Debugf("Shut Down Complete") } diff --git a/app/plugins/antispam.go b/app/plugins/antispam.go new file mode 100644 index 0000000..467137d --- /dev/null +++ b/app/plugins/antispam.go @@ -0,0 +1,43 @@ +package plugins + +import ( + "cwtch.im/cwtch/event" + "git.openprivacy.ca/openprivacy/log" + "time" +) + +const antispamTickTime = 30 * time.Second + +type antispam struct { + bus event.Manager + queue event.Queue + breakChan chan bool +} + +func (a *antispam) Start() { + go a.run() +} + +func (a antispam) Shutdown() { + a.breakChan <- true +} + +func (a *antispam) run() { + log.Infof("running antispam trigger plugin") + for { + select { + case <-time.After(antispamTickTime): + // no fuss, just trigger the check. Downstream will filter out superfluous actions + a.bus.Publish(event.NewEvent(event.TriggerAntispamCheck, map[event.Field]string{})) + continue + case <-a.breakChan: + return + } + } +} + +// NewAntiSpam returns a Plugin that when started will trigger antispam payments on a regular interval +func NewAntiSpam(bus event.Manager) Plugin { + cr := &antispam{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1)} + return cr +} diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index b4274ab..08d7211 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -8,8 +8,8 @@ import ( "time" ) -const tickTime = 5 * time.Second -const maxBackoff int = 64 // 320 seconds or ~5 min +const tickTime = 30 * time.Second +const maxBackoff int = 10 // 320 seconds or ~5 min type connectionType int diff --git a/app/plugins/plugin.go b/app/plugins/plugin.go index f5a9915..7ebf773 100644 --- a/app/plugins/plugin.go +++ b/app/plugins/plugin.go @@ -13,6 +13,7 @@ type PluginID int const ( CONNECTIONRETRY PluginID = iota NETWORKCHECK + ANTISPAM ) // Plugin is the interface for a plugin @@ -28,6 +29,8 @@ func Get(id PluginID, bus event.Manager, acn connectivity.ACN, onion string) (Pl return NewConnectionRetry(bus, onion), nil case NETWORKCHECK: return NewNetworkCheck(onion, bus, acn), nil + case ANTISPAM: + return NewAntiSpam(bus), nil } return nil, fmt.Errorf("plugin not defined %v", id) diff --git a/event/common.go b/event/common.go index 9fa02bf..9b9f975 100644 --- a/event/common.go +++ b/event/common.go @@ -201,7 +201,9 @@ const ( StartingStorageMiragtion = Type("StartingStorageMigration") DoneStorageMigration = Type("DoneStorageMigration") - TokenManagerInfo = Type("TokenManagerInfo") + TokenManagerInfo = Type("TokenManagerInfo") + TriggerAntispamCheck = Type("TriggerAntispamCheck") + MakeAntispamPayment = Type("MakeAntispamPayment") ) // Field defines common event attributes diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 03b45fb..5d8771f 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -14,6 +14,7 @@ import ( "git.openprivacy.ca/openprivacy/connectivity/tor" "golang.org/x/crypto/ed25519" "math/bits" + rand2 "math/rand" "os" path "path/filepath" "runtime" @@ -37,7 +38,7 @@ var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeerEngine: true, event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToPeerError: true, event.SendMessageToGroupError: true, event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true, - event.ManifestSizeReceived: true, event.ManifestReceived: true, event.FileDownloaded: true} + event.ManifestSizeReceived: true, event.ManifestReceived: true, event.FileDownloaded: true, event.TriggerAntispamCheck: true} // DefaultEventsToHandle specifies which events will be subscribed to // @@ -58,6 +59,7 @@ var DefaultEventsToHandle = []event.Type{ event.NewRetValMessageFromPeer, event.ManifestReceived, event.FileDownloaded, + event.TriggerAntispamCheck, } // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer @@ -783,7 +785,13 @@ func (cp *cwtchPeer) GetPeerState(handle string) connections.ConnectionState { // PeerWithOnion initiates a request to the Protocol Engine to set up Cwtch Session with a given tor v3 onion // address. func (cp *cwtchPeer) PeerWithOnion(onion string) { - cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion})) + go func() { + // wait a random number of seconds before triggering + // this cuts down on contention in the event + randWait := time.Duration(rand2.Int() % 60) + time.Sleep(randWait * time.Second) + cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion})) + }() } // SendInviteToConversation kicks off the invite process @@ -918,6 +926,14 @@ func (cp *cwtchPeer) JoinServer(onion string) error { return errors.New("no keys found for server connection") } +// MakeAntispamPayment allows a peer to retrigger antispam, important if the initial connection somehow fails... +// TODO in the future we might want to expose this in CwtchPeer interface +// Additionally we may want to add extra checks here to deduplicate groups from tokenservers to cut down +// on the number of events (right now it should be minimal) +func (cp *cwtchPeer) MakeAntispamPayment(server string) { + cp.eventBus.Publish(event.NewEvent(event.MakeAntispamPayment, map[event.Field]string{event.GroupServer: server})) +} + // ResyncServer completely tears down and resyncs a new server connection with the given handle func (cp *cwtchPeer) ResyncServer(handle string) error { ci, err := cp.FetchConversationInfo(handle) @@ -1305,6 +1321,13 @@ func (cp *cwtchPeer) eventHandler() { cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), mostRecentTime.Format(time.RFC3339Nano)) } } + case event.TriggerAntispamCheck: + conversations, _ := cp.FetchConversations() + for _, conversation := range conversations { + if conversation.IsServer() { + cp.MakeAntispamPayment(conversation.Handle) + } + } default: if ev.EventType != "" { log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType) diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index fc29319..0d1f8d3 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -116,6 +116,9 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK engine.eventManager.Subscribe(event.ManifestSizeReceived, engine.queue) engine.eventManager.Subscribe(event.ManifestSaved, engine.queue) + // Token Server + engine.eventManager.Subscribe(event.MakeAntispamPayment, engine.queue) + for peer, authorization := range peerAuthorizations { engine.authorizations.Store(peer, authorization) } @@ -160,6 +163,8 @@ func (e *engine) eventHandler() { signature = []byte{} } go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature) + case event.MakeAntispamPayment: + go e.makeAntispamPayment(ev.Data[event.GroupServer]) case event.LeaveServer: e.leaveServer(ev.Data[event.GroupServer]) case event.DeleteContact: @@ -335,6 +340,31 @@ func (e *engine) peerWithOnion(onion string) { } } +func (e *engine) makeAntispamPayment(onion string) { + log.Debugf("making antispam payment") + e.ephemeralServicesLock.Lock() + ephemeralService, ok := e.ephemeralServices[onion] + e.ephemeralServicesLock.Unlock() + + if ephemeralService == nil || !ok { + log.Debugf("could not find associated group for antispam payment") + return + } + + conn, err := ephemeralService.service.GetConnection(onion) + if err == nil { + tokenApp, ok := (conn.App()).(*TokenBoardClient) + if ok { + tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, new(TokenManager)) + tokenManager := tokenManagerPointer.(*TokenManager) + log.Debugf("checking antispam tokens %v", tokenManager.NumTokens()) + if tokenManager.NumTokens() < 5 { + go tokenApp.MakePayment() + } + } + } +} + // peerWithTokenServer is the entry point for cwtchPeer - server relationships // needs to be run in a goroutine as will block on Open. func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) { diff --git a/protocol/connections/peerapp.go b/protocol/connections/peerapp.go index dc73290..edd7932 100644 --- a/protocol/connections/peerapp.go +++ b/protocol/connections/peerapp.go @@ -86,6 +86,8 @@ func (pa *PeerApp) Init(connection tapir.Connection) { } } else { // The auth protocol wasn't completed, we can safely shutdown the connection + // send an onclose here because we *may* have triggered this and we want to retry later... + pa.OnClose(connection.Hostname()) connection.Close() } } diff --git a/protocol/connections/tokenboardclientapp.go b/protocol/connections/tokenboardclientapp.go index 8a12f23..0bbc59a 100644 --- a/protocol/connections/tokenboardclientapp.go +++ b/protocol/connections/tokenboardclientapp.go @@ -2,8 +2,8 @@ package connections import ( "cwtch.im/cwtch/protocol/groups" + "cwtch.im/cwtch/utils" "encoding/json" - "errors" "git.openprivacy.ca/cwtch.im/tapir" "git.openprivacy.ca/cwtch.im/tapir/applications" "git.openprivacy.ca/cwtch.im/tapir/networks/tor" @@ -191,28 +191,30 @@ func (ta *TokenBoardClient) MakePayment() error { connected, err := client.Connect(ta.tokenServiceOnion, powTokenApp) if connected && err == nil { log.Debugf("Waiting for successful Token Acquisition...") - conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability) - if err == nil { - powtapp, ok := conn.App().(*applications.TokenApplication) - if ok { - log.Debugf("Updating Tokens") - ta.tokenBoardHandler.NewTokenHandler(ta.tokenServiceOnion, powtapp.Tokens) - log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit()) - conn.Close() + tp := utils.TimeoutPolicy(time.Second * 30) + err := tp.ExecuteAction(func() error { + conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability) + if err == nil { + powtapp, ok := conn.App().(*applications.TokenApplication) + if ok { + log.Debugf("Updating Tokens") + ta.tokenBoardHandler.NewTokenHandler(ta.tokenServiceOnion, powtapp.Tokens) + log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit()) + conn.Close() + return nil + } + log.Errorf("invalid cast of powapp. this should never happen %v %v", powtapp, reflect.TypeOf(conn.App())) return nil } - log.Errorf("invalid cast of powapp. this should never happen %v %v", powtapp, reflect.TypeOf(conn.App())) - return errors.New("invalid cast of powapp. this should never happen") + return nil + }) + + // we timed out + if err != nil { + ta.connection.Close() } - log.Debugf("could not connect to payment server..trying again: %v", err) - return ta.MakePayment() - } else if connected && err != nil { - log.Debugf("inexplicable error: %v", err) } - log.Debugf("failed to make a connection. trying again...") - // it doesn't actually take that long to make a payment, so waiting a small amount of time should suffice - time.Sleep(time.Second) - return ta.MakePayment() + return err } // NextToken retrieves the next token