From 592bcd3699a6e0a5748cd90386aaa93b72682513 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 26 Jun 2020 02:18:58 +0300 Subject: [PATCH] Fix QEMU driver bugs (QEMU and qemu-img now work! hooray!) --- Makefile | 9 ++++++--- blockstore_flush.cpp | 4 ++-- blockstore_read.cpp | 16 ++++++++++++++++ cluster_client.cpp | 31 ++++++++++++++++++++---------- cluster_client.h | 1 + http_client.cpp | 9 ++++++--- messenger.h | 2 +- msgr_send.cpp | 9 ++++++++- qemu_driver.c | 45 ++++++++++++++++++++++++++++++++------------ qemu_proxy.cpp | 17 +++++++++++------ qemu_proxy.h | 2 +- 11 files changed, 106 insertions(+), 39 deletions(-) diff --git a/Makefile b/Makefile index 4c9ced70..f7bb90a8 100644 --- a/Makefile +++ b/Makefile @@ -41,9 +41,12 @@ FIO_CLUSTER_OBJS := cluster_client.o epoll_manager.o etcd_state_client.o \ libfio_cluster.so: fio_cluster.o $(FIO_CLUSTER_OBJS) g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -o $@ $< $(FIO_CLUSTER_OBJS) -luring -qemu_driver.so: qemu_driver.c qemu_proxy.o $(FIO_CLUSTER_OBJS) - gcc -I qemu/b/qemu `pkg-config glib-2.0 --cflags` `pkg-config glib-2.0 --libs` \ - -I qemu/include $(CXXFLAGS) -ltcmalloc_minimal -shared -o $@ $< $(FIO_CLUSTER_OBJS) qemu_proxy.o -luring +qemu_driver.o: qemu_driver.c qemu_proxy.h + gcc -I qemu/b/qemu `pkg-config glib-2.0 --cflags` \ + -I qemu/include $(CXXFLAGS) -c -o $@ $< + +qemu_driver.so: qemu_driver.o qemu_proxy.o $(FIO_CLUSTER_OBJS) + g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -o $@ $< $(FIO_CLUSTER_OBJS) qemu_driver.o qemu_proxy.o -luring test_blockstore: ./libblockstore.so test_blockstore.cpp timerfd_interval.o g++ $(CXXFLAGS) -o test_blockstore test_blockstore.cpp timerfd_interval.o ./libblockstore.so -ltcmalloc_minimal -luring diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index e2fa936b..08cd8545 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -553,13 +553,13 @@ bool journal_flusher_co::scan_dirty(int wait_base) if (bs->journal.inmemory) { // Take it from memory - memcpy(v.back().buf, bs->journal.buffer + submit_offset, submit_len); + memcpy(it->buf, bs->journal.buffer + submit_offset, submit_len); } else { // Read it from disk await_sqe(0); - data->iov = (struct iovec){ v.back().buf, (size_t)submit_len }; + data->iov = (struct iovec){ it->buf, (size_t)submit_len }; data->callback = simple_callback_r; my_uring_prep_readv( sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + submit_offset diff --git a/blockstore_read.cpp b/blockstore_read.cpp index 25ac6b5f..49f69bdf 100644 --- a/blockstore_read.cpp +++ b/blockstore_read.cpp @@ -37,6 +37,7 @@ int blockstore_impl_t::fulfill_read_push(blockstore_op_t *op, void *buf, uint64_ return 1; } +// FIXME I've seen a bug here so I want some tests int blockstore_impl_t::fulfill_read(blockstore_op_t *read_op, uint64_t &fulfilled, uint32_t item_start, uint32_t item_end, uint32_t item_state, uint64_t item_version, uint64_t item_location) { @@ -49,8 +50,20 @@ int blockstore_impl_t::fulfill_read(blockstore_op_t *read_op, uint64_t &fulfille while (1) { for (; it != PRIV(read_op)->read_vec.end(); it++) + { if (it->offset >= cur_start) + { break; + } + else if (it->offset + it->len > cur_start) + { + cur_start = it->offset + it->len; + if (cur_start >= item_end) + { + goto endwhile; + } + } + } if (it == PRIV(read_op)->read_vec.end() || it->offset > cur_start) { fulfill_read_t el = { @@ -69,9 +82,12 @@ int blockstore_impl_t::fulfill_read(blockstore_op_t *read_op, uint64_t &fulfille } cur_start = it->offset + it->len; if (it == PRIV(read_op)->read_vec.end() || cur_start >= item_end) + { break; + } } } +endwhile: return 1; } diff --git a/cluster_client.cpp b/cluster_client.cpp index 8cef24a8..3b88b82a 100644 --- a/cluster_client.cpp +++ b/cluster_client.cpp @@ -79,6 +79,14 @@ cluster_client_t::~cluster_client_t() } } +void cluster_client_t::stop() +{ + while (msgr.clients.size() > 0) + { + msgr.stop_client(msgr.clients.begin()->first); + } +} + void cluster_client_t::continue_ops() { if (retry_timeout_id) @@ -433,7 +441,7 @@ void cluster_client_t::slice_rw(cluster_op_t *op) int left = end-begin; while (left > 0 && iov_idx < op->iov.count) { - if (op->iov.buf[iov_idx].iov_len - iov_pos > left) + if (op->iov.buf[iov_idx].iov_len - iov_pos < left) { op->parts[i].iov.push_back(op->iov.buf[iov_idx].iov_base + iov_pos, op->iov.buf[iov_idx].iov_len - iov_pos); left -= (op->iov.buf[iov_idx].iov_len - iov_pos); @@ -575,7 +583,8 @@ void cluster_client_t::continue_sync() void cluster_client_t::finish_sync() { - if (cur_sync->retval == -EPIPE) + int retval = cur_sync->retval; + if (retval == -EPIPE) { // Retry later cur_sync->parts.clear(); @@ -585,7 +594,7 @@ void cluster_client_t::finish_sync() return; } std::function(cur_sync->callback)(cur_sync); - if (!cur_sync->retval) + if (!retval) { for (auto op: unsynced_writes) { @@ -599,20 +608,22 @@ void cluster_client_t::finish_sync() unsynced_writes.clear(); } cur_sync = NULL; - int i; - for (i = 0; i < next_writes.size() && !cur_sync; i++) + while (next_writes.size() > 0) { - if (next_writes[i]->opcode == OSD_OP_SYNC) + if (next_writes[0]->opcode == OSD_OP_SYNC) { - execute_sync(next_writes[i]); + cur_sync = next_writes[0]; + next_writes.erase(next_writes.begin(), next_writes.begin()+1); + continue_sync(); } else { - cur_ops.insert(next_writes[i]); - continue_rw(next_writes[i]); + auto wr = next_writes[0]; + cur_ops.insert(wr); + next_writes.erase(next_writes.begin(), next_writes.begin()+1); + continue_rw(wr); } } - next_writes.erase(next_writes.begin(), next_writes.begin()+i); } void cluster_client_t::send_sync(cluster_op_t *op, cluster_op_part_t *part) diff --git a/cluster_client.h b/cluster_client.h index 07782161..09349182 100644 --- a/cluster_client.h +++ b/cluster_client.h @@ -81,6 +81,7 @@ public: cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config); ~cluster_client_t(); void execute(cluster_op_t *op); + void stop(); protected: void continue_ops(); diff --git a/http_client.cpp b/http_client.cpp index ae406b77..61b496ba 100644 --- a/http_client.cpp +++ b/http_client.cpp @@ -302,15 +302,18 @@ void http_co_t::submit_read() { res = -errno; } - if (res == -EAGAIN || res == 0) + if (res == -EAGAIN) { epoll_events = epoll_events & ~EPOLLIN; } - else if (res < 0) + else if (res <= 0) { + // < 0 means error, 0 means EOF + if (!res) + epoll_events = epoll_events & ~EPOLLIN; end(); } - else if (res > 0) + else { response += std::string(rbuf.data(), res); handle_read(); diff --git a/messenger.h b/messenger.h index 7dc85964..c206beb7 100644 --- a/messenger.h +++ b/messenger.h @@ -116,7 +116,7 @@ struct osd_op_buf_list_t } else { - alloc = ((alloc/16)*16 + 1); + alloc = alloc < 16 ? 16 : (alloc+16); buf = (iovec*)realloc(buf, sizeof(iovec) * alloc); } } diff --git a/msgr_send.cpp b/msgr_send.cpp index 200ebe39..03e202c2 100644 --- a/msgr_send.cpp +++ b/msgr_send.cpp @@ -28,7 +28,14 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op) } } cl.outbox.push_back(cur_op); - if (cl.write_op || cl.outbox.size() > 1 || !try_send(cl)) + if (!ringloop) + { + while (cl.write_op || cl.outbox.size()) + { + try_send(cl); + } + } + else if (cl.write_op || cl.outbox.size() > 1 || !try_send(cl)) { if (cl.write_state == 0) { diff --git a/qemu_driver.c b/qemu_driver.c index 4de41f4b..10c7e816 100644 --- a/qemu_driver.c +++ b/qemu_driver.c @@ -19,8 +19,8 @@ typedef struct FalconClient { void *proxy; - const char *etcd_host; - const char *etcd_prefix; + char *etcd_host; + char *etcd_prefix; uint64_t inode; uint64_t size; int readonly; @@ -111,17 +111,17 @@ static void falcon_parse_filename(const char *filename, QDict *options, Error ** qdict_put_str(options, name, value); } } - if (!qdict_get_int(options, "inode")) + if (!qdict_get_try_int(options, "inode", 0)) { error_setg(errp, "inode is missing"); goto out; } - if (!qdict_get_int(options, "size")) + if (!qdict_get_try_int(options, "size", 0)) { error_setg(errp, "size is missing"); goto out; } - if (!qdict_get_int(options, "etcd_host")) + if (!qdict_get_str(options, "etcd_host")) { error_setg(errp, "etcd_host is missing"); goto out; @@ -136,14 +136,19 @@ static int falcon_file_open(BlockDriverState *bs, QDict *options, int flags, Err { FalconClient *client = bs->opaque; int64_t ret = 0; - client->etcd_host = qdict_get_try_str(options, "etcd_host"); - client->etcd_prefix = qdict_get_try_str(options, "etcd_prefix"); + client->etcd_host = g_strdup(qdict_get_try_str(options, "etcd_host")); + client->etcd_prefix = g_strdup(qdict_get_try_str(options, "etcd_prefix")); client->inode = qdict_get_int(options, "inode"); client->size = qdict_get_int(options, "size"); client->readonly = (flags & BDRV_O_RDWR) ? 1 : 0; - client->proxy = falcon_proxy_create(client->etcd_host, client->etcd_prefix); + client->proxy = falcon_proxy_create(bdrv_get_aio_context(bs), client->etcd_host, client->etcd_prefix); //client->aio_context = bdrv_get_aio_context(bs); bs->total_sectors = client->size / BDRV_SECTOR_SIZE; + qdict_del(options, "etcd_host"); + qdict_del(options, "etcd_prefix"); + qdict_del(options, "inode"); + qdict_del(options, "size"); + qemu_mutex_init(&client->mutex); return ret; } @@ -151,6 +156,10 @@ static void falcon_close(BlockDriverState *bs) { FalconClient *client = bs->opaque; falcon_proxy_destroy(client->proxy); + qemu_mutex_destroy(&client->mutex); + g_free(client->etcd_host); + if (client->etcd_prefix) + g_free(client->etcd_prefix); } static int falcon_probe_blocksizes(BlockDriverState *bs, BlockSizes *bsz) @@ -167,7 +176,7 @@ static int coroutine_fn falcon_co_create_opts(BlockDriver *drv, const char *url, options = qdict_new(); falcon_parse_filename(url, options, errp); - if (errp) + if (*errp) { ret = -1; goto out; @@ -209,6 +218,13 @@ static int64_t falcon_getlength(BlockDriverState *bs) return client->size; } +static void falcon_refresh_limits(BlockDriverState *bs, Error **errp) +{ + bs->bl.request_alignment = 4096; + bs->bl.min_mem_alignment = 4096; + bs->bl.opt_mem_alignment = 4096; +} + static int64_t falcon_get_allocated_file_size(BlockDriverState *bs) { return 0; @@ -227,7 +243,10 @@ static void falcon_co_generic_bh_cb(int retval, void *opaque) FalconRPC *task = opaque; task->ret = retval; task->complete = 1; - aio_co_wake(task->co); + if (qemu_coroutine_self() != task->co) + { + aio_co_wake(task->co); + } } static int coroutine_fn falcon_co_preadv(BlockDriverState *bs, uint64_t offset, uint64_t bytes, QEMUIOVector *iov, int flags) @@ -300,8 +319,9 @@ static QemuOptsList falcon_create_opts = { }; static const char *falcon_strong_runtime_opts[] = { - "image", - "server.", + "inode", + "etcd_host", + "etcd_prefix", NULL }; @@ -318,6 +338,7 @@ static BlockDriver bdrv_falcon = { .bdrv_get_info = falcon_get_info, .bdrv_getlength = falcon_getlength, .bdrv_probe_blocksizes = falcon_probe_blocksizes, + .bdrv_refresh_limits = falcon_refresh_limits, // FIXME: Implement it along with per-inode statistics //.bdrv_get_allocated_file_size = falcon_get_allocated_file_size, diff --git a/qemu_proxy.cpp b/qemu_proxy.cpp index 97672fda..2344e5f7 100644 --- a/qemu_proxy.cpp +++ b/qemu_proxy.cpp @@ -4,13 +4,15 @@ #include #include "cluster_client.h" + +typedef void* AioContext; #include "qemu_proxy.h" extern "C" { // QEMU typedef void IOHandler(void *opaque); - void qemu_set_fd_handler(int fd, IOHandler *fd_read, IOHandler *fd_write, void *opaque); + void aio_set_fd_handler(AioContext *ctx, int fd, int is_external, IOHandler *fd_read, IOHandler *fd_write, void *poll_fn, void *opaque); } struct QemuProxyData @@ -27,9 +29,11 @@ public: timerfd_manager_t *tfd; cluster_client_t *cli; + AioContext *ctx; - QemuProxy(const char *etcd_host, const char *etcd_prefix) + QemuProxy(AioContext *ctx, const char *etcd_host, const char *etcd_prefix) { + this->ctx = ctx; json11::Json cfg = json11::Json::object { { "etcd_address", std::string(etcd_host) }, { "etcd_prefix", std::string(etcd_prefix ? etcd_prefix : "/microceph") }, @@ -40,6 +44,7 @@ public: ~QemuProxy() { + cli->stop(); delete cli; delete tfd; } @@ -49,12 +54,12 @@ public: if (callback != NULL) { handlers[fd] = { .fd = fd, .callback = callback }; - qemu_set_fd_handler(fd, &QemuProxy::read_handler, wr ? &QemuProxy::write_handler : NULL, &handlers[fd]); + aio_set_fd_handler(ctx, fd, false, &QemuProxy::read_handler, wr ? &QemuProxy::write_handler : NULL, NULL, &handlers[fd]); } else { handlers.erase(fd); - qemu_set_fd_handler(fd, NULL, NULL, NULL); + aio_set_fd_handler(ctx, fd, false, NULL, NULL, NULL, NULL); } } @@ -73,9 +78,9 @@ public: extern "C" { -void* falcon_proxy_create(const char *etcd_host, const char *etcd_prefix) +void* falcon_proxy_create(AioContext *ctx, const char *etcd_host, const char *etcd_prefix) { - QemuProxy *p = new QemuProxy(etcd_host, etcd_prefix); + QemuProxy *p = new QemuProxy(ctx, etcd_host, etcd_prefix); return p; } diff --git a/qemu_proxy.h b/qemu_proxy.h index 837fda4e..43eb8e67 100644 --- a/qemu_proxy.h +++ b/qemu_proxy.h @@ -10,7 +10,7 @@ extern "C" { // Our exports typedef void FalconIOHandler(int retval, void *opaque); -void* falcon_proxy_create(const char *etcd_host, const char *etcd_prefix); +void* falcon_proxy_create(AioContext *ctx, const char *etcd_host, const char *etcd_prefix); void falcon_proxy_destroy(void *client); void falcon_proxy_rw(int write, void *client, uint64_t inode, uint64_t offset, uint64_t len, struct iovec *iov, int iovcnt, FalconIOHandler cb, void *opaque);