From 6e64f6596226dfaa48ac6ca4f104e198fd8a3dbe Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Wed, 17 Jul 2019 12:10:52 -0700 Subject: [PATCH] First cut of Tapir Integration --- go.mod | 13 +- go.sum | 35 ++-- peer/cwtch_peer.go | 5 +- protocol/connections/connectionsmanager.go | 48 ----- protocol/connections/engine.go | 190 ++++++------------ protocol/connections/peer/peer_channel.go | 106 ---------- .../connections/peer/peer_channel_test.go | 102 ---------- .../connections/peer/peer_data_channel.go | 98 --------- protocol/connections/peerapp.go | 56 ++++++ protocol/connections/peerpeerconnection.go | 123 ------------ .../connections/peerpeerconnection_test.go | 90 --------- 11 files changed, 146 insertions(+), 720 deletions(-) delete mode 100644 protocol/connections/peer/peer_channel.go delete mode 100644 protocol/connections/peer/peer_channel_test.go delete mode 100644 protocol/connections/peer/peer_data_channel.go create mode 100644 protocol/connections/peerapp.go delete mode 100644 protocol/connections/peerpeerconnection.go delete mode 100644 protocol/connections/peerpeerconnection_test.go diff --git a/go.mod b/go.mod index 50f7de3..529952c 100644 --- a/go.mod +++ b/go.mod @@ -1,16 +1,15 @@ module cwtch.im/cwtch require ( + cwtch.im/tapir v0.1.4 git.openprivacy.ca/openprivacy/libricochet-go v1.0.4 github.com/c-bata/go-prompt v0.2.3 - github.com/golang/protobuf v1.2.0 - github.com/mattn/go-colorable v0.0.9 // indirect - github.com/mattn/go-isatty v0.0.4 // indirect + github.com/golang/protobuf v1.3.2 + github.com/mattn/go-colorable v0.1.2 // indirect github.com/mattn/go-runewidth v0.0.4 // indirect - github.com/mattn/go-tty v0.0.0-20181127064339-e4f871175a2f // indirect + github.com/mattn/go-tty v0.0.0-20190424173100-523744f04859 // indirect github.com/pkg/term v0.0.0-20190109203006-aa71e9d9e942 // indirect github.com/struCoder/pidusage v0.1.2 - golang.org/x/crypto v0.0.0-20190128193316-c7b33c32a30b - golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3 - golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb // indirect + golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 + golang.org/x/net v0.0.0-20190628185345-da137c7871d7 ) diff --git a/go.sum b/go.sum index 0d0cd82..313759e 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,6 @@ -git.openprivacy.ca/openprivacy/libricochet-go v1.0.2 h1:U5tufewB3O0L2EKjUyHXxJkvByx4rBSvn5s1M9ma+f4= -git.openprivacy.ca/openprivacy/libricochet-go v1.0.2/go.mod h1:yMSG1gBaP4f1U+RMZXN85d29D39OK5s8aTpyVRoH5FY= -git.openprivacy.ca/openprivacy/libricochet-go v1.0.3 h1:LHnhK9hzkqMY+iEE3TZ0FjZsYal05YDiamKmxDOXuts= -git.openprivacy.ca/openprivacy/libricochet-go v1.0.3/go.mod h1:yMSG1gBaP4f1U+RMZXN85d29D39OK5s8aTpyVRoH5FY= +cwtch.im/tapir v0.1.4 h1:zXjlp4R7gIjzsnQ5J77RvD3biqJZINy9KkAogsuKRSQ= +cwtch.im/tapir v0.1.4/go.mod h1:EuRYdVrwijeaGBQ4OijDDRHf7R2MDSypqHkSl5DxI34= +git.openprivacy.ca/openprivacy/libricochet-go v1.0.4 h1:GWLMJ5jBSIC/gFXzdbbeVz7fIAn2FTgW8+wBci6/3Ek= git.openprivacy.ca/openprivacy/libricochet-go v1.0.4/go.mod h1:yMSG1gBaP4f1U+RMZXN85d29D39OK5s8aTpyVRoH5FY= github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 h1:w1UutsfOrms1J05zt7ISrnJIXKzwaspym5BTKGx93EI= github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412/go.mod h1:WPjqKcmVOxf0XSf3YxCJs6N6AOSrOx3obionmG7T0y0= @@ -13,14 +12,16 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4= -github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= -github.com/mattn/go-isatty v0.0.4 h1:bnP0vzxcAdeI1zdubAl5PjU6zsERjGZb7raWodagDYs= -github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= +github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU= +github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE= +github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y= github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= -github.com/mattn/go-tty v0.0.0-20181127064339-e4f871175a2f h1:4P7Ul+TAnk92vTeVkXs6VLjmf1EhrYtDRa03PCYY6VM= -github.com/mattn/go-tty v0.0.0-20181127064339-e4f871175a2f/go.mod h1:XPvLUNfbS4fJH25nqRHfWLMa1ONC8Amw+mIA639KxkE= +github.com/mattn/go-tty v0.0.0-20190424173100-523744f04859 h1:smQbSzmT3EHl4EUwtFwFGmGIpiYgIiiPeVv1uguIQEE= +github.com/mattn/go-tty v0.0.0-20190424173100-523744f04859/go.mod h1:XPvLUNfbS4fJH25nqRHfWLMa1ONC8Amw+mIA639KxkE= github.com/pkg/term v0.0.0-20190109203006-aa71e9d9e942 h1:A7GG7zcGjl3jqAqGPmcNjd/D9hzL95SuoOQAaFNdLU0= github.com/pkg/term v0.0.0-20190109203006-aa71e9d9e942/go.mod h1:eCbImbZ95eXtAUIbLAuAVnBnwf83mjf6QIVH8SHYwqQ= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -32,9 +33,19 @@ github.com/struCoder/pidusage v0.1.2 h1:fFPTThlcWFQyizv3xKs5Lyq1lpG5lZ36arEGNhWz github.com/struCoder/pidusage v0.1.2/go.mod h1:pWBlW3YuSwRl6h7R5KbvA4N8oOqe9LjaKW5CwT1SPjI= golang.org/x/crypto v0.0.0-20190128193316-c7b33c32a30b h1:Ib/yptP38nXZFMwqWSip+OKuMP9OkyDe3p+DssP8n9w= golang.org/x/crypto v0.0.0-20190128193316-c7b33c32a30b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3 h1:ulvT7fqt0yHWzpJwI57MezWnYDVpCAYBVuYst/L+fAY= golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190628185345-da137c7871d7 h1:rTIdg5QFRR7XCaK4LCjBiPbx8j4DQRpdYMnGn/bJUEU= +golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb h1:1w588/yEchbPNpa9sEvOcMZYbWHedwJjg4VOAdDHWHk= -golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index ba23923..0cafeac 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -30,7 +30,7 @@ type cwtchPeer struct { // directly implement a cwtchPeer. type CwtchPeer interface { Init(event.Manager) - PeerWithOnion(string) *connections.PeerPeerConnection + PeerWithOnion(string) InviteOnionToGroup(string, string) error SendMessageToPeer(string, string) string @@ -197,9 +197,8 @@ func (cp *cwtchPeer) GetGroupState(groupid string) connections.ConnectionState { } // PeerWithOnion is the entry point for cwtchPeer relationships -func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection { +func (cp *cwtchPeer) PeerWithOnion(onion string) { cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion})) - return nil } // InviteOnionToGroup kicks off the invite process diff --git a/protocol/connections/connectionsmanager.go b/protocol/connections/connectionsmanager.go index 9300836..b21d159 100644 --- a/protocol/connections/connectionsmanager.go +++ b/protocol/connections/connectionsmanager.go @@ -11,7 +11,6 @@ import ( // Manager encapsulates all the logic necessary to manage outgoing peer and server connections. type Manager struct { - peerConnections map[string]*PeerPeerConnection serverConnections map[string]*PeerServerConnection lock sync.Mutex breakChannel chan bool @@ -22,27 +21,11 @@ type Manager struct { func NewConnectionsManager(acn connectivity.ACN) *Manager { m := new(Manager) m.acn = acn - m.peerConnections = make(map[string]*PeerPeerConnection) m.serverConnections = make(map[string]*PeerServerConnection) m.breakChannel = make(chan bool) return m } -// ManagePeerConnection creates a new PeerConnection for the given Host and Profile. -func (m *Manager) ManagePeerConnection(host string, engine Engine) *PeerPeerConnection { - m.lock.Lock() - defer m.lock.Unlock() - - _, exists := m.peerConnections[host] - if !exists { - ppc := NewPeerPeerConnection(host, engine) - go ppc.Run() - m.peerConnections[host] = ppc - return ppc - } - return m.peerConnections[host] -} - // ManageServerConnection creates a new ServerConnection for Host with the given callback handler. // If there is an establish connection, it is replaced with a new one, assuming this came from // a new JoinServer from a new Group being joined. If it is still connecting to a server, the second request will be abandonded @@ -75,14 +58,6 @@ func (m *Manager) SetServerSynced(onion string) { m.serverConnections[onion].setState(SYNCED) } -// GetPeerPeerConnectionForOnion safely returns a given peer connection -func (m *Manager) GetPeerPeerConnectionForOnion(host string) (ppc *PeerPeerConnection) { - m.lock.Lock() - ppc = m.peerConnections[host] - m.lock.Unlock() - return -} - // GetPeerServerConnectionForOnion safely returns a given host connection func (m *Manager) GetPeerServerConnectionForOnion(host string) (psc *PeerServerConnection) { m.lock.Lock() @@ -99,14 +74,6 @@ func (m *Manager) AttemptReconnections() { for { select { case <-time.After(timeout): - m.lock.Lock() - for _, ppc := range m.peerConnections { - if ppc.GetState() == FAILED { - go ppc.Run() - } - } - m.lock.Unlock() - m.lock.Lock() for _, psc := range m.serverConnections { if psc.GetState() == FAILED { @@ -126,25 +93,10 @@ func (m *Manager) AttemptReconnections() { } } -// ClosePeerConnection closes an existing peer connection -func (m *Manager) ClosePeerConnection(onion string) { - m.lock.Lock() - pc, ok := m.peerConnections[onion] - if ok { - pc.Close() - delete(m.peerConnections, onion) - } - m.lock.Unlock() -} - // Shutdown closes all connections under management (freeing their goroutines) func (m *Manager) Shutdown() { m.breakChannel <- true m.lock.Lock() - for onion, ppc := range m.peerConnections { - ppc.Close() - delete(m.peerConnections, onion) - } for onion, psc := range m.serverConnections { psc.Close() delete(m.serverConnections, onion) diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index c568d05..22c3739 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -1,13 +1,10 @@ package connections import ( - "crypto/rsa" "cwtch.im/cwtch/event" "cwtch.im/cwtch/protocol" - "cwtch.im/cwtch/protocol/connections/peer" - "errors" - "git.openprivacy.ca/openprivacy/libricochet-go/application" - "git.openprivacy.ca/openprivacy/libricochet-go/channels" + "cwtch.im/tapir" + "cwtch.im/tapir/networks/tor" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/identity" "git.openprivacy.ca/openprivacy/libricochet-go/log" @@ -25,8 +22,6 @@ type engine struct { identity identity.Identity acn connectivity.ACN - app *application.RicochetApplication - // Engine State started bool @@ -36,6 +31,9 @@ type engine struct { // Pointer to the Global Event Manager eventManager event.Manager + // Nextgen Tapir Service + service tapir.Service + // Required for listen(), inaccessible from identity privateKey ed25519.PrivateKey } @@ -46,13 +44,6 @@ type Engine interface { Identity() identity.Identity ACN() connectivity.ACN EventManager() event.Manager - - GetPeerHandler(string) *CwtchPeerHandler - ContactRequest(string, string) string - - LookupContact(string, rsa.PublicKey) (bool, bool) - LookupContactV3(string, ed25519.PublicKey) (bool, bool) - Shutdown() } @@ -68,6 +59,10 @@ func NewProtocolEngine(identity identity.Identity, privateKey ed25519.PrivateKey engine.connectionsManager = NewConnectionsManager(engine.acn) go engine.connectionsManager.AttemptReconnections() + // Init the Server running the Simple App. + engine.service = new(tor.BaseOnionService) + engine.service.Init(acn, privateKey, identity) + engine.eventManager = eventManager engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue.EventChannel) @@ -113,22 +108,17 @@ func (e *engine) eventHandler() { e.sendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature])) case event.SendMessageToPeer: log.Debugf("Sending Message to Peer.....") - ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(ev.Data[event.RemotePeer]) - if ppc != nil && ppc.GetState() == AUTHENTICATED { - err := ppc.SendPacket([]byte(ev.Data[event.Data])) - if err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: err.Error()})) - } - } else { + connection, err := e.service.GetConnection(ev.Data[event.RemotePeer]) + if err != nil { e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"})) } + connection.Send([]byte(ev.Data[event.Data])) case event.BlockPeer: e.blocked.Store(ev.Data[event.RemotePeer], true) - ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(ev.Data[event.RemotePeer]) - if ppc != nil { - ppc.Close() + connection, err := e.service.GetConnection(ev.Data[event.RemotePeer]) + if err != nil { + connection.Close() } - e.app.Close(ev.Data[event.RemotePeer]) case event.ProtocolEngineStartListen: go e.listenFn() default: @@ -137,99 +127,59 @@ func (e *engine) eventHandler() { } } -// GetPeerHandler is an external interface function that allows callers access to a CwtchPeerHandler -// TODO: There is likely a slightly better way to encapsulate this behavior -func (e *engine) GetPeerHandler(remotePeerHostname string) *CwtchPeerHandler { - return &CwtchPeerHandler{Onion: remotePeerHostname, EventBus: e.eventManager} -} - // Listen sets up an onion listener to process incoming cwtch messages func (e *engine) listenFn() { - ra := new(application.RicochetApplication) - onionService, err := e.acn.Listen(e.privateKey, application.RicochetPort) - if err != nil /*&& fmt.Sprintf("%v", err) != "550 Unspecified Tor error: Onion address collision"*/ { - e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname(), event.Error: err.Error()})) - return - } - - af := application.InstanceFactory{} - af.Init() - af.AddHandler("im.cwtch.peer", func(rai *application.Instance) func() channels.Handler { - cpi := new(CwtchPeerInstance) - cpi.Init(rai, ra) - return func() channels.Handler { - cpc := new(peer.CwtchPeerChannel) - cpc.Handler = e.GetPeerHandler(rai.RemoteHostname) - return cpc - } - }) - - af.AddHandler("im.cwtch.peer.data", func(rai *application.Instance) func() channels.Handler { - cpi := new(CwtchPeerInstance) - cpi.Init(rai, ra) - return func() channels.Handler { - cpc := new(peer.CwtchPeerDataChannel) - cpc.Handler = e.GetPeerHandler(rai.RemoteHostname) - return cpc - } - }) - - ra.Init(e.ACN(), e.identity.Name, e.identity, af, e) - log.Infof("Running cwtch peer on %v", onionService.AddressFull()) - e.started = true - e.app = ra - ra.Run(onionService) - e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname()})) + peerAppTemplate := new(PeerApp) + peerAppTemplate.MessageHandler = e.handlePeerMessage + peerAppTemplate.OnAuth = e.peerAuthed + peerAppTemplate.OnClose = e.peerDisconnected + err := e.service.Listen(peerAppTemplate) + e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname(), event.Error: err.Error()})) return } -// LookupContact is a V2 API Call, we want to reject all V2 Peers -// TODO Deprecate -func (e *engine) LookupContact(hostname string, publicKey rsa.PublicKey) (allowed, known bool) { - return false, false -} - -// ContactRequest is a V2 API Call needed to implement ContactRequestHandler Interface -// TODO Deprecate -func (e *engine) ContactRequest(name string, message string) string { - return "Rejected" -} - -// LookupContactV3 returns that a contact is known and allowed to communicate for all cases. -func (e *engine) LookupContactV3(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) { - // TODO: We want to autoblock those that are blocked, The known parameter has no use anymore and should be - // disregarded by peers, so we set it to false. - if _, blocked := e.blocked.Load(hostname); blocked { - return false, false - } - return true, false -} - // Shutdown tears down the eventHandler goroutine func (e *engine) Shutdown() { + e.service.Shutdown() e.connectionsManager.Shutdown() - e.app.Shutdown() e.queue.Shutdown() } // peerWithOnion is the entry point for cwtchPeer relationships -func (e *engine) peerWithOnion(onion string) *PeerPeerConnection { - return e.connectionsManager.ManagePeerConnection(onion, e) +func (e *engine) peerWithOnion(onion string) { + peerAppTemplate := new(PeerApp) + peerAppTemplate.MessageHandler = e.handlePeerMessage + peerAppTemplate.OnAuth = e.peerAuthed + peerAppTemplate.OnClose = e.peerDisconnected + e.service.Connect(onion, peerAppTemplate) +} + +func (e *engine) peerAuthed(onion string) { + e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{ + event.RemotePeer: string(onion), + event.ConnectionState: ConnectionStateName[AUTHENTICATED], + })) +} + +func (e *engine) peerDisconnected(onion string) { + e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{ + event.RemotePeer: string(onion), + event.ConnectionState: ConnectionStateName[DISCONNECTED], + })) } // inviteOnionToGroup kicks off the invite process func (e *engine) inviteOnionToGroup(onion string, invite []byte) error { - ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(onion) - if ppc == nil { - return errors.New("peer connection not setup for onion. peers must be trusted before sending") + conn, err := e.service.GetConnection(onion) + if err == nil { + peerApp, ok := conn.App.(*PeerApp) + if ok { + peerApp.SendMessage(invite) + return nil + } + panic("this should never happen") } - if ppc.GetState() == AUTHENTICATED { - log.Infof("Got connection for group: %v - Sending Invite\n", ppc) - ppc.SendGroupInvite(invite) - } else { - return errors.New("cannot send invite to onion: peer connection is not ready") - } - return nil + return err } // receiveGroupMessage is a callback function that processes GroupMessages from a given server @@ -261,36 +211,14 @@ func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) { } } -// CwtchPeerInstance encapsulates incoming peer connections -type CwtchPeerInstance struct { - rai *application.Instance - ra *application.RicochetApplication -} - -// Init sets up a CwtchPeerInstance -func (cpi *CwtchPeerInstance) Init(rai *application.Instance, ra *application.RicochetApplication) { - cpi.rai = rai - cpi.ra = ra -} - -// CwtchPeerHandler encapsulates handling of incoming CwtchPackets -type CwtchPeerHandler struct { - Onion string - EventBus event.Manager - DataHandler func(string, []byte) []byte -} - -// HandleGroupInvite handles incoming GroupInvites -func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) { - log.Debugf("Received GroupID from %v %v\n", cph.Onion, gci.String()) - marshal, err := proto.Marshal(gci) +func (e *engine) handlePeerMessage(hostname string, message []byte) { + cpp := &protocol.CwtchPeerPacket{} + err := proto.Unmarshal(message, cpp) if err == nil { - cph.EventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: cph.Onion, event.GroupInvite: string(marshal)})) + if cpp.GetGroupChatInvite() != nil { + marshal, _ := proto.Marshal(cpp.GetGroupChatInvite()) + e.eventManager.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.GroupInvite: string(marshal)})) + } } -} - -// HandlePacket handles the Cwtch cwtchPeer Data Channel -func (cph *CwtchPeerHandler) HandlePacket(data []byte) []byte { - cph.EventBus.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: cph.Onion, event.Data: string(data)})) - return []byte{} // TODO remove this + e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)})) } diff --git a/protocol/connections/peer/peer_channel.go b/protocol/connections/peer/peer_channel.go deleted file mode 100644 index 4fb1345..0000000 --- a/protocol/connections/peer/peer_channel.go +++ /dev/null @@ -1,106 +0,0 @@ -package peer - -import ( - "cwtch.im/cwtch/protocol" - "git.openprivacy.ca/openprivacy/libricochet-go/channels" - "git.openprivacy.ca/openprivacy/libricochet-go/log" - "git.openprivacy.ca/openprivacy/libricochet-go/utils" - "git.openprivacy.ca/openprivacy/libricochet-go/wire/control" - "github.com/golang/protobuf/proto" -) - -// CwtchPeerChannel implements the ChannelHandler interface for a channel of -// type "im.ricochet.Cwtch". The channel may be inbound or outbound. -// -// CwtchPeerChannel implements protocol-level sanity and state validation, but -// does not handle or acknowledge Cwtch messages. The application must provide -// a CwtchPeerChannelHandler implementation to handle Cwtch events. -type CwtchPeerChannel struct { - // Methods of Handler are called for Cwtch events on this channel - Handler CwtchPeerChannelHandler - channel *channels.Channel -} - -// CwtchPeerChannelHandler is implemented by an application type to receive -// events from a CwtchPeerChannel. -type CwtchPeerChannelHandler interface { - HandleGroupInvite(*protocol.GroupChatInvite) -} - -// SendMessage sends a raw message on this channel -func (cpc *CwtchPeerChannel) SendMessage(data []byte) { - cpc.channel.SendMessage(data) -} - -// Type returns the type string for this channel, e.g. "im.ricochet.Cwtch". -func (cpc *CwtchPeerChannel) Type() string { - return "im.cwtch.peer" -} - -// Closed is called when the channel is closed for any reason. -func (cpc *CwtchPeerChannel) Closed(err error) { - -} - -// OnlyClientCanOpen - for Cwtch channels any side can open -func (cpc *CwtchPeerChannel) OnlyClientCanOpen() bool { - return false -} - -// Singleton - for Cwtch channels there can only be one instance per direction -func (cpc *CwtchPeerChannel) Singleton() bool { - return true -} - -// Bidirectional - for Cwtch channels are not bidrectional -func (cpc *CwtchPeerChannel) Bidirectional() bool { - return false -} - -// RequiresAuthentication - Cwtch channels require hidden service auth -func (cpc *CwtchPeerChannel) RequiresAuthentication() string { - return "im.ricochet.auth.3dh" -} - -// OpenInbound is the first method called for an inbound channel request. -// If an error is returned, the channel is rejected. If a RawMessage is -// returned, it will be sent as the ChannelResult message. -func (cpc *CwtchPeerChannel) OpenInbound(channel *channels.Channel, raw *Protocol_Data_Control.OpenChannel) ([]byte, error) { - cpc.channel = channel - messageBuilder := new(utils.MessageBuilder) - return messageBuilder.AckOpenChannel(channel.ID), nil -} - -// OpenOutbound is the first method called for an outbound channel request. -// If an error is returned, the channel is not opened. If a RawMessage is -// returned, it will be sent as the OpenChannel message. -func (cpc *CwtchPeerChannel) OpenOutbound(channel *channels.Channel) ([]byte, error) { - cpc.channel = channel - messageBuilder := new(utils.MessageBuilder) - return messageBuilder.OpenChannel(channel.ID, cpc.Type()), nil -} - -// OpenOutboundResult is called when a response is received for an -// outbound OpenChannel request. If `err` is non-nil, the channel was -// rejected and Closed will be called immediately afterwards. `raw` -// contains the raw protocol message including any extension data. -func (cpc *CwtchPeerChannel) OpenOutboundResult(err error, crm *Protocol_Data_Control.ChannelResult) { - if err == nil { - if crm.GetOpened() { - cpc.channel.Pending = false - } - } -} - -// Packet is called for each raw packet received on this channel. -func (cpc *CwtchPeerChannel) Packet(data []byte) { - cpp := &protocol.CwtchPeerPacket{} - err := proto.Unmarshal(data, cpp) - if err == nil { - if cpp.GetGroupChatInvite() != nil { - cpc.Handler.HandleGroupInvite(cpp.GetGroupChatInvite()) - } - } else { - log.Errorf("Error Receivng Packet %v\n", err) - } -} diff --git a/protocol/connections/peer/peer_channel_test.go b/protocol/connections/peer/peer_channel_test.go deleted file mode 100644 index a752157..0000000 --- a/protocol/connections/peer/peer_channel_test.go +++ /dev/null @@ -1,102 +0,0 @@ -package peer - -import ( - "cwtch.im/cwtch/protocol" - "git.openprivacy.ca/openprivacy/libricochet-go/channels" - "git.openprivacy.ca/openprivacy/libricochet-go/wire/control" - "github.com/golang/protobuf/proto" - "testing" -) - -func TestPeerChannelAttributes(t *testing.T) { - cssc := new(CwtchPeerChannel) - if cssc.Type() != "im.cwtch.peer" { - t.Errorf("cwtch channel type is incorrect %v", cssc.Type()) - } - - if cssc.OnlyClientCanOpen() { - t.Errorf("either side should be able to open im.cwtch.peer channel") - } - - if cssc.Bidirectional() { - t.Errorf("im.cwtch.peer should not be bidirectional") - } - - if !cssc.Singleton() { - t.Errorf("im.cwtch.server.listen should be a Singleton") - } - - if cssc.RequiresAuthentication() != "im.ricochet.auth.3dh" { - t.Errorf("cwtch channel required auth is incorrect %v", cssc.RequiresAuthentication()) - } -} - -type TestHandler struct { - Received bool - ReceviedGroupInvite bool -} - -func (th *TestHandler) ClientIdentity(ci *protocol.CwtchIdentity) { - if ci.GetName() == "hello" { - th.Received = true - } -} - -func (th *TestHandler) HandleGroupInvite(ci *protocol.GroupChatInvite) { - ///if ci.GetName() == "hello" { - th.ReceviedGroupInvite = true - //} -} - -func (th *TestHandler) GetClientIdentityPacket() []byte { - return nil -} - -func TestPeerChannel(t *testing.T) { - th := new(TestHandler) - cpc := new(CwtchPeerChannel) - cpc.Handler = th - channel := new(channels.Channel) - channel.ID = 3 - result, err := cpc.OpenOutbound(channel) - if err != nil { - t.Errorf("should have send open channel request instead %v, %v", result, err) - } - - cpc2 := new(CwtchPeerChannel) - channel2 := new(channels.Channel) - channel2.ID = 3 - sent := false - channel2.SendMessage = func(message []byte) { - sent = true - } - - control := new(Protocol_Data_Control.Packet) - proto.Unmarshal(result[:], control) - ack, err := cpc2.OpenInbound(channel2, control.GetOpenChannel()) - if err != nil { - t.Errorf("should have ack open channel request instead %v, %v", ack, err) - } - - ackpacket := new(Protocol_Data_Control.Packet) - proto.Unmarshal(ack[:], ackpacket) - cpc.OpenOutboundResult(nil, ackpacket.GetChannelResult()) - if channel.Pending != false { - t.Errorf("Channel should no longer be pending") - } - - gci := &protocol.GroupChatInvite{ - GroupName: "hello", - GroupSharedKey: []byte{}, - ServerHost: "abc.onion", - } - - cpp := &protocol.CwtchPeerPacket{ - GroupChatInvite: gci, - } - packet, _ := proto.Marshal(cpp) - cpc.Packet(packet) - if sent && th.ReceviedGroupInvite == false { - t.Errorf("Should have sent invite packet to handler") - } -} diff --git a/protocol/connections/peer/peer_data_channel.go b/protocol/connections/peer/peer_data_channel.go deleted file mode 100644 index add26de..0000000 --- a/protocol/connections/peer/peer_data_channel.go +++ /dev/null @@ -1,98 +0,0 @@ -package peer - -import ( - "git.openprivacy.ca/openprivacy/libricochet-go/channels" - "git.openprivacy.ca/openprivacy/libricochet-go/utils" - "git.openprivacy.ca/openprivacy/libricochet-go/wire/control" -) - -// CwtchPeerDataChannel implements the ChannelHandler interface for a channel of -// type "im.cwtch.peer.data The channel may be inbound or outbound. -// -// CwtchPeerChannel implements protocol-level sanity and state validation, but -// does not handle or acknowledge Cwtch messages. The application must provide -// a CwtchPeerChannelHandler implementation to handle Cwtch events. -type CwtchPeerDataChannel struct { - // Methods of Handler are called for Cwtch events on this channel - Handler CwtchPeerDataHandler - channel *channels.Channel -} - -// CwtchPeerDataHandler is implemented by an application type to receive -// events from a CwtchPeerChannel. -type CwtchPeerDataHandler interface { - HandlePacket([]byte) []byte -} - -// SendMessage sends a raw message on this channel -func (cpc *CwtchPeerDataChannel) SendMessage(data []byte) { - cpc.channel.SendMessage(data) -} - -// Type returns the type string for this channel, e.g. "im.ricochet.Cwtch". -func (cpc *CwtchPeerDataChannel) Type() string { - return "im.cwtch.peer.data" -} - -// Closed is called when the channel is closed for any reason. -func (cpc *CwtchPeerDataChannel) Closed(err error) { - -} - -// OnlyClientCanOpen - for Cwtch channels any side can open -func (cpc *CwtchPeerDataChannel) OnlyClientCanOpen() bool { - return false -} - -// Singleton - for Cwtch channels there can only be one instance per direction -func (cpc *CwtchPeerDataChannel) Singleton() bool { - return true -} - -// Bidirectional - for Cwtch channels are bidirectional -func (cpc *CwtchPeerDataChannel) Bidirectional() bool { - return true -} - -// RequiresAuthentication - Cwtch channels require hidden service auth -func (cpc *CwtchPeerDataChannel) RequiresAuthentication() string { - return "im.ricochet.auth.3dh" -} - -// OpenInbound is the first method called for an inbound channel request. -// If an error is returned, the channel is rejected. If a RawMessage is -// returned, it will be sent as the ChannelResult message. -func (cpc *CwtchPeerDataChannel) OpenInbound(channel *channels.Channel, raw *Protocol_Data_Control.OpenChannel) ([]byte, error) { - cpc.channel = channel - messageBuilder := new(utils.MessageBuilder) - return messageBuilder.AckOpenChannel(channel.ID), nil -} - -// OpenOutbound is the first method called for an outbound channel request. -// If an error is returned, the channel is not opened. If a RawMessage is -// returned, it will be sent as the OpenChannel message. -func (cpc *CwtchPeerDataChannel) OpenOutbound(channel *channels.Channel) ([]byte, error) { - cpc.channel = channel - messageBuilder := new(utils.MessageBuilder) - return messageBuilder.OpenChannel(channel.ID, cpc.Type()), nil -} - -// OpenOutboundResult is called when a response is received for an -// outbound OpenChannel request. If `err` is non-nil, the channel was -// rejected and Closed will be called immediately afterwards. `raw` -// contains the raw protocol message including any extension data. -func (cpc *CwtchPeerDataChannel) OpenOutboundResult(err error, crm *Protocol_Data_Control.ChannelResult) { - if err == nil { - if crm.GetOpened() { - cpc.channel.Pending = false - } - } -} - -// Packet is called for each raw packet received on this channel. -func (cpc *CwtchPeerDataChannel) Packet(data []byte) { - ret := cpc.Handler.HandlePacket(data) - if len(ret) > 0 { - cpc.channel.SendMessage(ret) - } -} diff --git a/protocol/connections/peerapp.go b/protocol/connections/peerapp.go new file mode 100644 index 0000000..ddb325d --- /dev/null +++ b/protocol/connections/peerapp.go @@ -0,0 +1,56 @@ +package connections + +import ( + "cwtch.im/tapir" + "cwtch.im/tapir/applications" + "git.openprivacy.ca/openprivacy/libricochet-go/log" +) + +// PeerApp encapsulates the behaviour of a Cwtch Peer +type PeerApp struct { + applications.AuthApp + connection *tapir.Connection + MessageHandler func(string, []byte) + OnAuth func(string) + OnClose func(string) +} + +// NewInstance should always return a new instantiation of the application. +func (pa PeerApp) NewInstance() tapir.Application { + newApp := new(PeerApp) + newApp.MessageHandler = pa.MessageHandler + newApp.OnAuth = pa.OnAuth + newApp.OnClose = pa.OnClose + return newApp +} + +// Init is run when the connection is first started. +func (pa *PeerApp) Init(connection *tapir.Connection) { + // First run the Authentication App + pa.AuthApp.Init(connection) + + if connection.HasCapability(applications.AuthCapability) { + pa.connection = connection + pa.OnAuth(connection.Hostname) + go pa.listen() + } else { + pa.OnClose(connection.Hostname) + } +} + +func (pa PeerApp) listen() { + for { + message := pa.connection.Expect() + if len(message) == 0 { + log.Errorf("0 byte read, socket has likely failed. Closing the listen goroutine") + return + } + pa.MessageHandler(pa.connection.Hostname, message) + } +} + +// SendMessage sends the peer a preformatted message +// NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol +func (pa PeerApp) SendMessage(message []byte) { + pa.connection.Send(message) +} diff --git a/protocol/connections/peerpeerconnection.go b/protocol/connections/peerpeerconnection.go deleted file mode 100644 index 684ef6d..0000000 --- a/protocol/connections/peerpeerconnection.go +++ /dev/null @@ -1,123 +0,0 @@ -package connections - -import ( - "cwtch.im/cwtch/event" - "cwtch.im/cwtch/protocol/connections/peer" - "errors" - "git.openprivacy.ca/openprivacy/libricochet-go" - "git.openprivacy.ca/openprivacy/libricochet-go/channels" - "git.openprivacy.ca/openprivacy/libricochet-go/connection" - "git.openprivacy.ca/openprivacy/libricochet-go/log" - "time" -) - -// PeerPeerConnection encapsulates a single outgoing cwtchPeer->cwtchPeer connection -type PeerPeerConnection struct { - connection.AutoConnectionHandler - PeerHostname string - state ConnectionState - connection *connection.Connection - protocolEngine Engine -} - -// NewPeerPeerConnection creates a new peer connection for the given hostname and profile. -func NewPeerPeerConnection(peerhostname string, protocolEngine Engine) *PeerPeerConnection { - ppc := new(PeerPeerConnection) - ppc.PeerHostname = peerhostname - ppc.protocolEngine = protocolEngine - ppc.Init() - return ppc -} - -func (ppc *PeerPeerConnection) setState(state ConnectionState) { - ppc.state = state - ppc.protocolEngine.EventManager().Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{ - event.RemotePeer: string(ppc.PeerHostname), - event.ConnectionState: ConnectionStateName[state], - })) -} - -// GetState returns the current connection state -func (ppc *PeerPeerConnection) GetState() ConnectionState { - return ppc.state -} - -// SendPacket sends data packets on the optional data channel -func (ppc *PeerPeerConnection) SendPacket(data []byte) error { - ppc.WaitTilAuthenticated() - return ppc.connection.Do(func() error { - channel := ppc.connection.Channel("im.cwtch.peer.data", channels.Outbound) - if channel != nil { - peerchannel, ok := channel.Handler.(*peer.CwtchPeerDataChannel) - if ok { - log.Debugf("Sending packet\n") - peerchannel.SendMessage(data) - return nil - } - } - return errors.New("failed to send packet to peer") - }) -} - -// SendGroupInvite sends the given serialized invite packet to the Peer -func (ppc *PeerPeerConnection) SendGroupInvite(invite []byte) { - ppc.WaitTilAuthenticated() - ppc.connection.Do(func() error { - channel := ppc.connection.Channel("im.cwtch.peer", channels.Outbound) - if channel != nil { - peerchannel, ok := channel.Handler.(*peer.CwtchPeerChannel) - if ok { - log.Debugf("Sending group invite packet\n") - peerchannel.SendMessage(invite) - } - } - return nil - }) -} - -// WaitTilAuthenticated waits until the underlying connection is authenticated -func (ppc *PeerPeerConnection) WaitTilAuthenticated() { - for { - if ppc.GetState() == AUTHENTICATED { - break - } - time.Sleep(time.Second * 1) - } -} - -// Run manages the setup and teardown of a peer->peer connection -func (ppc *PeerPeerConnection) Run() error { - ppc.setState(CONNECTING) - rc, err := goricochet.Open(ppc.protocolEngine.ACN(), ppc.PeerHostname) - if err == nil { - ppc.connection = rc - ppc.setState(CONNECTED) - _, err := connection.HandleOutboundConnection(ppc.connection).ProcessAuthAsV3Client(ppc.protocolEngine.Identity()) - if err == nil { - ppc.setState(AUTHENTICATED) - go func() { - ppc.connection.Do(func() error { - ppc.connection.RequestOpenChannel("im.cwtch.peer", &peer.CwtchPeerChannel{Handler: ppc.protocolEngine.GetPeerHandler(ppc.PeerHostname)}) - return nil - }) - - ppc.connection.Do(func() error { - ppc.connection.RequestOpenChannel("im.cwtch.peer.data", &peer.CwtchPeerDataChannel{Handler: ppc.protocolEngine.GetPeerHandler(ppc.PeerHostname)}) - return nil - }) - }() - - ppc.connection.Process(ppc) - } - } - ppc.setState(FAILED) - return err -} - -// Close closes the connection -func (ppc *PeerPeerConnection) Close() { - ppc.setState(KILLED) - if ppc.connection != nil { - ppc.connection.Close() - } -} diff --git a/protocol/connections/peerpeerconnection_test.go b/protocol/connections/peerpeerconnection_test.go deleted file mode 100644 index 8cd583f..0000000 --- a/protocol/connections/peerpeerconnection_test.go +++ /dev/null @@ -1,90 +0,0 @@ -package connections - -import ( - "crypto/rand" - "cwtch.im/cwtch/event" - "cwtch.im/cwtch/model" - "cwtch.im/cwtch/protocol" - "cwtch.im/cwtch/protocol/connections/peer" - "fmt" - "git.openprivacy.ca/openprivacy/libricochet-go" - "git.openprivacy.ca/openprivacy/libricochet-go/channels" - "git.openprivacy.ca/openprivacy/libricochet-go/connection" - "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" - "git.openprivacy.ca/openprivacy/libricochet-go/identity" - "golang.org/x/crypto/ed25519" - "net" - "testing" - "time" -) - -func PeerAuthValid(hostname string, key ed25519.PublicKey) (allowed, known bool) { - return true, true -} - -func runtestpeer(t *testing.T, tp *TestPeer, identity identity.Identity, listenChan chan bool) { - ln, _ := net.Listen("tcp", "127.0.0.1:5452") - listenChan <- true - conn, _ := ln.Accept() - defer conn.Close() - - rc, err := goricochet.NegotiateVersionInbound(conn) - if err != nil { - t.Errorf("Negotiate Version Error: %v", err) - } - err = connection.HandleInboundConnection(rc).ProcessAuthAsV3Server(identity, PeerAuthValid) - if err != nil { - t.Errorf("ServerAuth Error: %v", err) - } - tp.RegisterChannelHandler("im.cwtch.peer", func() channels.Handler { - cpc := new(peer.CwtchPeerChannel) - cpc.Handler = tp - return cpc - }) - - rc.Process(tp) -} - -type TestPeer struct { - connection.AutoConnectionHandler - ReceivedIdentityPacket bool - ReceivedGroupInvite bool -} - -func (tp *TestPeer) HandleGroupInvite(gci *protocol.GroupChatInvite) { - tp.ReceivedGroupInvite = true -} - -func TestPeerPeerConnection(t *testing.T) { - pub, priv, _ := ed25519.GenerateKey(rand.Reader) - identity := identity.InitializeV3("", &priv, &pub) - - profile := model.GenerateNewProfile("alice") - hostname := identity.Hostname() - manager := event.NewEventManager() - engine := NewProtocolEngine(identity, priv, connectivity.LocalProvider(), manager, nil) - ppc := NewPeerPeerConnection("127.0.0.1:5452|"+hostname, engine) - - tp := new(TestPeer) - tp.Init() - listenChan := make(chan bool) - go runtestpeer(t, tp, identity, listenChan) - <-listenChan - state := ppc.GetState() - if state != DISCONNECTED { - fmt.Println("ERROR state should be disconnected") - t.Errorf("new connections should start in disconnected state") - } - go ppc.Run() - time.Sleep(time.Second * 5) - state = ppc.GetState() - if state != AUTHENTICATED { - t.Errorf("connection state should be authenticated(3), was instead %v", state) - } - _, invite, _ := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") - ppc.SendGroupInvite(invite) - time.Sleep(time.Second * 3) - if tp.ReceivedGroupInvite == false { - t.Errorf("should have received an group invite packet") - } -}