diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index e8f67dd..d7fa5b0 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -104,6 +104,10 @@ func (cq *connectionQueue) dequeue() *contact { return c } +func (cq *connectionQueue) len() int { + return len(cq.queue) +} + type contactRetry struct { bus event.Manager queue event.Queue @@ -117,14 +121,13 @@ type contactRetry struct { acnProgress int connections sync.Map //[string]*contact - connCount int pendingQueue *connectionQueue priorityQueue *connectionQueue } // NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing func NewConnectionRetry(bus event.Manager, onion string) Plugin { - cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, connCount: 0, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue(), priorityQueue: newConnectionQueue()} + cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue(), priorityQueue: newConnectionQueue()} return cr } @@ -182,7 +185,6 @@ func (cr *contactRetry) run() { log.Debugf("restartFlow time to queue!!") cr.requeueReady() connectingCount := cr.connectingCount() - log.Debugf("checking queues (priority len: %v) (pending len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.priorityQueue.queue), len(cr.pendingQueue.queue), cr.connCount, connectingCount) // do priority connections first... for connectingCount < cr.maxTorCircuitsPending() && len(cr.priorityQueue.queue) > 0 { @@ -301,7 +303,6 @@ func (cr *contactRetry) processStatus() { cr.ACNUpTime = time.Now() // reset all of the queues... - cr.connCount = 0 cr.priorityQueue = newConnectionQueue() cr.pendingQueue = newConnectionQueue() @@ -340,8 +341,14 @@ func (cr *contactRetry) requeueReady() { var retryable []*contact - throughPutPerMin := cr.maxTorCircuitsPending() / (circuitTimeoutSecs / 60) - adjustedBaseTimeout := cr.connCount / throughPutPerMin * 60 + throughPutPerMin := int((float64(cr.maxTorCircuitsPending()) / float64(circuitTimeoutSecs)) * 60.0) + queueCount := cr.priorityQueue.len() + cr.pendingQueue.len() + // adjustedBaseTimeout = basetimeoust * (queuedItemsCount / throughPutPerMin) + // when less items are queued than through put it'll lower adjustedBaseTimeOut, but that'll be reset in the next block + // when more items are queued it will increase the timeout, to a max of MaxBaseTimeoutSec (enforced in the next block) + adjustedBaseTimeout := circuitTimeoutSecs * (queueCount / throughPutPerMin) + + // circuitTimeoutSecs (120s) < adjustedBaseTimeout < MaxBaseTimeoutSec (300s) if adjustedBaseTimeout < circuitTimeoutSecs { adjustedBaseTimeout = circuitTimeoutSecs } else if adjustedBaseTimeout > MaxBaseTimeoutSec { @@ -388,14 +395,12 @@ func (cr *contactRetry) addConnection(id string, state connections.ConnectionSta if _, exists := cr.connections.Load(id); !exists { p := &contact{id: id, state: state, failedCount: 0, lastAttempt: event.CwtchEpoch, ctype: ctype, lastSeen: lastSeen, queued: false} cr.connections.Store(id, p) - cr.connCount += 1 return } else { // we have rerequested this connnection. Force set the queued parameter to true. p, _ := cr.connections.Load(id) if !p.(*contact).queued { p.(*contact).queued = true - cr.connCount += 1 } } } diff --git a/app/plugins/contactRetry_test.go b/app/plugins/contactRetry_test.go index d1c202e..adf2604 100644 --- a/app/plugins/contactRetry_test.go +++ b/app/plugins/contactRetry_test.go @@ -14,6 +14,7 @@ import ( // We are invasively checking the internal state of the retry plugin and accessing pointers from another // thread. // We could build an entire thread safe monitoring functonality, but that would dramatically expand the scope of this test. + func TestContactRetryQueue(t *testing.T) { log.SetLevel(log.LevelDebug) bus := event.NewEventManager() @@ -72,3 +73,56 @@ func TestContactRetryQueue(t *testing.T) { cr.Shutdown() } + +// Takes around 4 min unless you adjust the consts for tickTimeSec and circuitTimeoutSecs +/* +func TestRetryEmission(t *testing.T) { + log.SetLevel(log.LevelDebug) + log.Infof("*** Starting TestRetryEmission! ***") + bus := event.NewEventManager() + + testQueue := event.NewQueue() + bus.Subscribe(event.PeerRequest, testQueue) + + cr := NewConnectionRetry(bus, "").(*contactRetry) + cr.Start() + time.Sleep(100 * time.Millisecond) + + bus.Publish(event.NewEventList(event.ACNStatus, event.Progress, "100")) + bus.Publish(event.NewEventList(event.ProtocolEngineCreated)) + + pub, _, _ := ed25519.GenerateKey(rand.Reader) + peerAddr := tor.GetTorV3Hostname(pub) + + bus.Publish(event.NewEventList(event.QueuePeerRequest, event.RemotePeer, peerAddr, event.LastSeen, time.Now().Format(time.RFC3339Nano))) + + log.Infof("Fetching 1st event") + ev := testQueue.Next() + if ev.EventType != event.PeerRequest { + t.Errorf("1st event emitted was %v, expected %v", ev.EventType, event.PeerRequest) + } + log.Infof("1st event: %v", ev) + + bus.Publish(event.NewEventList(event.PeerStateChange, event.RemotePeer, peerAddr, event.ConnectionState, connections.ConnectionStateName[connections.DISCONNECTED])) + + log.Infof("fetching 2nd event") + ev = testQueue.Next() + log.Infof("2nd event: %v", ev) + if ev.EventType != event.PeerRequest { + t.Errorf("2nd event emitted was %v, expected %v", ev.EventType, event.PeerRequest) + } + + bus.Publish(event.NewEventList(event.PeerStateChange, event.RemotePeer, peerAddr, event.ConnectionState, connections.ConnectionStateName[connections.CONNECTED])) + time.Sleep(100 * time.Millisecond) + bus.Publish(event.NewEventList(event.PeerStateChange, event.RemotePeer, peerAddr, event.ConnectionState, connections.ConnectionStateName[connections.DISCONNECTED])) + + log.Infof("fetching 3rd event") + ev = testQueue.Next() + log.Infof("3nd event: %v", ev) + if ev.EventType != event.PeerRequest { + t.Errorf("3nd event emitted was %v, expected %v", ev.EventType, event.PeerRequest) + } + + cr.Shutdown() +} +*/