diff --git a/.drone.yml b/.drone.yml index 4ee878e..75f1322 100644 --- a/.drone.yml +++ b/.drone.yml @@ -48,6 +48,14 @@ steps: commands: - export PATH=`pwd`:$PATH - go test -timeout=20m -race -v cwtch.im/cwtch/testing/filesharing + - name: filesharing-autodownload-integ-test + image: golang:1.19.1 + volumes: + - name: deps + path: /go + commands: + - export PATH=`pwd`:$PATH + - go test -timeout=20m -race -v cwtch.im/cwtch/testing/autodownload - name: notify-gogs image: openpriv/drone-gogs pull: if-not-exists diff --git a/app/app.go b/app/app.go index 6198d27..3042c5e 100644 --- a/app/app.go +++ b/app/app.go @@ -10,6 +10,7 @@ import ( "cwtch.im/cwtch/model/constants" "cwtch.im/cwtch/peer" "cwtch.im/cwtch/protocol/connections" + "cwtch.im/cwtch/settings" "cwtch.im/cwtch/storage" "git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/log" @@ -32,7 +33,7 @@ type application struct { appBus event.Manager appmutex sync.Mutex - settings *GlobalSettingsFile + settings *settings.GlobalSettingsFile } func (app *application) IsFeatureEnabled(experiment string) bool { @@ -64,8 +65,8 @@ type Application interface { ActivatePeerEngine(onion string) DeactivatePeerEngine(onion string) - ReadSettings() GlobalSettings - UpdateSettings(settings GlobalSettings) + ReadSettings() settings.GlobalSettings + UpdateSettings(settings settings.GlobalSettings) IsFeatureEnabled(experiment string) bool ShutdownPeer(string) @@ -78,14 +79,14 @@ type Application interface { // LoadProfileFn is the function signature for a function in an app that loads a profile type LoadProfileFn func(profile peer.CwtchPeer) -func LoadAppSettings(appDirectory string) *GlobalSettingsFile { +func LoadAppSettings(appDirectory string) *settings.GlobalSettingsFile { log.Debugf("NewApp(%v)\n", appDirectory) os.MkdirAll(path.Join(appDirectory, "profiles"), 0700) // Note: we basically presume this doesn't fail. If the file doesn't exist we create it, and as such the // only plausible error conditions are related to file create e.g. low disk space. If that is the case then // many other parts of Cwtch are likely to fail also. - settings, err := InitGlobalSettingsFile(appDirectory, DefactoPasswordForUnencryptedProfiles) + settings, err := settings.InitGlobalSettingsFile(appDirectory, DefactoPasswordForUnencryptedProfiles) if err != nil { log.Errorf("error initializing global settings file %. Global settings might not be loaded or saves", err) } @@ -93,7 +94,7 @@ func LoadAppSettings(appDirectory string) *GlobalSettingsFile { } // NewApp creates a new app with some environment awareness and initializes a Tor Manager -func NewApp(acn connectivity.ACN, appDirectory string, settings *GlobalSettingsFile) Application { +func NewApp(acn connectivity.ACN, appDirectory string, settings *settings.GlobalSettingsFile) Application { app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager(), settings: settings} app.peers = make(map[string]peer.CwtchPeer) @@ -108,13 +109,13 @@ func NewApp(acn connectivity.ACN, appDirectory string, settings *GlobalSettingsF return app } -func (app *application) ReadSettings() GlobalSettings { +func (app *application) ReadSettings() settings.GlobalSettings { app.appmutex.Lock() defer app.appmutex.Unlock() return app.settings.ReadGlobalSettings() } -func (app *application) UpdateSettings(settings GlobalSettings) { +func (app *application) UpdateSettings(settings settings.GlobalSettings) { // don't allow any other application changes while settings update app.appmutex.Lock() defer app.appmutex.Unlock() @@ -133,6 +134,8 @@ func (app *application) UpdateSettings(settings GlobalSettings) { } else { profile.AllowUnknownConnections() } + + profile.NotifySettingsUpdate(settings) } } @@ -347,6 +350,7 @@ func (app *application) registerHooks(profile peer.CwtchPeer) { // Register Hooks profile.RegisterHook(extensions.ProfileValueExtension{}) profile.RegisterHook(filesharing.Functionality{}) + profile.RegisterHook(new(filesharing.ImagePreviewsFunctionality)) } // installProfile takes a profile and if it isn't loaded in the app, installs it and returns true diff --git a/extensions/profile_value.go b/extensions/profile_value.go index 3855807..0ac8b3b 100644 --- a/extensions/profile_value.go +++ b/extensions/profile_value.go @@ -6,6 +6,7 @@ import ( "cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/model/constants" "cwtch.im/cwtch/peer" + "cwtch.im/cwtch/settings" "git.openprivacy.ca/openprivacy/log" "strconv" ) @@ -14,6 +15,9 @@ import ( type ProfileValueExtension struct { } +func (pne ProfileValueExtension) NotifySettingsUpdate(settings settings.GlobalSettings) { +} + func (pne ProfileValueExtension) EventsToRegister() []event.Type { return nil } diff --git a/functionality/filesharing/filesharing_functionality.go b/functionality/filesharing/filesharing_functionality.go index 80a0346..4ec1620 100644 --- a/functionality/filesharing/filesharing_functionality.go +++ b/functionality/filesharing/filesharing_functionality.go @@ -3,6 +3,7 @@ package filesharing import ( "crypto/rand" "cwtch.im/cwtch/event" + "cwtch.im/cwtch/settings" "encoding/hex" "encoding/json" "errors" @@ -30,8 +31,11 @@ import ( type Functionality struct { } +func (f Functionality) NotifySettingsUpdate(settings settings.GlobalSettings) { +} + func (f Functionality) EventsToRegister() []event.Type { - return []event.Type{event.ManifestReceived, event.FileDownloaded} + return []event.Type{event.ProtocolEngineCreated, event.ManifestReceived, event.FileDownloaded} } func (f Functionality) ExperimentsToRegister() []string { @@ -133,13 +137,9 @@ func (f Functionality) OnContactReceiveValue(profile peer.CwtchPeer, conversatio } } -// FunctionalityGate returns filesharing if enabled in the given experiment map -// Note: Experiment maps are currently in libcwtch-go -func FunctionalityGate(experimentMap map[string]bool) (*Functionality, error) { - if experimentMap[constants.FileSharingExperiment] { - return new(Functionality), nil - } - return nil, errors.New("filesharing is not enabled") +// FunctionalityGate returns filesharing functionality - gates now happen on function calls. +func FunctionalityGate() *Functionality { + return new(Functionality) } // PreviewFunctionalityGate returns filesharing if image previews are enabled diff --git a/functionality/filesharing/image_previews.go b/functionality/filesharing/image_previews.go new file mode 100644 index 0000000..4b7b911 --- /dev/null +++ b/functionality/filesharing/image_previews.go @@ -0,0 +1,139 @@ +package filesharing + +import ( + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/model" + "cwtch.im/cwtch/model/attr" + "cwtch.im/cwtch/model/constants" + "cwtch.im/cwtch/peer" + "cwtch.im/cwtch/settings" + "encoding/json" + "fmt" + "git.openprivacy.ca/openprivacy/log" + "os" + "strconv" + "time" +) + +type ImagePreviewsFunctionality struct { + downloadFolder string +} + +func (i *ImagePreviewsFunctionality) NotifySettingsUpdate(settings settings.GlobalSettings) { + i.downloadFolder = settings.DownloadPath +} + +func (i ImagePreviewsFunctionality) EventsToRegister() []event.Type { + return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeer, event.NewMessageFromGroup} +} + +func (i ImagePreviewsFunctionality) ExperimentsToRegister() []string { + return []string{constants.FileSharingExperiment, constants.ImagePreviewsExperiment} +} + +func (i *ImagePreviewsFunctionality) OnEvent(ev event.Event, profile peer.CwtchPeer) { + if profile.IsFeatureEnabled(constants.FileSharingExperiment) && profile.IsFeatureEnabled(constants.ImagePreviewsExperiment) { + switch ev.EventType { + case event.NewMessageFromPeer: + ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"]) + if err == nil { + if ci.Accepted { + i.handleImagePreviews(profile, &ev, ci.ID, ci.ID) + } + } + case event.NewMessageFromGroup: + ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"]) + if err == nil { + if ci.Accepted { + i.handleImagePreviews(profile, &ev, ci.ID, ci.ID) + } + } + case event.ProtocolEngineCreated: + // Now that the Peer Engine is Activated, Reshare Profile Images + key, exists := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey) + if exists { + serializedManifest, _ := profile.GetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", key)) + // reset the share timestamp, currently file shares are hardcoded to expire after 30 days... + // we reset the profile image here so that it is always available. + profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.ts", key), strconv.FormatInt(time.Now().Unix(), 10)) + log.Debugf("Custom Profile Image: %v %s", key, serializedManifest) + } + // If file sharing is enabled then reshare all active files... + fsf := FunctionalityGate() + fsf.ReShareFiles(profile) + } + } +} + +func (i ImagePreviewsFunctionality) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) { +} + +func (i *ImagePreviewsFunctionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) { + if profile.IsFeatureEnabled(constants.FileSharingExperiment) && profile.IsFeatureEnabled(constants.ImagePreviewsExperiment) { + _, zone, path := path.GetScopeZonePath() + if zone == attr.ProfileZone && path == constants.CustomProfileImageKey { + fileKey := value + + if conversation.Accepted { + fsf := FunctionalityGate() + basepath := i.downloadFolder + fp, mp := GenerateDownloadPath(basepath, fileKey, true) + + if value, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey)); exists && value == event.True { + if _, err := os.Stat(fp); err == nil { + // file is marked as completed downloaded and exists... + } else { + // the user probably deleted the file, mark completed as false... + profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey), event.False) + } + } + + log.Debugf("Downloading Profile Image %v %v %v", fp, mp, fileKey) + // ev.Event.Data[event.FilePath] = fp + fsf.DownloadFile(profile, conversation.ID, fp, mp, value, constants.ImagePreviewMaxSizeInBytes) + } + } + } +} + +// handleImagePreviews checks settings and, if appropriate, auto-downloads any images +func (i *ImagePreviewsFunctionality) handleImagePreviews(profile peer.CwtchPeer, ev *event.Event, conversationID, senderID int) { + if profile.IsFeatureEnabled(constants.FileSharingExperiment) && profile.IsFeatureEnabled(constants.ImagePreviewsExperiment) { + + // Short-circuit failures + // Don't autodownload images if the download path does not exist. + if i.downloadFolder == "" { + log.Debugf("download folder %v is not set", i.downloadFolder) + return + } + + // Don't autodownload images if the download path does not exist. + if _, err := os.Stat(i.downloadFolder); os.IsNotExist(err) { + log.Debugf("download folder %v does not exist", i.downloadFolder) + return + } + + // If file sharing is enabled then reshare all active files... + fsf := FunctionalityGate() + + // Now look at the image preview experiment + var cm model.MessageWrapper + err := json.Unmarshal([]byte(ev.Data[event.Data]), &cm) + if err == nil && cm.Overlay == model.OverlayFileSharing { + log.Debugf("Received File Sharing Message") + var fm OverlayMessage + err = json.Unmarshal([]byte(cm.Data), &fm) + if err == nil { + if fm.ShouldAutoDL() { + basepath := i.downloadFolder + fp, mp := GenerateDownloadPath(basepath, fm.Name, false) + log.Debugf("autodownloading file!") + ev.Data["Auto"] = constants.True + mID, _ := strconv.Atoi(ev.Data["Index"]) + profile.UpdateMessageAttribute(conversationID, 0, mID, constants.AttrDownloaded, constants.True) + fsf.DownloadFile(profile, senderID, fp, mp, fm.FileKey(), constants.ImagePreviewMaxSizeInBytes) + } + } + } + } +} diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 7037fd2..7fbbc35 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -4,6 +4,7 @@ import ( "crypto/rand" "cwtch.im/cwtch/model/constants" "cwtch.im/cwtch/protocol/groups" + "cwtch.im/cwtch/settings" "encoding/base64" "encoding/hex" "encoding/json" @@ -189,6 +190,16 @@ func (cp *cwtchPeer) UpdateExperiments(enabled bool, experiments map[string]bool cp.experiments = model.InitExperiments(enabled, experiments) } +// NotifySettingsUpdate notifies a Cwtch profile of a change in the nature of global experiments. The Cwtch Profile uses +// this information to update registered extensions. +func (cp *cwtchPeer) NotifySettingsUpdate(settings settings.GlobalSettings) { + cp.extensionLock.Lock() + defer cp.extensionLock.Unlock() + for _, extension := range cp.extensions { + extension.extension.NotifySettingsUpdate(settings) + } +} + func (cp *cwtchPeer) PublishEvent(resp event.Event) { log.Debugf("Publishing Event: %v %v", resp.EventType, resp.Data) cp.eventBus.Publish(resp) @@ -1408,7 +1419,7 @@ func (cp *cwtchPeer) eventHandler() { log.Debugf("checking extension...%v", extension) // check if the current map of experiments satisfies the extension requirements if !cp.checkExtensionExperiment(extension) { - log.Debugf("skipping extension...not all experiments satisfied") + log.Debugf("skipping extension (%s) ..not all experiments satisfied", extension) continue } @@ -1439,7 +1450,7 @@ func (cp *cwtchPeer) eventHandler() { log.Debugf("checking extension...%v", extension) // check if the current map of experiments satisfies the extension requirements if !cp.checkExtensionExperiment(extension) { - log.Debugf("skipping extension...not all experiments satisfied") + log.Debugf("skipping extension (%s) ..not all experiments satisfied", extension) continue } extension.extension.OnContactReceiveValue(cp, *conversationInfo, scopedZonedPath, val, exists) @@ -1538,7 +1549,7 @@ func (cp *cwtchPeer) eventHandler() { // check if the current map of experiments satisfies the extension requirements if !cp.checkExtensionExperiment(extension) { - log.Debugf("skipping extension...not all experiments satisfied") + log.Debugf("skipping extension (%s) ..not all experiments satisfied", extension) continue } diff --git a/peer/cwtchprofilestorage.go b/peer/cwtchprofilestorage.go index ce79a6e..886896a 100644 --- a/peer/cwtchprofilestorage.go +++ b/peer/cwtchprofilestorage.go @@ -228,6 +228,8 @@ func NewCwtchProfileStorage(db *sql.DB, profileDirectory string) (*CwtchProfileS // StoreProfileKeyValue allows storing of typed Key/Value attribute in the Storage Engine func (cps *CwtchProfileStorage) StoreProfileKeyValue(keyType StorageKeyType, key string, value []byte) error { + cps.mutex.Lock() + defer cps.mutex.Unlock() _, err := cps.insertProfileKeyValueStmt.Exec(keyType, key, value) if err != nil { log.Errorf("error executing query: %v", err) @@ -238,6 +240,8 @@ func (cps *CwtchProfileStorage) StoreProfileKeyValue(keyType StorageKeyType, key // FindProfileKeysByPrefix allows fetching of typed values via a known Key from the Storage Engine func (cps *CwtchProfileStorage) FindProfileKeysByPrefix(keyType StorageKeyType, prefix string) ([]string, error) { + cps.mutex.Lock() + defer cps.mutex.Unlock() rows, err := cps.findProfileKeySQLStmt.Query(keyType, prefix+"%") if err != nil { log.Errorf("error executing query: %v", err) @@ -266,6 +270,8 @@ func (cps *CwtchProfileStorage) FindProfileKeysByPrefix(keyType StorageKeyType, // LoadProfileKeyValue allows fetching of typed values via a known Key from the Storage Engine func (cps *CwtchProfileStorage) LoadProfileKeyValue(keyType StorageKeyType, key string) ([]byte, error) { + cps.mutex.Lock() + defer cps.mutex.Unlock() rows, err := cps.selectProfileKeyValueStmt.Query(keyType, key) if err != nil { log.Errorf("error executing query: %v", err) @@ -291,6 +297,8 @@ func (cps *CwtchProfileStorage) LoadProfileKeyValue(keyType StorageKeyType, key // NewConversation stores a new conversation in the data store func (cps *CwtchProfileStorage) NewConversation(handle string, attributes model.Attributes, acl model.AccessControlList, accepted bool) (int, error) { + cps.mutex.Lock() + defer cps.mutex.Unlock() tx, err := cps.db.Begin() if err != nil { @@ -336,6 +344,8 @@ func (cps *CwtchProfileStorage) NewConversation(handle string, attributes model. // Ideally this function should not exist, and all lookups should happen by ID (this is currently // unavoidable in some circumstances because the event bus references conversations by handle, not by id) func (cps *CwtchProfileStorage) GetConversationByHandle(handle string) (*model.Conversation, error) { + cps.mutex.Lock() + defer cps.mutex.Unlock() rows, err := cps.selectConversationByHandleStmt.Query(handle) if err != nil { log.Errorf("error executing query: %v", err) @@ -367,6 +377,8 @@ func (cps *CwtchProfileStorage) GetConversationByHandle(handle string) (*model.C // on app start up to build a summary of conversations for the UI. Any further updates should be integrated // through the event bus. func (cps *CwtchProfileStorage) FetchConversations() ([]*model.Conversation, error) { + cps.mutex.Lock() + defer cps.mutex.Unlock() rows, err := cps.fetchAllConversationsStmt.Query() if err != nil { log.Errorf("error executing query: %v", err) @@ -401,6 +413,8 @@ func (cps *CwtchProfileStorage) FetchConversations() ([]*model.Conversation, err // GetConversation looks up a particular conversation by id func (cps *CwtchProfileStorage) GetConversation(id int) (*model.Conversation, error) { + cps.mutex.Lock() + defer cps.mutex.Unlock() rows, err := cps.selectConversationStmt.Query(id) if err != nil { log.Errorf("error executing query: %v", err) @@ -430,6 +444,8 @@ func (cps *CwtchProfileStorage) GetConversation(id int) (*model.Conversation, er // AcceptConversation sets the accepted status of a conversation to true in the backing datastore func (cps *CwtchProfileStorage) AcceptConversation(id int) error { + cps.mutex.Lock() + defer cps.mutex.Unlock() _, err := cps.acceptConversationStmt.Exec(id) if err != nil { log.Errorf("error executing query: %v", err) @@ -440,6 +456,8 @@ func (cps *CwtchProfileStorage) AcceptConversation(id int) error { // DeleteConversation purges the conversation and any associated message history from the conversation store. func (cps *CwtchProfileStorage) DeleteConversation(id int) error { + cps.mutex.Lock() + defer cps.mutex.Unlock() _, err := cps.deleteConversationStmt.Exec(id) if err != nil { log.Errorf("error executing query: %v", err) @@ -450,6 +468,8 @@ func (cps *CwtchProfileStorage) DeleteConversation(id int) error { // SetConversationACL sets a new ACL on a given conversation. func (cps *CwtchProfileStorage) SetConversationACL(id int, acl model.AccessControlList) error { + cps.mutex.Lock() + defer cps.mutex.Unlock() _, err := cps.setConversationACLStmt.Exec(acl.Serialize(), id) if err != nil { log.Errorf("error executing query: %v", err) @@ -464,6 +484,8 @@ func (cps *CwtchProfileStorage) SetConversationAttribute(id int, path attr.Scope if err != nil { return err } + cps.mutex.Lock() + defer cps.mutex.Unlock() ci.Attributes[path.ToString()] = value _, err = cps.setConversationAttributesStmt.Exec(ci.Attributes.Serialize(), id) if err != nil { @@ -475,7 +497,6 @@ func (cps *CwtchProfileStorage) SetConversationAttribute(id int, path attr.Scope // InsertMessage appends a message to a conversation channel, with a given set of attributes func (cps *CwtchProfileStorage) InsertMessage(conversation int, channel int, body string, attributes model.Attributes, signature string, contentHash string) (int, error) { - channelID := ChannelID{Conversation: conversation, Channel: channel} cps.mutex.Lock() @@ -757,6 +778,8 @@ func (cps *CwtchProfileStorage) GetMostRecentMessages(conversation int, channel // PurgeConversationChannel deletes all message for a conversation channel. func (cps *CwtchProfileStorage) PurgeConversationChannel(conversation int, channel int) error { + cps.mutex.Lock() + defer cps.mutex.Unlock() conversationStmt, err := cps.db.Prepare(fmt.Sprintf(purgeMessagesFromConversationSQLStmt, conversation, channel)) if err != nil { log.Errorf("error executing transaction: %v", err) @@ -785,12 +808,13 @@ func (cps *CwtchProfileStorage) PurgeNonSavedMessages() { // Close closes the underlying database and prepared statements func (cps *CwtchProfileStorage) Close(purgeAllNonSavedMessages bool) { - cps.mutex.Lock() - defer cps.mutex.Unlock() if cps.db != nil { if purgeAllNonSavedMessages { cps.PurgeNonSavedMessages() } + // We can't lock before this.. + cps.mutex.Lock() + defer cps.mutex.Unlock() cps.insertProfileKeyValueStmt.Close() cps.selectProfileKeyValueStmt.Close() diff --git a/peer/hooks.go b/peer/hooks.go index 33eec53..6a0924c 100644 --- a/peer/hooks.go +++ b/peer/hooks.go @@ -4,6 +4,7 @@ import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" "cwtch.im/cwtch/model/attr" + "cwtch.im/cwtch/settings" ) type ProfileHooks interface { @@ -21,6 +22,9 @@ type ProfileHooks interface { // OnContactReceiveValue is Hooked after a profile receives a response to a Get/Val Request OnContactReceiveValue(profile CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) + + // NotifySettingsUpdate allow profile hooks to access configs e.g. download folder + NotifySettingsUpdate(settings settings.GlobalSettings) } type ProfileHook struct { diff --git a/peer/profile_interface.go b/peer/profile_interface.go index 6e6b830..fe28871 100644 --- a/peer/profile_interface.go +++ b/peer/profile_interface.go @@ -5,6 +5,7 @@ import ( "cwtch.im/cwtch/model" "cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/protocol/connections" + "cwtch.im/cwtch/settings" "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" "git.openprivacy.ca/openprivacy/connectivity" ) @@ -152,6 +153,7 @@ type CwtchPeer interface { PublishEvent(resp event.Event) RegisterHook(hook ProfileHooks) UpdateExperiments(enabled bool, experiments map[string]bool) + NotifySettingsUpdate(settings settings.GlobalSettings) IsFeatureEnabled(featureName string) bool } diff --git a/protocol/files/filesharing_subsystem.go b/protocol/files/filesharing_subsystem.go index 011d015..2d63b81 100644 --- a/protocol/files/filesharing_subsystem.go +++ b/protocol/files/filesharing_subsystem.go @@ -34,7 +34,7 @@ func (fsss *FileSharingSubSystem) ShareFile(fileKey string, serializedManifest s log.Errorf("could not share file %v", err) return } - log.Infof("sharing file: %v %v", fileKey, serializedManifest) + log.Debugf("sharing file: %v %v", fileKey, serializedManifest) fsss.activeShares.Store(fileKey, &manifest) } diff --git a/app/settings.go b/settings/settings.go similarity index 99% rename from app/settings.go rename to settings/settings.go index baf7f13..08cd263 100644 --- a/app/settings.go +++ b/settings/settings.go @@ -1,4 +1,4 @@ -package app +package settings import ( "cwtch.im/cwtch/event" diff --git a/testing/autodownload/cwtch.png b/testing/autodownload/cwtch.png new file mode 100644 index 0000000..e50812f Binary files /dev/null and b/testing/autodownload/cwtch.png differ diff --git a/testing/autodownload/file_sharing_integration_test.go b/testing/autodownload/file_sharing_integration_test.go new file mode 100644 index 0000000..94feda7 --- /dev/null +++ b/testing/autodownload/file_sharing_integration_test.go @@ -0,0 +1,186 @@ +package filesharing + +import ( + "crypto/rand" + app2 "cwtch.im/cwtch/app" + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/functionality/filesharing" + "cwtch.im/cwtch/model" + "cwtch.im/cwtch/model/attr" + "cwtch.im/cwtch/model/constants" + "cwtch.im/cwtch/peer" + "cwtch.im/cwtch/protocol/connections" + "encoding/base64" + "errors" + "fmt" + "git.openprivacy.ca/openprivacy/connectivity/tor" + "git.openprivacy.ca/openprivacy/log" + "path/filepath" + + // Import SQL Cipher + mrand "math/rand" + "os" + "os/user" + "path" + "runtime" + "runtime/pprof" + "testing" + "time" + + _ "github.com/mutecomm/go-sqlcipher/v4" +) + +func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.CwtchPeer) { + for { + state := peera.GetPeerState(peerb.GetOnion()) + if state == connections.FAILED { + t.Fatalf("%v could not connect to %v", peera.GetOnion(), peerb.GetOnion()) + } + if state != connections.AUTHENTICATED { + fmt.Printf("peer %v waiting connect to peer %v, currently: %v\n", peera.GetOnion(), peerb.GetOnion(), connections.ConnectionStateName[state]) + time.Sleep(time.Second * 5) + continue + } else { + peerAName, _ := peera.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name) + peerBName, _ := peerb.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name) + fmt.Printf("%v CONNECTED and AUTHED to %v\n", peerAName, peerBName) + break + } + } +} + +func TestFileSharing(t *testing.T) { + numGoRoutinesStart := runtime.NumGoroutine() + os.RemoveAll("cwtch.out.png") + os.RemoveAll("cwtch.out.png.manifest") + os.RemoveAll("storage") + os.RemoveAll("tordir") + + log.SetLevel(log.LevelDebug) + + os.Mkdir("tordir", 0700) + dataDir := path.Join("tordir", "tor") + os.MkdirAll(dataDir, 0700) + + // we don't need real randomness for the port, just to avoid a possible conflict... + mrand.Seed(int64(time.Now().Nanosecond())) + socksPort := mrand.Intn(1000) + 9051 + controlPort := mrand.Intn(1000) + 9052 + + // generate a random password + key := make([]byte, 64) + _, err := rand.Read(key) + if err != nil { + panic(err) + } + + useCache := os.Getenv("TORCACHE") == "true" + + torDataDir := "" + if useCache { + log.Infof("using tor cache") + torDataDir = filepath.Join(dataDir, "data-dir-torcache") + os.MkdirAll(torDataDir, 0700) + } else { + log.Infof("using clean tor data dir") + if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil { + t.Fatalf("could not create data dir") + } + } + + tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc") + acn, err := tor.NewTorACNWithAuth("./tordir", path.Join("..", "tor"), torDataDir, controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)}) + if err != nil { + t.Fatalf("Could not start Tor: %v", err) + } + acn.WaitTillBootstrapped() + defer acn.Close() + + app := app2.NewApp(acn, "./storage", app2.LoadAppSettings("./storage")) + + usr, _ := user.Current() + cwtchDir := path.Join(usr.HomeDir, ".cwtch") + os.Mkdir(cwtchDir, 0700) + os.RemoveAll(path.Join(cwtchDir, "testing")) + os.Mkdir(path.Join(cwtchDir, "testing"), 0700) + + t.Logf("Creating Alice...") + app.CreateProfile("alice", "asdfasdf", true) + + t.Logf("Creating Bob...") + app.CreateProfile("bob", "asdfasdf", true) + + t.Logf("** Waiting for Alice, Bob...") + alice := app2.WaitGetPeer(app, "alice") + app.ActivatePeerEngine(alice.GetOnion()) + bob := app2.WaitGetPeer(app, "bob") + app.ActivatePeerEngine(bob.GetOnion()) + + alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer}) + bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer}) + + queueOracle := event.NewQueue() + app.GetEventBus(bob.GetOnion()).Subscribe(event.FileDownloaded, queueOracle) + + // Turn on File Sharing Experiment... + settings := app.ReadSettings() + settings.ExperimentsEnabled = true + settings.DownloadPath = "./download_dir" + os.RemoveAll(path.Join(settings.DownloadPath, "cwtch.png")) + os.RemoveAll(path.Join(settings.DownloadPath, "cwtch.png.manifest")) + settings.Experiments[constants.FileSharingExperiment] = true + // Turn Auto Downloading On... (Part of the Image Previews / Profile Images Experiment) + settings.Experiments[constants.ImagePreviewsExperiment] = true + app.UpdateSettings(settings) + + t.Logf("** Launching Peers...") + waitTime := time.Duration(30) * time.Second + t.Logf("** Waiting for Alice, Bob to connect with onion network... (%v)\n", waitTime) + time.Sleep(waitTime) + + bob.NewContactConversation(alice.GetOnion(), model.DefaultP2PAccessControl(), true) + alice.NewContactConversation(bob.GetOnion(), model.DefaultP2PAccessControl(), true) + alice.PeerWithOnion(bob.GetOnion()) + + t.Logf("Waiting for alice and Bob to peer...") + waitForPeerPeerConnection(t, alice, bob) + alice.AcceptConversation(1) + bob.AcceptConversation(1) + t.Logf("Alice and Bob are Connected!!") + + filesharingFunctionality := filesharing.FunctionalityGate() + + _, fileSharingMessage, err := filesharingFunctionality.ShareFile("cwtch.png", alice) + alice.SendMessage(1, fileSharingMessage) + + if err != nil { + t.Fatalf("Error!: %v", err) + } + + // test that bob can download and verify the file + // The main difference here is that bob doesn't need to do anything... + // testBobDownloadFile(t, bob, filesharingFunctionality, queueOracle) + + // Wait for say... + time.Sleep(10 * time.Second) + + if _, err := os.Stat(path.Join(settings.DownloadPath, "cwtch.png")); errors.Is(err, os.ErrNotExist) { + // path/to/whatever does not exist + t.Fatalf("cwthc.png should have been automatically downloadeded...") + } + + queueOracle.Shutdown() + app.Shutdown() + acn.Close() + time.Sleep(10 * time.Second) + numGoRoutinesPostACN := runtime.NumGoroutine() + + // Printing out the current goroutines + // Very useful if we are leaking any. + pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + + if numGoRoutinesStart != numGoRoutinesPostACN { + t.Errorf("Number of GoRoutines at start (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, leak detected!", numGoRoutinesStart, numGoRoutinesPostACN) + } + +} diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index 816290d..a7421c3 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -412,7 +412,7 @@ func TestCwtchPeerIntegration(t *testing.T) { pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) fmt.Println("") log.Infof("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+ - "numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v", + "numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v", numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect, numGoRoutinesPostAlice, numGoRoutinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown) diff --git a/testing/filesharing/file_sharing_integration_test.go b/testing/filesharing/file_sharing_integration_test.go index f151cd6..5287acb 100644 --- a/testing/filesharing/file_sharing_integration_test.go +++ b/testing/filesharing/file_sharing_integration_test.go @@ -144,7 +144,7 @@ func TestFileSharing(t *testing.T) { t.Logf("Alice and Bob are Connected!!") - filesharingFunctionality, _ := filesharing.FunctionalityGate(map[string]bool{constants.FileSharingExperiment: true}) + filesharingFunctionality := filesharing.FunctionalityGate() _, fileSharingMessage, err := filesharingFunctionality.ShareFile("cwtch.png", alice) alice.SendMessage(1, fileSharingMessage)