From cfb2335c05f69d9390be7132dd3f7d3dd94b7524 Mon Sep 17 00:00:00 2001
From: Sarah Jamie Lewis
Date: Mon, 26 Jun 2023 15:21:11 -0700
Subject: [PATCH] First cut of Conversation Search

---
 event/common.go | 5 ++
 peer/cwtch_peer.go | 67 +++++++++++++++++++
 peer/cwtchprofilestorage.go | 44 ++++++++++++++
 peer/profile_interface.go | 1 +
 .../file_sharing_integration_test.go | 43 +++++++++++---
 5 files changed, 151 insertions(+), 9 deletions(-)

diff --git a/event/common.go b/event/common.go
index 2095790..0271b6b 100644
--- a/event/common.go
+++ b/event/common.go
@@ -217,6 +217,9 @@ const (
 
 	// Heartbeat is used to trigger actions that need to happen every so often...
 	Heartbeat = Type("Heartbeat")
+
+	// Conversation Search
+	SearchResult = Type("SearchResult")
 )
 
 // Field defines common event attributes
@@ -298,6 +301,8 @@ const (
 	FilePath = Field("FilePath")
 	FileDownloadFinished = Field("FileDownloadFinished")
 	NameSuggestion = Field("NameSuggestion")
+
+	SearchID = Field("SearchID")
 )
 
 // Defining Common errors
diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go
index 956c323..fabd3ac 100644
--- a/peer/cwtch_peer.go
+++ b/peer/cwtch_peer.go
@@ -1,6 +1,7 @@
 package peer
 
 import (
+	"context"
 	"crypto/rand"
 	"encoding/base64"
 	"encoding/hex"
@@ -75,6 +76,8 @@ type cwtchPeer struct {
 	extensionLock sync.Mutex // we don't want to hold up all of cwtch for managing thread safe access to extensions
 	experiments model.Experiments
 	experimentsLock sync.Mutex
+
+	cancelSearchContext context.CancelFunc
 }
 
 // EnhancedSendInviteMessage encapsulates attempting to send an invite to a conversation and looking up the enhanced message
@@ -801,6 +804,70 @@ func (cp *cwtchPeer) GetChannelMessage(conversation int, channel int, id int) (s
 	return cp.storage.GetChannelMessage(conversation, channel, id)
 }
 
+// doSearch scans channel 0 of every conversation for messages matching pattern (a SQL LIKE pattern),
+// paging back through history 10 messages at a time, and publishes a SearchResult event tagged with
+// searchID for each match. It stops early when ctx is cancelled.
+func (cp *cwtchPeer) doSearch(ctx context.Context, searchID string, pattern string) {
+	conversations, err := cp.FetchConversations()
+	if err != nil {
+		log.Errorf("could not fetch conversations for search %v: %v", searchID, err)
+		return
+	}
+	maxCount := 0
+	conversationCount := map[int]int{}
+	for _, conversation := range conversations {
+		count, err := cp.storage.GetChannelMessageCount(conversation.ID, 0)
+		if err != nil {
+			log.Errorf("could not fetch channel count for conversation %d:%d: %s", conversation.ID, 0, err)
+		}
+		// remember the largest channel so the paging loop below reaches its oldest messages
+		if count > maxCount {
+			maxCount = count
+		}
+		conversationCount[conversation.ID] = count
+	}
+	log.Debugf("searching messages..%v", conversationCount)
+
+	for offset := 0; offset < (maxCount + 10); offset += 10 {
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(time.Millisecond):
+			for _, conversation := range conversations {
+				ccount := conversationCount[conversation.ID]
+				if offset > ccount {
+					continue
+				}
+				log.Debugf("searching messages..%v: %v offset: %v", conversation.ID, pattern, offset)
+				matchingMessages, err := cp.storage.SearchMessages(conversation.ID, 0, pattern, offset, 10)
+				if err != nil {
+					log.Errorf("could not fetch matching messages for conversation %d:%d: %s", conversation.ID, 0, err)
+				}
+				for _, matchingMessage := range matchingMessages {
+					// publish this search result...
+					cp.PublishEvent(event.NewEvent(event.SearchResult, map[event.Field]string{event.SearchID: searchID, event.ConversationID: strconv.Itoa(conversation.ID), event.Index: strconv.Itoa(matchingMessage.ID)}))
+					log.Debugf("found matching message: %q", matchingMessage)
+				}
+			}
+		}
+	}
+}
+
+// SearchConversations cancels any search already in progress, then starts a new background search
+// of every conversation for messages matching pattern (a SQL LIKE pattern, e.g. "%test%").
+// Matches are published on the event bus as SearchResult events carrying the returned search id.
+// NOTE(review): cancelSearchContext is read and written here without a lock - confirm callers serialize access.
+func (cp *cwtchPeer) SearchConversations(pattern string) string {
+	if cp.cancelSearchContext != nil {
+		cp.cancelSearchContext() // Cancel any current searches...
+	}
+	ctx, cancel := context.WithCancel(context.Background()) // create a new cancellable context...
+	cp.cancelSearchContext = cancel // save the cancel function...
+	searchID := event.GetRandNumber().String() // generate a new search id
+	go cp.doSearch(ctx, searchID, pattern) // perform the search in a new goroutine
+	return searchID // return the search id so any clients listening to the event bus can associate SearchResult events with this search
+}
+
 // GetChannelMessageCount returns the absolute number of messages in a given conversation channel
 func (cp *cwtchPeer) GetChannelMessageCount(conversation int, channel int) (int, error) {
 	return cp.storage.GetChannelMessageCount(conversation, channel)
diff --git a/peer/cwtchprofilestorage.go b/peer/cwtchprofilestorage.go
index bbe796e..7bf8e7b 100644
--- a/peer/cwtchprofilestorage.go
+++ b/peer/cwtchprofilestorage.go
@@ -61,6 +61,7 @@ type CwtchProfileStorage struct {
 	channelGetMostRecentMessagesStmts map[ChannelID]*sql.Stmt
 	channelGetMessageByContentHashStmts map[ChannelID]*sql.Stmt
 	channelRowNumberStmts map[ChannelID]*sql.Stmt
+	channelSearchConversationSQLStmt map[ChannelID]*sql.Stmt
 	ProfileDirectory string
 	db *sql.DB
 }
@@ -114,6 +115,9 @@ const getMessageCountFromConversationSQLStmt = `select count(*) from channel_%d_
 // getMostRecentMessagesSQLStmt is a template for fetching the most recent N messages in a conversation channel
 const getMostRecentMessagesSQLStmt = `select ID, Body, Attributes, Signature, ContentHash from channel_%d_%d_chat order by ID desc limit (?) offset (?);`
 
+// searchConversationSQLStmt is a template for searching a window (limit/offset, newest first) of a conversation channel for messages whose body matches a given LIKE pattern
+const searchConversationSQLStmt = `select ID, Body, Attributes, Signature, ContentHash from (select ID, Body, Attributes, Signature, ContentHash from channel_%d_%d_chat order by ID desc limit (?) offset (?)) where BODY like (?)`
+
 // NewCwtchProfileStorage constructs a new CwtchProfileStorage from a database.
It is also responsible for
 // Preparing commonly used SQL Statements
 func NewCwtchProfileStorage(db *sql.DB, profileDirectory string) (*CwtchProfileStorage, error) {
@@ -222,6 +226,7 @@ func NewCwtchProfileStorage(db *sql.DB, profileDirectory string) (*CwtchProfileS
 		channelGetMostRecentMessagesStmts: map[ChannelID]*sql.Stmt{},
 		channelGetCountStmts: map[ChannelID]*sql.Stmt{},
 		channelRowNumberStmts: map[ChannelID]*sql.Stmt{},
+		channelSearchConversationSQLStmt: map[ChannelID]*sql.Stmt{},
 	}, nil
 }
 
@@ -735,6 +740,45 @@ func (cps *CwtchProfileStorage) GetChannelMessageCount(conversation int, channel
 	return count, nil
 }
 
+// SearchMessages returns the messages in the given window (limit/offset, newest first) of a
+// conversation channel whose body matches pattern (a SQL LIKE pattern). The prepared statement
+// for each channel is created on first use and cached.
+func (cps *CwtchProfileStorage) SearchMessages(conversation int, channel int, pattern string, offset int, limit int) ([]model.ConversationMessage, error) {
+	channelID := ChannelID{Conversation: conversation, Channel: channel}
+
+	cps.mutex.Lock()
+	defer cps.mutex.Unlock()
+	_, exists := cps.channelSearchConversationSQLStmt[channelID]
+	if !exists {
+		conversationStmt, err := cps.db.Prepare(fmt.Sprintf(searchConversationSQLStmt, conversation, channel))
+		if err != nil {
+			log.Errorf("error preparing search statement: %v", err)
+			return nil, err
+		}
+		cps.channelSearchConversationSQLStmt[channelID] = conversationStmt
+	}
+	rows, err := cps.channelSearchConversationSQLStmt[channelID].Query(limit, offset, pattern)
+	if err != nil {
+		log.Errorf("error executing prepared stmt: %v", err)
+		return nil, err
+	}
+	var conversationMessages []model.ConversationMessage
+	defer rows.Close()
+	for rows.Next() {
+		var id int
+		var body string
+		var attributes []byte
+		var sig string
+		var contenthash string
+		err = rows.Scan(&id, &body, &attributes, &sig, &contenthash)
+		if err != nil {
+			return conversationMessages, err
+		}
+		conversationMessages = append(conversationMessages, model.ConversationMessage{ID: id, Body: body, Attr: model.DeserializeAttributes(attributes), Signature: sig, ContentHash: contenthash})
+	}
+	return conversationMessages, rows.Err() // Next also returns false on failure; report it rather than a silently truncated result
+}
+
 // GetMostRecentMessages returns the most recent messages in a channel up to a given limit at a given offset
 func (cps *CwtchProfileStorage) GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) {
 	channelID := ChannelID{Conversation: conversation, Channel: channel}
diff --git a/peer/profile_interface.go b/peer/profile_interface.go
index 9079bfe..eda8130 100644
--- a/peer/profile_interface.go
+++ b/peer/profile_interface.go
@@ -131,6 +131,7 @@ type CwtchPeer interface {
 	GetChannelMessageByContentHash(conversation int, channel int, contenthash string) (int, error)
 	GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error)
 	UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error
+	SearchConversations(pattern string) string
 
 	// EnhancedGetMessageById returns a json-encoded enhanced message, suitable for rendering in a UI
 	EnhancedGetMessageById(conversation int, mid int) string
diff --git a/testing/filesharing/file_sharing_integration_test.go b/testing/filesharing/file_sharing_integration_test.go
index 5287acb..bdaf7a0 100644
--- a/testing/filesharing/file_sharing_integration_test.go
+++ b/testing/filesharing/file_sharing_integration_test.go
@@ -2,6 +2,12 @@ package filesharing
 
 import (
 	"crypto/rand"
+	"encoding/base64"
+	"encoding/hex"
+	"encoding/json"
+	"fmt"
+	"path/filepath"
+
 	app2 "cwtch.im/cwtch/app"
 	"cwtch.im/cwtch/event"
 	"cwtch.im/cwtch/functionality/filesharing"
@@ -12,13 +18,8 @@ import (
 	"cwtch.im/cwtch/protocol/connections"
 	"cwtch.im/cwtch/protocol/files"
 	utils2 "cwtch.im/cwtch/utils"
-	"encoding/base64"
-	"encoding/hex"
-	"encoding/json"
-	"fmt"
 	"git.openprivacy.ca/openprivacy/connectivity/tor"
 	"git.openprivacy.ca/openprivacy/log"
-	"path/filepath"
 
 	// Import SQL Cipher
 	mrand "math/rand"
@@ -57,7 +58,8 @@ func TestFileSharing(t *testing.T) {
 	os.RemoveAll("cwtch.out.png")
os.RemoveAll("cwtch.out.png.manifest") - log.SetLevel(log.LevelDebug) + log.SetLevel(log.LevelInfo) + log.ExcludeFromPattern("tapir") os.Mkdir("tordir", 0700) dataDir := path.Join("tordir", "tor") @@ -120,6 +122,8 @@ func TestFileSharing(t *testing.T) { alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer}) bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer}) + aliceQueueOracle := event.NewQueue() + app.GetEventBus(alice.GetOnion()).Subscribe(event.SearchResult, aliceQueueOracle) queueOracle := event.NewQueue() app.GetEventBus(bob.GetOnion()).Subscribe(event.FileDownloaded, queueOracle) @@ -147,15 +151,36 @@ func TestFileSharing(t *testing.T) { filesharingFunctionality := filesharing.FunctionalityGate() _, fileSharingMessage, err := filesharingFunctionality.ShareFile("cwtch.png", alice) - alice.SendMessage(1, fileSharingMessage) - if err != nil { t.Fatalf("Error!: %v", err) } + alice.SendMessage(1, fileSharingMessage) + bob.AcceptConversation(1) + // Wait for the messages to arrive... time.Sleep(time.Second * 10) + bob.SendMessage(1, "this is a test message") + bob.SendMessage(1, "this is another test message") + + // Wait for the messages to arrive... + time.Sleep(time.Second * 20) + alice.SearchConversations("%test%") + + results := 0 + for { + ev := aliceQueueOracle.Next() + if ev.EventType != event.SearchResult { + t.Fatalf("Expected a search result vent") + } + results += 1 + t.Logf("found search result (%d)....%v", results, ev) + if results == 2 { + break + } + } + // test that bob can download and verify the file testBobDownloadFile(t, bob, filesharingFunctionality, queueOracle) @@ -180,6 +205,7 @@ func TestFileSharing(t *testing.T) { // test that we can delete bob... 
app.DeleteProfile(bob.GetOnion(), "asdfasdf") + aliceQueueOracle.Shutdown() queueOracle.Shutdown() app.Shutdown() acn.Close() @@ -201,7 +227,6 @@ func testBobDownloadFile(t *testing.T, bob peer.CwtchPeer, filesharingFunctional os.RemoveAll("cwtch.out.png") os.RemoveAll("cwtch.out.png.manifest") - bob.AcceptConversation(1) message, _, err := bob.GetChannelMessage(1, 0, 1) if err != nil { t.Fatalf("could not find file sharing message: %v", err)