better handle acn/engine isntability #512
63
app/app.go
63
app/app.go
|
@ -24,13 +24,13 @@ type application struct {
|
||||||
eventBuses map[string]event.Manager
|
eventBuses map[string]event.Manager
|
||||||
directory string
|
directory string
|
||||||
|
|
||||||
peerLock sync.Mutex
|
peers map[string]peer.CwtchPeer
|
||||||
peers map[string]peer.CwtchPeer
|
acn connectivity.ACN
|
||||||
acn connectivity.ACN
|
plugins sync.Map //map[string] []plugins.Plugin
|
||||||
plugins sync.Map //map[string] []plugins.Plugin
|
|
||||||
|
|
||||||
engines map[string]connections.Engine
|
engines map[string]connections.Engine
|
||||||
appBus event.Manager
|
appBus event.Manager
|
||||||
|
eventQueue event.Queue
|
||||||
appmutex sync.Mutex
|
appmutex sync.Mutex
|
||||||
engineHooks connections.EngineHooks
|
engineHooks connections.EngineHooks
|
||||||
|
|
||||||
|
@ -97,7 +97,7 @@ func LoadAppSettings(appDirectory string) *settings.GlobalSettingsFile {
|
||||||
// NewApp creates a new app with some environment awareness and initializes a Tor Manager
|
// NewApp creates a new app with some environment awareness and initializes a Tor Manager
|
||||||
func NewApp(acn connectivity.ACN, appDirectory string, settings *settings.GlobalSettingsFile) Application {
|
func NewApp(acn connectivity.ACN, appDirectory string, settings *settings.GlobalSettingsFile) Application {
|
||||||
|
|
||||||
app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager(), settings: settings}
|
app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager(), settings: settings, eventQueue: event.NewQueue()}
|
||||||
app.peers = make(map[string]peer.CwtchPeer)
|
app.peers = make(map[string]peer.CwtchPeer)
|
||||||
app.engineHooks = connections.DefaultEngineHooks{}
|
app.engineHooks = connections.DefaultEngineHooks{}
|
||||||
app.acn = acn
|
app.acn = acn
|
||||||
|
@ -107,6 +107,9 @@ func NewApp(acn connectivity.ACN, appDirectory string, settings *settings.Global
|
||||||
prog, status := acn.GetBootstrapStatus()
|
prog, status := acn.GetBootstrapStatus()
|
||||||
statusHandler(prog, status)
|
statusHandler(prog, status)
|
||||||
|
|
||||||
|
app.GetPrimaryBus().Subscribe(event.ACNStatus, app.eventQueue)
|
||||||
|
go app.eventHandler()
|
||||||
|
|
||||||
return app
|
return app
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -128,9 +131,6 @@ func (app *application) UpdateSettings(settings settings.GlobalSettings) {
|
||||||
defer app.appmutex.Unlock()
|
defer app.appmutex.Unlock()
|
||||||
app.settings.WriteGlobalSettings(settings)
|
app.settings.WriteGlobalSettings(settings)
|
||||||
|
|
||||||
// we now need to propagate changes to all peers
|
|
||||||
app.peerLock.Lock()
|
|
||||||
defer app.peerLock.Unlock()
|
|
||||||
for _, profile := range app.peers {
|
for _, profile := range app.peers {
|
||||||
profile.UpdateExperiments(settings.ExperimentsEnabled, settings.Experiments)
|
profile.UpdateExperiments(settings.ExperimentsEnabled, settings.Experiments)
|
||||||
|
|
||||||
|
@ -150,8 +150,8 @@ func (app *application) UpdateSettings(settings settings.GlobalSettings) {
|
||||||
func (app *application) ListProfiles() []string {
|
func (app *application) ListProfiles() []string {
|
||||||
var keys []string
|
var keys []string
|
||||||
|
|
||||||
app.peerLock.Lock()
|
app.appmutex.Lock()
|
||||||
defer app.peerLock.Unlock()
|
defer app.appmutex.Unlock()
|
||||||
for handle := range app.peers {
|
for handle := range app.peers {
|
||||||
keys = append(keys, handle)
|
keys = append(keys, handle)
|
||||||
}
|
}
|
||||||
|
@ -474,6 +474,48 @@ func (app *application) QueryACNVersion() {
|
||||||
app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version))
|
app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (app *application) eventHandler() {
|
||||||
|
acnStatus := -1
|
||||||
|
for {
|
||||||
|
e := app.eventQueue.Next()
|
||||||
|
switch e.EventType {
|
||||||
|
case event.ACNStatus:
|
||||||
|
|||||||
|
newAcnStatus, err := strconv.Atoi(e.Data[event.Progress])
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if newAcnStatus == 100 {
|
||||||
|
if acnStatus != 100 {
|
||||||
|
for _, onion := range app.ListProfiles() {
|
||||||
|
profile := app.GetPeer(onion)
|
||||||
|
if profile != nil {
|
||||||
|
autostart, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAutostart)
|
||||||
|
if !exists || autostart == "true" {
|
||||||
|
app.ActivatePeerEngine(onion)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if acnStatus == 100 {
|
||||||
|
// just fell offline
|
||||||
|
for _, onion := range app.ListProfiles() {
|
||||||
|
app.DeactivatePeerEngine(onion)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
acnStatus = newAcnStatus
|
||||||
|
|
||||||
|
default:
|
||||||
|
// invalid event, signifies shutdown
|
||||||
|
if e.EventType == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ShutdownPeer shuts down a peer and removes it from the app's management
|
// ShutdownPeer shuts down a peer and removes it from the app's management
|
||||||
func (app *application) ShutdownPeer(onion string) {
|
func (app *application) ShutdownPeer(onion string) {
|
||||||
app.appmutex.Lock()
|
app.appmutex.Lock()
|
||||||
|
@ -519,6 +561,7 @@ func (app *application) Shutdown() {
|
||||||
app.shutdownPeer(id)
|
app.shutdownPeer(id)
|
||||||
}
|
}
|
||||||
log.Debugf("Shutting Down App")
|
log.Debugf("Shutting Down App")
|
||||||
|
app.eventQueue.Shutdown()
|
||||||
app.appBus.Shutdown()
|
app.appBus.Shutdown()
|
||||||
log.Debugf("Shut Down Complete")
|
log.Debugf("Shut Down Complete")
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"cwtch.im/cwtch/protocol/connections"
|
"cwtch.im/cwtch/protocol/connections"
|
||||||
"git.openprivacy.ca/openprivacy/log"
|
"git.openprivacy.ca/openprivacy/log"
|
||||||
"math"
|
"math"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
@ -113,6 +114,7 @@ type contactRetry struct {
|
||||||
breakChan chan bool
|
breakChan chan bool
|
||||||
onion string
|
onion string
|
||||||
lastCheck time.Time
|
lastCheck time.Time
|
||||||
|
acnProgress int
|
||||||
|
|
||||||
connections sync.Map //[string]*contact
|
connections sync.Map //[string]*contact
|
||||||
connCount int
|
connCount int
|
||||||
|
@ -249,8 +251,6 @@ func (cr *contactRetry) run() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case event.ProtocolEngineCreated:
|
|
||||||
cr.protocolEngine = true
|
|
||||||
|
|
||||||
case event.ProtocolEngineShutdown:
|
case event.ProtocolEngineShutdown:
|
||||||
cr.ACNUp = false
|
cr.ACNUp = false
|
||||||
|
@ -265,22 +265,15 @@ func (cr *contactRetry) run() {
|
||||||
p.failedCount = 0
|
p.failedCount = 0
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
case event.ProtocolEngineCreated:
|
||||||
|
cr.protocolEngine = true
|
||||||
|
cr.processStatus()
|
||||||
|
|
||||||
case event.ACNStatus:
|
case event.ACNStatus:
|
||||||
prog := e.Data[event.Progress]
|
progData := e.Data[event.Progress]
|
||||||
if !cr.protocolEngine {
|
if prog, err := strconv.Atoi(progData); err == nil {
|
||||||
continue
|
cr.acnProgress = prog
|
||||||
}
|
cr.processStatus()
|
||||||
if prog == "100" && !cr.ACNUp {
|
|
||||||
cr.ACNUp = true
|
|
||||||
cr.ACNUpTime = time.Now()
|
|
||||||
cr.connections.Range(func(k, v interface{}) bool {
|
|
||||||
p := v.(*contact)
|
|
||||||
p.failedCount = 0
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
} else if prog != "100" {
|
|
||||||
cr.ACNUp = false
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -294,6 +287,24 @@ func (cr *contactRetry) run() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cr *contactRetry) processStatus() {
|
||||||
|
if !cr.protocolEngine {
|
||||||
|
cr.ACNUp = false
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if cr.acnProgress == 100 && !cr.ACNUp {
|
||||||
|
cr.ACNUp = true
|
||||||
|
cr.ACNUpTime = time.Now()
|
||||||
|
cr.connections.Range(func(k, v interface{}) bool {
|
||||||
|
p := v.(*contact)
|
||||||
|
p.failedCount = 0
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
} else if cr.acnProgress != 100 {
|
||||||
|
cr.ACNUp = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (cr *contactRetry) requeueReady() {
|
func (cr *contactRetry) requeueReady() {
|
||||||
if !cr.ACNUp {
|
if !cr.ACNUp {
|
||||||
return
|
return
|
||||||
|
|
|
@ -1336,7 +1336,7 @@ func (cp *cwtchPeer) eventHandler() {
|
||||||
ci, err := cp.FetchConversationInfo(ev.Data[event.GroupServer])
|
ci, err := cp.FetchConversationInfo(ev.Data[event.GroupServer])
|
||||||
if ci == nil || err != nil {
|
if ci == nil || err != nil {
|
||||||
log.Errorf("no server connection count")
|
log.Errorf("no server connection count")
|
||||||
return
|
continue
|
||||||
}
|
}
|
||||||
cp.SetConversationAttribute(ci.ID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(lastReceivedSignature)), ev.Data[event.Signature])
|
cp.SetConversationAttribute(ci.ID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(lastReceivedSignature)), ev.Data[event.Signature])
|
||||||
conversations, err := cp.FetchConversations()
|
conversations, err := cp.FetchConversations()
|
||||||
|
|
|
@ -342,18 +342,20 @@ func (e *engine) Shutdown() {
|
||||||
// don't accept any more events...
|
// don't accept any more events...
|
||||||
e.queue.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{}))
|
e.queue.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{}))
|
||||||
e.service.Shutdown()
|
e.service.Shutdown()
|
||||||
|
|
||||||
e.shuttingDown.Store(true)
|
e.shuttingDown.Store(true)
|
||||||
|
|
||||||
e.ephemeralServicesLock.Lock()
|
e.ephemeralServicesLock.Lock()
|
||||||
defer e.ephemeralServicesLock.Unlock()
|
defer e.ephemeralServicesLock.Unlock()
|
||||||
for _, connection := range e.ephemeralServices {
|
for _, connection := range e.ephemeralServices {
|
||||||
log.Infof("shutting down ephemeral service")
|
log.Infof("shutting down ephemeral service")
|
||||||
connection.connectingLock.Lock()
|
// work around: service.shutdown() can block for a long time if it is Open()ing a new connection, putting it in a
|
||||||
connection.service.Shutdown()
|
// goroutine means we can perform this operation and let the per service shutdown in their own time or until the app exits
|
||||||
connection.connectingLock.Unlock()
|
go func() {
|
||||||
}
|
connection.connectingLock.Lock()
|
||||||
|
connection.service.Shutdown()
|
||||||
|
connection.connectingLock.Unlock()
|
||||||
|
|
||||||
|
}()
|
||||||
|
}
|
||||||
e.queue.Shutdown()
|
e.queue.Shutdown()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
this should be a struct level member,