From 16f08de0fd85b9fe8ace9f4905190fa6dc27e4ea Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 10 Apr 2018 12:16:21 -0400 Subject: [PATCH 01/11] Remove TestingEnableTbEmptyEvent This option was used for shadow testing previously, but is no longer used for anything. It interferes with refactoring our token buckets. --- changes/ticket25760 | 5 ++ doc/tor.1.txt | 6 -- src/or/config.c | 9 +- src/or/connection.c | 131 ------------------------------ src/or/connection.h | 7 -- src/or/control.c | 23 ------ src/or/control.h | 5 +- src/or/or.h | 10 --- src/test/test_controller_events.c | 75 ----------------- src/test/test_options.c | 10 --- 10 files changed, 7 insertions(+), 274 deletions(-) create mode 100644 changes/ticket25760 diff --git a/changes/ticket25760 b/changes/ticket25760 new file mode 100644 index 000000000..504fd60de --- /dev/null +++ b/changes/ticket25760 @@ -0,0 +1,5 @@ + o Removed features: + - The TestingEnableTbEmptyEvent option has been removed. It was used + in testing simulations to measure how often connection buckets were + emptied, in order to improve our scheduling, but it has not + been actively used in years. Closes ticket 25760. diff --git a/doc/tor.1.txt b/doc/tor.1.txt index 5f31344f6..c044a765f 100644 --- a/doc/tor.1.txt +++ b/doc/tor.1.txt @@ -2896,7 +2896,6 @@ The following options are used for running a testing Tor network. TestingDirConnectionMaxStall 30 seconds TestingEnableConnBwEvent 1 TestingEnableCellStatsEvent 1 - TestingEnableTbEmptyEvent 1 [[TestingV3AuthInitialVotingInterval]] **TestingV3AuthInitialVotingInterval** __N__ **minutes**|**hours**:: Like V3AuthVotingInterval, but for initial voting interval before the first @@ -3034,11 +3033,6 @@ The following options are used for running a testing Tor network. events. Changing this requires that **TestingTorNetwork** is set. (Default: 0) -[[TestingEnableTbEmptyEvent]] **TestingEnableTbEmptyEvent** **0**|**1**:: - If this option is set, then Tor controllers may register for TB_EMPTY - events. Changing this requires that **TestingTorNetwork** is set. - (Default: 0) - [[TestingMinExitFlagThreshold]] **TestingMinExitFlagThreshold** __N__ **KBytes**|**MBytes**|**GBytes**|**TBytes**|**KBits**|**MBits**|**GBits**|**TBits**:: Sets a lower-bound for assigning an exit flag when running as an authority on a testing network. Overrides the usual default lower bound diff --git a/src/or/config.c b/src/or/config.c index 3c3fd46e6..71f8528b6 100644 --- a/src/or/config.c +++ b/src/or/config.c @@ -334,7 +334,7 @@ static config_var_t option_vars_[] = { V(DownloadExtraInfo, BOOL, "0"), V(TestingEnableConnBwEvent, BOOL, "0"), V(TestingEnableCellStatsEvent, BOOL, "0"), - V(TestingEnableTbEmptyEvent, BOOL, "0"), + OBSOLETE("TestingEnableTbEmptyEvent"), V(EnforceDistinctSubnets, BOOL, "1"), V(EntryNodes, ROUTERSET, NULL), V(EntryStatistics, BOOL, "0"), @@ -704,7 +704,6 @@ static const config_var_t testing_tor_network_defaults[] = { V(TestingDirConnectionMaxStall, INTERVAL, "30 seconds"), V(TestingEnableConnBwEvent, BOOL, "1"), V(TestingEnableCellStatsEvent, BOOL, "1"), - V(TestingEnableTbEmptyEvent, BOOL, "1"), VAR("___UsingTestNetworkDefaults", BOOL, UsingTestNetworkDefaults_, "1"), V(RendPostPeriod, INTERVAL, "2 minutes"), @@ -4499,12 +4498,6 @@ options_validate(or_options_t *old_options, or_options_t *options, "Tor networks!"); } - if (options->TestingEnableTbEmptyEvent && - !options->TestingTorNetwork && !options->UsingTestNetworkDefaults_) { - REJECT("TestingEnableTbEmptyEvent may only be changed in testing " - "Tor networks!"); - } - if (options->TestingTorNetwork) { log_warn(LD_CONFIG, "TestingTorNetwork is set. This will make your node " "almost unusable in the public Tor network, and is " diff --git a/src/or/connection.c b/src/or/connection.c index 5532551cf..e2fd196dd 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -3019,57 +3019,6 @@ record_num_bytes_transferred_impl(connection_t *conn, rep_hist_note_exit_bytes(conn->port, num_written, num_read); } -/** Helper: convert given tvnow time value to milliseconds since - * midnight. */ -static uint32_t -msec_since_midnight(const struct timeval *tvnow) -{ - return (uint32_t)(((tvnow->tv_sec % 86400L) * 1000L) + - ((uint32_t)tvnow->tv_usec / (uint32_t)1000L)); -} - -/** Helper: return the time in milliseconds since last_empty_time - * when a bucket ran empty that previously had tokens_before tokens - * now has tokens_after tokens after refilling at timestamp - * tvnow, capped at milliseconds_elapsed milliseconds since - * last refilling that bucket. Return 0 if the bucket has not been empty - * since the last refill or has not been refilled. */ -uint32_t -bucket_millis_empty(int tokens_before, uint32_t last_empty_time, - int tokens_after, int milliseconds_elapsed, - const struct timeval *tvnow) -{ - uint32_t result = 0, refilled; - if (tokens_before <= 0 && tokens_after > tokens_before) { - refilled = msec_since_midnight(tvnow); - result = (uint32_t)((refilled + 86400L * 1000L - last_empty_time) % - (86400L * 1000L)); - if (result > (uint32_t)milliseconds_elapsed) - result = (uint32_t)milliseconds_elapsed; - } - return result; -} - -/** Check if a bucket which had tokens_before tokens and which got - * tokens_removed tokens removed at timestamp tvnow has run - * out of tokens, and if so, note the milliseconds since midnight in - * timestamp_var for the next TB_EMPTY event. */ -void -connection_buckets_note_empty_ts(uint32_t *timestamp_var, - int tokens_before, size_t tokens_removed, - const struct timeval *tvnow) -{ - if (tokens_before > 0 && (uint32_t)tokens_before <= tokens_removed) - *timestamp_var = msec_since_midnight(tvnow); -} - -/** Last time at which the global or relay buckets were emptied in msec - * since midnight. */ -static uint32_t global_relayed_read_emptied = 0, - global_relayed_write_emptied = 0, - global_read_emptied = 0, - global_write_emptied = 0; - /** We just read num_read and wrote num_written bytes * onto conn. Decrement buckets appropriately. */ static void @@ -3094,30 +3043,6 @@ connection_buckets_decrement(connection_t *conn, time_t now, if (!connection_is_rate_limited(conn)) return; /* local IPs are free */ - /* If one or more of our token buckets ran dry just now, note the - * timestamp for TB_EMPTY events. */ - if (get_options()->TestingEnableTbEmptyEvent) { - struct timeval tvnow; - tor_gettimeofday_cached(&tvnow); - if (connection_counts_as_relayed_traffic(conn, now)) { - connection_buckets_note_empty_ts(&global_relayed_read_emptied, - global_relayed_read_bucket, num_read, &tvnow); - connection_buckets_note_empty_ts(&global_relayed_write_emptied, - global_relayed_write_bucket, num_written, &tvnow); - } - connection_buckets_note_empty_ts(&global_read_emptied, - global_read_bucket, num_read, &tvnow); - connection_buckets_note_empty_ts(&global_write_emptied, - global_write_bucket, num_written, &tvnow); - if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) { - or_connection_t *or_conn = TO_OR_CONN(conn); - connection_buckets_note_empty_ts(&or_conn->read_emptied_time, - or_conn->read_bucket, num_read, &tvnow); - connection_buckets_note_empty_ts(&or_conn->write_emptied_time, - or_conn->write_bucket, num_written, &tvnow); - } - } - if (connection_counts_as_relayed_traffic(conn, now)) { global_relayed_read_bucket -= (int)num_read; global_relayed_write_bucket -= (int)num_written; @@ -3238,12 +3163,6 @@ connection_bucket_refill(int milliseconds_elapsed, time_t now) smartlist_t *conns = get_connection_array(); int bandwidthrate, bandwidthburst, relayrate, relayburst; - int prev_global_read = global_read_bucket; - int prev_global_write = global_write_bucket; - int prev_relay_read = global_relayed_read_bucket; - int prev_relay_write = global_relayed_write_bucket; - struct timeval tvnow; /*< Only used if TB_EMPTY events are enabled. */ - bandwidthrate = (int)options->BandwidthRate; bandwidthburst = (int)options->BandwidthBurst; @@ -3278,32 +3197,6 @@ connection_bucket_refill(int milliseconds_elapsed, time_t now) milliseconds_elapsed, "global_relayed_write_bucket"); - /* If buckets were empty before and have now been refilled, tell any - * interested controllers. */ - if (get_options()->TestingEnableTbEmptyEvent) { - uint32_t global_read_empty_time, global_write_empty_time, - relay_read_empty_time, relay_write_empty_time; - tor_gettimeofday_cached(&tvnow); - global_read_empty_time = bucket_millis_empty(prev_global_read, - global_read_emptied, global_read_bucket, - milliseconds_elapsed, &tvnow); - global_write_empty_time = bucket_millis_empty(prev_global_write, - global_write_emptied, global_write_bucket, - milliseconds_elapsed, &tvnow); - control_event_tb_empty("GLOBAL", global_read_empty_time, - global_write_empty_time, milliseconds_elapsed); - relay_read_empty_time = bucket_millis_empty(prev_relay_read, - global_relayed_read_emptied, - global_relayed_read_bucket, - milliseconds_elapsed, &tvnow); - relay_write_empty_time = bucket_millis_empty(prev_relay_write, - global_relayed_write_emptied, - global_relayed_write_bucket, - milliseconds_elapsed, &tvnow); - control_event_tb_empty("RELAY", relay_read_empty_time, - relay_write_empty_time, milliseconds_elapsed); - } - /* refill the per-connection buckets */ SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) { if (connection_speaks_cells(conn)) { @@ -3311,9 +3204,6 @@ connection_bucket_refill(int milliseconds_elapsed, time_t now) int orbandwidthrate = or_conn->bandwidthrate; int orbandwidthburst = or_conn->bandwidthburst; - int prev_conn_read = or_conn->read_bucket; - int prev_conn_write = or_conn->write_bucket; - if (connection_bucket_should_increase(or_conn->read_bucket, or_conn)) { connection_bucket_refill_helper(&or_conn->read_bucket, orbandwidthrate, @@ -3328,27 +3218,6 @@ connection_bucket_refill(int milliseconds_elapsed, time_t now) milliseconds_elapsed, "or_conn->write_bucket"); } - - /* If buckets were empty before and have now been refilled, tell any - * interested controllers. */ - if (get_options()->TestingEnableTbEmptyEvent) { - char *bucket; - uint32_t conn_read_empty_time, conn_write_empty_time; - tor_asprintf(&bucket, "ORCONN ID="U64_FORMAT, - U64_PRINTF_ARG(or_conn->base_.global_identifier)); - conn_read_empty_time = bucket_millis_empty(prev_conn_read, - or_conn->read_emptied_time, - or_conn->read_bucket, - milliseconds_elapsed, &tvnow); - conn_write_empty_time = bucket_millis_empty(prev_conn_write, - or_conn->write_emptied_time, - or_conn->write_bucket, - milliseconds_elapsed, &tvnow); - control_event_tb_empty(bucket, conn_read_empty_time, - conn_write_empty_time, - milliseconds_elapsed); - tor_free(bucket); - } } if (conn->read_blocked_on_bw == 1 /* marked to turn reading back on now */ diff --git a/src/or/connection.h b/src/or/connection.h index 6bc5a7cfd..930e5f81a 100644 --- a/src/or/connection.h +++ b/src/or/connection.h @@ -272,13 +272,6 @@ void connection_check_oos(int n_socks, int failed); STATIC void connection_free_minimal(connection_t *conn); /* Used only by connection.c and test*.c */ -uint32_t bucket_millis_empty(int tokens_before, uint32_t last_empty_time, - int tokens_after, int milliseconds_elapsed, - const struct timeval *tvnow); -void connection_buckets_note_empty_ts(uint32_t *timestamp_var, - int tokens_before, - size_t tokens_removed, - const struct timeval *tvnow); MOCK_DECL(STATIC int,connection_connect_sockaddr, (connection_t *conn, const struct sockaddr *sa, diff --git a/src/or/control.c b/src/or/control.c index 5a2fae64e..0539ddaca 100644 --- a/src/or/control.c +++ b/src/or/control.c @@ -1214,7 +1214,6 @@ static const struct control_event_t control_event_table[] = { { EVENT_CONF_CHANGED, "CONF_CHANGED"}, { EVENT_CONN_BW, "CONN_BW" }, { EVENT_CELL_STATS, "CELL_STATS" }, - { EVENT_TB_EMPTY, "TB_EMPTY" }, { EVENT_CIRC_BANDWIDTH_USED, "CIRC_BW" }, { EVENT_TRANSPORT_LAUNCHED, "TRANSPORT_LAUNCHED" }, { EVENT_HS_DESC, "HS_DESC" }, @@ -6077,28 +6076,6 @@ control_event_circuit_cell_stats(void) return 0; } -/** Tokens in bucket have been refilled: the read bucket was empty - * for read_empty_time millis, the write bucket was empty for - * write_empty_time millis, and buckets were last refilled - * milliseconds_elapsed millis ago. Only emit TB_EMPTY event if - * either read or write bucket have been empty before. */ -int -control_event_tb_empty(const char *bucket, uint32_t read_empty_time, - uint32_t write_empty_time, - int milliseconds_elapsed) -{ - if (get_options()->TestingEnableTbEmptyEvent && - EVENT_IS_INTERESTING(EVENT_TB_EMPTY) && - (read_empty_time > 0 || write_empty_time > 0)) { - send_control_event(EVENT_TB_EMPTY, - "650 TB_EMPTY %s READ=%d WRITTEN=%d " - "LAST=%d\r\n", - bucket, read_empty_time, write_empty_time, - milliseconds_elapsed); - } - return 0; -} - /* about 5 minutes worth. */ #define N_BW_EVENTS_TO_CACHE 300 /* Index into cached_bw_events to next write. */ diff --git a/src/or/control.h b/src/or/control.h index 28ffeaed8..2fd3c553f 100644 --- a/src/or/control.h +++ b/src/or/control.h @@ -59,9 +59,6 @@ int control_event_circ_bandwidth_used(void); int control_event_conn_bandwidth(connection_t *conn); int control_event_conn_bandwidth_used(void); int control_event_circuit_cell_stats(void); -int control_event_tb_empty(const char *bucket, uint32_t read_empty_time, - uint32_t write_empty_time, - int milliseconds_elapsed); void control_event_logmsg(int severity, uint32_t domain, const char *msg); int control_event_descriptors_changed(smartlist_t *routers); int control_event_address_mapped(const char *from, const char *to, @@ -194,7 +191,7 @@ void control_free_all(void); #define EVENT_CONF_CHANGED 0x0019 #define EVENT_CONN_BW 0x001A #define EVENT_CELL_STATS 0x001B -#define EVENT_TB_EMPTY 0x001C +/* UNUSED : 0x001C */ #define EVENT_CIRC_BANDWIDTH_USED 0x001D #define EVENT_TRANSPORT_LAUNCHED 0x0020 #define EVENT_HS_DESC 0x0021 diff --git a/src/or/or.h b/src/or/or.h index 25ad35175..bcce33755 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -1660,13 +1660,6 @@ typedef struct or_connection_t { * bandwidthburst. (OPEN ORs only) */ int write_bucket; /**< When this hits 0, stop writing. Like read_bucket. */ - /** Last emptied read token bucket in msec since midnight; only used if - * TB_EMPTY events are enabled. */ - uint32_t read_emptied_time; - /** Last emptied write token bucket in msec since midnight; only used if - * TB_EMPTY events are enabled. */ - uint32_t write_emptied_time; - /* * Count the number of bytes flushed out on this orconn, and the number of * bytes TLS actually sent - used for overhead estimation for scheduling. @@ -4427,9 +4420,6 @@ typedef struct { /** Enable CELL_STATS events. Only altered on testing networks. */ int TestingEnableCellStatsEvent; - /** Enable TB_EMPTY events. Only altered on testing networks. */ - int TestingEnableTbEmptyEvent; - /** If true, and we have GeoIP data, and we're a bridge, keep a per-country * count of how many client addresses have contacted us so that we can help * the bridge authority guess which countries have blocked access to us. */ diff --git a/src/test/test_controller_events.c b/src/test/test_controller_events.c index 901ad7ab3..e81aea8d6 100644 --- a/src/test/test_controller_events.c +++ b/src/test/test_controller_events.c @@ -11,79 +11,6 @@ #include "control.h" #include "test.h" -static void -help_test_bucket_note_empty(uint32_t expected_msec_since_midnight, - int tokens_before, size_t tokens_removed, - uint32_t msec_since_epoch) -{ - uint32_t timestamp_var = 0; - struct timeval tvnow; - tvnow.tv_sec = msec_since_epoch / 1000; - tvnow.tv_usec = (msec_since_epoch % 1000) * 1000; - connection_buckets_note_empty_ts(×tamp_var, tokens_before, - tokens_removed, &tvnow); - tt_int_op(expected_msec_since_midnight, OP_EQ, timestamp_var); - - done: - ; -} - -static void -test_cntev_bucket_note_empty(void *arg) -{ - (void)arg; - - /* Two cases with nothing to note, because bucket was empty before; - * 86442200 == 1970-01-02 00:00:42.200000 */ - help_test_bucket_note_empty(0, 0, 0, 86442200); - help_test_bucket_note_empty(0, -100, 100, 86442200); - - /* Nothing to note, because bucket has not been emptied. */ - help_test_bucket_note_empty(0, 101, 100, 86442200); - - /* Bucket was emptied, note 42200 msec since midnight. */ - help_test_bucket_note_empty(42200, 101, 101, 86442200); - help_test_bucket_note_empty(42200, 101, 102, 86442200); -} - -static void -test_cntev_bucket_millis_empty(void *arg) -{ - struct timeval tvnow; - (void)arg; - - /* 1970-01-02 00:00:42.200000 */ - tvnow.tv_sec = 86400 + 42; - tvnow.tv_usec = 200000; - - /* Bucket has not been refilled. */ - tt_int_op(0, OP_EQ, bucket_millis_empty(0, 42120, 0, 100, &tvnow)); - tt_int_op(0, OP_EQ, bucket_millis_empty(-10, 42120, -10, 100, &tvnow)); - - /* Bucket was not empty. */ - tt_int_op(0, OP_EQ, bucket_millis_empty(10, 42120, 20, 100, &tvnow)); - - /* Bucket has been emptied 80 msec ago and has just been refilled. */ - tt_int_op(80, OP_EQ, bucket_millis_empty(-20, 42120, -10, 100, &tvnow)); - tt_int_op(80, OP_EQ, bucket_millis_empty(-10, 42120, 0, 100, &tvnow)); - tt_int_op(80, OP_EQ, bucket_millis_empty(0, 42120, 10, 100, &tvnow)); - - /* Bucket has been emptied 180 msec ago, last refill was 100 msec ago - * which was insufficient to make it positive, so cap msec at 100. */ - tt_int_op(100, OP_EQ, bucket_millis_empty(0, 42020, 1, 100, &tvnow)); - - /* 1970-01-02 00:00:00:050000 */ - tvnow.tv_sec = 86400; - tvnow.tv_usec = 50000; - - /* Last emptied 30 msec before midnight, tvnow is 50 msec after - * midnight, that's 80 msec in total. */ - tt_int_op(80, OP_EQ, bucket_millis_empty(0, 86400000 - 30, 1, 100, &tvnow)); - - done: - ; -} - static void add_testing_cell_stats_entry(circuit_t *circ, uint8_t command, unsigned int waiting_time, @@ -395,8 +322,6 @@ test_cntev_event_mask(void *arg) { #name, test_cntev_ ## name, flags, 0, NULL } struct testcase_t controller_event_tests[] = { - TEST(bucket_note_empty, TT_FORK), - TEST(bucket_millis_empty, TT_FORK), TEST(sum_up_cell_stats, TT_FORK), TEST(append_cell_stats, TT_FORK), TEST(format_cell_stats, TT_FORK), diff --git a/src/test/test_options.c b/src/test/test_options.c index eaf503439..0a04e8e40 100644 --- a/src/test/test_options.c +++ b/src/test/test_options.c @@ -4132,16 +4132,6 @@ test_options_validate__testing_options(void *ignored) tt_assert(!msg); tor_free(msg); - free_options_test_data(tdata); - tdata = get_options_test_data(TEST_OPTIONS_DEFAULT_VALUES - "TestingEnableTbEmptyEvent 1\n" - ); - ret = options_validate(tdata->old_opt, tdata->opt, tdata->def_opt, 0, &msg); - tt_int_op(ret, OP_EQ, -1); - tt_str_op(msg, OP_EQ, "TestingEnableTbEmptyEvent may only be changed " - "in testing Tor networks!"); - tor_free(msg); - free_options_test_data(tdata); tdata = get_options_test_data(TEST_OPTIONS_DEFAULT_VALUES "TestingEnableTbEmptyEvent 1\n" From d8ef9a2d1e0701073d8209115bc3c34857c2d7b4 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 10 Apr 2018 10:47:11 -0400 Subject: [PATCH 02/11] Expose a function that computes stamp units from msec. (It turns out we can't just expose STAMP_TICKS_PER_SECOND, since Apple doesn't have that.) --- src/common/compat_time.c | 13 +++++++++++++ src/common/compat_time.h | 1 + src/test/test_util.c | 7 +++++++ 3 files changed, 21 insertions(+) diff --git a/src/common/compat_time.c b/src/common/compat_time.c index 183a60a48..b940447b6 100644 --- a/src/common/compat_time.c +++ b/src/common/compat_time.c @@ -830,11 +830,24 @@ monotime_coarse_stamp_units_to_approx_msec(uint64_t units) return (abstime_diff * mach_time_info.numer) / (mach_time_info.denom * ONE_MILLION); } +uint64_t +monotime_msec_to_approx_coarse_stamp_units(uint64_t msec) +{ + uint64_t abstime_val = + (((uint64_t)msec) * ONE_MILLION * mach_time_info.denom) / + mach_time_info.numer; + return abstime_val >> monotime_shift; +} #else uint64_t monotime_coarse_stamp_units_to_approx_msec(uint64_t units) { return (units * 1000) / STAMP_TICKS_PER_SECOND; } +uint64_t +monotime_msec_to_approx_coarse_stamp_units(uint64_t msec) +{ + return (msec * STAMP_TICKS_PER_SECOND) / 1000; +} #endif diff --git a/src/common/compat_time.h b/src/common/compat_time.h index 6ddd11883..75b57f6f2 100644 --- a/src/common/compat_time.h +++ b/src/common/compat_time.h @@ -150,6 +150,7 @@ uint32_t monotime_coarse_to_stamp(const monotime_coarse_t *t); * into an approximate number of milliseconds. */ uint64_t monotime_coarse_stamp_units_to_approx_msec(uint64_t units); +uint64_t monotime_msec_to_approx_coarse_stamp_units(uint64_t msec); uint32_t monotime_coarse_get_stamp(void); #if defined(MONOTIME_COARSE_TYPE_IS_DIFFERENT) diff --git a/src/test/test_util.c b/src/test/test_util.c index ce8567d9a..3dd2b51a3 100644 --- a/src/test/test_util.c +++ b/src/test/test_util.c @@ -5907,6 +5907,13 @@ test_util_monotonic_time(void *arg) tt_u64_op(coarse_stamp_diff, OP_GE, 120); tt_u64_op(coarse_stamp_diff, OP_LE, 1200); + { + uint64_t units = monotime_msec_to_approx_coarse_stamp_units(5000); + uint64_t ms = monotime_coarse_stamp_units_to_approx_msec(units); + tt_int_op(ms, OP_GE, 4950); + tt_int_op(ms, OP_LT, 5050); + } + done: ; } From c376200f6a77b2509928bc08d2aa1245028cec30 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 10 Apr 2018 11:23:14 -0400 Subject: [PATCH 03/11] Add a new token-bucket backend abstraction, with tests This differs from our previous token bucket abstraction in a few ways: 1) It is an abstraction, and not a collection of fields. 2) It is meant to be used with monotonic timestamps, which should produce better results than calling gettimeofday over and over. --- src/common/include.am | 2 + src/common/token_bucket.c | 180 ++++++++++++++++++++++++++++++++++ src/common/token_bucket.h | 72 ++++++++++++++ src/test/include.am | 1 + src/test/test.c | 1 + src/test/test.h | 1 + src/test/test_bwmgt.c | 199 ++++++++++++++++++++++++++++++++++++++ 7 files changed, 456 insertions(+) create mode 100644 src/common/token_bucket.c create mode 100644 src/common/token_bucket.h create mode 100644 src/test/test_bwmgt.c diff --git a/src/common/include.am b/src/common/include.am index 73c51ff0b..87ab9d79e 100644 --- a/src/common/include.am +++ b/src/common/include.am @@ -97,6 +97,7 @@ LIBOR_A_SRC = \ src/common/util_process.c \ src/common/sandbox.c \ src/common/storagedir.c \ + src/common/token_bucket.c \ src/common/workqueue.c \ $(libor_extra_source) \ $(threads_impl_source) \ @@ -184,6 +185,7 @@ COMMONHEADERS = \ src/common/storagedir.h \ src/common/testsupport.h \ src/common/timers.h \ + src/common/token_bucket.h \ src/common/torint.h \ src/common/torlog.h \ src/common/tortls.h \ diff --git a/src/common/token_bucket.c b/src/common/token_bucket.c new file mode 100644 index 000000000..f4d2cccff --- /dev/null +++ b/src/common/token_bucket.c @@ -0,0 +1,180 @@ +/* Copyright (c) 2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file token_bucket.c + * \brief Functions to use and manipulate token buckets, used for + * rate-limiting on connections and globally. + * + * Tor uses these token buckets to keep track of bandwidth usage, and + * sometimes other things too. + * + * The time units we use internally are based on "timestamp" units -- see + * monotime_coarse_to_stamp() for a rationale. + * + * Token buckets may become negative. + **/ + +#define TOKEN_BUCKET_PRIVATE + +#include "token_bucket.h" +#include "util_bug.h" + +/** Convert a rate in bytes per second to a rate in bytes per step */ +static uint32_t +rate_per_sec_to_rate_per_step(uint32_t rate) +{ + /* + The precise calculation we'd want to do is + + (rate / 1000) * to_approximate_msec(TICKS_PER_STEP). But to minimize + rounding error, we do it this way instead, and divide last. + */ + return (uint32_t) + monotime_coarse_stamp_units_to_approx_msec(rate*TICKS_PER_STEP)/1000; +} + +/** + * Initialize a token bucket in *bucket, set up to allow rate + * bytes per second, with a maximum burst of burst bytes. The bucket + * is created such that now_ts is the current timestamp. The bucket + * starts out full. + */ +void +token_bucket_init(token_bucket_t *bucket, + uint32_t rate, + uint32_t burst, + uint32_t now_ts) +{ + memset(bucket, 0, sizeof(token_bucket_t)); + token_bucket_adjust(bucket, rate, burst); + token_bucket_reset(bucket, now_ts); +} + +/** + * Change the configured rate (in bytes per second) and burst (in bytes) + * for the token bucket in *bucket. + */ +void +token_bucket_adjust(token_bucket_t *bucket, + uint32_t rate, + uint32_t burst) +{ + tor_assert_nonfatal(rate > 0); + tor_assert_nonfatal(burst > 0); + if (burst > TOKEN_BUCKET_MAX_BURST) + burst = TOKEN_BUCKET_MAX_BURST; + + bucket->rate = rate_per_sec_to_rate_per_step(rate); + bucket->burst = burst; + bucket->read_bucket = MIN(bucket->read_bucket, (int32_t)burst); + bucket->write_bucket = MIN(bucket->write_bucket, (int32_t)burst); +} + +/** + * Reset bucket to be full, as of timestamp now_ts. + */ +void +token_bucket_reset(token_bucket_t *bucket, + uint32_t now_ts) +{ + bucket->read_bucket = bucket->burst; + bucket->write_bucket = bucket->burst; + bucket->last_refilled_at_ts = now_ts; +} + +/* Helper: see token_bucket_refill */ +static int +refill_single_bucket(int32_t *bucketptr, + const uint32_t rate, + const int32_t burst, + const uint32_t elapsed_steps) +{ + const int was_empty = (*bucketptr <= 0); + /* The casts here prevent an underflow. + * + * Note that even if the bucket value is negative, subtracting it from + * "burst" will still produce a correct result. If this result is + * ridiculously high, then the "elapsed_steps > gap / rate" check below + * should catch it. */ + const size_t gap = ((size_t)burst) - ((size_t)*bucketptr); + + if (elapsed_steps > gap / rate) { + *bucketptr = burst; + } else { + *bucketptr += rate * elapsed_steps; + } + + return was_empty && *bucketptr > 0; +} + +/** + * Refill bucket as appropriate, given that the current timestamp + * is now_ts. + * + * Return a bitmask containing TB_READ iff read bucket was empty and became + * nonempty, and TB_WRITE iff the write bucket was empty and became nonempty. + */ +int +token_bucket_refill(token_bucket_t *bucket, + uint32_t now_ts) +{ + const uint32_t elapsed_ticks = (now_ts - bucket->last_refilled_at_ts); + const uint32_t elapsed_steps = elapsed_ticks / TICKS_PER_STEP; + + if (!elapsed_steps) { + /* Note that if less than one whole step elapsed, we don't advance the + * time in last_refilled_at_ts. That's intentional: we want to make sure + * that we add some bytes to it eventually. */ + return 0; + } + + int flags = 0; + if (refill_single_bucket(&bucket->read_bucket, + bucket->rate, bucket->burst, elapsed_steps)) + flags |= TB_READ; + if (refill_single_bucket(&bucket->write_bucket, + bucket->rate, bucket->burst, elapsed_steps)) + flags |= TB_WRITE; + + bucket->last_refilled_at_ts = now_ts; + return flags; +} + +static int +decrement_single_bucket(int32_t *bucketptr, + ssize_t n) +{ + if (BUG(n < 0)) + return 0; + const int becomes_empty = *bucketptr > 0 && n >= *bucketptr; + *bucketptr -= n; + return becomes_empty; +} + +/** + * Decrement the read token bucket in bucket by n bytes. + * + * Return true if the bucket was nonempty and became empty; return false + * otherwise. + */ +int +token_bucket_dec_read(token_bucket_t *bucket, + ssize_t n) +{ + return decrement_single_bucket(&bucket->read_bucket, n); +} + +/** + * Decrement the write token bucket in bucket by n bytes. + * + * Return true if the bucket was nonempty and became empty; return false + * otherwise. + */ +int +token_bucket_dec_write(token_bucket_t *bucket, + ssize_t n) +{ + return decrement_single_bucket(&bucket->write_bucket, n); +} + diff --git a/src/common/token_bucket.h b/src/common/token_bucket.h new file mode 100644 index 000000000..ef0735219 --- /dev/null +++ b/src/common/token_bucket.h @@ -0,0 +1,72 @@ +/* Copyright (c) 2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file token_bucket.h + * \brief Headers for token_bucket.c + **/ + +#ifndef TOR_TOKEN_BUCKET_H +#define TOR_TOKEN_BUCKET_H + +#include "torint.h" + +typedef struct token_bucket_t { + uint32_t rate; + int32_t burst; + int32_t read_bucket; + int32_t write_bucket; + uint32_t last_refilled_at_ts; +} token_bucket_t; + +#define TOKEN_BUCKET_MAX_BURST INT32_MAX + +void token_bucket_init(token_bucket_t *bucket, + uint32_t rate, + uint32_t burst, + uint32_t now_ts); + +void token_bucket_adjust(token_bucket_t *bucket, + uint32_t rate, uint32_t burst); + +void token_bucket_reset(token_bucket_t *bucket, + uint32_t now_ts); + +#define TB_READ 1 +#define TB_WRITE 2 + +int token_bucket_refill(token_bucket_t *bucket, + uint32_t now_ts); + +int token_bucket_dec_read(token_bucket_t *bucket, + ssize_t n); +int token_bucket_dec_write(token_bucket_t *bucket, + ssize_t n); + +static inline size_t token_bucket_get_read(const token_bucket_t *bucket); +static inline size_t +token_bucket_get_read(const token_bucket_t *bucket) +{ + const ssize_t b = bucket->read_bucket; + return b >= 0 ? b : 0; +} + +static inline size_t token_bucket_get_write(const token_bucket_t *bucket); +static inline size_t +token_bucket_get_write(const token_bucket_t *bucket) +{ + const ssize_t b = bucket->write_bucket; + return b >= 0 ? b : 0; +} + +#ifdef TOKEN_BUCKET_PRIVATE + +/* To avoid making the rates too small, we consider units of "steps", + * where a "step" is defined as this many timestamp ticks. Keep this + * a power of two if you can. */ +#define TICKS_PER_STEP 16 + +#endif + +#endif /* TOR_TOKEN_BUCKET_H */ + diff --git a/src/test/include.am b/src/test/include.am index e98b056a4..2da50de01 100644 --- a/src/test/include.am +++ b/src/test/include.am @@ -89,6 +89,7 @@ src_test_test_SOURCES = \ src/test/test_address.c \ src/test/test_address_set.c \ src/test/test_buffers.c \ + src/test/test_bwmgt.c \ src/test/test_cell_formats.c \ src/test/test_cell_queue.c \ src/test/test_channel.c \ diff --git a/src/test/test.c b/src/test/test.c index 4f2fbc693..7df385bc3 100644 --- a/src/test/test.c +++ b/src/test/test.c @@ -812,6 +812,7 @@ struct testgroup_t testgroups[] = { { "address/", address_tests }, { "address_set/", address_set_tests }, { "buffer/", buffer_tests }, + { "bwmgt/", bwmgt_tests }, { "cellfmt/", cell_format_tests }, { "cellqueue/", cell_queue_tests }, { "channel/", channel_tests }, diff --git a/src/test/test.h b/src/test/test.h index 02ec9bda8..95715da7a 100644 --- a/src/test/test.h +++ b/src/test/test.h @@ -178,6 +178,7 @@ extern struct testcase_t accounting_tests[]; extern struct testcase_t addr_tests[]; extern struct testcase_t address_tests[]; extern struct testcase_t address_set_tests[]; +extern struct testcase_t bwmgt_tests[]; extern struct testcase_t buffer_tests[]; extern struct testcase_t cell_format_tests[]; extern struct testcase_t cell_queue_tests[]; diff --git a/src/test/test_bwmgt.c b/src/test/test_bwmgt.c new file mode 100644 index 000000000..7bcfcf7fe --- /dev/null +++ b/src/test/test_bwmgt.c @@ -0,0 +1,199 @@ +/* Copyright (c) 2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file test_bwmgt.c + * \brief tests for bandwidth management / token bucket functions + */ + +#define TOKEN_BUCKET_PRIVATE + +#include "or.h" +#include "test.h" + +#include "token_bucket.h" + +// an imaginary time, in timestamp units. Chosen so it will roll over. +static const uint32_t START_TS = UINT32_MAX-10; +static const int32_t KB = 1024; + +static void +test_bwmgt_token_buf_init(void *arg) +{ + (void)arg; + token_bucket_t b; + + token_bucket_init(&b, 16*KB, 64*KB, START_TS); + // Burst is correct + tt_uint_op(b.burst, OP_EQ, 64*KB); + // Rate is correct, within 1 percent. + { + uint32_t ticks_per_sec = + (uint32_t) monotime_msec_to_approx_coarse_stamp_units(1000); + uint32_t rate_per_sec = (b.rate * ticks_per_sec / TICKS_PER_STEP); + + tt_uint_op(rate_per_sec, OP_GT, 16*KB-160); + tt_uint_op(rate_per_sec, OP_LT, 16*KB+160); + } + // Bucket starts out full: + tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS); + tt_int_op(b.read_bucket, OP_EQ, 64*KB); + + done: + ; +} + +static void +test_bwmgt_token_buf_adjust(void *arg) +{ + (void)arg; + token_bucket_t b; + + token_bucket_init(&b, 16*KB, 64*KB, START_TS); + + uint32_t rate_orig = b.rate; + // Increasing burst + token_bucket_adjust(&b, 16*KB, 128*KB); + tt_uint_op(b.rate, OP_EQ, rate_orig); + tt_uint_op(b.read_bucket, OP_EQ, 64*KB); + tt_uint_op(b.burst, OP_EQ, 128*KB); + + // Decreasing burst but staying above bucket + token_bucket_adjust(&b, 16*KB, 96*KB); + tt_uint_op(b.rate, OP_EQ, rate_orig); + tt_uint_op(b.read_bucket, OP_EQ, 64*KB); + tt_uint_op(b.burst, OP_EQ, 96*KB); + + // Decreasing burst below bucket, + token_bucket_adjust(&b, 16*KB, 48*KB); + tt_uint_op(b.rate, OP_EQ, rate_orig); + tt_uint_op(b.read_bucket, OP_EQ, 48*KB); + tt_uint_op(b.burst, OP_EQ, 48*KB); + + // Changing rate. + token_bucket_adjust(&b, 32*KB, 48*KB); + tt_uint_op(b.rate, OP_GE, rate_orig*2 - 10); + tt_uint_op(b.rate, OP_LE, rate_orig*2 + 10); + tt_uint_op(b.read_bucket, OP_EQ, 48*KB); + tt_uint_op(b.burst, OP_EQ, 48*KB); + + done: + ; +} + +static void +test_bwmgt_token_buf_dec(void *arg) +{ + (void)arg; + token_bucket_t b; + token_bucket_init(&b, 16*KB, 64*KB, START_TS); + + // full-to-not-full. + tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, KB)); + tt_int_op(b.read_bucket, OP_EQ, 63*KB); + + // Full to almost-not-full + tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, 63*KB - 1)); + tt_int_op(b.read_bucket, OP_EQ, 1); + + // almost-not-full to empty. + tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 1)); + tt_int_op(b.read_bucket, OP_EQ, 0); + + // reset bucket, try full-to-empty + token_bucket_init(&b, 16*KB, 64*KB, START_TS); + tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 64*KB)); + tt_int_op(b.read_bucket, OP_EQ, 0); + + // reset bucket, try underflow. + token_bucket_init(&b, 16*KB, 64*KB, START_TS); + tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 64*KB + 1)); + tt_int_op(b.read_bucket, OP_EQ, -1); + + // A second underflow does not make the bucket empty. + tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, 1000)); + tt_int_op(b.read_bucket, OP_EQ, -1001); + + done: + ; +} + +static void +test_bwmgt_token_buf_refill(void *arg) +{ + (void)arg; + token_bucket_t b; + const uint32_t SEC = + (uint32_t)monotime_msec_to_approx_coarse_stamp_units(1000); + token_bucket_init(&b, 16*KB, 64*KB, START_TS); + + /* Make the buffer much emptier, then let one second elapse. */ + token_bucket_dec_read(&b, 48*KB); + tt_int_op(b.read_bucket, OP_EQ, 16*KB); + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC)); + tt_int_op(b.read_bucket, OP_GT, 32*KB - 300); + tt_int_op(b.read_bucket, OP_LT, 32*KB + 300); + + /* Another half second. */ + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2)); + tt_int_op(b.read_bucket, OP_GT, 40*KB - 400); + tt_int_op(b.read_bucket, OP_LT, 40*KB + 400); + tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2); + + /* No time: nothing happens. */ + { + const uint32_t bucket_orig = b.read_bucket; + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2)); + tt_int_op(b.read_bucket, OP_EQ, bucket_orig); + } + + /* Another 30 seconds: fill the bucket. */ + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*30)); + tt_int_op(b.read_bucket, OP_EQ, b.burst); + tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*30); + + /* Another 30 seconds: nothing happens. */ + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*60)); + tt_int_op(b.read_bucket, OP_EQ, b.burst); + tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*60); + + /* Empty the bucket, let two seconds pass, and make sure that a refill is + * noticed. */ + tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, b.burst)); + tt_int_op(0, OP_EQ, b.read_bucket); + tt_int_op(1, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*61)); + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*62)); + tt_int_op(b.read_bucket, OP_GT, 32*KB-300); + tt_int_op(b.read_bucket, OP_LT, 32*KB+300); + + /* Underflow the bucket, make sure we detect when it has tokens again. */ + tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, b.read_bucket+16*KB)); + tt_int_op(-16*KB, OP_EQ, b.read_bucket); + // half a second passes... + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*64)); + tt_int_op(b.read_bucket, OP_GT, -8*KB-200); + tt_int_op(b.read_bucket, OP_LT, -8*KB+200); + // a second passes + tt_int_op(1, OP_EQ, token_bucket_refill(&b, START_TS + SEC*65)); + tt_int_op(b.read_bucket, OP_GT, 8*KB-200); + tt_int_op(b.read_bucket, OP_LT, 8*KB+200); + + // a ridiculous amount of time passes + tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*64)); + tt_int_op(b.read_bucket, OP_EQ, b.burst); + + done: + ; +} + +#define BWMGT(name) \ + { #name, test_bwmgt_ ## name , 0, NULL, NULL } + +struct testcase_t bwmgt_tests[] = { + BWMGT(token_buf_init), + BWMGT(token_buf_adjust), + BWMGT(token_buf_dec), + BWMGT(token_buf_refill), + END_OF_TESTCASES +}; + From 8a852397462e39cfbc21e3cea20ddd39be40598d Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 10 Apr 2018 12:33:30 -0400 Subject: [PATCH 04/11] Add a helper function to decrement read and write at the same time --- src/common/token_bucket.c | 11 +++++++++++ src/common/token_bucket.h | 3 +++ 2 files changed, 14 insertions(+) diff --git a/src/common/token_bucket.c b/src/common/token_bucket.c index f4d2cccff..abb050183 100644 --- a/src/common/token_bucket.c +++ b/src/common/token_bucket.c @@ -178,3 +178,14 @@ token_bucket_dec_write(token_bucket_t *bucket, return decrement_single_bucket(&bucket->write_bucket, n); } +/** + * As token_bucket_dec_read and token_bucket_dec_write, in a single operation. + */ +void +token_bucket_dec(token_bucket_t *bucket, + ssize_t n_read, ssize_t n_written) +{ + token_bucket_dec_read(bucket, n_read); + token_bucket_dec_read(bucket, n_written); +} + diff --git a/src/common/token_bucket.h b/src/common/token_bucket.h index ef0735219..2d1ccd5cf 100644 --- a/src/common/token_bucket.h +++ b/src/common/token_bucket.h @@ -43,6 +43,9 @@ int token_bucket_dec_read(token_bucket_t *bucket, int token_bucket_dec_write(token_bucket_t *bucket, ssize_t n); +void token_bucket_dec(token_bucket_t *bucket, + ssize_t n_read, ssize_t n_written); + static inline size_t token_bucket_get_read(const token_bucket_t *bucket); static inline size_t token_bucket_get_read(const token_bucket_t *bucket) From 9fced56ef1a6eae0ce01de310c871823998bd791 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 10 Apr 2018 12:34:28 -0400 Subject: [PATCH 05/11] Refactor or_connection token buckets to use token_bucket_t --- src/or/connection.c | 58 +++++++++--------------------------------- src/or/connection.h | 3 ++- src/or/connection_or.c | 14 +++------- src/or/main.c | 6 +++-- src/or/or.h | 10 +++----- 5 files changed, 24 insertions(+), 67 deletions(-) diff --git a/src/or/connection.c b/src/or/connection.c index e2fd196dd..4013e0538 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -119,8 +119,6 @@ static connection_t *connection_listener_new( static void connection_init(time_t now, connection_t *conn, int type, int socket_family); static int connection_handle_listener_read(connection_t *conn, int new_type); -static int connection_bucket_should_increase(int bucket, - or_connection_t *conn); static int connection_finished_flushing(connection_t *conn); static int connection_flushed_some(connection_t *conn); static int connection_finished_connecting(connection_t *conn); @@ -2887,7 +2885,7 @@ connection_bucket_read_limit(connection_t *conn, time_t now) if (connection_speaks_cells(conn)) { or_connection_t *or_conn = TO_OR_CONN(conn); if (conn->state == OR_CONN_STATE_OPEN) - conn_bucket = or_conn->read_bucket; + conn_bucket = token_bucket_get_read(&or_conn->bucket); base = get_cell_network_size(or_conn->wide_circ_ids); } @@ -2919,13 +2917,10 @@ connection_bucket_write_limit(connection_t *conn, time_t now) } if (connection_speaks_cells(conn)) { - /* use the per-conn write limit if it's lower, but if it's less - * than zero just use zero */ + /* use the per-conn write limit if it's lower */ or_connection_t *or_conn = TO_OR_CONN(conn); if (conn->state == OR_CONN_STATE_OPEN) - if (or_conn->write_bucket < conn_bucket) - conn_bucket = or_conn->write_bucket >= 0 ? - or_conn->write_bucket : 0; + conn_bucket = MIN(conn_bucket, token_bucket_get_write(&or_conn->bucket)); base = get_cell_network_size(or_conn->wide_circ_ids); } @@ -3050,8 +3045,8 @@ connection_buckets_decrement(connection_t *conn, time_t now, global_read_bucket -= (int)num_read; global_write_bucket -= (int)num_written; if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) { - TO_OR_CONN(conn)->read_bucket -= (int)num_read; - TO_OR_CONN(conn)->write_bucket -= (int)num_written; + or_connection_t *or_conn = TO_OR_CONN(conn); + token_bucket_dec(&or_conn->bucket, num_read, num_written); } } @@ -3072,7 +3067,7 @@ connection_consider_empty_read_buckets(connection_t *conn) reason = "global relayed read bucket exhausted. Pausing."; } else if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN && - TO_OR_CONN(conn)->read_bucket <= 0) { + token_bucket_get_read(&TO_OR_CONN(conn)->bucket) <= 0) { reason = "connection read bucket exhausted. Pausing."; } else return; /* all good, no need to stop it */ @@ -3099,7 +3094,7 @@ connection_consider_empty_write_buckets(connection_t *conn) reason = "global relayed write bucket exhausted. Pausing."; } else if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN && - TO_OR_CONN(conn)->write_bucket <= 0) { + token_bucket_get_write(&TO_OR_CONN(conn)->bucket) <= 0) { reason = "connection write bucket exhausted. Pausing."; } else return; /* all good, no need to stop it */ @@ -3157,7 +3152,7 @@ connection_bucket_refill_helper(int *bucket, int rate, int burst, /** Time has passed; increment buckets appropriately. */ void -connection_bucket_refill(int milliseconds_elapsed, time_t now) +connection_bucket_refill(int milliseconds_elapsed, time_t now, uint32_t now_ts) { const or_options_t *options = get_options(); smartlist_t *conns = get_connection_array(); @@ -3201,22 +3196,9 @@ connection_bucket_refill(int milliseconds_elapsed, time_t now) SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) { if (connection_speaks_cells(conn)) { or_connection_t *or_conn = TO_OR_CONN(conn); - int orbandwidthrate = or_conn->bandwidthrate; - int orbandwidthburst = or_conn->bandwidthburst; - if (connection_bucket_should_increase(or_conn->read_bucket, or_conn)) { - connection_bucket_refill_helper(&or_conn->read_bucket, - orbandwidthrate, - orbandwidthburst, - milliseconds_elapsed, - "or_conn->read_bucket"); - } - if (connection_bucket_should_increase(or_conn->write_bucket, or_conn)) { - connection_bucket_refill_helper(&or_conn->write_bucket, - orbandwidthrate, - orbandwidthburst, - milliseconds_elapsed, - "or_conn->write_bucket"); + if (conn->state == OR_CONN_STATE_OPEN) { + token_bucket_refill(&or_conn->bucket, now_ts); } } @@ -3226,7 +3208,7 @@ connection_bucket_refill(int milliseconds_elapsed, time_t now) global_relayed_read_bucket > 0) /* even if we're relayed traffic */ && (!connection_speaks_cells(conn) || conn->state != OR_CONN_STATE_OPEN || - TO_OR_CONN(conn)->read_bucket > 0)) { + token_bucket_get_read(&TO_OR_CONN(conn)->bucket) > 0)) { /* and either a non-cell conn or a cell conn with non-empty bucket */ LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET, "waking up conn (fd %d) for read", (int)conn->s)); @@ -3240,7 +3222,7 @@ connection_bucket_refill(int milliseconds_elapsed, time_t now) global_relayed_write_bucket > 0) /* even if it's relayed traffic */ && (!connection_speaks_cells(conn) || conn->state != OR_CONN_STATE_OPEN || - TO_OR_CONN(conn)->write_bucket > 0)) { + token_bucket_get_write(&TO_OR_CONN(conn)->bucket) > 0)) { LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET, "waking up conn (fd %d) for write", (int)conn->s)); conn->write_blocked_on_bw = 0; @@ -3249,22 +3231,6 @@ connection_bucket_refill(int milliseconds_elapsed, time_t now) } SMARTLIST_FOREACH_END(conn); } -/** Is the bucket for connection conn low enough that we - * should add another pile of tokens to it? - */ -static int -connection_bucket_should_increase(int bucket, or_connection_t *conn) -{ - tor_assert(conn); - - if (conn->base_.state != OR_CONN_STATE_OPEN) - return 0; /* only open connections play the rate limiting game */ - if (bucket >= conn->bandwidthburst) - return 0; - - return 1; -} - /** Read bytes from conn-\>s and process them. * * It calls connection_buf_read_from_socket() to bring in any new bytes, diff --git a/src/or/connection.h b/src/or/connection.h index 930e5f81a..acc63640d 100644 --- a/src/or/connection.h +++ b/src/or/connection.h @@ -122,7 +122,8 @@ void connection_mark_all_noncontrol_connections(void); ssize_t connection_bucket_write_limit(connection_t *conn, time_t now); int global_write_bucket_low(connection_t *conn, size_t attempt, int priority); void connection_bucket_init(void); -void connection_bucket_refill(int seconds_elapsed, time_t now); +void connection_bucket_refill(int seconds_elapsed, time_t now, + uint32_t now_ts); int connection_handle_read(connection_t *conn); diff --git a/src/or/connection_or.c b/src/or/connection_or.c index 267463312..3afdfa6b5 100644 --- a/src/or/connection_or.c +++ b/src/or/connection_or.c @@ -793,18 +793,10 @@ connection_or_update_token_buckets_helper(or_connection_t *conn, int reset, (int)options->BandwidthBurst, 1, INT32_MAX); } - conn->bandwidthrate = rate; - conn->bandwidthburst = burst; - if (reset) { /* set up the token buckets to be full */ - conn->read_bucket = conn->write_bucket = burst; - return; + token_bucket_adjust(&conn->bucket, rate, burst); + if (reset) { + token_bucket_reset(&conn->bucket, monotime_coarse_get_stamp()); } - /* If the new token bucket is smaller, take out the extra tokens. - * (If it's larger, don't -- the buckets can grow to reach the cap.) */ - if (conn->read_bucket > burst) - conn->read_bucket = burst; - if (conn->write_bucket > burst) - conn->write_bucket = burst; } /** Either our set of relays or our per-conn rate limits have changed. diff --git a/src/or/main.c b/src/or/main.c index d1df11af5..b12effce1 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -2426,8 +2426,10 @@ refill_callback(periodic_timer_t *timer, void *arg) if (accounting_is_enabled(options) && milliseconds_elapsed >= 0) accounting_add_bytes(bytes_read, bytes_written, seconds_rolled_over); - if (milliseconds_elapsed > 0) - connection_bucket_refill(milliseconds_elapsed, (time_t)now.tv_sec); + if (milliseconds_elapsed > 0) { + connection_bucket_refill(milliseconds_elapsed, (time_t)now.tv_sec, + monotime_coarse_get_stamp()); + } stats_prev_global_read_bucket = global_read_bucket; stats_prev_global_write_bucket = global_write_bucket; diff --git a/src/or/or.h b/src/or/or.h index bcce33755..a826cacbf 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -80,6 +80,7 @@ #include "crypto_curve25519.h" #include "crypto_ed25519.h" #include "tor_queue.h" +#include "token_bucket.h" #include "util_format.h" #include "hs_circuitmap.h" @@ -1652,13 +1653,8 @@ typedef struct or_connection_t { time_t timestamp_lastempty; /**< When was the outbuf last completely empty?*/ - /* bandwidth* and *_bucket only used by ORs in OPEN state: */ - int bandwidthrate; /**< Bytes/s added to the bucket. (OPEN ORs only.) */ - int bandwidthburst; /**< Max bucket size for this conn. (OPEN ORs only.) */ - int read_bucket; /**< When this hits 0, stop receiving. Every second we - * add 'bandwidthrate' to this, capping it at - * bandwidthburst. (OPEN ORs only) */ - int write_bucket; /**< When this hits 0, stop writing. Like read_bucket. */ + token_bucket_t bucket; /**< Used for rate limiting when the connection is + * in state CONN_OPEN. */ /* * Count the number of bytes flushed out on this orconn, and the number of From a38fd9bc5bf508d029ff2557311caeb487638968 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 10 Apr 2018 13:19:25 -0400 Subject: [PATCH 06/11] Replace the global buckets with token_bucket_t --- src/or/connection.c | 143 ++++++++++++++------------------------------ src/or/connection.h | 2 +- src/or/main.c | 36 +++++------ src/or/main.h | 6 +- 4 files changed, 68 insertions(+), 119 deletions(-) diff --git a/src/or/connection.c b/src/or/connection.c index 4013e0538..b2ef28776 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -2846,7 +2846,7 @@ connection_counts_as_relayed_traffic(connection_t *conn, time_t now) * non-negative) provides an upper limit for our answer. */ static ssize_t connection_bucket_round_robin(int base, int priority, - ssize_t global_bucket, ssize_t conn_bucket) + ssize_t global_bucket_val, ssize_t conn_bucket) { ssize_t at_most; ssize_t num_bytes_high = (priority ? 32 : 16) * base; @@ -2855,15 +2855,15 @@ connection_bucket_round_robin(int base, int priority, /* Do a rudimentary round-robin so one circuit can't hog a connection. * Pick at most 32 cells, at least 4 cells if possible, and if we're in * the middle pick 1/8 of the available bandwidth. */ - at_most = global_bucket / 8; + at_most = global_bucket_val / 8; at_most -= (at_most % base); /* round down */ if (at_most > num_bytes_high) /* 16 KB, or 8 KB for low-priority */ at_most = num_bytes_high; else if (at_most < num_bytes_low) /* 2 KB, or 1 KB for low-priority */ at_most = num_bytes_low; - if (at_most > global_bucket) - at_most = global_bucket; + if (at_most > global_bucket_val) + at_most = global_bucket_val; if (conn_bucket >= 0 && at_most > conn_bucket) at_most = conn_bucket; @@ -2880,7 +2880,7 @@ connection_bucket_read_limit(connection_t *conn, time_t now) int base = RELAY_PAYLOAD_SIZE; int priority = conn->type != CONN_TYPE_DIR; int conn_bucket = -1; - int global_bucket = global_read_bucket; + int global_bucket_val = (int) token_bucket_get_read(&global_bucket); if (connection_speaks_cells(conn)) { or_connection_t *or_conn = TO_OR_CONN(conn); @@ -2894,12 +2894,13 @@ connection_bucket_read_limit(connection_t *conn, time_t now) return conn_bucket>=0 ? conn_bucket : 1<<14; } - if (connection_counts_as_relayed_traffic(conn, now) && - global_relayed_read_bucket <= global_read_bucket) - global_bucket = global_relayed_read_bucket; + if (connection_counts_as_relayed_traffic(conn, now)) { + int relayed = token_bucket_get_read(&global_relayed_bucket); + global_bucket_val = MIN(global_bucket_val, relayed); + } return connection_bucket_round_robin(base, priority, - global_bucket, conn_bucket); + global_bucket_val, conn_bucket); } /** How many bytes at most can we write onto this connection? */ @@ -2909,7 +2910,7 @@ connection_bucket_write_limit(connection_t *conn, time_t now) int base = RELAY_PAYLOAD_SIZE; int priority = conn->type != CONN_TYPE_DIR; int conn_bucket = (int)conn->outbuf_flushlen; - int global_bucket = global_write_bucket; + int global_bucket_val = (int) token_bucket_get_write(&global_bucket); if (!connection_is_rate_limited(conn)) { /* be willing to write to local conns even if our buckets are empty */ @@ -2924,12 +2925,13 @@ connection_bucket_write_limit(connection_t *conn, time_t now) base = get_cell_network_size(or_conn->wide_circ_ids); } - if (connection_counts_as_relayed_traffic(conn, now) && - global_relayed_write_bucket <= global_write_bucket) - global_bucket = global_relayed_write_bucket; + if (connection_counts_as_relayed_traffic(conn, now)) { + int relayed = token_bucket_get_write(&global_relayed_bucket); + global_bucket_val = MIN(global_bucket_val, relayed); + } return connection_bucket_round_robin(base, priority, - global_bucket, conn_bucket); + global_bucket_val, conn_bucket); } /** Return 1 if the global write buckets are low enough that we @@ -2954,8 +2956,8 @@ connection_bucket_write_limit(connection_t *conn, time_t now) int global_write_bucket_low(connection_t *conn, size_t attempt, int priority) { - int smaller_bucket = global_write_bucket < global_relayed_write_bucket ? - global_write_bucket : global_relayed_write_bucket; + int smaller_bucket = MIN(token_bucket_get_write(&global_bucket), + token_bucket_get_write(&global_relayed_bucket)); if (authdir_mode(get_options()) && priority>1) return 0; /* there's always room to answer v2 if we're an auth dir */ @@ -3039,11 +3041,9 @@ connection_buckets_decrement(connection_t *conn, time_t now, return; /* local IPs are free */ if (connection_counts_as_relayed_traffic(conn, now)) { - global_relayed_read_bucket -= (int)num_read; - global_relayed_write_bucket -= (int)num_written; + token_bucket_dec(&global_relayed_bucket, num_read, num_written); } - global_read_bucket -= (int)num_read; - global_write_bucket -= (int)num_written; + token_bucket_dec(&global_bucket, num_read, num_written); if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) { or_connection_t *or_conn = TO_OR_CONN(conn); token_bucket_dec(&or_conn->bucket, num_read, num_written); @@ -3060,10 +3060,10 @@ connection_consider_empty_read_buckets(connection_t *conn) if (!connection_is_rate_limited(conn)) return; /* Always okay. */ - if (global_read_bucket <= 0) { + if (token_bucket_get_read(&global_bucket) <= 0) { reason = "global read bucket exhausted. Pausing."; } else if (connection_counts_as_relayed_traffic(conn, approx_time()) && - global_relayed_read_bucket <= 0) { + token_bucket_get_read(&global_relayed_bucket) <= 0) { reason = "global relayed read bucket exhausted. Pausing."; } else if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN && @@ -3087,10 +3087,10 @@ connection_consider_empty_write_buckets(connection_t *conn) if (!connection_is_rate_limited(conn)) return; /* Always okay. */ - if (global_write_bucket <= 0) { + if (token_bucket_get_write(&global_bucket) <= 0) { reason = "global write bucket exhausted. Pausing."; } else if (connection_counts_as_relayed_traffic(conn, approx_time()) && - global_relayed_write_bucket <= 0) { + token_bucket_get_write(&global_relayed_bucket) <= 0) { reason = "global relayed write bucket exhausted. Pausing."; } else if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN && @@ -3109,88 +3109,37 @@ void connection_bucket_init(void) { const or_options_t *options = get_options(); - /* start it at max traffic */ - global_read_bucket = (int)options->BandwidthBurst; - global_write_bucket = (int)options->BandwidthBurst; + const uint32_t now_ts = monotime_coarse_get_stamp(); + token_bucket_init(&global_bucket, + (int32_t)options->BandwidthRate, + (int32_t)options->BandwidthBurst, + now_ts); if (options->RelayBandwidthRate) { - global_relayed_read_bucket = (int)options->RelayBandwidthBurst; - global_relayed_write_bucket = (int)options->RelayBandwidthBurst; + token_bucket_init(&global_relayed_bucket, + (int32_t)options->RelayBandwidthRate, + (int32_t)options->RelayBandwidthBurst, + now_ts); } else { - global_relayed_read_bucket = (int)options->BandwidthBurst; - global_relayed_write_bucket = (int)options->BandwidthBurst; - } -} - -/** Refill a single bucket called name with bandwidth rate per - * second rate and bandwidth burst burst, assuming that - * milliseconds_elapsed milliseconds have passed since the last - * call. */ -static void -connection_bucket_refill_helper(int *bucket, int rate, int burst, - int milliseconds_elapsed, - const char *name) -{ - int starting_bucket = *bucket; - if (starting_bucket < burst && milliseconds_elapsed > 0) { - int64_t incr = (((int64_t)rate) * milliseconds_elapsed) / 1000; - if ((burst - starting_bucket) < incr) { - *bucket = burst; /* We would overflow the bucket; just set it to - * the maximum. */ - } else { - *bucket += (int)incr; - if (*bucket > burst || *bucket < starting_bucket) { - /* If we overflow the burst, or underflow our starting bucket, - * cap the bucket value to burst. */ - /* XXXX this might be redundant now, but it doesn't show up - * in profiles. Remove it after analysis. */ - *bucket = burst; - } - } - log_debug(LD_NET,"%s now %d.", name, *bucket); + token_bucket_init(&global_relayed_bucket, + (int32_t)options->BandwidthRate, + (int32_t)options->BandwidthBurst, + now_ts); } } /** Time has passed; increment buckets appropriately. */ void -connection_bucket_refill(int milliseconds_elapsed, time_t now, uint32_t now_ts) +connection_bucket_refill(time_t now, uint32_t now_ts) { - const or_options_t *options = get_options(); smartlist_t *conns = get_connection_array(); - int bandwidthrate, bandwidthburst, relayrate, relayburst; - - bandwidthrate = (int)options->BandwidthRate; - bandwidthburst = (int)options->BandwidthBurst; - - if (options->RelayBandwidthRate) { - relayrate = (int)options->RelayBandwidthRate; - relayburst = (int)options->RelayBandwidthBurst; - } else { - relayrate = bandwidthrate; - relayburst = bandwidthburst; - } - - tor_assert(milliseconds_elapsed >= 0); write_buckets_empty_last_second = - global_relayed_write_bucket <= 0 || global_write_bucket <= 0; + token_bucket_get_write(&global_bucket) <= 0 || + token_bucket_get_write(&global_relayed_bucket) <= 0; /* refill the global buckets */ - connection_bucket_refill_helper(&global_read_bucket, - bandwidthrate, bandwidthburst, - milliseconds_elapsed, - "global_read_bucket"); - connection_bucket_refill_helper(&global_write_bucket, - bandwidthrate, bandwidthburst, - milliseconds_elapsed, - "global_write_bucket"); - connection_bucket_refill_helper(&global_relayed_read_bucket, - relayrate, relayburst, - milliseconds_elapsed, - "global_relayed_read_bucket"); - connection_bucket_refill_helper(&global_relayed_write_bucket, - relayrate, relayburst, - milliseconds_elapsed, - "global_relayed_write_bucket"); + token_bucket_refill(&global_bucket, now_ts); + token_bucket_refill(&global_relayed_bucket, now_ts); /* refill the per-connection buckets */ SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) { @@ -3203,9 +3152,9 @@ connection_bucket_refill(int milliseconds_elapsed, time_t now, uint32_t now_ts) } if (conn->read_blocked_on_bw == 1 /* marked to turn reading back on now */ - && global_read_bucket > 0 /* and we're allowed to read */ + && token_bucket_get_read(&global_bucket) > 0 /* and we can read */ && (!connection_counts_as_relayed_traffic(conn, now) || - global_relayed_read_bucket > 0) /* even if we're relayed traffic */ + token_bucket_get_read(&global_relayed_bucket) > 0) && (!connection_speaks_cells(conn) || conn->state != OR_CONN_STATE_OPEN || token_bucket_get_read(&TO_OR_CONN(conn)->bucket) > 0)) { @@ -3217,9 +3166,9 @@ connection_bucket_refill(int milliseconds_elapsed, time_t now, uint32_t now_ts) } if (conn->write_blocked_on_bw == 1 - && global_write_bucket > 0 /* and we're allowed to write */ + && token_bucket_get_write(&global_bucket) > 0 /* and we can write */ && (!connection_counts_as_relayed_traffic(conn, now) || - global_relayed_write_bucket > 0) /* even if it's relayed traffic */ + token_bucket_get_write(&global_relayed_bucket) > 0) && (!connection_speaks_cells(conn) || conn->state != OR_CONN_STATE_OPEN || token_bucket_get_write(&TO_OR_CONN(conn)->bucket) > 0)) { diff --git a/src/or/connection.h b/src/or/connection.h index acc63640d..4a57bd311 100644 --- a/src/or/connection.h +++ b/src/or/connection.h @@ -122,7 +122,7 @@ void connection_mark_all_noncontrol_connections(void); ssize_t connection_bucket_write_limit(connection_t *conn, time_t now); int global_write_bucket_low(connection_t *conn, size_t attempt, int priority); void connection_bucket_init(void); -void connection_bucket_refill(int seconds_elapsed, time_t now, +void connection_bucket_refill(time_t now, uint32_t now_ts); int connection_handle_read(connection_t *conn); diff --git a/src/or/main.c b/src/or/main.c index b12effce1..f59ee24a7 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -152,15 +152,15 @@ static void shutdown_did_not_work_callback(evutil_socket_t fd, short event, void *arg) ATTR_NORETURN; /********* START VARIABLES **********/ -int global_read_bucket; /**< Max number of bytes I can read this second. */ -int global_write_bucket; /**< Max number of bytes I can write this second. */ -/** Max number of relayed (bandwidth class 1) bytes I can read this second. */ -int global_relayed_read_bucket; -/** Max number of relayed (bandwidth class 1) bytes I can write this second. */ -int global_relayed_write_bucket; -/** What was the read bucket before the last second_elapsed_callback() call? - * (used to determine how many bytes we've read). */ +/* Token bucket for all traffic. */ +token_bucket_t global_bucket; + +/* Token bucket for relayed traffic. */ +token_bucket_t global_relayed_bucket; + +/** What was the read/write bucket before the last second_elapsed_callback() + * call? (used to determine how many bytes we've read). */ static int stats_prev_global_read_bucket; /** What was the write bucket before the last second_elapsed_callback() call? * (used to determine how many bytes we've written). */ @@ -2418,8 +2418,10 @@ refill_callback(periodic_timer_t *timer, void *arg) refill_timer_current_millisecond.tv_sec); } - bytes_written = stats_prev_global_write_bucket - global_write_bucket; - bytes_read = stats_prev_global_read_bucket - global_read_bucket; + bytes_written = stats_prev_global_write_bucket - + token_bucket_get_write(&global_bucket); + bytes_read = stats_prev_global_read_bucket - + token_bucket_get_read(&global_bucket); stats_n_bytes_read += bytes_read; stats_n_bytes_written += bytes_written; @@ -2427,12 +2429,12 @@ refill_callback(periodic_timer_t *timer, void *arg) accounting_add_bytes(bytes_read, bytes_written, seconds_rolled_over); if (milliseconds_elapsed > 0) { - connection_bucket_refill(milliseconds_elapsed, (time_t)now.tv_sec, + connection_bucket_refill((time_t)now.tv_sec, monotime_coarse_get_stamp()); } - stats_prev_global_read_bucket = global_read_bucket; - stats_prev_global_write_bucket = global_write_bucket; + stats_prev_global_read_bucket = token_bucket_get_read(&global_bucket); + stats_prev_global_write_bucket = token_bucket_get_write(&global_bucket); /* remember what time it is, for next time */ refill_timer_current_millisecond = now; @@ -2636,8 +2638,8 @@ do_main_loop(void) /* Set up our buckets */ connection_bucket_init(); - stats_prev_global_read_bucket = global_read_bucket; - stats_prev_global_write_bucket = global_write_bucket; + stats_prev_global_read_bucket = token_bucket_get_read(&global_bucket); + stats_prev_global_write_bucket = token_bucket_get_write(&global_bucket); /* initialize the bootstrap status events to know we're starting up */ control_event_bootstrap(BOOTSTRAP_STATUS_STARTING, 0); @@ -3532,8 +3534,8 @@ tor_free_all(int postfork) periodic_timer_free(systemd_watchdog_timer); #endif - global_read_bucket = global_write_bucket = 0; - global_relayed_read_bucket = global_relayed_write_bucket = 0; + memset(&global_bucket, 0, sizeof(global_bucket)); + memset(&global_relayed_bucket, 0, sizeof(global_relayed_bucket)); stats_prev_global_read_bucket = stats_prev_global_write_bucket = 0; stats_prev_n_read = stats_prev_n_written = 0; stats_n_bytes_read = stats_n_bytes_written = 0; diff --git a/src/or/main.h b/src/or/main.h index f01506fce..9ef5b9472 100644 --- a/src/or/main.h +++ b/src/or/main.h @@ -89,10 +89,8 @@ uint64_t get_main_loop_idle_count(void); extern time_t time_of_process_start; extern int quiet_level; -extern int global_read_bucket; -extern int global_write_bucket; -extern int global_relayed_read_bucket; -extern int global_relayed_write_bucket; +extern token_bucket_t global_bucket; +extern token_bucket_t global_relayed_bucket; #ifdef MAIN_PRIVATE STATIC void init_connection_lists(void); From 6be994fa717cf73e9cfcb63f49f2d335f2d39bb9 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 10 Apr 2018 13:40:34 -0400 Subject: [PATCH 07/11] Ensure that global buckets are updated on configuration change --- src/or/config.c | 6 ++++++ src/or/connection.c | 21 ++++++++++++++++++++- src/or/connection.h | 1 + 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/src/or/config.c b/src/or/config.c index 71f8528b6..c62441948 100644 --- a/src/or/config.c +++ b/src/or/config.c @@ -2219,6 +2219,12 @@ options_act(const or_options_t *old_options) options->PerConnBWBurst != old_options->PerConnBWBurst) connection_or_update_token_buckets(get_connection_array(), options); + if (options->BandwidthRate != old_options->BandwidthRate || + options->BandwidthBurst != old_options->BandwidthBurst || + options->BandwidthRate != old_options->BandwidthRate || + options->RelayBandwidthBurst != old_options->RelayBandwidthBurst) + connection_bucket_adjust(options); + if (options->MainloopStats != old_options->MainloopStats) { reset_main_loop_counters(); } diff --git a/src/or/connection.c b/src/or/connection.c index b2ef28776..83bab10eb 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -3104,7 +3104,8 @@ connection_consider_empty_write_buckets(connection_t *conn) connection_stop_writing(conn); } -/** Initialize the global read bucket to options-\>BandwidthBurst. */ +/** Initialize the global buckets to the values configured in the + * options */ void connection_bucket_init(void) { @@ -3127,6 +3128,24 @@ connection_bucket_init(void) } } +/** Update the global connection bucket settings to a new value. */ +void +connection_bucket_adjust(const or_options_t *options) +{ + token_bucket_adjust(&global_bucket, + (int32_t)options->BandwidthRate, + (int32_t)options->BandwidthBurst); + if (options->RelayBandwidthRate) { + token_bucket_adjust(&global_relayed_bucket, + (int32_t)options->RelayBandwidthRate, + (int32_t)options->RelayBandwidthBurst); + } else { + token_bucket_adjust(&global_relayed_bucket, + (int32_t)options->BandwidthRate, + (int32_t)options->BandwidthBurst); + } +} + /** Time has passed; increment buckets appropriately. */ void connection_bucket_refill(time_t now, uint32_t now_ts) diff --git a/src/or/connection.h b/src/or/connection.h index 4a57bd311..cfe31c372 100644 --- a/src/or/connection.h +++ b/src/or/connection.h @@ -122,6 +122,7 @@ void connection_mark_all_noncontrol_connections(void); ssize_t connection_bucket_write_limit(connection_t *conn, time_t now); int global_write_bucket_low(connection_t *conn, size_t attempt, int priority); void connection_bucket_init(void); +void connection_bucket_adjust(const or_options_t *options); void connection_bucket_refill(time_t now, uint32_t now_ts); From 12f58f2f87b2f0c513f63018da4170b7b663e4e9 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 10 Apr 2018 13:56:45 -0400 Subject: [PATCH 08/11] Remove a bunch of int casts; make clang happier. --- src/or/connection.c | 22 +++++++++++----------- src/or/main.c | 4 ++-- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/or/connection.c b/src/or/connection.c index 83bab10eb..1aad68678 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -2879,8 +2879,8 @@ connection_bucket_read_limit(connection_t *conn, time_t now) { int base = RELAY_PAYLOAD_SIZE; int priority = conn->type != CONN_TYPE_DIR; - int conn_bucket = -1; - int global_bucket_val = (int) token_bucket_get_read(&global_bucket); + ssize_t conn_bucket = -1; + size_t global_bucket_val = token_bucket_get_read(&global_bucket); if (connection_speaks_cells(conn)) { or_connection_t *or_conn = TO_OR_CONN(conn); @@ -2895,7 +2895,7 @@ connection_bucket_read_limit(connection_t *conn, time_t now) } if (connection_counts_as_relayed_traffic(conn, now)) { - int relayed = token_bucket_get_read(&global_relayed_bucket); + size_t relayed = token_bucket_get_read(&global_relayed_bucket); global_bucket_val = MIN(global_bucket_val, relayed); } @@ -2909,8 +2909,8 @@ connection_bucket_write_limit(connection_t *conn, time_t now) { int base = RELAY_PAYLOAD_SIZE; int priority = conn->type != CONN_TYPE_DIR; - int conn_bucket = (int)conn->outbuf_flushlen; - int global_bucket_val = (int) token_bucket_get_write(&global_bucket); + size_t conn_bucket = conn->outbuf_flushlen; + size_t global_bucket_val = token_bucket_get_write(&global_bucket); if (!connection_is_rate_limited(conn)) { /* be willing to write to local conns even if our buckets are empty */ @@ -2926,7 +2926,7 @@ connection_bucket_write_limit(connection_t *conn, time_t now) } if (connection_counts_as_relayed_traffic(conn, now)) { - int relayed = token_bucket_get_write(&global_relayed_bucket); + size_t relayed = token_bucket_get_write(&global_relayed_bucket); global_bucket_val = MIN(global_bucket_val, relayed); } @@ -2956,15 +2956,15 @@ connection_bucket_write_limit(connection_t *conn, time_t now) int global_write_bucket_low(connection_t *conn, size_t attempt, int priority) { - int smaller_bucket = MIN(token_bucket_get_write(&global_bucket), - token_bucket_get_write(&global_relayed_bucket)); + size_t smaller_bucket = MIN(token_bucket_get_write(&global_bucket), + token_bucket_get_write(&global_relayed_bucket)); if (authdir_mode(get_options()) && priority>1) return 0; /* there's always room to answer v2 if we're an auth dir */ if (!connection_is_rate_limited(conn)) return 0; /* local conns don't get limited */ - if (smaller_bucket < (int)attempt) + if (smaller_bucket < attempt) return 1; /* not enough space no matter the priority */ if (write_buckets_empty_last_second) @@ -2973,10 +2973,10 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority) if (priority == 1) { /* old-style v1 query */ /* Could we handle *two* of these requests within the next two seconds? */ const or_options_t *options = get_options(); - int64_t can_write = (int64_t)smaller_bucket + size_t can_write = smaller_bucket + 2*(options->RelayBandwidthRate ? options->RelayBandwidthRate : options->BandwidthRate); - if (can_write < 2*(int64_t)attempt) + if (can_write < 2*attempt) return 1; } else { /* v2 query */ /* no further constraints yet */ diff --git a/src/or/main.c b/src/or/main.c index f59ee24a7..875791617 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -161,10 +161,10 @@ token_bucket_t global_relayed_bucket; /** What was the read/write bucket before the last second_elapsed_callback() * call? (used to determine how many bytes we've read). */ -static int stats_prev_global_read_bucket; +static size_t stats_prev_global_read_bucket; /** What was the write bucket before the last second_elapsed_callback() call? * (used to determine how many bytes we've written). */ -static int stats_prev_global_write_bucket; +static size_t stats_prev_global_write_bucket; /* DOCDOC stats_prev_n_read */ static uint64_t stats_prev_n_read = 0; From c63761a0a67a2d606deb749a5bcf680de9b3df36 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 10 Apr 2018 14:01:30 -0400 Subject: [PATCH 09/11] changes file for token_bucket refactoring --- changes/ticket25766 | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changes/ticket25766 diff --git a/changes/ticket25766 b/changes/ticket25766 new file mode 100644 index 000000000..6382b6215 --- /dev/null +++ b/changes/ticket25766 @@ -0,0 +1,3 @@ + o Code simplification and refactoring: + - Refactor token-bucket implementations to use a common backend. + Closes ticket 25766. From 3f514fe3b1d217c80edb1524976203bc535f9502 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Thu, 12 Apr 2018 13:11:35 -0400 Subject: [PATCH 10/11] Accept small hops backward in the monotonic timer. --- src/common/token_bucket.c | 8 ++++++++ src/test/test_bwmgt.c | 7 ++++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/common/token_bucket.c b/src/common/token_bucket.c index abb050183..6af298214 100644 --- a/src/common/token_bucket.c +++ b/src/common/token_bucket.c @@ -120,6 +120,14 @@ token_bucket_refill(token_bucket_t *bucket, uint32_t now_ts) { const uint32_t elapsed_ticks = (now_ts - bucket->last_refilled_at_ts); + if (elapsed_ticks > UINT32_MAX-(300*1000)) { + /* Either about 48 days have passed since the last refill, or the + * monotonic clock has somehow moved backwards. (We're looking at you, + * Windows.). We accept up to a 5 minute jump backwards as + * "unremarkable". + */ + return 0; + } const uint32_t elapsed_steps = elapsed_ticks / TICKS_PER_STEP; if (!elapsed_steps) { diff --git a/src/test/test_bwmgt.c b/src/test/test_bwmgt.c index 7bcfcf7fe..2428b1505 100644 --- a/src/test/test_bwmgt.c +++ b/src/test/test_bwmgt.c @@ -178,8 +178,13 @@ test_bwmgt_token_buf_refill(void *arg) tt_int_op(b.read_bucket, OP_GT, 8*KB-200); tt_int_op(b.read_bucket, OP_LT, 8*KB+200); - // a ridiculous amount of time passes + // We step a second backwards, and nothing happens. tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*64)); + tt_int_op(b.read_bucket, OP_GT, 8*KB-200); + tt_int_op(b.read_bucket, OP_LT, 8*KB+200); + + // A ridiculous amount of time passes. + tt_int_op(0, OP_EQ, token_bucket_refill(&b, INT32_MAX)); tt_int_op(b.read_bucket, OP_EQ, b.burst); done: From 787bafc0f916c143ac244a217accf755817512df Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Fri, 13 Apr 2018 10:37:06 -0400 Subject: [PATCH 11/11] Increase tolerances for imprecise time. --- src/test/test_bwmgt.c | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/test/test_bwmgt.c b/src/test/test_bwmgt.c index 2428b1505..1a54f44fc 100644 --- a/src/test/test_bwmgt.c +++ b/src/test/test_bwmgt.c @@ -125,6 +125,7 @@ test_bwmgt_token_buf_refill(void *arg) token_bucket_t b; const uint32_t SEC = (uint32_t)monotime_msec_to_approx_coarse_stamp_units(1000); + printf("%d\n", (int)SEC); token_bucket_init(&b, 16*KB, 64*KB, START_TS); /* Make the buffer much emptier, then let one second elapse. */ @@ -163,25 +164,25 @@ test_bwmgt_token_buf_refill(void *arg) tt_int_op(0, OP_EQ, b.read_bucket); tt_int_op(1, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*61)); tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*62)); - tt_int_op(b.read_bucket, OP_GT, 32*KB-300); - tt_int_op(b.read_bucket, OP_LT, 32*KB+300); + tt_int_op(b.read_bucket, OP_GT, 32*KB-400); + tt_int_op(b.read_bucket, OP_LT, 32*KB+400); /* Underflow the bucket, make sure we detect when it has tokens again. */ tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, b.read_bucket+16*KB)); tt_int_op(-16*KB, OP_EQ, b.read_bucket); // half a second passes... tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*64)); - tt_int_op(b.read_bucket, OP_GT, -8*KB-200); - tt_int_op(b.read_bucket, OP_LT, -8*KB+200); + tt_int_op(b.read_bucket, OP_GT, -8*KB-300); + tt_int_op(b.read_bucket, OP_LT, -8*KB+300); // a second passes tt_int_op(1, OP_EQ, token_bucket_refill(&b, START_TS + SEC*65)); - tt_int_op(b.read_bucket, OP_GT, 8*KB-200); - tt_int_op(b.read_bucket, OP_LT, 8*KB+200); + tt_int_op(b.read_bucket, OP_GT, 8*KB-400); + tt_int_op(b.read_bucket, OP_LT, 8*KB+400); // We step a second backwards, and nothing happens. tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*64)); - tt_int_op(b.read_bucket, OP_GT, 8*KB-200); - tt_int_op(b.read_bucket, OP_LT, 8*KB+200); + tt_int_op(b.read_bucket, OP_GT, 8*KB-400); + tt_int_op(b.read_bucket, OP_LT, 8*KB+400); // A ridiculous amount of time passes. tt_int_op(0, OP_EQ, token_bucket_refill(&b, INT32_MAX));