Add a Priority Queue for Most Common Contacts #481

Merged
dan merged 4 commits from priority into master 2022-12-07 21:37:30 +00:00
3 changed files with 40 additions and 11 deletions

View File

@ -20,6 +20,8 @@ const circutTimeoutSecs int = 120
const MaxBaseTimeoutSec = 5 * 60 // a max base time out of 5 min
const maxFailedBackoff = 6 // 2^6 = 64 -> 64 * [2m to 5m] = 2h8m to 5h20m
const PriorityQueueTimeSinceQualifierHours float64 = 168
type connectionType int
const (
@ -112,14 +114,15 @@ type contactRetry struct {
onion string
lastCheck time.Time
connections sync.Map //[string]*contact
connCount int
pendingQueue *connectionQueue
connections sync.Map //[string]*contact
connCount int
pendingQueue *connectionQueue
priorityQueue *connectionQueue
}
// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing
func NewConnectionRetry(bus event.Manager, onion string) Plugin {
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, connCount: 0, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue()}
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, connCount: 0, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue(), priorityQueue: newConnectionQueue()}
return cr
}
@ -175,9 +178,26 @@ func (cr *contactRetry) run() {
if cr.ACNUp {
cr.requeueReady()
connectingCount := cr.connectingCount()
log.Debugf("checking queue (len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.pendingQueue.queue), cr.connCount, connectingCount)
log.Debugf("checking queues (priority len: %v) (pending len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.priorityQueue.queue), len(cr.pendingQueue.queue), cr.connCount, connectingCount)
// do priority connections first...
dan marked this conversation as resolved
Review

why the nested inner for loop?

why the nested inner for loop?
for connectingCount < cr.maxTorCircuitsPending() && len(cr.priorityQueue.queue) > 0 {
contact := cr.priorityQueue.dequeue()
if contact == nil {
Review

this shouldn't be needed, the outer loop checks len, so it should be redundant

this shouldn't be needed, the outer loop checks len, so it should be redundant
break
}
// could have received incoming connection while in queue, make sure still disconnected before trying
if contact.state == connections.DISCONNECTED {
cr.publishConnectionRequest(contact)
connectingCount++
}
}
for connectingCount < cr.maxTorCircuitsPending() && len(cr.pendingQueue.queue) > 0 {
contact := cr.pendingQueue.dequeue()
if contact == nil {
break
}
// could have received incoming connection while in queue, make sure still disconnected before trying
if contact.state == connections.DISCONNECTED {
cr.publishConnectionRequest(contact)
@ -220,7 +240,13 @@ func (cr *contactRetry) run() {
if c, ok := cr.connections.Load(id); ok {
contact := c.(*contact)
if contact.state == connections.DISCONNECTED && !contact.queued {
cr.pendingQueue.insert(contact)
// prioritize connections made in the last week
if time.Since(contact.lastSeen).Hours() < PriorityQueueTimeSinceQualifierHours {
cr.priorityQueue.insert(contact)
} else {
cr.pendingQueue.insert(contact)
}
dan marked this conversation as resolved
Review

define 168 as something like PriorityQueueTimeSinceQualifierHours or... umm.. something less unwordly so we can tweak it in one place instead of two

define 168 as something like PriorityQueueTimeSinceQualifierHours or... umm.. something less unwordly so we can tweak it in one place instead of two
}
}
case event.ProtocolEngineCreated:
@ -283,7 +309,11 @@ func (cr *contactRetry) requeueReady() {
return true
})
for _, contact := range retryable {
cr.pendingQueue.insert(contact)
if time.Since(contact.lastSeen).Hours() < PriorityQueueTimeSinceQualifierHours {
cr.priorityQueue.insert(contact)
} else {
cr.pendingQueue.insert(contact)
}
}
}

View File

@ -825,7 +825,9 @@ func (cp *cwtchPeer) QueuePeeringWithOnion(handle string) {
if err == nil {
lastSeen = cp.GetConversationLastSeenTime(ci.ID)
}
cp.eventBus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: handle, event.LastSeen: lastSeen.Format(time.RFC3339Nano)}))
if !ci.ACL[ci.Handle].Blocked && ci.Accepted {
Review

eek good catch thanks

eek good catch thanks
cp.eventBus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: handle, event.LastSeen: lastSeen.Format(time.RFC3339Nano)}))
}
}
// QueueJoinServer sends the request to join a server directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests

View File

@ -3,9 +3,6 @@
echo "Checking code quality (you want to see no output here)"
echo ""
echo "Vetting:"
go vet ./...
echo ""
echo "Linting:"