Merge branch 'storage-eb' of dan/cwtch into master
This commit is contained in:
commit
34e0a8f925
24
app/app.go
24
app/app.go
|
@ -1,11 +1,9 @@
|
|||
package app
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/peer"
|
||||
"cwtch.im/cwtch/storage"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
|
||||
|
@ -13,7 +11,6 @@ import (
|
|||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
)
|
||||
|
||||
|
@ -52,25 +49,15 @@ func NewApp(acn connectivity.ACN, appDirectory string) Application {
|
|||
return app
|
||||
}
|
||||
|
||||
func generateRandomFilename() string {
|
||||
randBytes := make([]byte, 16)
|
||||
rand.Read(randBytes)
|
||||
return filepath.Join(hex.EncodeToString(randBytes))
|
||||
}
|
||||
|
||||
// NewProfile creates a new cwtchPeer with a given name.
|
||||
func (app *application) CreatePeer(name string, password string) (peer.CwtchPeer, error) {
|
||||
log.Debugf("CreatePeer(%v)\n", name)
|
||||
|
||||
randomFileName := generateRandomFilename()
|
||||
// TODO: eventBus per profile
|
||||
profileStore, err := storage.NewProfileStore(app.eventBus, path.Join(app.directory, "profiles", randomFileName), password)
|
||||
profileStore.Init(name)
|
||||
p := peer.NewCwtchPeer(name)
|
||||
|
||||
app.eventBus.Publish(event.NewEvent(event.SetProfileName, map[event.Field]string{
|
||||
event.ProfileName: name,
|
||||
}))
|
||||
profileStore := storage.NewProfileStore(app.eventBus, path.Join(app.directory, "profiles", p.GetProfile().LocalID), password)
|
||||
err := profileStore.Init(name)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -98,12 +85,7 @@ func (app *application) LoadProfiles(password string) error {
|
|||
for _, file := range files {
|
||||
|
||||
// TODO: Per profile eventBus
|
||||
profileStore, err := storage.NewProfileStore(app.eventBus, path.Join(app.directory, "profiles", file.Name()), password)
|
||||
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
profileStore := storage.NewProfileStore(app.eventBus, path.Join(app.directory, "profiles", file.Name()), password)
|
||||
err = profileStore.Load()
|
||||
if err != nil {
|
||||
continue
|
||||
|
|
|
@ -512,7 +512,7 @@ func main() {
|
|||
group = g
|
||||
|
||||
fmt.Printf("--------------- %v ---------------\n", group.GroupID)
|
||||
gms := group.Timeline.Messages
|
||||
gms := group.Timeline.GetMessages()
|
||||
max := 20
|
||||
if len(gms) < max {
|
||||
max = len(gms)
|
||||
|
|
|
@ -38,7 +38,7 @@ func convertTorFile(filename string, password string) error {
|
|||
peer.GetProfile().Ed25519PublicKey = pk
|
||||
peer.GetProfile().Onion = string(onion)
|
||||
fileStore := storage2.NewFileStore(filename, password)
|
||||
err = fileStore.Save(peer)
|
||||
err = fileStore.save(peer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -147,7 +147,7 @@ func main() {
|
|||
|
||||
fileStore2, _ := storage.NewProfileStore(nil, os.Args[2], newpw1)
|
||||
// No way to copy, populate this method
|
||||
err = fileStore2.Save(peer)
|
||||
err = fileStore2.save(peer)
|
||||
if err != nil {
|
||||
log.Errorln(err)
|
||||
os.Exit(1)
|
||||
|
|
|
@ -72,7 +72,7 @@ const (
|
|||
|
||||
ProfileName = Field("ProfileName")
|
||||
|
||||
Key = Field("Key")
|
||||
Key = Field("Key")
|
||||
Data = Field("Data")
|
||||
|
||||
Error = Field("Error")
|
||||
|
|
|
@ -29,11 +29,13 @@ type Group struct {
|
|||
Attributes map[string]string
|
||||
lock sync.Mutex
|
||||
NewMessage chan Message `json:"-"`
|
||||
LocalID string
|
||||
}
|
||||
|
||||
// NewGroup initializes a new group associated with a given CwtchServer
|
||||
func NewGroup(server string) (*Group, error) {
|
||||
group := new(Group)
|
||||
group.LocalID = generateRandomID()
|
||||
|
||||
if utils.IsValidHostname(server) == false {
|
||||
return nil, errors.New("Server is not a valid v3 onion")
|
||||
|
|
|
@ -9,7 +9,7 @@ import (
|
|||
// Timeline encapsulates a collection of ordered messages, and a mechanism to access them
|
||||
// in a threadsafe manner.
|
||||
type Timeline struct {
|
||||
Messages []Message
|
||||
messages []Message
|
||||
SignedGroupID []byte
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
@ -24,6 +24,9 @@ type Message struct {
|
|||
PreviousMessageSig []byte
|
||||
}
|
||||
|
||||
// MessageBaseSize is a rough estimate of the base number of bytes the struct uses before strings are populated
|
||||
const MessageBaseSize = 104
|
||||
|
||||
func compareSignatures(a []byte, b []byte) bool {
|
||||
if len(a) != len(b) {
|
||||
return false
|
||||
|
@ -39,39 +42,46 @@ func compareSignatures(a []byte, b []byte) bool {
|
|||
// GetMessages returns a copy of the entire timeline
|
||||
func (t *Timeline) GetMessages() []Message {
|
||||
t.lock.Lock()
|
||||
messages := make([]Message, len(t.Messages))
|
||||
copy(messages[:], t.Messages[:])
|
||||
messages := make([]Message, len(t.messages))
|
||||
copy(messages[:], t.messages[:])
|
||||
t.lock.Unlock()
|
||||
return messages
|
||||
}
|
||||
|
||||
// SetMessages sets the messages of this timeline. Only to be used in loading/initialization
|
||||
func (t *Timeline) SetMessages(messages []Message) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
t.messages = messages
|
||||
}
|
||||
|
||||
// Len gets the length of the timeline
|
||||
func (t *Timeline) Len() int {
|
||||
return len(t.Messages)
|
||||
return len(t.messages)
|
||||
}
|
||||
|
||||
// Swap swaps 2 messages on the timeline.
|
||||
func (t *Timeline) Swap(i, j int) {
|
||||
t.Messages[i], t.Messages[j] = t.Messages[j], t.Messages[i]
|
||||
t.messages[i], t.messages[j] = t.messages[j], t.messages[i]
|
||||
}
|
||||
|
||||
// Less checks 2 messages (i and j) in the timeline and returns true if i occurred before j, else false
|
||||
func (t *Timeline) Less(i, j int) bool {
|
||||
|
||||
if t.Messages[i].Timestamp.Before(t.Messages[j].Timestamp) {
|
||||
if t.messages[i].Timestamp.Before(t.messages[j].Timestamp) {
|
||||
return true
|
||||
}
|
||||
|
||||
// Short circuit false if j is before i, signature checks will give a wrong order in this case.
|
||||
if t.Messages[j].Timestamp.Before(t.Messages[i].Timestamp) {
|
||||
if t.messages[j].Timestamp.Before(t.messages[i].Timestamp) {
|
||||
return false
|
||||
}
|
||||
|
||||
if compareSignatures(t.Messages[i].PreviousMessageSig, t.SignedGroupID) {
|
||||
if compareSignatures(t.messages[i].PreviousMessageSig, t.SignedGroupID) {
|
||||
return true
|
||||
}
|
||||
|
||||
if compareSignatures(t.Messages[i].Signature, t.Messages[j].PreviousMessageSig) {
|
||||
if compareSignatures(t.messages[i].Signature, t.messages[j].PreviousMessageSig) {
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -91,14 +101,14 @@ func (t *Timeline) Insert(mi *Message) bool {
|
|||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
|
||||
for _, m := range t.Messages {
|
||||
for _, m := range t.messages {
|
||||
// If the message already exists, then we don't add it
|
||||
if compareSignatures(m.Signature, mi.Signature) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
t.Messages = append(t.Messages, *mi)
|
||||
t.messages = append(t.messages, *mi)
|
||||
sort.Sort(t)
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -99,6 +99,6 @@ func TestTranscriptConsistency(t *testing.T) {
|
|||
t.Fatalf("Timeline Out of Order!: %v %v", i, m)
|
||||
}
|
||||
|
||||
t.Logf("Messages %v: %v %x %x", i, m.Message, m.Signature, m.PreviousMessageSig)
|
||||
t.Logf("messages %v: %v %x %x", i, m.Message, m.Signature, m.PreviousMessageSig)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,12 +4,14 @@ import (
|
|||
"crypto/rand"
|
||||
"cwtch.im/cwtch/protocol"
|
||||
"encoding/base32"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/utils"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"golang.org/x/crypto/ed25519"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -24,6 +26,7 @@ type PublicProfile struct {
|
|||
Onion string
|
||||
Attributes map[string]string
|
||||
Timeline Timeline
|
||||
LocalID string // used by storage engine
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
|
@ -39,8 +42,15 @@ type Profile struct {
|
|||
// TODO: Should this be per server?
|
||||
const MaxGroupMessageLength = 1800
|
||||
|
||||
func generateRandomID() string {
|
||||
randBytes := make([]byte, 16)
|
||||
rand.Read(randBytes)
|
||||
return filepath.Join(hex.EncodeToString(randBytes))
|
||||
}
|
||||
|
||||
func (p *PublicProfile) init() {
|
||||
p.Attributes = make(map[string]string)
|
||||
p.LocalID = generateRandomID()
|
||||
}
|
||||
|
||||
// SetAttribute allows applications to store arbitrary configuration info at the profile level.
|
||||
|
@ -327,8 +337,8 @@ func (p *Profile) EncryptMessageToGroup(message string, groupID string) ([]byte,
|
|||
timestamp := time.Now().Unix()
|
||||
|
||||
var prevSig []byte
|
||||
if len(group.Timeline.Messages) > 0 {
|
||||
prevSig = group.Timeline.Messages[len(group.Timeline.Messages)-1].Signature
|
||||
if len(group.Timeline.messages) > 0 {
|
||||
prevSig = group.Timeline.messages[len(group.Timeline.messages)-1].Signature
|
||||
} else {
|
||||
prevSig = group.SignedGroupID
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ func (mp *Monitors) report() {
|
|||
|
||||
fmt.Fprintf(w, "Uptime: %v\n\n", time.Now().Sub(mp.starttime))
|
||||
|
||||
fmt.Fprintln(w, "Messages:")
|
||||
fmt.Fprintln(w, "messages:")
|
||||
mp.Messages.Report(w)
|
||||
|
||||
fmt.Fprintln(w, "\nClient Connections:")
|
||||
|
|
|
@ -27,7 +27,7 @@ func TestServerInstance(t *testing.T) {
|
|||
res := si.HandleFetchRequest()
|
||||
|
||||
if len(res) != 1 {
|
||||
t.Errorf("Expected 1 Group Messages Instead got %v", res)
|
||||
t.Errorf("Expected 1 Group messages Instead got %v", res)
|
||||
}
|
||||
|
||||
// ra.HandleApplicationInstance(ai)
|
||||
|
|
|
@ -2,10 +2,14 @@ package storage
|
|||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||
"golang.org/x/crypto/nacl/secretbox"
|
||||
"golang.org/x/crypto/pbkdf2"
|
||||
"golang.org/x/crypto/sha3"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"path"
|
||||
)
|
||||
|
||||
// createKey derives a key from a password
|
||||
|
@ -21,3 +25,46 @@ func createKey(password string) ([32]byte, [128]byte, error) {
|
|||
copy(dkr[:], dk)
|
||||
return dkr, salt, nil
|
||||
}
|
||||
|
||||
//encryptFileData encrypts the cwtchPeer via the specified key.
|
||||
func encryptFileData(data []byte, key [32]byte) ([]byte, error) {
|
||||
var nonce [24]byte
|
||||
|
||||
if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil {
|
||||
log.Errorf("Cannot read from random: %v\n", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
encrypted := secretbox.Seal(nonce[:], data, &nonce, &key)
|
||||
return encrypted, nil
|
||||
}
|
||||
|
||||
//decryptFile decrypts the passed ciphertext into a cwtchPeer via the specified key.
|
||||
func decryptFile(ciphertext []byte, key [32]byte) ([]byte, error) {
|
||||
var decryptNonce [24]byte
|
||||
copy(decryptNonce[:], ciphertext[:24])
|
||||
decrypted, ok := secretbox.Open(nil, ciphertext[24:], &decryptNonce, &key)
|
||||
if ok {
|
||||
return decrypted, nil
|
||||
}
|
||||
return nil, errors.New("Failed to decrypt")
|
||||
}
|
||||
|
||||
// Load instantiates a cwtchPeer from the file store
|
||||
func readEncryptedFile(directory, filename, password string) ([]byte, error) {
|
||||
encryptedbytes, err := ioutil.ReadFile(path.Join(directory, filename))
|
||||
if err == nil {
|
||||
var dkr [32]byte
|
||||
//Separate the salt from the encrypted bytes, then generate the derived key
|
||||
salt, encryptedbytes := encryptedbytes[0:128], encryptedbytes[128:]
|
||||
dk := pbkdf2.Key([]byte(password), salt, 4096, 32, sha3.New512)
|
||||
copy(dkr[:], dk)
|
||||
|
||||
data, err := decryptFile(encryptedbytes, dkr)
|
||||
if err == nil {
|
||||
return data, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -1,20 +1,15 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||
"golang.org/x/crypto/nacl/secretbox"
|
||||
"golang.org/x/crypto/pbkdf2"
|
||||
"golang.org/x/crypto/sha3"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"path"
|
||||
)
|
||||
|
||||
// fileStore stores a cwtchPeer in an encrypted file
|
||||
type fileStore struct {
|
||||
filename string
|
||||
password string
|
||||
directory string
|
||||
filename string
|
||||
password string
|
||||
}
|
||||
|
||||
// FileStore is a primitive around storing encrypted files
|
||||
|
@ -24,14 +19,15 @@ type FileStore interface {
|
|||
}
|
||||
|
||||
// NewFileStore instantiates a fileStore given a filename and a password
|
||||
func NewFileStore(filename string, password string) FileStore {
|
||||
func NewFileStore(directory string, filename string, password string) FileStore {
|
||||
filestore := new(fileStore)
|
||||
filestore.password = password
|
||||
filestore.filename = filename
|
||||
filestore.directory = directory
|
||||
return filestore
|
||||
}
|
||||
|
||||
// Save serializes a cwtchPeer to a file
|
||||
// save serializes a cwtchPeer to a file
|
||||
func (fps *fileStore) Save(data []byte) error {
|
||||
key, salt, _ := createKey(fps.password)
|
||||
encryptedbytes, err := encryptFileData(data, key)
|
||||
|
@ -41,50 +37,11 @@ func (fps *fileStore) Save(data []byte) error {
|
|||
|
||||
// the salt for the derived key is appended to the front of the file
|
||||
encryptedbytes = append(salt[:], encryptedbytes...)
|
||||
err = ioutil.WriteFile(fps.filename, encryptedbytes, 0600)
|
||||
err = ioutil.WriteFile(path.Join(fps.directory, fps.filename), encryptedbytes, 0600)
|
||||
return err
|
||||
|
||||
}
|
||||
|
||||
//encryptFileData encrypts the cwtchPeer via the specified key.
|
||||
func encryptFileData(data []byte, key [32]byte) ([]byte, error) {
|
||||
var nonce [24]byte
|
||||
|
||||
if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil {
|
||||
log.Errorf("Cannot read from random: %v\n", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
encrypted := secretbox.Seal(nonce[:], data, &nonce, &key)
|
||||
return encrypted, nil
|
||||
}
|
||||
|
||||
//decryptFile decrypts the passed ciphertext into a cwtchPeer via the specified key.
|
||||
func decryptFile(ciphertext []byte, key [32]byte) ([]byte, error) {
|
||||
var decryptNonce [24]byte
|
||||
copy(decryptNonce[:], ciphertext[:24])
|
||||
decrypted, ok := secretbox.Open(nil, ciphertext[24:], &decryptNonce, &key)
|
||||
if ok {
|
||||
return decrypted, nil
|
||||
}
|
||||
return nil, fmt.Errorf("Failed to decrypt")
|
||||
}
|
||||
|
||||
// Load instantiates a cwtchPeer from the file store
|
||||
func (fps *fileStore) Load() ([]byte, error) {
|
||||
encryptedbytes, err := ioutil.ReadFile(fps.filename)
|
||||
if err == nil {
|
||||
var dkr [32]byte
|
||||
//Separate the salt from the encrypted bytes, then generate the derived key
|
||||
salt, encryptedbytes := encryptedbytes[0:128], encryptedbytes[128:]
|
||||
dk := pbkdf2.Key([]byte(fps.password), salt, 4096, 32, sha3.New512)
|
||||
copy(dkr[:], dk)
|
||||
|
||||
data, err := decryptFile(encryptedbytes, dkr)
|
||||
if err == nil {
|
||||
return data, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return nil, err
|
||||
return readEncryptedFile(fps.directory, fps.filename, fps.password)
|
||||
}
|
||||
|
|
|
@ -3,11 +3,20 @@ package storage
|
|||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/protocol"
|
||||
"encoding/json"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
const profileFilename = "profile"
|
||||
|
||||
type profileStore struct {
|
||||
fs FileStore
|
||||
streamStores map[string]StreamStore
|
||||
directory string
|
||||
password string
|
||||
profile *model.Profile
|
||||
eventManager *event.Manager
|
||||
queue *event.Queue
|
||||
|
@ -15,32 +24,37 @@ type profileStore struct {
|
|||
|
||||
// ProfileStore is an interface to managing the storage of Cwtch Profiles
|
||||
type ProfileStore interface {
|
||||
Save() error
|
||||
Init(name string)
|
||||
Init(name string) error
|
||||
Load() error
|
||||
Shutdown()
|
||||
GetProfileCopy() *model.Profile
|
||||
}
|
||||
|
||||
// NewProfileStore returns a profile store backed by a filestore listening for events and saving them
|
||||
func NewProfileStore(eventManager *event.Manager, filename string, password string) (ProfileStore, error) {
|
||||
ps := &profileStore{fs: NewFileStore(filename, password), profile: nil, eventManager: eventManager}
|
||||
err := ps.Load()
|
||||
if err == nil {
|
||||
ps.queue = event.NewEventQueue(100)
|
||||
go ps.eventHandler()
|
||||
// directory should be $appDir/profiles/$rand
|
||||
func NewProfileStore(eventManager *event.Manager, directory, password string) ProfileStore {
|
||||
os.Mkdir(directory, 0700)
|
||||
ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: nil, eventManager: eventManager, streamStores: map[string]StreamStore{}}
|
||||
ps.queue = event.NewEventQueue(100)
|
||||
go ps.eventHandler()
|
||||
|
||||
ps.eventManager.Subscribe(event.BlockPeer, ps.queue.EventChannel)
|
||||
}
|
||||
return ps, err
|
||||
ps.eventManager.Subscribe(event.BlockPeer, ps.queue.EventChannel)
|
||||
ps.eventManager.Subscribe(event.SetProfileName, ps.queue.EventChannel)
|
||||
ps.eventManager.Subscribe(event.SetAttribute, ps.queue.EventChannel)
|
||||
ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue.EventChannel)
|
||||
ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue.EventChannel)
|
||||
ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue.EventChannel)
|
||||
ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue.EventChannel)
|
||||
|
||||
return ps
|
||||
}
|
||||
|
||||
func (ps *profileStore) Init(name string) {
|
||||
func (ps *profileStore) Init(name string) error {
|
||||
ps.profile = model.GenerateNewProfile(name)
|
||||
ps.Save()
|
||||
return ps.save()
|
||||
}
|
||||
|
||||
func (ps *profileStore) Save() error {
|
||||
func (ps *profileStore) save() error {
|
||||
bytes, _ := json.Marshal(ps.profile)
|
||||
return ps.fs.Save(bytes)
|
||||
}
|
||||
|
@ -57,6 +71,19 @@ func (ps *profileStore) Load() error {
|
|||
ps.profile = cp
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, profile := range cp.Contacts {
|
||||
ss := NewStreamStore(ps.directory, profile.LocalID, ps.password)
|
||||
profile.Timeline.SetMessages(ss.Read())
|
||||
ps.streamStores[profile.Onion] = ss
|
||||
}
|
||||
|
||||
for _, group := range cp.Groups {
|
||||
ss := NewStreamStore(ps.directory, group.LocalID, ps.password)
|
||||
group.Timeline.SetMessages(ss.Read())
|
||||
ps.streamStores[group.GroupID] = ss
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -72,12 +99,49 @@ func (ps *profileStore) eventHandler() {
|
|||
contact, exists := ps.profile.GetContact(ev.Data["Onion"])
|
||||
if exists {
|
||||
contact.Blocked = true
|
||||
ps.save()
|
||||
}
|
||||
case event.SetProfileName:
|
||||
ps.profile.Name = ev.Data[event.ProfileName]
|
||||
ps.profile.SetAttribute("name", ev.Data[event.ProfileName])
|
||||
ps.save()
|
||||
|
||||
case event.SetAttribute:
|
||||
ps.profile.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
|
||||
ps.save()
|
||||
case event.SetPeerAttribute:
|
||||
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
|
||||
if exists {
|
||||
contact.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
|
||||
ps.save()
|
||||
}
|
||||
case event.SetGroupAttribute:
|
||||
group, exists := ps.profile.Groups[ev.Data[event.GroupID]]
|
||||
if exists {
|
||||
group.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
|
||||
ps.save()
|
||||
}
|
||||
case event.NewGroupInvite:
|
||||
var gci protocol.CwtchPeerPacket //protocol.GroupChatInvite
|
||||
proto.Unmarshal([]byte(ev.Data["GroupInvite"]), &gci)
|
||||
groupInvite := gci.GroupChatInvite
|
||||
ps.profile.ProcessInvite(groupInvite, ev.Data[event.RemotePeer])
|
||||
ps.save()
|
||||
group := ps.profile.Groups[groupInvite.GetGroupName()]
|
||||
ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password)
|
||||
|
||||
case event.NewMessageFromGroup:
|
||||
groupid := ev.Data[event.GroupID]
|
||||
received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
|
||||
sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent])
|
||||
// TODO: Sig, prev message Sig
|
||||
message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer]}
|
||||
//ps.profile.Groups[groupid].AddMessage(message) <- wants protocol.DecryptedGroupMessage so group.Timeline will drift here from launch when it's initialized
|
||||
ps.streamStores[groupid].Write(message)
|
||||
default:
|
||||
return
|
||||
}
|
||||
|
||||
ps.Save()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
const testProfileName = "Alice"
|
||||
const testKey = "key"
|
||||
const testVal = "value"
|
||||
const testInitialMessage = "howdy"
|
||||
const testMessage = "Hello from storage"
|
||||
|
||||
func TestProfileStoreWriteRead(t *testing.T) {
|
||||
os.RemoveAll(testingDir)
|
||||
eventBus := new(event.Manager)
|
||||
eventBus.Initialize()
|
||||
ps1 := NewProfileStore(eventBus, testingDir, password)
|
||||
|
||||
err := ps1.Init(testProfileName)
|
||||
if err != nil {
|
||||
t.Errorf("Error initializing profileStore: %v\n", err)
|
||||
}
|
||||
|
||||
eventBus.Publish(event.NewEvent(event.SetAttribute, map[event.Field]string{event.Key: testKey, event.Data: testVal}))
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
groupid, invite, err := ps1.GetProfileCopy().StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
|
||||
if err != nil {
|
||||
t.Errorf("Creating group: %v\n", err)
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("Creating group invite: %v\n", err)
|
||||
}
|
||||
eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy().Onion, event.GroupInvite: string(invite)}))
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{
|
||||
event.GroupID: groupid,
|
||||
event.TimestampSent: time.Now().Format(time.RFC3339Nano),
|
||||
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
|
||||
event.RemotePeer: ps1.GetProfileCopy().Onion,
|
||||
event.Data: testMessage,
|
||||
}))
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
ps1.Shutdown()
|
||||
|
||||
ps2 := NewProfileStore(eventBus, testingDir, password)
|
||||
err = ps2.Load()
|
||||
if err != nil {
|
||||
t.Errorf("Error createing profileStore: %v\n", err)
|
||||
}
|
||||
|
||||
profile := ps2.GetProfileCopy()
|
||||
if profile.Name != testProfileName {
|
||||
t.Errorf("Profile name from loaded profile incorrect. Expected: '%v' Actual: '%v'\n", testProfileName, profile.Name)
|
||||
}
|
||||
|
||||
v, _ := profile.GetAttribute(testKey)
|
||||
if v != testVal {
|
||||
t.Errorf("Profile attribute '%v' inccorect. Expected: '%v' Actual: '%v'\n", testKey, testVal, v)
|
||||
}
|
||||
|
||||
group2 := ps2.GetProfileCopy().Groups[groupid]
|
||||
if group2 == nil {
|
||||
t.Errorf("Group not loaded\n")
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,145 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"cwtch.im/cwtch/model"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
fileStorePartitions = 16
|
||||
bytesPerFile = 15 * 1024
|
||||
)
|
||||
|
||||
// streamStore is a file-backed implementation of StreamStore using an in memory buffer of ~16KB and a rotating set of files
|
||||
type streamStore struct {
|
||||
password string
|
||||
|
||||
storeDirectory string
|
||||
filenameBase string
|
||||
|
||||
messages []model.Message
|
||||
bufferByteCount int
|
||||
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// StreamStore provides a stream like interface to encrypted storage
|
||||
type StreamStore interface {
|
||||
Write(message model.Message)
|
||||
Read() []model.Message
|
||||
}
|
||||
|
||||
// NewStreamStore returns an initialized StreamStore ready for reading and writing
|
||||
func NewStreamStore(directory string, filenameBase string, password string) (store StreamStore) {
|
||||
ss := &streamStore{storeDirectory: directory, filenameBase: filenameBase, password: password}
|
||||
os.Mkdir(ss.storeDirectory, 0700)
|
||||
|
||||
ss.initBuffer()
|
||||
ss.initBufferFromStorage()
|
||||
|
||||
return ss
|
||||
}
|
||||
|
||||
func (ss *streamStore) initBuffer() {
|
||||
ss.messages = []model.Message{}
|
||||
ss.bufferByteCount = 0
|
||||
}
|
||||
|
||||
func (ss *streamStore) initBufferFromStorage() error {
|
||||
filename := path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, 0))
|
||||
f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600)
|
||||
if err != nil {
|
||||
log.Errorf("StreamStore could not open: %v: %v", filename, err)
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
scanner := bufio.NewScanner(f)
|
||||
for scanner.Scan() {
|
||||
mText := scanner.Text()
|
||||
m := model.Message{}
|
||||
err := json.Unmarshal([]byte(mText), &m)
|
||||
if err == nil {
|
||||
ss.updateBuffer(m)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ss *streamStore) updateBuffer(m model.Message) {
|
||||
ss.messages = append(ss.messages, m)
|
||||
ss.bufferByteCount += (model.MessageBaseSize * 1.5) + len(m.Message)
|
||||
}
|
||||
|
||||
func (ss *streamStore) updateFile() error {
|
||||
msgs, err := json.Marshal(ss.messages)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to marshal group messages %v\n", err)
|
||||
}
|
||||
|
||||
// ENCRYPT
|
||||
key, salt, _ := createKey(ss.password)
|
||||
encryptedMsgs, err := encryptFileData(msgs, key)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to encrypt messages: %v\n", err)
|
||||
return err
|
||||
}
|
||||
encryptedMsgs = append(salt[:], encryptedMsgs...)
|
||||
|
||||
ioutil.WriteFile(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, 0)), encryptedMsgs, 0700)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ss *streamStore) rotateFileStore() {
|
||||
os.Remove(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, fileStorePartitions-1)))
|
||||
|
||||
for i := fileStorePartitions - 2; i >= 0; i-- {
|
||||
os.Rename(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i)), path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i+1)))
|
||||
}
|
||||
}
|
||||
|
||||
// FetchMessages returns all messages from the backing file.
|
||||
func (ss *streamStore) Read() (messages []model.Message) {
|
||||
ss.lock.Lock()
|
||||
defer ss.lock.Unlock()
|
||||
|
||||
resp := []model.Message{}
|
||||
|
||||
for i := fileStorePartitions - 1; i >= 0; i-- {
|
||||
filename := fmt.Sprintf("%s.%d", ss.filenameBase, i)
|
||||
|
||||
bytes, err := readEncryptedFile(ss.storeDirectory, filename, ss.password)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
msgs := []model.Message{}
|
||||
err = json.Unmarshal([]byte(bytes), &msgs)
|
||||
if err == nil {
|
||||
resp = append(resp, msgs...)
|
||||
}
|
||||
}
|
||||
|
||||
return resp
|
||||
}
|
||||
|
||||
// AddMessage adds a GroupMessage to the store
|
||||
func (ss *streamStore) Write(m model.Message) {
|
||||
ss.lock.Lock()
|
||||
defer ss.lock.Unlock()
|
||||
ss.updateBuffer(m)
|
||||
ss.updateFile()
|
||||
|
||||
if ss.bufferByteCount > bytesPerFile {
|
||||
ss.rotateFileStore()
|
||||
ss.initBuffer()
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/model"
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
const testingDir = "./testing"
|
||||
const filenameBase = "testStream"
|
||||
const password = "asdfqwer"
|
||||
const line1 = "Hello from storage!"
|
||||
|
||||
func TestStreamStoreWriteRead(t *testing.T) {
|
||||
os.RemoveAll(testingDir)
|
||||
os.Mkdir(testingDir, 0777)
|
||||
ss1 := NewStreamStore(testingDir, filenameBase, password)
|
||||
m := model.Message{Message: line1}
|
||||
ss1.Write(m)
|
||||
|
||||
ss2 := NewStreamStore(testingDir, filenameBase, password)
|
||||
messages := ss2.Read()
|
||||
if len(messages) != 1 {
|
||||
t.Errorf("Read messages has wrong length. Expected: 1 Actual: %d\n", len(messages))
|
||||
}
|
||||
if messages[0].Message != line1 {
|
||||
t.Errorf("Read message has wrong content. Expected: '%v' Actual: '%v'\n", line1, messages[0].Message)
|
||||
}
|
||||
}
|
|
@ -19,6 +19,6 @@ gofmt -l -s -w .
|
|||
echo "Checking for ineffectual assignment of errors (unchecked errors...)"
|
||||
ineffassign .
|
||||
|
||||
# misspell (https://github.com/client9/misspell)
|
||||
# misspell (https://github.com/client9/misspell/cmd/misspell)
|
||||
echo "Checking for misspelled words..."
|
||||
misspell . | grep -v "vendor/" | grep -v "go.sum" | grep -v ".idea"
|
||||
|
|
Loading…
Reference in New Issue