diff --git a/app/utils/utils.go b/app/utils/utils.go index dd30d3e..8b2d473 100644 --- a/app/utils/utils.go +++ b/app/utils/utils.go @@ -6,7 +6,7 @@ import ( "time" ) -// WaitGetPeer is a helper function for utility apps not writen using the event bus +// WaitGetPeer is a helper function for utility apps not written using the event bus // Proper use of an App is to call CreatePeer and then process the NewPeer event // however for small utility use, this function which polls the app until the peer is created // may fill that usecase better diff --git a/event/common.go b/event/common.go index b1b5e0e..139b3b8 100644 --- a/event/common.go +++ b/event/common.go @@ -41,6 +41,11 @@ const ( SendMessageToPeer = Type("SendMessageToPeer") NewMessageFromPeer = Type("NewMessageFromPeer") + // Peer acknowledges a previously sent message + // attributes + // EventID: The original event id that the peer is responding too. + PeerAcknowledgement = Type("PeerAcknowledgement") + // attributes: // RemotePeer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"] // Error: string describing the error @@ -154,8 +159,10 @@ const ( Error = Field("Error") - Progreess = Field("Progress") - Status = Field("Status") + Progreess = Field("Progress") + Status = Field("Status") + EventID = Field("EventID") + EventContext = Field("EventContext") ) // Defining Common errors diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 22c3739..c2bc0b4 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -101,18 +101,21 @@ func (e *engine) eventHandler() { case event.PeerRequest: e.peerWithOnion(ev.Data[event.RemotePeer]) case event.InvitePeerToGroup: - e.inviteOnionToGroup(ev.Data[event.RemotePeer], []byte(ev.Data[event.GroupInvite])) + e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], "im.cwtch.invite", []byte(ev.Data[event.GroupInvite])) case event.JoinServer: e.joinServer(ev.Data[event.GroupServer]) case event.SendMessageToGroup: e.sendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature])) case event.SendMessageToPeer: - log.Debugf("Sending Message to Peer.....") - connection, err := e.service.GetConnection(ev.Data[event.RemotePeer]) + // TODO: remove this passthrough once the UI is integrated. + context, ok := ev.Data[event.EventContext] + if !ok { + context = "im.cwtch.raw" + } + err := e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], context, []byte(ev.Data[event.GroupInvite])) if err != nil { e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"})) } - connection.Send([]byte(ev.Data[event.Data])) case event.BlockPeer: e.blocked.Store(ev.Data[event.RemotePeer], true) connection, err := e.service.GetConnection(ev.Data[event.RemotePeer]) @@ -131,6 +134,7 @@ func (e *engine) eventHandler() { func (e *engine) listenFn() { peerAppTemplate := new(PeerApp) peerAppTemplate.MessageHandler = e.handlePeerMessage + peerAppTemplate.OnAcknowledgement = e.peerAck peerAppTemplate.OnAuth = e.peerAuthed peerAppTemplate.OnClose = e.peerDisconnected err := e.service.Listen(peerAppTemplate) @@ -149,6 +153,7 @@ func (e *engine) Shutdown() { func (e *engine) peerWithOnion(onion string) { peerAppTemplate := new(PeerApp) peerAppTemplate.MessageHandler = e.handlePeerMessage + peerAppTemplate.OnAcknowledgement = e.peerAck peerAppTemplate.OnAuth = e.peerAuthed peerAppTemplate.OnClose = e.peerDisconnected e.service.Connect(onion, peerAppTemplate) @@ -161,6 +166,12 @@ func (e *engine) peerAuthed(onion string) { })) } +func (e *engine) peerAck(eventID string) { + e.eventManager.Publish(event.NewEvent(event.PeerAcknowledgement, map[event.Field]string{ + event.EventID: eventID, + })) +} + func (e *engine) peerDisconnected(onion string) { e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{ event.RemotePeer: string(onion), @@ -168,13 +179,13 @@ func (e *engine) peerDisconnected(onion string) { })) } -// inviteOnionToGroup kicks off the invite process -func (e *engine) inviteOnionToGroup(onion string, invite []byte) error { +// sendMessageToPeer sends a message to a peer under a given context +func (e *engine) sendMessageToPeer(eventID string, onion string, context string, message []byte) error { conn, err := e.service.GetConnection(onion) if err == nil { peerApp, ok := conn.App.(*PeerApp) if ok { - peerApp.SendMessage(invite) + peerApp.SendMessage(PeerMessage{eventID, context, message}) return nil } panic("this should never happen") diff --git a/protocol/connections/peerapp.go b/protocol/connections/peerapp.go index ddb325d..08900df 100644 --- a/protocol/connections/peerapp.go +++ b/protocol/connections/peerapp.go @@ -3,16 +3,25 @@ package connections import ( "cwtch.im/tapir" "cwtch.im/tapir/applications" + "encoding/json" "git.openprivacy.ca/openprivacy/libricochet-go/log" ) // PeerApp encapsulates the behaviour of a Cwtch Peer type PeerApp struct { applications.AuthApp - connection *tapir.Connection - MessageHandler func(string, []byte) - OnAuth func(string) - OnClose func(string) + connection *tapir.Connection + MessageHandler func(string, []byte) + OnAuth func(string) + OnClose func(string) + OnAcknowledgement func(string) +} + +// PeerMessage is an encapsulation that can be used by higher level applications +type PeerMessage struct { + ID string // A unique Message ID (primarily used for acknowledgments) + Context string // A unique context identifier i.e. im.cwtch.chat + Data []byte // The serialized data packet. } // NewInstance should always return a new instantiation of the application. @@ -43,14 +52,26 @@ func (pa PeerApp) listen() { message := pa.connection.Expect() if len(message) == 0 { log.Errorf("0 byte read, socket has likely failed. Closing the listen goroutine") + pa.OnClose(pa.connection.Hostname) return } - pa.MessageHandler(pa.connection.Hostname, message) + var peerMessage PeerMessage + err := json.Unmarshal(message, &peerMessage) + if err == nil { + if peerMessage.Context == "im.cwtch.acknowledgement" { + pa.OnAcknowledgement(peerMessage.ID) + } else { + pa.MessageHandler(pa.connection.Hostname, peerMessage.Data) + } + } else { + log.Errorf("Error unmarshalling PeerMessage package: %x %v", message, err) + } } } // SendMessage sends the peer a preformatted message // NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol -func (pa PeerApp) SendMessage(message []byte) { - pa.connection.Send(message) +func (pa PeerApp) SendMessage(message PeerMessage) { + serialized, _ := json.Marshal(message) + pa.connection.Send(serialized) }