package connections

import (
	"crypto/rsa"
	"errors"
	"sync"
	"time"

	"cwtch.im/cwtch/event"
	"cwtch.im/cwtch/protocol"
	"cwtch.im/cwtch/protocol/connections/peer"
	"git.openprivacy.ca/openprivacy/libricochet-go/application"
	"git.openprivacy.ca/openprivacy/libricochet-go/channels"
	"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
	"git.openprivacy.ca/openprivacy/libricochet-go/identity"
	"git.openprivacy.ca/openprivacy/libricochet-go/log"
	"github.com/golang/protobuf/proto"
	"golang.org/x/crypto/ed25519"
)

// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
// Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages
type Engine struct {
	// queue receives the events this engine subscribed to; eventHandler drains it.
	queue              *event.Queue
	connectionsManager *Manager

	// Engine Attributes
	// NOTE(review): Identity is not assigned by NewProtocolEngine; callers
	// presumably set it before the engine starts listening - confirm.
	Identity identity.Identity
	ACN      connectivity.ACN

	// app is the listening application; it stays nil until listenFn has
	// successfully opened the onion service.
	app *application.RicochetApplication

	// Engine State
	started bool

	// Blocklist
	// Keys are peer hostnames, values are always true.
	blocked sync.Map

	// Pointer to the Global Event Manager
	eventManager *event.Manager

	privateKey ed25519.PrivateKey
}

// NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters.
//
// It starts the event handling goroutine and the connection manager's
// reconnection goroutine, subscribes the engine's queue to the events it
// handles, and seeds the blocklist with blockedPeers.
func NewProtocolEngine(privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager *event.Manager, blockedPeers []string) *Engine {
	engine := new(Engine)
	engine.privateKey = privateKey
	engine.queue = event.NewEventQueue(100)
	go engine.eventHandler()

	engine.ACN = acn
	engine.connectionsManager = NewConnectionsManager(engine.ACN)
	go engine.connectionsManager.AttemptReconnections()

	engine.eventManager = eventManager
	engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue.EventChannel)
	engine.eventManager.Subscribe(event.PeerRequest, engine.queue.EventChannel)
	engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue.EventChannel)
	engine.eventManager.Subscribe(event.JoinServer, engine.queue.EventChannel)
	engine.eventManager.Subscribe(event.SendMessageToGroup, engine.queue.EventChannel)
	engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue.EventChannel)
	engine.eventManager.Subscribe(event.BlockPeer, engine.queue.EventChannel)

	// The loop variable is deliberately not called "peer": that would shadow
	// the imported peer package.
	for _, blockedPeer := range blockedPeers {
		engine.blocked.Store(blockedPeer, true)
	}
	return engine
}

// eventHandler process events from other subsystems.
//
// It loops over the engine's queue and dispatches on the event type. Any
// event type without a case ends the loop and with it the goroutine.
// NOTE(review): presumably that is how a queue shutdown stops this
// goroutine - confirm against event.Queue.
// NOTE(review): StatusRequest is handled below, but NewProtocolEngine does
// not subscribe the queue to it.
func (e *Engine) eventHandler() {
	for {
		ev := e.queue.Next()
		switch ev.EventType {
		case event.StatusRequest:
			e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID})
		case event.PeerRequest:
			e.PeerWithOnion(ev.Data[event.RemotePeer])
		case event.InvitePeerToGroup:
			// There is nobody to return the error to from here, so log it
			// instead of dropping it.
			if err := e.InviteOnionToGroup(ev.Data[event.RemotePeer], []byte(ev.Data[event.GroupInvite])); err != nil {
				log.Errorf("inviting peer to group: %v", err)
			}
		case event.JoinServer:
			e.JoinServer(ev.Data[event.GroupServer])
		case event.SendMessageToGroup:
			if err := e.SendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature])); err != nil {
				log.Errorf("sending message to group: %v", err)
			}
		case event.SendMessageToPeer:
			log.Debugf("Sending Message to Peer.....")
			ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(ev.Data[event.RemotePeer])
			if ppc != nil {
				// TODO this will block.
				ppc.SendPacket([]byte(ev.Data[event.Data]))
			}
		case event.BlockPeer:
			e.blocked.Store(ev.Data[event.RemotePeer], true)
		case event.ProtocolEngineStartListen:
			go e.listenFn()
		default:
			return
		}
	}
}

// GetPeerHandler is an external interface function that allows callers access to a CwtchPeerHandler
// TODO: There is likely a slightly better way to encapsulate this behavior
func (e *Engine) GetPeerHandler(remotePeerHostname string) *CwtchPeerHandler {
	return &CwtchPeerHandler{Onion: remotePeerHostname, EventBus: e.eventManager}
}

// listenFn sets up an onion listener to process incoming cwtch messages.
//
// If the listener cannot be opened, a ProtocolEngineStopped event carrying
// the error is published and the function returns. Otherwise the application
// is run, and a ProtocolEngineStopped event is published once Run returns.
func (e *Engine) listenFn() {
	ra := new(application.RicochetApplication)
	onionService, err := e.ACN.Listen(e.privateKey, application.RicochetPort)
	if err != nil /*&& fmt.Sprintf("%v", err) != "550 Unspecified Tor error: Onion address collision"*/ {
		e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.Identity.Hostname(), event.Error: err.Error()}))
		return
	}

	af := application.ApplicationInstanceFactory{}
	af.Init()
	af.AddHandler("im.cwtch.peer", func(rai *application.ApplicationInstance) func() channels.Handler {
		cpi := new(CwtchPeerInstance)
		cpi.Init(rai, ra)
		return func() channels.Handler {
			cpc := new(peer.CwtchPeerChannel)
			cpc.Handler = e.GetPeerHandler(rai.RemoteHostname)
			return cpc
		}
	})

	af.AddHandler("im.cwtch.peer.data", func(rai *application.ApplicationInstance) func() channels.Handler {
		cpi := new(CwtchPeerInstance)
		cpi.Init(rai, ra)
		return func() channels.Handler {
			cpc := new(peer.CwtchPeerDataChannel)
			cpc.Handler = e.GetPeerHandler(rai.RemoteHostname)
			return cpc
		}
	})

	ra.Init(e.ACN, e.Identity.Name, e.Identity, af, e)
	log.Infof("Running cwtch peer on %v", onionService.AddressFull())
	e.started = true
	e.app = ra
	ra.Run(onionService)
	e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.Identity.Hostname()}))
}

// LookupContact is a V2 API Call, we want to reject all V2 Peers
// TODO Deprecate
func (e *Engine) LookupContact(hostname string, publicKey rsa.PublicKey) (allowed, known bool) {
	return false, false
}

// ContactRequest is a V2 API Call needed to implement ContactRequestHandler Interface
// TODO Deprecate
func (e *Engine) ContactRequest(name string, message string) string {
	return "Rejected"
}

// LookupContactV3 reports whether hostname is allowed to communicate: every
// peer is allowed unless its hostname is on the engine's blocklist. known is
// always false.
func (e *Engine) LookupContactV3(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) {
	// TODO: We want to autoblock those that are blocked, The known parameter has no use anymore and should be
	// disregarded by peers, so we set it to false.
	if _, blocked := e.blocked.Load(hostname); blocked {
		return false, false
	}
	return true, false
}

// Shutdown tears down the eventHandler goroutine, after shutting down the
// connections manager and, if one was started, the listening application.
func (e *Engine) Shutdown() {
	e.connectionsManager.Shutdown()
	// e.app is only assigned by listenFn after the listener opened
	// successfully, so it is nil when the engine never started listening.
	if e.app != nil {
		e.app.Shutdown()
	}
	e.queue.Shutdown()
}

// PeerWithOnion is the entry point for cwtchPeer relationships
func (e *Engine) PeerWithOnion(onion string) *PeerPeerConnection {
	return e.connectionsManager.ManagePeerConnection(onion, e)
}

// InviteOnionToGroup kicks off the invite process.
//
// It returns an error when no peer connection exists for onion, or when the
// connection is not in the AUTHENTICATED state; otherwise the invite is sent
// and nil is returned.
func (e *Engine) InviteOnionToGroup(onion string, invite []byte) error {
	ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(onion)
	if ppc == nil {
		return errors.New("peer connection not setup for onion. peers must be trusted before sending")
	}
	if ppc.GetState() != AUTHENTICATED {
		return errors.New("cannot send invite to onion: peer connection is not ready")
	}
	log.Infof("Got connection for group: %v - Sending Invite\n", ppc)
	ppc.SendGroupInvite(invite)
	return nil
}

// ReceiveGroupMessage is a callback function that processes GroupMessages from a given server
func (e *Engine) ReceiveGroupMessage(server string, gm *protocol.GroupMessage) {
	// Publish Event so that a Profile Engine can deal with it.
	// Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
	e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: string(gm.GetCiphertext()), event.Signature: string(gm.GetSignature())}))
}

// JoinServer manages a new server connection with the given onion address
func (e *Engine) JoinServer(onion string) {
	e.connectionsManager.ManageServerConnection(onion, e.ReceiveGroupMessage)
}

// SendMessageToGroup attempts to send the given ciphertext and signature as a
// GroupMessage over the connection to the given server.
//
// It returns an error when no connection to server exists, or whatever error
// sending the message produces.
func (e *Engine) SendMessageToGroup(server string, ct []byte, sig []byte) error {
	psc := e.connectionsManager.GetPeerServerConnectionForOnion(server)
	if psc == nil {
		return errors.New("could not find server connection to send message to")
	}
	gm := &protocol.GroupMessage{
		Ciphertext: ct,
		Signature:  sig,
	}
	return psc.SendGroupMessage(gm)
}

// GetPeers returns a list of peer connections.
func (e *Engine) GetPeers() map[string]ConnectionState {
	return e.connectionsManager.GetPeers()
}

// GetServers returns a list of server connections
func (e *Engine) GetServers() map[string]ConnectionState {
	return e.connectionsManager.GetServers()
}

// CwtchPeerInstance encapsulates incoming peer connections
type CwtchPeerInstance struct {
	rai *application.ApplicationInstance
	ra  *application.RicochetApplication
}

// Init sets up a CwtchPeerInstance
func (cpi *CwtchPeerInstance) Init(rai *application.ApplicationInstance, ra *application.RicochetApplication) {
	cpi.rai = rai
	cpi.ra = ra
}

// CwtchPeerHandler encapsulates handling of incoming CwtchPackets
type CwtchPeerHandler struct {
	Onion       string
	EventBus    *event.Manager
	DataHandler func(string, []byte) []byte
}

// HandleGroupInvite handles incoming GroupInvites by re-marshalling the
// invite and publishing it as a NewGroupInvite event. An invite that cannot
// be marshalled is logged and not published.
func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) {
	log.Debugf("Received GroupID from %v %v\n", cph.Onion, gci.String())
	marshal, err := proto.Marshal(gci)
	if err != nil {
		log.Errorf("marshalling received group invite: %v", err)
		return
	}
	cph.EventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().String(), event.RemotePeer: cph.Onion, event.GroupInvite: string(marshal)}))
}

// HandlePacket handles the Cwtch cwtchPeer Data Channel by publishing the
// received data as a NewMessageFromPeer event. It always returns an empty,
// non-nil byte slice.
func (cph *CwtchPeerHandler) HandlePacket(data []byte) []byte {
	cph.EventBus.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().String(), event.RemotePeer: cph.Onion, event.Data: string(data)}))
	return []byte{} // TODO remove this
}