From 1e93b9fba2e0e23105e91574fcfc193f899d8e74 Mon Sep 17 00:00:00 2001 From: Vladimir Sementsov-Ogievskiy Date: Tue, 8 Nov 2016 01:50:34 -0500 Subject: [PATCH 01/13] blockjob: fix dead pointer in txn list Though it is not intended to be reached through normal circumstances, if we do not gracefully deconstruct the transaction QLIST, we may wind up with stale pointers in the list. The rest of this series attempts to address the underlying issues, but this should fix list inconsistencies. Signed-off-by: Vladimir Sementsov-Ogievskiy Tested-by: John Snow Reviewed-by: John Snow Reviewed-by: Eric Blake Reviewed-by: Kevin Wolf Signed-off-by: John Snow Message-id: 1478587839-9834-2-git-send-email-jsnow@redhat.com [Rewrote commit message. --js] Signed-off-by: John Snow Reviewed-by: Eric Blake Reviewed-by: Kevin Wolf Signed-off-by: John Snow Signed-off-by: Jeff Cody --- blockjob.c | 1 + 1 file changed, 1 insertion(+) diff --git a/blockjob.c b/blockjob.c index 4aa14a4974..4d0ef53fba 100644 --- a/blockjob.c +++ b/blockjob.c @@ -256,6 +256,7 @@ static void block_job_completed_single(BlockJob *job) } if (job->txn) { + QLIST_REMOVE(job, txn_list); block_job_txn_unref(job->txn); } block_job_unref(job); From e8a40bf71d606f9f87866fb2461eaed52814b38e Mon Sep 17 00:00:00 2001 From: John Snow Date: Tue, 8 Nov 2016 01:50:35 -0500 Subject: [PATCH 02/13] blockjob: add .clean property Cleaning up after we have deferred to the main thread but before the transaction has converged can be dangerous and result in deadlocks if the job cleanup invokes any BH polling loops. A job may attempt to begin cleaning up, but may induce another job to enter its cleanup routine. The second job, part of our same transaction, will block waiting for the first job to finish, so neither job may now make progress. To rectify this, allow jobs to register a cleanup operation that will always run regardless of if the job was in a transaction or not, and if the transaction job group completed successfully or not. Move sensitive cleanup to this callback instead which is guaranteed to be run only after the transaction has converged, which removes sensitive timing constraints from said cleanup. Furthermore, in future patches these cleanup operations will be performed regardless of whether or not we actually started the job. Therefore, cleanup callbacks should essentially confine themselves to undoing create operations, e.g. setup actions taken in what is now backup_start. Reported-by: Vladimir Sementsov-Ogievskiy Signed-off-by: John Snow Reviewed-by: Kevin Wolf Message-id: 1478587839-9834-3-git-send-email-jsnow@redhat.com Signed-off-by: Jeff Cody --- block/backup.c | 15 ++++++++++----- blockjob.c | 3 +++ include/block/blockjob_int.h | 8 ++++++++ 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/block/backup.c b/block/backup.c index 7b5d8a3757..734a24c698 100644 --- a/block/backup.c +++ b/block/backup.c @@ -242,6 +242,14 @@ static void backup_abort(BlockJob *job) } } +static void backup_clean(BlockJob *job) +{ + BackupBlockJob *s = container_of(job, BackupBlockJob, common); + assert(s->target); + blk_unref(s->target); + s->target = NULL; +} + static void backup_attached_aio_context(BlockJob *job, AioContext *aio_context) { BackupBlockJob *s = container_of(job, BackupBlockJob, common); @@ -321,6 +329,7 @@ static const BlockJobDriver backup_job_driver = { .set_speed = backup_set_speed, .commit = backup_commit, .abort = backup_abort, + .clean = backup_clean, .attached_aio_context = backup_attached_aio_context, .drain = backup_drain, }; @@ -343,12 +352,8 @@ typedef struct { static void backup_complete(BlockJob *job, void *opaque) { - BackupBlockJob *s = container_of(job, BackupBlockJob, common); BackupCompleteData *data = opaque; - blk_unref(s->target); - s->target = NULL; - block_job_completed(job, data->ret); g_free(data); } @@ -658,7 +663,7 @@ void backup_start(const char *job_id, BlockDriverState *bs, bdrv_reclaim_dirty_bitmap(bs, sync_bitmap, NULL); } if (job) { - blk_unref(job->target); + backup_clean(&job->common); block_job_unref(&job->common); } } diff --git a/blockjob.c b/blockjob.c index 4d0ef53fba..e3c458c021 100644 --- a/blockjob.c +++ b/blockjob.c @@ -241,6 +241,9 @@ static void block_job_completed_single(BlockJob *job) job->driver->abort(job); } } + if (job->driver->clean) { + job->driver->clean(job); + } if (job->cb) { job->cb(job->opaque, job->ret); diff --git a/include/block/blockjob_int.h b/include/block/blockjob_int.h index 40275e4437..60d91a0678 100644 --- a/include/block/blockjob_int.h +++ b/include/block/blockjob_int.h @@ -73,6 +73,14 @@ struct BlockJobDriver { */ void (*abort)(BlockJob *job); + /** + * If the callback is not NULL, it will be invoked after a call to either + * .commit() or .abort(). Regardless of which callback is invoked after + * completion, .clean() will always be called, even if the job does not + * belong to a transaction group. + */ + void (*clean)(BlockJob *job); + /** * If the callback is not NULL, it will be invoked when the job transitions * into the paused state. Paused jobs must not perform any asynchronous From a7815a764c40c9dcf204f666c2d90248095376a8 Mon Sep 17 00:00:00 2001 From: John Snow Date: Tue, 8 Nov 2016 01:50:36 -0500 Subject: [PATCH 03/13] blockjob: add .start field Add an explicit start field to specify the entrypoint. We already have ownership of the coroutine itself AND managing the lifetime of the coroutine, let's take control of creation of the coroutine, too. This will allow us to delay creation of the actual coroutine until we know we'll actually start a BlockJob in block_job_start. This avoids the sticky question of how to "un-create" a Coroutine that hasn't been started yet. Signed-off-by: John Snow Reviewed-by: Kevin Wolf Message-id: 1478587839-9834-4-git-send-email-jsnow@redhat.com Signed-off-by: Jeff Cody --- block/backup.c | 25 +++++++++++++------------ block/commit.c | 3 ++- block/mirror.c | 4 +++- block/stream.c | 3 ++- include/block/blockjob_int.h | 3 +++ 5 files changed, 23 insertions(+), 15 deletions(-) diff --git a/block/backup.c b/block/backup.c index 734a24c698..4ed449448a 100644 --- a/block/backup.c +++ b/block/backup.c @@ -323,17 +323,6 @@ static void backup_drain(BlockJob *job) } } -static const BlockJobDriver backup_job_driver = { - .instance_size = sizeof(BackupBlockJob), - .job_type = BLOCK_JOB_TYPE_BACKUP, - .set_speed = backup_set_speed, - .commit = backup_commit, - .abort = backup_abort, - .clean = backup_clean, - .attached_aio_context = backup_attached_aio_context, - .drain = backup_drain, -}; - static BlockErrorAction backup_error_action(BackupBlockJob *job, bool read, int error) { @@ -542,6 +531,18 @@ static void coroutine_fn backup_run(void *opaque) block_job_defer_to_main_loop(&job->common, backup_complete, data); } +static const BlockJobDriver backup_job_driver = { + .instance_size = sizeof(BackupBlockJob), + .job_type = BLOCK_JOB_TYPE_BACKUP, + .start = backup_run, + .set_speed = backup_set_speed, + .commit = backup_commit, + .abort = backup_abort, + .clean = backup_clean, + .attached_aio_context = backup_attached_aio_context, + .drain = backup_drain, +}; + void backup_start(const char *job_id, BlockDriverState *bs, BlockDriverState *target, int64_t speed, MirrorSyncMode sync_mode, BdrvDirtyBitmap *sync_bitmap, @@ -653,7 +654,7 @@ void backup_start(const char *job_id, BlockDriverState *bs, block_job_add_bdrv(&job->common, target); job->common.len = len; - job->common.co = qemu_coroutine_create(backup_run, job); + job->common.co = qemu_coroutine_create(job->common.driver->start, job); block_job_txn_add_job(txn, &job->common); qemu_coroutine_enter(job->common.co); return; diff --git a/block/commit.c b/block/commit.c index e1eda8908b..20d27e27a8 100644 --- a/block/commit.c +++ b/block/commit.c @@ -205,6 +205,7 @@ static const BlockJobDriver commit_job_driver = { .instance_size = sizeof(CommitBlockJob), .job_type = BLOCK_JOB_TYPE_COMMIT, .set_speed = commit_set_speed, + .start = commit_run, }; void commit_start(const char *job_id, BlockDriverState *bs, @@ -288,7 +289,7 @@ void commit_start(const char *job_id, BlockDriverState *bs, s->backing_file_str = g_strdup(backing_file_str); s->on_error = on_error; - s->common.co = qemu_coroutine_create(commit_run, s); + s->common.co = qemu_coroutine_create(s->common.driver->start, s); trace_commit_start(bs, base, top, s, s->common.co); qemu_coroutine_enter(s->common.co); diff --git a/block/mirror.c b/block/mirror.c index b2c1fb855b..659e09cbaf 100644 --- a/block/mirror.c +++ b/block/mirror.c @@ -920,6 +920,7 @@ static const BlockJobDriver mirror_job_driver = { .instance_size = sizeof(MirrorBlockJob), .job_type = BLOCK_JOB_TYPE_MIRROR, .set_speed = mirror_set_speed, + .start = mirror_run, .complete = mirror_complete, .pause = mirror_pause, .attached_aio_context = mirror_attached_aio_context, @@ -930,6 +931,7 @@ static const BlockJobDriver commit_active_job_driver = { .instance_size = sizeof(MirrorBlockJob), .job_type = BLOCK_JOB_TYPE_COMMIT, .set_speed = mirror_set_speed, + .start = mirror_run, .complete = mirror_complete, .pause = mirror_pause, .attached_aio_context = mirror_attached_aio_context, @@ -1007,7 +1009,7 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs, } } - s->common.co = qemu_coroutine_create(mirror_run, s); + s->common.co = qemu_coroutine_create(s->common.driver->start, s); trace_mirror_start(bs, s, s->common.co, opaque); qemu_coroutine_enter(s->common.co); } diff --git a/block/stream.c b/block/stream.c index b05856bd65..92309ffc2e 100644 --- a/block/stream.c +++ b/block/stream.c @@ -218,6 +218,7 @@ static const BlockJobDriver stream_job_driver = { .instance_size = sizeof(StreamBlockJob), .job_type = BLOCK_JOB_TYPE_STREAM, .set_speed = stream_set_speed, + .start = stream_run, }; void stream_start(const char *job_id, BlockDriverState *bs, @@ -254,7 +255,7 @@ void stream_start(const char *job_id, BlockDriverState *bs, s->bs_flags = orig_bs_flags; s->on_error = on_error; - s->common.co = qemu_coroutine_create(stream_run, s); + s->common.co = qemu_coroutine_create(s->common.driver->start, s); trace_stream_start(bs, base, s, s->common.co); qemu_coroutine_enter(s->common.co); } diff --git a/include/block/blockjob_int.h b/include/block/blockjob_int.h index 60d91a0678..82238229c6 100644 --- a/include/block/blockjob_int.h +++ b/include/block/blockjob_int.h @@ -47,6 +47,9 @@ struct BlockJobDriver { /** Optional callback for job types that need to forward I/O status reset */ void (*iostatus_reset)(BlockJob *job); + /** Mandatory: Entrypoint for the Coroutine. */ + CoroutineEntry *start; + /** * Optional callback for job types whose completion must be triggered * manually. From 5ccac6f186e4a480074880d958760c7105cf9ba8 Mon Sep 17 00:00:00 2001 From: John Snow Date: Tue, 8 Nov 2016 01:50:37 -0500 Subject: [PATCH 04/13] blockjob: add block_job_start Instead of automatically starting jobs at creation time via backup_start et al, we'd like to return a job object pointer that can be started manually at later point in time. For now, add the block_job_start mechanism and start the jobs automatically as we have been doing, with conversions job-by-job coming in later patches. Of note: cancellation of unstarted jobs will perform all the normal cleanup as if the job had started, particularly abort and clean. The only difference is that we will not emit any events, because the job never actually started. Signed-off-by: John Snow Message-id: 1478587839-9834-5-git-send-email-jsnow@redhat.com Signed-off-by: Jeff Cody --- block/backup.c | 3 +-- block/commit.c | 5 ++-- block/mirror.c | 5 ++-- block/stream.c | 5 ++-- block/trace-events | 6 ++--- blockjob.c | 54 ++++++++++++++++++++++++++++++--------- include/block/blockjob.h | 9 +++++++ tests/test-blockjob-txn.c | 12 ++++----- 8 files changed, 67 insertions(+), 32 deletions(-) diff --git a/block/backup.c b/block/backup.c index 4ed449448a..ae1b99aa84 100644 --- a/block/backup.c +++ b/block/backup.c @@ -654,9 +654,8 @@ void backup_start(const char *job_id, BlockDriverState *bs, block_job_add_bdrv(&job->common, target); job->common.len = len; - job->common.co = qemu_coroutine_create(job->common.driver->start, job); block_job_txn_add_job(txn, &job->common); - qemu_coroutine_enter(job->common.co); + block_job_start(&job->common); return; error: diff --git a/block/commit.c b/block/commit.c index 20d27e27a8..c284e8535d 100644 --- a/block/commit.c +++ b/block/commit.c @@ -289,10 +289,9 @@ void commit_start(const char *job_id, BlockDriverState *bs, s->backing_file_str = g_strdup(backing_file_str); s->on_error = on_error; - s->common.co = qemu_coroutine_create(s->common.driver->start, s); - trace_commit_start(bs, base, top, s, s->common.co); - qemu_coroutine_enter(s->common.co); + trace_commit_start(bs, base, top, s); + block_job_start(&s->common); } diff --git a/block/mirror.c b/block/mirror.c index 659e09cbaf..62ac87f0c4 100644 --- a/block/mirror.c +++ b/block/mirror.c @@ -1009,9 +1009,8 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs, } } - s->common.co = qemu_coroutine_create(s->common.driver->start, s); - trace_mirror_start(bs, s, s->common.co, opaque); - qemu_coroutine_enter(s->common.co); + trace_mirror_start(bs, s, opaque); + block_job_start(&s->common); } void mirror_start(const char *job_id, BlockDriverState *bs, diff --git a/block/stream.c b/block/stream.c index 92309ffc2e..1523ba7dfb 100644 --- a/block/stream.c +++ b/block/stream.c @@ -255,7 +255,6 @@ void stream_start(const char *job_id, BlockDriverState *bs, s->bs_flags = orig_bs_flags; s->on_error = on_error; - s->common.co = qemu_coroutine_create(s->common.driver->start, s); - trace_stream_start(bs, base, s, s->common.co); - qemu_coroutine_enter(s->common.co); + trace_stream_start(bs, base, s); + block_job_start(&s->common); } diff --git a/block/trace-events b/block/trace-events index 882c9034c2..cfc05f2478 100644 --- a/block/trace-events +++ b/block/trace-events @@ -19,14 +19,14 @@ bdrv_co_do_copy_on_readv(void *bs, int64_t offset, unsigned int bytes, int64_t c # block/stream.c stream_one_iteration(void *s, int64_t sector_num, int nb_sectors, int is_allocated) "s %p sector_num %"PRId64" nb_sectors %d is_allocated %d" -stream_start(void *bs, void *base, void *s, void *co) "bs %p base %p s %p co %p" +stream_start(void *bs, void *base, void *s) "bs %p base %p s %p" # block/commit.c commit_one_iteration(void *s, int64_t sector_num, int nb_sectors, int is_allocated) "s %p sector_num %"PRId64" nb_sectors %d is_allocated %d" -commit_start(void *bs, void *base, void *top, void *s, void *co) "bs %p base %p top %p s %p co %p" +commit_start(void *bs, void *base, void *top, void *s) "bs %p base %p top %p s %p" # block/mirror.c -mirror_start(void *bs, void *s, void *co, void *opaque) "bs %p s %p co %p opaque %p" +mirror_start(void *bs, void *s, void *opaque) "bs %p s %p opaque %p" mirror_restart_iter(void *s, int64_t cnt) "s %p dirty count %"PRId64 mirror_before_flush(void *s) "s %p" mirror_before_drain(void *s, int64_t cnt) "s %p dirty count %"PRId64 diff --git a/blockjob.c b/blockjob.c index e3c458c021..513620c199 100644 --- a/blockjob.c +++ b/blockjob.c @@ -174,7 +174,9 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver, job->blk = blk; job->cb = cb; job->opaque = opaque; - job->busy = true; + job->busy = false; + job->paused = true; + job->pause_count = 1; job->refcnt = 1; bs->job = job; @@ -202,6 +204,23 @@ bool block_job_is_internal(BlockJob *job) return (job->id == NULL); } +static bool block_job_started(BlockJob *job) +{ + return job->co; +} + +void block_job_start(BlockJob *job) +{ + assert(job && !block_job_started(job) && job->paused && + !job->busy && job->driver->start); + job->co = qemu_coroutine_create(job->driver->start, job); + if (--job->pause_count == 0) { + job->paused = false; + job->busy = true; + qemu_coroutine_enter(job->co); + } +} + void block_job_ref(BlockJob *job) { ++job->refcnt; @@ -248,14 +267,18 @@ static void block_job_completed_single(BlockJob *job) if (job->cb) { job->cb(job->opaque, job->ret); } - if (block_job_is_cancelled(job)) { - block_job_event_cancelled(job); - } else { - const char *msg = NULL; - if (job->ret < 0) { - msg = strerror(-job->ret); + + /* Emit events only if we actually started */ + if (block_job_started(job)) { + if (block_job_is_cancelled(job)) { + block_job_event_cancelled(job); + } else { + const char *msg = NULL; + if (job->ret < 0) { + msg = strerror(-job->ret); + } + block_job_event_completed(job, msg); } - block_job_event_completed(job, msg); } if (job->txn) { @@ -363,7 +386,8 @@ void block_job_complete(BlockJob *job, Error **errp) { /* Should not be reachable via external interface for internal jobs */ assert(job->id); - if (job->pause_count || job->cancelled || !job->driver->complete) { + if (job->pause_count || job->cancelled || + !block_job_started(job) || !job->driver->complete) { error_setg(errp, "The active block job '%s' cannot be completed", job->id); return; @@ -395,6 +419,8 @@ bool block_job_user_paused(BlockJob *job) void coroutine_fn block_job_pause_point(BlockJob *job) { + assert(job && block_job_started(job)); + if (!block_job_should_pause(job)) { return; } @@ -446,9 +472,13 @@ void block_job_enter(BlockJob *job) void block_job_cancel(BlockJob *job) { - job->cancelled = true; - block_job_iostatus_reset(job); - block_job_enter(job); + if (block_job_started(job)) { + job->cancelled = true; + block_job_iostatus_reset(job); + block_job_enter(job); + } else { + block_job_completed(job, -ECANCELED); + } } bool block_job_is_cancelled(BlockJob *job) diff --git a/include/block/blockjob.h b/include/block/blockjob.h index 356cacf004..1acb256223 100644 --- a/include/block/blockjob.h +++ b/include/block/blockjob.h @@ -188,6 +188,15 @@ void block_job_add_bdrv(BlockJob *job, BlockDriverState *bs); */ void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp); +/** + * block_job_start: + * @job: A job that has not yet been started. + * + * Begins execution of a block job. + * Takes ownership of one reference to the job object. + */ +void block_job_start(BlockJob *job); + /** * block_job_cancel: * @job: The job to be canceled. diff --git a/tests/test-blockjob-txn.c b/tests/test-blockjob-txn.c index f9afc3be41..b132e39097 100644 --- a/tests/test-blockjob-txn.c +++ b/tests/test-blockjob-txn.c @@ -24,10 +24,6 @@ typedef struct { int *result; } TestBlockJob; -static const BlockJobDriver test_block_job_driver = { - .instance_size = sizeof(TestBlockJob), -}; - static void test_block_job_complete(BlockJob *job, void *opaque) { BlockDriverState *bs = blk_bs(job->blk); @@ -77,6 +73,11 @@ static void test_block_job_cb(void *opaque, int ret) g_free(data); } +static const BlockJobDriver test_block_job_driver = { + .instance_size = sizeof(TestBlockJob), + .start = test_block_job_run, +}; + /* Create a block job that completes with a given return code after a given * number of event loop iterations. The return code is stored in the given * result pointer. @@ -104,10 +105,9 @@ static BlockJob *test_block_job_start(unsigned int iterations, s->use_timer = use_timer; s->rc = rc; s->result = result; - s->common.co = qemu_coroutine_create(test_block_job_run, s); data->job = s; data->result = result; - qemu_coroutine_enter(s->common.co); + block_job_start(&s->common); return &s->common; } From 111049a4ecefc9cf1ac75c773f4c5c165f27fe63 Mon Sep 17 00:00:00 2001 From: John Snow Date: Tue, 8 Nov 2016 01:50:38 -0500 Subject: [PATCH 05/13] blockjob: refactor backup_start as backup_job_create Refactor backup_start as backup_job_create, which only creates the job, but does not automatically start it. The old interface, 'backup_start', is not kept in favor of limiting the number of nearly-identical interfaces that would have to be edited to keep up with QAPI changes in the future. Callers that wish to synchronously start the backup_block_job can instead just call block_job_start immediately after calling backup_job_create. Transactions are updated to use the new interface, calling block_job_start only during the .commit phase, which helps prevent race conditions where jobs may finish before we even finish building the transaction. This may happen, for instance, during empty block backup jobs. Reported-by: Vladimir Sementsov-Ogievskiy Signed-off-by: John Snow Message-id: 1478587839-9834-6-git-send-email-jsnow@redhat.com Signed-off-by: Jeff Cody --- block/backup.c | 26 +++++++------ block/replication.c | 12 +++--- blockdev.c | 81 +++++++++++++++++++++++++-------------- include/block/block_int.h | 23 +++++------ 4 files changed, 85 insertions(+), 57 deletions(-) diff --git a/block/backup.c b/block/backup.c index ae1b99aa84..ea38733849 100644 --- a/block/backup.c +++ b/block/backup.c @@ -543,7 +543,7 @@ static const BlockJobDriver backup_job_driver = { .drain = backup_drain, }; -void backup_start(const char *job_id, BlockDriverState *bs, +BlockJob *backup_job_create(const char *job_id, BlockDriverState *bs, BlockDriverState *target, int64_t speed, MirrorSyncMode sync_mode, BdrvDirtyBitmap *sync_bitmap, bool compress, @@ -563,52 +563,52 @@ void backup_start(const char *job_id, BlockDriverState *bs, if (bs == target) { error_setg(errp, "Source and target cannot be the same"); - return; + return NULL; } if (!bdrv_is_inserted(bs)) { error_setg(errp, "Device is not inserted: %s", bdrv_get_device_name(bs)); - return; + return NULL; } if (!bdrv_is_inserted(target)) { error_setg(errp, "Device is not inserted: %s", bdrv_get_device_name(target)); - return; + return NULL; } if (compress && target->drv->bdrv_co_pwritev_compressed == NULL) { error_setg(errp, "Compression is not supported for this drive %s", bdrv_get_device_name(target)); - return; + return NULL; } if (bdrv_op_is_blocked(bs, BLOCK_OP_TYPE_BACKUP_SOURCE, errp)) { - return; + return NULL; } if (bdrv_op_is_blocked(target, BLOCK_OP_TYPE_BACKUP_TARGET, errp)) { - return; + return NULL; } if (sync_mode == MIRROR_SYNC_MODE_INCREMENTAL) { if (!sync_bitmap) { error_setg(errp, "must provide a valid bitmap name for " "\"incremental\" sync mode"); - return; + return NULL; } /* Create a new bitmap, and freeze/disable this one. */ if (bdrv_dirty_bitmap_create_successor(bs, sync_bitmap, errp) < 0) { - return; + return NULL; } } else if (sync_bitmap) { error_setg(errp, "a sync_bitmap was provided to backup_run, " "but received an incompatible sync_mode (%s)", MirrorSyncMode_lookup[sync_mode]); - return; + return NULL; } len = bdrv_getlength(bs); @@ -655,8 +655,8 @@ void backup_start(const char *job_id, BlockDriverState *bs, block_job_add_bdrv(&job->common, target); job->common.len = len; block_job_txn_add_job(txn, &job->common); - block_job_start(&job->common); - return; + + return &job->common; error: if (sync_bitmap) { @@ -666,4 +666,6 @@ void backup_start(const char *job_id, BlockDriverState *bs, backup_clean(&job->common); block_job_unref(&job->common); } + + return NULL; } diff --git a/block/replication.c b/block/replication.c index d5e2b0f497..729dd12499 100644 --- a/block/replication.c +++ b/block/replication.c @@ -421,6 +421,7 @@ static void replication_start(ReplicationState *rs, ReplicationMode mode, int64_t active_length, hidden_length, disk_length; AioContext *aio_context; Error *local_err = NULL; + BlockJob *job; aio_context = bdrv_get_aio_context(bs); aio_context_acquire(aio_context); @@ -508,17 +509,18 @@ static void replication_start(ReplicationState *rs, ReplicationMode mode, bdrv_op_block_all(top_bs, s->blocker); bdrv_op_unblock(top_bs, BLOCK_OP_TYPE_DATAPLANE, s->blocker); - backup_start(NULL, s->secondary_disk->bs, s->hidden_disk->bs, 0, - MIRROR_SYNC_MODE_NONE, NULL, false, - BLOCKDEV_ON_ERROR_REPORT, BLOCKDEV_ON_ERROR_REPORT, - BLOCK_JOB_INTERNAL, backup_job_completed, bs, - NULL, &local_err); + job = backup_job_create(NULL, s->secondary_disk->bs, s->hidden_disk->bs, + 0, MIRROR_SYNC_MODE_NONE, NULL, false, + BLOCKDEV_ON_ERROR_REPORT, + BLOCKDEV_ON_ERROR_REPORT, BLOCK_JOB_INTERNAL, + backup_job_completed, bs, NULL, &local_err); if (local_err) { error_propagate(errp, local_err); backup_job_cleanup(bs); aio_context_release(aio_context); return; } + block_job_start(job); break; default: aio_context_release(aio_context); diff --git a/blockdev.c b/blockdev.c index 102ca9fe01..245e1e1d17 100644 --- a/blockdev.c +++ b/blockdev.c @@ -1811,7 +1811,7 @@ typedef struct DriveBackupState { BlockJob *job; } DriveBackupState; -static void do_drive_backup(DriveBackup *backup, BlockJobTxn *txn, +static BlockJob *do_drive_backup(DriveBackup *backup, BlockJobTxn *txn, Error **errp); static void drive_backup_prepare(BlkActionState *common, Error **errp) @@ -1835,23 +1835,26 @@ static void drive_backup_prepare(BlkActionState *common, Error **errp) bdrv_drained_begin(bs); state->bs = bs; - do_drive_backup(backup, common->block_job_txn, &local_err); + state->job = do_drive_backup(backup, common->block_job_txn, &local_err); if (local_err) { error_propagate(errp, local_err); return; } +} - state->job = state->bs->job; +static void drive_backup_commit(BlkActionState *common) +{ + DriveBackupState *state = DO_UPCAST(DriveBackupState, common, common); + assert(state->job); + block_job_start(state->job); } static void drive_backup_abort(BlkActionState *common) { DriveBackupState *state = DO_UPCAST(DriveBackupState, common, common); - BlockDriverState *bs = state->bs; - /* Only cancel if it's the job we started */ - if (bs && bs->job && bs->job == state->job) { - block_job_cancel_sync(bs->job); + if (state->job) { + block_job_cancel_sync(state->job); } } @@ -1872,8 +1875,8 @@ typedef struct BlockdevBackupState { AioContext *aio_context; } BlockdevBackupState; -static void do_blockdev_backup(BlockdevBackup *backup, BlockJobTxn *txn, - Error **errp); +static BlockJob *do_blockdev_backup(BlockdevBackup *backup, BlockJobTxn *txn, + Error **errp); static void blockdev_backup_prepare(BlkActionState *common, Error **errp) { @@ -1906,23 +1909,26 @@ static void blockdev_backup_prepare(BlkActionState *common, Error **errp) state->bs = bs; bdrv_drained_begin(state->bs); - do_blockdev_backup(backup, common->block_job_txn, &local_err); + state->job = do_blockdev_backup(backup, common->block_job_txn, &local_err); if (local_err) { error_propagate(errp, local_err); return; } +} - state->job = state->bs->job; +static void blockdev_backup_commit(BlkActionState *common) +{ + BlockdevBackupState *state = DO_UPCAST(BlockdevBackupState, common, common); + assert(state->job); + block_job_start(state->job); } static void blockdev_backup_abort(BlkActionState *common) { BlockdevBackupState *state = DO_UPCAST(BlockdevBackupState, common, common); - BlockDriverState *bs = state->bs; - /* Only cancel if it's the job we started */ - if (bs && bs->job && bs->job == state->job) { - block_job_cancel_sync(bs->job); + if (state->job) { + block_job_cancel_sync(state->job); } } @@ -2072,12 +2078,14 @@ static const BlkActionOps actions[] = { [TRANSACTION_ACTION_KIND_DRIVE_BACKUP] = { .instance_size = sizeof(DriveBackupState), .prepare = drive_backup_prepare, + .commit = drive_backup_commit, .abort = drive_backup_abort, .clean = drive_backup_clean, }, [TRANSACTION_ACTION_KIND_BLOCKDEV_BACKUP] = { .instance_size = sizeof(BlockdevBackupState), .prepare = blockdev_backup_prepare, + .commit = blockdev_backup_commit, .abort = blockdev_backup_abort, .clean = blockdev_backup_clean, }, @@ -3106,11 +3114,13 @@ out: aio_context_release(aio_context); } -static void do_drive_backup(DriveBackup *backup, BlockJobTxn *txn, Error **errp) +static BlockJob *do_drive_backup(DriveBackup *backup, BlockJobTxn *txn, + Error **errp) { BlockDriverState *bs; BlockDriverState *target_bs; BlockDriverState *source = NULL; + BlockJob *job = NULL; BdrvDirtyBitmap *bmap = NULL; AioContext *aio_context; QDict *options = NULL; @@ -3139,7 +3149,7 @@ static void do_drive_backup(DriveBackup *backup, BlockJobTxn *txn, Error **errp) bs = qmp_get_root_bs(backup->device, errp); if (!bs) { - return; + return NULL; } aio_context = bdrv_get_aio_context(bs); @@ -3213,10 +3223,10 @@ static void do_drive_backup(DriveBackup *backup, BlockJobTxn *txn, Error **errp) } } - backup_start(backup->job_id, bs, target_bs, backup->speed, backup->sync, - bmap, backup->compress, backup->on_source_error, - backup->on_target_error, BLOCK_JOB_DEFAULT, - NULL, NULL, txn, &local_err); + job = backup_job_create(backup->job_id, bs, target_bs, backup->speed, + backup->sync, bmap, backup->compress, + backup->on_source_error, backup->on_target_error, + BLOCK_JOB_DEFAULT, NULL, NULL, txn, &local_err); bdrv_unref(target_bs); if (local_err != NULL) { error_propagate(errp, local_err); @@ -3225,11 +3235,17 @@ static void do_drive_backup(DriveBackup *backup, BlockJobTxn *txn, Error **errp) out: aio_context_release(aio_context); + return job; } void qmp_drive_backup(DriveBackup *arg, Error **errp) { - return do_drive_backup(arg, NULL, errp); + + BlockJob *job; + job = do_drive_backup(arg, NULL, errp); + if (job) { + block_job_start(job); + } } BlockDeviceInfoList *qmp_query_named_block_nodes(Error **errp) @@ -3237,12 +3253,14 @@ BlockDeviceInfoList *qmp_query_named_block_nodes(Error **errp) return bdrv_named_nodes_list(errp); } -void do_blockdev_backup(BlockdevBackup *backup, BlockJobTxn *txn, Error **errp) +BlockJob *do_blockdev_backup(BlockdevBackup *backup, BlockJobTxn *txn, + Error **errp) { BlockDriverState *bs; BlockDriverState *target_bs; Error *local_err = NULL; AioContext *aio_context; + BlockJob *job = NULL; if (!backup->has_speed) { backup->speed = 0; @@ -3262,7 +3280,7 @@ void do_blockdev_backup(BlockdevBackup *backup, BlockJobTxn *txn, Error **errp) bs = qmp_get_root_bs(backup->device, errp); if (!bs) { - return; + return NULL; } aio_context = bdrv_get_aio_context(bs); @@ -3284,20 +3302,25 @@ void do_blockdev_backup(BlockdevBackup *backup, BlockJobTxn *txn, Error **errp) goto out; } } - backup_start(backup->job_id, bs, target_bs, backup->speed, backup->sync, - NULL, backup->compress, backup->on_source_error, - backup->on_target_error, BLOCK_JOB_DEFAULT, - NULL, NULL, txn, &local_err); + job = backup_job_create(backup->job_id, bs, target_bs, backup->speed, + backup->sync, NULL, backup->compress, + backup->on_source_error, backup->on_target_error, + BLOCK_JOB_DEFAULT, NULL, NULL, txn, &local_err); if (local_err != NULL) { error_propagate(errp, local_err); } out: aio_context_release(aio_context); + return job; } void qmp_blockdev_backup(BlockdevBackup *arg, Error **errp) { - do_blockdev_backup(arg, NULL, errp); + BlockJob *job; + job = do_blockdev_backup(arg, NULL, errp); + if (job) { + block_job_start(job); + } } /* Parameter check and block job starting for drive mirroring. diff --git a/include/block/block_int.h b/include/block/block_int.h index b02abbd618..83a423c580 100644 --- a/include/block/block_int.h +++ b/include/block/block_int.h @@ -748,7 +748,7 @@ void mirror_start(const char *job_id, BlockDriverState *bs, bool unmap, Error **errp); /* - * backup_start: + * backup_job_create: * @job_id: The id of the newly-created job, or %NULL to use the * device name of @bs. * @bs: Block device to operate on. @@ -764,18 +764,19 @@ void mirror_start(const char *job_id, BlockDriverState *bs, * @opaque: Opaque pointer value passed to @cb. * @txn: Transaction that this job is part of (may be NULL). * - * Start a backup operation on @bs. Clusters in @bs are written to @target + * Create a backup operation on @bs. Clusters in @bs are written to @target * until the job is cancelled or manually completed. */ -void backup_start(const char *job_id, BlockDriverState *bs, - BlockDriverState *target, int64_t speed, - MirrorSyncMode sync_mode, BdrvDirtyBitmap *sync_bitmap, - bool compress, - BlockdevOnError on_source_error, - BlockdevOnError on_target_error, - int creation_flags, - BlockCompletionFunc *cb, void *opaque, - BlockJobTxn *txn, Error **errp); +BlockJob *backup_job_create(const char *job_id, BlockDriverState *bs, + BlockDriverState *target, int64_t speed, + MirrorSyncMode sync_mode, + BdrvDirtyBitmap *sync_bitmap, + bool compress, + BlockdevOnError on_source_error, + BlockdevOnError on_target_error, + int creation_flags, + BlockCompletionFunc *cb, void *opaque, + BlockJobTxn *txn, Error **errp); void hmp_drive_add_node(Monitor *mon, const char *optstr); From 0aef09b9c916e8824cabc55a7fd2ca1609b94810 Mon Sep 17 00:00:00 2001 From: John Snow Date: Tue, 8 Nov 2016 01:50:39 -0500 Subject: [PATCH 06/13] iotests: add transactional failure race test Add a regression test for the case found by Vladimir. Reported-by: Vladimir Sementsov-Ogievskiy Signed-off-by: John Snow Reviewed-by: Kevin Wolf Message-id: 1478587839-9834-7-git-send-email-jsnow@redhat.com Signed-off-by: Jeff Cody --- tests/qemu-iotests/124 | 53 +++++++++++++++++++++++++------------- tests/qemu-iotests/124.out | 4 +-- 2 files changed, 37 insertions(+), 20 deletions(-) diff --git a/tests/qemu-iotests/124 b/tests/qemu-iotests/124 index f06938eeb7..d0d2c2bfb0 100644 --- a/tests/qemu-iotests/124 +++ b/tests/qemu-iotests/124 @@ -395,19 +395,7 @@ class TestIncrementalBackup(TestIncrementalBackupBase): self.check_backups() - def test_transaction_failure(self): - '''Test: Verify backups made from a transaction that partially fails. - - Add a second drive with its own unique pattern, and add a bitmap to each - drive. Use blkdebug to interfere with the backup on just one drive and - attempt to create a coherent incremental backup across both drives. - - verify a failure in one but not both, then delete the failed stubs and - re-run the same transaction. - - verify that both incrementals are created successfully. - ''' - + def do_transaction_failure_test(self, race=False): # Create a second drive, with pattern: drive1 = self.add_node('drive1') self.img_create(drive1['file'], drive1['fmt']) @@ -451,9 +439,10 @@ class TestIncrementalBackup(TestIncrementalBackupBase): self.assertFalse(self.vm.get_qmp_events(wait=False)) # Emulate some writes - self.hmp_io_writes(drive0['id'], (('0xab', 0, 512), - ('0xfe', '16M', '256k'), - ('0x64', '32736k', '64k'))) + if not race: + self.hmp_io_writes(drive0['id'], (('0xab', 0, 512), + ('0xfe', '16M', '256k'), + ('0x64', '32736k', '64k'))) self.hmp_io_writes(drive1['id'], (('0xba', 0, 512), ('0xef', '16M', '256k'), ('0x46', '32736k', '64k'))) @@ -463,7 +452,8 @@ class TestIncrementalBackup(TestIncrementalBackupBase): target1 = self.prepare_backup(dr1bm0) # Ask for a new incremental backup per-each drive, - # expecting drive1's backup to fail: + # expecting drive1's backup to fail. In the 'race' test, + # we expect drive1 to attempt to cancel the empty drive0 job. transaction = [ transaction_drive_backup(drive0['id'], target0, sync='incremental', format=drive0['fmt'], mode='existing', @@ -488,9 +478,15 @@ class TestIncrementalBackup(TestIncrementalBackupBase): self.assert_no_active_block_jobs() # Delete drive0's successful target and eliminate our record of the - # unsuccessful drive1 target. Then re-run the same transaction. + # unsuccessful drive1 target. dr0bm0.del_target() dr1bm0.del_target() + if race: + # Don't re-run the transaction, we only wanted to test the race. + self.vm.shutdown() + return + + # Re-run the same transaction: target0 = self.prepare_backup(dr0bm0) target1 = self.prepare_backup(dr1bm0) @@ -511,6 +507,27 @@ class TestIncrementalBackup(TestIncrementalBackupBase): self.vm.shutdown() self.check_backups() + def test_transaction_failure(self): + '''Test: Verify backups made from a transaction that partially fails. + + Add a second drive with its own unique pattern, and add a bitmap to each + drive. Use blkdebug to interfere with the backup on just one drive and + attempt to create a coherent incremental backup across both drives. + + verify a failure in one but not both, then delete the failed stubs and + re-run the same transaction. + + verify that both incrementals are created successfully. + ''' + self.do_transaction_failure_test() + + def test_transaction_failure_race(self): + '''Test: Verify that transactions with jobs that have no data to + transfer do not cause race conditions in the cancellation of the entire + transaction job group. + ''' + self.do_transaction_failure_test(race=True) + def test_sync_dirty_bitmap_missing(self): self.assert_no_active_block_jobs() diff --git a/tests/qemu-iotests/124.out b/tests/qemu-iotests/124.out index 36376bed87..e56cae021b 100644 --- a/tests/qemu-iotests/124.out +++ b/tests/qemu-iotests/124.out @@ -1,5 +1,5 @@ -.......... +........... ---------------------------------------------------------------------- -Ran 10 tests +Ran 11 tests OK From 0a4c0c3f90c3fb0239cb17d63a68c14742965a1c Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Wed, 9 Nov 2016 17:20:07 +0100 Subject: [PATCH 07/13] qemu-iotests: avoid spurious failure on test 109 In some cases it is possible that query-io-status is called just before the job is completed, causing -{"timestamp": {"seconds": TIMESTAMP, "microseconds": TIMESTAMP}, "event": "BLOCK_JOB_COMPLETED", "data": {"device": "src", "len": 31457280, "offset": OFFSET, "speed": 0, "type": "mirror", "error": "Operation not permitted"}} -{"return": []} +{"return": [{"io-status": "ok", "device": "src", "busy": true, "len": 31457280, "offset": OFFSET, "paused": false, "speed": 0, "ready": false, "type": "mirror"}]} Assert that the completeion event eventually happens. Signed-off-by: Paolo Bonzini Message-id: 20161109162008.27287-1-pbonzini@redhat.com Reviewed-by: Jeff Cody Signed-off-by: Jeff Cody --- tests/qemu-iotests/109 | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/qemu-iotests/109 b/tests/qemu-iotests/109 index 280ed27aa9..927151a285 100755 --- a/tests/qemu-iotests/109 +++ b/tests/qemu-iotests/109 @@ -62,6 +62,9 @@ function run_qemu() "return" _send_qemu_cmd $QEMU_HANDLE '' "$qmp_event" + if test "$qmp_event" = BLOCK_JOB_ERROR; then + _send_qemu_cmd $QEMU_HANDLE '' "BLOCK_JOB_COMPLETED" + fi _send_qemu_cmd $QEMU_HANDLE '{"execute":"query-block-jobs"}' "return" _cleanup_qemu } From 23dce3873f3aee6ee7d4a1c17dd26fb5f453bc5a Mon Sep 17 00:00:00 2001 From: Max Reitz Date: Wed, 2 Nov 2016 18:55:37 +0100 Subject: [PATCH 08/13] block/curl: Drop TFTP "support" Because TFTP does not support byte ranges, it was never usable with our curl block driver. Since apparently nobody has ever complained loudly enough for someone to take care of the issue until now, it seems reasonable to assume that nobody has ever actually used it. Therefore, it should be safe to just drop it from curl's protocol list. [Jeff Cody: Below is additional summary pulled, with some rewording, from followup emails between Max and Markus, to explain what worked and what didn't] TFTP would sometimes work, to a limited extent, for images <= the curl "readahead" size, so long as reads started at offset zero. By default, that readahead size is 256KB. Reads starting at a non-zero offset would also have returned data from a zero offset. It can become more complicated still, with mixed reads at zero offset and non-zero offsets, due to data buffering. In short, TFTP could only have worked before in very specific scenarios with unrealistic expectations and constraints. Signed-off-by: Max Reitz Reviewed-by: Kevin Wolf Reviewed-by: Jeff Cody Message-id: 20161102175539.4375-4-mreitz@redhat.com Signed-off-by: Jeff Cody --- block/curl.c | 20 +------------------- docs/qmp-commands.txt | 2 +- qapi/block-core.json | 7 +++---- qemu-options.hx | 6 +++--- 4 files changed, 8 insertions(+), 27 deletions(-) diff --git a/block/curl.c b/block/curl.c index e5eaa7ba0a..ba8adae74f 100644 --- a/block/curl.c +++ b/block/curl.c @@ -68,8 +68,7 @@ static CURLMcode __curl_multi_socket_action(CURLM *multi_handle, #endif #define PROTOCOLS (CURLPROTO_HTTP | CURLPROTO_HTTPS | \ - CURLPROTO_FTP | CURLPROTO_FTPS | \ - CURLPROTO_TFTP) + CURLPROTO_FTP | CURLPROTO_FTPS) #define CURL_NUM_STATES 8 #define CURL_NUM_ACB 8 @@ -886,29 +885,12 @@ static BlockDriver bdrv_ftps = { .bdrv_attach_aio_context = curl_attach_aio_context, }; -static BlockDriver bdrv_tftp = { - .format_name = "tftp", - .protocol_name = "tftp", - - .instance_size = sizeof(BDRVCURLState), - .bdrv_parse_filename = curl_parse_filename, - .bdrv_file_open = curl_open, - .bdrv_close = curl_close, - .bdrv_getlength = curl_getlength, - - .bdrv_aio_readv = curl_aio_readv, - - .bdrv_detach_aio_context = curl_detach_aio_context, - .bdrv_attach_aio_context = curl_attach_aio_context, -}; - static void curl_block_init(void) { bdrv_register(&bdrv_http); bdrv_register(&bdrv_https); bdrv_register(&bdrv_ftp); bdrv_register(&bdrv_ftps); - bdrv_register(&bdrv_tftp); } block_init(curl_block_init); diff --git a/docs/qmp-commands.txt b/docs/qmp-commands.txt index 6afa87298d..abf210a596 100644 --- a/docs/qmp-commands.txt +++ b/docs/qmp-commands.txt @@ -1803,7 +1803,7 @@ Each json-object contain the following: "file", "file", "ftp", "ftps", "host_cdrom", "host_device", "http", "https", "nbd", "parallels", "qcow", "qcow2", "raw", - "tftp", "vdi", "vmdk", "vpc", "vvfat" + "vdi", "vmdk", "vpc", "vvfat" - "backing_file": backing file name (json-string, optional) - "backing_file_depth": number of files in the backing file chain (json-int) - "encrypted": true if encrypted, false otherwise (json-bool) diff --git a/qapi/block-core.json b/qapi/block-core.json index bcd3b9effe..c29bef7ee1 100644 --- a/qapi/block-core.json +++ b/qapi/block-core.json @@ -243,12 +243,12 @@ # 0.14.0 this can be: 'blkdebug', 'bochs', 'cloop', 'cow', 'dmg', # 'file', 'file', 'ftp', 'ftps', 'host_cdrom', 'host_device', # 'http', 'https', 'luks', 'nbd', 'parallels', 'qcow', -# 'qcow2', 'raw', 'tftp', 'vdi', 'vmdk', 'vpc', 'vvfat' +# 'qcow2', 'raw', 'vdi', 'vmdk', 'vpc', 'vvfat' # 2.2: 'archipelago' added, 'cow' dropped # 2.3: 'host_floppy' deprecated # 2.5: 'host_floppy' dropped # 2.6: 'luks' added -# 2.8: 'replication' added +# 2.8: 'replication' added, 'tftp' dropped # # @backing_file: #optional the name of the backing file (for copy-on-write) # @@ -1723,7 +1723,7 @@ 'dmg', 'file', 'ftp', 'ftps', 'gluster', 'host_cdrom', 'host_device', 'http', 'https', 'luks', 'nbd', 'nfs', 'null-aio', 'null-co', 'parallels', 'qcow', 'qcow2', 'qed', 'quorum', 'raw', - 'replication', 'ssh', 'tftp', 'vdi', 'vhdx', 'vmdk', 'vpc', + 'replication', 'ssh', 'vdi', 'vhdx', 'vmdk', 'vpc', 'vvfat' ] } ## @@ -2410,7 +2410,6 @@ 'replication':'BlockdevOptionsReplication', # TODO sheepdog: Wait for structured options 'ssh': 'BlockdevOptionsSsh', - 'tftp': 'BlockdevOptionsCurl', 'vdi': 'BlockdevOptionsGenericFormat', 'vhdx': 'BlockdevOptionsGenericFormat', 'vmdk': 'BlockdevOptionsGenericCOWFormat', diff --git a/qemu-options.hx b/qemu-options.hx index 4536e18ac0..4a5b29f349 100644 --- a/qemu-options.hx +++ b/qemu-options.hx @@ -2606,8 +2606,8 @@ qemu-system-x86_64 --drive file=gluster://192.0.2.1/testvol/a.img See also @url{http://www.gluster.org}. -@item HTTP/HTTPS/FTP/FTPS/TFTP -QEMU supports read-only access to files accessed over http(s), ftp(s) and tftp. +@item HTTP/HTTPS/FTP/FTPS +QEMU supports read-only access to files accessed over http(s) and ftp(s). Syntax using a single filename: @example @@ -2617,7 +2617,7 @@ Syntax using a single filename: where: @table @option @item protocol -'http', 'https', 'ftp', 'ftps', or 'tftp'. +'http', 'https', 'ftp', or 'ftps'. @item username Optional username for authentication to the remote server. From 9054d9f6b00a3f0576b1a7310a3886d1783ad382 Mon Sep 17 00:00:00 2001 From: Max Reitz Date: Tue, 25 Oct 2016 04:54:28 +0200 Subject: [PATCH 09/13] block/curl: Use BDRV_SECTOR_SIZE Currently, curl defines its own constant SECTOR_SIZE. There is no advantage over using the global BDRV_SECTOR_SIZE, so drop it. Cc: qemu-stable@nongnu.org Signed-off-by: Max Reitz Reviewed-by: Eric Blake Message-id: 20161025025431.24714-2-mreitz@redhat.com Signed-off-by: Jeff Cody --- block/curl.c | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/block/curl.c b/block/curl.c index ba8adae74f..1d19e64f0f 100644 --- a/block/curl.c +++ b/block/curl.c @@ -72,7 +72,6 @@ static CURLMcode __curl_multi_socket_action(CURLM *multi_handle, #define CURL_NUM_STATES 8 #define CURL_NUM_ACB 8 -#define SECTOR_SIZE 512 #define READ_AHEAD_DEFAULT (256 * 1024) #define CURL_TIMEOUT_DEFAULT 5 #define CURL_TIMEOUT_MAX 10000 @@ -737,12 +736,12 @@ static void curl_readv_bh_cb(void *p) CURLAIOCB *acb = p; BDRVCURLState *s = acb->common.bs->opaque; - size_t start = acb->sector_num * SECTOR_SIZE; + size_t start = acb->sector_num * BDRV_SECTOR_SIZE; size_t end; // In case we have the requested data already (e.g. read-ahead), // we can just call the callback and be done. - switch (curl_find_buf(s, start, acb->nb_sectors * SECTOR_SIZE, acb)) { + switch (curl_find_buf(s, start, acb->nb_sectors * BDRV_SECTOR_SIZE, acb)) { case FIND_RET_OK: qemu_aio_unref(acb); // fall through @@ -761,7 +760,7 @@ static void curl_readv_bh_cb(void *p) } acb->start = 0; - acb->end = (acb->nb_sectors * SECTOR_SIZE); + acb->end = (acb->nb_sectors * BDRV_SECTOR_SIZE); state->buf_off = 0; g_free(state->orig_buf); @@ -778,8 +777,8 @@ static void curl_readv_bh_cb(void *p) state->acb[0] = acb; snprintf(state->range, 127, "%zd-%zd", start, end); - DPRINTF("CURL (AIO): Reading %d at %zd (%s)\n", - (acb->nb_sectors * SECTOR_SIZE), start, state->range); + DPRINTF("CURL (AIO): Reading %llu at %zd (%s)\n", + (acb->nb_sectors * BDRV_SECTOR_SIZE), start, state->range); curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range); curl_multi_add_handle(s->multi, state->curl); From 4e7676571bccb42dd49b5efbb91ac49077ea5197 Mon Sep 17 00:00:00 2001 From: Max Reitz Date: Tue, 25 Oct 2016 04:54:29 +0200 Subject: [PATCH 10/13] block/curl: Fix return value from curl_read_cb While commit 38bbc0a580f9f10570b1d1b5d3e92f0e6feb2970 is correct in that the callback is supposed to return the number of bytes handled; what it does not mention is that libcurl will throw an error if the callback did not "handle" all of the data passed to it. Therefore, if the callback receives some data that it cannot handle (either because the receive buffer has not been set up yet or because it would not fit into the receive buffer) and we have to ignore it, we still have to report that the data has been handled. Obviously, this should not happen normally. But it does happen at least for FTP connections where some data (that we do not expect) may be generated when the connection is established. Cc: qemu-stable@nongnu.org Signed-off-by: Max Reitz Reviewed-by: Eric Blake Message-id: 20161025025431.24714-3-mreitz@redhat.com Signed-off-by: Jeff Cody --- block/curl.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/block/curl.c b/block/curl.c index 1d19e64f0f..7a7e831866 100644 --- a/block/curl.c +++ b/block/curl.c @@ -211,12 +211,13 @@ static size_t curl_read_cb(void *ptr, size_t size, size_t nmemb, void *opaque) DPRINTF("CURL: Just reading %zd bytes\n", realsize); - if (!s || !s->orig_buf) - return 0; + if (!s || !s->orig_buf) { + goto read_end; + } if (s->buf_off >= s->buf_len) { /* buffer full, read nothing */ - return 0; + goto read_end; } realsize = MIN(realsize, s->buf_len - s->buf_off); memcpy(s->orig_buf + s->buf_off, ptr, realsize); @@ -237,7 +238,9 @@ static size_t curl_read_cb(void *ptr, size_t size, size_t nmemb, void *opaque) } } - return realsize; +read_end: + /* curl will error out if we do not return this value */ + return size * nmemb; } static int curl_find_buf(BDRVCURLState *s, size_t start, size_t len, From ff5ca1664af85b24a4180d595ea6873fd3deac57 Mon Sep 17 00:00:00 2001 From: Max Reitz Date: Tue, 25 Oct 2016 04:54:30 +0200 Subject: [PATCH 11/13] block/curl: Remember all sockets For some connection types (like FTP, generally), more than one socket may be used (in FTP's case: control vs. data stream). As of commit 838ef602498b8d1985a231a06f5e328e2946a81d ("curl: Eliminate unnecessary use of curl_multi_socket_all"), we have to remember all of the sockets used by libcurl, but in fact we only did that for a single one. Since one libcurl connection may use multiple sockets, however, we have to remember them all. Cc: qemu-stable@nongnu.org Signed-off-by: Max Reitz Message-id: 20161025025431.24714-4-mreitz@redhat.com Signed-off-by: Jeff Cody --- block/curl.c | 47 +++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/block/curl.c b/block/curl.c index 7a7e831866..273f329d26 100644 --- a/block/curl.c +++ b/block/curl.c @@ -103,12 +103,17 @@ typedef struct CURLAIOCB { size_t end; } CURLAIOCB; +typedef struct CURLSocket { + int fd; + QLIST_ENTRY(CURLSocket) next; +} CURLSocket; + typedef struct CURLState { struct BDRVCURLState *s; CURLAIOCB *acb[CURL_NUM_ACB]; CURL *curl; - curl_socket_t sock_fd; + QLIST_HEAD(, CURLSocket) sockets; char *orig_buf; size_t buf_start; size_t buf_off; @@ -162,10 +167,27 @@ static int curl_sock_cb(CURL *curl, curl_socket_t fd, int action, { BDRVCURLState *s; CURLState *state = NULL; + CURLSocket *socket; + curl_easy_getinfo(curl, CURLINFO_PRIVATE, (char **)&state); - state->sock_fd = fd; s = state->s; + QLIST_FOREACH(socket, &state->sockets, next) { + if (socket->fd == fd) { + if (action == CURL_POLL_REMOVE) { + QLIST_REMOVE(socket, next); + g_free(socket); + } + break; + } + } + if (!socket) { + socket = g_new0(CURLSocket, 1); + socket->fd = fd; + QLIST_INSERT_HEAD(&state->sockets, socket, next); + } + socket = NULL; + DPRINTF("CURL (AIO): Sock action %d on fd %d\n", action, (int)fd); switch (action) { case CURL_POLL_IN: @@ -353,6 +375,7 @@ static void curl_multi_check_completion(BDRVCURLState *s) static void curl_multi_do(void *arg) { CURLState *s = (CURLState *)arg; + CURLSocket *socket, *next_socket; int running; int r; @@ -360,10 +383,13 @@ static void curl_multi_do(void *arg) return; } - do { - r = curl_multi_socket_action(s->s->multi, s->sock_fd, 0, &running); - } while(r == CURLM_CALL_MULTI_PERFORM); - + /* Need to use _SAFE because curl_multi_socket_action() may trigger + * curl_sock_cb() which might modify this list */ + QLIST_FOREACH_SAFE(socket, &s->sockets, next, next_socket) { + do { + r = curl_multi_socket_action(s->s->multi, socket->fd, 0, &running); + } while (r == CURLM_CALL_MULTI_PERFORM); + } } static void curl_multi_read(void *arg) @@ -467,6 +493,7 @@ static CURLState *curl_init_state(BlockDriverState *bs, BDRVCURLState *s) #endif } + QLIST_INIT(&state->sockets); state->s = s; return state; @@ -476,6 +503,14 @@ static void curl_clean_state(CURLState *s) { if (s->s->multi) curl_multi_remove_handle(s->s->multi, s->curl); + + while (!QLIST_EMPTY(&s->sockets)) { + CURLSocket *socket = QLIST_FIRST(&s->sockets); + + QLIST_REMOVE(socket, next); + g_free(socket); + } + s->in_use = 0; } From 4e504535c16dfa66290281e704384abfaca08673 Mon Sep 17 00:00:00 2001 From: Max Reitz Date: Tue, 25 Oct 2016 04:54:31 +0200 Subject: [PATCH 12/13] block/curl: Do not wait for data beyond EOF libcurl will only give us as much data as there is, not more. The block layer will deny requests beyond the end of file for us; but since this block driver is still using a sector-based interface, we can still get in trouble if the file size is not a multiple of 512. While we have already made sure not to attempt transfers beyond the end of the file, we are currently still trying to receive data from there if the original request exceeds the file size. This patch fixes this issue and invokes qemu_iovec_memset() on the iovec's tail. Cc: qemu-stable@nongnu.org Signed-off-by: Max Reitz Message-id: 20161025025431.24714-5-mreitz@redhat.com Signed-off-by: Jeff Cody --- block/curl.c | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/block/curl.c b/block/curl.c index 273f329d26..0404c1b5fa 100644 --- a/block/curl.c +++ b/block/curl.c @@ -252,8 +252,17 @@ static size_t curl_read_cb(void *ptr, size_t size, size_t nmemb, void *opaque) continue; if ((s->buf_off >= acb->end)) { + size_t request_length = acb->nb_sectors * BDRV_SECTOR_SIZE; + qemu_iovec_from_buf(acb->qiov, 0, s->orig_buf + acb->start, acb->end - acb->start); + + if (acb->end - acb->start < request_length) { + size_t offset = acb->end - acb->start; + qemu_iovec_memset(acb->qiov, offset, 0, + request_length - offset); + } + acb->common.cb(acb->common.opaque, 0); qemu_aio_unref(acb); s->acb[i] = NULL; @@ -270,6 +279,8 @@ static int curl_find_buf(BDRVCURLState *s, size_t start, size_t len, { int i; size_t end = start + len; + size_t clamped_end = MIN(end, s->len); + size_t clamped_len = clamped_end - start; for (i=0; istates[i]; @@ -284,12 +295,15 @@ static int curl_find_buf(BDRVCURLState *s, size_t start, size_t len, // Does the existing buffer cover our section? if ((start >= state->buf_start) && (start <= buf_end) && - (end >= state->buf_start) && - (end <= buf_end)) + (clamped_end >= state->buf_start) && + (clamped_end <= buf_end)) { char *buf = state->orig_buf + (start - state->buf_start); - qemu_iovec_from_buf(acb->qiov, 0, buf, len); + qemu_iovec_from_buf(acb->qiov, 0, buf, clamped_len); + if (clamped_len < len) { + qemu_iovec_memset(acb->qiov, clamped_len, 0, len - clamped_len); + } acb->common.cb(acb->common.opaque, 0); return FIND_RET_OK; @@ -299,13 +313,13 @@ static int curl_find_buf(BDRVCURLState *s, size_t start, size_t len, if (state->in_use && (start >= state->buf_start) && (start <= buf_fend) && - (end >= state->buf_start) && - (end <= buf_fend)) + (clamped_end >= state->buf_start) && + (clamped_end <= buf_fend)) { int j; acb->start = start - state->buf_start; - acb->end = acb->start + len; + acb->end = acb->start + clamped_len; for (j=0; jacb[j]) { @@ -798,13 +812,13 @@ static void curl_readv_bh_cb(void *p) } acb->start = 0; - acb->end = (acb->nb_sectors * BDRV_SECTOR_SIZE); + acb->end = MIN(acb->nb_sectors * BDRV_SECTOR_SIZE, s->len - start); state->buf_off = 0; g_free(state->orig_buf); state->buf_start = start; - state->buf_len = acb->end + s->readahead_size; - end = MIN(start + state->buf_len, s->len) - 1; + state->buf_len = MIN(acb->end + s->readahead_size, s->len - start); + end = start + state->buf_len - 1; state->orig_buf = g_try_malloc(state->buf_len); if (state->buf_len && state->orig_buf == NULL) { curl_clean_state(state); From bdffb31d8eece1cbd4d88f136daccfe1f93a1bf6 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Wed, 9 Nov 2016 17:20:08 +0100 Subject: [PATCH 13/13] mirror: do not flush every time the disks are synced This puts a huge strain on the disks when there are many concurrent migrations. With this patch we only flush twice: just before issuing the event, and just before pivoting to the destination. If management will complete the job close to the BLOCK_JOB_READY event, the cost of the second flush should be small anyway. Signed-off-by: Paolo Bonzini Message-id: 20161109162008.27287-2-pbonzini@redhat.com Signed-off-by: Jeff Cody --- block/mirror.c | 40 +++++++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/block/mirror.c b/block/mirror.c index 62ac87f0c4..301ba9219a 100644 --- a/block/mirror.c +++ b/block/mirror.c @@ -615,6 +615,20 @@ static int coroutine_fn mirror_dirty_init(MirrorBlockJob *s) return 0; } +/* Called when going out of the streaming phase to flush the bulk of the + * data to the medium, or just before completing. + */ +static int mirror_flush(MirrorBlockJob *s) +{ + int ret = blk_flush(s->target); + if (ret < 0) { + if (mirror_error_action(s, false, -ret) == BLOCK_ERROR_ACTION_REPORT) { + s->ret = ret; + } + } + return ret; +} + static void coroutine_fn mirror_run(void *opaque) { MirrorBlockJob *s = opaque; @@ -727,27 +741,23 @@ static void coroutine_fn mirror_run(void *opaque) should_complete = false; if (s->in_flight == 0 && cnt == 0) { trace_mirror_before_flush(s); - ret = blk_flush(s->target); - if (ret < 0) { - if (mirror_error_action(s, false, -ret) == - BLOCK_ERROR_ACTION_REPORT) { - goto immediate_exit; + if (!s->synced) { + if (mirror_flush(s) < 0) { + /* Go check s->ret. */ + continue; } - } else { /* We're out of the streaming phase. From now on, if the job * is cancelled we will actually complete all pending I/O and * report completion. This way, block-job-cancel will leave * the target in a consistent state. */ - if (!s->synced) { - block_job_event_ready(&s->common); - s->synced = true; - } - - should_complete = s->should_complete || - block_job_is_cancelled(&s->common); - cnt = bdrv_get_dirty_count(s->dirty_bitmap); + block_job_event_ready(&s->common); + s->synced = true; } + + should_complete = s->should_complete || + block_job_is_cancelled(&s->common); + cnt = bdrv_get_dirty_count(s->dirty_bitmap); } if (cnt == 0 && should_complete) { @@ -765,7 +775,7 @@ static void coroutine_fn mirror_run(void *opaque) bdrv_drained_begin(bs); cnt = bdrv_get_dirty_count(s->dirty_bitmap); - if (cnt > 0) { + if (cnt > 0 || mirror_flush(s) < 0) { bdrv_drained_end(bs); continue; }