From 437dc5b6308600837e044f28fe5d2b2767d33b89 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 7 Jun 2020 00:29:09 +0300 Subject: [PATCH] Implement a FIO engine for testing cluster I/O --- Makefile | 21 +++- cluster_client.cpp | 38 ++++++ cluster_client.h | 8 +- epoll_manager.cpp | 89 ++++++++++++++ epoll_manager.h | 20 +++ fio_cluster.cpp | 298 +++++++++++++++++++++++++++++++++++++++++++++ messenger.h | 1 - osd.cpp | 2 + 8 files changed, 469 insertions(+), 8 deletions(-) create mode 100644 epoll_manager.cpp create mode 100644 epoll_manager.h create mode 100644 fio_cluster.cpp diff --git a/Makefile b/Makefile index fb411ecd..22002477 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_impl.o blockstore_init.o blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_rollback.o blockstore_flush.o crc32c.o ringloop.o # -fsanitize=address CXXFLAGS := -g -O3 -Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fPIC -fdiagnostics-color=always -all: $(BLOCKSTORE_OBJS) libfio_blockstore.so osd libfio_sec_osd.so stub_osd stub_bench osd_test dump_journal +all: libfio_blockstore.so osd libfio_sec_osd.so libfio_cluster.so stub_osd stub_bench osd_test dump_journal clean: rm -f *.o @@ -10,9 +10,9 @@ dump_journal: dump_journal.cpp crc32c.o blockstore_journal.h g++ $(CXXFLAGS) -o $@ $< crc32c.o libblockstore.so: $(BLOCKSTORE_OBJS) - g++ $(CXXFLAGS) -o libblockstore.so -shared $(BLOCKSTORE_OBJS) -ltcmalloc_minimal -luring -libfio_blockstore.so: ./libblockstore.so fio_engine.cpp json11.o - g++ $(CXXFLAGS) -shared -o libfio_blockstore.so fio_engine.cpp json11.o ./libblockstore.so -ltcmalloc_minimal -luring + g++ $(CXXFLAGS) -o $@ -shared $(BLOCKSTORE_OBJS) -ltcmalloc_minimal -luring +libfio_blockstore.so: ./libblockstore.so fio_engine.o json11.o + g++ $(CXXFLAGS) -shared -o $@ fio_engine.o json11.o ./libblockstore.so -ltcmalloc_minimal -luring OSD_OBJS := osd.o osd_secondary.o 
msgr_receive.o msgr_send.o osd_peering.o osd_flush.o osd_peering_pg.o \ osd_primary.o osd_primary_subops.o etcd_state_client.o messenger.o osd_cluster.o http_client.o pg_states.o \ @@ -29,8 +29,13 @@ osd_test: osd_test.cpp osd_ops.h rw_blocking.o osd_peering_pg_test: osd_peering_pg_test.cpp osd_peering_pg.o g++ $(CXXFLAGS) -o $@ $< osd_peering_pg.o -ltcmalloc_minimal -libfio_sec_osd.so: fio_sec_osd.cpp osd_ops.h rw_blocking.o - g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -o libfio_sec_osd.so fio_sec_osd.cpp rw_blocking.o -luring +libfio_sec_osd.so: fio_sec_osd.o rw_blocking.o + g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -o $@ fio_sec_osd.o rw_blocking.o + +FIO_CLUSTER_OBJS := fio_cluster.o cluster_client.o epoll_manager.o etcd_state_client.o \ + messenger.o msgr_send.o msgr_receive.o ringloop.o json11.o http_client.o pg_states.o timerfd_manager.o base64.o +libfio_cluster.so: $(FIO_CLUSTER_OBJS) + g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -o $@ $(FIO_CLUSTER_OBJS) -luring test_blockstore: ./libblockstore.so test_blockstore.cpp timerfd_interval.o g++ $(CXXFLAGS) -o test_blockstore test_blockstore.cpp timerfd_interval.o ./libblockstore.so -ltcmalloc_minimal -luring @@ -76,8 +81,12 @@ cluster_client.o: cluster_client.cpp cluster_client.h etcd_state_client.h http_c g++ $(CXXFLAGS) -c -o $@ $< dump_journal.o: dump_journal.cpp allocator.h blockstore.h blockstore_flush.h blockstore_impl.h blockstore_init.h blockstore_journal.h cpp-btree/btree_map.h crc32c.h object_id.h ringloop.h g++ $(CXXFLAGS) -c -o $@ $< +epoll_manager.o: epoll_manager.cpp epoll_manager.h ringloop.h timerfd_manager.h + g++ $(CXXFLAGS) -c -o $@ $< etcd_state_client.o: etcd_state_client.cpp base64.h etcd_state_client.h http_client.h json11/json11.hpp object_id.h osd_id.h osd_ops.h pg_states.h timerfd_manager.h g++ $(CXXFLAGS) -c -o $@ $< +fio_cluster.o: fio_cluster.cpp cluster_client.h epoll_manager.h etcd_state_client.h fio/fio.h fio/optgroup.h http_client.h json11/json11.hpp messenger.h object_id.h 
osd_id.h osd_ops.h ringloop.h timerfd_manager.h + g++ $(CXXFLAGS) -c -o $@ $< fio_engine.o: fio_engine.cpp blockstore.h fio/fio.h fio/optgroup.h json11/json11.hpp object_id.h ringloop.h g++ $(CXXFLAGS) -c -o $@ $< fio_sec_osd.o: fio_sec_osd.cpp fio/fio.h fio/optgroup.h object_id.h osd_id.h osd_ops.h rw_blocking.h diff --git a/cluster_client.cpp b/cluster_client.cpp index 3f4e70fb..8d6601c7 100644 --- a/cluster_client.cpp +++ b/cluster_client.cpp @@ -104,6 +104,11 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config) if (!pg_stripe_size) pg_stripe_size = DEFAULT_PG_STRIPE_SIZE; } + if (config["immediate_commit"] == "all") + { + // Cluster-wide immediate_commit mode + immediate_commit = true; + } msgr.peer_connect_interval = config["peer_connect_interval"].uint64_value(); if (!msgr.peer_connect_interval) msgr.peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL; @@ -148,8 +153,16 @@ void cluster_client_t::on_change_osd_state_hook(uint64_t peer_osd) } } +// FIXME: Implement OSD_OP_SYNC for immediate_commit == false void cluster_client_t::execute(cluster_op_t *op) { + if (op->opcode == OSD_OP_SYNC && immediate_commit) + { + // Syncs are not required in the immediate_commit mode + op->retval = 0; + std::function(op->callback)(op); + return; + } if (op->opcode != OSD_OP_READ && op->opcode != OSD_OP_WRITE || !op->inode || !op->len || op->offset % bs_disk_alignment || op->len % bs_disk_alignment) { @@ -163,6 +176,30 @@ void cluster_client_t::execute(cluster_op_t *op) unsent_ops.insert(op); return; } + if (op->opcode == OSD_OP_WRITE && !immediate_commit) + { + // Copy operation + cluster_op_t *op_copy = new cluster_op_t(); + op_copy->opcode = op->opcode; + op_copy->inode = op->inode; + op_copy->offset = op->offset; + op_copy->len = op->len; + op_copy->buf = malloc(op->len); + memcpy(op_copy->buf, op->buf, op->len); + unsynced_ops.push_back(op_copy); + unsynced_bytes += op->len; + if (inmemory_commit) + { + // Immediately acknowledge write and continue 
with the copy + op->retval = op->len; + std::function(op->callback)(op); + op = op_copy; + } + if (unsynced_bytes >= inmemory_dirty_limit) + { + // Push an extra SYNC operation + } + } // Slice the request into individual object stripe requests // Primary OSDs still operate individual stripes, but their size is multiplied by PG minsize in case of EC uint64_t pg_block_size = bs_block_size * pg_part_count; @@ -304,6 +341,7 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part) if (op->done_count >= op->parts.size()) { // Finished! + sent_ops.erase(op); op->retval = op->len; std::function(op->callback)(op); } diff --git a/cluster_client.h b/cluster_client.h index d423593c..d12c0ae4 100644 --- a/cluster_client.h +++ b/cluster_client.h @@ -27,7 +27,7 @@ struct cluster_op_part_t struct cluster_op_t { - uint64_t opcode; // OSD_OP_READ, OSD_OP_WRITE + uint64_t opcode; // OSD_OP_READ, OSD_OP_WRITE, OSD_OP_SYNC uint64_t inode; uint64_t offset; uint64_t len; @@ -52,12 +52,18 @@ class cluster_client_t uint64_t bs_disk_alignment = 0; uint64_t bs_bitmap_granularity = 0; uint64_t pg_count = 0; + bool immediate_commit = false; + bool inmemory_commit = false; + uint64_t inmemory_dirty_limit = 32*1024*1024; int log_level; uint64_t op_id = 1; etcd_state_client_t st_cli; osd_messenger_t msgr; std::set sent_ops, unsent_ops; + // unsynced operations are copied in memory to allow replay when cluster isn't in the immediate_commit mode + std::vector unsynced_ops; + uint64_t unsynced_bytes = 0; public: cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config); diff --git a/epoll_manager.cpp b/epoll_manager.cpp new file mode 100644 index 00000000..f79cbba9 --- /dev/null +++ b/epoll_manager.cpp @@ -0,0 +1,89 @@ +#include +#include +#include + +#include "epoll_manager.h" + +#define MAX_EPOLL_EVENTS 64 + +epoll_manager_t::epoll_manager_t(ring_loop_t *ringloop) +{ + this->ringloop = ringloop; + + epoll_fd = epoll_create(1); + if (epoll_fd < 0) + { + throw 
std::runtime_error(std::string("epoll_create: ") + strerror(errno)); + } + + tfd = new timerfd_manager_t([this](int fd, std::function handler) { set_fd_handler(fd, handler); }); + + handle_epoll_events(); +} + +epoll_manager_t::~epoll_manager_t() +{ + if (tfd) + { + delete tfd; + tfd = NULL; + } + close(epoll_fd); +} + +void epoll_manager_t::set_fd_handler(int fd, std::function handler) +{ + if (handler != NULL) + { + bool exists = epoll_handlers.find(fd) != epoll_handlers.end(); + epoll_event ev; + ev.data.fd = fd; + ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET; + if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0) + { + throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); + } + epoll_handlers[fd] = handler; + } + else + { + if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0 && errno != ENOENT) + { + throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); + } + epoll_handlers.erase(fd); + } +} + +void epoll_manager_t::handle_epoll_events() +{ + io_uring_sqe *sqe = ringloop->get_sqe(); + if (!sqe) + { + throw std::runtime_error("can't get SQE, will fall out of sync with EPOLLET"); + } + ring_data_t *data = ((ring_data_t*)sqe->user_data); + my_uring_prep_poll_add(sqe, epoll_fd, POLLIN); + data->callback = [this](ring_data_t *data) + { + if (data->res < 0) + { + throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res)); + } + handle_epoll_events(); + }; + ringloop->submit(); + int nfds; + epoll_event events[MAX_EPOLL_EVENTS]; +restart: + nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, 0); + for (int i = 0; i < nfds; i++) + { + auto & cb = epoll_handlers[events[i].data.fd]; + cb(events[i].data.fd, events[i].events); + } + if (nfds == MAX_EPOLL_EVENTS) + { + goto restart; + } +} diff --git a/epoll_manager.h b/epoll_manager.h new file mode 100644 index 00000000..8d1a2185 --- /dev/null +++ b/epoll_manager.h @@ -0,0 +1,20 @@ +#pragma once + +#include + +#include "ringloop.h" 
+#include "timerfd_manager.h" + +class epoll_manager_t +{ + int epoll_fd; + ring_loop_t *ringloop; + std::map> epoll_handlers; +public: + epoll_manager_t(ring_loop_t *ringloop); + ~epoll_manager_t(); + void set_fd_handler(int fd, std::function handler); + void handle_epoll_events(); + + timerfd_manager_t *tfd; +}; diff --git a/fio_cluster.cpp b/fio_cluster.cpp new file mode 100644 index 00000000..fa153158 --- /dev/null +++ b/fio_cluster.cpp @@ -0,0 +1,298 @@ +// FIO engine to test cluster I/O +// +// Random write: +// +// fio -thread -ioengine=./libfio_cluster.so -name=test -bs=4k -direct=1 -fsync=16 -iodepth=16 -rw=randwrite \ +// -etcd=127.0.0.1:2379 [-etcd_prefix=/microceph] -size=1000M +// +// Linear write: +// +// fio -thread -ioengine=./libfio_cluster.so -name=test -bs=128k -direct=1 -fsync=32 -iodepth=32 -rw=write \ +// -etcd=127.0.0.1:2379 [-etcd_prefix=/microceph] -size=1000M +// +// Random read (run with -iodepth=32 or -iodepth=1): +// +// fio -thread -ioengine=./libfio_cluster.so -name=test -bs=4k -direct=1 -iodepth=32 -rw=randread \ +// -etcd=127.0.0.1:2379 [-etcd_prefix=/microceph] -size=1000M + +#include +#include +#include +#include + +#include +#include + +#include "epoll_manager.h" +#include "cluster_client.h" +extern "C" { +#define CONFIG_HAVE_GETTID +#define CONFIG_PWRITEV2 +#include "fio/fio.h" +#include "fio/optgroup.h" +} + +struct sec_data +{ + ring_loop_t *ringloop = NULL; + epoll_manager_t *epmgr = NULL; + cluster_client_t *cli = NULL; + bool last_sync = false; + /* The list of completed io_u structs. 
*/ + std::vector<io_u*> completed; + uint64_t op_n = 0, inflight = 0; + bool trace = false; +}; + +struct sec_options +{ + int __pad; + char *etcd_host = NULL; + char *etcd_prefix = NULL; + int inode = 0; + int trace = 0; +}; + +static struct fio_option options[] = { + { + .name = "etcd", + .lname = "etcd address", + .type = FIO_OPT_STR_STORE, + .off1 = offsetof(struct sec_options, etcd_host), + .help = "etcd address in the form HOST:PORT[/PATH]", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_FILENAME, + }, + { + .name = "etcd_prefix", + .lname = "etcd key prefix", + .type = FIO_OPT_STR_STORE, + .off1 = offsetof(struct sec_options, etcd_prefix), + .help = "etcd key prefix, by default /microceph", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_FILENAME, + }, + { + .name = "inode", + .lname = "inode to run tests on", + .type = FIO_OPT_INT, + .off1 = offsetof(struct sec_options, inode), + .help = "inode to run tests on (1 by default)", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_FILENAME, + }, + { + .name = "osd_trace", + .lname = "OSD trace", + .type = FIO_OPT_BOOL, + .off1 = offsetof(struct sec_options, trace), + .help = "Trace OSD operations", + .def = "0", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_FILENAME, + }, + { + .name = NULL, + }, +}; + +static int sec_setup(struct thread_data *td) +{ + sec_data *bsd; + + bsd = new sec_data; + if (!bsd) + { + td_verror(td, errno, "calloc"); + return 1; + } + td->io_ops_data = bsd; + + if (!td->files_index) + { + add_file(td, "osd_cluster", 0, 0); + td->o.nr_files = td->o.nr_files ? : 1; + td->o.open_files++; + } + + return 0; +} + +static void sec_cleanup(struct thread_data *td) +{ + sec_data *bsd = (sec_data*)td->io_ops_data; + if (bsd) + { + delete bsd->cli; + delete bsd->epmgr; + delete bsd->ringloop; + bsd->cli = NULL; + bsd->epmgr = NULL; + bsd->ringloop = NULL; + } +} + +/* Connect to the server from each thread. 
*/ +static int sec_init(struct thread_data *td) +{ + sec_options *o = (sec_options*)td->eo; + sec_data *bsd = (sec_data*)td->io_ops_data; + + json11::Json cfg = json11::Json::object { + { "etcd_address", std::string(o->etcd_host) }, + { "etcd_prefix", std::string(o->etcd_prefix ? o->etcd_prefix : "/microceph") }, + }; + + bsd->ringloop = new ring_loop_t(512); + bsd->epmgr = new epoll_manager_t(bsd->ringloop); + bsd->cli = new cluster_client_t(bsd->ringloop, bsd->epmgr->tfd, cfg); + + bsd->trace = o->trace ? true : false; + + return 0; +} + +/* Begin read or write request. */ +static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) +{ + sec_options *opt = (sec_options*)td->eo; + sec_data *bsd = (sec_data*)td->io_ops_data; + int n = bsd->op_n; + + fio_ro_check(td, io); + if (io->ddir == DDIR_SYNC && bsd->last_sync) + { + return FIO_Q_COMPLETED; + } + + io->engine_data = bsd; + cluster_op_t *op = new cluster_op_t; + + switch (io->ddir) + { + case DDIR_READ: + op->opcode = OSD_OP_READ; + op->inode = opt->inode; + op->offset = io->offset; + op->len = io->xfer_buflen; + op->buf = io->xfer_buf; + bsd->last_sync = false; + break; + case DDIR_WRITE: + op->opcode = OSD_OP_WRITE; + op->inode = opt->inode; + op->offset = io->offset; + op->len = io->xfer_buflen; + op->buf = io->xfer_buf; + bsd->last_sync = false; + break; + case DDIR_SYNC: + op->opcode = OSD_OP_SYNC; + bsd->last_sync = true; + break; + default: + delete op; /* never submitted, so no callback will free it */ io->error = EINVAL; + return FIO_Q_COMPLETED; + } + + op->callback = [io, n](cluster_op_t *op) + { + io->error = op->retval < 0 ? -op->retval : 0; + sec_data *bsd = (sec_data*)io->engine_data; + bsd->inflight--; + bsd->completed.push_back(io); + if (bsd->trace) + { + printf("--- %s n=%d retval=%d\n", io->ddir == DDIR_READ ? "READ" : + (io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), n, op->retval); + } + delete op; + }; + + if (opt->trace) + { + printf("+++ %s # %d\n", io->ddir == DDIR_READ ? "READ" : + (io->ddir == DDIR_WRITE ? 
"WRITE" : "SYNC"), n); + } + + io->error = 0; + bsd->inflight++; + bsd->op_n++; + bsd->cli->execute(op); + + if (io->error != 0) + return FIO_Q_COMPLETED; + return FIO_Q_QUEUED; +} + +static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t) +{ + sec_data *bsd = (sec_data*)td->io_ops_data; + while (true) + { + bsd->ringloop->loop(); + if (bsd->completed.size() >= min) + break; + bsd->ringloop->wait(); + } + return bsd->completed.size(); +} + +static struct io_u *sec_event(struct thread_data *td, int event) +{ + sec_data *bsd = (sec_data*)td->io_ops_data; + if (bsd->completed.size() == 0) + return NULL; + /* FIXME We ignore the event number and assume fio calls us exactly once for [0..nr_events-1] */ + struct io_u *ev = bsd->completed.back(); + bsd->completed.pop_back(); + return ev; +} + +static int sec_io_u_init(struct thread_data *td, struct io_u *io) +{ + io->engine_data = NULL; + return 0; +} + +static void sec_io_u_free(struct thread_data *td, struct io_u *io) +{ +} + +static int sec_open_file(struct thread_data *td, struct fio_file *f) +{ + return 0; +} + +static int sec_invalidate(struct thread_data *td, struct fio_file *f) +{ + return 0; +} + +struct ioengine_ops ioengine = { + .name = "microceph_cluster", + .version = FIO_IOOPS_VERSION, + .flags = FIO_MEMALIGN | FIO_DISKLESSIO | FIO_NOEXTEND, + .setup = sec_setup, + .init = sec_init, + .queue = sec_queue, + .getevents = sec_getevents, + .event = sec_event, + .cleanup = sec_cleanup, + .open_file = sec_open_file, + .invalidate = sec_invalidate, + .io_u_init = sec_io_u_init, + .io_u_free = sec_io_u_free, + .option_struct_size = sizeof(struct sec_options), + .options = options, +}; + +static void fio_init fio_sec_register(void) +{ + register_ioengine(&ioengine); +} + +static void fio_exit fio_sec_unregister(void) +{ + unregister_ioengine(&ioengine); +} diff --git a/messenger.h b/messenger.h index 3c6968a5..83a7cadd 100644 --- a/messenger.h +++ b/messenger.h 
@@ -23,7 +23,6 @@ #define CL_READ_REPLY_DATA 3 #define CL_WRITE_READY 1 #define CL_WRITE_REPLY 2 -#define MAX_EPOLL_EVENTS 64 #define OSD_OP_INLINE_BUF_COUNT 16 #define PEER_CONNECTING 1 diff --git a/osd.cpp b/osd.cpp index cf4e31be..29e07c87 100644 --- a/osd.cpp +++ b/osd.cpp @@ -7,6 +7,8 @@ #include "osd.h" +#define MAX_EPOLL_EVENTS 64 + const char* osd_op_names[] = { "", "read",