From 85010fed380542d67adde5c3548ba1a3bd7ece36 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sat, 30 May 2020 10:24:50 +0300 Subject: [PATCH] Use a separate thread for epoll Simplest, but absolutely inefficient, way to test openonload epoll --- Makefile | 2 +- osd.cpp | 92 ++++++++++++++++++++++++++++++++++++++------------------ osd.h | 8 ++++- 3 files changed, 71 insertions(+), 31 deletions(-) diff --git a/Makefile b/Makefile index 33840428..153a8874 100644 --- a/Makefile +++ b/Makefile @@ -18,7 +18,7 @@ OSD_OBJS := osd.o osd_secondary.o msgr_receive.o msgr_send.o osd_peering.o osd_f osd_primary.o osd_primary_subops.o etcd_state_client.o messenger.o osd_cluster.o http_client.o pg_states.o \ osd_rmw.o json11.o base64.o timerfd_manager.o osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h $(OSD_OBJS) - g++ $(CXXFLAGS) -o $@ osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring + g++ $(CXXFLAGS) -o $@ osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring -lpthread stub_osd: stub_osd.o rw_blocking.o g++ $(CXXFLAGS) -o $@ stub_osd.o rw_blocking.o -ltcmalloc_minimal diff --git a/osd.cpp b/osd.cpp index f1e2982e..d8813e6f 100644 --- a/osd.cpp +++ b/osd.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -43,6 +44,11 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo { throw std::runtime_error(std::string("epoll_create: ") + strerror(errno)); } + event_fd = eventfd(0, EFD_NONBLOCK); + if (event_fd < 0) + { + throw std::runtime_error(std::string("eventfd: ") + strerror(errno)); + } this->tfd = new timerfd_manager_t(ringloop); this->tfd->set_fd_handler = [this](int fd, std::function handler) { set_fd_handler(fd, handler); }; @@ -60,17 +66,40 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo consumer.loop = [this]() { loop(); }; ringloop->register_consumer(&consumer); + epoll_thread = new std::thread([this]() + { + int nfds; + epoll_event events[MAX_EPOLL_EVENTS]; + while (1) + { + nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, -1); + { + std::lock_guard guard(epoll_mutex); + for (int i = 0; i < nfds; i++) + { + int fd = events[i].data.fd; + int ev = events[i].events; + epoll_ready[fd] |= ev; + } + uint64_t n = 1; + write(event_fd, &n, 8); + } + } + }); } osd_t::~osd_t() { + close(epoll_fd); + epoll_thread->join(); + delete epoll_thread; if (tfd) { delete tfd; tfd = NULL; } ringloop->unregister_consumer(&consumer); - close(epoll_fd); + close(event_fd); close(listen_fd); } @@ -189,8 +218,13 @@ void osd_t::bind_socket() { close(listen_fd); close(epoll_fd); - throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); + throw std::runtime_error(std::string("epoll_ctl (add listen_fd): ") + strerror(errno)); } + + epoll_handlers[listen_fd] = [this](int peer_fd, int epoll_events) + { + c_cli.accept_connections(listen_fd); + }; } bool osd_t::shutdown() @@ -205,10 +239,23 @@ bool osd_t::shutdown() void osd_t::loop() { - if (!wait_state) + std::map cur_epoll; { - handle_epoll_events(); - wait_state = 1; + std::lock_guard guard(epoll_mutex); + cur_epoll.swap(epoll_ready); + } + for (auto p: cur_epoll) + { + auto cb_it = epoll_handlers.find(p.first); + if (cb_it != epoll_handlers.end()) + { + cb_it->second(p.first, p.second); + } + } + if (!(wait_state & 2)) + { + handle_eventfd(); + wait_state = wait_state | 2; } handle_peers(); c_cli.read_requests(); @@ -226,7 +273,7 @@ void osd_t::set_fd_handler(int fd, std::function handler) ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET; if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0) { - throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); + throw std::runtime_error(std::string(exists ? "epoll_ctl (mod fd): " : "epoll_ctl (add fd): ") + strerror(errno)); } epoll_handlers[fd] = handler; } @@ -234,49 +281,36 @@ void osd_t::set_fd_handler(int fd, std::function handler) { if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0 && errno != ENOENT) { - throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); + throw std::runtime_error(std::string("epoll_ctl (remove fd): ") + strerror(errno)); } epoll_handlers.erase(fd); } } -void osd_t::handle_epoll_events() +void osd_t::handle_eventfd() { io_uring_sqe *sqe = ringloop->get_sqe(); if (!sqe) { - throw std::runtime_error("can't get SQE, will fall out of sync with EPOLLET"); + throw std::runtime_error("can't get SQE, will fall out of sync with eventfd"); } ring_data_t *data = ((ring_data_t*)sqe->user_data); - my_uring_prep_poll_add(sqe, epoll_fd, POLLIN); + my_uring_prep_poll_add(sqe, event_fd, POLLIN); data->callback = [this](ring_data_t *data) { if (data->res < 0) { throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res)); } - handle_epoll_events(); + handle_eventfd(); }; ringloop->submit(); - int nfds; - epoll_event events[MAX_EPOLL_EVENTS]; -restart: - nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, 0); - for (int i = 0; i < nfds; i++) + uint64_t n = 0; + size_t res = read(event_fd, &n, 8); + if (res == 8) { - if (events[i].data.fd == listen_fd) - { - c_cli.accept_connections(listen_fd); - } - else - { - auto & cb = epoll_handlers[events[i].data.fd]; - cb(events[i].data.fd, events[i].events); - } - } - if (nfds == MAX_EPOLL_EVENTS) - { - goto restart; + // No need to do anything, the loop has already woken up + ringloop->wakeup(); } } diff --git a/osd.h b/osd.h index 20d4d212..37209a7f 100644 --- a/osd.h +++ b/osd.h @@ -12,6 +12,8 @@ #include #include +#include +#include #include "blockstore.h" #include "ringloop.h" @@ -114,6 +116,10 @@ class osd_t int wait_state = 0; int epoll_fd = 0; + int event_fd = 0; + std::thread *epoll_thread = NULL; + std::mutex epoll_mutex; + std::map epoll_ready; int listening_port = 0; int listen_fd = 0; ring_consumer_t consumer; @@ -150,7 +156,7 @@ class osd_t // event loop, socket read/write void loop(); void set_fd_handler(int fd, std::function handler); - void handle_epoll_events(); + void handle_eventfd(); // peer handling (primary OSD logic) void parse_test_peer(std::string peer);