2019-08-02 01:09:01 +00:00
package plugins
import (
2022-09-10 17:34:36 +00:00
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/openprivacy/log"
2022-09-26 01:28:04 +00:00
"math"
2022-09-10 17:34:36 +00:00
"sync"
"time"
2019-08-02 01:09:01 +00:00
)
2022-12-03 17:50:31 +00:00
// TODO: Move to protocol/connections.
// This plugin is now required, so it makes more sense to run it more tightly integrated with the engine.
2022-12-03 00:23:11 +00:00
// Timing and back-off parameters for the connection retry plugin.
const (
	// tickTimeSec is the idle tick interval of the retry loop, in seconds.
	tickTimeSec = 30

	// tickTime is tickTimeSec expressed as a time.Duration; the run loop
	// wakes up at least this often even when no events arrive.
	tickTime = tickTimeSec * time.Second

	// circutTimeoutSecs is used by requeueReady both as the floor of the
	// base retry timeout and to derive the per-minute connection throughput.
	circutTimeoutSecs int = 120

	// MaxBaseTimeoutSec is the ceiling of the base retry timeout: 5 minutes.
	MaxBaseTimeoutSec = 5 * 60

	// maxFailedBackoff caps a contact's failedCount, and therefore the
	// exponent of the back-off multiplier:
	// 2^6 = 64 -> 64 * [2m to 5m] = 2h8m to 5h20m
	maxFailedBackoff = 6
)
2019-08-02 01:09:01 +00:00
2019-09-26 23:43:34 +00:00
// connectionType tags a tracked contact as either a peer or a server, which
// decides which retry event is published for it.
type connectionType int

const (
	// peerConn marks a contact reached through peer events.
	peerConn connectionType = iota
	// serverConn marks a contact reached through group-server events.
	serverConn
)
type contact struct {
2022-09-10 17:34:36 +00:00
id string
state connections . ConnectionState
ctype connectionType
2019-08-02 01:09:01 +00:00
2022-12-03 00:23:11 +00:00
lastAttempt time . Time
failedCount int
lastSeen time . Time
queued bool
}
// compare orders contact a relative to contact b for the pending queue.
//
// It returns
//
//	-1 if a < b
//	 0 if a == b
//	+1 if a > b
//
// Ordering is decided by failedCount first (fewer failures sort lower) and,
// when those are equal, by lastSeen (more recently seen sorts lower).
func (a *contact) compare(b *contact) int {
	switch {
	case a.failedCount < b.failedCount:
		return -1
	case a.failedCount > b.failedCount:
		return +1
	case a.lastSeen.After(b.lastSeen):
		return -1
	case a.lastSeen.Before(b.lastSeen):
		return +1
	default:
		return 0
	}
}
// connectionQueue holds the contacts waiting for a connection attempt.
// Entries are added with insert and removed from the front with dequeue.
type connectionQueue struct {
	queue []*contact
}
// newConnectionQueue returns a connectionQueue with an empty, non-nil backing slice.
func newConnectionQueue() *connectionQueue {
	cq := new(connectionQueue)
	cq.queue = make([]*contact, 0)
	return cq
}
// insert places c into the queue so that the queue stays sorted in ascending
// compare order: dequeue, which pops index 0, therefore returns the contact
// with the fewest failures (and, among those, the most recently seen) first.
//
// c is inserted in front of the first queued entry it strictly precedes. If
// there is no such entry (empty queue, or c ranks last or ties with the tail)
// it is appended, so contacts that compare equal keep their insertion order.
// c.queued is set to true.
func (cq *connectionQueue) insert(c *contact) {
	// Default to appending; a range loop that runs to completion leaves its
	// index at len-1, so the position must be tracked explicitly.
	pos := len(cq.queue)
	for i, queued := range cq.queue {
		if c.compare(queued) < 0 {
			pos = i
			break
		}
	}

	// Grow by one slot, shift the tail right, and drop c into the gap.
	cq.queue = append(cq.queue, nil)
	copy(cq.queue[pos+1:], cq.queue[pos:])
	cq.queue[pos] = c

	c.queued = true
}
func ( cq * connectionQueue ) dequeue ( ) * contact {
if len ( cq . queue ) == 0 {
return nil
}
c := cq . queue [ 0 ]
cq . queue = cq . queue [ 1 : ]
c . queued = false
return c
2019-08-02 01:09:01 +00:00
}
type contactRetry struct {
2022-12-03 00:23:11 +00:00
bus event . Manager
queue event . Queue
networkUp bool
networkUpTime time . Time
running bool
breakChan chan bool
onion string
lastCheck time . Time
2022-09-10 17:34:36 +00:00
2022-12-03 00:23:11 +00:00
connections sync . Map //[string]*contact
2022-12-03 17:49:32 +00:00
connCount int
2022-12-03 00:23:11 +00:00
pendingQueue * connectionQueue
2019-08-02 01:09:01 +00:00
}
2022-12-03 00:23:11 +00:00
// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing
2021-03-19 21:20:22 +00:00
func NewConnectionRetry ( bus event . Manager , onion string ) Plugin {
2022-12-03 00:23:11 +00:00
cr := & contactRetry { bus : bus , queue : event . NewQueue ( ) , breakChan : make ( chan bool , 1 ) , connections : sync . Map { } , connCount : 0 , networkUp : false , networkUpTime : time . Now ( ) , onion : onion , pendingQueue : newConnectionQueue ( ) }
2022-09-10 17:34:36 +00:00
return cr
2019-08-02 01:09:01 +00:00
}
2022-12-03 00:23:11 +00:00
// maxTorCircuitsPending a function to throttle access to tor network during start up
func ( cr * contactRetry ) maxTorCircuitsPending ( ) int {
2022-12-03 17:50:31 +00:00
timeSinceStart := time . Since ( cr . networkUpTime )
2022-12-03 00:23:11 +00:00
if timeSinceStart < 30 * time . Second {
return 4
} else if timeSinceStart < 4 * time . Minute {
return 8
} else if timeSinceStart < 8 * time . Minute {
return 16
}
return connections . TorMaxPendingConns
}
// connectingCount walks every tracked contact and returns how many are
// currently in the CONNECTING state.
func (cr *contactRetry) connectingCount() int {
	total := 0
	tally := func(_, value interface{}) bool {
		if value.(*contact).state == connections.CONNECTING {
			total++
		}
		return true // always visit every entry
	}
	cr.connections.Range(tally)
	return total
}
2019-08-02 01:09:01 +00:00
func ( cr * contactRetry ) Start ( ) {
2022-09-10 17:34:36 +00:00
if ! cr . running {
go cr . run ( )
} else {
log . Errorf ( "Attempted to start Contact Retry plugin twice for %v" , cr . onion )
}
2019-08-02 01:09:01 +00:00
}
2022-12-03 00:23:11 +00:00
// Id returns the plugin identifier of the connection retry plugin.
func (cr *contactRetry) Id() PluginID {
	return CONNECTIONRETRY
}
2019-08-02 01:09:01 +00:00
func ( cr * contactRetry ) run ( ) {
2022-09-10 17:34:36 +00:00
cr . running = true
cr . bus . Subscribe ( event . PeerStateChange , cr . queue )
cr . bus . Subscribe ( event . ACNStatus , cr . queue )
cr . bus . Subscribe ( event . ServerStateChange , cr . queue )
2022-12-03 00:23:11 +00:00
cr . bus . Subscribe ( event . PeerRequest , cr . queue )
cr . bus . Subscribe ( event . QueuePeerRequest , cr . queue )
cr . bus . Subscribe ( event . QueueJoinServer , cr . queue )
2022-09-10 17:34:36 +00:00
for {
2022-12-03 00:23:11 +00:00
cr . requeueReady ( )
connectingCount := cr . connectingCount ( )
2022-12-03 17:49:32 +00:00
log . Debugf ( "checking queue (len: %v) of total conns watched: %v, with current connecingCount: %v" , len ( cr . pendingQueue . queue ) , cr . connCount , connectingCount )
2022-12-03 00:23:11 +00:00
for connectingCount < cr . maxTorCircuitsPending ( ) && len ( cr . pendingQueue . queue ) > 0 {
contact := cr . pendingQueue . dequeue ( )
// could have received incoming connection while in queue, make sure still disconnected before trying
if contact . state == connections . DISCONNECTED {
cr . publishConnectionRequest ( contact )
connectingCount ++
}
2022-09-10 17:34:36 +00:00
}
2022-12-03 00:23:11 +00:00
cr . lastCheck = time . Now ( )
2022-09-10 17:34:36 +00:00
select {
case e := <- cr . queue . OutChan ( ) :
switch e . EventType {
case event . PeerStateChange :
state := connections . ConnectionStateToType ( ) [ e . Data [ event . ConnectionState ] ]
peer := e . Data [ event . RemotePeer ]
cr . handleEvent ( peer , state , peerConn )
case event . ServerStateChange :
state := connections . ConnectionStateToType ( ) [ e . Data [ event . ConnectionState ] ]
server := e . Data [ event . GroupServer ]
cr . handleEvent ( server , state , serverConn )
2022-12-03 00:23:11 +00:00
case event . QueueJoinServer :
fallthrough
case event . QueuePeerRequest :
lastSeen , err := time . Parse ( e . Data [ event . LastSeen ] , time . RFC3339Nano )
if err != nil {
lastSeen = event . CwtchEpoch
}
id := ""
if peer , exists := e . Data [ event . RemotePeer ] ; exists {
id = peer
cr . addConnection ( peer , connections . DISCONNECTED , peerConn , lastSeen )
} else if server , exists := e . Data [ event . GroupServer ] ; exists {
id = server
cr . addConnection ( server , connections . DISCONNECTED , serverConn , lastSeen )
}
if c , ok := cr . connections . Load ( id ) ; ok {
contact := c . ( * contact )
if contact . state == connections . DISCONNECTED && ! contact . queued {
cr . pendingQueue . insert ( contact )
}
}
2022-09-10 17:34:36 +00:00
case event . ACNStatus :
prog := e . Data [ event . Progress ]
if prog == "100" && ! cr . networkUp {
cr . networkUp = true
2022-12-03 00:23:11 +00:00
cr . networkUpTime = time . Now ( )
2022-09-10 17:34:36 +00:00
cr . connections . Range ( func ( k , v interface { } ) bool {
p := v . ( * contact )
2022-12-03 00:23:11 +00:00
p . failedCount = 0
2022-09-10 17:34:36 +00:00
return true
} )
} else if prog != "100" {
cr . networkUp = false
}
}
case <- time . After ( tickTime ) :
continue
case <- cr . breakChan :
cr . running = false
return
}
}
2019-08-02 01:09:01 +00:00
}
2022-12-03 00:23:11 +00:00
func ( cr * contactRetry ) requeueReady ( ) {
if ! cr . networkUp {
return
}
retryable := [ ] * contact { }
throughPutPerMin := cr . maxTorCircuitsPending ( ) / ( circutTimeoutSecs / 60 )
2022-12-03 17:49:32 +00:00
adjustedBaseTimeout := cr . connCount / throughPutPerMin * 60
2022-12-03 00:23:11 +00:00
if adjustedBaseTimeout < circutTimeoutSecs {
adjustedBaseTimeout = circutTimeoutSecs
} else if adjustedBaseTimeout > MaxBaseTimeoutSec {
adjustedBaseTimeout = MaxBaseTimeoutSec
}
2022-09-10 17:34:36 +00:00
cr . connections . Range ( func ( k , v interface { } ) bool {
p := v . ( * contact )
2022-12-03 00:23:11 +00:00
if p . state == connections . DISCONNECTED && ! p . queued {
timeout := time . Duration ( ( math . Pow ( 2 , float64 ( p . failedCount ) ) ) * float64 ( adjustedBaseTimeout /*baseTimeoutSec*/ ) ) * time . Second
2022-12-03 17:50:31 +00:00
if time . Since ( p . lastAttempt ) > timeout {
2022-12-03 00:23:11 +00:00
retryable = append ( retryable , p )
2022-09-10 17:34:36 +00:00
}
}
return true
} )
2022-12-03 00:23:11 +00:00
for _ , contact := range retryable {
cr . pendingQueue . insert ( contact )
}
2021-05-07 20:54:12 +00:00
}
2022-12-03 00:23:11 +00:00
// publishConnectionRequest publishes the retry event matching the contact's
// type (RetryPeerRequest for peers, RetryServerRequest for servers), then
// marks the contact as CONNECTING and stamps lastAttempt with the current time.
func (cr *contactRetry) publishConnectionRequest(contact *contact) {
	switch contact.ctype {
	case peerConn:
		payload := map[event.Field]string{event.RemotePeer: contact.id}
		cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, payload))
	case serverConn:
		payload := map[event.Field]string{event.GroupServer: contact.id}
		cr.bus.Publish(event.NewEvent(event.RetryServerRequest, payload))
	}

	// Hacky but needed so we don't over flood waiting for PeerStateChange from engine
	contact.state = connections.CONNECTING
	contact.lastAttempt = time.Now()
}
2022-03-22 19:44:18 +00:00
2022-12-03 00:23:11 +00:00
func ( cr * contactRetry ) addConnection ( id string , state connections . ConnectionState , ctype connectionType , lastSeen time . Time ) {
2022-09-10 17:34:36 +00:00
// don't handle contact retries for ourselves
if id == cr . onion {
return
}
if _ , exists := cr . connections . Load ( id ) ; ! exists {
2022-12-03 00:23:11 +00:00
p := & contact { id : id , state : state , failedCount : 0 , lastAttempt : event . CwtchEpoch , ctype : ctype , lastSeen : lastSeen , queued : false }
2022-09-10 17:34:36 +00:00
cr . connections . Store ( id , p )
2022-12-03 17:49:32 +00:00
cr . connCount += 1
2022-12-03 00:23:11 +00:00
return
}
}
func ( cr * contactRetry ) handleEvent ( id string , state connections . ConnectionState , ctype connectionType ) {
log . Debugf ( "cr.handleEvent state to %v on id %v" , connections . ConnectionStateName [ state ] , id )
// don't handle contact retries for ourselves
if id == cr . onion {
return
}
if _ , exists := cr . connections . Load ( id ) ; ! exists {
cr . addConnection ( id , state , ctype , event . CwtchEpoch )
2022-09-10 17:34:36 +00:00
return
}
pinf , _ := cr . connections . Load ( id )
p := pinf . ( * contact )
2022-12-03 00:23:11 +00:00
log . Infof ( " managing state change for %v %v to %v by self %v" , id , connections . ConnectionStateName [ p . state ] , connections . ConnectionStateName [ state ] , cr . onion )
2022-09-10 17:34:36 +00:00
if state == connections . DISCONNECTED || state == connections . FAILED || state == connections . KILLED {
2022-12-03 00:23:11 +00:00
if p . state == connections . SYNCED || p . state == connections . AUTHENTICATED {
p . lastSeen = time . Now ( )
} else {
p . failedCount += 1
}
2022-09-10 17:34:36 +00:00
p . state = connections . DISCONNECTED
2022-12-03 00:23:11 +00:00
p . lastAttempt = time . Now ( )
if p . failedCount > maxFailedBackoff {
p . failedCount = maxFailedBackoff
2022-09-10 17:34:36 +00:00
}
} else if state == connections . CONNECTING || state == connections . CONNECTED {
p . state = state
2022-12-03 00:23:11 +00:00
} else if state == connections . AUTHENTICATED || state == connections . SYNCED {
2022-09-10 17:34:36 +00:00
p . state = state
2022-12-03 00:23:11 +00:00
p . lastSeen = time . Now ( )
p . failedCount = 0
2022-09-26 01:28:04 +00:00
}
}
2019-08-02 01:09:01 +00:00
func ( cr * contactRetry ) Shutdown ( ) {
2022-09-10 17:34:36 +00:00
cr . breakChan <- true
2022-12-03 00:23:11 +00:00
cr . queue . Shutdown ( )
2019-08-02 01:09:01 +00:00
}