First Cut of P2P and Groups using new Storage APIs!
continuous-integration/drone/push Build is pending Details

This commit is contained in:
Sarah Jamie Lewis 2021-11-16 15:06:30 -08:00
parent 62d2497843
commit 406d900029
11 changed files with 775 additions and 431 deletions

View File

@ -273,11 +273,12 @@ const (
Identity = Field("Identity") Identity = Field("Identity")
GroupID = Field("GroupID") GroupConversationID = Field("GroupConversationID")
GroupServer = Field("GroupServer") GroupID = Field("GroupID")
ServerTokenY = Field("ServerTokenY") GroupServer = Field("GroupServer")
ServerTokenOnion = Field("ServerTokenOnion") ServerTokenY = Field("ServerTokenY")
GroupInvite = Field("GroupInvite") ServerTokenOnion = Field("ServerTokenOnion")
GroupInvite = Field("GroupInvite")
ProfileName = Field("ProfileName") ProfileName = Field("ProfileName")
Password = Field("Password") Password = Field("Password")

View File

@ -23,3 +23,15 @@ const GroupServer = "groupserver"
// GroupKey is the name of the group key attribute... // GroupKey is the name of the group key attribute...
const GroupKey = "groupkey" const GroupKey = "groupkey"
// True - true
const True = "true"
// False - false
const False = "false"
// AttrAck - conversation attribute for acknowledgement status
const AttrAck = "ack"
// AttrErr - conversation attribute for errored status
const AttrErr = "error"

View File

@ -226,3 +226,73 @@ func ValidateInvite(invite string) (*groups.GroupInvite, error) {
} }
return nil, errors.New("invite has invalid structure") return nil, errors.New("invite has invalid structure")
} }
// AttemptDecryption takes a ciphertext and signature and attempts to decrypt it under known groups.
// If successful, adds the message to the group's timeline
func (g *Group) AttemptDecryption(ciphertext []byte, signature []byte) (bool, *groups.DecryptedGroupMessage) {
success, dgm := g.DecryptMessage(ciphertext)
if success {
// Attempt to serialize this message
serialized, err := json.Marshal(dgm)
// Someone send a message that isn't a valid Decrypted Group Message. Since we require this struct in orer
// to verify the message, we simply ignore it.
if err != nil {
return false, nil
}
// This now requires knowledge of the Sender, the Onion and the Specific Decrypted Group Message (which should only
// be derivable from the cryptographic key) which contains many unique elements such as the time and random padding
verified := g.VerifyGroupMessage(dgm.Onion, g.GroupID, base64.StdEncoding.EncodeToString(serialized), signature)
if !verified {
// An earlier version of this protocol mistakenly signed the ciphertext of the message
// instead of the serialized decrypted group message.
// This has 2 issues:
// 1. A server with knowledge of group members public keys AND the Group ID would be able to detect valid messages
// 2. It made the metadata-security of a group dependent on keeping the cryptographically derived Group ID secret.
// While not awful, it also isn't good. For Version 3 groups only we permit Cwtch to check this older signature
// structure in a backwards compatible way for the duration of the Groups Experiment.
// TODO: Delete this check when Groups are no long Experimental
if g.Version == 3 {
verified = g.VerifyGroupMessage(dgm.Onion, g.GroupID, string(ciphertext), signature)
}
}
// So we have a message that has a valid group key, but the signature can't be verified.
// The most obvious explanation for this is that the group key has been compromised (or we are in an open group and the server is being malicious)
// Either way, someone who has the private key is being detectably bad so we are just going to throw this message away and mark the group as Compromised.
if !verified {
return false, nil
}
return true, dgm
}
// If we couldn't find a group to decrypt the message with we just return false. This is an expected case
return false, nil
}
// VerifyGroupMessage confirms the authenticity of a message given an sender onion, message and signature.
// The goal of this function is 2-fold:
// 1. We confirm that the sender referenced in the group text is the actual sender of the message (or at least
// knows the senders private key)
// 2. Secondly, we confirm that the sender sent the message to a particular group id on a specific server (it doesn't
// matter if we actually received this message from the server or from a hybrid protocol, all that matters is
// that the sender and receivers agree that this message was intended for the group
// The 2nd point is important as it prevents an attack documented in the original Cwtch paper (and later at
// https://docs.openprivacy.ca/cwtch-security-handbook/groups.html) in which a malicious profile sets up 2 groups
// on two different servers with the same key and then forwards messages between them to convince the parties in
// each group that they are actually in one big group (with the intent to later censor and/or selectively send messages
// to each group).
func (g *Group) VerifyGroupMessage(onion string, groupID string, message string, signature []byte) bool {
// We use our group id, a known reference server and the ciphertext of the message.
m := groupID + g.GroupServer + message
// Otherwise we derive the public key from the sender and check it against that.
decodedPub, err := base32.StdEncoding.DecodeString(strings.ToUpper(onion))
if err == nil && len(decodedPub) >= 32 {
return ed25519.Verify(decodedPub[:32], []byte(m), signature)
}
return false
}

14
model/message_utils.go Normal file
View File

@ -0,0 +1,14 @@
package model
import (
"crypto/sha256"
"encoding/base64"
)
// CalculateContentHash derives a hash using the author and the message body. It is intended to be
// globally referencable in the context of a single conversation
func CalculateContentHash(author string, messageBody string) string {
content := []byte(author + messageBody)
contentBasedHash := sha256.Sum256(content)
return base64.StdEncoding.EncodeToString(contentBasedHash[:])
}

View File

@ -2,11 +2,16 @@ package model
import ( import (
"crypto/rand" "crypto/rand"
"cwtch.im/cwtch/protocol/groups"
"encoding/base32" "encoding/base32"
"encoding/base64"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"errors"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/openprivacy/connectivity/tor" "git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519" "golang.org/x/crypto/ed25519"
"io"
"path/filepath" "path/filepath"
"strings" "strings"
"sync" "sync"
@ -192,41 +197,6 @@ func (p *Profile) AckSentMessageToPeer(onion string, eventID string) int {
return -1 return -1
} }
// VerifyGroupMessage confirms the authenticity of a message given an sender onion, message and signature.
// The goal of this function is 2-fold:
// 1. We confirm that the sender referenced in the group text is the actual sender of the message (or at least
// knows the senders private key)
// 2. Secondly, we confirm that the sender sent the message to a particular group id on a specific server (it doesn't
// matter if we actually received this message from the server or from a hybrid protocol, all that matters is
// that the sender and receivers agree that this message was intended for the group
// The 2nd point is important as it prevents an attack documented in the original Cwtch paper (and later at
// https://docs.openprivacy.ca/cwtch-security-handbook/groups.html) in which a malicious profile sets up 2 groups
// on two different servers with the same key and then forwards messages between them to convince the parties in
// each group that they are actually in one big group (with the intent to later censor and/or selectively send messages
// to each group).
//func (p *Profile) VerifyGroupMessage(onion string, groupID string, message string, signature []byte) bool {
//
// group := p.GetGroup(groupID)
// if group == nil {
// return false
// }
//
// // We use our group id, a known reference server and the ciphertext of the message.
// m := groupID + group.GroupServer + message
//
// // If the message is ostensibly from us then we check it against our public key...
// if onion == p.Onion {
// return ed25519.Verify(p.Ed25519PublicKey, []byte(m), signature)
// }
//
// // Otherwise we derive the public key from the sender and check it against that.
// decodedPub, err := base32.StdEncoding.DecodeString(strings.ToUpper(onion))
// if err == nil && len(decodedPub) >= 32 {
// return ed25519.Verify(decodedPub[:32], []byte(m), signature)
// }
// return false
//}
// SignMessage takes a given message and returns an Ed21159 signature // SignMessage takes a given message and returns an Ed21159 signature
func (p *Profile) SignMessage(message string) []byte { func (p *Profile) SignMessage(message string) []byte {
sig := ed25519.Sign(p.Ed25519PrivateKey, []byte(message)) sig := ed25519.Sign(p.Ed25519PrivateKey, []byte(message))
@ -258,114 +228,59 @@ func (p *Profile) SignMessage(message string) []byte {
// //
// //
//// AttemptDecryption takes a ciphertext and signature and attempts to decrypt it under known groups.
//// If successful, adds the message to the group's timeline
//func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, string, *Message, int) {
// for _, group := range p.Groups {
// success, dgm := group.DecryptMessage(ciphertext)
// if success {
//
// // Attempt to serialize this message
// serialized, err := json.Marshal(dgm)
//
// // Someone send a message that isn't a valid Decrypted Group Message. Since we require this struct in orer
// // to verify the message, we simply ignore it.
// if err != nil {
// return false, group.GroupID, nil, -1
// }
//
// // This now requires knowledge of the Sender, the Onion and the Specific Decrypted Group Message (which should only
// // be derivable from the cryptographic key) which contains many unique elements such as the time and random padding
// verified := p.VerifyGroupMessage(dgm.Onion, group.GroupID, base64.StdEncoding.EncodeToString(serialized), signature)
//
// if !verified {
// // An earlier version of this protocol mistakenly signed the ciphertext of the message
// // instead of the serialized decrypted group message.
// // This has 2 issues:
// // 1. A server with knowledge of group members public keys AND the Group ID would be able to detect valid messages
// // 2. It made the metadata-security of a group dependent on keeping the cryptographically derived Group ID secret.
// // While not awful, it also isn't good. For Version 3 groups only we permit Cwtch to check this older signature
// // structure in a backwards compatible way for the duration of the Groups Experiment.
// // TODO: Delete this check when Groups are no long Experimental
// if group.Version == 3 {
// verified = p.VerifyGroupMessage(dgm.Onion, group.GroupID, string(ciphertext), signature)
// }
// }
//
// // So we have a message that has a valid group key, but the signature can't be verified.
// // The most obvious explanation for this is that the group key has been compromised (or we are in an open group and the server is being malicious)
// // Either way, someone who has the private key is being detectably bad so we are just going to throw this message away and mark the group as Compromised.
// if !verified {
// return false, group.GroupID, nil, -1
// }
// message, index := group.AddMessage(dgm, signature)
// return true, group.GroupID, message, index
// }
// }
//
// // If we couldn't find a group to decrypt the message with we just return false. This is an expected case
// return false, "", nil, -1
//}
//
//func getRandomness(arr *[]byte) {
// if _, err := io.ReadFull(rand.Reader, (*arr)[:]); err != nil {
// if err != nil {
// // If we can't do randomness, just crash something is very very wrong and we are not going
// // to resolve it here....
// panic(err.Error())
// }
// }
//}
//
//// EncryptMessageToGroup when given a message and a group, encrypts and signs the message under the group and
//// profile
//func (p *Profile) EncryptMessageToGroup(message string, groupID string) ([]byte, []byte, error) {
//
// if len(message) > MaxGroupMessageLength {
// return nil, nil, errors.New("group message is too long")
// }
//
// group := p.GetGroup(groupID)
// if group != nil {
// timestamp := time.Now().Unix()
//
// // Select the latest message from the timeline as a reference point.
// var prevSig []byte
// if len(group.Timeline.Messages) > 0 {
// prevSig = group.Timeline.Messages[len(group.Timeline.Messages)-1].Signature
// } else {
// prevSig = []byte(group.GroupID)
// }
//
// lenPadding := MaxGroupMessageLength - len(message)
// padding := make([]byte, lenPadding)
// getRandomness(&padding)
// hexGroupID, err := hex.DecodeString(group.GroupID)
// if err != nil {
// return nil, nil, err
// }
//
// dm := &groups.DecryptedGroupMessage{
// Onion: p.Onion,
// Text: message,
// SignedGroupID: hexGroupID,
// Timestamp: uint64(timestamp),
// PreviousMessageSig: prevSig,
// Padding: padding[:],
// }
//
// ciphertext, err := group.EncryptMessage(dm)
// if err != nil {
// return nil, nil, err
// }
// serialized, _ := json.Marshal(dm)
// signature := p.SignMessage(groupID + group.GroupServer + base64.StdEncoding.EncodeToString(serialized))
// group.AddSentMessage(dm, signature)
// return ciphertext, signature, nil
// }
// return nil, nil, errors.New("group does not exist")
//}
// //
func getRandomness(arr *[]byte) {
if _, err := io.ReadFull(rand.Reader, (*arr)[:]); err != nil {
if err != nil {
// If we can't do randomness, just crash something is very very wrong and we are not going
// to resolve it here....
panic(err.Error())
}
}
}
// EncryptMessageToGroup when given a message and a group, encrypts and signs the message under the group and
// profile
func EncryptMessageToGroup(message string, author primitives.Identity, group *Group) ([]byte, []byte, *groups.DecryptedGroupMessage, error) {
if len(message) > MaxGroupMessageLength {
return nil, nil, nil, errors.New("group message is too long")
}
timestamp := time.Now().Unix()
// Select the latest message from the timeline as a reference point.
var prevSig []byte
if len(group.Timeline.Messages) > 0 {
prevSig = group.Timeline.Messages[len(group.Timeline.Messages)-1].Signature
} else {
prevSig = []byte(group.GroupID)
}
lenPadding := MaxGroupMessageLength - len(message)
padding := make([]byte, lenPadding)
getRandomness(&padding)
hexGroupID, err := hex.DecodeString(group.GroupID)
if err != nil {
return nil, nil, nil, err
}
dm := &groups.DecryptedGroupMessage{
Onion: author.Hostname(),
Text: message,
SignedGroupID: hexGroupID,
Timestamp: uint64(timestamp),
PreviousMessageSig: prevSig,
Padding: padding[:],
}
ciphertext, err := group.EncryptMessage(dm)
if err != nil {
return nil, nil, nil, err
}
serialized, _ := json.Marshal(dm)
signature := author.Sign([]byte(group.GroupID + group.GroupServer + base64.StdEncoding.EncodeToString(serialized)))
return ciphertext, signature, dm, nil
}
// GetCopy returns a full deep copy of the Profile struct and its members (timeline inclusion control by arg) // GetCopy returns a full deep copy of the Profile struct and its members (timeline inclusion control by arg)
func (p *Profile) GetCopy(timeline bool) *Profile { func (p *Profile) GetCopy(timeline bool) *Profile {

View File

@ -3,6 +3,7 @@ package peer
import ( import (
"crypto/rand" "crypto/rand"
"cwtch.im/cwtch/model/constants" "cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"errors" "errors"
@ -142,36 +143,58 @@ func (cp *cwtchPeer) SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, k
func (cp *cwtchPeer) SendMessage(conversation int, message string) error { func (cp *cwtchPeer) SendMessage(conversation int, message string) error {
cp.mutex.Lock() cp.mutex.Lock()
defer cp.mutex.Unlock() defer cp.mutex.Unlock()
// Group Handles are always 32 bytes in length, but we forgo any further testing here
// and delegate the group existence check to EncryptMessageToGroup
//cp.mutex.Lock()
//defer cp.mutex.Unlock()
//group := cp.Profile.GetGroup(handle)
//if group == nil {
// return errors.New("invalid group id")
//}
//
//// Group adds it's own sent message to timeline
//ct, sig, err := cp.Profile.EncryptMessageToGroup(message, handle)
//
//// Group does not exist or some other unrecoverable error...
//if err != nil {
// return err
//}
//ev = event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupID: handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)})
// We assume we are sending to a Contact. // We assume we are sending to a Contact.
conversationInfo, err := cp.storage.GetConversation(conversation) conversationInfo, err := cp.storage.GetConversation(conversation)
// If the contact exists replace the event id wih the index of this message in the contacts timeline... // If the contact exists replace the event id wih the index of this message in the contacts timeline...
// Otherwise assume we don't log the message in the timeline... // Otherwise assume we don't log the message in the timeline...
if conversationInfo != nil && err == nil { if conversationInfo != nil && err == nil {
ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: conversationInfo.Handle, event.Data: message})
//ev.EventID = strconv.Itoa(contact.Timeline.Len()) if tor.IsValidHostname(conversationInfo.Handle) {
cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{"ack": event.False, "sent": time.Now().String()}) ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: conversationInfo.Handle, event.Data: message})
cp.eventBus.Publish(ev) onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
// For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks
err := cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{"ack": event.False, "sent": time.Now().String()}, ev.EventID, model.CalculateContentHash(string(onion), message))
if err != nil {
return err
}
cp.eventBus.Publish(ev)
} else {
group, err := cp.constructGroupFromConversation(conversationInfo)
if err != nil {
log.Errorf("error constructing group")
return err
}
privateKey, err := cp.storage.LoadProfileKeyValue(TypePrivateKey, "Ed25519PrivateKey")
if err != nil {
log.Errorf("error loading private key from storage")
return err
}
publicKey, err := cp.storage.LoadProfileKeyValue(TypePublicKey, "Ed25519PublicKey")
if err != nil {
log.Errorf("error loading public key from storage")
return err
}
identity := primitives.InitializeIdentity("", (*ed25519.PrivateKey)(&privateKey), (*ed25519.PublicKey)(&publicKey))
ct, sig, dm, err := model.EncryptMessageToGroup(message, identity, group)
if err != nil {
return err
}
// Insert the Group Message
cp.storage.InsertMessage(conversationInfo.ID, 0, dm.Text, model.Attributes{constants.AttrAck: constants.False, "PreviousSignature": base64.StdEncoding.EncodeToString(dm.PreviousMessageSig), "Author": dm.Onion, "Sent": strconv.Itoa(int(dm.Timestamp))}, base64.StdEncoding.EncodeToString(sig), model.CalculateContentHash(dm.Onion, dm.Text))
ev := event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupID: conversationInfo.Handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)})
cp.eventBus.Publish(ev)
}
return nil
} }
return nil return fmt.Errorf("error sending message to conversation %v", err)
} }
// UpdateMessageFlags // UpdateMessageFlags
@ -322,15 +345,19 @@ func (cp *cwtchPeer) AutoHandleEvents(events []event.Type) {
// ImportGroup initializes a group from an imported source rather than a peer invite // ImportGroup initializes a group from an imported source rather than a peer invite
// Status: TODO // Status: TODO
func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) { func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) {
cp.mutex.Lock() gci, err := model.ValidateInvite(exportedInvite)
defer cp.mutex.Unlock() if err != nil {
// //gid, err := model.ProcessInvite(exportedInvite) return -1, err
// }
// if err == nil { groupConversationID, err := cp.NewContactConversation(gci.GroupID, model.DefaultP2PAccessControl(), true)
// cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.GroupID: gid, event.GroupInvite: exportedInvite})) if err == nil {
// } cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)), gci.GroupID)
// cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), gci.ServerHost)
return -1, nil cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(gci.SharedKey))
cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.Name)), gci.GroupName)
cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.GroupConversationID: strconv.Itoa(groupConversationID), event.GroupServer: gci.ServerHost, event.GroupInvite: exportedInvite}))
}
return groupConversationID, err
} }
// NewContactConversation create a new p2p conversation with the given acl applied to the handle. // NewContactConversation create a new p2p conversation with the given acl applied to the handle.
@ -378,6 +405,7 @@ func (cp *cwtchPeer) DeleteConversation(id int) error {
func (cp *cwtchPeer) SetConversationAttribute(id int, path attr.ScopedZonedPath, value string) error { func (cp *cwtchPeer) SetConversationAttribute(id int, path attr.ScopedZonedPath, value string) error {
cp.mutex.Lock() cp.mutex.Lock()
defer cp.mutex.Unlock() defer cp.mutex.Unlock()
log.Debugf("setting %v %v on conversation %v", path, value, id)
return cp.storage.SetConversationAttribute(id, path, value) return cp.storage.SetConversationAttribute(id, path, value)
} }
@ -404,7 +432,7 @@ func (cp *cwtchPeer) GetChannelMessage(conversation int, channel int, id int) (s
// StartGroup create a new group linked to the given server and returns the group ID, an invite or an error. // StartGroup create a new group linked to the given server and returns the group ID, an invite or an error.
// Status: TODO change server handle to conversation id...? // Status: TODO change server handle to conversation id...?
func (cp *cwtchPeer) StartGroup(server string) (int, error) { func (cp *cwtchPeer) StartGroup(name string, server string) (int, error) {
group, err := model.NewGroup(server) group, err := model.NewGroup(server)
if err == nil { if err == nil {
conversationID, err := cp.NewContactConversation(group.GroupID, model.DefaultP2PAccessControl(), true) conversationID, err := cp.NewContactConversation(group.GroupID, model.DefaultP2PAccessControl(), true)
@ -414,6 +442,7 @@ func (cp *cwtchPeer) StartGroup(server string) (int, error) {
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)), group.GroupID) cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)), group.GroupID)
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), group.GroupServer) cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), group.GroupServer)
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(group.GroupKey[:])) cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(group.GroupKey[:]))
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.Name)), name)
cp.eventBus.Publish(event.NewEvent(event.GroupCreated, map[event.Field]string{ cp.eventBus.Publish(event.NewEvent(event.GroupCreated, map[event.Field]string{
event.GroupID: group.GroupID, event.GroupID: group.GroupID,
@ -521,9 +550,9 @@ func (cp *cwtchPeer) PeerWithOnion(onion string) {
func (cp *cwtchPeer) SendInviteToConversation(conversationID int, inviteConversationID int) error { func (cp *cwtchPeer) SendInviteToConversation(conversationID int, inviteConversationID int) error {
var invite model.MessageWrapper var invite model.MessageWrapper
conversationInfo, err := cp.GetConversationInfo(inviteConversationID) inviteConversationInfo, err := cp.GetConversationInfo(inviteConversationID)
if conversationInfo != nil || err != nil { if inviteConversationInfo == nil || err != nil {
return err return err
} }
@ -533,25 +562,25 @@ func (cp *cwtchPeer) SendInviteToConversation(conversationID int, inviteConversa
//if err == nil { //if err == nil {
// invite = model.MessageWrapper{Overlay: 101, Data: fmt.Sprintf("tofubundle:server:%s||%s", base64.StdEncoding.EncodeToString([]byte(bundle)), inviteStr)} // invite = model.MessageWrapper{Overlay: 101, Data: fmt.Sprintf("tofubundle:server:%s||%s", base64.StdEncoding.EncodeToString([]byte(bundle)), inviteStr)}
//} //}
if tor.IsValidHostname(conversationInfo.Handle) { if tor.IsValidHostname(inviteConversationInfo.Handle) {
invite = model.MessageWrapper{Overlay: 100, Data: conversationInfo.Handle} invite = model.MessageWrapper{Overlay: 100, Data: inviteConversationInfo.Handle}
} else { } else {
// Reconstruct Group // Reconstruct Group
groupID, ok := conversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)).ToString()] groupID, ok := inviteConversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)).ToString()]
if !ok { if !ok {
return errors.New("group structure is malformed") return errors.New("group structure is malformed - no id")
} }
groupServer, ok := conversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)).ToString()] groupServer, ok := inviteConversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)).ToString()]
if !ok { if !ok {
return errors.New("group structure is malformed") return errors.New("group structure is malformed - no server")
} }
groupKeyBase64, ok := conversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)).ToString()] groupKeyBase64, ok := inviteConversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)).ToString()]
if !ok { if !ok {
return errors.New("group structure is malformed") return errors.New("group structure is malformed - no key")
} }
groupName, ok := conversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.Name)).ToString()] groupName, ok := inviteConversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.Name)).ToString()]
if !ok { if !ok {
return errors.New("group structure is malformed") return errors.New("group structure is malformed - no name")
} }
groupKey, err := base64.StdEncoding.DecodeString(groupKeyBase64) groupKey, err := base64.StdEncoding.DecodeString(groupKeyBase64)
@ -569,7 +598,7 @@ func (cp *cwtchPeer) SendInviteToConversation(conversationID int, inviteConversa
} }
groupInvite, err := group.Invite(groupName) groupInvite, err := group.Invite(groupName)
if !ok { if err != nil {
return errors.New("group invite is malformed") return errors.New("group invite is malformed")
} }
@ -589,27 +618,66 @@ func (cp *cwtchPeer) SendInviteToConversation(conversationID int, inviteConversa
inviteBytes, err := json.Marshal(invite) inviteBytes, err := json.Marshal(invite)
if err != nil { if err != nil {
log.Errorf("malformed invite: %v", err) log.Errorf("malformed invite: %v", err)
} else { return err
cp.SendMessage(conversationID, string(inviteBytes))
} }
return nil return cp.SendMessage(conversationID, string(inviteBytes))
}
const serverPrefix = "server:"
const tofuBundlePrefix = "tofubundle:"
const groupPrefix = "torv3"
const importBundlePrefix = "importBundle"
func (cp *cwtchPeer) ImportBundle(importString string) error {
if strings.HasPrefix(importString, tofuBundlePrefix) {
bundle := strings.Split(importString, "||")
if len(bundle) == 2 {
err := cp.ImportBundle(bundle[0][len(tofuBundlePrefix):])
// if the server import failed then abort the whole process..
if err != nil && !strings.HasSuffix(err.Error(), "success") {
return ConstructResponse(importBundlePrefix, err.Error())
}
return cp.ImportBundle(bundle[1])
}
} else if strings.HasPrefix(importString, serverPrefix) {
// Server Key Bundles are prefixed with
bundle, err := base64.StdEncoding.DecodeString(importString[len(serverPrefix):])
if err == nil {
if err = cp.AddServer(string(bundle)); err != nil {
return ConstructResponse(importBundlePrefix, err.Error())
}
return ConstructResponse(importBundlePrefix, "success")
}
return ConstructResponse(importBundlePrefix, err.Error())
} else if strings.HasPrefix(importString, groupPrefix) {
//eg: torv3JFDWkXExBsZLkjvfkkuAxHsiLGZBk0bvoeJID9ItYnU=EsEBCiBhOWJhZDU1OTQ0NWI3YmM2N2YxYTM5YjkzMTNmNTczNRIgpHeNaG+6jy750eDhwLO39UX4f2xs0irK/M3P6mDSYQIaOTJjM2ttb29ibnlnaGoyenc2cHd2N2Q1N3l6bGQ3NTNhdW8zdWdhdWV6enB2ZmFrM2FoYzRiZHlkCiJAdVSSVgsksceIfHe41OJu9ZFHO8Kwv3G6F5OK3Hw4qZ6hn6SiZjtmJlJezoBH0voZlCahOU7jCOg+dsENndZxAA==
if _, err := cp.ImportGroup(importString); err != nil {
return ConstructResponse(importBundlePrefix, err.Error())
}
return ConstructResponse(importBundlePrefix, "success")
}
return ConstructResponse(importBundlePrefix, "invalid_group_invite_prefix")
} }
// JoinServer manages a new server connection with the given onion address // JoinServer manages a new server connection with the given onion address
// Status: TODO // Status: TODO
func (cp *cwtchPeer) JoinServer(onion string) error { func (cp *cwtchPeer) JoinServer(onion string) error {
ci, err := cp.FetchConversationInfo(onion)
if ci == nil || err != nil {
return errors.New("no keys found for server connection")
}
//if cp.GetContact(onion) != nil { //if cp.GetContact(onion) != nil {
// tokenY, yExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypePrivacyPass)) tokenY, yExists := ci.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.KeyTypePrivacyPass))).ToString()]
// tokenOnion, onionExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypeTokenOnion)) tokenOnion, onionExists := ci.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.KeyTypeTokenOnion))).ToString()]
// if yExists && onionExists { if yExists && onionExists {
// signature, exists := cp.GetContactAttribute(onion, lastKnownSignature) signature, exists := ci.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(lastKnownSignature)).ToString()]
// if !exists { if !exists {
// signature = base64.StdEncoding.EncodeToString([]byte{}) signature = base64.StdEncoding.EncodeToString([]byte{})
// } }
// cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature})) cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature}))
// return nil return nil
// } }
//}
return errors.New("no keys found for server connection") return errors.New("no keys found for server connection")
} }
@ -702,7 +770,10 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time)
} }
cp.mutex.Lock() cp.mutex.Lock()
defer cp.mutex.Unlock() defer cp.mutex.Unlock()
return cp.storage.InsertMessage(ci.ID, 0, message, model.Attributes{"ack": event.True, "sent": sent.String()})
// Generate a random number and use it as the signature
signature := event.GetRandNumber().String()
return cp.storage.InsertMessage(ci.ID, 0, message, model.Attributes{constants.AttrAck: event.True, "sent": sent.String()}, signature, model.CalculateContentHash(handle, message))
} }
// ShareFile begins hosting the given serialized manifest // ShareFile begins hosting the given serialized manifest
@ -723,11 +794,11 @@ func (cp *cwtchPeer) eventHandler() {
log.Infof("Protocol engine for %v has stopped listening", cp.GetOnion()) log.Infof("Protocol engine for %v has stopped listening", cp.GetOnion())
cp.mutex.Unlock() cp.mutex.Unlock()
case event.EncryptedGroupMessage: case event.EncryptedGroupMessage:
// If successful, a side effect is the message is added to the group's timeline // If successful, a side effect is the message is added to the group's timeline
ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
//ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
//signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) log.Debugf("received encrypted group message: %x", ev.Data[event.Signature])
// SECURITY NOTE: A malicious server could insert posts such that everyone always has a different lastKnownSignature // SECURITY NOTE: A malicious server could insert posts such that everyone always has a different lastKnownSignature
// However the server can always replace **all** messages in an attempt to track users // However the server can always replace **all** messages in an attempt to track users
// This is mitigated somewhat by resync events which do wipe things entire. // This is mitigated somewhat by resync events which do wipe things entire.
@ -736,37 +807,42 @@ func (cp *cwtchPeer) eventHandler() {
// store the base64 encoded signature for later use // store the base64 encoded signature for later use
//cp.SetConversationAttribute(ev.Data[event.GroupServer], lastKnownSignature, ev.Data[event.Signature]) //cp.SetConversationAttribute(ev.Data[event.GroupServer], lastKnownSignature, ev.Data[event.Signature])
cp.mutex.Lock() conversations, err := cp.FetchConversations()
//ok, groupID, message, index := cp.Profile.AttemptDecryption(ciphertext, signature) if err == nil {
cp.mutex.Unlock() for _, conversationInfo := range conversations {
//if ok && index > -1 { if tor.IsValidHostname(conversationInfo.Handle) == false {
// cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: base64.StdEncoding.EncodeToString(message.Signature), event.PreviousSignature: base64.StdEncoding.EncodeToString(message.PreviousMessageSig), event.RemotePeer: message.PeerID, event.Index: strconv.Itoa(index)})) group, err := cp.constructGroupFromConversation(conversationInfo)
//} if err == nil {
success, dgm := group.AttemptDecryption(ciphertext, signature)
if success {
// Time to either acknowledge the message or insert a new message
cp.attemptInsertOrAcknowledgeLegacyGroupConversation(conversationInfo.ID, ev.Data[event.Signature], dgm)
break
}
}
}
}
}
case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data
ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
cp.storeMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ts) cp.storeMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ts)
case event.PeerAcknowledgement: case event.PeerAcknowledgement:
cp.mutex.Lock() err := cp.attemptAcknowledgeP2PConversation(ev.Data[event.RemotePeer], ev.Data[event.EventID])
//idx := cp.Profile.AckSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID]) if err != nil {
//edata := ev.Data // Note: This is not an Error because malicious peers can just send acks for random things
//edata[event.Index] = strconv.Itoa(idx) // There is no point in polluting error logs with that mess.
//cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, edata)) log.Debugf("failed to acknowledge acknowledgement: %v", err)
cp.mutex.Unlock() }
case event.SendMessageToGroupError: case event.SendMessageToGroupError:
cp.mutex.Lock() err := cp.attemptErrorConversationMessage(ev.Data[event.GroupID], ev.Data[event.Signature], event.SendMessageToGroupError, ev.Data[event.Error])
//signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) if err != nil {
//cp.Profile.AddGroupSentMessageError(ev.Data[event.GroupID], signature, ev.Data[event.Error]) log.Errorf("failed to error p2p message: %v", err)
cp.mutex.Unlock() }
case event.SendMessageToPeerError: case event.SendMessageToPeerError:
cp.mutex.Lock() err := cp.attemptErrorConversationMessage(ev.Data[event.RemotePeer], ev.Data[event.EventID], event.SendMessageToPeerError, ev.Data[event.Error])
//idx := cp.Profile.ErrorSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID], ev.Data[event.Error]) if err != nil {
//edata := ev.Data log.Errorf("failed to error p2p message: %v", err)
//edata[event.Index] = strconv.Itoa(idx) }
//cp.eventBus.Publish(event.NewEvent(event.IndexedFailure, edata))
cp.mutex.Unlock()
case event.RetryServerRequest: case event.RetryServerRequest:
// Automated Join Server Request triggered by a plugin. // Automated Join Server Request triggered by a plugin.
log.Debugf("profile received an automated retry event for %v", ev.Data[event.GroupServer]) log.Debugf("profile received an automated retry event for %v", ev.Data[event.GroupServer])
@ -781,7 +857,8 @@ func (cp *cwtchPeer) eventHandler() {
log.Debugf("NewGetValMessageFromPeer for %v.%v from %v\n", scope, path, onion) log.Debugf("NewGetValMessageFromPeer for %v.%v from %v\n", scope, path, onion)
conversationInfo, _ := cp.FetchConversationInfo(onion) conversationInfo, err := cp.FetchConversationInfo(onion)
log.Debugf("confo info lookup newgetval %v %v %v", onion, conversationInfo, err)
if conversationInfo != nil && conversationInfo.Accepted { if conversationInfo != nil && conversationInfo.Accepted {
scope := attr.IntoScope(scope) scope := attr.IntoScope(scope)
if scope.IsPublic() || scope.IsConversation() { if scope.IsPublic() || scope.IsConversation() {
@ -858,7 +935,7 @@ func (cp *cwtchPeer) eventHandler() {
path := ev.Data[event.Path] path := ev.Data[event.Path]
val := ev.Data[event.Data] val := ev.Data[event.Data]
exists, _ := strconv.ParseBool(ev.Data[event.Exists]) exists, _ := strconv.ParseBool(ev.Data[event.Exists])
log.Debugf("NewRetValMessageFromPeer %v %v%v %v %v\n", onion, scope, path, exists, val) log.Debugf("NewRetValMessageFromPeer %v %v %v %v %v\n", onion, scope, path, exists, val)
if exists { if exists {
// Handle File Sharing Metadata // Handle File Sharing Metadata
@ -880,8 +957,12 @@ func (cp *cwtchPeer) eventHandler() {
// Allow public profile parameters to be added as peer specific attributes... // Allow public profile parameters to be added as peer specific attributes...
if attr.Scope(scope).IsPublic() && zone == attr.ProfileZone { if attr.Scope(scope).IsPublic() && zone == attr.ProfileZone {
ci, err := cp.FetchConversationInfo(onion) ci, err := cp.FetchConversationInfo(onion)
if ci != nil && err != nil { log.Debugf("fetch conversation info %v %v", ci, err)
cp.SetConversationAttribute(ci.ID, attr.Scope(scope).ConstructScopedZonedPath(zone.ConstructZonedPath(path)), val) if ci != nil && err == nil {
err := cp.SetConversationAttribute(ci.ID, attr.Scope(scope).ConstructScopedZonedPath(zone.ConstructZonedPath(path)), val)
if err != nil {
log.Errorf("error setting conversation attribute %v", err)
}
} }
} }
} }
@ -902,3 +983,104 @@ func (cp *cwtchPeer) eventHandler() {
} }
} }
} }
// attemptInsertOrAcknowledgeLegacyGroupConversation is a convenience method that looks up the conversation
// by the given handle and attempts to mark the message as acknowledged. returns error on failure
// to either find the contact or the associated message
func (cp *cwtchPeer) attemptInsertOrAcknowledgeLegacyGroupConversation(conversationID int, signature string, dm *groups.DecryptedGroupMessage) error {
log.Infof("attempting to insert or ack group message %v %v", conversationID, signature)
messageID, err := cp.GetChannelMessageBySignature(conversationID, 0, signature)
// We have received our own message (probably), acknowledge and move on...
if err == nil {
_, attr, err := cp.GetChannelMessage(conversationID, 0, messageID)
if err == nil {
cp.mutex.Lock()
attr[constants.AttrAck] = constants.True
cp.storage.UpdateMessageAttributes(conversationID, 0, messageID, attr)
cp.mutex.Unlock()
cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.GroupConversationID: strconv.Itoa(conversationID), event.Index: strconv.Itoa(messageID)}))
return nil
}
} else {
cp.mutex.Lock()
cp.storage.InsertMessage(conversationID, 0, dm.Text, model.Attributes{constants.AttrAck: constants.True, "PreviousSignature": base64.StdEncoding.EncodeToString(dm.PreviousMessageSig), "Author": dm.Onion, "Sent": strconv.Itoa(int(dm.Timestamp))}, signature, model.CalculateContentHash(dm.Onion, dm.Text))
cp.mutex.Unlock()
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.GroupConversationID: strconv.Itoa(conversationID), event.Index: strconv.Itoa(messageID)}))
return nil
}
return err
}
// attemptAcknowledgeP2PConversation is a convenience method that looks up the conversation
// by the given handle and attempts to mark the message as acknowledged. returns error on failure
// to either find the contact or the associated message
func (cp *cwtchPeer) attemptAcknowledgeP2PConversation(handle string, signature string) error {
ci, err := cp.FetchConversationInfo(handle)
// We should *never* received a peer acknowledgement for a conversation that doesn't exist...
if ci != nil && err == nil {
// for p2p messages the randomly generated event ID is the "signature"
id, err := cp.GetChannelMessageBySignature(ci.ID, 0, signature)
if err == nil {
_, attr, err := cp.GetChannelMessage(ci.ID, 0, id)
if err == nil {
cp.mutex.Lock()
attr[constants.AttrAck] = constants.True
cp.storage.UpdateMessageAttributes(ci.ID, 0, id, attr)
cp.mutex.Unlock()
cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.RemotePeer: handle, event.Index: strconv.Itoa(id)}))
return nil
}
return err
}
return err
}
return err
}
// attemptErrorConversationMessage is a convenience method that looks up the conversation
// by the given handle and attempts to mark the message as errored. returns error on failure
// to either find the contact or the associated message
func (cp *cwtchPeer) attemptErrorConversationMessage(handle string, signature string, eventType event.Type, error string) error {
ci, err := cp.FetchConversationInfo(handle)
// We should *never* received a peer acknowledgement for a conversation that doesn't exist...
if ci != nil && err == nil {
// for p2p messages the randomly generated event ID is the "signature"
id, err := cp.GetChannelMessageBySignature(ci.ID, 0, signature)
if err == nil {
_, attr, err := cp.GetChannelMessage(ci.ID, 0, id)
if err == nil {
cp.mutex.Lock()
attr[constants.AttrErr] = constants.True
cp.storage.UpdateMessageAttributes(ci.ID, 0, id, attr)
cp.mutex.Unlock()
cp.eventBus.Publish(event.NewEvent(eventType, map[event.Field]string{event.RemotePeer: handle, event.Error: error, event.Index: strconv.Itoa(id)}))
return nil
}
return err
}
return err
}
return err
}
func (cp *cwtchPeer) GetChannelMessageBySignature(conversationID int, channelID int, signature string) (int, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.storage.GetChannelMessageBySignature(conversationID, channelID, signature)
}
func (cp *cwtchPeer) constructGroupFromConversation(conversationInfo *model.Conversation) (*model.Group, error) {
key := conversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)).ToString()]
groupKey, err := base64.StdEncoding.DecodeString(key)
if err != nil {
return nil, errors.New("group key is malformed")
}
var groupKeyFixed [32]byte
copy(groupKeyFixed[:], groupKey[:])
group := model.Group{
GroupID: conversationInfo.Handle,
GroupServer: conversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)).ToString()],
GroupKey: groupKeyFixed,
}
return &group, nil
}

View File

@ -42,8 +42,10 @@ type CwtchProfileStorage struct {
deleteConversationStmt *sql.Stmt deleteConversationStmt *sql.Stmt
setConversationAttributesStmt *sql.Stmt setConversationAttributesStmt *sql.Stmt
channelInsertStmts map[ChannelID]*sql.Stmt channelInsertStmts map[ChannelID]*sql.Stmt
channelGetMessageStmts map[ChannelID]*sql.Stmt channelUpdateMessageStmts map[ChannelID]*sql.Stmt
channelGetMessageStmts map[ChannelID]*sql.Stmt
channelGetMessageBySignatureStmts map[ChannelID]*sql.Stmt
db *sql.DB db *sql.DB
} }
@ -66,14 +68,23 @@ const setConversationAttributesSQLStmt = `update conversations set Attributes=(?
const deleteConversationSQLStmt = `delete from conversations where ID=(?);` const deleteConversationSQLStmt = `delete from conversations where ID=(?);`
// createTableConversationMessagesSQLStmt is a template for creating conversation based tables... // createTableConversationMessagesSQLStmt is a template for creating conversation based tables...
const createTableConversationMessagesSQLStmt = `create table if not exists channel_%d_0_chat (ID integer unique primary key autoincrement, Body text, Attributes []byte, Expiry datetime);` const createTableConversationMessagesSQLStmt = `create table if not exists channel_%d_0_chat (ID integer unique primary key autoincrement, Body text, Attributes []byte, Expiry datetime, Signature text unique, ContentHash blob text);`
// insertMessageIntoConversationSQLStmt is a template for creating conversation based tables... // insertMessageIntoConversationSQLStmt is a template for creating conversation based tables...
const insertMessageIntoConversationSQLStmt = `insert into channel_%d_%d_chat (Body, Attributes) values(?,?);` const insertMessageIntoConversationSQLStmt = `insert into channel_%d_%d_chat (Body, Attributes, Signature, ContentHash) values(?,?,?,?);`
// getMessageFromConversationSQLStmt is a template for creating conversation based tables... // updateMessageIntoConversationSQLStmt is a template for updating attributes of a message in a conversation
const updateMessageIntoConversationSQLStmt = `update channel_%d_%d_chat set Attributes=(?) where ID=(?);`
// getMessageFromConversationSQLStmt is a template for fetching a message by ID from a conversation
const getMessageFromConversationSQLStmt = `select Body, Attributes from channel_%d_%d_chat where ID=(?);` const getMessageFromConversationSQLStmt = `select Body, Attributes from channel_%d_%d_chat where ID=(?);`
// getMessageBySignatureFromConversationSQLStmt is a template for creating conversation based tables...
const getMessageBySignatureFromConversationSQLStmt = `select ID from channel_%d_%d_chat where Signature=(?);`
// getMessageByContentHashFromConversationSQLStmt is a template for creating conversation based tables...
const getMessageByContentHashFromConversationSQLStmt = `select ID from channel_%d_%d_chat where ContentHash=(?);`
// NewCwtchProfileStorage constructs a new CwtchProfileStorage from a database. It is also responsible for // NewCwtchProfileStorage constructs a new CwtchProfileStorage from a database. It is also responsible for
// Preparing commonly used SQL Statements // Preparing commonly used SQL Statements
func NewCwtchProfileStorage(db *sql.DB) (*CwtchProfileStorage, error) { func NewCwtchProfileStorage(db *sql.DB) (*CwtchProfileStorage, error) {
@ -137,17 +148,20 @@ func NewCwtchProfileStorage(db *sql.DB) (*CwtchProfileStorage, error) {
} }
return &CwtchProfileStorage{db: db, return &CwtchProfileStorage{db: db,
insertProfileKeyValueStmt: insertProfileKeyValueStmt, insertProfileKeyValueStmt: insertProfileKeyValueStmt,
selectProfileKeyValueStmt: selectProfileKeyStmt, selectProfileKeyValueStmt: selectProfileKeyStmt,
fetchAllConversationsStmt: fetchAllConversationsStmt, fetchAllConversationsStmt: fetchAllConversationsStmt,
insertConversationStmt: insertConversationStmt, insertConversationStmt: insertConversationStmt,
selectConversationStmt: selectConversationStmt, selectConversationStmt: selectConversationStmt,
selectConversationByHandleStmt: selectConversationByHandleStmt, selectConversationByHandleStmt: selectConversationByHandleStmt,
acceptConversationStmt: acceptConversationStmt, acceptConversationStmt: acceptConversationStmt,
deleteConversationStmt: deleteConversationStmt, deleteConversationStmt: deleteConversationStmt,
setConversationAttributesStmt: setConversationAttributesStmt, setConversationAttributesStmt: setConversationAttributesStmt,
channelInsertStmts: map[ChannelID]*sql.Stmt{}, channelInsertStmts: map[ChannelID]*sql.Stmt{},
channelGetMessageStmts: map[ChannelID]*sql.Stmt{}}, nil channelUpdateMessageStmts: map[ChannelID]*sql.Stmt{},
channelGetMessageStmts: map[ChannelID]*sql.Stmt{},
channelGetMessageBySignatureStmts: map[ChannelID]*sql.Stmt{}},
nil
} }
// StoreProfileKeyValue allows storing of typed Key/Value attribute in the Storage Engine // StoreProfileKeyValue allows storing of typed Key/Value attribute in the Storage Engine
@ -360,7 +374,7 @@ func (cps *CwtchProfileStorage) SetConversationAttribute(id int, path attr.Scope
} }
// InsertMessage appends a message to a conversation channel, with a given set of attributes // InsertMessage appends a message to a conversation channel, with a given set of attributes
func (cps *CwtchProfileStorage) InsertMessage(conversation int, channel int, body string, attributes model.Attributes) error { func (cps *CwtchProfileStorage) InsertMessage(conversation int, channel int, body string, attributes model.Attributes, signature string, contentHash string) error {
channelID := ChannelID{Conversation: conversation, Channel: channel} channelID := ChannelID{Conversation: conversation, Channel: channel}
@ -374,15 +388,78 @@ func (cps *CwtchProfileStorage) InsertMessage(conversation int, channel int, bod
cps.channelInsertStmts[channelID] = conversationStmt cps.channelInsertStmts[channelID] = conversationStmt
} }
_, err := cps.channelInsertStmts[channelID].Exec(body, attributes.Serialize()) _, err := cps.channelInsertStmts[channelID].Exec(body, attributes.Serialize(), signature, contentHash)
if err != nil { if err != nil {
log.Errorf("error inserting message: %v", err) log.Errorf("error inserting message: %v %v", signature, err)
return err
}
log.Infof("inserted message with signature: %v", signature)
return nil
}
// UpdateMessageAttributes updates the attributes associated with a message of a given conversation
func (cps *CwtchProfileStorage) UpdateMessageAttributes(conversation int, channel int, messageID int, attributes model.Attributes) error {
channelID := ChannelID{Conversation: conversation, Channel: channel}
_, exists := cps.channelUpdateMessageStmts[channelID]
if !exists {
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(updateMessageIntoConversationSQLStmt, conversation, channel))
if err != nil {
log.Errorf("error executing transaction: %v", err)
return err
}
cps.channelUpdateMessageStmts[channelID] = conversationStmt
}
_, err := cps.channelUpdateMessageStmts[channelID].Exec(attributes.Serialize(), messageID)
if err != nil {
log.Errorf("error updating message: %v", err)
return err return err
} }
return nil return nil
} }
// GetChannelMessageBySignature looks up a conversation message by signature instead of identifier. Both are unique but
// signatures are common between conversation participants (in groups) and so are a more useful message to index.
func (cps *CwtchProfileStorage) GetChannelMessageBySignature(conversation int, channel int, signature string) (int, error) {
channelID := ChannelID{Conversation: conversation, Channel: channel}
_, exists := cps.channelGetMessageBySignatureStmts[channelID]
if !exists {
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMessageBySignatureFromConversationSQLStmt, conversation, channel))
if err != nil {
log.Errorf("error executing transaction: %v", err)
return -1, err
}
cps.channelGetMessageBySignatureStmts[channelID] = conversationStmt
}
rows, err := cps.channelGetMessageBySignatureStmts[channelID].Query(signature)
if err != nil {
log.Errorf("error executing query: %v", err)
return -1, err
}
result := rows.Next()
if !result {
return -1, errors.New("no result found")
}
var id int
err = rows.Scan(&id)
if err != nil {
log.Errorf("error fetching rows: %v", err)
rows.Close()
return -1, err
}
rows.Close()
return id, nil
}
// GetChannelMessage looks up a channel message by conversation, channel and message id. On success it // GetChannelMessage looks up a channel message by conversation, channel and message id. On success it
// returns the message body and the attributes associated with the message. Otherwise an error is returned. // returns the message body and the attributes associated with the message. Otherwise an error is returned.
func (cps *CwtchProfileStorage) GetChannelMessage(conversation int, channel int, messageID int) (string, model.Attributes, error) { func (cps *CwtchProfileStorage) GetChannelMessage(conversation int, channel int, messageID int) (string, model.Attributes, error) {

View File

@ -35,7 +35,7 @@ type ReadServers interface {
// ModifyGroups provides write-only access add/edit/remove new groups // ModifyGroups provides write-only access add/edit/remove new groups
type ModifyGroups interface { type ModifyGroups interface {
ImportGroup(string) (int, error) ImportGroup(string) (int, error)
StartGroup(string) (int, error) StartGroup(string, string) (int, error)
} }
// ModifyServers provides write-only access to servers // ModifyServers provides write-only access to servers
@ -97,6 +97,9 @@ type CwtchPeer interface {
SendMessages SendMessages
ModifyMessages ModifyMessages
// Import Bundle
ImportBundle(string) error
// New Unified Conversation Interfaces // New Unified Conversation Interfaces
NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error) NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error)
FetchConversations() ([]*model.Conversation, error) FetchConversations() ([]*model.Conversation, error)

13
peer/response.go Normal file
View File

@ -0,0 +1,13 @@
package peer
import "errors"
// Response is a wrapper to better semantically convey the response type...
type Response error
const errorSeparator = "."
// ConstructResponse is a helper function for creating Response structures.
func ConstructResponse(prefix string, error string) Response {
return errors.New(prefix + errorSeparator + error)
}

View File

@ -11,6 +11,7 @@ import (
"cwtch.im/cwtch/peer" "cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/protocol/connections"
"encoding/base64" "encoding/base64"
"encoding/json"
"fmt" "fmt"
"git.openprivacy.ca/openprivacy/connectivity/tor" "git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log" "git.openprivacy.ca/openprivacy/log"
@ -20,7 +21,6 @@ import (
"path" "path"
"runtime" "runtime"
"runtime/pprof" "runtime/pprof"
"strings"
"testing" "testing"
"time" "time"
) )
@ -119,7 +119,8 @@ func TestCwtchPeerIntegration(t *testing.T) {
} }
pid, _ := acn.GetPID() pid, _ := acn.GetPID()
t.Logf("Tor pid: %v", pid) t.Logf("Tor pid: %v", pid)
acn.WaitTillBootstrapped()
defer acn.Close()
// ***** Cwtch Server management ***** // ***** Cwtch Server management *****
const ServerKeyBundleBase64 = "eyJLZXlzIjp7ImJ1bGxldGluX2JvYXJkX29uaW9uIjoibmZoeHp2enhpbnJpcGdkaDR0Mm00eGN5M2NyZjZwNGNiaGVjdGdja3VqM2lkc2pzYW90Z293YWQiLCJwcml2YWN5X3Bhc3NfcHVibGljX2tleSI6IjVwd2hQRGJ0c0EvdFI3ZHlUVUkzakpZZnM1L3Jaai9iQ1ZWZEpTc0Jtbk09IiwidG9rZW5fc2VydmljZV9vbmlvbiI6ImVvd25mcTRsNTZxMmU0NWs0bW03MjdsanJod3Z0aDZ5ZWN0dWV1bXB4emJ5cWxnbXVhZm1qdXFkIn0sIlNpZ25hdHVyZSI6IlY5R3NPMHNZWFJ1bGZxdzdmbGdtclVxSTBXS0JlSFIzNjIvR3hGbWZPekpEZjJaRks2ck9jNVRRR1ZxVWIrbXIwV2xId0pwdXh0UW1JRU9KNkplYkNRPT0ifQ==" const ServerKeyBundleBase64 = "eyJLZXlzIjp7ImJ1bGxldGluX2JvYXJkX29uaW9uIjoibmZoeHp2enhpbnJpcGdkaDR0Mm00eGN5M2NyZjZwNGNiaGVjdGdja3VqM2lkc2pzYW90Z293YWQiLCJwcml2YWN5X3Bhc3NfcHVibGljX2tleSI6IjVwd2hQRGJ0c0EvdFI3ZHlUVUkzakpZZnM1L3Jaai9iQ1ZWZEpTc0Jtbk09IiwidG9rZW5fc2VydmljZV9vbmlvbiI6ImVvd25mcTRsNTZxMmU0NWs0bW03MjdsanJod3Z0aDZ5ZWN0dWV1bXB4emJ5cWxnbXVhZm1qdXFkIn0sIlNpZ25hdHVyZSI6IlY5R3NPMHNZWFJ1bGZxdzdmbGdtclVxSTBXS0JlSFIzNjIvR3hGbWZPekpEZjJaRks2ck9jNVRRR1ZxVWIrbXIwV2xId0pwdXh0UW1JRU9KNkplYkNRPT0ifQ=="
@ -139,25 +140,25 @@ func TestCwtchPeerIntegration(t *testing.T) {
// ***** cwtchPeer setup ***** // ***** cwtchPeer setup *****
fmt.Println("Creating Alice...") fmt.Println("Creating Alice...")
app.CreateTaggedPeer("alice", "asdfasdf", "test") app.CreateTaggedPeer("Alice", "asdfasdf", "test")
fmt.Println("Creating Bob...") fmt.Println("Creating Bob...")
app.CreateTaggedPeer("bob", "asdfasdf", "test") app.CreateTaggedPeer("Bob", "asdfasdf", "test")
fmt.Println("Creating Carol...") fmt.Println("Creating Carol...")
app.CreateTaggedPeer("carol", "asdfasdf", "test") app.CreateTaggedPeer("Carol", "asdfasdf", "test")
alice := utils.WaitGetPeer(app, "alice") alice := utils.WaitGetPeer(app, "Alice")
fmt.Println("Alice created:", alice.GetOnion()) fmt.Println("Alice created:", alice.GetOnion())
alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice") alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice")
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
bob := utils.WaitGetPeer(app, "bob") bob := utils.WaitGetPeer(app, "Bob")
fmt.Println("Bob created:", bob.GetOnion()) fmt.Println("Bob created:", bob.GetOnion())
bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob")
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
carol := utils.WaitGetPeer(app, "carol") carol := utils.WaitGetPeer(app, "Carol")
fmt.Println("Carol created:", carol.GetOnion()) fmt.Println("Carol created:", carol.GetOnion())
carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol") carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol")
carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
@ -172,20 +173,34 @@ func TestCwtchPeerIntegration(t *testing.T) {
// ***** Peering, server joining, group creation / invite ***** // ***** Peering, server joining, group creation / invite *****
fmt.Println("Alice peering with Bob...")
// Simulate Alice Adding Bob
alice2bobConversationID, err := alice.NewContactConversation(bob.GetOnion(), model.DefaultP2PAccessControl(), true)
if err != nil {
t.Fatalf("error adding conversaiton %v", alice2bobConversationID)
}
alice.PeerWithOnion(bob.GetOnion())
fmt.Println("Alice peering with Carol...")
// Simulate Alice Adding Carol
alice2carolConversationID, err := alice.NewContactConversation(carol.GetOnion(), model.DefaultP2PAccessControl(), true)
if err != nil {
t.Fatalf("error adding conversaiton %v", alice2carolConversationID)
}
alice.PeerWithOnion(carol.GetOnion())
// Simulate Alice Creating a Group
fmt.Println("Alice joining server...") fmt.Println("Alice joining server...")
if err := alice.AddServer(string(serverKeyBundle)); err != nil { if err := alice.AddServer(string(serverKeyBundle)); err != nil {
t.Fatalf("Failed to Add Server Bundle %v", err) t.Fatalf("Failed to Add Server Bundle %v", err)
} }
alice.JoinServer(ServerAddr) err = alice.JoinServer(ServerAddr)
if err != nil {
fmt.Println("Alice peering with Bob...") t.Fatalf("alice cannot join server %v %v", ServerAddr, err)
alice.PeerWithOnion(bob.GetOnion()) }
fmt.Println("Alice peering with Carol...")
alice.PeerWithOnion(carol.GetOnion())
fmt.Println("Creating group on ", ServerAddr, "...") fmt.Println("Creating group on ", ServerAddr, "...")
aliceGroupConversationID, err := alice.StartGroup(ServerAddr) aliceGroupConversationID, err := alice.StartGroup("Our Cool Testing Group", ServerAddr)
fmt.Printf("Created group: %v!\n", aliceGroupConversationID) fmt.Printf("Created group: %v!\n", aliceGroupConversationID)
if err != nil { if err != nil {
t.Errorf("Failed to init group: %v", err) t.Errorf("Failed to init group: %v", err)
@ -193,6 +208,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
} }
fmt.Println("Waiting for alice to join server...") fmt.Println("Waiting for alice to join server...")
waitForPeerGroupConnection(t, alice, ServerAddr) waitForPeerGroupConnection(t, alice, ServerAddr)
fmt.Println("Waiting for alice and Bob to peer...") fmt.Println("Waiting for alice and Bob to peer...")
@ -200,11 +216,19 @@ func TestCwtchPeerIntegration(t *testing.T) {
// Need to add contact else SetContactAuth fails on peer peer doesnt exist // Need to add contact else SetContactAuth fails on peer peer doesnt exist
// Normal flow would be Bob app monitors for the new connection (a new connection state change to Auth // Normal flow would be Bob app monitors for the new connection (a new connection state change to Auth
// and the adds the user to peer, and then approves or blocks it // and the adds the user to peer, and then approves or blocks it
bob.NewContactConversation("alice?", model.DefaultP2PAccessControl(), true) // Simulate Bob adding Alice
bob2aliceConversationID, err := bob.NewContactConversation(alice.GetOnion(), model.DefaultP2PAccessControl(), true)
if err != nil {
t.Fatalf("error adding conversaiton %v", bob2aliceConversationID)
}
bob.AddServer(string(serverKeyBundle)) bob.AddServer(string(serverKeyBundle))
waitForPeerPeerConnection(t, alice, carol) waitForPeerPeerConnection(t, alice, carol)
carol.NewContactConversation("alice?", model.DefaultP2PAccessControl(), true) // Simulate Carol adding Alice
carol2aliceConversationID, err := carol.NewContactConversation(alice.GetOnion(), model.DefaultP2PAccessControl(), true)
if err != nil {
t.Fatalf("error adding conversaiton %v", carol2aliceConversationID)
}
carol.AddServer(string(serverKeyBundle)) carol.AddServer(string(serverKeyBundle))
fmt.Println("Alice and Bob getVal public.name...") fmt.Println("Alice and Bob getVal public.name...")
@ -219,194 +243,188 @@ func TestCwtchPeerIntegration(t *testing.T) {
// Probably related to latency/throughput problems in the underlying tor network. // Probably related to latency/throughput problems in the underlying tor network.
time.Sleep(30 * time.Second) time.Sleep(30 * time.Second)
aliceName, err := bob.GetConversationAttribute(1, attr.PeerScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) aliceName, err := bob.GetConversationAttribute(bob2aliceConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
if err != nil || aliceName != "Alice" { if err != nil || aliceName != "Alice" {
t.Fatalf("Bob: alice GetKeyVal error on alice peer.name %v: %v\n", aliceName, err) t.Fatalf("Bob: alice GetKeyVal error on alice peer.name %v: %v\n", aliceName, err)
} }
fmt.Printf("Bob has alice's name as '%v'\n", aliceName) fmt.Printf("Bob has alice's name as '%v'\n", aliceName)
bobName, err := alice.GetConversationAttribute(1, attr.PeerScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) bobName, err := alice.GetConversationAttribute(alice2bobConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
if err != nil || bobName != "Bob" { if err != nil || bobName != "Bob" {
t.Fatalf("Alice: bob GetKeyVal error on bob peer.name %v: %v \n", bobName, err) t.Fatalf("Alice: bob GetKeyVal error on bob peer.name %v: %v \n", bobName, err)
} }
fmt.Printf("Alice has bob's name as '%v'\n", bobName) fmt.Printf("Alice has bob's name as '%v'\n", bobName)
aliceName, err = carol.GetConversationAttribute(2, attr.PeerScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) aliceName, err = carol.GetConversationAttribute(carol2aliceConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
if err != nil || aliceName != "Alice" { if err != nil || aliceName != "Alice" {
t.Fatalf("carol GetKeyVal error for alice peer.name %v: %v\n", aliceName, err) t.Fatalf("carol GetKeyVal error for alice peer.name %v: %v\n", aliceName, err)
} }
carolName, err := alice.GetConversationAttribute(1, attr.PeerScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) carolName, err := alice.GetConversationAttribute(alice2carolConversationID, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
if err != nil || carolName != "Carol" { if err != nil || carolName != "Carol" {
t.Fatalf("alice GetKeyVal error, carol peer.name: %v: %v\n", carolName, err) t.Fatalf("alice GetKeyVal error, carol peer.name: %v: %v\n", carolName, err)
} }
fmt.Printf("Alice has carol's name as '%v'\n", carolName) fmt.Printf("Alice has carol's name as '%v'\n", carolName)
fmt.Println("Alice inviting Bob to group...") fmt.Println("Alice inviting Bob to group...")
err = alice.SendInviteToConversation(1, aliceGroupConversationID) err = alice.SendInviteToConversation(alice2bobConversationID, aliceGroupConversationID)
if err != nil { if err != nil {
t.Fatalf("Error for Alice inviting Bob to group: %v", err) t.Fatalf("Error for Alice inviting Bob to group: %v", err)
} }
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
fmt.Println("Bob examining groups and accepting invites...") // Alice invites Bob to the Group...
for _, message := range bob.GetContact(alice.GetOnion()).Timeline.GetMessages() { message, _, err := alice.GetChannelMessage(alice2bobConversationID, 0, 1)
fmt.Printf("Found message from Alice: %v", message.Message) t.Logf("Alice message from Bob %v %v", message, err)
if strings.HasPrefix(message.Message, "torv3") { var overlayMessage model.MessageWrapper
gid, err := bob.ImportGroup(message.Message) json.Unmarshal([]byte(message), &overlayMessage)
if err == nil { t.Logf("Parsed Overlay Message: %v", overlayMessage)
fmt.Printf("Bob found invite...now accepting %v...", gid) err = bob.ImportBundle(overlayMessage.Data)
bob.AcceptInvite(gid) t.Logf("Result of Bob Importing the Bundle from Alice: %v", err)
} else {
t.Fatalf("Bob could not accept invite...%v", gid)
}
}
}
fmt.Println("Waiting for Bob to join connect to group server...") t.Logf("Waiting for Bob to join connect to group server...")
err = bob.JoinServer(ServerAddr) // for some unrealism we skip "discovering the server from the event bus
if err != nil {
t.Fatalf("alice cannot join server %v %v", ServerAddr, err)
}
bobGroupConversationID := 3
waitForPeerGroupConnection(t, bob, ServerAddr) waitForPeerGroupConnection(t, bob, ServerAddr)
numGoRoutinesPostServerConnect := runtime.NumGoroutine() numGoRoutinesPostServerConnect := runtime.NumGoroutine()
// ***** Conversation ***** // ***** Conversation *****
t.Logf("Starting conversation in group...")
checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[0])
checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[0])
checkSendMessageToGroup(t, alice, aliceGroupConversationID, aliceLines[1])
checkSendMessageToGroup(t, bob, bobGroupConversationID, bobLines[1])
fmt.Println("Starting conversation in group...") //fmt.Println("Alice inviting Carol to group...")
// Conversation //err = alice.InviteOnionToGroup(carol.GetOnion(), groupID)
fmt.Printf("%v> %v\n", aliceName, aliceLines[0]) //if err != nil {
err = alice.SendMessage(aliceGroupConversationID, aliceLines[0]) // t.Fatalf("Error for Alice inviting Carol to group: %v", err)
if err != nil { //}
t.Fatalf("Alice failed to send a message to the group: %v", err) //time.Sleep(time.Second * 60) // Account for some token acquisition in Alice and Bob flows.
} //fmt.Println("Carol examining groups and accepting invites...")
time.Sleep(time.Second * 10) //for _, message := range carol.GetContact(alice.GetOnion()).Timeline.GetMessages() {
// fmt.Printf("Found message from Alice: %v", message.Message)
fmt.Printf("%v> %v\n", bobName, bobLines[0]) // if strings.HasPrefix(message.Message, "torv3") {
err = bob.SendMessage(groupID, bobLines[0]) // gid, err := carol.ImportGroup(message.Message)
if err != nil { // if err == nil {
t.Fatalf("Bob failed to send a message to the group: %v", err) // fmt.Printf("Carol found invite...now accepting %v...", gid)
} // carol.AcceptInvite(gid)
time.Sleep(time.Second * 10) // } else {
// t.Fatalf("Carol could not accept invite...%v", gid)
fmt.Printf("%v> %v\n", aliceName, aliceLines[1]) // }
alice.SendMessage(groupID, aliceLines[1]) // }
time.Sleep(time.Second * 10) //}
//
fmt.Printf("%v> %v\n", bobName, bobLines[1]) //fmt.Println("Shutting down Alice...")
bob.SendMessage(groupID, bobLines[1]) //app.ShutdownPeer(alice.GetOnion())
time.Sleep(time.Second * 10) //time.Sleep(time.Second * 5)
fmt.Println("Alice inviting Carol to group...")
err = alice.InviteOnionToGroup(carol.GetOnion(), groupID)
if err != nil {
t.Fatalf("Error for Alice inviting Carol to group: %v", err)
}
time.Sleep(time.Second * 60) // Account for some token acquisition in Alice and Bob flows.
fmt.Println("Carol examining groups and accepting invites...")
for _, message := range carol.GetContact(alice.GetOnion()).Timeline.GetMessages() {
fmt.Printf("Found message from Alice: %v", message.Message)
if strings.HasPrefix(message.Message, "torv3") {
gid, err := carol.ImportGroup(message.Message)
if err == nil {
fmt.Printf("Carol found invite...now accepting %v...", gid)
carol.AcceptInvite(gid)
} else {
t.Fatalf("Carol could not accept invite...%v", gid)
}
}
}
fmt.Println("Shutting down Alice...")
app.ShutdownPeer(alice.GetOnion())
time.Sleep(time.Second * 5)
numGoRoutinesPostAlice := runtime.NumGoroutine() numGoRoutinesPostAlice := runtime.NumGoroutine()
//
//fmt.Println("Carol joining server...")
//carol.JoinServer(ServerAddr)
//waitForPeerGroupConnection(t, carol, groupID)
numGoRoutinesPostCarolConnect := runtime.NumGoroutine()
//
//fmt.Printf("%v> %v", bobName, bobLines[2])
//bob.SendMessage(groupID, bobLines[2])
//// Bob should have enough tokens so we don't need to account for
//// token acquisition here...
//
//fmt.Printf("%v> %v", carolName, carolLines[0])
//carol.SendMessage(groupID, carolLines[0])
//time.Sleep(time.Second * 30) // we need to account for spam-based token acquisition, but everything should
//// be warmed-up and delays should be pretty small.
//
//// ***** Verify Test *****
//
//fmt.Println("Final syncing time...")
//time.Sleep(time.Second * 30)
//
//alicesGroup := alice.GetGroup(groupID)
//if alicesGroup == nil {
// t.Error("aliceGroup == nil")
// return
//}
//
//fmt.Printf("Alice's TimeLine:\n")
//aliceVerified := printAndCountVerifedTimeline(t, alicesGroup.GetTimeline())
//if aliceVerified != 4 {
// t.Errorf("Alice did not have 4 verified messages")
//}
//
//bobsGroup := bob.GetGroup(groupID)
//if bobsGroup == nil {
// t.Error("bobGroup == nil")
// return
//}
//fmt.Printf("Bob's TimeLine:\n")
//bobVerified := printAndCountVerifedTimeline(t, bobsGroup.GetTimeline())
//if bobVerified != 6 {
// t.Errorf("Bob did not have 6 verified messages")
//}
//
//carolsGroup := carol.GetGroup(groupID)
//fmt.Printf("Carol's TimeLine:\n")
//carolVerified := printAndCountVerifedTimeline(t, carolsGroup.GetTimeline())
//if carolVerified != 6 {
// t.Errorf("Carol did not have 6 verified messages")
//}
//
//if len(alicesGroup.GetTimeline()) != 4 {
// t.Errorf("Alice's timeline does not have all messages")
//} else {
// // check message 0,1,2,3
// alicesGroup.Timeline.Sort()
// aliceGroupTimeline := alicesGroup.GetTimeline()
// if aliceGroupTimeline[0].Message != aliceLines[0] || aliceGroupTimeline[1].Message != bobLines[0] ||
// aliceGroupTimeline[2].Message != aliceLines[1] || aliceGroupTimeline[3].Message != bobLines[1] {
// t.Errorf("Some of Alice's timeline messages did not have the expected content!")
// }
fmt.Println("Carol joining server...") checkMessage(t, alice, aliceGroupConversationID, 1, aliceLines[0])
carol.JoinServer(ServerAddr) checkMessage(t, alice, aliceGroupConversationID, 2, bobLines[0])
waitForPeerGroupConnection(t, carol, groupID) checkMessage(t, alice, aliceGroupConversationID, 3, aliceLines[1])
numGoRotinesPostCarolConnect := runtime.NumGoroutine() checkMessage(t, alice, aliceGroupConversationID, 4, bobLines[1])
fmt.Printf("%v> %v", bobName, bobLines[2])
bob.SendMessage(groupID, bobLines[2])
// Bob should have enough tokens so we don't need to account for
// token acquisition here...
fmt.Printf("%v> %v", carolName, carolLines[0])
carol.SendMessage(groupID, carolLines[0])
time.Sleep(time.Second * 30) // we need to account for spam-based token acquisition, but everything should
// be warmed-up and delays should be pretty small.
// ***** Verify Test *****
fmt.Println("Final syncing time...")
time.Sleep(time.Second * 30) time.Sleep(time.Second * 30)
alicesGroup := alice.GetGroup(groupID) checkMessage(t, bob, bobGroupConversationID, 1, aliceLines[0])
if alicesGroup == nil { checkMessage(t, bob, bobGroupConversationID, 2, bobLines[0])
t.Error("aliceGroup == nil") checkMessage(t, bob, bobGroupConversationID, 3, aliceLines[1])
return checkMessage(t, bob, bobGroupConversationID, 4, bobLines[1])
}
fmt.Printf("Alice's TimeLine:\n") //}
aliceVerified := printAndCountVerifedTimeline(t, alicesGroup.GetTimeline()) //
if aliceVerified != 4 { //if len(bobsGroup.GetTimeline()) != 6 {
t.Errorf("Alice did not have 4 verified messages") // t.Errorf("Bob's timeline does not have all messages")
} //} else {
// // check message 0,1,2,3,4,5
bobsGroup := bob.GetGroup(groupID) // bobsGroup.Timeline.Sort()
if bobsGroup == nil { // bobGroupTimeline := bobsGroup.GetTimeline()
t.Error("bobGroup == nil") // if bobGroupTimeline[0].Message != aliceLines[0] || bobGroupTimeline[1].Message != bobLines[0] ||
return // bobGroupTimeline[2].Message != aliceLines[1] || bobGroupTimeline[3].Message != bobLines[1] ||
} // bobGroupTimeline[4].Message != bobLines[2] || bobGroupTimeline[5].Message != carolLines[0] {
fmt.Printf("Bob's TimeLine:\n") // t.Errorf("Some of Bob's timeline messages did not have the expected content!")
bobVerified := printAndCountVerifedTimeline(t, bobsGroup.GetTimeline()) // }
if bobVerified != 6 { //}
t.Errorf("Bob did not have 6 verified messages") //
} //if len(carolsGroup.GetTimeline()) != 6 {
// t.Errorf("Carol's timeline does not have all messages")
carolsGroup := carol.GetGroup(groupID) //} else {
fmt.Printf("Carol's TimeLine:\n") // // check message 0,1,2,3,4,5
carolVerified := printAndCountVerifedTimeline(t, carolsGroup.GetTimeline()) // carolsGroup.Timeline.Sort()
if carolVerified != 6 { // carolGroupTimeline := carolsGroup.GetTimeline()
t.Errorf("Carol did not have 6 verified messages") // if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] ||
} // carolGroupTimeline[2].Message != aliceLines[1] || carolGroupTimeline[3].Message != bobLines[1] ||
// carolGroupTimeline[4].Message != carolLines[0] || carolGroupTimeline[5].Message != bobLines[2] {
if len(alicesGroup.GetTimeline()) != 4 { // t.Errorf("Some of Carol's timeline messages did not have the expected content!")
t.Errorf("Alice's timeline does not have all messages") // }
} else { //}
// check message 0,1,2,3
alicesGroup.Timeline.Sort()
aliceGroupTimeline := alicesGroup.GetTimeline()
if aliceGroupTimeline[0].Message != aliceLines[0] || aliceGroupTimeline[1].Message != bobLines[0] ||
aliceGroupTimeline[2].Message != aliceLines[1] || aliceGroupTimeline[3].Message != bobLines[1] {
t.Errorf("Some of Alice's timeline messages did not have the expected content!")
}
}
if len(bobsGroup.GetTimeline()) != 6 {
t.Errorf("Bob's timeline does not have all messages")
} else {
// check message 0,1,2,3,4,5
bobsGroup.Timeline.Sort()
bobGroupTimeline := bobsGroup.GetTimeline()
if bobGroupTimeline[0].Message != aliceLines[0] || bobGroupTimeline[1].Message != bobLines[0] ||
bobGroupTimeline[2].Message != aliceLines[1] || bobGroupTimeline[3].Message != bobLines[1] ||
bobGroupTimeline[4].Message != bobLines[2] || bobGroupTimeline[5].Message != carolLines[0] {
t.Errorf("Some of Bob's timeline messages did not have the expected content!")
}
}
if len(carolsGroup.GetTimeline()) != 6 {
t.Errorf("Carol's timeline does not have all messages")
} else {
// check message 0,1,2,3,4,5
carolsGroup.Timeline.Sort()
carolGroupTimeline := carolsGroup.GetTimeline()
if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] ||
carolGroupTimeline[2].Message != aliceLines[1] || carolGroupTimeline[3].Message != bobLines[1] ||
carolGroupTimeline[4].Message != carolLines[0] || carolGroupTimeline[5].Message != bobLines[2] {
t.Errorf("Some of Carol's timeline messages did not have the expected content!")
}
}
fmt.Println("Shutting down Bob...") fmt.Println("Shutting down Bob...")
app.ShutdownPeer(bob.GetOnion()) app.ShutdownPeer(bob.GetOnion())
@ -428,7 +446,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
numGoRoutinesPostAppShutdown := runtime.NumGoroutine() numGoRoutinesPostAppShutdown := runtime.NumGoroutine()
fmt.Println("Shutting down ACN...") fmt.Println("Shutting down ACN...")
acn.Close() // acn.Close() TODO: ACN Now gets closed automatically with defer...attempting to close twice results in a dead lock...
time.Sleep(time.Second * 2) // Server ^^ has a 5 second loop attempting reconnect before exiting time.Sleep(time.Second * 2) // Server ^^ has a 5 second loop attempting reconnect before exiting
time.Sleep(time.Second * 30) // the network status plugin might keep goroutines alive for a minute before killing them time.Sleep(time.Second * 30) // the network status plugin might keep goroutines alive for a minute before killing them
numGoRoutinesPostACN := runtime.NumGoroutine() numGoRoutinesPostACN := runtime.NumGoroutine()
@ -438,11 +456,32 @@ func TestCwtchPeerIntegration(t *testing.T) {
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+ fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+
"numGoRoutinesPostAlice: %v\nnumGoRotinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v\nnumGoRoutinesPostACN: %v\n", "numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v\nnumGoRoutinesPostACN: %v\n",
numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect, numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect,
numGoRoutinesPostAlice, numGoRotinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown, numGoRoutinesPostACN) numGoRoutinesPostAlice, numGoRoutinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown, numGoRoutinesPostACN)
if numGoRoutinesStart != numGoRoutinesPostACN { if numGoRoutinesStart != numGoRoutinesPostACN {
t.Errorf("Number of GoRoutines at start (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, leak detected!", numGoRoutinesStart, numGoRoutinesPostACN) t.Errorf("Number of GoRoutines at start (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, leak detected!", numGoRoutinesStart, numGoRoutinesPostACN)
} }
}
func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, id int, message string) {
name, _ := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
t.Logf("%v> %v\n", name, message)
err := profile.SendMessage(id, message)
if err != nil {
t.Fatalf("Alice failed to send a message to the group: %v", err)
}
time.Sleep(time.Second * 10)
}
func checkMessage(t *testing.T, profile peer.CwtchPeer, id int, messageID int, expected string) {
message, _, err := profile.GetChannelMessage(id, 0, messageID)
if err != nil {
t.Fatalf("unexpected message %v expected: %v got error: %v", profile.GetOnion(), expected, err)
}
if message != expected {
t.Fatalf("unexpected message %v expected: %v got: [%v]", profile.GetOnion(), expected, message)
}
} }

View File

@ -76,6 +76,14 @@ func TestEncryptedStorage(t *testing.T) {
time.Sleep(time.Second * 30) time.Sleep(time.Second * 30)
alice.SendMessage(2, "Hello Bob") alice.SendMessage(2, "Hello Bob")
if err != nil {
t.Fatalf("alice should have been able to fetch her own message")
}
_, attr, err := alice.GetChannelMessage(2, 0, 1)
if attr[constants.AttrAck] != "false" {
t.Fatalf("Alices message should have been acknowledged...yet")
}
time.Sleep(time.Second * 30) time.Sleep(time.Second * 30)
ci, _ := bob.FetchConversationInfo(alice.GetOnion()) ci, _ := bob.FetchConversationInfo(alice.GetOnion())
@ -86,6 +94,16 @@ func TestEncryptedStorage(t *testing.T) {
t.Logf("succesfully found message in conversation channel %v", body) t.Logf("succesfully found message in conversation channel %v", body)
} }
// Check that we received an ACk...
_, attr, err = alice.GetChannelMessage(2, 0, 1)
if err != nil {
t.Fatalf("alice should have been able to fetch her own message")
}
if attr[constants.AttrAck] != "true" {
t.Fatalf("Alices message should have been acknowledged.")
}
} }
// Sub Test testing that Alice can add Bob, delete the conversation associated with Bob, and then add Bob again // Sub Test testing that Alice can add Bob, delete the conversation associated with Bob, and then add Bob again