erinn adding filesharing revisions
This commit is contained in:
parent
f0c4c4a92e
commit
98239ae7ea
|
@ -327,6 +327,8 @@ const (
|
||||||
FileSizeInChunks = Field("FileSizeInChunks")
|
FileSizeInChunks = Field("FileSizeInChunks")
|
||||||
ManifestSize = Field("ManifestSize")
|
ManifestSize = Field("ManifestSize")
|
||||||
SerializedManifest = Field("SerializedManifest")
|
SerializedManifest = Field("SerializedManifest")
|
||||||
|
TempFile = Field("TempFile")
|
||||||
|
FilePath = Field("FilePath")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Defining Common errors
|
// Defining Common errors
|
||||||
|
|
|
@ -42,7 +42,8 @@ type OverlayMessage struct {
|
||||||
// DownloadFile given a profile, a conversation handle and a file sharing key, start off a download process
|
// DownloadFile given a profile, a conversation handle and a file sharing key, start off a download process
|
||||||
// to downloadFilePath
|
// to downloadFilePath
|
||||||
func (f *Functionality) DownloadFile(profile peer.CwtchPeer, handle string, downloadFilePath string, manifestFilePath string, key string) {
|
func (f *Functionality) DownloadFile(profile peer.CwtchPeer, handle string, downloadFilePath string, manifestFilePath string, key string) {
|
||||||
profile.SetAttribute(attr.GetLocalScope(key), manifestFilePath)
|
profile.SetAttribute(attr.GetLocalScope(fmt.Sprintf("%s.manifest", key)), manifestFilePath)
|
||||||
|
profile.SetAttribute(attr.GetLocalScope(key), downloadFilePath)
|
||||||
profile.SendGetValToPeer(handle, attr.PublicScope, fmt.Sprintf("%s.manifest.size", key))
|
profile.SendGetValToPeer(handle, attr.PublicScope, fmt.Sprintf("%s.manifest.size", key))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -3,7 +3,7 @@ module cwtch.im/cwtch
|
||||||
go 1.14
|
go 1.14
|
||||||
|
|
||||||
require (
|
require (
|
||||||
git.openprivacy.ca/cwtch.im/tapir v0.4.6
|
git.openprivacy.ca/cwtch.im/tapir v0.4.7
|
||||||
git.openprivacy.ca/openprivacy/connectivity v1.4.5
|
git.openprivacy.ca/openprivacy/connectivity v1.4.5
|
||||||
git.openprivacy.ca/openprivacy/log v1.0.3
|
git.openprivacy.ca/openprivacy/log v1.0.3
|
||||||
github.com/gtank/ristretto255 v0.1.2
|
github.com/gtank/ristretto255 v0.1.2
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -851,7 +852,17 @@ func (cp *cwtchPeer) eventHandler() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("could not save manifest: %v", err)
|
log.Errorf("could not save manifest: %v", err)
|
||||||
} else {
|
} else {
|
||||||
cp.eventBus.Publish(event.NewEvent(event.ManifestSaved, map[event.Field]string{event.FileKey: fileKey, event.Handle: handle, event.SerializedManifest: string(manifest.Serialize())}))
|
tempFile := ""
|
||||||
|
if runtime.GOOS == "android" {
|
||||||
|
tempFile = manifestFilePath[0 : len(manifestFilePath)-len(".manifest")]
|
||||||
|
log.Debugf("derived android temp path: %v", tempFile)
|
||||||
|
}
|
||||||
|
cp.eventBus.Publish(event.NewEvent(event.ManifestSaved, map[event.Field]string{
|
||||||
|
event.FileKey: fileKey,
|
||||||
|
event.Handle: handle,
|
||||||
|
event.SerializedManifest: string(manifest.Serialize()),
|
||||||
|
event.TempFile: tempFile,
|
||||||
|
}))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Errorf("error saving manifest: %v", err)
|
log.Errorf("error saving manifest: %v", err)
|
||||||
|
|
|
@ -1,14 +1,18 @@
|
||||||
package connections
|
package connections
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"cwtch.im/cwtch/event"
|
"cwtch.im/cwtch/event"
|
||||||
"cwtch.im/cwtch/model"
|
"cwtch.im/cwtch/model"
|
||||||
"cwtch.im/cwtch/protocol/files"
|
"cwtch.im/cwtch/protocol/files"
|
||||||
"cwtch.im/cwtch/protocol/groups"
|
"cwtch.im/cwtch/protocol/groups"
|
||||||
model3 "cwtch.im/cwtch/protocol/model"
|
model3 "cwtch.im/cwtch/protocol/model"
|
||||||
"encoding/base64"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"git.openprivacy.ca/cwtch.im/tapir"
|
"git.openprivacy.ca/cwtch.im/tapir"
|
||||||
"git.openprivacy.ca/cwtch.im/tapir/applications"
|
"git.openprivacy.ca/cwtch.im/tapir/applications"
|
||||||
"git.openprivacy.ca/cwtch.im/tapir/networks/tor"
|
"git.openprivacy.ca/cwtch.im/tapir/networks/tor"
|
||||||
|
@ -18,9 +22,6 @@ import (
|
||||||
"git.openprivacy.ca/openprivacy/log"
|
"git.openprivacy.ca/openprivacy/log"
|
||||||
"github.com/gtank/ristretto255"
|
"github.com/gtank/ristretto255"
|
||||||
"golang.org/x/crypto/ed25519"
|
"golang.org/x/crypto/ed25519"
|
||||||
"strconv"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type engine struct {
|
type engine struct {
|
||||||
|
@ -212,10 +213,11 @@ func (e *engine) eventHandler() {
|
||||||
handle := ev.Data[event.Handle]
|
handle := ev.Data[event.Handle]
|
||||||
key := ev.Data[event.FileKey]
|
key := ev.Data[event.FileKey]
|
||||||
serializedManifest := ev.Data[event.SerializedManifest]
|
serializedManifest := ev.Data[event.SerializedManifest]
|
||||||
|
tempFile := ev.Data[event.TempFile]
|
||||||
// NOTE: for now there will probably only ever be a single chunk request. When we enable group
|
// NOTE: for now there will probably only ever be a single chunk request. When we enable group
|
||||||
// sharing and rehosting then this loop will serve as a a way of splitting the request among multiple
|
// sharing and rehosting then this loop will serve as a a way of splitting the request among multiple
|
||||||
// contacts
|
// contacts
|
||||||
for _, message := range e.filesharingSubSystem.CompileChunkRequests(key, serializedManifest) {
|
for _, message := range e.filesharingSubSystem.CompileChunkRequests(key, serializedManifest, tempFile) {
|
||||||
if err := e.sendPeerMessage(handle, message); err != nil {
|
if err := e.sendPeerMessage(handle, message); err != nil {
|
||||||
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
|
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
|
||||||
}
|
}
|
||||||
|
@ -568,9 +570,9 @@ func (e *engine) handlePeerMessage(hostname string, eventID string, context stri
|
||||||
if len(fileKey) != 0 {
|
if len(fileKey) != 0 {
|
||||||
e.eventManager.Publish(event.NewEvent(event.FileDownloadProgressUpdate, map[event.Field]string{event.FileKey: fileKey, event.Progress: strconv.Itoa(int(progress)), event.FileSizeInChunks: strconv.Itoa(int(totalChunks))}))
|
e.eventManager.Publish(event.NewEvent(event.FileDownloadProgressUpdate, map[event.Field]string{event.FileKey: fileKey, event.Progress: strconv.Itoa(int(progress)), event.FileSizeInChunks: strconv.Itoa(int(totalChunks))}))
|
||||||
if progress == totalChunks {
|
if progress == totalChunks {
|
||||||
if e.filesharingSubSystem.VerifyFile(fileKey) {
|
if tempFile, filePath, success := e.filesharingSubSystem.VerifyFile(fileKey); success {
|
||||||
log.Debugf("file verified and downloaded!")
|
log.Debugf("file verified and downloaded!")
|
||||||
e.eventManager.Publish(event.NewEvent(event.FileDownloaded, map[event.Field]string{event.FileKey: fileKey}))
|
e.eventManager.Publish(event.NewEvent(event.FileDownloaded, map[event.Field]string{event.FileKey: fileKey, event.FilePath: filePath, event.TempFile: tempFile}))
|
||||||
} else {
|
} else {
|
||||||
log.Debugf("file failed to verify!")
|
log.Debugf("file failed to verify!")
|
||||||
e.eventManager.Publish(event.NewEvent(event.FileVerificationFailed, map[event.Field]string{event.FileKey: fileKey}))
|
e.eventManager.Publish(event.NewEvent(event.FileVerificationFailed, map[event.Field]string{event.FileKey: fileKey}))
|
||||||
|
|
|
@ -1,15 +1,16 @@
|
||||||
package files
|
package files
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"cwtch.im/cwtch/event"
|
|
||||||
"cwtch.im/cwtch/protocol/model"
|
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"git.openprivacy.ca/openprivacy/log"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"cwtch.im/cwtch/event"
|
||||||
|
"cwtch.im/cwtch/protocol/model"
|
||||||
|
"git.openprivacy.ca/openprivacy/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FileSharingSubSystem encapsulates the functionality necessary to share and download files via Cwtch
|
// FileSharingSubSystem encapsulates the functionality necessary to share and download files via Cwtch
|
||||||
|
@ -49,11 +50,12 @@ func (fsss *FileSharingSubSystem) FetchManifest(fileKey string, manifestSize uin
|
||||||
|
|
||||||
// CompileChunkRequests takes in a complete serializedManifest and returns a set of chunk request messages
|
// CompileChunkRequests takes in a complete serializedManifest and returns a set of chunk request messages
|
||||||
// TODO in the future we will want this to return the handles of contacts to request chunks from
|
// TODO in the future we will want this to return the handles of contacts to request chunks from
|
||||||
func (fsss *FileSharingSubSystem) CompileChunkRequests(fileKey string, serializedManifest string) []model.PeerMessage {
|
func (fsss *FileSharingSubSystem) CompileChunkRequests(fileKey, serializedManifest, tempFile string) []model.PeerMessage {
|
||||||
var manifest Manifest
|
var manifest Manifest
|
||||||
err := json.Unmarshal([]byte(serializedManifest), &manifest)
|
err := json.Unmarshal([]byte(serializedManifest), &manifest)
|
||||||
var messages []model.PeerMessage
|
var messages []model.PeerMessage
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
manifest.TempFileName = tempFile
|
||||||
err := manifest.PrepareDownload()
|
err := manifest.PrepareDownload()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
fsss.activeDownloads.Store(fileKey, &manifest)
|
fsss.activeDownloads.Store(fileKey, &manifest)
|
||||||
|
@ -63,6 +65,8 @@ func (fsss *FileSharingSubSystem) CompileChunkRequests(fileKey string, serialize
|
||||||
Context: event.ContextRequestFile,
|
Context: event.ContextRequestFile,
|
||||||
Data: []byte(manifest.GetChunkRequest().Serialize()),
|
Data: []byte(manifest.GetChunkRequest().Serialize()),
|
||||||
})
|
})
|
||||||
|
} else {
|
||||||
|
log.Errorf("couldn't prepare download: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return messages
|
return messages
|
||||||
|
@ -200,7 +204,8 @@ func (fsss *FileSharingSubSystem) ProcessChunk(chunkKey string, chunk []byte) (f
|
||||||
}
|
}
|
||||||
|
|
||||||
// VerifyFile returns true if the file has been downloaded, false otherwise
|
// VerifyFile returns true if the file has been downloaded, false otherwise
|
||||||
func (fsss *FileSharingSubSystem) VerifyFile(fileKey string) (downloaded bool) {
|
// as well as the temporary filename, if one was used
|
||||||
|
func (fsss *FileSharingSubSystem) VerifyFile(fileKey string) (tempFile string, filePath string, downloaded bool) {
|
||||||
manifestI, exists := fsss.activeDownloads.Load(fileKey)
|
manifestI, exists := fsss.activeDownloads.Load(fileKey)
|
||||||
if exists {
|
if exists {
|
||||||
manifest := manifestI.(*Manifest)
|
manifest := manifestI.(*Manifest)
|
||||||
|
@ -208,8 +213,8 @@ func (fsss *FileSharingSubSystem) VerifyFile(fileKey string) (downloaded bool) {
|
||||||
manifest.Close()
|
manifest.Close()
|
||||||
fsss.activeDownloads.Delete(fileKey)
|
fsss.activeDownloads.Delete(fileKey)
|
||||||
log.Debugf("file verified and downloaded!")
|
log.Debugf("file verified and downloaded!")
|
||||||
return true
|
return manifest.TempFileName, manifest.FileName, true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false
|
return "", "", false
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ type Manifest struct {
|
||||||
RootHash []byte
|
RootHash []byte
|
||||||
FileSizeInBytes uint64
|
FileSizeInBytes uint64
|
||||||
ChunkSizeInBytes uint64
|
ChunkSizeInBytes uint64
|
||||||
|
TempFileName string `json:"-"`
|
||||||
|
|
||||||
chunkComplete []bool
|
chunkComplete []bool
|
||||||
openFd *os.File
|
openFd *os.File
|
||||||
|
@ -212,7 +213,11 @@ func (m *Manifest) StoreChunk(id uint64, contents []byte) (uint64, error) {
|
||||||
func (m *Manifest) getFileHandle() error {
|
func (m *Manifest) getFileHandle() error {
|
||||||
// Seek to the chunk in the file
|
// Seek to the chunk in the file
|
||||||
if m.openFd == nil {
|
if m.openFd == nil {
|
||||||
fd, err := os.OpenFile(m.FileName, os.O_RDWR, 0600)
|
useFileName := m.FileName
|
||||||
|
if m.TempFileName != "" {
|
||||||
|
useFileName = m.TempFileName
|
||||||
|
}
|
||||||
|
fd, err := os.OpenFile(useFileName, os.O_RDWR, 0600)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -235,7 +240,12 @@ func (m *Manifest) PrepareDownload() error {
|
||||||
m.chunkComplete = make([]bool, len(m.Chunks))
|
m.chunkComplete = make([]bool, len(m.Chunks))
|
||||||
|
|
||||||
if info, err := os.Stat(m.FileName); os.IsNotExist(err) {
|
if info, err := os.Stat(m.FileName); os.IsNotExist(err) {
|
||||||
fd, err := os.Create(m.FileName)
|
useFileName := m.FileName
|
||||||
|
if m.TempFileName != "" {
|
||||||
|
useFileName = m.TempFileName
|
||||||
|
}
|
||||||
|
|
||||||
|
fd, err := os.Create(useFileName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -260,7 +270,6 @@ func (m *Manifest) PrepareDownload() error {
|
||||||
}
|
}
|
||||||
writer.Flush()
|
writer.Flush()
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue