Fix goroutine leak in Network Check Plugin + remove simpleQueue #441
|
@ -1,6 +1,7 @@
|
||||||
package plugins
|
package plugins
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"cwtch.im/cwtch/event"
|
"cwtch.im/cwtch/event"
|
||||||
"cwtch.im/cwtch/protocol/connections"
|
"cwtch.im/cwtch/protocol/connections"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -161,16 +162,20 @@ type TimeoutPolicy time.Duration
|
||||||
// by the time specified by TimeoutPolicy
|
// by the time specified by TimeoutPolicy
|
||||||
func (tp *TimeoutPolicy) ExecuteAction(action func() error) error {
|
func (tp *TimeoutPolicy) ExecuteAction(action func() error) error {
|
||||||
|
|
||||||
c := make(chan error)
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(*tp))
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// channel is buffered- this is important!
|
||||||
|
c := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
|
// this write is non-blocking as this goroutine has sole access to the channel
|
||||||
c <- action()
|
c <- action()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
tick := time.NewTicker(time.Duration(*tp))
|
|
||||||
select {
|
select {
|
||||||
case <-tick.C:
|
|
||||||
return fmt.Errorf("ActionTimedOutError")
|
|
||||||
case err := <-c:
|
case err := <-c:
|
||||||
return err
|
return err
|
||||||
|
case <-ctx.Done():
|
||||||
|
return fmt.Errorf("ActionTimedOutError")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
package plugins
|
||||||
|
|
||||||
|
import (
|
||||||
|
"runtime"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Test timeout policy, checking for goroutine leaks in addition to successful timeouts
|
||||||
|
func TestTimeoutPolicy(t *testing.T) {
|
||||||
|
|
||||||
|
gonumStart := runtime.NumGoroutine()
|
||||||
|
|
||||||
|
tp := TimeoutPolicy(time.Second)
|
||||||
|
|
||||||
|
// test with timeout
|
||||||
|
err := tp.ExecuteAction(func() error {
|
||||||
|
time.Sleep(time.Second * 2)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("timeout should have occurred")
|
||||||
|
}
|
||||||
|
|
||||||
|
// test without timeout
|
||||||
|
err = tp.ExecuteAction(func() error {
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("timeout should not have occurred")
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for gorutine clean up
|
||||||
|
time.Sleep(time.Second * 4)
|
||||||
|
|
||||||
|
// final check
|
||||||
|
gonumEnd := runtime.NumGoroutine()
|
||||||
|
if gonumStart != gonumEnd {
|
||||||
|
t.Fatalf("goroutine leak in execute action")
|
||||||
|
}
|
||||||
|
}
|
|
@ -10,12 +10,6 @@ type queue struct {
|
||||||
closed bool
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type simpleQueue struct {
|
|
||||||
eventChannel chan Event
|
|
||||||
lock sync.Mutex
|
|
||||||
closed bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// Queue is a wrapper around a channel for handling Events in a consistent way across subsystems.
|
// Queue is a wrapper around a channel for handling Events in a consistent way across subsystems.
|
||||||
// The expectation is that each subsystem in Cwtch will manage a given an event.Queue fed from
|
// The expectation is that each subsystem in Cwtch will manage a given an event.Queue fed from
|
||||||
// the event.Manager.
|
// the event.Manager.
|
||||||
|
@ -33,49 +27,6 @@ func NewQueue() Queue {
|
||||||
return queue
|
return queue
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSimpleQueue initializes an event.Queue of the given buffer size.
|
|
||||||
func NewSimpleQueue(buffer int) Queue {
|
|
||||||
queue := new(simpleQueue)
|
|
||||||
queue.eventChannel = make(chan Event, buffer)
|
|
||||||
return queue
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sq *simpleQueue) inChan() chan<- Event {
|
|
||||||
return sq.eventChannel
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sq *simpleQueue) OutChan() <-chan Event {
|
|
||||||
return sq.eventChannel
|
|
||||||
}
|
|
||||||
|
|
||||||
// Backlog returns the length of the queue backlog
|
|
||||||
func (sq *simpleQueue) Len() int {
|
|
||||||
return len(sq.eventChannel)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Next returns the next available event from the front of the queue
|
|
||||||
func (sq *simpleQueue) Next() Event {
|
|
||||||
event := <-sq.eventChannel
|
|
||||||
return event
|
|
||||||
}
|
|
||||||
|
|
||||||
// Shutdown closes our eventChannel
|
|
||||||
func (sq *simpleQueue) Shutdown() {
|
|
||||||
sq.lock.Lock()
|
|
||||||
sq.closed = true
|
|
||||||
close(sq.eventChannel)
|
|
||||||
sq.lock.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Shutdown closes our eventChannel
|
|
||||||
func (sq *simpleQueue) Publish(event Event) {
|
|
||||||
sq.lock.Lock()
|
|
||||||
if !sq.closed {
|
|
||||||
sq.inChan() <- event
|
|
||||||
}
|
|
||||||
sq.lock.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (iq *queue) inChan() chan<- Event {
|
func (iq *queue) inChan() chan<- Event {
|
||||||
return iq.infChan.In()
|
return iq.infChan.In()
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ package event
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"git.openprivacy.ca/openprivacy/log"
|
"git.openprivacy.ca/openprivacy/log"
|
||||||
"sync"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
@ -12,12 +11,11 @@ func TestEventManager(t *testing.T) {
|
||||||
eventManager := NewEventManager()
|
eventManager := NewEventManager()
|
||||||
|
|
||||||
// We need to make this buffer at least 1, otherwise we will log an error!
|
// We need to make this buffer at least 1, otherwise we will log an error!
|
||||||
testChan := make(chan Event, 1)
|
simpleQueue := NewQueue()
|
||||||
simpleQueue := &simpleQueue{testChan, sync.Mutex{}, false}
|
|
||||||
eventManager.Subscribe("TEST", simpleQueue)
|
eventManager.Subscribe("TEST", simpleQueue)
|
||||||
eventManager.Publish(Event{EventType: "TEST", Data: map[Field]string{"Value": "Hello World"}})
|
eventManager.Publish(Event{EventType: "TEST", Data: map[Field]string{"Value": "Hello World"}})
|
||||||
|
|
||||||
event := <-testChan
|
event := simpleQueue.Next()
|
||||||
if event.EventType == "TEST" && event.Data["Value"] == "Hello World" {
|
if event.EventType == "TEST" && event.Data["Value"] == "Hello World" {
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
@ -27,17 +25,6 @@ func TestEventManager(t *testing.T) {
|
||||||
eventManager.Shutdown()
|
eventManager.Shutdown()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Most basic Manager Test, Initialize, Subscribe, Publish, Receive
|
|
||||||
func TestEventManagerOverflow(t *testing.T) {
|
|
||||||
eventManager := NewEventManager()
|
|
||||||
|
|
||||||
// Explicitly setting this to 0 log an error!
|
|
||||||
testChan := make(chan Event)
|
|
||||||
simpleQueue := &simpleQueue{testChan, sync.Mutex{}, false}
|
|
||||||
eventManager.Subscribe("TEST", simpleQueue)
|
|
||||||
eventManager.Publish(Event{EventType: "TEST"})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestEventManagerMultiple(t *testing.T) {
|
func TestEventManagerMultiple(t *testing.T) {
|
||||||
log.SetLevel(log.LevelDebug)
|
log.SetLevel(log.LevelDebug)
|
||||||
eventManager := NewEventManager()
|
eventManager := NewEventManager()
|
||||||
|
|
|
@ -17,7 +17,7 @@ gofmt -l -s -w .
|
||||||
|
|
||||||
# ineffassign (https://github.com/gordonklaus/ineffassign)
|
# ineffassign (https://github.com/gordonklaus/ineffassign)
|
||||||
echo "Checking for ineffectual assignment of errors (unchecked errors...)"
|
echo "Checking for ineffectual assignment of errors (unchecked errors...)"
|
||||||
ineffassign ./...
|
ineffassign ./..
|
||||||
|
|
||||||
# misspell (https://github.com/client9/misspell/cmd/misspell)
|
# misspell (https://github.com/client9/misspell/cmd/misspell)
|
||||||
echo "Checking for misspelled words..."
|
echo "Checking for misspelled words..."
|
||||||
|
|
|
@ -3,6 +3,7 @@
|
||||||
set -e
|
set -e
|
||||||
pwd
|
pwd
|
||||||
GORACE="haltonerror=1"
|
GORACE="haltonerror=1"
|
||||||
|
go test -race ${1} -coverprofile=plugins.cover.out -v ./app/plugins
|
||||||
go test -race ${1} -coverprofile=model.cover.out -v ./model
|
go test -race ${1} -coverprofile=model.cover.out -v ./model
|
||||||
go test -race ${1} -coverprofile=event.cover.out -v ./event
|
go test -race ${1} -coverprofile=event.cover.out -v ./event
|
||||||
go test -race ${1} -coverprofile=storage.v1.cover.out -v ./storage/v1
|
go test -race ${1} -coverprofile=storage.v1.cover.out -v ./storage/v1
|
||||||
|
|
Loading…
Reference in New Issue