Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

111 lines
2.8 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 or GNU GPL-2.0+ (see README.md for details)
  3. #include <stdlib.h>
  4. #include <stdexcept>
  5. #include "ringloop.h"
  6. ring_loop_t::ring_loop_t(int qd)
  7. {
  8. int ret = io_uring_queue_init(qd, &ring, 0);
  9. if (ret < 0)
  10. {
  11. throw std::runtime_error(std::string("io_uring_queue_init: ") + strerror(-ret));
  12. }
  13. free_ring_data_ptr = *ring.cq.kring_entries;
  14. ring_datas = (struct ring_data_t*)calloc(free_ring_data_ptr, sizeof(ring_data_t));
  15. free_ring_data = (int*)malloc(sizeof(int) * free_ring_data_ptr);
  16. if (!ring_datas || !free_ring_data)
  17. {
  18. throw std::bad_alloc();
  19. }
  20. for (int i = 0; i < free_ring_data_ptr; i++)
  21. {
  22. free_ring_data[i] = i;
  23. }
  24. wait_sqe_id = 1;
  25. }
  26. ring_loop_t::~ring_loop_t()
  27. {
  28. free(free_ring_data);
  29. free(ring_datas);
  30. io_uring_queue_exit(&ring);
  31. }
  32. void ring_loop_t::register_consumer(ring_consumer_t *consumer)
  33. {
  34. unregister_consumer(consumer);
  35. consumers.push_back(consumer);
  36. }
  37. void ring_loop_t::wakeup()
  38. {
  39. loop_again = true;
  40. }
  41. void ring_loop_t::unregister_consumer(ring_consumer_t *consumer)
  42. {
  43. for (int i = 0; i < consumers.size(); i++)
  44. {
  45. if (consumers[i] == consumer)
  46. {
  47. consumers.erase(consumers.begin()+i, consumers.begin()+i+1);
  48. break;
  49. }
  50. }
  51. }
  52. void ring_loop_t::loop()
  53. {
  54. struct io_uring_cqe *cqe;
  55. while (!io_uring_peek_cqe(&ring, &cqe))
  56. {
  57. struct ring_data_t *d = (struct ring_data_t*)cqe->user_data;
  58. if (d->callback)
  59. {
  60. // First free ring_data item, then call the callback
  61. // so it has at least 1 free slot for the next event
  62. // which is required for EPOLLET to function properly
  63. struct ring_data_t dl;
  64. dl.iov = d->iov;
  65. dl.res = cqe->res;
  66. dl.callback.swap(d->callback);
  67. free_ring_data[free_ring_data_ptr++] = d - ring_datas;
  68. dl.callback(&dl);
  69. }
  70. else
  71. free_ring_data[free_ring_data_ptr++] = d - ring_datas;
  72. io_uring_cqe_seen(&ring, cqe);
  73. }
  74. while (get_sqe_queue.size() > 0)
  75. {
  76. (get_sqe_queue[0].second)();
  77. get_sqe_queue.erase(get_sqe_queue.begin());
  78. }
  79. do
  80. {
  81. loop_again = false;
  82. for (int i = 0; i < consumers.size(); i++)
  83. {
  84. consumers[i]->loop();
  85. }
  86. } while (loop_again);
  87. }
  88. unsigned ring_loop_t::save()
  89. {
  90. return ring.sq.sqe_tail;
  91. }
  92. void ring_loop_t::restore(unsigned sqe_tail)
  93. {
  94. assert(ring.sq.sqe_tail >= sqe_tail);
  95. for (unsigned i = sqe_tail; i < ring.sq.sqe_tail; i++)
  96. {
  97. free_ring_data[free_ring_data_ptr++] = ((ring_data_t*)ring.sq.sqes[i & *ring.sq.kring_mask].user_data) - ring_datas;
  98. }
  99. ring.sq.sqe_tail = sqe_tail;
  100. }