Push locks back into storage to free up cwthc peer operations
continuous-integration/drone/pr Build is pending Details
continuous-integration/drone/push Build was killed Details

This commit is contained in:
Sarah Jamie Lewis 2022-08-26 13:54:48 -07:00
parent 720fb664de
commit 3d49511c6c
3 changed files with 37 additions and 51 deletions

View File

@ -172,8 +172,7 @@ func (cp *cwtchPeer) SendScopedZonedGetValToContact(conversationID int, scope at
// GetScopedZonedAttribute
// Status: Ready for 1.5
func (cp *cwtchPeer) GetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string) (string, bool) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
scopedZonedKey := scope.ConstructScopedZonedPath(zone.ConstructZonedPath(key))
value, err := cp.storage.LoadProfileKeyValue(TypeAttribute, scopedZonedKey.ToString())
@ -187,8 +186,7 @@ func (cp *cwtchPeer) GetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, k
// GetScopedZonedAttributes finds all keys associated with the given scope and zone
func (cp *cwtchPeer) GetScopedZonedAttributeKeys(scope attr.Scope, zone attr.Zone) ([]string, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
scopedZonedKey := scope.ConstructScopedZonedPath(zone.ConstructZonedPath(""))
keys, err := cp.storage.FindProfileKeysByPrefix(TypeAttribute, scopedZonedKey.ToString())
@ -202,8 +200,6 @@ func (cp *cwtchPeer) GetScopedZonedAttributeKeys(scope attr.Scope, zone attr.Zon
// SetScopedZonedAttribute
func (cp *cwtchPeer) SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string, value string) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
scopedZonedKey := scope.ConstructScopedZonedPath(zone.ConstructZonedPath(key))
@ -225,8 +221,6 @@ func (cp *cwtchPeer) SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, k
// If you try to send a message to a handle that doesn't exist, malformed or an incorrect type then
// this function will error
func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
// We assume we are sending to a Contact.
conversationInfo, err := cp.storage.GetConversation(conversation)
@ -518,8 +512,6 @@ func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessContr
// AcceptConversation looks up a conversation by `handle` and sets the Accepted status to `true`
// This will cause Cwtch to auto connect to this conversation on start up
func (cp *cwtchPeer) AcceptConversation(id int) error {
cp.mutex.Lock()
defer cp.mutex.Unlock()
err := cp.storage.AcceptConversation(id)
if err == nil {
// If a p2p conversation then attempt to peer with the onion...
@ -540,8 +532,6 @@ func (cp *cwtchPeer) AcceptConversation(id int) error {
// BlockConversation looks up a conversation by `handle` and sets the Blocked ACL field to `true`
// This will cause Cwtch to never try to connect to and refuse connections from the peer
func (cp *cwtchPeer) BlockConversation(id int) error {
cp.mutex.Lock()
defer cp.mutex.Unlock()
ci, err := cp.storage.GetConversation(id)
if err != nil {
return err
@ -563,8 +553,6 @@ func (cp *cwtchPeer) BlockConversation(id int) error {
// UnblockConversation looks up a conversation by `handle` and sets the Blocked ACL field to `true`
// Further actions depend on the Accepted field
func (cp *cwtchPeer) UnblockConversation(id int) error {
cp.mutex.Lock()
defer cp.mutex.Unlock()
ci, err := cp.storage.GetConversation(id)
if err != nil {
return err
@ -592,21 +580,15 @@ func (cp *cwtchPeer) sendUpdateAuth(id int, handle string, accepted bool, blocke
}
func (cp *cwtchPeer) FetchConversations() ([]*model.Conversation, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.storage.FetchConversations()
}
func (cp *cwtchPeer) GetConversationInfo(conversation int) (*model.Conversation, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.storage.GetConversation(conversation)
}
// FetchConversationInfo returns information about the given conversation referenced by the handle
func (cp *cwtchPeer) FetchConversationInfo(handle string) (*model.Conversation, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.storage.GetConversationByHandle(handle)
}
@ -624,15 +606,11 @@ func (cp *cwtchPeer) DeleteConversation(id int) error {
// SetConversationAttribute sets the conversation attribute at path to value
func (cp *cwtchPeer) SetConversationAttribute(id int, path attr.ScopedZonedPath, value string) error {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.storage.SetConversationAttribute(id, path, value)
}
// GetConversationAttribute is a shortcut method for retrieving the value of a given path
func (cp *cwtchPeer) GetConversationAttribute(id int, path attr.ScopedZonedPath) (string, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
ci, err := cp.storage.GetConversation(id)
if err != nil {
return "", err
@ -648,27 +626,21 @@ func (cp *cwtchPeer) GetConversationAttribute(id int, path attr.ScopedZonedPath)
// Note: This should note be used to index a list as the ID is not expected to be tied to absolute position
// in the table (e.g. deleted messages, expired messages, etc.)
func (cp *cwtchPeer) GetChannelMessage(conversation int, channel int, id int) (string, model.Attributes, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.storage.GetChannelMessage(conversation, channel, id)
}
// GetChannelMessageCount returns the absolute number of messages in a given conversation channel
func (cp *cwtchPeer) GetChannelMessageCount(conversation int, channel int) (int, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.storage.GetChannelMessageCount(conversation, channel)
}
// GetMostRecentMessages returns a selection of messages, ordered by most recently inserted
func (cp *cwtchPeer) GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.storage.GetMostRecentMessages(conversation, channel, offset, limit)
}
// UpdateMessageAttribute sets a given key/value attribute on the message in the given conversation/channel
// errors if the message doesn't exist, or for underlying daabase issues.
// errors if the message doesn't exist, or for underlying database issues.
func (cp *cwtchPeer) UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error {
_, attr, err := cp.GetChannelMessage(conversation, channel, id)
if err == nil {
@ -759,8 +731,8 @@ func (cp *cwtchPeer) AddServer(serverSpecification string) (string, error) {
// we haven't seen this key associated with the server before
}
// // If we have gotten to this point we can assume this is a safe key bundle signed by the
// // server with no conflicting keys. So we are going to save all the keys
// // If we have gotten to this point we can assume this is a safe key bundle signed by the
// // server with no conflicting keys. So we are going to save all the keys
for k, v := range ab {
cp.SetConversationAttribute(conversationInfo.ID, attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(k)), v)
}
@ -1036,8 +1008,6 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time)
return -1, err
}
}
cp.mutex.Lock()
defer cp.mutex.Unlock()
// Generate a random number and use it as the signature
signature := event.GetRandNumber().String()
@ -1355,19 +1325,17 @@ func (cp *cwtchPeer) attemptInsertOrAcknowledgeLegacyGroupConversation(conversat
if err == nil && attr[constants.AttrAck] != constants.True {
cp.mutex.Lock()
attr[constants.AttrAck] = constants.True
cp.storage.UpdateMessageAttributes(conversationID, 0, messageID, attr)
cp.mutex.Unlock()
cp.storage.UpdateMessageAttributes(conversationID, 0, messageID, attr)
cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.Index: strconv.Itoa(messageID)}))
return nil
}
} else {
cp.mutex.Lock()
contenthash := model.CalculateContentHash(dm.Onion, dm.Text)
id, err := cp.storage.InsertMessage(conversationID, 0, dm.Text, model.Attributes{constants.AttrAck: constants.True, "PreviousSignature": base64.StdEncoding.EncodeToString(dm.PreviousMessageSig), constants.AttrAuthor: dm.Onion, constants.AttrSentTimestamp: time.Unix(int64(dm.Timestamp), 0).Format(time.RFC3339Nano)}, signature, contenthash)
if err == nil {
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.TimestampSent: time.Unix(int64(dm.Timestamp), 0).Format(time.RFC3339Nano), event.RemotePeer: dm.Onion, event.Index: strconv.Itoa(id), event.Data: dm.Text, event.ContentHash: contenthash}))
}
cp.mutex.Unlock()
return err
}
return err
@ -1387,8 +1355,8 @@ func (cp *cwtchPeer) attemptAcknowledgeP2PConversation(handle string, signature
if err == nil {
cp.mutex.Lock()
attr[constants.AttrAck] = constants.True
cp.storage.UpdateMessageAttributes(ci.ID, 0, id, attr)
cp.mutex.Unlock()
cp.storage.UpdateMessageAttributes(ci.ID, 0, id, attr)
cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: handle, event.Index: strconv.Itoa(id)}))
return nil
}
@ -1427,14 +1395,10 @@ func (cp *cwtchPeer) attemptErrorConversationMessage(handle string, signature st
}
func (cp *cwtchPeer) GetChannelMessageBySignature(conversationID int, channelID int, signature string) (int, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.storage.GetChannelMessageBySignature(conversationID, channelID, signature)
}
func (cp *cwtchPeer) GetChannelMessageByContentHash(conversationID int, channelID int, contenthash string) (int, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
messageID, err := cp.storage.GetChannelMessageByContentHash(conversationID, channelID, contenthash)
if err == nil {
return cp.storage.GetRowNumberByMessageID(conversationID, channelID, messageID)

View File

@ -14,6 +14,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
)
// StorageKeyType is an interface wrapper around storage key types
@ -35,6 +36,7 @@ const (
type CwtchProfileStorage struct {
// Note: Statements are thread safe..
mutex sync.Mutex
// Profile related statements
insertProfileKeyValueStmt *sql.Stmt
@ -463,6 +465,8 @@ func (cps *CwtchProfileStorage) InsertMessage(conversation int, channel int, bod
channelID := ChannelID{Conversation: conversation, Channel: channel}
cps.mutex.Lock()
defer cps.mutex.Unlock()
_, exists := cps.channelInsertStmts[channelID]
if !exists {
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(insertMessageIntoConversationSQLStmt, conversation, channel))
@ -488,6 +492,8 @@ func (cps *CwtchProfileStorage) UpdateMessageAttributes(conversation int, channe
channelID := ChannelID{Conversation: conversation, Channel: channel}
cps.mutex.Lock()
defer cps.mutex.Unlock()
_, exists := cps.channelUpdateMessageStmts[channelID]
if !exists {
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(updateMessageIntoConversationSQLStmt, conversation, channel))
@ -512,6 +518,8 @@ func (cps *CwtchProfileStorage) UpdateMessageAttributes(conversation int, channe
func (cps *CwtchProfileStorage) GetChannelMessageBySignature(conversation int, channel int, signature string) (int, error) {
channelID := ChannelID{Conversation: conversation, Channel: channel}
cps.mutex.Lock()
defer cps.mutex.Unlock()
_, exists := cps.channelGetMessageBySignatureStmts[channelID]
if !exists {
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMessageBySignatureFromConversationSQLStmt, conversation, channel))
@ -549,6 +557,8 @@ func (cps *CwtchProfileStorage) GetChannelMessageBySignature(conversation int, c
func (cps *CwtchProfileStorage) GetChannelMessageByContentHash(conversation int, channel int, hash string) (int, error) {
channelID := ChannelID{Conversation: conversation, Channel: channel}
cps.mutex.Lock()
defer cps.mutex.Unlock()
_, exists := cps.channelGetMessageByContentHashStmts[channelID]
if !exists {
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMessageByContentHashFromConversationSQLStmt, conversation, channel))
@ -587,6 +597,8 @@ func (cps *CwtchProfileStorage) GetChannelMessageByContentHash(conversation int,
func (cps *CwtchProfileStorage) GetRowNumberByMessageID(conversation int, channel int, id int) (int, error) {
channelID := ChannelID{Conversation: conversation, Channel: channel}
cps.mutex.Lock()
defer cps.mutex.Unlock()
_, exists := cps.channelRowNumberStmts[channelID]
if !exists {
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getLocalIndexOfMessageIDSQLStmt, conversation, channel))
@ -626,6 +638,8 @@ func (cps *CwtchProfileStorage) GetRowNumberByMessageID(conversation int, channe
func (cps *CwtchProfileStorage) GetChannelMessage(conversation int, channel int, messageID int) (string, model.Attributes, error) {
channelID := ChannelID{Conversation: conversation, Channel: channel}
cps.mutex.Lock()
defer cps.mutex.Unlock()
_, exists := cps.channelGetMessageStmts[channelID]
if !exists {
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMessageFromConversationSQLStmt, conversation, channel))
@ -666,6 +680,8 @@ func (cps *CwtchProfileStorage) GetChannelMessage(conversation int, channel int,
func (cps *CwtchProfileStorage) GetChannelMessageCount(conversation int, channel int) (int, error) {
channelID := ChannelID{Conversation: conversation, Channel: channel}
cps.mutex.Lock()
defer cps.mutex.Unlock()
_, exists := cps.channelGetCountStmts[channelID]
if !exists {
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMessageCountFromConversationSQLStmt, conversation, channel))
@ -689,6 +705,8 @@ func (cps *CwtchProfileStorage) GetChannelMessageCount(conversation int, channel
func (cps *CwtchProfileStorage) GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) {
channelID := ChannelID{Conversation: conversation, Channel: channel}
cps.mutex.Lock()
defer cps.mutex.Unlock()
_, exists := cps.channelGetMostRecentMessagesStmts[channelID]
if !exists {
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMostRecentMessagesSQLStmt, conversation, channel))
@ -754,6 +772,8 @@ func (cps *CwtchProfileStorage) PurgeNonSavedMessages() {
// Close closes the underlying database and prepared statements
func (cps *CwtchProfileStorage) Close() {
cps.mutex.Lock()
defer cps.mutex.Unlock()
if cps.db != nil {
cps.PurgeNonSavedMessages()
@ -809,6 +829,8 @@ func (cps *CwtchProfileStorage) Delete() {
// **note* this is technically a very dangerous API and should only be called after
// checks on the current password and the derived new password.
func (cps *CwtchProfileStorage) Rekey(newkey [32]byte) error {
cps.mutex.Lock()
defer cps.mutex.Unlock()
// PRAGMA queries don't allow subs...
_, err := cps.db.Exec(fmt.Sprintf(`PRAGMA rekey="x'%x'";`, newkey))
return err

View File

@ -418,15 +418,15 @@ func (e *engine) peerAuthed(onion string) {
//details, err := e.acn.GetInfo(onion)
//if err == nil {
// if hops, exists := details["circuit"]; exists {
// e.eventManager.Publish(event.NewEvent(event.ACNInfo, map[event.Field]string{
// event.Handle: onion,
// event.Key: "circuit",
// event.Data: hops,
// }))
// }
// if hops, exists := details["circuit"]; exists {
// e.eventManager.Publish(event.NewEvent(event.ACNInfo, map[event.Field]string{
// event.Handle: onion,
// event.Key: "circuit",
// event.Data: hops,
// }))
// }
//} else {
// log.Errorf("error getting info for onion %v", err)
// log.Errorf("error getting info for onion %v", err)
//}
e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{