forked from cwtch.im/cwtch
logic and thread safety fixes
This commit is contained in:
parent
c79b083bbc
commit
f3399195ab
|
@ -10,12 +10,18 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
// NetworkCheckError is a status for when the NetworkCheck Plugin has had an error making an out going connection indicating it may be offline
|
||||
const NetworkCheckError = "Error"
|
||||
|
||||
// NetworkCheckSuccess is a status for when the NetworkCheck Plugin has had a successful message from a peer, indicating it is online right now
|
||||
const NetworkCheckSuccess = "Success"
|
||||
|
||||
// networkCheck is a convenience plugin for testing high level availability of onion services
|
||||
type networkCheck struct {
|
||||
bus event.Manager
|
||||
queue event.Queue
|
||||
acn connectivity.ACN
|
||||
onionsToCheck []string
|
||||
onionsToCheck sync.Map // onion:string => true:bool
|
||||
breakChan chan bool
|
||||
running bool
|
||||
offline bool
|
||||
|
@ -34,12 +40,15 @@ func (nc *networkCheck) Start() {
|
|||
|
||||
func (nc *networkCheck) run() {
|
||||
nc.running = true
|
||||
nc.offline = true
|
||||
nc.bus.Subscribe(event.ProtocolEngineStartListen, nc.queue)
|
||||
nc.bus.Subscribe(event.NewMessageFromPeer, nc.queue)
|
||||
nc.bus.Subscribe(event.PeerAcknowledgement, nc.queue)
|
||||
nc.bus.Subscribe(event.EncryptedGroupMessage, nc.queue)
|
||||
nc.bus.Subscribe(event.PeerStateChange, nc.queue)
|
||||
nc.bus.Subscribe(event.ServerStateChange, nc.queue)
|
||||
nc.bus.Subscribe(event.NewGetValMessageFromPeer, nc.queue)
|
||||
nc.bus.Subscribe(event.NewRetValMessageFromPeer, nc.queue)
|
||||
var lastMessageReceived time.Time
|
||||
for {
|
||||
select {
|
||||
|
@ -53,17 +62,23 @@ func (nc *networkCheck) run() {
|
|||
// under normal operating circumstances
|
||||
case event.ProtocolEngineStartListen:
|
||||
log.Debugf("initiating connection check for %v", e.Data[event.Onion])
|
||||
nc.onionsToCheck = append(nc.onionsToCheck, e.Data[event.Onion])
|
||||
nc.onionsToCheck.Store(e.Data[event.Onion], true)
|
||||
if time.Now().Sub(lastMessageReceived) > time.Minute {
|
||||
nc.selfTest()
|
||||
}
|
||||
case event.PeerStateChange:
|
||||
fallthrough
|
||||
case event.ServerStateChange:
|
||||
// if we successfully connect / authenticated to a remote server / peer then we obviously have internet
|
||||
connectionState := e.Data[event.ConnectionState]
|
||||
nc.offlineLock.Lock()
|
||||
if nc.offline && (connectionState == connections.ConnectionStateName[connections.AUTHENTICATED] || connectionState == connections.ConnectionStateName[connections.CONNECTED]) {
|
||||
if connectionState == connections.ConnectionStateName[connections.AUTHENTICATED] || connectionState == connections.ConnectionStateName[connections.CONNECTED] {
|
||||
lastMessageReceived = time.Now()
|
||||
nc.bus.Publish(event.NewEvent(event.NetworkStatus, map[event.Field]string{event.Error: "", event.Status: "Success"}))
|
||||
nc.offline = false
|
||||
|
||||
if nc.offline {
|
||||
nc.bus.Publish(event.NewEvent(event.NetworkStatus, map[event.Field]string{event.Error: "", event.Status: NetworkCheckSuccess}))
|
||||
nc.offline = false
|
||||
}
|
||||
}
|
||||
nc.offlineLock.Unlock()
|
||||
default:
|
||||
|
@ -74,7 +89,7 @@ func (nc *networkCheck) run() {
|
|||
lastMessageReceived = time.Now()
|
||||
nc.offlineLock.Lock()
|
||||
if nc.offline {
|
||||
nc.bus.Publish(event.NewEvent(event.NetworkStatus, map[event.Field]string{event.Error: "", event.Status: "Success"}))
|
||||
nc.bus.Publish(event.NewEvent(event.NetworkStatus, map[event.Field]string{event.Error: "", event.Status: NetworkCheckSuccess}))
|
||||
nc.offline = false
|
||||
}
|
||||
nc.offlineLock.Unlock()
|
||||
|
@ -82,9 +97,7 @@ func (nc *networkCheck) run() {
|
|||
case <-time.After(tickTime):
|
||||
// if we haven't received an action in the last minute...kick off a set of testing
|
||||
if time.Now().Sub(lastMessageReceived) > time.Minute {
|
||||
for _, onion := range nc.onionsToCheck {
|
||||
go nc.checkConnection(onion)
|
||||
}
|
||||
nc.selfTest()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -98,8 +111,20 @@ func (nc *networkCheck) Shutdown() {
|
|||
}
|
||||
}
|
||||
|
||||
func (nc *networkCheck) selfTest() {
|
||||
nc.onionsToCheck.Range(func(key, val interface{}) bool {
|
||||
go nc.checkConnection(key.(string))
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
//
|
||||
func (nc *networkCheck) checkConnection(onion string) {
|
||||
prog, _ := nc.acn.GetBootstrapStatus()
|
||||
if prog != 100 {
|
||||
return
|
||||
}
|
||||
|
||||
// we want to definitively time these actions out faster than tor will, because these onions should definitely be
|
||||
// online
|
||||
ClientTimeout := TimeoutPolicy(time.Second * 60)
|
||||
|
@ -114,13 +139,13 @@ func (nc *networkCheck) checkConnection(onion string) {
|
|||
defer nc.offlineLock.Unlock()
|
||||
// regardless of the outcome we want to report a status to let anyone who might care know that we did do a check
|
||||
if err != nil {
|
||||
log.Debugf("publishing network error for %v", onion)
|
||||
nc.bus.Publish(event.NewEvent(event.NetworkStatus, map[event.Field]string{event.Onion: onion, event.Error: err.Error(), event.Status: "Error"}))
|
||||
nc.offline = false
|
||||
log.Debugf("publishing network error for %v -- %v\n", onion, err)
|
||||
nc.bus.Publish(event.NewEvent(event.NetworkStatus, map[event.Field]string{event.Onion: onion, event.Error: err.Error(), event.Status: NetworkCheckError}))
|
||||
nc.offline = true
|
||||
} else {
|
||||
log.Debugf("publishing network success for %v", onion)
|
||||
nc.bus.Publish(event.NewEvent(event.NetworkStatus, map[event.Field]string{event.Onion: onion, event.Error: "", event.Status: "Success"}))
|
||||
nc.offline = true
|
||||
nc.bus.Publish(event.NewEvent(event.NetworkStatus, map[event.Field]string{event.Onion: onion, event.Error: "", event.Status: NetworkCheckSuccess}))
|
||||
nc.offline = false
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue