Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

311 lines
7.7 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 or GNU GPL-2.0+ (see README.md for details)
  3. #pragma once
  4. #include <sys/types.h>
  5. #include <stdint.h>
  6. #include <arpa/inet.h>
  7. #include <set>
  8. #include <map>
  9. #include <deque>
  10. #include <vector>
  11. #include "malloc_or_die.h"
  12. #include "json11/json11.hpp"
  13. #include "osd_ops.h"
  14. #include "timerfd_manager.h"
  15. #include "ringloop.h"
  // Values for osd_op_t::op_type (op_type defaults to OSD_OP_IN)
  #define OSD_OP_IN 0
  #define OSD_OP_OUT 1

  // NOTE(review): presumably the values of osd_client_t::read_state - confirm against the implementation
  #define CL_READ_HDR 1
  #define CL_READ_DATA 2
  #define CL_READ_REPLY_DATA 3

  // NOTE(review): presumably the values of osd_client_t::write_state - confirm against the implementation
  #define CL_WRITE_READY 1
  #define CL_WRITE_REPLY 2

  // Number of iovec entries osd_op_buf_list_t stores inline before switching to the heap
  #define OSD_OP_INLINE_BUF_COUNT 16

  // NOTE(review): presumably the values of osd_client_t::peer_state - confirm against the implementation
  #define PEER_CONNECTING 1
  #define PEER_CONNECTED 2
  #define PEER_STOPPED 3

  // Defaults for osd_messenger_t::peer_connect_interval / peer_connect_timeout
  #define DEFAULT_PEER_CONNECT_INTERVAL 5
  #define DEFAULT_PEER_CONNECT_TIMEOUT 5

  #define DEFAULT_BITMAP_GRANULARITY 4096

  // Kind of a vector of iovec with small-list-optimisation.
  //
  // Up to OSD_OP_INLINE_BUF_COUNT entries live in inline_buf; beyond that the
  // entries are moved to a malloc()ed array that grows in steps of 16 entries.
  // Entries [done, count) are the ones still pending; eat() advances `done`.
  // Allocation failure prints a message and terminates the process.
  struct osd_op_buf_list_t
  {
      // count: number of used entries, alloc: capacity of buf, done: number of fully consumed entries
      int count = 0, alloc = OSD_OP_INLINE_BUF_COUNT, done = 0;
      // Points either to inline_buf or to a heap array owned by this object
      iovec *buf = nullptr;
      iovec inline_buf[OSD_OP_INLINE_BUF_COUNT];

      inline osd_op_buf_list_t()
      {
          buf = inline_buf;
      }

      // Copies all `other.count` entries of other (including already consumed
      // ones); the copy starts with done == 0.
      inline osd_op_buf_list_t(const osd_op_buf_list_t & other)
      {
          buf = inline_buf;
          append(other);
      }

      // Replaces the contents with the entries of other (same rules as the
      // copy constructor). Self-assignment leaves the list untouched.
      inline osd_op_buf_list_t & operator = (const osd_op_buf_list_t & other)
      {
          // Without this guard reset() would wipe the very entries we are about to copy
          if (this != &other)
          {
              reset();
              append(other);
          }
          return *this;
      }

      inline ~osd_op_buf_list_t()
      {
          if (buf && buf != inline_buf)
          {
              free(buf);
          }
      }

      // Forgets all entries; keeps the current storage and capacity
      inline void reset()
      {
          count = 0;
          done = 0;
      }

      // First pending entry; valid for get_size() elements
      inline iovec* get_iovec()
      {
          return buf + done;
      }

      // Number of pending (not yet fully consumed) entries
      inline int get_size()
      {
          return count - done;
      }

      // Ensures capacity for at least `wanted` entries, rounding the new
      // capacity up to a multiple of 16. Existing entries are preserved.
      // Prints a message and exits the process if memory cannot be obtained.
      inline void grow_to(int wanted)
      {
          const int new_alloc = ((wanted + 15) / 16) * 16;
          iovec *new_buf;
          if (buf == inline_buf)
          {
              new_buf = static_cast<iovec*>(malloc(sizeof(iovec) * new_alloc));
              if (new_buf)
              {
                  // Only the first `count` inline slots hold meaningful data
                  memcpy(new_buf, inline_buf, sizeof(iovec) * count);
              }
          }
          else
          {
              new_buf = static_cast<iovec*>(realloc(buf, sizeof(iovec) * new_alloc));
          }
          if (!new_buf)
          {
              printf("Failed to allocate %zu bytes\n", sizeof(iovec) * new_alloc);
              exit(1);
          }
          buf = new_buf;
          alloc = new_alloc;
      }

      // Appends all `other.count` entries of other, starting from other.buf[0]
      // regardless of other.done. Appending a list to itself duplicates it.
      inline void append(const osd_op_buf_list_t & other)
      {
          // Snapshot the length first: other may be *this, whose count changes below
          const int add = other.count;
          if (count + add > alloc)
          {
              grow_to(count + add);
          }
          // Read the source pointer after growing: if other is *this, buf may have moved
          const iovec *src = other.buf;
          for (int i = 0; i < add; i++)
          {
              buf[count++] = src[i];
          }
      }

      // Appends one entry describing `len` bytes at `nbuf`
      inline void push_back(void *nbuf, size_t len)
      {
          if (count >= alloc)
          {
              grow_to(count + 1);
          }
          iovec & slot = buf[count++];
          slot.iov_base = nbuf;
          slot.iov_len = len;
      }

      // Marks `result` bytes as consumed: entries that are fully covered are
      // skipped by advancing `done`; a partially covered entry is shrunk in
      // place (its base pointer moves forward, its length decreases).
      // Non-positive `result` is a no-op.
      inline void eat(int result)
      {
          while (result > 0 && done < count)
          {
              iovec & iov = buf[done];
              // result > 0 here, so the cast to size_t is lossless
              if (iov.iov_len <= static_cast<size_t>(result))
              {
                  result -= static_cast<int>(iov.iov_len);
                  done++;
              }
              else
              {
                  iov.iov_len -= result;
                  // Arithmetic on void* is a compiler extension; go through char*
                  iov.iov_base = static_cast<char*>(iov.iov_base) + result;
                  break;
              }
          }
      }
  };
  151. struct blockstore_op_t;
  152. struct osd_primary_op_data_t;
  153. struct osd_op_t
  154. {
  155. timespec tv_begin = { 0 }, tv_end = { 0 };
  156. uint64_t op_type = OSD_OP_IN;
  157. int peer_fd;
  158. osd_any_op_t req;
  159. osd_any_reply_t reply;
  160. blockstore_op_t *bs_op = NULL;
  161. void *buf = NULL;
  162. void *bitmap = NULL;
  163. unsigned bitmap_len = 0;
  164. unsigned bmp_data = 0;
  165. void *rmw_buf = NULL;
  166. osd_primary_op_data_t* op_data = NULL;
  167. std::function<void(osd_op_t*)> callback;
  168. osd_op_buf_list_t iov;
  169. ~osd_op_t();
  170. };
  171. struct osd_client_t
  172. {
  173. int refs = 0;
  174. sockaddr_in peer_addr;
  175. int peer_port;
  176. int peer_fd;
  177. int peer_state;
  178. int connect_timeout_id = -1;
  179. osd_num_t osd_num = 0;
  180. void *in_buf = NULL;
  181. // Read state
  182. int read_ready = 0;
  183. osd_op_t *read_op = NULL;
  184. iovec read_iov = { 0 };
  185. msghdr read_msg = { 0 };
  186. int read_remaining = 0;
  187. int read_state = 0;
  188. osd_op_buf_list_t recv_list;
  189. // Incoming operations
  190. std::vector<osd_op_t*> received_ops;
  191. // Outbound operations
  192. std::map<uint64_t, osd_op_t*> sent_ops;
  193. // PGs dirtied by this client's primary-writes
  194. std::set<pool_pg_num_t> dirty_pgs;
  195. // Write state
  196. msghdr write_msg = { 0 };
  197. int write_state = 0;
  198. std::vector<iovec> send_list, next_send_list;
  199. std::vector<osd_op_t*> outbox, next_outbox;
  200. };
  201. struct osd_wanted_peer_t
  202. {
  203. json11::Json address_list;
  204. int port;
  205. time_t last_connect_attempt;
  206. bool connecting, address_changed;
  207. int address_index;
  208. std::string cur_addr;
  209. int cur_port;
  210. };
  211. struct osd_op_stats_t
  212. {
  213. uint64_t op_stat_sum[OSD_OP_MAX+1] = { 0 };
  214. uint64_t op_stat_count[OSD_OP_MAX+1] = { 0 };
  215. uint64_t op_stat_bytes[OSD_OP_MAX+1] = { 0 };
  216. uint64_t subop_stat_sum[OSD_OP_MAX+1] = { 0 };
  217. uint64_t subop_stat_count[OSD_OP_MAX+1] = { 0 };
  218. };
  219. struct osd_messenger_t
  220. {
  221. timerfd_manager_t *tfd;
  222. ring_loop_t *ringloop;
  223. // osd_num_t is only for logging and asserts
  224. osd_num_t osd_num;
  225. // FIXME: make receive_buffer_size configurable
  226. int receive_buffer_size = 64*1024;
  227. int peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
  228. int peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
  229. int log_level = 0;
  230. bool use_sync_send_recv = false;
  231. std::map<osd_num_t, osd_wanted_peer_t> wanted_peers;
  232. std::map<uint64_t, int> osd_peer_fds;
  233. uint64_t next_subop_id = 1;
  234. std::map<int, osd_client_t*> clients;
  235. std::vector<int> read_ready_clients;
  236. std::vector<int> write_ready_clients;
  237. std::vector<std::function<void()>> set_immediate;
  238. // op statistics
  239. osd_op_stats_t stats;
  240. public:
  241. void connect_peer(uint64_t osd_num, json11::Json peer_state);
  242. void stop_client(int peer_fd);
  243. void outbox_push(osd_op_t *cur_op);
  244. std::function<void(osd_op_t*)> exec_op;
  245. std::function<void(osd_num_t)> repeer_pgs;
  246. void handle_peer_epoll(int peer_fd, int epoll_events);
  247. void read_requests();
  248. void send_replies();
  249. void accept_connections(int listen_fd);
  250. ~osd_messenger_t();
  251. protected:
  252. void try_connect_peer(uint64_t osd_num);
  253. void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port);
  254. void handle_connect_epoll(int peer_fd);
  255. void on_connect_peer(osd_num_t peer_osd, int peer_fd);
  256. void check_peer_config(osd_client_t *cl);
  257. void cancel_osd_ops(osd_client_t *cl);
  258. void cancel_op(osd_op_t *op);
  259. bool try_send(osd_client_t *cl);
  260. void measure_exec(osd_op_t *cur_op);
  261. void handle_send(int result, osd_client_t *cl);
  262. bool handle_read(int result, osd_client_t *cl);
  263. bool handle_finished_read(osd_client_t *cl);
  264. void handle_op_hdr(osd_client_t *cl);
  265. bool handle_reply_hdr(osd_client_t *cl);
  266. void handle_reply_ready(osd_op_t *op);
  267. };