Merge pull request 'Send[Message|File|Invite] returns message id' (#437) from sendRetId into master
continuous-integration/drone/push Build is pending Details
continuous-integration/drone/tag Build is pending Details

Reviewed-on: #437
This commit is contained in:
Sarah Jamie Lewis 2022-03-23 22:38:43 +00:00
commit abfa95cddb
5 changed files with 33 additions and 28 deletions

View File

@ -56,6 +56,7 @@ type manager struct {
subscribers map[Type][]Queue subscribers map[Type][]Queue
events chan []byte events chan []byte
mapMutex sync.Mutex mapMutex sync.Mutex
chanMutex sync.Mutex
internal chan bool internal chan bool
closed bool closed bool
trace bool trace bool
@ -97,6 +98,8 @@ func (em *manager) Subscribe(eventType Type, queue Queue) {
// Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers // Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers
func (em *manager) Publish(event Event) { func (em *manager) Publish(event Event) {
em.chanMutex.Lock()
defer em.chanMutex.Unlock()
if event.EventType != "" && !em.closed { if event.EventType != "" && !em.closed {
// Debug Events for Tracing, locked behind an environment variable // Debug Events for Tracing, locked behind an environment variable
@ -160,7 +163,9 @@ func (em *manager) eventBus() {
// Shutdown triggers, and waits for, the internal eventBus goroutine to finish // Shutdown triggers, and waits for, the internal eventBus goroutine to finish
func (em *manager) Shutdown() { func (em *manager) Shutdown() {
em.events <- []byte{} em.events <- []byte{}
em.chanMutex.Lock()
em.closed = true em.closed = true
em.chanMutex.Unlock()
// wait for eventBus to finish // wait for eventBus to finish
<-em.internal <-em.internal
close(em.events) close(em.events)

View File

@ -210,7 +210,7 @@ func (cp *cwtchPeer) SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, k
// SendMessage is a higher level that merges sending messages to contacts and group handles // SendMessage is a higher level that merges sending messages to contacts and group handles
// If you try to send a message to a handle that doesn't exist, malformed or an incorrect type then // If you try to send a message to a handle that doesn't exist, malformed or an incorrect type then
// this function will error // this function will error
func (cp *cwtchPeer) SendMessage(conversation int, message string) error { func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error) {
cp.mutex.Lock() cp.mutex.Lock()
defer cp.mutex.Unlock() defer cp.mutex.Unlock()
@ -225,28 +225,29 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) error {
onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString()) onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
// For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks // For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks
_, err := cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message)) id, err := cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message))
if err != nil { if err != nil {
return err return -1, err
} }
cp.eventBus.Publish(ev) cp.eventBus.Publish(ev)
return id, nil
} else { } else {
group, err := cp.constructGroupFromConversation(conversationInfo) group, err := cp.constructGroupFromConversation(conversationInfo)
if err != nil { if err != nil {
log.Errorf("error constructing group") log.Errorf("error constructing group")
return err return -1, err
} }
privateKey, err := cp.storage.LoadProfileKeyValue(TypePrivateKey, "Ed25519PrivateKey") privateKey, err := cp.storage.LoadProfileKeyValue(TypePrivateKey, "Ed25519PrivateKey")
if err != nil { if err != nil {
log.Errorf("error loading private key from storage") log.Errorf("error loading private key from storage")
return err return -1, err
} }
publicKey, err := cp.storage.LoadProfileKeyValue(TypePublicKey, "Ed25519PublicKey") publicKey, err := cp.storage.LoadProfileKeyValue(TypePublicKey, "Ed25519PublicKey")
if err != nil { if err != nil {
log.Errorf("error loading public key from storage") log.Errorf("error loading public key from storage")
return err return -1, err
} }
identity := primitives.InitializeIdentity("", (*ed25519.PrivateKey)(&privateKey), (*ed25519.PublicKey)(&publicKey)) identity := primitives.InitializeIdentity("", (*ed25519.PrivateKey)(&privateKey), (*ed25519.PublicKey)(&publicKey))
@ -260,22 +261,21 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) error {
ct, sig, dm, err := model.EncryptMessageToGroup(message, identity, group, signature) ct, sig, dm, err := model.EncryptMessageToGroup(message, identity, group, signature)
if err != nil { if err != nil {
return err return -1, err
} }
// Insert the Group Message // Insert the Group Message
log.Debugf("sending message to group: %v", conversationInfo.ID) log.Debugf("sending message to group: %v", conversationInfo.ID)
_, err = cp.storage.InsertMessage(conversationInfo.ID, 0, dm.Text, model.Attributes{constants.AttrAck: constants.False, "PreviousSignature": base64.StdEncoding.EncodeToString(dm.PreviousMessageSig), constants.AttrAuthor: dm.Onion, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, base64.StdEncoding.EncodeToString(sig), model.CalculateContentHash(dm.Onion, dm.Text)) id, err := cp.storage.InsertMessage(conversationInfo.ID, 0, dm.Text, model.Attributes{constants.AttrAck: constants.False, "PreviousSignature": base64.StdEncoding.EncodeToString(dm.PreviousMessageSig), constants.AttrAuthor: dm.Onion, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, base64.StdEncoding.EncodeToString(sig), model.CalculateContentHash(dm.Onion, dm.Text))
if err == nil { if err == nil {
ev := event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.GroupID: conversationInfo.Handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)}) ev := event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.GroupID: conversationInfo.Handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)})
cp.eventBus.Publish(ev) cp.eventBus.Publish(ev)
} else { return id, nil
return err
} }
return -1, err
} }
return nil
} }
return fmt.Errorf("error sending message to conversation %v", err) return -1, fmt.Errorf("error sending message to conversation %v", err)
} }
// BlockUnknownConnections will auto disconnect from connections if authentication doesn't resolve a hostname // BlockUnknownConnections will auto disconnect from connections if authentication doesn't resolve a hostname
@ -785,13 +785,13 @@ func (cp *cwtchPeer) PeerWithOnion(onion string) {
} }
// SendInviteToConversation kicks off the invite process // SendInviteToConversation kicks off the invite process
func (cp *cwtchPeer) SendInviteToConversation(conversationID int, inviteConversationID int) error { func (cp *cwtchPeer) SendInviteToConversation(conversationID int, inviteConversationID int) (int, error) {
var invite model.MessageWrapper var invite model.MessageWrapper
inviteConversationInfo, err := cp.GetConversationInfo(inviteConversationID) inviteConversationInfo, err := cp.GetConversationInfo(inviteConversationID)
if inviteConversationInfo == nil || err != nil { if inviteConversationInfo == nil || err != nil {
return err return -1, err
} }
if tor.IsValidHostname(inviteConversationInfo.Handle) { if tor.IsValidHostname(inviteConversationInfo.Handle) {
@ -800,24 +800,24 @@ func (cp *cwtchPeer) SendInviteToConversation(conversationID int, inviteConversa
// Reconstruct Group // Reconstruct Group
groupID, ok := inviteConversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)).ToString()] groupID, ok := inviteConversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)).ToString()]
if !ok { if !ok {
return errors.New("group structure is malformed - no id") return -1, errors.New("group structure is malformed - no id")
} }
groupServer, ok := inviteConversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)).ToString()] groupServer, ok := inviteConversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)).ToString()]
if !ok { if !ok {
return errors.New("group structure is malformed - no server") return -1, errors.New("group structure is malformed - no server")
} }
groupKeyBase64, ok := inviteConversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)).ToString()] groupKeyBase64, ok := inviteConversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)).ToString()]
if !ok { if !ok {
return errors.New("group structure is malformed - no key") return -1, errors.New("group structure is malformed - no key")
} }
groupName, ok := inviteConversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)).ToString()] groupName, ok := inviteConversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)).ToString()]
if !ok { if !ok {
return errors.New("group structure is malformed - no name") return -1, errors.New("group structure is malformed - no name")
} }
groupKey, err := base64.StdEncoding.DecodeString(groupKeyBase64) groupKey, err := base64.StdEncoding.DecodeString(groupKeyBase64)
if err != nil { if err != nil {
return errors.New("malformed group key") return -1, errors.New("malformed group key")
} }
var groupKeyFixed = [32]byte{} var groupKeyFixed = [32]byte{}
@ -832,17 +832,17 @@ func (cp *cwtchPeer) SendInviteToConversation(conversationID int, inviteConversa
groupInvite, err := group.Invite() groupInvite, err := group.Invite()
if err != nil { if err != nil {
return errors.New("group invite is malformed") return -1, errors.New("group invite is malformed")
} }
serverInfo, err := cp.FetchConversationInfo(groupServer) serverInfo, err := cp.FetchConversationInfo(groupServer)
if err != nil { if err != nil {
return errors.New("unknown server associated with group") return -1, errors.New("unknown server associated with group")
} }
bundle, exists := serverInfo.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.BundleType))).ToString()] bundle, exists := serverInfo.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.BundleType))).ToString()]
if !exists { if !exists {
return errors.New("server bundle not found") return -1, errors.New("server bundle not found")
} }
invite = model.MessageWrapper{Overlay: model.OverlayInviteGroup, Data: fmt.Sprintf("tofubundle:server:%s||%s", base64.StdEncoding.EncodeToString([]byte(bundle)), groupInvite)} invite = model.MessageWrapper{Overlay: model.OverlayInviteGroup, Data: fmt.Sprintf("tofubundle:server:%s||%s", base64.StdEncoding.EncodeToString([]byte(bundle)), groupInvite)}
@ -851,7 +851,7 @@ func (cp *cwtchPeer) SendInviteToConversation(conversationID int, inviteConversa
inviteBytes, err := json.Marshal(invite) inviteBytes, err := json.Marshal(invite)
if err != nil { if err != nil {
log.Errorf("malformed invite: %v", err) log.Errorf("malformed invite: %v", err)
return err return -1, err
} }
return cp.SendMessage(conversationID, string(inviteBytes)) return cp.SendMessage(conversationID, string(inviteBytes))
} }

View File

@ -46,8 +46,8 @@ type ModifyServers interface {
// SendMessages enables a caller to sender messages to a contact // SendMessages enables a caller to sender messages to a contact
type SendMessages interface { type SendMessages interface {
SendMessage(conversation int, message string) error SendMessage(conversation int, message string) (int, error)
SendInviteToConversation(conversationID int, inviteConversationID int) error SendInviteToConversation(conversationID int, inviteConversationID int) (int, error)
SendScopedZonedGetValToContact(conversationID int, scope attr.Scope, zone attr.Zone, key string) SendScopedZonedGetValToContact(conversationID int, scope attr.Scope, zone attr.Zone, key string)
} }

View File

@ -246,7 +246,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
// Invites // Invites
log.Infoln("Alice inviting Bob to group...") log.Infoln("Alice inviting Bob to group...")
err = alice.SendInviteToConversation(alice2bobConversationID, aliceGroupConversationID) _, err = alice.SendInviteToConversation(alice2bobConversationID, aliceGroupConversationID)
if err != nil { if err != nil {
t.Fatalf("Error for Alice inviting Bob to group: %v", err) t.Fatalf("Error for Alice inviting Bob to group: %v", err)
} }
@ -364,7 +364,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, id int, message string) { func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, id int, message string) {
name, _ := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name) name, _ := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
log.Infof("%v> %v\n", name, message) log.Infof("%v> %v\n", name, message)
err := profile.SendMessage(id, message) _, err := profile.SendMessage(id, message)
if err != nil { if err != nil {
t.Fatalf("Alice failed to send a message to the group: %v", err) t.Fatalf("Alice failed to send a message to the group: %v", err)
} }

View File

@ -17,7 +17,7 @@ gofmt -l -s -w .
# ineffassign (https://github.com/gordonklaus/ineffassign) # ineffassign (https://github.com/gordonklaus/ineffassign)
echo "Checking for ineffectual assignment of errors (unchecked errors...)" echo "Checking for ineffectual assignment of errors (unchecked errors...)"
ineffassign . ineffassign ./...
# misspell (https://github.com/client9/misspell/cmd/misspell) # misspell (https://github.com/client9/misspell/cmd/misspell)
echo "Checking for misspelled words..." echo "Checking for misspelled words..."