Updates to Event Handling given new Storage Engine
- AcceptConversation updates Peer Authorization and Peers with Contact - Group and Server no longer emit New Contact Events - SendMessageToPeer Events now contain an event Context to distinguish between get/ret vals and ui sent message errors
This commit is contained in:
parent
7f40ea2b51
commit
a0ea927a08
|
@ -418,13 +418,16 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) {
|
|||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
groupConversationID, err := cp.NewContactConversation(gci.GroupID, model.DefaultP2PAccessControl(), true)
|
||||
cp.mutex.Lock()
|
||||
groupConversationID, err := cp.storage.NewConversation(gci.GroupID, map[string]string{}, model.AccessControlList{}, true)
|
||||
cp.mutex.Unlock()
|
||||
if err == nil {
|
||||
cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)), gci.GroupID)
|
||||
cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), gci.ServerHost)
|
||||
cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(gci.SharedKey))
|
||||
cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)), gci.GroupName)
|
||||
cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(groupConversationID), event.GroupServer: gci.ServerHost, event.GroupInvite: exportedInvite}))
|
||||
cp.JoinServer(gci.ServerHost)
|
||||
}
|
||||
return groupConversationID, err
|
||||
}
|
||||
|
@ -443,7 +446,17 @@ func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessContr
|
|||
func (cp *cwtchPeer) AcceptConversation(id int) error {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
return cp.storage.AcceptConversation(id)
|
||||
err := cp.storage.AcceptConversation(id)
|
||||
if err == nil {
|
||||
// If a p2p conversation then attempt to peer with the onion...
|
||||
// Groups and Server have their own acceptance flow.
|
||||
ci, _ := cp.storage.GetConversation(id)
|
||||
if !ci.IsGroup() && !ci.IsServer() {
|
||||
cp.eventBus.Publish(event.NewEvent(event.SetPeerAuthorization, map[event.Field]string{event.ConversationID: strconv.Itoa(id), event.RemotePeer: ci.Handle, event.Authorization: string(model.AuthApproved)}))
|
||||
cp.PeerWithOnion(ci.Handle)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// BlockConversation looks up a conversation by `handle` and sets the Accepted status to `true`
|
||||
|
@ -459,7 +472,7 @@ func (cp *cwtchPeer) BlockConversation(id int) error {
|
|||
ci.ACL[ci.Handle] = model.AccessControl{Blocked: true, Read: false, Append: false}
|
||||
// Send an event in any case to block the protocol engine...
|
||||
// TODO at some point in the future engine needs to understand ACLs not just legacy auth status
|
||||
cp.eventBus.Publish(event.NewEvent(event.SetPeerAuthorization, map[event.Field]string{event.RemotePeer: ci.Handle, event.Authorization: string(model.AuthBlocked)}))
|
||||
cp.eventBus.Publish(event.NewEvent(event.SetPeerAuthorization, map[event.Field]string{event.ConversationID: strconv.Itoa(id), event.RemotePeer: ci.Handle, event.Authorization: string(model.AuthBlocked)}))
|
||||
return cp.storage.SetConversationACL(id, ci.ACL)
|
||||
}
|
||||
|
||||
|
@ -552,7 +565,9 @@ func (cp *cwtchPeer) UpdateMessageAttribute(conversation int, channel int, id in
|
|||
func (cp *cwtchPeer) StartGroup(name string, server string) (int, error) {
|
||||
group, err := model.NewGroup(server)
|
||||
if err == nil {
|
||||
conversationID, err := cp.NewContactConversation(group.GroupID, model.DefaultP2PAccessControl(), true)
|
||||
cp.mutex.Lock()
|
||||
conversationID, err := cp.storage.NewConversation(group.GroupID, map[string]string{}, model.AccessControlList{}, true)
|
||||
cp.mutex.Unlock()
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
@ -600,7 +615,10 @@ func (cp *cwtchPeer) AddServer(serverSpecification string) (string, error) {
|
|||
// Add the contact if we don't already have it
|
||||
conversationInfo, _ := cp.FetchConversationInfo(onion)
|
||||
if conversationInfo == nil {
|
||||
_, err := cp.NewContactConversation(onion, model.DefaultP2PAccessControl(), true)
|
||||
cp.mutex.Lock()
|
||||
// Create a new conversation but do **not** push an event out.
|
||||
_, err := cp.storage.NewConversation(onion, map[string]string{}, model.AccessControlList{onion: model.DefaultP2PAccessControl()}, true)
|
||||
cp.mutex.Unlock()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -746,13 +764,7 @@ func (cp *cwtchPeer) SendInviteToConversation(conversationID int, inviteConversa
|
|||
}
|
||||
|
||||
func (cp *cwtchPeer) ImportBundle(importString string) error {
|
||||
if tor.IsValidHostname(importString) {
|
||||
_, err := cp.NewContactConversation(importString, model.DefaultP2PAccessControl(), true)
|
||||
if err == nil {
|
||||
return ConstructResponse(constants.ImportBundlePrefix, "success")
|
||||
}
|
||||
return ConstructResponse(constants.ImportBundlePrefix, err.Error())
|
||||
} else if strings.HasPrefix(importString, constants.TofuBundlePrefix) {
|
||||
if strings.HasPrefix(importString, constants.TofuBundlePrefix) {
|
||||
bundle := strings.Split(importString, "||")
|
||||
if len(bundle) == 2 {
|
||||
err := cp.ImportBundle(bundle[0][len(constants.TofuBundlePrefix):])
|
||||
|
@ -778,6 +790,14 @@ func (cp *cwtchPeer) ImportBundle(importString string) error {
|
|||
return ConstructResponse(constants.ImportBundlePrefix, err.Error())
|
||||
}
|
||||
return ConstructResponse(constants.ImportBundlePrefix, "success")
|
||||
} else if tor.IsValidHostname(importString) {
|
||||
_, err := cp.NewContactConversation(importString, model.DefaultP2PAccessControl(), true)
|
||||
if err == nil {
|
||||
// Assuming all is good, we should peer with this contact.
|
||||
cp.PeerWithOnion(importString)
|
||||
return ConstructResponse(constants.ImportBundlePrefix, "success")
|
||||
}
|
||||
return ConstructResponse(constants.ImportBundlePrefix, err.Error())
|
||||
}
|
||||
return ConstructResponse(constants.ImportBundlePrefix, "invalid_group_invite_prefix")
|
||||
}
|
||||
|
@ -968,12 +988,15 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
case event.SendMessageToGroupError:
|
||||
err := cp.attemptErrorConversationMessage(ev.Data[event.GroupID], ev.Data[event.Signature], event.SendMessageToGroupError, ev.Data[event.Error])
|
||||
if err != nil {
|
||||
log.Errorf("failed to error p2p message: %v", err)
|
||||
log.Errorf("failed to error group message: %s %v", ev.Data[event.GroupID], err)
|
||||
}
|
||||
case event.SendMessageToPeerError:
|
||||
err := cp.attemptErrorConversationMessage(ev.Data[event.RemotePeer], ev.Data[event.EventID], event.SendMessageToPeerError, ev.Data[event.Error])
|
||||
if err != nil {
|
||||
log.Errorf("failed to error p2p message: %v", err)
|
||||
context := ev.Data[event.EventContext]
|
||||
if context == string(event.SendMessageToPeer) {
|
||||
err := cp.attemptErrorConversationMessage(ev.Data[event.RemotePeer], ev.Data[event.EventID], event.SendMessageToPeerError, ev.Data[event.Error])
|
||||
if err != nil {
|
||||
log.Errorf("failed to error p2p message: %s %v", ev.Data[event.RemotePeer], err)
|
||||
}
|
||||
}
|
||||
case event.RetryServerRequest:
|
||||
// Automated Join Server Request triggered by a plugin.
|
||||
|
@ -1122,12 +1145,12 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
// by the given handle and attempts to mark the message as acknowledged. returns error on failure
|
||||
// to either find the contact or the associated message
|
||||
func (cp *cwtchPeer) attemptInsertOrAcknowledgeLegacyGroupConversation(conversationID int, signature string, dm *groups.DecryptedGroupMessage) error {
|
||||
log.Infof("attempting to insert or ack group message %v %v", conversationID, signature)
|
||||
log.Debugf("attempting to insert or ack group message %v %v", conversationID, signature)
|
||||
messageID, err := cp.GetChannelMessageBySignature(conversationID, 0, signature)
|
||||
// We have received our own message (probably), acknowledge and move on...
|
||||
if err == nil {
|
||||
_, attr, err := cp.GetChannelMessage(conversationID, 0, messageID)
|
||||
if err == nil {
|
||||
if err == nil && attr[constants.AttrAck] != constants.True {
|
||||
cp.mutex.Lock()
|
||||
attr[constants.AttrAck] = constants.True
|
||||
cp.storage.UpdateMessageAttributes(conversationID, 0, messageID, attr)
|
||||
|
|
|
@ -140,7 +140,7 @@ func (e *engine) eventHandler() {
|
|||
case event.InvitePeerToGroup:
|
||||
err := e.sendPeerMessage(ev.Data[event.RemotePeer], pmodel.PeerMessage{ID: ev.EventID, Context: event.ContextInvite, Data: []byte(ev.Data[event.GroupInvite])})
|
||||
if err != nil {
|
||||
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
|
||||
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.InvitePeerToGroup), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
|
||||
}
|
||||
case event.JoinServer:
|
||||
signature, err := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
|
||||
|
@ -169,15 +169,15 @@ func (e *engine) eventHandler() {
|
|||
context = event.ContextRaw
|
||||
}
|
||||
if err := e.sendPeerMessage(ev.Data[event.RemotePeer], pmodel.PeerMessage{ID: ev.EventID, Context: context, Data: []byte(ev.Data[event.Data])}); err != nil {
|
||||
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
|
||||
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.SendMessageToPeer), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
|
||||
}
|
||||
case event.SendGetValMessageToPeer:
|
||||
if err := e.sendGetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Scope], ev.Data[event.Path]); err != nil {
|
||||
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
|
||||
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.SendGetValMessageToPeer), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
|
||||
}
|
||||
case event.SendRetValMessageToPeer:
|
||||
if err := e.sendRetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.Exists]); err != nil {
|
||||
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
|
||||
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.SendRetValMessageToPeer), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
|
||||
}
|
||||
case event.SetPeerAuthorization:
|
||||
auth := model.Authorization(ev.Data[event.Authorization])
|
||||
|
|
Loading…
Reference in New Issue