Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

190 lines
5.4 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. #pragma once
  2. #ifndef _LARGEFILE64_SOURCE
  3. #define _LARGEFILE64_SOURCE
  4. #endif
  5. #include <stdio.h>
  6. #include <time.h>
  7. #include <string.h>
  8. #include <assert.h>
  9. #include <liburing.h>
  10. #include <functional>
  11. #include <vector>
  12. static inline void my_uring_prep_rw(int op, struct io_uring_sqe *sqe, int fd, const void *addr, unsigned len, off_t offset)
  13. {
  14. sqe->opcode = op;
  15. sqe->flags = 0;
  16. sqe->ioprio = 0;
  17. sqe->fd = fd;
  18. sqe->off = offset;
  19. sqe->addr = (unsigned long) addr;
  20. sqe->len = len;
  21. sqe->rw_flags = 0;
  22. sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
  23. }
  24. static inline void my_uring_prep_readv(struct io_uring_sqe *sqe, int fd, const struct iovec *iovecs, unsigned nr_vecs, off_t offset)
  25. {
  26. my_uring_prep_rw(IORING_OP_READV, sqe, fd, iovecs, nr_vecs, offset);
  27. }
  28. static inline void my_uring_prep_read_fixed(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, off_t offset, int buf_index)
  29. {
  30. my_uring_prep_rw(IORING_OP_READ_FIXED, sqe, fd, buf, nbytes, offset);
  31. sqe->buf_index = buf_index;
  32. }
  33. static inline void my_uring_prep_writev(struct io_uring_sqe *sqe, int fd, const struct iovec *iovecs, unsigned nr_vecs, off_t offset)
  34. {
  35. my_uring_prep_rw(IORING_OP_WRITEV, sqe, fd, iovecs, nr_vecs, offset);
  36. }
  37. static inline void my_uring_prep_write_fixed(struct io_uring_sqe *sqe, int fd, const void *buf, unsigned nbytes, off_t offset, int buf_index)
  38. {
  39. my_uring_prep_rw(IORING_OP_WRITE_FIXED, sqe, fd, buf, nbytes, offset);
  40. sqe->buf_index = buf_index;
  41. }
  42. static inline void my_uring_prep_recvmsg(struct io_uring_sqe *sqe, int fd, struct msghdr *msg, unsigned flags)
  43. {
  44. my_uring_prep_rw(IORING_OP_RECVMSG, sqe, fd, msg, 1, 0);
  45. sqe->msg_flags = flags;
  46. }
  47. static inline void my_uring_prep_sendmsg(struct io_uring_sqe *sqe, int fd, const struct msghdr *msg, unsigned flags)
  48. {
  49. my_uring_prep_rw(IORING_OP_SENDMSG, sqe, fd, msg, 1, 0);
  50. sqe->msg_flags = flags;
  51. }
  52. static inline void my_uring_prep_poll_add(struct io_uring_sqe *sqe, int fd, short poll_mask)
  53. {
  54. my_uring_prep_rw(IORING_OP_POLL_ADD, sqe, fd, NULL, 0, 0);
  55. sqe->poll_events = poll_mask;
  56. }
  57. static inline void my_uring_prep_poll_remove(struct io_uring_sqe *sqe, void *user_data)
  58. {
  59. my_uring_prep_rw(IORING_OP_POLL_REMOVE, sqe, 0, user_data, 0, 0);
  60. }
  61. static inline void my_uring_prep_fsync(struct io_uring_sqe *sqe, int fd, unsigned fsync_flags)
  62. {
  63. my_uring_prep_rw(IORING_OP_FSYNC, sqe, fd, NULL, 0, 0);
  64. sqe->fsync_flags = fsync_flags;
  65. }
  66. static inline void my_uring_prep_nop(struct io_uring_sqe *sqe)
  67. {
  68. my_uring_prep_rw(IORING_OP_NOP, sqe, 0, NULL, 0, 0);
  69. }
  70. static inline void my_uring_prep_timeout(struct io_uring_sqe *sqe, struct __kernel_timespec *ts, unsigned count, unsigned flags)
  71. {
  72. my_uring_prep_rw(IORING_OP_TIMEOUT, sqe, 0, ts, 1, count);
  73. sqe->timeout_flags = flags;
  74. }
  75. static inline void my_uring_prep_timeout_remove(struct io_uring_sqe *sqe, __u64 user_data, unsigned flags)
  76. {
  77. my_uring_prep_rw(IORING_OP_TIMEOUT_REMOVE, sqe, 0, (void *)user_data, 0, 0);
  78. sqe->timeout_flags = flags;
  79. }
  80. static inline void my_uring_prep_accept(struct io_uring_sqe *sqe, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags)
  81. {
  82. my_uring_prep_rw(IORING_OP_ACCEPT, sqe, fd, addr, 0, (__u64) addrlen);
  83. sqe->accept_flags = flags;
  84. }
  85. static inline void my_uring_prep_cancel(struct io_uring_sqe *sqe, void *user_data, int flags)
  86. {
  87. my_uring_prep_rw(IORING_OP_ASYNC_CANCEL, sqe, 0, user_data, 0, 0);
  88. sqe->cancel_flags = flags;
  89. }
  // Per-request bookkeeping record. ring_loop_t::get_sqe() attaches a pointer to one of
  // these to every SQE it hands out.
  struct ring_data_t
  {
      // Scratch iovec for operations that need just a single buffer entry
      iovec iov;
      // Integer result slot (presumably filled from the completion entry - confirm in ring_loop_t::loop)
      int res;
      // Handler that receives a pointer to this record
      // (presumably invoked when the request completes - confirm in ring_loop_t::loop)
      std::function<void(ring_data_t*)> callback;
  };
  // A participant of the ring loop, registered via ring_loop_t::register_consumer().
  struct ring_consumer_t
  {
      // Argument-less handler supplied by the consumer
      // (presumably run on each ring_loop_t::loop() pass - confirm in the implementation)
      std::function<void()> loop;
  };
  100. class ring_loop_t
  101. {
  102. std::vector<std::pair<int,std::function<void()>>> get_sqe_queue;
  103. std::vector<ring_consumer_t*> consumers;
  104. struct ring_data_t *ring_datas;
  105. int *free_ring_data;
  106. int wait_sqe_id;
  107. unsigned free_ring_data_ptr;
  108. bool loop_again;
  109. struct io_uring ring;
  110. public:
  111. ring_loop_t(int qd);
  112. ~ring_loop_t();
  113. void register_consumer(ring_consumer_t *consumer);
  114. void unregister_consumer(ring_consumer_t *consumer);
  115. inline struct io_uring_sqe* get_sqe()
  116. {
  117. if (free_ring_data_ptr == 0)
  118. return NULL;
  119. struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
  120. if (sqe)
  121. io_uring_sqe_set_data(sqe, ring_datas + free_ring_data[--free_ring_data_ptr]);
  122. return sqe;
  123. }
  124. inline int wait_sqe(std::function<void()> cb)
  125. {
  126. get_sqe_queue.push_back({ wait_sqe_id, cb });
  127. return wait_sqe_id++;
  128. }
  129. inline void cancel_wait_sqe(int wait_id)
  130. {
  131. for (int i = 0; i < get_sqe_queue.size(); i++)
  132. {
  133. if (get_sqe_queue[i].first == wait_id)
  134. {
  135. get_sqe_queue.erase(get_sqe_queue.begin()+i, get_sqe_queue.begin()+i+1);
  136. }
  137. }
  138. }
  139. inline int submit()
  140. {
  141. int r = io_uring_submit(&ring);
  142. {
  143. timespec now;
  144. clock_gettime(CLOCK_REALTIME, &now);
  145. printf("submit %s %d %ld.%06ld\n", __FILE__, __LINE__, now.tv_sec, now.tv_nsec/1000);
  146. }
  147. return r;
  148. }
  149. inline int wait()
  150. {
  151. struct io_uring_cqe *cqe;
  152. return io_uring_wait_cqe(&ring, &cqe);
  153. }
  154. inline unsigned space_left()
  155. {
  156. return free_ring_data_ptr;
  157. }
  158. inline bool has_work()
  159. {
  160. return loop_again;
  161. }
  162. void loop();
  163. void wakeup();
  164. unsigned save();
  165. void restore(unsigned sqe_tail);
  166. };