2019-01-04 21:44:21 +00:00
package connections
import (
"crypto/rsa"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/protocol/connections/peer"
"errors"
"git.openprivacy.ca/openprivacy/libricochet-go/application"
"git.openprivacy.ca/openprivacy/libricochet-go/channels"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"github.com/golang/protobuf/proto"
"golang.org/x/crypto/ed25519"
2019-01-08 18:58:01 +00:00
"sync"
2019-01-21 18:47:07 +00:00
"time"
2019-01-04 21:44:21 +00:00
)
// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
// Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages
type Engine struct {
queue * event . Queue
connectionsManager * Manager
// Engine Attributes
Identity identity . Identity
ACN connectivity . ACN
app * application . RicochetApplication
// Engine State
started bool
2019-01-08 18:58:01 +00:00
// Blocklist
blocked sync . Map
2019-01-04 21:44:21 +00:00
// Pointer to the Global Event Manager
eventManager * event . Manager
privateKey ed25519 . PrivateKey
}
// NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters
2019-01-08 18:58:01 +00:00
func NewProtocolEngine ( privateKey ed25519 . PrivateKey , acn connectivity . ACN , eventManager * event . Manager , blockedPeers [ ] string ) * Engine {
2019-01-04 21:44:21 +00:00
engine := new ( Engine )
engine . privateKey = privateKey
engine . queue = event . NewEventQueue ( 100 )
go engine . eventHandler ( )
engine . ACN = acn
engine . connectionsManager = NewConnectionsManager ( engine . ACN )
go engine . connectionsManager . AttemptReconnections ( )
engine . eventManager = eventManager
2019-01-08 18:58:01 +00:00
2019-01-04 21:44:21 +00:00
engine . eventManager . Subscribe ( event . ProtocolEngineStartListen , engine . queue . EventChannel )
engine . eventManager . Subscribe ( event . PeerRequest , engine . queue . EventChannel )
engine . eventManager . Subscribe ( event . InvitePeerToGroup , engine . queue . EventChannel )
engine . eventManager . Subscribe ( event . JoinServer , engine . queue . EventChannel )
engine . eventManager . Subscribe ( event . SendMessageToGroup , engine . queue . EventChannel )
engine . eventManager . Subscribe ( event . SendMessageToPeer , engine . queue . EventChannel )
2019-01-08 18:58:01 +00:00
engine . eventManager . Subscribe ( event . BlockPeer , engine . queue . EventChannel )
for _ , peer := range blockedPeers {
engine . blocked . Store ( peer , true )
}
2019-01-04 21:44:21 +00:00
return engine
}
// eventHandler process events from other subsystems
func ( e * Engine ) eventHandler ( ) {
for {
ev := e . queue . Next ( )
switch ev . EventType {
case event . StatusRequest :
e . eventManager . Publish ( event . Event { EventType : event . ProtocolEngineStatus , EventID : ev . EventID } )
case event . PeerRequest :
2019-01-21 20:08:03 +00:00
e . PeerWithOnion ( ev . Data [ event . RemotePeer ] )
2019-01-04 21:44:21 +00:00
case event . InvitePeerToGroup :
2019-01-21 20:08:03 +00:00
e . InviteOnionToGroup ( ev . Data [ event . RemotePeer ] , [ ] byte ( ev . Data [ event . GroupInvite ] ) )
2019-01-04 21:44:21 +00:00
case event . JoinServer :
2019-01-21 20:08:03 +00:00
e . JoinServer ( ev . Data [ event . GroupServer ] )
2019-01-04 21:44:21 +00:00
case event . SendMessageToGroup :
2019-01-21 20:08:03 +00:00
e . SendMessageToGroup ( ev . Data [ event . GroupServer ] , [ ] byte ( ev . Data [ event . Ciphertext ] ) , [ ] byte ( ev . Data [ event . Signature ] ) )
2019-01-04 21:44:21 +00:00
case event . SendMessageToPeer :
log . Debugf ( "Sending Message to Peer....." )
2019-01-21 20:08:03 +00:00
ppc := e . connectionsManager . GetPeerPeerConnectionForOnion ( ev . Data [ event . RemotePeer ] )
2019-02-14 21:04:40 +00:00
if ppc != nil && ppc . GetState ( ) == AUTHENTICATED {
2019-01-21 20:08:03 +00:00
ppc . SendPacket ( [ ] byte ( ev . Data [ event . Data ] ) )
2019-02-14 21:04:40 +00:00
} else {
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToPeerError , map [ event . Field ] string { event . RemotePeer : ev . Data [ event . RemotePeer ] , event . Error : "peer is offline or the connection has yet to finalize" } ) )
2019-01-04 21:44:21 +00:00
}
2019-01-08 18:58:01 +00:00
case event . BlockPeer :
2019-01-21 20:08:03 +00:00
e . blocked . Store ( ev . Data [ event . RemotePeer ] , true )
2019-01-21 20:37:06 +00:00
ppc := e . connectionsManager . GetPeerPeerConnectionForOnion ( ev . Data [ event . RemotePeer ] )
if ppc != nil {
ppc . Close ( )
}
e . app . Close ( ev . Data [ event . RemotePeer ] )
2019-01-04 21:44:21 +00:00
case event . ProtocolEngineStartListen :
go e . listenFn ( )
default :
return
}
}
}
// GetPeerHandler is an external interface function that hands callers a CwtchPeerHandler
// bound to the given remote hostname and to this engine's event manager.
// TODO: There is likely a slightly better way to encapsulate this behavior
func (e *Engine) GetPeerHandler(remotePeerHostname string) *CwtchPeerHandler {
	handler := new(CwtchPeerHandler)
	handler.Onion = remotePeerHostname
	handler.EventBus = e.eventManager
	return handler
}
// Listen sets up an onion listener to process incoming cwtch messages
func ( e * Engine ) listenFn ( ) {
ra := new ( application . RicochetApplication )
onionService , err := e . ACN . Listen ( e . privateKey , application . RicochetPort )
if err != nil /*&& fmt.Sprintf("%v", err) != "550 Unspecified Tor error: Onion address collision"*/ {
2019-01-21 20:08:03 +00:00
e . eventManager . Publish ( event . NewEvent ( event . ProtocolEngineStopped , map [ event . Field ] string { event . Identity : e . Identity . Hostname ( ) , event . Error : err . Error ( ) } ) )
2019-01-04 21:44:21 +00:00
return
}
2019-01-23 20:50:53 +00:00
af := application . InstanceFactory { }
2019-01-04 21:44:21 +00:00
af . Init ( )
2019-01-23 20:50:53 +00:00
af . AddHandler ( "im.cwtch.peer" , func ( rai * application . Instance ) func ( ) channels . Handler {
2019-01-04 21:44:21 +00:00
cpi := new ( CwtchPeerInstance )
cpi . Init ( rai , ra )
return func ( ) channels . Handler {
cpc := new ( peer . CwtchPeerChannel )
cpc . Handler = e . GetPeerHandler ( rai . RemoteHostname )
return cpc
}
} )
2019-01-23 20:50:53 +00:00
af . AddHandler ( "im.cwtch.peer.data" , func ( rai * application . Instance ) func ( ) channels . Handler {
2019-01-04 21:44:21 +00:00
cpi := new ( CwtchPeerInstance )
cpi . Init ( rai , ra )
return func ( ) channels . Handler {
cpc := new ( peer . CwtchPeerDataChannel )
cpc . Handler = e . GetPeerHandler ( rai . RemoteHostname )
return cpc
}
} )
ra . Init ( e . ACN , e . Identity . Name , e . Identity , af , e )
log . Infof ( "Running cwtch peer on %v" , onionService . AddressFull ( ) )
e . started = true
e . app = ra
ra . Run ( onionService )
2019-01-21 20:08:03 +00:00
e . eventManager . Publish ( event . NewEvent ( event . ProtocolEngineStopped , map [ event . Field ] string { event . Identity : e . Identity . Hostname ( ) } ) )
2019-01-04 21:44:21 +00:00
return
}
2019-01-08 18:58:01 +00:00
// LookupContact is a V2 API Call, we want to reject all V2 Peers
// TODO Deprecate
2019-01-04 21:44:21 +00:00
func ( e * Engine ) LookupContact ( hostname string , publicKey rsa . PublicKey ) ( allowed , known bool ) {
2019-01-08 18:58:01 +00:00
return false , false
2019-01-04 21:44:21 +00:00
}
2019-01-08 18:58:01 +00:00
// ContactRequest is a V2 API Call needed to implement ContactRequestHandler Interface
// TODO Deprecate
func ( e * Engine ) ContactRequest ( name string , message string ) string {
return "Rejected"
2019-01-04 21:44:21 +00:00
}
2019-01-08 18:58:01 +00:00
// LookupContactV3 returns that a contact is known and allowed to communicate for all cases.
func ( e * Engine ) LookupContactV3 ( hostname string , publicKey ed25519 . PublicKey ) ( allowed , known bool ) {
// TODO: We want to autoblock those that are blocked, The known parameter has no use anymore and should be
// disregarded by peers, so we set it to false.
if _ , blocked := e . blocked . Load ( hostname ) ; blocked {
return false , false
}
return true , false
2019-01-04 21:44:21 +00:00
}
// Shutdown tears the engine down: it shuts down the connections manager, then the ricochet
// application (if one was started), and finally the event queue that feeds eventHandler.
func (e *Engine) Shutdown() {
	e.connectionsManager.Shutdown()
	// e.app is only assigned by listenFn; an engine that never started listening has
	// nothing to shut down here and must not dereference a nil application.
	if e.app != nil {
		e.app.Shutdown()
	}
	e.queue.Shutdown()
}
// PeerWithOnion is the entry point for cwtchPeer relationships: it asks the connections
// manager to manage a peer connection to the given onion and returns that connection.
func (e *Engine) PeerWithOnion(onion string) *PeerPeerConnection {
	connection := e.connectionsManager.ManagePeerConnection(onion, e)
	return connection
}
// InviteOnionToGroup kicks off the invite process by sending the invite over the existing
// peer connection for onion. It returns an error when no connection is known for the onion
// or when that connection is not in the AUTHENTICATED state.
func (e *Engine) InviteOnionToGroup(onion string, invite []byte) error {
	conn := e.connectionsManager.GetPeerPeerConnectionForOnion(onion)
	switch {
	case conn == nil:
		return errors.New("peer connection not setup for onion. peers must be trusted before sending")
	case conn.GetState() != AUTHENTICATED:
		return errors.New("cannot send invite to onion: peer connection is not ready")
	}

	log.Infof("Got connection for group: %v - Sending Invite\n", conn)
	conn.SendGroupInvite(invite)
	return nil
}
// ReceiveGroupMessage is a callback function that processes GroupMessages from a given server
func ( e * Engine ) ReceiveGroupMessage ( server string , gm * protocol . GroupMessage ) {
// Publish Event so that a Profile Engine can deal with it.
// Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
2019-01-21 20:08:03 +00:00
e . eventManager . Publish ( event . NewEvent ( event . EncryptedGroupMessage , map [ event . Field ] string { event . Ciphertext : string ( gm . GetCiphertext ( ) ) , event . Signature : string ( gm . GetSignature ( ) ) } ) )
2019-01-04 21:44:21 +00:00
}
// JoinServer asks the connections manager to manage a server connection to the given onion
// address, registering ReceiveGroupMessage as the callback for that connection.
func (e *Engine) JoinServer(onion string) {
	onGroupMessage := e.ReceiveGroupMessage
	e.connectionsManager.ManageServerConnection(onion, onGroupMessage)
}
2019-01-28 20:09:25 +00:00
// SendMessageToGroup attempts to sent the given message to the given group id.
2019-01-04 21:44:21 +00:00
func ( e * Engine ) SendMessageToGroup ( server string , ct [ ] byte , sig [ ] byte ) error {
psc := e . connectionsManager . GetPeerServerConnectionForOnion ( server )
if psc == nil {
return errors . New ( "could not find server connection to send message to" )
}
gm := & protocol . GroupMessage {
Ciphertext : ct ,
Signature : sig ,
}
err := psc . SendGroupMessage ( gm )
return err
}
// GetPeers returns the peer connections reported by the connections manager, as a map to
// each connection's ConnectionState.
func (e *Engine) GetPeers() map[string]ConnectionState {
	peers := e.connectionsManager.GetPeers()
	return peers
}
// GetServers returns the server connections reported by the connections manager, as a map
// to each connection's ConnectionState.
func (e *Engine) GetServers() map[string]ConnectionState {
	servers := e.connectionsManager.GetServers()
	return servers
}
// CwtchPeerInstance encapsulates incoming peer connections
type CwtchPeerInstance struct {
2019-01-23 20:50:53 +00:00
rai * application . Instance
2019-01-04 21:44:21 +00:00
ra * application . RicochetApplication
}
// Init sets up a CwtchPeerInstance
2019-01-23 20:50:53 +00:00
func ( cpi * CwtchPeerInstance ) Init ( rai * application . Instance , ra * application . RicochetApplication ) {
2019-01-04 21:44:21 +00:00
cpi . rai = rai
cpi . ra = ra
}
// CwtchPeerHandler encapsulates handling of incoming CwtchPackets for a single remote peer.
type CwtchPeerHandler struct {
	// Onion is the remote peer's hostname; it is attached to every event this handler publishes.
	Onion string
	// EventBus is the event manager that received invites and packets are published to.
	EventBus *event.Manager
	// DataHandler is not used by the handler methods in this file.
	DataHandler func(string, []byte) []byte
}
// HandleGroupInvite handles incoming GroupInvites
func ( cph * CwtchPeerHandler ) HandleGroupInvite ( gci * protocol . GroupChatInvite ) {
log . Debugf ( "Received GroupID from %v %v\n" , cph . Onion , gci . String ( ) )
marshal , err := proto . Marshal ( gci )
if err == nil {
2019-01-22 19:11:25 +00:00
cph . EventBus . Publish ( event . NewEvent ( event . NewGroupInvite , map [ event . Field ] string { event . TimestampReceived : time . Now ( ) . Format ( time . RFC3339Nano ) , event . RemotePeer : cph . Onion , event . GroupInvite : string ( marshal ) } ) )
2019-01-04 21:44:21 +00:00
}
}
// HandlePacket handles the Cwtch cwtchPeer Data Channel
func ( cph * CwtchPeerHandler ) HandlePacket ( data [ ] byte ) [ ] byte {
2019-01-22 19:11:25 +00:00
cph . EventBus . Publish ( event . NewEvent ( event . NewMessageFromPeer , map [ event . Field ] string { event . TimestampReceived : time . Now ( ) . Format ( time . RFC3339Nano ) , event . RemotePeer : cph . Onion , event . Data : string ( data ) } ) )
2019-01-04 21:44:21 +00:00
return [ ] byte { } // TODO remove this
}