Merge pull request 'Updates to Event Handling given new Storage Engine' (#407) from p2p-interim-new-storage into master
continuous-integration/drone/push Build is passing Details

Reviewed-on: #407
This commit is contained in:
Dan Ballard 2021-12-01 23:07:04 +00:00
commit 7eddd1c7e5
3 changed files with 50 additions and 24 deletions

View File

@ -418,13 +418,16 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) {
if err != nil {
return -1, err
}
groupConversationID, err := cp.NewContactConversation(gci.GroupID, model.DefaultP2PAccessControl(), true)
cp.mutex.Lock()
groupConversationID, err := cp.storage.NewConversation(gci.GroupID, map[string]string{}, model.AccessControlList{}, true)
cp.mutex.Unlock()
if err == nil {
cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)), gci.GroupID)
cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), gci.ServerHost)
cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(gci.SharedKey))
cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)), gci.GroupName)
cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(groupConversationID), event.GroupServer: gci.ServerHost, event.GroupInvite: exportedInvite}))
cp.JoinServer(gci.ServerHost)
}
return groupConversationID, err
}
@ -443,7 +446,17 @@ func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessContr
func (cp *cwtchPeer) AcceptConversation(id int) error {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.storage.AcceptConversation(id)
err := cp.storage.AcceptConversation(id)
if err == nil {
// If a p2p conversation then attempt to peer with the onion...
// Groups and Server have their own acceptance flow.
ci, _ := cp.storage.GetConversation(id)
if !ci.IsGroup() && !ci.IsServer() {
cp.eventBus.Publish(event.NewEvent(event.SetPeerAuthorization, map[event.Field]string{event.ConversationID: strconv.Itoa(id), event.RemotePeer: ci.Handle, event.Authorization: string(model.AuthApproved)}))
cp.PeerWithOnion(ci.Handle)
}
}
return err
}
// BlockConversation looks up a conversation by `handle` and sets the Accepted status to `true`
@ -459,7 +472,7 @@ func (cp *cwtchPeer) BlockConversation(id int) error {
ci.ACL[ci.Handle] = model.AccessControl{Blocked: true, Read: false, Append: false}
// Send an event in any case to block the protocol engine...
// TODO at some point in the future engine needs to understand ACLs not just legacy auth status
cp.eventBus.Publish(event.NewEvent(event.SetPeerAuthorization, map[event.Field]string{event.RemotePeer: ci.Handle, event.Authorization: string(model.AuthBlocked)}))
cp.eventBus.Publish(event.NewEvent(event.SetPeerAuthorization, map[event.Field]string{event.ConversationID: strconv.Itoa(id), event.RemotePeer: ci.Handle, event.Authorization: string(model.AuthBlocked)}))
return cp.storage.SetConversationACL(id, ci.ACL)
}
@ -552,7 +565,9 @@ func (cp *cwtchPeer) UpdateMessageAttribute(conversation int, channel int, id in
func (cp *cwtchPeer) StartGroup(name string, server string) (int, error) {
group, err := model.NewGroup(server)
if err == nil {
conversationID, err := cp.NewContactConversation(group.GroupID, model.DefaultP2PAccessControl(), true)
cp.mutex.Lock()
conversationID, err := cp.storage.NewConversation(group.GroupID, map[string]string{}, model.AccessControlList{}, true)
cp.mutex.Unlock()
if err != nil {
return -1, err
}
@ -600,7 +615,10 @@ func (cp *cwtchPeer) AddServer(serverSpecification string) (string, error) {
// Add the contact if we don't already have it
conversationInfo, _ := cp.FetchConversationInfo(onion)
if conversationInfo == nil {
_, err := cp.NewContactConversation(onion, model.DefaultP2PAccessControl(), true)
cp.mutex.Lock()
// Create a new conversation but do **not** push an event out.
_, err := cp.storage.NewConversation(onion, map[string]string{}, model.AccessControlList{onion: model.DefaultP2PAccessControl()}, true)
cp.mutex.Unlock()
if err != nil {
return "", err
}
@ -746,13 +764,7 @@ func (cp *cwtchPeer) SendInviteToConversation(conversationID int, inviteConversa
}
func (cp *cwtchPeer) ImportBundle(importString string) error {
if tor.IsValidHostname(importString) {
_, err := cp.NewContactConversation(importString, model.DefaultP2PAccessControl(), true)
if err == nil {
return ConstructResponse(constants.ImportBundlePrefix, "success")
}
return ConstructResponse(constants.ImportBundlePrefix, err.Error())
} else if strings.HasPrefix(importString, constants.TofuBundlePrefix) {
if strings.HasPrefix(importString, constants.TofuBundlePrefix) {
bundle := strings.Split(importString, "||")
if len(bundle) == 2 {
err := cp.ImportBundle(bundle[0][len(constants.TofuBundlePrefix):])
@ -778,6 +790,14 @@ func (cp *cwtchPeer) ImportBundle(importString string) error {
return ConstructResponse(constants.ImportBundlePrefix, err.Error())
}
return ConstructResponse(constants.ImportBundlePrefix, "success")
} else if tor.IsValidHostname(importString) {
_, err := cp.NewContactConversation(importString, model.DefaultP2PAccessControl(), true)
if err == nil {
// Assuming all is good, we should peer with this contact.
cp.PeerWithOnion(importString)
return ConstructResponse(constants.ImportBundlePrefix, "success")
}
return ConstructResponse(constants.ImportBundlePrefix, err.Error())
}
return ConstructResponse(constants.ImportBundlePrefix, "invalid_group_invite_prefix")
}
@ -968,12 +988,15 @@ func (cp *cwtchPeer) eventHandler() {
case event.SendMessageToGroupError:
err := cp.attemptErrorConversationMessage(ev.Data[event.GroupID], ev.Data[event.Signature], event.SendMessageToGroupError, ev.Data[event.Error])
if err != nil {
log.Errorf("failed to error p2p message: %v", err)
log.Errorf("failed to error group message: %s %v", ev.Data[event.GroupID], err)
}
case event.SendMessageToPeerError:
err := cp.attemptErrorConversationMessage(ev.Data[event.RemotePeer], ev.Data[event.EventID], event.SendMessageToPeerError, ev.Data[event.Error])
if err != nil {
log.Errorf("failed to error p2p message: %v", err)
context := ev.Data[event.EventContext]
if context == string(event.SendMessageToPeer) {
err := cp.attemptErrorConversationMessage(ev.Data[event.RemotePeer], ev.Data[event.EventID], event.SendMessageToPeerError, ev.Data[event.Error])
if err != nil {
log.Errorf("failed to error p2p message: %s %v", ev.Data[event.RemotePeer], err)
}
}
case event.RetryServerRequest:
// Automated Join Server Request triggered by a plugin.
@ -1122,12 +1145,12 @@ func (cp *cwtchPeer) eventHandler() {
// by the given handle and attempts to mark the message as acknowledged. returns error on failure
// to either find the contact or the associated message
func (cp *cwtchPeer) attemptInsertOrAcknowledgeLegacyGroupConversation(conversationID int, signature string, dm *groups.DecryptedGroupMessage) error {
log.Infof("attempting to insert or ack group message %v %v", conversationID, signature)
log.Debugf("attempting to insert or ack group message %v %v", conversationID, signature)
messageID, err := cp.GetChannelMessageBySignature(conversationID, 0, signature)
// We have received our own message (probably), acknowledge and move on...
if err == nil {
_, attr, err := cp.GetChannelMessage(conversationID, 0, messageID)
if err == nil {
if err == nil && attr[constants.AttrAck] != constants.True {
cp.mutex.Lock()
attr[constants.AttrAck] = constants.True
cp.storage.UpdateMessageAttributes(conversationID, 0, messageID, attr)

View File

@ -140,7 +140,7 @@ func (e *engine) eventHandler() {
case event.InvitePeerToGroup:
err := e.sendPeerMessage(ev.Data[event.RemotePeer], pmodel.PeerMessage{ID: ev.EventID, Context: event.ContextInvite, Data: []byte(ev.Data[event.GroupInvite])})
if err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.InvitePeerToGroup), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
}
case event.JoinServer:
signature, err := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
@ -169,15 +169,15 @@ func (e *engine) eventHandler() {
context = event.ContextRaw
}
if err := e.sendPeerMessage(ev.Data[event.RemotePeer], pmodel.PeerMessage{ID: ev.EventID, Context: context, Data: []byte(ev.Data[event.Data])}); err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.SendMessageToPeer), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
}
case event.SendGetValMessageToPeer:
if err := e.sendGetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Scope], ev.Data[event.Path]); err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.SendGetValMessageToPeer), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
}
case event.SendRetValMessageToPeer:
if err := e.sendRetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.Exists]); err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.SendRetValMessageToPeer), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
}
case event.SetPeerAuthorization:
auth := model.Authorization(ev.Data[event.Authorization])

View File

@ -333,13 +333,16 @@ func TestCwtchPeerIntegration(t *testing.T) {
time.Sleep(2 * time.Second)
t.Logf("Done shutdown: %v\n", runtime.NumGoroutine())
numGoRoutinesPostAppShutdown := runtime.NumGoroutine()
t.Logf("Shutting down ACN...")
acn.Restart() // kill all active tor connections...
// acn.Close() TODO: ACN Now gets closed automatically with defer...attempting to close twice results in a dead lock...
time.Sleep(time.Second * 2) // Server ^^ has a 5 second loop attempting reconnect before exiting
time.Sleep(time.Second * 30) // the network status plugin might keep goroutines alive for a minute before killing them
numGoRoutinesPostAppShutdown := runtime.NumGoroutine()
// Printing out the current goroutines
// Very useful if we are leaking any.
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)