// Package storage provides a file-backed store for cwtch group messages.
package storage

import (
	"bufio"
	"cwtch.im/cwtch/protocol"
	"cwtch.im/cwtch/server/metrics"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"sync"
)

// MessageStoreInterface defines an interface to interact with a store of cwtch messages.
type MessageStoreInterface interface {
	// AddMessage records a single group message in the store.
	AddMessage(protocol.GroupMessage)
	// FetchMessages returns the messages currently held by the store.
	FetchMessages() []*protocol.GroupMessage
}

// MessageStore is a file-backed implementation of MessageStoreInterface.
//
// Messages are appended to a file, one JSON document per line, and the most
// recent bufferSize of them are kept in memory in a fixed-size ring buffer.
type MessageStore struct {
	file           *os.File                 // backing file, opened by Init
	lock           sync.Mutex               // taken by Close, FetchMessages and AddMessage
	messages       []*protocol.GroupMessage // ring buffer of bufferSize slots
	messageCounter metrics.Counter          // incremented once per AddMessage call
	bufferSize     int                      // capacity of the ring buffer
	pos            int                      // index of the next slot to write
	rotated        bool                     // true once pos has wrapped around at least once
}

// Close closes the message store and underlying resources.
//
// The in-memory buffer is released and its cursor reset, so the store reads
// as empty afterwards instead of indexing into a nil buffer. A failure to
// close the backing file is logged, since Close has no error return.
func (ms *MessageStore) Close() {
	ms.lock.Lock()
	defer ms.lock.Unlock()

	ms.messages = nil
	ms.pos = 0
	ms.rotated = false

	if err := ms.file.Close(); err != nil {
		log.Printf("[ERROR] Failed to close message store file %v\n", err)
	}
}

// updateBuffer stores gm in the next ring-buffer slot and advances the write
// position, wrapping to slot 0 (and marking the buffer as rotated) once the
// last slot has been filled. Callers are expected to serialise access.
func (ms *MessageStore) updateBuffer(gm *protocol.GroupMessage) {
	// A zero-length buffer (bufferSize 0, or a closed store) has no slot to
	// write to; indexing it would panic.
	if len(ms.messages) == 0 {
		return
	}

	ms.messages[ms.pos] = gm
	ms.pos++
	if ms.pos == ms.bufferSize {
		ms.pos = 0
		ms.rotated = true
	}
}

// Init sets up a MessageStore of size bufferSize backed by filename.
//
// The file is created if it does not exist and is opened in append mode with
// 0600 permissions. Every line already present is decoded as a JSON
// protocol.GroupMessage and replayed into the ring buffer, so only the last
// bufferSize messages remain in memory. Empty lines are skipped.
//
// Init panics if the file cannot be opened, if a non-empty line is not valid
// JSON for a GroupMessage, or if reading the file fails.
func (ms *MessageStore) Init(filename string, bufferSize int, messageCounter metrics.Counter) {
	f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600)
	if err != nil {
		panic(err)
	}

	ms.file = f
	ms.pos = 0
	ms.bufferSize = bufferSize
	ms.messages = make([]*protocol.GroupMessage, bufferSize)
	ms.rotated = false
	ms.messageCounter = messageCounter

	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		line := scanner.Bytes()
		// An empty line holds no record; skip it rather than treating it as
		// corrupt JSON and aborting start-up.
		if len(line) == 0 {
			continue
		}

		gm := &protocol.GroupMessage{}
		if err := json.Unmarshal(line, gm); err != nil {
			panic(err)
		}
		ms.updateBuffer(gm)
	}
	if err := scanner.Err(); err != nil {
		panic(err)
	}
}

// FetchMessages returns all messages from the backing file.
func (ms *MessageStore) FetchMessages() (messages []*protocol.GroupMessage) { ms.lock.Lock() if !ms.rotated { messages = make([]*protocol.GroupMessage, ms.pos) copy(messages, ms.messages[0:ms.pos]) } else { messages = make([]*protocol.GroupMessage, ms.bufferSize) copy(messages, ms.messages) } ms.lock.Unlock() return } // AddMessage adds a GroupMessage to the store func (ms *MessageStore) AddMessage(gm protocol.GroupMessage) { ms.messageCounter.Add(1) ms.lock.Lock() ms.updateBuffer(&gm) s, err := json.Marshal(gm) if err != nil { log.Printf("[ERROR] Failed to unmarshal group message %v\n", err) } fmt.Fprintf(ms.file, "%s\n", s) ms.lock.Unlock() }