package event import ( "sync" ) type queue struct { infChan infiniteChannel lock sync.Mutex closed bool } type simpleQueue struct { eventChannel chan Event lock sync.Mutex closed bool } // Queue is a wrapper around a channel for handling Events in a consistent way across subsystems. // The expectation is that each subsystem in Cwtch will manage a given an event.Queue fed from // the event.Manager. type Queue interface { Publish(event Event) Next() Event Shutdown() OutChan() <-chan Event Len() int } // NewQueue initializes an event.Queue func NewQueue() Queue { queue := &queue{infChan: *newInfiniteChannel()} return queue } // NewSimpleQueue initializes an event.Queue of the given buffer size. func NewSimpleQueue(buffer int) Queue { queue := new(simpleQueue) queue.eventChannel = make(chan Event, buffer) return queue } func (sq *simpleQueue) inChan() chan<- Event { return sq.eventChannel } func (sq *simpleQueue) OutChan() <-chan Event { return sq.eventChannel } // Backlog returns the length of the queue backlog func (sq *simpleQueue) Len() int { return len(sq.eventChannel) } // Next returns the next available event from the front of the queue func (sq *simpleQueue) Next() Event { event := <-sq.eventChannel return event } // Shutdown closes our eventChannel func (sq *simpleQueue) Shutdown() { sq.lock.Lock() sq.closed = true close(sq.eventChannel) sq.lock.Unlock() } // Shutdown closes our eventChannel func (sq *simpleQueue) Publish(event Event) { sq.lock.Lock() if !sq.closed { sq.inChan() <- event } sq.lock.Unlock() } func (iq *queue) inChan() chan<- Event { return iq.infChan.In() } func (iq *queue) OutChan() <-chan Event { return iq.infChan.Out() } // Out returns the next available event from the front of the queue func (iq *queue) Next() Event { event := <-iq.infChan.Out() return event } func (iq *queue) Len() int { return iq.infChan.Len() } // Shutdown closes our eventChannel func (iq *queue) Shutdown() { iq.lock.Lock() iq.closed = true iq.infChan.Close() iq.lock.Unlock() } func (iq *queue) Publish(event Event) { iq.lock.Lock() if !iq.closed { iq.inChan() <- event } iq.lock.Unlock() }