Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

288 lines
9.3 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.1 (see README.md for details)
  3. #pragma once
  4. #include <sys/types.h>
  5. #include <time.h>
  6. #include <sys/ioctl.h>
  7. #include <sys/stat.h>
  8. #include <fcntl.h>
  9. #include <unistd.h>
  10. #include <malloc.h>
  11. #include <arpa/inet.h>
  12. #include <malloc.h>
  13. #include <set>
  14. #include <deque>
  15. #include "blockstore.h"
  16. #include "ringloop.h"
  17. #include "timerfd_manager.h"
  18. #include "epoll_manager.h"
  19. #include "osd_peering_pg.h"
  20. #include "messenger.h"
  21. #include "etcd_state_client.h"
  // State flags (distinct bits).
  // NOTE(review): presumably OR-ed into osd_t::peering_state - confirm against the implementation.
  #define OSD_LOADING_PGS             0x01
  #define OSD_PEERING_PGS             0x04
  #define OSD_FLUSHING_PGS            0x08
  #define OSD_RECOVERING              0x10

  // Values for the osd_t::immediate_commit setting (its default is IMMEDIATE_NONE)
  #define IMMEDIATE_NONE              0
  #define IMMEDIATE_SMALL             1
  #define IMMEDIATE_ALL               2

  // Autosync interval: the default is used to initialise osd_t::autosync_interval (in seconds).
  // NOTE(review): MAX_AUTOSYNC_INTERVAL looks like the upper bound for the configured value - confirm.
  #define MAX_AUTOSYNC_INTERVAL       3600
  #define DEFAULT_AUTOSYNC_INTERVAL   5

  // Recovery tuning: the defaults initialise osd_t::recovery_queue_depth and
  // osd_t::recovery_sync_batch.
  // NOTE(review): MAX_RECOVERY_QUEUE looks like the upper bound for the queue depth - confirm.
  #define MAX_RECOVERY_QUEUE          2048
  #define DEFAULT_RECOVERY_QUEUE      4
  #define DEFAULT_RECOVERY_BATCH      16

  // Disabled build switch, kept commented out as in the original
  //#define OSD_STUB
  35. struct osd_object_id_t
  36. {
  37. osd_num_t osd_num;
  38. object_id oid;
  39. };
  40. struct osd_recovery_op_t
  41. {
  42. int st = 0;
  43. bool degraded = false;
  44. object_id oid = { 0 };
  45. osd_op_t *osd_op = NULL;
  46. };
  // Posted as /osd/inodestats/$osd, then accumulated by the monitor
  // NOTE(review): these look like the indices into the 3-element arrays of
  // inode_stats_t (one slot per operation type) - confirm.
  #define INODE_STATS_READ    0
  #define INODE_STATS_WRITE   1
  #define INODE_STATS_DELETE  2
  // Per-inode operation counters; osd_t stores one entry per inode in
  // osd_t::inode_stats. Every array has three slots, all starting at zero.
  // NOTE(review): slots are presumably indexed by INODE_STATS_READ/WRITE/DELETE - confirm.
  struct inode_stats_t
  {
      // Accumulated per-operation sum (unit is not visible in this header)
      uint64_t op_sum[3] = {};
      // Number of operations
      uint64_t op_count[3] = {};
      // Number of bytes
      uint64_t op_bytes[3] = {};
  };
  57. struct bitmap_request_t
  58. {
  59. osd_num_t osd_num;
  60. object_id oid;
  61. uint64_t version;
  62. void *bmp_buf;
  63. };
  64. inline bool operator < (const bitmap_request_t & a, const bitmap_request_t & b)
  65. {
  66. return a.osd_num < b.osd_num || a.osd_num == b.osd_num && a.oid < b.oid;
  67. }
  68. struct osd_chain_read_t
  69. {
  70. int chain_pos;
  71. inode_t inode;
  72. uint32_t offset, len;
  73. };
  74. struct osd_rmw_stripe_t;
  75. class osd_t
  76. {
  77. // config
  78. json11::Json::object config;
  79. int etcd_report_interval = 30;
  80. bool readonly = false;
  81. osd_num_t osd_num = 1; // OSD numbers start with 1
  82. bool run_primary = false;
  83. bool no_rebalance = false;
  84. bool no_recovery = false;
  85. std::string bind_address;
  86. int bind_port, listen_backlog;
  87. // FIXME: Implement client queue depth limit
  88. int client_queue_depth = 128;
  89. bool allow_test_ops = false;
  90. int print_stats_interval = 3;
  91. int slow_log_interval = 10;
  92. int immediate_commit = IMMEDIATE_NONE;
  93. int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds
  94. int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
  95. int recovery_sync_batch = DEFAULT_RECOVERY_BATCH;
  96. int log_level = 0;
  97. // cluster state
  98. etcd_state_client_t st_cli;
  99. osd_messenger_t msgr;
  100. int etcd_failed_attempts = 0;
  101. std::string etcd_lease_id;
  102. json11::Json self_state;
  103. bool loading_peer_config = false;
  104. std::set<pool_pg_num_t> pg_state_dirty;
  105. bool pg_config_applied = false;
  106. bool etcd_reporting_pg_state = false;
  107. bool etcd_reporting_stats = false;
  108. // peers and PGs
  109. std::map<pool_id_t, pg_num_t> pg_counts;
  110. std::map<pool_pg_num_t, pg_t> pgs;
  111. std::set<pool_pg_num_t> dirty_pgs;
  112. std::set<osd_num_t> dirty_osds;
  113. int copies_to_delete_after_sync_count = 0;
  114. uint64_t misplaced_objects = 0, degraded_objects = 0, incomplete_objects = 0;
  115. int peering_state = 0;
  116. std::map<object_id, osd_recovery_op_t> recovery_ops;
  117. int recovery_done = 0;
  118. osd_op_t *autosync_op = NULL;
  119. // Unstable writes
  120. std::map<osd_object_id_t, uint64_t> unstable_writes;
  121. std::deque<osd_op_t*> syncs_in_progress;
  122. // client & peer I/O
  123. bool stopping = false;
  124. int inflight_ops = 0;
  125. blockstore_t *bs;
  126. void *zero_buffer = NULL;
  127. uint64_t zero_buffer_size = 0;
  128. uint32_t bs_block_size, bs_bitmap_granularity, clean_entry_bitmap_size;
  129. ring_loop_t *ringloop;
  130. timerfd_manager_t *tfd = NULL;
  131. epoll_manager_t *epmgr = NULL;
  132. int listening_port = 0;
  133. int listen_fd = 0;
  134. ring_consumer_t consumer;
  135. // op statistics
  136. osd_op_stats_t prev_stats;
  137. std::map<uint64_t, inode_stats_t> inode_stats;
  138. const char* recovery_stat_names[2] = { "degraded", "misplaced" };
  139. uint64_t recovery_stat_count[2][2] = { 0 };
  140. uint64_t recovery_stat_bytes[2][2] = { 0 };
  141. // cluster connection
  142. void parse_config(const json11::Json & config);
  143. void init_cluster();
  144. void on_change_osd_state_hook(osd_num_t peer_osd);
  145. void on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num);
  146. void on_change_etcd_state_hook(std::map<std::string, etcd_kv_t> & changes);
  147. void on_load_config_hook(json11::Json::object & changes);
  148. json11::Json on_load_pgs_checks_hook();
  149. void on_load_pgs_hook(bool success);
  150. void bind_socket();
  151. void acquire_lease();
  152. json11::Json get_osd_state();
  153. void create_osd_state();
  154. void renew_lease();
  155. void print_stats();
  156. void print_slow();
  157. void reset_stats();
  158. json11::Json get_statistics();
  159. void report_statistics();
  160. void report_pg_state(pg_t & pg);
  161. void report_pg_states();
  162. void apply_pg_count();
  163. void apply_pg_config();
  164. // event loop, socket read/write
  165. void loop();
  166. // peer handling (primary OSD logic)
  167. void parse_test_peer(std::string peer);
  168. void handle_peers();
  169. void repeer_pgs(osd_num_t osd_num);
  170. void start_pg_peering(pg_t & pg);
  171. void submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *ps);
  172. void submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps);
  173. void discard_list_subop(osd_op_t *list_op);
  174. bool stop_pg(pg_t & pg);
  175. void reset_pg(pg_t & pg);
  176. void finish_stop_pg(pg_t & pg);
  177. // flushing, recovery and backfill
  178. void submit_pg_flush_ops(pg_t & pg);
  179. void handle_flush_op(bool rollback, pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t peer_osd, int retval);
  180. void submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data);
  181. bool pick_next_recovery(osd_recovery_op_t &op);
  182. void submit_recovery_op(osd_recovery_op_t *op);
  183. bool continue_recovery();
  184. pg_osd_set_state_t* change_osd_set(pg_osd_set_state_t *st, pg_t *pg);
  185. // op execution
  186. void exec_op(osd_op_t *cur_op);
  187. void finish_op(osd_op_t *cur_op, int retval);
  188. // secondary ops
  189. void exec_sync_stab_all(osd_op_t *cur_op);
  190. void exec_show_config(osd_op_t *cur_op);
  191. void exec_secondary(osd_op_t *cur_op);
  192. void secondary_op_callback(osd_op_t *cur_op);
  193. // primary ops
  194. void autosync();
  195. bool prepare_primary_rw(osd_op_t *cur_op);
  196. void continue_primary_read(osd_op_t *cur_op);
  197. void continue_primary_write(osd_op_t *cur_op);
  198. void cancel_primary_write(osd_op_t *cur_op);
  199. void continue_primary_sync(osd_op_t *cur_op);
  200. void continue_primary_del(osd_op_t *cur_op);
  201. bool check_write_queue(osd_op_t *cur_op, pg_t & pg);
  202. void remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t &pg);
  203. void free_object_state(pg_t & pg, pg_osd_set_state_t **object_state);
  204. bool remember_unstable_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state);
  205. void handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op);
  206. void handle_primary_bs_subop(osd_op_t *subop);
  207. void add_bs_subop_stats(osd_op_t *subop);
  208. void pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval);
  209. void submit_primary_subops(int submit_type, uint64_t op_version, const uint64_t* osd_set, osd_op_t *cur_op);
  210. int submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t op_version,
  211. osd_rmw_stripe_t *stripes, const uint64_t* osd_set, osd_op_t *cur_op, int subop_idx, int zero_read);
  212. void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, uint64_t set_size, pg_osd_set_t & loc_set);
  213. void submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_delete, int chunks_to_delete_count);
  214. int submit_primary_sync_subops(osd_op_t *cur_op);
  215. void submit_primary_stab_subops(osd_op_t *cur_op);
  216. uint64_t* get_object_osd_set(pg_t &pg, object_id &oid, uint64_t *def, pg_osd_set_state_t **object_state);
  217. void continue_chained_read(osd_op_t *cur_op);
  218. int submit_chained_read_requests(pg_t & pg, osd_op_t *cur_op);
  219. void send_chained_read_results(pg_t & pg, osd_op_t *cur_op);
  220. std::vector<osd_chain_read_t> collect_chained_read_requests(osd_op_t *cur_op);
  221. int collect_bitmap_requests(osd_op_t *cur_op, pg_t & pg, std::vector<bitmap_request_t> & bitmap_requests);
  222. int submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg);
  223. int read_bitmaps(osd_op_t *cur_op, pg_t & pg, int base_state);
  224. inline pg_num_t map_to_pg(object_id oid, uint64_t pg_stripe_size)
  225. {
  226. uint64_t pg_count = pg_counts[INODE_POOL(oid.inode)];
  227. if (!pg_count)
  228. pg_count = 1;
  229. return (oid.stripe / pg_stripe_size) % pg_count + 1;
  230. }
  231. public:
  232. osd_t(const json11::Json & config, ring_loop_t *ringloop);
  233. ~osd_t();
  234. void force_stop(int exitcode);
  235. bool shutdown();
  236. };
  237. inline bool operator == (const osd_object_id_t & a, const osd_object_id_t & b)
  238. {
  239. return a.osd_num == b.osd_num && a.oid.inode == b.oid.inode && a.oid.stripe == b.oid.stripe;
  240. }
  241. inline bool operator < (const osd_object_id_t & a, const osd_object_id_t & b)
  242. {
  243. return a.osd_num < b.osd_num || a.osd_num == b.osd_num && (
  244. a.oid.inode < b.oid.inode || a.oid.inode == b.oid.inode && a.oid.stripe < b.oid.stripe
  245. );
  246. }