Successful listen example
This commit is contained in:
parent
5bc2e90d25
commit
5cd6cc57ee
|
@ -2,6 +2,7 @@ package control
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
@ -106,6 +107,42 @@ func EventCodes() []EventCode {
|
|||
return ret
|
||||
}
|
||||
|
||||
// ErrEventWaitSynchronousResponseOccurred is returned from EventWait (see docs)
|
||||
var ErrEventWaitSynchronousResponseOccurred = errors.New("Synchronous event occurred during EventWait")
|
||||
|
||||
// EventWait waits for the predicate to be satisified or a non-event message to
|
||||
// come through. If a non-event comes through, the error
|
||||
// ErrEventWaitSynchronousResponseOccurred is returned. If there is an error in
|
||||
// the predicate or if the context completes or there is an error internally
|
||||
// handling the event, the error is returned. Otherwise, the event that true was
|
||||
// returned from the predicate for is returned.
|
||||
func (c *Conn) EventWait(
|
||||
ctx context.Context, events []EventCode, predicate func(Event) (bool, error),
|
||||
) (Event, error) {
|
||||
eventCh := make(chan Event, 10)
|
||||
defer close(eventCh)
|
||||
if err := c.AddEventListener(eventCh, events...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer c.RemoveEventListener(eventCh, events...)
|
||||
eventCtx, eventCancel := context.WithCancel(ctx)
|
||||
defer eventCancel()
|
||||
errCh := make(chan error, 1)
|
||||
go func() { errCh <- c.HandleEvents(eventCtx) }()
|
||||
for {
|
||||
select {
|
||||
case err := <-errCh:
|
||||
return nil, err
|
||||
case event := <-eventCh:
|
||||
if ok, err := predicate(event); err != nil {
|
||||
return nil, err
|
||||
} else if ok {
|
||||
return event, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// HandleEvents loops until the context is closed dispatching async events. Can
|
||||
// dispatch events even after context is done and of course during synchronous
|
||||
// request. This will always end with an error, either from ctx.Done() or from
|
||||
|
@ -159,18 +196,13 @@ func (c *Conn) HandleNextEvent() error {
|
|||
// this is essentially a no-op. The EventCodeUnrecognized event code can be used
|
||||
// to listen for unrecognized events.
|
||||
func (c *Conn) AddEventListener(ch chan<- Event, events ...EventCode) error {
|
||||
// TODO: do we want to set the local map first? Or do we want to lock on the net request too?
|
||||
c.eventListenersLock.Lock()
|
||||
for _, event := range events {
|
||||
// Must completely replace the array, never mutate it
|
||||
prevArr := c.eventListeners[event]
|
||||
newArr := make([]chan<- Event, len(prevArr)+1)
|
||||
copy(newArr, prevArr)
|
||||
newArr[len(newArr)-1] = ch
|
||||
c.eventListeners[event] = newArr
|
||||
c.addEventListenerToMap(ch, events...)
|
||||
// If there is an error updating the events, remove what we just added
|
||||
err := c.sendSetEvents()
|
||||
if err != nil {
|
||||
c.removeEventListenerFromMap(ch, events...)
|
||||
}
|
||||
c.eventListenersLock.Unlock()
|
||||
return c.sendSetEvents()
|
||||
return err
|
||||
}
|
||||
|
||||
// RemoveEventListener removes the given channel from being sent to by the given
|
||||
|
@ -179,7 +211,26 @@ func (c *Conn) AddEventListener(ch chan<- Event, events ...EventCode) error {
|
|||
// no longer be listened to. If no events are provided, this is essentially a
|
||||
// no-op.
|
||||
func (c *Conn) RemoveEventListener(ch chan<- Event, events ...EventCode) error {
|
||||
|
||||
return c.sendSetEvents()
|
||||
}
|
||||
|
||||
func (c *Conn) addEventListenerToMap(ch chan<- Event, events ...EventCode) {
|
||||
c.eventListenersLock.Lock()
|
||||
defer c.eventListenersLock.Unlock()
|
||||
for _, event := range events {
|
||||
// Must completely replace the array, never mutate it
|
||||
prevArr := c.eventListeners[event]
|
||||
newArr := make([]chan<- Event, len(prevArr)+1)
|
||||
copy(newArr, prevArr)
|
||||
newArr[len(newArr)-1] = ch
|
||||
c.eventListeners[event] = newArr
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Conn) removeEventListenerFromMap(ch chan<- Event, events ...EventCode) {
|
||||
c.eventListenersLock.Lock()
|
||||
defer c.eventListenersLock.Unlock()
|
||||
for _, event := range events {
|
||||
arr := c.eventListeners[event]
|
||||
index := -1
|
||||
|
@ -201,8 +252,6 @@ func (c *Conn) RemoveEventListener(ch chan<- Event, events ...EventCode) error {
|
|||
}
|
||||
}
|
||||
}
|
||||
c.eventListenersLock.Unlock()
|
||||
return c.sendSetEvents()
|
||||
}
|
||||
|
||||
func (c *Conn) sendSetEvents() error {
|
||||
|
|
|
@ -150,7 +150,7 @@ func (c *Conn) AddOnion(req *AddOnionRequest) (*AddOnionResponse, error) {
|
|||
if req.Key == nil {
|
||||
return nil, c.protoErr("Key required")
|
||||
}
|
||||
cmd := "ADDONION " + string(req.Key.Type()) + ":" + req.Key.Blob()
|
||||
cmd := "ADD_ONION " + string(req.Key.Type()) + ":" + req.Key.Blob()
|
||||
if len(req.Flags) > 0 {
|
||||
cmd += " Flags=" + strings.Join(req.Flags, ",")
|
||||
}
|
||||
|
@ -197,5 +197,5 @@ func (c *Conn) AddOnion(req *AddOnionRequest) (*AddOnionResponse, error) {
|
|||
|
||||
// DelOnion invokes DELONION.
|
||||
func (c *Conn) DelOnion(serviceID string) error {
|
||||
return c.sendRequestIgnoreResponse("DELONION %v", serviceID)
|
||||
return c.sendRequestIgnoreResponse("DEL_ONION %v", serviceID)
|
||||
}
|
||||
|
|
|
@ -1,25 +1,43 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/cretz/bine/tor"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// Start tor with default config
|
||||
t, err := tor.Start(nil)
|
||||
if err != nil {
|
||||
if err := run(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func run() error {
|
||||
// Start tor with default config (can set start conf's DebugWriter to os.Stdout for debug logs)
|
||||
fmt.Println("Starting and registering onion service, please wait a couple of minutes...")
|
||||
t, err := tor.Start(nil, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer t.Close()
|
||||
// Add a handler
|
||||
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Write([]byte("Hello, World!"))
|
||||
w.Write([]byte("Hello, Dark World!"))
|
||||
})
|
||||
// Wait at most a few minutes to publish the service
|
||||
listenCtx, listenCancel := context.WithTimeout(context.Background(), 3*time.Minute)
|
||||
defer listenCancel()
|
||||
// Create an onion service to listen on 8080 but show as 80
|
||||
l, err := t.Listen(&tor.OnionConf{Port: 80, TargetPort: 8080})
|
||||
onion, err := t.Listen(listenCtx, &tor.ListenConf{LocalPort: 8080, RemotePorts: []int{80}})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
return err
|
||||
}
|
||||
defer onion.Close()
|
||||
// Serve on HTTP
|
||||
log.Fatal(http.Serve(l, nil))
|
||||
fmt.Printf("Open Tor browser and navigate to http://%v.onion\n", onion.ID)
|
||||
return http.Serve(onion, nil)
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package tor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto"
|
||||
"crypto/rsa"
|
||||
"fmt"
|
||||
|
@ -103,14 +104,22 @@ type ListenConf struct {
|
|||
// maximum number of streams is exceeded. If true, the circuit is closed. If
|
||||
// false, the stream is simply not connected but the circuit stays open.
|
||||
MaxStreamsCloseCircuit bool
|
||||
|
||||
// NoWait if true will not wait until the onion service is built. If false,
|
||||
// the network will be enabled if it's not and then we will wait until
|
||||
// the onion service is built.
|
||||
NoWait bool
|
||||
}
|
||||
|
||||
// Listen creates an onion service and local listener. If conf is nil, the
|
||||
// default struct value is used. Note, if this errors, any listeners created
|
||||
// here are closed but if a LocalListener is provided it may remain open.
|
||||
func (t *Tor) Listen(conf *ListenConf) (*OnionService, error) {
|
||||
// Listen creates an onion service and local listener. The context can be nil.
|
||||
// If conf is nil, the default struct value is used. Note, if this errors, any
|
||||
// listeners created here are closed but if a LocalListener is provided it may remain open.
|
||||
func (t *Tor) Listen(ctx context.Context, conf *ListenConf) (*OnionService, error) {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
// Create the service up here and make sure we close it no matter the error within
|
||||
svc := &OnionService{Key: conf.Key, CloseLocalListenerOnClose: conf.LocalListener == nil}
|
||||
svc := &OnionService{Tor: t, Key: conf.Key, CloseLocalListenerOnClose: conf.LocalListener == nil}
|
||||
var err error
|
||||
|
||||
// Create the local listener if necessary
|
||||
|
@ -184,7 +193,7 @@ func (t *Tor) Listen(conf *ListenConf) (*OnionService, error) {
|
|||
if _, ok := svc.LocalListener.(*net.UnixListener); ok {
|
||||
localAddr = "unix:" + localAddr
|
||||
}
|
||||
for _, remotePort := range conf.RemotePorts {
|
||||
for _, remotePort := range svc.RemotePorts {
|
||||
req.Ports = append(req.Ports, &control.KeyVal{Key: strconv.Itoa(remotePort), Val: localAddr})
|
||||
}
|
||||
}
|
||||
|
@ -220,6 +229,41 @@ func (t *Tor) Listen(conf *ListenConf) (*OnionService, error) {
|
|||
}
|
||||
}
|
||||
|
||||
// Wait if necessary
|
||||
if err == nil && !conf.NoWait {
|
||||
t.Debugf("Enabling network before waiting for publication")
|
||||
// First make sure network is enabled
|
||||
if err = t.EnableNetwork(ctx, true); err == nil {
|
||||
t.Debugf("Waiting for publication")
|
||||
// Now we'll take a similar approach to Stem. Several UPLOADs are sent out, so we count em. If we see
|
||||
// UPLOADED, we succeeded. If we see failed, we count those. If there are as many failures as uploads, they
|
||||
// all failed and it's a failure. NOTE: unlike Stem's comments that say they don't, we are actually seeing
|
||||
// the service IDs for UPLOADED so we don't keep a map.
|
||||
uploadsAttempted := 0
|
||||
failures := []string{}
|
||||
_, err = t.Control.EventWait(ctx, []control.EventCode{control.EventCodeHSDesc},
|
||||
func(evt control.Event) (bool, error) {
|
||||
hs, _ := evt.(*control.HSDescEvent)
|
||||
if hs != nil && hs.Address == svc.ID {
|
||||
switch hs.Action {
|
||||
case "UPLOAD":
|
||||
uploadsAttempted++
|
||||
case "FAILED":
|
||||
failures = append(failures,
|
||||
fmt.Sprintf("Failed uploading to dir %v - reason: %v", hs.HSDir, hs.Reason))
|
||||
if len(failures) == uploadsAttempted {
|
||||
return false, fmt.Errorf("Failed all uploads, reasons: %v", failures)
|
||||
}
|
||||
case "UPLOADED":
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Give back err and close if there is an err
|
||||
if err != nil {
|
||||
if closeErr := svc.Close(); closeErr != nil {
|
||||
err = fmt.Errorf("Error on listen: %v (also got error trying to close: %v)", err, closeErr)
|
||||
|
@ -252,14 +296,11 @@ func (o *OnionService) String() string {
|
|||
// Close implements net.Listener.Close and deletes the onion service and closes
|
||||
// the LocalListener if CloseLocalListenerOnClose is true.
|
||||
func (o *OnionService) Close() (err error) {
|
||||
o.Tor.Debugf("Closing onion %v", o)
|
||||
// Delete the onion first
|
||||
if o.ID != "" {
|
||||
if o.Tor == nil {
|
||||
err = fmt.Errorf("No Tor object")
|
||||
} else {
|
||||
err = o.Tor.Control.DelOnion(o.ID)
|
||||
o.ID = ""
|
||||
}
|
||||
err = o.Tor.Control.DelOnion(o.ID)
|
||||
o.ID = ""
|
||||
}
|
||||
// Now if the local one needs to be closed, do it
|
||||
if o.CloseLocalListenerOnClose && o.LocalListener != nil {
|
||||
|
@ -272,5 +313,8 @@ func (o *OnionService) Close() (err error) {
|
|||
}
|
||||
o.LocalListener = nil
|
||||
}
|
||||
if err != nil {
|
||||
o.Tor.Debugf("Failed closing onion: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
39
tor/tor.go
39
tor/tor.go
|
@ -243,9 +243,46 @@ func (t *Tor) connectController(ctx context.Context, conf *StartConf) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// EnableNetwork sets DisableNetwork to 0 and optionally waits for bootstrap to
|
||||
// complete. The context can be nil. If DisableNetwork isnt 1, this does
|
||||
// nothing.
|
||||
func (t *Tor) EnableNetwork(ctx context.Context, wait bool) error {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
// Only enable if DisableNetwork is 1
|
||||
if vals, err := t.Control.GetConf("DisableNetwork"); err != nil {
|
||||
return err
|
||||
} else if len(vals) == 0 || vals[0].Key != "DisableNetwork" || vals[0].Val != "1" {
|
||||
return nil
|
||||
}
|
||||
// Enable the network
|
||||
if err := t.Control.SetConf(control.KeyVals("DisableNetwork", "0")...); err != nil {
|
||||
return nil
|
||||
}
|
||||
// If not waiting, leave
|
||||
if !wait {
|
||||
return nil
|
||||
}
|
||||
// Wait for progress to hit 100
|
||||
_, err := t.Control.EventWait(ctx, []control.EventCode{control.EventCodeStatusClient},
|
||||
func(evt control.Event) (bool, error) {
|
||||
if status, _ := evt.(*control.StatusEvent); status != nil && status.Action == "BOOTSTRAP" {
|
||||
if status.Severity == "NOTICE" && status.Arguments["PROGRESS"] == "100" {
|
||||
return true, nil
|
||||
} else if status.Severity != "NOTICE" {
|
||||
return false, fmt.Errorf("Failing bootstrapping, Tor warning: %v", status.Arguments["WARNING"])
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// Close sends a halt to the Tor process if it can, closes the controller
|
||||
// connection, and stops the process.
|
||||
func (t *Tor) Close() error {
|
||||
t.Debugf("Closing Tor")
|
||||
errs := []error{}
|
||||
// If controller is authenticated, send the quit signal to the process. Otherwise, just close the controller.
|
||||
sentHalt := false
|
||||
|
@ -295,7 +332,9 @@ func (t *Tor) Close() error {
|
|||
if len(errs) == 0 {
|
||||
return nil
|
||||
} else if len(errs) == 1 {
|
||||
t.Debugf("Error while closing Tor: %v", errs[0])
|
||||
return errs[0]
|
||||
}
|
||||
t.Debugf("Errors while closing Tor: %v", errs)
|
||||
return fmt.Errorf("Got %v errors while closing - %v", len(errs), errs)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue