qemu-char: make writes thread-safe

This will let threads other than the I/O thread raise QMP events.

GIOChannel is thread-safe, and send and receive state is usually
well-separated.  The only driver that requires some care is the
pty driver, where some of the state is shared by the read and write
sides.  That state is protected with the chr_write_lock too.

Reviewed-by: Fam Zheng <famz@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
Signed-off-by: Luiz Capitulino <lcapitulino@redhat.com>
This commit is contained in:
Paolo Bonzini 2014-06-18 08:43:58 +02:00 committed by Luiz Capitulino
parent 1bb7fe725c
commit 9005b2a758
2 changed files with 53 additions and 16 deletions

View file

@ -54,6 +54,7 @@ typedef struct {
typedef void IOEventHandler(void *opaque, int event); typedef void IOEventHandler(void *opaque, int event);
struct CharDriverState { struct CharDriverState {
QemuMutex chr_write_lock;
void (*init)(struct CharDriverState *s); void (*init)(struct CharDriverState *s);
int (*chr_write)(struct CharDriverState *s, const uint8_t *buf, int len); int (*chr_write)(struct CharDriverState *s, const uint8_t *buf, int len);
int (*chr_sync_read)(struct CharDriverState *s, int (*chr_sync_read)(struct CharDriverState *s,
@ -164,6 +165,7 @@ void qemu_chr_fe_event(CharDriverState *s, int event);
* @qemu_chr_fe_printf: * @qemu_chr_fe_printf:
* *
* Write to a character backend using a printf style interface. * Write to a character backend using a printf style interface.
* This function is thread-safe.
* *
* @fmt see #printf * @fmt see #printf
*/ */
@ -176,8 +178,9 @@ int qemu_chr_fe_add_watch(CharDriverState *s, GIOCondition cond,
/** /**
* @qemu_chr_fe_write: * @qemu_chr_fe_write:
* *
* Write data to a character backend from the front end. This function will * Write data to a character backend from the front end. This function
* send data from the front end to the back end. * will send data from the front end to the back end. This function
* is thread-safe.
* *
* @buf the data * @buf the data
* @len the number of bytes to send * @len the number of bytes to send
@ -192,7 +195,7 @@ int qemu_chr_fe_write(CharDriverState *s, const uint8_t *buf, int len);
* Write data to a character backend from the front end. This function will * Write data to a character backend from the front end. This function will
* send data from the front end to the back end. Unlike @qemu_chr_fe_write, * send data from the front end to the back end. Unlike @qemu_chr_fe_write,
* this function will block if the back end cannot consume all of the data * this function will block if the back end cannot consume all of the data
* attempted to be written. * attempted to be written. This function is thread-safe.
* *
* @buf the data * @buf the data
* @len the number of bytes to send * @len the number of bytes to send
@ -216,7 +219,7 @@ int qemu_chr_fe_read_all(CharDriverState *s, uint8_t *buf, int len);
/** /**
* @qemu_chr_fe_ioctl: * @qemu_chr_fe_ioctl:
* *
* Issue a device specific ioctl to a backend. * Issue a device specific ioctl to a backend. This function is thread-safe.
* *
* @cmd see CHR_IOCTL_* * @cmd see CHR_IOCTL_*
* @arg the data associated with @cmd * @arg the data associated with @cmd

View file

@ -121,7 +121,12 @@ void qemu_chr_be_generic_open(CharDriverState *s)
int qemu_chr_fe_write(CharDriverState *s, const uint8_t *buf, int len) int qemu_chr_fe_write(CharDriverState *s, const uint8_t *buf, int len)
{ {
return s->chr_write(s, buf, len); int ret;
qemu_mutex_lock(&s->chr_write_lock);
ret = s->chr_write(s, buf, len);
qemu_mutex_unlock(&s->chr_write_lock);
return ret;
} }
int qemu_chr_fe_write_all(CharDriverState *s, const uint8_t *buf, int len) int qemu_chr_fe_write_all(CharDriverState *s, const uint8_t *buf, int len)
@ -129,6 +134,7 @@ int qemu_chr_fe_write_all(CharDriverState *s, const uint8_t *buf, int len)
int offset = 0; int offset = 0;
int res; int res;
qemu_mutex_lock(&s->chr_write_lock);
while (offset < len) { while (offset < len) {
do { do {
res = s->chr_write(s, buf + offset, len - offset); res = s->chr_write(s, buf + offset, len - offset);
@ -137,17 +143,17 @@ int qemu_chr_fe_write_all(CharDriverState *s, const uint8_t *buf, int len)
} }
} while (res == -1 && errno == EAGAIN); } while (res == -1 && errno == EAGAIN);
if (res == 0) { if (res <= 0) {
break; break;
} }
if (res < 0) {
return res;
}
offset += res; offset += res;
} }
qemu_mutex_unlock(&s->chr_write_lock);
if (res < 0) {
return res;
}
return offset; return offset;
} }
@ -315,11 +321,14 @@ typedef struct {
int prod[MAX_MUX]; int prod[MAX_MUX];
int cons[MAX_MUX]; int cons[MAX_MUX];
int timestamps; int timestamps;
/* Protected by the CharDriverState chr_write_lock. */
int linestart; int linestart;
int64_t timestamps_start; int64_t timestamps_start;
} MuxDriver; } MuxDriver;
/* Called with chr_write_lock held. */
static int mux_chr_write(CharDriverState *chr, const uint8_t *buf, int len) static int mux_chr_write(CharDriverState *chr, const uint8_t *buf, int len)
{ {
MuxDriver *d = chr->opaque; MuxDriver *d = chr->opaque;
@ -865,6 +874,7 @@ typedef struct FDCharDriver {
QTAILQ_ENTRY(FDCharDriver) node; QTAILQ_ENTRY(FDCharDriver) node;
} FDCharDriver; } FDCharDriver;
/* Called with chr_write_lock held. */
static int fd_chr_write(CharDriverState *chr, const uint8_t *buf, int len) static int fd_chr_write(CharDriverState *chr, const uint8_t *buf, int len)
{ {
FDCharDriver *s = chr->opaque; FDCharDriver *s = chr->opaque;
@ -1064,12 +1074,14 @@ static CharDriverState *qemu_chr_open_stdio(ChardevStdio *opts)
typedef struct { typedef struct {
GIOChannel *fd; GIOChannel *fd;
int connected;
int read_bytes; int read_bytes;
/* Protected by the CharDriverState chr_write_lock. */
int connected;
guint timer_tag; guint timer_tag;
} PtyCharDriver; } PtyCharDriver;
static void pty_chr_update_read_handler(CharDriverState *chr); static void pty_chr_update_read_handler_locked(CharDriverState *chr);
static void pty_chr_state(CharDriverState *chr, int connected); static void pty_chr_state(CharDriverState *chr, int connected);
static gboolean pty_chr_timer(gpointer opaque) static gboolean pty_chr_timer(gpointer opaque)
@ -1077,14 +1089,17 @@ static gboolean pty_chr_timer(gpointer opaque)
struct CharDriverState *chr = opaque; struct CharDriverState *chr = opaque;
PtyCharDriver *s = chr->opaque; PtyCharDriver *s = chr->opaque;
qemu_mutex_lock(&chr->chr_write_lock);
s->timer_tag = 0; s->timer_tag = 0;
if (!s->connected) { if (!s->connected) {
/* Next poll ... */ /* Next poll ... */
pty_chr_update_read_handler(chr); pty_chr_update_read_handler_locked(chr);
} }
qemu_mutex_unlock(&chr->chr_write_lock);
return FALSE; return FALSE;
} }
/* Called with chr_write_lock held. */
static void pty_chr_rearm_timer(CharDriverState *chr, int ms) static void pty_chr_rearm_timer(CharDriverState *chr, int ms)
{ {
PtyCharDriver *s = chr->opaque; PtyCharDriver *s = chr->opaque;
@ -1101,7 +1116,8 @@ static void pty_chr_rearm_timer(CharDriverState *chr, int ms)
} }
} }
static void pty_chr_update_read_handler(CharDriverState *chr) /* Called with chr_write_lock held. */
static void pty_chr_update_read_handler_locked(CharDriverState *chr)
{ {
PtyCharDriver *s = chr->opaque; PtyCharDriver *s = chr->opaque;
GPollFD pfd; GPollFD pfd;
@ -1117,13 +1133,21 @@ static void pty_chr_update_read_handler(CharDriverState *chr)
} }
} }
static void pty_chr_update_read_handler(CharDriverState *chr)
{
qemu_mutex_lock(&chr->chr_write_lock);
pty_chr_update_read_handler_locked(chr);
qemu_mutex_unlock(&chr->chr_write_lock);
}
/* Called with chr_write_lock held. */
static int pty_chr_write(CharDriverState *chr, const uint8_t *buf, int len) static int pty_chr_write(CharDriverState *chr, const uint8_t *buf, int len)
{ {
PtyCharDriver *s = chr->opaque; PtyCharDriver *s = chr->opaque;
if (!s->connected) { if (!s->connected) {
/* guest sends data, check for (re-)connect */ /* guest sends data, check for (re-)connect */
pty_chr_update_read_handler(chr); pty_chr_update_read_handler_locked(chr);
return 0; return 0;
} }
return io_channel_send(s->fd, buf, len); return io_channel_send(s->fd, buf, len);
@ -1169,6 +1193,7 @@ static gboolean pty_chr_read(GIOChannel *chan, GIOCondition cond, void *opaque)
return TRUE; return TRUE;
} }
/* Called with chr_write_lock held. */
static void pty_chr_state(CharDriverState *chr, int connected) static void pty_chr_state(CharDriverState *chr, int connected)
{ {
PtyCharDriver *s = chr->opaque; PtyCharDriver *s = chr->opaque;
@ -1659,9 +1684,12 @@ static CharDriverState *qemu_chr_open_pp_fd(int fd)
typedef struct { typedef struct {
int max_size; int max_size;
HANDLE hcom, hrecv, hsend; HANDLE hcom, hrecv, hsend;
OVERLAPPED orecv, osend; OVERLAPPED orecv;
BOOL fpipe; BOOL fpipe;
DWORD len; DWORD len;
/* Protected by the CharDriverState chr_write_lock. */
OVERLAPPED osend;
} WinCharState; } WinCharState;
typedef struct { typedef struct {
@ -1771,6 +1799,7 @@ static int win_chr_init(CharDriverState *chr, const char *filename)
return -1; return -1;
} }
/* Called with chr_write_lock held. */
static int win_chr_write(CharDriverState *chr, const uint8_t *buf, int len1) static int win_chr_write(CharDriverState *chr, const uint8_t *buf, int len1)
{ {
WinCharState *s = chr->opaque; WinCharState *s = chr->opaque;
@ -2213,6 +2242,7 @@ typedef struct {
int max_size; int max_size;
} NetCharDriver; } NetCharDriver;
/* Called with chr_write_lock held. */
static int udp_chr_write(CharDriverState *chr, const uint8_t *buf, int len) static int udp_chr_write(CharDriverState *chr, const uint8_t *buf, int len)
{ {
NetCharDriver *s = chr->opaque; NetCharDriver *s = chr->opaque;
@ -2403,6 +2433,7 @@ static int unix_send_msgfds(CharDriverState *chr, const uint8_t *buf, int len)
} }
#endif #endif
/* Called with chr_write_lock held. */
static int tcp_chr_write(CharDriverState *chr, const uint8_t *buf, int len) static int tcp_chr_write(CharDriverState *chr, const uint8_t *buf, int len)
{ {
TCPCharDriver *s = chr->opaque; TCPCharDriver *s = chr->opaque;
@ -3000,6 +3031,7 @@ static size_t ringbuf_count(const CharDriverState *chr)
return d->prod - d->cons; return d->prod - d->cons;
} }
/* Called with chr_write_lock held. */
static int ringbuf_chr_write(CharDriverState *chr, const uint8_t *buf, int len) static int ringbuf_chr_write(CharDriverState *chr, const uint8_t *buf, int len)
{ {
RingBufCharDriver *d = chr->opaque; RingBufCharDriver *d = chr->opaque;
@ -3024,9 +3056,11 @@ static int ringbuf_chr_read(CharDriverState *chr, uint8_t *buf, int len)
RingBufCharDriver *d = chr->opaque; RingBufCharDriver *d = chr->opaque;
int i; int i;
qemu_mutex_lock(&chr->chr_write_lock);
for (i = 0; i < len && d->cons != d->prod; i++) { for (i = 0; i < len && d->cons != d->prod; i++) {
buf[i] = d->cbuf[d->cons++ & (d->size - 1)]; buf[i] = d->cbuf[d->cons++ & (d->size - 1)];
} }
qemu_mutex_unlock(&chr->chr_write_lock);
return i; return i;
} }