diff --git a/src/epoll_manager.cpp b/src/epoll_manager.cpp index bc37409c..2e4d5923 100644 --- a/src/epoll_manager.cpp +++ b/src/epoll_manager.cpp @@ -13,6 +13,7 @@ epoll_manager_t::epoll_manager_t(ring_loop_t *ringloop) { this->ringloop = ringloop; + this->pending = false; epoll_fd = epoll_create(1); if (epoll_fd < 0) @@ -22,11 +23,19 @@ epoll_manager_t::epoll_manager_t(ring_loop_t *ringloop) tfd = new timerfd_manager_t([this](int fd, bool wr, std::function handler) { set_fd_handler(fd, wr, handler); }); + consumer.loop = [this]() + { + if (pending) + handle_epoll_events(); + }; + ringloop->register_consumer(&consumer); + handle_epoll_events(); } epoll_manager_t::~epoll_manager_t() { + ringloop->unregister_consumer(&consumer); if (tfd) { delete tfd; @@ -64,8 +73,13 @@ void epoll_manager_t::handle_epoll_events() io_uring_sqe *sqe = ringloop->get_sqe(); if (!sqe) { - throw std::runtime_error("can't get SQE, will fall out of sync with EPOLLET"); + // Don't handle epoll events until we manage to post the next event handler + // otherwise we'll fall out of sync with EPOLLET + pending = true; + ringloop->wakeup(); + return; } + pending = false; ring_data_t *data = ((ring_data_t*)sqe->user_data); my_uring_prep_poll_add(sqe, epoll_fd, POLLIN); data->callback = [this](ring_data_t *data) diff --git a/src/epoll_manager.h b/src/epoll_manager.h index bb9161f6..6cdee46e 100644 --- a/src/epoll_manager.h +++ b/src/epoll_manager.h @@ -11,6 +11,8 @@ class epoll_manager_t { int epoll_fd; + bool pending; + ring_consumer_t consumer; ring_loop_t *ringloop; std::map> epoll_handlers; public: