Merge branch 'peerdata' of cwtch.im/cwtch into master

This commit is contained in:
Sarah Jamie Lewis 2018-10-05 20:12:15 +00:00 committed by Gogs
commit 6818dd5602
9 changed files with 245 additions and 16 deletions

View File

@ -15,10 +15,11 @@ import (
)
type application struct {
peers map[string]peer.CwtchPeer
torManager *tor.Manager
directory string
mutex sync.Mutex
peers map[string]peer.CwtchPeer
torManager *tor.Manager
directory string
mutex sync.Mutex
primaryonion string
}
// Application is a full cwtch peer application. It allows management, usage and storage of multiple peers
@ -26,6 +27,7 @@ type Application interface {
LoadProfiles(password string) error
CreatePeer(name string, password string) (peer.CwtchPeer, error)
PrimaryIdentity() peer.CwtchPeer
GetPeer(onion string) peer.CwtchPeer
ListPeers() map[string]string
@ -100,6 +102,9 @@ func (app *application) LoadProfiles(password string) error {
app.startPeer(p)
app.mutex.Lock()
app.peers[p.GetProfile().Onion] = p
if app.primaryonion == "" {
app.primaryonion = p.GetProfile().Onion
}
app.mutex.Unlock()
}
return nil
@ -148,6 +153,11 @@ func (app *application) ListPeers() map[string]string {
return keys
}
// PrimaryIdentity returns a Peer for a given onion address
func (app *application) PrimaryIdentity() peer.CwtchPeer {
return app.peers[app.primaryonion]
}
// GetPeer returns a Peer for a given onion address
func (app *application) GetPeer(onion string) peer.CwtchPeer {
if peer, ok := app.peers[onion]; ok {

19
app/peer/alice/alice.go Normal file
View File

@ -0,0 +1,19 @@
package main
import (
"cwtch.im/cwtch/peer"
"log"
)
func main() {
alice, _ := peer.NewCwtchPeer("alice", "password", ".")
processData := func(onion string, data []byte) []byte {
log.Printf("Recieved %s from %v", data, onion)
return data
}
alice.SetPeerDataHandler(processData)
alice.Listen()
}

27
app/peer/bob/bob.go Normal file
View File

@ -0,0 +1,27 @@
package main
import (
"cwtch.im/cwtch/peer"
"log"
"strconv"
"time"
)
func main() {
bob, _ := peer.NewCwtchPeer("bob", "password", ".")
counter := 1
bob.SetPeerDataHandler(func(onion string, data []byte) []byte {
log.Printf("Recieved %s from %v", data, onion)
counter++
return []byte(strconv.Itoa(counter))
})
connection := bob.PeerWithOnion("qtpnmnth767gjmpv")
log.Printf("Waiting for Bob to Connect to Alice...")
connection.SendPacket([]byte("Hello Alice!!!"))
// Wait a while...
time.Sleep(time.Second * 100)
}

View File

@ -25,17 +25,18 @@ func NewConnectionsManager() *Manager {
}
// ManagePeerConnection creates a new PeerConnection for the given Host and Profile.
func (m *Manager) ManagePeerConnection(host string, profile *model.Profile) {
func (m *Manager) ManagePeerConnection(host string, profile *model.Profile, dataHandler func(string, []byte) []byte) *PeerPeerConnection {
m.lock.Lock()
defer m.lock.Unlock()
_, exists := m.peerConnections[host]
if !exists {
ppc := NewPeerPeerConnection(host, profile)
ppc := NewPeerPeerConnection(host, profile, dataHandler)
go ppc.Run()
m.peerConnections[host] = ppc
return ppc
}
m.lock.Unlock()
return m.peerConnections[host]
}
// ManageServerConnection creates a new ServerConnection for Host with the given callback handler.

View File

@ -19,13 +19,15 @@ type PeerPeerConnection struct {
state ConnectionState
connection *connection.Connection
profile *model.Profile
dataHandler func(string, []byte) []byte
}
// NewPeerPeerConnection creates a new peer connection for the given hostname and profile.
func NewPeerPeerConnection(peerhostname string, profile *model.Profile) *PeerPeerConnection {
func NewPeerPeerConnection(peerhostname string, profile *model.Profile, dataHandler func(string, []byte) []byte) *PeerPeerConnection {
ppc := new(PeerPeerConnection)
ppc.PeerHostname = peerhostname
ppc.profile = profile
ppc.dataHandler = dataHandler
ppc.Init()
return ppc
}
@ -50,8 +52,30 @@ func (ppc *PeerPeerConnection) GetClientIdentityPacket() []byte {
return nil
}
// HandlePacket handles data packets on the optional data channel
func (ppc *PeerPeerConnection) HandlePacket(data []byte) []byte {
return ppc.dataHandler(ppc.PeerHostname, data)
}
// SendPacket sends data packets on the optional data channel
func (ppc *PeerPeerConnection) SendPacket(data []byte) {
ppc.WaitTilAuthenticated()
ppc.connection.Do(func() error {
channel := ppc.connection.Channel("im.cwtch.peer.data", channels.Outbound)
if channel != nil {
peerchannel, ok := channel.Handler.(*peer.CwtchPeerDataChannel)
if ok {
log.Printf("Sending packet\n")
peerchannel.SendMessage(data)
}
}
return nil
})
}
// SendGroupInvite sends the given serialized invite packet to the Peer
func (ppc *PeerPeerConnection) SendGroupInvite(invite []byte) {
ppc.WaitTilAuthenticated()
ppc.connection.Do(func() error {
channel := ppc.connection.Channel("im.cwtch.peer", channels.Outbound)
if channel != nil {
@ -65,6 +89,16 @@ func (ppc *PeerPeerConnection) SendGroupInvite(invite []byte) {
})
}
// WaitTilAuthenticated waits until the underlying connection is authenticated
func (ppc *PeerPeerConnection) WaitTilAuthenticated() {
for {
if ppc.GetState() == AUTHENTICATED {
break
}
time.Sleep(time.Second * 5)
}
}
// Run manages the setup and teardown of a peer->peer connection
func (ppc *PeerPeerConnection) Run() error {
ppc.state = CONNECTING
@ -82,6 +116,13 @@ func (ppc *PeerPeerConnection) Run() error {
return nil
})
if ppc.dataHandler != nil {
ppc.connection.Do(func() error {
ppc.connection.RequestOpenChannel("im.cwtch.peer.data", &peer.CwtchPeerDataChannel{Handler: ppc})
return nil
})
}
time.Sleep(time.Second * 1)
ppc.connection.Do(func() error {
channel := ppc.connection.Channel("im.cwtch.peer", channels.Outbound)

View File

@ -86,7 +86,7 @@ func TestPeerPeerConnection(t *testing.T) {
}
profile := model.GenerateNewProfile("alice")
ppc := NewPeerPeerConnection("127.0.0.1:5452|"+onionAddr, profile)
ppc := NewPeerPeerConnection("127.0.0.1:5452|"+onionAddr, profile, nil)
//numcalls := 0
tp := new(TestPeer)
tp.Init()

View File

@ -38,13 +38,14 @@ type cwtchPeer struct {
profilefile string
key [32]byte
salt [128]byte
dataHandler func(string, []byte) []byte
}
// CwtchPeer provides us with a way of testing systems built on top of cwtch without having to
// directly implement a cwtchPeer.
type CwtchPeer interface {
Save() error
PeerWithOnion(string)
PeerWithOnion(string) *connections.PeerPeerConnection
InviteOnionToGroup(string, string) error
TrustPeer(string) error
@ -70,6 +71,8 @@ type CwtchPeer interface {
GetContacts() []string
GetContact(string) *model.PublicProfile
SetPeerDataHandler(func(string, []byte) []byte)
Listen() error
Shutdown()
}
@ -228,6 +231,11 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (groupID string, err err
return
}
// ExportGroup serializes a group invite so it can be given offline
func (cp *cwtchPeer) SetPeerDataHandler(dataHandler func(string, []byte) []byte) {
cp.dataHandler = dataHandler
}
// ExportGroup serializes a group invite so it can be given offline
func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) {
group := cp.Profile.GetGroupByGroupID(groupID)
@ -279,8 +287,8 @@ func (cp *cwtchPeer) GetProfile() *model.Profile {
}
// PeerWithOnion is the entry point for cwtchPeer relationships
func (cp *cwtchPeer) PeerWithOnion(onion string) {
cp.connectionsManager.ManagePeerConnection(onion, cp.Profile)
func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection {
return cp.connectionsManager.ManagePeerConnection(onion, cp.Profile, cp.dataHandler)
}
// InviteOnionToGroup kicks off the invite process
@ -382,6 +390,12 @@ func (cp *cwtchPeer) LookupContact(hostname string, publicKey rsa.PublicKey) (al
return !blocked, true
}
// LookupContact returns that a contact is known and allowed to communicate for all cases.
func (cp *cwtchPeer) LookupContactV3(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) {
blocked := cp.Profile.IsBlocked(hostname)
return !blocked, true
}
// ContactRequest needed to implement ContactRequestHandler Interface
func (cp *cwtchPeer) ContactRequest(name string, message string) string {
return "Accepted"
@ -407,6 +421,19 @@ func (cp *cwtchPeer) Listen() error {
return cpc
}
})
if cp.dataHandler != nil {
af.AddHandler("im.cwtch.peer.data", func(rai *application.ApplicationInstance) func() channels.Handler {
cpi := new(CwtchPeerInstance)
cpi.Init(rai, cwtchpeer)
return func() channels.Handler {
cpc := new(peer.CwtchPeerDataChannel)
cpc.Handler = &CwtchPeerHandler{Onion: rai.RemoteHostname, Peer: cp, DataHandler: cp.dataHandler}
return cpc
}
})
}
cwtchpeer.Init(cp.Profile.Name, cp.Profile.OnionPrivateKey, af, cp)
log.Printf("Running cwtch peer on %v", l.Addr().String())
cp.app = cwtchpeer
@ -435,8 +462,9 @@ func (cpi *CwtchPeerInstance) Init(rai *application.ApplicationInstance, ra *app
// CwtchPeerHandler encapsulates handling of incoming CwtchPackets
type CwtchPeerHandler struct {
Onion string
Peer *cwtchPeer
Onion string
Peer *cwtchPeer
DataHandler func(string, []byte) []byte
}
// ClientIdentity handles incoming ClientIdentity packets
@ -456,3 +484,8 @@ func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) {
func (cph *CwtchPeerHandler) GetClientIdentityPacket() []byte {
return cph.Peer.Profile.GetCwtchIdentityPacket()
}
// HandlePacket handles the Cwtch Peer Data Channel
func (cph *CwtchPeerHandler) HandlePacket(data []byte) []byte {
return cph.DataHandler(cph.Onion, data)
}

View File

@ -0,0 +1,98 @@
package peer
import (
"git.openprivacy.ca/openprivacy/libricochet-go/channels"
"git.openprivacy.ca/openprivacy/libricochet-go/utils"
"git.openprivacy.ca/openprivacy/libricochet-go/wire/control"
)
// CwtchPeerDataChannel implements the ChannelHandler interface for a channel of
// type "im.cwtch.peer.data The channel may be inbound or outbound.
//
// CwtchPeerChannel implements protocol-level sanity and state validation, but
// does not handle or acknowledge Cwtch messages. The application must provide
// a CwtchPeerChannelHandler implementation to handle Cwtch events.
type CwtchPeerDataChannel struct {
// Methods of Handler are called for Cwtch events on this channel
Handler CwtchPeerDataHandler
channel *channels.Channel
}
// CwtchPeerDataHandler is implemented by an application type to receive
// events from a CwtchPeerChannel.
type CwtchPeerDataHandler interface {
HandlePacket([]byte) []byte
}
// SendMessage sends a raw message on this channel
func (cpc *CwtchPeerDataChannel) SendMessage(data []byte) {
cpc.channel.SendMessage(data)
}
// Type returns the type string for this channel, e.g. "im.ricochet.Cwtch".
func (cpc *CwtchPeerDataChannel) Type() string {
return "im.cwtch.peer.data"
}
// Closed is called when the channel is closed for any reason.
func (cpc *CwtchPeerDataChannel) Closed(err error) {
}
// OnlyClientCanOpen - for Cwtch channels any side can open
func (cpc *CwtchPeerDataChannel) OnlyClientCanOpen() bool {
return false
}
// Singleton - for Cwtch channels there can only be one instance per direction
func (cpc *CwtchPeerDataChannel) Singleton() bool {
return true
}
// Bidirectional - for Cwtch channels are bidirectional
func (cpc *CwtchPeerDataChannel) Bidirectional() bool {
return true
}
// RequiresAuthentication - Cwtch channels require hidden service auth
func (cpc *CwtchPeerDataChannel) RequiresAuthentication() string {
return "im.ricochet.auth.hidden-service"
}
// OpenInbound is the first method called for an inbound channel request.
// If an error is returned, the channel is rejected. If a RawMessage is
// returned, it will be sent as the ChannelResult message.
func (cpc *CwtchPeerDataChannel) OpenInbound(channel *channels.Channel, raw *Protocol_Data_Control.OpenChannel) ([]byte, error) {
cpc.channel = channel
messageBuilder := new(utils.MessageBuilder)
return messageBuilder.AckOpenChannel(channel.ID), nil
}
// OpenOutbound is the first method called for an outbound channel request.
// If an error is returned, the channel is not opened. If a RawMessage is
// returned, it will be sent as the OpenChannel message.
func (cpc *CwtchPeerDataChannel) OpenOutbound(channel *channels.Channel) ([]byte, error) {
cpc.channel = channel
messageBuilder := new(utils.MessageBuilder)
return messageBuilder.OpenChannel(channel.ID, cpc.Type()), nil
}
// OpenOutboundResult is called when a response is received for an
// outbound OpenChannel request. If `err` is non-nil, the channel was
// rejected and Closed will be called immediately afterwards. `raw`
// contains the raw protocol message including any extension data.
func (cpc *CwtchPeerDataChannel) OpenOutboundResult(err error, crm *Protocol_Data_Control.ChannelResult) {
if err == nil {
if crm.GetOpened() {
cpc.channel.Pending = false
}
}
}
// Packet is called for each raw packet received on this channel.
func (cpc *CwtchPeerDataChannel) Packet(data []byte) {
ret := cpc.Handler.HandlePacket(data)
if len(ret) >= 0 {
cpc.channel.SendMessage(ret)
}
}

View File

@ -182,7 +182,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
bob.JoinServer(serverAddr)
fmt.Println("Waiting for peerings and server joins...")
time.Sleep(time.Second * 120)
time.Sleep(time.Second * 240)
fmt.Println("Alice inviting Bob to group...")
err = alice.InviteOnionToGroup(bob.GetProfile().Onion, groupID)