aio: introduce aio_co_schedule and aio_co_wake

aio_co_wake provides the infrastructure to start a coroutine on a "home"
AioContext.  It will be used by CoMutex and CoQueue, so that coroutines
don't jump from one context to another when they go to sleep on a
mutex or waitqueue.  However, it can also be used as a more efficient
alternative to one-shot bottom halves, and saves the effort of tracking
which AioContext a coroutine is running on.

aio_co_schedule is the part of aio_co_wake that starts a coroutine
on a remove AioContext, but it is also useful to implement e.g.
bdrv_set_aio_context callbacks.

The implementation of aio_co_schedule is based on a lock-free
multiple-producer, single-consumer queue.  The multiple producers use
cmpxchg to add to a LIFO stack.  The consumer (a per-AioContext bottom
half) grabs all items added so far, inverts the list to make it FIFO,
and goes through it one item at a time until it's empty.  The data
structure was inspired by OSv, which uses it in the very code we'll
"port" to QEMU for the thread-safe CoMutex.

Most of the new code is really tests.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
Reviewed-by: Fam Zheng <famz@redhat.com>
Message-id: 20170213135235.12274-3-pbonzini@redhat.com
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
This commit is contained in:
Paolo Bonzini 2017-02-13 14:52:19 +01:00 committed by Stefan Hajnoczi
parent c2b38b277a
commit 0c330a734b
9 changed files with 453 additions and 4 deletions

View file

@ -47,6 +47,7 @@ typedef void QEMUBHFunc(void *opaque);
typedef bool AioPollFn(void *opaque);
typedef void IOHandler(void *opaque);
struct Coroutine;
struct ThreadPool;
struct LinuxAioState;
@ -108,6 +109,9 @@ struct AioContext {
bool notified;
EventNotifier notifier;
QSLIST_HEAD(, Coroutine) scheduled_coroutines;
QEMUBH *co_schedule_bh;
/* Thread pool for performing work and receiving completion callbacks.
* Has its own locking.
*/
@ -482,6 +486,34 @@ static inline bool aio_node_check(AioContext *ctx, bool is_external)
return !is_external || !atomic_read(&ctx->external_disable_cnt);
}
/**
* aio_co_schedule:
* @ctx: the aio context
* @co: the coroutine
*
* Start a coroutine on a remote AioContext.
*
* The coroutine must not be entered by anyone else while aio_co_schedule()
* is active. In addition the coroutine must have yielded unless ctx
* is the context in which the coroutine is running (i.e. the value of
* qemu_get_current_aio_context() from the coroutine itself).
*/
void aio_co_schedule(AioContext *ctx, struct Coroutine *co);
/**
* aio_co_wake:
* @co: the coroutine
*
* Restart a coroutine on the AioContext where it was running last, thus
* preventing coroutines from jumping from one context to another when they
* go to sleep.
*
* aio_co_wake may be executed either in coroutine or non-coroutine
* context. The coroutine must not be entered by anyone else while
* aio_co_wake() is active.
*/
void aio_co_wake(struct Coroutine *co);
/**
* Return the AioContext whose event loop runs in the current thread.
*

View file

@ -40,12 +40,21 @@ struct Coroutine {
CoroutineEntry *entry;
void *entry_arg;
Coroutine *caller;
/* Only used when the coroutine has terminated. */
QSLIST_ENTRY(Coroutine) pool_next;
size_t locks_held;
/* Coroutines that should be woken up when we yield or terminate */
/* Coroutines that should be woken up when we yield or terminate.
* Only used when the coroutine is running.
*/
QSIMPLEQ_HEAD(, Coroutine) co_queue_wakeup;
/* Only used when the coroutine has yielded. */
AioContext *ctx;
QSIMPLEQ_ENTRY(Coroutine) co_queue_next;
QSLIST_ENTRY(Coroutine) co_scheduled_next;
};
Coroutine *qemu_coroutine_new(void);

View file

@ -48,9 +48,10 @@ check-unit-y += tests/test-aio$(EXESUF)
gcov-files-test-aio-y = util/async.c util/qemu-timer.o
gcov-files-test-aio-$(CONFIG_WIN32) += util/aio-win32.c
gcov-files-test-aio-$(CONFIG_POSIX) += util/aio-posix.c
check-unit-y += tests/test-aio-multithread$(EXESUF)
gcov-files-test-aio-multithread-y = $(gcov-files-test-aio-y)
gcov-files-test-aio-multithread-y += util/qemu-coroutine.c tests/iothread.c
check-unit-y += tests/test-throttle$(EXESUF)
gcov-files-test-aio-$(CONFIG_WIN32) = aio-win32.c
gcov-files-test-aio-$(CONFIG_POSIX) = aio-posix.c
check-unit-y += tests/test-thread-pool$(EXESUF)
gcov-files-test-thread-pool-y = thread-pool.c
gcov-files-test-hbitmap-y = util/hbitmap.c
@ -508,7 +509,7 @@ test-qapi-obj-y = tests/test-qapi-visit.o tests/test-qapi-types.o \
$(test-qom-obj-y)
test-crypto-obj-y = $(crypto-obj-y) $(test-qom-obj-y)
test-io-obj-y = $(io-obj-y) $(test-crypto-obj-y)
test-block-obj-y = $(block-obj-y) $(test-io-obj-y)
test-block-obj-y = $(block-obj-y) $(test-io-obj-y) tests/iothread.o
tests/check-qint$(EXESUF): tests/check-qint.o $(test-util-obj-y)
tests/check-qstring$(EXESUF): tests/check-qstring.o $(test-util-obj-y)
@ -523,6 +524,7 @@ tests/check-qom-proplist$(EXESUF): tests/check-qom-proplist.o $(test-qom-obj-y)
tests/test-char$(EXESUF): tests/test-char.o $(test-util-obj-y) $(qtest-obj-y) $(test-io-obj-y) $(chardev-obj-y)
tests/test-coroutine$(EXESUF): tests/test-coroutine.o $(test-block-obj-y)
tests/test-aio$(EXESUF): tests/test-aio.o $(test-block-obj-y)
tests/test-aio-multithread$(EXESUF): tests/test-aio-multithread.o $(test-block-obj-y)
tests/test-throttle$(EXESUF): tests/test-throttle.o $(test-block-obj-y)
tests/test-blockjob$(EXESUF): tests/test-blockjob.o $(test-block-obj-y) $(test-util-obj-y)
tests/test-blockjob-txn$(EXESUF): tests/test-blockjob-txn.o $(test-block-obj-y) $(test-util-obj-y)

91
tests/iothread.c Normal file
View file

@ -0,0 +1,91 @@
/*
* Event loop thread implementation for unit tests
*
* Copyright Red Hat Inc., 2013, 2016
*
* Authors:
* Stefan Hajnoczi <stefanha@redhat.com>
* Paolo Bonzini <pbonzini@redhat.com>
*
* This work is licensed under the terms of the GNU GPL, version 2 or later.
* See the COPYING file in the top-level directory.
*
*/
#include "qemu/osdep.h"
#include "qapi/error.h"
#include "block/aio.h"
#include "qemu/main-loop.h"
#include "qemu/rcu.h"
#include "iothread.h"
struct IOThread {
AioContext *ctx;
QemuThread thread;
QemuMutex init_done_lock;
QemuCond init_done_cond; /* is thread initialization done? */
bool stopping;
};
static __thread IOThread *my_iothread;
AioContext *qemu_get_current_aio_context(void)
{
return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
}
static void *iothread_run(void *opaque)
{
IOThread *iothread = opaque;
rcu_register_thread();
my_iothread = iothread;
qemu_mutex_lock(&iothread->init_done_lock);
iothread->ctx = aio_context_new(&error_abort);
qemu_cond_signal(&iothread->init_done_cond);
qemu_mutex_unlock(&iothread->init_done_lock);
while (!atomic_read(&iothread->stopping)) {
aio_poll(iothread->ctx, true);
}
rcu_unregister_thread();
return NULL;
}
void iothread_join(IOThread *iothread)
{
iothread->stopping = true;
aio_notify(iothread->ctx);
qemu_thread_join(&iothread->thread);
qemu_cond_destroy(&iothread->init_done_cond);
qemu_mutex_destroy(&iothread->init_done_lock);
aio_context_unref(iothread->ctx);
g_free(iothread);
}
IOThread *iothread_new(void)
{
IOThread *iothread = g_new0(IOThread, 1);
qemu_mutex_init(&iothread->init_done_lock);
qemu_cond_init(&iothread->init_done_cond);
qemu_thread_create(&iothread->thread, NULL, iothread_run,
iothread, QEMU_THREAD_JOINABLE);
/* Wait for initialization to complete */
qemu_mutex_lock(&iothread->init_done_lock);
while (iothread->ctx == NULL) {
qemu_cond_wait(&iothread->init_done_cond,
&iothread->init_done_lock);
}
qemu_mutex_unlock(&iothread->init_done_lock);
return iothread;
}
AioContext *iothread_get_aio_context(IOThread *iothread)
{
return iothread->ctx;
}

25
tests/iothread.h Normal file
View file

@ -0,0 +1,25 @@
/*
* Event loop thread implementation for unit tests
*
* Copyright Red Hat Inc., 2013, 2016
*
* Authors:
* Stefan Hajnoczi <stefanha@redhat.com>
* Paolo Bonzini <pbonzini@redhat.com>
*
* This work is licensed under the terms of the GNU GPL, version 2 or later.
* See the COPYING file in the top-level directory.
*/
#ifndef TEST_IOTHREAD_H
#define TEST_IOTHREAD_H
#include "block/aio.h"
#include "qemu/thread.h"
typedef struct IOThread IOThread;
IOThread *iothread_new(void);
void iothread_join(IOThread *iothread);
AioContext *iothread_get_aio_context(IOThread *iothread);
#endif

View file

@ -0,0 +1,213 @@
/*
* AioContext multithreading tests
*
* Copyright Red Hat, Inc. 2016
*
* Authors:
* Paolo Bonzini <pbonzini@redhat.com>
*
* This work is licensed under the terms of the GNU LGPL, version 2 or later.
* See the COPYING.LIB file in the top-level directory.
*/
#include "qemu/osdep.h"
#include <glib.h>
#include "block/aio.h"
#include "qapi/error.h"
#include "qemu/coroutine.h"
#include "qemu/thread.h"
#include "qemu/error-report.h"
#include "iothread.h"
/* AioContext management */
#define NUM_CONTEXTS 5
static IOThread *threads[NUM_CONTEXTS];
static AioContext *ctx[NUM_CONTEXTS];
static __thread int id = -1;
static QemuEvent done_event;
/* Run a function synchronously on a remote iothread. */
typedef struct CtxRunData {
QEMUBHFunc *cb;
void *arg;
} CtxRunData;
static void ctx_run_bh_cb(void *opaque)
{
CtxRunData *data = opaque;
data->cb(data->arg);
qemu_event_set(&done_event);
}
static void ctx_run(int i, QEMUBHFunc *cb, void *opaque)
{
CtxRunData data = {
.cb = cb,
.arg = opaque
};
qemu_event_reset(&done_event);
aio_bh_schedule_oneshot(ctx[i], ctx_run_bh_cb, &data);
qemu_event_wait(&done_event);
}
/* Starting the iothreads. */
static void set_id_cb(void *opaque)
{
int *i = opaque;
id = *i;
}
static void create_aio_contexts(void)
{
int i;
for (i = 0; i < NUM_CONTEXTS; i++) {
threads[i] = iothread_new();
ctx[i] = iothread_get_aio_context(threads[i]);
}
qemu_event_init(&done_event, false);
for (i = 0; i < NUM_CONTEXTS; i++) {
ctx_run(i, set_id_cb, &i);
}
}
/* Stopping the iothreads. */
static void join_aio_contexts(void)
{
int i;
for (i = 0; i < NUM_CONTEXTS; i++) {
aio_context_ref(ctx[i]);
}
for (i = 0; i < NUM_CONTEXTS; i++) {
iothread_join(threads[i]);
}
for (i = 0; i < NUM_CONTEXTS; i++) {
aio_context_unref(ctx[i]);
}
qemu_event_destroy(&done_event);
}
/* Basic test for the stuff above. */
static void test_lifecycle(void)
{
create_aio_contexts();
join_aio_contexts();
}
/* aio_co_schedule test. */
static Coroutine *to_schedule[NUM_CONTEXTS];
static bool now_stopping;
static int count_retry;
static int count_here;
static int count_other;
static bool schedule_next(int n)
{
Coroutine *co;
co = atomic_xchg(&to_schedule[n], NULL);
if (!co) {
atomic_inc(&count_retry);
return false;
}
if (n == id) {
atomic_inc(&count_here);
} else {
atomic_inc(&count_other);
}
aio_co_schedule(ctx[n], co);
return true;
}
static void finish_cb(void *opaque)
{
schedule_next(id);
}
static coroutine_fn void test_multi_co_schedule_entry(void *opaque)
{
g_assert(to_schedule[id] == NULL);
atomic_mb_set(&to_schedule[id], qemu_coroutine_self());
while (!atomic_mb_read(&now_stopping)) {
int n;
n = g_test_rand_int_range(0, NUM_CONTEXTS);
schedule_next(n);
qemu_coroutine_yield();
g_assert(to_schedule[id] == NULL);
atomic_mb_set(&to_schedule[id], qemu_coroutine_self());
}
}
static void test_multi_co_schedule(int seconds)
{
int i;
count_here = count_other = count_retry = 0;
now_stopping = false;
create_aio_contexts();
for (i = 0; i < NUM_CONTEXTS; i++) {
Coroutine *co1 = qemu_coroutine_create(test_multi_co_schedule_entry, NULL);
aio_co_schedule(ctx[i], co1);
}
g_usleep(seconds * 1000000);
atomic_mb_set(&now_stopping, true);
for (i = 0; i < NUM_CONTEXTS; i++) {
ctx_run(i, finish_cb, NULL);
to_schedule[i] = NULL;
}
join_aio_contexts();
g_test_message("scheduled %d, queued %d, retry %d, total %d\n",
count_other, count_here, count_retry,
count_here + count_other + count_retry);
}
static void test_multi_co_schedule_1(void)
{
test_multi_co_schedule(1);
}
static void test_multi_co_schedule_10(void)
{
test_multi_co_schedule(10);
}
/* End of tests. */
int main(int argc, char **argv)
{
init_clocks();
g_test_init(&argc, &argv, NULL);
g_test_add_func("/aio/multi/lifecycle", test_lifecycle);
if (g_test_quick()) {
g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1);
} else {
g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10);
}
return g_test_run();
}

View file

@ -31,6 +31,8 @@
#include "qemu/main-loop.h"
#include "qemu/atomic.h"
#include "block/raw-aio.h"
#include "qemu/coroutine_int.h"
#include "trace.h"
/***********************************************************/
/* bottom halves (can be seen as timers which expire ASAP) */
@ -275,6 +277,9 @@ aio_ctx_finalize(GSource *source)
}
#endif
assert(QSLIST_EMPTY(&ctx->scheduled_coroutines));
qemu_bh_delete(ctx->co_schedule_bh);
qemu_lockcnt_lock(&ctx->list_lock);
assert(!qemu_lockcnt_count(&ctx->list_lock));
while (ctx->first_bh) {
@ -364,6 +369,28 @@ static bool event_notifier_poll(void *opaque)
return atomic_read(&ctx->notified);
}
static void co_schedule_bh_cb(void *opaque)
{
AioContext *ctx = opaque;
QSLIST_HEAD(, Coroutine) straight, reversed;
QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
QSLIST_INIT(&straight);
while (!QSLIST_EMPTY(&reversed)) {
Coroutine *co = QSLIST_FIRST(&reversed);
QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
}
while (!QSLIST_EMPTY(&straight)) {
Coroutine *co = QSLIST_FIRST(&straight);
QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
trace_aio_co_schedule_bh_cb(ctx, co);
qemu_coroutine_enter(co);
}
}
AioContext *aio_context_new(Error **errp)
{
int ret;
@ -379,6 +406,10 @@ AioContext *aio_context_new(Error **errp)
}
g_source_set_can_recurse(&ctx->source, true);
qemu_lockcnt_init(&ctx->list_lock);
ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);
QSLIST_INIT(&ctx->scheduled_coroutines);
aio_set_event_notifier(ctx, &ctx->notifier,
false,
(EventNotifierHandler *)
@ -402,6 +433,40 @@ fail:
return NULL;
}
void aio_co_schedule(AioContext *ctx, Coroutine *co)
{
trace_aio_co_schedule(ctx, co);
QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines,
co, co_scheduled_next);
qemu_bh_schedule(ctx->co_schedule_bh);
}
void aio_co_wake(struct Coroutine *co)
{
AioContext *ctx;
/* Read coroutine before co->ctx. Matches smp_wmb in
* qemu_coroutine_enter.
*/
smp_read_barrier_depends();
ctx = atomic_read(&co->ctx);
if (ctx != qemu_get_current_aio_context()) {
aio_co_schedule(ctx, co);
return;
}
if (qemu_in_coroutine()) {
Coroutine *self = qemu_coroutine_self();
assert(self != co);
QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
} else {
aio_context_acquire(ctx);
qemu_coroutine_enter(co);
aio_context_release(ctx);
}
}
void aio_context_ref(AioContext *ctx)
{
g_source_ref(&ctx->source);

View file

@ -19,6 +19,7 @@
#include "qemu/atomic.h"
#include "qemu/coroutine.h"
#include "qemu/coroutine_int.h"
#include "block/aio.h"
enum {
POOL_BATCH_SIZE = 64,
@ -114,6 +115,13 @@ void qemu_coroutine_enter(Coroutine *co)
}
co->caller = self;
co->ctx = qemu_get_current_aio_context();
/* Store co->ctx before anything that stores co. Matches
* barrier in aio_co_wake.
*/
smp_wmb();
ret = qemu_coroutine_switch(self, co, COROUTINE_ENTER);
qemu_co_queue_run_restart(co);

View file

@ -6,6 +6,10 @@ run_poll_handlers_end(void *ctx, bool progress) "ctx %p progress %d"
poll_shrink(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64
poll_grow(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64
# util/async.c
aio_co_schedule(void *ctx, void *co) "ctx %p co %p"
aio_co_schedule_bh_cb(void *ctx, void *co) "ctx %p co %p"
# util/thread-pool.c
thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p"
thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d"