503 lines
18 KiB
Go
503 lines
18 KiB
Go
package connections
|
|
|
|
import (
|
|
"cwtch.im/cwtch/event"
|
|
"cwtch.im/cwtch/model"
|
|
"cwtch.im/cwtch/protocol/groups"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"git.openprivacy.ca/cwtch.im/tapir"
|
|
"git.openprivacy.ca/cwtch.im/tapir/networks/tor"
|
|
"git.openprivacy.ca/cwtch.im/tapir/primitives"
|
|
"git.openprivacy.ca/openprivacy/connectivity"
|
|
torProvider "git.openprivacy.ca/openprivacy/connectivity/tor"
|
|
"git.openprivacy.ca/openprivacy/log"
|
|
"github.com/gtank/ristretto255"
|
|
"golang.org/x/crypto/ed25519"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type engine struct {
|
|
queue event.Queue
|
|
|
|
// Engine Attributes
|
|
identity primitives.Identity
|
|
acn connectivity.ACN
|
|
|
|
// Engine State
|
|
started bool
|
|
|
|
// Authorization list of contacts to authorization status
|
|
authorizations sync.Map // string(onion) => model.Authorization
|
|
|
|
// Block Unknown Contacts
|
|
blockUnknownContacts bool
|
|
|
|
// Pointer to the Global Event Manager
|
|
eventManager event.Manager
|
|
|
|
// Nextgen Tapir Service
|
|
service tapir.Service
|
|
|
|
// Nextgen Tapir Service
|
|
ephemeralServices sync.Map // string(onion) => tapir.Service
|
|
|
|
// Required for listen(), inaccessible from identity
|
|
privateKey ed25519.PrivateKey
|
|
|
|
shuttingDown bool
|
|
}
|
|
|
|
// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
|
|
// Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages
|
|
type Engine interface {
|
|
ACN() connectivity.ACN
|
|
EventManager() event.Manager
|
|
Shutdown()
|
|
}
|
|
|
|
// NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters
|
|
func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager event.Manager, peerAuthorizations map[string]model.Authorization) Engine {
|
|
engine := new(engine)
|
|
engine.identity = identity
|
|
engine.privateKey = privateKey
|
|
engine.queue = event.NewQueue()
|
|
go engine.eventHandler()
|
|
|
|
engine.acn = acn
|
|
|
|
// Init the Server running the Simple App.
|
|
engine.service = new(tor.BaseOnionService)
|
|
engine.service.Init(acn, privateKey, &identity)
|
|
|
|
engine.eventManager = eventManager
|
|
|
|
engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue)
|
|
engine.eventManager.Subscribe(event.PeerRequest, engine.queue)
|
|
engine.eventManager.Subscribe(event.RetryPeerRequest, engine.queue)
|
|
engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue)
|
|
engine.eventManager.Subscribe(event.JoinServer, engine.queue)
|
|
engine.eventManager.Subscribe(event.LeaveServer, engine.queue)
|
|
engine.eventManager.Subscribe(event.SendMessageToGroup, engine.queue)
|
|
engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue)
|
|
engine.eventManager.Subscribe(event.SendGetValMessageToPeer, engine.queue)
|
|
engine.eventManager.Subscribe(event.SendRetValMessageToPeer, engine.queue)
|
|
engine.eventManager.Subscribe(event.DeleteContact, engine.queue)
|
|
engine.eventManager.Subscribe(event.DeleteGroup, engine.queue)
|
|
|
|
engine.eventManager.Subscribe(event.SetPeerAuthorization, engine.queue)
|
|
engine.eventManager.Subscribe(event.BlockUnknownPeers, engine.queue)
|
|
engine.eventManager.Subscribe(event.AllowUnknownPeers, engine.queue)
|
|
|
|
for peer, authorization := range peerAuthorizations {
|
|
engine.authorizations.Store(peer, authorization)
|
|
}
|
|
return engine
|
|
}
|
|
|
|
func (e *engine) ACN() connectivity.ACN {
|
|
return e.acn
|
|
}
|
|
|
|
func (e *engine) EventManager() event.Manager {
|
|
return e.eventManager
|
|
}
|
|
|
|
// eventHandler process events from other subsystems
|
|
func (e *engine) eventHandler() {
|
|
for {
|
|
ev := e.queue.Next()
|
|
switch ev.EventType {
|
|
case event.StatusRequest:
|
|
e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID})
|
|
case event.PeerRequest:
|
|
if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) {
|
|
go e.peerWithOnion(ev.Data[event.RemotePeer])
|
|
}
|
|
case event.RetryPeerRequest:
|
|
// This event allows engine to treat (automated) retry peering requests differently to user-specified
|
|
// peer events
|
|
if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) {
|
|
log.Debugf("Retrying Peer Request: %v", ev.Data[event.RemotePeer])
|
|
go e.peerWithOnion(ev.Data[event.RemotePeer])
|
|
}
|
|
case event.InvitePeerToGroup:
|
|
e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], event.ContextInvite, []byte(ev.Data[event.GroupInvite]))
|
|
case event.JoinServer:
|
|
e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY])
|
|
case event.LeaveServer:
|
|
es, ok := e.ephemeralServices.Load(ev.Data[event.GroupServer])
|
|
if ok {
|
|
ephemeralService := es.(tapir.Service)
|
|
ephemeralService.Shutdown()
|
|
e.ephemeralServices.Delete(ev.Data[event.GroupServer])
|
|
}
|
|
case event.DeleteContact:
|
|
onion := ev.Data[event.RemotePeer]
|
|
// We remove this peer from out blocklist which will prevent them from contacting us if we have "block unknown peers" turned on.
|
|
e.authorizations.Delete(ev.Data[event.RemotePeer])
|
|
e.deleteConnection(onion)
|
|
case event.DeleteGroup:
|
|
// TODO: There isn't a way here to determine if other Groups are using a server connection...
|
|
case event.SendMessageToGroup:
|
|
ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
|
|
signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
|
|
err := e.sendMessageToGroup(ev.Data[event.GroupServer], ciphertext, signature)
|
|
if err != nil {
|
|
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupServer: ev.Data[event.GroupServer], event.EventID: ev.EventID, event.Error: err.Error()}))
|
|
}
|
|
case event.SendMessageToPeer:
|
|
// TODO: remove this passthrough once the UI is integrated.
|
|
context, ok := ev.Data[event.EventContext]
|
|
if !ok {
|
|
context = event.ContextRaw
|
|
}
|
|
err := e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], context, []byte(ev.Data[event.Data]))
|
|
if err != nil {
|
|
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
|
|
}
|
|
case event.SendGetValMessageToPeer:
|
|
e.sendGetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Scope], ev.Data[event.Path])
|
|
case event.SendRetValMessageToPeer:
|
|
e.sendRetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.Exists])
|
|
case event.SetPeerAuthorization:
|
|
auth := model.Authorization(ev.Data[event.Authorization])
|
|
e.authorizations.Store(ev.Data[event.RemotePeer], auth)
|
|
if auth == model.AuthBlocked {
|
|
connection, err := e.service.GetConnection(ev.Data[event.RemotePeer])
|
|
if connection != nil && err == nil {
|
|
connection.Close()
|
|
}
|
|
// Explicitly send a disconnected event (if we don't do this here then the UI can wait for a while before
|
|
// an ongoing Open() connection fails and so the user will see a blocked peer as still connecting (because
|
|
// there isn't an active connection and we are stuck waiting for tor to time out)
|
|
e.peerDisconnected(ev.Data[event.RemotePeer])
|
|
}
|
|
case event.AllowUnknownPeers:
|
|
log.Debugf("%v now allows unknown connections", e.identity.Hostname())
|
|
e.blockUnknownContacts = false
|
|
case event.BlockUnknownPeers:
|
|
log.Debugf("%v now forbids unknown connections", e.identity.Hostname())
|
|
e.blockUnknownContacts = true
|
|
case event.ProtocolEngineStartListen:
|
|
go e.listenFn()
|
|
default:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *engine) isBlocked(onion string) bool {
|
|
authorization, known := e.authorizations.Load(onion)
|
|
if !known {
|
|
// if we block unknown peers we will block this contact
|
|
return e.blockUnknownContacts
|
|
}
|
|
return authorization.(model.Authorization) == model.AuthBlocked
|
|
}
|
|
|
|
func (e *engine) isAllowed(onion string) bool {
|
|
authorization, known := e.authorizations.Load(onion)
|
|
if !known {
|
|
log.Errorf("attempted to lookup authorization of onion not in map...that should never happen")
|
|
return false
|
|
}
|
|
if e.blockUnknownContacts {
|
|
return authorization.(model.Authorization) == model.AuthApproved
|
|
}
|
|
return authorization.(model.Authorization) != model.AuthBlocked
|
|
}
|
|
|
|
func (e *engine) createPeerTemplate() *PeerApp {
|
|
peerAppTemplate := new(PeerApp)
|
|
peerAppTemplate.IsBlocked = e.isBlocked
|
|
peerAppTemplate.IsAllowed = e.isAllowed
|
|
peerAppTemplate.MessageHandler = e.handlePeerMessage
|
|
peerAppTemplate.OnAcknowledgement = e.ignoreOnShutdown2(e.peerAck)
|
|
peerAppTemplate.OnAuth = e.ignoreOnShutdown(e.peerAuthed)
|
|
peerAppTemplate.OnConnecting = e.ignoreOnShutdown(e.peerConnecting)
|
|
peerAppTemplate.OnClose = e.ignoreOnShutdown(e.peerDisconnected)
|
|
peerAppTemplate.RetValHandler = e.handlePeerRetVal
|
|
return peerAppTemplate
|
|
}
|
|
|
|
// Listen sets up an onion listener to process incoming cwtch messages
|
|
func (e *engine) listenFn() {
|
|
err := e.service.Listen(e.createPeerTemplate())
|
|
if !e.shuttingDown {
|
|
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname(), event.Error: err.Error()}))
|
|
}
|
|
return
|
|
}
|
|
|
|
// Shutdown tears down the eventHandler goroutine
|
|
func (e *engine) Shutdown() {
|
|
e.shuttingDown = true
|
|
e.service.Shutdown()
|
|
e.queue.Shutdown()
|
|
}
|
|
|
|
// peerWithOnion is the entry point for cwtchPeer relationships
|
|
// needs to be run in a goroutine as will block on Open.
|
|
func (e *engine) peerWithOnion(onion string) {
|
|
log.Debugf("Called PeerWithOnion for %v", onion)
|
|
if !e.isBlocked(onion) {
|
|
e.ignoreOnShutdown(e.peerConnecting)(onion)
|
|
connected, err := e.service.Connect(onion, e.createPeerTemplate())
|
|
|
|
// If we are already connected...check if we are authed and issue an auth event
|
|
// (This allows the ui to be stateless)
|
|
if connected && err != nil {
|
|
conn, err := e.service.GetConnection(onion)
|
|
if err == nil {
|
|
if conn.HasCapability(cwtchCapability) {
|
|
e.ignoreOnShutdown(e.peerAuthed)(onion)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Only issue a disconnected error if we are disconnected (Connect will fail if a connection already exists)
|
|
if !connected && err != nil {
|
|
e.ignoreOnShutdown(e.peerDisconnected)(onion)
|
|
}
|
|
}
|
|
}
|
|
|
|
// peerWithTokenServer is the entry point for cwtchPeer - server relationships
|
|
// needs to be run in a goroutine as will block on Open.
|
|
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string) {
|
|
|
|
service, exists := e.ephemeralServices.Load(onion)
|
|
if exists {
|
|
connection := service.(*tor.BaseOnionService)
|
|
if conn, err := connection.GetConnection(onion); err == nil {
|
|
if conn.IsClosed() == false {
|
|
return
|
|
}
|
|
}
|
|
// Otherwise...let's reconnect
|
|
}
|
|
|
|
log.Debugf("Peering with Token Server %v %v", onion, tokenServerOnion)
|
|
e.ignoreOnShutdown(e.serverConnecting)(onion)
|
|
// Create a new ephemeral service for this connection
|
|
ephemeralService := new(tor.BaseOnionService)
|
|
eid, epk := primitives.InitializeEphemeralIdentity()
|
|
ephemeralService.Init(e.acn, epk, &eid)
|
|
|
|
Y := ristretto255.NewElement()
|
|
Y.UnmarshalText([]byte(tokenServerY))
|
|
connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, e.receiveGroupMessage, e.serverSynced))
|
|
e.ephemeralServices.Store(onion, ephemeralService)
|
|
// If we are already connected...check if we are authed and issue an auth event
|
|
// (This allows the ui to be stateless)
|
|
if connected && err != nil {
|
|
conn, err := ephemeralService.GetConnection(onion)
|
|
if err == nil {
|
|
if conn.HasCapability(groups.CwtchServerSyncedCapability) {
|
|
e.ignoreOnShutdown(e.serverConnected)(onion)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Only issue a disconnected error if we are disconnected (Connect will fail if a connection already exists)
|
|
if !connected && err != nil {
|
|
e.ignoreOnShutdown(e.serverDisconnected)(onion)
|
|
}
|
|
}
|
|
|
|
func (e *engine) ignoreOnShutdown(f func(string)) func(string) {
|
|
return func(x string) {
|
|
if !e.shuttingDown {
|
|
f(x)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *engine) ignoreOnShutdown2(f func(string, string)) func(string, string) {
|
|
return func(x, y string) {
|
|
if !e.shuttingDown {
|
|
f(x, y)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *engine) peerAuthed(onion string) {
|
|
_, known := e.authorizations.Load(onion)
|
|
if !known {
|
|
e.authorizations.Store(onion, model.AuthUnknown)
|
|
}
|
|
e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
|
|
event.RemotePeer: string(onion),
|
|
event.ConnectionState: ConnectionStateName[AUTHENTICATED],
|
|
}))
|
|
}
|
|
|
|
func (e *engine) peerConnecting(onion string) {
|
|
e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
|
|
event.RemotePeer: string(onion),
|
|
event.ConnectionState: ConnectionStateName[CONNECTING],
|
|
}))
|
|
}
|
|
|
|
func (e *engine) serverConnecting(onion string) {
|
|
e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{
|
|
event.GroupServer: string(onion),
|
|
event.ConnectionState: ConnectionStateName[CONNECTING],
|
|
}))
|
|
}
|
|
|
|
func (e *engine) serverConnected(onion string) {
|
|
e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{
|
|
event.GroupServer: onion,
|
|
event.ConnectionState: ConnectionStateName[CONNECTED],
|
|
}))
|
|
}
|
|
|
|
func (e *engine) serverSynced(onion string) {
|
|
log.Debugf("SERVER SYNCED: %v", onion)
|
|
e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{
|
|
event.GroupServer: onion,
|
|
event.ConnectionState: ConnectionStateName[SYNCED],
|
|
}))
|
|
}
|
|
|
|
func (e *engine) serverDisconnected(onion string) {
|
|
e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{
|
|
event.GroupServer: onion,
|
|
event.ConnectionState: ConnectionStateName[DISCONNECTED],
|
|
}))
|
|
}
|
|
|
|
func (e *engine) peerAck(onion string, eventID string) {
|
|
e.eventManager.Publish(event.NewEvent(event.PeerAcknowledgement, map[event.Field]string{
|
|
event.EventID: eventID,
|
|
event.RemotePeer: onion,
|
|
}))
|
|
}
|
|
|
|
func (e *engine) peerDisconnected(onion string) {
|
|
e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
|
|
event.RemotePeer: string(onion),
|
|
event.ConnectionState: ConnectionStateName[DISCONNECTED],
|
|
}))
|
|
}
|
|
|
|
// sendMessageToPeer sends a message to a peer under a given context
|
|
func (e *engine) sendMessageToPeer(eventID string, onion string, context string, message []byte) error {
|
|
conn, err := e.service.WaitForCapabilityOrClose(onion, cwtchCapability)
|
|
if err == nil {
|
|
peerApp, ok := (conn.App()).(*PeerApp)
|
|
if ok {
|
|
peerApp.SendMessage(PeerMessage{eventID, context, message})
|
|
return nil
|
|
}
|
|
return errors.New("failed type assertion conn.App != PeerApp")
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (e *engine) sendGetValToPeer(eventID, onion, scope, path string) error {
|
|
log.Debugf("sendGetValMessage to peer %v %v%v\n", onion, scope, path)
|
|
getVal := peerGetVal{Scope: scope, Path: path}
|
|
message, err := json.Marshal(getVal)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return e.sendMessageToPeer(eventID, onion, event.ContextGetVal, message)
|
|
}
|
|
|
|
func (e *engine) sendRetValToPeer(eventID, onion, val, existsStr string) error {
|
|
log.Debugf("sendRetValMessage to peer %v (%v) %v %v\n", onion, eventID, val, existsStr)
|
|
exists, _ := strconv.ParseBool(existsStr)
|
|
retVal := peerRetVal{Val: val, Exists: exists}
|
|
message, err := json.Marshal(retVal)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return e.sendMessageToPeer(eventID, onion, event.ContextRetVal, message)
|
|
}
|
|
|
|
func (e *engine) deleteConnection(id string) {
|
|
conn, err := e.service.GetConnection(id)
|
|
if err == nil {
|
|
conn.Close()
|
|
}
|
|
}
|
|
|
|
// receiveGroupMessage is a callback function that processes GroupMessages from a given server
|
|
func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMessage) {
|
|
// Publish Event so that a Profile Engine can deal with it.
|
|
// Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
|
|
e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: base64.StdEncoding.EncodeToString(gm.Ciphertext), event.Signature: base64.StdEncoding.EncodeToString(gm.Signature)}))
|
|
}
|
|
|
|
// sendMessageToGroup attempts to sent the given message to the given group id.
|
|
func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) error {
|
|
es, ok := e.ephemeralServices.Load(server)
|
|
if !ok {
|
|
return fmt.Errorf("no service exists for group %v", server)
|
|
}
|
|
ephemeralService := es.(tapir.Service)
|
|
conn, err := ephemeralService.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability)
|
|
if err == nil {
|
|
tokenApp, ok := (conn.App()).(*TokenBoardClient)
|
|
if ok {
|
|
attempts := 0
|
|
for tokenApp.Post(ct, sig) == false {
|
|
// TODO This should eventually be wired back into the UI to allow it to error
|
|
tokenApp.MakePayment()
|
|
time.Sleep(time.Second * 5)
|
|
attempts++
|
|
if attempts == 5 {
|
|
return errors.New("failed to post to token board")
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
return errors.New("failed type assertion conn.App != TokenBoardClientApp")
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (e *engine) handlePeerMessage(hostname string, eventID string, context string, message []byte) {
|
|
log.Debugf("New message from peer: %v %v", hostname, context)
|
|
if context == event.ContextInvite {
|
|
e.eventManager.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.GroupInvite: string(message)}))
|
|
} else if context == event.ContextGetVal {
|
|
var getVal peerGetVal
|
|
err := json.Unmarshal(message, &getVal)
|
|
if err == nil {
|
|
ev := event.NewEventList(event.NewGetValMessageFromPeer, event.RemotePeer, hostname, event.Scope, getVal.Scope, event.Path, getVal.Path)
|
|
ev.EventID = eventID
|
|
e.eventManager.Publish(ev)
|
|
}
|
|
} else {
|
|
e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)}))
|
|
}
|
|
}
|
|
|
|
func (e *engine) handlePeerRetVal(hostname string, getValData, retValData []byte) {
|
|
var getVal peerGetVal
|
|
var retVal peerRetVal
|
|
|
|
err := json.Unmarshal(getValData, &getVal)
|
|
if err != nil {
|
|
log.Errorf("Unmarshalling our own getVal request: %v\n", err)
|
|
return
|
|
}
|
|
err = json.Unmarshal(retValData, &retVal)
|
|
if err != nil {
|
|
log.Errorf("Unmarshalling peer response to getVal request")
|
|
return
|
|
}
|
|
|
|
e.eventManager.Publish(event.NewEventList(event.NewRetValMessageFromPeer, event.RemotePeer, hostname, event.Scope, getVal.Scope, event.Path, getVal.Path, event.Exists, strconv.FormatBool(retVal.Exists), event.Data, retVal.Val))
|
|
}
|