cwtch/storage/profile_store.go

169 lines
5.4 KiB
Go

package storage
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/protocol"
"encoding/json"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"github.com/golang/protobuf/proto"
"os"
"time"
)
// profileFilename is the file name handed to NewFileStore for the serialized profile.
const profileFilename = "profile"
// profileStore is the ProfileStore implementation defined in this file. It keeps
// a profile in memory, writes it out through fs whenever a handled event changes
// it, and holds one StreamStore per contact or group.
type profileStore struct {
// fs saves and loads the JSON-serialized profile.
fs FileStore
// streamStores is keyed by a contact's Onion or a group's GroupID.
streamStores map[string]StreamStore
// directory is passed to NewFileStore and to every NewStreamStore.
directory string
// password is passed to NewFileStore and to every NewStreamStore.
password string
// profile is the current in-memory profile; Load replaces it and eventHandler mutates it.
profile *model.Profile
// eventManager is the manager the store subscribes its queue to.
eventManager *event.Manager
// queue supplies the events consumed by eventHandler.
queue *event.Queue
}
// ProfileStore is an interface for managing the storage of Cwtch Profiles
type ProfileStore interface {
// Load reads the stored profile and makes it the store's current profile.
Load() error
// Shutdown shuts down the store's event queue.
Shutdown()
// GetProfileCopy returns a copy of the store's current profile.
GetProfileCopy() *model.Profile
}
// NewProfileStore creates a profile store backed by a file store in directory,
// starts its event-handling goroutine and subscribes its queue to every event
// type the store persists.
// directory should be $appDir/profiles/$rand
func NewProfileStore(eventManager *event.Manager, directory, password string, profile *model.Profile) ProfileStore {
	// The Mkdir result is not inspected.
	// NOTE(review): presumably because the directory may already exist — confirm.
	os.Mkdir(directory, 0700)

	store := &profileStore{
		fs:           NewFileStore(directory, profileFilename, password),
		password:     password,
		directory:    directory,
		profile:      profile,
		eventManager: eventManager,
		streamStores: map[string]StreamStore{},
	}
	store.queue = event.NewEventQueue(100)

	// The handler goroutine is started before the subscriptions are registered.
	go store.eventHandler()

	manager, channel := store.eventManager, store.queue.EventChannel
	manager.Subscribe(event.BlockPeer, channel)
	manager.Subscribe(event.PeerCreated, channel)
	manager.Subscribe(event.GroupCreated, channel)
	manager.Subscribe(event.SetProfileName, channel)
	manager.Subscribe(event.SetAttribute, channel)
	manager.Subscribe(event.SetPeerAttribute, channel)
	manager.Subscribe(event.SetGroupAttribute, channel)
	manager.Subscribe(event.NewGroupInvite, channel)
	manager.Subscribe(event.NewMessageFromGroup, channel)

	return store
}
// NewProfile returns a newly generated profile with the given name, ready to be
// handed to a profile store.
func NewProfile(name string) *model.Profile {
	return model.GenerateNewProfile(name)
}
// save serializes the in-memory profile to JSON and writes it through the file
// store.
//
// If marshalling fails the error is returned and the file store is not touched,
// so a failed serialization can never overwrite the previously stored profile
// with empty data. Otherwise the result of fs.Save is returned.
func (ps *profileStore) save() error {
	serialized, err := json.Marshal(ps.profile)
	if err != nil {
		return err
	}
	return ps.fs.Save(serialized)
}
// Load reads the profile from the file store, decodes it from JSON and installs
// it as the store's current profile. It then opens a stream store for every
// contact and group of that profile, fills each timeline from its stream store
// and registers the stream store under the contact's Onion or the group's
// GroupID.
//
// An error from the file store or from JSON decoding is returned unchanged; in
// that case the current profile and the stream stores are left as they were.
func (ps *profileStore) Load() error {
	decrypted, err := ps.fs.Load()
	if err != nil {
		return err
	}

	// Decode into the struct itself, not into a pointer to the pointer: with a
	// **Profile target a JSON `null` payload would set the pointer to nil
	// without reporting an error, and the loops below would then dereference it.
	cp := new(model.Profile)
	if err := json.Unmarshal(decrypted, cp); err != nil {
		return err
	}
	ps.profile = cp

	for _, contact := range cp.Contacts {
		ss := NewStreamStore(ps.directory, contact.LocalID, ps.password)
		contact.Timeline.SetMessages(ss.Read())
		ps.streamStores[contact.Onion] = ss
	}

	for _, group := range cp.Groups {
		log.Debugf("loading group %v", group)
		ss := NewStreamStore(ps.directory, group.LocalID, ps.password)
		group.Timeline.SetMessages(ss.Read())
		ps.streamStores[group.GroupID] = ss
		// Logged a second time once the timeline has been populated.
		log.Debugf("loading group %v", group)
	}

	return nil
}
// GetProfileCopy returns whatever GetCopy produces for the store's current
// in-memory profile.
func (ps *profileStore) GetProfileCopy() *model.Profile {
	snapshot := ps.profile.GetCopy()
	return snapshot
}
// eventHandler is the store's event loop. It takes events from the queue one at
// a time, applies each to the in-memory profile and persists the result with
// save; group messages are appended to the group's stream store instead.
//
// Events whose payload cannot be decoded, or that refer to a group without a
// stream store, are dropped rather than dereferenced, so a single malformed
// event cannot panic this goroutine. Errors returned by save are not acted on.
//
// The loop returns on the first event whose type is not handled below.
// NOTE(review): presumably this is how Shutdown ends the goroutine — confirm
// against the queue implementation.
func (ps *profileStore) eventHandler() {
	for {
		ev := ps.queue.Next()
		switch ev.EventType {
		case event.BlockPeer:
			// NOTE(review): this case reads the literal "Onion" key while the
			// other peer cases read event.RemotePeer — confirm that publishers
			// of BlockPeer set this key.
			contact, exists := ps.profile.GetContact(ev.Data["Onion"])
			if exists {
				contact.Blocked = true
				ps.save()
			}

		case event.PeerCreated:
			var pp *model.PublicProfile
			// A payload that fails to decode (or decodes to null) leaves no
			// usable contact; drop the event instead of dereferencing nil.
			if err := json.Unmarshal([]byte(ev.Data[event.Data]), &pp); err != nil || pp == nil {
				continue
			}
			ps.profile.AddContact(ev.Data[event.RemotePeer], pp)
			ss := NewStreamStore(ps.directory, pp.LocalID, ps.password)
			pp.Timeline.SetMessages(ss.Read())
			ps.streamStores[pp.Onion] = ss
			ps.save()

		case event.GroupCreated:
			var group *model.Group
			// Same guard as for PeerCreated: never add or dereference a nil group.
			if err := json.Unmarshal([]byte(ev.Data[event.Data]), &group); err != nil || group == nil {
				continue
			}
			ps.profile.AddGroup(group)
			ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password)
			ps.save()

		case event.SetProfileName:
			// The name is stored both in the Name field and as the "name" attribute.
			ps.profile.Name = ev.Data[event.ProfileName]
			ps.profile.SetAttribute("name", ev.Data[event.ProfileName])
			ps.save()

		case event.SetAttribute:
			ps.profile.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
			ps.save()

		case event.SetPeerAttribute:
			contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
			if exists {
				contact.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
				ps.save()
			}

		case event.SetGroupAttribute:
			group, exists := ps.profile.Groups[ev.Data[event.GroupID]]
			if exists {
				group.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
				ps.save()
			}

		case event.NewGroupInvite:
			var gci protocol.CwtchPeerPacket //protocol.GroupChatInvite
			// An invite that cannot be decoded is dropped.
			if err := proto.Unmarshal([]byte(ev.Data["GroupInvite"]), &gci); err != nil {
				continue
			}
			groupInvite := gci.GroupChatInvite
			ps.profile.ProcessInvite(groupInvite, ev.Data[event.RemotePeer])
			ps.save()
			// Only open a stream store if processing the invite actually
			// produced a group under the invite's group name.
			group, exists := ps.profile.Groups[groupInvite.GetGroupName()]
			if exists {
				ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password)
			}

		case event.NewMessageFromGroup:
			groupid := ev.Data[event.GroupID]
			// Without a stream store for this group there is nowhere to write
			// the message; calling Write on the missing entry would panic.
			ss, exists := ps.streamStores[groupid]
			if !exists {
				continue
			}
			// Parse errors are ignored on purpose: an unparseable timestamp is
			// stored as the zero time.Time.
			received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
			sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent])
			// TODO: Sig, prev message Sig
			message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: []byte(ev.Data[event.Signature])}
			//ps.profile.Groups[groupid].AddMessage(message) <- wants protocol.DecryptedGroupMessage so group.Timeline will drift here from launch when it's initialized
			ss.Write(message)

		default:
			return
		}
	}
}
// Shutdown shuts down the store's event queue.
func (ps *profileStore) Shutdown() {
	q := ps.queue
	q.Shutdown()
}