diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index b4274ab..0cdeaa9 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -1,166 +1,166 @@ package plugins import ( - "cwtch.im/cwtch/event" - "cwtch.im/cwtch/protocol/connections" - "git.openprivacy.ca/openprivacy/log" - "sync" - "time" + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/protocol/connections" + "git.openprivacy.ca/openprivacy/log" + "sync" + "time" ) -const tickTime = 5 * time.Second -const maxBackoff int = 64 // 320 seconds or ~5 min +const tickTime = 30 * time.Second +const maxBackoff int = 10 // 320 seconds or ~5 min type connectionType int const ( - peerConn connectionType = iota - serverConn + peerConn connectionType = iota + serverConn ) type contact struct { - id string - state connections.ConnectionState - ctype connectionType + id string + state connections.ConnectionState + ctype connectionType - ticks int - backoff int + ticks int + backoff int } type contactRetry struct { - bus event.Manager - queue event.Queue - networkUp bool - running bool - breakChan chan bool - onion string - lastCheck time.Time + bus event.Manager + queue event.Queue + networkUp bool + running bool + breakChan chan bool + onion string + lastCheck time.Time - connections sync.Map //[string]*contact + connections sync.Map //[string]*contact } // NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing func NewConnectionRetry(bus event.Manager, onion string) Plugin { - cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, networkUp: false, onion: onion} - return cr + cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, networkUp: false, onion: onion} + return cr } func (cr *contactRetry) Start() { - if !cr.running { - go cr.run() - } else { - log.Errorf("Attempted to start Contact Retry plugin twice for %v", cr.onion) - } + if !cr.running { + go cr.run() + } else { + log.Errorf("Attempted to start Contact Retry plugin twice for %v", cr.onion) + } } func (cr *contactRetry) run() { - cr.running = true - cr.bus.Subscribe(event.PeerStateChange, cr.queue) - cr.bus.Subscribe(event.ACNStatus, cr.queue) - cr.bus.Subscribe(event.ServerStateChange, cr.queue) + cr.running = true + cr.bus.Subscribe(event.PeerStateChange, cr.queue) + cr.bus.Subscribe(event.ACNStatus, cr.queue) + cr.bus.Subscribe(event.ServerStateChange, cr.queue) - for { - if time.Since(cr.lastCheck) > tickTime { - cr.retryDisconnected() - cr.lastCheck = time.Now() - } - select { - case e := <-cr.queue.OutChan(): - switch e.EventType { - case event.PeerStateChange: - state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]] - peer := e.Data[event.RemotePeer] - cr.handleEvent(peer, state, peerConn) + for { + if time.Since(cr.lastCheck) > tickTime { + cr.retryDisconnected() + cr.lastCheck = time.Now() + } + select { + case e := <-cr.queue.OutChan(): + switch e.EventType { + case event.PeerStateChange: + state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]] + peer := e.Data[event.RemotePeer] + cr.handleEvent(peer, state, peerConn) - case event.ServerStateChange: - state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]] - server := e.Data[event.GroupServer] - cr.handleEvent(server, state, serverConn) + case event.ServerStateChange: + state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]] + server := e.Data[event.GroupServer] + cr.handleEvent(server, state, serverConn) - case event.ACNStatus: - prog := e.Data[event.Progress] - if prog == "100" && !cr.networkUp { - cr.networkUp = true - cr.connections.Range(func(k, v interface{}) bool { - p := v.(*contact) - p.ticks = 0 - p.backoff = 1 - if p.ctype == peerConn { - cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) - } - if p.ctype == serverConn { - cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id})) - } - return true - }) - } else if prog != "100" { - cr.networkUp = false - } - } + case event.ACNStatus: + prog := e.Data[event.Progress] + if prog == "100" && !cr.networkUp { + cr.networkUp = true + cr.connections.Range(func(k, v interface{}) bool { + p := v.(*contact) + p.ticks = 0 + p.backoff = 1 + if p.ctype == peerConn { + cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) + } + if p.ctype == serverConn { + cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id})) + } + return true + }) + } else if prog != "100" { + cr.networkUp = false + } + } - case <-time.After(tickTime): - continue + case <-time.After(tickTime): + continue - case <-cr.breakChan: - cr.running = false - return - } - } + case <-cr.breakChan: + cr.running = false + return + } + } } func (cr *contactRetry) retryDisconnected() { - cr.connections.Range(func(k, v interface{}) bool { - p := v.(*contact) + cr.connections.Range(func(k, v interface{}) bool { + p := v.(*contact) - if p.state == connections.DISCONNECTED { - p.ticks++ - if p.ticks >= p.backoff { - p.ticks = 0 - if cr.networkUp { - if p.ctype == peerConn { - cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) - } - if p.ctype == serverConn { - cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id})) - } - } - } - } - return true - }) + if p.state == connections.DISCONNECTED { + p.ticks++ + if p.ticks >= p.backoff { + p.ticks = 0 + if cr.networkUp { + if p.ctype == peerConn { + cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) + } + if p.ctype == serverConn { + cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id})) + } + } + } + } + return true + }) } func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) { - // don't handle contact retries for ourselves - if id == cr.onion { - return - } + // don't handle contact retries for ourselves + if id == cr.onion { + return + } - if _, exists := cr.connections.Load(id); !exists { - p := &contact{id: id, state: state, backoff: 0, ticks: 0, ctype: ctype} - cr.connections.Store(id, p) - return - } + if _, exists := cr.connections.Load(id); !exists { + p := &contact{id: id, state: state, backoff: 0, ticks: 0, ctype: ctype} + cr.connections.Store(id, p) + return + } - pinf, _ := cr.connections.Load(id) - p := pinf.(*contact) - if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED { - p.state = connections.DISCONNECTED - if p.backoff == 0 { - p.backoff = 1 - } else if p.backoff < maxBackoff { - p.backoff *= 2 - } - p.ticks = 0 - } else if state == connections.CONNECTING || state == connections.CONNECTED { - p.state = state - } else if state == connections.AUTHENTICATED { - p.state = state - p.backoff = 0 - } + pinf, _ := cr.connections.Load(id) + p := pinf.(*contact) + if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED { + p.state = connections.DISCONNECTED + if p.backoff == 0 { + p.backoff = 1 + } else if p.backoff < maxBackoff { + p.backoff *= 2 + } + p.ticks = 0 + } else if state == connections.CONNECTING || state == connections.CONNECTED { + p.state = state + } else if state == connections.AUTHENTICATED { + p.state = state + p.backoff = 0 + } } func (cr *contactRetry) Shutdown() { - cr.breakChan <- true + cr.breakChan <- true }