Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

187 lines
5.3 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 or GNU GPL-2.0+ (see README.md for details)
  3. #pragma once
  4. #ifndef _LARGEFILE64_SOURCE
  5. #define _LARGEFILE64_SOURCE
  6. #endif
#include <assert.h>
#include <string.h>

#include <algorithm>
#include <functional>
#include <string>
#include <utility>
#include <vector>

#include <liburing.h>
  13. static inline void my_uring_prep_rw(int op, struct io_uring_sqe *sqe, int fd, const void *addr, unsigned len, off_t offset)
  14. {
  15. sqe->opcode = op;
  16. sqe->flags = 0;
  17. sqe->ioprio = 0;
  18. sqe->fd = fd;
  19. sqe->off = offset;
  20. sqe->addr = (unsigned long) addr;
  21. sqe->len = len;
  22. sqe->rw_flags = 0;
  23. sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
  24. }
  25. static inline void my_uring_prep_readv(struct io_uring_sqe *sqe, int fd, const struct iovec *iovecs, unsigned nr_vecs, off_t offset)
  26. {
  27. my_uring_prep_rw(IORING_OP_READV, sqe, fd, iovecs, nr_vecs, offset);
  28. }
  29. static inline void my_uring_prep_read_fixed(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, off_t offset, int buf_index)
  30. {
  31. my_uring_prep_rw(IORING_OP_READ_FIXED, sqe, fd, buf, nbytes, offset);
  32. sqe->buf_index = buf_index;
  33. }
  34. static inline void my_uring_prep_writev(struct io_uring_sqe *sqe, int fd, const struct iovec *iovecs, unsigned nr_vecs, off_t offset)
  35. {
  36. my_uring_prep_rw(IORING_OP_WRITEV, sqe, fd, iovecs, nr_vecs, offset);
  37. }
  38. static inline void my_uring_prep_write_fixed(struct io_uring_sqe *sqe, int fd, const void *buf, unsigned nbytes, off_t offset, int buf_index)
  39. {
  40. my_uring_prep_rw(IORING_OP_WRITE_FIXED, sqe, fd, buf, nbytes, offset);
  41. sqe->buf_index = buf_index;
  42. }
  43. static inline void my_uring_prep_recvmsg(struct io_uring_sqe *sqe, int fd, struct msghdr *msg, unsigned flags)
  44. {
  45. my_uring_prep_rw(IORING_OP_RECVMSG, sqe, fd, msg, 1, 0);
  46. sqe->msg_flags = flags;
  47. }
  48. static inline void my_uring_prep_sendmsg(struct io_uring_sqe *sqe, int fd, const struct msghdr *msg, unsigned flags)
  49. {
  50. my_uring_prep_rw(IORING_OP_SENDMSG, sqe, fd, msg, 1, 0);
  51. sqe->msg_flags = flags;
  52. }
  53. static inline void my_uring_prep_poll_add(struct io_uring_sqe *sqe, int fd, short poll_mask)
  54. {
  55. my_uring_prep_rw(IORING_OP_POLL_ADD, sqe, fd, NULL, 0, 0);
  56. sqe->poll_events = poll_mask;
  57. }
  58. static inline void my_uring_prep_poll_remove(struct io_uring_sqe *sqe, void *user_data)
  59. {
  60. my_uring_prep_rw(IORING_OP_POLL_REMOVE, sqe, 0, user_data, 0, 0);
  61. }
  62. static inline void my_uring_prep_fsync(struct io_uring_sqe *sqe, int fd, unsigned fsync_flags)
  63. {
  64. my_uring_prep_rw(IORING_OP_FSYNC, sqe, fd, NULL, 0, 0);
  65. sqe->fsync_flags = fsync_flags;
  66. }
  67. static inline void my_uring_prep_nop(struct io_uring_sqe *sqe)
  68. {
  69. my_uring_prep_rw(IORING_OP_NOP, sqe, 0, NULL, 0, 0);
  70. }
  71. static inline void my_uring_prep_timeout(struct io_uring_sqe *sqe, struct __kernel_timespec *ts, unsigned count, unsigned flags)
  72. {
  73. my_uring_prep_rw(IORING_OP_TIMEOUT, sqe, 0, ts, 1, count);
  74. sqe->timeout_flags = flags;
  75. }
  76. static inline void my_uring_prep_timeout_remove(struct io_uring_sqe *sqe, __u64 user_data, unsigned flags)
  77. {
  78. my_uring_prep_rw(IORING_OP_TIMEOUT_REMOVE, sqe, 0, (void *)user_data, 0, 0);
  79. sqe->timeout_flags = flags;
  80. }
  81. static inline void my_uring_prep_accept(struct io_uring_sqe *sqe, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags)
  82. {
  83. my_uring_prep_rw(IORING_OP_ACCEPT, sqe, fd, addr, 0, (__u64) addrlen);
  84. sqe->accept_flags = flags;
  85. }
  86. static inline void my_uring_prep_cancel(struct io_uring_sqe *sqe, void *user_data, int flags)
  87. {
  88. my_uring_prep_rw(IORING_OP_ASYNC_CANCEL, sqe, 0, user_data, 0, 0);
  89. sqe->cancel_flags = flags;
  90. }
  91. struct ring_data_t
  92. {
  93. struct iovec iov; // for single-entry read/write operations
  94. int res;
  95. std::function<void(ring_data_t*)> callback;
  96. };
  97. struct ring_consumer_t
  98. {
  99. std::function<void(void)> loop;
  100. };
  101. class ring_loop_t
  102. {
  103. std::vector<std::pair<int,std::function<void()>>> get_sqe_queue;
  104. std::vector<ring_consumer_t*> consumers;
  105. struct ring_data_t *ring_datas;
  106. int *free_ring_data;
  107. int wait_sqe_id;
  108. unsigned free_ring_data_ptr;
  109. bool loop_again;
  110. struct io_uring ring;
  111. public:
  112. ring_loop_t(int qd);
  113. ~ring_loop_t();
  114. void register_consumer(ring_consumer_t *consumer);
  115. void unregister_consumer(ring_consumer_t *consumer);
  116. inline struct io_uring_sqe* get_sqe()
  117. {
  118. if (free_ring_data_ptr == 0)
  119. return NULL;
  120. struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
  121. if (sqe)
  122. io_uring_sqe_set_data(sqe, ring_datas + free_ring_data[--free_ring_data_ptr]);
  123. return sqe;
  124. }
  125. inline int wait_sqe(std::function<void()> cb)
  126. {
  127. get_sqe_queue.push_back({ wait_sqe_id, cb });
  128. return wait_sqe_id++;
  129. }
  130. inline void cancel_wait_sqe(int wait_id)
  131. {
  132. for (int i = 0; i < get_sqe_queue.size(); i++)
  133. {
  134. if (get_sqe_queue[i].first == wait_id)
  135. {
  136. get_sqe_queue.erase(get_sqe_queue.begin()+i, get_sqe_queue.begin()+i+1);
  137. }
  138. }
  139. }
  140. inline int submit()
  141. {
  142. return io_uring_submit(&ring);
  143. }
  144. inline int wait()
  145. {
  146. struct io_uring_cqe *cqe;
  147. return io_uring_wait_cqe(&ring, &cqe);
  148. }
  149. inline unsigned space_left()
  150. {
  151. return free_ring_data_ptr;
  152. }
  153. inline bool has_work()
  154. {
  155. return loop_again;
  156. }
  157. void loop();
  158. void wakeup();
  159. unsigned save();
  160. void restore(unsigned sqe_tail);
  161. };