Browse Source

Merge branch 'reconncect' of dan/cwtch into master

tags/v0.3.1
Sarah Jamie Lewis 2 months ago
parent
commit
832c4c28d5

+ 17
- 5
app/app.go View File

@@ -44,6 +44,7 @@ type Application interface {

GetPrimaryBus() event.Manager
GetEventBus(onion string) event.Manager
QueryACNStatus()

ShutdownPeer(string)
Shutdown()
@@ -67,11 +68,7 @@ func NewApp(acn connectivity.ACN, appDirectory string) Application {
app := &application{storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationCore: *newAppCore(appDirectory), appBus: event.NewEventManager()}
app.appletPeers.init()

fn := func(progress int, status string) {
progStr := strconv.Itoa(progress)
app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status))
}
app.appletACN.init(acn, fn)
app.appletACN.init(acn, app.getACNStatusHandler())
return app
}

@@ -196,6 +193,21 @@ func (ac *applicationCore) GetEventBus(onion string) event.Manager {
return nil
}

func (app *application) getACNStatusHandler() func(int, string) {
return func(progress int, status string) {
progStr := strconv.Itoa(progress)
app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status))
for _, bus := range app.eventBuses {
bus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status))
}
}
}

func (app *application) QueryACNStatus() {
prog, status := app.acn.GetBootstrapStatus()
app.getACNStatusHandler()(prog, status)
}

// ShutdownPeer shuts down a peer and removes it from the app's management
func (app *application) ShutdownPeer(onion string) {
app.mutex.Lock()

+ 5
- 0
app/appClient.go View File

@@ -107,6 +107,11 @@ func (ac *applicationClient) LoadProfiles(password string) {
ac.bridge.Write(&message)
}

func (ac *applicationClient) QueryACNStatus() {
message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.GetACNStatus, map[event.Field]string{})}
ac.bridge.Write(&message)
}

// ShutdownPeer shuts down a peer and removes it from the app's management
func (ac *applicationClient) ShutdownPeer(onion string) {
ac.mutex.Lock()

+ 14
- 5
app/appService.go View File

@@ -31,11 +31,7 @@ type ApplicationService interface {
func NewAppService(acn connectivity.ACN, appDirectory string, bridge event.IPCBridge) ApplicationService {
appService := &applicationService{storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}}

fn := func(progress int, status string) {
progStr := strconv.Itoa(progress)
appService.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)})
}
appService.appletACN.init(acn, fn)
appService.appletACN.init(acn, appService.getACNStatusHandler())
appService.handle = appService.handleEvent

go appService.listen()
@@ -74,6 +70,9 @@ func (as *applicationService) handleEvent(ev *event.Event) {
message := event.IPCMessage{Dest: onion, Message: *ev}
as.bridge.Write(&message)
}
case event.GetACNStatus:
prog, status := as.acn.GetBootstrapStatus()
as.getACNStatusHandler()(prog, status)
case event.ShutdownPeer:
onion := ev.Data[event.Identity]
as.ShutdownPeer(onion)
@@ -127,6 +126,16 @@ func (as *applicationService) loadProfiles(password string) {
}
}

func (as *applicationService) getACNStatusHandler() func(int, string) {
return func(progress int, status string) {
progStr := strconv.Itoa(progress)
as.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)})
for _, bus := range as.eventBuses {
bus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status))
}
}
}

func (as *applicationService) ShutdownPeer(onion string) {
as.engines[onion].Shutdown()
delete(as.engines, onion)

+ 55
- 18
app/plugins/contactRetry.go View File

@@ -10,26 +10,35 @@ import (
const tickTime = 10 * time.Second
const maxBakoff int = 32 // 320 seconds or ~5 min

type peer struct {
type connectionType int

const (
peerConn connectionType = iota
serverConn
)

type contact struct {
id string
state connections.ConnectionState
ctype connectionType

ticks int
backoff int
}

type contactRetry struct {
bus event.Manager
queue event.Queue
bus event.Manager
queue event.Queue
networkUp bool

breakChan chan bool

peers sync.Map //[string]*peer
connections sync.Map //[string]*contact
}

// NewContactRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing
func NewContactRetry(bus event.Manager) Plugin {
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool), peers: sync.Map{}}
// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing
func NewConnectionRetry(bus event.Manager) Plugin {
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool), connections: sync.Map{}, networkUp: false}
return cr
}

@@ -39,6 +48,9 @@ func (cr *contactRetry) Start() {

func (cr *contactRetry) run() {
cr.bus.Subscribe(event.PeerStateChange, cr.queue)
cr.bus.Subscribe(event.ACNStatus, cr.queue)
cr.bus.Subscribe(event.ServerStateChange, cr.queue)

for {
select {
case e := <-cr.queue.OutChan():
@@ -46,21 +58,46 @@ func (cr *contactRetry) run() {
case event.PeerStateChange:
state := connections.ConnectionStateToType[e.Data[event.ConnectionState]]
peer := e.Data[event.RemotePeer]
cr.handleEvent(peer, state)
cr.handleEvent(peer, state, peerConn)

case event.ServerStateChange:
state := connections.ConnectionStateToType[e.Data[event.ConnectionState]]
server := e.Data[event.GroupServer]
cr.handleEvent(server, state, serverConn)

case event.ACNStatus:
prog := e.Data[event.Progreess]
if prog == "100" && cr.networkUp == false {
cr.networkUp = true
cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact)
p.ticks = 0
p.backoff = 1
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
return true
})
} else if prog != "100" {
cr.networkUp = false
}
}

case <-time.After(tickTime):
cr.peers.Range(func(k, v interface{}) bool {
p := v.(*peer)
cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact)

if p.state == connections.DISCONNECTED {
p.ticks++
if p.ticks == p.backoff {
p.ticks = 0
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
if cr.networkUp {
if p.ctype == peerConn {
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
} else {
cr.bus.Publish(event.NewEventList(event.JoinServer, event.GroupServer, p.id))
}
}
}
}

return true
})

@@ -70,15 +107,15 @@ func (cr *contactRetry) run() {
}
}

func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState) {
if _, exists := cr.peers.Load(id); !exists {
p := &peer{id: id, state: connections.DISCONNECTED, backoff: 1, ticks: 0}
cr.peers.Store(id, p)
func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) {
if _, exists := cr.connections.Load(id); !exists {
p := &contact{id: id, state: connections.DISCONNECTED, backoff: 1, ticks: 0, ctype: ctype}
cr.connections.Store(id, p)
return
}

pinf, _ := cr.peers.Load(id)
p := pinf.(*peer)
pinf, _ := cr.connections.Load(id)
p := pinf.(*contact)
if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED {
p.state = connections.DISCONNECTED
if p.backoff < maxBakoff {

+ 3
- 3
app/plugins/plugin.go View File

@@ -9,7 +9,7 @@ type PluginID int

// These are the plugin IDs for the supplied plugins
const (
CONTACTRETRY PluginID = iota
CONNECTIONRETRY PluginID = iota
)

// Plugin is the interface for a plugin
@@ -21,8 +21,8 @@ type Plugin interface {
// Get is a plugin factory for the requested plugin
func Get(id PluginID, bus event.Manager) Plugin {
switch id {
case CONTACTRETRY:
return NewContactRetry(bus)
case CONNECTIONRETRY:
return NewConnectionRetry(bus)
}

return nil

+ 3
- 0
event/common.go View File

@@ -29,6 +29,7 @@ const (
BlockUnknownPeers = Type("BlockUnknownPeers")
AllowUnknownPeers = Type("AllowUnknownPeers")

// GroupServer
JoinServer = Type("JoinServer")

ProtocolEngineStartListen = Type("ProtocolEngineStartListen")
@@ -160,6 +161,8 @@ const (
// Error(err)
AppError = Type("AppError")

GetACNStatus = Type("GetACNStatus")

// Progress, Status
ACNStatus = Type("ACNStatus")
)

+ 0
- 1
model/profile_test.go View File

@@ -4,7 +4,6 @@ import (
"testing"
)


func TestProfileIdentity(t *testing.T) {
sarah := GenerateNewProfile("Sarah")
alice := GenerateNewProfile("Alice")

+ 0
- 31
protocol/connections/connectionsmanager.go View File

@@ -6,14 +6,12 @@ import (
"git.openprivacy.ca/openprivacy/libricochet-go/log"

"sync"
"time"
)

// Manager encapsulates all the logic necessary to manage outgoing peer and server connections.
type Manager struct {
serverConnections map[string]*PeerServerConnection
lock sync.Mutex
breakChannel chan bool
acn connectivity.ACN
}

@@ -22,7 +20,6 @@ func NewConnectionsManager(acn connectivity.ACN) *Manager {
m := new(Manager)
m.acn = acn
m.serverConnections = make(map[string]*PeerServerConnection)
m.breakChannel = make(chan bool)
return m
}

@@ -66,36 +63,8 @@ func (m *Manager) GetPeerServerConnectionForOnion(host string) (psc *PeerServerC
return
}

// AttemptReconnections repeatedly attempts to reconnect with failed peers and servers.
func (m *Manager) AttemptReconnections() {
maxTimeout := time.Minute * 5
// nearly instant first run, next few runs will prolly be too quick to have any FAILED and will gracefully slow to MAX after that
timeout := time.Millisecond * 500
for {
select {
case <-time.After(timeout):
m.lock.Lock()
for _, psc := range m.serverConnections {
if psc.GetState() == FAILED {
go psc.Run()
}
}
m.lock.Unlock()

if timeout < maxTimeout {
timeout = timeout * 2
} else {
timeout = maxTimeout
}
case <-m.breakChannel:
return
}
}
}

// Shutdown closes all connections under management (freeing their goroutines)
func (m *Manager) Shutdown() {
m.breakChannel <- true
m.lock.Lock()
for onion, psc := range m.serverConnections {
psc.Close()

+ 0
- 1
protocol/connections/engine.go View File

@@ -63,7 +63,6 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK

engine.acn = acn
engine.connectionsManager = NewConnectionsManager(engine.acn)
go engine.connectionsManager.AttemptReconnections()

// Init the Server running the Simple App.
engine.service = new(tor.BaseOnionService)

Loading…
Cancel
Save