Browse Source

File Sharing Integration Tests now Works with New Storage Code

pull/404/head
Sarah Jamie Lewis 2 months ago
parent
commit
3d0ed3d4b0
  1. 5
      .gitignore
  2. 41
      peer/cwtch_peer.go
  3. 133
      peer/cwtchprofilestorage.go
  4. 11
      peer/profile_interface.go
  5. 15
      testing/encryptedstorage/encrypted_storage_integration_test.go
  6. 30
      testing/filesharing/file_sharing_integration_test.go

5
.gitignore

@ -24,4 +24,7 @@ testing/cwtch.out.png.manifest
testing/tordir/
tokens-bak.db
tokens.db
tokens1.db
tokens1.db
arch/
testing/encryptedstorage/encrypted_storage_profiles
testing/encryptedstorage/tordir

41
peer/cwtch_peer.go

@ -138,11 +138,11 @@ func (cp *cwtchPeer) SendMessage(handle string, message string) error {
ev = event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupID: handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)})
} else if tor.IsValidHostname(handle) {
// We assume we are sending to a Contact.
contact, exists := cp.FetchConversationInfo(handle)
contact, err := cp.FetchConversationInfo(handle)
ev = event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: handle, event.Data: message})
// If the contact exists replace the event id wih the index of this message in the contacts timeline...
// Otherwise assume we don't log the message in the timeline...
if exists != nil {
if contact != nil && err == nil {
//ev.EventID = strconv.Itoa(contact.Timeline.Len())
cp.mutex.Lock()
defer cp.mutex.Unlock()
@ -326,44 +326,50 @@ func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessContr
// AcceptConversation looks up a conversation by `handle` and sets the Accepted status to `true`
// This will cause Cwtch to auto connect to this conversation on start up
func (cp *cwtchPeer) AcceptConversation(handle string) error {
func (cp *cwtchPeer) AcceptConversation(id int) error {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.storage.AcceptConversation(handle)
return cp.storage.AcceptConversation(id)
}
func (cp *cwtchPeer) FetchConversations() ([]*model.Conversation, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.storage.FetchConversations()
}
// FetchConversationInfo returns information about the given conversation referenced by the handle
func (cp *cwtchPeer) FetchConversationInfo(handle string) (*model.Conversation, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.storage.GetConversation(handle)
return cp.storage.GetConversationByHandle(handle)
}
// DeleteConversation purges all data about the conversation, including message timelines, referenced by the handle
func (cp *cwtchPeer) DeleteConversation(handle string) error {
func (cp *cwtchPeer) DeleteConversation(id int) error {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.storage.DeleteConversation(handle)
return cp.storage.DeleteConversation(id)
}
// SetConversationAttribute sets the conversation attribute at path to value
func (cp *cwtchPeer) SetConversationAttribute(handle string, path attr.ScopedZonedPath, value string) error {
func (cp *cwtchPeer) SetConversationAttribute(id int, path attr.ScopedZonedPath, value string) error {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.storage.SetConversationAttribute(handle, path, value)
return cp.storage.SetConversationAttribute(id, path, value)
}
// GetConversationAttribute is a shortcut method for retrieving the value of a given path
func (cp *cwtchPeer) GetConversationAttribute(handle string, path attr.ScopedZonedPath) (string, error) {
func (cp *cwtchPeer) GetConversationAttribute(id int, path attr.ScopedZonedPath) (string, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
ci, err := cp.storage.GetConversation(handle)
ci, err := cp.storage.GetConversation(id)
if err != nil {
return "", err
}
val, exists := ci.Attributes[path.ToString()]
if !exists {
return "", fmt.Errorf("%v does not exist for conversation %v", path.ToString(), handle)
return "", fmt.Errorf("%v does not exist for conversation %v", path.ToString(), id)
}
return val, nil
}
@ -521,8 +527,8 @@ func (cp *cwtchPeer) GetOnion() string {
func (cp *cwtchPeer) GetPeerState(onion string) (connections.ConnectionState, bool) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
if peer, ok := cp.Profile.Contacts[onion]; ok {
return connections.ConnectionStateToType()[peer.State], true
if state, ok := cp.state[onion]; ok {
return state, ok
}
return connections.DISCONNECTED, false
}
@ -699,7 +705,7 @@ func (cp *cwtchPeer) eventHandler() {
// The security of cwtch groups are also not dependent on the servers inability to uniquely tag connections (as long as
// it learns nothing else about each connection).
// store the base64 encoded signature for later use
cp.SetConversationAttribute(ev.Data[event.GroupServer], lastKnownSignature, ev.Data[event.Signature])
//cp.SetConversationAttribute(ev.Data[event.GroupServer], lastKnownSignature, ev.Data[event.Signature])
cp.mutex.Lock()
ok, groupID, message, index := cp.Profile.AttemptDecryption(ciphertext, signature)
@ -851,7 +857,10 @@ func (cp *cwtchPeer) eventHandler() {
// Allow public profile parameters to be added as peer specific attributes...
if attr.Scope(scope).IsPublic() && zone == attr.ProfileZone {
cp.SetConversationAttribute(onion, attr.Scope(scope).ConstructScopedZonedPath(zone.ConstructZonedPath(path)), val)
ci, err := cp.FetchConversationInfo(onion)
if ci != nil && err != nil {
cp.SetConversationAttribute(ci.ID, attr.Scope(scope).ConstructScopedZonedPath(zone.ConstructZonedPath(path)), val)
}
}
}
case event.PeerStateChange:

133
peer/cwtchprofilestorage.go

@ -34,11 +34,13 @@ type CwtchProfileStorage struct {
selectProfileKeyValueStmt *sql.Stmt
// Conversation related statements
insertConversationStmt *sql.Stmt
selectConversationStmt *sql.Stmt
acceptConversationStmt *sql.Stmt
deleteConversationStmt *sql.Stmt
setConversationAttributesStmt *sql.Stmt
insertConversationStmt *sql.Stmt
fetchAllConversationsStmt *sql.Stmt
selectConversationStmt *sql.Stmt
selectConversationByHandleStmt *sql.Stmt
acceptConversationStmt *sql.Stmt
deleteConversationStmt *sql.Stmt
setConversationAttributesStmt *sql.Stmt
channelInsertStmts map[ChannelID]*sql.Stmt
channelGetMessageStmts map[ChannelID]*sql.Stmt
@ -55,10 +57,12 @@ const insertProfileKeySQLStmt = `insert into profile_kv(KeyType, KeyName, KeyVal
const selectProfileKeySQLStmt = `select KeyValue from profile_kv where KeyType=(?) and KeyName=(?);`
const insertConversationSQLStmt = `insert into conversations(Handle, Attributes, ACL, Accepted) values(?,?,?,?);`
const selectConversationSQLStmt = `select ID, Handle, Attributes, ACL, Accepted from conversations where Handle=(?);`
const acceptedConversationSQLStmt = `update conversations set Accepted=true where Handle=(?);`
const setConversationAttributesSQLStmt = `update conversations set Attributes=(?) where Handle=(?) ;`
const deleteConversationSQLStmt = `delete from conversations where Handle=(?);`
const fetchAllConversationsSQLStmt = `select ID, Handle, Attributes, ACL, Accepted from conversations;`
const selectConversationSQLStmt = `select ID, Handle, Attributes, ACL, Accepted from conversations where ID=(?);`
const selectConversationByHandleSQLStmt = `select ID, Handle, Attributes, ACL, Accepted from conversations where Handle=(?);`
const acceptedConversationSQLStmt = `update conversations set Accepted=true where ID=(?);`
const setConversationAttributesSQLStmt = `update conversations set Attributes=(?) where ID=(?) ;`
const deleteConversationSQLStmt = `delete from conversations where ID=(?);`
// createTableConversationMessagesSQLStmt is a template for creating conversation based tables...
const createTableConversationMessagesSQLStmt = `create table if not exists channel_%d_0_chat (ID integer unique primary key autoincrement, Body text, Attributes []byte, Expiry datetime);`
@ -95,12 +99,24 @@ func NewCwtchProfileStorage(db *sql.DB) (*CwtchProfileStorage, error) {
return nil, err
}
fetchAllConversationsStmt, err := db.Prepare(fetchAllConversationsSQLStmt)
if err != nil {
log.Errorf("error preparing query: %v %v", fetchAllConversationsSQLStmt, err)
return nil, err
}
selectConversationStmt, err := db.Prepare(selectConversationSQLStmt)
if err != nil {
log.Errorf("error preparing query: %v %v", selectConversationSQLStmt, err)
return nil, err
}
selectConversationByHandleStmt, err := db.Prepare(selectConversationByHandleSQLStmt)
if err != nil {
log.Errorf("error preparing query: %v %v", selectConversationByHandleSQLStmt, err)
return nil, err
}
acceptConversationStmt, err := db.Prepare(acceptedConversationSQLStmt)
if err != nil {
log.Errorf("error preparing query: %v %v", acceptedConversationSQLStmt, err)
@ -119,7 +135,18 @@ func NewCwtchProfileStorage(db *sql.DB) (*CwtchProfileStorage, error) {
return nil, err
}
return &CwtchProfileStorage{db: db, insertProfileKeyValueStmt: insertProfileKeyValueStmt, selectProfileKeyValueStmt: selectProfileKeyStmt, insertConversationStmt: insertConversationStmt, selectConversationStmt: selectConversationStmt, acceptConversationStmt: acceptConversationStmt, deleteConversationStmt: deleteConversationStmt, setConversationAttributesStmt: setConversationAttributesStmt, channelInsertStmts: map[ChannelID]*sql.Stmt{}, channelGetMessageStmts: map[ChannelID]*sql.Stmt{}}, nil
return &CwtchProfileStorage{db: db,
insertProfileKeyValueStmt: insertProfileKeyValueStmt,
selectProfileKeyValueStmt: selectProfileKeyStmt,
fetchAllConversationsStmt: fetchAllConversationsStmt,
insertConversationStmt: insertConversationStmt,
selectConversationStmt: selectConversationStmt,
selectConversationByHandleStmt: selectConversationByHandleStmt,
acceptConversationStmt: acceptConversationStmt,
deleteConversationStmt: deleteConversationStmt,
setConversationAttributesStmt: setConversationAttributesStmt,
channelInsertStmts: map[ChannelID]*sql.Stmt{},
channelGetMessageStmts: map[ChannelID]*sql.Stmt{}}, nil
}
// StoreProfileKeyValue allows storing of typed Key/Value attribute in the Storage Engine
@ -187,9 +214,8 @@ func (cps *CwtchProfileStorage) NewConversation(handle string, attributes model.
return tx.Commit()
}
// GetConversation looks up a particular conversation by handle
func (cps *CwtchProfileStorage) GetConversation(handle string) (*model.Conversation, error) {
rows, err := cps.selectConversationStmt.Query(handle)
func (cps *CwtchProfileStorage) GetConversationByHandle(handle string) (*model.Conversation, error) {
rows, err := cps.selectConversationByHandleStmt.Query(handle)
if err != nil {
log.Errorf("error executing query: %v", err)
return nil, err
@ -202,11 +228,72 @@ func (cps *CwtchProfileStorage) GetConversation(handle string) (*model.Conversat
}
var id int
var rhandle string
var acl []byte
var attributes []byte
var accepted bool
err = rows.Scan(&id, &rhandle, &attributes, &acl, &accepted)
err = rows.Scan(&id, &handle, &attributes, &acl, &accepted)
if err != nil {
log.Errorf("error fetching rows: %v", err)
rows.Close()
return nil, err
}
rows.Close()
return &model.Conversation{ID: id, Handle: handle, ACL: model.DeserializeAccessControlList(acl), Attributes: model.DeserializeAttributes(attributes), Accepted: accepted}, nil
}
func (cps *CwtchProfileStorage) FetchConversations() ([]*model.Conversation, error) {
rows, err := cps.fetchAllConversationsStmt.Query()
if err != nil {
log.Errorf("error executing query: %v", err)
return nil, err
}
var conversations []*model.Conversation
defer rows.Close()
for {
result := rows.Next()
if !result {
return conversations, nil
}
var id int
var handle string
var acl []byte
var attributes []byte
var accepted bool
err = rows.Scan(&id, &handle, &attributes, &acl, &accepted)
if err != nil {
log.Errorf("error fetching rows: %v", err)
rows.Close()
return nil, err
}
conversations = append(conversations, &model.Conversation{ID: id, Handle: handle, ACL: model.DeserializeAccessControlList(acl), Attributes: model.DeserializeAttributes(attributes), Accepted: accepted})
}
}
// GetConversation looks up a particular conversation by handle
func (cps *CwtchProfileStorage) GetConversation(id int) (*model.Conversation, error) {
rows, err := cps.selectConversationStmt.Query(id)
if err != nil {
log.Errorf("error executing query: %v", err)
return nil, err
}
result := rows.Next()
if !result {
return nil, errors.New("no result found")
}
var handle string
var acl []byte
var attributes []byte
var accepted bool
err = rows.Scan(&id, &handle, &attributes, &acl, &accepted)
if err != nil {
log.Errorf("error fetching rows: %v", err)
rows.Close()
@ -214,12 +301,12 @@ func (cps *CwtchProfileStorage) GetConversation(handle string) (*model.Conversat
}
rows.Close()
return &model.Conversation{ID: id, Handle: rhandle, ACL: model.DeserializeAccessControlList(acl), Attributes: model.DeserializeAttributes(attributes), Accepted: accepted}, nil
return &model.Conversation{ID: id, Handle: handle, ACL: model.DeserializeAccessControlList(acl), Attributes: model.DeserializeAttributes(attributes), Accepted: accepted}, nil
}
// AcceptConversation sets the accepted status of a conversation to true in the backing datastore
func (cps *CwtchProfileStorage) AcceptConversation(handle string) error {
_, err := cps.acceptConversationStmt.Exec(handle)
func (cps *CwtchProfileStorage) AcceptConversation(id int) error {
_, err := cps.acceptConversationStmt.Exec(id)
if err != nil {
log.Errorf("error executing query: %v", err)
return err
@ -228,8 +315,8 @@ func (cps *CwtchProfileStorage) AcceptConversation(handle string) error {
}
// DeleteConversation purges the conversation and any associated message history from the conversation store.
func (cps *CwtchProfileStorage) DeleteConversation(handle string) error {
_, err := cps.deleteConversationStmt.Exec(handle)
func (cps *CwtchProfileStorage) DeleteConversation(id int) error {
_, err := cps.deleteConversationStmt.Exec(id)
if err != nil {
log.Errorf("error executing query: %v", err)
return err
@ -237,13 +324,13 @@ func (cps *CwtchProfileStorage) DeleteConversation(handle string) error {
return nil
}
func (cps *CwtchProfileStorage) SetConversationAttribute(handle string, path attr.ScopedZonedPath, value string) error {
ci, err := cps.GetConversation(handle)
func (cps *CwtchProfileStorage) SetConversationAttribute(id int, path attr.ScopedZonedPath, value string) error {
ci, err := cps.GetConversation(id)
if err != nil {
return err
}
ci.Attributes[path.ToString()] = value
_, err = cps.setConversationAttributesStmt.Exec(ci.Attributes.Serialize(), handle)
_, err = cps.setConversationAttributesStmt.Exec(ci.Attributes.Serialize(), id)
if err != nil {
log.Errorf("error executing query: %v", err)
return err

11
peer/profile_interface.go

@ -116,13 +116,14 @@ type CwtchPeer interface {
// New Unified Conversation Interfaces
NewContactConversation(handle string, acl model.AccessControl, accepted bool) error
FetchConversations() ([]*model.Conversation, error)
FetchConversationInfo(handle string) (*model.Conversation, error)
AcceptConversation(handle string) error
SetConversationAttribute(handle string, path attr.ScopedZonedPath, value string) error
GetConversationAttribute(handle string, path attr.ScopedZonedPath) (string, error)
DeleteConversation(handle string) error
AcceptConversation(conversation int) error
SetConversationAttribute(conversation int, path attr.ScopedZonedPath, value string) error
GetConversationAttribute(conversation int, path attr.ScopedZonedPath) (string, error)
DeleteConversation(conversation int) error
GetChannelMessage(converstion int, channel int, id int) (string, model.Attributes, error)
GetChannelMessage(conversation int, channel int, id int) (string, model.Attributes, error)
ShareFile(fileKey string, serializedManifest string)
}

15
testing/encryptedstorage/encrypted_storage_integration_test.go

@ -21,6 +21,8 @@ import (
func TestEncryptedStorage(t *testing.T) {
log.SetLevel(log.LevelDebug)
os.Mkdir("tordir", 0700)
dataDir := filepath.Join("tordir", "tor")
os.MkdirAll(dataDir, 0700)
@ -49,7 +51,7 @@ func TestEncryptedStorage(t *testing.T) {
fmt.Println("Creating Alice...")
log.SetLevel(log.LevelDebug)
defer acn.Close()
acn.WaitTillBootstrapped()
app := app2.NewApp(acn, cwtchDir)
@ -65,6 +67,11 @@ func TestEncryptedStorage(t *testing.T) {
// To keep this large test organized, we will break it down into sub tests...
subTestAliceAddAndDeleteBob(t, alice, bob)
conversations, err := alice.FetchConversations()
if err != nil || len(conversations) != 1 {
t.Fatalf("unexpected issue when fetching all of alices conversations. Expected 1 got : %v %v", conversations, err)
}
alice.PeerWithOnion(bob.GetOnion())
time.Sleep(time.Second * 30)
@ -72,7 +79,9 @@ func TestEncryptedStorage(t *testing.T) {
alice.SendMessage(bob.GetOnion(), "Hello Bob")
time.Sleep(time.Second * 30)
ci,_ := bob.FetchConversationInfo(alice.GetOnion())
ci, _ := bob.FetchConversationInfo(alice.GetOnion())
body, _, err := bob.GetChannelMessage(ci.ID, 0, 1)
if body != "Hello Bob" || err != nil {
t.Fatalf("unexpected message in conversation channel %v %v", body, err)
@ -100,7 +109,7 @@ func subTestAliceAddAndDeleteBob(t *testing.T, alice peer.CwtchPeer, bob peer.Cw
oldID := bobCI.ID
alice.DeleteConversation(bob.GetOnion())
alice.DeleteConversation(oldID)
// Test Basic Fetching
bobCI, err = alice.FetchConversationInfo(bob.GetOnion())

30
testing/filesharing/file_sharing_integration_test.go

@ -91,10 +91,10 @@ func TestFileSharing(t *testing.T) {
os.Mkdir(path.Join(cwtchDir, "testing"), 0700)
fmt.Println("Creating Alice...")
app.CreatePeer("alice", "asdfasdf")
app.CreateTaggedPeer("alice", "asdfasdf", "testing")
fmt.Println("Creating Bob...")
app.CreatePeer("bob", "asdfasdf")
app.CreateTaggedPeer("bob", "asdfasdf", "testing")
alice := utils.WaitGetPeer(app, "alice")
bob := utils.WaitGetPeer(app, "bob")
@ -111,7 +111,8 @@ func TestFileSharing(t *testing.T) {
t.Logf("** Waiting for Alice, Bob to connect with onion network... (%v)\n", waitTime)
time.Sleep(waitTime)
bob.AddContact("alice?", alice.GetOnion(), model.AuthApproved)
bob.NewContactConversation(alice.GetOnion(),model.DefaultP2PAccessControl(), true)
alice.NewContactConversation(bob.GetOnion(),model.DefaultP2PAccessControl(), true)
alice.PeerWithOnion(bob.GetOnion())
fmt.Println("Waiting for alice and Bob to peer...")
@ -130,23 +131,24 @@ func TestFileSharing(t *testing.T) {
// Wait for the messages to arrive...
time.Sleep(time.Second * 10)
for _, message := range bob.GetContact(alice.GetOnion()).Timeline.GetMessages() {
message,_,err := bob.GetChannelMessage(1,0, 1)
if err != nil {
t.Fatalf("could not find file sharing message: %v", err)
}
var messageWrapper model.MessageWrapper
json.Unmarshal([]byte(message.Message), &messageWrapper)
var messageWrapper model.MessageWrapper
json.Unmarshal([]byte(message), &messageWrapper)
if messageWrapper.Overlay == model.OverlayFileSharing {
var fileMessageOverlay filesharing.OverlayMessage
err := json.Unmarshal([]byte(messageWrapper.Data), &fileMessageOverlay)
if messageWrapper.Overlay == model.OverlayFileSharing {
var fileMessageOverlay filesharing.OverlayMessage
err := json.Unmarshal([]byte(messageWrapper.Data), &fileMessageOverlay)
if err == nil {
filesharingFunctionality.DownloadFile(bob, alice.GetOnion(), "cwtch.out.png", "cwtch.out.png.manifest", fmt.Sprintf("%s.%s", fileMessageOverlay.Hash, fileMessageOverlay.Nonce))
}
if err == nil {
filesharingFunctionality.DownloadFile(bob, alice.GetOnion(), "cwtch.out.png", "cwtch.out.png.manifest", fmt.Sprintf("%s.%s", fileMessageOverlay.Hash, fileMessageOverlay.Nonce))
}
fmt.Printf("Found message from Alice: %v", message.Message)
}
// Wait for the file downloaded event
ev := queueOracle.Next()
if ev.EventType != event.FileDownloaded {

Loading…
Cancel
Save