From 912894b55f895f9f9e0362e39d734964ce7b3fe9 Mon Sep 17 00:00:00 2001 From: erinn Date: Sat, 27 Oct 2018 01:49:30 -0700 Subject: [PATCH 1/2] support arbitrary channel type handlers in cwtch peer --- peer/connections/connectionsmanager.go | 7 ++++--- peer/connections/peerpeerconnection.go | 24 +++++++++++++++++++++++- peer/cwtch_peer.go | 20 ++++++++++++++++++-- 3 files changed, 45 insertions(+), 6 deletions(-) diff --git a/peer/connections/connectionsmanager.go b/peer/connections/connectionsmanager.go index 8074a7e..2602181 100644 --- a/peer/connections/connectionsmanager.go +++ b/peer/connections/connectionsmanager.go @@ -5,7 +5,8 @@ import ( "cwtch.im/cwtch/protocol" "sync" "time" -) + "git.openprivacy.ca/openprivacy/libricochet-go/application" + ) // Manager encapsulates all the logic necessary to manage outgoing peer and server connections. type Manager struct { @@ -25,13 +26,13 @@ func NewConnectionsManager() *Manager { } // ManagePeerConnection creates a new PeerConnection for the given Host and Profile. -func (m *Manager) ManagePeerConnection(host string, profile *model.Profile, dataHandler func(string, []byte) []byte) *PeerPeerConnection { +func (m *Manager) ManagePeerConnection(host string, profile *model.Profile, dataHandler func(string, []byte) []byte, aif application.ApplicationInstanceFactory) *PeerPeerConnection { m.lock.Lock() defer m.lock.Unlock() _, exists := m.peerConnections[host] if !exists { - ppc := NewPeerPeerConnection(host, profile, dataHandler) + ppc := NewPeerPeerConnection(host, profile, dataHandler, aif) go ppc.Run() m.peerConnections[host] = ppc return ppc diff --git a/peer/connections/peerpeerconnection.go b/peer/connections/peerpeerconnection.go index 8368fd3..29b08a7 100644 --- a/peer/connections/peerpeerconnection.go +++ b/peer/connections/peerpeerconnection.go @@ -10,6 +10,7 @@ import ( "git.openprivacy.ca/openprivacy/libricochet-go/identity" "log" "time" + "git.openprivacy.ca/openprivacy/libricochet-go/application" ) // PeerPeerConnection encapsulates a single outgoing Peer->Peer connection @@ -20,14 +21,16 @@ type PeerPeerConnection struct { connection *connection.Connection profile *model.Profile dataHandler func(string, []byte) []byte + aif application.ApplicationInstanceFactory } // NewPeerPeerConnection creates a new peer connection for the given hostname and profile. -func NewPeerPeerConnection(peerhostname string, profile *model.Profile, dataHandler func(string, []byte) []byte) *PeerPeerConnection { +func NewPeerPeerConnection(peerhostname string, profile *model.Profile, dataHandler func(string, []byte) []byte, aif application.ApplicationInstanceFactory) *PeerPeerConnection { ppc := new(PeerPeerConnection) ppc.PeerHostname = peerhostname ppc.profile = profile ppc.dataHandler = dataHandler + ppc.aif = aif ppc.Init() return ppc } @@ -73,6 +76,17 @@ func (ppc *PeerPeerConnection) SendPacket(data []byte) { }) } +func (ppc *PeerPeerConnection) DoOnChannel(ctype string, direction channels.Direction, doSomethingWith func(channel *channels.Channel)) { + ppc.WaitTilAuthenticated() + ppc.connection.Do(func() error { + channel := ppc.connection.Channel(ctype, direction) + if channel != nil { + doSomethingWith(channel) + } + return nil + }) +} + // SendGroupInvite sends the given serialized invite packet to the Peer func (ppc *PeerPeerConnection) SendGroupInvite(invite []byte) { ppc.WaitTilAuthenticated() @@ -123,6 +137,14 @@ func (ppc *PeerPeerConnection) Run() error { }) } + handlers := ppc.aif.GetHandlers() + for i := range handlers { + ppc.connection.Do(func() error { + ppc.connection.RequestOpenChannel(handlers[i], ppc.aif.GetHandler(handlers[i])(ppc.aif.GetApplicationInstance(ppc.connection))()) + return nil + }) + } + time.Sleep(time.Second * 1) ppc.connection.Do(func() error { channel := ppc.connection.Channel("im.cwtch.peer", channels.Outbound) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 9a8ad47..07ff5d6 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -41,6 +41,8 @@ type cwtchPeer struct { key [32]byte salt [128]byte dataHandler func(string, []byte) []byte + //handlers map[string]func(*application.ApplicationInstance) func() channels.Handler + aif application.ApplicationInstanceFactory } // CwtchPeer provides us with a way of testing systems built on top of cwtch without having to @@ -74,6 +76,7 @@ type CwtchPeer interface { GetContacts() []string GetContact(string) *model.PublicProfile + SetApplicationInstanceFactory(factory application.ApplicationInstanceFactory) SetPeerDataHandler(func(string, []byte) []byte) Listen() error @@ -232,11 +235,19 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (groupID string, err err return } -// ExportGroup serializes a group invite so it can be given offline +// a handler for the optional data handler +// note that the "correct" way to do this would be to AddChannelHandler("im.cwtch.peerdata", ...") but peerdata is such +// a handy channel that it's nice to have this convenience shortcut func (cp *cwtchPeer) SetPeerDataHandler(dataHandler func(string, []byte) []byte) { cp.dataHandler = dataHandler } +// add extra channel handlers (note that the peer will merge these with the ones necessary to make cwtch work, so you +// are not clobbering the underlying functionality) +func (cp *cwtchPeer) SetApplicationInstanceFactory(aif application.ApplicationInstanceFactory) { + cp.aif = aif +} + // ExportGroup serializes a group invite so it can be given offline func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) { group := cp.Profile.GetGroupByGroupID(groupID) @@ -289,7 +300,7 @@ func (cp *cwtchPeer) GetProfile() *model.Profile { // PeerWithOnion is the entry point for cwtchPeer relationships func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection { - return cp.connectionsManager.ManagePeerConnection(onion, cp.Profile, cp.dataHandler) + return cp.connectionsManager.ManagePeerConnection(onion, cp.Profile, cp.dataHandler, cp.aif) } // InviteOnionToGroup kicks off the invite process @@ -434,6 +445,11 @@ func (cp *cwtchPeer) Listen() error { }) } + handlers := cp.aif.GetHandlers() + for i := range handlers { + af.AddHandler(handlers[i], cp.aif.GetHandler(handlers[i])) + } + cwtchpeer.InitV3(cp.Profile.Name, identity.InitializeV3(cp.Profile.Name, &cp.Profile.Ed25519PrivateKey, &cp.Profile.Ed25519PublicKey), af, cp) log.Printf("Running cwtch peer on %v", l.Addr().String()) cp.app = cwtchpeer From ce7d57a1a5e42d22449441ff36b11ec15a9f11af Mon Sep 17 00:00:00 2001 From: erinn Date: Mon, 29 Oct 2018 12:19:30 -0700 Subject: [PATCH 2/2] fix tests --- peer/connections/connectionsmanager.go | 4 ++-- peer/connections/peerpeerconnection.go | 3 ++- peer/connections/peerpeerconnection_test.go | 3 ++- peer/cwtch_peer.go | 2 +- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/peer/connections/connectionsmanager.go b/peer/connections/connectionsmanager.go index 2602181..d9e2e15 100644 --- a/peer/connections/connectionsmanager.go +++ b/peer/connections/connectionsmanager.go @@ -3,10 +3,10 @@ package connections import ( "cwtch.im/cwtch/model" "cwtch.im/cwtch/protocol" + "git.openprivacy.ca/openprivacy/libricochet-go/application" "sync" "time" - "git.openprivacy.ca/openprivacy/libricochet-go/application" - ) +) // Manager encapsulates all the logic necessary to manage outgoing peer and server connections. type Manager struct { diff --git a/peer/connections/peerpeerconnection.go b/peer/connections/peerpeerconnection.go index 29b08a7..aac76d1 100644 --- a/peer/connections/peerpeerconnection.go +++ b/peer/connections/peerpeerconnection.go @@ -5,12 +5,12 @@ import ( "cwtch.im/cwtch/peer/peer" "cwtch.im/cwtch/protocol" "git.openprivacy.ca/openprivacy/libricochet-go" + "git.openprivacy.ca/openprivacy/libricochet-go/application" "git.openprivacy.ca/openprivacy/libricochet-go/channels" "git.openprivacy.ca/openprivacy/libricochet-go/connection" "git.openprivacy.ca/openprivacy/libricochet-go/identity" "log" "time" - "git.openprivacy.ca/openprivacy/libricochet-go/application" ) // PeerPeerConnection encapsulates a single outgoing Peer->Peer connection @@ -76,6 +76,7 @@ func (ppc *PeerPeerConnection) SendPacket(data []byte) { }) } +// DoOnChannel performs an operation on the requested channel func (ppc *PeerPeerConnection) DoOnChannel(ctype string, direction channels.Direction, doSomethingWith func(channel *channels.Channel)) { ppc.WaitTilAuthenticated() ppc.connection.Do(func() error { diff --git a/peer/connections/peerpeerconnection_test.go b/peer/connections/peerpeerconnection_test.go index eab486c..8106648 100644 --- a/peer/connections/peerpeerconnection_test.go +++ b/peer/connections/peerpeerconnection_test.go @@ -6,6 +6,7 @@ import ( "cwtch.im/cwtch/peer/peer" "cwtch.im/cwtch/protocol" "git.openprivacy.ca/openprivacy/libricochet-go" + "git.openprivacy.ca/openprivacy/libricochet-go/application" "git.openprivacy.ca/openprivacy/libricochet-go/channels" "git.openprivacy.ca/openprivacy/libricochet-go/connection" "git.openprivacy.ca/openprivacy/libricochet-go/identity" @@ -81,7 +82,7 @@ func TestPeerPeerConnection(t *testing.T) { profile := model.GenerateNewProfile("alice") hostname := identity.Hostname() - ppc := NewPeerPeerConnection("127.0.0.1:5452|"+hostname, profile, nil) + ppc := NewPeerPeerConnection("127.0.0.1:5452|"+hostname, profile, nil, application.ApplicationInstanceFactory{}) tp := new(TestPeer) tp.Init() diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 07ff5d6..bfd15fd 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -42,7 +42,7 @@ type cwtchPeer struct { salt [128]byte dataHandler func(string, []byte) []byte //handlers map[string]func(*application.ApplicationInstance) func() channels.Handler - aif application.ApplicationInstanceFactory + aif application.ApplicationInstanceFactory } // CwtchPeer provides us with a way of testing systems built on top of cwtch without having to