Simplified distributed block storage with strong consistency, like in Ceph
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.0 (see README.md for details)

#include "blockstore_impl.h"

blockstore_impl_t::blockstore_impl_t(blockstore_config_t & config, ring_loop_t *ringloop)
{
    assert(sizeof(blockstore_op_private_t) <= BS_OP_PRIVATE_DATA_SIZE);
    this->ringloop = ringloop;
    ring_consumer.loop = [this]() { loop(); };
    ringloop->register_consumer(&ring_consumer);
    initialized = 0;
    data_fd = meta_fd = journal.fd = -1;
    parse_config(config);
    zero_object = (uint8_t*)memalign_or_die(MEM_ALIGNMENT, block_size);
    try
    {
        open_data();
        open_meta();
        open_journal();
        calc_lengths();
        data_alloc = new allocator(block_count);
    }
    catch (std::exception & e)
    {
        if (data_fd >= 0)
            close(data_fd);
        if (meta_fd >= 0 && meta_fd != data_fd)
            close(meta_fd);
        if (journal.fd >= 0 && journal.fd != meta_fd)
            close(journal.fd);
        throw;
    }
    flusher = new journal_flusher_t(flusher_count, this);
}

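// Tear down in reverse construction order: free the allocator, flusher and
// zero buffer, detach from the ring loop, close the descriptors (guarding
// against data/meta/journal sharing an fd), then release the metadata and
// clean-bitmap buffers.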
blockstore_impl_t::~blockstore_impl_t()
{
    delete data_alloc;
    delete flusher;
    free(zero_object);
    ringloop->unregister_consumer(&ring_consumer);
    if (data_fd >= 0)
        close(data_fd);
    if (meta_fd >= 0 && meta_fd != data_fd)
        close(meta_fd);
    if (journal.fd >= 0 && journal.fd != meta_fd)
        close(journal.fd);
    if (metadata_buffer)
        free(metadata_buffer);
    if (clean_bitmap)
        free(clean_bitmap);
}

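// loop() drives "initialized" through 0 (not started) -> 1 (reading metadata)
// -> 2 (replaying the journal) -> 10 (fully started).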
bool blockstore_impl_t::is_started()
{
    return initialized == 10;
}

bool blockstore_impl_t::is_stalled()
{
    return queue_stall;
}

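// Each loop() call either advances initialization or makes one submission pass:
// continue in-progress syncs, walk submit_queue dispatching each op by opcode,
// run the journal flusher, then push everything to io_uring in one submit()
// call. If a pass consumes no ring space while the ring has no pending work,
// the queue is considered stalled (reported via is_stalled()).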
// main event loop - produce requests
void blockstore_impl_t::loop()
{
    // FIXME: initialized == 10 is ugly
    if (initialized != 10)
    {
        // read metadata, then journal
        if (initialized == 0)
        {
            metadata_init_reader = new blockstore_init_meta(this);
            initialized = 1;
        }
        if (initialized == 1)
        {
            int res = metadata_init_reader->loop();
            if (!res)
            {
                delete metadata_init_reader;
                metadata_init_reader = NULL;
                journal_init_reader = new blockstore_init_journal(this);
                initialized = 2;
            }
        }
        if (initialized == 2)
        {
            int res = journal_init_reader->loop();
            if (!res)
            {
                delete journal_init_reader;
                journal_init_reader = NULL;
                initialized = 10;
                ringloop->wakeup();
            }
        }
    }
    else
    {
        // try to submit ops
        unsigned initial_ring_space = ringloop->space_left();
        // FIXME: rework this "sync polling"
        auto cur_sync = in_progress_syncs.begin();
        while (cur_sync != in_progress_syncs.end())
        {
            if (continue_sync(*cur_sync) != 2)
            {
                // List is unmodified
                cur_sync++;
            }
            else
            {
                cur_sync = in_progress_syncs.begin();
            }
        }
        auto cur = submit_queue.begin();
        int has_writes = 0;
        while (cur != submit_queue.end())
        {
            auto op_ptr = cur;
            auto op = *(cur++);
            // FIXME: This needs some simplification
            // Writes should not block reads if the ring is not full and reads don't depend on them
            // In all other cases we should stop submission
            if (PRIV(op)->wait_for)
            {
                check_wait(op);
                if (PRIV(op)->wait_for == WAIT_SQE)
                {
                    break;
                }
                else if (PRIV(op)->wait_for)
                {
                    if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE || op->opcode == BS_OP_DELETE)
                    {
                        has_writes = 2;
                    }
                    continue;
                }
            }
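            // Remember the current free-SQE count and ring position: if the op
            // can't be fully submitted, restore() below rolls back any SQEs it
            // already claimed.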
            unsigned ring_space = ringloop->space_left();
            unsigned prev_sqe_pos = ringloop->save();
            bool dequeue_op = false;
            if (op->opcode == BS_OP_READ)
            {
                dequeue_op = dequeue_read(op);
            }
            else if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE)
            {
                if (has_writes == 2)
                {
                    // Some writes already could not be submitted
                    continue;
                }
                dequeue_op = dequeue_write(op);
                has_writes = dequeue_op ? 1 : 2;
            }
            else if (op->opcode == BS_OP_DELETE)
            {
                if (has_writes == 2)
                {
                    // Some writes already could not be submitted
                    continue;
                }
                dequeue_op = dequeue_del(op);
                has_writes = dequeue_op ? 1 : 2;
            }
            else if (op->opcode == BS_OP_SYNC)
            {
                // wait for all small writes to be submitted
                // wait for all big writes to complete, submit data device fsync
                // wait for the data device fsync to complete, then submit journal writes for big writes
                // then submit an fsync operation
                if (has_writes)
                {
                    // Can't submit SYNC before previous writes
                    continue;
                }
                dequeue_op = dequeue_sync(op);
            }
            else if (op->opcode == BS_OP_STABLE)
            {
                dequeue_op = dequeue_stable(op);
            }
            else if (op->opcode == BS_OP_ROLLBACK)
            {
                dequeue_op = dequeue_rollback(op);
            }
            else if (op->opcode == BS_OP_LIST)
            {
                // LIST doesn't need to be blocked by previous modifications,
                // it only needs to include all in-progress writes as they're guaranteed
                // to be readable and stabilizable/rollbackable by subsequent operations
                process_list(op);
                dequeue_op = true;
            }
            if (dequeue_op)
            {
                submit_queue.erase(op_ptr);
            }
            else
            {
                ringloop->restore(prev_sqe_pos);
                if (PRIV(op)->wait_for == WAIT_SQE)
                {
                    PRIV(op)->wait_detail = 1 + ring_space;
                    // ring is full, stop submission
                    break;
                }
            }
        }
        if (!readonly)
        {
            flusher->loop();
        }
        int ret = ringloop->submit();
        if (ret < 0)
        {
            throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret));
        }
        if ((initial_ring_space - ringloop->space_left()) > 0)
        {
            live = true;
        }
        queue_stall = !live && !ringloop->has_work();
        live = false;
    }
}

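// True only when the blockstore is fully quiescent: nothing queued, no
// in-progress syncs, flusher idle, no unsynced writes. If unsynced writes
// remain, a final self-deleting SYNC is enqueued once (stop_sync_submitted)
// and false is returned until it drains. A caller would typically poll this
// between event loop iterations, e.g. (hypothetical usage, not part of this
// file):
//     while (!bs->is_safe_to_stop())
//         ringloop->loop();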
bool blockstore_impl_t::is_safe_to_stop()
{
    // It's safe to stop blockstore when there are no in-flight operations,
    // no in-progress syncs and flusher isn't doing anything
    if (submit_queue.size() > 0 || in_progress_syncs.size() > 0 || !readonly && flusher->is_active())
    {
        return false;
    }
    if (unsynced_big_writes.size() > 0 || unsynced_small_writes.size() > 0)
    {
        if (!readonly && !stop_sync_submitted)
        {
            // We should sync the blockstore before unmounting
            blockstore_op_t *op = new blockstore_op_t;
            op->opcode = BS_OP_SYNC;
            op->buf = NULL;
            op->callback = [](blockstore_op_t *op)
            {
                delete op;
            };
            enqueue_op(op);
            stop_sync_submitted = true;
        }
        return false;
    }
    return true;
}

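// Re-check the condition an operation was parked on and clear wait_for if it
// no longer holds, so loop() retries the op:
// - WAIT_SQE: enough io_uring submission queue entries are free again
// - WAIT_JOURNAL: journal.used_start moved, i.e. journal space was trimmed
// - WAIT_JOURNAL_BUFFER: the next journal sector buffer is free and clean
// - WAIT_FREE: free data blocks exist again, or the flusher is active and may
//   release some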
void blockstore_impl_t::check_wait(blockstore_op_t *op)
{
    if (PRIV(op)->wait_for == WAIT_SQE)
    {
        if (ringloop->space_left() < PRIV(op)->wait_detail)
        {
            // stop submission if there's still no free space
#ifdef BLOCKSTORE_DEBUG
            printf("Still waiting for %lu SQE(s)\n", PRIV(op)->wait_detail);
#endif
            return;
        }
        PRIV(op)->wait_for = 0;
    }
    else if (PRIV(op)->wait_for == WAIT_JOURNAL)
    {
        if (journal.used_start == PRIV(op)->wait_detail)
        {
            // do not submit
#ifdef BLOCKSTORE_DEBUG
            printf("Still waiting to flush journal offset %08lx\n", PRIV(op)->wait_detail);
#endif
            return;
        }
        flusher->release_trim();
        PRIV(op)->wait_for = 0;
    }
    else if (PRIV(op)->wait_for == WAIT_JOURNAL_BUFFER)
    {
        int next = ((journal.cur_sector + 1) % journal.sector_count);
        if (journal.sector_info[next].usage_count > 0 ||
            journal.sector_info[next].dirty)
        {
            // do not submit
#ifdef BLOCKSTORE_DEBUG
            printf("Still waiting for a journal buffer\n");
#endif
            return;
        }
        PRIV(op)->wait_for = 0;
    }
    else if (PRIV(op)->wait_for == WAIT_FREE)
    {
        if (!data_alloc->get_free_count() && !flusher->is_active())
        {
#ifdef BLOCKSTORE_DEBUG
            printf("Still waiting for free space on the data device\n");
#endif
            return;
        }
        PRIV(op)->wait_for = 0;
    }
    else
    {
        throw std::runtime_error("BUG: op->wait_for value is unexpected");
    }
}

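// Validate an operation and add it to the submission queue. Reads and writes
// must lie within a single block and be aligned to disk_alignment; all
// modifications are rejected in readonly mode, and writes may not jump the
// queue via "first". Invalid ops complete immediately with -EINVAL.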
void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
{
    if (op->opcode < BS_OP_MIN || op->opcode > BS_OP_MAX ||
        ((op->opcode == BS_OP_READ || op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE) && (
            op->offset >= block_size ||
            op->len > block_size-op->offset ||
            (op->len % disk_alignment)
        )) ||
        readonly && op->opcode != BS_OP_READ && op->opcode != BS_OP_LIST ||
        first && (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE))
    {
        // Basic verification not passed
        op->retval = -EINVAL;
        std::function<void (blockstore_op_t*)>(op->callback)(op);
        return;
    }
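    // SYNC_STAB_ALL is implemented as a SYNC with a wrapped callback: when the
    // sync completes successfully and unstable writes remain, the same op is
    // reused as a BS_OP_STABLE covering a snapshot of unstable_writes, and the
    // caller's original callback fires only after that stabilization finishes.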
    if (op->opcode == BS_OP_SYNC_STAB_ALL)
    {
        std::function<void(blockstore_op_t*)> *old_callback = new std::function<void(blockstore_op_t*)>(op->callback);
        op->opcode = BS_OP_SYNC;
        op->callback = [this, old_callback](blockstore_op_t *op)
        {
            if (op->retval >= 0 && unstable_writes.size() > 0)
            {
                op->opcode = BS_OP_STABLE;
                op->len = unstable_writes.size();
                obj_ver_id *vers = new obj_ver_id[op->len];
                op->buf = vers;
                int i = 0;
                for (auto it = unstable_writes.begin(); it != unstable_writes.end(); it++, i++)
                {
                    vers[i] = {
                        .oid = it->first,
                        .version = it->second,
                    };
                }
                unstable_writes.clear();
                op->callback = [this, old_callback](blockstore_op_t *op)
                {
                    obj_ver_id *vers = (obj_ver_id*)op->buf;
                    delete[] vers;
                    op->buf = NULL;
                    (*old_callback)(op);
                    delete old_callback;
                };
                this->enqueue_op(op);
            }
            else
            {
                (*old_callback)(op);
                delete old_callback;
            }
        };
    }
    if ((op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE || op->opcode == BS_OP_DELETE) && !enqueue_write(op))
    {
        std::function<void (blockstore_op_t*)>(op->callback)(op);
        return;
    }
    if (op->opcode == BS_OP_SYNC && immediate_commit == IMMEDIATE_ALL)
    {
        op->retval = 0;
        std::function<void (blockstore_op_t*)>(op->callback)(op);
        return;
    }
    // Call constructor without allocating memory. We'll call destructor before returning op back
    new ((void*)op->private_data) blockstore_op_private_t;
    PRIV(op)->wait_for = 0;
    PRIV(op)->op_state = 0;
    PRIV(op)->pending_ops = 0;
    if (!first)
    {
        submit_queue.push_back(op);
    }
    else
    {
        submit_queue.push_front(op);
    }
    ringloop->wakeup();
}

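// Binary-search a range of a list sorted by object id; if oid is found,
// overwrite that entry's version in place (version 0 marks the entry for
// removal in a later pass) and return true.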
static bool replace_stable(object_id oid, uint64_t version, int search_start, int search_end, obj_ver_id* list)
{
    while (search_start < search_end)
    {
        int pos = search_start+(search_end-search_start)/2;
        if (oid < list[pos].oid)
        {
            search_end = pos;
        }
        else if (list[pos].oid < oid)
        {
            search_start = pos+1;
        }
        else
        {
            list[pos].version = version;
            return true;
        }
    }
    return false;
}

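// Enumerate objects, optionally filtered by PG and inode range. The request
// reuses blockstore_op_t fields: offset = PG number, len = PG count,
// oid.stripe = PG stripe size, oid.inode = min inode, version = max inode.
// The reply packs two sorted runs into one op->buf array: stable versions
// first (their count in op->version), then unstable ones; op->retval is the
// total number of entries.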
void blockstore_impl_t::process_list(blockstore_op_t *op)
{
    uint32_t list_pg = op->offset;
    uint32_t pg_count = op->len;
    uint64_t pg_stripe_size = op->oid.stripe;
    uint64_t min_inode = op->oid.inode;
    uint64_t max_inode = op->version;
    // Check PG
    if (pg_count != 0 && (pg_stripe_size < MIN_BLOCK_SIZE || list_pg >= pg_count))
    {
        op->retval = -EINVAL;
        FINISH_OP(op);
        return;
    }
    // Copy clean_db entries (sorted)
    int stable_count = 0, stable_alloc = clean_db.size() / (pg_count ? pg_count : 1);
    obj_ver_id *stable = (obj_ver_id*)malloc(sizeof(obj_ver_id) * stable_alloc);
    if (!stable)
    {
        op->retval = -ENOMEM;
        FINISH_OP(op);
        return;
    }
    {
        auto clean_it = clean_db.begin(), clean_end = clean_db.end();
        if ((min_inode != 0 || max_inode != 0) && min_inode <= max_inode)
        {
            clean_it = clean_db.lower_bound({
                .inode = min_inode,
                .stripe = 0,
            });
            clean_end = clean_db.upper_bound({
                .inode = max_inode,
                .stripe = UINT64_MAX,
            });
        }
        for (; clean_it != clean_end; clean_it++)
        {
            if (!pg_count || ((clean_it->first.inode + clean_it->first.stripe / pg_stripe_size) % pg_count) == list_pg)
            {
                if (stable_count >= stable_alloc)
                {
                    stable_alloc += 32768;
                    stable = (obj_ver_id*)realloc(stable, sizeof(obj_ver_id) * stable_alloc);
                    if (!stable)
                    {
                        op->retval = -ENOMEM;
                        FINISH_OP(op);
                        return;
                    }
                }
                stable[stable_count++] = {
                    .oid = clean_it->first,
                    .version = clean_it->second.version,
                };
            }
        }
    }
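    // Merge dirty_db on top of the clean list: deletions zero out matching
    // stable entries (compacted away below), stable dirty versions replace
    // clean ones in place or are appended, and unstable versions are collected
    // separately. Both arrays grow via realloc in 32768-entry steps.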
    int clean_stable_count = stable_count;
    // Copy dirty_db entries (sorted, too)
    int unstable_count = 0, unstable_alloc = 0;
    obj_ver_id *unstable = NULL;
    {
        auto dirty_it = dirty_db.begin(), dirty_end = dirty_db.end();
        if ((min_inode != 0 || max_inode != 0) && min_inode <= max_inode)
        {
            dirty_it = dirty_db.lower_bound({
                .oid = {
                    .inode = min_inode,
                    .stripe = 0,
                },
                .version = 0,
            });
            dirty_end = dirty_db.upper_bound({
                .oid = {
                    .inode = max_inode,
                    .stripe = UINT64_MAX,
                },
                .version = UINT64_MAX,
            });
        }
        for (; dirty_it != dirty_end; dirty_it++)
        {
            if (!pg_count || ((dirty_it->first.oid.inode + dirty_it->first.oid.stripe / pg_stripe_size) % pg_count) == list_pg)
            {
                if (IS_DELETE(dirty_it->second.state))
                {
                    // Deletions are always stable, so try to zero out two possible entries
                    if (!replace_stable(dirty_it->first.oid, 0, 0, clean_stable_count, stable))
                    {
                        replace_stable(dirty_it->first.oid, 0, clean_stable_count, stable_count, stable);
                    }
                }
                else if (IS_STABLE(dirty_it->second.state))
                {
                    // First try to replace a clean stable version in the first part of the list
                    if (!replace_stable(dirty_it->first.oid, dirty_it->first.version, 0, clean_stable_count, stable))
                    {
                        // Then try to replace the last dirty stable version in the second part of the list
                        if (stable_count > 0 && stable[stable_count-1].oid == dirty_it->first.oid)
                        {
                            stable[stable_count-1].version = dirty_it->first.version;
                        }
                        else
                        {
                            if (stable_count >= stable_alloc)
                            {
                                stable_alloc += 32768;
                                stable = (obj_ver_id*)realloc(stable, sizeof(obj_ver_id) * stable_alloc);
                                if (!stable)
                                {
                                    if (unstable)
                                        free(unstable);
                                    op->retval = -ENOMEM;
                                    FINISH_OP(op);
                                    return;
                                }
                            }
                            stable[stable_count++] = dirty_it->first;
                        }
                    }
                }
                else
                {
                    if (unstable_count >= unstable_alloc)
                    {
                        unstable_alloc += 32768;
                        unstable = (obj_ver_id*)realloc(unstable, sizeof(obj_ver_id) * unstable_alloc);
                        if (!unstable)
                        {
                            if (stable)
                                free(stable);
                            op->retval = -ENOMEM;
                            FINISH_OP(op);
                            return;
                        }
                    }
                    unstable[unstable_count++] = dirty_it->first;
                }
            }
        }
    }
    // Remove zeroed out stable entries
    int j = 0;
    for (int i = 0; i < stable_count; i++)
    {
        if (stable[i].version != 0)
        {
            stable[j++] = stable[i];
        }
    }
    stable_count = j;
    if (stable_count+unstable_count > stable_alloc)
    {
        stable_alloc = stable_count+unstable_count;
        stable = (obj_ver_id*)realloc(stable, sizeof(obj_ver_id) * stable_alloc);
        if (!stable)
        {
            if (unstable)
                free(unstable);
            op->retval = -ENOMEM;
            FINISH_OP(op);
            return;
        }
    }
    // Copy unstable entries
    for (int i = 0; i < unstable_count; i++)
    {
        stable[j++] = unstable[i];
    }
    free(unstable);
    op->version = stable_count;
    op->retval = stable_count+unstable_count;
    op->buf = stable;
    FINISH_OP(op);
}