From d455eb64771b9741010b83612fb7c25829d572c0 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Sat, 10 Sep 2022 10:15:32 -0700 Subject: [PATCH] Fix Issues with Antispam triggering / Add explicit timeout calls for group servers / token aquisition and optimistic closing for peers --- app/app.go | 6 +-- app/plugins/antispam.go | 43 +++++++++++++++++++++ app/plugins/plugin.go | 3 ++ event/common.go | 4 +- peer/cwtch_peer.go | 27 ++++++++++++- protocol/connections/engine.go | 30 ++++++++++++++ protocol/connections/peerapp.go | 2 + protocol/connections/tokenboardclientapp.go | 40 ++++++++++--------- 8 files changed, 130 insertions(+), 25 deletions(-) create mode 100644 app/plugins/antispam.go diff --git a/app/app.go b/app/app.go index 2d403d3..3e3f9d4 100644 --- a/app/app.go +++ b/app/app.go @@ -241,7 +241,7 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool { return false } -/// ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online +// / ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online func (app *application) ActivePeerEngine(onion string, doListen, doPeers, doServers bool) { profile := app.GetPeer(onion) if profile != nil { @@ -258,7 +258,7 @@ func (app *application) ActivePeerEngine(onion string, doListen, doPeers, doServ } } -/// DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline +// / DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline func (app *application) DeactivatePeerEngine(onion string) { if engine, exists := app.engines[onion]; exists { engine.Shutdown() @@ -316,7 +316,7 @@ func (app *application) ShutdownPeer(onion string) { app.shutdownPeer(onion) } -/// shutdownPeer mutex unlocked helper shutdown peer +// / shutdownPeer mutex unlocked helper shutdown peer func (app *application) shutdownPeer(onion string) { app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion)) app.eventBuses[onion].Shutdown() diff --git a/app/plugins/antispam.go b/app/plugins/antispam.go new file mode 100644 index 0000000..467137d --- /dev/null +++ b/app/plugins/antispam.go @@ -0,0 +1,43 @@ +package plugins + +import ( + "cwtch.im/cwtch/event" + "git.openprivacy.ca/openprivacy/log" + "time" +) + +const antispamTickTime = 30 * time.Second + +type antispam struct { + bus event.Manager + queue event.Queue + breakChan chan bool +} + +func (a *antispam) Start() { + go a.run() +} + +func (a antispam) Shutdown() { + a.breakChan <- true +} + +func (a *antispam) run() { + log.Infof("running antispam trigger plugin") + for { + select { + case <-time.After(antispamTickTime): + // no fuss, just trigger the check. Downstream will filter out superfluous actions + a.bus.Publish(event.NewEvent(event.TriggerAntispamCheck, map[event.Field]string{})) + continue + case <-a.breakChan: + return + } + } +} + +// NewAntiSpam returns a Plugin that when started will trigger antispam payments on a regular interval +func NewAntiSpam(bus event.Manager) Plugin { + cr := &antispam{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1)} + return cr +} diff --git a/app/plugins/plugin.go b/app/plugins/plugin.go index f5a9915..7ebf773 100644 --- a/app/plugins/plugin.go +++ b/app/plugins/plugin.go @@ -13,6 +13,7 @@ type PluginID int const ( CONNECTIONRETRY PluginID = iota NETWORKCHECK + ANTISPAM ) // Plugin is the interface for a plugin @@ -28,6 +29,8 @@ func Get(id PluginID, bus event.Manager, acn connectivity.ACN, onion string) (Pl return NewConnectionRetry(bus, onion), nil case NETWORKCHECK: return NewNetworkCheck(onion, bus, acn), nil + case ANTISPAM: + return NewAntiSpam(bus), nil } return nil, fmt.Errorf("plugin not defined %v", id) diff --git a/event/common.go b/event/common.go index 9fa02bf..9b9f975 100644 --- a/event/common.go +++ b/event/common.go @@ -201,7 +201,9 @@ const ( StartingStorageMiragtion = Type("StartingStorageMigration") DoneStorageMigration = Type("DoneStorageMigration") - TokenManagerInfo = Type("TokenManagerInfo") + TokenManagerInfo = Type("TokenManagerInfo") + TriggerAntispamCheck = Type("TriggerAntispamCheck") + MakeAntispamPayment = Type("MakeAntispamPayment") ) // Field defines common event attributes diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 03b45fb..5d8771f 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -14,6 +14,7 @@ import ( "git.openprivacy.ca/openprivacy/connectivity/tor" "golang.org/x/crypto/ed25519" "math/bits" + rand2 "math/rand" "os" path "path/filepath" "runtime" @@ -37,7 +38,7 @@ var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeerEngine: true, event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToPeerError: true, event.SendMessageToGroupError: true, event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true, - event.ManifestSizeReceived: true, event.ManifestReceived: true, event.FileDownloaded: true} + event.ManifestSizeReceived: true, event.ManifestReceived: true, event.FileDownloaded: true, event.TriggerAntispamCheck: true} // DefaultEventsToHandle specifies which events will be subscribed to // @@ -58,6 +59,7 @@ var DefaultEventsToHandle = []event.Type{ event.NewRetValMessageFromPeer, event.ManifestReceived, event.FileDownloaded, + event.TriggerAntispamCheck, } // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer @@ -783,7 +785,13 @@ func (cp *cwtchPeer) GetPeerState(handle string) connections.ConnectionState { // PeerWithOnion initiates a request to the Protocol Engine to set up Cwtch Session with a given tor v3 onion // address. func (cp *cwtchPeer) PeerWithOnion(onion string) { - cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion})) + go func() { + // wait a random number of seconds before triggering + // this cuts down on contention in the event + randWait := time.Duration(rand2.Int() % 60) + time.Sleep(randWait * time.Second) + cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion})) + }() } // SendInviteToConversation kicks off the invite process @@ -918,6 +926,14 @@ func (cp *cwtchPeer) JoinServer(onion string) error { return errors.New("no keys found for server connection") } +// MakeAntispamPayment allows a peer to retrigger antispam, important if the initial connection somehow fails... +// TODO in the future we might want to expose this in CwtchPeer interface +// Additionally we may want to add extra checks here to deduplicate groups from tokenservers to cut down +// on the number of events (right now it should be minimal) +func (cp *cwtchPeer) MakeAntispamPayment(server string) { + cp.eventBus.Publish(event.NewEvent(event.MakeAntispamPayment, map[event.Field]string{event.GroupServer: server})) +} + // ResyncServer completely tears down and resyncs a new server connection with the given handle func (cp *cwtchPeer) ResyncServer(handle string) error { ci, err := cp.FetchConversationInfo(handle) @@ -1305,6 +1321,13 @@ func (cp *cwtchPeer) eventHandler() { cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), mostRecentTime.Format(time.RFC3339Nano)) } } + case event.TriggerAntispamCheck: + conversations, _ := cp.FetchConversations() + for _, conversation := range conversations { + if conversation.IsServer() { + cp.MakeAntispamPayment(conversation.Handle) + } + } default: if ev.EventType != "" { log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType) diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index fc29319..0d1f8d3 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -116,6 +116,9 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK engine.eventManager.Subscribe(event.ManifestSizeReceived, engine.queue) engine.eventManager.Subscribe(event.ManifestSaved, engine.queue) + // Token Server + engine.eventManager.Subscribe(event.MakeAntispamPayment, engine.queue) + for peer, authorization := range peerAuthorizations { engine.authorizations.Store(peer, authorization) } @@ -160,6 +163,8 @@ func (e *engine) eventHandler() { signature = []byte{} } go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature) + case event.MakeAntispamPayment: + go e.makeAntispamPayment(ev.Data[event.GroupServer]) case event.LeaveServer: e.leaveServer(ev.Data[event.GroupServer]) case event.DeleteContact: @@ -335,6 +340,31 @@ func (e *engine) peerWithOnion(onion string) { } } +func (e *engine) makeAntispamPayment(onion string) { + log.Debugf("making antispam payment") + e.ephemeralServicesLock.Lock() + ephemeralService, ok := e.ephemeralServices[onion] + e.ephemeralServicesLock.Unlock() + + if ephemeralService == nil || !ok { + log.Debugf("could not find associated group for antispam payment") + return + } + + conn, err := ephemeralService.service.GetConnection(onion) + if err == nil { + tokenApp, ok := (conn.App()).(*TokenBoardClient) + if ok { + tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, new(TokenManager)) + tokenManager := tokenManagerPointer.(*TokenManager) + log.Debugf("checking antispam tokens %v", tokenManager.NumTokens()) + if tokenManager.NumTokens() < 5 { + go tokenApp.MakePayment() + } + } + } +} + // peerWithTokenServer is the entry point for cwtchPeer - server relationships // needs to be run in a goroutine as will block on Open. func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) { diff --git a/protocol/connections/peerapp.go b/protocol/connections/peerapp.go index dc73290..edd7932 100644 --- a/protocol/connections/peerapp.go +++ b/protocol/connections/peerapp.go @@ -86,6 +86,8 @@ func (pa *PeerApp) Init(connection tapir.Connection) { } } else { // The auth protocol wasn't completed, we can safely shutdown the connection + // send an onclose here because we *may* have triggered this and we want to retry later... + pa.OnClose(connection.Hostname()) connection.Close() } } diff --git a/protocol/connections/tokenboardclientapp.go b/protocol/connections/tokenboardclientapp.go index 8a12f23..0bbc59a 100644 --- a/protocol/connections/tokenboardclientapp.go +++ b/protocol/connections/tokenboardclientapp.go @@ -2,8 +2,8 @@ package connections import ( "cwtch.im/cwtch/protocol/groups" + "cwtch.im/cwtch/utils" "encoding/json" - "errors" "git.openprivacy.ca/cwtch.im/tapir" "git.openprivacy.ca/cwtch.im/tapir/applications" "git.openprivacy.ca/cwtch.im/tapir/networks/tor" @@ -191,28 +191,30 @@ func (ta *TokenBoardClient) MakePayment() error { connected, err := client.Connect(ta.tokenServiceOnion, powTokenApp) if connected && err == nil { log.Debugf("Waiting for successful Token Acquisition...") - conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability) - if err == nil { - powtapp, ok := conn.App().(*applications.TokenApplication) - if ok { - log.Debugf("Updating Tokens") - ta.tokenBoardHandler.NewTokenHandler(ta.tokenServiceOnion, powtapp.Tokens) - log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit()) - conn.Close() + tp := utils.TimeoutPolicy(time.Second * 30) + err := tp.ExecuteAction(func() error { + conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability) + if err == nil { + powtapp, ok := conn.App().(*applications.TokenApplication) + if ok { + log.Debugf("Updating Tokens") + ta.tokenBoardHandler.NewTokenHandler(ta.tokenServiceOnion, powtapp.Tokens) + log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit()) + conn.Close() + return nil + } + log.Errorf("invalid cast of powapp. this should never happen %v %v", powtapp, reflect.TypeOf(conn.App())) return nil } - log.Errorf("invalid cast of powapp. this should never happen %v %v", powtapp, reflect.TypeOf(conn.App())) - return errors.New("invalid cast of powapp. this should never happen") + return nil + }) + + // we timed out + if err != nil { + ta.connection.Close() } - log.Debugf("could not connect to payment server..trying again: %v", err) - return ta.MakePayment() - } else if connected && err != nil { - log.Debugf("inexplicable error: %v", err) } - log.Debugf("failed to make a connection. trying again...") - // it doesn't actually take that long to make a payment, so waiting a small amount of time should suffice - time.Sleep(time.Second) - return ta.MakePayment() + return err } // NextToken retrieves the next token