2019-01-04 21:44:21 +00:00
package connections
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol"
2019-07-17 19:10:52 +00:00
"cwtch.im/tapir"
"cwtch.im/tapir/networks/tor"
2019-07-23 17:57:04 +00:00
"errors"
2019-01-04 21:44:21 +00:00
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
"github.com/golang/protobuf/proto"
"golang.org/x/crypto/ed25519"
2019-01-08 18:58:01 +00:00
"sync"
2019-01-21 18:47:07 +00:00
"time"
2019-01-04 21:44:21 +00:00
)
2019-05-15 19:58:57 +00:00
type engine struct {
2019-01-04 21:44:21 +00:00
queue * event . Queue
connectionsManager * Manager
// Engine Attributes
2019-05-15 19:58:57 +00:00
identity identity . Identity
acn connectivity . ACN
2019-01-04 21:44:21 +00:00
// Engine State
started bool
2019-01-08 18:58:01 +00:00
// Blocklist
blocked sync . Map
2019-01-04 21:44:21 +00:00
// Pointer to the Global Event Manager
2019-06-05 20:40:55 +00:00
eventManager event . Manager
2019-05-15 19:58:57 +00:00
2019-07-17 19:10:52 +00:00
// Nextgen Tapir Service
service tapir . Service
2019-05-15 19:58:57 +00:00
// Required for listen(), inaccessible from identity
privateKey ed25519 . PrivateKey
2019-07-24 18:49:01 +00:00
shuttingDown bool
2019-05-15 19:58:57 +00:00
}
// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
// Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages
type Engine interface {
Identity ( ) identity . Identity
ACN ( ) connectivity . ACN
2019-06-05 20:40:55 +00:00
EventManager ( ) event . Manager
2019-05-15 19:58:57 +00:00
Shutdown ( )
2019-01-04 21:44:21 +00:00
}
// NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters
2019-06-05 20:40:55 +00:00
func NewProtocolEngine ( identity identity . Identity , privateKey ed25519 . PrivateKey , acn connectivity . ACN , eventManager event . Manager , blockedPeers [ ] string ) Engine {
2019-05-15 19:58:57 +00:00
engine := new ( engine )
engine . identity = identity
2019-01-04 21:44:21 +00:00
engine . privateKey = privateKey
engine . queue = event . NewEventQueue ( 100 )
go engine . eventHandler ( )
2019-05-15 19:58:57 +00:00
engine . acn = acn
engine . connectionsManager = NewConnectionsManager ( engine . acn )
2019-01-04 21:44:21 +00:00
go engine . connectionsManager . AttemptReconnections ( )
2019-07-17 19:10:52 +00:00
// Init the Server running the Simple App.
engine . service = new ( tor . BaseOnionService )
engine . service . Init ( acn , privateKey , identity )
2019-01-04 21:44:21 +00:00
engine . eventManager = eventManager
2019-01-08 18:58:01 +00:00
2019-01-04 21:44:21 +00:00
engine . eventManager . Subscribe ( event . ProtocolEngineStartListen , engine . queue . EventChannel )
engine . eventManager . Subscribe ( event . PeerRequest , engine . queue . EventChannel )
engine . eventManager . Subscribe ( event . InvitePeerToGroup , engine . queue . EventChannel )
engine . eventManager . Subscribe ( event . JoinServer , engine . queue . EventChannel )
engine . eventManager . Subscribe ( event . SendMessageToGroup , engine . queue . EventChannel )
engine . eventManager . Subscribe ( event . SendMessageToPeer , engine . queue . EventChannel )
2019-01-08 18:58:01 +00:00
engine . eventManager . Subscribe ( event . BlockPeer , engine . queue . EventChannel )
for _ , peer := range blockedPeers {
engine . blocked . Store ( peer , true )
}
2019-01-04 21:44:21 +00:00
return engine
}
2019-05-15 19:58:57 +00:00
// ACN returns the connectivity.ACN the engine was constructed with.
func (e *engine) ACN() connectivity.ACN {
	return e.acn
}
// Identity returns the identity the engine was constructed with.
func (e *engine) Identity() identity.Identity {
	return e.identity
}
2019-06-05 20:40:55 +00:00
func ( e * engine ) EventManager ( ) event . Manager {
2019-05-15 20:12:11 +00:00
return e . eventManager
}
2019-01-04 21:44:21 +00:00
// eventHandler process events from other subsystems
2019-05-15 19:58:57 +00:00
func ( e * engine ) eventHandler ( ) {
2019-01-04 21:44:21 +00:00
for {
ev := e . queue . Next ( )
switch ev . EventType {
case event . StatusRequest :
e . eventManager . Publish ( event . Event { EventType : event . ProtocolEngineStatus , EventID : ev . EventID } )
case event . PeerRequest :
2019-07-29 19:48:12 +00:00
go e . peerWithOnion ( ev . Data [ event . RemotePeer ] )
2019-01-04 21:44:21 +00:00
case event . InvitePeerToGroup :
2019-07-23 17:57:04 +00:00
e . sendMessageToPeer ( ev . EventID , ev . Data [ event . RemotePeer ] , event . ContextInvite , [ ] byte ( ev . Data [ event . GroupInvite ] ) )
2019-01-04 21:44:21 +00:00
case event . JoinServer :
2019-05-15 19:58:57 +00:00
e . joinServer ( ev . Data [ event . GroupServer ] )
2019-01-04 21:44:21 +00:00
case event . SendMessageToGroup :
2019-05-15 19:58:57 +00:00
e . sendMessageToGroup ( ev . Data [ event . GroupServer ] , [ ] byte ( ev . Data [ event . Ciphertext ] ) , [ ] byte ( ev . Data [ event . Signature ] ) )
2019-01-04 21:44:21 +00:00
case event . SendMessageToPeer :
2019-07-22 20:49:23 +00:00
// TODO: remove this passthrough once the UI is integrated.
context , ok := ev . Data [ event . EventContext ]
if ! ok {
2019-07-23 17:57:04 +00:00
context = event . ContextRaw
2019-07-22 20:49:23 +00:00
}
err := e . sendMessageToPeer ( ev . EventID , ev . Data [ event . RemotePeer ] , context , [ ] byte ( ev . Data [ event . GroupInvite ] ) )
2019-07-17 19:10:52 +00:00
if err != nil {
2019-04-23 20:30:50 +00:00
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToPeerError , map [ event . Field ] string { event . RemotePeer : ev . Data [ event . RemotePeer ] , event . Signature : ev . EventID , event . Error : "peer is offline or the connection has yet to finalize" } ) )
2019-01-04 21:44:21 +00:00
}
2019-01-08 18:58:01 +00:00
case event . BlockPeer :
2019-01-21 20:08:03 +00:00
e . blocked . Store ( ev . Data [ event . RemotePeer ] , true )
2019-07-17 19:10:52 +00:00
connection , err := e . service . GetConnection ( ev . Data [ event . RemotePeer ] )
if err != nil {
connection . Close ( )
2019-01-21 20:37:06 +00:00
}
2019-01-04 21:44:21 +00:00
case event . ProtocolEngineStartListen :
go e . listenFn ( )
default :
return
}
}
}
2019-07-23 17:57:04 +00:00
func ( e * engine ) createPeerTemplate ( ) * PeerApp {
2019-07-17 19:10:52 +00:00
peerAppTemplate := new ( PeerApp )
peerAppTemplate . MessageHandler = e . handlePeerMessage
2019-07-24 18:49:01 +00:00
peerAppTemplate . OnAcknowledgement = e . ignoreOnShutdown ( e . peerAck )
peerAppTemplate . OnAuth = e . ignoreOnShutdown ( e . peerAuthed )
peerAppTemplate . OnConnecting = e . ignoreOnShutdown ( e . peerConnecting )
peerAppTemplate . OnClose = e . ignoreOnShutdown ( e . peerDisconnected )
2019-07-23 17:57:04 +00:00
return peerAppTemplate
}
// Listen sets up an onion listener to process incoming cwtch messages
func ( e * engine ) listenFn ( ) {
err := e . service . Listen ( e . createPeerTemplate ( ) )
2019-07-24 19:26:44 +00:00
if ! e . shuttingDown {
e . eventManager . Publish ( event . NewEvent ( event . ProtocolEngineStopped , map [ event . Field ] string { event . Identity : e . identity . Hostname ( ) , event . Error : err . Error ( ) } ) )
}
2019-01-04 21:44:21 +00:00
return
}
// Shutdown tears down the eventHandler goroutine
2019-05-15 19:58:57 +00:00
func ( e * engine ) Shutdown ( ) {
2019-07-24 18:49:01 +00:00
e . shuttingDown = true
2019-01-04 21:44:21 +00:00
e . connectionsManager . Shutdown ( )
2019-07-24 19:26:44 +00:00
e . service . Shutdown ( )
2019-01-04 21:44:21 +00:00
e . queue . Shutdown ( )
}
2019-05-15 19:58:57 +00:00
// peerWithOnion is the entry point for cwtchPeer relationships
2019-07-29 19:48:12 +00:00
// needs to be run in a goroutine as will block on Open.
2019-07-17 19:10:52 +00:00
func ( e * engine ) peerWithOnion ( onion string ) {
2019-07-29 19:48:12 +00:00
e . ignoreOnShutdown ( e . peerConnecting ) ( onion )
_ , err := e . service . Connect ( onion , e . createPeerTemplate ( ) )
if err != nil {
e . ignoreOnShutdown ( e . peerDisconnected ) ( onion )
}
2019-07-17 19:10:52 +00:00
}
2019-07-24 18:49:01 +00:00
// ignoreOnShutdown wraps f so that the returned function forwards its argument
// to f only while the engine is not shutting down; the flag is checked at call
// time, not when the wrapper is created.
func (e *engine) ignoreOnShutdown(f func(string)) func(string) {
	return func(x string) {
		if e.shuttingDown {
			return
		}
		f(x)
	}
}
2019-07-17 19:10:52 +00:00
// peerAuthed publishes a PeerStateChange event marking onion as AUTHENTICATED.
func (e *engine) peerAuthed(onion string) {
	fields := map[event.Field]string{
		event.RemotePeer:      onion,
		event.ConnectionState: ConnectionStateName[AUTHENTICATED],
	}
	e.eventManager.Publish(event.NewEvent(event.PeerStateChange, fields))
}
2019-07-23 17:57:04 +00:00
// peerConnecting publishes a PeerStateChange event marking onion as CONNECTING.
func (e *engine) peerConnecting(onion string) {
	fields := map[event.Field]string{
		event.RemotePeer:      onion,
		event.ConnectionState: ConnectionStateName[CONNECTING],
	}
	e.eventManager.Publish(event.NewEvent(event.PeerStateChange, fields))
}
2019-07-22 20:49:23 +00:00
// peerAck publishes a PeerAcknowledgement event carrying the acknowledged eventID.
func (e *engine) peerAck(eventID string) {
	fields := map[event.Field]string{
		event.EventID: eventID,
	}
	e.eventManager.Publish(event.NewEvent(event.PeerAcknowledgement, fields))
}
2019-07-17 19:10:52 +00:00
func ( e * engine ) peerDisconnected ( onion string ) {
e . eventManager . Publish ( event . NewEvent ( event . PeerStateChange , map [ event . Field ] string {
event . RemotePeer : string ( onion ) ,
event . ConnectionState : ConnectionStateName [ DISCONNECTED ] ,
} ) )
2019-01-04 21:44:21 +00:00
}
2019-07-22 20:49:23 +00:00
// sendMessageToPeer sends a message to a peer under a given context
func ( e * engine ) sendMessageToPeer ( eventID string , onion string , context string , message [ ] byte ) error {
2019-07-17 19:10:52 +00:00
conn , err := e . service . GetConnection ( onion )
if err == nil {
peerApp , ok := conn . App . ( * PeerApp )
if ok {
2019-07-22 20:49:23 +00:00
peerApp . SendMessage ( PeerMessage { eventID , context , message } )
2019-07-17 19:10:52 +00:00
return nil
}
2019-07-23 17:57:04 +00:00
return errors . New ( "failed type assertion conn.App != PeerApp" )
2019-01-04 21:44:21 +00:00
}
2019-07-17 19:10:52 +00:00
return err
2019-01-04 21:44:21 +00:00
}
2019-05-15 19:58:57 +00:00
// receiveGroupMessage is a callback function that processes GroupMessages from a given server
func ( e * engine ) receiveGroupMessage ( server string , gm * protocol . GroupMessage ) {
2019-01-04 21:44:21 +00:00
// Publish Event so that a Profile Engine can deal with it.
// Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
2019-01-21 20:08:03 +00:00
e . eventManager . Publish ( event . NewEvent ( event . EncryptedGroupMessage , map [ event . Field ] string { event . Ciphertext : string ( gm . GetCiphertext ( ) ) , event . Signature : string ( gm . GetSignature ( ) ) } ) )
2019-01-04 21:44:21 +00:00
}
2019-05-15 19:58:57 +00:00
// joinServer manages a new server connection with the given onion address
func ( e * engine ) joinServer ( onion string ) {
2019-07-19 17:27:50 +00:00
e . connectionsManager . ManageServerConnection ( onion , e , e . receiveGroupMessage )
2019-01-04 21:44:21 +00:00
}
2019-05-15 19:58:57 +00:00
// sendMessageToGroup attempts to sent the given message to the given group id.
func ( e * engine ) sendMessageToGroup ( server string , ct [ ] byte , sig [ ] byte ) {
2019-01-04 21:44:21 +00:00
psc := e . connectionsManager . GetPeerServerConnectionForOnion ( server )
if psc == nil {
2019-02-20 20:58:05 +00:00
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToGroupError , map [ event . Field ] string { event . GroupServer : server , event . Signature : string ( sig ) , event . Error : "server is offline or the connection has yet to finalize" } ) )
2019-01-04 21:44:21 +00:00
}
gm := & protocol . GroupMessage {
Ciphertext : ct ,
Signature : sig ,
}
err := psc . SendGroupMessage ( gm )
2019-02-20 20:58:05 +00:00
if err != nil {
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToGroupError , map [ event . Field ] string { event . GroupServer : server , event . Signature : string ( sig ) , event . Error : err . Error ( ) } ) )
}
2019-01-04 21:44:21 +00:00
}
2019-07-17 19:10:52 +00:00
func ( e * engine ) handlePeerMessage ( hostname string , message [ ] byte ) {
cpp := & protocol . CwtchPeerPacket { }
err := proto . Unmarshal ( message , cpp )
2019-01-04 21:44:21 +00:00
if err == nil {
2019-07-17 19:10:52 +00:00
if cpp . GetGroupChatInvite ( ) != nil {
marshal , _ := proto . Marshal ( cpp . GetGroupChatInvite ( ) )
e . eventManager . Publish ( event . NewEvent ( event . NewGroupInvite , map [ event . Field ] string { event . TimestampReceived : time . Now ( ) . Format ( time . RFC3339Nano ) , event . RemotePeer : hostname , event . GroupInvite : string ( marshal ) } ) )
}
2019-07-23 17:57:04 +00:00
} else {
e . eventManager . Publish ( event . NewEvent ( event . NewMessageFromPeer , map [ event . Field ] string { event . TimestampReceived : time . Now ( ) . Format ( time . RFC3339Nano ) , event . RemotePeer : hostname , event . Data : string ( message ) } ) )
2019-01-04 21:44:21 +00:00
}
}