Add support for multi-priority workqueues

Each piece of queued work now has an associated priority value; each
priority goes on a separate queue.

With probability (N-1)/N (N is 37 in this implementation), a worker takes
work from the highest-priority nonempty queue.  Otherwise, it looks for
work in a queue of lower priority.  This keeps lower-priority tasks from
being starved entirely.
Author: Nick Mathewson
Date:   2017-07-12 11:47:01 -04:00
parent 5636b160d4
commit 10e0bff4ca
4 changed files with 126 additions and 15 deletions
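
As a quick orientation before the diffs, here is a minimal caller-side sketch
(not part of this commit). It assumes the types and functions declared in
workqueue.h below, plus the usual WQ_RPL_REPLY return value; slow_job_fn,
slow_job_reply, and queue_slow_job are hypothetical names.

#include "workqueue.h"

/* Hypothetical worker-side function: runs in a worker thread. */
static workqueue_reply_t
slow_job_fn(void *state, void *job)
{
  (void) state;
  /* ... do the expensive, non-urgent computation on `job` ... */
  (void) job;
  return WQ_RPL_REPLY;
}

/* Hypothetical reply callback: runs when the main thread drains the
 * reply queue. */
static void
slow_job_reply(void *job)
{
  /* ... hand the result back to the rest of the program ... */
  (void) job;
}

/* Queue `job` at low priority so it cannot crowd out WQ_PRI_HIGH work.
 * The old threadpool_queue_work() still exists and now means WQ_PRI_HIGH. */
static workqueue_entry_t *
queue_slow_job(threadpool_t *pool, void *job)
{
  return threadpool_queue_work_priority(pool, WQ_PRI_LOW,
                                        slow_job_fn, slow_job_reply, job);
}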

changes/multi-priority (new file)

@@ -0,0 +1,5 @@
o Minor features (relay, thread pool):
- Allow background work to be queued with different priorities, so
that a big pile of slow low-priority jobs will not starve out
higher priority jobs. This lays the groundwork for a fix for bug
22883.

src/common/workqueue.c

@@ -25,11 +25,19 @@
#include "orconfig.h"
#include "compat.h"
#include "compat_threads.h"
#include "crypto.h"
#include "util.h"
#include "workqueue.h"
#include "tor_queue.h"
#include "torlog.h"
#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)
TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_s);
typedef struct work_tailq_t work_tailq_t;
struct threadpool_s {
/** An array of pointers to workerthread_t: one for each running worker
* thread. */
@@ -38,8 +46,12 @@ struct threadpool_s {
/** Condition variable that we wait on when we have no work, and which
* gets signaled when our queue becomes nonempty. */
tor_cond_t condition;
/** Queue of pending work that we have to do. */
TOR_TAILQ_HEAD(, workqueue_entry_s) work;
/** Queues of pending work that we have to do. The queue with priority
* <b>p</b> is work[p]. */
work_tailq_t work[WORKQUEUE_N_PRIORITIES];
/** Weak RNG, used to decide when to ignore priority. */
tor_weak_rng_t weak_rng;
/** The current 'update generation' of the threadpool. Any thread that is
* at an earlier generation needs to run the update function. */
@@ -66,6 +78,11 @@ struct threadpool_s {
void *new_thread_state_arg;
};
/** Used to put a workqueue_priority_t value into a bitfield. */
#define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t)
/** Number of bits needed to hold all legal values of workqueue_priority_t */
#define WORKQUEUE_PRIORITY_BITS 2
struct workqueue_entry_s {
/** The next workqueue_entry_t that's pending on the same thread or
* reply queue. */
@@ -76,6 +93,8 @@ struct workqueue_entry_s {
struct threadpool_s *on_pool;
/** True iff this entry is waiting for a worker to start processing it. */
uint8_t pending;
/** Priority of this entry. */
workqueue_priority_bitfield_t priority : WORKQUEUE_PRIORITY_BITS;
/** Function to run in the worker thread. */
workqueue_reply_t (*fn)(void *state, void *arg);
/** Function to run while processing the reply queue. */
@@ -125,6 +144,7 @@ workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*),
ent->fn = fn;
ent->reply_fn = reply_fn;
ent->arg = arg;
ent->priority = WQ_PRI_HIGH;
return ent;
}
@@ -161,8 +181,9 @@ workqueue_entry_cancel(workqueue_entry_t *ent)
int cancelled = 0;
void *result = NULL;
tor_mutex_acquire(&ent->on_pool->lock);
workqueue_priority_t prio = ent->priority;
if (ent->pending) {
TOR_TAILQ_REMOVE(&ent->on_pool->work, ent, next_work);
TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work);
cancelled = 1;
result = ent->arg;
}
@@ -180,8 +201,50 @@ workqueue_entry_cancel(workqueue_entry_t *ent)
static int
worker_thread_has_work(workerthread_t *thread)
{
return !TOR_TAILQ_EMPTY(&thread->in_pool->work) ||
thread->generation != thread->in_pool->generation;
unsigned i;
for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
if (!TOR_TAILQ_EMPTY(&thread->in_pool->work[i]))
return 1;
}
return thread->generation != thread->in_pool->generation;
}
/** With this probability, we'll consider taking work from a lower-priority
 * queue even when higher-priority work is available.  This keeps the
 * lower-priority queues from being starved completely. */
#define LOWER_PRIORITY_CHANCE 37
/** Extract the next workqueue_entry_t from the thread's pool, removing
* it from the relevant queues and marking it as non-pending.
*
* The caller must hold the lock. */
static workqueue_entry_t *
worker_thread_extract_next_work(workerthread_t *thread)
{
threadpool_t *pool = thread->in_pool;
work_tailq_t *queue = NULL, *this_queue;
unsigned i;
for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
this_queue = &pool->work[i];
if (!TOR_TAILQ_EMPTY(this_queue)) {
queue = this_queue;
if (! tor_weak_random_one_in_n(&pool->weak_rng, LOWER_PRIORITY_CHANCE)) {
/* Usually we'll just break now, so that we can get out of the loop
* and use the queue where we found work. But with a small
* probability, we'll keep looking for lower priority work, so that
* we don't ignore our low-priority queues entirely. */
break;
}
}
}
if (queue == NULL)
return NULL;
workqueue_entry_t *work = TOR_TAILQ_FIRST(queue);
TOR_TAILQ_REMOVE(queue, work, next_work);
work->pending = 0;
return work;
}
/**
@@ -217,9 +280,9 @@ worker_thread_main(void *thread_)
tor_mutex_acquire(&pool->lock);
continue;
}
work = TOR_TAILQ_FIRST(&pool->work);
TOR_TAILQ_REMOVE(&pool->work, work, next_work);
work->pending = 0;
work = worker_thread_extract_next_work(thread);
if (BUG(work == NULL))
break;
tor_mutex_release(&pool->lock);
/* We run the work function without holding the thread lock. This
@@ -301,22 +364,31 @@ workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue)
* On success, return a workqueue_entry_t object that can be passed to
* workqueue_entry_cancel(). On failure, return NULL.
*
* Items are executed in a loose priority order -- each thread will usually
* take from the queued work with the highest priority, but will occasionally
* visit lower-priority queues to keep them from starving completely.
*
* Note that because multiple worker threads pull from the shared queues
* concurrently, work items may not be executed strictly in the order they
* were queued.
*/
workqueue_entry_t *
threadpool_queue_work(threadpool_t *pool,
workqueue_reply_t (*fn)(void *, void *),
void (*reply_fn)(void *),
void *arg)
threadpool_queue_work_priority(threadpool_t *pool,
workqueue_priority_t prio,
workqueue_reply_t (*fn)(void *, void *),
void (*reply_fn)(void *),
void *arg)
{
tor_assert(prio >= WORKQUEUE_PRIORITY_FIRST &&
prio <= WORKQUEUE_PRIORITY_LAST);
workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg);
ent->on_pool = pool;
ent->pending = 1;
ent->priority = prio;
tor_mutex_acquire(&pool->lock);
TOR_TAILQ_INSERT_TAIL(&pool->work, ent, next_work);
TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work);
tor_cond_signal_one(&pool->condition);
@@ -325,6 +397,16 @@ threadpool_queue_work(threadpool_t *pool,
return ent;
}
/** As threadpool_queue_work_priority(), but assumes WQ_PRI_HIGH */
workqueue_entry_t *
threadpool_queue_work(threadpool_t *pool,
workqueue_reply_t (*fn)(void *, void *),
void (*reply_fn)(void *),
void *arg)
{
return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg);
}
/**
* Queue a copy of a work item for every thread in a pool. This can be used,
* for example, to tell the threads to update some parameter in their states.
@@ -441,7 +523,15 @@ threadpool_new(int n_threads,
pool = tor_malloc_zero(sizeof(threadpool_t));
tor_mutex_init_nonrecursive(&pool->lock);
tor_cond_init(&pool->condition);
TOR_TAILQ_INIT(&pool->work);
unsigned i;
for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
TOR_TAILQ_INIT(&pool->work[i]);
}
{
unsigned seed;
crypto_rand((void*)&seed, sizeof(seed));
tor_init_weak_random(&pool->weak_rng, seed);
}
pool->new_thread_state_fn = new_thread_state_fn;
pool->new_thread_state_arg = arg;

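An editorial aside on the numbers above (not part of the commit): with
LOWER_PRIORITY_CHANCE set to 37, worker_thread_extract_next_work() keeps
scanning past a nonempty queue with probability 1/37 per priority level.
The small program below is a sketch, not Tor code; it works out what that
means when all three queues have work.

#include <stdio.h>

/* Probability of servicing each queue on one pull, assuming every queue is
 * nonempty and the RNG is uniform (in practice it is only a weak RNG). */
int
main(void)
{
  const double keep_looking = 1.0 / 37;   /* 1-in-LOWER_PRIORITY_CHANCE */
  const double p_high = 1.0 - keep_looking;                   /* ~97.3% */
  const double p_med  = keep_looking * (1.0 - keep_looking);  /* ~2.6%  */
  const double p_low  = keep_looking * keep_looking;          /* ~0.07% */
  printf("high=%.4f med=%.4f low=%.4f\n", p_high, p_med, p_low);
  return 0;
}

So when higher-priority work never dries up, the lowest queue is visited only
about once in 1369 pulls; the anti-starvation guarantee is deliberately weak.
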
src/common/workqueue.h

@@ -22,6 +22,20 @@ typedef enum workqueue_reply_t {
WQ_RPL_SHUTDOWN = 2, /** indicates thread is shutting down */
} workqueue_reply_t;
/** Possible priorities for work. Lower numeric values are more important. */
typedef enum workqueue_priority_t {
WQ_PRI_HIGH = 0,
WQ_PRI_MED = 1,
WQ_PRI_LOW = 2,
} workqueue_priority_t;
workqueue_entry_t *threadpool_queue_work_priority(threadpool_t *pool,
workqueue_priority_t prio,
workqueue_reply_t (*fn)(void *,
void *),
void (*reply_fn)(void *),
void *arg);
workqueue_entry_t *threadpool_queue_work(threadpool_t *pool,
workqueue_reply_t (*fn)(void *,
void *),

src/test/test_workqueue.c

@@ -200,7 +200,9 @@ add_work(threadpool_t *tp)
crypto_rand((char*)w->msg, 20);
w->msglen = 20;
++rsa_sent;
return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w);
return threadpool_queue_work_priority(tp,
WQ_PRI_MED,
workqueue_do_rsa, handle_reply, w);
} else {
ecdh_work_t *w = tor_malloc_zero(sizeof(*w));
w->serial = n_sent++;