Deep Copy Events in the Event Bus to Prevent Map Concurrency Issue
continuous-integration/drone/push Build is passing Details
continuous-integration/drone/pr Build is passing Details

This commit is contained in:
Sarah Jamie Lewis 2021-03-24 14:42:15 -07:00
parent afd6201a98
commit 2832d17cb9
4 changed files with 37 additions and 12 deletions

View File

@ -1,6 +1,8 @@
package event
import "sync"
import (
"sync"
)
type queue struct {
infChan infiniteChannel

View File

@ -2,6 +2,8 @@ package event
import (
"crypto/rand"
"encoding/json"
"git.openprivacy.ca/openprivacy/log"
"math"
"math/big"
"sync"
@ -48,7 +50,7 @@ func NewEventList(eventType Type, args ...interface{}) Event {
// Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others.
type manager struct {
subscribers map[Type][]Queue
events chan Event
events chan []byte
mapMutex sync.Mutex
internal chan bool
closed bool
@ -73,7 +75,7 @@ func NewEventManager() Manager {
// Initialize sets up the Manager.
func (em *manager) initialize() {
em.subscribers = make(map[Type][]Queue)
em.events = make(chan Event)
em.events = make(chan []byte)
em.internal = make(chan bool)
em.closed = false
go em.eventBus()
@ -90,7 +92,12 @@ func (em *manager) Subscribe(eventType Type, queue Queue) {
// Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers
func (em *manager) Publish(event Event) {
if event.EventType != "" && em.closed != true {
em.events <- event
// Deep Copy the Event...
eventJSON,err := json.Marshal(event)
if err != nil {
log.Errorf("Error serializing event: %v", event)
}
em.events <- eventJSON
}
}
@ -102,13 +109,21 @@ func (em *manager) PublishLocal(event Event) {
// eventBus is an internal function that is used to distribute events to all subscribers
func (em *manager) eventBus() {
for {
event := <-em.events
eventJSON := <-em.events
// In the case on an empty event. Teardown the Queue
if event.EventType == "" {
if len(eventJSON) == 0 {
log.Errorf("Received zero length event")
break
}
var event Event
err := json.Unmarshal(eventJSON, &event)
if err != nil {
log.Errorf("Error on Deep Copy: %v %v", eventJSON, err)
}
// maps aren't thread safe
em.mapMutex.Lock()
subscribers := em.subscribers[event.EventType]
@ -116,7 +131,10 @@ func (em *manager) eventBus() {
// Send the event to any subscribers to that event type
for _, subscriber := range subscribers {
subscriber.Publish(event)
// Deep Copy for Each Subscriber
var eventCopy Event
json.Unmarshal(eventJSON, &eventCopy)
subscriber.Publish(eventCopy)
}
}
@ -126,7 +144,7 @@ func (em *manager) eventBus() {
// Shutdown triggers, and waits for, the internal eventBus goroutine to finish
func (em *manager) Shutdown() {
em.events <- Event{}
em.events <- []byte{}
em.closed = true
// wait for eventBus to finish
<-em.internal

View File

@ -418,7 +418,7 @@ func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) (
ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid)
if err == nil {
cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupServer: group.GroupServer, event.Ciphertext: string(ct), event.Signature: string(sig)}))
cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)}))
}
return string(sig), err
@ -604,7 +604,9 @@ func (cp *cwtchPeer) eventHandler() {
case event.EncryptedGroupMessage:
// If successful, a side effect is the message is added to the group's timeline
cp.mutex.Lock()
ok, groupID, message, seen := cp.Profile.AttemptDecryption([]byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
ciphertext,_ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
signature,_ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
ok, groupID, message, seen := cp.Profile.AttemptDecryption(ciphertext, signature)
cp.mutex.Unlock()
if ok && !seen {
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: string(message.Signature), event.PreviousSignature: string(message.PreviousMessageSig), event.RemotePeer: message.PeerID}))

View File

@ -7,6 +7,7 @@ import (
"cwtch.im/tapir"
"cwtch.im/tapir/networks/tor"
"cwtch.im/tapir/primitives"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
@ -143,7 +144,9 @@ func (e *engine) eventHandler() {
case event.DeleteGroup:
// TODO: There isn't a way here to determine if other Groups are using a server connection...
case event.SendMessageToGroup:
err := e.sendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
ciphertext,_ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
signature,_ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
err := e.sendMessageToGroup(ev.Data[event.GroupServer],ciphertext, signature)
if err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupServer: ev.Data[event.GroupServer], event.EventID: ev.EventID, event.Error: err.Error()}))
}
@ -430,7 +433,7 @@ func (e *engine) deleteConnection(id string) {
func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMessage) {
// Publish Event so that a Profile Engine can deal with it.
// Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: string(gm.Ciphertext), event.Signature: string(gm.Signature)}))
e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: base64.StdEncoding.EncodeToString(gm.Ciphertext), event.Signature: base64.StdEncoding.EncodeToString(gm.Signature)}))
}
// sendMessageToGroup attempts to sent the given message to the given group id.