File Sharing MVP #384

Merged
sarah merged 52 commits from filesharing into master 2021-09-30 00:57:14 +00:00
16 changed files with 1032 additions and 3 deletions
Showing only changes of commit a2e3f6ee18 - Show all commits

View File

@ -248,6 +248,14 @@ const (
// For situations where we want to update $Identity -> $RemotePeer/$GroupID's total message count to be $Data
MessageCounterResync = Type("MessageCounterResync")
// File Handling Events
ShareManifest = Type("ShareManifest")
ManifestSizeReceived = Type("ManifestSizeReceived")
ManifestReceived = Type("ManifestReceived")
ManifestSaved = Type("ManifestSaved")
FileDownloadProgressUpdate = Type("FileDownloadProgressUpdate")
FileDownloaded = Type("FileDownloaded")
)
// Field defines common event attributes
@ -312,6 +320,13 @@ const (
Imported = Field("Imported")
Source = Field("Source")
FileKey = Field("FileKey")
FileSizeInChunks = Field("FileSizeInChunks")
ManifestSize = Field("ManifestSize")
SerializedManifest = Field("SerializedManifest")
)
// Defining Common errors
@ -333,6 +348,10 @@ const (
ContextRaw = "im.cwtch.raw"
ContextGetVal = "im.cwtch.getVal"
ContextRetVal = "im.cwtch.retVal"
ContextRequestManifest = "im.cwtch.file.request.manifest"
ContextSendManifest = "im.cwtch.file.send.manifest"
ContextRequestFile = "im.cwtch.file.request.chunk"
ContextSendFile = "im.cwtch.file.send.chunk"
)
// Define Default Attribute Keys

View File

@ -0,0 +1,82 @@
package filesharing
import (
"crypto/rand"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/files"
"encoding/json"
"errors"
"fmt"
"git.openprivacy.ca/openprivacy/log"
"io"
"math"
"path"
"strconv"
)
// Functionality groups some common UI triggered functions for contacts...
type Functionality struct {
}
// FunctionalityGate returns contact.Functionality always
func FunctionalityGate(experimentMap map[string]bool) (*Functionality, error) {
if experimentMap["filesharing"] == true {
Review

shouldnt FunctionalityGate stuff be defined a level up in /functionality/ ? not in functionality/fileshareing? so it could be reused in the future.

Also we're moving functionality gate up into cwtch then? makes sense. double cehcking, with the assumption of porting libcwtch-go to this then?

shouldnt FunctionalityGate stuff be defined a level up in /functionality/ ? not in functionality/fileshareing? so it could be reused in the future. Also we're moving functionality gate up into cwtch then? makes sense. double cehcking, with the assumption of porting libcwtch-go to this then?
Review

This is because of golangs (very bad) naming convention. The true name for this struct reads filesharing.FunctionalityGate but if you try to call it FilesharingFunctionalityGate go quality complains because it "stutters".

This is because of golangs (very bad) naming convention. The true name for this struct reads `filesharing.FunctionalityGate` but if you try to call it FilesharingFunctionalityGate go quality complains because it "stutters".
Review

aaaaah i see. This seems like an interface we could be implementing defined in functionality/? i know it's just convention now but something to codify it to clear it up and make further use quicker easier more obvious for anyone else stepping into the code? or maybe over kill?

aaaaah i see. This seems like an interface we could be implementing defined in functionality/? i know it's just convention now but something to codify it to clear it up and make further use quicker easier more obvious for anyone else stepping into the code? or maybe over kill?
return new(Functionality), nil
}
return nil, errors.New("filesharing is not enabled")
}
type OverlayMessage struct {
Name string `json:"f"`
Hash []byte `json:"h"`
Nonce []byte `json:"n"`
Size uint64 `json:"s"`
}
func (f *Functionality) DownloadFile(profile peer.CwtchPeer, handle string, downloadFilePath string, key string) {
profile.SetAttribute(attr.GetLocalScope(key), downloadFilePath)
profile.SendGetValToPeer(handle, attr.PublicScope, fmt.Sprintf("%s.manifest.size", key))
}
func (f *Functionality) SendFile(filepath string, profile peer.CwtchPeer, handle string) error {
manifest,err := files.CreateManifest(filepath)
if err != nil {
return err
}
var nonce [24]byte
if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil {
log.Errorf("Cannot read from random: %v\n", err)
return err
}
message := OverlayMessage{
Name: path.Base(manifest.FileName),
Hash: manifest.RootHash,
Nonce: nonce[:],
Size: manifest.FileSizeInBytes,
}
data,_ := json.Marshal(message)
wrapper := model.MessageWrapper{
Overlay: model.OverlayFileSharing,
Data: string(data),
}
wrapperJson,_ := json.Marshal(wrapper)
key := fmt.Sprintf("%x.%x", manifest.RootHash, nonce)
serializedManifest,_ := json.Marshal(manifest)
profile.ShareFile(key, string(serializedManifest))
// Store the size of the manifest (in chunks) as part of the public scope so contacts who we share the file with
// can fetch the manifest as if it were a file.
profile.SetAttribute(attr.GetPublicScope(fmt.Sprintf("%s.manifest.size", key)), strconv.Itoa(int(math.Ceil(float64(len(serializedManifest))/float64(files.DefaultChunkSize)))))
profile.SendMessageToPeer(handle, string(wrapperJson))
return nil
}

11
model/overlay.go Normal file
View File

@ -0,0 +1,11 @@
package model
type MessageWrapper struct {
Overlay int `json:"o"`
dan marked this conversation as resolved
Review

do we want to more formally move this into Cwtch core and expose it a bit more, and have cwtch-ui use that directly reather than dupping it there?

do we want to more formally move this into Cwtch core and expose it a bit more, and have cwtch-ui use that directly reather than dupping it there?
Review

Yes, that is why it is now in model.

Yes, that is why it is now in model.
Data string `json:"d"`
}
const OverlayChat = 1
const OverlayInviteContact = 100
const OverlayInviteGroup = 101
const OverlayFileSharing = 200

View File

@ -5,10 +5,12 @@ import (
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/protocol/files"
"encoding/base32"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"git.openprivacy.ca/openprivacy/log"
"strconv"
"strings"
@ -21,7 +23,8 @@ const lastKnownSignature = "LastKnowSignature"
var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true,
event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeer: true,
event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToPeerError: true, event.SendMessageToGroupError: true,
event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true}
event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true,
event.ManifestSizeReceived: true, event.ManifestReceived: true}
// DefaultEventsToHandle specifies which events will be subscribed to
// when a peer has its Init() function called
@ -189,6 +192,9 @@ type CwtchPeer interface {
SendMessages
ModifyMessages
SendMessagesToGroup
ShareFile(fileKey string, serializedManifest string)
}
// NewCwtchPeer creates and returns a new cwtchPeer with the given name.
@ -702,6 +708,10 @@ func (cp *cwtchPeer) StoreMessage(onion string, messageTxt string, sent time.Tim
cp.mutex.Unlock()
}
func (cp * cwtchPeer) ShareFile(fileKey string, serializedManifest string) {
cp.eventBus.Publish(event.NewEvent(event.ShareManifest, map[event.Field]string {event.FileKey: fileKey, event.SerializedManifest: serializedManifest}))
}
// eventHandler process events from other subsystems
func (cp *cwtchPeer) eventHandler() {
for {
@ -795,6 +805,33 @@ func (cp *cwtchPeer) eventHandler() {
/***** Non default but requestable handlable events *****/
case event.ManifestReceived:
log.Debugf("Manifest Received Event!: %v", ev)
handle := ev.Data[event.Handle]
fileKey := ev.Data[event.FileKey]
serializedManifest := ev.Data[event.SerializedManifest]
downloadFilePath,exists := cp.GetAttribute(attr.GetLocalScope(fileKey))
if exists {
log.Debugf("downloading file to %v", downloadFilePath)
var manifest files.Manifest
err := json.Unmarshal([]byte(serializedManifest),&manifest)
if err == nil {
manifest.FileName = downloadFilePath
log.Debugf("saving manifest")
err = manifest.Save(fmt.Sprintf("%v.manifest", downloadFilePath))
if err != nil {
log.Errorf("could not save manifest...")
} else {
cp.eventBus.Publish(event.NewEvent(event.ManifestSaved, map[event.Field]string{event.FileKey: fileKey, event.Handle: handle, event.SerializedManifest: string(manifest.Serialize())} ))
}
} else {
log.Errorf("error saving manifest: %v", err)
}
} else {
log.Errorf("no download path found for manifest: %v", fileKey)
}
case event.NewRetValMessageFromPeer:
onion := ev.Data[event.RemotePeer]
scope := ev.Data[event.Scope]
@ -804,7 +841,12 @@ func (cp *cwtchPeer) eventHandler() {
log.Debugf("NewRetValMessageFromPeer %v %v%v %v %v\n", onion, scope, path, exists, val)
if exists {
if scope == attr.PublicScope {
cp.SetContactAttribute(onion, attr.GetPeerScope(path), val)
if strings.HasSuffix(path, ".manifest.size") {
fileKey := strings.Replace(path, ".manifest.size", "", 1)
cp.eventBus.Publish(event.NewEvent(event.ManifestSizeReceived, map[event.Field]string{event.FileKey: fileKey, event.ManifestSize: val, event.Handle: onion} ))
} else {
cp.SetContactAttribute(onion, attr.GetPeerScope(path), val)
}
}
}
case event.PeerStateChange:

View File

@ -3,10 +3,13 @@ package connections
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/protocol/files"
"cwtch.im/cwtch/protocol/groups"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"git.openprivacy.ca/cwtch.im/tapir"
"git.openprivacy.ca/cwtch.im/tapir/applications"
"git.openprivacy.ca/cwtch.im/tapir/networks/tor"
@ -17,6 +20,7 @@ import (
"github.com/gtank/ristretto255"
"golang.org/x/crypto/ed25519"
"strconv"
"strings"
"sync"
"time"
)
@ -46,6 +50,10 @@ type engine struct {
// Required for listen(), inaccessible from identity
privateKey ed25519.PrivateKey
manifests sync.Map // file key to manifests
activeDownloads sync.Map // file key to manifests
shuttingDown bool
}
@ -92,6 +100,11 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
engine.eventManager.Subscribe(event.BlockUnknownPeers, engine.queue)
engine.eventManager.Subscribe(event.AllowUnknownPeers, engine.queue)
// File Handling
engine.eventManager.Subscribe(event.ShareManifest, engine.queue)
engine.eventManager.Subscribe(event.ManifestSizeReceived, engine.queue)
engine.eventManager.Subscribe(event.ManifestSaved, engine.queue)
for peer, authorization := range peerAuthorizations {
engine.authorizations.Store(peer, authorization)
}
@ -181,12 +194,50 @@ func (e *engine) eventHandler() {
e.blockUnknownContacts = true
case event.ProtocolEngineStartListen:
go e.listenFn()
case event.ShareManifest:
e.manifests.Store(ev.Data[event.FileKey], ev.Data[event.SerializedManifest])
case event.ManifestSizeReceived:
handle := ev.Data[event.Handle]
key := ev.Data[event.FileKey]
size,_ := strconv.Atoi(ev.Data[event.ManifestSize])
go e.fetchManifest(handle, key, uint64(size))
case event.ManifestSaved:
handle := ev.Data[event.Handle]
key := ev.Data[event.FileKey]
serializedManifest := ev.Data[event.SerializedManifest]
go e.downloadFile(handle, key, serializedManifest)
default:
return
}
}
}
func (e *engine) fetchManifest(handle string, fileKey string, manifestSize uint64) {
// Store a blank manifest
e.manifests.Store(fileKey, strings.Repeat("\"", int(manifestSize*files.DefaultChunkSize)))
conn, err := e.service.WaitForCapabilityOrClose(handle, cwtchCapability)
if err == nil {
peerApp, ok := (conn.App()).(*PeerApp)
if ok {
peerApp.FetchManifestChunks(fileKey)
}
}
}
func (e *engine) sendManifestChunk(handle string, fileKey string, id uint64, chunk []byte) {
conn, err := e.service.WaitForCapabilityOrClose(handle, cwtchCapability)
if err == nil {
peerApp, ok := (conn.App()).(*PeerApp)
if ok {
peerApp.SendMessage(PeerMessage{
Context: event.ContextSendManifest,
ID: fmt.Sprintf("%s.%d", fileKey, id),
Data: chunk,
})
}
}
}
func (e *engine) isBlocked(onion string) bool {
authorization, known := e.authorizations.Load(onion)
if !known {
@ -419,7 +470,7 @@ func (e *engine) sendMessageToPeer(eventID string, onion string, context string,
}
func (e *engine) sendGetValToPeer(eventID, onion, scope, path string) error {
log.Debugf("sendGetValMessage to peer %v %v%v\n", onion, scope, path)
log.Debugf("sendGetValMessage to peer %v %v.%v\n", onion, scope, path)
getVal := peerGetVal{Scope: scope, Path: path}
message, err := json.Marshal(getVal)
if err != nil {
@ -499,6 +550,109 @@ func (e *engine) handlePeerMessage(hostname string, eventID string, context stri
ev.EventID = eventID
e.eventManager.Publish(ev)
}
} else if context == event.ContextRequestManifest {
serializedManifest, exists := e.manifests.Load(eventID)
if exists {
serializedManifest := serializedManifest.(string)
for i := 0; i < len(serializedManifest); i += files.DefaultChunkSize {
offset := i * files.DefaultChunkSize
end := (i + 1) * files.DefaultChunkSize
if end > len(serializedManifest) {
end = len(serializedManifest)
}
e.sendManifestChunk(hostname, eventID, uint64(i), []byte(serializedManifest[offset:end]))
}
}
} else if context == event.ContextSendManifest {
log.Debugf("manifest received %v %v %s", eventID, hostname, message)
fileKeyParts := strings.Split(eventID, ".")
if len(fileKeyParts) == 3 {
fileKey := fmt.Sprintf("%s.%s", fileKeyParts[0], fileKeyParts[1])
log.Debugf("manifest filekey: %s", fileKey)
manifestPart,err := strconv.Atoi(fileKeyParts[2])
if err == nil {
serializedManifest, exists := e.manifests.Load(fileKey)
if exists {
serializedManifest := serializedManifest.(string)
log.Debugf("loaded manifest")
offset := manifestPart * files.DefaultChunkSize
end := (manifestPart + 1) * files.DefaultChunkSize
log.Debugf("storing manifest part %v %v", offset, end)
serializedManifestBytes := []byte(serializedManifest)
copy(serializedManifestBytes[offset:end], message[:])
if len(message) < files.DefaultChunkSize {
serializedManifestBytes = serializedManifestBytes[0:len(serializedManifestBytes)-(files.DefaultChunkSize-len(message))]
}
serializedManifest = string(serializedManifestBytes)
e.manifests.Store(fileKey, serializedManifest)
log.Debugf("current manifest: [%s]", serializedManifest)
var manifest files.Manifest
err := json.Unmarshal([]byte(serializedManifest), &manifest)
if err == nil && hex.EncodeToString(manifest.RootHash) == fileKeyParts[0] {
log.Debugf("valid manifest received! %x", manifest.RootHash)
// We have a valid manifest
e.eventManager.Publish(event.NewEvent(event.ManifestReceived, map[event.Field]string{event.Handle: hostname, event.FileKey: fileKey, event.SerializedManifest: string(serializedManifest)}))
}
}
}
}
} else if context == event.ContextRequestFile {
log.Debugf("chunk request: %v", eventID)
serializedManifest, exists := e.manifests.Load(eventID)
if exists {
serializedManifest := serializedManifest.(string)
log.Debugf("manifest found: %v", serializedManifest)
var manifest files.Manifest
err := json.Unmarshal([]byte(serializedManifest), &manifest)
if err == nil {
chunkSpec, err := files.Deserialize(string(message))
log.Debugf("deserialized chunk spec found: %v [%s]", chunkSpec, message)
if err == nil {
for _, chunk := range *chunkSpec {
contents, err := manifest.GetChunkBytes(chunk)
if err == nil {
log.Debugf("sending chunk: %v %x", chunk, contents)
e.sendMessageToPeer(fmt.Sprintf("%v.%d", eventID, chunk), hostname, event.ContextSendFile, contents)
}
}
}
}
}
} else if context == event.ContextSendFile {
fileKeyParts := strings.Split(eventID, ".")
log.Debugf("got chunk for %s", fileKeyParts)
if len(fileKeyParts) == 3 {
fileKey := fmt.Sprintf("%s.%s", fileKeyParts[0], fileKeyParts[1])
chunkID, err := strconv.Atoi(fileKeyParts[2])
log.Debugf("got chunk id %d", chunkID)
if err == nil {
manifestI, exists := e.activeDownloads.Load(fileKey)
if exists {
manifest := manifestI.(*files.Manifest)
log.Debugf("found active manifest %v", manifest)
progress,err := manifest.StoreChunk(uint64(chunkID), message)
log.Debugf("attempts to store chunk %v %v", progress, err)
if err == nil{
if int(progress) == len(manifest.Chunks) {
if manifest.VerifyFile() == nil {
manifest.Close()
e.activeDownloads.Delete(fileKey)
log.Debugf("file verified and downloaded!")
e.eventManager.Publish(event.NewEvent(event.FileDownloaded, map[event.Field]string {event.FileKey: fileKey} ))
}
}
e.eventManager.Publish(event.NewEvent(event.FileDownloadProgressUpdate, map[event.Field]string {event.FileKey: fileKey, event.Progress: strconv.Itoa(int(progress)), event.FileSizeInChunks: strconv.Itoa(len(manifest.Chunks))} ))
} else {
// if a chunk fails to save (possibly because its data hash didn't match the manifest), re-request it
e.sendMessageToPeer(fileKey, hostname, event.ContextRequestFile, []byte(strconv.Itoa(chunkID)))
}
}
}
}
} else {
e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)}))
}
@ -530,3 +684,31 @@ func (e *engine) leaveServer(server string) {
e.ephemeralServices.Delete(server)
}
}
func (e *engine) downloadFile(handle string, key string, serializedManifest string) {
var manifest files.Manifest
err := json.Unmarshal([]byte(serializedManifest), &manifest)
if err == nil {
err := manifest.PrepareDownload()
if err != nil {
// TODO
return
}
e.activeDownloads.Store(key, &manifest)
log.Debugf("downloading file chunks: %v %v", manifest.GetChunkRequest().Serialize(), manifest)
conn, err := e.service.WaitForCapabilityOrClose(handle, cwtchCapability)
if err == nil {
peerApp, ok := (conn.App()).(*PeerApp)
if ok {
peerApp.SendMessage(PeerMessage{
ID: key,
Context: event.ContextRequestFile,
Data: []byte(manifest.GetChunkRequest().Serialize()),
})
}
}
}
}

View File

@ -124,3 +124,16 @@ func (pa *PeerApp) SendMessage(message PeerMessage) {
serialized, _ := json.Marshal(message)
pa.connection.Send(serialized)
}
func (pa *PeerApp) FetchManifestChunks(key string) {
message := PeerMessage{
Context: event.ContextRequestManifest,
ID: key,
Data: []byte{},
}
serialized, _ := json.Marshal(message)
pa.connection.Send(serialized)
}

View File

@ -0,0 +1,81 @@
package files
import (
"errors"
"fmt"
"strconv"
"strings"
)
type ChunkSpec []uint64
func CreateChunkSpec(progress []bool) ChunkSpec {
var chunks ChunkSpec
for i,p := range progress {
if !p {
chunks = append(chunks, uint64(i))
}
}
return chunks
}
func Deserialize(serialized string) (*ChunkSpec,error) {
var chunkSpec ChunkSpec
if len(serialized) == 0 {
return &chunkSpec, nil
}
ranges := strings.Split(serialized, ",")
for _,r := range ranges {
parts := strings.Split(r, ":")
if len(parts) == 1 {
single,err := strconv.Atoi(r)
if err != nil {
return nil, errors.New("invalid chunk spec")
}
chunkSpec = append(chunkSpec, uint64(single))
} else if len(parts) == 2 {
start,err1 := strconv.Atoi(parts[0])
end,err2 := strconv.Atoi(parts[1])
if err1 != nil || err2 != nil {
return nil, errors.New("invalid chunk spec")
}
for i := start; i <=end; i++ {
chunkSpec = append(chunkSpec, uint64(i))
}
} else {
return nil, errors.New("invalid chunk spec")
}
}
return &chunkSpec, nil
}
func (cs ChunkSpec) Serialize() string {
result := ""
i := 0
for {
if i >= len(cs) {
break
}
j := i+1
for ; j < len(cs) && cs[j] == cs[j-1] +1; j++ {}
Review

for ; i >= len(cs) ; {
?

for ; i >= len(cs) ; { ?
if result != "" {
result += ","
}
if j == i + 1 {
result += fmt.Sprintf("%d", cs[i])
} else {
result += fmt.Sprintf("%d:%d", cs[i], cs[j-1])
}
i = j
}
return result
}

View File

@ -0,0 +1,37 @@
package files
import "testing"
func TestChunkSpec(t *testing.T) {
var testCases = map[string]ChunkSpec {
"0": CreateChunkSpec([]bool{false}),
"0:10": CreateChunkSpec([]bool{false,false,false,false,false,false,false,false,false,false,false}),
"0:1,3:5,7:9": CreateChunkSpec([]bool{false,false,true,false,false,false,true,false,false,false,true}),
"": CreateChunkSpec([]bool{true,true,true,true,true,true,true,true,true,true,true}),
"2,5,8,10": CreateChunkSpec([]bool{true,true,false,true,true,false,true,true,false,true,false}),
//
"0,2:10": CreateChunkSpec([]bool{false,true,false,false,false,false,false,false,false,false,false}),
"0:8,10": CreateChunkSpec([]bool{false,false,false,false,false,false,false,false,false,true,false}),
"1:9": CreateChunkSpec([]bool{true,false,false,false,false,false,false,false,false,false,true}),
}
for k,v := range testCases {
if k != v.Serialize() {
t.Fatalf("got %v but expected %v", v.Serialize(),k )
}
t.Logf("%v == %v", k, v.Serialize())
}
for k,v := range testCases {
if cs, err := Deserialize(k); err != nil {
t.Fatalf("error deserialized key: %v %v", k, err )
} else {
if v.Serialize() != cs.Serialize() {
t.Fatalf("got %v but expected %v", v.Serialize(),cs.Serialize() )
}
t.Logf("%v == %v", cs.Serialize(), v.Serialize())
}
}
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 51 KiB

BIN
protocol/files/cwtch.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 51 KiB

View File

@ -0,0 +1 @@
{"Chunks":["G3eXqrYVuCSATqG9RsXS/jHpSdp5zp2aO2zFAjO+ZueaVzPTxd33wvFr4zUvHQVDI+VSVV+ldogpVVE1Fyaapg==","glF2hLxOyXwvz/L2WuLrHf0Ed14hsXsilyPMSjDsn5ERuEaCmczg8qkwbpwoYcjJo/fsFJLzRj8m37RP31WTDg==","XQgG1ibuhEZyLfzIS6cw+mqkWQMz1XJTKM6bD9nFduc0/Wxdyv9oqbQhn4RvqcMam8N3x5yIJmFqbsG6nNlZxA==","WwIbGgjwG4iF/w3G1R5BpluHEgNOFXdxO3ZtLdFAbFLhB1HhxK2D8wrnpjWUnVXzynPSnvA/InFlWhaN9zvvsA==","0QGYg1b69T0LCU5IZkt0fBQYxZeFGpbBV3QErMFpENPM0GUVWwRQFc8GA0v6Mnoyl0iAps5eB4DENBiahGIDpQ==","w995uLObqfuZqvxsNG2jr5itl4LUvO8WZzuTOMFLC1vE5+lcdjwE06+42q6C54QE5PWB1wYbN3cVf4cNmC8FQA==","Yf4+mx59iSQJs6UYhelxbVIdFlx1SRgoOhAtjdJh2oLauPGvnCVQVaWN9ylKVPP+PNDoqfpbpKnryQNF0lwX2g=="],"FileName":"cwtch.png","RootHash":"jw7XO7sw20W2p0CxJRyuApRfSOT5kUZNXzYHaFxF3NE2oyXasuX2QpzitxXmArILWxa/dDj7YjX+/pEq3O21/Q==","FileSizeInBytes":51791,"ChunkSizeInBytes":8000}

View File

@ -0,0 +1 @@
Hello World!

310
protocol/files/manifest.go Normal file
View File

@ -0,0 +1,310 @@
package files
import (
"bufio"
"crypto/sha512"
"crypto/subtle"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"sync"
)
// Chunk is a wrapper around a hash
type Chunk []byte
// DefaultChunkSize is the default value of a manifest chunk
const DefaultChunkSize = 4096
dan marked this conversation as resolved
Review

this seems a little low? most torrents I see anyways chunk in 1MB to 8mb chunks. dependingly a single jpg would only be like 1-3 chunks. with 4kb instead even a jpg is gonna be a ton of chunks, that may be a lot of needless overhead?

this seems a little low? most torrents I see anyways chunk in 1MB to 8mb chunks. dependingly a single jpg would only be like 1-3 chunks. with 4kb instead even a jpg is gonna be a ton of chunks, that may be a lot of needless overhead?
Review

We are limited by tapir upper bound here plus the over head of json + encryption. We may increse the tapir limit at some point but it is currently also bound to server message max size.

We are limited by tapir upper bound here plus the over head of json + encryption. We may increse the tapir limit at some point but it is currently also bound to server message max size.
// Manifest is a collection of hashes and other metadata needed to reconstruct a file and verify contents given a root hash
type Manifest struct {
Chunks []Chunk
FileName string
RootHash []byte
FileSizeInBytes uint64
ChunkSizeInBytes uint64
chunkComplete []bool
openFd *os.File
progress uint64
lock sync.Mutex
}
// CreateManifest takes in a file path and constructs a file sharing manifest of hashes along with
// other information necessary to download, reconstruct and verify the file.
func CreateManifest(path string) (*Manifest, error) {
// Process file into Chunks
f, err := os.Open(path)
if err != nil {
return nil, err
}
defer f.Close()
reader := bufio.NewReader(f)
buf := make([]byte, DefaultChunkSize)
var chunks []Chunk
fileSizeInBytes := uint64(0)
rootHash := sha512.New()
for {
n, err := reader.Read(buf)
if err != nil {
if err != io.EOF {
return nil, err
}
break
}
hash := sha512.New()
hash.Write(buf[0:n])
rootHash.Write(buf[0:n])
chunkHash := hash.Sum(nil)
chunks = append(chunks, chunkHash)
fileSizeInBytes += uint64(n)
}
return &Manifest{
Chunks: chunks,
FileName: path,
RootHash: rootHash.Sum(nil),
ChunkSizeInBytes: DefaultChunkSize,
FileSizeInBytes: fileSizeInBytes,
dan marked this conversation as resolved
Review

yeah we inc filesize by n here

yeah we inc filesize by n here
chunkComplete: make([]bool, len(chunks)),
}, nil
}
// GetChunkBytes takes in a chunk identifier and returns the bytes associated with that chunk
// it does not attempt to validate the chunk Hash.
func (m *Manifest) GetChunkBytes(id uint64) ([]byte, error) {
m.lock.Lock()
defer m.lock.Unlock()
if id >= uint64(len(m.Chunks)) {
return nil, errors.New("chunk not found")
}
if err := m.getFileHandle(); err != nil {
return nil, err
}
// Seek to Chunk
offset, err := m.openFd.Seek(int64(id*m.ChunkSizeInBytes), 0)
if (uint64(offset) != id*m.ChunkSizeInBytes) || err != nil {
return nil, errors.New("chunk not found")
}
// Read chunk into memory and return...
reader := bufio.NewReader(m.openFd)
buf := make([]byte, m.ChunkSizeInBytes)
n, err := reader.Read(buf)
dan marked this conversation as resolved
Review

i'm just picturing thrashing if we're sharing the same file to multiple parties and chunk sizes are 4k. and a lot of lock waiting

i'm just picturing thrashing if we're sharing the same file to multiple parties and chunk sizes are 4k. and a lot of lock waiting
if err != nil {
if err != io.EOF {
return nil, err
}
}
return buf[0:n], nil
}
// LoadManifest reads in a json serialized Manifest from a file
func LoadManifest(filename string) (*Manifest, error) {
bytes, err := ioutil.ReadFile(filename)
if err != nil {
return nil, err
}
manifest := new(Manifest)
err = json.Unmarshal(bytes, manifest)
if err != nil {
return nil, err
}
manifest.chunkComplete = make([]bool, len(manifest.Chunks))
return manifest, nil
}
// VerifyFile attempts to calculate the rootHash of a file and compare it to the expected rootHash stored in the
// manifest
func (m *Manifest) VerifyFile() error {
m.lock.Lock()
defer m.lock.Unlock()
if err := m.getFileHandle(); err != nil {
return err
}
offset, err := m.openFd.Seek(0, 0)
if offset != 0 || err != nil {
return errors.New("chunk not found")
}
rootHash := sha512.New()
reader := bufio.NewReader(m.openFd)
buf := make([]byte, m.ChunkSizeInBytes)
for {
n, err := reader.Read(buf)
rootHash.Write(buf[0:n])
if err != nil {
if err != io.EOF {
return err
}
break
}
}
calculatedRootHash := rootHash.Sum(nil)
if subtle.ConstantTimeCompare(m.RootHash, calculatedRootHash) != 1 {
return fmt.Errorf("hashes do not match %x %x", m.RootHash, calculatedRootHash)
}
return nil
}
// StoreChunk takes in a chunk id and contents, verifies the chunk has the expected hash and if so store the contents
// in the file.
func (m *Manifest) StoreChunk(id uint64, contents []byte) (uint64, error) {
m.lock.Lock()
defer m.lock.Unlock()
// Check the chunk id
if id >= uint64(len(m.Chunks)) {
return 0, errors.New("invalid chunk id")
}
// Validate the chunk hash
hash := sha512.New()
hash.Write(contents)
chunkHash := hash.Sum(nil)
if subtle.ConstantTimeCompare(chunkHash, m.Chunks[id]) != 1 {
return 0, fmt.Errorf("invalid chunk hash %x %x", chunkHash, m.Chunks[id])
}
if err := m.getFileHandle(); err != nil {
return 0, err
}
offset, err := m.openFd.Seek(int64(id*m.ChunkSizeInBytes), 0)
if (uint64(offset) != id*m.ChunkSizeInBytes) || err != nil {
return 0, errors.New("chunk not found")
}
// Write the contents of the chunk to the file
_, err = m.openFd.Write(contents)
if err == nil && m.chunkComplete[id] == false {
m.chunkComplete[id] = true
m.progress += 1
}
return m.progress, err
}
// private function to set the internal file handle
func (m *Manifest) getFileHandle() error {
// Seek to the chunk in the file
if m.openFd == nil {
fd, err := os.OpenFile(m.FileName, os.O_RDWR, 0600)
if err != nil {
return err
}
m.openFd = fd
}
return nil
}
func (m * Manifest) GetChunkRequest() ChunkSpec {
return CreateChunkSpec(m.chunkComplete)
}
// PrepareDownload creates an empty file of the expected size of the file described by the manifest
// If the file already exists it assume it is the correct file and that it is resuming from when it left off.
func (m *Manifest) PrepareDownload() error {
m.lock.Lock()
defer m.lock.Unlock()
m.chunkComplete = make([]bool, len(m.Chunks))
if info, err := os.Stat(m.FileName); os.IsNotExist(err) {
fd, err := os.Create(m.FileName)
if err != nil {
return err
}
m.openFd = fd
writer := bufio.NewWriter(m.openFd)
buf := make([]byte, m.ChunkSizeInBytes)
for chunk := 0; chunk < len(m.Chunks)-1; chunk++ {
_, err := writer.Write(buf)
if err != nil {
return err
}
}
lastChunkSize := m.FileSizeInBytes % m.ChunkSizeInBytes
if lastChunkSize > 0 {
buf = make([]byte, lastChunkSize)
_, err := writer.Write(buf)
if err != nil {
return err
}
}
writer.Flush()
} else {
if err != nil {
return err
}
if uint64(info.Size()) != m.FileSizeInBytes {
return fmt.Errorf("file exists but is the wrong size")
}
if err := m.getFileHandle(); err != nil {
return err
}
// Calculate Progress
reader := bufio.NewReader(m.openFd)
buf := make([]byte, m.ChunkSizeInBytes)
chunkI := 0
for {
n, err := reader.Read(buf)
if err != nil {
if err != io.EOF {
return err
}
break
}
hash := sha512.New()
hash.Write(buf[0:n])
chunkHash := hash.Sum(nil)
m.progress = 0
if subtle.ConstantTimeCompare(chunkHash, m.Chunks[chunkI]) == 1 {
m.chunkComplete[chunkI] = true
m.progress += 1
}
chunkI++
}
}
return nil
Review

go's only use of for loops i feel fails it here... for looping this is a bit ugly... so ... not a request, just a evaluation put forward

for n, err := reader.Read(buf) ; err != nil ; n, err = reader.Read(buf) {
...
}
if err != io.EOF {
return err
}

go's only use of for loops i feel fails it here... for looping this is a bit ugly... so ... not a request, just a evaluation put forward for n, err := reader.Read(buf) ; err != nil ; n, err = reader.Read(buf) { ... } if err != io.EOF { return err }
}
// Close closes the underlying file descriptor
func (m *Manifest) Close() {
m.lock.Lock()
defer m.lock.Unlock()
if m.openFd != nil {
m.openFd.Close()
}
}
func (m Manifest) Save(path string) error {
manifestJson,_ := json.Marshal(&m)
return ioutil.WriteFile(path, manifestJson, 0600)
}
func (m * Manifest) Serialize() []byte {
data,_ := json.Marshal(m)
return data
}

View File

@ -0,0 +1,130 @@
package files
import (
"encoding/hex"
"encoding/json"
"io/ioutil"
"math"
"testing"
)
func TestManifest(t *testing.T) {
manifest, err := CreateManifest("example.txt")
if err != nil {
t.Fatalf("manifest create error: %v", err)
}
if len(manifest.Chunks) != 1 {
t.Fatalf("manifest had unepxected Chunks : %v", manifest.Chunks)
}
if manifest.FileSizeInBytes != 12 {
t.Fatalf("manifest had unepxected length : %v", manifest.FileSizeInBytes)
}
if hex.EncodeToString(manifest.RootHash) != "861844d6704e8573fec34d967e20bcfef3d424cf48be04e6dc08f2bd58c729743371015ead891cc3cf1c9d34b49264b510751b1ff9e537937bc46b5d6ff4ecc8" {
t.Fatalf("manifest had incorrect root Hash : %v", manifest.RootHash)
}
t.Logf("%v", manifest)
// Try to tread the chunk
contents, err := manifest.GetChunkBytes(1)
if err == nil {
t.Fatalf("chunk fetch should have thrown an error")
}
contents, err = manifest.GetChunkBytes(0)
if err != nil {
t.Fatalf("chunk fetch error: %v", err)
}
contents, err = manifest.GetChunkBytes(0)
if err != nil {
t.Fatalf("chunk fetch error: %v", err)
}
json, _ := json.Marshal(manifest)
t.Logf("%s", json)
t.Logf("%s", contents)
}
func TestManifestLarge(t *testing.T) {
manifest, err := CreateManifest("cwtch.png")
if err != nil {
t.Fatalf("manifest create error: %v", err)
}
if len(manifest.Chunks) != int(math.Ceil(float64(51791)/float64(8000))) {
t.Fatalf("manifest had unexpected Chunks : %v", manifest.Chunks)
}
if manifest.FileSizeInBytes != 51791 {
t.Fatalf("manifest had unepxected length : %v", manifest.FileSizeInBytes)
}
if hex.EncodeToString(manifest.RootHash) != "8f0ed73bbb30db45b6a740b1251cae02945f48e4f991464d5f3607685c45dcd136a325dab2e5f6429ce2b715e602b20b5b16bf7438fb6235fefe912adcedb5fd" {
t.Fatalf("manifest had incorrect root Hash : %v", manifest.RootHash)
}
t.Logf("%v", len(manifest.Chunks))
json, _ := json.Marshal(manifest)
t.Logf("%v %s", len(json), json)
// Pretend we downloaded the manifest
ioutil.WriteFile("cwtch.png.manifest", json, 0600)
// Load the manifest from a file
cwtchPngManifest, err := LoadManifest("cwtch.png.manifest")
if err != nil {
t.Fatalf("manifest create error: %v", err)
}
defer cwtchPngManifest.Close()
t.Logf("%v", cwtchPngManifest)
// Test verifying the hash
if cwtchPngManifest.VerifyFile() != nil {
t.Fatalf("hashes do not validate error: %v", err)
}
// Prepare Download
cwtchPngOutManifest, _ := LoadManifest("cwtch.png.manifest")
cwtchPngOutManifest.FileName = "cwtch.out.png"
defer cwtchPngOutManifest.Close()
err = cwtchPngOutManifest.PrepareDownload()
if err != nil {
t.Fatalf("could not prepare download %v", err)
}
for i := 0; i < len(cwtchPngManifest.Chunks); i++ {
t.Logf("Sending Chunk %v %x from %v", i, cwtchPngManifest.Chunks[i], cwtchPngManifest.FileName)
contents, err := cwtchPngManifest.GetChunkBytes(uint64(i))
if err != nil {
t.Fatalf("could not get chunk %v %v", i, err)
}
t.Logf("Progress: %v", cwtchPngOutManifest.chunkComplete)
_, err = cwtchPngOutManifest.StoreChunk(uint64(i), contents)
if err != nil {
t.Fatalf("could not store chunk %v %v", i, err)
}
}
err = cwtchPngOutManifest.VerifyFile()
if err != nil {
t.Fatalf("could not verify file %v", err)
}
// Test that changing the hash throws an error
cwtchPngManifest.RootHash[3] = 0xFF
if cwtchPngManifest.VerifyFile() == nil {
t.Fatalf("hashes should not validate error")
}
}

BIN
testing/cwtch.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 51 KiB

View File

@ -0,0 +1,120 @@
package testing
import (
"crypto/rand"
app2 "cwtch.im/cwtch/app"
"cwtch.im/cwtch/app/utils"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/functionality/filesharing"
"cwtch.im/cwtch/model"
"encoding/base64"
"encoding/json"
"fmt"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
mrand "math/rand"
"os"
"os/user"
"path"
"testing"
"time"
)
func TestFileSharing(t *testing.T) {
os.RemoveAll("cwtch.out.png")
os.RemoveAll("cwtch.out.png.manifest")
log.AddEverythingFromPattern("connectivity")
log.SetLevel(log.LevelDebug)
log.ExcludeFromPattern("connection/connection")
log.ExcludeFromPattern("outbound/3dhauthchannel")
log.ExcludeFromPattern("event/eventmanager")
log.ExcludeFromPattern("pipeBridge")
log.ExcludeFromPattern("tapir")
os.Mkdir("tordir", 0700)
dataDir := path.Join("tordir", "tor")
os.MkdirAll(dataDir, 0700)
// we don't need real randomness for the port, just to avoid a possible conflict...
mrand.Seed(int64(time.Now().Nanosecond()))
socksPort := mrand.Intn(1000) + 9051
controlPort := mrand.Intn(1000) + 9052
// generate a random password
key := make([]byte, 64)
_, err := rand.Read(key)
if err != nil {
panic(err)
}
tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc")
acn, err := tor.NewTorACNWithAuth("./tordir", path.Join("..", "tor"), controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)})
if err != nil {
t.Fatalf("Could not start Tor: %v", err)
}
app := app2.NewApp(acn, "./storage")
usr, _ := user.Current()
cwtchDir := path.Join(usr.HomeDir, ".cwtch")
os.Mkdir(cwtchDir, 0700)
os.RemoveAll(path.Join(cwtchDir, "testing"))
os.Mkdir(path.Join(cwtchDir, "testing"), 0700)
fmt.Println("Creating Alice...")
app.CreatePeer("alice", "asdfasdf")
fmt.Println("Creating Bob...")
app.CreatePeer("bob", "asdfasdf")
alice := utils.WaitGetPeer(app, "alice")
bob := utils.WaitGetPeer(app, "bob")
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer})
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer, event.ManifestReceived})
app.LaunchPeers()
waitTime := time.Duration(30) * time.Second
t.Logf("** Waiting for Alice, Bob to connect with onion network... (%v)\n", waitTime)
time.Sleep(waitTime)
bob.AddContact("alice?", alice.GetOnion(), model.AuthApproved)
alice.PeerWithOnion(bob.GetOnion())
fmt.Println("Waiting for alice and Bob to peer...")
waitForPeerPeerConnection(t, alice, bob)
fmt.Println("Alice and Bob are Connected!!")
filesharingFunctionality,_ := filesharing.FunctionalityGate(map[string]bool{"filesharing": true})
err = filesharingFunctionality.SendFile("cwtch.png", alice, bob.GetOnion())
if err != nil {
t.Fatalf("Error!")
}
time.Sleep(time.Second * 10)
for _, message := range bob.GetContact(alice.GetOnion()).Timeline.GetMessages() {
var messageWrapper model.MessageWrapper
json.Unmarshal([]byte(message.Message), &messageWrapper)
if messageWrapper.Overlay == model.OverlayFileSharing {
var fileMessageOverlay filesharing.OverlayMessage
err := json.Unmarshal([]byte(messageWrapper.Data), &fileMessageOverlay)
if err == nil {
filesharingFunctionality.DownloadFile(bob, alice.GetOnion(), "cwtch.out.png", fmt.Sprintf("%x.%x", fileMessageOverlay.Hash, fileMessageOverlay.Nonce))
}
}
fmt.Printf("Found message from Alice: %v", message.Message)
}
time.Sleep(time.Minute)
}