2019-08-14 20:56:45 +00:00
|
|
|
package event
|
|
|
|
|
2019-11-12 20:39:55 +00:00
|
|
|
import "sync"
|
|
|
|
|
2019-08-14 20:56:45 +00:00
|
|
|
type queue struct {
|
|
|
|
infChan infiniteChannel
|
2019-11-12 20:39:55 +00:00
|
|
|
lock sync.Mutex
|
|
|
|
closed bool
|
2019-08-14 20:56:45 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type simpleQueue struct {
|
|
|
|
eventChannel chan Event
|
2019-11-12 20:39:55 +00:00
|
|
|
lock sync.Mutex
|
|
|
|
closed bool
|
2019-08-14 20:56:45 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Queue is a wrapper around a channel for handling Events in a consistent way across subsystems.
|
|
|
|
// The expectation is that each subsystem in Cwtch will manage a given an event.Queue fed from
|
|
|
|
// the event.Manager.
|
|
|
|
type Queue interface {
|
2019-11-12 20:39:55 +00:00
|
|
|
Publish(event Event)
|
2019-08-14 20:56:45 +00:00
|
|
|
Next() *Event
|
|
|
|
Shutdown()
|
2019-11-12 20:39:55 +00:00
|
|
|
OutChan() <-chan Event
|
2019-08-14 20:56:45 +00:00
|
|
|
Len() int
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewQueue initializes an event.Queue
|
|
|
|
func NewQueue() Queue {
|
|
|
|
queue := &queue{infChan: *newInfiniteChannel()}
|
|
|
|
return queue
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewSimpleQueue initializes an event.Queue of the given buffer size.
|
|
|
|
func NewSimpleQueue(buffer int) Queue {
|
|
|
|
queue := new(simpleQueue)
|
|
|
|
queue.eventChannel = make(chan Event, buffer)
|
|
|
|
return queue
|
|
|
|
}
|
|
|
|
|
2019-11-12 20:39:55 +00:00
|
|
|
func (sq *simpleQueue) inChan() chan<- Event {
|
2019-08-14 20:56:45 +00:00
|
|
|
return sq.eventChannel
|
|
|
|
}
|
|
|
|
|
|
|
|
func (sq *simpleQueue) OutChan() <-chan Event {
|
|
|
|
return sq.eventChannel
|
|
|
|
}
|
|
|
|
|
|
|
|
// Backlog returns the length of the queue backlog
|
|
|
|
func (sq *simpleQueue) Len() int {
|
|
|
|
return len(sq.eventChannel)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Next returns the next available event from the front of the queue
|
|
|
|
func (sq *simpleQueue) Next() *Event {
|
|
|
|
event := <-sq.eventChannel
|
|
|
|
return &event
|
|
|
|
}
|
|
|
|
|
|
|
|
// Shutdown closes our eventChannel
|
|
|
|
func (sq *simpleQueue) Shutdown() {
|
2019-11-12 20:39:55 +00:00
|
|
|
sq.lock.Lock()
|
|
|
|
sq.closed = true
|
2019-08-14 20:56:45 +00:00
|
|
|
close(sq.eventChannel)
|
2019-11-12 20:39:55 +00:00
|
|
|
sq.lock.Unlock()
|
2019-08-14 20:56:45 +00:00
|
|
|
}
|
|
|
|
|
2019-11-12 20:39:55 +00:00
|
|
|
// Shutdown closes our eventChannel
|
|
|
|
func (sq *simpleQueue) Publish(event Event) {
|
|
|
|
sq.lock.Lock()
|
|
|
|
if !sq.closed {
|
|
|
|
sq.inChan() <- event
|
|
|
|
}
|
|
|
|
sq.lock.Unlock()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iq *queue) inChan() chan<- Event {
|
2019-08-14 20:56:45 +00:00
|
|
|
return iq.infChan.In()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iq *queue) OutChan() <-chan Event {
|
|
|
|
return iq.infChan.Out()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Out returns the next available event from the front of the queue
|
|
|
|
func (iq *queue) Next() *Event {
|
|
|
|
event := <-iq.infChan.Out()
|
|
|
|
return &event
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iq *queue) Len() int {
|
|
|
|
return iq.infChan.Len()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Shutdown closes our eventChannel
|
|
|
|
func (iq *queue) Shutdown() {
|
2019-11-12 20:39:55 +00:00
|
|
|
iq.lock.Lock()
|
|
|
|
iq.closed = true
|
2019-08-14 20:56:45 +00:00
|
|
|
iq.infChan.Close()
|
2019-11-12 20:39:55 +00:00
|
|
|
iq.lock.Unlock()
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iq *queue) Publish(event Event) {
|
|
|
|
iq.lock.Lock()
|
|
|
|
if !iq.closed {
|
|
|
|
iq.inChan() <- event
|
|
|
|
}
|
|
|
|
iq.lock.Unlock()
|
2019-08-14 20:56:45 +00:00
|
|
|
}
|