Merge branch 'event_times' of cwtch.im/cwtch into master

This commit is contained in:
erinn 2019-01-21 19:58:08 +00:00 committed by Gogs
commit 9054006899
5 changed files with 31 additions and 21 deletions

View File

@ -25,3 +25,12 @@ const (
SendMessageToPeer = Type("SendMessageToPeer") SendMessageToPeer = Type("SendMessageToPeer")
NewMessageFromPeer = Type("NewMessageFromPeer") NewMessageFromPeer = Type("NewMessageFromPeer")
) )
// Field defines common event attributes
type Field string
// Defining Common Field Types
const (
TimestampSent = Field("TimestampSent")
TimestampReceived = Field("TimestampReceived")
)

View File

@ -10,11 +10,11 @@ import (
type Event struct { type Event struct {
EventType Type EventType Type
EventID string EventID string
Data map[string]string Data map[Field]string
} }
// NewEvent creates a new event object with a unique ID and the given type and data. // NewEvent creates a new event object with a unique ID and the given type and data.
func NewEvent(eventType Type, data map[string]string) Event { func NewEvent(eventType Type, data map[Field]string) Event {
return Event{EventType: eventType, EventID: utils.GetRandNumber().String(), Data: data} return Event{EventType: eventType, EventID: utils.GetRandNumber().String(), Data: data}
} }

View File

@ -14,7 +14,7 @@ func TestEventManager(t *testing.T) {
// We need to make this buffer at least 1, otherwise we will log an error! // We need to make this buffer at least 1, otherwise we will log an error!
testChan := make(chan Event, 1) testChan := make(chan Event, 1)
eventManager.Subscribe("TEST", testChan) eventManager.Subscribe("TEST", testChan)
eventManager.Publish(Event{EventType: "TEST", Data: map[string]string{"Value": "Hello World"}}) eventManager.Publish(Event{EventType: "TEST", Data: map[Field]string{"Value": "Hello World"}})
event := <-testChan event := <-testChan
if event.EventType == "TEST" && event.Data["Value"] == "Hello World" { if event.EventType == "TEST" && event.Data["Value"] == "Hello World" {
@ -52,11 +52,11 @@ func TestEventManagerMultiple(t *testing.T) {
eventManager.Subscribe("GroupEvent", allEventQueue.EventChannel) eventManager.Subscribe("GroupEvent", allEventQueue.EventChannel)
eventManager.Subscribe("ErrorEvent", allEventQueue.EventChannel) eventManager.Subscribe("ErrorEvent", allEventQueue.EventChannel)
eventManager.Publish(Event{EventType: "PeerEvent", Data: map[string]string{"Value": "Hello World Peer"}}) eventManager.Publish(Event{EventType: "PeerEvent", Data: map[Field]string{"Value": "Hello World Peer"}})
eventManager.Publish(Event{EventType: "GroupEvent", Data: map[string]string{"Value": "Hello World Group"}}) eventManager.Publish(Event{EventType: "GroupEvent", Data: map[Field]string{"Value": "Hello World Group"}})
eventManager.Publish(Event{EventType: "PeerEvent", Data: map[string]string{"Value": "Hello World Peer"}}) eventManager.Publish(Event{EventType: "PeerEvent", Data: map[Field]string{"Value": "Hello World Peer"}})
eventManager.Publish(Event{EventType: "ErrorEvent", Data: map[string]string{"Value": "Hello World Error"}}) eventManager.Publish(Event{EventType: "ErrorEvent", Data: map[Field]string{"Value": "Hello World Error"}})
eventManager.Publish(Event{EventType: "NobodyIsSubscribedToThisEvent", Data: map[string]string{"Value": "Noone should see this!"}}) eventManager.Publish(Event{EventType: "NobodyIsSubscribedToThisEvent", Data: map[Field]string{"Value": "Noone should see this!"}})
assertLength := func(len int, expected int, label string) { assertLength := func(len int, expected int, label string) {
if len != expected { if len != expected {

View File

@ -178,7 +178,7 @@ func (cp *cwtchPeer) GetProfile() *model.Profile {
// PeerWithOnion is the entry point for cwtchPeer relationships // PeerWithOnion is the entry point for cwtchPeer relationships
func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection { func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection {
cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[string]string{"Onion": onion})) cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{"Onion": onion}))
return nil return nil
} }
@ -191,14 +191,14 @@ func (cp *cwtchPeer) InviteOnionToGroup(onion string, groupid string) error {
invite, err := group.Invite(group.InitialMessage) invite, err := group.Invite(group.InitialMessage)
if err == nil { if err == nil {
cp.eventBus.Publish(event.NewEvent(event.InvitePeerToGroup, map[string]string{"Onion": onion, "Invite": string(invite)})) cp.eventBus.Publish(event.NewEvent(event.InvitePeerToGroup, map[event.Field]string{"Onion": onion, "Invite": string(invite)}))
} }
return err return err
} }
// JoinServer manages a new server connection with the given onion address // JoinServer manages a new server connection with the given onion address
func (cp *cwtchPeer) JoinServer(onion string) { func (cp *cwtchPeer) JoinServer(onion string) {
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[string]string{"Onion": onion})) cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{"Onion": onion}))
} }
// SendMessageToGroup attemps to sent the given message to the given group id. // SendMessageToGroup attemps to sent the given message to the given group id.
@ -210,14 +210,14 @@ func (cp *cwtchPeer) SendMessageToGroup(groupid string, message string) error {
ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid) ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid)
if err == nil { if err == nil {
cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[string]string{"Server": group.GroupServer, "Ciphertext": string(ct), "Signature": string(sig)})) cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[event.Field]string{"Server": group.GroupServer, "Ciphertext": string(ct), "Signature": string(sig)}))
} }
return err return err
} }
func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string { func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string {
event := event.NewEvent(event.SendMessageToPeer, map[string]string{"Peer": onion, "Message": message}) event := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{"Peer": onion, "Message": message})
cp.eventBus.Publish(event) cp.eventBus.Publish(event)
return event.EventID return event.EventID
} }
@ -244,7 +244,7 @@ func (cp *cwtchPeer) TrustPeer(peer string) error {
// BlockPeer blocks an existing peer relationship. // BlockPeer blocks an existing peer relationship.
func (cp *cwtchPeer) BlockPeer(peer string) error { func (cp *cwtchPeer) BlockPeer(peer string) error {
err := cp.Profile.BlockPeer(peer) err := cp.Profile.BlockPeer(peer)
cp.eventBus.Publish(event.NewEvent(event.BlockPeer, map[string]string{"Onion": peer})) cp.eventBus.Publish(event.NewEvent(event.BlockPeer, map[event.Field]string{"Onion": peer}))
return err return err
} }
@ -259,7 +259,7 @@ func (cp *cwtchPeer) RejectInvite(groupID string) {
} }
func (cp *cwtchPeer) Listen() { func (cp *cwtchPeer) Listen() {
cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[string]string{})) cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{}))
} }
// Shutdown kills all connections and cleans up all goroutines for the peer // Shutdown kills all connections and cleans up all goroutines for the peer
@ -283,7 +283,7 @@ func (cp *cwtchPeer) eventHandler() {
ok, groupID, message := cp.Profile.AttemptDecryption([]byte(ev.Data["Ciphertext"]), []byte(ev.Data["Signature"])) ok, groupID, message := cp.Profile.AttemptDecryption([]byte(ev.Data["Ciphertext"]), []byte(ev.Data["Signature"]))
log.Debugf("ok,gid,msg = %v,%v,%v", ok, groupID, message) log.Debugf("ok,gid,msg = %v,%v,%v", ok, groupID, message)
if ok { if ok {
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[string]string{"Data": message.Message, "GroupID": groupID, "Onion": message.PeerID})) cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.String(), event.TimestampSent: message.Timestamp.String(), "Data": message.Message, "GroupID": groupID, "Onion": message.PeerID}))
} }
case event.NewGroupInvite: case event.NewGroupInvite:
var groupInvite protocol.GroupChatInvite var groupInvite protocol.GroupChatInvite

View File

@ -14,6 +14,7 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"golang.org/x/crypto/ed25519" "golang.org/x/crypto/ed25519"
"sync" "sync"
"time"
) )
// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections. // Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
@ -109,7 +110,7 @@ func (e *Engine) listenFn() {
ra := new(application.RicochetApplication) ra := new(application.RicochetApplication)
onionService, err := e.ACN.Listen(e.privateKey, application.RicochetPort) onionService, err := e.ACN.Listen(e.privateKey, application.RicochetPort)
if err != nil /*&& fmt.Sprintf("%v", err) != "550 Unspecified Tor error: Onion address collision"*/ { if err != nil /*&& fmt.Sprintf("%v", err) != "550 Unspecified Tor error: Onion address collision"*/ {
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[string]string{"Identity": e.Identity.Hostname(), "Error": err.Error()})) e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{"Identity": e.Identity.Hostname(), "Error": err.Error()}))
return return
} }
@ -140,7 +141,7 @@ func (e *Engine) listenFn() {
e.started = true e.started = true
e.app = ra e.app = ra
ra.Run(onionService) ra.Run(onionService)
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[string]string{"Identity": e.Identity.Hostname()})) e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{"Identity": e.Identity.Hostname()}))
return return
} }
@ -197,7 +198,7 @@ func (e *Engine) InviteOnionToGroup(onion string, invite []byte) error {
func (e *Engine) ReceiveGroupMessage(server string, gm *protocol.GroupMessage) { func (e *Engine) ReceiveGroupMessage(server string, gm *protocol.GroupMessage) {
// Publish Event so that a Profile Engine can deal with it. // Publish Event so that a Profile Engine can deal with it.
// Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine! // Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[string]string{"Ciphertext": string(gm.GetCiphertext()), "Signature": string(gm.GetSignature())})) e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{"Ciphertext": string(gm.GetCiphertext()), "Signature": string(gm.GetSignature())}))
} }
// JoinServer manages a new server connection with the given onion address // JoinServer manages a new server connection with the given onion address
@ -253,12 +254,12 @@ func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) {
log.Debugf("Received GroupID from %v %v\n", cph.Onion, gci.String()) log.Debugf("Received GroupID from %v %v\n", cph.Onion, gci.String())
marshal, err := proto.Marshal(gci) marshal, err := proto.Marshal(gci)
if err == nil { if err == nil {
cph.EventBus.Publish(event.NewEvent(event.NewGroupInvite, map[string]string{"Onion": cph.Onion, "GroupInvite": string(marshal)})) cph.EventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().String(), "Onion": cph.Onion, "GroupInvite": string(marshal)}))
} }
} }
// HandlePacket handles the Cwtch cwtchPeer Data Channel // HandlePacket handles the Cwtch cwtchPeer Data Channel
func (cph *CwtchPeerHandler) HandlePacket(data []byte) []byte { func (cph *CwtchPeerHandler) HandlePacket(data []byte) []byte {
cph.EventBus.Publish(event.NewEvent(event.NewMessageFromPeer, map[string]string{"Onion": cph.Onion, "Data": string(data)})) cph.EventBus.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().String(), "Onion": cph.Onion, "Data": string(data)}))
return []byte{} // TODO remove this return []byte{} // TODO remove this
} }