Browse Source

save new groups and group timelines

ebf201902021718
erinn 10 months ago
parent
commit
ac077521be
10 changed files with 97 additions and 37 deletions
  1. 1
    0
      app/app.go
  2. 10
    0
      event/common.go
  3. 1
    1
      model/group.go
  4. 16
    16
      model/message.go
  5. 2
    2
      model/message_test.go
  6. 2
    2
      model/profile.go
  7. 1
    1
      model/profile_test.go
  8. 34
    3
      peer/cwtch_peer.go
  9. 1
    1
      storage/file_enc.go
  10. 29
    11
      storage/profile_store.go

+ 1
- 0
app/app.go View File

@@ -92,6 +92,7 @@ func (app *application) LoadProfiles(password string) error {
}

profile := profileStore.GetProfileCopy()

_, exists := app.peers[profile.Onion]
if exists {
profileStore.Shutdown()

+ 10
- 0
event/common.go View File

@@ -27,6 +27,16 @@ const (

// REQUESTS TO STORAGE ENGINE

// a peer contact has been added
// attributes:
// RemotePeer [eg ""]
PeerCreated = Type("PeerCreated")

// a group has been successfully added or newly created
// attributes:
// Data [serialized *model.Group]
GroupCreated = Type("GroupCreated")

// change the .Name attribute of a profile (careful - this is not a custom attribute. it is used in the underlying protocol during handshakes!)
// attributes:
// ProfileName [eg "erinn"]

+ 1
- 1
model/group.go View File

@@ -15,7 +15,7 @@ import (
)

// Group defines and encapsulates Cwtch's conception of group chat. Which are sessions
// tied to a server under a given group key. Each group has a set of messages.
// tied to a server under a given group key. Each group has a set of Messages.
type Group struct {
GroupID string
SignedGroupID []byte

+ 16
- 16
model/message.go View File

@@ -6,10 +6,10 @@ import (
"time"
)

// Timeline encapsulates a collection of ordered messages, and a mechanism to access them
// Timeline encapsulates a collection of ordered Messages, and a mechanism to access them
// in a threadsafe manner.
type Timeline struct {
messages []Message
Messages []Message
SignedGroupID []byte
lock sync.Mutex
}
@@ -42,46 +42,46 @@ func compareSignatures(a []byte, b []byte) bool {
// GetMessages returns a copy of the entire timeline
func (t *Timeline) GetMessages() []Message {
t.lock.Lock()
messages := make([]Message, len(t.messages))
copy(messages[:], t.messages[:])
messages := make([]Message, len(t.Messages))
copy(messages[:], t.Messages[:])
t.lock.Unlock()
return messages
}

// SetMessages sets the messages of this timeline. Only to be used in loading/initialization
// SetMessages sets the Messages of this timeline. Only to be used in loading/initialization
func (t *Timeline) SetMessages(messages []Message) {
t.lock.Lock()
defer t.lock.Unlock()
t.messages = messages
t.Messages = messages
}

// Len gets the length of the timeline
func (t *Timeline) Len() int {
return len(t.messages)
return len(t.Messages)
}

// Swap swaps 2 messages on the timeline.
// Swap swaps 2 Messages on the timeline.
func (t *Timeline) Swap(i, j int) {
t.messages[i], t.messages[j] = t.messages[j], t.messages[i]
t.Messages[i], t.Messages[j] = t.Messages[j], t.Messages[i]
}

// Less checks 2 messages (i and j) in the timeline and returns true if i occurred before j, else false
// Less checks 2 Messages (i and j) in the timeline and returns true if i occurred before j, else false
func (t *Timeline) Less(i, j int) bool {

if t.messages[i].Timestamp.Before(t.messages[j].Timestamp) {
if t.Messages[i].Timestamp.Before(t.Messages[j].Timestamp) {
return true
}

// Short circuit false if j is before i, signature checks will give a wrong order in this case.
if t.messages[j].Timestamp.Before(t.messages[i].Timestamp) {
if t.Messages[j].Timestamp.Before(t.Messages[i].Timestamp) {
return false
}

if compareSignatures(t.messages[i].PreviousMessageSig, t.SignedGroupID) {
if compareSignatures(t.Messages[i].PreviousMessageSig, t.SignedGroupID) {
return true
}

if compareSignatures(t.messages[i].Signature, t.messages[j].PreviousMessageSig) {
if compareSignatures(t.Messages[i].Signature, t.Messages[j].PreviousMessageSig) {
return true
}

@@ -101,14 +101,14 @@ func (t *Timeline) Insert(mi *Message) bool {
t.lock.Lock()
defer t.lock.Unlock()

for _, m := range t.messages {
for _, m := range t.Messages {
// If the message already exists, then we don't add it
if compareSignatures(m.Signature, mi.Signature) {
return true
}
}

t.messages = append(t.messages, *mi)
t.Messages = append(t.Messages, *mi)
sort.Sort(t)
return false
}

+ 2
- 2
model/message_test.go View File

@@ -87,7 +87,7 @@ func TestTranscriptConsistency(t *testing.T) {
_, _, m4 := sarah.AttemptDecryption(c4, s4)
_, _, m5 := sarah.AttemptDecryption(c5, s5)

// Now we simulate a client receiving these messages completely out of order
// Now we simulate a client receiving these Messages completely out of order
timeline.Insert(m1)
timeline.Insert(m5)
timeline.Insert(m4)
@@ -99,6 +99,6 @@ func TestTranscriptConsistency(t *testing.T) {
t.Fatalf("Timeline Out of Order!: %v %v", i, m)
}

t.Logf("messages %v: %v %x %x", i, m.Message, m.Signature, m.PreviousMessageSig)
t.Logf("Messages %v: %v %x %x", i, m.Message, m.Signature, m.PreviousMessageSig)
}
}

+ 2
- 2
model/profile.go View File

@@ -337,8 +337,8 @@ func (p *Profile) EncryptMessageToGroup(message string, groupID string) ([]byte,
timestamp := time.Now().Unix()

var prevSig []byte
if len(group.Timeline.messages) > 0 {
prevSig = group.Timeline.messages[len(group.Timeline.messages)-1].Signature
if len(group.Timeline.Messages) > 0 {
prevSig = group.Timeline.Messages[len(group.Timeline.Messages)-1].Signature
} else {
prevSig = group.SignedGroupID
}

+ 1
- 1
model/profile_test.go View File

@@ -169,7 +169,7 @@ func TestProfileGroup(t *testing.T) {
eve := GenerateNewProfile("eve")
ok, _, _ = eve.AttemptDecryption(c3, s3)
if ok {
t.Errorf("Eves hould not be able to decrypt messages!")
t.Errorf("Eves hould not be able to decrypt Messages!")
}
} else {
t.Errorf("Bob failed to encrypt a message to the group")

+ 34
- 3
peer/cwtch_peer.go View File

@@ -6,6 +6,7 @@ import (
"cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/protocol/connections"
"encoding/base64"
"encoding/json"
"errors"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
@@ -58,6 +59,7 @@ type CwtchPeer interface {

GetGroup(string) *model.Group
GetGroups() []string
AddContact(nick, onion string, publickey []byte, trusted bool)
GetContacts() []string
GetContact(string) *model.PublicProfile

@@ -118,6 +120,14 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (groupID string, err err
onion := utils.GetTorV3Hostname(edpk)
cp.Profile.AddContact(onion, &model.PublicProfile{Name: "", Ed25519PublicKey: edpk, Trusted: true, Blocked: false, Onion: onion})
cp.Profile.ProcessInvite(cpp.GetGroupChatInvite(), onion)
jsobj, err := json.Marshal(cp.GetGroup(cpp.GroupChatInvite.GetGroupName()))
if err == nil {
cp.eventBus.Publish(event.NewEvent(event.GroupCreated, map[event.Field]string{
event.Data: string(jsobj),
}))
} else {
log.Errorf("error serializing group: %v", err)
}
return cpp.GroupChatInvite.GetGroupName(), nil
}
}
@@ -143,12 +153,24 @@ func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) {

// StartGroup create a new group linked to the given server and returns the group ID, an invite or an error.
func (cp *cwtchPeer) StartGroup(server string) (string, []byte, error) {
return cp.Profile.StartGroup(server)
return cp.StartGroupWithMessage(server, []byte{})
}

// StartGroupWithMessage create a new group linked to the given server and returns the group ID, an invite or an error.
func (cp *cwtchPeer) StartGroupWithMessage(server string, initialMessage []byte) (string, []byte, error) {
return cp.Profile.StartGroupWithMessage(server, initialMessage)
func (cp *cwtchPeer) StartGroupWithMessage(server string, initialMessage []byte) (groupID string, invite []byte, err error) {
groupID, invite, err = cp.Profile.StartGroupWithMessage(server, initialMessage)
if err == nil {
group := cp.GetGroup(groupID)
jsobj, err := json.Marshal(group)
if err != nil {
cp.eventBus.Publish(event.NewEvent(event.GroupCreated, map[event.Field]string{
event.Data: string(jsobj),
}))
}
} else {
log.Errorf("error creating group: %v", err)
}
return
}

// GetGroups returns an unordered list of all group IDs.
@@ -161,6 +183,15 @@ func (cp *cwtchPeer) GetGroup(groupID string) *model.Group {
return cp.Profile.GetGroupByGroupID(groupID)
}

func (cp *cwtchPeer) AddContact(nick, onion string, publickey []byte, trusted bool) {
pp := &model.PublicProfile{Name: nick, Ed25519PublicKey: publickey, Trusted: trusted, Blocked: false, Onion: onion, Attributes: map[string]string{"name": nick}}
cp.GetProfile().Contacts[onion] = pp
cp.Profile.AddContact(onion, pp)
cp.eventBus.Publish(event.NewEvent(event.PeerCreated, map[event.Field]string{
event.RemotePeer: onion,
}))
}

// GetContacts returns an unordered list of onions
func (cp *cwtchPeer) GetContacts() []string {
return cp.Profile.GetContacts()

+ 1
- 1
storage/file_enc.go View File

@@ -53,7 +53,7 @@ func decryptFile(ciphertext []byte, key [32]byte) ([]byte, error) {
// Load instantiates a cwtchPeer from the file store
func readEncryptedFile(directory, filename, password string) ([]byte, error) {
encryptedbytes, err := ioutil.ReadFile(path.Join(directory, filename))
if err == nil {
if err == nil && len(encryptedbytes) > 128 {
var dkr [32]byte
//Separate the salt from the encrypted bytes, then generate the derived key
salt, encryptedbytes := encryptedbytes[0:128], encryptedbytes[128:]

+ 29
- 11
storage/profile_store.go View File

@@ -5,6 +5,7 @@ import (
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/protocol"
"encoding/json"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"github.com/golang/protobuf/proto"
"os"
"time"
@@ -39,6 +40,8 @@ func NewProfileStore(eventManager *event.Manager, directory, password string) Pr
go ps.eventHandler()

ps.eventManager.Subscribe(event.BlockPeer, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.PeerCreated, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.GroupCreated, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.SetProfileName, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.SetAttribute, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue.EventChannel)
@@ -69,19 +72,20 @@ func (ps *profileStore) Load() error {
err = json.Unmarshal(decrypted, &cp)
if err == nil {
ps.profile = cp
return nil
}

for _, profile := range cp.Contacts {
ss := NewStreamStore(ps.directory, profile.LocalID, ps.password)
profile.Timeline.SetMessages(ss.Read())
ps.streamStores[profile.Onion] = ss
}
for _, profile := range cp.Contacts {
ss := NewStreamStore(ps.directory, profile.LocalID, ps.password)
profile.Timeline.SetMessages(ss.Read())
ps.streamStores[profile.Onion] = ss
}

for _, group := range cp.Groups {
ss := NewStreamStore(ps.directory, group.LocalID, ps.password)
group.Timeline.SetMessages(ss.Read())
ps.streamStores[group.GroupID] = ss
for _, group := range cp.Groups {
log.Debugf("loading group %v", group)
ss := NewStreamStore(ps.directory, group.LocalID, ps.password)
group.Timeline.SetMessages(ss.Read())
ps.streamStores[group.GroupID] = ss
log.Debugf("loading group %v", group)
}
}

return err
@@ -101,6 +105,20 @@ func (ps *profileStore) eventHandler() {
contact.Blocked = true
ps.save()
}
case event.PeerCreated:
var pp *model.PublicProfile
json.Unmarshal([]byte(ev.Data[event.Data]), &pp)
ps.profile.AddContact(ev.Data[event.RemotePeer], pp)
ss := NewStreamStore(ps.directory, pp.LocalID, ps.password)
pp.Timeline.SetMessages(ss.Read())
ps.streamStores[pp.Onion] = ss
ps.save()
case event.GroupCreated:
var group *model.Group
json.Unmarshal([]byte(ev.Data[event.Data]), &group)
ps.profile.AddGroup(group)
ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password)
ps.save()
case event.SetProfileName:
ps.profile.Name = ev.Data[event.ProfileName]
ps.profile.SetAttribute("name", ev.Data[event.ProfileName])

Loading…
Cancel
Save