From 62d24978436452fbb64c1c0cffd10efdb90a2d68 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Wed, 10 Nov 2021 16:41:43 -0800 Subject: [PATCH] Purging old Profile / Storage Code - Start of Group Integration --- app/app.go | 5 +- .../filesharing/filesharing_functionality.go | 4 +- model/attr/zone.go | 10 + model/constants/attributes.go | 12 + model/conversation.go | 5 + model/group.go | 181 +++--- model/group_test.go | 32 +- model/message_test.go | 127 ---- model/profile.go | 471 +++++---------- model/profile_test.go | 136 ----- peer/cwtch_peer.go | 546 +++++++++--------- peer/cwtchprofilestorage.go | 38 +- peer/profile_interface.go | 32 +- storage/profile_store.go | 49 -- storage/profile_store_test.go | 76 --- storage/v0/file_enc.go | 70 --- storage/v0/file_store.go | 46 -- storage/v0/profile_store.go | 120 ---- storage/v0/profile_store_test.go | 70 --- storage/v0/stream_store.go | 145 ----- storage/v0/stream_store_test.go | 50 -- storage/v1/profile_store.go | 420 +------------- storage/v1/profile_store_test.go | 159 ----- testing/cwtch_peer_server_integration_test.go | 32 +- .../encrypted_storage_integration_test.go | 5 +- .../file_sharing_integration_test.go | 17 +- 26 files changed, 625 insertions(+), 2233 deletions(-) delete mode 100644 model/message_test.go delete mode 100644 model/profile_test.go delete mode 100644 storage/profile_store_test.go delete mode 100644 storage/v0/file_enc.go delete mode 100644 storage/v0/file_store.go delete mode 100644 storage/v0/profile_store.go delete mode 100644 storage/v0/profile_store_test.go delete mode 100644 storage/v0/stream_store.go delete mode 100644 storage/v0/stream_store_test.go delete mode 100644 storage/v1/profile_store_test.go diff --git a/app/app.go b/app/app.go index d9a569e..c095ee1 100644 --- a/app/app.go +++ b/app/app.go @@ -107,7 +107,7 @@ func (app *application) CreateTaggedPeer(name string, password string, tag strin app.eventBuses[profile.GetOnion()] = eventBus profile.Init(app.eventBuses[profile.GetOnion()]) app.peers[profile.GetOnion()] = profile - app.engines[profile.GetOnion()] = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) + app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) if tag != "" { profile.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag) @@ -180,7 +180,6 @@ func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProf _, exists := ac.eventBuses[profile.Onion] if exists { - profileStore.Shutdown() eventBus.Shutdown() log.Errorf("profile for onion %v already exists", profile.Onion) continue @@ -204,7 +203,7 @@ func (app *application) LoadProfiles(password string) { profile.Init(app.eventBuses[profile.GetOnion()]) app.appmutex.Lock() app.peers[profile.GetOnion()] = profile - app.engines[profile.GetOnion()] = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) + app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) app.appmutex.Unlock() app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False})) count++ diff --git a/functionality/filesharing/filesharing_functionality.go b/functionality/filesharing/filesharing_functionality.go index ab4daf2..8b06940 100644 --- a/functionality/filesharing/filesharing_functionality.go +++ b/functionality/filesharing/filesharing_functionality.go @@ -54,7 +54,7 @@ func (f *Functionality) DownloadFile(profile peer.CwtchPeer, handle string, down // ShareFile given a profile and a conversation handle, sets up a file sharing process to share the file // at filepath -func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer, handle string) error { +func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer, conversationID int) error { manifest, err := files.CreateManifest(filepath) if err != nil { return err @@ -93,7 +93,7 @@ func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer, handl profile.ShareFile(key, string(serializedManifest)) - profile.SendMessage(handle, string(wrapperJSON)) + profile.SendMessage(conversationID, string(wrapperJSON)) return nil } diff --git a/model/attr/zone.go b/model/attr/zone.go index 77d7753..3e3c667 100644 --- a/model/attr/zone.go +++ b/model/attr/zone.go @@ -17,9 +17,15 @@ const ( // ProfileZone for attributes related to profile details like name and profile image ProfileZone = Zone("profile") + // LegacyGroupZone for attributes related to legacy group experiment + LegacyGroupZone = Zone("legacygroup") + // FilesharingZone for attributes related to file sharing FilesharingZone = Zone("filesharing") + // ServerKeyZone for attributes related to Server Keys + ServerKeyZone = Zone("serverkey") + // UnknownZone is a catch all useful for error handling UnknownZone = Zone("unknown") ) @@ -44,8 +50,12 @@ func ParseZone(path string) (Zone, string) { switch Zone(parts[0]) { case ProfileZone: return ProfileZone, parts[1] + case LegacyGroupZone: + return LegacyGroupZone, parts[1] case FilesharingZone: return FilesharingZone, parts[1] + case ServerKeyZone: + return ServerKeyZone, parts[1] default: return UnknownZone, parts[1] } diff --git a/model/constants/attributes.go b/model/constants/attributes.go index 0eebcf6..1ec678a 100644 --- a/model/constants/attributes.go +++ b/model/constants/attributes.go @@ -3,6 +3,9 @@ package constants // Name refers to a Profile Name const Name = "name" +// Onion refers the Onion address of the profile +const Onion = "onion" + // Tag describes the type of a profile e.g. default password / encrypted etc. const Tag = "tag" @@ -11,3 +14,12 @@ const ProfileTypeV1DefaultPassword = "v1-defaultPassword" // ProfileTypeV1Password is a tag describing a profile encrypted derived from a user-provided password. const ProfileTypeV1Password = "v1-userPassword" + +// GroupID is the ID of a group +const GroupID = "groupid" + +// GroupServer identifies the Server the legacy group is hosted on +const GroupServer = "groupserver" + +// GroupKey is the name of the group key attribute... +const GroupKey = "groupkey" diff --git a/model/conversation.go b/model/conversation.go index 558709c..a8c7f79 100644 --- a/model/conversation.go +++ b/model/conversation.go @@ -18,24 +18,29 @@ func DefaultP2PAccessControl() AccessControl { // functions type AccessControlList map[string]AccessControl +// Serialize transforms the ACL into json. func (acl *AccessControlList) Serialize() []byte { data, _ := json.Marshal(acl) return data } +// DeserializeAccessControlList takes in JSON and returns an AccessControlList func DeserializeAccessControlList(data []byte) AccessControlList { var acl AccessControlList json.Unmarshal(data, &acl) return acl } +// Attributes a type-driven encapsulation of an Attribute map. type Attributes map[string]string +// Serialize transforms an Attributes map into a JSON struct func (a *Attributes) Serialize() []byte { data, _ := json.Marshal(a) return data } +// DeserializeAttributes convers a JSON struct into an Attributes map func DeserializeAttributes(data []byte) Attributes { var attributes Attributes json.Unmarshal(data, &attributes) diff --git a/model/group.go b/model/group.go index ff01c4f..61b5668 100644 --- a/model/group.go +++ b/model/group.go @@ -4,8 +4,6 @@ import ( "crypto/ed25519" "crypto/rand" "crypto/sha512" - "cwtch.im/cwtch/model/attr" - "cwtch.im/cwtch/model/constants" "cwtch.im/cwtch/protocol/groups" "encoding/base32" "encoding/base64" @@ -19,8 +17,6 @@ import ( "golang.org/x/crypto/pbkdf2" "io" "strings" - "sync" - "time" ) // CurrentGroupVersion is used to set the version of newly created groups and make sure group structs stored are correct and up to date @@ -33,25 +29,17 @@ const GroupInvitePrefix = "torv3" // tied to a server under a given group key. Each group has a set of Messages. type Group struct { // GroupID is now derived from the GroupKey and the GroupServer - GroupID string - GroupKey [32]byte - GroupServer string - Timeline Timeline `json:"-"` - Accepted bool - IsCompromised bool - Attributes map[string]string - lock sync.Mutex - LocalID string - State string `json:"-"` - Version int + GroupID string + GroupKey [32]byte + GroupServer string + Version int + Timeline Timeline `json:"-"` + LocalID string } // NewGroup initializes a new group associated with a given CwtchServer func NewGroup(server string) (*Group, error) { group := new(Group) - group.Version = CurrentGroupVersion - group.LocalID = GenerateRandomID() - group.Accepted = true // we are starting a group, so we assume we want to connect to it... if !tor.IsValidHostname(server) { return nil, errors.New("server is not a valid v3 onion") } @@ -68,11 +56,6 @@ func NewGroup(server string) (*Group, error) { // Derive Group ID from the group key and the server public key. This binds the group to a particular server // and key. group.GroupID = deriveGroupID(groupKey[:], server) - - group.Attributes = make(map[string]string) - // By default we set the "name" of the group to a random string, we can override this later, but to simplify the - // codes around invite, we assume that this is always set. - group.Attributes[attr.GetLocalScope(constants.Name)] = group.GroupID return group, nil } @@ -89,17 +72,12 @@ func deriveGroupID(groupKey []byte, serverHostname string) string { return hex.EncodeToString(pbkdf2.Key(groupKey, pubkey, 4096, 16, sha512.New)) } -// Compromised should be called if we detect a groupkey leak -func (g *Group) Compromised() { - g.IsCompromised = true -} - // Invite generates a invitation that can be sent to a cwtch peer -func (g *Group) Invite() (string, error) { +func (g *Group) Invite(name string) (string, error) { gci := &groups.GroupInvite{ GroupID: g.GroupID, - GroupName: g.Attributes[attr.GetLocalScope(constants.Name)], + GroupName: name, SharedKey: g.GroupKey[:], ServerHost: g.GroupServer, } @@ -109,74 +87,74 @@ func (g *Group) Invite() (string, error) { return serializedInvite, err } -// AddSentMessage takes a DecryptedGroupMessage and adds it to the Groups Timeline -func (g *Group) AddSentMessage(message *groups.DecryptedGroupMessage, sig []byte) Message { - g.lock.Lock() - defer g.lock.Unlock() - timelineMessage := Message{ - Message: message.Text, - Timestamp: time.Unix(int64(message.Timestamp), 0), - Received: time.Unix(0, 0), - Signature: sig, - PeerID: message.Onion, - PreviousMessageSig: message.PreviousMessageSig, - ReceivedByServer: false, - } - g.Timeline.Insert(&timelineMessage) - return timelineMessage -} +//// AddSentMessage takes a DecryptedGroupMessage and adds it to the Groups Timeline +//func (g *Group) AddSentMessage(message *groups.DecryptedGroupMessage, sig []byte) Message { +// g.lock.Lock() +// defer g.lock.Unlock() +// timelineMessage := Message{ +// Message: message.Text, +// Timestamp: time.Unix(int64(message.Timestamp), 0), +// Received: time.Unix(0, 0), +// Signature: sig, +// PeerID: message.Onion, +// PreviousMessageSig: message.PreviousMessageSig, +// ReceivedByServer: false, +// } +// g.Timeline.Insert(&timelineMessage) +// return timelineMessage +//} -// ErrorSentMessage removes a sent message from the unacknowledged list and sets its error flag if found, otherwise returns false -func (g *Group) ErrorSentMessage(sig []byte, error string) bool { - g.lock.Lock() - defer g.lock.Unlock() +//// ErrorSentMessage removes a sent message from the unacknowledged list and sets its error flag if found, otherwise returns false +//func (g *Group) ErrorSentMessage(sig []byte, error string) bool { +// g.lock.Lock() +// defer g.lock.Unlock() +// +// return g.Timeline.SetSendError(sig, error) +//} - return g.Timeline.SetSendError(sig, error) -} +//// GetMessage returns the message at index `index` if it exists. Otherwise returns false. +//// This routine also returns the length of the timeline +//// If go has an optional type this would return Option... +//func (g *Group) GetMessage(index int) (bool, Message, int) { +// g.lock.Lock() +// defer g.lock.Unlock() +// +// length := len(g.Timeline.Messages) +// +// if length > index { +// return true, g.Timeline.Messages[index], length +// } +// return false, Message{}, length +//} -// GetMessage returns the message at index `index` if it exists. Otherwise returns false. -// This routine also returns the length of the timeline -// If go has an optional type this would return Option... -func (g *Group) GetMessage(index int) (bool, Message, int) { - g.lock.Lock() - defer g.lock.Unlock() +//// AddMessage takes a DecryptedGroupMessage and adds it to the Groups Timeline +//func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (*Message, int) { +// +// g.lock.Lock() +// defer g.lock.Unlock() +// +// timelineMessage := &Message{ +// Message: message.Text, +// Timestamp: time.Unix(int64(message.Timestamp), 0), +// Received: time.Now(), +// Signature: sig, +// PeerID: message.Onion, +// PreviousMessageSig: message.PreviousMessageSig, +// ReceivedByServer: true, +// Error: "", +// Acknowledged: true, +// } +// index := g.Timeline.Insert(timelineMessage) +// +// return timelineMessage, index +//} - length := len(g.Timeline.Messages) - - if length > index { - return true, g.Timeline.Messages[index], length - } - return false, Message{}, length -} - -// AddMessage takes a DecryptedGroupMessage and adds it to the Groups Timeline -func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (*Message, int) { - - g.lock.Lock() - defer g.lock.Unlock() - - timelineMessage := &Message{ - Message: message.Text, - Timestamp: time.Unix(int64(message.Timestamp), 0), - Received: time.Now(), - Signature: sig, - PeerID: message.Onion, - PreviousMessageSig: message.PreviousMessageSig, - ReceivedByServer: true, - Error: "", - Acknowledged: true, - } - index := g.Timeline.Insert(timelineMessage) - - return timelineMessage, index -} - -// GetTimeline provides a safe copy of the timeline -func (g *Group) GetTimeline() (timeline []Message) { - g.lock.Lock() - defer g.lock.Unlock() - return g.Timeline.GetMessages() -} +//// GetTimeline provides a safe copy of the timeline +//func (g *Group) GetTimeline() (timeline []Message) { +// g.lock.Lock() +// defer g.lock.Unlock() +// return g.Timeline.GetMessages() +//} //EncryptMessage takes a message and encrypts the message under the group key. func (g *Group) EncryptMessage(message *groups.DecryptedGroupMessage) ([]byte, error) { @@ -211,21 +189,6 @@ func (g *Group) DecryptMessage(ciphertext []byte) (bool, *groups.DecryptedGroupM return false, nil } -// SetAttribute allows applications to store arbitrary configuration info at the group level. -func (g *Group) SetAttribute(name string, value string) { - g.lock.Lock() - defer g.lock.Unlock() - g.Attributes[name] = value -} - -// GetAttribute returns the value of a value set with SetAttribute. If no such value has been set exists is set to false. -func (g *Group) GetAttribute(name string) (value string, exists bool) { - g.lock.Lock() - defer g.lock.Unlock() - value, exists = g.Attributes[name] - return -} - // ValidateInvite takes in a serialized invite and returns the invite structure if it is cryptographically valid // and an error if it is not func ValidateInvite(invite string) (*groups.GroupInvite, error) { diff --git a/model/group_test.go b/model/group_test.go index 1de73e7..5f3ea83 100644 --- a/model/group_test.go +++ b/model/group_test.go @@ -4,7 +4,6 @@ import ( "crypto/sha256" "cwtch.im/cwtch/protocol/groups" "strings" - "sync" "testing" "time" ) @@ -20,7 +19,7 @@ func TestGroup(t *testing.T) { Padding: []byte{}, } - invite, err := g.Invite() + invite, err := g.Invite("name") if err != nil { t.Fatalf("error creating group invite: %v", err) @@ -42,11 +41,7 @@ func TestGroup(t *testing.T) { t.Errorf("group encryption was invalid, or returned wrong message decrypted:%v message:%v", ok, message) return } - g.SetAttribute("test", "test_value") - value, exists := g.GetAttribute("test") - if !exists || value != "test_value" { - t.Errorf("Custom Attribute Should have been set, instead %v %v", exists, value) - } + t.Logf("Got message %v", message) } @@ -61,20 +56,15 @@ func TestGroupErr(t *testing.T) { func TestGroupValidation(t *testing.T) { group := &Group{ - GroupID: "", - GroupKey: [32]byte{}, - GroupServer: "", - Timeline: Timeline{}, - Accepted: false, - IsCompromised: false, - Attributes: nil, - lock: sync.Mutex{}, - LocalID: "", - State: "", - Version: 0, + GroupID: "", + GroupKey: [32]byte{}, + GroupServer: "", + Timeline: Timeline{}, + LocalID: "", + Version: 0, } - invite, _ := group.Invite() + invite, _ := group.Invite("name") _, err := ValidateInvite(invite) if err == nil { @@ -85,7 +75,7 @@ func TestGroupValidation(t *testing.T) { // Generate a valid group but replace the group server... group, _ = NewGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") group.GroupServer = "tcnkoch4nyr3cldkemejtkpqok342rbql6iclnjjs3ndgnjgufzyxvqd" - invite, _ = group.Invite() + invite, _ = group.Invite("name") _, err = ValidateInvite(invite) if err == nil { @@ -96,7 +86,7 @@ func TestGroupValidation(t *testing.T) { // Generate a valid group but replace the group key... group, _ = NewGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") group.GroupKey = sha256.Sum256([]byte{}) - invite, _ = group.Invite() + invite, _ = group.Invite("name") _, err = ValidateInvite(invite) if err == nil { diff --git a/model/message_test.go b/model/message_test.go deleted file mode 100644 index be2859e..0000000 --- a/model/message_test.go +++ /dev/null @@ -1,127 +0,0 @@ -package model - -import ( - "strconv" - "testing" - "time" -) - -func TestMessagePadding(t *testing.T) { - - // Setup the Group - sarah := GenerateNewProfile("Sarah") - alice := GenerateNewProfile("Alice") - sarah.AddContact(alice.Onion, &alice.PublicProfile) - alice.AddContact(sarah.Onion, &sarah.PublicProfile) - - gid, invite, _ := alice.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") - - sarah.ProcessInvite(invite) - - group := alice.GetGroup(gid) - - c1, s1, err := sarah.EncryptMessageToGroup("Hello World 1", group.GroupID) - t.Logf("Length of Encrypted Message: %v %v", len(c1), err) - alice.AttemptDecryption(c1, s1) - - c2, s2, _ := alice.EncryptMessageToGroup("Hello World 2", group.GroupID) - t.Logf("Length of Encrypted Message: %v", len(c2)) - alice.AttemptDecryption(c2, s2) - - c3, s3, _ := alice.EncryptMessageToGroup("Hello World 3", group.GroupID) - t.Logf("Length of Encrypted Message: %v", len(c3)) - alice.AttemptDecryption(c3, s3) - - c4, s4, _ := alice.EncryptMessageToGroup("Hello World this is a much longer message 3", group.GroupID) - t.Logf("Length of Encrypted Message: %v", len(c4)) - alice.AttemptDecryption(c4, s4) - -} - -func TestTranscriptConsistency(t *testing.T) { - timeline := new(Timeline) - - // Setup the Group - sarah := GenerateNewProfile("Sarah") - alice := GenerateNewProfile("Alice") - sarah.AddContact(alice.Onion, &alice.PublicProfile) - alice.AddContact(sarah.Onion, &sarah.PublicProfile) - - // The lightest weight server entry possible (usually we would import a key bundle...) - sarah.AddContact("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd", &PublicProfile{Attributes: map[string]string{string(KeyTypeServerOnion): "2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd"}}) - - gid, invite, _ := alice.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") - - sarah.ProcessInvite(invite) - - group := alice.GetGroup(gid) - - t.Logf("group: %v, sarah %v", group, sarah) - - c1, s1, _ := alice.EncryptMessageToGroup("Hello World 1", group.GroupID) - t.Logf("Length of Encrypted Message: %v", len(c1)) - alice.AttemptDecryption(c1, s1) - - c2, s2, _ := alice.EncryptMessageToGroup("Hello World 2", group.GroupID) - t.Logf("Length of Encrypted Message: %v", len(c2)) - alice.AttemptDecryption(c2, s2) - - c3, s3, _ := alice.EncryptMessageToGroup("Hello World 3", group.GroupID) - t.Logf("Length of Encrypted Message: %v", len(c3)) - alice.AttemptDecryption(c3, s3) - - time.Sleep(time.Second * 1) - - c4, s4, _ := alice.EncryptMessageToGroup("Hello World 4", group.GroupID) - t.Logf("Length of Encrypted Message: %v", len(c4)) - alice.AttemptDecryption(c4, s4) - - c5, s5, _ := alice.EncryptMessageToGroup("Hello World 5", group.GroupID) - t.Logf("Length of Encrypted Message: %v", len(c5)) - - _, _, m1, _ := sarah.AttemptDecryption(c1, s1) - sarah.AttemptDecryption(c1, s1) // Try a duplicate - _, _, m2, _ := sarah.AttemptDecryption(c2, s2) - _, _, m3, _ := sarah.AttemptDecryption(c3, s3) - _, _, m4, _ := sarah.AttemptDecryption(c4, s4) - _, _, m5, _ := sarah.AttemptDecryption(c5, s5) - - // Now we simulate a client receiving these Messages completely out of order - timeline.Insert(m1) - timeline.Insert(m5) - timeline.Insert(m4) - timeline.Insert(m3) - timeline.Insert(m2) - - for i, m := range group.GetTimeline() { - if m.Message != "Hello World "+strconv.Itoa(i+1) { - t.Fatalf("Timeline Out of Order!: %v %v", i, m) - } - - t.Logf("Messages %v: %v %x %x", i, m.Message, m.Signature, m.PreviousMessageSig) - } - - // Test message by hash lookup... - hash := timeline.calculateHash(*m5) - - t.Logf("Looking up %v ", hash) - - for key, msgs := range timeline.hashCache { - t.Logf("%v %v", key, msgs) - } - - // check a real message.. - msgs, err := timeline.GetMessagesByHash(hash) - if err != nil || len(msgs) != 1 { - t.Fatalf("looking up message by hash %v should have not errored: %v", hash, err) - } else if msgs[0].Message.Message != m5.Message { - t.Fatalf("%v != %v", msgs[0].Message, m5.Message) - } - - // Check a non existed hash... error if there is no error - _, err = timeline.GetMessagesByHash("not a real hash") - if err == nil { - t.Fatalf("looking up message by hash %v should have errored: %v", hash, err) - } - -} diff --git a/model/profile.go b/model/profile.go index 03c5dc3..ad92bc0 100644 --- a/model/profile.go +++ b/model/profile.go @@ -2,18 +2,11 @@ package model import ( "crypto/rand" - "cwtch.im/cwtch/model/attr" - "cwtch.im/cwtch/model/constants" - "cwtch.im/cwtch/protocol/groups" "encoding/base32" - "encoding/base64" "encoding/hex" "encoding/json" - "errors" - "fmt" "git.openprivacy.ca/openprivacy/connectivity/tor" "golang.org/x/crypto/ed25519" - "io" "path/filepath" "strings" "sync" @@ -127,42 +120,6 @@ func (p *Profile) AddContact(onion string, profile *PublicProfile) { p.lock.Unlock() } -// UpdateMessageFlags updates the flags stored with a message -func (p *Profile) UpdateMessageFlags(handle string, mIdx int, flags uint64) { - p.lock.Lock() - defer p.lock.Unlock() - if contact, exists := p.Contacts[handle]; exists { - if len(contact.Timeline.Messages) > mIdx { - contact.Timeline.Messages[mIdx].Flags = flags - } - } else if group, exists := p.Groups[handle]; exists { - if len(group.Timeline.Messages) > mIdx { - group.Timeline.Messages[mIdx].Flags = flags - } - } -} - -// DeleteContact deletes a peer contact -func (p *Profile) DeleteContact(onion string) { - p.lock.Lock() - defer p.lock.Unlock() - delete(p.Contacts, onion) -} - -// DeleteGroup deletes a group -func (p *Profile) DeleteGroup(groupID string) { - p.lock.Lock() - defer p.lock.Unlock() - delete(p.Groups, groupID) -} - -// RejectInvite rejects and removes a group invite -func (p *Profile) RejectInvite(groupID string) { - p.lock.Lock() - delete(p.Groups, groupID) - p.lock.Unlock() -} - // AddSentMessageToContactTimeline allows the saving of a message sent via a direct connection chat to the profile. func (p *Profile) AddSentMessageToContactTimeline(onion string, messageTxt string, sent time.Time, eventID string) *Message { p.lock.Lock() @@ -235,95 +192,6 @@ func (p *Profile) AckSentMessageToPeer(onion string, eventID string) int { return -1 } -// AddGroupSentMessageError searches matching groups for the message by sig and marks it as an error -func (p *Profile) AddGroupSentMessageError(groupID string, signature []byte, error string) { - p.lock.Lock() - defer p.lock.Unlock() - group, exists := p.Groups[groupID] - if exists { - group.ErrorSentMessage(signature, error) - } -} - -// AcceptInvite accepts a group invite -func (p *Profile) AcceptInvite(groupID string) (err error) { - p.lock.Lock() - defer p.lock.Unlock() - group, ok := p.Groups[groupID] - if ok { - group.Accepted = true - } else { - err = errors.New("group does not exist") - } - return -} - -// GetGroups returns an unordered list of group IDs associated with this profile. -func (p *Profile) GetGroups() []string { - p.lock.Lock() - defer p.lock.Unlock() - var keys []string - for onion := range p.Groups { - keys = append(keys, onion) - } - return keys -} - -// GetContacts returns an unordered list of contact onions associated with this profile. -func (p *Profile) GetContacts() []string { - p.lock.Lock() - defer p.lock.Unlock() - var keys []string - for onion := range p.Contacts { - if onion != p.Onion { - keys = append(keys, onion) - } - } - return keys -} - -// SetContactAuthorization sets the authoirization level of a peer -func (p *Profile) SetContactAuthorization(onion string, auth Authorization) (err error) { - p.lock.Lock() - defer p.lock.Unlock() - contact, ok := p.Contacts[onion] - if ok { - contact.Authorization = auth - } else { - err = errors.New("peer does not exist") - } - return -} - -// GetContactAuthorization returns the contact's authorization level -func (p *Profile) GetContactAuthorization(onion string) Authorization { - p.lock.Lock() - defer p.lock.Unlock() - contact, ok := p.Contacts[onion] - if ok { - return contact.Authorization - } - return AuthUnknown -} - -// ContactsAuthorizations calculates a list of Peers who are at the supplied auth levels -func (p *Profile) ContactsAuthorizations(authorizationFilter ...Authorization) map[string]Authorization { - authorizations := map[string]Authorization{} - for _, contact := range p.GetContacts() { - c, _ := p.GetContact(contact) - authorizations[c.Onion] = c.Authorization - } - return authorizations -} - -// GetContact returns a contact if the profile has it -func (p *Profile) GetContact(onion string) (*PublicProfile, bool) { - p.lock.Lock() - defer p.lock.Unlock() - contact, ok := p.Contacts[onion] - return contact, ok -} - // VerifyGroupMessage confirms the authenticity of a message given an sender onion, message and signature. // The goal of this function is 2-fold: // 1. We confirm that the sender referenced in the group text is the actual sender of the message (or at least @@ -336,28 +204,28 @@ func (p *Profile) GetContact(onion string) (*PublicProfile, bool) { // on two different servers with the same key and then forwards messages between them to convince the parties in // each group that they are actually in one big group (with the intent to later censor and/or selectively send messages // to each group). -func (p *Profile) VerifyGroupMessage(onion string, groupID string, message string, signature []byte) bool { - - group := p.GetGroup(groupID) - if group == nil { - return false - } - - // We use our group id, a known reference server and the ciphertext of the message. - m := groupID + group.GroupServer + message - - // If the message is ostensibly from us then we check it against our public key... - if onion == p.Onion { - return ed25519.Verify(p.Ed25519PublicKey, []byte(m), signature) - } - - // Otherwise we derive the public key from the sender and check it against that. - decodedPub, err := base32.StdEncoding.DecodeString(strings.ToUpper(onion)) - if err == nil && len(decodedPub) >= 32 { - return ed25519.Verify(decodedPub[:32], []byte(m), signature) - } - return false -} +//func (p *Profile) VerifyGroupMessage(onion string, groupID string, message string, signature []byte) bool { +// +// group := p.GetGroup(groupID) +// if group == nil { +// return false +// } +// +// // We use our group id, a known reference server and the ciphertext of the message. +// m := groupID + group.GroupServer + message +// +// // If the message is ostensibly from us then we check it against our public key... +// if onion == p.Onion { +// return ed25519.Verify(p.Ed25519PublicKey, []byte(m), signature) +// } +// +// // Otherwise we derive the public key from the sender and check it against that. +// decodedPub, err := base32.StdEncoding.DecodeString(strings.ToUpper(onion)) +// if err == nil && len(decodedPub) >= 32 { +// return ed25519.Verify(decodedPub[:32], []byte(m), signature) +// } +// return false +//} // SignMessage takes a given message and returns an Ed21159 signature func (p *Profile) SignMessage(message string) []byte { @@ -365,170 +233,139 @@ func (p *Profile) SignMessage(message string) []byte { return sig } -// StartGroup when given a server, creates a new Group under this profile and returns the group id an a precomputed -// invite which can be sent on the wire. -func (p *Profile) StartGroup(server string) (groupID string, invite string, err error) { - group, err := NewGroup(server) - if err != nil { - return "", "", err - } - groupID = group.GroupID - invite, err = group.Invite() - p.lock.Lock() - defer p.lock.Unlock() - p.Groups[group.GroupID] = group - return -} +//// ProcessInvite validates a group invite and adds a new group invite to the profile if it is valid. +//// returns the new group ID on success, error on fail. +//func (p *Profile) ProcessInvite(invite string) (string, error) { +// gci, err := ValidateInvite(invite) +// if err == nil { +// if server, exists := p.GetContact(gci.ServerHost); !exists || !server.IsServer() { +// return "", fmt.Errorf("unknown server. a server key bundle needs to be imported before this group can be verified") +// } +// group := new(Group) +// group.Version = CurrentGroupVersion +// group.GroupID = gci.GroupID +// group.LocalID = GenerateRandomID() +// copy(group.GroupKey[:], gci.SharedKey[:]) +// group.GroupServer = gci.ServerHost +// group.Accepted = false +// group.Attributes = make(map[string]string) +// group.Attributes[attr.GetLocalScope(constants.Name)] = gci.GroupName +// p.AddGroup(group) +// return gci.GroupID, nil +// } +// return "", err +//} -// GetGroup a pointer to a Group by the group Id, returns nil if no group found. -func (p *Profile) GetGroup(groupID string) (g *Group) { - p.lock.Lock() - defer p.lock.Unlock() - g = p.Groups[groupID] - return -} - -// ProcessInvite validates a group invite and adds a new group invite to the profile if it is valid. -// returns the new group ID on success, error on fail. -func (p *Profile) ProcessInvite(invite string) (string, error) { - gci, err := ValidateInvite(invite) - if err == nil { - if server, exists := p.GetContact(gci.ServerHost); !exists || !server.IsServer() { - return "", fmt.Errorf("unknown server. a server key bundle needs to be imported before this group can be verified") - } - group := new(Group) - group.Version = CurrentGroupVersion - group.GroupID = gci.GroupID - group.LocalID = GenerateRandomID() - copy(group.GroupKey[:], gci.SharedKey[:]) - group.GroupServer = gci.ServerHost - group.Accepted = false - group.Attributes = make(map[string]string) - group.Attributes[attr.GetLocalScope(constants.Name)] = gci.GroupName - p.AddGroup(group) - return gci.GroupID, nil - } - return "", err -} - -// AddGroup is a convenience method for adding a group to a profile. -func (p *Profile) AddGroup(group *Group) { - p.lock.Lock() - defer p.lock.Unlock() - _, exists := p.Groups[group.GroupID] - if !exists { - p.Groups[group.GroupID] = group - } -} - -// AttemptDecryption takes a ciphertext and signature and attempts to decrypt it under known groups. -// If successful, adds the message to the group's timeline -func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, string, *Message, int) { - for _, group := range p.Groups { - success, dgm := group.DecryptMessage(ciphertext) - if success { - - // Attempt to serialize this message - serialized, err := json.Marshal(dgm) - - // Someone send a message that isn't a valid Decrypted Group Message. Since we require this struct in orer - // to verify the message, we simply ignore it. - if err != nil { - return false, group.GroupID, nil, -1 - } - - // This now requires knowledge of the Sender, the Onion and the Specific Decrypted Group Message (which should only - // be derivable from the cryptographic key) which contains many unique elements such as the time and random padding - verified := p.VerifyGroupMessage(dgm.Onion, group.GroupID, base64.StdEncoding.EncodeToString(serialized), signature) - - if !verified { - // An earlier version of this protocol mistakenly signed the ciphertext of the message - // instead of the serialized decrypted group message. - // This has 2 issues: - // 1. A server with knowledge of group members public keys AND the Group ID would be able to detect valid messages - // 2. It made the metadata-security of a group dependent on keeping the cryptographically derived Group ID secret. - // While not awful, it also isn't good. For Version 3 groups only we permit Cwtch to check this older signature - // structure in a backwards compatible way for the duration of the Groups Experiment. - // TODO: Delete this check when Groups are no long Experimental - if group.Version == 3 { - verified = p.VerifyGroupMessage(dgm.Onion, group.GroupID, string(ciphertext), signature) - } - } - - // So we have a message that has a valid group key, but the signature can't be verified. - // The most obvious explanation for this is that the group key has been compromised (or we are in an open group and the server is being malicious) - // Either way, someone who has the private key is being detectably bad so we are just going to throw this message away and mark the group as Compromised. - if !verified { - group.Compromised() - return false, group.GroupID, nil, -1 - } - message, index := group.AddMessage(dgm, signature) - return true, group.GroupID, message, index - } - } - - // If we couldn't find a group to decrypt the message with we just return false. This is an expected case - return false, "", nil, -1 -} - -func getRandomness(arr *[]byte) { - if _, err := io.ReadFull(rand.Reader, (*arr)[:]); err != nil { - if err != nil { - // If we can't do randomness, just crash something is very very wrong and we are not going - // to resolve it here.... - panic(err.Error()) - } - } -} - -// EncryptMessageToGroup when given a message and a group, encrypts and signs the message under the group and -// profile -func (p *Profile) EncryptMessageToGroup(message string, groupID string) ([]byte, []byte, error) { - - if len(message) > MaxGroupMessageLength { - return nil, nil, errors.New("group message is too long") - } - - group := p.GetGroup(groupID) - if group != nil { - timestamp := time.Now().Unix() - - // Select the latest message from the timeline as a reference point. - var prevSig []byte - if len(group.Timeline.Messages) > 0 { - prevSig = group.Timeline.Messages[len(group.Timeline.Messages)-1].Signature - } else { - prevSig = []byte(group.GroupID) - } - - lenPadding := MaxGroupMessageLength - len(message) - padding := make([]byte, lenPadding) - getRandomness(&padding) - hexGroupID, err := hex.DecodeString(group.GroupID) - if err != nil { - return nil, nil, err - } - - dm := &groups.DecryptedGroupMessage{ - Onion: p.Onion, - Text: message, - SignedGroupID: hexGroupID, - Timestamp: uint64(timestamp), - PreviousMessageSig: prevSig, - Padding: padding[:], - } - - ciphertext, err := group.EncryptMessage(dm) - if err != nil { - return nil, nil, err - } - serialized, _ := json.Marshal(dm) - signature := p.SignMessage(groupID + group.GroupServer + base64.StdEncoding.EncodeToString(serialized)) - group.AddSentMessage(dm, signature) - return ciphertext, signature, nil - } - return nil, nil, errors.New("group does not exist") -} +// +// +//// AttemptDecryption takes a ciphertext and signature and attempts to decrypt it under known groups. +//// If successful, adds the message to the group's timeline +//func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, string, *Message, int) { +// for _, group := range p.Groups { +// success, dgm := group.DecryptMessage(ciphertext) +// if success { +// +// // Attempt to serialize this message +// serialized, err := json.Marshal(dgm) +// +// // Someone send a message that isn't a valid Decrypted Group Message. Since we require this struct in orer +// // to verify the message, we simply ignore it. +// if err != nil { +// return false, group.GroupID, nil, -1 +// } +// +// // This now requires knowledge of the Sender, the Onion and the Specific Decrypted Group Message (which should only +// // be derivable from the cryptographic key) which contains many unique elements such as the time and random padding +// verified := p.VerifyGroupMessage(dgm.Onion, group.GroupID, base64.StdEncoding.EncodeToString(serialized), signature) +// +// if !verified { +// // An earlier version of this protocol mistakenly signed the ciphertext of the message +// // instead of the serialized decrypted group message. +// // This has 2 issues: +// // 1. A server with knowledge of group members public keys AND the Group ID would be able to detect valid messages +// // 2. It made the metadata-security of a group dependent on keeping the cryptographically derived Group ID secret. +// // While not awful, it also isn't good. For Version 3 groups only we permit Cwtch to check this older signature +// // structure in a backwards compatible way for the duration of the Groups Experiment. +// // TODO: Delete this check when Groups are no long Experimental +// if group.Version == 3 { +// verified = p.VerifyGroupMessage(dgm.Onion, group.GroupID, string(ciphertext), signature) +// } +// } +// +// // So we have a message that has a valid group key, but the signature can't be verified. +// // The most obvious explanation for this is that the group key has been compromised (or we are in an open group and the server is being malicious) +// // Either way, someone who has the private key is being detectably bad so we are just going to throw this message away and mark the group as Compromised. +// if !verified { +// return false, group.GroupID, nil, -1 +// } +// message, index := group.AddMessage(dgm, signature) +// return true, group.GroupID, message, index +// } +// } +// +// // If we couldn't find a group to decrypt the message with we just return false. This is an expected case +// return false, "", nil, -1 +//} +// +//func getRandomness(arr *[]byte) { +// if _, err := io.ReadFull(rand.Reader, (*arr)[:]); err != nil { +// if err != nil { +// // If we can't do randomness, just crash something is very very wrong and we are not going +// // to resolve it here.... +// panic(err.Error()) +// } +// } +//} +// +//// EncryptMessageToGroup when given a message and a group, encrypts and signs the message under the group and +//// profile +//func (p *Profile) EncryptMessageToGroup(message string, groupID string) ([]byte, []byte, error) { +// +// if len(message) > MaxGroupMessageLength { +// return nil, nil, errors.New("group message is too long") +// } +// +// group := p.GetGroup(groupID) +// if group != nil { +// timestamp := time.Now().Unix() +// +// // Select the latest message from the timeline as a reference point. +// var prevSig []byte +// if len(group.Timeline.Messages) > 0 { +// prevSig = group.Timeline.Messages[len(group.Timeline.Messages)-1].Signature +// } else { +// prevSig = []byte(group.GroupID) +// } +// +// lenPadding := MaxGroupMessageLength - len(message) +// padding := make([]byte, lenPadding) +// getRandomness(&padding) +// hexGroupID, err := hex.DecodeString(group.GroupID) +// if err != nil { +// return nil, nil, err +// } +// +// dm := &groups.DecryptedGroupMessage{ +// Onion: p.Onion, +// Text: message, +// SignedGroupID: hexGroupID, +// Timestamp: uint64(timestamp), +// PreviousMessageSig: prevSig, +// Padding: padding[:], +// } +// +// ciphertext, err := group.EncryptMessage(dm) +// if err != nil { +// return nil, nil, err +// } +// serialized, _ := json.Marshal(dm) +// signature := p.SignMessage(groupID + group.GroupServer + base64.StdEncoding.EncodeToString(serialized)) +// group.AddSentMessage(dm, signature) +// return ciphertext, signature, nil +// } +// return nil, nil, errors.New("group does not exist") +//} +// // GetCopy returns a full deep copy of the Profile struct and its members (timeline inclusion control by arg) func (p *Profile) GetCopy(timeline bool) *Profile { diff --git a/model/profile_test.go b/model/profile_test.go deleted file mode 100644 index 2cd8ae5..0000000 --- a/model/profile_test.go +++ /dev/null @@ -1,136 +0,0 @@ -package model - -import ( - "testing" -) - -func TestProfileIdentity(t *testing.T) { - sarah := GenerateNewProfile("Sarah") - alice := GenerateNewProfile("Alice") - - alice.AddContact(sarah.Onion, &sarah.PublicProfile) - if alice.Contacts[sarah.Onion].Name != "Sarah" { - t.Errorf("alice should have added sarah as a contact %v", alice.Contacts) - } - - if len(alice.GetContacts()) != 1 { - t.Errorf("alice should be only contact: %v", alice.GetContacts()) - } - - alice.SetAttribute("test", "hello world") - value, _ := alice.GetAttribute("test") - if value != "hello world" { - t.Errorf("value from custom attribute should have been 'hello world', instead was: %v", value) - } - - t.Logf("%v", alice) -} - -func TestTrustPeer(t *testing.T) { - sarah := GenerateNewProfile("Sarah") - alice := GenerateNewProfile("Alice") - sarah.AddContact(alice.Onion, &alice.PublicProfile) - alice.AddContact(sarah.Onion, &sarah.PublicProfile) - alice.SetContactAuthorization(sarah.Onion, AuthApproved) - if alice.GetContactAuthorization(sarah.Onion) != AuthApproved { - t.Errorf("peer should be approved") - } -} - -func TestBlockPeer(t *testing.T) { - sarah := GenerateNewProfile("Sarah") - alice := GenerateNewProfile("Alice") - sarah.AddContact(alice.Onion, &alice.PublicProfile) - alice.AddContact(sarah.Onion, &sarah.PublicProfile) - alice.SetContactAuthorization(sarah.Onion, AuthBlocked) - if alice.GetContactAuthorization(sarah.Onion) != AuthBlocked { - t.Errorf("peer should be blocked") - } - - if alice.SetContactAuthorization("", AuthUnknown) == nil { - t.Errorf("Seting Auth level of a non existent peer should error") - } -} - -func TestAcceptNonExistentGroup(t *testing.T) { - sarah := GenerateNewProfile("Sarah") - sarah.AcceptInvite("doesnotexist") -} - -func TestRejectGroupInvite(t *testing.T) { - sarah := GenerateNewProfile("Sarah") - alice := GenerateNewProfile("Alice") - sarah.AddContact(alice.Onion, &alice.PublicProfile) - alice.AddContact(sarah.Onion, &sarah.PublicProfile) - // The lightest weight server entry possible (usually we would import a key bundle...) - sarah.AddContact("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd", &PublicProfile{Attributes: map[string]string{string(KeyTypeServerOnion): "2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd"}}) - - gid, invite, _ := alice.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") - sarah.ProcessInvite(invite) - group := alice.GetGroup(gid) - if len(sarah.Groups) == 1 { - if sarah.GetGroup(group.GroupID).Accepted { - t.Errorf("Group should not be accepted") - } - sarah.RejectInvite(group.GroupID) - if len(sarah.Groups) != 0 { - t.Errorf("Group %v should have been deleted", group.GroupID) - } - return - } - t.Errorf("Group should exist in map") -} - -func TestProfileGroup(t *testing.T) { - sarah := GenerateNewProfile("Sarah") - alice := GenerateNewProfile("Alice") - sarah.AddContact(alice.Onion, &alice.PublicProfile) - alice.AddContact(sarah.Onion, &sarah.PublicProfile) - - gid, invite, _ := alice.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") - - // The lightest weight server entry possible (usually we would import a key bundle...) - sarah.AddContact("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd", &PublicProfile{Attributes: map[string]string{string(KeyTypeServerOnion): "2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd"}}) - sarah.ProcessInvite(invite) - if len(sarah.GetGroups()) != 1 { - t.Errorf("sarah should only be in 1 group instead: %v", sarah.GetGroups()) - } - - group := alice.GetGroup(gid) - sarah.AcceptInvite(group.GroupID) - c, s1, _ := sarah.EncryptMessageToGroup("Hello World", group.GroupID) - alice.AttemptDecryption(c, s1) - - gid2, invite2, _ := alice.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") - sarah.ProcessInvite(invite2) - group2 := alice.GetGroup(gid2) - c2, s2, _ := sarah.EncryptMessageToGroup("Hello World", group2.GroupID) - alice.AttemptDecryption(c2, s2) - - _, _, err := sarah.EncryptMessageToGroup(string(make([]byte, MaxGroupMessageLength*2)), group2.GroupID) - if err == nil { - t.Errorf("Overly long message should have returned an error") - } - - bob := GenerateNewProfile("bob") - bob.AddContact(alice.Onion, &alice.PublicProfile) - // The lightest weight server entry possible (usually we would import a key bundle...) - bob.AddContact("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd", &PublicProfile{Attributes: map[string]string{string(KeyTypeServerOnion): "2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd"}}) - - bob.ProcessInvite(invite2) - c3, s3, err := bob.EncryptMessageToGroup("Bobs Message", group2.GroupID) - if err == nil { - ok, _, message, _ := alice.AttemptDecryption(c3, s3) - if !ok { - t.Errorf("Bobs message to the group should be decrypted %v %v", message, ok) - } - - eve := GenerateNewProfile("eve") - ok, _, _, _ = eve.AttemptDecryption(c3, s3) - if ok { - t.Errorf("Eves hould not be able to decrypt Messages!") - } - } else { - t.Errorf("Bob failed to encrypt a message to the group") - } -} diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 20e0d9d..f815b2e 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -1,6 +1,7 @@ package peer import ( + "crypto/rand" "cwtch.im/cwtch/model/constants" "encoding/base64" "encoding/json" @@ -8,6 +9,8 @@ import ( "fmt" "git.openprivacy.ca/cwtch.im/tapir/primitives" "git.openprivacy.ca/openprivacy/connectivity" + "git.openprivacy.ca/openprivacy/connectivity/tor" + "golang.org/x/crypto/ed25519" "runtime" "strconv" "strings" @@ -19,7 +22,6 @@ import ( "cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/protocol/files" - "git.openprivacy.ca/openprivacy/connectivity/tor" "git.openprivacy.ca/openprivacy/log" ) @@ -47,7 +49,6 @@ var DefaultEventsToHandle = []event.Type{ // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer type cwtchPeer struct { - Profile *model.Profile mutex sync.Mutex shutdown bool listenStatus bool @@ -61,14 +62,37 @@ type cwtchPeer struct { // GenerateProtocolEngine // Status: New in 1.5 -func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Manager) connections.Engine { - return connections.NewProtocolEngine(cp.GetIdentity(), cp.Profile.Ed25519PrivateKey, acn, bus, cp.Profile.ContactsAuthorizations()) -} +func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Manager) (connections.Engine, error) { + cp.mutex.Lock() + defer cp.mutex.Unlock() + conversations, _ := cp.storage.FetchConversations() -// GetIdentity -// Status: New in 1.5 -func (cp *cwtchPeer) GetIdentity() primitives.Identity { - return primitives.InitializeIdentity("", &cp.Profile.Ed25519PrivateKey, &cp.Profile.Ed25519PublicKey) + authorizations := make(map[string]model.Authorization) + for _, conversation := range conversations { + if tor.IsValidHostname(conversation.Handle) { + if conversation.ACL[conversation.Handle].Blocked { + authorizations[conversation.Handle] = model.AuthBlocked + } else { + authorizations[conversation.Handle] = model.AuthApproved + } + } + } + + privateKey, err := cp.storage.LoadProfileKeyValue(TypePrivateKey, "Ed25519PrivateKey") + if err != nil { + log.Errorf("error loading private key from storage") + return nil, err + } + + publicKey, err := cp.storage.LoadProfileKeyValue(TypePublicKey, "Ed25519PublicKey") + if err != nil { + log.Errorf("error loading public key from storage") + return nil, err + } + + identity := primitives.InitializeIdentity("", (*ed25519.PrivateKey)(&privateKey), (*ed25519.PublicKey)(&publicKey)) + + return connections.NewProtocolEngine(identity, privateKey, acn, bus, authorizations), nil } // SendScopedZonedGetValToContact @@ -115,47 +139,38 @@ func (cp *cwtchPeer) SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, k // If you try to send a message to a handle that doesn't exist, malformed or an incorrect type then // this function will error // Status: TODO -func (cp *cwtchPeer) SendMessage(handle string, message string) error { - - var ev event.Event +func (cp *cwtchPeer) SendMessage(conversation int, message string) error { + cp.mutex.Lock() + defer cp.mutex.Unlock() // Group Handles are always 32 bytes in length, but we forgo any further testing here // and delegate the group existence check to EncryptMessageToGroup - if len(handle) == 32 { - cp.mutex.Lock() - defer cp.mutex.Unlock() - group := cp.Profile.GetGroup(handle) - if group == nil { - return errors.New("invalid group id") - } - // Group adds it's own sent message to timeline - ct, sig, err := cp.Profile.EncryptMessageToGroup(message, handle) + //cp.mutex.Lock() + //defer cp.mutex.Unlock() + //group := cp.Profile.GetGroup(handle) + //if group == nil { + // return errors.New("invalid group id") + //} + // + //// Group adds it's own sent message to timeline + //ct, sig, err := cp.Profile.EncryptMessageToGroup(message, handle) + // + //// Group does not exist or some other unrecoverable error... + //if err != nil { + // return err + //} + //ev = event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupID: handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)}) - // Group does not exist or some other unrecoverable error... - if err != nil { - return err - } - ev = event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupID: handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)}) - } else if tor.IsValidHostname(handle) { - // We assume we are sending to a Contact. - contact, err := cp.FetchConversationInfo(handle) - ev = event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: handle, event.Data: message}) - // If the contact exists replace the event id wih the index of this message in the contacts timeline... - // Otherwise assume we don't log the message in the timeline... - if contact != nil && err == nil { - //ev.EventID = strconv.Itoa(contact.Timeline.Len()) - cp.mutex.Lock() - defer cp.mutex.Unlock() - cp.storage.InsertMessage(contact.ID, 0, message, model.Attributes{"ack": event.False, "sent": time.Now().String()}) - } - // Regardless we publish the send message to peer event for the protocol engine to execute on... - // We assume this is always successful as it is always valid to attempt to - // Contact a valid hostname - } else { - return errors.New("malformed handle type") + // We assume we are sending to a Contact. + conversationInfo, err := cp.storage.GetConversation(conversation) + // If the contact exists replace the event id wih the index of this message in the contacts timeline... + // Otherwise assume we don't log the message in the timeline... + if conversationInfo != nil && err == nil { + ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: conversationInfo.Handle, event.Data: message}) + //ev.EventID = strconv.Itoa(contact.Timeline.Len()) + cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{"ack": event.False, "sent": time.Now().String()}) + cp.eventBus.Publish(ev) } - - cp.eventBus.Publish(ev) return nil } @@ -165,7 +180,7 @@ func (cp *cwtchPeer) UpdateMessageFlags(handle string, mIdx int, flags uint64) { cp.mutex.Lock() defer cp.mutex.Unlock() log.Debugf("Updating Flags for %v %v %v", handle, mIdx, flags) - cp.Profile.UpdateMessageFlags(handle, mIdx, flags) + //cp.Profile.UpdateMessageFlags(handle, mIdx, flags) cp.eventBus.Publish(event.NewEvent(event.UpdateMessageFlags, map[event.Field]string{event.Handle: handle, event.Index: strconv.Itoa(mIdx), event.Flags: strconv.FormatUint(flags, 2)})) } @@ -188,12 +203,13 @@ func NewProfileWithEncryptedStorage(name string, cps *CwtchProfileStorage) Cwtch cp.shutdown = false cp.storage = cps cp.state = make(map[string]connections.ConnectionState) - cp.Profile = model.GenerateNewProfile(name) + pub, priv, _ := ed25519.GenerateKey(rand.Reader) // Store all the Necessary Base Attributes In The Database - cp.storage.StoreProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)).ToString(), []byte(name)) - cp.storage.StoreProfileKeyValue(TypePrivateKey, "Ed25519PrivateKey", cp.Profile.Ed25519PrivateKey) - cp.storage.StoreProfileKeyValue(TypePublicKey, "Ed25519PublicKey", cp.Profile.Ed25519PublicKey) + cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, name) + cp.storage.StoreProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString(), []byte(tor.GetTorV3Hostname(pub))) + cp.storage.StoreProfileKeyValue(TypePrivateKey, "Ed25519PrivateKey", priv) + cp.storage.StoreProfileKeyValue(TypePublicKey, "Ed25519PublicKey", pub) return cp } @@ -211,14 +227,13 @@ func FromEncryptedStorage(cps *CwtchProfileStorage) CwtchPeer { // Deprecated - Only to be used for importing new profiles func FromProfile(profile *model.Profile, cps *CwtchProfileStorage) CwtchPeer { cp := new(cwtchPeer) - cp.Profile = profile cp.shutdown = false cp.storage = cps // Store all the Necessary Base Attributes In The Database - cp.storage.StoreProfileKeyValue(TypeAttribute, "public.profile.name", []byte(profile.Name)) - cp.storage.StoreProfileKeyValue(TypePrivateKey, "Ed25519PrivateKey", cp.Profile.Ed25519PrivateKey) - cp.storage.StoreProfileKeyValue(TypePublicKey, "Ed25519PublicKey", cp.Profile.Ed25519PublicKey) + cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, profile.Name) + cp.storage.StoreProfileKeyValue(TypePrivateKey, "Ed25519PrivateKey", profile.Ed25519PrivateKey) + cp.storage.StoreProfileKeyValue(TypePublicKey, "Ed25519PublicKey", profile.Ed25519PublicKey) return cp } @@ -232,30 +247,31 @@ func (cp *cwtchPeer) Init(eventBus event.Manager) { // It would be nice to do these checks in the storage engine itself, but it is easier to do them here // rather than duplicating the logic to construct/reconstruct attributes in storage engine... // TODO: Remove these checks after Cwtch ~1.5 storage engine is implemented - if _, exists := cp.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name); !exists { - // If public.profile.name does not exist, and we have an existing public.name then: - // set public.profile.name from public.name - // set local.profile.name from public.name - if name, exists := cp.Profile.GetAttribute(attr.GetPublicScope(constants.Name)); exists { - cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, name) - cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, name) - } else { - // Otherwise check if local.name exists and set it from that - // If not, then check the very old unzoned, unscoped name. - // If not, then set directly from Profile.Name... - if name, exists := cp.Profile.GetAttribute(attr.GetLocalScope(constants.Name)); exists { - cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, name) - cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, name) - } else if name, exists := cp.Profile.GetAttribute(constants.Name); exists { - cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, name) - cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, name) - } else { - // Profile.Name is very deprecated at this point... - cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, cp.Profile.Name) - cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, cp.Profile.Name) - } - } - } + // TODO: Move this to import script + //if _, exists := cp.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name); !exists { + // // If public.profile.name does not exist, and we have an existing public.name then: + // // set public.profile.name from public.name + // // set local.profile.name from public.name + // if name, exists := cp.GetAttribute(attr.GetPublicScope(constants.Name)); exists { + // cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, name) + // cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, name) + // } else { + // // Otherwise check if local.name exists and set it from that + // // If not, then check the very old unzoned, unscoped name. + // // If not, then set directly from Profile.Name... + // if name, exists := cp.Profile.GetAttribute(attr.GetLocalScope(constants.Name)); exists { + // cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, name) + // cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, name) + // } else if name, exists := cp.Profile.GetAttribute(constants.Name); exists { + // cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, name) + // cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, name) + // } else { + // // Profile.Name is very deprecated at this point... + // cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, cp.Profile.Name) + // cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, cp.Profile.Name) + // } + // } + //} // At this point we can safely assume that public.profile.name exists localName, _ := cp.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name) @@ -271,14 +287,14 @@ func (cp *cwtchPeer) Init(eventBus event.Manager) { // profile-> name - and remove all name processing code from libcwtch-go. // If local.profile.tag does not exist then set it from deprecated GetAttribute - if _, exists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag); !exists { - if tag, exists := cp.Profile.GetAttribute(constants.Tag); exists { - cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag) - } else { - // Assume a default password, which will allow the older profile to have it's password reset by the UI - cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, constants.ProfileTypeV1DefaultPassword) - } - } + //if _, exists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag); !exists { + // if tag, exists := cp.Profile.GetAttribute(constants.Tag); exists { + // cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag) + // } else { + // // Assume a default password, which will allow the older profile to have it's password reset by the UI + // cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, constants.ProfileTypeV1DefaultPassword) + // } + //} } // InitForEvents @@ -305,20 +321,20 @@ func (cp *cwtchPeer) AutoHandleEvents(events []event.Type) { // ImportGroup initializes a group from an imported source rather than a peer invite // Status: TODO -func (cp *cwtchPeer) ImportGroup(exportedInvite string) (string, error) { +func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) { cp.mutex.Lock() defer cp.mutex.Unlock() - gid, err := cp.Profile.ProcessInvite(exportedInvite) - - if err == nil { - cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.GroupID: gid, event.GroupInvite: exportedInvite})) - } - - return gid, err + // //gid, err := model.ProcessInvite(exportedInvite) + // + // if err == nil { + // cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.GroupID: gid, event.GroupInvite: exportedInvite})) + // } + // + return -1, nil } // NewContactConversation create a new p2p conversation with the given acl applied to the handle. -func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessControl, accepted bool) error { +func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error) { cp.mutex.Lock() defer cp.mutex.Unlock() return cp.storage.NewConversation(handle, model.Attributes{event.SaveHistoryKey: event.DeleteHistoryDefault}, model.AccessControlList{handle: acl}, accepted) @@ -338,6 +354,12 @@ func (cp *cwtchPeer) FetchConversations() ([]*model.Conversation, error) { return cp.storage.FetchConversations() } +func (cp *cwtchPeer) GetConversationInfo(conversation int) (*model.Conversation, error) { + cp.mutex.Lock() + defer cp.mutex.Unlock() + return cp.storage.GetConversation(conversation) +} + // FetchConversationInfo returns information about the given conversation referenced by the handle func (cp *cwtchPeer) FetchConversationInfo(handle string) (*model.Conversation, error) { cp.mutex.Lock() @@ -380,130 +402,84 @@ func (cp *cwtchPeer) GetChannelMessage(conversation int, channel int, id int) (s return cp.storage.GetChannelMessage(conversation, channel, id) } -// ExportGroup serializes a group invite so it can be given offline -// Status: TODO -func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) { - cp.mutex.Lock() - defer cp.mutex.Unlock() - group := cp.Profile.GetGroup(groupID) - if group != nil { - return group.Invite() - } - return "", errors.New("group id could not be found") -} - // StartGroup create a new group linked to the given server and returns the group ID, an invite or an error. -// Status: TODO -func (cp *cwtchPeer) StartGroup(server string) (string, string, error) { - cp.mutex.Lock() - groupID, invite, err := cp.Profile.StartGroup(server) - cp.mutex.Unlock() +// Status: TODO change server handle to conversation id...? +func (cp *cwtchPeer) StartGroup(server string) (int, error) { + group, err := model.NewGroup(server) if err == nil { - group := cp.GetGroup(groupID) - jsobj, err := json.Marshal(group) - if err == nil { - cp.eventBus.Publish(event.NewEvent(event.GroupCreated, map[event.Field]string{ - event.GroupID: groupID, - event.GroupServer: group.GroupServer, - event.GroupInvite: invite, - // Needed for Storage Engine... - event.Data: string(jsobj), - })) + conversationID, err := cp.NewContactConversation(group.GroupID, model.DefaultP2PAccessControl(), true) + if err != nil { + return -1, err } - } else { - log.Errorf("error creating group: %v", err) + cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)), group.GroupID) + cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), group.GroupServer) + cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(group.GroupKey[:])) + + cp.eventBus.Publish(event.NewEvent(event.GroupCreated, map[event.Field]string{ + event.GroupID: group.GroupID, + event.GroupServer: group.GroupServer, + })) + return conversationID, nil } - return groupID, invite, err -} - -// GetGroups returns an unordered list of all group IDs. -// Status: TODO -func (cp *cwtchPeer) GetGroups() []string { - cp.mutex.Lock() - defer cp.mutex.Unlock() - return cp.Profile.GetGroups() -} - -// GetGroup returns a pointer to a specific group, nil if no group exists. -// Status: TODO -func (cp *cwtchPeer) GetGroup(groupID string) *model.Group { - cp.mutex.Lock() - defer cp.mutex.Unlock() - return cp.Profile.GetGroup(groupID) + log.Errorf("error creating group: %v", err) + return -1, err } // AddServer takes in a serialized server specification (a bundle of related keys) and adds a contact for the // server assuming there are no errors and the contact doesn't already exist. // TODO in the future this function should also integrate with a trust provider to validate the key bundle. -// Status: TODO +// Status: Ready for 1.5 func (cp *cwtchPeer) AddServer(serverSpecification string) error { - //// This confirms that the server did at least sign the bundle - //keyBundle, err := model.DeserializeAndVerify([]byte(serverSpecification)) - //if err != nil { - // return err - //} - //log.Debugf("Got new key bundle %v", keyBundle.Serialize()) - // - //// TODO if the key bundle is incomplete then error out. In the future we may allow servers to attest to new - //// keys or subsets of keys, but for now they must commit only to a complete set of keys required for Cwtch Groups - //// (that way we can be assured that the keybundle we store is a valid one) - //if !keyBundle.HasKeyType(model.KeyTypeTokenOnion) || !keyBundle.HasKeyType(model.KeyTypeServerOnion) || !keyBundle.HasKeyType(model.KeyTypePrivacyPass) { - // return errors.New("keybundle is incomplete") - //} - // - //if keyBundle.HasKeyType(model.KeyTypeServerOnion) { - // onionKey, _ := keyBundle.GetKey(model.KeyTypeServerOnion) - // onion := string(onionKey) - // - // // Add the contact if we don't already have it - // if cp.GetContact(onion) == nil { - // decodedPub, _ := base32.StdEncoding.DecodeString(strings.ToUpper(onion)) - // ab := keyBundle.AttributeBundle() - // pp := &model.PublicProfile{Name: onion, Ed25519PublicKey: decodedPub, Authorization: model.AuthUnknown, Onion: onion, Attributes: ab} - // - // // The only part of this function that actually modifies the profile... - // cp.mutex.Lock() - // cp.Profile.AddContact(onion, pp) - // cp.mutex.Unlock() - // - // pd, _ := json.Marshal(pp) - // - // // Sync the Storage Engine - // cp.eventBus.Publish(event.NewEvent(event.PeerCreated, map[event.Field]string{ - // event.Data: string(pd), - // event.RemotePeer: onion, - // })) - // } - // - // // At this point we know the server exists - // server := cp.GetContact(onion) - // ab := keyBundle.AttributeBundle() - // - // // Check server bundle for consistency if we have different keys stored than in the tofu bundle then we - // // abort... - // for k, v := range ab { - // val, exists := server.GetAttribute(k) - // if exists { - // if val != v { - // // this is inconsistent! - // return model.InconsistentKeyBundleError - // } - // } - // // we haven't seen this key associated with the server before - // } - // - // // Store the key bundle for the server so we can reconstruct a tofubundle invite - // cp.SetContactAttribute(onion, string(model.BundleType), serverSpecification) - // - // // If we have gotten to this point we can assume this is a safe key bundle signed by the - // // server with no conflicting keys. So we are going to publish all the keys - // for k, v := range ab { - // log.Debugf("Server (%v) has %v key %v", onion, k, v) - // cp.SetContactAttribute(onion, k, v) - // } - // - // return nil - //} + // This confirms that the server did at least sign the bundle + keyBundle, err := model.DeserializeAndVerify([]byte(serverSpecification)) + if err != nil { + return err + } + log.Debugf("Got new key bundle %v", keyBundle.Serialize()) + + // TODO if the key bundle is incomplete then error out. In the future we may allow servers to attest to new + // keys or subsets of keys, but for now they must commit only to a complete set of keys required for Cwtch Groups + // (that way we can be assured that the keybundle we store is a valid one) + if !keyBundle.HasKeyType(model.KeyTypeTokenOnion) || !keyBundle.HasKeyType(model.KeyTypeServerOnion) || !keyBundle.HasKeyType(model.KeyTypePrivacyPass) { + return errors.New("keybundle is incomplete") + } + + if keyBundle.HasKeyType(model.KeyTypeServerOnion) { + onionKey, _ := keyBundle.GetKey(model.KeyTypeServerOnion) + onion := string(onionKey) + + // Add the contact if we don't already have it + conversationInfo, _ := cp.FetchConversationInfo(onion) + if conversationInfo == nil { + cp.NewContactConversation(onion, model.DefaultP2PAccessControl(), true) + } + + conversationInfo, err = cp.FetchConversationInfo(onion) + if conversationInfo != nil && err == nil { + ab := keyBundle.AttributeBundle() + for k, v := range ab { + val, exists := conversationInfo.Attributes[k] + if exists { + if val != v { + // this is inconsistent! + return model.InconsistentKeyBundleError + } + } + // we haven't seen this key associated with the server before + } + + // // If we have gotten to this point we can assume this is a safe key bundle signed by the + // // server with no conflicting keys. So we are going to save all the keys + for k, v := range ab { + cp.SetConversationAttribute(conversationInfo.ID, attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(k)), v) + } + cp.SetConversationAttribute(conversationInfo.ID, attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.BundleType))), serverSpecification) + + } else { + return err + } + return nil + } return nil } @@ -519,31 +495,21 @@ func (cp *cwtchPeer) GetServers() []string { func (cp *cwtchPeer) GetOnion() string { cp.mutex.Lock() defer cp.mutex.Unlock() - return cp.Profile.Onion + onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString()) + return string(onion) } // GetPeerState -// Status: TODO -func (cp *cwtchPeer) GetPeerState(onion string) (connections.ConnectionState, bool) { +// Status: Ready for 1.5 +func (cp *cwtchPeer) GetPeerState(handle string) (connections.ConnectionState, bool) { cp.mutex.Lock() defer cp.mutex.Unlock() - if state, ok := cp.state[onion]; ok { + if state, ok := cp.state[handle]; ok { return state, ok } return connections.DISCONNECTED, false } -// GetGroupState -// Status: TODO -func (cp *cwtchPeer) GetGroupState(groupid string) (connections.ConnectionState, bool) { - cp.mutex.Lock() - defer cp.mutex.Unlock() - if group, ok := cp.Profile.Groups[groupid]; ok { - return connections.ConnectionStateToType()[group.State], true - } - return connections.DISCONNECTED, false -} - // PeerWithOnion initiates a request to the Protocol Engine to set up Cwtch Session with a given tor v3 onion // address. func (cp *cwtchPeer) PeerWithOnion(onion string) { @@ -552,19 +518,81 @@ func (cp *cwtchPeer) PeerWithOnion(onion string) { // InviteOnionToGroup kicks off the invite process // Status: TODO -func (cp *cwtchPeer) InviteOnionToGroup(onion string, groupid string) error { - cp.mutex.Lock() - group := cp.Profile.GetGroup(groupid) - if group == nil { - cp.mutex.Unlock() - return errors.New("invalid group id") +func (cp *cwtchPeer) SendInviteToConversation(conversationID int, inviteConversationID int) error { + var invite model.MessageWrapper + + conversationInfo, err := cp.GetConversationInfo(inviteConversationID) + + if conversationInfo != nil || err != nil { + return err } - invite, err := group.Invite() - cp.mutex.Unlock() - if err == nil { - err = cp.SendMessage(onion, invite) + + // groupServer, isGroup := conversationInfo.Attributes[event.GroupServer]; isGroup { + //bundle, _ := cp.Get(profile.GetGroup(target).GroupServer).GetAttribute(string(model.BundleType)) + //inviteStr, err := profile.GetGroup(target).Invite() + //if err == nil { + // invite = model.MessageWrapper{Overlay: 101, Data: fmt.Sprintf("tofubundle:server:%s||%s", base64.StdEncoding.EncodeToString([]byte(bundle)), inviteStr)} + //} + if tor.IsValidHostname(conversationInfo.Handle) { + invite = model.MessageWrapper{Overlay: 100, Data: conversationInfo.Handle} + } else { + // Reconstruct Group + groupID, ok := conversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)).ToString()] + if !ok { + return errors.New("group structure is malformed") + } + groupServer, ok := conversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)).ToString()] + if !ok { + return errors.New("group structure is malformed") + } + groupKeyBase64, ok := conversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)).ToString()] + if !ok { + return errors.New("group structure is malformed") + } + groupName, ok := conversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.Name)).ToString()] + if !ok { + return errors.New("group structure is malformed") + } + + groupKey, err := base64.StdEncoding.DecodeString(groupKeyBase64) + if err != nil { + return errors.New("malformed group key") + } + + var groupKeyFixed = [32]byte{} + copy(groupKeyFixed[:], groupKey[:]) + + group := model.Group{ + GroupID: groupID, + GroupKey: groupKeyFixed, + GroupServer: groupServer, + } + + groupInvite, err := group.Invite(groupName) + if !ok { + return errors.New("group invite is malformed") + } + + serverInfo, err := cp.FetchConversationInfo(groupServer) + if err != nil { + return errors.New("unknown server associated with group") + } + + bundle, exists := serverInfo.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.BundleType))).ToString()] + if !exists { + return errors.New("server bundle not found") + } + + invite = model.MessageWrapper{Overlay: 101, Data: fmt.Sprintf("tofubundle:server:%s||%s", base64.StdEncoding.EncodeToString([]byte(bundle)), groupInvite)} } - return err + + inviteBytes, err := json.Marshal(invite) + if err != nil { + log.Errorf("malformed invite: %v", err) + } else { + cp.SendMessage(conversationID, string(inviteBytes)) + } + return nil } // JoinServer manages a new server connection with the given onion address @@ -615,7 +643,8 @@ func (cp *cwtchPeer) Listen() { if !cp.listenStatus { log.Infof("cwtchPeer Listen sending ProtocolEngineStartListen\n") cp.listenStatus = true - cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{event.Onion: cp.Profile.Onion})) + onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString()) + cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{event.Onion: string(onion)})) } // else protocol engine is already listening @@ -662,11 +691,11 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time) // TOOD maybe atomize this? ci, err := cp.FetchConversationInfo(handle) if err != nil { - err := cp.NewContactConversation(handle, model.DefaultP2PAccessControl(), false) + id, err := cp.NewContactConversation(handle, model.DefaultP2PAccessControl(), false) if err != nil { return err } - ci, err = cp.FetchConversationInfo(handle) + ci, err = cp.GetConversationInfo(id) if err != nil { return err } @@ -691,13 +720,13 @@ func (cp *cwtchPeer) eventHandler() { case event.ProtocolEngineStopped: cp.mutex.Lock() cp.listenStatus = false - log.Infof("Protocol engine for %v has stopped listening", cp.Profile.Onion) + log.Infof("Protocol engine for %v has stopped listening", cp.GetOnion()) cp.mutex.Unlock() case event.EncryptedGroupMessage: // If successful, a side effect is the message is added to the group's timeline - ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) - signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) + //ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) + //signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) // SECURITY NOTE: A malicious server could insert posts such that everyone always has a different lastKnownSignature // However the server can always replace **all** messages in an attempt to track users @@ -708,42 +737,35 @@ func (cp *cwtchPeer) eventHandler() { //cp.SetConversationAttribute(ev.Data[event.GroupServer], lastKnownSignature, ev.Data[event.Signature]) cp.mutex.Lock() - ok, groupID, message, index := cp.Profile.AttemptDecryption(ciphertext, signature) + //ok, groupID, message, index := cp.Profile.AttemptDecryption(ciphertext, signature) cp.mutex.Unlock() - if ok && index > -1 { - cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: base64.StdEncoding.EncodeToString(message.Signature), event.PreviousSignature: base64.StdEncoding.EncodeToString(message.PreviousMessageSig), event.RemotePeer: message.PeerID, event.Index: strconv.Itoa(index)})) - } - - // The group has been compromised - if !ok && groupID != "" { - if cp.Profile.GetGroup(groupID).IsCompromised { - cp.eventBus.Publish(event.NewEvent(event.GroupCompromised, map[event.Field]string{event.GroupID: groupID})) - } - } + //if ok && index > -1 { + // cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: base64.StdEncoding.EncodeToString(message.Signature), event.PreviousSignature: base64.StdEncoding.EncodeToString(message.PreviousMessageSig), event.RemotePeer: message.PeerID, event.Index: strconv.Itoa(index)})) + //} case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) cp.storeMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ts) case event.PeerAcknowledgement: cp.mutex.Lock() - idx := cp.Profile.AckSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID]) - edata := ev.Data - edata[event.Index] = strconv.Itoa(idx) - cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, edata)) + //idx := cp.Profile.AckSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID]) + //edata := ev.Data + //edata[event.Index] = strconv.Itoa(idx) + //cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, edata)) cp.mutex.Unlock() case event.SendMessageToGroupError: cp.mutex.Lock() - signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) - cp.Profile.AddGroupSentMessageError(ev.Data[event.GroupID], signature, ev.Data[event.Error]) + //signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) + //cp.Profile.AddGroupSentMessageError(ev.Data[event.GroupID], signature, ev.Data[event.Error]) cp.mutex.Unlock() case event.SendMessageToPeerError: cp.mutex.Lock() - idx := cp.Profile.ErrorSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID], ev.Data[event.Error]) - edata := ev.Data - edata[event.Index] = strconv.Itoa(idx) - cp.eventBus.Publish(event.NewEvent(event.IndexedFailure, edata)) + //idx := cp.Profile.ErrorSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID], ev.Data[event.Error]) + //edata := ev.Data + //edata[event.Index] = strconv.Itoa(idx) + //cp.eventBus.Publish(event.NewEvent(event.IndexedFailure, edata)) cp.mutex.Unlock() case event.RetryServerRequest: // Automated Join Server Request triggered by a plugin. diff --git a/peer/cwtchprofilestorage.go b/peer/cwtchprofilestorage.go index d4e2f9d..f47dbf2 100644 --- a/peer/cwtchprofilestorage.go +++ b/peer/cwtchprofilestorage.go @@ -48,6 +48,7 @@ type CwtchProfileStorage struct { db *sql.DB } +// ChannelID encapsulates the data necessary to reference a channel structure. type ChannelID struct { Conversation int Channel int @@ -185,35 +186,51 @@ func (cps *CwtchProfileStorage) LoadProfileKeyValue(keyType StorageKeyType, key } // NewConversation stores a new conversation in the data store -func (cps *CwtchProfileStorage) NewConversation(handle string, attributes model.Attributes, acl model.AccessControlList, accepted bool) error { +func (cps *CwtchProfileStorage) NewConversation(handle string, attributes model.Attributes, acl model.AccessControlList, accepted bool) (int, error) { tx, err := cps.db.Begin() if err != nil { log.Errorf("error executing transaction: %v", err) - return err + return -1, err } result, err := tx.Stmt(cps.insertConversationStmt).Exec(handle, attributes.Serialize(), acl.Serialize(), accepted) if err != nil { log.Errorf("error executing transaction: %v", err) - return tx.Rollback() + return -1, tx.Rollback() } id, err := result.LastInsertId() if err != nil { log.Errorf("error executing transaction: %v", err) - return tx.Rollback() + return -1, tx.Rollback() } - _, err = tx.Exec(fmt.Sprintf(createTableConversationMessagesSQLStmt, id)) + result, err = tx.Exec(fmt.Sprintf(createTableConversationMessagesSQLStmt, id)) if err != nil { log.Errorf("error executing transaction: %v", err) - return tx.Rollback() + return -1, tx.Rollback() } - return tx.Commit() + conversationID, err := result.LastInsertId() + if err != nil { + log.Errorf("error executing transaction: %v", err) + return -1, tx.Rollback() + } + + err = tx.Commit() + if err != nil { + log.Errorf("error executing transaction: %v", err) + return -1, tx.Rollback() + } + + return int(conversationID), nil } +// GetConversationByHandle is a convienance method to fetch an active conversation by a handle +// Usage Notes: This should **only** be used to look up p2p conversations by convention. +// Ideally this function should not exist, and all lookups should happen by ID (this is currently +// unavoidable in some circumstances because the event bus references conversations by handle, not by id) func (cps *CwtchProfileStorage) GetConversationByHandle(handle string) (*model.Conversation, error) { rows, err := cps.selectConversationByHandleStmt.Query(handle) if err != nil { @@ -242,6 +259,9 @@ func (cps *CwtchProfileStorage) GetConversationByHandle(handle string) (*model.C return &model.Conversation{ID: id, Handle: handle, ACL: model.DeserializeAccessControlList(acl), Attributes: model.DeserializeAttributes(attributes), Accepted: accepted}, nil } +// FetchConversations returns *all* active conversations. This method should only be called +// on app start up to build a summary of conversations for the UI. Any further updates should be integrated +// through the event bus. func (cps *CwtchProfileStorage) FetchConversations() ([]*model.Conversation, error) { rows, err := cps.fetchAllConversationsStmt.Query() if err != nil { @@ -324,6 +344,7 @@ func (cps *CwtchProfileStorage) DeleteConversation(id int) error { return nil } +// SetConversationAttribute sets a new attribute on a given conversation. func (cps *CwtchProfileStorage) SetConversationAttribute(id int, path attr.ScopedZonedPath, value string) error { ci, err := cps.GetConversation(id) if err != nil { @@ -338,6 +359,7 @@ func (cps *CwtchProfileStorage) SetConversationAttribute(id int, path attr.Scope return nil } +// InsertMessage appends a message to a conversation channel, with a given set of attributes func (cps *CwtchProfileStorage) InsertMessage(conversation int, channel int, body string, attributes model.Attributes) error { channelID := ChannelID{Conversation: conversation, Channel: channel} @@ -361,6 +383,8 @@ func (cps *CwtchProfileStorage) InsertMessage(conversation int, channel int, bod return nil } +// GetChannelMessage looks up a channel message by conversation, channel and message id. On success it +// returns the message body and the attributes associated with the message. Otherwise an error is returned. func (cps *CwtchProfileStorage) GetChannelMessage(conversation int, channel int, messageID int) (string, model.Attributes, error) { channelID := ChannelID{Conversation: conversation, Channel: channel} diff --git a/peer/profile_interface.go b/peer/profile_interface.go index 2d472ad..b740099 100644 --- a/peer/profile_interface.go +++ b/peer/profile_interface.go @@ -5,7 +5,6 @@ import ( "cwtch.im/cwtch/model" "cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/protocol/connections" - "git.openprivacy.ca/cwtch.im/tapir/primitives" "git.openprivacy.ca/openprivacy/connectivity" ) @@ -33,18 +32,10 @@ type ReadServers interface { GetServers() []string } -// ReadGroups provides read-only access to group state -type ReadGroups interface { - GetGroup(string) *model.Group - GetGroupState(string) (connections.ConnectionState, bool) - GetGroups() []string - ExportGroup(string) (string, error) -} - // ModifyGroups provides write-only access add/edit/remove new groups type ModifyGroups interface { - ImportGroup(string) (string, error) - StartGroup(string) (string, string, error) + ImportGroup(string) (int, error) + StartGroup(string) (int, error) } // ModifyServers provides write-only access to servers @@ -55,16 +46,9 @@ type ModifyServers interface { // SendMessages enables a caller to sender messages to a contact type SendMessages interface { - SendMessage(handle string, message string) error - - // Deprecated: is unsafe - SendGetValToPeer(string, string, string) - + SendMessage(conversation int, message string) error + SendInviteToConversation(conversationID int, inviteConversationID int) error SendScopedZonedGetValToContact(handle string, scope attr.Scope, zone attr.Zone, key string) - - // TODO - // Deprecated use overlays instead - InviteOnionToGroup(string, string) error } // ModifyMessages enables a caller to modify the messages in a timeline @@ -79,8 +63,8 @@ type CwtchPeer interface { // Core Cwtch Peer Functions that should not be exposed to // most functions Init(event.Manager) - GetIdentity() primitives.Identity - GenerateProtocolEngine(acn connectivity.ACN, bus event.Manager) connections.Engine + + GenerateProtocolEngine(acn connectivity.ACN, bus event.Manager) (connections.Engine, error) AutoHandleEvents(events []event.Type) Listen() @@ -105,7 +89,6 @@ type CwtchPeer interface { AccessPeeringState ModifyPeeringState - ReadGroups ModifyGroups ReadServers @@ -115,8 +98,9 @@ type CwtchPeer interface { ModifyMessages // New Unified Conversation Interfaces - NewContactConversation(handle string, acl model.AccessControl, accepted bool) error + NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error) FetchConversations() ([]*model.Conversation, error) + GetConversationInfo(conversation int) (*model.Conversation, error) FetchConversationInfo(handle string) (*model.Conversation, error) AcceptConversation(conversation int) error SetConversationAttribute(conversation int, path attr.ScopedZonedPath, value string) error diff --git a/storage/profile_store.go b/storage/profile_store.go index a6ddae7..512c3c2 100644 --- a/storage/profile_store.go +++ b/storage/profile_store.go @@ -3,12 +3,7 @@ package storage import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" - "cwtch.im/cwtch/storage/v0" "cwtch.im/cwtch/storage/v1" - "git.openprivacy.ca/openprivacy/log" - "io/ioutil" - "path" - "strconv" ) const profileFilename = "profile" @@ -17,11 +12,8 @@ const currentVersion = 1 // ProfileStore is an interface to managing the storage of Cwtch Profiles type ProfileStore interface { - Shutdown() - Delete() GetProfileCopy(timeline bool) *model.Profile GetNewPeerMessage() *event.Event - GetStatusMessages() []*event.Event CheckPassword(string) bool } @@ -34,8 +26,6 @@ func CreateProfileWriterStore(eventManager event.Manager, directory, password st // LoadProfileWriterStore loads a profile store from filestore listening for events and saving them // directory should be $appDir/profiles/$rand func LoadProfileWriterStore(eventManager event.Manager, directory, password string) (ProfileStore, error) { - versionCheckUpgrade(directory, password) - return v1.LoadProfileWriterStore(eventManager, directory, password) } @@ -53,42 +43,3 @@ func NewProfile(name string) *model.Profile { } // ********* Versioning and upgrade ********** - -func detectVersion(directory string) int { - vnumberStr, err := ioutil.ReadFile(path.Join(directory, versionFile)) - if err != nil { - return 0 - } - vnumber, err := strconv.Atoi(string(vnumberStr)) - if err != nil { - log.Errorf("Could not parse VERSION file contents: '%v' - %v\n", vnumber, err) - return -1 - } - return vnumber -} - -func upgradeV0ToV1(directory, password string) error { - log.Debugln("Attempting storage v0 to v1: Reading v0 profile...") - profile, err := v0.ReadProfile(directory, password) - if err != nil { - return err - } - - log.Debugln("Attempting storage v0 to v1: Writing v1 profile...") - return v1.UpgradeV0Profile(profile, directory, password) -} - -func versionCheckUpgrade(directory, password string) { - version := detectVersion(directory) - log.Debugf("versionCheck: %v\n", version) - if version == -1 { - return - } - if version == 0 { - err := upgradeV0ToV1(directory, password) - if err != nil { - return - } - //version = 1 - } -} diff --git a/storage/profile_store_test.go b/storage/profile_store_test.go deleted file mode 100644 index ed71ad8..0000000 --- a/storage/profile_store_test.go +++ /dev/null @@ -1,76 +0,0 @@ -// Known race issue with event bus channel closure - -package storage - -import ( - "cwtch.im/cwtch/event" - "cwtch.im/cwtch/model" - "cwtch.im/cwtch/storage/v0" - "fmt" - "git.openprivacy.ca/openprivacy/log" - "os" - "testing" - "time" -) - -const testingDir = "./testing" -const filenameBase = "testStream" -const password = "asdfqwer" -const line1 = "Hello from storage!" -const testProfileName = "Alice" -const testKey = "key" -const testVal = "value" -const testInitialMessage = "howdy" -const testMessage = "Hello from storage" - -func TestProfileStoreUpgradeV0toV1(t *testing.T) { - log.SetLevel(log.LevelDebug) - os.RemoveAll(testingDir) - eventBus := event.NewEventManager() - - queue := event.NewQueue() - eventBus.Subscribe(event.ChangePasswordSuccess, queue) - - fmt.Println("Creating and initializing v0 profile and store...") - profile := NewProfile(testProfileName) - profile.AddContact("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd", &model.PublicProfile{Attributes: map[string]string{string(model.KeyTypeServerOnion): "2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd"}}) - - ps1 := v0.NewProfileWriterStore(eventBus, testingDir, password, profile) - - groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") - if err != nil { - t.Errorf("Creating group: %v\n", err) - } - if err != nil { - t.Errorf("Creating group invite: %v\n", err) - } - - ps1.AddGroup(invite) - - fmt.Println("Sending 200 messages...") - - for i := 0; i < 200; i++ { - ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), profile.Onion, testMessage, []byte{byte(i)}) - } - - fmt.Println("Shutdown v0 profile store...") - ps1.Shutdown() - - fmt.Println("New v1 Profile store...") - ps2, err := LoadProfileWriterStore(eventBus, testingDir, password) - if err != nil { - t.Errorf("Error createing new profileStore with new password: %v\n", err) - return - } - - profile2 := ps2.GetProfileCopy(true) - - if profile2.Groups[groupid] == nil { - t.Errorf("Failed to load group %v\n", groupid) - return - } - - if len(profile2.Groups[groupid].Timeline.Messages) != 200 { - t.Errorf("Failed to load group's 200 messages, instead got %v\n", len(profile2.Groups[groupid].Timeline.Messages)) - } -} diff --git a/storage/v0/file_enc.go b/storage/v0/file_enc.go deleted file mode 100644 index 5b885bd..0000000 --- a/storage/v0/file_enc.go +++ /dev/null @@ -1,70 +0,0 @@ -package v0 - -import ( - "crypto/rand" - "errors" - "git.openprivacy.ca/openprivacy/log" - "golang.org/x/crypto/nacl/secretbox" - "golang.org/x/crypto/pbkdf2" - "golang.org/x/crypto/sha3" - "io" - "io/ioutil" - "path" -) - -// createKey derives a key from a password -func createKey(password string) ([32]byte, [128]byte, error) { - var salt [128]byte - if _, err := io.ReadFull(rand.Reader, salt[:]); err != nil { - log.Errorf("Cannot read from random: %v\n", err) - return [32]byte{}, salt, err - } - dk := pbkdf2.Key([]byte(password), salt[:], 4096, 32, sha3.New512) - - var dkr [32]byte - copy(dkr[:], dk) - return dkr, salt, nil -} - -//encryptFileData encrypts the cwtchPeer via the specified key. -func encryptFileData(data []byte, key [32]byte) ([]byte, error) { - var nonce [24]byte - - if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil { - log.Errorf("Cannot read from random: %v\n", err) - return nil, err - } - - encrypted := secretbox.Seal(nonce[:], data, &nonce, &key) - return encrypted, nil -} - -//decryptFile decrypts the passed ciphertext into a cwtchPeer via the specified key. -func decryptFile(ciphertext []byte, key [32]byte) ([]byte, error) { - var decryptNonce [24]byte - copy(decryptNonce[:], ciphertext[:24]) - decrypted, ok := secretbox.Open(nil, ciphertext[24:], &decryptNonce, &key) - if ok { - return decrypted, nil - } - return nil, errors.New("Failed to decrypt") -} - -// Load instantiates a cwtchPeer from the file store -func readEncryptedFile(directory, filename, password string) ([]byte, error) { - encryptedbytes, err := ioutil.ReadFile(path.Join(directory, filename)) - if err == nil && len(encryptedbytes) > 128 { - var dkr [32]byte - //Separate the salt from the encrypted bytes, then generate the derived key - salt, encryptedbytes := encryptedbytes[0:128], encryptedbytes[128:] - dk := pbkdf2.Key([]byte(password), salt, 4096, 32, sha3.New512) - copy(dkr[:], dk) - - data, err := decryptFile(encryptedbytes, dkr) - if err == nil { - return data, nil - } - return nil, err - } - return nil, err -} diff --git a/storage/v0/file_store.go b/storage/v0/file_store.go deleted file mode 100644 index 06b6ee1..0000000 --- a/storage/v0/file_store.go +++ /dev/null @@ -1,46 +0,0 @@ -package v0 - -import ( - "io/ioutil" - "path" -) - -// fileStore stores a cwtchPeer in an encrypted file -type fileStore struct { - directory string - filename string - password string -} - -// FileStore is a primitive around storing encrypted files -type FileStore interface { - Read() ([]byte, error) - Write(data []byte) error -} - -// NewFileStore instantiates a fileStore given a filename and a password -func NewFileStore(directory string, filename string, password string) FileStore { - filestore := new(fileStore) - filestore.password = password - filestore.filename = filename - filestore.directory = directory - return filestore -} - -func (fps *fileStore) Read() ([]byte, error) { - return readEncryptedFile(fps.directory, fps.filename, fps.password) -} - -// write serializes a cwtchPeer to a file -func (fps *fileStore) Write(data []byte) error { - key, salt, _ := createKey(fps.password) - encryptedbytes, err := encryptFileData(data, key) - if err != nil { - return err - } - - // the salt for the derived key is appended to the front of the file - encryptedbytes = append(salt[:], encryptedbytes...) - err = ioutil.WriteFile(path.Join(fps.directory, fps.filename), encryptedbytes, 0600) - return err -} diff --git a/storage/v0/profile_store.go b/storage/v0/profile_store.go deleted file mode 100644 index f5aeb9d..0000000 --- a/storage/v0/profile_store.go +++ /dev/null @@ -1,120 +0,0 @@ -package v0 - -import ( - "cwtch.im/cwtch/event" - "cwtch.im/cwtch/model" - "encoding/json" - "fmt" - "os" - "time" -) - -const groupIDLen = 32 -const peerIDLen = 56 -const profileFilename = "profile" - -// ProfileStoreV0 is a legacy profile store used now for upgrading legacy profile stores to newer versions -type ProfileStoreV0 struct { - fs FileStore - streamStores map[string]StreamStore // map [groupId|onion] StreamStore - directory string - password string - profile *model.Profile -} - -// NewProfileWriterStore returns a profile store backed by a filestore listening for events and saving them -// directory should be $appDir/profiles/$rand -func NewProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) *ProfileStoreV0 { - os.Mkdir(directory, 0700) - ps := &ProfileStoreV0{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, streamStores: map[string]StreamStore{}} - if profile != nil { - ps.save() - } - - return ps -} - -// ReadProfile reads a profile from storqage and returns the profile -// directory should be $appDir/profiles/$rand -func ReadProfile(directory, password string) (*model.Profile, error) { - os.Mkdir(directory, 0700) - ps := &ProfileStoreV0{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: nil, streamStores: map[string]StreamStore{}} - - err := ps.Load() - if err != nil { - return nil, err - } - - profile := ps.getProfileCopy(true) - - return profile, nil -} - -/********************************************************************************************/ - -// AddGroup For testing, adds a group to the profile (and starts a stream store) -func (ps *ProfileStoreV0) AddGroup(invite string) { - gid, err := ps.profile.ProcessInvite(invite) - if err == nil { - ps.save() - group := ps.profile.Groups[gid] - ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password) - } -} - -// AddGroupMessage for testing, adds a group message -func (ps *ProfileStoreV0) AddGroupMessage(groupid string, timeSent, timeRecvied string, remotePeer, data string, signature []byte) { - received, _ := time.Parse(time.RFC3339Nano, timeRecvied) - sent, _ := time.Parse(time.RFC3339Nano, timeSent) - message := model.Message{Received: received, Timestamp: sent, Message: data, PeerID: remotePeer, Signature: signature, PreviousMessageSig: []byte("PreviousSignature")} - ss, exists := ps.streamStores[groupid] - if exists { - ss.Write(message) - } else { - fmt.Println("ERROR") - } -} - -// GetNewPeerMessage is for AppService to call on Reload events, to reseed the AppClient with the loaded peers -func (ps *ProfileStoreV0) GetNewPeerMessage() *event.Event { - message := event.NewEventList(event.NewPeer, event.Identity, ps.profile.LocalID, event.Password, ps.password, event.Status, "running") - return &message -} - -// Load instantiates a cwtchPeer from the file store -func (ps *ProfileStoreV0) Load() error { - decrypted, err := ps.fs.Read() - if err != nil { - return err - } - cp := new(model.Profile) - err = json.Unmarshal(decrypted, &cp) - if err == nil { - ps.profile = cp - - for gid, group := range cp.Groups { - ss := NewStreamStore(ps.directory, group.LocalID, ps.password) - - cp.Groups[gid].Timeline.SetMessages(ss.Read()) - ps.streamStores[group.GroupID] = ss - } - } - - return err -} - -func (ps *ProfileStoreV0) getProfileCopy(timeline bool) *model.Profile { - return ps.profile.GetCopy(timeline) -} - -// Shutdown saves the storage system -func (ps *ProfileStoreV0) Shutdown() { - ps.save() -} - -/************* Writing *************/ - -func (ps *ProfileStoreV0) save() error { - bytes, _ := json.Marshal(ps.profile) - return ps.fs.Write(bytes) -} diff --git a/storage/v0/profile_store_test.go b/storage/v0/profile_store_test.go deleted file mode 100644 index cd5db41..0000000 --- a/storage/v0/profile_store_test.go +++ /dev/null @@ -1,70 +0,0 @@ -// Known race issue with event bus channel closure - -package v0 - -import ( - "cwtch.im/cwtch/event" - "cwtch.im/cwtch/model" - "log" - "os" - "testing" - "time" -) - -const testProfileName = "Alice" -const testKey = "key" -const testVal = "value" -const testInitialMessage = "howdy" -const testMessage = "Hello from storage" - -// NewProfile creates a new profile for use in the profile store. -func NewProfile(name string) *model.Profile { - profile := model.GenerateNewProfile(name) - return profile -} - -func TestProfileStoreWriteRead(t *testing.T) { - log.Println("profile store test!") - os.RemoveAll(testingDir) - eventBus := event.NewEventManager() - profile := NewProfile(testProfileName) - ps1 := NewProfileWriterStore(eventBus, testingDir, password, profile) - - profile.SetAttribute(testKey, testVal) - - groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") - if err != nil { - t.Errorf("Creating group: %v\n", err) - } - if err != nil { - t.Errorf("Creating group invite: %v\n", err) - } - - ps1.AddGroup(invite) - - ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), ps1.getProfileCopy(true).Onion, testMessage, []byte{byte(0x01)}) - - ps1.Shutdown() - - ps2 := NewProfileWriterStore(eventBus, testingDir, password, nil) - err = ps2.Load() - if err != nil { - t.Errorf("Error createing ProfileStoreV0: %v\n", err) - } - - profile = ps2.getProfileCopy(true) - if profile.Name != testProfileName { - t.Errorf("Profile name from loaded profile incorrect. Expected: '%v' Actual: '%v'\n", testProfileName, profile.Name) - } - - v, _ := profile.GetAttribute(testKey) - if v != testVal { - t.Errorf("Profile attribute '%v' incorrect. Expected: '%v' Actual: '%v'\n", testKey, testVal, v) - } - - group2 := ps2.getProfileCopy(true).Groups[groupid] - if group2 == nil { - t.Errorf("Group not loaded\n") - } - -} diff --git a/storage/v0/stream_store.go b/storage/v0/stream_store.go deleted file mode 100644 index 22bf385..0000000 --- a/storage/v0/stream_store.go +++ /dev/null @@ -1,145 +0,0 @@ -package v0 - -import ( - "cwtch.im/cwtch/model" - "encoding/json" - "fmt" - "git.openprivacy.ca/openprivacy/log" - "io/ioutil" - "os" - "path" - "sync" -) - -const ( - fileStorePartitions = 16 - bytesPerFile = 15 * 1024 -) - -// streamStore is a file-backed implementation of StreamStore using an in memory buffer of ~16KB and a rotating set of files -type streamStore struct { - password string - - storeDirectory string - filenameBase string - - lock sync.Mutex - - // Buffer is used just for current file to write to - messages []model.Message - bufferByteCount int -} - -// StreamStore provides a stream like interface to encrypted storage -type StreamStore interface { - Read() []model.Message - Write(m model.Message) -} - -// NewStreamStore returns an initialized StreamStore ready for reading and writing -func NewStreamStore(directory string, filenameBase string, password string) (store StreamStore) { - ss := &streamStore{storeDirectory: directory, filenameBase: filenameBase, password: password} - os.Mkdir(ss.storeDirectory, 0700) - - ss.initBuffer() - - return ss -} - -// Read returns all messages from the backing file (not the buffer, for writing to the current file) -func (ss *streamStore) Read() (messages []model.Message) { - ss.lock.Lock() - defer ss.lock.Unlock() - - resp := []model.Message{} - - for i := fileStorePartitions - 1; i >= 0; i-- { - filename := fmt.Sprintf("%s.%d", ss.filenameBase, i) - - bytes, err := readEncryptedFile(ss.storeDirectory, filename, ss.password) - if err != nil { - continue - } - - msgs := []model.Message{} - json.Unmarshal([]byte(bytes), &msgs) - resp = append(resp, msgs...) - } - - // 2019.10.10 "Acknowledged" & "ReceivedByServer" are added to the struct, populate it as true for old ones without - for i := 0; i < len(resp) && (resp[i].Acknowledged == false && resp[i].ReceivedByServer == false); i++ { - resp[i].Acknowledged = true - resp[i].ReceivedByServer = true - } - - return resp -} - -// ****** Writing *******/ - -func (ss *streamStore) WriteN(messages []model.Message) { - ss.lock.Lock() - defer ss.lock.Unlock() - - for _, m := range messages { - ss.updateBuffer(m) - - if ss.bufferByteCount > bytesPerFile { - ss.updateFile() - log.Debugf("rotating log file") - ss.rotateFileStore() - ss.initBuffer() - } - } -} - -// Write adds a GroupMessage to the store -func (ss *streamStore) Write(m model.Message) { - ss.lock.Lock() - defer ss.lock.Unlock() - ss.updateBuffer(m) - ss.updateFile() - - if ss.bufferByteCount > bytesPerFile { - log.Debugf("rotating log file") - ss.rotateFileStore() - ss.initBuffer() - } -} - -func (ss *streamStore) initBuffer() { - ss.messages = []model.Message{} - ss.bufferByteCount = 0 -} - -func (ss *streamStore) updateBuffer(m model.Message) { - ss.messages = append(ss.messages, m) - ss.bufferByteCount += (104 * 1.5) + len(m.Message) -} - -func (ss *streamStore) updateFile() error { - msgs, err := json.Marshal(ss.messages) - if err != nil { - log.Errorf("Failed to marshal group messages %v\n", err) - } - - // ENCRYPT - key, salt, _ := createKey(ss.password) - encryptedMsgs, err := encryptFileData(msgs, key) - if err != nil { - log.Errorf("Failed to encrypt messages: %v\n", err) - return err - } - encryptedMsgs = append(salt[:], encryptedMsgs...) - - ioutil.WriteFile(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, 0)), encryptedMsgs, 0700) - return nil -} - -func (ss *streamStore) rotateFileStore() { - os.Remove(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, fileStorePartitions-1))) - - for i := fileStorePartitions - 2; i >= 0; i-- { - os.Rename(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i)), path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i+1))) - } -} diff --git a/storage/v0/stream_store_test.go b/storage/v0/stream_store_test.go deleted file mode 100644 index 7632f36..0000000 --- a/storage/v0/stream_store_test.go +++ /dev/null @@ -1,50 +0,0 @@ -package v0 - -import ( - "cwtch.im/cwtch/model" - "os" - "testing" -) - -const testingDir = "./testing" -const filenameBase = "testStream" -const password = "asdfqwer" -const line1 = "Hello from storage!" - -func TestStreamStoreWriteRead(t *testing.T) { - os.Remove(".test.json") - os.RemoveAll(testingDir) - os.Mkdir(testingDir, 0777) - ss1 := NewStreamStore(testingDir, filenameBase, password) - m := model.Message{Message: line1} - ss1.Write(m) - - ss2 := NewStreamStore(testingDir, filenameBase, password) - messages := ss2.Read() - if len(messages) != 1 { - t.Errorf("Read messages has wrong length. Expected: 1 Actual: %d\n", len(messages)) - } - if messages[0].Message != line1 { - t.Errorf("Read message has wrong content. Expected: '%v' Actual: '%v'\n", line1, messages[0].Message) - } -} - -func TestStreamStoreWriteReadRotate(t *testing.T) { - os.Remove(".test.json") - os.RemoveAll(testingDir) - os.Mkdir(testingDir, 0777) - ss1 := NewStreamStore(testingDir, filenameBase, password) - m := model.Message{Message: line1} - for i := 0; i < 400; i++ { - ss1.Write(m) - } - - ss2 := NewStreamStore(testingDir, filenameBase, password) - messages := ss2.Read() - if len(messages) != 400 { - t.Errorf("Read messages has wrong length. Expected: 400 Actual: %d\n", len(messages)) - } - if messages[0].Message != line1 { - t.Errorf("Read message has wrong content. Expected: '%v' Actual: '%v'\n", line1, messages[0].Message) - } -} diff --git a/storage/v1/profile_store.go b/storage/v1/profile_store.go index a76c9cf..30c9808 100644 --- a/storage/v1/profile_store.go +++ b/storage/v1/profile_store.go @@ -3,18 +3,13 @@ package v1 import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" - "encoding/base64" "encoding/json" "git.openprivacy.ca/openprivacy/log" "io/ioutil" "os" "path" - "strconv" - "time" ) -const groupIDLen = 32 -const peerIDLen = 56 const profileFilename = "profile" const version = "1" const versionFile = "VERSION" @@ -22,15 +17,11 @@ const saltFile = "SALT" //ProfileStoreV1 storage for profiles and message streams that uses in memory key and fs stored salt instead of in memory password type ProfileStoreV1 struct { - fs FileStore - streamStores map[string]StreamStore // map [groupId|onion] StreamStore - directory string - profile *model.Profile - key [32]byte - salt [128]byte - eventManager event.Manager - queue event.Queue - writer bool + fs FileStore + directory string + profile *model.Profile + key [32]byte + salt [128]byte } // CheckPassword returns true if the given password produces the same key as the current stored key, otherwise false. @@ -70,39 +61,11 @@ func CreateProfileWriterStore(eventManager event.Manager, directory, password st return nil } - ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true} - ps.save() - - ps.initProfileWriterStore() + ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: profile} return ps } -func (ps *ProfileStoreV1) initProfileWriterStore() { - ps.queue = event.NewQueue() - go ps.eventHandler() - - ps.eventManager.Subscribe(event.SetPeerAuthorization, ps.queue) - ps.eventManager.Subscribe(event.PeerCreated, ps.queue) - ps.eventManager.Subscribe(event.GroupCreated, ps.queue) - ps.eventManager.Subscribe(event.SetAttribute, ps.queue) - ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue) - ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue) - ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue) - ps.eventManager.Subscribe(event.RejectGroupInvite, ps.queue) - ps.eventManager.Subscribe(event.NewGroup, ps.queue) - ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue) - ps.eventManager.Subscribe(event.SendMessageToPeer, ps.queue) - ps.eventManager.Subscribe(event.PeerAcknowledgement, ps.queue) - ps.eventManager.Subscribe(event.NewMessageFromPeer, ps.queue) - ps.eventManager.Subscribe(event.PeerStateChange, ps.queue) - ps.eventManager.Subscribe(event.ServerStateChange, ps.queue) - ps.eventManager.Subscribe(event.DeleteContact, ps.queue) - ps.eventManager.Subscribe(event.DeleteGroup, ps.queue) - ps.eventManager.Subscribe(event.ChangePassword, ps.queue) - ps.eventManager.Subscribe(event.UpdateMessageFlags, ps.queue) -} - // LoadProfileWriterStore loads a profile store from filestore listening for events and saving them // directory should be $appDir/profiles/$rand func LoadProfileWriterStore(eventManager event.Manager, directory, password string) (*ProfileStoreV1, error) { @@ -113,7 +76,7 @@ func LoadProfileWriterStore(eventManager event.Manager, directory, password stri key := CreateKey(password, salt) - ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, directory: directory, profile: nil, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true} + ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, directory: directory, profile: nil} copy(ps.salt[:], salt) err = ps.load() @@ -121,7 +84,6 @@ func LoadProfileWriterStore(eventManager event.Manager, directory, password stri return nil, err } - ps.initProfileWriterStore() return ps, nil } @@ -129,7 +91,7 @@ func LoadProfileWriterStore(eventManager event.Manager, directory, password stri // directory should be $appDir/profiles/$rand func ReadProfile(directory string, key [32]byte, salt [128]byte) (*model.Profile, error) { os.Mkdir(directory, 0700) - ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: nil, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true} + ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: nil} err := ps.load() if err != nil { @@ -141,24 +103,6 @@ func ReadProfile(directory string, key [32]byte, salt [128]byte) (*model.Profile return profile, nil } -// UpgradeV0Profile takes a profile (presumably from a V0 store) and creates and writes a V1 store -func UpgradeV0Profile(profile *model.Profile, directory, password string) error { - key, salt, err := InitV1Directory(directory, password) - if err != nil { - return err - } - - ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: profile, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true} - ps.save() - - for gid, group := range ps.profile.Groups { - ss := NewStreamStore(ps.directory, group.LocalID, ps.key) - ss.WriteN(ps.profile.Groups[gid].Timeline.Messages) - } - - return nil -} - // NewProfile creates a new profile for use in the profile store. func NewProfile(name string) *model.Profile { profile := model.GenerateNewProfile(name) @@ -171,113 +115,6 @@ func (ps *ProfileStoreV1) GetNewPeerMessage() *event.Event { return &message } -// GetStatusMessages creates an array of status messages for all peers and group servers from current information -func (ps *ProfileStoreV1) GetStatusMessages() []*event.Event { - messages := []*event.Event{} - for _, contact := range ps.profile.Contacts { - message := event.NewEvent(event.PeerStateChange, map[event.Field]string{ - event.RemotePeer: string(contact.Onion), - event.ConnectionState: contact.State, - }) - messages = append(messages, &message) - } - - doneServers := make(map[string]bool) - for _, group := range ps.profile.Groups { - if _, exists := doneServers[group.GroupServer]; !exists { - message := event.NewEvent(event.ServerStateChange, map[event.Field]string{ - event.GroupServer: string(group.GroupServer), - event.ConnectionState: group.State, - }) - messages = append(messages, &message) - doneServers[group.GroupServer] = true - } - } - - return messages -} - -// ChangePassword restores all data under a new password's encryption -func (ps *ProfileStoreV1) ChangePassword(oldpass, newpass, eventID string) { - oldkey := CreateKey(oldpass, ps.salt[:]) - - if oldkey != ps.key { - ps.eventManager.Publish(event.NewEventList(event.ChangePasswordError, event.Error, "Supplied current password does not match", event.EventID, eventID)) - return - } - - newkey := CreateKey(newpass, ps.salt[:]) - - newStreamStores := map[string]StreamStore{} - idToNewLocalID := map[string]string{} - - // Generate all new StreamStores with the new password and write all the old StreamStore data into these ones - for ssid, ss := range ps.streamStores { - // New ss with new pass and new localID - newlocalID := model.GenerateRandomID() - idToNewLocalID[ssid] = newlocalID - - newSS := NewStreamStore(ps.directory, newlocalID, newkey) - newStreamStores[ssid] = newSS - - // write whole store - messages := ss.Read() - newSS.WriteN(messages) - } - - // Switch over - oldStreamStores := ps.streamStores - ps.streamStores = newStreamStores - for ssid, newLocalID := range idToNewLocalID { - if len(ssid) == groupIDLen { - ps.profile.Groups[ssid].LocalID = newLocalID - } else { - if ps.profile.Contacts[ssid] != nil { - ps.profile.Contacts[ssid].LocalID = newLocalID - } else { - log.Errorf("Unknown Contact: %v. This is probably the result of corrupted development data from fuzzing. This contact will not appear in the new profile.", ssid) - } - } - } - - ps.key = newkey - ps.fs.ChangeKey(newkey) - ps.save() - - // Clean up - for _, oldss := range oldStreamStores { - oldss.Delete() - } - - ps.eventManager.Publish(event.NewEventList(event.ChangePasswordSuccess, event.EventID, eventID)) - return -} - -func (ps *ProfileStoreV1) save() error { - if ps.writer { - bytes, _ := json.Marshal(ps.profile) - return ps.fs.Write(bytes) - } - - return nil -} - -func (ps *ProfileStoreV1) regenStreamStore(messages []model.Message, contact string) { - oldss := ps.streamStores[contact] - newLocalID := model.GenerateRandomID() - newSS := NewStreamStore(ps.directory, newLocalID, ps.key) - newSS.WriteN(messages) - if len(contact) == groupIDLen { - ps.profile.Groups[contact].LocalID = newLocalID - } else { - // We can assume this exists as regen stream store should only happen to *update* a message - ps.profile.Contacts[contact].LocalID = newLocalID - } - ps.streamStores[contact] = newSS - ps.save() - oldss.Delete() -} - // load instantiates a cwtchPeer from the file store func (ps *ProfileStoreV1) load() error { decrypted, err := ps.fs.Read() @@ -310,7 +147,6 @@ func (ps *ProfileStoreV1) load() error { if saveHistory == event.SaveHistoryConfirmed { ss := NewStreamStore(ps.directory, contact.LocalID, ps.key) cp.Contacts[contact.Onion].Timeline.SetMessages(ss.Read()) - ps.streamStores[contact.Onion] = ss } } @@ -320,15 +156,10 @@ func (ps *ProfileStoreV1) load() error { delete(cp.Groups, gid) continue } - ss := NewStreamStore(ps.directory, group.LocalID, ps.key) - cp.Groups[gid].Timeline.SetMessages(ss.Read()) cp.Groups[gid].Timeline.Sort() - ps.streamStores[group.GroupID] = ss } - - ps.save() } return err @@ -338,238 +169,3 @@ func (ps *ProfileStoreV1) load() error { func (ps *ProfileStoreV1) GetProfileCopy(timeline bool) *model.Profile { return ps.profile.GetCopy(timeline) } - -func (ps *ProfileStoreV1) eventHandler() { - for { - ev := ps.queue.Next() - log.Debugf("eventHandler event %v %v\n", ev.EventType, ev.EventID) - - switch ev.EventType { - case event.SetPeerAuthorization: - err := ps.profile.SetContactAuthorization(ev.Data[event.RemotePeer], model.Authorization(ev.Data[event.Authorization])) - if err == nil { - ps.save() - } - case event.PeerCreated: - var pp *model.PublicProfile - json.Unmarshal([]byte(ev.Data[event.Data]), &pp) - ps.profile.AddContact(ev.Data[event.RemotePeer], pp) - case event.GroupCreated: - var group *model.Group - json.Unmarshal([]byte(ev.Data[event.Data]), &group) - ps.profile.AddGroup(group) - ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.key) - ps.save() - case event.SetAttribute: - ps.profile.SetAttribute(ev.Data[event.Key], ev.Data[event.Data]) - ps.save() - case event.SetPeerAttribute: - contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer]) - if exists { - contact.SetAttribute(ev.Data[event.Key], ev.Data[event.Data]) - ps.save() - - switch ev.Data[event.Key] { - case event.SaveHistoryKey: - if event.DeleteHistoryConfirmed == ev.Data[event.Data] { - ss, exists := ps.streamStores[ev.Data[event.RemotePeer]] - if exists { - ss.Delete() - delete(ps.streamStores, ev.Data[event.RemotePeer]) - } - } else if event.SaveHistoryConfirmed == ev.Data[event.Data] { - _, exists := ps.streamStores[ev.Data[event.RemotePeer]] - if !exists { - ss := NewStreamStore(ps.directory, contact.LocalID, ps.key) - ps.streamStores[ev.Data[event.RemotePeer]] = ss - } - } - default: - { - } - } - - } else { - log.Errorf("error setting attribute on peer %v peer does not exist", ev) - } - case event.SetGroupAttribute: - group := ps.profile.GetGroup(ev.Data[event.GroupID]) - if group != nil { - group.SetAttribute(ev.Data[event.Key], ev.Data[event.Data]) - ps.save() - } else { - log.Errorf("error setting attribute on group %v group does not exist", ev) - } - case event.AcceptGroupInvite: - err := ps.profile.AcceptInvite(ev.Data[event.GroupID]) - if err == nil { - ps.save() - } else { - log.Errorf("error accepting group invite") - } - case event.RejectGroupInvite: - ps.profile.RejectInvite(ev.Data[event.GroupID]) - ps.save() - case event.NewGroup: - gid, err := ps.profile.ProcessInvite(ev.Data[event.GroupInvite]) - if err == nil { - ps.save() - group := ps.profile.Groups[gid] - ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.key) - } else { - log.Errorf("error storing new group invite: %v (%v)", err, ev) - } - case event.SendMessageToPeer: // cache the message till an ack, then it's given to stream store. - // stream store doesn't support updates, so we don't want to commit it till ack'd - ps.profile.AddSentMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], time.Now(), ev.EventID) - case event.NewMessageFromPeer: - ps.profile.AddMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], time.Now()) - ps.attemptSavePeerMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.TimestampReceived], true) - case event.PeerAcknowledgement: - onion := ev.Data[event.RemotePeer] - eventID := ev.Data[event.EventID] - contact, ok := ps.profile.Contacts[onion] - if ok { - mIdx, ok := contact.UnacknowledgedMessages[eventID] - if ok { - message := contact.Timeline.Messages[mIdx] - ps.attemptSavePeerMessage(onion, message.Message, message.Timestamp.Format(time.RFC3339Nano), false) - } - } - ps.profile.AckSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID]) - case event.NewMessageFromGroup: - groupid := ev.Data[event.GroupID] - received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) - sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent]) - sig, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) - prevsig, _ := base64.StdEncoding.DecodeString(ev.Data[event.PreviousSignature]) - message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: sig, PreviousMessageSig: prevsig, Acknowledged: true} - ss, exists := ps.streamStores[groupid] - if exists { - // We need to store a local copy of the message... - ps.profile.GetGroup(groupid).Timeline.Insert(&message) - ss.Write(message) - } else { - log.Errorf("error storing new group message: %v stream store does not exist", ev) - } - case event.PeerStateChange: - if _, exists := ps.profile.Contacts[ev.Data[event.RemotePeer]]; exists { - ps.profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState] - } - case event.ServerStateChange: - for _, group := range ps.profile.Groups { - if group.GroupServer == ev.Data[event.GroupServer] { - group.State = ev.Data[event.ConnectionState] - } - } - case event.DeleteContact: - onion := ev.Data[event.RemotePeer] - ps.profile.DeleteContact(onion) - ps.save() - ss, exists := ps.streamStores[onion] - if exists { - ss.Delete() - delete(ps.streamStores, onion) - } - case event.DeleteGroup: - groupID := ev.Data[event.GroupID] - ps.profile.DeleteGroup(groupID) - ps.save() - ss, exists := ps.streamStores[groupID] - if exists { - ss.Delete() - delete(ps.streamStores, groupID) - } - case event.ChangePassword: - oldpass := ev.Data[event.Password] - newpass := ev.Data[event.NewPassword] - ps.ChangePassword(oldpass, newpass, ev.EventID) - case event.UpdateMessageFlags: - handle := ev.Data[event.Handle] - mIx, err := strconv.Atoi(ev.Data[event.Index]) - if err != nil { - log.Errorf("Invalid Message Index: %v", err) - return - } - flags, err := strconv.ParseUint(ev.Data[event.Flags], 2, 64) - if err != nil { - log.Errorf("Invalid Message Flags: %v", err) - return - } - ps.profile.UpdateMessageFlags(handle, mIx, flags) - if len(handle) == groupIDLen { - ps.regenStreamStore(ps.profile.GetGroup(handle).Timeline.Messages, handle) - } else if contact, exists := ps.profile.GetContact(handle); exists { - if exists { - val, _ := contact.GetAttribute(event.SaveHistoryKey) - if val == event.SaveHistoryConfirmed { - ps.regenStreamStore(contact.Timeline.Messages, handle) - } - } - } - default: - log.Debugf("shutting down profile store: %v", ev) - return - } - - } -} - -// attemptSavePeerMessage checks if the peer has been configured to save history from this peer -// and if so the peer saves the message into history. fromPeer is used to control if the message is saved -// as coming from the remote peer or if it was sent by out profile. -func (ps *ProfileStoreV1) attemptSavePeerMessage(peerID, messageData, timestampeReceived string, fromPeer bool) { - contact, exists := ps.profile.GetContact(peerID) - if exists { - val, _ := contact.GetAttribute(event.SaveHistoryKey) - switch val { - case event.SaveHistoryConfirmed: - { - peerID := peerID - var received time.Time - var message model.Message - if fromPeer { - received, _ = time.Parse(time.RFC3339Nano, timestampeReceived) - message = model.Message{Received: received, Timestamp: received, Message: messageData, PeerID: peerID, Signature: []byte{}, PreviousMessageSig: []byte{}} - } else { - received := time.Now() - message = model.Message{Received: received, Timestamp: received, Message: messageData, PeerID: ps.profile.Onion, Signature: []byte{}, PreviousMessageSig: []byte{}, Acknowledged: true} - } - ss, exists := ps.streamStores[peerID] - if exists { - ss.Write(message) - } else { - log.Errorf("error storing new peer message: %v stream store does not exist", peerID) - } - } - default: - { - } - } - } else { - log.Errorf("error saving message for peer that doesn't exist: %v", peerID) - } -} - -// Shutdown shuts down the queue / thread -func (ps *ProfileStoreV1) Shutdown() { - if ps.queue != nil { - ps.queue.Shutdown() - } -} - -// Delete removes all stored files for this stored profile -func (ps *ProfileStoreV1) Delete() { - log.Debugf("Delete ProfileStore for %v\n", ps.profile.Onion) - - for _, ss := range ps.streamStores { - ss.Delete() - } - - ps.fs.Delete() - - err := os.RemoveAll(ps.directory) - if err != nil { - log.Errorf("ProfileStore Delete error on RemoveAll on %v was %v\n", ps.directory, err) - } -} diff --git a/storage/v1/profile_store_test.go b/storage/v1/profile_store_test.go deleted file mode 100644 index 6cae401..0000000 --- a/storage/v1/profile_store_test.go +++ /dev/null @@ -1,159 +0,0 @@ -// Known race issue with event bus channel closure - -package v1 - -import ( - "cwtch.im/cwtch/event" - "cwtch.im/cwtch/model" - "encoding/base64" - "fmt" - "log" - "os" - "testing" - "time" -) - -const testProfileName = "Alice" -const testKey = "key" -const testVal = "value" -const testInitialMessage = "howdy" -const testMessage = "Hello from storage" - -func TestProfileStoreWriteRead(t *testing.T) { - log.Println("profile store test!") - os.RemoveAll(testingDir) - eventBus := event.NewEventManager() - profile := NewProfile(testProfileName) - // The lightest weight server entry possible (usually we would import a key bundle...) - profile.AddContact("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd", &model.PublicProfile{Attributes: map[string]string{string(model.KeyTypeServerOnion): "2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd"}}) - - ps1 := CreateProfileWriterStore(eventBus, testingDir, password, profile) - - eventBus.Publish(event.NewEvent(event.SetAttribute, map[event.Field]string{event.Key: testKey, event.Data: testVal})) - time.Sleep(1 * time.Second) - - groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") - if err != nil { - t.Errorf("Creating group: %v\n", err) - } - if err != nil { - t.Errorf("Creating group invite: %v\n", err) - } - - eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)})) - time.Sleep(1 * time.Second) - - eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{ - event.GroupID: groupid, - event.TimestampSent: time.Now().Format(time.RFC3339Nano), - event.TimestampReceived: time.Now().Format(time.RFC3339Nano), - event.RemotePeer: ps1.GetProfileCopy(true).Onion, - event.Data: testMessage, - })) - time.Sleep(1 * time.Second) - - ps1.Shutdown() - - ps2, err := LoadProfileWriterStore(eventBus, testingDir, password) - if err != nil { - t.Errorf("Error createing ProfileStoreV1: %v\n", err) - } - - profile = ps2.GetProfileCopy(true) - if profile.Name != testProfileName { - t.Errorf("Profile name from loaded profile incorrect. Expected: '%v' Actual: '%v'\n", testProfileName, profile.Name) - } - - v, _ := profile.GetAttribute(testKey) - if v != testVal { - t.Errorf("Profile attribute '%v' inccorect. Expected: '%v' Actual: '%v'\n", testKey, testVal, v) - } - - group2 := ps2.GetProfileCopy(true).Groups[groupid] - if group2 == nil { - t.Errorf("Group not loaded\n") - } - -} - -func TestProfileStoreChangePassword(t *testing.T) { - os.RemoveAll(testingDir) - eventBus := event.NewEventManager() - - queue := event.NewQueue() - eventBus.Subscribe(event.ChangePasswordSuccess, queue) - - profile := NewProfile(testProfileName) - profile.AddContact("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd", &model.PublicProfile{Attributes: map[string]string{string(model.KeyTypeServerOnion): "2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd"}}) - - ps1 := CreateProfileWriterStore(eventBus, testingDir, password, profile) - - groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") - if err != nil { - t.Errorf("Creating group: %v\n", err) - } - if err != nil { - t.Errorf("Creating group invite: %v\n", err) - } - - eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)})) - time.Sleep(1 * time.Second) - - fmt.Println("Sending 200 messages...") - - for i := 0; i < 200; i++ { - eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{ - event.GroupID: groupid, - event.TimestampSent: time.Now().Format(time.RFC3339Nano), - event.TimestampReceived: time.Now().Format(time.RFC3339Nano), - event.RemotePeer: profile.Onion, - event.Data: testMessage, - event.Signature: base64.StdEncoding.EncodeToString([]byte{byte(i)}), - })) - } - - newPass := "qwerty123" - - fmt.Println("Sending Change Passwords event...") - eventBus.Publish(event.NewEventList(event.ChangePassword, event.Password, password, event.NewPassword, newPass)) - - ev := queue.Next() - if ev.EventType != event.ChangePasswordSuccess { - t.Errorf("Unexpected event response detected %v\n", ev.EventType) - return - } - - fmt.Println("Sending 10 more messages...") - for i := 0; i < 10; i++ { - eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{ - event.GroupID: groupid, - event.TimestampSent: time.Now().Format(time.RFC3339Nano), - event.TimestampReceived: time.Now().Format(time.RFC3339Nano), - event.RemotePeer: profile.Onion, - event.Data: testMessage, - event.Signature: base64.StdEncoding.EncodeToString([]byte{0x01, byte(i)}), - })) - } - time.Sleep(3 * time.Second) - - fmt.Println("Shutdown profile store...") - ps1.Shutdown() - - fmt.Println("New Profile store...") - ps2, err := LoadProfileWriterStore(eventBus, testingDir, newPass) - if err != nil { - t.Errorf("Error createing new ProfileStoreV1 with new password: %v\n", err) - return - } - - profile2 := ps2.GetProfileCopy(true) - - if profile2.Groups[groupid] == nil { - t.Errorf("Failed to load group %v\n", groupid) - return - } - - if len(profile2.Groups[groupid].Timeline.Messages) != 210 { - t.Errorf("Failed to load group's 210 messages, instead got %v\n", len(profile2.Groups[groupid].Timeline.Messages)) - } -} diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index 8d23ce5..8393e0a 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -40,22 +40,22 @@ func printAndCountVerifedTimeline(t *testing.T, timeline []model.Message) int { return numVerified } -func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, groupID string) { +func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, serverAddr string) { peerName, _ := peer.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name) for { fmt.Printf("%v checking group connection...\n", peerName) - state, ok := peer.GetGroupState(groupID) + state, ok := peer.GetPeerState(serverAddr) if ok { - fmt.Printf("Waiting for Peer %v to join group %v - state: %v\n", peerName, groupID, state) + fmt.Printf("Waiting for Peer %v to join group %v - state: %v\n", peerName, serverAddr, state) if state == connections.FAILED { - t.Fatalf("%v could not connect to %v", peer.GetOnion(), groupID) + t.Fatalf("%v could not connect to %v", peer.GetOnion(), serverAddr) } if state != connections.SYNCED { - fmt.Printf("peer %v %v waiting connect to group %v, currently: %v\n", peerName, peer.GetOnion(), groupID, connections.ConnectionStateName[state]) + fmt.Printf("peer %v %v waiting connect to group %v, currently: %v\n", peerName, peer.GetOnion(), serverAddr, connections.ConnectionStateName[state]) time.Sleep(time.Second * 5) continue } else { - fmt.Printf("peer %v %v CONNECTED to group %v\n", peerName, peer.GetOnion(), groupID) + fmt.Printf("peer %v %v CONNECTED to group %v\n", peerName, peer.GetOnion(), serverAddr) break } } @@ -185,15 +185,15 @@ func TestCwtchPeerIntegration(t *testing.T) { alice.PeerWithOnion(carol.GetOnion()) fmt.Println("Creating group on ", ServerAddr, "...") - groupID, _, err := alice.StartGroup(ServerAddr) - fmt.Printf("Created group: %v!\n", groupID) + aliceGroupConversationID, err := alice.StartGroup(ServerAddr) + fmt.Printf("Created group: %v!\n", aliceGroupConversationID) if err != nil { t.Errorf("Failed to init group: %v", err) return } fmt.Println("Waiting for alice to join server...") - waitForPeerGroupConnection(t, alice, groupID) + waitForPeerGroupConnection(t, alice, ServerAddr) fmt.Println("Waiting for alice and Bob to peer...") waitForPeerPeerConnection(t, alice, bob) @@ -219,31 +219,31 @@ func TestCwtchPeerIntegration(t *testing.T) { // Probably related to latency/throughput problems in the underlying tor network. time.Sleep(30 * time.Second) - aliceName, err := bob.GetConversationAttribute(alice.GetOnion(), attr.PeerScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) + aliceName, err := bob.GetConversationAttribute(1, attr.PeerScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) if err != nil || aliceName != "Alice" { t.Fatalf("Bob: alice GetKeyVal error on alice peer.name %v: %v\n", aliceName, err) } fmt.Printf("Bob has alice's name as '%v'\n", aliceName) - bobName, err := alice.GetConversationAttribute(bob.GetOnion(), attr.PeerScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) + bobName, err := alice.GetConversationAttribute(1, attr.PeerScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) if err != nil || bobName != "Bob" { t.Fatalf("Alice: bob GetKeyVal error on bob peer.name %v: %v \n", bobName, err) } fmt.Printf("Alice has bob's name as '%v'\n", bobName) - aliceName, err = carol.GetConversationAttribute(alice.GetOnion(), attr.PeerScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) + aliceName, err = carol.GetConversationAttribute(2, attr.PeerScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) if err != nil || aliceName != "Alice" { t.Fatalf("carol GetKeyVal error for alice peer.name %v: %v\n", aliceName, err) } - carolName, err := alice.GetConversationAttribute(carol.GetOnion(), attr.PeerScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) + carolName, err := alice.GetConversationAttribute(1, attr.PeerScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name))) if err != nil || carolName != "Carol" { t.Fatalf("alice GetKeyVal error, carol peer.name: %v: %v\n", carolName, err) } fmt.Printf("Alice has carol's name as '%v'\n", carolName) fmt.Println("Alice inviting Bob to group...") - err = alice.InviteOnionToGroup(bob.GetOnion(), groupID) + err = alice.SendInviteToConversation(1, aliceGroupConversationID) if err != nil { t.Fatalf("Error for Alice inviting Bob to group: %v", err) } @@ -265,7 +265,7 @@ func TestCwtchPeerIntegration(t *testing.T) { } fmt.Println("Waiting for Bob to join connect to group server...") - waitForPeerGroupConnection(t, bob, groupID) + waitForPeerGroupConnection(t, bob, ServerAddr) numGoRoutinesPostServerConnect := runtime.NumGoroutine() @@ -274,7 +274,7 @@ func TestCwtchPeerIntegration(t *testing.T) { fmt.Println("Starting conversation in group...") // Conversation fmt.Printf("%v> %v\n", aliceName, aliceLines[0]) - err = alice.SendMessage(groupID, aliceLines[0]) + err = alice.SendMessage(aliceGroupConversationID, aliceLines[0]) if err != nil { t.Fatalf("Alice failed to send a message to the group: %v", err) } diff --git a/testing/encryptedstorage/encrypted_storage_integration_test.go b/testing/encryptedstorage/encrypted_storage_integration_test.go index 0bb37f0..ea92393 100644 --- a/testing/encryptedstorage/encrypted_storage_integration_test.go +++ b/testing/encryptedstorage/encrypted_storage_integration_test.go @@ -51,7 +51,6 @@ func TestEncryptedStorage(t *testing.T) { fmt.Println("Creating Alice...") - defer acn.Close() acn.WaitTillBootstrapped() app := app2.NewApp(acn, cwtchDir) @@ -76,11 +75,9 @@ func TestEncryptedStorage(t *testing.T) { time.Sleep(time.Second * 30) - alice.SendMessage(bob.GetOnion(), "Hello Bob") + alice.SendMessage(2, "Hello Bob") time.Sleep(time.Second * 30) - - ci, _ := bob.FetchConversationInfo(alice.GetOnion()) body, _, err := bob.GetChannelMessage(ci.ID, 0, 1) if body != "Hello Bob" || err != nil { diff --git a/testing/filesharing/file_sharing_integration_test.go b/testing/filesharing/file_sharing_integration_test.go index e7de8d4..dcf7ab4 100644 --- a/testing/filesharing/file_sharing_integration_test.go +++ b/testing/filesharing/file_sharing_integration_test.go @@ -96,6 +96,7 @@ func TestFileSharing(t *testing.T) { fmt.Println("Creating Bob...") app.CreateTaggedPeer("bob", "asdfasdf", "testing") + t.Logf("** Waiting for Alice, Bob...") alice := utils.WaitGetPeer(app, "alice") bob := utils.WaitGetPeer(app, "bob") @@ -105,14 +106,15 @@ func TestFileSharing(t *testing.T) { queueOracle := event.NewQueue() app.GetEventBus(bob.GetOnion()).Subscribe(event.FileDownloaded, queueOracle) + t.Logf("** Launching Peers...") app.LaunchPeers() waitTime := time.Duration(30) * time.Second t.Logf("** Waiting for Alice, Bob to connect with onion network... (%v)\n", waitTime) time.Sleep(waitTime) - bob.NewContactConversation(alice.GetOnion(),model.DefaultP2PAccessControl(), true) - alice.NewContactConversation(bob.GetOnion(),model.DefaultP2PAccessControl(), true) + bob.NewContactConversation(alice.GetOnion(), model.DefaultP2PAccessControl(), true) + alice.NewContactConversation(bob.GetOnion(), model.DefaultP2PAccessControl(), true) alice.PeerWithOnion(bob.GetOnion()) fmt.Println("Waiting for alice and Bob to peer...") @@ -122,7 +124,7 @@ func TestFileSharing(t *testing.T) { filesharingFunctionality, _ := filesharing.FunctionalityGate(map[string]bool{"filesharing": true}) - err = filesharingFunctionality.ShareFile("cwtch.png", alice, bob.GetOnion()) + err = filesharingFunctionality.ShareFile("cwtch.png", alice, 1) if err != nil { t.Fatalf("Error!: %v", err) @@ -131,10 +133,10 @@ func TestFileSharing(t *testing.T) { // Wait for the messages to arrive... time.Sleep(time.Second * 10) - message,_,err := bob.GetChannelMessage(1,0, 1) - if err != nil { - t.Fatalf("could not find file sharing message: %v", err) - } + message, _, err := bob.GetChannelMessage(1, 0, 1) + if err != nil { + t.Fatalf("could not find file sharing message: %v", err) + } var messageWrapper model.MessageWrapper json.Unmarshal([]byte(message), &messageWrapper) @@ -148,7 +150,6 @@ func TestFileSharing(t *testing.T) { } } - // Wait for the file downloaded event ev := queueOracle.Next() if ev.EventType != event.FileDownloaded {