diff --git a/README.md b/README.md index bc83c57..72513b5 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Cwtch: Anonymous, Decentralized, Group Messaging +# Cwtch: Privacy Preserving Infrastructure for Asynchronous, Decentralized and Metadata Resistant Applications Communications metadata is known to be exploited by various adversaries to undermine the security of systems, to track victims and to conduct large scale social network analysis to feed mass surveillance. Metadata resistant tools are in their infancy and research into the construction and user experience of such tools is lacking. We present Cwtch, and extension of the metadata resistant protocol [Ricochet](https://ricochet.im) to support asynchronous, multi-peer group communications through the use of discardable, untrusted, anonymous infrastructure. diff --git a/connectivity/tor/tormanager.go b/connectivity/tor/tormanager.go index 04bb85e..c62fcc3 100644 --- a/connectivity/tor/tormanager.go +++ b/connectivity/tor/tormanager.go @@ -88,6 +88,9 @@ func (tm *Manager) TestConnection() error { if c != nil { c.Close() } + if err == nil { + return nil + } return fmt.Errorf("could not connect to Tor Control Port %v %v", tm.controlPort, err) } return errors.New(proxyStatusMessage(proxyStatus)) diff --git a/peer/connections/peerpeerconnection_test.go b/peer/connections/peerpeerconnection_test.go index 73ede7d..c03226b 100644 --- a/peer/connections/peerpeerconnection_test.go +++ b/peer/connections/peerpeerconnection_test.go @@ -85,7 +85,7 @@ func TestPeerPeerConnection(t *testing.T) { t.Errorf("Onion address error %v", err) } - profile := model.GenerateNewProfile("alice") + profile := model.GenerateNewProfile("sarah") ppc := NewPeerPeerConnection("127.0.0.1:5452|"+onionAddr, profile) //numcalls := 0 tp := new(TestPeer) diff --git a/server/app/serverConfig.json.sample b/server/app/serverConfig.json.sample index a6b2d13..985da45 100644 --- a/server/app/serverConfig.json.sample +++ b/server/app/serverConfig.json.sample @@ -1,7 +1,7 @@ { - "maxBufferLines": 100000, - "serverReporting": { - "reportingGroupId": "", - "reportingServerAddr": "" + "maxBufferLines": 100000 + serverReporting: { + reportingGroupId: "" + reportingServerAddr: "" } } diff --git a/server/metrics/metrics.go b/server/metrics/metrics.go index 47b81e2..13a53e9 100644 --- a/server/metrics/metrics.go +++ b/server/metrics/metrics.go @@ -13,10 +13,9 @@ type counter struct { // Counter providers a threadsafe counter to use for storing long running counts type Counter interface { - Add(unit int) - Reset() + Add(unit uint64) - Count() int + Count() uint64 GetStarttime() time.Time } @@ -27,18 +26,13 @@ func NewCounter() Counter { } // Add add a count of unit to the counter -func (c *counter) Add(unit int) { - atomic.AddUint64(&c.count, uint64(unit)) +func (c *counter) Add(unit uint64) { + atomic.AddUint64(&c.count, unit) } // Count returns the count since Start -func (c *counter) Count() int { - return int(atomic.LoadUint64(&c.count)) -} - -func (c *counter) Reset() { - atomic.StoreUint64(&c.count, 0) - c.startTime = time.Now() +func (c *counter) Count() uint64 { + return atomic.LoadUint64(&c.count) } // GetStarttime returns the starttime of the counter @@ -47,18 +41,14 @@ func (c *counter) GetStarttime() time.Time { } type monitorHistory struct { - starttime time.Time - perMinutePerHour [60]float64 - timeLastHourRotate time.Time - perHourForDay [24]float64 - timeLastDayRotate time.Time - perDayForWeek [7]float64 - timeLastWeekRotate time.Time - perWeekForMonth [4]float64 - timeLastMonthRotate time.Time - perMonthForYear [12]float64 + starttime time.Time + perMinutePerHour [60]int + perHourForDay [24]int + perDayForWeek [7]int + perWeekForMonth [4]int + perMonthForYear [12]int - monitor func() float64 + monitor func() int breakChannel chan bool lock sync.Mutex @@ -69,15 +59,15 @@ type MonitorHistory interface { Start() Stop() - Minutes() []float64 - Hours() []float64 - Days() []float64 - Weeks() []float64 - Months() []float64 + Minutes() []int + Hours() []int + Days() []int + Weeks() []int + Months() []int } // NewMonitorHistory returns a new MonitorHistory with starttime of time.Now and Started running with supplied monitor -func NewMonitorHistory(monitor func() float64) MonitorHistory { +func NewMonitorHistory(monitor func() int) MonitorHistory { mh := &monitorHistory{starttime: time.Now(), monitor: monitor, breakChannel: make(chan bool)} mh.Start() return mh @@ -94,32 +84,32 @@ func (mh *monitorHistory) Stop() { } // Minutes returns the last 60 minute monitoring results -func (mh *monitorHistory) Minutes() []float64 { +func (mh *monitorHistory) Minutes() []int { return mh.returnCopy(mh.perMinutePerHour[:]) } // Hours returns the last 24 hourly averages of monitor results -func (mh *monitorHistory) Hours() []float64 { +func (mh *monitorHistory) Hours() []int { return mh.returnCopy(mh.perHourForDay[:]) } // Days returns the last 7 day averages of monitor results -func (mh *monitorHistory) Days() []float64 { +func (mh *monitorHistory) Days() []int { return mh.returnCopy(mh.perDayForWeek[:]) } // Weeks returns the last 4 weeks of averages of monitor results -func (mh *monitorHistory) Weeks() []float64 { +func (mh *monitorHistory) Weeks() []int { return mh.returnCopy(mh.perWeekForMonth[:]) } // Months returns the last 12 months of averages of monitor results -func (mh *monitorHistory) Months() []float64 { +func (mh *monitorHistory) Months() []int { return mh.returnCopy(mh.perMonthForYear[:]) } -func (mh *monitorHistory) returnCopy(slice []float64) []float64 { - retSlice := make([]float64, len(slice)) +func (mh *monitorHistory) returnCopy(slice []int) []int { + retSlice := make([]int, len(slice)) mh.lock.Lock() for i, v := range slice { retSlice[i] = v @@ -128,22 +118,15 @@ func (mh *monitorHistory) returnCopy(slice []float64) []float64 { return retSlice } -func rotateAndAvg(array []float64, newVal float64) float64 { - total := float64(0.0) +func rotateAndAvg(array []int, newVal int) int { + total := 0 for i := len(array) - 1; i > 0; i-- { array[i] = array[i-1] total += array[i] } array[0] = newVal total += newVal - return total / float64(len(array)) -} -func average(array []float64) float64 { - total := float64(0) - for _, x := range array { - total += x - } - return total / float64(len(array)) + return total / len(array) } // monitorThread is the goroutine in a monitorHistory that does per minute monitoring and rotation @@ -156,26 +139,10 @@ func (mh *monitorHistory) monitorThread() { mh.lock.Lock() minuteAvg := rotateAndAvg(mh.perMinutePerHour[:], mh.monitor()) - - if time.Now().Sub(mh.timeLastHourRotate) > time.Hour { - rotateAndAvg(mh.perHourForDay[:], minuteAvg) - mh.timeLastHourRotate = time.Now() - } - - if time.Now().Sub(mh.timeLastDayRotate) > time.Hour*24 { - rotateAndAvg(mh.perDayForWeek[:], average(mh.perHourForDay[:])) - mh.timeLastDayRotate = time.Now() - } - - if time.Now().Sub(mh.timeLastWeekRotate) > time.Hour*24*7 { - rotateAndAvg(mh.perWeekForMonth[:], average(mh.perDayForWeek[:])) - mh.timeLastWeekRotate = time.Now() - } - - if time.Now().Sub(mh.timeLastMonthRotate) > time.Hour*24*7*4 { - rotateAndAvg(mh.perMonthForYear[:], average(mh.perWeekForMonth[:])) - mh.timeLastMonthRotate = time.Now() - } + hourAvg := rotateAndAvg(mh.perHourForDay[:], minuteAvg) + dayAvg := rotateAndAvg(mh.perDayForWeek[:], hourAvg) + weekAvg := rotateAndAvg(mh.perWeekForMonth[:], dayAvg) + rotateAndAvg(mh.perMonthForYear[:], weekAvg) mh.lock.Unlock() diff --git a/server/metrics/metrics_test.go b/server/metrics/metrics_test.go index 7ffabac..f6bec06 100644 --- a/server/metrics/metrics_test.go +++ b/server/metrics/metrics_test.go @@ -31,36 +31,36 @@ func TestCounter(t *testing.T) { } func TestMonitorHistory(t *testing.T) { - value := float64(60 * 24 * 7 * 4 * 12) + value := 60 * 24 * 7 * 4 * 12 numGoRoutinesStart := runtime.NumGoroutine() - mh := NewMonitorHistory(func() float64 { return value }) + mh := NewMonitorHistory(func() int { return value }) time.Sleep(1 * time.Second) mh.Stop() time.Sleep(1 * time.Second) minutes := mh.Minutes() if minutes[0] != value { - t.Errorf("Monitor minutes's first minutes slot had the wrong value. Expected: %f Actual: %f", value, minutes[0]) + t.Errorf("Monitor minutes's first minutes slot had the wrong value. Expected: %d Actual: %d", value, minutes[0]) } hours := mh.Hours() if hours[0] != value/60 { - t.Errorf("Monitor hour's first hour slot had the wrong value. Expected: %f Actual: %f", value, hours[0]) + t.Errorf("Monitor hour's first hour slot had the wrong value. Expected: %d Actual: %d", value, hours[0]) } days := mh.Days() if days[0] != value/60/24 { - t.Errorf("Monitor day's first day slot had the wrong value. Expected: %f Actual: %f", value/24, days[0]) + t.Errorf("Monitor day's first day slot had the wrong value. Expected: %d Actual: %d", value/24, days[0]) } weeks := mh.Weeks() if weeks[0] != value/60/24/7 { - t.Errorf("Monitor week's first day slot had the wrong value. Expected: %f Actual: %f", value/24/7, weeks[0]) + t.Errorf("Monitor week's first day slot had the wrong value. Expected: %d Actual: %d", value/24/7, weeks[0]) } months := mh.Months() if months[0] != 12 { - t.Errorf("Monitor months's first day slot had the wrong value. Expected: %d Actual: %f", 12, months[0]) + t.Errorf("Monitor months's first day slot had the wrong value. Expected: %d Actual: %d", 12, months[0]) } if numGoRoutinesStart != runtime.NumGoroutine() { diff --git a/server/metrics/monitors.go b/server/metrics/monitors.go deleted file mode 100644 index 72b0d02..0000000 --- a/server/metrics/monitors.go +++ /dev/null @@ -1,56 +0,0 @@ -package metrics - -import ( - "git.openprivacy.ca/openprivacy/libricochet-go/application" - "github.com/struCoder/pidusage" - "log" - "os" - "time" -) - -// Monitors is a package of metrics for a Cwtch Server including message count, CPU, Mem, and conns -type Monitors struct { - MessageCounter Counter - Messages MonitorHistory - CPU MonitorHistory - Memory MonitorHistory - ClientListenConns MonitorHistory - breakChannel chan bool -} - -// Start initializes a Monitors's monitors -func (mp *Monitors) Start(ra *application.RicochetApplication) { - mp.breakChannel = make(chan bool) - mp.MessageCounter = NewCounter() - mp.Messages = NewMonitorHistory(func() (c float64) { c = float64(mp.MessageCounter.Count()); mp.MessageCounter.Reset(); return }) - mp.CPU = NewMonitorHistory(func() float64 { sysInfo, _ := pidusage.GetStat(os.Getpid()); return float64(sysInfo.CPU) }) - mp.Memory = NewMonitorHistory(func() float64 { sysInfo, _ := pidusage.GetStat(os.Getpid()); return float64(sysInfo.Memory) }) - mp.ClientListenConns = NewMonitorHistory(func() float64 { return float64(ra.ConnectionCount()) }) - - // Todo: replace with proper reporting - go mp.log() -} - -func (mp *Monitors) log() { - for { - select { - case <-time.After(time.Minute): - messageMinutes := mp.Messages.Minutes() - cpuMinutes := mp.CPU.Minutes() - memoryMinutes := mp.Memory.Minutes() - listenConnsMinutes := mp.ClientListenConns.Minutes() - log.Printf("METRICS: Messages: %.0f ClientListenConns: %.0f CPU: %.2f Memory: %.0f\n", messageMinutes[0], listenConnsMinutes[0], cpuMinutes[0], memoryMinutes[0]) - case <-mp.breakChannel: - return - } - } -} - -// Stop stops all the monitors in a Monitors -func (mp *Monitors) Stop() { - mp.breakChannel <- true - mp.Messages.Stop() - mp.CPU.Stop() - mp.Memory.Stop() - mp.ClientListenConns.Stop() -} diff --git a/server/server.go b/server/server.go index e10825b..d79dd02 100644 --- a/server/server.go +++ b/server/server.go @@ -3,7 +3,6 @@ package server import ( "cwtch.im/cwtch/server/fetch" "cwtch.im/cwtch/server/listen" - "cwtch.im/cwtch/server/metrics" "cwtch.im/cwtch/server/send" "cwtch.im/cwtch/storage" "git.openprivacy.ca/openprivacy/libricochet-go/application" @@ -13,9 +12,8 @@ import ( // Server encapsulates a complete, compliant Cwtch server. type Server struct { - app *application.RicochetApplication - config *Config - metricsPack metrics.Monitors + app *application.RicochetApplication + config *Config } // Run starts a server with the given privateKey @@ -23,7 +21,6 @@ type Server struct { func (s *Server) Run(serverConfig *Config) { s.config = serverConfig cwtchserver := new(application.RicochetApplication) - s.metricsPack.Start(cwtchserver) l, err := application.SetupOnion("127.0.0.1:9051", "tcp4", "", s.config.PrivateKey(), 9878) @@ -34,8 +31,10 @@ func (s *Server) Run(serverConfig *Config) { af := application.ApplicationInstanceFactory{} af.Init() ms := new(storage.MessageStore) - ms.Init("cwtch.messages", s.config.MaxBufferLines, s.metricsPack.MessageCounter) + ms.Init("cwtch.messages", s.config.MaxBufferLines) af.AddHandler("im.cwtch.server.listen", func(rai *application.ApplicationInstance) func() channels.Handler { + si := new(Instance) + si.Init(rai, cwtchserver, ms) return func() channels.Handler { cslc := new(listen.CwtchServerListenChannel) return cslc @@ -71,5 +70,4 @@ func (s *Server) Run(serverConfig *Config) { // Shutdown kills the app closing all connections and freeing all goroutines func (s *Server) Shutdown() { s.app.Shutdown() - s.metricsPack.Stop() } diff --git a/server/server_instance_test.go b/server/server_instance_test.go index 14afb32..088fdb1 100644 --- a/server/server_instance_test.go +++ b/server/server_instance_test.go @@ -2,7 +2,6 @@ package server import ( "cwtch.im/cwtch/protocol" - "cwtch.im/cwtch/server/metrics" "cwtch.im/cwtch/storage" "git.openprivacy.ca/openprivacy/libricochet-go/application" "os" @@ -16,7 +15,7 @@ func TestServerInstance(t *testing.T) { ra := new(application.RicochetApplication) msi := new(storage.MessageStore) os.Remove("ms.test") - msi.Init("ms.test", 5, metrics.NewCounter()) + msi.Init("ms.test", 5) gm := protocol.GroupMessage{ Ciphertext: []byte("Hello this is a fairly average length message that we are writing here."), Spamguard: []byte{}, diff --git a/storage/message_store.go b/storage/message_store.go index 41ed151..7841d8a 100644 --- a/storage/message_store.go +++ b/storage/message_store.go @@ -3,7 +3,6 @@ package storage import ( "bufio" "cwtch.im/cwtch/protocol" - "cwtch.im/cwtch/server/metrics" "encoding/json" "fmt" "log" @@ -19,13 +18,12 @@ type MessageStoreInterface interface { // MessageStore is a file-backed implementation of MessageStoreInterface type MessageStore struct { - file *os.File - lock sync.Mutex - messages []*protocol.GroupMessage - messageCounter metrics.Counter - bufferSize int - pos int - rotated bool + file *os.File + lock sync.Mutex + messages []*protocol.GroupMessage + bufferSize int + pos int + rotated bool } // Close closes the message store and underlying resources. @@ -46,7 +44,7 @@ func (ms *MessageStore) updateBuffer(gm *protocol.GroupMessage) { } // Init sets up a MessageStore of size bufferSize backed by filename -func (ms *MessageStore) Init(filename string, bufferSize int, messageCounter metrics.Counter) { +func (ms *MessageStore) Init(filename string, bufferSize int) { f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600) if err != nil { panic(err) @@ -56,7 +54,6 @@ func (ms *MessageStore) Init(filename string, bufferSize int, messageCounter met ms.bufferSize = bufferSize ms.messages = make([]*protocol.GroupMessage, bufferSize) ms.rotated = false - ms.messageCounter = messageCounter scanner := bufio.NewScanner(f) for scanner.Scan() { @@ -92,7 +89,6 @@ func (ms *MessageStore) FetchMessages() (messages []*protocol.GroupMessage) { // AddMessage adds a GroupMessage to the store func (ms *MessageStore) AddMessage(gm protocol.GroupMessage) { - ms.messageCounter.Add(1) ms.lock.Lock() ms.updateBuffer(&gm) s, err := json.Marshal(gm) diff --git a/storage/message_store_test.go b/storage/message_store_test.go index da10bf9..e6b5407 100644 --- a/storage/message_store_test.go +++ b/storage/message_store_test.go @@ -2,7 +2,6 @@ package storage import ( "cwtch.im/cwtch/protocol" - "cwtch.im/cwtch/server/metrics" "os" "strconv" "testing" @@ -11,8 +10,7 @@ import ( func TestMessageStore(t *testing.T) { os.Remove("ms.test") ms := new(MessageStore) - counter := metrics.NewCounter() - ms.Init("ms.test", 100000, counter) + ms.Init("ms.test", 100000) for i := 0; i < 50000; i++ { gm := protocol.GroupMessage{ Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)), @@ -20,18 +18,13 @@ func TestMessageStore(t *testing.T) { } ms.AddMessage(gm) } - if counter.Count() != 50000 { - t.Errorf("Counter should be at 50000 was %v", counter.Count()) - } ms.Close() - ms.Init("ms.test", 100000, counter) + ms.Init("ms.test", 100000) m := ms.FetchMessages() if len(m) != 50000 { - t.Errorf("Should have been 50000 was %v", len(m)) + t.Errorf("Should have been 5000 was %v", len(m)) } - counter.Reset() - for i := 0; i < 100000; i++ { gm := protocol.GroupMessage{ Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)), diff --git a/testing/cwtch_peer_server_intergration_test.go b/testing/cwtch_peer_server_intergration_test.go index 0615de6..79b3456 100644 --- a/testing/cwtch_peer_server_intergration_test.go +++ b/testing/cwtch_peer_server_intergration_test.go @@ -28,7 +28,6 @@ var ( carolLines = []string{"Howdy, thanks!"} ) -// TODO: fix to load private key from server/app/serverConfig.json func loadPrivateKey(t *testing.T) *rsa.PrivateKey { if _, err := os.Stat(serverKeyfile); os.IsNotExist(err) { return nil @@ -87,6 +86,7 @@ func serverCheck(t *testing.T, serverAddr string) bool { func waitForPeerConnection(t *testing.T, peer peer.CwtchPeerInterface, server string) { for { servers := peer.GetServers() + fmt.Println(servers) state, ok := servers[server] if ok { if state == connections.FAILED { @@ -298,8 +298,8 @@ func TestCwtchPeerIntegration(t *testing.T) { // ***** Verify Test ***** - fmt.Println("Final syncing time...") - time.Sleep(time.Second * 30) + // final syncing time... + time.Sleep(time.Second * 15) alicesGroup := alice.GetGroup(groupID) if alicesGroup == nil { @@ -333,37 +333,38 @@ func TestCwtchPeerIntegration(t *testing.T) { if len(alicesGroup.GetTimeline()) != 4 { t.Errorf("Alice's timeline does not have all messages") - } else { - // check message 0,1,2,3 - aliceGroupTimeline := alicesGroup.GetTimeline() - if aliceGroupTimeline[0].Message != aliceLines[0] || aliceGroupTimeline[1].Message != bobLines[0] || - aliceGroupTimeline[2].Message != aliceLines[1] || aliceGroupTimeline[3].Message != bobLines[1] { - t.Errorf("Some of Alice's timeline messages did not have the expected content!") - } + return } if len(bobsGroup.GetTimeline()) != 6 { t.Errorf("Bob's timeline does not have all messages") - } else { - // check message 0,1,2,3,4,5 - bobGroupTimeline := bobsGroup.GetTimeline() - if bobGroupTimeline[0].Message != aliceLines[0] || bobGroupTimeline[1].Message != bobLines[0] || - bobGroupTimeline[2].Message != aliceLines[1] || bobGroupTimeline[3].Message != bobLines[1] || - bobGroupTimeline[4].Message != bobLines[2] || bobGroupTimeline[5].Message != carolLines[0] { - t.Errorf("Some of Bob's timeline messages did not have the expected content!") - } + } + + // check message 0,1,2,3 + aliceGroupTimeline := alicesGroup.GetTimeline() + if aliceGroupTimeline[0].Message != aliceLines[0] || aliceGroupTimeline[1].Message != bobLines[0] || + aliceGroupTimeline[2].Message != aliceLines[1] || aliceGroupTimeline[3].Message != bobLines[1] { + t.Errorf("Some of Alice's timeline messages did not have the expected content!") + } + + // check message 0,1,2,3,4,5 + bobGroupTimeline := bobsGroup.GetTimeline() + if bobGroupTimeline[0].Message != aliceLines[0] || bobGroupTimeline[1].Message != bobLines[0] || + bobGroupTimeline[2].Message != aliceLines[1] || bobGroupTimeline[3].Message != bobLines[1] || + bobGroupTimeline[4].Message != bobLines[2] || bobGroupTimeline[5].Message != carolLines[0] { + t.Errorf("Some of Bob's timeline messages did not have the expected content!") } if len(carolsGroup.GetTimeline()) != 6 { t.Errorf("Carol's timeline does not have all messages") - } else { - // check message 0,1,2,3,4,5 - carolGroupTimeline := carolsGroup.GetTimeline() - if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] || - carolGroupTimeline[2].Message != aliceLines[1] || carolGroupTimeline[3].Message != bobLines[1] || - carolGroupTimeline[4].Message != bobLines[2] || carolGroupTimeline[5].Message != carolLines[0] { - t.Errorf("Some of Carol's timeline messages did not have the expected content!") - } + } + + // check message 0,1,2,3,4,5 + carolGroupTimeline := carolsGroup.GetTimeline() + if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] || + carolGroupTimeline[2].Message != aliceLines[1] || carolGroupTimeline[3].Message != bobLines[1] || + carolGroupTimeline[4].Message != bobLines[2] || carolGroupTimeline[5].Message != carolLines[0] { + t.Errorf("Some of Carol's timeline messages did not have the expected content!") } fmt.Println("Shutting down Bob...")