From 993295fedc5fefaefee9ac80e057d8bf08026ef4 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Sat, 17 Sep 2011 16:27:59 +0200 Subject: [PATCH 01/26] add qemu_send_full and qemu_recv_full Signed-off-by: Paolo Bonzini --- osdep.c | 67 +++++++++++++++++++++++++++++++++++++++++++++++++++ qemu-common.h | 4 +++ 2 files changed, 71 insertions(+) diff --git a/osdep.c b/osdep.c index 56e6963f15..70bad27f6e 100644 --- a/osdep.c +++ b/osdep.c @@ -166,3 +166,70 @@ int qemu_accept(int s, struct sockaddr *addr, socklen_t *addrlen) return ret; } + +/* + * A variant of send(2) which handles partial write. + * + * Return the number of bytes transferred, which is only + * smaller than `count' if there is an error. + * + * This function won't work with non-blocking fd's. + * Any of the possibilities with non-blocking fd's is bad: + * - return a short write (then the name is wrong) + * - busy wait adding (errno == EAGAIN) to the loop + */ +ssize_t qemu_send_full(int fd, const void *buf, size_t count, int flags) +{ + ssize_t ret = 0; + ssize_t total = 0; + + while (count) { + ret = send(fd, buf, count, flags); + if (ret < 0) { + if (errno == EINTR) { + continue; + } + break; + } + + count -= ret; + buf += ret; + total += ret; + } + + return total; +} + +/* + * A variant of recv(2) which handles partial reads. + * + * Return the number of bytes transferred, which is only + * smaller than `count' if there is an error or end of stream. + * + * This function won't work with non-blocking fd's. 
+ * Any of the possibilities with non-blocking fd's is bad: + * - return a short read (then the name is wrong) + * - busy wait adding (errno == EAGAIN) to the loop + */ +ssize_t qemu_recv_full(int fd, void *buf, size_t count, int flags) +{ + ssize_t ret = 0; + ssize_t total = 0; + + while (count) { + ret = qemu_recv(fd, buf, count, flags); + if (ret <= 0) { + if (ret < 0 && errno == EINTR) { + continue; + } + break; + } + + count -= ret; + buf += ret; + total += ret; + } + + return total; +} + diff --git a/qemu-common.h b/qemu-common.h index b2de015629..5c3f3af6aa 100644 --- a/qemu-common.h +++ b/qemu-common.h @@ -173,6 +173,10 @@ void *qemu_oom_check(void *ptr); int qemu_open(const char *name, int flags, ...); ssize_t qemu_write_full(int fd, const void *buf, size_t count) QEMU_WARN_UNUSED_RESULT; +ssize_t qemu_send_full(int fd, const void *buf, size_t count, int flags) + QEMU_WARN_UNUSED_RESULT; +ssize_t qemu_recv_full(int fd, const void *buf, size_t count, int flags) + QEMU_WARN_UNUSED_RESULT; void qemu_set_cloexec(int fd); #ifndef _WIN32 From 8c5135f90e2dcf1d5c3d03106e0ac6e371ccb572 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Thu, 8 Sep 2011 13:46:25 +0200 Subject: [PATCH 02/26] sheepdog: move coroutine send/recv function to generic code Outside coroutines, avoid busy waiting on EAGAIN by temporarily making the socket blocking. The API of qemu_recvv/qemu_sendv is slightly different from do_readv/do_writev because they do not handle coroutines. They return the number of bytes transferred before encountering an EAGAIN. The specificity of yielding on EAGAIN is entirely in qemu-coroutine-io.c. 
Reviewed-by: MORITA Kazutaka Signed-off-by: Paolo Bonzini --- Makefile.objs | 2 +- block/sheepdog.c | 230 ++++---------------------------------------- cutils.c | 111 +++++++++++++++++++++ qemu-common.h | 32 +++++- qemu-coroutine-io.c | 96 ++++++++++++++++++ 5 files changed, 260 insertions(+), 211 deletions(-) create mode 100644 qemu-coroutine-io.c diff --git a/Makefile.objs b/Makefile.objs index f753d838ff..8813673584 100644 --- a/Makefile.objs +++ b/Makefile.objs @@ -12,7 +12,7 @@ oslib-obj-$(CONFIG_POSIX) += oslib-posix.o qemu-thread-posix.o ####################################################################### # coroutines -coroutine-obj-y = qemu-coroutine.o qemu-coroutine-lock.o +coroutine-obj-y = qemu-coroutine.o qemu-coroutine-lock.o qemu-coroutine-io.o ifeq ($(CONFIG_UCONTEXT_COROUTINE),y) coroutine-obj-$(CONFIG_POSIX) += coroutine-ucontext.o else diff --git a/block/sheepdog.c b/block/sheepdog.c index aa9707f2ae..00ea5a0acc 100644 --- a/block/sheepdog.c +++ b/block/sheepdog.c @@ -443,129 +443,6 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov, return acb; } -#ifdef _WIN32 - -struct msghdr { - struct iovec *msg_iov; - size_t msg_iovlen; -}; - -static ssize_t sendmsg(int s, const struct msghdr *msg, int flags) -{ - size_t size = 0; - char *buf, *p; - int i, ret; - - /* count the msg size */ - for (i = 0; i < msg->msg_iovlen; i++) { - size += msg->msg_iov[i].iov_len; - } - buf = g_malloc(size); - - p = buf; - for (i = 0; i < msg->msg_iovlen; i++) { - memcpy(p, msg->msg_iov[i].iov_base, msg->msg_iov[i].iov_len); - p += msg->msg_iov[i].iov_len; - } - - ret = send(s, buf, size, flags); - - g_free(buf); - return ret; -} - -static ssize_t recvmsg(int s, struct msghdr *msg, int flags) -{ - size_t size = 0; - char *buf, *p; - int i, ret; - - /* count the msg size */ - for (i = 0; i < msg->msg_iovlen; i++) { - size += msg->msg_iov[i].iov_len; - } - buf = g_malloc(size); - - ret = qemu_recv(s, buf, size, flags); - if (ret < 0) { - goto 
out; - } - - p = buf; - for (i = 0; i < msg->msg_iovlen; i++) { - memcpy(msg->msg_iov[i].iov_base, p, msg->msg_iov[i].iov_len); - p += msg->msg_iov[i].iov_len; - } -out: - g_free(buf); - return ret; -} - -#endif - -/* - * Send/recv data with iovec buffers - * - * This function send/recv data from/to the iovec buffer directly. - * The first `offset' bytes in the iovec buffer are skipped and next - * `len' bytes are used. - * - * For example, - * - * do_send_recv(sockfd, iov, len, offset, 1); - * - * is equals to - * - * char *buf = malloc(size); - * iov_to_buf(iov, iovcnt, buf, offset, size); - * send(sockfd, buf, size, 0); - * free(buf); - */ -static int do_send_recv(int sockfd, struct iovec *iov, int len, int offset, - int write) -{ - struct msghdr msg; - int ret, diff; - - memset(&msg, 0, sizeof(msg)); - msg.msg_iov = iov; - msg.msg_iovlen = 1; - - len += offset; - - while (iov->iov_len < len) { - len -= iov->iov_len; - - iov++; - msg.msg_iovlen++; - } - - diff = iov->iov_len - len; - iov->iov_len -= diff; - - while (msg.msg_iov->iov_len <= offset) { - offset -= msg.msg_iov->iov_len; - - msg.msg_iov++; - msg.msg_iovlen--; - } - - msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base + offset; - msg.msg_iov->iov_len -= offset; - - if (write) { - ret = sendmsg(sockfd, &msg, 0); - } else { - ret = recvmsg(sockfd, &msg, 0); - } - - msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base - offset; - msg.msg_iov->iov_len += offset; - - iov->iov_len += diff; - return ret; -} - static int connect_to_sdog(const char *addr, const char *port) { char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; @@ -618,83 +495,19 @@ success: return fd; } -static int do_readv_writev(int sockfd, struct iovec *iov, int len, - int iov_offset, int write) -{ - int ret; -again: - ret = do_send_recv(sockfd, iov, len, iov_offset, write); - if (ret < 0) { - if (errno == EINTR) { - goto again; - } - if (errno == EAGAIN) { - if (qemu_in_coroutine()) { - qemu_coroutine_yield(); - } - goto again; - } - 
error_report("failed to recv a rsp, %s", strerror(errno)); - return 1; - } - - iov_offset += ret; - len -= ret; - if (len) { - goto again; - } - - return 0; -} - -static int do_readv(int sockfd, struct iovec *iov, int len, int iov_offset) -{ - return do_readv_writev(sockfd, iov, len, iov_offset, 0); -} - -static int do_writev(int sockfd, struct iovec *iov, int len, int iov_offset) -{ - return do_readv_writev(sockfd, iov, len, iov_offset, 1); -} - -static int do_read_write(int sockfd, void *buf, int len, int write) -{ - struct iovec iov; - - iov.iov_base = buf; - iov.iov_len = len; - - return do_readv_writev(sockfd, &iov, len, 0, write); -} - -static int do_read(int sockfd, void *buf, int len) -{ - return do_read_write(sockfd, buf, len, 0); -} - -static int do_write(int sockfd, void *buf, int len) -{ - return do_read_write(sockfd, buf, len, 1); -} - static int send_req(int sockfd, SheepdogReq *hdr, void *data, unsigned int *wlen) { int ret; - struct iovec iov[2]; - iov[0].iov_base = hdr; - iov[0].iov_len = sizeof(*hdr); - - if (*wlen) { - iov[1].iov_base = data; - iov[1].iov_len = *wlen; + ret = qemu_send_full(sockfd, hdr, sizeof(*hdr), 0); + if (ret < sizeof(*hdr)) { + error_report("failed to send a req, %s", strerror(errno)); } - ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen, 0); - if (ret) { + ret = qemu_send_full(sockfd, data, *wlen, 0); + if (ret < *wlen) { error_report("failed to send a req, %s", strerror(errno)); - ret = -1; } return ret; @@ -705,16 +518,15 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data, { int ret; + socket_set_block(sockfd); ret = send_req(sockfd, hdr, data, wlen); - if (ret) { - ret = -1; + if (ret < 0) { goto out; } - ret = do_read(sockfd, hdr, sizeof(*hdr)); - if (ret) { + ret = qemu_recv_full(sockfd, hdr, sizeof(*hdr), 0); + if (ret < sizeof(*hdr)) { error_report("failed to get a rsp, %s", strerror(errno)); - ret = -1; goto out; } @@ -723,15 +535,15 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data, } if 
(*rlen) { - ret = do_read(sockfd, data, *rlen); - if (ret) { + ret = qemu_recv_full(sockfd, data, *rlen, 0); + if (ret < *rlen) { error_report("failed to get the data, %s", strerror(errno)); - ret = -1; goto out; } } ret = 0; out: + socket_set_nonblock(sockfd); return ret; } @@ -793,8 +605,8 @@ static void coroutine_fn aio_read_response(void *opaque) } /* read a header */ - ret = do_read(fd, &rsp, sizeof(rsp)); - if (ret) { + ret = qemu_co_recv(fd, &rsp, sizeof(rsp)); + if (ret < 0) { error_report("failed to get the header, %s", strerror(errno)); goto out; } @@ -839,9 +651,9 @@ static void coroutine_fn aio_read_response(void *opaque) } break; case AIOCB_READ_UDATA: - ret = do_readv(fd, acb->qiov->iov, rsp.data_length, - aio_req->iov_offset); - if (ret) { + ret = qemu_co_recvv(fd, acb->qiov->iov, rsp.data_length, + aio_req->iov_offset); + if (ret < 0) { error_report("failed to get the data, %s", strerror(errno)); goto out; } @@ -1114,16 +926,16 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, set_cork(s->fd, 1); /* send a header */ - ret = do_write(s->fd, &hdr, sizeof(hdr)); - if (ret) { + ret = qemu_co_send(s->fd, &hdr, sizeof(hdr)); + if (ret < 0) { qemu_co_mutex_unlock(&s->lock); error_report("failed to send a req, %s", strerror(errno)); return -EIO; } if (wlen) { - ret = do_writev(s->fd, iov, wlen, aio_req->iov_offset); - if (ret) { + ret = qemu_co_sendv(s->fd, iov, wlen, aio_req->iov_offset); + if (ret < 0) { qemu_co_mutex_unlock(&s->lock); error_report("failed to send a data, %s", strerror(errno)); return -EIO; diff --git a/cutils.c b/cutils.c index 24b3fe355b..a6ffd46445 100644 --- a/cutils.c +++ b/cutils.c @@ -25,6 +25,8 @@ #include "host-utils.h" #include +#include "qemu_socket.h" + void pstrcpy(char *buf, int buf_size, const char *str) { int c; @@ -403,3 +405,112 @@ int qemu_parse_fd(const char *param) } return fd; } + +/* + * Send/recv data with iovec buffers + * + * This function send/recv data from/to the iovec buffer 
directly. + * The first `offset' bytes in the iovec buffer are skipped and next + * `len' bytes are used. + * + * For example, + * + * do_sendv_recvv(sockfd, iov, len, offset, 1); + * + * is equal to + * + * char *buf = malloc(size); + * iov_to_buf(iov, iovcnt, buf, offset, size); + * send(sockfd, buf, size, 0); + * free(buf); + */ +static int do_sendv_recvv(int sockfd, struct iovec *iov, int len, int offset, + int do_sendv) +{ + int ret, diff, iovlen; + struct iovec *last_iov; + + /* last_iov is inclusive, so count from one. */ + iovlen = 1; + last_iov = iov; + len += offset; + + while (last_iov->iov_len < len) { + len -= last_iov->iov_len; + + last_iov++; + iovlen++; + } + + diff = last_iov->iov_len - len; + last_iov->iov_len -= diff; + + while (iov->iov_len <= offset) { + offset -= iov->iov_len; + + iov++; + iovlen--; + } + + iov->iov_base = (char *) iov->iov_base + offset; + iov->iov_len -= offset; + + { +#if defined CONFIG_IOVEC && defined CONFIG_POSIX + struct msghdr msg; + memset(&msg, 0, sizeof(msg)); + msg.msg_iov = iov; + msg.msg_iovlen = iovlen; + + do { + if (do_sendv) { + ret = sendmsg(sockfd, &msg, 0); + } else { + ret = recvmsg(sockfd, &msg, 0); + } + } while (ret == -1 && errno == EINTR); +#else + struct iovec *p = iov; + ret = 0; + while (iovlen > 0) { + int rc; + if (do_sendv) { + rc = send(sockfd, p->iov_base, p->iov_len, 0); + } else { + rc = qemu_recv(sockfd, p->iov_base, p->iov_len, 0); + } + if (rc == -1) { + if (errno == EINTR) { + continue; + } + if (ret == 0) { + ret = -1; + } + break; + } + if (rc == 0) { + break; + } + ret += rc; + iovlen--, p++; + } +#endif + } + + /* Undo the changes above */ + iov->iov_base = (char *) iov->iov_base - offset; + iov->iov_len += offset; + last_iov->iov_len += diff; + return ret; +} + +int qemu_recvv(int sockfd, struct iovec *iov, int len, int iov_offset) +{ + return do_sendv_recvv(sockfd, iov, len, iov_offset, 0); +} + +int qemu_sendv(int sockfd, struct iovec *iov, int len, int iov_offset) +{ + return 
do_sendv_recvv(sockfd, iov, len, iov_offset, 1); +} + diff --git a/qemu-common.h b/qemu-common.h index 5c3f3af6aa..6ab7dfb1b9 100644 --- a/qemu-common.h +++ b/qemu-common.h @@ -175,7 +175,7 @@ ssize_t qemu_write_full(int fd, const void *buf, size_t count) QEMU_WARN_UNUSED_RESULT; ssize_t qemu_send_full(int fd, const void *buf, size_t count, int flags) QEMU_WARN_UNUSED_RESULT; -ssize_t qemu_recv_full(int fd, const void *buf, size_t count, int flags) +ssize_t qemu_recv_full(int fd, void *buf, size_t count, int flags) QEMU_WARN_UNUSED_RESULT; void qemu_set_cloexec(int fd); @@ -190,6 +190,9 @@ int qemu_pipe(int pipefd[2]); #define qemu_recv(sockfd, buf, len, flags) recv(sockfd, buf, len, flags) #endif +int qemu_recvv(int sockfd, struct iovec *iov, int len, int iov_offset); +int qemu_sendv(int sockfd, struct iovec *iov, int len, int iov_offset); + /* Error handling. */ void QEMU_NORETURN hw_error(const char *fmt, ...) GCC_FMT_ATTR(1, 2); @@ -276,6 +279,33 @@ struct qemu_work_item { void qemu_init_vcpu(void *env); #endif +/** + * Sends an iovec (or optionally a part of it) down a socket, yielding + * when the socket is full. + */ +int qemu_co_sendv(int sockfd, struct iovec *iov, + int len, int iov_offset); + +/** + * Receives data into an iovec (or optionally into a part of it) from + * a socket, yielding when there is no data in the socket. + */ +int qemu_co_recvv(int sockfd, struct iovec *iov, + int len, int iov_offset); + + +/** + * Sends a buffer down a socket, yielding when the socket is full. + */ +int qemu_co_send(int sockfd, void *buf, int len); + +/** + * Receives data into a buffer from a socket, yielding when there + * is no data in the socket. 
+ */ +int qemu_co_recv(int sockfd, void *buf, int len); + + typedef struct QEMUIOVector { struct iovec *iov; int niov; diff --git a/qemu-coroutine-io.c b/qemu-coroutine-io.c new file mode 100644 index 0000000000..40fd514395 --- /dev/null +++ b/qemu-coroutine-io.c @@ -0,0 +1,96 @@ +/* + * Coroutine-aware I/O functions + * + * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation. + * Copyright (c) 2011, Red Hat, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. 
+ */ +#include "qemu-common.h" +#include "qemu_socket.h" +#include "qemu-coroutine.h" + +int coroutine_fn qemu_co_recvv(int sockfd, struct iovec *iov, + int len, int iov_offset) +{ + int total = 0; + int ret; + while (len) { + ret = qemu_recvv(sockfd, iov, len, iov_offset + total); + if (ret < 0) { + if (errno == EAGAIN) { + qemu_coroutine_yield(); + continue; + } + if (total == 0) { + total = -1; + } + break; + } + if (ret == 0) { + break; + } + total += ret, len -= ret; + } + + return total; +} + +int coroutine_fn qemu_co_sendv(int sockfd, struct iovec *iov, + int len, int iov_offset) +{ + int total = 0; + int ret; + while (len) { + ret = qemu_sendv(sockfd, iov, len, iov_offset + total); + if (ret < 0) { + if (errno == EAGAIN) { + qemu_coroutine_yield(); + continue; + } + if (total == 0) { + total = -1; + } + break; + } + total += ret, len -= ret; + } + + return total; +} + +int coroutine_fn qemu_co_recv(int sockfd, void *buf, int len) +{ + struct iovec iov; + + iov.iov_base = buf; + iov.iov_len = len; + + return qemu_co_recvv(sockfd, &iov, len, 0); +} + +int coroutine_fn qemu_co_send(int sockfd, void *buf, int len) +{ + struct iovec iov; + + iov.iov_base = buf; + iov.iov_len = len; + + return qemu_co_sendv(sockfd, &iov, len, 0); +} From ae255e523c256cf0708f1c16cb946ff96340a800 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Thu, 8 Sep 2011 14:28:59 +0200 Subject: [PATCH 03/26] nbd: switch to asynchronous operation Signed-off-by: Paolo Bonzini --- block/nbd.c | 188 ++++++++++++++++++++++++++++++++++------------------ nbd.c | 8 +++ 2 files changed, 131 insertions(+), 65 deletions(-) diff --git a/block/nbd.c b/block/nbd.c index 95212dac64..bea7acd213 100644 --- a/block/nbd.c +++ b/block/nbd.c @@ -47,13 +47,17 @@ #endif typedef struct BDRVNBDState { - CoMutex lock; int sock; uint32_t nbdflags; off_t size; size_t blocksize; char *export_name; /* An NBD server may export several devices */ + CoMutex mutex; + Coroutine *coroutine; + + struct nbd_reply reply; + /* If 
it begins with '/', this is a UNIX domain socket. Otherwise, * it's a string of the form :port */ @@ -106,6 +110,95 @@ out: return err; } +static void nbd_coroutine_start(BDRVNBDState *s, struct nbd_request *request) +{ + qemu_co_mutex_lock(&s->mutex); + s->coroutine = qemu_coroutine_self(); + request->handle = (uint64_t)(intptr_t)s; +} + +static int nbd_have_request(void *opaque) +{ + BDRVNBDState *s = opaque; + + return !!s->coroutine; +} + +static void nbd_reply_ready(void *opaque) +{ + BDRVNBDState *s = opaque; + + if (s->reply.handle == 0) { + /* No reply already in flight. Fetch a header. */ + if (nbd_receive_reply(s->sock, &s->reply) < 0) { + s->reply.handle = 0; + } + } + + /* There's no need for a mutex on the receive side, because the + * handler acts as a synchronization point and ensures that only + * one coroutine is called until the reply finishes. */ + if (s->coroutine) { + qemu_coroutine_enter(s->coroutine, NULL); + } +} + +static void nbd_restart_write(void *opaque) +{ + BDRVNBDState *s = opaque; + qemu_coroutine_enter(s->coroutine, NULL); +} + +static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request, + struct iovec *iov, int offset) +{ + int rc, ret; + + qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, nbd_restart_write, + nbd_have_request, NULL, s); + rc = nbd_send_request(s->sock, request); + if (rc != -1 && iov) { + ret = qemu_co_sendv(s->sock, iov, request->len, offset); + if (ret != request->len) { + errno = -EIO; + rc = -1; + } + } + qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL, + nbd_have_request, NULL, s); + return rc; +} + +static void nbd_co_receive_reply(BDRVNBDState *s, struct nbd_request *request, + struct nbd_reply *reply, + struct iovec *iov, int offset) +{ + int ret; + + /* Wait until we're woken up by the read handler. 
*/ + qemu_coroutine_yield(); + *reply = s->reply; + if (reply->handle != request->handle) { + reply->error = EIO; + } else { + if (iov && reply->error == 0) { + ret = qemu_co_recvv(s->sock, iov, request->len, offset); + if (ret != request->len) { + reply->error = EIO; + } + } + + /* Tell the read handler to read another header. */ + s->reply.handle = 0; + } +} + +static void nbd_coroutine_end(BDRVNBDState *s, struct nbd_request *request) +{ + s->coroutine = NULL; + qemu_co_mutex_unlock(&s->mutex); +} + static int nbd_establish_connection(BlockDriverState *bs) { BDRVNBDState *s = bs->opaque; @@ -135,8 +228,11 @@ static int nbd_establish_connection(BlockDriverState *bs) return -errno; } - /* Now that we're connected, set the socket to be non-blocking */ + /* Now that we're connected, set the socket to be non-blocking and + * kick the reply mechanism. */ socket_set_nonblock(sock); + qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL, + nbd_have_request, NULL, s); s->sock = sock; s->size = size; @@ -152,11 +248,11 @@ static void nbd_teardown_connection(BlockDriverState *bs) struct nbd_request request; request.type = NBD_CMD_DISC; - request.handle = (uint64_t)(intptr_t)bs; request.from = 0; request.len = 0; nbd_send_request(s->sock, &request); + qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, NULL); closesocket(s->sock); } @@ -165,6 +261,8 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags) BDRVNBDState *s = bs->opaque; int result; + qemu_co_mutex_init(&s->mutex); + /* Pop the config into our state object. Exit if invalid. 
*/ result = nbd_config(s, filename, flags); if (result != 0) { @@ -176,90 +274,50 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags) */ result = nbd_establish_connection(bs); - qemu_co_mutex_init(&s->lock); return result; } -static int nbd_read(BlockDriverState *bs, int64_t sector_num, - uint8_t *buf, int nb_sectors) +static int nbd_co_readv(BlockDriverState *bs, int64_t sector_num, + int nb_sectors, QEMUIOVector *qiov) { BDRVNBDState *s = bs->opaque; struct nbd_request request; struct nbd_reply reply; request.type = NBD_CMD_READ; - request.handle = (uint64_t)(intptr_t)bs; request.from = sector_num * 512; request.len = nb_sectors * 512; - if (nbd_send_request(s->sock, &request) == -1) - return -errno; + nbd_coroutine_start(s, &request); + if (nbd_co_send_request(s, &request, NULL, 0) == -1) { + reply.error = errno; + } else { + nbd_co_receive_reply(s, &request, &reply, qiov->iov, 0); + } + nbd_coroutine_end(s, &request); + return -reply.error; - if (nbd_receive_reply(s->sock, &reply) == -1) - return -errno; - - if (reply.error !=0) - return -reply.error; - - if (reply.handle != request.handle) - return -EIO; - - if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len) - return -EIO; - - return 0; } -static int nbd_write(BlockDriverState *bs, int64_t sector_num, - const uint8_t *buf, int nb_sectors) +static int nbd_co_writev(BlockDriverState *bs, int64_t sector_num, + int nb_sectors, QEMUIOVector *qiov) { BDRVNBDState *s = bs->opaque; struct nbd_request request; struct nbd_reply reply; request.type = NBD_CMD_WRITE; - request.handle = (uint64_t)(intptr_t)bs; request.from = sector_num * 512; request.len = nb_sectors * 512; - if (nbd_send_request(s->sock, &request) == -1) - return -errno; - - if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len) - return -EIO; - - if (nbd_receive_reply(s->sock, &reply) == -1) - return -errno; - - if (reply.error !=0) - return -reply.error; - - if (reply.handle != request.handle) - 
return -EIO; - - return 0; -} - -static coroutine_fn int nbd_co_read(BlockDriverState *bs, int64_t sector_num, - uint8_t *buf, int nb_sectors) -{ - int ret; - BDRVNBDState *s = bs->opaque; - qemu_co_mutex_lock(&s->lock); - ret = nbd_read(bs, sector_num, buf, nb_sectors); - qemu_co_mutex_unlock(&s->lock); - return ret; -} - -static coroutine_fn int nbd_co_write(BlockDriverState *bs, int64_t sector_num, - const uint8_t *buf, int nb_sectors) -{ - int ret; - BDRVNBDState *s = bs->opaque; - qemu_co_mutex_lock(&s->lock); - ret = nbd_write(bs, sector_num, buf, nb_sectors); - qemu_co_mutex_unlock(&s->lock); - return ret; + nbd_coroutine_start(s, &request); + if (nbd_co_send_request(s, &request, qiov->iov, 0) == -1) { + reply.error = errno; + } else { + nbd_co_receive_reply(s, &request, &reply, NULL, 0); + } + nbd_coroutine_end(s, &request); + return -reply.error; } static void nbd_close(BlockDriverState *bs) @@ -282,8 +340,8 @@ static BlockDriver bdrv_nbd = { .format_name = "nbd", .instance_size = sizeof(BDRVNBDState), .bdrv_file_open = nbd_open, - .bdrv_read = nbd_co_read, - .bdrv_write = nbd_co_write, + .bdrv_co_readv = nbd_co_readv, + .bdrv_co_writev = nbd_co_writev, .bdrv_close = nbd_close, .bdrv_getlength = nbd_getlength, .protocol_name = "nbd", diff --git a/nbd.c b/nbd.c index de880fe3c6..ff701d3dc8 100644 --- a/nbd.c +++ b/nbd.c @@ -81,6 +81,14 @@ size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read) { size_t offset = 0; + if (qemu_in_coroutine()) { + if (do_read) { + return qemu_co_recv(fd, buffer, size); + } else { + return qemu_co_send(fd, buffer, size); + } + } + while (offset < size) { ssize_t len; From d9b09f13ca581aad92663dc90c2d57a775f3fb9c Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Sat, 10 Sep 2011 15:06:52 +0200 Subject: [PATCH 04/26] nbd: split requests qemu-nbd has a limit of slightly less than 1M per request. Work around this in the nbd block driver. 
Signed-off-by: Paolo Bonzini --- block/nbd.c | 52 ++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 46 insertions(+), 6 deletions(-) diff --git a/block/nbd.c b/block/nbd.c index bea7acd213..9d661c1a92 100644 --- a/block/nbd.c +++ b/block/nbd.c @@ -277,8 +277,9 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags) return result; } -static int nbd_co_readv(BlockDriverState *bs, int64_t sector_num, - int nb_sectors, QEMUIOVector *qiov) +static int nbd_co_readv_1(BlockDriverState *bs, int64_t sector_num, + int nb_sectors, QEMUIOVector *qiov, + int offset) { BDRVNBDState *s = bs->opaque; struct nbd_request request; @@ -292,15 +293,16 @@ static int nbd_co_readv(BlockDriverState *bs, int64_t sector_num, if (nbd_co_send_request(s, &request, NULL, 0) == -1) { reply.error = errno; } else { - nbd_co_receive_reply(s, &request, &reply, qiov->iov, 0); + nbd_co_receive_reply(s, &request, &reply, qiov->iov, offset); } nbd_coroutine_end(s, &request); return -reply.error; } -static int nbd_co_writev(BlockDriverState *bs, int64_t sector_num, - int nb_sectors, QEMUIOVector *qiov) +static int nbd_co_writev_1(BlockDriverState *bs, int64_t sector_num, + int nb_sectors, QEMUIOVector *qiov, + int offset) { BDRVNBDState *s = bs->opaque; struct nbd_request request; @@ -311,7 +313,7 @@ static int nbd_co_writev(BlockDriverState *bs, int64_t sector_num, request.len = nb_sectors * 512; nbd_coroutine_start(s, &request); - if (nbd_co_send_request(s, &request, qiov->iov, 0) == -1) { + if (nbd_co_send_request(s, &request, qiov->iov, offset) == -1) { reply.error = errno; } else { nbd_co_receive_reply(s, &request, &reply, NULL, 0); @@ -320,6 +322,44 @@ static int nbd_co_writev(BlockDriverState *bs, int64_t sector_num, return -reply.error; } +/* qemu-nbd has a limit of slightly less than 1M per request. Try to + * remain aligned to 4K. 
*/ +#define NBD_MAX_SECTORS 2040 + +static int nbd_co_readv(BlockDriverState *bs, int64_t sector_num, + int nb_sectors, QEMUIOVector *qiov) +{ + int offset = 0; + int ret; + while (nb_sectors > NBD_MAX_SECTORS) { + ret = nbd_co_readv_1(bs, sector_num, NBD_MAX_SECTORS, qiov, offset); + if (ret < 0) { + return ret; + } + offset += NBD_MAX_SECTORS * 512; + sector_num += NBD_MAX_SECTORS; + nb_sectors -= NBD_MAX_SECTORS; + } + return nbd_co_readv_1(bs, sector_num, nb_sectors, qiov, offset); +} + +static int nbd_co_writev(BlockDriverState *bs, int64_t sector_num, + int nb_sectors, QEMUIOVector *qiov) +{ + int offset = 0; + int ret; + while (nb_sectors > NBD_MAX_SECTORS) { + ret = nbd_co_writev_1(bs, sector_num, NBD_MAX_SECTORS, qiov, offset); + if (ret < 0) { + return ret; + } + offset += NBD_MAX_SECTORS * 512; + sector_num += NBD_MAX_SECTORS; + nb_sectors -= NBD_MAX_SECTORS; + } + return nbd_co_writev_1(bs, sector_num, nb_sectors, qiov, offset); +} + static void nbd_close(BlockDriverState *bs) { BDRVNBDState *s = bs->opaque; From ecda3447d18637c48469296c4ca8823f0a5c6717 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Sat, 10 Sep 2011 15:06:52 +0200 Subject: [PATCH 05/26] nbd: allow multiple in-flight requests Allow sending up to 16 requests, and drive the replies to the coroutine that did the request. The code is written to be exactly the same as before this patch when MAX_NBD_REQUESTS == 1 (modulo the extra mutex and state). Signed-off-by: Paolo Bonzini --- block/nbd.c | 69 +++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 56 insertions(+), 13 deletions(-) diff --git a/block/nbd.c b/block/nbd.c index 9d661c1a92..3f693e3ffa 100644 --- a/block/nbd.c +++ b/block/nbd.c @@ -46,6 +46,10 @@ #define logout(fmt, ...) 
((void)0) #endif +#define MAX_NBD_REQUESTS 16 +#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs)) +#define INDEX_TO_HANDLE(bs, index) ((index) ^ ((uint64_t)(intptr_t)bs)) + typedef struct BDRVNBDState { int sock; uint32_t nbdflags; @@ -53,9 +57,12 @@ typedef struct BDRVNBDState { size_t blocksize; char *export_name; /* An NBD server may export several devices */ - CoMutex mutex; - Coroutine *coroutine; + CoMutex send_mutex; + CoMutex free_sema; + Coroutine *send_coroutine; + int in_flight; + Coroutine *recv_coroutine[MAX_NBD_REQUESTS]; struct nbd_reply reply; /* If it begins with '/', this is a UNIX domain socket. Otherwise, @@ -112,41 +119,68 @@ out: static void nbd_coroutine_start(BDRVNBDState *s, struct nbd_request *request) { - qemu_co_mutex_lock(&s->mutex); - s->coroutine = qemu_coroutine_self(); - request->handle = (uint64_t)(intptr_t)s; + int i; + + /* Poor man semaphore. The free_sema is locked when no other request + * can be accepted, and unlocked after receiving one reply. */ + if (s->in_flight >= MAX_NBD_REQUESTS - 1) { + qemu_co_mutex_lock(&s->free_sema); + assert(s->in_flight < MAX_NBD_REQUESTS); + } + s->in_flight++; + + for (i = 0; i < MAX_NBD_REQUESTS; i++) { + if (s->recv_coroutine[i] == NULL) { + s->recv_coroutine[i] = qemu_coroutine_self(); + break; + } + } + + assert(i < MAX_NBD_REQUESTS); + request->handle = INDEX_TO_HANDLE(s, i); } static int nbd_have_request(void *opaque) { BDRVNBDState *s = opaque; - return !!s->coroutine; + return s->in_flight > 0; } static void nbd_reply_ready(void *opaque) { BDRVNBDState *s = opaque; + int i; if (s->reply.handle == 0) { /* No reply already in flight. Fetch a header. */ if (nbd_receive_reply(s->sock, &s->reply) < 0) { s->reply.handle = 0; + goto fail; } } /* There's no need for a mutex on the receive side, because the * handler acts as a synchronization point and ensures that only * one coroutine is called until the reply finishes. 
*/ - if (s->coroutine) { - qemu_coroutine_enter(s->coroutine, NULL); + i = HANDLE_TO_INDEX(s, s->reply.handle); + if (s->recv_coroutine[i]) { + qemu_coroutine_enter(s->recv_coroutine[i], NULL); + return; + } + +fail: + for (i = 0; i < MAX_NBD_REQUESTS; i++) { + if (s->recv_coroutine[i]) { + qemu_coroutine_enter(s->recv_coroutine[i], NULL); + } } } static void nbd_restart_write(void *opaque) { BDRVNBDState *s = opaque; - qemu_coroutine_enter(s->coroutine, NULL); + qemu_coroutine_enter(s->send_coroutine, NULL); } static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request, @@ -154,6 +188,8 @@ static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request, { int rc, ret; + qemu_co_mutex_lock(&s->send_mutex); + s->send_coroutine = qemu_coroutine_self(); qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, nbd_restart_write, nbd_have_request, NULL, s); rc = nbd_send_request(s->sock, request); @@ -166,6 +202,8 @@ static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request, } qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL, nbd_have_request, NULL, s); + s->send_coroutine = NULL; + qemu_co_mutex_unlock(&s->send_mutex); return rc; } @@ -175,7 +213,8 @@ static void nbd_co_receive_reply(BDRVNBDState *s, struct nbd_request *request, { int ret; - /* Wait until we're woken up by the read handler. */ + /* Wait until we're woken up by the read handler. TODO: perhaps + * peek at the next reply and avoid yielding if it's ours? 
*/ qemu_coroutine_yield(); *reply = s->reply; if (reply->handle != request->handle) { @@ -195,8 +234,11 @@ static void nbd_co_receive_reply(BDRVNBDState *s, struct nbd_request *request, static void nbd_coroutine_end(BDRVNBDState *s, struct nbd_request *request) { - s->coroutine = NULL; - qemu_co_mutex_unlock(&s->mutex); + int i = HANDLE_TO_INDEX(s, request->handle); + s->recv_coroutine[i] = NULL; + if (s->in_flight-- == MAX_NBD_REQUESTS) { + qemu_co_mutex_unlock(&s->free_sema); + } } static int nbd_establish_connection(BlockDriverState *bs) @@ -261,7 +303,8 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags) BDRVNBDState *s = bs->opaque; int result; - qemu_co_mutex_init(&s->mutex); + qemu_co_mutex_init(&s->send_mutex); + qemu_co_mutex_init(&s->free_sema); /* Pop the config into our state object. Exit if invalid. */ result = nbd_config(s, filename, flags); From adcf6302de40e50a8010e7f2c79b3dac2eea6e0c Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Tue, 13 Sep 2011 17:27:45 +0200 Subject: [PATCH 06/26] nbd: fix error handling in the server bdrv_read and bdrv_write return negative errno values, not -1. 
Signed-off-by: Paolo Bonzini --- nbd.c | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/nbd.c b/nbd.c index ff701d3dc8..5b718b5973 100644 --- a/nbd.c +++ b/nbd.c @@ -595,6 +595,7 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, uint64_t dev_offset, { struct nbd_request request; struct nbd_reply reply; + int ret; TRACE("Reading request."); @@ -633,12 +634,13 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, uint64_t dev_offset, case NBD_CMD_READ: TRACE("Request type is READ"); - if (bdrv_read(bs, (request.from + dev_offset) / 512, - data + NBD_REPLY_SIZE, - request.len / 512) == -1) { + ret = bdrv_read(bs, (request.from + dev_offset) / 512, + data + NBD_REPLY_SIZE, + request.len / 512); + if (ret < 0) { LOG("reading from file failed"); - errno = EINVAL; - return -1; + reply.error = -ret; + request.len = 0; } *offset += request.len; @@ -681,11 +683,12 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, uint64_t dev_offset, } else { TRACE("Writing to device"); - if (bdrv_write(bs, (request.from + dev_offset) / 512, - data, request.len / 512) == -1) { + ret = bdrv_write(bs, (request.from + dev_offset) / 512, + data, request.len / 512); + if (ret < 0) { LOG("writing to file failed"); - errno = EINVAL; - return -1; + reply.error = -ret; + request.len = 0; } *offset += request.len; From 2c7989a9b13a45963548cf4d89535fe90d809cac Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Fri, 21 Oct 2011 13:16:28 +0200 Subject: [PATCH 07/26] nbd: add support for NBD_CMD_FLAG_FUA Signed-off-by: Paolo Bonzini --- block/nbd.c | 4 ++++ nbd.c | 13 +++++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/block/nbd.c b/block/nbd.c index 3f693e3ffa..2f483cddde 100644 --- a/block/nbd.c +++ b/block/nbd.c @@ -352,6 +352,10 @@ static int nbd_co_writev_1(BlockDriverState *bs, int64_t sector_num, struct nbd_reply reply; request.type = NBD_CMD_WRITE; + if (!bdrv_enable_write_cache(bs) && (s->nbdflags & 
NBD_FLAG_SEND_FUA)) { + request.type |= NBD_CMD_FLAG_FUA; + } + request.from = sector_num * 512; request.len = nb_sectors * 512; diff --git a/nbd.c b/nbd.c index 5b718b5973..c597d4733f 100644 --- a/nbd.c +++ b/nbd.c @@ -202,7 +202,8 @@ int nbd_negotiate(int csock, off_t size, uint32_t flags) memcpy(buf, "NBDMAGIC", 8); cpu_to_be64w((uint64_t*)(buf + 8), 0x00420281861253LL); cpu_to_be64w((uint64_t*)(buf + 16), size); - cpu_to_be32w((uint32_t*)(buf + 24), flags | NBD_FLAG_HAS_FLAGS); + cpu_to_be32w((uint32_t*)(buf + 24), + flags | NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_FUA); memset(buf + 28, 0, 124); if (write_sync(csock, buf, sizeof(buf)) != sizeof(buf)) { @@ -630,7 +631,7 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, uint64_t dev_offset, reply.handle = request.handle; reply.error = 0; - switch (request.type) { + switch (request.type & NBD_CMD_MASK_COMMAND) { case NBD_CMD_READ: TRACE("Request type is READ"); @@ -692,6 +693,14 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, uint64_t dev_offset, } *offset += request.len; + + if (request.type & NBD_CMD_FLAG_FUA) { + ret = bdrv_flush(bs); + if (ret < 0) { + LOG("flush failed"); + reply.error = -ret; + } + } } if (nbd_send_reply(csock, &reply) == -1) From 1486d04a1b41093e5f97f20ca7397e01df055f49 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Fri, 21 Oct 2011 13:17:14 +0200 Subject: [PATCH 08/26] nbd: add support for NBD_CMD_FLUSH Signed-off-by: Paolo Bonzini --- block/nbd.c | 45 +++++++++++++++++++++++++++++++++++++-------- nbd.c | 15 ++++++++++++++- 2 files changed, 51 insertions(+), 9 deletions(-) diff --git a/block/nbd.c b/block/nbd.c index 2f483cddde..097b4188e5 100644 --- a/block/nbd.c +++ b/block/nbd.c @@ -407,6 +407,34 @@ static int nbd_co_writev(BlockDriverState *bs, int64_t sector_num, return nbd_co_writev_1(bs, sector_num, nb_sectors, qiov, offset); } +static int nbd_co_flush(BlockDriverState *bs) +{ + BDRVNBDState *s = bs->opaque; + struct nbd_request request; + struct nbd_reply 
reply; + + if (!(s->nbdflags & NBD_FLAG_SEND_FLUSH)) { + return 0; + } + + request.type = NBD_CMD_FLUSH; + if (s->nbdflags & NBD_FLAG_SEND_FUA) { + request.type |= NBD_CMD_FLAG_FUA; + } + + request.from = 0; + request.len = 0; + + nbd_coroutine_start(s, &request); + if (nbd_co_send_request(s, &request, NULL, 0) == -1) { + reply.error = errno; + } else { + nbd_co_receive_reply(s, &request, &reply, NULL, 0); + } + nbd_coroutine_end(s, &request); + return -reply.error; +} + static void nbd_close(BlockDriverState *bs) { BDRVNBDState *s = bs->opaque; @@ -424,14 +452,15 @@ static int64_t nbd_getlength(BlockDriverState *bs) } static BlockDriver bdrv_nbd = { - .format_name = "nbd", - .instance_size = sizeof(BDRVNBDState), - .bdrv_file_open = nbd_open, - .bdrv_co_readv = nbd_co_readv, - .bdrv_co_writev = nbd_co_writev, - .bdrv_close = nbd_close, - .bdrv_getlength = nbd_getlength, - .protocol_name = "nbd", + .format_name = "nbd", + .instance_size = sizeof(BDRVNBDState), + .bdrv_file_open = nbd_open, + .bdrv_co_readv = nbd_co_readv, + .bdrv_co_writev = nbd_co_writev, + .bdrv_close = nbd_close, + .bdrv_co_flush_to_os = nbd_co_flush, + .bdrv_getlength = nbd_getlength, + .protocol_name = "nbd", }; static void bdrv_nbd_init(void) diff --git a/nbd.c b/nbd.c index c597d4733f..4fd0f4ed28 100644 --- a/nbd.c +++ b/nbd.c @@ -203,7 +203,8 @@ int nbd_negotiate(int csock, off_t size, uint32_t flags) cpu_to_be64w((uint64_t*)(buf + 8), 0x00420281861253LL); cpu_to_be64w((uint64_t*)(buf + 16), size); cpu_to_be32w((uint32_t*)(buf + 24), - flags | NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_FUA); + flags | NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_FLUSH | + NBD_FLAG_SEND_FUA); memset(buf + 28, 0, 124); if (write_sync(csock, buf, sizeof(buf)) != sizeof(buf)) { @@ -710,6 +711,18 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, uint64_t dev_offset, TRACE("Request type is DISCONNECT"); errno = 0; return 1; + case NBD_CMD_FLUSH: + TRACE("Request type is FLUSH"); + + ret = bdrv_flush(bs); + if (ret < 0) 
{ + LOG("flush failed"); + reply.error = -ret; + } + + if (nbd_send_reply(csock, &reply) == -1) + return -1; + break; default: LOG("invalid request type (%u) received", request.type); errno = EINVAL; From 7a706633e9f77a15d12bcfdef5d80f09892fa70c Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Fri, 21 Oct 2011 13:17:14 +0200 Subject: [PATCH 09/26] nbd: add support for NBD_CMD_TRIM Signed-off-by: Paolo Bonzini --- block/nbd.c | 25 +++++++++++++++++++++++++ nbd.c | 15 +++++++++++++-- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/block/nbd.c b/block/nbd.c index 097b4188e5..161b299855 100644 --- a/block/nbd.c +++ b/block/nbd.c @@ -435,6 +435,30 @@ static int nbd_co_flush(BlockDriverState *bs) return -reply.error; } +static int nbd_co_discard(BlockDriverState *bs, int64_t sector_num, + int nb_sectors) +{ + BDRVNBDState *s = bs->opaque; + struct nbd_request request; + struct nbd_reply reply; + + if (!(s->nbdflags & NBD_FLAG_SEND_TRIM)) { + return 0; + } + request.type = NBD_CMD_TRIM; + request.from = sector_num * 512;; + request.len = nb_sectors * 512; + + nbd_coroutine_start(s, &request); + if (nbd_co_send_request(s, &request, NULL, 0) == -1) { + reply.error = errno; + } else { + nbd_co_receive_reply(s, &request, &reply, NULL, 0); + } + nbd_coroutine_end(s, &request); + return -reply.error; +} + static void nbd_close(BlockDriverState *bs) { BDRVNBDState *s = bs->opaque; @@ -459,6 +483,7 @@ static BlockDriver bdrv_nbd = { .bdrv_co_writev = nbd_co_writev, .bdrv_close = nbd_close, .bdrv_co_flush_to_os = nbd_co_flush, + .bdrv_co_discard = nbd_co_discard, .bdrv_getlength = nbd_getlength, .protocol_name = "nbd", }; diff --git a/nbd.c b/nbd.c index 4fd0f4ed28..7ab1b1f5c0 100644 --- a/nbd.c +++ b/nbd.c @@ -203,8 +203,8 @@ int nbd_negotiate(int csock, off_t size, uint32_t flags) cpu_to_be64w((uint64_t*)(buf + 8), 0x00420281861253LL); cpu_to_be64w((uint64_t*)(buf + 16), size); cpu_to_be32w((uint32_t*)(buf + 24), - flags | NBD_FLAG_HAS_FLAGS | 
NBD_FLAG_SEND_FLUSH | - NBD_FLAG_SEND_FUA); + flags | NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM | + NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA); memset(buf + 28, 0, 124); if (write_sync(csock, buf, sizeof(buf)) != sizeof(buf)) { @@ -720,6 +720,17 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, uint64_t dev_offset, reply.error = -ret; } + if (nbd_send_reply(csock, &reply) == -1) + return -1; + break; + case NBD_CMD_TRIM: + TRACE("Request type is TRIM"); + ret = bdrv_discard(bs, (request.from + dev_offset) / 512, + request.len / 512); + if (ret < 0) { + LOG("discard failed"); + reply.error = -ret; + } if (nbd_send_reply(csock, &reply) == -1) return -1; break; From 3e05c785516efa6911504b1ddf936d2386c2e0b6 Mon Sep 17 00:00:00 2001 From: Chunyan Liu Date: Fri, 2 Dec 2011 23:27:54 +0800 Subject: [PATCH 10/26] Update ioctl order in nbd_init() to detect EBUSY Update ioctl(s) in nbd_init() to detect device busy early. Current nbd_init() issues NBD_CLEAR_SOCK before NBD_SET_SOCK, if issuing "qemu-nbd -c /dev/nbd0 disk.img" twice, the second time won't detect EBUSY in nbd_init(), but in nbd_client will report EBUSY and do clear socket (the 1st time command will be affected too because of no socket any more.) No change to previous version. 
Signed-off-by: Chunyan Liu Signed-off-by: Paolo Bonzini --- nbd.c | 27 +++++++++------------------ 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/nbd.c b/nbd.c index 7ab1b1f5c0..73fedeb9ad 100644 --- a/nbd.c +++ b/nbd.c @@ -358,6 +358,15 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags, #ifdef __linux__ int nbd_init(int fd, int csock, uint32_t flags, off_t size, size_t blocksize) { + TRACE("Setting NBD socket"); + + if (ioctl(fd, NBD_SET_SOCK, csock) == -1) { + int serrno = errno; + LOG("Failed to set NBD socket"); + errno = serrno; + return -1; + } + TRACE("Setting block size to %lu", (unsigned long)blocksize); if (ioctl(fd, NBD_SET_BLKSIZE, blocksize) == -1) { @@ -396,24 +405,6 @@ int nbd_init(int fd, int csock, uint32_t flags, off_t size, size_t blocksize) return -1; } - TRACE("Clearing NBD socket"); - - if (ioctl(fd, NBD_CLEAR_SOCK) == -1) { - int serrno = errno; - LOG("Failed clearing NBD socket"); - errno = serrno; - return -1; - } - - TRACE("Setting NBD socket"); - - if (ioctl(fd, NBD_SET_SOCK, csock) == -1) { - int serrno = errno; - LOG("Failed to set NBD socket"); - errno = serrno; - return -1; - } - TRACE("Negotiation ended"); return 0; From 94607e7a775ae7b57219e2078b00ed2930ab98de Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 19 Sep 2011 13:50:54 +0200 Subject: [PATCH 11/26] qemu-nbd: remove offset argument to nbd_trip The argument is write-only. 
Signed-off-by: Paolo Bonzini --- nbd.c | 8 +++----- nbd.h | 2 +- qemu-nbd.c | 3 +-- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/nbd.c b/nbd.c index 73fedeb9ad..1df2b91bf2 100644 --- a/nbd.c +++ b/nbd.c @@ -583,8 +583,9 @@ static int nbd_send_reply(int csock, struct nbd_reply *reply) return 0; } -int nbd_trip(BlockDriverState *bs, int csock, off_t size, uint64_t dev_offset, - off_t *offset, uint32_t nbdflags, uint8_t *data, int data_size) +int nbd_trip(BlockDriverState *bs, int csock, off_t size, + uint64_t dev_offset, uint32_t nbdflags, + uint8_t *data, int data_size) { struct nbd_request request; struct nbd_reply reply; @@ -635,7 +636,6 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, uint64_t dev_offset, reply.error = -ret; request.len = 0; } - *offset += request.len; TRACE("Read %u byte(s)", request.len); @@ -684,8 +684,6 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, uint64_t dev_offset, request.len = 0; } - *offset += request.len; - if (request.type & NBD_CMD_FLAG_FUA) { ret = bdrv_flush(bs); if (ret < 0) { diff --git a/nbd.h b/nbd.h index 61553f4128..ebdb2dbf41 100644 --- a/nbd.h +++ b/nbd.h @@ -72,7 +72,7 @@ int nbd_init(int fd, int csock, uint32_t flags, off_t size, size_t blocksize); int nbd_send_request(int csock, struct nbd_request *request); int nbd_receive_reply(int csock, struct nbd_reply *reply); int nbd_trip(BlockDriverState *bs, int csock, off_t size, uint64_t dev_offset, - off_t *offset, uint32_t nbdflags, uint8_t *data, int data_size); + uint32_t nbdflags, uint8_t *data, int data_size); int nbd_client(int fd); int nbd_disconnect(int fd); diff --git a/qemu-nbd.c b/qemu-nbd.c index 291cba2eaa..f9ee9c58d1 100644 --- a/qemu-nbd.c +++ b/qemu-nbd.c @@ -248,7 +248,6 @@ int main(int argc, char **argv) { BlockDriverState *bs; off_t dev_offset = 0; - off_t offset = 0; uint32_t nbdflags = 0; bool disconnect = false; const char *bindto = "0.0.0.0"; @@ -542,7 +541,7 @@ int main(int argc, char **argv) for (i = 1; i < 
nb_fds && ret; i++) { if (FD_ISSET(sharing_fds[i], &fds)) { if (nbd_trip(bs, sharing_fds[i], fd_size, dev_offset, - &offset, nbdflags, data, NBD_BUFFER_SIZE) != 0) { + nbdflags, data, NBD_BUFFER_SIZE) != 0) { close(sharing_fds[i]); nb_fds--; sharing_fds[i] = sharing_fds[nb_fds]; From 3777b09fd75ba746cf65d0ba30d73a5dd7a02d65 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Fri, 7 Oct 2011 14:35:58 +0200 Subject: [PATCH 12/26] qemu-nbd: remove data_size argument to nbd_trip The size of the buffer is in practice part of the protocol. Signed-off-by: Paolo Bonzini --- nbd.c | 6 +++--- nbd.h | 4 +++- qemu-nbd.c | 4 +--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/nbd.c b/nbd.c index 1df2b91bf2..d8cc331a0b 100644 --- a/nbd.c +++ b/nbd.c @@ -585,7 +585,7 @@ static int nbd_send_reply(int csock, struct nbd_reply *reply) int nbd_trip(BlockDriverState *bs, int csock, off_t size, uint64_t dev_offset, uint32_t nbdflags, - uint8_t *data, int data_size) + uint8_t *data) { struct nbd_request request; struct nbd_reply reply; @@ -596,9 +596,9 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, if (nbd_receive_request(csock, &request) == -1) return -1; - if (request.len + NBD_REPLY_SIZE > data_size) { + if (request.len + NBD_REPLY_SIZE > NBD_BUFFER_SIZE) { LOG("len (%u) is larger than max len (%u)", - request.len + NBD_REPLY_SIZE, data_size); + request.len + NBD_REPLY_SIZE, NBD_BUFFER_SIZE); errno = EINVAL; return -1; } diff --git a/nbd.h b/nbd.h index ebdb2dbf41..dbc4c0d627 100644 --- a/nbd.h +++ b/nbd.h @@ -57,6 +57,8 @@ enum { #define NBD_DEFAULT_PORT 10809 +#define NBD_BUFFER_SIZE (1024*1024) + size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read); int tcp_socket_outgoing(const char *address, uint16_t port); int tcp_socket_incoming(const char *address, uint16_t port); @@ -72,7 +74,7 @@ int nbd_init(int fd, int csock, uint32_t flags, off_t size, size_t blocksize); int nbd_send_request(int csock, struct nbd_request *request); int 
nbd_receive_reply(int csock, struct nbd_reply *reply); int nbd_trip(BlockDriverState *bs, int csock, off_t size, uint64_t dev_offset, - uint32_t nbdflags, uint8_t *data, int data_size); + uint32_t nbdflags, uint8_t *data); int nbd_client(int fd); int nbd_disconnect(int fd); diff --git a/qemu-nbd.c b/qemu-nbd.c index f9ee9c58d1..d662268c81 100644 --- a/qemu-nbd.c +++ b/qemu-nbd.c @@ -35,8 +35,6 @@ #define SOCKET_PATH "/var/lock/qemu-nbd-%s" -#define NBD_BUFFER_SIZE (1024*1024) - static int sigterm_wfd; static int verbose; static char *device; @@ -541,7 +539,7 @@ int main(int argc, char **argv) for (i = 1; i < nb_fds && ret; i++) { if (FD_ISSET(sharing_fds[i], &fds)) { if (nbd_trip(bs, sharing_fds[i], fd_size, dev_offset, - nbdflags, data, NBD_BUFFER_SIZE) != 0) { + nbdflags, data) != 0) { close(sharing_fds[i]); nb_fds--; sharing_fds[i] = sharing_fds[nb_fds]; From 128aa58947637b0989330c2e6a22a824d39e2193 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Wed, 21 Sep 2011 12:36:48 +0200 Subject: [PATCH 13/26] move corking functions to osdep.c Signed-off-by: Paolo Bonzini --- block/sheepdog.c | 20 ++------------------ osdep.c | 9 +++++++++ qemu_socket.h | 1 + 3 files changed, 12 insertions(+), 18 deletions(-) diff --git a/block/sheepdog.c b/block/sheepdog.c index 00ea5a0acc..17a79beb24 100644 --- a/block/sheepdog.c +++ b/block/sheepdog.c @@ -702,22 +702,6 @@ static int aio_flush_request(void *opaque) return !QLIST_EMPTY(&s->outstanding_aio_head); } -#if !defined(SOL_TCP) || !defined(TCP_CORK) - -static int set_cork(int fd, int v) -{ - return 0; -} - -#else - -static int set_cork(int fd, int v) -{ - return setsockopt(fd, SOL_TCP, TCP_CORK, &v, sizeof(v)); -} - -#endif - static int set_nodelay(int fd) { int ret, opt; @@ -923,7 +907,7 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, s->co_send = qemu_coroutine_self(); qemu_aio_set_fd_handler(s->fd, co_read_response, co_write_request, aio_flush_request, NULL, s); - set_cork(s->fd, 1); + 
socket_set_cork(s->fd, 1); /* send a header */ ret = qemu_co_send(s->fd, &hdr, sizeof(hdr)); @@ -942,7 +926,7 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, } } - set_cork(s->fd, 0); + socket_set_cork(s->fd, 0); qemu_aio_set_fd_handler(s->fd, co_read_response, NULL, aio_flush_request, NULL, s); qemu_co_mutex_unlock(&s->lock); diff --git a/osdep.c b/osdep.c index 70bad27f6e..3e6badac1e 100644 --- a/osdep.c +++ b/osdep.c @@ -48,6 +48,15 @@ extern int madvise(caddr_t, size_t, int); #include "trace.h" #include "qemu_socket.h" +int socket_set_cork(int fd, int v) +{ +#if defined(SOL_TCP) && defined(TCP_CORK) + return setsockopt(fd, SOL_TCP, TCP_CORK, &v, sizeof(v)); +#else + return 0; +#endif +} + int qemu_madvise(void *addr, size_t len, int advice) { if (advice == QEMU_MADV_INVALID) { diff --git a/qemu_socket.h b/qemu_socket.h index 9e32fac651..fe4cf6ca61 100644 --- a/qemu_socket.h +++ b/qemu_socket.h @@ -35,6 +35,7 @@ int inet_aton(const char *cp, struct in_addr *ia); /* misc helpers */ int qemu_socket(int domain, int type, int protocol); int qemu_accept(int s, struct sockaddr *addr, socklen_t *addrlen); +int socket_set_cork(int fd, int v); void socket_set_block(int fd); void socket_set_nonblock(int fd); int send_all(int fd, const void *buf, int len1); From a478f6e595dab1801931d56d097623d65f5b6d1d Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 19 Sep 2011 13:48:43 +0200 Subject: [PATCH 14/26] qemu-nbd: simplify nbd_trip Use TCP_CORK to remove a violation of encapsulation, that would later require nbd_trip to know too much about an NBD reply. We could also switch to sendmsg (qemu_co_sendv) later, it is even easier once coroutines are in. 
Signed-off-by: Paolo Bonzini --- nbd.c | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/nbd.c b/nbd.c index d8cc331a0b..d383d854fe 100644 --- a/nbd.c +++ b/nbd.c @@ -596,9 +596,9 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, if (nbd_receive_request(csock, &request) == -1) return -1; - if (request.len + NBD_REPLY_SIZE > NBD_BUFFER_SIZE) { + if (request.len > NBD_BUFFER_SIZE) { LOG("len (%u) is larger than max len (%u)", - request.len + NBD_REPLY_SIZE, NBD_BUFFER_SIZE); + request.len, NBD_BUFFER_SIZE); errno = EINVAL; return -1; } @@ -629,8 +629,7 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, TRACE("Request type is READ"); ret = bdrv_read(bs, (request.from + dev_offset) / 512, - data + NBD_REPLY_SIZE, - request.len / 512); + data, request.len / 512); if (ret < 0) { LOG("reading from file failed"); reply.error = -ret; @@ -638,26 +637,18 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, } TRACE("Read %u byte(s)", request.len); - - /* Reply - [ 0 .. 3] magic (NBD_REPLY_MAGIC) - [ 4 .. 7] error (0 == no error) - [ 7 .. 15] handle - */ - - cpu_to_be32w((uint32_t*)data, NBD_REPLY_MAGIC); - cpu_to_be32w((uint32_t*)(data + 4), reply.error); - cpu_to_be64w((uint64_t*)(data + 8), reply.handle); + socket_set_cork(csock, 1); + if (nbd_send_reply(csock, &reply) == -1) + return -1; TRACE("Sending data to client"); - if (write_sync(csock, data, - request.len + NBD_REPLY_SIZE) != - request.len + NBD_REPLY_SIZE) { + if (write_sync(csock, data, request.len) != request.len) { LOG("writing to socket failed"); errno = EINVAL; return -1; } + socket_set_cork(csock, 0); break; case NBD_CMD_WRITE: TRACE("Request type is WRITE"); From 220455920384af77670e84953f3763153e39bcfa Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 19 Sep 2011 14:25:30 +0200 Subject: [PATCH 15/26] qemu-nbd: introduce nbd_do_send_reply Group the sending of a reply and the associated data into a new function. 
Without corking, the caller would be forced to leave 12 free bytes at the beginning of the data pointer. Not too ugly, but still ugly. :) Using nbd_do_send_reply everywhere will help when the routine will set up the write handler that re-enters the send coroutine. Signed-off-by: Paolo Bonzini --- nbd.c | 46 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/nbd.c b/nbd.c index d383d854fe..025c5b0f1d 100644 --- a/nbd.c +++ b/nbd.c @@ -583,6 +583,34 @@ static int nbd_send_reply(int csock, struct nbd_reply *reply) return 0; } +static int nbd_do_send_reply(int csock, struct nbd_reply *reply, + uint8_t *data, int len) +{ + int rc, ret; + + if (!len) { + rc = nbd_send_reply(csock, reply); + if (rc == -1) { + rc = -errno; + } + } else { + socket_set_cork(csock, 1); + rc = nbd_send_reply(csock, reply); + if (rc != -1) { + ret = write_sync(csock, data, len); + if (ret != len) { + errno = EIO; + rc = -1; + } + } + if (rc == -1) { + rc = -errno; + } + socket_set_cork(csock, 0); + } + return rc; +} + int nbd_trip(BlockDriverState *bs, int csock, off_t size, uint64_t dev_offset, uint32_t nbdflags, uint8_t *data) @@ -637,18 +665,8 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, } TRACE("Read %u byte(s)", request.len); - socket_set_cork(csock, 1); - if (nbd_send_reply(csock, &reply) == -1) + if (nbd_do_send_reply(csock, &reply, data, request.len) < 0) return -1; - - TRACE("Sending data to client"); - - if (write_sync(csock, data, request.len) != request.len) { - LOG("writing to socket failed"); - errno = EINVAL; - return -1; - } - socket_set_cork(csock, 0); break; case NBD_CMD_WRITE: TRACE("Request type is WRITE"); @@ -684,7 +702,7 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, } } - if (nbd_send_reply(csock, &reply) == -1) + if (nbd_do_send_reply(csock, &reply, NULL, 0) < 0) return -1; break; case NBD_CMD_DISC: @@ -700,7 +718,7 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, 
reply.error = -ret; } - if (nbd_send_reply(csock, &reply) == -1) + if (nbd_do_send_reply(csock, &reply, NULL, 0) < 0) return -1; break; case NBD_CMD_TRIM: @@ -711,7 +729,7 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, LOG("discard failed"); reply.error = -ret; } - if (nbd_send_reply(csock, &reply) == -1) + if (nbd_do_send_reply(csock, &reply, NULL, 0) < 0) return -1; break; default: From fae69416294223d84db425e58e7bbf6a08c45801 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 19 Sep 2011 16:04:36 +0200 Subject: [PATCH 16/26] qemu-nbd: more robust handling of invalid requests Fail invalid requests with EINVAL instead of dropping them into the void. Signed-off-by: Paolo Bonzini --- nbd.c | 57 ++++++++++++++++++++++++++++++--------------------------- 1 file changed, 30 insertions(+), 27 deletions(-) diff --git a/nbd.c b/nbd.c index 025c5b0f1d..053ad8d00d 100644 --- a/nbd.c +++ b/nbd.c @@ -624,18 +624,19 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, if (nbd_receive_request(csock, &request) == -1) return -1; + reply.handle = request.handle; + reply.error = 0; + if (request.len > NBD_BUFFER_SIZE) { LOG("len (%u) is larger than max len (%u)", request.len, NBD_BUFFER_SIZE); - errno = EINVAL; - return -1; + goto invalid_request; } if ((request.from + request.len) < request.from) { LOG("integer overflow detected! 
" "you're probably being attacked"); - errno = EINVAL; - return -1; + goto invalid_request; } if ((request.from + request.len) > size) { @@ -643,15 +644,11 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, ", Offset: %" PRIu64 "\n", request.from, request.len, (uint64_t)size, dev_offset); LOG("requested operation past EOF--bad client?"); - errno = EINVAL; - return -1; + goto invalid_request; } TRACE("Decoding type"); - reply.handle = request.handle; - reply.error = 0; - switch (request.type & NBD_CMD_MASK_COMMAND) { case NBD_CMD_READ: TRACE("Request type is READ"); @@ -661,7 +658,7 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, if (ret < 0) { LOG("reading from file failed"); reply.error = -ret; - request.len = 0; + goto error_reply; } TRACE("Read %u byte(s)", request.len); @@ -681,24 +678,26 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, if (nbdflags & NBD_FLAG_READ_ONLY) { TRACE("Server is read-only, return error"); - reply.error = 1; - } else { - TRACE("Writing to device"); + reply.error = EROFS; + goto error_reply; + } - ret = bdrv_write(bs, (request.from + dev_offset) / 512, - data, request.len / 512); + TRACE("Writing to device"); + + ret = bdrv_write(bs, (request.from + dev_offset) / 512, + data, request.len / 512); + if (ret < 0) { + LOG("writing to file failed"); + reply.error = -ret; + goto error_reply; + } + + if (request.type & NBD_CMD_FLAG_FUA) { + ret = bdrv_flush(bs); if (ret < 0) { - LOG("writing to file failed"); + LOG("flush failed"); reply.error = -ret; - request.len = 0; - } - - if (request.type & NBD_CMD_FLAG_FUA) { - ret = bdrv_flush(bs); - if (ret < 0) { - LOG("flush failed"); - reply.error = -ret; - } + goto error_reply; } } @@ -734,8 +733,12 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, break; default: LOG("invalid request type (%u) received", request.type); - errno = EINVAL; - return -1; + invalid_request: + reply.error = -EINVAL; + error_reply: + if (nbd_do_send_reply(csock, &reply, 
NULL, 0) == -1) + return -1; + break; } TRACE("Request/Reply complete"); From a030b347aaa84e2d1b42c9cb2bc9e2d8bec50046 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 19 Sep 2011 15:07:54 +0200 Subject: [PATCH 17/26] qemu-nbd: introduce nbd_do_receive_request Group the receiving of a request and the associated data into a new function. Signed-off-by: Paolo Bonzini --- nbd.c | 68 +++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 47 insertions(+), 21 deletions(-) diff --git a/nbd.c b/nbd.c index 053ad8d00d..964a732d5c 100644 --- a/nbd.c +++ b/nbd.c @@ -611,6 +611,47 @@ static int nbd_do_send_reply(int csock, struct nbd_reply *reply, return rc; } +static int nbd_do_receive_request(int csock, struct nbd_request *request, + uint8_t *data) +{ + int rc; + + if (nbd_receive_request(csock, request) == -1) { + rc = -EIO; + goto out; + } + + if (request->len > NBD_BUFFER_SIZE) { + LOG("len (%u) is larger than max len (%u)", + request->len, NBD_BUFFER_SIZE); + rc = -EINVAL; + goto out; + } + + if ((request->from + request->len) < request->from) { + LOG("integer overflow detected! 
" + "you're probably being attacked"); + rc = -EINVAL; + goto out; + } + + TRACE("Decoding type"); + + if ((request->type & NBD_CMD_MASK_COMMAND) == NBD_CMD_WRITE) { + TRACE("Reading %u byte(s)", request->len); + + if (read_sync(csock, data, request->len) != request->len) { + LOG("reading from socket failed"); + rc = -EIO; + goto out; + } + } + rc = 0; + +out: + return rc; +} + int nbd_trip(BlockDriverState *bs, int csock, off_t size, uint64_t dev_offset, uint32_t nbdflags, uint8_t *data) @@ -621,22 +662,17 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, TRACE("Reading request."); - if (nbd_receive_request(csock, &request) == -1) + ret = nbd_do_receive_request(csock, &request, data); + if (ret == -EIO) { return -1; + } reply.handle = request.handle; reply.error = 0; - if (request.len > NBD_BUFFER_SIZE) { - LOG("len (%u) is larger than max len (%u)", - request.len, NBD_BUFFER_SIZE); - goto invalid_request; - } - - if ((request.from + request.len) < request.from) { - LOG("integer overflow detected! 
" - "you're probably being attacked"); - goto invalid_request; + if (ret < 0) { + reply.error = -ret; + goto error_reply; } if ((request.from + request.len) > size) { @@ -647,8 +683,6 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, goto invalid_request; } - TRACE("Decoding type"); - switch (request.type & NBD_CMD_MASK_COMMAND) { case NBD_CMD_READ: TRACE("Request type is READ"); @@ -668,14 +702,6 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, case NBD_CMD_WRITE: TRACE("Request type is WRITE"); - TRACE("Reading %u byte(s)", request.len); - - if (read_sync(csock, data, request.len) != request.len) { - LOG("reading from socket failed"); - errno = EINVAL; - return -1; - } - if (nbdflags & NBD_FLAG_READ_ONLY) { TRACE("Server is read-only, return error"); reply.error = EROFS; From af49bbbe78b16673cea13fea7f4d7d3433bf2646 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 19 Sep 2011 14:03:37 +0200 Subject: [PATCH 18/26] qemu-nbd: introduce NBDExport Wrap the common parameters of nbd_trip and nbd_negotiate in a single opaque struct. 
Signed-off-by: Paolo Bonzini --- nbd.c | 64 ++++++++++++++++++++++++++++++++++++++++-------------- nbd.h | 11 +++++++--- qemu-nbd.c | 15 +++++-------- 3 files changed, 61 insertions(+), 29 deletions(-) diff --git a/nbd.c b/nbd.c index 964a732d5c..8d2a3bc113 100644 --- a/nbd.c +++ b/nbd.c @@ -18,6 +18,7 @@ #include "nbd.h" #include "block.h" +#include "block_int.h" #include #include @@ -186,7 +187,7 @@ int unix_socket_outgoing(const char *path) Request (type == 2) */ -int nbd_negotiate(int csock, off_t size, uint32_t flags) +static int nbd_send_negotiate(int csock, off_t size, uint32_t flags) { char buf[8 + 8 + 8 + 128]; @@ -583,6 +584,33 @@ static int nbd_send_reply(int csock, struct nbd_reply *reply) return 0; } +struct NBDExport { + BlockDriverState *bs; + off_t dev_offset; + off_t size; + uint8_t *data; + uint32_t nbdflags; +}; + +NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset, + off_t size, uint32_t nbdflags) +{ + NBDExport *exp = g_malloc0(sizeof(NBDExport)); + exp->bs = bs; + exp->dev_offset = dev_offset; + exp->nbdflags = nbdflags; + exp->size = size == -1 ? 
exp->bs->total_sectors * 512 : size; + exp->data = qemu_blockalign(exp->bs, NBD_BUFFER_SIZE); + return exp; +} + +void nbd_export_close(NBDExport *exp) +{ + qemu_vfree(exp->data); + bdrv_close(exp->bs); + g_free(exp); +} + static int nbd_do_send_reply(int csock, struct nbd_reply *reply, uint8_t *data, int len) { @@ -652,9 +680,7 @@ out: return rc; } -int nbd_trip(BlockDriverState *bs, int csock, off_t size, - uint64_t dev_offset, uint32_t nbdflags, - uint8_t *data) +int nbd_trip(NBDExport *exp, int csock) { struct nbd_request request; struct nbd_reply reply; @@ -662,7 +688,7 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, TRACE("Reading request."); - ret = nbd_do_receive_request(csock, &request, data); + ret = nbd_do_receive_request(csock, &request, exp->data); if (ret == -EIO) { return -1; } @@ -675,10 +701,11 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, goto error_reply; } - if ((request.from + request.len) > size) { + if ((request.from + request.len) > exp->size) { LOG("From: %" PRIu64 ", Len: %u, Size: %" PRIu64 ", Offset: %" PRIu64 "\n", - request.from, request.len, (uint64_t)size, dev_offset); + request.from, request.len, + (uint64_t)exp->size, exp->dev_offset); LOG("requested operation past EOF--bad client?"); goto invalid_request; } @@ -687,8 +714,8 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, case NBD_CMD_READ: TRACE("Request type is READ"); - ret = bdrv_read(bs, (request.from + dev_offset) / 512, - data, request.len / 512); + ret = bdrv_read(exp->bs, (request.from + exp->dev_offset) / 512, + exp->data, request.len / 512); if (ret < 0) { LOG("reading from file failed"); reply.error = -ret; @@ -696,13 +723,13 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, } TRACE("Read %u byte(s)", request.len); - if (nbd_do_send_reply(csock, &reply, data, request.len) < 0) + if (nbd_do_send_reply(csock, &reply, exp->data, request.len) < 0) return -1; break; case NBD_CMD_WRITE: TRACE("Request type is WRITE"); - if 
(nbdflags & NBD_FLAG_READ_ONLY) { + if (exp->nbdflags & NBD_FLAG_READ_ONLY) { TRACE("Server is read-only, return error"); reply.error = EROFS; goto error_reply; @@ -710,8 +737,8 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, TRACE("Writing to device"); - ret = bdrv_write(bs, (request.from + dev_offset) / 512, - data, request.len / 512); + ret = bdrv_write(exp->bs, (request.from + exp->dev_offset) / 512, + exp->data, request.len / 512); if (ret < 0) { LOG("writing to file failed"); reply.error = -ret; @@ -719,7 +746,7 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, } if (request.type & NBD_CMD_FLAG_FUA) { - ret = bdrv_flush(bs); + ret = bdrv_flush(exp->bs); if (ret < 0) { LOG("flush failed"); reply.error = -ret; @@ -737,7 +764,7 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, case NBD_CMD_FLUSH: TRACE("Request type is FLUSH"); - ret = bdrv_flush(bs); + ret = bdrv_flush(exp->bs); if (ret < 0) { LOG("flush failed"); reply.error = -ret; @@ -748,7 +775,7 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, break; case NBD_CMD_TRIM: TRACE("Request type is TRIM"); - ret = bdrv_discard(bs, (request.from + dev_offset) / 512, + ret = bdrv_discard(exp->bs, (request.from + exp->dev_offset) / 512, request.len / 512); if (ret < 0) { LOG("discard failed"); @@ -771,3 +798,8 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, return 0; } + +int nbd_negotiate(NBDExport *exp, int csock) +{ + return nbd_send_negotiate(csock, exp->size, exp->nbdflags); +} diff --git a/nbd.h b/nbd.h index dbc4c0d627..c77c2fd116 100644 --- a/nbd.h +++ b/nbd.h @@ -67,15 +67,20 @@ int tcp_socket_incoming_spec(const char *address_and_port); int unix_socket_outgoing(const char *path); int unix_socket_incoming(const char *path); -int nbd_negotiate(int csock, off_t size, uint32_t flags); int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags, off_t *size, size_t *blocksize); int nbd_init(int fd, int csock, uint32_t flags, off_t size, 
size_t blocksize); int nbd_send_request(int csock, struct nbd_request *request); int nbd_receive_reply(int csock, struct nbd_reply *reply); -int nbd_trip(BlockDriverState *bs, int csock, off_t size, uint64_t dev_offset, - uint32_t nbdflags, uint8_t *data); int nbd_client(int fd); int nbd_disconnect(int fd); +typedef struct NBDExport NBDExport; + +NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset, + off_t size, uint32_t nbdflags); +void nbd_export_close(NBDExport *exp); +int nbd_negotiate(NBDExport *exp, int csock); +int nbd_trip(NBDExport *exp, int csock); + #endif diff --git a/qemu-nbd.c b/qemu-nbd.c index d662268c81..d5ac75ed5c 100644 --- a/qemu-nbd.c +++ b/qemu-nbd.c @@ -36,6 +36,7 @@ #define SOCKET_PATH "/var/lock/qemu-nbd-%s" static int sigterm_wfd; +static NBDExport *exp; static int verbose; static char *device; static char *srcpath; @@ -280,7 +281,6 @@ int main(int argc, char **argv) int partition = -1; int ret; int shared = 1; - uint8_t *data; fd_set fds; int *sharing_fds; int fd; @@ -489,6 +489,7 @@ int main(int argc, char **argv) err(EXIT_FAILURE, "Could not find partition %d", partition); } + exp = nbd_export_new(bs, dev_offset, fd_size, nbdflags); sharing_fds = g_malloc((shared + 1) * sizeof(int)); if (sockpath) { @@ -516,11 +517,6 @@ int main(int argc, char **argv) max_fd = sharing_fds[0]; nb_fds++; - data = qemu_blockalign(bs, NBD_BUFFER_SIZE); - if (data == NULL) { - errx(EXIT_FAILURE, "Cannot allocate data buffer"); - } - do { FD_ZERO(&fds); FD_SET(sigterm_fd[0], &fds); @@ -538,8 +534,7 @@ int main(int argc, char **argv) ret--; for (i = 1; i < nb_fds && ret; i++) { if (FD_ISSET(sharing_fds[i], &fds)) { - if (nbd_trip(bs, sharing_fds[i], fd_size, dev_offset, - nbdflags, data) != 0) { + if (nbd_trip(exp, sharing_fds[i]) != 0) { close(sharing_fds[i]); nb_fds--; sharing_fds[i] = sharing_fds[nb_fds]; @@ -555,7 +550,7 @@ int main(int argc, char **argv) (struct sockaddr *)&addr, &addr_len); if (sharing_fds[nb_fds] != -1 && - 
nbd_negotiate(sharing_fds[nb_fds], fd_size, nbdflags) != -1) { + nbd_negotiate(exp, sharing_fds[nb_fds]) != -1) { if (sharing_fds[nb_fds] > max_fd) max_fd = sharing_fds[nb_fds]; nb_fds++; @@ -563,9 +558,9 @@ int main(int argc, char **argv) } } } while (persistent || nb_fds > 1); - qemu_vfree(data); close(sharing_fds[0]); + nbd_export_close(exp); g_free(sharing_fds); if (sockpath) { unlink(sockpath); From d9a73806585fd89f75f0c411839151863dac7f61 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 19 Sep 2011 14:18:33 +0200 Subject: [PATCH 19/26] qemu-nbd: introduce NBDRequest Move the buffer from NBDExport to a new structure, so that it will be possible to have multiple in-flight requests for the same export (and for the same client too---we get that for free). Signed-off-by: Paolo Bonzini --- nbd.c | 65 ++++++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 51 insertions(+), 14 deletions(-) diff --git a/nbd.c b/nbd.c index 8d2a3bc113..82eb98db27 100644 --- a/nbd.c +++ b/nbd.c @@ -36,6 +36,7 @@ #endif #include "qemu_socket.h" +#include "qemu-queue.h" //#define DEBUG_NBD @@ -584,29 +585,60 @@ static int nbd_send_reply(int csock, struct nbd_reply *reply) return 0; } +typedef struct NBDRequest NBDRequest; + +struct NBDRequest { + QSIMPLEQ_ENTRY(NBDRequest) entry; + uint8_t *data; +}; + struct NBDExport { BlockDriverState *bs; off_t dev_offset; off_t size; - uint8_t *data; uint32_t nbdflags; + QSIMPLEQ_HEAD(, NBDRequest) requests; }; +static NBDRequest *nbd_request_get(NBDExport *exp) +{ + NBDRequest *req; + if (QSIMPLEQ_EMPTY(&exp->requests)) { + req = g_malloc0(sizeof(NBDRequest)); + req->data = qemu_blockalign(exp->bs, NBD_BUFFER_SIZE); + } else { + req = QSIMPLEQ_FIRST(&exp->requests); + QSIMPLEQ_REMOVE_HEAD(&exp->requests, entry); + } + return req; +} + +static void nbd_request_put(NBDExport *exp, NBDRequest *req) +{ + QSIMPLEQ_INSERT_HEAD(&exp->requests, req, entry); +} + NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset, 
off_t size, uint32_t nbdflags) { NBDExport *exp = g_malloc0(sizeof(NBDExport)); + QSIMPLEQ_INIT(&exp->requests); exp->bs = bs; exp->dev_offset = dev_offset; exp->nbdflags = nbdflags; exp->size = size == -1 ? exp->bs->total_sectors * 512 : size; - exp->data = qemu_blockalign(exp->bs, NBD_BUFFER_SIZE); return exp; } void nbd_export_close(NBDExport *exp) { - qemu_vfree(exp->data); + while (!QSIMPLEQ_EMPTY(&exp->requests)) { + NBDRequest *first = QSIMPLEQ_FIRST(&exp->requests); + QSIMPLEQ_REMOVE_HEAD(&exp->requests, entry); + qemu_vfree(first->data); + g_free(first); + } + bdrv_close(exp->bs); g_free(exp); } @@ -682,15 +714,17 @@ out: int nbd_trip(NBDExport *exp, int csock) { + NBDRequest *req = nbd_request_get(exp); struct nbd_request request; struct nbd_reply reply; + int rc = -1; int ret; TRACE("Reading request."); - ret = nbd_do_receive_request(csock, &request, exp->data); + ret = nbd_do_receive_request(csock, &request, req->data); if (ret == -EIO) { - return -1; + goto out; } reply.handle = request.handle; @@ -715,7 +749,7 @@ int nbd_trip(NBDExport *exp, int csock) TRACE("Request type is READ"); ret = bdrv_read(exp->bs, (request.from + exp->dev_offset) / 512, - exp->data, request.len / 512); + req->data, request.len / 512); if (ret < 0) { LOG("reading from file failed"); reply.error = -ret; @@ -723,8 +757,8 @@ int nbd_trip(NBDExport *exp, int csock) } TRACE("Read %u byte(s)", request.len); - if (nbd_do_send_reply(csock, &reply, exp->data, request.len) < 0) - return -1; + if (nbd_do_send_reply(csock, &reply, req->data, request.len) < 0) + goto out; break; case NBD_CMD_WRITE: TRACE("Request type is WRITE"); @@ -738,7 +772,7 @@ int nbd_trip(NBDExport *exp, int csock) TRACE("Writing to device"); ret = bdrv_write(exp->bs, (request.from + exp->dev_offset) / 512, - exp->data, request.len / 512); + req->data, request.len / 512); if (ret < 0) { LOG("writing to file failed"); reply.error = -ret; @@ -755,7 +789,7 @@ int nbd_trip(NBDExport *exp, int csock) } if 
(nbd_do_send_reply(csock, &reply, NULL, 0) < 0) - return -1; + goto out; break; case NBD_CMD_DISC: TRACE("Request type is DISCONNECT"); @@ -771,7 +805,7 @@ int nbd_trip(NBDExport *exp, int csock) } if (nbd_do_send_reply(csock, &reply, NULL, 0) < 0) - return -1; + goto out; break; case NBD_CMD_TRIM: TRACE("Request type is TRIM"); @@ -782,7 +816,7 @@ int nbd_trip(NBDExport *exp, int csock) reply.error = -ret; } if (nbd_do_send_reply(csock, &reply, NULL, 0) < 0) - return -1; + goto out; break; default: LOG("invalid request type (%u) received", request.type); @@ -790,13 +824,16 @@ int nbd_trip(NBDExport *exp, int csock) reply.error = -EINVAL; error_reply: if (nbd_do_send_reply(csock, &reply, NULL, 0) == -1) - return -1; + goto out; break; } TRACE("Request/Reply complete"); - return 0; + rc = 0; +out: + nbd_request_put(exp, req); + return rc; } int nbd_negotiate(NBDExport *exp, int csock) From cbcfa0418f0c196afa765f5c9837b9344d1adcf3 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 12 Sep 2011 16:20:11 +0200 Subject: [PATCH 20/26] link the main loop and its dependencies into the tools Using the main loop code from QEMU enables tools to operate fully asynchronously. Advantages include better Windows portability (for some definition of portability) over glib's. 
Signed-off-by: Paolo Bonzini --- Makefile | 5 +++-- main-loop.h | 6 ++++++ os-posix.c | 42 --------------------------------------- os-win32.c | 5 ----- oslib-posix.c | 43 ++++++++++++++++++++++++++++++++++++++++ oslib-win32.c | 5 +++++ qemu-tool.c | 54 +++++++++++++++++++++++++++------------------------ 7 files changed, 86 insertions(+), 74 deletions(-) diff --git a/Makefile b/Makefile index 2c030552a1..368eeae9b0 100644 --- a/Makefile +++ b/Makefile @@ -147,8 +147,9 @@ endif qemu-img.o: qemu-img-cmds.h qemu-img.o qemu-tool.o qemu-nbd.o qemu-io.o cmd.o qemu-ga.o: $(GENERATED_HEADERS) -tools-obj-y = qemu-tool.o $(oslib-obj-y) $(trace-obj-y) \ - qemu-timer-common.o cutils.o +tools-obj-y = $(oslib-obj-y) $(trace-obj-y) qemu-tool.o qemu-timer.o \ + qemu-timer-common.o main-loop.o notify.o iohandler.o cutils.o async.o +tools-obj-$(CONFIG_POSIX) += compatfd.o qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y) qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y) diff --git a/main-loop.h b/main-loop.h index 876092dd15..f9710136c9 100644 --- a/main-loop.h +++ b/main-loop.h @@ -324,6 +324,9 @@ int qemu_add_child_watch(pid_t pid); * by threads other than the main loop thread when calling * qemu_bh_new(), qemu_set_fd_handler() and basically all other * functions documented in this file. + * + * NOTE: tools currently are single-threaded and qemu_mutex_lock_iothread + * is a no-op there. */ void qemu_mutex_lock_iothread(void); @@ -336,6 +339,9 @@ void qemu_mutex_lock_iothread(void); * as soon as possible by threads other than the main loop thread, * because it prevents the main loop from processing callbacks, * including timers and bottom halves. + * + * NOTE: tools currently are single-threaded and qemu_mutex_unlock_iothread + * is a no-op there. 
*/ void qemu_mutex_unlock_iothread(void); diff --git a/os-posix.c b/os-posix.c index dc4a6bb3ff..5c437ca12c 100644 --- a/os-posix.c +++ b/os-posix.c @@ -42,11 +42,6 @@ #ifdef CONFIG_LINUX #include -#include -#endif - -#ifdef CONFIG_EVENTFD -#include #endif static struct passwd *user_pwd; @@ -333,34 +328,6 @@ void os_set_line_buffering(void) setvbuf(stdout, NULL, _IOLBF, 0); } -/* - * Creates an eventfd that looks like a pipe and has EFD_CLOEXEC set. - */ -int qemu_eventfd(int fds[2]) -{ -#ifdef CONFIG_EVENTFD - int ret; - - ret = eventfd(0, 0); - if (ret >= 0) { - fds[0] = ret; - qemu_set_cloexec(ret); - if ((fds[1] = dup(ret)) == -1) { - close(ret); - return -1; - } - qemu_set_cloexec(fds[1]); - return 0; - } - - if (errno != ENOSYS) { - return -1; - } -#endif - - return qemu_pipe(fds); -} - int qemu_create_pidfile(const char *filename) { char buffer[128]; @@ -384,12 +351,3 @@ int qemu_create_pidfile(const char *filename) close(fd); return 0; } - -int qemu_get_thread_id(void) -{ -#if defined (__linux__) - return syscall(SYS_gettid); -#else - return getpid(); -#endif -} diff --git a/os-win32.c b/os-win32.c index 8523d8d0c4..ad76370c7c 100644 --- a/os-win32.c +++ b/os-win32.c @@ -151,8 +151,3 @@ int qemu_create_pidfile(const char *filename) } return 0; } - -int qemu_get_thread_id(void) -{ - return GetCurrentThreadId(); -} diff --git a/oslib-posix.c b/oslib-posix.c index ce755496b5..b6a3c7fc55 100644 --- a/oslib-posix.c +++ b/oslib-posix.c @@ -55,6 +55,21 @@ static int running_on_valgrind = -1; #else # define running_on_valgrind 0 #endif +#ifdef CONFIG_LINUX +#include +#endif +#ifdef CONFIG_EVENTFD +#include +#endif + +int qemu_get_thread_id(void) +{ +#if defined(__linux__) + return syscall(SYS_gettid); +#else + return getpid(); +#endif +} int qemu_daemon(int nochdir, int noclose) { @@ -162,6 +177,34 @@ int qemu_pipe(int pipefd[2]) return ret; } +/* + * Creates an eventfd that looks like a pipe and has EFD_CLOEXEC set. 
+ */ +int qemu_eventfd(int fds[2]) +{ +#ifdef CONFIG_EVENTFD + int ret; + + ret = eventfd(0, 0); + if (ret >= 0) { + fds[0] = ret; + fds[1] = dup(ret); + if (fds[1] == -1) { + close(ret); + return -1; + } + qemu_set_cloexec(ret); + qemu_set_cloexec(fds[1]); + return 0; + } + if (errno != ENOSYS) { + return -1; + } +#endif + + return qemu_pipe(fds); +} + int qemu_utimens(const char *path, const struct timespec *times) { struct timeval tv[2], tv_now; diff --git a/oslib-win32.c b/oslib-win32.c index 5e3de7dc8a..ce3021e6c7 100644 --- a/oslib-win32.c +++ b/oslib-win32.c @@ -118,3 +118,8 @@ int qemu_gettimeofday(qemu_timeval *tp) Do not set errno on error. */ return 0; } + +int qemu_get_thread_id(void) +{ + return GetCurrentThreadId(); +} diff --git a/qemu-tool.c b/qemu-tool.c index 5df7279745..226b6e890e 100644 --- a/qemu-tool.c +++ b/qemu-tool.c @@ -16,12 +16,12 @@ #include "qemu-timer.h" #include "qemu-log.h" #include "migration.h" +#include "main-loop.h" +#include "qemu_socket.h" +#include "slirp/libslirp.h" #include -QEMUClock *rt_clock; -QEMUClock *vm_clock; - FILE *logfile; struct QEMUBH @@ -57,41 +57,45 @@ void monitor_protocol_event(MonitorEvent event, QObject *data) { } -int qemu_set_fd_handler2(int fd, - IOCanReadHandler *fd_read_poll, - IOHandler *fd_read, - IOHandler *fd_write, - void *opaque) +int64 cpu_get_clock(void) { - return 0; + abort(); } -void qemu_notify_event(void) +int64 cpu_get_icount(void) +{ + abort(); +} + +void qemu_mutex_lock_iothread(void) { } -QEMUTimer *qemu_new_timer(QEMUClock *clock, int scale, - QEMUTimerCB *cb, void *opaque) -{ - return g_malloc(1); -} - -void qemu_free_timer(QEMUTimer *ts) -{ - g_free(ts); -} - -void qemu_del_timer(QEMUTimer *ts) +void qemu_mutex_unlock_iothread(void) { } -void qemu_mod_timer(QEMUTimer *ts, int64_t expire_time) +int use_icount; + +void qemu_clock_warp(QEMUClock *clock) { } -int64_t qemu_get_clock_ns(QEMUClock *clock) +static void __attribute__((constructor)) init_main_loop(void) +{ + init_clocks(); 
+ init_timer_alarm(); + qemu_clock_enable(vm_clock, false); +} + +void slirp_select_fill(int *pnfds, fd_set *readfds, + fd_set *writefds, fd_set *xfds) +{ +} + +void slirp_select_poll(fd_set *readfds, fd_set *writefds, + fd_set *xfds, int select_error) { - return 0; } void migrate_add_blocker(Error *reason) From a61c67828dea7c64edaf226cadb45b4ffcc1d411 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 12 Sep 2011 17:28:11 +0200 Subject: [PATCH 21/26] qemu-nbd: use common main loop Using a single main loop for sockets will help yielding from the socket coroutine back to the main loop, and later reentering it. Signed-off-by: Paolo Bonzini --- qemu-nbd.c | 112 +++++++++++++++++++++-------------------------------- 1 file changed, 45 insertions(+), 67 deletions(-) diff --git a/qemu-nbd.c b/qemu-nbd.c index d5ac75ed5c..347c776ab9 100644 --- a/qemu-nbd.c +++ b/qemu-nbd.c @@ -35,12 +35,15 @@ #define SOCKET_PATH "/var/lock/qemu-nbd-%s" -static int sigterm_wfd; static NBDExport *exp; static int verbose; static char *device; static char *srcpath; static char *sockpath; +static bool sigterm_reported; +static bool nbd_started; +static int shared = 1; +static int nb_fds; static void usage(const char *name) { @@ -169,10 +172,8 @@ static int find_partition(BlockDriverState *bs, int partition, static void termsig_handler(int signum) { - static int sigterm_reported; - if (!sigterm_reported) { - sigterm_reported = (write(sigterm_wfd, "", 1) == 1); - } + sigterm_reported = true; + qemu_notify_event(); } static void *show_parts(void *arg) @@ -243,6 +244,36 @@ out: return (void *) EXIT_FAILURE; } +static int nbd_can_accept(void *opaque) +{ + return nb_fds < shared; +} + +static void nbd_read(void *opaque) +{ + int fd = (uintptr_t) opaque; + + if (nbd_trip(exp, fd) != 0) { + qemu_set_fd_handler2(fd, NULL, NULL, NULL, NULL); + close(fd); + nb_fds--; + } +} + +static void nbd_accept(void *opaque) +{ + int server_fd = (uintptr_t) opaque; + struct sockaddr_in addr; + socklen_t addr_len 
= sizeof(addr); + + int fd = accept(server_fd, (struct sockaddr *)&addr, &addr_len); + nbd_started = true; + if (fd != -1 && nbd_negotiate(exp, fd) != -1) { + qemu_set_fd_handler2(fd, NULL, nbd_read, NULL, (void *) (intptr_t) fd); + nb_fds++; + } +} + int main(int argc, char **argv) { BlockDriverState *bs; @@ -251,8 +282,6 @@ int main(int argc, char **argv) bool disconnect = false; const char *bindto = "0.0.0.0"; int port = NBD_DEFAULT_PORT; - struct sockaddr_in addr; - socklen_t addr_len = sizeof(addr); off_t fd_size; const char *sopt = "hVb:o:p:rsnP:c:dvk:e:t"; struct option lopt[] = { @@ -280,13 +309,7 @@ int main(int argc, char **argv) int flags = BDRV_O_RDWR; int partition = -1; int ret; - int shared = 1; - fd_set fds; - int *sharing_fds; int fd; - int i; - int nb_fds = 0; - int max_fd; int persistent = 0; pthread_t client_thread; @@ -294,12 +317,6 @@ int main(int argc, char **argv) * handler ensures that "qemu-nbd -v -c" exits with a nice status code. */ struct sigaction sa_sigterm; - int sigterm_fd[2]; - if (qemu_pipe(sigterm_fd) == -1) { - err(EXIT_FAILURE, "Error setting up communication pipe"); - } - - sigterm_wfd = sigterm_fd[1]; memset(&sa_sigterm, 0, sizeof(sa_sigterm)); sa_sigterm.sa_handler = termsig_handler; sigaction(SIGTERM, &sa_sigterm, NULL); @@ -490,16 +507,16 @@ int main(int argc, char **argv) } exp = nbd_export_new(bs, dev_offset, fd_size, nbdflags); - sharing_fds = g_malloc((shared + 1) * sizeof(int)); if (sockpath) { - sharing_fds[0] = unix_socket_incoming(sockpath); + fd = unix_socket_incoming(sockpath); } else { - sharing_fds[0] = tcp_socket_incoming(bindto, port); + fd = tcp_socket_incoming(bindto, port); } - if (sharing_fds[0] == -1) + if (fd == -1) { return 1; + } if (device) { int ret; @@ -514,54 +531,15 @@ int main(int argc, char **argv) memset(&client_thread, 0, sizeof(client_thread)); } - max_fd = sharing_fds[0]; - nb_fds++; + qemu_init_main_loop(); + qemu_set_fd_handler2(fd, nbd_can_accept, nbd_accept, NULL, + (void 
*)(uintptr_t)fd); do { - FD_ZERO(&fds); - FD_SET(sigterm_fd[0], &fds); - for (i = 0; i < nb_fds; i++) - FD_SET(sharing_fds[i], &fds); + main_loop_wait(false); + } while (!sigterm_reported && (persistent || !nbd_started || nb_fds > 0)); - do { - ret = select(max_fd + 1, &fds, NULL, NULL, NULL); - } while (ret == -1 && errno == EINTR); - if (ret == -1 || FD_ISSET(sigterm_fd[0], &fds)) { - break; - } - - if (FD_ISSET(sharing_fds[0], &fds)) - ret--; - for (i = 1; i < nb_fds && ret; i++) { - if (FD_ISSET(sharing_fds[i], &fds)) { - if (nbd_trip(exp, sharing_fds[i]) != 0) { - close(sharing_fds[i]); - nb_fds--; - sharing_fds[i] = sharing_fds[nb_fds]; - i--; - } - ret--; - } - } - /* new connection ? */ - if (FD_ISSET(sharing_fds[0], &fds)) { - if (nb_fds < shared + 1) { - sharing_fds[nb_fds] = accept(sharing_fds[0], - (struct sockaddr *)&addr, - &addr_len); - if (sharing_fds[nb_fds] != -1 && - nbd_negotiate(exp, sharing_fds[nb_fds]) != -1) { - if (sharing_fds[nb_fds] > max_fd) - max_fd = sharing_fds[nb_fds]; - nb_fds++; - } - } - } - } while (persistent || nb_fds > 1); - - close(sharing_fds[0]); nbd_export_close(exp); - g_free(sharing_fds); if (sockpath) { unlink(sockpath); } From 1743b515860ef645b285908ee367c5e343e0020c Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 19 Sep 2011 14:33:23 +0200 Subject: [PATCH 22/26] qemu-nbd: move client handling to nbd.c This patch sets up the fd handler in nbd.c instead of qemu-nbd.c. It introduces NBDClient, which wraps the arguments to nbd_trip in a single structure, so that we can add a notifier to it. This way, qemu-nbd can know about disconnections. 
Signed-off-by: Paolo Bonzini --- nbd.c | 62 +++++++++++++++++++++++++++++++++++++++++++++++++++--- nbd.h | 5 +++-- qemu-nbd.c | 14 ++++-------- 3 files changed, 66 insertions(+), 15 deletions(-) diff --git a/nbd.c b/nbd.c index 82eb98db27..4822843b8a 100644 --- a/nbd.c +++ b/nbd.c @@ -600,6 +600,37 @@ struct NBDExport { QSIMPLEQ_HEAD(, NBDRequest) requests; }; +struct NBDClient { + int refcount; + void (*close)(NBDClient *client); + + NBDExport *exp; + int sock; +}; + +static void nbd_client_get(NBDClient *client) +{ + client->refcount++; +} + +static void nbd_client_put(NBDClient *client) +{ + if (--client->refcount == 0) { + g_free(client); + } +} + +static void nbd_client_close(NBDClient *client) +{ + qemu_set_fd_handler2(client->sock, NULL, NULL, NULL, NULL); + close(client->sock); + client->sock = -1; + if (client->close) { + client->close(client); + } + nbd_client_put(client); +} + static NBDRequest *nbd_request_get(NBDExport *exp) { NBDRequest *req; @@ -712,9 +743,11 @@ out: return rc; } -int nbd_trip(NBDExport *exp, int csock) +static int nbd_trip(NBDClient *client) { + NBDExport *exp = client->exp; NBDRequest *req = nbd_request_get(exp); + int csock = client->sock; struct nbd_request request; struct nbd_reply reply; int rc = -1; @@ -836,7 +869,30 @@ out: return rc; } -int nbd_negotiate(NBDExport *exp, int csock) +static void nbd_read(void *opaque) { - return nbd_send_negotiate(csock, exp->size, exp->nbdflags); + NBDClient *client = opaque; + + nbd_client_get(client); + if (nbd_trip(client) != 0) { + nbd_client_close(client); + } + + nbd_client_put(client); +} + +NBDClient *nbd_client_new(NBDExport *exp, int csock, + void (*close)(NBDClient *)) +{ + NBDClient *client; + if (nbd_send_negotiate(csock, exp->size, exp->nbdflags) == -1) { + return NULL; + } + client = g_malloc0(sizeof(NBDClient)); + client->refcount = 1; + client->exp = exp; + client->sock = csock; + client->close = close; + qemu_set_fd_handler2(csock, NULL, nbd_read, NULL, client); + return 
client; } diff --git a/nbd.h b/nbd.h index c77c2fd116..a8382f096c 100644 --- a/nbd.h +++ b/nbd.h @@ -76,11 +76,12 @@ int nbd_client(int fd); int nbd_disconnect(int fd); typedef struct NBDExport NBDExport; +typedef struct NBDClient NBDClient; NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset, off_t size, uint32_t nbdflags); void nbd_export_close(NBDExport *exp); -int nbd_negotiate(NBDExport *exp, int csock); -int nbd_trip(NBDExport *exp, int csock); +NBDClient *nbd_client_new(NBDExport *exp, int csock, + void (*close)(NBDClient *)); #endif diff --git a/qemu-nbd.c b/qemu-nbd.c index 347c776ab9..155b05840b 100644 --- a/qemu-nbd.c +++ b/qemu-nbd.c @@ -249,15 +249,10 @@ static int nbd_can_accept(void *opaque) return nb_fds < shared; } -static void nbd_read(void *opaque) +static void nbd_client_closed(NBDClient *client) { - int fd = (uintptr_t) opaque; - - if (nbd_trip(exp, fd) != 0) { - qemu_set_fd_handler2(fd, NULL, NULL, NULL, NULL); - close(fd); - nb_fds--; - } + nb_fds--; + qemu_notify_event(); } static void nbd_accept(void *opaque) @@ -268,8 +263,7 @@ static void nbd_accept(void *opaque) int fd = accept(server_fd, (struct sockaddr *)&addr, &addr_len); nbd_started = true; - if (fd != -1 && nbd_negotiate(exp, fd) != -1) { - qemu_set_fd_handler2(fd, NULL, nbd_read, NULL, (void *) (intptr_t) fd); + if (fd != -1 && nbd_client_new(exp, fd, nbd_client_closed)) { nb_fds++; } } From 72deddc5e61e974fbf561dc704742f102b91de82 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Fri, 7 Oct 2011 16:47:56 +0200 Subject: [PATCH 23/26] qemu-nbd: add client pointer to NBDRequest By attaching a client to an NBDRequest, we can avoid passing around the socket descriptor and data buffer. Also, we can now manage the reference count for the client in nbd_request_get/put request instead of having to do it ourselves in nbd_read. This simplifies things when coroutines are used. 
Signed-off-by: Paolo Bonzini --- nbd.c | 48 +++++++++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/nbd.c b/nbd.c index 4822843b8a..ca18c10a19 100644 --- a/nbd.c +++ b/nbd.c @@ -589,6 +589,7 @@ typedef struct NBDRequest NBDRequest; struct NBDRequest { QSIMPLEQ_ENTRY(NBDRequest) entry; + NBDClient *client; uint8_t *data; }; @@ -631,9 +632,11 @@ static void nbd_client_close(NBDClient *client) nbd_client_put(client); } -static NBDRequest *nbd_request_get(NBDExport *exp) +static NBDRequest *nbd_request_get(NBDClient *client) { NBDRequest *req; + NBDExport *exp = client->exp; + if (QSIMPLEQ_EMPTY(&exp->requests)) { req = g_malloc0(sizeof(NBDRequest)); req->data = qemu_blockalign(exp->bs, NBD_BUFFER_SIZE); @@ -641,12 +644,16 @@ static NBDRequest *nbd_request_get(NBDExport *exp) req = QSIMPLEQ_FIRST(&exp->requests); QSIMPLEQ_REMOVE_HEAD(&exp->requests, entry); } + nbd_client_get(client); + req->client = client; return req; } -static void nbd_request_put(NBDExport *exp, NBDRequest *req) +static void nbd_request_put(NBDRequest *req) { - QSIMPLEQ_INSERT_HEAD(&exp->requests, req, entry); + NBDClient *client = req->client; + QSIMPLEQ_INSERT_HEAD(&client->exp->requests, req, entry); + nbd_client_put(client); } NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset, @@ -674,9 +681,11 @@ void nbd_export_close(NBDExport *exp) g_free(exp); } -static int nbd_do_send_reply(int csock, struct nbd_reply *reply, - uint8_t *data, int len) +static int nbd_do_send_reply(NBDRequest *req, struct nbd_reply *reply, + int len) { + NBDClient *client = req->client; + int csock = client->sock; int rc, ret; if (!len) { @@ -688,7 +697,7 @@ static int nbd_do_send_reply(int csock, struct nbd_reply *reply, socket_set_cork(csock, 1); rc = nbd_send_reply(csock, reply); if (rc != -1) { - ret = write_sync(csock, data, len); + ret = write_sync(csock, req->data, len); if (ret != len) { errno = EIO; rc = -1; @@ -702,9 +711,10 @@ static int 
nbd_do_send_reply(int csock, struct nbd_reply *reply, return rc; } -static int nbd_do_receive_request(int csock, struct nbd_request *request, - uint8_t *data) +static int nbd_do_receive_request(NBDRequest *req, struct nbd_request *request) { + NBDClient *client = req->client; + int csock = client->sock; int rc; if (nbd_receive_request(csock, request) == -1) { @@ -731,7 +741,7 @@ static int nbd_do_receive_request(int csock, struct nbd_request *request, if ((request->type & NBD_CMD_MASK_COMMAND) == NBD_CMD_WRITE) { TRACE("Reading %u byte(s)", request->len); - if (read_sync(csock, data, request->len) != request->len) { + if (read_sync(csock, req->data, request->len) != request->len) { LOG("reading from socket failed"); rc = -EIO; goto out; @@ -745,9 +755,8 @@ out: static int nbd_trip(NBDClient *client) { + NBDRequest *req = nbd_request_get(client); NBDExport *exp = client->exp; - NBDRequest *req = nbd_request_get(exp); - int csock = client->sock; struct nbd_request request; struct nbd_reply reply; int rc = -1; @@ -755,7 +764,7 @@ static int nbd_trip(NBDClient *client) TRACE("Reading request."); - ret = nbd_do_receive_request(csock, &request, req->data); + ret = nbd_do_receive_request(req, &request); if (ret == -EIO) { goto out; } @@ -790,7 +799,7 @@ static int nbd_trip(NBDClient *client) } TRACE("Read %u byte(s)", request.len); - if (nbd_do_send_reply(csock, &reply, req->data, request.len) < 0) + if (nbd_do_send_reply(req, &reply, request.len) < 0) goto out; break; case NBD_CMD_WRITE: @@ -821,7 +830,7 @@ static int nbd_trip(NBDClient *client) } } - if (nbd_do_send_reply(csock, &reply, NULL, 0) < 0) + if (nbd_do_send_reply(req, &reply, 0) < 0) goto out; break; case NBD_CMD_DISC: @@ -837,7 +846,7 @@ static int nbd_trip(NBDClient *client) reply.error = -ret; } - if (nbd_do_send_reply(csock, &reply, NULL, 0) < 0) + if (nbd_do_send_reply(req, &reply, 0) < 0) goto out; break; case NBD_CMD_TRIM: @@ -848,7 +857,7 @@ static int nbd_trip(NBDClient *client) LOG("discard 
failed"); reply.error = -ret; } - if (nbd_do_send_reply(csock, &reply, NULL, 0) < 0) + if (nbd_do_send_reply(req, &reply, 0) < 0) goto out; break; default: @@ -856,7 +865,7 @@ static int nbd_trip(NBDClient *client) invalid_request: reply.error = -EINVAL; error_reply: - if (nbd_do_send_reply(csock, &reply, NULL, 0) == -1) + if (nbd_do_send_reply(req, &reply, 0) == -1) goto out; break; } @@ -865,7 +874,7 @@ static int nbd_trip(NBDClient *client) rc = 0; out: - nbd_request_put(exp, req); + nbd_request_put(req); return rc; } @@ -873,12 +882,9 @@ static void nbd_read(void *opaque) { NBDClient *client = opaque; - nbd_client_get(client); if (nbd_trip(client) != 0) { nbd_client_close(client); } - - nbd_client_put(client); } NBDClient *nbd_client_new(NBDExport *exp, int csock, From 262db38871b9a2613761cc5f05c4cf697e246a68 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 19 Sep 2011 15:19:27 +0200 Subject: [PATCH 24/26] qemu-nbd: asynchronous operation Using coroutines enables asynchronous operation on both the network and the block side. Network can be owned by two coroutines at the same time, one writing and one reading. On the send side, mutual exclusion is guaranteed by a CoMutex. On the receive side, mutual exclusion is guaranteed because new coroutines immediately start receiving data, and no new coroutines are created as long as the previous one is receiving. Between receive and send, qemu-nbd can have an arbitrary number of in-flight block transfers. Throttling is implemented by the next patch. 
Signed-off-by: Paolo Bonzini --- nbd.c | 74 ++++++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 53 insertions(+), 21 deletions(-) diff --git a/nbd.c b/nbd.c index ca18c10a19..6d7d1f8d59 100644 --- a/nbd.c +++ b/nbd.c @@ -20,6 +20,8 @@ #include "block.h" #include "block_int.h" +#include "qemu-coroutine.h" + #include #include #ifndef _WIN32 @@ -607,6 +609,11 @@ struct NBDClient { NBDExport *exp; int sock; + + Coroutine *recv_coroutine; + + CoMutex send_lock; + Coroutine *send_coroutine; }; static void nbd_client_get(NBDClient *client) @@ -681,13 +688,20 @@ void nbd_export_close(NBDExport *exp) g_free(exp); } -static int nbd_do_send_reply(NBDRequest *req, struct nbd_reply *reply, +static void nbd_read(void *opaque); +static void nbd_restart_write(void *opaque); + +static int nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply, int len) { NBDClient *client = req->client; int csock = client->sock; int rc, ret; + qemu_co_mutex_lock(&client->send_lock); + qemu_set_fd_handler2(csock, NULL, nbd_read, nbd_restart_write, client); + client->send_coroutine = qemu_coroutine_self(); + if (!len) { rc = nbd_send_reply(csock, reply); if (rc == -1) { @@ -697,7 +711,7 @@ static int nbd_do_send_reply(NBDRequest *req, struct nbd_reply *reply, socket_set_cork(csock, 1); rc = nbd_send_reply(csock, reply); if (rc != -1) { - ret = write_sync(csock, req->data, len); + ret = qemu_co_send(csock, req->data, len); if (ret != len) { errno = EIO; rc = -1; @@ -708,15 +722,20 @@ static int nbd_do_send_reply(NBDRequest *req, struct nbd_reply *reply, } socket_set_cork(csock, 0); } + + client->send_coroutine = NULL; + qemu_set_fd_handler2(csock, NULL, nbd_read, NULL, client); + qemu_co_mutex_unlock(&client->send_lock); return rc; } -static int nbd_do_receive_request(NBDRequest *req, struct nbd_request *request) +static int nbd_co_receive_request(NBDRequest *req, struct nbd_request *request) { NBDClient *client = req->client; int csock = client->sock; int rc; + 
client->recv_coroutine = qemu_coroutine_self(); if (nbd_receive_request(csock, request) == -1) { rc = -EIO; goto out; @@ -741,7 +760,7 @@ static int nbd_do_receive_request(NBDRequest *req, struct nbd_request *request) if ((request->type & NBD_CMD_MASK_COMMAND) == NBD_CMD_WRITE) { TRACE("Reading %u byte(s)", request->len); - if (read_sync(csock, req->data, request->len) != request->len) { + if (qemu_co_recv(csock, req->data, request->len) != request->len) { LOG("reading from socket failed"); rc = -EIO; goto out; @@ -750,21 +769,22 @@ static int nbd_do_receive_request(NBDRequest *req, struct nbd_request *request) rc = 0; out: + client->recv_coroutine = NULL; return rc; } -static int nbd_trip(NBDClient *client) +static void nbd_trip(void *opaque) { + NBDClient *client = opaque; NBDRequest *req = nbd_request_get(client); NBDExport *exp = client->exp; struct nbd_request request; struct nbd_reply reply; - int rc = -1; int ret; TRACE("Reading request."); - ret = nbd_do_receive_request(req, &request); + ret = nbd_co_receive_request(req, &request); if (ret == -EIO) { goto out; } @@ -799,7 +819,7 @@ static int nbd_trip(NBDClient *client) } TRACE("Read %u byte(s)", request.len); - if (nbd_do_send_reply(req, &reply, request.len) < 0) + if (nbd_co_send_reply(req, &reply, request.len) < 0) goto out; break; case NBD_CMD_WRITE: @@ -822,7 +842,7 @@ static int nbd_trip(NBDClient *client) } if (request.type & NBD_CMD_FLAG_FUA) { - ret = bdrv_flush(exp->bs); + ret = bdrv_co_flush(exp->bs); if (ret < 0) { LOG("flush failed"); reply.error = -ret; @@ -830,34 +850,34 @@ static int nbd_trip(NBDClient *client) } } - if (nbd_do_send_reply(req, &reply, 0) < 0) + if (nbd_co_send_reply(req, &reply, 0) < 0) goto out; break; case NBD_CMD_DISC: TRACE("Request type is DISCONNECT"); errno = 0; - return 1; + goto out; case NBD_CMD_FLUSH: TRACE("Request type is FLUSH"); - ret = bdrv_flush(exp->bs); + ret = bdrv_co_flush(exp->bs); if (ret < 0) { LOG("flush failed"); reply.error = -ret; } - if 
(nbd_do_send_reply(req, &reply, 0) < 0) + if (nbd_co_send_reply(req, &reply, 0) < 0) goto out; break; case NBD_CMD_TRIM: TRACE("Request type is TRIM"); - ret = bdrv_discard(exp->bs, (request.from + exp->dev_offset) / 512, - request.len / 512); + ret = bdrv_co_discard(exp->bs, (request.from + exp->dev_offset) / 512, + request.len / 512); if (ret < 0) { LOG("discard failed"); reply.error = -ret; } - if (nbd_do_send_reply(req, &reply, 0) < 0) + if (nbd_co_send_reply(req, &reply, 0) < 0) goto out; break; default: @@ -865,28 +885,39 @@ static int nbd_trip(NBDClient *client) invalid_request: reply.error = -EINVAL; error_reply: - if (nbd_do_send_reply(req, &reply, 0) == -1) + if (nbd_co_send_reply(req, &reply, 0) == -1) goto out; break; } TRACE("Request/Reply complete"); - rc = 0; + nbd_request_put(req); + return; + out: nbd_request_put(req); - return rc; + nbd_client_close(client); } static void nbd_read(void *opaque) { NBDClient *client = opaque; - if (nbd_trip(client) != 0) { - nbd_client_close(client); + if (client->recv_coroutine) { + qemu_coroutine_enter(client->recv_coroutine, NULL); + } else { + qemu_coroutine_enter(qemu_coroutine_create(nbd_trip), client); } } +static void nbd_restart_write(void *opaque) +{ + NBDClient *client = opaque; + + qemu_coroutine_enter(client->send_coroutine, NULL); +} + NBDClient *nbd_client_new(NBDExport *exp, int csock, void (*close)(NBDClient *)) { @@ -899,6 +930,7 @@ NBDClient *nbd_client_new(NBDExport *exp, int csock, client->exp = exp; client->sock = csock; client->close = close; + qemu_co_mutex_init(&client->send_lock); qemu_set_fd_handler2(csock, NULL, nbd_read, NULL, client); return client; } From 41996e3803119541d43bfa59060024a22b803342 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Mon, 19 Sep 2011 15:25:40 +0200 Subject: [PATCH 25/26] qemu-nbd: throttle requests Limiting the number of in-flight requests is implemented very simply with a can_read callback. 
It does not require a semaphore, unlike the client side in block/nbd.c, because we can directly throttle the creation of coroutines. On the client side, by contrast, a coroutine can be created at any time an I/O request is made. Signed-off-by: Paolo Bonzini --- nbd.c | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/nbd.c b/nbd.c index 6d7d1f8d59..567e94e27a 100644 --- a/nbd.c +++ b/nbd.c @@ -587,6 +587,8 @@ static int nbd_send_reply(int csock, struct nbd_reply *reply) return 0; } +#define MAX_NBD_REQUESTS 16 + typedef struct NBDRequest NBDRequest; struct NBDRequest { @@ -614,6 +616,8 @@ struct NBDClient { CoMutex send_lock; Coroutine *send_coroutine; + + int nb_requests; }; static void nbd_client_get(NBDClient *client) @@ -644,6 +648,9 @@ static NBDRequest *nbd_request_get(NBDClient *client) NBDRequest *req; NBDExport *exp = client->exp; + assert(client->nb_requests <= MAX_NBD_REQUESTS - 1); + client->nb_requests++; + if (QSIMPLEQ_EMPTY(&exp->requests)) { req = g_malloc0(sizeof(NBDRequest)); req->data = qemu_blockalign(exp->bs, NBD_BUFFER_SIZE); @@ -660,6 +667,9 @@ static void nbd_request_put(NBDRequest *req) { NBDClient *client = req->client; QSIMPLEQ_INSERT_HEAD(&client->exp->requests, req, entry); + if (client->nb_requests-- == MAX_NBD_REQUESTS) { + qemu_notify_event(); + } nbd_client_put(client); } @@ -688,6 +698,7 @@ void nbd_export_close(NBDExport *exp) g_free(exp); } +static int nbd_can_read(void *opaque); static void nbd_read(void *opaque); static void nbd_restart_write(void *opaque); @@ -699,7 +710,8 @@ static int nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply, int rc, ret; qemu_co_mutex_lock(&client->send_lock); - qemu_set_fd_handler2(csock, NULL, nbd_read, nbd_restart_write, client); + qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, + nbd_restart_write, client); client->send_coroutine = qemu_coroutine_self(); if (!len) { @@ -724,7 +736,7 @@ static int nbd_co_send_reply(NBDRequest *req, struct nbd_reply 
*reply, } client->send_coroutine = NULL; - qemu_set_fd_handler2(csock, NULL, nbd_read, NULL, client); + qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client); qemu_co_mutex_unlock(&client->send_lock); return rc; } @@ -900,6 +912,13 @@ out: nbd_client_close(client); } +static int nbd_can_read(void *opaque) +{ + NBDClient *client = opaque; + + return client->recv_coroutine || client->nb_requests < MAX_NBD_REQUESTS; +} + static void nbd_read(void *opaque) { NBDClient *client = opaque; @@ -931,6 +950,6 @@ NBDClient *nbd_client_new(NBDExport *exp, int csock, client->sock = csock; client->close = close; qemu_co_mutex_init(&client->send_lock); - qemu_set_fd_handler2(csock, NULL, nbd_read, NULL, client); + qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client); return client; } From 44f76b289a33399abedfbca2d92d21d910792264 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Thu, 22 Dec 2011 11:39:19 +0100 Subject: [PATCH 26/26] nbd: add myself as maintainer Not planning to do much else, hence listing it as "Odd Fixes". Signed-off-by: Paolo Bonzini --- MAINTAINERS | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/MAINTAINERS b/MAINTAINERS index e22bfa1a30..764c92dab6 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -473,6 +473,13 @@ M: Mark McLoughlin S: Maintained F: net/ +Network Block Device (NBD) +M: Paolo Bonzini +S: Odd Fixes +F: block/nbd.c +F: nbd.* +F: qemu-nbd.c + SLIRP M: Jan Kiszka S: Maintained