diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index 3589c1a..23f0609 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -20,6 +20,8 @@ const circutTimeoutSecs int = 120 const MaxBaseTimeoutSec = 5 * 60 // a max base time out of 5 min const maxFailedBackoff = 6 // 2^6 = 64 -> 64 * [2m to 5m] = 2h8m to 5h20m +const PriorityQueueTimeSinceQualifierHours float64 = 168 + type connectionType int const ( @@ -112,14 +114,15 @@ type contactRetry struct { onion string lastCheck time.Time - connections sync.Map //[string]*contact - connCount int - pendingQueue *connectionQueue + connections sync.Map //[string]*contact + connCount int + pendingQueue *connectionQueue + priorityQueue *connectionQueue } // NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing func NewConnectionRetry(bus event.Manager, onion string) Plugin { - cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, connCount: 0, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue()} + cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, connCount: 0, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue(), priorityQueue: newConnectionQueue()} return cr } @@ -175,9 +178,26 @@ func (cr *contactRetry) run() { if cr.ACNUp { cr.requeueReady() connectingCount := cr.connectingCount() - log.Debugf("checking queue (len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.pendingQueue.queue), cr.connCount, connectingCount) + log.Debugf("checking queues (priority len: %v) (pending len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.priorityQueue.queue), len(cr.pendingQueue.queue), cr.connCount, connectingCount) + + // do priority connections first... + for connectingCount < cr.maxTorCircuitsPending() && len(cr.priorityQueue.queue) > 0 { + contact := cr.priorityQueue.dequeue() + if contact == nil { + break + } + // could have received incoming connection while in queue, make sure still disconnected before trying + if contact.state == connections.DISCONNECTED { + cr.publishConnectionRequest(contact) + connectingCount++ + } + } + for connectingCount < cr.maxTorCircuitsPending() && len(cr.pendingQueue.queue) > 0 { contact := cr.pendingQueue.dequeue() + if contact == nil { + break + } // could have received incoming connection while in queue, make sure still disconnected before trying if contact.state == connections.DISCONNECTED { cr.publishConnectionRequest(contact) @@ -220,7 +240,13 @@ func (cr *contactRetry) run() { if c, ok := cr.connections.Load(id); ok { contact := c.(*contact) if contact.state == connections.DISCONNECTED && !contact.queued { - cr.pendingQueue.insert(contact) + + // prioritize connections made in the last week + if time.Since(contact.lastSeen).Hours() < PriorityQueueTimeSinceQualifierHours { + cr.priorityQueue.insert(contact) + } else { + cr.pendingQueue.insert(contact) + } } } case event.ProtocolEngineCreated: @@ -283,7 +309,11 @@ func (cr *contactRetry) requeueReady() { return true }) for _, contact := range retryable { - cr.pendingQueue.insert(contact) + if time.Since(contact.lastSeen).Hours() < PriorityQueueTimeSinceQualifierHours { + cr.priorityQueue.insert(contact) + } else { + cr.pendingQueue.insert(contact) + } } } diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 9b29ba7..6f911ee 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -825,7 +825,9 @@ func (cp *cwtchPeer) QueuePeeringWithOnion(handle string) { if err == nil { lastSeen = cp.GetConversationLastSeenTime(ci.ID) } - cp.eventBus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: handle, event.LastSeen: lastSeen.Format(time.RFC3339Nano)})) + if !ci.ACL[ci.Handle].Blocked && ci.Accepted { + cp.eventBus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: handle, event.LastSeen: lastSeen.Format(time.RFC3339Nano)})) + } } // QueueJoinServer sends the request to join a server directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests diff --git a/testing/quality.sh b/testing/quality.sh index 5f6427f..c0bb82b 100755 --- a/testing/quality.sh +++ b/testing/quality.sh @@ -3,9 +3,6 @@ echo "Checking code quality (you want to see no output here)" echo "" -echo "Vetting:" -go vet ./... - echo "" echo "Linting:"