diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 44e5b19..f2516e5 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -24,7 +24,7 @@ type cwtchPeer struct { shutdown bool started bool - engine *connections.Engine + engine connections.Engine queue *event.Queue eventBus *event.Manager } @@ -101,8 +101,8 @@ func (cp *cwtchPeer) Init(acn connectivity.ACN, eventBus *event.Manager) { } // TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key. - cp.engine = connections.NewProtocolEngine(cp.Profile.Ed25519PrivateKey, acn, eventBus, blockedPeers) - cp.engine.Identity = identity.InitializeV3(cp.Profile.Name, &cp.Profile.Ed25519PrivateKey, &cp.Profile.Ed25519PublicKey) + identity := identity.InitializeV3(cp.Profile.Name, &cp.Profile.Ed25519PrivateKey, &cp.Profile.Ed25519PublicKey) + cp.engine = connections.NewProtocolEngine(identity, cp.Profile.Ed25519PrivateKey, acn, eventBus, blockedPeers) } // ImportGroup intializes a group from an imported source rather than a peer invite diff --git a/protocol/connections/connectionsmanager.go b/protocol/connections/connectionsmanager.go index 100e2d7..27b2b25 100644 --- a/protocol/connections/connectionsmanager.go +++ b/protocol/connections/connectionsmanager.go @@ -27,7 +27,7 @@ func NewConnectionsManager(acn connectivity.ACN) *Manager { } // ManagePeerConnection creates a new PeerConnection for the given Host and Profile. -func (m *Manager) ManagePeerConnection(host string, engine *Engine) *PeerPeerConnection { +func (m *Manager) ManagePeerConnection(host string, engine Engine) *PeerPeerConnection { m.lock.Lock() defer m.lock.Unlock() diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 5aa89a7..1c1be8c 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -17,15 +17,13 @@ import ( "time" ) -// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections. -// Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages -type Engine struct { +type engine struct { queue *event.Queue connectionsManager *Manager // Engine Attributes - Identity identity.Identity - ACN connectivity.ACN + identity identity.Identity + acn connectivity.ACN app *application.RicochetApplication @@ -37,18 +35,39 @@ type Engine struct { // Pointer to the Global Event Manager eventManager *event.Manager - privateKey ed25519.PrivateKey + + // Required for listen(), inaccessible from identity + privateKey ed25519.PrivateKey +} + +// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections. +// Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages +type Engine interface { + Identity() identity.Identity + ACN() connectivity.ACN + + GetPeerHandler(string) *CwtchPeerHandler + ContactRequest(string, string) string + + LookupContact(string, rsa.PublicKey) (bool, bool) + LookupContactV3(string, ed25519.PublicKey) (bool, bool) + + GetPeers() map[string]ConnectionState + GetServers() map[string]ConnectionState + + Shutdown() } // NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters -func NewProtocolEngine(privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager *event.Manager, blockedPeers []string) *Engine { - engine := new(Engine) +func NewProtocolEngine(identity identity.Identity, privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager *event.Manager, blockedPeers []string) Engine { + engine := new(engine) + engine.identity = identity engine.privateKey = privateKey engine.queue = event.NewEventQueue(100) go engine.eventHandler() - engine.ACN = acn - engine.connectionsManager = NewConnectionsManager(engine.ACN) + engine.acn = acn + engine.connectionsManager = NewConnectionsManager(engine.acn) go engine.connectionsManager.AttemptReconnections() engine.eventManager = eventManager @@ -67,21 +86,29 @@ func NewProtocolEngine(privateKey ed25519.PrivateKey, acn connectivity.ACN, even return engine } +func (e *engine) ACN() connectivity.ACN { + return e.acn +} + +func (e *engine) Identity() identity.Identity { + return e.identity +} + // eventHandler process events from other subsystems -func (e *Engine) eventHandler() { +func (e *engine) eventHandler() { for { ev := e.queue.Next() switch ev.EventType { case event.StatusRequest: e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID}) case event.PeerRequest: - e.PeerWithOnion(ev.Data[event.RemotePeer]) + e.peerWithOnion(ev.Data[event.RemotePeer]) case event.InvitePeerToGroup: - e.InviteOnionToGroup(ev.Data[event.RemotePeer], []byte(ev.Data[event.GroupInvite])) + e.inviteOnionToGroup(ev.Data[event.RemotePeer], []byte(ev.Data[event.GroupInvite])) case event.JoinServer: - e.JoinServer(ev.Data[event.GroupServer]) + e.joinServer(ev.Data[event.GroupServer]) case event.SendMessageToGroup: - e.SendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature])) + e.sendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature])) case event.SendMessageToPeer: log.Debugf("Sending Message to Peer.....") ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(ev.Data[event.RemotePeer]) @@ -110,16 +137,16 @@ func (e *Engine) eventHandler() { // GetPeerHandler is an external interface function that allows callers access to a CwtchPeerHandler // TODO: There is likely a slightly better way to encapsulate this behavior -func (e *Engine) GetPeerHandler(remotePeerHostname string) *CwtchPeerHandler { +func (e *engine) GetPeerHandler(remotePeerHostname string) *CwtchPeerHandler { return &CwtchPeerHandler{Onion: remotePeerHostname, EventBus: e.eventManager} } // Listen sets up an onion listener to process incoming cwtch messages -func (e *Engine) listenFn() { +func (e *engine) listenFn() { ra := new(application.RicochetApplication) - onionService, err := e.ACN.Listen(e.privateKey, application.RicochetPort) + onionService, err := e.acn.Listen(e.privateKey, application.RicochetPort) if err != nil /*&& fmt.Sprintf("%v", err) != "550 Unspecified Tor error: Onion address collision"*/ { - e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.Identity.Hostname(), event.Error: err.Error()})) + e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname(), event.Error: err.Error()})) return } @@ -145,29 +172,29 @@ func (e *Engine) listenFn() { } }) - ra.Init(e.ACN, e.Identity.Name, e.Identity, af, e) + ra.Init(e.ACN(), e.identity.Name, e.identity, af, e) log.Infof("Running cwtch peer on %v", onionService.AddressFull()) e.started = true e.app = ra ra.Run(onionService) - e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.Identity.Hostname()})) + e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname()})) return } // LookupContact is a V2 API Call, we want to reject all V2 Peers // TODO Deprecate -func (e *Engine) LookupContact(hostname string, publicKey rsa.PublicKey) (allowed, known bool) { +func (e *engine) LookupContact(hostname string, publicKey rsa.PublicKey) (allowed, known bool) { return false, false } // ContactRequest is a V2 API Call needed to implement ContactRequestHandler Interface // TODO Deprecate -func (e *Engine) ContactRequest(name string, message string) string { +func (e *engine) ContactRequest(name string, message string) string { return "Rejected" } // LookupContactV3 returns that a contact is known and allowed to communicate for all cases. -func (e *Engine) LookupContactV3(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) { +func (e *engine) LookupContactV3(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) { // TODO: We want to autoblock those that are blocked, The known parameter has no use anymore and should be // disregarded by peers, so we set it to false. if _, blocked := e.blocked.Load(hostname); blocked { @@ -177,19 +204,19 @@ func (e *Engine) LookupContactV3(hostname string, publicKey ed25519.PublicKey) ( } // Shutdown tears down the eventHandler goroutine -func (e *Engine) Shutdown() { +func (e *engine) Shutdown() { e.connectionsManager.Shutdown() e.app.Shutdown() e.queue.Shutdown() } -// PeerWithOnion is the entry point for cwtchPeer relationships -func (e *Engine) PeerWithOnion(onion string) *PeerPeerConnection { +// peerWithOnion is the entry point for cwtchPeer relationships +func (e *engine) peerWithOnion(onion string) *PeerPeerConnection { return e.connectionsManager.ManagePeerConnection(onion, e) } -// InviteOnionToGroup kicks off the invite process -func (e *Engine) InviteOnionToGroup(onion string, invite []byte) error { +// inviteOnionToGroup kicks off the invite process +func (e *engine) inviteOnionToGroup(onion string, invite []byte) error { ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(onion) if ppc == nil { return errors.New("peer connection not setup for onion. peers must be trusted before sending") @@ -203,25 +230,25 @@ func (e *Engine) InviteOnionToGroup(onion string, invite []byte) error { return nil } -// ReceiveGroupMessage is a callback function that processes GroupMessages from a given server -func (e *Engine) ReceiveGroupMessage(server string, gm *protocol.GroupMessage) { +// receiveGroupMessage is a callback function that processes GroupMessages from a given server +func (e *engine) receiveGroupMessage(server string, gm *protocol.GroupMessage) { // Publish Event so that a Profile Engine can deal with it. // Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine! e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: string(gm.GetCiphertext()), event.Signature: string(gm.GetSignature())})) } -// FinishedFetch is a callback function the processes the termination of a fetch channel from a given server -func (e *Engine) FinishedFetch(server string) { +// finishedFetch is a callback function the processes the termination of a fetch channel from a given server +func (e *engine) finishedFetch(server string) { e.eventManager.Publish(event.NewEvent(event.FinishedFetch, map[event.Field]string{event.GroupServer: server})) } -// JoinServer manages a new server connection with the given onion address -func (e *Engine) JoinServer(onion string) { - e.connectionsManager.ManageServerConnection(onion, e.ReceiveGroupMessage, e.FinishedFetch) +// joinServer manages a new server connection with the given onion address +func (e *engine) joinServer(onion string) { + e.connectionsManager.ManageServerConnection(onion, e.receiveGroupMessage, e.finishedFetch) } -// SendMessageToGroup attempts to sent the given message to the given group id. -func (e *Engine) SendMessageToGroup(server string, ct []byte, sig []byte) { +// sendMessageToGroup attempts to sent the given message to the given group id. +func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) { psc := e.connectionsManager.GetPeerServerConnectionForOnion(server) if psc == nil { e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupServer: server, event.Signature: string(sig), event.Error: "server is offline or the connection has yet to finalize"})) @@ -238,12 +265,12 @@ func (e *Engine) SendMessageToGroup(server string, ct []byte, sig []byte) { } // GetPeers returns a list of peer connections. -func (e *Engine) GetPeers() map[string]ConnectionState { +func (e *engine) GetPeers() map[string]ConnectionState { return e.connectionsManager.GetPeers() } // GetServers returns a list of server connections -func (e *Engine) GetServers() map[string]ConnectionState { +func (e *engine) GetServers() map[string]ConnectionState { return e.connectionsManager.GetServers() } diff --git a/protocol/connections/peerpeerconnection.go b/protocol/connections/peerpeerconnection.go index 53f94fb..3644fb5 100644 --- a/protocol/connections/peerpeerconnection.go +++ b/protocol/connections/peerpeerconnection.go @@ -16,11 +16,11 @@ type PeerPeerConnection struct { PeerHostname string state ConnectionState connection *connection.Connection - protocolEngine *Engine + protocolEngine Engine } // NewPeerPeerConnection creates a new peer connection for the given hostname and profile. -func NewPeerPeerConnection(peerhostname string, protocolEngine *Engine) *PeerPeerConnection { +func NewPeerPeerConnection(peerhostname string, protocolEngine Engine) *PeerPeerConnection { ppc := new(PeerPeerConnection) ppc.PeerHostname = peerhostname ppc.protocolEngine = protocolEngine @@ -79,11 +79,11 @@ func (ppc *PeerPeerConnection) WaitTilAuthenticated() { // Run manages the setup and teardown of a peer->peer connection func (ppc *PeerPeerConnection) Run() error { ppc.state = CONNECTING - rc, err := goricochet.Open(ppc.protocolEngine.ACN, ppc.PeerHostname) + rc, err := goricochet.Open(ppc.protocolEngine.ACN(), ppc.PeerHostname) if err == nil { ppc.connection = rc ppc.state = CONNECTED - _, err := connection.HandleOutboundConnection(ppc.connection).ProcessAuthAsV3Client(ppc.protocolEngine.Identity) + _, err := connection.HandleOutboundConnection(ppc.connection).ProcessAuthAsV3Client(ppc.protocolEngine.Identity()) if err == nil { ppc.state = AUTHENTICATED go func() { diff --git a/protocol/connections/peerpeerconnection_test.go b/protocol/connections/peerpeerconnection_test.go index 1182c4c..735d0f9 100644 --- a/protocol/connections/peerpeerconnection_test.go +++ b/protocol/connections/peerpeerconnection_test.go @@ -2,6 +2,7 @@ package connections import ( "crypto/rand" + "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" "cwtch.im/cwtch/protocol" "cwtch.im/cwtch/protocol/connections/peer" @@ -60,7 +61,10 @@ func TestPeerPeerConnection(t *testing.T) { profile := model.GenerateNewProfile("alice") hostname := identity.Hostname() - ppc := NewPeerPeerConnection("127.0.0.1:5452|"+hostname, &Engine{ACN: connectivity.LocalProvider(), Identity: identity}) + manager := &event.Manager{} + manager.Initialize() + engine := NewProtocolEngine(identity, priv, connectivity.LocalProvider(), manager, nil) + ppc := NewPeerPeerConnection("127.0.0.1:5452|"+hostname, engine) tp := new(TestPeer) tp.Init()