Port Autodownload / Image Previews / Profile Image Experiment to Cwtch
continuous-integration/drone/pr Build is failing Details

This commit is contained in:
Sarah Jamie Lewis 2023-03-06 13:06:15 -08:00
parent 7bb75e4365
commit d50f210e35
15 changed files with 408 additions and 26 deletions

View File

@ -48,6 +48,14 @@ steps:
commands:
- export PATH=`pwd`:$PATH
- go test -timeout=20m -race -v cwtch.im/cwtch/testing/filesharing
- name: filesharing-autodownload-integ-test
image: golang:1.19.1
volumes:
- name: deps
path: /go
commands:
- export PATH=`pwd`:$PATH
- go test -timeout=20m -race -v cwtch.im/cwtch/testing/autodownload
- name: notify-gogs
image: openpriv/drone-gogs
pull: if-not-exists

View File

@ -10,6 +10,7 @@ import (
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/settings"
"cwtch.im/cwtch/storage"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/log"
@ -32,7 +33,7 @@ type application struct {
appBus event.Manager
appmutex sync.Mutex
settings *GlobalSettingsFile
settings *settings.GlobalSettingsFile
}
func (app *application) IsFeatureEnabled(experiment string) bool {
@ -64,8 +65,8 @@ type Application interface {
ActivatePeerEngine(onion string)
DeactivatePeerEngine(onion string)
ReadSettings() GlobalSettings
UpdateSettings(settings GlobalSettings)
ReadSettings() settings.GlobalSettings
UpdateSettings(settings settings.GlobalSettings)
IsFeatureEnabled(experiment string) bool
ShutdownPeer(string)
@ -78,14 +79,14 @@ type Application interface {
// LoadProfileFn is the function signature for a function in an app that loads a profile
type LoadProfileFn func(profile peer.CwtchPeer)
func LoadAppSettings(appDirectory string) *GlobalSettingsFile {
func LoadAppSettings(appDirectory string) *settings.GlobalSettingsFile {
log.Debugf("NewApp(%v)\n", appDirectory)
os.MkdirAll(path.Join(appDirectory, "profiles"), 0700)
// Note: we basically presume this doesn't fail. If the file doesn't exist we create it, and as such the
// only plausible error conditions are related to file create e.g. low disk space. If that is the case then
// many other parts of Cwtch are likely to fail also.
settings, err := InitGlobalSettingsFile(appDirectory, DefactoPasswordForUnencryptedProfiles)
settings, err := settings.InitGlobalSettingsFile(appDirectory, DefactoPasswordForUnencryptedProfiles)
if err != nil {
log.Errorf("error initializing global settings file %. Global settings might not be loaded or saves", err)
}
@ -93,7 +94,7 @@ func LoadAppSettings(appDirectory string) *GlobalSettingsFile {
}
// NewApp creates a new app with some environment awareness and initializes a Tor Manager
func NewApp(acn connectivity.ACN, appDirectory string, settings *GlobalSettingsFile) Application {
func NewApp(acn connectivity.ACN, appDirectory string, settings *settings.GlobalSettingsFile) Application {
app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager(), settings: settings}
app.peers = make(map[string]peer.CwtchPeer)
@ -108,13 +109,13 @@ func NewApp(acn connectivity.ACN, appDirectory string, settings *GlobalSettingsF
return app
}
func (app *application) ReadSettings() GlobalSettings {
func (app *application) ReadSettings() settings.GlobalSettings {
app.appmutex.Lock()
defer app.appmutex.Unlock()
return app.settings.ReadGlobalSettings()
}
func (app *application) UpdateSettings(settings GlobalSettings) {
func (app *application) UpdateSettings(settings settings.GlobalSettings) {
// don't allow any other application changes while settings update
app.appmutex.Lock()
defer app.appmutex.Unlock()
@ -133,6 +134,8 @@ func (app *application) UpdateSettings(settings GlobalSettings) {
} else {
profile.AllowUnknownConnections()
}
profile.NotifySettingsUpdate(settings)
}
}
@ -347,6 +350,7 @@ func (app *application) registerHooks(profile peer.CwtchPeer) {
// Register Hooks
profile.RegisterHook(extensions.ProfileValueExtension{})
profile.RegisterHook(filesharing.Functionality{})
profile.RegisterHook(new(filesharing.ImagePreviewsFunctionality))
}
// installProfile takes a profile and if it isn't loaded in the app, installs it and returns true

View File

@ -6,6 +6,7 @@ import (
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/settings"
"git.openprivacy.ca/openprivacy/log"
"strconv"
)
@ -14,6 +15,9 @@ import (
type ProfileValueExtension struct {
}
func (pne ProfileValueExtension) NotifySettingsUpdate(settings settings.GlobalSettings) {
}
func (pne ProfileValueExtension) EventsToRegister() []event.Type {
return nil
}

View File

@ -3,6 +3,7 @@ package filesharing
import (
"crypto/rand"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/settings"
"encoding/hex"
"encoding/json"
"errors"
@ -30,8 +31,11 @@ import (
type Functionality struct {
}
func (f Functionality) NotifySettingsUpdate(settings settings.GlobalSettings) {
}
func (f Functionality) EventsToRegister() []event.Type {
return []event.Type{event.ManifestReceived, event.FileDownloaded}
return []event.Type{event.ProtocolEngineCreated, event.ManifestReceived, event.FileDownloaded}
}
func (f Functionality) ExperimentsToRegister() []string {
@ -133,13 +137,9 @@ func (f Functionality) OnContactReceiveValue(profile peer.CwtchPeer, conversatio
}
}
// FunctionalityGate returns filesharing if enabled in the given experiment map
// Note: Experiment maps are currently in libcwtch-go
func FunctionalityGate(experimentMap map[string]bool) (*Functionality, error) {
if experimentMap[constants.FileSharingExperiment] {
return new(Functionality), nil
}
return nil, errors.New("filesharing is not enabled")
// FunctionalityGate returns filesharing functionality - gates now happen on function calls.
func FunctionalityGate() *Functionality {
return new(Functionality)
}
// PreviewFunctionalityGate returns filesharing if image previews are enabled

View File

@ -0,0 +1,139 @@
package filesharing
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/settings"
"encoding/json"
"fmt"
"git.openprivacy.ca/openprivacy/log"
"os"
"strconv"
"time"
)
type ImagePreviewsFunctionality struct {
downloadFolder string
}
func (i *ImagePreviewsFunctionality) NotifySettingsUpdate(settings settings.GlobalSettings) {
i.downloadFolder = settings.DownloadPath
}
func (i ImagePreviewsFunctionality) EventsToRegister() []event.Type {
return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeer, event.NewMessageFromGroup}
}
func (i ImagePreviewsFunctionality) ExperimentsToRegister() []string {
return []string{constants.FileSharingExperiment, constants.ImagePreviewsExperiment}
}
func (i *ImagePreviewsFunctionality) OnEvent(ev event.Event, profile peer.CwtchPeer) {
if profile.IsFeatureEnabled(constants.FileSharingExperiment) && profile.IsFeatureEnabled(constants.ImagePreviewsExperiment) {
switch ev.EventType {
case event.NewMessageFromPeer:
ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"])
if err == nil {
if ci.Accepted {
i.handleImagePreviews(profile, &ev, ci.ID, ci.ID)
}
}
case event.NewMessageFromGroup:
ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"])
if err == nil {
if ci.Accepted {
i.handleImagePreviews(profile, &ev, ci.ID, ci.ID)
}
}
case event.ProtocolEngineCreated:
// Now that the Peer Engine is Activated, Reshare Profile Images
key, exists := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey)
if exists {
serializedManifest, _ := profile.GetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", key))
// reset the share timestamp, currently file shares are hardcoded to expire after 30 days...
// we reset the profile image here so that it is always available.
profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.ts", key), strconv.FormatInt(time.Now().Unix(), 10))
log.Debugf("Custom Profile Image: %v %s", key, serializedManifest)
}
// If file sharing is enabled then reshare all active files...
fsf := FunctionalityGate()
fsf.ReShareFiles(profile)
}
}
}
func (i ImagePreviewsFunctionality) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) {
}
func (i *ImagePreviewsFunctionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) {
if profile.IsFeatureEnabled(constants.FileSharingExperiment) && profile.IsFeatureEnabled(constants.ImagePreviewsExperiment) {
_, zone, path := path.GetScopeZonePath()
if zone == attr.ProfileZone && path == constants.CustomProfileImageKey {
fileKey := value
if conversation.Accepted {
fsf := FunctionalityGate()
basepath := i.downloadFolder
fp, mp := GenerateDownloadPath(basepath, fileKey, true)
if value, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey)); exists && value == event.True {
if _, err := os.Stat(fp); err == nil {
// file is marked as completed downloaded and exists...
} else {
// the user probably deleted the file, mark completed as false...
profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey), event.False)
}
}
log.Debugf("Downloading Profile Image %v %v %v", fp, mp, fileKey)
// ev.Event.Data[event.FilePath] = fp
fsf.DownloadFile(profile, conversation.ID, fp, mp, value, constants.ImagePreviewMaxSizeInBytes)
}
}
}
}
// handleImagePreviews checks settings and, if appropriate, auto-downloads any images
func (i *ImagePreviewsFunctionality) handleImagePreviews(profile peer.CwtchPeer, ev *event.Event, conversationID, senderID int) {
if profile.IsFeatureEnabled(constants.FileSharingExperiment) && profile.IsFeatureEnabled(constants.ImagePreviewsExperiment) {
// Short-circuit failures
// Don't autodownload images if the download path does not exist.
if i.downloadFolder == "" {
log.Debugf("download folder %v is not set", i.downloadFolder)
return
}
// Don't autodownload images if the download path does not exist.
if _, err := os.Stat(i.downloadFolder); os.IsNotExist(err) {
log.Debugf("download folder %v does not exist", i.downloadFolder)
return
}
// If file sharing is enabled then reshare all active files...
fsf := FunctionalityGate()
// Now look at the image preview experiment
var cm model.MessageWrapper
err := json.Unmarshal([]byte(ev.Data[event.Data]), &cm)
if err == nil && cm.Overlay == model.OverlayFileSharing {
log.Debugf("Received File Sharing Message")
var fm OverlayMessage
err = json.Unmarshal([]byte(cm.Data), &fm)
if err == nil {
if fm.ShouldAutoDL() {
basepath := i.downloadFolder
fp, mp := GenerateDownloadPath(basepath, fm.Name, false)
log.Debugf("autodownloading file!")
ev.Data["Auto"] = constants.True
mID, _ := strconv.Atoi(ev.Data["Index"])
profile.UpdateMessageAttribute(conversationID, 0, mID, constants.AttrDownloaded, constants.True)
fsf.DownloadFile(profile, senderID, fp, mp, fm.FileKey(), constants.ImagePreviewMaxSizeInBytes)
}
}
}
}
}

View File

@ -4,6 +4,7 @@ import (
"crypto/rand"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"cwtch.im/cwtch/settings"
"encoding/base64"
"encoding/hex"
"encoding/json"
@ -189,6 +190,16 @@ func (cp *cwtchPeer) UpdateExperiments(enabled bool, experiments map[string]bool
cp.experiments = model.InitExperiments(enabled, experiments)
}
// NotifySettingsUpdate notifies a Cwtch profile of a change in the nature of global experiments. The Cwtch Profile uses
// this information to update registered extensions.
func (cp *cwtchPeer) NotifySettingsUpdate(settings settings.GlobalSettings) {
cp.extensionLock.Lock()
defer cp.extensionLock.Unlock()
for _, extension := range cp.extensions {
extension.extension.NotifySettingsUpdate(settings)
}
}
func (cp *cwtchPeer) PublishEvent(resp event.Event) {
log.Debugf("Publishing Event: %v %v", resp.EventType, resp.Data)
cp.eventBus.Publish(resp)
@ -1408,7 +1419,7 @@ func (cp *cwtchPeer) eventHandler() {
log.Debugf("checking extension...%v", extension)
// check if the current map of experiments satisfies the extension requirements
if !cp.checkExtensionExperiment(extension) {
log.Debugf("skipping extension...not all experiments satisfied")
log.Debugf("skipping extension (%s) ..not all experiments satisfied", extension)
continue
}
@ -1439,7 +1450,7 @@ func (cp *cwtchPeer) eventHandler() {
log.Debugf("checking extension...%v", extension)
// check if the current map of experiments satisfies the extension requirements
if !cp.checkExtensionExperiment(extension) {
log.Debugf("skipping extension...not all experiments satisfied")
log.Debugf("skipping extension (%s) ..not all experiments satisfied", extension)
continue
}
extension.extension.OnContactReceiveValue(cp, *conversationInfo, scopedZonedPath, val, exists)
@ -1538,7 +1549,7 @@ func (cp *cwtchPeer) eventHandler() {
// check if the current map of experiments satisfies the extension requirements
if !cp.checkExtensionExperiment(extension) {
log.Debugf("skipping extension...not all experiments satisfied")
log.Debugf("skipping extension (%s) ..not all experiments satisfied", extension)
continue
}

View File

@ -228,6 +228,8 @@ func NewCwtchProfileStorage(db *sql.DB, profileDirectory string) (*CwtchProfileS
// StoreProfileKeyValue allows storing of typed Key/Value attribute in the Storage Engine
func (cps *CwtchProfileStorage) StoreProfileKeyValue(keyType StorageKeyType, key string, value []byte) error {
cps.mutex.Lock()
defer cps.mutex.Unlock()
_, err := cps.insertProfileKeyValueStmt.Exec(keyType, key, value)
if err != nil {
log.Errorf("error executing query: %v", err)
@ -238,6 +240,8 @@ func (cps *CwtchProfileStorage) StoreProfileKeyValue(keyType StorageKeyType, key
// FindProfileKeysByPrefix allows fetching of typed values via a known Key from the Storage Engine
func (cps *CwtchProfileStorage) FindProfileKeysByPrefix(keyType StorageKeyType, prefix string) ([]string, error) {
cps.mutex.Lock()
defer cps.mutex.Unlock()
rows, err := cps.findProfileKeySQLStmt.Query(keyType, prefix+"%")
if err != nil {
log.Errorf("error executing query: %v", err)
@ -266,6 +270,8 @@ func (cps *CwtchProfileStorage) FindProfileKeysByPrefix(keyType StorageKeyType,
// LoadProfileKeyValue allows fetching of typed values via a known Key from the Storage Engine
func (cps *CwtchProfileStorage) LoadProfileKeyValue(keyType StorageKeyType, key string) ([]byte, error) {
cps.mutex.Lock()
defer cps.mutex.Unlock()
rows, err := cps.selectProfileKeyValueStmt.Query(keyType, key)
if err != nil {
log.Errorf("error executing query: %v", err)
@ -291,6 +297,8 @@ func (cps *CwtchProfileStorage) LoadProfileKeyValue(keyType StorageKeyType, key
// NewConversation stores a new conversation in the data store
func (cps *CwtchProfileStorage) NewConversation(handle string, attributes model.Attributes, acl model.AccessControlList, accepted bool) (int, error) {
cps.mutex.Lock()
defer cps.mutex.Unlock()
tx, err := cps.db.Begin()
if err != nil {
@ -336,6 +344,8 @@ func (cps *CwtchProfileStorage) NewConversation(handle string, attributes model.
// Ideally this function should not exist, and all lookups should happen by ID (this is currently
// unavoidable in some circumstances because the event bus references conversations by handle, not by id)
func (cps *CwtchProfileStorage) GetConversationByHandle(handle string) (*model.Conversation, error) {
cps.mutex.Lock()
defer cps.mutex.Unlock()
rows, err := cps.selectConversationByHandleStmt.Query(handle)
if err != nil {
log.Errorf("error executing query: %v", err)
@ -367,6 +377,8 @@ func (cps *CwtchProfileStorage) GetConversationByHandle(handle string) (*model.C
// on app start up to build a summary of conversations for the UI. Any further updates should be integrated
// through the event bus.
func (cps *CwtchProfileStorage) FetchConversations() ([]*model.Conversation, error) {
cps.mutex.Lock()
defer cps.mutex.Unlock()
rows, err := cps.fetchAllConversationsStmt.Query()
if err != nil {
log.Errorf("error executing query: %v", err)
@ -401,6 +413,8 @@ func (cps *CwtchProfileStorage) FetchConversations() ([]*model.Conversation, err
// GetConversation looks up a particular conversation by id
func (cps *CwtchProfileStorage) GetConversation(id int) (*model.Conversation, error) {
cps.mutex.Lock()
defer cps.mutex.Unlock()
rows, err := cps.selectConversationStmt.Query(id)
if err != nil {
log.Errorf("error executing query: %v", err)
@ -430,6 +444,8 @@ func (cps *CwtchProfileStorage) GetConversation(id int) (*model.Conversation, er
// AcceptConversation sets the accepted status of a conversation to true in the backing datastore
func (cps *CwtchProfileStorage) AcceptConversation(id int) error {
cps.mutex.Lock()
defer cps.mutex.Unlock()
_, err := cps.acceptConversationStmt.Exec(id)
if err != nil {
log.Errorf("error executing query: %v", err)
@ -440,6 +456,8 @@ func (cps *CwtchProfileStorage) AcceptConversation(id int) error {
// DeleteConversation purges the conversation and any associated message history from the conversation store.
func (cps *CwtchProfileStorage) DeleteConversation(id int) error {
cps.mutex.Lock()
defer cps.mutex.Unlock()
_, err := cps.deleteConversationStmt.Exec(id)
if err != nil {
log.Errorf("error executing query: %v", err)
@ -450,6 +468,8 @@ func (cps *CwtchProfileStorage) DeleteConversation(id int) error {
// SetConversationACL sets a new ACL on a given conversation.
func (cps *CwtchProfileStorage) SetConversationACL(id int, acl model.AccessControlList) error {
cps.mutex.Lock()
defer cps.mutex.Unlock()
_, err := cps.setConversationACLStmt.Exec(acl.Serialize(), id)
if err != nil {
log.Errorf("error executing query: %v", err)
@ -464,6 +484,8 @@ func (cps *CwtchProfileStorage) SetConversationAttribute(id int, path attr.Scope
if err != nil {
return err
}
cps.mutex.Lock()
defer cps.mutex.Unlock()
ci.Attributes[path.ToString()] = value
_, err = cps.setConversationAttributesStmt.Exec(ci.Attributes.Serialize(), id)
if err != nil {
@ -475,7 +497,6 @@ func (cps *CwtchProfileStorage) SetConversationAttribute(id int, path attr.Scope
// InsertMessage appends a message to a conversation channel, with a given set of attributes
func (cps *CwtchProfileStorage) InsertMessage(conversation int, channel int, body string, attributes model.Attributes, signature string, contentHash string) (int, error) {
channelID := ChannelID{Conversation: conversation, Channel: channel}
cps.mutex.Lock()
@ -757,6 +778,8 @@ func (cps *CwtchProfileStorage) GetMostRecentMessages(conversation int, channel
// PurgeConversationChannel deletes all message for a conversation channel.
func (cps *CwtchProfileStorage) PurgeConversationChannel(conversation int, channel int) error {
cps.mutex.Lock()
defer cps.mutex.Unlock()
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(purgeMessagesFromConversationSQLStmt, conversation, channel))
if err != nil {
log.Errorf("error executing transaction: %v", err)
@ -785,12 +808,13 @@ func (cps *CwtchProfileStorage) PurgeNonSavedMessages() {
// Close closes the underlying database and prepared statements
func (cps *CwtchProfileStorage) Close(purgeAllNonSavedMessages bool) {
cps.mutex.Lock()
defer cps.mutex.Unlock()
if cps.db != nil {
if purgeAllNonSavedMessages {
cps.PurgeNonSavedMessages()
}
// We can't lock before this..
cps.mutex.Lock()
defer cps.mutex.Unlock()
cps.insertProfileKeyValueStmt.Close()
cps.selectProfileKeyValueStmt.Close()

View File

@ -4,6 +4,7 @@ import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/settings"
)
type ProfileHooks interface {
@ -21,6 +22,9 @@ type ProfileHooks interface {
// OnContactReceiveValue is Hooked after a profile receives a response to a Get/Val Request
OnContactReceiveValue(profile CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool)
// NotifySettingsUpdate allow profile hooks to access configs e.g. download folder
NotifySettingsUpdate(settings settings.GlobalSettings)
}
type ProfileHook struct {

View File

@ -5,6 +5,7 @@ import (
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/settings"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
)
@ -152,6 +153,7 @@ type CwtchPeer interface {
PublishEvent(resp event.Event)
RegisterHook(hook ProfileHooks)
UpdateExperiments(enabled bool, experiments map[string]bool)
NotifySettingsUpdate(settings settings.GlobalSettings)
IsFeatureEnabled(featureName string) bool
}

View File

@ -34,7 +34,7 @@ func (fsss *FileSharingSubSystem) ShareFile(fileKey string, serializedManifest s
log.Errorf("could not share file %v", err)
return
}
log.Infof("sharing file: %v %v", fileKey, serializedManifest)
log.Debugf("sharing file: %v %v", fileKey, serializedManifest)
fsss.activeShares.Store(fileKey, &manifest)
}

View File

@ -1,4 +1,4 @@
package app
package settings
import (
"cwtch.im/cwtch/event"

Binary file not shown.

After

Width:  |  Height:  |  Size: 51 KiB

View File

@ -0,0 +1,186 @@
package filesharing
import (
"crypto/rand"
app2 "cwtch.im/cwtch/app"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/functionality/filesharing"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"encoding/base64"
"errors"
"fmt"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
"path/filepath"
// Import SQL Cipher
mrand "math/rand"
"os"
"os/user"
"path"
"runtime"
"runtime/pprof"
"testing"
"time"
_ "github.com/mutecomm/go-sqlcipher/v4"
)
func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.CwtchPeer) {
for {
state := peera.GetPeerState(peerb.GetOnion())
if state == connections.FAILED {
t.Fatalf("%v could not connect to %v", peera.GetOnion(), peerb.GetOnion())
}
if state != connections.AUTHENTICATED {
fmt.Printf("peer %v waiting connect to peer %v, currently: %v\n", peera.GetOnion(), peerb.GetOnion(), connections.ConnectionStateName[state])
time.Sleep(time.Second * 5)
continue
} else {
peerAName, _ := peera.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
peerBName, _ := peerb.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
fmt.Printf("%v CONNECTED and AUTHED to %v\n", peerAName, peerBName)
break
}
}
}
func TestFileSharing(t *testing.T) {
numGoRoutinesStart := runtime.NumGoroutine()
os.RemoveAll("cwtch.out.png")
os.RemoveAll("cwtch.out.png.manifest")
os.RemoveAll("storage")
os.RemoveAll("tordir")
log.SetLevel(log.LevelDebug)
os.Mkdir("tordir", 0700)
dataDir := path.Join("tordir", "tor")
os.MkdirAll(dataDir, 0700)
// we don't need real randomness for the port, just to avoid a possible conflict...
mrand.Seed(int64(time.Now().Nanosecond()))
socksPort := mrand.Intn(1000) + 9051
controlPort := mrand.Intn(1000) + 9052
// generate a random password
key := make([]byte, 64)
_, err := rand.Read(key)
if err != nil {
panic(err)
}
useCache := os.Getenv("TORCACHE") == "true"
torDataDir := ""
if useCache {
log.Infof("using tor cache")
torDataDir = filepath.Join(dataDir, "data-dir-torcache")
os.MkdirAll(torDataDir, 0700)
} else {
log.Infof("using clean tor data dir")
if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil {
t.Fatalf("could not create data dir")
}
}
tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc")
acn, err := tor.NewTorACNWithAuth("./tordir", path.Join("..", "tor"), torDataDir, controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)})
if err != nil {
t.Fatalf("Could not start Tor: %v", err)
}
acn.WaitTillBootstrapped()
defer acn.Close()
app := app2.NewApp(acn, "./storage", app2.LoadAppSettings("./storage"))
usr, _ := user.Current()
cwtchDir := path.Join(usr.HomeDir, ".cwtch")
os.Mkdir(cwtchDir, 0700)
os.RemoveAll(path.Join(cwtchDir, "testing"))
os.Mkdir(path.Join(cwtchDir, "testing"), 0700)
t.Logf("Creating Alice...")
app.CreateProfile("alice", "asdfasdf", true)
t.Logf("Creating Bob...")
app.CreateProfile("bob", "asdfasdf", true)
t.Logf("** Waiting for Alice, Bob...")
alice := app2.WaitGetPeer(app, "alice")
app.ActivatePeerEngine(alice.GetOnion())
bob := app2.WaitGetPeer(app, "bob")
app.ActivatePeerEngine(bob.GetOnion())
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer})
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer})
queueOracle := event.NewQueue()
app.GetEventBus(bob.GetOnion()).Subscribe(event.FileDownloaded, queueOracle)
// Turn on File Sharing Experiment...
settings := app.ReadSettings()
settings.ExperimentsEnabled = true
settings.DownloadPath = "./download_dir"
os.RemoveAll(path.Join(settings.DownloadPath, "cwtch.png"))
os.RemoveAll(path.Join(settings.DownloadPath, "cwtch.png.manifest"))
settings.Experiments[constants.FileSharingExperiment] = true
// Turn Auto Downloading On... (Part of the Image Previews / Profile Images Experiment)
settings.Experiments[constants.ImagePreviewsExperiment] = true
app.UpdateSettings(settings)
t.Logf("** Launching Peers...")
waitTime := time.Duration(30) * time.Second
t.Logf("** Waiting for Alice, Bob to connect with onion network... (%v)\n", waitTime)
time.Sleep(waitTime)
bob.NewContactConversation(alice.GetOnion(), model.DefaultP2PAccessControl(), true)
alice.NewContactConversation(bob.GetOnion(), model.DefaultP2PAccessControl(), true)
alice.PeerWithOnion(bob.GetOnion())
t.Logf("Waiting for alice and Bob to peer...")
waitForPeerPeerConnection(t, alice, bob)
alice.AcceptConversation(1)
bob.AcceptConversation(1)
t.Logf("Alice and Bob are Connected!!")
filesharingFunctionality := filesharing.FunctionalityGate()
_, fileSharingMessage, err := filesharingFunctionality.ShareFile("cwtch.png", alice)
alice.SendMessage(1, fileSharingMessage)
if err != nil {
t.Fatalf("Error!: %v", err)
}
// test that bob can download and verify the file
// The main difference here is that bob doesn't need to do anything...
// testBobDownloadFile(t, bob, filesharingFunctionality, queueOracle)
// Wait for say...
time.Sleep(10 * time.Second)
if _, err := os.Stat(path.Join(settings.DownloadPath, "cwtch.png")); errors.Is(err, os.ErrNotExist) {
// path/to/whatever does not exist
t.Fatalf("cwthc.png should have been automatically downloadeded...")
}
queueOracle.Shutdown()
app.Shutdown()
acn.Close()
time.Sleep(10 * time.Second)
numGoRoutinesPostACN := runtime.NumGoroutine()
// Printing out the current goroutines
// Very useful if we are leaking any.
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
if numGoRoutinesStart != numGoRoutinesPostACN {
t.Errorf("Number of GoRoutines at start (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, leak detected!", numGoRoutinesStart, numGoRoutinesPostACN)
}
}

View File

@ -412,7 +412,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
fmt.Println("")
log.Infof("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+
"numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v",
"numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v",
numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect,
numGoRoutinesPostAlice, numGoRoutinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown)

View File

@ -144,7 +144,7 @@ func TestFileSharing(t *testing.T) {
t.Logf("Alice and Bob are Connected!!")
filesharingFunctionality, _ := filesharing.FunctionalityGate(map[string]bool{constants.FileSharingExperiment: true})
filesharingFunctionality := filesharing.FunctionalityGate()
_, fileSharingMessage, err := filesharingFunctionality.ShareFile("cwtch.png", alice)
alice.SendMessage(1, fileSharingMessage)