Browse Source

peer getVal/retVal messages and functions and handling

pull/302/head
Dan Ballard 2 months ago
parent
commit
dc3df531dd
10 changed files with 307 additions and 38 deletions
  1. +4
    -4
      app/app.go
  2. +19
    -2
      event/common.go
  3. +9
    -3
      go.mod
  4. +19
    -4
      go.sum
  5. +60
    -0
      model/attr/scope.go
  6. +56
    -5
      peer/cwtch_peer.go
  7. +59
    -5
      protocol/connections/engine.go
  8. +35
    -8
      protocol/connections/peerapp.go
  9. +0
    -1
      storage/v1/profile_store.go
  10. +46
    -6
      testing/cwtch_peer_server_integration_test.go

+ 4
- 4
app/app.go View File

@@ -118,19 +118,19 @@ func (app *application) CreateTaggedPeer(name string, password string, tag strin
app.storage[profile.Onion] = profileStore

pc := app.storage[profile.Onion].GetProfileCopy(true)
peer := peer.FromProfile(pc)
peer.Init(app.eventBuses[profile.Onion])
p := peer.FromProfile(pc)
p.Init(app.eventBuses[profile.Onion])

blockedPeers := profile.BlockedPeers()
// TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key.
identity := primitives.InitializeIdentity(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey)
engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, app.eventBuses[profile.Onion], profile.GetContacts(), blockedPeers)

app.peers[profile.Onion] = peer
app.peers[profile.Onion] = p
app.engines[profile.Onion] = engine

if tag != "" {
peer.SetAttribute(AttributeTag, tag)
p.SetAttribute(AttributeTag, tag)
}

app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion}))


+ 19
- 2
event/common.go View File

@@ -64,6 +64,18 @@ const (
SendMessageToPeer = Type("SendMessageToPeer")
NewMessageFromPeer = Type("NewMessageFromPeer")

// RemotePeer, scope, path
NewGetValMessageFromPeer = Type("NewGetValMessageFromPeer")

// RemotePeer, val, exists
SendRetValMessageToPeer = Type("SendRetValMessageToPeer")

// RemotePeer, scope, val
SendGetValMessageToPeer = Type("SendGetValMessageToPeer")

// RemotePeer, scope, path, data, exists
NewRetValMessageFromPeer = Type("NewRetValMessageFromPeer")

// Peer acknowledges a previously sent message
// attributes
// EventID: The original event id that the peer is responding too.
@@ -218,8 +230,11 @@ const (

ConnectionState = Field("ConnectionState")

Key = Field("Key")
Data = Field("Data")
Key = Field("Key")
Data = Field("Data")
Scope = Field("Scope")
Path = Field("Path")
Exists = Field("Exists")

Salt = Field("Salt")

@@ -247,4 +262,6 @@ const (
ContextAck = "im.cwtch.acknowledgement"
ContextInvite = "im.cwtch.invite"
ContextRaw = "im.cwtch.raw"
ContextGetVal = "im.cwtch.getVal"
ContextRetVal = "im.cwtch.retVal"
)

+ 9
- 3
go.mod View File

@@ -1,18 +1,24 @@
module cwtch.im/cwtch

require (
cwtch.im/tapir v0.1.15
cwtch.im/tapir v0.1.17
git.openprivacy.ca/openprivacy/connectivity v1.1.1
git.openprivacy.ca/openprivacy/libricochet-go v1.0.11
git.openprivacy.ca/openprivacy/log v1.0.0
github.com/c-bata/go-prompt v0.2.3
github.com/golang/protobuf v1.3.3
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-runewidth v0.0.8 // indirect
github.com/mattn/go-tty v0.0.3 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/pkg/term v0.0.0-20190109203006-aa71e9d9e942 // indirect
github.com/stretchr/testify v1.5.1 // indirect
github.com/struCoder/pidusage v0.1.3
golang.org/x/crypto v0.0.0-20200210191831-6ca56c2f2e2b
golang.org/x/net v0.0.0-20200202094626-16171245cfb2
go.etcd.io/bbolt v1.3.4 // indirect
golang.org/x/crypto v0.0.0-20200320181102-891825fb96df
golang.org/x/net v0.0.0-20200320181208-1c781a10960a
golang.org/x/sys v0.0.0-20200320181252-af34d8274f85 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
)

go 1.13

+ 19
- 4
go.sum View File

@@ -1,5 +1,5 @@
cwtch.im/tapir v0.1.15 h1:XSCWOvjmNkzMT2IceFgTBXWGKtYfr3a8o+La1s10OhE=
cwtch.im/tapir v0.1.15/go.mod h1:HzezugpEx+nZ3LdyDsl0w6n45IJYnOt8uqldkLWmaqs=
cwtch.im/tapir v0.1.17 h1:2jVZUe1a88tMI4aJPvRTO4Id3NN3PsM62cT5lntEChk=
cwtch.im/tapir v0.1.17/go.mod h1:HzezugpEx+nZ3LdyDsl0w6n45IJYnOt8uqldkLWmaqs=
git.openprivacy.ca/openprivacy/connectivity v1.1.0 h1:9PEeKuPdoIRYeA62BUkBW2BfK4KqKEXz1fvUxZoP4xs=
git.openprivacy.ca/openprivacy/connectivity v1.1.0/go.mod h1:4P8mirZZslKbo2zBrXXVjgEdqGwHo/6qoFBwFQW6d6E=
git.openprivacy.ca/openprivacy/connectivity v1.1.1 h1:hKxBOmxP7Jdu3K1BJ93mRtKNiWUoP6YHt/o2snE2Z0w=
@@ -12,6 +12,7 @@ github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 h1:w1UutsfOrms1J05zt7I
github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412/go.mod h1:WPjqKcmVOxf0XSf3YxCJs6N6AOSrOx3obionmG7T0y0=
github.com/c-bata/go-prompt v0.2.3 h1:jjCS+QhG/sULBhAaBdjb2PlMRVaKXQgn+4yzaauvs2s=
github.com/c-bata/go-prompt v0.2.3/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cretz/bine v0.1.1-0.20200124154328-f9f678b84cca h1:Q2r7AxHdJwWfLtBZwvW621M3sPqxPc6ITv2j1FGsYpw=
github.com/cretz/bine v0.1.1-0.20200124154328-f9f678b84cca/go.mod h1:6PF6fWAvYtwjRGkAuDEJeWNOv3a2hUouSP/yRYXmvHw=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
@@ -35,6 +36,8 @@ github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mattn/go-colorable v0.1.4 h1:snbPLB8fVfU9iwbbo30TPtbLRzwWu6aJS6Xh4eaaviA=
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE=
@@ -48,6 +51,8 @@ github.com/mattn/go-tty v0.0.3 h1:5OfyWorkyO7xP52Mq7tB36ajHDG5OHrmBGIS/DtakQI=
github.com/mattn/go-tty v0.0.3/go.mod h1:ihxohKRERHTVzN+aSVRwACLCeqIoZAWpoICkkvrWyR0=
github.com/mimoo/StrobeGo v0.0.0-20181016162300-f8f6d4d2b643 h1:hLDRPB66XQT/8+wG9WsDpiCvZf1yKO7sz7scAjSlBa0=
github.com/mimoo/StrobeGo v0.0.0-20181016162300-f8f6d4d2b643/go.mod h1:43+3pMjjKimDBf5Kr4ZFNGbLql1zKkbImw+fZbw3geM=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/pkg/term v0.0.0-20190109203006-aa71e9d9e942 h1:A7GG7zcGjl3jqAqGPmcNjd/D9hzL95SuoOQAaFNdLU0=
github.com/pkg/term v0.0.0-20190109203006-aa71e9d9e942/go.mod h1:eCbImbZ95eXtAUIbLAuAVnBnwf83mjf6QIVH8SHYwqQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -57,19 +62,25 @@ github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/struCoder/pidusage v0.1.3 h1:pZcSa6asBE38TJtW0Nui6GeCjLTpaT/jAnNP7dUTLSQ=
github.com/struCoder/pidusage v0.1.3/go.mod h1:pWBlW3YuSwRl6h7R5KbvA4N8oOqe9LjaKW5CwT1SPjI=
go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.4 h1:hi1bXHMVrlQh6WwxAy+qZCV/SYIlqo+Ushwdpa4tAKg=
go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200206161412-a0c6ece9d31a h1:aczoJ0HPNE92XKa7DrIzkNN6esOKO2TBwiiYoKcINhA=
golang.org/x/crypto v0.0.0-20200206161412-a0c6ece9d31a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200210191831-6ca56c2f2e2b h1:at/SwedEdvIO7WPO4V9Yn1a3MqdSqAo0p6trBocu5aA=
golang.org/x/crypto v0.0.0-20200210191831-6ca56c2f2e2b/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200320181102-891825fb96df h1:lDWgvUvNnaTnNBc/dwOty86cFeKoKWbwy2wQj0gIxbU=
golang.org/x/crypto v0.0.0-20200320181102-891825fb96df/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200320181208-1c781a10960a h1:KaxWXSFrOaE2ptiOotI+zFdzHxBsg9MW6XfCv497IRo=
golang.org/x/net v0.0.0-20200320181208-1c781a10960a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -81,11 +92,15 @@ golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 h1:LfCXLvNmTYH9kEmVgqbnsWfruoXZIrh4YBgqVHtDvw0=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200320181252-af34d8274f85 h1:fD99hd4ciR6T3oPhr2EkmuKe9oHixHx9Hj/hND89j3g=
golang.org/x/sys v0.0.0-20200320181252-af34d8274f85/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

+ 60
- 0
model/attr/scope.go View File

@@ -0,0 +1,60 @@
package attr

import (
"strings"
)

/*
Scope model for peer attributes and requests

A local peer "Alice" has a PublicScope that is queryable by getVal requests.
By default, for now, other scopes are private, of which we here define SettingsScope

Alice's peer structs of remote peers such as "Bob" keep the queried
PublicScope values in the PeerScope, which can be overriden by the same named
values stored in the LocalScope.

*/

// scopes for attributes
const (
// on a peer, local and peer supplied data
LocalScope = "local"
PeerScope = "peer"

// on a local profile, public data and private settings
PublicScope = "public"
SettingsScope = "settings"
)

// Seperator for scope and the rest of path
const Seperator = "."

// GetPublicScope takes a path and attaches the pubic scope to it
func GetPublicScope(path string) string {
return PublicScope + Seperator + path
}

// GetSettingsScope takes a path and attaches the settings scope to it
func GetSettingsScope(path string) string {
return SettingsScope + Seperator + path
}

// GetLocalScope takes a path and attaches the local scope to it
func GetLocalScope(path string) string {
return LocalScope + Seperator + path
}

// GetPeerScope takes a path and attaches the peer scope to it
func GetPeerScope(path string) string {
return PeerScope + Seperator + path
}

// GetScopePath take a full path and returns the scope and the scope-less path
func GetScopePath(fullPath string) (string, string) {
parts := strings.SplitN(fullPath, Seperator, 1)
if len(parts) != 2 {
return "", ""
}
return parts[0], parts[1]
}

+ 56
- 5
peer/cwtch_peer.go View File

@@ -3,12 +3,14 @@ package peer
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/protocol/connections"
"encoding/base32"
"encoding/base64"
"encoding/json"
"errors"
"git.openprivacy.ca/openprivacy/log"
"strconv"
"strings"
"sync"
"time"
@@ -16,7 +18,8 @@ import (

var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true,
event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeer: true,
event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true}
event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true,
event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true}

// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
type cwtchPeer struct {
@@ -36,6 +39,7 @@ type CwtchPeer interface {
PeerWithOnion(string)
InviteOnionToGroup(string, string) error
SendMessageToPeer(string, string) string
SendGetValToPeer(string, string, string)

TrustPeer(string) error
BlockPeer(string) error
@@ -103,7 +107,8 @@ func (cp *cwtchPeer) Init(eventBus event.Manager) {
go cp.eventHandler()

cp.eventBus = eventBus
cp.AutoHandleEvents([]event.Type{event.EncryptedGroupMessage, event.NewMessageFromPeer, event.PeerAcknowledgement, event.PeerError, event.SendMessageToGroupError})
cp.AutoHandleEvents([]event.Type{event.EncryptedGroupMessage, event.NewMessageFromPeer, event.PeerAcknowledgement,
event.PeerError, event.SendMessageToGroupError, event.NewGetValMessageFromPeer, event.NewRetValMessageFromPeer})
}

// AutoHandleEvents sets an event (if able) to be handled by this peer
@@ -333,6 +338,11 @@ func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string {
return event.EventID
}

func (cp *cwtchPeer) SendGetValToPeer(onion string, scope string, path string) {
event := event.NewEventList(event.SendGetValMessageToPeer, event.RemotePeer, onion, event.Scope, scope, event.Path, path)
cp.eventBus.Publish(event)
}

// TrustPeer sets an existing peer relationship to trusted
func (cp *cwtchPeer) TrustPeer(peer string) error {
cp.mutex.Lock()
@@ -434,7 +444,10 @@ func (cp *cwtchPeer) SetAttribute(key string, val string) {
func (cp *cwtchPeer) GetAttribute(key string) (string, bool) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.Profile.GetAttribute(key)
if val, exists := cp.Profile.GetAttribute(key); exists {
return val, true
}
return "", false
}

// SetContactAttribute sets an attribute for the indicated contact and emits an event
@@ -456,7 +469,9 @@ func (cp *cwtchPeer) GetContactAttribute(onion string, key string) (string, bool
cp.mutex.Lock()
defer cp.mutex.Unlock()
if contact, ok := cp.Profile.GetContact(onion); ok {
return contact.GetAttribute(key)
if val, exists := contact.GetAttribute(key); exists {
return val, true
}
}
return "", false
}
@@ -480,7 +495,9 @@ func (cp *cwtchPeer) GetGroupAttribute(gid string, key string) (string, bool) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
if group := cp.Profile.GetGroup(gid); group != nil {
return group.GetAttribute(key)
if val, exists := group.GetAttribute(key); exists {
return val, true
}
}
return "", false
}
@@ -530,6 +547,40 @@ func (cp *cwtchPeer) eventHandler() {
cp.Profile.ErrorSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID], ev.Data[event.Error])
cp.mutex.Unlock()

case event.NewGetValMessageFromPeer:
onion := ev.Data[event.RemotePeer]
scope := ev.Data[event.Scope]
path := ev.Data[event.Path]

log.Debugf("NewGetValMessageFromPeer for %v%v from %v\n", scope, path, onion)

if scope == attr.PublicScope {
val, exists := cp.GetAttribute(attr.GetPublicScope(path))
resp := event.NewEvent(event.SendRetValMessageToPeer, map[event.Field]string{event.RemotePeer: onion, event.Exists: strconv.FormatBool(exists)})
resp.EventID = ev.EventID
if exists {
resp.Data[event.Data] = val
} else {
resp.Data[event.Data] = ""
}
log.Debugf("Responding with SendRetValMessageToPeer exists:%v data: %v\n", exists, val)

cp.eventBus.Publish(resp)
}

case event.NewRetValMessageFromPeer:
onion := ev.Data[event.RemotePeer]
scope := ev.Data[event.Scope]
path := ev.Data[event.Path]
val := ev.Data[event.Data]
exists, _ := strconv.ParseBool(ev.Data[event.Exists])
log.Debugf("NewRetValMessageFromPeer %v %v%v %v %v\n", onion, scope, path, exists, val)
if exists {
if scope == attr.PublicScope {
cp.SetContactAttribute(onion, attr.GetPeerScope(path), val)
}
}

/***** Non default but requestable handlable events *****/

case event.NewGroupInvite:


+ 59
- 5
protocol/connections/engine.go View File

@@ -4,14 +4,15 @@ import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol"
"cwtch.im/tapir"
"cwtch.im/tapir/applications"
"cwtch.im/tapir/networks/tor"
"cwtch.im/tapir/primitives"
"encoding/json"
"errors"
"git.openprivacy.ca/openprivacy/connectivity"
torProdider "git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
"golang.org/x/crypto/ed25519"
"strconv"
"sync"
"time"
)
@@ -77,6 +78,8 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
engine.eventManager.Subscribe(event.JoinServer, engine.queue)
engine.eventManager.Subscribe(event.SendMessageToGroup, engine.queue)
engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue)
engine.eventManager.Subscribe(event.SendGetValMessageToPeer, engine.queue)
engine.eventManager.Subscribe(event.SendRetValMessageToPeer, engine.queue)
engine.eventManager.Subscribe(event.DeleteContact, engine.queue)
engine.eventManager.Subscribe(event.DeleteGroup, engine.queue)

@@ -144,6 +147,10 @@ func (e *engine) eventHandler() {
if err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
}
case event.SendGetValMessageToPeer:
e.sendGetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Scope], ev.Data[event.Path])
case event.SendRetValMessageToPeer:
e.sendRetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.Exists])
case event.UnblockPeer:
// We simply remove the peer from our blocklist
// The UI has the responsibility to reinitiate contact with the peer.
@@ -186,6 +193,7 @@ func (e *engine) createPeerTemplate() *PeerApp {
peerAppTemplate.OnAuth = e.ignoreOnShutdown(e.peerAuthed)
peerAppTemplate.OnConnecting = e.ignoreOnShutdown(e.peerConnecting)
peerAppTemplate.OnClose = e.ignoreOnShutdown(e.peerDisconnected)
peerAppTemplate.RetValHandler = e.handlePeerRetVal
return peerAppTemplate
}

@@ -209,7 +217,6 @@ func (e *engine) Shutdown() {
// peerWithOnion is the entry point for cwtchPeer relationships
// needs to be run in a goroutine as will block on Open.
func (e *engine) peerWithOnion(onion string) {
log.Infof("PEER WITH ONION: %v\n", onion)
blocked, known := e.blocked.Load(onion)
if known && !(blocked.(bool)) {
e.ignoreOnShutdown(e.peerConnecting)(onion)
@@ -220,7 +227,7 @@ func (e *engine) peerWithOnion(onion string) {
if connected && err != nil {
conn, err := e.service.GetConnection(onion)
if err == nil {
if conn.HasCapability(applications.AuthCapability) {
if conn.HasCapability(cwtchCapability) {
e.ignoreOnShutdown(e.peerAuthed)(onion)
return
}
@@ -280,7 +287,7 @@ func (e *engine) peerDisconnected(onion string) {

// sendMessageToPeer sends a message to a peer under a given context
func (e *engine) sendMessageToPeer(eventID string, onion string, context string, message []byte) error {
conn, err := e.service.GetConnection(onion)
conn, err := e.service.WaitForCapabilityOrClose(onion, cwtchCapability)
if err == nil {
peerApp, ok := (conn.App()).(*PeerApp)
if ok {
@@ -292,6 +299,27 @@ func (e *engine) sendMessageToPeer(eventID string, onion string, context string,
return err
}

func (e *engine) sendGetValToPeer(eventID, onion, scope, path string) error {
log.Debugf("sendGetValMessage to peer %v %v%v\n", onion, scope, path)
getVal := peerGetVal{Scope: scope, Path: path}
message, err := json.Marshal(getVal)
if err != nil {
return err
}
return e.sendMessageToPeer(eventID, onion, event.ContextGetVal, message)
}

func (e *engine) sendRetValToPeer(eventID, onion, val, existsStr string) error {
log.Debugf("sendRetValMessage to peer %v (%v) %v %v\n", onion, eventID, val, existsStr)
exists, _ := strconv.ParseBool(existsStr)
retVal := peerRetVal{Val: val, Exists: exists}
message, err := json.Marshal(retVal)
if err != nil {
return err
}
return e.sendMessageToPeer(eventID, onion, event.ContextRetVal, message)
}

func (e *engine) deleteConnection(id string) {
conn, err := e.service.GetConnection(id)
if err == nil {
@@ -328,11 +356,37 @@ func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) {
}
}

func (e *engine) handlePeerMessage(hostname string, context string, message []byte) {
func (e *engine) handlePeerMessage(hostname string, eventID string, context string, message []byte) {
log.Debugf("New message from peer: %v %v", hostname, context)
if context == event.ContextInvite {
e.eventManager.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.GroupInvite: string(message)}))
} else if context == event.ContextGetVal {
var getVal peerGetVal
err := json.Unmarshal(message, &getVal)
if err == nil {
ev := event.NewEventList(event.NewGetValMessageFromPeer, event.RemotePeer, hostname, event.Scope, getVal.Scope, event.Path, getVal.Path)
ev.EventID = eventID
e.eventManager.Publish(ev)
}
} else {
e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)}))
}
}

func (e *engine) handlePeerRetVal(hostname string, getValData, retValData []byte) {
var getVal peerGetVal
var retVal peerRetVal

err := json.Unmarshal(getValData, &getVal)
if err != nil {
log.Errorf("Unmarshalling our own getVal request: %v\n", err)
return
}
err = json.Unmarshal(retValData, &retVal)
if err != nil {
log.Errorf("Unmarshalling peer response to getVal request")
return
}

e.eventManager.Publish(event.NewEventList(event.NewRetValMessageFromPeer, event.RemotePeer, hostname, event.Scope, getVal.Scope, event.Path, getVal.Path, event.Exists, strconv.FormatBool(retVal.Exists), event.Data, retVal.Val))
}

+ 35
- 8
protocol/connections/peerapp.go View File

@@ -6,18 +6,24 @@ import (
"cwtch.im/tapir/applications"
"encoding/json"
"git.openprivacy.ca/openprivacy/log"
"sync"
)

const cwtchCapability = tapir.Capability("cwtchCapability")

// PeerApp encapsulates the behaviour of a Cwtch Peer
type PeerApp struct {
applications.AuthApp
connection tapir.Connection
MessageHandler func(string, string, []byte)
MessageHandler func(string, string, string, []byte)
RetValHandler func(string, []byte, []byte)
IsBlocked func(string) bool
OnAcknowledgement func(string, string)
OnAuth func(string)
OnClose func(string)
OnConnecting func(string)

getValRequests sync.Map // [string]string eventID:Data
}

// PeerMessage is an encapsulation that can be used by higher level applications
@@ -27,8 +33,17 @@ type PeerMessage struct {
Data []byte // The serialized data packet.
}

type peerGetVal struct {
Scope, Path string
}

type peerRetVal struct {
Val string
Exists bool
}

// NewInstance should always return a new instantiation of the application.
func (pa PeerApp) NewInstance() tapir.Application {
func (pa *PeerApp) NewInstance() tapir.Application {
newApp := new(PeerApp)
newApp.MessageHandler = pa.MessageHandler
newApp.IsBlocked = pa.IsBlocked
@@ -36,18 +51,19 @@ func (pa PeerApp) NewInstance() tapir.Application {
newApp.OnAuth = pa.OnAuth
newApp.OnClose = pa.OnClose
newApp.OnConnecting = pa.OnConnecting
newApp.RetValHandler = pa.RetValHandler
return newApp
}

// Init is run when the connection is first started.
func (pa *PeerApp) Init(connection tapir.Connection) {

// First run the Authentication App
pa.AuthApp.Init(connection)

if connection.HasCapability(applications.AuthCapability) {

pa.connection = connection
connection.SetCapability(cwtchCapability)

if pa.IsBlocked(connection.Hostname()) {
pa.connection.Close()
@@ -61,7 +77,7 @@ func (pa *PeerApp) Init(connection tapir.Connection) {
}
}

func (pa PeerApp) listen() {
func (pa *PeerApp) listen() {
for {
message := pa.connection.Expect()
if len(message) == 0 {
@@ -72,10 +88,18 @@ func (pa PeerApp) listen() {
var peerMessage PeerMessage
err := json.Unmarshal(message, &peerMessage)
if err == nil {
if peerMessage.Context == event.ContextAck {
switch peerMessage.Context {
case event.ContextAck:
pa.OnAcknowledgement(pa.connection.Hostname(), peerMessage.ID)
} else {
pa.MessageHandler(pa.connection.Hostname(), peerMessage.Context, peerMessage.Data)
case event.ContextRetVal:
req, ok := pa.getValRequests.Load(peerMessage.ID)
if ok {
reqStr := []byte(req.(string))
pa.RetValHandler(pa.connection.Hostname(), reqStr, peerMessage.Data)
pa.getValRequests.Delete(peerMessage.ID)
}
default:
pa.MessageHandler(pa.connection.Hostname(), peerMessage.ID, peerMessage.Context, peerMessage.Data)

// Acknowledge the message
pa.SendMessage(PeerMessage{peerMessage.ID, event.ContextAck, []byte{}})
@@ -88,7 +112,10 @@ func (pa PeerApp) listen() {

// SendMessage sends the peer a preformatted message
// NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol
func (pa PeerApp) SendMessage(message PeerMessage) {
func (pa *PeerApp) SendMessage(message PeerMessage) {
if message.Context == event.ContextGetVal {
pa.getValRequests.Store(message.ID, string(message.Data))
}
serialized, _ := json.Marshal(message)
pa.connection.Send(serialized)
}

+ 0
- 1
storage/v1/profile_store.go View File

@@ -97,7 +97,6 @@ func LoadProfileWriterStore(eventManager event.Manager, directory, password stri
ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, directory: directory, profile: nil, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true}
copy(ps.salt[:], salt)


err = ps.load()
if err != nil {
return nil, err


+ 46
- 6
testing/cwtch_peer_server_integration_test.go View File

@@ -6,6 +6,7 @@ import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/event/bridge"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
cwtchserver "cwtch.im/cwtch/server"
@@ -171,14 +172,17 @@ func TestCwtchPeerIntegration(t *testing.T) {

alice := utils.WaitGetPeer(app, "alice")
fmt.Println("Alice created:", alice.GetOnion())
alice.SetAttribute(attr.GetPublicScope("name"), "Alice")
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite})

bob := utils.WaitGetPeer(app, "bob")
fmt.Println("Bob created:", bob.GetOnion())
bob.SetAttribute(attr.GetPublicScope("name"), "Bob")
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite})

carol := utils.WaitGetPeer(appClient, "carol")
fmt.Println("Carol created:", carol.GetOnion())
carol.SetAttribute(attr.GetPublicScope("name"), "Carol")
carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite})

app.LaunchPeers()
@@ -214,6 +218,42 @@ func TestCwtchPeerIntegration(t *testing.T) {

fmt.Println("Waiting for alice and Bob to peer...")
waitForPeerPeerConnection(t, alice, bob)
bob.AddContact("alice?", alice.GetOnion(), true)
waitForPeerPeerConnection(t, alice, carol)
carol.AddContact("alice?", alice.GetOnion(), true)

fmt.Println("Alice and Bob getVal public.name...")

alice.SendGetValToPeer(bob.GetOnion(), attr.PublicScope, "name")
bob.SendGetValToPeer(alice.GetOnion(), attr.PublicScope, "name")

alice.SendGetValToPeer(carol.GetOnion(), attr.PublicScope, "name")
carol.SendGetValToPeer(alice.GetOnion(), attr.PublicScope, "name")

time.Sleep(10 * time.Second)

aliceName, exists := bob.GetContactAttribute(alice.GetOnion(), attr.GetPeerScope("name"))
if !exists || aliceName != "Alice" {
t.Fatalf("alice GetKeyVal error on alice peer.name %v\n", exists)
}
fmt.Printf("Bob has alice's name as '%v'\n", aliceName)

bobName, exists := alice.GetContactAttribute(bob.GetOnion(), attr.GetPeerScope("name"))
if !exists || bobName != "Bob" {
t.Fatalf("bob GetKeyVal error, alice peer.name\n")
}
fmt.Printf("Alice has bob's name as '%v'\n", bobName)

aliceName, exists = carol.GetContactAttribute(alice.GetOnion(), attr.GetPeerScope("name"))
if !exists || aliceName != "Alice" {
t.Fatalf("carol GetKeyVal error for alice peer.name %v\n", exists)
}

carolName, exists := alice.GetContactAttribute(carol.GetOnion(), attr.GetPeerScope("name"))
if !exists || carolName != "Carol" {
t.Fatalf("alice GetKeyVal error, carol peer.name\n")
}
fmt.Printf("Alice has carol's name as '%v'\n", carolName)

fmt.Println("Alice inviting Bob to group...")
err = alice.InviteOnionToGroup(bob.GetOnion(), groupID)
@@ -242,25 +282,25 @@ func TestCwtchPeerIntegration(t *testing.T) {

fmt.Println("Starting conversation in group...")
// Conversation
fmt.Println("Alice> ", aliceLines[0])
fmt.Printf("%v> %v\n", aliceName, aliceLines[0])
err = alice.SendMessageToGroup(groupID, aliceLines[0])
if err != nil {
t.Fatalf("Alice failed to send a message to the group: %v", err)
}
time.Sleep(time.Second * 10)

fmt.Println("Bob> ", bobLines[0])
fmt.Printf("%v> %v\n", bobName, bobLines[0])
err = bob.SendMessageToGroup(groupID, bobLines[0])
if err != nil {
t.Fatalf("Bob failed to send a message to the group: %v", err)
}
time.Sleep(time.Second * 10)

fmt.Println("Alice> ", aliceLines[1])
fmt.Printf("%v> %v\n", aliceName, aliceLines[1])
alice.SendMessageToGroup(groupID, aliceLines[1])
time.Sleep(time.Second * 10)

fmt.Println("Bob> ", bobLines[1])
fmt.Printf("%v> %v\n", bobName, bobLines[1])
bob.SendMessageToGroup(groupID, bobLines[1])
time.Sleep(time.Second * 10)

@@ -290,11 +330,11 @@ func TestCwtchPeerIntegration(t *testing.T) {
waitForPeerGroupConnection(t, carol, groupID)
numGoRotinesPostCarolConnect := runtime.NumGoroutine()

fmt.Println("Bob> ", bobLines[2])
fmt.Printf("%v> %v", bobName, bobLines[2])
bob.SendMessageToGroup(groupID, bobLines[2])
time.Sleep(time.Second * 10)

fmt.Println("Carol> ", carolLines[0])
fmt.Printf("%v> %v", carolName, carolLines[0])
carol.SendMessageToGroup(groupID, carolLines[0])
time.Sleep(time.Second * 10)



Loading…
Cancel
Save