forked from cwtch.im/cwtch
1
0
Fork 0
cwtch/event/eventQueue.go

111 lines
2.2 KiB
Go

package event
import "sync"
// queue is the Queue implementation that moves events through an infiniteChannel.
type queue struct {
// infChan is the channel wrapper events are published to and read from.
infChan infiniteChannel
// lock is held by Publish and Shutdown while they read or write closed.
lock sync.Mutex
// closed is set by Shutdown; Publish skips the send once it is true.
closed bool
}
// simpleQueue is the Queue implementation that moves events through a single
// buffered Go channel.
type simpleQueue struct {
// eventChannel carries the events; NewSimpleQueue creates it with the requested buffer size.
eventChannel chan Event
// lock is held by Publish and Shutdown while they read or write closed.
lock sync.Mutex
// closed is set by Shutdown; Publish skips the send once it is true.
closed bool
}
// Queue is a wrapper around a channel for handling Events in a consistent way across subsystems.
// The expectation is that each subsystem in Cwtch will manage a given event.Queue fed from
// the event.Manager.
type Queue interface {
// Publish offers an event to the queue. The implementations in this file
// drop the event once Shutdown has been called.
Publish(event Event)
// Next receives the next event from the queue and returns a pointer to it.
Next() *Event
// Shutdown closes the queue's underlying channel.
Shutdown()
// OutChan exposes the receive-only side of the queue's underlying channel.
OutChan() <-chan Event
// Len reports the length of the queue's underlying channel.
Len() int
}
// NewQueue initializes an event.Queue whose events travel through a newly
// created infiniteChannel.
func NewQueue() Queue {
	return &queue{infChan: *newInfiniteChannel()}
}
// NewSimpleQueue initializes an event.Queue backed by a channel that buffers
// up to buffer events.
func NewSimpleQueue(buffer int) Queue {
	return &simpleQueue{eventChannel: make(chan Event, buffer)}
}
// inChan exposes the send-only side of eventChannel.
func (sq *simpleQueue) inChan() chan<- Event {
	var in chan<- Event = sq.eventChannel
	return in
}
// OutChan exposes the receive-only side of eventChannel.
func (sq *simpleQueue) OutChan() <-chan Event {
	var out <-chan Event = sq.eventChannel
	return out
}
// Len returns the number of events currently buffered in eventChannel.
func (sq *simpleQueue) Len() int {
	backlog := len(sq.eventChannel)
	return backlog
}
// Next blocks until an event can be received from eventChannel and returns a
// pointer to a copy of it. After the channel has been closed and drained the
// receive yields the zero Event, so the result is a pointer to a zero-value
// Event rather than nil.
func (sq *simpleQueue) Next() *Event {
	next := new(Event)
	*next = <-sq.eventChannel
	return next
}
// Shutdown marks the queue as closed and closes eventChannel. It is safe to
// call more than once: only the first call closes the channel, later calls
// return without doing anything.
func (sq *simpleQueue) Shutdown() {
	sq.lock.Lock()
	defer sq.lock.Unlock()

	// Closing an already-closed channel panics, so a repeated Shutdown must
	// not reach close again.
	if sq.closed {
		return
	}
	sq.closed = true
	close(sq.eventChannel)
}
// Publish sends event on the queue's channel unless Shutdown has already been
// called, in which case the event is silently dropped. The lock is held across
// the send, so a send that blocks on a full buffer also holds up other Publish
// and Shutdown calls until a receiver makes room.
func (sq *simpleQueue) Publish(event Event) {
	sq.lock.Lock()
	defer sq.lock.Unlock()

	if sq.closed {
		return
	}
	sq.inChan() <- event
}
// inChan exposes the send-only channel returned by infChan.In.
func (iq *queue) inChan() chan<- Event {
	in := iq.infChan.In()
	return in
}
// OutChan exposes the receive-only channel returned by infChan.Out.
func (iq *queue) OutChan() <-chan Event {
	out := iq.infChan.Out()
	return out
}
// Next blocks until an event can be received from infChan.Out and returns a
// pointer to a copy of it.
func (iq *queue) Next() *Event {
	next := new(Event)
	*next = <-iq.infChan.Out()
	return next
}
// Len returns the length reported by infChan.Len.
func (iq *queue) Len() int {
	backlog := iq.infChan.Len()
	return backlog
}
// Shutdown marks the queue as closed and calls Close on infChan. It is safe to
// call more than once: only the first call reaches infChan.Close, later calls
// return without doing anything.
func (iq *queue) Shutdown() {
	iq.lock.Lock()
	defer iq.lock.Unlock()

	// NOTE(review): infChan.Close presumably closes a Go channel, which panics
	// when repeated; guarding on closed keeps a second Shutdown harmless
	// either way.
	if iq.closed {
		return
	}
	iq.closed = true
	iq.infChan.Close()
}
// Publish sends event on the queue's input channel unless Shutdown has already
// been called, in which case the event is silently dropped. The lock is held
// for the duration of the send.
func (iq *queue) Publish(event Event) {
	iq.lock.Lock()
	defer iq.lock.Unlock()

	if iq.closed {
		return
	}
	iq.inChan() <- event
}