diff --git a/.travis.yml b/.travis.yml index 27a2d9cfb3..3c7a5cbe25 100644 --- a/.travis.yml +++ b/.travis.yml @@ -86,6 +86,9 @@ matrix: - env: CONFIG="--enable-trace-backends=ust" TEST_CMD="" compiler: gcc + - env: CONFIG="--disable-tcg" + TEST_CMD="" + compiler: gcc - env: CONFIG="" os: osx compiler: clang diff --git a/block/io.c b/block/io.c index b413727524..aece54c015 100644 --- a/block/io.c +++ b/block/io.c @@ -149,6 +149,37 @@ bool bdrv_requests_pending(BlockDriverState *bs) return false; } +typedef struct { + Coroutine *co; + BlockDriverState *bs; + bool done; +} BdrvCoDrainData; + +static void coroutine_fn bdrv_drain_invoke_entry(void *opaque) +{ + BdrvCoDrainData *data = opaque; + BlockDriverState *bs = data->bs; + + bs->drv->bdrv_co_drain(bs); + + /* Set data->done before reading bs->wakeup. */ + atomic_mb_set(&data->done, true); + bdrv_wakeup(bs); +} + +static void bdrv_drain_invoke(BlockDriverState *bs) +{ + BdrvCoDrainData data = { .bs = bs, .done = false }; + + if (!bs->drv || !bs->drv->bdrv_co_drain) { + return; + } + + data.co = qemu_coroutine_create(bdrv_drain_invoke_entry, &data); + bdrv_coroutine_enter(bs, data.co); + BDRV_POLL_WHILE(bs, !data.done); +} + static bool bdrv_drain_recurse(BlockDriverState *bs) { BdrvChild *child, *tmp; @@ -156,9 +187,8 @@ static bool bdrv_drain_recurse(BlockDriverState *bs) waited = BDRV_POLL_WHILE(bs, atomic_read(&bs->in_flight) > 0); - if (bs->drv && bs->drv->bdrv_drain) { - bs->drv->bdrv_drain(bs); - } + /* Ensure any pending metadata writes are submitted to bs->file. */ + bdrv_drain_invoke(bs); QLIST_FOREACH_SAFE(child, &bs->children, next, tmp) { BlockDriverState *bs = child->bs; @@ -184,12 +214,6 @@ static bool bdrv_drain_recurse(BlockDriverState *bs) return waited; } -typedef struct { - Coroutine *co; - BlockDriverState *bs; - bool done; -} BdrvCoDrainData; - static void bdrv_co_drain_bh_cb(void *opaque) { BdrvCoDrainData *data = opaque; diff --git a/block/qcow2.c b/block/qcow2.c index c144ea5620..d5790af1e0 100644 --- a/block/qcow2.c +++ b/block/qcow2.c @@ -2025,8 +2025,6 @@ static coroutine_fn int qcow2_co_pwritev(BlockDriverState *bs, uint64_t offset, ret = 0; fail: - qemu_co_mutex_unlock(&s->lock); - while (l2meta != NULL) { QCowL2Meta *next; @@ -2040,6 +2038,8 @@ fail: l2meta = next; } + qemu_co_mutex_unlock(&s->lock); + qemu_iovec_destroy(&hd_qiov); qemu_vfree(cluster_data); trace_qcow2_writev_done_req(qemu_coroutine_self(), ret); diff --git a/block/qed-cluster.c b/block/qed-cluster.c index d8d6e66a0f..672e2e654b 100644 --- a/block/qed-cluster.c +++ b/block/qed-cluster.c @@ -85,6 +85,8 @@ static unsigned int qed_count_contiguous_clusters(BDRVQEDState *s, * * On failure QED_CLUSTER_L2 or QED_CLUSTER_L1 is returned for missing L2 or L1 * table offset, respectively. len is number of contiguous unallocated bytes. + * + * Called with table_lock held. 
*/ int coroutine_fn qed_find_cluster(BDRVQEDState *s, QEDRequest *request, uint64_t pos, size_t *len, @@ -112,7 +114,6 @@ int coroutine_fn qed_find_cluster(BDRVQEDState *s, QEDRequest *request, } ret = qed_read_l2_table(s, request, l2_offset); - qed_acquire(s); if (ret) { goto out; } @@ -137,6 +138,5 @@ int coroutine_fn qed_find_cluster(BDRVQEDState *s, QEDRequest *request, out: *img_offset = offset; - qed_release(s); return ret; } diff --git a/block/qed-l2-cache.c b/block/qed-l2-cache.c index 5cba794650..b548362398 100644 --- a/block/qed-l2-cache.c +++ b/block/qed-l2-cache.c @@ -101,6 +101,8 @@ CachedL2Table *qed_alloc_l2_cache_entry(L2TableCache *l2_cache) /** * Decrease an entry's reference count and free if necessary when the reference * count drops to zero. + * + * Called with table_lock held. */ void qed_unref_l2_cache_entry(CachedL2Table *entry) { @@ -122,6 +124,8 @@ void qed_unref_l2_cache_entry(CachedL2Table *entry) * * For a cached entry, this function increases the reference count and returns * the entry. + * + * Called with table_lock held. */ CachedL2Table *qed_find_l2_cache_entry(L2TableCache *l2_cache, uint64_t offset) { @@ -150,6 +154,8 @@ CachedL2Table *qed_find_l2_cache_entry(L2TableCache *l2_cache, uint64_t offset) * N.B. This function steals a reference to the l2_table from the caller so the * caller must obtain a new reference by issuing a call to * qed_find_l2_cache_entry(). + * + * Called with table_lock held. */ void qed_commit_l2_cache_entry(L2TableCache *l2_cache, CachedL2Table *l2_table) { diff --git a/block/qed-table.c b/block/qed-table.c index ebee2c50f0..eead8b0fc7 100644 --- a/block/qed-table.c +++ b/block/qed-table.c @@ -18,6 +18,7 @@ #include "qed.h" #include "qemu/bswap.h" +/* Called either from qed_check or with table_lock held. */ static int qed_read_table(BDRVQEDState *s, uint64_t offset, QEDTable *table) { QEMUIOVector qiov; @@ -32,18 +33,22 @@ static int qed_read_table(BDRVQEDState *s, uint64_t offset, QEDTable *table) trace_qed_read_table(s, offset, table); + if (qemu_in_coroutine()) { + qemu_co_mutex_unlock(&s->table_lock); + } ret = bdrv_preadv(s->bs->file, offset, &qiov); + if (qemu_in_coroutine()) { + qemu_co_mutex_lock(&s->table_lock); + } if (ret < 0) { goto out; } /* Byteswap offsets */ - qed_acquire(s); noffsets = qiov.size / sizeof(uint64_t); for (i = 0; i < noffsets; i++) { table->offsets[i] = le64_to_cpu(table->offsets[i]); } - qed_release(s); ret = 0; out: @@ -61,6 +66,8 @@ out: * @index: Index of first element * @n: Number of elements * @flush: Whether or not to sync to disk + * + * Called either from qed_check or with table_lock held. */ static int qed_write_table(BDRVQEDState *s, uint64_t offset, QEDTable *table, unsigned int index, unsigned int n, bool flush) @@ -97,16 +104,20 @@ static int qed_write_table(BDRVQEDState *s, uint64_t offset, QEDTable *table, /* Adjust for offset into table */ offset += start * sizeof(uint64_t); + if (qemu_in_coroutine()) { + qemu_co_mutex_unlock(&s->table_lock); + } ret = bdrv_pwritev(s->bs->file, offset, &qiov); + if (qemu_in_coroutine()) { + qemu_co_mutex_lock(&s->table_lock); + } trace_qed_write_table_cb(s, table, flush, ret); if (ret < 0) { goto out; } if (flush) { - qed_acquire(s); ret = bdrv_flush(s->bs); - qed_release(s); if (ret < 0) { goto out; } @@ -123,6 +134,7 @@ int qed_read_l1_table_sync(BDRVQEDState *s) return qed_read_table(s, s->header.l1_table_offset, s->l1_table); } +/* Called either from qed_check or with table_lock held. 
*/ int qed_write_l1_table(BDRVQEDState *s, unsigned int index, unsigned int n) { BLKDBG_EVENT(s->bs->file, BLKDBG_L1_UPDATE); @@ -136,6 +148,7 @@ int qed_write_l1_table_sync(BDRVQEDState *s, unsigned int index, return qed_write_l1_table(s, index, n); } +/* Called either from qed_check or with table_lock held. */ int qed_read_l2_table(BDRVQEDState *s, QEDRequest *request, uint64_t offset) { int ret; @@ -154,7 +167,6 @@ int qed_read_l2_table(BDRVQEDState *s, QEDRequest *request, uint64_t offset) BLKDBG_EVENT(s->bs->file, BLKDBG_L2_LOAD); ret = qed_read_table(s, offset, request->l2_table->table); - qed_acquire(s); if (ret) { /* can't trust loaded L2 table anymore */ qed_unref_l2_cache_entry(request->l2_table); @@ -170,7 +182,6 @@ int qed_read_l2_table(BDRVQEDState *s, QEDRequest *request, uint64_t offset) request->l2_table = qed_find_l2_cache_entry(&s->l2_cache, offset); assert(request->l2_table != NULL); } - qed_release(s); return ret; } @@ -180,6 +191,7 @@ int qed_read_l2_table_sync(BDRVQEDState *s, QEDRequest *request, uint64_t offset return qed_read_l2_table(s, request, offset); } +/* Called either from qed_check or with table_lock held. */ int qed_write_l2_table(BDRVQEDState *s, QEDRequest *request, unsigned int index, unsigned int n, bool flush) { diff --git a/block/qed.c b/block/qed.c index 86cad2188c..dc54bf4a93 100644 --- a/block/qed.c +++ b/block/qed.c @@ -93,6 +93,8 @@ int qed_write_header_sync(BDRVQEDState *s) * * This function only updates known header fields in-place and does not affect * extra data after the QED header. + * + * No new allocating reqs can start while this function runs. */ static int coroutine_fn qed_write_header(BDRVQEDState *s) { @@ -109,6 +111,8 @@ static int coroutine_fn qed_write_header(BDRVQEDState *s) QEMUIOVector qiov; int ret; + assert(s->allocating_acb || s->allocating_write_reqs_plugged); + buf = qemu_blockalign(s->bs, len); iov = (struct iovec) { .iov_base = buf, @@ -219,6 +223,8 @@ static int qed_read_string(BdrvChild *file, uint64_t offset, size_t n, * This function only produces the offset where the new clusters should be * written. It updates BDRVQEDState but does not make any changes to the image * file. + * + * Called with table_lock held. */ static uint64_t qed_alloc_clusters(BDRVQEDState *s, unsigned int n) { @@ -236,6 +242,8 @@ QEDTable *qed_alloc_table(BDRVQEDState *s) /** * Allocate a new zeroed L2 table + * + * Called with table_lock held. */ static CachedL2Table *qed_new_l2_table(BDRVQEDState *s) { @@ -249,19 +257,32 @@ static CachedL2Table *qed_new_l2_table(BDRVQEDState *s) return l2_table; } -static void qed_plug_allocating_write_reqs(BDRVQEDState *s) +static bool qed_plug_allocating_write_reqs(BDRVQEDState *s) { + qemu_co_mutex_lock(&s->table_lock); + + /* No reentrancy is allowed. */ assert(!s->allocating_write_reqs_plugged); + if (s->allocating_acb != NULL) { + /* Another allocating write came concurrently. This cannot happen + * from bdrv_qed_co_drain, but it can happen when the timer runs. 
+ */ + qemu_co_mutex_unlock(&s->table_lock); + return false; + } s->allocating_write_reqs_plugged = true; + qemu_co_mutex_unlock(&s->table_lock); + return true; } static void qed_unplug_allocating_write_reqs(BDRVQEDState *s) { + qemu_co_mutex_lock(&s->table_lock); assert(s->allocating_write_reqs_plugged); - s->allocating_write_reqs_plugged = false; - qemu_co_enter_next(&s->allocating_write_reqs); + qemu_co_queue_next(&s->allocating_write_reqs); + qemu_co_mutex_unlock(&s->table_lock); } static void coroutine_fn qed_need_check_timer_entry(void *opaque) @@ -269,17 +290,14 @@ static void coroutine_fn qed_need_check_timer_entry(void *opaque) BDRVQEDState *s = opaque; int ret; - /* The timer should only fire when allocating writes have drained */ - assert(!s->allocating_acb); - trace_qed_need_check_timer_cb(s); - qed_acquire(s); - qed_plug_allocating_write_reqs(s); + if (!qed_plug_allocating_write_reqs(s)) { + return; + } /* Ensure writes are on disk before clearing flag */ ret = bdrv_co_flush(s->bs->file->bs); - qed_release(s); if (ret < 0) { qed_unplug_allocating_write_reqs(s); return; @@ -301,16 +319,6 @@ static void qed_need_check_timer_cb(void *opaque) qemu_coroutine_enter(co); } -void qed_acquire(BDRVQEDState *s) -{ - aio_context_acquire(bdrv_get_aio_context(s->bs)); -} - -void qed_release(BDRVQEDState *s) -{ - aio_context_release(bdrv_get_aio_context(s->bs)); -} - static void qed_start_need_check_timer(BDRVQEDState *s) { trace_qed_start_need_check_timer(s); @@ -350,7 +358,7 @@ static void bdrv_qed_attach_aio_context(BlockDriverState *bs, } } -static void bdrv_qed_drain(BlockDriverState *bs) +static void coroutine_fn bdrv_qed_co_drain(BlockDriverState *bs) { BDRVQEDState *s = bs->opaque; @@ -359,10 +367,20 @@ static void bdrv_qed_drain(BlockDriverState *bs) */ if (s->need_check_timer && timer_pending(s->need_check_timer)) { qed_cancel_need_check_timer(s); - qed_need_check_timer_cb(s); + qed_need_check_timer_entry(s); } } +static void bdrv_qed_init_state(BlockDriverState *bs) +{ + BDRVQEDState *s = bs->opaque; + + memset(s, 0, sizeof(BDRVQEDState)); + s->bs = bs; + qemu_co_mutex_init(&s->table_lock); + qemu_co_queue_init(&s->allocating_write_reqs); +} + static int bdrv_qed_do_open(BlockDriverState *bs, QDict *options, int flags, Error **errp) { @@ -371,9 +389,6 @@ static int bdrv_qed_do_open(BlockDriverState *bs, QDict *options, int flags, int64_t file_size; int ret; - s->bs = bs; - qemu_co_queue_init(&s->allocating_write_reqs); - ret = bdrv_pread(bs->file, 0, &le_header, sizeof(le_header)); if (ret < 0) { return ret; @@ -507,6 +522,7 @@ static int bdrv_qed_open(BlockDriverState *bs, QDict *options, int flags, return -EINVAL; } + bdrv_qed_init_state(bs); return bdrv_qed_do_open(bs, options, flags, errp); } @@ -681,6 +697,7 @@ typedef struct { BlockDriverState **file; } QEDIsAllocatedCB; +/* Called with table_lock held. 
*/ static void qed_is_allocated_cb(void *opaque, int ret, uint64_t offset, size_t len) { QEDIsAllocatedCB *cb = opaque; @@ -728,6 +745,7 @@ static int64_t coroutine_fn bdrv_qed_co_get_block_status(BlockDriverState *bs, uint64_t offset; int ret; + qemu_co_mutex_lock(&s->table_lock); ret = qed_find_cluster(s, &request, cb.pos, &len, &offset); qed_is_allocated_cb(&cb, ret, offset, len); @@ -735,6 +753,7 @@ static int64_t coroutine_fn bdrv_qed_co_get_block_status(BlockDriverState *bs, assert(cb.status != BDRV_BLOCK_OFFSET_MASK); qed_unref_l2_cache_entry(request.l2_table); + qemu_co_mutex_unlock(&s->table_lock); return cb.status; } @@ -865,6 +884,8 @@ out: * * The cluster offset may be an allocated byte offset in the image file, the * zero cluster marker, or the unallocated cluster marker. + * + * Called with table_lock held. */ static void coroutine_fn qed_update_l2_table(BDRVQEDState *s, QEDTable *table, int index, unsigned int n, @@ -880,6 +901,7 @@ static void coroutine_fn qed_update_l2_table(BDRVQEDState *s, QEDTable *table, } } +/* Called with table_lock held. */ static void coroutine_fn qed_aio_complete(QEDAIOCB *acb) { BDRVQEDState *s = acb_to_s(acb); @@ -903,7 +925,7 @@ static void coroutine_fn qed_aio_complete(QEDAIOCB *acb) if (acb == s->allocating_acb) { s->allocating_acb = NULL; if (!qemu_co_queue_empty(&s->allocating_write_reqs)) { - qemu_co_enter_next(&s->allocating_write_reqs); + qemu_co_queue_next(&s->allocating_write_reqs); } else if (s->header.features & QED_F_NEED_CHECK) { qed_start_need_check_timer(s); } @@ -912,6 +934,8 @@ static void coroutine_fn qed_aio_complete(QEDAIOCB *acb) /** * Update L1 table with new L2 table offset and write it out + * + * Called with table_lock held. */ static int coroutine_fn qed_aio_write_l1_update(QEDAIOCB *acb) { @@ -940,6 +964,8 @@ static int coroutine_fn qed_aio_write_l1_update(QEDAIOCB *acb) /** * Update L2 table with new cluster offsets and write them out + * + * Called with table_lock held. */ static int coroutine_fn qed_aio_write_l2_update(QEDAIOCB *acb, uint64_t offset) { @@ -976,50 +1002,26 @@ static int coroutine_fn qed_aio_write_l2_update(QEDAIOCB *acb, uint64_t offset) /** * Write data to the image file + * + * Called with table_lock *not* held. */ static int coroutine_fn qed_aio_write_main(QEDAIOCB *acb) { BDRVQEDState *s = acb_to_s(acb); uint64_t offset = acb->cur_cluster + qed_offset_into_cluster(s, acb->cur_pos); - int ret; trace_qed_aio_write_main(s, acb, 0, offset, acb->cur_qiov.size); BLKDBG_EVENT(s->bs->file, BLKDBG_WRITE_AIO); - ret = bdrv_co_pwritev(s->bs->file, offset, acb->cur_qiov.size, - &acb->cur_qiov, 0); - if (ret < 0) { - return ret; - } - - if (acb->find_cluster_ret != QED_CLUSTER_FOUND) { - if (s->bs->backing) { - /* - * Flush new data clusters before updating the L2 table - * - * This flush is necessary when a backing file is in use. A crash - * during an allocating write could result in empty clusters in the - * image. If the write only touched a subregion of the cluster, - * then backing image sectors have been lost in the untouched - * region. The solution is to flush after writing a new data - * cluster and before updating the L2 table. - */ - ret = bdrv_co_flush(s->bs->file->bs); - if (ret < 0) { - return ret; - } - } - ret = qed_aio_write_l2_update(acb, acb->cur_cluster); - if (ret < 0) { - return ret; - } - } - return 0; + return bdrv_co_pwritev(s->bs->file, offset, acb->cur_qiov.size, + &acb->cur_qiov, 0); } /** * Populate untouched regions of new data cluster + * + * Called with table_lock held. 
*/ static int coroutine_fn qed_aio_write_cow(QEDAIOCB *acb) { @@ -1027,6 +1029,8 @@ static int coroutine_fn qed_aio_write_cow(QEDAIOCB *acb) uint64_t start, len, offset; int ret; + qemu_co_mutex_unlock(&s->table_lock); + /* Populate front untouched region of new data cluster */ start = qed_start_of_cluster(s, acb->cur_pos); len = qed_offset_into_cluster(s, acb->cur_pos); @@ -1034,7 +1038,7 @@ static int coroutine_fn qed_aio_write_cow(QEDAIOCB *acb) trace_qed_aio_write_prefill(s, acb, start, len, acb->cur_cluster); ret = qed_copy_from_backing_file(s, start, len, acb->cur_cluster); if (ret < 0) { - return ret; + goto out; } /* Populate back untouched region of new data cluster */ @@ -1047,10 +1051,31 @@ static int coroutine_fn qed_aio_write_cow(QEDAIOCB *acb) trace_qed_aio_write_postfill(s, acb, start, len, offset); ret = qed_copy_from_backing_file(s, start, len, offset); if (ret < 0) { - return ret; + goto out; } - return qed_aio_write_main(acb); + ret = qed_aio_write_main(acb); + if (ret < 0) { + goto out; + } + + if (s->bs->backing) { + /* + * Flush new data clusters before updating the L2 table + * + * This flush is necessary when a backing file is in use. A crash + * during an allocating write could result in empty clusters in the + * image. If the write only touched a subregion of the cluster, + * then backing image sectors have been lost in the untouched + * region. The solution is to flush after writing a new data + * cluster and before updating the L2 table. + */ + ret = bdrv_co_flush(s->bs->file->bs); + } + +out: + qemu_co_mutex_lock(&s->table_lock); + return ret; } /** @@ -1073,6 +1098,8 @@ static bool qed_should_set_need_check(BDRVQEDState *s) * @len: Length in bytes * * This path is taken when writing to previously unallocated clusters. + * + * Called with table_lock held. */ static int coroutine_fn qed_aio_write_alloc(QEDAIOCB *acb, size_t len) { @@ -1087,7 +1114,7 @@ static int coroutine_fn qed_aio_write_alloc(QEDAIOCB *acb, size_t len) /* Freeze this request if another allocating write is in progress */ if (s->allocating_acb != acb || s->allocating_write_reqs_plugged) { if (s->allocating_acb != NULL) { - qemu_co_queue_wait(&s->allocating_write_reqs, NULL); + qemu_co_queue_wait(&s->allocating_write_reqs, &s->table_lock); assert(s->allocating_acb == NULL); } s->allocating_acb = acb; @@ -1103,6 +1130,7 @@ static int coroutine_fn qed_aio_write_alloc(QEDAIOCB *acb, size_t len) if (acb->find_cluster_ret == QED_CLUSTER_ZERO) { return 0; } + acb->cur_cluster = 1; } else { acb->cur_cluster = qed_alloc_clusters(s, acb->cur_nclusters); } @@ -1115,15 +1143,14 @@ static int coroutine_fn qed_aio_write_alloc(QEDAIOCB *acb, size_t len) } } - if (acb->flags & QED_AIOCB_ZERO) { - ret = qed_aio_write_l2_update(acb, 1); - } else { + if (!(acb->flags & QED_AIOCB_ZERO)) { ret = qed_aio_write_cow(acb); + if (ret < 0) { + return ret; + } } - if (ret < 0) { - return ret; - } - return 0; + + return qed_aio_write_l2_update(acb, acb->cur_cluster); } /** @@ -1134,10 +1161,17 @@ static int coroutine_fn qed_aio_write_alloc(QEDAIOCB *acb, size_t len) * @len: Length in bytes * * This path is taken when writing to already allocated clusters. + * + * Called with table_lock held. 
*/ static int coroutine_fn qed_aio_write_inplace(QEDAIOCB *acb, uint64_t offset, size_t len) { + BDRVQEDState *s = acb_to_s(acb); + int r; + + qemu_co_mutex_unlock(&s->table_lock); + /* Allocate buffer for zero writes */ if (acb->flags & QED_AIOCB_ZERO) { struct iovec *iov = acb->qiov->iov; @@ -1145,7 +1179,8 @@ static int coroutine_fn qed_aio_write_inplace(QEDAIOCB *acb, uint64_t offset, if (!iov->iov_base) { iov->iov_base = qemu_try_blockalign(acb->bs, iov->iov_len); if (iov->iov_base == NULL) { - return -ENOMEM; + r = -ENOMEM; + goto out; } memset(iov->iov_base, 0, iov->iov_len); } @@ -1155,8 +1190,11 @@ static int coroutine_fn qed_aio_write_inplace(QEDAIOCB *acb, uint64_t offset, acb->cur_cluster = offset; qemu_iovec_concat(&acb->cur_qiov, acb->qiov, acb->qiov_offset, len); - /* Do the actual write */ - return qed_aio_write_main(acb); + /* Do the actual write. */ + r = qed_aio_write_main(acb); +out: + qemu_co_mutex_lock(&s->table_lock); + return r; } /** @@ -1166,6 +1204,8 @@ static int coroutine_fn qed_aio_write_inplace(QEDAIOCB *acb, uint64_t offset, * @ret: QED_CLUSTER_FOUND, QED_CLUSTER_L2 or QED_CLUSTER_L1 * @offset: Cluster offset in bytes * @len: Length in bytes + * + * Called with table_lock held. */ static int coroutine_fn qed_aio_write_data(void *opaque, int ret, uint64_t offset, size_t len) @@ -1197,6 +1237,8 @@ static int coroutine_fn qed_aio_write_data(void *opaque, int ret, * @ret: QED_CLUSTER_FOUND, QED_CLUSTER_L2 or QED_CLUSTER_L1 * @offset: Cluster offset in bytes * @len: Length in bytes + * + * Called with table_lock held. */ static int coroutine_fn qed_aio_read_data(void *opaque, int ret, uint64_t offset, size_t len) @@ -1204,6 +1246,9 @@ static int coroutine_fn qed_aio_read_data(void *opaque, int ret, QEDAIOCB *acb = opaque; BDRVQEDState *s = acb_to_s(acb); BlockDriverState *bs = acb->bs; + int r; + + qemu_co_mutex_unlock(&s->table_lock); /* Adjust offset into cluster */ offset += qed_offset_into_cluster(s, acb->cur_pos); @@ -1212,22 +1257,23 @@ static int coroutine_fn qed_aio_read_data(void *opaque, int ret, qemu_iovec_concat(&acb->cur_qiov, acb->qiov, acb->qiov_offset, len); - /* Handle zero cluster and backing file reads */ + /* Handle zero cluster and backing file reads, otherwise read + * data cluster directly. 
+ */ if (ret == QED_CLUSTER_ZERO) { qemu_iovec_memset(&acb->cur_qiov, 0, 0, acb->cur_qiov.size); - return 0; + r = 0; } else if (ret != QED_CLUSTER_FOUND) { - return qed_read_backing_file(s, acb->cur_pos, &acb->cur_qiov, - &acb->backing_qiov); + r = qed_read_backing_file(s, acb->cur_pos, &acb->cur_qiov, + &acb->backing_qiov); + } else { + BLKDBG_EVENT(bs->file, BLKDBG_READ_AIO); + r = bdrv_co_preadv(bs->file, offset, acb->cur_qiov.size, + &acb->cur_qiov, 0); } - BLKDBG_EVENT(bs->file, BLKDBG_READ_AIO); - ret = bdrv_co_preadv(bs->file, offset, acb->cur_qiov.size, - &acb->cur_qiov, 0); - if (ret < 0) { - return ret; - } - return 0; + qemu_co_mutex_lock(&s->table_lock); + return r; } /** @@ -1240,6 +1286,7 @@ static int coroutine_fn qed_aio_next_io(QEDAIOCB *acb) size_t len; int ret; + qemu_co_mutex_lock(&s->table_lock); while (1) { trace_qed_aio_next_io(s, acb, 0, acb->cur_pos + acb->cur_qiov.size); @@ -1279,6 +1326,7 @@ static int coroutine_fn qed_aio_next_io(QEDAIOCB *acb) trace_qed_aio_complete(s, acb, ret); qed_aio_complete(acb); + qemu_co_mutex_unlock(&s->table_lock); return ret; } @@ -1474,8 +1522,14 @@ static void bdrv_qed_invalidate_cache(BlockDriverState *bs, Error **errp) bdrv_qed_close(bs); - memset(s, 0, sizeof(BDRVQEDState)); + bdrv_qed_init_state(bs); + if (qemu_in_coroutine()) { + qemu_co_mutex_lock(&s->table_lock); + } ret = bdrv_qed_do_open(bs, NULL, bs->open_flags, &local_err); + if (qemu_in_coroutine()) { + qemu_co_mutex_unlock(&s->table_lock); + } if (local_err) { error_propagate(errp, local_err); error_prepend(errp, "Could not reopen qed layer: "); @@ -1554,7 +1608,7 @@ static BlockDriver bdrv_qed = { .bdrv_check = bdrv_qed_check, .bdrv_detach_aio_context = bdrv_qed_detach_aio_context, .bdrv_attach_aio_context = bdrv_qed_attach_aio_context, - .bdrv_drain = bdrv_qed_drain, + .bdrv_co_drain = bdrv_qed_co_drain, }; static void bdrv_qed_init(void) diff --git a/block/qed.h b/block/qed.h index dd3a2d5519..f35341f134 100644 --- a/block/qed.h +++ b/block/qed.h @@ -151,15 +151,21 @@ typedef struct QEDAIOCB { typedef struct { BlockDriverState *bs; /* device */ - uint64_t file_size; /* length of image file, in bytes */ + /* Written only by an allocating write or the timer handler (the latter + * while allocating reqs are plugged). + */ QEDHeader header; /* always cpu-endian */ + + /* Protected by table_lock. 
*/ + CoMutex table_lock; QEDTable *l1_table; L2TableCache l2_cache; /* l2 table cache */ uint32_t table_nelems; uint32_t l1_shift; uint32_t l2_shift; uint32_t l2_mask; + uint64_t file_size; /* length of image file, in bytes */ /* Allocating write request queue */ QEDAIOCB *allocating_acb; @@ -177,9 +183,6 @@ enum { QED_CLUSTER_L1, /* cluster missing in L1 */ }; -void qed_acquire(BDRVQEDState *s); -void qed_release(BDRVQEDState *s); - /** * Header functions */ diff --git a/block/sheepdog.c b/block/sheepdog.c index b7b7e6bbe5..abb2e79065 100644 --- a/block/sheepdog.c +++ b/block/sheepdog.c @@ -390,6 +390,7 @@ struct BDRVSheepdogState { QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head; QLIST_HEAD(failed_aio_head, AIOReq) failed_aio_head; + CoMutex queue_lock; CoQueue overlapping_queue; QLIST_HEAD(inflight_aiocb_head, SheepdogAIOCB) inflight_aiocb_head; }; @@ -488,7 +489,7 @@ static void wait_for_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *acb) retry: QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) { if (AIOCBOverlapping(acb, cb)) { - qemu_co_queue_wait(&s->overlapping_queue, NULL); + qemu_co_queue_wait(&s->overlapping_queue, &s->queue_lock); goto retry; } } @@ -525,8 +526,10 @@ static void sd_aio_setup(SheepdogAIOCB *acb, BDRVSheepdogState *s, return; } + qemu_co_mutex_lock(&s->queue_lock); wait_for_overlapping_aiocb(s, acb); QLIST_INSERT_HEAD(&s->inflight_aiocb_head, acb, aiocb_siblings); + qemu_co_mutex_unlock(&s->queue_lock); } static SocketAddress *sd_socket_address(const char *path, @@ -785,6 +788,7 @@ static coroutine_fn void reconnect_to_sdog(void *opaque) * have to move all the inflight requests to the failed queue before * resend_aioreq() is called. */ + qemu_co_mutex_lock(&s->queue_lock); QLIST_FOREACH_SAFE(aio_req, &s->inflight_aio_head, aio_siblings, next) { QLIST_REMOVE(aio_req, aio_siblings); QLIST_INSERT_HEAD(&s->failed_aio_head, aio_req, aio_siblings); @@ -794,8 +798,11 @@ static coroutine_fn void reconnect_to_sdog(void *opaque) while (!QLIST_EMPTY(&s->failed_aio_head)) { aio_req = QLIST_FIRST(&s->failed_aio_head); QLIST_REMOVE(aio_req, aio_siblings); + qemu_co_mutex_unlock(&s->queue_lock); resend_aioreq(s, aio_req); + qemu_co_mutex_lock(&s->queue_lock); } + qemu_co_mutex_unlock(&s->queue_lock); } /* @@ -887,7 +894,10 @@ static void coroutine_fn aio_read_response(void *opaque) */ s->co_recv = NULL; + qemu_co_mutex_lock(&s->queue_lock); QLIST_REMOVE(aio_req, aio_siblings); + qemu_co_mutex_unlock(&s->queue_lock); + switch (rsp.result) { case SD_RES_SUCCESS: break; @@ -1307,7 +1317,9 @@ static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, uint64_t old_oid = aio_req->base_oid; bool create = aio_req->create; + qemu_co_mutex_lock(&s->queue_lock); QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings); + qemu_co_mutex_unlock(&s->queue_lock); if (!nr_copies) { error_report("bug"); @@ -1678,6 +1690,7 @@ static int sd_open(BlockDriverState *bs, QDict *options, int flags, bs->total_sectors = s->inode.vdi_size / BDRV_SECTOR_SIZE; pstrcpy(s->name, sizeof(s->name), vdi); qemu_co_mutex_init(&s->lock); + qemu_co_mutex_init(&s->queue_lock); qemu_co_queue_init(&s->overlapping_queue); qemu_opts_del(opts); g_free(buf); @@ -2438,12 +2451,16 @@ static void coroutine_fn sd_co_rw_vector(SheepdogAIOCB *acb) static void sd_aio_complete(SheepdogAIOCB *acb) { + BDRVSheepdogState *s; if (acb->aiocb_type == AIOCB_FLUSH_CACHE) { return; } + s = acb->s; + qemu_co_mutex_lock(&s->queue_lock); QLIST_REMOVE(acb, aiocb_siblings); - 
qemu_co_queue_restart_all(&acb->s->overlapping_queue); + qemu_co_queue_restart_all(&s->overlapping_queue); + qemu_co_mutex_unlock(&s->queue_lock); } static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num, diff --git a/block/ssh.c b/block/ssh.c index 07a57eb466..e8f0404c03 100644 --- a/block/ssh.c +++ b/block/ssh.c @@ -888,13 +888,22 @@ static int ssh_has_zero_init(BlockDriverState *bs) return has_zero_init; } +typedef struct BDRVSSHRestart { + BlockDriverState *bs; + Coroutine *co; +} BDRVSSHRestart; + static void restart_coroutine(void *opaque) { - Coroutine *co = opaque; + BDRVSSHRestart *restart = opaque; + BlockDriverState *bs = restart->bs; + BDRVSSHState *s = bs->opaque; + AioContext *ctx = bdrv_get_aio_context(bs); - DPRINTF("co=%p", co); + DPRINTF("co=%p", restart->co); + aio_set_fd_handler(ctx, s->sock, false, NULL, NULL, NULL, NULL); - aio_co_wake(co); + aio_co_wake(restart->co); } /* A non-blocking call returned EAGAIN, so yield, ensuring the @@ -905,7 +914,10 @@ static coroutine_fn void co_yield(BDRVSSHState *s, BlockDriverState *bs) { int r; IOHandler *rd_handler = NULL, *wr_handler = NULL; - Coroutine *co = qemu_coroutine_self(); + BDRVSSHRestart restart = { + .bs = bs, + .co = qemu_coroutine_self() + }; r = libssh2_session_block_directions(s->session); @@ -920,11 +932,9 @@ static coroutine_fn void co_yield(BDRVSSHState *s, BlockDriverState *bs) rd_handler, wr_handler); aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock, - false, rd_handler, wr_handler, NULL, co); + false, rd_handler, wr_handler, NULL, &restart); qemu_coroutine_yield(); DPRINTF("s->sock=%d - back", s->sock); - aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock, false, - NULL, NULL, NULL, NULL); } /* SFTP has a function `libssh2_sftp_seek64' which seeks to a position diff --git a/block/vdi.c b/block/vdi.c index 2b6e8fa1ed..8da5dfc897 100644 --- a/block/vdi.c +++ b/block/vdi.c @@ -172,7 +172,7 @@ typedef struct { /* VDI header (converted to host endianness). */ VdiHeader header; - CoMutex write_lock; + CoRwlock bmap_lock; Error *migration_blocker; } BDRVVdiState; @@ -485,7 +485,7 @@ static int vdi_open(BlockDriverState *bs, QDict *options, int flags, goto fail_free_bmap; } - qemu_co_mutex_init(&s->write_lock); + qemu_co_rwlock_init(&s->bmap_lock); return 0; @@ -557,7 +557,9 @@ vdi_co_preadv(BlockDriverState *bs, uint64_t offset, uint64_t bytes, n_bytes, offset); /* prepare next AIO request */ + qemu_co_rwlock_rdlock(&s->bmap_lock); bmap_entry = le32_to_cpu(s->bmap[block_index]); + qemu_co_rwlock_unlock(&s->bmap_lock); if (!VDI_IS_ALLOCATED(bmap_entry)) { /* Block not allocated, return zeros, no need to wait. */ qemu_iovec_memset(qiov, bytes_done, 0, n_bytes); @@ -595,6 +597,7 @@ vdi_co_pwritev(BlockDriverState *bs, uint64_t offset, uint64_t bytes, uint32_t block_index; uint32_t offset_in_block; uint32_t n_bytes; + uint64_t data_offset; uint32_t bmap_first = VDI_UNALLOCATED; uint32_t bmap_last = VDI_UNALLOCATED; uint8_t *block = NULL; @@ -614,10 +617,19 @@ vdi_co_pwritev(BlockDriverState *bs, uint64_t offset, uint64_t bytes, n_bytes, offset); /* prepare next AIO request */ + qemu_co_rwlock_rdlock(&s->bmap_lock); bmap_entry = le32_to_cpu(s->bmap[block_index]); if (!VDI_IS_ALLOCATED(bmap_entry)) { /* Allocate new block and write to it. */ uint64_t data_offset; + qemu_co_rwlock_upgrade(&s->bmap_lock); + bmap_entry = le32_to_cpu(s->bmap[block_index]); + if (VDI_IS_ALLOCATED(bmap_entry)) { + /* A concurrent allocation did the work for us. 
*/ + qemu_co_rwlock_downgrade(&s->bmap_lock); + goto nonallocating_write; + } + bmap_entry = s->header.blocks_allocated; s->bmap[block_index] = cpu_to_le32(bmap_entry); s->header.blocks_allocated++; @@ -635,30 +647,18 @@ vdi_co_pwritev(BlockDriverState *bs, uint64_t offset, uint64_t bytes, memset(block + offset_in_block + n_bytes, 0, s->block_size - n_bytes - offset_in_block); - /* Note that this coroutine does not yield anywhere from reading the - * bmap entry until here, so in regards to all the coroutines trying - * to write to this cluster, the one doing the allocation will - * always be the first to try to acquire the lock. - * Therefore, it is also the first that will actually be able to - * acquire the lock and thus the padded cluster is written before - * the other coroutines can write to the affected area. */ - qemu_co_mutex_lock(&s->write_lock); + /* Write the new block under CoRwLock write-side protection, + * so this full-cluster write does not overlap a partial write + * of the same cluster, issued from the "else" branch. + */ ret = bdrv_pwrite(bs->file, data_offset, block, s->block_size); - qemu_co_mutex_unlock(&s->write_lock); + qemu_co_rwlock_unlock(&s->bmap_lock); } else { - uint64_t data_offset = s->header.offset_data + - (uint64_t)bmap_entry * s->block_size + - offset_in_block; - qemu_co_mutex_lock(&s->write_lock); - /* This lock is only used to make sure the following write operation - * is executed after the write issued by the coroutine allocating - * this cluster, therefore we do not need to keep it locked. - * As stated above, the allocating coroutine will always try to lock - * the mutex before all the other concurrent accesses to that - * cluster, therefore at this point we can be absolutely certain - * that that write operation has returned (there may be other writes - * in flight, but they do not concern this very operation). */ - qemu_co_mutex_unlock(&s->write_lock); +nonallocating_write: + data_offset = s->header.offset_data + + (uint64_t)bmap_entry * s->block_size + + offset_in_block; + qemu_co_rwlock_unlock(&s->bmap_lock); qemu_iovec_reset(&local_qiov); qemu_iovec_concat(&local_qiov, qiov, bytes_done, n_bytes); diff --git a/block/vpc.c b/block/vpc.c index 9a6f8173a5..8057d42a23 100644 --- a/block/vpc.c +++ b/block/vpc.c @@ -496,12 +496,6 @@ static inline int64_t get_image_offset(BlockDriverState *bs, uint64_t offset, return block_offset; } -static inline int64_t get_sector_offset(BlockDriverState *bs, - int64_t sector_num, bool write) -{ - return get_image_offset(bs, sector_num * BDRV_SECTOR_SIZE, write); -} - /* * Writes the footer to the end of the image file. This is needed when the * file grows as it overwrites the old footer @@ -696,6 +690,7 @@ static int64_t coroutine_fn vpc_co_get_block_status(BlockDriverState *bs, VHDFooter *footer = (VHDFooter*) s->footer_buf; int64_t start, offset; bool allocated; + int64_t ret; int n; if (be32_to_cpu(footer->type) == VHD_FIXED) { @@ -705,10 +700,13 @@ static int64_t coroutine_fn vpc_co_get_block_status(BlockDriverState *bs, (sector_num << BDRV_SECTOR_BITS); } - offset = get_sector_offset(bs, sector_num, 0); + qemu_co_mutex_lock(&s->lock); + + offset = get_image_offset(bs, sector_num << BDRV_SECTOR_BITS, false); start = offset; allocated = (offset != -1); *pnum = 0; + ret = 0; do { /* All sectors in a block are contiguous (without using the bitmap) */ @@ -723,15 +721,17 @@ static int64_t coroutine_fn vpc_co_get_block_status(BlockDriverState *bs, * sectors since there is always a bitmap in between. 
*/ if (allocated) { *file = bs->file->bs; - return BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID | start; + ret = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID | start; + break; } if (nb_sectors == 0) { break; } - offset = get_sector_offset(bs, sector_num, 0); + offset = get_image_offset(bs, sector_num << BDRV_SECTOR_BITS, false); } while (offset == -1); - return 0; + qemu_co_mutex_unlock(&s->lock); + return ret; } /* diff --git a/block/vvfat.c b/block/vvfat.c index 4fd28e1e87..4dae790203 100644 --- a/block/vvfat.c +++ b/block/vvfat.c @@ -3078,8 +3078,14 @@ static int coroutine_fn write_target_commit(BlockDriverState *bs, uint64_t offset, uint64_t bytes, QEMUIOVector *qiov, int flags) { + int ret; + BDRVVVFATState* s = *((BDRVVVFATState**) bs->opaque); - return try_commit(s); + qemu_co_mutex_lock(&s->lock); + ret = try_commit(s); + qemu_co_mutex_unlock(&s->lock); + + return ret; } static void write_target_close(BlockDriverState *bs) { diff --git a/include/block/block_int.h b/include/block/block_int.h index 669a2797fd..5c6b761d81 100644 --- a/include/block/block_int.h +++ b/include/block/block_int.h @@ -324,7 +324,7 @@ struct BlockDriver { * Drain and stop any internal sources of requests in the driver, and * remain so until next I/O callback (e.g. bdrv_co_writev) is called. */ - void (*bdrv_drain)(BlockDriverState *bs); + void coroutine_fn (*bdrv_co_drain)(BlockDriverState *bs); void (*bdrv_add_child)(BlockDriverState *parent, BlockDriverState *child, Error **errp); diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h index a4509bd977..9aff9a735e 100644 --- a/include/qemu/coroutine.h +++ b/include/qemu/coroutine.h @@ -228,6 +228,24 @@ void qemu_co_rwlock_init(CoRwlock *lock); */ void qemu_co_rwlock_rdlock(CoRwlock *lock); +/** + * Write Locks the CoRwlock from a reader. This is a bit more efficient than + * @qemu_co_rwlock_unlock followed by a separate @qemu_co_rwlock_wrlock. + * However, if the lock cannot be upgraded immediately, control is transferred + * to the caller of the current coroutine. Also, @qemu_co_rwlock_upgrade + * only overrides CoRwlock fairness if there are no concurrent readers, so + * another writer might run while @qemu_co_rwlock_upgrade blocks. + */ +void qemu_co_rwlock_upgrade(CoRwlock *lock); + +/** + * Downgrades a write-side critical section to a reader. Downgrading with + * @qemu_co_rwlock_downgrade never blocks, unlike @qemu_co_rwlock_unlock + * followed by @qemu_co_rwlock_rdlock. This makes it more efficient, but + * may also sometimes be necessary for correctness. + */ +void qemu_co_rwlock_downgrade(CoRwlock *lock); + /** * Write Locks the mutex. If the lock cannot be taken immediately because * of a parallel reader, control is transferred to the caller of the current diff --git a/tests/docker/Makefile.include b/tests/docker/Makefile.include index 037cb9e9e7..012a2fc1af 100644 --- a/tests/docker/Makefile.include +++ b/tests/docker/Makefile.include @@ -106,6 +106,8 @@ docker: @echo ' (default is 1)' @echo ' DEBUG=1 Stop and drop to shell in the created container' @echo ' before running the command.' + @echo ' NETWORK=1 Enable virtual network interface with default backend.' + @echo ' NETWORK=$BACKEND Enable virtual network interface with $BACKEND.' @echo ' NOUSER Define to disable adding current user to containers passwd.' @echo ' NOCACHE=1 Ignore cache when build images.' @echo ' EXECUTABLE= Include executable in image.' 
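
The CoRwlock upgrade/downgrade primitives documented in include/qemu/coroutine.h above are what allow block/vdi.c to replace its write_lock with bmap_lock. A minimal sketch of the intended calling pattern follows; DemoState, DEMO_UNALLOCATED and the demo_* helpers are hypothetical stand-ins for a driver's real block-map code and are not part of this series:

    #include "qemu/osdep.h"
    #include "qemu/coroutine.h"

    /* Hypothetical driver state: bmap_lock plays the role of vdi's new
     * CoRwlock and is assumed to have been initialized with
     * qemu_co_rwlock_init() in the driver's open function.
     */
    typedef struct {
        CoRwlock bmap_lock;
        uint32_t *bmap;
    } DemoState;

    #define DEMO_UNALLOCATED UINT32_MAX

    static uint32_t demo_allocate_block(DemoState *s)
    {
        /* ...grow the image file and return the new block's index... */
        return 0;
    }

    static int coroutine_fn demo_write_block(DemoState *s, uint32_t index)
    {
        uint32_t entry;

        /* Fast path: most writes hit an allocated block, shared lock only. */
        qemu_co_rwlock_rdlock(&s->bmap_lock);
        entry = s->bmap[index];
        if (entry == DEMO_UNALLOCATED) {
            /* Upgrading may yield, so re-check once the write lock is held. */
            qemu_co_rwlock_upgrade(&s->bmap_lock);
            entry = s->bmap[index];
            if (entry != DEMO_UNALLOCATED) {
                /* A concurrent coroutine allocated it; back to shared mode. */
                qemu_co_rwlock_downgrade(&s->bmap_lock);
            } else {
                entry = demo_allocate_block(s);
                s->bmap[index] = entry;
            }
        }

        /* ...issue the guest data write for block 'entry' here; a real
         * driver would usually drop the lock first for long-running I/O,
         * as vdi_co_pwritev() does on its non-allocating path...
         */

        /* qemu_co_rwlock_unlock() works from either the reader or the
         * writer side, so both branches above end up here.
         */
        qemu_co_rwlock_unlock(&s->bmap_lock);
        return 0;
    }

The important detail, also visible in vdi_co_pwritev() above, is that qemu_co_rwlock_upgrade() may block, so the map entry has to be re-read after the write lock is acquired.
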
@@ -132,7 +134,8 @@ docker-run: docker-qemu-src $(SRC_PATH)/tests/docker/docker.py run \ $(if $(NOUSER),,-u $(shell id -u)) -t \ $(if $V,,--rm) \ - $(if $(DEBUG),-i,--net=none) \ + $(if $(DEBUG),-i,) \ + $(if $(NETWORK),$(if $(subst $(NETWORK),,1),--net=$(NETWORK)),--net=none) \ -e TARGET_LIST=$(TARGET_LIST) \ -e EXTRA_CONFIGURE_OPTS="$(EXTRA_CONFIGURE_OPTS)" \ -e V=$V -e J=$J -e DEBUG=$(DEBUG) \ diff --git a/tests/docker/docker.py b/tests/docker/docker.py index e707e5bcca..ee40ca04d9 100755 --- a/tests/docker/docker.py +++ b/tests/docker/docker.py @@ -112,13 +112,16 @@ class Docker(object): signal.signal(signal.SIGTERM, self._kill_instances) signal.signal(signal.SIGHUP, self._kill_instances) - def _do(self, cmd, quiet=True, infile=None, **kwargs): + def _do(self, cmd, quiet=True, **kwargs): if quiet: kwargs["stdout"] = DEVNULL - if infile: - kwargs["stdin"] = infile return subprocess.call(self._command + cmd, **kwargs) + def _do_check(self, cmd, quiet=True, **kwargs): + if quiet: + kwargs["stdout"] = DEVNULL + return subprocess.check_call(self._command + cmd, **kwargs) + def _do_kill_instances(self, only_known, only_active=True): cmd = ["ps", "-q"] if not only_active: @@ -177,14 +180,14 @@ class Docker(object): extra_files_cksum))) tmp_df.flush() - self._do(["build", "-t", tag, "-f", tmp_df.name] + argv + \ - [docker_dir], - quiet=quiet) + self._do_check(["build", "-t", tag, "-f", tmp_df.name] + argv + \ + [docker_dir], + quiet=quiet) def update_image(self, tag, tarball, quiet=True): "Update a tagged image using " - self._do(["build", "-t", tag, "-"], quiet=quiet, infile=tarball) + self._do_check(["build", "-t", tag, "-"], quiet=quiet, stdin=tarball) def image_matches_dockerfile(self, tag, dockerfile): try: @@ -197,9 +200,9 @@ class Docker(object): label = uuid.uuid1().hex if not keep: self._instances.append(label) - ret = self._do(["run", "--label", - "com.qemu.instance.uuid=" + label] + cmd, - quiet=quiet) + ret = self._do_check(["run", "--label", + "com.qemu.instance.uuid=" + label] + cmd, + quiet=quiet) if not keep: self._instances.remove(label) return ret diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c index b44b5d55eb..846ff9167f 100644 --- a/util/qemu-coroutine-lock.c +++ b/util/qemu-coroutine-lock.c @@ -402,6 +402,21 @@ void qemu_co_rwlock_unlock(CoRwlock *lock) qemu_co_mutex_unlock(&lock->mutex); } +void qemu_co_rwlock_downgrade(CoRwlock *lock) +{ + Coroutine *self = qemu_coroutine_self(); + + /* lock->mutex critical section started in qemu_co_rwlock_wrlock or + * qemu_co_rwlock_upgrade. + */ + assert(lock->reader == 0); + lock->reader++; + qemu_co_mutex_unlock(&lock->mutex); + + /* The rest of the read-side critical section is run without the mutex. */ + self->locks_held++; +} + void qemu_co_rwlock_wrlock(CoRwlock *lock) { qemu_co_mutex_lock(&lock->mutex); @@ -416,3 +431,23 @@ void qemu_co_rwlock_wrlock(CoRwlock *lock) * There is no need to update self->locks_held. */ } + +void qemu_co_rwlock_upgrade(CoRwlock *lock) +{ + Coroutine *self = qemu_coroutine_self(); + + qemu_co_mutex_lock(&lock->mutex); + assert(lock->reader > 0); + lock->reader--; + lock->pending_writer++; + while (lock->reader) { + qemu_co_queue_wait(&lock->queue, &lock->mutex); + } + lock->pending_writer--; + + /* The rest of the write-side critical section is run with + * the mutex taken, similar to qemu_co_rwlock_wrlock. Do + * not account for the lock twice in self->locks_held. + */ + self->locks_held--; +}
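
Several hunks above (qed's allocating_write_reqs under table_lock, sheepdog's overlapping_queue under queue_lock) rely on pairing a CoQueue with an explicit CoMutex through qemu_co_queue_wait() and qemu_co_queue_next(). A small self-contained sketch of that pattern, assuming hypothetical DemoSerializer/demo_* names that are not part of this series:

    #include "qemu/osdep.h"
    #include "qemu/coroutine.h"

    /* Hypothetical serializer: 'lock' and 'waiters' play the same roles as
     * qed's table_lock/allocating_write_reqs and sheepdog's
     * queue_lock/overlapping_queue.
     */
    typedef struct {
        CoMutex lock;
        CoQueue waiters;
        bool busy;
    } DemoSerializer;

    static void demo_serializer_init(DemoSerializer *ser)
    {
        qemu_co_mutex_init(&ser->lock);
        qemu_co_queue_init(&ser->waiters);
        ser->busy = false;
    }

    static void coroutine_fn demo_run_exclusive(DemoSerializer *ser)
    {
        qemu_co_mutex_lock(&ser->lock);
        while (ser->busy) {
            /* Queues this coroutine, drops ser->lock, yields, and re-takes
             * the lock after qemu_co_queue_next() wakes us, so testing the
             * flag and going to sleep cannot race with the wakeup side.
             */
            qemu_co_queue_wait(&ser->waiters, &ser->lock);
        }
        ser->busy = true;
        qemu_co_mutex_unlock(&ser->lock);

        /* ...do the work that must not run concurrently... */

        qemu_co_mutex_lock(&ser->lock);
        ser->busy = false;
        /* Wake at most one waiter; it re-takes ser->lock inside
         * qemu_co_queue_wait() when it runs.
         */
        qemu_co_queue_next(&ser->waiters);
        qemu_co_mutex_unlock(&ser->lock);
    }

Because qemu_co_queue_wait() releases the CoMutex before yielding and re-acquires it on wakeup, the busy flag can be tested and cleared under the same lock that protects the queue, which is exactly what lets the qed and sheepdog hunks drop their ad-hoc aio_context_acquire/release pairs.
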