Browse Source

More File Sharing APIS (StopAllFileShares / GetFileShareInfo / GetSharedFiles)

filesharing-persist
Sarah Jamie Lewis 1 month ago
parent
commit
4d080a2854
  1. 1
      event/common.go
  2. 111
      functionality/filesharing/filesharing_functionality.go
  3. 6
      peer/cwtch_peer.go
  4. 1
      peer/profile_interface.go
  5. 3
      protocol/connections/engine.go
  6. 8
      protocol/files/filesharing_subsystem.go
  7. 4
      testing/filesharing/file_sharing_integration_test.go

1
event/common.go

@ -197,6 +197,7 @@ const (
// File Handling Events
StopFileShare = Type("StopFileShare")
StopAllFileShares = Type("StopAllFileShares")
ShareManifest = Type("ShareManifest")
ManifestSizeReceived = Type("ManifestSizeReceived")
ManifestError = Type("ManifestError")

111
functionality/filesharing/filesharing_functionality.go

@ -89,6 +89,18 @@ func (f *Functionality) DownloadFile(profile peer.CwtchPeer, conversationID int,
profile.SendScopedZonedGetValToContact(conversationID, attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest.size", key))
}
// RestartFileShare takes in an existing filekey and, assuming the manifest exists, restarts sharing of the manifest
func (f *Functionality) RestartFileShare(profile peer.CwtchPeer, filekey string) error {
// check that a manifest exists
manifest, manifestExists := profile.GetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", filekey))
if manifestExists {
// everything is in order, so reshare this file with the engine
profile.ShareFile(filekey, manifest)
return nil
}
return fmt.Errorf("manifest does not exist for filekey: %v", filekey)
}
// ReShareFiles given a profile we iterate through all existing fileshares and re-share them
// if the time limit has not expired
func (f *Functionality) ReShareFiles(profile peer.CwtchPeer) error {
@ -109,34 +121,14 @@ func (f *Functionality) ReShareFiles(profile peer.CwtchPeer) error {
if len(keyparts) == 3 && keyparts[2] == "ts" {
// fetch the timestamp key
filekey := strings.Join(keyparts[:2], ".")
timestampString, tsExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.ts", filekey))
// assert that the timestamp actually exists
if !tsExists {
log.Errorf("could not find expected timestamp for %v", filekey)
continue
}
// assert this is an actual timestamp
timestamp, err := strconv.Atoi(timestampString)
if err != nil {
log.Errorf("error parsing timestamp for %v: %v", filekey, err)
continue
}
dateShared := time.Unix(int64(timestamp), 0)
if time.Since(dateShared) > time.Hour*24*30 {
log.Debugf("ignored expired file share for %v", filekey)
continue
}
sharedFile, err := f.GetFileShareInfo(profile, filekey)
// If we haven't explicitly stopped sharing the file AND
// If fewer than 30 days have passed since we originally shared this file,
// then attempt to share this file again...
// Then attempt to share this file again...
// TODO: In the future this would be the point to change the timestamp and reshare the file...
manifest, manifestExists := profile.GetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", filekey))
if manifestExists {
// everything is in order, so reshare this file with the engine
profile.ShareFile(filekey, manifest)
if err == nil && sharedFile.Active {
f.RestartFileShare(profile, filekey)
}
}
}
@ -144,6 +136,31 @@ func (f *Functionality) ReShareFiles(profile peer.CwtchPeer) error {
return nil
}
// GetFileShareInfo returns information related to a known fileshare.
// An error is returned if the data is incomplete
func (f *Functionality) GetFileShareInfo(profile peer.CwtchPeer, filekey string) (*SharedFile, error) {
timestampString, tsExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.ts", filekey))
pathString, pathExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.path", filekey))
activeString, activeExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", filekey))
if tsExists && pathExists && activeExists {
timestamp, err := strconv.Atoi(timestampString)
if err == nil {
dateShared := time.Unix(int64(timestamp), 0)
expired := time.Since(dateShared) >= time.Hour*24*30
return &SharedFile{
FileKey: filekey,
Path: pathString,
DateShared: dateShared,
Active: !expired && activeString == constants.True,
Expired: expired,
}, nil
}
}
return nil, fmt.Errorf("nonexistant or malformed fileshare %v", filekey)
}
// ShareFile given a profile and a conversation handle, sets up a file sharing process to share the file
// at filepath
func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer) (string, string, error) {
@ -199,6 +216,50 @@ func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer) (stri
return key, string(wrapperJSON), err
}
// SharedFile encapsulates information about a shared file
// including the file key, file path, the original share date and the
// current sharing status
type SharedFile struct {
// The roothash.nonce identifier derived for this file share
FileKey string
// Path is the OS specific location of the file
Path string
// DateShared is the original datetime the file was shared
DateShared time.Time
// Active is true if the file is currently being shared, false otherwise
Active bool
// Expired is true if the file is not eligible to be shared (because e.g. it has been too long since the file was originally shared,
// or the file no longer exists).
Expired bool
}
// GetSharedFiles returns all file shares associated with a given conversation
func (f *Functionality) GetSharedFiles(profile peer.CwtchPeer, conversationID int) []SharedFile {
sharedFiles := []SharedFile{}
ci, err := profile.GetConversationInfo(conversationID)
if err == nil {
for k := range ci.Attributes {
// when we share a file with a conversation we set a single attribute conversation.filesharing.<filekey>
if strings.HasPrefix(k, "conversation.filesharing") {
parts := strings.SplitN(k, ".", 3)
if len(parts) == 3 {
key := parts[2]
sharedFile, err := f.GetFileShareInfo(profile, key)
if err == nil {
sharedFiles = append(sharedFiles, *sharedFile)
}
}
}
}
}
return sharedFiles
}
// GenerateDownloadPath creates a file path that doesn't currently exist on the filesystem
func GenerateDownloadPath(basePath, fileName string, overwrite bool) (filePath, manifestPath string) {
// avoid all kina funky shit

6
peer/cwtch_peer.go

@ -1044,12 +1044,18 @@ func (cp *cwtchPeer) ShareFile(fileKey string, serializedManifest string) {
cp.eventBus.Publish(event.NewEvent(event.ShareManifest, map[event.Field]string{event.FileKey: fileKey, event.SerializedManifest: serializedManifest}))
}
// StopFileShare sends a message to the ProtocolEngine to cease sharing a particular file
func (cp *cwtchPeer) StopFileShare(fileKey string) {
// set the filekey status to inactive
cp.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", fileKey), constants.False)
cp.eventBus.Publish(event.NewEvent(event.StopFileShare, map[event.Field]string{event.FileKey: fileKey}))
}
// StopAllFileShares sends a message to the ProtocolEngine to cease sharing all files
func (cp *cwtchPeer) StopAllFileShares() {
cp.eventBus.Publish(event.NewEvent(event.StopAllFileShares, map[event.Field]string{}))
}
// eventHandler process events from other subsystems
func (cp *cwtchPeer) eventHandler() {
for {

1
peer/profile_interface.go

@ -118,6 +118,7 @@ type CwtchPeer interface {
ShareFile(fileKey string, serializedManifest string)
StopFileShare(fileKey string)
StopAllFileShares()
CheckPassword(password string) bool
ChangePassword(oldpassword string, newpassword string, newpasswordAgain string) error
Export(file string) error

3
protocol/connections/engine.go

@ -111,6 +111,7 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
// File Handling
engine.eventManager.Subscribe(event.ShareManifest, engine.queue)
engine.eventManager.Subscribe(event.StopFileShare, engine.queue)
engine.eventManager.Subscribe(event.StopAllFileShares, engine.queue)
engine.eventManager.Subscribe(event.ManifestSizeReceived, engine.queue)
engine.eventManager.Subscribe(event.ManifestSaved, engine.queue)
@ -220,6 +221,8 @@ func (e *engine) eventHandler() {
e.filesharingSubSystem.ShareFile(ev.Data[event.FileKey], ev.Data[event.SerializedManifest])
case event.StopFileShare:
e.filesharingSubSystem.StopFileShare(ev.Data[event.FileKey])
case event.StopAllFileShares:
e.filesharingSubSystem.StopAllFileShares()
case event.ManifestSizeReceived:
handle := ev.Data[event.Handle]
key := ev.Data[event.FileKey]

8
protocol/files/filesharing_subsystem.go

@ -43,6 +43,14 @@ func (fsss *FileSharingSubSystem) StopFileShare(fileKey string) {
fsss.activeShares.Delete(fileKey)
}
// StopAllFileShares removes all active file shares from consideration
func (fsss *FileSharingSubSystem) StopAllFileShares() {
fsss.activeShares.Range(func(key, value interface{}) bool {
fsss.activeShares.Delete(key)
return true
})
}
// FetchManifest given a file key and knowledge of the manifest size in chunks (obtained via an attribute lookup)
// construct a request to download the manifest.
func (fsss *FileSharingSubSystem) FetchManifest(fileKey string, manifestSize uint64) model.PeerMessage {

4
testing/filesharing/file_sharing_integration_test.go

@ -131,7 +131,7 @@ func TestFileSharing(t *testing.T) {
filesharingFunctionality, _ := filesharing.FunctionalityGate(map[string]bool{constants.FileSharingExperiment: true})
filekey, fileSharingMessage, err := filesharingFunctionality.ShareFile("cwtch.png", alice)
_, fileSharingMessage, err := filesharingFunctionality.ShareFile("cwtch.png", alice)
alice.SendMessage(1, fileSharingMessage)
if err != nil {
@ -146,7 +146,7 @@ func TestFileSharing(t *testing.T) {
// Test stopping and restarting file shares
t.Logf("Stopping File Share")
alice.StopFileShare(filekey)
alice.StopAllFileShares()
// Allow time for the stop request to filter through Engine
time.Sleep(time.Second * 5)

Loading…
Cancel
Save