From 68d4f2a481f6563ded9b517f708f92d91aa22858 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Wed, 22 Dec 2021 02:36:31 +0300 Subject: [PATCH] Use real messenger in fio_sec_osd --- src/CMakeLists.txt | 5 +- src/fio_sec_osd.cpp | 153 ++++++++++++++++++++++++-------------------- src/messenger.h | 6 +- 3 files changed, 89 insertions(+), 75 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 69dd915a..c28a0212 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -111,11 +111,12 @@ if (${WITH_FIO}) # libfio_vitastor_sec.so add_library(fio_vitastor_sec SHARED fio_sec_osd.cpp - rw_blocking.cpp - addr_util.cpp ) target_link_libraries(fio_vitastor_sec + vitastor_common tcmalloc_minimal + ${LIBURING_LIBRARIES} + ${IBVERBS_LIBRARIES} ) endif (${WITH_FIO}) diff --git a/src/fio_sec_osd.cpp b/src/fio_sec_osd.cpp index 0be2f764..02bcf5d7 100644 --- a/src/fio_sec_osd.cpp +++ b/src/fio_sec_osd.cpp @@ -29,16 +29,23 @@ #include #include "addr_util.h" -#include "rw_blocking.h" +#include "epoll_manager.h" +#include "ringloop.h" +#include "messenger.h" #include "osd_ops.h" #include "fio_headers.h" struct sec_data { + epoll_manager_t *epmgr; + ring_loop_t *ringloop; + osd_messenger_t *msgr; + ring_consumer_t looper; + void *bitmap_buf; int connect_fd; + uint64_t op_id; /* block_size = 1 << block_order (128KB by default) */ uint64_t block_order = 17, block_size = 1 << 17; - std::unordered_map queue; bool last_sync = false; /* The list of completed io_u structs. */ std::vector completed; @@ -111,9 +118,6 @@ static struct fio_option options[] = { static int sec_setup(struct thread_data *td) { sec_data *bsd; - //fio_file *f; - //int r; - //int64_t size; bsd = new sec_data; if (!bsd) @@ -130,8 +134,6 @@ static int sec_setup(struct thread_data *td) td->o.open_files++; } - //f = td->files[0]; - //f->real_file_size = size; return 0; } @@ -140,6 +142,10 @@ static void sec_cleanup(struct thread_data *td) sec_data *bsd = (sec_data*)td->io_ops_data; if (bsd) { + delete bsd->msgr; + delete bsd->epmgr; + delete bsd->ringloop; + free(bsd->bitmap_buf); close(bsd->connect_fd); delete bsd; } @@ -174,6 +180,45 @@ static int sec_init(struct thread_data *td) int one = 1; setsockopt(bsd->connect_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); + fcntl(bsd->connect_fd, F_SETFL, fcntl(bsd->connect_fd, F_GETFL, 0) | O_NONBLOCK); + + json11::Json cfg = json11::Json::object{ { "use_rdma", 0 } }; + + bsd->bitmap_buf = malloc(4096); + + bsd->ringloop = new ring_loop_t(512); + bsd->epmgr = new epoll_manager_t(bsd->ringloop); + bsd->msgr = new osd_messenger_t(); + bsd->msgr->tfd = bsd->epmgr->tfd; + bsd->msgr->ringloop = bsd->ringloop; + bsd->msgr->repeer_pgs = [](osd_num_t){}; + bsd->msgr->parse_config(cfg); + bsd->msgr->init(); + + bsd->looper.loop = [bsd]() + { + bsd->msgr->read_requests(); + bsd->msgr->send_replies(); + bsd->ringloop->submit(); + }; + bsd->ringloop->register_consumer(&bsd->looper); + + int peer_fd = bsd->connect_fd; + bsd->msgr->clients[peer_fd] = new osd_client_t(); + bsd->msgr->clients[peer_fd]->peer_addr = addr; + bsd->msgr->clients[peer_fd]->peer_port = ntohs(((sockaddr_in*)&addr)->sin_port); + bsd->msgr->clients[peer_fd]->peer_fd = peer_fd; + bsd->msgr->clients[peer_fd]->peer_state = PEER_CONNECTED; + bsd->msgr->clients[peer_fd]->connect_timeout_id = -1; + bsd->msgr->clients[peer_fd]->osd_num = 1; + bsd->msgr->clients[peer_fd]->in_buf = malloc_or_die(bsd->msgr->receive_buffer_size); + bsd->epmgr->tfd->set_fd_handler(peer_fd, true, [msgr = bsd->msgr](int peer_fd, int epoll_events) + { + // Either OUT (connected) or HUP + msgr->handle_peer_epoll(peer_fd, epoll_events); + }); + bsd->msgr->osd_peer_fds[1] = peer_fd; + // FIXME: read config (block size) from OSD return 0; @@ -193,7 +238,12 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) } io->engine_data = bsd; - osd_any_op_t op = { 0 }; + + osd_op_t *oo = new osd_op_t(); + oo->op_type = OSD_OP_OUT; + oo->peer_fd = bsd->connect_fd; + + osd_any_op_t & op = oo->req; op.hdr.magic = SECONDARY_OSD_OP_MAGIC; op.hdr.id = n; @@ -210,6 +260,9 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) op.sec_rw.version = UINT64_MAX; // last unstable op.sec_rw.offset = io->offset % bsd->block_size; op.sec_rw.len = io->xfer_buflen; + op.sec_rw.attr_len = 4; + oo->bitmap = bsd->bitmap_buf; + oo->bitmap_len = 4; } else { @@ -218,6 +271,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) op.rw.offset = io->offset; op.rw.len = io->xfer_buflen; } + oo->iov.push_back(io->xfer_buf, io->xfer_buflen); bsd->last_sync = false; break; case DDIR_WRITE: @@ -239,6 +293,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) op.rw.offset = io->offset; op.rw.len = io->xfer_buflen; } + oo->iov.push_back(io->xfer_buf, io->xfer_buflen); bsd->last_sync = false; break; case DDIR_SYNC: @@ -260,6 +315,21 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) return FIO_Q_COMPLETED; } + oo->callback = [td, io](osd_op_t *oo) + { + sec_options *opt = (sec_options*)td->eo; + sec_data *bsd = (sec_data*)td->io_ops_data; + if (opt->trace) + { + printf("--- %s # %ld %ld\n", io->ddir == DDIR_READ ? "READ" : + (io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), oo->reply.hdr.id, oo->reply.hdr.retval); + } + io->error = oo->reply.hdr.retval < 0 ? -oo->reply.hdr.retval : 0; + bsd->completed.push_back(io); + delete oo; + }; + bsd->msgr->outbox_push(oo); + if (opt->trace) { printf("+++ %s # %d\n", io->ddir == DDIR_READ ? "READ" : @@ -269,21 +339,6 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) io->error = 0; bsd->inflight++; bsd->op_n++; - bsd->queue[n] = io; - - iovec iov[2] = { { .iov_base = op.buf, .iov_len = OSD_PACKET_SIZE } }; - int iovcnt = 1, wtotal = OSD_PACKET_SIZE; - if (io->ddir == DDIR_WRITE) - { - iov[1] = { .iov_base = io->xfer_buf, .iov_len = io->xfer_buflen }; - wtotal += io->xfer_buflen; - iovcnt++; - } - if (writev_blocking(bsd->connect_fd, iov, iovcnt) != wtotal) - { - perror("writev"); - exit(1); - } if (io->error != 0) return FIO_Q_COMPLETED; @@ -292,57 +347,13 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t) { - sec_options *opt = (sec_options*)td->eo; sec_data *bsd = (sec_data*)td->io_ops_data; - // FIXME timeout, at least poll. Now it's the stupidest implementation possible - osd_any_reply_t reply; while (bsd->completed.size() < min) { - read_blocking(bsd->connect_fd, reply.buf, OSD_PACKET_SIZE); - if (reply.hdr.magic != SECONDARY_OSD_REPLY_MAGIC) - { - fprintf(stderr, "bad reply: magic = %lx instead of %lx\n", reply.hdr.magic, SECONDARY_OSD_REPLY_MAGIC); - exit(1); - } - auto it = bsd->queue.find(reply.hdr.id); - if (it == bsd->queue.end()) - { - fprintf(stderr, "bad reply: op id %lx missing in local queue\n", reply.hdr.id); - exit(1); - } - io_u* io = it->second; - bsd->queue.erase(it); - if (io->ddir == DDIR_READ) - { - if (reply.hdr.retval != io->xfer_buflen) - { - fprintf(stderr, "Short read: retval = %ld instead of %llu\n", reply.hdr.retval, io->xfer_buflen); - exit(1); - } - read_blocking(bsd->connect_fd, io->xfer_buf, io->xfer_buflen); - } - else if (io->ddir == DDIR_WRITE) - { - if (reply.hdr.retval != io->xfer_buflen) - { - fprintf(stderr, "Short write: retval = %ld instead of %llu\n", reply.hdr.retval, io->xfer_buflen); - exit(1); - } - } - else if (io->ddir == DDIR_SYNC) - { - if (reply.hdr.retval != 0) - { - fprintf(stderr, "Sync failed: retval = %ld\n", reply.hdr.retval); - exit(1); - } - } - if (opt->trace) - { - printf("--- %s # %ld\n", io->ddir == DDIR_READ ? "READ" : - (io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), reply.hdr.id); - } - bsd->completed.push_back(io); + bsd->ringloop->loop(); + if (bsd->completed.size() >= min) + break; + bsd->ringloop->wait(); } return bsd->completed.size(); } diff --git a/src/messenger.h b/src/messenger.h index 6b59325b..e2e5472c 100644 --- a/src/messenger.h +++ b/src/messenger.h @@ -120,7 +120,6 @@ struct osd_messenger_t protected: int keepalive_timer_id = -1; - uint32_t receive_buffer_size = 0; int peer_connect_interval = 0; int peer_connect_timeout = 0; int osd_idle_timeout = 0; @@ -142,6 +141,8 @@ protected: std::vector> set_immediate; public: + uint32_t receive_buffer_size = 0; + timerfd_manager_t *tfd; ring_loop_t *ringloop; // osd_num_t is only for logging and asserts @@ -172,10 +173,11 @@ public: bool connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg); #endif + void handle_peer_epoll(int peer_fd, int epoll_events); + protected: void try_connect_peer(uint64_t osd_num); void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port); - void handle_peer_epoll(int peer_fd, int epoll_events); void handle_connect_epoll(int peer_fd); void on_connect_peer(osd_num_t peer_osd, int peer_fd); void check_peer_config(osd_client_t *cl);