520 lines
16 KiB
Go
520 lines
16 KiB
Go
package plugins
|
|
|
|
import (
|
|
"cwtch.im/cwtch/event"
|
|
"cwtch.im/cwtch/protocol/connections"
|
|
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
|
"git.openprivacy.ca/openprivacy/log"
|
|
"math"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// TODO: Move to protocol/connections.
// This plugin is now required, so it would make more sense to run it integrated more closely with the engine.
|
|
|
|
const (
	// tickTimeSec is how often, in seconds, the run loop wakes up when no bus
	// event arrives.
	tickTimeSec = 30
	// tickTime is tickTimeSec expressed as a time.Duration.
	tickTime = tickTimeSec * time.Second

	// circuitTimeoutSecs is the lower bound of the adjusted base retry timeout;
	// a contact left CONNECTING for twice this long is forced to DISCONNECTED.
	circuitTimeoutSecs int = 120

	// MaxBaseTimeoutSec is the upper bound of the adjusted base retry timeout
	// (5 minutes).
	MaxBaseTimeoutSec = 5 * 60
	// maxFailedBackoff caps failedCount, so the backoff multiplier is at most
	// 2^6 = 64 -> 64 * [2m to 5m] = 2h8m to 5h20m.
	maxFailedBackoff = 6

	// PriorityQueueTimeSinceQualifierHours is the age limit (168h, one week) of a
	// contact's lastSeen for it to be placed in the priority queue instead of
	// the pending queue.
	PriorityQueueTimeSinceQualifierHours float64 = 168
)
|
|
|
|
// connectionType records which kind of remote endpoint a contact is, and so
// which request event is published to reconnect it.
type connectionType int

const (
	// peerConn marks a contact reached via PeerRequest (keyed by event.RemotePeer).
	peerConn connectionType = iota
	// serverConn marks a contact reached via RetryServerRequest (keyed by event.GroupServer).
	serverConn
)
|
|
|
|
type contact struct {
|
|
id string
|
|
state connections.ConnectionState
|
|
ctype connectionType
|
|
|
|
lastAttempt time.Time
|
|
failedCount int
|
|
|
|
lastSeen time.Time
|
|
queued bool
|
|
}
|
|
|
|
// compare a to b
|
|
// returns -1 if a < b
|
|
//
|
|
// 0 if a == b
|
|
// +1 if a > b
|
|
//
|
|
// algo: sort by failedCount first favouring less attempts, then sort by lastSeen time favouring more recent connections
|
|
func (a *contact) compare(b *contact) int {
|
|
if a.failedCount < b.failedCount {
|
|
return -1
|
|
} else if a.failedCount > b.failedCount {
|
|
return +1
|
|
}
|
|
|
|
if a.lastSeen.After(b.lastSeen) {
|
|
return -1
|
|
} else if a.lastSeen.Before(b.lastSeen) {
|
|
return +1
|
|
}
|
|
|
|
return 0
|
|
}
|
|
|
|
type connectionQueue struct {
|
|
queue []*contact
|
|
}
|
|
|
|
func newConnectionQueue() *connectionQueue {
|
|
return &connectionQueue{queue: []*contact{}}
|
|
}
|
|
|
|
func (cq *connectionQueue) insert(c *contact) {
|
|
// find loc
|
|
i := 0
|
|
var b *contact
|
|
for i, b = range cq.queue {
|
|
if c.compare(b) >= 0 {
|
|
break
|
|
}
|
|
}
|
|
|
|
// insert
|
|
if len(cq.queue) == i { // nil or empty slice or after last element
|
|
cq.queue = append(cq.queue, c)
|
|
} else {
|
|
cq.queue = append(cq.queue[:i+1], cq.queue[i:]...) // index < len(a)
|
|
cq.queue[i] = c
|
|
}
|
|
|
|
c.queued = true
|
|
}
|
|
|
|
func (cq *connectionQueue) dequeue() *contact {
|
|
if len(cq.queue) == 0 {
|
|
return nil
|
|
}
|
|
c := cq.queue[0]
|
|
cq.queue = cq.queue[1:]
|
|
c.queued = false
|
|
return c
|
|
}
|
|
|
|
func (cq *connectionQueue) len() int {
|
|
return len(cq.queue)
|
|
}
|
|
|
|
type contactRetry struct {
|
|
bus event.Manager
|
|
queue event.Queue
|
|
ACNUp bool
|
|
ACNUpTime time.Time
|
|
protocolEngine bool
|
|
running bool
|
|
breakChan chan bool
|
|
onion string
|
|
lastCheck time.Time
|
|
acnProgress int
|
|
|
|
connections sync.Map //[string]*contact
|
|
pendingQueue *connectionQueue
|
|
priorityQueue *connectionQueue
|
|
authorizedPeers sync.Map
|
|
stallRetries bool
|
|
}
|
|
|
|
// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing
|
|
func NewConnectionRetry(bus event.Manager, onion string) Plugin {
|
|
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), authorizedPeers: sync.Map{}, connections: sync.Map{}, stallRetries: true, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue(), priorityQueue: newConnectionQueue()}
|
|
return cr
|
|
}
|
|
|
|
// maxTorCircuitsPending a function to throttle access to tor network during start up
|
|
func (cr *contactRetry) maxTorCircuitsPending() int {
|
|
timeSinceStart := time.Since(cr.ACNUpTime)
|
|
if timeSinceStart < 30*time.Second {
|
|
return 4
|
|
} else if timeSinceStart < 4*time.Minute {
|
|
return 8
|
|
} else if timeSinceStart < 8*time.Minute {
|
|
return 16
|
|
}
|
|
return connections.TorMaxPendingConns
|
|
}
|
|
|
|
func (cr *contactRetry) connectingCount() int {
|
|
connecting := 0
|
|
cr.connections.Range(func(k, v interface{}) bool {
|
|
conn := v.(*contact)
|
|
if conn.state == connections.CONNECTING {
|
|
connecting++
|
|
}
|
|
return true
|
|
})
|
|
return connecting
|
|
}
|
|
|
|
func (cr *contactRetry) Start() {
|
|
if !cr.running {
|
|
go cr.run()
|
|
} else {
|
|
log.Errorf("Attempted to start Contact Retry plugin twice for %v", cr.onion)
|
|
}
|
|
}
|
|
|
|
func (cr *contactRetry) Id() PluginID {
|
|
return CONNECTIONRETRY
|
|
}
|
|
|
|
func (cr *contactRetry) run() {
|
|
cr.running = true
|
|
cr.bus.Subscribe(event.PeerStateChange, cr.queue)
|
|
cr.bus.Subscribe(event.ACNStatus, cr.queue)
|
|
cr.bus.Subscribe(event.ServerStateChange, cr.queue)
|
|
cr.bus.Subscribe(event.QueuePeerRequest, cr.queue)
|
|
cr.bus.Subscribe(event.QueueJoinServer, cr.queue)
|
|
cr.bus.Subscribe(event.DisconnectPeerRequest, cr.queue)
|
|
cr.bus.Subscribe(event.DisconnectServerRequest, cr.queue)
|
|
cr.bus.Subscribe(event.ProtocolEngineShutdown, cr.queue)
|
|
cr.bus.Subscribe(event.ProtocolEngineCreated, cr.queue)
|
|
cr.bus.Subscribe(event.DeleteContact, cr.queue)
|
|
cr.bus.Subscribe(event.UpdateConversationAuthorization, cr.queue)
|
|
cr.bus.Subscribe(event.PurgeRetries, cr.queue)
|
|
cr.bus.Subscribe(event.ResumeRetries, cr.queue)
|
|
for {
|
|
// Only attempt connection if both the ACN and the Protocol Engines are Online...
|
|
log.Debugf("restartFlow checking state")
|
|
if cr.ACNUp && cr.protocolEngine && !cr.stallRetries {
|
|
log.Debugf("restartFlow time to queue!!")
|
|
cr.requeueReady()
|
|
connectingCount := cr.connectingCount()
|
|
|
|
// do priority connections first...
|
|
for connectingCount < cr.maxTorCircuitsPending() && len(cr.priorityQueue.queue) > 0 {
|
|
contact := cr.priorityQueue.dequeue()
|
|
if contact == nil {
|
|
break
|
|
}
|
|
// could have received incoming connection while in queue, make sure still disconnected before trying
|
|
if contact.state == connections.DISCONNECTED {
|
|
cr.publishConnectionRequest(contact)
|
|
connectingCount++
|
|
}
|
|
}
|
|
|
|
for connectingCount < cr.maxTorCircuitsPending() && len(cr.pendingQueue.queue) > 0 {
|
|
contact := cr.pendingQueue.dequeue()
|
|
if contact == nil {
|
|
break
|
|
}
|
|
// could have received incoming connection while in queue, make sure still disconnected before trying
|
|
if contact.state == connections.DISCONNECTED {
|
|
cr.publishConnectionRequest(contact)
|
|
connectingCount++
|
|
}
|
|
}
|
|
cr.lastCheck = time.Now()
|
|
}
|
|
// regardless of if we're up, run manual force deconnectiong of timed out connections
|
|
cr.connections.Range(func(k, v interface{}) bool {
|
|
p := v.(*contact)
|
|
if p.state == connections.CONNECTING && time.Since(p.lastAttempt) > time.Duration(circuitTimeoutSecs)*time.Second*2 {
|
|
// we have been "connecting" for twice the circuttimeout so it's failed, we just didn't learn about it, manually disconnect
|
|
cr.handleEvent(p.id, connections.DISCONNECTED, p.ctype)
|
|
log.Errorf("had to manually set peer %v of profile %v to DISCONNECTED due to assumed circuit timeout (%v) seconds", p.id, cr.onion, circuitTimeoutSecs*2)
|
|
}
|
|
return true
|
|
})
|
|
|
|
select {
|
|
case e := <-cr.queue.OutChan():
|
|
switch e.EventType {
|
|
case event.PurgeRetries:
|
|
// Purge All Authorized Peers
|
|
cr.authorizedPeers.Range(func(key interface{}, value interface{}) bool {
|
|
cr.authorizedPeers.Delete(key)
|
|
return true
|
|
})
|
|
// Purge All Connection States
|
|
cr.connections.Range(func(key interface{}, value interface{}) bool {
|
|
cr.connections.Delete(key)
|
|
return true
|
|
})
|
|
case event.ResumeRetries:
|
|
log.Infof("resuming retries...")
|
|
cr.stallRetries = false
|
|
case event.DisconnectPeerRequest:
|
|
peer := e.Data[event.RemotePeer]
|
|
cr.authorizedPeers.Delete(peer)
|
|
case event.DisconnectServerRequest:
|
|
peer := e.Data[event.GroupServer]
|
|
cr.authorizedPeers.Delete(peer)
|
|
case event.DeleteContact:
|
|
// this case covers both servers and peers (servers are peers, and go through the
|
|
// same delete conversation flow)
|
|
peer := e.Data[event.RemotePeer]
|
|
cr.authorizedPeers.Delete(peer)
|
|
case event.UpdateConversationAuthorization:
|
|
// if we update the conversation authorization then we need to check if
|
|
// we need to remove blocked conversations from the regular flow.
|
|
peer := e.Data[event.RemotePeer]
|
|
blocked := e.Data[event.Blocked]
|
|
if blocked == "true" {
|
|
cr.authorizedPeers.Delete(peer)
|
|
}
|
|
case event.PeerStateChange:
|
|
state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]]
|
|
peer := e.Data[event.RemotePeer]
|
|
// only handle state change events from pre-authorized peers;
|
|
if _, exists := cr.authorizedPeers.Load(peer); exists {
|
|
cr.handleEvent(peer, state, peerConn)
|
|
}
|
|
case event.ServerStateChange:
|
|
state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]]
|
|
server := e.Data[event.GroupServer]
|
|
// only handle state change events from pre-authorized servers;
|
|
if _, exists := cr.authorizedPeers.Load(server); exists {
|
|
cr.handleEvent(server, state, serverConn)
|
|
}
|
|
case event.QueueJoinServer:
|
|
fallthrough
|
|
case event.QueuePeerRequest:
|
|
lastSeen, err := time.Parse(time.RFC3339Nano, e.Data[event.LastSeen])
|
|
if err != nil {
|
|
lastSeen = event.CwtchEpoch
|
|
}
|
|
|
|
id := ""
|
|
if peer, exists := e.Data[event.RemotePeer]; exists {
|
|
id = peer
|
|
cr.addConnection(peer, connections.DISCONNECTED, peerConn, lastSeen)
|
|
} else if server, exists := e.Data[event.GroupServer]; exists {
|
|
id = server
|
|
cr.addConnection(server, connections.DISCONNECTED, serverConn, lastSeen)
|
|
}
|
|
// this was an authorized event, and so we store this peer.
|
|
log.Debugf("authorizing id: %v", id)
|
|
cr.authorizedPeers.Store(id, true)
|
|
if c, ok := cr.connections.Load(id); ok {
|
|
contact := c.(*contact)
|
|
if contact.state == connections.DISCONNECTED {
|
|
// prioritize connections made in the last week
|
|
if time.Since(contact.lastSeen).Hours() < PriorityQueueTimeSinceQualifierHours {
|
|
cr.priorityQueue.insert(contact)
|
|
} else {
|
|
cr.pendingQueue.insert(contact)
|
|
}
|
|
}
|
|
}
|
|
|
|
case event.ProtocolEngineShutdown:
|
|
cr.ACNUp = false
|
|
cr.protocolEngine = false
|
|
cr.stallRetries = true
|
|
cr.connections.Range(func(k, v interface{}) bool {
|
|
p := v.(*contact)
|
|
if p.state == connections.AUTHENTICATED || p.state == connections.SYNCED {
|
|
p.lastSeen = time.Now()
|
|
}
|
|
p.state = connections.DISCONNECTED
|
|
p.failedCount = 0
|
|
return true
|
|
})
|
|
case event.ProtocolEngineCreated:
|
|
cr.protocolEngine = true
|
|
cr.processStatus()
|
|
|
|
case event.ACNStatus:
|
|
progData := e.Data[event.Progress]
|
|
if prog, err := strconv.Atoi(progData); err == nil {
|
|
cr.acnProgress = prog
|
|
cr.processStatus()
|
|
}
|
|
}
|
|
|
|
case <-time.After(tickTime):
|
|
continue
|
|
|
|
case <-cr.breakChan:
|
|
cr.running = false
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (cr *contactRetry) processStatus() {
|
|
if !cr.protocolEngine {
|
|
cr.ACNUp = false
|
|
return
|
|
}
|
|
if cr.acnProgress == 100 && !cr.ACNUp {
|
|
// ACN is up...at this point we need to completely reset our state
|
|
// as there is no guarantee that the tor daemon shares our state anymore...
|
|
cr.ACNUp = true
|
|
cr.ACNUpTime = time.Now()
|
|
|
|
// reset all of the queues...
|
|
cr.priorityQueue = newConnectionQueue()
|
|
cr.pendingQueue = newConnectionQueue()
|
|
|
|
// Loop through connections. Reset state, and requeue...
|
|
cr.connections.Range(func(k, v interface{}) bool {
|
|
p := v.(*contact)
|
|
|
|
// only reload connections if they are on the authorized peers list
|
|
if _, exists := cr.authorizedPeers.Load(p.id); exists {
|
|
p.queued = true
|
|
// prioritize connections made recently...
|
|
log.Debugf("adding %v to queue", p.id)
|
|
if time.Since(p.lastSeen).Hours() < PriorityQueueTimeSinceQualifierHours {
|
|
cr.priorityQueue.insert(p)
|
|
} else {
|
|
cr.pendingQueue.insert(p)
|
|
}
|
|
}
|
|
|
|
return true
|
|
})
|
|
|
|
} else if cr.acnProgress != 100 {
|
|
cr.ACNUp = false
|
|
cr.connections.Range(func(k, v interface{}) bool {
|
|
p := v.(*contact)
|
|
p.failedCount = 0
|
|
p.queued = false
|
|
p.state = connections.DISCONNECTED
|
|
return true
|
|
})
|
|
}
|
|
}
|
|
|
|
func (cr *contactRetry) requeueReady() {
|
|
if !cr.ACNUp {
|
|
return
|
|
}
|
|
|
|
var retryable []*contact
|
|
|
|
throughPutPerMin := int((float64(cr.maxTorCircuitsPending()) / float64(circuitTimeoutSecs)) * 60.0)
|
|
queueCount := cr.priorityQueue.len() + cr.pendingQueue.len()
|
|
// adjustedBaseTimeout = basetimeoust * (queuedItemsCount / throughPutPerMin)
|
|
// when less items are queued than through put it'll lower adjustedBaseTimeOut, but that'll be reset in the next block
|
|
// when more items are queued it will increase the timeout, to a max of MaxBaseTimeoutSec (enforced in the next block)
|
|
adjustedBaseTimeout := circuitTimeoutSecs * (queueCount / throughPutPerMin)
|
|
|
|
// circuitTimeoutSecs (120s) < adjustedBaseTimeout < MaxBaseTimeoutSec (300s)
|
|
if adjustedBaseTimeout < circuitTimeoutSecs {
|
|
adjustedBaseTimeout = circuitTimeoutSecs
|
|
} else if adjustedBaseTimeout > MaxBaseTimeoutSec {
|
|
adjustedBaseTimeout = MaxBaseTimeoutSec
|
|
}
|
|
|
|
cr.connections.Range(func(k, v interface{}) bool {
|
|
p := v.(*contact)
|
|
|
|
// Don't retry anyone who isn't on the authorized peers list
|
|
if _, exists := cr.authorizedPeers.Load(p.id); exists {
|
|
if p.state == connections.DISCONNECTED && !p.queued {
|
|
timeout := time.Duration((math.Pow(2, float64(p.failedCount)))*float64(adjustedBaseTimeout /*baseTimeoutSec*/)) * time.Second
|
|
if time.Since(p.lastAttempt) > timeout {
|
|
retryable = append(retryable, p)
|
|
}
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
for _, contact := range retryable {
|
|
if time.Since(contact.lastSeen).Hours() < PriorityQueueTimeSinceQualifierHours {
|
|
cr.priorityQueue.insert(contact)
|
|
} else {
|
|
cr.pendingQueue.insert(contact)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (cr *contactRetry) publishConnectionRequest(contact *contact) {
|
|
log.Debugf("RestartFlow Publish Connection Request listener %v", contact)
|
|
if contact.ctype == peerConn {
|
|
cr.bus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: contact.id}))
|
|
}
|
|
if contact.ctype == serverConn {
|
|
cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: contact.id}))
|
|
}
|
|
contact.state = connections.CONNECTING // Hacky but needed so we don't over flood waiting for PeerStateChange from engine
|
|
contact.lastAttempt = time.Now()
|
|
}
|
|
|
|
func (cr *contactRetry) addConnection(id string, state connections.ConnectionState, ctype connectionType, lastSeen time.Time) {
|
|
// don't handle contact retries for ourselves
|
|
if id == cr.onion {
|
|
return
|
|
}
|
|
|
|
if _, exists := cr.connections.Load(id); !exists {
|
|
p := &contact{id: id, state: state, failedCount: 0, lastAttempt: event.CwtchEpoch, ctype: ctype, lastSeen: lastSeen, queued: false}
|
|
cr.connections.Store(id, p)
|
|
return
|
|
} else {
|
|
// we have rerequested this connnection, probably via an explicit ask, update it's state
|
|
if c, ok := cr.connections.Load(id); ok {
|
|
contact := c.(*contact)
|
|
contact.state = state
|
|
}
|
|
}
|
|
}
|
|
|
|
func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) {
|
|
log.Debugf("cr.handleEvent state to %v on id %v", connections.ConnectionStateName[state], id)
|
|
|
|
// don't handle contact retries for ourselves
|
|
if id == cr.onion {
|
|
return
|
|
}
|
|
|
|
// reject events that contain invalid hostnames...we cannot connect to them
|
|
// and they could result in spurious connection attempts...
|
|
if !tor.IsValidHostname(id) {
|
|
return
|
|
}
|
|
|
|
if _, exists := cr.connections.Load(id); !exists {
|
|
// We have an event for something we don't know about...
|
|
// The only reason this should happen is if a *new* Peer/Server connection has changed.
|
|
// Let's set the timeout to Now() to indicate that this is a fresh connection, and so should likely be prioritized.
|
|
cr.addConnection(id, state, ctype, time.Now())
|
|
return
|
|
}
|
|
|
|
pinf, _ := cr.connections.Load(id)
|
|
p := pinf.(*contact)
|
|
log.Debugf(" managing state change for %v %v to %v by self %v", id, connections.ConnectionStateName[p.state], connections.ConnectionStateName[state], cr.onion)
|
|
if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED {
|
|
if p.state == connections.SYNCED || p.state == connections.AUTHENTICATED {
|
|
p.lastSeen = time.Now()
|
|
} else {
|
|
p.failedCount += 1
|
|
}
|
|
p.state = connections.DISCONNECTED
|
|
p.lastAttempt = time.Now()
|
|
if p.failedCount > maxFailedBackoff {
|
|
p.failedCount = maxFailedBackoff
|
|
}
|
|
} else if state == connections.CONNECTING || state == connections.CONNECTED {
|
|
p.state = state
|
|
} else if state == connections.AUTHENTICATED || state == connections.SYNCED {
|
|
p.state = state
|
|
p.lastSeen = time.Now()
|
|
p.failedCount = 0
|
|
}
|
|
}
|
|
|
|
func (cr *contactRetry) Shutdown() {
|
|
cr.breakChan <- true
|
|
cr.queue.Shutdown()
|
|
}
|