diff --git a/.gitignore b/.gitignore index 9308c6b..b074e0f 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,7 @@ testing/cwtch.out.png.manifest testing/tordir/ tokens-bak.db tokens.db -tokens1.db \ No newline at end of file +tokens1.db +arch/ +testing/encryptedstorage/encrypted_storage_profiles +testing/encryptedstorage/tordir \ No newline at end of file diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 47ee4c3..20e0d9d 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -138,11 +138,11 @@ func (cp *cwtchPeer) SendMessage(handle string, message string) error { ev = event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupID: handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)}) } else if tor.IsValidHostname(handle) { // We assume we are sending to a Contact. - contact, exists := cp.FetchConversationInfo(handle) + contact, err := cp.FetchConversationInfo(handle) ev = event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: handle, event.Data: message}) // If the contact exists replace the event id wih the index of this message in the contacts timeline... // Otherwise assume we don't log the message in the timeline... - if exists != nil { + if contact != nil && err == nil { //ev.EventID = strconv.Itoa(contact.Timeline.Len()) cp.mutex.Lock() defer cp.mutex.Unlock() @@ -326,44 +326,50 @@ func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessContr // AcceptConversation looks up a conversation by `handle` and sets the Accepted status to `true` // This will cause Cwtch to auto connect to this conversation on start up -func (cp *cwtchPeer) AcceptConversation(handle string) error { +func (cp *cwtchPeer) AcceptConversation(id int) error { cp.mutex.Lock() defer cp.mutex.Unlock() - return cp.storage.AcceptConversation(handle) + return cp.storage.AcceptConversation(id) +} + +func (cp *cwtchPeer) FetchConversations() ([]*model.Conversation, error) { + cp.mutex.Lock() + defer cp.mutex.Unlock() + return cp.storage.FetchConversations() } // FetchConversationInfo returns information about the given conversation referenced by the handle func (cp *cwtchPeer) FetchConversationInfo(handle string) (*model.Conversation, error) { cp.mutex.Lock() defer cp.mutex.Unlock() - return cp.storage.GetConversation(handle) + return cp.storage.GetConversationByHandle(handle) } // DeleteConversation purges all data about the conversation, including message timelines, referenced by the handle -func (cp *cwtchPeer) DeleteConversation(handle string) error { +func (cp *cwtchPeer) DeleteConversation(id int) error { cp.mutex.Lock() defer cp.mutex.Unlock() - return cp.storage.DeleteConversation(handle) + return cp.storage.DeleteConversation(id) } // SetConversationAttribute sets the conversation attribute at path to value -func (cp *cwtchPeer) SetConversationAttribute(handle string, path attr.ScopedZonedPath, value string) error { +func (cp *cwtchPeer) SetConversationAttribute(id int, path attr.ScopedZonedPath, value string) error { cp.mutex.Lock() defer cp.mutex.Unlock() - return cp.storage.SetConversationAttribute(handle, path, value) + return cp.storage.SetConversationAttribute(id, path, value) } // GetConversationAttribute is a shortcut method for retrieving the value of a given path -func (cp *cwtchPeer) GetConversationAttribute(handle string, path attr.ScopedZonedPath) (string, error) { +func (cp *cwtchPeer) GetConversationAttribute(id int, path attr.ScopedZonedPath) (string, error) { cp.mutex.Lock() defer cp.mutex.Unlock() - ci, err := cp.storage.GetConversation(handle) + ci, err := cp.storage.GetConversation(id) if err != nil { return "", err } val, exists := ci.Attributes[path.ToString()] if !exists { - return "", fmt.Errorf("%v does not exist for conversation %v", path.ToString(), handle) + return "", fmt.Errorf("%v does not exist for conversation %v", path.ToString(), id) } return val, nil } @@ -521,8 +527,8 @@ func (cp *cwtchPeer) GetOnion() string { func (cp *cwtchPeer) GetPeerState(onion string) (connections.ConnectionState, bool) { cp.mutex.Lock() defer cp.mutex.Unlock() - if peer, ok := cp.Profile.Contacts[onion]; ok { - return connections.ConnectionStateToType()[peer.State], true + if state, ok := cp.state[onion]; ok { + return state, ok } return connections.DISCONNECTED, false } @@ -699,7 +705,7 @@ func (cp *cwtchPeer) eventHandler() { // The security of cwtch groups are also not dependent on the servers inability to uniquely tag connections (as long as // it learns nothing else about each connection). // store the base64 encoded signature for later use - cp.SetConversationAttribute(ev.Data[event.GroupServer], lastKnownSignature, ev.Data[event.Signature]) + //cp.SetConversationAttribute(ev.Data[event.GroupServer], lastKnownSignature, ev.Data[event.Signature]) cp.mutex.Lock() ok, groupID, message, index := cp.Profile.AttemptDecryption(ciphertext, signature) @@ -851,7 +857,10 @@ func (cp *cwtchPeer) eventHandler() { // Allow public profile parameters to be added as peer specific attributes... if attr.Scope(scope).IsPublic() && zone == attr.ProfileZone { - cp.SetConversationAttribute(onion, attr.Scope(scope).ConstructScopedZonedPath(zone.ConstructZonedPath(path)), val) + ci, err := cp.FetchConversationInfo(onion) + if ci != nil && err != nil { + cp.SetConversationAttribute(ci.ID, attr.Scope(scope).ConstructScopedZonedPath(zone.ConstructZonedPath(path)), val) + } } } case event.PeerStateChange: diff --git a/peer/cwtchprofilestorage.go b/peer/cwtchprofilestorage.go index f2f96f0..d4e2f9d 100644 --- a/peer/cwtchprofilestorage.go +++ b/peer/cwtchprofilestorage.go @@ -34,11 +34,13 @@ type CwtchProfileStorage struct { selectProfileKeyValueStmt *sql.Stmt // Conversation related statements - insertConversationStmt *sql.Stmt - selectConversationStmt *sql.Stmt - acceptConversationStmt *sql.Stmt - deleteConversationStmt *sql.Stmt - setConversationAttributesStmt *sql.Stmt + insertConversationStmt *sql.Stmt + fetchAllConversationsStmt *sql.Stmt + selectConversationStmt *sql.Stmt + selectConversationByHandleStmt *sql.Stmt + acceptConversationStmt *sql.Stmt + deleteConversationStmt *sql.Stmt + setConversationAttributesStmt *sql.Stmt channelInsertStmts map[ChannelID]*sql.Stmt channelGetMessageStmts map[ChannelID]*sql.Stmt @@ -55,10 +57,12 @@ const insertProfileKeySQLStmt = `insert into profile_kv(KeyType, KeyName, KeyVal const selectProfileKeySQLStmt = `select KeyValue from profile_kv where KeyType=(?) and KeyName=(?);` const insertConversationSQLStmt = `insert into conversations(Handle, Attributes, ACL, Accepted) values(?,?,?,?);` -const selectConversationSQLStmt = `select ID, Handle, Attributes, ACL, Accepted from conversations where Handle=(?);` -const acceptedConversationSQLStmt = `update conversations set Accepted=true where Handle=(?);` -const setConversationAttributesSQLStmt = `update conversations set Attributes=(?) where Handle=(?) ;` -const deleteConversationSQLStmt = `delete from conversations where Handle=(?);` +const fetchAllConversationsSQLStmt = `select ID, Handle, Attributes, ACL, Accepted from conversations;` +const selectConversationSQLStmt = `select ID, Handle, Attributes, ACL, Accepted from conversations where ID=(?);` +const selectConversationByHandleSQLStmt = `select ID, Handle, Attributes, ACL, Accepted from conversations where Handle=(?);` +const acceptedConversationSQLStmt = `update conversations set Accepted=true where ID=(?);` +const setConversationAttributesSQLStmt = `update conversations set Attributes=(?) where ID=(?) ;` +const deleteConversationSQLStmt = `delete from conversations where ID=(?);` // createTableConversationMessagesSQLStmt is a template for creating conversation based tables... const createTableConversationMessagesSQLStmt = `create table if not exists channel_%d_0_chat (ID integer unique primary key autoincrement, Body text, Attributes []byte, Expiry datetime);` @@ -95,12 +99,24 @@ func NewCwtchProfileStorage(db *sql.DB) (*CwtchProfileStorage, error) { return nil, err } + fetchAllConversationsStmt, err := db.Prepare(fetchAllConversationsSQLStmt) + if err != nil { + log.Errorf("error preparing query: %v %v", fetchAllConversationsSQLStmt, err) + return nil, err + } + selectConversationStmt, err := db.Prepare(selectConversationSQLStmt) if err != nil { log.Errorf("error preparing query: %v %v", selectConversationSQLStmt, err) return nil, err } + selectConversationByHandleStmt, err := db.Prepare(selectConversationByHandleSQLStmt) + if err != nil { + log.Errorf("error preparing query: %v %v", selectConversationByHandleSQLStmt, err) + return nil, err + } + acceptConversationStmt, err := db.Prepare(acceptedConversationSQLStmt) if err != nil { log.Errorf("error preparing query: %v %v", acceptedConversationSQLStmt, err) @@ -119,7 +135,18 @@ func NewCwtchProfileStorage(db *sql.DB) (*CwtchProfileStorage, error) { return nil, err } - return &CwtchProfileStorage{db: db, insertProfileKeyValueStmt: insertProfileKeyValueStmt, selectProfileKeyValueStmt: selectProfileKeyStmt, insertConversationStmt: insertConversationStmt, selectConversationStmt: selectConversationStmt, acceptConversationStmt: acceptConversationStmt, deleteConversationStmt: deleteConversationStmt, setConversationAttributesStmt: setConversationAttributesStmt, channelInsertStmts: map[ChannelID]*sql.Stmt{}, channelGetMessageStmts: map[ChannelID]*sql.Stmt{}}, nil + return &CwtchProfileStorage{db: db, + insertProfileKeyValueStmt: insertProfileKeyValueStmt, + selectProfileKeyValueStmt: selectProfileKeyStmt, + fetchAllConversationsStmt: fetchAllConversationsStmt, + insertConversationStmt: insertConversationStmt, + selectConversationStmt: selectConversationStmt, + selectConversationByHandleStmt: selectConversationByHandleStmt, + acceptConversationStmt: acceptConversationStmt, + deleteConversationStmt: deleteConversationStmt, + setConversationAttributesStmt: setConversationAttributesStmt, + channelInsertStmts: map[ChannelID]*sql.Stmt{}, + channelGetMessageStmts: map[ChannelID]*sql.Stmt{}}, nil } // StoreProfileKeyValue allows storing of typed Key/Value attribute in the Storage Engine @@ -187,9 +214,8 @@ func (cps *CwtchProfileStorage) NewConversation(handle string, attributes model. return tx.Commit() } -// GetConversation looks up a particular conversation by handle -func (cps *CwtchProfileStorage) GetConversation(handle string) (*model.Conversation, error) { - rows, err := cps.selectConversationStmt.Query(handle) +func (cps *CwtchProfileStorage) GetConversationByHandle(handle string) (*model.Conversation, error) { + rows, err := cps.selectConversationByHandleStmt.Query(handle) if err != nil { log.Errorf("error executing query: %v", err) return nil, err @@ -202,11 +228,10 @@ func (cps *CwtchProfileStorage) GetConversation(handle string) (*model.Conversat } var id int - var rhandle string var acl []byte var attributes []byte var accepted bool - err = rows.Scan(&id, &rhandle, &attributes, &acl, &accepted) + err = rows.Scan(&id, &handle, &attributes, &acl, &accepted) if err != nil { log.Errorf("error fetching rows: %v", err) rows.Close() @@ -214,12 +239,74 @@ func (cps *CwtchProfileStorage) GetConversation(handle string) (*model.Conversat } rows.Close() - return &model.Conversation{ID: id, Handle: rhandle, ACL: model.DeserializeAccessControlList(acl), Attributes: model.DeserializeAttributes(attributes), Accepted: accepted}, nil + return &model.Conversation{ID: id, Handle: handle, ACL: model.DeserializeAccessControlList(acl), Attributes: model.DeserializeAttributes(attributes), Accepted: accepted}, nil +} + +func (cps *CwtchProfileStorage) FetchConversations() ([]*model.Conversation, error) { + rows, err := cps.fetchAllConversationsStmt.Query() + if err != nil { + log.Errorf("error executing query: %v", err) + return nil, err + } + + var conversations []*model.Conversation + + defer rows.Close() + for { + result := rows.Next() + + if !result { + return conversations, nil + } + + var id int + var handle string + var acl []byte + var attributes []byte + var accepted bool + err = rows.Scan(&id, &handle, &attributes, &acl, &accepted) + if err != nil { + log.Errorf("error fetching rows: %v", err) + rows.Close() + return nil, err + } + conversations = append(conversations, &model.Conversation{ID: id, Handle: handle, ACL: model.DeserializeAccessControlList(acl), Attributes: model.DeserializeAttributes(attributes), Accepted: accepted}) + + } +} + +// GetConversation looks up a particular conversation by handle +func (cps *CwtchProfileStorage) GetConversation(id int) (*model.Conversation, error) { + rows, err := cps.selectConversationStmt.Query(id) + if err != nil { + log.Errorf("error executing query: %v", err) + return nil, err + } + + result := rows.Next() + + if !result { + return nil, errors.New("no result found") + } + + var handle string + var acl []byte + var attributes []byte + var accepted bool + err = rows.Scan(&id, &handle, &attributes, &acl, &accepted) + if err != nil { + log.Errorf("error fetching rows: %v", err) + rows.Close() + return nil, err + } + rows.Close() + + return &model.Conversation{ID: id, Handle: handle, ACL: model.DeserializeAccessControlList(acl), Attributes: model.DeserializeAttributes(attributes), Accepted: accepted}, nil } // AcceptConversation sets the accepted status of a conversation to true in the backing datastore -func (cps *CwtchProfileStorage) AcceptConversation(handle string) error { - _, err := cps.acceptConversationStmt.Exec(handle) +func (cps *CwtchProfileStorage) AcceptConversation(id int) error { + _, err := cps.acceptConversationStmt.Exec(id) if err != nil { log.Errorf("error executing query: %v", err) return err @@ -228,8 +315,8 @@ func (cps *CwtchProfileStorage) AcceptConversation(handle string) error { } // DeleteConversation purges the conversation and any associated message history from the conversation store. -func (cps *CwtchProfileStorage) DeleteConversation(handle string) error { - _, err := cps.deleteConversationStmt.Exec(handle) +func (cps *CwtchProfileStorage) DeleteConversation(id int) error { + _, err := cps.deleteConversationStmt.Exec(id) if err != nil { log.Errorf("error executing query: %v", err) return err @@ -237,13 +324,13 @@ func (cps *CwtchProfileStorage) DeleteConversation(handle string) error { return nil } -func (cps *CwtchProfileStorage) SetConversationAttribute(handle string, path attr.ScopedZonedPath, value string) error { - ci, err := cps.GetConversation(handle) +func (cps *CwtchProfileStorage) SetConversationAttribute(id int, path attr.ScopedZonedPath, value string) error { + ci, err := cps.GetConversation(id) if err != nil { return err } ci.Attributes[path.ToString()] = value - _, err = cps.setConversationAttributesStmt.Exec(ci.Attributes.Serialize(), handle) + _, err = cps.setConversationAttributesStmt.Exec(ci.Attributes.Serialize(), id) if err != nil { log.Errorf("error executing query: %v", err) return err diff --git a/peer/profile_interface.go b/peer/profile_interface.go index 46634e9..2d472ad 100644 --- a/peer/profile_interface.go +++ b/peer/profile_interface.go @@ -116,13 +116,14 @@ type CwtchPeer interface { // New Unified Conversation Interfaces NewContactConversation(handle string, acl model.AccessControl, accepted bool) error + FetchConversations() ([]*model.Conversation, error) FetchConversationInfo(handle string) (*model.Conversation, error) - AcceptConversation(handle string) error - SetConversationAttribute(handle string, path attr.ScopedZonedPath, value string) error - GetConversationAttribute(handle string, path attr.ScopedZonedPath) (string, error) - DeleteConversation(handle string) error + AcceptConversation(conversation int) error + SetConversationAttribute(conversation int, path attr.ScopedZonedPath, value string) error + GetConversationAttribute(conversation int, path attr.ScopedZonedPath) (string, error) + DeleteConversation(conversation int) error - GetChannelMessage(converstion int, channel int, id int) (string, model.Attributes, error) + GetChannelMessage(conversation int, channel int, id int) (string, model.Attributes, error) ShareFile(fileKey string, serializedManifest string) } diff --git a/testing/encryptedstorage/encrypted_storage_integration_test.go b/testing/encryptedstorage/encrypted_storage_integration_test.go index 47251f4..0bb37f0 100644 --- a/testing/encryptedstorage/encrypted_storage_integration_test.go +++ b/testing/encryptedstorage/encrypted_storage_integration_test.go @@ -21,6 +21,8 @@ import ( func TestEncryptedStorage(t *testing.T) { + log.SetLevel(log.LevelDebug) + os.Mkdir("tordir", 0700) dataDir := filepath.Join("tordir", "tor") os.MkdirAll(dataDir, 0700) @@ -49,7 +51,7 @@ func TestEncryptedStorage(t *testing.T) { fmt.Println("Creating Alice...") - log.SetLevel(log.LevelDebug) + defer acn.Close() acn.WaitTillBootstrapped() app := app2.NewApp(acn, cwtchDir) @@ -65,6 +67,11 @@ func TestEncryptedStorage(t *testing.T) { // To keep this large test organized, we will break it down into sub tests... subTestAliceAddAndDeleteBob(t, alice, bob) + conversations, err := alice.FetchConversations() + if err != nil || len(conversations) != 1 { + t.Fatalf("unexpected issue when fetching all of alices conversations. Expected 1 got : %v %v", conversations, err) + } + alice.PeerWithOnion(bob.GetOnion()) time.Sleep(time.Second * 30) @@ -72,7 +79,9 @@ func TestEncryptedStorage(t *testing.T) { alice.SendMessage(bob.GetOnion(), "Hello Bob") time.Sleep(time.Second * 30) - ci,_ := bob.FetchConversationInfo(alice.GetOnion()) + + + ci, _ := bob.FetchConversationInfo(alice.GetOnion()) body, _, err := bob.GetChannelMessage(ci.ID, 0, 1) if body != "Hello Bob" || err != nil { t.Fatalf("unexpected message in conversation channel %v %v", body, err) @@ -100,7 +109,7 @@ func subTestAliceAddAndDeleteBob(t *testing.T, alice peer.CwtchPeer, bob peer.Cw oldID := bobCI.ID - alice.DeleteConversation(bob.GetOnion()) + alice.DeleteConversation(oldID) // Test Basic Fetching bobCI, err = alice.FetchConversationInfo(bob.GetOnion()) diff --git a/testing/filesharing/file_sharing_integration_test.go b/testing/filesharing/file_sharing_integration_test.go index 19ef6f4..e7de8d4 100644 --- a/testing/filesharing/file_sharing_integration_test.go +++ b/testing/filesharing/file_sharing_integration_test.go @@ -91,10 +91,10 @@ func TestFileSharing(t *testing.T) { os.Mkdir(path.Join(cwtchDir, "testing"), 0700) fmt.Println("Creating Alice...") - app.CreatePeer("alice", "asdfasdf") + app.CreateTaggedPeer("alice", "asdfasdf", "testing") fmt.Println("Creating Bob...") - app.CreatePeer("bob", "asdfasdf") + app.CreateTaggedPeer("bob", "asdfasdf", "testing") alice := utils.WaitGetPeer(app, "alice") bob := utils.WaitGetPeer(app, "bob") @@ -111,7 +111,8 @@ func TestFileSharing(t *testing.T) { t.Logf("** Waiting for Alice, Bob to connect with onion network... (%v)\n", waitTime) time.Sleep(waitTime) - bob.AddContact("alice?", alice.GetOnion(), model.AuthApproved) + bob.NewContactConversation(alice.GetOnion(),model.DefaultP2PAccessControl(), true) + alice.NewContactConversation(bob.GetOnion(),model.DefaultP2PAccessControl(), true) alice.PeerWithOnion(bob.GetOnion()) fmt.Println("Waiting for alice and Bob to peer...") @@ -130,23 +131,24 @@ func TestFileSharing(t *testing.T) { // Wait for the messages to arrive... time.Sleep(time.Second * 10) - for _, message := range bob.GetContact(alice.GetOnion()).Timeline.GetMessages() { + message,_,err := bob.GetChannelMessage(1,0, 1) + if err != nil { + t.Fatalf("could not find file sharing message: %v", err) + } - var messageWrapper model.MessageWrapper - json.Unmarshal([]byte(message.Message), &messageWrapper) + var messageWrapper model.MessageWrapper + json.Unmarshal([]byte(message), &messageWrapper) - if messageWrapper.Overlay == model.OverlayFileSharing { - var fileMessageOverlay filesharing.OverlayMessage - err := json.Unmarshal([]byte(messageWrapper.Data), &fileMessageOverlay) + if messageWrapper.Overlay == model.OverlayFileSharing { + var fileMessageOverlay filesharing.OverlayMessage + err := json.Unmarshal([]byte(messageWrapper.Data), &fileMessageOverlay) - if err == nil { - filesharingFunctionality.DownloadFile(bob, alice.GetOnion(), "cwtch.out.png", "cwtch.out.png.manifest", fmt.Sprintf("%s.%s", fileMessageOverlay.Hash, fileMessageOverlay.Nonce)) - } + if err == nil { + filesharingFunctionality.DownloadFile(bob, alice.GetOnion(), "cwtch.out.png", "cwtch.out.png.manifest", fmt.Sprintf("%s.%s", fileMessageOverlay.Hash, fileMessageOverlay.Nonce)) } - - fmt.Printf("Found message from Alice: %v", message.Message) } + // Wait for the file downloaded event ev := queueOracle.Next() if ev.EventType != event.FileDownloaded {