diff --git a/app/app.go b/app/app.go index 796cc2e..734db56 100644 --- a/app/app.go +++ b/app/app.go @@ -44,7 +44,7 @@ type Application interface { QueryACNStatus() QueryACNVersion() - ActivateEngine(doListn, doPeers, doServers bool) + ActivateEngines(doListn, doPeers, doServers bool) ActivatePeerEngine(onion string, doListen, doPeers, doServers bool) DeactivatePeerEngine(onion string) @@ -253,12 +253,15 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool { return false } -func (app *application) ActivateEngine(doListen, doPeers, doServers bool) { - log.Debugf("ActivateEngine") +// ActivateEngines launches all peer engines +func (app *application) ActivateEngines(doListen, doPeers, doServers bool) { + log.Debugf("ActivateEngines") for _, profile := range app.peers { app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) + app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated)) } + app.QueryACNStatus() if doListen { for _, profile := range app.peers { @@ -280,6 +283,8 @@ func (app *application) ActivatePeerEngine(onion string, doListen, doPeers, doSe profile := app.GetPeer(onion) if profile != nil { app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) + app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated)) + app.QueryACNStatus() if doListen { profile.Listen() } diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index 5a2d81b..dcfd06d 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -102,14 +102,15 @@ func (cq *connectionQueue) dequeue() *contact { } type contactRetry struct { - bus event.Manager - queue event.Queue - networkUp bool - networkUpTime time.Time - running bool - breakChan chan bool - onion string - lastCheck time.Time + bus event.Manager + queue event.Queue + ACNUp bool + ACNUpTime time.Time + protocolEngine bool + running bool + breakChan chan bool + onion string + lastCheck time.Time connections sync.Map //[string]*contact connCount int @@ -118,13 +119,13 @@ type contactRetry struct { // NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing func NewConnectionRetry(bus event.Manager, onion string) Plugin { - cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, connCount: 0, networkUp: false, networkUpTime: time.Now(), onion: onion, pendingQueue: newConnectionQueue()} + cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, connCount: 0, ACNUp: false, ACNUpTime: time.Now(), protocolEngine: false, onion: onion, pendingQueue: newConnectionQueue()} return cr } // maxTorCircuitsPending a function to throttle access to tor network during start up func (cr *contactRetry) maxTorCircuitsPending() int { - timeSinceStart := time.Since(cr.networkUpTime) + timeSinceStart := time.Since(cr.ACNUpTime) if timeSinceStart < 30*time.Second { return 4 } else if timeSinceStart < 4*time.Minute { @@ -167,20 +168,24 @@ func (cr *contactRetry) run() { cr.bus.Subscribe(event.PeerRequest, cr.queue) cr.bus.Subscribe(event.QueuePeerRequest, cr.queue) cr.bus.Subscribe(event.QueueJoinServer, cr.queue) + cr.bus.Subscribe(event.ProtocolEngineShutdown, cr.queue) + cr.bus.Subscribe(event.ProtocolEngineCreated, cr.queue) for { - cr.requeueReady() - connectingCount := cr.connectingCount() - log.Debugf("checking queue (len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.pendingQueue.queue), cr.connCount, connectingCount) - for connectingCount < cr.maxTorCircuitsPending() && len(cr.pendingQueue.queue) > 0 { - contact := cr.pendingQueue.dequeue() - // could have received incoming connection while in queue, make sure still disconnected before trying - if contact.state == connections.DISCONNECTED { - cr.publishConnectionRequest(contact) - connectingCount++ + if cr.ACNUp { + cr.requeueReady() + connectingCount := cr.connectingCount() + log.Debugf("checking queue (len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.pendingQueue.queue), cr.connCount, connectingCount) + for connectingCount < cr.maxTorCircuitsPending() && len(cr.pendingQueue.queue) > 0 { + contact := cr.pendingQueue.dequeue() + // could have received incoming connection while in queue, make sure still disconnected before trying + if contact.state == connections.DISCONNECTED { + cr.publishConnectionRequest(contact) + connectingCount++ + } } + cr.lastCheck = time.Now() } - cr.lastCheck = time.Now() select { case e := <-cr.queue.OutChan(): @@ -218,19 +223,27 @@ func (cr *contactRetry) run() { cr.pendingQueue.insert(contact) } } + case event.ProtocolEngineCreated: + cr.protocolEngine = true + case event.ProtocolEngineShutdown: + cr.ACNUp = false + cr.protocolEngine = false case event.ACNStatus: prog := e.Data[event.Progress] - if prog == "100" && !cr.networkUp { - cr.networkUp = true - cr.networkUpTime = time.Now() + if !cr.protocolEngine { + continue + } + if prog == "100" && !cr.ACNUp { + cr.ACNUp = true + cr.ACNUpTime = time.Now() cr.connections.Range(func(k, v interface{}) bool { p := v.(*contact) p.failedCount = 0 return true }) } else if prog != "100" { - cr.networkUp = false + cr.ACNUp = false } } @@ -245,7 +258,7 @@ func (cr *contactRetry) run() { } func (cr *contactRetry) requeueReady() { - if !cr.networkUp { + if !cr.ACNUp { return } diff --git a/event/common.go b/event/common.go index 51bc7fd..abb6976 100644 --- a/event/common.go +++ b/event/common.go @@ -53,6 +53,7 @@ const ( // attributes GroupServer - the onion of the server to leave LeaveServer = Type("LeaveServer") + ProtocolEngineCreated = Type("ProtocolEngineCreated") ProtocolEngineShutdown = Type("ProtocolEngineShutdown") ProtocolEngineStartListen = Type("ProtocolEngineStartListen") ProtocolEngineStopped = Type("ProtocolEngineStopped") diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 26c5b3f..9bb3dc8 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -1080,10 +1080,10 @@ func (cp *cwtchPeer) StartConnections(doPeers, doServers bool) { log.Infof("StartConnections for %v", cp.GetOnion()) for _, conversation := range byRecent { if conversation.model.IsServer() { - log.Infof(" QueueJoinServer(%v)", conversation.model.Handle) + log.Debugf(" QueueJoinServer(%v)", conversation.model.Handle) cp.QueueJoinServer(conversation.model.Handle) } else { - log.Infof(" QueuePeerWithOnion(%v)", conversation.model.Handle) + log.Debugf(" QueuePeerWithOnion(%v)", conversation.model.Handle) cp.QueuePeeringWithOnion(conversation.model.Handle) } time.Sleep(50 * time.Millisecond) @@ -1099,7 +1099,7 @@ func (cp *cwtchPeer) StartPeersConnections() { byRecent := cp.getConnectionsSortedByLastSeen(true, false) for _, conversation := range byRecent { - log.Infof(" PeerWithOnion(%v)", conversation.model.Handle) + log.Debugf(" QueuePeerWithOnion(%v)", conversation.model.Handle) cp.QueuePeeringWithOnion(conversation.model.Handle) } } @@ -1113,7 +1113,7 @@ func (cp *cwtchPeer) StartServerConnections() { for _, conversation := range byRecent { if conversation.model.IsServer() { - log.Infof(" QueueJoinServer(%v)", conversation.model.Handle) + log.Debugf(" QueueJoinServer(%v)", conversation.model.Handle) cp.QueueJoinServer(conversation.model.Handle) } }