forked from cwtch.im/cwtch
Revert "adding Monitors; fixing metricsHistory rotation logic; wiring in Monitors; cleanup"
This reverts commit bacf733f52
.
This commit is contained in:
parent
e2538e8891
commit
3215478757
|
@ -85,7 +85,7 @@ func TestPeerPeerConnection(t *testing.T) {
|
||||||
t.Errorf("Onion address error %v", err)
|
t.Errorf("Onion address error %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
profile := model.GenerateNewProfile("alice")
|
profile := model.GenerateNewProfile("sarah")
|
||||||
ppc := NewPeerPeerConnection("127.0.0.1:5452|"+onionAddr, profile)
|
ppc := NewPeerPeerConnection("127.0.0.1:5452|"+onionAddr, profile)
|
||||||
//numcalls := 0
|
//numcalls := 0
|
||||||
tp := new(TestPeer)
|
tp := new(TestPeer)
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
{
|
{
|
||||||
"maxBufferLines": 100000,
|
"maxBufferLines": 100000
|
||||||
"serverReporting": {
|
serverReporting: {
|
||||||
"reportingGroupId": "",
|
reportingGroupId: ""
|
||||||
"reportingServerAddr": ""
|
reportingServerAddr: ""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,10 +13,9 @@ type counter struct {
|
||||||
|
|
||||||
// Counter providers a threadsafe counter to use for storing long running counts
|
// Counter providers a threadsafe counter to use for storing long running counts
|
||||||
type Counter interface {
|
type Counter interface {
|
||||||
Add(unit int)
|
Add(unit uint64)
|
||||||
Reset()
|
|
||||||
|
|
||||||
Count() int
|
Count() uint64
|
||||||
GetStarttime() time.Time
|
GetStarttime() time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,18 +26,13 @@ func NewCounter() Counter {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add add a count of unit to the counter
|
// Add add a count of unit to the counter
|
||||||
func (c *counter) Add(unit int) {
|
func (c *counter) Add(unit uint64) {
|
||||||
atomic.AddUint64(&c.count, uint64(unit))
|
atomic.AddUint64(&c.count, unit)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Count returns the count since Start
|
// Count returns the count since Start
|
||||||
func (c *counter) Count() int {
|
func (c *counter) Count() uint64 {
|
||||||
return int(atomic.LoadUint64(&c.count))
|
return atomic.LoadUint64(&c.count)
|
||||||
}
|
|
||||||
|
|
||||||
func (c *counter) Reset() {
|
|
||||||
atomic.StoreUint64(&c.count, 0)
|
|
||||||
c.startTime = time.Now()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetStarttime returns the starttime of the counter
|
// GetStarttime returns the starttime of the counter
|
||||||
|
@ -48,17 +42,13 @@ func (c *counter) GetStarttime() time.Time {
|
||||||
|
|
||||||
type monitorHistory struct {
|
type monitorHistory struct {
|
||||||
starttime time.Time
|
starttime time.Time
|
||||||
perMinutePerHour [60]float64
|
perMinutePerHour [60]int
|
||||||
timeLastHourRotate time.Time
|
perHourForDay [24]int
|
||||||
perHourForDay [24]float64
|
perDayForWeek [7]int
|
||||||
timeLastDayRotate time.Time
|
perWeekForMonth [4]int
|
||||||
perDayForWeek [7]float64
|
perMonthForYear [12]int
|
||||||
timeLastWeekRotate time.Time
|
|
||||||
perWeekForMonth [4]float64
|
|
||||||
timeLastMonthRotate time.Time
|
|
||||||
perMonthForYear [12]float64
|
|
||||||
|
|
||||||
monitor func() float64
|
monitor func() int
|
||||||
|
|
||||||
breakChannel chan bool
|
breakChannel chan bool
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
|
@ -69,15 +59,15 @@ type MonitorHistory interface {
|
||||||
Start()
|
Start()
|
||||||
Stop()
|
Stop()
|
||||||
|
|
||||||
Minutes() []float64
|
Minutes() []int
|
||||||
Hours() []float64
|
Hours() []int
|
||||||
Days() []float64
|
Days() []int
|
||||||
Weeks() []float64
|
Weeks() []int
|
||||||
Months() []float64
|
Months() []int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMonitorHistory returns a new MonitorHistory with starttime of time.Now and Started running with supplied monitor
|
// NewMonitorHistory returns a new MonitorHistory with starttime of time.Now and Started running with supplied monitor
|
||||||
func NewMonitorHistory(monitor func() float64) MonitorHistory {
|
func NewMonitorHistory(monitor func() int) MonitorHistory {
|
||||||
mh := &monitorHistory{starttime: time.Now(), monitor: monitor, breakChannel: make(chan bool)}
|
mh := &monitorHistory{starttime: time.Now(), monitor: monitor, breakChannel: make(chan bool)}
|
||||||
mh.Start()
|
mh.Start()
|
||||||
return mh
|
return mh
|
||||||
|
@ -94,32 +84,32 @@ func (mh *monitorHistory) Stop() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Minutes returns the last 60 minute monitoring results
|
// Minutes returns the last 60 minute monitoring results
|
||||||
func (mh *monitorHistory) Minutes() []float64 {
|
func (mh *monitorHistory) Minutes() []int {
|
||||||
return mh.returnCopy(mh.perMinutePerHour[:])
|
return mh.returnCopy(mh.perMinutePerHour[:])
|
||||||
}
|
}
|
||||||
|
|
||||||
// Hours returns the last 24 hourly averages of monitor results
|
// Hours returns the last 24 hourly averages of monitor results
|
||||||
func (mh *monitorHistory) Hours() []float64 {
|
func (mh *monitorHistory) Hours() []int {
|
||||||
return mh.returnCopy(mh.perHourForDay[:])
|
return mh.returnCopy(mh.perHourForDay[:])
|
||||||
}
|
}
|
||||||
|
|
||||||
// Days returns the last 7 day averages of monitor results
|
// Days returns the last 7 day averages of monitor results
|
||||||
func (mh *monitorHistory) Days() []float64 {
|
func (mh *monitorHistory) Days() []int {
|
||||||
return mh.returnCopy(mh.perDayForWeek[:])
|
return mh.returnCopy(mh.perDayForWeek[:])
|
||||||
}
|
}
|
||||||
|
|
||||||
// Weeks returns the last 4 weeks of averages of monitor results
|
// Weeks returns the last 4 weeks of averages of monitor results
|
||||||
func (mh *monitorHistory) Weeks() []float64 {
|
func (mh *monitorHistory) Weeks() []int {
|
||||||
return mh.returnCopy(mh.perWeekForMonth[:])
|
return mh.returnCopy(mh.perWeekForMonth[:])
|
||||||
}
|
}
|
||||||
|
|
||||||
// Months returns the last 12 months of averages of monitor results
|
// Months returns the last 12 months of averages of monitor results
|
||||||
func (mh *monitorHistory) Months() []float64 {
|
func (mh *monitorHistory) Months() []int {
|
||||||
return mh.returnCopy(mh.perMonthForYear[:])
|
return mh.returnCopy(mh.perMonthForYear[:])
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mh *monitorHistory) returnCopy(slice []float64) []float64 {
|
func (mh *monitorHistory) returnCopy(slice []int) []int {
|
||||||
retSlice := make([]float64, len(slice))
|
retSlice := make([]int, len(slice))
|
||||||
mh.lock.Lock()
|
mh.lock.Lock()
|
||||||
for i, v := range slice {
|
for i, v := range slice {
|
||||||
retSlice[i] = v
|
retSlice[i] = v
|
||||||
|
@ -128,22 +118,15 @@ func (mh *monitorHistory) returnCopy(slice []float64) []float64 {
|
||||||
return retSlice
|
return retSlice
|
||||||
}
|
}
|
||||||
|
|
||||||
func rotateAndAvg(array []float64, newVal float64) float64 {
|
func rotateAndAvg(array []int, newVal int) int {
|
||||||
total := float64(0.0)
|
total := 0
|
||||||
for i := len(array) - 1; i > 0; i-- {
|
for i := len(array) - 1; i > 0; i-- {
|
||||||
array[i] = array[i-1]
|
array[i] = array[i-1]
|
||||||
total += array[i]
|
total += array[i]
|
||||||
}
|
}
|
||||||
array[0] = newVal
|
array[0] = newVal
|
||||||
total += newVal
|
total += newVal
|
||||||
return total / float64(len(array))
|
return total / len(array)
|
||||||
}
|
|
||||||
func average(array []float64) float64 {
|
|
||||||
total := float64(0)
|
|
||||||
for _, x := range array {
|
|
||||||
total += x
|
|
||||||
}
|
|
||||||
return total / float64(len(array))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// monitorThread is the goroutine in a monitorHistory that does per minute monitoring and rotation
|
// monitorThread is the goroutine in a monitorHistory that does per minute monitoring and rotation
|
||||||
|
@ -156,26 +139,10 @@ func (mh *monitorHistory) monitorThread() {
|
||||||
mh.lock.Lock()
|
mh.lock.Lock()
|
||||||
|
|
||||||
minuteAvg := rotateAndAvg(mh.perMinutePerHour[:], mh.monitor())
|
minuteAvg := rotateAndAvg(mh.perMinutePerHour[:], mh.monitor())
|
||||||
|
hourAvg := rotateAndAvg(mh.perHourForDay[:], minuteAvg)
|
||||||
if time.Now().Sub(mh.timeLastHourRotate) > time.Hour {
|
dayAvg := rotateAndAvg(mh.perDayForWeek[:], hourAvg)
|
||||||
rotateAndAvg(mh.perHourForDay[:], minuteAvg)
|
weekAvg := rotateAndAvg(mh.perWeekForMonth[:], dayAvg)
|
||||||
mh.timeLastHourRotate = time.Now()
|
rotateAndAvg(mh.perMonthForYear[:], weekAvg)
|
||||||
}
|
|
||||||
|
|
||||||
if time.Now().Sub(mh.timeLastDayRotate) > time.Hour*24 {
|
|
||||||
rotateAndAvg(mh.perDayForWeek[:], average(mh.perHourForDay[:]))
|
|
||||||
mh.timeLastDayRotate = time.Now()
|
|
||||||
}
|
|
||||||
|
|
||||||
if time.Now().Sub(mh.timeLastWeekRotate) > time.Hour*24*7 {
|
|
||||||
rotateAndAvg(mh.perWeekForMonth[:], average(mh.perDayForWeek[:]))
|
|
||||||
mh.timeLastWeekRotate = time.Now()
|
|
||||||
}
|
|
||||||
|
|
||||||
if time.Now().Sub(mh.timeLastMonthRotate) > time.Hour*24*7*4 {
|
|
||||||
rotateAndAvg(mh.perMonthForYear[:], average(mh.perWeekForMonth[:]))
|
|
||||||
mh.timeLastMonthRotate = time.Now()
|
|
||||||
}
|
|
||||||
|
|
||||||
mh.lock.Unlock()
|
mh.lock.Unlock()
|
||||||
|
|
||||||
|
|
|
@ -31,36 +31,36 @@ func TestCounter(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMonitorHistory(t *testing.T) {
|
func TestMonitorHistory(t *testing.T) {
|
||||||
value := float64(60 * 24 * 7 * 4 * 12)
|
value := 60 * 24 * 7 * 4 * 12
|
||||||
numGoRoutinesStart := runtime.NumGoroutine()
|
numGoRoutinesStart := runtime.NumGoroutine()
|
||||||
mh := NewMonitorHistory(func() float64 { return value })
|
mh := NewMonitorHistory(func() int { return value })
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
mh.Stop()
|
mh.Stop()
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
minutes := mh.Minutes()
|
minutes := mh.Minutes()
|
||||||
if minutes[0] != value {
|
if minutes[0] != value {
|
||||||
t.Errorf("Monitor minutes's first minutes slot had the wrong value. Expected: %f Actual: %f", value, minutes[0])
|
t.Errorf("Monitor minutes's first minutes slot had the wrong value. Expected: %d Actual: %d", value, minutes[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
hours := mh.Hours()
|
hours := mh.Hours()
|
||||||
if hours[0] != value/60 {
|
if hours[0] != value/60 {
|
||||||
t.Errorf("Monitor hour's first hour slot had the wrong value. Expected: %f Actual: %f", value, hours[0])
|
t.Errorf("Monitor hour's first hour slot had the wrong value. Expected: %d Actual: %d", value, hours[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
days := mh.Days()
|
days := mh.Days()
|
||||||
if days[0] != value/60/24 {
|
if days[0] != value/60/24 {
|
||||||
t.Errorf("Monitor day's first day slot had the wrong value. Expected: %f Actual: %f", value/24, days[0])
|
t.Errorf("Monitor day's first day slot had the wrong value. Expected: %d Actual: %d", value/24, days[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
weeks := mh.Weeks()
|
weeks := mh.Weeks()
|
||||||
if weeks[0] != value/60/24/7 {
|
if weeks[0] != value/60/24/7 {
|
||||||
t.Errorf("Monitor week's first day slot had the wrong value. Expected: %f Actual: %f", value/24/7, weeks[0])
|
t.Errorf("Monitor week's first day slot had the wrong value. Expected: %d Actual: %d", value/24/7, weeks[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
months := mh.Months()
|
months := mh.Months()
|
||||||
if months[0] != 12 {
|
if months[0] != 12 {
|
||||||
t.Errorf("Monitor months's first day slot had the wrong value. Expected: %d Actual: %f", 12, months[0])
|
t.Errorf("Monitor months's first day slot had the wrong value. Expected: %d Actual: %d", 12, months[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
if numGoRoutinesStart != runtime.NumGoroutine() {
|
if numGoRoutinesStart != runtime.NumGoroutine() {
|
||||||
|
|
|
@ -1,56 +0,0 @@
|
||||||
package metrics
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.openprivacy.ca/openprivacy/libricochet-go/application"
|
|
||||||
"github.com/struCoder/pidusage"
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Monitors is a package of metrics for a Cwtch Server including message count, CPU, Mem, and conns
|
|
||||||
type Monitors struct {
|
|
||||||
MessageCounter Counter
|
|
||||||
Messages MonitorHistory
|
|
||||||
CPU MonitorHistory
|
|
||||||
Memory MonitorHistory
|
|
||||||
ClientListenConns MonitorHistory
|
|
||||||
breakChannel chan bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start initializes a Monitors's monitors
|
|
||||||
func (mp *Monitors) Start(ra *application.RicochetApplication) {
|
|
||||||
mp.breakChannel = make(chan bool)
|
|
||||||
mp.MessageCounter = NewCounter()
|
|
||||||
mp.Messages = NewMonitorHistory(func() (c float64) { c = float64(mp.MessageCounter.Count()); mp.MessageCounter.Reset(); return })
|
|
||||||
mp.CPU = NewMonitorHistory(func() float64 { sysInfo, _ := pidusage.GetStat(os.Getpid()); return float64(sysInfo.CPU) })
|
|
||||||
mp.Memory = NewMonitorHistory(func() float64 { sysInfo, _ := pidusage.GetStat(os.Getpid()); return float64(sysInfo.Memory) })
|
|
||||||
mp.ClientListenConns = NewMonitorHistory(func() float64 { return float64(ra.ConnectionCount()) })
|
|
||||||
|
|
||||||
// Todo: replace with proper reporting
|
|
||||||
go mp.log()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mp *Monitors) log() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-time.After(time.Minute):
|
|
||||||
messageMinutes := mp.Messages.Minutes()
|
|
||||||
cpuMinutes := mp.CPU.Minutes()
|
|
||||||
memoryMinutes := mp.Memory.Minutes()
|
|
||||||
listenConnsMinutes := mp.ClientListenConns.Minutes()
|
|
||||||
log.Printf("METRICS: Messages: %.0f ClientListenConns: %.0f CPU: %.2f Memory: %.0f\n", messageMinutes[0], listenConnsMinutes[0], cpuMinutes[0], memoryMinutes[0])
|
|
||||||
case <-mp.breakChannel:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop stops all the monitors in a Monitors
|
|
||||||
func (mp *Monitors) Stop() {
|
|
||||||
mp.breakChannel <- true
|
|
||||||
mp.Messages.Stop()
|
|
||||||
mp.CPU.Stop()
|
|
||||||
mp.Memory.Stop()
|
|
||||||
mp.ClientListenConns.Stop()
|
|
||||||
}
|
|
|
@ -3,7 +3,6 @@ package server
|
||||||
import (
|
import (
|
||||||
"cwtch.im/cwtch/server/fetch"
|
"cwtch.im/cwtch/server/fetch"
|
||||||
"cwtch.im/cwtch/server/listen"
|
"cwtch.im/cwtch/server/listen"
|
||||||
"cwtch.im/cwtch/server/metrics"
|
|
||||||
"cwtch.im/cwtch/server/send"
|
"cwtch.im/cwtch/server/send"
|
||||||
"cwtch.im/cwtch/storage"
|
"cwtch.im/cwtch/storage"
|
||||||
"git.openprivacy.ca/openprivacy/libricochet-go/application"
|
"git.openprivacy.ca/openprivacy/libricochet-go/application"
|
||||||
|
@ -15,7 +14,6 @@ import (
|
||||||
type Server struct {
|
type Server struct {
|
||||||
app *application.RicochetApplication
|
app *application.RicochetApplication
|
||||||
config *Config
|
config *Config
|
||||||
metricsPack metrics.Monitors
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run starts a server with the given privateKey
|
// Run starts a server with the given privateKey
|
||||||
|
@ -23,7 +21,6 @@ type Server struct {
|
||||||
func (s *Server) Run(serverConfig *Config) {
|
func (s *Server) Run(serverConfig *Config) {
|
||||||
s.config = serverConfig
|
s.config = serverConfig
|
||||||
cwtchserver := new(application.RicochetApplication)
|
cwtchserver := new(application.RicochetApplication)
|
||||||
s.metricsPack.Start(cwtchserver)
|
|
||||||
|
|
||||||
l, err := application.SetupOnion("127.0.0.1:9051", "tcp4", "", s.config.PrivateKey(), 9878)
|
l, err := application.SetupOnion("127.0.0.1:9051", "tcp4", "", s.config.PrivateKey(), 9878)
|
||||||
|
|
||||||
|
@ -34,8 +31,10 @@ func (s *Server) Run(serverConfig *Config) {
|
||||||
af := application.ApplicationInstanceFactory{}
|
af := application.ApplicationInstanceFactory{}
|
||||||
af.Init()
|
af.Init()
|
||||||
ms := new(storage.MessageStore)
|
ms := new(storage.MessageStore)
|
||||||
ms.Init("cwtch.messages", s.config.MaxBufferLines, s.metricsPack.MessageCounter)
|
ms.Init("cwtch.messages", s.config.MaxBufferLines)
|
||||||
af.AddHandler("im.cwtch.server.listen", func(rai *application.ApplicationInstance) func() channels.Handler {
|
af.AddHandler("im.cwtch.server.listen", func(rai *application.ApplicationInstance) func() channels.Handler {
|
||||||
|
si := new(Instance)
|
||||||
|
si.Init(rai, cwtchserver, ms)
|
||||||
return func() channels.Handler {
|
return func() channels.Handler {
|
||||||
cslc := new(listen.CwtchServerListenChannel)
|
cslc := new(listen.CwtchServerListenChannel)
|
||||||
return cslc
|
return cslc
|
||||||
|
@ -71,5 +70,4 @@ func (s *Server) Run(serverConfig *Config) {
|
||||||
// Shutdown kills the app closing all connections and freeing all goroutines
|
// Shutdown kills the app closing all connections and freeing all goroutines
|
||||||
func (s *Server) Shutdown() {
|
func (s *Server) Shutdown() {
|
||||||
s.app.Shutdown()
|
s.app.Shutdown()
|
||||||
s.metricsPack.Stop()
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"cwtch.im/cwtch/protocol"
|
"cwtch.im/cwtch/protocol"
|
||||||
"cwtch.im/cwtch/server/metrics"
|
|
||||||
"cwtch.im/cwtch/storage"
|
"cwtch.im/cwtch/storage"
|
||||||
"git.openprivacy.ca/openprivacy/libricochet-go/application"
|
"git.openprivacy.ca/openprivacy/libricochet-go/application"
|
||||||
"os"
|
"os"
|
||||||
|
@ -16,7 +15,7 @@ func TestServerInstance(t *testing.T) {
|
||||||
ra := new(application.RicochetApplication)
|
ra := new(application.RicochetApplication)
|
||||||
msi := new(storage.MessageStore)
|
msi := new(storage.MessageStore)
|
||||||
os.Remove("ms.test")
|
os.Remove("ms.test")
|
||||||
msi.Init("ms.test", 5, metrics.NewCounter())
|
msi.Init("ms.test", 5)
|
||||||
gm := protocol.GroupMessage{
|
gm := protocol.GroupMessage{
|
||||||
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here."),
|
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here."),
|
||||||
Spamguard: []byte{},
|
Spamguard: []byte{},
|
||||||
|
|
|
@ -3,7 +3,6 @@ package storage
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"cwtch.im/cwtch/protocol"
|
"cwtch.im/cwtch/protocol"
|
||||||
"cwtch.im/cwtch/server/metrics"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
@ -22,7 +21,6 @@ type MessageStore struct {
|
||||||
file *os.File
|
file *os.File
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
messages []*protocol.GroupMessage
|
messages []*protocol.GroupMessage
|
||||||
messageCounter metrics.Counter
|
|
||||||
bufferSize int
|
bufferSize int
|
||||||
pos int
|
pos int
|
||||||
rotated bool
|
rotated bool
|
||||||
|
@ -46,7 +44,7 @@ func (ms *MessageStore) updateBuffer(gm *protocol.GroupMessage) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init sets up a MessageStore of size bufferSize backed by filename
|
// Init sets up a MessageStore of size bufferSize backed by filename
|
||||||
func (ms *MessageStore) Init(filename string, bufferSize int, messageCounter metrics.Counter) {
|
func (ms *MessageStore) Init(filename string, bufferSize int) {
|
||||||
f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600)
|
f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
@ -56,7 +54,6 @@ func (ms *MessageStore) Init(filename string, bufferSize int, messageCounter met
|
||||||
ms.bufferSize = bufferSize
|
ms.bufferSize = bufferSize
|
||||||
ms.messages = make([]*protocol.GroupMessage, bufferSize)
|
ms.messages = make([]*protocol.GroupMessage, bufferSize)
|
||||||
ms.rotated = false
|
ms.rotated = false
|
||||||
ms.messageCounter = messageCounter
|
|
||||||
|
|
||||||
scanner := bufio.NewScanner(f)
|
scanner := bufio.NewScanner(f)
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
|
@ -92,7 +89,6 @@ func (ms *MessageStore) FetchMessages() (messages []*protocol.GroupMessage) {
|
||||||
|
|
||||||
// AddMessage adds a GroupMessage to the store
|
// AddMessage adds a GroupMessage to the store
|
||||||
func (ms *MessageStore) AddMessage(gm protocol.GroupMessage) {
|
func (ms *MessageStore) AddMessage(gm protocol.GroupMessage) {
|
||||||
ms.messageCounter.Add(1)
|
|
||||||
ms.lock.Lock()
|
ms.lock.Lock()
|
||||||
ms.updateBuffer(&gm)
|
ms.updateBuffer(&gm)
|
||||||
s, err := json.Marshal(gm)
|
s, err := json.Marshal(gm)
|
||||||
|
|
|
@ -2,7 +2,6 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"cwtch.im/cwtch/protocol"
|
"cwtch.im/cwtch/protocol"
|
||||||
"cwtch.im/cwtch/server/metrics"
|
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -11,8 +10,7 @@ import (
|
||||||
func TestMessageStore(t *testing.T) {
|
func TestMessageStore(t *testing.T) {
|
||||||
os.Remove("ms.test")
|
os.Remove("ms.test")
|
||||||
ms := new(MessageStore)
|
ms := new(MessageStore)
|
||||||
counter := metrics.NewCounter()
|
ms.Init("ms.test", 100000)
|
||||||
ms.Init("ms.test", 100000, counter)
|
|
||||||
for i := 0; i < 50000; i++ {
|
for i := 0; i < 50000; i++ {
|
||||||
gm := protocol.GroupMessage{
|
gm := protocol.GroupMessage{
|
||||||
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)),
|
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)),
|
||||||
|
@ -20,18 +18,13 @@ func TestMessageStore(t *testing.T) {
|
||||||
}
|
}
|
||||||
ms.AddMessage(gm)
|
ms.AddMessage(gm)
|
||||||
}
|
}
|
||||||
if counter.Count() != 50000 {
|
|
||||||
t.Errorf("Counter should be at 50000 was %v", counter.Count())
|
|
||||||
}
|
|
||||||
ms.Close()
|
ms.Close()
|
||||||
ms.Init("ms.test", 100000, counter)
|
ms.Init("ms.test", 100000)
|
||||||
m := ms.FetchMessages()
|
m := ms.FetchMessages()
|
||||||
if len(m) != 50000 {
|
if len(m) != 50000 {
|
||||||
t.Errorf("Should have been 50000 was %v", len(m))
|
t.Errorf("Should have been 5000 was %v", len(m))
|
||||||
}
|
}
|
||||||
|
|
||||||
counter.Reset()
|
|
||||||
|
|
||||||
for i := 0; i < 100000; i++ {
|
for i := 0; i < 100000; i++ {
|
||||||
gm := protocol.GroupMessage{
|
gm := protocol.GroupMessage{
|
||||||
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)),
|
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)),
|
||||||
|
|
|
@ -28,7 +28,6 @@ var (
|
||||||
carolLines = []string{"Howdy, thanks!"}
|
carolLines = []string{"Howdy, thanks!"}
|
||||||
)
|
)
|
||||||
|
|
||||||
// TODO: fix to load private key from server/app/serverConfig.json
|
|
||||||
func loadPrivateKey(t *testing.T) *rsa.PrivateKey {
|
func loadPrivateKey(t *testing.T) *rsa.PrivateKey {
|
||||||
if _, err := os.Stat(serverKeyfile); os.IsNotExist(err) {
|
if _, err := os.Stat(serverKeyfile); os.IsNotExist(err) {
|
||||||
return nil
|
return nil
|
||||||
|
@ -87,6 +86,7 @@ func serverCheck(t *testing.T, serverAddr string) bool {
|
||||||
func waitForPeerConnection(t *testing.T, peer peer.CwtchPeerInterface, server string) {
|
func waitForPeerConnection(t *testing.T, peer peer.CwtchPeerInterface, server string) {
|
||||||
for {
|
for {
|
||||||
servers := peer.GetServers()
|
servers := peer.GetServers()
|
||||||
|
fmt.Println(servers)
|
||||||
state, ok := servers[server]
|
state, ok := servers[server]
|
||||||
if ok {
|
if ok {
|
||||||
if state == connections.FAILED {
|
if state == connections.FAILED {
|
||||||
|
@ -298,8 +298,8 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
||||||
|
|
||||||
// ***** Verify Test *****
|
// ***** Verify Test *****
|
||||||
|
|
||||||
fmt.Println("Final syncing time...")
|
// final syncing time...
|
||||||
time.Sleep(time.Second * 30)
|
time.Sleep(time.Second * 15)
|
||||||
|
|
||||||
alicesGroup := alice.GetGroup(groupID)
|
alicesGroup := alice.GetGroup(groupID)
|
||||||
if alicesGroup == nil {
|
if alicesGroup == nil {
|
||||||
|
@ -333,18 +333,20 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
||||||
|
|
||||||
if len(alicesGroup.GetTimeline()) != 4 {
|
if len(alicesGroup.GetTimeline()) != 4 {
|
||||||
t.Errorf("Alice's timeline does not have all messages")
|
t.Errorf("Alice's timeline does not have all messages")
|
||||||
} else {
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(bobsGroup.GetTimeline()) != 6 {
|
||||||
|
t.Errorf("Bob's timeline does not have all messages")
|
||||||
|
}
|
||||||
|
|
||||||
// check message 0,1,2,3
|
// check message 0,1,2,3
|
||||||
aliceGroupTimeline := alicesGroup.GetTimeline()
|
aliceGroupTimeline := alicesGroup.GetTimeline()
|
||||||
if aliceGroupTimeline[0].Message != aliceLines[0] || aliceGroupTimeline[1].Message != bobLines[0] ||
|
if aliceGroupTimeline[0].Message != aliceLines[0] || aliceGroupTimeline[1].Message != bobLines[0] ||
|
||||||
aliceGroupTimeline[2].Message != aliceLines[1] || aliceGroupTimeline[3].Message != bobLines[1] {
|
aliceGroupTimeline[2].Message != aliceLines[1] || aliceGroupTimeline[3].Message != bobLines[1] {
|
||||||
t.Errorf("Some of Alice's timeline messages did not have the expected content!")
|
t.Errorf("Some of Alice's timeline messages did not have the expected content!")
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if len(bobsGroup.GetTimeline()) != 6 {
|
|
||||||
t.Errorf("Bob's timeline does not have all messages")
|
|
||||||
} else {
|
|
||||||
// check message 0,1,2,3,4,5
|
// check message 0,1,2,3,4,5
|
||||||
bobGroupTimeline := bobsGroup.GetTimeline()
|
bobGroupTimeline := bobsGroup.GetTimeline()
|
||||||
if bobGroupTimeline[0].Message != aliceLines[0] || bobGroupTimeline[1].Message != bobLines[0] ||
|
if bobGroupTimeline[0].Message != aliceLines[0] || bobGroupTimeline[1].Message != bobLines[0] ||
|
||||||
|
@ -352,11 +354,11 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
||||||
bobGroupTimeline[4].Message != bobLines[2] || bobGroupTimeline[5].Message != carolLines[0] {
|
bobGroupTimeline[4].Message != bobLines[2] || bobGroupTimeline[5].Message != carolLines[0] {
|
||||||
t.Errorf("Some of Bob's timeline messages did not have the expected content!")
|
t.Errorf("Some of Bob's timeline messages did not have the expected content!")
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if len(carolsGroup.GetTimeline()) != 6 {
|
if len(carolsGroup.GetTimeline()) != 6 {
|
||||||
t.Errorf("Carol's timeline does not have all messages")
|
t.Errorf("Carol's timeline does not have all messages")
|
||||||
} else {
|
}
|
||||||
|
|
||||||
// check message 0,1,2,3,4,5
|
// check message 0,1,2,3,4,5
|
||||||
carolGroupTimeline := carolsGroup.GetTimeline()
|
carolGroupTimeline := carolsGroup.GetTimeline()
|
||||||
if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] ||
|
if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] ||
|
||||||
|
@ -364,7 +366,6 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
||||||
carolGroupTimeline[4].Message != bobLines[2] || carolGroupTimeline[5].Message != carolLines[0] {
|
carolGroupTimeline[4].Message != bobLines[2] || carolGroupTimeline[5].Message != carolLines[0] {
|
||||||
t.Errorf("Some of Carol's timeline messages did not have the expected content!")
|
t.Errorf("Some of Carol's timeline messages did not have the expected content!")
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println("Shutting down Bob...")
|
fmt.Println("Shutting down Bob...")
|
||||||
bob.Shutdown()
|
bob.Shutdown()
|
||||||
|
|
Loading…
Reference in New Issue