From a0ea927a0856fe1208d52323a4283203036cc5aa Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Wed, 1 Dec 2021 04:10:46 -0800 Subject: [PATCH 1/2] Updates to Event Handling given new Storage Engine - AcceptConversation updates Peer Authorization and Peers with Contact - Group and Server no longer emit New Contact Events - SendMessageToPeer Events now contain an event Context to distinguish between get/ret vals and ui sent message errors --- peer/cwtch_peer.go | 59 +++++++++++++++++++++++----------- protocol/connections/engine.go | 8 ++--- 2 files changed, 45 insertions(+), 22 deletions(-) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index eeddd1a..920d39f 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -418,13 +418,16 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) { if err != nil { return -1, err } - groupConversationID, err := cp.NewContactConversation(gci.GroupID, model.DefaultP2PAccessControl(), true) + cp.mutex.Lock() + groupConversationID, err := cp.storage.NewConversation(gci.GroupID, map[string]string{}, model.AccessControlList{}, true) + cp.mutex.Unlock() if err == nil { cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)), gci.GroupID) cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), gci.ServerHost) cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(gci.SharedKey)) cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)), gci.GroupName) cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(groupConversationID), event.GroupServer: gci.ServerHost, event.GroupInvite: exportedInvite})) + cp.JoinServer(gci.ServerHost) } return groupConversationID, err } @@ -443,7 +446,17 @@ func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessContr func (cp *cwtchPeer) AcceptConversation(id int) error { cp.mutex.Lock() defer cp.mutex.Unlock() - return cp.storage.AcceptConversation(id) + err := cp.storage.AcceptConversation(id) + if err == nil { + // If a p2p conversation then attempt to peer with the onion... + // Groups and Server have their own acceptance flow. + ci, _ := cp.storage.GetConversation(id) + if !ci.IsGroup() && !ci.IsServer() { + cp.eventBus.Publish(event.NewEvent(event.SetPeerAuthorization, map[event.Field]string{event.ConversationID: strconv.Itoa(id), event.RemotePeer: ci.Handle, event.Authorization: string(model.AuthApproved)})) + cp.PeerWithOnion(ci.Handle) + } + } + return err } // BlockConversation looks up a conversation by `handle` and sets the Accepted status to `true` @@ -459,7 +472,7 @@ func (cp *cwtchPeer) BlockConversation(id int) error { ci.ACL[ci.Handle] = model.AccessControl{Blocked: true, Read: false, Append: false} // Send an event in any case to block the protocol engine... // TODO at some point in the future engine needs to understand ACLs not just legacy auth status - cp.eventBus.Publish(event.NewEvent(event.SetPeerAuthorization, map[event.Field]string{event.RemotePeer: ci.Handle, event.Authorization: string(model.AuthBlocked)})) + cp.eventBus.Publish(event.NewEvent(event.SetPeerAuthorization, map[event.Field]string{event.ConversationID: strconv.Itoa(id), event.RemotePeer: ci.Handle, event.Authorization: string(model.AuthBlocked)})) return cp.storage.SetConversationACL(id, ci.ACL) } @@ -552,7 +565,9 @@ func (cp *cwtchPeer) UpdateMessageAttribute(conversation int, channel int, id in func (cp *cwtchPeer) StartGroup(name string, server string) (int, error) { group, err := model.NewGroup(server) if err == nil { - conversationID, err := cp.NewContactConversation(group.GroupID, model.DefaultP2PAccessControl(), true) + cp.mutex.Lock() + conversationID, err := cp.storage.NewConversation(group.GroupID, map[string]string{}, model.AccessControlList{}, true) + cp.mutex.Unlock() if err != nil { return -1, err } @@ -600,7 +615,10 @@ func (cp *cwtchPeer) AddServer(serverSpecification string) (string, error) { // Add the contact if we don't already have it conversationInfo, _ := cp.FetchConversationInfo(onion) if conversationInfo == nil { - _, err := cp.NewContactConversation(onion, model.DefaultP2PAccessControl(), true) + cp.mutex.Lock() + // Create a new conversation but do **not** push an event out. + _, err := cp.storage.NewConversation(onion, map[string]string{}, model.AccessControlList{onion: model.DefaultP2PAccessControl()}, true) + cp.mutex.Unlock() if err != nil { return "", err } @@ -746,13 +764,7 @@ func (cp *cwtchPeer) SendInviteToConversation(conversationID int, inviteConversa } func (cp *cwtchPeer) ImportBundle(importString string) error { - if tor.IsValidHostname(importString) { - _, err := cp.NewContactConversation(importString, model.DefaultP2PAccessControl(), true) - if err == nil { - return ConstructResponse(constants.ImportBundlePrefix, "success") - } - return ConstructResponse(constants.ImportBundlePrefix, err.Error()) - } else if strings.HasPrefix(importString, constants.TofuBundlePrefix) { + if strings.HasPrefix(importString, constants.TofuBundlePrefix) { bundle := strings.Split(importString, "||") if len(bundle) == 2 { err := cp.ImportBundle(bundle[0][len(constants.TofuBundlePrefix):]) @@ -778,6 +790,14 @@ func (cp *cwtchPeer) ImportBundle(importString string) error { return ConstructResponse(constants.ImportBundlePrefix, err.Error()) } return ConstructResponse(constants.ImportBundlePrefix, "success") + } else if tor.IsValidHostname(importString) { + _, err := cp.NewContactConversation(importString, model.DefaultP2PAccessControl(), true) + if err == nil { + // Assuming all is good, we should peer with this contact. + cp.PeerWithOnion(importString) + return ConstructResponse(constants.ImportBundlePrefix, "success") + } + return ConstructResponse(constants.ImportBundlePrefix, err.Error()) } return ConstructResponse(constants.ImportBundlePrefix, "invalid_group_invite_prefix") } @@ -968,12 +988,15 @@ func (cp *cwtchPeer) eventHandler() { case event.SendMessageToGroupError: err := cp.attemptErrorConversationMessage(ev.Data[event.GroupID], ev.Data[event.Signature], event.SendMessageToGroupError, ev.Data[event.Error]) if err != nil { - log.Errorf("failed to error p2p message: %v", err) + log.Errorf("failed to error group message: %s %v", ev.Data[event.GroupID], err) } case event.SendMessageToPeerError: - err := cp.attemptErrorConversationMessage(ev.Data[event.RemotePeer], ev.Data[event.EventID], event.SendMessageToPeerError, ev.Data[event.Error]) - if err != nil { - log.Errorf("failed to error p2p message: %v", err) + context := ev.Data[event.EventContext] + if context == string(event.SendMessageToPeer) { + err := cp.attemptErrorConversationMessage(ev.Data[event.RemotePeer], ev.Data[event.EventID], event.SendMessageToPeerError, ev.Data[event.Error]) + if err != nil { + log.Errorf("failed to error p2p message: %s %v", ev.Data[event.RemotePeer], err) + } } case event.RetryServerRequest: // Automated Join Server Request triggered by a plugin. @@ -1122,12 +1145,12 @@ func (cp *cwtchPeer) eventHandler() { // by the given handle and attempts to mark the message as acknowledged. returns error on failure // to either find the contact or the associated message func (cp *cwtchPeer) attemptInsertOrAcknowledgeLegacyGroupConversation(conversationID int, signature string, dm *groups.DecryptedGroupMessage) error { - log.Infof("attempting to insert or ack group message %v %v", conversationID, signature) + log.Debugf("attempting to insert or ack group message %v %v", conversationID, signature) messageID, err := cp.GetChannelMessageBySignature(conversationID, 0, signature) // We have received our own message (probably), acknowledge and move on... if err == nil { _, attr, err := cp.GetChannelMessage(conversationID, 0, messageID) - if err == nil { + if err == nil && attr[constants.AttrAck] != constants.True { cp.mutex.Lock() attr[constants.AttrAck] = constants.True cp.storage.UpdateMessageAttributes(conversationID, 0, messageID, attr) diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 03b89df..6fff43e 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -140,7 +140,7 @@ func (e *engine) eventHandler() { case event.InvitePeerToGroup: err := e.sendPeerMessage(ev.Data[event.RemotePeer], pmodel.PeerMessage{ID: ev.EventID, Context: event.ContextInvite, Data: []byte(ev.Data[event.GroupInvite])}) if err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"})) + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.InvitePeerToGroup), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"})) } case event.JoinServer: signature, err := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) @@ -169,15 +169,15 @@ func (e *engine) eventHandler() { context = event.ContextRaw } if err := e.sendPeerMessage(ev.Data[event.RemotePeer], pmodel.PeerMessage{ID: ev.EventID, Context: context, Data: []byte(ev.Data[event.Data])}); err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"})) + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.SendMessageToPeer), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"})) } case event.SendGetValMessageToPeer: if err := e.sendGetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Scope], ev.Data[event.Path]); err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.SendGetValMessageToPeer), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) } case event.SendRetValMessageToPeer: if err := e.sendRetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.Exists]); err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.SendRetValMessageToPeer), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) } case event.SetPeerAuthorization: auth := model.Authorization(ev.Data[event.Authorization]) From 93c562097a167af13e6578d82762d0f18f78eeac Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Wed, 1 Dec 2021 13:57:56 -0800 Subject: [PATCH 2/2] Kill all Tor Connections at end of Integ Test --- testing/cwtch_peer_server_integration_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index 43a708b..86e50ec 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -333,13 +333,16 @@ func TestCwtchPeerIntegration(t *testing.T) { time.Sleep(2 * time.Second) t.Logf("Done shutdown: %v\n", runtime.NumGoroutine()) - numGoRoutinesPostAppShutdown := runtime.NumGoroutine() t.Logf("Shutting down ACN...") + acn.Restart() // kill all active tor connections... // acn.Close() TODO: ACN Now gets closed automatically with defer...attempting to close twice results in a dead lock... - time.Sleep(time.Second * 2) // Server ^^ has a 5 second loop attempting reconnect before exiting time.Sleep(time.Second * 30) // the network status plugin might keep goroutines alive for a minute before killing them + + numGoRoutinesPostAppShutdown := runtime.NumGoroutine() + + // Printing out the current goroutines // Very useful if we are leaking any. pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)