diff --git a/Makefile b/Makefile index 00dc6139..4c9ced70 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_impl.o blockstore_init.o blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_rollback.o blockstore_flush.o crc32c.o ringloop.o # -fsanitize=address CXXFLAGS := -g -O3 -Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fPIC -fdiagnostics-color=always -all: libfio_blockstore.so osd libfio_sec_osd.so libfio_cluster.so stub_osd stub_uring_osd stub_bench osd_test dump_journal +all: libfio_blockstore.so osd libfio_sec_osd.so libfio_cluster.so stub_osd stub_uring_osd stub_bench osd_test dump_journal qemu_driver.so clean: rm -f *.o @@ -36,10 +36,14 @@ osd_peering_pg_test: osd_peering_pg_test.cpp osd_peering_pg.o libfio_sec_osd.so: fio_sec_osd.o rw_blocking.o g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -o $@ fio_sec_osd.o rw_blocking.o -FIO_CLUSTER_OBJS := fio_cluster.o cluster_client.o epoll_manager.o etcd_state_client.o \ +FIO_CLUSTER_OBJS := cluster_client.o epoll_manager.o etcd_state_client.o \ messenger.o msgr_send.o msgr_receive.o ringloop.o json11.o http_client.o pg_states.o timerfd_manager.o base64.o -libfio_cluster.so: $(FIO_CLUSTER_OBJS) - g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -o $@ $(FIO_CLUSTER_OBJS) -luring +libfio_cluster.so: fio_cluster.o $(FIO_CLUSTER_OBJS) + g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -o $@ $< $(FIO_CLUSTER_OBJS) -luring + +qemu_driver.so: qemu_driver.c qemu_proxy.o $(FIO_CLUSTER_OBJS) + gcc -I qemu/b/qemu `pkg-config glib-2.0 --cflags` `pkg-config glib-2.0 --libs` \ + -I qemu/include $(CXXFLAGS) -ltcmalloc_minimal -shared -o $@ $< $(FIO_CLUSTER_OBJS) qemu_proxy.o -luring test_blockstore: ./libblockstore.so test_blockstore.cpp timerfd_interval.o g++ $(CXXFLAGS) -o test_blockstore test_blockstore.cpp timerfd_interval.o ./libblockstore.so -ltcmalloc_minimal -luring @@ -131,6 +135,8 @@ osd_test.o: osd_test.cpp object_id.h osd_id.h osd_ops.h rw_blocking.h test_patte g++ $(CXXFLAGS) -c -o $@ $< pg_states.o: pg_states.cpp pg_states.h g++ $(CXXFLAGS) -c -o $@ $< +qemu_proxy.o: qemu_proxy.cpp cluster_client.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h qemu_proxy.h ringloop.h timerfd_manager.h + g++ $(CXXFLAGS) -c -o $@ $< ringloop.o: ringloop.cpp ringloop.h g++ $(CXXFLAGS) -c -o $@ $< rw_blocking.o: rw_blocking.cpp rw_blocking.h diff --git a/qemu_driver.c b/qemu_driver.c new file mode 100644 index 00000000..4de41f4b --- /dev/null +++ b/qemu_driver.c @@ -0,0 +1,352 @@ +// QEMU block driver + +#define _GNU_SOURCE +#include "qemu/osdep.h" +#include "qemu/units.h" +#include "block/block_int.h" +#include "block/qdict.h" +#include "qapi/error.h" +#include "qapi/qmp/qdict.h" +#include "qapi/qmp/qerror.h" +#include "qemu/uri.h" +#include "qemu/error-report.h" +#include "qemu/module.h" +#include "qemu/option.h" +#include "qemu/cutils.h" + +#include "qemu_proxy.h" + +typedef struct FalconClient +{ + void *proxy; + const char *etcd_host; + const char *etcd_prefix; + uint64_t inode; + uint64_t size; + int readonly; + QemuMutex mutex; +} FalconClient; + +typedef struct FalconRPC +{ + BlockDriverState *bs; + Coroutine *co; + QEMUIOVector *iov; + int ret; + int complete; +} FalconRPC; + +static char *qemu_rbd_next_tok(char *src, char delim, char **p) +{ + char *end; + *p = NULL; + for (end = src; *end; ++end) + { + if (*end == delim) + break; + if (*end == '\\' && end[1] != '\0') + end++; + } + if (*end == delim) + { + *p = end + 1; + *end = '\0'; + } + return src; +} + +static void qemu_rbd_unescape(char *src) +{ + char *p; + for (p = src; *src; ++src, ++p) + { + if (*src == '\\' && src[1] != '\0') + src++; + *p = *src; + } + *p = '\0'; +} + +// falcon[:key=value]* +// falcon:etcd_host=127.0.0.1:inode=1 +static void falcon_parse_filename(const char *filename, QDict *options, Error **errp) +{ + const char *start; + char *p, *buf; + + if (!strstart(filename, "falcon:", &start)) + { + error_setg(errp, "File name must start with 'falcon:'"); + return; + } + + buf = g_strdup(start); + p = buf; + + // The following are all key/value pairs + while (p) + { + char *name, *value; + name = qemu_rbd_next_tok(p, '=', &p); + if (!p) + { + error_setg(errp, "conf option %s has no value", name); + break; + } + qemu_rbd_unescape(name); + value = qemu_rbd_next_tok(p, ':', &p); + qemu_rbd_unescape(value); + if (!strcmp(name, "inode") || !strcmp(name, "size")) + { + unsigned long long num_val; + if (parse_uint_full(value, &num_val, 0)) + { + error_setg(errp, "Illegal %s: %s", name, value); + goto out; + } + qdict_put_int(options, name, num_val); + } + else + { + qdict_put_str(options, name, value); + } + } + if (!qdict_get_int(options, "inode")) + { + error_setg(errp, "inode is missing"); + goto out; + } + if (!qdict_get_int(options, "size")) + { + error_setg(errp, "size is missing"); + goto out; + } + if (!qdict_get_int(options, "etcd_host")) + { + error_setg(errp, "etcd_host is missing"); + goto out; + } + +out: + g_free(buf); + return; +} + +static int falcon_file_open(BlockDriverState *bs, QDict *options, int flags, Error **errp) +{ + FalconClient *client = bs->opaque; + int64_t ret = 0; + client->etcd_host = qdict_get_try_str(options, "etcd_host"); + client->etcd_prefix = qdict_get_try_str(options, "etcd_prefix"); + client->inode = qdict_get_int(options, "inode"); + client->size = qdict_get_int(options, "size"); + client->readonly = (flags & BDRV_O_RDWR) ? 1 : 0; + client->proxy = falcon_proxy_create(client->etcd_host, client->etcd_prefix); + //client->aio_context = bdrv_get_aio_context(bs); + bs->total_sectors = client->size / BDRV_SECTOR_SIZE; + return ret; +} + +static void falcon_close(BlockDriverState *bs) +{ + FalconClient *client = bs->opaque; + falcon_proxy_destroy(client->proxy); +} + +static int falcon_probe_blocksizes(BlockDriverState *bs, BlockSizes *bsz) +{ + bsz->phys = 4096; + bsz->log = 4096; + return 0; +} + +static int coroutine_fn falcon_co_create_opts(BlockDriver *drv, const char *url, QemuOpts *opts, Error **errp) +{ + QDict *options; + int ret; + + options = qdict_new(); + falcon_parse_filename(url, options, errp); + if (errp) + { + ret = -1; + goto out; + } + + // inodes don't require creation in Falcon. FIXME: They will when there will be some metadata + + ret = 0; +out: + qobject_unref(options); + return ret; +} + +static int coroutine_fn falcon_co_truncate(BlockDriverState *bs, int64_t offset, bool exact, PreallocMode prealloc, Error **errp) +{ + FalconClient *client = bs->opaque; + + if (prealloc != PREALLOC_MODE_OFF) + { + error_setg(errp, "Unsupported preallocation mode '%s'", PreallocMode_str(prealloc)); + return -ENOTSUP; + } + + // TODO: Resize inode to bytes + client->size = offset / BDRV_SECTOR_SIZE; + + return 0; +} + +static int falcon_get_info(BlockDriverState *bs, BlockDriverInfo *bdi) +{ + bdi->cluster_size = 4096; + return 0; +} + +static int64_t falcon_getlength(BlockDriverState *bs) +{ + FalconClient *client = bs->opaque; + return client->size; +} + +static int64_t falcon_get_allocated_file_size(BlockDriverState *bs) +{ + return 0; +} + +static void falcon_co_init_task(BlockDriverState *bs, FalconRPC *task) +{ + *task = (FalconRPC) { + .co = qemu_coroutine_self(), + .bs = bs, + }; +} + +static void falcon_co_generic_bh_cb(int retval, void *opaque) +{ + FalconRPC *task = opaque; + task->ret = retval; + task->complete = 1; + aio_co_wake(task->co); +} + +static int coroutine_fn falcon_co_preadv(BlockDriverState *bs, uint64_t offset, uint64_t bytes, QEMUIOVector *iov, int flags) +{ + FalconClient *client = bs->opaque; + FalconRPC task; + falcon_co_init_task(bs, &task); + task.iov = iov; + + qemu_mutex_lock(&client->mutex); + falcon_proxy_rw(0, client->proxy, client->inode, offset, bytes, iov->iov, iov->niov, falcon_co_generic_bh_cb, &task); + qemu_mutex_unlock(&client->mutex); + + while (!task.complete) + { + qemu_coroutine_yield(); + } + + return task.ret; +} + +static int coroutine_fn falcon_co_pwritev(BlockDriverState *bs, uint64_t offset, uint64_t bytes, QEMUIOVector *iov, int flags) +{ + FalconClient *client = bs->opaque; + FalconRPC task; + falcon_co_init_task(bs, &task); + task.iov = iov; + + qemu_mutex_lock(&client->mutex); + falcon_proxy_rw(1, client->proxy, client->inode, offset, bytes, iov->iov, iov->niov, falcon_co_generic_bh_cb, &task); + qemu_mutex_unlock(&client->mutex); + + while (!task.complete) + { + qemu_coroutine_yield(); + } + + return task.ret; +} + +static int coroutine_fn falcon_co_flush(BlockDriverState *bs) +{ + FalconClient *client = bs->opaque; + FalconRPC task; + falcon_co_init_task(bs, &task); + + qemu_mutex_lock(&client->mutex); + falcon_proxy_sync(client->proxy, falcon_co_generic_bh_cb, &task); + qemu_mutex_unlock(&client->mutex); + + while (!task.complete) + { + qemu_coroutine_yield(); + } + + return task.ret; +} + +static QemuOptsList falcon_create_opts = { + .name = "falcon-create-opts", + .head = QTAILQ_HEAD_INITIALIZER(falcon_create_opts.head), + .desc = { + { + .name = BLOCK_OPT_SIZE, + .type = QEMU_OPT_SIZE, + .help = "Virtual disk size" + }, + { /* end of list */ } + } +}; + +static const char *falcon_strong_runtime_opts[] = { + "image", + "server.", + + NULL +}; + +static BlockDriver bdrv_falcon = { + .format_name = "falcon", + .protocol_name = "falcon", + + .instance_size = sizeof(FalconClient), + .bdrv_parse_filename = falcon_parse_filename, + + .bdrv_has_zero_init = bdrv_has_zero_init_1, + .bdrv_has_zero_init_truncate = bdrv_has_zero_init_1, + .bdrv_get_info = falcon_get_info, + .bdrv_getlength = falcon_getlength, + .bdrv_probe_blocksizes = falcon_probe_blocksizes, + + // FIXME: Implement it along with per-inode statistics + //.bdrv_get_allocated_file_size = falcon_get_allocated_file_size, + + .bdrv_file_open = falcon_file_open, + .bdrv_close = falcon_close, + + // Option list for the create operation + .create_opts = &falcon_create_opts, + + // For qmp_blockdev_create(), used by the qemu monitor / QAPI + // Requires patching QAPI IDL, thus unimplemented + //.bdrv_co_create = falcon_co_create, + + // For bdrv_create(), used by qemu-img + .bdrv_co_create_opts = falcon_co_create_opts, + + .bdrv_co_truncate = falcon_co_truncate, + + .bdrv_co_preadv = falcon_co_preadv, + .bdrv_co_pwritev = falcon_co_pwritev, + .bdrv_co_flush_to_disk = falcon_co_flush, + + .strong_runtime_opts = falcon_strong_runtime_opts, +}; + +static void falcon_block_init(void) +{ + bdrv_register(&bdrv_falcon); +} + +block_init(falcon_block_init); diff --git a/qemu_proxy.cpp b/qemu_proxy.cpp new file mode 100644 index 00000000..97672fda --- /dev/null +++ b/qemu_proxy.cpp @@ -0,0 +1,122 @@ +// C-C++ proxy for the QEMU driver +// (QEMU headers don't compile with g++) + +#include + +#include "cluster_client.h" +#include "qemu_proxy.h" + +extern "C" +{ + // QEMU + typedef void IOHandler(void *opaque); + void qemu_set_fd_handler(int fd, IOHandler *fd_read, IOHandler *fd_write, void *opaque); +} + +struct QemuProxyData +{ + int fd; + std::function callback; +}; + +class QemuProxy +{ + std::map handlers; + +public: + + timerfd_manager_t *tfd; + cluster_client_t *cli; + + QemuProxy(const char *etcd_host, const char *etcd_prefix) + { + json11::Json cfg = json11::Json::object { + { "etcd_address", std::string(etcd_host) }, + { "etcd_prefix", std::string(etcd_prefix ? etcd_prefix : "/microceph") }, + }; + tfd = new timerfd_manager_t([this](int fd, bool wr, std::function callback) { set_fd_handler(fd, wr, callback); }); + cli = new cluster_client_t(NULL, tfd, cfg); + } + + ~QemuProxy() + { + delete cli; + delete tfd; + } + + void set_fd_handler(int fd, bool wr, std::function callback) + { + if (callback != NULL) + { + handlers[fd] = { .fd = fd, .callback = callback }; + qemu_set_fd_handler(fd, &QemuProxy::read_handler, wr ? &QemuProxy::write_handler : NULL, &handlers[fd]); + } + else + { + handlers.erase(fd); + qemu_set_fd_handler(fd, NULL, NULL, NULL); + } + } + + static void read_handler(void *opaque) + { + QemuProxyData *data = (QemuProxyData *)opaque; + data->callback(data->fd, EPOLLIN); + } + + static void write_handler(void *opaque) + { + QemuProxyData *data = (QemuProxyData *)opaque; + data->callback(data->fd, EPOLLOUT); + } +}; + +extern "C" { + +void* falcon_proxy_create(const char *etcd_host, const char *etcd_prefix) +{ + QemuProxy *p = new QemuProxy(etcd_host, etcd_prefix); + return p; +} + +void falcon_proxy_destroy(void *client) +{ + QemuProxy *p = (QemuProxy*)client; + delete p; +} + +void falcon_proxy_rw(int write, void *client, uint64_t inode, uint64_t offset, uint64_t len, + iovec *iov, int iovcnt, FalconIOHandler cb, void *opaque) +{ + QemuProxy *p = (QemuProxy*)client; + cluster_op_t *op = new cluster_op_t; + op->opcode = write ? OSD_OP_WRITE : OSD_OP_READ; + op->inode = inode; + op->offset = offset; + op->len = len; + for (int i = 0; i < iovcnt; i++) + { + op->iov.push_back(iov[i].iov_base, iov[i].iov_len); + } + op->callback = [cb, opaque](cluster_op_t *op) + { + cb(op->retval, opaque); + delete op; + }; + p->cli->execute(op); +} + +void falcon_proxy_sync(void *client, FalconIOHandler cb, void *opaque) +{ + QemuProxy *p = (QemuProxy*)client; + cluster_op_t *op = new cluster_op_t; + op->opcode = OSD_OP_SYNC; + op->callback = [cb, opaque](cluster_op_t *op) + { + cb(op->retval, opaque); + delete op; + }; + p->cli->execute(op); +} + +} diff --git a/qemu_proxy.h b/qemu_proxy.h new file mode 100644 index 00000000..837fda4e --- /dev/null +++ b/qemu_proxy.h @@ -0,0 +1,23 @@ +#ifndef FALCON_QEMU_PROXY_H +#define FALCON_QEMU_PROXY_H + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +// Our exports +typedef void FalconIOHandler(int retval, void *opaque); +void* falcon_proxy_create(const char *etcd_host, const char *etcd_prefix); +void falcon_proxy_destroy(void *client); +void falcon_proxy_rw(int write, void *client, uint64_t inode, uint64_t offset, uint64_t len, + struct iovec *iov, int iovcnt, FalconIOHandler cb, void *opaque); +void falcon_proxy_sync(void *client, FalconIOHandler cb, void *opaque); + +#ifdef __cplusplus +} +#endif + +#endif