forked from cwtch.im/cwtch
Merge pull request 'Deep Copy Events in the Event Bus to Prevent Map Concurrency Issue' (#341) from newui-bugfix into master
Reviewed-on: cwtch.im/cwtch#341
This commit is contained in:
commit
3ac36db511
|
@ -1,6 +1,8 @@
|
|||
package event
|
||||
|
||||
import "sync"
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type queue struct {
|
||||
infChan infiniteChannel
|
||||
|
|
|
@ -2,6 +2,8 @@ package event
|
|||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/json"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"math"
|
||||
"math/big"
|
||||
"sync"
|
||||
|
@ -48,7 +50,7 @@ func NewEventList(eventType Type, args ...interface{}) Event {
|
|||
// Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others.
|
||||
type manager struct {
|
||||
subscribers map[Type][]Queue
|
||||
events chan Event
|
||||
events chan []byte
|
||||
mapMutex sync.Mutex
|
||||
internal chan bool
|
||||
closed bool
|
||||
|
@ -73,7 +75,7 @@ func NewEventManager() Manager {
|
|||
// Initialize sets up the Manager.
|
||||
func (em *manager) initialize() {
|
||||
em.subscribers = make(map[Type][]Queue)
|
||||
em.events = make(chan Event)
|
||||
em.events = make(chan []byte)
|
||||
em.internal = make(chan bool)
|
||||
em.closed = false
|
||||
go em.eventBus()
|
||||
|
@ -90,7 +92,12 @@ func (em *manager) Subscribe(eventType Type, queue Queue) {
|
|||
// Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers
|
||||
func (em *manager) Publish(event Event) {
|
||||
if event.EventType != "" && em.closed != true {
|
||||
em.events <- event
|
||||
// Deep Copy the Event...
|
||||
eventJSON,err := json.Marshal(event)
|
||||
if err != nil {
|
||||
log.Errorf("Error serializing event: %v", event)
|
||||
}
|
||||
em.events <- eventJSON
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -102,13 +109,21 @@ func (em *manager) PublishLocal(event Event) {
|
|||
// eventBus is an internal function that is used to distribute events to all subscribers
|
||||
func (em *manager) eventBus() {
|
||||
for {
|
||||
event := <-em.events
|
||||
eventJSON := <-em.events
|
||||
|
||||
// In the case on an empty event. Teardown the Queue
|
||||
if event.EventType == "" {
|
||||
if len(eventJSON) == 0 {
|
||||
log.Errorf("Received zero length event")
|
||||
break
|
||||
}
|
||||
|
||||
var event Event
|
||||
err := json.Unmarshal(eventJSON, &event)
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("Error on Deep Copy: %v %v", eventJSON, err)
|
||||
}
|
||||
|
||||
// maps aren't thread safe
|
||||
em.mapMutex.Lock()
|
||||
subscribers := em.subscribers[event.EventType]
|
||||
|
@ -116,7 +131,10 @@ func (em *manager) eventBus() {
|
|||
|
||||
// Send the event to any subscribers to that event type
|
||||
for _, subscriber := range subscribers {
|
||||
subscriber.Publish(event)
|
||||
// Deep Copy for Each Subscriber
|
||||
var eventCopy Event
|
||||
json.Unmarshal(eventJSON, &eventCopy)
|
||||
subscriber.Publish(eventCopy)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -126,7 +144,7 @@ func (em *manager) eventBus() {
|
|||
|
||||
// Shutdown triggers, and waits for, the internal eventBus goroutine to finish
|
||||
func (em *manager) Shutdown() {
|
||||
em.events <- Event{}
|
||||
em.events <- []byte{}
|
||||
em.closed = true
|
||||
// wait for eventBus to finish
|
||||
<-em.internal
|
||||
|
|
|
@ -418,7 +418,7 @@ func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) (
|
|||
ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid)
|
||||
|
||||
if err == nil {
|
||||
cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupServer: group.GroupServer, event.Ciphertext: string(ct), event.Signature: string(sig)}))
|
||||
cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)}))
|
||||
}
|
||||
|
||||
return string(sig), err
|
||||
|
@ -604,7 +604,9 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
case event.EncryptedGroupMessage:
|
||||
// If successful, a side effect is the message is added to the group's timeline
|
||||
cp.mutex.Lock()
|
||||
ok, groupID, message, seen := cp.Profile.AttemptDecryption([]byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
|
||||
ciphertext,_ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
|
||||
signature,_ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
|
||||
ok, groupID, message, seen := cp.Profile.AttemptDecryption(ciphertext, signature)
|
||||
cp.mutex.Unlock()
|
||||
if ok && !seen {
|
||||
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: string(message.Signature), event.PreviousSignature: string(message.PreviousMessageSig), event.RemotePeer: message.PeerID}))
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"cwtch.im/tapir"
|
||||
"cwtch.im/tapir/networks/tor"
|
||||
"cwtch.im/tapir/primitives"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
@ -143,7 +144,9 @@ func (e *engine) eventHandler() {
|
|||
case event.DeleteGroup:
|
||||
// TODO: There isn't a way here to determine if other Groups are using a server connection...
|
||||
case event.SendMessageToGroup:
|
||||
err := e.sendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
|
||||
ciphertext,_ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
|
||||
signature,_ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
|
||||
err := e.sendMessageToGroup(ev.Data[event.GroupServer],ciphertext, signature)
|
||||
if err != nil {
|
||||
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupServer: ev.Data[event.GroupServer], event.EventID: ev.EventID, event.Error: err.Error()}))
|
||||
}
|
||||
|
@ -430,7 +433,7 @@ func (e *engine) deleteConnection(id string) {
|
|||
func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMessage) {
|
||||
// Publish Event so that a Profile Engine can deal with it.
|
||||
// Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
|
||||
e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: string(gm.Ciphertext), event.Signature: string(gm.Signature)}))
|
||||
e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: base64.StdEncoding.EncodeToString(gm.Ciphertext), event.Signature: base64.StdEncoding.EncodeToString(gm.Signature)}))
|
||||
}
|
||||
|
||||
// sendMessageToGroup attempts to sent the given message to the given group id.
|
||||
|
|
Loading…
Reference in New Issue