Purging old Profile / Storage Code - Start of Group Integration
continuous-integration/drone/push Build is pending Details

This commit is contained in:
Sarah Jamie Lewis 2021-11-10 16:41:43 -08:00
parent 3d0ed3d4b0
commit 62d2497843
26 changed files with 625 additions and 2233 deletions

View File

@ -107,7 +107,7 @@ func (app *application) CreateTaggedPeer(name string, password string, tag strin
app.eventBuses[profile.GetOnion()] = eventBus
profile.Init(app.eventBuses[profile.GetOnion()])
app.peers[profile.GetOnion()] = profile
app.engines[profile.GetOnion()] = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
if tag != "" {
profile.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag)
@ -180,7 +180,6 @@ func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProf
_, exists := ac.eventBuses[profile.Onion]
if exists {
profileStore.Shutdown()
eventBus.Shutdown()
log.Errorf("profile for onion %v already exists", profile.Onion)
continue
@ -204,7 +203,7 @@ func (app *application) LoadProfiles(password string) {
profile.Init(app.eventBuses[profile.GetOnion()])
app.appmutex.Lock()
app.peers[profile.GetOnion()] = profile
app.engines[profile.GetOnion()] = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
app.appmutex.Unlock()
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False}))
count++

View File

@ -54,7 +54,7 @@ func (f *Functionality) DownloadFile(profile peer.CwtchPeer, handle string, down
// ShareFile given a profile and a conversation handle, sets up a file sharing process to share the file
// at filepath
func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer, handle string) error {
func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer, conversationID int) error {
manifest, err := files.CreateManifest(filepath)
if err != nil {
return err
@ -93,7 +93,7 @@ func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer, handl
profile.ShareFile(key, string(serializedManifest))
profile.SendMessage(handle, string(wrapperJSON))
profile.SendMessage(conversationID, string(wrapperJSON))
return nil
}

View File

@ -17,9 +17,15 @@ const (
// ProfileZone for attributes related to profile details like name and profile image
ProfileZone = Zone("profile")
// LegacyGroupZone for attributes related to legacy group experiment
LegacyGroupZone = Zone("legacygroup")
// FilesharingZone for attributes related to file sharing
FilesharingZone = Zone("filesharing")
// ServerKeyZone for attributes related to Server Keys
ServerKeyZone = Zone("serverkey")
// UnknownZone is a catch all useful for error handling
UnknownZone = Zone("unknown")
)
@ -44,8 +50,12 @@ func ParseZone(path string) (Zone, string) {
switch Zone(parts[0]) {
case ProfileZone:
return ProfileZone, parts[1]
case LegacyGroupZone:
return LegacyGroupZone, parts[1]
case FilesharingZone:
return FilesharingZone, parts[1]
case ServerKeyZone:
return ServerKeyZone, parts[1]
default:
return UnknownZone, parts[1]
}

View File

@ -3,6 +3,9 @@ package constants
// Name refers to a Profile Name
const Name = "name"
// Onion refers the Onion address of the profile
const Onion = "onion"
// Tag describes the type of a profile e.g. default password / encrypted etc.
const Tag = "tag"
@ -11,3 +14,12 @@ const ProfileTypeV1DefaultPassword = "v1-defaultPassword"
// ProfileTypeV1Password is a tag describing a profile encrypted derived from a user-provided password.
const ProfileTypeV1Password = "v1-userPassword"
// GroupID is the ID of a group
const GroupID = "groupid"
// GroupServer identifies the Server the legacy group is hosted on
const GroupServer = "groupserver"
// GroupKey is the name of the group key attribute...
const GroupKey = "groupkey"

View File

@ -18,24 +18,29 @@ func DefaultP2PAccessControl() AccessControl {
// functions
type AccessControlList map[string]AccessControl
// Serialize transforms the ACL into json.
func (acl *AccessControlList) Serialize() []byte {
data, _ := json.Marshal(acl)
return data
}
// DeserializeAccessControlList takes in JSON and returns an AccessControlList
func DeserializeAccessControlList(data []byte) AccessControlList {
var acl AccessControlList
json.Unmarshal(data, &acl)
return acl
}
// Attributes a type-driven encapsulation of an Attribute map.
type Attributes map[string]string
// Serialize transforms an Attributes map into a JSON struct
func (a *Attributes) Serialize() []byte {
data, _ := json.Marshal(a)
return data
}
// DeserializeAttributes convers a JSON struct into an Attributes map
func DeserializeAttributes(data []byte) Attributes {
var attributes Attributes
json.Unmarshal(data, &attributes)

View File

@ -4,8 +4,6 @@ import (
"crypto/ed25519"
"crypto/rand"
"crypto/sha512"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"encoding/base32"
"encoding/base64"
@ -19,8 +17,6 @@ import (
"golang.org/x/crypto/pbkdf2"
"io"
"strings"
"sync"
"time"
)
// CurrentGroupVersion is used to set the version of newly created groups and make sure group structs stored are correct and up to date
@ -33,25 +29,17 @@ const GroupInvitePrefix = "torv3"
// tied to a server under a given group key. Each group has a set of Messages.
type Group struct {
// GroupID is now derived from the GroupKey and the GroupServer
GroupID string
GroupKey [32]byte
GroupServer string
Timeline Timeline `json:"-"`
Accepted bool
IsCompromised bool
Attributes map[string]string
lock sync.Mutex
LocalID string
State string `json:"-"`
Version int
GroupID string
GroupKey [32]byte
GroupServer string
Version int
Timeline Timeline `json:"-"`
LocalID string
}
// NewGroup initializes a new group associated with a given CwtchServer
func NewGroup(server string) (*Group, error) {
group := new(Group)
group.Version = CurrentGroupVersion
group.LocalID = GenerateRandomID()
group.Accepted = true // we are starting a group, so we assume we want to connect to it...
if !tor.IsValidHostname(server) {
return nil, errors.New("server is not a valid v3 onion")
}
@ -68,11 +56,6 @@ func NewGroup(server string) (*Group, error) {
// Derive Group ID from the group key and the server public key. This binds the group to a particular server
// and key.
group.GroupID = deriveGroupID(groupKey[:], server)
group.Attributes = make(map[string]string)
// By default we set the "name" of the group to a random string, we can override this later, but to simplify the
// codes around invite, we assume that this is always set.
group.Attributes[attr.GetLocalScope(constants.Name)] = group.GroupID
return group, nil
}
@ -89,17 +72,12 @@ func deriveGroupID(groupKey []byte, serverHostname string) string {
return hex.EncodeToString(pbkdf2.Key(groupKey, pubkey, 4096, 16, sha512.New))
}
// Compromised should be called if we detect a groupkey leak
func (g *Group) Compromised() {
g.IsCompromised = true
}
// Invite generates a invitation that can be sent to a cwtch peer
func (g *Group) Invite() (string, error) {
func (g *Group) Invite(name string) (string, error) {
gci := &groups.GroupInvite{
GroupID: g.GroupID,
GroupName: g.Attributes[attr.GetLocalScope(constants.Name)],
GroupName: name,
SharedKey: g.GroupKey[:],
ServerHost: g.GroupServer,
}
@ -109,74 +87,74 @@ func (g *Group) Invite() (string, error) {
return serializedInvite, err
}
// AddSentMessage takes a DecryptedGroupMessage and adds it to the Groups Timeline
func (g *Group) AddSentMessage(message *groups.DecryptedGroupMessage, sig []byte) Message {
g.lock.Lock()
defer g.lock.Unlock()
timelineMessage := Message{
Message: message.Text,
Timestamp: time.Unix(int64(message.Timestamp), 0),
Received: time.Unix(0, 0),
Signature: sig,
PeerID: message.Onion,
PreviousMessageSig: message.PreviousMessageSig,
ReceivedByServer: false,
}
g.Timeline.Insert(&timelineMessage)
return timelineMessage
}
//// AddSentMessage takes a DecryptedGroupMessage and adds it to the Groups Timeline
//func (g *Group) AddSentMessage(message *groups.DecryptedGroupMessage, sig []byte) Message {
// g.lock.Lock()
// defer g.lock.Unlock()
// timelineMessage := Message{
// Message: message.Text,
// Timestamp: time.Unix(int64(message.Timestamp), 0),
// Received: time.Unix(0, 0),
// Signature: sig,
// PeerID: message.Onion,
// PreviousMessageSig: message.PreviousMessageSig,
// ReceivedByServer: false,
// }
// g.Timeline.Insert(&timelineMessage)
// return timelineMessage
//}
// ErrorSentMessage removes a sent message from the unacknowledged list and sets its error flag if found, otherwise returns false
func (g *Group) ErrorSentMessage(sig []byte, error string) bool {
g.lock.Lock()
defer g.lock.Unlock()
//// ErrorSentMessage removes a sent message from the unacknowledged list and sets its error flag if found, otherwise returns false
//func (g *Group) ErrorSentMessage(sig []byte, error string) bool {
// g.lock.Lock()
// defer g.lock.Unlock()
//
// return g.Timeline.SetSendError(sig, error)
//}
return g.Timeline.SetSendError(sig, error)
}
//// GetMessage returns the message at index `index` if it exists. Otherwise returns false.
//// This routine also returns the length of the timeline
//// If go has an optional type this would return Option<Message>...
//func (g *Group) GetMessage(index int) (bool, Message, int) {
// g.lock.Lock()
// defer g.lock.Unlock()
//
// length := len(g.Timeline.Messages)
//
// if length > index {
// return true, g.Timeline.Messages[index], length
// }
// return false, Message{}, length
//}
// GetMessage returns the message at index `index` if it exists. Otherwise returns false.
// This routine also returns the length of the timeline
// If go has an optional type this would return Option<Message>...
func (g *Group) GetMessage(index int) (bool, Message, int) {
g.lock.Lock()
defer g.lock.Unlock()
//// AddMessage takes a DecryptedGroupMessage and adds it to the Groups Timeline
//func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (*Message, int) {
//
// g.lock.Lock()
// defer g.lock.Unlock()
//
// timelineMessage := &Message{
// Message: message.Text,
// Timestamp: time.Unix(int64(message.Timestamp), 0),
// Received: time.Now(),
// Signature: sig,
// PeerID: message.Onion,
// PreviousMessageSig: message.PreviousMessageSig,
// ReceivedByServer: true,
// Error: "",
// Acknowledged: true,
// }
// index := g.Timeline.Insert(timelineMessage)
//
// return timelineMessage, index
//}
length := len(g.Timeline.Messages)
if length > index {
return true, g.Timeline.Messages[index], length
}
return false, Message{}, length
}
// AddMessage takes a DecryptedGroupMessage and adds it to the Groups Timeline
func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (*Message, int) {
g.lock.Lock()
defer g.lock.Unlock()
timelineMessage := &Message{
Message: message.Text,
Timestamp: time.Unix(int64(message.Timestamp), 0),
Received: time.Now(),
Signature: sig,
PeerID: message.Onion,
PreviousMessageSig: message.PreviousMessageSig,
ReceivedByServer: true,
Error: "",
Acknowledged: true,
}
index := g.Timeline.Insert(timelineMessage)
return timelineMessage, index
}
// GetTimeline provides a safe copy of the timeline
func (g *Group) GetTimeline() (timeline []Message) {
g.lock.Lock()
defer g.lock.Unlock()
return g.Timeline.GetMessages()
}
//// GetTimeline provides a safe copy of the timeline
//func (g *Group) GetTimeline() (timeline []Message) {
// g.lock.Lock()
// defer g.lock.Unlock()
// return g.Timeline.GetMessages()
//}
//EncryptMessage takes a message and encrypts the message under the group key.
func (g *Group) EncryptMessage(message *groups.DecryptedGroupMessage) ([]byte, error) {
@ -211,21 +189,6 @@ func (g *Group) DecryptMessage(ciphertext []byte) (bool, *groups.DecryptedGroupM
return false, nil
}
// SetAttribute allows applications to store arbitrary configuration info at the group level.
func (g *Group) SetAttribute(name string, value string) {
g.lock.Lock()
defer g.lock.Unlock()
g.Attributes[name] = value
}
// GetAttribute returns the value of a value set with SetAttribute. If no such value has been set exists is set to false.
func (g *Group) GetAttribute(name string) (value string, exists bool) {
g.lock.Lock()
defer g.lock.Unlock()
value, exists = g.Attributes[name]
return
}
// ValidateInvite takes in a serialized invite and returns the invite structure if it is cryptographically valid
// and an error if it is not
func ValidateInvite(invite string) (*groups.GroupInvite, error) {

View File

@ -4,7 +4,6 @@ import (
"crypto/sha256"
"cwtch.im/cwtch/protocol/groups"
"strings"
"sync"
"testing"
"time"
)
@ -20,7 +19,7 @@ func TestGroup(t *testing.T) {
Padding: []byte{},
}
invite, err := g.Invite()
invite, err := g.Invite("name")
if err != nil {
t.Fatalf("error creating group invite: %v", err)
@ -42,11 +41,7 @@ func TestGroup(t *testing.T) {
t.Errorf("group encryption was invalid, or returned wrong message decrypted:%v message:%v", ok, message)
return
}
g.SetAttribute("test", "test_value")
value, exists := g.GetAttribute("test")
if !exists || value != "test_value" {
t.Errorf("Custom Attribute Should have been set, instead %v %v", exists, value)
}
t.Logf("Got message %v", message)
}
@ -61,20 +56,15 @@ func TestGroupErr(t *testing.T) {
func TestGroupValidation(t *testing.T) {
group := &Group{
GroupID: "",
GroupKey: [32]byte{},
GroupServer: "",
Timeline: Timeline{},
Accepted: false,
IsCompromised: false,
Attributes: nil,
lock: sync.Mutex{},
LocalID: "",
State: "",
Version: 0,
GroupID: "",
GroupKey: [32]byte{},
GroupServer: "",
Timeline: Timeline{},
LocalID: "",
Version: 0,
}
invite, _ := group.Invite()
invite, _ := group.Invite("name")
_, err := ValidateInvite(invite)
if err == nil {
@ -85,7 +75,7 @@ func TestGroupValidation(t *testing.T) {
// Generate a valid group but replace the group server...
group, _ = NewGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
group.GroupServer = "tcnkoch4nyr3cldkemejtkpqok342rbql6iclnjjs3ndgnjgufzyxvqd"
invite, _ = group.Invite()
invite, _ = group.Invite("name")
_, err = ValidateInvite(invite)
if err == nil {
@ -96,7 +86,7 @@ func TestGroupValidation(t *testing.T) {
// Generate a valid group but replace the group key...
group, _ = NewGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
group.GroupKey = sha256.Sum256([]byte{})
invite, _ = group.Invite()
invite, _ = group.Invite("name")
_, err = ValidateInvite(invite)
if err == nil {

View File

@ -1,127 +0,0 @@
package model
import (
"strconv"
"testing"
"time"
)
func TestMessagePadding(t *testing.T) {
// Setup the Group
sarah := GenerateNewProfile("Sarah")
alice := GenerateNewProfile("Alice")
sarah.AddContact(alice.Onion, &alice.PublicProfile)
alice.AddContact(sarah.Onion, &sarah.PublicProfile)
gid, invite, _ := alice.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
sarah.ProcessInvite(invite)
group := alice.GetGroup(gid)
c1, s1, err := sarah.EncryptMessageToGroup("Hello World 1", group.GroupID)
t.Logf("Length of Encrypted Message: %v %v", len(c1), err)
alice.AttemptDecryption(c1, s1)
c2, s2, _ := alice.EncryptMessageToGroup("Hello World 2", group.GroupID)
t.Logf("Length of Encrypted Message: %v", len(c2))
alice.AttemptDecryption(c2, s2)
c3, s3, _ := alice.EncryptMessageToGroup("Hello World 3", group.GroupID)
t.Logf("Length of Encrypted Message: %v", len(c3))
alice.AttemptDecryption(c3, s3)
c4, s4, _ := alice.EncryptMessageToGroup("Hello World this is a much longer message 3", group.GroupID)
t.Logf("Length of Encrypted Message: %v", len(c4))
alice.AttemptDecryption(c4, s4)
}
func TestTranscriptConsistency(t *testing.T) {
timeline := new(Timeline)
// Setup the Group
sarah := GenerateNewProfile("Sarah")
alice := GenerateNewProfile("Alice")
sarah.AddContact(alice.Onion, &alice.PublicProfile)
alice.AddContact(sarah.Onion, &sarah.PublicProfile)
// The lightest weight server entry possible (usually we would import a key bundle...)
sarah.AddContact("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd", &PublicProfile{Attributes: map[string]string{string(KeyTypeServerOnion): "2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd"}})
gid, invite, _ := alice.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
sarah.ProcessInvite(invite)
group := alice.GetGroup(gid)
t.Logf("group: %v, sarah %v", group, sarah)
c1, s1, _ := alice.EncryptMessageToGroup("Hello World 1", group.GroupID)
t.Logf("Length of Encrypted Message: %v", len(c1))
alice.AttemptDecryption(c1, s1)
c2, s2, _ := alice.EncryptMessageToGroup("Hello World 2", group.GroupID)
t.Logf("Length of Encrypted Message: %v", len(c2))
alice.AttemptDecryption(c2, s2)
c3, s3, _ := alice.EncryptMessageToGroup("Hello World 3", group.GroupID)
t.Logf("Length of Encrypted Message: %v", len(c3))
alice.AttemptDecryption(c3, s3)
time.Sleep(time.Second * 1)
c4, s4, _ := alice.EncryptMessageToGroup("Hello World 4", group.GroupID)
t.Logf("Length of Encrypted Message: %v", len(c4))
alice.AttemptDecryption(c4, s4)
c5, s5, _ := alice.EncryptMessageToGroup("Hello World 5", group.GroupID)
t.Logf("Length of Encrypted Message: %v", len(c5))
_, _, m1, _ := sarah.AttemptDecryption(c1, s1)
sarah.AttemptDecryption(c1, s1) // Try a duplicate
_, _, m2, _ := sarah.AttemptDecryption(c2, s2)
_, _, m3, _ := sarah.AttemptDecryption(c3, s3)
_, _, m4, _ := sarah.AttemptDecryption(c4, s4)
_, _, m5, _ := sarah.AttemptDecryption(c5, s5)
// Now we simulate a client receiving these Messages completely out of order
timeline.Insert(m1)
timeline.Insert(m5)
timeline.Insert(m4)
timeline.Insert(m3)
timeline.Insert(m2)
for i, m := range group.GetTimeline() {
if m.Message != "Hello World "+strconv.Itoa(i+1) {
t.Fatalf("Timeline Out of Order!: %v %v", i, m)
}
t.Logf("Messages %v: %v %x %x", i, m.Message, m.Signature, m.PreviousMessageSig)
}
// Test message by hash lookup...
hash := timeline.calculateHash(*m5)
t.Logf("Looking up %v ", hash)
for key, msgs := range timeline.hashCache {
t.Logf("%v %v", key, msgs)
}
// check a real message..
msgs, err := timeline.GetMessagesByHash(hash)
if err != nil || len(msgs) != 1 {
t.Fatalf("looking up message by hash %v should have not errored: %v", hash, err)
} else if msgs[0].Message.Message != m5.Message {
t.Fatalf("%v != %v", msgs[0].Message, m5.Message)
}
// Check a non existed hash... error if there is no error
_, err = timeline.GetMessagesByHash("not a real hash")
if err == nil {
t.Fatalf("looking up message by hash %v should have errored: %v", hash, err)
}
}

View File

@ -2,18 +2,11 @@ package model
import (
"crypto/rand"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"encoding/base32"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
"io"
"path/filepath"
"strings"
"sync"
@ -127,42 +120,6 @@ func (p *Profile) AddContact(onion string, profile *PublicProfile) {
p.lock.Unlock()
}
// UpdateMessageFlags updates the flags stored with a message
func (p *Profile) UpdateMessageFlags(handle string, mIdx int, flags uint64) {
p.lock.Lock()
defer p.lock.Unlock()
if contact, exists := p.Contacts[handle]; exists {
if len(contact.Timeline.Messages) > mIdx {
contact.Timeline.Messages[mIdx].Flags = flags
}
} else if group, exists := p.Groups[handle]; exists {
if len(group.Timeline.Messages) > mIdx {
group.Timeline.Messages[mIdx].Flags = flags
}
}
}
// DeleteContact deletes a peer contact
func (p *Profile) DeleteContact(onion string) {
p.lock.Lock()
defer p.lock.Unlock()
delete(p.Contacts, onion)
}
// DeleteGroup deletes a group
func (p *Profile) DeleteGroup(groupID string) {
p.lock.Lock()
defer p.lock.Unlock()
delete(p.Groups, groupID)
}
// RejectInvite rejects and removes a group invite
func (p *Profile) RejectInvite(groupID string) {
p.lock.Lock()
delete(p.Groups, groupID)
p.lock.Unlock()
}
// AddSentMessageToContactTimeline allows the saving of a message sent via a direct connection chat to the profile.
func (p *Profile) AddSentMessageToContactTimeline(onion string, messageTxt string, sent time.Time, eventID string) *Message {
p.lock.Lock()
@ -235,95 +192,6 @@ func (p *Profile) AckSentMessageToPeer(onion string, eventID string) int {
return -1
}
// AddGroupSentMessageError searches matching groups for the message by sig and marks it as an error
func (p *Profile) AddGroupSentMessageError(groupID string, signature []byte, error string) {
p.lock.Lock()
defer p.lock.Unlock()
group, exists := p.Groups[groupID]
if exists {
group.ErrorSentMessage(signature, error)
}
}
// AcceptInvite accepts a group invite
func (p *Profile) AcceptInvite(groupID string) (err error) {
p.lock.Lock()
defer p.lock.Unlock()
group, ok := p.Groups[groupID]
if ok {
group.Accepted = true
} else {
err = errors.New("group does not exist")
}
return
}
// GetGroups returns an unordered list of group IDs associated with this profile.
func (p *Profile) GetGroups() []string {
p.lock.Lock()
defer p.lock.Unlock()
var keys []string
for onion := range p.Groups {
keys = append(keys, onion)
}
return keys
}
// GetContacts returns an unordered list of contact onions associated with this profile.
func (p *Profile) GetContacts() []string {
p.lock.Lock()
defer p.lock.Unlock()
var keys []string
for onion := range p.Contacts {
if onion != p.Onion {
keys = append(keys, onion)
}
}
return keys
}
// SetContactAuthorization sets the authoirization level of a peer
func (p *Profile) SetContactAuthorization(onion string, auth Authorization) (err error) {
p.lock.Lock()
defer p.lock.Unlock()
contact, ok := p.Contacts[onion]
if ok {
contact.Authorization = auth
} else {
err = errors.New("peer does not exist")
}
return
}
// GetContactAuthorization returns the contact's authorization level
func (p *Profile) GetContactAuthorization(onion string) Authorization {
p.lock.Lock()
defer p.lock.Unlock()
contact, ok := p.Contacts[onion]
if ok {
return contact.Authorization
}
return AuthUnknown
}
// ContactsAuthorizations calculates a list of Peers who are at the supplied auth levels
func (p *Profile) ContactsAuthorizations(authorizationFilter ...Authorization) map[string]Authorization {
authorizations := map[string]Authorization{}
for _, contact := range p.GetContacts() {
c, _ := p.GetContact(contact)
authorizations[c.Onion] = c.Authorization
}
return authorizations
}
// GetContact returns a contact if the profile has it
func (p *Profile) GetContact(onion string) (*PublicProfile, bool) {
p.lock.Lock()
defer p.lock.Unlock()
contact, ok := p.Contacts[onion]
return contact, ok
}
// VerifyGroupMessage confirms the authenticity of a message given an sender onion, message and signature.
// The goal of this function is 2-fold:
// 1. We confirm that the sender referenced in the group text is the actual sender of the message (or at least
@ -336,28 +204,28 @@ func (p *Profile) GetContact(onion string) (*PublicProfile, bool) {
// on two different servers with the same key and then forwards messages between them to convince the parties in
// each group that they are actually in one big group (with the intent to later censor and/or selectively send messages
// to each group).
func (p *Profile) VerifyGroupMessage(onion string, groupID string, message string, signature []byte) bool {
group := p.GetGroup(groupID)
if group == nil {
return false
}
// We use our group id, a known reference server and the ciphertext of the message.
m := groupID + group.GroupServer + message
// If the message is ostensibly from us then we check it against our public key...
if onion == p.Onion {
return ed25519.Verify(p.Ed25519PublicKey, []byte(m), signature)
}
// Otherwise we derive the public key from the sender and check it against that.
decodedPub, err := base32.StdEncoding.DecodeString(strings.ToUpper(onion))
if err == nil && len(decodedPub) >= 32 {
return ed25519.Verify(decodedPub[:32], []byte(m), signature)
}
return false
}
//func (p *Profile) VerifyGroupMessage(onion string, groupID string, message string, signature []byte) bool {
//
// group := p.GetGroup(groupID)
// if group == nil {
// return false
// }
//
// // We use our group id, a known reference server and the ciphertext of the message.
// m := groupID + group.GroupServer + message
//
// // If the message is ostensibly from us then we check it against our public key...
// if onion == p.Onion {
// return ed25519.Verify(p.Ed25519PublicKey, []byte(m), signature)
// }
//
// // Otherwise we derive the public key from the sender and check it against that.
// decodedPub, err := base32.StdEncoding.DecodeString(strings.ToUpper(onion))
// if err == nil && len(decodedPub) >= 32 {
// return ed25519.Verify(decodedPub[:32], []byte(m), signature)
// }
// return false
//}
// SignMessage takes a given message and returns an Ed21159 signature
func (p *Profile) SignMessage(message string) []byte {
@ -365,170 +233,139 @@ func (p *Profile) SignMessage(message string) []byte {
return sig
}
// StartGroup when given a server, creates a new Group under this profile and returns the group id an a precomputed
// invite which can be sent on the wire.
func (p *Profile) StartGroup(server string) (groupID string, invite string, err error) {
group, err := NewGroup(server)
if err != nil {
return "", "", err
}
groupID = group.GroupID
invite, err = group.Invite()
p.lock.Lock()
defer p.lock.Unlock()
p.Groups[group.GroupID] = group
return
}
//// ProcessInvite validates a group invite and adds a new group invite to the profile if it is valid.
//// returns the new group ID on success, error on fail.
//func (p *Profile) ProcessInvite(invite string) (string, error) {
// gci, err := ValidateInvite(invite)
// if err == nil {
// if server, exists := p.GetContact(gci.ServerHost); !exists || !server.IsServer() {
// return "", fmt.Errorf("unknown server. a server key bundle needs to be imported before this group can be verified")
// }
// group := new(Group)
// group.Version = CurrentGroupVersion
// group.GroupID = gci.GroupID
// group.LocalID = GenerateRandomID()
// copy(group.GroupKey[:], gci.SharedKey[:])
// group.GroupServer = gci.ServerHost
// group.Accepted = false
// group.Attributes = make(map[string]string)
// group.Attributes[attr.GetLocalScope(constants.Name)] = gci.GroupName
// p.AddGroup(group)
// return gci.GroupID, nil
// }
// return "", err
//}
// GetGroup a pointer to a Group by the group Id, returns nil if no group found.
func (p *Profile) GetGroup(groupID string) (g *Group) {
p.lock.Lock()
defer p.lock.Unlock()
g = p.Groups[groupID]
return
}
// ProcessInvite validates a group invite and adds a new group invite to the profile if it is valid.
// returns the new group ID on success, error on fail.
func (p *Profile) ProcessInvite(invite string) (string, error) {
gci, err := ValidateInvite(invite)
if err == nil {
if server, exists := p.GetContact(gci.ServerHost); !exists || !server.IsServer() {
return "", fmt.Errorf("unknown server. a server key bundle needs to be imported before this group can be verified")
}
group := new(Group)
group.Version = CurrentGroupVersion
group.GroupID = gci.GroupID
group.LocalID = GenerateRandomID()
copy(group.GroupKey[:], gci.SharedKey[:])
group.GroupServer = gci.ServerHost
group.Accepted = false
group.Attributes = make(map[string]string)
group.Attributes[attr.GetLocalScope(constants.Name)] = gci.GroupName
p.AddGroup(group)
return gci.GroupID, nil
}
return "", err
}
// AddGroup is a convenience method for adding a group to a profile.
func (p *Profile) AddGroup(group *Group) {
p.lock.Lock()
defer p.lock.Unlock()
_, exists := p.Groups[group.GroupID]
if !exists {
p.Groups[group.GroupID] = group
}
}
// AttemptDecryption takes a ciphertext and signature and attempts to decrypt it under known groups.
// If successful, adds the message to the group's timeline
func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, string, *Message, int) {
for _, group := range p.Groups {
success, dgm := group.DecryptMessage(ciphertext)
if success {
// Attempt to serialize this message
serialized, err := json.Marshal(dgm)
// Someone send a message that isn't a valid Decrypted Group Message. Since we require this struct in orer
// to verify the message, we simply ignore it.
if err != nil {
return false, group.GroupID, nil, -1
}
// This now requires knowledge of the Sender, the Onion and the Specific Decrypted Group Message (which should only
// be derivable from the cryptographic key) which contains many unique elements such as the time and random padding
verified := p.VerifyGroupMessage(dgm.Onion, group.GroupID, base64.StdEncoding.EncodeToString(serialized), signature)
if !verified {
// An earlier version of this protocol mistakenly signed the ciphertext of the message
// instead of the serialized decrypted group message.
// This has 2 issues:
// 1. A server with knowledge of group members public keys AND the Group ID would be able to detect valid messages
// 2. It made the metadata-security of a group dependent on keeping the cryptographically derived Group ID secret.
// While not awful, it also isn't good. For Version 3 groups only we permit Cwtch to check this older signature
// structure in a backwards compatible way for the duration of the Groups Experiment.
// TODO: Delete this check when Groups are no long Experimental
if group.Version == 3 {
verified = p.VerifyGroupMessage(dgm.Onion, group.GroupID, string(ciphertext), signature)
}
}
// So we have a message that has a valid group key, but the signature can't be verified.
// The most obvious explanation for this is that the group key has been compromised (or we are in an open group and the server is being malicious)
// Either way, someone who has the private key is being detectably bad so we are just going to throw this message away and mark the group as Compromised.
if !verified {
group.Compromised()
return false, group.GroupID, nil, -1
}
message, index := group.AddMessage(dgm, signature)
return true, group.GroupID, message, index
}
}
// If we couldn't find a group to decrypt the message with we just return false. This is an expected case
return false, "", nil, -1
}
func getRandomness(arr *[]byte) {
if _, err := io.ReadFull(rand.Reader, (*arr)[:]); err != nil {
if err != nil {
// If we can't do randomness, just crash something is very very wrong and we are not going
// to resolve it here....
panic(err.Error())
}
}
}
// EncryptMessageToGroup when given a message and a group, encrypts and signs the message under the group and
// profile
func (p *Profile) EncryptMessageToGroup(message string, groupID string) ([]byte, []byte, error) {
if len(message) > MaxGroupMessageLength {
return nil, nil, errors.New("group message is too long")
}
group := p.GetGroup(groupID)
if group != nil {
timestamp := time.Now().Unix()
// Select the latest message from the timeline as a reference point.
var prevSig []byte
if len(group.Timeline.Messages) > 0 {
prevSig = group.Timeline.Messages[len(group.Timeline.Messages)-1].Signature
} else {
prevSig = []byte(group.GroupID)
}
lenPadding := MaxGroupMessageLength - len(message)
padding := make([]byte, lenPadding)
getRandomness(&padding)
hexGroupID, err := hex.DecodeString(group.GroupID)
if err != nil {
return nil, nil, err
}
dm := &groups.DecryptedGroupMessage{
Onion: p.Onion,
Text: message,
SignedGroupID: hexGroupID,
Timestamp: uint64(timestamp),
PreviousMessageSig: prevSig,
Padding: padding[:],
}
ciphertext, err := group.EncryptMessage(dm)
if err != nil {
return nil, nil, err
}
serialized, _ := json.Marshal(dm)
signature := p.SignMessage(groupID + group.GroupServer + base64.StdEncoding.EncodeToString(serialized))
group.AddSentMessage(dm, signature)
return ciphertext, signature, nil
}
return nil, nil, errors.New("group does not exist")
}
//
//
//// AttemptDecryption takes a ciphertext and signature and attempts to decrypt it under known groups.
//// If successful, adds the message to the group's timeline
//func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, string, *Message, int) {
// for _, group := range p.Groups {
// success, dgm := group.DecryptMessage(ciphertext)
// if success {
//
// // Attempt to serialize this message
// serialized, err := json.Marshal(dgm)
//
// // Someone send a message that isn't a valid Decrypted Group Message. Since we require this struct in orer
// // to verify the message, we simply ignore it.
// if err != nil {
// return false, group.GroupID, nil, -1
// }
//
// // This now requires knowledge of the Sender, the Onion and the Specific Decrypted Group Message (which should only
// // be derivable from the cryptographic key) which contains many unique elements such as the time and random padding
// verified := p.VerifyGroupMessage(dgm.Onion, group.GroupID, base64.StdEncoding.EncodeToString(serialized), signature)
//
// if !verified {
// // An earlier version of this protocol mistakenly signed the ciphertext of the message
// // instead of the serialized decrypted group message.
// // This has 2 issues:
// // 1. A server with knowledge of group members public keys AND the Group ID would be able to detect valid messages
// // 2. It made the metadata-security of a group dependent on keeping the cryptographically derived Group ID secret.
// // While not awful, it also isn't good. For Version 3 groups only we permit Cwtch to check this older signature
// // structure in a backwards compatible way for the duration of the Groups Experiment.
// // TODO: Delete this check when Groups are no long Experimental
// if group.Version == 3 {
// verified = p.VerifyGroupMessage(dgm.Onion, group.GroupID, string(ciphertext), signature)
// }
// }
//
// // So we have a message that has a valid group key, but the signature can't be verified.
// // The most obvious explanation for this is that the group key has been compromised (or we are in an open group and the server is being malicious)
// // Either way, someone who has the private key is being detectably bad so we are just going to throw this message away and mark the group as Compromised.
// if !verified {
// return false, group.GroupID, nil, -1
// }
// message, index := group.AddMessage(dgm, signature)
// return true, group.GroupID, message, index
// }
// }
//
// // If we couldn't find a group to decrypt the message with we just return false. This is an expected case
// return false, "", nil, -1
//}
//
//func getRandomness(arr *[]byte) {
// if _, err := io.ReadFull(rand.Reader, (*arr)[:]); err != nil {
// if err != nil {
// // If we can't do randomness, just crash something is very very wrong and we are not going
// // to resolve it here....
// panic(err.Error())
// }
// }
//}
//
//// EncryptMessageToGroup when given a message and a group, encrypts and signs the message under the group and
//// profile
//func (p *Profile) EncryptMessageToGroup(message string, groupID string) ([]byte, []byte, error) {
//
// if len(message) > MaxGroupMessageLength {
// return nil, nil, errors.New("group message is too long")
// }
//
// group := p.GetGroup(groupID)
// if group != nil {
// timestamp := time.Now().Unix()
//
// // Select the latest message from the timeline as a reference point.
// var prevSig []byte
// if len(group.Timeline.Messages) > 0 {
// prevSig = group.Timeline.Messages[len(group.Timeline.Messages)-1].Signature
// } else {
// prevSig = []byte(group.GroupID)
// }
//
// lenPadding := MaxGroupMessageLength - len(message)
// padding := make([]byte, lenPadding)
// getRandomness(&padding)
// hexGroupID, err := hex.DecodeString(group.GroupID)
// if err != nil {
// return nil, nil, err
// }
//
// dm := &groups.DecryptedGroupMessage{
// Onion: p.Onion,
// Text: message,
// SignedGroupID: hexGroupID,
// Timestamp: uint64(timestamp),
// PreviousMessageSig: prevSig,
// Padding: padding[:],
// }
//
// ciphertext, err := group.EncryptMessage(dm)
// if err != nil {
// return nil, nil, err
// }
// serialized, _ := json.Marshal(dm)
// signature := p.SignMessage(groupID + group.GroupServer + base64.StdEncoding.EncodeToString(serialized))
// group.AddSentMessage(dm, signature)
// return ciphertext, signature, nil
// }
// return nil, nil, errors.New("group does not exist")
//}
//
// GetCopy returns a full deep copy of the Profile struct and its members (timeline inclusion control by arg)
func (p *Profile) GetCopy(timeline bool) *Profile {

View File

@ -1,136 +0,0 @@
package model
import (
"testing"
)
func TestProfileIdentity(t *testing.T) {
sarah := GenerateNewProfile("Sarah")
alice := GenerateNewProfile("Alice")
alice.AddContact(sarah.Onion, &sarah.PublicProfile)
if alice.Contacts[sarah.Onion].Name != "Sarah" {
t.Errorf("alice should have added sarah as a contact %v", alice.Contacts)
}
if len(alice.GetContacts()) != 1 {
t.Errorf("alice should be only contact: %v", alice.GetContacts())
}
alice.SetAttribute("test", "hello world")
value, _ := alice.GetAttribute("test")
if value != "hello world" {
t.Errorf("value from custom attribute should have been 'hello world', instead was: %v", value)
}
t.Logf("%v", alice)
}
func TestTrustPeer(t *testing.T) {
sarah := GenerateNewProfile("Sarah")
alice := GenerateNewProfile("Alice")
sarah.AddContact(alice.Onion, &alice.PublicProfile)
alice.AddContact(sarah.Onion, &sarah.PublicProfile)
alice.SetContactAuthorization(sarah.Onion, AuthApproved)
if alice.GetContactAuthorization(sarah.Onion) != AuthApproved {
t.Errorf("peer should be approved")
}
}
func TestBlockPeer(t *testing.T) {
sarah := GenerateNewProfile("Sarah")
alice := GenerateNewProfile("Alice")
sarah.AddContact(alice.Onion, &alice.PublicProfile)
alice.AddContact(sarah.Onion, &sarah.PublicProfile)
alice.SetContactAuthorization(sarah.Onion, AuthBlocked)
if alice.GetContactAuthorization(sarah.Onion) != AuthBlocked {
t.Errorf("peer should be blocked")
}
if alice.SetContactAuthorization("", AuthUnknown) == nil {
t.Errorf("Seting Auth level of a non existent peer should error")
}
}
func TestAcceptNonExistentGroup(t *testing.T) {
sarah := GenerateNewProfile("Sarah")
sarah.AcceptInvite("doesnotexist")
}
func TestRejectGroupInvite(t *testing.T) {
sarah := GenerateNewProfile("Sarah")
alice := GenerateNewProfile("Alice")
sarah.AddContact(alice.Onion, &alice.PublicProfile)
alice.AddContact(sarah.Onion, &sarah.PublicProfile)
// The lightest weight server entry possible (usually we would import a key bundle...)
sarah.AddContact("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd", &PublicProfile{Attributes: map[string]string{string(KeyTypeServerOnion): "2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd"}})
gid, invite, _ := alice.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
sarah.ProcessInvite(invite)
group := alice.GetGroup(gid)
if len(sarah.Groups) == 1 {
if sarah.GetGroup(group.GroupID).Accepted {
t.Errorf("Group should not be accepted")
}
sarah.RejectInvite(group.GroupID)
if len(sarah.Groups) != 0 {
t.Errorf("Group %v should have been deleted", group.GroupID)
}
return
}
t.Errorf("Group should exist in map")
}
func TestProfileGroup(t *testing.T) {
sarah := GenerateNewProfile("Sarah")
alice := GenerateNewProfile("Alice")
sarah.AddContact(alice.Onion, &alice.PublicProfile)
alice.AddContact(sarah.Onion, &sarah.PublicProfile)
gid, invite, _ := alice.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
// The lightest weight server entry possible (usually we would import a key bundle...)
sarah.AddContact("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd", &PublicProfile{Attributes: map[string]string{string(KeyTypeServerOnion): "2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd"}})
sarah.ProcessInvite(invite)
if len(sarah.GetGroups()) != 1 {
t.Errorf("sarah should only be in 1 group instead: %v", sarah.GetGroups())
}
group := alice.GetGroup(gid)
sarah.AcceptInvite(group.GroupID)
c, s1, _ := sarah.EncryptMessageToGroup("Hello World", group.GroupID)
alice.AttemptDecryption(c, s1)
gid2, invite2, _ := alice.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
sarah.ProcessInvite(invite2)
group2 := alice.GetGroup(gid2)
c2, s2, _ := sarah.EncryptMessageToGroup("Hello World", group2.GroupID)
alice.AttemptDecryption(c2, s2)
_, _, err := sarah.EncryptMessageToGroup(string(make([]byte, MaxGroupMessageLength*2)), group2.GroupID)
if err == nil {
t.Errorf("Overly long message should have returned an error")
}
bob := GenerateNewProfile("bob")
bob.AddContact(alice.Onion, &alice.PublicProfile)
// The lightest weight server entry possible (usually we would import a key bundle...)
bob.AddContact("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd", &PublicProfile{Attributes: map[string]string{string(KeyTypeServerOnion): "2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd"}})
bob.ProcessInvite(invite2)
c3, s3, err := bob.EncryptMessageToGroup("Bobs Message", group2.GroupID)
if err == nil {
ok, _, message, _ := alice.AttemptDecryption(c3, s3)
if !ok {
t.Errorf("Bobs message to the group should be decrypted %v %v", message, ok)
}
eve := GenerateNewProfile("eve")
ok, _, _, _ = eve.AttemptDecryption(c3, s3)
if ok {
t.Errorf("Eves hould not be able to decrypt Messages!")
}
} else {
t.Errorf("Bob failed to encrypt a message to the group")
}
}

View File

@ -1,6 +1,7 @@
package peer
import (
"crypto/rand"
"cwtch.im/cwtch/model/constants"
"encoding/base64"
"encoding/json"
@ -8,6 +9,8 @@ import (
"fmt"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
"runtime"
"strconv"
"strings"
@ -19,7 +22,6 @@ import (
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/protocol/files"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
)
@ -47,7 +49,6 @@ var DefaultEventsToHandle = []event.Type{
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
type cwtchPeer struct {
Profile *model.Profile
mutex sync.Mutex
shutdown bool
listenStatus bool
@ -61,14 +62,37 @@ type cwtchPeer struct {
// GenerateProtocolEngine
// Status: New in 1.5
func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Manager) connections.Engine {
return connections.NewProtocolEngine(cp.GetIdentity(), cp.Profile.Ed25519PrivateKey, acn, bus, cp.Profile.ContactsAuthorizations())
}
func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Manager) (connections.Engine, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
conversations, _ := cp.storage.FetchConversations()
// GetIdentity
// Status: New in 1.5
func (cp *cwtchPeer) GetIdentity() primitives.Identity {
return primitives.InitializeIdentity("", &cp.Profile.Ed25519PrivateKey, &cp.Profile.Ed25519PublicKey)
authorizations := make(map[string]model.Authorization)
for _, conversation := range conversations {
if tor.IsValidHostname(conversation.Handle) {
if conversation.ACL[conversation.Handle].Blocked {
authorizations[conversation.Handle] = model.AuthBlocked
} else {
authorizations[conversation.Handle] = model.AuthApproved
}
}
}
privateKey, err := cp.storage.LoadProfileKeyValue(TypePrivateKey, "Ed25519PrivateKey")
if err != nil {
log.Errorf("error loading private key from storage")
return nil, err
}
publicKey, err := cp.storage.LoadProfileKeyValue(TypePublicKey, "Ed25519PublicKey")
if err != nil {
log.Errorf("error loading public key from storage")
return nil, err
}
identity := primitives.InitializeIdentity("", (*ed25519.PrivateKey)(&privateKey), (*ed25519.PublicKey)(&publicKey))
return connections.NewProtocolEngine(identity, privateKey, acn, bus, authorizations), nil
}
// SendScopedZonedGetValToContact
@ -115,47 +139,38 @@ func (cp *cwtchPeer) SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, k
// If you try to send a message to a handle that doesn't exist, malformed or an incorrect type then
// this function will error
// Status: TODO
func (cp *cwtchPeer) SendMessage(handle string, message string) error {
var ev event.Event
func (cp *cwtchPeer) SendMessage(conversation int, message string) error {
cp.mutex.Lock()
defer cp.mutex.Unlock()
// Group Handles are always 32 bytes in length, but we forgo any further testing here
// and delegate the group existence check to EncryptMessageToGroup
if len(handle) == 32 {
cp.mutex.Lock()
defer cp.mutex.Unlock()
group := cp.Profile.GetGroup(handle)
if group == nil {
return errors.New("invalid group id")
}
// Group adds it's own sent message to timeline
ct, sig, err := cp.Profile.EncryptMessageToGroup(message, handle)
//cp.mutex.Lock()
//defer cp.mutex.Unlock()
//group := cp.Profile.GetGroup(handle)
//if group == nil {
// return errors.New("invalid group id")
//}
//
//// Group adds it's own sent message to timeline
//ct, sig, err := cp.Profile.EncryptMessageToGroup(message, handle)
//
//// Group does not exist or some other unrecoverable error...
//if err != nil {
// return err
//}
//ev = event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupID: handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)})
// Group does not exist or some other unrecoverable error...
if err != nil {
return err
}
ev = event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupID: handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)})
} else if tor.IsValidHostname(handle) {
// We assume we are sending to a Contact.
contact, err := cp.FetchConversationInfo(handle)
ev = event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: handle, event.Data: message})
// If the contact exists replace the event id wih the index of this message in the contacts timeline...
// Otherwise assume we don't log the message in the timeline...
if contact != nil && err == nil {
//ev.EventID = strconv.Itoa(contact.Timeline.Len())
cp.mutex.Lock()
defer cp.mutex.Unlock()
cp.storage.InsertMessage(contact.ID, 0, message, model.Attributes{"ack": event.False, "sent": time.Now().String()})
}
// Regardless we publish the send message to peer event for the protocol engine to execute on...
// We assume this is always successful as it is always valid to attempt to
// Contact a valid hostname
} else {
return errors.New("malformed handle type")
// We assume we are sending to a Contact.
conversationInfo, err := cp.storage.GetConversation(conversation)
// If the contact exists replace the event id wih the index of this message in the contacts timeline...
// Otherwise assume we don't log the message in the timeline...
if conversationInfo != nil && err == nil {
ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: conversationInfo.Handle, event.Data: message})
//ev.EventID = strconv.Itoa(contact.Timeline.Len())
cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{"ack": event.False, "sent": time.Now().String()})
cp.eventBus.Publish(ev)
}
cp.eventBus.Publish(ev)
return nil
}
@ -165,7 +180,7 @@ func (cp *cwtchPeer) UpdateMessageFlags(handle string, mIdx int, flags uint64) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
log.Debugf("Updating Flags for %v %v %v", handle, mIdx, flags)
cp.Profile.UpdateMessageFlags(handle, mIdx, flags)
//cp.Profile.UpdateMessageFlags(handle, mIdx, flags)
cp.eventBus.Publish(event.NewEvent(event.UpdateMessageFlags, map[event.Field]string{event.Handle: handle, event.Index: strconv.Itoa(mIdx), event.Flags: strconv.FormatUint(flags, 2)}))
}
@ -188,12 +203,13 @@ func NewProfileWithEncryptedStorage(name string, cps *CwtchProfileStorage) Cwtch
cp.shutdown = false
cp.storage = cps
cp.state = make(map[string]connections.ConnectionState)
cp.Profile = model.GenerateNewProfile(name)
pub, priv, _ := ed25519.GenerateKey(rand.Reader)
// Store all the Necessary Base Attributes In The Database
cp.storage.StoreProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)).ToString(), []byte(name))
cp.storage.StoreProfileKeyValue(TypePrivateKey, "Ed25519PrivateKey", cp.Profile.Ed25519PrivateKey)
cp.storage.StoreProfileKeyValue(TypePublicKey, "Ed25519PublicKey", cp.Profile.Ed25519PublicKey)
cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, name)
cp.storage.StoreProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString(), []byte(tor.GetTorV3Hostname(pub)))
cp.storage.StoreProfileKeyValue(TypePrivateKey, "Ed25519PrivateKey", priv)
cp.storage.StoreProfileKeyValue(TypePublicKey, "Ed25519PublicKey", pub)
return cp
}
@ -211,14 +227,13 @@ func FromEncryptedStorage(cps *CwtchProfileStorage) CwtchPeer {
// Deprecated - Only to be used for importing new profiles
func FromProfile(profile *model.Profile, cps *CwtchProfileStorage) CwtchPeer {
cp := new(cwtchPeer)
cp.Profile = profile
cp.shutdown = false
cp.storage = cps
// Store all the Necessary Base Attributes In The Database
cp.storage.StoreProfileKeyValue(TypeAttribute, "public.profile.name", []byte(profile.Name))
cp.storage.StoreProfileKeyValue(TypePrivateKey, "Ed25519PrivateKey", cp.Profile.Ed25519PrivateKey)
cp.storage.StoreProfileKeyValue(TypePublicKey, "Ed25519PublicKey", cp.Profile.Ed25519PublicKey)
cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, profile.Name)
cp.storage.StoreProfileKeyValue(TypePrivateKey, "Ed25519PrivateKey", profile.Ed25519PrivateKey)
cp.storage.StoreProfileKeyValue(TypePublicKey, "Ed25519PublicKey", profile.Ed25519PublicKey)
return cp
}
@ -232,30 +247,31 @@ func (cp *cwtchPeer) Init(eventBus event.Manager) {
// It would be nice to do these checks in the storage engine itself, but it is easier to do them here
// rather than duplicating the logic to construct/reconstruct attributes in storage engine...
// TODO: Remove these checks after Cwtch ~1.5 storage engine is implemented
if _, exists := cp.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name); !exists {
// If public.profile.name does not exist, and we have an existing public.name then:
// set public.profile.name from public.name
// set local.profile.name from public.name
if name, exists := cp.Profile.GetAttribute(attr.GetPublicScope(constants.Name)); exists {
cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, name)
cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, name)
} else {
// Otherwise check if local.name exists and set it from that
// If not, then check the very old unzoned, unscoped name.
// If not, then set directly from Profile.Name...
if name, exists := cp.Profile.GetAttribute(attr.GetLocalScope(constants.Name)); exists {
cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, name)
cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, name)
} else if name, exists := cp.Profile.GetAttribute(constants.Name); exists {
cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, name)
cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, name)
} else {
// Profile.Name is very deprecated at this point...
cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, cp.Profile.Name)
cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, cp.Profile.Name)
}
}
}
// TODO: Move this to import script
//if _, exists := cp.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name); !exists {
// // If public.profile.name does not exist, and we have an existing public.name then:
// // set public.profile.name from public.name
// // set local.profile.name from public.name
// if name, exists := cp.GetAttribute(attr.GetPublicScope(constants.Name)); exists {
// cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, name)
// cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, name)
// } else {
// // Otherwise check if local.name exists and set it from that
// // If not, then check the very old unzoned, unscoped name.
// // If not, then set directly from Profile.Name...
// if name, exists := cp.Profile.GetAttribute(attr.GetLocalScope(constants.Name)); exists {
// cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, name)
// cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, name)
// } else if name, exists := cp.Profile.GetAttribute(constants.Name); exists {
// cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, name)
// cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, name)
// } else {
// // Profile.Name is very deprecated at this point...
// cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, cp.Profile.Name)
// cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, cp.Profile.Name)
// }
// }
//}
// At this point we can safely assume that public.profile.name exists
localName, _ := cp.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
@ -271,14 +287,14 @@ func (cp *cwtchPeer) Init(eventBus event.Manager) {
// profile-> name - and remove all name processing code from libcwtch-go.
// If local.profile.tag does not exist then set it from deprecated GetAttribute
if _, exists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag); !exists {
if tag, exists := cp.Profile.GetAttribute(constants.Tag); exists {
cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag)
} else {
// Assume a default password, which will allow the older profile to have it's password reset by the UI
cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, constants.ProfileTypeV1DefaultPassword)
}
}
//if _, exists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag); !exists {
// if tag, exists := cp.Profile.GetAttribute(constants.Tag); exists {
// cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag)
// } else {
// // Assume a default password, which will allow the older profile to have it's password reset by the UI
// cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, constants.ProfileTypeV1DefaultPassword)
// }
//}
}
// InitForEvents
@ -305,20 +321,20 @@ func (cp *cwtchPeer) AutoHandleEvents(events []event.Type) {
// ImportGroup initializes a group from an imported source rather than a peer invite
// Status: TODO
func (cp *cwtchPeer) ImportGroup(exportedInvite string) (string, error) {
func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
gid, err := cp.Profile.ProcessInvite(exportedInvite)
if err == nil {
cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.GroupID: gid, event.GroupInvite: exportedInvite}))
}
return gid, err
// //gid, err := model.ProcessInvite(exportedInvite)
//
// if err == nil {
// cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.GroupID: gid, event.GroupInvite: exportedInvite}))
// }
//
return -1, nil
}
// NewContactConversation create a new p2p conversation with the given acl applied to the handle.
func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessControl, accepted bool) error {
func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.storage.NewConversation(handle, model.Attributes{event.SaveHistoryKey: event.DeleteHistoryDefault}, model.AccessControlList{handle: acl}, accepted)
@ -338,6 +354,12 @@ func (cp *cwtchPeer) FetchConversations() ([]*model.Conversation, error) {
return cp.storage.FetchConversations()
}
func (cp *cwtchPeer) GetConversationInfo(conversation int) (*model.Conversation, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.storage.GetConversation(conversation)
}
// FetchConversationInfo returns information about the given conversation referenced by the handle
func (cp *cwtchPeer) FetchConversationInfo(handle string) (*model.Conversation, error) {
cp.mutex.Lock()
@ -380,130 +402,84 @@ func (cp *cwtchPeer) GetChannelMessage(conversation int, channel int, id int) (s
return cp.storage.GetChannelMessage(conversation, channel, id)
}
// ExportGroup serializes a group invite so it can be given offline
// Status: TODO
func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
group := cp.Profile.GetGroup(groupID)
if group != nil {
return group.Invite()
}
return "", errors.New("group id could not be found")
}
// StartGroup create a new group linked to the given server and returns the group ID, an invite or an error.
// Status: TODO
func (cp *cwtchPeer) StartGroup(server string) (string, string, error) {
cp.mutex.Lock()
groupID, invite, err := cp.Profile.StartGroup(server)
cp.mutex.Unlock()
// Status: TODO change server handle to conversation id...?
func (cp *cwtchPeer) StartGroup(server string) (int, error) {
group, err := model.NewGroup(server)
if err == nil {
group := cp.GetGroup(groupID)
jsobj, err := json.Marshal(group)
if err == nil {
cp.eventBus.Publish(event.NewEvent(event.GroupCreated, map[event.Field]string{
event.GroupID: groupID,
event.GroupServer: group.GroupServer,
event.GroupInvite: invite,
// Needed for Storage Engine...
event.Data: string(jsobj),
}))
conversationID, err := cp.NewContactConversation(group.GroupID, model.DefaultP2PAccessControl(), true)
if err != nil {
return -1, err
}
} else {
log.Errorf("error creating group: %v", err)
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)), group.GroupID)
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), group.GroupServer)
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(group.GroupKey[:]))
cp.eventBus.Publish(event.NewEvent(event.GroupCreated, map[event.Field]string{
event.GroupID: group.GroupID,
event.GroupServer: group.GroupServer,
}))
return conversationID, nil
}
return groupID, invite, err
}
// GetGroups returns an unordered list of all group IDs.
// Status: TODO
func (cp *cwtchPeer) GetGroups() []string {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.Profile.GetGroups()
}
// GetGroup returns a pointer to a specific group, nil if no group exists.
// Status: TODO
func (cp *cwtchPeer) GetGroup(groupID string) *model.Group {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.Profile.GetGroup(groupID)
log.Errorf("error creating group: %v", err)
return -1, err
}
// AddServer takes in a serialized server specification (a bundle of related keys) and adds a contact for the
// server assuming there are no errors and the contact doesn't already exist.
// TODO in the future this function should also integrate with a trust provider to validate the key bundle.
// Status: TODO
// Status: Ready for 1.5
func (cp *cwtchPeer) AddServer(serverSpecification string) error {
//// This confirms that the server did at least sign the bundle
//keyBundle, err := model.DeserializeAndVerify([]byte(serverSpecification))
//if err != nil {
// return err
//}
//log.Debugf("Got new key bundle %v", keyBundle.Serialize())
//
//// TODO if the key bundle is incomplete then error out. In the future we may allow servers to attest to new
//// keys or subsets of keys, but for now they must commit only to a complete set of keys required for Cwtch Groups
//// (that way we can be assured that the keybundle we store is a valid one)
//if !keyBundle.HasKeyType(model.KeyTypeTokenOnion) || !keyBundle.HasKeyType(model.KeyTypeServerOnion) || !keyBundle.HasKeyType(model.KeyTypePrivacyPass) {
// return errors.New("keybundle is incomplete")
//}
//
//if keyBundle.HasKeyType(model.KeyTypeServerOnion) {
// onionKey, _ := keyBundle.GetKey(model.KeyTypeServerOnion)
// onion := string(onionKey)
//
// // Add the contact if we don't already have it
// if cp.GetContact(onion) == nil {
// decodedPub, _ := base32.StdEncoding.DecodeString(strings.ToUpper(onion))
// ab := keyBundle.AttributeBundle()
// pp := &model.PublicProfile{Name: onion, Ed25519PublicKey: decodedPub, Authorization: model.AuthUnknown, Onion: onion, Attributes: ab}
//
// // The only part of this function that actually modifies the profile...
// cp.mutex.Lock()
// cp.Profile.AddContact(onion, pp)
// cp.mutex.Unlock()
//
// pd, _ := json.Marshal(pp)
//
// // Sync the Storage Engine
// cp.eventBus.Publish(event.NewEvent(event.PeerCreated, map[event.Field]string{
// event.Data: string(pd),
// event.RemotePeer: onion,
// }))
// }
//
// // At this point we know the server exists
// server := cp.GetContact(onion)
// ab := keyBundle.AttributeBundle()
//
// // Check server bundle for consistency if we have different keys stored than in the tofu bundle then we
// // abort...
// for k, v := range ab {
// val, exists := server.GetAttribute(k)
// if exists {
// if val != v {
// // this is inconsistent!
// return model.InconsistentKeyBundleError
// }
// }
// // we haven't seen this key associated with the server before
// }
//
// // Store the key bundle for the server so we can reconstruct a tofubundle invite
// cp.SetContactAttribute(onion, string(model.BundleType), serverSpecification)
//
// // If we have gotten to this point we can assume this is a safe key bundle signed by the
// // server with no conflicting keys. So we are going to publish all the keys
// for k, v := range ab {
// log.Debugf("Server (%v) has %v key %v", onion, k, v)
// cp.SetContactAttribute(onion, k, v)
// }
//
// return nil
//}
// This confirms that the server did at least sign the bundle
keyBundle, err := model.DeserializeAndVerify([]byte(serverSpecification))
if err != nil {
return err
}
log.Debugf("Got new key bundle %v", keyBundle.Serialize())
// TODO if the key bundle is incomplete then error out. In the future we may allow servers to attest to new
// keys or subsets of keys, but for now they must commit only to a complete set of keys required for Cwtch Groups
// (that way we can be assured that the keybundle we store is a valid one)
if !keyBundle.HasKeyType(model.KeyTypeTokenOnion) || !keyBundle.HasKeyType(model.KeyTypeServerOnion) || !keyBundle.HasKeyType(model.KeyTypePrivacyPass) {
return errors.New("keybundle is incomplete")
}
if keyBundle.HasKeyType(model.KeyTypeServerOnion) {
onionKey, _ := keyBundle.GetKey(model.KeyTypeServerOnion)
onion := string(onionKey)
// Add the contact if we don't already have it
conversationInfo, _ := cp.FetchConversationInfo(onion)
if conversationInfo == nil {
cp.NewContactConversation(onion, model.DefaultP2PAccessControl(), true)
}
conversationInfo, err = cp.FetchConversationInfo(onion)
if conversationInfo != nil && err == nil {
ab := keyBundle.AttributeBundle()
for k, v := range ab {
val, exists := conversationInfo.Attributes[k]
if exists {
if val != v {
// this is inconsistent!
return model.InconsistentKeyBundleError
}
}
// we haven't seen this key associated with the server before
}
// // If we have gotten to this point we can assume this is a safe key bundle signed by the
// // server with no conflicting keys. So we are going to save all the keys
for k, v := range ab {
cp.SetConversationAttribute(conversationInfo.ID, attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(k)), v)
}
cp.SetConversationAttribute(conversationInfo.ID, attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.BundleType))), serverSpecification)
} else {
return err
}
return nil
}
return nil
}
@ -519,31 +495,21 @@ func (cp *cwtchPeer) GetServers() []string {
func (cp *cwtchPeer) GetOnion() string {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.Profile.Onion
onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
return string(onion)
}
// GetPeerState
// Status: TODO
func (cp *cwtchPeer) GetPeerState(onion string) (connections.ConnectionState, bool) {
// Status: Ready for 1.5
func (cp *cwtchPeer) GetPeerState(handle string) (connections.ConnectionState, bool) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
if state, ok := cp.state[onion]; ok {
if state, ok := cp.state[handle]; ok {
return state, ok
}
return connections.DISCONNECTED, false
}
// GetGroupState
// Status: TODO
func (cp *cwtchPeer) GetGroupState(groupid string) (connections.ConnectionState, bool) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
if group, ok := cp.Profile.Groups[groupid]; ok {
return connections.ConnectionStateToType()[group.State], true
}
return connections.DISCONNECTED, false
}
// PeerWithOnion initiates a request to the Protocol Engine to set up Cwtch Session with a given tor v3 onion
// address.
func (cp *cwtchPeer) PeerWithOnion(onion string) {
@ -552,19 +518,81 @@ func (cp *cwtchPeer) PeerWithOnion(onion string) {
// InviteOnionToGroup kicks off the invite process
// Status: TODO
func (cp *cwtchPeer) InviteOnionToGroup(onion string, groupid string) error {
cp.mutex.Lock()
group := cp.Profile.GetGroup(groupid)
if group == nil {
cp.mutex.Unlock()
return errors.New("invalid group id")
func (cp *cwtchPeer) SendInviteToConversation(conversationID int, inviteConversationID int) error {
var invite model.MessageWrapper
conversationInfo, err := cp.GetConversationInfo(inviteConversationID)
if conversationInfo != nil || err != nil {
return err
}
invite, err := group.Invite()
cp.mutex.Unlock()
if err == nil {
err = cp.SendMessage(onion, invite)
// groupServer, isGroup := conversationInfo.Attributes[event.GroupServer]; isGroup {
//bundle, _ := cp.Get(profile.GetGroup(target).GroupServer).GetAttribute(string(model.BundleType))
//inviteStr, err := profile.GetGroup(target).Invite()
//if err == nil {
// invite = model.MessageWrapper{Overlay: 101, Data: fmt.Sprintf("tofubundle:server:%s||%s", base64.StdEncoding.EncodeToString([]byte(bundle)), inviteStr)}
//}
if tor.IsValidHostname(conversationInfo.Handle) {
invite = model.MessageWrapper{Overlay: 100, Data: conversationInfo.Handle}
} else {
// Reconstruct Group
groupID, ok := conversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)).ToString()]
if !ok {
return errors.New("group structure is malformed")
}
groupServer, ok := conversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)).ToString()]
if !ok {
return errors.New("group structure is malformed")
}
groupKeyBase64, ok := conversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)).ToString()]
if !ok {
return errors.New("group structure is malformed")
}
groupName, ok := conversationInfo.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.Name)).ToString()]
if !ok {
return errors.New("group structure is malformed")
}
groupKey, err := base64.StdEncoding.DecodeString(groupKeyBase64)
if err != nil {
return errors.New("malformed group key")
}
var groupKeyFixed = [32]byte{}
copy(groupKeyFixed[:], groupKey[:])
group := model.Group{
GroupID: groupID,
GroupKey: groupKeyFixed,
GroupServer: groupServer,
}
groupInvite, err := group.Invite(groupName)
if !ok {
return errors.New("group invite is malformed")
}
serverInfo, err := cp.FetchConversationInfo(groupServer)
if err != nil {
return errors.New("unknown server associated with group")
}
bundle, exists := serverInfo.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.BundleType))).ToString()]
if !exists {
return errors.New("server bundle not found")
}
invite = model.MessageWrapper{Overlay: 101, Data: fmt.Sprintf("tofubundle:server:%s||%s", base64.StdEncoding.EncodeToString([]byte(bundle)), groupInvite)}
}
return err
inviteBytes, err := json.Marshal(invite)
if err != nil {
log.Errorf("malformed invite: %v", err)
} else {
cp.SendMessage(conversationID, string(inviteBytes))
}
return nil
}
// JoinServer manages a new server connection with the given onion address
@ -615,7 +643,8 @@ func (cp *cwtchPeer) Listen() {
if !cp.listenStatus {
log.Infof("cwtchPeer Listen sending ProtocolEngineStartListen\n")
cp.listenStatus = true
cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{event.Onion: cp.Profile.Onion}))
onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{event.Onion: string(onion)}))
}
// else protocol engine is already listening
@ -662,11 +691,11 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time)
// TOOD maybe atomize this?
ci, err := cp.FetchConversationInfo(handle)
if err != nil {
err := cp.NewContactConversation(handle, model.DefaultP2PAccessControl(), false)
id, err := cp.NewContactConversation(handle, model.DefaultP2PAccessControl(), false)
if err != nil {
return err
}
ci, err = cp.FetchConversationInfo(handle)
ci, err = cp.GetConversationInfo(id)
if err != nil {
return err
}
@ -691,13 +720,13 @@ func (cp *cwtchPeer) eventHandler() {
case event.ProtocolEngineStopped:
cp.mutex.Lock()
cp.listenStatus = false
log.Infof("Protocol engine for %v has stopped listening", cp.Profile.Onion)
log.Infof("Protocol engine for %v has stopped listening", cp.GetOnion())
cp.mutex.Unlock()
case event.EncryptedGroupMessage:
// If successful, a side effect is the message is added to the group's timeline
ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
//ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
//signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
// SECURITY NOTE: A malicious server could insert posts such that everyone always has a different lastKnownSignature
// However the server can always replace **all** messages in an attempt to track users
@ -708,42 +737,35 @@ func (cp *cwtchPeer) eventHandler() {
//cp.SetConversationAttribute(ev.Data[event.GroupServer], lastKnownSignature, ev.Data[event.Signature])
cp.mutex.Lock()
ok, groupID, message, index := cp.Profile.AttemptDecryption(ciphertext, signature)
//ok, groupID, message, index := cp.Profile.AttemptDecryption(ciphertext, signature)
cp.mutex.Unlock()
if ok && index > -1 {
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: base64.StdEncoding.EncodeToString(message.Signature), event.PreviousSignature: base64.StdEncoding.EncodeToString(message.PreviousMessageSig), event.RemotePeer: message.PeerID, event.Index: strconv.Itoa(index)}))
}
// The group has been compromised
if !ok && groupID != "" {
if cp.Profile.GetGroup(groupID).IsCompromised {
cp.eventBus.Publish(event.NewEvent(event.GroupCompromised, map[event.Field]string{event.GroupID: groupID}))
}
}
//if ok && index > -1 {
// cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: base64.StdEncoding.EncodeToString(message.Signature), event.PreviousSignature: base64.StdEncoding.EncodeToString(message.PreviousMessageSig), event.RemotePeer: message.PeerID, event.Index: strconv.Itoa(index)}))
//}
case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data
ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
cp.storeMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ts)
case event.PeerAcknowledgement:
cp.mutex.Lock()
idx := cp.Profile.AckSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID])
edata := ev.Data
edata[event.Index] = strconv.Itoa(idx)
cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, edata))
//idx := cp.Profile.AckSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID])
//edata := ev.Data
//edata[event.Index] = strconv.Itoa(idx)
//cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, edata))
cp.mutex.Unlock()
case event.SendMessageToGroupError:
cp.mutex.Lock()
signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
cp.Profile.AddGroupSentMessageError(ev.Data[event.GroupID], signature, ev.Data[event.Error])
//signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
//cp.Profile.AddGroupSentMessageError(ev.Data[event.GroupID], signature, ev.Data[event.Error])
cp.mutex.Unlock()
case event.SendMessageToPeerError:
cp.mutex.Lock()
idx := cp.Profile.ErrorSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID], ev.Data[event.Error])
edata := ev.Data
edata[event.Index] = strconv.Itoa(idx)
cp.eventBus.Publish(event.NewEvent(event.IndexedFailure, edata))
//idx := cp.Profile.ErrorSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID], ev.Data[event.Error])
//edata := ev.Data
//edata[event.Index] = strconv.Itoa(idx)
//cp.eventBus.Publish(event.NewEvent(event.IndexedFailure, edata))
cp.mutex.Unlock()
case event.RetryServerRequest:
// Automated Join Server Request triggered by a plugin.

View File

@ -48,6 +48,7 @@ type CwtchProfileStorage struct {
db *sql.DB
}
// ChannelID encapsulates the data necessary to reference a channel structure.
type ChannelID struct {
Conversation int
Channel int
@ -185,35 +186,51 @@ func (cps *CwtchProfileStorage) LoadProfileKeyValue(keyType StorageKeyType, key
}
// NewConversation stores a new conversation in the data store
func (cps *CwtchProfileStorage) NewConversation(handle string, attributes model.Attributes, acl model.AccessControlList, accepted bool) error {
func (cps *CwtchProfileStorage) NewConversation(handle string, attributes model.Attributes, acl model.AccessControlList, accepted bool) (int, error) {
tx, err := cps.db.Begin()
if err != nil {
log.Errorf("error executing transaction: %v", err)
return err
return -1, err
}
result, err := tx.Stmt(cps.insertConversationStmt).Exec(handle, attributes.Serialize(), acl.Serialize(), accepted)
if err != nil {
log.Errorf("error executing transaction: %v", err)
return tx.Rollback()
return -1, tx.Rollback()
}
id, err := result.LastInsertId()
if err != nil {
log.Errorf("error executing transaction: %v", err)
return tx.Rollback()
return -1, tx.Rollback()
}
_, err = tx.Exec(fmt.Sprintf(createTableConversationMessagesSQLStmt, id))
result, err = tx.Exec(fmt.Sprintf(createTableConversationMessagesSQLStmt, id))
if err != nil {
log.Errorf("error executing transaction: %v", err)
return tx.Rollback()
return -1, tx.Rollback()
}
return tx.Commit()
conversationID, err := result.LastInsertId()
if err != nil {
log.Errorf("error executing transaction: %v", err)
return -1, tx.Rollback()
}
err = tx.Commit()
if err != nil {
log.Errorf("error executing transaction: %v", err)
return -1, tx.Rollback()
}
return int(conversationID), nil
}
// GetConversationByHandle is a convienance method to fetch an active conversation by a handle
// Usage Notes: This should **only** be used to look up p2p conversations by convention.
// Ideally this function should not exist, and all lookups should happen by ID (this is currently
// unavoidable in some circumstances because the event bus references conversations by handle, not by id)
func (cps *CwtchProfileStorage) GetConversationByHandle(handle string) (*model.Conversation, error) {
rows, err := cps.selectConversationByHandleStmt.Query(handle)
if err != nil {
@ -242,6 +259,9 @@ func (cps *CwtchProfileStorage) GetConversationByHandle(handle string) (*model.C
return &model.Conversation{ID: id, Handle: handle, ACL: model.DeserializeAccessControlList(acl), Attributes: model.DeserializeAttributes(attributes), Accepted: accepted}, nil
}
// FetchConversations returns *all* active conversations. This method should only be called
// on app start up to build a summary of conversations for the UI. Any further updates should be integrated
// through the event bus.
func (cps *CwtchProfileStorage) FetchConversations() ([]*model.Conversation, error) {
rows, err := cps.fetchAllConversationsStmt.Query()
if err != nil {
@ -324,6 +344,7 @@ func (cps *CwtchProfileStorage) DeleteConversation(id int) error {
return nil
}
// SetConversationAttribute sets a new attribute on a given conversation.
func (cps *CwtchProfileStorage) SetConversationAttribute(id int, path attr.ScopedZonedPath, value string) error {
ci, err := cps.GetConversation(id)
if err != nil {
@ -338,6 +359,7 @@ func (cps *CwtchProfileStorage) SetConversationAttribute(id int, path attr.Scope
return nil
}
// InsertMessage appends a message to a conversation channel, with a given set of attributes
func (cps *CwtchProfileStorage) InsertMessage(conversation int, channel int, body string, attributes model.Attributes) error {
channelID := ChannelID{Conversation: conversation, Channel: channel}
@ -361,6 +383,8 @@ func (cps *CwtchProfileStorage) InsertMessage(conversation int, channel int, bod
return nil
}
// GetChannelMessage looks up a channel message by conversation, channel and message id. On success it
// returns the message body and the attributes associated with the message. Otherwise an error is returned.
func (cps *CwtchProfileStorage) GetChannelMessage(conversation int, channel int, messageID int) (string, model.Attributes, error) {
channelID := ChannelID{Conversation: conversation, Channel: channel}

View File

@ -5,7 +5,6 @@ import (
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/openprivacy/connectivity"
)
@ -33,18 +32,10 @@ type ReadServers interface {
GetServers() []string
}
// ReadGroups provides read-only access to group state
type ReadGroups interface {
GetGroup(string) *model.Group
GetGroupState(string) (connections.ConnectionState, bool)
GetGroups() []string
ExportGroup(string) (string, error)
}
// ModifyGroups provides write-only access add/edit/remove new groups
type ModifyGroups interface {
ImportGroup(string) (string, error)
StartGroup(string) (string, string, error)
ImportGroup(string) (int, error)
StartGroup(string) (int, error)
}
// ModifyServers provides write-only access to servers
@ -55,16 +46,9 @@ type ModifyServers interface {
// SendMessages enables a caller to sender messages to a contact
type SendMessages interface {
SendMessage(handle string, message string) error
// Deprecated: is unsafe
SendGetValToPeer(string, string, string)
SendMessage(conversation int, message string) error
SendInviteToConversation(conversationID int, inviteConversationID int) error
SendScopedZonedGetValToContact(handle string, scope attr.Scope, zone attr.Zone, key string)
// TODO
// Deprecated use overlays instead
InviteOnionToGroup(string, string) error
}
// ModifyMessages enables a caller to modify the messages in a timeline
@ -79,8 +63,8 @@ type CwtchPeer interface {
// Core Cwtch Peer Functions that should not be exposed to
// most functions
Init(event.Manager)
GetIdentity() primitives.Identity
GenerateProtocolEngine(acn connectivity.ACN, bus event.Manager) connections.Engine
GenerateProtocolEngine(acn connectivity.ACN, bus event.Manager) (connections.Engine, error)
AutoHandleEvents(events []event.Type)
Listen()
@ -105,7 +89,6 @@ type CwtchPeer interface {
AccessPeeringState
ModifyPeeringState
ReadGroups
ModifyGroups
ReadServers
@ -115,8 +98,9 @@ type CwtchPeer interface {
ModifyMessages
// New Unified Conversation Interfaces
NewContactConversation(handle string, acl model.AccessControl, accepted bool) error
NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error)
FetchConversations() ([]*model.Conversation, error)
GetConversationInfo(conversation int) (*model.Conversation, error)
FetchConversationInfo(handle string) (*model.Conversation, error)
AcceptConversation(conversation int) error
SetConversationAttribute(conversation int, path attr.ScopedZonedPath, value string) error

View File

@ -3,12 +3,7 @@ package storage
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/storage/v0"
"cwtch.im/cwtch/storage/v1"
"git.openprivacy.ca/openprivacy/log"
"io/ioutil"
"path"
"strconv"
)
const profileFilename = "profile"
@ -17,11 +12,8 @@ const currentVersion = 1
// ProfileStore is an interface to managing the storage of Cwtch Profiles
type ProfileStore interface {
Shutdown()
Delete()
GetProfileCopy(timeline bool) *model.Profile
GetNewPeerMessage() *event.Event
GetStatusMessages() []*event.Event
CheckPassword(string) bool
}
@ -34,8 +26,6 @@ func CreateProfileWriterStore(eventManager event.Manager, directory, password st
// LoadProfileWriterStore loads a profile store from filestore listening for events and saving them
// directory should be $appDir/profiles/$rand
func LoadProfileWriterStore(eventManager event.Manager, directory, password string) (ProfileStore, error) {
versionCheckUpgrade(directory, password)
return v1.LoadProfileWriterStore(eventManager, directory, password)
}
@ -53,42 +43,3 @@ func NewProfile(name string) *model.Profile {
}
// ********* Versioning and upgrade **********
func detectVersion(directory string) int {
vnumberStr, err := ioutil.ReadFile(path.Join(directory, versionFile))
if err != nil {
return 0
}
vnumber, err := strconv.Atoi(string(vnumberStr))
if err != nil {
log.Errorf("Could not parse VERSION file contents: '%v' - %v\n", vnumber, err)
return -1
}
return vnumber
}
func upgradeV0ToV1(directory, password string) error {
log.Debugln("Attempting storage v0 to v1: Reading v0 profile...")
profile, err := v0.ReadProfile(directory, password)
if err != nil {
return err
}
log.Debugln("Attempting storage v0 to v1: Writing v1 profile...")
return v1.UpgradeV0Profile(profile, directory, password)
}
func versionCheckUpgrade(directory, password string) {
version := detectVersion(directory)
log.Debugf("versionCheck: %v\n", version)
if version == -1 {
return
}
if version == 0 {
err := upgradeV0ToV1(directory, password)
if err != nil {
return
}
//version = 1
}
}

View File

@ -1,76 +0,0 @@
// Known race issue with event bus channel closure
package storage
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/storage/v0"
"fmt"
"git.openprivacy.ca/openprivacy/log"
"os"
"testing"
"time"
)
const testingDir = "./testing"
const filenameBase = "testStream"
const password = "asdfqwer"
const line1 = "Hello from storage!"
const testProfileName = "Alice"
const testKey = "key"
const testVal = "value"
const testInitialMessage = "howdy"
const testMessage = "Hello from storage"
func TestProfileStoreUpgradeV0toV1(t *testing.T) {
log.SetLevel(log.LevelDebug)
os.RemoveAll(testingDir)
eventBus := event.NewEventManager()
queue := event.NewQueue()
eventBus.Subscribe(event.ChangePasswordSuccess, queue)
fmt.Println("Creating and initializing v0 profile and store...")
profile := NewProfile(testProfileName)
profile.AddContact("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd", &model.PublicProfile{Attributes: map[string]string{string(model.KeyTypeServerOnion): "2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd"}})
ps1 := v0.NewProfileWriterStore(eventBus, testingDir, password, profile)
groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
if err != nil {
t.Errorf("Creating group: %v\n", err)
}
if err != nil {
t.Errorf("Creating group invite: %v\n", err)
}
ps1.AddGroup(invite)
fmt.Println("Sending 200 messages...")
for i := 0; i < 200; i++ {
ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), profile.Onion, testMessage, []byte{byte(i)})
}
fmt.Println("Shutdown v0 profile store...")
ps1.Shutdown()
fmt.Println("New v1 Profile store...")
ps2, err := LoadProfileWriterStore(eventBus, testingDir, password)
if err != nil {
t.Errorf("Error createing new profileStore with new password: %v\n", err)
return
}
profile2 := ps2.GetProfileCopy(true)
if profile2.Groups[groupid] == nil {
t.Errorf("Failed to load group %v\n", groupid)
return
}
if len(profile2.Groups[groupid].Timeline.Messages) != 200 {
t.Errorf("Failed to load group's 200 messages, instead got %v\n", len(profile2.Groups[groupid].Timeline.Messages))
}
}

View File

@ -1,70 +0,0 @@
package v0
import (
"crypto/rand"
"errors"
"git.openprivacy.ca/openprivacy/log"
"golang.org/x/crypto/nacl/secretbox"
"golang.org/x/crypto/pbkdf2"
"golang.org/x/crypto/sha3"
"io"
"io/ioutil"
"path"
)
// createKey derives a key from a password
func createKey(password string) ([32]byte, [128]byte, error) {
var salt [128]byte
if _, err := io.ReadFull(rand.Reader, salt[:]); err != nil {
log.Errorf("Cannot read from random: %v\n", err)
return [32]byte{}, salt, err
}
dk := pbkdf2.Key([]byte(password), salt[:], 4096, 32, sha3.New512)
var dkr [32]byte
copy(dkr[:], dk)
return dkr, salt, nil
}
//encryptFileData encrypts the cwtchPeer via the specified key.
func encryptFileData(data []byte, key [32]byte) ([]byte, error) {
var nonce [24]byte
if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil {
log.Errorf("Cannot read from random: %v\n", err)
return nil, err
}
encrypted := secretbox.Seal(nonce[:], data, &nonce, &key)
return encrypted, nil
}
//decryptFile decrypts the passed ciphertext into a cwtchPeer via the specified key.
func decryptFile(ciphertext []byte, key [32]byte) ([]byte, error) {
var decryptNonce [24]byte
copy(decryptNonce[:], ciphertext[:24])
decrypted, ok := secretbox.Open(nil, ciphertext[24:], &decryptNonce, &key)
if ok {
return decrypted, nil
}
return nil, errors.New("Failed to decrypt")
}
// Load instantiates a cwtchPeer from the file store
func readEncryptedFile(directory, filename, password string) ([]byte, error) {
encryptedbytes, err := ioutil.ReadFile(path.Join(directory, filename))
if err == nil && len(encryptedbytes) > 128 {
var dkr [32]byte
//Separate the salt from the encrypted bytes, then generate the derived key
salt, encryptedbytes := encryptedbytes[0:128], encryptedbytes[128:]
dk := pbkdf2.Key([]byte(password), salt, 4096, 32, sha3.New512)
copy(dkr[:], dk)
data, err := decryptFile(encryptedbytes, dkr)
if err == nil {
return data, nil
}
return nil, err
}
return nil, err
}

View File

@ -1,46 +0,0 @@
package v0
import (
"io/ioutil"
"path"
)
// fileStore stores a cwtchPeer in an encrypted file
type fileStore struct {
directory string
filename string
password string
}
// FileStore is a primitive around storing encrypted files
type FileStore interface {
Read() ([]byte, error)
Write(data []byte) error
}
// NewFileStore instantiates a fileStore given a filename and a password
func NewFileStore(directory string, filename string, password string) FileStore {
filestore := new(fileStore)
filestore.password = password
filestore.filename = filename
filestore.directory = directory
return filestore
}
func (fps *fileStore) Read() ([]byte, error) {
return readEncryptedFile(fps.directory, fps.filename, fps.password)
}
// write serializes a cwtchPeer to a file
func (fps *fileStore) Write(data []byte) error {
key, salt, _ := createKey(fps.password)
encryptedbytes, err := encryptFileData(data, key)
if err != nil {
return err
}
// the salt for the derived key is appended to the front of the file
encryptedbytes = append(salt[:], encryptedbytes...)
err = ioutil.WriteFile(path.Join(fps.directory, fps.filename), encryptedbytes, 0600)
return err
}

View File

@ -1,120 +0,0 @@
package v0
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"encoding/json"
"fmt"
"os"
"time"
)
const groupIDLen = 32
const peerIDLen = 56
const profileFilename = "profile"
// ProfileStoreV0 is a legacy profile store used now for upgrading legacy profile stores to newer versions
type ProfileStoreV0 struct {
fs FileStore
streamStores map[string]StreamStore // map [groupId|onion] StreamStore
directory string
password string
profile *model.Profile
}
// NewProfileWriterStore returns a profile store backed by a filestore listening for events and saving them
// directory should be $appDir/profiles/$rand
func NewProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) *ProfileStoreV0 {
os.Mkdir(directory, 0700)
ps := &ProfileStoreV0{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, streamStores: map[string]StreamStore{}}
if profile != nil {
ps.save()
}
return ps
}
// ReadProfile reads a profile from storqage and returns the profile
// directory should be $appDir/profiles/$rand
func ReadProfile(directory, password string) (*model.Profile, error) {
os.Mkdir(directory, 0700)
ps := &ProfileStoreV0{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: nil, streamStores: map[string]StreamStore{}}
err := ps.Load()
if err != nil {
return nil, err
}
profile := ps.getProfileCopy(true)
return profile, nil
}
/********************************************************************************************/
// AddGroup For testing, adds a group to the profile (and starts a stream store)
func (ps *ProfileStoreV0) AddGroup(invite string) {
gid, err := ps.profile.ProcessInvite(invite)
if err == nil {
ps.save()
group := ps.profile.Groups[gid]
ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password)
}
}
// AddGroupMessage for testing, adds a group message
func (ps *ProfileStoreV0) AddGroupMessage(groupid string, timeSent, timeRecvied string, remotePeer, data string, signature []byte) {
received, _ := time.Parse(time.RFC3339Nano, timeRecvied)
sent, _ := time.Parse(time.RFC3339Nano, timeSent)
message := model.Message{Received: received, Timestamp: sent, Message: data, PeerID: remotePeer, Signature: signature, PreviousMessageSig: []byte("PreviousSignature")}
ss, exists := ps.streamStores[groupid]
if exists {
ss.Write(message)
} else {
fmt.Println("ERROR")
}
}
// GetNewPeerMessage is for AppService to call on Reload events, to reseed the AppClient with the loaded peers
func (ps *ProfileStoreV0) GetNewPeerMessage() *event.Event {
message := event.NewEventList(event.NewPeer, event.Identity, ps.profile.LocalID, event.Password, ps.password, event.Status, "running")
return &message
}
// Load instantiates a cwtchPeer from the file store
func (ps *ProfileStoreV0) Load() error {
decrypted, err := ps.fs.Read()
if err != nil {
return err
}
cp := new(model.Profile)
err = json.Unmarshal(decrypted, &cp)
if err == nil {
ps.profile = cp
for gid, group := range cp.Groups {
ss := NewStreamStore(ps.directory, group.LocalID, ps.password)
cp.Groups[gid].Timeline.SetMessages(ss.Read())
ps.streamStores[group.GroupID] = ss
}
}
return err
}
func (ps *ProfileStoreV0) getProfileCopy(timeline bool) *model.Profile {
return ps.profile.GetCopy(timeline)
}
// Shutdown saves the storage system
func (ps *ProfileStoreV0) Shutdown() {
ps.save()
}
/************* Writing *************/
func (ps *ProfileStoreV0) save() error {
bytes, _ := json.Marshal(ps.profile)
return ps.fs.Write(bytes)
}

View File

@ -1,70 +0,0 @@
// Known race issue with event bus channel closure
package v0
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"log"
"os"
"testing"
"time"
)
const testProfileName = "Alice"
const testKey = "key"
const testVal = "value"
const testInitialMessage = "howdy"
const testMessage = "Hello from storage"
// NewProfile creates a new profile for use in the profile store.
func NewProfile(name string) *model.Profile {
profile := model.GenerateNewProfile(name)
return profile
}
func TestProfileStoreWriteRead(t *testing.T) {
log.Println("profile store test!")
os.RemoveAll(testingDir)
eventBus := event.NewEventManager()
profile := NewProfile(testProfileName)
ps1 := NewProfileWriterStore(eventBus, testingDir, password, profile)
profile.SetAttribute(testKey, testVal)
groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
if err != nil {
t.Errorf("Creating group: %v\n", err)
}
if err != nil {
t.Errorf("Creating group invite: %v\n", err)
}
ps1.AddGroup(invite)
ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), ps1.getProfileCopy(true).Onion, testMessage, []byte{byte(0x01)})
ps1.Shutdown()
ps2 := NewProfileWriterStore(eventBus, testingDir, password, nil)
err = ps2.Load()
if err != nil {
t.Errorf("Error createing ProfileStoreV0: %v\n", err)
}
profile = ps2.getProfileCopy(true)
if profile.Name != testProfileName {
t.Errorf("Profile name from loaded profile incorrect. Expected: '%v' Actual: '%v'\n", testProfileName, profile.Name)
}
v, _ := profile.GetAttribute(testKey)
if v != testVal {
t.Errorf("Profile attribute '%v' incorrect. Expected: '%v' Actual: '%v'\n", testKey, testVal, v)
}
group2 := ps2.getProfileCopy(true).Groups[groupid]
if group2 == nil {
t.Errorf("Group not loaded\n")
}
}

View File

@ -1,145 +0,0 @@
package v0
import (
"cwtch.im/cwtch/model"
"encoding/json"
"fmt"
"git.openprivacy.ca/openprivacy/log"
"io/ioutil"
"os"
"path"
"sync"
)
const (
fileStorePartitions = 16
bytesPerFile = 15 * 1024
)
// streamStore is a file-backed implementation of StreamStore using an in memory buffer of ~16KB and a rotating set of files
type streamStore struct {
password string
storeDirectory string
filenameBase string
lock sync.Mutex
// Buffer is used just for current file to write to
messages []model.Message
bufferByteCount int
}
// StreamStore provides a stream like interface to encrypted storage
type StreamStore interface {
Read() []model.Message
Write(m model.Message)
}
// NewStreamStore returns an initialized StreamStore ready for reading and writing
func NewStreamStore(directory string, filenameBase string, password string) (store StreamStore) {
ss := &streamStore{storeDirectory: directory, filenameBase: filenameBase, password: password}
os.Mkdir(ss.storeDirectory, 0700)
ss.initBuffer()
return ss
}
// Read returns all messages from the backing file (not the buffer, for writing to the current file)
func (ss *streamStore) Read() (messages []model.Message) {
ss.lock.Lock()
defer ss.lock.Unlock()
resp := []model.Message{}
for i := fileStorePartitions - 1; i >= 0; i-- {
filename := fmt.Sprintf("%s.%d", ss.filenameBase, i)
bytes, err := readEncryptedFile(ss.storeDirectory, filename, ss.password)
if err != nil {
continue
}
msgs := []model.Message{}
json.Unmarshal([]byte(bytes), &msgs)
resp = append(resp, msgs...)
}
// 2019.10.10 "Acknowledged" & "ReceivedByServer" are added to the struct, populate it as true for old ones without
for i := 0; i < len(resp) && (resp[i].Acknowledged == false && resp[i].ReceivedByServer == false); i++ {
resp[i].Acknowledged = true
resp[i].ReceivedByServer = true
}
return resp
}
// ****** Writing *******/
func (ss *streamStore) WriteN(messages []model.Message) {
ss.lock.Lock()
defer ss.lock.Unlock()
for _, m := range messages {
ss.updateBuffer(m)
if ss.bufferByteCount > bytesPerFile {
ss.updateFile()
log.Debugf("rotating log file")
ss.rotateFileStore()
ss.initBuffer()
}
}
}
// Write adds a GroupMessage to the store
func (ss *streamStore) Write(m model.Message) {
ss.lock.Lock()
defer ss.lock.Unlock()
ss.updateBuffer(m)
ss.updateFile()
if ss.bufferByteCount > bytesPerFile {
log.Debugf("rotating log file")
ss.rotateFileStore()
ss.initBuffer()
}
}
func (ss *streamStore) initBuffer() {
ss.messages = []model.Message{}
ss.bufferByteCount = 0
}
func (ss *streamStore) updateBuffer(m model.Message) {
ss.messages = append(ss.messages, m)
ss.bufferByteCount += (104 * 1.5) + len(m.Message)
}
func (ss *streamStore) updateFile() error {
msgs, err := json.Marshal(ss.messages)
if err != nil {
log.Errorf("Failed to marshal group messages %v\n", err)
}
// ENCRYPT
key, salt, _ := createKey(ss.password)
encryptedMsgs, err := encryptFileData(msgs, key)
if err != nil {
log.Errorf("Failed to encrypt messages: %v\n", err)
return err
}
encryptedMsgs = append(salt[:], encryptedMsgs...)
ioutil.WriteFile(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, 0)), encryptedMsgs, 0700)
return nil
}
func (ss *streamStore) rotateFileStore() {
os.Remove(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, fileStorePartitions-1)))
for i := fileStorePartitions - 2; i >= 0; i-- {
os.Rename(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i)), path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i+1)))
}
}

View File

@ -1,50 +0,0 @@
package v0
import (
"cwtch.im/cwtch/model"
"os"
"testing"
)
const testingDir = "./testing"
const filenameBase = "testStream"
const password = "asdfqwer"
const line1 = "Hello from storage!"
func TestStreamStoreWriteRead(t *testing.T) {
os.Remove(".test.json")
os.RemoveAll(testingDir)
os.Mkdir(testingDir, 0777)
ss1 := NewStreamStore(testingDir, filenameBase, password)
m := model.Message{Message: line1}
ss1.Write(m)
ss2 := NewStreamStore(testingDir, filenameBase, password)
messages := ss2.Read()
if len(messages) != 1 {
t.Errorf("Read messages has wrong length. Expected: 1 Actual: %d\n", len(messages))
}
if messages[0].Message != line1 {
t.Errorf("Read message has wrong content. Expected: '%v' Actual: '%v'\n", line1, messages[0].Message)
}
}
func TestStreamStoreWriteReadRotate(t *testing.T) {
os.Remove(".test.json")
os.RemoveAll(testingDir)
os.Mkdir(testingDir, 0777)
ss1 := NewStreamStore(testingDir, filenameBase, password)
m := model.Message{Message: line1}
for i := 0; i < 400; i++ {
ss1.Write(m)
}
ss2 := NewStreamStore(testingDir, filenameBase, password)
messages := ss2.Read()
if len(messages) != 400 {
t.Errorf("Read messages has wrong length. Expected: 400 Actual: %d\n", len(messages))
}
if messages[0].Message != line1 {
t.Errorf("Read message has wrong content. Expected: '%v' Actual: '%v'\n", line1, messages[0].Message)
}
}

View File

@ -3,18 +3,13 @@ package v1
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"encoding/base64"
"encoding/json"
"git.openprivacy.ca/openprivacy/log"
"io/ioutil"
"os"
"path"
"strconv"
"time"
)
const groupIDLen = 32
const peerIDLen = 56
const profileFilename = "profile"
const version = "1"
const versionFile = "VERSION"
@ -22,15 +17,11 @@ const saltFile = "SALT"
//ProfileStoreV1 storage for profiles and message streams that uses in memory key and fs stored salt instead of in memory password
type ProfileStoreV1 struct {
fs FileStore
streamStores map[string]StreamStore // map [groupId|onion] StreamStore
directory string
profile *model.Profile
key [32]byte
salt [128]byte
eventManager event.Manager
queue event.Queue
writer bool
fs FileStore
directory string
profile *model.Profile
key [32]byte
salt [128]byte
}
// CheckPassword returns true if the given password produces the same key as the current stored key, otherwise false.
@ -70,39 +61,11 @@ func CreateProfileWriterStore(eventManager event.Manager, directory, password st
return nil
}
ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true}
ps.save()
ps.initProfileWriterStore()
ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: profile}
return ps
}
func (ps *ProfileStoreV1) initProfileWriterStore() {
ps.queue = event.NewQueue()
go ps.eventHandler()
ps.eventManager.Subscribe(event.SetPeerAuthorization, ps.queue)
ps.eventManager.Subscribe(event.PeerCreated, ps.queue)
ps.eventManager.Subscribe(event.GroupCreated, ps.queue)
ps.eventManager.Subscribe(event.SetAttribute, ps.queue)
ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue)
ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue)
ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue)
ps.eventManager.Subscribe(event.RejectGroupInvite, ps.queue)
ps.eventManager.Subscribe(event.NewGroup, ps.queue)
ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue)
ps.eventManager.Subscribe(event.SendMessageToPeer, ps.queue)
ps.eventManager.Subscribe(event.PeerAcknowledgement, ps.queue)
ps.eventManager.Subscribe(event.NewMessageFromPeer, ps.queue)
ps.eventManager.Subscribe(event.PeerStateChange, ps.queue)
ps.eventManager.Subscribe(event.ServerStateChange, ps.queue)
ps.eventManager.Subscribe(event.DeleteContact, ps.queue)
ps.eventManager.Subscribe(event.DeleteGroup, ps.queue)
ps.eventManager.Subscribe(event.ChangePassword, ps.queue)
ps.eventManager.Subscribe(event.UpdateMessageFlags, ps.queue)
}
// LoadProfileWriterStore loads a profile store from filestore listening for events and saving them
// directory should be $appDir/profiles/$rand
func LoadProfileWriterStore(eventManager event.Manager, directory, password string) (*ProfileStoreV1, error) {
@ -113,7 +76,7 @@ func LoadProfileWriterStore(eventManager event.Manager, directory, password stri
key := CreateKey(password, salt)
ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, directory: directory, profile: nil, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true}
ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, directory: directory, profile: nil}
copy(ps.salt[:], salt)
err = ps.load()
@ -121,7 +84,6 @@ func LoadProfileWriterStore(eventManager event.Manager, directory, password stri
return nil, err
}
ps.initProfileWriterStore()
return ps, nil
}
@ -129,7 +91,7 @@ func LoadProfileWriterStore(eventManager event.Manager, directory, password stri
// directory should be $appDir/profiles/$rand
func ReadProfile(directory string, key [32]byte, salt [128]byte) (*model.Profile, error) {
os.Mkdir(directory, 0700)
ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: nil, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true}
ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: nil}
err := ps.load()
if err != nil {
@ -141,24 +103,6 @@ func ReadProfile(directory string, key [32]byte, salt [128]byte) (*model.Profile
return profile, nil
}
// UpgradeV0Profile takes a profile (presumably from a V0 store) and creates and writes a V1 store
func UpgradeV0Profile(profile *model.Profile, directory, password string) error {
key, salt, err := InitV1Directory(directory, password)
if err != nil {
return err
}
ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: profile, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true}
ps.save()
for gid, group := range ps.profile.Groups {
ss := NewStreamStore(ps.directory, group.LocalID, ps.key)
ss.WriteN(ps.profile.Groups[gid].Timeline.Messages)
}
return nil
}
// NewProfile creates a new profile for use in the profile store.
func NewProfile(name string) *model.Profile {
profile := model.GenerateNewProfile(name)
@ -171,113 +115,6 @@ func (ps *ProfileStoreV1) GetNewPeerMessage() *event.Event {
return &message
}
// GetStatusMessages creates an array of status messages for all peers and group servers from current information
func (ps *ProfileStoreV1) GetStatusMessages() []*event.Event {
messages := []*event.Event{}
for _, contact := range ps.profile.Contacts {
message := event.NewEvent(event.PeerStateChange, map[event.Field]string{
event.RemotePeer: string(contact.Onion),
event.ConnectionState: contact.State,
})
messages = append(messages, &message)
}
doneServers := make(map[string]bool)
for _, group := range ps.profile.Groups {
if _, exists := doneServers[group.GroupServer]; !exists {
message := event.NewEvent(event.ServerStateChange, map[event.Field]string{
event.GroupServer: string(group.GroupServer),
event.ConnectionState: group.State,
})
messages = append(messages, &message)
doneServers[group.GroupServer] = true
}
}
return messages
}
// ChangePassword restores all data under a new password's encryption
func (ps *ProfileStoreV1) ChangePassword(oldpass, newpass, eventID string) {
oldkey := CreateKey(oldpass, ps.salt[:])
if oldkey != ps.key {
ps.eventManager.Publish(event.NewEventList(event.ChangePasswordError, event.Error, "Supplied current password does not match", event.EventID, eventID))
return
}
newkey := CreateKey(newpass, ps.salt[:])
newStreamStores := map[string]StreamStore{}
idToNewLocalID := map[string]string{}
// Generate all new StreamStores with the new password and write all the old StreamStore data into these ones
for ssid, ss := range ps.streamStores {
// New ss with new pass and new localID
newlocalID := model.GenerateRandomID()
idToNewLocalID[ssid] = newlocalID
newSS := NewStreamStore(ps.directory, newlocalID, newkey)
newStreamStores[ssid] = newSS
// write whole store
messages := ss.Read()
newSS.WriteN(messages)
}
// Switch over
oldStreamStores := ps.streamStores
ps.streamStores = newStreamStores
for ssid, newLocalID := range idToNewLocalID {
if len(ssid) == groupIDLen {
ps.profile.Groups[ssid].LocalID = newLocalID
} else {
if ps.profile.Contacts[ssid] != nil {
ps.profile.Contacts[ssid].LocalID = newLocalID
} else {
log.Errorf("Unknown Contact: %v. This is probably the result of corrupted development data from fuzzing. This contact will not appear in the new profile.", ssid)
}
}
}
ps.key = newkey
ps.fs.ChangeKey(newkey)
ps.save()
// Clean up
for _, oldss := range oldStreamStores {
oldss.Delete()
}
ps.eventManager.Publish(event.NewEventList(event.ChangePasswordSuccess, event.EventID, eventID))
return
}
func (ps *ProfileStoreV1) save() error {
if ps.writer {
bytes, _ := json.Marshal(ps.profile)
return ps.fs.Write(bytes)
}
return nil
}
func (ps *ProfileStoreV1) regenStreamStore(messages []model.Message, contact string) {
oldss := ps.streamStores[contact]
newLocalID := model.GenerateRandomID()
newSS := NewStreamStore(ps.directory, newLocalID, ps.key)
newSS.WriteN(messages)
if len(contact) == groupIDLen {
ps.profile.Groups[contact].LocalID = newLocalID
} else {
// We can assume this exists as regen stream store should only happen to *update* a message
ps.profile.Contacts[contact].LocalID = newLocalID
}
ps.streamStores[contact] = newSS
ps.save()
oldss.Delete()
}
// load instantiates a cwtchPeer from the file store
func (ps *ProfileStoreV1) load() error {
decrypted, err := ps.fs.Read()
@ -310,7 +147,6 @@ func (ps *ProfileStoreV1) load() error {
if saveHistory == event.SaveHistoryConfirmed {
ss := NewStreamStore(ps.directory, contact.LocalID, ps.key)
cp.Contacts[contact.Onion].Timeline.SetMessages(ss.Read())
ps.streamStores[contact.Onion] = ss
}
}
@ -320,15 +156,10 @@ func (ps *ProfileStoreV1) load() error {
delete(cp.Groups, gid)
continue
}
ss := NewStreamStore(ps.directory, group.LocalID, ps.key)
cp.Groups[gid].Timeline.SetMessages(ss.Read())
cp.Groups[gid].Timeline.Sort()
ps.streamStores[group.GroupID] = ss
}
ps.save()
}
return err
@ -338,238 +169,3 @@ func (ps *ProfileStoreV1) load() error {
func (ps *ProfileStoreV1) GetProfileCopy(timeline bool) *model.Profile {
return ps.profile.GetCopy(timeline)
}
func (ps *ProfileStoreV1) eventHandler() {
for {
ev := ps.queue.Next()
log.Debugf("eventHandler event %v %v\n", ev.EventType, ev.EventID)
switch ev.EventType {
case event.SetPeerAuthorization:
err := ps.profile.SetContactAuthorization(ev.Data[event.RemotePeer], model.Authorization(ev.Data[event.Authorization]))
if err == nil {
ps.save()
}
case event.PeerCreated:
var pp *model.PublicProfile
json.Unmarshal([]byte(ev.Data[event.Data]), &pp)
ps.profile.AddContact(ev.Data[event.RemotePeer], pp)
case event.GroupCreated:
var group *model.Group
json.Unmarshal([]byte(ev.Data[event.Data]), &group)
ps.profile.AddGroup(group)
ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.key)
ps.save()
case event.SetAttribute:
ps.profile.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
ps.save()
case event.SetPeerAttribute:
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
if exists {
contact.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
ps.save()
switch ev.Data[event.Key] {
case event.SaveHistoryKey:
if event.DeleteHistoryConfirmed == ev.Data[event.Data] {
ss, exists := ps.streamStores[ev.Data[event.RemotePeer]]
if exists {
ss.Delete()
delete(ps.streamStores, ev.Data[event.RemotePeer])
}
} else if event.SaveHistoryConfirmed == ev.Data[event.Data] {
_, exists := ps.streamStores[ev.Data[event.RemotePeer]]
if !exists {
ss := NewStreamStore(ps.directory, contact.LocalID, ps.key)
ps.streamStores[ev.Data[event.RemotePeer]] = ss
}
}
default:
{
}
}
} else {
log.Errorf("error setting attribute on peer %v peer does not exist", ev)
}
case event.SetGroupAttribute:
group := ps.profile.GetGroup(ev.Data[event.GroupID])
if group != nil {
group.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
ps.save()
} else {
log.Errorf("error setting attribute on group %v group does not exist", ev)
}
case event.AcceptGroupInvite:
err := ps.profile.AcceptInvite(ev.Data[event.GroupID])
if err == nil {
ps.save()
} else {
log.Errorf("error accepting group invite")
}
case event.RejectGroupInvite:
ps.profile.RejectInvite(ev.Data[event.GroupID])
ps.save()
case event.NewGroup:
gid, err := ps.profile.ProcessInvite(ev.Data[event.GroupInvite])
if err == nil {
ps.save()
group := ps.profile.Groups[gid]
ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.key)
} else {
log.Errorf("error storing new group invite: %v (%v)", err, ev)
}
case event.SendMessageToPeer: // cache the message till an ack, then it's given to stream store.
// stream store doesn't support updates, so we don't want to commit it till ack'd
ps.profile.AddSentMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], time.Now(), ev.EventID)
case event.NewMessageFromPeer:
ps.profile.AddMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], time.Now())
ps.attemptSavePeerMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.TimestampReceived], true)
case event.PeerAcknowledgement:
onion := ev.Data[event.RemotePeer]
eventID := ev.Data[event.EventID]
contact, ok := ps.profile.Contacts[onion]
if ok {
mIdx, ok := contact.UnacknowledgedMessages[eventID]
if ok {
message := contact.Timeline.Messages[mIdx]
ps.attemptSavePeerMessage(onion, message.Message, message.Timestamp.Format(time.RFC3339Nano), false)
}
}
ps.profile.AckSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID])
case event.NewMessageFromGroup:
groupid := ev.Data[event.GroupID]
received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent])
sig, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
prevsig, _ := base64.StdEncoding.DecodeString(ev.Data[event.PreviousSignature])
message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: sig, PreviousMessageSig: prevsig, Acknowledged: true}
ss, exists := ps.streamStores[groupid]
if exists {
// We need to store a local copy of the message...
ps.profile.GetGroup(groupid).Timeline.Insert(&message)
ss.Write(message)
} else {
log.Errorf("error storing new group message: %v stream store does not exist", ev)
}
case event.PeerStateChange:
if _, exists := ps.profile.Contacts[ev.Data[event.RemotePeer]]; exists {
ps.profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState]
}
case event.ServerStateChange:
for _, group := range ps.profile.Groups {
if group.GroupServer == ev.Data[event.GroupServer] {
group.State = ev.Data[event.ConnectionState]
}
}
case event.DeleteContact:
onion := ev.Data[event.RemotePeer]
ps.profile.DeleteContact(onion)
ps.save()
ss, exists := ps.streamStores[onion]
if exists {
ss.Delete()
delete(ps.streamStores, onion)
}
case event.DeleteGroup:
groupID := ev.Data[event.GroupID]
ps.profile.DeleteGroup(groupID)
ps.save()
ss, exists := ps.streamStores[groupID]
if exists {
ss.Delete()
delete(ps.streamStores, groupID)
}
case event.ChangePassword:
oldpass := ev.Data[event.Password]
newpass := ev.Data[event.NewPassword]
ps.ChangePassword(oldpass, newpass, ev.EventID)
case event.UpdateMessageFlags:
handle := ev.Data[event.Handle]
mIx, err := strconv.Atoi(ev.Data[event.Index])
if err != nil {
log.Errorf("Invalid Message Index: %v", err)
return
}
flags, err := strconv.ParseUint(ev.Data[event.Flags], 2, 64)
if err != nil {
log.Errorf("Invalid Message Flags: %v", err)
return
}
ps.profile.UpdateMessageFlags(handle, mIx, flags)
if len(handle) == groupIDLen {
ps.regenStreamStore(ps.profile.GetGroup(handle).Timeline.Messages, handle)
} else if contact, exists := ps.profile.GetContact(handle); exists {
if exists {
val, _ := contact.GetAttribute(event.SaveHistoryKey)
if val == event.SaveHistoryConfirmed {
ps.regenStreamStore(contact.Timeline.Messages, handle)
}
}
}
default:
log.Debugf("shutting down profile store: %v", ev)
return
}
}
}
// attemptSavePeerMessage checks if the peer has been configured to save history from this peer
// and if so the peer saves the message into history. fromPeer is used to control if the message is saved
// as coming from the remote peer or if it was sent by out profile.
func (ps *ProfileStoreV1) attemptSavePeerMessage(peerID, messageData, timestampeReceived string, fromPeer bool) {
contact, exists := ps.profile.GetContact(peerID)
if exists {
val, _ := contact.GetAttribute(event.SaveHistoryKey)
switch val {
case event.SaveHistoryConfirmed:
{
peerID := peerID
var received time.Time
var message model.Message
if fromPeer {
received, _ = time.Parse(time.RFC3339Nano, timestampeReceived)
message = model.Message{Received: received, Timestamp: received, Message: messageData, PeerID: peerID, Signature: []byte{}, PreviousMessageSig: []byte{}}
} else {
received := time.Now()
message = model.Message{Received: received, Timestamp: received, Message: messageData, PeerID: ps.profile.Onion, Signature: []byte{}, PreviousMessageSig: []byte{}, Acknowledged: true}
}
ss, exists := ps.streamStores[peerID]
if exists {
ss.Write(message)
} else {
log.Errorf("error storing new peer message: %v stream store does not exist", peerID)
}
}
default:
{
}
}
} else {
log.Errorf("error saving message for peer that doesn't exist: %v", peerID)
}
}
// Shutdown shuts down the queue / thread
func (ps *ProfileStoreV1) Shutdown() {
if ps.queue != nil {
ps.queue.Shutdown()
}
}
// Delete removes all stored files for this stored profile
func (ps *ProfileStoreV1) Delete() {
log.Debugf("Delete ProfileStore for %v\n", ps.profile.Onion)
for _, ss := range ps.streamStores {
ss.Delete()
}
ps.fs.Delete()
err := os.RemoveAll(ps.directory)
if err != nil {
log.Errorf("ProfileStore Delete error on RemoveAll on %v was %v\n", ps.directory, err)
}
}

View File

@ -1,159 +0,0 @@
// Known race issue with event bus channel closure
package v1
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"encoding/base64"
"fmt"
"log"
"os"
"testing"
"time"
)
const testProfileName = "Alice"
const testKey = "key"
const testVal = "value"
const testInitialMessage = "howdy"
const testMessage = "Hello from storage"
func TestProfileStoreWriteRead(t *testing.T) {
log.Println("profile store test!")
os.RemoveAll(testingDir)
eventBus := event.NewEventManager()
profile := NewProfile(testProfileName)
// The lightest weight server entry possible (usually we would import a key bundle...)
profile.AddContact("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd", &model.PublicProfile{Attributes: map[string]string{string(model.KeyTypeServerOnion): "2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd"}})
ps1 := CreateProfileWriterStore(eventBus, testingDir, password, profile)
eventBus.Publish(event.NewEvent(event.SetAttribute, map[event.Field]string{event.Key: testKey, event.Data: testVal}))
time.Sleep(1 * time.Second)
groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
if err != nil {
t.Errorf("Creating group: %v\n", err)
}
if err != nil {
t.Errorf("Creating group invite: %v\n", err)
}
eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)}))
time.Sleep(1 * time.Second)
eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{
event.GroupID: groupid,
event.TimestampSent: time.Now().Format(time.RFC3339Nano),
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
event.RemotePeer: ps1.GetProfileCopy(true).Onion,
event.Data: testMessage,
}))
time.Sleep(1 * time.Second)
ps1.Shutdown()
ps2, err := LoadProfileWriterStore(eventBus, testingDir, password)
if err != nil {
t.Errorf("Error createing ProfileStoreV1: %v\n", err)
}
profile = ps2.GetProfileCopy(true)
if profile.Name != testProfileName {
t.Errorf("Profile name from loaded profile incorrect. Expected: '%v' Actual: '%v'\n", testProfileName, profile.Name)
}
v, _ := profile.GetAttribute(testKey)
if v != testVal {
t.Errorf("Profile attribute '%v' inccorect. Expected: '%v' Actual: '%v'\n", testKey, testVal, v)
}
group2 := ps2.GetProfileCopy(true).Groups[groupid]
if group2 == nil {
t.Errorf("Group not loaded\n")
}
}
func TestProfileStoreChangePassword(t *testing.T) {
os.RemoveAll(testingDir)
eventBus := event.NewEventManager()
queue := event.NewQueue()
eventBus.Subscribe(event.ChangePasswordSuccess, queue)
profile := NewProfile(testProfileName)
profile.AddContact("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd", &model.PublicProfile{Attributes: map[string]string{string(model.KeyTypeServerOnion): "2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd"}})
ps1 := CreateProfileWriterStore(eventBus, testingDir, password, profile)
groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
if err != nil {
t.Errorf("Creating group: %v\n", err)
}
if err != nil {
t.Errorf("Creating group invite: %v\n", err)
}
eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)}))
time.Sleep(1 * time.Second)
fmt.Println("Sending 200 messages...")
for i := 0; i < 200; i++ {
eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{
event.GroupID: groupid,
event.TimestampSent: time.Now().Format(time.RFC3339Nano),
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
event.RemotePeer: profile.Onion,
event.Data: testMessage,
event.Signature: base64.StdEncoding.EncodeToString([]byte{byte(i)}),
}))
}
newPass := "qwerty123"
fmt.Println("Sending Change Passwords event...")
eventBus.Publish(event.NewEventList(event.ChangePassword, event.Password, password, event.NewPassword, newPass))
ev := queue.Next()
if ev.EventType != event.ChangePasswordSuccess {
t.Errorf("Unexpected event response detected %v\n", ev.EventType)
return
}
fmt.Println("Sending 10 more messages...")
for i := 0; i < 10; i++ {
eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{
event.GroupID: groupid,
event.TimestampSent: time.Now().Format(time.RFC3339Nano),
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
event.RemotePeer: profile.Onion,
event.Data: testMessage,
event.Signature: base64.StdEncoding.EncodeToString([]byte{0x01, byte(i)}),
}))
}
time.Sleep(3 * time.Second)
fmt.Println("Shutdown profile store...")
ps1.Shutdown()
fmt.Println("New Profile store...")
ps2, err := LoadProfileWriterStore(eventBus, testingDir, newPass)
if err != nil {
t.Errorf("Error createing new ProfileStoreV1 with new password: %v\n", err)
return
}
profile2 := ps2.GetProfileCopy(true)
if profile2.Groups[groupid] == nil {
t.Errorf("Failed to load group %v\n", groupid)
return
}
if len(profile2.Groups[groupid].Timeline.Messages) != 210 {
t.Errorf("Failed to load group's 210 messages, instead got %v\n", len(profile2.Groups[groupid].Timeline.Messages))
}
}

View File

@ -40,22 +40,22 @@ func printAndCountVerifedTimeline(t *testing.T, timeline []model.Message) int {
return numVerified
}
func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, groupID string) {
func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, serverAddr string) {
peerName, _ := peer.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
for {
fmt.Printf("%v checking group connection...\n", peerName)
state, ok := peer.GetGroupState(groupID)
state, ok := peer.GetPeerState(serverAddr)
if ok {
fmt.Printf("Waiting for Peer %v to join group %v - state: %v\n", peerName, groupID, state)
fmt.Printf("Waiting for Peer %v to join group %v - state: %v\n", peerName, serverAddr, state)
if state == connections.FAILED {
t.Fatalf("%v could not connect to %v", peer.GetOnion(), groupID)
t.Fatalf("%v could not connect to %v", peer.GetOnion(), serverAddr)
}
if state != connections.SYNCED {
fmt.Printf("peer %v %v waiting connect to group %v, currently: %v\n", peerName, peer.GetOnion(), groupID, connections.ConnectionStateName[state])
fmt.Printf("peer %v %v waiting connect to group %v, currently: %v\n", peerName, peer.GetOnion(), serverAddr, connections.ConnectionStateName[state])
time.Sleep(time.Second * 5)
continue
} else {
fmt.Printf("peer %v %v CONNECTED to group %v\n", peerName, peer.GetOnion(), groupID)
fmt.Printf("peer %v %v CONNECTED to group %v\n", peerName, peer.GetOnion(), serverAddr)
break
}
}
@ -185,15 +185,15 @@ func TestCwtchPeerIntegration(t *testing.T) {
alice.PeerWithOnion(carol.GetOnion())
fmt.Println("Creating group on ", ServerAddr, "...")
groupID, _, err := alice.StartGroup(ServerAddr)
fmt.Printf("Created group: %v!\n", groupID)
aliceGroupConversationID, err := alice.StartGroup(ServerAddr)
fmt.Printf("Created group: %v!\n", aliceGroupConversationID)
if err != nil {
t.Errorf("Failed to init group: %v", err)
return
}
fmt.Println("Waiting for alice to join server...")
waitForPeerGroupConnection(t, alice, groupID)
waitForPeerGroupConnection(t, alice, ServerAddr)
fmt.Println("Waiting for alice and Bob to peer...")
waitForPeerPeerConnection(t, alice, bob)
@ -219,31 +219,31 @@ func TestCwtchPeerIntegration(t *testing.T) {
// Probably related to latency/throughput problems in the underlying tor network.
time.Sleep(30 * time.Second)
aliceName, err := bob.GetConversationAttribute(alice.GetOnion(), attr.PeerScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
aliceName, err := bob.GetConversationAttribute(1, attr.PeerScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
if err != nil || aliceName != "Alice" {
t.Fatalf("Bob: alice GetKeyVal error on alice peer.name %v: %v\n", aliceName, err)
}
fmt.Printf("Bob has alice's name as '%v'\n", aliceName)
bobName, err := alice.GetConversationAttribute(bob.GetOnion(), attr.PeerScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
bobName, err := alice.GetConversationAttribute(1, attr.PeerScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
if err != nil || bobName != "Bob" {
t.Fatalf("Alice: bob GetKeyVal error on bob peer.name %v: %v \n", bobName, err)
}
fmt.Printf("Alice has bob's name as '%v'\n", bobName)
aliceName, err = carol.GetConversationAttribute(alice.GetOnion(), attr.PeerScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
aliceName, err = carol.GetConversationAttribute(2, attr.PeerScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
if err != nil || aliceName != "Alice" {
t.Fatalf("carol GetKeyVal error for alice peer.name %v: %v\n", aliceName, err)
}
carolName, err := alice.GetConversationAttribute(carol.GetOnion(), attr.PeerScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
carolName, err := alice.GetConversationAttribute(1, attr.PeerScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)))
if err != nil || carolName != "Carol" {
t.Fatalf("alice GetKeyVal error, carol peer.name: %v: %v\n", carolName, err)
}
fmt.Printf("Alice has carol's name as '%v'\n", carolName)
fmt.Println("Alice inviting Bob to group...")
err = alice.InviteOnionToGroup(bob.GetOnion(), groupID)
err = alice.SendInviteToConversation(1, aliceGroupConversationID)
if err != nil {
t.Fatalf("Error for Alice inviting Bob to group: %v", err)
}
@ -265,7 +265,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
}
fmt.Println("Waiting for Bob to join connect to group server...")
waitForPeerGroupConnection(t, bob, groupID)
waitForPeerGroupConnection(t, bob, ServerAddr)
numGoRoutinesPostServerConnect := runtime.NumGoroutine()
@ -274,7 +274,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
fmt.Println("Starting conversation in group...")
// Conversation
fmt.Printf("%v> %v\n", aliceName, aliceLines[0])
err = alice.SendMessage(groupID, aliceLines[0])
err = alice.SendMessage(aliceGroupConversationID, aliceLines[0])
if err != nil {
t.Fatalf("Alice failed to send a message to the group: %v", err)
}

View File

@ -51,7 +51,6 @@ func TestEncryptedStorage(t *testing.T) {
fmt.Println("Creating Alice...")
defer acn.Close()
acn.WaitTillBootstrapped()
app := app2.NewApp(acn, cwtchDir)
@ -76,11 +75,9 @@ func TestEncryptedStorage(t *testing.T) {
time.Sleep(time.Second * 30)
alice.SendMessage(bob.GetOnion(), "Hello Bob")
alice.SendMessage(2, "Hello Bob")
time.Sleep(time.Second * 30)
ci, _ := bob.FetchConversationInfo(alice.GetOnion())
body, _, err := bob.GetChannelMessage(ci.ID, 0, 1)
if body != "Hello Bob" || err != nil {

View File

@ -96,6 +96,7 @@ func TestFileSharing(t *testing.T) {
fmt.Println("Creating Bob...")
app.CreateTaggedPeer("bob", "asdfasdf", "testing")
t.Logf("** Waiting for Alice, Bob...")
alice := utils.WaitGetPeer(app, "alice")
bob := utils.WaitGetPeer(app, "bob")
@ -105,14 +106,15 @@ func TestFileSharing(t *testing.T) {
queueOracle := event.NewQueue()
app.GetEventBus(bob.GetOnion()).Subscribe(event.FileDownloaded, queueOracle)
t.Logf("** Launching Peers...")
app.LaunchPeers()
waitTime := time.Duration(30) * time.Second
t.Logf("** Waiting for Alice, Bob to connect with onion network... (%v)\n", waitTime)
time.Sleep(waitTime)
bob.NewContactConversation(alice.GetOnion(),model.DefaultP2PAccessControl(), true)
alice.NewContactConversation(bob.GetOnion(),model.DefaultP2PAccessControl(), true)
bob.NewContactConversation(alice.GetOnion(), model.DefaultP2PAccessControl(), true)
alice.NewContactConversation(bob.GetOnion(), model.DefaultP2PAccessControl(), true)
alice.PeerWithOnion(bob.GetOnion())
fmt.Println("Waiting for alice and Bob to peer...")
@ -122,7 +124,7 @@ func TestFileSharing(t *testing.T) {
filesharingFunctionality, _ := filesharing.FunctionalityGate(map[string]bool{"filesharing": true})
err = filesharingFunctionality.ShareFile("cwtch.png", alice, bob.GetOnion())
err = filesharingFunctionality.ShareFile("cwtch.png", alice, 1)
if err != nil {
t.Fatalf("Error!: %v", err)
@ -131,10 +133,10 @@ func TestFileSharing(t *testing.T) {
// Wait for the messages to arrive...
time.Sleep(time.Second * 10)
message,_,err := bob.GetChannelMessage(1,0, 1)
if err != nil {
t.Fatalf("could not find file sharing message: %v", err)
}
message, _, err := bob.GetChannelMessage(1, 0, 1)
if err != nil {
t.Fatalf("could not find file sharing message: %v", err)
}
var messageWrapper model.MessageWrapper
json.Unmarshal([]byte(message), &messageWrapper)
@ -148,7 +150,6 @@ func TestFileSharing(t *testing.T) {
}
}
// Wait for the file downloaded event
ev := queueOracle.Next()
if ev.EventType != event.FileDownloaded {