Browse Source

fix logic arroudn accept/block contact and add unblock support

fixAcceptBlock
Dan Ballard 3 weeks ago
parent
commit
830e479539
  1. 9
      event/common.go
  2. 6
      model/conversation.go
  3. 39
      peer/cwtch_peer.go
  4. 2
      peer/cwtchprofilestorage.go
  5. 1
      peer/profile_interface.go
  6. 13
      protocol/connections/engine.go

9
event/common.go

@ -25,8 +25,10 @@ const (
RetryServerRequest = Type("RetryServerRequest")
// RemotePeer
// Authorization(model.peer.Auth_...)
SetPeerAuthorization = Type("UpdatePeerAuthorization")
// ConversationID
// Accepted
// Blocked
UpdateConversationAuthorization = Type("UpdateConversationAuthorization")
// Turn on/off blocking of unknown peers (if peers aren't in the contact list then they will be autoblocked
BlockUnknownPeers = Type("BlockUnknownPeers")
@ -263,7 +265,8 @@ const (
// Flags denotes a set of message flags
Flags = Field("Flags")
Authorization = Field("Authorization")
Accepted = Field("Accepted")
Blocked = Field("Blocked")
KeyBundle = Field("KeyBundle")

6
model/conversation.go

@ -13,6 +13,12 @@ type AccessControl struct {
Append bool // Allows a handle to append new messages to the conversation
}
// Serialize transforms the AccessControl into json.
func (ac *AccessControl) Serialize() []byte {
data, _ := json.Marshal(ac)
return data
}
// DefaultP2PAccessControl - because in the year 2021, go does not support constant structs...
func DefaultP2PAccessControl() AccessControl {
return AccessControl{Read: true, Append: true, Blocked: false}

39
peer/cwtch_peer.go

@ -486,33 +486,58 @@ func (cp *cwtchPeer) AcceptConversation(id int) error {
err := cp.storage.AcceptConversation(id)
if err == nil {
// If a p2p conversation then attempt to peer with the onion...
// Groups and Server have their own acceptance flow.
ci, _ := cp.storage.GetConversation(id)
// Groups and Server have their own acceptance flow.,
ci, err := cp.storage.GetConversation(id)
if err != nil {
log.Errorf("Could not get conversation for %v: %v", id, err)
return err
}
if !ci.IsGroup() && !ci.IsServer() {
cp.eventBus.Publish(event.NewEvent(event.SetPeerAuthorization, map[event.Field]string{event.ConversationID: strconv.Itoa(id), event.RemotePeer: ci.Handle, event.Authorization: string(model.AuthApproved)}))
cp.sendUpdateAuth(id, ci.Handle, ci.Accepted, ci.ACL[ci.Handle].Blocked)
cp.PeerWithOnion(ci.Handle)
}
}
return err
}
// BlockConversation looks up a conversation by `handle` and sets the Accepted status to `true`
// This will cause Cwtch to auto connect to this conversation on start up
// BlockConversation looks up a conversation by `handle` and sets the Blocked ACL field to `true`
// This will cause Cwtch to never try to connect to and refuse connections from the peer
func (cp *cwtchPeer) BlockConversation(id int) error {
return cp.setACL(id, &model.AccessControl{Blocked: true, Read: false, Append: false})
}
// UnblockConversation looks up a conversation by `handle` and sets the Blocked ACL field to `true`
// Further actions depend on the Accepted field
func (cp *cwtchPeer) UnblockConversation(id int) error {
return cp.setACL(id, &model.AccessControl{Blocked: false, Read: false, Append: false})
}
func (cp *cwtchPeer) setACL(id int, acl *model.AccessControl) error {
cp.mutex.Lock()
defer cp.mutex.Unlock()
ci, err := cp.storage.GetConversation(id)
if err != nil {
return err
}
// p2p conversations have a single ACL referencing the remote peer. Set this to blocked...
ci.ACL[ci.Handle] = model.AccessControl{Blocked: true, Read: false, Append: false}
ci.ACL[ci.Handle] = *acl
// Send an event in any case to block the protocol engine...
// TODO at some point in the future engine needs to understand ACLs not just legacy auth status
cp.eventBus.Publish(event.NewEvent(event.SetPeerAuthorization, map[event.Field]string{event.ConversationID: strconv.Itoa(id), event.RemotePeer: ci.Handle, event.Authorization: string(model.AuthBlocked)}))
cp.sendUpdateAuth(id, ci.Handle, ci.Accepted, ci.ACL[ci.Handle].Blocked)
if !ci.IsGroup() && !ci.IsServer() && ci.Accepted {
cp.PeerWithOnion(ci.Handle)
}
return cp.storage.SetConversationACL(id, ci.ACL)
}
func (cp *cwtchPeer) sendUpdateAuth(id int, handle string, accepted bool, blocked bool) {
cp.eventBus.Publish(event.NewEvent(event.UpdateConversationAuthorization, map[event.Field]string{event.ConversationID: strconv.Itoa(id), event.RemotePeer: handle, event.Accepted: strconv.FormatBool(accepted), event.Blocked: strconv.FormatBool(blocked)}))
}
func (cp *cwtchPeer) FetchConversations() ([]*model.Conversation, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()

2
peer/cwtchprofilestorage.go

@ -393,7 +393,7 @@ func (cps *CwtchProfileStorage) DeleteConversation(id int) error {
// SetConversationACL sets a new ACL on a given conversation.
func (cps *CwtchProfileStorage) SetConversationACL(id int, acl model.AccessControlList) error {
_, err := cps.setConversationACLStmt.Exec(acl, id)
_, err := cps.setConversationACLStmt.Exec(acl.Serialize(), id)
if err != nil {
log.Errorf("error executing query: %v", err)
return err

1
peer/profile_interface.go

@ -101,6 +101,7 @@ type CwtchPeer interface {
FetchConversationInfo(handle string) (*model.Conversation, error)
AcceptConversation(conversation int) error
BlockConversation(conversation int) error
UnblockConversation(conversation int) error
SetConversationAttribute(conversation int, path attr.ScopedZonedPath, value string) error
GetConversationAttribute(conversation int, path attr.ScopedZonedPath) (string, error)
DeleteConversation(conversation int) error

13
protocol/connections/engine.go

@ -103,7 +103,7 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
engine.eventManager.Subscribe(event.DeleteContact, engine.queue)
engine.eventManager.Subscribe(event.DeleteGroup, engine.queue)
engine.eventManager.Subscribe(event.SetPeerAuthorization, engine.queue)
engine.eventManager.Subscribe(event.UpdateConversationAuthorization, engine.queue)
engine.eventManager.Subscribe(event.BlockUnknownPeers, engine.queue)
engine.eventManager.Subscribe(event.AllowUnknownPeers, engine.queue)
@ -186,8 +186,15 @@ func (e *engine) eventHandler() {
if err := e.sendRetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.Exists]); err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.SendRetValMessageToPeer), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
}
case event.SetPeerAuthorization:
auth := model.Authorization(ev.Data[event.Authorization])
case event.UpdateConversationAuthorization:
accepted, _ := strconv.ParseBool(ev.Data[event.Accepted])
blocked, _ := strconv.ParseBool(ev.Data[event.Blocked])
auth := model.AuthUnknown
if blocked {
auth = model.AuthBlocked
} else if accepted {
auth = model.AuthApproved
}
e.authorizations.Store(ev.Data[event.RemotePeer], auth)
if auth == model.AuthBlocked {
connection, err := e.service.GetConnection(ev.Data[event.RemotePeer])

Loading…
Cancel
Save