cwtch/protocol/connections/engine.go

225 lines
8.0 KiB
Go
Raw Normal View History

2019-01-04 21:44:21 +00:00
package connections
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol"
2019-07-17 19:10:52 +00:00
"cwtch.im/tapir"
"cwtch.im/tapir/networks/tor"
2019-01-04 21:44:21 +00:00
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"github.com/golang/protobuf/proto"
"golang.org/x/crypto/ed25519"
2019-01-08 18:58:01 +00:00
"sync"
"time"
2019-01-04 21:44:21 +00:00
)
type engine struct {
2019-01-04 21:44:21 +00:00
queue *event.Queue
connectionsManager *Manager
// Engine Attributes
identity identity.Identity
acn connectivity.ACN
2019-01-04 21:44:21 +00:00
// Engine State
started bool
2019-01-08 18:58:01 +00:00
// Blocklist
blocked sync.Map
2019-01-04 21:44:21 +00:00
// Pointer to the Global Event Manager
eventManager event.Manager
2019-07-17 19:10:52 +00:00
// Nextgen Tapir Service
service tapir.Service
// Required for listen(), inaccessible from identity
privateKey ed25519.PrivateKey
}
// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
// Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages
type Engine interface {
Identity() identity.Identity
ACN() connectivity.ACN
EventManager() event.Manager
Shutdown()
2019-01-04 21:44:21 +00:00
}
// NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters
func NewProtocolEngine(identity identity.Identity, privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager event.Manager, blockedPeers []string) Engine {
engine := new(engine)
engine.identity = identity
2019-01-04 21:44:21 +00:00
engine.privateKey = privateKey
engine.queue = event.NewEventQueue(100)
go engine.eventHandler()
engine.acn = acn
engine.connectionsManager = NewConnectionsManager(engine.acn)
2019-01-04 21:44:21 +00:00
go engine.connectionsManager.AttemptReconnections()
2019-07-17 19:10:52 +00:00
// Init the Server running the Simple App.
engine.service = new(tor.BaseOnionService)
engine.service.Init(acn, privateKey, identity)
2019-01-04 21:44:21 +00:00
engine.eventManager = eventManager
2019-01-08 18:58:01 +00:00
2019-01-04 21:44:21 +00:00
engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue.EventChannel)
engine.eventManager.Subscribe(event.PeerRequest, engine.queue.EventChannel)
engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue.EventChannel)
engine.eventManager.Subscribe(event.JoinServer, engine.queue.EventChannel)
engine.eventManager.Subscribe(event.SendMessageToGroup, engine.queue.EventChannel)
engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue.EventChannel)
2019-01-08 18:58:01 +00:00
engine.eventManager.Subscribe(event.BlockPeer, engine.queue.EventChannel)
for _, peer := range blockedPeers {
engine.blocked.Store(peer, true)
}
2019-01-04 21:44:21 +00:00
return engine
}
func (e *engine) ACN() connectivity.ACN {
return e.acn
}
func (e *engine) Identity() identity.Identity {
return e.identity
}
func (e *engine) EventManager() event.Manager {
return e.eventManager
}
2019-01-04 21:44:21 +00:00
// eventHandler process events from other subsystems
func (e *engine) eventHandler() {
2019-01-04 21:44:21 +00:00
for {
ev := e.queue.Next()
switch ev.EventType {
case event.StatusRequest:
e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID})
case event.PeerRequest:
e.peerWithOnion(ev.Data[event.RemotePeer])
2019-01-04 21:44:21 +00:00
case event.InvitePeerToGroup:
e.inviteOnionToGroup(ev.Data[event.RemotePeer], []byte(ev.Data[event.GroupInvite]))
2019-01-04 21:44:21 +00:00
case event.JoinServer:
e.joinServer(ev.Data[event.GroupServer])
2019-01-04 21:44:21 +00:00
case event.SendMessageToGroup:
e.sendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
2019-01-04 21:44:21 +00:00
case event.SendMessageToPeer:
log.Debugf("Sending Message to Peer.....")
2019-07-17 19:10:52 +00:00
connection, err := e.service.GetConnection(ev.Data[event.RemotePeer])
if err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
2019-01-04 21:44:21 +00:00
}
2019-07-17 19:10:52 +00:00
connection.Send([]byte(ev.Data[event.Data]))
2019-01-08 18:58:01 +00:00
case event.BlockPeer:
2019-01-21 20:08:03 +00:00
e.blocked.Store(ev.Data[event.RemotePeer], true)
2019-07-17 19:10:52 +00:00
connection, err := e.service.GetConnection(ev.Data[event.RemotePeer])
if err != nil {
connection.Close()
}
2019-01-04 21:44:21 +00:00
case event.ProtocolEngineStartListen:
go e.listenFn()
default:
return
}
}
}
// Listen sets up an onion listener to process incoming cwtch messages
func (e *engine) listenFn() {
2019-07-17 19:10:52 +00:00
peerAppTemplate := new(PeerApp)
peerAppTemplate.MessageHandler = e.handlePeerMessage
peerAppTemplate.OnAuth = e.peerAuthed
peerAppTemplate.OnClose = e.peerDisconnected
err := e.service.Listen(peerAppTemplate)
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname(), event.Error: err.Error()}))
2019-01-04 21:44:21 +00:00
return
}
// Shutdown tears down the eventHandler goroutine
func (e *engine) Shutdown() {
2019-07-17 19:10:52 +00:00
e.service.Shutdown()
2019-01-04 21:44:21 +00:00
e.connectionsManager.Shutdown()
e.queue.Shutdown()
}
// peerWithOnion is the entry point for cwtchPeer relationships
2019-07-17 19:10:52 +00:00
func (e *engine) peerWithOnion(onion string) {
peerAppTemplate := new(PeerApp)
peerAppTemplate.MessageHandler = e.handlePeerMessage
peerAppTemplate.OnAuth = e.peerAuthed
peerAppTemplate.OnClose = e.peerDisconnected
e.service.Connect(onion, peerAppTemplate)
}
func (e *engine) peerAuthed(onion string) {
e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
event.RemotePeer: string(onion),
event.ConnectionState: ConnectionStateName[AUTHENTICATED],
}))
}
func (e *engine) peerDisconnected(onion string) {
e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
event.RemotePeer: string(onion),
event.ConnectionState: ConnectionStateName[DISCONNECTED],
}))
2019-01-04 21:44:21 +00:00
}
// inviteOnionToGroup kicks off the invite process
func (e *engine) inviteOnionToGroup(onion string, invite []byte) error {
2019-07-17 19:10:52 +00:00
conn, err := e.service.GetConnection(onion)
if err == nil {
peerApp, ok := conn.App.(*PeerApp)
if ok {
peerApp.SendMessage(invite)
return nil
}
panic("this should never happen")
2019-01-04 21:44:21 +00:00
}
2019-07-17 19:10:52 +00:00
return err
2019-01-04 21:44:21 +00:00
}
// receiveGroupMessage is a callback function that processes GroupMessages from a given server
func (e *engine) receiveGroupMessage(server string, gm *protocol.GroupMessage) {
2019-01-04 21:44:21 +00:00
// Publish Event so that a Profile Engine can deal with it.
// Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
2019-01-21 20:08:03 +00:00
e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: string(gm.GetCiphertext()), event.Signature: string(gm.GetSignature())}))
2019-01-04 21:44:21 +00:00
}
// joinServer manages a new server connection with the given onion address
func (e *engine) joinServer(onion string) {
e.connectionsManager.ManageServerConnection(onion, e, e.receiveGroupMessage)
2019-01-04 21:44:21 +00:00
}
// sendMessageToGroup attempts to sent the given message to the given group id.
func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) {
2019-01-04 21:44:21 +00:00
psc := e.connectionsManager.GetPeerServerConnectionForOnion(server)
if psc == nil {
2019-02-20 20:58:05 +00:00
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupServer: server, event.Signature: string(sig), event.Error: "server is offline or the connection has yet to finalize"}))
2019-01-04 21:44:21 +00:00
}
gm := &protocol.GroupMessage{
Ciphertext: ct,
Signature: sig,
}
err := psc.SendGroupMessage(gm)
2019-02-20 20:58:05 +00:00
if err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupServer: server, event.Signature: string(sig), event.Error: err.Error()}))
}
2019-01-04 21:44:21 +00:00
}
2019-07-17 19:10:52 +00:00
func (e *engine) handlePeerMessage(hostname string, message []byte) {
cpp := &protocol.CwtchPeerPacket{}
err := proto.Unmarshal(message, cpp)
2019-01-04 21:44:21 +00:00
if err == nil {
2019-07-17 19:10:52 +00:00
if cpp.GetGroupChatInvite() != nil {
marshal, _ := proto.Marshal(cpp.GetGroupChatInvite())
e.eventManager.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.GroupInvite: string(marshal)}))
}
2019-01-04 21:44:21 +00:00
}
2019-07-17 19:10:52 +00:00
e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)}))
2019-01-04 21:44:21 +00:00
}