Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

126 lines
3.8 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
  3. #pragma once
  4. #include "messenger.h"
  5. #include "etcd_state_client.h"
  // Block size limits and default client-side dirty-data limits.
  // Each arithmetic expansion is wrapped in parentheses so the macro acts as a
  // single value inside a larger expression: without them `x / MIN_BLOCK_SIZE`
  // would expand to `x / 4*1024`, i.e. `(x / 4) * 1024`.
  #define MIN_BLOCK_SIZE (4*1024)
  #define MAX_BLOCK_SIZE (128*1024*1024)
  #define DEFAULT_CLIENT_MAX_DIRTY_BYTES (32*1024*1024)
  #define DEFAULT_CLIENT_MAX_DIRTY_OPS 1024
  10. struct cluster_op_t;
  11. struct cluster_op_part_t
  12. {
  13. cluster_op_t *parent;
  14. uint64_t offset;
  15. uint32_t len;
  16. pg_num_t pg_num;
  17. osd_num_t osd_num;
  18. osd_op_buf_list_t iov;
  19. unsigned flags;
  20. osd_op_t op;
  21. };
  22. struct cluster_op_t
  23. {
  24. uint64_t opcode; // OSD_OP_READ, OSD_OP_WRITE, OSD_OP_SYNC
  25. uint64_t inode;
  26. uint64_t offset;
  27. uint64_t len;
  28. int retval;
  29. osd_op_buf_list_t iov;
  30. std::function<void(cluster_op_t*)> callback;
  31. ~cluster_op_t();
  32. protected:
  33. uint64_t flags = 0;
  34. int state = 0;
  35. uint64_t cur_inode; // for snapshot reads
  36. void *buf = NULL;
  37. cluster_op_t *orig_op = NULL;
  38. bool needs_reslice = false;
  39. bool up_wait = false;
  40. int inflight_count = 0, done_count = 0;
  41. std::vector<cluster_op_part_t> parts;
  42. void *bitmap_buf = NULL, *part_bitmaps = NULL;
  43. unsigned bitmap_buf_size = 0;
  44. cluster_op_t *prev = NULL, *next = NULL;
  45. int prev_wait = 0;
  46. friend class cluster_client_t;
  47. };
  // Buffer descriptor used as the mapped value of cluster_client_t::dirty_buffers.
  // Plain aggregate: members are initialized positionally or by name by the
  // code that creates it, so their order must not change.
  struct cluster_buffer_t
  {
      // Pointer to the buffer data
      void *buf;
      // Length of the data behind `buf`
      uint64_t len;
      // State code of the buffer; the state constants are not visible in this header
      int state;
  };
  54. // FIXME: Split into public and private interfaces
  55. class cluster_client_t
  56. {
  57. timerfd_manager_t *tfd;
  58. ring_loop_t *ringloop;
  59. uint64_t bs_block_size = 0;
  60. uint32_t bs_bitmap_granularity = 0, bs_bitmap_size = 0;
  61. std::map<pool_id_t, uint64_t> pg_counts;
  62. // WARNING: initially true so execute() doesn't create fake sync
  63. bool immediate_commit = true;
  64. // FIXME: Implement inmemory_commit mode. Note that it requires to return overlapping reads from memory.
  65. uint64_t client_max_dirty_bytes = 0;
  66. uint64_t client_max_dirty_ops = 0;
  67. int log_level;
  68. int up_wait_retry_interval = 500; // ms
  69. int retry_timeout_id = 0;
  70. uint64_t op_id = 1;
  71. std::vector<cluster_op_t*> offline_ops;
  72. cluster_op_t *op_queue_head = NULL, *op_queue_tail = NULL;
  73. std::map<object_id, cluster_buffer_t> dirty_buffers;
  74. std::set<osd_num_t> dirty_osds;
  75. uint64_t dirty_bytes = 0, dirty_ops = 0;
  76. void *scrap_buffer = NULL;
  77. unsigned scrap_buffer_size = 0;
  78. bool pgs_loaded = false;
  79. ring_consumer_t consumer;
  80. std::vector<std::function<void(void)>> on_ready_hooks;
  81. int continuing_ops = 0;
  82. public:
  83. etcd_state_client_t st_cli;
  84. osd_messenger_t msgr;
  85. json11::Json config;
  86. cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config);
  87. ~cluster_client_t();
  88. void execute(cluster_op_t *op);
  89. bool is_ready();
  90. void on_ready(std::function<void(void)> fn);
  91. static void copy_write(cluster_op_t *op, std::map<object_id, cluster_buffer_t> & dirty_buffers);
  92. void continue_ops(bool up_retry = false);
  93. protected:
  94. bool affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd);
  95. void flush_buffer(const object_id & oid, cluster_buffer_t *wr);
  96. void on_load_config_hook(json11::Json::object & config);
  97. void on_load_pgs_hook(bool success);
  98. void on_change_hook(std::map<std::string, etcd_kv_t> & changes);
  99. void on_change_osd_state_hook(uint64_t peer_osd);
  100. int continue_rw(cluster_op_t *op);
  101. void slice_rw(cluster_op_t *op);
  102. bool try_send(cluster_op_t *op, int i);
  103. int continue_sync(cluster_op_t *op);
  104. void send_sync(cluster_op_t *op, cluster_op_part_t *part);
  105. void handle_op_part(cluster_op_part_t *part);
  106. void copy_part_bitmap(cluster_op_t *op, cluster_op_part_t *part);
  107. void erase_op(cluster_op_t *op);
  108. void calc_wait(cluster_op_t *op);
  109. void inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *next, int inc);
  110. };