Merge pull request 'logic and thread safety fixes' (#333) from dan/cwtch:networkCheck into master
the build was successful Details

Reviewed-on: #333
This commit is contained in:
Sarah Jamie Lewis 2020-11-20 15:55:21 -08:00
commit c6d45626fa
1 changed files with 39 additions and 14 deletions

View File

@ -10,12 +10,18 @@ import (
"time" "time"
) )
// NetworkCheckError is a status for when the NetworkCheck Plugin has had an error making an out going connection indicating it may be offline
const NetworkCheckError = "Error"
// NetworkCheckSuccess is a status for when the NetworkCheck Plugin has had a successful message from a peer, indicating it is online right now
const NetworkCheckSuccess = "Success"
// networkCheck is a convenience plugin for testing high level availability of onion services // networkCheck is a convenience plugin for testing high level availability of onion services
type networkCheck struct { type networkCheck struct {
bus event.Manager bus event.Manager
queue event.Queue queue event.Queue
acn connectivity.ACN acn connectivity.ACN
onionsToCheck []string onionsToCheck sync.Map // onion:string => true:bool
breakChan chan bool breakChan chan bool
running bool running bool
offline bool offline bool
@ -34,12 +40,15 @@ func (nc *networkCheck) Start() {
func (nc *networkCheck) run() { func (nc *networkCheck) run() {
nc.running = true nc.running = true
nc.offline = true
nc.bus.Subscribe(event.ProtocolEngineStartListen, nc.queue) nc.bus.Subscribe(event.ProtocolEngineStartListen, nc.queue)
nc.bus.Subscribe(event.NewMessageFromPeer, nc.queue) nc.bus.Subscribe(event.NewMessageFromPeer, nc.queue)
nc.bus.Subscribe(event.PeerAcknowledgement, nc.queue) nc.bus.Subscribe(event.PeerAcknowledgement, nc.queue)
nc.bus.Subscribe(event.EncryptedGroupMessage, nc.queue) nc.bus.Subscribe(event.EncryptedGroupMessage, nc.queue)
nc.bus.Subscribe(event.PeerStateChange, nc.queue) nc.bus.Subscribe(event.PeerStateChange, nc.queue)
nc.bus.Subscribe(event.ServerStateChange, nc.queue) nc.bus.Subscribe(event.ServerStateChange, nc.queue)
nc.bus.Subscribe(event.NewGetValMessageFromPeer, nc.queue)
nc.bus.Subscribe(event.NewRetValMessageFromPeer, nc.queue)
var lastMessageReceived time.Time var lastMessageReceived time.Time
for { for {
select { select {
@ -53,17 +62,23 @@ func (nc *networkCheck) run() {
// under normal operating circumstances // under normal operating circumstances
case event.ProtocolEngineStartListen: case event.ProtocolEngineStartListen:
log.Debugf("initiating connection check for %v", e.Data[event.Onion]) log.Debugf("initiating connection check for %v", e.Data[event.Onion])
nc.onionsToCheck = append(nc.onionsToCheck, e.Data[event.Onion]) nc.onionsToCheck.Store(e.Data[event.Onion], true)
if time.Now().Sub(lastMessageReceived) > time.Minute {
nc.selfTest()
}
case event.PeerStateChange: case event.PeerStateChange:
fallthrough fallthrough
case event.ServerStateChange: case event.ServerStateChange:
// if we successfully connect / authenticated to a remote server / peer then we obviously have internet // if we successfully connect / authenticated to a remote server / peer then we obviously have internet
connectionState := e.Data[event.ConnectionState] connectionState := e.Data[event.ConnectionState]
nc.offlineLock.Lock() nc.offlineLock.Lock()
if nc.offline && (connectionState == connections.ConnectionStateName[connections.AUTHENTICATED] || connectionState == connections.ConnectionStateName[connections.CONNECTED]) { if connectionState == connections.ConnectionStateName[connections.AUTHENTICATED] || connectionState == connections.ConnectionStateName[connections.CONNECTED] {
lastMessageReceived = time.Now() lastMessageReceived = time.Now()
nc.bus.Publish(event.NewEvent(event.NetworkStatus, map[event.Field]string{event.Error: "", event.Status: "Success"}))
nc.offline = false if nc.offline {
nc.bus.Publish(event.NewEvent(event.NetworkStatus, map[event.Field]string{event.Error: "", event.Status: NetworkCheckSuccess}))
nc.offline = false
}
} }
nc.offlineLock.Unlock() nc.offlineLock.Unlock()
default: default:
@ -74,7 +89,7 @@ func (nc *networkCheck) run() {
lastMessageReceived = time.Now() lastMessageReceived = time.Now()
nc.offlineLock.Lock() nc.offlineLock.Lock()
if nc.offline { if nc.offline {
nc.bus.Publish(event.NewEvent(event.NetworkStatus, map[event.Field]string{event.Error: "", event.Status: "Success"})) nc.bus.Publish(event.NewEvent(event.NetworkStatus, map[event.Field]string{event.Error: "", event.Status: NetworkCheckSuccess}))
nc.offline = false nc.offline = false
} }
nc.offlineLock.Unlock() nc.offlineLock.Unlock()
@ -82,9 +97,7 @@ func (nc *networkCheck) run() {
case <-time.After(tickTime): case <-time.After(tickTime):
// if we haven't received an action in the last minute...kick off a set of testing // if we haven't received an action in the last minute...kick off a set of testing
if time.Now().Sub(lastMessageReceived) > time.Minute { if time.Now().Sub(lastMessageReceived) > time.Minute {
for _, onion := range nc.onionsToCheck { nc.selfTest()
go nc.checkConnection(onion)
}
} }
} }
} }
@ -98,8 +111,20 @@ func (nc *networkCheck) Shutdown() {
} }
} }
func (nc *networkCheck) selfTest() {
nc.onionsToCheck.Range(func(key, val interface{}) bool {
go nc.checkConnection(key.(string))
return true
})
}
// //
func (nc *networkCheck) checkConnection(onion string) { func (nc *networkCheck) checkConnection(onion string) {
prog, _ := nc.acn.GetBootstrapStatus()
if prog != 100 {
return
}
// we want to definitively time these actions out faster than tor will, because these onions should definitely be // we want to definitively time these actions out faster than tor will, because these onions should definitely be
// online // online
ClientTimeout := TimeoutPolicy(time.Second * 60) ClientTimeout := TimeoutPolicy(time.Second * 60)
@ -114,13 +139,13 @@ func (nc *networkCheck) checkConnection(onion string) {
defer nc.offlineLock.Unlock() defer nc.offlineLock.Unlock()
// regardless of the outcome we want to report a status to let anyone who might care know that we did do a check // regardless of the outcome we want to report a status to let anyone who might care know that we did do a check
if err != nil { if err != nil {
log.Debugf("publishing network error for %v", onion) log.Debugf("publishing network error for %v -- %v\n", onion, err)
nc.bus.Publish(event.NewEvent(event.NetworkStatus, map[event.Field]string{event.Onion: onion, event.Error: err.Error(), event.Status: "Error"})) nc.bus.Publish(event.NewEvent(event.NetworkStatus, map[event.Field]string{event.Onion: onion, event.Error: err.Error(), event.Status: NetworkCheckError}))
nc.offline = false nc.offline = true
} else { } else {
log.Debugf("publishing network success for %v", onion) log.Debugf("publishing network success for %v", onion)
nc.bus.Publish(event.NewEvent(event.NetworkStatus, map[event.Field]string{event.Onion: onion, event.Error: "", event.Status: "Success"})) nc.bus.Publish(event.NewEvent(event.NetworkStatus, map[event.Field]string{event.Onion: onion, event.Error: "", event.Status: NetworkCheckSuccess}))
nc.offline = true nc.offline = false
} }
} }