package plugins import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/protocol/connections" "git.openprivacy.ca/openprivacy/connectivity/tor" "git.openprivacy.ca/openprivacy/log" "math" "strconv" "sync" "time" ) // Todo: Move to protocol/connections // This Plugin is now required and it makes more sense to run more integrated in engine const tickTimeSec = 30 const tickTime = tickTimeSec * time.Second const circuitTimeoutSecs int = 120 const MaxBaseTimeoutSec = 5 * 60 // a max base time out of 5 min const maxFailedBackoff = 6 // 2^6 = 64 -> 64 * [2m to 5m] = 2h8m to 5h20m const PriorityQueueTimeSinceQualifierHours float64 = 168 type connectionType int const ( peerConn connectionType = iota serverConn ) type contact struct { id string state connections.ConnectionState ctype connectionType lastAttempt time.Time failedCount int lastSeen time.Time queued bool } // compare a to b // returns -1 if a < b // // 0 if a == b // +1 if a > b // // algo: sort by failedCount first favouring less attempts, then sort by lastSeen time favouring more recent connections func (a *contact) compare(b *contact) int { if a.failedCount < b.failedCount { return -1 } else if a.failedCount > b.failedCount { return +1 } if a.lastSeen.After(b.lastSeen) { return -1 } else if a.lastSeen.Before(b.lastSeen) { return +1 } return 0 } type connectionQueue struct { queue []*contact } func newConnectionQueue() *connectionQueue { return &connectionQueue{queue: []*contact{}} } func (cq *connectionQueue) insert(c *contact) { // find loc i := 0 var b *contact for i, b = range cq.queue { if c.compare(b) >= 0 { break } } // insert if len(cq.queue) == i { // nil or empty slice or after last element cq.queue = append(cq.queue, c) } else { cq.queue = append(cq.queue[:i+1], cq.queue[i:]...) // index < len(a) cq.queue[i] = c } c.queued = true } func (cq *connectionQueue) dequeue() *contact { if len(cq.queue) == 0 { return nil } c := cq.queue[0] cq.queue = cq.queue[1:] c.queued = false return c } func (cq *connectionQueue) len() int { return len(cq.queue) } type contactRetry struct { bus event.Manager queue event.Queue ACNUp bool ACNUpTime time.Time protocolEngine bool running bool breakChan chan bool onion string lastCheck time.Time acnProgress int connections sync.Map //[string]*contact pendingQueue *connectionQueue priorityQueue *connectionQueue authorizedPeers sync.Map stallRetries bool } // NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing func NewConnectionRetry(bus event.Manager, onion string) Plugin { cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), authorizedPeers: sync.Map{}, connections: sync.Map{}, stallRetries: true, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue(), priorityQueue: newConnectionQueue()} return cr } // maxTorCircuitsPending a function to throttle access to tor network during start up func (cr *contactRetry) maxTorCircuitsPending() int { timeSinceStart := time.Since(cr.ACNUpTime) if timeSinceStart < 30*time.Second { return 4 } else if timeSinceStart < 4*time.Minute { return 8 } else if timeSinceStart < 8*time.Minute { return 16 } return connections.TorMaxPendingConns } func (cr *contactRetry) connectingCount() int { connecting := 0 cr.connections.Range(func(k, v interface{}) bool { conn := v.(*contact) if conn.state == connections.CONNECTING { connecting++ } return true }) return connecting } func (cr *contactRetry) Start() { if !cr.running { go cr.run() } else { log.Errorf("Attempted to start Contact Retry plugin twice for %v", cr.onion) } } func (cr *contactRetry) Id() PluginID { return CONNECTIONRETRY } func (cr *contactRetry) run() { cr.running = true cr.bus.Subscribe(event.PeerStateChange, cr.queue) cr.bus.Subscribe(event.ACNStatus, cr.queue) cr.bus.Subscribe(event.ServerStateChange, cr.queue) cr.bus.Subscribe(event.QueuePeerRequest, cr.queue) cr.bus.Subscribe(event.QueueJoinServer, cr.queue) cr.bus.Subscribe(event.DisconnectPeerRequest, cr.queue) cr.bus.Subscribe(event.DisconnectServerRequest, cr.queue) cr.bus.Subscribe(event.ProtocolEngineShutdown, cr.queue) cr.bus.Subscribe(event.ProtocolEngineCreated, cr.queue) cr.bus.Subscribe(event.DeleteContact, cr.queue) cr.bus.Subscribe(event.UpdateConversationAuthorization, cr.queue) cr.bus.Subscribe(event.PurgeRetries, cr.queue) cr.bus.Subscribe(event.ResumeRetries, cr.queue) for { // Only attempt connection if both the ACN and the Protocol Engines are Online... log.Debugf("restartFlow checking state") if cr.ACNUp && cr.protocolEngine && !cr.stallRetries { log.Debugf("restartFlow time to queue!!") cr.requeueReady() connectingCount := cr.connectingCount() // do priority connections first... for connectingCount < cr.maxTorCircuitsPending() && len(cr.priorityQueue.queue) > 0 { contact := cr.priorityQueue.dequeue() if contact == nil { break } // could have received incoming connection while in queue, make sure still disconnected before trying if contact.state == connections.DISCONNECTED { cr.publishConnectionRequest(contact) connectingCount++ } } for connectingCount < cr.maxTorCircuitsPending() && len(cr.pendingQueue.queue) > 0 { contact := cr.pendingQueue.dequeue() if contact == nil { break } // could have received incoming connection while in queue, make sure still disconnected before trying if contact.state == connections.DISCONNECTED { cr.publishConnectionRequest(contact) connectingCount++ } } cr.lastCheck = time.Now() } // regardless of if we're up, run manual force deconnectiong of timed out connections cr.connections.Range(func(k, v interface{}) bool { p := v.(*contact) if p.state == connections.CONNECTING && time.Since(p.lastAttempt) > time.Duration(circuitTimeoutSecs)*time.Second*2 { // we have been "connecting" for twice the circuttimeout so it's failed, we just didn't learn about it, manually disconnect cr.handleEvent(p.id, connections.DISCONNECTED, p.ctype) log.Errorf("had to manually set peer %v of profile %v to DISCONNECTED due to assumed circuit timeout (%v) seconds", p.id, cr.onion, circuitTimeoutSecs*2) } return true }) select { case e := <-cr.queue.OutChan(): switch e.EventType { case event.PurgeRetries: // Purge All Authorized Peers cr.authorizedPeers.Range(func(key interface{}, value interface{}) bool { cr.authorizedPeers.Delete(key) return true }) // Purge All Connection States cr.connections.Range(func(key interface{}, value interface{}) bool { cr.connections.Delete(key) return true }) case event.ResumeRetries: log.Infof("resuming retries...") cr.stallRetries = false case event.DisconnectPeerRequest: peer := e.Data[event.RemotePeer] cr.authorizedPeers.Delete(peer) case event.DisconnectServerRequest: peer := e.Data[event.GroupServer] cr.authorizedPeers.Delete(peer) case event.DeleteContact: // this case covers both servers and peers (servers are peers, and go through the // same delete conversation flow) peer := e.Data[event.RemotePeer] cr.authorizedPeers.Delete(peer) case event.UpdateConversationAuthorization: // if we update the conversation authorization then we need to check if // we need to remove blocked conversations from the regular flow. peer := e.Data[event.RemotePeer] blocked := e.Data[event.Blocked] if blocked == "true" { cr.authorizedPeers.Delete(peer) } case event.PeerStateChange: state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]] peer := e.Data[event.RemotePeer] // only handle state change events from pre-authorized peers; if _, exists := cr.authorizedPeers.Load(peer); exists { cr.handleEvent(peer, state, peerConn) } case event.ServerStateChange: state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]] server := e.Data[event.GroupServer] // only handle state change events from pre-authorized servers; if _, exists := cr.authorizedPeers.Load(server); exists { cr.handleEvent(server, state, serverConn) } case event.QueueJoinServer: fallthrough case event.QueuePeerRequest: lastSeen, err := time.Parse(time.RFC3339Nano, e.Data[event.LastSeen]) if err != nil { lastSeen = event.CwtchEpoch } id := "" if peer, exists := e.Data[event.RemotePeer]; exists { id = peer cr.addConnection(peer, connections.DISCONNECTED, peerConn, lastSeen) } else if server, exists := e.Data[event.GroupServer]; exists { id = server cr.addConnection(server, connections.DISCONNECTED, serverConn, lastSeen) } // this was an authorized event, and so we store this peer. log.Debugf("authorizing id: %v", id) cr.authorizedPeers.Store(id, true) if c, ok := cr.connections.Load(id); ok { contact := c.(*contact) if contact.state == connections.DISCONNECTED { // prioritize connections made in the last week if time.Since(contact.lastSeen).Hours() < PriorityQueueTimeSinceQualifierHours { cr.priorityQueue.insert(contact) } else { cr.pendingQueue.insert(contact) } } } case event.ProtocolEngineShutdown: cr.ACNUp = false cr.protocolEngine = false cr.stallRetries = true cr.connections.Range(func(k, v interface{}) bool { p := v.(*contact) if p.state == connections.AUTHENTICATED || p.state == connections.SYNCED { p.lastSeen = time.Now() } p.state = connections.DISCONNECTED p.failedCount = 0 return true }) case event.ProtocolEngineCreated: cr.protocolEngine = true cr.processStatus() case event.ACNStatus: progData := e.Data[event.Progress] if prog, err := strconv.Atoi(progData); err == nil { cr.acnProgress = prog cr.processStatus() } } case <-time.After(tickTime): continue case <-cr.breakChan: cr.running = false return } } } func (cr *contactRetry) processStatus() { if !cr.protocolEngine { cr.ACNUp = false return } if cr.acnProgress == 100 && !cr.ACNUp { // ACN is up...at this point we need to completely reset our state // as there is no guarantee that the tor daemon shares our state anymore... cr.ACNUp = true cr.ACNUpTime = time.Now() // reset all of the queues... cr.priorityQueue = newConnectionQueue() cr.pendingQueue = newConnectionQueue() // Loop through connections. Reset state, and requeue... cr.connections.Range(func(k, v interface{}) bool { p := v.(*contact) // only reload connections if they are on the authorized peers list if _, exists := cr.authorizedPeers.Load(p.id); exists { p.queued = true // prioritize connections made recently... log.Debugf("adding %v to queue", p.id) if time.Since(p.lastSeen).Hours() < PriorityQueueTimeSinceQualifierHours { cr.priorityQueue.insert(p) } else { cr.pendingQueue.insert(p) } } return true }) } else if cr.acnProgress != 100 { cr.ACNUp = false cr.connections.Range(func(k, v interface{}) bool { p := v.(*contact) p.failedCount = 0 p.queued = false p.state = connections.DISCONNECTED return true }) } } func (cr *contactRetry) requeueReady() { if !cr.ACNUp { return } var retryable []*contact throughPutPerMin := int((float64(cr.maxTorCircuitsPending()) / float64(circuitTimeoutSecs)) * 60.0) queueCount := cr.priorityQueue.len() + cr.pendingQueue.len() // adjustedBaseTimeout = basetimeoust * (queuedItemsCount / throughPutPerMin) // when less items are queued than through put it'll lower adjustedBaseTimeOut, but that'll be reset in the next block // when more items are queued it will increase the timeout, to a max of MaxBaseTimeoutSec (enforced in the next block) adjustedBaseTimeout := circuitTimeoutSecs * (queueCount / throughPutPerMin) // circuitTimeoutSecs (120s) < adjustedBaseTimeout < MaxBaseTimeoutSec (300s) if adjustedBaseTimeout < circuitTimeoutSecs { adjustedBaseTimeout = circuitTimeoutSecs } else if adjustedBaseTimeout > MaxBaseTimeoutSec { adjustedBaseTimeout = MaxBaseTimeoutSec } cr.connections.Range(func(k, v interface{}) bool { p := v.(*contact) // Don't retry anyone who isn't on the authorized peers list if _, exists := cr.authorizedPeers.Load(p.id); exists { if p.state == connections.DISCONNECTED && !p.queued { timeout := time.Duration((math.Pow(2, float64(p.failedCount)))*float64(adjustedBaseTimeout /*baseTimeoutSec*/)) * time.Second if time.Since(p.lastAttempt) > timeout { retryable = append(retryable, p) } } } return true }) for _, contact := range retryable { if time.Since(contact.lastSeen).Hours() < PriorityQueueTimeSinceQualifierHours { cr.priorityQueue.insert(contact) } else { cr.pendingQueue.insert(contact) } } } func (cr *contactRetry) publishConnectionRequest(contact *contact) { log.Debugf("RestartFlow Publish Connection Request listener %v", contact) if contact.ctype == peerConn { cr.bus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: contact.id})) } if contact.ctype == serverConn { cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: contact.id})) } contact.state = connections.CONNECTING // Hacky but needed so we don't over flood waiting for PeerStateChange from engine contact.lastAttempt = time.Now() } func (cr *contactRetry) addConnection(id string, state connections.ConnectionState, ctype connectionType, lastSeen time.Time) { // don't handle contact retries for ourselves if id == cr.onion { return } if _, exists := cr.connections.Load(id); !exists { p := &contact{id: id, state: state, failedCount: 0, lastAttempt: event.CwtchEpoch, ctype: ctype, lastSeen: lastSeen, queued: false} cr.connections.Store(id, p) return } else { // we have rerequested this connnection, probably via an explicit ask, update it's state if c, ok := cr.connections.Load(id); ok { contact := c.(*contact) contact.state = state } } } func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) { log.Debugf("cr.handleEvent state to %v on id %v", connections.ConnectionStateName[state], id) // don't handle contact retries for ourselves if id == cr.onion { return } // reject events that contain invalid hostnames...we cannot connect to them // and they could result in spurious connection attempts... if !tor.IsValidHostname(id) { return } if _, exists := cr.connections.Load(id); !exists { // We have an event for something we don't know about... // The only reason this should happen is if a *new* Peer/Server connection has changed. // Let's set the timeout to Now() to indicate that this is a fresh connection, and so should likely be prioritized. cr.addConnection(id, state, ctype, time.Now()) return } pinf, _ := cr.connections.Load(id) p := pinf.(*contact) log.Debugf(" managing state change for %v %v to %v by self %v", id, connections.ConnectionStateName[p.state], connections.ConnectionStateName[state], cr.onion) if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED { if p.state == connections.SYNCED || p.state == connections.AUTHENTICATED { p.lastSeen = time.Now() } else { p.failedCount += 1 } p.state = connections.DISCONNECTED p.lastAttempt = time.Now() if p.failedCount > maxFailedBackoff { p.failedCount = maxFailedBackoff } } else if state == connections.CONNECTING || state == connections.CONNECTED { p.state = state } else if state == connections.AUTHENTICATED || state == connections.SYNCED { p.state = state p.lastSeen = time.Now() p.failedCount = 0 } } func (cr *contactRetry) Shutdown() { cr.breakChan <- true cr.queue.Shutdown() }