From 3d49511c6c79cc8094efd7b05f6379e36a17ba19 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Fri, 26 Aug 2022 13:54:48 -0700 Subject: [PATCH] Push locks back into storage to free up cwthc peer operations --- peer/cwtch_peer.go | 50 +++++----------------------------- peer/cwtchprofilestorage.go | 22 +++++++++++++++ protocol/connections/engine.go | 16 +++++------ 3 files changed, 37 insertions(+), 51 deletions(-) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index a0d5498..f38a1ff 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -172,8 +172,7 @@ func (cp *cwtchPeer) SendScopedZonedGetValToContact(conversationID int, scope at // GetScopedZonedAttribute // Status: Ready for 1.5 func (cp *cwtchPeer) GetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string) (string, bool) { - cp.mutex.Lock() - defer cp.mutex.Unlock() + scopedZonedKey := scope.ConstructScopedZonedPath(zone.ConstructZonedPath(key)) value, err := cp.storage.LoadProfileKeyValue(TypeAttribute, scopedZonedKey.ToString()) @@ -187,8 +186,7 @@ func (cp *cwtchPeer) GetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, k // GetScopedZonedAttributes finds all keys associated with the given scope and zone func (cp *cwtchPeer) GetScopedZonedAttributeKeys(scope attr.Scope, zone attr.Zone) ([]string, error) { - cp.mutex.Lock() - defer cp.mutex.Unlock() + scopedZonedKey := scope.ConstructScopedZonedPath(zone.ConstructZonedPath("")) keys, err := cp.storage.FindProfileKeysByPrefix(TypeAttribute, scopedZonedKey.ToString()) @@ -202,8 +200,6 @@ func (cp *cwtchPeer) GetScopedZonedAttributeKeys(scope attr.Scope, zone attr.Zon // SetScopedZonedAttribute func (cp *cwtchPeer) SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string, value string) { - cp.mutex.Lock() - defer cp.mutex.Unlock() scopedZonedKey := scope.ConstructScopedZonedPath(zone.ConstructZonedPath(key)) @@ -225,8 +221,6 @@ func (cp *cwtchPeer) SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, k // If you try to send a message to a handle that doesn't exist, malformed or an incorrect type then // this function will error func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error) { - cp.mutex.Lock() - defer cp.mutex.Unlock() // We assume we are sending to a Contact. conversationInfo, err := cp.storage.GetConversation(conversation) @@ -518,8 +512,6 @@ func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessContr // AcceptConversation looks up a conversation by `handle` and sets the Accepted status to `true` // This will cause Cwtch to auto connect to this conversation on start up func (cp *cwtchPeer) AcceptConversation(id int) error { - cp.mutex.Lock() - defer cp.mutex.Unlock() err := cp.storage.AcceptConversation(id) if err == nil { // If a p2p conversation then attempt to peer with the onion... @@ -540,8 +532,6 @@ func (cp *cwtchPeer) AcceptConversation(id int) error { // BlockConversation looks up a conversation by `handle` and sets the Blocked ACL field to `true` // This will cause Cwtch to never try to connect to and refuse connections from the peer func (cp *cwtchPeer) BlockConversation(id int) error { - cp.mutex.Lock() - defer cp.mutex.Unlock() ci, err := cp.storage.GetConversation(id) if err != nil { return err @@ -563,8 +553,6 @@ func (cp *cwtchPeer) BlockConversation(id int) error { // UnblockConversation looks up a conversation by `handle` and sets the Blocked ACL field to `true` // Further actions depend on the Accepted field func (cp *cwtchPeer) UnblockConversation(id int) error { - cp.mutex.Lock() - defer cp.mutex.Unlock() ci, err := cp.storage.GetConversation(id) if err != nil { return err @@ -592,21 +580,15 @@ func (cp *cwtchPeer) sendUpdateAuth(id int, handle string, accepted bool, blocke } func (cp *cwtchPeer) FetchConversations() ([]*model.Conversation, error) { - cp.mutex.Lock() - defer cp.mutex.Unlock() return cp.storage.FetchConversations() } func (cp *cwtchPeer) GetConversationInfo(conversation int) (*model.Conversation, error) { - cp.mutex.Lock() - defer cp.mutex.Unlock() return cp.storage.GetConversation(conversation) } // FetchConversationInfo returns information about the given conversation referenced by the handle func (cp *cwtchPeer) FetchConversationInfo(handle string) (*model.Conversation, error) { - cp.mutex.Lock() - defer cp.mutex.Unlock() return cp.storage.GetConversationByHandle(handle) } @@ -624,15 +606,11 @@ func (cp *cwtchPeer) DeleteConversation(id int) error { // SetConversationAttribute sets the conversation attribute at path to value func (cp *cwtchPeer) SetConversationAttribute(id int, path attr.ScopedZonedPath, value string) error { - cp.mutex.Lock() - defer cp.mutex.Unlock() return cp.storage.SetConversationAttribute(id, path, value) } // GetConversationAttribute is a shortcut method for retrieving the value of a given path func (cp *cwtchPeer) GetConversationAttribute(id int, path attr.ScopedZonedPath) (string, error) { - cp.mutex.Lock() - defer cp.mutex.Unlock() ci, err := cp.storage.GetConversation(id) if err != nil { return "", err @@ -648,27 +626,21 @@ func (cp *cwtchPeer) GetConversationAttribute(id int, path attr.ScopedZonedPath) // Note: This should note be used to index a list as the ID is not expected to be tied to absolute position // in the table (e.g. deleted messages, expired messages, etc.) func (cp *cwtchPeer) GetChannelMessage(conversation int, channel int, id int) (string, model.Attributes, error) { - cp.mutex.Lock() - defer cp.mutex.Unlock() return cp.storage.GetChannelMessage(conversation, channel, id) } // GetChannelMessageCount returns the absolute number of messages in a given conversation channel func (cp *cwtchPeer) GetChannelMessageCount(conversation int, channel int) (int, error) { - cp.mutex.Lock() - defer cp.mutex.Unlock() return cp.storage.GetChannelMessageCount(conversation, channel) } // GetMostRecentMessages returns a selection of messages, ordered by most recently inserted func (cp *cwtchPeer) GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) { - cp.mutex.Lock() - defer cp.mutex.Unlock() return cp.storage.GetMostRecentMessages(conversation, channel, offset, limit) } // UpdateMessageAttribute sets a given key/value attribute on the message in the given conversation/channel -// errors if the message doesn't exist, or for underlying daabase issues. +// errors if the message doesn't exist, or for underlying database issues. func (cp *cwtchPeer) UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error { _, attr, err := cp.GetChannelMessage(conversation, channel, id) if err == nil { @@ -759,8 +731,8 @@ func (cp *cwtchPeer) AddServer(serverSpecification string) (string, error) { // we haven't seen this key associated with the server before } - // // If we have gotten to this point we can assume this is a safe key bundle signed by the - // // server with no conflicting keys. So we are going to save all the keys + // // If we have gotten to this point we can assume this is a safe key bundle signed by the + // // server with no conflicting keys. So we are going to save all the keys for k, v := range ab { cp.SetConversationAttribute(conversationInfo.ID, attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(k)), v) } @@ -1036,8 +1008,6 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time) return -1, err } } - cp.mutex.Lock() - defer cp.mutex.Unlock() // Generate a random number and use it as the signature signature := event.GetRandNumber().String() @@ -1355,19 +1325,17 @@ func (cp *cwtchPeer) attemptInsertOrAcknowledgeLegacyGroupConversation(conversat if err == nil && attr[constants.AttrAck] != constants.True { cp.mutex.Lock() attr[constants.AttrAck] = constants.True - cp.storage.UpdateMessageAttributes(conversationID, 0, messageID, attr) cp.mutex.Unlock() + cp.storage.UpdateMessageAttributes(conversationID, 0, messageID, attr) cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.Index: strconv.Itoa(messageID)})) return nil } } else { - cp.mutex.Lock() contenthash := model.CalculateContentHash(dm.Onion, dm.Text) id, err := cp.storage.InsertMessage(conversationID, 0, dm.Text, model.Attributes{constants.AttrAck: constants.True, "PreviousSignature": base64.StdEncoding.EncodeToString(dm.PreviousMessageSig), constants.AttrAuthor: dm.Onion, constants.AttrSentTimestamp: time.Unix(int64(dm.Timestamp), 0).Format(time.RFC3339Nano)}, signature, contenthash) if err == nil { cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.TimestampSent: time.Unix(int64(dm.Timestamp), 0).Format(time.RFC3339Nano), event.RemotePeer: dm.Onion, event.Index: strconv.Itoa(id), event.Data: dm.Text, event.ContentHash: contenthash})) } - cp.mutex.Unlock() return err } return err @@ -1387,8 +1355,8 @@ func (cp *cwtchPeer) attemptAcknowledgeP2PConversation(handle string, signature if err == nil { cp.mutex.Lock() attr[constants.AttrAck] = constants.True - cp.storage.UpdateMessageAttributes(ci.ID, 0, id, attr) cp.mutex.Unlock() + cp.storage.UpdateMessageAttributes(ci.ID, 0, id, attr) cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: handle, event.Index: strconv.Itoa(id)})) return nil } @@ -1427,14 +1395,10 @@ func (cp *cwtchPeer) attemptErrorConversationMessage(handle string, signature st } func (cp *cwtchPeer) GetChannelMessageBySignature(conversationID int, channelID int, signature string) (int, error) { - cp.mutex.Lock() - defer cp.mutex.Unlock() return cp.storage.GetChannelMessageBySignature(conversationID, channelID, signature) } func (cp *cwtchPeer) GetChannelMessageByContentHash(conversationID int, channelID int, contenthash string) (int, error) { - cp.mutex.Lock() - defer cp.mutex.Unlock() messageID, err := cp.storage.GetChannelMessageByContentHash(conversationID, channelID, contenthash) if err == nil { return cp.storage.GetRowNumberByMessageID(conversationID, channelID, messageID) diff --git a/peer/cwtchprofilestorage.go b/peer/cwtchprofilestorage.go index b9512c2..3ba6b81 100644 --- a/peer/cwtchprofilestorage.go +++ b/peer/cwtchprofilestorage.go @@ -14,6 +14,7 @@ import ( "os" "path/filepath" "strings" + "sync" ) // StorageKeyType is an interface wrapper around storage key types @@ -35,6 +36,7 @@ const ( type CwtchProfileStorage struct { // Note: Statements are thread safe.. + mutex sync.Mutex // Profile related statements insertProfileKeyValueStmt *sql.Stmt @@ -463,6 +465,8 @@ func (cps *CwtchProfileStorage) InsertMessage(conversation int, channel int, bod channelID := ChannelID{Conversation: conversation, Channel: channel} + cps.mutex.Lock() + defer cps.mutex.Unlock() _, exists := cps.channelInsertStmts[channelID] if !exists { conversationStmt, err := cps.db.Prepare(fmt.Sprintf(insertMessageIntoConversationSQLStmt, conversation, channel)) @@ -488,6 +492,8 @@ func (cps *CwtchProfileStorage) UpdateMessageAttributes(conversation int, channe channelID := ChannelID{Conversation: conversation, Channel: channel} + cps.mutex.Lock() + defer cps.mutex.Unlock() _, exists := cps.channelUpdateMessageStmts[channelID] if !exists { conversationStmt, err := cps.db.Prepare(fmt.Sprintf(updateMessageIntoConversationSQLStmt, conversation, channel)) @@ -512,6 +518,8 @@ func (cps *CwtchProfileStorage) UpdateMessageAttributes(conversation int, channe func (cps *CwtchProfileStorage) GetChannelMessageBySignature(conversation int, channel int, signature string) (int, error) { channelID := ChannelID{Conversation: conversation, Channel: channel} + cps.mutex.Lock() + defer cps.mutex.Unlock() _, exists := cps.channelGetMessageBySignatureStmts[channelID] if !exists { conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMessageBySignatureFromConversationSQLStmt, conversation, channel)) @@ -549,6 +557,8 @@ func (cps *CwtchProfileStorage) GetChannelMessageBySignature(conversation int, c func (cps *CwtchProfileStorage) GetChannelMessageByContentHash(conversation int, channel int, hash string) (int, error) { channelID := ChannelID{Conversation: conversation, Channel: channel} + cps.mutex.Lock() + defer cps.mutex.Unlock() _, exists := cps.channelGetMessageByContentHashStmts[channelID] if !exists { conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMessageByContentHashFromConversationSQLStmt, conversation, channel)) @@ -587,6 +597,8 @@ func (cps *CwtchProfileStorage) GetChannelMessageByContentHash(conversation int, func (cps *CwtchProfileStorage) GetRowNumberByMessageID(conversation int, channel int, id int) (int, error) { channelID := ChannelID{Conversation: conversation, Channel: channel} + cps.mutex.Lock() + defer cps.mutex.Unlock() _, exists := cps.channelRowNumberStmts[channelID] if !exists { conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getLocalIndexOfMessageIDSQLStmt, conversation, channel)) @@ -626,6 +638,8 @@ func (cps *CwtchProfileStorage) GetRowNumberByMessageID(conversation int, channe func (cps *CwtchProfileStorage) GetChannelMessage(conversation int, channel int, messageID int) (string, model.Attributes, error) { channelID := ChannelID{Conversation: conversation, Channel: channel} + cps.mutex.Lock() + defer cps.mutex.Unlock() _, exists := cps.channelGetMessageStmts[channelID] if !exists { conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMessageFromConversationSQLStmt, conversation, channel)) @@ -666,6 +680,8 @@ func (cps *CwtchProfileStorage) GetChannelMessage(conversation int, channel int, func (cps *CwtchProfileStorage) GetChannelMessageCount(conversation int, channel int) (int, error) { channelID := ChannelID{Conversation: conversation, Channel: channel} + cps.mutex.Lock() + defer cps.mutex.Unlock() _, exists := cps.channelGetCountStmts[channelID] if !exists { conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMessageCountFromConversationSQLStmt, conversation, channel)) @@ -689,6 +705,8 @@ func (cps *CwtchProfileStorage) GetChannelMessageCount(conversation int, channel func (cps *CwtchProfileStorage) GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) { channelID := ChannelID{Conversation: conversation, Channel: channel} + cps.mutex.Lock() + defer cps.mutex.Unlock() _, exists := cps.channelGetMostRecentMessagesStmts[channelID] if !exists { conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMostRecentMessagesSQLStmt, conversation, channel)) @@ -754,6 +772,8 @@ func (cps *CwtchProfileStorage) PurgeNonSavedMessages() { // Close closes the underlying database and prepared statements func (cps *CwtchProfileStorage) Close() { + cps.mutex.Lock() + defer cps.mutex.Unlock() if cps.db != nil { cps.PurgeNonSavedMessages() @@ -809,6 +829,8 @@ func (cps *CwtchProfileStorage) Delete() { // **note* this is technically a very dangerous API and should only be called after // checks on the current password and the derived new password. func (cps *CwtchProfileStorage) Rekey(newkey [32]byte) error { + cps.mutex.Lock() + defer cps.mutex.Unlock() // PRAGMA queries don't allow subs... _, err := cps.db.Exec(fmt.Sprintf(`PRAGMA rekey="x'%x'";`, newkey)) return err diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 44d6e4e..f112c5c 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -418,15 +418,15 @@ func (e *engine) peerAuthed(onion string) { //details, err := e.acn.GetInfo(onion) //if err == nil { - // if hops, exists := details["circuit"]; exists { - // e.eventManager.Publish(event.NewEvent(event.ACNInfo, map[event.Field]string{ - // event.Handle: onion, - // event.Key: "circuit", - // event.Data: hops, - // })) - // } + // if hops, exists := details["circuit"]; exists { + // e.eventManager.Publish(event.NewEvent(event.ACNInfo, map[event.Field]string{ + // event.Handle: onion, + // event.Key: "circuit", + // event.Data: hops, + // })) + // } //} else { - // log.Errorf("error getting info for onion %v", err) + // log.Errorf("error getting info for onion %v", err) //} e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{