contactRetry has protocol engine existence awareness (prep for turning profiles on/off)
continuous-integration/drone/pr Build is passing Details
continuous-integration/drone/push Build is pending Details

This commit is contained in:
Dan Ballard 2022-12-03 15:48:09 -08:00
parent 5658e9aa9f
commit c8a6a1b079
4 changed files with 51 additions and 32 deletions

View File

@ -44,7 +44,7 @@ type Application interface {
QueryACNStatus() QueryACNStatus()
QueryACNVersion() QueryACNVersion()
ActivateEngine(doListn, doPeers, doServers bool) ActivateEngines(doListn, doPeers, doServers bool)
ActivatePeerEngine(onion string, doListen, doPeers, doServers bool) ActivatePeerEngine(onion string, doListen, doPeers, doServers bool)
DeactivatePeerEngine(onion string) DeactivatePeerEngine(onion string)
@ -253,12 +253,15 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool {
return false return false
} }
func (app *application) ActivateEngine(doListen, doPeers, doServers bool) { // ActivateEngines launches all peer engines
log.Debugf("ActivateEngine") func (app *application) ActivateEngines(doListen, doPeers, doServers bool) {
log.Debugf("ActivateEngines")
for _, profile := range app.peers { for _, profile := range app.peers {
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated))
} }
app.QueryACNStatus()
if doListen { if doListen {
for _, profile := range app.peers { for _, profile := range app.peers {
@ -280,6 +283,8 @@ func (app *application) ActivatePeerEngine(onion string, doListen, doPeers, doSe
profile := app.GetPeer(onion) profile := app.GetPeer(onion)
if profile != nil { if profile != nil {
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated))
app.QueryACNStatus()
if doListen { if doListen {
profile.Listen() profile.Listen()
} }

View File

@ -102,14 +102,15 @@ func (cq *connectionQueue) dequeue() *contact {
} }
type contactRetry struct { type contactRetry struct {
bus event.Manager bus event.Manager
queue event.Queue queue event.Queue
networkUp bool ACNUp bool
networkUpTime time.Time ACNUpTime time.Time
running bool protocolEngine bool
breakChan chan bool running bool
onion string breakChan chan bool
lastCheck time.Time onion string
lastCheck time.Time
connections sync.Map //[string]*contact connections sync.Map //[string]*contact
connCount int connCount int
@ -118,13 +119,13 @@ type contactRetry struct {
// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing // NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing
func NewConnectionRetry(bus event.Manager, onion string) Plugin { func NewConnectionRetry(bus event.Manager, onion string) Plugin {
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, connCount: 0, networkUp: false, networkUpTime: time.Now(), onion: onion, pendingQueue: newConnectionQueue()} cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, connCount: 0, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue()}
return cr return cr
} }
// maxTorCircuitsPending a function to throttle access to tor network during start up // maxTorCircuitsPending a function to throttle access to tor network during start up
func (cr *contactRetry) maxTorCircuitsPending() int { func (cr *contactRetry) maxTorCircuitsPending() int {
timeSinceStart := time.Since(cr.networkUpTime) timeSinceStart := time.Since(cr.ACNUpTime)
if timeSinceStart < 30*time.Second { if timeSinceStart < 30*time.Second {
return 4 return 4
} else if timeSinceStart < 4*time.Minute { } else if timeSinceStart < 4*time.Minute {
@ -167,20 +168,24 @@ func (cr *contactRetry) run() {
cr.bus.Subscribe(event.PeerRequest, cr.queue) cr.bus.Subscribe(event.PeerRequest, cr.queue)
cr.bus.Subscribe(event.QueuePeerRequest, cr.queue) cr.bus.Subscribe(event.QueuePeerRequest, cr.queue)
cr.bus.Subscribe(event.QueueJoinServer, cr.queue) cr.bus.Subscribe(event.QueueJoinServer, cr.queue)
cr.bus.Subscribe(event.ProtocolEngineShutdown, cr.queue)
cr.bus.Subscribe(event.ProtocolEngineCreated, cr.queue)
for { for {
cr.requeueReady() if cr.ACNUp {
connectingCount := cr.connectingCount() cr.requeueReady()
log.Debugf("checking queue (len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.pendingQueue.queue), cr.connCount, connectingCount) connectingCount := cr.connectingCount()
for connectingCount < cr.maxTorCircuitsPending() && len(cr.pendingQueue.queue) > 0 { log.Debugf("checking queue (len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.pendingQueue.queue), cr.connCount, connectingCount)
contact := cr.pendingQueue.dequeue() for connectingCount < cr.maxTorCircuitsPending() && len(cr.pendingQueue.queue) > 0 {
// could have received incoming connection while in queue, make sure still disconnected before trying contact := cr.pendingQueue.dequeue()
if contact.state == connections.DISCONNECTED { // could have received incoming connection while in queue, make sure still disconnected before trying
cr.publishConnectionRequest(contact) if contact.state == connections.DISCONNECTED {
connectingCount++ cr.publishConnectionRequest(contact)
connectingCount++
}
} }
cr.lastCheck = time.Now()
} }
cr.lastCheck = time.Now()
select { select {
case e := <-cr.queue.OutChan(): case e := <-cr.queue.OutChan():
@ -218,19 +223,27 @@ func (cr *contactRetry) run() {
cr.pendingQueue.insert(contact) cr.pendingQueue.insert(contact)
} }
} }
case event.ProtocolEngineCreated:
cr.protocolEngine = true
case event.ProtocolEngineShutdown:
cr.ACNUp = false
cr.protocolEngine = false
case event.ACNStatus: case event.ACNStatus:
prog := e.Data[event.Progress] prog := e.Data[event.Progress]
if prog == "100" && !cr.networkUp { if !cr.protocolEngine {
cr.networkUp = true continue
cr.networkUpTime = time.Now() }
if prog == "100" && !cr.ACNUp {
cr.ACNUp = true
cr.ACNUpTime = time.Now()
cr.connections.Range(func(k, v interface{}) bool { cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact) p := v.(*contact)
p.failedCount = 0 p.failedCount = 0
return true return true
}) })
} else if prog != "100" { } else if prog != "100" {
cr.networkUp = false cr.ACNUp = false
} }
} }
@ -245,7 +258,7 @@ func (cr *contactRetry) run() {
} }
func (cr *contactRetry) requeueReady() { func (cr *contactRetry) requeueReady() {
if !cr.networkUp { if !cr.ACNUp {
return return
} }

View File

@ -53,6 +53,7 @@ const (
// attributes GroupServer - the onion of the server to leave // attributes GroupServer - the onion of the server to leave
LeaveServer = Type("LeaveServer") LeaveServer = Type("LeaveServer")
ProtocolEngineCreated = Type("ProtocolEngineCreated")
ProtocolEngineShutdown = Type("ProtocolEngineShutdown") ProtocolEngineShutdown = Type("ProtocolEngineShutdown")
ProtocolEngineStartListen = Type("ProtocolEngineStartListen") ProtocolEngineStartListen = Type("ProtocolEngineStartListen")
ProtocolEngineStopped = Type("ProtocolEngineStopped") ProtocolEngineStopped = Type("ProtocolEngineStopped")

View File

@ -1080,10 +1080,10 @@ func (cp *cwtchPeer) StartConnections(doPeers, doServers bool) {
log.Infof("StartConnections for %v", cp.GetOnion()) log.Infof("StartConnections for %v", cp.GetOnion())
for _, conversation := range byRecent { for _, conversation := range byRecent {
if conversation.model.IsServer() { if conversation.model.IsServer() {
log.Infof(" QueueJoinServer(%v)", conversation.model.Handle) log.Debugf(" QueueJoinServer(%v)", conversation.model.Handle)
cp.QueueJoinServer(conversation.model.Handle) cp.QueueJoinServer(conversation.model.Handle)
} else { } else {
log.Infof(" QueuePeerWithOnion(%v)", conversation.model.Handle) log.Debugf(" QueuePeerWithOnion(%v)", conversation.model.Handle)
cp.QueuePeeringWithOnion(conversation.model.Handle) cp.QueuePeeringWithOnion(conversation.model.Handle)
} }
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
@ -1099,7 +1099,7 @@ func (cp *cwtchPeer) StartPeersConnections() {
byRecent := cp.getConnectionsSortedByLastSeen(true, false) byRecent := cp.getConnectionsSortedByLastSeen(true, false)
for _, conversation := range byRecent { for _, conversation := range byRecent {
log.Infof(" PeerWithOnion(%v)", conversation.model.Handle) log.Debugf(" QueuePeerWithOnion(%v)", conversation.model.Handle)
cp.QueuePeeringWithOnion(conversation.model.Handle) cp.QueuePeeringWithOnion(conversation.model.Handle)
} }
} }
@ -1113,7 +1113,7 @@ func (cp *cwtchPeer) StartServerConnections() {
for _, conversation := range byRecent { for _, conversation := range byRecent {
if conversation.model.IsServer() { if conversation.model.IsServer() {
log.Infof(" QueueJoinServer(%v)", conversation.model.Handle) log.Debugf(" QueueJoinServer(%v)", conversation.model.Handle)
cp.QueueJoinServer(conversation.model.Handle) cp.QueueJoinServer(conversation.model.Handle)
} }
} }