fix logic arroudn accept/block contact and add unblock support #421
|
@ -25,8 +25,10 @@ const (
|
|||
RetryServerRequest = Type("RetryServerRequest")
|
||||
|
||||
// RemotePeer
|
||||
// Authorization(model.peer.Auth_...)
|
||||
SetPeerAuthorization = Type("UpdatePeerAuthorization")
|
||||
// ConversationID
|
||||
// Accepted
|
||||
// Blocked
|
||||
UpdateConversationAuthorization = Type("UpdateConversationAuthorization")
|
||||
|
||||
// Turn on/off blocking of unknown peers (if peers aren't in the contact list then they will be autoblocked
|
||||
BlockUnknownPeers = Type("BlockUnknownPeers")
|
||||
|
@ -263,7 +265,8 @@ const (
|
|||
// Flags denotes a set of message flags
|
||||
Flags = Field("Flags")
|
||||
|
||||
Authorization = Field("Authorization")
|
||||
Accepted = Field("Accepted")
|
||||
Blocked = Field("Blocked")
|
||||
|
||||
KeyBundle = Field("KeyBundle")
|
||||
|
||||
|
|
|
@ -486,18 +486,22 @@ func (cp *cwtchPeer) AcceptConversation(id int) error {
|
|||
err := cp.storage.AcceptConversation(id)
|
||||
if err == nil {
|
||||
// If a p2p conversation then attempt to peer with the onion...
|
||||
// Groups and Server have their own acceptance flow.
|
||||
ci, _ := cp.storage.GetConversation(id)
|
||||
// Groups and Server have their own acceptance flow.,
|
||||
ci, err := cp.storage.GetConversation(id)
|
||||
if err != nil {
|
||||
log.Errorf("Could not get conversation for %v: %v", id, err)
|
||||
return err
|
||||
}
|
||||
if !ci.IsGroup() && !ci.IsServer() {
|
||||
cp.eventBus.Publish(event.NewEvent(event.SetPeerAuthorization, map[event.Field]string{event.ConversationID: strconv.Itoa(id), event.RemotePeer: ci.Handle, event.Authorization: string(model.AuthApproved)}))
|
||||
cp.sendUpdateAuth(id, ci.Handle, ci.Accepted, ci.ACL[ci.Handle].Blocked)
|
||||
cp.PeerWithOnion(ci.Handle)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// BlockConversation looks up a conversation by `handle` and sets the Accepted status to `true`
|
||||
// This will cause Cwtch to auto connect to this conversation on start up
|
||||
// BlockConversation looks up a conversation by `handle` and sets the Blocked ACL field to `true`
|
||||
// This will cause Cwtch to never try to connect to and refuse connections from the peer
|
||||
func (cp *cwtchPeer) BlockConversation(id int) error {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
|
@ -505,14 +509,51 @@ func (cp *cwtchPeer) BlockConversation(id int) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// p2p conversations have a single ACL referencing the remote peer. Set this to blocked...
|
||||
ci.ACL[ci.Handle] = model.AccessControl{Blocked: true, Read: false, Append: false}
|
||||
if ac, exists := ci.ACL[ci.Handle]; exists {
|
||||
ac.Blocked = true
|
||||
ci.ACL[ci.Handle] = ac
|
||||
}
|
||||
|
||||
// Send an event in any case to block the protocol engine...
|
||||
// TODO at some point in the future engine needs to understand ACLs not just legacy auth status
|
||||
cp.eventBus.Publish(event.NewEvent(event.SetPeerAuthorization, map[event.Field]string{event.ConversationID: strconv.Itoa(id), event.RemotePeer: ci.Handle, event.Authorization: string(model.AuthBlocked)}))
|
||||
cp.sendUpdateAuth(id, ci.Handle, ci.Accepted, ci.ACL[ci.Handle].Blocked)
|
||||
|
||||
return cp.storage.SetConversationACL(id, ci.ACL)
|
||||
}
|
||||
|
||||
// UnblockConversation looks up a conversation by `handle` and sets the Blocked ACL field to `true`
|
||||
// Further actions depend on the Accepted field
|
||||
func (cp *cwtchPeer) UnblockConversation(id int) error {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
ci, err := cp.storage.GetConversation(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// p2p conversations have a single ACL referencing the remote peer. Set ACL's blocked to false...
|
||||
if ac, exists := ci.ACL[ci.Handle]; exists {
|
||||
ac.Blocked = false
|
||||
ci.ACL[ci.Handle] = ac
|
||||
}
|
||||
|
||||
// Send an event in any case to block the protocol engine...
|
||||
// TODO at some point in the future engine needs to understand ACLs not just legacy auth status
|
||||
cp.sendUpdateAuth(id, ci.Handle, ci.Accepted, ci.ACL[ci.Handle].Blocked)
|
||||
|
||||
if !ci.IsGroup() && !ci.IsServer() && ci.Accepted {
|
||||
cp.PeerWithOnion(ci.Handle)
|
||||
}
|
||||
|
||||
return cp.storage.SetConversationACL(id, ci.ACL)
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) sendUpdateAuth(id int, handle string, accepted bool, blocked bool) {
|
||||
cp.eventBus.Publish(event.NewEvent(event.UpdateConversationAuthorization, map[event.Field]string{event.ConversationID: strconv.Itoa(id), event.RemotePeer: handle, event.Accepted: strconv.FormatBool(accepted), event.Blocked: strconv.FormatBool(blocked)}))
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) FetchConversations() ([]*model.Conversation, error) {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
|
|
|
@ -393,7 +393,7 @@ func (cps *CwtchProfileStorage) DeleteConversation(id int) error {
|
|||
|
||||
// SetConversationACL sets a new ACL on a given conversation.
|
||||
func (cps *CwtchProfileStorage) SetConversationACL(id int, acl model.AccessControlList) error {
|
||||
_, err := cps.setConversationACLStmt.Exec(acl, id)
|
||||
_, err := cps.setConversationACLStmt.Exec(acl.Serialize(), id)
|
||||
if err != nil {
|
||||
log.Errorf("error executing query: %v", err)
|
||||
return err
|
||||
|
|
|
@ -101,6 +101,7 @@ type CwtchPeer interface {
|
|||
FetchConversationInfo(handle string) (*model.Conversation, error)
|
||||
AcceptConversation(conversation int) error
|
||||
BlockConversation(conversation int) error
|
||||
UnblockConversation(conversation int) error
|
||||
SetConversationAttribute(conversation int, path attr.ScopedZonedPath, value string) error
|
||||
GetConversationAttribute(conversation int, path attr.ScopedZonedPath) (string, error)
|
||||
DeleteConversation(conversation int) error
|
||||
|
|
|
@ -103,7 +103,7 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
|
|||
engine.eventManager.Subscribe(event.DeleteContact, engine.queue)
|
||||
engine.eventManager.Subscribe(event.DeleteGroup, engine.queue)
|
||||
|
||||
engine.eventManager.Subscribe(event.SetPeerAuthorization, engine.queue)
|
||||
engine.eventManager.Subscribe(event.UpdateConversationAuthorization, engine.queue)
|
||||
engine.eventManager.Subscribe(event.BlockUnknownPeers, engine.queue)
|
||||
engine.eventManager.Subscribe(event.AllowUnknownPeers, engine.queue)
|
||||
|
||||
|
@ -186,8 +186,15 @@ func (e *engine) eventHandler() {
|
|||
if err := e.sendRetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.Exists]); err != nil {
|
||||
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.SendRetValMessageToPeer), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
|
||||
}
|
||||
case event.SetPeerAuthorization:
|
||||
auth := model.Authorization(ev.Data[event.Authorization])
|
||||
case event.UpdateConversationAuthorization:
|
||||
accepted, _ := strconv.ParseBool(ev.Data[event.Accepted])
|
||||
blocked, _ := strconv.ParseBool(ev.Data[event.Blocked])
|
||||
auth := model.AuthUnknown
|
||||
if blocked {
|
||||
auth = model.AuthBlocked
|
||||
} else if accepted {
|
||||
auth = model.AuthApproved
|
||||
}
|
||||
e.authorizations.Store(ev.Data[event.RemotePeer], auth)
|
||||
if auth == model.AuthBlocked {
|
||||
connection, err := e.service.GetConnection(ev.Data[event.RemotePeer])
|
||||
|
|
Loading…
Reference in New Issue