diff --git a/app/plugins/networkCheck.go b/app/plugins/networkCheck.go index 27598b8..3aec64b 100644 --- a/app/plugins/networkCheck.go +++ b/app/plugins/networkCheck.go @@ -1,6 +1,7 @@ package plugins import ( + "context" "cwtch.im/cwtch/event" "cwtch.im/cwtch/protocol/connections" "fmt" @@ -161,16 +162,20 @@ type TimeoutPolicy time.Duration // by the time specified by TimeoutPolicy func (tp *TimeoutPolicy) ExecuteAction(action func() error) error { - c := make(chan error) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(*tp)) + defer cancel() + + // channel is buffered- this is important! + c := make(chan error, 1) go func() { + // this write is non-blocking as this goroutine has sole access to the channel c <- action() }() - tick := time.NewTicker(time.Duration(*tp)) select { - case <-tick.C: - return fmt.Errorf("ActionTimedOutError") case err := <-c: return err + case <-ctx.Done(): + return fmt.Errorf("ActionTimedOutError") } } diff --git a/app/plugins/timeout_policy_test.go b/app/plugins/timeout_policy_test.go new file mode 100644 index 0000000..8f4383a --- /dev/null +++ b/app/plugins/timeout_policy_test.go @@ -0,0 +1,42 @@ +package plugins + +import ( + "runtime" + "testing" + "time" +) + +// Test timeout policy, checking for goroutine leaks in addition to successful timeouts +func TestTimeoutPolicy(t *testing.T) { + + gonumStart := runtime.NumGoroutine() + + tp := TimeoutPolicy(time.Second) + + // test with timeout + err := tp.ExecuteAction(func() error { + time.Sleep(time.Second * 2) + return nil + }) + if err == nil { + t.Fatalf("timeout should have occurred") + } + + // test without timeout + err = tp.ExecuteAction(func() error { + return nil + }) + + if err != nil { + t.Fatalf("timeout should not have occurred") + } + + // wait for gorutine clean up + time.Sleep(time.Second * 4) + + // final check + gonumEnd := runtime.NumGoroutine() + if gonumStart != gonumEnd { + t.Fatalf("goroutine leak in execute action") + } +} diff --git a/event/eventQueue.go b/event/eventQueue.go index a338d9d..8871ada 100644 --- a/event/eventQueue.go +++ b/event/eventQueue.go @@ -10,12 +10,6 @@ type queue struct { closed bool } -type simpleQueue struct { - eventChannel chan Event - lock sync.Mutex - closed bool -} - // Queue is a wrapper around a channel for handling Events in a consistent way across subsystems. // The expectation is that each subsystem in Cwtch will manage a given an event.Queue fed from // the event.Manager. @@ -33,49 +27,6 @@ func NewQueue() Queue { return queue } -// NewSimpleQueue initializes an event.Queue of the given buffer size. -func NewSimpleQueue(buffer int) Queue { - queue := new(simpleQueue) - queue.eventChannel = make(chan Event, buffer) - return queue -} - -func (sq *simpleQueue) inChan() chan<- Event { - return sq.eventChannel -} - -func (sq *simpleQueue) OutChan() <-chan Event { - return sq.eventChannel -} - -// Backlog returns the length of the queue backlog -func (sq *simpleQueue) Len() int { - return len(sq.eventChannel) -} - -// Next returns the next available event from the front of the queue -func (sq *simpleQueue) Next() Event { - event := <-sq.eventChannel - return event -} - -// Shutdown closes our eventChannel -func (sq *simpleQueue) Shutdown() { - sq.lock.Lock() - sq.closed = true - close(sq.eventChannel) - sq.lock.Unlock() -} - -// Shutdown closes our eventChannel -func (sq *simpleQueue) Publish(event Event) { - sq.lock.Lock() - if !sq.closed { - sq.inChan() <- event - } - sq.lock.Unlock() -} - func (iq *queue) inChan() chan<- Event { return iq.infChan.In() } diff --git a/event/eventmanager_test.go b/event/eventmanager_test.go index 3111267..5d7f7c5 100644 --- a/event/eventmanager_test.go +++ b/event/eventmanager_test.go @@ -2,7 +2,6 @@ package event import ( "git.openprivacy.ca/openprivacy/log" - "sync" "testing" "time" ) @@ -13,7 +12,7 @@ func TestEventManager(t *testing.T) { // We need to make this buffer at least 1, otherwise we will log an error! testChan := make(chan Event, 1) - simpleQueue := &simpleQueue{testChan, sync.Mutex{}, false} + simpleQueue := NewQueue() eventManager.Subscribe("TEST", simpleQueue) eventManager.Publish(Event{EventType: "TEST", Data: map[Field]string{"Value": "Hello World"}}) @@ -27,17 +26,6 @@ func TestEventManager(t *testing.T) { eventManager.Shutdown() } -// Most basic Manager Test, Initialize, Subscribe, Publish, Receive -func TestEventManagerOverflow(t *testing.T) { - eventManager := NewEventManager() - - // Explicitly setting this to 0 log an error! - testChan := make(chan Event) - simpleQueue := &simpleQueue{testChan, sync.Mutex{}, false} - eventManager.Subscribe("TEST", simpleQueue) - eventManager.Publish(Event{EventType: "TEST"}) -} - func TestEventManagerMultiple(t *testing.T) { log.SetLevel(log.LevelDebug) eventManager := NewEventManager() diff --git a/testing/quality.sh b/testing/quality.sh index 43e7dda..5f6427f 100755 --- a/testing/quality.sh +++ b/testing/quality.sh @@ -17,7 +17,7 @@ gofmt -l -s -w . # ineffassign (https://github.com/gordonklaus/ineffassign) echo "Checking for ineffectual assignment of errors (unchecked errors...)" -ineffassign ./... +ineffassign ./.. # misspell (https://github.com/client9/misspell/cmd/misspell) echo "Checking for misspelled words..." diff --git a/testing/tests.sh b/testing/tests.sh index 4c0c958..bfd80a0 100755 --- a/testing/tests.sh +++ b/testing/tests.sh @@ -3,6 +3,7 @@ set -e pwd GORACE="haltonerror=1" +go test -race ${1} -coverprofile=plugins.cover.out -v ./app/plugins go test -race ${1} -coverprofile=model.cover.out -v ./model go test -race ${1} -coverprofile=event.cover.out -v ./event go test -race ${1} -coverprofile=storage.v1.cover.out -v ./storage/v1