diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index eeddd1a..920d39f 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -418,13 +418,16 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) { if err != nil { return -1, err } - groupConversationID, err := cp.NewContactConversation(gci.GroupID, model.DefaultP2PAccessControl(), true) + cp.mutex.Lock() + groupConversationID, err := cp.storage.NewConversation(gci.GroupID, map[string]string{}, model.AccessControlList{}, true) + cp.mutex.Unlock() if err == nil { cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)), gci.GroupID) cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), gci.ServerHost) cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(gci.SharedKey)) cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)), gci.GroupName) cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(groupConversationID), event.GroupServer: gci.ServerHost, event.GroupInvite: exportedInvite})) + cp.JoinServer(gci.ServerHost) } return groupConversationID, err } @@ -443,7 +446,17 @@ func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessContr func (cp *cwtchPeer) AcceptConversation(id int) error { cp.mutex.Lock() defer cp.mutex.Unlock() - return cp.storage.AcceptConversation(id) + err := cp.storage.AcceptConversation(id) + if err == nil { + // If a p2p conversation then attempt to peer with the onion... + // Groups and Server have their own acceptance flow. + ci, _ := cp.storage.GetConversation(id) + if !ci.IsGroup() && !ci.IsServer() { + cp.eventBus.Publish(event.NewEvent(event.SetPeerAuthorization, map[event.Field]string{event.ConversationID: strconv.Itoa(id), event.RemotePeer: ci.Handle, event.Authorization: string(model.AuthApproved)})) + cp.PeerWithOnion(ci.Handle) + } + } + return err } // BlockConversation looks up a conversation by `handle` and sets the Accepted status to `true` @@ -459,7 +472,7 @@ func (cp *cwtchPeer) BlockConversation(id int) error { ci.ACL[ci.Handle] = model.AccessControl{Blocked: true, Read: false, Append: false} // Send an event in any case to block the protocol engine... // TODO at some point in the future engine needs to understand ACLs not just legacy auth status - cp.eventBus.Publish(event.NewEvent(event.SetPeerAuthorization, map[event.Field]string{event.RemotePeer: ci.Handle, event.Authorization: string(model.AuthBlocked)})) + cp.eventBus.Publish(event.NewEvent(event.SetPeerAuthorization, map[event.Field]string{event.ConversationID: strconv.Itoa(id), event.RemotePeer: ci.Handle, event.Authorization: string(model.AuthBlocked)})) return cp.storage.SetConversationACL(id, ci.ACL) } @@ -552,7 +565,9 @@ func (cp *cwtchPeer) UpdateMessageAttribute(conversation int, channel int, id in func (cp *cwtchPeer) StartGroup(name string, server string) (int, error) { group, err := model.NewGroup(server) if err == nil { - conversationID, err := cp.NewContactConversation(group.GroupID, model.DefaultP2PAccessControl(), true) + cp.mutex.Lock() + conversationID, err := cp.storage.NewConversation(group.GroupID, map[string]string{}, model.AccessControlList{}, true) + cp.mutex.Unlock() if err != nil { return -1, err } @@ -600,7 +615,10 @@ func (cp *cwtchPeer) AddServer(serverSpecification string) (string, error) { // Add the contact if we don't already have it conversationInfo, _ := cp.FetchConversationInfo(onion) if conversationInfo == nil { - _, err := cp.NewContactConversation(onion, model.DefaultP2PAccessControl(), true) + cp.mutex.Lock() + // Create a new conversation but do **not** push an event out. + _, err := cp.storage.NewConversation(onion, map[string]string{}, model.AccessControlList{onion: model.DefaultP2PAccessControl()}, true) + cp.mutex.Unlock() if err != nil { return "", err } @@ -746,13 +764,7 @@ func (cp *cwtchPeer) SendInviteToConversation(conversationID int, inviteConversa } func (cp *cwtchPeer) ImportBundle(importString string) error { - if tor.IsValidHostname(importString) { - _, err := cp.NewContactConversation(importString, model.DefaultP2PAccessControl(), true) - if err == nil { - return ConstructResponse(constants.ImportBundlePrefix, "success") - } - return ConstructResponse(constants.ImportBundlePrefix, err.Error()) - } else if strings.HasPrefix(importString, constants.TofuBundlePrefix) { + if strings.HasPrefix(importString, constants.TofuBundlePrefix) { bundle := strings.Split(importString, "||") if len(bundle) == 2 { err := cp.ImportBundle(bundle[0][len(constants.TofuBundlePrefix):]) @@ -778,6 +790,14 @@ func (cp *cwtchPeer) ImportBundle(importString string) error { return ConstructResponse(constants.ImportBundlePrefix, err.Error()) } return ConstructResponse(constants.ImportBundlePrefix, "success") + } else if tor.IsValidHostname(importString) { + _, err := cp.NewContactConversation(importString, model.DefaultP2PAccessControl(), true) + if err == nil { + // Assuming all is good, we should peer with this contact. + cp.PeerWithOnion(importString) + return ConstructResponse(constants.ImportBundlePrefix, "success") + } + return ConstructResponse(constants.ImportBundlePrefix, err.Error()) } return ConstructResponse(constants.ImportBundlePrefix, "invalid_group_invite_prefix") } @@ -968,12 +988,15 @@ func (cp *cwtchPeer) eventHandler() { case event.SendMessageToGroupError: err := cp.attemptErrorConversationMessage(ev.Data[event.GroupID], ev.Data[event.Signature], event.SendMessageToGroupError, ev.Data[event.Error]) if err != nil { - log.Errorf("failed to error p2p message: %v", err) + log.Errorf("failed to error group message: %s %v", ev.Data[event.GroupID], err) } case event.SendMessageToPeerError: - err := cp.attemptErrorConversationMessage(ev.Data[event.RemotePeer], ev.Data[event.EventID], event.SendMessageToPeerError, ev.Data[event.Error]) - if err != nil { - log.Errorf("failed to error p2p message: %v", err) + context := ev.Data[event.EventContext] + if context == string(event.SendMessageToPeer) { + err := cp.attemptErrorConversationMessage(ev.Data[event.RemotePeer], ev.Data[event.EventID], event.SendMessageToPeerError, ev.Data[event.Error]) + if err != nil { + log.Errorf("failed to error p2p message: %s %v", ev.Data[event.RemotePeer], err) + } } case event.RetryServerRequest: // Automated Join Server Request triggered by a plugin. @@ -1122,12 +1145,12 @@ func (cp *cwtchPeer) eventHandler() { // by the given handle and attempts to mark the message as acknowledged. returns error on failure // to either find the contact or the associated message func (cp *cwtchPeer) attemptInsertOrAcknowledgeLegacyGroupConversation(conversationID int, signature string, dm *groups.DecryptedGroupMessage) error { - log.Infof("attempting to insert or ack group message %v %v", conversationID, signature) + log.Debugf("attempting to insert or ack group message %v %v", conversationID, signature) messageID, err := cp.GetChannelMessageBySignature(conversationID, 0, signature) // We have received our own message (probably), acknowledge and move on... if err == nil { _, attr, err := cp.GetChannelMessage(conversationID, 0, messageID) - if err == nil { + if err == nil && attr[constants.AttrAck] != constants.True { cp.mutex.Lock() attr[constants.AttrAck] = constants.True cp.storage.UpdateMessageAttributes(conversationID, 0, messageID, attr) diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 03b89df..6fff43e 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -140,7 +140,7 @@ func (e *engine) eventHandler() { case event.InvitePeerToGroup: err := e.sendPeerMessage(ev.Data[event.RemotePeer], pmodel.PeerMessage{ID: ev.EventID, Context: event.ContextInvite, Data: []byte(ev.Data[event.GroupInvite])}) if err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"})) + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.InvitePeerToGroup), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"})) } case event.JoinServer: signature, err := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) @@ -169,15 +169,15 @@ func (e *engine) eventHandler() { context = event.ContextRaw } if err := e.sendPeerMessage(ev.Data[event.RemotePeer], pmodel.PeerMessage{ID: ev.EventID, Context: context, Data: []byte(ev.Data[event.Data])}); err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"})) + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.SendMessageToPeer), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"})) } case event.SendGetValMessageToPeer: if err := e.sendGetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Scope], ev.Data[event.Path]); err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.SendGetValMessageToPeer), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) } case event.SendRetValMessageToPeer: if err := e.sendRetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.Exists]); err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.SendRetValMessageToPeer), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) } case event.SetPeerAuthorization: auth := model.Authorization(ev.Data[event.Authorization])