From 36fe7d394ba5179e8261c589e006e38ea33f9695 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Wed, 27 May 2020 20:22:11 +0300 Subject: [PATCH] EPOLLLT --- cluster_client.cpp | 7 +++---- http_client.cpp | 32 +++++++++++++++----------------- osd.cpp | 15 ++++++++------- osd.h | 2 +- timerfd_manager.cpp | 6 +++--- timerfd_manager.h | 4 ++-- 6 files changed, 32 insertions(+), 34 deletions(-) diff --git a/cluster_client.cpp b/cluster_client.cpp index e2f6d403..5acf6cf7 100644 --- a/cluster_client.cpp +++ b/cluster_client.cpp @@ -115,7 +115,7 @@ void cluster_client_t::try_connect_peer_addr(osd_num_t peer_osd, const char *pee .osd_num = peer_osd, .in_buf = malloc(receive_buffer_size), }; - tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events) + tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events) { // Either OUT (connected) or HUP handle_connect_epoll(peer_fd); @@ -146,8 +146,7 @@ void cluster_client_t::handle_connect_epoll(int peer_fd) int one = 1; setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); cl.peer_state = PEER_CONNECTED; - // FIXME Disable EPOLLOUT on this fd - tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events) + tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events) { handle_peer_epoll(peer_fd, epoll_events); }); @@ -320,7 +319,7 @@ void cluster_client_t::stop_client(int peer_fd) } } clients.erase(it); - tfd->set_fd_handler(peer_fd, NULL); + tfd->set_fd_handler(peer_fd, false, NULL); if (cl.osd_num) { osd_peer_fds.erase(cl.osd_num); diff --git a/http_client.cpp b/http_client.cpp index 04ee0297..042046c5 100644 --- a/http_client.cpp +++ b/http_client.cpp @@ -149,7 +149,7 @@ http_co_t::~http_co_t() } if (peer_fd >= 0) { - tfd->set_fd_handler(peer_fd, NULL); + tfd->set_fd_handler(peer_fd, false, NULL); close(peer_fd); peer_fd = -1; } @@ -204,24 +204,10 @@ void http_co_t::start_connection() delete this; }); } - tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events) + tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events) { this->epoll_events |= epoll_events; - if (state == HTTP_CO_CONNECTING) - { - handle_connect_result(); - } - else - { - if (this->epoll_events & EPOLLIN) - { - submit_read(); - } - else if (this->epoll_events & (EPOLLRDHUP|EPOLLERR)) - { - delete this; - } - } + handle_connect_result(); }); epoll_events = 0; // Finally call connect @@ -253,6 +239,18 @@ void http_co_t::handle_connect_result() } int one = 1; setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); + tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events) + { + this->epoll_events |= epoll_events; + if (this->epoll_events & EPOLLIN) + { + submit_read(); + } + else if (this->epoll_events & (EPOLLRDHUP|EPOLLERR)) + { + delete this; + } + }); state = HTTP_CO_SENDING_REQUEST; submit_send(); } diff --git a/osd.cpp b/osd.cpp index 8aab34d7..800bd7b6 100644 --- a/osd.cpp +++ b/osd.cpp @@ -42,7 +42,7 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo throw std::runtime_error(std::string("epoll_create: ") + strerror(errno)); } - this->tfd = new timerfd_manager_t([this](int fd, std::function handler) { set_fd_handler(fd, handler); }); + this->tfd = new timerfd_manager_t([this](int fd, bool out, std::function handler) { set_fd_handler(fd, out, handler); }); this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id) { print_stats(); @@ -202,7 +202,7 @@ void osd_t::bind_socket() epoll_event ev; ev.data.fd = listen_fd; - ev.events = EPOLLIN | EPOLLET; + ev.events = EPOLLIN; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) < 0) { close(listen_fd); @@ -234,14 +234,14 @@ void osd_t::loop() ringloop->submit(); } -void osd_t::set_fd_handler(int fd, std::function handler) +void osd_t::set_fd_handler(int fd, bool out, std::function handler) { if (handler != NULL) { bool exists = epoll_handlers.find(fd) != epoll_handlers.end(); epoll_event ev; ev.data.fd = fd; - ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET; + ev.events = EPOLLIN | (out ? EPOLLOUT : 0) | EPOLLRDHUP; if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0) { throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); @@ -260,11 +260,13 @@ void osd_t::set_fd_handler(int fd, std::function handler) void osd_t::handle_epoll_events() { + wait_state = 0; io_uring_sqe *sqe = ringloop->get_sqe(); if (!sqe) { - throw std::runtime_error("can't get SQE, will fall out of sync with EPOLLET"); + return; } + wait_state = 1; ring_data_t *data = ((ring_data_t*)sqe->user_data); my_uring_prep_poll_add(sqe, epoll_fd, POLLIN); data->callback = [this](ring_data_t *data) @@ -275,7 +277,6 @@ void osd_t::handle_epoll_events() } handle_epoll_events(); }; - ringloop->submit(); int nfds; epoll_event events[MAX_EPOLL_EVENTS]; restart: @@ -305,7 +306,7 @@ restart: .in_buf = malloc(c_cli.receive_buffer_size), }; // Add FD to epoll - set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events) + set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events) { c_cli.handle_peer_epoll(peer_fd, epoll_events); }); diff --git a/osd.h b/osd.h index f808e26b..57c28069 100644 --- a/osd.h +++ b/osd.h @@ -149,7 +149,7 @@ class osd_t // event loop, socket read/write void loop(); - void set_fd_handler(int fd, std::function handler); + void set_fd_handler(int fd, bool out, std::function handler); void handle_epoll_events(); // peer handling (primary OSD logic) diff --git a/timerfd_manager.cpp b/timerfd_manager.cpp index c7bc736b..fdd20edd 100644 --- a/timerfd_manager.cpp +++ b/timerfd_manager.cpp @@ -6,7 +6,7 @@ #include #include "timerfd_manager.h" -timerfd_manager_t::timerfd_manager_t(std::function)> set_fd_handler) +timerfd_manager_t::timerfd_manager_t(std::function)> set_fd_handler) { this->set_fd_handler = set_fd_handler; wait_state = 0; @@ -15,7 +15,7 @@ timerfd_manager_t::timerfd_manager_t(std::function)> set_fd_handler; + std::function)> set_fd_handler; - timerfd_manager_t(std::function)> set_fd_handler); + timerfd_manager_t(std::function)> set_fd_handler); ~timerfd_manager_t(); int set_timer(uint64_t millis, bool repeat, std::function callback); void clear_timer(int timer_id);