timeout_fixes_tokens #460

Merged
sarah merged 4 commits from timeout_fixes_tokens into master 2022-09-10 18:43:59 +00:00
9 changed files with 372 additions and 267 deletions

View File

@ -241,7 +241,7 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool {
return false
}
/// ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online
// ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online
func (app *application) ActivePeerEngine(onion string, doListen, doPeers, doServers bool) {
profile := app.GetPeer(onion)
if profile != nil {
@ -258,7 +258,7 @@ func (app *application) ActivePeerEngine(onion string, doListen, doPeers, doServ
}
}
/// DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline
// DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline
func (app *application) DeactivatePeerEngine(onion string) {
if engine, exists := app.engines[onion]; exists {
engine.Shutdown()
@ -316,7 +316,7 @@ func (app *application) ShutdownPeer(onion string) {
app.shutdownPeer(onion)
}
/// shutdownPeer mutex unlocked helper shutdown peer
// shutdownPeer mutex unlocked helper shutdown peer
func (app *application) shutdownPeer(onion string) {
app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion))
app.eventBuses[onion].Shutdown()

43
app/plugins/antispam.go Normal file
View File

@ -0,0 +1,43 @@
package plugins
import (
"cwtch.im/cwtch/event"
"git.openprivacy.ca/openprivacy/log"
"time"
)
const antispamTickTime = 30 * time.Second
type antispam struct {
bus event.Manager
queue event.Queue
breakChan chan bool
}
func (a *antispam) Start() {
go a.run()
}
func (a antispam) Shutdown() {
a.breakChan <- true
}
func (a *antispam) run() {
log.Infof("running antispam trigger plugin")
for {
select {
case <-time.After(antispamTickTime):
// no fuss, just trigger the check. Downstream will filter out superfluous actions
a.bus.Publish(event.NewEvent(event.TriggerAntispamCheck, map[event.Field]string{}))
continue
case <-a.breakChan:
return
}
}
}
// NewAntiSpam returns a Plugin that when started will trigger antispam payments on a regular interval
func NewAntiSpam(bus event.Manager) Plugin {
cr := &antispam{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1)}
return cr
}

View File

@ -8,8 +8,8 @@ import (
"time"
)
const tickTime = 5 * time.Second
const maxBackoff int = 64 // 320 seconds or ~5 min
const tickTime = 30 * time.Second
const maxBackoff int = 10 // 320 seconds or ~5 min
type connectionType int

View File

@ -13,6 +13,7 @@ type PluginID int
const (
CONNECTIONRETRY PluginID = iota
NETWORKCHECK
ANTISPAM
)
// Plugin is the interface for a plugin
@ -28,6 +29,8 @@ func Get(id PluginID, bus event.Manager, acn connectivity.ACN, onion string) (Pl
return NewConnectionRetry(bus, onion), nil
case NETWORKCHECK:
return NewNetworkCheck(onion, bus, acn), nil
case ANTISPAM:
return NewAntiSpam(bus), nil
}
return nil, fmt.Errorf("plugin not defined %v", id)

View File

@ -202,6 +202,8 @@ const (
DoneStorageMigration = Type("DoneStorageMigration")
TokenManagerInfo = Type("TokenManagerInfo")
TriggerAntispamCheck = Type("TriggerAntispamCheck")
MakeAntispamPayment = Type("MakeAntispamPayment")
)
// Field defines common event attributes

View File

@ -14,6 +14,7 @@ import (
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
"math/bits"
rand2 "math/rand"
"os"
path "path/filepath"
"runtime"
@ -37,7 +38,7 @@ var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true
event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeerEngine: true,
event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToPeerError: true, event.SendMessageToGroupError: true,
event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true,
event.ManifestSizeReceived: true, event.ManifestReceived: true, event.FileDownloaded: true}
event.ManifestSizeReceived: true, event.ManifestReceived: true, event.FileDownloaded: true, event.TriggerAntispamCheck: true}
// DefaultEventsToHandle specifies which events will be subscribed to
//
@ -58,6 +59,7 @@ var DefaultEventsToHandle = []event.Type{
event.NewRetValMessageFromPeer,
event.ManifestReceived,
event.FileDownloaded,
event.TriggerAntispamCheck,
}
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
@ -783,7 +785,13 @@ func (cp *cwtchPeer) GetPeerState(handle string) connections.ConnectionState {
// PeerWithOnion initiates a request to the Protocol Engine to set up Cwtch Session with a given tor v3 onion
// address.
func (cp *cwtchPeer) PeerWithOnion(onion string) {
go func() {
Review

this logic here will slow down later contact adds.

What about adding this inside the loop in StartPeersConnections and StartServerConnections as that's where our bulk connection problems are, not later one offs?

Also have you found if this helps at all? my experiment with it hasn't made any noticable change

this logic here will slow down later contact adds. What about adding this inside the loop in StartPeersConnections and StartServerConnections as that's where our bulk connection problems are, not later one offs? Also have you found if this helps at all? my experiment with it hasn't made any noticable change
// wait a random number of seconds before triggering
// this cuts down on contention in the event
randWait := time.Duration(rand2.Int() % 60)
time.Sleep(randWait * time.Second)
cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion}))
}()
}
// SendInviteToConversation kicks off the invite process
@ -918,6 +926,14 @@ func (cp *cwtchPeer) JoinServer(onion string) error {
return errors.New("no keys found for server connection")
}
// MakeAntispamPayment allows a peer to retrigger antispam, important if the initial connection somehow fails...
// TODO in the future we might want to expose this in CwtchPeer interface
// Additionally we may want to add extra checks here to deduplicate groups from tokenservers to cut down
// on the number of events (right now it should be minimal)
func (cp *cwtchPeer) MakeAntispamPayment(server string) {
cp.eventBus.Publish(event.NewEvent(event.MakeAntispamPayment, map[event.Field]string{event.GroupServer: server}))
}
// ResyncServer completely tears down and resyncs a new server connection with the given handle
func (cp *cwtchPeer) ResyncServer(handle string) error {
ci, err := cp.FetchConversationInfo(handle)
@ -1305,6 +1321,13 @@ func (cp *cwtchPeer) eventHandler() {
cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), mostRecentTime.Format(time.RFC3339Nano))
}
}
case event.TriggerAntispamCheck:
conversations, _ := cp.FetchConversations()
for _, conversation := range conversations {
if conversation.IsServer() {
cp.MakeAntispamPayment(conversation.Handle)
}
}
default:
if ev.EventType != "" {
log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType)

View File

@ -116,6 +116,9 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
engine.eventManager.Subscribe(event.ManifestSizeReceived, engine.queue)
engine.eventManager.Subscribe(event.ManifestSaved, engine.queue)
// Token Server
engine.eventManager.Subscribe(event.MakeAntispamPayment, engine.queue)
for peer, authorization := range peerAuthorizations {
engine.authorizations.Store(peer, authorization)
}
@ -160,6 +163,8 @@ func (e *engine) eventHandler() {
signature = []byte{}
}
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
case event.MakeAntispamPayment:
go e.makeAntispamPayment(ev.Data[event.GroupServer])
case event.LeaveServer:
e.leaveServer(ev.Data[event.GroupServer])
case event.DeleteContact:
@ -335,6 +340,31 @@ func (e *engine) peerWithOnion(onion string) {
}
}
func (e *engine) makeAntispamPayment(onion string) {
log.Debugf("making antispam payment")
e.ephemeralServicesLock.Lock()
ephemeralService, ok := e.ephemeralServices[onion]
e.ephemeralServicesLock.Unlock()
if ephemeralService == nil || !ok {
log.Debugf("could not find associated group for antispam payment")
return
}
conn, err := ephemeralService.service.GetConnection(onion)
if err == nil {
tokenApp, ok := (conn.App()).(*TokenBoardClient)
if ok {
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, new(TokenManager))
tokenManager := tokenManagerPointer.(*TokenManager)
log.Debugf("checking antispam tokens %v", tokenManager.NumTokens())
if tokenManager.NumTokens() < 5 {
go tokenApp.MakePayment()
}
}
}
}
// peerWithTokenServer is the entry point for cwtchPeer - server relationships
// needs to be run in a goroutine as will block on Open.
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) {

View File

@ -86,6 +86,8 @@ func (pa *PeerApp) Init(connection tapir.Connection) {
}
} else {
// The auth protocol wasn't completed, we can safely shutdown the connection
// send an onclose here because we *may* have triggered this and we want to retry later...
pa.OnClose(connection.Hostname())
connection.Close()
}
}

View File

@ -2,8 +2,8 @@ package connections
import (
"cwtch.im/cwtch/protocol/groups"
"cwtch.im/cwtch/utils"
"encoding/json"
"errors"
"git.openprivacy.ca/cwtch.im/tapir"
"git.openprivacy.ca/cwtch.im/tapir/applications"
"git.openprivacy.ca/cwtch.im/tapir/networks/tor"
@ -191,6 +191,8 @@ func (ta *TokenBoardClient) MakePayment() error {
connected, err := client.Connect(ta.tokenServiceOnion, powTokenApp)
if connected && err == nil {
log.Debugf("Waiting for successful Token Acquisition...")
tp := utils.TimeoutPolicy(time.Second * 30)
err := tp.ExecuteAction(func() error {
conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability)
if err == nil {
powtapp, ok := conn.App().(*applications.TokenApplication)
@ -202,17 +204,17 @@ func (ta *TokenBoardClient) MakePayment() error {
return nil
}
log.Errorf("invalid cast of powapp. this should never happen %v %v", powtapp, reflect.TypeOf(conn.App()))
return errors.New("invalid cast of powapp. this should never happen")
return nil
}
log.Debugf("could not connect to payment server..trying again: %v", err)
return ta.MakePayment()
} else if connected && err != nil {
log.Debugf("inexplicable error: %v", err)
return nil
})
// we timed out
if err != nil {
ta.connection.Close()
}
log.Debugf("failed to make a connection. trying again...")
// it doesn't actually take that long to make a payment, so waiting a small amount of time should suffice
time.Sleep(time.Second)
return ta.MakePayment()
}
return err
}
// NextToken retrieves the next token