forked from cwtch.im/cwtch
99 lines
2.2 KiB
Go
99 lines
2.2 KiB
Go
package plugins
|
|
|
|
import (
|
|
"cwtch.im/cwtch/event"
|
|
"cwtch.im/cwtch/protocol/connections"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
const tickTime = 10 * time.Second
|
|
const maxBakoff int = 32 // 320 seconds or ~5 min
|
|
|
|
type peer struct {
|
|
id string
|
|
state connections.ConnectionState
|
|
|
|
ticks int
|
|
backoff int
|
|
}
|
|
|
|
type contactRetry struct {
|
|
bus event.Manager
|
|
queue event.Queue
|
|
|
|
breakChan chan bool
|
|
|
|
peers sync.Map //[string]*peer
|
|
}
|
|
|
|
// NewContactRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing
|
|
func NewContactRetry(bus event.Manager) Plugin {
|
|
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool), peers: sync.Map{}}
|
|
return cr
|
|
}
|
|
|
|
func (cr *contactRetry) Start() {
|
|
go cr.run()
|
|
}
|
|
|
|
func (cr *contactRetry) run() {
|
|
cr.bus.Subscribe(event.PeerStateChange, cr.queue)
|
|
for {
|
|
select {
|
|
case e := <-cr.queue.OutChan():
|
|
switch e.EventType {
|
|
case event.PeerStateChange:
|
|
state := connections.ConnectionStateToType[e.Data[event.ConnectionState]]
|
|
peer := e.Data[event.RemotePeer]
|
|
cr.handleEvent(peer, state)
|
|
}
|
|
|
|
case <-time.After(tickTime):
|
|
cr.peers.Range(func(k, v interface{}) bool {
|
|
p := v.(*peer)
|
|
|
|
if p.state == connections.DISCONNECTED {
|
|
p.ticks++
|
|
if p.ticks == p.backoff {
|
|
p.ticks = 0
|
|
cr.bus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
|
|
}
|
|
}
|
|
|
|
return true
|
|
})
|
|
|
|
case <-cr.breakChan:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState) {
|
|
if _, exists := cr.peers.Load(id); !exists {
|
|
p := &peer{id: id, state: connections.DISCONNECTED, backoff: 1, ticks: 0}
|
|
cr.peers.Store(id, p)
|
|
return
|
|
}
|
|
|
|
pinf, _ := cr.peers.Load(id)
|
|
p := pinf.(*peer)
|
|
if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED {
|
|
p.state = connections.DISCONNECTED
|
|
if p.backoff < maxBakoff {
|
|
p.backoff *= 2
|
|
}
|
|
p.ticks = 0
|
|
} else if state == connections.CONNECTING || state == connections.CONNECTED {
|
|
p.state = state
|
|
} else if state == connections.AUTHENTICATED {
|
|
p.state = state
|
|
p.backoff = 1
|
|
}
|
|
}
|
|
|
|
func (cr *contactRetry) Shutdown() {
|
|
cr.breakChan <- true
|
|
}
|