2018-10-05 03:18:34 +00:00
package storage
2018-10-06 03:50:55 +00:00
import (
2019-01-21 20:11:40 +00:00
"cwtch.im/cwtch/event"
2019-01-14 20:09:25 +00:00
"cwtch.im/cwtch/model"
2019-01-29 20:56:59 +00:00
"cwtch.im/cwtch/protocol"
2019-01-14 20:09:25 +00:00
"encoding/json"
2019-02-03 01:18:33 +00:00
"git.openprivacy.ca/openprivacy/libricochet-go/log"
2019-01-29 20:56:59 +00:00
"github.com/golang/protobuf/proto"
"os"
"time"
2018-10-06 03:50:55 +00:00
)
2019-01-29 20:56:59 +00:00
const profileFilename = "profile"
2019-01-14 20:09:25 +00:00
type profileStore struct {
2019-01-21 20:11:40 +00:00
fs FileStore
2019-01-29 20:56:59 +00:00
streamStores map [ string ] StreamStore
directory string
password string
2019-01-21 20:11:40 +00:00
profile * model . Profile
eventManager * event . Manager
queue * event . Queue
2019-01-14 20:09:25 +00:00
}
2018-10-06 03:50:55 +00:00
// ProfileStore is an interface to managing the storage of Cwtch Profiles
type ProfileStore interface {
2019-01-29 20:56:59 +00:00
Init ( name string ) error
2019-01-21 20:11:40 +00:00
Load ( ) error
Shutdown ( )
GetProfileCopy ( ) * model . Profile
2019-01-14 20:09:25 +00:00
}
2019-01-21 20:11:40 +00:00
// NewProfileStore returns a profile store backed by a filestore listening for events and saving them
2019-01-29 20:56:59 +00:00
// directory should be $appDir/profiles/$rand
func NewProfileStore ( eventManager * event . Manager , directory , password string ) ProfileStore {
os . Mkdir ( directory , 0700 )
ps := & profileStore { fs : NewFileStore ( directory , profileFilename , password ) , password : password , directory : directory , profile : nil , eventManager : eventManager , streamStores : map [ string ] StreamStore { } }
ps . queue = event . NewEventQueue ( 100 )
go ps . eventHandler ( )
2019-01-21 20:11:40 +00:00
2019-01-29 20:56:59 +00:00
ps . eventManager . Subscribe ( event . BlockPeer , ps . queue . EventChannel )
2019-02-03 01:18:33 +00:00
ps . eventManager . Subscribe ( event . PeerCreated , ps . queue . EventChannel )
ps . eventManager . Subscribe ( event . GroupCreated , ps . queue . EventChannel )
2019-01-29 20:56:59 +00:00
ps . eventManager . Subscribe ( event . SetProfileName , ps . queue . EventChannel )
ps . eventManager . Subscribe ( event . SetAttribute , ps . queue . EventChannel )
ps . eventManager . Subscribe ( event . SetPeerAttribute , ps . queue . EventChannel )
ps . eventManager . Subscribe ( event . SetGroupAttribute , ps . queue . EventChannel )
ps . eventManager . Subscribe ( event . NewGroupInvite , ps . queue . EventChannel )
ps . eventManager . Subscribe ( event . NewMessageFromGroup , ps . queue . EventChannel )
return ps
2019-01-14 20:09:25 +00:00
}
2019-01-29 20:56:59 +00:00
func ( ps * profileStore ) Init ( name string ) error {
2019-01-21 20:11:40 +00:00
ps . profile = model . GenerateNewProfile ( name )
2019-01-29 20:56:59 +00:00
return ps . save ( )
2019-01-21 20:11:40 +00:00
}
2019-01-29 20:56:59 +00:00
func ( ps * profileStore ) save ( ) error {
2019-01-21 20:11:40 +00:00
bytes , _ := json . Marshal ( ps . profile )
2019-01-14 20:09:25 +00:00
return ps . fs . Save ( bytes )
}
// Load instantiates a cwtchPeer from the file store
2019-01-21 20:11:40 +00:00
func ( ps * profileStore ) Load ( ) error {
2019-01-14 20:09:25 +00:00
decrypted , err := ps . fs . Load ( )
if err != nil {
2019-01-21 20:11:40 +00:00
return err
2019-01-14 20:09:25 +00:00
}
cp := new ( model . Profile )
err = json . Unmarshal ( decrypted , & cp )
if err == nil {
2019-01-21 20:11:40 +00:00
ps . profile = cp
2019-01-29 20:56:59 +00:00
2019-02-03 01:18:33 +00:00
for _ , profile := range cp . Contacts {
ss := NewStreamStore ( ps . directory , profile . LocalID , ps . password )
profile . Timeline . SetMessages ( ss . Read ( ) )
ps . streamStores [ profile . Onion ] = ss
}
for _ , group := range cp . Groups {
log . Debugf ( "loading group %v" , group )
ss := NewStreamStore ( ps . directory , group . LocalID , ps . password )
group . Timeline . SetMessages ( ss . Read ( ) )
ps . streamStores [ group . GroupID ] = ss
log . Debugf ( "loading group %v" , group )
}
2019-01-29 20:56:59 +00:00
}
2019-01-21 20:11:40 +00:00
return err
}
func ( ps * profileStore ) GetProfileCopy ( ) * model . Profile {
return ps . profile . GetCopy ( )
}
func ( ps * profileStore ) eventHandler ( ) {
for {
ev := ps . queue . Next ( )
switch ev . EventType {
case event . BlockPeer :
contact , exists := ps . profile . GetContact ( ev . Data [ "Onion" ] )
if exists {
contact . Blocked = true
2019-01-29 20:56:59 +00:00
ps . save ( )
}
2019-02-03 01:18:33 +00:00
case event . PeerCreated :
var pp * model . PublicProfile
json . Unmarshal ( [ ] byte ( ev . Data [ event . Data ] ) , & pp )
ps . profile . AddContact ( ev . Data [ event . RemotePeer ] , pp )
ss := NewStreamStore ( ps . directory , pp . LocalID , ps . password )
pp . Timeline . SetMessages ( ss . Read ( ) )
ps . streamStores [ pp . Onion ] = ss
ps . save ( )
case event . GroupCreated :
var group * model . Group
json . Unmarshal ( [ ] byte ( ev . Data [ event . Data ] ) , & group )
ps . profile . AddGroup ( group )
ps . streamStores [ group . GroupID ] = NewStreamStore ( ps . directory , group . LocalID , ps . password )
ps . save ( )
2019-01-29 20:56:59 +00:00
case event . SetProfileName :
ps . profile . Name = ev . Data [ event . ProfileName ]
ps . profile . SetAttribute ( "name" , ev . Data [ event . ProfileName ] )
ps . save ( )
case event . SetAttribute :
ps . profile . SetAttribute ( ev . Data [ event . Key ] , ev . Data [ event . Data ] )
ps . save ( )
case event . SetPeerAttribute :
contact , exists := ps . profile . GetContact ( ev . Data [ event . RemotePeer ] )
if exists {
contact . SetAttribute ( ev . Data [ event . Key ] , ev . Data [ event . Data ] )
ps . save ( )
2019-01-21 20:11:40 +00:00
}
2019-01-29 20:56:59 +00:00
case event . SetGroupAttribute :
group , exists := ps . profile . Groups [ ev . Data [ event . GroupID ] ]
if exists {
group . SetAttribute ( ev . Data [ event . Key ] , ev . Data [ event . Data ] )
ps . save ( )
}
case event . NewGroupInvite :
var gci protocol . CwtchPeerPacket //protocol.GroupChatInvite
proto . Unmarshal ( [ ] byte ( ev . Data [ "GroupInvite" ] ) , & gci )
groupInvite := gci . GroupChatInvite
ps . profile . ProcessInvite ( groupInvite , ev . Data [ event . RemotePeer ] )
ps . save ( )
group := ps . profile . Groups [ groupInvite . GetGroupName ( ) ]
ps . streamStores [ group . GroupID ] = NewStreamStore ( ps . directory , group . LocalID , ps . password )
case event . NewMessageFromGroup :
groupid := ev . Data [ event . GroupID ]
received , _ := time . Parse ( time . RFC3339Nano , ev . Data [ event . TimestampReceived ] )
sent , _ := time . Parse ( time . RFC3339Nano , ev . Data [ event . TimestampSent ] )
// TODO: Sig, prev message Sig
message := model . Message { Received : received , Timestamp : sent , Message : ev . Data [ event . Data ] , PeerID : ev . Data [ event . RemotePeer ] }
//ps.profile.Groups[groupid].AddMessage(message) <- wants protocol.DecryptedGroupMessage so group.Timeline will drift here from launch when it's initialized
ps . streamStores [ groupid ] . Write ( message )
2019-01-21 20:11:40 +00:00
default :
return
}
}
}
func ( ps * profileStore ) Shutdown ( ) {
ps . queue . Shutdown ( )
2018-10-06 03:50:55 +00:00
}