File Sharing MVP #384
|
@ -248,6 +248,14 @@ const (
|
|||
|
||||
// For situations where we want to update $Identity -> $RemotePeer/$GroupID's total message count to be $Data
|
||||
MessageCounterResync = Type("MessageCounterResync")
|
||||
|
||||
// File Handling Events
|
||||
ShareManifest = Type("ShareManifest")
|
||||
ManifestSizeReceived = Type("ManifestSizeReceived")
|
||||
ManifestReceived = Type("ManifestReceived")
|
||||
ManifestSaved = Type("ManifestSaved")
|
||||
FileDownloadProgressUpdate = Type("FileDownloadProgressUpdate")
|
||||
FileDownloaded = Type("FileDownloaded")
|
||||
)
|
||||
|
||||
// Field defines common event attributes
|
||||
|
@ -312,6 +320,11 @@ const (
|
|||
Imported = Field("Imported")
|
||||
|
||||
Source = Field("Source")
|
||||
|
||||
FileKey = Field("FileKey")
|
||||
FileSizeInChunks = Field("FileSizeInChunks")
|
||||
ManifestSize = Field("ManifestSize")
|
||||
SerializedManifest = Field("SerializedManifest")
|
||||
)
|
||||
|
||||
// Defining Common errors
|
||||
|
@ -328,11 +341,15 @@ const (
|
|||
|
||||
// Defining Protocol Contexts
|
||||
const (
|
||||
ContextAck = "im.cwtch.acknowledgement"
|
||||
ContextInvite = "im.cwtch.invite"
|
||||
ContextRaw = "im.cwtch.raw"
|
||||
ContextGetVal = "im.cwtch.getVal"
|
||||
ContextRetVal = "im.cwtch.retVal"
|
||||
ContextAck = "im.cwtch.acknowledgement"
|
||||
ContextInvite = "im.cwtch.invite"
|
||||
ContextRaw = "im.cwtch.raw"
|
||||
ContextGetVal = "im.cwtch.getVal"
|
||||
ContextRetVal = "im.cwtch.retVal"
|
||||
ContextRequestManifest = "im.cwtch.file.request.manifest"
|
||||
ContextSendManifest = "im.cwtch.file.send.manifest"
|
||||
ContextRequestFile = "im.cwtch.file.request.chunk"
|
||||
ContextSendFile = "im.cwtch.file.send.chunk"
|
||||
)
|
||||
|
||||
// Define Default Attribute Keys
|
||||
|
|
|
@ -0,0 +1,87 @@
|
|||
package filesharing
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/peer"
|
||||
"cwtch.im/cwtch/protocol/files"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"io"
|
||||
"math"
|
||||
"path"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// Functionality groups some common UI triggered functions for contacts...
|
||||
type Functionality struct {
|
||||
}
|
||||
|
||||
// FunctionalityGate returns contact.Functionality always
|
||||
func FunctionalityGate(experimentMap map[string]bool) (*Functionality, error) {
|
||||
if experimentMap["filesharing"] == true {
|
||||
return new(Functionality), nil
|
||||
|
||||
}
|
||||
return nil, errors.New("filesharing is not enabled")
|
||||
}
|
||||
|
||||
// OverlayMessage presents the canonical format of the File Sharing functionality Overlay Message
|
||||
// This is the format that the UI will parse to display the message
|
||||
type OverlayMessage struct {
|
||||
Name string `json:"f"`
|
||||
Hash []byte `json:"h"`
|
||||
Nonce []byte `json:"n"`
|
||||
Size uint64 `json:"s"`
|
||||
}
|
||||
|
||||
// DownloadFile given a profile, a conversation handle and a file sharing key, start off a download process
|
||||
// to downloadFilePath
|
||||
func (f *Functionality) DownloadFile(profile peer.CwtchPeer, handle string, downloadFilePath string, key string) {
|
||||
profile.SetAttribute(attr.GetLocalScope(key), downloadFilePath)
|
||||
profile.SendGetValToPeer(handle, attr.PublicScope, fmt.Sprintf("%s.manifest.size", key))
|
||||
}
|
||||
|
||||
// ShareFile given a profile and a conversation handle, sets up a file sharing process to share the file
|
||||
// at filepath
|
||||
func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer, handle string) error {
|
||||
manifest, err := files.CreateManifest(filepath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var nonce [24]byte
|
||||
if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil {
|
||||
log.Errorf("Cannot read from random: %v\n", err)
|
||||
return err
|
||||
}
|
||||
|
||||
message := OverlayMessage{
|
||||
Name: path.Base(manifest.FileName),
|
||||
Hash: manifest.RootHash,
|
||||
Nonce: nonce[:],
|
||||
Size: manifest.FileSizeInBytes,
|
||||
}
|
||||
|
||||
data, _ := json.Marshal(message)
|
||||
|
||||
wrapper := model.MessageWrapper{
|
||||
Overlay: model.OverlayFileSharing,
|
||||
Data: string(data),
|
||||
}
|
||||
|
||||
wrapperJSON, _ := json.Marshal(wrapper)
|
||||
key := fmt.Sprintf("%x.%x", manifest.RootHash, nonce)
|
||||
serializedManifest, _ := json.Marshal(manifest)
|
||||
profile.ShareFile(key, string(serializedManifest))
|
||||
|
||||
// Store the size of the manifest (in chunks) as part of the public scope so contacts who we share the file with
|
||||
// can fetch the manifest as if it were a file.
|
||||
profile.SetAttribute(attr.GetPublicScope(fmt.Sprintf("%s.manifest.size", key)), strconv.Itoa(int(math.Ceil(float64(len(serializedManifest))/float64(files.DefaultChunkSize)))))
|
||||
|
||||
profile.SendMessageToPeer(handle, string(wrapperJSON))
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
package model
|
||||
|
||||
// MessageWrapper is the canonical Cwtch overlay wrapper
|
||||
type MessageWrapper struct {
|
||||
dan marked this conversation as resolved
dan
commented
do we want to more formally move this into Cwtch core and expose it a bit more, and have cwtch-ui use that directly reather than dupping it there? do we want to more formally move this into Cwtch core and expose it a bit more, and have cwtch-ui use that directly reather than dupping it there?
sarah
commented
Yes, that is why it is now in model. Yes, that is why it is now in model.
|
||||
Overlay int `json:"o"`
|
||||
Data string `json:"d"`
|
||||
}
|
||||
|
||||
// OverlayChat is the canonical identifier for chat overlays
|
||||
const OverlayChat = 1
|
||||
|
||||
// OverlayInviteContact is the canonical identifier for the contact invite overlay
|
||||
const OverlayInviteContact = 100
|
||||
|
||||
// OverlayInviteGroup is the canonical identifier for the group invite overlay
|
||||
const OverlayInviteGroup = 101
|
||||
|
||||
// OverlayFileSharing is the canonical identifier for the file sharing overlay
|
||||
const OverlayFileSharing = 200
|
|
@ -5,10 +5,12 @@ import (
|
|||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/protocol/connections"
|
||||
"cwtch.im/cwtch/protocol/files"
|
||||
"encoding/base32"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
@ -21,7 +23,8 @@ const lastKnownSignature = "LastKnowSignature"
|
|||
var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true,
|
||||
event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeer: true,
|
||||
event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToPeerError: true, event.SendMessageToGroupError: true,
|
||||
event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true}
|
||||
event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true,
|
||||
event.ManifestSizeReceived: true, event.ManifestReceived: true}
|
||||
|
||||
// DefaultEventsToHandle specifies which events will be subscribed to
|
||||
// when a peer has its Init() function called
|
||||
|
@ -189,6 +192,8 @@ type CwtchPeer interface {
|
|||
SendMessages
|
||||
ModifyMessages
|
||||
SendMessagesToGroup
|
||||
|
||||
ShareFile(fileKey string, serializedManifest string)
|
||||
}
|
||||
|
||||
// NewCwtchPeer creates and returns a new cwtchPeer with the given name.
|
||||
|
@ -702,6 +707,10 @@ func (cp *cwtchPeer) StoreMessage(onion string, messageTxt string, sent time.Tim
|
|||
cp.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) ShareFile(fileKey string, serializedManifest string) {
|
||||
cp.eventBus.Publish(event.NewEvent(event.ShareManifest, map[event.Field]string{event.FileKey: fileKey, event.SerializedManifest: serializedManifest}))
|
||||
}
|
||||
|
||||
// eventHandler process events from other subsystems
|
||||
func (cp *cwtchPeer) eventHandler() {
|
||||
for {
|
||||
|
@ -795,6 +804,33 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
|
||||
/***** Non default but requestable handlable events *****/
|
||||
|
||||
case event.ManifestReceived:
|
||||
log.Debugf("Manifest Received Event!: %v", ev)
|
||||
handle := ev.Data[event.Handle]
|
||||
fileKey := ev.Data[event.FileKey]
|
||||
serializedManifest := ev.Data[event.SerializedManifest]
|
||||
|
||||
downloadFilePath, exists := cp.GetAttribute(attr.GetLocalScope(fileKey))
|
||||
if exists {
|
||||
log.Debugf("downloading file to %v", downloadFilePath)
|
||||
var manifest files.Manifest
|
||||
err := json.Unmarshal([]byte(serializedManifest), &manifest)
|
||||
if err == nil {
|
||||
manifest.FileName = downloadFilePath
|
||||
log.Debugf("saving manifest")
|
||||
err = manifest.Save(fmt.Sprintf("%v.manifest", downloadFilePath))
|
||||
if err != nil {
|
||||
log.Errorf("could not save manifest...")
|
||||
} else {
|
||||
cp.eventBus.Publish(event.NewEvent(event.ManifestSaved, map[event.Field]string{event.FileKey: fileKey, event.Handle: handle, event.SerializedManifest: string(manifest.Serialize())}))
|
||||
}
|
||||
} else {
|
||||
log.Errorf("error saving manifest: %v", err)
|
||||
}
|
||||
} else {
|
||||
log.Errorf("no download path found for manifest: %v", fileKey)
|
||||
}
|
||||
|
||||
case event.NewRetValMessageFromPeer:
|
||||
onion := ev.Data[event.RemotePeer]
|
||||
scope := ev.Data[event.Scope]
|
||||
|
@ -804,7 +840,12 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
log.Debugf("NewRetValMessageFromPeer %v %v%v %v %v\n", onion, scope, path, exists, val)
|
||||
if exists {
|
||||
if scope == attr.PublicScope {
|
||||
cp.SetContactAttribute(onion, attr.GetPeerScope(path), val)
|
||||
if strings.HasSuffix(path, ".manifest.size") {
|
||||
fileKey := strings.Replace(path, ".manifest.size", "", 1)
|
||||
cp.eventBus.Publish(event.NewEvent(event.ManifestSizeReceived, map[event.Field]string{event.FileKey: fileKey, event.ManifestSize: val, event.Handle: onion}))
|
||||
} else {
|
||||
cp.SetContactAttribute(onion, attr.GetPeerScope(path), val)
|
||||
}
|
||||
}
|
||||
}
|
||||
case event.PeerStateChange:
|
||||
|
|
|
@ -3,7 +3,9 @@ package connections
|
|||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/protocol/files"
|
||||
"cwtch.im/cwtch/protocol/groups"
|
||||
model3 "cwtch.im/cwtch/protocol/model"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
|
@ -46,6 +48,9 @@ type engine struct {
|
|||
// Required for listen(), inaccessible from identity
|
||||
privateKey ed25519.PrivateKey
|
||||
|
||||
// file sharing subsystem is responsible for maintaining active shares and downloads
|
||||
filesharingSubSystem files.FileSharingSubSystem
|
||||
|
||||
shuttingDown bool
|
||||
}
|
||||
|
||||
|
@ -92,6 +97,11 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
|
|||
engine.eventManager.Subscribe(event.BlockUnknownPeers, engine.queue)
|
||||
engine.eventManager.Subscribe(event.AllowUnknownPeers, engine.queue)
|
||||
|
||||
// File Handling
|
||||
engine.eventManager.Subscribe(event.ShareManifest, engine.queue)
|
||||
engine.eventManager.Subscribe(event.ManifestSizeReceived, engine.queue)
|
||||
engine.eventManager.Subscribe(event.ManifestSaved, engine.queue)
|
||||
|
||||
for peer, authorization := range peerAuthorizations {
|
||||
engine.authorizations.Store(peer, authorization)
|
||||
}
|
||||
|
@ -181,6 +191,20 @@ func (e *engine) eventHandler() {
|
|||
e.blockUnknownContacts = true
|
||||
case event.ProtocolEngineStartListen:
|
||||
go e.listenFn()
|
||||
case event.ShareManifest:
|
||||
e.filesharingSubSystem.ShareFile(ev.Data[event.FileKey], ev.Data[event.SerializedManifest])
|
||||
case event.ManifestSizeReceived:
|
||||
handle := ev.Data[event.Handle]
|
||||
key := ev.Data[event.FileKey]
|
||||
size, _ := strconv.Atoi(ev.Data[event.ManifestSize])
|
||||
go e.sendPeerMessage(handle, e.filesharingSubSystem.FetchManifest(key, uint64(size)))
|
||||
case event.ManifestSaved:
|
||||
handle := ev.Data[event.Handle]
|
||||
key := ev.Data[event.FileKey]
|
||||
serializedManifest := ev.Data[event.SerializedManifest]
|
||||
for _, message := range e.filesharingSubSystem.CompileChunkRequests(key, serializedManifest) {
|
||||
go e.sendPeerMessage(handle, message)
|
||||
}
|
||||
default:
|
||||
return
|
||||
}
|
||||
|
@ -410,7 +434,7 @@ func (e *engine) sendMessageToPeer(eventID string, onion string, context string,
|
|||
if err == nil {
|
||||
peerApp, ok := (conn.App()).(*PeerApp)
|
||||
if ok {
|
||||
peerApp.SendMessage(PeerMessage{eventID, context, message})
|
||||
peerApp.SendMessage(model3.PeerMessage{ID: eventID, Context: context, Data: message})
|
||||
return nil
|
||||
}
|
||||
return errors.New("failed type assertion conn.App != PeerApp")
|
||||
|
@ -419,7 +443,7 @@ func (e *engine) sendMessageToPeer(eventID string, onion string, context string,
|
|||
}
|
||||
|
||||
func (e *engine) sendGetValToPeer(eventID, onion, scope, path string) error {
|
||||
log.Debugf("sendGetValMessage to peer %v %v%v\n", onion, scope, path)
|
||||
log.Debugf("sendGetValMessage to peer %v %v.%v\n", onion, scope, path)
|
||||
getVal := peerGetVal{Scope: scope, Path: path}
|
||||
message, err := json.Marshal(getVal)
|
||||
if err != nil {
|
||||
|
@ -499,6 +523,28 @@ func (e *engine) handlePeerMessage(hostname string, eventID string, context stri
|
|||
ev.EventID = eventID
|
||||
e.eventManager.Publish(ev)
|
||||
}
|
||||
} else if context == event.ContextRequestManifest {
|
||||
for _, message := range e.filesharingSubSystem.RequestManifestParts(eventID) {
|
||||
e.sendPeerMessage(hostname, message)
|
||||
}
|
||||
} else if context == event.ContextSendManifest {
|
||||
if fileKey, manifest := e.filesharingSubSystem.ReceiveManifestPart(eventID, message); len(manifest) != 0 {
|
||||
// We have a valid manifest
|
||||
e.eventManager.Publish(event.NewEvent(event.ManifestReceived, map[event.Field]string{event.Handle: hostname, event.FileKey: fileKey, event.SerializedManifest: manifest}))
|
||||
}
|
||||
} else if context == event.ContextRequestFile {
|
||||
for _, message := range e.filesharingSubSystem.ProcessChunkRequest(eventID, message) {
|
||||
e.sendPeerMessage(hostname, message)
|
||||
}
|
||||
} else if context == event.ContextSendFile {
|
||||
fileKey, downloaded, progress, totalChunks, _ := e.filesharingSubSystem.ProcessChunk(eventID, message)
|
||||
if len(fileKey) != 0 {
|
||||
if downloaded {
|
||||
log.Debugf("file verified and downloaded!")
|
||||
e.eventManager.Publish(event.NewEvent(event.FileDownloaded, map[event.Field]string{event.FileKey: fileKey}))
|
||||
}
|
||||
e.eventManager.Publish(event.NewEvent(event.FileDownloadProgressUpdate, map[event.Field]string{event.FileKey: fileKey, event.Progress: strconv.Itoa(int(progress)), event.FileSizeInChunks: strconv.Itoa(int(totalChunks))}))
|
||||
}
|
||||
} else {
|
||||
e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)}))
|
||||
}
|
||||
|
@ -530,3 +576,13 @@ func (e *engine) leaveServer(server string) {
|
|||
e.ephemeralServices.Delete(server)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *engine) sendPeerMessage(handle string, message model3.PeerMessage) {
|
||||
conn, err := e.service.WaitForCapabilityOrClose(handle, cwtchCapability)
|
||||
if err == nil {
|
||||
peerApp, ok := (conn.App()).(*PeerApp)
|
||||
if ok {
|
||||
peerApp.SendMessage(message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package connections
|
|||
|
||||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
dan
commented
model2? model2?
|
||||
model2 "cwtch.im/cwtch/protocol/model"
|
||||
"encoding/json"
|
||||
"git.openprivacy.ca/cwtch.im/tapir"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/applications"
|
||||
|
@ -27,13 +28,6 @@ type PeerApp struct {
|
|||
getValRequests sync.Map // [string]string eventID:Data
|
||||
}
|
||||
|
||||
// PeerMessage is an encapsulation that can be used by higher level applications
|
||||
type PeerMessage struct {
|
||||
ID string // A unique Message ID (primarily used for acknowledgments)
|
||||
Context string // A unique context identifier i.e. im.cwtch.chat
|
||||
Data []byte // The serialized data packet.
|
||||
}
|
||||
|
||||
type peerGetVal struct {
|
||||
Scope, Path string
|
||||
}
|
||||
|
@ -88,7 +82,7 @@ func (pa *PeerApp) listen() {
|
|||
pa.OnClose(pa.connection.Hostname())
|
||||
return
|
||||
}
|
||||
var peerMessage PeerMessage
|
||||
var peerMessage model2.PeerMessage
|
||||
err := json.Unmarshal(message, &peerMessage)
|
||||
if err == nil {
|
||||
switch peerMessage.Context {
|
||||
|
@ -106,7 +100,7 @@ func (pa *PeerApp) listen() {
|
|||
pa.MessageHandler(pa.connection.Hostname(), peerMessage.ID, peerMessage.Context, peerMessage.Data)
|
||||
|
||||
// Acknowledge the message
|
||||
pa.SendMessage(PeerMessage{peerMessage.ID, event.ContextAck, []byte{}})
|
||||
pa.SendMessage(model2.PeerMessage{ID: peerMessage.ID, Context: event.ContextAck, Data: []byte{}})
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -117,7 +111,7 @@ func (pa *PeerApp) listen() {
|
|||
|
||||
// SendMessage sends the peer a preformatted message
|
||||
// NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol
|
||||
func (pa *PeerApp) SendMessage(message PeerMessage) {
|
||||
func (pa *PeerApp) SendMessage(message model2.PeerMessage) {
|
||||
if message.Context == event.ContextGetVal {
|
||||
pa.getValRequests.Store(message.ID, string(message.Data))
|
||||
}
|
||||
|
|
|
@ -0,0 +1,87 @@
|
|||
package files
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// ChunkSpec is a wrapper around an uncompressed array of chunk identifiers
|
||||
type ChunkSpec []uint64
|
||||
|
||||
// CreateChunkSpec given a full list of chunks with their downloaded status (true for downloaded, false otherwise)
|
||||
// derives a list of identifiers of chunks that have not been downloaded yet
|
||||
func CreateChunkSpec(progress []bool) ChunkSpec {
|
||||
var chunks ChunkSpec
|
||||
for i, p := range progress {
|
||||
if !p {
|
||||
chunks = append(chunks, uint64(i))
|
||||
}
|
||||
}
|
||||
return chunks
|
||||
}
|
||||
|
||||
// Deserialize takes in a compressed chunk spec and returns an uncompressed ChunkSpec or an error
|
||||
// if the serialized chunk spec has format errors
|
||||
func Deserialize(serialized string) (*ChunkSpec, error) {
|
||||
|
||||
var chunkSpec ChunkSpec
|
||||
|
||||
if len(serialized) == 0 {
|
||||
return &chunkSpec, nil
|
||||
}
|
||||
|
||||
ranges := strings.Split(serialized, ",")
|
||||
for _, r := range ranges {
|
||||
parts := strings.Split(r, ":")
|
||||
if len(parts) == 1 {
|
||||
single, err := strconv.Atoi(r)
|
||||
if err != nil {
|
||||
return nil, errors.New("invalid chunk spec")
|
||||
}
|
||||
chunkSpec = append(chunkSpec, uint64(single))
|
||||
} else if len(parts) == 2 {
|
||||
start, err1 := strconv.Atoi(parts[0])
|
||||
end, err2 := strconv.Atoi(parts[1])
|
||||
if err1 != nil || err2 != nil {
|
||||
return nil, errors.New("invalid chunk spec")
|
||||
}
|
||||
for i := start; i <= end; i++ {
|
||||
chunkSpec = append(chunkSpec, uint64(i))
|
||||
}
|
||||
} else {
|
||||
return nil, errors.New("invalid chunk spec")
|
||||
}
|
||||
}
|
||||
return &chunkSpec, nil
|
||||
}
|
||||
|
||||
// Serialize compresses the ChunkSpec into a list of inclusive ranges e.g. 1,2,3,5,6,7 becomes "1:3,5:7"
|
||||
func (cs ChunkSpec) Serialize() string {
|
||||
result := ""
|
||||
|
||||
i := 0
|
||||
|
||||
for {
|
||||
dan
commented
for ; i >= len(cs) ; { for ; i >= len(cs) ; {
?
|
||||
if i >= len(cs) {
|
||||
break
|
||||
}
|
||||
j := i + 1
|
||||
for ; j < len(cs) && cs[j] == cs[j-1]+1; j++ {
|
||||
}
|
||||
|
||||
if result != "" {
|
||||
result += ","
|
||||
}
|
||||
|
||||
if j == i+1 {
|
||||
result += fmt.Sprintf("%d", cs[i])
|
||||
} else {
|
||||
result += fmt.Sprintf("%d:%d", cs[i], cs[j-1])
|
||||
}
|
||||
i = j
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
package files
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestChunkSpec(t *testing.T) {
|
||||
|
||||
var testCases = map[string]ChunkSpec{
|
||||
"0": CreateChunkSpec([]bool{false}),
|
||||
"0:10": CreateChunkSpec([]bool{false, false, false, false, false, false, false, false, false, false, false}),
|
||||
"0:1,3:5,7:9": CreateChunkSpec([]bool{false, false, true, false, false, false, true, false, false, false, true}),
|
||||
"": CreateChunkSpec([]bool{true, true, true, true, true, true, true, true, true, true, true}),
|
||||
"2,5,8,10": CreateChunkSpec([]bool{true, true, false, true, true, false, true, true, false, true, false}),
|
||||
//
|
||||
"0,2:10": CreateChunkSpec([]bool{false, true, false, false, false, false, false, false, false, false, false}),
|
||||
"0:8,10": CreateChunkSpec([]bool{false, false, false, false, false, false, false, false, false, true, false}),
|
||||
"1:9": CreateChunkSpec([]bool{true, false, false, false, false, false, false, false, false, false, true}),
|
||||
}
|
||||
|
||||
for k, v := range testCases {
|
||||
if k != v.Serialize() {
|
||||
t.Fatalf("got %v but expected %v", v.Serialize(), k)
|
||||
}
|
||||
t.Logf("%v == %v", k, v.Serialize())
|
||||
}
|
||||
|
||||
for k, v := range testCases {
|
||||
if cs, err := Deserialize(k); err != nil {
|
||||
t.Fatalf("error deserialized key: %v %v", k, err)
|
||||
} else {
|
||||
if v.Serialize() != cs.Serialize() {
|
||||
t.Fatalf("got %v but expected %v", v.Serialize(), cs.Serialize())
|
||||
}
|
||||
t.Logf("%v == %v", cs.Serialize(), v.Serialize())
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,201 @@
|
|||
package files
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/protocol/model"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// FileSharingSubSystem encapsulates the functionality necessary to share and download files via Cwtch
|
||||
//
|
||||
type FileSharingSubSystem struct {
|
||||
|
||||
// for sharing files
|
||||
activeShares sync.Map // file key to manifest
|
||||
|
||||
// for downloading files
|
||||
prospectiveManifests sync.Map // file key to serialized manifests
|
||||
activeDownloads sync.Map // file key to manifests
|
||||
}
|
||||
|
||||
|
||||
// ShareFile given a file key and a serialized manifest, allow the serialized manifest to be downloaded
|
||||
// by Cwtch profiles in possession of the fileKey
|
||||
func (fsss *FileSharingSubSystem) ShareFile(fileKey string, serializedManifest string) {
|
||||
var manifest Manifest
|
||||
err := json.Unmarshal([]byte(serializedManifest), &manifest)
|
||||
if err != nil {
|
||||
log.Errorf("could not share file %v", err)
|
||||
return
|
||||
}
|
||||
fsss.activeShares.Store(fileKey, &manifest)
|
||||
}
|
||||
|
||||
// FetchManifest given a file key and knowledge of the manifest size in chunks (obtained via an attribute lookup)
|
||||
// construct a request to download the manifest.
|
||||
func (fsss *FileSharingSubSystem) FetchManifest(fileKey string, manifestSize uint64) model.PeerMessage {
|
||||
fsss.prospectiveManifests.Store(fileKey, strings.Repeat("\"", int(manifestSize*DefaultChunkSize)))
|
||||
return model.PeerMessage{
|
||||
Context: event.ContextRequestManifest,
|
||||
ID: fileKey,
|
||||
Data: []byte{},
|
||||
}
|
||||
}
|
||||
|
||||
// CompileChunkRequests takes in a complete serializedManifest and returns a set of chunk request messages
|
||||
// TODO in the future we will want this to return the handles of contacts to request chunks from
|
||||
func (fsss *FileSharingSubSystem) CompileChunkRequests(fileKey string, serializedManifest string) []model.PeerMessage {
|
||||
var manifest Manifest
|
||||
err := json.Unmarshal([]byte(serializedManifest), &manifest)
|
||||
var messages []model.PeerMessage
|
||||
if err == nil {
|
||||
err := manifest.PrepareDownload()
|
||||
if err == nil {
|
||||
fsss.activeDownloads.Store(fileKey, &manifest)
|
||||
log.Debugf("downloading file chunks: %v", manifest.GetChunkRequest().Serialize())
|
||||
messages = append(messages, model.PeerMessage{
|
||||
ID: fileKey,
|
||||
Context: event.ContextRequestFile,
|
||||
Data: []byte(manifest.GetChunkRequest().Serialize()),
|
||||
})
|
||||
}
|
||||
}
|
||||
return messages
|
||||
}
|
||||
|
||||
// RequestManifestParts given a fileKey construct a set of messages representing requests to download various
|
||||
// parts of the Manifest
|
||||
func (fsss *FileSharingSubSystem) RequestManifestParts(fileKey string) []model.PeerMessage {
|
||||
manifestI, exists := fsss.activeShares.Load(fileKey)
|
||||
var messages []model.PeerMessage
|
||||
if exists {
|
||||
manifest := manifestI.(*Manifest)
|
||||
serializedManifest := manifest.Serialize()
|
||||
log.Debugf("found serialized manifest: %s", serializedManifest)
|
||||
for i := 0; i < len(serializedManifest); i += DefaultChunkSize {
|
||||
offset := i * DefaultChunkSize
|
||||
end := (i + 1) * DefaultChunkSize
|
||||
if end > len(serializedManifest) {
|
||||
end = len(serializedManifest)
|
||||
}
|
||||
chunk := serializedManifest[offset:end]
|
||||
messages = append(messages, model.PeerMessage{
|
||||
Context: event.ContextSendManifest,
|
||||
ID: fmt.Sprintf("%s.%d", fileKey, uint64(i)),
|
||||
Data: chunk,
|
||||
})
|
||||
}
|
||||
}
|
||||
return messages
|
||||
}
|
||||
|
||||
// ReceiveManifestPart given a manifestKey reconstruct part the manifest from the provided part
|
||||
func (fsss *FileSharingSubSystem) ReceiveManifestPart(manifestKey string, part []byte) (fileKey string, serializedManifest string) {
|
||||
fileKeyParts := strings.Split(manifestKey, ".")
|
||||
if len(fileKeyParts) == 3 {
|
||||
fileKey = fmt.Sprintf("%s.%s", fileKeyParts[0], fileKeyParts[1])
|
||||
log.Debugf("manifest filekey: %s", fileKey)
|
||||
manifestPart, err := strconv.Atoi(fileKeyParts[2])
|
||||
if err == nil {
|
||||
serializedManifest, exists := fsss.prospectiveManifests.Load(fileKey)
|
||||
if exists {
|
||||
serializedManifest := serializedManifest.(string)
|
||||
log.Debugf("loaded manifest")
|
||||
offset := manifestPart * DefaultChunkSize
|
||||
end := (manifestPart + 1) * DefaultChunkSize
|
||||
|
||||
log.Debugf("storing manifest part %v %v", offset, end)
|
||||
serializedManifestBytes := []byte(serializedManifest)
|
||||
copy(serializedManifestBytes[offset:end], part[:])
|
||||
|
||||
if len(part) < DefaultChunkSize {
|
||||
serializedManifestBytes = serializedManifestBytes[0 : len(serializedManifestBytes)-(DefaultChunkSize-len(part))]
|
||||
}
|
||||
|
||||
serializedManifest = string(serializedManifestBytes)
|
||||
fsss.prospectiveManifests.Store(fileKey, serializedManifest)
|
||||
log.Debugf("current manifest: [%s]", serializedManifest)
|
||||
var manifest Manifest
|
||||
err := json.Unmarshal([]byte(serializedManifest), &manifest)
|
||||
if err == nil && hex.EncodeToString(manifest.RootHash) == fileKeyParts[0] {
|
||||
log.Debugf("valid manifest received! %x", manifest.RootHash)
|
||||
return fileKey, serializedManifest
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return "", ""
|
||||
}
|
||||
|
||||
// ProcessChunkRequest given a fileKey, and a chunk request, compile a set of responses for each requested Chunk
|
||||
func (fsss *FileSharingSubSystem) ProcessChunkRequest(fileKey string, serializedChunkRequest []byte) []model.PeerMessage {
|
||||
log.Debugf("chunk request: %v", fileKey)
|
||||
manifestI, exists := fsss.activeShares.Load(fileKey)
|
||||
var messages []model.PeerMessage
|
||||
if exists {
|
||||
manifest := manifestI.(*Manifest)
|
||||
log.Debugf("manifest found: %x", manifest.RootHash)
|
||||
chunkSpec, err := Deserialize(string(serializedChunkRequest))
|
||||
log.Debugf("deserialized chunk spec found: %v [%s]", chunkSpec, serializedChunkRequest)
|
||||
if err == nil {
|
||||
for _, chunk := range *chunkSpec {
|
||||
contents, err := manifest.GetChunkBytes(chunk)
|
||||
if err == nil {
|
||||
log.Debugf("sending chunk: %v %x", chunk, contents)
|
||||
messages = append(messages, model.PeerMessage{
|
||||
ID: fmt.Sprintf("%v.%d", fileKey, chunk),
|
||||
Context: event.ContextSendFile,
|
||||
Data: contents,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return messages
|
||||
}
|
||||
|
||||
// ProcessChunk given a chunk key and a chunk attempt to store and verify the chunk as part of an active download
|
||||
// If this results in the file download being completed return downloaded = true
|
||||
// Always return the progress of a matched download if it exists along with the total number of chunks and the
|
||||
// given chunk ID
|
||||
// If not such active download exists then return an empty file key and ignore all further processing.
|
||||
func (fsss *FileSharingSubSystem) ProcessChunk(chunkKey string, chunk []byte) (fileKey string, downloaded bool, progress uint64, totalChunks uint64, chunkID uint64) {
|
||||
fileKeyParts := strings.Split(chunkKey, ".")
|
||||
downloaded = false
|
||||
log.Debugf("got chunk for %s", fileKeyParts)
|
||||
if len(fileKeyParts) == 3 {
|
||||
fileKey = fmt.Sprintf("%s.%s", fileKeyParts[0], fileKeyParts[1])
|
||||
derivedChunkID, err := strconv.Atoi(fileKeyParts[2])
|
||||
if err == nil {
|
||||
log.Debugf("got chunk id %d", chunkID)
|
||||
chunkID = uint64(derivedChunkID)
|
||||
manifestI, exists := fsss.activeDownloads.Load(fileKey)
|
||||
if exists {
|
||||
manifest := manifestI.(*Manifest)
|
||||
log.Debugf("found active manifest %v", manifest)
|
||||
progress, err = manifest.StoreChunk(uint64(chunkID), chunk)
|
||||
totalChunks = uint64(len(manifest.Chunks))
|
||||
log.Debugf("attempts to store chunk %v %v", progress, err)
|
||||
if err == nil {
|
||||
if progress == totalChunks {
|
||||
if manifest.VerifyFile() == nil {
|
||||
manifest.Close()
|
||||
fsss.activeDownloads.Delete(fileKey)
|
||||
log.Debugf("file verified and downloaded!")
|
||||
downloaded = true
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// TODO if a chunk fails to save (possibly because its data hash didn't match the manifest), re-request it
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
|
@ -0,0 +1,312 @@
|
|||
package files
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"crypto/sha512"
|
||||
"crypto/subtle"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Chunk is a wrapper around a hash
|
||||
type Chunk []byte
|
||||
|
||||
// DefaultChunkSize is the default value of a manifest chunk
|
||||
const DefaultChunkSize = 4096
|
||||
|
||||
dan marked this conversation as resolved
dan
commented
this seems a little low? most torrents I see anyways chunk in 1MB to 8mb chunks. dependingly a single jpg would only be like 1-3 chunks. with 4kb instead even a jpg is gonna be a ton of chunks, that may be a lot of needless overhead? this seems a little low? most torrents I see anyways chunk in 1MB to 8mb chunks. dependingly a single jpg would only be like 1-3 chunks. with 4kb instead even a jpg is gonna be a ton of chunks, that may be a lot of needless overhead?
sarah
commented
We are limited by tapir upper bound here plus the over head of json + encryption. We may increse the tapir limit at some point but it is currently also bound to server message max size. We are limited by tapir upper bound here plus the over head of json + encryption. We may increse the tapir limit at some point but it is currently also bound to server message max size.
|
||||
// Manifest is a collection of hashes and other metadata needed to reconstruct a file and verify contents given a root hash
|
||||
type Manifest struct {
|
||||
Chunks []Chunk
|
||||
FileName string
|
||||
RootHash []byte
|
||||
FileSizeInBytes uint64
|
||||
ChunkSizeInBytes uint64
|
||||
|
||||
chunkComplete []bool
|
||||
openFd *os.File
|
||||
progress uint64
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// CreateManifest takes in a file path and constructs a file sharing manifest of hashes along with
|
||||
// other information necessary to download, reconstruct and verify the file.
|
||||
func CreateManifest(path string) (*Manifest, error) {
|
||||
|
||||
// Process file into Chunks
|
||||
f, err := os.Open(path)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer f.Close()
|
||||
|
||||
reader := bufio.NewReader(f)
|
||||
buf := make([]byte, DefaultChunkSize)
|
||||
|
||||
var chunks []Chunk
|
||||
fileSizeInBytes := uint64(0)
|
||||
|
||||
rootHash := sha512.New()
|
||||
|
||||
for {
|
||||
n, err := reader.Read(buf)
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
return nil, err
|
||||
}
|
||||
break
|
||||
}
|
||||
hash := sha512.New()
|
||||
hash.Write(buf[0:n])
|
||||
rootHash.Write(buf[0:n])
|
||||
chunkHash := hash.Sum(nil)
|
||||
chunks = append(chunks, chunkHash)
|
||||
fileSizeInBytes += uint64(n)
|
||||
}
|
||||
|
||||
return &Manifest{
|
||||
Chunks: chunks,
|
||||
FileName: path,
|
||||
RootHash: rootHash.Sum(nil),
|
||||
ChunkSizeInBytes: DefaultChunkSize,
|
||||
FileSizeInBytes: fileSizeInBytes,
|
||||
dan marked this conversation as resolved
dan
commented
yeah we inc filesize by n here yeah we inc filesize by n here
|
||||
chunkComplete: make([]bool, len(chunks)),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetChunkBytes takes in a chunk identifier and returns the bytes associated with that chunk
|
||||
// it does not attempt to validate the chunk Hash.
|
||||
func (m *Manifest) GetChunkBytes(id uint64) ([]byte, error) {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
if id >= uint64(len(m.Chunks)) {
|
||||
return nil, errors.New("chunk not found")
|
||||
}
|
||||
|
||||
if err := m.getFileHandle(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Seek to Chunk
|
||||
offset, err := m.openFd.Seek(int64(id*m.ChunkSizeInBytes), 0)
|
||||
if (uint64(offset) != id*m.ChunkSizeInBytes) || err != nil {
|
||||
return nil, errors.New("chunk not found")
|
||||
}
|
||||
|
||||
// Read chunk into memory and return...
|
||||
reader := bufio.NewReader(m.openFd)
|
||||
buf := make([]byte, m.ChunkSizeInBytes)
|
||||
n, err := reader.Read(buf)
|
||||
dan marked this conversation as resolved
dan
commented
i'm just picturing thrashing if we're sharing the same file to multiple parties and chunk sizes are 4k. and a lot of lock waiting i'm just picturing thrashing if we're sharing the same file to multiple parties and chunk sizes are 4k. and a lot of lock waiting
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return buf[0:n], nil
|
||||
}
|
||||
|
||||
// LoadManifest reads in a json serialized Manifest from a file
|
||||
func LoadManifest(filename string) (*Manifest, error) {
|
||||
bytes, err := ioutil.ReadFile(filename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
manifest := new(Manifest)
|
||||
err = json.Unmarshal(bytes, manifest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
manifest.chunkComplete = make([]bool, len(manifest.Chunks))
|
||||
return manifest, nil
|
||||
}
|
||||
|
||||
// VerifyFile attempts to calculate the rootHash of a file and compare it to the expected rootHash stored in the
|
||||
// manifest
|
||||
func (m *Manifest) VerifyFile() error {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
if err := m.getFileHandle(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
offset, err := m.openFd.Seek(0, 0)
|
||||
if offset != 0 || err != nil {
|
||||
return errors.New("chunk not found")
|
||||
}
|
||||
|
||||
rootHash := sha512.New()
|
||||
reader := bufio.NewReader(m.openFd)
|
||||
buf := make([]byte, m.ChunkSizeInBytes)
|
||||
for {
|
||||
n, err := reader.Read(buf)
|
||||
rootHash.Write(buf[0:n])
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
return err
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
calculatedRootHash := rootHash.Sum(nil)
|
||||
if subtle.ConstantTimeCompare(m.RootHash, calculatedRootHash) != 1 {
|
||||
return fmt.Errorf("hashes do not match %x %x", m.RootHash, calculatedRootHash)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// StoreChunk takes in a chunk id and contents, verifies the chunk has the expected hash and if so store the contents
|
||||
// in the file.
|
||||
func (m *Manifest) StoreChunk(id uint64, contents []byte) (uint64, error) {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
// Check the chunk id
|
||||
if id >= uint64(len(m.Chunks)) {
|
||||
return 0, errors.New("invalid chunk id")
|
||||
}
|
||||
|
||||
// Validate the chunk hash
|
||||
hash := sha512.New()
|
||||
hash.Write(contents)
|
||||
chunkHash := hash.Sum(nil)
|
||||
|
||||
if subtle.ConstantTimeCompare(chunkHash, m.Chunks[id]) != 1 {
|
||||
return 0, fmt.Errorf("invalid chunk hash %x %x", chunkHash, m.Chunks[id])
|
||||
}
|
||||
|
||||
if err := m.getFileHandle(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
offset, err := m.openFd.Seek(int64(id*m.ChunkSizeInBytes), 0)
|
||||
if (uint64(offset) != id*m.ChunkSizeInBytes) || err != nil {
|
||||
return 0, errors.New("chunk not found")
|
||||
}
|
||||
|
||||
// Write the contents of the chunk to the file
|
||||
_, err = m.openFd.Write(contents)
|
||||
|
||||
if err == nil && m.chunkComplete[id] == false {
|
||||
m.chunkComplete[id] = true
|
||||
m.progress++
|
||||
}
|
||||
|
||||
return m.progress, err
|
||||
}
|
||||
|
||||
// private function to set the internal file handle
|
||||
func (m *Manifest) getFileHandle() error {
|
||||
// Seek to the chunk in the file
|
||||
if m.openFd == nil {
|
||||
fd, err := os.OpenFile(m.FileName, os.O_RDWR, 0600)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.openFd = fd
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetChunkRequest returns an uncompressed list of Chunks needed to complete the file described in the manifest
|
||||
func (m *Manifest) GetChunkRequest() ChunkSpec {
|
||||
return CreateChunkSpec(m.chunkComplete)
|
||||
}
|
||||
|
||||
// PrepareDownload creates an empty file of the expected size of the file described by the manifest
|
||||
// If the file already exists it assume it is the correct file and that it is resuming from when it left off.
|
||||
func (m *Manifest) PrepareDownload() error {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
|
||||
m.chunkComplete = make([]bool, len(m.Chunks))
|
||||
|
||||
if info, err := os.Stat(m.FileName); os.IsNotExist(err) {
|
||||
fd, err := os.Create(m.FileName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.openFd = fd
|
||||
|
||||
writer := bufio.NewWriter(m.openFd)
|
||||
buf := make([]byte, m.ChunkSizeInBytes)
|
||||
for chunk := 0; chunk < len(m.Chunks)-1; chunk++ {
|
||||
_, err := writer.Write(buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
lastChunkSize := m.FileSizeInBytes % m.ChunkSizeInBytes
|
||||
if lastChunkSize > 0 {
|
||||
buf = make([]byte, lastChunkSize)
|
||||
_, err := writer.Write(buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
writer.Flush()
|
||||
} else {
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if uint64(info.Size()) != m.FileSizeInBytes {
|
||||
return fmt.Errorf("file exists but is the wrong size")
|
||||
}
|
||||
|
||||
if err := m.getFileHandle(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Calculate Progress
|
||||
reader := bufio.NewReader(m.openFd)
|
||||
buf := make([]byte, m.ChunkSizeInBytes)
|
||||
chunkI := 0
|
||||
for {
|
||||
n, err := reader.Read(buf)
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
return err
|
||||
}
|
||||
break
|
||||
}
|
||||
hash := sha512.New()
|
||||
hash.Write(buf[0:n])
|
||||
chunkHash := hash.Sum(nil)
|
||||
m.progress = 0
|
||||
if subtle.ConstantTimeCompare(chunkHash, m.Chunks[chunkI]) == 1 {
|
||||
m.chunkComplete[chunkI] = true
|
||||
m.progress++
|
||||
}
|
||||
chunkI++
|
||||
}
|
||||
}
|
||||
dan
commented
go's only use of for loops i feel fails it here... for looping this is a bit ugly... so ... not a request, just a evaluation put forward for n, err := reader.Read(buf) ; err != nil ; n, err = reader.Read(buf) { go's only use of for loops i feel fails it here... for looping this is a bit ugly... so ... not a request, just a evaluation put forward
for n, err := reader.Read(buf) ; err != nil ; n, err = reader.Read(buf) {
...
}
if err != io.EOF {
return err
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the underlying file descriptor
|
||||
func (m *Manifest) Close() {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
if m.openFd != nil {
|
||||
m.openFd.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Save writes a JSON encoded byte array version of the manifest to path
|
||||
func (m *Manifest) Save(path string) error {
|
||||
return ioutil.WriteFile(path, m.Serialize(), 0600)
|
||||
}
|
||||
|
||||
// Serialize returns the manifest as a JSON encoded byte array
|
||||
func (m *Manifest) Serialize() []byte {
|
||||
data, _ := json.Marshal(m)
|
||||
return data
|
||||
}
|
|
@ -0,0 +1,128 @@
|
|||
package files
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestManifest(t *testing.T) {
|
||||
manifest, err := CreateManifest("testdata/example.txt")
|
||||
if err != nil {
|
||||
t.Fatalf("manifest create error: %v", err)
|
||||
}
|
||||
|
||||
if len(manifest.Chunks) != 1 {
|
||||
t.Fatalf("manifest had unepxected Chunks : %v", manifest.Chunks)
|
||||
}
|
||||
|
||||
if manifest.FileSizeInBytes != 12 {
|
||||
t.Fatalf("manifest had unepxected length : %v", manifest.FileSizeInBytes)
|
||||
}
|
||||
|
||||
if hex.EncodeToString(manifest.RootHash) != "861844d6704e8573fec34d967e20bcfef3d424cf48be04e6dc08f2bd58c729743371015ead891cc3cf1c9d34b49264b510751b1ff9e537937bc46b5d6ff4ecc8" {
|
||||
t.Fatalf("manifest had incorrect root Hash : %v", manifest.RootHash)
|
||||
}
|
||||
|
||||
t.Logf("%v", manifest)
|
||||
|
||||
// Try to tread the chunk
|
||||
contents, err := manifest.GetChunkBytes(1)
|
||||
if err == nil {
|
||||
t.Fatalf("chunk fetch should have thrown an error")
|
||||
}
|
||||
|
||||
contents, err = manifest.GetChunkBytes(0)
|
||||
if err != nil {
|
||||
t.Fatalf("chunk fetch error: %v", err)
|
||||
}
|
||||
contents, err = manifest.GetChunkBytes(0)
|
||||
if err != nil {
|
||||
t.Fatalf("chunk fetch error: %v", err)
|
||||
}
|
||||
|
||||
json, _ := json.Marshal(manifest)
|
||||
t.Logf("%s", json)
|
||||
|
||||
t.Logf("%s", contents)
|
||||
}
|
||||
|
||||
func TestManifestLarge(t *testing.T) {
|
||||
manifest, err := CreateManifest("testdata/cwtch.png")
|
||||
if err != nil {
|
||||
t.Fatalf("manifest create error: %v", err)
|
||||
}
|
||||
|
||||
if len(manifest.Chunks) != int(math.Ceil(float64(51791)/DefaultChunkSize)) {
|
||||
t.Fatalf("manifest had unexpected Chunks : %v", manifest.Chunks)
|
||||
}
|
||||
|
||||
if manifest.FileSizeInBytes != 51791 {
|
||||
t.Fatalf("manifest had unepxected length : %v", manifest.FileSizeInBytes)
|
||||
}
|
||||
|
||||
if hex.EncodeToString(manifest.RootHash) != "8f0ed73bbb30db45b6a740b1251cae02945f48e4f991464d5f3607685c45dcd136a325dab2e5f6429ce2b715e602b20b5b16bf7438fb6235fefe912adcedb5fd" {
|
||||
t.Fatalf("manifest had incorrect root Hash : %v", manifest.RootHash)
|
||||
}
|
||||
|
||||
t.Logf("%v", len(manifest.Chunks))
|
||||
|
||||
json, _ := json.Marshal(manifest)
|
||||
t.Logf("%v %s", len(json), json)
|
||||
|
||||
// Pretend we downloaded the manifest
|
||||
ioutil.WriteFile("testdata/cwtch.png.manifest", json, 0600)
|
||||
|
||||
// Load the manifest from a file
|
||||
cwtchPngManifest, err := LoadManifest("testdata/cwtch.png.manifest")
|
||||
if err != nil {
|
||||
t.Fatalf("manifest create error: %v", err)
|
||||
}
|
||||
defer cwtchPngManifest.Close()
|
||||
t.Logf("%v", cwtchPngManifest)
|
||||
|
||||
// Test verifying the hash
|
||||
if cwtchPngManifest.VerifyFile() != nil {
|
||||
t.Fatalf("hashes do not validate error: %v", err)
|
||||
}
|
||||
|
||||
// Prepare Download
|
||||
cwtchPngOutManifest, _ := LoadManifest("testdata/cwtch.png.manifest")
|
||||
cwtchPngOutManifest.FileName = "testdata/cwtch.out.png"
|
||||
|
||||
defer cwtchPngOutManifest.Close()
|
||||
err = cwtchPngOutManifest.PrepareDownload()
|
||||
if err != nil {
|
||||
t.Fatalf("could not prepare download %v", err)
|
||||
}
|
||||
|
||||
for i := 0; i < len(cwtchPngManifest.Chunks); i++ {
|
||||
|
||||
t.Logf("Sending Chunk %v %x from %v", i, cwtchPngManifest.Chunks[i], cwtchPngManifest.FileName)
|
||||
|
||||
contents, err := cwtchPngManifest.GetChunkBytes(uint64(i))
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("could not get chunk %v %v", i, err)
|
||||
}
|
||||
t.Logf("Progress: %v", cwtchPngOutManifest.chunkComplete)
|
||||
_, err = cwtchPngOutManifest.StoreChunk(uint64(i), contents)
|
||||
if err != nil {
|
||||
t.Fatalf("could not store chunk %v %v", i, err)
|
||||
}
|
||||
|
||||
}
|
||||
err = cwtchPngOutManifest.VerifyFile()
|
||||
if err != nil {
|
||||
t.Fatalf("could not verify file %v", err)
|
||||
}
|
||||
|
||||
// Test that changing the hash throws an error
|
||||
cwtchPngManifest.RootHash[3] = 0xFF
|
||||
if cwtchPngManifest.VerifyFile() == nil {
|
||||
t.Fatalf("hashes should not validate error")
|
||||
}
|
||||
|
||||
}
|
After Width: | Height: | Size: 51 KiB |
After Width: | Height: | Size: 51 KiB |
|
@ -0,0 +1 @@
|
|||
{"Chunks":["2614xLRT0mPJYSBejVhB+xfFMSe5sM72qSTDBn3KqmmcK89oIwVmLr+2X5K3Wpl2f1oo3byyU86zdJp//dwTVg==","H2GcT8AZcy65Bbov34aVFGini5c4awpAT9Pcj4qamN56RkHQiLdtAtY4Lo7YlD2XQvfGF7bbfTt15EUcY8G2+Q==","YmvYk+5Ds1JB6K5YYZsNCfaMn7Qciql4iWpTxgHMz2a3uTpibGTQPW/ja2B6x7hplbhyCzkqlPS5XZAM4bVxxA==","1WrybcOYtRmU3QaGv/VEjlRSj9rNvFFrB7pw4E2NvN5rDt2zjMfHTIAT69pVHUPm7oRjYPGX16w0rg9CKv7Eow==","lz5h++lfGOodW+/xwpsfQz3HdTqTZYczGqnpkpoCH/6hfUbGRlnOfWeGpovKR9c6azzJf1ciMjHCsE3T7ZiNKA==","gLf58jjGMM/SXpj/1DDk9qae/EBHuXdZ9lPZXybu0/ZNwsPHjf7hqgcVlKQnx8nfRIuy/LR5WTdGZPAklWRtbg==","ujddGeMM9b7uTIqopSW63aDZYsrvHbXluhcpBUECqr2BBxrxvOK+rHFrk8HKk1JjYWyma9UnTmWGUWj5QHGsoQ==","dYzRAfWCgrxgB82+JcqnXLiwa4tKNDb4PzBXuW5uPKhYIuydL/FOJi1SWGrBP1Es+9UmmPnFaFyy8IXVUOLeMg==","Saubl7U9KePb/4VCm2X3yFrNFoO0au93JFu0By1klmHR+3Vz7ayWX3aCdnSwLY3IBdjcsvZv9xQ0IafdSy6JSw==","enA+R1ufZNlsZrwgJb+HQQtuBLlJM0sCzpJ2lhGdTs2yYMBFpVwNo8/Z/1o4OOqUqVnlF4u/A3L7R1lgDn7VYA==","ywykeEMBsuTgLwTXu/4kQc79JDmv0LVZPo+spV0PhrNBX24PNlm7Fo3n7H83N5UMtSpTW7PtvXOyBFd+m1hxQQ==","RFahJ53i1/LWZuAFoa1ey7/qPjEeVyG6L4UQgXfKdfxUj/XoMqDXKAg8SOfwj3GgSsdTmZJ2xEMRN3BqF9Hu8g==","Fdu2XF7GIWeGejgqiMdKZEYG2lZCVBplX6Y2P8VPziAZVnc9PXvl7scLZ7/n0TkaBqLKOgruKPQyQM3ZrfmSVg=="],"FileName":"testdata/cwtch.png","RootHash":"jw7XO7sw20W2p0CxJRyuApRfSOT5kUZNXzYHaFxF3NE2oyXasuX2QpzitxXmArILWxa/dDj7YjX+/pEq3O21/Q==","FileSizeInBytes":51791,"ChunkSizeInBytes":4096}
|
|
@ -0,0 +1 @@
|
|||
Hello World!
|
|
@ -0,0 +1,8 @@
|
|||
package model
|
||||
|
||||
// PeerMessage is an encapsulation that can be used by higher level applications
|
||||
type PeerMessage struct {
|
||||
ID string // A unique Message ID (primarily used for acknowledgments)
|
||||
Context string // A unique context identifier i.e. im.cwtch.chat
|
||||
Data []byte // The serialized data packet.
|
||||
}
|
After Width: | Height: | Size: 51 KiB |
|
@ -0,0 +1,132 @@
|
|||
package testing
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
app2 "cwtch.im/cwtch/app"
|
||||
"cwtch.im/cwtch/app/utils"
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/functionality/filesharing"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/protocol/files"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
mrand "math/rand"
|
||||
"os"
|
||||
"os/user"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestFileSharing(t *testing.T) {
|
||||
|
||||
os.RemoveAll("cwtch.out.png")
|
||||
os.RemoveAll("cwtch.out.png.manifest")
|
||||
|
||||
log.SetLevel(log.LevelDebug)
|
||||
|
||||
os.Mkdir("tordir", 0700)
|
||||
dataDir := path.Join("tordir", "tor")
|
||||
os.MkdirAll(dataDir, 0700)
|
||||
|
||||
// we don't need real randomness for the port, just to avoid a possible conflict...
|
||||
mrand.Seed(int64(time.Now().Nanosecond()))
|
||||
socksPort := mrand.Intn(1000) + 9051
|
||||
controlPort := mrand.Intn(1000) + 9052
|
||||
|
||||
// generate a random password
|
||||
key := make([]byte, 64)
|
||||
_, err := rand.Read(key)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc")
|
||||
acn, err := tor.NewTorACNWithAuth("./tordir", path.Join("..", "tor"), controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)})
|
||||
if err != nil {
|
||||
t.Fatalf("Could not start Tor: %v", err)
|
||||
}
|
||||
|
||||
app := app2.NewApp(acn, "./storage")
|
||||
|
||||
usr, _ := user.Current()
|
||||
cwtchDir := path.Join(usr.HomeDir, ".cwtch")
|
||||
os.Mkdir(cwtchDir, 0700)
|
||||
os.RemoveAll(path.Join(cwtchDir, "testing"))
|
||||
os.Mkdir(path.Join(cwtchDir, "testing"), 0700)
|
||||
|
||||
fmt.Println("Creating Alice...")
|
||||
app.CreatePeer("alice", "asdfasdf")
|
||||
|
||||
fmt.Println("Creating Bob...")
|
||||
app.CreatePeer("bob", "asdfasdf")
|
||||
|
||||
alice := utils.WaitGetPeer(app, "alice")
|
||||
bob := utils.WaitGetPeer(app, "bob")
|
||||
|
||||
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer})
|
||||
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer, event.ManifestReceived})
|
||||
|
||||
queueOracle := event.NewQueue()
|
||||
app.GetEventBus(bob.GetOnion()).Subscribe(event.FileDownloaded, queueOracle)
|
||||
|
||||
app.LaunchPeers()
|
||||
|
||||
waitTime := time.Duration(30) * time.Second
|
||||
t.Logf("** Waiting for Alice, Bob to connect with onion network... (%v)\n", waitTime)
|
||||
time.Sleep(waitTime)
|
||||
|
||||
bob.AddContact("alice?", alice.GetOnion(), model.AuthApproved)
|
||||
alice.PeerWithOnion(bob.GetOnion())
|
||||
|
||||
fmt.Println("Waiting for alice and Bob to peer...")
|
||||
waitForPeerPeerConnection(t, alice, bob)
|
||||
|
||||
fmt.Println("Alice and Bob are Connected!!")
|
||||
|
||||
filesharingFunctionality, _ := filesharing.FunctionalityGate(map[string]bool{"filesharing": true})
|
||||
|
||||
err = filesharingFunctionality.ShareFile("cwtch.png", alice, bob.GetOnion())
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Error!")
|
||||
}
|
||||
|
||||
time.Sleep(time.Second * 10)
|
||||
|
||||
for _, message := range bob.GetContact(alice.GetOnion()).Timeline.GetMessages() {
|
||||
|
||||
var messageWrapper model.MessageWrapper
|
||||
json.Unmarshal([]byte(message.Message), &messageWrapper)
|
||||
|
||||
if messageWrapper.Overlay == model.OverlayFileSharing {
|
||||
var fileMessageOverlay filesharing.OverlayMessage
|
||||
err := json.Unmarshal([]byte(messageWrapper.Data), &fileMessageOverlay)
|
||||
|
||||
if err == nil {
|
||||
filesharingFunctionality.DownloadFile(bob, alice.GetOnion(), "cwtch.out.png", fmt.Sprintf("%x.%x", fileMessageOverlay.Hash, fileMessageOverlay.Nonce))
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Printf("Found message from Alice: %v", message.Message)
|
||||
}
|
||||
|
||||
// Wait for the file downloaded event
|
||||
ev := queueOracle.Next()
|
||||
if ev.EventType != event.FileDownloaded {
|
||||
t.Fatalf("Expected file download event")
|
||||
}
|
||||
|
||||
manifest, err := files.CreateManifest("cwtch.out.png")
|
||||
if hex.EncodeToString(manifest.RootHash) != "8f0ed73bbb30db45b6a740b1251cae02945f48e4f991464d5f3607685c45dcd136a325dab2e5f6429ce2b715e602b20b5b16bf7438fb6235fefe912adcedb5fd" {
|
||||
t.Fatalf("file hash does not match expected %x: ", manifest.RootHash)
|
||||
}
|
||||
|
||||
app.Shutdown()
|
||||
acn.Close()
|
||||
|
||||
}
|
|
@ -9,6 +9,7 @@ go test -race ${1} -coverprofile=storage.v0.cover.out -v ./storage/v0
|
|||
go test -race ${1} -coverprofile=storage.v1.cover.out -v ./storage/v1
|
||||
go test -race ${1} -coverprofile=storage.cover.out -v ./storage
|
||||
go test -race ${1} -coverprofile=peer.connections.cover.out -v ./protocol/connections
|
||||
go test -race ${1} -coverprofile=peer.filesharing.cover.out -v ./protocol/files
|
||||
go test -race ${1} -coverprofile=peer.cover.out -v ./peer
|
||||
echo "mode: set" > coverage.out && cat *.cover.out | grep -v mode: | sort -r | \
|
||||
awk '{if($1 != last) {print $0;last=$1}}' >> coverage.out
|
||||
|
|
shouldnt FunctionalityGate stuff be defined a level up in /functionality/ ? not in functionality/fileshareing? so it could be reused in the future.
Also we're moving functionality gate up into cwtch then? makes sense. double cehcking, with the assumption of porting libcwtch-go to this then?
This is because of golangs (very bad) naming convention. The true name for this struct reads
filesharing.FunctionalityGate
but if you try to call it FilesharingFunctionalityGate go quality complains because it "stutters".aaaaah i see. This seems like an interface we could be implementing defined in functionality/? i know it's just convention now but something to codify it to clear it up and make further use quicker easier more obvious for anyone else stepping into the code? or maybe over kill?