136 lines
3.3 KiB
Go
136 lines
3.3 KiB
Go
package plugins
|
|
|
|
import (
|
|
"cwtch.im/cwtch/event"
|
|
"cwtch.im/cwtch/protocol/connections"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
const (
	// tickTime is the period between retry ticks in the run loop.
	tickTime = 10 * time.Second

	// maxBakoff bounds the per-contact backoff, which is counted in ticks:
	// 32 ticks * 10 seconds = 320 seconds, or roughly 5 minutes.
	maxBakoff int = 32
)
|
|
|
|
// connectionType distinguishes the kinds of connection the retry plugin tracks.
type connectionType int

const (
	// peerConn marks a contact reported through PeerStateChange events.
	peerConn connectionType = iota
	// serverConn marks a contact reported through ServerStateChange events.
	serverConn
)
|
|
|
|
type contact struct {
|
|
id string
|
|
state connections.ConnectionState
|
|
ctype connectionType
|
|
|
|
ticks int
|
|
backoff int
|
|
}
|
|
|
|
type contactRetry struct {
|
|
bus event.Manager
|
|
queue event.Queue
|
|
networkUp bool
|
|
|
|
breakChan chan bool
|
|
|
|
connections sync.Map //[string]*contact
|
|
}
|
|
|
|
// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing
|
|
func NewConnectionRetry(bus event.Manager) Plugin {
|
|
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool), connections: sync.Map{}, networkUp: false}
|
|
return cr
|
|
}
|
|
|
|
func (cr *contactRetry) Start() {
|
|
go cr.run()
|
|
}
|
|
|
|
func (cr *contactRetry) run() {
|
|
cr.bus.Subscribe(event.PeerStateChange, cr.queue)
|
|
cr.bus.Subscribe(event.ACNStatus, cr.queue)
|
|
cr.bus.Subscribe(event.ServerStateChange, cr.queue)
|
|
|
|
for {
|
|
select {
|
|
case e := <-cr.queue.OutChan():
|
|
switch e.EventType {
|
|
case event.PeerStateChange:
|
|
state := connections.ConnectionStateToType[e.Data[event.ConnectionState]]
|
|
peer := e.Data[event.RemotePeer]
|
|
cr.handleEvent(peer, state, peerConn)
|
|
|
|
case event.ServerStateChange:
|
|
state := connections.ConnectionStateToType[e.Data[event.ConnectionState]]
|
|
server := e.Data[event.GroupServer]
|
|
cr.handleEvent(server, state, serverConn)
|
|
|
|
case event.ACNStatus:
|
|
prog := e.Data[event.Progreess]
|
|
if prog == "100" && cr.networkUp == false {
|
|
cr.networkUp = true
|
|
cr.connections.Range(func(k, v interface{}) bool {
|
|
p := v.(*contact)
|
|
p.ticks = 0
|
|
p.backoff = 1
|
|
if p.ctype == peerConn {
|
|
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
|
|
}
|
|
return true
|
|
})
|
|
} else if prog != "100" {
|
|
cr.networkUp = false
|
|
}
|
|
}
|
|
|
|
case <-time.After(tickTime):
|
|
cr.connections.Range(func(k, v interface{}) bool {
|
|
p := v.(*contact)
|
|
|
|
if p.state == connections.DISCONNECTED {
|
|
p.ticks++
|
|
if p.ticks == p.backoff {
|
|
p.ticks = 0
|
|
if cr.networkUp {
|
|
if p.ctype == peerConn {
|
|
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
|
|
case <-cr.breakChan:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) {
|
|
if _, exists := cr.connections.Load(id); !exists {
|
|
p := &contact{id: id, state: connections.DISCONNECTED, backoff: 1, ticks: 0, ctype: ctype}
|
|
cr.connections.Store(id, p)
|
|
return
|
|
}
|
|
|
|
pinf, _ := cr.connections.Load(id)
|
|
p := pinf.(*contact)
|
|
if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED {
|
|
p.state = connections.DISCONNECTED
|
|
if p.backoff < maxBakoff {
|
|
p.backoff *= 2
|
|
}
|
|
p.ticks = 0
|
|
} else if state == connections.CONNECTING || state == connections.CONNECTED {
|
|
p.state = state
|
|
} else if state == connections.AUTHENTICATED {
|
|
p.state = state
|
|
p.backoff = 1
|
|
}
|
|
}
|
|
|
|
func (cr *contactRetry) Shutdown() {
|
|
cr.breakChan <- true
|
|
}
|