2019-01-04 21:44:21 +00:00
|
|
|
package connections
|
|
|
|
|
|
|
|
import (
|
|
|
|
"crypto/rsa"
|
|
|
|
"cwtch.im/cwtch/event"
|
|
|
|
"cwtch.im/cwtch/protocol"
|
|
|
|
"cwtch.im/cwtch/protocol/connections/peer"
|
|
|
|
"errors"
|
|
|
|
"git.openprivacy.ca/openprivacy/libricochet-go/application"
|
|
|
|
"git.openprivacy.ca/openprivacy/libricochet-go/channels"
|
|
|
|
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
|
|
|
|
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
|
|
|
|
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
|
|
"golang.org/x/crypto/ed25519"
|
2019-01-08 18:58:01 +00:00
|
|
|
"sync"
|
2019-01-21 18:47:07 +00:00
|
|
|
"time"
|
2019-01-04 21:44:21 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
|
|
|
|
// Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages
|
|
|
|
type Engine struct {
|
|
|
|
queue *event.Queue
|
|
|
|
connectionsManager *Manager
|
|
|
|
|
|
|
|
// Engine Attributes
|
|
|
|
Identity identity.Identity
|
|
|
|
ACN connectivity.ACN
|
|
|
|
|
|
|
|
app *application.RicochetApplication
|
|
|
|
|
|
|
|
// Engine State
|
|
|
|
started bool
|
|
|
|
|
2019-01-08 18:58:01 +00:00
|
|
|
// Blocklist
|
|
|
|
blocked sync.Map
|
|
|
|
|
2019-01-04 21:44:21 +00:00
|
|
|
// Pointer to the Global Event Manager
|
|
|
|
eventManager *event.Manager
|
|
|
|
privateKey ed25519.PrivateKey
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters
|
2019-01-08 18:58:01 +00:00
|
|
|
func NewProtocolEngine(privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager *event.Manager, blockedPeers []string) *Engine {
|
2019-01-04 21:44:21 +00:00
|
|
|
engine := new(Engine)
|
|
|
|
engine.privateKey = privateKey
|
|
|
|
engine.queue = event.NewEventQueue(100)
|
|
|
|
go engine.eventHandler()
|
|
|
|
|
|
|
|
engine.ACN = acn
|
|
|
|
engine.connectionsManager = NewConnectionsManager(engine.ACN)
|
|
|
|
go engine.connectionsManager.AttemptReconnections()
|
|
|
|
|
|
|
|
engine.eventManager = eventManager
|
2019-01-08 18:58:01 +00:00
|
|
|
|
2019-01-04 21:44:21 +00:00
|
|
|
engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue.EventChannel)
|
|
|
|
engine.eventManager.Subscribe(event.PeerRequest, engine.queue.EventChannel)
|
|
|
|
engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue.EventChannel)
|
|
|
|
engine.eventManager.Subscribe(event.JoinServer, engine.queue.EventChannel)
|
|
|
|
engine.eventManager.Subscribe(event.SendMessageToGroup, engine.queue.EventChannel)
|
|
|
|
engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue.EventChannel)
|
|
|
|
|
2019-01-08 18:58:01 +00:00
|
|
|
engine.eventManager.Subscribe(event.BlockPeer, engine.queue.EventChannel)
|
|
|
|
for _, peer := range blockedPeers {
|
|
|
|
engine.blocked.Store(peer, true)
|
|
|
|
}
|
2019-01-04 21:44:21 +00:00
|
|
|
return engine
|
|
|
|
}
|
|
|
|
|
|
|
|
// eventHandler process events from other subsystems
|
|
|
|
func (e *Engine) eventHandler() {
|
|
|
|
for {
|
|
|
|
ev := e.queue.Next()
|
|
|
|
switch ev.EventType {
|
|
|
|
case event.StatusRequest:
|
|
|
|
e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID})
|
|
|
|
case event.PeerRequest:
|
2019-01-21 20:08:03 +00:00
|
|
|
e.PeerWithOnion(ev.Data[event.RemotePeer])
|
2019-01-04 21:44:21 +00:00
|
|
|
case event.InvitePeerToGroup:
|
2019-01-21 20:08:03 +00:00
|
|
|
e.InviteOnionToGroup(ev.Data[event.RemotePeer], []byte(ev.Data[event.GroupInvite]))
|
2019-01-04 21:44:21 +00:00
|
|
|
case event.JoinServer:
|
2019-01-21 20:08:03 +00:00
|
|
|
e.JoinServer(ev.Data[event.GroupServer])
|
2019-01-04 21:44:21 +00:00
|
|
|
case event.SendMessageToGroup:
|
2019-01-21 20:08:03 +00:00
|
|
|
e.SendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
|
2019-01-04 21:44:21 +00:00
|
|
|
case event.SendMessageToPeer:
|
|
|
|
log.Debugf("Sending Message to Peer.....")
|
2019-01-21 20:08:03 +00:00
|
|
|
ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(ev.Data[event.RemotePeer])
|
2019-01-04 21:44:21 +00:00
|
|
|
if ppc != nil {
|
|
|
|
// TODO this will block.
|
2019-01-21 20:08:03 +00:00
|
|
|
ppc.SendPacket([]byte(ev.Data[event.Data]))
|
2019-01-04 21:44:21 +00:00
|
|
|
}
|
2019-01-08 18:58:01 +00:00
|
|
|
case event.BlockPeer:
|
2019-01-21 20:08:03 +00:00
|
|
|
e.blocked.Store(ev.Data[event.RemotePeer], true)
|
2019-01-21 20:37:06 +00:00
|
|
|
ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(ev.Data[event.RemotePeer])
|
|
|
|
if ppc != nil {
|
|
|
|
ppc.Close()
|
|
|
|
}
|
|
|
|
e.app.Close(ev.Data[event.RemotePeer])
|
2019-01-04 21:44:21 +00:00
|
|
|
case event.ProtocolEngineStartListen:
|
|
|
|
go e.listenFn()
|
|
|
|
default:
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetPeerHandler is an external interface function that allows callers access to a CwtchPeerHandler
|
|
|
|
// TODO: There is likely a slightly better way to encapsulate this behavior
|
|
|
|
func (e *Engine) GetPeerHandler(remotePeerHostname string) *CwtchPeerHandler {
|
|
|
|
return &CwtchPeerHandler{Onion: remotePeerHostname, EventBus: e.eventManager}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Listen sets up an onion listener to process incoming cwtch messages
|
|
|
|
func (e *Engine) listenFn() {
|
|
|
|
ra := new(application.RicochetApplication)
|
|
|
|
onionService, err := e.ACN.Listen(e.privateKey, application.RicochetPort)
|
|
|
|
if err != nil /*&& fmt.Sprintf("%v", err) != "550 Unspecified Tor error: Onion address collision"*/ {
|
2019-01-21 20:08:03 +00:00
|
|
|
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.Identity.Hostname(), event.Error: err.Error()}))
|
2019-01-04 21:44:21 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
af := application.ApplicationInstanceFactory{}
|
|
|
|
af.Init()
|
|
|
|
af.AddHandler("im.cwtch.peer", func(rai *application.ApplicationInstance) func() channels.Handler {
|
|
|
|
cpi := new(CwtchPeerInstance)
|
|
|
|
cpi.Init(rai, ra)
|
|
|
|
return func() channels.Handler {
|
|
|
|
cpc := new(peer.CwtchPeerChannel)
|
|
|
|
cpc.Handler = e.GetPeerHandler(rai.RemoteHostname)
|
|
|
|
return cpc
|
|
|
|
}
|
|
|
|
})
|
|
|
|
|
|
|
|
af.AddHandler("im.cwtch.peer.data", func(rai *application.ApplicationInstance) func() channels.Handler {
|
|
|
|
cpi := new(CwtchPeerInstance)
|
|
|
|
cpi.Init(rai, ra)
|
|
|
|
return func() channels.Handler {
|
|
|
|
cpc := new(peer.CwtchPeerDataChannel)
|
|
|
|
cpc.Handler = e.GetPeerHandler(rai.RemoteHostname)
|
|
|
|
return cpc
|
|
|
|
}
|
|
|
|
})
|
|
|
|
|
|
|
|
ra.Init(e.ACN, e.Identity.Name, e.Identity, af, e)
|
|
|
|
log.Infof("Running cwtch peer on %v", onionService.AddressFull())
|
|
|
|
e.started = true
|
|
|
|
e.app = ra
|
|
|
|
ra.Run(onionService)
|
2019-01-21 20:08:03 +00:00
|
|
|
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.Identity.Hostname()}))
|
2019-01-04 21:44:21 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2019-01-08 18:58:01 +00:00
|
|
|
// LookupContact is a V2 API Call, we want to reject all V2 Peers
|
|
|
|
// TODO Deprecate
|
2019-01-04 21:44:21 +00:00
|
|
|
func (e *Engine) LookupContact(hostname string, publicKey rsa.PublicKey) (allowed, known bool) {
|
2019-01-08 18:58:01 +00:00
|
|
|
return false, false
|
2019-01-04 21:44:21 +00:00
|
|
|
}
|
|
|
|
|
2019-01-08 18:58:01 +00:00
|
|
|
// ContactRequest is a V2 API Call needed to implement ContactRequestHandler Interface
|
|
|
|
// TODO Deprecate
|
|
|
|
func (e *Engine) ContactRequest(name string, message string) string {
|
|
|
|
return "Rejected"
|
2019-01-04 21:44:21 +00:00
|
|
|
}
|
|
|
|
|
2019-01-08 18:58:01 +00:00
|
|
|
// LookupContactV3 returns that a contact is known and allowed to communicate for all cases.
|
|
|
|
func (e *Engine) LookupContactV3(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) {
|
|
|
|
// TODO: We want to autoblock those that are blocked, The known parameter has no use anymore and should be
|
|
|
|
// disregarded by peers, so we set it to false.
|
|
|
|
if _, blocked := e.blocked.Load(hostname); blocked {
|
|
|
|
return false, false
|
|
|
|
}
|
|
|
|
return true, false
|
2019-01-04 21:44:21 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Shutdown tears down the eventHandler goroutine
|
|
|
|
func (e *Engine) Shutdown() {
|
|
|
|
e.connectionsManager.Shutdown()
|
|
|
|
e.app.Shutdown()
|
|
|
|
e.queue.Shutdown()
|
|
|
|
}
|
|
|
|
|
|
|
|
// PeerWithOnion is the entry point for cwtchPeer relationships
|
|
|
|
func (e *Engine) PeerWithOnion(onion string) *PeerPeerConnection {
|
|
|
|
return e.connectionsManager.ManagePeerConnection(onion, e)
|
|
|
|
}
|
|
|
|
|
|
|
|
// InviteOnionToGroup kicks off the invite process
|
|
|
|
func (e *Engine) InviteOnionToGroup(onion string, invite []byte) error {
|
|
|
|
ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(onion)
|
|
|
|
if ppc == nil {
|
|
|
|
return errors.New("peer connection not setup for onion. peers must be trusted before sending")
|
|
|
|
}
|
|
|
|
if ppc.GetState() == AUTHENTICATED {
|
|
|
|
log.Infof("Got connection for group: %v - Sending Invite\n", ppc)
|
|
|
|
ppc.SendGroupInvite(invite)
|
|
|
|
} else {
|
|
|
|
return errors.New("cannot send invite to onion: peer connection is not ready")
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// ReceiveGroupMessage is a callback function that processes GroupMessages from a given server
|
|
|
|
func (e *Engine) ReceiveGroupMessage(server string, gm *protocol.GroupMessage) {
|
|
|
|
// Publish Event so that a Profile Engine can deal with it.
|
|
|
|
// Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
|
2019-01-21 20:08:03 +00:00
|
|
|
e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: string(gm.GetCiphertext()), event.Signature: string(gm.GetSignature())}))
|
2019-01-04 21:44:21 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// JoinServer manages a new server connection with the given onion address
|
|
|
|
func (e *Engine) JoinServer(onion string) {
|
|
|
|
e.connectionsManager.ManageServerConnection(onion, e.ReceiveGroupMessage)
|
|
|
|
}
|
|
|
|
|
|
|
|
// SendMessageToGroup attemps to sent the given message to the given group id.
|
|
|
|
func (e *Engine) SendMessageToGroup(server string, ct []byte, sig []byte) error {
|
|
|
|
psc := e.connectionsManager.GetPeerServerConnectionForOnion(server)
|
|
|
|
if psc == nil {
|
|
|
|
return errors.New("could not find server connection to send message to")
|
|
|
|
}
|
|
|
|
gm := &protocol.GroupMessage{
|
|
|
|
Ciphertext: ct,
|
|
|
|
Signature: sig,
|
|
|
|
}
|
|
|
|
err := psc.SendGroupMessage(gm)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetPeers returns a list of peer connections.
|
|
|
|
func (e *Engine) GetPeers() map[string]ConnectionState {
|
|
|
|
return e.connectionsManager.GetPeers()
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetServers returns a list of server connections
|
|
|
|
func (e *Engine) GetServers() map[string]ConnectionState {
|
|
|
|
return e.connectionsManager.GetServers()
|
|
|
|
}
|
|
|
|
|
|
|
|
// CwtchPeerInstance encapsulates incoming peer connections
|
|
|
|
type CwtchPeerInstance struct {
|
|
|
|
rai *application.ApplicationInstance
|
|
|
|
ra *application.RicochetApplication
|
|
|
|
}
|
|
|
|
|
|
|
|
// Init sets up a CwtchPeerInstance
|
|
|
|
func (cpi *CwtchPeerInstance) Init(rai *application.ApplicationInstance, ra *application.RicochetApplication) {
|
|
|
|
cpi.rai = rai
|
|
|
|
cpi.ra = ra
|
|
|
|
}
|
|
|
|
|
|
|
|
// CwtchPeerHandler encapsulates handling of incoming CwtchPackets
|
|
|
|
type CwtchPeerHandler struct {
|
|
|
|
Onion string
|
|
|
|
EventBus *event.Manager
|
|
|
|
DataHandler func(string, []byte) []byte
|
|
|
|
}
|
|
|
|
|
|
|
|
// HandleGroupInvite handles incoming GroupInvites
|
|
|
|
func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) {
|
|
|
|
log.Debugf("Received GroupID from %v %v\n", cph.Onion, gci.String())
|
|
|
|
marshal, err := proto.Marshal(gci)
|
|
|
|
if err == nil {
|
2019-01-21 20:08:03 +00:00
|
|
|
cph.EventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().String(), event.RemotePeer: cph.Onion, event.GroupInvite: string(marshal)}))
|
2019-01-04 21:44:21 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// HandlePacket handles the Cwtch cwtchPeer Data Channel
|
|
|
|
func (cph *CwtchPeerHandler) HandlePacket(data []byte) []byte {
|
2019-01-21 20:08:03 +00:00
|
|
|
cph.EventBus.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().String(), event.RemotePeer: cph.Onion, event.Data: string(data)}))
|
2019-01-04 21:44:21 +00:00
|
|
|
return []byte{} // TODO remove this
|
|
|
|
}
|