Fix contact retry timeout logic

Dan Ballard 2023-07-07 00:05:45 -07:00
parent def585b23b
commit eb0636a229
2 changed files with 67 additions and 8 deletions


@@ -104,6 +104,10 @@ func (cq *connectionQueue) dequeue() *contact {
 	return c
 }
 
+func (cq *connectionQueue) len() int {
+	return len(cq.queue)
+}
+
 type contactRetry struct {
 	bus   event.Manager
 	queue event.Queue
@@ -117,14 +121,13 @@ type contactRetry struct {
 	acnProgress int
 
 	connections sync.Map //[string]*contact
-	connCount   int
 
 	pendingQueue  *connectionQueue
 	priorityQueue *connectionQueue
 }
 
 // NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing
 func NewConnectionRetry(bus event.Manager, onion string) Plugin {
-	cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, connCount: 0, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue(), priorityQueue: newConnectionQueue()}
+	cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue(), priorityQueue: newConnectionQueue()}
 	return cr
 }
@@ -182,7 +185,6 @@ func (cr *contactRetry) run() {
 	log.Debugf("restartFlow time to queue!!")
 	cr.requeueReady()
 	connectingCount := cr.connectingCount()
-	log.Debugf("checking queues (priority len: %v) (pending len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.priorityQueue.queue), len(cr.pendingQueue.queue), cr.connCount, connectingCount)
 
 	// do priority connections first...
 	for connectingCount < cr.maxTorCircuitsPending() && len(cr.priorityQueue.queue) > 0 {
@@ -301,7 +303,6 @@ func (cr *contactRetry) processStatus() {
 	cr.ACNUpTime = time.Now()
 
 	// reset all of the queues...
-	cr.connCount = 0
 	cr.priorityQueue = newConnectionQueue()
 	cr.pendingQueue = newConnectionQueue()
 
@@ -340,8 +341,14 @@ func (cr *contactRetry) requeueReady() {
 	var retryable []*contact
 
-	throughPutPerMin := cr.maxTorCircuitsPending() / (circuitTimeoutSecs / 60)
-	adjustedBaseTimeout := cr.connCount / throughPutPerMin * 60
+	throughPutPerMin := int((float64(cr.maxTorCircuitsPending()) / float64(circuitTimeoutSecs)) * 60.0)
+	queueCount := cr.priorityQueue.len() + cr.pendingQueue.len()
+
+	// adjustedBaseTimeout = baseTimeout * (queuedItemsCount / throughPutPerMin)
+	// when fewer items are queued than the throughput, this lowers adjustedBaseTimeout, but that is reset in the next block
+	// when more items are queued, it increases the timeout, up to a max of MaxBaseTimeoutSec (enforced in the next block)
+	adjustedBaseTimeout := circuitTimeoutSecs * (queueCount / throughPutPerMin)
+	// circuitTimeoutSecs (120s) < adjustedBaseTimeout < MaxBaseTimeoutSec (300s)
 	if adjustedBaseTimeout < circuitTimeoutSecs {
 		adjustedBaseTimeout = circuitTimeoutSecs
 	} else if adjustedBaseTimeout > MaxBaseTimeoutSec {
@@ -388,14 +395,12 @@ func (cr *contactRetry) addConnection(id string, state connections.ConnectionSta
 	if _, exists := cr.connections.Load(id); !exists {
 		p := &contact{id: id, state: state, failedCount: 0, lastAttempt: event.CwtchEpoch, ctype: ctype, lastSeen: lastSeen, queued: false}
 		cr.connections.Store(id, p)
-		cr.connCount += 1
 		return
 	} else {
 		// we have re-requested this connection. Force set the queued parameter to true.
 		p, _ := cr.connections.Load(id)
 		if !p.(*contact).queued {
 			p.(*contact).queued = true
-			cr.connCount += 1
 		}
 	}
 }
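To make the new backoff concrete: where the old code scaled the retry timeout from the connCount counter, requeueReady() now scales it from the number of currently queued connections (priorityQueue.len() + pendingQueue.len()) against the estimated circuit throughput per minute, then clamps the result between circuitTimeoutSecs (120s) and MaxBaseTimeoutSec (300s). The sketch below is a minimal standalone rendering of that arithmetic, not the plugin's code; the 30-circuit pending limit is a hypothetical stand-in for whatever maxTorCircuitsPending() returns.

package main

import "fmt"

const (
	circuitTimeoutSecs = 120 // base circuit timeout, per the comment in the diff
	MaxBaseTimeoutSec  = 300 // upper clamp, per the comment in the diff
)

// adjustedTimeout mirrors the calculation added to requeueReady().
func adjustedTimeout(maxTorCircuitsPending, queueCount int) int {
	// circuits we expect to complete (or time out) per minute
	throughPutPerMin := int((float64(maxTorCircuitsPending) / float64(circuitTimeoutSecs)) * 60.0)
	// scale the base timeout by how far the queues outrun that throughput (integer division, as in the diff)
	adjusted := circuitTimeoutSecs * (queueCount / throughPutPerMin)
	if adjusted < circuitTimeoutSecs {
		adjusted = circuitTimeoutSecs
	} else if adjusted > MaxBaseTimeoutSec {
		adjusted = MaxBaseTimeoutSec
	}
	return adjusted
}

func main() {
	// assuming a hypothetical limit of 30 pending circuits -> 15 circuits/min
	for _, queued := range []int{10, 40, 100} {
		fmt.Printf("queued=%d -> adjustedBaseTimeout=%ds\n", queued, adjustedTimeout(30, queued))
	}
	// queued=10  -> 120s (fewer queued than the per-minute throughput, clamped up to the base)
	// queued=40  -> 240s
	// queued=100 -> 300s (clamped to MaxBaseTimeoutSec)
}

The second diff in the commit, below, adds a (disabled) end-to-end test of this retry emission.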


@@ -14,6 +14,7 @@ import (
 // We are invasively checking the internal state of the retry plugin and accessing pointers from another
 // thread.
 // We could build an entire thread safe monitoring functionality, but that would dramatically expand the scope of this test.
 func TestContactRetryQueue(t *testing.T) {
 	log.SetLevel(log.LevelDebug)
 	bus := event.NewEventManager()
@@ -72,3 +73,56 @@ func TestContactRetryQueue(t *testing.T) {
 
 	cr.Shutdown()
 }
+
+// Takes around 4 min unless you adjust the consts for tickTimeSec and circuitTimeoutSecs
+/*
+func TestRetryEmission(t *testing.T) {
+	log.SetLevel(log.LevelDebug)
+	log.Infof("*** Starting TestRetryEmission! ***")
+
+	bus := event.NewEventManager()
+	testQueue := event.NewQueue()
+	bus.Subscribe(event.PeerRequest, testQueue)
+
+	cr := NewConnectionRetry(bus, "").(*contactRetry)
+	cr.Start()
+	time.Sleep(100 * time.Millisecond)
+
+	bus.Publish(event.NewEventList(event.ACNStatus, event.Progress, "100"))
+	bus.Publish(event.NewEventList(event.ProtocolEngineCreated))
+
+	pub, _, _ := ed25519.GenerateKey(rand.Reader)
+	peerAddr := tor.GetTorV3Hostname(pub)
+	bus.Publish(event.NewEventList(event.QueuePeerRequest, event.RemotePeer, peerAddr, event.LastSeen, time.Now().Format(time.RFC3339Nano)))
+
+	log.Infof("Fetching 1st event")
+	ev := testQueue.Next()
+	if ev.EventType != event.PeerRequest {
+		t.Errorf("1st event emitted was %v, expected %v", ev.EventType, event.PeerRequest)
+	}
+	log.Infof("1st event: %v", ev)
+
+	bus.Publish(event.NewEventList(event.PeerStateChange, event.RemotePeer, peerAddr, event.ConnectionState, connections.ConnectionStateName[connections.DISCONNECTED]))
+
+	log.Infof("fetching 2nd event")
+	ev = testQueue.Next()
+	log.Infof("2nd event: %v", ev)
+	if ev.EventType != event.PeerRequest {
+		t.Errorf("2nd event emitted was %v, expected %v", ev.EventType, event.PeerRequest)
+	}
+
+	bus.Publish(event.NewEventList(event.PeerStateChange, event.RemotePeer, peerAddr, event.ConnectionState, connections.ConnectionStateName[connections.CONNECTED]))
+	time.Sleep(100 * time.Millisecond)
+	bus.Publish(event.NewEventList(event.PeerStateChange, event.RemotePeer, peerAddr, event.ConnectionState, connections.ConnectionStateName[connections.DISCONNECTED]))
+
+	log.Infof("fetching 3rd event")
+	ev = testQueue.Next()
+	log.Infof("3rd event: %v", ev)
+	if ev.EventType != event.PeerRequest {
+		t.Errorf("3rd event emitted was %v, expected %v", ev.EventType, event.PeerRequest)
+	}
+
+	cr.Shutdown()
+}
+*/
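As the comment above the disabled test notes, TestRetryEmission takes around four minutes at the production timing constants. A local-only tweak along the lines it hints at could look like the sketch below; the constant names come from that comment, circuitTimeoutSecs defaults to 120s per the diff, and the other values are illustrative assumptions, not the package's real defaults.

// Local-only sketch: change the package's existing timing declarations to something like
// this before removing the /* */ markers around TestRetryEmission; revert before committing.
const (
	tickTimeSec        = 2  // assumed stand-in for the production tick interval
	circuitTimeoutSecs = 10 // production value is 120
)

With the test uncommented, it can then be run on its own with go test -run TestRetryEmission .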