New Storage Refactor #404

Merged
sarah merged 33 commits from p2p-interim-new-storage into master 2021-11-25 23:56:51 +00:00
15 changed files with 48 additions and 54 deletions
Showing only changes of commit 0614d31366 - Show all commits

View File

@ -178,6 +178,9 @@ func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProf
legacyProfile := profileStore.GetProfileCopy(timeline) legacyProfile := profileStore.GetProfileCopy(timeline)
cps, err := peer.CreateEncryptedStore(profileDirectory, password) cps, err := peer.CreateEncryptedStore(profileDirectory, password)
if err != nil {
log.Errorf("error creating encrypted store: %v", err)
}
profile := peer.ImportLegacyProfile(legacyProfile, cps) profile := peer.ImportLegacyProfile(legacyProfile, cps)
loadProfileFn(profile) loadProfileFn(profile)
} }

View File

@ -23,7 +23,7 @@ type Functionality struct {
// FunctionalityGate returns contact.Functionality always // FunctionalityGate returns contact.Functionality always
func FunctionalityGate(experimentMap map[string]bool) (*Functionality, error) { func FunctionalityGate(experimentMap map[string]bool) (*Functionality, error) {
if experimentMap["filesharing"] == true { if experimentMap["filesharing"] {
return new(Functionality), nil return new(Functionality), nil
} }
return nil, errors.New("filesharing is not enabled") return nil, errors.New("filesharing is not enabled")

View File

@ -44,7 +44,7 @@ func (a *Attributes) Serialize() []byte {
return data return data
} }
// DeserializeAttributes convers a JSON struct into an Attributes map // DeserializeAttributes converts a JSON struct into an Attributes map
func DeserializeAttributes(data []byte) Attributes { func DeserializeAttributes(data []byte) Attributes {
var attributes Attributes var attributes Attributes
json.Unmarshal(data, &attributes) json.Unmarshal(data, &attributes)
@ -62,6 +62,7 @@ type Conversation struct {
Accepted bool Accepted bool
} }
// GetAttribute is a helper function that fetches a conversation attribute by scope, zone and key
func (ci *Conversation) GetAttribute(scope attr.Scope, zone attr.Zone, key string) (string, bool) { func (ci *Conversation) GetAttribute(scope attr.Scope, zone attr.Zone, key string) (string, bool) {
if value, exists := ci.Attributes[scope.ConstructScopedZonedPath(zone.ConstructZonedPath(key)).ToString()]; exists { if value, exists := ci.Attributes[scope.ConstructScopedZonedPath(zone.ConstructZonedPath(key)).ToString()]; exists {
sarah marked this conversation as resolved
Review

this syntax is a little painful

this syntax is a little painful
return value, true return value, true
@ -69,6 +70,7 @@ func (ci *Conversation) GetAttribute(scope attr.Scope, zone attr.Zone, key strin
return "", false return "", false
} }
// IsGroup is a helper attribute that identifies whether a conversation is a legacy group
func (ci *Conversation) IsGroup() bool { func (ci *Conversation) IsGroup() bool {
if _, exists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)).ToString()]; exists { if _, exists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)).ToString()]; exists {
return true return true
@ -76,6 +78,7 @@ func (ci *Conversation) IsGroup() bool {
return false return false
sarah marked this conversation as resolved
Review

....if true return true, else return false....

....if true return true, else return false....
Review

I agree, but current linting in quality.sh does not.

I agree, but current linting in quality.sh does not.
} }
// IsServer is a helper attribute that identifies whether a conversation is with a server
func (ci *Conversation) IsServer() bool { func (ci *Conversation) IsServer() bool {
if _, exists := ci.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(BundleType))).ToString()]; exists { if _, exists := ci.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(BundleType))).ToString()]; exists {
return true return true
@ -83,6 +86,7 @@ func (ci *Conversation) IsServer() bool {
return false return false
sarah marked this conversation as resolved
Review

"

"
} }
// ConversationMessage bundles an instance of a conversation message row
type ConversationMessage struct { type ConversationMessage struct {
ID int ID int
Body string Body string

View File

@ -151,7 +151,7 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) error {
// We assume we are sending to a Contact. // We assume we are sending to a Contact.
conversationInfo, err := cp.storage.GetConversation(conversation) conversationInfo, err := cp.storage.GetConversation(conversation)
// If the contact exists replace the event id wih the index of this message in the contacts timeline... // If the contact exists replace the event id with the index of this message in the contacts timeline...
// Otherwise assume we don't log the message in the timeline... // Otherwise assume we don't log the message in the timeline...
if conversationInfo != nil && err == nil { if conversationInfo != nil && err == nil {
@ -854,25 +854,27 @@ func (cp *cwtchPeer) Listen() {
// StartPeersConnections attempts to connect to peer connections // StartPeersConnections attempts to connect to peer connections
// Status: Ready for 1.5 // Status: Ready for 1.5
func (cp *cwtchPeer) StartPeersConnections() { func (cp *cwtchPeer) StartPeersConnections() {
//for _, contact := range cp.GetContacts() { conversations, _ := cp.FetchConversations()
// if !cp.GetContact(contact).IsServer() { for _, conversation := range conversations {
// cp.PeerWithOnion(contact) if conversation.Accepted && !conversation.IsGroup() && !conversation.IsServer() {
// } cp.PeerWithOnion(conversation.Handle)
//} }
}
} }
// StartServerConnections attempts to connect to all server connections // StartServerConnections attempts to connect to all server connections
// Status: Ready for 1.5 // Status: Ready for 1.5
func (cp *cwtchPeer) StartServerConnections() { func (cp *cwtchPeer) StartServerConnections() {
//for _, contact := range cp.GetContacts() { conversations, _ := cp.FetchConversations()
// if cp.GetContact(contact).IsServer() { for _, conversation := range conversations {
// err := cp.JoinServer(contact) if conversation.IsServer() {
// if err != nil { err := cp.JoinServer(conversation.Handle)
// // Almost certainly a programming error so print it.. if err != nil {
// log.Errorf("error joining server %v", err) // Almost certainly a programming error so print it..
// } log.Errorf("error joining server %v", err)
// } }
//} }
}
} }
// Shutdown kills all connections and cleans up all goroutines for the peer // Shutdown kills all connections and cleans up all goroutines for the peer
@ -889,7 +891,7 @@ func (cp *cwtchPeer) Shutdown() {
// Status: TODO // Status: TODO
func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time) error { func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time) error {
// TOOD maybe atomize this? // TODO maybe atomize this?
ci, err := cp.FetchConversationInfo(handle) ci, err := cp.FetchConversationInfo(handle)
if err != nil { if err != nil {
id, err := cp.NewContactConversation(handle, model.DefaultP2PAccessControl(), false) id, err := cp.NewContactConversation(handle, model.DefaultP2PAccessControl(), false)
@ -943,7 +945,7 @@ func (cp *cwtchPeer) eventHandler() {
conversations, err := cp.FetchConversations() conversations, err := cp.FetchConversations()
if err == nil { if err == nil {
for _, conversationInfo := range conversations { for _, conversationInfo := range conversations {
if tor.IsValidHostname(conversationInfo.Handle) == false { if !tor.IsValidHostname(conversationInfo.Handle) {
group, err := cp.constructGroupFromConversation(conversationInfo) group, err := cp.constructGroupFromConversation(conversationInfo)
if err == nil { if err == nil {
success, dgm := group.AttemptDecryption(ciphertext, signature) success, dgm := group.AttemptDecryption(ciphertext, signature)

View File

@ -83,17 +83,17 @@ const updateMessageIntoConversationSQLStmt = `update channel_%d_%d_chat set Attr
// getMessageFromConversationSQLStmt is a template for fetching a message by ID from a conversation // getMessageFromConversationSQLStmt is a template for fetching a message by ID from a conversation
const getMessageFromConversationSQLStmt = `select Body, Attributes from channel_%d_%d_chat where ID=(?);` const getMessageFromConversationSQLStmt = `select Body, Attributes from channel_%d_%d_chat where ID=(?);`
// getMessageBySignatureFromConversationSQLStmt is a template for creating conversation based tables... // getMessageBySignatureFromConversationSQLStmt is a template for selecting conversation messages by signature
const getMessageBySignatureFromConversationSQLStmt = `select ID from channel_%d_%d_chat where Signature=(?);` const getMessageBySignatureFromConversationSQLStmt = `select ID from channel_%d_%d_chat where Signature=(?);`
// getMessageByContentHashFromConversationSQLStmt is a template for creating conversation based tables... // getMessageByContentHashFromConversationSQLStmt is a template for selecting conversation messages by content hash
const getMessageByContentHashFromConversationSQLStmt = `select ID from channel_%d_%d_chat where ContentHash=(?);` const getMessageByContentHashFromConversationSQLStmt = `select ID from channel_%d_%d_chat where ContentHash=(?);`
// getMessageCountFromConversationSqlStmt // getMessageCountFromConversationSQLStmt is a template for fetching the count of a messages in a conversation channel
const getMessageCountFromConversationSqlStmt = `select count(*) from channel_%d_%d_chat;` const getMessageCountFromConversationSQLStmt = `select count(*) from channel_%d_%d_chat;`
// getMostRecentMessagesFromSqlStmt // getMostRecentMessagesSQLStmt is a template for fetching the most recent N messages in a conversation channel
const getMostRecentMessagesSqlStmt = `select ID, Body, Attributes, Signature, ContentHash from channel_%d_%d_chat order by ID desc limit (?) offset (?);` const getMostRecentMessagesSQLStmt = `select ID, Body, Attributes, Signature, ContentHash from channel_%d_%d_chat order by ID desc limit (?) offset (?);`
// NewCwtchProfileStorage constructs a new CwtchProfileStorage from a database. It is also responsible for // NewCwtchProfileStorage constructs a new CwtchProfileStorage from a database. It is also responsible for
// Preparing commonly used SQL Statements // Preparing commonly used SQL Statements
@ -536,7 +536,7 @@ func (cps *CwtchProfileStorage) GetChannelMessageCount(conversation int, channel
_, exists := cps.channelGetCountStmts[channelID] _, exists := cps.channelGetCountStmts[channelID]
if !exists { if !exists {
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMessageCountFromConversationSqlStmt, conversation, channel)) conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMessageCountFromConversationSQLStmt, conversation, channel))
if err != nil { if err != nil {
log.Errorf("error executing transaction: %v", err) log.Errorf("error executing transaction: %v", err)
return -1, err return -1, err
@ -553,13 +553,13 @@ func (cps *CwtchProfileStorage) GetChannelMessageCount(conversation int, channel
return count, nil return count, nil
} }
// GetChannelMessageCount returns the number of messages in a channel // GetMostRecentMessages returns the most recent messages in a channel up to a given limit at a given offset
func (cps *CwtchProfileStorage) GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) { func (cps *CwtchProfileStorage) GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) {
channelID := ChannelID{Conversation: conversation, Channel: channel} channelID := ChannelID{Conversation: conversation, Channel: channel}
_, exists := cps.channelGetMostRecentMessagesStmts[channelID] _, exists := cps.channelGetMostRecentMessagesStmts[channelID]
if !exists { if !exists {
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMostRecentMessagesSqlStmt, conversation, channel)) conversationStmt, err := cps.db.Prepare(fmt.Sprintf(getMostRecentMessagesSQLStmt, conversation, channel))
if err != nil { if err != nil {
log.Errorf("error executing transaction: %v", err) log.Errorf("error executing transaction: %v", err)
return nil, err return nil, err

View File

@ -140,7 +140,7 @@ func (e *engine) eventHandler() {
case event.InvitePeerToGroup: case event.InvitePeerToGroup:
err := e.sendPeerMessage(ev.Data[event.RemotePeer], pmodel.PeerMessage{ID: ev.EventID, Context: event.ContextInvite, Data: []byte(ev.Data[event.GroupInvite])}) err := e.sendPeerMessage(ev.Data[event.RemotePeer], pmodel.PeerMessage{ID: ev.EventID, Context: event.ContextInvite, Data: []byte(ev.Data[event.GroupInvite])})
if err != nil { if err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
} }
case event.JoinServer: case event.JoinServer:
signature, err := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) signature, err := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
@ -403,13 +403,6 @@ func (e *engine) serverConnecting(onion string) {
})) }))
} }
func (e *engine) serverConnected(onion string) {
e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{
event.GroupServer: onion,
event.ConnectionState: ConnectionStateName[CONNECTED],
}))
}
func (e *engine) serverAuthed(onion string) { func (e *engine) serverAuthed(onion string) {
e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{ e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{
event.GroupServer: onion, event.GroupServer: onion,

View File

@ -185,7 +185,7 @@ func (ta *TokenBoardClient) MakePayment() error {
log.Debugf("Waiting for successful PoW Auth...") log.Debugf("Waiting for successful PoW Auth...")
connected, err := client.Connect(ta.tokenServiceOnion, powTokenApp) connected, err := client.Connect(ta.tokenServiceOnion, powTokenApp)
if connected == true && err == nil { if connected && err == nil {
log.Debugf("Waiting for successful Token Acquisition...") log.Debugf("Waiting for successful Token Acquisition...")
conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability) conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability)
if err == nil { if err == nil {

View File

@ -201,7 +201,7 @@ func (m *Manifest) StoreChunk(id uint64, contents []byte) (uint64, error) {
// Write the contents of the chunk to the file // Write the contents of the chunk to the file
_, err = m.openFd.Write(contents) _, err = m.openFd.Write(contents)
if err == nil && m.chunkComplete[id] == false { if err == nil && !m.chunkComplete[id] {
m.chunkComplete[id] = true m.chunkComplete[id] = true
m.progress++ m.progress++
} }

View File

@ -29,16 +29,16 @@ func TestManifest(t *testing.T) {
t.Logf("%v", manifest) t.Logf("%v", manifest)
// Try to tread the chunk // Try to tread the chunk
contents, err := manifest.GetChunkBytes(1) _, err = manifest.GetChunkBytes(1)
if err == nil { if err == nil {
t.Fatalf("chunk fetch should have thrown an error") t.Fatalf("chunk fetch should have thrown an error")
} }
contents, err = manifest.GetChunkBytes(0) _, err = manifest.GetChunkBytes(0)
if err != nil { if err != nil {
t.Fatalf("chunk fetch error: %v", err) t.Fatalf("chunk fetch error: %v", err)
} }
contents, err = manifest.GetChunkBytes(0) _, err = manifest.GetChunkBytes(0)
if err != nil { if err != nil {
t.Fatalf("chunk fetch error: %v", err) t.Fatalf("chunk fetch error: %v", err)
} }
@ -46,7 +46,6 @@ func TestManifest(t *testing.T) {
json, _ := json.Marshal(manifest) json, _ := json.Marshal(manifest)
t.Logf("%s", json) t.Logf("%s", json)
t.Logf("%s", contents)
} }
func TestManifestLarge(t *testing.T) { func TestManifestLarge(t *testing.T) {

View File

@ -6,10 +6,6 @@ import (
"cwtch.im/cwtch/storage/v1" "cwtch.im/cwtch/storage/v1"
) )
const profileFilename = "profile"
const versionFile = "VERSION"
const currentVersion = 1
// ProfileStore is an interface to managing the storage of Cwtch Profiles // ProfileStore is an interface to managing the storage of Cwtch Profiles
type ProfileStore interface { type ProfileStore interface {
GetProfileCopy(timeline bool) *model.Profile GetProfileCopy(timeline bool) *model.Profile

View File

@ -56,7 +56,7 @@ func DecryptFile(ciphertext []byte, key [32]byte) ([]byte, error) {
if ok { if ok {
return decrypted, nil return decrypted, nil
} }
return nil, errors.New("Failed to decrypt") return nil, errors.New("failed to decrypt")
} }
// ReadEncryptedFile reads data from an encrypted file in directory with key // ReadEncryptedFile reads data from an encrypted file in directory with key

View File

@ -50,9 +50,7 @@ func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target co
fmt.Printf("peer %v %v CONNECTED to %v\n", peerName, peer.GetOnion(), addr) fmt.Printf("peer %v %v CONNECTED to %v\n", peerName, peer.GetOnion(), addr)
break break
} }
time.Sleep(time.Second * 2)
} }
return
} }
func TestCwtchPeerIntegration(t *testing.T) { func TestCwtchPeerIntegration(t *testing.T) {

View File

@ -81,7 +81,7 @@ func TestEncryptedStorage(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("alice should have been able to fetch her own message") t.Fatalf("alice should have been able to fetch her own message")
} }
_, attr, err := alice.GetChannelMessage(2, 0, 1) _, attr, _ := alice.GetChannelMessage(2, 0, 1)
if attr[constants.AttrAck] != "false" { if attr[constants.AttrAck] != "false" {
t.Fatalf("Alices message should have been acknowledged...yet") t.Fatalf("Alices message should have been acknowledged...yet")
} }

View File

@ -48,7 +48,6 @@ func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.Cw
break break
} }
} }
return
} }
func TestFileSharing(t *testing.T) { func TestFileSharing(t *testing.T) {
@ -156,7 +155,7 @@ func TestFileSharing(t *testing.T) {
t.Fatalf("Expected file download event") t.Fatalf("Expected file download event")
} }
manifest, err := files.CreateManifest("cwtch.out.png") manifest, _ := files.CreateManifest("cwtch.out.png")
if hex.EncodeToString(manifest.RootHash) != "8f0ed73bbb30db45b6a740b1251cae02945f48e4f991464d5f3607685c45dcd136a325dab2e5f6429ce2b715e602b20b5b16bf7438fb6235fefe912adcedb5fd" { if hex.EncodeToString(manifest.RootHash) != "8f0ed73bbb30db45b6a740b1251cae02945f48e4f991464d5f3607685c45dcd136a325dab2e5f6429ce2b715e602b20b5b16bf7438fb6235fefe912adcedb5fd" {
t.Fatalf("file hash does not match expected %x: ", manifest.RootHash) t.Fatalf("file hash does not match expected %x: ", manifest.RootHash)
} }

View File

@ -9,7 +9,7 @@ go list ./... | xargs go vet
echo "" echo ""
echo "Linting:" echo "Linting:"
go list ./... | xargs golint staticcheck ./...
echo "Time to format" echo "Time to format"
@ -21,4 +21,4 @@ ineffassign .
# misspell (https://github.com/client9/misspell/cmd/misspell) # misspell (https://github.com/client9/misspell/cmd/misspell)
echo "Checking for misspelled words..." echo "Checking for misspelled words..."
misspell . | grep -v "vendor/" | grep -v "go.sum" | grep -v ".idea" misspell . | grep -v "testing/" | grep -v "vendor/" | grep -v "go.sum" | grep -v ".idea"