Browse Source

make engine a interface and private struct, private most methods

pull/245/head
Dan Ballard 10 months ago
parent
commit
1e1cbe6cd8
5 changed files with 80 additions and 49 deletions
  1. +3
    -3
      peer/cwtch_peer.go
  2. +1
    -1
      protocol/connections/connectionsmanager.go
  3. +67
    -40
      protocol/connections/engine.go
  4. +4
    -4
      protocol/connections/peerpeerconnection.go
  5. +5
    -1
      protocol/connections/peerpeerconnection_test.go

+ 3
- 3
peer/cwtch_peer.go View File

@@ -24,7 +24,7 @@ type cwtchPeer struct {
shutdown bool
started bool

engine *connections.Engine
engine connections.Engine
queue *event.Queue
eventBus *event.Manager
}
@@ -101,8 +101,8 @@ func (cp *cwtchPeer) Init(acn connectivity.ACN, eventBus *event.Manager) {
}

// TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key.
cp.engine = connections.NewProtocolEngine(cp.Profile.Ed25519PrivateKey, acn, eventBus, blockedPeers)
cp.engine.Identity = identity.InitializeV3(cp.Profile.Name, &cp.Profile.Ed25519PrivateKey, &cp.Profile.Ed25519PublicKey)
identity := identity.InitializeV3(cp.Profile.Name, &cp.Profile.Ed25519PrivateKey, &cp.Profile.Ed25519PublicKey)
cp.engine = connections.NewProtocolEngine(identity, cp.Profile.Ed25519PrivateKey, acn, eventBus, blockedPeers)
}

// ImportGroup intializes a group from an imported source rather than a peer invite


+ 1
- 1
protocol/connections/connectionsmanager.go View File

@@ -27,7 +27,7 @@ func NewConnectionsManager(acn connectivity.ACN) *Manager {
}

// ManagePeerConnection creates a new PeerConnection for the given Host and Profile.
func (m *Manager) ManagePeerConnection(host string, engine *Engine) *PeerPeerConnection {
func (m *Manager) ManagePeerConnection(host string, engine Engine) *PeerPeerConnection {
m.lock.Lock()
defer m.lock.Unlock()



+ 67
- 40
protocol/connections/engine.go View File

@@ -17,15 +17,13 @@ import (
"time"
)

// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
// Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages
type Engine struct {
type engine struct {
queue *event.Queue
connectionsManager *Manager

// Engine Attributes
Identity identity.Identity
ACN connectivity.ACN
identity identity.Identity
acn connectivity.ACN

app *application.RicochetApplication

@@ -37,18 +35,39 @@ type Engine struct {

// Pointer to the Global Event Manager
eventManager *event.Manager
privateKey ed25519.PrivateKey

// Required for listen(), inaccessible from identity
privateKey ed25519.PrivateKey
}

// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
// Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages
type Engine interface {
Identity() identity.Identity
ACN() connectivity.ACN

GetPeerHandler(string) *CwtchPeerHandler
ContactRequest(string, string) string

LookupContact(string, rsa.PublicKey) (bool, bool)
LookupContactV3(string, ed25519.PublicKey) (bool, bool)

GetPeers() map[string]ConnectionState
GetServers() map[string]ConnectionState

Shutdown()
}

// NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters
func NewProtocolEngine(privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager *event.Manager, blockedPeers []string) *Engine {
engine := new(Engine)
func NewProtocolEngine(identity identity.Identity, privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager *event.Manager, blockedPeers []string) Engine {
engine := new(engine)
engine.identity = identity
engine.privateKey = privateKey
engine.queue = event.NewEventQueue(100)
go engine.eventHandler()

engine.ACN = acn
engine.connectionsManager = NewConnectionsManager(engine.ACN)
engine.acn = acn
engine.connectionsManager = NewConnectionsManager(engine.acn)
go engine.connectionsManager.AttemptReconnections()

engine.eventManager = eventManager
@@ -67,21 +86,29 @@ func NewProtocolEngine(privateKey ed25519.PrivateKey, acn connectivity.ACN, even
return engine
}

func (e *engine) ACN() connectivity.ACN {
return e.acn
}

func (e *engine) Identity() identity.Identity {
return e.identity
}

// eventHandler process events from other subsystems
func (e *Engine) eventHandler() {
func (e *engine) eventHandler() {
for {
ev := e.queue.Next()
switch ev.EventType {
case event.StatusRequest:
e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID})
case event.PeerRequest:
e.PeerWithOnion(ev.Data[event.RemotePeer])
e.peerWithOnion(ev.Data[event.RemotePeer])
case event.InvitePeerToGroup:
e.InviteOnionToGroup(ev.Data[event.RemotePeer], []byte(ev.Data[event.GroupInvite]))
e.inviteOnionToGroup(ev.Data[event.RemotePeer], []byte(ev.Data[event.GroupInvite]))
case event.JoinServer:
e.JoinServer(ev.Data[event.GroupServer])
e.joinServer(ev.Data[event.GroupServer])
case event.SendMessageToGroup:
e.SendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
e.sendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
case event.SendMessageToPeer:
log.Debugf("Sending Message to Peer.....")
ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(ev.Data[event.RemotePeer])
@@ -110,16 +137,16 @@ func (e *Engine) eventHandler() {

// GetPeerHandler is an external interface function that allows callers access to a CwtchPeerHandler
// TODO: There is likely a slightly better way to encapsulate this behavior
func (e *Engine) GetPeerHandler(remotePeerHostname string) *CwtchPeerHandler {
func (e *engine) GetPeerHandler(remotePeerHostname string) *CwtchPeerHandler {
return &CwtchPeerHandler{Onion: remotePeerHostname, EventBus: e.eventManager}
}

// Listen sets up an onion listener to process incoming cwtch messages
func (e *Engine) listenFn() {
func (e *engine) listenFn() {
ra := new(application.RicochetApplication)
onionService, err := e.ACN.Listen(e.privateKey, application.RicochetPort)
onionService, err := e.acn.Listen(e.privateKey, application.RicochetPort)
if err != nil /*&& fmt.Sprintf("%v", err) != "550 Unspecified Tor error: Onion address collision"*/ {
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.Identity.Hostname(), event.Error: err.Error()}))
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname(), event.Error: err.Error()}))
return
}

@@ -145,29 +172,29 @@ func (e *Engine) listenFn() {
}
})

ra.Init(e.ACN, e.Identity.Name, e.Identity, af, e)
ra.Init(e.ACN(), e.identity.Name, e.identity, af, e)
log.Infof("Running cwtch peer on %v", onionService.AddressFull())
e.started = true
e.app = ra
ra.Run(onionService)
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.Identity.Hostname()}))
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname()}))
return
}

// LookupContact is a V2 API Call, we want to reject all V2 Peers
// TODO Deprecate
func (e *Engine) LookupContact(hostname string, publicKey rsa.PublicKey) (allowed, known bool) {
func (e *engine) LookupContact(hostname string, publicKey rsa.PublicKey) (allowed, known bool) {
return false, false
}

// ContactRequest is a V2 API Call needed to implement ContactRequestHandler Interface
// TODO Deprecate
func (e *Engine) ContactRequest(name string, message string) string {
func (e *engine) ContactRequest(name string, message string) string {
return "Rejected"
}

// LookupContactV3 returns that a contact is known and allowed to communicate for all cases.
func (e *Engine) LookupContactV3(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) {
func (e *engine) LookupContactV3(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) {
// TODO: We want to autoblock those that are blocked, The known parameter has no use anymore and should be
// disregarded by peers, so we set it to false.
if _, blocked := e.blocked.Load(hostname); blocked {
@@ -177,19 +204,19 @@ func (e *Engine) LookupContactV3(hostname string, publicKey ed25519.PublicKey) (
}

// Shutdown tears down the eventHandler goroutine
func (e *Engine) Shutdown() {
func (e *engine) Shutdown() {
e.connectionsManager.Shutdown()
e.app.Shutdown()
e.queue.Shutdown()
}

// PeerWithOnion is the entry point for cwtchPeer relationships
func (e *Engine) PeerWithOnion(onion string) *PeerPeerConnection {
// peerWithOnion is the entry point for cwtchPeer relationships
func (e *engine) peerWithOnion(onion string) *PeerPeerConnection {
return e.connectionsManager.ManagePeerConnection(onion, e)
}

// InviteOnionToGroup kicks off the invite process
func (e *Engine) InviteOnionToGroup(onion string, invite []byte) error {
// inviteOnionToGroup kicks off the invite process
func (e *engine) inviteOnionToGroup(onion string, invite []byte) error {
ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(onion)
if ppc == nil {
return errors.New("peer connection not setup for onion. peers must be trusted before sending")
@@ -203,25 +230,25 @@ func (e *Engine) InviteOnionToGroup(onion string, invite []byte) error {
return nil
}

// ReceiveGroupMessage is a callback function that processes GroupMessages from a given server
func (e *Engine) ReceiveGroupMessage(server string, gm *protocol.GroupMessage) {
// receiveGroupMessage is a callback function that processes GroupMessages from a given server
func (e *engine) receiveGroupMessage(server string, gm *protocol.GroupMessage) {
// Publish Event so that a Profile Engine can deal with it.
// Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: string(gm.GetCiphertext()), event.Signature: string(gm.GetSignature())}))
}

// FinishedFetch is a callback function the processes the termination of a fetch channel from a given server
func (e *Engine) FinishedFetch(server string) {
// finishedFetch is a callback function the processes the termination of a fetch channel from a given server
func (e *engine) finishedFetch(server string) {
e.eventManager.Publish(event.NewEvent(event.FinishedFetch, map[event.Field]string{event.GroupServer: server}))
}

// JoinServer manages a new server connection with the given onion address
func (e *Engine) JoinServer(onion string) {
e.connectionsManager.ManageServerConnection(onion, e.ReceiveGroupMessage, e.FinishedFetch)
// joinServer manages a new server connection with the given onion address
func (e *engine) joinServer(onion string) {
e.connectionsManager.ManageServerConnection(onion, e.receiveGroupMessage, e.finishedFetch)
}

// SendMessageToGroup attempts to sent the given message to the given group id.
func (e *Engine) SendMessageToGroup(server string, ct []byte, sig []byte) {
// sendMessageToGroup attempts to sent the given message to the given group id.
func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) {
psc := e.connectionsManager.GetPeerServerConnectionForOnion(server)
if psc == nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupServer: server, event.Signature: string(sig), event.Error: "server is offline or the connection has yet to finalize"}))
@@ -238,12 +265,12 @@ func (e *Engine) SendMessageToGroup(server string, ct []byte, sig []byte) {
}

// GetPeers returns a list of peer connections.
func (e *Engine) GetPeers() map[string]ConnectionState {
func (e *engine) GetPeers() map[string]ConnectionState {
return e.connectionsManager.GetPeers()
}

// GetServers returns a list of server connections
func (e *Engine) GetServers() map[string]ConnectionState {
func (e *engine) GetServers() map[string]ConnectionState {
return e.connectionsManager.GetServers()
}



+ 4
- 4
protocol/connections/peerpeerconnection.go View File

@@ -16,11 +16,11 @@ type PeerPeerConnection struct {
PeerHostname string
state ConnectionState
connection *connection.Connection
protocolEngine *Engine
protocolEngine Engine
}

// NewPeerPeerConnection creates a new peer connection for the given hostname and profile.
func NewPeerPeerConnection(peerhostname string, protocolEngine *Engine) *PeerPeerConnection {
func NewPeerPeerConnection(peerhostname string, protocolEngine Engine) *PeerPeerConnection {
ppc := new(PeerPeerConnection)
ppc.PeerHostname = peerhostname
ppc.protocolEngine = protocolEngine
@@ -79,11 +79,11 @@ func (ppc *PeerPeerConnection) WaitTilAuthenticated() {
// Run manages the setup and teardown of a peer->peer connection
func (ppc *PeerPeerConnection) Run() error {
ppc.state = CONNECTING
rc, err := goricochet.Open(ppc.protocolEngine.ACN, ppc.PeerHostname)
rc, err := goricochet.Open(ppc.protocolEngine.ACN(), ppc.PeerHostname)
if err == nil {
ppc.connection = rc
ppc.state = CONNECTED
_, err := connection.HandleOutboundConnection(ppc.connection).ProcessAuthAsV3Client(ppc.protocolEngine.Identity)
_, err := connection.HandleOutboundConnection(ppc.connection).ProcessAuthAsV3Client(ppc.protocolEngine.Identity())
if err == nil {
ppc.state = AUTHENTICATED
go func() {


+ 5
- 1
protocol/connections/peerpeerconnection_test.go View File

@@ -2,6 +2,7 @@ package connections

import (
"crypto/rand"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/protocol/connections/peer"
@@ -60,7 +61,10 @@ func TestPeerPeerConnection(t *testing.T) {

profile := model.GenerateNewProfile("alice")
hostname := identity.Hostname()
ppc := NewPeerPeerConnection("127.0.0.1:5452|"+hostname, &Engine{ACN: connectivity.LocalProvider(), Identity: identity})
manager := &event.Manager{}
manager.Initialize()
engine := NewProtocolEngine(identity, priv, connectivity.LocalProvider(), manager, nil)
ppc := NewPeerPeerConnection("127.0.0.1:5452|"+hostname, engine)

tp := new(TestPeer)
tp.Init()


Loading…
Cancel
Save