573 lines
19 KiB
Go
573 lines
19 KiB
Go
package v1
|
|
|
|
import (
|
|
"cwtch.im/cwtch/event"
|
|
"cwtch.im/cwtch/model"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"git.openprivacy.ca/openprivacy/log"
|
|
"io/ioutil"
|
|
"os"
|
|
"path"
|
|
"strconv"
|
|
"time"
|
|
)
|
|
|
|
// Identifier lengths and on-disk file names used by the V1 profile store.
const (
	// groupIDLen is the length of a group ID; handles of exactly this length
	// are treated as groups, everything else as a peer.
	groupIDLen = 32
	// peerIDLen is presumably the length of a peer onion address; it is not
	// referenced in this part of the file.
	peerIDLen = 56
	// profileFilename is the name handed to the file store for the profile itself.
	profileFilename = "profile"
	// version is the store version string written into versionFile.
	version = "1"
	// versionFile is the name of the file recording the store version.
	versionFile = "VERSION"
	// saltFile is the name of the file holding the key-derivation salt.
	saltFile = "SALT"
)
|
|
|
|
//ProfileStoreV1 storage for profiles and message streams that uses in memory key and fs stored salt instead of in memory password
|
|
type ProfileStoreV1 struct {
|
|
fs FileStore
|
|
streamStores map[string]StreamStore // map [groupId|onion] StreamStore
|
|
directory string
|
|
profile *model.Profile
|
|
key [32]byte
|
|
salt [128]byte
|
|
eventManager event.Manager
|
|
queue event.Queue
|
|
writer bool
|
|
}
|
|
|
|
// CheckPassword returns true if the given password produces the same key as the current stored key, otherwise false.
|
|
func (ps *ProfileStoreV1) CheckPassword(checkpass string) bool {
|
|
oldkey := CreateKey(checkpass, ps.salt[:])
|
|
return oldkey == ps.key
|
|
}
|
|
|
|
func initV1Directory(directory, password string) ([32]byte, [128]byte, error) {
|
|
key, salt, err := CreateKeySalt(password)
|
|
if err != nil {
|
|
log.Errorf("Could not create key for profile store from password: %v\n", err)
|
|
return [32]byte{}, [128]byte{}, err
|
|
}
|
|
|
|
ioutil.WriteFile(path.Join(directory, versionFile), []byte(version), 0600)
|
|
ioutil.WriteFile(path.Join(directory, saltFile), salt[:], 0600)
|
|
|
|
return key, salt, nil
|
|
}
|
|
|
|
// CreateProfileWriterStore creates a profile store backed by a filestore listening for events and saving them
|
|
// directory should be $appDir/profiles/$rand
|
|
func CreateProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) *ProfileStoreV1 {
|
|
os.Mkdir(directory, 0700)
|
|
|
|
key, salt, err := initV1Directory(directory, password)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true}
|
|
ps.save()
|
|
|
|
ps.initProfileWriterStore()
|
|
|
|
return ps
|
|
}
|
|
|
|
func (ps *ProfileStoreV1) initProfileWriterStore() {
|
|
ps.queue = event.NewQueue()
|
|
go ps.eventHandler()
|
|
|
|
ps.eventManager.Subscribe(event.SetPeerAuthorization, ps.queue)
|
|
ps.eventManager.Subscribe(event.PeerCreated, ps.queue)
|
|
ps.eventManager.Subscribe(event.GroupCreated, ps.queue)
|
|
ps.eventManager.Subscribe(event.SetProfileName, ps.queue)
|
|
ps.eventManager.Subscribe(event.SetAttribute, ps.queue)
|
|
ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue)
|
|
ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue)
|
|
ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue)
|
|
ps.eventManager.Subscribe(event.RejectGroupInvite, ps.queue)
|
|
ps.eventManager.Subscribe(event.NewGroup, ps.queue)
|
|
ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue)
|
|
ps.eventManager.Subscribe(event.SendMessageToPeer, ps.queue)
|
|
ps.eventManager.Subscribe(event.PeerAcknowledgement, ps.queue)
|
|
ps.eventManager.Subscribe(event.NewMessageFromPeer, ps.queue)
|
|
ps.eventManager.Subscribe(event.PeerStateChange, ps.queue)
|
|
ps.eventManager.Subscribe(event.ServerStateChange, ps.queue)
|
|
ps.eventManager.Subscribe(event.DeleteContact, ps.queue)
|
|
ps.eventManager.Subscribe(event.DeleteGroup, ps.queue)
|
|
ps.eventManager.Subscribe(event.ChangePassword, ps.queue)
|
|
ps.eventManager.Subscribe(event.UpdateMessageFlags, ps.queue)
|
|
}
|
|
|
|
// LoadProfileWriterStore loads a profile store from filestore listening for events and saving them
|
|
// directory should be $appDir/profiles/$rand
|
|
func LoadProfileWriterStore(eventManager event.Manager, directory, password string) (*ProfileStoreV1, error) {
|
|
salt, err := ioutil.ReadFile(path.Join(directory, saltFile))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
key := CreateKey(password, salt)
|
|
|
|
ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, directory: directory, profile: nil, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true}
|
|
copy(ps.salt[:], salt)
|
|
|
|
err = ps.load()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ps.initProfileWriterStore()
|
|
return ps, nil
|
|
}
|
|
|
|
// ReadProfile reads a profile from storqage and returns the profile
|
|
// directory should be $appDir/profiles/$rand
|
|
func ReadProfile(directory string, key [32]byte, salt [128]byte) (*model.Profile, error) {
|
|
os.Mkdir(directory, 0700)
|
|
ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: nil, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true}
|
|
|
|
err := ps.load()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
profile := ps.GetProfileCopy(true)
|
|
|
|
return profile, nil
|
|
}
|
|
|
|
// UpgradeV0Profile takes a profile (presumably from a V0 store) and creates and writes a V1 store
|
|
func UpgradeV0Profile(profile *model.Profile, directory, password string) error {
|
|
key, salt, err := initV1Directory(directory, password)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ps := &ProfileStoreV1{fs: NewFileStore(directory, profileFilename, key), key: key, salt: salt, directory: directory, profile: profile, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true}
|
|
ps.save()
|
|
|
|
for gid, group := range ps.profile.Groups {
|
|
ss := NewStreamStore(ps.directory, group.LocalID, ps.key)
|
|
ss.WriteN(ps.profile.Groups[gid].Timeline.Messages)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// NewProfile creates a new profile for use in the profile store.
|
|
func NewProfile(name string) *model.Profile {
|
|
profile := model.GenerateNewProfile(name)
|
|
return profile
|
|
}
|
|
|
|
// GetNewPeerMessage is for AppService to call on Reload events, to reseed the AppClient with the loaded peers
|
|
func (ps *ProfileStoreV1) GetNewPeerMessage() *event.Event {
|
|
message := event.NewEventList(event.NewPeer, event.Identity, ps.profile.LocalID, event.Key, string(ps.key[:]), event.Salt, string(ps.salt[:]))
|
|
return &message
|
|
}
|
|
|
|
// GetStatusMessages creates an array of status messages for all peers and group servers from current information
|
|
func (ps *ProfileStoreV1) GetStatusMessages() []*event.Event {
|
|
messages := []*event.Event{}
|
|
for _, contact := range ps.profile.Contacts {
|
|
message := event.NewEvent(event.PeerStateChange, map[event.Field]string{
|
|
event.RemotePeer: string(contact.Onion),
|
|
event.ConnectionState: contact.State,
|
|
})
|
|
messages = append(messages, &message)
|
|
}
|
|
|
|
doneServers := make(map[string]bool)
|
|
for _, group := range ps.profile.Groups {
|
|
if _, exists := doneServers[group.GroupServer]; !exists {
|
|
message := event.NewEvent(event.ServerStateChange, map[event.Field]string{
|
|
event.GroupServer: string(group.GroupServer),
|
|
event.ConnectionState: group.State,
|
|
})
|
|
messages = append(messages, &message)
|
|
doneServers[group.GroupServer] = true
|
|
}
|
|
}
|
|
|
|
return messages
|
|
}
|
|
|
|
// ChangePassword restores all data under a new password's encryption
|
|
func (ps *ProfileStoreV1) ChangePassword(oldpass, newpass, eventID string) {
|
|
oldkey := CreateKey(oldpass, ps.salt[:])
|
|
|
|
if oldkey != ps.key {
|
|
ps.eventManager.Publish(event.NewEventList(event.ChangePasswordError, event.Error, "Supplied current password does not match", event.EventID, eventID))
|
|
return
|
|
}
|
|
|
|
newkey := CreateKey(newpass, ps.salt[:])
|
|
|
|
newStreamStores := map[string]StreamStore{}
|
|
idToNewLocalID := map[string]string{}
|
|
|
|
// Generate all new StreamStores with the new password and write all the old StreamStore data into these ones
|
|
for ssid, ss := range ps.streamStores {
|
|
// New ss with new pass and new localID
|
|
newlocalID := model.GenerateRandomID()
|
|
idToNewLocalID[ssid] = newlocalID
|
|
|
|
newSS := NewStreamStore(ps.directory, newlocalID, newkey)
|
|
newStreamStores[ssid] = newSS
|
|
|
|
// write whole store
|
|
messages := ss.Read()
|
|
newSS.WriteN(messages)
|
|
}
|
|
|
|
// Switch over
|
|
oldStreamStores := ps.streamStores
|
|
ps.streamStores = newStreamStores
|
|
for ssid, newLocalID := range idToNewLocalID {
|
|
if len(ssid) == groupIDLen {
|
|
ps.profile.Groups[ssid].LocalID = newLocalID
|
|
} else {
|
|
if ps.profile.Contacts[ssid] != nil {
|
|
ps.profile.Contacts[ssid].LocalID = newLocalID
|
|
} else {
|
|
log.Errorf("Unknown Contact: %v. This is probably the result of corrupted development data from fuzzing. This contact will not appear in the new profile.", ssid)
|
|
}
|
|
}
|
|
}
|
|
|
|
ps.key = newkey
|
|
ps.fs.ChangeKey(newkey)
|
|
ps.save()
|
|
|
|
// Clean up
|
|
for _, oldss := range oldStreamStores {
|
|
oldss.Delete()
|
|
}
|
|
|
|
ps.eventManager.Publish(event.NewEventList(event.ChangePasswordSuccess, event.EventID, eventID))
|
|
return
|
|
}
|
|
|
|
func (ps *ProfileStoreV1) save() error {
|
|
if ps.writer {
|
|
bytes, _ := json.Marshal(ps.profile)
|
|
return ps.fs.Write(bytes)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ps *ProfileStoreV1) regenStreamStore(messages []model.Message, contact string) {
|
|
oldss := ps.streamStores[contact]
|
|
newLocalID := model.GenerateRandomID()
|
|
newSS := NewStreamStore(ps.directory, newLocalID, ps.key)
|
|
newSS.WriteN(messages)
|
|
if len(contact) == groupIDLen {
|
|
ps.profile.Groups[contact].LocalID = newLocalID
|
|
} else {
|
|
// We can assume this exists as regen stream store should only happen to *update* a message
|
|
ps.profile.Contacts[contact].LocalID = newLocalID
|
|
}
|
|
ps.streamStores[contact] = newSS
|
|
ps.save()
|
|
oldss.Delete()
|
|
}
|
|
|
|
// load instantiates a cwtchPeer from the file store
|
|
func (ps *ProfileStoreV1) load() error {
|
|
decrypted, err := ps.fs.Read()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
cp := new(model.Profile)
|
|
err = json.Unmarshal(decrypted, &cp)
|
|
|
|
if err == nil {
|
|
ps.profile = cp
|
|
|
|
// TODO 2020.06.09: v1 update, Remove on v2
|
|
// if we already have the contact it can be assumed "approved" unless blocked
|
|
for _, contact := range cp.Contacts {
|
|
if contact.Authorization == "" {
|
|
if contact.DeprecatedBlocked {
|
|
contact.Authorization = model.AuthBlocked
|
|
} else {
|
|
contact.Authorization = model.AuthApproved
|
|
}
|
|
}
|
|
|
|
// Check if there is any saved history...
|
|
saveHistory, keyExists := contact.GetAttribute(event.SaveHistoryKey)
|
|
if !keyExists {
|
|
contact.SetAttribute(event.SaveHistoryKey, event.DeleteHistoryDefault)
|
|
}
|
|
|
|
if saveHistory == event.SaveHistoryConfirmed {
|
|
ss := NewStreamStore(ps.directory, contact.LocalID, ps.key)
|
|
cp.Contacts[contact.Onion].Timeline.SetMessages(ss.Read())
|
|
ps.streamStores[contact.Onion] = ss
|
|
}
|
|
}
|
|
|
|
for gid, group := range cp.Groups {
|
|
if group.Version == 0 {
|
|
log.Infof("group %v is of unsupported version 0. dropping group...\n", group.GroupID)
|
|
delete(cp.Groups, gid)
|
|
continue
|
|
}
|
|
|
|
ss := NewStreamStore(ps.directory, group.LocalID, ps.key)
|
|
|
|
cp.Groups[gid].Timeline.SetMessages(ss.Read())
|
|
cp.Groups[gid].Timeline.Sort()
|
|
ps.streamStores[group.GroupID] = ss
|
|
}
|
|
|
|
ps.save()
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// GetProfileCopy returns a copy of the stored profile
|
|
func (ps *ProfileStoreV1) GetProfileCopy(timeline bool) *model.Profile {
|
|
return ps.profile.GetCopy(timeline)
|
|
}
|
|
|
|
func (ps *ProfileStoreV1) eventHandler() {
|
|
for {
|
|
ev := ps.queue.Next()
|
|
log.Debugf("eventHandler event %v %v\n", ev.EventType, ev.EventID)
|
|
|
|
switch ev.EventType {
|
|
case event.SetPeerAuthorization:
|
|
err := ps.profile.SetContactAuthorization(ev.Data[event.RemotePeer], model.Authorization(ev.Data[event.Authorization]))
|
|
if err == nil {
|
|
ps.save()
|
|
}
|
|
case event.PeerCreated:
|
|
var pp *model.PublicProfile
|
|
json.Unmarshal([]byte(ev.Data[event.Data]), &pp)
|
|
ps.profile.AddContact(ev.Data[event.RemotePeer], pp)
|
|
case event.GroupCreated:
|
|
var group *model.Group
|
|
json.Unmarshal([]byte(ev.Data[event.Data]), &group)
|
|
ps.profile.AddGroup(group)
|
|
ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.key)
|
|
ps.save()
|
|
case event.SetProfileName:
|
|
ps.profile.Name = ev.Data[event.ProfileName]
|
|
ps.profile.SetAttribute("name", ev.Data[event.ProfileName])
|
|
ps.save()
|
|
case event.SetAttribute:
|
|
ps.profile.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
|
|
ps.save()
|
|
case event.SetPeerAttribute:
|
|
contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
|
|
if exists {
|
|
contact.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
|
|
ps.save()
|
|
|
|
switch ev.Data[event.Key] {
|
|
case event.SaveHistoryKey:
|
|
if event.DeleteHistoryConfirmed == ev.Data[event.Data] {
|
|
ss, exists := ps.streamStores[ev.Data[event.RemotePeer]]
|
|
if exists {
|
|
ss.Delete()
|
|
delete(ps.streamStores, ev.Data[event.RemotePeer])
|
|
}
|
|
} else if event.SaveHistoryConfirmed == ev.Data[event.Data] {
|
|
_, exists := ps.streamStores[ev.Data[event.RemotePeer]]
|
|
if !exists {
|
|
ss := NewStreamStore(ps.directory, contact.LocalID, ps.key)
|
|
ps.streamStores[ev.Data[event.RemotePeer]] = ss
|
|
}
|
|
}
|
|
default:
|
|
{
|
|
}
|
|
}
|
|
|
|
} else {
|
|
log.Errorf("error setting attribute on peer %v peer does not exist", ev)
|
|
}
|
|
case event.SetGroupAttribute:
|
|
group := ps.profile.GetGroup(ev.Data[event.GroupID])
|
|
if group != nil {
|
|
group.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
|
|
ps.save()
|
|
} else {
|
|
log.Errorf("error setting attribute on group %v group does not exist", ev)
|
|
}
|
|
case event.AcceptGroupInvite:
|
|
err := ps.profile.AcceptInvite(ev.Data[event.GroupID])
|
|
if err == nil {
|
|
ps.save()
|
|
} else {
|
|
log.Errorf("error accepting group invite")
|
|
}
|
|
case event.RejectGroupInvite:
|
|
ps.profile.RejectInvite(ev.Data[event.GroupID])
|
|
ps.save()
|
|
case event.NewGroup:
|
|
gid, err := ps.profile.ProcessInvite(ev.Data[event.GroupInvite])
|
|
if err == nil {
|
|
ps.save()
|
|
group := ps.profile.Groups[gid]
|
|
ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.key)
|
|
} else {
|
|
log.Errorf("error storing new group invite: %v (%v)", err, ev)
|
|
}
|
|
case event.SendMessageToPeer: // cache the message till an ack, then it's given to stream store.
|
|
// stream store doesn't support updates, so we don't want to commit it till ack'd
|
|
ps.profile.AddSentMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], time.Now(), ev.EventID)
|
|
case event.NewMessageFromPeer:
|
|
ps.profile.AddMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], time.Now())
|
|
ps.attemptSavePeerMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.TimestampReceived], true)
|
|
case event.PeerAcknowledgement:
|
|
onion := ev.Data[event.RemotePeer]
|
|
eventID := ev.Data[event.EventID]
|
|
contact, ok := ps.profile.Contacts[onion]
|
|
if ok {
|
|
mIdx, ok := contact.UnacknowledgedMessages[eventID]
|
|
if ok {
|
|
message := contact.Timeline.Messages[mIdx]
|
|
ps.attemptSavePeerMessage(onion, message.Message, message.Timestamp.Format(time.RFC3339Nano), false)
|
|
}
|
|
}
|
|
ps.profile.AckSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID])
|
|
case event.NewMessageFromGroup:
|
|
groupid := ev.Data[event.GroupID]
|
|
received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
|
|
sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent])
|
|
sig, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
|
|
prevsig, _ := base64.StdEncoding.DecodeString(ev.Data[event.PreviousSignature])
|
|
message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: sig, PreviousMessageSig: prevsig, Acknowledged: true}
|
|
ss, exists := ps.streamStores[groupid]
|
|
if exists {
|
|
// We need to store a local copy of the message...
|
|
ps.profile.GetGroup(groupid).Timeline.Insert(&message)
|
|
ss.Write(message)
|
|
} else {
|
|
log.Errorf("error storing new group message: %v stream store does not exist", ev)
|
|
}
|
|
case event.PeerStateChange:
|
|
if _, exists := ps.profile.Contacts[ev.Data[event.RemotePeer]]; exists {
|
|
ps.profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState]
|
|
}
|
|
case event.ServerStateChange:
|
|
for _, group := range ps.profile.Groups {
|
|
if group.GroupServer == ev.Data[event.GroupServer] {
|
|
group.State = ev.Data[event.ConnectionState]
|
|
}
|
|
}
|
|
case event.DeleteContact:
|
|
onion := ev.Data[event.RemotePeer]
|
|
ps.profile.DeleteContact(onion)
|
|
ps.save()
|
|
ss, exists := ps.streamStores[onion]
|
|
if exists {
|
|
ss.Delete()
|
|
delete(ps.streamStores, onion)
|
|
}
|
|
case event.DeleteGroup:
|
|
groupID := ev.Data[event.GroupID]
|
|
ps.profile.DeleteGroup(groupID)
|
|
ps.save()
|
|
ss, exists := ps.streamStores[groupID]
|
|
if exists {
|
|
ss.Delete()
|
|
delete(ps.streamStores, groupID)
|
|
}
|
|
case event.ChangePassword:
|
|
oldpass := ev.Data[event.Password]
|
|
newpass := ev.Data[event.NewPassword]
|
|
ps.ChangePassword(oldpass, newpass, ev.EventID)
|
|
case event.UpdateMessageFlags:
|
|
handle := ev.Data[event.Handle]
|
|
mIx, err := strconv.Atoi(ev.Data[event.Index])
|
|
if err != nil {
|
|
log.Errorf("Invalid Message Index: %v", err)
|
|
return
|
|
}
|
|
flags, err := strconv.ParseUint(ev.Data[event.Flags], 2, 64)
|
|
if err != nil {
|
|
log.Errorf("Invalid Message Falgs: %v", err)
|
|
return
|
|
}
|
|
ps.profile.UpdateMessageFlags(handle, mIx, flags)
|
|
if len(handle) == groupIDLen {
|
|
ps.regenStreamStore(ps.profile.GetGroup(handle).Timeline.Messages, handle)
|
|
} else if contact, exists := ps.profile.GetContact(handle); exists {
|
|
if exists {
|
|
val, _ := contact.GetAttribute(event.SaveHistoryKey)
|
|
if val == event.SaveHistoryConfirmed {
|
|
ps.regenStreamStore(contact.Timeline.Messages, handle)
|
|
}
|
|
}
|
|
}
|
|
default:
|
|
log.Debugf("shutting down profile store: %v", ev)
|
|
return
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
// attemptSavePeerMessage checks if the peer has been configured to save history from this peer
|
|
// and if so the peer saves the message into history. fromPeer is used to control if the message is saved
|
|
// as coming from the remote peer or if it was sent by out profile.
|
|
func (ps *ProfileStoreV1) attemptSavePeerMessage(peerID, messageData, timestampeReceived string, fromPeer bool) {
|
|
contact, exists := ps.profile.GetContact(peerID)
|
|
if exists {
|
|
val, _ := contact.GetAttribute(event.SaveHistoryKey)
|
|
switch val {
|
|
case event.SaveHistoryConfirmed:
|
|
{
|
|
peerID := peerID
|
|
var received time.Time
|
|
var message model.Message
|
|
if fromPeer {
|
|
received, _ = time.Parse(time.RFC3339Nano, timestampeReceived)
|
|
message = model.Message{Received: received, Timestamp: received, Message: messageData, PeerID: peerID, Signature: []byte{}, PreviousMessageSig: []byte{}}
|
|
} else {
|
|
received := time.Now()
|
|
message = model.Message{Received: received, Timestamp: received, Message: messageData, PeerID: ps.profile.Onion, Signature: []byte{}, PreviousMessageSig: []byte{}, Acknowledged: true}
|
|
}
|
|
ss, exists := ps.streamStores[peerID]
|
|
if exists {
|
|
ss.Write(message)
|
|
} else {
|
|
log.Errorf("error storing new peer message: %v stream store does not exist", peerID)
|
|
}
|
|
}
|
|
default:
|
|
{
|
|
}
|
|
}
|
|
} else {
|
|
log.Errorf("error saving message for peer that doesn't exist: %v", peerID)
|
|
}
|
|
}
|
|
|
|
// Shutdown shuts down the queue / thread
|
|
func (ps *ProfileStoreV1) Shutdown() {
|
|
if ps.queue != nil {
|
|
ps.queue.Shutdown()
|
|
}
|
|
}
|
|
|
|
// Delete removes all stored files for this stored profile
|
|
func (ps *ProfileStoreV1) Delete() {
|
|
log.Debugf("Delete ProfileStore for %v\n", ps.profile.Onion)
|
|
|
|
for _, ss := range ps.streamStores {
|
|
ss.Delete()
|
|
}
|
|
|
|
ps.fs.Delete()
|
|
|
|
err := os.RemoveAll(ps.directory)
|
|
if err != nil {
|
|
log.Errorf("ProfileStore Delete error on RemoveAll on %v was %v\n", ps.directory, err)
|
|
}
|
|
}
|