Add Optional Raw Data Channels Between Cwtch Peers #131
|
@ -15,10 +15,10 @@ import (
|
|||
)
|
||||
|
||||
type application struct {
|
||||
peers map[string]peer.CwtchPeer
|
||||
torManager *tor.Manager
|
||||
directory string
|
||||
mutex sync.Mutex
|
||||
peers map[string]peer.CwtchPeer
|
||||
torManager *tor.Manager
|
||||
directory string
|
||||
mutex sync.Mutex
|
||||
primaryonion string
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/peer"
|
||||
"log"
|
||||
)
|
||||
|
||||
func main() {
|
||||
alice, _ := peer.NewCwtchPeer("alice", "password", ".")
|
||||
|
||||
processData := func(onion string, data []byte) []byte {
|
||||
log.Printf("Recieved %s from %v", data, onion)
|
||||
return data
|
||||
}
|
||||
|
||||
|
||||
alice.SetPeerDataHandler(processData)
|
||||
alice.Listen()
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/peer"
|
||||
"log"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
bob, _ := peer.NewCwtchPeer("bob", "password", ".")
|
||||
counter := 1
|
||||
|
||||
bob.SetPeerDataHandler(func(onion string, data []byte) []byte {
|
||||
log.Printf("Recieved %s from %v", data, onion)
|
||||
counter++
|
||||
return []byte(strconv.Itoa(counter))
|
||||
})
|
||||
connection := bob.PeerWithOnion("qtpnmnth767gjmpv")
|
||||
|
||||
log.Printf("Waiting for Bob to Connect to Alice...")
|
||||
connection.SendPacket([]byte("Hello Alice!!!"))
|
||||
|
||||
// Wait a while...
|
||||
time.Sleep(time.Second * 100)
|
||||
|
||||
}
|
|
@ -25,17 +25,18 @@ func NewConnectionsManager() *Manager {
|
|||
}
|
||||
|
||||
// ManagePeerConnection creates a new PeerConnection for the given Host and Profile.
|
||||
func (m *Manager) ManagePeerConnection(host string, profile *model.Profile) {
|
||||
func (m *Manager) ManagePeerConnection(host string, profile *model.Profile, dataHandler func(string, []byte) []byte) *PeerPeerConnection {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
|
||||
_, exists := m.peerConnections[host]
|
||||
if !exists {
|
||||
ppc := NewPeerPeerConnection(host, profile)
|
||||
ppc := NewPeerPeerConnection(host, profile, dataHandler)
|
||||
go ppc.Run()
|
||||
m.peerConnections[host] = ppc
|
||||
return ppc
|
||||
}
|
||||
m.lock.Unlock()
|
||||
|
||||
return m.peerConnections[host]
|
||||
}
|
||||
|
||||
// ManageServerConnection creates a new ServerConnection for Host with the given callback handler.
|
||||
|
|
|
@ -19,13 +19,15 @@ type PeerPeerConnection struct {
|
|||
state ConnectionState
|
||||
connection *connection.Connection
|
||||
profile *model.Profile
|
||||
dataHandler func(string, []byte) []byte
|
||||
}
|
||||
|
||||
// NewPeerPeerConnection creates a new peer connection for the given hostname and profile.
|
||||
func NewPeerPeerConnection(peerhostname string, profile *model.Profile) *PeerPeerConnection {
|
||||
func NewPeerPeerConnection(peerhostname string, profile *model.Profile, dataHandler func(string, []byte) []byte) *PeerPeerConnection {
|
||||
ppc := new(PeerPeerConnection)
|
||||
ppc.PeerHostname = peerhostname
|
||||
ppc.profile = profile
|
||||
ppc.dataHandler = dataHandler
|
||||
ppc.Init()
|
||||
return ppc
|
||||
}
|
||||
|
@ -50,8 +52,30 @@ func (ppc *PeerPeerConnection) GetClientIdentityPacket() []byte {
|
|||
return nil
|
||||
}
|
||||
|
||||
// HandlePacket handles data packets on the optional data channel
|
||||
func (ppc *PeerPeerConnection) HandlePacket(data []byte) []byte {
|
||||
return ppc.dataHandler(ppc.PeerHostname, data)
|
||||
}
|
||||
|
||||
// SendPacket sends data packets on the optional data channel
|
||||
func (ppc *PeerPeerConnection) SendPacket(data []byte) {
|
||||
ppc.WaitTilAuthenticated()
|
||||
ppc.connection.Do(func() error {
|
||||
channel := ppc.connection.Channel("im.cwtch.peer.data", channels.Outbound)
|
||||
if channel != nil {
|
||||
peerchannel, ok := channel.Handler.(*peer.CwtchPeerDataChannel)
|
||||
if ok {
|
||||
log.Printf("Sending packet\n")
|
||||
peerchannel.SendMessage(data)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// SendGroupInvite sends the given serialized invite packet to the Peer
|
||||
func (ppc *PeerPeerConnection) SendGroupInvite(invite []byte) {
|
||||
ppc.WaitTilAuthenticated()
|
||||
ppc.connection.Do(func() error {
|
||||
channel := ppc.connection.Channel("im.cwtch.peer", channels.Outbound)
|
||||
if channel != nil {
|
||||
|
@ -65,6 +89,16 @@ func (ppc *PeerPeerConnection) SendGroupInvite(invite []byte) {
|
|||
})
|
||||
}
|
||||
|
||||
// WaitTilAuthenticated waits until the underlying connection is authenticated
|
||||
func (ppc *PeerPeerConnection) WaitTilAuthenticated() {
|
||||
for {
|
||||
if ppc.GetState() == AUTHENTICATED {
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Second * 5)
|
||||
}
|
||||
}
|
||||
|
||||
// Run manages the setup and teardown of a peer->peer connection
|
||||
func (ppc *PeerPeerConnection) Run() error {
|
||||
ppc.state = CONNECTING
|
||||
|
@ -82,6 +116,13 @@ func (ppc *PeerPeerConnection) Run() error {
|
|||
return nil
|
||||
})
|
||||
|
||||
if ppc.dataHandler != nil {
|
||||
ppc.connection.Do(func() error {
|
||||
ppc.connection.RequestOpenChannel("im.cwtch.peer.data", &peer.CwtchPeerDataChannel{Handler: ppc})
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
time.Sleep(time.Second * 1)
|
||||
ppc.connection.Do(func() error {
|
||||
channel := ppc.connection.Channel("im.cwtch.peer", channels.Outbound)
|
||||
|
|
|
@ -86,7 +86,7 @@ func TestPeerPeerConnection(t *testing.T) {
|
|||
}
|
||||
|
||||
profile := model.GenerateNewProfile("alice")
|
||||
ppc := NewPeerPeerConnection("127.0.0.1:5452|"+onionAddr, profile)
|
||||
ppc := NewPeerPeerConnection("127.0.0.1:5452|"+onionAddr, profile, nil)
|
||||
//numcalls := 0
|
||||
tp := new(TestPeer)
|
||||
tp.Init()
|
||||
|
|
|
@ -38,13 +38,14 @@ type cwtchPeer struct {
|
|||
profilefile string
|
||||
key [32]byte
|
||||
salt [128]byte
|
||||
dataHandler func(string, []byte) []byte
|
||||
}
|
||||
|
||||
// CwtchPeer provides us with a way of testing systems built on top of cwtch without having to
|
||||
// directly implement a cwtchPeer.
|
||||
type CwtchPeer interface {
|
||||
Save() error
|
||||
PeerWithOnion(string)
|
||||
PeerWithOnion(string) *connections.PeerPeerConnection
|
||||
InviteOnionToGroup(string, string) error
|
||||
|
||||
TrustPeer(string) error
|
||||
|
@ -70,6 +71,8 @@ type CwtchPeer interface {
|
|||
GetContacts() []string
|
||||
GetContact(string) *model.PublicProfile
|
||||
|
||||
SetPeerDataHandler(func(string, []byte) []byte)
|
||||
|
||||
Listen() error
|
||||
Shutdown()
|
||||
}
|
||||
|
@ -228,6 +231,11 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (groupID string, err err
|
|||
return
|
||||
}
|
||||
|
||||
// ExportGroup serializes a group invite so it can be given offline
|
||||
func (cp *cwtchPeer) SetPeerDataHandler(dataHandler func(string, []byte) []byte) {
|
||||
cp.dataHandler = dataHandler
|
||||
}
|
||||
|
||||
// ExportGroup serializes a group invite so it can be given offline
|
||||
func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) {
|
||||
group := cp.Profile.GetGroupByGroupID(groupID)
|
||||
|
@ -279,8 +287,8 @@ func (cp *cwtchPeer) GetProfile() *model.Profile {
|
|||
}
|
||||
|
||||
// PeerWithOnion is the entry point for cwtchPeer relationships
|
||||
func (cp *cwtchPeer) PeerWithOnion(onion string) {
|
||||
cp.connectionsManager.ManagePeerConnection(onion, cp.Profile)
|
||||
func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection {
|
||||
return cp.connectionsManager.ManagePeerConnection(onion, cp.Profile, cp.dataHandler)
|
||||
}
|
||||
|
||||
// InviteOnionToGroup kicks off the invite process
|
||||
|
@ -382,6 +390,12 @@ func (cp *cwtchPeer) LookupContact(hostname string, publicKey rsa.PublicKey) (al
|
|||
return !blocked, true
|
||||
}
|
||||
|
||||
// LookupContact returns that a contact is known and allowed to communicate for all cases.
|
||||
func (cp *cwtchPeer) LookupContactV3(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) {
|
||||
blocked := cp.Profile.IsBlocked(hostname)
|
||||
return !blocked, true
|
||||
}
|
||||
|
||||
// ContactRequest needed to implement ContactRequestHandler Interface
|
||||
func (cp *cwtchPeer) ContactRequest(name string, message string) string {
|
||||
return "Accepted"
|
||||
|
@ -407,6 +421,19 @@ func (cp *cwtchPeer) Listen() error {
|
|||
return cpc
|
||||
}
|
||||
})
|
||||
|
||||
if cp.dataHandler != nil {
|
||||
af.AddHandler("im.cwtch.peer.data", func(rai *application.ApplicationInstance) func() channels.Handler {
|
||||
cpi := new(CwtchPeerInstance)
|
||||
cpi.Init(rai, cwtchpeer)
|
||||
return func() channels.Handler {
|
||||
cpc := new(peer.CwtchPeerDataChannel)
|
||||
cpc.Handler = &CwtchPeerHandler{Onion: rai.RemoteHostname, Peer: cp, DataHandler: cp.dataHandler}
|
||||
return cpc
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
cwtchpeer.Init(cp.Profile.Name, cp.Profile.OnionPrivateKey, af, cp)
|
||||
log.Printf("Running cwtch peer on %v", l.Addr().String())
|
||||
cp.app = cwtchpeer
|
||||
|
@ -435,8 +462,9 @@ func (cpi *CwtchPeerInstance) Init(rai *application.ApplicationInstance, ra *app
|
|||
|
||||
// CwtchPeerHandler encapsulates handling of incoming CwtchPackets
|
||||
type CwtchPeerHandler struct {
|
||||
Onion string
|
||||
Peer *cwtchPeer
|
||||
Onion string
|
||||
Peer *cwtchPeer
|
||||
DataHandler func(string, []byte) []byte
|
||||
}
|
||||
|
||||
// ClientIdentity handles incoming ClientIdentity packets
|
||||
|
@ -456,3 +484,8 @@ func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) {
|
|||
func (cph *CwtchPeerHandler) GetClientIdentityPacket() []byte {
|
||||
return cph.Peer.Profile.GetCwtchIdentityPacket()
|
||||
}
|
||||
|
||||
// HandlePacket handles the Cwtch Peer Data Channel
|
||||
func (cph *CwtchPeerHandler) HandlePacket(data []byte) []byte {
|
||||
return cph.DataHandler(cph.Onion, data)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,98 @@
|
|||
package peer
|
||||
|
||||
import (
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/channels"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/utils"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/wire/control"
|
||||
)
|
||||
|
||||
// CwtchPeerDataChannel implements the ChannelHandler interface for a channel of
|
||||
// type "im.cwtch.peer.data The channel may be inbound or outbound.
|
||||
//
|
||||
// CwtchPeerChannel implements protocol-level sanity and state validation, but
|
||||
// does not handle or acknowledge Cwtch messages. The application must provide
|
||||
// a CwtchPeerChannelHandler implementation to handle Cwtch events.
|
||||
type CwtchPeerDataChannel struct {
|
||||
// Methods of Handler are called for Cwtch events on this channel
|
||||
Handler CwtchPeerDataHandler
|
||||
channel *channels.Channel
|
||||
}
|
||||
|
||||
// CwtchPeerDataHandler is implemented by an application type to receive
|
||||
// events from a CwtchPeerChannel.
|
||||
type CwtchPeerDataHandler interface {
|
||||
HandlePacket([]byte) []byte
|
||||
}
|
||||
|
||||
// SendMessage sends a raw message on this channel
|
||||
func (cpc *CwtchPeerDataChannel) SendMessage(data []byte) {
|
||||
cpc.channel.SendMessage(data)
|
||||
}
|
||||
|
||||
// Type returns the type string for this channel, e.g. "im.ricochet.Cwtch".
|
||||
func (cpc *CwtchPeerDataChannel) Type() string {
|
||||
return "im.cwtch.peer.data"
|
||||
}
|
||||
|
||||
// Closed is called when the channel is closed for any reason.
|
||||
func (cpc *CwtchPeerDataChannel) Closed(err error) {
|
||||
|
||||
}
|
||||
|
||||
// OnlyClientCanOpen - for Cwtch channels any side can open
|
||||
func (cpc *CwtchPeerDataChannel) OnlyClientCanOpen() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Singleton - for Cwtch channels there can only be one instance per direction
|
||||
func (cpc *CwtchPeerDataChannel) Singleton() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Bidirectional - for Cwtch channels are bidirectional
|
||||
func (cpc *CwtchPeerDataChannel) Bidirectional() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// RequiresAuthentication - Cwtch channels require hidden service auth
|
||||
func (cpc *CwtchPeerDataChannel) RequiresAuthentication() string {
|
||||
return "im.ricochet.auth.hidden-service"
|
||||
}
|
||||
|
||||
// OpenInbound is the first method called for an inbound channel request.
|
||||
// If an error is returned, the channel is rejected. If a RawMessage is
|
||||
// returned, it will be sent as the ChannelResult message.
|
||||
func (cpc *CwtchPeerDataChannel) OpenInbound(channel *channels.Channel, raw *Protocol_Data_Control.OpenChannel) ([]byte, error) {
|
||||
cpc.channel = channel
|
||||
messageBuilder := new(utils.MessageBuilder)
|
||||
return messageBuilder.AckOpenChannel(channel.ID), nil
|
||||
}
|
||||
|
||||
// OpenOutbound is the first method called for an outbound channel request.
|
||||
// If an error is returned, the channel is not opened. If a RawMessage is
|
||||
// returned, it will be sent as the OpenChannel message.
|
||||
func (cpc *CwtchPeerDataChannel) OpenOutbound(channel *channels.Channel) ([]byte, error) {
|
||||
cpc.channel = channel
|
||||
messageBuilder := new(utils.MessageBuilder)
|
||||
return messageBuilder.OpenChannel(channel.ID, cpc.Type()), nil
|
||||
}
|
||||
|
||||
// OpenOutboundResult is called when a response is received for an
|
||||
// outbound OpenChannel request. If `err` is non-nil, the channel was
|
||||
// rejected and Closed will be called immediately afterwards. `raw`
|
||||
// contains the raw protocol message including any extension data.
|
||||
func (cpc *CwtchPeerDataChannel) OpenOutboundResult(err error, crm *Protocol_Data_Control.ChannelResult) {
|
||||
if err == nil {
|
||||
if crm.GetOpened() {
|
||||
cpc.channel.Pending = false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Packet is called for each raw packet received on this channel.
|
||||
func (cpc *CwtchPeerDataChannel) Packet(data []byte) {
|
||||
ret := cpc.Handler.HandlePacket(data)
|
||||
if len(ret) >= 0 {
|
||||
cpc.channel.SendMessage(ret)
|
||||
}
|
||||
}
|
|
@ -182,7 +182,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
bob.JoinServer(serverAddr)
|
||||
|
||||
fmt.Println("Waiting for peerings and server joins...")
|
||||
time.Sleep(time.Second * 120)
|
||||
time.Sleep(time.Second * 240)
|
||||
|
||||
fmt.Println("Alice inviting Bob to group...")
|
||||
err = alice.InviteOnionToGroup(bob.GetProfile().Onion, groupID)
|
||||
|
|
Loading…
Reference in New Issue