forked from cwtch.im/cwtch
314 lines
11 KiB
Go
314 lines
11 KiB
Go
package connections
|
|
|
|
import (
|
|
"cwtch.im/cwtch/event"
|
|
"cwtch.im/cwtch/protocol"
|
|
"cwtch.im/tapir"
|
|
"cwtch.im/tapir/applications"
|
|
"cwtch.im/tapir/networks/tor"
|
|
"cwtch.im/tapir/primitives"
|
|
"errors"
|
|
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
|
|
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
|
"github.com/golang/protobuf/proto"
|
|
"golang.org/x/crypto/ed25519"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type engine struct {
|
|
queue event.Queue
|
|
connectionsManager *Manager
|
|
|
|
// Engine Attributes
|
|
identity primitives.Identity
|
|
acn connectivity.ACN
|
|
|
|
// Engine State
|
|
started bool
|
|
|
|
// Blocklist
|
|
blocked sync.Map
|
|
|
|
// Pointer to the Global Event Manager
|
|
eventManager event.Manager
|
|
|
|
// Nextgen Tapir Service
|
|
service tapir.Service
|
|
|
|
// Required for listen(), inaccessible from identity
|
|
privateKey ed25519.PrivateKey
|
|
|
|
shuttingDown bool
|
|
}
|
|
|
|
// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
|
|
// Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages
|
|
type Engine interface {
|
|
ACN() connectivity.ACN
|
|
EventManager() event.Manager
|
|
Shutdown()
|
|
}
|
|
|
|
// NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters
|
|
func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager event.Manager, blockedPeers []string) Engine {
|
|
engine := new(engine)
|
|
engine.identity = identity
|
|
engine.privateKey = privateKey
|
|
engine.queue = event.NewQueue()
|
|
go engine.eventHandler()
|
|
|
|
engine.acn = acn
|
|
engine.connectionsManager = NewConnectionsManager(engine.acn)
|
|
go engine.connectionsManager.AttemptReconnections()
|
|
|
|
// Init the Server running the Simple App.
|
|
engine.service = new(tor.BaseOnionService)
|
|
engine.service.Init(acn, privateKey, &identity)
|
|
|
|
engine.eventManager = eventManager
|
|
|
|
engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue)
|
|
engine.eventManager.Subscribe(event.PeerRequest, engine.queue)
|
|
engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue)
|
|
engine.eventManager.Subscribe(event.JoinServer, engine.queue)
|
|
engine.eventManager.Subscribe(event.SendMessageToGroup, engine.queue)
|
|
engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue)
|
|
engine.eventManager.Subscribe(event.DeleteContact, engine.queue)
|
|
engine.eventManager.Subscribe(event.DeleteGroup, engine.queue)
|
|
|
|
engine.eventManager.Subscribe(event.BlockPeer, engine.queue)
|
|
engine.eventManager.Subscribe(event.UnblockPeer, engine.queue)
|
|
for _, peer := range blockedPeers {
|
|
engine.blocked.Store(peer, true)
|
|
}
|
|
return engine
|
|
}
|
|
|
|
func (e *engine) ACN() connectivity.ACN {
|
|
return e.acn
|
|
}
|
|
|
|
func (e *engine) EventManager() event.Manager {
|
|
return e.eventManager
|
|
}
|
|
|
|
// eventHandler process events from other subsystems
|
|
func (e *engine) eventHandler() {
|
|
for {
|
|
ev := e.queue.Next()
|
|
switch ev.EventType {
|
|
case event.StatusRequest:
|
|
e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID})
|
|
case event.PeerRequest:
|
|
go e.peerWithOnion(ev.Data[event.RemotePeer])
|
|
case event.InvitePeerToGroup:
|
|
e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], event.ContextInvite, []byte(ev.Data[event.GroupInvite]))
|
|
case event.JoinServer:
|
|
e.joinServer(ev.Data[event.GroupServer])
|
|
case event.DeleteContact:
|
|
onion := ev.Data[event.RemotePeer]
|
|
e.deleteConnection(onion)
|
|
case event.DeleteGroup:
|
|
// TODO: There isn't a way here to determine if other Groups are using a server connection...
|
|
case event.SendMessageToGroup:
|
|
e.sendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
|
|
case event.SendMessageToPeer:
|
|
// TODO: remove this passthrough once the UI is integrated.
|
|
context, ok := ev.Data[event.EventContext]
|
|
if !ok {
|
|
context = event.ContextRaw
|
|
}
|
|
err := e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], context, []byte(ev.Data[event.Data]))
|
|
if err != nil {
|
|
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
|
|
}
|
|
case event.UnblockPeer:
|
|
// We simply remove the peer from our blocklist
|
|
// The UI has the responsibility to reinitiate contact with the peer.
|
|
// (this should happen periodically in any case)
|
|
e.blocked.Delete(ev.Data[event.RemotePeer])
|
|
case event.BlockPeer:
|
|
e.blocked.Store(ev.Data[event.RemotePeer], true)
|
|
connection, err := e.service.GetConnection(ev.Data[event.RemotePeer])
|
|
if connection != nil && err == nil {
|
|
connection.Close()
|
|
}
|
|
// Explicitly send a disconnected event (if we don't do this here then the UI can wait for a while before
|
|
// an ongoing Open() connection fails and so the user will see a blocked peer as still connecting (because
|
|
// there isn't an active connection and we are stuck waiting for tor to time out)
|
|
e.peerDisconnected(ev.Data[event.RemotePeer])
|
|
case event.ProtocolEngineStartListen:
|
|
go e.listenFn()
|
|
default:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *engine) createPeerTemplate() *PeerApp {
|
|
peerAppTemplate := new(PeerApp)
|
|
peerAppTemplate.IsBlocked = func(onion string) bool {
|
|
_, blocked := e.blocked.Load(onion)
|
|
return blocked
|
|
}
|
|
peerAppTemplate.MessageHandler = e.handlePeerMessage
|
|
peerAppTemplate.OnAcknowledgement = e.ignoreOnShutdown2(e.peerAck)
|
|
peerAppTemplate.OnAuth = e.ignoreOnShutdown(e.peerAuthed)
|
|
peerAppTemplate.OnConnecting = e.ignoreOnShutdown(e.peerConnecting)
|
|
peerAppTemplate.OnClose = e.ignoreOnShutdown(e.peerDisconnected)
|
|
return peerAppTemplate
|
|
}
|
|
|
|
// Listen sets up an onion listener to process incoming cwtch messages
|
|
func (e *engine) listenFn() {
|
|
err := e.service.Listen(e.createPeerTemplate())
|
|
if !e.shuttingDown {
|
|
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname(), event.Error: err.Error()}))
|
|
}
|
|
return
|
|
}
|
|
|
|
// Shutdown tears down the eventHandler goroutine
|
|
func (e *engine) Shutdown() {
|
|
e.shuttingDown = true
|
|
e.connectionsManager.Shutdown()
|
|
e.service.Shutdown()
|
|
e.queue.Shutdown()
|
|
}
|
|
|
|
// peerWithOnion is the entry point for cwtchPeer relationships
|
|
// needs to be run in a goroutine as will block on Open.
|
|
func (e *engine) peerWithOnion(onion string) {
|
|
_, blocked := e.blocked.Load(onion)
|
|
if !blocked {
|
|
e.ignoreOnShutdown(e.peerConnecting)(onion)
|
|
connected, err := e.service.Connect(onion, e.createPeerTemplate())
|
|
|
|
// If we are already connected...check if we are authed and issue an auth event
|
|
// (This allows the ui to be stateless)
|
|
if connected && err != nil {
|
|
conn, err := e.service.GetConnection(onion)
|
|
if err == nil {
|
|
if conn.HasCapability(applications.AuthCapability) {
|
|
e.ignoreOnShutdown(e.peerAuthed)(onion)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Only issue a disconnected error if we are disconnected (Connect will fail if a connection already exists)
|
|
if !connected && err != nil {
|
|
e.ignoreOnShutdown(e.peerDisconnected)(onion)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *engine) ignoreOnShutdown(f func(string)) func(string) {
|
|
return func(x string) {
|
|
if !e.shuttingDown {
|
|
f(x)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *engine) ignoreOnShutdown2(f func(string, string)) func(string, string) {
|
|
return func(x, y string) {
|
|
if !e.shuttingDown {
|
|
f(x, y)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *engine) peerAuthed(onion string) {
|
|
e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
|
|
event.RemotePeer: string(onion),
|
|
event.ConnectionState: ConnectionStateName[AUTHENTICATED],
|
|
}))
|
|
}
|
|
|
|
func (e *engine) peerConnecting(onion string) {
|
|
e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
|
|
event.RemotePeer: string(onion),
|
|
event.ConnectionState: ConnectionStateName[CONNECTING],
|
|
}))
|
|
}
|
|
|
|
func (e *engine) peerAck(onion string, eventID string) {
|
|
e.eventManager.Publish(event.NewEvent(event.PeerAcknowledgement, map[event.Field]string{
|
|
event.EventID: eventID,
|
|
event.RemotePeer: onion,
|
|
}))
|
|
}
|
|
|
|
func (e *engine) peerDisconnected(onion string) {
|
|
e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
|
|
event.RemotePeer: string(onion),
|
|
event.ConnectionState: ConnectionStateName[DISCONNECTED],
|
|
}))
|
|
}
|
|
|
|
// sendMessageToPeer sends a message to a peer under a given context
|
|
func (e *engine) sendMessageToPeer(eventID string, onion string, context string, message []byte) error {
|
|
conn, err := e.service.GetConnection(onion)
|
|
if err == nil {
|
|
peerApp, ok := (conn.App()).(*PeerApp)
|
|
if ok {
|
|
peerApp.SendMessage(PeerMessage{eventID, context, message})
|
|
return nil
|
|
}
|
|
return errors.New("failed type assertion conn.App != PeerApp")
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (e *engine) deleteConnection(id string) {
|
|
conn, err := e.service.GetConnection(id)
|
|
if err == nil {
|
|
conn.Close()
|
|
}
|
|
}
|
|
|
|
// receiveGroupMessage is a callback function that processes GroupMessages from a given server
|
|
func (e *engine) receiveGroupMessage(server string, gm *protocol.GroupMessage) {
|
|
// Publish Event so that a Profile Engine can deal with it.
|
|
// Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
|
|
e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: string(gm.GetCiphertext()), event.Signature: string(gm.GetSignature())}))
|
|
}
|
|
|
|
// joinServer manages a new server connection with the given onion address
|
|
func (e *engine) joinServer(onion string) {
|
|
e.connectionsManager.ManageServerConnection(onion, e, e.receiveGroupMessage)
|
|
}
|
|
|
|
// sendMessageToGroup attempts to sent the given message to the given group id.
|
|
func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) {
|
|
psc := e.connectionsManager.GetPeerServerConnectionForOnion(server)
|
|
if psc == nil {
|
|
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupServer: server, event.Signature: string(sig), event.Error: "server is offline or the connection has yet to finalize"}))
|
|
}
|
|
gm := &protocol.GroupMessage{
|
|
Ciphertext: ct,
|
|
Signature: sig,
|
|
}
|
|
err := psc.SendGroupMessage(gm)
|
|
|
|
if err != nil {
|
|
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupServer: server, event.Signature: string(sig), event.Error: err.Error()}))
|
|
}
|
|
}
|
|
|
|
func (e *engine) handlePeerMessage(hostname string, context string, message []byte) {
|
|
log.Debugf("New message from peer: %v %v", hostname, context)
|
|
if context == event.ContextInvite {
|
|
cpp := &protocol.CwtchPeerPacket{}
|
|
err := proto.Unmarshal(message, cpp)
|
|
if err == nil && cpp.GetGroupChatInvite() != nil {
|
|
marshal, _ := proto.Marshal(cpp.GetGroupChatInvite())
|
|
e.eventManager.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.GroupInvite: string(marshal)}))
|
|
}
|
|
} else {
|
|
e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)}))
|
|
}
|
|
}
|