Official cwtch.im peer and server implementations. https://cwtch.im
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

profile_store.go 8.5KB


  1. package storage
  2. import (
  3. "cwtch.im/cwtch/event"
  4. "cwtch.im/cwtch/model"
  5. "encoding/json"
  6. "git.openprivacy.ca/openprivacy/libricochet-go/log"
  7. "os"
  8. "time"
  9. )
  10. const profileFilename = "profile"
  11. type profileStore struct {
  12. fs FileStore
  13. streamStores map[string]StreamStore
  14. directory string
  15. password string
  16. profile *model.Profile
  17. eventManager event.Manager
  18. queue event.Queue
  19. writer bool
  20. }
  21. // ProfileStore is an interface to managing the storage of Cwtch Profiles
  22. type ProfileStore interface {
  23. Load() error
  24. Shutdown()
  25. GetProfileCopy(timeline bool) *model.Profile
  26. GetNewPeerMessage() *event.Event
  27. GetStatusMessages() []*event.Event
  28. }
  29. // NewProfileWriterStore returns a profile store backed by a filestore listening for events and saving them
  30. // directory should be $appDir/profiles/$rand
  31. func NewProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) ProfileStore {
  32. os.Mkdir(directory, 0700)
  33. ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true}
  34. //ps.queue = event.NewQueue(100)
  35. ps.queue = event.NewQueue()
  36. if profile != nil {
  37. ps.save()
  38. }
  39. go ps.eventHandler()
  40. ps.eventManager.Subscribe(event.BlockPeer, ps.queue)
  41. ps.eventManager.Subscribe(event.UnblockPeer, ps.queue)
  42. ps.eventManager.Subscribe(event.PeerCreated, ps.queue)
  43. ps.eventManager.Subscribe(event.GroupCreated, ps.queue)
  44. ps.eventManager.Subscribe(event.SetProfileName, ps.queue)
  45. ps.eventManager.Subscribe(event.SetAttribute, ps.queue)
  46. ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue)
  47. ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue)
  48. ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue)
  49. ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue)
  50. ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue)
  51. ps.eventManager.Subscribe(event.PeerStateChange, ps.queue)
  52. ps.eventManager.Subscribe(event.ServerStateChange, ps.queue)
  53. ps.eventManager.Subscribe(event.DeleteContact, ps.queue)
  54. ps.eventManager.Subscribe(event.DeleteGroup, ps.queue)
  55. return ps
  56. }
  57. // ReadProfile reads a profile from storqage and returns the profile
  58. // directory should be $appDir/profiles/$rand
  59. func ReadProfile(directory, password string) (*model.Profile, error) {
  60. os.Mkdir(directory, 0700)
  61. ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: nil, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true}
  62. err := ps.Load()
  63. if err != nil {
  64. return nil, err
  65. }
  66. profile := ps.GetProfileCopy(true)
  67. return profile, nil
  68. }
  69. // NewProfile creates a new profile for use in the profile store.
  70. func NewProfile(name string) *model.Profile {
  71. profile := model.GenerateNewProfile(name)
  72. return profile
  73. }
  74. // GetNewPeerMessage is for AppService to call on Reload events, to reseed the AppClient with the loaded peers
  75. func (ps *profileStore) GetNewPeerMessage() *event.Event {
  76. message := event.NewEventList(event.NewPeer, event.Identity, ps.profile.LocalID, event.Password, ps.password, event.Status, "running")
  77. return &message
  78. }
  79. func (ps *profileStore) GetStatusMessages() []*event.Event {
  80. messages := []*event.Event{}
  81. for _, contact := range ps.profile.Contacts {
  82. message := event.NewEvent(event.PeerStateChange, map[event.Field]string{
  83. event.RemotePeer: string(contact.Onion),
  84. event.ConnectionState: contact.State,
  85. })
  86. messages = append(messages, &message)
  87. }
  88. doneServers := make(map[string]bool)
  89. for _, group := range ps.profile.Groups {
  90. if _, exists := doneServers[group.GroupServer]; !exists {
  91. message := event.NewEvent(event.ServerStateChange, map[event.Field]string{
  92. event.GroupServer: string(group.GroupServer),
  93. event.ConnectionState: group.State,
  94. })
  95. messages = append(messages, &message)
  96. doneServers[group.GroupServer] = true
  97. }
  98. }
  99. return messages
  100. }
  101. func (ps *profileStore) save() error {
  102. if ps.writer {
  103. bytes, _ := json.Marshal(ps.profile)
  104. return ps.fs.Save(bytes)
  105. }
  106. return nil
  107. }
  108. // Load instantiates a cwtchPeer from the file store
  109. func (ps *profileStore) Load() error {
  110. decrypted, err := ps.fs.Load()
  111. if err != nil {
  112. return err
  113. }
  114. cp := new(model.Profile)
  115. err = json.Unmarshal(decrypted, &cp)
  116. if err == nil {
  117. ps.profile = cp
  118. for gid, group := range cp.Groups {
  119. ss := NewStreamStore(ps.directory, group.LocalID, ps.password)
  120. cp.Groups[gid].Timeline.SetMessages(ss.Read())
  121. ps.streamStores[group.GroupID] = ss
  122. }
  123. }
  124. return err
  125. }
  126. func (ps *profileStore) GetProfileCopy(timeline bool) *model.Profile {
  127. return ps.profile.GetCopy(timeline)
  128. }
  129. func (ps *profileStore) eventHandler() {
  130. for {
  131. ev := ps.queue.Next()
  132. switch ev.EventType {
  133. case event.BlockPeer:
  134. contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
  135. if exists {
  136. contact.Blocked = true
  137. ps.save()
  138. }
  139. case event.UnblockPeer:
  140. contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
  141. if exists {
  142. contact.Blocked = false
  143. ps.save()
  144. }
  145. case event.PeerCreated:
  146. var pp *model.PublicProfile
  147. json.Unmarshal([]byte(ev.Data[event.Data]), &pp)
  148. ps.profile.AddContact(ev.Data[event.RemotePeer], pp)
  149. // TODO: configure - allow peers to be configured to turn on limited storage
  150. /*ss := NewStreamStore(ps.directory, pp.LocalID, ps.password)
  151. pp.Timeline.SetMessages(ss.Read())
  152. ps.streamStores[pp.Onion] = ss
  153. ps.save()*/
  154. case event.GroupCreated:
  155. var group *model.Group
  156. json.Unmarshal([]byte(ev.Data[event.Data]), &group)
  157. ps.profile.AddGroup(group)
  158. ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password)
  159. ps.save()
  160. case event.SetProfileName:
  161. ps.profile.Name = ev.Data[event.ProfileName]
  162. ps.profile.SetAttribute("name", ev.Data[event.ProfileName])
  163. ps.save()
  164. case event.SetAttribute:
  165. ps.profile.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
  166. ps.save()
  167. case event.SetPeerAttribute:
  168. contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
  169. if exists {
  170. contact.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
  171. ps.save()
  172. } else {
  173. log.Errorf("error setting attribute on peer %v peer does not exist", ev)
  174. }
  175. case event.SetGroupAttribute:
  176. group := ps.profile.GetGroup(ev.Data[event.GroupID])
  177. if group != nil {
  178. group.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
  179. ps.save()
  180. } else {
  181. log.Errorf("error setting attribute on group %v group does not exist", ev)
  182. }
  183. case event.AcceptGroupInvite:
  184. err := ps.profile.AcceptInvite(ev.Data[event.GroupID])
  185. if err == nil {
  186. ps.save()
  187. } else {
  188. log.Errorf("error accepting group invite")
  189. }
  190. case event.NewGroupInvite:
  191. gid, err := ps.profile.ProcessInvite(ev.Data[event.GroupInvite], ev.Data[event.RemotePeer])
  192. log.Errorf("gid: %v err:%v\n", gid, err)
  193. if err == nil {
  194. ps.save()
  195. group := ps.profile.Groups[gid]
  196. ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password)
  197. } else {
  198. log.Errorf("error storing new group invite: %v (%v)", err, ev)
  199. }
  200. case event.NewMessageFromGroup:
  201. groupid := ev.Data[event.GroupID]
  202. received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
  203. sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent])
  204. message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: []byte(ev.Data[event.Signature]), PreviousMessageSig: []byte(ev.Data[event.PreviousSignature])}
  205. ss, exists := ps.streamStores[groupid]
  206. if exists {
  207. ss.Write(message)
  208. } else {
  209. log.Errorf("error storing new group message: %v stream store does not exist", ev)
  210. }
  211. case event.PeerStateChange:
  212. if _, exists := ps.profile.Contacts[ev.Data[event.RemotePeer]]; exists {
  213. ps.profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState]
  214. }
  215. case event.ServerStateChange:
  216. for _, group := range ps.profile.Groups {
  217. if group.GroupServer == ev.Data[event.GroupServer] {
  218. group.State = ev.Data[event.ConnectionState]
  219. }
  220. }
  221. case event.DeleteContact:
  222. onion := ev.Data[event.RemotePeer]
  223. ps.profile.DeleteContact(onion)
  224. ps.save()
  225. case event.DeleteGroup:
  226. groupID := ev.Data[event.GroupID]
  227. ps.profile.DeleteGroup(groupID)
  228. ps.save()
  229. ss, exists := ps.streamStores[groupID]
  230. if exists {
  231. ss.Delete()
  232. delete(ps.streamStores, groupID)
  233. }
  234. default:
  235. return
  236. }
  237. }
  238. }
  239. func (ps *profileStore) Shutdown() {
  240. if ps.queue != nil {
  241. ps.queue.Shutdown()
  242. }
  243. }