From 16c8095e5fad8c3d68c7ac406e69b9700d9f759b Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Mon, 21 Jan 2019 10:47:07 -0800 Subject: [PATCH] Adding Timestamp to Events Also starting to define common event fields to prevent type confusion / spelling issues --- event/common.go | 9 +++++++++ event/eventmanager.go | 4 ++-- event/eventmanager_test.go | 12 ++++++------ peer/cwtch_peer.go | 16 ++++++++-------- protocol/connections/engine.go | 11 ++++++----- 5 files changed, 31 insertions(+), 21 deletions(-) diff --git a/event/common.go b/event/common.go index 151c3b2..55f32e2 100644 --- a/event/common.go +++ b/event/common.go @@ -25,3 +25,12 @@ const ( SendMessageToPeer = Type("SendMessageToPeer") NewMessageFromPeer = Type("NewMessageFromPeer") ) + +// Field defines common event attributes +type Field string + +// Defining Common Field Types +const ( + TimestampSent = Field("TimestampSent") + TimestampReceived = Field("TimestampReceived") +) diff --git a/event/eventmanager.go b/event/eventmanager.go index 01a2599..58b1462 100644 --- a/event/eventmanager.go +++ b/event/eventmanager.go @@ -10,11 +10,11 @@ import ( type Event struct { EventType Type EventID string - Data map[string]string + Data map[Field]string } // NewEvent creates a new event object with a unique ID and the given type and data. -func NewEvent(eventType Type, data map[string]string) Event { +func NewEvent(eventType Type, data map[Field]string) Event { return Event{EventType: eventType, EventID: utils.GetRandNumber().String(), Data: data} } diff --git a/event/eventmanager_test.go b/event/eventmanager_test.go index a2fd4c6..8828272 100644 --- a/event/eventmanager_test.go +++ b/event/eventmanager_test.go @@ -14,7 +14,7 @@ func TestEventManager(t *testing.T) { // We need to make this buffer at least 1, otherwise we will log an error! testChan := make(chan Event, 1) eventManager.Subscribe("TEST", testChan) - eventManager.Publish(Event{EventType: "TEST", Data: map[string]string{"Value": "Hello World"}}) + eventManager.Publish(Event{EventType: "TEST", Data: map[Field]string{"Value": "Hello World"}}) event := <-testChan if event.EventType == "TEST" && event.Data["Value"] == "Hello World" { @@ -52,11 +52,11 @@ func TestEventManagerMultiple(t *testing.T) { eventManager.Subscribe("GroupEvent", allEventQueue.EventChannel) eventManager.Subscribe("ErrorEvent", allEventQueue.EventChannel) - eventManager.Publish(Event{EventType: "PeerEvent", Data: map[string]string{"Value": "Hello World Peer"}}) - eventManager.Publish(Event{EventType: "GroupEvent", Data: map[string]string{"Value": "Hello World Group"}}) - eventManager.Publish(Event{EventType: "PeerEvent", Data: map[string]string{"Value": "Hello World Peer"}}) - eventManager.Publish(Event{EventType: "ErrorEvent", Data: map[string]string{"Value": "Hello World Error"}}) - eventManager.Publish(Event{EventType: "NobodyIsSubscribedToThisEvent", Data: map[string]string{"Value": "Noone should see this!"}}) + eventManager.Publish(Event{EventType: "PeerEvent", Data: map[Field]string{"Value": "Hello World Peer"}}) + eventManager.Publish(Event{EventType: "GroupEvent", Data: map[Field]string{"Value": "Hello World Group"}}) + eventManager.Publish(Event{EventType: "PeerEvent", Data: map[Field]string{"Value": "Hello World Peer"}}) + eventManager.Publish(Event{EventType: "ErrorEvent", Data: map[Field]string{"Value": "Hello World Error"}}) + eventManager.Publish(Event{EventType: "NobodyIsSubscribedToThisEvent", Data: map[Field]string{"Value": "Noone should see this!"}}) assertLength := func(len int, expected int, label string) { if len != expected { diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 39d003b..3f0188b 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -178,7 +178,7 @@ func (cp *cwtchPeer) GetProfile() *model.Profile { // PeerWithOnion is the entry point for cwtchPeer relationships func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection { - cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[string]string{"Onion": onion})) + cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{"Onion": onion})) return nil } @@ -191,14 +191,14 @@ func (cp *cwtchPeer) InviteOnionToGroup(onion string, groupid string) error { invite, err := group.Invite(group.InitialMessage) if err == nil { - cp.eventBus.Publish(event.NewEvent(event.InvitePeerToGroup, map[string]string{"Onion": onion, "Invite": string(invite)})) + cp.eventBus.Publish(event.NewEvent(event.InvitePeerToGroup, map[event.Field]string{"Onion": onion, "Invite": string(invite)})) } return err } // JoinServer manages a new server connection with the given onion address func (cp *cwtchPeer) JoinServer(onion string) { - cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[string]string{"Onion": onion})) + cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{"Onion": onion})) } // SendMessageToGroup attemps to sent the given message to the given group id. @@ -210,14 +210,14 @@ func (cp *cwtchPeer) SendMessageToGroup(groupid string, message string) error { ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid) if err == nil { - cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[string]string{"Server": group.GroupServer, "Ciphertext": string(ct), "Signature": string(sig)})) + cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[event.Field]string{"Server": group.GroupServer, "Ciphertext": string(ct), "Signature": string(sig)})) } return err } func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string { - event := event.NewEvent(event.SendMessageToPeer, map[string]string{"Peer": onion, "Message": message}) + event := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{"Peer": onion, "Message": message}) cp.eventBus.Publish(event) return event.EventID } @@ -244,7 +244,7 @@ func (cp *cwtchPeer) TrustPeer(peer string) error { // BlockPeer blocks an existing peer relationship. func (cp *cwtchPeer) BlockPeer(peer string) error { err := cp.Profile.BlockPeer(peer) - cp.eventBus.Publish(event.NewEvent(event.BlockPeer, map[string]string{"Onion": peer})) + cp.eventBus.Publish(event.NewEvent(event.BlockPeer, map[event.Field]string{"Onion": peer})) return err } @@ -259,7 +259,7 @@ func (cp *cwtchPeer) RejectInvite(groupID string) { } func (cp *cwtchPeer) Listen() { - cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[string]string{})) + cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{})) } // Shutdown kills all connections and cleans up all goroutines for the peer @@ -283,7 +283,7 @@ func (cp *cwtchPeer) eventHandler() { ok, groupID, message := cp.Profile.AttemptDecryption([]byte(ev.Data["Ciphertext"]), []byte(ev.Data["Signature"])) log.Debugf("ok,gid,msg = %v,%v,%v", ok, groupID, message) if ok { - cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[string]string{"Data": message.Message, "GroupID": groupID, "Onion": message.PeerID})) + cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.String(), event.TimestampSent: message.Timestamp.String(), "Data": message.Message, "GroupID": groupID, "Onion": message.PeerID})) } case event.NewGroupInvite: var groupInvite protocol.GroupChatInvite diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 5558eae..0e9bb30 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -14,6 +14,7 @@ import ( "github.com/golang/protobuf/proto" "golang.org/x/crypto/ed25519" "sync" + "time" ) // Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections. @@ -109,7 +110,7 @@ func (e *Engine) listenFn() { ra := new(application.RicochetApplication) onionService, err := e.ACN.Listen(e.privateKey, application.RicochetPort) if err != nil /*&& fmt.Sprintf("%v", err) != "550 Unspecified Tor error: Onion address collision"*/ { - e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[string]string{"Identity": e.Identity.Hostname(), "Error": err.Error()})) + e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{"Identity": e.Identity.Hostname(), "Error": err.Error()})) return } @@ -140,7 +141,7 @@ func (e *Engine) listenFn() { e.started = true e.app = ra ra.Run(onionService) - e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[string]string{"Identity": e.Identity.Hostname()})) + e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{"Identity": e.Identity.Hostname()})) return } @@ -197,7 +198,7 @@ func (e *Engine) InviteOnionToGroup(onion string, invite []byte) error { func (e *Engine) ReceiveGroupMessage(server string, gm *protocol.GroupMessage) { // Publish Event so that a Profile Engine can deal with it. // Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine! - e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[string]string{"Ciphertext": string(gm.GetCiphertext()), "Signature": string(gm.GetSignature())})) + e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{"Ciphertext": string(gm.GetCiphertext()), "Signature": string(gm.GetSignature())})) } // JoinServer manages a new server connection with the given onion address @@ -253,12 +254,12 @@ func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) { log.Debugf("Received GroupID from %v %v\n", cph.Onion, gci.String()) marshal, err := proto.Marshal(gci) if err == nil { - cph.EventBus.Publish(event.NewEvent(event.NewGroupInvite, map[string]string{"Onion": cph.Onion, "GroupInvite": string(marshal)})) + cph.EventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().String(), "Onion": cph.Onion, "GroupInvite": string(marshal)})) } } // HandlePacket handles the Cwtch cwtchPeer Data Channel func (cph *CwtchPeerHandler) HandlePacket(data []byte) []byte { - cph.EventBus.Publish(event.NewEvent(event.NewMessageFromPeer, map[string]string{"Onion": cph.Onion, "Data": string(data)})) + cph.EventBus.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().String(), "Onion": cph.Onion, "Data": string(data)})) return []byte{} // TODO remove this }