Official cwtch.im peer and server implementations. https://cwtch.im
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

342 lines
11KB

  1. package storage
  2. import (
  3. "cwtch.im/cwtch/event"
  4. "cwtch.im/cwtch/model"
  5. "encoding/json"
  6. "git.openprivacy.ca/openprivacy/libricochet-go/log"
  7. "os"
  8. "time"
  9. )
  10. const groupIDLen = 32
  11. const peerIDLen = 56
  12. const profileFilename = "profile"
  13. type profileStore struct {
  14. fs FileStore
  15. streamStores map[string]StreamStore // map [groupId|onion] StreamStore
  16. directory string
  17. password string
  18. profile *model.Profile
  19. eventManager event.Manager
  20. queue event.Queue
  21. writer bool
  22. }
  23. // ProfileStore is an interface to managing the storage of Cwtch Profiles
  24. type ProfileStore interface {
  25. Load() error
  26. Shutdown()
  27. Delete()
  28. GetProfileCopy(timeline bool) *model.Profile
  29. GetNewPeerMessage() *event.Event
  30. GetStatusMessages() []*event.Event
  31. }
  32. // NewProfileWriterStore returns a profile store backed by a filestore listening for events and saving them
  33. // directory should be $appDir/profiles/$rand
  34. func NewProfileWriterStore(eventManager event.Manager, directory, password string, profile *model.Profile) ProfileStore {
  35. os.Mkdir(directory, 0700)
  36. ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true}
  37. //ps.queue = event.NewQueue(100)
  38. ps.queue = event.NewQueue()
  39. if profile != nil {
  40. ps.save()
  41. }
  42. go ps.eventHandler()
  43. ps.eventManager.Subscribe(event.BlockPeer, ps.queue)
  44. ps.eventManager.Subscribe(event.UnblockPeer, ps.queue)
  45. ps.eventManager.Subscribe(event.PeerCreated, ps.queue)
  46. ps.eventManager.Subscribe(event.GroupCreated, ps.queue)
  47. ps.eventManager.Subscribe(event.SetProfileName, ps.queue)
  48. ps.eventManager.Subscribe(event.SetAttribute, ps.queue)
  49. ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue)
  50. ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue)
  51. ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue)
  52. ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue)
  53. ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue)
  54. ps.eventManager.Subscribe(event.PeerStateChange, ps.queue)
  55. ps.eventManager.Subscribe(event.ServerStateChange, ps.queue)
  56. ps.eventManager.Subscribe(event.DeleteContact, ps.queue)
  57. ps.eventManager.Subscribe(event.DeleteGroup, ps.queue)
  58. ps.eventManager.Subscribe(event.ChangePassword, ps.queue)
  59. return ps
  60. }
  61. // ReadProfile reads a profile from storqage and returns the profile
  62. // directory should be $appDir/profiles/$rand
  63. func ReadProfile(directory, password string) (*model.Profile, error) {
  64. os.Mkdir(directory, 0700)
  65. ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: nil, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true}
  66. err := ps.Load()
  67. if err != nil {
  68. return nil, err
  69. }
  70. profile := ps.GetProfileCopy(true)
  71. return profile, nil
  72. }
  73. // NewProfile creates a new profile for use in the profile store.
  74. func NewProfile(name string) *model.Profile {
  75. profile := model.GenerateNewProfile(name)
  76. return profile
  77. }
  78. // GetNewPeerMessage is for AppService to call on Reload events, to reseed the AppClient with the loaded peers
  79. func (ps *profileStore) GetNewPeerMessage() *event.Event {
  80. message := event.NewEventList(event.NewPeer, event.Identity, ps.profile.LocalID, event.Password, ps.password, event.Status, "running")
  81. return &message
  82. }
  83. func (ps *profileStore) GetStatusMessages() []*event.Event {
  84. messages := []*event.Event{}
  85. for _, contact := range ps.profile.Contacts {
  86. message := event.NewEvent(event.PeerStateChange, map[event.Field]string{
  87. event.RemotePeer: string(contact.Onion),
  88. event.ConnectionState: contact.State,
  89. })
  90. messages = append(messages, &message)
  91. }
  92. doneServers := make(map[string]bool)
  93. for _, group := range ps.profile.Groups {
  94. if _, exists := doneServers[group.GroupServer]; !exists {
  95. message := event.NewEvent(event.ServerStateChange, map[event.Field]string{
  96. event.GroupServer: string(group.GroupServer),
  97. event.ConnectionState: group.State,
  98. })
  99. messages = append(messages, &message)
  100. doneServers[group.GroupServer] = true
  101. }
  102. }
  103. return messages
  104. }
  105. func (ps *profileStore) ChangePassword(oldpass, newpass, eventID string) {
  106. if oldpass != ps.password {
  107. ps.eventManager.Publish(event.NewEventList(event.ChangePasswordError, event.Error, "Supplied current password does not match", event.EventID, eventID))
  108. return
  109. }
  110. newStreamStores := map[string]StreamStore{}
  111. idToNewLocalID := map[string]string{}
  112. // Generate all new StreamStores with the new password and write all the old StreamStore data into these ones
  113. for ssid, ss := range ps.streamStores {
  114. // New ss with new pass and new localID
  115. newlocalID := model.GenerateRandomID()
  116. idToNewLocalID[ssid] = newlocalID
  117. newSS := NewStreamStore(ps.directory, newlocalID, newpass)
  118. newStreamStores[ssid] = newSS
  119. // write whole store
  120. messages := ss.Read()
  121. newSS.WriteN(messages)
  122. }
  123. // Switch over
  124. oldStreamStores := ps.streamStores
  125. ps.streamStores = newStreamStores
  126. for ssid, newLocalID := range idToNewLocalID {
  127. if len(ssid) == groupIDLen {
  128. ps.profile.Groups[ssid].LocalID = newLocalID
  129. } else {
  130. ps.profile.Contacts[ssid].LocalID = newLocalID
  131. }
  132. }
  133. ps.password = newpass
  134. ps.fs.ChangePassword(newpass)
  135. ps.save()
  136. // Clean up
  137. for _, oldss := range oldStreamStores {
  138. oldss.Delete()
  139. }
  140. ps.eventManager.Publish(event.NewEventList(event.ChangePasswordSuccess, event.EventID, eventID))
  141. return
  142. }
  143. func (ps *profileStore) save() error {
  144. if ps.writer {
  145. bytes, _ := json.Marshal(ps.profile)
  146. return ps.fs.Write(bytes)
  147. }
  148. return nil
  149. }
  150. // Read instantiates a cwtchPeer from the file store
  151. func (ps *profileStore) Load() error {
  152. decrypted, err := ps.fs.Read()
  153. if err != nil {
  154. return err
  155. }
  156. cp := new(model.Profile)
  157. err = json.Unmarshal(decrypted, &cp)
  158. if err == nil {
  159. ps.profile = cp
  160. for gid, group := range cp.Groups {
  161. ss := NewStreamStore(ps.directory, group.LocalID, ps.password)
  162. cp.Groups[gid].Timeline.SetMessages(ss.Read())
  163. ps.streamStores[group.GroupID] = ss
  164. }
  165. }
  166. return err
  167. }
  168. func (ps *profileStore) GetProfileCopy(timeline bool) *model.Profile {
  169. return ps.profile.GetCopy(timeline)
  170. }
  171. func (ps *profileStore) eventHandler() {
  172. log.Infoln("eventHandler()!")
  173. for {
  174. ev := ps.queue.Next()
  175. log.Infof("eventHandler event %v\n", ev)
  176. switch ev.EventType {
  177. case event.BlockPeer:
  178. contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
  179. if exists {
  180. contact.Blocked = true
  181. ps.save()
  182. }
  183. case event.UnblockPeer:
  184. contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
  185. if exists {
  186. contact.Blocked = false
  187. ps.save()
  188. }
  189. case event.PeerCreated:
  190. var pp *model.PublicProfile
  191. json.Unmarshal([]byte(ev.Data[event.Data]), &pp)
  192. ps.profile.AddContact(ev.Data[event.RemotePeer], pp)
  193. // TODO: configure - allow peers to be configured to turn on limited storage
  194. /*ss := NewStreamStore(ps.directory, pp.LocalID, ps.password)
  195. pp.Timeline.SetMessages(ss.Read())
  196. ps.streamStores[pp.Onion] = ss
  197. ps.save()*/
  198. case event.GroupCreated:
  199. var group *model.Group
  200. json.Unmarshal([]byte(ev.Data[event.Data]), &group)
  201. ps.profile.AddGroup(group)
  202. ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password)
  203. ps.save()
  204. case event.SetProfileName:
  205. ps.profile.Name = ev.Data[event.ProfileName]
  206. ps.profile.SetAttribute("name", ev.Data[event.ProfileName])
  207. ps.save()
  208. case event.SetAttribute:
  209. ps.profile.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
  210. ps.save()
  211. case event.SetPeerAttribute:
  212. contact, exists := ps.profile.GetContact(ev.Data[event.RemotePeer])
  213. if exists {
  214. contact.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
  215. ps.save()
  216. } else {
  217. log.Errorf("error setting attribute on peer %v peer does not exist", ev)
  218. }
  219. case event.SetGroupAttribute:
  220. group := ps.profile.GetGroup(ev.Data[event.GroupID])
  221. if group != nil {
  222. group.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
  223. ps.save()
  224. } else {
  225. log.Errorf("error setting attribute on group %v group does not exist", ev)
  226. }
  227. case event.AcceptGroupInvite:
  228. err := ps.profile.AcceptInvite(ev.Data[event.GroupID])
  229. if err == nil {
  230. ps.save()
  231. } else {
  232. log.Errorf("error accepting group invite")
  233. }
  234. case event.NewGroupInvite:
  235. gid, err := ps.profile.ProcessInvite(ev.Data[event.GroupInvite], ev.Data[event.RemotePeer])
  236. log.Errorf("gid: %v err:%v\n", gid, err)
  237. if err == nil {
  238. ps.save()
  239. group := ps.profile.Groups[gid]
  240. ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password)
  241. } else {
  242. log.Errorf("error storing new group invite: %v (%v)", err, ev)
  243. }
  244. case event.NewMessageFromGroup:
  245. groupid := ev.Data[event.GroupID]
  246. received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
  247. sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent])
  248. message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: []byte(ev.Data[event.Signature]), PreviousMessageSig: []byte(ev.Data[event.PreviousSignature])}
  249. ss, exists := ps.streamStores[groupid]
  250. if exists {
  251. ss.Write(message)
  252. } else {
  253. log.Errorf("error storing new group message: %v stream store does not exist", ev)
  254. }
  255. case event.PeerStateChange:
  256. if _, exists := ps.profile.Contacts[ev.Data[event.RemotePeer]]; exists {
  257. ps.profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState]
  258. }
  259. case event.ServerStateChange:
  260. for _, group := range ps.profile.Groups {
  261. if group.GroupServer == ev.Data[event.GroupServer] {
  262. group.State = ev.Data[event.ConnectionState]
  263. }
  264. }
  265. case event.DeleteContact:
  266. onion := ev.Data[event.RemotePeer]
  267. ps.profile.DeleteContact(onion)
  268. ps.save()
  269. case event.DeleteGroup:
  270. groupID := ev.Data[event.GroupID]
  271. ps.profile.DeleteGroup(groupID)
  272. ps.save()
  273. ss, exists := ps.streamStores[groupID]
  274. if exists {
  275. ss.Delete()
  276. delete(ps.streamStores, groupID)
  277. }
  278. case event.ChangePassword:
  279. oldpass := ev.Data[event.Password]
  280. newpass := ev.Data[event.NewPassword]
  281. ps.ChangePassword(oldpass, newpass, ev.EventID)
  282. default:
  283. return
  284. }
  285. }
  286. }
  287. func (ps *profileStore) Shutdown() {
  288. if ps.queue != nil {
  289. ps.queue.Shutdown()
  290. }
  291. }
  292. func (ps *profileStore) Delete() {
  293. log.Debugf("Delete ProfileStore for %v\n", ps.profile.Onion)
  294. for _, ss := range ps.streamStores {
  295. ss.Delete()
  296. }
  297. ps.fs.Delete()
  298. err := os.RemoveAll(ps.directory)
  299. if err != nil {
  300. log.Errorf("ProfileStore Delete error on RemoveAll on %v was %v\n", ps.directory, err)
  301. }
  302. }