From cfb2335c05f69d9390be7132dd3f7d3dd94b7524 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Mon, 26 Jun 2023 15:21:11 -0700 Subject: [PATCH 1/5] First cut of Conversation Search --- event/common.go | 5 ++ peer/cwtch_peer.go | 59 +++++++++++++++++++ peer/cwtchprofilestorage.go | 44 ++++++++++++++ peer/profile_interface.go | 1 + .../file_sharing_integration_test.go | 43 +++++++++++--- 5 files changed, 143 insertions(+), 9 deletions(-) diff --git a/event/common.go b/event/common.go index 2095790..0271b6b 100644 --- a/event/common.go +++ b/event/common.go @@ -217,6 +217,9 @@ const ( // Heartbeat is used to trigger actions that need to happen every so often... Heartbeat = Type("Heartbeat") + + // Conversation Search + SearchResult = Type("SearchResult") ) // Field defines common event attributes @@ -298,6 +301,8 @@ const ( FilePath = Field("FilePath") FileDownloadFinished = Field("FileDownloadFinished") NameSuggestion = Field("NameSuggestion") + + SearchID = Field("SearchID") ) // Defining Common errors diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 956c323..fabd3ac 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -1,6 +1,7 @@ package peer import ( + "context" "crypto/rand" "encoding/base64" "encoding/hex" @@ -75,6 +76,8 @@ type cwtchPeer struct { extensionLock sync.Mutex // we don't want to hold up all of cwtch for managing thread safe access to extensions experiments model.Experiments experimentsLock sync.Mutex + + cancelSearchContext context.CancelFunc } // EnhancedSendInviteMessage encapsulates attempting to send an invite to a conversation and looking up the enhanced message @@ -801,6 +804,62 @@ func (cp *cwtchPeer) GetChannelMessage(conversation int, channel int, id int) (s return cp.storage.GetChannelMessage(conversation, channel, id) } +func (cp *cwtchPeer) doSearch(ctx context.Context, searchID string, pattern string) { + + conversations, _ := cp.FetchConversations() + maxCount := 0 + conversationCount := map[int]int{} + for _, conversation := range conversations { + count, err := cp.storage.GetChannelMessageCount(conversation.ID, 0) + if err != nil { + log.Errorf("could not fetch channel count for conversation %d:%d: %s", conversation.ID, 0, err) + } + if maxCount > count { + maxCount = count + } + conversationCount[conversation.ID] = count + } + log.Debugf("searching messages..%v", conversationCount) + + for offset := 0; offset < (maxCount + 10); offset += 10 { + select { + case <-ctx.Done(): + return + case <-time.After(time.Millisecond): + for _, conversation := range conversations { + ccount := conversationCount[conversation.ID] + if offset > ccount { + continue + } + log.Debugf("searching messages..%v: %v offset: %v", conversation.ID, pattern, offset) + matchingMessages, err := cp.storage.SearchMessages(conversation.ID, 0, pattern, offset, 10) + if err != nil { + log.Errorf("could not fetch matching messages for conversation %d:%d: %s", conversation.ID, 0, err) + } + for _, matchingMessage := range matchingMessages { + // publish this search result... + cp.PublishEvent(event.NewEvent(event.SearchResult, map[event.Field]string{event.SearchID: searchID, event.ConversationID: strconv.Itoa(conversation.ID), event.Index: strconv.Itoa(matchingMessage.ID)})) + log.Debugf("found matching message: %q", matchingMessage) + } + } + } + } +} + +// SearchConversation returns a message from a conversation channel referenced by the absolute ID. +// Note: This should note be used to index a list as the ID is not expected to be tied to absolute position +// in the table (e.g. deleted messages, expired messages, etc.) +func (cp *cwtchPeer) SearchConversations(pattern string) string { + if cp.cancelSearchContext != nil { + cp.cancelSearchContext() // Cancel any current searches... + } + ctx, cancel := context.WithCancel(context.Background()) // create a new cancellable contexts... + cp.cancelSearchContext = cancel // save the cancel function... + searchID := event.GetRandNumber().String() // generate a new search id + go cp.doSearch(ctx, searchID, pattern) // perform the search in a new goroutine + return searchID // return the search id so any clients listening to the event bus can associate SearchResult events with this search +} + // GetChannelMessageCount returns the absolute number of messages in a given conversation channel func (cp *cwtchPeer) GetChannelMessageCount(conversation int, channel int) (int, error) { return cp.storage.GetChannelMessageCount(conversation, channel) diff --git a/peer/cwtchprofilestorage.go b/peer/cwtchprofilestorage.go index bbe796e..7bf8e7b 100644 --- a/peer/cwtchprofilestorage.go +++ b/peer/cwtchprofilestorage.go @@ -61,6 +61,7 @@ type CwtchProfileStorage struct { channelGetMostRecentMessagesStmts map[ChannelID]*sql.Stmt channelGetMessageByContentHashStmts map[ChannelID]*sql.Stmt channelRowNumberStmts map[ChannelID]*sql.Stmt + channelSearchConversationSQLStmt map[ChannelID]*sql.Stmt ProfileDirectory string db *sql.DB } @@ -114,6 +115,9 @@ const getMessageCountFromConversationSQLStmt = `select count(*) from channel_%d_ // getMostRecentMessagesSQLStmt is a template for fetching the most recent N messages in a conversation channel const getMostRecentMessagesSQLStmt = `select ID, Body, Attributes, Signature, ContentHash from channel_%d_%d_chat order by ID desc limit (?) offset (?);` +// searchConversationSQLStmt is a template for search a conversation for the most recent N messages matching a given pattern +const searchConversationSQLStmt = `select ID, Body, Attributes, Signature, ContentHash from (select ID, Body, Attributes, Signature, ContentHash from channel_%d_%d_chat order by ID desc limit (?) offset (?)) where BODY like (?)` + // NewCwtchProfileStorage constructs a new CwtchProfileStorage from a database. It is also responsible for // Preparing commonly used SQL Statements func NewCwtchProfileStorage(db *sql.DB, profileDirectory string) (*CwtchProfileStorage, error) { @@ -222,6 +226,7 @@ func NewCwtchProfileStorage(db *sql.DB, profileDirectory string) (*CwtchProfileS channelGetMostRecentMessagesStmts: map[ChannelID]*sql.Stmt{}, channelGetCountStmts: map[ChannelID]*sql.Stmt{}, channelRowNumberStmts: map[ChannelID]*sql.Stmt{}, + channelSearchConversationSQLStmt: map[ChannelID]*sql.Stmt{}, }, nil } @@ -735,6 +740,45 @@ func (cps *CwtchProfileStorage) GetChannelMessageCount(conversation int, channel return count, nil } +func (cps *CwtchProfileStorage) SearchMessages(conversation int, channel int, pattern string, offset int, limit int) ([]model.ConversationMessage, error) { + channelID := ChannelID{Conversation: conversation, Channel: channel} + + cps.mutex.Lock() + defer cps.mutex.Unlock() + _, exists := cps.channelSearchConversationSQLStmt[channelID] + if !exists { + conversationStmt, err := cps.db.Prepare(fmt.Sprintf(searchConversationSQLStmt, conversation, channel)) + if err != nil { + log.Errorf("error executing transaction: %v", err) + return nil, err + } + cps.channelSearchConversationSQLStmt[channelID] = conversationStmt + } + rows, err := cps.channelSearchConversationSQLStmt[channelID].Query(limit, offset, pattern) + if err != nil { + log.Errorf("error executing prepared stmt: %v", err) + return nil, err + } + var conversationMessages []model.ConversationMessage + defer rows.Close() + for { + result := rows.Next() + if !result { + return conversationMessages, nil + } + var id int + var body string + var attributes []byte + var sig string + var contenthash string + err = rows.Scan(&id, &body, &attributes, &sig, &contenthash) + if err != nil { + return conversationMessages, err + } + conversationMessages = append(conversationMessages, model.ConversationMessage{ID: id, Body: body, Attr: model.DeserializeAttributes(attributes), Signature: sig, ContentHash: contenthash}) + } +} + // GetMostRecentMessages returns the most recent messages in a channel up to a given limit at a given offset func (cps *CwtchProfileStorage) GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) { channelID := ChannelID{Conversation: conversation, Channel: channel} diff --git a/peer/profile_interface.go b/peer/profile_interface.go index 9079bfe..eda8130 100644 --- a/peer/profile_interface.go +++ b/peer/profile_interface.go @@ -131,6 +131,7 @@ type CwtchPeer interface { GetChannelMessageByContentHash(conversation int, channel int, contenthash string) (int, error) GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error + SearchConversations(pattern string) string // EnhancedGetMessageById returns a json-encoded enhanced message, suitable for rendering in a UI EnhancedGetMessageById(conversation int, mid int) string diff --git a/testing/filesharing/file_sharing_integration_test.go b/testing/filesharing/file_sharing_integration_test.go index 5287acb..bdaf7a0 100644 --- a/testing/filesharing/file_sharing_integration_test.go +++ b/testing/filesharing/file_sharing_integration_test.go @@ -2,6 +2,12 @@ package filesharing import ( "crypto/rand" + "encoding/base64" + "encoding/hex" + "encoding/json" + "fmt" + "path/filepath" + app2 "cwtch.im/cwtch/app" "cwtch.im/cwtch/event" "cwtch.im/cwtch/functionality/filesharing" @@ -12,13 +18,8 @@ import ( "cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/protocol/files" utils2 "cwtch.im/cwtch/utils" - "encoding/base64" - "encoding/hex" - "encoding/json" - "fmt" "git.openprivacy.ca/openprivacy/connectivity/tor" "git.openprivacy.ca/openprivacy/log" - "path/filepath" // Import SQL Cipher mrand "math/rand" @@ -57,7 +58,8 @@ func TestFileSharing(t *testing.T) { os.RemoveAll("cwtch.out.png") os.RemoveAll("cwtch.out.png.manifest") - log.SetLevel(log.LevelDebug) + log.SetLevel(log.LevelInfo) + log.ExcludeFromPattern("tapir") os.Mkdir("tordir", 0700) dataDir := path.Join("tordir", "tor") @@ -120,6 +122,8 @@ func TestFileSharing(t *testing.T) { alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer}) bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer}) + aliceQueueOracle := event.NewQueue() + app.GetEventBus(alice.GetOnion()).Subscribe(event.SearchResult, aliceQueueOracle) queueOracle := event.NewQueue() app.GetEventBus(bob.GetOnion()).Subscribe(event.FileDownloaded, queueOracle) @@ -147,15 +151,36 @@ func TestFileSharing(t *testing.T) { filesharingFunctionality := filesharing.FunctionalityGate() _, fileSharingMessage, err := filesharingFunctionality.ShareFile("cwtch.png", alice) - alice.SendMessage(1, fileSharingMessage) - if err != nil { t.Fatalf("Error!: %v", err) } + alice.SendMessage(1, fileSharingMessage) + bob.AcceptConversation(1) + // Wait for the messages to arrive... time.Sleep(time.Second * 10) + bob.SendMessage(1, "this is a test message") + bob.SendMessage(1, "this is another test message") + + // Wait for the messages to arrive... + time.Sleep(time.Second * 20) + alice.SearchConversations("%test%") + + results := 0 + for { + ev := aliceQueueOracle.Next() + if ev.EventType != event.SearchResult { + t.Fatalf("Expected a search result vent") + } + results += 1 + t.Logf("found search result (%d)....%v", results, ev) + if results == 2 { + break + } + } + // test that bob can download and verify the file testBobDownloadFile(t, bob, filesharingFunctionality, queueOracle) @@ -180,6 +205,7 @@ func TestFileSharing(t *testing.T) { // test that we can delete bob... app.DeleteProfile(bob.GetOnion(), "asdfasdf") + aliceQueueOracle.Shutdown() queueOracle.Shutdown() app.Shutdown() acn.Close() @@ -201,7 +227,6 @@ func testBobDownloadFile(t *testing.T, bob peer.CwtchPeer, filesharingFunctional os.RemoveAll("cwtch.out.png") os.RemoveAll("cwtch.out.png.manifest") - bob.AcceptConversation(1) message, _, err := bob.GetChannelMessage(1, 0, 1) if err != nil { t.Fatalf("could not find file sharing message: %v", err) From 75eb49d6eed509ff40685843639d4bbfff437c6b Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 27 Jun 2023 11:54:02 -0700 Subject: [PATCH 2/5] Fix maxCount calculation --- peer/cwtch_peer.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index fabd3ac..b0da6e1 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -814,7 +814,7 @@ func (cp *cwtchPeer) doSearch(ctx context.Context, searchID string, pattern stri if err != nil { log.Errorf("could not fetch channel count for conversation %d:%d: %s", conversation.ID, 0, err) } - if maxCount > count { + if count > maxCount { maxCount = count } conversationCount[conversation.ID] = count @@ -850,6 +850,9 @@ func (cp *cwtchPeer) doSearch(ctx context.Context, searchID string, pattern stri // Note: This should note be used to index a list as the ID is not expected to be tied to absolute position // in the table (e.g. deleted messages, expired messages, etc.) func (cp *cwtchPeer) SearchConversations(pattern string) string { + // we need this lock here to prevent weirdness happening when reassigning cp.cancelSearchContext + cp.mutex.Lock() + defer cp.mutex.Unlock() if cp.cancelSearchContext != nil { cp.cancelSearchContext() // Cancel any current searches... } From b84de2aa612073f437c231002f02a619fc1651ad Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 11 Jul 2023 13:20:57 -0700 Subject: [PATCH 3/5] Fix bug in Engine that leaked Peer Connecting Status --- event/common.go | 1 + peer/cwtch_peer.go | 8 +++++++- protocol/connections/engine.go | 8 +++++--- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/event/common.go b/event/common.go index 0271b6b..1b9843c 100644 --- a/event/common.go +++ b/event/common.go @@ -220,6 +220,7 @@ const ( // Conversation Search SearchResult = Type("SearchResult") + SearchCancelled = Type("SearchCancelled") ) // Field defines common event attributes diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index b0da6e1..b31a374 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -805,6 +805,11 @@ func (cp *cwtchPeer) GetChannelMessage(conversation int, channel int, id int) (s } func (cp *cwtchPeer) doSearch(ctx context.Context, searchID string, pattern string) { + + // do not allow trivial searches that would match a wide variety of messages... + if len(pattern) <=5 { + return + } conversations, _ := cp.FetchConversations() maxCount := 0 @@ -824,8 +829,9 @@ func (cp *cwtchPeer) doSearch(ctx context.Context, searchID string, pattern stri for offset := 0; offset < (maxCount + 10); offset += 10 { select { case <-ctx.Done(): + cp.PublishEvent(event.NewEvent(event.SearchCancelled, map[event.Field]string{event.SearchID: searchID})) return - case <-time.After(time.Millisecond): + case <-time.After(time.Millisecond*100): for _, conversation := range conversations { ccount := conversationCount[conversation.ID] if offset > ccount { diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index cb2030b..6b29561 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -344,10 +344,11 @@ func (e *engine) Shutdown() { log.Infof("shutting down ephemeral service") // work around: service.shutdown() can block for a long time if it is Open()ing a new connection, putting it in a // goroutine means we can perform this operation and let the per service shutdown in their own time or until the app exits + conn := connection; // don't capture loop variable go func() { - connection.connectingLock.Lock() - connection.service.Shutdown() - connection.connectingLock.Unlock() + conn.connectingLock.Lock() + conn.service.Shutdown() + conn.connectingLock.Unlock() }() } @@ -380,6 +381,7 @@ func (e *engine) peerWithOnion(onion string) { } } } + e.ignoreOnShutdown(e.peerDisconnected)(onion) } func (e *engine) makeAntispamPayment(onion string) { From 77e4e981e879531ec53523ef53629a6999ba297c Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 11 Jul 2023 13:21:26 -0700 Subject: [PATCH 4/5] Formatting --- event/common.go | 2 +- peer/cwtch_peer.go | 6 +++--- protocol/connections/engine.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/event/common.go b/event/common.go index 1b9843c..83fd20d 100644 --- a/event/common.go +++ b/event/common.go @@ -219,7 +219,7 @@ const ( Heartbeat = Type("Heartbeat") // Conversation Search - SearchResult = Type("SearchResult") + SearchResult = Type("SearchResult") SearchCancelled = Type("SearchCancelled") ) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index b31a374..eaa66a2 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -805,9 +805,9 @@ func (cp *cwtchPeer) GetChannelMessage(conversation int, channel int, id int) (s } func (cp *cwtchPeer) doSearch(ctx context.Context, searchID string, pattern string) { - + // do not allow trivial searches that would match a wide variety of messages... - if len(pattern) <=5 { + if len(pattern) <= 5 { return } @@ -831,7 +831,7 @@ func (cp *cwtchPeer) doSearch(ctx context.Context, searchID string, pattern stri case <-ctx.Done(): cp.PublishEvent(event.NewEvent(event.SearchCancelled, map[event.Field]string{event.SearchID: searchID})) return - case <-time.After(time.Millisecond*100): + case <-time.After(time.Millisecond * 100): for _, conversation := range conversations { ccount := conversationCount[conversation.ID] if offset > ccount { diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 6b29561..b9b7cac 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -344,7 +344,7 @@ func (e *engine) Shutdown() { log.Infof("shutting down ephemeral service") // work around: service.shutdown() can block for a long time if it is Open()ing a new connection, putting it in a // goroutine means we can perform this operation and let the per service shutdown in their own time or until the app exits - conn := connection; // don't capture loop variable + conn := connection // don't capture loop variable go func() { conn.connectingLock.Lock() conn.service.Shutdown() From 1e0cbe1dc603fd35d7f173f5b2bb4c7657f7f632 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Thu, 13 Jul 2023 11:48:14 -0700 Subject: [PATCH 5/5] Refine Connection Logic --- protocol/connections/engine.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index b9b7cac..533a81a 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -362,6 +362,11 @@ func (e *engine) peerWithOnion(onion string) { if !e.isBlocked(onion) { e.ignoreOnShutdown(e.peerConnecting)(onion) connected, err := e.service.Connect(onion, e.createPeerTemplate()) + if connected && err == nil { + // on success CwtchPeer will handle Auth and other status updates + // early exit from this function... + return + } // If we are already connected...check if we are authed and issue an auth event // (This allows the ui to be stateless) if connected && err != nil {