make contact retry plugin ACN connection state aware; make contact retry plugin handle groups; remove connectionManager's bad retry logic; allow querying of ACN status
The build was successful.

Dan Ballard 2019-09-26 16:43:34 -07:00
parent 15582c7e79
commit df420034ea
9 changed files with 97 additions and 64 deletions

View file

@@ -44,6 +44,7 @@ type Application interface {
GetPrimaryBus() event.Manager
GetEventBus(onion string) event.Manager
QueryACNStatus()
ShutdownPeer(string)
Shutdown()
@@ -67,11 +68,7 @@ func NewApp(acn connectivity.ACN, appDirectory string) Application {
app := &application{storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationCore: *newAppCore(appDirectory), appBus: event.NewEventManager()}
app.appletPeers.init()
fn := func(progress int, status string) {
progStr := strconv.Itoa(progress)
app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status))
}
app.appletACN.init(acn, fn)
app.appletACN.init(acn, app.getACNStatusHandler())
return app
}
@@ -196,6 +193,21 @@ func (ac *applicationCore) GetEventBus(onion string) event.Manager {
return nil
}
func (app *application) getACNStatusHandler() func(int, string) {
return func(progress int, status string) {
progStr := strconv.Itoa(progress)
app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status))
for _, bus := range app.eventBuses {
bus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status))
}
}
}
func (app *application) QueryACNStatus() {
prog, status := app.acn.GetBootstrapStatus()
app.getACNStatusHandler()(prog, status)
}
// ShutdownPeer shuts down a peer and removes it from the app's management
func (app *application) ShutdownPeer(onion string) {
app.mutex.Lock()

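For reference, a minimal sketch of how the new status path might be driven from a front end, assuming the cwtch app and event packages as used in this diff and an already-constructed Application; the helper name is illustrative only:

```go
// Illustrative sketch: ask the app to re-publish the current ACN (Tor)
// bootstrap status and read the resulting ACNStatus event off the primary bus.
func currentACNStatus(a app.Application) (progress, status string) {
	queue := event.NewQueue()
	a.GetPrimaryBus().Subscribe(event.ACNStatus, queue)

	a.QueryACNStatus() // re-publishes acn.GetBootstrapStatus() via getACNStatusHandler

	ev := <-queue.OutChan()
	return ev.Data[event.Progreess], ev.Data[event.Status]
}
```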
View file

@@ -107,6 +107,11 @@ func (ac *applicationClient) LoadProfiles(password string) {
ac.bridge.Write(&message)
}
func (ac *applicationClient) QueryACNStatus() {
message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.GetACNStatus, map[event.Field]string{})}
ac.bridge.Write(&message)
}
// ShutdownPeer shuts down a peer and removes it from the app's management
func (ac *applicationClient) ShutdownPeer(onion string) {
ac.mutex.Lock()

View file

@@ -31,11 +31,7 @@ type ApplicationService interface {
func NewAppService(acn connectivity.ACN, appDirectory string, bridge event.IPCBridge) ApplicationService {
appService := &applicationService{storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}}
fn := func(progress int, status string) {
progStr := strconv.Itoa(progress)
appService.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)})
}
appService.appletACN.init(acn, fn)
appService.appletACN.init(acn, appService.getACNStatusHandler())
appService.handle = appService.handleEvent
go appService.listen()
@@ -74,6 +70,9 @@ func (as *applicationService) handleEvent(ev *event.Event) {
message := event.IPCMessage{Dest: onion, Message: *ev}
as.bridge.Write(&message)
}
case event.GetACNStatus:
prog, status := as.acn.GetBootstrapStatus()
as.getACNStatusHandler()(prog, status)
case event.ShutdownPeer:
onion := ev.Data[event.Identity]
as.ShutdownPeer(onion)
@@ -127,6 +126,16 @@ func (as *applicationService) loadProfiles(password string) {
}
}
func (as *applicationService) getACNStatusHandler() func(int, string) {
return func(progress int, status string) {
progStr := strconv.Itoa(progress)
as.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)})
for _, bus := range as.eventBuses {
bus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status))
}
}
}
func (as *applicationService) ShutdownPeer(onion string) {
as.engines[onion].Shutdown()
delete(as.engines, onion)

View file

@@ -10,26 +10,35 @@ import (
const tickTime = 10 * time.Second
const maxBakoff int = 32 // 320 seconds or ~5 min
type peer struct {
type connectionType int
const (
peerConn connectionType = iota
serverConn
)
type contact struct {
id string
state connections.ConnectionState
ctype connectionType
ticks int
backoff int
}
type contactRetry struct {
bus event.Manager
queue event.Queue
bus event.Manager
queue event.Queue
networkUp bool
breakChan chan bool
peers sync.Map //[string]*peer
connections sync.Map //[string]*contact
}
// NewContactRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing
func NewContactRetry(bus event.Manager) Plugin {
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool), peers: sync.Map{}}
// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing
func NewConnectionRetry(bus event.Manager) Plugin {
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool), connections: sync.Map{}, networkUp: false}
return cr
}
@@ -39,6 +48,9 @@ func (cr *contactRetry) Start() {
func (cr *contactRetry) run() {
cr.bus.Subscribe(event.PeerStateChange, cr.queue)
cr.bus.Subscribe(event.ACNStatus, cr.queue)
cr.bus.Subscribe(event.ServerStateChange, cr.queue)
for {
select {
case e := <-cr.queue.OutChan():
@@ -46,21 +58,46 @@ func (cr *contactRetry) run() {
case event.PeerStateChange:
state := connections.ConnectionStateToType[e.Data[event.ConnectionState]]
peer := e.Data[event.RemotePeer]
cr.handleEvent(peer, state)
cr.handleEvent(peer, state, peerConn)
case event.ServerStateChange:
state := connections.ConnectionStateToType[e.Data[event.ConnectionState]]
server := e.Data[event.GroupServer]
cr.handleEvent(server, state, serverConn)
case event.ACNStatus:
prog := e.Data[event.Progreess]
if prog == "100" && cr.networkUp == false {
cr.networkUp = true
cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact)
p.ticks = 0
p.backoff = 1
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
return true
})
} else if prog != "100" {
cr.networkUp = false
}
}
case <-time.After(tickTime):
cr.peers.Range(func(k, v interface{}) bool {
p := v.(*peer)
cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact)
if p.state == connections.DISCONNECTED {
p.ticks++
if p.ticks == p.backoff {
p.ticks = 0
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
if cr.networkUp {
if p.ctype == peerConn {
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
} else {
cr.bus.Publish(event.NewEventList(event.JoinServer, event.GroupServer, p.id))
}
}
}
}
return true
})
@@ -70,15 +107,15 @@ func (cr *contactRetry) run() {
}
}
func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState) {
if _, exists := cr.peers.Load(id); !exists {
p := &peer{id: id, state: connections.DISCONNECTED, backoff: 1, ticks: 0}
cr.peers.Store(id, p)
func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) {
if _, exists := cr.connections.Load(id); !exists {
p := &contact{id: id, state: connections.DISCONNECTED, backoff: 1, ticks: 0, ctype: ctype}
cr.connections.Store(id, p)
return
}
pinf, _ := cr.peers.Load(id)
p := pinf.(*peer)
pinf, _ := cr.connections.Load(id)
p := pinf.(*contact)
if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED {
p.state = connections.DISCONNECTED
if p.backoff < maxBakoff {

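As a worked example of the retry cadence these constants imply (a sketch only: it assumes every retry fails, that the ACN has reported 100% bootstrapped, and that the backoff doubles up to maxBakoff as the truncated hunk above suggests):

```go
package main

import (
	"fmt"
	"time"
)

// Reproduces the schedule implied by the plugin's constants: the ticker fires
// every tickTime, a disconnected contact is retried once its tick count reaches
// its backoff, and the backoff doubles after each failed attempt until it hits
// the cap (32 ticks * 10s = 320s, ~5 min).
func main() {
	const tickTime = 10 * time.Second
	const maxBackoff = 32 // spelled maxBakoff in the plugin

	backoff, elapsed := 1, time.Duration(0)
	for attempt := 1; attempt <= 8; attempt++ {
		wait := time.Duration(backoff) * tickTime
		elapsed += wait
		fmt.Printf("retry %d after %v (total elapsed %v)\n", attempt, wait, elapsed)
		if backoff < maxBackoff {
			backoff *= 2
		}
	}
	// Retries land at 10s, 20s, 40s, 1m20s, 2m40s, 5m20s, then every 5m20s once capped.
}
```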
View file

@@ -9,7 +9,7 @@ type PluginID int
// These are the plugin IDs for the supplied plugins
const (
CONTACTRETRY PluginID = iota
CONNECTIONRETRY PluginID = iota
)
// Plugin is the interface for a plugin
@@ -21,8 +21,8 @@ type Plugin interface {
// Get is a plugin factory for the requested plugin
func Get(id PluginID, bus event.Manager) Plugin {
switch id {
case CONTACTRETRY:
return NewContactRetry(bus)
case CONNECTIONRETRY:
return NewConnectionRetry(bus)
}
return nil

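A small usage sketch for the renamed factory entry, assuming a caller outside the plugins package holding a profile's event.Manager; Start is assumed to be part of the Plugin interface (the contact retry implementation above defines it):

```go
// Illustrative sketch: fetch and start the renamed connection retry plugin
// for a single profile's event bus.
func startConnectionRetry(bus event.Manager) plugins.Plugin {
	p := plugins.Get(plugins.CONNECTIONRETRY, bus)
	p.Start()
	return p
}
```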
View file

@@ -29,6 +29,7 @@ const (
BlockUnknownPeers = Type("BlockUnknownPeers")
AllowUnknownPeers = Type("AllowUnknownPeers")
// GroupServer
JoinServer = Type("JoinServer")
ProtocolEngineStartListen = Type("ProtocolEngineStartListen")
@@ -160,6 +161,8 @@ const (
// Error(err)
AppError = Type("AppError")
GetACNStatus = Type("GetACNStatus")
// Progress, Status
ACNStatus = Type("ACNStatus")
)

View file

@@ -4,7 +4,6 @@ import (
"testing"
)
func TestProfileIdentity(t *testing.T) {
sarah := GenerateNewProfile("Sarah")
alice := GenerateNewProfile("Alice")

View file

@@ -6,14 +6,12 @@ import (
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"sync"
"time"
)
// Manager encapsulates all the logic necessary to manage outgoing peer and server connections.
type Manager struct {
serverConnections map[string]*PeerServerConnection
lock sync.Mutex
breakChannel chan bool
acn connectivity.ACN
}
@@ -22,7 +20,6 @@ func NewConnectionsManager(acn connectivity.ACN) *Manager {
m := new(Manager)
m.acn = acn
m.serverConnections = make(map[string]*PeerServerConnection)
m.breakChannel = make(chan bool)
return m
}
@@ -66,36 +63,8 @@ func (m *Manager) GetPeerServerConnectionForOnion(host string) (psc *PeerServerC
return
}
// AttemptReconnections repeatedly attempts to reconnect with failed peers and servers.
func (m *Manager) AttemptReconnections() {
maxTimeout := time.Minute * 5
// nearly instant first run, next few runs will prolly be too quick to have any FAILED and will gracefully slow to MAX after that
timeout := time.Millisecond * 500
for {
select {
case <-time.After(timeout):
m.lock.Lock()
for _, psc := range m.serverConnections {
if psc.GetState() == FAILED {
go psc.Run()
}
}
m.lock.Unlock()
if timeout < maxTimeout {
timeout = timeout * 2
} else {
timeout = maxTimeout
}
case <-m.breakChannel:
return
}
}
}
// Shutdown closes all connections under management (freeing their goroutines)
func (m *Manager) Shutdown() {
m.breakChannel <- true
m.lock.Lock()
for onion, psc := range m.serverConnections {
psc.Close()

View file

@@ -63,7 +63,6 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
engine.acn = acn
engine.connectionsManager = NewConnectionsManager(engine.acn)
go engine.connectionsManager.AttemptReconnections()
// Init the Server running the Simple App.
engine.service = new(tor.BaseOnionService)