better handle acn/engine instability #512
app/app.go (63 changed lines)
@@ -24,13 +24,13 @@ type application struct {
 	eventBuses map[string]event.Manager
 	directory  string

-	peerLock sync.Mutex
-	peers    map[string]peer.CwtchPeer
-	acn      connectivity.ACN
-	plugins  sync.Map //map[string] []plugins.Plugin
+	peers   map[string]peer.CwtchPeer
+	acn     connectivity.ACN
+	plugins sync.Map //map[string] []plugins.Plugin

 	engines     map[string]connections.Engine
 	appBus      event.Manager
+	eventQueue  event.Queue
 	appmutex    sync.Mutex

 	engineHooks connections.EngineHooks
@@ -97,7 +97,7 @@ func LoadAppSettings(appDirectory string) *settings.GlobalSettingsFile {

 // NewApp creates a new app with some environment awareness and initializes a Tor Manager
 func NewApp(acn connectivity.ACN, appDirectory string, settings *settings.GlobalSettingsFile) Application {
-	app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager(), settings: settings}
+	app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager(), settings: settings, eventQueue: event.NewQueue()}
 	app.peers = make(map[string]peer.CwtchPeer)
 	app.engineHooks = connections.DefaultEngineHooks{}
 	app.acn = acn
@@ -107,6 +107,9 @@ func NewApp(acn connectivity.ACN, appDirectory string, settings *settings.Global
 	prog, status := acn.GetBootstrapStatus()
 	statusHandler(prog, status)

+	app.GetPrimaryBus().Subscribe(event.ACNStatus, app.eventQueue)
+	go app.eventHandler()
+
 	return app
 }
@@ -128,9 +131,6 @@ func (app *application) UpdateSettings(settings settings.GlobalSettings) {
 	defer app.appmutex.Unlock()
 	app.settings.WriteGlobalSettings(settings)

 	// we now need to propagate changes to all peers
-	app.peerLock.Lock()
-	defer app.peerLock.Unlock()
 	for _, profile := range app.peers {
 		profile.UpdateExperiments(settings.ExperimentsEnabled, settings.Experiments)
@@ -150,8 +150,8 @@ func (app *application) UpdateSettings(settings settings.GlobalSettings) {
 func (app *application) ListProfiles() []string {
 	var keys []string

-	app.peerLock.Lock()
-	defer app.peerLock.Unlock()
+	app.appmutex.Lock()
+	defer app.appmutex.Unlock()
 	for handle := range app.peers {
 		keys = append(keys, handle)
 	}
@@ -474,6 +474,48 @@ func (app *application) QueryACNVersion() {
 	app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version))
 }

+func (app *application) eventHandler() {
+	acnStatus := -1
+	for {
+		e := app.eventQueue.Next()
+		switch e.EventType {
+		case event.ACNStatus:
+			newAcnStatus, err := strconv.Atoi(e.Data[event.Progress])
+			if err != nil {
+				break
+			}
+			if newAcnStatus == 100 {
+				if acnStatus != 100 {
+					for _, onion := range app.ListProfiles() {
+						profile := app.GetPeer(onion)
+						if profile != nil {
+							autostart, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAutostart)
+							if !exists || autostart == "true" {
+								app.ActivatePeerEngine(onion)
+							}
+						}
+					}
+				}
+			} else {
+				if acnStatus == 100 {
+					// just fell offline
+					for _, onion := range app.ListProfiles() {
+						app.DeactivatePeerEngine(onion)
+					}
+				}
+			}
+			acnStatus = newAcnStatus
+		default:
+			// invalid event, signifies shutdown
+			if e.EventType == "" {
+				return
+			}
+		}
+	}
+}
+
 // ShutdownPeer shuts down a peer and removes it from the app's management
 func (app *application) ShutdownPeer(onion string) {
 	app.appmutex.Lock()
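Review note: the new eventHandler only acts on transitions across the 100% bootstrap threshold — activating autostart engines when the ACN comes up and deactivating them when it drops — and it terminates once eventQueue.Shutdown() causes Next() to return a zero-value event (empty EventType). A minimal, self-contained sketch of that edge-detection idea, with hypothetical names rather than the cwtch API:

    package main

    import "fmt"

    // trackACN invokes onUp/onDown only on transitions into and out of 100%
    // bootstrap progress, mirroring the acnStatus bookkeeping above (sketch only).
    func trackACN(progress <-chan int, onUp, onDown func()) {
    	last := -1 // unknown until the first status arrives
    	for p := range progress {
    		if p == 100 && last != 100 {
    			onUp() // just came online
    		} else if p != 100 && last == 100 {
    			onDown() // just fell offline
    		}
    		last = p
    	}
    }

    func main() {
    	ch := make(chan int)
    	go func() {
    		for _, p := range []int{25, 100, 100, 40, 100} {
    			ch <- p
    		}
    		close(ch)
    	}()
    	trackACN(ch,
    		func() { fmt.Println("activate peer engines") },
    		func() { fmt.Println("deactivate peer engines") })
    }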
@@ -519,6 +561,7 @@ func (app *application) Shutdown() {
 		app.shutdownPeer(id)
 	}
 	log.Debugf("Shutting Down App")
+	app.eventQueue.Shutdown()
 	app.appBus.Shutdown()
 	log.Debugf("Shut Down Complete")
 }
@@ -5,6 +5,7 @@ import (
 	"cwtch.im/cwtch/protocol/connections"
 	"git.openprivacy.ca/openprivacy/log"
 	"math"
+	"strconv"
 	"sync"
 	"time"
 )
@@ -113,6 +114,7 @@ type contactRetry struct {
 	breakChan chan bool
 	onion     string
 	lastCheck time.Time
+	acnProgress int

 	connections sync.Map //[string]*contact
 	connCount   int
@@ -249,8 +251,6 @@ func (cr *contactRetry) run() {
 						}
 					}
 				}
-			case event.ProtocolEngineCreated:
-				cr.protocolEngine = true

 			case event.ProtocolEngineShutdown:
 				cr.ACNUp = false
@@ -265,22 +265,15 @@ func (cr *contactRetry) run() {
 					p.failedCount = 0
 					return true
 				})
+			case event.ProtocolEngineCreated:
+				cr.protocolEngine = true
+				cr.processStatus()
+
 			case event.ACNStatus:
-				prog := e.Data[event.Progress]
-				if !cr.protocolEngine {
-					continue
-				}
-				if prog == "100" && !cr.ACNUp {
-					cr.ACNUp = true
-					cr.ACNUpTime = time.Now()
-					cr.connections.Range(func(k, v interface{}) bool {
-						p := v.(*contact)
-						p.failedCount = 0
-						return true
-					})
-				} else if prog != "100" {
-					cr.ACNUp = false
-				}
+				progData := e.Data[event.Progress]
+				if prog, err := strconv.Atoi(progData); err == nil {
+					cr.acnProgress = prog
+					cr.processStatus()
+				}
@@ -294,6 +287,24 @@ func (cr *contactRetry) run() {
 	}
 }

+func (cr *contactRetry) processStatus() {
+	if !cr.protocolEngine {
+		cr.ACNUp = false
+		return
+	}
+	if cr.acnProgress == 100 && !cr.ACNUp {
+		cr.ACNUp = true
+		cr.ACNUpTime = time.Now()
+		cr.connections.Range(func(k, v interface{}) bool {
+			p := v.(*contact)
+			p.failedCount = 0
+			return true
+		})
+	} else if cr.acnProgress != 100 {
+		cr.ACNUp = false
+	}
+}
+
 func (cr *contactRetry) requeueReady() {
 	if !cr.ACNUp {
 		return
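Review note: centralizing the bookkeeping in processStatus means the outcome no longer depends on the order in which ProtocolEngineCreated and ACNStatus arrive — previously an ACNStatus of 100 received before the engine existed was simply dropped by the old `continue`. The resulting ACNUp state reduces to a simple predicate; the `!cr.ACNUp` guard only confines the rising-edge side effects (recording ACNUpTime, resetting failedCount). A sketch of that predicate, illustrative only and not part of the plugin:

    // decideACNUp mirrors the state processStatus converges on: the ACN is
    // considered up only when a protocol engine exists and bootstrap is complete.
    // The real method additionally performs the rising-edge side effects.
    func decideACNUp(protocolEngine bool, acnProgress int) bool {
    	return protocolEngine && acnProgress == 100
    }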
@@ -1336,7 +1336,7 @@ func (cp *cwtchPeer) eventHandler() {
 		ci, err := cp.FetchConversationInfo(ev.Data[event.GroupServer])
 		if ci == nil || err != nil {
 			log.Errorf("no server connection count")
-			return
+			continue
 		}
 		cp.SetConversationAttribute(ci.ID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(lastReceivedSignature)), ev.Data[event.Signature])
 		conversations, err := cp.FetchConversations()
@@ -342,18 +342,20 @@ func (e *engine) Shutdown() {
 	// don't accept any more events...
 	e.queue.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{}))
 	e.service.Shutdown()

 	e.shuttingDown.Store(true)

 	e.ephemeralServicesLock.Lock()
 	defer e.ephemeralServicesLock.Unlock()
 	for _, connection := range e.ephemeralServices {
 		log.Infof("shutting down ephemeral service")
-		connection.connectingLock.Lock()
-		connection.service.Shutdown()
-		connection.connectingLock.Unlock()
+		// work around: service.shutdown() can block for a long time if it is Open()ing a new connection, putting it in a
+		// goroutine means we can perform this operation and let the per service shutdown in their own time or until the app exits
+		go func() {
+			connection.connectingLock.Lock()
+			connection.service.Shutdown()
+			connection.connectingLock.Unlock()
+		}()
 	}
 	e.queue.Shutdown()
 }
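Review note: the go func() above captures the `connection` loop variable by reference; with Go toolchains before 1.22 every shutdown goroutine can end up operating on the same (last) element. A common fix is to pass the value in as an argument or rebind it inside the loop. Self-contained illustration with generic names, not the engine's actual types:

    package main

    import (
    	"fmt"
    	"sync"
    )

    func main() {
    	services := []string{"onion-a", "onion-b", "onion-c"}
    	var wg sync.WaitGroup
    	for _, svc := range services {
    		wg.Add(1)
    		// Passing svc as a parameter gives each goroutine its own copy,
    		// independent of the pre-Go-1.22 loop-variable semantics.
    		go func(s string) {
    			defer wg.Done()
    			fmt.Println("shutting down", s)
    		}(svc)
    	}
    	wg.Wait()
    }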
This should be a struct-level member.