Merge io/ 2017-01-23

-----BEGIN PGP SIGNATURE-----
 
 iQIcBAABCAAGBQJYhieeAAoJEL6G67QVEE/fFyQP/0+/WnEgLhiluHnY5DX3NsIL
 dCH5dAfixbeamNH2/wTOc2V7VJuqHU7cclGgK874k5IZutxNR2fW5TQ0Cl8MXbu8
 egtvEBwKfDXUhvgSRT3iVCbvRw+MTEBjbOJdV2Gkm7qPGxmQDTaYve8qfgeYVtat
 qdDF0YRBy4aSmcAqII6qyoVIIwDf8xuL5wZHgL+0AtDPj7PP3xsGz6tUOWX+pjdF
 lFoDsWT0ldMlFwfEA/JS+Sq8XAPKpAoCGEZqpqMpiR11mvnb93Bj7igniiw0c18u
 ZZhtS4UYWr3fc8Xcya2MRkgPXexXfnHLx6+QYoBH9jwNhsRR/s6++y2gaK9Gt5bs
 g7cK0m7Aea77bj9xNDkei0LOQNBTlFuxIVwDe0JZtgS9FkINx9Di5/AqnHS0g6Go
 varNEkGphu1ZiB38ZKBxtE9CnJDNy9X2jeiU5owNPRjwmoCKA+pOpKiVjA3nGFmN
 OBn5U1as3xKwLGmvcWHp1ZErHaoeKDF6DM69ZV7As130vckDAZ64z4OdoslLYs7k
 DuTohwspVGIF11JHXvhaYsafTt0YKOfH/3ndNJiG4CdB3PC4b15EzpjZbVGZp5DB
 RBbNWyypeJjz6d3qajTV6LuKgfAf6BMo3ayVHS1pkNpWgU1BX2zWReF9O+KFKSII
 1F4ZQ/KKqJ8d2RMW/TK6
 =oHBV
 -----END PGP SIGNATURE-----

Merge remote-tracking branch 'remotes/berrange/tags/pull-qio-2017-01-23-2' into staging

Merge io/ 2017-01-23

# gpg: Signature made Mon 23 Jan 2017 15:56:14 GMT
# gpg:                using RSA key 0xBE86EBB415104FDF
# gpg: Good signature from "Daniel P. Berrange <dan@berrange.com>"
# gpg:                 aka "Daniel P. Berrange <berrange@redhat.com>"
# Primary key fingerprint: DAF3 A6FD B26B 6291 2D0E  8E3F BE86 EBB4 1510 4FDF

* remotes/berrange/tags/pull-qio-2017-01-23-2:
  io: introduce a DNS resolver API
  io: remove Error parameter from QIOTask thread worker
  io: change the QIOTask callback signature
  io: add ability to associate an error with a task
  io: add ability to associate an opaque "result" with with a task
  io: fix typo in docs for QIOTask
  io: stop incrementing reference in qio_task_get_source
  sockets: add ability to disable DNS resolution for InetSocketAddress

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
This commit is contained in:
Peter Maydell 2017-01-23 15:59:09 +00:00
commit 3879284d65
22 changed files with 760 additions and 169 deletions

228
include/io/dns-resolver.h Normal file
View file

@ -0,0 +1,228 @@
/*
* QEMU DNS resolver
*
* Copyright (c) 2016-2017 Red Hat, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see <http://www.gnu.org/licenses/>.
*
*/
#ifndef QIO_DNS_RESOLVER_H
#define QIO_DNS_RESOLVER_H
#include "qemu-common.h"
#include "qom/object.h"
#include "io/task.h"
#define TYPE_QIO_DNS_RESOLVER "qio-dns-resolver"
#define QIO_DNS_RESOLVER(obj) \
OBJECT_CHECK(QIODNSResolver, (obj), TYPE_QIO_DNS_RESOLVER)
#define QIO_DNS_RESOLVER_CLASS(klass) \
OBJECT_CLASS_CHECK(QIODNSResolverClass, klass, TYPE_QIO_DNS_RESOLVER)
#define QIO_DNS_RESOLVER_GET_CLASS(obj) \
OBJECT_GET_CLASS(QIODNSResolverClass, obj, TYPE_QIO_DNS_RESOLVER)
typedef struct QIODNSResolver QIODNSResolver;
typedef struct QIODNSResolverClass QIODNSResolverClass;
/**
* QIODNSResolver:
*
* The QIODNSResolver class provides a framework for doing
* DNS resolution on SocketAddress objects, independently
* of socket creation.
*
* <example>
* <title>Resolving addresses synchronously</title>
* <programlisting>
* int mylisten(SocketAddress *addr, Error **errp) {
* QIODNSResolver *resolver = qio_dns_resolver_get_instance();
* SocketAddress **rawaddrs = NULL;
* size_t nrawaddrs = 0;
* Error *err = NULL;
* QIOChannel **socks = NULL;
* size_t nsocks = 0;
*
* if (qio_dns_resolver_lookup_sync(dns, addr, &nrawaddrs,
* &rawaddrs, errp) < 0) {
* return -1;
* }
*
* for (i = 0; i < nrawaddrs; i++) {
* QIOChannel *sock = qio_channel_new();
* Error *local_err = NULL;
* qio_channel_listen_sync(sock, rawaddrs[i], &local_err);
* if (local_err) {
* error_propagate(&err, local_err);
* } else {
* socks = g_renew(QIOChannelSocket *, socks, nsocks + 1);
* socks[nsocks++] = sock;
* }
* qapi_free_SocketAddress(rawaddrs[i]);
* }
* g_free(rawaddrs);
*
* if (nsocks == 0) {
* error_propagate(errp, err);
* } else {
* error_free(err);
* }
* }
* </programlisting>
* </example>
*
* <example>
* <title>Resolving addresses asynchronously</title>
* <programlisting>
* typedef struct MyListenData {
* Error *err;
* QIOChannelSocket **socks;
* size_t nsocks;
* } MyListenData;
*
* void mylistenresult(QIOTask *task, void *opaque) {
* MyListenData *data = opaque;
* QIODNSResolver *resolver =
* QIO_DNS_RESOLVER(qio_task_get_source(task);
* SocketAddress **rawaddrs = NULL;
* size_t nrawaddrs = 0;
* Error *err = NULL;
*
* if (qio_task_propagate_error(task, &data->err)) {
* return;
* }
*
* qio_dns_resolver_lookup_result(resolver, task,
* &nrawaddrs, &rawaddrs);
*
* for (i = 0; i < nrawaddrs; i++) {
* QIOChannel *sock = qio_channel_new();
* Error *local_err = NULL;
* qio_channel_listen_sync(sock, rawaddrs[i], &local_err);
* if (local_err) {
* error_propagate(&err, local_err);
* } else {
* socks = g_renew(QIOChannelSocket *, socks, nsocks + 1);
* socks[nsocks++] = sock;
* }
* qapi_free_SocketAddress(rawaddrs[i]);
* }
* g_free(rawaddrs);
*
* if (nsocks == 0) {
* error_propagate(&data->err, err);
* } else {
* error_free(err);
* }
* }
*
* void mylisten(SocketAddress *addr, MyListenData *data) {
* QIODNSResolver *resolver = qio_dns_resolver_get_instance();
* qio_dns_resolver_lookup_async(dns, addr,
* mylistenresult, data, NULL);
* }
* </programlisting>
* </example>
*/
struct QIODNSResolver {
Object parent;
};
struct QIODNSResolverClass {
ObjectClass parent;
};
/**
* qio_dns_resolver_get_instance:
*
* Get the singleton dns resolver instance. The caller
* does not own a reference on the returned object.
*
* Returns: the single dns resolver instance
*/
QIODNSResolver *qio_dns_resolver_get_instance(void);
/**
* qio_dns_resolver_lookup_sync:
* @resolver: the DNS resolver instance
* @addr: the address to resolve
* @naddr: pointer to hold number of resolved addresses
* @addrs: pointer to hold resolved addresses
* @errp: pointer to NULL initialized error object
*
* This will attempt to resolve the address provided
* in @addr. If resolution succeeds, @addrs will be filled
* with all the resolved addresses. @naddrs will specify
* the number of entries allocated in @addrs. The caller
* is responsible for freeing each entry in @addrs, as
* well as @addrs itself. @naddrs is guaranteed to be
* greater than zero on success.
*
* DNS resolution will be done synchronously so execution
* of the caller may be blocked for an arbitrary length
* of time.
*
* Returns: 0 if resolution was successful, -1 on error
*/
int qio_dns_resolver_lookup_sync(QIODNSResolver *resolver,
SocketAddress *addr,
size_t *naddrs,
SocketAddress ***addrs,
Error **errp);
/**
* qio_dns_resolver_lookup_async:
* @resolver: the DNS resolver instance
* @addr: the address to resolve
* @func: the callback to invoke on lookup completion
* @opaque: data blob to pass to @func
* @notify: the callback to free @opaque, or NULL
*
* This will attempt to resolve the address provided
* in @addr. The callback @func will be invoked when
* resolution has either completed or failed. On
* success, the @func should call the method
* qio_dns_resolver_lookup_result() to obtain the
* results.
*
* DNS resolution will be done asynchronously so execution
* of the caller will not be blocked.
*/
void qio_dns_resolver_lookup_async(QIODNSResolver *resolver,
SocketAddress *addr,
QIOTaskFunc func,
gpointer opaque,
GDestroyNotify notify);
/**
* qio_dns_resolver_lookup_result:
* @resolver: the DNS resolver instance
* @task: the task object to get results for
* @naddr: pointer to hold number of resolved addresses
* @addrs: pointer to hold resolved addresses
*
* This method should be called from the callback passed
* to qio_dns_resolver_lookup_async() in order to obtain
* results. @addrs will be filled with all the resolved
* addresses. @naddrs will specify the number of entries
* allocated in @addrs. The caller is responsible for
* freeing each entry in @addrs, as well as @addrs itself.
*/
void qio_dns_resolver_lookup_result(QIODNSResolver *resolver,
QIOTask *task,
size_t *naddrs,
SocketAddress ***addrs);
#endif /* QIO_DNS_RESOLVER_H */

View file

@ -26,13 +26,11 @@
typedef struct QIOTask QIOTask;
typedef void (*QIOTaskFunc)(Object *source,
Error *err,
typedef void (*QIOTaskFunc)(QIOTask *task,
gpointer opaque);
typedef int (*QIOTaskWorker)(QIOTask *task,
Error **errp,
gpointer opaque);
typedef void (*QIOTaskWorker)(QIOTask *task,
gpointer opaque);
/**
* QIOTask:
@ -44,12 +42,12 @@ typedef int (*QIOTaskWorker)(QIOTask *task,
* a public API which accepts a task callback:
*
* <example>
* <title>Task callback function signature</title>
* <title>Task function signature</title>
* <programlisting>
* void myobject_operation(QMyObject *obj,
* QIOTaskFunc *func,
* gpointer opaque,
* GDestroyNotify *notify);
* GDestroyNotify notify);
* </programlisting>
* </example>
*
@ -57,17 +55,41 @@ typedef int (*QIOTaskWorker)(QIOTask *task,
* is data to pass to it. The optional 'notify' function is used
* to free 'opaque' when no longer needed.
*
* Now, lets say the implementation of this method wants to set
* a timer to run once a second checking for completion of some
* activity. It would do something like
* When the operation completes, the 'func' callback will be
* invoked, allowing the calling code to determine the result
* of the operation. An example QIOTaskFunc implementation may
* look like
*
* <example>
* <title>Task callback function implementation</title>
* <title>Task callback implementation</title>
* <programlisting>
* static void myobject_operation_notify(QIOTask *task,
* gpointer opaque)
* {
* Error *err = NULL;
* if (qio_task_propagate_error(task, &err)) {
* ...deal with the failure...
* error_free(err);
* } else {
* QMyObject *src = QMY_OBJECT(qio_task_get_source(task));
* ...deal with the completion...
* }
* }
* </programlisting>
* </example>
*
* Now, lets say the implementation of the method using the
* task wants to set a timer to run once a second checking
* for completion of some activity. It would do something
* like
*
* <example>
* <title>Task function implementation</title>
* <programlisting>
* void myobject_operation(QMyObject *obj,
* QIOTaskFunc *func,
* gpointer opaque,
* GDestroyNotify *notify)
* GDestroyNotify notify)
* {
* QIOTask *task;
*
@ -102,8 +124,8 @@ typedef int (*QIOTaskWorker)(QIOTask *task,
*
* ...check something important...
* if (err) {
* qio_task_abort(task, err);
* error_free(task);
* qio_task_set_error(task, err);
* qio_task_complete(task);
* return FALSE;
* } else if (...work is completed ...) {
* qio_task_complete(task);
@ -115,6 +137,10 @@ typedef int (*QIOTaskWorker)(QIOTask *task,
* </programlisting>
* </example>
*
* The 'qio_task_complete' call in this method will trigger
* the callback func 'myobject_operation_notify' shown
* earlier to deal with the results.
*
* Once this function returns false, object_unref will be called
* automatically on the task causing it to be released and the
* ref on QMyObject dropped too.
@ -136,25 +162,23 @@ typedef int (*QIOTaskWorker)(QIOTask *task,
* socket listen using QIOTask would require:
*
* <example>
* static int myobject_listen_worker(QIOTask *task,
* Error **errp,
* gpointer opaque)
* static void myobject_listen_worker(QIOTask *task,
* gpointer opaque)
* {
* QMyObject obj = QMY_OBJECT(qio_task_get_source(task));
* SocketAddress *addr = opaque;
* Error *err = NULL;
*
* obj->fd = socket_listen(addr, errp);
* if (obj->fd < 0) {
* return -1;
* }
* return 0;
* obj->fd = socket_listen(addr, &err);
*
qio_task_set_error(task, err);
* }
*
* void myobject_listen_async(QMyObject *obj,
* SocketAddress *addr,
* QIOTaskFunc *func,
* gpointer opaque,
* GDestroyNotify *notify)
* GDestroyNotify notify)
* {
* QIOTask *task;
* SocketAddress *addrCopy;
@ -187,8 +211,8 @@ typedef int (*QIOTaskWorker)(QIOTask *task,
* 'err' attribute in the task object to determine if
* the operation was successful or not.
*
* The returned task will be released when one of
* qio_task_abort() or qio_task_complete() are invoked.
* The returned task will be released when qio_task_complete()
* is invoked.
*
* Returns: the task struct
*/
@ -204,10 +228,8 @@ QIOTask *qio_task_new(Object *source,
* @opaque: opaque data to pass to @worker
* @destroy: function to free @opaque
*
* Run a task in a background thread. If @worker
* returns 0 it will call qio_task_complete() in
* the main event thread context. If @worker
* returns -1 it will call qio_task_abort() in
* Run a task in a background thread. When @worker
* returns it will call qio_task_complete() in
* the main event thread context.
*/
void qio_task_run_in_thread(QIOTask *task,
@ -219,24 +241,69 @@ void qio_task_run_in_thread(QIOTask *task,
* qio_task_complete:
* @task: the task struct
*
* Mark the operation as successfully completed
* and free the memory for @task.
* Invoke the completion callback for @task and
* then free its memory.
*/
void qio_task_complete(QIOTask *task);
/**
* qio_task_abort:
* qio_task_set_error:
* @task: the task struct
* @err: the error to record for the operation
* @err: pointer to the error, or NULL
*
* Mark the operation as failed, with @err providing
* details about the failure. The @err may be freed
* afer the function returns, as the notification
* callback is invoked synchronously. The @task will
* be freed when this call completes.
* Associate an error with the task, which can later
* be retrieved with the qio_task_propagate_error()
* method. This method takes ownership of @err, so
* it is not valid to access it after this call
* completes. If @err is NULL this is a no-op. If
* this is call multiple times, only the first
* provided @err will be recorded, later ones will
* be discarded and freed.
*/
void qio_task_abort(QIOTask *task,
Error *err);
void qio_task_set_error(QIOTask *task,
Error *err);
/**
* qio_task_propagate_error:
* @task: the task struct
* @errp: pointer to a NULL-initialized error object
*
* Propagate the error associated with @task
* into @errp.
*
* Returns: true if an error was propagated, false otherwise
*/
bool qio_task_propagate_error(QIOTask *task,
Error **errp);
/**
* qio_task_set_result_pointer:
* @task: the task struct
* @result: pointer to the result data
*
* Associate an opaque result with the task,
* which can later be retrieved with the
* qio_task_get_result_pointer() method
*
*/
void qio_task_set_result_pointer(QIOTask *task,
gpointer result,
GDestroyNotify notify);
/**
* qio_task_get_result_pointer:
* @task: the task struct
*
* Retrieve the opaque result data associated
* with the task, if any.
*
* Returns: the task result, or NULL
*/
gpointer qio_task_get_result_pointer(QIOTask *task);
/**
@ -244,9 +311,10 @@ void qio_task_abort(QIOTask *task,
* @task: the task struct
*
* Get the source object associated with the background
* task. This returns a new reference to the object,
* which the caller must released with object_unref()
* when no longer required.
* task. The caller does not own a reference on the
* returned Object, and so should call object_ref()
* if it wants to keep the object pointer outside the
* lifetime of the QIOTask object.
*
* Returns: the source object
*/

View file

@ -32,6 +32,8 @@ int socket_set_fast_reuse(int fd);
*/
typedef void NonBlockingConnectHandler(int fd, Error *err, void *opaque);
int inet_ai_family_from_address(InetSocketAddress *addr,
Error **errp);
InetSocketAddress *inet_parse(const char *str, Error **errp);
int inet_connect(const char *str, Error **errp);
int inet_connect_saddr(InetSocketAddress *saddr, Error **errp,

View file

@ -7,4 +7,5 @@ io-obj-y += channel-tls.o
io-obj-y += channel-watch.o
io-obj-y += channel-websock.o
io-obj-y += channel-util.o
io-obj-y += dns-resolver.o
io-obj-y += task.o

View file

@ -156,20 +156,16 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
}
static int qio_channel_socket_connect_worker(QIOTask *task,
Error **errp,
gpointer opaque)
static void qio_channel_socket_connect_worker(QIOTask *task,
gpointer opaque)
{
QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
SocketAddress *addr = opaque;
int ret;
Error *err = NULL;
ret = qio_channel_socket_connect_sync(ioc,
addr,
errp);
qio_channel_socket_connect_sync(ioc, addr, &err);
object_unref(OBJECT(ioc));
return ret;
qio_task_set_error(task, err);
}
@ -219,20 +215,16 @@ int qio_channel_socket_listen_sync(QIOChannelSocket *ioc,
}
static int qio_channel_socket_listen_worker(QIOTask *task,
Error **errp,
gpointer opaque)
static void qio_channel_socket_listen_worker(QIOTask *task,
gpointer opaque)
{
QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
SocketAddress *addr = opaque;
int ret;
Error *err = NULL;
ret = qio_channel_socket_listen_sync(ioc,
addr,
errp);
qio_channel_socket_listen_sync(ioc, addr, &err);
object_unref(OBJECT(ioc));
return ret;
qio_task_set_error(task, err);
}
@ -295,22 +287,18 @@ static void qio_channel_socket_dgram_worker_free(gpointer opaque)
g_free(data);
}
static int qio_channel_socket_dgram_worker(QIOTask *task,
Error **errp,
gpointer opaque)
static void qio_channel_socket_dgram_worker(QIOTask *task,
gpointer opaque)
{
QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
struct QIOChannelSocketDGramWorkerData *data = opaque;
int ret;
Error *err = NULL;
/* socket_dgram() blocks in DNS lookups, so we must use a thread */
ret = qio_channel_socket_dgram_sync(ioc,
data->localAddr,
data->remoteAddr,
errp);
qio_channel_socket_dgram_sync(ioc, data->localAddr,
data->remoteAddr, &err);
object_unref(OBJECT(ioc));
return ret;
qio_task_set_error(task, err);
}

View file

@ -153,8 +153,9 @@ static void qio_channel_tls_handshake_task(QIOChannelTLS *ioc,
if (qcrypto_tls_session_handshake(ioc->session, &err) < 0) {
trace_qio_channel_tls_handshake_fail(ioc);
qio_task_abort(task, err);
goto cleanup;
qio_task_set_error(task, err);
qio_task_complete(task);
return;
}
status = qcrypto_tls_session_get_handshake_status(ioc->session);
@ -163,10 +164,10 @@ static void qio_channel_tls_handshake_task(QIOChannelTLS *ioc,
if (qcrypto_tls_session_check_credentials(ioc->session,
&err) < 0) {
trace_qio_channel_tls_credentials_deny(ioc);
qio_task_abort(task, err);
goto cleanup;
qio_task_set_error(task, err);
} else {
trace_qio_channel_tls_credentials_allow(ioc);
}
trace_qio_channel_tls_credentials_allow(ioc);
qio_task_complete(task);
} else {
GIOCondition condition;
@ -183,9 +184,6 @@ static void qio_channel_tls_handshake_task(QIOChannelTLS *ioc,
task,
NULL);
}
cleanup:
error_free(err);
}
@ -200,8 +198,6 @@ static gboolean qio_channel_tls_handshake_io(QIOChannel *ioc,
qio_channel_tls_handshake_task(
tioc, task);
object_unref(OBJECT(tioc));
return FALSE;
}

View file

@ -279,8 +279,8 @@ static gboolean qio_channel_websock_handshake_send(QIOChannel *ioc,
if (ret < 0) {
trace_qio_channel_websock_handshake_fail(ioc);
qio_task_abort(task, err);
error_free(err);
qio_task_set_error(task, err);
qio_task_complete(task);
return FALSE;
}
@ -307,8 +307,8 @@ static gboolean qio_channel_websock_handshake_io(QIOChannel *ioc,
ret = qio_channel_websock_handshake_read(wioc, &err);
if (ret < 0) {
trace_qio_channel_websock_handshake_fail(ioc);
qio_task_abort(task, err);
error_free(err);
qio_task_set_error(task, err);
qio_task_complete(task);
return FALSE;
}
if (ret == 0) {

276
io/dns-resolver.c Normal file
View file

@ -0,0 +1,276 @@
/*
* QEMU DNS resolver
*
* Copyright (c) 2016 Red Hat, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see <http://www.gnu.org/licenses/>.
*
*/
#include "qemu/osdep.h"
#include "io/dns-resolver.h"
#include "qapi/clone-visitor.h"
#include "qemu/sockets.h"
#include "qapi/error.h"
#include "qemu/cutils.h"
#ifndef AI_NUMERICSERV
# define AI_NUMERICSERV 0
#endif
static QIODNSResolver *instance;
static GOnce instance_init = G_ONCE_INIT;
static gpointer qio_dns_resolve_init_instance(gpointer unused G_GNUC_UNUSED)
{
instance = QIO_DNS_RESOLVER(object_new(TYPE_QIO_DNS_RESOLVER));
return NULL;
}
QIODNSResolver *qio_dns_resolver_get_instance(void)
{
g_once(&instance_init, qio_dns_resolve_init_instance, NULL);
return instance;
}
static int qio_dns_resolver_lookup_sync_inet(QIODNSResolver *resolver,
SocketAddress *addr,
size_t *naddrs,
SocketAddress ***addrs,
Error **errp)
{
struct addrinfo ai, *res, *e;
InetSocketAddress *iaddr = addr->u.inet.data;
char port[33];
char uaddr[INET6_ADDRSTRLEN + 1];
char uport[33];
int rc;
Error *err = NULL;
size_t i;
*naddrs = 0;
*addrs = NULL;
memset(&ai, 0, sizeof(ai));
ai.ai_flags = AI_PASSIVE;
if (iaddr->has_numeric && iaddr->numeric) {
ai.ai_flags |= AI_NUMERICHOST | AI_NUMERICSERV;
}
ai.ai_family = inet_ai_family_from_address(iaddr, &err);
ai.ai_socktype = SOCK_STREAM;
if (err) {
error_propagate(errp, err);
return -1;
}
if (iaddr->host == NULL) {
error_setg(errp, "host not specified");
return -1;
}
if (iaddr->port != NULL) {
pstrcpy(port, sizeof(port), iaddr->port);
} else {
port[0] = '\0';
}
rc = getaddrinfo(strlen(iaddr->host) ? iaddr->host : NULL,
strlen(port) ? port : NULL, &ai, &res);
if (rc != 0) {
error_setg(errp, "address resolution failed for %s:%s: %s",
iaddr->host, port, gai_strerror(rc));
return -1;
}
for (e = res; e != NULL; e = e->ai_next) {
(*naddrs)++;
}
*addrs = g_new0(SocketAddress *, *naddrs);
/* create socket + bind */
for (i = 0, e = res; e != NULL; i++, e = e->ai_next) {
SocketAddress *newaddr = g_new0(SocketAddress, 1);
InetSocketAddress *newiaddr = g_new0(InetSocketAddress, 1);
newaddr->u.inet.data = newiaddr;
newaddr->type = SOCKET_ADDRESS_KIND_INET;
getnameinfo((struct sockaddr *)e->ai_addr, e->ai_addrlen,
uaddr, INET6_ADDRSTRLEN, uport, 32,
NI_NUMERICHOST | NI_NUMERICSERV);
*newiaddr = (InetSocketAddress){
.host = g_strdup(uaddr),
.port = g_strdup(uport),
.has_numeric = true,
.numeric = true,
.has_to = iaddr->has_to,
.to = iaddr->to,
.has_ipv4 = false,
.has_ipv6 = false,
};
(*addrs)[i] = newaddr;
}
freeaddrinfo(res);
return 0;
}
static int qio_dns_resolver_lookup_sync_nop(QIODNSResolver *resolver,
SocketAddress *addr,
size_t *naddrs,
SocketAddress ***addrs,
Error **errp)
{
*naddrs = 1;
*addrs = g_new0(SocketAddress *, 1);
(*addrs)[0] = QAPI_CLONE(SocketAddress, addr);
return 0;
}
int qio_dns_resolver_lookup_sync(QIODNSResolver *resolver,
SocketAddress *addr,
size_t *naddrs,
SocketAddress ***addrs,
Error **errp)
{
switch (addr->type) {
case SOCKET_ADDRESS_KIND_INET:
return qio_dns_resolver_lookup_sync_inet(resolver,
addr,
naddrs,
addrs,
errp);
case SOCKET_ADDRESS_KIND_UNIX:
case SOCKET_ADDRESS_KIND_VSOCK:
return qio_dns_resolver_lookup_sync_nop(resolver,
addr,
naddrs,
addrs,
errp);
default:
error_setg(errp, "Unknown socket address kind");
return -1;
}
}
struct QIODNSResolverLookupData {
SocketAddress *addr;
SocketAddress **addrs;
size_t naddrs;
};
static void qio_dns_resolver_lookup_data_free(gpointer opaque)
{
struct QIODNSResolverLookupData *data = opaque;
size_t i;
qapi_free_SocketAddress(data->addr);
for (i = 0; i < data->naddrs; i++) {
qapi_free_SocketAddress(data->addrs[i]);
}
g_free(data->addrs);
g_free(data);
}
static void qio_dns_resolver_lookup_worker(QIOTask *task,
gpointer opaque)
{
QIODNSResolver *resolver = QIO_DNS_RESOLVER(qio_task_get_source(task));
struct QIODNSResolverLookupData *data = opaque;
Error *err = NULL;
qio_dns_resolver_lookup_sync(resolver,
data->addr,
&data->naddrs,
&data->addrs,
&err);
if (err) {
qio_task_set_error(task, err);
} else {
qio_task_set_result_pointer(task, opaque, NULL);
}
object_unref(OBJECT(resolver));
}
void qio_dns_resolver_lookup_async(QIODNSResolver *resolver,
SocketAddress *addr,
QIOTaskFunc func,
gpointer opaque,
GDestroyNotify notify)
{
QIOTask *task;
struct QIODNSResolverLookupData *data =
g_new0(struct QIODNSResolverLookupData, 1);
data->addr = QAPI_CLONE(SocketAddress, addr);
task = qio_task_new(OBJECT(resolver), func, opaque, notify);
qio_task_run_in_thread(task,
qio_dns_resolver_lookup_worker,
data,
qio_dns_resolver_lookup_data_free);
}
void qio_dns_resolver_lookup_result(QIODNSResolver *resolver,
QIOTask *task,
size_t *naddrs,
SocketAddress ***addrs)
{
struct QIODNSResolverLookupData *data =
qio_task_get_result_pointer(task);
size_t i;
*naddrs = 0;
*addrs = NULL;
if (!data) {
return;
}
*naddrs = data->naddrs;
*addrs = g_new0(SocketAddress *, data->naddrs);
for (i = 0; i < data->naddrs; i++) {
(*addrs)[i] = QAPI_CLONE(SocketAddress, data->addrs[i]);
}
}
static const TypeInfo qio_dns_resolver_info = {
.parent = TYPE_OBJECT,
.name = TYPE_QIO_DNS_RESOLVER,
.instance_size = sizeof(QIODNSResolver),
.class_size = sizeof(QIODNSResolverClass),
};
static void qio_dns_resolver_register_types(void)
{
type_register_static(&qio_dns_resolver_info);
}
type_init(qio_dns_resolver_register_types);

View file

@ -29,6 +29,9 @@ struct QIOTask {
QIOTaskFunc func;
gpointer opaque;
GDestroyNotify destroy;
Error *err;
gpointer result;
GDestroyNotify destroyResult;
};
@ -57,6 +60,12 @@ static void qio_task_free(QIOTask *task)
if (task->destroy) {
task->destroy(task->opaque);
}
if (task->destroyResult) {
task->destroyResult(task->result);
}
if (task->err) {
error_free(task->err);
}
object_unref(task->source);
g_free(task);
@ -68,8 +77,6 @@ struct QIOTaskThreadData {
QIOTaskWorker worker;
gpointer opaque;
GDestroyNotify destroy;
Error *err;
int ret;
};
@ -78,13 +85,8 @@ static gboolean gio_task_thread_result(gpointer opaque)
struct QIOTaskThreadData *data = opaque;
trace_qio_task_thread_result(data->task);
if (data->ret == 0) {
qio_task_complete(data->task);
} else {
qio_task_abort(data->task, data->err);
}
qio_task_complete(data->task);
error_free(data->err);
if (data->destroy) {
data->destroy(data->opaque);
}
@ -100,10 +102,7 @@ static gpointer qio_task_thread_worker(gpointer opaque)
struct QIOTaskThreadData *data = opaque;
trace_qio_task_thread_run(data->task);
data->ret = data->worker(data->task, &data->err, data->opaque);
if (data->ret < 0 && data->err == NULL) {
error_setg(&data->err, "Task worker failed but did not set an error");
}
data->worker(data->task, data->opaque);
/* We're running in the background thread, and must only
* ever report the task results in the main event loop
@ -140,22 +139,47 @@ void qio_task_run_in_thread(QIOTask *task,
void qio_task_complete(QIOTask *task)
{
task->func(task->source, NULL, task->opaque);
task->func(task, task->opaque);
trace_qio_task_complete(task);
qio_task_free(task);
}
void qio_task_abort(QIOTask *task,
Error *err)
void qio_task_set_error(QIOTask *task,
Error *err)
{
task->func(task->source, err, task->opaque);
trace_qio_task_abort(task);
qio_task_free(task);
error_propagate(&task->err, err);
}
bool qio_task_propagate_error(QIOTask *task,
Error **errp)
{
if (task->err) {
error_propagate(errp, task->err);
return true;
}
return false;
}
void qio_task_set_result_pointer(QIOTask *task,
gpointer result,
GDestroyNotify destroy)
{
task->result = result;
task->destroyResult = destroy;
}
gpointer qio_task_get_result_pointer(QIOTask *task)
{
return task->result;
}
Object *qio_task_get_source(QIOTask *task)
{
object_ref(task->source);
return task->source;
}

View file

@ -3,7 +3,6 @@
# io/task.c
qio_task_new(void *task, void *source, void *func, void *opaque) "Task new task=%p source=%p func=%p opaque=%p"
qio_task_complete(void *task) "Task complete task=%p"
qio_task_abort(void *task) "Task abort task=%p"
qio_task_thread_start(void *task, void *worker, void *opaque) "Task thread start task=%p worker=%p opaque=%p"
qio_task_thread_run(void *task) "Task thread run task=%p"
qio_task_thread_exit(void *task) "Task thread exit task=%p"

View file

@ -70,22 +70,23 @@ static void socket_connect_data_free(void *opaque)
g_free(data);
}
static void socket_outgoing_migration(Object *src,
Error *err,
static void socket_outgoing_migration(QIOTask *task,
gpointer opaque)
{
struct SocketConnectData *data = opaque;
QIOChannel *sioc = QIO_CHANNEL(src);
QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
Error *err = NULL;
if (err) {
if (qio_task_propagate_error(task, &err)) {
trace_migration_socket_outgoing_error(error_get_pretty(err));
data->s->to_dst_file = NULL;
migrate_fd_error(data->s, err);
error_free(err);
} else {
trace_migration_socket_outgoing_connected(data->hostname);
migration_channel_connect(data->s, sioc, data->hostname);
}
object_unref(src);
object_unref(OBJECT(sioc));
}
static void socket_start_outgoing_migration(MigrationState *s,

View file

@ -61,15 +61,15 @@ migration_tls_get_creds(MigrationState *s,
}
static void migration_tls_incoming_handshake(Object *src,
Error *err,
static void migration_tls_incoming_handshake(QIOTask *task,
gpointer opaque)
{
QIOChannel *ioc = QIO_CHANNEL(src);
QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
Error *err = NULL;
if (err) {
if (qio_task_propagate_error(task, &err)) {
trace_migration_tls_incoming_handshake_error(error_get_pretty(err));
error_report("%s", error_get_pretty(err));
error_report_err(err);
} else {
trace_migration_tls_incoming_handshake_complete();
migration_channel_process_incoming(migrate_get_current(), ioc);
@ -107,17 +107,18 @@ void migration_tls_channel_process_incoming(MigrationState *s,
}
static void migration_tls_outgoing_handshake(Object *src,
Error *err,
static void migration_tls_outgoing_handshake(QIOTask *task,
gpointer opaque)
{
MigrationState *s = opaque;
QIOChannel *ioc = QIO_CHANNEL(src);
QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
Error *err = NULL;
if (err) {
if (qio_task_propagate_error(task, &err)) {
trace_migration_tls_outgoing_handshake_error(error_get_pretty(err));
s->to_dst_file = NULL;
migrate_fd_error(s, err);
error_free(err);
} else {
trace_migration_tls_outgoing_handshake_complete();
migration_channel_connect(s, ioc, NULL);

View file

@ -78,15 +78,13 @@ ssize_t nbd_wr_syncv(QIOChannel *ioc,
}
void nbd_tls_handshake(Object *src,
Error *err,
void nbd_tls_handshake(QIOTask *task,
void *opaque)
{
struct NBDTLSHandshakeData *data = opaque;
if (err) {
TRACE("TLS failed %s", error_get_pretty(err));
data->error = error_copy(err);
if (qio_task_propagate_error(task, &data->error)) {
TRACE("TLS failed %s", error_get_pretty(data->error));
}
data->complete = true;
g_main_loop_quit(data->loop);

View file

@ -120,8 +120,7 @@ struct NBDTLSHandshakeData {
};
void nbd_tls_handshake(Object *src,
Error *err,
void nbd_tls_handshake(QIOTask *task,
void *opaque);
#endif

View file

@ -3983,6 +3983,10 @@
#
# @port: port part of the address, or lowest port if @to is present
#
# @numeric: #optional true if the host/port are guaranteed to be numeric,
# false if name resolution should be attempted. Defaults to false.
# (Since 2.9)
#
# @to: highest port to try
#
# @ipv4: whether to accept IPv4 addresses, default try both IPv4 and IPv6
@ -3997,6 +4001,7 @@
'data': {
'host': 'str',
'port': 'str',
'*numeric': 'bool',
'*to': 'uint16',
'*ipv4': 'bool',
'*ipv6': 'bool' } }

View file

@ -3277,14 +3277,13 @@ static void tcp_chr_telnet_init(CharDriverState *chr)
}
static void tcp_chr_tls_handshake(Object *source,
Error *err,
static void tcp_chr_tls_handshake(QIOTask *task,
gpointer user_data)
{
CharDriverState *chr = user_data;
TCPCharDriver *s = chr->opaque;
if (err) {
if (qio_task_propagate_error(task, NULL)) {
tcp_chr_disconnect(chr);
} else {
if (s->do_telnetopt) {
@ -3492,20 +3491,23 @@ static void tcp_chr_free(CharDriverState *chr)
}
static void qemu_chr_socket_connected(Object *src, Error *err, void *opaque)
static void qemu_chr_socket_connected(QIOTask *task, void *opaque)
{
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(src);
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
CharDriverState *chr = opaque;
TCPCharDriver *s = chr->opaque;
Error *err = NULL;
if (err) {
if (qio_task_propagate_error(task, &err)) {
check_report_connect_error(chr, err);
object_unref(src);
return;
error_free(err);
goto cleanup;
}
s->connect_err_reported = false;
tcp_chr_new_client(chr, sioc);
cleanup:
object_unref(OBJECT(sioc));
}

View file

@ -156,12 +156,11 @@ struct TestIOChannelData {
};
static void test_io_channel_complete(Object *src,
Error *err,
static void test_io_channel_complete(QIOTask *task,
gpointer opaque)
{
struct TestIOChannelData *data = opaque;
data->err = err != NULL;
data->err = qio_task_propagate_error(task, NULL);
g_main_loop_quit(data->loop);
}

View file

@ -53,14 +53,13 @@ struct QIOChannelTLSHandshakeData {
bool failed;
};
static void test_tls_handshake_done(Object *source,
Error *err,
static void test_tls_handshake_done(QIOTask *task,
gpointer opaque)
{
struct QIOChannelTLSHandshakeData *data = opaque;
data->finished = true;
data->failed = err != NULL;
data->failed = qio_task_propagate_error(task, NULL);
}

View file

@ -50,14 +50,13 @@ struct TestTaskData {
};
static void task_callback(Object *source,
Error *err,
static void task_callback(QIOTask *task,
gpointer opaque)
{
struct TestTaskData *data = opaque;
data->source = source;
data->err = err;
data->source = qio_task_get_source(task);
qio_task_propagate_error(task, &data->err);
}
@ -76,7 +75,6 @@ static void test_task_complete(void)
g_assert(obj == src);
object_unref(obj);
object_unref(src);
g_assert(data.source == obj);
g_assert(data.err == NULL);
@ -121,9 +119,9 @@ static void test_task_failure(void)
error_setg(&err, "Some error");
qio_task_abort(task, err);
qio_task_set_error(task, err);
qio_task_complete(task);
error_free(err);
object_unref(obj);
g_assert(data.source == obj);
@ -142,31 +140,28 @@ struct TestThreadWorkerData {
GMainLoop *loop;
};
static int test_task_thread_worker(QIOTask *task,
Error **errp,
gpointer opaque)
static void test_task_thread_worker(QIOTask *task,
gpointer opaque)
{
struct TestThreadWorkerData *data = opaque;
data->worker = g_thread_self();
if (data->fail) {
error_setg(errp, "Testing fail");
return -1;
Error *err = NULL;
error_setg(&err, "Testing fail");
qio_task_set_error(task, err);
}
return 0;
}
static void test_task_thread_callback(Object *source,
Error *err,
static void test_task_thread_callback(QIOTask *task,
gpointer opaque)
{
struct TestThreadWorkerData *data = opaque;
data->source = source;
data->err = err;
data->source = qio_task_get_source(task);
qio_task_propagate_error(task, &data->err);
data->complete = g_thread_self();

View file

@ -65,16 +65,17 @@ static void start_auth_vencrypt_subauth(VncState *vs)
}
}
static void vnc_tls_handshake_done(Object *source,
Error *err,
static void vnc_tls_handshake_done(QIOTask *task,
gpointer user_data)
{
VncState *vs = user_data;
Error *err = NULL;
if (err) {
if (qio_task_propagate_error(task, &err)) {
VNC_DEBUG("Handshake failed %s\n",
error_get_pretty(err));
vnc_client_error(vs);
error_free(err);
} else {
vs->ioc_tag = qio_channel_add_watch(
vs->ioc, G_IO_IN | G_IO_OUT, vnc_client_io, vs, NULL);

View file

@ -24,15 +24,16 @@
#include "io/channel-websock.h"
#include "qemu/bswap.h"
static void vncws_tls_handshake_done(Object *source,
Error *err,
static void vncws_tls_handshake_done(QIOTask *task,
gpointer user_data)
{
VncState *vs = user_data;
Error *err = NULL;
if (err) {
if (qio_task_propagate_error(task, &err)) {
VNC_DEBUG("Handshake failed %s\n", error_get_pretty(err));
vnc_client_error(vs);
error_free(err);
} else {
VNC_DEBUG("TLS handshake complete, starting websocket handshake\n");
vs->ioc_tag = qio_channel_add_watch(
@ -83,15 +84,16 @@ gboolean vncws_tls_handshake_io(QIOChannel *ioc G_GNUC_UNUSED,
}
static void vncws_handshake_done(Object *source,
Error *err,
static void vncws_handshake_done(QIOTask *task,
gpointer user_data)
{
VncState *vs = user_data;
Error *err = NULL;
if (err) {
if (qio_task_propagate_error(task, &err)) {
VNC_DEBUG("Websock handshake failed %s\n", error_get_pretty(err));
vnc_client_error(vs);
error_free(err);
} else {
VNC_DEBUG("Websock handshake complete, starting VNC protocol\n");
vnc_start_protocol(vs);

View file

@ -38,6 +38,10 @@
# define AI_V4MAPPED 0
#endif
#ifndef AI_NUMERICSERV
# define AI_NUMERICSERV 0
#endif
static int inet_getport(struct addrinfo *e)
{
@ -110,8 +114,8 @@ NetworkAddressFamily inet_netfamily(int family)
* outside scope of this method and not currently handled by
* callers at all.
*/
static int inet_ai_family_from_address(InetSocketAddress *addr,
Error **errp)
int inet_ai_family_from_address(InetSocketAddress *addr,
Error **errp)
{
if (addr->has_ipv6 && addr->has_ipv4 &&
!addr->ipv6 && !addr->ipv4) {
@ -141,6 +145,9 @@ static int inet_listen_saddr(InetSocketAddress *saddr,
memset(&ai,0, sizeof(ai));
ai.ai_flags = AI_PASSIVE;
if (saddr->has_numeric && saddr->numeric) {
ai.ai_flags |= AI_NUMERICHOST | AI_NUMERICSERV;
}
ai.ai_family = inet_ai_family_from_address(saddr, &err);
ai.ai_socktype = SOCK_STREAM;