Compare commits

..

No commits in common. "master" and "fixups" have entirely different histories.

23 changed files with 120 additions and 475 deletions

View File

@ -5,29 +5,27 @@ name: linux-test
steps:
- name: fetch
image: golang:1.21.5
image: golang:1.19.1
volumes:
- name: deps
path: /go
commands:
- go install honnef.co/go/tools/cmd/staticcheck@latest
- go install go.uber.org/nilaway/cmd/nilaway@latest
- wget https://git.openprivacy.ca/openprivacy/buildfiles/raw/branch/master/tor/tor-0.4.8.9-linux-x86_64.tar.gz -O tor.tar.gz
- tar -xzf tor.tar.gz
- chmod a+x Tor/tor
- export PATH=$PWD/Tor/:$PATH
- export LD_LIBRARY_PATH=$PWD/Tor/
- tor --version
- wget https://git.openprivacy.ca/openprivacy/buildfiles/raw/master/tor/tor
- wget https://git.openprivacy.ca/openprivacy/buildfiles/raw/master/tor/torrc
- chmod a+x tor
- go get -u golang.org/x/lint/golint
- export GO111MODULE=on
- go mod vendor
- name: quality
image: golang:1.21.5
image: golang:1.19.1
volumes:
- name: deps
path: /go
commands:
- ./testing/quality.sh
- staticcheck ./...
- name: units-tests
image: golang:1.21.5
image: golang:1.19.1
volumes:
- name: deps
path: /go
@ -35,32 +33,28 @@ steps:
- export PATH=`pwd`:$PATH
- sh testing/tests.sh
- name: integ-test
image: golang:1.21.5
image: golang:1.19.1
volumes:
- name: deps
path: /go
commands:
- export PATH=$PWD/Tor/:$PATH
- export LD_LIBRARY_PATH=$PWD/Tor/
- tor --version
- export PATH=`pwd`:$PATH
- go test -timeout=30m -race -v cwtch.im/cwtch/testing/
- name: filesharing-integ-test
image: golang:1.21.5
image: golang:1.19.1
volumes:
- name: deps
path: /go
commands:
- export PATH=$PWD/Tor/:$PATH
- export LD_LIBRARY_PATH=$PWD/Tor/
- export PATH=`pwd`:$PATH
- go test -timeout=20m -race -v cwtch.im/cwtch/testing/filesharing
- name: filesharing-autodownload-integ-test
image: golang:1.21.5
image: golang:1.19.1
volumes:
- name: deps
path: /go
commands:
- export PATH=$PWD/Tor/:$PATH
- export LD_LIBRARY_PATH=$PWD/Tor/
- export PATH=`pwd`:$PATH
- go test -timeout=20m -race -v cwtch.im/cwtch/testing/autodownload
- name: notify-gogs
image: openpriv/drone-gogs

4
.gitignore vendored
View File

@ -33,6 +33,4 @@ data-dir-cwtchtool/
tokens
tordir/
testing/autodownload/download_dir
testing/autodownload/storage
*.swp
testing/managerstorage/*
testing/autodownload/storage

View File

@ -363,7 +363,6 @@ func (app *application) LoadProfiles(password string) {
func (app *application) registerHooks(profile peer.CwtchPeer) {
// Register Hooks
profile.RegisterHook(extensions.ProfileValueExtension{})
profile.RegisterHook(extensions.SendWhenOnlineExtension{})
profile.RegisterHook(new(filesharing.Functionality))
profile.RegisterHook(new(filesharing.ImagePreviewsFunctionality))
profile.RegisterHook(new(servers.Functionality))

View File

@ -302,7 +302,8 @@ func (cr *contactRetry) run() {
cr.authorizedPeers.Store(id, true)
if c, ok := cr.connections.Load(id); ok {
contact := c.(*contact)
if contact.state == connections.DISCONNECTED {
if contact.state == connections.DISCONNECTED && !contact.queued {
// prioritize connections made in the last week
if time.Since(contact.lastSeen).Hours() < PriorityQueueTimeSinceQualifierHours {
cr.priorityQueue.insert(contact)
@ -460,10 +461,10 @@ func (cr *contactRetry) addConnection(id string, state connections.ConnectionSta
cr.connections.Store(id, p)
return
} else {
// we have rerequested this connnection, probably via an explicit ask, update it's state
if c, ok := cr.connections.Load(id); ok {
contact := c.(*contact)
contact.state = state
// we have rerequested this connnection. Force set the queued parameter to true.
p, _ := cr.connections.Load(id)
if !p.(*contact).queued {
p.(*contact).queued = true
}
}
}

View File

@ -36,19 +36,19 @@ func TestContactRetryQueue(t *testing.T) {
// progress...
setup := false
for !setup {
if _, exists := cr.connections.Load(testOnion); exists {
if _, exists := cr.authorizedPeers.Load(testOnion); exists {
t.Logf("authorized")
setup = true
if pinf, exists := cr.connections.Load(testOnion); exists {
if pinf.(*contact).queued {
if _, exists := cr.authorizedPeers.Load(testOnion); exists {
t.Logf("authorized")
setup = true
}
}
}
}
// We should very quickly become connecting...
time.Sleep(time.Second)
pinf, _ := cr.connections.Load(testOnion)
if pinf.(*contact).state != 1 {
t.Fatalf("test connection should be in connecting after update, actually: %v", pinf.(*contact).state)
if pinf.(*contact).queued == false {
t.Fatalf("test connection should be queued, actually: %v", pinf.(*contact).queued)
}
// Asset that "test" is authenticated
@ -63,12 +63,19 @@ func TestContactRetryQueue(t *testing.T) {
// Publish an unrelated event to trigger the Plugin to go through a queuing cycle
// If we didn't do this we would have to wait 30 seconds for a check-in
bus.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{event.RemotePeer: "test2", event.ConnectionState: "Disconnected"}))
bus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: testOnion, event.LastSeen: time.Now().Format(time.RFC3339Nano)}))
time.Sleep(time.Second)
if pinf.(*contact).queued != false {
t.Fatalf("test connection should not be queued, actually: %v", pinf.(*contact).queued)
}
// Publish a new peer request...
bus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: testOnion}))
time.Sleep(time.Second) // yield for a second so the event can catch up...
// Peer test should be forced to queue....
pinf, _ = cr.connections.Load(testOnion)
if pinf.(*contact).state != 1 {
t.Fatalf("test connection should be in connecting after update, actually: %v", pinf.(*contact).state)
if pinf.(*contact).queued != true {
t.Fatalf("test connection should be forced to queue after new queue peer request")
}
cr.Shutdown()

View File

@ -95,11 +95,6 @@ func (em *manager) initialize() {
func (em *manager) Subscribe(eventType Type, queue Queue) {
em.mapMutex.Lock()
defer em.mapMutex.Unlock()
for _, sub := range em.subscribers[eventType] {
if sub == queue {
return // don't add the same queue for the same event twice...
}
}
em.subscribers[eventType] = append(em.subscribers[eventType], queue)
}

View File

@ -104,15 +104,6 @@ func (pne ProfileValueExtension) OnContactRequestValue(profile peer.CwtchPeer, c
val, exists = profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
}
// NOTE: Cwtch 1.15+ requires that profiles be able to restrict file downloading to specific contacts. As such we need an ACL check here
// on the fileshareing zone.
// TODO: Split this functionality into FilesharingFunctionality, and restrict this function to only considering Profile zoned attributes?
if zone == attr.FilesharingZone {
if !conversation.GetPeerAC().ShareFiles {
return
}
}
// Construct a Response
resp := event.NewEvent(event.SendRetValMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversation.ID), event.RemotePeer: conversation.Handle, event.Exists: strconv.FormatBool(exists)})
resp.EventID = eventID

View File

@ -1,66 +0,0 @@
package extensions
import (
"strconv"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/settings"
"git.openprivacy.ca/openprivacy/log"
)
// SendWhenOnlineExtension implements automatic sending
// Some Considerations:
// - There are race conditions inherant in this approach e.g. a peer could go offline just after recieving a message and never sending an ack
// - In that case the next time we connect we will send a duplicate message.
// - Currently we do not include metadata like sent time in raw peer protocols (however Overlay does now have support for that information)
type SendWhenOnlineExtension struct {
}
func (soe SendWhenOnlineExtension) NotifySettingsUpdate(_ settings.GlobalSettings) {
}
func (soe SendWhenOnlineExtension) EventsToRegister() []event.Type {
return []event.Type{event.PeerStateChange}
}
func (soe SendWhenOnlineExtension) ExperimentsToRegister() []string {
return nil
}
func (soe SendWhenOnlineExtension) OnEvent(ev event.Event, profile peer.CwtchPeer) {
switch ev.EventType {
case event.PeerStateChange:
ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"])
if err == nil {
// if we have re-authenticated with thie peer then request their profile image...
if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED {
// Check the last 100 messages, if any of them are pending, then send them now...
messsages, _ := profile.GetMostRecentMessages(ci.ID, 0, 0, uint(100))
for _, message := range messsages {
if message.Attr[constants.AttrAck] == constants.False {
body := message.Body
ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: ci.Handle, event.Data: body})
ev.EventID = message.Signature // we need this ensure that we correctly ack this in the db when it comes back
// TODO: The EventBus is becoming very noisy...we may want to consider a one-way shortcut to Engine i.e. profile.Engine.SendMessageToPeer
log.Debugf("resending message that was sent when peer was offline")
profile.PublishEvent(ev)
}
}
}
}
}
}
// OnContactReceiveValue is nop for SendWhenOnnlineExtension
func (soe SendWhenOnlineExtension) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, szp attr.ScopedZonedPath, value string, exists bool) {
}
// OnContactRequestValue is nop for SendWhenOnnlineExtension
func (soe SendWhenOnlineExtension) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, szp attr.ScopedZonedPath) {
}

View File

@ -272,21 +272,15 @@ func (f *Functionality) DownloadFile(profile peer.CwtchPeer, conversationID int,
}
// Don't download files if the download file directory does not exist
// Unless we are on Android where the kernel wishes to keep us ignorant of the
// actual path and/or existence of the file. We handle this case further down
// the line when the manifest is received and protocol engine and the Android layer
// negotiate a temporary local file -> final file copy. We don't want to worry
// about that here...
if runtime.GOOS != "android" {
if _, err := os.Stat(path.Dir(downloadFilePath)); os.IsNotExist(err) {
return errors.New("download directory does not exist")
}
// Don't download files if the manifest file directory does not exist
if _, err := os.Stat(path.Dir(manifestFilePath)); os.IsNotExist(err) {
return errors.New("manifest directory does not exist")
}
if _, err := os.Stat(path.Dir(downloadFilePath)); os.IsNotExist(err) {
return errors.New("download directory does not exist")
}
// Don't download files if the manifest file directory does not exist
if _, err := os.Stat(path.Dir(manifestFilePath)); os.IsNotExist(err) {
return errors.New("manifest directory does not exist")
}
// Store local.filesharing.filekey.manifest as the location of the manifest
profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", key), manifestFilePath)

View File

@ -38,14 +38,14 @@ func (i *ImagePreviewsFunctionality) OnEvent(ev event.Event, profile peer.CwtchP
case event.NewMessageFromPeer:
ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"])
if err == nil {
if ci.GetPeerAC().RenderImages {
if ci.Accepted {
i.handleImagePreviews(profile, &ev, ci.ID, ci.ID)
}
}
case event.NewMessageFromGroup:
ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"])
if err == nil {
if ci.GetPeerAC().RenderImages {
if ci.Accepted {
i.handleImagePreviews(profile, &ev, ci.ID, ci.ID)
}
}
@ -62,15 +62,7 @@ func (i *ImagePreviewsFunctionality) OnEvent(ev event.Event, profile peer.CwtchP
if err == nil {
for _, ci := range conversations {
if profile.GetPeerState(ci.Handle) == connections.AUTHENTICATED {
// if we have enabled file shares for this contact, then send them our profile image
// NOTE: In the past, Cwtch treated "profile image" as a public file share. As such, anyone with the file key and who is able
// to authenticate with the profile (i.e. non-blocked peers) can download the file (if the global profile images experiment is enabled)
// To better allow for fine-grained permissions (and to support hybrid group permissions), we want to enable per-conversation file
// sharing permissions. As such, profile images are now only shared with contacts with that permission enabled.
// (i.e. all previous accepted contacts, new accepted contacts, and contacts who have this toggle set explictly)
if ci.GetPeerAC().ShareFiles {
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey)
}
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey)
}
}
}
@ -98,7 +90,7 @@ func (i *ImagePreviewsFunctionality) OnContactReceiveValue(profile peer.CwtchPee
_, zone, path := path.GetScopeZonePath()
if exists && zone == attr.ProfileZone && path == constants.CustomProfileImageKey {
// We only download from accepted conversations
if conversation.GetPeerAC().RenderImages {
if conversation.Accepted {
fileKey := value
basepath := i.downloadFolder
fsf := FunctionalityGate()
@ -131,16 +123,6 @@ func (i *ImagePreviewsFunctionality) OnContactReceiveValue(profile peer.CwtchPee
// handleImagePreviews checks settings and, if appropriate, auto-downloads any images
func (i *ImagePreviewsFunctionality) handleImagePreviews(profile peer.CwtchPeer, ev *event.Event, conversationID, senderID int) {
if profile.IsFeatureEnabled(constants.FileSharingExperiment) && profile.IsFeatureEnabled(constants.ImagePreviewsExperiment) {
ci, err := profile.GetConversationInfo(senderID)
if err != nil {
log.Errorf("attempted to call handleImagePreviews with unknown conversation: %v", senderID)
return
}
if !ci.GetPeerAC().ShareFiles || !ci.GetPeerAC().RenderImages {
log.Infof("refusing to autodownload files from sender: %v. conversation AC does not permit image rendering", senderID)
return
}
// Short-circuit failures
// Don't auto-download images if the download path does not exist.
@ -160,7 +142,7 @@ func (i *ImagePreviewsFunctionality) handleImagePreviews(profile peer.CwtchPeer,
// Now look at the image preview experiment
var cm model.MessageWrapper
err = json.Unmarshal([]byte(ev.Data[event.Data]), &cm)
err := json.Unmarshal([]byte(ev.Data[event.Data]), &cm)
if err == nil && cm.Overlay == model.OverlayFileSharing {
log.Debugf("Received File Sharing Message")
var fm OverlayMessage

View File

@ -67,8 +67,3 @@ const ProfileAttribute3 = "profile-attribute-3"
// Description is used on server contacts,
const Description = "description"
// Used to store the status of acl migrations
const ACLVersion = "acl-version"
const ACLVersionOne = "acl-v1"
const ACLVersionTwo = "acl-v2"

View File

@ -1,36 +1,22 @@
package model
import (
"encoding/json"
"time"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
"encoding/json"
"time"
)
// AccessControl is a type determining client assigned authorization to a peer
// for a given conversation
type AccessControl struct {
Blocked bool // Any attempts from this handle to connect are blocked overrides all other settings
// Basic Conversation Rights
Read bool // Allows a handle to access the conversation
Append bool // Allows a handle to append new messages to the conversation
AutoConnect bool // Profile should automatically try to connect with peer
ExchangeAttributes bool // Profile should automatically exchange attributes like Name, Profile Image, etc.
// Extension Related Permissions
ShareFiles bool // Allows a handle to share files to a conversation
RenderImages bool // Indicates that certain filetypes should be autodownloaded and rendered when shared by this contact
Blocked bool // Any attempts from this handle to connect are blocked
Read bool // Allows a handle to access the conversation
Append bool // Allows a handle to append new messages to the conversation
}
// DefaultP2PAccessControl defaults to a semi-trusted peer with no access to special extensions.
// DefaultP2PAccessControl - because in the year 2021, go does not support constant structs...
func DefaultP2PAccessControl() AccessControl {
return AccessControl{Read: true, Append: true, ExchangeAttributes: true, Blocked: false,
AutoConnect: true, ShareFiles: false, RenderImages: false}
return AccessControl{Read: true, Append: true, Blocked: false}
}
// AccessControlList represents an access control list for a conversation. Mapping handles to conversation
@ -44,10 +30,10 @@ func (acl *AccessControlList) Serialize() []byte {
}
// DeserializeAccessControlList takes in JSON and returns an AccessControlList
func DeserializeAccessControlList(data []byte) (AccessControlList, error) {
func DeserializeAccessControlList(data []byte) AccessControlList {
var acl AccessControlList
err := json.Unmarshal(data, &acl)
return acl, err
json.Unmarshal(data, &acl)
return acl
}
// Attributes a type-driven encapsulation of an Attribute map.
@ -61,12 +47,8 @@ func (a *Attributes) Serialize() []byte {
// DeserializeAttributes converts a JSON struct into an Attributes map
func DeserializeAttributes(data []byte) Attributes {
attributes := make(Attributes)
err := json.Unmarshal(data, &attributes)
if err != nil {
log.Error("error deserializing attributes (this is likely a programming error): %v", err)
return make(Attributes)
}
var attributes Attributes
json.Unmarshal(data, &attributes)
return attributes
}
@ -78,9 +60,7 @@ type Conversation struct {
Handle string
Attributes Attributes
ACL AccessControlList
// Deprecated, please use ACL for permissions related functions
Accepted bool
Accepted bool
}
// GetAttribute is a helper function that fetches a conversation attribute by scope, zone and key
@ -91,21 +71,6 @@ func (ci *Conversation) GetAttribute(scope attr.Scope, zone attr.Zone, key strin
return "", false
}
// GetPeerAC returns a suitable Access Control object for a the given peer conversation
// If this is called for a group conversation, this method will error and return a safe default AC.
func (ci *Conversation) GetPeerAC() AccessControl {
if acl, exists := ci.ACL[ci.Handle]; exists {
return acl
}
log.Errorf("attempted to access a Peer Access Control object from %v but peer ACL is undefined. This is likely a programming error", ci.Handle)
return DefaultP2PAccessControl()
}
// IsCwtchPeer is a helper attribute that identifies whether a conversation is a cwtch peer
func (ci *Conversation) IsCwtchPeer() bool {
return tor.IsValidHostname(ci.Handle)
}
// IsGroup is a helper attribute that identifies whether a conversation is a legacy group
func (ci *Conversation) IsGroup() bool {
if _, exists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)).ToString()]; exists {

View File

@ -3,7 +3,6 @@ package model
import (
"crypto/sha256"
"encoding/base64"
"encoding/json"
)
// CalculateContentHash derives a hash using the author and the message body. It is intended to be
@ -13,13 +12,3 @@ func CalculateContentHash(author string, messageBody string) string {
contentBasedHash := sha256.Sum256(content)
return base64.StdEncoding.EncodeToString(contentBasedHash[:])
}
func DeserializeMessage(message string) (*MessageWrapper, error) {
var cm MessageWrapper
err := json.Unmarshal([]byte(message), &cm)
if err != nil {
return nil, err
}
return &cm, err
}

View File

@ -1,40 +1,9 @@
package model
import (
"time"
)
// MessageWrapper is the canonical Cwtch overlay wrapper
type MessageWrapper struct {
Overlay int `json:"o"`
Data string `json:"d"`
// when the data was assembled
SendTime *time.Time `json:"s,omitempty"`
// when the data was transmitted (by protocol engine e.g. over Tor)
TransitTime *time.Time `json:"t,omitempty"`
// when the data was received
RecvTime *time.Time `json:"r,omitempty"`
}
// Channel is defined as being the last 3 bits of the overlay id
// Channel 0 is reserved for the main conversation
// Channel 2 is reserved for conversation admin (managed groups)
// Channel 7 is reserved for streams (no ack, no store)
func (mw MessageWrapper) Channel() int {
if mw.Overlay > 1024 {
return mw.Overlay & 0x07
}
// for backward compatibilty all overlays less than 0x400 i.e. 1024 are
// mapped to channel 0 regardless of their channel status.
return 0
}
// If Overlay is a Stream Message it should not be ackd, or stored.
func (mw MessageWrapper) IsStream() bool {
return mw.Channel() == 0x07
}
// OverlayChat is the canonical identifier for chat overlays

View File

@ -3,20 +3,11 @@ package peer
import (
"context"
"crypto/rand"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"cwtch.im/cwtch/settings"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
"os"
path "path/filepath"
"sort"
@ -25,7 +16,17 @@ import (
"sync"
"time"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"cwtch.im/cwtch/settings"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/openprivacy/log"
@ -93,7 +94,7 @@ func (cp *cwtchPeer) EnhancedImportBundle(importString string) string {
return cp.ImportBundle(importString).Error()
}
func (cp *cwtchPeer) EnhancedGetMessages(conversation int, index int, count uint) string {
func (cp *cwtchPeer) EnhancedGetMessages(conversation int, index int, count int) string {
var emessages = make([]EnhancedMessage, count)
messages, err := cp.GetMostRecentMessages(conversation, 0, index, count)
@ -309,33 +310,7 @@ func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Mana
authorizations := make(map[string]model.Authorization)
for _, conversation := range conversations {
// Only perform the following actions for Peer-type Conversaions...
if conversation.IsCwtchPeer() {
// if this profile does not have an ACL version, and the profile is accepted (OR the acl version is v1 and the profile is accepted...)
// then migrate the permissions to the v2 ACL
// migrate the old accepted AC to a new fine-grained one
// we only do this for previously trusted connections
// NOTE: this does not supercede global cwthch experiments settings
// if share files is turned off globally then acl.ShareFiles will be ignored
// Note: There was a bug in the original EP code that meant that some acl-v1 profiles did not get ShareFiles or RenderImages - this corrects that.
if version, exists := conversation.GetAttribute(attr.LocalScope, attr.ProfileZone, constants.ACLVersion); !exists || version == constants.ACLVersionOne {
if conversation.Accepted {
if ac, exists := conversation.ACL[conversation.Handle]; exists {
ac.ShareFiles = true
ac.RenderImages = true
ac.AutoConnect = true
ac.ExchangeAttributes = true
conversation.ACL[conversation.Handle] = ac
}
// Update the ACL Version
cp.storage.SetConversationAttribute(conversation.ID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.ACLVersion)), constants.ACLVersionTwo)
// Store the updated ACL
cp.storage.SetConversationACL(conversation.ID, conversation.ACL)
}
}
if tor.IsValidHostname(conversation.Handle) {
if conversation.ACL[conversation.Handle].Blocked {
authorizations[conversation.Handle] = model.AuthBlocked
} else {
@ -435,19 +410,12 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error)
if tor.IsValidHostname(conversationInfo.Handle) {
ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.RemotePeer: conversationInfo.Handle, event.Data: message})
onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
id := -1
// check if we should store this message locally...
if cm, err := model.DeserializeMessage(message); err == nil {
if !cm.IsStream() {
// For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks
id, err = cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message))
if err != nil {
return -1, err
}
}
// For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks
id, err := cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message))
if err != nil {
return -1, err
}
cp.eventBus.Publish(ev)
return id, nil
} else {
@ -714,60 +682,13 @@ func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessContr
conversationInfo, _ := cp.storage.GetConversationByHandle(handle)
if conversationInfo == nil {
conversationID, err := cp.storage.NewConversation(handle, model.Attributes{event.SaveHistoryKey: event.DeleteHistoryDefault}, model.AccessControlList{handle: acl}, accepted)
if err != nil {
log.Errorf("unable to create a new contact conversation: %v", err)
return -1, err
}
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), time.Now().Format(time.RFC3339Nano))
if accepted {
// If this call came from a trusted action (i.e. import bundle or accept button then accept the conversation)
// This assigns all permissions (and in v2 is currently the default state of trusted contacts)
// Accept conversation does PeerWithOnion
cp.AcceptConversation(conversationID)
}
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.ACLVersion)), constants.ACLVersionTwo)
cp.eventBus.Publish(event.NewEvent(event.ContactCreated, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.RemotePeer: handle}))
return conversationID, err
}
return -1, fmt.Errorf("contact conversation already exists")
}
// UpdateConversationAccessControlList is a genric ACL update method
func (cp *cwtchPeer) UpdateConversationAccessControlList(id int, acl model.AccessControlList) error {
return cp.storage.SetConversationACL(id, acl)
}
// EnhancedUpdateConversationAccessControlList wraps UpdateConversationAccessControlList and allows updating via a serialized JSON struct
func (cp *cwtchPeer) EnhancedUpdateConversationAccessControlList(id int, json string) error {
_, err := cp.GetConversationInfo(id)
if err == nil {
acl, err := model.DeserializeAccessControlList([]byte(json))
if err == nil {
return cp.UpdateConversationAccessControlList(id, acl)
}
return err
}
return err
}
// GetConversationAccessControlList returns the access control list associated with the conversation
func (cp *cwtchPeer) GetConversationAccessControlList(id int) (model.AccessControlList, error) {
ci, err := cp.GetConversationInfo(id)
if err == nil {
return ci.ACL, nil
}
return nil, err
}
// EnhancedGetConversationAccessControlList serialzies the access control list associated with the conversation
func (cp *cwtchPeer) EnhancedGetConversationAccessControlList(id int) (string, error) {
ci, err := cp.GetConversationInfo(id)
if err == nil {
return string(ci.ACL.Serialize()), nil
}
return "", err
}
// AcceptConversation looks up a conversation by `handle` and sets the Accepted status to `true`
// This will cause Cwtch to auto connect to this conversation on start up
func (cp *cwtchPeer) AcceptConversation(id int) error {
@ -780,21 +701,6 @@ func (cp *cwtchPeer) AcceptConversation(id int) error {
log.Errorf("Could not get conversation for %v: %v", id, err)
return err
}
if ac, exists := ci.ACL[ci.Handle]; exists {
ac.ShareFiles = true
ac.AutoConnect = true
ac.RenderImages = true
ac.ExchangeAttributes = true
ci.ACL[ci.Handle] = ac
}
err = cp.storage.SetConversationACL(id, ci.ACL)
if err != nil {
log.Errorf("Could not set conversation acl for %v: %v", id, err)
return err
}
if !ci.IsGroup() && !ci.IsServer() {
cp.sendUpdateAuth(id, ci.Handle, ci.Accepted, ci.ACL[ci.Handle].Blocked)
cp.PeerWithOnion(ci.Handle)
@ -842,7 +748,7 @@ func (cp *cwtchPeer) UnblockConversation(id int) error {
// TODO at some point in the future engine needs to understand ACLs not just legacy auth status
cp.sendUpdateAuth(id, ci.Handle, ci.Accepted, ci.ACL[ci.Handle].Blocked)
if !ci.IsGroup() && !ci.IsServer() && ci.GetPeerAC().AutoConnect {
if !ci.IsGroup() && !ci.IsServer() && ci.Accepted {
cp.PeerWithOnion(ci.Handle)
}
@ -982,7 +888,7 @@ func (cp *cwtchPeer) GetChannelMessageCount(conversation int, channel int) (int,
}
// GetMostRecentMessages returns a selection of messages, ordered by most recently inserted
func (cp *cwtchPeer) GetMostRecentMessages(conversation int, channel int, offset int, limit uint) ([]model.ConversationMessage, error) {
func (cp *cwtchPeer) GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) {
return cp.storage.GetMostRecentMessages(conversation, channel, offset, limit)
}
@ -1150,7 +1056,7 @@ func (cp *cwtchPeer) QueuePeeringWithOnion(handle string) {
ci, err := cp.FetchConversationInfo(handle)
if err == nil {
lastSeen := cp.GetConversationLastSeenTime(ci.ID)
if !ci.ACL[ci.Handle].Blocked {
if !ci.ACL[ci.Handle].Blocked && ci.Accepted {
cp.eventBus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: handle, event.LastSeen: lastSeen.Format(time.RFC3339Nano)}))
}
}
@ -1268,9 +1174,9 @@ func (cp *cwtchPeer) ImportBundle(importString string) error {
return ConstructResponse(constants.ImportBundlePrefix, "success")
} else if tor.IsValidHostname(importString) {
_, err := cp.NewContactConversation(importString, model.DefaultP2PAccessControl(), true)
// NOTE: Not NewContactConversation implictly does AcceptConversation AND PeerWithOnion if relevant so
// we no longer need to do it here...
if err == nil {
// Assuming all is good, we should peer with this contact.
cp.PeerWithOnion(importString)
return ConstructResponse(constants.ImportBundlePrefix, "success")
}
return ConstructResponse(constants.ImportBundlePrefix, err.Error())
@ -1410,7 +1316,7 @@ func (cp *cwtchPeer) getConnectionsSortedByLastSeen(doPeers, doServers bool) []*
continue
}
} else {
if !doPeers {
if !doPeers || !conversation.Accepted {
continue
}
}
@ -1433,9 +1339,7 @@ func (cp *cwtchPeer) StartConnections(doPeers, doServers bool) {
cp.QueueJoinServer(conversation.model.Handle)
} else {
log.Debugf(" QueuePeerWithOnion(%v)", conversation.model.Handle)
if conversation.model.GetPeerAC().AutoConnect {
cp.QueuePeeringWithOnion(conversation.model.Handle)
}
cp.QueuePeeringWithOnion(conversation.model.Handle)
}
time.Sleep(50 * time.Millisecond)
}
@ -1499,13 +1403,6 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time)
}
}
// Don't store messages in channel 7
if cm, err := model.DeserializeMessage(message); err == nil {
if cm.IsStream() {
return -1, nil
}
}
// Generate a random number and use it as the signature
signature := event.GetRandNumber().String()
return cp.storage.InsertMessage(ci.ID, 0, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message))
@ -1610,7 +1507,8 @@ func (cp *cwtchPeer) eventHandler() {
conversationInfo, err := cp.FetchConversationInfo(onion)
log.Debugf("confo info lookup newgetval %v %v %v", onion, conversationInfo, err)
if conversationInfo != nil && conversationInfo.GetPeerAC().ExchangeAttributes {
// only accepted contacts can look up information
if conversationInfo != nil && conversationInfo.Accepted {
// Type Safe Scoped/Zoned Path
zscope := attr.IntoScope(scope)
zone, zpath := attr.ParseZone(zpath)
@ -1642,7 +1540,7 @@ func (cp *cwtchPeer) eventHandler() {
conversationInfo, _ := cp.FetchConversationInfo(handle)
// only accepted contacts can look up information
if conversationInfo != nil && conversationInfo.GetPeerAC().ExchangeAttributes {
if conversationInfo != nil && conversationInfo.Accepted {
// Type Safe Scoped/Zoned Path
zscope := attr.IntoScope(scope)
zone, zpath := attr.ParseZone(zpath)
@ -1682,7 +1580,6 @@ func (cp *cwtchPeer) eventHandler() {
timestamp := time.Now().Format(time.RFC3339Nano)
cp.SetConversationAttribute(cid, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), timestamp)
} else if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.DISCONNECTED {
ci, err := cp.FetchConversationInfo(handle)
if err == nil {

View File

@ -376,12 +376,7 @@ func (cps *CwtchProfileStorage) GetConversationByHandle(handle string) (*model.C
}
rows.Close()
cacl, err := model.DeserializeAccessControlList(acl)
if err != nil {
log.Errorf("error deserializing ACL from database, database maybe corrupted: %v", err)
return nil, err
}
return &model.Conversation{ID: id, Handle: handle, ACL: cacl, Attributes: model.DeserializeAttributes(attributes), Accepted: accepted}, nil
return &model.Conversation{ID: id, Handle: handle, ACL: model.DeserializeAccessControlList(acl), Attributes: model.DeserializeAttributes(attributes), Accepted: accepted}, nil
}
// FetchConversations returns *all* active conversations. This method should only be called
@ -417,13 +412,7 @@ func (cps *CwtchProfileStorage) FetchConversations() ([]*model.Conversation, err
rows.Close()
return nil, err
}
cacl, err := model.DeserializeAccessControlList(acl)
if err != nil {
log.Errorf("error deserializing ACL from database, database maybe corrupted: %v", err)
return nil, err
}
conversations = append(conversations, &model.Conversation{ID: id, Handle: handle, ACL: cacl, Attributes: model.DeserializeAttributes(attributes), Accepted: accepted})
conversations = append(conversations, &model.Conversation{ID: id, Handle: handle, ACL: model.DeserializeAccessControlList(acl), Attributes: model.DeserializeAttributes(attributes), Accepted: accepted})
}
}
@ -456,12 +445,7 @@ func (cps *CwtchProfileStorage) GetConversation(id int) (*model.Conversation, er
}
rows.Close()
cacl, err := model.DeserializeAccessControlList(acl)
if err != nil {
log.Errorf("error deserializing ACL from database, database maybe corrupted: %v", err)
return nil, err
}
return &model.Conversation{ID: id, Handle: handle, ACL: cacl, Attributes: model.DeserializeAttributes(attributes), Accepted: accepted}, nil
return &model.Conversation{ID: id, Handle: handle, ACL: model.DeserializeAccessControlList(acl), Attributes: model.DeserializeAttributes(attributes), Accepted: accepted}, nil
}
// AcceptConversation sets the accepted status of a conversation to true in the backing datastore
@ -797,7 +781,7 @@ func (cps *CwtchProfileStorage) SearchMessages(conversation int, channel int, pa
}
// GetMostRecentMessages returns the most recent messages in a channel up to a given limit at a given offset
func (cps *CwtchProfileStorage) GetMostRecentMessages(conversation int, channel int, offset int, limit uint) ([]model.ConversationMessage, error) {
func (cps *CwtchProfileStorage) GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) {
channelID := ChannelID{Conversation: conversation, Channel: channel}
cps.mutex.Lock()

View File

@ -120,19 +120,9 @@ type CwtchPeer interface {
ArchiveConversation(conversation int)
GetConversationInfo(conversation int) (*model.Conversation, error)
FetchConversationInfo(handle string) (*model.Conversation, error)
// API-level management of conversation access control
UpdateConversationAccessControlList(id int, acl model.AccessControlList) error
EnhancedUpdateConversationAccessControlList(conversation int, acjson string) error
GetConversationAccessControlList(conversation int) (model.AccessControlList, error)
EnhancedGetConversationAccessControlList(conversation int) (string, error)
// Convieniance Functions for ACL Management
AcceptConversation(conversation int) error
BlockConversation(conversation int) error
UnblockConversation(conversation int) error
SetConversationAttribute(conversation int, path attr.ScopedZonedPath, value string) error
GetConversationAttribute(conversation int, path attr.ScopedZonedPath) (string, error)
DeleteConversation(conversation int) error
@ -141,7 +131,7 @@ type CwtchPeer interface {
GetChannelMessage(conversation int, channel int, id int) (string, model.Attributes, error)
GetChannelMessageCount(conversation int, channel int) (int, error)
GetChannelMessageByContentHash(conversation int, channel int, contenthash string) (int, error)
GetMostRecentMessages(conversation int, channel int, offset int, limit uint) ([]model.ConversationMessage, error)
GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error)
UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error
SearchConversations(pattern string) string
@ -152,7 +142,7 @@ type CwtchPeer interface {
EnhancedGetMessageByContentHash(conversation int, hash string) string
// EnhancedGetMessages returns a set of json-encoded enhanced messages, suitable for rendering in a UI
EnhancedGetMessages(conversation int, index int, count uint) string
EnhancedGetMessages(conversation int, index int, count int) string
// Server Token APIS
// TODO move these to feature protected interfaces

View File

@ -749,16 +749,6 @@ func (e *engine) handlePeerMessage(hostname string, eventID string, context stri
// Fall through handler for the default text conversation.
e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeerEngine, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)}))
// Don't ack messages in channel 7
// Note: this code explictly doesn't care about malformed messages, we deal with them
// later on...we still want to ack the original send...(as some "malformed" messages
// may be future-ok)
if cm, err := model.DeserializeMessage(string(message)); err == nil {
if cm.IsStream() {
return
}
}
// Send an explicit acknowledgement
// Every other protocol should have an explicit acknowledgement message e.g. value lookups have responses, and file handling has an explicit flow
if err := e.sendPeerMessage(hostname, pmodel.PeerMessage{ID: eventID, Context: event.ContextAck, Data: []byte{}}); err != nil {

View File

@ -2,14 +2,12 @@ package connections
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
model2 "cwtch.im/cwtch/protocol/model"
"encoding/json"
"git.openprivacy.ca/cwtch.im/tapir"
"git.openprivacy.ca/cwtch.im/tapir/applications"
"git.openprivacy.ca/openprivacy/log"
"sync/atomic"
"time"
)
const cwtchCapability = tapir.Capability("cwtchCapability")
@ -135,14 +133,6 @@ func (pa *PeerApp) listen() {
pa.version.Store(Version2)
}
} else {
if cm, err := model.DeserializeMessage(string(packet.Data)); err == nil {
if cm.TransitTime != nil {
rt := time.Now().UTC()
cm.RecvTime = &rt
data, _ := json.Marshal(cm)
packet.Data = data
}
}
pa.MessageHandler(pa.connection.Hostname(), packet.ID, packet.Context, packet.Data)
}
}
@ -158,15 +148,6 @@ func (pa *PeerApp) SendMessage(message model2.PeerMessage) error {
var serialized []byte
var err error
if cm, err := model.DeserializeMessage(string(message.Data)); err == nil {
if cm.SendTime != nil {
tt := time.Now().UTC()
cm.TransitTime = &tt
data, _ := json.Marshal(cm)
message.Data = data
}
}
if pa.version.Load() == Version2 {
// treat data as a pre-serialized string, not as a byte array (which will be base64 encoded and bloat the packet size)
serialized = message.Serialize()

View File

@ -35,7 +35,6 @@ type GlobalSettings struct {
Locale string
Theme string
ThemeMode string
ThemeImages bool
PreviousPid int64
ExperimentsEnabled bool
Experiments map[string]bool
@ -63,9 +62,7 @@ type GlobalSettings struct {
var DefaultGlobalSettings = GlobalSettings{
Locale: "en",
Theme: "cwtch",
ThemeMode: "dark",
ThemeImages: false,
Theme: "dark",
PreviousPid: -1,
ExperimentsEnabled: false,
Experiments: map[string]bool{constants.MessageFormattingExperiment: true},

View File

@ -145,23 +145,10 @@ func TestFileSharing(t *testing.T) {
alice.NewContactConversation(bob.GetOnion(), model.DefaultP2PAccessControl(), true)
alice.PeerWithOnion(bob.GetOnion())
json, err := alice.EnhancedGetConversationAccessControlList(1)
if err != nil {
t.Fatalf("Error!: %v", err)
}
t.Logf("alice<->bob ACL: %s", json)
t.Logf("Waiting for alice and Bob to peer...")
waitForPeerPeerConnection(t, alice, bob)
err = alice.AcceptConversation(1)
if err != nil {
t.Fatalf("Error!: %v", err)
}
err = bob.AcceptConversation(1)
if err != nil {
t.Fatalf("Error!: %v", err)
}
alice.AcceptConversation(1)
bob.AcceptConversation(1)
t.Logf("Alice and Bob are Connected!!")
filesharingFunctionality := filesharing.FunctionalityGate()

View File

@ -58,7 +58,7 @@ func TestFileSharing(t *testing.T) {
os.RemoveAll("cwtch.out.png")
os.RemoveAll("cwtch.out.png.manifest")
log.SetLevel(log.LevelDebug)
log.SetLevel(log.LevelInfo)
log.ExcludeFromPattern("tapir")
os.Mkdir("tordir", 0700)
@ -151,6 +151,13 @@ func TestFileSharing(t *testing.T) {
bob.NewContactConversation(alice.GetOnion(), model.DefaultP2PAccessControl(), true)
alice.NewContactConversation(bob.GetOnion(), model.DefaultP2PAccessControl(), true)
alice.PeerWithOnion(bob.GetOnion())
t.Logf("Waiting for alice and Bob to peer...")
waitForPeerPeerConnection(t, alice, bob)
alice.AcceptConversation(1)
t.Logf("Alice and Bob are Connected!!")
filesharingFunctionality := filesharing.FunctionalityGate()
@ -160,10 +167,10 @@ func TestFileSharing(t *testing.T) {
}
alice.SendMessage(1, fileSharingMessage)
bob.AcceptConversation(1)
// Ok this is fun...we just Sent a Message we may not have a connection yet...
// so this test will only pass if sending offline works...
waitForPeerPeerConnection(t, bob, alice)
// Wait for the messages to arrive...
time.Sleep(time.Second * 10)
bob.SendMessage(1, "this is a test message")
bob.SendMessage(1, "this is another test message")

View File

@ -4,25 +4,24 @@ echo "Checking code quality (you want to see no output here)"
echo ""
echo ""
echo "Running staticcheck..."
echo "Linting:"
staticcheck ./...
# In the future we should remove include-pkgs. However, there are a few false positives in the overall go stdlib that make this
# In the future we should enable this by default. However, there are a few false positives that make this
# too noisy right now, specifically assigning nil to initialize slices (safe), and using go internal context channels assigned
# nil (also safe).
# We also have one file infinite_channel.go written in a way that static analysis cannot reason about easily. So it is explictly
# ignored.
echo "Running nilaway..."
nilaway -include-pkgs="cwtch.im/cwtch,cwtch.im/tapir,git.openprivacy.ca/openprivacy/connectivity" -exclude-file-docstrings="nolint:nilaway" ./...
# nilaway -exclude-file-docstrings="nolint:nilaway" ./...
echo "Time to format"
gofmt -l -s -w .
# ineffassign (https://github.com/gordonklaus/ineffassign)
# echo "Checking for ineffectual assignment of errors (unchecked errors...)"
# ineffassign .
echo "Checking for ineffectual assignment of errors (unchecked errors...)"
ineffassign ./..
# misspell (https://github.com/client9/misspell/cmd/misspell)
# echo "Checking for misspelled words..."
# misspell . | grep -v "testing/" | grep -v "vendor/" | grep -v "go.sum" | grep -v ".idea"
echo "Checking for misspelled words..."
misspell . | grep -v "testing/" | grep -v "vendor/" | grep -v "go.sum" | grep -v ".idea"