2019-08-02 01:09:01 +00:00
package plugins
import (
2022-09-10 17:34:36 +00:00
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol/connections"
2023-08-28 20:17:55 +00:00
"git.openprivacy.ca/openprivacy/connectivity/tor"
2022-09-10 17:34:36 +00:00
"git.openprivacy.ca/openprivacy/log"
2022-09-26 01:28:04 +00:00
"math"
2023-04-27 21:16:24 +00:00
"strconv"
2022-09-10 17:34:36 +00:00
"sync"
"time"
2019-08-02 01:09:01 +00:00
)
2022-12-03 17:50:31 +00:00
// Todo: Move to protocol/connections
// This Plugin is now required and it makes more sense to run more integrated in engine
2022-12-03 00:23:11 +00:00
const tickTimeSec = 30
2022-09-26 01:28:04 +00:00
const tickTime = tickTimeSec * time . Second
2022-12-03 00:23:11 +00:00
2023-04-17 19:05:02 +00:00
const circuitTimeoutSecs int = 120
2022-12-03 00:23:11 +00:00
const MaxBaseTimeoutSec = 5 * 60 // a max base time out of 5 min
const maxFailedBackoff = 6 // 2^6 = 64 -> 64 * [2m to 5m] = 2h8m to 5h20m
2019-08-02 01:09:01 +00:00
2022-12-07 20:55:58 +00:00
const PriorityQueueTimeSinceQualifierHours float64 = 168
2019-09-26 23:43:34 +00:00
type connectionType int
const (
2022-09-10 17:34:36 +00:00
peerConn connectionType = iota
serverConn
2019-09-26 23:43:34 +00:00
)
type contact struct {
2022-09-10 17:34:36 +00:00
id string
state connections . ConnectionState
ctype connectionType
2019-08-02 01:09:01 +00:00
2022-12-03 00:23:11 +00:00
lastAttempt time . Time
failedCount int
lastSeen time . Time
queued bool
}
// compare a to b
// returns -1 if a < b
//
// 0 if a == b
// +1 if a > b
//
// algo: sort by failedCount first favouring less attempts, then sort by lastSeen time favouring more recent connections
func ( a * contact ) compare ( b * contact ) int {
if a . failedCount < b . failedCount {
return - 1
} else if a . failedCount > b . failedCount {
return + 1
}
if a . lastSeen . After ( b . lastSeen ) {
return - 1
} else if a . lastSeen . Before ( b . lastSeen ) {
return + 1
}
return 0
}
type connectionQueue struct {
queue [ ] * contact
}
func newConnectionQueue ( ) * connectionQueue {
return & connectionQueue { queue : [ ] * contact { } }
}
func ( cq * connectionQueue ) insert ( c * contact ) {
// find loc
i := 0
var b * contact
for i , b = range cq . queue {
if c . compare ( b ) >= 0 {
break
}
}
// insert
if len ( cq . queue ) == i { // nil or empty slice or after last element
cq . queue = append ( cq . queue , c )
} else {
cq . queue = append ( cq . queue [ : i + 1 ] , cq . queue [ i : ] ... ) // index < len(a)
cq . queue [ i ] = c
}
c . queued = true
}
func ( cq * connectionQueue ) dequeue ( ) * contact {
if len ( cq . queue ) == 0 {
return nil
}
c := cq . queue [ 0 ]
cq . queue = cq . queue [ 1 : ]
c . queued = false
return c
2019-08-02 01:09:01 +00:00
}
2023-07-07 07:05:45 +00:00
func ( cq * connectionQueue ) len ( ) int {
return len ( cq . queue )
}
2019-08-02 01:09:01 +00:00
type contactRetry struct {
2022-12-03 23:48:09 +00:00
bus event . Manager
queue event . Queue
ACNUp bool
ACNUpTime time . Time
protocolEngine bool
running bool
breakChan chan bool
onion string
lastCheck time . Time
2023-04-27 21:16:24 +00:00
acnProgress int
2022-09-10 17:34:36 +00:00
2023-08-28 20:23:25 +00:00
connections sync . Map //[string]*contact
pendingQueue * connectionQueue
priorityQueue * connectionQueue
2023-08-28 20:17:55 +00:00
authorizedPeers sync . Map
2023-09-19 19:21:53 +00:00
stallRetries bool
2019-08-02 01:09:01 +00:00
}
2022-12-03 00:23:11 +00:00
// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing
2021-03-19 21:20:22 +00:00
func NewConnectionRetry ( bus event . Manager , onion string ) Plugin {
2023-09-19 19:21:53 +00:00
cr := & contactRetry { bus : bus , queue : event . NewQueue ( ) , breakChan : make ( chan bool , 1 ) , authorizedPeers : sync . Map { } , connections : sync . Map { } , stallRetries : true , ACNUp : false , ACNUpTime : time . Now ( ) , protocolEngine : false , onion : onion , pendingQueue : newConnectionQueue ( ) , priorityQueue : newConnectionQueue ( ) }
2022-09-10 17:34:36 +00:00
return cr
2019-08-02 01:09:01 +00:00
}
2022-12-03 00:23:11 +00:00
// maxTorCircuitsPending a function to throttle access to tor network during start up
func ( cr * contactRetry ) maxTorCircuitsPending ( ) int {
2022-12-03 23:48:09 +00:00
timeSinceStart := time . Since ( cr . ACNUpTime )
2022-12-03 00:23:11 +00:00
if timeSinceStart < 30 * time . Second {
return 4
} else if timeSinceStart < 4 * time . Minute {
return 8
} else if timeSinceStart < 8 * time . Minute {
return 16
}
return connections . TorMaxPendingConns
}
func ( cr * contactRetry ) connectingCount ( ) int {
connecting := 0
cr . connections . Range ( func ( k , v interface { } ) bool {
conn := v . ( * contact )
if conn . state == connections . CONNECTING {
connecting ++
}
return true
} )
return connecting
}
2019-08-02 01:09:01 +00:00
func ( cr * contactRetry ) Start ( ) {
2022-09-10 17:34:36 +00:00
if ! cr . running {
go cr . run ( )
} else {
log . Errorf ( "Attempted to start Contact Retry plugin twice for %v" , cr . onion )
}
2019-08-02 01:09:01 +00:00
}
2022-12-03 00:23:11 +00:00
func ( cr * contactRetry ) Id ( ) PluginID {
return CONNECTIONRETRY
}
2019-08-02 01:09:01 +00:00
func ( cr * contactRetry ) run ( ) {
2022-09-10 17:34:36 +00:00
cr . running = true
cr . bus . Subscribe ( event . PeerStateChange , cr . queue )
cr . bus . Subscribe ( event . ACNStatus , cr . queue )
cr . bus . Subscribe ( event . ServerStateChange , cr . queue )
2022-12-03 00:23:11 +00:00
cr . bus . Subscribe ( event . QueuePeerRequest , cr . queue )
cr . bus . Subscribe ( event . QueueJoinServer , cr . queue )
2023-09-13 17:01:00 +00:00
cr . bus . Subscribe ( event . DisconnectPeerRequest , cr . queue )
cr . bus . Subscribe ( event . DisconnectServerRequest , cr . queue )
2022-12-03 23:48:09 +00:00
cr . bus . Subscribe ( event . ProtocolEngineShutdown , cr . queue )
cr . bus . Subscribe ( event . ProtocolEngineCreated , cr . queue )
2023-08-29 19:16:49 +00:00
cr . bus . Subscribe ( event . DeleteContact , cr . queue )
2023-08-29 19:20:08 +00:00
cr . bus . Subscribe ( event . UpdateConversationAuthorization , cr . queue )
2023-09-19 19:21:53 +00:00
cr . bus . Subscribe ( event . PurgeRetries , cr . queue )
cr . bus . Subscribe ( event . ResumeRetries , cr . queue )
2022-09-10 17:34:36 +00:00
for {
2023-05-16 22:41:08 +00:00
// Only attempt connection if both the ACN and the Protocol Engines are Online...
log . Debugf ( "restartFlow checking state" )
2023-09-19 19:21:53 +00:00
if cr . ACNUp && cr . protocolEngine && ! cr . stallRetries {
2023-05-16 22:41:08 +00:00
log . Debugf ( "restartFlow time to queue!!" )
2022-12-03 23:48:09 +00:00
cr . requeueReady ( )
connectingCount := cr . connectingCount ( )
2022-12-07 19:30:11 +00:00
// do priority connections first...
for connectingCount < cr . maxTorCircuitsPending ( ) && len ( cr . priorityQueue . queue ) > 0 {
2022-12-07 20:55:58 +00:00
contact := cr . priorityQueue . dequeue ( )
if contact == nil {
break
}
// could have received incoming connection while in queue, make sure still disconnected before trying
if contact . state == connections . DISCONNECTED {
cr . publishConnectionRequest ( contact )
connectingCount ++
2022-12-07 19:30:11 +00:00
}
}
2022-12-03 23:48:09 +00:00
for connectingCount < cr . maxTorCircuitsPending ( ) && len ( cr . pendingQueue . queue ) > 0 {
2022-12-07 20:55:58 +00:00
contact := cr . pendingQueue . dequeue ( )
if contact == nil {
break
}
// could have received incoming connection while in queue, make sure still disconnected before trying
if contact . state == connections . DISCONNECTED {
cr . publishConnectionRequest ( contact )
connectingCount ++
2022-12-03 23:48:09 +00:00
}
2022-12-03 00:23:11 +00:00
}
2022-12-03 23:48:09 +00:00
cr . lastCheck = time . Now ( )
2022-09-10 17:34:36 +00:00
}
2023-07-20 04:57:13 +00:00
// regardless of if we're up, run manual force deconnectiong of timed out connections
cr . connections . Range ( func ( k , v interface { } ) bool {
p := v . ( * contact )
if p . state == connections . CONNECTING && time . Since ( p . lastAttempt ) > time . Duration ( circuitTimeoutSecs ) * time . Second * 2 {
// we have been "connecting" for twice the circuttimeout so it's failed, we just didn't learn about it, manually disconnect
cr . handleEvent ( p . id , connections . DISCONNECTED , p . ctype )
log . Errorf ( "had to manually set peer %v of profile %v to DISCONNECTED due to assumed circuit timeout (%v) seconds" , p . id , cr . onion , circuitTimeoutSecs * 2 )
}
return true
} )
2022-12-03 00:23:11 +00:00
2022-09-10 17:34:36 +00:00
select {
case e := <- cr . queue . OutChan ( ) :
switch e . EventType {
2023-09-19 19:21:53 +00:00
case event . PurgeRetries :
// Purge All Authorized Peers
cr . authorizedPeers . Range ( func ( key interface { } , value interface { } ) bool {
cr . authorizedPeers . Delete ( key )
return true
} )
// Purge All Connection States
cr . connections . Range ( func ( key interface { } , value interface { } ) bool {
cr . connections . Delete ( key )
return true
} )
case event . ResumeRetries :
log . Infof ( "resuming retries..." )
cr . stallRetries = false
2023-09-13 17:01:00 +00:00
case event . DisconnectPeerRequest :
peer := e . Data [ event . RemotePeer ]
cr . authorizedPeers . Delete ( peer )
case event . DisconnectServerRequest :
peer := e . Data [ event . GroupServer ]
cr . authorizedPeers . Delete ( peer )
2023-08-29 19:16:49 +00:00
case event . DeleteContact :
// this case covers both servers and peers (servers are peers, and go through the
// same delete conversation flow)
peer := e . Data [ event . RemotePeer ]
cr . authorizedPeers . Delete ( peer )
2023-08-29 19:20:08 +00:00
case event . UpdateConversationAuthorization :
// if we update the conversation authorization then we need to check if
// we need to remove blocked conversations from the regular flow.
peer := e . Data [ event . RemotePeer ]
blocked := e . Data [ event . Blocked ]
if blocked == "true" {
cr . authorizedPeers . Delete ( peer )
}
2022-09-10 17:34:36 +00:00
case event . PeerStateChange :
state := connections . ConnectionStateToType ( ) [ e . Data [ event . ConnectionState ] ]
peer := e . Data [ event . RemotePeer ]
2023-08-28 20:17:55 +00:00
// only handle state change events from pre-authorized peers;
2023-08-28 20:23:25 +00:00
if _ , exists := cr . authorizedPeers . Load ( peer ) ; exists {
2023-08-28 20:17:55 +00:00
cr . handleEvent ( peer , state , peerConn )
}
2022-09-10 17:34:36 +00:00
case event . ServerStateChange :
state := connections . ConnectionStateToType ( ) [ e . Data [ event . ConnectionState ] ]
server := e . Data [ event . GroupServer ]
2023-08-28 20:17:55 +00:00
// only handle state change events from pre-authorized servers;
2023-08-28 20:23:25 +00:00
if _ , exists := cr . authorizedPeers . Load ( server ) ; exists {
2023-08-28 20:17:55 +00:00
cr . handleEvent ( server , state , serverConn )
}
2022-12-03 00:23:11 +00:00
case event . QueueJoinServer :
fallthrough
case event . QueuePeerRequest :
2022-12-06 05:07:09 +00:00
lastSeen , err := time . Parse ( time . RFC3339Nano , e . Data [ event . LastSeen ] )
2022-12-03 00:23:11 +00:00
if err != nil {
lastSeen = event . CwtchEpoch
}
id := ""
if peer , exists := e . Data [ event . RemotePeer ] ; exists {
id = peer
cr . addConnection ( peer , connections . DISCONNECTED , peerConn , lastSeen )
} else if server , exists := e . Data [ event . GroupServer ] ; exists {
id = server
cr . addConnection ( server , connections . DISCONNECTED , serverConn , lastSeen )
}
2023-08-28 20:17:55 +00:00
// this was an authorized event, and so we store this peer.
2023-08-28 20:35:54 +00:00
log . Debugf ( "authorizing id: %v" , id )
2023-08-28 20:17:55 +00:00
cr . authorizedPeers . Store ( id , true )
2022-12-03 00:23:11 +00:00
if c , ok := cr . connections . Load ( id ) ; ok {
contact := c . ( * contact )
2024-01-02 23:17:59 +00:00
if contact . state == connections . DISCONNECTED {
2022-12-07 19:30:11 +00:00
// prioritize connections made in the last week
2022-12-07 20:55:58 +00:00
if time . Since ( contact . lastSeen ) . Hours ( ) < PriorityQueueTimeSinceQualifierHours {
2022-12-07 19:30:11 +00:00
cr . priorityQueue . insert ( contact )
} else {
cr . pendingQueue . insert ( contact )
}
2022-12-03 00:23:11 +00:00
}
}
2022-12-03 23:48:09 +00:00
case event . ProtocolEngineShutdown :
cr . ACNUp = false
cr . protocolEngine = false
2023-09-19 19:21:53 +00:00
cr . stallRetries = true
2022-12-14 00:13:37 +00:00
cr . connections . Range ( func ( k , v interface { } ) bool {
p := v . ( * contact )
if p . state == connections . AUTHENTICATED || p . state == connections . SYNCED {
p . lastSeen = time . Now ( )
}
p . state = connections . DISCONNECTED
p . failedCount = 0
return true
} )
2023-04-27 21:16:24 +00:00
case event . ProtocolEngineCreated :
cr . protocolEngine = true
cr . processStatus ( )
2022-12-14 00:13:37 +00:00
2022-09-10 17:34:36 +00:00
case event . ACNStatus :
2023-04-27 21:16:24 +00:00
progData := e . Data [ event . Progress ]
if prog , err := strconv . Atoi ( progData ) ; err == nil {
cr . acnProgress = prog
cr . processStatus ( )
2022-09-10 17:34:36 +00:00
}
}
case <- time . After ( tickTime ) :
continue
case <- cr . breakChan :
cr . running = false
return
}
}
2019-08-02 01:09:01 +00:00
}
2023-04-27 21:16:24 +00:00
func ( cr * contactRetry ) processStatus ( ) {
if ! cr . protocolEngine {
cr . ACNUp = false
return
}
if cr . acnProgress == 100 && ! cr . ACNUp {
2023-05-16 22:41:08 +00:00
// ACN is up...at this point we need to completely reset our state
// as there is no guarantee that the tor daemon shares our state anymore...
2023-04-27 21:16:24 +00:00
cr . ACNUp = true
cr . ACNUpTime = time . Now ( )
2023-05-16 22:41:08 +00:00
// reset all of the queues...
cr . priorityQueue = newConnectionQueue ( )
cr . pendingQueue = newConnectionQueue ( )
// Loop through connections. Reset state, and requeue...
2023-04-27 21:16:24 +00:00
cr . connections . Range ( func ( k , v interface { } ) bool {
p := v . ( * contact )
2023-09-18 14:46:33 +00:00
// only reload connections if they are on the authorized peers list
if _ , exists := cr . authorizedPeers . Load ( p . id ) ; exists {
p . queued = true
// prioritize connections made recently...
log . Debugf ( "adding %v to queue" , p . id )
if time . Since ( p . lastSeen ) . Hours ( ) < PriorityQueueTimeSinceQualifierHours {
cr . priorityQueue . insert ( p )
} else {
cr . pendingQueue . insert ( p )
}
2023-05-16 22:41:08 +00:00
}
2023-04-27 21:16:24 +00:00
return true
} )
2023-05-16 22:41:08 +00:00
2023-04-27 21:16:24 +00:00
} else if cr . acnProgress != 100 {
cr . ACNUp = false
2023-05-16 22:41:08 +00:00
cr . connections . Range ( func ( k , v interface { } ) bool {
p := v . ( * contact )
p . failedCount = 0
p . queued = false
p . state = connections . DISCONNECTED
return true
} )
2023-04-27 21:16:24 +00:00
}
}
2022-12-03 00:23:11 +00:00
func ( cr * contactRetry ) requeueReady ( ) {
2022-12-03 23:48:09 +00:00
if ! cr . ACNUp {
2022-12-03 00:23:11 +00:00
return
}
2023-04-17 19:05:02 +00:00
var retryable [ ] * contact
2022-12-03 00:23:11 +00:00
2023-07-07 07:05:45 +00:00
throughPutPerMin := int ( ( float64 ( cr . maxTorCircuitsPending ( ) ) / float64 ( circuitTimeoutSecs ) ) * 60.0 )
queueCount := cr . priorityQueue . len ( ) + cr . pendingQueue . len ( )
// adjustedBaseTimeout = basetimeoust * (queuedItemsCount / throughPutPerMin)
// when less items are queued than through put it'll lower adjustedBaseTimeOut, but that'll be reset in the next block
// when more items are queued it will increase the timeout, to a max of MaxBaseTimeoutSec (enforced in the next block)
adjustedBaseTimeout := circuitTimeoutSecs * ( queueCount / throughPutPerMin )
// circuitTimeoutSecs (120s) < adjustedBaseTimeout < MaxBaseTimeoutSec (300s)
2023-04-17 19:05:02 +00:00
if adjustedBaseTimeout < circuitTimeoutSecs {
adjustedBaseTimeout = circuitTimeoutSecs
2022-12-03 00:23:11 +00:00
} else if adjustedBaseTimeout > MaxBaseTimeoutSec {
adjustedBaseTimeout = MaxBaseTimeoutSec
}
2022-09-10 17:34:36 +00:00
cr . connections . Range ( func ( k , v interface { } ) bool {
p := v . ( * contact )
2023-09-18 14:46:33 +00:00
// Don't retry anyone who isn't on the authorized peers list
if _ , exists := cr . authorizedPeers . Load ( p . id ) ; exists {
if p . state == connections . DISCONNECTED && ! p . queued {
timeout := time . Duration ( ( math . Pow ( 2 , float64 ( p . failedCount ) ) ) * float64 ( adjustedBaseTimeout /*baseTimeoutSec*/ ) ) * time . Second
if time . Since ( p . lastAttempt ) > timeout {
retryable = append ( retryable , p )
}
2022-09-10 17:34:36 +00:00
}
}
return true
} )
2022-12-03 00:23:11 +00:00
for _ , contact := range retryable {
2022-12-07 20:55:58 +00:00
if time . Since ( contact . lastSeen ) . Hours ( ) < PriorityQueueTimeSinceQualifierHours {
2022-12-07 19:30:11 +00:00
cr . priorityQueue . insert ( contact )
} else {
cr . pendingQueue . insert ( contact )
}
2022-12-03 00:23:11 +00:00
}
2021-05-07 20:54:12 +00:00
}
2022-12-03 00:23:11 +00:00
func ( cr * contactRetry ) publishConnectionRequest ( contact * contact ) {
2023-05-16 22:41:08 +00:00
log . Debugf ( "RestartFlow Publish Connection Request listener %v" , contact )
2022-12-03 00:23:11 +00:00
if contact . ctype == peerConn {
2023-05-02 19:26:46 +00:00
cr . bus . Publish ( event . NewEvent ( event . PeerRequest , map [ event . Field ] string { event . RemotePeer : contact . id } ) )
2022-12-03 00:23:11 +00:00
}
if contact . ctype == serverConn {
cr . bus . Publish ( event . NewEvent ( event . RetryServerRequest , map [ event . Field ] string { event . GroupServer : contact . id } ) )
}
contact . state = connections . CONNECTING // Hacky but needed so we don't over flood waiting for PeerStateChange from engine
contact . lastAttempt = time . Now ( )
}
2022-03-22 19:44:18 +00:00
2022-12-03 00:23:11 +00:00
func ( cr * contactRetry ) addConnection ( id string , state connections . ConnectionState , ctype connectionType , lastSeen time . Time ) {
2022-09-10 17:34:36 +00:00
// don't handle contact retries for ourselves
if id == cr . onion {
return
}
if _ , exists := cr . connections . Load ( id ) ; ! exists {
2022-12-03 00:23:11 +00:00
p := & contact { id : id , state : state , failedCount : 0 , lastAttempt : event . CwtchEpoch , ctype : ctype , lastSeen : lastSeen , queued : false }
2022-09-10 17:34:36 +00:00
cr . connections . Store ( id , p )
2022-12-03 00:23:11 +00:00
return
2023-05-09 17:36:52 +00:00
} else {
2024-01-02 23:17:59 +00:00
// we have rerequested this connnection, probably via an explicit ask, update it's state
if c , ok := cr . connections . Load ( id ) ; ok {
contact := c . ( * contact )
contact . state = state
2023-05-09 17:36:52 +00:00
}
2022-12-03 00:23:11 +00:00
}
}
func ( cr * contactRetry ) handleEvent ( id string , state connections . ConnectionState , ctype connectionType ) {
log . Debugf ( "cr.handleEvent state to %v on id %v" , connections . ConnectionStateName [ state ] , id )
// don't handle contact retries for ourselves
if id == cr . onion {
return
}
2023-08-28 20:17:55 +00:00
// reject events that contain invalid hostnames...we cannot connect to them
// and they could result in spurious connection attempts...
2023-08-28 20:23:25 +00:00
if ! tor . IsValidHostname ( id ) {
2023-08-28 20:17:55 +00:00
return
}
2022-12-03 00:23:11 +00:00
if _ , exists := cr . connections . Load ( id ) ; ! exists {
2023-05-02 19:26:46 +00:00
// We have an event for something we don't know about...
// The only reason this should happen is if a *new* Peer/Server connection has changed.
// Let's set the timeout to Now() to indicate that this is a fresh connection, and so should likely be prioritized.
cr . addConnection ( id , state , ctype , time . Now ( ) )
2022-09-10 17:34:36 +00:00
return
}
pinf , _ := cr . connections . Load ( id )
p := pinf . ( * contact )
2022-12-04 19:06:50 +00:00
log . Debugf ( " managing state change for %v %v to %v by self %v" , id , connections . ConnectionStateName [ p . state ] , connections . ConnectionStateName [ state ] , cr . onion )
2022-09-10 17:34:36 +00:00
if state == connections . DISCONNECTED || state == connections . FAILED || state == connections . KILLED {
2022-12-03 00:23:11 +00:00
if p . state == connections . SYNCED || p . state == connections . AUTHENTICATED {
p . lastSeen = time . Now ( )
} else {
p . failedCount += 1
}
2022-09-10 17:34:36 +00:00
p . state = connections . DISCONNECTED
2022-12-03 00:23:11 +00:00
p . lastAttempt = time . Now ( )
if p . failedCount > maxFailedBackoff {
p . failedCount = maxFailedBackoff
2022-09-10 17:34:36 +00:00
}
} else if state == connections . CONNECTING || state == connections . CONNECTED {
p . state = state
2022-12-03 00:23:11 +00:00
} else if state == connections . AUTHENTICATED || state == connections . SYNCED {
2022-09-10 17:34:36 +00:00
p . state = state
2022-12-03 00:23:11 +00:00
p . lastSeen = time . Now ( )
p . failedCount = 0
2022-09-26 01:28:04 +00:00
}
}
2019-08-02 01:09:01 +00:00
func ( cr * contactRetry ) Shutdown ( ) {
2022-09-10 17:34:36 +00:00
cr . breakChan <- true
2022-12-03 00:23:11 +00:00
cr . queue . Shutdown ( )
2019-08-02 01:09:01 +00:00
}