Compare commits

...

16 Commits

Author SHA1 Message Date
Sarah Jamie Lewis a7b885166a Merge pull request 'Enable per-contact file sharing permissions' (#554) from ep into master
continuous-integration/drone/push Build is pending Details
Reviewed-on: #554
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
2024-04-29 15:37:50 +00:00
Sarah Jamie Lewis b32b11c711
Enable per-contact file sharing permissions
continuous-integration/drone/pr Build is passing Details
2024-04-16 11:35:21 -07:00
Sarah Jamie Lewis 0e96539f22 Merge pull request 'Store Messages and Send when Online' (#553) from offline-messages into master
continuous-integration/drone/push Build is pending Details
Reviewed-on: #553
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
2024-04-16 18:35:02 +00:00
Sarah Jamie Lewis e55f342324
Updating Logging -> Debug
continuous-integration/drone/pr Build is passing Details
2024-02-26 13:40:47 -08:00
Sarah Jamie Lewis 89aca91b37
Store Messages and Send when Online
continuous-integration/drone/pr Build is passing Details
2024-02-26 13:18:38 -08:00
Sarah Jamie Lewis cd918c02ea Merge pull request 'Fix Error in ACL-V1 that Prevented ShareFiles (for some)' (#552) from acl-v2 into master
continuous-integration/drone/push Build is passing Details
Reviewed-on: #552
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
2024-02-26 17:26:17 +00:00
Sarah Jamie Lewis 05a198c89f
Fix Error in ACL-V1 that Prevented ShareFiles (for some)
continuous-integration/drone/pr Build is passing Details
Also aligns model.DeserializeAttributes to best practice
2024-02-24 12:51:19 -08:00
Sarah Jamie Lewis 1d9202ff93 Don't reject text messages
continuous-integration/drone/pr Build is passing Details
continuous-integration/drone/push Build is pending Details
2024-02-12 22:02:35 +00:00
Sarah Jamie Lewis 0907af57d5 Merge pull request 'Introduce Channel/Overlay Mappings' (#549) from overlays into master
continuous-integration/drone/push Build is pending Details
Reviewed-on: #549
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
2024-02-11 23:10:59 +00:00
Sarah Jamie Lewis 826ac40a5c Stream check in engine
continuous-integration/drone/pr Build is pending Details
2024-02-11 14:45:11 -08:00
Sarah Jamie Lewis 1a034953df Util Functions for MW
continuous-integration/drone/pr Build is pending Details
2024-02-11 14:44:18 -08:00
Sarah Jamie Lewis 3124f7b7c4 MessageOverlay time to pointer
continuous-integration/drone/pr Build is pending Details
2024-02-11 13:56:19 -08:00
Sarah Jamie Lewis 792e79dceb Introduce Channel/Overlay Mappings
continuous-integration/drone/pr Build is failing Details
- Map channel 7 to ephemeral / no ack
- Create model methods
- Introduce optional latency measurements into Cwtch
2024-02-11 12:14:07 -08:00
Sarah Jamie Lewis 3e0680943a Prevent Duplicate Queue Subscription
continuous-integration/drone/pr Build is pending Details
continuous-integration/drone/push Build is failing Details
2024-02-09 13:16:23 -08:00
Sarah Jamie Lewis 9cb62d269e Merge pull request 'Fix non-image/preview downloads in Android' (#547) from android_file_download_fix into master
continuous-integration/drone/push Build is pending Details
Reviewed-on: #547
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
2024-02-09 21:06:25 +00:00
Sarah Jamie Lewis ec71e56d23 Fix non-image/preview downloads in Android
continuous-integration/drone/pr Build is pending Details
2024-02-09 11:33:25 -08:00
17 changed files with 246 additions and 52 deletions

4
.gitignore vendored
View File

@ -33,4 +33,6 @@ data-dir-cwtchtool/
tokens tokens
tordir/ tordir/
testing/autodownload/download_dir testing/autodownload/download_dir
testing/autodownload/storage testing/autodownload/storage
*.swp
testing/managerstorage/*

View File

@ -363,6 +363,7 @@ func (app *application) LoadProfiles(password string) {
func (app *application) registerHooks(profile peer.CwtchPeer) { func (app *application) registerHooks(profile peer.CwtchPeer) {
// Register Hooks // Register Hooks
profile.RegisterHook(extensions.ProfileValueExtension{}) profile.RegisterHook(extensions.ProfileValueExtension{})
profile.RegisterHook(extensions.SendWhenOnlineExtension{})
profile.RegisterHook(new(filesharing.Functionality)) profile.RegisterHook(new(filesharing.Functionality))
profile.RegisterHook(new(filesharing.ImagePreviewsFunctionality)) profile.RegisterHook(new(filesharing.ImagePreviewsFunctionality))
profile.RegisterHook(new(servers.Functionality)) profile.RegisterHook(new(servers.Functionality))

View File

@ -95,6 +95,11 @@ func (em *manager) initialize() {
func (em *manager) Subscribe(eventType Type, queue Queue) { func (em *manager) Subscribe(eventType Type, queue Queue) {
em.mapMutex.Lock() em.mapMutex.Lock()
defer em.mapMutex.Unlock() defer em.mapMutex.Unlock()
for _, sub := range em.subscribers[eventType] {
if sub == queue {
return // don't add the same queue for the same event twice...
}
}
em.subscribers[eventType] = append(em.subscribers[eventType], queue) em.subscribers[eventType] = append(em.subscribers[eventType], queue)
} }

View File

@ -104,6 +104,15 @@ func (pne ProfileValueExtension) OnContactRequestValue(profile peer.CwtchPeer, c
val, exists = profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name) val, exists = profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
} }
// NOTE: Cwtch 1.15+ requires that profiles be able to restrict file downloading to specific contacts. As such we need an ACL check here
// on the fileshareing zone.
// TODO: Split this functionality into FilesharingFunctionality, and restrict this function to only considering Profile zoned attributes?
if zone == attr.FilesharingZone {
if !conversation.GetPeerAC().ShareFiles {
return
}
}
// Construct a Response // Construct a Response
resp := event.NewEvent(event.SendRetValMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversation.ID), event.RemotePeer: conversation.Handle, event.Exists: strconv.FormatBool(exists)}) resp := event.NewEvent(event.SendRetValMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversation.ID), event.RemotePeer: conversation.Handle, event.Exists: strconv.FormatBool(exists)})
resp.EventID = eventID resp.EventID = eventID

View File

@ -0,0 +1,66 @@
package extensions
import (
"strconv"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/settings"
"git.openprivacy.ca/openprivacy/log"
)
// SendWhenOnlineExtension implements automatic sending
// Some Considerations:
// - There are race conditions inherant in this approach e.g. a peer could go offline just after recieving a message and never sending an ack
// - In that case the next time we connect we will send a duplicate message.
// - Currently we do not include metadata like sent time in raw peer protocols (however Overlay does now have support for that information)
type SendWhenOnlineExtension struct {
}
func (soe SendWhenOnlineExtension) NotifySettingsUpdate(_ settings.GlobalSettings) {
}
func (soe SendWhenOnlineExtension) EventsToRegister() []event.Type {
return []event.Type{event.PeerStateChange}
}
func (soe SendWhenOnlineExtension) ExperimentsToRegister() []string {
return nil
}
func (soe SendWhenOnlineExtension) OnEvent(ev event.Event, profile peer.CwtchPeer) {
switch ev.EventType {
case event.PeerStateChange:
ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"])
if err == nil {
// if we have re-authenticated with thie peer then request their profile image...
if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED {
// Check the last 100 messages, if any of them are pending, then send them now...
messsages, _ := profile.GetMostRecentMessages(ci.ID, 0, 0, uint(100))
for _, message := range messsages {
if message.Attr[constants.AttrAck] == constants.False {
body := message.Body
ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: ci.Handle, event.Data: body})
ev.EventID = message.Signature // we need this ensure that we correctly ack this in the db when it comes back
// TODO: The EventBus is becoming very noisy...we may want to consider a one-way shortcut to Engine i.e. profile.Engine.SendMessageToPeer
log.Debugf("resending message that was sent when peer was offline")
profile.PublishEvent(ev)
}
}
}
}
}
}
// OnContactReceiveValue is nop for SendWhenOnnlineExtension
func (soe SendWhenOnlineExtension) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, szp attr.ScopedZonedPath, value string, exists bool) {
}
// OnContactRequestValue is nop for SendWhenOnnlineExtension
func (soe SendWhenOnlineExtension) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, szp attr.ScopedZonedPath) {
}

View File

@ -272,15 +272,21 @@ func (f *Functionality) DownloadFile(profile peer.CwtchPeer, conversationID int,
} }
// Don't download files if the download file directory does not exist // Don't download files if the download file directory does not exist
if _, err := os.Stat(path.Dir(downloadFilePath)); os.IsNotExist(err) { // Unless we are on Android where the kernel wishes to keep us ignorant of the
return errors.New("download directory does not exist") // actual path and/or existence of the file. We handle this case further down
} // the line when the manifest is received and protocol engine and the Android layer
// negotiate a temporary local file -> final file copy. We don't want to worry
// about that here...
if runtime.GOOS != "android" {
if _, err := os.Stat(path.Dir(downloadFilePath)); os.IsNotExist(err) {
return errors.New("download directory does not exist")
}
// Don't download files if the manifest file directory does not exist // Don't download files if the manifest file directory does not exist
if _, err := os.Stat(path.Dir(manifestFilePath)); os.IsNotExist(err) { if _, err := os.Stat(path.Dir(manifestFilePath)); os.IsNotExist(err) {
return errors.New("manifest directory does not exist") return errors.New("manifest directory does not exist")
}
} }
// Store local.filesharing.filekey.manifest as the location of the manifest // Store local.filesharing.filekey.manifest as the location of the manifest
profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", key), manifestFilePath) profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", key), manifestFilePath)

View File

@ -62,7 +62,15 @@ func (i *ImagePreviewsFunctionality) OnEvent(ev event.Event, profile peer.CwtchP
if err == nil { if err == nil {
for _, ci := range conversations { for _, ci := range conversations {
if profile.GetPeerState(ci.Handle) == connections.AUTHENTICATED { if profile.GetPeerState(ci.Handle) == connections.AUTHENTICATED {
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey) // if we have enabled file shares for this contact, then send them our profile image
// NOTE: In the past, Cwtch treated "profile image" as a public file share. As such, anyone with the file key and who is able
// to authenticate with the profile (i.e. non-blocked peers) can download the file (if the global profile images experiment is enabled)
// To better allow for fine-grained permissions (and to support hybrid group permissions), we want to enable per-conversation file
// sharing permissions. As such, profile images are now only shared with contacts with that permission enabled.
// (i.e. all previous accepted contacts, new accepted contacts, and contacts who have this toggle set explictly)
if ci.GetPeerAC().ShareFiles {
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey)
}
} }
} }
} }

1
go.mod
View File

@ -16,7 +16,6 @@ require (
require ( require (
filippo.io/edwards25519 v1.0.0 // indirect filippo.io/edwards25519 v1.0.0 // indirect
git.openprivacy.ca/openprivacy/bine v0.0.5 // indirect git.openprivacy.ca/openprivacy/bine v0.0.5 // indirect
github.com/client9/misspell v0.3.4 // indirect
github.com/google/go-cmp v0.5.8 // indirect github.com/google/go-cmp v0.5.8 // indirect
github.com/gtank/merlin v0.1.1 // indirect github.com/gtank/merlin v0.1.1 // indirect
github.com/mimoo/StrobeGo v0.0.0-20220103164710-9a04d6ca976b // indirect github.com/mimoo/StrobeGo v0.0.0-20220103164710-9a04d6ca976b // indirect

2
go.sum
View File

@ -8,8 +8,6 @@ git.openprivacy.ca/openprivacy/connectivity v1.11.0 h1:roASjaFtQLu+HdH5fa2wx6F00
git.openprivacy.ca/openprivacy/connectivity v1.11.0/go.mod h1:OQO1+7OIz/jLxDrorEMzvZA6SEbpbDyLGpjoFqT3z1Y= git.openprivacy.ca/openprivacy/connectivity v1.11.0/go.mod h1:OQO1+7OIz/jLxDrorEMzvZA6SEbpbDyLGpjoFqT3z1Y=
git.openprivacy.ca/openprivacy/log v1.0.3 h1:E/PMm4LY+Q9s3aDpfySfEDq/vYQontlvNj/scrPaga0= git.openprivacy.ca/openprivacy/log v1.0.3 h1:E/PMm4LY+Q9s3aDpfySfEDq/vYQontlvNj/scrPaga0=
git.openprivacy.ca/openprivacy/log v1.0.3/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw= git.openprivacy.ca/openprivacy/log v1.0.3/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw=
github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@ -71,3 +71,4 @@ const Description = "description"
// Used to store the status of acl migrations // Used to store the status of acl migrations
const ACLVersion = "acl-version" const ACLVersion = "acl-version"
const ACLVersionOne = "acl-v1" const ACLVersionOne = "acl-v1"
const ACLVersionTwo = "acl-v2"

View File

@ -1,11 +1,13 @@
package model package model
import ( import (
"encoding/json"
"time"
"cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants" "cwtch.im/cwtch/model/constants"
"encoding/json" "git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log" "git.openprivacy.ca/openprivacy/log"
"time"
) )
// AccessControl is a type determining client assigned authorization to a peer // AccessControl is a type determining client assigned authorization to a peer
@ -59,8 +61,12 @@ func (a *Attributes) Serialize() []byte {
// DeserializeAttributes converts a JSON struct into an Attributes map // DeserializeAttributes converts a JSON struct into an Attributes map
func DeserializeAttributes(data []byte) Attributes { func DeserializeAttributes(data []byte) Attributes {
var attributes Attributes attributes := make(Attributes)
json.Unmarshal(data, &attributes) err := json.Unmarshal(data, &attributes)
if err != nil {
log.Error("error deserializing attributes (this is likely a programming error): %v", err)
return make(Attributes)
}
return attributes return attributes
} }
@ -95,6 +101,11 @@ func (ci *Conversation) GetPeerAC() AccessControl {
return DefaultP2PAccessControl() return DefaultP2PAccessControl()
} }
// IsCwtchPeer is a helper attribute that identifies whether a conversation is a cwtch peer
func (ci *Conversation) IsCwtchPeer() bool {
return tor.IsValidHostname(ci.Handle)
}
// IsGroup is a helper attribute that identifies whether a conversation is a legacy group // IsGroup is a helper attribute that identifies whether a conversation is a legacy group
func (ci *Conversation) IsGroup() bool { func (ci *Conversation) IsGroup() bool {
if _, exists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)).ToString()]; exists { if _, exists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)).ToString()]; exists {

View File

@ -3,6 +3,7 @@ package model
import ( import (
"crypto/sha256" "crypto/sha256"
"encoding/base64" "encoding/base64"
"encoding/json"
) )
// CalculateContentHash derives a hash using the author and the message body. It is intended to be // CalculateContentHash derives a hash using the author and the message body. It is intended to be
@ -12,3 +13,13 @@ func CalculateContentHash(author string, messageBody string) string {
contentBasedHash := sha256.Sum256(content) contentBasedHash := sha256.Sum256(content)
return base64.StdEncoding.EncodeToString(contentBasedHash[:]) return base64.StdEncoding.EncodeToString(contentBasedHash[:])
} }
func DeserializeMessage(message string) (*MessageWrapper, error) {
var cm MessageWrapper
err := json.Unmarshal([]byte(message), &cm)
if err != nil {
return nil, err
}
return &cm, err
}

View File

@ -1,9 +1,40 @@
package model package model
import (
"time"
)
// MessageWrapper is the canonical Cwtch overlay wrapper // MessageWrapper is the canonical Cwtch overlay wrapper
type MessageWrapper struct { type MessageWrapper struct {
Overlay int `json:"o"` Overlay int `json:"o"`
Data string `json:"d"` Data string `json:"d"`
// when the data was assembled
SendTime *time.Time `json:"s,omitempty"`
// when the data was transmitted (by protocol engine e.g. over Tor)
TransitTime *time.Time `json:"t,omitempty"`
// when the data was received
RecvTime *time.Time `json:"r,omitempty"`
}
// Channel is defined as being the last 3 bits of the overlay id
// Channel 0 is reserved for the main conversation
// Channel 2 is reserved for conversation admin (managed groups)
// Channel 7 is reserved for streams (no ack, no store)
func (mw MessageWrapper) Channel() int {
if mw.Overlay > 1024 {
return mw.Overlay & 0x07
}
// for backward compatibilty all overlays less than 0x400 i.e. 1024 are
// mapped to channel 0 regardless of their channel status.
return 0
}
// If Overlay is a Stream Message it should not be ackd, or stored.
func (mw MessageWrapper) IsStream() bool {
return mw.Channel() == 0x07
} }
// OverlayChat is the canonical identifier for chat overlays // OverlayChat is the canonical identifier for chat overlays

View File

@ -3,11 +3,20 @@ package peer
import ( import (
"context" "context"
"crypto/rand" "crypto/rand"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"cwtch.im/cwtch/settings"
"encoding/base64" "encoding/base64"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
"os" "os"
path "path/filepath" path "path/filepath"
"sort" "sort"
@ -16,17 +25,7 @@ import (
"sync" "sync"
"time" "time"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"cwtch.im/cwtch/settings"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
"cwtch.im/cwtch/event" "cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/openprivacy/log" "git.openprivacy.ca/openprivacy/log"
@ -311,15 +310,16 @@ func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Mana
authorizations := make(map[string]model.Authorization) authorizations := make(map[string]model.Authorization)
for _, conversation := range conversations { for _, conversation := range conversations {
if tor.IsValidHostname(conversation.Handle) { // Only perform the following actions for Peer-type Conversaions...
if conversation.IsCwtchPeer() {
// if this profile does not have an ACL version, and the profile is accepted, then migrate // if this profile does not have an ACL version, and the profile is accepted (OR the acl version is v1 and the profile is accepted...)
// the permissions to the v1 ACL // then migrate the permissions to the v2 ACL
// migrate the old accepted AC to a new fine-grained one // migrate the old accepted AC to a new fine-grained one
// we only do this for previously trusted connections // we only do this for previously trusted connections
// NOTE: this does not supercede global cwthch experiments settings // NOTE: this does not supercede global cwthch experiments settings
// if share files is turned off globally then acl.ShareFiles will be ignored. // if share files is turned off globally then acl.ShareFiles will be ignored
if _, exists := conversation.GetAttribute(attr.LocalScope, attr.ProfileZone, constants.ACLVersion); !exists { // Note: There was a bug in the original EP code that meant that some acl-v1 profiles did not get ShareFiles or RenderImages - this corrects that.
if version, exists := conversation.GetAttribute(attr.LocalScope, attr.ProfileZone, constants.ACLVersion); !exists || version == constants.ACLVersionOne {
if conversation.Accepted { if conversation.Accepted {
if ac, exists := conversation.ACL[conversation.Handle]; exists { if ac, exists := conversation.ACL[conversation.Handle]; exists {
ac.ShareFiles = true ac.ShareFiles = true
@ -330,7 +330,7 @@ func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Mana
} }
// Update the ACL Version // Update the ACL Version
cp.storage.SetConversationAttribute(conversation.ID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.ACLVersion)), constants.ACLVersionOne) cp.storage.SetConversationAttribute(conversation.ID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.ACLVersion)), constants.ACLVersionTwo)
// Store the updated ACL // Store the updated ACL
cp.storage.SetConversationACL(conversation.ID, conversation.ACL) cp.storage.SetConversationACL(conversation.ID, conversation.ACL)
} }
@ -435,12 +435,19 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error)
if tor.IsValidHostname(conversationInfo.Handle) { if tor.IsValidHostname(conversationInfo.Handle) {
ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.RemotePeer: conversationInfo.Handle, event.Data: message}) ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.RemotePeer: conversationInfo.Handle, event.Data: message})
onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString()) onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
id := -1
// For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks // check if we should store this message locally...
id, err := cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message)) if cm, err := model.DeserializeMessage(message); err == nil {
if err != nil { if !cm.IsStream() {
return -1, err // For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks
id, err = cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message))
if err != nil {
return -1, err
}
}
} }
cp.eventBus.Publish(ev) cp.eventBus.Publish(ev)
return id, nil return id, nil
} else { } else {
@ -707,8 +714,18 @@ func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessContr
conversationInfo, _ := cp.storage.GetConversationByHandle(handle) conversationInfo, _ := cp.storage.GetConversationByHandle(handle)
if conversationInfo == nil { if conversationInfo == nil {
conversationID, err := cp.storage.NewConversation(handle, model.Attributes{event.SaveHistoryKey: event.DeleteHistoryDefault}, model.AccessControlList{handle: acl}, accepted) conversationID, err := cp.storage.NewConversation(handle, model.Attributes{event.SaveHistoryKey: event.DeleteHistoryDefault}, model.AccessControlList{handle: acl}, accepted)
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.ACLVersion)), constants.ACLVersionOne) if err != nil {
log.Errorf("unable to create a new contact conversation: %v", err)
return -1, err
}
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), time.Now().Format(time.RFC3339Nano)) cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), time.Now().Format(time.RFC3339Nano))
if accepted {
// If this call came from a trusted action (i.e. import bundle or accept button then accept the conversation)
// This assigns all permissions (and in v2 is currently the default state of trusted contacts)
// Accept conversation does PeerWithOnion
cp.AcceptConversation(conversationID)
}
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.ACLVersion)), constants.ACLVersionTwo)
cp.eventBus.Publish(event.NewEvent(event.ContactCreated, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.RemotePeer: handle})) cp.eventBus.Publish(event.NewEvent(event.ContactCreated, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.RemotePeer: handle}))
return conversationID, err return conversationID, err
} }
@ -1251,9 +1268,9 @@ func (cp *cwtchPeer) ImportBundle(importString string) error {
return ConstructResponse(constants.ImportBundlePrefix, "success") return ConstructResponse(constants.ImportBundlePrefix, "success")
} else if tor.IsValidHostname(importString) { } else if tor.IsValidHostname(importString) {
_, err := cp.NewContactConversation(importString, model.DefaultP2PAccessControl(), true) _, err := cp.NewContactConversation(importString, model.DefaultP2PAccessControl(), true)
// NOTE: Not NewContactConversation implictly does AcceptConversation AND PeerWithOnion if relevant so
// we no longer need to do it here...
if err == nil { if err == nil {
// Assuming all is good, we should peer with this contact.
cp.PeerWithOnion(importString)
return ConstructResponse(constants.ImportBundlePrefix, "success") return ConstructResponse(constants.ImportBundlePrefix, "success")
} }
return ConstructResponse(constants.ImportBundlePrefix, err.Error()) return ConstructResponse(constants.ImportBundlePrefix, err.Error())
@ -1482,6 +1499,13 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time)
} }
} }
// Don't store messages in channel 7
if cm, err := model.DeserializeMessage(message); err == nil {
if cm.IsStream() {
return -1, nil
}
}
// Generate a random number and use it as the signature // Generate a random number and use it as the signature
signature := event.GetRandNumber().String() signature := event.GetRandNumber().String()
return cp.storage.InsertMessage(ci.ID, 0, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message)) return cp.storage.InsertMessage(ci.ID, 0, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message))
@ -1586,7 +1610,6 @@ func (cp *cwtchPeer) eventHandler() {
conversationInfo, err := cp.FetchConversationInfo(onion) conversationInfo, err := cp.FetchConversationInfo(onion)
log.Debugf("confo info lookup newgetval %v %v %v", onion, conversationInfo, err) log.Debugf("confo info lookup newgetval %v %v %v", onion, conversationInfo, err)
// only accepted contacts can look up information
if conversationInfo != nil && conversationInfo.GetPeerAC().ExchangeAttributes { if conversationInfo != nil && conversationInfo.GetPeerAC().ExchangeAttributes {
// Type Safe Scoped/Zoned Path // Type Safe Scoped/Zoned Path
zscope := attr.IntoScope(scope) zscope := attr.IntoScope(scope)
@ -1659,6 +1682,7 @@ func (cp *cwtchPeer) eventHandler() {
timestamp := time.Now().Format(time.RFC3339Nano) timestamp := time.Now().Format(time.RFC3339Nano)
cp.SetConversationAttribute(cid, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), timestamp) cp.SetConversationAttribute(cid, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), timestamp)
} else if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.DISCONNECTED { } else if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.DISCONNECTED {
ci, err := cp.FetchConversationInfo(handle) ci, err := cp.FetchConversationInfo(handle)
if err == nil { if err == nil {

View File

@ -749,6 +749,16 @@ func (e *engine) handlePeerMessage(hostname string, eventID string, context stri
// Fall through handler for the default text conversation. // Fall through handler for the default text conversation.
e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeerEngine, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)})) e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeerEngine, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)}))
// Don't ack messages in channel 7
// Note: this code explictly doesn't care about malformed messages, we deal with them
// later on...we still want to ack the original send...(as some "malformed" messages
// may be future-ok)
if cm, err := model.DeserializeMessage(string(message)); err == nil {
if cm.IsStream() {
return
}
}
// Send an explicit acknowledgement // Send an explicit acknowledgement
// Every other protocol should have an explicit acknowledgement message e.g. value lookups have responses, and file handling has an explicit flow // Every other protocol should have an explicit acknowledgement message e.g. value lookups have responses, and file handling has an explicit flow
if err := e.sendPeerMessage(hostname, pmodel.PeerMessage{ID: eventID, Context: event.ContextAck, Data: []byte{}}); err != nil { if err := e.sendPeerMessage(hostname, pmodel.PeerMessage{ID: eventID, Context: event.ContextAck, Data: []byte{}}); err != nil {

View File

@ -2,12 +2,14 @@ package connections
import ( import (
"cwtch.im/cwtch/event" "cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
model2 "cwtch.im/cwtch/protocol/model" model2 "cwtch.im/cwtch/protocol/model"
"encoding/json" "encoding/json"
"git.openprivacy.ca/cwtch.im/tapir" "git.openprivacy.ca/cwtch.im/tapir"
"git.openprivacy.ca/cwtch.im/tapir/applications" "git.openprivacy.ca/cwtch.im/tapir/applications"
"git.openprivacy.ca/openprivacy/log" "git.openprivacy.ca/openprivacy/log"
"sync/atomic" "sync/atomic"
"time"
) )
const cwtchCapability = tapir.Capability("cwtchCapability") const cwtchCapability = tapir.Capability("cwtchCapability")
@ -133,6 +135,14 @@ func (pa *PeerApp) listen() {
pa.version.Store(Version2) pa.version.Store(Version2)
} }
} else { } else {
if cm, err := model.DeserializeMessage(string(packet.Data)); err == nil {
if cm.TransitTime != nil {
rt := time.Now().UTC()
cm.RecvTime = &rt
data, _ := json.Marshal(cm)
packet.Data = data
}
}
pa.MessageHandler(pa.connection.Hostname(), packet.ID, packet.Context, packet.Data) pa.MessageHandler(pa.connection.Hostname(), packet.ID, packet.Context, packet.Data)
} }
} }
@ -148,6 +158,15 @@ func (pa *PeerApp) SendMessage(message model2.PeerMessage) error {
var serialized []byte var serialized []byte
var err error var err error
if cm, err := model.DeserializeMessage(string(message.Data)); err == nil {
if cm.SendTime != nil {
tt := time.Now().UTC()
cm.TransitTime = &tt
data, _ := json.Marshal(cm)
message.Data = data
}
}
if pa.version.Load() == Version2 { if pa.version.Load() == Version2 {
// treat data as a pre-serialized string, not as a byte array (which will be base64 encoded and bloat the packet size) // treat data as a pre-serialized string, not as a byte array (which will be base64 encoded and bloat the packet size)
serialized = message.Serialize() serialized = message.Serialize()

View File

@ -58,7 +58,7 @@ func TestFileSharing(t *testing.T) {
os.RemoveAll("cwtch.out.png") os.RemoveAll("cwtch.out.png")
os.RemoveAll("cwtch.out.png.manifest") os.RemoveAll("cwtch.out.png.manifest")
log.SetLevel(log.LevelInfo) log.SetLevel(log.LevelDebug)
log.ExcludeFromPattern("tapir") log.ExcludeFromPattern("tapir")
os.Mkdir("tordir", 0700) os.Mkdir("tordir", 0700)
@ -151,13 +151,6 @@ func TestFileSharing(t *testing.T) {
bob.NewContactConversation(alice.GetOnion(), model.DefaultP2PAccessControl(), true) bob.NewContactConversation(alice.GetOnion(), model.DefaultP2PAccessControl(), true)
alice.NewContactConversation(bob.GetOnion(), model.DefaultP2PAccessControl(), true) alice.NewContactConversation(bob.GetOnion(), model.DefaultP2PAccessControl(), true)
alice.PeerWithOnion(bob.GetOnion())
t.Logf("Waiting for alice and Bob to peer...")
waitForPeerPeerConnection(t, alice, bob)
alice.AcceptConversation(1)
t.Logf("Alice and Bob are Connected!!")
filesharingFunctionality := filesharing.FunctionalityGate() filesharingFunctionality := filesharing.FunctionalityGate()
@ -167,10 +160,10 @@ func TestFileSharing(t *testing.T) {
} }
alice.SendMessage(1, fileSharingMessage) alice.SendMessage(1, fileSharingMessage)
bob.AcceptConversation(1)
// Wait for the messages to arrive... // Ok this is fun...we just Sent a Message we may not have a connection yet...
time.Sleep(time.Second * 10) // so this test will only pass if sending offline works...
waitForPeerPeerConnection(t, bob, alice)
bob.SendMessage(1, "this is a test message") bob.SendMessage(1, "this is a test message")
bob.SendMessage(1, "this is another test message") bob.SendMessage(1, "this is another test message")