Adjust contact retry

This commit is contained in:
Sarah Jamie Lewis 2022-09-10 10:33:17 -07:00
parent d455eb6477
commit 27cec93ad7
1 changed files with 120 additions and 120 deletions

View File

@ -1,166 +1,166 @@
package plugins package plugins
import ( import (
"cwtch.im/cwtch/event" "cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/openprivacy/log" "git.openprivacy.ca/openprivacy/log"
"sync" "sync"
"time" "time"
) )
const tickTime = 5 * time.Second const tickTime = 30 * time.Second
const maxBackoff int = 64 // 320 seconds or ~5 min const maxBackoff int = 10 // 320 seconds or ~5 min
type connectionType int type connectionType int
const ( const (
peerConn connectionType = iota peerConn connectionType = iota
serverConn serverConn
) )
type contact struct { type contact struct {
id string id string
state connections.ConnectionState state connections.ConnectionState
ctype connectionType ctype connectionType
ticks int ticks int
backoff int backoff int
} }
type contactRetry struct { type contactRetry struct {
bus event.Manager bus event.Manager
queue event.Queue queue event.Queue
networkUp bool networkUp bool
running bool running bool
breakChan chan bool breakChan chan bool
onion string onion string
lastCheck time.Time lastCheck time.Time
connections sync.Map //[string]*contact connections sync.Map //[string]*contact
} }
// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing // NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing
func NewConnectionRetry(bus event.Manager, onion string) Plugin { func NewConnectionRetry(bus event.Manager, onion string) Plugin {
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, networkUp: false, onion: onion} cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, networkUp: false, onion: onion}
return cr return cr
} }
func (cr *contactRetry) Start() { func (cr *contactRetry) Start() {
if !cr.running { if !cr.running {
go cr.run() go cr.run()
} else { } else {
log.Errorf("Attempted to start Contact Retry plugin twice for %v", cr.onion) log.Errorf("Attempted to start Contact Retry plugin twice for %v", cr.onion)
} }
} }
func (cr *contactRetry) run() { func (cr *contactRetry) run() {
cr.running = true cr.running = true
cr.bus.Subscribe(event.PeerStateChange, cr.queue) cr.bus.Subscribe(event.PeerStateChange, cr.queue)
cr.bus.Subscribe(event.ACNStatus, cr.queue) cr.bus.Subscribe(event.ACNStatus, cr.queue)
cr.bus.Subscribe(event.ServerStateChange, cr.queue) cr.bus.Subscribe(event.ServerStateChange, cr.queue)
for { for {
if time.Since(cr.lastCheck) > tickTime { if time.Since(cr.lastCheck) > tickTime {
cr.retryDisconnected() cr.retryDisconnected()
cr.lastCheck = time.Now() cr.lastCheck = time.Now()
} }
select { select {
case e := <-cr.queue.OutChan(): case e := <-cr.queue.OutChan():
switch e.EventType { switch e.EventType {
case event.PeerStateChange: case event.PeerStateChange:
state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]] state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]]
peer := e.Data[event.RemotePeer] peer := e.Data[event.RemotePeer]
cr.handleEvent(peer, state, peerConn) cr.handleEvent(peer, state, peerConn)
case event.ServerStateChange: case event.ServerStateChange:
state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]] state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]]
server := e.Data[event.GroupServer] server := e.Data[event.GroupServer]
cr.handleEvent(server, state, serverConn) cr.handleEvent(server, state, serverConn)
case event.ACNStatus: case event.ACNStatus:
prog := e.Data[event.Progress] prog := e.Data[event.Progress]
if prog == "100" && !cr.networkUp { if prog == "100" && !cr.networkUp {
cr.networkUp = true cr.networkUp = true
cr.connections.Range(func(k, v interface{}) bool { cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact) p := v.(*contact)
p.ticks = 0 p.ticks = 0
p.backoff = 1 p.backoff = 1
if p.ctype == peerConn { if p.ctype == peerConn {
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
} }
if p.ctype == serverConn { if p.ctype == serverConn {
cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id})) cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id}))
} }
return true return true
}) })
} else if prog != "100" { } else if prog != "100" {
cr.networkUp = false cr.networkUp = false
} }
} }
case <-time.After(tickTime): case <-time.After(tickTime):
continue continue
case <-cr.breakChan: case <-cr.breakChan:
cr.running = false cr.running = false
return return
} }
} }
} }
func (cr *contactRetry) retryDisconnected() { func (cr *contactRetry) retryDisconnected() {
cr.connections.Range(func(k, v interface{}) bool { cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact) p := v.(*contact)
if p.state == connections.DISCONNECTED { if p.state == connections.DISCONNECTED {
p.ticks++ p.ticks++
if p.ticks >= p.backoff { if p.ticks >= p.backoff {
p.ticks = 0 p.ticks = 0
if cr.networkUp { if cr.networkUp {
if p.ctype == peerConn { if p.ctype == peerConn {
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
} }
if p.ctype == serverConn { if p.ctype == serverConn {
cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id})) cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id}))
} }
} }
} }
} }
return true return true
}) })
} }
func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) { func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) {
// don't handle contact retries for ourselves // don't handle contact retries for ourselves
if id == cr.onion { if id == cr.onion {
return return
} }
if _, exists := cr.connections.Load(id); !exists { if _, exists := cr.connections.Load(id); !exists {
p := &contact{id: id, state: state, backoff: 0, ticks: 0, ctype: ctype} p := &contact{id: id, state: state, backoff: 0, ticks: 0, ctype: ctype}
cr.connections.Store(id, p) cr.connections.Store(id, p)
return return
} }
pinf, _ := cr.connections.Load(id) pinf, _ := cr.connections.Load(id)
p := pinf.(*contact) p := pinf.(*contact)
if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED { if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED {
p.state = connections.DISCONNECTED p.state = connections.DISCONNECTED
if p.backoff == 0 { if p.backoff == 0 {
p.backoff = 1 p.backoff = 1
} else if p.backoff < maxBackoff { } else if p.backoff < maxBackoff {
p.backoff *= 2 p.backoff *= 2
} }
p.ticks = 0 p.ticks = 0
} else if state == connections.CONNECTING || state == connections.CONNECTED { } else if state == connections.CONNECTING || state == connections.CONNECTED {
p.state = state p.state = state
} else if state == connections.AUTHENTICATED { } else if state == connections.AUTHENTICATED {
p.state = state p.state = state
p.backoff = 0 p.backoff = 0
} }
} }
func (cr *contactRetry) Shutdown() { func (cr *contactRetry) Shutdown() {
cr.breakChan <- true cr.breakChan <- true
} }