From 60450e81babe16ccefc5c5bc83e18ea9c93e0782 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Mon, 10 Jun 2019 13:36:18 -0700 Subject: [PATCH] Moving towards a workable notifications prototype --- .gitignore | 1 + notifications/main.go | 60 +++++++++++++++++++++++++++++++------------ primitives/bloom.go | 4 +-- primitives/time.go | 20 +++++++++++++++ 4 files changed, 66 insertions(+), 19 deletions(-) create mode 100644 primitives/time.go diff --git a/.gitignore b/.gitignore index 31e3ac6..5089fe7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ vendor/ .idea +/tor/ diff --git a/notifications/main.go b/notifications/main.go index c3cd057..d83534e 100644 --- a/notifications/main.go +++ b/notifications/main.go @@ -1,6 +1,8 @@ package main import ( + "bytes" + "compress/gzip" "crypto/rand" "crypto/sha512" "cwtch.im/tapir" @@ -14,7 +16,9 @@ import ( "git.openprivacy.ca/openprivacy/libricochet-go/log" "git.openprivacy.ca/openprivacy/libricochet-go/utils" "golang.org/x/crypto/ed25519" + "io/ioutil" "os" + "time" ) // This example implements a basic notification application which allows peers to notify each other of new messages without downloading @@ -65,12 +69,14 @@ func (nc NotificationClient) Check(topic string) bool { data, _ := json.Marshal(notificationRequest{RequestType: "BloomFilter", RequestData: map[string]string{}}) nc.connection.Send(data) response := nc.connection.Expect() - var bf primitives.BloomFilter - json.Unmarshal(response, &bf) + var bf []primitives.BloomFilter + r, _ := gzip.NewReader(bytes.NewReader(response)) + bfb, _ := ioutil.ReadAll(r) + json.Unmarshal(bfb, &bf) // Check the topic handle in the bloom filter hashedTopic := sha512.Sum512([]byte(topic)) - return bf.Check(hashedTopic[:]) + return bf[time.Now().Hour()].Check(hashedTopic[:]) } type notificationRequest struct { @@ -81,16 +87,30 @@ type notificationRequest struct { // NotificationsServer implements the metadata resistant notifications server type NotificationsServer struct { applications.AuthApp - Filter *primitives.BloomFilter + Filter []*primitives.BloomFilter + timeProvider primitives.TimeProvider } +const DefaultNumberOfBuckets = 24 // 1 per hour of the day + // NewInstance should always return a new instantiation of the application. func (ns NotificationsServer) NewInstance() tapir.Application { app := new(NotificationsServer) - app.Filter = ns.Filter + + app.timeProvider = new(primitives.OSTimeProvider) + app.Filter = make([]*primitives.BloomFilter, DefaultNumberOfBuckets) + for i := range app.Filter { + app.Filter[i] = new(primitives.BloomFilter) + app.Filter[i].Init(1024) + } return app } +// Configure overrides the default parameters for the Notification Server +func (ns NotificationsServer) Configure(timeProvider primitives.TimeProvider) { + ns.timeProvider = timeProvider +} + // Init initializes the application. func (ns NotificationsServer) Init(connection *tapir.Connection) { // First run the Authentication App @@ -103,17 +123,22 @@ func (ns NotificationsServer) Init(connection *tapir.Connection) { log.Debugf("Received Request %v", nr) switch nr.RequestType { case "Publish": - log.Debugf("Received Publish Request") - topic := nr.RequestData["Topic"] - // message := nr.RequestData["Message"] - topicID, err := hex.DecodeString(topic) - if err == nil { - ns.Filter.Insert(topicID) - } + log.Debugf("Received Publish Request") + topic := nr.RequestData["Topic"] + // message := nr.RequestData["Message"] + topicID, err := hex.DecodeString(topic) + if err == nil { + currentBucket := ns.timeProvider.GetCurrentTime().Hour() + ns.Filter[currentBucket].Insert(topicID) + } case "BloomFilter": - log.Debugf("Received Filter Request") - response, _ := json.Marshal(ns.Filter) - connection.Send(response) + log.Debugf("Received Filter Request") + response, _ := json.Marshal(ns.Filter) + var b bytes.Buffer + w := gzip.NewWriter(&b) + w.Write(response) + w.Close() + connection.Send(b.Bytes()) } } } @@ -142,8 +167,9 @@ func main() { service = new(tor.BaseOnionService) service.Init(acn, sk, id) bf := new(primitives.BloomFilter) - bf.Init(1024) - service.Listen(NotificationsServer{Filter: bf}) + bf.Init(256) + var ns NotificationsServer + service.Listen(ns) } // Client will Connect and launch it's own Echo App goroutine. diff --git a/primitives/bloom.go b/primitives/bloom.go index 877180b..fe558a9 100644 --- a/primitives/bloom.go +++ b/primitives/bloom.go @@ -7,12 +7,12 @@ import ( // BloomFilter implements a bloom filter type BloomFilter struct { - B []bool + B []bool lock sync.Mutex } // Init constructs a bloom filter of size m -func (bf *BloomFilter) Init(m int) { +func (bf *BloomFilter) Init(m int16) { bf.B = make([]bool, m) } diff --git a/primitives/time.go b/primitives/time.go new file mode 100644 index 0000000..1956360 --- /dev/null +++ b/primitives/time.go @@ -0,0 +1,20 @@ +package primitives + +import ( + "time" +) + +// TimeProvider is an interface used by services to timestamp events. Why not just have them use time.Now()? We want +// to be able to write tests that simulate behavior over several hours, and thus having an interface to abstract away +// time details for the services is very useful. +type TimeProvider interface { + GetCurrentTime() time.Time +} + +// OSTimeProvider provides a wrapper around time provider which simply provides the time as given by the operating system. +type OSTimeProvider struct { +} + +func (ostp OSTimeProvider) GetCurrentTime() time.Time { + return time.Now() +}