Browse Source

make event.Queue use internal infinite channels; make event.Manager not use failable writes

pull/270/head
Dan Ballard 3 months ago
parent
commit
bd75e44555

+ 3
- 3
app/cli/main.go View File

@@ -209,9 +209,9 @@ func completer(d prompt.Document) []prompt.Suggest {
}

func handleAppEvents(em event.Manager) {
queue := event.NewEventQueue(100)
em.Subscribe(event.NewPeer, queue.EventChannel)
em.Subscribe(event.PeerError, queue.EventChannel)
queue := event.NewQueue()
em.Subscribe(event.NewPeer, queue)
em.Subscribe(event.PeerError, queue)

for {
ev := queue.Next()

+ 2
- 2
app/peer/alice/alice.go View File

@@ -27,8 +27,8 @@ func main() {
alice := utils.WaitGetPeer(app, "alice")
app.LaunchPeers()
eventBus := app.GetEventBus(alice.GetProfile().Onion)
queue := event.NewEventQueue(100)
eventBus.Subscribe(event.NewMessageFromPeer, queue.EventChannel)
queue := event.NewQueue()
eventBus.Subscribe(event.NewMessageFromPeer, queue)

// For every new Data Packet Alice received she will Print it out.
for {

+ 4
- 4
app/plugins/contactRetry.go View File

@@ -20,7 +20,7 @@ type peer struct {

type contactRetry struct {
bus event.Manager
queue *event.Queue
queue event.Queue

breakChan chan bool

@@ -29,7 +29,7 @@ type contactRetry struct {

// NewContactRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing
func NewContactRetry(bus event.Manager) Plugin {
cr := &contactRetry{bus: bus, queue: event.NewEventQueue(1000), breakChan: make(chan bool), peers: sync.Map{}}
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool), peers: sync.Map{}}
return cr
}

@@ -38,10 +38,10 @@ func (cr *contactRetry) Start() {
}

func (cr *contactRetry) run() {
cr.bus.Subscribe(event.PeerStateChange, cr.queue.EventChannel)
cr.bus.Subscribe(event.PeerStateChange, cr.queue)
for {
select {
case e := <-cr.queue.EventChannel:
case e := <-cr.queue.OutChan():
switch e.EventType {
case event.PeerStateChange:
state := connections.ConnectionStateToType[e.Data[event.ConnectionState]]

+ 0
- 31
event/EventQueue.go View File

@@ -1,31 +0,0 @@
package event

// Queue is a wrapper around a channel for handling Events in a consistent way across subsystems.
// The expectation is that each subsystem in Cwtch will manage a given an event.Queue fed from
// the event.Manager.
type Queue struct {
EventChannel chan Event
}

// NewEventQueue initializes an event.Queue of the given buffer size.
func NewEventQueue(buffer int) *Queue {
queue := new(Queue)
queue.EventChannel = make(chan Event, buffer)
return queue
}

// Backlog returns the length of the queue backlog
func (eq *Queue) Backlog() int {
return len(eq.EventChannel)
}

// Next returns the next available event from the front of the queue
func (eq *Queue) Next() (event Event) {
event = <-eq.EventChannel
return
}

// Shutdown closes our EventChannel
func (eq *Queue) Shutdown() {
close(eq.EventChannel)
}

+ 5
- 2
event/common.go View File

@@ -33,9 +33,12 @@ const (
// GroupID
AcceptGroupInvite = Type("AcceptGroupInvite")

SendMessageToGroup = Type("SendMessagetoGroup")
SendMessageToGroup = Type("SendMessagetoGroup")

//Ciphertext, Signature:
EncryptedGroupMessage = Type("EncryptedGroupMessage")
NewMessageFromGroup = Type("NewMessageFromGroup")
//TimestampReceived, TimestampSent, Data(Message), GroupID, Signature, PreviousSignature, RemotePeer
NewMessageFromGroup = Type("NewMessageFromGroup")

// an error was encountered trying to send a particular Message to a group
// attributes:

+ 80
- 0
event/eventQueue.go View File

@@ -0,0 +1,80 @@
package event

type queue struct {
infChan infiniteChannel
}

type simpleQueue struct {
eventChannel chan Event
}

// Queue is a wrapper around a channel for handling Events in a consistent way across subsystems.
// The expectation is that each subsystem in Cwtch will manage a given an event.Queue fed from
// the event.Manager.
type Queue interface {
InChan() chan<- Event
OutChan() <-chan Event
Next() *Event
Shutdown()
Len() int
}

// NewQueue initializes an event.Queue
func NewQueue() Queue {
queue := &queue{infChan: *newInfiniteChannel()}
return queue
}

// NewSimpleQueue initializes an event.Queue of the given buffer size.
func NewSimpleQueue(buffer int) Queue {
queue := new(simpleQueue)
queue.eventChannel = make(chan Event, buffer)
return queue
}

func (sq *simpleQueue) InChan() chan<- Event {
return sq.eventChannel
}

func (sq *simpleQueue) OutChan() <-chan Event {
return sq.eventChannel
}

// Backlog returns the length of the queue backlog
func (sq *simpleQueue) Len() int {
return len(sq.eventChannel)
}

// Next returns the next available event from the front of the queue
func (sq *simpleQueue) Next() *Event {
event := <-sq.eventChannel
return &event
}

// Shutdown closes our eventChannel
func (sq *simpleQueue) Shutdown() {
close(sq.eventChannel)
}

func (iq *queue) InChan() chan<- Event {
return iq.infChan.In()
}

func (iq *queue) OutChan() <-chan Event {
return iq.infChan.Out()
}

// Out returns the next available event from the front of the queue
func (iq *queue) Next() *Event {
event := <-iq.infChan.Out()
return &event
}

func (iq *queue) Len() int {
return iq.infChan.Len()
}

// Shutdown closes our eventChannel
func (iq *queue) Shutdown() {
iq.infChan.Close()
}

+ 6
- 13
event/eventmanager.go View File

@@ -1,7 +1,6 @@
package event

import (
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"git.openprivacy.ca/openprivacy/libricochet-go/utils"
"sync"
)
@@ -33,7 +32,7 @@ func NewEventList(eventType Type, args ...interface{}) Event {

// Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others.
type manager struct {
subscribers map[Type][]chan Event
subscribers map[Type][]chan<- Event
events chan Event
mapMutex sync.Mutex
internal chan bool
@@ -42,7 +41,7 @@ type manager struct {

// Manager is an interface for an event bus
type Manager interface {
Subscribe(Type, chan Event)
Subscribe(Type, Queue)
Publish(Event)
PublishLocal(Event)
Shutdown()
@@ -57,7 +56,7 @@ func NewEventManager() Manager {

// Initialize sets up the Manager.
func (em *manager) initialize() {
em.subscribers = make(map[Type][]chan Event)
em.subscribers = make(map[Type][]chan<- Event)
em.events = make(chan Event)
em.internal = make(chan bool)
em.closed = false
@@ -66,10 +65,10 @@ func (em *manager) initialize() {

// Subscribe takes an eventType and an Channel and associates them in the eventBus. All future events of that type
// will be sent to the eventChannel.
func (em *manager) Subscribe(eventType Type, eventChannel chan Event) {
func (em *manager) Subscribe(eventType Type, queue Queue) {
em.mapMutex.Lock()
defer em.mapMutex.Unlock()
em.subscribers[eventType] = append(em.subscribers[eventType], eventChannel)
em.subscribers[eventType] = append(em.subscribers[eventType], queue.InChan())
}

// Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers
@@ -101,13 +100,7 @@ func (em *manager) eventBus() {

// Send the event to any subscribers to that event type
for _, subscriber := range subscribers {
select {
case subscriber <- event:
log.Debugf("Sending %v to %v", event.EventType, subscriber)
default:
log.Errorf("Failed to send %v to %v. The subsystem might be running too slow!", event.EventType, subscriber)
}

subscriber <- event
}
}


+ 15
- 13
event/eventmanager_test.go View File

@@ -12,7 +12,8 @@ func TestEventManager(t *testing.T) {

// We need to make this buffer at least 1, otherwise we will log an error!
testChan := make(chan Event, 1)
eventManager.Subscribe("TEST", testChan)
simpleQueue := &simpleQueue{testChan}
eventManager.Subscribe("TEST", simpleQueue)
eventManager.Publish(Event{EventType: "TEST", Data: map[Field]string{"Value": "Hello World"}})

event := <-testChan
@@ -31,7 +32,8 @@ func TestEventManagerOverflow(t *testing.T) {

// Explicitly setting this to 0 log an error!
testChan := make(chan Event)
eventManager.Subscribe("TEST", testChan)
simpleQueue := &simpleQueue{testChan}
eventManager.Subscribe("TEST", simpleQueue)
eventManager.Publish(Event{EventType: "TEST"})
}

@@ -39,15 +41,15 @@ func TestEventManagerMultiple(t *testing.T) {
log.SetLevel(log.LevelDebug)
eventManager := NewEventManager()

groupEventQueue := NewEventQueue(10)
peerEventQueue := NewEventQueue(10)
allEventQueue := NewEventQueue(10)
groupEventQueue := NewQueue()
peerEventQueue := NewQueue()
allEventQueue := NewQueue()

eventManager.Subscribe("PeerEvent", peerEventQueue.EventChannel)
eventManager.Subscribe("GroupEvent", groupEventQueue.EventChannel)
eventManager.Subscribe("PeerEvent", allEventQueue.EventChannel)
eventManager.Subscribe("GroupEvent", allEventQueue.EventChannel)
eventManager.Subscribe("ErrorEvent", allEventQueue.EventChannel)
eventManager.Subscribe("PeerEvent", peerEventQueue)
eventManager.Subscribe("GroupEvent", groupEventQueue)
eventManager.Subscribe("PeerEvent", allEventQueue)
eventManager.Subscribe("GroupEvent", allEventQueue)
eventManager.Subscribe("ErrorEvent", allEventQueue)

eventManager.Publish(Event{EventType: "PeerEvent", Data: map[Field]string{"Value": "Hello World Peer"}})
eventManager.Publish(Event{EventType: "GroupEvent", Data: map[Field]string{"Value": "Hello World Group"}})
@@ -63,9 +65,9 @@ func TestEventManagerMultiple(t *testing.T) {

time.Sleep(time.Second)

assertLength(groupEventQueue.Backlog(), 1, "Group Event Queue Length")
assertLength(peerEventQueue.Backlog(), 2, "Peer Event Queue Length")
assertLength(allEventQueue.Backlog(), 4, "All Event Queue Length")
assertLength(groupEventQueue.Len(), 1, "Group Event Queue Length")
assertLength(peerEventQueue.Len(), 2, "Peer Event Queue Length")
assertLength(allEventQueue.Len(), 4, "All Event Queue Length")

checkEvent := func(eventType Type, expected Type, label string) {
if eventType != expected {

+ 2
- 2
event/eventmanageripc.go View File

@@ -29,8 +29,8 @@ func (ipcm *ipcManager) PublishLocal(ev Event) {
ipcm.manager.Publish(ev)
}

func (ipcm *ipcManager) Subscribe(eventType Type, eventChan chan Event) {
ipcm.manager.Subscribe(eventType, eventChan)
func (ipcm *ipcManager) Subscribe(eventType Type, queue Queue) {
ipcm.manager.Subscribe(eventType, queue)
}

func (ipcm *ipcManager) Shutdown() {

+ 73
- 0
event/infinitechannel.go View File

@@ -0,0 +1,73 @@
package event

/*
This package is taken from https://github.com/eapache/channels
as per their suggestion we are not importing the entire package and instead cherry picking and adapting what is needed

It is covered by the MIT License https://github.com/eapache/channels/blob/master/LICENSE
*/

// infiniteChannel implements the Channel interface with an infinite buffer between the input and the output.
type infiniteChannel struct {
input, output chan Event
length chan int
buffer *infiniteQueue
}

func newInfiniteChannel() *infiniteChannel {
ch := &infiniteChannel{
input: make(chan Event),
output: make(chan Event),
length: make(chan int),
buffer: newInfinitQueue(),
}
go ch.infiniteBuffer()
return ch
}

func (ch *infiniteChannel) In() chan<- Event {
return ch.input
}

func (ch *infiniteChannel) Out() <-chan Event {
return ch.output
}

func (ch *infiniteChannel) Len() int {
return <-ch.length
}

func (ch *infiniteChannel) Close() {
close(ch.input)
}

func (ch *infiniteChannel) infiniteBuffer() {
var input, output chan Event
var next Event
input = ch.input

for input != nil || output != nil {
select {
case elem, open := <-input:
if open {
ch.buffer.Add(elem)
} else {
input = nil
}
case output <- next:
ch.buffer.Remove()
case ch.length <- ch.buffer.Length():
}

if ch.buffer.Length() > 0 {
output = ch.output
next = ch.buffer.Peek()
} else {
output = nil
//next = nil
}
}

close(ch.output)
close(ch.length)
}

+ 108
- 0
event/infinitequeue.go View File

@@ -0,0 +1,108 @@
package event

/*
This package is taken from https://github.com/eapache/channels
as per their suggestion we are not importing the entire package and instead cherry picking and adapting what is needed

It is covered by the MIT License https://github.com/eapache/channels/blob/master/LICENSE
*/
/*
Package queue provides a fast, ring-buffer queue based on the version suggested by Dariusz Górecki.
Using this instead of other, simpler, queue implementations (slice+append or linked list) provides
substantial memory and time benefits, and fewer GC pauses.
The queue implemented here is as fast as it is for an additional reason: it is *not* thread-safe.
*/

// minQueueLen is smallest capacity that queue may have.
// Must be power of 2 for bitwise modulus: x % n == x & (n - 1).
const minQueueLen = 16

// Queue represents a single instance of the queue data structure.
type infiniteQueue struct {
buf []Event
head, tail, count int
}

// New constructs and returns a new Queue.
func newInfinitQueue() *infiniteQueue {
return &infiniteQueue{
buf: make([]Event, minQueueLen),
}
}

// Length returns the number of elements currently stored in the queue.
func (q *infiniteQueue) Length() int {
return q.count
}

// resizes the queue to fit exactly twice its current contents
// this can result in shrinking if the queue is less than half-full
func (q *infiniteQueue) resize() {
newBuf := make([]Event, q.count<<1)

if q.tail > q.head {
copy(newBuf, q.buf[q.head:q.tail])
} else {
n := copy(newBuf, q.buf[q.head:])
copy(newBuf[n:], q.buf[:q.tail])
}

q.head = 0
q.tail = q.count
q.buf = newBuf
}

// Add puts an element on the end of the queue.
func (q *infiniteQueue) Add(elem Event) {
if q.count == len(q.buf) {
q.resize()
}

q.buf[q.tail] = elem
// bitwise modulus
q.tail = (q.tail + 1) & (len(q.buf) - 1)
q.count++
}

// Peek returns the element at the head of the queue. This call panics
// if the queue is empty.
func (q *infiniteQueue) Peek() Event {
if q.count <= 0 {
panic("queue: Peek() called on empty queue")
}
return q.buf[q.head]
}

// Get returns the element at index i in the queue. If the index is
// invalid, the call will panic. This method accepts both positive and
// negative index values. Index 0 refers to the first element, and
// index -1 refers to the last.
func (q *infiniteQueue) Get(i int) Event {
// If indexing backwards, convert to positive index.
if i < 0 {
i += q.count
}
if i < 0 || i >= q.count {
panic("queue: Get() called with index out of range")
}
// bitwise modulus
return q.buf[(q.head+i)&(len(q.buf)-1)]
}

// Remove removes and returns the element from the front of the queue. If the
// queue is empty, the call will panic.
func (q *infiniteQueue) Remove() Event {
if q.count <= 0 {
panic("queue: Remove() called on empty queue")
}
ret := q.buf[q.head]
//q.buf[q.head] = nil
// bitwise modulus
q.head = (q.head + 1) & (len(q.buf) - 1)
q.count--
// Resize down if buffer 1/4 full.
if len(q.buf) > minQueueLen && (q.count<<2) == len(q.buf) {
q.resize()
}
return ret
}

+ 6
- 6
peer/cwtch_peer.go View File

@@ -22,7 +22,7 @@ type cwtchPeer struct {
mutex sync.Mutex
shutdown bool

queue *event.Queue
queue event.Queue
eventBus event.Manager
}

@@ -84,14 +84,14 @@ func FromProfile(profile *model.Profile) CwtchPeer {

// Init instantiates a cwtchPeer
func (cp *cwtchPeer) Init(eventBus event.Manager) {
cp.queue = event.NewEventQueue(100)
cp.queue = event.NewQueue()
go cp.eventHandler()

cp.eventBus = eventBus
cp.eventBus.Subscribe(event.EncryptedGroupMessage, cp.queue.EventChannel)
cp.eventBus.Subscribe(event.NewGroupInvite, cp.queue.EventChannel)
cp.eventBus.Subscribe(event.ServerStateChange, cp.queue.EventChannel)
cp.eventBus.Subscribe(event.PeerStateChange, cp.queue.EventChannel)
cp.eventBus.Subscribe(event.EncryptedGroupMessage, cp.queue)
cp.eventBus.Subscribe(event.NewGroupInvite, cp.queue)
cp.eventBus.Subscribe(event.ServerStateChange, cp.queue)
cp.eventBus.Subscribe(event.PeerStateChange, cp.queue)
}

// ImportGroup intializes a group from an imported source rather than a peer invite

+ 13
- 13
protocol/connections/engine.go View File

@@ -17,7 +17,7 @@ import (
)

type engine struct {
queue *event.Queue
queue event.Queue
connectionsManager *Manager

// Engine Attributes
@@ -55,7 +55,7 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
engine := new(engine)
engine.identity = identity
engine.privateKey = privateKey
engine.queue = event.NewEventQueue(100)
engine.queue = event.NewQueue()
go engine.eventHandler()

engine.acn = acn
@@ -68,17 +68,17 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK

engine.eventManager = eventManager

engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue.EventChannel)
engine.eventManager.Subscribe(event.PeerRequest, engine.queue.EventChannel)
engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue.EventChannel)
engine.eventManager.Subscribe(event.JoinServer, engine.queue.EventChannel)
engine.eventManager.Subscribe(event.SendMessageToGroup, engine.queue.EventChannel)
engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue.EventChannel)
engine.eventManager.Subscribe(event.DeleteContact, engine.queue.EventChannel)
engine.eventManager.Subscribe(event.DeleteGroup, engine.queue.EventChannel)
engine.eventManager.Subscribe(event.BlockPeer, engine.queue.EventChannel)
engine.eventManager.Subscribe(event.UnblockPeer, engine.queue.EventChannel)
engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue)
engine.eventManager.Subscribe(event.PeerRequest, engine.queue)
engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue)
engine.eventManager.Subscribe(event.JoinServer, engine.queue)
engine.eventManager.Subscribe(event.SendMessageToGroup, engine.queue)
engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue)
engine.eventManager.Subscribe(event.DeleteContact, engine.queue)
engine.eventManager.Subscribe(event.DeleteGroup, engine.queue)
engine.eventManager.Subscribe(event.BlockPeer, engine.queue)
engine.eventManager.Subscribe(event.UnblockPeer, engine.queue)
for _, peer := range blockedPeers {
engine.blocked.Store(peer, true)
}

+ 1
- 1
protocol/connections/fetch/peer_fetch_channel.go View File

@@ -98,7 +98,7 @@ func (cpfc *CwtchPeerFetchChannel) Packet(data []byte) {
gm := csp.GetGroupMessage()
// We create a new go routine here to avoid leaking any information about processing time
// TODO Server can probably try to use this to DoS a peer
go cpfc.Handler.HandleGroupMessage(gm)
cpfc.Handler.HandleGroupMessage(gm)
}
}
}

+ 18
- 17
storage/profile_store.go View File

@@ -20,7 +20,7 @@ type profileStore struct {
password string
profile *model.Profile
eventManager event.Manager
queue *event.Queue
queue event.Queue
writer bool
}

@@ -38,27 +38,28 @@ type ProfileStore interface {
func NewProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) ProfileStore {
os.Mkdir(directory, 0700)
ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true}
ps.queue = event.NewEventQueue(100)
//ps.queue = event.NewQueue(100)
ps.queue = event.NewQueue()
if profile != nil {
ps.save()
}
go ps.eventHandler()

ps.eventManager.Subscribe(event.BlockPeer, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.UnblockPeer, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.PeerCreated, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.GroupCreated, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.SetProfileName, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.SetAttribute, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.PeerStateChange, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.ServerStateChange, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.DeleteContact, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.DeleteGroup, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.BlockPeer, ps.queue)
ps.eventManager.Subscribe(event.UnblockPeer, ps.queue)
ps.eventManager.Subscribe(event.PeerCreated, ps.queue)
ps.eventManager.Subscribe(event.GroupCreated, ps.queue)
ps.eventManager.Subscribe(event.SetProfileName, ps.queue)
ps.eventManager.Subscribe(event.SetAttribute, ps.queue)
ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue)
ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue)
ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue)
ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue)
ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue)
ps.eventManager.Subscribe(event.PeerStateChange, ps.queue)
ps.eventManager.Subscribe(event.ServerStateChange, ps.queue)
ps.eventManager.Subscribe(event.DeleteContact, ps.queue)
ps.eventManager.Subscribe(event.DeleteGroup, ps.queue)

return ps
}

Loading…
Cancel
Save