support arbitrary channel type handlers in cwtch peer
This commit is contained in:
parent
22c8941282
commit
912894b55f
|
@ -5,7 +5,8 @@ import (
|
||||||
"cwtch.im/cwtch/protocol"
|
"cwtch.im/cwtch/protocol"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
"git.openprivacy.ca/openprivacy/libricochet-go/application"
|
||||||
|
)
|
||||||
|
|
||||||
// Manager encapsulates all the logic necessary to manage outgoing peer and server connections.
|
// Manager encapsulates all the logic necessary to manage outgoing peer and server connections.
|
||||||
type Manager struct {
|
type Manager struct {
|
||||||
|
@ -25,13 +26,13 @@ func NewConnectionsManager() *Manager {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ManagePeerConnection creates a new PeerConnection for the given Host and Profile.
|
// ManagePeerConnection creates a new PeerConnection for the given Host and Profile.
|
||||||
func (m *Manager) ManagePeerConnection(host string, profile *model.Profile, dataHandler func(string, []byte) []byte) *PeerPeerConnection {
|
func (m *Manager) ManagePeerConnection(host string, profile *model.Profile, dataHandler func(string, []byte) []byte, aif application.ApplicationInstanceFactory) *PeerPeerConnection {
|
||||||
m.lock.Lock()
|
m.lock.Lock()
|
||||||
defer m.lock.Unlock()
|
defer m.lock.Unlock()
|
||||||
|
|
||||||
_, exists := m.peerConnections[host]
|
_, exists := m.peerConnections[host]
|
||||||
if !exists {
|
if !exists {
|
||||||
ppc := NewPeerPeerConnection(host, profile, dataHandler)
|
ppc := NewPeerPeerConnection(host, profile, dataHandler, aif)
|
||||||
go ppc.Run()
|
go ppc.Run()
|
||||||
m.peerConnections[host] = ppc
|
m.peerConnections[host] = ppc
|
||||||
return ppc
|
return ppc
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
|
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
|
||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
"git.openprivacy.ca/openprivacy/libricochet-go/application"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PeerPeerConnection encapsulates a single outgoing Peer->Peer connection
|
// PeerPeerConnection encapsulates a single outgoing Peer->Peer connection
|
||||||
|
@ -20,14 +21,16 @@ type PeerPeerConnection struct {
|
||||||
connection *connection.Connection
|
connection *connection.Connection
|
||||||
profile *model.Profile
|
profile *model.Profile
|
||||||
dataHandler func(string, []byte) []byte
|
dataHandler func(string, []byte) []byte
|
||||||
|
aif application.ApplicationInstanceFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPeerPeerConnection creates a new peer connection for the given hostname and profile.
|
// NewPeerPeerConnection creates a new peer connection for the given hostname and profile.
|
||||||
func NewPeerPeerConnection(peerhostname string, profile *model.Profile, dataHandler func(string, []byte) []byte) *PeerPeerConnection {
|
func NewPeerPeerConnection(peerhostname string, profile *model.Profile, dataHandler func(string, []byte) []byte, aif application.ApplicationInstanceFactory) *PeerPeerConnection {
|
||||||
ppc := new(PeerPeerConnection)
|
ppc := new(PeerPeerConnection)
|
||||||
ppc.PeerHostname = peerhostname
|
ppc.PeerHostname = peerhostname
|
||||||
ppc.profile = profile
|
ppc.profile = profile
|
||||||
ppc.dataHandler = dataHandler
|
ppc.dataHandler = dataHandler
|
||||||
|
ppc.aif = aif
|
||||||
ppc.Init()
|
ppc.Init()
|
||||||
return ppc
|
return ppc
|
||||||
}
|
}
|
||||||
|
@ -73,6 +76,17 @@ func (ppc *PeerPeerConnection) SendPacket(data []byte) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ppc *PeerPeerConnection) DoOnChannel(ctype string, direction channels.Direction, doSomethingWith func(channel *channels.Channel)) {
|
||||||
|
ppc.WaitTilAuthenticated()
|
||||||
|
ppc.connection.Do(func() error {
|
||||||
|
channel := ppc.connection.Channel(ctype, direction)
|
||||||
|
if channel != nil {
|
||||||
|
doSomethingWith(channel)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// SendGroupInvite sends the given serialized invite packet to the Peer
|
// SendGroupInvite sends the given serialized invite packet to the Peer
|
||||||
func (ppc *PeerPeerConnection) SendGroupInvite(invite []byte) {
|
func (ppc *PeerPeerConnection) SendGroupInvite(invite []byte) {
|
||||||
ppc.WaitTilAuthenticated()
|
ppc.WaitTilAuthenticated()
|
||||||
|
@ -123,6 +137,14 @@ func (ppc *PeerPeerConnection) Run() error {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
handlers := ppc.aif.GetHandlers()
|
||||||
|
for i := range handlers {
|
||||||
|
ppc.connection.Do(func() error {
|
||||||
|
ppc.connection.RequestOpenChannel(handlers[i], ppc.aif.GetHandler(handlers[i])(ppc.aif.GetApplicationInstance(ppc.connection))())
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
time.Sleep(time.Second * 1)
|
time.Sleep(time.Second * 1)
|
||||||
ppc.connection.Do(func() error {
|
ppc.connection.Do(func() error {
|
||||||
channel := ppc.connection.Channel("im.cwtch.peer", channels.Outbound)
|
channel := ppc.connection.Channel("im.cwtch.peer", channels.Outbound)
|
||||||
|
|
|
@ -41,6 +41,8 @@ type cwtchPeer struct {
|
||||||
key [32]byte
|
key [32]byte
|
||||||
salt [128]byte
|
salt [128]byte
|
||||||
dataHandler func(string, []byte) []byte
|
dataHandler func(string, []byte) []byte
|
||||||
|
//handlers map[string]func(*application.ApplicationInstance) func() channels.Handler
|
||||||
|
aif application.ApplicationInstanceFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
// CwtchPeer provides us with a way of testing systems built on top of cwtch without having to
|
// CwtchPeer provides us with a way of testing systems built on top of cwtch without having to
|
||||||
|
@ -74,6 +76,7 @@ type CwtchPeer interface {
|
||||||
GetContacts() []string
|
GetContacts() []string
|
||||||
GetContact(string) *model.PublicProfile
|
GetContact(string) *model.PublicProfile
|
||||||
|
|
||||||
|
SetApplicationInstanceFactory(factory application.ApplicationInstanceFactory)
|
||||||
SetPeerDataHandler(func(string, []byte) []byte)
|
SetPeerDataHandler(func(string, []byte) []byte)
|
||||||
|
|
||||||
Listen() error
|
Listen() error
|
||||||
|
@ -232,11 +235,19 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (groupID string, err err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExportGroup serializes a group invite so it can be given offline
|
// a handler for the optional data handler
|
||||||
|
// note that the "correct" way to do this would be to AddChannelHandler("im.cwtch.peerdata", ...") but peerdata is such
|
||||||
|
// a handy channel that it's nice to have this convenience shortcut
|
||||||
func (cp *cwtchPeer) SetPeerDataHandler(dataHandler func(string, []byte) []byte) {
|
func (cp *cwtchPeer) SetPeerDataHandler(dataHandler func(string, []byte) []byte) {
|
||||||
cp.dataHandler = dataHandler
|
cp.dataHandler = dataHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// add extra channel handlers (note that the peer will merge these with the ones necessary to make cwtch work, so you
|
||||||
|
// are not clobbering the underlying functionality)
|
||||||
|
func (cp *cwtchPeer) SetApplicationInstanceFactory(aif application.ApplicationInstanceFactory) {
|
||||||
|
cp.aif = aif
|
||||||
|
}
|
||||||
|
|
||||||
// ExportGroup serializes a group invite so it can be given offline
|
// ExportGroup serializes a group invite so it can be given offline
|
||||||
func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) {
|
func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) {
|
||||||
group := cp.Profile.GetGroupByGroupID(groupID)
|
group := cp.Profile.GetGroupByGroupID(groupID)
|
||||||
|
@ -289,7 +300,7 @@ func (cp *cwtchPeer) GetProfile() *model.Profile {
|
||||||
|
|
||||||
// PeerWithOnion is the entry point for cwtchPeer relationships
|
// PeerWithOnion is the entry point for cwtchPeer relationships
|
||||||
func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection {
|
func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection {
|
||||||
return cp.connectionsManager.ManagePeerConnection(onion, cp.Profile, cp.dataHandler)
|
return cp.connectionsManager.ManagePeerConnection(onion, cp.Profile, cp.dataHandler, cp.aif)
|
||||||
}
|
}
|
||||||
|
|
||||||
// InviteOnionToGroup kicks off the invite process
|
// InviteOnionToGroup kicks off the invite process
|
||||||
|
@ -434,6 +445,11 @@ func (cp *cwtchPeer) Listen() error {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
handlers := cp.aif.GetHandlers()
|
||||||
|
for i := range handlers {
|
||||||
|
af.AddHandler(handlers[i], cp.aif.GetHandler(handlers[i]))
|
||||||
|
}
|
||||||
|
|
||||||
cwtchpeer.InitV3(cp.Profile.Name, identity.InitializeV3(cp.Profile.Name, &cp.Profile.Ed25519PrivateKey, &cp.Profile.Ed25519PublicKey), af, cp)
|
cwtchpeer.InitV3(cp.Profile.Name, identity.InitializeV3(cp.Profile.Name, &cp.Profile.Ed25519PrivateKey, &cp.Profile.Ed25519PublicKey), af, cp)
|
||||||
log.Printf("Running cwtch peer on %v", l.Addr().String())
|
log.Printf("Running cwtch peer on %v", l.Addr().String())
|
||||||
cp.app = cwtchpeer
|
cp.app = cwtchpeer
|
||||||
|
|
Loading…
Reference in New Issue