Merge pull request 'Push locks back into storage to free up cwtch peer operations' (#454) from thread_works into master
continuous-integration/drone/push Build is passing
Details
continuous-integration/drone/push Build is passing
Details
Reviewed-on: #454
This commit is contained in:
commit
33bcc40206
|
@ -172,8 +172,7 @@ func (cp *cwtchPeer) SendScopedZonedGetValToContact(conversationID int, scope at
|
|||
// GetScopedZonedAttribute
|
||||
// Status: Ready for 1.5
|
||||
func (cp *cwtchPeer) GetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string) (string, bool) {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
|
||||
scopedZonedKey := scope.ConstructScopedZonedPath(zone.ConstructZonedPath(key))
|
||||
|
||||
value, err := cp.storage.LoadProfileKeyValue(TypeAttribute, scopedZonedKey.ToString())
|
||||
|
@ -187,8 +186,7 @@ func (cp *cwtchPeer) GetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, k
|
|||
|
||||
// GetScopedZonedAttributes finds all keys associated with the given scope and zone
|
||||
func (cp *cwtchPeer) GetScopedZonedAttributeKeys(scope attr.Scope, zone attr.Zone) ([]string, error) {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
|
||||
scopedZonedKey := scope.ConstructScopedZonedPath(zone.ConstructZonedPath(""))
|
||||
|
||||
keys, err := cp.storage.FindProfileKeysByPrefix(TypeAttribute, scopedZonedKey.ToString())
|
||||
|
@ -202,8 +200,6 @@ func (cp *cwtchPeer) GetScopedZonedAttributeKeys(scope attr.Scope, zone attr.Zon
|
|||
|
||||
// SetScopedZonedAttribute
|
||||
func (cp *cwtchPeer) SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string, value string) {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
|
||||
scopedZonedKey := scope.ConstructScopedZonedPath(zone.ConstructZonedPath(key))
|
||||
|
||||
|
@ -225,8 +221,6 @@ func (cp *cwtchPeer) SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, k
|
|||
// If you try to send a message to a handle that doesn't exist, malformed or an incorrect type then
|
||||
// this function will error
|
||||
func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error) {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
|
||||
// We assume we are sending to a Contact.
|
||||
conversationInfo, err := cp.storage.GetConversation(conversation)
|
||||
|
@ -518,8 +512,6 @@ func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessContr
|
|||
// AcceptConversation looks up a conversation by `handle` and sets the Accepted status to `true`
|
||||
// This will cause Cwtch to auto connect to this conversation on start up
|
||||
func (cp *cwtchPeer) AcceptConversation(id int) error {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
err := cp.storage.AcceptConversation(id)
|
||||
if err == nil {
|
||||
// If a p2p conversation then attempt to peer with the onion...
|
||||
|
@ -540,8 +532,6 @@ func (cp *cwtchPeer) AcceptConversation(id int) error {
|
|||
// BlockConversation looks up a conversation by `handle` and sets the Blocked ACL field to `true`
|
||||
// This will cause Cwtch to never try to connect to and refuse connections from the peer
|
||||
func (cp *cwtchPeer) BlockConversation(id int) error {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
ci, err := cp.storage.GetConversation(id)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -563,8 +553,6 @@ func (cp *cwtchPeer) BlockConversation(id int) error {
|
|||
// UnblockConversation looks up a conversation by `handle` and sets the Blocked ACL field to `true`
|
||||
// Further actions depend on the Accepted field
|
||||
func (cp *cwtchPeer) UnblockConversation(id int) error {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
ci, err := cp.storage.GetConversation(id)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -592,21 +580,15 @@ func (cp *cwtchPeer) sendUpdateAuth(id int, handle string, accepted bool, blocke
|
|||
}
|
||||
|
||||
func (cp *cwtchPeer) FetchConversations() ([]*model.Conversation, error) {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
return cp.storage.FetchConversations()
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) GetConversationInfo(conversation int) (*model.Conversation, error) {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
return cp.storage.GetConversation(conversation)
|
||||
}
|
||||
|
||||
// FetchConversationInfo returns information about the given conversation referenced by the handle
|
||||
func (cp *cwtchPeer) FetchConversationInfo(handle string) (*model.Conversation, error) {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
return cp.storage.GetConversationByHandle(handle)
|
||||
}
|
||||
|
||||
|
@ -624,15 +606,11 @@ func (cp *cwtchPeer) DeleteConversation(id int) error {
|
|||
|
||||
// SetConversationAttribute sets the conversation attribute at path to value
|
||||
func (cp *cwtchPeer) SetConversationAttribute(id int, path attr.ScopedZonedPath, value string) error {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
return cp.storage.SetConversationAttribute(id, path, value)
|
||||
}
|
||||
|
||||
// GetConversationAttribute is a shortcut method for retrieving the value of a given path
|
||||
func (cp *cwtchPeer) GetConversationAttribute(id int, path attr.ScopedZonedPath) (string, error) {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
ci, err := cp.storage.GetConversation(id)
|
||||
if err != nil {
|
||||
return "", err
|
||||
|
@ -648,27 +626,21 @@ func (cp *cwtchPeer) GetConversationAttribute(id int, path attr.ScopedZonedPath)
|
|||
// Note: This should note be used to index a list as the ID is not expected to be tied to absolute position
|
||||
// in the table (e.g. deleted messages, expired messages, etc.)
|
||||
func (cp *cwtchPeer) GetChannelMessage(conversation int, channel int, id int) (string, model.Attributes, error) {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
return cp.storage.GetChannelMessage(conversation, channel, id)
|
||||
}
|
||||
|
||||
// GetChannelMessageCount returns the absolute number of messages in a given conversation channel
|
||||
func (cp *cwtchPeer) GetChannelMessageCount(conversation int, channel int) (int, error) {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
return cp.storage.GetChannelMessageCount(conversation, channel)
|
||||
}
|
||||
|
||||
// GetMostRecentMessages returns a selection of messages, ordered by most recently inserted
|
||||
func (cp *cwtchPeer) GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
return cp.storage.GetMostRecentMessages(conversation, channel, offset, limit)
|
||||
}
|
||||
|
||||
// UpdateMessageAttribute sets a given key/value attribute on the message in the given conversation/channel
|
||||
// errors if the message doesn't exist, or for underlying daabase issues.
|
||||
// errors if the message doesn't exist, or for underlying database issues.
|
||||
func (cp *cwtchPeer) UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error {
|
||||
_, attr, err := cp.GetChannelMessage(conversation, channel, id)
|
||||
if err == nil {
|
||||
|
@ -1036,8 +1008,6 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time)
|
|||
return -1, err
|
||||
}
|
||||
}
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
|
||||
// Generate a random number and use it as the signature
|
||||
signature := event.GetRandNumber().String()
|
||||
|
@ -1355,19 +1325,17 @@ func (cp *cwtchPeer) attemptInsertOrAcknowledgeLegacyGroupConversation(conversat
|
|||
if err == nil && attr[constants.AttrAck] != constants.True {
|
||||
cp.mutex.Lock()
|
||||
attr[constants.AttrAck] = constants.True
|
||||
cp.storage.UpdateMessageAttributes(conversationID, 0, messageID, attr)
|
||||
cp.mutex.Unlock()
|
||||
cp.storage.UpdateMessageAttributes(conversationID, 0, messageID, attr)
|
||||
cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.Index: strconv.Itoa(messageID)}))
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
cp.mutex.Lock()
|
||||
contenthash := model.CalculateContentHash(dm.Onion, dm.Text)
|
||||
id, err := cp.storage.InsertMessage(conversationID, 0, dm.Text, model.Attributes{constants.AttrAck: constants.True, "PreviousSignature": base64.StdEncoding.EncodeToString(dm.PreviousMessageSig), constants.AttrAuthor: dm.Onion, constants.AttrSentTimestamp: time.Unix(int64(dm.Timestamp), 0).Format(time.RFC3339Nano)}, signature, contenthash)
|
||||
if err == nil {
|
||||
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.TimestampSent: time.Unix(int64(dm.Timestamp), 0).Format(time.RFC3339Nano), event.RemotePeer: dm.Onion, event.Index: strconv.Itoa(id), event.Data: dm.Text, event.ContentHash: contenthash}))
|
||||
}
|
||||
cp.mutex.Unlock()
|
||||
return err
|
||||
}
|
||||
return err
|
||||
|
@ -1387,8 +1355,8 @@ func (cp *cwtchPeer) attemptAcknowledgeP2PConversation(handle string, signature
|
|||
if err == nil {
|
||||
cp.mutex.Lock()
|
||||
attr[constants.AttrAck] = constants.True
|
||||
cp.storage.UpdateMessageAttributes(ci.ID, 0, id, attr)
|
||||
cp.mutex.Unlock()
|
||||
cp.storage.UpdateMessageAttributes(ci.ID, 0, id, attr)
|
||||
cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: handle, event.Index: strconv.Itoa(id)}))
|
||||
return nil
|
||||
}
|
||||
|
@ -1427,14 +1395,10 @@ func (cp *cwtchPeer) attemptErrorConversationMessage(handle string, signature st
|
|||
}
|
||||
|
||||
func (cp *cwtchPeer) GetChannelMessageBySignature(conversationID int, channelID int, signature string) (int, error) {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
return cp.storage.GetChannelMessageBySignature(conversationID, channelID, signature)
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) GetChannelMessageByContentHash(conversationID int, channelID int, contenthash string) (int, error) {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
messageID, err := cp.storage.GetChannelMessageByContentHash(conversationID, channelID, contenthash)
|
||||
if err == nil {
|
||||
return cp.storage.GetRowNumberByMessageID(conversationID, channelID, messageID)
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// StorageKeyType is an interface wrapper around storage key types
|
||||
|
@ -35,6 +36,7 @@ const (
|
|||
type CwtchProfileStorage struct {
|
||||
|
||||
// Note: Statements are thread safe..
|
||||
mutex sync.Mutex
|
||||
|
||||
// Profile related statements
|
||||
insertProfileKeyValueStmt *sql.Stmt
|
||||
|
@ -463,6 +465,8 @@ func (cps *CwtchProfileStorage) InsertMessage(conversation int, channel int, bod
|
|||
|
||||
channelID := ChannelID{Conversation: conversation, Channel: channel}
|
||||
|
||||
cps.mutex.Lock()
|
||||
defer cps.mutex.Unlock()
|
||||
_, exists := cps.channelInsertStmts[channelID]
|
||||
if !exists {
|
||||
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(insertMessageIntoConversationSQLStmt, conversation, channel))
|
||||
|
@ -488,6 +492,8 @@ func (cps *CwtchProfileStorage) UpdateMessageAttributes(conversation int, channe
|
|||
|
||||
channelID := ChannelID{Conversation: conversation, Channel: channel}
|
||||
|
||||
cps.mutex.Lock()
|
||||
defer cps.mutex.Unlock()
|
||||
_, exists := cps.channelUpdateMessageStmts[channelID]
|
||||
if !exists {
|
||||
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(updateMessageIntoConversationSQLStmt, conversation, channel))
|
||||
|
@ -512,6 +518,8 @@ func (cps *CwtchProfileStorage) UpdateMessageAttributes(conversation int, channe
|
|||
func (cps *CwtchProfileStorage) GetChannelMessageBySignature(conversation int, channel int, signature string) (int, error) {
|
||||
channelID := ChannelID{Conversation: conversation, Channel: channel}
|
||||
|
||||
cps.mutex.Lock()
|
||||
defer cps.mutex.Unlock()
|
||||
_, exists := cps.channelGetMessageBySignatureStmts[channelID]
|
||||
if !exists {
|
||||
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMessageBySignatureFromConversationSQLStmt, conversation, channel))
|
||||
|
@ -549,6 +557,8 @@ func (cps *CwtchProfileStorage) GetChannelMessageBySignature(conversation int, c
|
|||
func (cps *CwtchProfileStorage) GetChannelMessageByContentHash(conversation int, channel int, hash string) (int, error) {
|
||||
channelID := ChannelID{Conversation: conversation, Channel: channel}
|
||||
|
||||
cps.mutex.Lock()
|
||||
defer cps.mutex.Unlock()
|
||||
_, exists := cps.channelGetMessageByContentHashStmts[channelID]
|
||||
if !exists {
|
||||
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMessageByContentHashFromConversationSQLStmt, conversation, channel))
|
||||
|
@ -587,6 +597,8 @@ func (cps *CwtchProfileStorage) GetChannelMessageByContentHash(conversation int,
|
|||
func (cps *CwtchProfileStorage) GetRowNumberByMessageID(conversation int, channel int, id int) (int, error) {
|
||||
channelID := ChannelID{Conversation: conversation, Channel: channel}
|
||||
|
||||
cps.mutex.Lock()
|
||||
defer cps.mutex.Unlock()
|
||||
_, exists := cps.channelRowNumberStmts[channelID]
|
||||
if !exists {
|
||||
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getLocalIndexOfMessageIDSQLStmt, conversation, channel))
|
||||
|
@ -626,6 +638,8 @@ func (cps *CwtchProfileStorage) GetRowNumberByMessageID(conversation int, channe
|
|||
func (cps *CwtchProfileStorage) GetChannelMessage(conversation int, channel int, messageID int) (string, model.Attributes, error) {
|
||||
channelID := ChannelID{Conversation: conversation, Channel: channel}
|
||||
|
||||
cps.mutex.Lock()
|
||||
defer cps.mutex.Unlock()
|
||||
_, exists := cps.channelGetMessageStmts[channelID]
|
||||
if !exists {
|
||||
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMessageFromConversationSQLStmt, conversation, channel))
|
||||
|
@ -666,6 +680,8 @@ func (cps *CwtchProfileStorage) GetChannelMessage(conversation int, channel int,
|
|||
func (cps *CwtchProfileStorage) GetChannelMessageCount(conversation int, channel int) (int, error) {
|
||||
channelID := ChannelID{Conversation: conversation, Channel: channel}
|
||||
|
||||
cps.mutex.Lock()
|
||||
defer cps.mutex.Unlock()
|
||||
_, exists := cps.channelGetCountStmts[channelID]
|
||||
if !exists {
|
||||
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMessageCountFromConversationSQLStmt, conversation, channel))
|
||||
|
@ -689,6 +705,8 @@ func (cps *CwtchProfileStorage) GetChannelMessageCount(conversation int, channel
|
|||
func (cps *CwtchProfileStorage) GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) {
|
||||
channelID := ChannelID{Conversation: conversation, Channel: channel}
|
||||
|
||||
cps.mutex.Lock()
|
||||
defer cps.mutex.Unlock()
|
||||
_, exists := cps.channelGetMostRecentMessagesStmts[channelID]
|
||||
if !exists {
|
||||
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMostRecentMessagesSQLStmt, conversation, channel))
|
||||
|
@ -754,6 +772,8 @@ func (cps *CwtchProfileStorage) PurgeNonSavedMessages() {
|
|||
|
||||
// Close closes the underlying database and prepared statements
|
||||
func (cps *CwtchProfileStorage) Close() {
|
||||
cps.mutex.Lock()
|
||||
defer cps.mutex.Unlock()
|
||||
if cps.db != nil {
|
||||
|
||||
cps.PurgeNonSavedMessages()
|
||||
|
@ -809,6 +829,8 @@ func (cps *CwtchProfileStorage) Delete() {
|
|||
// **note* this is technically a very dangerous API and should only be called after
|
||||
// checks on the current password and the derived new password.
|
||||
func (cps *CwtchProfileStorage) Rekey(newkey [32]byte) error {
|
||||
cps.mutex.Lock()
|
||||
defer cps.mutex.Unlock()
|
||||
// PRAGMA queries don't allow subs...
|
||||
_, err := cps.db.Exec(fmt.Sprintf(`PRAGMA rekey="x'%x'";`, newkey))
|
||||
return err
|
||||
|
|
Loading…
Reference in New Issue