timeout_fixes_tokens #460
|
@ -241,7 +241,7 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
/// ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online
|
||||
// ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online
|
||||
func (app *application) ActivePeerEngine(onion string, doListen, doPeers, doServers bool) {
|
||||
profile := app.GetPeer(onion)
|
||||
if profile != nil {
|
||||
|
@ -258,7 +258,7 @@ func (app *application) ActivePeerEngine(onion string, doListen, doPeers, doServ
|
|||
}
|
||||
}
|
||||
|
||||
/// DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline
|
||||
// DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline
|
||||
func (app *application) DeactivatePeerEngine(onion string) {
|
||||
if engine, exists := app.engines[onion]; exists {
|
||||
engine.Shutdown()
|
||||
|
@ -316,7 +316,7 @@ func (app *application) ShutdownPeer(onion string) {
|
|||
app.shutdownPeer(onion)
|
||||
}
|
||||
|
||||
/// shutdownPeer mutex unlocked helper shutdown peer
|
||||
// shutdownPeer mutex unlocked helper shutdown peer
|
||||
func (app *application) shutdownPeer(onion string) {
|
||||
app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion))
|
||||
app.eventBuses[onion].Shutdown()
|
||||
|
|
|
@ -0,0 +1,43 @@
|
|||
package plugins
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"time"
|
||||
)
|
||||
|
||||
const antispamTickTime = 30 * time.Second
|
||||
|
||||
type antispam struct {
|
||||
bus event.Manager
|
||||
queue event.Queue
|
||||
breakChan chan bool
|
||||
}
|
||||
|
||||
func (a *antispam) Start() {
|
||||
go a.run()
|
||||
}
|
||||
|
||||
func (a antispam) Shutdown() {
|
||||
a.breakChan <- true
|
||||
}
|
||||
|
||||
func (a *antispam) run() {
|
||||
log.Infof("running antispam trigger plugin")
|
||||
for {
|
||||
select {
|
||||
case <-time.After(antispamTickTime):
|
||||
// no fuss, just trigger the check. Downstream will filter out superfluous actions
|
||||
a.bus.Publish(event.NewEvent(event.TriggerAntispamCheck, map[event.Field]string{}))
|
||||
continue
|
||||
case <-a.breakChan:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewAntiSpam returns a Plugin that when started will trigger antispam payments on a regular interval
|
||||
func NewAntiSpam(bus event.Manager) Plugin {
|
||||
cr := &antispam{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1)}
|
||||
return cr
|
||||
}
|
|
@ -8,8 +8,8 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
const tickTime = 5 * time.Second
|
||||
const maxBackoff int = 64 // 320 seconds or ~5 min
|
||||
const tickTime = 30 * time.Second
|
||||
const maxBackoff int = 10 // 320 seconds or ~5 min
|
||||
|
||||
type connectionType int
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@ type PluginID int
|
|||
const (
|
||||
CONNECTIONRETRY PluginID = iota
|
||||
NETWORKCHECK
|
||||
ANTISPAM
|
||||
)
|
||||
|
||||
// Plugin is the interface for a plugin
|
||||
|
@ -28,6 +29,8 @@ func Get(id PluginID, bus event.Manager, acn connectivity.ACN, onion string) (Pl
|
|||
return NewConnectionRetry(bus, onion), nil
|
||||
case NETWORKCHECK:
|
||||
return NewNetworkCheck(onion, bus, acn), nil
|
||||
case ANTISPAM:
|
||||
return NewAntiSpam(bus), nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("plugin not defined %v", id)
|
||||
|
|
|
@ -202,6 +202,8 @@ const (
|
|||
DoneStorageMigration = Type("DoneStorageMigration")
|
||||
|
||||
TokenManagerInfo = Type("TokenManagerInfo")
|
||||
TriggerAntispamCheck = Type("TriggerAntispamCheck")
|
||||
MakeAntispamPayment = Type("MakeAntispamPayment")
|
||||
)
|
||||
|
||||
// Field defines common event attributes
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
||||
"golang.org/x/crypto/ed25519"
|
||||
"math/bits"
|
||||
rand2 "math/rand"
|
||||
"os"
|
||||
path "path/filepath"
|
||||
"runtime"
|
||||
|
@ -37,7 +38,7 @@ var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true
|
|||
event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeerEngine: true,
|
||||
event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToPeerError: true, event.SendMessageToGroupError: true,
|
||||
event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true,
|
||||
event.ManifestSizeReceived: true, event.ManifestReceived: true, event.FileDownloaded: true}
|
||||
event.ManifestSizeReceived: true, event.ManifestReceived: true, event.FileDownloaded: true, event.TriggerAntispamCheck: true}
|
||||
|
||||
// DefaultEventsToHandle specifies which events will be subscribed to
|
||||
//
|
||||
|
@ -58,6 +59,7 @@ var DefaultEventsToHandle = []event.Type{
|
|||
event.NewRetValMessageFromPeer,
|
||||
event.ManifestReceived,
|
||||
event.FileDownloaded,
|
||||
event.TriggerAntispamCheck,
|
||||
}
|
||||
|
||||
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
|
||||
|
@ -783,7 +785,13 @@ func (cp *cwtchPeer) GetPeerState(handle string) connections.ConnectionState {
|
|||
// PeerWithOnion initiates a request to the Protocol Engine to set up Cwtch Session with a given tor v3 onion
|
||||
// address.
|
||||
func (cp *cwtchPeer) PeerWithOnion(onion string) {
|
||||
go func() {
|
||||
|
||||
// wait a random number of seconds before triggering
|
||||
// this cuts down on contention in the event
|
||||
randWait := time.Duration(rand2.Int() % 60)
|
||||
time.Sleep(randWait * time.Second)
|
||||
cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion}))
|
||||
}()
|
||||
}
|
||||
|
||||
// SendInviteToConversation kicks off the invite process
|
||||
|
@ -918,6 +926,14 @@ func (cp *cwtchPeer) JoinServer(onion string) error {
|
|||
return errors.New("no keys found for server connection")
|
||||
}
|
||||
|
||||
// MakeAntispamPayment allows a peer to retrigger antispam, important if the initial connection somehow fails...
|
||||
// TODO in the future we might want to expose this in CwtchPeer interface
|
||||
// Additionally we may want to add extra checks here to deduplicate groups from tokenservers to cut down
|
||||
// on the number of events (right now it should be minimal)
|
||||
func (cp *cwtchPeer) MakeAntispamPayment(server string) {
|
||||
cp.eventBus.Publish(event.NewEvent(event.MakeAntispamPayment, map[event.Field]string{event.GroupServer: server}))
|
||||
}
|
||||
|
||||
// ResyncServer completely tears down and resyncs a new server connection with the given handle
|
||||
func (cp *cwtchPeer) ResyncServer(handle string) error {
|
||||
ci, err := cp.FetchConversationInfo(handle)
|
||||
|
@ -1305,6 +1321,13 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), mostRecentTime.Format(time.RFC3339Nano))
|
||||
}
|
||||
}
|
||||
case event.TriggerAntispamCheck:
|
||||
conversations, _ := cp.FetchConversations()
|
||||
for _, conversation := range conversations {
|
||||
if conversation.IsServer() {
|
||||
cp.MakeAntispamPayment(conversation.Handle)
|
||||
}
|
||||
}
|
||||
default:
|
||||
if ev.EventType != "" {
|
||||
log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType)
|
||||
|
|
|
@ -116,6 +116,9 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
|
|||
engine.eventManager.Subscribe(event.ManifestSizeReceived, engine.queue)
|
||||
engine.eventManager.Subscribe(event.ManifestSaved, engine.queue)
|
||||
|
||||
// Token Server
|
||||
engine.eventManager.Subscribe(event.MakeAntispamPayment, engine.queue)
|
||||
|
||||
for peer, authorization := range peerAuthorizations {
|
||||
engine.authorizations.Store(peer, authorization)
|
||||
}
|
||||
|
@ -160,6 +163,8 @@ func (e *engine) eventHandler() {
|
|||
signature = []byte{}
|
||||
}
|
||||
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
|
||||
case event.MakeAntispamPayment:
|
||||
go e.makeAntispamPayment(ev.Data[event.GroupServer])
|
||||
case event.LeaveServer:
|
||||
e.leaveServer(ev.Data[event.GroupServer])
|
||||
case event.DeleteContact:
|
||||
|
@ -335,6 +340,31 @@ func (e *engine) peerWithOnion(onion string) {
|
|||
}
|
||||
}
|
||||
|
||||
func (e *engine) makeAntispamPayment(onion string) {
|
||||
log.Debugf("making antispam payment")
|
||||
e.ephemeralServicesLock.Lock()
|
||||
ephemeralService, ok := e.ephemeralServices[onion]
|
||||
e.ephemeralServicesLock.Unlock()
|
||||
|
||||
if ephemeralService == nil || !ok {
|
||||
log.Debugf("could not find associated group for antispam payment")
|
||||
return
|
||||
}
|
||||
|
||||
conn, err := ephemeralService.service.GetConnection(onion)
|
||||
if err == nil {
|
||||
tokenApp, ok := (conn.App()).(*TokenBoardClient)
|
||||
if ok {
|
||||
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, new(TokenManager))
|
||||
tokenManager := tokenManagerPointer.(*TokenManager)
|
||||
log.Debugf("checking antispam tokens %v", tokenManager.NumTokens())
|
||||
if tokenManager.NumTokens() < 5 {
|
||||
go tokenApp.MakePayment()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// peerWithTokenServer is the entry point for cwtchPeer - server relationships
|
||||
// needs to be run in a goroutine as will block on Open.
|
||||
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) {
|
||||
|
|
|
@ -86,6 +86,8 @@ func (pa *PeerApp) Init(connection tapir.Connection) {
|
|||
}
|
||||
} else {
|
||||
// The auth protocol wasn't completed, we can safely shutdown the connection
|
||||
// send an onclose here because we *may* have triggered this and we want to retry later...
|
||||
pa.OnClose(connection.Hostname())
|
||||
connection.Close()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,8 +2,8 @@ package connections
|
|||
|
||||
import (
|
||||
"cwtch.im/cwtch/protocol/groups"
|
||||
"cwtch.im/cwtch/utils"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"git.openprivacy.ca/cwtch.im/tapir"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/applications"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/networks/tor"
|
||||
|
@ -191,6 +191,8 @@ func (ta *TokenBoardClient) MakePayment() error {
|
|||
connected, err := client.Connect(ta.tokenServiceOnion, powTokenApp)
|
||||
if connected && err == nil {
|
||||
log.Debugf("Waiting for successful Token Acquisition...")
|
||||
tp := utils.TimeoutPolicy(time.Second * 30)
|
||||
err := tp.ExecuteAction(func() error {
|
||||
conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability)
|
||||
if err == nil {
|
||||
powtapp, ok := conn.App().(*applications.TokenApplication)
|
||||
|
@ -202,17 +204,17 @@ func (ta *TokenBoardClient) MakePayment() error {
|
|||
return nil
|
||||
}
|
||||
log.Errorf("invalid cast of powapp. this should never happen %v %v", powtapp, reflect.TypeOf(conn.App()))
|
||||
return errors.New("invalid cast of powapp. this should never happen")
|
||||
return nil
|
||||
}
|
||||
log.Debugf("could not connect to payment server..trying again: %v", err)
|
||||
return ta.MakePayment()
|
||||
} else if connected && err != nil {
|
||||
log.Debugf("inexplicable error: %v", err)
|
||||
return nil
|
||||
})
|
||||
|
||||
// we timed out
|
||||
if err != nil {
|
||||
ta.connection.Close()
|
||||
}
|
||||
log.Debugf("failed to make a connection. trying again...")
|
||||
// it doesn't actually take that long to make a payment, so waiting a small amount of time should suffice
|
||||
time.Sleep(time.Second)
|
||||
return ta.MakePayment()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// NextToken retrieves the next token
|
||||
|
|
Loading…
Reference in New Issue
this logic here will slow down later contact adds.
What about adding this inside the loop in StartPeersConnections and StartServerConnections as that's where our bulk connection problems are, not later one offs?
Also have you found if this helps at all? my experiment with it hasn't made any noticable change