This commit is contained in:
Sarah Jamie Lewis 2022-09-10 10:34:36 -07:00
parent 27cec93ad7
commit 0f4c6de2e6
1 changed files with 118 additions and 118 deletions

View File

@ -1,11 +1,11 @@
package plugins
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/openprivacy/log"
"sync"
"time"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/openprivacy/log"
"sync"
"time"
)
const tickTime = 30 * time.Second
@ -14,153 +14,153 @@ const maxBackoff int = 10 // 320 seconds or ~5 min
type connectionType int
const (
peerConn connectionType = iota
serverConn
peerConn connectionType = iota
serverConn
)
type contact struct {
id string
state connections.ConnectionState
ctype connectionType
id string
state connections.ConnectionState
ctype connectionType
ticks int
backoff int
ticks int
backoff int
}
type contactRetry struct {
bus event.Manager
queue event.Queue
networkUp bool
running bool
breakChan chan bool
onion string
lastCheck time.Time
bus event.Manager
queue event.Queue
networkUp bool
running bool
breakChan chan bool
onion string
lastCheck time.Time
connections sync.Map //[string]*contact
connections sync.Map //[string]*contact
}
// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing
func NewConnectionRetry(bus event.Manager, onion string) Plugin {
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, networkUp: false, onion: onion}
return cr
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, networkUp: false, onion: onion}
return cr
}
func (cr *contactRetry) Start() {
if !cr.running {
go cr.run()
} else {
log.Errorf("Attempted to start Contact Retry plugin twice for %v", cr.onion)
}
if !cr.running {
go cr.run()
} else {
log.Errorf("Attempted to start Contact Retry plugin twice for %v", cr.onion)
}
}
func (cr *contactRetry) run() {
cr.running = true
cr.bus.Subscribe(event.PeerStateChange, cr.queue)
cr.bus.Subscribe(event.ACNStatus, cr.queue)
cr.bus.Subscribe(event.ServerStateChange, cr.queue)
cr.running = true
cr.bus.Subscribe(event.PeerStateChange, cr.queue)
cr.bus.Subscribe(event.ACNStatus, cr.queue)
cr.bus.Subscribe(event.ServerStateChange, cr.queue)
for {
if time.Since(cr.lastCheck) > tickTime {
cr.retryDisconnected()
cr.lastCheck = time.Now()
}
select {
case e := <-cr.queue.OutChan():
switch e.EventType {
case event.PeerStateChange:
state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]]
peer := e.Data[event.RemotePeer]
cr.handleEvent(peer, state, peerConn)
for {
if time.Since(cr.lastCheck) > tickTime {
cr.retryDisconnected()
cr.lastCheck = time.Now()
}
select {
case e := <-cr.queue.OutChan():
switch e.EventType {
case event.PeerStateChange:
state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]]
peer := e.Data[event.RemotePeer]
cr.handleEvent(peer, state, peerConn)
case event.ServerStateChange:
state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]]
server := e.Data[event.GroupServer]
cr.handleEvent(server, state, serverConn)
case event.ServerStateChange:
state := connections.ConnectionStateToType()[e.Data[event.ConnectionState]]
server := e.Data[event.GroupServer]
cr.handleEvent(server, state, serverConn)
case event.ACNStatus:
prog := e.Data[event.Progress]
if prog == "100" && !cr.networkUp {
cr.networkUp = true
cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact)
p.ticks = 0
p.backoff = 1
if p.ctype == peerConn {
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
}
if p.ctype == serverConn {
cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id}))
}
return true
})
} else if prog != "100" {
cr.networkUp = false
}
}
case event.ACNStatus:
prog := e.Data[event.Progress]
if prog == "100" && !cr.networkUp {
cr.networkUp = true
cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact)
p.ticks = 0
p.backoff = 1
if p.ctype == peerConn {
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
}
if p.ctype == serverConn {
cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id}))
}
return true
})
} else if prog != "100" {
cr.networkUp = false
}
}
case <-time.After(tickTime):
continue
case <-time.After(tickTime):
continue
case <-cr.breakChan:
cr.running = false
return
}
}
case <-cr.breakChan:
cr.running = false
return
}
}
}
func (cr *contactRetry) retryDisconnected() {
cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact)
cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact)
if p.state == connections.DISCONNECTED {
p.ticks++
if p.ticks >= p.backoff {
p.ticks = 0
if cr.networkUp {
if p.ctype == peerConn {
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
}
if p.ctype == serverConn {
cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id}))
}
}
}
}
return true
})
if p.state == connections.DISCONNECTED {
p.ticks++
if p.ticks >= p.backoff {
p.ticks = 0
if cr.networkUp {
if p.ctype == peerConn {
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
}
if p.ctype == serverConn {
cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id}))
}
}
}
}
return true
})
}
func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) {
// don't handle contact retries for ourselves
if id == cr.onion {
return
}
// don't handle contact retries for ourselves
if id == cr.onion {
return
}
if _, exists := cr.connections.Load(id); !exists {
p := &contact{id: id, state: state, backoff: 0, ticks: 0, ctype: ctype}
cr.connections.Store(id, p)
return
}
if _, exists := cr.connections.Load(id); !exists {
p := &contact{id: id, state: state, backoff: 0, ticks: 0, ctype: ctype}
cr.connections.Store(id, p)
return
}
pinf, _ := cr.connections.Load(id)
p := pinf.(*contact)
if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED {
p.state = connections.DISCONNECTED
if p.backoff == 0 {
p.backoff = 1
} else if p.backoff < maxBackoff {
p.backoff *= 2
}
p.ticks = 0
} else if state == connections.CONNECTING || state == connections.CONNECTED {
p.state = state
} else if state == connections.AUTHENTICATED {
p.state = state
p.backoff = 0
}
pinf, _ := cr.connections.Load(id)
p := pinf.(*contact)
if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED {
p.state = connections.DISCONNECTED
if p.backoff == 0 {
p.backoff = 1
} else if p.backoff < maxBackoff {
p.backoff *= 2
}
p.ticks = 0
} else if state == connections.CONNECTING || state == connections.CONNECTED {
p.state = state
} else if state == connections.AUTHENTICATED {
p.state = state
p.backoff = 0
}
}
func (cr *contactRetry) Shutdown() {
cr.breakChan <- true
cr.breakChan <- true
}