Browse Source

Adding Network Status Plugin + Fixing plugin goroutine leaks

pull/280/head
Sarah Jamie Lewis 1 month ago
parent
commit
99ea31ce82
11 changed files with 161 additions and 12 deletions
  1. 3
    2
      app/app.go
  2. 1
    1
      app/appClient.go
  3. 4
    2
      app/appService.go
  4. 17
    5
      app/applets.go
  5. 97
    0
      app/plugins/networkCheck.go
  6. 5
    1
      app/plugins/plugin.go
  7. 10
    0
      event/common.go
  8. 6
    0
      go.mod
  9. 14
    0
      go.sum
  10. 1
    1
      peer/cwtch_peer.go
  11. 3
    0
      testing/cwtch_peer_server_integration_test.go

+ 3
- 2
app/app.go View File

@@ -119,7 +119,7 @@ func (app *application) CreatePeer(name string, password string) {
}

func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) {
app.AddPlugin(onion, pluginID, app.eventBuses[onion])
app.AddPlugin(onion, pluginID, app.eventBuses[onion], app.acn)
}

// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
@@ -220,15 +220,16 @@ func (app *application) ShutdownPeer(onion string) {
delete(app.engines, onion)
app.storage[onion].Shutdown()
delete(app.storage, onion)
app.appletPlugins.Shutdown()
}

// Shutdown shutsdown all peers of an app and then the tormanager
func (app *application) Shutdown() {
for id, peer := range app.peers {
peer.Shutdown()
app.appletPlugins.ShutdownPeer(id)
app.engines[id].Shutdown()
app.storage[id].Shutdown()
app.appletPlugins.Shutdown()
app.eventBuses[id].Shutdown()
}
app.appBus.Shutdown()

+ 1
- 1
app/appClient.go View File

@@ -123,7 +123,7 @@ func (ac *applicationClient) ShutdownPeer(onion string) {
ac.bridge.Write(&message)
}

// Shutdown shuts down the application lcienr and all front end peer components
// Shutdown shuts down the application client and all front end peer components
func (ac *applicationClient) Shutdown() {
for id := range ac.peers {
ac.ShutdownPeer(id)

+ 4
- 2
app/appService.go View File

@@ -50,7 +50,7 @@ func (as *applicationService) handleEvent(ev *event.Event) {
case event.AddPeerPlugin:
onion := ev.Data[event.Identity]
pluginID, _ := strconv.Atoi(ev.Data[event.Data])
as.AddPlugin(onion, plugins.PluginID(pluginID), as.eventBuses[onion])
as.AddPlugin(onion, plugins.PluginID(pluginID), as.eventBuses[onion], as.acn)
case event.LoadProfiles:
password := ev.Data[event.Password]
as.loadProfiles(password)
@@ -147,8 +147,10 @@ func (as *applicationService) ShutdownPeer(onion string) {

// Shutdown shuts down the application Service and all peer related backend parts
func (as *applicationService) Shutdown() {
log.Debugf("shutting down application service...")
as.appletPlugins.Shutdown()
for id := range as.engines {
as.appletPlugins.Shutdown()
log.Debugf("shutting down application service peer engine %v", id)
as.ShutdownPeer(id)
}
}

+ 17
- 5
app/applets.go View File

@@ -77,16 +77,27 @@ func (ap *appletPeers) GetPeer(onion string) peer.CwtchPeer {
// ***** applet Plugins

func (ap *appletPlugins) Shutdown() {
log.Debugf("shutting down applet plugins...")
ap.plugins.Range(func(k, v interface{}) bool {
plugins := v.([]plugins.Plugin)
log.Debugf("shutting down plugins for %v", k)
ap.ShutdownPeer(k.(string))
return true
})
}

func (ap *appletPlugins) ShutdownPeer(peerid string) {
log.Debugf("shutting down plugins for %v", peerid)
pluginsI, ok := ap.plugins.Load(peerid)
if ok {
plugins := pluginsI.([]plugins.Plugin)
for _, plugin := range plugins {
log.Debugf("shutting down plugin: %v", plugin)
plugin.Shutdown()
}
return true
})
}
}

func (ap *appletPlugins) AddPlugin(peerid string, id plugins.PluginID, bus event.Manager) {
func (ap *appletPlugins) AddPlugin(peerid string, id plugins.PluginID, bus event.Manager, acn connectivity.ACN) {
if _, exists := ap.plugins.Load(peerid); !exists {
ap.plugins.Store(peerid, []plugins.Plugin{})
}
@@ -94,8 +105,9 @@ func (ap *appletPlugins) AddPlugin(peerid string, id plugins.PluginID, bus event
pluginsinf, _ := ap.plugins.Load(peerid)
peerPlugins := pluginsinf.([]plugins.Plugin)

newp := plugins.Get(id, bus)
newp := plugins.Get(id, bus, acn)
newp.Start()
peerPlugins = append(peerPlugins, newp)
log.Debugf("storing plugin for %v %v", peerid, peerPlugins)
ap.plugins.Store(peerid, peerPlugins)
}

+ 97
- 0
app/plugins/networkCheck.go View File

@@ -0,0 +1,97 @@
package plugins

import (
"cwtch.im/cwtch/event"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"git.openprivacy.ca/openprivacy/libricochet-go/policies"
"time"
)

// networkCheck is a convenience plugin for testing high level availability of onion services
type networkCheck struct {
bus event.Manager
queue event.Queue
acn connectivity.ACN
onionsToCheck []string
breakChan chan bool
running bool
}

// NewNetworkCheck returns a Plugin that when started will attempt various network tests
func NewNetworkCheck(bus event.Manager, acn connectivity.ACN) Plugin {
nc := &networkCheck{bus: bus, acn: acn, queue: event.NewQueue(), breakChan: make(chan bool, 1)}
return nc
}

func (nc *networkCheck) Start() {
go nc.run()
}

func (nc *networkCheck) run() {
nc.running = true
nc.bus.Subscribe(event.ProtocolEngineStartListen, nc.queue)
nc.bus.Subscribe(event.NewMessageFromPeer, nc.queue)
nc.bus.Subscribe(event.PeerAcknowledgement, nc.queue)
nc.bus.Subscribe(event.EncryptedGroupMessage, nc.queue)
var lastMessageReceived time.Time
for {
select {
case <-nc.breakChan:
nc.running = false
return
case e := <-nc.queue.OutChan():
switch e.EventType {
// On receipt of a Listen request for an onion service we will add the onion to our list
// and then we will wait a minute and check the connection for the first time (the onion should be up)
// under normal operating circumstances
case event.ProtocolEngineStartListen:
log.Debugf("initiating connection check for %v", e.Data[event.Onion])
nc.onionsToCheck = append(nc.onionsToCheck, e.Data[event.Onion])
default:
// if we receive either an encrypted group message or a peer acknowledgement we can assume the network
// is up and running (our onion service might still not be available, but we would aim to detect that
// through other actions
// we reset out timer
lastMessageReceived = time.Now()
}
case <-time.After(tickTime):
// if we haven't received an action in the last minute...kick off a set of testing
if time.Now().Sub(lastMessageReceived) > time.Minute {
for _, onion := range nc.onionsToCheck {
go nc.checkConnection(onion)
}
}
}
}
}

func (nc *networkCheck) Shutdown() {
if nc.running {
nc.queue.Shutdown()
log.Debugf("shutting down network status plugin")
nc.breakChan <- true
}
}

//
func (nc *networkCheck) checkConnection(onion string) {
// we want to definitively time these actions out faster than tor will, because these onions should definitely be
// online
ClientTimeout := policies.TimeoutPolicy(time.Second * 60)
err := ClientTimeout.ExecuteAction(func() error {
conn, _, err := nc.acn.Open(onion)
if err == nil {
conn.Close()
}
return err
})
// regardless of the outcome we want to report a status to let anyone who might care know that we did do a check
if err != nil {
log.Debugf("publishing network error for %v", onion)
nc.bus.Publish(event.NewEvent(event.NetworkStatus, map[event.Field]string{event.Onion: onion, event.Error: err.Error(), event.Status: "Error"}))
} else {
log.Debugf("publishing network success for %v", onion)
nc.bus.Publish(event.NewEvent(event.NetworkStatus, map[event.Field]string{event.Onion: onion, event.Error: "", event.Status: "Success"}))
}
}

+ 5
- 1
app/plugins/plugin.go View File

@@ -2,6 +2,7 @@ package plugins

import (
"cwtch.im/cwtch/event"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
)

// PluginID is used as an ID for signaling plugin activities
@@ -10,6 +11,7 @@ type PluginID int
// These are the plugin IDs for the supplied plugins
const (
CONNECTIONRETRY PluginID = iota
NETWORKCHECK
)

// Plugin is the interface for a plugin
@@ -19,10 +21,12 @@ type Plugin interface {
}

// Get is a plugin factory for the requested plugin
func Get(id PluginID, bus event.Manager) Plugin {
func Get(id PluginID, bus event.Manager, acn connectivity.ACN) Plugin {
switch id {
case CONNECTIONRETRY:
return NewConnectionRetry(bus)
case NETWORKCHECK:
return NewNetworkCheck(bus, acn)
}

return nil

+ 10
- 0
event/common.go View File

@@ -165,6 +165,12 @@ const (

// Progress, Status
ACNStatus = Type("ACNStatus")

// Network Status
// Status: Success || Error
// Error: Description of the Error
// Onion: the local onion we attempt to check
NetworkStatus = Type("NetworkError")
)

// Field defines common event attributes
@@ -172,6 +178,10 @@ type Field string

// Defining Common Field Types
const (

// A peers local onion address
Onion = Field("Onion")

RemotePeer = Field("RemotePeer")
Ciphertext = Field("Ciphertext")
Signature = Field("Signature")

+ 6
- 0
go.mod View File

@@ -4,12 +4,18 @@ require (
cwtch.im/tapir v0.1.11
git.openprivacy.ca/openprivacy/libricochet-go v1.0.6
github.com/c-bata/go-prompt v0.2.3
github.com/client9/misspell v0.3.4 // indirect
github.com/golang/protobuf v1.3.2
github.com/gordonklaus/ineffassign v0.0.0-20190601041439-ed7b1b5ee0f8 // indirect
github.com/mattn/go-colorable v0.1.2 // indirect
github.com/mattn/go-runewidth v0.0.4 // indirect
github.com/mattn/go-tty v0.0.0-20190424173100-523744f04859 // indirect
github.com/pkg/term v0.0.0-20190109203006-aa71e9d9e942 // indirect
github.com/struCoder/pidusage v0.1.2
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4
golang.org/x/lint v0.0.0-20190930215403-16217165b5de // indirect
golang.org/x/net v0.0.0-20190628185345-da137c7871d7
golang.org/x/tools v0.0.0-20191031220737-6d8f1af9ccc0 // indirect
)

go 1.13

+ 14
- 0
go.sum View File

@@ -10,6 +10,8 @@ github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 h1:w1UutsfOrms1J05zt7I
github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412/go.mod h1:WPjqKcmVOxf0XSf3YxCJs6N6AOSrOx3obionmG7T0y0=
github.com/c-bata/go-prompt v0.2.3 h1:jjCS+QhG/sULBhAaBdjb2PlMRVaKXQgn+4yzaauvs2s=
github.com/c-bata/go-prompt v0.2.3/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cretz/bine v0.1.0 h1:1/fvhLE+fk0bPzjdO5Ci+0ComYxEMuB1JhM4X5skT3g=
github.com/cretz/bine v0.1.0/go.mod h1:6PF6fWAvYtwjRGkAuDEJeWNOv3a2hUouSP/yRYXmvHw=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
@@ -18,6 +20,8 @@ github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gordonklaus/ineffassign v0.0.0-20190601041439-ed7b1b5ee0f8 h1:ehVe1P3MbhHjeN/Rn66N2fGLrP85XXO1uxpLhv0jtX8=
github.com/gordonklaus/ineffassign v0.0.0-20190601041439-ed7b1b5ee0f8/go.mod h1:cuNKsD1zp2v6XfE/orVX2QE1LC+i254ceGcVeDT3pTU=
github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE=
@@ -41,15 +45,25 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3 h1:ulvT7fqt0yHWzpJwI57MezWnYDVpCAYBVuYst/L+fAY=
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190628185345-da137c7871d7 h1:rTIdg5QFRR7XCaK4LCjBiPbx8j4DQRpdYMnGn/bJUEU=
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd h1:/e+gpKk9r3dJobndpTytxS2gOy6m5uvpg+ISQoEcusQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191031220737-6d8f1af9ccc0 h1:+o3suEKE/4hCUt6qjV8SDcVZhz2dO8UWlHliCa+4bvg=
golang.org/x/tools v0.0.0-20191031220737-6d8f1af9ccc0/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

+ 1
- 1
peer/cwtch_peer.go View File

@@ -320,7 +320,7 @@ func (cp *cwtchPeer) RejectInvite(groupID string) {
// Listen makes the peer open a listening port to accept incoming connections (and be detactably online)
func (cp *cwtchPeer) Listen() {
log.Debugf("cwtchPeer Listen sending ProtocolEngineStartListen\n")
cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{}))
cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{event.Onion: cp.Profile.Onion}))
}

// StartGroupConnections attempts to connect to all group servers (thus initiating reconnect attempts in the conectionsmanager)

+ 3
- 0
testing/cwtch_peer_server_integration_test.go View File

@@ -2,6 +2,7 @@ package testing

import (
app2 "cwtch.im/cwtch/app"
"cwtch.im/cwtch/app/plugins"
"cwtch.im/cwtch/app/utils"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/event/bridge"
@@ -163,6 +164,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
appClient.CreatePeer("carol", "asdfasdf")

alice := utils.WaitGetPeer(app, "alice")
app.AddPeerPlugin(alice.GetProfile().Onion, plugins.NETWORKCHECK)
fmt.Println("Alice created:", alice.GetProfile().Onion)
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite})

@@ -409,6 +411,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
fmt.Println("Shutting down ACN...")
acn.Close()
time.Sleep(time.Second * 2) // Server ^^ has a 5 second loop attempting reconnect before exiting
time.Sleep(time.Second * 30) // the network status plugin might keep goroutines alive for a minute before killing them
numGoRoutinesPostACN := runtime.NumGoroutine()

// Printing out the current goroutines

Loading…
Cancel
Save