forked from cwtch.im/cwtch
1
0
Fork 0
cwtch/event/eventQueue.go

81 lines
1.7 KiB
Go

package event
// queue is the Queue implementation that delegates every operation to an
// infiniteChannel (declared elsewhere in this package).
type queue struct {
// infChan provides the In/Out channels, Len and Close used by queue's methods.
// NOTE(review): presumably an unbounded buffer, judging by its name — confirm
// against the infiniteChannel implementation.
infChan infiniteChannel
}
// simpleQueue is the Queue implementation backed directly by a single Go
// channel of Events.
type simpleQueue struct {
// eventChannel is both the send side (InChan) and the receive side
// (OutChan, Next) of the queue.
eventChannel chan Event
}
// Queue is a wrapper around a channel for handling Events in a consistent way across subsystems.
// The expectation is that each subsystem in Cwtch will manage a given event.Queue fed from
// the event.Manager.
type Queue interface {
// InChan returns the send-only channel on which Events are put into the queue.
InChan() chan<- Event
// OutChan returns the receive-only channel from which Events are taken off the queue.
OutChan() <-chan Event
// Next receives the next Event from the queue and returns a pointer to it.
Next() *Event
// Shutdown closes the channel(s) underlying the queue.
Shutdown()
// Len returns the current length of the queue.
Len() int
}
// NewQueue creates an event.Queue whose channels are provided by a freshly
// constructed infiniteChannel.
func NewQueue() Queue {
	return &queue{infChan: *newInfiniteChannel()}
}
// NewSimpleQueue creates an event.Queue backed by a single channel of Events
// whose buffer capacity is the given buffer size.
func NewSimpleQueue(buffer int) Queue {
	return &simpleQueue{eventChannel: make(chan Event, buffer)}
}
// InChan returns the queue's event channel restricted to sending.
func (sq *simpleQueue) InChan() chan<- Event {
	var in chan<- Event = sq.eventChannel
	return in
}
// OutChan returns the queue's event channel restricted to receiving.
func (sq *simpleQueue) OutChan() <-chan Event {
	var out <-chan Event = sq.eventChannel
	return out
}
// Len returns the number of Events currently buffered in the queue's channel.
func (sq *simpleQueue) Len() int {
	backlog := len(sq.eventChannel)
	return backlog
}
// Next blocks until an Event can be received from the front of the queue and
// returns a pointer to a copy of it. Once the channel has been closed and
// drained, the receive yields the zero Event, so the result then points at a
// zero value rather than being nil.
func (sq *simpleQueue) Next() *Event {
	next := new(Event)
	*next = <-sq.eventChannel
	return next
}
// Shutdown closes the queue's event channel. As with any Go channel, closing
// it a second time or sending on it afterwards panics.
func (sq *simpleQueue) Shutdown() {
	ch := sq.eventChannel
	close(ch)
}
// InChan returns the send-only channel obtained from the underlying
// infiniteChannel's In method.
func (iq *queue) InChan() chan<- Event {
	in := iq.infChan.In()
	return in
}
// OutChan returns the receive-only channel obtained from the underlying
// infiniteChannel's Out method.
func (iq *queue) OutChan() <-chan Event {
	out := iq.infChan.Out()
	return out
}
// Next blocks until an Event can be received from the underlying
// infiniteChannel's Out channel and returns a pointer to a copy of it.
func (iq *queue) Next() *Event {
	next := new(Event)
	*next = <-iq.infChan.Out()
	return next
}
// Len returns the length reported by the underlying infiniteChannel.
func (iq *queue) Len() int {
	n := iq.infChan.Len()
	return n
}
// Shutdown closes the queue by calling Close on the underlying infiniteChannel.
func (iq *queue) Shutdown() {
	inf := &iq.infChan
	inf.Close()
}