Simplified distributed block storage with strong consistency, like in Ceph
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

96 lines
2.6 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 or GNU GPL-2.0+ (see README.md for details)
  3. #include <sys/epoll.h>
  4. #include <sys/poll.h>
  5. #include <unistd.h>
  6. #include <stdexcept>
  7. #include "epoll_manager.h"
  8. #define MAX_EPOLL_EVENTS 64
  9. epoll_manager_t::epoll_manager_t(ring_loop_t *ringloop)
  10. {
  11. this->ringloop = ringloop;
  12. epoll_fd = epoll_create(1);
  13. if (epoll_fd < 0)
  14. {
  15. throw std::runtime_error(std::string("epoll_create: ") + strerror(errno));
  16. }
  17. tfd = new timerfd_manager_t([this](int fd, bool wr, std::function<void(int, int)> handler) { set_fd_handler(fd, wr, handler); });
  18. handle_epoll_events();
  19. }
  20. epoll_manager_t::~epoll_manager_t()
  21. {
  22. if (tfd)
  23. {
  24. delete tfd;
  25. tfd = NULL;
  26. }
  27. close(epoll_fd);
  28. }
  29. void epoll_manager_t::set_fd_handler(int fd, bool wr, std::function<void(int, int)> handler)
  30. {
  31. if (handler != NULL)
  32. {
  33. bool exists = epoll_handlers.find(fd) != epoll_handlers.end();
  34. epoll_event ev;
  35. ev.data.fd = fd;
  36. ev.events = (wr ? EPOLLOUT : 0) | EPOLLIN | EPOLLRDHUP | EPOLLET;
  37. if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0)
  38. {
  39. throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
  40. }
  41. epoll_handlers[fd] = handler;
  42. }
  43. else
  44. {
  45. if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0 && errno != ENOENT)
  46. {
  47. throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
  48. }
  49. epoll_handlers.erase(fd);
  50. }
  51. }
  52. void epoll_manager_t::handle_epoll_events()
  53. {
  54. io_uring_sqe *sqe = ringloop->get_sqe();
  55. if (!sqe)
  56. {
  57. throw std::runtime_error("can't get SQE, will fall out of sync with EPOLLET");
  58. }
  59. ring_data_t *data = ((ring_data_t*)sqe->user_data);
  60. my_uring_prep_poll_add(sqe, epoll_fd, POLLIN);
  61. data->callback = [this](ring_data_t *data)
  62. {
  63. if (data->res < 0)
  64. {
  65. throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res));
  66. }
  67. handle_epoll_events();
  68. };
  69. ringloop->submit();
  70. int nfds;
  71. epoll_event events[MAX_EPOLL_EVENTS];
  72. do
  73. {
  74. nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, 0);
  75. for (int i = 0; i < nfds; i++)
  76. {
  77. auto cb_it = epoll_handlers.find(events[i].data.fd);
  78. if (cb_it != epoll_handlers.end())
  79. {
  80. auto & cb = cb_it->second;
  81. cb(events[i].data.fd, events[i].events);
  82. }
  83. }
  84. } while (nfds == MAX_EPOLL_EVENTS);
  85. }