From 488e2b00bf881b97bcc8e4bbe304845ff1d79a03 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 17 Apr 2018 11:39:16 -0400 Subject: [PATCH 01/10] Refactor the "block the connection on bandwidth" logic Right now, this patch just introduces and exposes some new functions. Later, these functions will get a little more complexity. --- src/or/connection.c | 49 ++++++++++++++++++++++++++++++++++++--------- src/or/connection.h | 4 ++++ src/or/main.c | 15 ++++++++------ src/or/or.h | 1 + 4 files changed, 53 insertions(+), 16 deletions(-) diff --git a/src/or/connection.c b/src/or/connection.c index 957398985..78befee0c 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -772,8 +772,8 @@ connection_close_immediate(connection_t *conn) connection_unregister_events(conn); /* Prevent the event from getting unblocked. */ - conn->read_blocked_on_bw = - conn->write_blocked_on_bw = 0; + conn->read_blocked_on_bw = 0; + conn->write_blocked_on_bw = 0; if (SOCKET_OK(conn->s)) tor_close_socket(conn->s); @@ -3052,9 +3052,37 @@ connection_buckets_decrement(connection_t *conn, time_t now, } } +/** + * Mark conn as needing to stop reading because bandwidth has been + * exhausted. If is_global_bw, it is closing because global bandwidth + * limit has been exhausted. Otherwise, it is closing because its own + * bandwidth limit has been exhausted. + */ +void +connection_read_bw_exhausted(connection_t *conn, bool is_global_bw) +{ + (void)is_global_bw; + conn->read_blocked_on_bw = 1; + connection_stop_reading(conn); +} + +/** + * Mark conn as needing to stop reading because write bandwidth has + * been exhausted. If is_global_bw, it is closing because global + * bandwidth limit has been exhausted. Otherwise, it is closing because its + * own bandwidth limit has been exhausted. 
+*/ +void +connection_write_bw_exhausted(connection_t *conn, bool is_global_bw) +{ + (void)is_global_bw; + conn->write_blocked_on_bw = 1; + connection_stop_reading(conn); +} + /** If we have exhausted our global buckets, or the buckets for conn, * stop reading. */ -static void +void connection_consider_empty_read_buckets(connection_t *conn) { const char *reason; @@ -3062,6 +3090,7 @@ connection_consider_empty_read_buckets(connection_t *conn) if (!connection_is_rate_limited(conn)) return; /* Always okay. */ + bool is_global = true; if (token_bucket_rw_get_read(&global_bucket) <= 0) { reason = "global read bucket exhausted. Pausing."; } else if (connection_counts_as_relayed_traffic(conn, approx_time()) && @@ -3071,17 +3100,17 @@ connection_consider_empty_read_buckets(connection_t *conn) conn->state == OR_CONN_STATE_OPEN && token_bucket_rw_get_read(&TO_OR_CONN(conn)->bucket) <= 0) { reason = "connection read bucket exhausted. Pausing."; + is_global = false; } else return; /* all good, no need to stop it */ LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason)); - conn->read_blocked_on_bw = 1; - connection_stop_reading(conn); + connection_read_bw_exhausted(conn, is_global); } /** If we have exhausted our global buckets, or the buckets for conn, * stop writing. */ -static void +void connection_consider_empty_write_buckets(connection_t *conn) { const char *reason; @@ -3089,6 +3118,7 @@ connection_consider_empty_write_buckets(connection_t *conn) if (!connection_is_rate_limited(conn)) return; /* Always okay. */ + bool is_global = true; if (token_bucket_rw_get_write(&global_bucket) <= 0) { reason = "global write bucket exhausted. Pausing."; } else if (connection_counts_as_relayed_traffic(conn, approx_time()) && @@ -3098,12 +3128,12 @@ connection_consider_empty_write_buckets(connection_t *conn) conn->state == OR_CONN_STATE_OPEN && token_bucket_rw_get_write(&TO_OR_CONN(conn)->bucket) <= 0) { reason = "connection write bucket exhausted. 
Pausing."; + is_global = false; } else return; /* all good, no need to stop it */ LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason)); - conn->write_blocked_on_bw = 1; - connection_stop_writing(conn); + connection_write_bw_exhausted(conn, is_global); } /** Initialize the global buckets to the values configured in the @@ -3768,8 +3798,7 @@ connection_handle_write_impl(connection_t *conn, int force) /* Make sure to avoid a loop if the receive buckets are empty. */ log_debug(LD_NET,"wanted read."); if (!connection_is_reading(conn)) { - connection_stop_writing(conn); - conn->write_blocked_on_bw = 1; + connection_write_bw_exhausted(conn, true); /* we'll start reading again when we get more tokens in our * read bucket; then we'll start writing again too. */ diff --git a/src/or/connection.h b/src/or/connection.h index cfe31c372..83e2bd543 100644 --- a/src/or/connection.h +++ b/src/or/connection.h @@ -125,6 +125,10 @@ void connection_bucket_init(void); void connection_bucket_adjust(const or_options_t *options); void connection_bucket_refill(time_t now, uint32_t now_ts); +void connection_read_bw_exhausted(connection_t *conn, bool is_global_bw); +void connection_write_bw_exhausted(connection_t *conn, bool is_global_bw); +void connection_consider_empty_read_buckets(connection_t *conn); +void connection_consider_empty_write_buckets(connection_t *conn); int connection_handle_read(connection_t *conn); diff --git a/src/or/main.c b/src/or/main.c index a852d3273..e21ef24f8 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -1025,19 +1025,22 @@ conn_close_if_marked(int i) * busy Libevent loops where we keep ending up here and returning * 0 until we are no longer blocked on bandwidth. 
*/ - if (connection_is_writing(conn)) { - conn->write_blocked_on_bw = 1; - connection_stop_writing(conn); + connection_consider_empty_read_buckets(conn); + connection_consider_empty_write_buckets(conn); + + /* Make sure that consider_empty_buckets really disabled the + * connection: */ + if (BUG(connection_is_writing(conn))) { + connection_write_bw_exhausted(conn, true); } - if (connection_is_reading(conn)) { + if (BUG(connection_is_reading(conn))) { /* XXXX+ We should make this code unreachable; if a connection is * marked for close and flushing, there is no point in reading to it * at all. Further, checking at this point is a bit of a hack: it * would make much more sense to react in * connection_handle_read_impl, or to just stop reading in * mark_and_flush */ - conn->read_blocked_on_bw = 1; - connection_stop_reading(conn); + connection_read_bw_exhausted(conn, true/* kludge. */); } } return 0; diff --git a/src/or/or.h b/src/or/or.h index c5a039e93..e27f25197 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -57,6 +57,7 @@ #ifdef HAVE_TIME_H #include <time.h> #endif +#include <stdbool.h> #ifdef _WIN32 #include <winsock2.h> From 993f5d284d3a61b7bc397ad3671dc0ebd44b891b Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 17 Apr 2018 11:42:14 -0400 Subject: [PATCH 02/10] Rename connection_bucket_round_robin -> get_share There was nothing round_robinish about this function. --- src/or/connection.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/or/connection.c b/src/or/connection.c index 78befee0c..1e308e9b4 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -2845,14 +2845,14 @@ connection_counts_as_relayed_traffic(connection_t *conn, time_t now) * write many of them or just a few; and conn_bucket (if * non-negative) provides an upper limit for our answer. 
*/ static ssize_t -connection_bucket_round_robin(int base, int priority, - ssize_t global_bucket_val, ssize_t conn_bucket) +connection_bucket_get_share(int base, int priority, + ssize_t global_bucket_val, ssize_t conn_bucket) { ssize_t at_most; ssize_t num_bytes_high = (priority ? 32 : 16) * base; ssize_t num_bytes_low = (priority ? 4 : 2) * base; - /* Do a rudimentary round-robin so one circuit can't hog a connection. + /* Do a rudimentary limiting so one circuit can't hog a connection. * Pick at most 32 cells, at least 4 cells if possible, and if we're in * the middle pick 1/8 of the available bandwidth. */ at_most = global_bucket_val / 8; @@ -2899,8 +2899,8 @@ connection_bucket_read_limit(connection_t *conn, time_t now) global_bucket_val = MIN(global_bucket_val, relayed); } - return connection_bucket_round_robin(base, priority, - global_bucket_val, conn_bucket); + return connection_bucket_get_share(base, priority, + global_bucket_val, conn_bucket); } /** How many bytes at most can we write onto this connection? 
*/ @@ -2931,8 +2931,8 @@ connection_bucket_write_limit(connection_t *conn, time_t now) global_bucket_val = MIN(global_bucket_val, relayed); } - return connection_bucket_round_robin(base, priority, - global_bucket_val, conn_bucket); + return connection_bucket_get_share(base, priority, + global_bucket_val, conn_bucket); } /** Return 1 if the global write buckets are low enough that we From 1356d51af62f839bb29eb150e6cb37edcef24d4f Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 17 Apr 2018 11:46:23 -0400 Subject: [PATCH 03/10] Rename connection_bucket_refill to connection_bucket_refill_all Also document its actual behavior --- src/or/connection.c | 5 +++-- src/or/connection.h | 4 ++-- src/or/main.c | 4 ++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/or/connection.c b/src/or/connection.c index 1e308e9b4..631d665af 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -3178,9 +3178,10 @@ connection_bucket_adjust(const or_options_t *options) } } -/** Time has passed; increment buckets appropriately. */ +/** Time has passed; increment buckets appropriately and re-enable formerly + * blocked connections. 
*/ void -connection_bucket_refill(time_t now, uint32_t now_ts) +connection_bucket_refill_all(time_t now, uint32_t now_ts) { smartlist_t *conns = get_connection_array(); diff --git a/src/or/connection.h b/src/or/connection.h index 83e2bd543..a2dce2435 100644 --- a/src/or/connection.h +++ b/src/or/connection.h @@ -123,8 +123,8 @@ ssize_t connection_bucket_write_limit(connection_t *conn, time_t now); int global_write_bucket_low(connection_t *conn, size_t attempt, int priority); void connection_bucket_init(void); void connection_bucket_adjust(const or_options_t *options); -void connection_bucket_refill(time_t now, - uint32_t now_ts); +void connection_bucket_refill_all(time_t now, + uint32_t now_ts); void connection_read_bw_exhausted(connection_t *conn, bool is_global_bw); void connection_write_bw_exhausted(connection_t *conn, bool is_global_bw); void connection_consider_empty_read_buckets(connection_t *conn); diff --git a/src/or/main.c b/src/or/main.c index e21ef24f8..3217e2d9b 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -2407,8 +2407,8 @@ refill_callback(periodic_timer_t *timer, void *arg) accounting_add_bytes(bytes_read, bytes_written, seconds_rolled_over); if (milliseconds_elapsed > 0) { - connection_bucket_refill((time_t)now.tv_sec, - monotime_coarse_get_stamp()); + connection_bucket_refill_all((time_t)now.tv_sec, + monotime_coarse_get_stamp()); } stats_prev_global_read_bucket = token_bucket_rw_get_read(&global_bucket); From b36c450b572b561c615f0c6501664be17290318a Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 17 Apr 2018 12:02:49 -0400 Subject: [PATCH 04/10] Amend token_bucket_rw_dec to indicate which buckets became empty. 
--- src/common/token_bucket.c | 13 +++++++++---- src/common/token_bucket.h | 4 ++-- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/common/token_bucket.c b/src/common/token_bucket.c index 747189e75..f2396ec58 100644 --- a/src/common/token_bucket.c +++ b/src/common/token_bucket.c @@ -238,13 +238,18 @@ token_bucket_rw_dec_write(token_bucket_rw_t *bucket, /** * As token_bucket_rw_dec_read and token_bucket_rw_dec_write, in a single - * operation. + * operation. Return a bitmask of TB_READ and TB_WRITE to indicate + * which buckets became empty. */ -void +int token_bucket_rw_dec(token_bucket_rw_t *bucket, ssize_t n_read, ssize_t n_written) { - token_bucket_rw_dec_read(bucket, n_read); - token_bucket_rw_dec_write(bucket, n_written); + int flags = 0; + if (token_bucket_rw_dec_read(bucket, n_read)) + flags |= TB_READ; + if (token_bucket_rw_dec_write(bucket, n_written)) + flags |= TB_WRITE; + return flags; } diff --git a/src/common/token_bucket.h b/src/common/token_bucket.h index fb5d9fc60..0e7832e83 100644 --- a/src/common/token_bucket.h +++ b/src/common/token_bucket.h @@ -85,8 +85,8 @@ int token_bucket_rw_dec_read(token_bucket_rw_t *bucket, int token_bucket_rw_dec_write(token_bucket_rw_t *bucket, ssize_t n); -void token_bucket_rw_dec(token_bucket_rw_t *bucket, - ssize_t n_read, ssize_t n_written); +int token_bucket_rw_dec(token_bucket_rw_t *bucket, + ssize_t n_read, ssize_t n_written); static inline size_t token_bucket_rw_get_read(const token_bucket_rw_t *bucket); static inline size_t From 9af4cd6f31dd9cbfaee526f2042aa7fa501338ef Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 17 Apr 2018 12:05:39 -0400 Subject: [PATCH 05/10] Refactor responsibility for checking global write bucket emptiness We used to do this 10x per second in connection_bucket_refill_all(); instead, we now do it when the bucket becomes empty. This change is part of the work of making connection_bucket_refill_all() obsolete. Closes ticket 25828; bugfix on 0.2.3.5-alpha. 
--- changes/bug25828 | 7 +++++++ src/or/connection.c | 28 ++++++++++++++++------------ 2 files changed, 23 insertions(+), 12 deletions(-) create mode 100644 changes/bug25828 diff --git a/changes/bug25828 b/changes/bug25828 new file mode 100644 index 000000000..45cd1f4ae --- /dev/null +++ b/changes/bug25828 @@ -0,0 +1,7 @@ + o Minor bugfixes (bandwidth management): + - Consider ourselves "low on write bandwidth" if we have exhausted our + write bandwidth some time in the last second. This was the + documented behavior before, but the actual behavior was to change + this value every TokenBucketRefillInterval. Fixes bug 25828; bugfix on + 0.2.3.5-alpha. + diff --git a/src/or/connection.c b/src/or/connection.c index 631d665af..062c85c7f 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -2814,10 +2814,10 @@ connection_is_rate_limited(connection_t *conn) return 1; } -/** Did either global write bucket run dry last second? If so, - * we are likely to run dry again this second, so be stingy with the - * tokens we just put in. */ -static int write_buckets_empty_last_second = 0; +/** When was either global write bucket last empty? If this was recent, then + * we're probably low on bandwidth, and we should be stingy with our bandwidth + * usage. */ +static time_t write_buckets_last_empty_at = -100; /** How many seconds of no active local circuits will make the * connection revert to the "relayed" bandwidth class? 
*/ @@ -2969,8 +2969,11 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority) if (smaller_bucket < attempt) return 1; /* not enough space no matter the priority */ - if (write_buckets_empty_last_second) - return 1; /* we're already hitting our limits, no more please */ + { + const time_t diff = approx_time() - write_buckets_last_empty_at; + if (diff <= 1) + return 1; /* we're already hitting our limits, no more please */ + } if (priority == 1) { /* old-style v1 query */ /* Could we handle *two* of these requests within the next two seconds? */ @@ -3042,10 +3045,15 @@ connection_buckets_decrement(connection_t *conn, time_t now, if (!connection_is_rate_limited(conn)) return; /* local IPs are free */ + unsigned flags = 0; if (connection_counts_as_relayed_traffic(conn, now)) { - token_bucket_rw_dec(&global_relayed_bucket, num_read, num_written); + flags = token_bucket_rw_dec(&global_relayed_bucket, num_read, num_written); + } + flags |= token_bucket_rw_dec(&global_bucket, num_read, num_written); + + if (flags & TB_WRITE) { + write_buckets_last_empty_at = now; } - token_bucket_rw_dec(&global_bucket, num_read, num_written); if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) { or_connection_t *or_conn = TO_OR_CONN(conn); token_bucket_rw_dec(&or_conn->bucket, num_read, num_written); @@ -3185,10 +3193,6 @@ connection_bucket_refill_all(time_t now, uint32_t now_ts) { smartlist_t *conns = get_connection_array(); - write_buckets_empty_last_second = - token_bucket_rw_get_write(&global_bucket) <= 0 || - token_bucket_rw_get_write(&global_relayed_bucket) <= 0; - /* refill the global buckets */ token_bucket_rw_refill(&global_bucket, now_ts); token_bucket_rw_refill(&global_relayed_bucket, now_ts); From a2acb9b9e9f1a6e21625b2d77c2e7df4e35f3599 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 17 Apr 2018 12:20:06 -0400 Subject: [PATCH 06/10] Refill each token bucket at the last instant before reading/writing. 
(This patch does not yet eliminate the global refill callback; fortunately, bucket refilling is idempotent.) --- src/or/connection.c | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/src/or/connection.c b/src/or/connection.c index 062c85c7f..7dc4ecf5c 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -3186,6 +3186,34 @@ connection_bucket_adjust(const or_options_t *options) } } +/** + * Cached value of the last coarse-timestamp when we refilled the + * global buckets. + */ +static uint32_t last_refilled_global_buckets_ts=0; +/** + * Refill the token buckets for a single connection conn, and the + * global token buckets as appropriate. Requires that now_ts is + * the time in coarse timestamp units. + */ +static void +connection_bucket_refill_single(connection_t *conn, uint32_t now_ts) +{ + /* Note that we only check for equality here: the underlying + * token bucket functions can handle moving backwards in time if they + * need to. */ + if (now_ts != last_refilled_global_buckets_ts) { + token_bucket_rw_refill(&global_bucket, now_ts); + token_bucket_rw_refill(&global_relayed_bucket, now_ts); + last_refilled_global_buckets_ts = now_ts; + } + + if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) { + or_connection_t *or_conn = TO_OR_CONN(conn); + token_bucket_rw_refill(&or_conn->bucket, now_ts); + } +} + /** Time has passed; increment buckets appropriately and re-enable formerly * blocked connections. 
*/ void @@ -3196,6 +3224,7 @@ connection_bucket_refill_all(time_t now, uint32_t now_ts) /* refill the global buckets */ token_bucket_rw_refill(&global_bucket, now_ts); token_bucket_rw_refill(&global_relayed_bucket, now_ts); + last_refilled_global_buckets_ts = now_ts; /* refill the per-connection buckets */ SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) { @@ -3256,6 +3285,8 @@ connection_handle_read_impl(connection_t *conn) conn->timestamp_last_read_allowed = approx_time(); + connection_bucket_refill_single(conn, monotime_coarse_get_stamp()); + switch (conn->type) { case CONN_TYPE_OR_LISTENER: return connection_handle_listener_read(conn, CONN_TYPE_OR); @@ -3690,6 +3721,8 @@ connection_handle_write_impl(connection_t *conn, int force) conn->timestamp_last_write_allowed = now; + connection_bucket_refill_single(conn, monotime_coarse_get_stamp()); + /* Sometimes, "writable" means "connected". */ if (connection_state_is_connecting(conn)) { if (getsockopt(conn->s, SOL_SOCKET, SO_ERROR, (void*)&e, &len) < 0) { From 780d1b44cf24ad8ef321b99d8fc591f110456f98 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 17 Apr 2018 16:19:45 -0400 Subject: [PATCH 07/10] Move responsibility for recording read/written bytes Previously this was done as part of the refill callback, but there's no real reason to do it like that. Since we're trying to remove the refill callback completely, we can do this work as part of record_num_bytes_transferred_impl(), which already does quite a lot of this. 
--- src/or/connection.c | 20 +++++++++++++++++ src/or/hibernate.c | 2 +- src/or/main.c | 52 +++++++++++++++++---------------------------- src/or/main.h | 1 + 4 files changed, 41 insertions(+), 34 deletions(-) diff --git a/src/or/connection.c b/src/or/connection.c index 7dc4ecf5c..d80c68046 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -85,6 +85,7 @@ #include "ext_orport.h" #include "geoip.h" #include "main.h" +#include "hibernate.h" #include "hs_common.h" #include "hs_ident.h" #include "nodelist.h" @@ -2989,6 +2990,10 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority) return 0; } +/** When did we last tell the accounting subsystem about transmitted + * bandwidth? */ +static time_t last_recorded_accounting_at = 0; + /** Helper: adjusts our bandwidth history and informs the controller as * appropriate, given that we have just read num_read bytes and written * num_written bytes on conn. */ @@ -3019,6 +3024,20 @@ record_num_bytes_transferred_impl(connection_t *conn, } if (conn->type == CONN_TYPE_EXIT) rep_hist_note_exit_bytes(conn->port, num_written, num_read); + + /* Remember these bytes towards statistics. */ + stats_increment_bytes_read_and_written(num_read, num_written); + + /* Remember these bytes towards accounting. 
*/ + if (accounting_is_enabled(get_options())) { + if (now > last_recorded_accounting_at && last_recorded_accounting_at) { + accounting_add_bytes(num_read, num_written, + now - last_recorded_accounting_at); + } else { + accounting_add_bytes(num_read, num_written, 0); + } + last_recorded_accounting_at = now; + } } /** We just read num_read and wrote num_written bytes @@ -5196,6 +5215,7 @@ connection_free_all(void) tor_free(last_interface_ipv4); tor_free(last_interface_ipv6); + last_recorded_accounting_at = 0; } /** Log a warning, and possibly emit a control event, that received came diff --git a/src/or/hibernate.c b/src/or/hibernate.c index 7261cf800..9fed33855 100644 --- a/src/or/hibernate.c +++ b/src/or/hibernate.c @@ -297,7 +297,7 @@ accounting_get_end_time,(void)) return interval_end_time; } -/** Called from main.c to tell us that seconds seconds have +/** Called from connection.c to tell us that seconds seconds have * passed, n_read bytes have been read, and n_written * bytes have been written. */ void diff --git a/src/or/main.c b/src/or/main.c index 3217e2d9b..66c90d9a0 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -159,13 +159,6 @@ token_bucket_rw_t global_bucket; /* Token bucket for relayed traffic. */ token_bucket_rw_t global_relayed_bucket; -/** What was the read/write bucket before the last second_elapsed_callback() - * call? (used to determine how many bytes we've read). */ -static size_t stats_prev_global_read_bucket; -/** What was the write bucket before the last second_elapsed_callback() call? - * (used to determine how many bytes we've written). */ -static size_t stats_prev_global_write_bucket; - /* DOCDOC stats_prev_n_read */ static uint64_t stats_prev_n_read = 0; /* DOCDOC stats_prev_n_written */ @@ -479,21 +472,37 @@ get_connection_array, (void)) return connection_array; } -/** Provides the traffic read and written over the life of the process. 
*/ - +/** + * Return the amount of network traffic read, in bytes, over the life of this + * process. + */ MOCK_IMPL(uint64_t, get_bytes_read,(void)) { return stats_n_bytes_read; } -/* DOCDOC get_bytes_written */ +/** + * Return the amount of network traffic written, in bytes, over the life of this + * process. + */ MOCK_IMPL(uint64_t, get_bytes_written,(void)) { return stats_n_bytes_written; } +/** + * Increment the amount of network traffic read and written, over the life of + * this process. + */ +void +stats_increment_bytes_read_and_written(uint64_t r, uint64_t w) +{ + stats_n_bytes_read += r; + stats_n_bytes_written += w; +} + /** Set the event mask on conn to events. (The event * mask is a bitmask whose bits are READ_EVENT and WRITE_EVENT) */ @@ -2374,12 +2383,7 @@ refill_callback(periodic_timer_t *timer, void *arg) { struct timeval now; - size_t bytes_written; - size_t bytes_read; int milliseconds_elapsed = 0; - int seconds_rolled_over = 0; - - const or_options_t *options = get_options(); (void)timer; (void)arg; @@ -2392,28 +2396,13 @@ refill_callback(periodic_timer_t *timer, void *arg) if (mdiff > INT_MAX) mdiff = INT_MAX; milliseconds_elapsed = (int)mdiff; - seconds_rolled_over = (int)(now.tv_sec - - refill_timer_current_millisecond.tv_sec); } - bytes_written = stats_prev_global_write_bucket - - token_bucket_rw_get_write(&global_bucket); - bytes_read = stats_prev_global_read_bucket - - token_bucket_rw_get_read(&global_bucket); - - stats_n_bytes_read += bytes_read; - stats_n_bytes_written += bytes_written; - if (accounting_is_enabled(options) && milliseconds_elapsed >= 0) - accounting_add_bytes(bytes_read, bytes_written, seconds_rolled_over); - if (milliseconds_elapsed > 0) { connection_bucket_refill_all((time_t)now.tv_sec, monotime_coarse_get_stamp()); } - stats_prev_global_read_bucket = token_bucket_rw_get_read(&global_bucket); - stats_prev_global_write_bucket = token_bucket_rw_get_write(&global_bucket); - /* remember what time it is, for next time */ 
refill_timer_current_millisecond = now; } @@ -2621,8 +2610,6 @@ do_main_loop(void) /* Set up our buckets */ connection_bucket_init(); - stats_prev_global_read_bucket = token_bucket_rw_get_read(&global_bucket); - stats_prev_global_write_bucket = token_bucket_rw_get_write(&global_bucket); /* initialize the bootstrap status events to know we're starting up */ control_event_bootstrap(BOOTSTRAP_STATUS_STARTING, 0); @@ -3502,7 +3489,6 @@ tor_free_all(int postfork) memset(&global_bucket, 0, sizeof(global_bucket)); memset(&global_relayed_bucket, 0, sizeof(global_relayed_bucket)); - stats_prev_global_read_bucket = stats_prev_global_write_bucket = 0; stats_prev_n_read = stats_prev_n_written = 0; stats_n_bytes_read = stats_n_bytes_written = 0; time_of_process_start = 0; diff --git a/src/or/main.h b/src/or/main.h index e50d14d4d..0d2681bda 100644 --- a/src/or/main.h +++ b/src/or/main.h @@ -28,6 +28,7 @@ int connection_is_on_closeable_list(connection_t *conn); MOCK_DECL(smartlist_t *, get_connection_array, (void)); MOCK_DECL(uint64_t,get_bytes_read,(void)); MOCK_DECL(uint64_t,get_bytes_written,(void)); +void stats_increment_bytes_read_and_written(uint64_t r, uint64_t w); /** Bitmask for events that we can turn on and off with * connection_watch_events. */ From 47df912f1c5a8783fb6edafc72b044ec0ae774ec Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 17 Apr 2018 18:20:03 -0400 Subject: [PATCH 08/10] Remove the periodic refill event entirely. Now that we update our buckets on demand before reading or writing, we no longer need to update them all every TokenBucketRefillInterval msec. When a connection runs out of bandwidth, we do need a way to reenable it, however. We do this by scheduling a timer to reenable all blocked connections for TokenBucketRefillInterval msec after a connection becomes blocked. (If we were using PerConnBWRate more, it might make sense to have a per-connection timer, rather than a single timeout. 
But since PerConnBWRate is currently (mostly) unused, I'm going to go for the simpler approach here, since usually whenever one connection has become blocked on bandwidth, most connections are blocked on bandwidth.) Implements ticket 25373. --- changes/bug25373 | 7 +++ doc/tor.1.txt | 8 +-- src/or/connection.c | 120 ++++++++++++++++++++++++++++---------------- src/or/main.c | 53 ------------------- 4 files changed, 88 insertions(+), 100 deletions(-) create mode 100644 changes/bug25373 diff --git a/changes/bug25373 b/changes/bug25373 new file mode 100644 index 000000000..03e870e69 --- /dev/null +++ b/changes/bug25373 @@ -0,0 +1,7 @@ + o Major features (main loop, CPU wakeup): + - The bandwidth-limitation logic has been refactored so that + bandwidth calculations are performed on-demand, rather than + every TokenBucketRefillInterval milliseconds. + This change should improve the granularity of our bandwidth + calculations, and limit the number of times that the Tor process needs + to wake up when it is idle. Closes ticket 25373. diff --git a/doc/tor.1.txt b/doc/tor.1.txt index 95620a334..1be9f7091 100644 --- a/doc/tor.1.txt +++ b/doc/tor.1.txt @@ -1285,9 +1285,11 @@ The following options are useful only for clients (that is, if 2 minutes) [[TokenBucketRefillInterval]] **TokenBucketRefillInterval** __NUM__ [**msec**|**second**]:: - Set the refill interval of Tor's token bucket to NUM milliseconds. - NUM must be between 1 and 1000, inclusive. Note that the configured - bandwidth limits are still expressed in bytes per second: this + Set the refill delay interval of Tor's token bucket to NUM milliseconds. + NUM must be between 1 and 1000, inclusive. When Tor is out of bandwidth, + on a connection or globally, it will wait up to this long before it tries + to use that connection again. 
+ Note that bandwidth limits are still expressed in bytes per second: this option only affects the frequency with which Tor checks to see whether previously exhausted connections may read again. Can not be changed while tor is running. (Default: 100 msec) diff --git a/src/or/connection.c b/src/or/connection.c index d80c68046..444294ee7 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -138,6 +138,8 @@ static const char *proxy_type_to_string(int proxy_type); static int get_proxy_type(void); const tor_addr_t *conn_get_outbound_address(sa_family_t family, const or_options_t *options, unsigned int conn_type); +static void blocked_connection_reenable_init(const or_options_t *options); +static void schedule_blocked_connection_reenable(void); /** The last addresses that our network interface seemed to have been * binding to. We use this as one way to detect when our IP changes. @@ -3091,6 +3093,7 @@ connection_read_bw_exhausted(connection_t *conn, bool is_global_bw) (void)is_global_bw; conn->read_blocked_on_bw = 1; connection_stop_reading(conn); + schedule_blocked_connection_reenable(); } /** @@ -3105,6 +3108,7 @@ connection_write_bw_exhausted(connection_t *conn, bool is_global_bw) (void)is_global_bw; conn->write_blocked_on_bw = 1; connection_stop_reading(conn); + schedule_blocked_connection_reenable(); } /** If we have exhausted our global buckets, or the buckets for conn, @@ -3117,7 +3121,8 @@ connection_consider_empty_read_buckets(connection_t *conn) if (!connection_is_rate_limited(conn)) return; /* Always okay. */ - bool is_global = true; + int is_global = 1; + if (token_bucket_rw_get_read(&global_bucket) <= 0) { reason = "global read bucket exhausted. Pausing."; } else if (connection_counts_as_relayed_traffic(conn, approx_time()) && @@ -3185,6 +3190,8 @@ connection_bucket_init(void) (int32_t)options->BandwidthBurst, now_ts); } + + blocked_connection_reenable_init(options); } /** Update the global connection bucket settings to a new value. 
*/ @@ -3233,55 +3240,76 @@ connection_bucket_refill_single(connection_t *conn, uint32_t now_ts) } } -/** Time has passed; increment buckets appropriately and re-enable formerly - * blocked connections. */ -void -connection_bucket_refill_all(time_t now, uint32_t now_ts) +/** + * Event to re-enable all connections that were previously blocked on read or + * write. + */ +static mainloop_event_t *reenable_blocked_connections_ev = NULL; + +/** True iff reenable_blocked_connections_ev is currently scheduled. */ +static int reenable_blocked_connections_is_scheduled = 0; + +/** Delay after which to run reenable_blocked_connections_ev. */ +static struct timeval reenable_blocked_connections_delay; + +/** + * Re-enable all connections that were previously blocked on read or write. + * This event is scheduled after enough time has elapsed to be sure + * that the buckets will refill when the connections have something to do. + */ +static void +reenable_blocked_connections_cb(mainloop_event_t *ev, void *arg) { - smartlist_t *conns = get_connection_array(); - - /* refill the global buckets */ - token_bucket_rw_refill(&global_bucket, now_ts); - token_bucket_rw_refill(&global_relayed_bucket, now_ts); - last_refilled_global_buckets_ts = now_ts; - - /* refill the per-connection buckets */ - SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) { - if (connection_speaks_cells(conn)) { - or_connection_t *or_conn = TO_OR_CONN(conn); - - if (conn->state == OR_CONN_STATE_OPEN) { - token_bucket_rw_refill(&or_conn->bucket, now_ts); - } - } - - if (conn->read_blocked_on_bw == 1 /* marked to turn reading back on now */ - && token_bucket_rw_get_read(&global_bucket) > 0 /* and we can read */ - && (!connection_counts_as_relayed_traffic(conn, now) || - token_bucket_rw_get_read(&global_relayed_bucket) > 0) - && (!connection_speaks_cells(conn) || - conn->state != OR_CONN_STATE_OPEN || - token_bucket_rw_get_read(&TO_OR_CONN(conn)->bucket) > 0)) { - /* and either a non-cell conn or a cell conn with 
non-empty bucket */ - LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET, - "waking up conn (fd %d) for read", (int)conn->s)); - conn->read_blocked_on_bw = 0; + (void)ev; + (void)arg; + SMARTLIST_FOREACH_BEGIN(get_connection_array(), connection_t *, conn) { + if (conn->read_blocked_on_bw == 1) { connection_start_reading(conn); + conn->read_blocked_on_bw = 0; } - - if (conn->write_blocked_on_bw == 1 - && token_bucket_rw_get_write(&global_bucket) > 0 /* and we can write */ - && (!connection_counts_as_relayed_traffic(conn, now) || - token_bucket_rw_get_write(&global_relayed_bucket) > 0) - && (!connection_speaks_cells(conn) || - conn->state != OR_CONN_STATE_OPEN || - token_bucket_rw_get_write(&TO_OR_CONN(conn)->bucket) > 0)) { - LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET, - "waking up conn (fd %d) for write", (int)conn->s)); - conn->write_blocked_on_bw = 0; + if (conn->write_blocked_on_bw == 1) { connection_start_writing(conn); + conn->write_blocked_on_bw = 0; } } SMARTLIST_FOREACH_END(conn); + + reenable_blocked_connections_is_scheduled = 0; +} + +/** + * Initialize the mainloop event that we use to wake up connections that + * find themselves blocked on bandwidth. + */ +static void +blocked_connection_reenable_init(const or_options_t *options) +{ + if (! reenable_blocked_connections_ev) { + reenable_blocked_connections_ev = + mainloop_event_new(reenable_blocked_connections_cb, NULL); + reenable_blocked_connections_is_scheduled = 0; + } + time_t sec = options->TokenBucketRefillInterval / 1000; + int msec = (options->TokenBucketRefillInterval % 1000); + reenable_blocked_connections_delay.tv_sec = sec; + reenable_blocked_connections_delay.tv_usec = msec * 1000; +} + +/** + * Called when we have blocked a connection for being low on bandwidth: + * schedule an event to reenable such connections, if it is not already + * scheduled. 
+ */ +static void +schedule_blocked_connection_reenable(void) +{ + if (reenable_blocked_connections_is_scheduled) + return; + if (BUG(reenable_blocked_connections_ev == NULL)) { + blocked_connection_reenable_init(get_options()); + } + mainloop_event_schedule(reenable_blocked_connections_ev, + &reenable_blocked_connections_delay); + reenable_blocked_connections_is_scheduled = 1; } /** Read bytes from conn-\>s and process them. @@ -5216,6 +5244,10 @@ connection_free_all(void) tor_free(last_interface_ipv4); tor_free(last_interface_ipv6); last_recorded_accounting_at = 0; + + mainloop_event_free(reenable_blocked_connections_ev); + reenable_blocked_connections_is_scheduled = 0; + memset(&reenable_blocked_connections_delay, 0, sizeof(struct timeval)); } /** Log a warning, and possibly emit a control event, that received came diff --git a/src/or/main.c b/src/or/main.c index 66c90d9a0..40ca8e059 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -2370,43 +2370,6 @@ systemd_watchdog_callback(periodic_timer_t *timer, void *arg) } #endif /* defined(HAVE_SYSTEMD_209) */ -/** Timer: used to invoke refill_callback(). */ -static periodic_timer_t *refill_timer = NULL; - -/** Millisecond when refall_callback was last invoked. */ -static struct timeval refill_timer_current_millisecond; - -/** Libevent callback: invoked periodically to refill token buckets - * and count r/w bytes. */ -static void -refill_callback(periodic_timer_t *timer, void *arg) -{ - struct timeval now; - - int milliseconds_elapsed = 0; - - (void)timer; - (void)arg; - - tor_gettimeofday(&now); - - /* If this is our first time, no time has passed. 
*/ - if (refill_timer_current_millisecond.tv_sec) { - long mdiff = tv_mdiff(&refill_timer_current_millisecond, &now); - if (mdiff > INT_MAX) - mdiff = INT_MAX; - milliseconds_elapsed = (int)mdiff; - } - - if (milliseconds_elapsed > 0) { - connection_bucket_refill_all((time_t)now.tv_sec, - monotime_coarse_get_stamp()); - } - - /* remember what time it is, for next time */ - refill_timer_current_millisecond = now; -} - #ifndef _WIN32 /** Called when a possibly ignorable libevent error occurs; ensures that we * don't get into an infinite loop by ignoring too many errors from @@ -2707,20 +2670,6 @@ do_main_loop(void) } #endif /* defined(HAVE_SYSTEMD_209) */ - if (!refill_timer) { - struct timeval refill_interval; - int msecs = get_options()->TokenBucketRefillInterval; - - refill_interval.tv_sec = msecs/1000; - refill_interval.tv_usec = (msecs%1000)*1000; - - refill_timer = periodic_timer_new(tor_libevent_get_base(), - &refill_interval, - refill_callback, - NULL); - tor_assert(refill_timer); - } - #ifdef HAVE_SYSTEMD { const int r = sd_notify(0, "READY=1"); @@ -3477,7 +3426,6 @@ tor_free_all(int postfork) smartlist_free(active_linked_connection_lst); periodic_timer_free(second_timer); teardown_periodic_events(); - periodic_timer_free(refill_timer); tor_event_free(shutdown_did_not_work_event); tor_event_free(initialize_periodic_events_event); mainloop_event_free(directory_all_unreachable_cb_event); @@ -3505,7 +3453,6 @@ tor_free_all(int postfork) heartbeat_callback_first_time = 1; n_libevent_errors = 0; current_second = 0; - memset(&refill_timer_current_millisecond, 0, sizeof(struct timeval)); if (!postfork) { release_lockfile(); From 087ace7009feff4259c5a150a38734c0ded90e80 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 17 Apr 2018 18:41:39 -0400 Subject: [PATCH 09/10] Fix a compilation warning on clang --- src/or/connection.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/or/connection.c b/src/or/connection.c index 444294ee7..b1a825f80 
100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -3034,7 +3034,7 @@ record_num_bytes_transferred_impl(connection_t *conn, if (accounting_is_enabled(get_options())) { if (now > last_recorded_accounting_at && last_recorded_accounting_at) { accounting_add_bytes(num_read, num_written, - now - last_recorded_accounting_at); + (int)(now - last_recorded_accounting_at)); } else { accounting_add_bytes(num_read, num_written, 0); } From bd3f8260a342b9eefc6fc164dc945605cf1ab0d1 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Wed, 18 Apr 2018 11:45:44 -0400 Subject: [PATCH 10/10] Rename some functions to start with a uniform prefix --- src/or/connection.c | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/or/connection.c b/src/or/connection.c index b1a825f80..addf29ac5 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -1,4 +1,4 @@ - /* Copyright (c) 2001 Matej Pfajfar. +/* Copyright (c) 2001 Matej Pfajfar. * Copyright (c) 2001-2004, Roger Dingledine. * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. * Copyright (c) 2007-2017, The Tor Project, Inc. */ @@ -138,8 +138,8 @@ static const char *proxy_type_to_string(int proxy_type); static int get_proxy_type(void); const tor_addr_t *conn_get_outbound_address(sa_family_t family, const or_options_t *options, unsigned int conn_type); -static void blocked_connection_reenable_init(const or_options_t *options); -static void schedule_blocked_connection_reenable(void); +static void reenable_blocked_connection_init(const or_options_t *options); +static void reenable_blocked_connection_schedule(void); /** The last addresses that our network interface seemed to have been * binding to. We use this as one way to detect when our IP changes. 
@@ -3093,7 +3093,7 @@ connection_read_bw_exhausted(connection_t *conn, bool is_global_bw) (void)is_global_bw; conn->read_blocked_on_bw = 1; connection_stop_reading(conn); - schedule_blocked_connection_reenable(); + reenable_blocked_connection_schedule(); } /** @@ -3108,7 +3108,7 @@ connection_write_bw_exhausted(connection_t *conn, bool is_global_bw) (void)is_global_bw; conn->write_blocked_on_bw = 1; connection_stop_reading(conn); - schedule_blocked_connection_reenable(); + reenable_blocked_connection_schedule(); } /** If we have exhausted our global buckets, or the buckets for conn, @@ -3191,7 +3191,7 @@ connection_bucket_init(void) now_ts); } - blocked_connection_reenable_init(options); + reenable_blocked_connection_init(options); } /** Update the global connection bucket settings to a new value. */ @@ -3281,7 +3281,7 @@ reenable_blocked_connections_cb(mainloop_event_t *ev, void *arg) * find themselves blocked on bandwidth. */ static void -blocked_connection_reenable_init(const or_options_t *options) +reenable_blocked_connection_init(const or_options_t *options) { if (! reenable_blocked_connections_ev) { reenable_blocked_connections_ev = @@ -3300,12 +3300,12 @@ blocked_connection_reenable_init(const or_options_t *options) * scheduled. */ static void -schedule_blocked_connection_reenable(void) +reenable_blocked_connection_schedule(void) { if (reenable_blocked_connections_is_scheduled) return; if (BUG(reenable_blocked_connections_ev == NULL)) { - blocked_connection_reenable_init(get_options()); + reenable_blocked_connection_init(get_options()); } mainloop_event_schedule(reenable_blocked_connections_ev, &reenable_blocked_connections_delay);