diff --git a/.gitignore b/.gitignore index 92fd040..31e3ac6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,2 @@ -tor/ vendor/ .idea diff --git a/networks/tor/BaseOnionService.go b/networks/tor/BaseOnionService.go new file mode 100644 index 0000000..ec62a8b --- /dev/null +++ b/networks/tor/BaseOnionService.go @@ -0,0 +1,108 @@ +package tor + +import ( + "crypto/rand" + "cwtch.im/tapir" + "encoding/base64" + "errors" + "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" + "git.openprivacy.ca/openprivacy/libricochet-go/identity" + "git.openprivacy.ca/openprivacy/libricochet-go/log" + "golang.org/x/crypto/ed25519" + "sync" + "time" +) + +// BaseOnionService is a concrete implementation of the service interface over Tor onion services. +type BaseOnionService struct { + connections sync.Map + acn connectivity.ACN + id identity.Identity + privateKey ed25519.PrivateKey +} + +// Init initializes a BaseOnionService with a given private key and identity +// The private key is needed to initialize the Onion listen socket, ideally we could just pass an Identity in here. +func (s *BaseOnionService) Init(acn connectivity.ACN, sk ed25519.PrivateKey, id identity.Identity) { + // run add onion + // get listen context + s.acn = acn + s.id = id + s.privateKey = sk +} + +// WaitForCapabilityOrClose blocks until the connection has the given capability or the underlying connection is closed +// (through error or user action) +func (s *BaseOnionService) WaitForCapabilityOrClose(cid string, name string) (*tapir.Connection, error) { + conn, err := s.GetConnection(cid) + if err == nil { + for { + if conn.HasCapability(name) { + return conn, nil + } + if conn.Closed { + return nil, errors.New("connection is closed") + } + time.Sleep(time.Millisecond * 200) + } + } + return nil, err +} + +// GetConnection returns a connection for a given hostname. +func (s *BaseOnionService) GetConnection(connectionID string) (*tapir.Connection, error) { + conn, ok := s.connections.Load(connectionID) + if !ok { + return nil, errors.New("no connection found") + } + connection := conn.(*tapir.Connection) + return connection, nil +} + +// Connect initializes a new outbound connection to the given peer, using the defined Application +func (s *BaseOnionService) Connect(hostname string, app tapir.Application) (string, error) { + // connects to a remote server + // spins off to a connection struct + log.Debugf("Connecting to %v", hostname) + conn, _, err := s.acn.Open(hostname) + if err == nil { + connectionID := s.getNewConnectionID() + log.Debugf("Connected to %v [%v]", hostname, connectionID) + s.connections.Store(connectionID, tapir.NewConnection(s.id, hostname, true, conn, app.NewInstance())) + return connectionID, nil + } + log.Debugf("Error connecting to %v %v", hostname, err) + return "", err +} + +func (s *BaseOnionService) getNewConnectionID() string { + id := make([]byte, 10) + rand.Read(id) + connectionID := "connection-" + base64.StdEncoding.EncodeToString(id) + return connectionID +} + +// Listen starts a blocking routine that waits for incoming connections and initializes connections with them based +// on the given Application. +func (s *BaseOnionService) Listen(app tapir.Application) error { + // accepts a new connection + // spins off to a connection struct + ls, err := s.acn.Listen(s.privateKey, 9878) + log.Debugf("Starting a service on %v ", ls.AddressFull()) + if err == nil { + for { + conn, err := ls.Accept() + if err == nil { + tempHostname := s.getNewConnectionID() + log.Debugf("Accepted connection from %v", tempHostname) + s.connections.Store(tempHostname, tapir.NewConnection(s.id, tempHostname, false, conn, app.NewInstance())) + } else { + ls.Close() + log.Debugf("Error accepting connection %v", err) + return err + } + } + } + log.Debugf("Error listening to connection %v", err) + return err +} diff --git a/notifications/main.go b/notifications/main.go index a887869..c3cd057 100644 --- a/notifications/main.go +++ b/notifications/main.go @@ -53,7 +53,7 @@ func (nc *NotificationClient) Init(connection *tapir.Connection) { func (nc NotificationClient) Publish(topic string, message string) { log.Debugf("Sending Publish Request") hashedTopic := sha512.Sum512([]byte(topic)) - data, _ := json.Marshal(NotificationRequest{RequestType: "Publish", RequestData: map[string]string{"Topic": hex.EncodeToString(hashedTopic[:])}}) + data, _ := json.Marshal(notificationRequest{RequestType: "Publish", RequestData: map[string]string{"Topic": hex.EncodeToString(hashedTopic[:])}}) nc.connection.Send([]byte(data)) } @@ -62,7 +62,7 @@ func (nc NotificationClient) Publish(topic string, message string) { func (nc NotificationClient) Check(topic string) bool { log.Debugf("Sending Filter Request") // Get an updated bloom filter - data, _ := json.Marshal(NotificationRequest{RequestType: "BloomFilter", RequestData: map[string]string{}}) + data, _ := json.Marshal(notificationRequest{RequestType: "BloomFilter", RequestData: map[string]string{}}) nc.connection.Send(data) response := nc.connection.Expect() var bf primitives.BloomFilter @@ -103,7 +103,6 @@ func (ns NotificationsServer) Init(connection *tapir.Connection) { log.Debugf("Received Request %v", nr) switch nr.RequestType { case "Publish": - { log.Debugf("Received Publish Request") topic := nr.RequestData["Topic"] // message := nr.RequestData["Message"] @@ -111,13 +110,10 @@ func (ns NotificationsServer) Init(connection *tapir.Connection) { if err == nil { ns.Filter.Insert(topicID) } - } case "BloomFilter": - { log.Debugf("Received Filter Request") response, _ := json.Marshal(ns.Filter) connection.Send(response) - } } } }