Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

106 lines
3.1 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 or GNU GPL-2.0+ (see README.md for details)
  3. #pragma once
  4. #include "messenger.h"
  5. #include "etcd_state_client.h"
  // Lower and upper bounds for the block size (4096 and 134217728; presumably bytes).
  // Every expansion is wrapped in parentheses so that the macro acts as a single
  // value inside a larger expression: without them `x / MIN_BLOCK_SIZE` would
  // expand to `x / 4*1024`, i.e. `(x / 4) * 1024`.
  #define MIN_BLOCK_SIZE (4*1024)
  #define MAX_BLOCK_SIZE (128*1024*1024)
  // Default for the client dirty limit (33554432; presumably bytes).
  #define DEFAULT_CLIENT_DIRTY_LIMIT (32*1024*1024)
  9. struct cluster_op_t;
  10. struct cluster_op_part_t
  11. {
  12. cluster_op_t *parent;
  13. uint64_t offset;
  14. uint32_t len;
  15. pg_num_t pg_num;
  16. osd_num_t osd_num;
  17. osd_op_buf_list_t iov;
  18. bool sent;
  19. bool done;
  20. osd_op_t op;
  21. };
  22. struct cluster_op_t
  23. {
  24. uint64_t opcode; // OSD_OP_READ, OSD_OP_WRITE, OSD_OP_SYNC
  25. uint64_t inode;
  26. uint64_t offset;
  27. uint64_t len;
  28. int retval;
  29. osd_op_buf_list_t iov;
  30. std::function<void(cluster_op_t*)> callback;
  31. protected:
  32. void *buf = NULL;
  33. cluster_op_t *orig_op = NULL;
  34. bool is_internal = false;
  35. bool needs_reslice = false;
  36. bool up_wait = false;
  37. int sent_count = 0, done_count = 0;
  38. std::vector<cluster_op_part_t> parts;
  39. friend class cluster_client_t;
  40. };
  41. class cluster_client_t
  42. {
  43. timerfd_manager_t *tfd;
  44. ring_loop_t *ringloop;
  45. uint64_t bs_block_size = 0;
  46. uint64_t bs_bitmap_granularity = 0;
  47. std::map<pool_id_t, uint64_t> pg_counts;
  48. bool immediate_commit = false;
  49. // FIXME: Implement inmemory_commit mode. Note that it requires to return overlapping reads from memory.
  50. uint64_t client_dirty_limit = 0;
  51. int log_level;
  52. int up_wait_retry_interval = 500; // ms
  53. uint64_t op_id = 1;
  54. ring_consumer_t consumer;
  55. // operations currently in progress
  56. std::set<cluster_op_t*> cur_ops;
  57. int retry_timeout_id = 0;
  58. // unsynced operations are copied in memory to allow replay when cluster isn't in the immediate_commit mode
  59. // unsynced_writes are replayed in any order (because only the SYNC operation guarantees ordering)
  60. std::vector<cluster_op_t*> unsynced_writes;
  61. std::vector<cluster_op_t*> syncing_writes;
  62. cluster_op_t* cur_sync = NULL;
  63. std::vector<cluster_op_t*> next_writes;
  64. std::vector<cluster_op_t*> offline_ops;
  65. uint64_t queued_bytes = 0;
  66. void *scrap_bitmap = NULL;
  67. unsigned scrap_bitmap_size = 0;
  68. bool pgs_loaded = false;
  69. std::vector<std::function<void(void)>> on_ready_hooks;
  70. public:
  71. etcd_state_client_t st_cli;
  72. osd_messenger_t msgr;
  73. cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config);
  74. ~cluster_client_t();
  75. void execute(cluster_op_t *op);
  76. void on_ready(std::function<void(void)> fn);
  77. void stop();
  78. protected:
  79. void continue_ops(bool up_retry = false);
  80. void on_load_config_hook(json11::Json::object & config);
  81. void on_load_pgs_hook(bool success);
  82. void on_change_hook(json11::Json::object & changes);
  83. void on_change_osd_state_hook(uint64_t peer_osd);
  84. void continue_rw(cluster_op_t *op);
  85. void slice_rw(cluster_op_t *op);
  86. bool try_send(cluster_op_t *op, cluster_op_part_t *part);
  87. void execute_sync(cluster_op_t *op);
  88. void continue_sync();
  89. void finish_sync();
  90. void send_sync(cluster_op_t *op, cluster_op_part_t *part);
  91. void handle_op_part(cluster_op_part_t *part);
  92. };