From 56cf2b7bf611dcf2db478357de157c9fafb20c3c Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Fri, 29 Jul 2022 17:24:22 -0700 Subject: [PATCH 1/4] remove unused events (libcwtch-rs audit); add anti dup on import --- event/common.go | 15 --------------- peer/cwtch_peer.go | 16 +++++++++++++--- protocol/connections/engine.go | 9 --------- testing/cwtch_peer_server_integration_test.go | 8 ++++++-- 4 files changed, 19 insertions(+), 29 deletions(-) diff --git a/event/common.go b/event/common.go index 1a77804..8829752 100644 --- a/event/common.go +++ b/event/common.go @@ -124,23 +124,11 @@ const ( // Password, NewPassword ChangePassword = Type("ChangePassword") - // Error(err), EventID - ChangePasswordError = Type("ChangePasswordError") - - // EventID - ChangePasswordSuccess = Type("ChangePasswordSuccess") - // a group has been successfully added or newly created // attributes: // Data [serialized *model.Group] GroupCreated = Type("GroupCreated") - // RemotePeer - DeleteContact = Type("DeleteContact") - - // GroupID - DeleteGroup = Type("DeleteGroup") - // PeerStateChange servers as a new incoming connection message as well, and can/is consumed by frontends to alert of new p2p connections // RemotePeer // ConnectionState @@ -192,9 +180,6 @@ const ( Syn = Type("Syn") Ack = Type("Ack") - // For situations where we want to update $Identity -> $RemotePeer/$GroupID's total message count to be $Data - MessageCounterResync = Type("MessageCounterResync") - // File Handling Events StopFileShare = Type("StopFileShare") StopAllFileShares = Type("StopAllFileShares") diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 40ecacf..78fe9ac 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -483,8 +483,14 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) { return -1, err } cp.mutex.Lock() + conversationInfo, err := cp.storage.GetConversationByHandle(gci.GroupID) + if conversationInfo != nil { + cp.mutex.Unlock() + return -1, err + } groupConversationID, err := cp.storage.NewConversation(gci.GroupID, map[string]string{}, model.AccessControlList{}, true) cp.mutex.Unlock() + if err == nil { cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)), gci.GroupID) cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), gci.ServerHost) @@ -500,9 +506,13 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) { func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error) { cp.mutex.Lock() defer cp.mutex.Unlock() - conversationID, err := cp.storage.NewConversation(handle, model.Attributes{event.SaveHistoryKey: event.DeleteHistoryDefault}, model.AccessControlList{handle: acl}, accepted) - cp.eventBus.Publish(event.NewEvent(event.ContactCreated, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.RemotePeer: handle})) - return conversationID, err + conversationInfo, err := cp.storage.GetConversationByHandle(handle) + if conversationInfo == nil { + conversationID, err := cp.storage.NewConversation(handle, model.Attributes{event.SaveHistoryKey: event.DeleteHistoryDefault}, model.AccessControlList{handle: acl}, accepted) + cp.eventBus.Publish(event.NewEvent(event.ContactCreated, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.RemotePeer: handle})) + return conversationID, err + } + return -1, err } // AcceptConversation looks up a conversation by `handle` and sets the Accepted status to `true` diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index afe1af6..3ba557a 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -101,8 +101,6 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue) engine.eventManager.Subscribe(event.SendGetValMessageToPeer, engine.queue) engine.eventManager.Subscribe(event.SendRetValMessageToPeer, engine.queue) - engine.eventManager.Subscribe(event.DeleteContact, engine.queue) - engine.eventManager.Subscribe(event.DeleteGroup, engine.queue) engine.eventManager.Subscribe(event.UpdateConversationAuthorization, engine.queue) engine.eventManager.Subscribe(event.BlockUnknownPeers, engine.queue) @@ -161,13 +159,6 @@ func (e *engine) eventHandler() { go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature) case event.LeaveServer: e.leaveServer(ev.Data[event.GroupServer]) - case event.DeleteContact: - onion := ev.Data[event.RemotePeer] - // We remove this peer from out blocklist which will prevent them from contacting us if we have "block unknown peers" turned on. - e.authorizations.Delete(ev.Data[event.RemotePeer]) - e.deleteConnection(onion) - case event.DeleteGroup: - // TODO: There isn't a way here to determine if other Groups are using a server connection... case event.SendMessageToGroup: ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index f2d39f9..2bbfc7b 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -144,18 +144,20 @@ func TestCwtchPeerIntegration(t *testing.T) { // ***** Peering, server joining, group creation / invite ***** - log.Infoln("Alice peering with Bob...") + log.Infoln("Alice and Bob creating conversations...") // Simulate Alice Adding Bob + log.Infof(" alice.NewConvo(bob)...") alice2bobConversationID, err := alice.NewContactConversation(bob.GetOnion(), model.DefaultP2PAccessControl(), true) if err != nil { t.Fatalf("error adding conversaiton %v", alice2bobConversationID) } + log.Infof(" bob.NewConvo(alice)...") bob2aliceConversationID, err := bob.NewContactConversation(alice.GetOnion(), model.DefaultP2PAccessControl(), true) if err != nil { t.Fatalf("error adding conversaiton %v", bob2aliceConversationID) } - log.Infof("Alice peering with Carol...") + log.Infof("Alice and Carol creating conversations...") // Simulate Alice Adding Carol alice2carolConversationID, err := alice.NewContactConversation(carol.GetOnion(), model.DefaultP2PAccessControl(), true) if err != nil { @@ -166,7 +168,9 @@ func TestCwtchPeerIntegration(t *testing.T) { t.Fatalf("error adding conversaiton %v", carol2aliceConversationID) } + log.Infof("Alice peering with Bob...") alice.PeerWithOnion(bob.GetOnion()) + log.Infof("Alice Peering with Carol...") alice.PeerWithOnion(carol.GetOnion()) // Test that we can rekey alice without issues... From b64229c8b7b2558f5ef67f7d90bdf673fb6a7e1b Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Fri, 29 Jul 2022 17:37:40 -0700 Subject: [PATCH 2/4] delete engine.deleteConnection (unused) --- protocol/connections/engine.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 3ba557a..3b1e413 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -525,13 +525,6 @@ func (e *engine) sendRetValToPeer(eventID, onion, val, existsStr string) error { return e.sendPeerMessage(onion, pmodel.PeerMessage{ID: eventID, Context: event.ContextRetVal, Data: message}) } -func (e *engine) deleteConnection(id string) { - conn, err := e.service.GetConnection(id) - if err == nil { - conn.Close() - } -} - // receiveGroupMessage is a callback function that processes GroupMessages from a given server func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMessage) { // Publish Event so that a Profile Engine can deal with it. From 3dc5dbb38ea5ecc31b7f234b3cdfe3a3a1b657fa Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Sat, 30 Jul 2022 00:19:16 -0700 Subject: [PATCH 3/4] fix errs --- peer/cwtch_peer.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 78fe9ac..99c4ffa 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -483,10 +483,10 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) { return -1, err } cp.mutex.Lock() - conversationInfo, err := cp.storage.GetConversationByHandle(gci.GroupID) + conversationInfo, _ := cp.storage.GetConversationByHandle(gci.GroupID) if conversationInfo != nil { cp.mutex.Unlock() - return -1, err + return -1, fmt.Errorf("group already exists") } groupConversationID, err := cp.storage.NewConversation(gci.GroupID, map[string]string{}, model.AccessControlList{}, true) cp.mutex.Unlock() @@ -506,13 +506,13 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) { func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error) { cp.mutex.Lock() defer cp.mutex.Unlock() - conversationInfo, err := cp.storage.GetConversationByHandle(handle) + conversationInfo, _ := cp.storage.GetConversationByHandle(handle) if conversationInfo == nil { conversationID, err := cp.storage.NewConversation(handle, model.Attributes{event.SaveHistoryKey: event.DeleteHistoryDefault}, model.AccessControlList{handle: acl}, accepted) cp.eventBus.Publish(event.NewEvent(event.ContactCreated, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.RemotePeer: handle})) return conversationID, err } - return -1, err + return -1, fmt.Errorf("contact conversation already exists") } // AcceptConversation looks up a conversation by `handle` and sets the Accepted status to `true` From 60caa08868668d66f37360668b84a95f6af1a966 Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Sat, 30 Jul 2022 16:05:39 -0700 Subject: [PATCH 4/4] readd deletecontact and wire in --- event/common.go | 3 +++ peer/cwtch_peer.go | 7 ++++++- protocol/connections/engine.go | 13 +++++++++++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/event/common.go b/event/common.go index 8829752..34e0bf9 100644 --- a/event/common.go +++ b/event/common.go @@ -129,6 +129,9 @@ const ( // Data [serialized *model.Group] GroupCreated = Type("GroupCreated") + // RemotePeer + DeleteContact = Type("DeleteContact") + // PeerStateChange servers as a new incoming connection message as well, and can/is consumed by frontends to alert of new p2p connections // RemotePeer // ConnectionState diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 99c4ffa..a0d5498 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -614,7 +614,12 @@ func (cp *cwtchPeer) FetchConversationInfo(handle string) (*model.Conversation, func (cp *cwtchPeer) DeleteConversation(id int) error { cp.mutex.Lock() defer cp.mutex.Unlock() - return cp.storage.DeleteConversation(id) + ci, err := cp.storage.GetConversation(id) + if err == nil && ci != nil { + cp.eventBus.Publish(event.NewEventList(event.DeleteContact, event.RemotePeer, ci.Handle)) + return cp.storage.DeleteConversation(id) + } + return fmt.Errorf("could not delete conversation, did not exist") } // SetConversationAttribute sets the conversation attribute at path to value diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 3b1e413..44d6e4e 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -101,6 +101,7 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue) engine.eventManager.Subscribe(event.SendGetValMessageToPeer, engine.queue) engine.eventManager.Subscribe(event.SendRetValMessageToPeer, engine.queue) + engine.eventManager.Subscribe(event.DeleteContact, engine.queue) engine.eventManager.Subscribe(event.UpdateConversationAuthorization, engine.queue) engine.eventManager.Subscribe(event.BlockUnknownPeers, engine.queue) @@ -159,6 +160,11 @@ func (e *engine) eventHandler() { go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature) case event.LeaveServer: e.leaveServer(ev.Data[event.GroupServer]) + case event.DeleteContact: + onion := ev.Data[event.RemotePeer] + // We remove this peer from out blocklist which will prevent them from contacting us if we have "block unknown peers" turned on. + e.authorizations.Delete(ev.Data[event.RemotePeer]) + e.deleteConnection(onion) case event.SendMessageToGroup: ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) @@ -525,6 +531,13 @@ func (e *engine) sendRetValToPeer(eventID, onion, val, existsStr string) error { return e.sendPeerMessage(onion, pmodel.PeerMessage{ID: eventID, Context: event.ContextRetVal, Data: message}) } +func (e *engine) deleteConnection(id string) { + conn, err := e.service.GetConnection(id) + if err == nil { + conn.Close() + } +} + // receiveGroupMessage is a callback function that processes GroupMessages from a given server func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMessage) { // Publish Event so that a Profile Engine can deal with it.