First cut of Conversation Search

This commit is contained in:
Sarah Jamie Lewis 2023-06-26 15:21:11 -07:00
parent 31f397e332
commit cfb2335c05
5 changed files with 143 additions and 9 deletions

View File

@ -217,6 +217,9 @@ const (
// Heartbeat is used to trigger actions that need to happen every so often...
Heartbeat = Type("Heartbeat")
// Conversation Search
SearchResult = Type("SearchResult")
)
// Field defines common event attributes
@ -298,6 +301,8 @@ const (
FilePath = Field("FilePath")
FileDownloadFinished = Field("FileDownloadFinished")
NameSuggestion = Field("NameSuggestion")
SearchID = Field("SearchID")
)
// Defining Common errors

View File

@ -1,6 +1,7 @@
package peer
import (
"context"
"crypto/rand"
"encoding/base64"
"encoding/hex"
@ -75,6 +76,8 @@ type cwtchPeer struct {
extensionLock sync.Mutex // we don't want to hold up all of cwtch for managing thread safe access to extensions
experiments model.Experiments
experimentsLock sync.Mutex
cancelSearchContext context.CancelFunc
}
// EnhancedSendInviteMessage encapsulates attempting to send an invite to a conversation and looking up the enhanced message
@ -801,6 +804,62 @@ func (cp *cwtchPeer) GetChannelMessage(conversation int, channel int, id int) (s
return cp.storage.GetChannelMessage(conversation, channel, id)
}
func (cp *cwtchPeer) doSearch(ctx context.Context, searchID string, pattern string) {
conversations, _ := cp.FetchConversations()
maxCount := 0
conversationCount := map[int]int{}
for _, conversation := range conversations {
count, err := cp.storage.GetChannelMessageCount(conversation.ID, 0)
if err != nil {
log.Errorf("could not fetch channel count for conversation %d:%d: %s", conversation.ID, 0, err)
}
if maxCount > count {
maxCount = count
}
conversationCount[conversation.ID] = count
}
log.Debugf("searching messages..%v", conversationCount)
for offset := 0; offset < (maxCount + 10); offset += 10 {
select {
case <-ctx.Done():
return
case <-time.After(time.Millisecond):
for _, conversation := range conversations {
ccount := conversationCount[conversation.ID]
if offset > ccount {
continue
}
log.Debugf("searching messages..%v: %v offset: %v", conversation.ID, pattern, offset)
matchingMessages, err := cp.storage.SearchMessages(conversation.ID, 0, pattern, offset, 10)
if err != nil {
log.Errorf("could not fetch matching messages for conversation %d:%d: %s", conversation.ID, 0, err)
}
for _, matchingMessage := range matchingMessages {
// publish this search result...
cp.PublishEvent(event.NewEvent(event.SearchResult, map[event.Field]string{event.SearchID: searchID, event.ConversationID: strconv.Itoa(conversation.ID), event.Index: strconv.Itoa(matchingMessage.ID)}))
log.Debugf("found matching message: %q", matchingMessage)
}
}
}
}
}
// SearchConversation returns a message from a conversation channel referenced by the absolute ID.
// Note: This should note be used to index a list as the ID is not expected to be tied to absolute position
// in the table (e.g. deleted messages, expired messages, etc.)
func (cp *cwtchPeer) SearchConversations(pattern string) string {
if cp.cancelSearchContext != nil {
cp.cancelSearchContext() // Cancel any current searches...
}
ctx, cancel := context.WithCancel(context.Background()) // create a new cancellable contexts...
cp.cancelSearchContext = cancel // save the cancel function...
searchID := event.GetRandNumber().String() // generate a new search id
go cp.doSearch(ctx, searchID, pattern) // perform the search in a new goroutine
return searchID // return the search id so any clients listening to the event bus can associate SearchResult events with this search
}
// GetChannelMessageCount returns the absolute number of messages in a given conversation channel
func (cp *cwtchPeer) GetChannelMessageCount(conversation int, channel int) (int, error) {
return cp.storage.GetChannelMessageCount(conversation, channel)

View File

@ -61,6 +61,7 @@ type CwtchProfileStorage struct {
channelGetMostRecentMessagesStmts map[ChannelID]*sql.Stmt
channelGetMessageByContentHashStmts map[ChannelID]*sql.Stmt
channelRowNumberStmts map[ChannelID]*sql.Stmt
channelSearchConversationSQLStmt map[ChannelID]*sql.Stmt
ProfileDirectory string
db *sql.DB
}
@ -114,6 +115,9 @@ const getMessageCountFromConversationSQLStmt = `select count(*) from channel_%d_
// getMostRecentMessagesSQLStmt is a template for fetching the most recent N messages in a conversation channel
const getMostRecentMessagesSQLStmt = `select ID, Body, Attributes, Signature, ContentHash from channel_%d_%d_chat order by ID desc limit (?) offset (?);`
// searchConversationSQLStmt is a template for search a conversation for the most recent N messages matching a given pattern
const searchConversationSQLStmt = `select ID, Body, Attributes, Signature, ContentHash from (select ID, Body, Attributes, Signature, ContentHash from channel_%d_%d_chat order by ID desc limit (?) offset (?)) where BODY like (?)`
// NewCwtchProfileStorage constructs a new CwtchProfileStorage from a database. It is also responsible for
// Preparing commonly used SQL Statements
func NewCwtchProfileStorage(db *sql.DB, profileDirectory string) (*CwtchProfileStorage, error) {
@ -222,6 +226,7 @@ func NewCwtchProfileStorage(db *sql.DB, profileDirectory string) (*CwtchProfileS
channelGetMostRecentMessagesStmts: map[ChannelID]*sql.Stmt{},
channelGetCountStmts: map[ChannelID]*sql.Stmt{},
channelRowNumberStmts: map[ChannelID]*sql.Stmt{},
channelSearchConversationSQLStmt: map[ChannelID]*sql.Stmt{},
},
nil
}
@ -735,6 +740,45 @@ func (cps *CwtchProfileStorage) GetChannelMessageCount(conversation int, channel
return count, nil
}
func (cps *CwtchProfileStorage) SearchMessages(conversation int, channel int, pattern string, offset int, limit int) ([]model.ConversationMessage, error) {
channelID := ChannelID{Conversation: conversation, Channel: channel}
cps.mutex.Lock()
defer cps.mutex.Unlock()
_, exists := cps.channelSearchConversationSQLStmt[channelID]
if !exists {
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(searchConversationSQLStmt, conversation, channel))
if err != nil {
log.Errorf("error executing transaction: %v", err)
return nil, err
}
cps.channelSearchConversationSQLStmt[channelID] = conversationStmt
}
rows, err := cps.channelSearchConversationSQLStmt[channelID].Query(limit, offset, pattern)
if err != nil {
log.Errorf("error executing prepared stmt: %v", err)
return nil, err
}
var conversationMessages []model.ConversationMessage
defer rows.Close()
for {
result := rows.Next()
if !result {
return conversationMessages, nil
}
var id int
var body string
var attributes []byte
var sig string
var contenthash string
err = rows.Scan(&id, &body, &attributes, &sig, &contenthash)
if err != nil {
return conversationMessages, err
}
conversationMessages = append(conversationMessages, model.ConversationMessage{ID: id, Body: body, Attr: model.DeserializeAttributes(attributes), Signature: sig, ContentHash: contenthash})
}
}
// GetMostRecentMessages returns the most recent messages in a channel up to a given limit at a given offset
func (cps *CwtchProfileStorage) GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) {
channelID := ChannelID{Conversation: conversation, Channel: channel}

View File

@ -131,6 +131,7 @@ type CwtchPeer interface {
GetChannelMessageByContentHash(conversation int, channel int, contenthash string) (int, error)
GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error)
UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error
SearchConversations(pattern string) string
// EnhancedGetMessageById returns a json-encoded enhanced message, suitable for rendering in a UI
EnhancedGetMessageById(conversation int, mid int) string

View File

@ -2,6 +2,12 @@ package filesharing
import (
"crypto/rand"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"path/filepath"
app2 "cwtch.im/cwtch/app"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/functionality/filesharing"
@ -12,13 +18,8 @@ import (
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/protocol/files"
utils2 "cwtch.im/cwtch/utils"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
"path/filepath"
// Import SQL Cipher
mrand "math/rand"
@ -57,7 +58,8 @@ func TestFileSharing(t *testing.T) {
os.RemoveAll("cwtch.out.png")
os.RemoveAll("cwtch.out.png.manifest")
log.SetLevel(log.LevelDebug)
log.SetLevel(log.LevelInfo)
log.ExcludeFromPattern("tapir")
os.Mkdir("tordir", 0700)
dataDir := path.Join("tordir", "tor")
@ -120,6 +122,8 @@ func TestFileSharing(t *testing.T) {
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer})
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer})
aliceQueueOracle := event.NewQueue()
app.GetEventBus(alice.GetOnion()).Subscribe(event.SearchResult, aliceQueueOracle)
queueOracle := event.NewQueue()
app.GetEventBus(bob.GetOnion()).Subscribe(event.FileDownloaded, queueOracle)
@ -147,15 +151,36 @@ func TestFileSharing(t *testing.T) {
filesharingFunctionality := filesharing.FunctionalityGate()
_, fileSharingMessage, err := filesharingFunctionality.ShareFile("cwtch.png", alice)
alice.SendMessage(1, fileSharingMessage)
if err != nil {
t.Fatalf("Error!: %v", err)
}
alice.SendMessage(1, fileSharingMessage)
bob.AcceptConversation(1)
// Wait for the messages to arrive...
time.Sleep(time.Second * 10)
bob.SendMessage(1, "this is a test message")
bob.SendMessage(1, "this is another test message")
// Wait for the messages to arrive...
time.Sleep(time.Second * 20)
alice.SearchConversations("%test%")
results := 0
for {
ev := aliceQueueOracle.Next()
if ev.EventType != event.SearchResult {
t.Fatalf("Expected a search result vent")
}
results += 1
t.Logf("found search result (%d)....%v", results, ev)
if results == 2 {
break
}
}
// test that bob can download and verify the file
testBobDownloadFile(t, bob, filesharingFunctionality, queueOracle)
@ -180,6 +205,7 @@ func TestFileSharing(t *testing.T) {
// test that we can delete bob...
app.DeleteProfile(bob.GetOnion(), "asdfasdf")
aliceQueueOracle.Shutdown()
queueOracle.Shutdown()
app.Shutdown()
acn.Close()
@ -201,7 +227,6 @@ func testBobDownloadFile(t *testing.T, bob peer.CwtchPeer, filesharingFunctional
os.RemoveAll("cwtch.out.png")
os.RemoveAll("cwtch.out.png.manifest")
bob.AcceptConversation(1)
message, _, err := bob.GetChannelMessage(1, 0, 1)
if err != nil {
t.Fatalf("could not find file sharing message: %v", err)