From 697b3df54cb21205fbebaa8ccb57de8a757ea7d2 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Thu, 5 Jan 2023 11:21:08 -0800 Subject: [PATCH 01/13] Log Errors related to Sharing Files --- functionality/filesharing/filesharing_functionality.go | 9 ++++++++- protocol/files/filesharing_subsystem.go | 1 + 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/functionality/filesharing/filesharing_functionality.go b/functionality/filesharing/filesharing_functionality.go index 60e3d18..bc14e5b 100644 --- a/functionality/filesharing/filesharing_functionality.go +++ b/functionality/filesharing/filesharing_functionality.go @@ -115,6 +115,7 @@ func (f *Functionality) RestartFileShare(profile peer.CwtchPeer, filekey string) manifest, manifestExists := profile.GetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", filekey)) if manifestExists { // everything is in order, so reshare this file with the engine + log.Debugf("restarting file share: %v", filekey) profile.ShareFile(filekey, manifest) return nil } @@ -132,6 +133,7 @@ func (f *Functionality) ReShareFiles(profile peer.CwtchPeer) error { for _, key := range keys { // only look at timestamp keys // this is an arbitrary choice + if strings.HasSuffix(key, ".ts") { _, zonedpath := attr.ParseScope(key) _, keypath := attr.ParseZone(zonedpath) @@ -148,7 +150,12 @@ func (f *Functionality) ReShareFiles(profile peer.CwtchPeer) error { // Then attempt to share this file again... // TODO: In the future this would be the point to change the timestamp and reshare the file... if err == nil && sharedFile.Active { - f.RestartFileShare(profile, filekey) + err := f.RestartFileShare(profile, filekey) + if err != nil { + log.Errorf("could not reshare file: %v", err) + } + } else { + log.Errorf("could not get fileshare info %v", err) } } } diff --git a/protocol/files/filesharing_subsystem.go b/protocol/files/filesharing_subsystem.go index 4b85b3d..2d63b81 100644 --- a/protocol/files/filesharing_subsystem.go +++ b/protocol/files/filesharing_subsystem.go @@ -34,6 +34,7 @@ func (fsss *FileSharingSubSystem) ShareFile(fileKey string, serializedManifest s log.Errorf("could not share file %v", err) return } + log.Debugf("sharing file: %v %v", fileKey, serializedManifest) fsss.activeShares.Store(fileKey, &manifest) } From 26c5c11216f8f1265ac68429d2ab957b023cb180 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Thu, 5 Jan 2023 13:52:43 -0800 Subject: [PATCH 02/13] Initial Prototype of Event Hooks --- app/app.go | 13 ++ extensions/profile_value.go | 65 ++++++ .../filesharing/filesharing_functionality.go | 131 ++++++++++- model/attr/scope.go | 6 + peer/cwtch_peer.go | 212 ++++++------------ peer/hooks.go | 49 ++++ peer/profile_interface.go | 8 +- .../file_sharing_integration_test.go | 28 ++- 8 files changed, 350 insertions(+), 162 deletions(-) create mode 100644 extensions/profile_value.go create mode 100644 peer/hooks.go diff --git a/app/app.go b/app/app.go index 0327b00..a1231b8 100644 --- a/app/app.go +++ b/app/app.go @@ -3,6 +3,8 @@ package app import ( "cwtch.im/cwtch/app/plugins" "cwtch.im/cwtch/event" + "cwtch.im/cwtch/extensions" + "cwtch.im/cwtch/functionality/filesharing" "cwtch.im/cwtch/model" "cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/model/constants" @@ -145,6 +147,7 @@ func (app *application) CreatePeer(name string, password string, attributes map[ eventBus := event.NewEventManager() app.eventBuses[profile.GetOnion()] = eventBus profile.Init(app.eventBuses[profile.GetOnion()]) + app.registerHooks(profile) app.peers[profile.GetOnion()] = profile for zp, val := range attributes { @@ -242,6 +245,13 @@ func (app *application) LoadProfiles(password string) { } } +func (app *application) registerHooks(profile peer.CwtchPeer) { + // Register Hooks + profile.RegisterHook(extensions.ProfileValueExtension{}) + profile.RegisterHook(filesharing.Functionality{}) + +} + // installProfile takes a profile and if it isn't loaded in the app, installs it and returns true func (app *application) installProfile(profile peer.CwtchPeer) bool { app.appmutex.Lock() @@ -252,8 +262,11 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool { eventBus := event.NewEventManager() app.eventBuses[profile.GetOnion()] = eventBus profile.Init(app.eventBuses[profile.GetOnion()]) + app.registerHooks(profile) app.peers[profile.GetOnion()] = profile + app.AddPeerPlugin(profile.GetOnion(), plugins.CONNECTIONRETRY) // Now Mandatory + app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False})) return true } diff --git a/extensions/profile_value.go b/extensions/profile_value.go new file mode 100644 index 0000000..86f19fd --- /dev/null +++ b/extensions/profile_value.go @@ -0,0 +1,65 @@ +package extensions + +import ( + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/model" + "cwtch.im/cwtch/model/attr" + "cwtch.im/cwtch/model/constants" + "cwtch.im/cwtch/peer" + "git.openprivacy.ca/openprivacy/log" + "strconv" +) + +// ProfileValueExtension implements custom Profile Names over Cwtch +type ProfileValueExtension struct { +} + +func (pne ProfileValueExtension) RegisterEvents() []event.Type { + return nil +} + +func (pne ProfileValueExtension) RegisterExperiments() []string { + return nil +} + +func (pne ProfileValueExtension) OnEvent(event event.Event, profile peer.CwtchPeer) { + // nop +} + +// OnContactReceiveValue for ProfileValueExtension handles saving specific Public Profile Values like Profile Name +func (pne ProfileValueExtension) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, szp attr.ScopedZonedPath, value string, exists bool) { + // Allow public profile parameters to be added as contact specific attributes... + scope, zone, _ := szp.GetScopeZonePath() + if exists && scope.IsPublic() && zone == attr.ProfileZone { + err := profile.SetConversationAttribute(conversation.ID, szp, value) + if err != nil { + log.Errorf("error setting conversation attribute %v", err) + } + } +} + +// OnContactRequestValue for ProfileValueExtension handles returning Public Profile Values +func (pne ProfileValueExtension) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, szp attr.ScopedZonedPath) { + scope, zone, zpath := szp.GetScopeZonePath() + log.Infof("Looking up public | conversation scope/zone %v", szp.ToString()) + if scope.IsPublic() || scope.IsConversation() { + val, exists := profile.GetScopedZonedAttribute(scope, zone, zpath) + + // NOTE: Temporary Override because UI currently wipes names if it can't find them... + if !exists && zone == attr.UnknownZone && zpath == constants.Name { + val, exists = profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name) + } + + // Construct a Response + resp := event.NewEvent(event.SendRetValMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversation.ID), event.RemotePeer: conversation.Handle, event.Exists: strconv.FormatBool(exists)}) + resp.EventID = eventID + if exists { + resp.Data[event.Data] = val + } else { + resp.Data[event.Data] = "" + } + + log.Debugf("Responding with SendRetValMessageToPeer exists:%v data: %v\n", exists, val) + profile.PublishEvent(resp) + } +} diff --git a/functionality/filesharing/filesharing_functionality.go b/functionality/filesharing/filesharing_functionality.go index bc14e5b..9e1ddc8 100644 --- a/functionality/filesharing/filesharing_functionality.go +++ b/functionality/filesharing/filesharing_functionality.go @@ -2,12 +2,14 @@ package filesharing import ( "crypto/rand" + "cwtch.im/cwtch/event" "encoding/hex" "encoding/json" "errors" "fmt" "io" "math" + "math/bits" "os" path "path/filepath" "regexp" @@ -28,6 +30,101 @@ import ( type Functionality struct { } +func (f Functionality) RegisterEvents() []event.Type { + return []event.Type{event.ManifestReceived, event.FileDownloaded} +} + +func (f Functionality) RegisterExperiments() []string { + return []string{constants.FileSharingExperiment} +} + +// OnEvent handles File Sharing Hooks like Manifest Received and FileDownloaded +func (f Functionality) OnEvent(ev event.Event, profile peer.CwtchPeer) { + switch ev.EventType { + case event.ManifestReceived: + log.Debugf("Manifest Received Event!: %v", ev) + handle := ev.Data[event.Handle] + fileKey := ev.Data[event.FileKey] + serializedManifest := ev.Data[event.SerializedManifest] + + manifestFilePath, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.manifest", fileKey)) + if exists { + downloadFilePath, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.path", fileKey)) + if exists { + log.Debugf("downloading manifest to %v, file to %v", manifestFilePath, downloadFilePath) + var manifest files.Manifest + err := json.Unmarshal([]byte(serializedManifest), &manifest) + + if err == nil { + // We only need to check the file size here, as manifest is sent to engine and the file created + // will be bound to the size advertised in manifest. + fileSizeLimitValue, fileSizeLimitExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.limit", fileKey)) + if fileSizeLimitExists { + fileSizeLimit, err := strconv.ParseUint(fileSizeLimitValue, 10, bits.UintSize) + if err == nil { + if manifest.FileSizeInBytes >= fileSizeLimit { + log.Errorf("could not download file, size %v greater than limit %v", manifest.FileSizeInBytes, fileSizeLimitValue) + } else { + manifest.Title = manifest.FileName + manifest.FileName = downloadFilePath + log.Debugf("saving manifest") + err = manifest.Save(manifestFilePath) + if err != nil { + log.Errorf("could not save manifest: %v", err) + } else { + tempFile := "" + if runtime.GOOS == "android" { + tempFile = manifestFilePath[0 : len(manifestFilePath)-len(".manifest")] + log.Debugf("derived android temp path: %v", tempFile) + } + profile.PublishEvent(event.NewEvent(event.ManifestSaved, map[event.Field]string{ + event.FileKey: fileKey, + event.Handle: handle, + event.SerializedManifest: string(manifest.Serialize()), + event.TempFile: tempFile, + event.NameSuggestion: manifest.Title, + })) + } + } + } + } + } else { + log.Errorf("error saving manifest: %v", err) + } + } else { + log.Errorf("found manifest path but not download path for %v", fileKey) + } + } else { + log.Errorf("no download path found for manifest: %v", fileKey) + } + case event.FileDownloaded: + fileKey := ev.Data[event.FileKey] + profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey), "true") + } +} + +func (f Functionality) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) { + // nop +} + +func (f Functionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) { + scope, zone, zpath := path.GetScopeZonePath() + log.Infof("file sharing contact receive value") + if exists && scope.IsConversation() && zone == attr.FilesharingZone && strings.HasSuffix(zpath, ".manifest.size") { + fileKey := strings.Replace(zpath, ".manifest.size", "", 1) + size, err := strconv.Atoi(value) + // if size is valid and below the maximum size for a manifest + // this is to prevent malicious sharers from using large amounts of memory when distributing + // a manifest as we reconstruct this in-memory + if err == nil && size < files.MaxManifestSize { + profile.PublishEvent(event.NewEvent(event.ManifestSizeReceived, map[event.Field]string{event.FileKey: fileKey, event.ManifestSize: value, event.Handle: conversation.Handle})) + } else { + profile.PublishEvent(event.NewEvent(event.ManifestError, map[event.Field]string{event.FileKey: fileKey, event.Handle: conversation.Handle})) + } + } + +} + // FunctionalityGate returns filesharing if enabled in the given experiment map // Note: Experiment maps are currently in libcwtch-go func FunctionalityGate(experimentMap map[string]bool) (*Functionality, error) { @@ -109,6 +206,23 @@ func (f *Functionality) DownloadFile(profile peer.CwtchPeer, conversationID int, return nil } +// startFileShare is a private method used to finalize a file share and publish it to the protocol engine for processing. +func (f *Functionality) startFileShare(profile peer.CwtchPeer, filekey string, manifest string) error { + tsStr, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.ts", filekey)) + if exists { + ts, err := strconv.ParseInt(tsStr, 10, 64) + if err != nil || ts < time.Now().Unix()-2592000 { + log.Errorf("ignoring request to download a file offered more than 30 days ago") + return err + } + } + + // set the filekey status to active + profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", filekey), constants.True) + profile.PublishEvent(event.NewEvent(event.ShareManifest, map[event.Field]string{event.FileKey: filekey, event.SerializedManifest: manifest})) + return nil +} + // RestartFileShare takes in an existing filekey and, assuming the manifest exists, restarts sharing of the manifest func (f *Functionality) RestartFileShare(profile peer.CwtchPeer, filekey string) error { // check that a manifest exists @@ -116,8 +230,7 @@ func (f *Functionality) RestartFileShare(profile peer.CwtchPeer, filekey string) if manifestExists { // everything is in order, so reshare this file with the engine log.Debugf("restarting file share: %v", filekey) - profile.ShareFile(filekey, manifest) - return nil + return f.startFileShare(profile, filekey, manifest) } return fmt.Errorf("manifest does not exist for filekey: %v", filekey) } @@ -238,7 +351,7 @@ func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer) (stri profile.SetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", key), string(serializedManifest)) profile.SetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest.size", key), strconv.Itoa(int(math.Ceil(float64(len(serializedManifest)-lenDiff)/float64(files.DefaultChunkSize))))) - profile.ShareFile(key, string(serializedManifest)) + err = f.startFileShare(profile, key, string(serializedManifest)) return key, string(wrapperJSON), err } @@ -330,3 +443,15 @@ func GenerateDownloadPath(basePath, fileName string, overwrite bool) (filePath, } return } + +// StopFileShare sends a message to the ProtocolEngine to cease sharing a particular file +func (f *Functionality) StopFileShare(profile peer.CwtchPeer, fileKey string) { + // set the filekey status to inactive + profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", fileKey), constants.False) + profile.PublishEvent(event.NewEvent(event.StopFileShare, map[event.Field]string{event.FileKey: fileKey})) +} + +// StopAllFileShares sends a message to the ProtocolEngine to cease sharing all files +func (f *Functionality) StopAllFileShares(profile peer.CwtchPeer) { + profile.PublishEvent(event.NewEvent(event.StopAllFileShares, map[event.Field]string{})) +} diff --git a/model/attr/scope.go b/model/attr/scope.go index 406d1a9..1e6810c 100644 --- a/model/attr/scope.go +++ b/model/attr/scope.go @@ -23,6 +23,12 @@ type Scope string // ScopedZonedPath typed path with a scope and a zone type ScopedZonedPath string +func (szp ScopedZonedPath) GetScopeZonePath() (Scope, Zone, string) { + scope, path := ParseScope(string(szp)) + zone, zpath := ParseZone(path) + return scope, zone, zpath +} + // scopes for attributes const ( // on a peer, local and peer supplied data diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 6f911ee..22dc43b 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -14,10 +14,8 @@ import ( "git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/connectivity/tor" "golang.org/x/crypto/ed25519" - "math/bits" "os" path "path/filepath" - "runtime" "sort" "strconv" "strings" @@ -28,7 +26,6 @@ import ( "cwtch.im/cwtch/model" "cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/protocol/connections" - "cwtch.im/cwtch/protocol/files" "git.openprivacy.ca/openprivacy/log" ) @@ -38,8 +35,7 @@ const lastReceivedSignature = "LastReceivedSignature" var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true, event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeerEngine: true, event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToPeerError: true, event.SendMessageToGroupError: true, - event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true, - event.ManifestSizeReceived: true, event.ManifestReceived: true, event.FileDownloaded: true, event.TriggerAntispamCheck: true} + event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true, event.TriggerAntispamCheck: true} // DefaultEventsToHandle specifies which events will be subscribed to // @@ -58,8 +54,6 @@ var DefaultEventsToHandle = []event.Type{ event.ServerStateChange, event.SendMessageToPeerError, event.NewRetValMessageFromPeer, - event.ManifestReceived, - event.FileDownloaded, event.TriggerAntispamCheck, } @@ -74,6 +68,25 @@ type cwtchPeer struct { queue event.Queue eventBus event.Manager + + extensions []ProfileHook + extensionLock sync.Mutex // we don't want to hold up all of cwtch for managing thread safe access to extensions +} + +func (cp *cwtchPeer) PublishEvent(resp event.Event) { + cp.eventBus.Publish(resp) +} + +func (cp *cwtchPeer) RegisterHook(extension ProfileHooks) { + cp.extensionLock.Lock() + defer cp.extensionLock.Unlock() + + // Register Requested Events + for _, event := range extension.RegisterEvents() { + cp.eventBus.Subscribe(event, cp.queue) + } + + cp.extensions = append(cp.extensions, ConstructHook(extension)) } func (cp *cwtchPeer) StoreCachedTokens(tokenServer string, tokens []*privacypass.Token) { @@ -1165,33 +1178,6 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time) return cp.storage.InsertMessage(ci.ID, 0, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message)) } -// ShareFile begins hosting the given serialized manifest -func (cp *cwtchPeer) ShareFile(fileKey string, serializedManifest string) { - tsStr, exists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.ts", fileKey)) - if exists { - ts, err := strconv.ParseInt(tsStr, 10, 64) - if err != nil || ts < time.Now().Unix()-2592000 { - log.Errorf("ignoring request to download a file offered more than 30 days ago") - return - } - } - // set the filekey status to active - cp.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", fileKey), constants.True) - cp.eventBus.Publish(event.NewEvent(event.ShareManifest, map[event.Field]string{event.FileKey: fileKey, event.SerializedManifest: serializedManifest})) -} - -// StopFileShare sends a message to the ProtocolEngine to cease sharing a particular file -func (cp *cwtchPeer) StopFileShare(fileKey string) { - // set the filekey status to inactive - cp.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", fileKey), constants.False) - cp.eventBus.Publish(event.NewEvent(event.StopFileShare, map[event.Field]string{event.FileKey: fileKey})) -} - -// StopAllFileShares sends a message to the ProtocolEngine to cease sharing all files -func (cp *cwtchPeer) StopAllFileShares() { - cp.eventBus.Publish(event.NewEvent(event.StopAllFileShares, map[event.Field]string{})) -} - // eventHandler process events from other subsystems func (cp *cwtchPeer) eventHandler() { for { @@ -1288,128 +1274,48 @@ func (cp *cwtchPeer) eventHandler() { path := ev.Data[event.Path] log.Debugf("NewGetValMessageFromPeer for %v.%v from %v\n", scope, path, onion) - conversationInfo, err := cp.FetchConversationInfo(onion) + log.Debugf("confo info lookup newgetval %v %v %v", onion, conversationInfo, err) + // only accepted contacts can look up information if conversationInfo != nil && conversationInfo.Accepted { - scope := attr.IntoScope(scope) - if scope.IsPublic() || scope.IsConversation() { - zone, zpath := attr.ParseZone(path) - val, exists := cp.GetScopedZonedAttribute(scope, zone, zpath) + // Type Safe Scoped/Zoned Path + zscope := attr.IntoScope(scope) + zone, zpath := attr.ParseZone(path) + scopedZonedPath := zscope.ConstructScopedZonedPath(zone.ConstructZonedPath(zpath)) - // NOTE: Temporary Override because UI currently wipes names if it can't find them... - if !exists && zone == attr.UnknownZone && path == constants.Name { - val, exists = cp.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name) - } - - resp := event.NewEvent(event.SendRetValMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.RemotePeer: onion, event.Exists: strconv.FormatBool(exists)}) - resp.EventID = ev.EventID - if exists { - resp.Data[event.Data] = val - } else { - resp.Data[event.Data] = "" - } - log.Debugf("Responding with SendRetValMessageToPeer exists:%v data: %v\n", exists, val) - - cp.eventBus.Publish(resp) + // Safe Access to Extensions + cp.extensionLock.Lock() + log.Infof("checking extension...%v", cp.extensions) + for _, extension := range cp.extensions { + log.Infof("checking extension...%v", extension) + extension.extension.OnContactRequestValue(cp, *conversationInfo, ev.EventID, scopedZonedPath) } + cp.extensionLock.Unlock() + } - - case event.ManifestReceived: - log.Debugf("Manifest Received Event!: %v", ev) - handle := ev.Data[event.Handle] - fileKey := ev.Data[event.FileKey] - serializedManifest := ev.Data[event.SerializedManifest] - - manifestFilePath, exists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.manifest", fileKey)) - if exists { - downloadFilePath, exists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.path", fileKey)) - if exists { - log.Debugf("downloading manifest to %v, file to %v", manifestFilePath, downloadFilePath) - var manifest files.Manifest - err := json.Unmarshal([]byte(serializedManifest), &manifest) - - if err == nil { - // We only need to check the file size here, as manifest is sent to engine and the file created - // will be bound to the size advertised in manifest. - fileSizeLimitValue, fileSizeLimitExists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.limit", fileKey)) - if fileSizeLimitExists { - fileSizeLimit, err := strconv.ParseUint(fileSizeLimitValue, 10, bits.UintSize) - if err == nil { - if manifest.FileSizeInBytes >= fileSizeLimit { - log.Errorf("could not download file, size %v greater than limit %v", manifest.FileSizeInBytes, fileSizeLimitValue) - } else { - manifest.Title = manifest.FileName - manifest.FileName = downloadFilePath - log.Debugf("saving manifest") - err = manifest.Save(manifestFilePath) - if err != nil { - log.Errorf("could not save manifest: %v", err) - } else { - tempFile := "" - if runtime.GOOS == "android" { - tempFile = manifestFilePath[0 : len(manifestFilePath)-len(".manifest")] - log.Debugf("derived android temp path: %v", tempFile) - } - cp.eventBus.Publish(event.NewEvent(event.ManifestSaved, map[event.Field]string{ - event.FileKey: fileKey, - event.Handle: handle, - event.SerializedManifest: string(manifest.Serialize()), - event.TempFile: tempFile, - event.NameSuggestion: manifest.Title, - })) - } - } - } - } - } else { - log.Errorf("error saving manifest: %v", err) - } - } else { - log.Errorf("found manifest path but not download path for %v", fileKey) - } - } else { - log.Errorf("no download path found for manifest: %v", fileKey) - } - case event.FileDownloaded: - fileKey := ev.Data[event.FileKey] - cp.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey), "true") case event.NewRetValMessageFromPeer: - onion := ev.Data[event.RemotePeer] + handle := ev.Data[event.RemotePeer] scope := ev.Data[event.Scope] path := ev.Data[event.Path] val := ev.Data[event.Data] exists, _ := strconv.ParseBool(ev.Data[event.Exists]) - log.Debugf("NewRetValMessageFromPeer %v %v %v %v %v\n", onion, scope, path, exists, val) - if exists { + log.Debugf("NewRetValMessageFromPeer %v %v %v %v %v\n", handle, scope, path, exists, val) - // Handle File Sharing Metadata - // TODO This probably should be broken out to it's own code.. - zone, path := attr.ParseZone(path) - if attr.Scope(scope).IsConversation() && zone == attr.FilesharingZone && strings.HasSuffix(path, ".manifest.size") { - fileKey := strings.Replace(path, ".manifest.size", "", 1) - size, err := strconv.Atoi(val) - // if size is valid and below the maximum size for a manifest - // this is to prevent malicious sharers from using large amounts of memory when distributing - // a manifest as we reconstruct this in-memory - if err == nil && size < files.MaxManifestSize { - cp.eventBus.Publish(event.NewEvent(event.ManifestSizeReceived, map[event.Field]string{event.FileKey: fileKey, event.ManifestSize: val, event.Handle: onion})) - } else { - cp.eventBus.Publish(event.NewEvent(event.ManifestError, map[event.Field]string{event.FileKey: fileKey, event.Handle: onion})) - } - } + conversationInfo, _ := cp.FetchConversationInfo(handle) + // only accepted contacts can look up information + if conversationInfo != nil && conversationInfo.Accepted { + // Type Safe Scoped/Zoned Path + zscope := attr.IntoScope(scope) + zone, zpath := attr.ParseZone(path) + scopedZonedPath := zscope.ConstructScopedZonedPath(zone.ConstructZonedPath(zpath)) - // Allow public profile parameters to be added as peer specific attributes... - if attr.Scope(scope).IsPublic() && zone == attr.ProfileZone { - ci, err := cp.FetchConversationInfo(onion) - log.Debugf("fetch conversation info %v %v", ci, err) - if ci != nil && err == nil { - err := cp.SetConversationAttribute(ci.ID, attr.Scope(scope).ConstructScopedZonedPath(zone.ConstructZonedPath(path)), val) - if err != nil { - log.Errorf("error setting conversation attribute %v", err) - } - } + // Safe Access to Extensions + cp.extensionLock.Lock() + for _, extension := range cp.extensions { + extension.extension.OnContactReceiveValue(cp, *conversationInfo, scopedZonedPath, val, exists) } + cp.extensionLock.Unlock() } case event.PeerStateChange: handle := ev.Data[event.RemotePeer] @@ -1491,10 +1397,26 @@ func (cp *cwtchPeer) eventHandler() { } } default: - if ev.EventType != "" { - log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType) + // invalid event, signifies shutdown + if ev.EventType == "" { + return + } + + // Otherwise, obtain Safe Access to Extensions + processed := false + cp.extensionLock.Lock() + for _, extension := range cp.extensions { + // if the extension is registered for this event type then process + if _, contains := extension.events[ev.EventType]; contains { + extension.extension.OnEvent(ev, cp) + processed = true + } + } + cp.extensionLock.Unlock() + + if !processed { + log.Errorf("cwtch profile received an event that it (or an extension) was unable to handle. this is very likely a programming error: %v", ev.EventType) } - return } } } diff --git a/peer/hooks.go b/peer/hooks.go new file mode 100644 index 0000000..d061c27 --- /dev/null +++ b/peer/hooks.go @@ -0,0 +1,49 @@ +package peer + +import ( + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/model" + "cwtch.im/cwtch/model/attr" +) + +type ProfileHooks interface { + + // RegisterEvents returns a set of events that the extension is interested hooking + RegisterEvents() []event.Type + + // RegisterExperiments RegisterExperiments returns a set of experiments that the extension is interested in being notified about + RegisterExperiments() []string + + // OnEvent is called whenever an event Registered with RegisterEvents is called + OnEvent(event event.Event, profile CwtchPeer) + + // OnContactRequestValue is Hooked when a contact sends a request for the given path + OnContactRequestValue(profile CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) + + // OnContactReceiveValue is Hooked after a profile receives a response to a Get/Val Request + OnContactReceiveValue(profile CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) +} + +type ProfileHook struct { + extension ProfileHooks + events map[event.Type]bool + experiments map[string]bool +} + +func ConstructHook(extension ProfileHooks) ProfileHook { + events := make(map[event.Type]bool) + for _, event := range extension.RegisterEvents() { + events[event] = true + } + + experiments := make(map[string]bool) + for _, experiment := range extension.RegisterExperiments() { + experiments[experiment] = true + } + + return ProfileHook{ + extension, + events, + experiments, + } +} diff --git a/peer/profile_interface.go b/peer/profile_interface.go index 493bc3e..c9fa835 100644 --- a/peer/profile_interface.go +++ b/peer/profile_interface.go @@ -121,12 +121,6 @@ type CwtchPeer interface { GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error - // File Sharing APIS - // TODO move these to feature protected interfaces - ShareFile(fileKey string, serializedManifest string) - StopFileShare(fileKey string) - StopAllFileShares() - // Server Token APIS // TODO move these to feature protected interfaces StoreCachedTokens(tokenServer string, tokens []*privacypass.Token) @@ -136,4 +130,6 @@ type CwtchPeer interface { ChangePassword(oldpassword string, newpassword string, newpasswordAgain string) error Export(file string) error Delete() + PublishEvent(resp event.Event) + RegisterHook(hook ProfileHooks) } diff --git a/testing/filesharing/file_sharing_integration_test.go b/testing/filesharing/file_sharing_integration_test.go index b266847..2574be0 100644 --- a/testing/filesharing/file_sharing_integration_test.go +++ b/testing/filesharing/file_sharing_integration_test.go @@ -18,6 +18,7 @@ import ( "fmt" "git.openprivacy.ca/openprivacy/connectivity/tor" "git.openprivacy.ca/openprivacy/log" + "path/filepath" // Import SQL Cipher mrand "math/rand" @@ -56,7 +57,7 @@ func TestFileSharing(t *testing.T) { os.RemoveAll("cwtch.out.png") os.RemoveAll("cwtch.out.png.manifest") - log.SetLevel(log.LevelInfo) + log.SetLevel(log.LevelDebug) os.Mkdir("tordir", 0700) dataDir := path.Join("tordir", "tor") @@ -74,13 +75,22 @@ func TestFileSharing(t *testing.T) { panic(err) } + useCache := os.Getenv("TORCACHE") == "true" + torDataDir := "" - if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil { - t.Fatalf("could not create data dir") + if useCache { + log.Infof("using tor cache") + torDataDir = filepath.Join(dataDir, "data-dir-torcache") + os.MkdirAll(torDataDir, 0700) + } else { + log.Infof("using clean tor data dir") + if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil { + t.Fatalf("could not create data dir") + } } tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc") - acn, err := tor.NewTorACNWithAuth("./tordir", path.Join("..", "..", "tor"), torDataDir, controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)}) + acn, err := tor.NewTorACNWithAuth("./tordir", path.Join("..", "tor"), torDataDir, controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)}) if err != nil { t.Fatalf("Could not start Tor: %v", err) } @@ -125,6 +135,7 @@ func TestFileSharing(t *testing.T) { t.Logf("Waiting for alice and Bob to peer...") waitForPeerPeerConnection(t, alice, bob) + alice.AcceptConversation(1) t.Logf("Alice and Bob are Connected!!") @@ -145,7 +156,7 @@ func TestFileSharing(t *testing.T) { // Test stopping and restarting file shares t.Logf("Stopping File Share") - alice.StopAllFileShares() + filesharingFunctionality.StopAllFileShares(alice) // Allow time for the stop request to filter through Engine time.Sleep(time.Second * 5) @@ -181,6 +192,7 @@ func testBobDownloadFile(t *testing.T, bob peer.CwtchPeer, filesharingFunctional os.RemoveAll("cwtch.out.png") os.RemoveAll("cwtch.out.png.manifest") + bob.AcceptConversation(1) message, _, err := bob.GetChannelMessage(1, 0, 1) if err != nil { t.Fatalf("could not find file sharing message: %v", err) @@ -194,19 +206,19 @@ func testBobDownloadFile(t *testing.T, bob peer.CwtchPeer, filesharingFunctional err := json.Unmarshal([]byte(messageWrapper.Data), &fileMessageOverlay) if err == nil { - + t.Logf("bob attempting to download file with invalid download") // try downloading with invalid download dir err = filesharingFunctionality.DownloadFile(bob, 1, "/do/not/download/this/file/cwtch.out.png", "./cwtch.out.png.manifest", fmt.Sprintf("%s.%s", fileMessageOverlay.Hash, fileMessageOverlay.Nonce), constants.ImagePreviewMaxSizeInBytes) if err == nil { t.Fatalf("should not download file with invalid download dir") } - + t.Logf("bob attempting to download file with invalid manifest") // try downloading with invalid manifest dir err = filesharingFunctionality.DownloadFile(bob, 1, "./cwtch.out.png", "/do/not/download/this/file/cwtch.out.png.manifest", fmt.Sprintf("%s.%s", fileMessageOverlay.Hash, fileMessageOverlay.Nonce), constants.ImagePreviewMaxSizeInBytes) if err == nil { t.Fatalf("should not download file with invalid manifest dir") } - + t.Logf("bob attempting to download file") err = filesharingFunctionality.DownloadFile(bob, 1, "./cwtch.out.png", "./cwtch.out.png.manifest", fmt.Sprintf("%s.%s", fileMessageOverlay.Hash, fileMessageOverlay.Nonce), constants.ImagePreviewMaxSizeInBytes) if err != nil { t.Fatalf("could not download file: %v", err) From f246ea1e40f31eae1776d9002d94f9a34287e0b2 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Wed, 25 Jan 2023 12:32:26 -0800 Subject: [PATCH 03/13] FileSharing Experiments / Move Experiment Handling to App and Cwtch Peer --- app/app.go | 38 ++++- app/app_constants.go | 6 + app/plugins/antispam.go | 2 +- app/settings.go | 146 +++++++++++++++++ extensions/profile_value.go | 2 +- .../filesharing/filesharing_functionality.go | 151 +++++++++++------- model/constants/experiments.go | 2 + model/experiments.go | 32 ++++ peer/cwtch_peer.go | 59 ++++++- peer/cwtchprofilestorage.go | 6 +- peer/hooks.go | 3 +- peer/profile_interface.go | 2 + peer/storage.go | 2 +- storage/v1/profile_store.go | 2 +- storage/v1/stream_store.go | 2 +- .../file_sharing_integration_test.go | 15 +- 16 files changed, 391 insertions(+), 79 deletions(-) create mode 100644 app/app_constants.go create mode 100644 app/settings.go create mode 100644 model/experiments.go diff --git a/app/app.go b/app/app.go index a1231b8..32b5e33 100644 --- a/app/app.go +++ b/app/app.go @@ -31,6 +31,8 @@ type application struct { engines map[string]connections.Engine appBus event.Manager appmutex sync.Mutex + + settings *GlobalSettingsFile } // Application is a full cwtch peer application. It allows management, usage and storage of multiple peers @@ -52,6 +54,9 @@ type Application interface { ActivatePeerEngine(onion string, doListen, doPeers, doServers bool) DeactivatePeerEngine(onion string) + ReadSettings() GlobalSettings + UpdateSettings(settings GlobalSettings) + ShutdownPeer(string) Shutdown() @@ -67,7 +72,15 @@ func NewApp(acn connectivity.ACN, appDirectory string) Application { log.Debugf("NewApp(%v)\n", appDirectory) os.MkdirAll(path.Join(appDirectory, "profiles"), 0700) - app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager()} + // Note: we basically presume this doesn't fail. If the file doesn't exist we create it, and as such the + // only plausible error conditions are related to file create e.g. low disk space. If that is the case then + // many other parts of Cwtch are likely to fail also. + settings, err := InitGlobalSettingsFile(appDirectory, DefactoPasswordForUnencryptedProfiles) + if err != nil { + log.Errorf("error initializing global settings file %. Global settings might not be loaded or saves", err) + } + + app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager(), settings: settings} app.peers = make(map[string]peer.CwtchPeer) app.acn = acn @@ -80,6 +93,26 @@ func NewApp(acn connectivity.ACN, appDirectory string) Application { return app } +func (app *application) ReadSettings() GlobalSettings { + app.appmutex.Lock() + defer app.appmutex.Unlock() + return app.settings.ReadGlobalSettings() +} + +func (app *application) UpdateSettings(settings GlobalSettings) { + // don't allow any other application changes while settings update + app.appmutex.Lock() + defer app.appmutex.Unlock() + app.settings.WriteGlobalSettings(settings) + + // we now need to propagate changes to all peers + app.peerLock.Lock() + defer app.peerLock.Unlock() + for _, profile := range app.peers { + profile.UpdateExperiments(settings.ExperimentsEnabled, settings.Experiments) + } +} + // ListProfiles returns a map of onions to their profile's Name func (app *application) ListProfiles() []string { var keys []string @@ -160,7 +193,7 @@ func (app *application) CreatePeer(name string, password string, attributes map[ } func (app *application) DeletePeer(onion string, password string) { - log.Infof("DeletePeer called on %v\n", onion) + log.Debugf("DeletePeer called on %v\n", onion) app.appmutex.Lock() defer app.appmutex.Unlock() @@ -249,7 +282,6 @@ func (app *application) registerHooks(profile peer.CwtchPeer) { // Register Hooks profile.RegisterHook(extensions.ProfileValueExtension{}) profile.RegisterHook(filesharing.Functionality{}) - } // installProfile takes a profile and if it isn't loaded in the app, installs it and returns true diff --git a/app/app_constants.go b/app/app_constants.go new file mode 100644 index 0000000..f564305 --- /dev/null +++ b/app/app_constants.go @@ -0,0 +1,6 @@ +package app + +// We offer "un-passworded" profiles but our storage encrypts everything with a password. We need an agreed upon +// password to use in that case, that the app case use behind the scenes to password and unlock with +// https://docs.openprivacy.ca/cwtch-security-handbook/profile_encryption_and_storage.html +const DefactoPasswordForUnencryptedProfiles = "be gay do crime" diff --git a/app/plugins/antispam.go b/app/plugins/antispam.go index 6fb1195..c2f309b 100644 --- a/app/plugins/antispam.go +++ b/app/plugins/antispam.go @@ -27,7 +27,7 @@ func (a antispam) Shutdown() { } func (a *antispam) run() { - log.Infof("running antispam trigger plugin") + log.Debugf("running antispam trigger plugin") for { select { case <-time.After(antispamTickTime): diff --git a/app/settings.go b/app/settings.go new file mode 100644 index 0000000..baf7f13 --- /dev/null +++ b/app/settings.go @@ -0,0 +1,146 @@ +package app + +import ( + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/model/constants" + "cwtch.im/cwtch/storage/v1" + "encoding/json" + "git.openprivacy.ca/openprivacy/log" + "os" + path "path/filepath" +) + +const ( + CwtchStarted = event.Type("CwtchStarted") + CwtchStartError = event.Type("CwtchStartError") + UpdateGlobalSettings = event.Type("UpdateGlobalSettings") +) + +const GlobalSettingsFilename = "ui.globals" +const saltFile = "SALT" + +type NotificationPolicy string + +const ( + NotificationPolicyMute = NotificationPolicy("NotificationPolicy.Mute") + NotificationPolicyOptIn = NotificationPolicy("NotificationPolicy.OptIn") + NotificationPolicyDefaultAll = NotificationPolicy("NotificationPolicy.DefaultAll") +) + +type GlobalSettingsFile struct { + v1.FileStore +} + +type GlobalSettings struct { + Locale string + Theme string + ThemeMode string + PreviousPid int64 + ExperimentsEnabled bool + Experiments map[string]bool + BlockUnknownConnections bool + NotificationPolicy NotificationPolicy + NotificationContent string + StreamerMode bool + StateRootPane int + FirstTime bool + UIColumnModePortrait string + UIColumnModeLandscape string + DownloadPath string + AllowAdvancedTorConfig bool + CustomTorrc string + UseCustomTorrc bool + UseExternalTor bool + CustomSocksPort int + CustomControlPort int + UseTorCache bool + TorCacheDir string +} + +var DefaultGlobalSettings = GlobalSettings{ + Locale: "en", + Theme: "dark", + PreviousPid: -1, + ExperimentsEnabled: false, + Experiments: map[string]bool{constants.MessageFormattingExperiment: true}, + StateRootPane: 0, + FirstTime: true, + BlockUnknownConnections: false, + StreamerMode: false, + UIColumnModePortrait: "DualpaneMode.Single", + UIColumnModeLandscape: "DualpaneMode.CopyPortrait", + NotificationPolicy: "NotificationPolicy.Mute", + NotificationContent: "NotificationContent.SimpleEvent", + DownloadPath: "", + AllowAdvancedTorConfig: false, + CustomTorrc: "", + UseCustomTorrc: false, + CustomSocksPort: -1, + CustomControlPort: -1, + UseTorCache: false, + TorCacheDir: "", +} + +func InitGlobalSettingsFile(directory string, password string) (*GlobalSettingsFile, error) { + var key [32]byte + salt, err := os.ReadFile(path.Join(directory, saltFile)) + if err != nil { + log.Infof("Could not find salt file: %v (creating a new settings file)", err) + var newSalt [128]byte + key, newSalt, err = v1.CreateKeySalt(password) + if err != nil { + log.Errorf("Could not initialize salt: %v", err) + return nil, err + } + os.Mkdir(directory, 0700) + err := os.WriteFile(path.Join(directory, saltFile), newSalt[:], 0600) + if err != nil { + log.Errorf("Could not write salt file: %v", err) + return nil, err + } + } else { + key = v1.CreateKey(password, salt) + } + + gsFile := v1.NewFileStore(directory, GlobalSettingsFilename, key) + log.Infof("initialized global settings file: %v", gsFile) + globalSettingsFile := GlobalSettingsFile{ + gsFile, + } + return &globalSettingsFile, nil +} + +func (globalSettingsFile *GlobalSettingsFile) ReadGlobalSettings() GlobalSettings { + settings := DefaultGlobalSettings + + if globalSettingsFile == nil { + log.Errorf("Global Settings File was not Initialized Properly") + return settings + } + + settingsBytes, err := globalSettingsFile.Read() + if err != nil { + log.Infof("Could not read global ui settings: %v (assuming this is a first time app deployment...)", err) + return settings //firstTime = true + } + + err = json.Unmarshal(settingsBytes, &settings) + if err != nil { + log.Errorf("Could not parse global ui settings: %v\n", err) + // TODO if settings is corrupted, we probably want to alert the UI. + return settings //firstTime = true + } + + log.Debugf("Settings: %#v", settings) + return settings +} + +func (globalSettingsFile *GlobalSettingsFile) WriteGlobalSettings(globalSettings GlobalSettings) { + bytes, _ := json.Marshal(globalSettings) + // override first time setting + globalSettings.FirstTime = true + err := globalSettingsFile.Write(bytes) + if err != nil { + log.Errorf("Could not write global ui settings: %v\n", err) + } +} diff --git a/extensions/profile_value.go b/extensions/profile_value.go index 86f19fd..1a5327f 100644 --- a/extensions/profile_value.go +++ b/extensions/profile_value.go @@ -41,7 +41,7 @@ func (pne ProfileValueExtension) OnContactReceiveValue(profile peer.CwtchPeer, c // OnContactRequestValue for ProfileValueExtension handles returning Public Profile Values func (pne ProfileValueExtension) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, szp attr.ScopedZonedPath) { scope, zone, zpath := szp.GetScopeZonePath() - log.Infof("Looking up public | conversation scope/zone %v", szp.ToString()) + log.Debugf("Looking up public | conversation scope/zone %v", szp.ToString()) if scope.IsPublic() || scope.IsConversation() { val, exists := profile.GetScopedZonedAttribute(scope, zone, zpath) diff --git a/functionality/filesharing/filesharing_functionality.go b/functionality/filesharing/filesharing_functionality.go index 9e1ddc8..1f6625a 100644 --- a/functionality/filesharing/filesharing_functionality.go +++ b/functionality/filesharing/filesharing_functionality.go @@ -40,66 +40,70 @@ func (f Functionality) RegisterExperiments() []string { // OnEvent handles File Sharing Hooks like Manifest Received and FileDownloaded func (f Functionality) OnEvent(ev event.Event, profile peer.CwtchPeer) { - switch ev.EventType { - case event.ManifestReceived: - log.Debugf("Manifest Received Event!: %v", ev) - handle := ev.Data[event.Handle] - fileKey := ev.Data[event.FileKey] - serializedManifest := ev.Data[event.SerializedManifest] + if profile.IsFeatureEnabled(constants.FileSharingExperiment) { + switch ev.EventType { + case event.ManifestReceived: + log.Debugf("Manifest Received Event!: %v", ev) + handle := ev.Data[event.Handle] + fileKey := ev.Data[event.FileKey] + serializedManifest := ev.Data[event.SerializedManifest] - manifestFilePath, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.manifest", fileKey)) - if exists { - downloadFilePath, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.path", fileKey)) + manifestFilePath, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.manifest", fileKey)) if exists { - log.Debugf("downloading manifest to %v, file to %v", manifestFilePath, downloadFilePath) - var manifest files.Manifest - err := json.Unmarshal([]byte(serializedManifest), &manifest) + downloadFilePath, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.path", fileKey)) + if exists { + log.Debugf("downloading manifest to %v, file to %v", manifestFilePath, downloadFilePath) + var manifest files.Manifest + err := json.Unmarshal([]byte(serializedManifest), &manifest) - if err == nil { - // We only need to check the file size here, as manifest is sent to engine and the file created - // will be bound to the size advertised in manifest. - fileSizeLimitValue, fileSizeLimitExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.limit", fileKey)) - if fileSizeLimitExists { - fileSizeLimit, err := strconv.ParseUint(fileSizeLimitValue, 10, bits.UintSize) - if err == nil { - if manifest.FileSizeInBytes >= fileSizeLimit { - log.Errorf("could not download file, size %v greater than limit %v", manifest.FileSizeInBytes, fileSizeLimitValue) - } else { - manifest.Title = manifest.FileName - manifest.FileName = downloadFilePath - log.Debugf("saving manifest") - err = manifest.Save(manifestFilePath) - if err != nil { - log.Errorf("could not save manifest: %v", err) + if err == nil { + // We only need to check the file size here, as manifest is sent to engine and the file created + // will be bound to the size advertised in manifest. + fileSizeLimitValue, fileSizeLimitExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.limit", fileKey)) + if fileSizeLimitExists { + fileSizeLimit, err := strconv.ParseUint(fileSizeLimitValue, 10, bits.UintSize) + if err == nil { + if manifest.FileSizeInBytes >= fileSizeLimit { + log.Errorf("could not download file, size %v greater than limit %v", manifest.FileSizeInBytes, fileSizeLimitValue) } else { - tempFile := "" - if runtime.GOOS == "android" { - tempFile = manifestFilePath[0 : len(manifestFilePath)-len(".manifest")] - log.Debugf("derived android temp path: %v", tempFile) + manifest.Title = manifest.FileName + manifest.FileName = downloadFilePath + log.Debugf("saving manifest") + err = manifest.Save(manifestFilePath) + if err != nil { + log.Errorf("could not save manifest: %v", err) + } else { + tempFile := "" + if runtime.GOOS == "android" { + tempFile = manifestFilePath[0 : len(manifestFilePath)-len(".manifest")] + log.Debugf("derived android temp path: %v", tempFile) + } + profile.PublishEvent(event.NewEvent(event.ManifestSaved, map[event.Field]string{ + event.FileKey: fileKey, + event.Handle: handle, + event.SerializedManifest: string(manifest.Serialize()), + event.TempFile: tempFile, + event.NameSuggestion: manifest.Title, + })) } - profile.PublishEvent(event.NewEvent(event.ManifestSaved, map[event.Field]string{ - event.FileKey: fileKey, - event.Handle: handle, - event.SerializedManifest: string(manifest.Serialize()), - event.TempFile: tempFile, - event.NameSuggestion: manifest.Title, - })) } } } + } else { + log.Errorf("error saving manifest: %v", err) } } else { - log.Errorf("error saving manifest: %v", err) + log.Errorf("found manifest path but not download path for %v", fileKey) } } else { - log.Errorf("found manifest path but not download path for %v", fileKey) + log.Errorf("no download path found for manifest: %v", fileKey) } - } else { - log.Errorf("no download path found for manifest: %v", fileKey) + case event.FileDownloaded: + fileKey := ev.Data[event.FileKey] + profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey), "true") } - case event.FileDownloaded: - fileKey := ev.Data[event.FileKey] - profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey), "true") + } else { + log.Errorf("profile called filesharing experiment OnContactReceiveValue even though file sharing was not enabled. This is likely a programming error.") } } @@ -108,21 +112,25 @@ func (f Functionality) OnContactRequestValue(profile peer.CwtchPeer, conversatio } func (f Functionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) { - scope, zone, zpath := path.GetScopeZonePath() - log.Infof("file sharing contact receive value") - if exists && scope.IsConversation() && zone == attr.FilesharingZone && strings.HasSuffix(zpath, ".manifest.size") { - fileKey := strings.Replace(zpath, ".manifest.size", "", 1) - size, err := strconv.Atoi(value) - // if size is valid and below the maximum size for a manifest - // this is to prevent malicious sharers from using large amounts of memory when distributing - // a manifest as we reconstruct this in-memory - if err == nil && size < files.MaxManifestSize { - profile.PublishEvent(event.NewEvent(event.ManifestSizeReceived, map[event.Field]string{event.FileKey: fileKey, event.ManifestSize: value, event.Handle: conversation.Handle})) - } else { - profile.PublishEvent(event.NewEvent(event.ManifestError, map[event.Field]string{event.FileKey: fileKey, event.Handle: conversation.Handle})) + // Profile should not call us if FileSharing is disabled + if profile.IsFeatureEnabled(constants.FileSharingExperiment) { + scope, zone, zpath := path.GetScopeZonePath() + log.Debugf("file sharing contact receive value") + if exists && scope.IsConversation() && zone == attr.FilesharingZone && strings.HasSuffix(zpath, ".manifest.size") { + fileKey := strings.Replace(zpath, ".manifest.size", "", 1) + size, err := strconv.Atoi(value) + // if size is valid and below the maximum size for a manifest + // this is to prevent malicious sharers from using large amounts of memory when distributing + // a manifest as we reconstruct this in-memory + if err == nil && size < files.MaxManifestSize { + profile.PublishEvent(event.NewEvent(event.ManifestSizeReceived, map[event.Field]string{event.FileKey: fileKey, event.ManifestSize: value, event.Handle: conversation.Handle})) + } else { + profile.PublishEvent(event.NewEvent(event.ManifestError, map[event.Field]string{event.FileKey: fileKey, event.Handle: conversation.Handle})) + } } + } else { + log.Errorf("profile called filesharing experiment OnContactReceiveValue even though file sharing was not enabled. This is likely a programming error.") } - } // FunctionalityGate returns filesharing if enabled in the given experiment map @@ -174,6 +182,11 @@ func (om *OverlayMessage) ShouldAutoDL() bool { // to downloadFilePath func (f *Functionality) DownloadFile(profile peer.CwtchPeer, conversationID int, downloadFilePath string, manifestFilePath string, key string, limit uint64) error { + // assert that we are allowed to download the file + if !profile.IsFeatureEnabled(constants.FileSharingExperiment) { + return errors.New("filesharing functionality is not enabled") + } + // Don't download files if the download or manifest path is not set if downloadFilePath == "" || manifestFilePath == "" { return errors.New("download path or manifest path is empty") @@ -225,6 +238,12 @@ func (f *Functionality) startFileShare(profile peer.CwtchPeer, filekey string, m // RestartFileShare takes in an existing filekey and, assuming the manifest exists, restarts sharing of the manifest func (f *Functionality) RestartFileShare(profile peer.CwtchPeer, filekey string) error { + + // assert that we are allowed to restart filesharing + if !profile.IsFeatureEnabled(constants.FileSharingExperiment) { + return errors.New("filesharing functionality is not enabled") + } + // check that a manifest exists manifest, manifestExists := profile.GetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", filekey)) if manifestExists { @@ -238,6 +257,12 @@ func (f *Functionality) RestartFileShare(profile peer.CwtchPeer, filekey string) // ReShareFiles given a profile we iterate through all existing fileshares and re-share them // if the time limit has not expired func (f *Functionality) ReShareFiles(profile peer.CwtchPeer) error { + + // assert that we are allowed to restart filesharing + if !profile.IsFeatureEnabled(constants.FileSharingExperiment) { + return errors.New("filesharing functionality is not enabled") + } + keys, err := profile.GetScopedZonedAttributeKeys(attr.LocalScope, attr.FilesharingZone) if err != nil { return err @@ -304,6 +329,12 @@ func (f *Functionality) GetFileShareInfo(profile peer.CwtchPeer, filekey string) // ShareFile given a profile and a conversation handle, sets up a file sharing process to share the file // at filepath func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer) (string, string, error) { + + // assert that we are allowed to share files + if !profile.IsFeatureEnabled(constants.FileSharingExperiment) { + return "", "", errors.New("filesharing functionality is not enabled") + } + manifest, err := files.CreateManifest(filepath) if err != nil { return "", "", err @@ -446,6 +477,7 @@ func GenerateDownloadPath(basePath, fileName string, overwrite bool) (filePath, // StopFileShare sends a message to the ProtocolEngine to cease sharing a particular file func (f *Functionality) StopFileShare(profile peer.CwtchPeer, fileKey string) { + // Note we do not do a permissions check here, as we are *always* permitted to stop sharing files. // set the filekey status to inactive profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", fileKey), constants.False) profile.PublishEvent(event.NewEvent(event.StopFileShare, map[event.Field]string{event.FileKey: fileKey})) @@ -453,5 +485,6 @@ func (f *Functionality) StopFileShare(profile peer.CwtchPeer, fileKey string) { // StopAllFileShares sends a message to the ProtocolEngine to cease sharing all files func (f *Functionality) StopAllFileShares(profile peer.CwtchPeer) { + // Note we do not do a permissions check here, as we are *always* permitted to stop sharing files. profile.PublishEvent(event.NewEvent(event.StopAllFileShares, map[event.Field]string{})) } diff --git a/model/constants/experiments.go b/model/constants/experiments.go index 665a856..95486ff 100644 --- a/model/constants/experiments.go +++ b/model/constants/experiments.go @@ -10,5 +10,7 @@ const ImagePreviewsExperiment = "filesharing-images" // ImagePreviewMaxSizeInBytes Files up to this size will be autodownloaded using ImagePreviewsExperiment const ImagePreviewMaxSizeInBytes = 20971520 +const MessageFormattingExperiment = "message-formatting" + // AutoDLFileExts Files with these extensions will be autodownloaded using ImagePreviewsExperiment var AutoDLFileExts = [...]string{".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"} diff --git a/model/experiments.go b/model/experiments.go new file mode 100644 index 0000000..36ca753 --- /dev/null +++ b/model/experiments.go @@ -0,0 +1,32 @@ +package model + +// Experiments are optional functionality that can be enabled/disabled by an application either completely or individually. +// examples of experiments include File Sharing, Profile Images and Groups. +type Experiments struct { + enabled bool + experiments map[string]bool +} + +// InitExperiments encapsulates a set of experiments separate from their storage in GlobalSettings. +func InitExperiments(enabled bool, experiments map[string]bool) Experiments { + return Experiments{ + enabled: enabled, + experiments: experiments, + } +} + +// IsEnabled is a convenience function that takes in an experiment and returns true if it is enabled. Experiments +// are only enabled if both global experiments are turned on and if the specific experiment is also turned on. +// The one exception to this is experiments that have been promoted to default functionality which may be turned on +// even if experiments turned off globally. These experiments are defined by DefaultEnabledFunctionality. +func (e *Experiments) IsEnabled(experiment string) bool { + if !e.enabled { + // todo handle default-enabled functionality + return false + } + enabled, exists := e.experiments[experiment] + if !exists { + return false + } + return enabled +} diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 22dc43b..e62c2bd 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -69,11 +69,30 @@ type cwtchPeer struct { queue event.Queue eventBus event.Manager - extensions []ProfileHook - extensionLock sync.Mutex // we don't want to hold up all of cwtch for managing thread safe access to extensions + extensions []ProfileHook + extensionLock sync.Mutex // we don't want to hold up all of cwtch for managing thread safe access to extensions + experiments model.Experiments + experimentsLock sync.Mutex +} + +// IsFeatureEnabled returns true if the functionality defined by featureName has been enabled by the application, false otherwise. +// this function is intended to be used by ProfileHooks to determine if they should execute experimental functionality. +func (cp *cwtchPeer) IsFeatureEnabled(featureName string) bool { + cp.experimentsLock.Lock() + defer cp.experimentsLock.Unlock() + return cp.experiments.IsEnabled(featureName) +} + +// UpdateExperiments notifies a Cwtch profile of a change in the nature of global experiments. The Cwtch Profile uses +// this information to update registered extensions. +func (cp *cwtchPeer) UpdateExperiments(enabled bool, experiments map[string]bool) { + cp.experimentsLock.Lock() + defer cp.experimentsLock.Unlock() + cp.experiments = model.InitExperiments(enabled, experiments) } func (cp *cwtchPeer) PublishEvent(resp event.Event) { + log.Debugf("Publishing Event: %v %v", resp.EventType, resp.Data) cp.eventBus.Publish(resp) } @@ -154,7 +173,7 @@ func (cp *cwtchPeer) ChangePassword(password string, newpassword string, newpass // probably redundant but we like api safety if newpassword == newpasswordAgain { rekey := createKey(newpassword, salt) - log.Infof("rekeying database...") + log.Debugf("rekeying database...") return cp.storage.Rekey(rekey) } return errors.New(constants.PasswordsDoNotMatchError) @@ -1286,9 +1305,15 @@ func (cp *cwtchPeer) eventHandler() { // Safe Access to Extensions cp.extensionLock.Lock() - log.Infof("checking extension...%v", cp.extensions) + log.Debugf("checking extension...%v", cp.extensions) for _, extension := range cp.extensions { - log.Infof("checking extension...%v", extension) + log.Debugf("checking extension...%v", extension) + // check if the current map of experiments satisfies the extension requirements + if !cp.checkExtensionExperiment(extension) { + log.Debugf("skipping extension...not all experiments satisfied") + continue + } + extension.extension.OnContactRequestValue(cp, *conversationInfo, ev.EventID, scopedZonedPath) } cp.extensionLock.Unlock() @@ -1313,6 +1338,12 @@ func (cp *cwtchPeer) eventHandler() { // Safe Access to Extensions cp.extensionLock.Lock() for _, extension := range cp.extensions { + log.Debugf("checking extension...%v", extension) + // check if the current map of experiments satisfies the extension requirements + if !cp.checkExtensionExperiment(extension) { + log.Debugf("skipping extension...not all experiments satisfied") + continue + } extension.extension.OnContactReceiveValue(cp, *conversationInfo, scopedZonedPath, val, exists) } cp.extensionLock.Unlock() @@ -1406,6 +1437,13 @@ func (cp *cwtchPeer) eventHandler() { processed := false cp.extensionLock.Lock() for _, extension := range cp.extensions { + + // check if the current map of experiments satisfies the extension requirements + if !cp.checkExtensionExperiment(extension) { + log.Debugf("skipping extension...not all experiments satisfied") + continue + } + // if the extension is registered for this event type then process if _, contains := extension.events[ev.EventType]; contains { extension.extension.OnEvent(ev, cp) @@ -1421,6 +1459,17 @@ func (cp *cwtchPeer) eventHandler() { } } +func (cp *cwtchPeer) checkExtensionExperiment(hook ProfileHook) bool { + cp.experimentsLock.Lock() + defer cp.experimentsLock.Unlock() + for experiment := range hook.experiments { + if !cp.experiments.IsEnabled(experiment) { + return false + } + } + return true +} + // attemptInsertOrAcknowledgeLegacyGroupConversation is a convenience method that looks up the conversation // by the given handle and attempts to mark the message as acknowledged. returns error on failure // to either find the contact or the associated message diff --git a/peer/cwtchprofilestorage.go b/peer/cwtchprofilestorage.go index e7cca3f..ce79a6e 100644 --- a/peer/cwtchprofilestorage.go +++ b/peer/cwtchprofilestorage.go @@ -125,7 +125,9 @@ func NewCwtchProfileStorage(db *sql.DB, profileDirectory string) (*CwtchProfileS insertProfileKeyValueStmt, err := db.Prepare(insertProfileKeySQLStmt) if err != nil { db.Close() - log.Errorf("error preparing query: %v %v", insertProfileKeySQLStmt, err) + // note: this is debug because we expect failure here when opening an encrypted database with an + // incorrect password. The rest are errors because failure is not expected. + log.Debugf("error preparing query: %v %v", insertProfileKeySQLStmt, err) return nil, err } @@ -772,7 +774,7 @@ func (cps *CwtchProfileStorage) PurgeNonSavedMessages() { for _, conversation := range ci { if !conversation.IsGroup() && !conversation.IsServer() { if conversation.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(event.SaveHistoryKey)).ToString()] != event.SaveHistoryConfirmed { - log.Infof("purging conversation...") + log.Debugf("purging conversation...") // TODO: At some point in the future this needs to iterate over channels and make a decision for each on.. cps.PurgeConversationChannel(conversation.ID, 0) } diff --git a/peer/hooks.go b/peer/hooks.go index d061c27..1f4b545 100644 --- a/peer/hooks.go +++ b/peer/hooks.go @@ -7,11 +7,10 @@ import ( ) type ProfileHooks interface { - // RegisterEvents returns a set of events that the extension is interested hooking RegisterEvents() []event.Type - // RegisterExperiments RegisterExperiments returns a set of experiments that the extension is interested in being notified about + // RegisterExperiments returns a set of experiments that the extension is interested in being notified about RegisterExperiments() []string // OnEvent is called whenever an event Registered with RegisterEvents is called diff --git a/peer/profile_interface.go b/peer/profile_interface.go index c9fa835..98c463d 100644 --- a/peer/profile_interface.go +++ b/peer/profile_interface.go @@ -132,4 +132,6 @@ type CwtchPeer interface { Delete() PublishEvent(resp event.Event) RegisterHook(hook ProfileHooks) + UpdateExperiments(enabled bool, experiments map[string]bool) + IsFeatureEnabled(featureName string) bool } diff --git a/peer/storage.go b/peer/storage.go index c69b8e1..42a5872 100644 --- a/peer/storage.go +++ b/peer/storage.go @@ -176,7 +176,7 @@ func ImportProfile(exportedCwtchFile string, profilesDir string, password string log.Errorf("%s is an invalid cwtch backup file: %s", profileID, err) return nil, err } - log.Infof("%s is a valid cwtch backup file", profileID) + log.Debugf("%s is a valid cwtch backup file", profileID) profileDBFile := filepath.Join(profilesDir, profileID, dbFile) log.Debugf("checking %v", profileDBFile) diff --git a/storage/v1/profile_store.go b/storage/v1/profile_store.go index 32110ba..98c6802 100644 --- a/storage/v1/profile_store.go +++ b/storage/v1/profile_store.go @@ -73,7 +73,7 @@ func (ps *ProfileStoreV1) load() error { for gid, group := range cp.Groups { if group.Version == 0 { - log.Infof("group %v is of unsupported version 0. dropping group...\n", group.GroupID) + log.Debugf("group %v is of unsupported version 0. dropping group...\n", group.GroupID) delete(cp.Groups, gid) continue } diff --git a/storage/v1/stream_store.go b/storage/v1/stream_store.go index c90c9ee..759104b 100644 --- a/storage/v1/stream_store.go +++ b/storage/v1/stream_store.go @@ -152,7 +152,7 @@ func (ss *streamStore) WriteN(messages []model.Message) { ss.lock.Lock() defer ss.lock.Unlock() - log.Infof("WriteN %v messages\n", len(messages)) + log.Debugf("WriteN %v messages\n", len(messages)) i := 0 for _, m := range messages { ss.updateBuffer(m) diff --git a/testing/filesharing/file_sharing_integration_test.go b/testing/filesharing/file_sharing_integration_test.go index 2574be0..a438868 100644 --- a/testing/filesharing/file_sharing_integration_test.go +++ b/testing/filesharing/file_sharing_integration_test.go @@ -118,13 +118,18 @@ func TestFileSharing(t *testing.T) { app.ActivatePeerEngine(bob.GetOnion(), true, true, true) alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer}) - bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer, event.ManifestReceived}) + bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer}) queueOracle := event.NewQueue() app.GetEventBus(bob.GetOnion()).Subscribe(event.FileDownloaded, queueOracle) - t.Logf("** Launching Peers...") + // Turn on File Sharing Experiment... + settings := app.ReadSettings() + settings.ExperimentsEnabled = true + settings.Experiments[constants.FileSharingExperiment] = true + app.UpdateSettings(settings) + t.Logf("** Launching Peers...") waitTime := time.Duration(30) * time.Second t.Logf("** Waiting for Alice, Bob to connect with onion network... (%v)\n", waitTime) time.Sleep(waitTime) @@ -163,7 +168,11 @@ func TestFileSharing(t *testing.T) { // Restart t.Logf("Restarting File Share") - filesharingFunctionality.ReShareFiles(alice) + err = filesharingFunctionality.ReShareFiles(alice) + + if err != nil { + t.Fatalf("Error!: %v", err) + } // run the same download test again...to check that we can actually download the file testBobDownloadFile(t, bob, filesharingFunctionality, queueOracle) From 861390b11d20b252f28059ff1dec400ca81b0815 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Wed, 25 Jan 2023 12:34:58 -0800 Subject: [PATCH 04/13] Rename API --- extensions/profile_value.go | 4 ++-- .../filesharing/filesharing_functionality.go | 4 ++-- peer/cwtch_peer.go | 2 +- peer/hooks.go | 12 ++++++------ 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/extensions/profile_value.go b/extensions/profile_value.go index 1a5327f..3855807 100644 --- a/extensions/profile_value.go +++ b/extensions/profile_value.go @@ -14,11 +14,11 @@ import ( type ProfileValueExtension struct { } -func (pne ProfileValueExtension) RegisterEvents() []event.Type { +func (pne ProfileValueExtension) EventsToRegister() []event.Type { return nil } -func (pne ProfileValueExtension) RegisterExperiments() []string { +func (pne ProfileValueExtension) ExperimentsToRegister() []string { return nil } diff --git a/functionality/filesharing/filesharing_functionality.go b/functionality/filesharing/filesharing_functionality.go index 1f6625a..87c87a4 100644 --- a/functionality/filesharing/filesharing_functionality.go +++ b/functionality/filesharing/filesharing_functionality.go @@ -30,11 +30,11 @@ import ( type Functionality struct { } -func (f Functionality) RegisterEvents() []event.Type { +func (f Functionality) EventsToRegister() []event.Type { return []event.Type{event.ManifestReceived, event.FileDownloaded} } -func (f Functionality) RegisterExperiments() []string { +func (f Functionality) ExperimentsToRegister() []string { return []string{constants.FileSharingExperiment} } diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index e62c2bd..88aef1a 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -101,7 +101,7 @@ func (cp *cwtchPeer) RegisterHook(extension ProfileHooks) { defer cp.extensionLock.Unlock() // Register Requested Events - for _, event := range extension.RegisterEvents() { + for _, event := range extension.EventsToRegister() { cp.eventBus.Subscribe(event, cp.queue) } diff --git a/peer/hooks.go b/peer/hooks.go index 1f4b545..33eec53 100644 --- a/peer/hooks.go +++ b/peer/hooks.go @@ -7,11 +7,11 @@ import ( ) type ProfileHooks interface { - // RegisterEvents returns a set of events that the extension is interested hooking - RegisterEvents() []event.Type + // EventsToRegister returns a set of events that the extension is interested hooking + EventsToRegister() []event.Type - // RegisterExperiments returns a set of experiments that the extension is interested in being notified about - RegisterExperiments() []string + // ExperimentsToRegister returns a set of experiments that the extension is interested in being notified about + ExperimentsToRegister() []string // OnEvent is called whenever an event Registered with RegisterEvents is called OnEvent(event event.Event, profile CwtchPeer) @@ -31,12 +31,12 @@ type ProfileHook struct { func ConstructHook(extension ProfileHooks) ProfileHook { events := make(map[event.Type]bool) - for _, event := range extension.RegisterEvents() { + for _, event := range extension.EventsToRegister() { events[event] = true } experiments := make(map[string]bool) - for _, experiment := range extension.RegisterExperiments() { + for _, experiment := range extension.ExperimentsToRegister() { experiments[experiment] = true } From 0e49d70d65612249430e84b013769236b0eefa31 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 21 Feb 2023 15:55:14 -0800 Subject: [PATCH 05/13] Large API Refactor in prep for autobindings --- app/app.go | 43 +++++++--- event/common.go | 3 +- .../filesharing/filesharing_functionality.go | 67 +++++++++++++++ model/constants/attributes.go | 2 + peer/cwtch_peer.go | 86 ++++++++++++++++++- peer/profile_interface.go | 26 +++++- testing/cwtch_peer_server_integration_test.go | 14 +-- .../encrypted_storage_integration_test.go | 10 +-- .../file_sharing_integration_test.go | 12 +-- 9 files changed, 230 insertions(+), 33 deletions(-) diff --git a/app/app.go b/app/app.go index 32b5e33..5398518 100644 --- a/app/app.go +++ b/app/app.go @@ -38,11 +38,10 @@ type application struct { // Application is a full cwtch peer application. It allows management, usage and storage of multiple peers type Application interface { LoadProfiles(password string) - CreatePeer(name string, password string, attributes map[attr.ZonedPath]string) - // Deprecated in 1.10 - CreateTaggedPeer(name string, password string, tag string) + CreateProfile(name string, password string, autostart bool) + ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error) - DeletePeer(onion string, currentPassword string) + DeleteProfile(onion string, currentPassword string) AddPeerPlugin(onion string, pluginID plugins.PluginID) GetPrimaryBus() event.Manager @@ -51,7 +50,7 @@ type Application interface { QueryACNVersion() ActivateEngines(doListn, doPeers, doServers bool) - ActivatePeerEngine(onion string, doListen, doPeers, doServers bool) + ActivatePeerEngine(onion string) DeactivatePeerEngine(onion string) ReadSettings() GlobalSettings @@ -67,8 +66,7 @@ type Application interface { // LoadProfileFn is the function signature for a function in an app that loads a profile type LoadProfileFn func(profile peer.CwtchPeer) -// NewApp creates a new app with some environment awareness and initializes a Tor Manager -func NewApp(acn connectivity.ACN, appDirectory string) Application { +func InitApp(appDirectory string) *GlobalSettingsFile { log.Debugf("NewApp(%v)\n", appDirectory) os.MkdirAll(path.Join(appDirectory, "profiles"), 0700) @@ -79,6 +77,11 @@ func NewApp(acn connectivity.ACN, appDirectory string) Application { if err != nil { log.Errorf("error initializing global settings file %. Global settings might not be loaded or saves", err) } + return settings +} + +// NewApp creates a new app with some environment awareness and initializes a Tor Manager +func NewApp(acn connectivity.ACN, appDirectory string, settings *GlobalSettingsFile) Application { app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager(), settings: settings} app.peers = make(map[string]peer.CwtchPeer) @@ -159,6 +162,22 @@ func (ap *application) AddPlugin(peerid string, id plugins.PluginID, bus event.M } } +func (app *application) CreateProfile(name string, password string, autostart bool) { + autostartVal := constants.True + if !autostart { + autostartVal = constants.False + } + tagVal := constants.ProfileTypeV1Password + if password == DefactoPasswordForUnencryptedProfiles { + tagVal = constants.ProfileTypeV1DefaultPassword + } + + app.CreatePeer(name, password, map[attr.ZonedPath]string{ + attr.ProfileZone.ConstructZonedPath(constants.Tag): tagVal, + attr.ProfileZone.ConstructZonedPath(constants.PeerAutostart): autostartVal, + }) +} + // Deprecated in 1.10 func (app *application) CreateTaggedPeer(name string, password string, tag string) { app.CreatePeer(name, password, map[attr.ZonedPath]string{attr.ProfileZone.ConstructZonedPath(constants.Tag): tag}) @@ -192,8 +211,8 @@ func (app *application) CreatePeer(name string, password string, attributes map[ app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.True})) } -func (app *application) DeletePeer(onion string, password string) { - log.Debugf("DeletePeer called on %v\n", onion) +func (app *application) DeleteProfile(onion string, password string) { + log.Debugf("DeleteProfile called on %v\n", onion) app.appmutex.Lock() defer app.appmutex.Unlock() @@ -333,7 +352,7 @@ func (app *application) ActivateEngines(doListen, doPeers, doServers bool) { } // ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online -func (app *application) ActivatePeerEngine(onion string, doListen, doPeers, doServers bool) { +func (app *application) ActivatePeerEngine(onion string) { profile := app.GetPeer(onion) if profile != nil { if _, exists := app.engines[onion]; !exists { @@ -341,10 +360,10 @@ func (app *application) ActivatePeerEngine(onion string, doListen, doPeers, doSe app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated)) app.QueryACNStatus() - if doListen { + if true { profile.Listen() } - profile.StartConnections(doPeers, doServers) + profile.StartConnections(true, true) } } } diff --git a/event/common.go b/event/common.go index abb6976..c9041b2 100644 --- a/event/common.go +++ b/event/common.go @@ -228,7 +228,8 @@ type Field string const ( // A peers local onion address - Onion = Field("Onion") + Onion = Field("Onion") + ProfileOnion = Field("ProfileOnion") RemotePeer = Field("RemotePeer") LastSeen = Field("LastSeen") diff --git a/functionality/filesharing/filesharing_functionality.go b/functionality/filesharing/filesharing_functionality.go index 87c87a4..970d7c8 100644 --- a/functionality/filesharing/filesharing_functionality.go +++ b/functionality/filesharing/filesharing_functionality.go @@ -178,6 +178,65 @@ func (om *OverlayMessage) ShouldAutoDL() bool { return false } +func (f *Functionality) VerifyOrResumeDownload(profile peer.CwtchPeer, conversation int, fileKey string) { + if manifestFilePath, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", fileKey)); exists { + if downloadfilepath, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.path", fileKey)); exists { + log.Infof("resuming %s", fileKey) + f.DownloadFile(profile, conversation, downloadfilepath, manifestFilePath, fileKey, files.MaxManifestSize*files.DefaultChunkSize) + } else { + log.Errorf("found manifest path but not download path for %s", fileKey) + } + } else { + log.Errorf("no stored manifest path found for %s", fileKey) + } +} + +func (f *Functionality) CheckDownloadStatus(profile peer.CwtchPeer, fileKey string) { + path, _ := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.path", fileKey)) + if value, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey)); exists && value == event.True { + profile.PublishEvent(event.NewEvent(event.FileDownloaded, map[event.Field]string{ + event.ProfileOnion: profile.GetOnion(), + event.FileKey: fileKey, + event.FilePath: path, + event.TempFile: "", + })) + } else { + log.Infof("CheckDownloadStatus found .path but not .complete") + profile.PublishEvent(event.NewEvent(event.FileDownloadProgressUpdate, map[event.Field]string{ + event.ProfileOnion: profile.GetOnion(), + event.FileKey: fileKey, + event.Progress: "-1", + event.FileSizeInChunks: "-1", + event.FilePath: path, + })) + } +} + +func (f *Functionality) EnhancedShareFile(profile peer.CwtchPeer, conversationID int, sharefilepath string) string { + fileKey, overlay, err := f.ShareFile(sharefilepath, profile) + if err != nil { + log.Errorf("error sharing file: %v", err) + } else if conversationID == -1 { + // FIXME: At some point we might want to allow arbitrary public files, but for now this API will assume + // there is only one, and it is the custom profile image... + profile.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey, fileKey) + } else { + // Set a new attribute so we can associate this download with this conversation... + profile.SetConversationAttribute(conversationID, attr.ConversationScope.ConstructScopedZonedPath(attr.FilesharingZone.ConstructZonedPath(fileKey)), "") + id, err := profile.SendMessage(conversationID, overlay) + if err == nil { + return profile.EnhancedGetMessageById(conversationID, id) + } + } + return "" +} + +// DownloadFileDefaultLimit given a profile, a conversation handle and a file sharing key, start off a download process +// to downloadFilePath with a default filesize limit +func (f *Functionality) DownloadFileDefaultLimit(profile peer.CwtchPeer, conversationID int, downloadFilePath string, manifestFilePath string, key string) error { + return f.DownloadFile(profile, conversationID, downloadFilePath, manifestFilePath, key, files.MaxManifestSize*files.DefaultChunkSize) +} + // DownloadFile given a profile, a conversation handle and a file sharing key, start off a download process // to downloadFilePath func (f *Functionality) DownloadFile(profile peer.CwtchPeer, conversationID int, downloadFilePath string, manifestFilePath string, key string, limit uint64) error { @@ -409,6 +468,14 @@ type SharedFile struct { Expired bool } +func (f *Functionality) EnhancedGetSharedFiles(profile peer.CwtchPeer, conversationID int) string { + data, err := json.Marshal(f.GetSharedFiles(profile, conversationID)) + if err == nil { + return string(data) + } + return "" +} + // GetSharedFiles returns all file shares associated with a given conversation func (f *Functionality) GetSharedFiles(profile peer.CwtchPeer, conversationID int) []SharedFile { sharedFiles := []SharedFile{} diff --git a/model/constants/attributes.go b/model/constants/attributes.go index 9e6f4f6..e498222 100644 --- a/model/constants/attributes.go +++ b/model/constants/attributes.go @@ -56,3 +56,5 @@ const SyncPreLastMessageTime = "SyncPreLastMessageTime" const SyncMostRecentMessageTime = "SyncMostRecentMessageTime" const AttrLastConnectionTime = "last-connection-time" +const PeerAutostart = "autostart" +const Archived = "archived" diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 88aef1a..8375d41 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -75,6 +75,90 @@ type cwtchPeer struct { experimentsLock sync.Mutex } +func (cp *cwtchPeer) EnhancedGetMessages(conversation int, index int, count int) string { + var emessages []EnhancedMessage = make([]EnhancedMessage, count) + + messages, err := cp.GetMostRecentMessages(conversation, 0, index, count) + if err == nil { + + for i, message := range messages { + + time, _ := time.Parse(time.RFC3339Nano, message.Attr[constants.AttrSentTimestamp]) + emessages[i].Message = model.Message{ + Message: message.Body, + Acknowledged: message.Attr[constants.AttrAck] == constants.True, + Error: message.Attr[constants.AttrErr], + PeerID: message.Attr[constants.AttrAuthor], + Timestamp: time, + } + emessages[i].ID = message.ID + emessages[i].Attributes = message.Attr + emessages[i].ContentHash = model.CalculateContentHash(message.Attr[constants.AttrAuthor], message.Body) + } + } + + bytes, _ := json.Marshal(emessages) + return string(bytes) +} + +func (cp *cwtchPeer) EnhancedGetMessageById(conversation int, messageID int) string { + var message EnhancedMessage + dbmessage, attr, err := cp.GetChannelMessage(conversation, 0, messageID) + if err == nil { + time, _ := time.Parse(time.RFC3339Nano, attr[constants.AttrSentTimestamp]) + message.Message = model.Message{ + Message: dbmessage, + Acknowledged: attr[constants.AttrAck] == constants.True, + Error: attr[constants.AttrErr], + PeerID: attr[constants.AttrAuthor], + Timestamp: time, + } + message.ID = messageID + message.Attributes = attr + message.ContentHash = model.CalculateContentHash(attr[constants.AttrAuthor], dbmessage) + } + bytes, _ := json.Marshal(message) + return string(bytes) +} + +func (cp *cwtchPeer) EnhancedGetMessageByContentHash(conversation int, contentHash string) string { + var message EnhancedMessage + offset, err := cp.GetChannelMessageByContentHash(conversation, 0, contentHash) + if err == nil { + messages, err := cp.GetMostRecentMessages(conversation, 0, offset, 1) + if err == nil { + time, _ := time.Parse(time.RFC3339Nano, messages[0].Attr[constants.AttrSentTimestamp]) + message.Message = model.Message{ + Message: messages[0].Body, + Acknowledged: messages[0].Attr[constants.AttrAck] == constants.True, + Error: messages[0].Attr[constants.AttrErr], + PeerID: messages[0].Attr[constants.AttrAuthor], + Timestamp: time, + } + message.ID = messages[0].ID + message.Attributes = messages[0].Attr + message.LocalIndex = offset + message.ContentHash = contentHash + } else { + log.Errorf("error fetching local index {} ", err) + } + } + bytes, _ := json.Marshal(message) + return string(bytes) +} + +func (cp *cwtchPeer) EnhancedSendMessage(conversation int, message string) string { + mid, err := cp.SendMessage(conversation, message) + if err == nil { + return cp.EnhancedGetMessageById(conversation, mid) + } + return "" +} + +func (cp *cwtchPeer) ArchiveConversation(conversationID int) { + cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Archived)), constants.True) +} + // IsFeatureEnabled returns true if the functionality defined by featureName has been enabled by the application, false otherwise. // this function is intended to be used by ProfileHooks to determine if they should execute experimental functionality. func (cp *cwtchPeer) IsFeatureEnabled(featureName string) bool { @@ -119,7 +203,7 @@ func (cp *cwtchPeer) StoreCachedTokens(tokenServer string, tokens []*privacypass } } -func (cp *cwtchPeer) Export(file string) error { +func (cp *cwtchPeer) ExportProfile(file string) error { cp.mutex.Lock() defer cp.mutex.Unlock() return cp.storage.Export(file) diff --git a/peer/profile_interface.go b/peer/profile_interface.go index 98c463d..4a22052 100644 --- a/peer/profile_interface.go +++ b/peer/profile_interface.go @@ -48,6 +48,10 @@ type ModifyServers interface { // SendMessages enables a caller to sender messages to a contact type SendMessages interface { SendMessage(conversation int, message string) (int, error) + + // EnhancedSendMessage Attempts to Send a Message and Immediately Attempts to Lookup the Message in the Database + EnhancedSendMessage(conversation int, message string) string + SendInviteToConversation(conversationID int, inviteConversationID int) (int, error) SendScopedZonedGetValToContact(conversationID int, scope attr.Scope, zone attr.Zone, key string) } @@ -105,6 +109,7 @@ type CwtchPeer interface { // New Unified Conversation Interfaces NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error) FetchConversations() ([]*model.Conversation, error) + ArchiveConversation(conversation int) GetConversationInfo(conversation int) (*model.Conversation, error) FetchConversationInfo(handle string) (*model.Conversation, error) AcceptConversation(conversation int) error @@ -121,6 +126,15 @@ type CwtchPeer interface { GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error + // EnhancedGetMessageById returns a json-encoded enhanced message, suitable for rendering in a UI + EnhancedGetMessageById(conversation int, mid int) string + + // EnhancedGetMessageByContentHash returns a json-encoded enhanced message, suitable for rendering in a UI + EnhancedGetMessageByContentHash(conversation int, hash string) string + + // EnhancedGetMessages returns a set of json-encoded enhanced messages, suitable for rendering in a UI + EnhancedGetMessages(conversation int, index int, count int) string + // Server Token APIS // TODO move these to feature protected interfaces StoreCachedTokens(tokenServer string, tokens []*privacypass.Token) @@ -128,10 +142,20 @@ type CwtchPeer interface { // Profile Management CheckPassword(password string) bool ChangePassword(oldpassword string, newpassword string, newpasswordAgain string) error - Export(file string) error + ExportProfile(file string) error Delete() PublishEvent(resp event.Event) RegisterHook(hook ProfileHooks) UpdateExperiments(enabled bool, experiments map[string]bool) IsFeatureEnabled(featureName string) bool } + +// EnhancedMessage wraps a Cwtch model.Message with some additional data to reduce calls from the UI. +type EnhancedMessage struct { + model.Message + ID int // the actual ID of the message in the database (not the row number) + LocalIndex int // local index in the DB (row #). Can be empty (most calls supply it) but lookup by hash will fill it + ContentHash string + ContactImage string + Attributes map[string]string +} diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index 9845b81..18e92cc 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -139,7 +139,7 @@ func TestCwtchPeerIntegration(t *testing.T) { const ServerAddr = "nfhxzvzxinripgdh4t2m4xcy3crf6p4cbhectgckuj3idsjsaotgowad" serverKeyBundle, _ := base64.StdEncoding.DecodeString(ServerKeyBundleBase64) - app := app2.NewApp(acn, "./storage") + app := app2.NewApp(acn, "./storage", app2.InitApp("./storage")) usr, _ := user.Current() cwtchDir := path.Join(usr.HomeDir, ".cwtch") @@ -152,31 +152,31 @@ func TestCwtchPeerIntegration(t *testing.T) { // ***** cwtchPeer setup ***** log.Infoln("Creating Alice...") - app.CreateTaggedPeer("Alice", "asdfasdf", "test") + app.CreateProfile("Alice", "asdfasdf", true) log.Infoln("Creating Bob...") - app.CreateTaggedPeer("Bob", "asdfasdf", "test") + app.CreateProfile("Bob", "asdfasdf", true) log.Infoln("Creating Carol...") - app.CreateTaggedPeer("Carol", "asdfasdf", "test") + app.CreateProfile("Carol", "asdfasdf", true) alice := app2.WaitGetPeer(app, "Alice") aliceBus := app.GetEventBus(alice.GetOnion()) - app.ActivatePeerEngine(alice.GetOnion(), true, true, true) + app.ActivatePeerEngine(alice.GetOnion()) log.Infoln("Alice created:", alice.GetOnion()) alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice") alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) bob := app2.WaitGetPeer(app, "Bob") bobBus := app.GetEventBus(bob.GetOnion()) - app.ActivatePeerEngine(bob.GetOnion(), true, true, true) + app.ActivatePeerEngine(bob.GetOnion()) log.Infoln("Bob created:", bob.GetOnion()) bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) carol := app2.WaitGetPeer(app, "Carol") carolBus := app.GetEventBus(carol.GetOnion()) - app.ActivatePeerEngine(carol.GetOnion(), true, true, true) + app.ActivatePeerEngine(carol.GetOnion()) log.Infoln("Carol created:", carol.GetOnion()) carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol") carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) diff --git a/testing/encryptedstorage/encrypted_storage_integration_test.go b/testing/encryptedstorage/encrypted_storage_integration_test.go index 27bf862..225d178 100644 --- a/testing/encryptedstorage/encrypted_storage_integration_test.go +++ b/testing/encryptedstorage/encrypted_storage_integration_test.go @@ -59,9 +59,9 @@ func TestEncryptedStorage(t *testing.T) { defer acn.Close() acn.WaitTillBootstrapped() - app := app2.NewApp(acn, cwtchDir) - app.CreateTaggedPeer("alice", "password", constants.ProfileTypeV1Password) - app.CreateTaggedPeer("bob", "password", constants.ProfileTypeV1Password) + app := app2.NewApp(acn, cwtchDir, app2.InitApp(cwtchDir)) + app.CreateProfile("alice", "password", true) + app.CreateProfile("bob", "password", true) alice := app2.WaitGetPeer(app, "alice") bob := app2.WaitGetPeer(app, "bob") @@ -130,7 +130,7 @@ func TestEncryptedStorage(t *testing.T) { t.Fatalf("expeced GetMostRecentMessages to return 1, instead returned: %v %v", len(messages), messages) } - err = alice.Export("alice.tar.gz") + err = alice.ExportProfile("alice.tar.gz") if err != nil { t.Fatalf("could not export profile: %v", err) } @@ -140,7 +140,7 @@ func TestEncryptedStorage(t *testing.T) { t.Fatal("profile is already imported...this should fail") } - app.DeletePeer(alice.GetOnion(), "password") + app.DeleteProfile(alice.GetOnion(), "password") alice, err = app.ImportProfile("alice.tar.gz", "password") if err != nil { t.Fatalf("profile should have successfully imported: %s", err) diff --git a/testing/filesharing/file_sharing_integration_test.go b/testing/filesharing/file_sharing_integration_test.go index a438868..e3cabee 100644 --- a/testing/filesharing/file_sharing_integration_test.go +++ b/testing/filesharing/file_sharing_integration_test.go @@ -97,7 +97,7 @@ func TestFileSharing(t *testing.T) { acn.WaitTillBootstrapped() defer acn.Close() - app := app2.NewApp(acn, "./storage") + app := app2.NewApp(acn, "./storage", app2.InitApp("./storage")) usr, _ := user.Current() cwtchDir := path.Join(usr.HomeDir, ".cwtch") @@ -106,16 +106,16 @@ func TestFileSharing(t *testing.T) { os.Mkdir(path.Join(cwtchDir, "testing"), 0700) t.Logf("Creating Alice...") - app.CreateTaggedPeer("alice", "asdfasdf", "testing") + app.CreateProfile("alice", "asdfasdf", true) t.Logf("Creating Bob...") - app.CreateTaggedPeer("bob", "asdfasdf", "testing") + app.CreateProfile("bob", "asdfasdf", true) t.Logf("** Waiting for Alice, Bob...") alice := app2.WaitGetPeer(app, "alice") - app.ActivatePeerEngine(alice.GetOnion(), true, true, true) + app.ActivatePeerEngine(alice.GetOnion()) bob := app2.WaitGetPeer(app, "bob") - app.ActivatePeerEngine(bob.GetOnion(), true, true, true) + app.ActivatePeerEngine(bob.GetOnion()) alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer}) bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer}) @@ -178,7 +178,7 @@ func TestFileSharing(t *testing.T) { testBobDownloadFile(t, bob, filesharingFunctionality, queueOracle) // test that we can delete bob... - app.DeletePeer(bob.GetOnion(), "asdfasdf") + app.DeleteProfile(bob.GetOnion(), "asdfasdf") queueOracle.Shutdown() app.Shutdown() From 195e048410d78caa4a44a91c7c5a9e3c6a6c60a9 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Sat, 25 Feb 2023 07:19:12 -0800 Subject: [PATCH 06/13] Fix Map Panic --- model/experiments.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/model/experiments.go b/model/experiments.go index 36ca753..2c940de 100644 --- a/model/experiments.go +++ b/model/experiments.go @@ -1,10 +1,13 @@ package model +import "sync" + // Experiments are optional functionality that can be enabled/disabled by an application either completely or individually. // examples of experiments include File Sharing, Profile Images and Groups. type Experiments struct { enabled bool experiments map[string]bool + lock sync.Mutex } // InitExperiments encapsulates a set of experiments separate from their storage in GlobalSettings. @@ -24,6 +27,10 @@ func (e *Experiments) IsEnabled(experiment string) bool { // todo handle default-enabled functionality return false } + + // go will sometimes panic if we do not lock this read-only map... + e.lock.Lock() + defer e.lock.Unlock() enabled, exists := e.experiments[experiment] if !exists { return false From 05e77604d2e0822fc535764ad80300b42c0b511c Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Mon, 27 Feb 2023 12:01:54 -0800 Subject: [PATCH 07/13] Experiments Update on Load --- app/app.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/app/app.go b/app/app.go index 5398518..79ff14f 100644 --- a/app/app.go +++ b/app/app.go @@ -312,10 +312,14 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool { if app.peers[profile.GetOnion()] == nil { eventBus := event.NewEventManager() app.eventBuses[profile.GetOnion()] = eventBus - profile.Init(app.eventBuses[profile.GetOnion()]) app.registerHooks(profile) + profile.Init(app.eventBuses[profile.GetOnion()]) app.peers[profile.GetOnion()] = profile + // Update the Peer with the Most Recent Experiment State... + settings := app.settings.ReadGlobalSettings() + profile.UpdateExperiments(settings.ExperimentsEnabled, settings.Experiments) + app.AddPeerPlugin(profile.GetOnion(), plugins.CONNECTIONRETRY) // Now Mandatory app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False})) From 9abece0f50fb95d09eaed3e54d40c34ccfb201b7 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Mon, 27 Feb 2023 12:07:19 -0800 Subject: [PATCH 08/13] Reorganize Peer Init --- app/app.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/app/app.go b/app/app.go index 79ff14f..cfb3367 100644 --- a/app/app.go +++ b/app/app.go @@ -312,16 +312,20 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool { if app.peers[profile.GetOnion()] == nil { eventBus := event.NewEventManager() app.eventBuses[profile.GetOnion()] = eventBus - app.registerHooks(profile) - profile.Init(app.eventBuses[profile.GetOnion()]) + + // Initialize the Peer with the Given Event Bus app.peers[profile.GetOnion()] = profile + profile.Init(app.eventBuses[profile.GetOnion()]) // Update the Peer with the Most Recent Experiment State... settings := app.settings.ReadGlobalSettings() profile.UpdateExperiments(settings.ExperimentsEnabled, settings.Experiments) + app.registerHooks(profile) + // Register the Peer With Application Plugins.. app.AddPeerPlugin(profile.GetOnion(), plugins.CONNECTIONRETRY) // Now Mandatory + // Finalize the Creation of Peer / Notify any Interfaces.. app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False})) return true } From 848d5971b6d9045edbc5675c1d3a3588a36eec04 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Mon, 27 Feb 2023 13:31:32 -0800 Subject: [PATCH 09/13] Consolidating Profile Setup Logic --- app/app.go | 54 +++++++++++++++---------- extensions/profile_value.go | 2 +- peer/cwtch_peer.go | 4 ++ peer/profile_interface.go | 1 + protocol/files/filesharing_subsystem.go | 2 +- 5 files changed, 40 insertions(+), 23 deletions(-) diff --git a/app/app.go b/app/app.go index cfb3367..08dfb4f 100644 --- a/app/app.go +++ b/app/app.go @@ -113,6 +113,14 @@ func (app *application) UpdateSettings(settings GlobalSettings) { defer app.peerLock.Unlock() for _, profile := range app.peers { profile.UpdateExperiments(settings.ExperimentsEnabled, settings.Experiments) + + // Explicitly toggle blocking/unblocking of unknown connections for profiles + // that have been loaded. + if settings.BlockUnknownConnections { + profile.BlockUnknownConnections() + } else { + profile.AllowUnknownConnections() + } } } @@ -183,6 +191,24 @@ func (app *application) CreateTaggedPeer(name string, password string, tag strin app.CreatePeer(name, password, map[attr.ZonedPath]string{attr.ProfileZone.ConstructZonedPath(constants.Tag): tag}) } +func (app *application) setupPeer(profile peer.CwtchPeer) { + eventBus := event.NewEventManager() + app.eventBuses[profile.GetOnion()] = eventBus + + // Initialize the Peer with the Given Event Bus + app.peers[profile.GetOnion()] = profile + profile.Init(app.eventBuses[profile.GetOnion()]) + + // Update the Peer with the Most Recent Experiment State... + settings := app.settings.ReadGlobalSettings() + profile.UpdateExperiments(settings.ExperimentsEnabled, settings.Experiments) + app.registerHooks(profile) + + // Register the Peer With Application Plugins.. + app.AddPeerPlugin(profile.GetOnion(), plugins.CONNECTIONRETRY) // Now Mandatory + +} + func (app *application) CreatePeer(name string, password string, attributes map[attr.ZonedPath]string) { app.appmutex.Lock() defer app.appmutex.Unlock() @@ -196,18 +222,13 @@ func (app *application) CreatePeer(name string, password string, attributes map[ return } - eventBus := event.NewEventManager() - app.eventBuses[profile.GetOnion()] = eventBus - profile.Init(app.eventBuses[profile.GetOnion()]) - app.registerHooks(profile) - app.peers[profile.GetOnion()] = profile + app.setupPeer(profile) for zp, val := range attributes { zone, key := attr.ParseZone(zp.ToString()) profile.SetScopedZonedAttribute(attr.LocalScope, zone, key, val) } - app.AddPeerPlugin(profile.GetOnion(), plugins.CONNECTIONRETRY) // Now Mandatory app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.True})) } @@ -216,6 +237,11 @@ func (app *application) DeleteProfile(onion string, password string) { app.appmutex.Lock() defer app.appmutex.Unlock() + // allow a blank password to delete "unencrypted" accounts... + if password == "" { + password = DefactoPasswordForUnencryptedProfiles + } + if app.peers[onion].CheckPassword(password) { // soft-shutdown app.peers[onion].Shutdown() @@ -310,21 +336,7 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool { // Only attempt to finalize the profile if we don't have one loaded... if app.peers[profile.GetOnion()] == nil { - eventBus := event.NewEventManager() - app.eventBuses[profile.GetOnion()] = eventBus - - // Initialize the Peer with the Given Event Bus - app.peers[profile.GetOnion()] = profile - profile.Init(app.eventBuses[profile.GetOnion()]) - - // Update the Peer with the Most Recent Experiment State... - settings := app.settings.ReadGlobalSettings() - profile.UpdateExperiments(settings.ExperimentsEnabled, settings.Experiments) - app.registerHooks(profile) - - // Register the Peer With Application Plugins.. - app.AddPeerPlugin(profile.GetOnion(), plugins.CONNECTIONRETRY) // Now Mandatory - + app.setupPeer(profile) // Finalize the Creation of Peer / Notify any Interfaces.. app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False})) return true diff --git a/extensions/profile_value.go b/extensions/profile_value.go index 3855807..5c4dddc 100644 --- a/extensions/profile_value.go +++ b/extensions/profile_value.go @@ -41,7 +41,7 @@ func (pne ProfileValueExtension) OnContactReceiveValue(profile peer.CwtchPeer, c // OnContactRequestValue for ProfileValueExtension handles returning Public Profile Values func (pne ProfileValueExtension) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, szp attr.ScopedZonedPath) { scope, zone, zpath := szp.GetScopeZonePath() - log.Debugf("Looking up public | conversation scope/zone %v", szp.ToString()) + log.Infof("Looking up public | conversation scope/zone %v", szp.ToString()) if scope.IsPublic() || scope.IsConversation() { val, exists := profile.GetScopedZonedAttribute(scope, zone, zpath) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 8375d41..560c4fe 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -75,6 +75,10 @@ type cwtchPeer struct { experimentsLock sync.Mutex } +func (cp *cwtchPeer) EnhancedImportBundle(importString string) string { + return cp.ImportBundle(importString).Error() +} + func (cp *cwtchPeer) EnhancedGetMessages(conversation int, index int, count int) string { var emessages []EnhancedMessage = make([]EnhancedMessage, count) diff --git a/peer/profile_interface.go b/peer/profile_interface.go index 4a22052..2954dfa 100644 --- a/peer/profile_interface.go +++ b/peer/profile_interface.go @@ -105,6 +105,7 @@ type CwtchPeer interface { // Import Bundle ImportBundle(string) error + EnhancedImportBundle(string) string // New Unified Conversation Interfaces NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error) diff --git a/protocol/files/filesharing_subsystem.go b/protocol/files/filesharing_subsystem.go index 2d63b81..011d015 100644 --- a/protocol/files/filesharing_subsystem.go +++ b/protocol/files/filesharing_subsystem.go @@ -34,7 +34,7 @@ func (fsss *FileSharingSubSystem) ShareFile(fileKey string, serializedManifest s log.Errorf("could not share file %v", err) return } - log.Debugf("sharing file: %v %v", fileKey, serializedManifest) + log.Infof("sharing file: %v %v", fileKey, serializedManifest) fsss.activeShares.Store(fileKey, &manifest) } From aceb4adeb1b692ea4d922706eba81e88eaf5c535 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Mon, 27 Feb 2023 14:05:52 -0800 Subject: [PATCH 10/13] Support for Enhanced Import --- app/app.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/app/app.go b/app/app.go index 08dfb4f..25f7592 100644 --- a/app/app.go +++ b/app/app.go @@ -41,6 +41,7 @@ type Application interface { CreateProfile(name string, password string, autostart bool) ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error) + EnhancedImportProfile(exportedCwtchFile string, password string) string DeleteProfile(onion string, currentPassword string) AddPeerPlugin(onion string, pluginID plugins.PluginID) @@ -271,6 +272,14 @@ func (app *application) ImportProfile(exportedCwtchFile string, password string) return profile, err } +func (app *application) EnhancedImportProfile(exportedCwtchFile string, password string) string { + _, err := app.ImportProfile(exportedCwtchFile, password) + if err == nil { + return "" + } + return err.Error() +} + // LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them func (app *application) LoadProfiles(password string) { count := 0 From 14962e2428c61dcb104ef604bdf25a8604801b03 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 28 Feb 2023 10:00:32 -0800 Subject: [PATCH 11/13] Logging Fixes / InitApp -> InitAppSettings --- app/app.go | 13 ++++++++++++- extensions/profile_value.go | 2 +- .../filesharing/filesharing_functionality.go | 4 ++-- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/app/app.go b/app/app.go index 25f7592..6198d27 100644 --- a/app/app.go +++ b/app/app.go @@ -35,6 +35,16 @@ type application struct { settings *GlobalSettingsFile } +func (app *application) IsFeatureEnabled(experiment string) bool { + settings := app.ReadSettings() + if settings.ExperimentsEnabled { + if status, exists := settings.Experiments[experiment]; exists { + return status + } + } + return false +} + // Application is a full cwtch peer application. It allows management, usage and storage of multiple peers type Application interface { LoadProfiles(password string) @@ -56,6 +66,7 @@ type Application interface { ReadSettings() GlobalSettings UpdateSettings(settings GlobalSettings) + IsFeatureEnabled(experiment string) bool ShutdownPeer(string) Shutdown() @@ -67,7 +78,7 @@ type Application interface { // LoadProfileFn is the function signature for a function in an app that loads a profile type LoadProfileFn func(profile peer.CwtchPeer) -func InitApp(appDirectory string) *GlobalSettingsFile { +func LoadAppSettings(appDirectory string) *GlobalSettingsFile { log.Debugf("NewApp(%v)\n", appDirectory) os.MkdirAll(path.Join(appDirectory, "profiles"), 0700) diff --git a/extensions/profile_value.go b/extensions/profile_value.go index 5c4dddc..3855807 100644 --- a/extensions/profile_value.go +++ b/extensions/profile_value.go @@ -41,7 +41,7 @@ func (pne ProfileValueExtension) OnContactReceiveValue(profile peer.CwtchPeer, c // OnContactRequestValue for ProfileValueExtension handles returning Public Profile Values func (pne ProfileValueExtension) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, szp attr.ScopedZonedPath) { scope, zone, zpath := szp.GetScopeZonePath() - log.Infof("Looking up public | conversation scope/zone %v", szp.ToString()) + log.Debugf("Looking up public | conversation scope/zone %v", szp.ToString()) if scope.IsPublic() || scope.IsConversation() { val, exists := profile.GetScopedZonedAttribute(scope, zone, zpath) diff --git a/functionality/filesharing/filesharing_functionality.go b/functionality/filesharing/filesharing_functionality.go index 970d7c8..80a0346 100644 --- a/functionality/filesharing/filesharing_functionality.go +++ b/functionality/filesharing/filesharing_functionality.go @@ -181,7 +181,7 @@ func (om *OverlayMessage) ShouldAutoDL() bool { func (f *Functionality) VerifyOrResumeDownload(profile peer.CwtchPeer, conversation int, fileKey string) { if manifestFilePath, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", fileKey)); exists { if downloadfilepath, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.path", fileKey)); exists { - log.Infof("resuming %s", fileKey) + log.Debugf("resuming %s", fileKey) f.DownloadFile(profile, conversation, downloadfilepath, manifestFilePath, fileKey, files.MaxManifestSize*files.DefaultChunkSize) } else { log.Errorf("found manifest path but not download path for %s", fileKey) @@ -201,7 +201,7 @@ func (f *Functionality) CheckDownloadStatus(profile peer.CwtchPeer, fileKey stri event.TempFile: "", })) } else { - log.Infof("CheckDownloadStatus found .path but not .complete") + log.Debugf("CheckDownloadStatus found .path but not .complete") profile.PublishEvent(event.NewEvent(event.FileDownloadProgressUpdate, map[event.Field]string{ event.ProfileOnion: profile.GetOnion(), event.FileKey: fileKey, From a6a196a1c14f7b907b98c0b25e274f5e4e82c722 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 28 Feb 2023 10:13:11 -0800 Subject: [PATCH 12/13] Load App Settings Tests --- testing/cwtch_peer_server_integration_test.go | 4 ++-- .../encryptedstorage/encrypted_storage_integration_test.go | 2 +- testing/filesharing/file_sharing_integration_test.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index 18e92cc..816290d 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -139,7 +139,7 @@ func TestCwtchPeerIntegration(t *testing.T) { const ServerAddr = "nfhxzvzxinripgdh4t2m4xcy3crf6p4cbhectgckuj3idsjsaotgowad" serverKeyBundle, _ := base64.StdEncoding.DecodeString(ServerKeyBundleBase64) - app := app2.NewApp(acn, "./storage", app2.InitApp("./storage")) + app := app2.NewApp(acn, "./storage", app2.LoadAppSettings("./storage")) usr, _ := user.Current() cwtchDir := path.Join(usr.HomeDir, ".cwtch") @@ -412,7 +412,7 @@ func TestCwtchPeerIntegration(t *testing.T) { pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) fmt.Println("") log.Infof("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+ - "numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v", + "numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v", numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect, numGoRoutinesPostAlice, numGoRoutinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown) diff --git a/testing/encryptedstorage/encrypted_storage_integration_test.go b/testing/encryptedstorage/encrypted_storage_integration_test.go index 225d178..78b4f2d 100644 --- a/testing/encryptedstorage/encrypted_storage_integration_test.go +++ b/testing/encryptedstorage/encrypted_storage_integration_test.go @@ -59,7 +59,7 @@ func TestEncryptedStorage(t *testing.T) { defer acn.Close() acn.WaitTillBootstrapped() - app := app2.NewApp(acn, cwtchDir, app2.InitApp(cwtchDir)) + app := app2.NewApp(acn, cwtchDir, app2.LoadAppSettings(cwtchDir)) app.CreateProfile("alice", "password", true) app.CreateProfile("bob", "password", true) diff --git a/testing/filesharing/file_sharing_integration_test.go b/testing/filesharing/file_sharing_integration_test.go index e3cabee..f151cd6 100644 --- a/testing/filesharing/file_sharing_integration_test.go +++ b/testing/filesharing/file_sharing_integration_test.go @@ -97,7 +97,7 @@ func TestFileSharing(t *testing.T) { acn.WaitTillBootstrapped() defer acn.Close() - app := app2.NewApp(acn, "./storage", app2.InitApp("./storage")) + app := app2.NewApp(acn, "./storage", app2.LoadAppSettings("./storage")) usr, _ := user.Current() cwtchDir := path.Join(usr.HomeDir, ".cwtch") From 243b82752255ad2071e1bfbedb4f788685ed2c8c Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 28 Feb 2023 10:33:00 -0800 Subject: [PATCH 13/13] bool -> atomic.Bool to prevent "race condition" --- protocol/connections/engine.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 419a381..959e719 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -50,7 +50,7 @@ type engine struct { authorizations sync.Map // string(onion) => model.Authorization // Block Unknown Contacts - blockUnknownContacts bool + blockUnknownContacts atomic.Bool // Pointer to the Global Event Manager eventManager event.Manager @@ -240,10 +240,10 @@ func (e *engine) eventHandler() { } case event.AllowUnknownPeers: log.Debugf("%v now allows unknown connections", e.identity.Hostname()) - e.blockUnknownContacts = false + e.blockUnknownContacts.Store(false) case event.BlockUnknownPeers: log.Debugf("%v now forbids unknown connections", e.identity.Hostname()) - e.blockUnknownContacts = true + e.blockUnknownContacts.Store(true) case event.ProtocolEngineStartListen: go e.listenFn() case event.ShareManifest: @@ -285,7 +285,7 @@ func (e *engine) isBlocked(onion string) bool { authorization, known := e.authorizations.Load(onion) if !known { // if we block unknown peers we will block this contact - return e.blockUnknownContacts + return e.blockUnknownContacts.Load() } return authorization.(model.Authorization) == model.AuthBlocked } @@ -296,7 +296,7 @@ func (e *engine) isAllowed(onion string) bool { log.Errorf("attempted to lookup authorization of onion not in map...that should never happen") return false } - if e.blockUnknownContacts { + if e.blockUnknownContacts.Load() { return authorization.(model.Authorization) == model.AuthApproved } return authorization.(model.Authorization) != model.AuthBlocked