Add a Priority Queue for Most Common Contacts #481
|
@ -20,6 +20,8 @@ const circutTimeoutSecs int = 120
|
|||
const MaxBaseTimeoutSec = 5 * 60 // a max base time out of 5 min
|
||||
const maxFailedBackoff = 6 // 2^6 = 64 -> 64 * [2m to 5m] = 2h8m to 5h20m
|
||||
|
||||
const PriorityQueueTimeSinceQualifierHours float64 = 168
|
||||
|
||||
type connectionType int
|
||||
|
||||
const (
|
||||
|
@ -112,14 +114,15 @@ type contactRetry struct {
|
|||
onion string
|
||||
lastCheck time.Time
|
||||
|
||||
connections sync.Map //[string]*contact
|
||||
connCount int
|
||||
pendingQueue *connectionQueue
|
||||
connections sync.Map //[string]*contact
|
||||
connCount int
|
||||
pendingQueue *connectionQueue
|
||||
priorityQueue *connectionQueue
|
||||
}
|
||||
|
||||
// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing
|
||||
func NewConnectionRetry(bus event.Manager, onion string) Plugin {
|
||||
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, connCount: 0, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue()}
|
||||
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, connCount: 0, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue(), priorityQueue: newConnectionQueue()}
|
||||
return cr
|
||||
}
|
||||
|
||||
|
@ -175,9 +178,26 @@ func (cr *contactRetry) run() {
|
|||
if cr.ACNUp {
|
||||
cr.requeueReady()
|
||||
connectingCount := cr.connectingCount()
|
||||
log.Debugf("checking queue (len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.pendingQueue.queue), cr.connCount, connectingCount)
|
||||
log.Debugf("checking queues (priority len: %v) (pending len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.priorityQueue.queue), len(cr.pendingQueue.queue), cr.connCount, connectingCount)
|
||||
|
||||
// do priority connections first...
|
||||
dan marked this conversation as resolved
|
||||
for connectingCount < cr.maxTorCircuitsPending() && len(cr.priorityQueue.queue) > 0 {
|
||||
contact := cr.priorityQueue.dequeue()
|
||||
if contact == nil {
|
||||
dan
commented
this shouldn't be needed, the outer loop checks len, so it should be redundant this shouldn't be needed, the outer loop checks len, so it should be redundant
|
||||
break
|
||||
}
|
||||
// could have received incoming connection while in queue, make sure still disconnected before trying
|
||||
if contact.state == connections.DISCONNECTED {
|
||||
cr.publishConnectionRequest(contact)
|
||||
connectingCount++
|
||||
}
|
||||
}
|
||||
|
||||
for connectingCount < cr.maxTorCircuitsPending() && len(cr.pendingQueue.queue) > 0 {
|
||||
contact := cr.pendingQueue.dequeue()
|
||||
if contact == nil {
|
||||
break
|
||||
}
|
||||
// could have received incoming connection while in queue, make sure still disconnected before trying
|
||||
if contact.state == connections.DISCONNECTED {
|
||||
cr.publishConnectionRequest(contact)
|
||||
|
@ -220,7 +240,13 @@ func (cr *contactRetry) run() {
|
|||
if c, ok := cr.connections.Load(id); ok {
|
||||
contact := c.(*contact)
|
||||
if contact.state == connections.DISCONNECTED && !contact.queued {
|
||||
cr.pendingQueue.insert(contact)
|
||||
|
||||
// prioritize connections made in the last week
|
||||
if time.Since(contact.lastSeen).Hours() < PriorityQueueTimeSinceQualifierHours {
|
||||
cr.priorityQueue.insert(contact)
|
||||
} else {
|
||||
cr.pendingQueue.insert(contact)
|
||||
}
|
||||
dan marked this conversation as resolved
dan
commented
define 168 as something like PriorityQueueTimeSinceQualifierHours or... umm.. something less unwordly so we can tweak it in one place instead of two define 168 as something like PriorityQueueTimeSinceQualifierHours or... umm.. something less unwordly so we can tweak it in one place instead of two
|
||||
}
|
||||
}
|
||||
case event.ProtocolEngineCreated:
|
||||
|
@ -283,7 +309,11 @@ func (cr *contactRetry) requeueReady() {
|
|||
return true
|
||||
})
|
||||
for _, contact := range retryable {
|
||||
cr.pendingQueue.insert(contact)
|
||||
if time.Since(contact.lastSeen).Hours() < PriorityQueueTimeSinceQualifierHours {
|
||||
cr.priorityQueue.insert(contact)
|
||||
} else {
|
||||
cr.pendingQueue.insert(contact)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -825,7 +825,9 @@ func (cp *cwtchPeer) QueuePeeringWithOnion(handle string) {
|
|||
if err == nil {
|
||||
lastSeen = cp.GetConversationLastSeenTime(ci.ID)
|
||||
}
|
||||
cp.eventBus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: handle, event.LastSeen: lastSeen.Format(time.RFC3339Nano)}))
|
||||
if !ci.ACL[ci.Handle].Blocked && ci.Accepted {
|
||||
dan
commented
eek good catch thanks eek good catch thanks
|
||||
cp.eventBus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: handle, event.LastSeen: lastSeen.Format(time.RFC3339Nano)}))
|
||||
}
|
||||
}
|
||||
|
||||
// QueueJoinServer sends the request to join a server directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests
|
||||
|
|
|
@ -3,9 +3,6 @@
|
|||
echo "Checking code quality (you want to see no output here)"
|
||||
echo ""
|
||||
|
||||
echo "Vetting:"
|
||||
go vet ./...
|
||||
|
||||
echo ""
|
||||
echo "Linting:"
|
||||
|
||||
|
|
Loading…
Reference in New Issue
why the nested inner for loop?