Fix QEMU driver bugs (QEMU and qemu-img now work! hooray!)

Vitaliy Filippov 2020-06-26 02:18:58 +03:00
parent 5e1e39633d
commit 592bcd3699
11 changed files with 106 additions and 39 deletions

View File

@ -41,9 +41,12 @@ FIO_CLUSTER_OBJS := cluster_client.o epoll_manager.o etcd_state_client.o \
libfio_cluster.so: fio_cluster.o $(FIO_CLUSTER_OBJS)
g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -o $@ $< $(FIO_CLUSTER_OBJS) -luring
qemu_driver.so: qemu_driver.c qemu_proxy.o $(FIO_CLUSTER_OBJS)
gcc -I qemu/b/qemu `pkg-config glib-2.0 --cflags` `pkg-config glib-2.0 --libs` \
-I qemu/include $(CXXFLAGS) -ltcmalloc_minimal -shared -o $@ $< $(FIO_CLUSTER_OBJS) qemu_proxy.o -luring
qemu_driver.o: qemu_driver.c qemu_proxy.h
gcc -I qemu/b/qemu `pkg-config glib-2.0 --cflags` \
-I qemu/include $(CXXFLAGS) -c -o $@ $<
qemu_driver.so: qemu_driver.o qemu_proxy.o $(FIO_CLUSTER_OBJS)
g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -o $@ $< $(FIO_CLUSTER_OBJS) qemu_driver.o qemu_proxy.o -luring
test_blockstore: ./libblockstore.so test_blockstore.cpp timerfd_interval.o
g++ $(CXXFLAGS) -o test_blockstore test_blockstore.cpp timerfd_interval.o ./libblockstore.so -ltcmalloc_minimal -luring

View File

@ -553,13 +553,13 @@ bool journal_flusher_co::scan_dirty(int wait_base)
if (bs->journal.inmemory)
{
// Take it from memory
memcpy(v.back().buf, bs->journal.buffer + submit_offset, submit_len);
memcpy(it->buf, bs->journal.buffer + submit_offset, submit_len);
}
else
{
// Read it from disk
await_sqe(0);
data->iov = (struct iovec){ v.back().buf, (size_t)submit_len };
data->iov = (struct iovec){ it->buf, (size_t)submit_len };
data->callback = simple_callback_r;
my_uring_prep_readv(
sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + submit_offset

View File

@ -37,6 +37,7 @@ int blockstore_impl_t::fulfill_read_push(blockstore_op_t *op, void *buf, uint64_
return 1;
}
// FIXME I've seen a bug here so I want some tests
int blockstore_impl_t::fulfill_read(blockstore_op_t *read_op, uint64_t &fulfilled, uint32_t item_start, uint32_t item_end,
uint32_t item_state, uint64_t item_version, uint64_t item_location)
{
@ -49,8 +50,20 @@ int blockstore_impl_t::fulfill_read(blockstore_op_t *read_op, uint64_t &fulfille
while (1)
{
for (; it != PRIV(read_op)->read_vec.end(); it++)
{
if (it->offset >= cur_start)
{
break;
}
else if (it->offset + it->len > cur_start)
{
cur_start = it->offset + it->len;
if (cur_start >= item_end)
{
goto endwhile;
}
}
}
if (it == PRIV(read_op)->read_vec.end() || it->offset > cur_start)
{
fulfill_read_t el = {
@ -69,9 +82,12 @@ int blockstore_impl_t::fulfill_read(blockstore_op_t *read_op, uint64_t &fulfille
}
cur_start = it->offset + it->len;
if (it == PRIV(read_op)->read_vec.end() || cur_start >= item_end)
{
break;
}
}
}
endwhile:
return 1;
}

View File

@ -79,6 +79,14 @@ cluster_client_t::~cluster_client_t()
}
}
void cluster_client_t::stop()
{
while (msgr.clients.size() > 0)
{
msgr.stop_client(msgr.clients.begin()->first);
}
}
void cluster_client_t::continue_ops()
{
if (retry_timeout_id)
@ -433,7 +441,7 @@ void cluster_client_t::slice_rw(cluster_op_t *op)
int left = end-begin;
while (left > 0 && iov_idx < op->iov.count)
{
if (op->iov.buf[iov_idx].iov_len - iov_pos > left)
if (op->iov.buf[iov_idx].iov_len - iov_pos < left)
{
op->parts[i].iov.push_back(op->iov.buf[iov_idx].iov_base + iov_pos, op->iov.buf[iov_idx].iov_len - iov_pos);
left -= (op->iov.buf[iov_idx].iov_len - iov_pos);
@ -575,7 +583,8 @@ void cluster_client_t::continue_sync()
void cluster_client_t::finish_sync()
{
if (cur_sync->retval == -EPIPE)
int retval = cur_sync->retval;
if (retval == -EPIPE)
{
// Retry later
cur_sync->parts.clear();
@ -585,7 +594,7 @@ void cluster_client_t::finish_sync()
return;
}
std::function<void(cluster_op_t*)>(cur_sync->callback)(cur_sync);
if (!cur_sync->retval)
if (!retval)
{
for (auto op: unsynced_writes)
{
@ -599,20 +608,22 @@ void cluster_client_t::finish_sync()
unsynced_writes.clear();
}
cur_sync = NULL;
int i;
for (i = 0; i < next_writes.size() && !cur_sync; i++)
while (next_writes.size() > 0)
{
if (next_writes[i]->opcode == OSD_OP_SYNC)
if (next_writes[0]->opcode == OSD_OP_SYNC)
{
execute_sync(next_writes[i]);
cur_sync = next_writes[0];
next_writes.erase(next_writes.begin(), next_writes.begin()+1);
continue_sync();
}
else
{
cur_ops.insert(next_writes[i]);
continue_rw(next_writes[i]);
auto wr = next_writes[0];
cur_ops.insert(wr);
next_writes.erase(next_writes.begin(), next_writes.begin()+1);
continue_rw(wr);
}
}
next_writes.erase(next_writes.begin(), next_writes.begin()+i);
}
void cluster_client_t::send_sync(cluster_op_t *op, cluster_op_part_t *part)

View File

@ -81,6 +81,7 @@ public:
cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config);
~cluster_client_t();
void execute(cluster_op_t *op);
void stop();
protected:
void continue_ops();

View File

@ -302,15 +302,18 @@ void http_co_t::submit_read()
{
res = -errno;
}
if (res == -EAGAIN || res == 0)
if (res == -EAGAIN)
{
epoll_events = epoll_events & ~EPOLLIN;
}
else if (res < 0)
else if (res <= 0)
{
// < 0 means error, 0 means EOF
if (!res)
epoll_events = epoll_events & ~EPOLLIN;
end();
}
else if (res > 0)
else
{
response += std::string(rbuf.data(), res);
handle_read();

View File

@ -116,7 +116,7 @@ struct osd_op_buf_list_t
}
else
{
alloc = ((alloc/16)*16 + 1);
alloc = alloc < 16 ? 16 : (alloc+16);
buf = (iovec*)realloc(buf, sizeof(iovec) * alloc);
}
}

View File

@ -28,7 +28,14 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
}
}
cl.outbox.push_back(cur_op);
if (cl.write_op || cl.outbox.size() > 1 || !try_send(cl))
if (!ringloop)
{
while (cl.write_op || cl.outbox.size())
{
try_send(cl);
}
}
else if (cl.write_op || cl.outbox.size() > 1 || !try_send(cl))
{
if (cl.write_state == 0)
{

View File

@ -19,8 +19,8 @@
typedef struct FalconClient
{
void *proxy;
const char *etcd_host;
const char *etcd_prefix;
char *etcd_host;
char *etcd_prefix;
uint64_t inode;
uint64_t size;
int readonly;
@ -111,17 +111,17 @@ static void falcon_parse_filename(const char *filename, QDict *options, Error **
qdict_put_str(options, name, value);
}
}
if (!qdict_get_int(options, "inode"))
if (!qdict_get_try_int(options, "inode", 0))
{
error_setg(errp, "inode is missing");
goto out;
}
if (!qdict_get_int(options, "size"))
if (!qdict_get_try_int(options, "size", 0))
{
error_setg(errp, "size is missing");
goto out;
}
if (!qdict_get_int(options, "etcd_host"))
if (!qdict_get_str(options, "etcd_host"))
{
error_setg(errp, "etcd_host is missing");
goto out;
@ -136,14 +136,19 @@ static int falcon_file_open(BlockDriverState *bs, QDict *options, int flags, Err
{
FalconClient *client = bs->opaque;
int64_t ret = 0;
client->etcd_host = qdict_get_try_str(options, "etcd_host");
client->etcd_prefix = qdict_get_try_str(options, "etcd_prefix");
client->etcd_host = g_strdup(qdict_get_try_str(options, "etcd_host"));
client->etcd_prefix = g_strdup(qdict_get_try_str(options, "etcd_prefix"));
client->inode = qdict_get_int(options, "inode");
client->size = qdict_get_int(options, "size");
client->readonly = (flags & BDRV_O_RDWR) ? 1 : 0;
client->proxy = falcon_proxy_create(client->etcd_host, client->etcd_prefix);
client->proxy = falcon_proxy_create(bdrv_get_aio_context(bs), client->etcd_host, client->etcd_prefix);
//client->aio_context = bdrv_get_aio_context(bs);
bs->total_sectors = client->size / BDRV_SECTOR_SIZE;
qdict_del(options, "etcd_host");
qdict_del(options, "etcd_prefix");
qdict_del(options, "inode");
qdict_del(options, "size");
qemu_mutex_init(&client->mutex);
return ret;
}
@ -151,6 +156,10 @@ static void falcon_close(BlockDriverState *bs)
{
FalconClient *client = bs->opaque;
falcon_proxy_destroy(client->proxy);
qemu_mutex_destroy(&client->mutex);
g_free(client->etcd_host);
if (client->etcd_prefix)
g_free(client->etcd_prefix);
}
static int falcon_probe_blocksizes(BlockDriverState *bs, BlockSizes *bsz)
@ -167,7 +176,7 @@ static int coroutine_fn falcon_co_create_opts(BlockDriver *drv, const char *url,
options = qdict_new();
falcon_parse_filename(url, options, errp);
if (errp)
if (*errp)
{
ret = -1;
goto out;
@ -209,6 +218,13 @@ static int64_t falcon_getlength(BlockDriverState *bs)
return client->size;
}
static void falcon_refresh_limits(BlockDriverState *bs, Error **errp)
{
bs->bl.request_alignment = 4096;
bs->bl.min_mem_alignment = 4096;
bs->bl.opt_mem_alignment = 4096;
}
static int64_t falcon_get_allocated_file_size(BlockDriverState *bs)
{
return 0;
@ -227,8 +243,11 @@ static void falcon_co_generic_bh_cb(int retval, void *opaque)
FalconRPC *task = opaque;
task->ret = retval;
task->complete = 1;
if (qemu_coroutine_self() != task->co)
{
aio_co_wake(task->co);
}
}
static int coroutine_fn falcon_co_preadv(BlockDriverState *bs, uint64_t offset, uint64_t bytes, QEMUIOVector *iov, int flags)
{
@ -300,8 +319,9 @@ static QemuOptsList falcon_create_opts = {
};
static const char *falcon_strong_runtime_opts[] = {
"image",
"server.",
"inode",
"etcd_host",
"etcd_prefix",
NULL
};
@ -318,6 +338,7 @@ static BlockDriver bdrv_falcon = {
.bdrv_get_info = falcon_get_info,
.bdrv_getlength = falcon_getlength,
.bdrv_probe_blocksizes = falcon_probe_blocksizes,
.bdrv_refresh_limits = falcon_refresh_limits,
// FIXME: Implement it along with per-inode statistics
//.bdrv_get_allocated_file_size = falcon_get_allocated_file_size,

View File

@ -4,13 +4,15 @@
#include <sys/epoll.h>
#include "cluster_client.h"
typedef void* AioContext;
#include "qemu_proxy.h"
extern "C"
{
// QEMU
typedef void IOHandler(void *opaque);
void qemu_set_fd_handler(int fd, IOHandler *fd_read, IOHandler *fd_write, void *opaque);
void aio_set_fd_handler(AioContext *ctx, int fd, int is_external, IOHandler *fd_read, IOHandler *fd_write, void *poll_fn, void *opaque);
}
struct QemuProxyData
@ -27,9 +29,11 @@ public:
timerfd_manager_t *tfd;
cluster_client_t *cli;
AioContext *ctx;
QemuProxy(const char *etcd_host, const char *etcd_prefix)
QemuProxy(AioContext *ctx, const char *etcd_host, const char *etcd_prefix)
{
this->ctx = ctx;
json11::Json cfg = json11::Json::object {
{ "etcd_address", std::string(etcd_host) },
{ "etcd_prefix", std::string(etcd_prefix ? etcd_prefix : "/microceph") },
@ -40,6 +44,7 @@ public:
~QemuProxy()
{
cli->stop();
delete cli;
delete tfd;
}
@ -49,12 +54,12 @@ public:
if (callback != NULL)
{
handlers[fd] = { .fd = fd, .callback = callback };
qemu_set_fd_handler(fd, &QemuProxy::read_handler, wr ? &QemuProxy::write_handler : NULL, &handlers[fd]);
aio_set_fd_handler(ctx, fd, false, &QemuProxy::read_handler, wr ? &QemuProxy::write_handler : NULL, NULL, &handlers[fd]);
}
else
{
handlers.erase(fd);
qemu_set_fd_handler(fd, NULL, NULL, NULL);
aio_set_fd_handler(ctx, fd, false, NULL, NULL, NULL, NULL);
}
}
@ -73,9 +78,9 @@ public:
extern "C" {
void* falcon_proxy_create(const char *etcd_host, const char *etcd_prefix)
void* falcon_proxy_create(AioContext *ctx, const char *etcd_host, const char *etcd_prefix)
{
QemuProxy *p = new QemuProxy(etcd_host, etcd_prefix);
QemuProxy *p = new QemuProxy(ctx, etcd_host, etcd_prefix);
return p;
}

View File

@ -10,7 +10,7 @@ extern "C" {
// Our exports
typedef void FalconIOHandler(int retval, void *opaque);
void* falcon_proxy_create(const char *etcd_host, const char *etcd_prefix);
void* falcon_proxy_create(AioContext *ctx, const char *etcd_host, const char *etcd_prefix);
void falcon_proxy_destroy(void *client);
void falcon_proxy_rw(int write, void *client, uint64_t inode, uint64_t offset, uint64_t len,
struct iovec *iov, int iovcnt, FalconIOHandler cb, void *opaque);