diff --git a/block/block-copy.c b/block/block-copy.c index f560338647..03500680f7 100644 --- a/block/block-copy.c +++ b/block/block-copy.c @@ -19,15 +19,29 @@ #include "block/block-copy.h" #include "sysemu/block-backend.h" #include "qemu/units.h" +#include "qemu/coroutine.h" +#include "block/aio_task.h" #define BLOCK_COPY_MAX_COPY_RANGE (16 * MiB) #define BLOCK_COPY_MAX_BUFFER (1 * MiB) #define BLOCK_COPY_MAX_MEM (128 * MiB) +#define BLOCK_COPY_MAX_WORKERS 64 + +static coroutine_fn int block_copy_task_entry(AioTask *task); + +typedef struct BlockCopyCallState { + bool failed; + bool error_is_read; +} BlockCopyCallState; typedef struct BlockCopyTask { + AioTask task; + BlockCopyState *s; + BlockCopyCallState *call_state; int64_t offset; int64_t bytes; + bool zeroes; QLIST_ENTRY(BlockCopyTask) list; CoQueue wait_queue; /* coroutines blocked on this task */ } BlockCopyTask; @@ -116,6 +130,7 @@ static bool coroutine_fn block_copy_wait_one(BlockCopyState *s, int64_t offset, * the beginning of it. */ static BlockCopyTask *block_copy_task_create(BlockCopyState *s, + BlockCopyCallState *call_state, int64_t offset, int64_t bytes) { BlockCopyTask *task; @@ -135,7 +150,9 @@ static BlockCopyTask *block_copy_task_create(BlockCopyState *s, task = g_new(BlockCopyTask, 1); *task = (BlockCopyTask) { + .task.func = block_copy_task_entry, .s = s, + .call_state = call_state, .offset = offset, .bytes = bytes, }; @@ -263,6 +280,38 @@ void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm) s->progress = pm; } +/* + * Takes ownership of @task + * + * If pool is NULL directly run the task, otherwise schedule it into the pool. + * + * Returns: task.func return code if pool is NULL + * otherwise -ECANCELED if pool status is bad + * otherwise 0 (successfully scheduled) + */ +static coroutine_fn int block_copy_task_run(AioTaskPool *pool, + BlockCopyTask *task) +{ + if (!pool) { + int ret = task->task.func(&task->task); + + g_free(task); + return ret; + } + + aio_task_pool_wait_slot(pool); + if (aio_task_pool_status(pool) < 0) { + co_put_to_shres(task->s->mem, task->bytes); + block_copy_task_end(task, -ECANCELED); + g_free(task); + return -ECANCELED; + } + + aio_task_pool_start_task(pool, &task->task); + + return 0; +} + /* * block_copy_do_copy * @@ -366,6 +415,27 @@ out: return ret; } +static coroutine_fn int block_copy_task_entry(AioTask *task) +{ + BlockCopyTask *t = container_of(task, BlockCopyTask, task); + bool error_is_read; + int ret; + + ret = block_copy_do_copy(t->s, t->offset, t->bytes, t->zeroes, + &error_is_read); + if (ret < 0 && !t->call_state->failed) { + t->call_state->failed = true; + t->call_state->error_is_read = error_is_read; + } else { + progress_work_done(t->s->progress, t->bytes); + t->s->progress_bytes_callback(t->bytes, t->s->progress_opaque); + } + co_put_to_shres(t->s->mem, t->bytes); + block_copy_task_end(t, ret); + + return ret; +} + static int block_copy_block_status(BlockCopyState *s, int64_t offset, int64_t bytes, int64_t *pnum) { @@ -484,6 +554,8 @@ static int coroutine_fn block_copy_dirty_clusters(BlockCopyState *s, int ret = 0; bool found_dirty = false; int64_t end = offset + bytes; + AioTaskPool *aio = NULL; + BlockCopyCallState call_state = {false, false}; /* * block_copy() user is responsible for keeping source and target in same @@ -495,11 +567,11 @@ static int coroutine_fn block_copy_dirty_clusters(BlockCopyState *s, assert(QEMU_IS_ALIGNED(offset, s->cluster_size)); assert(QEMU_IS_ALIGNED(bytes, s->cluster_size)); - while (bytes) { - g_autofree BlockCopyTask *task = NULL; + while (bytes && aio_task_pool_status(aio) == 0) { + BlockCopyTask *task; int64_t status_bytes; - task = block_copy_task_create(s, offset, bytes); + task = block_copy_task_create(s, &call_state, offset, bytes); if (!task) { /* No more dirty bits in the bitmap */ trace_block_copy_skip_range(s, offset, bytes); @@ -519,6 +591,7 @@ static int coroutine_fn block_copy_dirty_clusters(BlockCopyState *s, } if (s->skip_unallocated && !(ret & BDRV_BLOCK_ALLOCATED)) { block_copy_task_end(task, 0); + g_free(task); progress_set_remaining(s->progress, bdrv_get_dirty_count(s->copy_bitmap) + s->in_flight_bytes); @@ -527,25 +600,45 @@ static int coroutine_fn block_copy_dirty_clusters(BlockCopyState *s, bytes = end - offset; continue; } + task->zeroes = ret & BDRV_BLOCK_ZERO; trace_block_copy_process(s, task->offset); co_get_from_shres(s->mem, task->bytes); - ret = block_copy_do_copy(s, task->offset, task->bytes, - ret & BDRV_BLOCK_ZERO, error_is_read); - co_put_to_shres(s->mem, task->bytes); - block_copy_task_end(task, ret); - if (ret < 0) { - return ret; - } - progress_work_done(s->progress, task->bytes); - s->progress_bytes_callback(task->bytes, s->progress_opaque); offset = task_end(task); bytes = end - offset; + + if (!aio && bytes) { + aio = aio_task_pool_new(BLOCK_COPY_MAX_WORKERS); + } + + ret = block_copy_task_run(aio, task); + if (ret < 0) { + goto out; + } } - return found_dirty; +out: + if (aio) { + aio_task_pool_wait_all(aio); + + /* + * We are not really interested in -ECANCELED returned from + * block_copy_task_run. If it fails, it means some task already failed + * for real reason, let's return first failure. + * Still, assert that we don't rewrite failure by success. + */ + assert(ret == 0 || aio_task_pool_status(aio) < 0); + ret = aio_task_pool_status(aio); + + aio_task_pool_free(aio); + } + if (error_is_read && ret < 0) { + *error_is_read = call_state.error_is_read; + } + + return ret < 0 ? ret : found_dirty; } /*