Merge branch 'token_bucket_once_again_squashed'
This commit is contained in:
commit
34c2574aa9
|
@ -9,8 +9,9 @@
|
|||
* Tor uses these token buckets to keep track of bandwidth usage, and
|
||||
* sometimes other things too.
|
||||
*
|
||||
* The time units we use internally are based on "timestamp" units -- see
|
||||
* monotime_coarse_to_stamp() for a rationale.
|
||||
* There are two layers of abstraction here: "raw" token buckets, in which all
|
||||
* the pieces are decoupled, and "read-write" token buckets, which combine all
|
||||
* the moving parts into one.
|
||||
*
|
||||
* Token buckets may become negative.
|
||||
**/
|
||||
|
@ -20,6 +21,92 @@
|
|||
#include "token_bucket.h"
|
||||
#include "util_bug.h"
|
||||
|
||||
/**
|
||||
* Set the <b>rate</b> and <b>burst</b> value in a token_bucket_cfg.
|
||||
*
|
||||
* Note that the <b>rate</b> value is in arbitrary units, but those units will
|
||||
* determine the units of token_bucket_raw_dec(), token_bucket_raw_refill, and
|
||||
* so on.
|
||||
*/
|
||||
void
|
||||
token_bucket_cfg_init(token_bucket_cfg_t *cfg,
|
||||
uint32_t rate,
|
||||
uint32_t burst)
|
||||
{
|
||||
tor_assert_nonfatal(rate > 0);
|
||||
tor_assert_nonfatal(burst > 0);
|
||||
if (burst > TOKEN_BUCKET_MAX_BURST)
|
||||
burst = TOKEN_BUCKET_MAX_BURST;
|
||||
|
||||
cfg->rate = rate;
|
||||
cfg->burst = burst;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize a raw token bucket and its associated timestamp to the "full"
|
||||
* state, according to <b>cfg</b>.
|
||||
*/
|
||||
void
|
||||
token_bucket_raw_reset(token_bucket_raw_t *bucket,
|
||||
const token_bucket_cfg_t *cfg)
|
||||
{
|
||||
bucket->bucket = cfg->burst;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adust a preexisting token bucket to respect the new configuration
|
||||
* <b>cfg</b>, by decreasing its current level if needed. */
|
||||
void
|
||||
token_bucket_raw_adjust(token_bucket_raw_t *bucket,
|
||||
const token_bucket_cfg_t *cfg)
|
||||
{
|
||||
bucket->bucket = MIN(bucket->bucket, cfg->burst);
|
||||
}
|
||||
|
||||
/**
|
||||
* Given an amount of <b>elapsed</b> time units, and a bucket configuration
|
||||
* <b>cfg</b>, refill the level of <b>bucket</b> accordingly. Note that the
|
||||
* units of time in <b>elapsed</b> must correspond to those used to set the
|
||||
* rate in <b>cfg</b>, or the result will be illogical.
|
||||
*/
|
||||
int
|
||||
token_bucket_raw_refill_steps(token_bucket_raw_t *bucket,
|
||||
const token_bucket_cfg_t *cfg,
|
||||
const uint32_t elapsed)
|
||||
{
|
||||
const int was_empty = (bucket->bucket <= 0);
|
||||
/* The casts here prevent an underflow.
|
||||
*
|
||||
* Note that even if the bucket value is negative, subtracting it from
|
||||
* "burst" will still produce a correct result. If this result is
|
||||
* ridiculously high, then the "elapsed > gap / rate" check below
|
||||
* should catch it. */
|
||||
const size_t gap = ((size_t)cfg->burst) - ((size_t)bucket->bucket);
|
||||
|
||||
if (elapsed > gap / cfg->rate) {
|
||||
bucket->bucket = cfg->burst;
|
||||
} else {
|
||||
bucket->bucket += cfg->rate * elapsed;
|
||||
}
|
||||
|
||||
return was_empty && bucket->bucket > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Decrement a provided bucket by <b>n</b> units. Note that <b>n</b>
|
||||
* must be nonnegative.
|
||||
*/
|
||||
int
|
||||
token_bucket_raw_dec(token_bucket_raw_t *bucket,
|
||||
ssize_t n)
|
||||
{
|
||||
if (BUG(n < 0))
|
||||
return 0;
|
||||
const int becomes_empty = bucket->bucket > 0 && n >= bucket->bucket;
|
||||
bucket->bucket -= n;
|
||||
return becomes_empty;
|
||||
}
|
||||
|
||||
/** Convert a rate in bytes per second to a rate in bytes per step */
|
||||
static uint32_t
|
||||
rate_per_sec_to_rate_per_step(uint32_t rate)
|
||||
|
@ -30,8 +117,9 @@ rate_per_sec_to_rate_per_step(uint32_t rate)
|
|||
(rate / 1000) * to_approximate_msec(TICKS_PER_STEP). But to minimize
|
||||
rounding error, we do it this way instead, and divide last.
|
||||
*/
|
||||
return (uint32_t)
|
||||
uint32_t val = (uint32_t)
|
||||
monotime_coarse_stamp_units_to_approx_msec(rate*TICKS_PER_STEP)/1000;
|
||||
return val ? val : 1;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -41,14 +129,14 @@ rate_per_sec_to_rate_per_step(uint32_t rate)
|
|||
* starts out full.
|
||||
*/
|
||||
void
|
||||
token_bucket_init(token_bucket_t *bucket,
|
||||
token_bucket_rw_init(token_bucket_rw_t *bucket,
|
||||
uint32_t rate,
|
||||
uint32_t burst,
|
||||
uint32_t now_ts)
|
||||
{
|
||||
memset(bucket, 0, sizeof(token_bucket_t));
|
||||
token_bucket_adjust(bucket, rate, burst);
|
||||
token_bucket_reset(bucket, now_ts);
|
||||
memset(bucket, 0, sizeof(token_bucket_rw_t));
|
||||
token_bucket_rw_adjust(bucket, rate, burst);
|
||||
token_bucket_rw_reset(bucket, now_ts);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -56,56 +144,27 @@ token_bucket_init(token_bucket_t *bucket,
|
|||
* for the token bucket in *<b>bucket</b>.
|
||||
*/
|
||||
void
|
||||
token_bucket_adjust(token_bucket_t *bucket,
|
||||
uint32_t rate,
|
||||
uint32_t burst)
|
||||
token_bucket_rw_adjust(token_bucket_rw_t *bucket,
|
||||
uint32_t rate,
|
||||
uint32_t burst)
|
||||
{
|
||||
tor_assert_nonfatal(rate > 0);
|
||||
tor_assert_nonfatal(burst > 0);
|
||||
if (burst > TOKEN_BUCKET_MAX_BURST)
|
||||
burst = TOKEN_BUCKET_MAX_BURST;
|
||||
|
||||
bucket->rate = rate_per_sec_to_rate_per_step(rate);
|
||||
bucket->burst = burst;
|
||||
bucket->read_bucket = MIN(bucket->read_bucket, (int32_t)burst);
|
||||
bucket->write_bucket = MIN(bucket->write_bucket, (int32_t)burst);
|
||||
token_bucket_cfg_init(&bucket->cfg,
|
||||
rate_per_sec_to_rate_per_step(rate),
|
||||
burst);
|
||||
token_bucket_raw_adjust(&bucket->read_bucket, &bucket->cfg);
|
||||
token_bucket_raw_adjust(&bucket->write_bucket, &bucket->cfg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset <b>bucket</b> to be full, as of timestamp <b>now_ts</b>.
|
||||
*/
|
||||
void
|
||||
token_bucket_reset(token_bucket_t *bucket,
|
||||
uint32_t now_ts)
|
||||
token_bucket_rw_reset(token_bucket_rw_t *bucket,
|
||||
uint32_t now_ts)
|
||||
{
|
||||
bucket->read_bucket = bucket->burst;
|
||||
bucket->write_bucket = bucket->burst;
|
||||
bucket->last_refilled_at_ts = now_ts;
|
||||
}
|
||||
|
||||
/* Helper: see token_bucket_refill */
|
||||
static int
|
||||
refill_single_bucket(int32_t *bucketptr,
|
||||
const uint32_t rate,
|
||||
const int32_t burst,
|
||||
const uint32_t elapsed_steps)
|
||||
{
|
||||
const int was_empty = (*bucketptr <= 0);
|
||||
/* The casts here prevent an underflow.
|
||||
*
|
||||
* Note that even if the bucket value is negative, subtracting it from
|
||||
* "burst" will still produce a correct result. If this result is
|
||||
* ridiculously high, then the "elapsed_steps > gap / rate" check below
|
||||
* should catch it. */
|
||||
const size_t gap = ((size_t)burst) - ((size_t)*bucketptr);
|
||||
|
||||
if (elapsed_steps > gap / rate) {
|
||||
*bucketptr = burst;
|
||||
} else {
|
||||
*bucketptr += rate * elapsed_steps;
|
||||
}
|
||||
|
||||
return was_empty && *bucketptr > 0;
|
||||
token_bucket_raw_reset(&bucket->read_bucket, &bucket->cfg);
|
||||
token_bucket_raw_reset(&bucket->write_bucket, &bucket->cfg);
|
||||
bucket->last_refilled_at_timestamp = now_ts;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -116,10 +175,11 @@ refill_single_bucket(int32_t *bucketptr,
|
|||
* nonempty, and TB_WRITE iff the write bucket was empty and became nonempty.
|
||||
*/
|
||||
int
|
||||
token_bucket_refill(token_bucket_t *bucket,
|
||||
uint32_t now_ts)
|
||||
token_bucket_rw_refill(token_bucket_rw_t *bucket,
|
||||
uint32_t now_ts)
|
||||
{
|
||||
const uint32_t elapsed_ticks = (now_ts - bucket->last_refilled_at_ts);
|
||||
const uint32_t elapsed_ticks =
|
||||
(now_ts - bucket->last_refilled_at_timestamp);
|
||||
if (elapsed_ticks > UINT32_MAX-(300*1000)) {
|
||||
/* Either about 48 days have passed since the last refill, or the
|
||||
* monotonic clock has somehow moved backwards. (We're looking at you,
|
||||
|
@ -132,34 +192,23 @@ token_bucket_refill(token_bucket_t *bucket,
|
|||
|
||||
if (!elapsed_steps) {
|
||||
/* Note that if less than one whole step elapsed, we don't advance the
|
||||
* time in last_refilled_at_ts. That's intentional: we want to make sure
|
||||
* time in last_refilled_at. That's intentional: we want to make sure
|
||||
* that we add some bytes to it eventually. */
|
||||
return 0;
|
||||
}
|
||||
|
||||
int flags = 0;
|
||||
if (refill_single_bucket(&bucket->read_bucket,
|
||||
bucket->rate, bucket->burst, elapsed_steps))
|
||||
if (token_bucket_raw_refill_steps(&bucket->read_bucket,
|
||||
&bucket->cfg, elapsed_steps))
|
||||
flags |= TB_READ;
|
||||
if (refill_single_bucket(&bucket->write_bucket,
|
||||
bucket->rate, bucket->burst, elapsed_steps))
|
||||
if (token_bucket_raw_refill_steps(&bucket->write_bucket,
|
||||
&bucket->cfg, elapsed_steps))
|
||||
flags |= TB_WRITE;
|
||||
|
||||
bucket->last_refilled_at_ts = now_ts;
|
||||
bucket->last_refilled_at_timestamp = now_ts;
|
||||
return flags;
|
||||
}
|
||||
|
||||
static int
|
||||
decrement_single_bucket(int32_t *bucketptr,
|
||||
ssize_t n)
|
||||
{
|
||||
if (BUG(n < 0))
|
||||
return 0;
|
||||
const int becomes_empty = *bucketptr > 0 && n >= *bucketptr;
|
||||
*bucketptr -= n;
|
||||
return becomes_empty;
|
||||
}
|
||||
|
||||
/**
|
||||
* Decrement the read token bucket in <b>bucket</b> by <b>n</b> bytes.
|
||||
*
|
||||
|
@ -167,10 +216,10 @@ decrement_single_bucket(int32_t *bucketptr,
|
|||
* otherwise.
|
||||
*/
|
||||
int
|
||||
token_bucket_dec_read(token_bucket_t *bucket,
|
||||
token_bucket_rw_dec_read(token_bucket_rw_t *bucket,
|
||||
ssize_t n)
|
||||
{
|
||||
return decrement_single_bucket(&bucket->read_bucket, n);
|
||||
return token_bucket_raw_dec(&bucket->read_bucket, n);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -180,20 +229,21 @@ token_bucket_dec_read(token_bucket_t *bucket,
|
|||
* otherwise.
|
||||
*/
|
||||
int
|
||||
token_bucket_dec_write(token_bucket_t *bucket,
|
||||
token_bucket_rw_dec_write(token_bucket_rw_t *bucket,
|
||||
ssize_t n)
|
||||
{
|
||||
return decrement_single_bucket(&bucket->write_bucket, n);
|
||||
return token_bucket_raw_dec(&bucket->write_bucket, n);
|
||||
}
|
||||
|
||||
/**
|
||||
* As token_bucket_dec_read and token_bucket_dec_write, in a single operation.
|
||||
* As token_bucket_rw_dec_read and token_bucket_rw_dec_write, in a single
|
||||
* operation.
|
||||
*/
|
||||
void
|
||||
token_bucket_dec(token_bucket_t *bucket,
|
||||
ssize_t n_read, ssize_t n_written)
|
||||
token_bucket_rw_dec(token_bucket_rw_t *bucket,
|
||||
ssize_t n_read, ssize_t n_written)
|
||||
{
|
||||
token_bucket_dec_read(bucket, n_read);
|
||||
token_bucket_dec_read(bucket, n_written);
|
||||
token_bucket_rw_dec_read(bucket, n_read);
|
||||
token_bucket_rw_dec_write(bucket, n_written);
|
||||
}
|
||||
|
||||
|
|
|
@ -2,8 +2,8 @@
|
|||
/* See LICENSE for licensing information */
|
||||
|
||||
/**
|
||||
* \file token_bucket.h
|
||||
* \brief Headers for token_bucket.c
|
||||
* \file token_bucket_rw.h
|
||||
* \brief Headers for token_bucket_rw.c
|
||||
**/
|
||||
|
||||
#ifndef TOR_TOKEN_BUCKET_H
|
||||
|
@ -11,55 +11,95 @@
|
|||
|
||||
#include "torint.h"
|
||||
|
||||
typedef struct token_bucket_t {
|
||||
uint32_t rate;
|
||||
int32_t burst;
|
||||
int32_t read_bucket;
|
||||
int32_t write_bucket;
|
||||
uint32_t last_refilled_at_ts;
|
||||
} token_bucket_t;
|
||||
|
||||
/** Largest allowable burst value for a token buffer. */
|
||||
#define TOKEN_BUCKET_MAX_BURST INT32_MAX
|
||||
|
||||
void token_bucket_init(token_bucket_t *bucket,
|
||||
/** A generic token buffer configuration: determines the number of tokens
|
||||
* added to the bucket in each time unit (the "rate"), and the maximum number
|
||||
* of tokens in the bucket (the "burst") */
|
||||
typedef struct token_bucket_cfg_t {
|
||||
uint32_t rate;
|
||||
int32_t burst;
|
||||
} token_bucket_cfg_t;
|
||||
|
||||
/** A raw token bucket, decoupled from its configuration and timestamp. */
|
||||
typedef struct token_bucket_raw_t {
|
||||
int32_t bucket;
|
||||
} token_bucket_raw_t;
|
||||
|
||||
void token_bucket_cfg_init(token_bucket_cfg_t *cfg,
|
||||
uint32_t rate,
|
||||
uint32_t burst);
|
||||
|
||||
void token_bucket_raw_adjust(token_bucket_raw_t *bucket,
|
||||
const token_bucket_cfg_t *cfg);
|
||||
|
||||
void token_bucket_raw_reset(token_bucket_raw_t *bucket,
|
||||
const token_bucket_cfg_t *cfg);
|
||||
|
||||
int token_bucket_raw_dec(token_bucket_raw_t *bucket,
|
||||
ssize_t n);
|
||||
|
||||
int token_bucket_raw_refill_steps(token_bucket_raw_t *bucket,
|
||||
const token_bucket_cfg_t *cfg,
|
||||
const uint32_t elapsed_steps);
|
||||
|
||||
static inline size_t token_bucket_raw_get(const token_bucket_raw_t *bucket);
|
||||
/** Return the current number of bytes set in a token bucket. */
|
||||
static inline size_t
|
||||
token_bucket_raw_get(const token_bucket_raw_t *bucket)
|
||||
{
|
||||
return bucket->bucket >= 0 ? bucket->bucket : 0;
|
||||
}
|
||||
|
||||
/** A convenience type containing all the pieces needed for a coupled
|
||||
* read-bucket and write-bucket that have the same rate limit, and which use
|
||||
* "timestamp units" (see compat_time.h) for their time. */
|
||||
typedef struct token_bucket_rw_t {
|
||||
token_bucket_cfg_t cfg;
|
||||
token_bucket_raw_t read_bucket;
|
||||
token_bucket_raw_t write_bucket;
|
||||
uint32_t last_refilled_at_timestamp;
|
||||
} token_bucket_rw_t;
|
||||
|
||||
void token_bucket_rw_init(token_bucket_rw_t *bucket,
|
||||
uint32_t rate,
|
||||
uint32_t burst,
|
||||
uint32_t now_ts);
|
||||
|
||||
void token_bucket_adjust(token_bucket_t *bucket,
|
||||
void token_bucket_rw_adjust(token_bucket_rw_t *bucket,
|
||||
uint32_t rate, uint32_t burst);
|
||||
|
||||
void token_bucket_reset(token_bucket_t *bucket,
|
||||
void token_bucket_rw_reset(token_bucket_rw_t *bucket,
|
||||
uint32_t now_ts);
|
||||
|
||||
#define TB_READ 1
|
||||
#define TB_WRITE 2
|
||||
|
||||
int token_bucket_refill(token_bucket_t *bucket,
|
||||
int token_bucket_rw_refill(token_bucket_rw_t *bucket,
|
||||
uint32_t now_ts);
|
||||
|
||||
int token_bucket_dec_read(token_bucket_t *bucket,
|
||||
int token_bucket_rw_dec_read(token_bucket_rw_t *bucket,
|
||||
ssize_t n);
|
||||
int token_bucket_dec_write(token_bucket_t *bucket,
|
||||
int token_bucket_rw_dec_write(token_bucket_rw_t *bucket,
|
||||
ssize_t n);
|
||||
|
||||
void token_bucket_dec(token_bucket_t *bucket,
|
||||
void token_bucket_rw_dec(token_bucket_rw_t *bucket,
|
||||
ssize_t n_read, ssize_t n_written);
|
||||
|
||||
static inline size_t token_bucket_get_read(const token_bucket_t *bucket);
|
||||
static inline size_t token_bucket_rw_get_read(const token_bucket_rw_t *bucket);
|
||||
static inline size_t
|
||||
token_bucket_get_read(const token_bucket_t *bucket)
|
||||
token_bucket_rw_get_read(const token_bucket_rw_t *bucket)
|
||||
{
|
||||
const ssize_t b = bucket->read_bucket;
|
||||
return b >= 0 ? b : 0;
|
||||
return token_bucket_raw_get(&bucket->read_bucket);
|
||||
}
|
||||
|
||||
static inline size_t token_bucket_get_write(const token_bucket_t *bucket);
|
||||
static inline size_t token_bucket_rw_get_write(
|
||||
const token_bucket_rw_t *bucket);
|
||||
static inline size_t
|
||||
token_bucket_get_write(const token_bucket_t *bucket)
|
||||
token_bucket_rw_get_write(const token_bucket_rw_t *bucket)
|
||||
{
|
||||
const ssize_t b = bucket->write_bucket;
|
||||
return b >= 0 ? b : 0;
|
||||
return token_bucket_raw_get(&bucket->write_bucket);
|
||||
}
|
||||
|
||||
#ifdef TOKEN_BUCKET_PRIVATE
|
||||
|
|
|
@ -2880,12 +2880,12 @@ connection_bucket_read_limit(connection_t *conn, time_t now)
|
|||
int base = RELAY_PAYLOAD_SIZE;
|
||||
int priority = conn->type != CONN_TYPE_DIR;
|
||||
ssize_t conn_bucket = -1;
|
||||
size_t global_bucket_val = token_bucket_get_read(&global_bucket);
|
||||
size_t global_bucket_val = token_bucket_rw_get_read(&global_bucket);
|
||||
|
||||
if (connection_speaks_cells(conn)) {
|
||||
or_connection_t *or_conn = TO_OR_CONN(conn);
|
||||
if (conn->state == OR_CONN_STATE_OPEN)
|
||||
conn_bucket = token_bucket_get_read(&or_conn->bucket);
|
||||
conn_bucket = token_bucket_rw_get_read(&or_conn->bucket);
|
||||
base = get_cell_network_size(or_conn->wide_circ_ids);
|
||||
}
|
||||
|
||||
|
@ -2895,7 +2895,7 @@ connection_bucket_read_limit(connection_t *conn, time_t now)
|
|||
}
|
||||
|
||||
if (connection_counts_as_relayed_traffic(conn, now)) {
|
||||
size_t relayed = token_bucket_get_read(&global_relayed_bucket);
|
||||
size_t relayed = token_bucket_rw_get_read(&global_relayed_bucket);
|
||||
global_bucket_val = MIN(global_bucket_val, relayed);
|
||||
}
|
||||
|
||||
|
@ -2910,7 +2910,7 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
|
|||
int base = RELAY_PAYLOAD_SIZE;
|
||||
int priority = conn->type != CONN_TYPE_DIR;
|
||||
size_t conn_bucket = conn->outbuf_flushlen;
|
||||
size_t global_bucket_val = token_bucket_get_write(&global_bucket);
|
||||
size_t global_bucket_val = token_bucket_rw_get_write(&global_bucket);
|
||||
|
||||
if (!connection_is_rate_limited(conn)) {
|
||||
/* be willing to write to local conns even if our buckets are empty */
|
||||
|
@ -2921,12 +2921,13 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
|
|||
/* use the per-conn write limit if it's lower */
|
||||
or_connection_t *or_conn = TO_OR_CONN(conn);
|
||||
if (conn->state == OR_CONN_STATE_OPEN)
|
||||
conn_bucket = MIN(conn_bucket, token_bucket_get_write(&or_conn->bucket));
|
||||
conn_bucket = MIN(conn_bucket,
|
||||
token_bucket_rw_get_write(&or_conn->bucket));
|
||||
base = get_cell_network_size(or_conn->wide_circ_ids);
|
||||
}
|
||||
|
||||
if (connection_counts_as_relayed_traffic(conn, now)) {
|
||||
size_t relayed = token_bucket_get_write(&global_relayed_bucket);
|
||||
size_t relayed = token_bucket_rw_get_write(&global_relayed_bucket);
|
||||
global_bucket_val = MIN(global_bucket_val, relayed);
|
||||
}
|
||||
|
||||
|
@ -2956,8 +2957,9 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
|
|||
int
|
||||
global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
|
||||
{
|
||||
size_t smaller_bucket = MIN(token_bucket_get_write(&global_bucket),
|
||||
token_bucket_get_write(&global_relayed_bucket));
|
||||
size_t smaller_bucket =
|
||||
MIN(token_bucket_rw_get_write(&global_bucket),
|
||||
token_bucket_rw_get_write(&global_relayed_bucket));
|
||||
if (authdir_mode(get_options()) && priority>1)
|
||||
return 0; /* there's always room to answer v2 if we're an auth dir */
|
||||
|
||||
|
@ -3041,12 +3043,12 @@ connection_buckets_decrement(connection_t *conn, time_t now,
|
|||
return; /* local IPs are free */
|
||||
|
||||
if (connection_counts_as_relayed_traffic(conn, now)) {
|
||||
token_bucket_dec(&global_relayed_bucket, num_read, num_written);
|
||||
token_bucket_rw_dec(&global_relayed_bucket, num_read, num_written);
|
||||
}
|
||||
token_bucket_dec(&global_bucket, num_read, num_written);
|
||||
token_bucket_rw_dec(&global_bucket, num_read, num_written);
|
||||
if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
|
||||
or_connection_t *or_conn = TO_OR_CONN(conn);
|
||||
token_bucket_dec(&or_conn->bucket, num_read, num_written);
|
||||
token_bucket_rw_dec(&or_conn->bucket, num_read, num_written);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3060,14 +3062,14 @@ connection_consider_empty_read_buckets(connection_t *conn)
|
|||
if (!connection_is_rate_limited(conn))
|
||||
return; /* Always okay. */
|
||||
|
||||
if (token_bucket_get_read(&global_bucket) <= 0) {
|
||||
if (token_bucket_rw_get_read(&global_bucket) <= 0) {
|
||||
reason = "global read bucket exhausted. Pausing.";
|
||||
} else if (connection_counts_as_relayed_traffic(conn, approx_time()) &&
|
||||
token_bucket_get_read(&global_relayed_bucket) <= 0) {
|
||||
token_bucket_rw_get_read(&global_relayed_bucket) <= 0) {
|
||||
reason = "global relayed read bucket exhausted. Pausing.";
|
||||
} else if (connection_speaks_cells(conn) &&
|
||||
conn->state == OR_CONN_STATE_OPEN &&
|
||||
token_bucket_get_read(&TO_OR_CONN(conn)->bucket) <= 0) {
|
||||
token_bucket_rw_get_read(&TO_OR_CONN(conn)->bucket) <= 0) {
|
||||
reason = "connection read bucket exhausted. Pausing.";
|
||||
} else
|
||||
return; /* all good, no need to stop it */
|
||||
|
@ -3087,14 +3089,14 @@ connection_consider_empty_write_buckets(connection_t *conn)
|
|||
if (!connection_is_rate_limited(conn))
|
||||
return; /* Always okay. */
|
||||
|
||||
if (token_bucket_get_write(&global_bucket) <= 0) {
|
||||
if (token_bucket_rw_get_write(&global_bucket) <= 0) {
|
||||
reason = "global write bucket exhausted. Pausing.";
|
||||
} else if (connection_counts_as_relayed_traffic(conn, approx_time()) &&
|
||||
token_bucket_get_write(&global_relayed_bucket) <= 0) {
|
||||
token_bucket_rw_get_write(&global_relayed_bucket) <= 0) {
|
||||
reason = "global relayed write bucket exhausted. Pausing.";
|
||||
} else if (connection_speaks_cells(conn) &&
|
||||
conn->state == OR_CONN_STATE_OPEN &&
|
||||
token_bucket_get_write(&TO_OR_CONN(conn)->bucket) <= 0) {
|
||||
token_bucket_rw_get_write(&TO_OR_CONN(conn)->bucket) <= 0) {
|
||||
reason = "connection write bucket exhausted. Pausing.";
|
||||
} else
|
||||
return; /* all good, no need to stop it */
|
||||
|
@ -3111,17 +3113,17 @@ connection_bucket_init(void)
|
|||
{
|
||||
const or_options_t *options = get_options();
|
||||
const uint32_t now_ts = monotime_coarse_get_stamp();
|
||||
token_bucket_init(&global_bucket,
|
||||
token_bucket_rw_init(&global_bucket,
|
||||
(int32_t)options->BandwidthRate,
|
||||
(int32_t)options->BandwidthBurst,
|
||||
now_ts);
|
||||
if (options->RelayBandwidthRate) {
|
||||
token_bucket_init(&global_relayed_bucket,
|
||||
token_bucket_rw_init(&global_relayed_bucket,
|
||||
(int32_t)options->RelayBandwidthRate,
|
||||
(int32_t)options->RelayBandwidthBurst,
|
||||
now_ts);
|
||||
} else {
|
||||
token_bucket_init(&global_relayed_bucket,
|
||||
token_bucket_rw_init(&global_relayed_bucket,
|
||||
(int32_t)options->BandwidthRate,
|
||||
(int32_t)options->BandwidthBurst,
|
||||
now_ts);
|
||||
|
@ -3132,15 +3134,15 @@ connection_bucket_init(void)
|
|||
void
|
||||
connection_bucket_adjust(const or_options_t *options)
|
||||
{
|
||||
token_bucket_adjust(&global_bucket,
|
||||
token_bucket_rw_adjust(&global_bucket,
|
||||
(int32_t)options->BandwidthRate,
|
||||
(int32_t)options->BandwidthBurst);
|
||||
if (options->RelayBandwidthRate) {
|
||||
token_bucket_adjust(&global_relayed_bucket,
|
||||
token_bucket_rw_adjust(&global_relayed_bucket,
|
||||
(int32_t)options->RelayBandwidthRate,
|
||||
(int32_t)options->RelayBandwidthBurst);
|
||||
} else {
|
||||
token_bucket_adjust(&global_relayed_bucket,
|
||||
token_bucket_rw_adjust(&global_relayed_bucket,
|
||||
(int32_t)options->BandwidthRate,
|
||||
(int32_t)options->BandwidthBurst);
|
||||
}
|
||||
|
@ -3153,12 +3155,12 @@ connection_bucket_refill(time_t now, uint32_t now_ts)
|
|||
smartlist_t *conns = get_connection_array();
|
||||
|
||||
write_buckets_empty_last_second =
|
||||
token_bucket_get_write(&global_bucket) <= 0 ||
|
||||
token_bucket_get_write(&global_relayed_bucket) <= 0;
|
||||
token_bucket_rw_get_write(&global_bucket) <= 0 ||
|
||||
token_bucket_rw_get_write(&global_relayed_bucket) <= 0;
|
||||
|
||||
/* refill the global buckets */
|
||||
token_bucket_refill(&global_bucket, now_ts);
|
||||
token_bucket_refill(&global_relayed_bucket, now_ts);
|
||||
token_bucket_rw_refill(&global_bucket, now_ts);
|
||||
token_bucket_rw_refill(&global_relayed_bucket, now_ts);
|
||||
|
||||
/* refill the per-connection buckets */
|
||||
SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) {
|
||||
|
@ -3166,17 +3168,17 @@ connection_bucket_refill(time_t now, uint32_t now_ts)
|
|||
or_connection_t *or_conn = TO_OR_CONN(conn);
|
||||
|
||||
if (conn->state == OR_CONN_STATE_OPEN) {
|
||||
token_bucket_refill(&or_conn->bucket, now_ts);
|
||||
token_bucket_rw_refill(&or_conn->bucket, now_ts);
|
||||
}
|
||||
}
|
||||
|
||||
if (conn->read_blocked_on_bw == 1 /* marked to turn reading back on now */
|
||||
&& token_bucket_get_read(&global_bucket) > 0 /* and we can read */
|
||||
&& token_bucket_rw_get_read(&global_bucket) > 0 /* and we can read */
|
||||
&& (!connection_counts_as_relayed_traffic(conn, now) ||
|
||||
token_bucket_get_read(&global_relayed_bucket) > 0)
|
||||
token_bucket_rw_get_read(&global_relayed_bucket) > 0)
|
||||
&& (!connection_speaks_cells(conn) ||
|
||||
conn->state != OR_CONN_STATE_OPEN ||
|
||||
token_bucket_get_read(&TO_OR_CONN(conn)->bucket) > 0)) {
|
||||
token_bucket_rw_get_read(&TO_OR_CONN(conn)->bucket) > 0)) {
|
||||
/* and either a non-cell conn or a cell conn with non-empty bucket */
|
||||
LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
|
||||
"waking up conn (fd %d) for read", (int)conn->s));
|
||||
|
@ -3185,12 +3187,12 @@ connection_bucket_refill(time_t now, uint32_t now_ts)
|
|||
}
|
||||
|
||||
if (conn->write_blocked_on_bw == 1
|
||||
&& token_bucket_get_write(&global_bucket) > 0 /* and we can write */
|
||||
&& token_bucket_rw_get_write(&global_bucket) > 0 /* and we can write */
|
||||
&& (!connection_counts_as_relayed_traffic(conn, now) ||
|
||||
token_bucket_get_write(&global_relayed_bucket) > 0)
|
||||
token_bucket_rw_get_write(&global_relayed_bucket) > 0)
|
||||
&& (!connection_speaks_cells(conn) ||
|
||||
conn->state != OR_CONN_STATE_OPEN ||
|
||||
token_bucket_get_write(&TO_OR_CONN(conn)->bucket) > 0)) {
|
||||
token_bucket_rw_get_write(&TO_OR_CONN(conn)->bucket) > 0)) {
|
||||
LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
|
||||
"waking up conn (fd %d) for write", (int)conn->s));
|
||||
conn->write_blocked_on_bw = 0;
|
||||
|
|
|
@ -793,9 +793,9 @@ connection_or_update_token_buckets_helper(or_connection_t *conn, int reset,
|
|||
(int)options->BandwidthBurst, 1, INT32_MAX);
|
||||
}
|
||||
|
||||
token_bucket_adjust(&conn->bucket, rate, burst);
|
||||
token_bucket_rw_adjust(&conn->bucket, rate, burst);
|
||||
if (reset) {
|
||||
token_bucket_reset(&conn->bucket, monotime_coarse_get_stamp());
|
||||
token_bucket_rw_reset(&conn->bucket, monotime_coarse_get_stamp());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -154,10 +154,10 @@ static void shutdown_did_not_work_callback(evutil_socket_t fd, short event,
|
|||
/********* START VARIABLES **********/
|
||||
|
||||
/* Token bucket for all traffic. */
|
||||
token_bucket_t global_bucket;
|
||||
token_bucket_rw_t global_bucket;
|
||||
|
||||
/* Token bucket for relayed traffic. */
|
||||
token_bucket_t global_relayed_bucket;
|
||||
token_bucket_rw_t global_relayed_bucket;
|
||||
|
||||
/** What was the read/write bucket before the last second_elapsed_callback()
|
||||
* call? (used to determine how many bytes we've read). */
|
||||
|
@ -2394,9 +2394,9 @@ refill_callback(periodic_timer_t *timer, void *arg)
|
|||
}
|
||||
|
||||
bytes_written = stats_prev_global_write_bucket -
|
||||
token_bucket_get_write(&global_bucket);
|
||||
token_bucket_rw_get_write(&global_bucket);
|
||||
bytes_read = stats_prev_global_read_bucket -
|
||||
token_bucket_get_read(&global_bucket);
|
||||
token_bucket_rw_get_read(&global_bucket);
|
||||
|
||||
stats_n_bytes_read += bytes_read;
|
||||
stats_n_bytes_written += bytes_written;
|
||||
|
@ -2408,8 +2408,8 @@ refill_callback(periodic_timer_t *timer, void *arg)
|
|||
monotime_coarse_get_stamp());
|
||||
}
|
||||
|
||||
stats_prev_global_read_bucket = token_bucket_get_read(&global_bucket);
|
||||
stats_prev_global_write_bucket = token_bucket_get_write(&global_bucket);
|
||||
stats_prev_global_read_bucket = token_bucket_rw_get_read(&global_bucket);
|
||||
stats_prev_global_write_bucket = token_bucket_rw_get_write(&global_bucket);
|
||||
|
||||
/* remember what time it is, for next time */
|
||||
refill_timer_current_millisecond = now;
|
||||
|
@ -2618,8 +2618,8 @@ do_main_loop(void)
|
|||
|
||||
/* Set up our buckets */
|
||||
connection_bucket_init();
|
||||
stats_prev_global_read_bucket = token_bucket_get_read(&global_bucket);
|
||||
stats_prev_global_write_bucket = token_bucket_get_write(&global_bucket);
|
||||
stats_prev_global_read_bucket = token_bucket_rw_get_read(&global_bucket);
|
||||
stats_prev_global_write_bucket = token_bucket_rw_get_write(&global_bucket);
|
||||
|
||||
/* initialize the bootstrap status events to know we're starting up */
|
||||
control_event_bootstrap(BOOTSTRAP_STATUS_STARTING, 0);
|
||||
|
|
|
@ -88,8 +88,8 @@ uint64_t get_main_loop_idle_count(void);
|
|||
|
||||
extern time_t time_of_process_start;
|
||||
extern int quiet_level;
|
||||
extern token_bucket_t global_bucket;
|
||||
extern token_bucket_t global_relayed_bucket;
|
||||
extern token_bucket_rw_t global_bucket;
|
||||
extern token_bucket_rw_t global_relayed_bucket;
|
||||
|
||||
#ifdef MAIN_PRIVATE
|
||||
STATIC void init_connection_lists(void);
|
||||
|
|
|
@ -1661,7 +1661,7 @@ typedef struct or_connection_t {
|
|||
|
||||
time_t timestamp_lastempty; /**< When was the outbuf last completely empty?*/
|
||||
|
||||
token_bucket_t bucket; /**< Used for rate limiting when the connection is
|
||||
token_bucket_rw_t bucket; /**< Used for rate limiting when the connection is
|
||||
* in state CONN_OPEN. */
|
||||
|
||||
/*
|
||||
|
|
|
@ -21,23 +21,23 @@ static void
|
|||
test_bwmgt_token_buf_init(void *arg)
|
||||
{
|
||||
(void)arg;
|
||||
token_bucket_t b;
|
||||
token_bucket_rw_t b;
|
||||
|
||||
token_bucket_init(&b, 16*KB, 64*KB, START_TS);
|
||||
token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS);
|
||||
// Burst is correct
|
||||
tt_uint_op(b.burst, OP_EQ, 64*KB);
|
||||
tt_uint_op(b.cfg.burst, OP_EQ, 64*KB);
|
||||
// Rate is correct, within 1 percent.
|
||||
{
|
||||
uint32_t ticks_per_sec =
|
||||
(uint32_t) monotime_msec_to_approx_coarse_stamp_units(1000);
|
||||
uint32_t rate_per_sec = (b.rate * ticks_per_sec / TICKS_PER_STEP);
|
||||
uint32_t rate_per_sec = (b.cfg.rate * ticks_per_sec / TICKS_PER_STEP);
|
||||
|
||||
tt_uint_op(rate_per_sec, OP_GT, 16*KB-160);
|
||||
tt_uint_op(rate_per_sec, OP_LT, 16*KB+160);
|
||||
}
|
||||
// Bucket starts out full:
|
||||
tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS);
|
||||
tt_int_op(b.read_bucket, OP_EQ, 64*KB);
|
||||
tt_uint_op(b.last_refilled_at_timestamp, OP_EQ, START_TS);
|
||||
tt_int_op(b.read_bucket.bucket, OP_EQ, 64*KB);
|
||||
|
||||
done:
|
||||
;
|
||||
|
@ -47,35 +47,35 @@ static void
|
|||
test_bwmgt_token_buf_adjust(void *arg)
|
||||
{
|
||||
(void)arg;
|
||||
token_bucket_t b;
|
||||
token_bucket_rw_t b;
|
||||
|
||||
token_bucket_init(&b, 16*KB, 64*KB, START_TS);
|
||||
token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS);
|
||||
|
||||
uint32_t rate_orig = b.rate;
|
||||
uint32_t rate_orig = b.cfg.rate;
|
||||
// Increasing burst
|
||||
token_bucket_adjust(&b, 16*KB, 128*KB);
|
||||
tt_uint_op(b.rate, OP_EQ, rate_orig);
|
||||
tt_uint_op(b.read_bucket, OP_EQ, 64*KB);
|
||||
tt_uint_op(b.burst, OP_EQ, 128*KB);
|
||||
token_bucket_rw_adjust(&b, 16*KB, 128*KB);
|
||||
tt_uint_op(b.cfg.rate, OP_EQ, rate_orig);
|
||||
tt_uint_op(b.read_bucket.bucket, OP_EQ, 64*KB);
|
||||
tt_uint_op(b.cfg.burst, OP_EQ, 128*KB);
|
||||
|
||||
// Decreasing burst but staying above bucket
|
||||
token_bucket_adjust(&b, 16*KB, 96*KB);
|
||||
tt_uint_op(b.rate, OP_EQ, rate_orig);
|
||||
tt_uint_op(b.read_bucket, OP_EQ, 64*KB);
|
||||
tt_uint_op(b.burst, OP_EQ, 96*KB);
|
||||
token_bucket_rw_adjust(&b, 16*KB, 96*KB);
|
||||
tt_uint_op(b.cfg.rate, OP_EQ, rate_orig);
|
||||
tt_uint_op(b.read_bucket.bucket, OP_EQ, 64*KB);
|
||||
tt_uint_op(b.cfg.burst, OP_EQ, 96*KB);
|
||||
|
||||
// Decreasing burst below bucket,
|
||||
token_bucket_adjust(&b, 16*KB, 48*KB);
|
||||
tt_uint_op(b.rate, OP_EQ, rate_orig);
|
||||
tt_uint_op(b.read_bucket, OP_EQ, 48*KB);
|
||||
tt_uint_op(b.burst, OP_EQ, 48*KB);
|
||||
token_bucket_rw_adjust(&b, 16*KB, 48*KB);
|
||||
tt_uint_op(b.cfg.rate, OP_EQ, rate_orig);
|
||||
tt_uint_op(b.read_bucket.bucket, OP_EQ, 48*KB);
|
||||
tt_uint_op(b.cfg.burst, OP_EQ, 48*KB);
|
||||
|
||||
// Changing rate.
|
||||
token_bucket_adjust(&b, 32*KB, 48*KB);
|
||||
tt_uint_op(b.rate, OP_GE, rate_orig*2 - 10);
|
||||
tt_uint_op(b.rate, OP_LE, rate_orig*2 + 10);
|
||||
tt_uint_op(b.read_bucket, OP_EQ, 48*KB);
|
||||
tt_uint_op(b.burst, OP_EQ, 48*KB);
|
||||
token_bucket_rw_adjust(&b, 32*KB, 48*KB);
|
||||
tt_uint_op(b.cfg.rate, OP_GE, rate_orig*2 - 10);
|
||||
tt_uint_op(b.cfg.rate, OP_LE, rate_orig*2 + 10);
|
||||
tt_uint_op(b.read_bucket.bucket, OP_EQ, 48*KB);
|
||||
tt_uint_op(b.cfg.burst, OP_EQ, 48*KB);
|
||||
|
||||
done:
|
||||
;
|
||||
|
@ -85,34 +85,34 @@ static void
|
|||
test_bwmgt_token_buf_dec(void *arg)
|
||||
{
|
||||
(void)arg;
|
||||
token_bucket_t b;
|
||||
token_bucket_init(&b, 16*KB, 64*KB, START_TS);
|
||||
token_bucket_rw_t b;
|
||||
token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS);
|
||||
|
||||
// full-to-not-full.
|
||||
tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, KB));
|
||||
tt_int_op(b.read_bucket, OP_EQ, 63*KB);
|
||||
tt_int_op(0, OP_EQ, token_bucket_rw_dec_read(&b, KB));
|
||||
tt_int_op(b.read_bucket.bucket, OP_EQ, 63*KB);
|
||||
|
||||
// Full to almost-not-full
|
||||
tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, 63*KB - 1));
|
||||
tt_int_op(b.read_bucket, OP_EQ, 1);
|
||||
tt_int_op(0, OP_EQ, token_bucket_rw_dec_read(&b, 63*KB - 1));
|
||||
tt_int_op(b.read_bucket.bucket, OP_EQ, 1);
|
||||
|
||||
// almost-not-full to empty.
|
||||
tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 1));
|
||||
tt_int_op(b.read_bucket, OP_EQ, 0);
|
||||
tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, 1));
|
||||
tt_int_op(b.read_bucket.bucket, OP_EQ, 0);
|
||||
|
||||
// reset bucket, try full-to-empty
|
||||
token_bucket_init(&b, 16*KB, 64*KB, START_TS);
|
||||
tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 64*KB));
|
||||
tt_int_op(b.read_bucket, OP_EQ, 0);
|
||||
token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS);
|
||||
tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, 64*KB));
|
||||
tt_int_op(b.read_bucket.bucket, OP_EQ, 0);
|
||||
|
||||
// reset bucket, try underflow.
|
||||
token_bucket_init(&b, 16*KB, 64*KB, START_TS);
|
||||
tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, 64*KB + 1));
|
||||
tt_int_op(b.read_bucket, OP_EQ, -1);
|
||||
token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS);
|
||||
tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, 64*KB + 1));
|
||||
tt_int_op(b.read_bucket.bucket, OP_EQ, -1);
|
||||
|
||||
// A second underflow does not make the bucket empty.
|
||||
tt_int_op(0, OP_EQ, token_bucket_dec_read(&b, 1000));
|
||||
tt_int_op(b.read_bucket, OP_EQ, -1001);
|
||||
tt_int_op(0, OP_EQ, token_bucket_rw_dec_read(&b, 1000));
|
||||
tt_int_op(b.read_bucket.bucket, OP_EQ, -1001);
|
||||
|
||||
done:
|
||||
;
|
||||
|
@ -122,71 +122,71 @@ static void
|
|||
test_bwmgt_token_buf_refill(void *arg)
|
||||
{
|
||||
(void)arg;
|
||||
token_bucket_t b;
|
||||
token_bucket_rw_t b;
|
||||
const uint32_t SEC =
|
||||
(uint32_t)monotime_msec_to_approx_coarse_stamp_units(1000);
|
||||
printf("%d\n", (int)SEC);
|
||||
token_bucket_init(&b, 16*KB, 64*KB, START_TS);
|
||||
token_bucket_rw_init(&b, 16*KB, 64*KB, START_TS);
|
||||
|
||||
/* Make the buffer much emptier, then let one second elapse. */
|
||||
token_bucket_dec_read(&b, 48*KB);
|
||||
tt_int_op(b.read_bucket, OP_EQ, 16*KB);
|
||||
tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC));
|
||||
tt_int_op(b.read_bucket, OP_GT, 32*KB - 300);
|
||||
tt_int_op(b.read_bucket, OP_LT, 32*KB + 300);
|
||||
token_bucket_rw_dec_read(&b, 48*KB);
|
||||
tt_int_op(b.read_bucket.bucket, OP_EQ, 16*KB);
|
||||
tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC));
|
||||
tt_int_op(b.read_bucket.bucket, OP_GT, 32*KB - 300);
|
||||
tt_int_op(b.read_bucket.bucket, OP_LT, 32*KB + 300);
|
||||
|
||||
/* Another half second. */
|
||||
tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2));
|
||||
tt_int_op(b.read_bucket, OP_GT, 40*KB - 400);
|
||||
tt_int_op(b.read_bucket, OP_LT, 40*KB + 400);
|
||||
tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2);
|
||||
tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2));
|
||||
tt_int_op(b.read_bucket.bucket, OP_GT, 40*KB - 400);
|
||||
tt_int_op(b.read_bucket.bucket, OP_LT, 40*KB + 400);
|
||||
tt_uint_op(b.last_refilled_at_timestamp, OP_EQ, START_TS + SEC*3/2);
|
||||
|
||||
/* No time: nothing happens. */
|
||||
{
|
||||
const uint32_t bucket_orig = b.read_bucket;
|
||||
tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2));
|
||||
tt_int_op(b.read_bucket, OP_EQ, bucket_orig);
|
||||
const uint32_t bucket_orig = b.read_bucket.bucket;
|
||||
tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2));
|
||||
tt_int_op(b.read_bucket.bucket, OP_EQ, bucket_orig);
|
||||
}
|
||||
|
||||
/* Another 30 seconds: fill the bucket. */
|
||||
tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*30));
|
||||
tt_int_op(b.read_bucket, OP_EQ, b.burst);
|
||||
tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*30);
|
||||
tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2 + SEC*30));
|
||||
tt_int_op(b.read_bucket.bucket, OP_EQ, b.cfg.burst);
|
||||
tt_uint_op(b.last_refilled_at_timestamp, OP_EQ, START_TS + SEC*3/2 + SEC*30);
|
||||
|
||||
/* Another 30 seconds: nothing happens. */
|
||||
tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*60));
|
||||
tt_int_op(b.read_bucket, OP_EQ, b.burst);
|
||||
tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*60);
|
||||
tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2 + SEC*60));
|
||||
tt_int_op(b.read_bucket.bucket, OP_EQ, b.cfg.burst);
|
||||
tt_uint_op(b.last_refilled_at_timestamp, OP_EQ, START_TS + SEC*3/2 + SEC*60);
|
||||
|
||||
/* Empty the bucket, let two seconds pass, and make sure that a refill is
|
||||
* noticed. */
|
||||
tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, b.burst));
|
||||
tt_int_op(0, OP_EQ, b.read_bucket);
|
||||
tt_int_op(1, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*61));
|
||||
tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*62));
|
||||
tt_int_op(b.read_bucket, OP_GT, 32*KB-400);
|
||||
tt_int_op(b.read_bucket, OP_LT, 32*KB+400);
|
||||
tt_int_op(1, OP_EQ, token_bucket_rw_dec_read(&b, b.cfg.burst));
|
||||
tt_int_op(0, OP_EQ, b.read_bucket.bucket);
|
||||
tt_int_op(1, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2 + SEC*61));
|
||||
tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*3/2 + SEC*62));
|
||||
tt_int_op(b.read_bucket.bucket, OP_GT, 32*KB-400);
|
||||
tt_int_op(b.read_bucket.bucket, OP_LT, 32*KB+400);
|
||||
|
||||
/* Underflow the bucket, make sure we detect when it has tokens again. */
|
||||
tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, b.read_bucket+16*KB));
|
||||
tt_int_op(-16*KB, OP_EQ, b.read_bucket);
|
||||
tt_int_op(1, OP_EQ,
|
||||
token_bucket_rw_dec_read(&b, b.read_bucket.bucket+16*KB));
|
||||
tt_int_op(-16*KB, OP_EQ, b.read_bucket.bucket);
|
||||
// half a second passes...
|
||||
tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*64));
|
||||
tt_int_op(b.read_bucket, OP_GT, -8*KB-300);
|
||||
tt_int_op(b.read_bucket, OP_LT, -8*KB+300);
|
||||
tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*64));
|
||||
tt_int_op(b.read_bucket.bucket, OP_GT, -8*KB-300);
|
||||
tt_int_op(b.read_bucket.bucket, OP_LT, -8*KB+300);
|
||||
// a second passes
|
||||
tt_int_op(1, OP_EQ, token_bucket_refill(&b, START_TS + SEC*65));
|
||||
tt_int_op(b.read_bucket, OP_GT, 8*KB-400);
|
||||
tt_int_op(b.read_bucket, OP_LT, 8*KB+400);
|
||||
tt_int_op(1, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*65));
|
||||
tt_int_op(b.read_bucket.bucket, OP_GT, 8*KB-400);
|
||||
tt_int_op(b.read_bucket.bucket, OP_LT, 8*KB+400);
|
||||
|
||||
// We step a second backwards, and nothing happens.
|
||||
tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*64));
|
||||
tt_int_op(b.read_bucket, OP_GT, 8*KB-400);
|
||||
tt_int_op(b.read_bucket, OP_LT, 8*KB+400);
|
||||
tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, START_TS + SEC*64));
|
||||
tt_int_op(b.read_bucket.bucket, OP_GT, 8*KB-400);
|
||||
tt_int_op(b.read_bucket.bucket, OP_LT, 8*KB+400);
|
||||
|
||||
// A ridiculous amount of time passes.
|
||||
tt_int_op(0, OP_EQ, token_bucket_refill(&b, INT32_MAX));
|
||||
tt_int_op(b.read_bucket, OP_EQ, b.burst);
|
||||
tt_int_op(0, OP_EQ, token_bucket_rw_refill(&b, INT32_MAX));
|
||||
tt_int_op(b.read_bucket.bucket, OP_EQ, b.cfg.burst);
|
||||
|
||||
done:
|
||||
;
|
||||
|
|
Loading…
Reference in New Issue