diff --git a/event/common.go b/event/common.go index e1430f6..1a77804 100644 --- a/event/common.go +++ b/event/common.go @@ -197,6 +197,7 @@ const ( // File Handling Events StopFileShare = Type("StopFileShare") + StopAllFileShares = Type("StopAllFileShares") ShareManifest = Type("ShareManifest") ManifestSizeReceived = Type("ManifestSizeReceived") ManifestError = Type("ManifestError") diff --git a/functionality/filesharing/filesharing_functionality.go b/functionality/filesharing/filesharing_functionality.go index e27e834..d69f9ab 100644 --- a/functionality/filesharing/filesharing_functionality.go +++ b/functionality/filesharing/filesharing_functionality.go @@ -89,6 +89,18 @@ func (f *Functionality) DownloadFile(profile peer.CwtchPeer, conversationID int, profile.SendScopedZonedGetValToContact(conversationID, attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest.size", key)) } +// RestartFileShare takes in an existing filekey and, assuming the manifest exists, restarts sharing of the manifest +func (f *Functionality) RestartFileShare(profile peer.CwtchPeer, filekey string) error { + // check that a manifest exists + manifest, manifestExists := profile.GetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", filekey)) + if manifestExists { + // everything is in order, so reshare this file with the engine + profile.ShareFile(filekey, manifest) + return nil + } + return fmt.Errorf("manifest does not exist for filekey: %v", filekey) +} + // ReShareFiles given a profile we iterate through all existing fileshares and re-share them // if the time limit has not expired func (f *Functionality) ReShareFiles(profile peer.CwtchPeer) error { @@ -109,34 +121,14 @@ func (f *Functionality) ReShareFiles(profile peer.CwtchPeer) error { if len(keyparts) == 3 && keyparts[2] == "ts" { // fetch the timestamp key filekey := strings.Join(keyparts[:2], ".") - timestampString, tsExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.ts", filekey)) - - // assert that the timestamp actually exists - if !tsExists { - log.Errorf("could not find expected timestamp for %v", filekey) - continue - } - - // assert this is an actual timestamp - timestamp, err := strconv.Atoi(timestampString) - if err != nil { - log.Errorf("error parsing timestamp for %v: %v", filekey, err) - continue - } - - dateShared := time.Unix(int64(timestamp), 0) - if time.Since(dateShared) > time.Hour*24*30 { - log.Debugf("ignored expired file share for %v", filekey) - continue - } + sharedFile, err := f.GetFileShareInfo(profile, filekey) + // If we haven't explicitly stopped sharing the file AND // If fewer than 30 days have passed since we originally shared this file, - // then attempt to share this file again... + // Then attempt to share this file again... // TODO: In the future this would be the point to change the timestamp and reshare the file... - manifest, manifestExists := profile.GetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", filekey)) - if manifestExists { - // everything is in order, so reshare this file with the engine - profile.ShareFile(filekey, manifest) + if err == nil && sharedFile.Active { + f.RestartFileShare(profile, filekey) } } } @@ -144,6 +136,31 @@ func (f *Functionality) ReShareFiles(profile peer.CwtchPeer) error { return nil } +// GetFileShareInfo returns information related to a known fileshare. +// An error is returned if the data is incomplete +func (f *Functionality) GetFileShareInfo(profile peer.CwtchPeer, filekey string) (*SharedFile, error) { + timestampString, tsExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.ts", filekey)) + pathString, pathExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.path", filekey)) + activeString, activeExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", filekey)) + if tsExists && pathExists && activeExists { + timestamp, err := strconv.Atoi(timestampString) + if err == nil { + + dateShared := time.Unix(int64(timestamp), 0) + expired := time.Since(dateShared) >= time.Hour*24*30 + + return &SharedFile{ + FileKey: filekey, + Path: pathString, + DateShared: dateShared, + Active: !expired && activeString == constants.True, + Expired: expired, + }, nil + } + } + return nil, fmt.Errorf("nonexistant or malformed fileshare %v", filekey) +} + // ShareFile given a profile and a conversation handle, sets up a file sharing process to share the file // at filepath func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer) (string, string, error) { @@ -199,6 +216,50 @@ func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer) (stri return key, string(wrapperJSON), err } +// SharedFile encapsulates information about a shared file +// including the file key, file path, the original share date and the +// current sharing status +type SharedFile struct { + + // The roothash.nonce identifier derived for this file share + FileKey string + + // Path is the OS specific location of the file + Path string + + // DateShared is the original datetime the file was shared + DateShared time.Time + + // Active is true if the file is currently being shared, false otherwise + Active bool + + // Expired is true if the file is not eligible to be shared (because e.g. it has been too long since the file was originally shared, + // or the file no longer exists). + Expired bool +} + +// GetSharedFiles returns all file shares associated with a given conversation +func (f *Functionality) GetSharedFiles(profile peer.CwtchPeer, conversationID int) []SharedFile { + sharedFiles := []SharedFile{} + ci, err := profile.GetConversationInfo(conversationID) + if err == nil { + for k := range ci.Attributes { + // when we share a file with a conversation we set a single attribute conversation.filesharing. + if strings.HasPrefix(k, "conversation.filesharing") { + parts := strings.SplitN(k, ".", 3) + if len(parts) == 3 { + key := parts[2] + sharedFile, err := f.GetFileShareInfo(profile, key) + if err == nil { + sharedFiles = append(sharedFiles, *sharedFile) + } + } + } + } + } + return sharedFiles +} + // GenerateDownloadPath creates a file path that doesn't currently exist on the filesystem func GenerateDownloadPath(basePath, fileName string, overwrite bool) (filePath, manifestPath string) { // avoid all kina funky shit diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index b1fd470..40ecacf 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -1044,12 +1044,18 @@ func (cp *cwtchPeer) ShareFile(fileKey string, serializedManifest string) { cp.eventBus.Publish(event.NewEvent(event.ShareManifest, map[event.Field]string{event.FileKey: fileKey, event.SerializedManifest: serializedManifest})) } +// StopFileShare sends a message to the ProtocolEngine to cease sharing a particular file func (cp *cwtchPeer) StopFileShare(fileKey string) { // set the filekey status to inactive cp.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", fileKey), constants.False) cp.eventBus.Publish(event.NewEvent(event.StopFileShare, map[event.Field]string{event.FileKey: fileKey})) } +// StopAllFileShares sends a message to the ProtocolEngine to cease sharing all files +func (cp *cwtchPeer) StopAllFileShares() { + cp.eventBus.Publish(event.NewEvent(event.StopAllFileShares, map[event.Field]string{})) +} + // eventHandler process events from other subsystems func (cp *cwtchPeer) eventHandler() { for { diff --git a/peer/profile_interface.go b/peer/profile_interface.go index e2b02eb..6b12310 100644 --- a/peer/profile_interface.go +++ b/peer/profile_interface.go @@ -118,6 +118,7 @@ type CwtchPeer interface { ShareFile(fileKey string, serializedManifest string) StopFileShare(fileKey string) + StopAllFileShares() CheckPassword(password string) bool ChangePassword(oldpassword string, newpassword string, newpasswordAgain string) error Export(file string) error diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index f60a183..afe1af6 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -111,6 +111,7 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK // File Handling engine.eventManager.Subscribe(event.ShareManifest, engine.queue) engine.eventManager.Subscribe(event.StopFileShare, engine.queue) + engine.eventManager.Subscribe(event.StopAllFileShares, engine.queue) engine.eventManager.Subscribe(event.ManifestSizeReceived, engine.queue) engine.eventManager.Subscribe(event.ManifestSaved, engine.queue) @@ -220,6 +221,8 @@ func (e *engine) eventHandler() { e.filesharingSubSystem.ShareFile(ev.Data[event.FileKey], ev.Data[event.SerializedManifest]) case event.StopFileShare: e.filesharingSubSystem.StopFileShare(ev.Data[event.FileKey]) + case event.StopAllFileShares: + e.filesharingSubSystem.StopAllFileShares() case event.ManifestSizeReceived: handle := ev.Data[event.Handle] key := ev.Data[event.FileKey] diff --git a/protocol/files/filesharing_subsystem.go b/protocol/files/filesharing_subsystem.go index d9704ff..4b85b3d 100644 --- a/protocol/files/filesharing_subsystem.go +++ b/protocol/files/filesharing_subsystem.go @@ -43,6 +43,14 @@ func (fsss *FileSharingSubSystem) StopFileShare(fileKey string) { fsss.activeShares.Delete(fileKey) } +// StopAllFileShares removes all active file shares from consideration +func (fsss *FileSharingSubSystem) StopAllFileShares() { + fsss.activeShares.Range(func(key, value interface{}) bool { + fsss.activeShares.Delete(key) + return true + }) +} + // FetchManifest given a file key and knowledge of the manifest size in chunks (obtained via an attribute lookup) // construct a request to download the manifest. func (fsss *FileSharingSubSystem) FetchManifest(fileKey string, manifestSize uint64) model.PeerMessage { diff --git a/testing/filesharing/file_sharing_integration_test.go b/testing/filesharing/file_sharing_integration_test.go index a1deebc..8770d5c 100644 --- a/testing/filesharing/file_sharing_integration_test.go +++ b/testing/filesharing/file_sharing_integration_test.go @@ -131,7 +131,7 @@ func TestFileSharing(t *testing.T) { filesharingFunctionality, _ := filesharing.FunctionalityGate(map[string]bool{constants.FileSharingExperiment: true}) - filekey, fileSharingMessage, err := filesharingFunctionality.ShareFile("cwtch.png", alice) + _, fileSharingMessage, err := filesharingFunctionality.ShareFile("cwtch.png", alice) alice.SendMessage(1, fileSharingMessage) if err != nil { @@ -146,7 +146,7 @@ func TestFileSharing(t *testing.T) { // Test stopping and restarting file shares t.Logf("Stopping File Share") - alice.StopFileShare(filekey) + alice.StopAllFileShares() // Allow time for the stop request to filter through Engine time.Sleep(time.Second * 5)