Official cwtch.im peer and server implementations. https://cwtch.im
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

profile_store.go 5.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package storage
  2. import (
  3. "cwtch.im/cwtch/event"
  4. "cwtch.im/cwtch/model"
  5. "cwtch.im/cwtch/protocol"
  6. "encoding/json"
  7. "git.openprivacy.ca/openprivacy/libricochet-go/log"
  8. "github.com/golang/protobuf/proto"
  9. "os"
  10. "time"
  11. )
  12. const profileFilename = "profile"
  13. type profileStore struct {
  14. fs FileStore
  15. streamStores map[string]StreamStore
  16. directory string
  17. password string
  18. profile *model.Profile
  19. eventManager *event.Manager
  20. queue *event.Queue
  21. }
  22. // ProfileStore is an interface to managing the storage of Cwtch Profiles
  23. type ProfileStore interface {
  24. Load() error
  25. Shutdown()
  26. GetProfileCopy() *model.Profile
  27. }
  28. // NewProfileStore returns a profile store backed by a filestore listening for events and saving them
  29. // directory should be $appDir/profiles/$rand
  30. func NewProfileStore(eventManager *event.Manager, directory, password string, profile *model.Profile) ProfileStore {
  31. os.Mkdir(directory, 0700)
  32. ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}}
  33. ps.queue = event.NewEventQueue(100)
  34. go ps.eventHandler()
  35. ps.eventManager.Subscribe(event.BlockPeer, ps.queue.EventChannel)
  36. ps.eventManager.Subscribe(event.PeerCreated, ps.queue.EventChannel)
  37. ps.eventManager.Subscribe(event.GroupCreated, ps.queue.EventChannel)
  38. ps.eventManager.Subscribe(event.SetProfileName, ps.queue.EventChannel)
  39. ps.eventManager.Subscribe(event.SetAttribute, ps.queue.EventChannel)
  40. ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue.EventChannel)
  41. ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue.EventChannel)
  42. ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue.EventChannel)
  43. ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue.EventChannel)
  44. return ps
  45. }
  46. // NewProfile creates a new profile for use in the profile store.
  47. func NewProfile(name string) *model.Profile {
  48. profile := model.GenerateNewProfile(name)
  49. return profile
  50. }
  51. func (ps *profileStore) save() error {
  52. bytes, _ := json.Marshal(ps.profile)
  53. return ps.fs.Save(bytes)
  54. }
  55. // Load instantiates a cwtchPeer from the file store
  56. func (ps *profileStore) Load() error {
  57. decrypted, err := ps.fs.Load()
  58. if err != nil {
  59. return err
  60. }
  61. cp := new(model.Profile)
  62. err = json.Unmarshal(decrypted, &cp)
  63. if err == nil {
  64. ps.profile = cp
  65. for _, profile := range cp.Contacts {
  66. ss := NewStreamStore(ps.directory, profile.LocalID, ps.password)
  67. profile.Timeline.SetMessages(ss.Read())
  68. ps.streamStores[profile.Onion] = ss
  69. }
  70. for _, group := range cp.Groups {
  71. log.Debugf("loading group %v", group)
  72. ss := NewStreamStore(ps.directory, group.LocalID, ps.password)
  73. group.Timeline.SetMessages(ss.Read())
  74. ps.streamStores[group.GroupID] = ss
  75. log.Debugf("loading group %v", group)
  76. }
  77. }
  78. return err
  79. }
  80. func (ps *profileStore) GetProfileCopy() *model.Profile {
  81. return ps.profile.GetCopy()
  82. }
  83. func (ps *profileStore) eventHandler() {
  84. for {
  85. ev := ps.queue.Next()
  86. switch ev.EventType {
  87. case event.BlockPeer:
  88. contact, exists := ps.profile.GetContact(ev.Data["Onion"])
  89. if exists {
  90. contact.Blocked = true
  91. ps.save()
  92. }
  93. case event.PeerCreated:
  94. var pp *model.PublicProfile
  95. json.Unmarshal([]byte(ev.Data[event.Data]), &pp)
  96. ps.profile.AddContact(ev.Data[event.RemotePeer], pp)
  97. ss := NewStreamStore(ps.directory, pp.LocalID, ps.password)
  98. pp.Timeline.SetMessages(ss.Read())
  99. ps.streamStores[pp.Onion] = ss
  100. ps.save()
  101. case event.GroupCreated:
  102. var group *model.Group
  103. json.Unmarshal([]byte(ev.Data[event.Data]), &group)
  104. ps.profile.AddGroup(group)
  105. ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password)
  106. ps.save()
  107. case event.SetProfileName:
  108. ps.profile.Name = ev.Data[event.ProfileName]
  109. ps.profile.SetAttribute("name", ev.Data[event.ProfileName])
  110. ps.save()
  111. case event.SetAttribute:
  112. ps.profile.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
  113. ps.save()
  114. case event.SetPeerAttribute:
  115. contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
  116. if exists {
  117. contact.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
  118. ps.save()
  119. }
  120. case event.SetGroupAttribute:
  121. group, exists := ps.profile.Groups[ev.Data[event.GroupID]]
  122. if exists {
  123. group.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
  124. ps.save()
  125. }
  126. case event.NewGroupInvite:
  127. var gci protocol.CwtchPeerPacket //protocol.GroupChatInvite
  128. proto.Unmarshal([]byte(ev.Data["GroupInvite"]), &gci)
  129. groupInvite := gci.GroupChatInvite
  130. ps.profile.ProcessInvite(groupInvite, ev.Data[event.RemotePeer])
  131. ps.save()
  132. group := ps.profile.Groups[groupInvite.GetGroupName()]
  133. ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password)
  134. case event.NewMessageFromGroup:
  135. groupid := ev.Data[event.GroupID]
  136. received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
  137. sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent])
  138. // TODO: Sig, prev message Sig
  139. message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: []byte(ev.Data[event.Signature])}
  140. //ps.profile.Groups[groupid].AddMessage(message) <- wants protocol.DecryptedGroupMessage so group.Timeline will drift here from launch when it's initialized
  141. ps.streamStores[groupid].Write(message)
  142. default:
  143. return
  144. }
  145. }
  146. }
  147. func (ps *profileStore) Shutdown() {
  148. ps.queue.Shutdown()
  149. }