From 26c5c11216f8f1265ac68429d2ab957b023cb180 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Thu, 5 Jan 2023 13:52:43 -0800 Subject: [PATCH] Initial Prototype of Event Hooks --- app/app.go | 13 ++ extensions/profile_value.go | 65 ++++++ .../filesharing/filesharing_functionality.go | 131 ++++++++++- model/attr/scope.go | 6 + peer/cwtch_peer.go | 212 ++++++------------ peer/hooks.go | 49 ++++ peer/profile_interface.go | 8 +- .../file_sharing_integration_test.go | 28 ++- 8 files changed, 350 insertions(+), 162 deletions(-) create mode 100644 extensions/profile_value.go create mode 100644 peer/hooks.go diff --git a/app/app.go b/app/app.go index 0327b00..a1231b8 100644 --- a/app/app.go +++ b/app/app.go @@ -3,6 +3,8 @@ package app import ( "cwtch.im/cwtch/app/plugins" "cwtch.im/cwtch/event" + "cwtch.im/cwtch/extensions" + "cwtch.im/cwtch/functionality/filesharing" "cwtch.im/cwtch/model" "cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/model/constants" @@ -145,6 +147,7 @@ func (app *application) CreatePeer(name string, password string, attributes map[ eventBus := event.NewEventManager() app.eventBuses[profile.GetOnion()] = eventBus profile.Init(app.eventBuses[profile.GetOnion()]) + app.registerHooks(profile) app.peers[profile.GetOnion()] = profile for zp, val := range attributes { @@ -242,6 +245,13 @@ func (app *application) LoadProfiles(password string) { } } +func (app *application) registerHooks(profile peer.CwtchPeer) { + // Register Hooks + profile.RegisterHook(extensions.ProfileValueExtension{}) + profile.RegisterHook(filesharing.Functionality{}) + +} + // installProfile takes a profile and if it isn't loaded in the app, installs it and returns true func (app *application) installProfile(profile peer.CwtchPeer) bool { app.appmutex.Lock() @@ -252,8 +262,11 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool { eventBus := event.NewEventManager() app.eventBuses[profile.GetOnion()] = eventBus profile.Init(app.eventBuses[profile.GetOnion()]) + app.registerHooks(profile) app.peers[profile.GetOnion()] = profile + app.AddPeerPlugin(profile.GetOnion(), plugins.CONNECTIONRETRY) // Now Mandatory + app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False})) return true } diff --git a/extensions/profile_value.go b/extensions/profile_value.go new file mode 100644 index 0000000..86f19fd --- /dev/null +++ b/extensions/profile_value.go @@ -0,0 +1,65 @@ +package extensions + +import ( + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/model" + "cwtch.im/cwtch/model/attr" + "cwtch.im/cwtch/model/constants" + "cwtch.im/cwtch/peer" + "git.openprivacy.ca/openprivacy/log" + "strconv" +) + +// ProfileValueExtension implements custom Profile Names over Cwtch +type ProfileValueExtension struct { +} + +func (pne ProfileValueExtension) RegisterEvents() []event.Type { + return nil +} + +func (pne ProfileValueExtension) RegisterExperiments() []string { + return nil +} + +func (pne ProfileValueExtension) OnEvent(event event.Event, profile peer.CwtchPeer) { + // nop +} + +// OnContactReceiveValue for ProfileValueExtension handles saving specific Public Profile Values like Profile Name +func (pne ProfileValueExtension) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, szp attr.ScopedZonedPath, value string, exists bool) { + // Allow public profile parameters to be added as contact specific attributes... + scope, zone, _ := szp.GetScopeZonePath() + if exists && scope.IsPublic() && zone == attr.ProfileZone { + err := profile.SetConversationAttribute(conversation.ID, szp, value) + if err != nil { + log.Errorf("error setting conversation attribute %v", err) + } + } +} + +// OnContactRequestValue for ProfileValueExtension handles returning Public Profile Values +func (pne ProfileValueExtension) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, szp attr.ScopedZonedPath) { + scope, zone, zpath := szp.GetScopeZonePath() + log.Infof("Looking up public | conversation scope/zone %v", szp.ToString()) + if scope.IsPublic() || scope.IsConversation() { + val, exists := profile.GetScopedZonedAttribute(scope, zone, zpath) + + // NOTE: Temporary Override because UI currently wipes names if it can't find them... + if !exists && zone == attr.UnknownZone && zpath == constants.Name { + val, exists = profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name) + } + + // Construct a Response + resp := event.NewEvent(event.SendRetValMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversation.ID), event.RemotePeer: conversation.Handle, event.Exists: strconv.FormatBool(exists)}) + resp.EventID = eventID + if exists { + resp.Data[event.Data] = val + } else { + resp.Data[event.Data] = "" + } + + log.Debugf("Responding with SendRetValMessageToPeer exists:%v data: %v\n", exists, val) + profile.PublishEvent(resp) + } +} diff --git a/functionality/filesharing/filesharing_functionality.go b/functionality/filesharing/filesharing_functionality.go index bc14e5b..9e1ddc8 100644 --- a/functionality/filesharing/filesharing_functionality.go +++ b/functionality/filesharing/filesharing_functionality.go @@ -2,12 +2,14 @@ package filesharing import ( "crypto/rand" + "cwtch.im/cwtch/event" "encoding/hex" "encoding/json" "errors" "fmt" "io" "math" + "math/bits" "os" path "path/filepath" "regexp" @@ -28,6 +30,101 @@ import ( type Functionality struct { } +func (f Functionality) RegisterEvents() []event.Type { + return []event.Type{event.ManifestReceived, event.FileDownloaded} +} + +func (f Functionality) RegisterExperiments() []string { + return []string{constants.FileSharingExperiment} +} + +// OnEvent handles File Sharing Hooks like Manifest Received and FileDownloaded +func (f Functionality) OnEvent(ev event.Event, profile peer.CwtchPeer) { + switch ev.EventType { + case event.ManifestReceived: + log.Debugf("Manifest Received Event!: %v", ev) + handle := ev.Data[event.Handle] + fileKey := ev.Data[event.FileKey] + serializedManifest := ev.Data[event.SerializedManifest] + + manifestFilePath, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.manifest", fileKey)) + if exists { + downloadFilePath, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.path", fileKey)) + if exists { + log.Debugf("downloading manifest to %v, file to %v", manifestFilePath, downloadFilePath) + var manifest files.Manifest + err := json.Unmarshal([]byte(serializedManifest), &manifest) + + if err == nil { + // We only need to check the file size here, as manifest is sent to engine and the file created + // will be bound to the size advertised in manifest. + fileSizeLimitValue, fileSizeLimitExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.limit", fileKey)) + if fileSizeLimitExists { + fileSizeLimit, err := strconv.ParseUint(fileSizeLimitValue, 10, bits.UintSize) + if err == nil { + if manifest.FileSizeInBytes >= fileSizeLimit { + log.Errorf("could not download file, size %v greater than limit %v", manifest.FileSizeInBytes, fileSizeLimitValue) + } else { + manifest.Title = manifest.FileName + manifest.FileName = downloadFilePath + log.Debugf("saving manifest") + err = manifest.Save(manifestFilePath) + if err != nil { + log.Errorf("could not save manifest: %v", err) + } else { + tempFile := "" + if runtime.GOOS == "android" { + tempFile = manifestFilePath[0 : len(manifestFilePath)-len(".manifest")] + log.Debugf("derived android temp path: %v", tempFile) + } + profile.PublishEvent(event.NewEvent(event.ManifestSaved, map[event.Field]string{ + event.FileKey: fileKey, + event.Handle: handle, + event.SerializedManifest: string(manifest.Serialize()), + event.TempFile: tempFile, + event.NameSuggestion: manifest.Title, + })) + } + } + } + } + } else { + log.Errorf("error saving manifest: %v", err) + } + } else { + log.Errorf("found manifest path but not download path for %v", fileKey) + } + } else { + log.Errorf("no download path found for manifest: %v", fileKey) + } + case event.FileDownloaded: + fileKey := ev.Data[event.FileKey] + profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey), "true") + } +} + +func (f Functionality) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) { + // nop +} + +func (f Functionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) { + scope, zone, zpath := path.GetScopeZonePath() + log.Infof("file sharing contact receive value") + if exists && scope.IsConversation() && zone == attr.FilesharingZone && strings.HasSuffix(zpath, ".manifest.size") { + fileKey := strings.Replace(zpath, ".manifest.size", "", 1) + size, err := strconv.Atoi(value) + // if size is valid and below the maximum size for a manifest + // this is to prevent malicious sharers from using large amounts of memory when distributing + // a manifest as we reconstruct this in-memory + if err == nil && size < files.MaxManifestSize { + profile.PublishEvent(event.NewEvent(event.ManifestSizeReceived, map[event.Field]string{event.FileKey: fileKey, event.ManifestSize: value, event.Handle: conversation.Handle})) + } else { + profile.PublishEvent(event.NewEvent(event.ManifestError, map[event.Field]string{event.FileKey: fileKey, event.Handle: conversation.Handle})) + } + } + +} + // FunctionalityGate returns filesharing if enabled in the given experiment map // Note: Experiment maps are currently in libcwtch-go func FunctionalityGate(experimentMap map[string]bool) (*Functionality, error) { @@ -109,6 +206,23 @@ func (f *Functionality) DownloadFile(profile peer.CwtchPeer, conversationID int, return nil } +// startFileShare is a private method used to finalize a file share and publish it to the protocol engine for processing. +func (f *Functionality) startFileShare(profile peer.CwtchPeer, filekey string, manifest string) error { + tsStr, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.ts", filekey)) + if exists { + ts, err := strconv.ParseInt(tsStr, 10, 64) + if err != nil || ts < time.Now().Unix()-2592000 { + log.Errorf("ignoring request to download a file offered more than 30 days ago") + return err + } + } + + // set the filekey status to active + profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", filekey), constants.True) + profile.PublishEvent(event.NewEvent(event.ShareManifest, map[event.Field]string{event.FileKey: filekey, event.SerializedManifest: manifest})) + return nil +} + // RestartFileShare takes in an existing filekey and, assuming the manifest exists, restarts sharing of the manifest func (f *Functionality) RestartFileShare(profile peer.CwtchPeer, filekey string) error { // check that a manifest exists @@ -116,8 +230,7 @@ func (f *Functionality) RestartFileShare(profile peer.CwtchPeer, filekey string) if manifestExists { // everything is in order, so reshare this file with the engine log.Debugf("restarting file share: %v", filekey) - profile.ShareFile(filekey, manifest) - return nil + return f.startFileShare(profile, filekey, manifest) } return fmt.Errorf("manifest does not exist for filekey: %v", filekey) } @@ -238,7 +351,7 @@ func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer) (stri profile.SetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", key), string(serializedManifest)) profile.SetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest.size", key), strconv.Itoa(int(math.Ceil(float64(len(serializedManifest)-lenDiff)/float64(files.DefaultChunkSize))))) - profile.ShareFile(key, string(serializedManifest)) + err = f.startFileShare(profile, key, string(serializedManifest)) return key, string(wrapperJSON), err } @@ -330,3 +443,15 @@ func GenerateDownloadPath(basePath, fileName string, overwrite bool) (filePath, } return } + +// StopFileShare sends a message to the ProtocolEngine to cease sharing a particular file +func (f *Functionality) StopFileShare(profile peer.CwtchPeer, fileKey string) { + // set the filekey status to inactive + profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", fileKey), constants.False) + profile.PublishEvent(event.NewEvent(event.StopFileShare, map[event.Field]string{event.FileKey: fileKey})) +} + +// StopAllFileShares sends a message to the ProtocolEngine to cease sharing all files +func (f *Functionality) StopAllFileShares(profile peer.CwtchPeer) { + profile.PublishEvent(event.NewEvent(event.StopAllFileShares, map[event.Field]string{})) +} diff --git a/model/attr/scope.go b/model/attr/scope.go index 406d1a9..1e6810c 100644 --- a/model/attr/scope.go +++ b/model/attr/scope.go @@ -23,6 +23,12 @@ type Scope string // ScopedZonedPath typed path with a scope and a zone type ScopedZonedPath string +func (szp ScopedZonedPath) GetScopeZonePath() (Scope, Zone, string) { + scope, path := ParseScope(string(szp)) + zone, zpath := ParseZone(path) + return scope, zone, zpath +} + // scopes for attributes const ( // on a peer, local and peer supplied data diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 6f911ee..22dc43b 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -14,10 +14,8 @@ import ( "git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/connectivity/tor" "golang.org/x/crypto/ed25519" - "math/bits" "os" path "path/filepath" - "runtime" "sort" "strconv" "strings" @@ -28,7 +26,6 @@ import ( "cwtch.im/cwtch/model" "cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/protocol/connections" - "cwtch.im/cwtch/protocol/files" "git.openprivacy.ca/openprivacy/log" ) @@ -38,8 +35,7 @@ const lastReceivedSignature = "LastReceivedSignature" var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true, event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeerEngine: true, event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToPeerError: true, event.SendMessageToGroupError: true, - event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true, - event.ManifestSizeReceived: true, event.ManifestReceived: true, event.FileDownloaded: true, event.TriggerAntispamCheck: true} + event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true, event.TriggerAntispamCheck: true} // DefaultEventsToHandle specifies which events will be subscribed to // @@ -58,8 +54,6 @@ var DefaultEventsToHandle = []event.Type{ event.ServerStateChange, event.SendMessageToPeerError, event.NewRetValMessageFromPeer, - event.ManifestReceived, - event.FileDownloaded, event.TriggerAntispamCheck, } @@ -74,6 +68,25 @@ type cwtchPeer struct { queue event.Queue eventBus event.Manager + + extensions []ProfileHook + extensionLock sync.Mutex // we don't want to hold up all of cwtch for managing thread safe access to extensions +} + +func (cp *cwtchPeer) PublishEvent(resp event.Event) { + cp.eventBus.Publish(resp) +} + +func (cp *cwtchPeer) RegisterHook(extension ProfileHooks) { + cp.extensionLock.Lock() + defer cp.extensionLock.Unlock() + + // Register Requested Events + for _, event := range extension.RegisterEvents() { + cp.eventBus.Subscribe(event, cp.queue) + } + + cp.extensions = append(cp.extensions, ConstructHook(extension)) } func (cp *cwtchPeer) StoreCachedTokens(tokenServer string, tokens []*privacypass.Token) { @@ -1165,33 +1178,6 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time) return cp.storage.InsertMessage(ci.ID, 0, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message)) } -// ShareFile begins hosting the given serialized manifest -func (cp *cwtchPeer) ShareFile(fileKey string, serializedManifest string) { - tsStr, exists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.ts", fileKey)) - if exists { - ts, err := strconv.ParseInt(tsStr, 10, 64) - if err != nil || ts < time.Now().Unix()-2592000 { - log.Errorf("ignoring request to download a file offered more than 30 days ago") - return - } - } - // set the filekey status to active - cp.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", fileKey), constants.True) - cp.eventBus.Publish(event.NewEvent(event.ShareManifest, map[event.Field]string{event.FileKey: fileKey, event.SerializedManifest: serializedManifest})) -} - -// StopFileShare sends a message to the ProtocolEngine to cease sharing a particular file -func (cp *cwtchPeer) StopFileShare(fileKey string) { - // set the filekey status to inactive - cp.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", fileKey), constants.False) - cp.eventBus.Publish(event.NewEvent(event.StopFileShare, map[event.Field]string{event.FileKey: fileKey})) -} - -// StopAllFileShares sends a message to the ProtocolEngine to cease sharing all files -func (cp *cwtchPeer) StopAllFileShares() { - cp.eventBus.Publish(event.NewEvent(event.StopAllFileShares, map[event.Field]string{})) -} - // eventHandler process events from other subsystems func (cp *cwtchPeer) eventHandler() { for { @@ -1288,128 +1274,48 @@ func (cp *cwtchPeer) eventHandler() { path := ev.Data[event.Path] log.Debugf("NewGetValMessageFromPeer for %v.%v from %v\n", scope, path, onion) - conversationInfo, err := cp.FetchConversationInfo(onion) + log.Debugf("confo info lookup newgetval %v %v %v", onion, conversationInfo, err) + // only accepted contacts can look up information if conversationInfo != nil && conversationInfo.Accepted { - scope := attr.IntoScope(scope) - if scope.IsPublic() || scope.IsConversation() { - zone, zpath := attr.ParseZone(path) - val, exists := cp.GetScopedZonedAttribute(scope, zone, zpath) + // Type Safe Scoped/Zoned Path + zscope := attr.IntoScope(scope) + zone, zpath := attr.ParseZone(path) + scopedZonedPath := zscope.ConstructScopedZonedPath(zone.ConstructZonedPath(zpath)) - // NOTE: Temporary Override because UI currently wipes names if it can't find them... - if !exists && zone == attr.UnknownZone && path == constants.Name { - val, exists = cp.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name) - } - - resp := event.NewEvent(event.SendRetValMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.RemotePeer: onion, event.Exists: strconv.FormatBool(exists)}) - resp.EventID = ev.EventID - if exists { - resp.Data[event.Data] = val - } else { - resp.Data[event.Data] = "" - } - log.Debugf("Responding with SendRetValMessageToPeer exists:%v data: %v\n", exists, val) - - cp.eventBus.Publish(resp) + // Safe Access to Extensions + cp.extensionLock.Lock() + log.Infof("checking extension...%v", cp.extensions) + for _, extension := range cp.extensions { + log.Infof("checking extension...%v", extension) + extension.extension.OnContactRequestValue(cp, *conversationInfo, ev.EventID, scopedZonedPath) } + cp.extensionLock.Unlock() + } - - case event.ManifestReceived: - log.Debugf("Manifest Received Event!: %v", ev) - handle := ev.Data[event.Handle] - fileKey := ev.Data[event.FileKey] - serializedManifest := ev.Data[event.SerializedManifest] - - manifestFilePath, exists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.manifest", fileKey)) - if exists { - downloadFilePath, exists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.path", fileKey)) - if exists { - log.Debugf("downloading manifest to %v, file to %v", manifestFilePath, downloadFilePath) - var manifest files.Manifest - err := json.Unmarshal([]byte(serializedManifest), &manifest) - - if err == nil { - // We only need to check the file size here, as manifest is sent to engine and the file created - // will be bound to the size advertised in manifest. - fileSizeLimitValue, fileSizeLimitExists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.limit", fileKey)) - if fileSizeLimitExists { - fileSizeLimit, err := strconv.ParseUint(fileSizeLimitValue, 10, bits.UintSize) - if err == nil { - if manifest.FileSizeInBytes >= fileSizeLimit { - log.Errorf("could not download file, size %v greater than limit %v", manifest.FileSizeInBytes, fileSizeLimitValue) - } else { - manifest.Title = manifest.FileName - manifest.FileName = downloadFilePath - log.Debugf("saving manifest") - err = manifest.Save(manifestFilePath) - if err != nil { - log.Errorf("could not save manifest: %v", err) - } else { - tempFile := "" - if runtime.GOOS == "android" { - tempFile = manifestFilePath[0 : len(manifestFilePath)-len(".manifest")] - log.Debugf("derived android temp path: %v", tempFile) - } - cp.eventBus.Publish(event.NewEvent(event.ManifestSaved, map[event.Field]string{ - event.FileKey: fileKey, - event.Handle: handle, - event.SerializedManifest: string(manifest.Serialize()), - event.TempFile: tempFile, - event.NameSuggestion: manifest.Title, - })) - } - } - } - } - } else { - log.Errorf("error saving manifest: %v", err) - } - } else { - log.Errorf("found manifest path but not download path for %v", fileKey) - } - } else { - log.Errorf("no download path found for manifest: %v", fileKey) - } - case event.FileDownloaded: - fileKey := ev.Data[event.FileKey] - cp.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey), "true") case event.NewRetValMessageFromPeer: - onion := ev.Data[event.RemotePeer] + handle := ev.Data[event.RemotePeer] scope := ev.Data[event.Scope] path := ev.Data[event.Path] val := ev.Data[event.Data] exists, _ := strconv.ParseBool(ev.Data[event.Exists]) - log.Debugf("NewRetValMessageFromPeer %v %v %v %v %v\n", onion, scope, path, exists, val) - if exists { + log.Debugf("NewRetValMessageFromPeer %v %v %v %v %v\n", handle, scope, path, exists, val) - // Handle File Sharing Metadata - // TODO This probably should be broken out to it's own code.. - zone, path := attr.ParseZone(path) - if attr.Scope(scope).IsConversation() && zone == attr.FilesharingZone && strings.HasSuffix(path, ".manifest.size") { - fileKey := strings.Replace(path, ".manifest.size", "", 1) - size, err := strconv.Atoi(val) - // if size is valid and below the maximum size for a manifest - // this is to prevent malicious sharers from using large amounts of memory when distributing - // a manifest as we reconstruct this in-memory - if err == nil && size < files.MaxManifestSize { - cp.eventBus.Publish(event.NewEvent(event.ManifestSizeReceived, map[event.Field]string{event.FileKey: fileKey, event.ManifestSize: val, event.Handle: onion})) - } else { - cp.eventBus.Publish(event.NewEvent(event.ManifestError, map[event.Field]string{event.FileKey: fileKey, event.Handle: onion})) - } - } + conversationInfo, _ := cp.FetchConversationInfo(handle) + // only accepted contacts can look up information + if conversationInfo != nil && conversationInfo.Accepted { + // Type Safe Scoped/Zoned Path + zscope := attr.IntoScope(scope) + zone, zpath := attr.ParseZone(path) + scopedZonedPath := zscope.ConstructScopedZonedPath(zone.ConstructZonedPath(zpath)) - // Allow public profile parameters to be added as peer specific attributes... - if attr.Scope(scope).IsPublic() && zone == attr.ProfileZone { - ci, err := cp.FetchConversationInfo(onion) - log.Debugf("fetch conversation info %v %v", ci, err) - if ci != nil && err == nil { - err := cp.SetConversationAttribute(ci.ID, attr.Scope(scope).ConstructScopedZonedPath(zone.ConstructZonedPath(path)), val) - if err != nil { - log.Errorf("error setting conversation attribute %v", err) - } - } + // Safe Access to Extensions + cp.extensionLock.Lock() + for _, extension := range cp.extensions { + extension.extension.OnContactReceiveValue(cp, *conversationInfo, scopedZonedPath, val, exists) } + cp.extensionLock.Unlock() } case event.PeerStateChange: handle := ev.Data[event.RemotePeer] @@ -1491,10 +1397,26 @@ func (cp *cwtchPeer) eventHandler() { } } default: - if ev.EventType != "" { - log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType) + // invalid event, signifies shutdown + if ev.EventType == "" { + return + } + + // Otherwise, obtain Safe Access to Extensions + processed := false + cp.extensionLock.Lock() + for _, extension := range cp.extensions { + // if the extension is registered for this event type then process + if _, contains := extension.events[ev.EventType]; contains { + extension.extension.OnEvent(ev, cp) + processed = true + } + } + cp.extensionLock.Unlock() + + if !processed { + log.Errorf("cwtch profile received an event that it (or an extension) was unable to handle. this is very likely a programming error: %v", ev.EventType) } - return } } } diff --git a/peer/hooks.go b/peer/hooks.go new file mode 100644 index 0000000..d061c27 --- /dev/null +++ b/peer/hooks.go @@ -0,0 +1,49 @@ +package peer + +import ( + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/model" + "cwtch.im/cwtch/model/attr" +) + +type ProfileHooks interface { + + // RegisterEvents returns a set of events that the extension is interested hooking + RegisterEvents() []event.Type + + // RegisterExperiments RegisterExperiments returns a set of experiments that the extension is interested in being notified about + RegisterExperiments() []string + + // OnEvent is called whenever an event Registered with RegisterEvents is called + OnEvent(event event.Event, profile CwtchPeer) + + // OnContactRequestValue is Hooked when a contact sends a request for the given path + OnContactRequestValue(profile CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) + + // OnContactReceiveValue is Hooked after a profile receives a response to a Get/Val Request + OnContactReceiveValue(profile CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) +} + +type ProfileHook struct { + extension ProfileHooks + events map[event.Type]bool + experiments map[string]bool +} + +func ConstructHook(extension ProfileHooks) ProfileHook { + events := make(map[event.Type]bool) + for _, event := range extension.RegisterEvents() { + events[event] = true + } + + experiments := make(map[string]bool) + for _, experiment := range extension.RegisterExperiments() { + experiments[experiment] = true + } + + return ProfileHook{ + extension, + events, + experiments, + } +} diff --git a/peer/profile_interface.go b/peer/profile_interface.go index 493bc3e..c9fa835 100644 --- a/peer/profile_interface.go +++ b/peer/profile_interface.go @@ -121,12 +121,6 @@ type CwtchPeer interface { GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error - // File Sharing APIS - // TODO move these to feature protected interfaces - ShareFile(fileKey string, serializedManifest string) - StopFileShare(fileKey string) - StopAllFileShares() - // Server Token APIS // TODO move these to feature protected interfaces StoreCachedTokens(tokenServer string, tokens []*privacypass.Token) @@ -136,4 +130,6 @@ type CwtchPeer interface { ChangePassword(oldpassword string, newpassword string, newpasswordAgain string) error Export(file string) error Delete() + PublishEvent(resp event.Event) + RegisterHook(hook ProfileHooks) } diff --git a/testing/filesharing/file_sharing_integration_test.go b/testing/filesharing/file_sharing_integration_test.go index b266847..2574be0 100644 --- a/testing/filesharing/file_sharing_integration_test.go +++ b/testing/filesharing/file_sharing_integration_test.go @@ -18,6 +18,7 @@ import ( "fmt" "git.openprivacy.ca/openprivacy/connectivity/tor" "git.openprivacy.ca/openprivacy/log" + "path/filepath" // Import SQL Cipher mrand "math/rand" @@ -56,7 +57,7 @@ func TestFileSharing(t *testing.T) { os.RemoveAll("cwtch.out.png") os.RemoveAll("cwtch.out.png.manifest") - log.SetLevel(log.LevelInfo) + log.SetLevel(log.LevelDebug) os.Mkdir("tordir", 0700) dataDir := path.Join("tordir", "tor") @@ -74,13 +75,22 @@ func TestFileSharing(t *testing.T) { panic(err) } + useCache := os.Getenv("TORCACHE") == "true" + torDataDir := "" - if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil { - t.Fatalf("could not create data dir") + if useCache { + log.Infof("using tor cache") + torDataDir = filepath.Join(dataDir, "data-dir-torcache") + os.MkdirAll(torDataDir, 0700) + } else { + log.Infof("using clean tor data dir") + if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil { + t.Fatalf("could not create data dir") + } } tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc") - acn, err := tor.NewTorACNWithAuth("./tordir", path.Join("..", "..", "tor"), torDataDir, controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)}) + acn, err := tor.NewTorACNWithAuth("./tordir", path.Join("..", "tor"), torDataDir, controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)}) if err != nil { t.Fatalf("Could not start Tor: %v", err) } @@ -125,6 +135,7 @@ func TestFileSharing(t *testing.T) { t.Logf("Waiting for alice and Bob to peer...") waitForPeerPeerConnection(t, alice, bob) + alice.AcceptConversation(1) t.Logf("Alice and Bob are Connected!!") @@ -145,7 +156,7 @@ func TestFileSharing(t *testing.T) { // Test stopping and restarting file shares t.Logf("Stopping File Share") - alice.StopAllFileShares() + filesharingFunctionality.StopAllFileShares(alice) // Allow time for the stop request to filter through Engine time.Sleep(time.Second * 5) @@ -181,6 +192,7 @@ func testBobDownloadFile(t *testing.T, bob peer.CwtchPeer, filesharingFunctional os.RemoveAll("cwtch.out.png") os.RemoveAll("cwtch.out.png.manifest") + bob.AcceptConversation(1) message, _, err := bob.GetChannelMessage(1, 0, 1) if err != nil { t.Fatalf("could not find file sharing message: %v", err) @@ -194,19 +206,19 @@ func testBobDownloadFile(t *testing.T, bob peer.CwtchPeer, filesharingFunctional err := json.Unmarshal([]byte(messageWrapper.Data), &fileMessageOverlay) if err == nil { - + t.Logf("bob attempting to download file with invalid download") // try downloading with invalid download dir err = filesharingFunctionality.DownloadFile(bob, 1, "/do/not/download/this/file/cwtch.out.png", "./cwtch.out.png.manifest", fmt.Sprintf("%s.%s", fileMessageOverlay.Hash, fileMessageOverlay.Nonce), constants.ImagePreviewMaxSizeInBytes) if err == nil { t.Fatalf("should not download file with invalid download dir") } - + t.Logf("bob attempting to download file with invalid manifest") // try downloading with invalid manifest dir err = filesharingFunctionality.DownloadFile(bob, 1, "./cwtch.out.png", "/do/not/download/this/file/cwtch.out.png.manifest", fmt.Sprintf("%s.%s", fileMessageOverlay.Hash, fileMessageOverlay.Nonce), constants.ImagePreviewMaxSizeInBytes) if err == nil { t.Fatalf("should not download file with invalid manifest dir") } - + t.Logf("bob attempting to download file") err = filesharingFunctionality.DownloadFile(bob, 1, "./cwtch.out.png", "./cwtch.out.png.manifest", fmt.Sprintf("%s.%s", fileMessageOverlay.Hash, fileMessageOverlay.Nonce), constants.ImagePreviewMaxSizeInBytes) if err != nil { t.Fatalf("could not download file: %v", err)