|
|
|
@ -8,6 +8,7 @@ import (
|
|
|
|
|
"fmt"
|
|
|
|
|
"log"
|
|
|
|
|
"os"
|
|
|
|
|
"path"
|
|
|
|
|
"sync"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
@ -57,7 +58,7 @@ func (ms *MessageStore) initAndLoadFiles() error {
|
|
|
|
|
ms.activeLogFile = nil
|
|
|
|
|
for i := fileStorePartitions - 1; i >= 0; i-- {
|
|
|
|
|
ms.filePos = 0
|
|
|
|
|
filename := fmt.Sprintf("%s%s.%d", ms.storeDirectory, fileStoreFilename, i)
|
|
|
|
|
filename := path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, i))
|
|
|
|
|
f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Printf("Error: MessageStore could not open: %v: %v", filename, err)
|
|
|
|
@ -96,13 +97,13 @@ func (ms *MessageStore) updateFile(gm *protocol.GroupMessage) {
|
|
|
|
|
|
|
|
|
|
func (ms *MessageStore) rotateFileStore() {
|
|
|
|
|
ms.activeLogFile.Close()
|
|
|
|
|
os.Remove(fmt.Sprintf("%s%s.%d", ms.storeDirectory, fileStoreFilename, fileStorePartitions-1))
|
|
|
|
|
os.Remove(path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, fileStorePartitions-1)))
|
|
|
|
|
|
|
|
|
|
for i := fileStorePartitions - 2; i >= 0; i-- {
|
|
|
|
|
os.Rename(fmt.Sprintf("%s%s.%d", ms.storeDirectory, fileStoreFilename, i), fmt.Sprintf("%s%s.%d", ms.storeDirectory, fileStoreFilename, i+1))
|
|
|
|
|
os.Rename(path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, i)), path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, i+1)))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
f, err := os.OpenFile(fmt.Sprintf("%s%s.%d", ms.storeDirectory, fileStoreFilename, 0), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600)
|
|
|
|
|
f, err := os.OpenFile(path.Join(ms.storeDirectory, fmt.Sprintf("%s.%d", fileStoreFilename, 0)), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Printf("ERROR: Could not open new message store file in: %s", ms.storeDirectory)
|
|
|
|
|
}
|
|
|
|
@ -112,7 +113,7 @@ func (ms *MessageStore) rotateFileStore() {
|
|
|
|
|
|
|
|
|
|
// Init sets up a MessageStore of size maxBufferLines (# of messages) backed by filename
|
|
|
|
|
func (ms *MessageStore) Init(appDirectory string, maxBufferLines int, messageCounter metrics.Counter) error {
|
|
|
|
|
ms.storeDirectory = appDirectory + "/" + directory + "/"
|
|
|
|
|
ms.storeDirectory = path.Join(appDirectory, directory)
|
|
|
|
|
os.Mkdir(ms.storeDirectory, 0700)
|
|
|
|
|
|
|
|
|
|
ms.bufferPos = 0
|
|
|
|
|