Extract Register On New Group Message Functions
This commit is contained in:
parent
0aa0c94af7
commit
74698a0e00
|
@ -5,6 +5,7 @@ import (
|
||||||
"cwtch.im/cwtch/model/attr"
|
"cwtch.im/cwtch/model/attr"
|
||||||
"cwtch.im/cwtch/model/constants"
|
"cwtch.im/cwtch/model/constants"
|
||||||
"cwtch.im/cwtch/peer"
|
"cwtch.im/cwtch/peer"
|
||||||
|
"cwtch.im/cwtch/protocol/groups"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -23,6 +24,29 @@ func FunctionalityGate(experimentMap map[string]bool) (*Functionality, error) {
|
||||||
return nil, errors.New("groupmanagement is not enabled")
|
return nil, errors.New("groupmanagement is not enabled")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *Functionality) Init(cp peer.CwtchPeer) {
|
||||||
|
cp.RegisterOnNewGroupMessage(func(conversationID int, dgm groups.DecryptedGroupMessage, storage *peer.CwtchProfileStorage) {
|
||||||
|
/// Time to Handle any Meta-Actions
|
||||||
|
overlayMessage := new(model.MessageWrapper)
|
||||||
|
err := json.Unmarshal([]byte(dgm.Text), overlayMessage)
|
||||||
|
if err == nil {
|
||||||
|
if overlayMessage.Overlay == model.OverlayGroupManagement {
|
||||||
|
ci, _ := storage.GetConversation(conversationID)
|
||||||
|
// NOTE: this is safe because dm.Onion will only be valid if there is a valid signature
|
||||||
|
// from the onion on the message. As such we don't re-verify the signature here.
|
||||||
|
acl, exists := ci.ACL[dgm.Onion]
|
||||||
|
if exists && acl.ManageACL {
|
||||||
|
newACL := new(model.AccessControlList)
|
||||||
|
err := json.Unmarshal([]byte(overlayMessage.Data), newACL)
|
||||||
|
if err == nil {
|
||||||
|
storage.SetConversationACL(conversationID, *newACL)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// CreateManagedGroup is a convenience function for creating a new managed group
|
// CreateManagedGroup is a convenience function for creating a new managed group
|
||||||
func (f *Functionality) CreateManagedGroup(cp peer.CwtchPeer, name string, server string) (int, error) {
|
func (f *Functionality) CreateManagedGroup(cp peer.CwtchPeer, name string, server string) (int, error) {
|
||||||
return cp.StartGroup(name, server)
|
return cp.StartGroup(name, server)
|
||||||
|
|
|
@ -68,10 +68,25 @@ type cwtchPeer struct {
|
||||||
|
|
||||||
state map[string]connections.ConnectionState
|
state map[string]connections.ConnectionState
|
||||||
|
|
||||||
|
onNewPeerMessageFunctions []func(conversationID int, message string, storage *CwtchProfileStorage)
|
||||||
|
onNewGroupMessageFunctions []func(conversationID int, dgm groups.DecryptedGroupMessage, storage *CwtchProfileStorage)
|
||||||
|
|
||||||
queue event.Queue
|
queue event.Queue
|
||||||
eventBus event.Manager
|
eventBus event.Manager
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cp *cwtchPeer) RegisterOnNewPeerMessage(f func(conversationID int, message string, storage *CwtchProfileStorage)) {
|
||||||
|
cp.mutex.Lock()
|
||||||
|
defer cp.mutex.Unlock()
|
||||||
|
cp.onNewPeerMessageFunctions = append(cp.onNewPeerMessageFunctions, f)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cp *cwtchPeer) RegisterOnNewGroupMessage(f func(conversationID int, dgm groups.DecryptedGroupMessage, storage *CwtchProfileStorage)) {
|
||||||
|
cp.mutex.Lock()
|
||||||
|
defer cp.mutex.Unlock()
|
||||||
|
cp.onNewGroupMessageFunctions = append(cp.onNewGroupMessageFunctions, f)
|
||||||
|
}
|
||||||
|
|
||||||
func (cp *cwtchPeer) DoStorageTransaction(f func(storage *CwtchProfileStorage) error) error {
|
func (cp *cwtchPeer) DoStorageTransaction(f func(storage *CwtchProfileStorage) error) error {
|
||||||
cp.mutex.Lock()
|
cp.mutex.Lock()
|
||||||
defer cp.mutex.Unlock()
|
defer cp.mutex.Unlock()
|
||||||
|
@ -542,24 +557,6 @@ func (cp *cwtchPeer) BlockConversation(id int) error {
|
||||||
return cp.storage.SetConversationACL(id, ci.ACL)
|
return cp.storage.SetConversationACL(id, ci.ACL)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *cwtchPeer) AddMember(id int, handle string, ac model.AccessControl) error {
|
|
||||||
cp.mutex.Lock()
|
|
||||||
defer cp.mutex.Unlock()
|
|
||||||
ci, err := cp.storage.GetConversation(id)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
requesterHandle, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
|
|
||||||
|
|
||||||
if ci.ACL[string(requesterHandle)].ManageACL {
|
|
||||||
aclCopy := ci.ACL
|
|
||||||
aclCopy[handle] = ac
|
|
||||||
cp.storage.SetConversationACL(id, ci.ACL)
|
|
||||||
}
|
|
||||||
return fmt.Errorf("unable to add a member to the conversation")
|
|
||||||
}
|
|
||||||
|
|
||||||
// UnblockConversation looks up a conversation by `handle` and sets the Blocked ACL field to `true`
|
// UnblockConversation looks up a conversation by `handle` and sets the Blocked ACL field to `true`
|
||||||
// Further actions depend on the Accepted field
|
// Further actions depend on the Accepted field
|
||||||
func (cp *cwtchPeer) UnblockConversation(id int) error {
|
func (cp *cwtchPeer) UnblockConversation(id int) error {
|
||||||
|
@ -1356,23 +1353,8 @@ func (cp *cwtchPeer) attemptInsertOrAcknowledgeLegacyGroupConversation(conversat
|
||||||
id, err := cp.storage.InsertMessage(conversationID, 0, dm.Text, model.Attributes{constants.AttrAck: constants.True, "PreviousSignature": base64.StdEncoding.EncodeToString(dm.PreviousMessageSig), constants.AttrAuthor: dm.Onion, constants.AttrSentTimestamp: time.Unix(int64(dm.Timestamp), 0).Format(time.RFC3339Nano)}, signature, contenthash)
|
id, err := cp.storage.InsertMessage(conversationID, 0, dm.Text, model.Attributes{constants.AttrAck: constants.True, "PreviousSignature": base64.StdEncoding.EncodeToString(dm.PreviousMessageSig), constants.AttrAuthor: dm.Onion, constants.AttrSentTimestamp: time.Unix(int64(dm.Timestamp), 0).Format(time.RFC3339Nano)}, signature, contenthash)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
|
||||||
/// Time to Handle any Meta-Actions
|
for _, f := range cp.onNewGroupMessageFunctions {
|
||||||
overlayMessage := new(model.MessageWrapper)
|
f(id, *dm, cp.storage)
|
||||||
err := json.Unmarshal([]byte(dm.Text), overlayMessage)
|
|
||||||
if err == nil {
|
|
||||||
if overlayMessage.Overlay == model.OverlayGroupManagement {
|
|
||||||
ci, _ := cp.storage.GetConversation(conversationID)
|
|
||||||
// NOTE: this is safe because dm.Onion will only be valid if there is a valid signature
|
|
||||||
// from the onion on the message. As such we don't re-verify the signature here.
|
|
||||||
acl, exists := ci.ACL[dm.Onion]
|
|
||||||
if exists && acl.ManageACL {
|
|
||||||
newACL := new(model.AccessControlList)
|
|
||||||
err := json.Unmarshal([]byte(overlayMessage.Data), newACL)
|
|
||||||
if err == nil {
|
|
||||||
cp.storage.SetConversationACL(conversationID, *newACL)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.TimestampSent: time.Unix(int64(dm.Timestamp), 0).Format(time.RFC3339Nano), event.RemotePeer: dm.Onion, event.Index: strconv.Itoa(id), event.Data: dm.Text, event.ContentHash: contenthash}))
|
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.TimestampSent: time.Unix(int64(dm.Timestamp), 0).Format(time.RFC3339Nano), event.RemotePeer: dm.Onion, event.Index: strconv.Itoa(id), event.Data: dm.Text, event.ContentHash: contenthash}))
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"cwtch.im/cwtch/model"
|
"cwtch.im/cwtch/model"
|
||||||
"cwtch.im/cwtch/model/attr"
|
"cwtch.im/cwtch/model/attr"
|
||||||
"cwtch.im/cwtch/protocol/connections"
|
"cwtch.im/cwtch/protocol/connections"
|
||||||
|
"cwtch.im/cwtch/protocol/groups"
|
||||||
"git.openprivacy.ca/openprivacy/connectivity"
|
"git.openprivacy.ca/openprivacy/connectivity"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -106,10 +107,6 @@ type CwtchPeer interface {
|
||||||
GetConversationAttribute(conversation int, path attr.ScopedZonedPath) (string, error)
|
GetConversationAttribute(conversation int, path attr.ScopedZonedPath) (string, error)
|
||||||
DeleteConversation(conversation int) error
|
DeleteConversation(conversation int) error
|
||||||
|
|
||||||
AddMember(conversation int, handle string, ac model.AccessControl) error
|
|
||||||
|
|
||||||
DoStorageTransaction(func(storage *CwtchProfileStorage) error) error
|
|
||||||
|
|
||||||
// New Unified Conversation Channel Interfaces
|
// New Unified Conversation Channel Interfaces
|
||||||
GetChannelMessage(conversation int, channel int, id int) (string, model.Attributes, error)
|
GetChannelMessage(conversation int, channel int, id int) (string, model.Attributes, error)
|
||||||
GetChannelMessageCount(conversation int, channel int) (int, error)
|
GetChannelMessageCount(conversation int, channel int) (int, error)
|
||||||
|
@ -122,4 +119,11 @@ type CwtchPeer interface {
|
||||||
ChangePassword(oldpassword string, newpassword string, newpasswordAgain string) error
|
ChangePassword(oldpassword string, newpassword string, newpasswordAgain string) error
|
||||||
Export(file string) error
|
Export(file string) error
|
||||||
Delete()
|
Delete()
|
||||||
|
|
||||||
|
// DoStorageTransaction is a low level API for building new functionality on top of Cwtch Peer
|
||||||
|
// in general you do not want to use this.
|
||||||
|
DoStorageTransaction(func(storage *CwtchProfileStorage) error) error
|
||||||
|
|
||||||
|
RegisterOnNewPeerMessage(func(conversationID int, message string, storage *CwtchProfileStorage))
|
||||||
|
RegisterOnNewGroupMessage(func(conversationID int, dgm groups.DecryptedGroupMessage, storage *CwtchProfileStorage))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue