Priority Queue Most Common Contact Requests
continuous-integration/drone/pr Build is passing Details

This commit is contained in:
Sarah Jamie Lewis 2022-12-07 11:30:11 -08:00
parent bfe8b1e51f
commit 06a2539502
3 changed files with 38 additions and 16 deletions

View File

@ -112,14 +112,15 @@ type contactRetry struct {
onion string onion string
lastCheck time.Time lastCheck time.Time
connections sync.Map //[string]*contact connections sync.Map //[string]*contact
connCount int connCount int
pendingQueue *connectionQueue pendingQueue *connectionQueue
priorityQueue *connectionQueue
} }
// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing // NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing
func NewConnectionRetry(bus event.Manager, onion string) Plugin { func NewConnectionRetry(bus event.Manager, onion string) Plugin {
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, connCount: 0, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue()} cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, connCount: 0, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue(), priorityQueue: newConnectionQueue()}
return cr return cr
} }
@ -175,7 +176,24 @@ func (cr *contactRetry) run() {
if cr.ACNUp { if cr.ACNUp {
cr.requeueReady() cr.requeueReady()
connectingCount := cr.connectingCount() connectingCount := cr.connectingCount()
log.Debugf("checking queue (len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.pendingQueue.queue), cr.connCount, connectingCount) log.Debugf("checking queues (priority len: %v) (pending len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.priorityQueue.queue), len(cr.pendingQueue.queue), cr.connCount, connectingCount)
// do priority connections first...
for connectingCount < cr.maxTorCircuitsPending() && len(cr.priorityQueue.queue) > 0 {
for {
contact := cr.priorityQueue.dequeue()
if contact == nil {
break
}
// could have received incoming connection while in queue, make sure still disconnected before trying
if contact.state == connections.DISCONNECTED {
cr.publishConnectionRequest(contact)
connectingCount++
break
}
}
}
for connectingCount < cr.maxTorCircuitsPending() && len(cr.pendingQueue.queue) > 0 { for connectingCount < cr.maxTorCircuitsPending() && len(cr.pendingQueue.queue) > 0 {
for { for {
contact := cr.pendingQueue.dequeue() contact := cr.pendingQueue.dequeue()
@ -226,7 +244,13 @@ func (cr *contactRetry) run() {
if c, ok := cr.connections.Load(id); ok { if c, ok := cr.connections.Load(id); ok {
contact := c.(*contact) contact := c.(*contact)
if contact.state == connections.DISCONNECTED && !contact.queued { if contact.state == connections.DISCONNECTED && !contact.queued {
cr.pendingQueue.insert(contact)
// prioritize connections made in the last week
if time.Since(contact.lastSeen).Hours() < 168 {
cr.priorityQueue.insert(contact)
} else {
cr.pendingQueue.insert(contact)
}
} }
} }
case event.ProtocolEngineCreated: case event.ProtocolEngineCreated:
@ -289,7 +313,11 @@ func (cr *contactRetry) requeueReady() {
return true return true
}) })
for _, contact := range retryable { for _, contact := range retryable {
cr.pendingQueue.insert(contact) if time.Since(contact.lastSeen).Hours() < 168 {
cr.priorityQueue.insert(contact)
} else {
cr.pendingQueue.insert(contact)
}
} }
} }
@ -310,11 +338,6 @@ func (cr *contactRetry) addConnection(id string, state connections.ConnectionSta
return return
} }
// if it's been more than a week then don't add
if time.Now().Sub(lastSeen).Hours() > 168 {
return
}
if _, exists := cr.connections.Load(id); !exists { if _, exists := cr.connections.Load(id); !exists {
p := &contact{id: id, state: state, failedCount: 0, lastAttempt: event.CwtchEpoch, ctype: ctype, lastSeen: lastSeen, queued: false} p := &contact{id: id, state: state, failedCount: 0, lastAttempt: event.CwtchEpoch, ctype: ctype, lastSeen: lastSeen, queued: false}
cr.connections.Store(id, p) cr.connections.Store(id, p)

View File

@ -825,7 +825,9 @@ func (cp *cwtchPeer) QueuePeeringWithOnion(handle string) {
if err == nil { if err == nil {
lastSeen = cp.GetConversationLastSeenTime(ci.ID) lastSeen = cp.GetConversationLastSeenTime(ci.ID)
} }
cp.eventBus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: handle, event.LastSeen: lastSeen.Format(time.RFC3339Nano)})) if !ci.ACL[ci.Handle].Blocked && ci.Accepted {
cp.eventBus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: handle, event.LastSeen: lastSeen.Format(time.RFC3339Nano)}))
}
} }
// QueueJoinServer sends the request to join a server directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests // QueueJoinServer sends the request to join a server directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests

View File

@ -3,9 +3,6 @@
echo "Checking code quality (you want to see no output here)" echo "Checking code quality (you want to see no output here)"
echo "" echo ""
echo "Vetting:"
go vet ./...
echo "" echo ""
echo "Linting:" echo "Linting:"