diff --git a/app/app.go b/app/app.go index 62da8bd..d18c076 100644 --- a/app/app.go +++ b/app/app.go @@ -44,6 +44,7 @@ type Application interface { GetPrimaryBus() event.Manager GetEventBus(onion string) event.Manager + QueryACNStatus() ShutdownPeer(string) Shutdown() @@ -67,11 +68,7 @@ func NewApp(acn connectivity.ACN, appDirectory string) Application { app := &application{storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationCore: *newAppCore(appDirectory), appBus: event.NewEventManager()} app.appletPeers.init() - fn := func(progress int, status string) { - progStr := strconv.Itoa(progress) - app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)) - } - app.appletACN.init(acn, fn) + app.appletACN.init(acn, app.getACNStatusHandler()) return app } @@ -196,6 +193,21 @@ func (ac *applicationCore) GetEventBus(onion string) event.Manager { return nil } +func (app *application) getACNStatusHandler() func(int, string) { + return func(progress int, status string) { + progStr := strconv.Itoa(progress) + app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)) + for _, bus := range app.eventBuses { + bus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)) + } + } +} + +func (app *application) QueryACNStatus() { + prog, status := app.acn.GetBootstrapStatus() + app.getACNStatusHandler()(prog, status) +} + // ShutdownPeer shuts down a peer and removes it from the app's management func (app *application) ShutdownPeer(onion string) { app.mutex.Lock() diff --git a/app/appClient.go b/app/appClient.go index a54325a..5664880 100644 --- a/app/appClient.go +++ b/app/appClient.go @@ -107,6 +107,11 @@ func (ac *applicationClient) LoadProfiles(password string) { ac.bridge.Write(&message) } +func (ac *applicationClient) QueryACNStatus() { + message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.GetACNStatus, map[event.Field]string{})} + ac.bridge.Write(&message) +} + // ShutdownPeer shuts down a peer and removes it from the app's management func (ac *applicationClient) ShutdownPeer(onion string) { ac.mutex.Lock() diff --git a/app/appService.go b/app/appService.go index 6e7d719..08d1f15 100644 --- a/app/appService.go +++ b/app/appService.go @@ -31,11 +31,7 @@ type ApplicationService interface { func NewAppService(acn connectivity.ACN, appDirectory string, bridge event.IPCBridge) ApplicationService { appService := &applicationService{storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}} - fn := func(progress int, status string) { - progStr := strconv.Itoa(progress) - appService.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)}) - } - appService.appletACN.init(acn, fn) + appService.appletACN.init(acn, appService.getACNStatusHandler()) appService.handle = appService.handleEvent go appService.listen() @@ -74,6 +70,9 @@ func (as *applicationService) handleEvent(ev *event.Event) { message := event.IPCMessage{Dest: onion, Message: *ev} as.bridge.Write(&message) } + case event.GetACNStatus: + prog, status := as.acn.GetBootstrapStatus() + as.getACNStatusHandler()(prog, status) case event.ShutdownPeer: onion := ev.Data[event.Identity] as.ShutdownPeer(onion) @@ -127,6 +126,16 @@ func (as *applicationService) loadProfiles(password string) { } } +func (as *applicationService) getACNStatusHandler() func(int, string) { + return func(progress int, status string) { + progStr := strconv.Itoa(progress) + as.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)}) + for _, bus := range as.eventBuses { + bus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)) + } + } +} + func (as *applicationService) ShutdownPeer(onion string) { as.engines[onion].Shutdown() delete(as.engines, onion) diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index 923c6a3..f5e879d 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -10,26 +10,35 @@ import ( const tickTime = 10 * time.Second const maxBakoff int = 32 // 320 seconds or ~5 min -type peer struct { +type connectionType int + +const ( + peerConn connectionType = iota + serverConn +) + +type contact struct { id string state connections.ConnectionState + ctype connectionType ticks int backoff int } type contactRetry struct { - bus event.Manager - queue event.Queue + bus event.Manager + queue event.Queue + networkUp bool breakChan chan bool - peers sync.Map //[string]*peer + connections sync.Map //[string]*contact } -// NewContactRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing -func NewContactRetry(bus event.Manager) Plugin { - cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool), peers: sync.Map{}} +// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing +func NewConnectionRetry(bus event.Manager) Plugin { + cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool), connections: sync.Map{}, networkUp: false} return cr } @@ -39,6 +48,9 @@ func (cr *contactRetry) Start() { func (cr *contactRetry) run() { cr.bus.Subscribe(event.PeerStateChange, cr.queue) + cr.bus.Subscribe(event.ACNStatus, cr.queue) + cr.bus.Subscribe(event.ServerStateChange, cr.queue) + for { select { case e := <-cr.queue.OutChan(): @@ -46,21 +58,46 @@ func (cr *contactRetry) run() { case event.PeerStateChange: state := connections.ConnectionStateToType[e.Data[event.ConnectionState]] peer := e.Data[event.RemotePeer] - cr.handleEvent(peer, state) + cr.handleEvent(peer, state, peerConn) + + case event.ServerStateChange: + state := connections.ConnectionStateToType[e.Data[event.ConnectionState]] + server := e.Data[event.GroupServer] + cr.handleEvent(server, state, serverConn) + + case event.ACNStatus: + prog := e.Data[event.Progreess] + if prog == "100" && cr.networkUp == false { + cr.networkUp = true + cr.connections.Range(func(k, v interface{}) bool { + p := v.(*contact) + p.ticks = 0 + p.backoff = 1 + cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) + return true + }) + } else if prog != "100" { + cr.networkUp = false + } } case <-time.After(tickTime): - cr.peers.Range(func(k, v interface{}) bool { - p := v.(*peer) + cr.connections.Range(func(k, v interface{}) bool { + p := v.(*contact) if p.state == connections.DISCONNECTED { p.ticks++ if p.ticks == p.backoff { p.ticks = 0 - cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) + if cr.networkUp { + if p.ctype == peerConn { + cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) + } else { + cr.bus.Publish(event.NewEventList(event.JoinServer, event.GroupServer, p.id)) + } + } } } - return true }) @@ -70,15 +107,15 @@ func (cr *contactRetry) run() { } } -func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState) { - if _, exists := cr.peers.Load(id); !exists { - p := &peer{id: id, state: connections.DISCONNECTED, backoff: 1, ticks: 0} - cr.peers.Store(id, p) +func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) { + if _, exists := cr.connections.Load(id); !exists { + p := &contact{id: id, state: connections.DISCONNECTED, backoff: 1, ticks: 0, ctype: ctype} + cr.connections.Store(id, p) return } - pinf, _ := cr.peers.Load(id) - p := pinf.(*peer) + pinf, _ := cr.connections.Load(id) + p := pinf.(*contact) if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED { p.state = connections.DISCONNECTED if p.backoff < maxBakoff { diff --git a/app/plugins/plugin.go b/app/plugins/plugin.go index 6ef0a5e..9614394 100644 --- a/app/plugins/plugin.go +++ b/app/plugins/plugin.go @@ -9,7 +9,7 @@ type PluginID int // These are the plugin IDs for the supplied plugins const ( - CONTACTRETRY PluginID = iota + CONNECTIONRETRY PluginID = iota ) // Plugin is the interface for a plugin @@ -21,8 +21,8 @@ type Plugin interface { // Get is a plugin factory for the requested plugin func Get(id PluginID, bus event.Manager) Plugin { switch id { - case CONTACTRETRY: - return NewContactRetry(bus) + case CONNECTIONRETRY: + return NewConnectionRetry(bus) } return nil diff --git a/event/common.go b/event/common.go index 103968c..dceec48 100644 --- a/event/common.go +++ b/event/common.go @@ -29,6 +29,7 @@ const ( BlockUnknownPeers = Type("BlockUnknownPeers") AllowUnknownPeers = Type("AllowUnknownPeers") + // GroupServer JoinServer = Type("JoinServer") ProtocolEngineStartListen = Type("ProtocolEngineStartListen") @@ -160,6 +161,8 @@ const ( // Error(err) AppError = Type("AppError") + GetACNStatus = Type("GetACNStatus") + // Progress, Status ACNStatus = Type("ACNStatus") ) diff --git a/model/profile_test.go b/model/profile_test.go index f73488f..238e228 100644 --- a/model/profile_test.go +++ b/model/profile_test.go @@ -4,7 +4,6 @@ import ( "testing" ) - func TestProfileIdentity(t *testing.T) { sarah := GenerateNewProfile("Sarah") alice := GenerateNewProfile("Alice") diff --git a/protocol/connections/connectionsmanager.go b/protocol/connections/connectionsmanager.go index b21d159..1b917f1 100644 --- a/protocol/connections/connectionsmanager.go +++ b/protocol/connections/connectionsmanager.go @@ -6,14 +6,12 @@ import ( "git.openprivacy.ca/openprivacy/libricochet-go/log" "sync" - "time" ) // Manager encapsulates all the logic necessary to manage outgoing peer and server connections. type Manager struct { serverConnections map[string]*PeerServerConnection lock sync.Mutex - breakChannel chan bool acn connectivity.ACN } @@ -22,7 +20,6 @@ func NewConnectionsManager(acn connectivity.ACN) *Manager { m := new(Manager) m.acn = acn m.serverConnections = make(map[string]*PeerServerConnection) - m.breakChannel = make(chan bool) return m } @@ -66,36 +63,8 @@ func (m *Manager) GetPeerServerConnectionForOnion(host string) (psc *PeerServerC return } -// AttemptReconnections repeatedly attempts to reconnect with failed peers and servers. -func (m *Manager) AttemptReconnections() { - maxTimeout := time.Minute * 5 - // nearly instant first run, next few runs will prolly be too quick to have any FAILED and will gracefully slow to MAX after that - timeout := time.Millisecond * 500 - for { - select { - case <-time.After(timeout): - m.lock.Lock() - for _, psc := range m.serverConnections { - if psc.GetState() == FAILED { - go psc.Run() - } - } - m.lock.Unlock() - - if timeout < maxTimeout { - timeout = timeout * 2 - } else { - timeout = maxTimeout - } - case <-m.breakChannel: - return - } - } -} - // Shutdown closes all connections under management (freeing their goroutines) func (m *Manager) Shutdown() { - m.breakChannel <- true m.lock.Lock() for onion, psc := range m.serverConnections { psc.Close() diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index d7ff0b3..6eb71d3 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -63,7 +63,6 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK engine.acn = acn engine.connectionsManager = NewConnectionsManager(engine.acn) - go engine.connectionsManager.AttemptReconnections() // Init the Server running the Simple App. engine.service = new(tor.BaseOnionService)