libcwtch-go first cut integration / message timelines etc

pull/404/head
Sarah Jamie Lewis 1 year ago
parent 5c47dd789a
commit e296c30818
  1. 15
      event/common.go
  2. 4
      functionality/filesharing/filesharing_functionality.go
  3. 3
      model/constants/attributes.go
  4. 35
      model/conversation.go
  5. 5
      model/group.go
  6. 8
      model/group_test.go
  7. 8
      model/profile.go
  8. 99
      peer/cwtch_peer.go
  9. 93
      peer/cwtchprofilestorage.go
  10. 8
      peer/profile_interface.go
  11. 2
      peer/storage.go
  12. 6
      storage/profile_store.go
  13. 159
      testing/cwtch_peer_server_integration_test.go
  14. 18
      testing/encryptedstorage/encrypted_storage_integration_test.go
  15. 34
      testing/filesharing/file_sharing_integration_test.go

@ -126,8 +126,7 @@ const (
// a peer contact has been added
// attributes:
// RemotePeer [eg ""]
// Authorization
PeerCreated = Type("PeerCreated")
ContactCreated = Type("ContactCreated")
// Password, NewPassword
ChangePassword = Type("ChangePassword")
@ -273,12 +272,12 @@ const (
Identity = Field("Identity")
GroupConversationID = Field("GroupConversationID")
GroupID = Field("GroupID")
GroupServer = Field("GroupServer")
ServerTokenY = Field("ServerTokenY")
ServerTokenOnion = Field("ServerTokenOnion")
GroupInvite = Field("GroupInvite")
ConversationID = Field("ConversationID")
GroupID = Field("GroupID")
GroupServer = Field("GroupServer")
ServerTokenY = Field("ServerTokenY")
ServerTokenOnion = Field("ServerTokenOnion")
GroupInvite = Field("GroupInvite")
ProfileName = Field("ProfileName")
Password = Field("Password")

@ -40,7 +40,7 @@ type OverlayMessage struct {
// DownloadFile given a profile, a conversation handle and a file sharing key, start off a download process
// to downloadFilePath
func (f *Functionality) DownloadFile(profile peer.CwtchPeer, handle string, downloadFilePath string, manifestFilePath string, key string) {
func (f *Functionality) DownloadFile(profile peer.CwtchPeer, conversationID int, downloadFilePath string, manifestFilePath string, key string) {
// Store local.filesharing.filekey.manifest as the location of the manifest
profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", key), manifestFilePath)
@ -49,7 +49,7 @@ func (f *Functionality) DownloadFile(profile peer.CwtchPeer, handle string, down
profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, key, downloadFilePath)
// Get the value of conversation.filesharing.filekey.manifest.size from `handle`
profile.SendScopedZonedGetValToContact(handle, attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest.size", key))
profile.SendScopedZonedGetValToContact(conversationID, attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest.size", key))
}
// ShareFile given a profile and a conversation handle, sets up a file sharing process to share the file

@ -35,3 +35,6 @@ const AttrAck = "ack"
// AttrErr - conversation attribute for errored status
const AttrErr = "error"
// AttrSentTimestamp - conversation attribute for the time the message was (nominally) sent
const AttrSentTimestamp = "sent"

@ -1,6 +1,10 @@
package model
import "encoding/json"
import (
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"encoding/json"
)
// AccessControl is a type determining client assigned authorization to a peer
type AccessControl struct {
@ -57,3 +61,32 @@ type Conversation struct {
ACL AccessControlList
Accepted bool
}
func (ci *Conversation) GetAttribute(scope attr.Scope, zone attr.Zone, key string) (string, bool) {
if value, exists := ci.Attributes[scope.ConstructScopedZonedPath(zone.ConstructZonedPath(key)).ToString()]; exists {
return value, true
}
return "", false
}
func (ci *Conversation) IsGroup() bool {
if _, exists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)).ToString()]; exists {
return true
}
return false
}
func (ci *Conversation) IsServer() bool {
if _, exists := ci.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(BundleType))).ToString()]; exists {
return true
}
return false
}
type ConversationMessage struct {
ID int
Body string
Attr Attributes
Signature string
ContentHash string
}

@ -30,6 +30,7 @@ const GroupInvitePrefix = "torv3"
type Group struct {
// GroupID is now derived from the GroupKey and the GroupServer
GroupID string
GroupName string
GroupKey [32]byte
GroupServer string
Version int
@ -73,11 +74,11 @@ func deriveGroupID(groupKey []byte, serverHostname string) string {
}
// Invite generates a invitation that can be sent to a cwtch peer
func (g *Group) Invite(name string) (string, error) {
func (g *Group) Invite() (string, error) {
gci := &groups.GroupInvite{
GroupID: g.GroupID,
GroupName: name,
GroupName: g.GroupName,
SharedKey: g.GroupKey[:],
ServerHost: g.GroupServer,
}

@ -19,7 +19,7 @@ func TestGroup(t *testing.T) {
Padding: []byte{},
}
invite, err := g.Invite("name")
invite, err := g.Invite()
if err != nil {
t.Fatalf("error creating group invite: %v", err)
@ -64,7 +64,7 @@ func TestGroupValidation(t *testing.T) {
Version: 0,
}
invite, _ := group.Invite("name")
invite, _ := group.Invite()
_, err := ValidateInvite(invite)
if err == nil {
@ -75,7 +75,7 @@ func TestGroupValidation(t *testing.T) {
// Generate a valid group but replace the group server...
group, _ = NewGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
group.GroupServer = "tcnkoch4nyr3cldkemejtkpqok342rbql6iclnjjs3ndgnjgufzyxvqd"
invite, _ = group.Invite("name")
invite, _ = group.Invite()
_, err = ValidateInvite(invite)
if err == nil {
@ -86,7 +86,7 @@ func TestGroupValidation(t *testing.T) {
// Generate a valid group but replace the group key...
group, _ = NewGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
group.GroupKey = sha256.Sum256([]byte{})
invite, _ = group.Invite("name")
invite, _ = group.Invite()
_, err = ValidateInvite(invite)
if err == nil {

@ -10,6 +10,7 @@ import (
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"golang.org/x/crypto/ed25519"
"io"
"path/filepath"
"sync"
"time"
)
@ -63,6 +64,13 @@ func getRandomness(arr *[]byte) {
}
}
// GenerateRandomID generates a random 16 byte hex id code
func GenerateRandomID() string {
randBytes := make([]byte, 16)
rand.Read(randBytes)
return filepath.Join(hex.EncodeToString(randBytes))
}
// EncryptMessageToGroup when given a message and a group, encrypts and signs the message under the group and
// profile
func EncryptMessageToGroup(message string, author primitives.Identity, group *Group) ([]byte, []byte, *groups.DecryptedGroupMessage, error) {

@ -98,9 +98,14 @@ func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Mana
// SendScopedZonedGetValToContact
// Status: No change in 1.5
func (cp *cwtchPeer) SendScopedZonedGetValToContact(handle string, scope attr.Scope, zone attr.Zone, path string) {
ev := event.NewEventList(event.SendGetValMessageToPeer, event.RemotePeer, handle, event.Scope, string(scope), event.Path, string(zone.ConstructZonedPath(path)))
cp.eventBus.Publish(ev)
func (cp *cwtchPeer) SendScopedZonedGetValToContact(conversationID int, scope attr.Scope, zone attr.Zone, path string) {
ci, err := cp.GetConversationInfo(conversationID)
if err == nil {
ev := event.NewEventList(event.SendGetValMessageToPeer, event.RemotePeer, ci.Handle, event.Scope, string(scope), event.Path, string(zone.ConstructZonedPath(path)))
cp.eventBus.Publish(ev)
} else {
log.Errorf("Error sending scoped zone to contact %v %v", conversationID, err)
}
}
// GetScopedZonedAttribute
@ -151,11 +156,11 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) error {
if conversationInfo != nil && err == nil {
if tor.IsValidHostname(conversationInfo.Handle) {
ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: conversationInfo.Handle, event.Data: message})
ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.RemotePeer: conversationInfo.Handle, event.Data: message})
onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
// For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks
err := cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{"ack": event.False, "sent": time.Now().String()}, ev.EventID, model.CalculateContentHash(string(onion), message))
err := cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message))
if err != nil {
return err
}
@ -187,10 +192,13 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) error {
}
// Insert the Group Message
cp.storage.InsertMessage(conversationInfo.ID, 0, dm.Text, model.Attributes{constants.AttrAck: constants.False, "PreviousSignature": base64.StdEncoding.EncodeToString(dm.PreviousMessageSig), "Author": dm.Onion, "Sent": strconv.Itoa(int(dm.Timestamp))}, base64.StdEncoding.EncodeToString(sig), model.CalculateContentHash(dm.Onion, dm.Text))
ev := event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupID: conversationInfo.Handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)})
cp.eventBus.Publish(ev)
err = cp.storage.InsertMessage(conversationInfo.ID, 0, dm.Text, model.Attributes{constants.AttrAck: constants.False, "PreviousSignature": base64.StdEncoding.EncodeToString(dm.PreviousMessageSig), "Author": dm.Onion, constants.AttrSentTimestamp: strconv.Itoa(int(dm.Timestamp))}, base64.StdEncoding.EncodeToString(sig), model.CalculateContentHash(dm.Onion, dm.Text))
if err == nil {
ev := event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.GroupID: conversationInfo.Handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)})
cp.eventBus.Publish(ev)
} else {
return err
}
}
return nil
}
@ -355,7 +363,7 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) {
cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), gci.ServerHost)
cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(gci.SharedKey))
cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.Name)), gci.GroupName)
cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.GroupConversationID: strconv.Itoa(groupConversationID), event.GroupServer: gci.ServerHost, event.GroupInvite: exportedInvite}))
cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(groupConversationID), event.GroupServer: gci.ServerHost, event.GroupInvite: exportedInvite}))
}
return groupConversationID, err
}
@ -364,7 +372,9 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) {
func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.storage.NewConversation(handle, model.Attributes{event.SaveHistoryKey: event.DeleteHistoryDefault}, model.AccessControlList{handle: acl}, accepted)
conversationID, err := cp.storage.NewConversation(handle, model.Attributes{event.SaveHistoryKey: event.DeleteHistoryDefault}, model.AccessControlList{handle: acl}, accepted)
cp.eventBus.Publish(event.NewEvent(event.ContactCreated, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.RemotePeer: handle}))
return conversationID, err
}
// AcceptConversation looks up a conversation by `handle` and sets the Accepted status to `true`
@ -375,6 +385,23 @@ func (cp *cwtchPeer) AcceptConversation(id int) error {
return cp.storage.AcceptConversation(id)
}
// BlockConversation looks up a conversation by `handle` and sets the Accepted status to `true`
// This will cause Cwtch to auto connect to this conversation on start up
func (cp *cwtchPeer) BlockConversation(id int) error {
cp.mutex.Lock()
defer cp.mutex.Unlock()
ci, err := cp.storage.GetConversation(id)
if err != nil {
return err
}
// p2p conversations have a single ACL referencing the remote peer. Set this to blocked...
ci.ACL[ci.Handle] = model.AccessControl{Blocked: true, Read: false, Append: false}
// Send an event in any case to block the protocol engine...
// TODO at some point in the future engine needs to understand ACLs not just legacy auth status
cp.eventBus.Publish(event.NewEvent(event.SetPeerAuthorization, map[event.Field]string{event.RemotePeer: ci.Handle, event.Authorization: string(model.AuthBlocked)}))
return cp.storage.SetConversationACL(id, ci.ACL)
}
func (cp *cwtchPeer) FetchConversations() ([]*model.Conversation, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
@ -424,12 +451,29 @@ func (cp *cwtchPeer) GetConversationAttribute(id int, path attr.ScopedZonedPath)
return val, nil
}
// GetChannelMessage returns a message from a conversation channel referenced by the absolute ID.
// Note: This should note be used to index a list as the ID is not expected to be tied to absolute position
// in the table (e.g. deleted messages, expired messages, etc.)
func (cp *cwtchPeer) GetChannelMessage(conversation int, channel int, id int) (string, model.Attributes, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.storage.GetChannelMessage(conversation, channel, id)
}
// GetChannelMessageCount returns the absolute number of messages in a given conversation channel
func (cp *cwtchPeer) GetChannelMessageCount(conversation int, channel int) (int, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.storage.GetChannelMessageCount(conversation, channel)
}
// GetMostRecentMessages returns a selection of messages, ordered by most recently inserted
func (cp *cwtchPeer) GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.storage.GetMostRecentMessages(conversation, channel, offset, limit)
}
// StartGroup create a new group linked to the given server and returns the group ID, an invite or an error.
// Status: TODO change server handle to conversation id...?
func (cp *cwtchPeer) StartGroup(name string, server string) (int, error) {
@ -445,8 +489,9 @@ func (cp *cwtchPeer) StartGroup(name string, server string) (int, error) {
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.Name)), name)
cp.eventBus.Publish(event.NewEvent(event.GroupCreated, map[event.Field]string{
event.GroupID: group.GroupID,
event.GroupServer: group.GroupServer,
event.ConversationID: strconv.Itoa(conversationID),
event.GroupID: group.GroupID,
event.GroupServer: group.GroupServer,
}))
return conversationID, nil
}
@ -480,7 +525,10 @@ func (cp *cwtchPeer) AddServer(serverSpecification string) error {
// Add the contact if we don't already have it
conversationInfo, _ := cp.FetchConversationInfo(onion)
if conversationInfo == nil {
cp.NewContactConversation(onion, model.DefaultP2PAccessControl(), true)
_, err := cp.NewContactConversation(onion, model.DefaultP2PAccessControl(), true)
if err != nil {
return err
}
}
conversationInfo, err = cp.FetchConversationInfo(onion)
@ -530,13 +578,13 @@ func (cp *cwtchPeer) GetOnion() string {
// GetPeerState
// Status: Ready for 1.5
func (cp *cwtchPeer) GetPeerState(handle string) (connections.ConnectionState, bool) {
func (cp *cwtchPeer) GetPeerState(handle string) connections.ConnectionState {
cp.mutex.Lock()
defer cp.mutex.Unlock()
if state, ok := cp.state[handle]; ok {
return state, ok
return state
}
return connections.DISCONNECTED, false
return connections.DISCONNECTED
}
// PeerWithOnion initiates a request to the Protocol Engine to set up Cwtch Session with a given tor v3 onion
@ -593,11 +641,12 @@ func (cp *cwtchPeer) SendInviteToConversation(conversationID int, inviteConversa
group := model.Group{
GroupID: groupID,
GroupName: groupName,
GroupKey: groupKeyFixed,
GroupServer: groupServer,
}
groupInvite, err := group.Invite(groupName)
groupInvite, err := group.Invite()
if err != nil {
return errors.New("group invite is malformed")
}
@ -773,7 +822,7 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time)
// Generate a random number and use it as the signature
signature := event.GetRandNumber().String()
return cp.storage.InsertMessage(ci.ID, 0, message, model.Attributes{constants.AttrAck: event.True, "sent": sent.String()}, signature, model.CalculateContentHash(handle, message))
return cp.storage.InsertMessage(ci.ID, 0, message, model.Attributes{constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message))
}
// ShareFile begins hosting the given serialized manifest
@ -870,7 +919,7 @@ func (cp *cwtchPeer) eventHandler() {
val, exists = cp.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
}
resp := event.NewEvent(event.SendRetValMessageToPeer, map[event.Field]string{event.RemotePeer: onion, event.Exists: strconv.FormatBool(exists)})
resp := event.NewEvent(event.SendRetValMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.RemotePeer: onion, event.Exists: strconv.FormatBool(exists)})
resp.EventID = ev.EventID
if exists {
resp.Data[event.Data] = val
@ -998,14 +1047,14 @@ func (cp *cwtchPeer) attemptInsertOrAcknowledgeLegacyGroupConversation(conversat
attr[constants.AttrAck] = constants.True
cp.storage.UpdateMessageAttributes(conversationID, 0, messageID, attr)
cp.mutex.Unlock()
cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.GroupConversationID: strconv.Itoa(conversationID), event.Index: strconv.Itoa(messageID)}))
cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.Index: strconv.Itoa(messageID)}))
return nil
}
} else {
cp.mutex.Lock()
cp.storage.InsertMessage(conversationID, 0, dm.Text, model.Attributes{constants.AttrAck: constants.True, "PreviousSignature": base64.StdEncoding.EncodeToString(dm.PreviousMessageSig), "Author": dm.Onion, "Sent": strconv.Itoa(int(dm.Timestamp))}, signature, model.CalculateContentHash(dm.Onion, dm.Text))
cp.storage.InsertMessage(conversationID, 0, dm.Text, model.Attributes{constants.AttrAck: constants.True, "PreviousSignature": base64.StdEncoding.EncodeToString(dm.PreviousMessageSig), "Author": dm.Onion, constants.AttrSentTimestamp: strconv.Itoa(int(dm.Timestamp))}, signature, model.CalculateContentHash(dm.Onion, dm.Text))
cp.mutex.Unlock()
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.GroupConversationID: strconv.Itoa(conversationID), event.Index: strconv.Itoa(messageID)}))
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.Index: strconv.Itoa(messageID)}))
return nil
}
return err
@ -1027,7 +1076,7 @@ func (cp *cwtchPeer) attemptAcknowledgeP2PConversation(handle string, signature
attr[constants.AttrAck] = constants.True
cp.storage.UpdateMessageAttributes(ci.ID, 0, id, attr)
cp.mutex.Unlock()
cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.RemotePeer: handle, event.Index: strconv.Itoa(id)}))
cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: handle, event.Index: strconv.Itoa(id)}))
return nil
}
return err
@ -1053,7 +1102,7 @@ func (cp *cwtchPeer) attemptErrorConversationMessage(handle string, signature st
attr[constants.AttrErr] = constants.True
cp.storage.UpdateMessageAttributes(ci.ID, 0, id, attr)
cp.mutex.Unlock()
cp.eventBus.Publish(event.NewEvent(eventType, map[event.Field]string{event.RemotePeer: handle, event.Error: error, event.Index: strconv.Itoa(id)}))
cp.eventBus.Publish(event.NewEvent(eventType, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: handle, event.Error: error, event.Index: strconv.Itoa(id)}))
return nil
}
return err

@ -41,11 +41,14 @@ type CwtchProfileStorage struct {
acceptConversationStmt *sql.Stmt
deleteConversationStmt *sql.Stmt
setConversationAttributesStmt *sql.Stmt
setConversationACLStmt *sql.Stmt
channelInsertStmts map[ChannelID]*sql.Stmt
channelUpdateMessageStmts map[ChannelID]*sql.Stmt
channelGetMessageStmts map[ChannelID]*sql.Stmt
channelGetMessageBySignatureStmts map[ChannelID]*sql.Stmt
channelGetCountStmts map[ChannelID]*sql.Stmt
channelGetMostRecentMessagesStmts map[ChannelID]*sql.Stmt
db *sql.DB
}
@ -65,6 +68,7 @@ const selectConversationSQLStmt = `select ID, Handle, Attributes, ACL, Accepted
const selectConversationByHandleSQLStmt = `select ID, Handle, Attributes, ACL, Accepted from conversations where Handle=(?);`
const acceptedConversationSQLStmt = `update conversations set Accepted=true where ID=(?);`
const setConversationAttributesSQLStmt = `update conversations set Attributes=(?) where ID=(?) ;`
const setConversationACLSQLStmt = `update conversations set ACL=(?) where ID=(?) ;`
const deleteConversationSQLStmt = `delete from conversations where ID=(?);`
// createTableConversationMessagesSQLStmt is a template for creating conversation based tables...
@ -85,6 +89,12 @@ const getMessageBySignatureFromConversationSQLStmt = `select ID from channel_%d_
// getMessageByContentHashFromConversationSQLStmt is a template for creating conversation based tables...
const getMessageByContentHashFromConversationSQLStmt = `select ID from channel_%d_%d_chat where ContentHash=(?);`
// getMessageCountFromConversationSqlStmt
const getMessageCountFromConversationSqlStmt = `select count(*) from channel_%d_%d_chat;`
// getMostRecentMessagesFromSqlStmt
const getMostRecentMessagesSqlStmt = `select ID, Body, Attributes, Signature, ContentHash from channel_%d_%d_chat order by ID desc limit (?) offset (?);`
// NewCwtchProfileStorage constructs a new CwtchProfileStorage from a database. It is also responsible for
// Preparing commonly used SQL Statements
func NewCwtchProfileStorage(db *sql.DB) (*CwtchProfileStorage, error) {
@ -147,6 +157,12 @@ func NewCwtchProfileStorage(db *sql.DB) (*CwtchProfileStorage, error) {
return nil, err
}
setConversationACLStmt, err := db.Prepare(setConversationACLSQLStmt)
if err != nil {
log.Errorf("error preparing query: %v %v", setConversationACLSQLStmt, err)
return nil, err
}
return &CwtchProfileStorage{db: db,
insertProfileKeyValueStmt: insertProfileKeyValueStmt,
selectProfileKeyValueStmt: selectProfileKeyStmt,
@ -157,10 +173,13 @@ func NewCwtchProfileStorage(db *sql.DB) (*CwtchProfileStorage, error) {
acceptConversationStmt: acceptConversationStmt,
deleteConversationStmt: deleteConversationStmt,
setConversationAttributesStmt: setConversationAttributesStmt,
setConversationACLStmt: setConversationACLStmt,
channelInsertStmts: map[ChannelID]*sql.Stmt{},
channelUpdateMessageStmts: map[ChannelID]*sql.Stmt{},
channelGetMessageStmts: map[ChannelID]*sql.Stmt{},
channelGetMessageBySignatureStmts: map[ChannelID]*sql.Stmt{}},
channelGetMessageBySignatureStmts: map[ChannelID]*sql.Stmt{},
channelGetMostRecentMessagesStmts: map[ChannelID]*sql.Stmt{},
channelGetCountStmts: map[ChannelID]*sql.Stmt{}},
nil
}
@ -358,6 +377,16 @@ func (cps *CwtchProfileStorage) DeleteConversation(id int) error {
return nil
}
// SetConversationACL sets a new ACL on a given conversation.
func (cps *CwtchProfileStorage) SetConversationACL(id int, acl model.AccessControlList) error {
_, err := cps.setConversationACLStmt.Exec(acl, id)
if err != nil {
log.Errorf("error executing query: %v", err)
return err
}
return nil
}
// SetConversationAttribute sets a new attribute on a given conversation.
func (cps *CwtchProfileStorage) SetConversationAttribute(id int, path attr.ScopedZonedPath, value string) error {
ci, err := cps.GetConversation(id)
@ -501,6 +530,68 @@ func (cps *CwtchProfileStorage) GetChannelMessage(conversation int, channel int,
return body, model.DeserializeAttributes(attributes), nil
}
// GetChannelMessageCount returns the number of messages in a channel
func (cps *CwtchProfileStorage) GetChannelMessageCount(conversation int, channel int) (int, error) {
channelID := ChannelID{Conversation: conversation, Channel: channel}
_, exists := cps.channelGetCountStmts[channelID]
if !exists {
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMessageCountFromConversationSqlStmt, conversation, channel))
if err != nil {
log.Errorf("error executing transaction: %v", err)
return -1, err
}
cps.channelGetCountStmts[channelID] = conversationStmt
}
var count int
err := cps.channelGetCountStmts[channelID].QueryRow().Scan(&count)
if err != nil {
log.Errorf("error executing query: %v", err)
return -1, err
}
return count, nil
}
// GetChannelMessageCount returns the number of messages in a channel
func (cps *CwtchProfileStorage) GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) {
channelID := ChannelID{Conversation: conversation, Channel: channel}
_, exists := cps.channelGetMostRecentMessagesStmts[channelID]
if !exists {
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMostRecentMessagesSqlStmt, conversation, channel))
if err != nil {
log.Errorf("error executing transaction: %v", err)
return nil, err
}
cps.channelGetMostRecentMessagesStmts[channelID] = conversationStmt
}
rows, err := cps.channelGetMostRecentMessagesStmts[channelID].Query(limit, offset)
if err != nil {
log.Errorf("error executing query: %v", err)
return nil, err
}
var conversationMessages []model.ConversationMessage
defer rows.Close()
for {
result := rows.Next()
if !result {
return conversationMessages, nil
}
var id int
var body string
var attributes []byte
var sig string
var contenthash string
err = rows.Scan(&id, &body, &attributes, &sig, &contenthash)
if err != nil {
return conversationMessages, err
}
conversationMessages = append(conversationMessages, model.ConversationMessage{ID: id, Body: body, Attr: model.DeserializeAttributes(attributes), Signature: sig, ContentHash: contenthash})
}
}
// Close closes the underlying database and prepared statements
func (cps *CwtchProfileStorage) Close() {
if cps.db != nil {

@ -10,7 +10,7 @@ import (
// AccessPeeringState provides access to functions relating to the underlying connections of a peer.
type AccessPeeringState interface {
GetPeerState(string) (connections.ConnectionState, bool)
GetPeerState(string) connections.ConnectionState
}
// ModifyPeeringState is a meta-interface intended to restrict callers to modify-only access to connection peers
@ -48,7 +48,7 @@ type ModifyServers interface {
type SendMessages interface {
SendMessage(conversation int, message string) error
SendInviteToConversation(conversationID int, inviteConversationID int) error
SendScopedZonedGetValToContact(handle string, scope attr.Scope, zone attr.Zone, key string)
SendScopedZonedGetValToContact(conversationID int, scope attr.Scope, zone attr.Zone, key string)
}
// ModifyMessages enables a caller to modify the messages in a timeline
@ -106,11 +106,15 @@ type CwtchPeer interface {
GetConversationInfo(conversation int) (*model.Conversation, error)
FetchConversationInfo(handle string) (*model.Conversation, error)
AcceptConversation(conversation int) error
BlockConversation(conversation int) error
SetConversationAttribute(conversation int, path attr.ScopedZonedPath, value string) error
GetConversationAttribute(conversation int, path attr.ScopedZonedPath) (string, error)
DeleteConversation(conversation int) error
// New Unified Conversation Channel Interfaces
GetChannelMessage(conversation int, channel int, id int) (string, model.Attributes, error)
GetChannelMessageCount(conversation int, channel int) (int, error)
GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error)
ShareFile(fileKey string, serializedManifest string)
}

@ -5,8 +5,6 @@ import (
"database/sql"
"fmt"
"git.openprivacy.ca/openprivacy/log"
// Import SQL Cipher
_ "github.com/mutecomm/go-sqlcipher/v4"
"golang.org/x/crypto/pbkdf2"
"golang.org/x/crypto/sha3"
"io"

@ -36,10 +36,4 @@ func ReadProfile(directory string, key [32]byte, salt [128]byte) (*model.Profile
return v1.ReadProfile(directory, key, salt)
}
// NewProfile creates a new profile for use in the profile store.
func NewProfile(name string) *model.Profile {
profile := model.GenerateNewProfile(name)
return profile
}
// ********* Versioning and upgrade **********

@ -1,6 +1,7 @@
package testing
import (
// Import SQL Cipher
"crypto/rand"
app2 "cwtch.im/cwtch/app"
"cwtch.im/cwtch/app/utils"
@ -15,6 +16,7 @@ import (
"fmt"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
_ "github.com/mutecomm/go-sqlcipher/v4"
mrand "math/rand"
"os"
"os/user"
@ -31,58 +33,24 @@ var (
carolLines = []string{"Howdy, thanks!"}
)
func printAndCountVerifedTimeline(t *testing.T, timeline []model.Message) int {
numVerified := 0
for _, message := range timeline {
fmt.Printf("%v %v> %s\n", message.Timestamp, message.PeerID, message.Message)
numVerified++
}
return numVerified
}
func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, serverAddr string) {
func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target connections.ConnectionState) {
peerName, _ := peer.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
for {
fmt.Printf("%v checking group connection...\n", peerName)
state, ok := peer.GetPeerState(serverAddr)
if ok {
fmt.Printf("Waiting for Peer %v to join group %v - state: %v\n", peerName, serverAddr, state)
if state == connections.FAILED {
t.Fatalf("%v could not connect to %v", peer.GetOnion(), serverAddr)
}
if state != connections.SYNCED {
fmt.Printf("peer %v %v waiting connect to group %v, currently: %v\n", peerName, peer.GetOnion(), serverAddr, connections.ConnectionStateName[state])
time.Sleep(time.Second * 5)
continue
} else {
fmt.Printf("peer %v %v CONNECTED to group %v\n", peerName, peer.GetOnion(), serverAddr)
break
}
fmt.Printf("%v checking connection...\n", peerName)
state := peer.GetPeerState(addr)
fmt.Printf("Waiting for Peer %v to %v - state: %v\n", peerName, addr, state)
if state == connections.FAILED {
t.Fatalf("%v could not connect to %v", peer.GetOnion(), addr)
}
time.Sleep(time.Second * 2)
}
return
}
func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.CwtchPeer) {
for {
state, ok := peera.GetPeerState(peerb.GetOnion())
if ok {
//log.Infof("Waiting for Peer %v to peer with peer: %v - state: %v\n", peera.GetProfile().Name, peerb.GetProfile().Name, state)
if state == connections.FAILED {
t.Fatalf("%v could not connect to %v", peera.GetOnion(), peerb.GetOnion())
}
if state != connections.AUTHENTICATED {
fmt.Printf("peer %v waiting connect to peer %v, currently: %v\n", peera.GetOnion(), peerb.GetOnion(), connections.ConnectionStateName[state])
time.Sleep(time.Second * 5)
continue
} else {
peerAName, _ := peera.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
peerBName, _ := peerb.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
fmt.Printf("%v CONNECTED and AUTHED to %v\n", peerAName, peerBName)
break
}
if state != target {
fmt.Printf("peer %v %v waiting connect %v, currently: %v\n", peerName, peer.GetOnion(), addr, connections.ConnectionStateName[state])
time.Sleep(time.Second * 5)
continue
} else {
fmt.Printf("peer %v %v CONNECTED to %v\n", peerName, peer.GetOnion(), addr)
break
}
time.Sleep(time.Second * 2)
}
return
}
@ -179,65 +147,37 @@ func TestCwtchPeerIntegration(t *testing.T) {
if err != nil {
t.Fatalf("error adding conversaiton %v", alice2bobConversationID)
}
alice.PeerWithOnion(bob.GetOnion())
bob2aliceConversationID, err := bob.NewContactConversation(alice.GetOnion(), model.DefaultP2PAccessControl(), true)
if err != nil {
t.Fatalf("error adding conversaiton %v", bob2aliceConversationID)
}
fmt.Println("Alice peering with Carol...")
t.Logf("Alice peering with Carol...")
// Simulate Alice Adding Carol
alice2carolConversationID, err := alice.NewContactConversation(carol.GetOnion(), model.DefaultP2PAccessControl(), true)
if err != nil {
t.Fatalf("error adding conversaiton %v", alice2carolConversationID)
}
alice.PeerWithOnion(carol.GetOnion())
// Simulate Alice Creating a Group
fmt.Println("Alice joining server...")
if err := alice.AddServer(string(serverKeyBundle)); err != nil {
t.Fatalf("Failed to Add Server Bundle %v", err)
}
err = alice.JoinServer(ServerAddr)
if err != nil {
t.Fatalf("alice cannot join server %v %v", ServerAddr, err)
}
fmt.Println("Creating group on ", ServerAddr, "...")
aliceGroupConversationID, err := alice.StartGroup("Our Cool Testing Group", ServerAddr)
fmt.Printf("Created group: %v!\n", aliceGroupConversationID)
if err != nil {
t.Errorf("Failed to init group: %v", err)
return
}
fmt.Println("Waiting for alice to join server...")
waitForPeerGroupConnection(t, alice, ServerAddr)
fmt.Println("Waiting for alice and Bob to peer...")
waitForPeerPeerConnection(t, alice, bob)
// Need to add contact else SetContactAuth fails on peer peer doesnt exist
// Normal flow would be Bob app monitors for the new connection (a new connection state change to Auth
// and the adds the user to peer, and then approves or blocks it
// Simulate Bob adding Alice
bob2aliceConversationID, err := bob.NewContactConversation(alice.GetOnion(), model.DefaultP2PAccessControl(), true)
if err != nil {
t.Fatalf("error adding conversaiton %v", bob2aliceConversationID)
}
bob.AddServer(string(serverKeyBundle))
waitForPeerPeerConnection(t, alice, carol)
// Simulate Carol adding Alice
carol2aliceConversationID, err := carol.NewContactConversation(alice.GetOnion(), model.DefaultP2PAccessControl(), true)
if err != nil {
t.Fatalf("error adding conversaiton %v", carol2aliceConversationID)
}
carol.AddServer(string(serverKeyBundle))
fmt.Println("Alice and Bob getVal public.name...")
alice.PeerWithOnion(bob.GetOnion())
alice.PeerWithOnion(carol.GetOnion())
alice.SendScopedZonedGetValToContact(bob.GetOnion(), attr.PublicScope, attr.ProfileZone, constants.Name)
bob.SendScopedZonedGetValToContact(alice.GetOnion(), attr.PublicScope, attr.ProfileZone, constants.Name)
waitForConnection(t, alice, bob.GetOnion(), connections.AUTHENTICATED)
waitForConnection(t, alice, carol.GetOnion(), connections.AUTHENTICATED)
waitForConnection(t, bob, alice.GetOnion(), connections.AUTHENTICATED)
waitForConnection(t, carol, alice.GetOnion(), connections.AUTHENTICATED)
alice.SendScopedZonedGetValToContact(carol.GetOnion(), attr.PublicScope, attr.ProfileZone, constants.Name)
carol.SendScopedZonedGetValToContact(alice.GetOnion(), attr.PublicScope, attr.ProfileZone, constants.Name)
t.Logf("Alice and Bob getVal public.name...")
alice.SendScopedZonedGetValToContact(alice2bobConversationID, attr.PublicScope, attr.ProfileZone, constants.Name)
bob.SendScopedZonedGetValToContact(bob2aliceConversationID, attr.PublicScope, attr.ProfileZone, constants.Name)
alice.SendScopedZonedGetValToContact(alice2carolConversationID, attr.PublicScope, attr.ProfileZone, constants.Name)
carol.SendScopedZonedGetValToContact(carol2aliceConversationID, attr.PublicScope, attr.ProfileZone, constants.Name)
// This used to be 10, but increasing it to 30 because this is now causing frequent issues
// Probably related to latency/throughput problems in the underlying tor network.
@ -266,6 +206,35 @@ func TestCwtchPeerIntegration(t *testing.T) {
}
fmt.Printf("Alice has carol's name as '%v'\n", carolName)
// Group Testing
// Simulate Alice Creating a Group
fmt.Println("Alice joining server...")
if err := alice.AddServer(string(serverKeyBundle)); err != nil {
t.Fatalf("Failed to Add Server Bundle %v", err)
}
bob.AddServer(string(serverKeyBundle))
carol.AddServer(string(serverKeyBundle))
err = alice.JoinServer(ServerAddr)
if err != nil {
t.Fatalf("alice cannot join server %v %v", ServerAddr, err)
}
waitForConnection(t, alice, ServerAddr, connections.AUTHENTICATED)
// Creating a Group
fmt.Println("Creating group on ", ServerAddr, "...")
aliceGroupConversationID, err := alice.StartGroup("Our Cool Testing Group", ServerAddr)
fmt.Printf("Created group: %v!\n", aliceGroupConversationID)
if err != nil {
t.Errorf("Failed to init group: %v", err)
return
}
fmt.Println("Waiting for alice to join server...")
// Invites
fmt.Println("Alice inviting Bob to group...")
err = alice.SendInviteToConversation(alice2bobConversationID, aliceGroupConversationID)
if err != nil {
@ -289,7 +258,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
t.Fatalf("alice cannot join server %v %v", ServerAddr, err)
}
bobGroupConversationID := 3
waitForPeerGroupConnection(t, bob, ServerAddr)
waitForConnection(t, bob, ServerAddr, connections.SYNCED)
numGoRoutinesPostServerConnect := runtime.NumGoroutine()

@ -1,6 +1,7 @@
package encryptedstorage
import (
// Import SQL Cipher
"crypto/rand"
app2 "cwtch.im/cwtch/app"
"cwtch.im/cwtch/app/utils"
@ -11,6 +12,7 @@ import (
"fmt"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
_ "github.com/mutecomm/go-sqlcipher/v4"
mrand "math/rand"
"os"
"path"
@ -73,7 +75,7 @@ func TestEncryptedStorage(t *testing.T) {
alice.PeerWithOnion(bob.GetOnion())
time.Sleep(time.Second * 30)
time.Sleep(time.Second * 40)
alice.SendMessage(2, "Hello Bob")
if err != nil {
@ -104,6 +106,20 @@ func TestEncryptedStorage(t *testing.T) {
t.Fatalf("Alices message should have been acknowledged.")
}
if count, err := alice.GetChannelMessageCount(2, 0); err != nil || count != 1 {
t.Fatalf("Channel should have a single message in it. Instead returned %v %v", count, err)
}
messages, err := alice.GetMostRecentMessages(2, 0, 0, 10)
if err != nil {
t.Fatalf("fetching messages over offset should not result in error: %v", err)
}
if len(messages) != 1 || len(messages) > 0 && messages[0].Body != "Hello Bob" {
t.Fatalf("expeced GetMostRecentMessages to return 1, instead returned: %v %v", len(messages), messages)
}
}
// Sub Test testing that Alice can add Bob, delete the conversation associated with Bob, and then add Bob again

@ -18,6 +18,8 @@ import (
"fmt"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
// Import SQL Cipher
_ "github.com/mutecomm/go-sqlcipher/v4"
mrand "math/rand"
"os"
"os/user"
@ -30,22 +32,20 @@ import (
func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.CwtchPeer) {
for {
state, ok := peera.GetPeerState(peerb.GetOnion())
if ok {
//log.Infof("Waiting for Peer %v to peer with peer: %v - state: %v\n", peera.GetProfile().Name, peerb.GetProfile().Name, state)
if state == connections.FAILED {
t.Fatalf("%v could not connect to %v", peera.GetOnion(), peerb.GetOnion())
}
if state != connections.AUTHENTICATED {
fmt.Printf("peer %v waiting connect to peer %v, currently: %v\n", peera.GetOnion(), peerb.GetOnion(), connections.ConnectionStateName[state])
time.Sleep(time.Second * 5)
continue
} else {
peerAName, _ := peera.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
peerBName, _ := peerb.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
fmt.Printf("%v CONNECTED and AUTHED to %v\n", peerAName, peerBName)
break
}
state := peera.GetPeerState(peerb.GetOnion())
//log.Infof("Waiting for Peer %v to peer with peer: %v - state: %v\n", peera.GetProfile().Name, peerb.GetProfile().Name, state)
if state == connections.FAILED {
t.Fatalf("%v could not connect to %v", peera.GetOnion(), peerb.GetOnion())
}
if state != connections.AUTHENTICATED {
fmt.Printf("peer %v waiting connect to peer %v, currently: %v\n", peera.GetOnion(), peerb.GetOnion(), connections.ConnectionStateName[state])
time.Sleep(time.Second * 5)
continue
} else {
peerAName, _ := peera.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
peerBName, _ := peerb.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
fmt.Printf("%v CONNECTED and AUTHED to %v\n", peerAName, peerBName)
break
}
}
return
@ -146,7 +146,7 @@ func TestFileSharing(t *testing.T) {
err := json.Unmarshal([]byte(messageWrapper.Data), &fileMessageOverlay)
if err == nil {
filesharingFunctionality.DownloadFile(bob, alice.GetOnion(), "cwtch.out.png", "cwtch.out.png.manifest", fmt.Sprintf("%s.%s", fileMessageOverlay.Hash, fileMessageOverlay.Nonce))
filesharingFunctionality.DownloadFile(bob, 1, "cwtch.out.png", "cwtch.out.png.manifest", fmt.Sprintf("%s.%s", fileMessageOverlay.Hash, fileMessageOverlay.Nonce))
}
}

Loading…
Cancel
Save