rewire in message count metrics #9
|
@ -12,7 +12,6 @@ import (
|
||||||
type counter struct {
|
type counter struct {
|
||||||
startTime time.Time
|
startTime time.Time
|
||||||
count uint64
|
count uint64
|
||||||
total uint64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Counter providers a threadsafe counter to use for storing long running counts
|
// Counter providers a threadsafe counter to use for storing long running counts
|
||||||
|
@ -26,7 +25,7 @@ type Counter interface {
|
||||||
|
|
||||||
// NewCounter initializes a counter starting at time.Now() and a count of 0 and returns it
|
// NewCounter initializes a counter starting at time.Now() and a count of 0 and returns it
|
||||||
func NewCounter() Counter {
|
func NewCounter() Counter {
|
||||||
c := &counter{startTime: time.Now(), count: 0, total: 0}
|
c := &counter{startTime: time.Now(), count: 0}
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -62,7 +62,7 @@ func (s *Server) Run(acn connectivity.ACN) error {
|
||||||
log.Infof("cwtch server running on cwtch:%s\n", addressIdentity+".onion:")
|
log.Infof("cwtch server running on cwtch:%s\n", addressIdentity+".onion:")
|
||||||
s.metricsPack.Start(service, s.config.ConfigDir, s.config.ServerReporting.LogMetricsToFile)
|
s.metricsPack.Start(service, s.config.ConfigDir, s.config.ServerReporting.LogMetricsToFile)
|
||||||
|
|
||||||
ms, err := storage.InitializeSqliteMessageStore(path.Join(s.config.ConfigDir, "cwtch.messages"))
|
ms, err := storage.InitializeSqliteMessageStore(path.Join(s.config.ConfigDir, "cwtch.messages"), s.metricsPack.MessageCounter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not open database: %v", err)
|
return fmt.Errorf("could not open database: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"cwtch.im/cwtch/protocol/groups"
|
"cwtch.im/cwtch/protocol/groups"
|
||||||
|
"git.openprivacy.ca/cwtch.im/server/metrics"
|
||||||
|
|||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -18,7 +19,8 @@ type MessageStoreInterface interface {
|
||||||
|
|
||||||
// SqliteMessageStore is an sqlite3 backed message store
|
// SqliteMessageStore is an sqlite3 backed message store
|
||||||
type SqliteMessageStore struct {
|
type SqliteMessageStore struct {
|
||||||
database *sql.DB
|
messageCounter metrics.Counter
|
||||||
|
database *sql.DB
|
||||||
|
|
||||||
// Some prepared queries...
|
// Some prepared queries...
|
||||||
preparedInsertStatement *sql.Stmt // A Stmt is safe for concurrent use by multiple goroutines.
|
preparedInsertStatement *sql.Stmt // A Stmt is safe for concurrent use by multiple goroutines.
|
||||||
|
@ -34,7 +36,7 @@ func (s *SqliteMessageStore) Close() {
|
||||||
|
|
||||||
// AddMessage implements the MessageStoreInterface AddMessage for sqlite message store
|
// AddMessage implements the MessageStoreInterface AddMessage for sqlite message store
|
||||||
func (s *SqliteMessageStore) AddMessage(message groups.EncryptedGroupMessage) {
|
func (s *SqliteMessageStore) AddMessage(message groups.EncryptedGroupMessage) {
|
||||||
|
s.messageCounter.Add(1)
|
||||||
// ignore this clearly invalid message...
|
// ignore this clearly invalid message...
|
||||||
if len(message.Signature) == 0 {
|
if len(message.Signature) == 0 {
|
||||||
return
|
return
|
||||||
|
@ -105,7 +107,7 @@ func (s *SqliteMessageStore) compileRows(rows *sql.Rows) []*groups.EncryptedGrou
|
||||||
|
|
||||||
// InitializeSqliteMessageStore creates a database `dbfile` with the necessary tables (if it doesn't already exist)
|
// InitializeSqliteMessageStore creates a database `dbfile` with the necessary tables (if it doesn't already exist)
|
||||||
// and returns an open database
|
// and returns an open database
|
||||||
func InitializeSqliteMessageStore(dbfile string) (*SqliteMessageStore, error) {
|
func InitializeSqliteMessageStore(dbfile string, messageCounter metrics.Counter) (*SqliteMessageStore, error) {
|
||||||
db, err := sql.Open("sqlite3", dbfile)
|
db, err := sql.Open("sqlite3", dbfile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("database %v cannot be created or opened %v", dbfile, err)
|
log.Errorf("database %v cannot be created or opened %v", dbfile, err)
|
||||||
|
@ -121,6 +123,7 @@ func InitializeSqliteMessageStore(dbfile string) (*SqliteMessageStore, error) {
|
||||||
log.Infof("Database Initialized")
|
log.Infof("Database Initialized")
|
||||||
slms := new(SqliteMessageStore)
|
slms := new(SqliteMessageStore)
|
||||||
slms.database = db
|
slms.database = db
|
||||||
|
slms.messageCounter = messageCounter
|
||||||
|
|
||||||
sqlStmt = `INSERT INTO messages(signature, ciphertext) values (?,?);`
|
sqlStmt = `INSERT INTO messages(signature, ciphertext) values (?,?);`
|
||||||
stmt, err := slms.database.Prepare(sqlStmt)
|
stmt, err := slms.database.Prepare(sqlStmt)
|
||||||
|
|
|
@ -2,6 +2,7 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"cwtch.im/cwtch/protocol/groups"
|
"cwtch.im/cwtch/protocol/groups"
|
||||||
|
"git.openprivacy.ca/cwtch.im/server/metrics"
|
||||||
sarah
commented
wrong package wrong package
|
|||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"git.openprivacy.ca/openprivacy/log"
|
"git.openprivacy.ca/openprivacy/log"
|
||||||
"os"
|
"os"
|
||||||
|
@ -13,7 +14,8 @@ func TestMessageStore(t *testing.T) {
|
||||||
filename := "../testcwtchmessages.db"
|
filename := "../testcwtchmessages.db"
|
||||||
os.Remove(filename)
|
os.Remove(filename)
|
||||||
log.SetLevel(log.LevelDebug)
|
log.SetLevel(log.LevelDebug)
|
||||||
db, err := InitializeSqliteMessageStore(filename)
|
counter := metrics.NewCounter()
|
||||||
|
db, err := InitializeSqliteMessageStore(filename, counter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error: %v", err)
|
t.Fatalf("Error: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -37,6 +39,9 @@ func TestMessageStore(t *testing.T) {
|
||||||
db.AddMessage(message)
|
db.AddMessage(message)
|
||||||
}
|
}
|
||||||
t.Logf("Time to Insert: %v", time.Since(start))
|
t.Logf("Time to Insert: %v", time.Since(start))
|
||||||
|
if counter.Count() != numMessages {
|
||||||
|
t.Errorf("Counter should be at %v was %v", numMessages, counter.Count())
|
||||||
|
}
|
||||||
|
|
||||||
// Wait for inserts to complete..
|
// Wait for inserts to complete..
|
||||||
fetchedMessages := db.FetchMessages()
|
fetchedMessages := db.FetchMessages()
|
||||||
|
|
Loading…
Reference in New Issue
wrong package