From 726fe28498b23cde22d86e5f724cf0e8ca196109 Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Sat, 3 Dec 2022 09:49:32 -0800 Subject: [PATCH] remove locking/atomic from contactRetry as its single threaded --- app/plugins/contactRetry.go | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index efd83fe..704a984 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -6,7 +6,6 @@ import ( "git.openprivacy.ca/openprivacy/log" "math" "sync" - "sync/atomic" "time" ) @@ -62,7 +61,6 @@ func (a *contact) compare(b *contact) int { type connectionQueue struct { queue []*contact - lock sync.Mutex } func newConnectionQueue() *connectionQueue { @@ -70,8 +68,6 @@ func newConnectionQueue() *connectionQueue { } func (cq *connectionQueue) insert(c *contact) { - cq.lock.Lock() - defer cq.lock.Unlock() // find loc i := 0 var b *contact @@ -93,8 +89,6 @@ func (cq *connectionQueue) insert(c *contact) { } func (cq *connectionQueue) dequeue() *contact { - cq.lock.Lock() - defer cq.lock.Unlock() if len(cq.queue) == 0 { return nil } @@ -115,7 +109,7 @@ type contactRetry struct { lastCheck time.Time connections sync.Map //[string]*contact - connCount int64 + connCount int pendingQueue *connectionQueue } @@ -174,8 +168,7 @@ func (cr *contactRetry) run() { for { cr.requeueReady() connectingCount := cr.connectingCount() - connCount := atomic.LoadInt64(&cr.connCount) - log.Debugf("checking queue (len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.pendingQueue.queue), connCount, connectingCount) + log.Debugf("checking queue (len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.pendingQueue.queue), cr.connCount, connectingCount) for connectingCount < cr.maxTorCircuitsPending() && len(cr.pendingQueue.queue) > 0 { contact := cr.pendingQueue.dequeue() // could have received incoming connection while in queue, make sure still disconnected before trying @@ -258,10 +251,9 @@ func (cr *contactRetry) requeueReady() { } retryable := []*contact{} - count := atomic.LoadInt64(&cr.connCount) throughPutPerMin := cr.maxTorCircuitsPending() / (circutTimeoutSecs / 60) - adjustedBaseTimeout := int(count) / throughPutPerMin * 60 + adjustedBaseTimeout := cr.connCount / throughPutPerMin * 60 if adjustedBaseTimeout < circutTimeoutSecs { adjustedBaseTimeout = circutTimeoutSecs } else if adjustedBaseTimeout > MaxBaseTimeoutSec { @@ -303,7 +295,7 @@ func (cr *contactRetry) addConnection(id string, state connections.ConnectionSta if _, exists := cr.connections.Load(id); !exists { p := &contact{id: id, state: state, failedCount: 0, lastAttempt: event.CwtchEpoch, ctype: ctype, lastSeen: lastSeen, queued: false} cr.connections.Store(id, p) - atomic.AddInt64(&cr.connCount, 1) + cr.connCount += 1 return } }