From 6620875b61bc93c70d5a3320b1deba59dd6636ba Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Wed, 3 Oct 2018 21:35:30 -0700 Subject: [PATCH 1/2] Shortcut for Primary Identity --- app/app.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/app/app.go b/app/app.go index 054bd40..68332a2 100644 --- a/app/app.go +++ b/app/app.go @@ -19,6 +19,7 @@ type application struct { torManager *tor.Manager directory string mutex sync.Mutex + primaryonion string } // Application is a full cwtch peer application. It allows management, usage and storage of multiple peers @@ -26,6 +27,7 @@ type Application interface { LoadProfiles(password string) error CreatePeer(name string, password string) (peer.CwtchPeer, error) + PrimaryIdentity() peer.CwtchPeer GetPeer(onion string) peer.CwtchPeer ListPeers() map[string]string @@ -100,6 +102,9 @@ func (app *application) LoadProfiles(password string) error { app.startPeer(p) app.mutex.Lock() app.peers[p.GetProfile().Onion] = p + if app.primaryonion == "" { + app.primaryonion = p.GetProfile().Onion + } app.mutex.Unlock() } return nil @@ -148,6 +153,11 @@ func (app *application) ListPeers() map[string]string { return keys } +// PrimaryIdentity returns a Peer for a given onion address +func (app *application) PrimaryIdentity() peer.CwtchPeer { + return app.peers[app.primaryonion] +} + // GetPeer returns a Peer for a given onion address func (app *application) GetPeer(onion string) peer.CwtchPeer { if peer, ok := app.peers[onion]; ok { From 8ab4752b448d6e91c364ebae06a8cde5e6f60a84 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Thu, 4 Oct 2018 12:15:03 -0700 Subject: [PATCH 2/2] Adding Cwtch Peer Data Channel --- app/app.go | 8 +- app/peer/alice/alice.go | 19 ++++ app/peer/bob/bob.go | 27 +++++ peer/connections/connectionsmanager.go | 9 +- peer/connections/peerpeerconnection.go | 43 +++++++- peer/connections/peerpeerconnection_test.go | 2 +- peer/cwtch_peer.go | 43 +++++++- peer/peer/peer_data_channel.go | 98 +++++++++++++++++++ .../cwtch_peer_server_intergration_test.go | 2 +- 9 files changed, 235 insertions(+), 16 deletions(-) create mode 100644 app/peer/alice/alice.go create mode 100644 app/peer/bob/bob.go create mode 100644 peer/peer/peer_data_channel.go diff --git a/app/app.go b/app/app.go index 68332a2..f366891 100644 --- a/app/app.go +++ b/app/app.go @@ -15,10 +15,10 @@ import ( ) type application struct { - peers map[string]peer.CwtchPeer - torManager *tor.Manager - directory string - mutex sync.Mutex + peers map[string]peer.CwtchPeer + torManager *tor.Manager + directory string + mutex sync.Mutex primaryonion string } diff --git a/app/peer/alice/alice.go b/app/peer/alice/alice.go new file mode 100644 index 0000000..f0e678c --- /dev/null +++ b/app/peer/alice/alice.go @@ -0,0 +1,19 @@ +package main + +import ( + "cwtch.im/cwtch/peer" + "log" +) + +func main() { + alice, _ := peer.NewCwtchPeer("alice", "password", ".") + + processData := func(onion string, data []byte) []byte { + log.Printf("Recieved %s from %v", data, onion) + return data + } + + + alice.SetPeerDataHandler(processData) + alice.Listen() +} diff --git a/app/peer/bob/bob.go b/app/peer/bob/bob.go new file mode 100644 index 0000000..a1e8b53 --- /dev/null +++ b/app/peer/bob/bob.go @@ -0,0 +1,27 @@ +package main + +import ( + "cwtch.im/cwtch/peer" + "log" + "strconv" + "time" +) + +func main() { + bob, _ := peer.NewCwtchPeer("bob", "password", ".") + counter := 1 + + bob.SetPeerDataHandler(func(onion string, data []byte) []byte { + log.Printf("Recieved %s from %v", data, onion) + counter++ + return []byte(strconv.Itoa(counter)) + }) + connection := bob.PeerWithOnion("qtpnmnth767gjmpv") + + log.Printf("Waiting for Bob to Connect to Alice...") + connection.SendPacket([]byte("Hello Alice!!!")) + + // Wait a while... + time.Sleep(time.Second * 100) + +} diff --git a/peer/connections/connectionsmanager.go b/peer/connections/connectionsmanager.go index a4438ad..8074a7e 100644 --- a/peer/connections/connectionsmanager.go +++ b/peer/connections/connectionsmanager.go @@ -25,17 +25,18 @@ func NewConnectionsManager() *Manager { } // ManagePeerConnection creates a new PeerConnection for the given Host and Profile. -func (m *Manager) ManagePeerConnection(host string, profile *model.Profile) { +func (m *Manager) ManagePeerConnection(host string, profile *model.Profile, dataHandler func(string, []byte) []byte) *PeerPeerConnection { m.lock.Lock() + defer m.lock.Unlock() _, exists := m.peerConnections[host] if !exists { - ppc := NewPeerPeerConnection(host, profile) + ppc := NewPeerPeerConnection(host, profile, dataHandler) go ppc.Run() m.peerConnections[host] = ppc + return ppc } - m.lock.Unlock() - + return m.peerConnections[host] } // ManageServerConnection creates a new ServerConnection for Host with the given callback handler. diff --git a/peer/connections/peerpeerconnection.go b/peer/connections/peerpeerconnection.go index 5515137..21f3cb2 100644 --- a/peer/connections/peerpeerconnection.go +++ b/peer/connections/peerpeerconnection.go @@ -19,13 +19,15 @@ type PeerPeerConnection struct { state ConnectionState connection *connection.Connection profile *model.Profile + dataHandler func(string, []byte) []byte } // NewPeerPeerConnection creates a new peer connection for the given hostname and profile. -func NewPeerPeerConnection(peerhostname string, profile *model.Profile) *PeerPeerConnection { +func NewPeerPeerConnection(peerhostname string, profile *model.Profile, dataHandler func(string, []byte) []byte) *PeerPeerConnection { ppc := new(PeerPeerConnection) ppc.PeerHostname = peerhostname ppc.profile = profile + ppc.dataHandler = dataHandler ppc.Init() return ppc } @@ -50,8 +52,30 @@ func (ppc *PeerPeerConnection) GetClientIdentityPacket() []byte { return nil } +// HandlePacket handles data packets on the optional data channel +func (ppc *PeerPeerConnection) HandlePacket(data []byte) []byte { + return ppc.dataHandler(ppc.PeerHostname, data) +} + +// SendPacket sends data packets on the optional data channel +func (ppc *PeerPeerConnection) SendPacket(data []byte) { + ppc.WaitTilAuthenticated() + ppc.connection.Do(func() error { + channel := ppc.connection.Channel("im.cwtch.peer.data", channels.Outbound) + if channel != nil { + peerchannel, ok := channel.Handler.(*peer.CwtchPeerDataChannel) + if ok { + log.Printf("Sending packet\n") + peerchannel.SendMessage(data) + } + } + return nil + }) +} + // SendGroupInvite sends the given serialized invite packet to the Peer func (ppc *PeerPeerConnection) SendGroupInvite(invite []byte) { + ppc.WaitTilAuthenticated() ppc.connection.Do(func() error { channel := ppc.connection.Channel("im.cwtch.peer", channels.Outbound) if channel != nil { @@ -65,6 +89,16 @@ func (ppc *PeerPeerConnection) SendGroupInvite(invite []byte) { }) } +// WaitTilAuthenticated waits until the underlying connection is authenticated +func (ppc *PeerPeerConnection) WaitTilAuthenticated() { + for { + if ppc.GetState() == AUTHENTICATED { + break + } + time.Sleep(time.Second * 5) + } +} + // Run manages the setup and teardown of a peer->peer connection func (ppc *PeerPeerConnection) Run() error { ppc.state = CONNECTING @@ -82,6 +116,13 @@ func (ppc *PeerPeerConnection) Run() error { return nil }) + if ppc.dataHandler != nil { + ppc.connection.Do(func() error { + ppc.connection.RequestOpenChannel("im.cwtch.peer.data", &peer.CwtchPeerDataChannel{Handler: ppc}) + return nil + }) + } + time.Sleep(time.Second * 1) ppc.connection.Do(func() error { channel := ppc.connection.Channel("im.cwtch.peer", channels.Outbound) diff --git a/peer/connections/peerpeerconnection_test.go b/peer/connections/peerpeerconnection_test.go index 73ede7d..50925c6 100644 --- a/peer/connections/peerpeerconnection_test.go +++ b/peer/connections/peerpeerconnection_test.go @@ -86,7 +86,7 @@ func TestPeerPeerConnection(t *testing.T) { } profile := model.GenerateNewProfile("alice") - ppc := NewPeerPeerConnection("127.0.0.1:5452|"+onionAddr, profile) + ppc := NewPeerPeerConnection("127.0.0.1:5452|"+onionAddr, profile, nil) //numcalls := 0 tp := new(TestPeer) tp.Init() diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 48bcdaa..c1f127a 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -38,13 +38,14 @@ type cwtchPeer struct { profilefile string key [32]byte salt [128]byte + dataHandler func(string, []byte) []byte } // CwtchPeer provides us with a way of testing systems built on top of cwtch without having to // directly implement a cwtchPeer. type CwtchPeer interface { Save() error - PeerWithOnion(string) + PeerWithOnion(string) *connections.PeerPeerConnection InviteOnionToGroup(string, string) error TrustPeer(string) error @@ -70,6 +71,8 @@ type CwtchPeer interface { GetContacts() []string GetContact(string) *model.PublicProfile + SetPeerDataHandler(func(string, []byte) []byte) + Listen() error Shutdown() } @@ -228,6 +231,11 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (groupID string, err err return } +// ExportGroup serializes a group invite so it can be given offline +func (cp *cwtchPeer) SetPeerDataHandler(dataHandler func(string, []byte) []byte) { + cp.dataHandler = dataHandler +} + // ExportGroup serializes a group invite so it can be given offline func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) { group := cp.Profile.GetGroupByGroupID(groupID) @@ -279,8 +287,8 @@ func (cp *cwtchPeer) GetProfile() *model.Profile { } // PeerWithOnion is the entry point for cwtchPeer relationships -func (cp *cwtchPeer) PeerWithOnion(onion string) { - cp.connectionsManager.ManagePeerConnection(onion, cp.Profile) +func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection { + return cp.connectionsManager.ManagePeerConnection(onion, cp.Profile, cp.dataHandler) } // InviteOnionToGroup kicks off the invite process @@ -382,6 +390,12 @@ func (cp *cwtchPeer) LookupContact(hostname string, publicKey rsa.PublicKey) (al return !blocked, true } +// LookupContact returns that a contact is known and allowed to communicate for all cases. +func (cp *cwtchPeer) LookupContactV3(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) { + blocked := cp.Profile.IsBlocked(hostname) + return !blocked, true +} + // ContactRequest needed to implement ContactRequestHandler Interface func (cp *cwtchPeer) ContactRequest(name string, message string) string { return "Accepted" @@ -407,6 +421,19 @@ func (cp *cwtchPeer) Listen() error { return cpc } }) + + if cp.dataHandler != nil { + af.AddHandler("im.cwtch.peer.data", func(rai *application.ApplicationInstance) func() channels.Handler { + cpi := new(CwtchPeerInstance) + cpi.Init(rai, cwtchpeer) + return func() channels.Handler { + cpc := new(peer.CwtchPeerDataChannel) + cpc.Handler = &CwtchPeerHandler{Onion: rai.RemoteHostname, Peer: cp, DataHandler: cp.dataHandler} + return cpc + } + }) + } + cwtchpeer.Init(cp.Profile.Name, cp.Profile.OnionPrivateKey, af, cp) log.Printf("Running cwtch peer on %v", l.Addr().String()) cp.app = cwtchpeer @@ -435,8 +462,9 @@ func (cpi *CwtchPeerInstance) Init(rai *application.ApplicationInstance, ra *app // CwtchPeerHandler encapsulates handling of incoming CwtchPackets type CwtchPeerHandler struct { - Onion string - Peer *cwtchPeer + Onion string + Peer *cwtchPeer + DataHandler func(string, []byte) []byte } // ClientIdentity handles incoming ClientIdentity packets @@ -456,3 +484,8 @@ func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) { func (cph *CwtchPeerHandler) GetClientIdentityPacket() []byte { return cph.Peer.Profile.GetCwtchIdentityPacket() } + +// HandlePacket handles the Cwtch Peer Data Channel +func (cph *CwtchPeerHandler) HandlePacket(data []byte) []byte { + return cph.DataHandler(cph.Onion, data) +} diff --git a/peer/peer/peer_data_channel.go b/peer/peer/peer_data_channel.go new file mode 100644 index 0000000..a4452e9 --- /dev/null +++ b/peer/peer/peer_data_channel.go @@ -0,0 +1,98 @@ +package peer + +import ( + "git.openprivacy.ca/openprivacy/libricochet-go/channels" + "git.openprivacy.ca/openprivacy/libricochet-go/utils" + "git.openprivacy.ca/openprivacy/libricochet-go/wire/control" +) + +// CwtchPeerDataChannel implements the ChannelHandler interface for a channel of +// type "im.cwtch.peer.data The channel may be inbound or outbound. +// +// CwtchPeerChannel implements protocol-level sanity and state validation, but +// does not handle or acknowledge Cwtch messages. The application must provide +// a CwtchPeerChannelHandler implementation to handle Cwtch events. +type CwtchPeerDataChannel struct { + // Methods of Handler are called for Cwtch events on this channel + Handler CwtchPeerDataHandler + channel *channels.Channel +} + +// CwtchPeerDataHandler is implemented by an application type to receive +// events from a CwtchPeerChannel. +type CwtchPeerDataHandler interface { + HandlePacket([]byte) []byte +} + +// SendMessage sends a raw message on this channel +func (cpc *CwtchPeerDataChannel) SendMessage(data []byte) { + cpc.channel.SendMessage(data) +} + +// Type returns the type string for this channel, e.g. "im.ricochet.Cwtch". +func (cpc *CwtchPeerDataChannel) Type() string { + return "im.cwtch.peer.data" +} + +// Closed is called when the channel is closed for any reason. +func (cpc *CwtchPeerDataChannel) Closed(err error) { + +} + +// OnlyClientCanOpen - for Cwtch channels any side can open +func (cpc *CwtchPeerDataChannel) OnlyClientCanOpen() bool { + return false +} + +// Singleton - for Cwtch channels there can only be one instance per direction +func (cpc *CwtchPeerDataChannel) Singleton() bool { + return true +} + +// Bidirectional - for Cwtch channels are bidirectional +func (cpc *CwtchPeerDataChannel) Bidirectional() bool { + return true +} + +// RequiresAuthentication - Cwtch channels require hidden service auth +func (cpc *CwtchPeerDataChannel) RequiresAuthentication() string { + return "im.ricochet.auth.hidden-service" +} + +// OpenInbound is the first method called for an inbound channel request. +// If an error is returned, the channel is rejected. If a RawMessage is +// returned, it will be sent as the ChannelResult message. +func (cpc *CwtchPeerDataChannel) OpenInbound(channel *channels.Channel, raw *Protocol_Data_Control.OpenChannel) ([]byte, error) { + cpc.channel = channel + messageBuilder := new(utils.MessageBuilder) + return messageBuilder.AckOpenChannel(channel.ID), nil +} + +// OpenOutbound is the first method called for an outbound channel request. +// If an error is returned, the channel is not opened. If a RawMessage is +// returned, it will be sent as the OpenChannel message. +func (cpc *CwtchPeerDataChannel) OpenOutbound(channel *channels.Channel) ([]byte, error) { + cpc.channel = channel + messageBuilder := new(utils.MessageBuilder) + return messageBuilder.OpenChannel(channel.ID, cpc.Type()), nil +} + +// OpenOutboundResult is called when a response is received for an +// outbound OpenChannel request. If `err` is non-nil, the channel was +// rejected and Closed will be called immediately afterwards. `raw` +// contains the raw protocol message including any extension data. +func (cpc *CwtchPeerDataChannel) OpenOutboundResult(err error, crm *Protocol_Data_Control.ChannelResult) { + if err == nil { + if crm.GetOpened() { + cpc.channel.Pending = false + } + } +} + +// Packet is called for each raw packet received on this channel. +func (cpc *CwtchPeerDataChannel) Packet(data []byte) { + ret := cpc.Handler.HandlePacket(data) + if len(ret) >= 0 { + cpc.channel.SendMessage(ret) + } +} diff --git a/testing/cwtch_peer_server_intergration_test.go b/testing/cwtch_peer_server_intergration_test.go index 2c14ebf..5e3e642 100644 --- a/testing/cwtch_peer_server_intergration_test.go +++ b/testing/cwtch_peer_server_intergration_test.go @@ -182,7 +182,7 @@ func TestCwtchPeerIntegration(t *testing.T) { bob.JoinServer(serverAddr) fmt.Println("Waiting for peerings and server joins...") - time.Sleep(time.Second * 120) + time.Sleep(time.Second * 240) fmt.Println("Alice inviting Bob to group...") err = alice.InviteOnionToGroup(bob.GetProfile().Onion, groupID)