From bd75e44555e9b84ac0d14c8f2d471aed95f17e87 Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Wed, 14 Aug 2019 13:56:45 -0700 Subject: [PATCH] make event.Queue use internal infinite channels; make event.Manager not use failable writes --- app/cli/main.go | 6 +- app/peer/alice/alice.go | 4 +- app/plugins/contactRetry.go | 8 +- event/EventQueue.go | 31 ----- event/common.go | 7 +- event/eventQueue.go | 80 +++++++++++++ event/eventmanager.go | 19 +-- event/eventmanager_test.go | 28 ++--- event/eventmanageripc.go | 4 +- event/infinitechannel.go | 73 ++++++++++++ event/infinitequeue.go | 108 ++++++++++++++++++ peer/cwtch_peer.go | 12 +- protocol/connections/engine.go | 24 ++-- .../connections/fetch/peer_fetch_channel.go | 2 +- storage/profile_store.go | 35 +++--- 15 files changed, 335 insertions(+), 106 deletions(-) delete mode 100644 event/EventQueue.go create mode 100644 event/eventQueue.go create mode 100644 event/infinitechannel.go create mode 100644 event/infinitequeue.go diff --git a/app/cli/main.go b/app/cli/main.go index d4dc549..b858a5d 100644 --- a/app/cli/main.go +++ b/app/cli/main.go @@ -209,9 +209,9 @@ func completer(d prompt.Document) []prompt.Suggest { } func handleAppEvents(em event.Manager) { - queue := event.NewEventQueue(100) - em.Subscribe(event.NewPeer, queue.EventChannel) - em.Subscribe(event.PeerError, queue.EventChannel) + queue := event.NewQueue() + em.Subscribe(event.NewPeer, queue) + em.Subscribe(event.PeerError, queue) for { ev := queue.Next() diff --git a/app/peer/alice/alice.go b/app/peer/alice/alice.go index 579a1d2..c3d7d1e 100644 --- a/app/peer/alice/alice.go +++ b/app/peer/alice/alice.go @@ -27,8 +27,8 @@ func main() { alice := utils.WaitGetPeer(app, "alice") app.LaunchPeers() eventBus := app.GetEventBus(alice.GetProfile().Onion) - queue := event.NewEventQueue(100) - eventBus.Subscribe(event.NewMessageFromPeer, queue.EventChannel) + queue := event.NewQueue() + eventBus.Subscribe(event.NewMessageFromPeer, queue) // For every new Data Packet Alice received she will Print it out. for { diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index 1b51ba0..41e15e8 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -20,7 +20,7 @@ type peer struct { type contactRetry struct { bus event.Manager - queue *event.Queue + queue event.Queue breakChan chan bool @@ -29,7 +29,7 @@ type contactRetry struct { // NewContactRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing func NewContactRetry(bus event.Manager) Plugin { - cr := &contactRetry{bus: bus, queue: event.NewEventQueue(1000), breakChan: make(chan bool), peers: sync.Map{}} + cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool), peers: sync.Map{}} return cr } @@ -38,10 +38,10 @@ func (cr *contactRetry) Start() { } func (cr *contactRetry) run() { - cr.bus.Subscribe(event.PeerStateChange, cr.queue.EventChannel) + cr.bus.Subscribe(event.PeerStateChange, cr.queue) for { select { - case e := <-cr.queue.EventChannel: + case e := <-cr.queue.OutChan(): switch e.EventType { case event.PeerStateChange: state := connections.ConnectionStateToType[e.Data[event.ConnectionState]] diff --git a/event/EventQueue.go b/event/EventQueue.go deleted file mode 100644 index acd2ce6..0000000 --- a/event/EventQueue.go +++ /dev/null @@ -1,31 +0,0 @@ -package event - -// Queue is a wrapper around a channel for handling Events in a consistent way across subsystems. -// The expectation is that each subsystem in Cwtch will manage a given an event.Queue fed from -// the event.Manager. -type Queue struct { - EventChannel chan Event -} - -// NewEventQueue initializes an event.Queue of the given buffer size. -func NewEventQueue(buffer int) *Queue { - queue := new(Queue) - queue.EventChannel = make(chan Event, buffer) - return queue -} - -// Backlog returns the length of the queue backlog -func (eq *Queue) Backlog() int { - return len(eq.EventChannel) -} - -// Next returns the next available event from the front of the queue -func (eq *Queue) Next() (event Event) { - event = <-eq.EventChannel - return -} - -// Shutdown closes our EventChannel -func (eq *Queue) Shutdown() { - close(eq.EventChannel) -} diff --git a/event/common.go b/event/common.go index 1a6efd4..188f25b 100644 --- a/event/common.go +++ b/event/common.go @@ -33,9 +33,12 @@ const ( // GroupID AcceptGroupInvite = Type("AcceptGroupInvite") - SendMessageToGroup = Type("SendMessagetoGroup") + SendMessageToGroup = Type("SendMessagetoGroup") + + //Ciphertext, Signature: EncryptedGroupMessage = Type("EncryptedGroupMessage") - NewMessageFromGroup = Type("NewMessageFromGroup") + //TimestampReceived, TimestampSent, Data(Message), GroupID, Signature, PreviousSignature, RemotePeer + NewMessageFromGroup = Type("NewMessageFromGroup") // an error was encountered trying to send a particular Message to a group // attributes: diff --git a/event/eventQueue.go b/event/eventQueue.go new file mode 100644 index 0000000..6bb72e0 --- /dev/null +++ b/event/eventQueue.go @@ -0,0 +1,80 @@ +package event + +type queue struct { + infChan infiniteChannel +} + +type simpleQueue struct { + eventChannel chan Event +} + +// Queue is a wrapper around a channel for handling Events in a consistent way across subsystems. +// The expectation is that each subsystem in Cwtch will manage a given an event.Queue fed from +// the event.Manager. +type Queue interface { + InChan() chan<- Event + OutChan() <-chan Event + Next() *Event + Shutdown() + Len() int +} + +// NewQueue initializes an event.Queue +func NewQueue() Queue { + queue := &queue{infChan: *newInfiniteChannel()} + return queue +} + +// NewSimpleQueue initializes an event.Queue of the given buffer size. +func NewSimpleQueue(buffer int) Queue { + queue := new(simpleQueue) + queue.eventChannel = make(chan Event, buffer) + return queue +} + +func (sq *simpleQueue) InChan() chan<- Event { + return sq.eventChannel +} + +func (sq *simpleQueue) OutChan() <-chan Event { + return sq.eventChannel +} + +// Backlog returns the length of the queue backlog +func (sq *simpleQueue) Len() int { + return len(sq.eventChannel) +} + +// Next returns the next available event from the front of the queue +func (sq *simpleQueue) Next() *Event { + event := <-sq.eventChannel + return &event +} + +// Shutdown closes our eventChannel +func (sq *simpleQueue) Shutdown() { + close(sq.eventChannel) +} + +func (iq *queue) InChan() chan<- Event { + return iq.infChan.In() +} + +func (iq *queue) OutChan() <-chan Event { + return iq.infChan.Out() +} + +// Out returns the next available event from the front of the queue +func (iq *queue) Next() *Event { + event := <-iq.infChan.Out() + return &event +} + +func (iq *queue) Len() int { + return iq.infChan.Len() +} + +// Shutdown closes our eventChannel +func (iq *queue) Shutdown() { + iq.infChan.Close() +} diff --git a/event/eventmanager.go b/event/eventmanager.go index c62d837..c571ca0 100644 --- a/event/eventmanager.go +++ b/event/eventmanager.go @@ -1,7 +1,6 @@ package event import ( - "git.openprivacy.ca/openprivacy/libricochet-go/log" "git.openprivacy.ca/openprivacy/libricochet-go/utils" "sync" ) @@ -33,7 +32,7 @@ func NewEventList(eventType Type, args ...interface{}) Event { // Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others. type manager struct { - subscribers map[Type][]chan Event + subscribers map[Type][]chan<- Event events chan Event mapMutex sync.Mutex internal chan bool @@ -42,7 +41,7 @@ type manager struct { // Manager is an interface for an event bus type Manager interface { - Subscribe(Type, chan Event) + Subscribe(Type, Queue) Publish(Event) PublishLocal(Event) Shutdown() @@ -57,7 +56,7 @@ func NewEventManager() Manager { // Initialize sets up the Manager. func (em *manager) initialize() { - em.subscribers = make(map[Type][]chan Event) + em.subscribers = make(map[Type][]chan<- Event) em.events = make(chan Event) em.internal = make(chan bool) em.closed = false @@ -66,10 +65,10 @@ func (em *manager) initialize() { // Subscribe takes an eventType and an Channel and associates them in the eventBus. All future events of that type // will be sent to the eventChannel. -func (em *manager) Subscribe(eventType Type, eventChannel chan Event) { +func (em *manager) Subscribe(eventType Type, queue Queue) { em.mapMutex.Lock() defer em.mapMutex.Unlock() - em.subscribers[eventType] = append(em.subscribers[eventType], eventChannel) + em.subscribers[eventType] = append(em.subscribers[eventType], queue.InChan()) } // Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers @@ -101,13 +100,7 @@ func (em *manager) eventBus() { // Send the event to any subscribers to that event type for _, subscriber := range subscribers { - select { - case subscriber <- event: - log.Debugf("Sending %v to %v", event.EventType, subscriber) - default: - log.Errorf("Failed to send %v to %v. The subsystem might be running too slow!", event.EventType, subscriber) - } - + subscriber <- event } } diff --git a/event/eventmanager_test.go b/event/eventmanager_test.go index cf8e2fb..9db3db7 100644 --- a/event/eventmanager_test.go +++ b/event/eventmanager_test.go @@ -12,7 +12,8 @@ func TestEventManager(t *testing.T) { // We need to make this buffer at least 1, otherwise we will log an error! testChan := make(chan Event, 1) - eventManager.Subscribe("TEST", testChan) + simpleQueue := &simpleQueue{testChan} + eventManager.Subscribe("TEST", simpleQueue) eventManager.Publish(Event{EventType: "TEST", Data: map[Field]string{"Value": "Hello World"}}) event := <-testChan @@ -31,7 +32,8 @@ func TestEventManagerOverflow(t *testing.T) { // Explicitly setting this to 0 log an error! testChan := make(chan Event) - eventManager.Subscribe("TEST", testChan) + simpleQueue := &simpleQueue{testChan} + eventManager.Subscribe("TEST", simpleQueue) eventManager.Publish(Event{EventType: "TEST"}) } @@ -39,15 +41,15 @@ func TestEventManagerMultiple(t *testing.T) { log.SetLevel(log.LevelDebug) eventManager := NewEventManager() - groupEventQueue := NewEventQueue(10) - peerEventQueue := NewEventQueue(10) - allEventQueue := NewEventQueue(10) + groupEventQueue := NewQueue() + peerEventQueue := NewQueue() + allEventQueue := NewQueue() - eventManager.Subscribe("PeerEvent", peerEventQueue.EventChannel) - eventManager.Subscribe("GroupEvent", groupEventQueue.EventChannel) - eventManager.Subscribe("PeerEvent", allEventQueue.EventChannel) - eventManager.Subscribe("GroupEvent", allEventQueue.EventChannel) - eventManager.Subscribe("ErrorEvent", allEventQueue.EventChannel) + eventManager.Subscribe("PeerEvent", peerEventQueue) + eventManager.Subscribe("GroupEvent", groupEventQueue) + eventManager.Subscribe("PeerEvent", allEventQueue) + eventManager.Subscribe("GroupEvent", allEventQueue) + eventManager.Subscribe("ErrorEvent", allEventQueue) eventManager.Publish(Event{EventType: "PeerEvent", Data: map[Field]string{"Value": "Hello World Peer"}}) eventManager.Publish(Event{EventType: "GroupEvent", Data: map[Field]string{"Value": "Hello World Group"}}) @@ -63,9 +65,9 @@ func TestEventManagerMultiple(t *testing.T) { time.Sleep(time.Second) - assertLength(groupEventQueue.Backlog(), 1, "Group Event Queue Length") - assertLength(peerEventQueue.Backlog(), 2, "Peer Event Queue Length") - assertLength(allEventQueue.Backlog(), 4, "All Event Queue Length") + assertLength(groupEventQueue.Len(), 1, "Group Event Queue Length") + assertLength(peerEventQueue.Len(), 2, "Peer Event Queue Length") + assertLength(allEventQueue.Len(), 4, "All Event Queue Length") checkEvent := func(eventType Type, expected Type, label string) { if eventType != expected { diff --git a/event/eventmanageripc.go b/event/eventmanageripc.go index dda3589..4dddcd1 100644 --- a/event/eventmanageripc.go +++ b/event/eventmanageripc.go @@ -29,8 +29,8 @@ func (ipcm *ipcManager) PublishLocal(ev Event) { ipcm.manager.Publish(ev) } -func (ipcm *ipcManager) Subscribe(eventType Type, eventChan chan Event) { - ipcm.manager.Subscribe(eventType, eventChan) +func (ipcm *ipcManager) Subscribe(eventType Type, queue Queue) { + ipcm.manager.Subscribe(eventType, queue) } func (ipcm *ipcManager) Shutdown() { diff --git a/event/infinitechannel.go b/event/infinitechannel.go new file mode 100644 index 0000000..630da8c --- /dev/null +++ b/event/infinitechannel.go @@ -0,0 +1,73 @@ +package event + +/* +This package is taken from https://github.com/eapache/channels +as per their suggestion we are not importing the entire package and instead cherry picking and adapting what is needed + +It is covered by the MIT License https://github.com/eapache/channels/blob/master/LICENSE +*/ + +// infiniteChannel implements the Channel interface with an infinite buffer between the input and the output. +type infiniteChannel struct { + input, output chan Event + length chan int + buffer *infiniteQueue +} + +func newInfiniteChannel() *infiniteChannel { + ch := &infiniteChannel{ + input: make(chan Event), + output: make(chan Event), + length: make(chan int), + buffer: newInfinitQueue(), + } + go ch.infiniteBuffer() + return ch +} + +func (ch *infiniteChannel) In() chan<- Event { + return ch.input +} + +func (ch *infiniteChannel) Out() <-chan Event { + return ch.output +} + +func (ch *infiniteChannel) Len() int { + return <-ch.length +} + +func (ch *infiniteChannel) Close() { + close(ch.input) +} + +func (ch *infiniteChannel) infiniteBuffer() { + var input, output chan Event + var next Event + input = ch.input + + for input != nil || output != nil { + select { + case elem, open := <-input: + if open { + ch.buffer.Add(elem) + } else { + input = nil + } + case output <- next: + ch.buffer.Remove() + case ch.length <- ch.buffer.Length(): + } + + if ch.buffer.Length() > 0 { + output = ch.output + next = ch.buffer.Peek() + } else { + output = nil + //next = nil + } + } + + close(ch.output) + close(ch.length) +} diff --git a/event/infinitequeue.go b/event/infinitequeue.go new file mode 100644 index 0000000..8a0e8ac --- /dev/null +++ b/event/infinitequeue.go @@ -0,0 +1,108 @@ +package event + +/* +This package is taken from https://github.com/eapache/channels +as per their suggestion we are not importing the entire package and instead cherry picking and adapting what is needed + +It is covered by the MIT License https://github.com/eapache/channels/blob/master/LICENSE +*/ +/* +Package queue provides a fast, ring-buffer queue based on the version suggested by Dariusz Górecki. +Using this instead of other, simpler, queue implementations (slice+append or linked list) provides +substantial memory and time benefits, and fewer GC pauses. +The queue implemented here is as fast as it is for an additional reason: it is *not* thread-safe. +*/ + +// minQueueLen is smallest capacity that queue may have. +// Must be power of 2 for bitwise modulus: x % n == x & (n - 1). +const minQueueLen = 16 + +// Queue represents a single instance of the queue data structure. +type infiniteQueue struct { + buf []Event + head, tail, count int +} + +// New constructs and returns a new Queue. +func newInfinitQueue() *infiniteQueue { + return &infiniteQueue{ + buf: make([]Event, minQueueLen), + } +} + +// Length returns the number of elements currently stored in the queue. +func (q *infiniteQueue) Length() int { + return q.count +} + +// resizes the queue to fit exactly twice its current contents +// this can result in shrinking if the queue is less than half-full +func (q *infiniteQueue) resize() { + newBuf := make([]Event, q.count<<1) + + if q.tail > q.head { + copy(newBuf, q.buf[q.head:q.tail]) + } else { + n := copy(newBuf, q.buf[q.head:]) + copy(newBuf[n:], q.buf[:q.tail]) + } + + q.head = 0 + q.tail = q.count + q.buf = newBuf +} + +// Add puts an element on the end of the queue. +func (q *infiniteQueue) Add(elem Event) { + if q.count == len(q.buf) { + q.resize() + } + + q.buf[q.tail] = elem + // bitwise modulus + q.tail = (q.tail + 1) & (len(q.buf) - 1) + q.count++ +} + +// Peek returns the element at the head of the queue. This call panics +// if the queue is empty. +func (q *infiniteQueue) Peek() Event { + if q.count <= 0 { + panic("queue: Peek() called on empty queue") + } + return q.buf[q.head] +} + +// Get returns the element at index i in the queue. If the index is +// invalid, the call will panic. This method accepts both positive and +// negative index values. Index 0 refers to the first element, and +// index -1 refers to the last. +func (q *infiniteQueue) Get(i int) Event { + // If indexing backwards, convert to positive index. + if i < 0 { + i += q.count + } + if i < 0 || i >= q.count { + panic("queue: Get() called with index out of range") + } + // bitwise modulus + return q.buf[(q.head+i)&(len(q.buf)-1)] +} + +// Remove removes and returns the element from the front of the queue. If the +// queue is empty, the call will panic. +func (q *infiniteQueue) Remove() Event { + if q.count <= 0 { + panic("queue: Remove() called on empty queue") + } + ret := q.buf[q.head] + //q.buf[q.head] = nil + // bitwise modulus + q.head = (q.head + 1) & (len(q.buf) - 1) + q.count-- + // Resize down if buffer 1/4 full. + if len(q.buf) > minQueueLen && (q.count<<2) == len(q.buf) { + q.resize() + } + return ret +} diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 1bd1e24..937aa0e 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -22,7 +22,7 @@ type cwtchPeer struct { mutex sync.Mutex shutdown bool - queue *event.Queue + queue event.Queue eventBus event.Manager } @@ -84,14 +84,14 @@ func FromProfile(profile *model.Profile) CwtchPeer { // Init instantiates a cwtchPeer func (cp *cwtchPeer) Init(eventBus event.Manager) { - cp.queue = event.NewEventQueue(100) + cp.queue = event.NewQueue() go cp.eventHandler() cp.eventBus = eventBus - cp.eventBus.Subscribe(event.EncryptedGroupMessage, cp.queue.EventChannel) - cp.eventBus.Subscribe(event.NewGroupInvite, cp.queue.EventChannel) - cp.eventBus.Subscribe(event.ServerStateChange, cp.queue.EventChannel) - cp.eventBus.Subscribe(event.PeerStateChange, cp.queue.EventChannel) + cp.eventBus.Subscribe(event.EncryptedGroupMessage, cp.queue) + cp.eventBus.Subscribe(event.NewGroupInvite, cp.queue) + cp.eventBus.Subscribe(event.ServerStateChange, cp.queue) + cp.eventBus.Subscribe(event.PeerStateChange, cp.queue) } // ImportGroup intializes a group from an imported source rather than a peer invite diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 7787685..524e770 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -17,7 +17,7 @@ import ( ) type engine struct { - queue *event.Queue + queue event.Queue connectionsManager *Manager // Engine Attributes @@ -55,7 +55,7 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK engine := new(engine) engine.identity = identity engine.privateKey = privateKey - engine.queue = event.NewEventQueue(100) + engine.queue = event.NewQueue() go engine.eventHandler() engine.acn = acn @@ -68,17 +68,17 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK engine.eventManager = eventManager - engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue.EventChannel) - engine.eventManager.Subscribe(event.PeerRequest, engine.queue.EventChannel) - engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue.EventChannel) - engine.eventManager.Subscribe(event.JoinServer, engine.queue.EventChannel) - engine.eventManager.Subscribe(event.SendMessageToGroup, engine.queue.EventChannel) - engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue.EventChannel) - engine.eventManager.Subscribe(event.DeleteContact, engine.queue.EventChannel) - engine.eventManager.Subscribe(event.DeleteGroup, engine.queue.EventChannel) + engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue) + engine.eventManager.Subscribe(event.PeerRequest, engine.queue) + engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue) + engine.eventManager.Subscribe(event.JoinServer, engine.queue) + engine.eventManager.Subscribe(event.SendMessageToGroup, engine.queue) + engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue) + engine.eventManager.Subscribe(event.DeleteContact, engine.queue) + engine.eventManager.Subscribe(event.DeleteGroup, engine.queue) - engine.eventManager.Subscribe(event.BlockPeer, engine.queue.EventChannel) - engine.eventManager.Subscribe(event.UnblockPeer, engine.queue.EventChannel) + engine.eventManager.Subscribe(event.BlockPeer, engine.queue) + engine.eventManager.Subscribe(event.UnblockPeer, engine.queue) for _, peer := range blockedPeers { engine.blocked.Store(peer, true) } diff --git a/protocol/connections/fetch/peer_fetch_channel.go b/protocol/connections/fetch/peer_fetch_channel.go index 62f3835..8f61850 100644 --- a/protocol/connections/fetch/peer_fetch_channel.go +++ b/protocol/connections/fetch/peer_fetch_channel.go @@ -98,7 +98,7 @@ func (cpfc *CwtchPeerFetchChannel) Packet(data []byte) { gm := csp.GetGroupMessage() // We create a new go routine here to avoid leaking any information about processing time // TODO Server can probably try to use this to DoS a peer - go cpfc.Handler.HandleGroupMessage(gm) + cpfc.Handler.HandleGroupMessage(gm) } } } diff --git a/storage/profile_store.go b/storage/profile_store.go index aef9ff4..53a1478 100644 --- a/storage/profile_store.go +++ b/storage/profile_store.go @@ -20,7 +20,7 @@ type profileStore struct { password string profile *model.Profile eventManager event.Manager - queue *event.Queue + queue event.Queue writer bool } @@ -38,27 +38,28 @@ type ProfileStore interface { func NewProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) ProfileStore { os.Mkdir(directory, 0700) ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true} - ps.queue = event.NewEventQueue(100) + //ps.queue = event.NewQueue(100) + ps.queue = event.NewQueue() if profile != nil { ps.save() } go ps.eventHandler() - ps.eventManager.Subscribe(event.BlockPeer, ps.queue.EventChannel) - ps.eventManager.Subscribe(event.UnblockPeer, ps.queue.EventChannel) - ps.eventManager.Subscribe(event.PeerCreated, ps.queue.EventChannel) - ps.eventManager.Subscribe(event.GroupCreated, ps.queue.EventChannel) - ps.eventManager.Subscribe(event.SetProfileName, ps.queue.EventChannel) - ps.eventManager.Subscribe(event.SetAttribute, ps.queue.EventChannel) - ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue.EventChannel) - ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue.EventChannel) - ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue.EventChannel) - ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue.EventChannel) - ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue.EventChannel) - ps.eventManager.Subscribe(event.PeerStateChange, ps.queue.EventChannel) - ps.eventManager.Subscribe(event.ServerStateChange, ps.queue.EventChannel) - ps.eventManager.Subscribe(event.DeleteContact, ps.queue.EventChannel) - ps.eventManager.Subscribe(event.DeleteGroup, ps.queue.EventChannel) + ps.eventManager.Subscribe(event.BlockPeer, ps.queue) + ps.eventManager.Subscribe(event.UnblockPeer, ps.queue) + ps.eventManager.Subscribe(event.PeerCreated, ps.queue) + ps.eventManager.Subscribe(event.GroupCreated, ps.queue) + ps.eventManager.Subscribe(event.SetProfileName, ps.queue) + ps.eventManager.Subscribe(event.SetAttribute, ps.queue) + ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue) + ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue) + ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue) + ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue) + ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue) + ps.eventManager.Subscribe(event.PeerStateChange, ps.queue) + ps.eventManager.Subscribe(event.ServerStateChange, ps.queue) + ps.eventManager.Subscribe(event.DeleteContact, ps.queue) + ps.eventManager.Subscribe(event.DeleteGroup, ps.queue) return ps }