Merge branch 'filesharing' into origfile5
continuous-integration/drone/pr Build is pending Details
continuous-integration/drone/push Build is passing Details

This commit is contained in:
Sarah Jamie Lewis 2021-09-02 12:57:48 -07:00
commit 603632b206
20 changed files with 376 additions and 238 deletions

View File

@ -30,6 +30,9 @@ Development and Contributing information in [CONTRIBUTING.md](https://git.openpr
## Running Cwtch
### Server
#### Docker
### NOTE: The following section is out of date. The new Cwtch server is available from https://git.openprivacy.ca/cwtch.im/server, but there is no current docker container for it.
This repository contains a `Dockerfile` allowing you to build and run the server as a [docker](https://www.docker.com/) container.
To get started issue `docker build -t openpriv/cwtch-server:latest .`, this will create 2 temporary docker containers, one to build the Tor daemon and one to build Cwtch. The compiled binaries will then be bundled into a new image and tagged as `openpriv/cwtch-server:latest`.

View File

@ -250,12 +250,12 @@ const (
MessageCounterResync = Type("MessageCounterResync")
// File Handling Events
ShareManifest = Type("ShareManifest")
ManifestSizeReceived = Type("ManifestSizeReceived")
ManifestReceived = Type("ManifestReceived")
ManifestSaved = Type("ManifestSaved")
ShareManifest = Type("ShareManifest")
ManifestSizeReceived = Type("ManifestSizeReceived")
ManifestReceived = Type("ManifestReceived")
ManifestSaved = Type("ManifestSaved")
FileDownloadProgressUpdate = Type("FileDownloadProgressUpdate")
FileDownloaded = Type("FileDownloaded")
FileDownloaded = Type("FileDownloaded")
)
// Field defines common event attributes
@ -321,12 +321,10 @@ const (
Source = Field("Source")
FileKey = Field("FileKey")
FileSizeInChunks = Field("FileSizeInChunks")
ManifestSize = Field("ManifestSize")
FileKey = Field("FileKey")
FileSizeInChunks = Field("FileSizeInChunks")
ManifestSize = Field("ManifestSize")
SerializedManifest = Field("SerializedManifest")
)
// Defining Common errors
@ -343,15 +341,15 @@ const (
// Defining Protocol Contexts
const (
ContextAck = "im.cwtch.acknowledgement"
ContextInvite = "im.cwtch.invite"
ContextRaw = "im.cwtch.raw"
ContextGetVal = "im.cwtch.getVal"
ContextRetVal = "im.cwtch.retVal"
ContextAck = "im.cwtch.acknowledgement"
ContextInvite = "im.cwtch.invite"
ContextRaw = "im.cwtch.raw"
ContextGetVal = "im.cwtch.getVal"
ContextRetVal = "im.cwtch.retVal"
ContextRequestManifest = "im.cwtch.file.request.manifest"
ContextSendManifest = "im.cwtch.file.send.manifest"
ContextRequestFile = "im.cwtch.file.request.chunk"
ContextSendFile = "im.cwtch.file.send.chunk"
ContextSendManifest = "im.cwtch.file.send.manifest"
ContextRequestFile = "im.cwtch.file.request.chunk"
ContextSendFile = "im.cwtch.file.send.chunk"
)
// Define Default Attribute Keys

View File

@ -20,7 +20,6 @@ import (
type Functionality struct {
}
// FunctionalityGate returns contact.Functionality always
func FunctionalityGate(experimentMap map[string]bool) (*Functionality, error) {
if experimentMap["filesharing"] == true {
@ -29,20 +28,26 @@ func FunctionalityGate(experimentMap map[string]bool) (*Functionality, error) {
return nil, errors.New("filesharing is not enabled")
}
// OverlayMessage presents the canonical format of the File Sharing functionality Overlay Message
// This is the format that the UI will parse to display the message
type OverlayMessage struct {
Name string `json:"f"`
Hash []byte `json:"h"`
Name string `json:"f"`
Hash []byte `json:"h"`
Nonce []byte `json:"n"`
Size uint64 `json:"s"`
}
// DownloadFile given a profile, a conversation handle and a file sharing key, start off a download process
// to downloadFilePath
func (f *Functionality) DownloadFile(profile peer.CwtchPeer, handle string, downloadFilePath string, key string) {
profile.SetAttribute(attr.GetLocalScope(key), downloadFilePath)
profile.SendGetValToPeer(handle, attr.PublicScope, fmt.Sprintf("%s.manifest.size", key))
}
func (f *Functionality) SendFile(filepath string, profile peer.CwtchPeer, handle string) error {
manifest,err := files.CreateManifest(filepath)
// ShareFile given a profile and a conversation handle, sets up a file sharing process to share the file
// at filepath
func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer, handle string) error {
manifest, err := files.CreateManifest(filepath)
if err != nil {
return err
}
@ -54,29 +59,29 @@ func (f *Functionality) SendFile(filepath string, profile peer.CwtchPeer, handle
}
message := OverlayMessage{
Name: path.Base(manifest.FileName),
Hash: manifest.RootHash,
Name: path.Base(manifest.FileName),
Hash: manifest.RootHash,
Nonce: nonce[:],
Size: manifest.FileSizeInBytes,
Size: manifest.FileSizeInBytes,
}
data,_ := json.Marshal(message)
data, _ := json.Marshal(message)
wrapper := model.MessageWrapper{
Overlay: model.OverlayFileSharing,
Data: string(data),
Data: string(data),
}
wrapperJson,_ := json.Marshal(wrapper)
key := fmt.Sprintf("%x.%x", manifest.RootHash, nonce)
serializedManifest,_ := json.Marshal(manifest)
wrapperJSON, _ := json.Marshal(wrapper)
key := fmt.Sprintf("%x.%x", manifest.RootHash, nonce)
serializedManifest, _ := json.Marshal(manifest)
profile.ShareFile(key, string(serializedManifest))
// Store the size of the manifest (in chunks) as part of the public scope so contacts who we share the file with
// can fetch the manifest as if it were a file.
profile.SetAttribute(attr.GetPublicScope(fmt.Sprintf("%s.manifest.size", key)), strconv.Itoa(int(math.Ceil(float64(len(serializedManifest))/float64(files.DefaultChunkSize)))))
profile.SendMessageToPeer(handle, string(wrapperJson))
profile.SendMessageToPeer(handle, string(wrapperJSON))
return nil
}
}

View File

@ -1,11 +1,19 @@
package model
// MessageWrapper is the canonical Cwtch overlay wrapper
type MessageWrapper struct {
Overlay int `json:"o"`
Data string `json:"d"`
Overlay int `json:"o"`
Data string `json:"d"`
}
// OverlayChat is the canonical identifier for chat overlays
const OverlayChat = 1
// OverlayInviteContact is the canonical identifier for the contact invite overlay
const OverlayInviteContact = 100
// OverlayInviteGroup is the canonical identifier for the group invite overlay
const OverlayInviteGroup = 101
const OverlayFileSharing = 200
// OverlayFileSharing is the canonical identifier for the file sharing overlay
const OverlayFileSharing = 200

View File

@ -194,7 +194,6 @@ type CwtchPeer interface {
SendMessagesToGroup
ShareFile(fileKey string, serializedManifest string)
}
// NewCwtchPeer creates and returns a new cwtchPeer with the given name.
@ -708,8 +707,8 @@ func (cp *cwtchPeer) StoreMessage(onion string, messageTxt string, sent time.Tim
cp.mutex.Unlock()
}
func (cp * cwtchPeer) ShareFile(fileKey string, serializedManifest string) {
cp.eventBus.Publish(event.NewEvent(event.ShareManifest, map[event.Field]string {event.FileKey: fileKey, event.SerializedManifest: serializedManifest}))
func (cp *cwtchPeer) ShareFile(fileKey string, serializedManifest string) {
cp.eventBus.Publish(event.NewEvent(event.ShareManifest, map[event.Field]string{event.FileKey: fileKey, event.SerializedManifest: serializedManifest}))
}
// eventHandler process events from other subsystems
@ -811,11 +810,11 @@ func (cp *cwtchPeer) eventHandler() {
fileKey := ev.Data[event.FileKey]
serializedManifest := ev.Data[event.SerializedManifest]
downloadFilePath,exists := cp.GetAttribute(attr.GetLocalScope(fileKey))
downloadFilePath, exists := cp.GetAttribute(attr.GetLocalScope(fileKey))
if exists {
log.Debugf("downloading file to %v", downloadFilePath)
var manifest files.Manifest
err := json.Unmarshal([]byte(serializedManifest),&manifest)
err := json.Unmarshal([]byte(serializedManifest), &manifest)
if err == nil {
manifest.FileName = downloadFilePath
log.Debugf("saving manifest")
@ -823,7 +822,7 @@ func (cp *cwtchPeer) eventHandler() {
if err != nil {
log.Errorf("could not save manifest...")
} else {
cp.eventBus.Publish(event.NewEvent(event.ManifestSaved, map[event.Field]string{event.FileKey: fileKey, event.Handle: handle, event.SerializedManifest: string(manifest.Serialize())} ))
cp.eventBus.Publish(event.NewEvent(event.ManifestSaved, map[event.Field]string{event.FileKey: fileKey, event.Handle: handle, event.SerializedManifest: string(manifest.Serialize())}))
}
} else {
log.Errorf("error saving manifest: %v", err)
@ -843,7 +842,7 @@ func (cp *cwtchPeer) eventHandler() {
if scope == attr.PublicScope {
if strings.HasSuffix(path, ".manifest.size") {
fileKey := strings.Replace(path, ".manifest.size", "", 1)
cp.eventBus.Publish(event.NewEvent(event.ManifestSizeReceived, map[event.Field]string{event.FileKey: fileKey, event.ManifestSize: val, event.Handle: onion} ))
cp.eventBus.Publish(event.NewEvent(event.ManifestSizeReceived, map[event.Field]string{event.FileKey: fileKey, event.ManifestSize: val, event.Handle: onion}))
} else {
cp.SetContactAttribute(onion, attr.GetPeerScope(path), val)
}

View File

@ -5,6 +5,7 @@ import (
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/protocol/files"
"cwtch.im/cwtch/protocol/groups"
model3 "cwtch.im/cwtch/protocol/model"
"encoding/base64"
"encoding/hex"
"encoding/json"
@ -50,9 +51,8 @@ type engine struct {
// Required for listen(), inaccessible from identity
privateKey ed25519.PrivateKey
manifests sync.Map // file key to manifests
activeDownloads sync.Map // file key to manifests
// file sharing subsystem is responsible for maintaining active shares and downloads
filesharingSubSystem files.FileSharingSubSystem
shuttingDown bool
}
@ -195,17 +195,19 @@ func (e *engine) eventHandler() {
case event.ProtocolEngineStartListen:
go e.listenFn()
case event.ShareManifest:
e.manifests.Store(ev.Data[event.FileKey], ev.Data[event.SerializedManifest])
e.filesharingSubSystem.ShareFile(ev.Data[event.FileKey], ev.Data[event.SerializedManifest])
case event.ManifestSizeReceived:
handle := ev.Data[event.Handle]
key := ev.Data[event.FileKey]
size,_ := strconv.Atoi(ev.Data[event.ManifestSize])
go e.fetchManifest(handle, key, uint64(size))
size, _ := strconv.Atoi(ev.Data[event.ManifestSize])
go e.sendPeerMessage(handle, e.filesharingSubSystem.FetchManifest(key, uint64(size)))
case event.ManifestSaved:
handle := ev.Data[event.Handle]
key := ev.Data[event.FileKey]
serializedManifest := ev.Data[event.SerializedManifest]
go e.downloadFile(handle, key, serializedManifest)
for _, message := range e.filesharingSubSystem.CompileChunkRequests(key, serializedManifest) {
go e.sendPeerMessage(handle, message)
}
default:
return
}
@ -461,7 +463,7 @@ func (e *engine) sendMessageToPeer(eventID string, onion string, context string,
if err == nil {
peerApp, ok := (conn.App()).(*PeerApp)
if ok {
peerApp.SendMessage(PeerMessage{eventID, context, message})
peerApp.SendMessage(model3.PeerMessage{ID: eventID, Context: context, Data: message})
return nil
}
return errors.New("failed type assertion conn.App != PeerApp")
@ -551,107 +553,26 @@ func (e *engine) handlePeerMessage(hostname string, eventID string, context stri
e.eventManager.Publish(ev)
}
} else if context == event.ContextRequestManifest {
serializedManifest, exists := e.manifests.Load(eventID)
if exists {
serializedManifest := serializedManifest.(string)
for i := 0; i < len(serializedManifest); i += files.DefaultChunkSize {
offset := i * files.DefaultChunkSize
end := (i + 1) * files.DefaultChunkSize
if end > len(serializedManifest) {
end = len(serializedManifest)
}
e.sendManifestChunk(hostname, eventID, uint64(i), []byte(serializedManifest[offset:end]))
}
for _, message := range e.filesharingSubSystem.RequestManifestParts(eventID) {
e.sendPeerMessage(hostname, message)
}
} else if context == event.ContextSendManifest {
log.Debugf("manifest received %v %v %s", eventID, hostname, message)
fileKeyParts := strings.Split(eventID, ".")
if len(fileKeyParts) == 3 {
fileKey := fmt.Sprintf("%s.%s", fileKeyParts[0], fileKeyParts[1])
log.Debugf("manifest filekey: %s", fileKey)
manifestPart,err := strconv.Atoi(fileKeyParts[2])
if err == nil {
serializedManifest, exists := e.manifests.Load(fileKey)
if exists {
serializedManifest := serializedManifest.(string)
log.Debugf("loaded manifest")
offset := manifestPart * files.DefaultChunkSize
end := (manifestPart + 1) * files.DefaultChunkSize
log.Debugf("storing manifest part %v %v", offset, end)
serializedManifestBytes := []byte(serializedManifest)
copy(serializedManifestBytes[offset:end], message[:])
if len(message) < files.DefaultChunkSize {
serializedManifestBytes = serializedManifestBytes[0:len(serializedManifestBytes)-(files.DefaultChunkSize-len(message))]
}
serializedManifest = string(serializedManifestBytes)
e.manifests.Store(fileKey, serializedManifest)
log.Debugf("current manifest: [%s]", serializedManifest)
var manifest files.Manifest
err := json.Unmarshal([]byte(serializedManifest), &manifest)
if err == nil && hex.EncodeToString(manifest.RootHash) == fileKeyParts[0] {
log.Debugf("valid manifest received! %x", manifest.RootHash)
// We have a valid manifest
e.eventManager.Publish(event.NewEvent(event.ManifestReceived, map[event.Field]string{event.Handle: hostname, event.FileKey: fileKey, event.SerializedManifest: string(serializedManifest)}))
}
}
}
if fileKey, manifest := e.filesharingSubSystem.ReceiveManifestPart(eventID, message); len(manifest) != 0 {
// We have a valid manifest
e.eventManager.Publish(event.NewEvent(event.ManifestReceived, map[event.Field]string{event.Handle: hostname, event.FileKey: fileKey, event.SerializedManifest: manifest}))
}
} else if context == event.ContextRequestFile {
log.Debugf("chunk request: %v", eventID)
serializedManifest, exists := e.manifests.Load(eventID)
if exists {
serializedManifest := serializedManifest.(string)
log.Debugf("manifest found: %v", serializedManifest)
var manifest files.Manifest
err := json.Unmarshal([]byte(serializedManifest), &manifest)
if err == nil {
chunkSpec, err := files.Deserialize(string(message))
log.Debugf("deserialized chunk spec found: %v [%s]", chunkSpec, message)
if err == nil {
for _, chunk := range *chunkSpec {
contents, err := manifest.GetChunkBytes(chunk)
if err == nil {
log.Debugf("sending chunk: %v %x", chunk, contents)
e.sendMessageToPeer(fmt.Sprintf("%v.%d", eventID, chunk), hostname, event.ContextSendFile, contents)
}
}
}
}
for _, message := range e.filesharingSubSystem.ProcessChunkRequest(eventID, message) {
e.sendPeerMessage(hostname, message)
}
} else if context == event.ContextSendFile {
fileKeyParts := strings.Split(eventID, ".")
log.Debugf("got chunk for %s", fileKeyParts)
if len(fileKeyParts) == 3 {
fileKey := fmt.Sprintf("%s.%s", fileKeyParts[0], fileKeyParts[1])
chunkID, err := strconv.Atoi(fileKeyParts[2])
log.Debugf("got chunk id %d", chunkID)
if err == nil {
manifestI, exists := e.activeDownloads.Load(fileKey)
if exists {
manifest := manifestI.(*files.Manifest)
log.Debugf("found active manifest %v", manifest)
progress,err := manifest.StoreChunk(uint64(chunkID), message)
log.Debugf("attempts to store chunk %v %v", progress, err)
if err == nil{
if int(progress) == len(manifest.Chunks) {
if manifest.VerifyFile() == nil {
manifest.Close()
e.activeDownloads.Delete(fileKey)
log.Debugf("file verified and downloaded!")
e.eventManager.Publish(event.NewEvent(event.FileDownloaded, map[event.Field]string {event.FileKey: fileKey} ))
}
}
e.eventManager.Publish(event.NewEvent(event.FileDownloadProgressUpdate, map[event.Field]string {event.FileKey: fileKey, event.Progress: strconv.Itoa(int(progress)), event.FileSizeInChunks: strconv.Itoa(len(manifest.Chunks))} ))
} else {
// if a chunk fails to save (possibly because its data hash didn't match the manifest), re-request it
e.sendMessageToPeer(fileKey, hostname, event.ContextRequestFile, []byte(strconv.Itoa(chunkID)))
}
}
fileKey, downloaded, progress, totalChunks, _ := e.filesharingSubSystem.ProcessChunk(eventID, message)
if len(fileKey) != 0 {
if downloaded {
log.Debugf("file verified and downloaded!")
e.eventManager.Publish(event.NewEvent(event.FileDownloaded, map[event.Field]string{event.FileKey: fileKey}))
}
e.eventManager.Publish(event.NewEvent(event.FileDownloadProgressUpdate, map[event.Field]string{event.FileKey: fileKey, event.Progress: strconv.Itoa(int(progress)), event.FileSizeInChunks: strconv.Itoa(int(totalChunks))}))
}
} else {
e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)}))
@ -685,30 +606,12 @@ func (e *engine) leaveServer(server string) {
}
}
func (e *engine) downloadFile(handle string, key string, serializedManifest string) {
var manifest files.Manifest
err := json.Unmarshal([]byte(serializedManifest), &manifest)
func (e *engine) sendPeerMessage(handle string, message model3.PeerMessage) {
conn, err := e.service.WaitForCapabilityOrClose(handle, cwtchCapability)
if err == nil {
err := manifest.PrepareDownload()
if err != nil {
// TODO
return
}
e.activeDownloads.Store(key, &manifest)
log.Debugf("downloading file chunks: %v %v", manifest.GetChunkRequest().Serialize(), manifest)
conn, err := e.service.WaitForCapabilityOrClose(handle, cwtchCapability)
if err == nil {
peerApp, ok := (conn.App()).(*PeerApp)
if ok {
peerApp.SendMessage(PeerMessage{
ID: key,
Context: event.ContextRequestFile,
Data: []byte(manifest.GetChunkRequest().Serialize()),
})
}
peerApp, ok := (conn.App()).(*PeerApp)
if ok {
peerApp.SendMessage(message)
}
}
}

View File

@ -2,6 +2,7 @@ package connections
import (
"cwtch.im/cwtch/event"
model2 "cwtch.im/cwtch/protocol/model"
"encoding/json"
"git.openprivacy.ca/cwtch.im/tapir"
"git.openprivacy.ca/cwtch.im/tapir/applications"
@ -27,13 +28,6 @@ type PeerApp struct {
getValRequests sync.Map // [string]string eventID:Data
}
// PeerMessage is an encapsulation that can be used by higher level applications
type PeerMessage struct {
ID string // A unique Message ID (primarily used for acknowledgments)
Context string // A unique context identifier i.e. im.cwtch.chat
Data []byte // The serialized data packet.
}
type peerGetVal struct {
Scope, Path string
}
@ -88,7 +82,7 @@ func (pa *PeerApp) listen() {
pa.OnClose(pa.connection.Hostname())
return
}
var peerMessage PeerMessage
var peerMessage model2.PeerMessage
err := json.Unmarshal(message, &peerMessage)
if err == nil {
switch peerMessage.Context {
@ -106,7 +100,7 @@ func (pa *PeerApp) listen() {
pa.MessageHandler(pa.connection.Hostname(), peerMessage.ID, peerMessage.Context, peerMessage.Data)
// Acknowledge the message
pa.SendMessage(PeerMessage{peerMessage.ID, event.ContextAck, []byte{}})
pa.SendMessage(model2.PeerMessage{ID: peerMessage.ID, Context: event.ContextAck, Data: []byte{}})
}
}
} else {
@ -117,7 +111,7 @@ func (pa *PeerApp) listen() {
// SendMessage sends the peer a preformatted message
// NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol
func (pa *PeerApp) SendMessage(message PeerMessage) {
func (pa *PeerApp) SendMessage(message model2.PeerMessage) {
if message.Context == event.ContextGetVal {
pa.getValRequests.Store(message.ID, string(message.Data))
}

View File

@ -7,11 +7,14 @@ import (
"strings"
)
// ChunkSpec is a wrapper around an uncompressed array of chunk identifiers
type ChunkSpec []uint64
// CreateChunkSpec given a full list of chunks with their downloaded status (true for downloaded, false otherwise)
// derives a list of identifiers of chunks that have not been downloaded yet
func CreateChunkSpec(progress []bool) ChunkSpec {
var chunks ChunkSpec
for i,p := range progress {
for i, p := range progress {
if !p {
chunks = append(chunks, uint64(i))
}
@ -19,7 +22,9 @@ func CreateChunkSpec(progress []bool) ChunkSpec {
return chunks
}
func Deserialize(serialized string) (*ChunkSpec,error) {
// Deserialize takes in a compressed chunk spec and returns an uncompressed ChunkSpec or an error
// if the serialized chunk spec has format errors
func Deserialize(serialized string) (*ChunkSpec, error) {
var chunkSpec ChunkSpec
@ -28,21 +33,21 @@ func Deserialize(serialized string) (*ChunkSpec,error) {
}
ranges := strings.Split(serialized, ",")
for _,r := range ranges {
for _, r := range ranges {
parts := strings.Split(r, ":")
if len(parts) == 1 {
single,err := strconv.Atoi(r)
single, err := strconv.Atoi(r)
if err != nil {
return nil, errors.New("invalid chunk spec")
}
chunkSpec = append(chunkSpec, uint64(single))
} else if len(parts) == 2 {
start,err1 := strconv.Atoi(parts[0])
end,err2 := strconv.Atoi(parts[1])
start, err1 := strconv.Atoi(parts[0])
end, err2 := strconv.Atoi(parts[1])
if err1 != nil || err2 != nil {
return nil, errors.New("invalid chunk spec")
}
for i := start; i <=end; i++ {
for i := start; i <= end; i++ {
chunkSpec = append(chunkSpec, uint64(i))
}
} else {
@ -52,24 +57,25 @@ func Deserialize(serialized string) (*ChunkSpec,error) {
return &chunkSpec, nil
}
// Serialize compresses the ChunkSpec into a list of inclusive ranges e.g. 1,2,3,5,6,7 becomes "1:3,5:7"
func (cs ChunkSpec) Serialize() string {
result := ""
i := 0
for {
if i >= len(cs) {
break
}
j := i+1
for ; j < len(cs) && cs[j] == cs[j-1] +1; j++ {}
j := i + 1
for ; j < len(cs) && cs[j] == cs[j-1]+1; j++ {
}
if result != "" {
result += ","
}
if j == i + 1 {
if j == i+1 {
result += fmt.Sprintf("%d", cs[i])
} else {
result += fmt.Sprintf("%d:%d", cs[i], cs[j-1])
@ -78,4 +84,4 @@ func (cs ChunkSpec) Serialize() string {
}
return result
}
}

View File

@ -4,34 +4,34 @@ import "testing"
func TestChunkSpec(t *testing.T) {
var testCases = map[string]ChunkSpec {
"0": CreateChunkSpec([]bool{false}),
"0:10": CreateChunkSpec([]bool{false,false,false,false,false,false,false,false,false,false,false}),
"0:1,3:5,7:9": CreateChunkSpec([]bool{false,false,true,false,false,false,true,false,false,false,true}),
"": CreateChunkSpec([]bool{true,true,true,true,true,true,true,true,true,true,true}),
"2,5,8,10": CreateChunkSpec([]bool{true,true,false,true,true,false,true,true,false,true,false}),
var testCases = map[string]ChunkSpec{
"0": CreateChunkSpec([]bool{false}),
"0:10": CreateChunkSpec([]bool{false, false, false, false, false, false, false, false, false, false, false}),
"0:1,3:5,7:9": CreateChunkSpec([]bool{false, false, true, false, false, false, true, false, false, false, true}),
"": CreateChunkSpec([]bool{true, true, true, true, true, true, true, true, true, true, true}),
"2,5,8,10": CreateChunkSpec([]bool{true, true, false, true, true, false, true, true, false, true, false}),
//
"0,2:10": CreateChunkSpec([]bool{false,true,false,false,false,false,false,false,false,false,false}),
"0:8,10": CreateChunkSpec([]bool{false,false,false,false,false,false,false,false,false,true,false}),
"1:9": CreateChunkSpec([]bool{true,false,false,false,false,false,false,false,false,false,true}),
"0,2:10": CreateChunkSpec([]bool{false, true, false, false, false, false, false, false, false, false, false}),
"0:8,10": CreateChunkSpec([]bool{false, false, false, false, false, false, false, false, false, true, false}),
"1:9": CreateChunkSpec([]bool{true, false, false, false, false, false, false, false, false, false, true}),
}
for k,v := range testCases {
for k, v := range testCases {
if k != v.Serialize() {
t.Fatalf("got %v but expected %v", v.Serialize(),k )
t.Fatalf("got %v but expected %v", v.Serialize(), k)
}
t.Logf("%v == %v", k, v.Serialize())
}
for k,v := range testCases {
for k, v := range testCases {
if cs, err := Deserialize(k); err != nil {
t.Fatalf("error deserialized key: %v %v", k, err )
t.Fatalf("error deserialized key: %v %v", k, err)
} else {
if v.Serialize() != cs.Serialize() {
t.Fatalf("got %v but expected %v", v.Serialize(),cs.Serialize() )
t.Fatalf("got %v but expected %v", v.Serialize(), cs.Serialize())
}
t.Logf("%v == %v", cs.Serialize(), v.Serialize())
}
}
}
}

View File

@ -1 +0,0 @@
{"Chunks":["G3eXqrYVuCSATqG9RsXS/jHpSdp5zp2aO2zFAjO+ZueaVzPTxd33wvFr4zUvHQVDI+VSVV+ldogpVVE1Fyaapg==","glF2hLxOyXwvz/L2WuLrHf0Ed14hsXsilyPMSjDsn5ERuEaCmczg8qkwbpwoYcjJo/fsFJLzRj8m37RP31WTDg==","XQgG1ibuhEZyLfzIS6cw+mqkWQMz1XJTKM6bD9nFduc0/Wxdyv9oqbQhn4RvqcMam8N3x5yIJmFqbsG6nNlZxA==","WwIbGgjwG4iF/w3G1R5BpluHEgNOFXdxO3ZtLdFAbFLhB1HhxK2D8wrnpjWUnVXzynPSnvA/InFlWhaN9zvvsA==","0QGYg1b69T0LCU5IZkt0fBQYxZeFGpbBV3QErMFpENPM0GUVWwRQFc8GA0v6Mnoyl0iAps5eB4DENBiahGIDpQ==","w995uLObqfuZqvxsNG2jr5itl4LUvO8WZzuTOMFLC1vE5+lcdjwE06+42q6C54QE5PWB1wYbN3cVf4cNmC8FQA==","Yf4+mx59iSQJs6UYhelxbVIdFlx1SRgoOhAtjdJh2oLauPGvnCVQVaWN9ylKVPP+PNDoqfpbpKnryQNF0lwX2g=="],"FileName":"cwtch.png","RootHash":"jw7XO7sw20W2p0CxJRyuApRfSOT5kUZNXzYHaFxF3NE2oyXasuX2QpzitxXmArILWxa/dDj7YjX+/pEq3O21/Q==","FileSizeInBytes":51791,"ChunkSizeInBytes":8000}

View File

@ -0,0 +1,201 @@
package files
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol/model"
"encoding/hex"
"encoding/json"
"fmt"
"git.openprivacy.ca/openprivacy/log"
"strconv"
"strings"
"sync"
)
// FileSharingSubSystem encapsulates the functionality necessary to share and download files via Cwtch
//
type FileSharingSubSystem struct {
// for sharing files
activeShares sync.Map // file key to manifest
// for downloading files
prospectiveManifests sync.Map // file key to serialized manifests
activeDownloads sync.Map // file key to manifests
}
// ShareFile given a file key and a serialized manifest, allow the serialized manifest to be downloaded
// by Cwtch profiles in possession of the fileKey
func (fsss *FileSharingSubSystem) ShareFile(fileKey string, serializedManifest string) {
var manifest Manifest
err := json.Unmarshal([]byte(serializedManifest), &manifest)
if err != nil {
log.Errorf("could not share file %v", err)
return
}
fsss.activeShares.Store(fileKey, &manifest)
}
// FetchManifest given a file key and knowledge of the manifest size in chunks (obtained via an attribute lookup)
// construct a request to download the manifest.
func (fsss *FileSharingSubSystem) FetchManifest(fileKey string, manifestSize uint64) model.PeerMessage {
fsss.prospectiveManifests.Store(fileKey, strings.Repeat("\"", int(manifestSize*DefaultChunkSize)))
return model.PeerMessage{
Context: event.ContextRequestManifest,
ID: fileKey,
Data: []byte{},
}
}
// CompileChunkRequests takes in a complete serializedManifest and returns a set of chunk request messages
// TODO in the future we will want this to return the handles of contacts to request chunks from
func (fsss *FileSharingSubSystem) CompileChunkRequests(fileKey string, serializedManifest string) []model.PeerMessage {
var manifest Manifest
err := json.Unmarshal([]byte(serializedManifest), &manifest)
var messages []model.PeerMessage
if err == nil {
err := manifest.PrepareDownload()
if err == nil {
fsss.activeDownloads.Store(fileKey, &manifest)
log.Debugf("downloading file chunks: %v", manifest.GetChunkRequest().Serialize())
messages = append(messages, model.PeerMessage{
ID: fileKey,
Context: event.ContextRequestFile,
Data: []byte(manifest.GetChunkRequest().Serialize()),
})
}
}
return messages
}
// RequestManifestParts given a fileKey construct a set of messages representing requests to download various
// parts of the Manifest
func (fsss *FileSharingSubSystem) RequestManifestParts(fileKey string) []model.PeerMessage {
manifestI, exists := fsss.activeShares.Load(fileKey)
var messages []model.PeerMessage
if exists {
manifest := manifestI.(*Manifest)
serializedManifest := manifest.Serialize()
log.Debugf("found serialized manifest: %s", serializedManifest)
for i := 0; i < len(serializedManifest); i += DefaultChunkSize {
offset := i * DefaultChunkSize
end := (i + 1) * DefaultChunkSize
if end > len(serializedManifest) {
end = len(serializedManifest)
}
chunk := serializedManifest[offset:end]
messages = append(messages, model.PeerMessage{
Context: event.ContextSendManifest,
ID: fmt.Sprintf("%s.%d", fileKey, uint64(i)),
Data: chunk,
})
}
}
return messages
}
// ReceiveManifestPart given a manifestKey reconstruct part the manifest from the provided part
func (fsss *FileSharingSubSystem) ReceiveManifestPart(manifestKey string, part []byte) (fileKey string, serializedManifest string) {
fileKeyParts := strings.Split(manifestKey, ".")
if len(fileKeyParts) == 3 {
fileKey = fmt.Sprintf("%s.%s", fileKeyParts[0], fileKeyParts[1])
log.Debugf("manifest filekey: %s", fileKey)
manifestPart, err := strconv.Atoi(fileKeyParts[2])
if err == nil {
serializedManifest, exists := fsss.prospectiveManifests.Load(fileKey)
if exists {
serializedManifest := serializedManifest.(string)
log.Debugf("loaded manifest")
offset := manifestPart * DefaultChunkSize
end := (manifestPart + 1) * DefaultChunkSize
log.Debugf("storing manifest part %v %v", offset, end)
serializedManifestBytes := []byte(serializedManifest)
copy(serializedManifestBytes[offset:end], part[:])
if len(part) < DefaultChunkSize {
serializedManifestBytes = serializedManifestBytes[0 : len(serializedManifestBytes)-(DefaultChunkSize-len(part))]
}
serializedManifest = string(serializedManifestBytes)
fsss.prospectiveManifests.Store(fileKey, serializedManifest)
log.Debugf("current manifest: [%s]", serializedManifest)
var manifest Manifest
err := json.Unmarshal([]byte(serializedManifest), &manifest)
if err == nil && hex.EncodeToString(manifest.RootHash) == fileKeyParts[0] {
log.Debugf("valid manifest received! %x", manifest.RootHash)
return fileKey, serializedManifest
}
}
}
}
return "", ""
}
// ProcessChunkRequest given a fileKey, and a chunk request, compile a set of responses for each requested Chunk
func (fsss *FileSharingSubSystem) ProcessChunkRequest(fileKey string, serializedChunkRequest []byte) []model.PeerMessage {
log.Debugf("chunk request: %v", fileKey)
manifestI, exists := fsss.activeShares.Load(fileKey)
var messages []model.PeerMessage
if exists {
manifest := manifestI.(*Manifest)
log.Debugf("manifest found: %x", manifest.RootHash)
chunkSpec, err := Deserialize(string(serializedChunkRequest))
log.Debugf("deserialized chunk spec found: %v [%s]", chunkSpec, serializedChunkRequest)
if err == nil {
for _, chunk := range *chunkSpec {
contents, err := manifest.GetChunkBytes(chunk)
if err == nil {
log.Debugf("sending chunk: %v %x", chunk, contents)
messages = append(messages, model.PeerMessage{
ID: fmt.Sprintf("%v.%d", fileKey, chunk),
Context: event.ContextSendFile,
Data: contents,
})
}
}
}
}
return messages
}
// ProcessChunk given a chunk key and a chunk attempt to store and verify the chunk as part of an active download
// If this results in the file download being completed return downloaded = true
// Always return the progress of a matched download if it exists along with the total number of chunks and the
// given chunk ID
// If not such active download exists then return an empty file key and ignore all further processing.
func (fsss *FileSharingSubSystem) ProcessChunk(chunkKey string, chunk []byte) (fileKey string, downloaded bool, progress uint64, totalChunks uint64, chunkID uint64) {
fileKeyParts := strings.Split(chunkKey, ".")
downloaded = false
log.Debugf("got chunk for %s", fileKeyParts)
if len(fileKeyParts) == 3 {
fileKey = fmt.Sprintf("%s.%s", fileKeyParts[0], fileKeyParts[1])
derivedChunkID, err := strconv.Atoi(fileKeyParts[2])
if err == nil {
log.Debugf("got chunk id %d", chunkID)
chunkID = uint64(derivedChunkID)
manifestI, exists := fsss.activeDownloads.Load(fileKey)
if exists {
manifest := manifestI.(*Manifest)
log.Debugf("found active manifest %v", manifest)
progress, err = manifest.StoreChunk(uint64(chunkID), chunk)
totalChunks = uint64(len(manifest.Chunks))
log.Debugf("attempts to store chunk %v %v", progress, err)
if err == nil {
if progress == totalChunks {
if manifest.VerifyFile() == nil {
manifest.Close()
fsss.activeDownloads.Delete(fileKey)
log.Debugf("file verified and downloaded!")
downloaded = true
}
}
} else {
// TODO if a chunk fails to save (possibly because its data hash didn't match the manifest), re-request it
}
}
}
}
return
}

View File

@ -28,9 +28,9 @@ type Manifest struct {
ChunkSizeInBytes uint64
chunkComplete []bool
openFd *os.File
progress uint64
lock sync.Mutex
openFd *os.File
progress uint64
lock sync.Mutex
}
// CreateManifest takes in a file path and constructs a file sharing manifest of hashes along with
@ -76,7 +76,7 @@ func CreateManifest(path string) (*Manifest, error) {
RootHash: rootHash.Sum(nil),
ChunkSizeInBytes: DefaultChunkSize,
FileSizeInBytes: fileSizeInBytes,
chunkComplete: make([]bool, len(chunks)),
chunkComplete: make([]bool, len(chunks)),
}, nil
}
@ -194,7 +194,7 @@ func (m *Manifest) StoreChunk(id uint64, contents []byte) (uint64, error) {
if err == nil && m.chunkComplete[id] == false {
m.chunkComplete[id] = true
m.progress += 1
m.progress++
}
return m.progress, err
@ -213,7 +213,8 @@ func (m *Manifest) getFileHandle() error {
return nil
}
func (m * Manifest) GetChunkRequest() ChunkSpec {
// GetChunkRequest returns an uncompressed list of Chunks needed to complete the file described in the manifest
func (m *Manifest) GetChunkRequest() ChunkSpec {
return CreateChunkSpec(m.chunkComplete)
}
@ -282,7 +283,7 @@ func (m *Manifest) PrepareDownload() error {
m.progress = 0
if subtle.ConstantTimeCompare(chunkHash, m.Chunks[chunkI]) == 1 {
m.chunkComplete[chunkI] = true
m.progress += 1
m.progress++
}
chunkI++
}
@ -299,12 +300,13 @@ func (m *Manifest) Close() {
}
}
func (m Manifest) Save(path string) error {
manifestJson,_ := json.Marshal(&m)
return ioutil.WriteFile(path, manifestJson, 0600)
// Save writes a JSON encoded byte array version of the manifest to path
func (m *Manifest) Save(path string) error {
return ioutil.WriteFile(path, m.Serialize(), 0600)
}
func (m * Manifest) Serialize() []byte {
data,_ := json.Marshal(m)
// Serialize returns the manifest as a JSON encoded byte array
func (m *Manifest) Serialize() []byte {
data, _ := json.Marshal(m)
return data
}

View File

@ -9,7 +9,7 @@ import (
)
func TestManifest(t *testing.T) {
manifest, err := CreateManifest("example.txt")
manifest, err := CreateManifest("testdata/example.txt")
if err != nil {
t.Fatalf("manifest create error: %v", err)
}
@ -50,12 +50,12 @@ func TestManifest(t *testing.T) {
}
func TestManifestLarge(t *testing.T) {
manifest, err := CreateManifest("cwtch.png")
manifest, err := CreateManifest("testdata/cwtch.png")
if err != nil {
t.Fatalf("manifest create error: %v", err)
}
if len(manifest.Chunks) != int(math.Ceil(float64(51791)/float64(8000))) {
if len(manifest.Chunks) != int(math.Ceil(float64(51791)/DefaultChunkSize)) {
t.Fatalf("manifest had unexpected Chunks : %v", manifest.Chunks)
}
@ -73,10 +73,10 @@ func TestManifestLarge(t *testing.T) {
t.Logf("%v %s", len(json), json)
// Pretend we downloaded the manifest
ioutil.WriteFile("cwtch.png.manifest", json, 0600)
ioutil.WriteFile("testdata/cwtch.png.manifest", json, 0600)
// Load the manifest from a file
cwtchPngManifest, err := LoadManifest("cwtch.png.manifest")
cwtchPngManifest, err := LoadManifest("testdata/cwtch.png.manifest")
if err != nil {
t.Fatalf("manifest create error: %v", err)
}
@ -89,8 +89,8 @@ func TestManifestLarge(t *testing.T) {
}
// Prepare Download
cwtchPngOutManifest, _ := LoadManifest("cwtch.png.manifest")
cwtchPngOutManifest.FileName = "cwtch.out.png"
cwtchPngOutManifest, _ := LoadManifest("testdata/cwtch.png.manifest")
cwtchPngOutManifest.FileName = "testdata/cwtch.out.png"
defer cwtchPngOutManifest.Close()
err = cwtchPngOutManifest.PrepareDownload()
@ -113,8 +113,6 @@ func TestManifestLarge(t *testing.T) {
t.Fatalf("could not store chunk %v %v", i, err)
}
}
err = cwtchPngOutManifest.VerifyFile()
if err != nil {

View File

Before

Width:  |  Height:  |  Size: 51 KiB

After

Width:  |  Height:  |  Size: 51 KiB

View File

Before

Width:  |  Height:  |  Size: 51 KiB

After

Width:  |  Height:  |  Size: 51 KiB

View File

@ -0,0 +1 @@
{"Chunks":["2614xLRT0mPJYSBejVhB+xfFMSe5sM72qSTDBn3KqmmcK89oIwVmLr+2X5K3Wpl2f1oo3byyU86zdJp//dwTVg==","H2GcT8AZcy65Bbov34aVFGini5c4awpAT9Pcj4qamN56RkHQiLdtAtY4Lo7YlD2XQvfGF7bbfTt15EUcY8G2+Q==","YmvYk+5Ds1JB6K5YYZsNCfaMn7Qciql4iWpTxgHMz2a3uTpibGTQPW/ja2B6x7hplbhyCzkqlPS5XZAM4bVxxA==","1WrybcOYtRmU3QaGv/VEjlRSj9rNvFFrB7pw4E2NvN5rDt2zjMfHTIAT69pVHUPm7oRjYPGX16w0rg9CKv7Eow==","lz5h++lfGOodW+/xwpsfQz3HdTqTZYczGqnpkpoCH/6hfUbGRlnOfWeGpovKR9c6azzJf1ciMjHCsE3T7ZiNKA==","gLf58jjGMM/SXpj/1DDk9qae/EBHuXdZ9lPZXybu0/ZNwsPHjf7hqgcVlKQnx8nfRIuy/LR5WTdGZPAklWRtbg==","ujddGeMM9b7uTIqopSW63aDZYsrvHbXluhcpBUECqr2BBxrxvOK+rHFrk8HKk1JjYWyma9UnTmWGUWj5QHGsoQ==","dYzRAfWCgrxgB82+JcqnXLiwa4tKNDb4PzBXuW5uPKhYIuydL/FOJi1SWGrBP1Es+9UmmPnFaFyy8IXVUOLeMg==","Saubl7U9KePb/4VCm2X3yFrNFoO0au93JFu0By1klmHR+3Vz7ayWX3aCdnSwLY3IBdjcsvZv9xQ0IafdSy6JSw==","enA+R1ufZNlsZrwgJb+HQQtuBLlJM0sCzpJ2lhGdTs2yYMBFpVwNo8/Z/1o4OOqUqVnlF4u/A3L7R1lgDn7VYA==","ywykeEMBsuTgLwTXu/4kQc79JDmv0LVZPo+spV0PhrNBX24PNlm7Fo3n7H83N5UMtSpTW7PtvXOyBFd+m1hxQQ==","RFahJ53i1/LWZuAFoa1ey7/qPjEeVyG6L4UQgXfKdfxUj/XoMqDXKAg8SOfwj3GgSsdTmZJ2xEMRN3BqF9Hu8g==","Fdu2XF7GIWeGejgqiMdKZEYG2lZCVBplX6Y2P8VPziAZVnc9PXvl7scLZ7/n0TkaBqLKOgruKPQyQM3ZrfmSVg=="],"FileName":"testdata/cwtch.png","RootHash":"jw7XO7sw20W2p0CxJRyuApRfSOT5kUZNXzYHaFxF3NE2oyXasuX2QpzitxXmArILWxa/dDj7YjX+/pEq3O21/Q==","FileSizeInBytes":51791,"ChunkSizeInBytes":4096}

View File

@ -0,0 +1,8 @@
package model
// PeerMessage is an encapsulation that can be used by higher level applications
type PeerMessage struct {
ID string // A unique Message ID (primarily used for acknowledgments)
Context string // A unique context identifier i.e. im.cwtch.chat
Data []byte // The serialized data packet.
}

View File

@ -7,7 +7,9 @@ import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/functionality/filesharing"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/protocol/files"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"git.openprivacy.ca/openprivacy/connectivity/tor"
@ -25,13 +27,8 @@ func TestFileSharing(t *testing.T) {
os.RemoveAll("cwtch.out.png")
os.RemoveAll("cwtch.out.png.manifest")
log.AddEverythingFromPattern("connectivity")
log.SetLevel(log.LevelDebug)
log.ExcludeFromPattern("connection/connection")
log.ExcludeFromPattern("outbound/3dhauthchannel")
log.ExcludeFromPattern("event/eventmanager")
log.ExcludeFromPattern("pipeBridge")
log.ExcludeFromPattern("tapir")
os.Mkdir("tordir", 0700)
dataDir := path.Join("tordir", "tor")
os.MkdirAll(dataDir, 0700)
@ -71,8 +68,11 @@ func TestFileSharing(t *testing.T) {
alice := utils.WaitGetPeer(app, "alice")
bob := utils.WaitGetPeer(app, "bob")
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer})
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer, event.ManifestReceived})
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer})
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer, event.ManifestReceived})
queueOracle := event.NewQueue()
app.GetEventBus(bob.GetOnion()).Subscribe(event.FileDownloaded, queueOracle)
app.LaunchPeers()
@ -88,9 +88,9 @@ func TestFileSharing(t *testing.T) {
fmt.Println("Alice and Bob are Connected!!")
filesharingFunctionality,_ := filesharing.FunctionalityGate(map[string]bool{"filesharing": true})
filesharingFunctionality, _ := filesharing.FunctionalityGate(map[string]bool{"filesharing": true})
err = filesharingFunctionality.SendFile("cwtch.png", alice, bob.GetOnion())
err = filesharingFunctionality.ShareFile("cwtch.png", alice, bob.GetOnion())
if err != nil {
t.Fatalf("Error!")
@ -115,6 +115,18 @@ func TestFileSharing(t *testing.T) {
fmt.Printf("Found message from Alice: %v", message.Message)
}
time.Sleep(time.Minute)
// Wait for the file downloaded event
ev := queueOracle.Next()
if ev.EventType != event.FileDownloaded {
t.Fatalf("Expected file download event")
}
manifest, err := files.CreateManifest("cwtch.out.png")
if hex.EncodeToString(manifest.RootHash) != "8f0ed73bbb30db45b6a740b1251cae02945f48e4f991464d5f3607685c45dcd136a325dab2e5f6429ce2b715e602b20b5b16bf7438fb6235fefe912adcedb5fd" {
t.Fatalf("file hash does not match expected %x: ", manifest.RootHash)
}
app.Shutdown()
acn.Close()
}

View File

@ -9,6 +9,7 @@ go test -race ${1} -coverprofile=storage.v0.cover.out -v ./storage/v0
go test -race ${1} -coverprofile=storage.v1.cover.out -v ./storage/v1
go test -race ${1} -coverprofile=storage.cover.out -v ./storage
go test -race ${1} -coverprofile=peer.connections.cover.out -v ./protocol/connections
go test -race ${1} -coverprofile=peer.filesharing.cover.out -v ./protocol/files
go test -race ${1} -coverprofile=peer.cover.out -v ./peer
echo "mode: set" > coverage.out && cat *.cover.out | grep -v mode: | sort -r | \
awk '{if($1 != last) {print $0;last=$1}}' >> coverage.out