From 2832d17cb9c97de3a6d42a2baa92dd7e9f656084 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Wed, 24 Mar 2021 14:42:15 -0700 Subject: [PATCH] Deep Copy Events in the Event Bus to Prevent Map Concurrency Issue --- event/eventQueue.go | 4 +++- event/eventmanager.go | 32 +++++++++++++++++++++++++------- peer/cwtch_peer.go | 6 ++++-- protocol/connections/engine.go | 7 +++++-- 4 files changed, 37 insertions(+), 12 deletions(-) diff --git a/event/eventQueue.go b/event/eventQueue.go index 94b40eb..a338d9d 100644 --- a/event/eventQueue.go +++ b/event/eventQueue.go @@ -1,6 +1,8 @@ package event -import "sync" +import ( + "sync" +) type queue struct { infChan infiniteChannel diff --git a/event/eventmanager.go b/event/eventmanager.go index be1c19f..e675b7c 100644 --- a/event/eventmanager.go +++ b/event/eventmanager.go @@ -2,6 +2,8 @@ package event import ( "crypto/rand" + "encoding/json" + "git.openprivacy.ca/openprivacy/log" "math" "math/big" "sync" @@ -48,7 +50,7 @@ func NewEventList(eventType Type, args ...interface{}) Event { // Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others. type manager struct { subscribers map[Type][]Queue - events chan Event + events chan []byte mapMutex sync.Mutex internal chan bool closed bool @@ -73,7 +75,7 @@ func NewEventManager() Manager { // Initialize sets up the Manager. func (em *manager) initialize() { em.subscribers = make(map[Type][]Queue) - em.events = make(chan Event) + em.events = make(chan []byte) em.internal = make(chan bool) em.closed = false go em.eventBus() @@ -90,7 +92,12 @@ func (em *manager) Subscribe(eventType Type, queue Queue) { // Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers func (em *manager) Publish(event Event) { if event.EventType != "" && em.closed != true { - em.events <- event + // Deep Copy the Event... + eventJSON,err := json.Marshal(event) + if err != nil { + log.Errorf("Error serializing event: %v", event) + } + em.events <- eventJSON } } @@ -102,13 +109,21 @@ func (em *manager) PublishLocal(event Event) { // eventBus is an internal function that is used to distribute events to all subscribers func (em *manager) eventBus() { for { - event := <-em.events + eventJSON := <-em.events // In the case on an empty event. Teardown the Queue - if event.EventType == "" { + if len(eventJSON) == 0 { + log.Errorf("Received zero length event") break } + var event Event + err := json.Unmarshal(eventJSON, &event) + + if err != nil { + log.Errorf("Error on Deep Copy: %v %v", eventJSON, err) + } + // maps aren't thread safe em.mapMutex.Lock() subscribers := em.subscribers[event.EventType] @@ -116,7 +131,10 @@ func (em *manager) eventBus() { // Send the event to any subscribers to that event type for _, subscriber := range subscribers { - subscriber.Publish(event) + // Deep Copy for Each Subscriber + var eventCopy Event + json.Unmarshal(eventJSON, &eventCopy) + subscriber.Publish(eventCopy) } } @@ -126,7 +144,7 @@ func (em *manager) eventBus() { // Shutdown triggers, and waits for, the internal eventBus goroutine to finish func (em *manager) Shutdown() { - em.events <- Event{} + em.events <- []byte{} em.closed = true // wait for eventBus to finish <-em.internal diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 3bea251..dcbdc3c 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -418,7 +418,7 @@ func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) ( ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid) if err == nil { - cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupServer: group.GroupServer, event.Ciphertext: string(ct), event.Signature: string(sig)})) + cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)})) } return string(sig), err @@ -604,7 +604,9 @@ func (cp *cwtchPeer) eventHandler() { case event.EncryptedGroupMessage: // If successful, a side effect is the message is added to the group's timeline cp.mutex.Lock() - ok, groupID, message, seen := cp.Profile.AttemptDecryption([]byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature])) + ciphertext,_ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) + signature,_ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) + ok, groupID, message, seen := cp.Profile.AttemptDecryption(ciphertext, signature) cp.mutex.Unlock() if ok && !seen { cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: string(message.Signature), event.PreviousSignature: string(message.PreviousMessageSig), event.RemotePeer: message.PeerID})) diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 29be800..24b5272 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -7,6 +7,7 @@ import ( "cwtch.im/tapir" "cwtch.im/tapir/networks/tor" "cwtch.im/tapir/primitives" + "encoding/base64" "encoding/json" "errors" "fmt" @@ -143,7 +144,9 @@ func (e *engine) eventHandler() { case event.DeleteGroup: // TODO: There isn't a way here to determine if other Groups are using a server connection... case event.SendMessageToGroup: - err := e.sendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature])) + ciphertext,_ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) + signature,_ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) + err := e.sendMessageToGroup(ev.Data[event.GroupServer],ciphertext, signature) if err != nil { e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupServer: ev.Data[event.GroupServer], event.EventID: ev.EventID, event.Error: err.Error()})) } @@ -430,7 +433,7 @@ func (e *engine) deleteConnection(id string) { func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMessage) { // Publish Event so that a Profile Engine can deal with it. // Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine! - e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: string(gm.Ciphertext), event.Signature: string(gm.Signature)})) + e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: base64.StdEncoding.EncodeToString(gm.Ciphertext), event.Signature: base64.StdEncoding.EncodeToString(gm.Signature)})) } // sendMessageToGroup attempts to sent the given message to the given group id.