remove locking/atomic from contactRetry as its single threaded
This commit is contained in:
parent
ad72ce6e7a
commit
726fe28498
|
@ -6,7 +6,6 @@ import (
|
||||||
"git.openprivacy.ca/openprivacy/log"
|
"git.openprivacy.ca/openprivacy/log"
|
||||||
"math"
|
"math"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -62,7 +61,6 @@ func (a *contact) compare(b *contact) int {
|
||||||
|
|
||||||
type connectionQueue struct {
|
type connectionQueue struct {
|
||||||
queue []*contact
|
queue []*contact
|
||||||
lock sync.Mutex
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newConnectionQueue() *connectionQueue {
|
func newConnectionQueue() *connectionQueue {
|
||||||
|
@ -70,8 +68,6 @@ func newConnectionQueue() *connectionQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cq *connectionQueue) insert(c *contact) {
|
func (cq *connectionQueue) insert(c *contact) {
|
||||||
cq.lock.Lock()
|
|
||||||
defer cq.lock.Unlock()
|
|
||||||
// find loc
|
// find loc
|
||||||
i := 0
|
i := 0
|
||||||
var b *contact
|
var b *contact
|
||||||
|
@ -93,8 +89,6 @@ func (cq *connectionQueue) insert(c *contact) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cq *connectionQueue) dequeue() *contact {
|
func (cq *connectionQueue) dequeue() *contact {
|
||||||
cq.lock.Lock()
|
|
||||||
defer cq.lock.Unlock()
|
|
||||||
if len(cq.queue) == 0 {
|
if len(cq.queue) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -115,7 +109,7 @@ type contactRetry struct {
|
||||||
lastCheck time.Time
|
lastCheck time.Time
|
||||||
|
|
||||||
connections sync.Map //[string]*contact
|
connections sync.Map //[string]*contact
|
||||||
connCount int64
|
connCount int
|
||||||
pendingQueue *connectionQueue
|
pendingQueue *connectionQueue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -174,8 +168,7 @@ func (cr *contactRetry) run() {
|
||||||
for {
|
for {
|
||||||
cr.requeueReady()
|
cr.requeueReady()
|
||||||
connectingCount := cr.connectingCount()
|
connectingCount := cr.connectingCount()
|
||||||
connCount := atomic.LoadInt64(&cr.connCount)
|
log.Debugf("checking queue (len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.pendingQueue.queue), cr.connCount, connectingCount)
|
||||||
log.Debugf("checking queue (len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.pendingQueue.queue), connCount, connectingCount)
|
|
||||||
for connectingCount < cr.maxTorCircuitsPending() && len(cr.pendingQueue.queue) > 0 {
|
for connectingCount < cr.maxTorCircuitsPending() && len(cr.pendingQueue.queue) > 0 {
|
||||||
contact := cr.pendingQueue.dequeue()
|
contact := cr.pendingQueue.dequeue()
|
||||||
// could have received incoming connection while in queue, make sure still disconnected before trying
|
// could have received incoming connection while in queue, make sure still disconnected before trying
|
||||||
|
@ -258,10 +251,9 @@ func (cr *contactRetry) requeueReady() {
|
||||||
}
|
}
|
||||||
|
|
||||||
retryable := []*contact{}
|
retryable := []*contact{}
|
||||||
count := atomic.LoadInt64(&cr.connCount)
|
|
||||||
|
|
||||||
throughPutPerMin := cr.maxTorCircuitsPending() / (circutTimeoutSecs / 60)
|
throughPutPerMin := cr.maxTorCircuitsPending() / (circutTimeoutSecs / 60)
|
||||||
adjustedBaseTimeout := int(count) / throughPutPerMin * 60
|
adjustedBaseTimeout := cr.connCount / throughPutPerMin * 60
|
||||||
if adjustedBaseTimeout < circutTimeoutSecs {
|
if adjustedBaseTimeout < circutTimeoutSecs {
|
||||||
adjustedBaseTimeout = circutTimeoutSecs
|
adjustedBaseTimeout = circutTimeoutSecs
|
||||||
} else if adjustedBaseTimeout > MaxBaseTimeoutSec {
|
} else if adjustedBaseTimeout > MaxBaseTimeoutSec {
|
||||||
|
@ -303,7 +295,7 @@ func (cr *contactRetry) addConnection(id string, state connections.ConnectionSta
|
||||||
if _, exists := cr.connections.Load(id); !exists {
|
if _, exists := cr.connections.Load(id); !exists {
|
||||||
p := &contact{id: id, state: state, failedCount: 0, lastAttempt: event.CwtchEpoch, ctype: ctype, lastSeen: lastSeen, queued: false}
|
p := &contact{id: id, state: state, failedCount: 0, lastAttempt: event.CwtchEpoch, ctype: ctype, lastSeen: lastSeen, queued: false}
|
||||||
cr.connections.Store(id, p)
|
cr.connections.Store(id, p)
|
||||||
atomic.AddInt64(&cr.connCount, 1)
|
cr.connCount += 1
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue