Merge remote-tracking branch 'stefanha/block' into staging

# By Stefan Hajnoczi (14) and others
# Via Stefan Hajnoczi
* stefanha/block: (28 commits)
  blockdev: Fix up copyright and permission notice
  qemu-iotests: use -nographic in test case 007
  qemu-iotests: add tests for rebasing zero clusters
  dataplane: fix hang introduced by AioContext transition
  coroutine: use AioContext for CoQueue BH
  threadpool: drop global thread pool
  block: add bdrv_get_aio_context()
  aio: add a ThreadPool instance to AioContext
  threadpool: add thread_pool_new() and thread_pool_free()
  threadpool: move globals into struct ThreadPool
  main-loop: add qemu_get_aio_context()
  sheepdog: set io_flush handler in do_co_req
  sheepdog: use non-blocking fd in coroutine context
  qcow2: make is_allocated return true for zero clusters
  qcow2: drop unnecessary flush in qcow2_update_snapshot_refcount()
  qcow2: drop flush in update_cluster_refcount()
  qcow2: flush in qcow2_update_snapshot_refcount()
  qcow2: set L2 cache dependency in qcow2_alloc_bytes()
  qcow2: flush refcount cache correctly in qcow2_write_snapshots()
  qcow2: flush refcount cache correctly in alloc_refcount_block()
  ...
This commit is contained in:
Anthony Liguori 2013-03-15 10:47:21 -05:00
commit dc0b0616f7
49 changed files with 700 additions and 231 deletions

11
async.c
View file

@ -24,6 +24,7 @@
#include "qemu-common.h" #include "qemu-common.h"
#include "block/aio.h" #include "block/aio.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h" #include "qemu/main-loop.h"
/***********************************************************/ /***********************************************************/
@ -172,6 +173,7 @@ aio_ctx_finalize(GSource *source)
{ {
AioContext *ctx = (AioContext *) source; AioContext *ctx = (AioContext *) source;
thread_pool_free(ctx->thread_pool);
aio_set_event_notifier(ctx, &ctx->notifier, NULL, NULL); aio_set_event_notifier(ctx, &ctx->notifier, NULL, NULL);
event_notifier_cleanup(&ctx->notifier); event_notifier_cleanup(&ctx->notifier);
g_array_free(ctx->pollfds, TRUE); g_array_free(ctx->pollfds, TRUE);
@ -190,6 +192,14 @@ GSource *aio_get_g_source(AioContext *ctx)
return &ctx->source; return &ctx->source;
} }
/*
 * Return the ThreadPool belonging to @ctx.
 *
 * The pool is created on demand: when ctx->thread_pool is still unset,
 * thread_pool_new(ctx) is called and its result is stored in the context,
 * so subsequent calls return the same pointer.
 */
ThreadPool *aio_get_thread_pool(AioContext *ctx)
{
    ThreadPool *pool = ctx->thread_pool;

    if (!pool) {
        /* First request on this context: build the pool and cache it. */
        pool = thread_pool_new(ctx);
        ctx->thread_pool = pool;
    }

    return pool;
}
void aio_notify(AioContext *ctx) void aio_notify(AioContext *ctx)
{ {
event_notifier_set(&ctx->notifier); event_notifier_set(&ctx->notifier);
@ -200,6 +210,7 @@ AioContext *aio_context_new(void)
AioContext *ctx; AioContext *ctx;
ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext)); ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD)); ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
ctx->thread_pool = NULL;
event_notifier_init(&ctx->notifier, false); event_notifier_init(&ctx->notifier, false);
aio_set_event_notifier(ctx, &ctx->notifier, aio_set_event_notifier(ctx, &ctx->notifier,
(EventNotifierHandler *) (EventNotifierHandler *)

87
block.c
View file

@ -665,15 +665,18 @@ static int bdrv_open_flags(BlockDriverState *bs, int flags)
/* /*
* Common part for opening disk images and files * Common part for opening disk images and files
*
* Removes all processed options from *options.
*/ */
static int bdrv_open_common(BlockDriverState *bs, BlockDriverState *file, static int bdrv_open_common(BlockDriverState *bs, BlockDriverState *file,
const char *filename, const char *filename, QDict *options,
int flags, BlockDriver *drv) int flags, BlockDriver *drv)
{ {
int ret, open_flags; int ret, open_flags;
assert(drv != NULL); assert(drv != NULL);
assert(bs->file == NULL); assert(bs->file == NULL);
assert(options == NULL || bs->options != options);
trace_bdrv_open_common(bs, filename, flags, drv->format_name); trace_bdrv_open_common(bs, filename, flags, drv->format_name);
@ -710,7 +713,7 @@ static int bdrv_open_common(BlockDriverState *bs, BlockDriverState *file,
} else { } else {
assert(file != NULL); assert(file != NULL);
bs->file = file; bs->file = file;
ret = drv->bdrv_open(bs, open_flags); ret = drv->bdrv_open(bs, options, open_flags);
} }
if (ret < 0) { if (ret < 0) {
@ -752,7 +755,7 @@ int bdrv_file_open(BlockDriverState **pbs, const char *filename, int flags)
} }
bs = bdrv_new(""); bs = bdrv_new("");
ret = bdrv_open_common(bs, NULL, filename, flags, drv); ret = bdrv_open_common(bs, NULL, filename, NULL, flags, drv);
if (ret < 0) { if (ret < 0) {
bdrv_delete(bs); bdrv_delete(bs);
return ret; return ret;
@ -788,7 +791,8 @@ int bdrv_open_backing_file(BlockDriverState *bs)
/* backing files always opened read-only */ /* backing files always opened read-only */
back_flags = bs->open_flags & ~(BDRV_O_RDWR | BDRV_O_SNAPSHOT); back_flags = bs->open_flags & ~(BDRV_O_RDWR | BDRV_O_SNAPSHOT);
ret = bdrv_open(bs->backing_hd, backing_filename, back_flags, back_drv); ret = bdrv_open(bs->backing_hd, backing_filename, NULL,
back_flags, back_drv);
if (ret < 0) { if (ret < 0) {
bdrv_delete(bs->backing_hd); bdrv_delete(bs->backing_hd);
bs->backing_hd = NULL; bs->backing_hd = NULL;
@ -800,15 +804,29 @@ int bdrv_open_backing_file(BlockDriverState *bs)
/* /*
* Opens a disk image (raw, qcow2, vmdk, ...) * Opens a disk image (raw, qcow2, vmdk, ...)
*
* options is a QDict of options to pass to the block drivers, or NULL for an
* empty set of options. The reference to the QDict belongs to the block layer
* after the call (even on failure), so if the caller intends to reuse the
* dictionary, it needs to use QINCREF() before calling bdrv_open.
*/ */
int bdrv_open(BlockDriverState *bs, const char *filename, int flags, int bdrv_open(BlockDriverState *bs, const char *filename, QDict *options,
BlockDriver *drv) int flags, BlockDriver *drv)
{ {
int ret; int ret;
/* TODO: extra byte is a hack to ensure MAX_PATH space on Windows. */ /* TODO: extra byte is a hack to ensure MAX_PATH space on Windows. */
char tmp_filename[PATH_MAX + 1]; char tmp_filename[PATH_MAX + 1];
BlockDriverState *file = NULL; BlockDriverState *file = NULL;
/* NULL means an empty set of options */
if (options == NULL) {
options = qdict_new();
}
bs->options = options;
options = qdict_clone_shallow(options);
/* For snapshot=on, create a temporary qcow2 overlay */
if (flags & BDRV_O_SNAPSHOT) { if (flags & BDRV_O_SNAPSHOT) {
BlockDriverState *bs1; BlockDriverState *bs1;
int64_t total_size; int64_t total_size;
@ -822,10 +840,10 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags,
/* if there is a backing file, use it */ /* if there is a backing file, use it */
bs1 = bdrv_new(""); bs1 = bdrv_new("");
ret = bdrv_open(bs1, filename, 0, drv); ret = bdrv_open(bs1, filename, NULL, 0, drv);
if (ret < 0) { if (ret < 0) {
bdrv_delete(bs1); bdrv_delete(bs1);
return ret; goto fail;
} }
total_size = bdrv_getlength(bs1) & BDRV_SECTOR_MASK; total_size = bdrv_getlength(bs1) & BDRV_SECTOR_MASK;
@ -836,15 +854,17 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags,
ret = get_tmp_filename(tmp_filename, sizeof(tmp_filename)); ret = get_tmp_filename(tmp_filename, sizeof(tmp_filename));
if (ret < 0) { if (ret < 0) {
return ret; goto fail;
} }
/* Real path is meaningless for protocols */ /* Real path is meaningless for protocols */
if (is_protocol) if (is_protocol) {
snprintf(backing_filename, sizeof(backing_filename), snprintf(backing_filename, sizeof(backing_filename),
"%s", filename); "%s", filename);
else if (!realpath(filename, backing_filename)) } else if (!realpath(filename, backing_filename)) {
return -errno; ret = -errno;
goto fail;
}
bdrv_qcow2 = bdrv_find_format("qcow2"); bdrv_qcow2 = bdrv_find_format("qcow2");
options = parse_option_parameters("", bdrv_qcow2->create_options, NULL); options = parse_option_parameters("", bdrv_qcow2->create_options, NULL);
@ -859,7 +879,7 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags,
ret = bdrv_create(bdrv_qcow2, tmp_filename, options); ret = bdrv_create(bdrv_qcow2, tmp_filename, options);
free_option_parameters(options); free_option_parameters(options);
if (ret < 0) { if (ret < 0) {
return ret; goto fail;
} }
filename = tmp_filename; filename = tmp_filename;
@ -874,7 +894,7 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags,
ret = bdrv_file_open(&file, filename, bdrv_open_flags(bs, flags)); ret = bdrv_file_open(&file, filename, bdrv_open_flags(bs, flags));
if (ret < 0) { if (ret < 0) {
return ret; goto fail;
} }
/* Find the right image format driver */ /* Find the right image format driver */
@ -887,7 +907,7 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags,
} }
/* Open the image */ /* Open the image */
ret = bdrv_open_common(bs, file, filename, flags, drv); ret = bdrv_open_common(bs, file, filename, options, flags, drv);
if (ret < 0) { if (ret < 0) {
goto unlink_and_fail; goto unlink_and_fail;
} }
@ -901,11 +921,22 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags,
if ((flags & BDRV_O_NO_BACKING) == 0) { if ((flags & BDRV_O_NO_BACKING) == 0) {
ret = bdrv_open_backing_file(bs); ret = bdrv_open_backing_file(bs);
if (ret < 0) { if (ret < 0) {
bdrv_close(bs); goto close_and_fail;
return ret;
} }
} }
/* Check if any unknown options were used */
if (qdict_size(options) != 0) {
const QDictEntry *entry = qdict_first(options);
qerror_report(ERROR_CLASS_GENERIC_ERROR, "Block format '%s' used by "
"device '%s' doesn't support the option '%s'",
drv->format_name, bs->device_name, entry->key);
ret = -EINVAL;
goto close_and_fail;
}
QDECREF(options);
if (!bdrv_key_required(bs)) { if (!bdrv_key_required(bs)) {
bdrv_dev_change_media_cb(bs, true); bdrv_dev_change_media_cb(bs, true);
} }
@ -924,6 +955,15 @@ unlink_and_fail:
if (bs->is_temporary) { if (bs->is_temporary) {
unlink(filename); unlink(filename);
} }
fail:
QDECREF(bs->options);
QDECREF(options);
bs->options = NULL;
return ret;
close_and_fail:
bdrv_close(bs);
QDECREF(options);
return ret; return ret;
} }
@ -1193,6 +1233,8 @@ void bdrv_close(BlockDriverState *bs)
bs->valid_key = 0; bs->valid_key = 0;
bs->sg = 0; bs->sg = 0;
bs->growable = 0; bs->growable = 0;
QDECREF(bs->options);
bs->options = NULL;
if (bs->file != NULL) { if (bs->file != NULL) {
bdrv_delete(bs->file); bdrv_delete(bs->file);
@ -3190,7 +3232,7 @@ int bdrv_snapshot_goto(BlockDriverState *bs,
if (bs->file) { if (bs->file) {
drv->bdrv_close(bs); drv->bdrv_close(bs);
ret = bdrv_snapshot_goto(bs->file, snapshot_id); ret = bdrv_snapshot_goto(bs->file, snapshot_id);
open_ret = drv->bdrv_open(bs, bs->open_flags); open_ret = drv->bdrv_open(bs, NULL, bs->open_flags);
if (open_ret < 0) { if (open_ret < 0) {
bdrv_delete(bs->file); bdrv_delete(bs->file);
bs->drv = NULL; bs->drv = NULL;
@ -4594,7 +4636,8 @@ void bdrv_img_create(const char *filename, const char *fmt,
bs = bdrv_new(""); bs = bdrv_new("");
ret = bdrv_open(bs, backing_file->value.s, back_flags, backing_drv); ret = bdrv_open(bs, backing_file->value.s, NULL, back_flags,
backing_drv);
if (ret < 0) { if (ret < 0) {
error_setg_errno(errp, -ret, "Could not open '%s'", error_setg_errno(errp, -ret, "Could not open '%s'",
backing_file->value.s); backing_file->value.s);
@ -4638,3 +4681,9 @@ out:
bdrv_delete(bs); bdrv_delete(bs);
} }
} }
/*
 * Return the AioContext used by @bs.
 *
 * @bs is not consulted at present: every BlockDriverState currently uses
 * the main loop AioContext, so that context is returned unconditionally.
 */
AioContext *bdrv_get_aio_context(BlockDriverState *bs)
{
    AioContext *main_loop_ctx = qemu_get_aio_context();

    return main_loop_ctx;
}

View file

@ -98,7 +98,7 @@ static int blkverify_open(BlockDriverState *bs, const char *filename, int flags)
/* Open the test file */ /* Open the test file */
s->test_file = bdrv_new(""); s->test_file = bdrv_new("");
ret = bdrv_open(s->test_file, filename, flags, NULL); ret = bdrv_open(s->test_file, filename, NULL, flags, NULL);
if (ret < 0) { if (ret < 0) {
bdrv_delete(s->test_file); bdrv_delete(s->test_file);
s->test_file = NULL; s->test_file = NULL;

View file

@ -108,7 +108,7 @@ static int bochs_probe(const uint8_t *buf, int buf_size, const char *filename)
return 0; return 0;
} }
static int bochs_open(BlockDriverState *bs, int flags) static int bochs_open(BlockDriverState *bs, QDict *options, int flags)
{ {
BDRVBochsState *s = bs->opaque; BDRVBochsState *s = bs->opaque;
int i; int i;

View file

@ -53,7 +53,7 @@ static int cloop_probe(const uint8_t *buf, int buf_size, const char *filename)
return 0; return 0;
} }
static int cloop_open(BlockDriverState *bs, int flags) static int cloop_open(BlockDriverState *bs, QDict *options, int flags)
{ {
BDRVCloopState *s = bs->opaque; BDRVCloopState *s = bs->opaque;
uint32_t offsets_size, max_compressed_block_size = 1, i; uint32_t offsets_size, max_compressed_block_size = 1, i;

View file

@ -58,7 +58,7 @@ static int cow_probe(const uint8_t *buf, int buf_size, const char *filename)
return 0; return 0;
} }
static int cow_open(BlockDriverState *bs, int flags) static int cow_open(BlockDriverState *bs, QDict *options, int flags)
{ {
BDRVCowState *s = bs->opaque; BDRVCowState *s = bs->opaque;
struct cow_header_v2 cow_header; struct cow_header_v2 cow_header;

View file

@ -85,7 +85,7 @@ static int read_uint32(BlockDriverState *bs, int64_t offset, uint32_t *result)
return 0; return 0;
} }
static int dmg_open(BlockDriverState *bs, int flags) static int dmg_open(BlockDriverState *bs, QDict *options, int flags)
{ {
BDRVDMGState *s = bs->opaque; BDRVDMGState *s = bs->opaque;
uint64_t info_begin,info_end,last_in_offset,last_out_offset; uint64_t info_begin,info_end,last_in_offset,last_out_offset;

View file

@ -68,7 +68,7 @@ static int parallels_probe(const uint8_t *buf, int buf_size, const char *filenam
return 0; return 0;
} }
static int parallels_open(BlockDriverState *bs, int flags) static int parallels_open(BlockDriverState *bs, QDict *options, int flags)
{ {
BDRVParallelsState *s = bs->opaque; BDRVParallelsState *s = bs->opaque;
int i; int i;

View file

@ -92,7 +92,7 @@ static int qcow_probe(const uint8_t *buf, int buf_size, const char *filename)
return 0; return 0;
} }
static int qcow_open(BlockDriverState *bs, int flags) static int qcow_open(BlockDriverState *bs, QDict *options, int flags)
{ {
BDRVQcowState *s = bs->opaque; BDRVQcowState *s = bs->opaque;
int len, i, shift, ret; int len, i, shift, ret;

View file

@ -454,6 +454,9 @@ int qcow2_get_cluster_offset(BlockDriverState *bs, uint64_t offset,
*cluster_offset &= L2E_COMPRESSED_OFFSET_SIZE_MASK; *cluster_offset &= L2E_COMPRESSED_OFFSET_SIZE_MASK;
break; break;
case QCOW2_CLUSTER_ZERO: case QCOW2_CLUSTER_ZERO:
if (s->qcow_version < 3) {
return -EIO;
}
c = count_contiguous_clusters(nb_clusters, s->cluster_size, c = count_contiguous_clusters(nb_clusters, s->cluster_size,
&l2_table[l2_index], 0, &l2_table[l2_index], 0,
QCOW_OFLAG_COMPRESSED | QCOW_OFLAG_ZERO); QCOW_OFLAG_COMPRESSED | QCOW_OFLAG_ZERO);
@ -668,7 +671,7 @@ int qcow2_alloc_cluster_link_l2(BlockDriverState *bs, QCowL2Meta *m)
} }
/* Update L2 table. */ /* Update L2 table. */
if (s->compatible_features & QCOW2_COMPAT_LAZY_REFCOUNTS) { if (s->use_lazy_refcounts) {
qcow2_mark_dirty(bs); qcow2_mark_dirty(bs);
} }
if (qcow2_need_accurate_refcounts(s)) { if (qcow2_need_accurate_refcounts(s)) {

View file

@ -201,7 +201,10 @@ static int alloc_refcount_block(BlockDriverState *bs,
*refcount_block = NULL; *refcount_block = NULL;
/* We write to the refcount table, so we might depend on L2 tables */ /* We write to the refcount table, so we might depend on L2 tables */
qcow2_cache_flush(bs, s->l2_table_cache); ret = qcow2_cache_flush(bs, s->l2_table_cache);
if (ret < 0) {
return ret;
}
/* Allocate the refcount block itself and mark it as used */ /* Allocate the refcount block itself and mark it as used */
int64_t new_block = alloc_clusters_noref(bs, s->cluster_size); int64_t new_block = alloc_clusters_noref(bs, s->cluster_size);
@ -237,7 +240,10 @@ static int alloc_refcount_block(BlockDriverState *bs,
goto fail_block; goto fail_block;
} }
bdrv_flush(bs->file); ret = qcow2_cache_flush(bs, s->refcount_block_cache);
if (ret < 0) {
goto fail_block;
}
/* Initialize the new refcount block only after updating its refcount, /* Initialize the new refcount block only after updating its refcount,
* update_refcount uses the refcount cache itself */ * update_refcount uses the refcount cache itself */
@ -526,8 +532,6 @@ static int update_cluster_refcount(BlockDriverState *bs,
return ret; return ret;
} }
bdrv_flush(bs->file);
return get_refcount(bs, cluster_index); return get_refcount(bs, cluster_index);
} }
@ -663,7 +667,11 @@ int64_t qcow2_alloc_bytes(BlockDriverState *bs, int size)
} }
} }
bdrv_flush(bs->file); /* The cluster refcount was incremented, either by qcow2_alloc_clusters()
* or explicitly by update_cluster_refcount(). Refcount blocks must be
* flushed before the caller's L2 table updates.
*/
qcow2_cache_set_dependency(bs, s->l2_table_cache, s->refcount_block_cache);
return offset; return offset;
} }
@ -782,10 +790,6 @@ int qcow2_update_snapshot_refcount(BlockDriverState *bs,
if (ret < 0) { if (ret < 0) {
goto fail; goto fail;
} }
/* TODO Flushing once for the whole function should
* be enough */
bdrv_flush(bs->file);
} }
/* compressed clusters are never modified */ /* compressed clusters are never modified */
refcount = 2; refcount = 2;
@ -841,7 +845,7 @@ int qcow2_update_snapshot_refcount(BlockDriverState *bs,
} }
} }
ret = 0; ret = bdrv_flush(bs);
fail: fail:
if (l2_table) { if (l2_table) {
qcow2_cache_put(bs, s->l2_table_cache, (void**) &l2_table); qcow2_cache_put(bs, s->l2_table_cache, (void**) &l2_table);

View file

@ -180,11 +180,14 @@ static int qcow2_write_snapshots(BlockDriverState *bs)
/* Allocate space for the new snapshot list */ /* Allocate space for the new snapshot list */
snapshots_offset = qcow2_alloc_clusters(bs, snapshots_size); snapshots_offset = qcow2_alloc_clusters(bs, snapshots_size);
bdrv_flush(bs->file);
offset = snapshots_offset; offset = snapshots_offset;
if (offset < 0) { if (offset < 0) {
return offset; return offset;
} }
ret = bdrv_flush(bs);
if (ret < 0) {
return ret;
}
/* Write all snapshots to the new list */ /* Write all snapshots to the new list */
for(i = 0; i < s->nb_snapshots; i++) { for(i = 0; i < s->nb_snapshots; i++) {
@ -378,11 +381,6 @@ int qcow2_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
goto fail; goto fail;
} }
ret = bdrv_flush(bs);
if (ret < 0) {
goto fail;
}
/* Append the new snapshot to the snapshot list */ /* Append the new snapshot to the snapshot list */
new_snapshot_list = g_malloc((s->nb_snapshots + 1) * sizeof(QCowSnapshot)); new_snapshot_list = g_malloc((s->nb_snapshots + 1) * sizeof(QCowSnapshot));
if (s->snapshots) { if (s->snapshots) {

View file

@ -285,11 +285,26 @@ static int qcow2_check(BlockDriverState *bs, BdrvCheckResult *result,
return ret; return ret;
} }
static int qcow2_open(BlockDriverState *bs, int flags) static QemuOptsList qcow2_runtime_opts = {
.name = "qcow2",
.head = QTAILQ_HEAD_INITIALIZER(qcow2_runtime_opts.head),
.desc = {
{
.name = "lazy_refcounts",
.type = QEMU_OPT_BOOL,
.help = "Postpone refcount updates",
},
{ /* end of list */ }
},
};
static int qcow2_open(BlockDriverState *bs, QDict *options, int flags)
{ {
BDRVQcowState *s = bs->opaque; BDRVQcowState *s = bs->opaque;
int len, i, ret = 0; int len, i, ret = 0;
QCowHeader header; QCowHeader header;
QemuOpts *opts;
Error *local_err = NULL;
uint64_t ext_end; uint64_t ext_end;
ret = bdrv_pread(bs->file, 0, &header, sizeof(header)); ret = bdrv_pread(bs->file, 0, &header, sizeof(header));
@ -495,6 +510,28 @@ static int qcow2_open(BlockDriverState *bs, int flags)
} }
} }
/* Enable lazy_refcounts according to image and command line options */
opts = qemu_opts_create_nofail(&qcow2_runtime_opts);
qemu_opts_absorb_qdict(opts, options, &local_err);
if (error_is_set(&local_err)) {
qerror_report_err(local_err);
error_free(local_err);
ret = -EINVAL;
goto fail;
}
s->use_lazy_refcounts = qemu_opt_get_bool(opts, "lazy_refcounts",
(s->compatible_features & QCOW2_COMPAT_LAZY_REFCOUNTS));
qemu_opts_del(opts);
if (s->use_lazy_refcounts && s->qcow_version < 3) {
qerror_report(ERROR_CLASS_GENERIC_ERROR, "Lazy refcounts require "
"a qcow2 image with at least qemu 1.1 compatibility level");
ret = -EINVAL;
goto fail;
}
#ifdef DEBUG_ALLOC #ifdef DEBUG_ALLOC
{ {
BdrvCheckResult result = {0}; BdrvCheckResult result = {0};
@ -584,7 +621,7 @@ static int coroutine_fn qcow2_co_is_allocated(BlockDriverState *bs,
*pnum = 0; *pnum = 0;
} }
return (cluster_offset != 0); return (cluster_offset != 0) || (ret == QCOW2_CLUSTER_ZERO);
} }
/* handle reading after the end of the backing file */ /* handle reading after the end of the backing file */
@ -665,10 +702,6 @@ static coroutine_fn int qcow2_co_readv(BlockDriverState *bs, int64_t sector_num,
break; break;
case QCOW2_CLUSTER_ZERO: case QCOW2_CLUSTER_ZERO:
if (s->qcow_version < 3) {
ret = -EIO;
goto fail;
}
qemu_iovec_memset(&hd_qiov, 0, 0, 512 * cur_nr_sectors); qemu_iovec_memset(&hd_qiov, 0, 0, 512 * cur_nr_sectors);
break; break;
@ -912,7 +945,7 @@ static void qcow2_invalidate_cache(BlockDriverState *bs)
qcow2_close(bs); qcow2_close(bs);
memset(s, 0, sizeof(BDRVQcowState)); memset(s, 0, sizeof(BDRVQcowState));
qcow2_open(bs, flags); qcow2_open(bs, NULL, flags);
if (crypt_method) { if (crypt_method) {
s->crypt_method = crypt_method; s->crypt_method = crypt_method;
@ -1265,7 +1298,7 @@ static int qcow2_create2(const char *filename, int64_t total_size,
*/ */
BlockDriver* drv = bdrv_find_format("qcow2"); BlockDriver* drv = bdrv_find_format("qcow2");
assert(drv != NULL); assert(drv != NULL);
ret = bdrv_open(bs, filename, ret = bdrv_open(bs, filename, NULL,
BDRV_O_RDWR | BDRV_O_CACHE_WB | BDRV_O_NO_FLUSH, drv); BDRV_O_RDWR | BDRV_O_CACHE_WB | BDRV_O_NO_FLUSH, drv);
if (ret < 0) { if (ret < 0) {
goto out; goto out;

View file

@ -173,6 +173,7 @@ typedef struct BDRVQcowState {
int flags; int flags;
int qcow_version; int qcow_version;
bool use_lazy_refcounts;
uint64_t incompatible_features; uint64_t incompatible_features;
uint64_t compatible_features; uint64_t compatible_features;

View file

@ -373,7 +373,7 @@ static void bdrv_qed_rebind(BlockDriverState *bs)
s->bs = bs; s->bs = bs;
} }
static int bdrv_qed_open(BlockDriverState *bs, int flags) static int bdrv_qed_open(BlockDriverState *bs, QDict *options, int flags)
{ {
BDRVQEDState *s = bs->opaque; BDRVQEDState *s = bs->opaque;
QEDHeader le_header; QEDHeader le_header;
@ -1526,7 +1526,7 @@ static void bdrv_qed_invalidate_cache(BlockDriverState *bs)
bdrv_qed_close(bs); bdrv_qed_close(bs);
memset(s, 0, sizeof(BDRVQEDState)); memset(s, 0, sizeof(BDRVQEDState));
bdrv_qed_open(bs, bs->open_flags); bdrv_qed_open(bs, NULL, bs->open_flags);
} }
static int bdrv_qed_check(BlockDriverState *bs, BdrvCheckResult *result, static int bdrv_qed_check(BlockDriverState *bs, BdrvCheckResult *result,

View file

@ -750,6 +750,7 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
BlockDriverCompletionFunc *cb, void *opaque, int type) BlockDriverCompletionFunc *cb, void *opaque, int type)
{ {
RawPosixAIOData *acb = g_slice_new(RawPosixAIOData); RawPosixAIOData *acb = g_slice_new(RawPosixAIOData);
ThreadPool *pool;
acb->bs = bs; acb->bs = bs;
acb->aio_type = type; acb->aio_type = type;
@ -763,7 +764,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
acb->aio_offset = sector_num * 512; acb->aio_offset = sector_num * 512;
trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
return thread_pool_submit_aio(aio_worker, acb, cb, opaque); pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
} }
static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs, static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs,
@ -1413,6 +1415,7 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs,
{ {
BDRVRawState *s = bs->opaque; BDRVRawState *s = bs->opaque;
RawPosixAIOData *acb; RawPosixAIOData *acb;
ThreadPool *pool;
if (fd_open(bs) < 0) if (fd_open(bs) < 0)
return NULL; return NULL;
@ -1424,7 +1427,8 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs,
acb->aio_offset = 0; acb->aio_offset = 0;
acb->aio_ioctl_buf = buf; acb->aio_ioctl_buf = buf;
acb->aio_ioctl_cmd = req; acb->aio_ioctl_cmd = req;
return thread_pool_submit_aio(aio_worker, acb, cb, opaque); pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
} }
#elif defined(__FreeBSD__) || defined(__FreeBSD_kernel__) #elif defined(__FreeBSD__) || defined(__FreeBSD_kernel__)

View file

@ -144,6 +144,7 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
BlockDriverCompletionFunc *cb, void *opaque, int type) BlockDriverCompletionFunc *cb, void *opaque, int type)
{ {
RawWin32AIOData *acb = g_slice_new(RawWin32AIOData); RawWin32AIOData *acb = g_slice_new(RawWin32AIOData);
ThreadPool *pool;
acb->bs = bs; acb->bs = bs;
acb->hfile = hfile; acb->hfile = hfile;
@ -157,7 +158,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
acb->aio_offset = sector_num * 512; acb->aio_offset = sector_num * 512;
trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
return thread_pool_submit_aio(aio_worker, acb, cb, opaque); pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
} }
int qemu_ftruncate64(int fd, int64_t length) int qemu_ftruncate64(int fd, int64_t length)

View file

@ -3,7 +3,7 @@
#include "block/block_int.h" #include "block/block_int.h"
#include "qemu/module.h" #include "qemu/module.h"
static int raw_open(BlockDriverState *bs, int flags) static int raw_open(BlockDriverState *bs, QDict *options, int flags)
{ {
bs->sg = bs->file->sg; bs->sg = bs->file->sg;
return 0; return 0;

View file

@ -468,6 +468,8 @@ static int connect_to_sdog(BDRVSheepdogState *s)
if (err != NULL) { if (err != NULL) {
qerror_report_err(err); qerror_report_err(err);
error_free(err); error_free(err);
} else {
socket_set_nonblock(fd);
} }
return fd; return fd;
@ -499,6 +501,13 @@ static void restart_co_req(void *opaque)
qemu_coroutine_enter(co, NULL); qemu_coroutine_enter(co, NULL);
} }
/*
 * Handler that do_co_req() passes to qemu_aio_set_fd_handler() together
 * with restart_co_req.
 *
 * It is installed only while a request is pending on the socket, so there
 * is nothing to check: it always reports 1. @opaque is ignored.
 */
static int have_co_req(void *opaque)
{
    (void)opaque;

    return 1;
}
typedef struct SheepdogReqCo { typedef struct SheepdogReqCo {
int sockfd; int sockfd;
SheepdogReq *hdr; SheepdogReq *hdr;
@ -521,15 +530,14 @@ static coroutine_fn void do_co_req(void *opaque)
unsigned int *rlen = srco->rlen; unsigned int *rlen = srco->rlen;
co = qemu_coroutine_self(); co = qemu_coroutine_self();
qemu_aio_set_fd_handler(sockfd, NULL, restart_co_req, NULL, co); qemu_aio_set_fd_handler(sockfd, NULL, restart_co_req, have_co_req, co);
socket_set_block(sockfd);
ret = send_co_req(sockfd, hdr, data, wlen); ret = send_co_req(sockfd, hdr, data, wlen);
if (ret < 0) { if (ret < 0) {
goto out; goto out;
} }
qemu_aio_set_fd_handler(sockfd, restart_co_req, NULL, NULL, co); qemu_aio_set_fd_handler(sockfd, restart_co_req, NULL, have_co_req, co);
ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr)); ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
if (ret < sizeof(*hdr)) { if (ret < sizeof(*hdr)) {
@ -552,8 +560,9 @@ static coroutine_fn void do_co_req(void *opaque)
} }
ret = 0; ret = 0;
out: out:
/* there is at most one request for this sockfd, so it is safe to
* set each handler to NULL. */
qemu_aio_set_fd_handler(sockfd, NULL, NULL, NULL, NULL); qemu_aio_set_fd_handler(sockfd, NULL, NULL, NULL, NULL);
socket_set_nonblock(sockfd);
srco->ret = ret; srco->ret = ret;
srco->finished = true; srco->finished = true;
@ -776,8 +785,6 @@ static int get_sheep_fd(BDRVSheepdogState *s)
return fd; return fd;
} }
socket_set_nonblock(fd);
qemu_aio_set_fd_handler(fd, co_read_response, NULL, aio_flush_request, s); qemu_aio_set_fd_handler(fd, co_read_response, NULL, aio_flush_request, s);
return fd; return fd;
} }

View file

@ -364,7 +364,7 @@ static int vdi_probe(const uint8_t *buf, int buf_size, const char *filename)
return result; return result;
} }
static int vdi_open(BlockDriverState *bs, int flags) static int vdi_open(BlockDriverState *bs, QDict *options, int flags)
{ {
BDRVVdiState *s = bs->opaque; BDRVVdiState *s = bs->opaque;
VdiHeader header; VdiHeader header;

View file

@ -723,7 +723,7 @@ static int vmdk_open_desc_file(BlockDriverState *bs, int flags,
return vmdk_parse_extents(buf, bs, bs->file->filename); return vmdk_parse_extents(buf, bs, bs->file->filename);
} }
static int vmdk_open(BlockDriverState *bs, int flags) static int vmdk_open(BlockDriverState *bs, QDict *options, int flags)
{ {
int ret; int ret;
BDRVVmdkState *s = bs->opaque; BDRVVmdkState *s = bs->opaque;
@ -1527,7 +1527,7 @@ static int vmdk_create(const char *filename, QEMUOptionParameter *options)
if (backing_file) { if (backing_file) {
char parent_filename[PATH_MAX]; char parent_filename[PATH_MAX];
BlockDriverState *bs = bdrv_new(""); BlockDriverState *bs = bdrv_new("");
ret = bdrv_open(bs, backing_file, 0, NULL); ret = bdrv_open(bs, backing_file, NULL, 0, NULL);
if (ret != 0) { if (ret != 0) {
bdrv_delete(bs); bdrv_delete(bs);
return ret; return ret;

View file

@ -155,7 +155,7 @@ static int vpc_probe(const uint8_t *buf, int buf_size, const char *filename)
return 0; return 0;
} }
static int vpc_open(BlockDriverState *bs, int flags) static int vpc_open(BlockDriverState *bs, QDict *options, int flags)
{ {
BDRVVPCState *s = bs->opaque; BDRVVPCState *s = bs->opaque;
int i; int i;

View file

@ -2830,7 +2830,7 @@ static int enable_write_target(BDRVVVFATState *s)
return -1; return -1;
} }
ret = bdrv_open(s->qcow, s->qcow_filename, ret = bdrv_open(s->qcow, s->qcow_filename, NULL,
BDRV_O_RDWR | BDRV_O_CACHE_WB | BDRV_O_NO_FLUSH, bdrv_qcow); BDRV_O_RDWR | BDRV_O_CACHE_WB | BDRV_O_NO_FLUSH, bdrv_qcow);
if (ret < 0) { if (ret < 0) {
return ret; return ret;

View file

@ -5,6 +5,29 @@
* *
* This work is licensed under the terms of the GNU GPL, version 2 or * This work is licensed under the terms of the GNU GPL, version 2 or
* later. See the COPYING file in the top-level directory. * later. See the COPYING file in the top-level directory.
*
* This file incorporates work covered by the following copyright and
* permission notice:
*
* Copyright (c) 2003-2008 Fabrice Bellard
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/ */
#include "sysemu/blockdev.h" #include "sysemu/blockdev.h"
@ -22,6 +45,7 @@
#include "sysemu/arch_init.h" #include "sysemu/arch_init.h"
static QTAILQ_HEAD(drivelist, DriveInfo) drives = QTAILQ_HEAD_INITIALIZER(drives); static QTAILQ_HEAD(drivelist, DriveInfo) drives = QTAILQ_HEAD_INITIALIZER(drives);
extern QemuOptsList qemu_common_drive_opts;
static const char *const if_name[IF_COUNT] = { static const char *const if_name[IF_COUNT] = {
[IF_NONE] = "none", [IF_NONE] = "none",
@ -191,6 +215,7 @@ static void drive_uninit(DriveInfo *dinfo)
bdrv_delete(dinfo->bdrv); bdrv_delete(dinfo->bdrv);
g_free(dinfo->id); g_free(dinfo->id);
QTAILQ_REMOVE(&drives, dinfo, next); QTAILQ_REMOVE(&drives, dinfo, next);
g_free(dinfo->serial);
g_free(dinfo); g_free(dinfo);
} }
@ -287,7 +312,7 @@ static bool do_check_io_limits(BlockIOLimit *io_limits, Error **errp)
return true; return true;
} }
DriveInfo *drive_init(QemuOpts *opts, BlockInterfaceType block_default_type) DriveInfo *drive_init(QemuOpts *all_opts, BlockInterfaceType block_default_type)
{ {
const char *buf; const char *buf;
const char *file = NULL; const char *file = NULL;
@ -310,10 +335,36 @@ DriveInfo *drive_init(QemuOpts *opts, BlockInterfaceType block_default_type)
bool copy_on_read; bool copy_on_read;
int ret; int ret;
Error *error = NULL; Error *error = NULL;
QemuOpts *opts;
QDict *bs_opts;
const char *id;
translation = BIOS_ATA_TRANSLATION_AUTO; translation = BIOS_ATA_TRANSLATION_AUTO;
media = MEDIA_DISK; media = MEDIA_DISK;
/* Check common options by copying from all_opts to opts, all other options
* are stored in bs_opts. */
id = qemu_opts_id(all_opts);
opts = qemu_opts_create(&qemu_common_drive_opts, id, 1, &error);
if (error_is_set(&error)) {
qerror_report_err(error);
error_free(error);
return NULL;
}
bs_opts = qdict_new();
qemu_opts_to_qdict(all_opts, bs_opts);
qemu_opts_absorb_qdict(opts, bs_opts, &error);
if (error_is_set(&error)) {
    qerror_report_err(error);
    error_free(error);
    /* Both objects were allocated above and nothing else references them
     * yet, so release them here; this early return does not go through the
     * err: label that normally frees them. */
    qemu_opts_del(opts);
    QDECREF(bs_opts);
    return NULL;
}
if (id) {
qdict_del(bs_opts, "id");
}
/* extract parameters */ /* extract parameters */
bus_id = qemu_opt_get_number(opts, "bus", 0); bus_id = qemu_opt_get_number(opts, "bus", 0);
unit_id = qemu_opt_get_number(opts, "unit", -1); unit_id = qemu_opt_get_number(opts, "unit", -1);
@ -564,9 +615,11 @@ DriveInfo *drive_init(QemuOpts *opts, BlockInterfaceType block_default_type)
dinfo->heads = heads; dinfo->heads = heads;
dinfo->secs = secs; dinfo->secs = secs;
dinfo->trans = translation; dinfo->trans = translation;
dinfo->opts = opts; dinfo->opts = all_opts;
dinfo->refcount = 1; dinfo->refcount = 1;
dinfo->serial = serial; if (serial != NULL) {
dinfo->serial = g_strdup(serial);
}
QTAILQ_INSERT_TAIL(&drives, dinfo, next); QTAILQ_INSERT_TAIL(&drives, dinfo, next);
bdrv_set_on_error(dinfo->bdrv, on_read_error, on_write_error); bdrv_set_on_error(dinfo->bdrv, on_read_error, on_write_error);
@ -587,17 +640,20 @@ DriveInfo *drive_init(QemuOpts *opts, BlockInterfaceType block_default_type)
case IF_MTD: case IF_MTD:
break; break;
case IF_VIRTIO: case IF_VIRTIO:
{
/* add virtio block device */ /* add virtio block device */
opts = qemu_opts_create_nofail(qemu_find_opts("device")); QemuOpts *devopts;
devopts = qemu_opts_create_nofail(qemu_find_opts("device"));
if (arch_type == QEMU_ARCH_S390X) { if (arch_type == QEMU_ARCH_S390X) {
qemu_opt_set(opts, "driver", "virtio-blk-s390"); qemu_opt_set(devopts, "driver", "virtio-blk-s390");
} else { } else {
qemu_opt_set(opts, "driver", "virtio-blk-pci"); qemu_opt_set(devopts, "driver", "virtio-blk-pci");
} }
qemu_opt_set(opts, "drive", dinfo->id); qemu_opt_set(devopts, "drive", dinfo->id);
if (devaddr) if (devaddr)
qemu_opt_set(opts, "addr", devaddr); qemu_opt_set(devopts, "addr", devaddr);
break; break;
}
default: default:
abort(); abort();
} }
@ -635,7 +691,9 @@ DriveInfo *drive_init(QemuOpts *opts, BlockInterfaceType block_default_type)
error_report("warning: disabling copy_on_read on readonly drive"); error_report("warning: disabling copy_on_read on readonly drive");
} }
ret = bdrv_open(dinfo->bdrv, file, bdrv_flags, drv); ret = bdrv_open(dinfo->bdrv, file, bs_opts, bdrv_flags, drv);
bs_opts = NULL;
if (ret < 0) { if (ret < 0) {
if (ret == -EMEDIUMTYPE) { if (ret == -EMEDIUMTYPE) {
error_report("could not open disk image %s: not in %s format", error_report("could not open disk image %s: not in %s format",
@ -649,9 +707,14 @@ DriveInfo *drive_init(QemuOpts *opts, BlockInterfaceType block_default_type)
if (bdrv_key_required(dinfo->bdrv)) if (bdrv_key_required(dinfo->bdrv))
autostart = 0; autostart = 0;
qemu_opts_del(opts);
return dinfo; return dinfo;
err: err:
qemu_opts_del(opts);
QDECREF(bs_opts);
bdrv_delete(dinfo->bdrv); bdrv_delete(dinfo->bdrv);
g_free(dinfo->id); g_free(dinfo->id);
QTAILQ_REMOVE(&drives, dinfo, next); QTAILQ_REMOVE(&drives, dinfo, next);
@ -820,7 +883,9 @@ void qmp_transaction(BlockdevActionList *dev_list, Error **errp)
/* We will manually add the backing_hd field to the bs later */ /* We will manually add the backing_hd field to the bs later */
states->new_bs = bdrv_new(""); states->new_bs = bdrv_new("");
ret = bdrv_open(states->new_bs, new_image_file, /* TODO Inherit bs->options or only take explicit options with an
* extended QMP command? */
ret = bdrv_open(states->new_bs, new_image_file, NULL,
flags | BDRV_O_NO_BACKING, drv); flags | BDRV_O_NO_BACKING, drv);
if (ret != 0) { if (ret != 0) {
error_set(errp, QERR_OPEN_FILE_FAILED, new_image_file); error_set(errp, QERR_OPEN_FILE_FAILED, new_image_file);
@ -921,7 +986,7 @@ static void qmp_bdrv_open_encrypted(BlockDriverState *bs, const char *filename,
int bdrv_flags, BlockDriver *drv, int bdrv_flags, BlockDriver *drv,
const char *password, Error **errp) const char *password, Error **errp)
{ {
if (bdrv_open(bs, filename, bdrv_flags, drv) < 0) { if (bdrv_open(bs, filename, NULL, bdrv_flags, drv) < 0) {
error_set(errp, QERR_OPEN_FILE_FAILED, filename); error_set(errp, QERR_OPEN_FILE_FAILED, filename);
return; return;
} }
@ -1330,7 +1395,7 @@ void qmp_drive_mirror(const char *device, const char *target,
* file. * file.
*/ */
target_bs = bdrv_new(""); target_bs = bdrv_new("");
ret = bdrv_open(target_bs, target, flags | BDRV_O_NO_BACKING, drv); ret = bdrv_open(target_bs, target, NULL, flags | BDRV_O_NO_BACKING, drv);
if (ret < 0) { if (ret < 0) {
bdrv_delete(target_bs); bdrv_delete(target_bs);
@ -1459,9 +1524,9 @@ BlockJobInfoList *qmp_query_block_jobs(Error **errp)
return dummy.next; return dummy.next;
} }
QemuOptsList qemu_drive_opts = { QemuOptsList qemu_common_drive_opts = {
.name = "drive", .name = "drive",
.head = QTAILQ_HEAD_INITIALIZER(qemu_drive_opts.head), .head = QTAILQ_HEAD_INITIALIZER(qemu_common_drive_opts.head),
.desc = { .desc = {
{ {
.name = "bus", .name = "bus",
@ -1580,3 +1645,15 @@ QemuOptsList qemu_drive_opts = {
{ /* end of list */ } { /* end of list */ }
}, },
}; };
/*
 * Option list under which "drive" options are collected.  Its descriptor
 * table is deliberately empty; drive_init() copies the options listed in
 * qemu_common_drive_opts into a separate QemuOpts and keeps whatever remains
 * in a QDict that is handed to bdrv_open().
 */
QemuOptsList qemu_drive_opts = {
    .name = "drive",
    .head = QTAILQ_HEAD_INITIALIZER(qemu_drive_opts.head),
    .desc = {
        /*
         * no elements => accept any params
         * validation will happen later
         */
        { /* end of list */ }
    },
};

View file

@ -263,6 +263,11 @@ static int process_request(IOQueue *ioq, struct iovec iov[],
} }
} }
/*
 * Flush callback that unconditionally returns true; the notifier argument
 * is not inspected.  virtio_blk_data_plane_start() passes it as the last
 * argument of aio_set_event_notifier() for the host notifier.
 * NOTE(review): presumably this makes the AioContext always treat the host
 * notifier as having pending work -- confirm against the io_flush contract
 * of aio_set_event_notifier().
 */
static int flush_true(EventNotifier *e)
{
    return true;
}
static void handle_notify(EventNotifier *e) static void handle_notify(EventNotifier *e)
{ {
VirtIOBlockDataPlane *s = container_of(e, VirtIOBlockDataPlane, VirtIOBlockDataPlane *s = container_of(e, VirtIOBlockDataPlane,
@ -342,6 +347,14 @@ static void handle_notify(EventNotifier *e)
} }
} }
/*
 * Flush callback for the ioqueue completion notifier.
 *
 * @e is the io_notifier member embedded in a VirtIOBlockDataPlane.  Returns
 * non-zero while that data plane's num_reqs counter is above zero, and zero
 * otherwise.
 */
static int flush_io(EventNotifier *e)
{
    VirtIOBlockDataPlane *plane;

    /* Recover the owning data plane from its embedded notifier. */
    plane = container_of(e, VirtIOBlockDataPlane, io_notifier);

    return plane->num_reqs > 0;
}
static void handle_io(EventNotifier *e) static void handle_io(EventNotifier *e)
{ {
VirtIOBlockDataPlane *s = container_of(e, VirtIOBlockDataPlane, VirtIOBlockDataPlane *s = container_of(e, VirtIOBlockDataPlane,
@ -472,7 +485,7 @@ void virtio_blk_data_plane_start(VirtIOBlockDataPlane *s)
exit(1); exit(1);
} }
s->host_notifier = *virtio_queue_get_host_notifier(vq); s->host_notifier = *virtio_queue_get_host_notifier(vq);
aio_set_event_notifier(s->ctx, &s->host_notifier, handle_notify, NULL); aio_set_event_notifier(s->ctx, &s->host_notifier, handle_notify, flush_true);
/* Set up ioqueue */ /* Set up ioqueue */
ioq_init(&s->ioqueue, s->fd, REQ_MAX); ioq_init(&s->ioqueue, s->fd, REQ_MAX);
@ -480,7 +493,7 @@ void virtio_blk_data_plane_start(VirtIOBlockDataPlane *s)
ioq_put_iocb(&s->ioqueue, &s->requests[i].iocb); ioq_put_iocb(&s->ioqueue, &s->requests[i].iocb);
} }
s->io_notifier = *ioq_get_notifier(&s->ioqueue); s->io_notifier = *ioq_get_notifier(&s->ioqueue);
aio_set_event_notifier(s->ctx, &s->io_notifier, handle_io, NULL); aio_set_event_notifier(s->ctx, &s->io_notifier, handle_io, flush_io);
s->started = true; s->started = true;
trace_virtio_blk_data_plane_start(s); trace_virtio_blk_data_plane_start(s);

View file

@ -763,7 +763,7 @@ static int blk_init(struct XenDevice *xendev)
xen_be_printf(&blkdev->xendev, 2, "create new bdrv (xenbus setup)\n"); xen_be_printf(&blkdev->xendev, 2, "create new bdrv (xenbus setup)\n");
blkdev->bs = bdrv_new(blkdev->dev); blkdev->bs = bdrv_new(blkdev->dev);
if (blkdev->bs) { if (blkdev->bs) {
if (bdrv_open(blkdev->bs, blkdev->filename, qflags, if (bdrv_open(blkdev->bs, blkdev->filename, NULL, qflags,
bdrv_find_whitelisted_format(blkdev->fileproto)) != 0) { bdrv_find_whitelisted_format(blkdev->fileproto)) != 0) {
bdrv_delete(blkdev->bs); bdrv_delete(blkdev->bs);
blkdev->bs = NULL; blkdev->bs = NULL;

View file

@ -66,6 +66,9 @@ typedef struct AioContext {
/* GPollFDs for aio_poll() */ /* GPollFDs for aio_poll() */
GArray *pollfds; GArray *pollfds;
/* Thread pool for performing work and receiving completion callbacks */
struct ThreadPool *thread_pool;
} AioContext; } AioContext;
/* Returns 1 if there are still outstanding AIO requests; 0 otherwise */ /* Returns 1 if there are still outstanding AIO requests; 0 otherwise */
@ -223,6 +226,9 @@ void aio_set_event_notifier(AioContext *ctx,
*/ */
GSource *aio_get_g_source(AioContext *ctx); GSource *aio_get_g_source(AioContext *ctx);
/* Return the ThreadPool bound to this AioContext */
struct ThreadPool *aio_get_thread_pool(AioContext *ctx);
/* Functions to operate on the main QEMU AioContext. */ /* Functions to operate on the main QEMU AioContext. */
bool qemu_aio_wait(void); bool qemu_aio_wait(void);

View file

@ -137,8 +137,8 @@ int bdrv_parse_cache_flags(const char *mode, int *flags);
int bdrv_parse_discard_flags(const char *mode, int *flags); int bdrv_parse_discard_flags(const char *mode, int *flags);
int bdrv_file_open(BlockDriverState **pbs, const char *filename, int flags); int bdrv_file_open(BlockDriverState **pbs, const char *filename, int flags);
int bdrv_open_backing_file(BlockDriverState *bs); int bdrv_open_backing_file(BlockDriverState *bs);
int bdrv_open(BlockDriverState *bs, const char *filename, int flags, int bdrv_open(BlockDriverState *bs, const char *filename, QDict *options,
BlockDriver *drv); int flags, BlockDriver *drv);
BlockReopenQueue *bdrv_reopen_queue(BlockReopenQueue *bs_queue, BlockReopenQueue *bdrv_reopen_queue(BlockReopenQueue *bs_queue,
BlockDriverState *bs, int flags); BlockDriverState *bs, int flags);
int bdrv_reopen_multiple(BlockReopenQueue *bs_queue, Error **errp); int bdrv_reopen_multiple(BlockReopenQueue *bs_queue, Error **errp);

View file

@ -82,7 +82,7 @@ struct BlockDriver {
void (*bdrv_reopen_commit)(BDRVReopenState *reopen_state); void (*bdrv_reopen_commit)(BDRVReopenState *reopen_state);
void (*bdrv_reopen_abort)(BDRVReopenState *reopen_state); void (*bdrv_reopen_abort)(BDRVReopenState *reopen_state);
int (*bdrv_open)(BlockDriverState *bs, int flags); int (*bdrv_open)(BlockDriverState *bs, QDict *options, int flags);
int (*bdrv_file_open)(BlockDriverState *bs, const char *filename, int flags); int (*bdrv_file_open)(BlockDriverState *bs, const char *filename, int flags);
int (*bdrv_read)(BlockDriverState *bs, int64_t sector_num, int (*bdrv_read)(BlockDriverState *bs, int64_t sector_num,
uint8_t *buf, int nb_sectors); uint8_t *buf, int nb_sectors);
@ -286,6 +286,7 @@ struct BlockDriverState {
/* long-running background operation */ /* long-running background operation */
BlockJob *job; BlockJob *job;
QDict *options;
}; };
int get_tmp_filename(char *filename, int size); int get_tmp_filename(char *filename, int size);
@ -293,6 +294,13 @@ int get_tmp_filename(char *filename, int size);
void bdrv_set_io_limits(BlockDriverState *bs, void bdrv_set_io_limits(BlockDriverState *bs,
BlockIOLimit *io_limits); BlockIOLimit *io_limits);
/**
* bdrv_get_aio_context:
*
* Returns: the currently bound #AioContext
*/
AioContext *bdrv_get_aio_context(BlockDriverState *bs);
#ifdef _WIN32 #ifdef _WIN32
int is_windows_drive(const char *filename); int is_windows_drive(const char *filename);
#endif #endif

View file

@ -104,6 +104,7 @@ bool qemu_in_coroutine(void);
*/ */
typedef struct CoQueue { typedef struct CoQueue {
QTAILQ_HEAD(, Coroutine) entries; QTAILQ_HEAD(, Coroutine) entries;
AioContext *ctx;
} CoQueue; } CoQueue;
/** /**

View file

@ -26,9 +26,16 @@
typedef int ThreadPoolFunc(void *opaque); typedef int ThreadPoolFunc(void *opaque);
BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg, typedef struct ThreadPool ThreadPool;
BlockDriverCompletionFunc *cb, void *opaque);
int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg); ThreadPool *thread_pool_new(struct AioContext *ctx);
void thread_pool_submit(ThreadPoolFunc *func, void *arg); void thread_pool_free(ThreadPool *pool);
BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
ThreadPoolFunc *func, void *arg,
BlockDriverCompletionFunc *cb, void *opaque);
int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
ThreadPoolFunc *func, void *arg);
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
#endif #endif

View file

@ -64,4 +64,6 @@ int64_t qdict_get_try_int(const QDict *qdict, const char *key,
int qdict_get_try_bool(const QDict *qdict, const char *key, int def_value); int qdict_get_try_bool(const QDict *qdict, const char *key, int def_value);
const char *qdict_get_try_str(const QDict *qdict, const char *key); const char *qdict_get_try_str(const QDict *qdict, const char *key);
QDict *qdict_clone_shallow(const QDict *src);
#endif /* QDICT_H */ #endif /* QDICT_H */

View file

@ -81,6 +81,11 @@ int qemu_init_main_loop(void);
*/ */
int main_loop_wait(int nonblocking); int main_loop_wait(int nonblocking);
/**
* qemu_get_aio_context: Return the main loop's AioContext
*/
AioContext *qemu_get_aio_context(void);
/** /**
* qemu_notify_event: Force processing of pending events. * qemu_notify_event: Force processing of pending events.
* *

View file

@ -149,6 +149,7 @@ void qemu_opts_set_defaults(QemuOptsList *list, const char *params,
QemuOpts *qemu_opts_from_qdict(QemuOptsList *list, const QDict *qdict, QemuOpts *qemu_opts_from_qdict(QemuOptsList *list, const QDict *qdict,
Error **errp); Error **errp);
QDict *qemu_opts_to_qdict(QemuOpts *opts, QDict *qdict); QDict *qemu_opts_to_qdict(QemuOpts *opts, QDict *qdict);
void qemu_opts_absorb_qdict(QemuOpts *opts, QDict *qdict, Error **errp);
typedef int (*qemu_opts_loopfunc)(QemuOpts *opts, void *opaque); typedef int (*qemu_opts_loopfunc)(QemuOpts *opts, void *opaque);
int qemu_opts_print(QemuOpts *opts, void *dummy); int qemu_opts_print(QemuOpts *opts, void *dummy);

View file

@ -40,7 +40,7 @@ struct DriveInfo {
int media_cd; int media_cd;
int cyls, heads, secs, trans; int cyls, heads, secs, trans;
QemuOpts *opts; QemuOpts *opts;
const char *serial; char *serial;
QTAILQ_ENTRY(DriveInfo) next; QTAILQ_ENTRY(DriveInfo) next;
int refcount; int refcount;
}; };

View file

@ -109,6 +109,11 @@ static int qemu_signal_init(void)
static AioContext *qemu_aio_context; static AioContext *qemu_aio_context;
/*
 * Return the file-static qemu_aio_context pointer.
 * NOTE(review): qemu_notify_event() below tests this pointer for NULL, so
 * callers presumably cannot rely on it being set before the main loop has
 * been initialized -- confirm against qemu_init_main_loop().
 */
AioContext *qemu_get_aio_context(void)
{
    return qemu_aio_context;
}
void qemu_notify_event(void) void qemu_notify_event(void)
{ {
if (!qemu_aio_context) { if (!qemu_aio_context) {

View file

@ -29,28 +29,36 @@
#include "block/aio.h" #include "block/aio.h"
#include "trace.h" #include "trace.h"
static QTAILQ_HEAD(, Coroutine) unlock_bh_queue = /* Coroutines are awoken from a BH to allow the current coroutine to complete
QTAILQ_HEAD_INITIALIZER(unlock_bh_queue); * its flow of execution. The BH may run after the CoQueue has been destroyed,
static QEMUBH* unlock_bh; * so keep BH data in a separate heap-allocated struct.
*/
typedef struct {
QEMUBH *bh;
QTAILQ_HEAD(, Coroutine) entries;
} CoQueueNextData;
static void qemu_co_queue_next_bh(void *opaque) static void qemu_co_queue_next_bh(void *opaque)
{ {
CoQueueNextData *data = opaque;
Coroutine *next; Coroutine *next;
trace_qemu_co_queue_next_bh(); trace_qemu_co_queue_next_bh();
while ((next = QTAILQ_FIRST(&unlock_bh_queue))) { while ((next = QTAILQ_FIRST(&data->entries))) {
QTAILQ_REMOVE(&unlock_bh_queue, next, co_queue_next); QTAILQ_REMOVE(&data->entries, next, co_queue_next);
qemu_coroutine_enter(next, NULL); qemu_coroutine_enter(next, NULL);
} }
qemu_bh_delete(data->bh);
g_slice_free(CoQueueNextData, data);
} }
void qemu_co_queue_init(CoQueue *queue) void qemu_co_queue_init(CoQueue *queue)
{ {
QTAILQ_INIT(&queue->entries); QTAILQ_INIT(&queue->entries);
if (!unlock_bh) { /* This will be exposed to callers once there are multiple AioContexts */
unlock_bh = qemu_bh_new(qemu_co_queue_next_bh, NULL); queue->ctx = qemu_get_aio_context();
}
} }
void coroutine_fn qemu_co_queue_wait(CoQueue *queue) void coroutine_fn qemu_co_queue_wait(CoQueue *queue)
@ -69,26 +77,39 @@ void coroutine_fn qemu_co_queue_wait_insert_head(CoQueue *queue)
assert(qemu_in_coroutine()); assert(qemu_in_coroutine());
} }
bool qemu_co_queue_next(CoQueue *queue) static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
{ {
Coroutine *next; Coroutine *next;
CoQueueNextData *data;
next = QTAILQ_FIRST(&queue->entries); if (QTAILQ_EMPTY(&queue->entries)) {
if (next) { return false;
QTAILQ_REMOVE(&queue->entries, next, co_queue_next);
QTAILQ_INSERT_TAIL(&unlock_bh_queue, next, co_queue_next);
trace_qemu_co_queue_next(next);
qemu_bh_schedule(unlock_bh);
} }
return (next != NULL); data = g_slice_new(CoQueueNextData);
data->bh = aio_bh_new(queue->ctx, qemu_co_queue_next_bh, data);
QTAILQ_INIT(&data->entries);
qemu_bh_schedule(data->bh);
while ((next = QTAILQ_FIRST(&queue->entries)) != NULL) {
QTAILQ_REMOVE(&queue->entries, next, co_queue_next);
QTAILQ_INSERT_TAIL(&data->entries, next, co_queue_next);
trace_qemu_co_queue_next(next);
if (single) {
break;
}
}
return true;
}
bool qemu_co_queue_next(CoQueue *queue)
{
return qemu_co_queue_do_restart(queue, true);
} }
void qemu_co_queue_restart_all(CoQueue *queue) void qemu_co_queue_restart_all(CoQueue *queue)
{ {
while (qemu_co_queue_next(queue)) { qemu_co_queue_do_restart(queue, false);
/* Do nothing */
}
} }
bool qemu_co_queue_empty(CoQueue *queue) bool qemu_co_queue_empty(CoQueue *queue)

View file

@ -276,7 +276,7 @@ static BlockDriverState *bdrv_new_open(const char *filename,
drv = NULL; drv = NULL;
} }
ret = bdrv_open(bs, filename, flags, drv); ret = bdrv_open(bs, filename, NULL, flags, drv);
if (ret < 0) { if (ret < 0) {
error_report("Could not open '%s': %s", filename, strerror(-ret)); error_report("Could not open '%s': %s", filename, strerror(-ret));
goto fail; goto fail;
@ -2156,7 +2156,7 @@ static int img_rebase(int argc, char **argv)
bs_old_backing = bdrv_new("old_backing"); bs_old_backing = bdrv_new("old_backing");
bdrv_get_backing_filename(bs, backing_name, sizeof(backing_name)); bdrv_get_backing_filename(bs, backing_name, sizeof(backing_name));
ret = bdrv_open(bs_old_backing, backing_name, BDRV_O_FLAGS, ret = bdrv_open(bs_old_backing, backing_name, NULL, BDRV_O_FLAGS,
old_backing_drv); old_backing_drv);
if (ret) { if (ret) {
error_report("Could not open old backing file '%s'", backing_name); error_report("Could not open old backing file '%s'", backing_name);
@ -2164,7 +2164,7 @@ static int img_rebase(int argc, char **argv)
} }
if (out_baseimg[0]) { if (out_baseimg[0]) {
bs_new_backing = bdrv_new("new_backing"); bs_new_backing = bdrv_new("new_backing");
ret = bdrv_open(bs_new_backing, out_baseimg, BDRV_O_FLAGS, ret = bdrv_open(bs_new_backing, out_baseimg, NULL, BDRV_O_FLAGS,
new_backing_drv); new_backing_drv);
if (ret) { if (ret) {
error_report("Could not open new backing file '%s'", error_report("Could not open new backing file '%s'",

View file

@ -1773,7 +1773,7 @@ static int openfile(char *name, int flags, int growable)
} else { } else {
bs = bdrv_new("hda"); bs = bdrv_new("hda");
if (bdrv_open(bs, name, flags, NULL) < 0) { if (bdrv_open(bs, name, NULL, flags, NULL) < 0) {
fprintf(stderr, "%s: can't open device %s\n", progname, name); fprintf(stderr, "%s: can't open device %s\n", progname, name);
bdrv_delete(bs); bdrv_delete(bs);
bs = NULL; bs = NULL;

View file

@ -557,7 +557,7 @@ int main(int argc, char **argv)
bs = bdrv_new("hda"); bs = bdrv_new("hda");
srcpath = argv[optind]; srcpath = argv[optind];
if ((ret = bdrv_open(bs, srcpath, flags, NULL)) < 0) { if ((ret = bdrv_open(bs, srcpath, NULL, flags, NULL)) < 0) {
errno = -ret; errno = -ret;
err(EXIT_FAILURE, "Failed to bdrv_open '%s'", argv[optind]); err(EXIT_FAILURE, "Failed to bdrv_open '%s'", argv[optind]);
} }

View file

@ -400,6 +400,28 @@ const QDictEntry *qdict_next(const QDict *qdict, const QDictEntry *entry)
return ret; return ret;
} }
/**
 * qdict_clone_shallow(): Build a new QDict with the same entries as @src.
 *
 * The values are shared rather than duplicated: every value gains one extra
 * reference and is stored in the new dictionary under its original key.
 */
QDict *qdict_clone_shallow(const QDict *src)
{
    QDict *copy = qdict_new();
    QDictEntry *ent;
    int bucket = 0;

    /* Walk every hash bucket and re-insert each entry it holds. */
    while (bucket < QDICT_BUCKET_MAX) {
        QLIST_FOREACH(ent, &src->table[bucket], next) {
            /* Take the reference that the new dictionary will own. */
            qobject_incref(ent->value);
            qdict_put_obj(copy, ent->key, ent->value);
        }
        bucket++;
    }

    return copy;
}
/** /**
* qentry_destroy(): Free all the memory allocated by a QDictEntry * qentry_destroy(): Free all the memory allocated by a QDictEntry
*/ */

View file

@ -50,10 +50,9 @@ _make_test_img 1M
for i in `seq 1 10`; do for i in `seq 1 10`; do
echo "savevm $i" echo "savevm $i"
# XXX(hch): adding -nographic would be good, but hangs the test $QEMU -nographic -hda $TEST_IMG -serial none -monitor stdio >/dev/null 2>&1 <<EOF
$QEMU -hda $TEST_IMG -monitor stdio >/dev/null 2>&1 <<EOF savevm test-$i
savevm test-$i quit
quit
EOF EOF
done done

75
tests/qemu-iotests/050 Executable file
View file

@ -0,0 +1,75 @@
#!/bin/bash
#
# Test qemu-img rebase with zero clusters
#
# Copyright (C) 2013 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# creator
owner=pbonzini@redhat.com
seq=`basename $0`
echo "QA output created by $seq"
here=`pwd`
tmp=/tmp/$$
status=1 # failure is the default!
# Remove the test image and the two renamed copies (.old and .new) that this
# test creates alongside it.
_cleanup()
{
    _cleanup_test_img
    rm -f $TEST_IMG.old $TEST_IMG.new
}
trap "_cleanup; exit \$status" 0 1 2 3 15
# get standard environment, filters and checks
. ./common.rc
. ./common.filter
_supported_fmt qcow2 qed
_supported_proto file
_supported_os Linux
# Default to compat=1.1 for qcow2 when no image options were supplied.
# $IMGOPTS has to be quoted: left unquoted, an empty value disappears from
# the command line, test is invoked as `test = ""` and fails with an error,
# so the default would never be applied.
if test "$IMGFMT" = qcow2 && test "$IMGOPTS" = ""; then
    IMGOPTS=compat=1.1
fi
echo
echo "== Creating images =="
size=10M
_make_test_img $size
$QEMU_IO -c "write -P 0x40 0 1048576" $TEST_IMG | _filter_qemu_io
mv $TEST_IMG $TEST_IMG.old
_make_test_img $size
$QEMU_IO -c "write -P 0x5a 0 1048576" $TEST_IMG | _filter_qemu_io
mv $TEST_IMG $TEST_IMG.new
_make_test_img -b $TEST_IMG.old $size
$QEMU_IO -c "write -z 0 1048576" $TEST_IMG | _filter_qemu_io
echo
echo "== Rebasing the image =="
$QEMU_IMG rebase -b $TEST_IMG.new $TEST_IMG
$QEMU_IO -c "read -P 0x00 0 1048576" $TEST_IMG | _filter_qemu_io
# success, all done
echo "*** done"
rm -f $seq.full
status=0

View file

@ -0,0 +1,17 @@
QA output created by 050
== Creating images ==
Formatting 'TEST_DIR/t.IMGFMT', fmt=IMGFMT size=10485760
wrote 1048576/1048576 bytes at offset 0
1 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
Formatting 'TEST_DIR/t.IMGFMT', fmt=IMGFMT size=10485760
wrote 1048576/1048576 bytes at offset 0
1 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
Formatting 'TEST_DIR/t.IMGFMT', fmt=IMGFMT size=10485760 backing_file='TEST_DIR/t.IMGFMT.old'
wrote 1048576/1048576 bytes at offset 0
1 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
== Rebasing the image ==
read 1048576/1048576 bytes at offset 0
1 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
*** done

View file

@ -56,3 +56,4 @@
047 rw auto 047 rw auto
048 img auto quick 048 img auto quick
049 rw auto 049 rw auto
050 rw auto backing quick

View file

@ -4,6 +4,8 @@
#include "block/thread-pool.h" #include "block/thread-pool.h"
#include "block/block.h" #include "block/block.h"
static AioContext *ctx;
static ThreadPool *pool;
static int active; static int active;
typedef struct { typedef struct {
@ -38,19 +40,10 @@ static void done_cb(void *opaque, int ret)
active--; active--;
} }
/* A non-blocking poll of the main AIO context (we cannot use aio_poll
* because we do not know the AioContext).
*/
static void qemu_aio_wait_nonblocking(void)
{
qemu_notify_event();
qemu_aio_wait();
}
/* Wait until all aio and bh activity has finished */ /* Wait until all aio and bh activity has finished */
static void qemu_aio_wait_all(void) static void qemu_aio_wait_all(void)
{ {
while (qemu_aio_wait()) { while (aio_poll(ctx, true)) {
/* Do nothing */ /* Do nothing */
} }
} }
@ -58,7 +51,7 @@ static void qemu_aio_wait_all(void)
static void test_submit(void) static void test_submit(void)
{ {
WorkerTestData data = { .n = 0 }; WorkerTestData data = { .n = 0 };
thread_pool_submit(worker_cb, &data); thread_pool_submit(pool, worker_cb, &data);
qemu_aio_wait_all(); qemu_aio_wait_all();
g_assert_cmpint(data.n, ==, 1); g_assert_cmpint(data.n, ==, 1);
} }
@ -66,7 +59,8 @@ static void test_submit(void)
static void test_submit_aio(void) static void test_submit_aio(void)
{ {
WorkerTestData data = { .n = 0, .ret = -EINPROGRESS }; WorkerTestData data = { .n = 0, .ret = -EINPROGRESS };
data.aiocb = thread_pool_submit_aio(worker_cb, &data, done_cb, &data); data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data,
done_cb, &data);
/* The callbacks are not called until after the first wait. */ /* The callbacks are not called until after the first wait. */
active = 1; active = 1;
@ -84,7 +78,7 @@ static void co_test_cb(void *opaque)
active = 1; active = 1;
data->n = 0; data->n = 0;
data->ret = -EINPROGRESS; data->ret = -EINPROGRESS;
thread_pool_submit_co(worker_cb, data); thread_pool_submit_co(pool, worker_cb, data);
/* The test continues in test_submit_co, after qemu_coroutine_enter... */ /* The test continues in test_submit_co, after qemu_coroutine_enter... */
@ -126,12 +120,12 @@ static void test_submit_many(void)
for (i = 0; i < 100; i++) { for (i = 0; i < 100; i++) {
data[i].n = 0; data[i].n = 0;
data[i].ret = -EINPROGRESS; data[i].ret = -EINPROGRESS;
thread_pool_submit_aio(worker_cb, &data[i], done_cb, &data[i]); thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]);
} }
active = 100; active = 100;
while (active > 0) { while (active > 0) {
qemu_aio_wait(); aio_poll(ctx, true);
} }
for (i = 0; i < 100; i++) { for (i = 0; i < 100; i++) {
g_assert_cmpint(data[i].n, ==, 1); g_assert_cmpint(data[i].n, ==, 1);
@ -154,7 +148,7 @@ static void test_cancel(void)
for (i = 0; i < 100; i++) { for (i = 0; i < 100; i++) {
data[i].n = 0; data[i].n = 0;
data[i].ret = -EINPROGRESS; data[i].ret = -EINPROGRESS;
data[i].aiocb = thread_pool_submit_aio(long_cb, &data[i], data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i],
done_cb, &data[i]); done_cb, &data[i]);
} }
@ -162,7 +156,8 @@ static void test_cancel(void)
* run, but do not waste too much time... * run, but do not waste too much time...
*/ */
active = 100; active = 100;
qemu_aio_wait_nonblocking(); aio_notify(ctx);
aio_poll(ctx, false);
/* Wait some time for the threads to start, with some sanity /* Wait some time for the threads to start, with some sanity
* testing on the behavior of the scheduler... * testing on the behavior of the scheduler...
@ -208,11 +203,10 @@ static void test_cancel(void)
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
/* These should be removed once each AioContext has its thread pool. int ret;
* The test should create its own AioContext.
*/ ctx = aio_context_new();
qemu_init_main_loop(); pool = aio_get_thread_pool(ctx);
bdrv_init();
g_test_init(&argc, &argv, NULL); g_test_init(&argc, &argv, NULL);
g_test_add_func("/thread-pool/submit", test_submit); g_test_add_func("/thread-pool/submit", test_submit);
@ -220,5 +214,9 @@ int main(int argc, char **argv)
g_test_add_func("/thread-pool/submit-co", test_submit_co); g_test_add_func("/thread-pool/submit-co", test_submit_co);
g_test_add_func("/thread-pool/submit-many", test_submit_many); g_test_add_func("/thread-pool/submit-many", test_submit_many);
g_test_add_func("/thread-pool/cancel", test_cancel); g_test_add_func("/thread-pool/cancel", test_cancel);
return g_test_run();
ret = g_test_run();
aio_context_unref(ctx);
return ret;
} }

View file

@ -24,7 +24,7 @@
#include "qemu/event_notifier.h" #include "qemu/event_notifier.h"
#include "block/thread-pool.h" #include "block/thread-pool.h"
static void do_spawn_thread(void); static void do_spawn_thread(ThreadPool *pool);
typedef struct ThreadPoolElement ThreadPoolElement; typedef struct ThreadPoolElement ThreadPoolElement;
@ -37,6 +37,7 @@ enum ThreadState {
struct ThreadPoolElement { struct ThreadPoolElement {
BlockDriverAIOCB common; BlockDriverAIOCB common;
ThreadPool *pool;
ThreadPoolFunc *func; ThreadPoolFunc *func;
void *arg; void *arg;
@ -54,49 +55,56 @@ struct ThreadPoolElement {
QLIST_ENTRY(ThreadPoolElement) all; QLIST_ENTRY(ThreadPoolElement) all;
}; };
static EventNotifier notifier; struct ThreadPool {
static QemuMutex lock; EventNotifier notifier;
static QemuCond check_cancel; AioContext *ctx;
static QemuSemaphore sem; QemuMutex lock;
static int max_threads = 64; QemuCond check_cancel;
static QEMUBH *new_thread_bh; QemuCond worker_stopped;
QemuSemaphore sem;
int max_threads;
QEMUBH *new_thread_bh;
/* The following variables are protected by the global mutex. */ /* The following variables are only accessed from one AioContext. */
static QLIST_HEAD(, ThreadPoolElement) head; QLIST_HEAD(, ThreadPoolElement) head;
/* The following variables are protected by lock. */ /* The following variables are protected by lock. */
static QTAILQ_HEAD(, ThreadPoolElement) request_list; QTAILQ_HEAD(, ThreadPoolElement) request_list;
static int cur_threads; int cur_threads;
static int idle_threads; int idle_threads;
static int new_threads; /* backlog of threads we need to create */ int new_threads; /* backlog of threads we need to create */
static int pending_threads; /* threads created but not running yet */ int pending_threads; /* threads created but not running yet */
static int pending_cancellations; /* whether we need a cond_broadcast */ int pending_cancellations; /* whether we need a cond_broadcast */
bool stopping;
};
static void *worker_thread(void *unused) static void *worker_thread(void *opaque)
{ {
qemu_mutex_lock(&lock); ThreadPool *pool = opaque;
pending_threads--;
do_spawn_thread();
while (1) { qemu_mutex_lock(&pool->lock);
pool->pending_threads--;
do_spawn_thread(pool);
while (!pool->stopping) {
ThreadPoolElement *req; ThreadPoolElement *req;
int ret; int ret;
do { do {
idle_threads++; pool->idle_threads++;
qemu_mutex_unlock(&lock); qemu_mutex_unlock(&pool->lock);
ret = qemu_sem_timedwait(&sem, 10000); ret = qemu_sem_timedwait(&pool->sem, 10000);
qemu_mutex_lock(&lock); qemu_mutex_lock(&pool->lock);
idle_threads--; pool->idle_threads--;
} while (ret == -1 && !QTAILQ_EMPTY(&request_list)); } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
if (ret == -1) { if (ret == -1 || pool->stopping) {
break; break;
} }
req = QTAILQ_FIRST(&request_list); req = QTAILQ_FIRST(&pool->request_list);
QTAILQ_REMOVE(&request_list, req, reqs); QTAILQ_REMOVE(&pool->request_list, req, reqs);
req->state = THREAD_ACTIVE; req->state = THREAD_ACTIVE;
qemu_mutex_unlock(&lock); qemu_mutex_unlock(&pool->lock);
ret = req->func(req->arg); ret = req->func(req->arg);
@ -105,45 +113,48 @@ static void *worker_thread(void *unused)
smp_wmb(); smp_wmb();
req->state = THREAD_DONE; req->state = THREAD_DONE;
qemu_mutex_lock(&lock); qemu_mutex_lock(&pool->lock);
if (pending_cancellations) { if (pool->pending_cancellations) {
qemu_cond_broadcast(&check_cancel); qemu_cond_broadcast(&pool->check_cancel);
} }
event_notifier_set(&notifier); event_notifier_set(&pool->notifier);
} }
cur_threads--; pool->cur_threads--;
qemu_mutex_unlock(&lock); qemu_cond_signal(&pool->worker_stopped);
qemu_mutex_unlock(&pool->lock);
return NULL; return NULL;
} }
static void do_spawn_thread(void) static void do_spawn_thread(ThreadPool *pool)
{ {
QemuThread t; QemuThread t;
/* Runs with lock taken. */ /* Runs with lock taken. */
if (!new_threads) { if (!pool->new_threads) {
return; return;
} }
new_threads--; pool->new_threads--;
pending_threads++; pool->pending_threads++;
qemu_thread_create(&t, worker_thread, NULL, QEMU_THREAD_DETACHED); qemu_thread_create(&t, worker_thread, pool, QEMU_THREAD_DETACHED);
} }
static void spawn_thread_bh_fn(void *opaque) static void spawn_thread_bh_fn(void *opaque)
{ {
qemu_mutex_lock(&lock); ThreadPool *pool = opaque;
do_spawn_thread();
qemu_mutex_unlock(&lock); qemu_mutex_lock(&pool->lock);
do_spawn_thread(pool);
qemu_mutex_unlock(&pool->lock);
} }
static void spawn_thread(void) static void spawn_thread(ThreadPool *pool)
{ {
cur_threads++; pool->cur_threads++;
new_threads++; pool->new_threads++;
/* If there are threads being created, they will spawn new workers, so /* If there are threads being created, they will spawn new workers, so
* we don't spend time creating many threads in a loop holding a mutex or * we don't spend time creating many threads in a loop holding a mutex or
* starving the current vcpu. * starving the current vcpu.
@ -151,23 +162,25 @@ static void spawn_thread(void)
* If there are no idle threads, ask the main thread to create one, so we * If there are no idle threads, ask the main thread to create one, so we
* inherit the correct affinity instead of the vcpu affinity. * inherit the correct affinity instead of the vcpu affinity.
*/ */
if (!pending_threads) { if (!pool->pending_threads) {
qemu_bh_schedule(new_thread_bh); qemu_bh_schedule(pool->new_thread_bh);
} }
} }
static void event_notifier_ready(EventNotifier *notifier) static void event_notifier_ready(EventNotifier *notifier)
{ {
ThreadPool *pool = container_of(notifier, ThreadPool, notifier);
ThreadPoolElement *elem, *next; ThreadPoolElement *elem, *next;
event_notifier_test_and_clear(notifier); event_notifier_test_and_clear(notifier);
restart: restart:
QLIST_FOREACH_SAFE(elem, &head, all, next) { QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) { if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
continue; continue;
} }
if (elem->state == THREAD_DONE) { if (elem->state == THREAD_DONE) {
trace_thread_pool_complete(elem, elem->common.opaque, elem->ret); trace_thread_pool_complete(pool, elem, elem->common.opaque,
elem->ret);
} }
if (elem->state == THREAD_DONE && elem->common.cb) { if (elem->state == THREAD_DONE && elem->common.cb) {
QLIST_REMOVE(elem, all); QLIST_REMOVE(elem, all);
@ -186,34 +199,36 @@ restart:
static int thread_pool_active(EventNotifier *notifier) static int thread_pool_active(EventNotifier *notifier)
{ {
return !QLIST_EMPTY(&head); ThreadPool *pool = container_of(notifier, ThreadPool, notifier);
return !QLIST_EMPTY(&pool->head);
} }
static void thread_pool_cancel(BlockDriverAIOCB *acb) static void thread_pool_cancel(BlockDriverAIOCB *acb)
{ {
ThreadPoolElement *elem = (ThreadPoolElement *)acb; ThreadPoolElement *elem = (ThreadPoolElement *)acb;
ThreadPool *pool = elem->pool;
trace_thread_pool_cancel(elem, elem->common.opaque); trace_thread_pool_cancel(elem, elem->common.opaque);
qemu_mutex_lock(&lock); qemu_mutex_lock(&pool->lock);
if (elem->state == THREAD_QUEUED && if (elem->state == THREAD_QUEUED &&
/* No thread has yet started working on elem. we can try to "steal" /* No thread has yet started working on elem. we can try to "steal"
* the item from the worker if we can get a signal from the * the item from the worker if we can get a signal from the
* semaphore. Because this is non-blocking, we can do it with * semaphore. Because this is non-blocking, we can do it with
* the lock taken and ensure that elem will remain THREAD_QUEUED. * the lock taken and ensure that elem will remain THREAD_QUEUED.
*/ */
qemu_sem_timedwait(&sem, 0) == 0) { qemu_sem_timedwait(&pool->sem, 0) == 0) {
QTAILQ_REMOVE(&request_list, elem, reqs); QTAILQ_REMOVE(&pool->request_list, elem, reqs);
elem->state = THREAD_CANCELED; elem->state = THREAD_CANCELED;
event_notifier_set(&notifier); event_notifier_set(&pool->notifier);
} else { } else {
pending_cancellations++; pool->pending_cancellations++;
while (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) { while (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
qemu_cond_wait(&check_cancel, &lock); qemu_cond_wait(&pool->check_cancel, &pool->lock);
} }
pending_cancellations--; pool->pending_cancellations--;
} }
qemu_mutex_unlock(&lock); qemu_mutex_unlock(&pool->lock);
} }
static const AIOCBInfo thread_pool_aiocb_info = { static const AIOCBInfo thread_pool_aiocb_info = {
@ -221,7 +236,8 @@ static const AIOCBInfo thread_pool_aiocb_info = {
.cancel = thread_pool_cancel, .cancel = thread_pool_cancel,
}; };
BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg, BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
ThreadPoolFunc *func, void *arg,
BlockDriverCompletionFunc *cb, void *opaque) BlockDriverCompletionFunc *cb, void *opaque)
{ {
ThreadPoolElement *req; ThreadPoolElement *req;
@ -230,18 +246,19 @@ BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
req->func = func; req->func = func;
req->arg = arg; req->arg = arg;
req->state = THREAD_QUEUED; req->state = THREAD_QUEUED;
req->pool = pool;
QLIST_INSERT_HEAD(&head, req, all); QLIST_INSERT_HEAD(&pool->head, req, all);
trace_thread_pool_submit(req, arg); trace_thread_pool_submit(pool, req, arg);
qemu_mutex_lock(&lock); qemu_mutex_lock(&pool->lock);
if (idle_threads == 0 && cur_threads < max_threads) { if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
spawn_thread(); spawn_thread(pool);
} }
QTAILQ_INSERT_TAIL(&request_list, req, reqs); QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
qemu_mutex_unlock(&lock); qemu_mutex_unlock(&pool->lock);
qemu_sem_post(&sem); qemu_sem_post(&pool->sem);
return &req->common; return &req->common;
} }
@ -258,32 +275,80 @@ static void thread_pool_co_cb(void *opaque, int ret)
qemu_coroutine_enter(co->co, NULL); qemu_coroutine_enter(co->co, NULL);
} }
int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg) int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
void *arg)
{ {
ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS }; ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
assert(qemu_in_coroutine()); assert(qemu_in_coroutine());
thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc); thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
qemu_coroutine_yield(); qemu_coroutine_yield();
return tpc.ret; return tpc.ret;
} }
void thread_pool_submit(ThreadPoolFunc *func, void *arg) void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
{ {
thread_pool_submit_aio(func, arg, NULL, NULL); thread_pool_submit_aio(pool, func, arg, NULL, NULL);
} }
static void thread_pool_init(void) static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{ {
QLIST_INIT(&head); if (!ctx) {
event_notifier_init(&notifier, false); ctx = qemu_get_aio_context();
qemu_mutex_init(&lock); }
qemu_cond_init(&check_cancel);
qemu_sem_init(&sem, 0);
qemu_aio_set_event_notifier(&notifier, event_notifier_ready,
thread_pool_active);
QTAILQ_INIT(&request_list); memset(pool, 0, sizeof(*pool));
new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL); event_notifier_init(&pool->notifier, false);
pool->ctx = ctx;
qemu_mutex_init(&pool->lock);
qemu_cond_init(&pool->check_cancel);
qemu_cond_init(&pool->worker_stopped);
qemu_sem_init(&pool->sem, 0);
pool->max_threads = 64;
pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
QLIST_INIT(&pool->head);
QTAILQ_INIT(&pool->request_list);
aio_set_event_notifier(ctx, &pool->notifier, event_notifier_ready,
thread_pool_active);
} }
block_init(thread_pool_init) ThreadPool *thread_pool_new(AioContext *ctx)
{
ThreadPool *pool = g_new(ThreadPool, 1);
thread_pool_init_one(pool, ctx);
return pool;
}
/*
 * Shut down and release a ThreadPool.
 *
 * @pool: the pool to destroy; NULL is accepted and makes this a no-op.
 *
 * The pool's list of submitted elements (pool->head) must already be empty;
 * this is enforced with an assertion.  The function blocks until every
 * worker thread accounted for in pool->cur_threads has exited, then
 * destroys the pool's synchronization objects and frees @pool itself, so
 * the pointer must not be used afterwards.
 */
void thread_pool_free(ThreadPool *pool)
{
    if (!pool) {
        return;
    }
    /* No request may still be tracked by the pool at this point. */
    assert(QLIST_EMPTY(&pool->head));
    qemu_mutex_lock(&pool->lock);
    /*
     * Stop new threads from spawning: drop the bottom half that creates
     * workers, and remove the threads that were requested but never created
     * (new_threads) from cur_threads so the wait loop below does not wait
     * for them.
     */
    qemu_bh_delete(pool->new_thread_bh);
    pool->cur_threads -= pool->new_threads;
    pool->new_threads = 0;
    /*
     * Wait for worker threads to terminate.  Workers test pool->stopping
     * after returning from their wait on pool->sem, so post the semaphore
     * to wake one, then sleep on worker_stopped (which releases pool->lock
     * while waiting) until a worker has decremented cur_threads and
     * signalled its exit.
     */
    pool->stopping = true;
    while (pool->cur_threads > 0) {
        qemu_sem_post(&pool->sem);
        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
    }
    qemu_mutex_unlock(&pool->lock);
    /*
     * NOTE(review): passing NULL handlers presumably unregisters the
     * notifier from the AioContext -- confirm against aio_set_event_notifier.
     */
    aio_set_event_notifier(pool->ctx, &pool->notifier, NULL, NULL);
    /* All workers are gone; tear down the primitives and the pool itself. */
    qemu_sem_destroy(&pool->sem);
    qemu_cond_destroy(&pool->check_cancel);
    qemu_cond_destroy(&pool->worker_stopped);
    qemu_mutex_destroy(&pool->lock);
    event_notifier_cleanup(&pool->notifier);
    g_free(pool);
}

View file

@ -115,8 +115,8 @@ virtio_blk_data_plane_complete_request(void *s, unsigned int head, int ret) "dat
vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p" vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p"
# thread-pool.c # thread-pool.c
thread_pool_submit(void *req, void *opaque) "req %p opaque %p" thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p"
thread_pool_complete(void *req, void *opaque, int ret) "req %p opaque %p ret %d" thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d"
thread_pool_cancel(void *req, void *opaque) "req %p opaque %p" thread_pool_cancel(void *req, void *opaque) "req %p opaque %p"
# posix-aio-compat.c # posix-aio-compat.c

View file

@ -1066,6 +1066,40 @@ QemuOpts *qemu_opts_from_qdict(QemuOptsList *list, const QDict *qdict,
return opts; return opts;
} }
/*
 * Move every QDict entry that the option list of @opts describes into
 * @opts, deleting each successfully absorbed entry from @qdict.
 *
 * Entries whose key has no matching descriptor are skipped and stay in
 * @qdict.  If absorbing an entry fails, the error is propagated to @errp
 * and the function returns immediately; the failing entry and all entries
 * not yet visited remain in @qdict.
 */
void qemu_opts_absorb_qdict(QemuOpts *opts, QDict *qdict, Error **errp)
{
    const QDictEntry *cur;
    const QDictEntry *following;

    for (cur = qdict_first(qdict); cur != NULL; cur = following) {
        Error *local_err = NULL;
        OptsFromQDictState state = {
            .errp = &local_err,
            .opts = opts,
        };

        /* Look up the successor first: cur may be deleted from qdict below. */
        following = qdict_next(qdict, cur);

        if (!find_desc_by_name(opts->list->desc, cur->key)) {
            /* Not described by this option list; leave it in the QDict. */
            continue;
        }

        qemu_opts_from_qdict_1(cur->key, cur->value, &state);
        if (error_is_set(&local_err)) {
            error_propagate(errp, local_err);
            return;
        }

        qdict_del(qdict, cur->key);
    }
}
/* /*
* Convert from QemuOpts to QDict. * Convert from QemuOpts to QDict.
* The QDict values are of type QString. * The QDict values are of type QString.