From bff922df350a4baf743cc6ecbadb87ef6d2f3335 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 2 Nov 2021 12:17:56 -0700 Subject: [PATCH 1/7] Temporarily ignore timeline dedupelication for p2p messags. --- model/message.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/model/message.go b/model/message.go index 566545e..e622a54 100644 --- a/model/message.go +++ b/model/message.go @@ -186,10 +186,18 @@ func (t *Timeline) Insert(mi *Message) int { // check that we haven't seen this message before (this has no impact on p2p messages, but is essential for // group messages) - idx, exists := t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)] - if exists { - t.Messages[idx].Acknowledged = true - return idx + // FIXME: The below code now checks if the message has a signature. If it doesn't then skip duplication check. + // We do this because p2p messages right now do not have a signature, and so many p2p messages are not stored + // with a signature. In the future in hybrid groups this check will go away as all timelines will use the same + // underlying protocol. + // This is currently safe to do because p2p does not rely on signatures and groups will verify the signature of + // messages prior to generating an event to include them in the timeline. + if len(mi.Signature) != 0 { + idx, exists := t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)] + if exists { + t.Messages[idx].Acknowledged = true + return idx + } } // update the message store From b9eb1311d1af3ebabc0aad9cb5db5f1e9554c9bf Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 2 Nov 2021 13:41:53 -0700 Subject: [PATCH 2/7] Sort Integration Timelines prior to checking --- testing/cwtch_peer_server_integration_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index ff63524..c95eb8f 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -383,6 +383,7 @@ func TestCwtchPeerIntegration(t *testing.T) { t.Errorf("Alice's timeline does not have all messages") } else { // check message 0,1,2,3 + alicesGroup.Timeline.Sort() aliceGroupTimeline := alicesGroup.GetTimeline() if aliceGroupTimeline[0].Message != aliceLines[0] || aliceGroupTimeline[1].Message != bobLines[0] || aliceGroupTimeline[2].Message != aliceLines[1] || aliceGroupTimeline[3].Message != bobLines[1] { @@ -394,6 +395,7 @@ func TestCwtchPeerIntegration(t *testing.T) { t.Errorf("Bob's timeline does not have all messages") } else { // check message 0,1,2,3,4,5 + bobsGroup.Timeline.Sort() bobGroupTimeline := bobsGroup.GetTimeline() if bobGroupTimeline[0].Message != aliceLines[0] || bobGroupTimeline[1].Message != bobLines[0] || bobGroupTimeline[2].Message != aliceLines[1] || bobGroupTimeline[3].Message != bobLines[1] || @@ -406,6 +408,7 @@ func TestCwtchPeerIntegration(t *testing.T) { t.Errorf("Carol's timeline does not have all messages") } else { // check message 0,1,2,3,4,5 + carolsGroup.Timeline.Sort() carolGroupTimeline := carolsGroup.GetTimeline() if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] || carolGroupTimeline[2].Message != aliceLines[1] || carolGroupTimeline[3].Message != bobLines[1] || From 22ef61a8d5f1ce80fd36a48ee441e4276aedcfbc Mon Sep 17 00:00:00 2001 From: erinn Date: Tue, 2 Nov 2021 14:30:03 -0700 Subject: [PATCH 3/7] wip: file retries --- functionality/filesharing/filesharing_functionality.go | 2 +- peer/cwtch_peer.go | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/functionality/filesharing/filesharing_functionality.go b/functionality/filesharing/filesharing_functionality.go index 59f5192..8666ff8 100644 --- a/functionality/filesharing/filesharing_functionality.go +++ b/functionality/filesharing/filesharing_functionality.go @@ -43,7 +43,7 @@ type OverlayMessage struct { // to downloadFilePath func (f *Functionality) DownloadFile(profile peer.CwtchPeer, handle string, downloadFilePath string, manifestFilePath string, key string) { profile.SetAttribute(attr.GetLocalScope(fmt.Sprintf("%s.manifest", key)), manifestFilePath) - profile.SetAttribute(attr.GetLocalScope(key), downloadFilePath) + profile.SetAttribute(attr.GetLocalScope(fmt.Sprintf("%s.path", key)), downloadFilePath) profile.SendGetValToPeer(handle, attr.PublicScope, fmt.Sprintf("%s.manifest.size", key)) } diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index c6f6e97..ff25100 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -27,7 +27,7 @@ var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeer: true, event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToPeerError: true, event.SendMessageToGroupError: true, event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true, - event.ManifestSizeReceived: true, event.ManifestReceived: true} + event.ManifestSizeReceived: true, event.ManifestReceived: true, event.FileDownloaded: true} // DefaultEventsToHandle specifies which events will be subscribed to // when a peer has its Init() function called @@ -875,7 +875,9 @@ func (cp *cwtchPeer) eventHandler() { } else { log.Errorf("no download path found for manifest: %v", fileKey) } - + case event.FileDownloaded: + fileKey := ev.Data[event.FileKey] + cp.SetAttribute(fmt.Sprintf("%s.complete", fileKey), "true") case event.NewRetValMessageFromPeer: onion := ev.Data[event.RemotePeer] scope := ev.Data[event.Scope] From bf1a92528a17bffa6f11a410cc68ceec85a4c339 Mon Sep 17 00:00:00 2001 From: erinn Date: Thu, 4 Nov 2021 14:07:43 -0700 Subject: [PATCH 4/7] file resumption support --- .../filesharing/filesharing_functionality.go | 15 ++++++++++----- peer/cwtch_peer.go | 15 ++++++++++++--- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/functionality/filesharing/filesharing_functionality.go b/functionality/filesharing/filesharing_functionality.go index 1099abf..0dae38a 100644 --- a/functionality/filesharing/filesharing_functionality.go +++ b/functionality/filesharing/filesharing_functionality.go @@ -2,19 +2,21 @@ package filesharing import ( "crypto/rand" - "cwtch.im/cwtch/model" - "cwtch.im/cwtch/model/attr" - "cwtch.im/cwtch/peer" - "cwtch.im/cwtch/protocol/files" "encoding/hex" "encoding/json" "errors" "fmt" - "git.openprivacy.ca/openprivacy/log" "io" "math" path "path/filepath" "strconv" + "time" + + "cwtch.im/cwtch/model" + "cwtch.im/cwtch/model/attr" + "cwtch.im/cwtch/peer" + "cwtch.im/cwtch/protocol/files" + "git.openprivacy.ca/openprivacy/log" ) // Functionality groups some common UI triggered functions for contacts... @@ -88,6 +90,9 @@ func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer, handl // manifest.FileName gets redacted in filesharing_subsystem (to remove the system-specific file hierarchy), // but we need to *store* the full path because the sender also uses it to locate the file lenDiff := len(filepath) - len(path.Base(filepath)) + + profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.ts", key), strconv.FormatInt(time.Now().Unix(), 10)) + profile.SetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", key), string(serializedManifest)) profile.SetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest.size", key), strconv.Itoa(int(math.Ceil(float64(len(serializedManifest)-lenDiff)/float64(files.DefaultChunkSize))))) profile.ShareFile(key, string(serializedManifest)) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 9020186..da72524 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -1,7 +1,6 @@ package peer import ( - "cwtch.im/cwtch/model/constants" "encoding/base32" "encoding/base64" "encoding/json" @@ -13,6 +12,8 @@ import ( "sync" "time" + "cwtch.im/cwtch/model/constants" + "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" "cwtch.im/cwtch/model/attr" @@ -879,6 +880,14 @@ func (cp *cwtchPeer) eventHandler() { val, exists = cp.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name) } + if exists && zone == attr.FilesharingZone && strings.HasSuffix(zpath, ".manifest.size") { + fileKey := strings.TrimSuffix(zpath, ".manifest.size") + serializedManifest, exists2 := cp.GetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", fileKey)) + if exists2 { + cp.ShareFile(fileKey, serializedManifest) + } + } + resp := event.NewEvent(event.SendRetValMessageToPeer, map[event.Field]string{event.RemotePeer: onion, event.Exists: strconv.FormatBool(exists)}) resp.EventID = ev.EventID if exists { @@ -902,7 +911,7 @@ func (cp *cwtchPeer) eventHandler() { manifestFilePath, exists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.manifest", fileKey)) if exists { - downloadFilePath, exists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fileKey) + downloadFilePath, exists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.path", fileKey)) if exists { log.Debugf("downloading manifest to %v, file to %v", manifestFilePath, downloadFilePath) var manifest files.Manifest @@ -939,7 +948,7 @@ func (cp *cwtchPeer) eventHandler() { } case event.FileDownloaded: fileKey := ev.Data[event.FileKey] - cp.SetAttribute(fmt.Sprintf("%s.complete", fileKey), "true") + cp.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey), "true") case event.NewRetValMessageFromPeer: onion := ev.Data[event.RemotePeer] scope := ev.Data[event.Scope] From 934bfe4f6927c38f533f3309df9c6793665540c9 Mon Sep 17 00:00:00 2001 From: erinn Date: Thu, 4 Nov 2021 14:28:58 -0700 Subject: [PATCH 5/7] timeout downloads --- peer/cwtch_peer.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index da72524..2ebf6ec 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -788,6 +788,13 @@ func (cp *cwtchPeer) storeMessage(onion string, messageTxt string, sent time.Tim } func (cp *cwtchPeer) ShareFile(fileKey string, serializedManifest string) { + tsStr, exists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.ts", fileKey)) + if exists { + ts, err := strconv.ParseInt(tsStr, 10, 64) + if err != nil || ts < time.Now().Unix()-2592000 { + log.Errorf("ignoring request to download a file offered more than 30 days ago") + }s + } cp.eventBus.Publish(event.NewEvent(event.ShareManifest, map[event.Field]string{event.FileKey: fileKey, event.SerializedManifest: serializedManifest})) } From 01a2b7833e9750a76ec77577461cc6894960e1b4 Mon Sep 17 00:00:00 2001 From: erinn Date: Thu, 4 Nov 2021 14:41:00 -0700 Subject: [PATCH 6/7] code type --- peer/cwtch_peer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 2ebf6ec..3bc5a9c 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -793,7 +793,7 @@ func (cp *cwtchPeer) ShareFile(fileKey string, serializedManifest string) { ts, err := strconv.ParseInt(tsStr, 10, 64) if err != nil || ts < time.Now().Unix()-2592000 { log.Errorf("ignoring request to download a file offered more than 30 days ago") - }s + } } cp.eventBus.Publish(event.NewEvent(event.ShareManifest, map[event.Field]string{event.FileKey: fileKey, event.SerializedManifest: serializedManifest})) } From 8b08cabed95656c4b8d3f7648f79aef1daa2edf5 Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Fri, 5 Nov 2021 15:16:47 -0700 Subject: [PATCH 7/7] AddServer return new server onion --- peer/cwtch_peer.go | 15 ++++++++------- testing/cwtch_peer_server_integration_test.go | 2 +- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 3bc5a9c..862d6e1 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -214,7 +214,7 @@ type ModifyGroups interface { // ModifyServers provides write-only access to servers type ModifyServers interface { - AddServer(string) error + AddServer(string) (string, error) ResyncServer(onion string) error } @@ -452,12 +452,13 @@ func (cp *cwtchPeer) AddContact(nick, onion string, authorization model.Authoriz // AddServer takes in a serialized server specification (a bundle of related keys) and adds a contact for the // server assuming there are no errors and the contact doesn't already exist. +// Returns the onion of the new server if added // TODO in the future this function should also integrate with a trust provider to validate the key bundle. -func (cp *cwtchPeer) AddServer(serverSpecification string) error { +func (cp *cwtchPeer) AddServer(serverSpecification string) (string, error) { // This confirms that the server did at least sign the bundle keyBundle, err := model.DeserializeAndVerify([]byte(serverSpecification)) if err != nil { - return err + return "", err } log.Debugf("Got new key bundle %v", keyBundle.Serialize()) @@ -465,7 +466,7 @@ func (cp *cwtchPeer) AddServer(serverSpecification string) error { // keys or subsets of keys, but for now they must commit only to a complete set of keys required for Cwtch Groups // (that way we can be assured that the keybundle we store is a valid one) if !keyBundle.HasKeyType(model.KeyTypeTokenOnion) || !keyBundle.HasKeyType(model.KeyTypeServerOnion) || !keyBundle.HasKeyType(model.KeyTypePrivacyPass) { - return errors.New("keybundle is incomplete") + return "", errors.New("keybundle is incomplete") } if keyBundle.HasKeyType(model.KeyTypeServerOnion) { @@ -503,7 +504,7 @@ func (cp *cwtchPeer) AddServer(serverSpecification string) error { if exists { if val != v { // this is inconsistent! - return model.InconsistentKeyBundleError + return "", model.InconsistentKeyBundleError } } // we haven't seen this key associated with the server before @@ -519,9 +520,9 @@ func (cp *cwtchPeer) AddServer(serverSpecification string) error { cp.SetContactAttribute(onion, k, v) } - return nil + return onion, nil } - return err + return "", err } // GetContacts returns an unordered list of onions diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index c95eb8f..0a77b4f 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -180,7 +180,7 @@ func TestCwtchPeerIntegration(t *testing.T) { // ***** Peering, server joining, group creation / invite ***** fmt.Println("Alice joining server...") - if err := alice.AddServer(string(serverKeyBundle)); err != nil { + if _, err := alice.AddServer(string(serverKeyBundle)); err != nil { t.Fatalf("Failed to Add Server Bundle %v", err) } alice.JoinServer(ServerAddr)