Browse Source

First cut of Tapir Integration

pull/254/head
Sarah Jamie Lewis 11 months ago
parent
commit
6e64f65962
11 changed files with 146 additions and 720 deletions
  1. +6
    -7
      go.mod
  2. +23
    -12
      go.sum
  3. +2
    -3
      peer/cwtch_peer.go
  4. +0
    -48
      protocol/connections/connectionsmanager.go
  5. +59
    -131
      protocol/connections/engine.go
  6. +0
    -106
      protocol/connections/peer/peer_channel.go
  7. +0
    -102
      protocol/connections/peer/peer_channel_test.go
  8. +0
    -98
      protocol/connections/peer/peer_data_channel.go
  9. +56
    -0
      protocol/connections/peerapp.go
  10. +0
    -123
      protocol/connections/peerpeerconnection.go
  11. +0
    -90
      protocol/connections/peerpeerconnection_test.go

+ 6
- 7
go.mod View File

@@ -1,16 +1,15 @@
module cwtch.im/cwtch

require (
cwtch.im/tapir v0.1.4
git.openprivacy.ca/openprivacy/libricochet-go v1.0.4
github.com/c-bata/go-prompt v0.2.3
github.com/golang/protobuf v1.2.0
github.com/mattn/go-colorable v0.0.9 // indirect
github.com/mattn/go-isatty v0.0.4 // indirect
github.com/golang/protobuf v1.3.2
github.com/mattn/go-colorable v0.1.2 // indirect
github.com/mattn/go-runewidth v0.0.4 // indirect
github.com/mattn/go-tty v0.0.0-20181127064339-e4f871175a2f // indirect
github.com/mattn/go-tty v0.0.0-20190424173100-523744f04859 // indirect
github.com/pkg/term v0.0.0-20190109203006-aa71e9d9e942 // indirect
github.com/struCoder/pidusage v0.1.2
golang.org/x/crypto v0.0.0-20190128193316-c7b33c32a30b
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3
golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb // indirect
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4
golang.org/x/net v0.0.0-20190628185345-da137c7871d7
)

+ 23
- 12
go.sum View File

@@ -1,7 +1,6 @@
git.openprivacy.ca/openprivacy/libricochet-go v1.0.2 h1:U5tufewB3O0L2EKjUyHXxJkvByx4rBSvn5s1M9ma+f4=
git.openprivacy.ca/openprivacy/libricochet-go v1.0.2/go.mod h1:yMSG1gBaP4f1U+RMZXN85d29D39OK5s8aTpyVRoH5FY=
git.openprivacy.ca/openprivacy/libricochet-go v1.0.3 h1:LHnhK9hzkqMY+iEE3TZ0FjZsYal05YDiamKmxDOXuts=
git.openprivacy.ca/openprivacy/libricochet-go v1.0.3/go.mod h1:yMSG1gBaP4f1U+RMZXN85d29D39OK5s8aTpyVRoH5FY=
cwtch.im/tapir v0.1.4 h1:zXjlp4R7gIjzsnQ5J77RvD3biqJZINy9KkAogsuKRSQ=
cwtch.im/tapir v0.1.4/go.mod h1:EuRYdVrwijeaGBQ4OijDDRHf7R2MDSypqHkSl5DxI34=
git.openprivacy.ca/openprivacy/libricochet-go v1.0.4 h1:GWLMJ5jBSIC/gFXzdbbeVz7fIAn2FTgW8+wBci6/3Ek=
git.openprivacy.ca/openprivacy/libricochet-go v1.0.4/go.mod h1:yMSG1gBaP4f1U+RMZXN85d29D39OK5s8aTpyVRoH5FY=
github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 h1:w1UutsfOrms1J05zt7ISrnJIXKzwaspym5BTKGx93EI=
github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412/go.mod h1:WPjqKcmVOxf0XSf3YxCJs6N6AOSrOx3obionmG7T0y0=
@@ -13,14 +12,16 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.4 h1:bnP0vzxcAdeI1zdubAl5PjU6zsERjGZb7raWodagDYs=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y=
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-tty v0.0.0-20181127064339-e4f871175a2f h1:4P7Ul+TAnk92vTeVkXs6VLjmf1EhrYtDRa03PCYY6VM=
github.com/mattn/go-tty v0.0.0-20181127064339-e4f871175a2f/go.mod h1:XPvLUNfbS4fJH25nqRHfWLMa1ONC8Amw+mIA639KxkE=
github.com/mattn/go-tty v0.0.0-20190424173100-523744f04859 h1:smQbSzmT3EHl4EUwtFwFGmGIpiYgIiiPeVv1uguIQEE=
github.com/mattn/go-tty v0.0.0-20190424173100-523744f04859/go.mod h1:XPvLUNfbS4fJH25nqRHfWLMa1ONC8Amw+mIA639KxkE=
github.com/pkg/term v0.0.0-20190109203006-aa71e9d9e942 h1:A7GG7zcGjl3jqAqGPmcNjd/D9hzL95SuoOQAaFNdLU0=
github.com/pkg/term v0.0.0-20190109203006-aa71e9d9e942/go.mod h1:eCbImbZ95eXtAUIbLAuAVnBnwf83mjf6QIVH8SHYwqQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -32,9 +33,19 @@ github.com/struCoder/pidusage v0.1.2 h1:fFPTThlcWFQyizv3xKs5Lyq1lpG5lZ36arEGNhWz
github.com/struCoder/pidusage v0.1.2/go.mod h1:pWBlW3YuSwRl6h7R5KbvA4N8oOqe9LjaKW5CwT1SPjI=
golang.org/x/crypto v0.0.0-20190128193316-c7b33c32a30b h1:Ib/yptP38nXZFMwqWSip+OKuMP9OkyDe3p+DssP8n9w=
golang.org/x/crypto v0.0.0-20190128193316-c7b33c32a30b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3 h1:ulvT7fqt0yHWzpJwI57MezWnYDVpCAYBVuYst/L+fAY=
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190628185345-da137c7871d7 h1:rTIdg5QFRR7XCaK4LCjBiPbx8j4DQRpdYMnGn/bJUEU=
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb h1:1w588/yEchbPNpa9sEvOcMZYbWHedwJjg4VOAdDHWHk=
golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

+ 2
- 3
peer/cwtch_peer.go View File

@@ -30,7 +30,7 @@ type cwtchPeer struct {
// directly implement a cwtchPeer.
type CwtchPeer interface {
Init(event.Manager)
PeerWithOnion(string) *connections.PeerPeerConnection
PeerWithOnion(string)
InviteOnionToGroup(string, string) error
SendMessageToPeer(string, string) string

@@ -197,9 +197,8 @@ func (cp *cwtchPeer) GetGroupState(groupid string) connections.ConnectionState {
}

// PeerWithOnion is the entry point for cwtchPeer relationships
func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection {
func (cp *cwtchPeer) PeerWithOnion(onion string) {
cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion}))
return nil
}

// InviteOnionToGroup kicks off the invite process


+ 0
- 48
protocol/connections/connectionsmanager.go View File

@@ -11,7 +11,6 @@ import (

// Manager encapsulates all the logic necessary to manage outgoing peer and server connections.
type Manager struct {
peerConnections map[string]*PeerPeerConnection
serverConnections map[string]*PeerServerConnection
lock sync.Mutex
breakChannel chan bool
@@ -22,27 +21,11 @@ type Manager struct {
func NewConnectionsManager(acn connectivity.ACN) *Manager {
m := new(Manager)
m.acn = acn
m.peerConnections = make(map[string]*PeerPeerConnection)
m.serverConnections = make(map[string]*PeerServerConnection)
m.breakChannel = make(chan bool)
return m
}

// ManagePeerConnection creates a new PeerConnection for the given Host and Profile.
func (m *Manager) ManagePeerConnection(host string, engine Engine) *PeerPeerConnection {
m.lock.Lock()
defer m.lock.Unlock()

_, exists := m.peerConnections[host]
if !exists {
ppc := NewPeerPeerConnection(host, engine)
go ppc.Run()
m.peerConnections[host] = ppc
return ppc
}
return m.peerConnections[host]
}

// ManageServerConnection creates a new ServerConnection for Host with the given callback handler.
// If there is an establish connection, it is replaced with a new one, assuming this came from
// a new JoinServer from a new Group being joined. If it is still connecting to a server, the second request will be abandonded
@@ -75,14 +58,6 @@ func (m *Manager) SetServerSynced(onion string) {
m.serverConnections[onion].setState(SYNCED)
}

// GetPeerPeerConnectionForOnion safely returns a given peer connection
func (m *Manager) GetPeerPeerConnectionForOnion(host string) (ppc *PeerPeerConnection) {
m.lock.Lock()
ppc = m.peerConnections[host]
m.lock.Unlock()
return
}

// GetPeerServerConnectionForOnion safely returns a given host connection
func (m *Manager) GetPeerServerConnectionForOnion(host string) (psc *PeerServerConnection) {
m.lock.Lock()
@@ -100,14 +75,6 @@ func (m *Manager) AttemptReconnections() {
select {
case <-time.After(timeout):
m.lock.Lock()
for _, ppc := range m.peerConnections {
if ppc.GetState() == FAILED {
go ppc.Run()
}
}
m.lock.Unlock()

m.lock.Lock()
for _, psc := range m.serverConnections {
if psc.GetState() == FAILED {
go psc.Run()
@@ -126,25 +93,10 @@ func (m *Manager) AttemptReconnections() {
}
}

// ClosePeerConnection closes an existing peer connection
func (m *Manager) ClosePeerConnection(onion string) {
m.lock.Lock()
pc, ok := m.peerConnections[onion]
if ok {
pc.Close()
delete(m.peerConnections, onion)
}
m.lock.Unlock()
}

// Shutdown closes all connections under management (freeing their goroutines)
func (m *Manager) Shutdown() {
m.breakChannel <- true
m.lock.Lock()
for onion, ppc := range m.peerConnections {
ppc.Close()
delete(m.peerConnections, onion)
}
for onion, psc := range m.serverConnections {
psc.Close()
delete(m.serverConnections, onion)


+ 59
- 131
protocol/connections/engine.go View File

@@ -1,13 +1,10 @@
package connections

import (
"crypto/rsa"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/protocol/connections/peer"
"errors"
"git.openprivacy.ca/openprivacy/libricochet-go/application"
"git.openprivacy.ca/openprivacy/libricochet-go/channels"
"cwtch.im/tapir"
"cwtch.im/tapir/networks/tor"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
@@ -25,8 +22,6 @@ type engine struct {
identity identity.Identity
acn connectivity.ACN

app *application.RicochetApplication

// Engine State
started bool

@@ -36,6 +31,9 @@ type engine struct {
// Pointer to the Global Event Manager
eventManager event.Manager

// Nextgen Tapir Service
service tapir.Service

// Required for listen(), inaccessible from identity
privateKey ed25519.PrivateKey
}
@@ -46,13 +44,6 @@ type Engine interface {
Identity() identity.Identity
ACN() connectivity.ACN
EventManager() event.Manager

GetPeerHandler(string) *CwtchPeerHandler
ContactRequest(string, string) string

LookupContact(string, rsa.PublicKey) (bool, bool)
LookupContactV3(string, ed25519.PublicKey) (bool, bool)

Shutdown()
}

@@ -68,6 +59,10 @@ func NewProtocolEngine(identity identity.Identity, privateKey ed25519.PrivateKey
engine.connectionsManager = NewConnectionsManager(engine.acn)
go engine.connectionsManager.AttemptReconnections()

// Init the Server running the Simple App.
engine.service = new(tor.BaseOnionService)
engine.service.Init(acn, privateKey, identity)

engine.eventManager = eventManager

engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue.EventChannel)
@@ -113,22 +108,17 @@ func (e *engine) eventHandler() {
e.sendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
case event.SendMessageToPeer:
log.Debugf("Sending Message to Peer.....")
ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(ev.Data[event.RemotePeer])
if ppc != nil && ppc.GetState() == AUTHENTICATED {
err := ppc.SendPacket([]byte(ev.Data[event.Data]))
if err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: err.Error()}))
}
} else {
connection, err := e.service.GetConnection(ev.Data[event.RemotePeer])
if err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
}
connection.Send([]byte(ev.Data[event.Data]))
case event.BlockPeer:
e.blocked.Store(ev.Data[event.RemotePeer], true)
ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(ev.Data[event.RemotePeer])
if ppc != nil {
ppc.Close()
connection, err := e.service.GetConnection(ev.Data[event.RemotePeer])
if err != nil {
connection.Close()
}
e.app.Close(ev.Data[event.RemotePeer])
case event.ProtocolEngineStartListen:
go e.listenFn()
default:
@@ -137,99 +127,59 @@ func (e *engine) eventHandler() {
}
}

// GetPeerHandler is an external interface function that allows callers access to a CwtchPeerHandler
// TODO: There is likely a slightly better way to encapsulate this behavior
func (e *engine) GetPeerHandler(remotePeerHostname string) *CwtchPeerHandler {
return &CwtchPeerHandler{Onion: remotePeerHostname, EventBus: e.eventManager}
}

// Listen sets up an onion listener to process incoming cwtch messages
func (e *engine) listenFn() {
ra := new(application.RicochetApplication)
onionService, err := e.acn.Listen(e.privateKey, application.RicochetPort)
if err != nil /*&& fmt.Sprintf("%v", err) != "550 Unspecified Tor error: Onion address collision"*/ {
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname(), event.Error: err.Error()}))
return
}

af := application.InstanceFactory{}
af.Init()
af.AddHandler("im.cwtch.peer", func(rai *application.Instance) func() channels.Handler {
cpi := new(CwtchPeerInstance)
cpi.Init(rai, ra)
return func() channels.Handler {
cpc := new(peer.CwtchPeerChannel)
cpc.Handler = e.GetPeerHandler(rai.RemoteHostname)
return cpc
}
})

af.AddHandler("im.cwtch.peer.data", func(rai *application.Instance) func() channels.Handler {
cpi := new(CwtchPeerInstance)
cpi.Init(rai, ra)
return func() channels.Handler {
cpc := new(peer.CwtchPeerDataChannel)
cpc.Handler = e.GetPeerHandler(rai.RemoteHostname)
return cpc
}
})

ra.Init(e.ACN(), e.identity.Name, e.identity, af, e)
log.Infof("Running cwtch peer on %v", onionService.AddressFull())
e.started = true
e.app = ra
ra.Run(onionService)
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname()}))
peerAppTemplate := new(PeerApp)
peerAppTemplate.MessageHandler = e.handlePeerMessage
peerAppTemplate.OnAuth = e.peerAuthed
peerAppTemplate.OnClose = e.peerDisconnected
err := e.service.Listen(peerAppTemplate)
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname(), event.Error: err.Error()}))
return
}

// LookupContact is a V2 API Call, we want to reject all V2 Peers
// TODO Deprecate
func (e *engine) LookupContact(hostname string, publicKey rsa.PublicKey) (allowed, known bool) {
return false, false
}

// ContactRequest is a V2 API Call needed to implement ContactRequestHandler Interface
// TODO Deprecate
func (e *engine) ContactRequest(name string, message string) string {
return "Rejected"
}

// LookupContactV3 returns that a contact is known and allowed to communicate for all cases.
func (e *engine) LookupContactV3(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) {
// TODO: We want to autoblock those that are blocked, The known parameter has no use anymore and should be
// disregarded by peers, so we set it to false.
if _, blocked := e.blocked.Load(hostname); blocked {
return false, false
}
return true, false
}

// Shutdown tears down the eventHandler goroutine
func (e *engine) Shutdown() {
e.service.Shutdown()
e.connectionsManager.Shutdown()
e.app.Shutdown()
e.queue.Shutdown()
}

// peerWithOnion is the entry point for cwtchPeer relationships
func (e *engine) peerWithOnion(onion string) *PeerPeerConnection {
return e.connectionsManager.ManagePeerConnection(onion, e)
func (e *engine) peerWithOnion(onion string) {
peerAppTemplate := new(PeerApp)
peerAppTemplate.MessageHandler = e.handlePeerMessage
peerAppTemplate.OnAuth = e.peerAuthed
peerAppTemplate.OnClose = e.peerDisconnected
e.service.Connect(onion, peerAppTemplate)
}

func (e *engine) peerAuthed(onion string) {
e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
event.RemotePeer: string(onion),
event.ConnectionState: ConnectionStateName[AUTHENTICATED],
}))
}

func (e *engine) peerDisconnected(onion string) {
e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
event.RemotePeer: string(onion),
event.ConnectionState: ConnectionStateName[DISCONNECTED],
}))
}

// inviteOnionToGroup kicks off the invite process
func (e *engine) inviteOnionToGroup(onion string, invite []byte) error {
ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(onion)
if ppc == nil {
return errors.New("peer connection not setup for onion. peers must be trusted before sending")
}
if ppc.GetState() == AUTHENTICATED {
log.Infof("Got connection for group: %v - Sending Invite\n", ppc)
ppc.SendGroupInvite(invite)
} else {
return errors.New("cannot send invite to onion: peer connection is not ready")
conn, err := e.service.GetConnection(onion)
if err == nil {
peerApp, ok := conn.App.(*PeerApp)
if ok {
peerApp.SendMessage(invite)
return nil
}
panic("this should never happen")
}
return nil
return err
}

// receiveGroupMessage is a callback function that processes GroupMessages from a given server
@@ -261,36 +211,14 @@ func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) {
}
}

// CwtchPeerInstance encapsulates incoming peer connections
type CwtchPeerInstance struct {
rai *application.Instance
ra *application.RicochetApplication
}

// Init sets up a CwtchPeerInstance
func (cpi *CwtchPeerInstance) Init(rai *application.Instance, ra *application.RicochetApplication) {
cpi.rai = rai
cpi.ra = ra
}

// CwtchPeerHandler encapsulates handling of incoming CwtchPackets
type CwtchPeerHandler struct {
Onion string
EventBus event.Manager
DataHandler func(string, []byte) []byte
}

// HandleGroupInvite handles incoming GroupInvites
func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) {
log.Debugf("Received GroupID from %v %v\n", cph.Onion, gci.String())
marshal, err := proto.Marshal(gci)
func (e *engine) handlePeerMessage(hostname string, message []byte) {
cpp := &protocol.CwtchPeerPacket{}
err := proto.Unmarshal(message, cpp)
if err == nil {
cph.EventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: cph.Onion, event.GroupInvite: string(marshal)}))
if cpp.GetGroupChatInvite() != nil {
marshal, _ := proto.Marshal(cpp.GetGroupChatInvite())
e.eventManager.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.GroupInvite: string(marshal)}))
}
}
}

// HandlePacket handles the Cwtch cwtchPeer Data Channel
func (cph *CwtchPeerHandler) HandlePacket(data []byte) []byte {
cph.EventBus.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: cph.Onion, event.Data: string(data)}))
return []byte{} // TODO remove this
e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)}))
}

+ 0
- 106
protocol/connections/peer/peer_channel.go View File

@@ -1,106 +0,0 @@
package peer

import (
"cwtch.im/cwtch/protocol"
"git.openprivacy.ca/openprivacy/libricochet-go/channels"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"git.openprivacy.ca/openprivacy/libricochet-go/utils"
"git.openprivacy.ca/openprivacy/libricochet-go/wire/control"
"github.com/golang/protobuf/proto"
)

// CwtchPeerChannel implements the ChannelHandler interface for a channel of
// type "im.ricochet.Cwtch". The channel may be inbound or outbound.
//
// CwtchPeerChannel implements protocol-level sanity and state validation, but
// does not handle or acknowledge Cwtch messages. The application must provide
// a CwtchPeerChannelHandler implementation to handle Cwtch events.
type CwtchPeerChannel struct {
// Methods of Handler are called for Cwtch events on this channel
Handler CwtchPeerChannelHandler
channel *channels.Channel
}

// CwtchPeerChannelHandler is implemented by an application type to receive
// events from a CwtchPeerChannel.
type CwtchPeerChannelHandler interface {
HandleGroupInvite(*protocol.GroupChatInvite)
}

// SendMessage sends a raw message on this channel
func (cpc *CwtchPeerChannel) SendMessage(data []byte) {
cpc.channel.SendMessage(data)
}

// Type returns the type string for this channel, e.g. "im.ricochet.Cwtch".
func (cpc *CwtchPeerChannel) Type() string {
return "im.cwtch.peer"
}

// Closed is called when the channel is closed for any reason.
func (cpc *CwtchPeerChannel) Closed(err error) {

}

// OnlyClientCanOpen - for Cwtch channels any side can open
func (cpc *CwtchPeerChannel) OnlyClientCanOpen() bool {
return false
}

// Singleton - for Cwtch channels there can only be one instance per direction
func (cpc *CwtchPeerChannel) Singleton() bool {
return true
}

// Bidirectional - for Cwtch channels are not bidrectional
func (cpc *CwtchPeerChannel) Bidirectional() bool {
return false
}

// RequiresAuthentication - Cwtch channels require hidden service auth
func (cpc *CwtchPeerChannel) RequiresAuthentication() string {
return "im.ricochet.auth.3dh"
}

// OpenInbound is the first method called for an inbound channel request.
// If an error is returned, the channel is rejected. If a RawMessage is
// returned, it will be sent as the ChannelResult message.
func (cpc *CwtchPeerChannel) OpenInbound(channel *channels.Channel, raw *Protocol_Data_Control.OpenChannel) ([]byte, error) {
cpc.channel = channel
messageBuilder := new(utils.MessageBuilder)
return messageBuilder.AckOpenChannel(channel.ID), nil
}

// OpenOutbound is the first method called for an outbound channel request.
// If an error is returned, the channel is not opened. If a RawMessage is
// returned, it will be sent as the OpenChannel message.
func (cpc *CwtchPeerChannel) OpenOutbound(channel *channels.Channel) ([]byte, error) {
cpc.channel = channel
messageBuilder := new(utils.MessageBuilder)
return messageBuilder.OpenChannel(channel.ID, cpc.Type()), nil
}

// OpenOutboundResult is called when a response is received for an
// outbound OpenChannel request. If `err` is non-nil, the channel was
// rejected and Closed will be called immediately afterwards. `raw`
// contains the raw protocol message including any extension data.
func (cpc *CwtchPeerChannel) OpenOutboundResult(err error, crm *Protocol_Data_Control.ChannelResult) {
if err == nil {
if crm.GetOpened() {
cpc.channel.Pending = false
}
}
}

// Packet is called for each raw packet received on this channel.
func (cpc *CwtchPeerChannel) Packet(data []byte) {
cpp := &protocol.CwtchPeerPacket{}
err := proto.Unmarshal(data, cpp)
if err == nil {
if cpp.GetGroupChatInvite() != nil {
cpc.Handler.HandleGroupInvite(cpp.GetGroupChatInvite())
}
} else {
log.Errorf("Error Receivng Packet %v\n", err)
}
}

+ 0
- 102
protocol/connections/peer/peer_channel_test.go View File

@@ -1,102 +0,0 @@
package peer

import (
"cwtch.im/cwtch/protocol"
"git.openprivacy.ca/openprivacy/libricochet-go/channels"
"git.openprivacy.ca/openprivacy/libricochet-go/wire/control"
"github.com/golang/protobuf/proto"
"testing"
)

func TestPeerChannelAttributes(t *testing.T) {
cssc := new(CwtchPeerChannel)
if cssc.Type() != "im.cwtch.peer" {
t.Errorf("cwtch channel type is incorrect %v", cssc.Type())
}

if cssc.OnlyClientCanOpen() {
t.Errorf("either side should be able to open im.cwtch.peer channel")
}

if cssc.Bidirectional() {
t.Errorf("im.cwtch.peer should not be bidirectional")
}

if !cssc.Singleton() {
t.Errorf("im.cwtch.server.listen should be a Singleton")
}

if cssc.RequiresAuthentication() != "im.ricochet.auth.3dh" {
t.Errorf("cwtch channel required auth is incorrect %v", cssc.RequiresAuthentication())
}
}

type TestHandler struct {
Received bool
ReceviedGroupInvite bool
}

func (th *TestHandler) ClientIdentity(ci *protocol.CwtchIdentity) {
if ci.GetName() == "hello" {
th.Received = true
}
}

func (th *TestHandler) HandleGroupInvite(ci *protocol.GroupChatInvite) {
///if ci.GetName() == "hello" {
th.ReceviedGroupInvite = true
//}
}

func (th *TestHandler) GetClientIdentityPacket() []byte {
return nil
}

func TestPeerChannel(t *testing.T) {
th := new(TestHandler)
cpc := new(CwtchPeerChannel)
cpc.Handler = th
channel := new(channels.Channel)
channel.ID = 3
result, err := cpc.OpenOutbound(channel)
if err != nil {
t.Errorf("should have send open channel request instead %v, %v", result, err)
}

cpc2 := new(CwtchPeerChannel)
channel2 := new(channels.Channel)
channel2.ID = 3
sent := false
channel2.SendMessage = func(message []byte) {
sent = true
}

control := new(Protocol_Data_Control.Packet)
proto.Unmarshal(result[:], control)
ack, err := cpc2.OpenInbound(channel2, control.GetOpenChannel())
if err != nil {
t.Errorf("should have ack open channel request instead %v, %v", ack, err)
}

ackpacket := new(Protocol_Data_Control.Packet)
proto.Unmarshal(ack[:], ackpacket)
cpc.OpenOutboundResult(nil, ackpacket.GetChannelResult())
if channel.Pending != false {
t.Errorf("Channel should no longer be pending")
}

gci := &protocol.GroupChatInvite{
GroupName: "hello",
GroupSharedKey: []byte{},
ServerHost: "abc.onion",
}

cpp := &protocol.CwtchPeerPacket{
GroupChatInvite: gci,
}
packet, _ := proto.Marshal(cpp)
cpc.Packet(packet)
if sent && th.ReceviedGroupInvite == false {
t.Errorf("Should have sent invite packet to handler")
}
}

+ 0
- 98
protocol/connections/peer/peer_data_channel.go View File

@@ -1,98 +0,0 @@
package peer

import (
"git.openprivacy.ca/openprivacy/libricochet-go/channels"
"git.openprivacy.ca/openprivacy/libricochet-go/utils"
"git.openprivacy.ca/openprivacy/libricochet-go/wire/control"
)

// CwtchPeerDataChannel implements the ChannelHandler interface for a channel of
// type "im.cwtch.peer.data The channel may be inbound or outbound.
//
// CwtchPeerChannel implements protocol-level sanity and state validation, but
// does not handle or acknowledge Cwtch messages. The application must provide
// a CwtchPeerChannelHandler implementation to handle Cwtch events.
type CwtchPeerDataChannel struct {
// Methods of Handler are called for Cwtch events on this channel
Handler CwtchPeerDataHandler
channel *channels.Channel
}

// CwtchPeerDataHandler is implemented by an application type to receive
// events from a CwtchPeerChannel.
type CwtchPeerDataHandler interface {
HandlePacket([]byte) []byte
}

// SendMessage sends a raw message on this channel
func (cpc *CwtchPeerDataChannel) SendMessage(data []byte) {
cpc.channel.SendMessage(data)
}

// Type returns the type string for this channel, e.g. "im.ricochet.Cwtch".
func (cpc *CwtchPeerDataChannel) Type() string {
return "im.cwtch.peer.data"
}

// Closed is called when the channel is closed for any reason.
func (cpc *CwtchPeerDataChannel) Closed(err error) {

}

// OnlyClientCanOpen - for Cwtch channels any side can open
func (cpc *CwtchPeerDataChannel) OnlyClientCanOpen() bool {
return false
}

// Singleton - for Cwtch channels there can only be one instance per direction
func (cpc *CwtchPeerDataChannel) Singleton() bool {
return true
}

// Bidirectional - for Cwtch channels are bidirectional
func (cpc *CwtchPeerDataChannel) Bidirectional() bool {
return true
}

// RequiresAuthentication - Cwtch channels require hidden service auth
func (cpc *CwtchPeerDataChannel) RequiresAuthentication() string {
return "im.ricochet.auth.3dh"
}

// OpenInbound is the first method called for an inbound channel request.
// If an error is returned, the channel is rejected. If a RawMessage is
// returned, it will be sent as the ChannelResult message.
func (cpc *CwtchPeerDataChannel) OpenInbound(channel *channels.Channel, raw *Protocol_Data_Control.OpenChannel) ([]byte, error) {
cpc.channel = channel
messageBuilder := new(utils.MessageBuilder)
return messageBuilder.AckOpenChannel(channel.ID), nil
}

// OpenOutbound is the first method called for an outbound channel request.
// If an error is returned, the channel is not opened. If a RawMessage is
// returned, it will be sent as the OpenChannel message.
func (cpc *CwtchPeerDataChannel) OpenOutbound(channel *channels.Channel) ([]byte, error) {
cpc.channel = channel
messageBuilder := new(utils.MessageBuilder)
return messageBuilder.OpenChannel(channel.ID, cpc.Type()), nil
}

// OpenOutboundResult is called when a response is received for an
// outbound OpenChannel request. If `err` is non-nil, the channel was
// rejected and Closed will be called immediately afterwards. `raw`
// contains the raw protocol message including any extension data.
func (cpc *CwtchPeerDataChannel) OpenOutboundResult(err error, crm *Protocol_Data_Control.ChannelResult) {
if err == nil {
if crm.GetOpened() {
cpc.channel.Pending = false
}
}
}

// Packet is called for each raw packet received on this channel.
func (cpc *CwtchPeerDataChannel) Packet(data []byte) {
ret := cpc.Handler.HandlePacket(data)
if len(ret) > 0 {
cpc.channel.SendMessage(ret)
}
}

+ 56
- 0
protocol/connections/peerapp.go View File

@@ -0,0 +1,56 @@
package connections

import (
"cwtch.im/tapir"
"cwtch.im/tapir/applications"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
)

// PeerApp encapsulates the behaviour of a Cwtch Peer
type PeerApp struct {
applications.AuthApp
connection *tapir.Connection
MessageHandler func(string, []byte)
OnAuth func(string)
OnClose func(string)
}

// NewInstance should always return a new instantiation of the application.
func (pa PeerApp) NewInstance() tapir.Application {
newApp := new(PeerApp)
newApp.MessageHandler = pa.MessageHandler
newApp.OnAuth = pa.OnAuth
newApp.OnClose = pa.OnClose
return newApp
}

// Init is run when the connection is first started.
func (pa *PeerApp) Init(connection *tapir.Connection) {
// First run the Authentication App
pa.AuthApp.Init(connection)

if connection.HasCapability(applications.AuthCapability) {
pa.connection = connection
pa.OnAuth(connection.Hostname)
go pa.listen()
} else {
pa.OnClose(connection.Hostname)
}
}

func (pa PeerApp) listen() {
for {
message := pa.connection.Expect()
if len(message) == 0 {
log.Errorf("0 byte read, socket has likely failed. Closing the listen goroutine")
return
}
pa.MessageHandler(pa.connection.Hostname, message)
}
}

// SendMessage sends the peer a preformatted message
// NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol
func (pa PeerApp) SendMessage(message []byte) {
pa.connection.Send(message)
}

+ 0
- 123
protocol/connections/peerpeerconnection.go View File

@@ -1,123 +0,0 @@
package connections

import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol/connections/peer"
"errors"
"git.openprivacy.ca/openprivacy/libricochet-go"
"git.openprivacy.ca/openprivacy/libricochet-go/channels"
"git.openprivacy.ca/openprivacy/libricochet-go/connection"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"time"
)

// PeerPeerConnection encapsulates a single outgoing cwtchPeer->cwtchPeer connection
type PeerPeerConnection struct {
connection.AutoConnectionHandler
PeerHostname string
state ConnectionState
connection *connection.Connection
protocolEngine Engine
}

// NewPeerPeerConnection creates a new peer connection for the given hostname and profile.
func NewPeerPeerConnection(peerhostname string, protocolEngine Engine) *PeerPeerConnection {
ppc := new(PeerPeerConnection)
ppc.PeerHostname = peerhostname
ppc.protocolEngine = protocolEngine
ppc.Init()
return ppc
}

func (ppc *PeerPeerConnection) setState(state ConnectionState) {
ppc.state = state
ppc.protocolEngine.EventManager().Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
event.RemotePeer: string(ppc.PeerHostname),
event.ConnectionState: ConnectionStateName[state],
}))
}

// GetState returns the current connection state
func (ppc *PeerPeerConnection) GetState() ConnectionState {
return ppc.state
}

// SendPacket sends data packets on the optional data channel
func (ppc *PeerPeerConnection) SendPacket(data []byte) error {
ppc.WaitTilAuthenticated()
return ppc.connection.Do(func() error {
channel := ppc.connection.Channel("im.cwtch.peer.data", channels.Outbound)
if channel != nil {
peerchannel, ok := channel.Handler.(*peer.CwtchPeerDataChannel)
if ok {
log.Debugf("Sending packet\n")
peerchannel.SendMessage(data)
return nil
}
}
return errors.New("failed to send packet to peer")
})
}

// SendGroupInvite sends the given serialized invite packet to the Peer
func (ppc *PeerPeerConnection) SendGroupInvite(invite []byte) {
ppc.WaitTilAuthenticated()
ppc.connection.Do(func() error {
channel := ppc.connection.Channel("im.cwtch.peer", channels.Outbound)
if channel != nil {
peerchannel, ok := channel.Handler.(*peer.CwtchPeerChannel)
if ok {
log.Debugf("Sending group invite packet\n")
peerchannel.SendMessage(invite)
}
}
return nil
})
}

// WaitTilAuthenticated waits until the underlying connection is authenticated
func (ppc *PeerPeerConnection) WaitTilAuthenticated() {
for {
if ppc.GetState() == AUTHENTICATED {
break
}
time.Sleep(time.Second * 1)
}
}

// Run manages the setup and teardown of a peer->peer connection
func (ppc *PeerPeerConnection) Run() error {
ppc.setState(CONNECTING)
rc, err := goricochet.Open(ppc.protocolEngine.ACN(), ppc.PeerHostname)
if err == nil {
ppc.connection = rc
ppc.setState(CONNECTED)
_, err := connection.HandleOutboundConnection(ppc.connection).ProcessAuthAsV3Client(ppc.protocolEngine.Identity())
if err == nil {
ppc.setState(AUTHENTICATED)
go func() {
ppc.connection.Do(func() error {
ppc.connection.RequestOpenChannel("im.cwtch.peer", &peer.CwtchPeerChannel{Handler: ppc.protocolEngine.GetPeerHandler(ppc.PeerHostname)})
return nil
})

ppc.connection.Do(func() error {
ppc.connection.RequestOpenChannel("im.cwtch.peer.data", &peer.CwtchPeerDataChannel{Handler: ppc.protocolEngine.GetPeerHandler(ppc.PeerHostname)})
return nil
})
}()

ppc.connection.Process(ppc)
}
}
ppc.setState(FAILED)
return err
}

// Close closes the connection
func (ppc *PeerPeerConnection) Close() {
ppc.setState(KILLED)
if ppc.connection != nil {
ppc.connection.Close()
}
}

+ 0
- 90
protocol/connections/peerpeerconnection_test.go View File

@@ -1,90 +0,0 @@
package connections

import (
"crypto/rand"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/protocol/connections/peer"
"fmt"
"git.openprivacy.ca/openprivacy/libricochet-go"
"git.openprivacy.ca/openprivacy/libricochet-go/channels"
"git.openprivacy.ca/openprivacy/libricochet-go/connection"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
"golang.org/x/crypto/ed25519"
"net"
"testing"
"time"
)

func PeerAuthValid(hostname string, key ed25519.PublicKey) (allowed, known bool) {
return true, true
}

func runtestpeer(t *testing.T, tp *TestPeer, identity identity.Identity, listenChan chan bool) {
ln, _ := net.Listen("tcp", "127.0.0.1:5452")
listenChan <- true
conn, _ := ln.Accept()
defer conn.Close()

rc, err := goricochet.NegotiateVersionInbound(conn)
if err != nil {
t.Errorf("Negotiate Version Error: %v", err)
}
err = connection.HandleInboundConnection(rc).ProcessAuthAsV3Server(identity, PeerAuthValid)
if err != nil {
t.Errorf("ServerAuth Error: %v", err)
}
tp.RegisterChannelHandler("im.cwtch.peer", func() channels.Handler {
cpc := new(peer.CwtchPeerChannel)
cpc.Handler = tp
return cpc
})

rc.Process(tp)
}

type TestPeer struct {
connection.AutoConnectionHandler
ReceivedIdentityPacket bool
ReceivedGroupInvite bool
}

func (tp *TestPeer) HandleGroupInvite(gci *protocol.GroupChatInvite) {
tp.ReceivedGroupInvite = true
}

func TestPeerPeerConnection(t *testing.T) {
pub, priv, _ := ed25519.GenerateKey(rand.Reader)
identity := identity.InitializeV3("", &priv, &pub)

profile := model.GenerateNewProfile("alice")
hostname := identity.Hostname()
manager := event.NewEventManager()
engine := NewProtocolEngine(identity, priv, connectivity.LocalProvider(), manager, nil)
ppc := NewPeerPeerConnection("127.0.0.1:5452|"+hostname, engine)

tp := new(TestPeer)
tp.Init()
listenChan := make(chan bool)
go runtestpeer(t, tp, identity, listenChan)
<-listenChan
state := ppc.GetState()
if state != DISCONNECTED {
fmt.Println("ERROR state should be disconnected")
t.Errorf("new connections should start in disconnected state")
}
go ppc.Run()
time.Sleep(time.Second * 5)
state = ppc.GetState()
if state != AUTHENTICATED {
t.Errorf("connection state should be authenticated(3), was instead %v", state)
}
_, invite, _ := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
ppc.SendGroupInvite(invite)
time.Sleep(time.Second * 3)
if tp.ReceivedGroupInvite == false {
t.Errorf("should have received an group invite packet")
}
}

Loading…
Cancel
Save