Merge pull request 'First cut of Conversation Search' (#518) from conversation_search into master
continuous-integration/drone/push Build is pending Details

Reviewed-on: #518
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
This commit is contained in:
Sarah Jamie Lewis 2023-07-13 19:39:41 +00:00
commit 3f1e2d7a14
6 changed files with 163 additions and 12 deletions

View File

@ -217,6 +217,10 @@ const (
// Heartbeat is used to trigger actions that need to happen every so often...
Heartbeat = Type("Heartbeat")
// Conversation Search
SearchResult = Type("SearchResult")
SearchCancelled = Type("SearchCancelled")
)
// Field defines common event attributes
@ -298,6 +302,8 @@ const (
FilePath = Field("FilePath")
FileDownloadFinished = Field("FileDownloadFinished")
NameSuggestion = Field("NameSuggestion")
SearchID = Field("SearchID")
)
// Defining Common errors

View File

@ -1,6 +1,7 @@
package peer
import (
"context"
"crypto/rand"
"encoding/base64"
"encoding/hex"
@ -75,6 +76,8 @@ type cwtchPeer struct {
extensionLock sync.Mutex // we don't want to hold up all of cwtch for managing thread safe access to extensions
experiments model.Experiments
experimentsLock sync.Mutex
cancelSearchContext context.CancelFunc
}
// EnhancedSendInviteMessage encapsulates attempting to send an invite to a conversation and looking up the enhanced message
@ -801,6 +804,71 @@ func (cp *cwtchPeer) GetChannelMessage(conversation int, channel int, id int) (s
return cp.storage.GetChannelMessage(conversation, channel, id)
}
func (cp *cwtchPeer) doSearch(ctx context.Context, searchID string, pattern string) {
// do not allow trivial searches that would match a wide variety of messages...
if len(pattern) <= 5 {
return
}
conversations, _ := cp.FetchConversations()
maxCount := 0
conversationCount := map[int]int{}
for _, conversation := range conversations {
count, err := cp.storage.GetChannelMessageCount(conversation.ID, 0)
if err != nil {
log.Errorf("could not fetch channel count for conversation %d:%d: %s", conversation.ID, 0, err)
}
if count > maxCount {
maxCount = count
}
conversationCount[conversation.ID] = count
}
log.Debugf("searching messages..%v", conversationCount)
for offset := 0; offset < (maxCount + 10); offset += 10 {
select {
case <-ctx.Done():
cp.PublishEvent(event.NewEvent(event.SearchCancelled, map[event.Field]string{event.SearchID: searchID}))
return
case <-time.After(time.Millisecond * 100):
for _, conversation := range conversations {
ccount := conversationCount[conversation.ID]
if offset > ccount {
continue
}
log.Debugf("searching messages..%v: %v offset: %v", conversation.ID, pattern, offset)
matchingMessages, err := cp.storage.SearchMessages(conversation.ID, 0, pattern, offset, 10)
if err != nil {
log.Errorf("could not fetch matching messages for conversation %d:%d: %s", conversation.ID, 0, err)
}
for _, matchingMessage := range matchingMessages {
// publish this search result...
cp.PublishEvent(event.NewEvent(event.SearchResult, map[event.Field]string{event.SearchID: searchID, event.ConversationID: strconv.Itoa(conversation.ID), event.Index: strconv.Itoa(matchingMessage.ID)}))
log.Debugf("found matching message: %q", matchingMessage)
}
}
}
}
}
// SearchConversation returns a message from a conversation channel referenced by the absolute ID.
// Note: This should note be used to index a list as the ID is not expected to be tied to absolute position
// in the table (e.g. deleted messages, expired messages, etc.)
func (cp *cwtchPeer) SearchConversations(pattern string) string {
// we need this lock here to prevent weirdness happening when reassigning cp.cancelSearchContext
cp.mutex.Lock()
defer cp.mutex.Unlock()
if cp.cancelSearchContext != nil {
cp.cancelSearchContext() // Cancel any current searches...
}
ctx, cancel := context.WithCancel(context.Background()) // create a new cancellable contexts...
cp.cancelSearchContext = cancel // save the cancel function...
searchID := event.GetRandNumber().String() // generate a new search id
go cp.doSearch(ctx, searchID, pattern) // perform the search in a new goroutine
return searchID // return the search id so any clients listening to the event bus can associate SearchResult events with this search
}
// GetChannelMessageCount returns the absolute number of messages in a given conversation channel
func (cp *cwtchPeer) GetChannelMessageCount(conversation int, channel int) (int, error) {
return cp.storage.GetChannelMessageCount(conversation, channel)

View File

@ -61,6 +61,7 @@ type CwtchProfileStorage struct {
channelGetMostRecentMessagesStmts map[ChannelID]*sql.Stmt
channelGetMessageByContentHashStmts map[ChannelID]*sql.Stmt
channelRowNumberStmts map[ChannelID]*sql.Stmt
channelSearchConversationSQLStmt map[ChannelID]*sql.Stmt
ProfileDirectory string
db *sql.DB
}
@ -114,6 +115,9 @@ const getMessageCountFromConversationSQLStmt = `select count(*) from channel_%d_
// getMostRecentMessagesSQLStmt is a template for fetching the most recent N messages in a conversation channel
const getMostRecentMessagesSQLStmt = `select ID, Body, Attributes, Signature, ContentHash from channel_%d_%d_chat order by ID desc limit (?) offset (?);`
// searchConversationSQLStmt is a template for search a conversation for the most recent N messages matching a given pattern
const searchConversationSQLStmt = `select ID, Body, Attributes, Signature, ContentHash from (select ID, Body, Attributes, Signature, ContentHash from channel_%d_%d_chat order by ID desc limit (?) offset (?)) where BODY like (?)`
// NewCwtchProfileStorage constructs a new CwtchProfileStorage from a database. It is also responsible for
// Preparing commonly used SQL Statements
func NewCwtchProfileStorage(db *sql.DB, profileDirectory string) (*CwtchProfileStorage, error) {
@ -222,6 +226,7 @@ func NewCwtchProfileStorage(db *sql.DB, profileDirectory string) (*CwtchProfileS
channelGetMostRecentMessagesStmts: map[ChannelID]*sql.Stmt{},
channelGetCountStmts: map[ChannelID]*sql.Stmt{},
channelRowNumberStmts: map[ChannelID]*sql.Stmt{},
channelSearchConversationSQLStmt: map[ChannelID]*sql.Stmt{},
},
nil
}
@ -735,6 +740,45 @@ func (cps *CwtchProfileStorage) GetChannelMessageCount(conversation int, channel
return count, nil
}
func (cps *CwtchProfileStorage) SearchMessages(conversation int, channel int, pattern string, offset int, limit int) ([]model.ConversationMessage, error) {
channelID := ChannelID{Conversation: conversation, Channel: channel}
cps.mutex.Lock()
defer cps.mutex.Unlock()
_, exists := cps.channelSearchConversationSQLStmt[channelID]
if !exists {
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(searchConversationSQLStmt, conversation, channel))
if err != nil {
log.Errorf("error executing transaction: %v", err)
return nil, err
}
cps.channelSearchConversationSQLStmt[channelID] = conversationStmt
}
rows, err := cps.channelSearchConversationSQLStmt[channelID].Query(limit, offset, pattern)
if err != nil {
log.Errorf("error executing prepared stmt: %v", err)
return nil, err
}
var conversationMessages []model.ConversationMessage
defer rows.Close()
for {
result := rows.Next()
if !result {
return conversationMessages, nil
}
var id int
var body string
var attributes []byte
var sig string
var contenthash string
err = rows.Scan(&id, &body, &attributes, &sig, &contenthash)
if err != nil {
return conversationMessages, err
}
conversationMessages = append(conversationMessages, model.ConversationMessage{ID: id, Body: body, Attr: model.DeserializeAttributes(attributes), Signature: sig, ContentHash: contenthash})
}
}
// GetMostRecentMessages returns the most recent messages in a channel up to a given limit at a given offset
func (cps *CwtchProfileStorage) GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) {
channelID := ChannelID{Conversation: conversation, Channel: channel}

View File

@ -131,6 +131,7 @@ type CwtchPeer interface {
GetChannelMessageByContentHash(conversation int, channel int, contenthash string) (int, error)
GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error)
UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error
SearchConversations(pattern string) string
// EnhancedGetMessageById returns a json-encoded enhanced message, suitable for rendering in a UI
EnhancedGetMessageById(conversation int, mid int) string

View File

@ -344,10 +344,11 @@ func (e *engine) Shutdown() {
log.Infof("shutting down ephemeral service")
// work around: service.shutdown() can block for a long time if it is Open()ing a new connection, putting it in a
// goroutine means we can perform this operation and let the per service shutdown in their own time or until the app exits
conn := connection // don't capture loop variable
go func() {
connection.connectingLock.Lock()
connection.service.Shutdown()
connection.connectingLock.Unlock()
conn.connectingLock.Lock()
conn.service.Shutdown()
conn.connectingLock.Unlock()
}()
}
@ -361,6 +362,11 @@ func (e *engine) peerWithOnion(onion string) {
if !e.isBlocked(onion) {
e.ignoreOnShutdown(e.peerConnecting)(onion)
connected, err := e.service.Connect(onion, e.createPeerTemplate())
if connected && err == nil {
// on success CwtchPeer will handle Auth and other status updates
// early exit from this function...
return
}
// If we are already connected...check if we are authed and issue an auth event
// (This allows the ui to be stateless)
if connected && err != nil {
@ -380,6 +386,7 @@ func (e *engine) peerWithOnion(onion string) {
}
}
}
e.ignoreOnShutdown(e.peerDisconnected)(onion)
}
func (e *engine) makeAntispamPayment(onion string) {

View File

@ -2,6 +2,12 @@ package filesharing
import (
"crypto/rand"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"path/filepath"
app2 "cwtch.im/cwtch/app"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/functionality/filesharing"
@ -12,13 +18,8 @@ import (
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/protocol/files"
utils2 "cwtch.im/cwtch/utils"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
"path/filepath"
// Import SQL Cipher
mrand "math/rand"
@ -57,7 +58,8 @@ func TestFileSharing(t *testing.T) {
os.RemoveAll("cwtch.out.png")
os.RemoveAll("cwtch.out.png.manifest")
log.SetLevel(log.LevelDebug)
log.SetLevel(log.LevelInfo)
log.ExcludeFromPattern("tapir")
os.Mkdir("tordir", 0700)
dataDir := path.Join("tordir", "tor")
@ -120,6 +122,8 @@ func TestFileSharing(t *testing.T) {
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer})
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer})
aliceQueueOracle := event.NewQueue()
app.GetEventBus(alice.GetOnion()).Subscribe(event.SearchResult, aliceQueueOracle)
queueOracle := event.NewQueue()
app.GetEventBus(bob.GetOnion()).Subscribe(event.FileDownloaded, queueOracle)
@ -147,15 +151,36 @@ func TestFileSharing(t *testing.T) {
filesharingFunctionality := filesharing.FunctionalityGate()
_, fileSharingMessage, err := filesharingFunctionality.ShareFile("cwtch.png", alice)
alice.SendMessage(1, fileSharingMessage)
if err != nil {
t.Fatalf("Error!: %v", err)
}
alice.SendMessage(1, fileSharingMessage)
bob.AcceptConversation(1)
// Wait for the messages to arrive...
time.Sleep(time.Second * 10)
bob.SendMessage(1, "this is a test message")
bob.SendMessage(1, "this is another test message")
// Wait for the messages to arrive...
time.Sleep(time.Second * 20)
alice.SearchConversations("%test%")
results := 0
for {
ev := aliceQueueOracle.Next()
if ev.EventType != event.SearchResult {
t.Fatalf("Expected a search result vent")
}
results += 1
t.Logf("found search result (%d)....%v", results, ev)
if results == 2 {
break
}
}
// test that bob can download and verify the file
testBobDownloadFile(t, bob, filesharingFunctionality, queueOracle)
@ -180,6 +205,7 @@ func TestFileSharing(t *testing.T) {
// test that we can delete bob...
app.DeleteProfile(bob.GetOnion(), "asdfasdf")
aliceQueueOracle.Shutdown()
queueOracle.Shutdown()
app.Shutdown()
acn.Close()
@ -201,7 +227,6 @@ func testBobDownloadFile(t *testing.T, bob peer.CwtchPeer, filesharingFunctional
os.RemoveAll("cwtch.out.png")
os.RemoveAll("cwtch.out.png.manifest")
bob.AcceptConversation(1)
message, _, err := bob.GetChannelMessage(1, 0, 1)
if err != nil {
t.Fatalf("could not find file sharing message: %v", err)