diff --git a/event/common.go b/event/common.go index f5a7332..e2deea6 100644 --- a/event/common.go +++ b/event/common.go @@ -248,6 +248,14 @@ const ( // For situations where we want to update $Identity -> $RemotePeer/$GroupID's total message count to be $Data MessageCounterResync = Type("MessageCounterResync") + + // File Handling Events + ShareManifest = Type("ShareManifest") + ManifestSizeReceived = Type("ManifestSizeReceived") + ManifestReceived = Type("ManifestReceived") + ManifestSaved = Type("ManifestSaved") + FileDownloadProgressUpdate = Type("FileDownloadProgressUpdate") + FileDownloaded = Type("FileDownloaded") ) // Field defines common event attributes @@ -312,6 +320,11 @@ const ( Imported = Field("Imported") Source = Field("Source") + + FileKey = Field("FileKey") + FileSizeInChunks = Field("FileSizeInChunks") + ManifestSize = Field("ManifestSize") + SerializedManifest = Field("SerializedManifest") ) // Defining Common errors @@ -328,11 +341,15 @@ const ( // Defining Protocol Contexts const ( - ContextAck = "im.cwtch.acknowledgement" - ContextInvite = "im.cwtch.invite" - ContextRaw = "im.cwtch.raw" - ContextGetVal = "im.cwtch.getVal" - ContextRetVal = "im.cwtch.retVal" + ContextAck = "im.cwtch.acknowledgement" + ContextInvite = "im.cwtch.invite" + ContextRaw = "im.cwtch.raw" + ContextGetVal = "im.cwtch.getVal" + ContextRetVal = "im.cwtch.retVal" + ContextRequestManifest = "im.cwtch.file.request.manifest" + ContextSendManifest = "im.cwtch.file.send.manifest" + ContextRequestFile = "im.cwtch.file.request.chunk" + ContextSendFile = "im.cwtch.file.send.chunk" ) // Define Default Attribute Keys diff --git a/functionality/filesharing/filesharing_functionality.go b/functionality/filesharing/filesharing_functionality.go new file mode 100644 index 0000000..fe5c3cf --- /dev/null +++ b/functionality/filesharing/filesharing_functionality.go @@ -0,0 +1,87 @@ +package filesharing + +import ( + "crypto/rand" + "cwtch.im/cwtch/model" + "cwtch.im/cwtch/model/attr" + "cwtch.im/cwtch/peer" + "cwtch.im/cwtch/protocol/files" + "encoding/json" + "errors" + "fmt" + "git.openprivacy.ca/openprivacy/log" + "io" + "math" + "path" + "strconv" +) + +// Functionality groups some common UI triggered functions for contacts... +type Functionality struct { +} + +// FunctionalityGate returns contact.Functionality always +func FunctionalityGate(experimentMap map[string]bool) (*Functionality, error) { + if experimentMap["filesharing"] == true { + return new(Functionality), nil + } + return nil, errors.New("filesharing is not enabled") +} + +// OverlayMessage presents the canonical format of the File Sharing functionality Overlay Message +// This is the format that the UI will parse to display the message +type OverlayMessage struct { + Name string `json:"f"` + Hash []byte `json:"h"` + Nonce []byte `json:"n"` + Size uint64 `json:"s"` +} + +// DownloadFile given a profile, a conversation handle and a file sharing key, start off a download process +// to downloadFilePath +func (f *Functionality) DownloadFile(profile peer.CwtchPeer, handle string, downloadFilePath string, key string) { + profile.SetAttribute(attr.GetLocalScope(key), downloadFilePath) + profile.SendGetValToPeer(handle, attr.PublicScope, fmt.Sprintf("%s.manifest.size", key)) +} + +// ShareFile given a profile and a conversation handle, sets up a file sharing process to share the file +// at filepath +func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer, handle string) error { + manifest, err := files.CreateManifest(filepath) + if err != nil { + return err + } + + var nonce [24]byte + if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil { + log.Errorf("Cannot read from random: %v\n", err) + return err + } + + message := OverlayMessage{ + Name: path.Base(manifest.FileName), + Hash: manifest.RootHash, + Nonce: nonce[:], + Size: manifest.FileSizeInBytes, + } + + data, _ := json.Marshal(message) + + wrapper := model.MessageWrapper{ + Overlay: model.OverlayFileSharing, + Data: string(data), + } + + wrapperJSON, _ := json.Marshal(wrapper) + key := fmt.Sprintf("%x.%x", manifest.RootHash, nonce) + serializedManifest, _ := json.Marshal(manifest) + profile.ShareFile(key, string(serializedManifest)) + + // Store the size of the manifest (in chunks) as part of the public scope so contacts who we share the file with + // can fetch the manifest as if it were a file. + profile.SetAttribute(attr.GetPublicScope(fmt.Sprintf("%s.manifest.size", key)), strconv.Itoa(int(math.Ceil(float64(len(serializedManifest))/float64(files.DefaultChunkSize))))) + + profile.SendMessageToPeer(handle, string(wrapperJSON)) + + return nil +} diff --git a/model/overlay.go b/model/overlay.go new file mode 100644 index 0000000..4149798 --- /dev/null +++ b/model/overlay.go @@ -0,0 +1,19 @@ +package model + +// MessageWrapper is the canonical Cwtch overlay wrapper +type MessageWrapper struct { + Overlay int `json:"o"` + Data string `json:"d"` +} + +// OverlayChat is the canonical identifier for chat overlays +const OverlayChat = 1 + +// OverlayInviteContact is the canonical identifier for the contact invite overlay +const OverlayInviteContact = 100 + +// OverlayInviteGroup is the canonical identifier for the group invite overlay +const OverlayInviteGroup = 101 + +// OverlayFileSharing is the canonical identifier for the file sharing overlay +const OverlayFileSharing = 200 diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index a4867e4..7dd385f 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -5,10 +5,12 @@ import ( "cwtch.im/cwtch/model" "cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/protocol/connections" + "cwtch.im/cwtch/protocol/files" "encoding/base32" "encoding/base64" "encoding/json" "errors" + "fmt" "git.openprivacy.ca/openprivacy/log" "strconv" "strings" @@ -21,7 +23,8 @@ const lastKnownSignature = "LastKnowSignature" var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true, event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeer: true, event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToPeerError: true, event.SendMessageToGroupError: true, - event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true} + event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true, + event.ManifestSizeReceived: true, event.ManifestReceived: true} // DefaultEventsToHandle specifies which events will be subscribed to // when a peer has its Init() function called @@ -189,6 +192,8 @@ type CwtchPeer interface { SendMessages ModifyMessages SendMessagesToGroup + + ShareFile(fileKey string, serializedManifest string) } // NewCwtchPeer creates and returns a new cwtchPeer with the given name. @@ -702,6 +707,10 @@ func (cp *cwtchPeer) StoreMessage(onion string, messageTxt string, sent time.Tim cp.mutex.Unlock() } +func (cp *cwtchPeer) ShareFile(fileKey string, serializedManifest string) { + cp.eventBus.Publish(event.NewEvent(event.ShareManifest, map[event.Field]string{event.FileKey: fileKey, event.SerializedManifest: serializedManifest})) +} + // eventHandler process events from other subsystems func (cp *cwtchPeer) eventHandler() { for { @@ -795,6 +804,33 @@ func (cp *cwtchPeer) eventHandler() { /***** Non default but requestable handlable events *****/ + case event.ManifestReceived: + log.Debugf("Manifest Received Event!: %v", ev) + handle := ev.Data[event.Handle] + fileKey := ev.Data[event.FileKey] + serializedManifest := ev.Data[event.SerializedManifest] + + downloadFilePath, exists := cp.GetAttribute(attr.GetLocalScope(fileKey)) + if exists { + log.Debugf("downloading file to %v", downloadFilePath) + var manifest files.Manifest + err := json.Unmarshal([]byte(serializedManifest), &manifest) + if err == nil { + manifest.FileName = downloadFilePath + log.Debugf("saving manifest") + err = manifest.Save(fmt.Sprintf("%v.manifest", downloadFilePath)) + if err != nil { + log.Errorf("could not save manifest...") + } else { + cp.eventBus.Publish(event.NewEvent(event.ManifestSaved, map[event.Field]string{event.FileKey: fileKey, event.Handle: handle, event.SerializedManifest: string(manifest.Serialize())})) + } + } else { + log.Errorf("error saving manifest: %v", err) + } + } else { + log.Errorf("no download path found for manifest: %v", fileKey) + } + case event.NewRetValMessageFromPeer: onion := ev.Data[event.RemotePeer] scope := ev.Data[event.Scope] @@ -804,7 +840,12 @@ func (cp *cwtchPeer) eventHandler() { log.Debugf("NewRetValMessageFromPeer %v %v%v %v %v\n", onion, scope, path, exists, val) if exists { if scope == attr.PublicScope { - cp.SetContactAttribute(onion, attr.GetPeerScope(path), val) + if strings.HasSuffix(path, ".manifest.size") { + fileKey := strings.Replace(path, ".manifest.size", "", 1) + cp.eventBus.Publish(event.NewEvent(event.ManifestSizeReceived, map[event.Field]string{event.FileKey: fileKey, event.ManifestSize: val, event.Handle: onion})) + } else { + cp.SetContactAttribute(onion, attr.GetPeerScope(path), val) + } } } case event.PeerStateChange: diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 9d26f79..bfde30a 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -3,7 +3,9 @@ package connections import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" + "cwtch.im/cwtch/protocol/files" "cwtch.im/cwtch/protocol/groups" + model3 "cwtch.im/cwtch/protocol/model" "encoding/base64" "encoding/json" "errors" @@ -46,6 +48,9 @@ type engine struct { // Required for listen(), inaccessible from identity privateKey ed25519.PrivateKey + // file sharing subsystem is responsible for maintaining active shares and downloads + filesharingSubSystem files.FileSharingSubSystem + shuttingDown bool } @@ -92,6 +97,11 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK engine.eventManager.Subscribe(event.BlockUnknownPeers, engine.queue) engine.eventManager.Subscribe(event.AllowUnknownPeers, engine.queue) + // File Handling + engine.eventManager.Subscribe(event.ShareManifest, engine.queue) + engine.eventManager.Subscribe(event.ManifestSizeReceived, engine.queue) + engine.eventManager.Subscribe(event.ManifestSaved, engine.queue) + for peer, authorization := range peerAuthorizations { engine.authorizations.Store(peer, authorization) } @@ -181,6 +191,20 @@ func (e *engine) eventHandler() { e.blockUnknownContacts = true case event.ProtocolEngineStartListen: go e.listenFn() + case event.ShareManifest: + e.filesharingSubSystem.ShareFile(ev.Data[event.FileKey], ev.Data[event.SerializedManifest]) + case event.ManifestSizeReceived: + handle := ev.Data[event.Handle] + key := ev.Data[event.FileKey] + size, _ := strconv.Atoi(ev.Data[event.ManifestSize]) + go e.sendPeerMessage(handle, e.filesharingSubSystem.FetchManifest(key, uint64(size))) + case event.ManifestSaved: + handle := ev.Data[event.Handle] + key := ev.Data[event.FileKey] + serializedManifest := ev.Data[event.SerializedManifest] + for _, message := range e.filesharingSubSystem.CompileChunkRequests(key, serializedManifest) { + go e.sendPeerMessage(handle, message) + } default: return } @@ -410,7 +434,7 @@ func (e *engine) sendMessageToPeer(eventID string, onion string, context string, if err == nil { peerApp, ok := (conn.App()).(*PeerApp) if ok { - peerApp.SendMessage(PeerMessage{eventID, context, message}) + peerApp.SendMessage(model3.PeerMessage{ID: eventID, Context: context, Data: message}) return nil } return errors.New("failed type assertion conn.App != PeerApp") @@ -419,7 +443,7 @@ func (e *engine) sendMessageToPeer(eventID string, onion string, context string, } func (e *engine) sendGetValToPeer(eventID, onion, scope, path string) error { - log.Debugf("sendGetValMessage to peer %v %v%v\n", onion, scope, path) + log.Debugf("sendGetValMessage to peer %v %v.%v\n", onion, scope, path) getVal := peerGetVal{Scope: scope, Path: path} message, err := json.Marshal(getVal) if err != nil { @@ -499,6 +523,28 @@ func (e *engine) handlePeerMessage(hostname string, eventID string, context stri ev.EventID = eventID e.eventManager.Publish(ev) } + } else if context == event.ContextRequestManifest { + for _, message := range e.filesharingSubSystem.RequestManifestParts(eventID) { + e.sendPeerMessage(hostname, message) + } + } else if context == event.ContextSendManifest { + if fileKey, manifest := e.filesharingSubSystem.ReceiveManifestPart(eventID, message); len(manifest) != 0 { + // We have a valid manifest + e.eventManager.Publish(event.NewEvent(event.ManifestReceived, map[event.Field]string{event.Handle: hostname, event.FileKey: fileKey, event.SerializedManifest: manifest})) + } + } else if context == event.ContextRequestFile { + for _, message := range e.filesharingSubSystem.ProcessChunkRequest(eventID, message) { + e.sendPeerMessage(hostname, message) + } + } else if context == event.ContextSendFile { + fileKey, downloaded, progress, totalChunks, _ := e.filesharingSubSystem.ProcessChunk(eventID, message) + if len(fileKey) != 0 { + if downloaded { + log.Debugf("file verified and downloaded!") + e.eventManager.Publish(event.NewEvent(event.FileDownloaded, map[event.Field]string{event.FileKey: fileKey})) + } + e.eventManager.Publish(event.NewEvent(event.FileDownloadProgressUpdate, map[event.Field]string{event.FileKey: fileKey, event.Progress: strconv.Itoa(int(progress)), event.FileSizeInChunks: strconv.Itoa(int(totalChunks))})) + } } else { e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)})) } @@ -530,3 +576,13 @@ func (e *engine) leaveServer(server string) { e.ephemeralServices.Delete(server) } } + +func (e *engine) sendPeerMessage(handle string, message model3.PeerMessage) { + conn, err := e.service.WaitForCapabilityOrClose(handle, cwtchCapability) + if err == nil { + peerApp, ok := (conn.App()).(*PeerApp) + if ok { + peerApp.SendMessage(message) + } + } +} diff --git a/protocol/connections/peerapp.go b/protocol/connections/peerapp.go index a93179c..d97a7bf 100644 --- a/protocol/connections/peerapp.go +++ b/protocol/connections/peerapp.go @@ -2,6 +2,7 @@ package connections import ( "cwtch.im/cwtch/event" + model2 "cwtch.im/cwtch/protocol/model" "encoding/json" "git.openprivacy.ca/cwtch.im/tapir" "git.openprivacy.ca/cwtch.im/tapir/applications" @@ -27,13 +28,6 @@ type PeerApp struct { getValRequests sync.Map // [string]string eventID:Data } -// PeerMessage is an encapsulation that can be used by higher level applications -type PeerMessage struct { - ID string // A unique Message ID (primarily used for acknowledgments) - Context string // A unique context identifier i.e. im.cwtch.chat - Data []byte // The serialized data packet. -} - type peerGetVal struct { Scope, Path string } @@ -88,7 +82,7 @@ func (pa *PeerApp) listen() { pa.OnClose(pa.connection.Hostname()) return } - var peerMessage PeerMessage + var peerMessage model2.PeerMessage err := json.Unmarshal(message, &peerMessage) if err == nil { switch peerMessage.Context { @@ -106,7 +100,7 @@ func (pa *PeerApp) listen() { pa.MessageHandler(pa.connection.Hostname(), peerMessage.ID, peerMessage.Context, peerMessage.Data) // Acknowledge the message - pa.SendMessage(PeerMessage{peerMessage.ID, event.ContextAck, []byte{}}) + pa.SendMessage(model2.PeerMessage{ID: peerMessage.ID, Context: event.ContextAck, Data: []byte{}}) } } } else { @@ -117,7 +111,7 @@ func (pa *PeerApp) listen() { // SendMessage sends the peer a preformatted message // NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol -func (pa *PeerApp) SendMessage(message PeerMessage) { +func (pa *PeerApp) SendMessage(message model2.PeerMessage) { if message.Context == event.ContextGetVal { pa.getValRequests.Store(message.ID, string(message.Data)) } diff --git a/protocol/files/chunkspec.go b/protocol/files/chunkspec.go new file mode 100644 index 0000000..49725b3 --- /dev/null +++ b/protocol/files/chunkspec.go @@ -0,0 +1,87 @@ +package files + +import ( + "errors" + "fmt" + "strconv" + "strings" +) + +// ChunkSpec is a wrapper around an uncompressed array of chunk identifiers +type ChunkSpec []uint64 + +// CreateChunkSpec given a full list of chunks with their downloaded status (true for downloaded, false otherwise) +// derives a list of identifiers of chunks that have not been downloaded yet +func CreateChunkSpec(progress []bool) ChunkSpec { + var chunks ChunkSpec + for i, p := range progress { + if !p { + chunks = append(chunks, uint64(i)) + } + } + return chunks +} + +// Deserialize takes in a compressed chunk spec and returns an uncompressed ChunkSpec or an error +// if the serialized chunk spec has format errors +func Deserialize(serialized string) (*ChunkSpec, error) { + + var chunkSpec ChunkSpec + + if len(serialized) == 0 { + return &chunkSpec, nil + } + + ranges := strings.Split(serialized, ",") + for _, r := range ranges { + parts := strings.Split(r, ":") + if len(parts) == 1 { + single, err := strconv.Atoi(r) + if err != nil { + return nil, errors.New("invalid chunk spec") + } + chunkSpec = append(chunkSpec, uint64(single)) + } else if len(parts) == 2 { + start, err1 := strconv.Atoi(parts[0]) + end, err2 := strconv.Atoi(parts[1]) + if err1 != nil || err2 != nil { + return nil, errors.New("invalid chunk spec") + } + for i := start; i <= end; i++ { + chunkSpec = append(chunkSpec, uint64(i)) + } + } else { + return nil, errors.New("invalid chunk spec") + } + } + return &chunkSpec, nil +} + +// Serialize compresses the ChunkSpec into a list of inclusive ranges e.g. 1,2,3,5,6,7 becomes "1:3,5:7" +func (cs ChunkSpec) Serialize() string { + result := "" + + i := 0 + + for { + if i >= len(cs) { + break + } + j := i + 1 + for ; j < len(cs) && cs[j] == cs[j-1]+1; j++ { + } + + if result != "" { + result += "," + } + + if j == i+1 { + result += fmt.Sprintf("%d", cs[i]) + } else { + result += fmt.Sprintf("%d:%d", cs[i], cs[j-1]) + } + i = j + } + + return result +} diff --git a/protocol/files/chunkspec_test.go b/protocol/files/chunkspec_test.go new file mode 100644 index 0000000..990e78f --- /dev/null +++ b/protocol/files/chunkspec_test.go @@ -0,0 +1,37 @@ +package files + +import "testing" + +func TestChunkSpec(t *testing.T) { + + var testCases = map[string]ChunkSpec{ + "0": CreateChunkSpec([]bool{false}), + "0:10": CreateChunkSpec([]bool{false, false, false, false, false, false, false, false, false, false, false}), + "0:1,3:5,7:9": CreateChunkSpec([]bool{false, false, true, false, false, false, true, false, false, false, true}), + "": CreateChunkSpec([]bool{true, true, true, true, true, true, true, true, true, true, true}), + "2,5,8,10": CreateChunkSpec([]bool{true, true, false, true, true, false, true, true, false, true, false}), + // + "0,2:10": CreateChunkSpec([]bool{false, true, false, false, false, false, false, false, false, false, false}), + "0:8,10": CreateChunkSpec([]bool{false, false, false, false, false, false, false, false, false, true, false}), + "1:9": CreateChunkSpec([]bool{true, false, false, false, false, false, false, false, false, false, true}), + } + + for k, v := range testCases { + if k != v.Serialize() { + t.Fatalf("got %v but expected %v", v.Serialize(), k) + } + t.Logf("%v == %v", k, v.Serialize()) + } + + for k, v := range testCases { + if cs, err := Deserialize(k); err != nil { + t.Fatalf("error deserialized key: %v %v", k, err) + } else { + if v.Serialize() != cs.Serialize() { + t.Fatalf("got %v but expected %v", v.Serialize(), cs.Serialize()) + } + t.Logf("%v == %v", cs.Serialize(), v.Serialize()) + } + } + +} diff --git a/protocol/files/filesharing_subsystem.go b/protocol/files/filesharing_subsystem.go new file mode 100644 index 0000000..e900fdd --- /dev/null +++ b/protocol/files/filesharing_subsystem.go @@ -0,0 +1,201 @@ +package files + +import ( + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/protocol/model" + "encoding/hex" + "encoding/json" + "fmt" + "git.openprivacy.ca/openprivacy/log" + "strconv" + "strings" + "sync" +) + +// FileSharingSubSystem encapsulates the functionality necessary to share and download files via Cwtch +// +type FileSharingSubSystem struct { + + // for sharing files + activeShares sync.Map // file key to manifest + + // for downloading files + prospectiveManifests sync.Map // file key to serialized manifests + activeDownloads sync.Map // file key to manifests +} + + +// ShareFile given a file key and a serialized manifest, allow the serialized manifest to be downloaded +// by Cwtch profiles in possession of the fileKey +func (fsss *FileSharingSubSystem) ShareFile(fileKey string, serializedManifest string) { + var manifest Manifest + err := json.Unmarshal([]byte(serializedManifest), &manifest) + if err != nil { + log.Errorf("could not share file %v", err) + return + } + fsss.activeShares.Store(fileKey, &manifest) +} + +// FetchManifest given a file key and knowledge of the manifest size in chunks (obtained via an attribute lookup) +// construct a request to download the manifest. +func (fsss *FileSharingSubSystem) FetchManifest(fileKey string, manifestSize uint64) model.PeerMessage { + fsss.prospectiveManifests.Store(fileKey, strings.Repeat("\"", int(manifestSize*DefaultChunkSize))) + return model.PeerMessage{ + Context: event.ContextRequestManifest, + ID: fileKey, + Data: []byte{}, + } +} + +// CompileChunkRequests takes in a complete serializedManifest and returns a set of chunk request messages +// TODO in the future we will want this to return the handles of contacts to request chunks from +func (fsss *FileSharingSubSystem) CompileChunkRequests(fileKey string, serializedManifest string) []model.PeerMessage { + var manifest Manifest + err := json.Unmarshal([]byte(serializedManifest), &manifest) + var messages []model.PeerMessage + if err == nil { + err := manifest.PrepareDownload() + if err == nil { + fsss.activeDownloads.Store(fileKey, &manifest) + log.Debugf("downloading file chunks: %v", manifest.GetChunkRequest().Serialize()) + messages = append(messages, model.PeerMessage{ + ID: fileKey, + Context: event.ContextRequestFile, + Data: []byte(manifest.GetChunkRequest().Serialize()), + }) + } + } + return messages +} + +// RequestManifestParts given a fileKey construct a set of messages representing requests to download various +// parts of the Manifest +func (fsss *FileSharingSubSystem) RequestManifestParts(fileKey string) []model.PeerMessage { + manifestI, exists := fsss.activeShares.Load(fileKey) + var messages []model.PeerMessage + if exists { + manifest := manifestI.(*Manifest) + serializedManifest := manifest.Serialize() + log.Debugf("found serialized manifest: %s", serializedManifest) + for i := 0; i < len(serializedManifest); i += DefaultChunkSize { + offset := i * DefaultChunkSize + end := (i + 1) * DefaultChunkSize + if end > len(serializedManifest) { + end = len(serializedManifest) + } + chunk := serializedManifest[offset:end] + messages = append(messages, model.PeerMessage{ + Context: event.ContextSendManifest, + ID: fmt.Sprintf("%s.%d", fileKey, uint64(i)), + Data: chunk, + }) + } + } + return messages +} + +// ReceiveManifestPart given a manifestKey reconstruct part the manifest from the provided part +func (fsss *FileSharingSubSystem) ReceiveManifestPart(manifestKey string, part []byte) (fileKey string, serializedManifest string) { + fileKeyParts := strings.Split(manifestKey, ".") + if len(fileKeyParts) == 3 { + fileKey = fmt.Sprintf("%s.%s", fileKeyParts[0], fileKeyParts[1]) + log.Debugf("manifest filekey: %s", fileKey) + manifestPart, err := strconv.Atoi(fileKeyParts[2]) + if err == nil { + serializedManifest, exists := fsss.prospectiveManifests.Load(fileKey) + if exists { + serializedManifest := serializedManifest.(string) + log.Debugf("loaded manifest") + offset := manifestPart * DefaultChunkSize + end := (manifestPart + 1) * DefaultChunkSize + + log.Debugf("storing manifest part %v %v", offset, end) + serializedManifestBytes := []byte(serializedManifest) + copy(serializedManifestBytes[offset:end], part[:]) + + if len(part) < DefaultChunkSize { + serializedManifestBytes = serializedManifestBytes[0 : len(serializedManifestBytes)-(DefaultChunkSize-len(part))] + } + + serializedManifest = string(serializedManifestBytes) + fsss.prospectiveManifests.Store(fileKey, serializedManifest) + log.Debugf("current manifest: [%s]", serializedManifest) + var manifest Manifest + err := json.Unmarshal([]byte(serializedManifest), &manifest) + if err == nil && hex.EncodeToString(manifest.RootHash) == fileKeyParts[0] { + log.Debugf("valid manifest received! %x", manifest.RootHash) + return fileKey, serializedManifest + } + } + } + } + return "", "" +} + +// ProcessChunkRequest given a fileKey, and a chunk request, compile a set of responses for each requested Chunk +func (fsss *FileSharingSubSystem) ProcessChunkRequest(fileKey string, serializedChunkRequest []byte) []model.PeerMessage { + log.Debugf("chunk request: %v", fileKey) + manifestI, exists := fsss.activeShares.Load(fileKey) + var messages []model.PeerMessage + if exists { + manifest := manifestI.(*Manifest) + log.Debugf("manifest found: %x", manifest.RootHash) + chunkSpec, err := Deserialize(string(serializedChunkRequest)) + log.Debugf("deserialized chunk spec found: %v [%s]", chunkSpec, serializedChunkRequest) + if err == nil { + for _, chunk := range *chunkSpec { + contents, err := manifest.GetChunkBytes(chunk) + if err == nil { + log.Debugf("sending chunk: %v %x", chunk, contents) + messages = append(messages, model.PeerMessage{ + ID: fmt.Sprintf("%v.%d", fileKey, chunk), + Context: event.ContextSendFile, + Data: contents, + }) + } + } + } + } + return messages +} + +// ProcessChunk given a chunk key and a chunk attempt to store and verify the chunk as part of an active download +// If this results in the file download being completed return downloaded = true +// Always return the progress of a matched download if it exists along with the total number of chunks and the +// given chunk ID +// If not such active download exists then return an empty file key and ignore all further processing. +func (fsss *FileSharingSubSystem) ProcessChunk(chunkKey string, chunk []byte) (fileKey string, downloaded bool, progress uint64, totalChunks uint64, chunkID uint64) { + fileKeyParts := strings.Split(chunkKey, ".") + downloaded = false + log.Debugf("got chunk for %s", fileKeyParts) + if len(fileKeyParts) == 3 { + fileKey = fmt.Sprintf("%s.%s", fileKeyParts[0], fileKeyParts[1]) + derivedChunkID, err := strconv.Atoi(fileKeyParts[2]) + if err == nil { + log.Debugf("got chunk id %d", chunkID) + chunkID = uint64(derivedChunkID) + manifestI, exists := fsss.activeDownloads.Load(fileKey) + if exists { + manifest := manifestI.(*Manifest) + log.Debugf("found active manifest %v", manifest) + progress, err = manifest.StoreChunk(uint64(chunkID), chunk) + totalChunks = uint64(len(manifest.Chunks)) + log.Debugf("attempts to store chunk %v %v", progress, err) + if err == nil { + if progress == totalChunks { + if manifest.VerifyFile() == nil { + manifest.Close() + fsss.activeDownloads.Delete(fileKey) + log.Debugf("file verified and downloaded!") + downloaded = true + } + } + } else { + // TODO if a chunk fails to save (possibly because its data hash didn't match the manifest), re-request it + } + } + } + } + return +} diff --git a/protocol/files/manifest.go b/protocol/files/manifest.go new file mode 100644 index 0000000..67d5fc6 --- /dev/null +++ b/protocol/files/manifest.go @@ -0,0 +1,312 @@ +package files + +import ( + "bufio" + "crypto/sha512" + "crypto/subtle" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "os" + "sync" +) + +// Chunk is a wrapper around a hash +type Chunk []byte + +// DefaultChunkSize is the default value of a manifest chunk +const DefaultChunkSize = 4096 + +// Manifest is a collection of hashes and other metadata needed to reconstruct a file and verify contents given a root hash +type Manifest struct { + Chunks []Chunk + FileName string + RootHash []byte + FileSizeInBytes uint64 + ChunkSizeInBytes uint64 + + chunkComplete []bool + openFd *os.File + progress uint64 + lock sync.Mutex +} + +// CreateManifest takes in a file path and constructs a file sharing manifest of hashes along with +// other information necessary to download, reconstruct and verify the file. +func CreateManifest(path string) (*Manifest, error) { + + // Process file into Chunks + f, err := os.Open(path) + + if err != nil { + return nil, err + } + + defer f.Close() + + reader := bufio.NewReader(f) + buf := make([]byte, DefaultChunkSize) + + var chunks []Chunk + fileSizeInBytes := uint64(0) + + rootHash := sha512.New() + + for { + n, err := reader.Read(buf) + if err != nil { + if err != io.EOF { + return nil, err + } + break + } + hash := sha512.New() + hash.Write(buf[0:n]) + rootHash.Write(buf[0:n]) + chunkHash := hash.Sum(nil) + chunks = append(chunks, chunkHash) + fileSizeInBytes += uint64(n) + } + + return &Manifest{ + Chunks: chunks, + FileName: path, + RootHash: rootHash.Sum(nil), + ChunkSizeInBytes: DefaultChunkSize, + FileSizeInBytes: fileSizeInBytes, + chunkComplete: make([]bool, len(chunks)), + }, nil +} + +// GetChunkBytes takes in a chunk identifier and returns the bytes associated with that chunk +// it does not attempt to validate the chunk Hash. +func (m *Manifest) GetChunkBytes(id uint64) ([]byte, error) { + m.lock.Lock() + defer m.lock.Unlock() + if id >= uint64(len(m.Chunks)) { + return nil, errors.New("chunk not found") + } + + if err := m.getFileHandle(); err != nil { + return nil, err + } + + // Seek to Chunk + offset, err := m.openFd.Seek(int64(id*m.ChunkSizeInBytes), 0) + if (uint64(offset) != id*m.ChunkSizeInBytes) || err != nil { + return nil, errors.New("chunk not found") + } + + // Read chunk into memory and return... + reader := bufio.NewReader(m.openFd) + buf := make([]byte, m.ChunkSizeInBytes) + n, err := reader.Read(buf) + if err != nil { + if err != io.EOF { + return nil, err + } + } + return buf[0:n], nil +} + +// LoadManifest reads in a json serialized Manifest from a file +func LoadManifest(filename string) (*Manifest, error) { + bytes, err := ioutil.ReadFile(filename) + if err != nil { + return nil, err + } + manifest := new(Manifest) + err = json.Unmarshal(bytes, manifest) + if err != nil { + return nil, err + } + manifest.chunkComplete = make([]bool, len(manifest.Chunks)) + return manifest, nil +} + +// VerifyFile attempts to calculate the rootHash of a file and compare it to the expected rootHash stored in the +// manifest +func (m *Manifest) VerifyFile() error { + m.lock.Lock() + defer m.lock.Unlock() + if err := m.getFileHandle(); err != nil { + return err + } + + offset, err := m.openFd.Seek(0, 0) + if offset != 0 || err != nil { + return errors.New("chunk not found") + } + + rootHash := sha512.New() + reader := bufio.NewReader(m.openFd) + buf := make([]byte, m.ChunkSizeInBytes) + for { + n, err := reader.Read(buf) + rootHash.Write(buf[0:n]) + if err != nil { + if err != io.EOF { + return err + } + break + } + } + + calculatedRootHash := rootHash.Sum(nil) + if subtle.ConstantTimeCompare(m.RootHash, calculatedRootHash) != 1 { + return fmt.Errorf("hashes do not match %x %x", m.RootHash, calculatedRootHash) + } + return nil +} + +// StoreChunk takes in a chunk id and contents, verifies the chunk has the expected hash and if so store the contents +// in the file. +func (m *Manifest) StoreChunk(id uint64, contents []byte) (uint64, error) { + m.lock.Lock() + defer m.lock.Unlock() + // Check the chunk id + if id >= uint64(len(m.Chunks)) { + return 0, errors.New("invalid chunk id") + } + + // Validate the chunk hash + hash := sha512.New() + hash.Write(contents) + chunkHash := hash.Sum(nil) + + if subtle.ConstantTimeCompare(chunkHash, m.Chunks[id]) != 1 { + return 0, fmt.Errorf("invalid chunk hash %x %x", chunkHash, m.Chunks[id]) + } + + if err := m.getFileHandle(); err != nil { + return 0, err + } + + offset, err := m.openFd.Seek(int64(id*m.ChunkSizeInBytes), 0) + if (uint64(offset) != id*m.ChunkSizeInBytes) || err != nil { + return 0, errors.New("chunk not found") + } + + // Write the contents of the chunk to the file + _, err = m.openFd.Write(contents) + + if err == nil && m.chunkComplete[id] == false { + m.chunkComplete[id] = true + m.progress++ + } + + return m.progress, err +} + +// private function to set the internal file handle +func (m *Manifest) getFileHandle() error { + // Seek to the chunk in the file + if m.openFd == nil { + fd, err := os.OpenFile(m.FileName, os.O_RDWR, 0600) + if err != nil { + return err + } + m.openFd = fd + } + return nil +} + +// GetChunkRequest returns an uncompressed list of Chunks needed to complete the file described in the manifest +func (m *Manifest) GetChunkRequest() ChunkSpec { + return CreateChunkSpec(m.chunkComplete) +} + +// PrepareDownload creates an empty file of the expected size of the file described by the manifest +// If the file already exists it assume it is the correct file and that it is resuming from when it left off. +func (m *Manifest) PrepareDownload() error { + m.lock.Lock() + defer m.lock.Unlock() + + m.chunkComplete = make([]bool, len(m.Chunks)) + + if info, err := os.Stat(m.FileName); os.IsNotExist(err) { + fd, err := os.Create(m.FileName) + if err != nil { + return err + } + m.openFd = fd + + writer := bufio.NewWriter(m.openFd) + buf := make([]byte, m.ChunkSizeInBytes) + for chunk := 0; chunk < len(m.Chunks)-1; chunk++ { + _, err := writer.Write(buf) + if err != nil { + return err + } + } + + lastChunkSize := m.FileSizeInBytes % m.ChunkSizeInBytes + if lastChunkSize > 0 { + buf = make([]byte, lastChunkSize) + _, err := writer.Write(buf) + if err != nil { + return err + } + } + writer.Flush() + } else { + + if err != nil { + return err + } + + if uint64(info.Size()) != m.FileSizeInBytes { + return fmt.Errorf("file exists but is the wrong size") + } + + if err := m.getFileHandle(); err != nil { + return err + } + + // Calculate Progress + reader := bufio.NewReader(m.openFd) + buf := make([]byte, m.ChunkSizeInBytes) + chunkI := 0 + for { + n, err := reader.Read(buf) + if err != nil { + if err != io.EOF { + return err + } + break + } + hash := sha512.New() + hash.Write(buf[0:n]) + chunkHash := hash.Sum(nil) + m.progress = 0 + if subtle.ConstantTimeCompare(chunkHash, m.Chunks[chunkI]) == 1 { + m.chunkComplete[chunkI] = true + m.progress++ + } + chunkI++ + } + } + return nil +} + +// Close closes the underlying file descriptor +func (m *Manifest) Close() { + m.lock.Lock() + defer m.lock.Unlock() + if m.openFd != nil { + m.openFd.Close() + } +} + +// Save writes a JSON encoded byte array version of the manifest to path +func (m *Manifest) Save(path string) error { + return ioutil.WriteFile(path, m.Serialize(), 0600) +} + +// Serialize returns the manifest as a JSON encoded byte array +func (m *Manifest) Serialize() []byte { + data, _ := json.Marshal(m) + return data +} diff --git a/protocol/files/manifest_test.go b/protocol/files/manifest_test.go new file mode 100644 index 0000000..c23e8b6 --- /dev/null +++ b/protocol/files/manifest_test.go @@ -0,0 +1,128 @@ +package files + +import ( + "encoding/hex" + "encoding/json" + "io/ioutil" + "math" + "testing" +) + +func TestManifest(t *testing.T) { + manifest, err := CreateManifest("testdata/example.txt") + if err != nil { + t.Fatalf("manifest create error: %v", err) + } + + if len(manifest.Chunks) != 1 { + t.Fatalf("manifest had unepxected Chunks : %v", manifest.Chunks) + } + + if manifest.FileSizeInBytes != 12 { + t.Fatalf("manifest had unepxected length : %v", manifest.FileSizeInBytes) + } + + if hex.EncodeToString(manifest.RootHash) != "861844d6704e8573fec34d967e20bcfef3d424cf48be04e6dc08f2bd58c729743371015ead891cc3cf1c9d34b49264b510751b1ff9e537937bc46b5d6ff4ecc8" { + t.Fatalf("manifest had incorrect root Hash : %v", manifest.RootHash) + } + + t.Logf("%v", manifest) + + // Try to tread the chunk + contents, err := manifest.GetChunkBytes(1) + if err == nil { + t.Fatalf("chunk fetch should have thrown an error") + } + + contents, err = manifest.GetChunkBytes(0) + if err != nil { + t.Fatalf("chunk fetch error: %v", err) + } + contents, err = manifest.GetChunkBytes(0) + if err != nil { + t.Fatalf("chunk fetch error: %v", err) + } + + json, _ := json.Marshal(manifest) + t.Logf("%s", json) + + t.Logf("%s", contents) +} + +func TestManifestLarge(t *testing.T) { + manifest, err := CreateManifest("testdata/cwtch.png") + if err != nil { + t.Fatalf("manifest create error: %v", err) + } + + if len(manifest.Chunks) != int(math.Ceil(float64(51791)/DefaultChunkSize)) { + t.Fatalf("manifest had unexpected Chunks : %v", manifest.Chunks) + } + + if manifest.FileSizeInBytes != 51791 { + t.Fatalf("manifest had unepxected length : %v", manifest.FileSizeInBytes) + } + + if hex.EncodeToString(manifest.RootHash) != "8f0ed73bbb30db45b6a740b1251cae02945f48e4f991464d5f3607685c45dcd136a325dab2e5f6429ce2b715e602b20b5b16bf7438fb6235fefe912adcedb5fd" { + t.Fatalf("manifest had incorrect root Hash : %v", manifest.RootHash) + } + + t.Logf("%v", len(manifest.Chunks)) + + json, _ := json.Marshal(manifest) + t.Logf("%v %s", len(json), json) + + // Pretend we downloaded the manifest + ioutil.WriteFile("testdata/cwtch.png.manifest", json, 0600) + + // Load the manifest from a file + cwtchPngManifest, err := LoadManifest("testdata/cwtch.png.manifest") + if err != nil { + t.Fatalf("manifest create error: %v", err) + } + defer cwtchPngManifest.Close() + t.Logf("%v", cwtchPngManifest) + + // Test verifying the hash + if cwtchPngManifest.VerifyFile() != nil { + t.Fatalf("hashes do not validate error: %v", err) + } + + // Prepare Download + cwtchPngOutManifest, _ := LoadManifest("testdata/cwtch.png.manifest") + cwtchPngOutManifest.FileName = "testdata/cwtch.out.png" + + defer cwtchPngOutManifest.Close() + err = cwtchPngOutManifest.PrepareDownload() + if err != nil { + t.Fatalf("could not prepare download %v", err) + } + + for i := 0; i < len(cwtchPngManifest.Chunks); i++ { + + t.Logf("Sending Chunk %v %x from %v", i, cwtchPngManifest.Chunks[i], cwtchPngManifest.FileName) + + contents, err := cwtchPngManifest.GetChunkBytes(uint64(i)) + + if err != nil { + t.Fatalf("could not get chunk %v %v", i, err) + } + t.Logf("Progress: %v", cwtchPngOutManifest.chunkComplete) + _, err = cwtchPngOutManifest.StoreChunk(uint64(i), contents) + if err != nil { + t.Fatalf("could not store chunk %v %v", i, err) + } + + } + err = cwtchPngOutManifest.VerifyFile() + if err != nil { + t.Fatalf("could not verify file %v", err) + } + + // Test that changing the hash throws an error + cwtchPngManifest.RootHash[3] = 0xFF + if cwtchPngManifest.VerifyFile() == nil { + t.Fatalf("hashes should not validate error") + } + +} diff --git a/protocol/files/testdata/cwtch.out.png b/protocol/files/testdata/cwtch.out.png new file mode 100644 index 0000000..e50812f Binary files /dev/null and b/protocol/files/testdata/cwtch.out.png differ diff --git a/protocol/files/testdata/cwtch.png b/protocol/files/testdata/cwtch.png new file mode 100644 index 0000000..e50812f Binary files /dev/null and b/protocol/files/testdata/cwtch.png differ diff --git a/protocol/files/testdata/cwtch.png.manifest b/protocol/files/testdata/cwtch.png.manifest new file mode 100644 index 0000000..826a093 --- /dev/null +++ b/protocol/files/testdata/cwtch.png.manifest @@ -0,0 +1 @@ +{"Chunks":["2614xLRT0mPJYSBejVhB+xfFMSe5sM72qSTDBn3KqmmcK89oIwVmLr+2X5K3Wpl2f1oo3byyU86zdJp//dwTVg==","H2GcT8AZcy65Bbov34aVFGini5c4awpAT9Pcj4qamN56RkHQiLdtAtY4Lo7YlD2XQvfGF7bbfTt15EUcY8G2+Q==","YmvYk+5Ds1JB6K5YYZsNCfaMn7Qciql4iWpTxgHMz2a3uTpibGTQPW/ja2B6x7hplbhyCzkqlPS5XZAM4bVxxA==","1WrybcOYtRmU3QaGv/VEjlRSj9rNvFFrB7pw4E2NvN5rDt2zjMfHTIAT69pVHUPm7oRjYPGX16w0rg9CKv7Eow==","lz5h++lfGOodW+/xwpsfQz3HdTqTZYczGqnpkpoCH/6hfUbGRlnOfWeGpovKR9c6azzJf1ciMjHCsE3T7ZiNKA==","gLf58jjGMM/SXpj/1DDk9qae/EBHuXdZ9lPZXybu0/ZNwsPHjf7hqgcVlKQnx8nfRIuy/LR5WTdGZPAklWRtbg==","ujddGeMM9b7uTIqopSW63aDZYsrvHbXluhcpBUECqr2BBxrxvOK+rHFrk8HKk1JjYWyma9UnTmWGUWj5QHGsoQ==","dYzRAfWCgrxgB82+JcqnXLiwa4tKNDb4PzBXuW5uPKhYIuydL/FOJi1SWGrBP1Es+9UmmPnFaFyy8IXVUOLeMg==","Saubl7U9KePb/4VCm2X3yFrNFoO0au93JFu0By1klmHR+3Vz7ayWX3aCdnSwLY3IBdjcsvZv9xQ0IafdSy6JSw==","enA+R1ufZNlsZrwgJb+HQQtuBLlJM0sCzpJ2lhGdTs2yYMBFpVwNo8/Z/1o4OOqUqVnlF4u/A3L7R1lgDn7VYA==","ywykeEMBsuTgLwTXu/4kQc79JDmv0LVZPo+spV0PhrNBX24PNlm7Fo3n7H83N5UMtSpTW7PtvXOyBFd+m1hxQQ==","RFahJ53i1/LWZuAFoa1ey7/qPjEeVyG6L4UQgXfKdfxUj/XoMqDXKAg8SOfwj3GgSsdTmZJ2xEMRN3BqF9Hu8g==","Fdu2XF7GIWeGejgqiMdKZEYG2lZCVBplX6Y2P8VPziAZVnc9PXvl7scLZ7/n0TkaBqLKOgruKPQyQM3ZrfmSVg=="],"FileName":"testdata/cwtch.png","RootHash":"jw7XO7sw20W2p0CxJRyuApRfSOT5kUZNXzYHaFxF3NE2oyXasuX2QpzitxXmArILWxa/dDj7YjX+/pEq3O21/Q==","FileSizeInBytes":51791,"ChunkSizeInBytes":4096} \ No newline at end of file diff --git a/protocol/files/testdata/example.txt b/protocol/files/testdata/example.txt new file mode 100644 index 0000000..c57eff5 --- /dev/null +++ b/protocol/files/testdata/example.txt @@ -0,0 +1 @@ +Hello World! \ No newline at end of file diff --git a/protocol/model/peermessage.go.go b/protocol/model/peermessage.go.go new file mode 100644 index 0000000..6d3bfa2 --- /dev/null +++ b/protocol/model/peermessage.go.go @@ -0,0 +1,8 @@ +package model + +// PeerMessage is an encapsulation that can be used by higher level applications +type PeerMessage struct { + ID string // A unique Message ID (primarily used for acknowledgments) + Context string // A unique context identifier i.e. im.cwtch.chat + Data []byte // The serialized data packet. +} diff --git a/testing/cwtch.png b/testing/cwtch.png new file mode 100644 index 0000000..e50812f Binary files /dev/null and b/testing/cwtch.png differ diff --git a/testing/file_sharing_integration_test.go b/testing/file_sharing_integration_test.go new file mode 100644 index 0000000..305ba00 --- /dev/null +++ b/testing/file_sharing_integration_test.go @@ -0,0 +1,132 @@ +package testing + +import ( + "crypto/rand" + app2 "cwtch.im/cwtch/app" + "cwtch.im/cwtch/app/utils" + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/functionality/filesharing" + "cwtch.im/cwtch/model" + "cwtch.im/cwtch/protocol/files" + "encoding/base64" + "encoding/hex" + "encoding/json" + "fmt" + "git.openprivacy.ca/openprivacy/connectivity/tor" + "git.openprivacy.ca/openprivacy/log" + mrand "math/rand" + "os" + "os/user" + "path" + "testing" + "time" +) + +func TestFileSharing(t *testing.T) { + + os.RemoveAll("cwtch.out.png") + os.RemoveAll("cwtch.out.png.manifest") + + log.SetLevel(log.LevelDebug) + + os.Mkdir("tordir", 0700) + dataDir := path.Join("tordir", "tor") + os.MkdirAll(dataDir, 0700) + + // we don't need real randomness for the port, just to avoid a possible conflict... + mrand.Seed(int64(time.Now().Nanosecond())) + socksPort := mrand.Intn(1000) + 9051 + controlPort := mrand.Intn(1000) + 9052 + + // generate a random password + key := make([]byte, 64) + _, err := rand.Read(key) + if err != nil { + panic(err) + } + + tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc") + acn, err := tor.NewTorACNWithAuth("./tordir", path.Join("..", "tor"), controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)}) + if err != nil { + t.Fatalf("Could not start Tor: %v", err) + } + + app := app2.NewApp(acn, "./storage") + + usr, _ := user.Current() + cwtchDir := path.Join(usr.HomeDir, ".cwtch") + os.Mkdir(cwtchDir, 0700) + os.RemoveAll(path.Join(cwtchDir, "testing")) + os.Mkdir(path.Join(cwtchDir, "testing"), 0700) + + fmt.Println("Creating Alice...") + app.CreatePeer("alice", "asdfasdf") + + fmt.Println("Creating Bob...") + app.CreatePeer("bob", "asdfasdf") + + alice := utils.WaitGetPeer(app, "alice") + bob := utils.WaitGetPeer(app, "bob") + + alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer}) + bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer, event.ManifestReceived}) + + queueOracle := event.NewQueue() + app.GetEventBus(bob.GetOnion()).Subscribe(event.FileDownloaded, queueOracle) + + app.LaunchPeers() + + waitTime := time.Duration(30) * time.Second + t.Logf("** Waiting for Alice, Bob to connect with onion network... (%v)\n", waitTime) + time.Sleep(waitTime) + + bob.AddContact("alice?", alice.GetOnion(), model.AuthApproved) + alice.PeerWithOnion(bob.GetOnion()) + + fmt.Println("Waiting for alice and Bob to peer...") + waitForPeerPeerConnection(t, alice, bob) + + fmt.Println("Alice and Bob are Connected!!") + + filesharingFunctionality, _ := filesharing.FunctionalityGate(map[string]bool{"filesharing": true}) + + err = filesharingFunctionality.ShareFile("cwtch.png", alice, bob.GetOnion()) + + if err != nil { + t.Fatalf("Error!") + } + + time.Sleep(time.Second * 10) + + for _, message := range bob.GetContact(alice.GetOnion()).Timeline.GetMessages() { + + var messageWrapper model.MessageWrapper + json.Unmarshal([]byte(message.Message), &messageWrapper) + + if messageWrapper.Overlay == model.OverlayFileSharing { + var fileMessageOverlay filesharing.OverlayMessage + err := json.Unmarshal([]byte(messageWrapper.Data), &fileMessageOverlay) + + if err == nil { + filesharingFunctionality.DownloadFile(bob, alice.GetOnion(), "cwtch.out.png", fmt.Sprintf("%x.%x", fileMessageOverlay.Hash, fileMessageOverlay.Nonce)) + } + } + + fmt.Printf("Found message from Alice: %v", message.Message) + } + + // Wait for the file downloaded event + ev := queueOracle.Next() + if ev.EventType != event.FileDownloaded { + t.Fatalf("Expected file download event") + } + + manifest, err := files.CreateManifest("cwtch.out.png") + if hex.EncodeToString(manifest.RootHash) != "8f0ed73bbb30db45b6a740b1251cae02945f48e4f991464d5f3607685c45dcd136a325dab2e5f6429ce2b715e602b20b5b16bf7438fb6235fefe912adcedb5fd" { + t.Fatalf("file hash does not match expected %x: ", manifest.RootHash) + } + + app.Shutdown() + acn.Close() + +} diff --git a/testing/tests.sh b/testing/tests.sh index fd268a8..8a3d9a7 100755 --- a/testing/tests.sh +++ b/testing/tests.sh @@ -9,6 +9,7 @@ go test -race ${1} -coverprofile=storage.v0.cover.out -v ./storage/v0 go test -race ${1} -coverprofile=storage.v1.cover.out -v ./storage/v1 go test -race ${1} -coverprofile=storage.cover.out -v ./storage go test -race ${1} -coverprofile=peer.connections.cover.out -v ./protocol/connections +go test -race ${1} -coverprofile=peer.filesharing.cover.out -v ./protocol/files go test -race ${1} -coverprofile=peer.cover.out -v ./peer echo "mode: set" > coverage.out && cat *.cover.out | grep -v mode: | sort -r | \ awk '{if($1 != last) {print $0;last=$1}}' >> coverage.out