Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

230 lines
7.3 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 (see README.md for details)
  3. #pragma once
  4. #include <sys/types.h>
  5. #include <time.h>
  6. #include <sys/ioctl.h>
  7. #include <sys/stat.h>
  8. #include <fcntl.h>
  9. #include <unistd.h>
  10. #include <malloc.h>
  11. #include <arpa/inet.h>
  12. #include <malloc.h>
  13. #include <set>
  14. #include <deque>
  15. #include "blockstore.h"
  16. #include "ringloop.h"
  17. #include "timerfd_manager.h"
  18. #include "epoll_manager.h"
  19. #include "osd_peering_pg.h"
  20. #include "messenger.h"
  21. #include "etcd_state_client.h"
  22. #define OSD_LOADING_PGS 0x01
  23. #define OSD_PEERING_PGS 0x04
  24. #define OSD_FLUSHING_PGS 0x08
  25. #define OSD_RECOVERING 0x10
  26. #define IMMEDIATE_NONE 0
  27. #define IMMEDIATE_SMALL 1
  28. #define IMMEDIATE_ALL 2
  29. #define MAX_AUTOSYNC_INTERVAL 3600
  30. #define DEFAULT_AUTOSYNC_INTERVAL 5
  31. #define MAX_RECOVERY_QUEUE 2048
  32. #define DEFAULT_RECOVERY_QUEUE 4
  33. //#define OSD_STUB
  34. struct osd_object_id_t
  35. {
  36. osd_num_t osd_num;
  37. object_id oid;
  38. };
  39. struct osd_recovery_op_t
  40. {
  41. int st = 0;
  42. bool degraded = false;
  43. object_id oid = { 0 };
  44. osd_op_t *osd_op = NULL;
  45. };
  46. class osd_t
  47. {
  48. // config
  49. blockstore_config_t config;
  50. int etcd_report_interval = 30;
  51. bool readonly = false;
  52. osd_num_t osd_num = 1; // OSD numbers start with 1
  53. bool run_primary = false;
  54. std::string bind_address;
  55. int bind_port, listen_backlog;
  56. // FIXME: Implement client queue depth limit
  57. int client_queue_depth = 128;
  58. bool allow_test_ops = true;
  59. int print_stats_interval = 3;
  60. int slow_log_interval = 10;
  61. int immediate_commit = IMMEDIATE_NONE;
  62. int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds
  63. int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
  64. int log_level = 0;
  65. // cluster state
  66. etcd_state_client_t st_cli;
  67. osd_messenger_t c_cli;
  68. int etcd_failed_attempts = 0;
  69. std::string etcd_lease_id;
  70. json11::Json self_state;
  71. bool loading_peer_config = false;
  72. std::set<pool_pg_num_t> pg_state_dirty;
  73. bool pg_config_applied = false;
  74. bool etcd_reporting_pg_state = false;
  75. bool etcd_reporting_stats = false;
  76. // peers and PGs
  77. std::map<pool_id_t, pg_num_t> pg_counts;
  78. std::map<pool_pg_num_t, pg_t> pgs;
  79. std::set<pool_pg_num_t> dirty_pgs;
  80. std::set<osd_num_t> dirty_osds;
  81. uint64_t misplaced_objects = 0, degraded_objects = 0, incomplete_objects = 0;
  82. int peering_state = 0;
  83. std::map<object_id, osd_recovery_op_t> recovery_ops;
  84. osd_op_t *autosync_op = NULL;
  85. // Unstable writes
  86. std::map<osd_object_id_t, uint64_t> unstable_writes;
  87. std::deque<osd_op_t*> syncs_in_progress;
  88. // client & peer I/O
  89. bool stopping = false;
  90. int inflight_ops = 0;
  91. blockstore_t *bs;
  92. uint32_t bs_block_size, bs_bitmap_granularity, entry_attr_size;
  93. ring_loop_t *ringloop;
  94. timerfd_manager_t *tfd = NULL;
  95. epoll_manager_t *epmgr = NULL;
  96. int listening_port = 0;
  97. int listen_fd = 0;
  98. ring_consumer_t consumer;
  99. // op statistics
  100. osd_op_stats_t prev_stats;
  101. const char* recovery_stat_names[2] = { "degraded", "misplaced" };
  102. uint64_t recovery_stat_count[2][2] = { 0 };
  103. uint64_t recovery_stat_bytes[2][2] = { 0 };
  104. // cluster connection
  105. void parse_config(blockstore_config_t & config);
  106. void init_cluster();
  107. void on_change_osd_state_hook(osd_num_t peer_osd);
  108. void on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num);
  109. void on_change_etcd_state_hook(json11::Json::object & changes);
  110. void on_load_config_hook(json11::Json::object & changes);
  111. json11::Json on_load_pgs_checks_hook();
  112. void on_load_pgs_hook(bool success);
  113. void bind_socket();
  114. void acquire_lease();
  115. json11::Json get_osd_state();
  116. void create_osd_state();
  117. void renew_lease();
  118. void print_stats();
  119. void print_slow();
  120. void reset_stats();
  121. json11::Json get_statistics();
  122. void report_statistics();
  123. void report_pg_state(pg_t & pg);
  124. void report_pg_states();
  125. void apply_pg_count();
  126. void apply_pg_config();
  127. // event loop, socket read/write
  128. void loop();
  129. // peer handling (primary OSD logic)
  130. void parse_test_peer(std::string peer);
  131. void handle_peers();
  132. void repeer_pgs(osd_num_t osd_num);
  133. void start_pg_peering(pg_t & pg);
  134. void submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *ps);
  135. void submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps);
  136. void discard_list_subop(osd_op_t *list_op);
  137. bool stop_pg(pg_t & pg);
  138. void finish_stop_pg(pg_t & pg);
  139. // flushing, recovery and backfill
  140. void submit_pg_flush_ops(pg_t & pg);
  141. void handle_flush_op(bool rollback, pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t peer_osd, int retval);
  142. void submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data);
  143. bool pick_next_recovery(osd_recovery_op_t &op);
  144. void submit_recovery_op(osd_recovery_op_t *op);
  145. bool continue_recovery();
  146. pg_osd_set_state_t* change_osd_set(pg_osd_set_state_t *st, pg_t *pg);
  147. // op execution
  148. void exec_op(osd_op_t *cur_op);
  149. void finish_op(osd_op_t *cur_op, int retval);
  150. // secondary ops
  151. void exec_sync_stab_all(osd_op_t *cur_op);
  152. void exec_show_config(osd_op_t *cur_op);
  153. void exec_secondary(osd_op_t *cur_op);
  154. void secondary_op_callback(osd_op_t *cur_op);
  155. // primary ops
  156. void autosync();
  157. bool prepare_primary_rw(osd_op_t *cur_op);
  158. void continue_primary_read(osd_op_t *cur_op);
  159. void continue_primary_write(osd_op_t *cur_op);
  160. void cancel_primary_write(osd_op_t *cur_op);
  161. void continue_primary_sync(osd_op_t *cur_op);
  162. void continue_primary_del(osd_op_t *cur_op);
  163. bool check_write_queue(osd_op_t *cur_op, pg_t & pg);
  164. void remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t &pg);
  165. bool remember_unstable_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state);
  166. void handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op);
  167. void handle_primary_bs_subop(osd_op_t *subop);
  168. void add_bs_subop_stats(osd_op_t *subop);
  169. void pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval);
  170. void submit_primary_subops(int submit_type, uint64_t op_version, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
  171. void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, uint64_t set_size, pg_osd_set_t & loc_set);
  172. void submit_primary_sync_subops(osd_op_t *cur_op);
  173. void submit_primary_stab_subops(osd_op_t *cur_op);
  174. inline pg_num_t map_to_pg(object_id oid, uint64_t pg_stripe_size)
  175. {
  176. uint64_t pg_count = pg_counts[INODE_POOL(oid.inode)];
  177. if (!pg_count)
  178. pg_count = 1;
  179. return (oid.inode + oid.stripe / pg_stripe_size) % pg_count + 1;
  180. }
  181. public:
  182. osd_t(blockstore_config_t & config, ring_loop_t *ringloop);
  183. ~osd_t();
  184. void force_stop(int exitcode);
  185. bool shutdown();
  186. };
  187. inline bool operator == (const osd_object_id_t & a, const osd_object_id_t & b)
  188. {
  189. return a.osd_num == b.osd_num && a.oid.inode == b.oid.inode && a.oid.stripe == b.oid.stripe;
  190. }
  191. inline bool operator < (const osd_object_id_t & a, const osd_object_id_t & b)
  192. {
  193. return a.osd_num < b.osd_num || a.osd_num == b.osd_num && (
  194. a.oid.inode < b.oid.inode || a.oid.inode == b.oid.inode && a.oid.stripe < b.oid.stripe
  195. );
  196. }