Merge branch 'pwv1' of dan/cwtch into master
the build was successful
Details
the build was successful
Details
This commit is contained in:
commit
958dbfd211
|
@ -114,7 +114,7 @@ func (app *application) CreateTaggedPeer(name string, password string, tag strin
|
|||
return
|
||||
}
|
||||
|
||||
profileStore := storage.NewProfileWriterStore(app.eventBuses[profile.Onion], path.Join(app.directory, "profiles", profile.LocalID), password, profile)
|
||||
profileStore := storage.CreateProfileWriterStore(app.eventBuses[profile.Onion], path.Join(app.directory, "profiles", profile.LocalID), password, profile)
|
||||
app.storage[profile.Onion] = profileStore
|
||||
|
||||
pc := app.storage[profile.Onion].GetProfileCopy(true)
|
||||
|
@ -182,8 +182,7 @@ func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProf
|
|||
|
||||
for _, file := range files {
|
||||
eventBus := event.NewEventManager()
|
||||
profileStore := storage.NewProfileWriterStore(eventBus, path.Join(ac.directory, "profiles", file.Name()), password, nil)
|
||||
err = profileStore.Load()
|
||||
profileStore, err := storage.LoadProfileWriterStore(eventBus, path.Join(ac.directory, "profiles", file.Name()), password)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -42,9 +42,10 @@ func (ac *applicationClient) handleEvent(ev *event.Event) {
|
|||
switch ev.EventType {
|
||||
case event.NewPeer:
|
||||
localID := ev.Data[event.Identity]
|
||||
password := ev.Data[event.Password]
|
||||
reload := ev.Data[event.Status] == "running"
|
||||
ac.newPeer(localID, password, reload)
|
||||
key := ev.Data[event.Key]
|
||||
salt := ev.Data[event.Salt]
|
||||
reload := ev.Data[event.Status] == event.StorageRunning
|
||||
ac.newPeer(localID, key, salt, reload)
|
||||
case event.DeletePeer:
|
||||
onion := ev.Data[event.Identity]
|
||||
ac.handleDeletedPeer(onion)
|
||||
|
@ -59,8 +60,12 @@ func (ac *applicationClient) handleEvent(ev *event.Event) {
|
|||
}
|
||||
}
|
||||
|
||||
func (ac *applicationClient) newPeer(localID, password string, reload bool) {
|
||||
profile, err := storage.ReadProfile(path.Join(ac.directory, "profiles", localID), password)
|
||||
func (ac *applicationClient) newPeer(localID, key, salt string, reload bool) {
|
||||
var keyBytes [32]byte
|
||||
var saltBytes [128]byte
|
||||
copy(keyBytes[:], key)
|
||||
copy(saltBytes[:], salt)
|
||||
profile, err := storage.ReadProfile(path.Join(ac.directory, "profiles", localID), keyBytes, saltBytes)
|
||||
if err != nil {
|
||||
log.Errorf("Could not read profile for NewPeer event: %v\n", err)
|
||||
ac.appBus.Publish(event.NewEventList(event.PeerError, event.Error, fmt.Sprintf("Could not read profile for NewPeer event: %v\n", err)))
|
||||
|
|
|
@ -65,7 +65,9 @@ func (as *applicationService) handleEvent(ev *event.Event) {
|
|||
as.loadProfiles(password)
|
||||
case event.ReloadClient:
|
||||
for _, storage := range as.storage {
|
||||
message := event.IPCMessage{Dest: DestApp, Message: *storage.GetNewPeerMessage()}
|
||||
peerMsg := *storage.GetNewPeerMessage()
|
||||
peerMsg.Data[event.Status] = event.StorageRunning
|
||||
message := event.IPCMessage{Dest: DestApp, Message: peerMsg}
|
||||
as.bridge.Write(&message)
|
||||
}
|
||||
|
||||
|
@ -103,7 +105,7 @@ func (as *applicationService) createPeer(name, password, tag string) {
|
|||
profile.SetAttribute(AttributeTag, tag)
|
||||
}
|
||||
|
||||
profileStore := storage.NewProfileWriterStore(as.eventBuses[profile.Onion], path.Join(as.directory, "profiles", profile.LocalID), password, profile)
|
||||
profileStore := storage.CreateProfileWriterStore(as.eventBuses[profile.Onion], path.Join(as.directory, "profiles", profile.LocalID), password, profile)
|
||||
|
||||
blockedPeers := profile.BlockedPeers()
|
||||
// TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key.
|
||||
|
@ -113,7 +115,9 @@ func (as *applicationService) createPeer(name, password, tag string) {
|
|||
as.storage[profile.Onion] = profileStore
|
||||
as.engines[profile.Onion] = engine
|
||||
|
||||
message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.LocalID, event.Password: password})}
|
||||
peerMsg := *profileStore.GetNewPeerMessage()
|
||||
peerMsg.Data[event.Status] = event.StorageNew
|
||||
message := event.IPCMessage{Dest: DestApp, Message: peerMsg}
|
||||
as.bridge.Write(&message)
|
||||
}
|
||||
|
||||
|
@ -129,7 +133,10 @@ func (as *applicationService) loadProfiles(password string) {
|
|||
as.storage[profile.Onion] = profileStore
|
||||
as.engines[profile.Onion] = engine
|
||||
as.asmutex.Unlock()
|
||||
message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.LocalID, event.Password: password})}
|
||||
|
||||
peerMsg := *profileStore.GetNewPeerMessage()
|
||||
peerMsg.Data[event.Status] = event.StorageNew
|
||||
message := event.IPCMessage{Dest: DestApp, Message: peerMsg}
|
||||
as.bridge.Write(&message)
|
||||
count++
|
||||
})
|
||||
|
|
|
@ -142,7 +142,7 @@ const (
|
|||
CreatePeer = Type("CreatePeer")
|
||||
|
||||
// service -> client: Identity(localId), Password, [Status(new/default=blank || from reload='running')]
|
||||
// app -> Identity(onion)
|
||||
// app -> Key, Salt
|
||||
NewPeer = Type("NewPeer")
|
||||
|
||||
// Identity(onion)
|
||||
|
@ -184,6 +184,10 @@ const (
|
|||
// Error: Description of the Error
|
||||
// Onion: the local onion we attempt to check
|
||||
NetworkStatus = Type("NetworkError")
|
||||
|
||||
// For debugging. Allows test to emit a Syn and get a response Ack(eventID) when the subsystem is done processing a queue
|
||||
Syn = Type("Syn")
|
||||
Ack = Type("Ack")
|
||||
)
|
||||
|
||||
// Field defines common event attributes
|
||||
|
@ -217,6 +221,8 @@ const (
|
|||
Key = Field("Key")
|
||||
Data = Field("Data")
|
||||
|
||||
Salt = Field("Salt")
|
||||
|
||||
Error = Field("Error")
|
||||
|
||||
Progreess = Field("Progress")
|
||||
|
@ -230,6 +236,12 @@ const (
|
|||
AppErrLoaded0 = "Loaded 0 profiles"
|
||||
)
|
||||
|
||||
// Values to be suplied in event.NewPeer for Status
|
||||
const (
|
||||
StorageRunning = "running"
|
||||
StorageNew = "new"
|
||||
)
|
||||
|
||||
// Defining Protocol Contexts
|
||||
const (
|
||||
ContextAck = "im.cwtch.acknowledgement"
|
||||
|
|
|
@ -9,13 +9,20 @@ func TestCounter(t *testing.T) {
|
|||
starttime := time.Now()
|
||||
c := NewCounter()
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
max := 100
|
||||
done := make(chan bool, max)
|
||||
|
||||
// slightly stress test atomic nature of metric by flooding with threads Add()ing
|
||||
for i := 0; i < max; i++ {
|
||||
go func() {
|
||||
c.Add(1)
|
||||
done <- true
|
||||
}()
|
||||
}
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
for i := 0; i < max; i++ {
|
||||
<- done
|
||||
}
|
||||
|
||||
val := c.Count()
|
||||
if val != 100 {
|
||||
|
|
|
@ -3,30 +3,20 @@ package storage
|
|||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/model"
|
||||
"encoding/json"
|
||||
"cwtch.im/cwtch/storage/v0"
|
||||
"cwtch.im/cwtch/storage/v1"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||
"os"
|
||||
"time"
|
||||
"io/ioutil"
|
||||
"path"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
const groupIDLen = 32
|
||||
const peerIDLen = 56
|
||||
const profileFilename = "profile"
|
||||
|
||||
type profileStore struct {
|
||||
fs FileStore
|
||||
streamStores map[string]StreamStore // map [groupId|onion] StreamStore
|
||||
directory string
|
||||
password string
|
||||
profile *model.Profile
|
||||
eventManager event.Manager
|
||||
queue event.Queue
|
||||
writer bool
|
||||
}
|
||||
const versionFile = "VERSION"
|
||||
const currentVersion = 1
|
||||
|
||||
// ProfileStore is an interface to managing the storage of Cwtch Profiles
|
||||
type ProfileStore interface {
|
||||
Load() error
|
||||
Shutdown()
|
||||
Delete()
|
||||
GetProfileCopy(timeline bool) *model.Profile
|
||||
|
@ -34,52 +24,25 @@ type ProfileStore interface {
|
|||
GetStatusMessages() []*event.Event
|
||||
}
|
||||
|
||||
// NewProfileWriterStore returns a profile store backed by a filestore listening for events and saving them
|
||||
// CreateProfileWriterStore creates a profile store backed by a filestore listening for events and saving them
|
||||
// directory should be $appDir/profiles/$rand
|
||||
func NewProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) ProfileStore {
|
||||
os.Mkdir(directory, 0700)
|
||||
ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true}
|
||||
//ps.queue = event.NewQueue(100)
|
||||
ps.queue = event.NewQueue()
|
||||
if profile != nil {
|
||||
ps.save()
|
||||
}
|
||||
go ps.eventHandler()
|
||||
|
||||
ps.eventManager.Subscribe(event.BlockPeer, ps.queue)
|
||||
ps.eventManager.Subscribe(event.UnblockPeer, ps.queue)
|
||||
ps.eventManager.Subscribe(event.PeerCreated, ps.queue)
|
||||
ps.eventManager.Subscribe(event.GroupCreated, ps.queue)
|
||||
ps.eventManager.Subscribe(event.SetProfileName, ps.queue)
|
||||
ps.eventManager.Subscribe(event.SetAttribute, ps.queue)
|
||||
ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue)
|
||||
ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue)
|
||||
ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue)
|
||||
ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue)
|
||||
ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue)
|
||||
ps.eventManager.Subscribe(event.PeerStateChange, ps.queue)
|
||||
ps.eventManager.Subscribe(event.ServerStateChange, ps.queue)
|
||||
ps.eventManager.Subscribe(event.DeleteContact, ps.queue)
|
||||
ps.eventManager.Subscribe(event.DeleteGroup, ps.queue)
|
||||
ps.eventManager.Subscribe(event.ChangePassword, ps.queue)
|
||||
|
||||
return ps
|
||||
func CreateProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) ProfileStore {
|
||||
return v1.CreateProfileWriterStore(eventManager, directory, password, profile)
|
||||
}
|
||||
|
||||
// ReadProfile reads a profile from storqage and returns the profile
|
||||
// LoadProfileWriterStore loads a profile store from filestore listening for events and saving them
|
||||
// directory should be $appDir/profiles/$rand
|
||||
func ReadProfile(directory, password string) (*model.Profile, error) {
|
||||
os.Mkdir(directory, 0700)
|
||||
ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: nil, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true}
|
||||
func LoadProfileWriterStore(eventManager event.Manager, directory, password string) (ProfileStore, error) {
|
||||
versionCheckUpgrade(directory, password)
|
||||
|
||||
err := ps.Load()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return v1.LoadProfileWriterStore(eventManager, directory, password)
|
||||
}
|
||||
|
||||
profile := ps.GetProfileCopy(true)
|
||||
|
||||
return profile, nil
|
||||
// ReadProfile reads a profile from storage and returns the profile
|
||||
// Should only be called for cache refresh of the profile after a ProfileWriterStore has opened
|
||||
// (and upgraded) the store, and thus supplied the key/salt
|
||||
func ReadProfile(directory string, key [32]byte, salt [128]byte) (*model.Profile, error) {
|
||||
return v1.ReadProfile(directory, key, salt)
|
||||
}
|
||||
|
||||
// NewProfile creates a new profile for use in the profile store.
|
||||
|
@ -88,254 +51,43 @@ func NewProfile(name string) *model.Profile {
|
|||
return profile
|
||||
}
|
||||
|
||||
// GetNewPeerMessage is for AppService to call on Reload events, to reseed the AppClient with the loaded peers
|
||||
func (ps *profileStore) GetNewPeerMessage() *event.Event {
|
||||
message := event.NewEventList(event.NewPeer, event.Identity, ps.profile.LocalID, event.Password, ps.password, event.Status, "running")
|
||||
return &message
|
||||
// ********* Versioning and upgrade **********
|
||||
|
||||
func detectVersion(directory string) int {
|
||||
vnumberStr, err := ioutil.ReadFile(path.Join(directory, versionFile))
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
vnumber, err := strconv.Atoi(string(vnumberStr))
|
||||
if err != nil {
|
||||
log.Errorf("Could not parse VERSION file contents: '%v' - %v\n", vnumber, err)
|
||||
return -1
|
||||
}
|
||||
return vnumber
|
||||
}
|
||||
|
||||
func (ps *profileStore) GetStatusMessages() []*event.Event {
|
||||
messages := []*event.Event{}
|
||||
for _, contact := range ps.profile.Contacts {
|
||||
message := event.NewEvent(event.PeerStateChange, map[event.Field]string{
|
||||
event.RemotePeer: string(contact.Onion),
|
||||
event.ConnectionState: contact.State,
|
||||
})
|
||||
messages = append(messages, &message)
|
||||
}
|
||||
|
||||
doneServers := make(map[string]bool)
|
||||
for _, group := range ps.profile.Groups {
|
||||
if _, exists := doneServers[group.GroupServer]; !exists {
|
||||
message := event.NewEvent(event.ServerStateChange, map[event.Field]string{
|
||||
event.GroupServer: string(group.GroupServer),
|
||||
event.ConnectionState: group.State,
|
||||
})
|
||||
messages = append(messages, &message)
|
||||
doneServers[group.GroupServer] = true
|
||||
}
|
||||
}
|
||||
|
||||
return messages
|
||||
}
|
||||
|
||||
func (ps *profileStore) ChangePassword(oldpass, newpass, eventID string) {
|
||||
if oldpass != ps.password {
|
||||
ps.eventManager.Publish(event.NewEventList(event.ChangePasswordError, event.Error, "Supplied current password does not match", event.EventID, eventID))
|
||||
return
|
||||
}
|
||||
|
||||
newStreamStores := map[string]StreamStore{}
|
||||
idToNewLocalID := map[string]string{}
|
||||
|
||||
// Generate all new StreamStores with the new password and write all the old StreamStore data into these ones
|
||||
for ssid, ss := range ps.streamStores {
|
||||
// New ss with new pass and new localID
|
||||
newlocalID := model.GenerateRandomID()
|
||||
idToNewLocalID[ssid] = newlocalID
|
||||
|
||||
newSS := NewStreamStore(ps.directory, newlocalID, newpass)
|
||||
newStreamStores[ssid] = newSS
|
||||
|
||||
// write whole store
|
||||
messages := ss.Read()
|
||||
newSS.WriteN(messages)
|
||||
}
|
||||
|
||||
// Switch over
|
||||
oldStreamStores := ps.streamStores
|
||||
ps.streamStores = newStreamStores
|
||||
for ssid, newLocalID := range idToNewLocalID {
|
||||
if len(ssid) == groupIDLen {
|
||||
ps.profile.Groups[ssid].LocalID = newLocalID
|
||||
} else {
|
||||
ps.profile.Contacts[ssid].LocalID = newLocalID
|
||||
}
|
||||
}
|
||||
|
||||
ps.password = newpass
|
||||
ps.fs.ChangePassword(newpass)
|
||||
ps.save()
|
||||
|
||||
// Clean up
|
||||
for _, oldss := range oldStreamStores {
|
||||
oldss.Delete()
|
||||
}
|
||||
|
||||
ps.eventManager.Publish(event.NewEventList(event.ChangePasswordSuccess, event.EventID, eventID))
|
||||
return
|
||||
}
|
||||
|
||||
func (ps *profileStore) save() error {
|
||||
if ps.writer {
|
||||
bytes, _ := json.Marshal(ps.profile)
|
||||
return ps.fs.Write(bytes)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Read instantiates a cwtchPeer from the file store
|
||||
func (ps *profileStore) Load() error {
|
||||
decrypted, err := ps.fs.Read()
|
||||
func upgradeV0ToV1(directory, password string) error {
|
||||
log.Debugln("Attempting storage v0 to v1: Reading v0 profile...")
|
||||
profile, err := v0.ReadProfile(directory, password)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cp := new(model.Profile)
|
||||
err = json.Unmarshal(decrypted, &cp)
|
||||
if err == nil {
|
||||
ps.profile = cp
|
||||
|
||||
for gid, group := range cp.Groups {
|
||||
ss := NewStreamStore(ps.directory, group.LocalID, ps.password)
|
||||
|
||||
cp.Groups[gid].Timeline.SetMessages(ss.Read())
|
||||
ps.streamStores[group.GroupID] = ss
|
||||
}
|
||||
log.Debugln("Attempting storage v0 to v1: Writing v1 profile...")
|
||||
return v1.UpgradeV0Profile(profile, directory, password)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (ps *profileStore) GetProfileCopy(timeline bool) *model.Profile {
|
||||
return ps.profile.GetCopy(timeline)
|
||||
}
|
||||
|
||||
func (ps *profileStore) eventHandler() {
|
||||
log.Debugln("eventHandler()!")
|
||||
for {
|
||||
ev := ps.queue.Next()
|
||||
log.Debugf("eventHandler event %v %v\n", ev.EventType, ev.EventID)
|
||||
|
||||
switch ev.EventType {
|
||||
case event.BlockPeer:
|
||||
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
|
||||
if exists {
|
||||
contact.Blocked = true
|
||||
ps.save()
|
||||
}
|
||||
case event.UnblockPeer:
|
||||
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
|
||||
if exists {
|
||||
contact.Blocked = false
|
||||
ps.save()
|
||||
}
|
||||
case event.PeerCreated:
|
||||
var pp *model.PublicProfile
|
||||
json.Unmarshal([]byte(ev.Data[event.Data]), &pp)
|
||||
ps.profile.AddContact(ev.Data[event.RemotePeer], pp)
|
||||
// TODO: configure - allow peers to be configured to turn on limited storage
|
||||
/*ss := NewStreamStore(ps.directory, pp.LocalID, ps.password)
|
||||
pp.Timeline.SetMessages(ss.Read())
|
||||
ps.streamStores[pp.Onion] = ss
|
||||
ps.save()*/
|
||||
case event.GroupCreated:
|
||||
var group *model.Group
|
||||
json.Unmarshal([]byte(ev.Data[event.Data]), &group)
|
||||
ps.profile.AddGroup(group)
|
||||
ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password)
|
||||
ps.save()
|
||||
case event.SetProfileName:
|
||||
ps.profile.Name = ev.Data[event.ProfileName]
|
||||
ps.profile.SetAttribute("name", ev.Data[event.ProfileName])
|
||||
ps.save()
|
||||
case event.SetAttribute:
|
||||
ps.profile.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
|
||||
ps.save()
|
||||
case event.SetPeerAttribute:
|
||||
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
|
||||
if exists {
|
||||
contact.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
|
||||
ps.save()
|
||||
} else {
|
||||
log.Errorf("error setting attribute on peer %v peer does not exist", ev)
|
||||
}
|
||||
case event.SetGroupAttribute:
|
||||
group := ps.profile.GetGroup(ev.Data[event.GroupID])
|
||||
if group != nil {
|
||||
group.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
|
||||
ps.save()
|
||||
} else {
|
||||
log.Errorf("error setting attribute on group %v group does not exist", ev)
|
||||
}
|
||||
case event.AcceptGroupInvite:
|
||||
err := ps.profile.AcceptInvite(ev.Data[event.GroupID])
|
||||
if err == nil {
|
||||
ps.save()
|
||||
} else {
|
||||
log.Errorf("error accepting group invite")
|
||||
}
|
||||
case event.NewGroupInvite:
|
||||
gid, err := ps.profile.ProcessInvite(ev.Data[event.GroupInvite], ev.Data[event.RemotePeer])
|
||||
log.Errorf("gid: %v err:%v\n", gid, err)
|
||||
if err == nil {
|
||||
ps.save()
|
||||
group := ps.profile.Groups[gid]
|
||||
ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password)
|
||||
} else {
|
||||
log.Errorf("error storing new group invite: %v (%v)", err, ev)
|
||||
}
|
||||
case event.NewMessageFromGroup:
|
||||
groupid := ev.Data[event.GroupID]
|
||||
received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
|
||||
sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent])
|
||||
message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: []byte(ev.Data[event.Signature]), PreviousMessageSig: []byte(ev.Data[event.PreviousSignature])}
|
||||
ss, exists := ps.streamStores[groupid]
|
||||
if exists {
|
||||
ss.Write(message)
|
||||
} else {
|
||||
log.Errorf("error storing new group message: %v stream store does not exist", ev)
|
||||
}
|
||||
case event.PeerStateChange:
|
||||
if _, exists := ps.profile.Contacts[ev.Data[event.RemotePeer]]; exists {
|
||||
ps.profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState]
|
||||
}
|
||||
case event.ServerStateChange:
|
||||
for _, group := range ps.profile.Groups {
|
||||
if group.GroupServer == ev.Data[event.GroupServer] {
|
||||
group.State = ev.Data[event.ConnectionState]
|
||||
}
|
||||
}
|
||||
case event.DeleteContact:
|
||||
onion := ev.Data[event.RemotePeer]
|
||||
ps.profile.DeleteContact(onion)
|
||||
ps.save()
|
||||
case event.DeleteGroup:
|
||||
groupID := ev.Data[event.GroupID]
|
||||
ps.profile.DeleteGroup(groupID)
|
||||
ps.save()
|
||||
ss, exists := ps.streamStores[groupID]
|
||||
if exists {
|
||||
ss.Delete()
|
||||
delete(ps.streamStores, groupID)
|
||||
}
|
||||
case event.ChangePassword:
|
||||
oldpass := ev.Data[event.Password]
|
||||
newpass := ev.Data[event.NewPassword]
|
||||
ps.ChangePassword(oldpass, newpass, ev.EventID)
|
||||
default:
|
||||
func versionCheckUpgrade(directory, password string) {
|
||||
version := detectVersion(directory)
|
||||
log.Infof("versionCheck: %v\n", version)
|
||||
if version == -1 {
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func (ps *profileStore) Shutdown() {
|
||||
if ps.queue != nil {
|
||||
ps.queue.Shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
func (ps *profileStore) Delete() {
|
||||
log.Debugf("Delete ProfileStore for %v\n", ps.profile.Onion)
|
||||
|
||||
for _, ss := range ps.streamStores {
|
||||
ss.Delete()
|
||||
}
|
||||
|
||||
ps.fs.Delete()
|
||||
|
||||
err := os.RemoveAll(ps.directory)
|
||||
if version == 0 {
|
||||
err := upgradeV0ToV1(directory, password)
|
||||
if err != nil {
|
||||
log.Errorf("ProfileStore Delete error on RemoveAll on %v was %v\n", ps.directory, err)
|
||||
return
|
||||
}
|
||||
//version = 1
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,83 +4,35 @@ package storage
|
|||
|
||||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/storage/v0"
|
||||
"fmt"
|
||||
"log"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
const testingDir = "./testing"
|
||||
const filenameBase = "testStream"
|
||||
const password = "asdfqwer"
|
||||
const line1 = "Hello from storage!"
|
||||
const testProfileName = "Alice"
|
||||
const testKey = "key"
|
||||
const testVal = "value"
|
||||
const testInitialMessage = "howdy"
|
||||
const testMessage = "Hello from storage"
|
||||
|
||||
func TestProfileStoreWriteRead(t *testing.T) {
|
||||
log.Println("profile store test!")
|
||||
os.RemoveAll(testingDir)
|
||||
eventBus := event.NewEventManager()
|
||||
profile := NewProfile(testProfileName)
|
||||
ps1 := NewProfileWriterStore(eventBus, testingDir, password, profile)
|
||||
|
||||
eventBus.Publish(event.NewEvent(event.SetAttribute, map[event.Field]string{event.Key: testKey, event.Data: testVal}))
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
|
||||
if err != nil {
|
||||
t.Errorf("Creating group: %v\n", err)
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("Creating group invite: %v\n", err)
|
||||
}
|
||||
|
||||
eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)}))
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{
|
||||
event.GroupID: groupid,
|
||||
event.TimestampSent: time.Now().Format(time.RFC3339Nano),
|
||||
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
|
||||
event.RemotePeer: ps1.GetProfileCopy(true).Onion,
|
||||
event.Data: testMessage,
|
||||
}))
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
ps1.Shutdown()
|
||||
|
||||
ps2 := NewProfileWriterStore(eventBus, testingDir, password, nil)
|
||||
err = ps2.Load()
|
||||
if err != nil {
|
||||
t.Errorf("Error createing profileStore: %v\n", err)
|
||||
}
|
||||
|
||||
profile = ps2.GetProfileCopy(true)
|
||||
if profile.Name != testProfileName {
|
||||
t.Errorf("Profile name from loaded profile incorrect. Expected: '%v' Actual: '%v'\n", testProfileName, profile.Name)
|
||||
}
|
||||
|
||||
v, _ := profile.GetAttribute(testKey)
|
||||
if v != testVal {
|
||||
t.Errorf("Profile attribute '%v' inccorect. Expected: '%v' Actual: '%v'\n", testKey, testVal, v)
|
||||
}
|
||||
|
||||
group2 := ps2.GetProfileCopy(true).Groups[groupid]
|
||||
if group2 == nil {
|
||||
t.Errorf("Group not loaded\n")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestProfileStoreChangePassword(t *testing.T) {
|
||||
func TestProfileStoreUpgradeV0toV1(t *testing.T) {
|
||||
log.SetLevel(log.LevelDebug)
|
||||
os.RemoveAll(testingDir)
|
||||
eventBus := event.NewEventManager()
|
||||
|
||||
queue := event.NewQueue()
|
||||
eventBus.Subscribe(event.ChangePasswordSuccess, queue)
|
||||
|
||||
fmt.Println("Creating and initializing v0 profile and store...")
|
||||
profile := NewProfile(testProfileName)
|
||||
ps1 := NewProfileWriterStore(eventBus, testingDir, password, profile)
|
||||
ps1 := v0.NewProfileWriterStore(eventBus, testingDir, password, profile)
|
||||
|
||||
groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
|
||||
if err != nil {
|
||||
|
@ -90,50 +42,19 @@ func TestProfileStoreChangePassword(t *testing.T) {
|
|||
t.Errorf("Creating group invite: %v\n", err)
|
||||
}
|
||||
|
||||
eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)}))
|
||||
time.Sleep(1 * time.Second)
|
||||
ps1.AddGroup(invite, profile.Onion)
|
||||
|
||||
fmt.Println("Sending 200 messages...")
|
||||
|
||||
for i := 0; i < 200; i++ {
|
||||
eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{
|
||||
event.GroupID: groupid,
|
||||
event.TimestampSent: time.Now().Format(time.RFC3339Nano),
|
||||
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
|
||||
event.RemotePeer: ps1.GetProfileCopy(true).Onion,
|
||||
event.Data: testMessage,
|
||||
}))
|
||||
ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), profile.Onion, testMessage)
|
||||
}
|
||||
|
||||
newPass := "qwerty123"
|
||||
|
||||
fmt.Println("Sending Change Passwords event...")
|
||||
eventBus.Publish(event.NewEventList(event.ChangePassword, event.Password, password, event.NewPassword, newPass))
|
||||
|
||||
ev := queue.Next()
|
||||
if ev.EventType != event.ChangePasswordSuccess {
|
||||
t.Errorf("Unexpected event response detected %v\n", ev.EventType)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Println("Sending 10 more messages...")
|
||||
for i := 0; i < 10; i++ {
|
||||
eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{
|
||||
event.GroupID: groupid,
|
||||
event.TimestampSent: time.Now().Format(time.RFC3339Nano),
|
||||
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
|
||||
event.RemotePeer: ps1.GetProfileCopy(true).Onion,
|
||||
event.Data: testMessage,
|
||||
}))
|
||||
}
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
fmt.Println("Shutdown profile store...")
|
||||
fmt.Println("Shutdown v0 profile store...")
|
||||
ps1.Shutdown()
|
||||
|
||||
fmt.Println("New Profile store...")
|
||||
ps2 := NewProfileWriterStore(eventBus, testingDir, newPass, nil)
|
||||
err = ps2.Load()
|
||||
fmt.Println("New v1 Profile store...")
|
||||
ps2, err := LoadProfileWriterStore(eventBus, testingDir, password)
|
||||
if err != nil {
|
||||
t.Errorf("Error createing new profileStore with new password: %v\n", err)
|
||||
return
|
||||
|
@ -146,7 +67,7 @@ func TestProfileStoreChangePassword(t *testing.T) {
|
|||
return
|
||||
}
|
||||
|
||||
if len(profile2.Groups[groupid].Timeline.Messages) != 210 {
|
||||
t.Errorf("Failed to load group's 210 messages, instead got %v\n", len(profile2.Groups[groupid].Timeline.Messages))
|
||||
if len(profile2.Groups[groupid].Timeline.Messages) != 200 {
|
||||
t.Errorf("Failed to load group's 200 messages, instead got %v\n", len(profile2.Groups[groupid].Timeline.Messages))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package storage
|
||||
package v0
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
|
@ -1,9 +1,7 @@
|
|||
package storage
|
||||
package v0
|
||||
|
||||
import (
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
)
|
||||
|
||||
|
@ -16,10 +14,8 @@ type fileStore struct {
|
|||
|
||||
// FileStore is a primitive around storing encrypted files
|
||||
type FileStore interface {
|
||||
Write([]byte) error
|
||||
Read() ([]byte, error)
|
||||
Delete()
|
||||
ChangePassword(newpass string)
|
||||
Write(data []byte) error
|
||||
}
|
||||
|
||||
// NewFileStore instantiates a fileStore given a filename and a password
|
||||
|
@ -31,6 +27,10 @@ func NewFileStore(directory string, filename string, password string) FileStore
|
|||
return filestore
|
||||
}
|
||||
|
||||
func (fps *fileStore) Read() ([]byte, error) {
|
||||
return readEncryptedFile(fps.directory, fps.filename, fps.password)
|
||||
}
|
||||
|
||||
// write serializes a cwtchPeer to a file
|
||||
func (fps *fileStore) Write(data []byte) error {
|
||||
key, salt, _ := createKey(fps.password)
|
||||
|
@ -44,18 +44,3 @@ func (fps *fileStore) Write(data []byte) error {
|
|||
err = ioutil.WriteFile(path.Join(fps.directory, fps.filename), encryptedbytes, 0600)
|
||||
return err
|
||||
}
|
||||
|
||||
func (fps *fileStore) Read() ([]byte, error) {
|
||||
return readEncryptedFile(fps.directory, fps.filename, fps.password)
|
||||
}
|
||||
|
||||
func (fps *fileStore) Delete() {
|
||||
err := os.Remove(path.Join(fps.directory, fps.filename))
|
||||
if err != nil {
|
||||
log.Errorf("Deleting file %v\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (fps *fileStore) ChangePassword(newpass string) {
|
||||
fps.password = newpass
|
||||
}
|
|
@ -0,0 +1,120 @@
|
|||
package v0
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/model"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
const groupIDLen = 32
|
||||
const peerIDLen = 56
|
||||
const profileFilename = "profile"
|
||||
|
||||
// ProfileStoreV0 is a legacy profile store used now for upgrading legacy profile stores to newer versions
|
||||
type ProfileStoreV0 struct {
|
||||
fs FileStore
|
||||
streamStores map[string]StreamStore // map [groupId|onion] StreamStore
|
||||
directory string
|
||||
password string
|
||||
profile *model.Profile
|
||||
}
|
||||
|
||||
// NewProfileWriterStore returns a profile store backed by a filestore listening for events and saving them
|
||||
// directory should be $appDir/profiles/$rand
|
||||
func NewProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) *ProfileStoreV0 {
|
||||
os.Mkdir(directory, 0700)
|
||||
ps := &ProfileStoreV0{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, streamStores: map[string]StreamStore{}}
|
||||
if profile != nil {
|
||||
ps.save()
|
||||
}
|
||||
|
||||
return ps
|
||||
}
|
||||
|
||||
// ReadProfile reads a profile from storqage and returns the profile
|
||||
// directory should be $appDir/profiles/$rand
|
||||
func ReadProfile(directory, password string) (*model.Profile, error) {
|
||||
os.Mkdir(directory, 0700)
|
||||
ps := &ProfileStoreV0{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: nil, streamStores: map[string]StreamStore{}}
|
||||
|
||||
err := ps.Load()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
profile := ps.getProfileCopy(true)
|
||||
|
||||
return profile, nil
|
||||
}
|
||||
|
||||
/********************************************************************************************/
|
||||
|
||||
// AddGroup For testing, adds a group to the profile (and startsa stream store)
|
||||
func (ps *ProfileStoreV0) AddGroup(invite []byte, peer string) {
|
||||
gid, err := ps.profile.ProcessInvite(string(invite), peer)
|
||||
if err == nil {
|
||||
ps.save()
|
||||
group := ps.profile.Groups[gid]
|
||||
ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password)
|
||||
}
|
||||
}
|
||||
|
||||
// AddGroupMessage for testing, adds a group message
|
||||
func (ps *ProfileStoreV0) AddGroupMessage(groupid string, timeSent, timeRecvied string, remotePeer, data string) {
|
||||
received, _ := time.Parse(time.RFC3339Nano, timeRecvied)
|
||||
sent, _ := time.Parse(time.RFC3339Nano, timeSent)
|
||||
message := model.Message{Received: received, Timestamp: sent, Message: data, PeerID: remotePeer, Signature: []byte("signature"), PreviousMessageSig: []byte("PreviousSignature")}
|
||||
ss, exists := ps.streamStores[groupid]
|
||||
if exists {
|
||||
ss.Write(message)
|
||||
} else {
|
||||
fmt.Println("ERROR")
|
||||
}
|
||||
}
|
||||
|
||||
// GetNewPeerMessage is for AppService to call on Reload events, to reseed the AppClient with the loaded peers
|
||||
func (ps *ProfileStoreV0) GetNewPeerMessage() *event.Event {
|
||||
message := event.NewEventList(event.NewPeer, event.Identity, ps.profile.LocalID, event.Password, ps.password, event.Status, "running")
|
||||
return &message
|
||||
}
|
||||
|
||||
// Load instantiates a cwtchPeer from the file store
|
||||
func (ps *ProfileStoreV0) Load() error {
|
||||
decrypted, err := ps.fs.Read()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cp := new(model.Profile)
|
||||
err = json.Unmarshal(decrypted, &cp)
|
||||
if err == nil {
|
||||
ps.profile = cp
|
||||
|
||||
for gid, group := range cp.Groups {
|
||||
ss := NewStreamStore(ps.directory, group.LocalID, ps.password)
|
||||
|
||||
cp.Groups[gid].Timeline.SetMessages(ss.Read())
|
||||
ps.streamStores[group.GroupID] = ss
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (ps *ProfileStoreV0) getProfileCopy(timeline bool) *model.Profile {
|
||||
return ps.profile.GetCopy(timeline)
|
||||
}
|
||||
|
||||
// Shutdown saves the storage system
|
||||
func (ps *ProfileStoreV0) Shutdown() {
|
||||
ps.save()
|
||||
}
|
||||
|
||||
/************* Writing *************/
|
||||
|
||||
func (ps *ProfileStoreV0) save() error {
|
||||
bytes, _ := json.Marshal(ps.profile)
|
||||
return ps.fs.Write(bytes)
|
||||
}
|
|
@ -0,0 +1,70 @@
|
|||
// Known race issue with event bus channel closure
|
||||
|
||||
package v0
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/model"
|
||||
"log"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
const testProfileName = "Alice"
|
||||
const testKey = "key"
|
||||
const testVal = "value"
|
||||
const testInitialMessage = "howdy"
|
||||
const testMessage = "Hello from storage"
|
||||
|
||||
// NewProfile creates a new profile for use in the profile store.
|
||||
func NewProfile(name string) *model.Profile {
|
||||
profile := model.GenerateNewProfile(name)
|
||||
return profile
|
||||
}
|
||||
|
||||
func TestProfileStoreWriteRead(t *testing.T) {
|
||||
log.Println("profile store test!")
|
||||
os.RemoveAll(testingDir)
|
||||
eventBus := event.NewEventManager()
|
||||
profile := NewProfile(testProfileName)
|
||||
ps1 := NewProfileWriterStore(eventBus, testingDir, password, profile)
|
||||
|
||||
profile.SetAttribute(testKey, testVal)
|
||||
|
||||
groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
|
||||
if err != nil {
|
||||
t.Errorf("Creating group: %v\n", err)
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("Creating group invite: %v\n", err)
|
||||
}
|
||||
|
||||
ps1.AddGroup(invite, profile.Onion)
|
||||
|
||||
ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), ps1.getProfileCopy(true).Onion, testMessage)
|
||||
|
||||
ps1.Shutdown()
|
||||
|
||||
ps2 := NewProfileWriterStore(eventBus, testingDir, password, nil)
|
||||
err = ps2.Load()
|
||||
if err != nil {
|
||||
t.Errorf("Error createing ProfileStoreV0: %v\n", err)
|
||||
}
|
||||
|
||||
profile = ps2.getProfileCopy(true)
|
||||
if profile.Name != testProfileName {
|
||||
t.Errorf("Profile name from loaded profile incorrect. Expected: '%v' Actual: '%v'\n", testProfileName, profile.Name)
|
||||
}
|
||||
|
||||
v, _ := profile.GetAttribute(testKey)
|
||||
if v != testVal {
|
||||
t.Errorf("Profile attribute '%v' incorrect. Expected: '%v' Actual: '%v'\n", testKey, testVal, v)
|
||||
}
|
||||
|
||||
group2 := ps2.getProfileCopy(true).Groups[groupid]
|
||||
if group2 == nil {
|
||||
t.Errorf("Group not loaded\n")
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,145 @@
|
|||
package v0
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/model"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
fileStorePartitions = 16
|
||||
bytesPerFile = 15 * 1024
|
||||
)
|
||||
|
||||
// streamStore is a file-backed implementation of StreamStore using an in memory buffer of ~16KB and a rotating set of files
|
||||
type streamStore struct {
|
||||
password string
|
||||
|
||||
storeDirectory string
|
||||
filenameBase string
|
||||
|
||||
lock sync.Mutex
|
||||
|
||||
// Buffer is used just for current file to write to
|
||||
messages []model.Message
|
||||
bufferByteCount int
|
||||
}
|
||||
|
||||
// StreamStore provides a stream like interface to encrypted storage
|
||||
type StreamStore interface {
|
||||
Read() []model.Message
|
||||
Write(m model.Message)
|
||||
}
|
||||
|
||||
// NewStreamStore returns an initialized StreamStore ready for reading and writing
|
||||
func NewStreamStore(directory string, filenameBase string, password string) (store StreamStore) {
|
||||
ss := &streamStore{storeDirectory: directory, filenameBase: filenameBase, password: password}
|
||||
os.Mkdir(ss.storeDirectory, 0700)
|
||||
|
||||
ss.initBuffer()
|
||||
|
||||
return ss
|
||||
}
|
||||
|
||||
// Read returns all messages from the backing file (not the buffer, which is jsut for writing to the current file)
|
||||
func (ss *streamStore) Read() (messages []model.Message) {
|
||||
ss.lock.Lock()
|
||||
defer ss.lock.Unlock()
|
||||
|
||||
resp := []model.Message{}
|
||||
|
||||
for i := fileStorePartitions - 1; i >= 0; i-- {
|
||||
filename := fmt.Sprintf("%s.%d", ss.filenameBase, i)
|
||||
|
||||
bytes, err := readEncryptedFile(ss.storeDirectory, filename, ss.password)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
msgs := []model.Message{}
|
||||
json.Unmarshal([]byte(bytes), &msgs)
|
||||
resp = append(resp, msgs...)
|
||||
}
|
||||
|
||||
// 2019.10.10 "Acknowledged" & "ReceivedByServer" are added to the struct, populate it as true for old ones without
|
||||
for i := 0; i < len(resp) && (resp[i].Acknowledged == false && resp[i].ReceivedByServer == false); i++ {
|
||||
resp[i].Acknowledged = true
|
||||
resp[i].ReceivedByServer = true
|
||||
}
|
||||
|
||||
return resp
|
||||
}
|
||||
|
||||
// ****** Writing *******/
|
||||
|
||||
func (ss *streamStore) WriteN(messages []model.Message) {
|
||||
ss.lock.Lock()
|
||||
defer ss.lock.Unlock()
|
||||
|
||||
for _, m := range messages {
|
||||
ss.updateBuffer(m)
|
||||
|
||||
if ss.bufferByteCount > bytesPerFile {
|
||||
ss.updateFile()
|
||||
log.Debugf("rotating log file")
|
||||
ss.rotateFileStore()
|
||||
ss.initBuffer()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Write adds a GroupMessage to the store
|
||||
func (ss *streamStore) Write(m model.Message) {
|
||||
ss.lock.Lock()
|
||||
defer ss.lock.Unlock()
|
||||
ss.updateBuffer(m)
|
||||
ss.updateFile()
|
||||
|
||||
if ss.bufferByteCount > bytesPerFile {
|
||||
log.Debugf("rotating log file")
|
||||
ss.rotateFileStore()
|
||||
ss.initBuffer()
|
||||
}
|
||||
}
|
||||
|
||||
func (ss *streamStore) initBuffer() {
|
||||
ss.messages = []model.Message{}
|
||||
ss.bufferByteCount = 0
|
||||
}
|
||||
|
||||
func (ss *streamStore) updateBuffer(m model.Message) {
|
||||
ss.messages = append(ss.messages, m)
|
||||
ss.bufferByteCount += (model.MessageBaseSize * 1.5) + len(m.Message)
|
||||
}
|
||||
|
||||
func (ss *streamStore) updateFile() error {
|
||||
msgs, err := json.Marshal(ss.messages)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to marshal group messages %v\n", err)
|
||||
}
|
||||
|
||||
// ENCRYPT
|
||||
key, salt, _ := createKey(ss.password)
|
||||
encryptedMsgs, err := encryptFileData(msgs, key)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to encrypt messages: %v\n", err)
|
||||
return err
|
||||
}
|
||||
encryptedMsgs = append(salt[:], encryptedMsgs...)
|
||||
|
||||
ioutil.WriteFile(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, 0)), encryptedMsgs, 0700)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ss *streamStore) rotateFileStore() {
|
||||
os.Remove(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, fileStorePartitions-1)))
|
||||
|
||||
for i := fileStorePartitions - 2; i >= 0; i-- {
|
||||
os.Rename(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i)), path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i+1)))
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package storage
|
||||
package v0
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/model"
|
|
@ -0,0 +1,72 @@
|
|||
package v1
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||
"golang.org/x/crypto/nacl/secretbox"
|
||||
"golang.org/x/crypto/pbkdf2"
|
||||
"golang.org/x/crypto/sha3"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"path"
|
||||
)
|
||||
|
||||
// createKeySalt derives a key from a password: returns key, salt, err
|
||||
func createKeySalt(password string) ([32]byte, [128]byte, error) {
|
||||
var salt [128]byte
|
||||
if _, err := io.ReadFull(rand.Reader, salt[:]); err != nil {
|
||||
log.Errorf("Cannot read from random: %v\n", err)
|
||||
return [32]byte{}, salt, err
|
||||
}
|
||||
dk := pbkdf2.Key([]byte(password), salt[:], 4096, 32, sha3.New512)
|
||||
|
||||
var dkr [32]byte
|
||||
copy(dkr[:], dk)
|
||||
return dkr, salt, nil
|
||||
}
|
||||
|
||||
func createKey(password string, salt []byte) [32]byte {
|
||||
dk := pbkdf2.Key([]byte(password), salt, 4096, 32, sha3.New512)
|
||||
|
||||
var dkr [32]byte
|
||||
copy(dkr[:], dk)
|
||||
return dkr
|
||||
}
|
||||
|
||||
//encryptFileData encrypts the cwtchPeer via the specified key.
|
||||
func encryptFileData(data []byte, key [32]byte) ([]byte, error) {
|
||||
var nonce [24]byte
|
||||
|
||||
if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil {
|
||||
log.Errorf("Cannot read from random: %v\n", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
encrypted := secretbox.Seal(nonce[:], data, &nonce, &key)
|
||||
return encrypted, nil
|
||||
}
|
||||
|
||||
//decryptFile decrypts the passed ciphertext into a cwtchPeer via the specified key.
|
||||
func decryptFile(ciphertext []byte, key [32]byte) ([]byte, error) {
|
||||
var decryptNonce [24]byte
|
||||
copy(decryptNonce[:], ciphertext[:24])
|
||||
decrypted, ok := secretbox.Open(nil, ciphertext[24:], &decryptNonce, &key)
|
||||
if ok {
|
||||
return decrypted, nil
|
||||
}
|
||||
return nil, errors.New("Failed to decrypt")
|
||||
}
|
||||
|
||||
// load instantiates a cwtchPeer from the file store
|
||||
func readEncryptedFile(directory, filename string, key [32]byte) ([]byte, error) {
|
||||
encryptedbytes, err := ioutil.ReadFile(path.Join(directory, filename))
|
||||
if err == nil {
|
||||
data, err := decryptFile(encryptedbytes, key)
|
||||
if err == nil {
|
||||
return data, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return nil, err
|
||||
}
|
|
@ -0,0 +1,58 @@
|
|||
package v1
|
||||
|
||||
import (
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
)
|
||||
|
||||
// fileStore stores a cwtchPeer in an encrypted file
|
||||
type fileStore struct {
|
||||
directory string
|
||||
filename string
|
||||
key [32]byte
|
||||
}
|
||||
|
||||
// FileStore is a primitive around storing encrypted files
|
||||
type FileStore interface {
|
||||
Write([]byte) error
|
||||
Read() ([]byte, error)
|
||||
Delete()
|
||||
ChangeKey(newkey [32]byte)
|
||||
}
|
||||
|
||||
// NewFileStore instantiates a fileStore given a filename and a password
|
||||
func NewFileStore(directory string, filename string, key [32]byte) FileStore {
|
||||
filestore := new(fileStore)
|
||||
filestore.key = key
|
||||
filestore.filename = filename
|
||||
filestore.directory = directory
|
||||
return filestore
|
||||
}
|
||||
|
||||
// write serializes a cwtchPeer to a file
|
||||
func (fps *fileStore) Write(data []byte) error {
|
||||
encryptedbytes, err := encryptFileData(data, fps.key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = ioutil.WriteFile(path.Join(fps.directory, fps.filename), encryptedbytes, 0600)
|
||||
return err
|
||||
}
|
||||
|
||||
func (fps *fileStore) Read() ([]byte, error) {
|
||||
return readEncryptedFile(fps.directory, fps.filename, fps.key)
|
||||
}
|
||||
|
||||
func (fps *fileStore) Delete() {
|
||||
err := os.Remove(path.Join(fps.directory, fps.filename))
|
||||
if err != nil {
|
||||
log.Errorf("Deleting file %v\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (fps *fileStore) ChangeKey(newkey [32]byte) {
|
||||
fps.key = newkey
|
||||
}
|
|
@ -0,0 +1,406 @@
|
|||
package v1
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/model"
|
||||
"encoding/json"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"time"
|
||||
)
|
||||
|
||||
const groupIDLen = 32
|
||||
const peerIDLen = 56
|
||||
const profileFilename = "profile"
|
||||
const version = "1"
|
||||
const versionFile = "VERSION"
|
||||
const saltFile = "SALT"
|
||||
|
||||
//ProfileStoreV1 storage for profiles and message streams that uses in memory key and fs stored salt instead of in memory password
|
||||
type ProfileStoreV1 struct {
|
||||
fs FileStore
|
||||
streamStores map[string]StreamStore // map [groupId|onion] StreamStore
|
||||
directory string
|
||||
profile *model.Profile
|
||||
key [32]byte
|
||||
salt [128]byte
|
||||
eventManager event.Manager
|
||||
queue event.Queue
|
||||
writer bool
|
||||
}
|
||||
|
||||
func initV1Directory(directory, password string) ([32]byte, [128]byte, error) {
|
||||
key, salt, err := createKeySalt(password)
|
||||
if err != nil {
|
||||
log.Errorf("Could not create key for profile store from password: %v\n", err)
|
||||
return [32]byte{}, [128]byte{}, err
|
||||
}
|
||||
|
||||
ioutil.WriteFile(path.Join(directory, versionFile), []byte(version), 0600)
|
||||
ioutil.WriteFile(path.Join(directory, saltFile), salt[:], 0600)
|
||||
|
||||
return key, salt, nil
|
||||
}
|
||||
|
||||
// CreateProfileWriterStore creates a profile store backed by a filestore listening for events and saving them
|
||||
// directory should be $appDir/profiles/$rand
|
||||
func CreateProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) *ProfileStoreV1 {
|
||||
os.Mkdir(directory, 0700)
|
||||
|
||||
key, salt, err := initV1Directory(directory, password)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true}
|
||||
ps.save()
|
||||
|
||||
ps.initProfileWriterStore()
|
||||
|
||||
return ps
|
||||
}
|
||||
|
||||
func (ps *ProfileStoreV1) initProfileWriterStore() {
|
||||
ps.queue = event.NewQueue()
|
||||
go ps.eventHandler()
|
||||
|
||||
ps.eventManager.Subscribe(event.BlockPeer, ps.queue)
|
||||
ps.eventManager.Subscribe(event.UnblockPeer, ps.queue)
|
||||
ps.eventManager.Subscribe(event.PeerCreated, ps.queue)
|
||||
ps.eventManager.Subscribe(event.GroupCreated, ps.queue)
|
||||
ps.eventManager.Subscribe(event.SetProfileName, ps.queue)
|
||||
ps.eventManager.Subscribe(event.SetAttribute, ps.queue)
|
||||
ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue)
|
||||
ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue)
|
||||
ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue)
|
||||
ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue)
|
||||
ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue)
|
||||
ps.eventManager.Subscribe(event.PeerStateChange, ps.queue)
|
||||
ps.eventManager.Subscribe(event.ServerStateChange, ps.queue)
|
||||
ps.eventManager.Subscribe(event.DeleteContact, ps.queue)
|
||||
ps.eventManager.Subscribe(event.DeleteGroup, ps.queue)
|
||||
ps.eventManager.Subscribe(event.ChangePassword, ps.queue)
|
||||
}
|
||||
|
||||
// LoadProfileWriterStore loads a profile store from filestore listening for events and saving them
|
||||
// directory should be $appDir/profiles/$rand
|
||||
func LoadProfileWriterStore(eventManager event.Manager, directory, password string) (*ProfileStoreV1, error) {
|
||||
salt, err := ioutil.ReadFile(path.Join(directory, saltFile))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
key := createKey(password, salt)
|
||||
|
||||
ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, directory: directory, profile: nil, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true}
|
||||
copy(ps.salt[:], salt)
|
||||
|
||||
ps.initProfileWriterStore()
|
||||
|
||||
ps.load()
|
||||
|
||||
return ps, nil
|
||||
}
|
||||
|
||||
// ReadProfile reads a profile from storqage and returns the profile
|
||||
// directory should be $appDir/profiles/$rand
|
||||
func ReadProfile(directory string, key [32]byte, salt [128]byte) (*model.Profile, error) {
|
||||
os.Mkdir(directory, 0700)
|
||||
ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: nil, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true}
|
||||
|
||||
err := ps.load()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
profile := ps.GetProfileCopy(true)
|
||||
|
||||
return profile, nil
|
||||
}
|
||||
|
||||
// UpgradeV0Profile takes a profile (presumably from a V0 store) and creates and writes a V1 store
|
||||
func UpgradeV0Profile(profile *model.Profile, directory, password string) error {
|
||||
key, salt, err := initV1Directory(directory, password)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: profile, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true}
|
||||
ps.save()
|
||||
|
||||
for gid, group := range ps.profile.Groups {
|
||||
ss := NewStreamStore(ps.directory, group.LocalID, ps.key)
|
||||
ss.WriteN(ps.profile.Groups[gid].Timeline.Messages)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewProfile creates a new profile for use in the profile store.
|
||||
func NewProfile(name string) *model.Profile {
|
||||
profile := model.GenerateNewProfile(name)
|
||||
return profile
|
||||
}
|
||||
|
||||
// GetNewPeerMessage is for AppService to call on Reload events, to reseed the AppClient with the loaded peers
|
||||
func (ps *ProfileStoreV1) GetNewPeerMessage() *event.Event {
|
||||
message := event.NewEventList(event.NewPeer, event.Identity, ps.profile.LocalID, event.Key, string(ps.key[:]), event.Salt, string(ps.salt[:]))
|
||||
return &message
|
||||
}
|
||||
|
||||
// GetStatusMessages creates an array of status messages for all peers and group servers from current information
|
||||
func (ps *ProfileStoreV1) GetStatusMessages() []*event.Event {
|
||||
messages := []*event.Event{}
|
||||
for _, contact := range ps.profile.Contacts {
|
||||
message := event.NewEvent(event.PeerStateChange, map[event.Field]string{
|
||||
event.RemotePeer: string(contact.Onion),
|
||||
event.ConnectionState: contact.State,
|
||||
})
|
||||
messages = append(messages, &message)
|
||||
}
|
||||
|
||||
doneServers := make(map[string]bool)
|
||||
for _, group := range ps.profile.Groups {
|
||||
if _, exists := doneServers[group.GroupServer]; !exists {
|
||||
message := event.NewEvent(event.ServerStateChange, map[event.Field]string{
|
||||
event.GroupServer: string(group.GroupServer),
|
||||
event.ConnectionState: group.State,
|
||||
})
|
||||
messages = append(messages, &message)
|
||||
doneServers[group.GroupServer] = true
|
||||
}
|
||||
}
|
||||
|
||||
return messages
|
||||
}
|
||||
|
||||
// ChangePassword restores all data under a new password's encryption
|
||||
func (ps *ProfileStoreV1) ChangePassword(oldpass, newpass, eventID string) {
|
||||
oldkey := createKey(oldpass, ps.salt[:])
|
||||
|
||||
if oldkey != ps.key {
|
||||
ps.eventManager.Publish(event.NewEventList(event.ChangePasswordError, event.Error, "Supplied current password does not match", event.EventID, eventID))
|
||||
return
|
||||
}
|
||||
|
||||
newkey := createKey(newpass, ps.salt[:])
|
||||
|
||||
newStreamStores := map[string]StreamStore{}
|
||||
idToNewLocalID := map[string]string{}
|
||||
|
||||
// Generate all new StreamStores with the new password and write all the old StreamStore data into these ones
|
||||
for ssid, ss := range ps.streamStores {
|
||||
// New ss with new pass and new localID
|
||||
newlocalID := model.GenerateRandomID()
|
||||
idToNewLocalID[ssid] = newlocalID
|
||||
|
||||
newSS := NewStreamStore(ps.directory, newlocalID, newkey)
|
||||
newStreamStores[ssid] = newSS
|
||||
|
||||
// write whole store
|
||||
messages := ss.Read()
|
||||
newSS.WriteN(messages)
|
||||
}
|
||||
|
||||
// Switch over
|
||||
oldStreamStores := ps.streamStores
|
||||
ps.streamStores = newStreamStores
|
||||
for ssid, newLocalID := range idToNewLocalID {
|
||||
if len(ssid) == groupIDLen {
|
||||
ps.profile.Groups[ssid].LocalID = newLocalID
|
||||
} else {
|
||||
ps.profile.Contacts[ssid].LocalID = newLocalID
|
||||
}
|
||||
}
|
||||
|
||||
ps.key = newkey
|
||||
ps.fs.ChangeKey(newkey)
|
||||
ps.save()
|
||||
|
||||
// Clean up
|
||||
for _, oldss := range oldStreamStores {
|
||||
oldss.Delete()
|
||||
}
|
||||
|
||||
ps.eventManager.Publish(event.NewEventList(event.ChangePasswordSuccess, event.EventID, eventID))
|
||||
return
|
||||
}
|
||||
|
||||
func (ps *ProfileStoreV1) save() error {
|
||||
if ps.writer {
|
||||
bytes, _ := json.Marshal(ps.profile)
|
||||
return ps.fs.Write(bytes)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// load instantiates a cwtchPeer from the file store
|
||||
func (ps *ProfileStoreV1) load() error {
|
||||
decrypted, err := ps.fs.Read()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cp := new(model.Profile)
|
||||
err = json.Unmarshal(decrypted, &cp)
|
||||
if err == nil {
|
||||
ps.profile = cp
|
||||
|
||||
for gid, group := range cp.Groups {
|
||||
ss := NewStreamStore(ps.directory, group.LocalID, ps.key)
|
||||
|
||||
cp.Groups[gid].Timeline.SetMessages(ss.Read())
|
||||
ps.streamStores[group.GroupID] = ss
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// GetProfileCopy returns a copy of the stored profile
|
||||
func (ps *ProfileStoreV1) GetProfileCopy(timeline bool) *model.Profile {
|
||||
return ps.profile.GetCopy(timeline)
|
||||
}
|
||||
|
||||
func (ps *ProfileStoreV1) eventHandler() {
|
||||
log.Debugln("eventHandler()!")
|
||||
for {
|
||||
ev := ps.queue.Next()
|
||||
log.Debugf("eventHandler event %v %v\n", ev.EventType, ev.EventID)
|
||||
|
||||
switch ev.EventType {
|
||||
case event.BlockPeer:
|
||||
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
|
||||
if exists {
|
||||
contact.Blocked = true
|
||||
ps.save()
|
||||
}
|
||||
case event.UnblockPeer:
|
||||
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
|
||||
if exists {
|
||||
contact.Blocked = false
|
||||
ps.save()
|
||||
}
|
||||
case event.PeerCreated:
|
||||
var pp *model.PublicProfile
|
||||
json.Unmarshal([]byte(ev.Data[event.Data]), &pp)
|
||||
ps.profile.AddContact(ev.Data[event.RemotePeer], pp)
|
||||
// TODO: configure - allow peers to be configured to turn on limited storage
|
||||
/*ss := NewStreamStore(ps.directory, pp.LocalID, ps.password)
|
||||
pp.Timeline.SetMessages(ss.Read())
|
||||
ps.streamStores[pp.Onion] = ss
|
||||
ps.save()*/
|
||||
case event.GroupCreated:
|
||||
var group *model.Group
|
||||
json.Unmarshal([]byte(ev.Data[event.Data]), &group)
|
||||
ps.profile.AddGroup(group)
|
||||
ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.key)
|
||||
ps.save()
|
||||
case event.SetProfileName:
|
||||
ps.profile.Name = ev.Data[event.ProfileName]
|
||||
ps.profile.SetAttribute("name", ev.Data[event.ProfileName])
|
||||
ps.save()
|
||||
case event.SetAttribute:
|
||||
ps.profile.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
|
||||
ps.save()
|
||||
case event.SetPeerAttribute:
|
||||
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
|
||||
if exists {
|
||||
contact.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
|
||||
ps.save()
|
||||
} else {
|
||||
log.Errorf("error setting attribute on peer %v peer does not exist", ev)
|
||||
}
|
||||
case event.SetGroupAttribute:
|
||||
group := ps.profile.GetGroup(ev.Data[event.GroupID])
|
||||
if group != nil {
|
||||
group.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
|
||||
ps.save()
|
||||
} else {
|
||||
log.Errorf("error setting attribute on group %v group does not exist", ev)
|
||||
}
|
||||
case event.AcceptGroupInvite:
|
||||
err := ps.profile.AcceptInvite(ev.Data[event.GroupID])
|
||||
if err == nil {
|
||||
ps.save()
|
||||
} else {
|
||||
log.Errorf("error accepting group invite")
|
||||
}
|
||||
case event.NewGroupInvite:
|
||||
gid, err := ps.profile.ProcessInvite(ev.Data[event.GroupInvite], ev.Data[event.RemotePeer])
|
||||
log.Errorf("gid: %v err:%v\n", gid, err)
|
||||
if err == nil {
|
||||
ps.save()
|
||||
group := ps.profile.Groups[gid]
|
||||
ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.key)
|
||||
} else {
|
||||
log.Errorf("error storing new group invite: %v (%v)", err, ev)
|
||||
}
|
||||
case event.NewMessageFromGroup:
|
||||
groupid := ev.Data[event.GroupID]
|
||||
received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
|
||||
sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent])
|
||||
message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: []byte(ev.Data[event.Signature]), PreviousMessageSig: []byte(ev.Data[event.PreviousSignature])}
|
||||
ss, exists := ps.streamStores[groupid]
|
||||
if exists {
|
||||
ss.Write(message)
|
||||
} else {
|
||||
log.Errorf("error storing new group message: %v stream store does not exist", ev)
|
||||
}
|
||||
case event.PeerStateChange:
|
||||
if _, exists := ps.profile.Contacts[ev.Data[event.RemotePeer]]; exists {
|
||||
ps.profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState]
|
||||
}
|
||||
case event.ServerStateChange:
|
||||
for _, group := range ps.profile.Groups {
|
||||
if group.GroupServer == ev.Data[event.GroupServer] {
|
||||
group.State = ev.Data[event.ConnectionState]
|
||||
}
|
||||
}
|
||||
case event.DeleteContact:
|
||||
onion := ev.Data[event.RemotePeer]
|
||||
ps.profile.DeleteContact(onion)
|
||||
ps.save()
|
||||
case event.DeleteGroup:
|
||||
groupID := ev.Data[event.GroupID]
|
||||
ps.profile.DeleteGroup(groupID)
|
||||
ps.save()
|
||||
ss, exists := ps.streamStores[groupID]
|
||||
if exists {
|
||||
ss.Delete()
|
||||
delete(ps.streamStores, groupID)
|
||||
}
|
||||
case event.ChangePassword:
|
||||
oldpass := ev.Data[event.Password]
|
||||
newpass := ev.Data[event.NewPassword]
|
||||
ps.ChangePassword(oldpass, newpass, ev.EventID)
|
||||
default:
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown shuts down the queue / thread
|
||||
func (ps *ProfileStoreV1) Shutdown() {
|
||||
if ps.queue != nil {
|
||||
ps.queue.Shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
// Delete removes all stored files for this stored profile
|
||||
func (ps *ProfileStoreV1) Delete() {
|
||||
log.Debugf("Delete ProfileStore for %v\n", ps.profile.Onion)
|
||||
|
||||
for _, ss := range ps.streamStores {
|
||||
ss.Delete()
|
||||
}
|
||||
|
||||
ps.fs.Delete()
|
||||
|
||||
err := os.RemoveAll(ps.directory)
|
||||
if err != nil {
|
||||
log.Errorf("ProfileStore Delete error on RemoveAll on %v was %v\n", ps.directory, err)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,150 @@
|
|||
// Known race issue with event bus channel closure
|
||||
|
||||
package v1
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
const testProfileName = "Alice"
|
||||
const testKey = "key"
|
||||
const testVal = "value"
|
||||
const testInitialMessage = "howdy"
|
||||
const testMessage = "Hello from storage"
|
||||
|
||||
func TestProfileStoreWriteRead(t *testing.T) {
|
||||
log.Println("profile store test!")
|
||||
os.RemoveAll(testingDir)
|
||||
eventBus := event.NewEventManager()
|
||||
profile := NewProfile(testProfileName)
|
||||
ps1 := CreateProfileWriterStore(eventBus, testingDir, password, profile)
|
||||
|
||||
eventBus.Publish(event.NewEvent(event.SetAttribute, map[event.Field]string{event.Key: testKey, event.Data: testVal}))
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
|
||||
if err != nil {
|
||||
t.Errorf("Creating group: %v\n", err)
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("Creating group invite: %v\n", err)
|
||||
}
|
||||
|
||||
eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)}))
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{
|
||||
event.GroupID: groupid,
|
||||
event.TimestampSent: time.Now().Format(time.RFC3339Nano),
|
||||
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
|
||||
event.RemotePeer: ps1.GetProfileCopy(true).Onion,
|
||||
event.Data: testMessage,
|
||||
}))
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
ps1.Shutdown()
|
||||
|
||||
ps2, err := LoadProfileWriterStore(eventBus, testingDir, password)
|
||||
if err != nil {
|
||||
t.Errorf("Error createing ProfileStoreV1: %v\n", err)
|
||||
}
|
||||
|
||||
profile = ps2.GetProfileCopy(true)
|
||||
if profile.Name != testProfileName {
|
||||
t.Errorf("Profile name from loaded profile incorrect. Expected: '%v' Actual: '%v'\n", testProfileName, profile.Name)
|
||||
}
|
||||
|
||||
v, _ := profile.GetAttribute(testKey)
|
||||
if v != testVal {
|
||||
t.Errorf("Profile attribute '%v' inccorect. Expected: '%v' Actual: '%v'\n", testKey, testVal, v)
|
||||
}
|
||||
|
||||
group2 := ps2.GetProfileCopy(true).Groups[groupid]
|
||||
if group2 == nil {
|
||||
t.Errorf("Group not loaded\n")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestProfileStoreChangePassword(t *testing.T) {
|
||||
os.RemoveAll(testingDir)
|
||||
eventBus := event.NewEventManager()
|
||||
|
||||
queue := event.NewQueue()
|
||||
eventBus.Subscribe(event.ChangePasswordSuccess, queue)
|
||||
|
||||
profile := NewProfile(testProfileName)
|
||||
ps1 := CreateProfileWriterStore(eventBus, testingDir, password, profile)
|
||||
|
||||
groupid, invite, err := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
|
||||
if err != nil {
|
||||
t.Errorf("Creating group: %v\n", err)
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("Creating group invite: %v\n", err)
|
||||
}
|
||||
|
||||
eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)}))
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
fmt.Println("Sending 200 messages...")
|
||||
|
||||
for i := 0; i < 200; i++ {
|
||||
eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{
|
||||
event.GroupID: groupid,
|
||||
event.TimestampSent: time.Now().Format(time.RFC3339Nano),
|
||||
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
|
||||
event.RemotePeer: profile.Onion,
|
||||
event.Data: testMessage,
|
||||
}))
|
||||
}
|
||||
|
||||
newPass := "qwerty123"
|
||||
|
||||
fmt.Println("Sending Change Passwords event...")
|
||||
eventBus.Publish(event.NewEventList(event.ChangePassword, event.Password, password, event.NewPassword, newPass))
|
||||
|
||||
ev := queue.Next()
|
||||
if ev.EventType != event.ChangePasswordSuccess {
|
||||
t.Errorf("Unexpected event response detected %v\n", ev.EventType)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Println("Sending 10 more messages...")
|
||||
for i := 0; i < 10; i++ {
|
||||
eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{
|
||||
event.GroupID: groupid,
|
||||
event.TimestampSent: time.Now().Format(time.RFC3339Nano),
|
||||
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
|
||||
event.RemotePeer: profile.Onion,
|
||||
event.Data: testMessage,
|
||||
}))
|
||||
}
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
fmt.Println("Shutdown profile store...")
|
||||
ps1.Shutdown()
|
||||
|
||||
fmt.Println("New Profile store...")
|
||||
ps2, err := LoadProfileWriterStore(eventBus, testingDir, newPass)
|
||||
if err != nil {
|
||||
t.Errorf("Error createing new ProfileStoreV1 with new password: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
profile2 := ps2.GetProfileCopy(true)
|
||||
|
||||
if profile2.Groups[groupid] == nil {
|
||||
t.Errorf("Failed to load group %v\n", groupid)
|
||||
return
|
||||
}
|
||||
|
||||
if len(profile2.Groups[groupid].Timeline.Messages) != 210 {
|
||||
t.Errorf("Failed to load group's 210 messages, instead got %v\n", len(profile2.Groups[groupid].Timeline.Messages))
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package storage
|
||||
package v1
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/model"
|
||||
|
@ -18,7 +18,7 @@ const (
|
|||
|
||||
// streamStore is a file-backed implementation of StreamStore using an in memory buffer of ~16KB and a rotating set of files
|
||||
type streamStore struct {
|
||||
password string
|
||||
key [32]byte
|
||||
|
||||
storeDirectory string
|
||||
filenameBase string
|
||||
|
@ -36,12 +36,11 @@ type StreamStore interface {
|
|||
WriteN(messages []model.Message)
|
||||
Read() []model.Message
|
||||
Delete()
|
||||
ChangePassword(newpass string)
|
||||
}
|
||||
|
||||
// NewStreamStore returns an initialized StreamStore ready for reading and writing
|
||||
func NewStreamStore(directory string, filenameBase string, password string) (store StreamStore) {
|
||||
ss := &streamStore{storeDirectory: directory, filenameBase: filenameBase, password: password}
|
||||
func NewStreamStore(directory string, filenameBase string, key [32]byte) (store StreamStore) {
|
||||
ss := &streamStore{storeDirectory: directory, filenameBase: filenameBase, key: key}
|
||||
os.Mkdir(ss.storeDirectory, 0700)
|
||||
|
||||
ss.initBuffer()
|
||||
|
@ -58,7 +57,7 @@ func (ss *streamStore) initBuffer() {
|
|||
func (ss *streamStore) initBufferFromStorage() error {
|
||||
filename := fmt.Sprintf("%s.%d", ss.filenameBase, 0)
|
||||
|
||||
bytes, _ := readEncryptedFile(ss.storeDirectory, filename, ss.password)
|
||||
bytes, _ := readEncryptedFile(ss.storeDirectory, filename, ss.key)
|
||||
|
||||
msgs := []model.Message{}
|
||||
err := json.Unmarshal([]byte(bytes), &msgs)
|
||||
|
@ -84,13 +83,11 @@ func (ss *streamStore) updateFile() error {
|
|||
}
|
||||
|
||||
// ENCRYPT
|
||||
key, salt, _ := createKey(ss.password)
|
||||
encryptedMsgs, err := encryptFileData(msgs, key)
|
||||
encryptedMsgs, err := encryptFileData(msgs, ss.key)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to encrypt messages: %v\n", err)
|
||||
return err
|
||||
}
|
||||
encryptedMsgs = append(salt[:], encryptedMsgs...)
|
||||
|
||||
ioutil.WriteFile(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, 0)), encryptedMsgs, 0700)
|
||||
return nil
|
||||
|
@ -121,7 +118,7 @@ func (ss *streamStore) Read() (messages []model.Message) {
|
|||
for i := fileStorePartitions - 1; i >= 0; i-- {
|
||||
filename := fmt.Sprintf("%s.%d", ss.filenameBase, i)
|
||||
|
||||
bytes, err := readEncryptedFile(ss.storeDirectory, filename, ss.password)
|
||||
bytes, err := readEncryptedFile(ss.storeDirectory, filename, ss.key)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
@ -131,12 +128,6 @@ func (ss *streamStore) Read() (messages []model.Message) {
|
|||
resp = append(resp, msgs...)
|
||||
}
|
||||
|
||||
// 2019.10.10 "Acknowledged" & "ReceivedByServer" are added to the struct, populate it as true for old ones without
|
||||
for i := 0; i < len(resp) && (resp[i].Acknowledged == false && resp[i].ReceivedByServer == false); i++ {
|
||||
resp[i].Acknowledged = true
|
||||
resp[i].ReceivedByServer = true
|
||||
}
|
||||
|
||||
return resp
|
||||
}
|
||||
|
||||
|
@ -158,16 +149,17 @@ func (ss *streamStore) WriteN(messages []model.Message) {
|
|||
ss.lock.Lock()
|
||||
defer ss.lock.Unlock()
|
||||
|
||||
log.Infof("WriteN %v messages\n", len(messages))
|
||||
i := 0
|
||||
for _, m := range messages {
|
||||
ss.updateBuffer(m)
|
||||
i++
|
||||
|
||||
if ss.bufferByteCount > bytesPerFile {
|
||||
ss.updateFile()
|
||||
log.Debugf("rotating log file")
|
||||
ss.rotateFileStore()
|
||||
ss.initBuffer()
|
||||
}
|
||||
}
|
||||
ss.updateFile()
|
||||
}
|
||||
|
||||
func (ss *streamStore) ChangePassword(newpass string) {}
|
|
@ -0,0 +1,52 @@
|
|||
package v1
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/model"
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
const testingDir = "./testing"
|
||||
const filenameBase = "testStream"
|
||||
const password = "asdfqwer"
|
||||
const line1 = "Hello from storage!"
|
||||
|
||||
func TestStreamStoreWriteRead(t *testing.T) {
|
||||
os.Remove(".test.json")
|
||||
os.RemoveAll(testingDir)
|
||||
os.Mkdir(testingDir, 0777)
|
||||
key, _, _ := createKeySalt(password)
|
||||
ss1 := NewStreamStore(testingDir, filenameBase, key)
|
||||
m := model.Message{Message: line1}
|
||||
ss1.Write(m)
|
||||
|
||||
ss2 := NewStreamStore(testingDir, filenameBase, key)
|
||||
messages := ss2.Read()
|
||||
if len(messages) != 1 {
|
||||
t.Errorf("Read messages has wrong length. Expected: 1 Actual: %d\n", len(messages))
|
||||
}
|
||||
if messages[0].Message != line1 {
|
||||
t.Errorf("Read message has wrong content. Expected: '%v' Actual: '%v'\n", line1, messages[0].Message)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamStoreWriteReadRotate(t *testing.T) {
|
||||
os.Remove(".test.json")
|
||||
os.RemoveAll(testingDir)
|
||||
os.Mkdir(testingDir, 0777)
|
||||
key, _, _ := createKeySalt(password)
|
||||
ss1 := NewStreamStore(testingDir, filenameBase, key)
|
||||
m := model.Message{Message: line1}
|
||||
for i := 0; i < 400; i++ {
|
||||
ss1.Write(m)
|
||||
}
|
||||
|
||||
ss2 := NewStreamStore(testingDir, filenameBase, key)
|
||||
messages := ss2.Read()
|
||||
if len(messages) != 400 {
|
||||
t.Errorf("Read messages has wrong length. Expected: 400 Actual: %d\n", len(messages))
|
||||
}
|
||||
if messages[0].Message != line1 {
|
||||
t.Errorf("Read message has wrong content. Expected: '%v' Actual: '%v'\n", line1, messages[0].Message)
|
||||
}
|
||||
}
|
|
@ -5,6 +5,8 @@ pwd
|
|||
GORACE="haltonerror=1"
|
||||
go test -race ${1} -coverprofile=model.cover.out -v ./model
|
||||
go test -race ${1} -coverprofile=event.cover.out -v ./event
|
||||
go test -race ${1} -coverprofile=storage.v0.cover.out -v ./storage/v0
|
||||
go test -race ${1} -coverprofile=storage.v1.cover.out -v ./storage/v1
|
||||
go test -race ${1} -coverprofile=storage.cover.out -v ./storage
|
||||
go test -race ${1} -coverprofile=peer.connections.cover.out -v ./protocol/connections
|
||||
go test -race ${1} -coverprofile=protocol.spam.cover.out -v ./protocol/connections/spam
|
||||
|
|
Loading…
Reference in New Issue