Simplified distributed block storage with strong consistency, like in Ceph

// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.0 (see README.md for details)

#pragma once

#include "blockstore.h"

#include <sys/types.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <linux/fs.h>

#include <vector>
#include <list>
#include <deque>
#include <new>

#include "cpp-btree/btree_map.h"

#include "malloc_or_die.h"
#include "allocator.h"

//#define BLOCKSTORE_DEBUG
// States are not stored on disk. Instead, they're deduced from the journal
#define BS_ST_SMALL_WRITE 0x01
#define BS_ST_BIG_WRITE   0x02
#define BS_ST_DELETE      0x03

#define BS_ST_WAIT_DEL    0x10
#define BS_ST_WAIT_BIG    0x20
#define BS_ST_IN_FLIGHT   0x30
#define BS_ST_SUBMITTED   0x40
#define BS_ST_WRITTEN     0x50
#define BS_ST_SYNCED      0x60
#define BS_ST_STABLE      0x70

#define BS_ST_INSTANT     0x100

#define IMMEDIATE_NONE 0
#define IMMEDIATE_SMALL 1
#define IMMEDIATE_ALL 2

#define BS_ST_TYPE_MASK     0x0F
#define BS_ST_WORKFLOW_MASK 0xF0
#define IS_IN_FLIGHT(st) (((st) & 0xF0) <= BS_ST_SUBMITTED)
#define IS_STABLE(st)    (((st) & 0xF0) == BS_ST_STABLE)
#define IS_SYNCED(st)    (((st) & 0xF0) >= BS_ST_SYNCED)
#define IS_JOURNAL(st)   (((st) & 0x0F) == BS_ST_SMALL_WRITE)
#define IS_BIG_WRITE(st) (((st) & 0x0F) == BS_ST_BIG_WRITE)
#define IS_DELETE(st)    (((st) & 0x0F) == BS_ST_DELETE)
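
// Illustrative note (not part of the original header): a state value combines
// one "type" nibble with one "workflow" nibble, so a small (journaled) write
// that has been fsynced but not yet stabilized would be:
//   uint32_t st = BS_ST_SMALL_WRITE | BS_ST_SYNCED; // == 0x61
//   IS_JOURNAL(st) // true  (type nibble == BS_ST_SMALL_WRITE)
//   IS_SYNCED(st)  // true  (workflow nibble >= BS_ST_SYNCED)
//   IS_STABLE(st)  // false (workflow nibble != BS_ST_STABLE)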

#define BS_SUBMIT_GET_SQE(sqe, data) \
    BS_SUBMIT_GET_ONLY_SQE(sqe); \
    struct ring_data_t *data = ((ring_data_t*)sqe->user_data)

#define BS_SUBMIT_GET_ONLY_SQE(sqe) \
    struct io_uring_sqe *sqe = get_sqe();\
    if (!sqe)\
    {\
        /* Pause until there are more requests available */\
        PRIV(op)->wait_for = WAIT_SQE;\
        return 0;\
    }

#define BS_SUBMIT_GET_SQE_DECL(sqe) \
    sqe = get_sqe();\
    if (!sqe)\
    {\
        /* Pause until there are more requests available */\
        PRIV(op)->wait_for = WAIT_SQE;\
        return 0;\
    }
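
// Illustrative call-site sketch (hypothetical function, not from this file):
// these macros are meant to be expanded inside a method that takes a
// blockstore_op_t *op and returns int, so the early "return 0" suspends the
// operation until the ring has free SQEs again:
//
//   int blockstore_impl_t::dequeue_something(blockstore_op_t *op)
//   {
//       BS_SUBMIT_GET_SQE(sqe, data); // returns 0 and sets wait_for = WAIT_SQE if the ring is full
//       // ...fill the sqe, set the completion handler, bump PRIV(op)->pending_ops...
//       return 1;
//   }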

#include "blockstore_journal.h"

// 32 bytes = 24 bytes + block bitmap (4 bytes by default) + external attributes (also bitmap, 4 bytes by default)
// per "clean" entry on disk with fixed metadata tables
// FIXME: maybe add crc32's to metadata
struct __attribute__((__packed__)) clean_disk_entry
{
    object_id oid;
    uint64_t version;
    uint8_t bitmap[];
};

// 32 = 16 + 16 bytes per "clean" entry in memory (object_id => clean_entry)
struct __attribute__((__packed__)) clean_entry
{
    uint64_t version;
    uint64_t location;
};

// 64 = 24 + 40 bytes per dirty entry in memory (obj_ver_id => dirty_entry)
struct __attribute__((__packed__)) dirty_entry
{
    uint32_t state;
    uint32_t flags;          // unneeded, but present for alignment
    uint64_t location;       // location in either journal or data -> in BYTES
    uint32_t offset;         // data offset within object (stripe)
    uint32_t len;            // data length
    uint64_t journal_sector; // journal sector used for this entry
    void* bitmap;            // either external bitmap itself when it fits, or a pointer to it when it doesn't
};
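
// Size sanity-check sketch for the byte counts in the comments above
// (illustrative; assumes object_id is 16 bytes and obj_ver_id is 24 bytes,
// as those counts imply):
//   static_assert(sizeof(clean_disk_entry) == 24, "plus the flexible bitmap[]");
//   static_assert(sizeof(clean_entry) == 16, "value part of the clean map");
//   static_assert(sizeof(dirty_entry) == 40, "value part of the dirty map");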

// - Sync must be submitted after previous writes/deletes (not before!)
// - Reads to the same object must be submitted after previous writes/deletes
//   are written (not necessarily synced) in their location. This is because we
//   rely on read-modify-write for erasure coding and we must return new data
//   to calculate parity for subsequent writes
// - Writes may be submitted in any order, because they don't overlap. Each write
//   goes into a new location - either on the journal device or on the data device
// - Stable (stabilize) must be submitted after the sync of that object is completed.
//   It's even OK to return an error to the caller if that object is not synced yet
// - Journal trim may be processed only after all versions are moved to
//   the main storage AND after all read operations for older versions complete
// - If an operation cannot be submitted because the ring is full,
//   we should stop submission of other operations. Otherwise some "scatter" reads
//   may end up blocked for a long time.
// Otherwise, the submit order is free, that is, all operations may be submitted immediately.
// In fact, adding a write operation must immediately result in dirty_db being populated.
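
// Worked example of the rules above (illustrative, not from the original source):
// after write(A, v2) is enqueued, a read(A) may be submitted only once v2 is
// written to its journal/data location, and a sync only after the write itself
// is submitted, while an unrelated write(B) may be submitted immediately.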

// Suspend operation until there are more free SQEs
#define WAIT_SQE 1
// Suspend operation until there are <wait_detail> bytes of free space in the journal on disk
#define WAIT_JOURNAL 3
// Suspend operation until the next journal sector buffer is free
#define WAIT_JOURNAL_BUFFER 4
// Suspend operation until there is some free space on the data device
#define WAIT_FREE 5

struct fulfill_read_t
{
    uint64_t offset, len;
};

#define PRIV(op) ((blockstore_op_private_t*)(op)->private_data)
// Destroy the op's private data, then invoke its completion callback
#define FINISH_OP(op) PRIV(op)->~blockstore_op_private_t(); std::function<void (blockstore_op_t*)>(op->callback)(op)

struct blockstore_op_private_t
{
    // Wait status
    int wait_for;
    uint64_t wait_detail;
    int pending_ops;
    int op_state;

    // Read
    std::vector<fulfill_read_t> read_vec;

    // Sync, write
    uint64_t min_flushed_journal_sector, max_flushed_journal_sector;

    // Write
    struct iovec iov_zerofill[3];
    // Warning: must not have a default value here, because it's written to
    // before the constructor is called in blockstore_write.cpp O_o
    uint64_t real_version;

    // Sync
    std::vector<obj_ver_id> sync_big_writes, sync_small_writes;
    int sync_small_checked, sync_big_checked;
    std::list<blockstore_op_t*>::iterator in_progress_ptr;
    int prev_sync_count;
};
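
// Illustrative lifecycle sketch (hypothetical, inferred from the macros above
// and the <new> include): private_data is placement-constructed when an
// operation is enqueued and torn down by FINISH_OP(), roughly:
//   new ((void*)op->private_data) blockstore_op_private_t;
//   // ...process the operation...
//   FINISH_OP(op); // runs ~blockstore_op_private_t(), then calls op->callback(op)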

// https://github.com/algorithm-ninja/cpp-btree
// https://github.com/greg7mdp/sparsepp/ was used previously, but it was TERRIBLY slow after resizing:
// with sparsepp, random reads quickly dropped to ~700 iops with as few as ~32k objects in the DB
typedef btree::btree_map<object_id, clean_entry> blockstore_clean_db_t;
typedef std::map<obj_ver_id, dirty_entry> blockstore_dirty_db_t;
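
// Illustrative sketch (hypothetical snippet): because blockstore_dirty_db_t is
// ordered by (oid, version), all dirty versions of one object can be walked
// with a single lower_bound():
//   auto dirty_it = dirty_db.lower_bound((obj_ver_id){ .oid = oid, .version = 0 });
//   for (; dirty_it != dirty_db.end() && dirty_it->first.oid == oid; dirty_it++)
//       /* ...inspect dirty_it->second.state... */;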

#include "blockstore_init.h"
#include "blockstore_flush.h"

class blockstore_impl_t
{
    /******* OPTIONS *******/
    std::string data_device, meta_device, journal_device;
    uint32_t block_size;
    uint64_t meta_offset;
    uint64_t data_offset;
    uint64_t cfg_journal_size, cfg_data_size;
    // Required write alignment and journal/metadata/data areas' location alignment
    uint32_t disk_alignment = 4096;
    // Journal block size - the minimum_io_size of the journal device is the best choice
    uint64_t journal_block_size = 4096;
    // Metadata block size - the minimum_io_size of the metadata device is the best choice
    uint64_t meta_block_size = 4096;
    // Sparse write tracking granularity. 4 KB is a good choice. Must be a multiple of disk_alignment
    uint64_t bitmap_granularity = 4096;
    bool readonly = false;
    // By default, Blockstore locks all opened devices exclusively. This option can be used to disable locking
    bool disable_flock = false;
    // It is safe to disable fsync() if the drive's write cache is write-through
    bool disable_data_fsync = false, disable_meta_fsync = false, disable_journal_fsync = false;
    // Enable if you want every operation to be executed with an "implicit fsync".
    // Suitable only for server SSDs with capacitors; requires disabled data and journal fsyncs
    int immediate_commit = IMMEDIATE_NONE;
    bool inmemory_meta = false;
    int flusher_count;
    /******* END OF OPTIONS *******/
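
    // Illustrative configuration sketch (hypothetical keys, assumed to map onto
    // the option fields above; blockstore_config_t is defined in blockstore.h):
    //   blockstore_config_t cfg;
    //   cfg["data_device"] = "/dev/disk/by-partuuid/...";
    //   cfg["disable_data_fsync"] = "true";
    //   cfg["immediate_commit"] = "all"; // only valid with data & journal fsyncs disabled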

    struct ring_consumer_t ring_consumer;

    blockstore_clean_db_t clean_db;
    uint8_t *clean_bitmap = NULL;
    blockstore_dirty_db_t dirty_db;
    std::list<blockstore_op_t*> submit_queue; // FIXME: funny thing is that vector is better here
    std::vector<obj_ver_id> unsynced_big_writes, unsynced_small_writes;
    std::list<blockstore_op_t*> in_progress_syncs; // ...and probably here, too
    allocator *data_alloc = NULL;
    uint8_t *zero_object;

    uint32_t block_order;
    uint64_t block_count;
    uint32_t clean_entry_bitmap_size = 0, clean_entry_size = 0, entry_attr_size = 0;

    int meta_fd;
    int data_fd;
    uint64_t meta_size, meta_area, meta_len;
    uint64_t data_size, data_len;

    void *metadata_buffer = NULL;

    struct journal_t journal;
    journal_flusher_t *flusher;

    bool live = false, queue_stall = false;
    ring_loop_t *ringloop;

    bool stop_sync_submitted;

    inline struct io_uring_sqe* get_sqe()
    {
        return ringloop->get_sqe();
    }

    friend class blockstore_init_meta;
    friend class blockstore_init_journal;
    friend class blockstore_journal_check_t;
    friend class journal_flusher_t;
    friend class journal_flusher_co;

    void parse_config(blockstore_config_t & config);
    void calc_lengths();
    void open_data();
    void open_meta();
    void open_journal();
    uint8_t* get_clean_entry_bitmap(uint64_t block_loc, int offset);

    // Asynchronous init
    int initialized;
    int metadata_buf_size;
    blockstore_init_meta* metadata_init_reader;
    blockstore_init_journal* journal_init_reader;

    void check_wait(blockstore_op_t *op);

    // Read
    int dequeue_read(blockstore_op_t *read_op);
    int fulfill_read(blockstore_op_t *read_op, uint64_t &fulfilled, uint32_t item_start, uint32_t item_end,
        uint32_t item_state, uint64_t item_version, uint64_t item_location);
    int fulfill_read_push(blockstore_op_t *op, void *buf, uint64_t offset, uint64_t len,
        uint32_t item_state, uint64_t item_version);
    void handle_read_event(ring_data_t *data, blockstore_op_t *op);

    // Write
    bool enqueue_write(blockstore_op_t *op);
    int dequeue_write(blockstore_op_t *op);
    int dequeue_del(blockstore_op_t *op);
    int continue_write(blockstore_op_t *op);
    void release_journal_sectors(blockstore_op_t *op);
    void handle_write_event(ring_data_t *data, blockstore_op_t *op);

    // Sync
    int dequeue_sync(blockstore_op_t *op);
    void handle_sync_event(ring_data_t *data, blockstore_op_t *op);
    int continue_sync(blockstore_op_t *op);
    void ack_one_sync(blockstore_op_t *op);
    int ack_sync(blockstore_op_t *op);

    // Stabilize
    int dequeue_stable(blockstore_op_t *op);
    int continue_stable(blockstore_op_t *op);
    void mark_stable(const obj_ver_id & ov);
    void handle_stable_event(ring_data_t *data, blockstore_op_t *op);
    void stabilize_object(object_id oid, uint64_t max_ver);

    // Rollback
    int dequeue_rollback(blockstore_op_t *op);
    int continue_rollback(blockstore_op_t *op);
    void mark_rolled_back(const obj_ver_id & ov);
    void handle_rollback_event(ring_data_t *data, blockstore_op_t *op);
    void erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc);

    // List
    void process_list(blockstore_op_t *op);

public:
    blockstore_impl_t(blockstore_config_t & config, ring_loop_t *ringloop);
    ~blockstore_impl_t();

    // Event loop
    void loop();

    // Returns true when the blockstore is ready to process operations
    // (although you're free to enqueue them before that)
    bool is_started();

    // Returns true when it's safe to destroy the instance. If destroying the
    // instance requires purging some queues, starts that process. Should be
    // called in the event loop until it returns true.
    bool is_safe_to_stop();

    // Returns true if stalled
    bool is_stalled();

    // Submission
    void enqueue_op(blockstore_op_t *op, bool first = false);

    // Unstable writes are added here (map of object_id -> version)
    std::unordered_map<object_id, uint64_t> unstable_writes;

    inline uint32_t get_block_size() { return block_size; }
    inline uint64_t get_block_count() { return block_count; }
    inline uint64_t get_free_block_count() { return data_alloc->get_free_count(); }
    inline uint32_t get_bitmap_granularity() { return disk_alignment; }
};
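
// Illustrative usage sketch of the public interface (hypothetical caller;
// blockstore_op_t and its fields are defined in blockstore.h, and the ring
// queue depth shown here is an assumption):
//   ring_loop_t *ringloop = new ring_loop_t(512);
//   blockstore_impl_t *bs = new blockstore_impl_t(config, ringloop);
//   blockstore_op_t *op = new blockstore_op_t;
//   // ...fill op->opcode, buffers etc, and set op->callback...
//   bs->enqueue_op(op);
//   while (true)
//   {
//       ringloop->loop(); // runs bs->loop() via the registered ring_consumer
//       ringloop->wait();
//   }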