Fix goroutine leak in Network Check Plugin + remove simpleQueue
continuous-integration/drone/push Build is passing Details
continuous-integration/drone/pr Build is failing Details

This commit is contained in:
Sarah Jamie Lewis 2022-04-14 13:18:08 -07:00
parent 9896961b40
commit 75703bf359
6 changed files with 54 additions and 67 deletions

View File

@ -1,6 +1,7 @@
package plugins
import (
"context"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol/connections"
"fmt"
@ -161,16 +162,20 @@ type TimeoutPolicy time.Duration
// by the time specified by TimeoutPolicy
func (tp *TimeoutPolicy) ExecuteAction(action func() error) error {
c := make(chan error)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(*tp))
defer cancel()
// channel is buffered- this is important!
c := make(chan error, 1)
go func() {
// this write is non-blocking as this goroutine has sole access to the channel
c <- action()
}()
tick := time.NewTicker(time.Duration(*tp))
select {
case <-tick.C:
return fmt.Errorf("ActionTimedOutError")
case err := <-c:
return err
case <-ctx.Done():
return fmt.Errorf("ActionTimedOutError")
}
}

View File

@ -0,0 +1,42 @@
package plugins
import (
"runtime"
"testing"
"time"
)
// Test timeout policy, checking for goroutine leaks in addition to successful timeouts
func TestTimeoutPolicy(t *testing.T) {
gonumStart := runtime.NumGoroutine()
tp := TimeoutPolicy(time.Second)
// test with timeout
err := tp.ExecuteAction(func() error {
time.Sleep(time.Second * 2)
return nil
})
if err == nil {
t.Fatalf("timeout should have occurred")
}
// test without timeout
err = tp.ExecuteAction(func() error {
return nil
})
if err != nil {
t.Fatalf("timeout should not have occurred")
}
// wait for gorutine clean up
time.Sleep(time.Second * 4)
// final check
gonumEnd := runtime.NumGoroutine()
if gonumStart != gonumEnd {
t.Fatalf("goroutine leak in execute action")
}
}

View File

@ -10,12 +10,6 @@ type queue struct {
closed bool
}
type simpleQueue struct {
eventChannel chan Event
lock sync.Mutex
closed bool
}
// Queue is a wrapper around a channel for handling Events in a consistent way across subsystems.
// The expectation is that each subsystem in Cwtch will manage a given an event.Queue fed from
// the event.Manager.
@ -33,49 +27,6 @@ func NewQueue() Queue {
return queue
}
// NewSimpleQueue initializes an event.Queue of the given buffer size.
func NewSimpleQueue(buffer int) Queue {
queue := new(simpleQueue)
queue.eventChannel = make(chan Event, buffer)
return queue
}
func (sq *simpleQueue) inChan() chan<- Event {
return sq.eventChannel
}
func (sq *simpleQueue) OutChan() <-chan Event {
return sq.eventChannel
}
// Backlog returns the length of the queue backlog
func (sq *simpleQueue) Len() int {
return len(sq.eventChannel)
}
// Next returns the next available event from the front of the queue
func (sq *simpleQueue) Next() Event {
event := <-sq.eventChannel
return event
}
// Shutdown closes our eventChannel
func (sq *simpleQueue) Shutdown() {
sq.lock.Lock()
sq.closed = true
close(sq.eventChannel)
sq.lock.Unlock()
}
// Shutdown closes our eventChannel
func (sq *simpleQueue) Publish(event Event) {
sq.lock.Lock()
if !sq.closed {
sq.inChan() <- event
}
sq.lock.Unlock()
}
func (iq *queue) inChan() chan<- Event {
return iq.infChan.In()
}

View File

@ -2,7 +2,6 @@ package event
import (
"git.openprivacy.ca/openprivacy/log"
"sync"
"testing"
"time"
)
@ -13,7 +12,7 @@ func TestEventManager(t *testing.T) {
// We need to make this buffer at least 1, otherwise we will log an error!
testChan := make(chan Event, 1)
simpleQueue := &simpleQueue{testChan, sync.Mutex{}, false}
simpleQueue := NewQueue()
eventManager.Subscribe("TEST", simpleQueue)
eventManager.Publish(Event{EventType: "TEST", Data: map[Field]string{"Value": "Hello World"}})
@ -27,17 +26,6 @@ func TestEventManager(t *testing.T) {
eventManager.Shutdown()
}
// Most basic Manager Test, Initialize, Subscribe, Publish, Receive
func TestEventManagerOverflow(t *testing.T) {
eventManager := NewEventManager()
// Explicitly setting this to 0 log an error!
testChan := make(chan Event)
simpleQueue := &simpleQueue{testChan, sync.Mutex{}, false}
eventManager.Subscribe("TEST", simpleQueue)
eventManager.Publish(Event{EventType: "TEST"})
}
func TestEventManagerMultiple(t *testing.T) {
log.SetLevel(log.LevelDebug)
eventManager := NewEventManager()

View File

@ -17,7 +17,7 @@ gofmt -l -s -w .
# ineffassign (https://github.com/gordonklaus/ineffassign)
echo "Checking for ineffectual assignment of errors (unchecked errors...)"
ineffassign ./...
ineffassign ./..
# misspell (https://github.com/client9/misspell/cmd/misspell)
echo "Checking for misspelled words..."

View File

@ -3,6 +3,7 @@
set -e
pwd
GORACE="haltonerror=1"
go test -race ${1} -coverprofile=plugins.cover.out -v ./app/plugins
go test -race ${1} -coverprofile=model.cover.out -v ./model
go test -race ${1} -coverprofile=event.cover.out -v ./event
go test -race ${1} -coverprofile=storage.v1.cover.out -v ./storage/v1