First cut of automatic acknowledgements and protocol contexts
This commit is contained in:
parent
d8ce3bee4e
commit
4c16ec379f
|
@ -6,7 +6,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// WaitGetPeer is a helper function for utility apps not writen using the event bus
|
// WaitGetPeer is a helper function for utility apps not written using the event bus
|
||||||
// Proper use of an App is to call CreatePeer and then process the NewPeer event
|
// Proper use of an App is to call CreatePeer and then process the NewPeer event
|
||||||
// however for small utility use, this function which polls the app until the peer is created
|
// however for small utility use, this function which polls the app until the peer is created
|
||||||
// may fill that usecase better
|
// may fill that usecase better
|
||||||
|
|
|
@ -41,6 +41,11 @@ const (
|
||||||
SendMessageToPeer = Type("SendMessageToPeer")
|
SendMessageToPeer = Type("SendMessageToPeer")
|
||||||
NewMessageFromPeer = Type("NewMessageFromPeer")
|
NewMessageFromPeer = Type("NewMessageFromPeer")
|
||||||
|
|
||||||
|
// Peer acknowledges a previously sent message
|
||||||
|
// attributes
|
||||||
|
// EventID: The original event id that the peer is responding too.
|
||||||
|
PeerAcknowledgement = Type("PeerAcknowledgement")
|
||||||
|
|
||||||
// attributes:
|
// attributes:
|
||||||
// RemotePeer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"]
|
// RemotePeer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"]
|
||||||
// Error: string describing the error
|
// Error: string describing the error
|
||||||
|
@ -154,8 +159,10 @@ const (
|
||||||
|
|
||||||
Error = Field("Error")
|
Error = Field("Error")
|
||||||
|
|
||||||
Progreess = Field("Progress")
|
Progreess = Field("Progress")
|
||||||
Status = Field("Status")
|
Status = Field("Status")
|
||||||
|
EventID = Field("EventID")
|
||||||
|
EventContext = Field("EventContext")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Defining Common errors
|
// Defining Common errors
|
||||||
|
|
|
@ -101,18 +101,21 @@ func (e *engine) eventHandler() {
|
||||||
case event.PeerRequest:
|
case event.PeerRequest:
|
||||||
e.peerWithOnion(ev.Data[event.RemotePeer])
|
e.peerWithOnion(ev.Data[event.RemotePeer])
|
||||||
case event.InvitePeerToGroup:
|
case event.InvitePeerToGroup:
|
||||||
e.inviteOnionToGroup(ev.Data[event.RemotePeer], []byte(ev.Data[event.GroupInvite]))
|
e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], "im.cwtch.invite", []byte(ev.Data[event.GroupInvite]))
|
||||||
case event.JoinServer:
|
case event.JoinServer:
|
||||||
e.joinServer(ev.Data[event.GroupServer])
|
e.joinServer(ev.Data[event.GroupServer])
|
||||||
case event.SendMessageToGroup:
|
case event.SendMessageToGroup:
|
||||||
e.sendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
|
e.sendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
|
||||||
case event.SendMessageToPeer:
|
case event.SendMessageToPeer:
|
||||||
log.Debugf("Sending Message to Peer.....")
|
// TODO: remove this passthrough once the UI is integrated.
|
||||||
connection, err := e.service.GetConnection(ev.Data[event.RemotePeer])
|
context, ok := ev.Data[event.EventContext]
|
||||||
|
if !ok {
|
||||||
|
context = "im.cwtch.raw"
|
||||||
|
}
|
||||||
|
err := e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], context, []byte(ev.Data[event.GroupInvite]))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
|
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
|
||||||
}
|
}
|
||||||
connection.Send([]byte(ev.Data[event.Data]))
|
|
||||||
case event.BlockPeer:
|
case event.BlockPeer:
|
||||||
e.blocked.Store(ev.Data[event.RemotePeer], true)
|
e.blocked.Store(ev.Data[event.RemotePeer], true)
|
||||||
connection, err := e.service.GetConnection(ev.Data[event.RemotePeer])
|
connection, err := e.service.GetConnection(ev.Data[event.RemotePeer])
|
||||||
|
@ -131,6 +134,7 @@ func (e *engine) eventHandler() {
|
||||||
func (e *engine) listenFn() {
|
func (e *engine) listenFn() {
|
||||||
peerAppTemplate := new(PeerApp)
|
peerAppTemplate := new(PeerApp)
|
||||||
peerAppTemplate.MessageHandler = e.handlePeerMessage
|
peerAppTemplate.MessageHandler = e.handlePeerMessage
|
||||||
|
peerAppTemplate.OnAcknowledgement = e.peerAck
|
||||||
peerAppTemplate.OnAuth = e.peerAuthed
|
peerAppTemplate.OnAuth = e.peerAuthed
|
||||||
peerAppTemplate.OnClose = e.peerDisconnected
|
peerAppTemplate.OnClose = e.peerDisconnected
|
||||||
err := e.service.Listen(peerAppTemplate)
|
err := e.service.Listen(peerAppTemplate)
|
||||||
|
@ -149,6 +153,7 @@ func (e *engine) Shutdown() {
|
||||||
func (e *engine) peerWithOnion(onion string) {
|
func (e *engine) peerWithOnion(onion string) {
|
||||||
peerAppTemplate := new(PeerApp)
|
peerAppTemplate := new(PeerApp)
|
||||||
peerAppTemplate.MessageHandler = e.handlePeerMessage
|
peerAppTemplate.MessageHandler = e.handlePeerMessage
|
||||||
|
peerAppTemplate.OnAcknowledgement = e.peerAck
|
||||||
peerAppTemplate.OnAuth = e.peerAuthed
|
peerAppTemplate.OnAuth = e.peerAuthed
|
||||||
peerAppTemplate.OnClose = e.peerDisconnected
|
peerAppTemplate.OnClose = e.peerDisconnected
|
||||||
e.service.Connect(onion, peerAppTemplate)
|
e.service.Connect(onion, peerAppTemplate)
|
||||||
|
@ -161,6 +166,12 @@ func (e *engine) peerAuthed(onion string) {
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *engine) peerAck(eventID string) {
|
||||||
|
e.eventManager.Publish(event.NewEvent(event.PeerAcknowledgement, map[event.Field]string{
|
||||||
|
event.EventID: eventID,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
func (e *engine) peerDisconnected(onion string) {
|
func (e *engine) peerDisconnected(onion string) {
|
||||||
e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
|
e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
|
||||||
event.RemotePeer: string(onion),
|
event.RemotePeer: string(onion),
|
||||||
|
@ -168,13 +179,13 @@ func (e *engine) peerDisconnected(onion string) {
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
// inviteOnionToGroup kicks off the invite process
|
// sendMessageToPeer sends a message to a peer under a given context
|
||||||
func (e *engine) inviteOnionToGroup(onion string, invite []byte) error {
|
func (e *engine) sendMessageToPeer(eventID string, onion string, context string, message []byte) error {
|
||||||
conn, err := e.service.GetConnection(onion)
|
conn, err := e.service.GetConnection(onion)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
peerApp, ok := conn.App.(*PeerApp)
|
peerApp, ok := conn.App.(*PeerApp)
|
||||||
if ok {
|
if ok {
|
||||||
peerApp.SendMessage(invite)
|
peerApp.SendMessage(PeerMessage{eventID, context, message})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
panic("this should never happen")
|
panic("this should never happen")
|
||||||
|
|
|
@ -3,16 +3,25 @@ package connections
|
||||||
import (
|
import (
|
||||||
"cwtch.im/tapir"
|
"cwtch.im/tapir"
|
||||||
"cwtch.im/tapir/applications"
|
"cwtch.im/tapir/applications"
|
||||||
|
"encoding/json"
|
||||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PeerApp encapsulates the behaviour of a Cwtch Peer
|
// PeerApp encapsulates the behaviour of a Cwtch Peer
|
||||||
type PeerApp struct {
|
type PeerApp struct {
|
||||||
applications.AuthApp
|
applications.AuthApp
|
||||||
connection *tapir.Connection
|
connection *tapir.Connection
|
||||||
MessageHandler func(string, []byte)
|
MessageHandler func(string, []byte)
|
||||||
OnAuth func(string)
|
OnAuth func(string)
|
||||||
OnClose func(string)
|
OnClose func(string)
|
||||||
|
OnAcknowledgement func(string)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PeerMessage is an encapsulation that can be used by higher level applications
|
||||||
|
type PeerMessage struct {
|
||||||
|
ID string // A unique Message ID (primarily used for acknowledgments)
|
||||||
|
Context string // A unique context identifier i.e. im.cwtch.chat
|
||||||
|
Data []byte // The serialized data packet.
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewInstance should always return a new instantiation of the application.
|
// NewInstance should always return a new instantiation of the application.
|
||||||
|
@ -43,14 +52,26 @@ func (pa PeerApp) listen() {
|
||||||
message := pa.connection.Expect()
|
message := pa.connection.Expect()
|
||||||
if len(message) == 0 {
|
if len(message) == 0 {
|
||||||
log.Errorf("0 byte read, socket has likely failed. Closing the listen goroutine")
|
log.Errorf("0 byte read, socket has likely failed. Closing the listen goroutine")
|
||||||
|
pa.OnClose(pa.connection.Hostname)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
pa.MessageHandler(pa.connection.Hostname, message)
|
var peerMessage PeerMessage
|
||||||
|
err := json.Unmarshal(message, &peerMessage)
|
||||||
|
if err == nil {
|
||||||
|
if peerMessage.Context == "im.cwtch.acknowledgement" {
|
||||||
|
pa.OnAcknowledgement(peerMessage.ID)
|
||||||
|
} else {
|
||||||
|
pa.MessageHandler(pa.connection.Hostname, peerMessage.Data)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.Errorf("Error unmarshalling PeerMessage package: %x %v", message, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendMessage sends the peer a preformatted message
|
// SendMessage sends the peer a preformatted message
|
||||||
// NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol
|
// NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol
|
||||||
func (pa PeerApp) SendMessage(message []byte) {
|
func (pa PeerApp) SendMessage(message PeerMessage) {
|
||||||
pa.connection.Send(message)
|
serialized, _ := json.Marshal(message)
|
||||||
|
pa.connection.Send(serialized)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue