Remove the periodic refill event entirely.

Now that we update our buckets on demand before reading or writing,
we no longer need to update them all every TokenBucketRefillInterval
msec.
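
In rough terms, the on-demand scheme looks like this.  (A minimal,
self-contained sketch, not the real token_bucket_rw code; the type
and helper names below are simplified for illustration.)

    #include <stdint.h>
    #include <stdio.h>

    /* Simplified single-direction bucket; the real token_bucket_rw_t
     * tracks read and write counts separately. */
    typedef struct {
      int64_t tokens;       /* bytes currently available */
      int64_t rate;         /* bytes added per second */
      int64_t burst;        /* maximum bucket size */
      uint64_t last_refill; /* timestamp (msec) of last refill */
    } bucket_t;

    /* Refill based on how much time has passed since the last refill,
     * instead of relying on a periodic timer to do it for us. */
    static void
    bucket_refill(bucket_t *b, uint64_t now_msec)
    {
      uint64_t elapsed = now_msec - b->last_refill;
      b->tokens += (int64_t)(elapsed * (uint64_t)b->rate / 1000);
      if (b->tokens > b->burst)
        b->tokens = b->burst;
      b->last_refill = now_msec;
    }

    /* Called right before we try to read: refill on demand, then say
     * how much we may read.  A return value of 0 means the bucket is
     * exhausted and the caller should block the connection. */
    static int64_t
    bucket_allowed_to_read(bucket_t *b, uint64_t now_msec, int64_t wanted)
    {
      bucket_refill(b, now_msec);
      if (b->tokens <= 0)
        return 0;
      return wanted < b->tokens ? wanted : b->tokens;
    }

    int
    main(void)
    {
      bucket_t b = { .tokens = 0, .rate = 1000, .burst = 4000,
                     .last_refill = 0 };
      /* 500 msec later we have earned 500 bytes of credit. */
      printf("%lld\n", (long long)bucket_allowed_to_read(&b, 500, 1024));
      return 0;
    }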

When a connection runs out of bandwidth, however, we still need a
way to reenable it.  We do this by scheduling a timer that reenables
all blocked connections TokenBucketRefillInterval msec after a
connection becomes blocked.
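
The scheduling pattern is roughly as follows.  (A sketch built on
plain libevent timers rather than our mainloop_event_t wrapper; the
bookkeeping names are invented for illustration, and the delay is
hard-coded instead of coming from TokenBucketRefillInterval.)

    #include <event2/event.h>
    #include <stdio.h>

    /* One shared one-shot timer, armed the first time any connection
     * blocks on bandwidth. */
    static struct event *reenable_ev = NULL;
    static int reenable_scheduled = 0;
    static const struct timeval reenable_delay = { 0, 100 * 1000 };

    static void
    reenable_cb(evutil_socket_t fd, short what, void *arg)
    {
      (void)fd; (void)what; (void)arg;
      /* The real callback walks the connection array and calls
       * connection_start_reading()/connection_start_writing() on each
       * connection whose read/write_blocked_on_bw flag is set. */
      printf("waking up all blocked connections\n");
      reenable_scheduled = 0;
    }

    static void
    schedule_reenable(struct event_base *base)
    {
      if (reenable_scheduled)
        return;                 /* already armed: nothing to do */
      if (!reenable_ev)
        reenable_ev = evtimer_new(base, reenable_cb, NULL);
      evtimer_add(reenable_ev, &reenable_delay);
      reenable_scheduled = 1;
    }

    int
    main(void)
    {
      struct event_base *base = event_base_new();
      schedule_reenable(base);   /* a connection just ran out of tokens */
      schedule_reenable(base);   /* second call is a no-op */
      event_base_dispatch(base); /* timer fires once, then the loop exits */
      event_free(reenable_ev);
      event_base_free(base);
      return 0;
    }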

(If we were using PerConnBWRate more, it might make sense to have a
per-connection timer, rather than a single timeout. But since
PerConnBWRate is currently (mostly) unused, I'm going to take the
simpler approach here: usually, whenever one connection has become
blocked on bandwidth, most connections are blocked on bandwidth.)

Implements ticket 25373.
Nick Mathewson 2018-04-17 18:20:03 -04:00
parent 780d1b44cf
commit 47df912f1c
4 changed files with 88 additions and 100 deletions

changes/bug25373 (new file)

@@ -0,0 +1,7 @@
o Major features (main loop, CPU wakeup):
- The bandwidth-limitation logic has been refactored so that
bandwidth calculations are performed on-demand, rather than
every TokenBucketRefillInterval milliseconds.
This change should improve the granularity of our bandwidth
calculations, and limit the number of times that the Tor process needs
to wake up when it is idle. Closes ticket 25373.


@@ -1285,9 +1285,11 @@ The following options are useful only for clients (that is, if
2 minutes)
[[TokenBucketRefillInterval]] **TokenBucketRefillInterval** __NUM__ [**msec**|**second**]::
Set the refill interval of Tor's token bucket to NUM milliseconds.
NUM must be between 1 and 1000, inclusive. Note that the configured
bandwidth limits are still expressed in bytes per second: this
Set the refill delay interval of Tor's token bucket to NUM milliseconds.
NUM must be between 1 and 1000, inclusive. When Tor is out of bandwidth,
on a connection or globally, it will wait up to this long before it tries
to use that connection again.
Note that bandwidth limits are still expressed in bytes per second: this
option only affects the frequency with which Tor checks to see whether
previously exhausted connections may read again.
Can not be changed while tor is running. (Default: 100 msec)
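
For example, a NUM of 250 msec turns into the same struct timeval
delay that the reenable timer is later scheduled with, mirroring the
arithmetic in the new blocked_connection_reenable_init() code below.
(A standalone illustration; the option value is hard-coded here
rather than parsed from a torrc.)

    #include <stdio.h>
    #include <sys/time.h>

    int
    main(void)
    {
      /* e.g. "TokenBucketRefillInterval 250 msec" in the torrc */
      int refill_interval_msec = 250;

      struct timeval delay;
      delay.tv_sec = refill_interval_msec / 1000;            /* 0 */
      delay.tv_usec = (refill_interval_msec % 1000) * 1000;  /* 250000 */

      printf("blocked connections are retried after %ld.%06ld s\n",
             (long)delay.tv_sec, (long)delay.tv_usec);
      return 0;
    }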


@@ -138,6 +138,8 @@ static const char *proxy_type_to_string(int proxy_type);
static int get_proxy_type(void);
const tor_addr_t *conn_get_outbound_address(sa_family_t family,
const or_options_t *options, unsigned int conn_type);
static void blocked_connection_reenable_init(const or_options_t *options);
static void schedule_blocked_connection_reenable(void);
/** The last addresses that our network interface seemed to have been
* binding to. We use this as one way to detect when our IP changes.
@@ -3091,6 +3093,7 @@ connection_read_bw_exhausted(connection_t *conn, bool is_global_bw)
(void)is_global_bw;
conn->read_blocked_on_bw = 1;
connection_stop_reading(conn);
schedule_blocked_connection_reenable();
}
/**
@@ -3105,6 +3108,7 @@ connection_write_bw_exhausted(connection_t *conn, bool is_global_bw)
(void)is_global_bw;
conn->write_blocked_on_bw = 1;
connection_stop_reading(conn);
schedule_blocked_connection_reenable();
}
/** If we have exhausted our global buckets, or the buckets for conn,
@@ -3117,7 +3121,8 @@ connection_consider_empty_read_buckets(connection_t *conn)
if (!connection_is_rate_limited(conn))
return; /* Always okay. */
bool is_global = true;
int is_global = 1;
if (token_bucket_rw_get_read(&global_bucket) <= 0) {
reason = "global read bucket exhausted. Pausing.";
} else if (connection_counts_as_relayed_traffic(conn, approx_time()) &&
@@ -3185,6 +3190,8 @@ connection_bucket_init(void)
(int32_t)options->BandwidthBurst,
now_ts);
}
blocked_connection_reenable_init(options);
}
/** Update the global connection bucket settings to a new value. */
@@ -3233,55 +3240,76 @@ connection_bucket_refill_single(connection_t *conn, uint32_t now_ts)
}
}
/** Time has passed; increment buckets appropriately and re-enable formerly
* blocked connections. */
void
connection_bucket_refill_all(time_t now, uint32_t now_ts)
/**
* Event to re-enable all connections that were previously blocked on read or
* write.
*/
static mainloop_event_t *reenable_blocked_connections_ev = NULL;
/** True iff reenable_blocked_connections_ev is currently scheduled. */
static int reenable_blocked_connections_is_scheduled = 0;
/** Delay after which to run reenable_blocked_connections_ev. */
static struct timeval reenable_blocked_connections_delay;
/**
* Re-enable all connections that were previously blocked on read or write.
* This event is scheduled after enough time has elapsed to be sure
* that the buckets will refill when the connections have something to do.
*/
static void
reenable_blocked_connections_cb(mainloop_event_t *ev, void *arg)
{
smartlist_t *conns = get_connection_array();
/* refill the global buckets */
token_bucket_rw_refill(&global_bucket, now_ts);
token_bucket_rw_refill(&global_relayed_bucket, now_ts);
last_refilled_global_buckets_ts = now_ts;
/* refill the per-connection buckets */
SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) {
if (connection_speaks_cells(conn)) {
or_connection_t *or_conn = TO_OR_CONN(conn);
if (conn->state == OR_CONN_STATE_OPEN) {
token_bucket_rw_refill(&or_conn->bucket, now_ts);
}
}
if (conn->read_blocked_on_bw == 1 /* marked to turn reading back on now */
&& token_bucket_rw_get_read(&global_bucket) > 0 /* and we can read */
&& (!connection_counts_as_relayed_traffic(conn, now) ||
token_bucket_rw_get_read(&global_relayed_bucket) > 0)
&& (!connection_speaks_cells(conn) ||
conn->state != OR_CONN_STATE_OPEN ||
token_bucket_rw_get_read(&TO_OR_CONN(conn)->bucket) > 0)) {
/* and either a non-cell conn or a cell conn with non-empty bucket */
LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
"waking up conn (fd %d) for read", (int)conn->s));
conn->read_blocked_on_bw = 0;
(void)ev;
(void)arg;
SMARTLIST_FOREACH_BEGIN(get_connection_array(), connection_t *, conn) {
if (conn->read_blocked_on_bw == 1) {
connection_start_reading(conn);
conn->read_blocked_on_bw = 0;
}
if (conn->write_blocked_on_bw == 1
&& token_bucket_rw_get_write(&global_bucket) > 0 /* and we can write */
&& (!connection_counts_as_relayed_traffic(conn, now) ||
token_bucket_rw_get_write(&global_relayed_bucket) > 0)
&& (!connection_speaks_cells(conn) ||
conn->state != OR_CONN_STATE_OPEN ||
token_bucket_rw_get_write(&TO_OR_CONN(conn)->bucket) > 0)) {
LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
"waking up conn (fd %d) for write", (int)conn->s));
conn->write_blocked_on_bw = 0;
if (conn->write_blocked_on_bw == 1) {
connection_start_writing(conn);
conn->write_blocked_on_bw = 0;
}
} SMARTLIST_FOREACH_END(conn);
reenable_blocked_connections_is_scheduled = 0;
}
/**
* Initialize the mainloop event that we use to wake up connections that
* find themselves blocked on bandwidth.
*/
static void
blocked_connection_reenable_init(const or_options_t *options)
{
if (! reenable_blocked_connections_ev) {
reenable_blocked_connections_ev =
mainloop_event_new(reenable_blocked_connections_cb, NULL);
reenable_blocked_connections_is_scheduled = 0;
}
time_t sec = options->TokenBucketRefillInterval / 1000;
int msec = (options->TokenBucketRefillInterval % 1000);
reenable_blocked_connections_delay.tv_sec = sec;
reenable_blocked_connections_delay.tv_usec = msec * 1000;
}
/**
* Called when we have blocked a connection for being low on bandwidth:
* schedule an event to reenable such connections, if it is not already
* scheduled.
*/
static void
schedule_blocked_connection_reenable(void)
{
if (reenable_blocked_connections_is_scheduled)
return;
if (BUG(reenable_blocked_connections_ev == NULL)) {
blocked_connection_reenable_init(get_options());
}
mainloop_event_schedule(reenable_blocked_connections_ev,
&reenable_blocked_connections_delay);
reenable_blocked_connections_is_scheduled = 1;
}
/** Read bytes from conn-\>s and process them.
@@ -5216,6 +5244,10 @@ connection_free_all(void)
tor_free(last_interface_ipv4);
tor_free(last_interface_ipv6);
last_recorded_accounting_at = 0;
mainloop_event_free(reenable_blocked_connections_ev);
reenable_blocked_connections_is_scheduled = 0;
memset(&reenable_blocked_connections_delay, 0, sizeof(struct timeval));
}
/** Log a warning, and possibly emit a control event, that <b>received</b> came


@@ -2370,43 +2370,6 @@ systemd_watchdog_callback(periodic_timer_t *timer, void *arg)
}
#endif /* defined(HAVE_SYSTEMD_209) */
/** Timer: used to invoke refill_callback(). */
static periodic_timer_t *refill_timer = NULL;
/** Millisecond when refall_callback was last invoked. */
static struct timeval refill_timer_current_millisecond;
/** Libevent callback: invoked periodically to refill token buckets
* and count r/w bytes. */
static void
refill_callback(periodic_timer_t *timer, void *arg)
{
struct timeval now;
int milliseconds_elapsed = 0;
(void)timer;
(void)arg;
tor_gettimeofday(&now);
/* If this is our first time, no time has passed. */
if (refill_timer_current_millisecond.tv_sec) {
long mdiff = tv_mdiff(&refill_timer_current_millisecond, &now);
if (mdiff > INT_MAX)
mdiff = INT_MAX;
milliseconds_elapsed = (int)mdiff;
}
if (milliseconds_elapsed > 0) {
connection_bucket_refill_all((time_t)now.tv_sec,
monotime_coarse_get_stamp());
}
/* remember what time it is, for next time */
refill_timer_current_millisecond = now;
}
#ifndef _WIN32
/** Called when a possibly ignorable libevent error occurs; ensures that we
* don't get into an infinite loop by ignoring too many errors from
@@ -2707,20 +2670,6 @@ do_main_loop(void)
}
#endif /* defined(HAVE_SYSTEMD_209) */
if (!refill_timer) {
struct timeval refill_interval;
int msecs = get_options()->TokenBucketRefillInterval;
refill_interval.tv_sec = msecs/1000;
refill_interval.tv_usec = (msecs%1000)*1000;
refill_timer = periodic_timer_new(tor_libevent_get_base(),
&refill_interval,
refill_callback,
NULL);
tor_assert(refill_timer);
}
#ifdef HAVE_SYSTEMD
{
const int r = sd_notify(0, "READY=1");
@@ -3477,7 +3426,6 @@ tor_free_all(int postfork)
smartlist_free(active_linked_connection_lst);
periodic_timer_free(second_timer);
teardown_periodic_events();
periodic_timer_free(refill_timer);
tor_event_free(shutdown_did_not_work_event);
tor_event_free(initialize_periodic_events_event);
mainloop_event_free(directory_all_unreachable_cb_event);
@@ -3505,7 +3453,6 @@ tor_free_all(int postfork)
heartbeat_callback_first_time = 1;
n_libevent_errors = 0;
current_second = 0;
memset(&refill_timer_current_millisecond, 0, sizeof(struct timeval));
if (!postfork) {
release_lockfile();