Official cwtch.im peer and server implementations. https://cwtch.im
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

stream_store.go 3.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package storage
  2. import (
  3. "cwtch.im/cwtch/model"
  4. "encoding/json"
  5. "fmt"
  6. "git.openprivacy.ca/openprivacy/libricochet-go/log"
  7. "io/ioutil"
  8. "os"
  9. "path"
  10. "sync"
  11. )
  // fileStorePartitions is the number of numbered files a store rotates
  // through; partition 0 is the file currently being written.
  const fileStorePartitions = 16

  // bytesPerFile is the estimated buffer size, in bytes, above which the
  // current file is rotated out and a fresh buffer is started.
  const bytesPerFile = 15 * 1024
  16. // streamStore is a file-backed implementation of StreamStore using an in memory buffer of ~16KB and a rotating set of files
  17. type streamStore struct {
  18. password string
  19. storeDirectory string
  20. filenameBase string
  21. // Buffer is used just for current file to write to
  22. messages []model.Message
  23. bufferByteCount int
  24. lock sync.Mutex
  25. }
  26. // StreamStore provides a stream like interface to encrypted storage
  27. type StreamStore interface {
  28. Write(message model.Message)
  29. Read() []model.Message
  30. Delete()
  31. }
  32. // NewStreamStore returns an initialized StreamStore ready for reading and writing
  33. func NewStreamStore(directory string, filenameBase string, password string) (store StreamStore) {
  34. ss := &streamStore{storeDirectory: directory, filenameBase: filenameBase, password: password}
  35. os.Mkdir(ss.storeDirectory, 0700)
  36. ss.initBuffer()
  37. ss.initBufferFromStorage()
  38. return ss
  39. }
  40. func (ss *streamStore) initBuffer() {
  41. ss.messages = []model.Message{}
  42. ss.bufferByteCount = 0
  43. }
  44. func (ss *streamStore) initBufferFromStorage() error {
  45. filename := fmt.Sprintf("%s.%d", ss.filenameBase, 0)
  46. bytes, _ := readEncryptedFile(ss.storeDirectory, filename, ss.password)
  47. msgs := []model.Message{}
  48. err := json.Unmarshal([]byte(bytes), &msgs)
  49. if err != nil {
  50. return err
  51. }
  52. for _, message := range msgs {
  53. ss.updateBuffer(message)
  54. }
  55. return nil
  56. }
  57. func (ss *streamStore) updateBuffer(m model.Message) {
  58. ss.messages = append(ss.messages, m)
  59. ss.bufferByteCount += (model.MessageBaseSize * 1.5) + len(m.Message)
  60. }
  61. func (ss *streamStore) updateFile() error {
  62. msgs, err := json.Marshal(ss.messages)
  63. if err != nil {
  64. log.Errorf("Failed to marshal group messages %v\n", err)
  65. }
  66. // ENCRYPT
  67. key, salt, _ := createKey(ss.password)
  68. encryptedMsgs, err := encryptFileData(msgs, key)
  69. if err != nil {
  70. log.Errorf("Failed to encrypt messages: %v\n", err)
  71. return err
  72. }
  73. encryptedMsgs = append(salt[:], encryptedMsgs...)
  74. ioutil.WriteFile(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, 0)), encryptedMsgs, 0700)
  75. return nil
  76. }
  77. func (ss *streamStore) rotateFileStore() {
  78. os.Remove(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, fileStorePartitions-1)))
  79. for i := fileStorePartitions - 2; i >= 0; i-- {
  80. os.Rename(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i)), path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i+1)))
  81. }
  82. }
  83. // Delete deletes all the files associated with this streamStore
  84. func (ss *streamStore) Delete() {
  85. for i := fileStorePartitions - 1; i >= 0; i-- {
  86. os.Remove(path.Join(ss.storeDirectory, fmt.Sprintf("%s.%d", ss.filenameBase, i)))
  87. }
  88. }
  89. // Read returns all messages from the backing file (not the buffer, which is jsut for writing to the current file)
  90. func (ss *streamStore) Read() (messages []model.Message) {
  91. ss.lock.Lock()
  92. defer ss.lock.Unlock()
  93. resp := []model.Message{}
  94. for i := fileStorePartitions - 1; i >= 0; i-- {
  95. filename := fmt.Sprintf("%s.%d", ss.filenameBase, i)
  96. bytes, err := readEncryptedFile(ss.storeDirectory, filename, ss.password)
  97. if err != nil {
  98. continue
  99. }
  100. msgs := []model.Message{}
  101. json.Unmarshal([]byte(bytes), &msgs)
  102. resp = append(resp, msgs...)
  103. }
  104. // 2019.10.10 "Acknowledged" & "ReceivedByServer" are added to the struct, populate it as true for old ones without
  105. for i := 0; i < len(resp) && (resp[i].Acknowledged == false && resp[i].ReceivedByServer == false); i++ {
  106. resp[i].Acknowledged = true
  107. resp[i].ReceivedByServer = true
  108. }
  109. return resp
  110. }
  111. // AddMessage adds a GroupMessage to the store
  112. func (ss *streamStore) Write(m model.Message) {
  113. ss.lock.Lock()
  114. defer ss.lock.Unlock()
  115. ss.updateBuffer(m)
  116. ss.updateFile()
  117. if ss.bufferByteCount > bytesPerFile {
  118. log.Debugf("rotating log file")
  119. ss.rotateFileStore()
  120. ss.initBuffer()
  121. }
  122. }