From 3cada87147b7636f5a551f120ed927cc732911d4 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Wed, 5 Jun 2019 11:44:26 -0700 Subject: [PATCH] New Prototype --- cmd/main.go | 8 +- notifications/main.go | 178 ++++++++++++++++++++++++++++++++++++++++++ primitives/bloom.go | 60 ++++++++++++++ service.go | 70 +++++++++++++---- 4 files changed, 296 insertions(+), 20 deletions(-) create mode 100644 notifications/main.go create mode 100644 primitives/bloom.go diff --git a/cmd/main.go b/cmd/main.go index a4b1ddc..cf2ee34 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -50,13 +50,13 @@ func main() { pk := ed25519.PublicKey(pubkey) id := identity.InitializeV3("server", &sk, &pk) - // Start a Client to Connect to the Server + // Init a Client to Connect to the Server go client(acn, pubkey) - // Start the Server running the Simple App. + // Init the Server running the Simple App. var service tapir.Service service = new(tapir.BaseOnionService) - service.Start(acn, sk, id) + service.Init(acn, sk, id) service.Listen(SimpleApp{}) } @@ -68,7 +68,7 @@ func client(acn connectivity.ACN, key ed25519.PublicKey) { id := identity.InitializeV3("client", &sk, &pk) var client tapir.Service client = new(tapir.BaseOnionService) - client.Start(acn, sk, id) + client.Init(acn, sk, id) cid, _ := client.Connect(utils.GetTorV3Hostname(key), SimpleApp{}) // Once connected, it shouldn't take long to authenticate and run the application. So for the purposes of this demo diff --git a/notifications/main.go b/notifications/main.go new file mode 100644 index 0000000..de7fd2a --- /dev/null +++ b/notifications/main.go @@ -0,0 +1,178 @@ +package main + +import ( + "crypto/rand" + "crypto/sha512" + "cwtch.im/tapir" + "cwtch.im/tapir/primitives" + "encoding/hex" + "encoding/json" + "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" + "git.openprivacy.ca/openprivacy/libricochet-go/identity" + "git.openprivacy.ca/openprivacy/libricochet-go/log" + "git.openprivacy.ca/openprivacy/libricochet-go/utils" + "golang.org/x/crypto/ed25519" + "os" +) + +// This example implements a basic notification application which allows peers to notify each other of new messages without downloading +// the entire contents of the server. +// NOTE: Very Incomplete Prototype. + +// Notification contains a Topic string and a Message. +type Notification struct { + Topic string // A hex encoded string of the hash of the topic string + Message string +} + +type NotificationClient struct { + tapir.AuthApp + connection *tapir.Connection +} + +// NewInstance should always return a new instantiation of the application. +func (nc NotificationClient) NewInstance() tapir.Application { + app := new(NotificationClient) + return app +} + +// Init is run when the connection is first started. +func (nc * NotificationClient) Init(connection *tapir.Connection) { + // First run the Authentication App + nc.AuthApp.Init(connection) + if connection.HasCapability(tapir.AuthCapability) { + nc.connection = connection + } +} + +// Publish transforms the given topic string into a hashed ID, and sends the ID along with the message +// NOTE: Server learns the hash of the topic (and therefore can correlate repeated use of the same topic) +func (nc NotificationClient) Publish(topic string, message string) { + log.Debugf("Sending Publish Request") + hashedTopic := sha512.Sum512([]byte(topic)) + data,_ := json.Marshal(NotificationRequest{RequestType: "Publish", RequestData: map[string]string{"Topic": hex.EncodeToString(hashedTopic[:])}}) + nc.connection.Send([]byte(data)) +} + +// Check returns true if the server might have notifications related to the topic. +// This check reveals nothing about the topic to the server. +func (nc NotificationClient) Check(topic string) bool { + log.Debugf("Sending Filter Request") + // Get an updated bloom filter + data,_ := json.Marshal(NotificationRequest{RequestType: "BloomFilter", RequestData: map[string]string{}}) + nc.connection.Send(data) + response := nc.connection.Expect() + var bf primitives.BloomFilter + json.Unmarshal(response, &bf) + + // Check the topic handle in the bloom filter + hashedTopic := sha512.Sum512([]byte(topic)) + return bf.Check(hashedTopic[:]) +} + + +type NotificationRequest struct { + RequestType string + RequestData map[string]string +} + +// PIRApp is a trivial implementation of a basic p2p application +type NotificationsServer struct { + tapir.AuthApp + Filter * primitives.BloomFilter +} + +// NewInstance should always return a new instantiation of the application. +func (ns NotificationsServer) NewInstance() tapir.Application { + app := new(NotificationsServer) + app.Filter = ns.Filter + return app +} + +func (ns NotificationsServer) Init(connection *tapir.Connection) { + // First run the Authentication App + ns.AuthApp.Init(connection) + if connection.HasCapability(tapir.AuthCapability) { + for { + request := connection.Expect() + var nr NotificationRequest + json.Unmarshal(request, &nr) + log.Debugf("Received Request %v", nr) + switch nr.RequestType { + case "Publish": + { + log.Debugf("Received Publish Request") + topic := nr.RequestData["Topic"] + // message := nr.RequestData["Message"] + topicID, err := hex.DecodeString(topic) + if err == nil { + ns.Filter.Insert(topicID) + } + } + case "BloomFilter": + { + log.Debugf("Received Filter Request") + response, _ := json.Marshal(ns.Filter) + connection.Send(response) + } + } + } + } +} + +func main() { + + log.SetLevel(log.LevelDebug) + + // Connect to Tor + var acn connectivity.ACN + acn, _ = connectivity.StartTor("./", "") + acn.WaitTillBootstrapped() + + // Generate Server Keys + pubkey, privateKey, _ := ed25519.GenerateKey(rand.Reader) + sk := ed25519.PrivateKey(privateKey) + pk := ed25519.PublicKey(pubkey) + id := identity.InitializeV3("server", &sk, &pk) + + // Init a Client to Connect to the Server + go client(acn, pubkey) + + // Init the Server running the Simple App. + var service tapir.Service + service = new(tapir.BaseOnionService) + service.Init(acn, sk, id) + bf := new(primitives.BloomFilter) + bf.Init(1024) + service.Listen(NotificationsServer{Filter:bf}) +} + +// Client will Connect and launch it's own Echo App goroutine. +func client(acn connectivity.ACN, key ed25519.PublicKey) { + pubkey, privateKey, _ := ed25519.GenerateKey(rand.Reader) + sk := ed25519.PrivateKey(privateKey) + pk := ed25519.PublicKey(pubkey) + id := identity.InitializeV3("client", &sk, &pk) + var client tapir.Service + client = new(tapir.BaseOnionService) + client.Init(acn, sk, id) + + cid, _ := client.Connect(utils.GetTorV3Hostname(key), new(NotificationClient)) + + + conn, err := client.WaitForCapabilityOrClose(cid, tapir.AuthCapability) + if err == nil { + log.Debugf("Client has Auth: %v", conn.HasCapability(tapir.AuthCapability)) + nc := conn.App.(*NotificationClient) + + // Basic Demonstration of Notification + log.Infof("Checking #astronomy: %v", nc.Check("#astronomy")) + log.Infof("Publishing to #astronomy: %v", nc.Check("#astronomy")) + nc.Publish("#astronomy", "New #Astronomy Post!") + log.Infof("Checking #astronomy: %v", nc.Check("#astronomy")) + } + + + os.Exit(0) +} + diff --git a/primitives/bloom.go b/primitives/bloom.go new file mode 100644 index 0000000..e698086 --- /dev/null +++ b/primitives/bloom.go @@ -0,0 +1,60 @@ +package primitives + +import ( + "crypto/sha256" +) + +// BloomFilter implements a bloom filter +type BloomFilter struct { + B []bool +} + + +// Init constructs a bloom filter of size m +func (bf * BloomFilter) Init(m int) { + bf.B = make([]bool, m) +} + +// Hash transforms a message to a set of bit flips +// Supports up to m == 65535 +func (bf BloomFilter) Hash(msg []byte) []int { + hash := sha256.Sum256(msg) + + pos1a := (int(hash[0])+ int(hash[1]) + int(hash[2]) + int(hash[3])) % 0xFF + pos1b := (int(hash[4])+ int(hash[5]) + int(hash[6]) + int(hash[7])) % 0xFF + pos1 := ((pos1a <<8) + pos1b) & (0xFFFF % len(bf.B)) + + pos2a := (int(hash[8])+ int(hash[9]) + int(hash[10]) + int(hash[11])) % 0xFF + pos2b := (int(hash[12])+ int(hash[13]) + int(hash[14]) + int(hash[15])) % 0xFF + pos2 := ((pos2a <<8) + pos2b) & (0xFFFF % len(bf.B)) + + pos3a := (int(hash[16])+ int(hash[17]) + int(hash[18]) + int(hash[19])) % 0xFF + pos3b := (int(hash[20])+ int(hash[21]) + int(hash[22]) + int(hash[23])) % 0xFF + pos3:= ((pos3a <<8) + pos3b) & (0xFFFF % len(bf.B)) + + pos4a := (int(hash[24])+ int(hash[25]) + int(hash[26]) + int(hash[27])) % 0xFF + pos4b := (int(hash[28])+ int(hash[29]) + int(hash[30]) + int(hash[31])) % 0xFF + pos4:= ((pos4a <<8) + pos4b) & (0xFFFF % len(bf.B)) + + return []int{pos1, pos2, pos3, pos4} +} + +// Insert updates the BloomFilter +func (bf * BloomFilter) Insert(msg []byte) { + pos := bf.Hash(msg) + bf.B[pos[0]] = true + bf.B[pos[1]] = true + bf.B[pos[2]] = true + bf.B[pos[3]] = true +} + +// Check returns true if the messages might be in the BloomFilter +// (No false positives, possible false negatives due to the probabilistic nature of the filter) +func (bf BloomFilter) Check(msg []byte) bool { + pos := bf.Hash(msg) + if bf.B[pos[0]] && bf.B[pos[1]] && bf.B[pos[2]] && bf.B[pos[3]]{ + return true + } + return false +} + diff --git a/service.go b/service.go index 746fa0d..4ae4ed2 100644 --- a/service.go +++ b/service.go @@ -13,16 +13,22 @@ import ( "io" "net" "sync" + "time" ) // Service defines the interface for a Tapir Service type Service interface { - Start(acn connectivity.ACN, privateKey ed25519.PrivateKey, identity identity.Identity) + Init(acn connectivity.ACN, privateKey ed25519.PrivateKey, identity identity.Identity) Connect(hostname string, application Application) (string, error) Listen(application Application) error GetConnection(connectionID string) (*Connection, error) + WaitForCapabilityOrClose(connectionID string, capability string) (*Connection, error) } +// This is a fairly arbitrary number that very much depends on the application and the available bandwidth/server +// storage. +const MaxLength = 8192 + // Connection defines a Tapir Connection type Connection struct { hostname string @@ -30,9 +36,10 @@ type Connection struct { capabilities sync.Map encrypted bool key [32]byte - app Application + App Application ID identity.Identity Outbound bool + closed bool } // NewConnection creates a new Connection @@ -40,10 +47,10 @@ func NewConnection(id identity.Identity, hostname string, outbound bool, conn ne connection := new(Connection) connection.hostname = hostname connection.conn = conn - connection.app = app + connection.App = app connection.ID = id connection.Outbound = outbound - go connection.app.Init(connection) + go connection.App.Init(connection) return connection } @@ -65,16 +72,18 @@ func (c *Connection) HasCapability(name string) bool { return ok } + // Expect blocks and reads a single Tapir packet (1024 bytes), from the connection. func (c *Connection) Expect() []byte { - buffer := make([]byte, 1024) + buffer := make([]byte, MaxLength) n, err := io.ReadFull(c.conn, buffer) - if n != 1024 || err != nil { + if n != MaxLength || err != nil { log.Errorf("[%v -> %v] Wire Error Reading, Read %d bytes, Error: %v", c.hostname, c.ID.Hostname(), n, err) + c.conn.Close() + c.closed = true return []byte{} } - //log.Debugf("[%v -> %v] Wire Receive: %x", c.hostname, c.ID.Hostname(), buffer) if c.encrypted { var decryptNonce [24]byte @@ -84,10 +93,13 @@ func (c *Connection) Expect() []byte { copy(buffer, decrypted) } else { log.Errorf("[%v -> %v] Error Decrypting Message On Wire", c.hostname, c.ID.Hostname()) + c.conn.Close() + c.closed = true return []byte{} } } len, _ := binary.Uvarint(buffer[0:2]) + //log.Debugf("[%v -> %v] Wire Receive: (%d) %x", c.hostname, c.ID.Hostname(), len, buffer) return buffer[2 : len+2] } @@ -100,22 +112,27 @@ func (c *Connection) SetEncryptionKey(key [32]byte) { // Send writes a given message to a Tapir packet (of 1024 bytes in length). func (c *Connection) Send(message []byte) { - maxLength := 1024 - buffer := make([]byte, maxLength) + buffer := make([]byte, MaxLength) binary.PutUvarint(buffer[0:2], uint64(len(message))) copy(buffer[2:], message) if c.encrypted { var nonce [24]byte if _, err := io.ReadFull(rand.Reader, nonce[:]); err != nil { - panic(err) + // TODO: Surface is Erro + c.conn.Close() + c.closed = true } // MaxLength - 40 = MaxLength - 24 nonce bytes and 16 auth tag. - encrypted := secretbox.Seal(nonce[:], buffer[0:maxLength-40], &nonce, &c.key) - copy(buffer, encrypted[0:1024]) + encrypted := secretbox.Seal(nonce[:], buffer[0:MaxLength-40], &nonce, &c.key) + copy(buffer, encrypted[0:MaxLength]) + } + log.Debugf("[%v -> %v] Wire Send %x", c.ID.Hostname(), c.hostname, buffer) + _, err := c.conn.Write(buffer) + if err != nil { + c.conn.Close() + c.closed = true } - //log.Debugf("[%v -> %v] Wire Send %x", c.ID.Hostname(), c.hostname, buffer) - c.conn.Write(buffer) } // BaseOnionService is a concrete implementation of the service interface over Tor onion services. @@ -126,9 +143,9 @@ type BaseOnionService struct { privateKey ed25519.PrivateKey } -// Start initializes a BaseOnionService with a given private key and identity +// Init initializes a BaseOnionService with a given private key and identity // The private key is needed to initialize the Onion listen socket, ideally we could just pass an Identity in here. -func (s *BaseOnionService) Start(acn connectivity.ACN, sk ed25519.PrivateKey, id identity.Identity) { +func (s *BaseOnionService) Init(acn connectivity.ACN, sk ed25519.PrivateKey, id identity.Identity) { // run add onion // get listen context s.acn = acn @@ -136,6 +153,27 @@ func (s *BaseOnionService) Start(acn connectivity.ACN, sk ed25519.PrivateKey, id s.privateKey = sk } + + +// WaitForCapabilityOrClose blocks until the connection has the given capability or the underlying connection is closed +// (through error or user action) +func (s * BaseOnionService) WaitForCapabilityOrClose(cid string, name string) (*Connection, error) { + conn, err := s.GetConnection(cid) + if err == nil { + for { + if conn.HasCapability(name) { + return conn, nil + } + if conn.closed { + return nil, errors.New("connection is closed") + } + time.Sleep(time.Millisecond * 200) + } + } + return nil, err +} + + // GetConnection returns a connection for a given hostname. func (s *BaseOnionService) GetConnection(connectionID string) (*Connection, error) { conn, ok := s.connections.Load(connectionID)