Simplified distributed block storage with strong consistency, like in Ceph

// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)

#include "blockstore_impl.h"
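
// The constructor wires the blockstore into the io_uring event loop and opens
// its devices. open_data(), open_meta() and open_journal() can throw, so any
// descriptors opened before the failure are closed here before rethrowing
// (descriptors may be shared when data, metadata and journal live on one device).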
blockstore_impl_t::blockstore_impl_t(blockstore_config_t & config, ring_loop_t *ringloop, timerfd_manager_t *tfd)
{
    assert(sizeof(blockstore_op_private_t) <= BS_OP_PRIVATE_DATA_SIZE);
    this->tfd = tfd;
    this->ringloop = ringloop;
    ring_consumer.loop = [this]() { loop(); };
    ringloop->register_consumer(&ring_consumer);
    initialized = 0;
    data_fd = meta_fd = journal.fd = -1;
    parse_config(config);
    zero_object = (uint8_t*)memalign_or_die(MEM_ALIGNMENT, block_size);
    try
    {
        open_data();
        open_meta();
        open_journal();
        calc_lengths();
        data_alloc = new allocator(block_count);
    }
    catch (std::exception & e)
    {
        if (data_fd >= 0)
            close(data_fd);
        if (meta_fd >= 0 && meta_fd != data_fd)
            close(meta_fd);
        if (journal.fd >= 0 && journal.fd != meta_fd)
            close(journal.fd);
        throw;
    }
    flusher = new journal_flusher_t(this);
}
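
// Teardown mirrors construction: stop consuming ring events, close the
// descriptors (again taking care not to close a shared descriptor twice)
// and free the remaining buffers.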
blockstore_impl_t::~blockstore_impl_t()
{
    delete data_alloc;
    delete flusher;
    free(zero_object);
    ringloop->unregister_consumer(&ring_consumer);
    if (data_fd >= 0)
        close(data_fd);
    if (meta_fd >= 0 && meta_fd != data_fd)
        close(meta_fd);
    if (journal.fd >= 0 && journal.fd != meta_fd)
        close(journal.fd);
    if (metadata_buffer)
        free(metadata_buffer);
    if (clean_bitmap)
        free(clean_bitmap);
}
bool blockstore_impl_t::is_started()
{
    return initialized == 10;
}

bool blockstore_impl_t::is_stalled()
{
    return queue_stall;
}
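
// Initialization is a small state machine driven by loop():
//   0 - not started, 1 - reading metadata, 2 - replaying the journal,
//   3 - flushing the journal (only when journal.flush_journal is set),
//   10 - fully initialized, normal operation.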
// main event loop - produce requests
void blockstore_impl_t::loop()
{
    // FIXME: initialized == 10 is ugly
    if (initialized != 10)
    {
        // read metadata, then journal
        if (initialized == 0)
        {
            metadata_init_reader = new blockstore_init_meta(this);
            initialized = 1;
        }
        if (initialized == 1)
        {
            int res = metadata_init_reader->loop();
            if (!res)
            {
                delete metadata_init_reader;
                metadata_init_reader = NULL;
                journal_init_reader = new blockstore_init_journal(this);
                initialized = 2;
            }
        }
        if (initialized == 2)
        {
            int res = journal_init_reader->loop();
            if (!res)
            {
                delete journal_init_reader;
                journal_init_reader = NULL;
                if (journal.flush_journal)
                    initialized = 3;
                else
                    initialized = 10;
                ringloop->wakeup();
            }
        }
        if (initialized == 3)
        {
            if (readonly)
            {
                printf("Can't flush the journal in readonly mode\n");
                exit(1);
            }
            flusher->loop();
            ringloop->submit();
        }
    }
    else
    {
        // try to submit ops
        unsigned initial_ring_space = ringloop->space_left();
        // has_writes == 0 - no writes before the current queue item
        // has_writes == 1 - some writes in progress
        // has_writes == 2 - tried to submit some writes, but failed
        int has_writes = 0, op_idx = 0, new_idx = 0;
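        // Completed ops are compacted out of submit_queue in place: each op
        // that must stay is copied to new_idx, finished ops (wr_st == 2) are
        // skipped, and the queue is truncated to new_idx afterwards.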
        for (; op_idx < submit_queue.size(); op_idx++, new_idx++)
        {
            auto op = submit_queue[op_idx];
            submit_queue[new_idx] = op;
            // FIXME: This needs some simplification
            // Writes should not block reads if the ring is not full and reads don't depend on them
            // In all other cases we should stop submission
            if (PRIV(op)->wait_for)
            {
                check_wait(op);
                if (PRIV(op)->wait_for == WAIT_SQE)
                {
                    break;
                }
                else if (PRIV(op)->wait_for)
                {
                    if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE || op->opcode == BS_OP_DELETE)
                    {
                        has_writes = 2;
                    }
                    continue;
                }
            }
            unsigned ring_space = ringloop->space_left();
            unsigned prev_sqe_pos = ringloop->save();
            // 0 = can't submit
            // 1 = in progress
            // 2 = can be removed from queue
            int wr_st = 0;
            if (op->opcode == BS_OP_READ)
            {
                wr_st = dequeue_read(op);
            }
            else if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE)
            {
                if (has_writes == 2)
                {
                    // Some writes already could not be submitted
                    continue;
                }
                wr_st = dequeue_write(op);
                has_writes = wr_st > 0 ? 1 : 2;
            }
            else if (op->opcode == BS_OP_DELETE)
            {
                if (has_writes == 2)
                {
                    // Some writes already could not be submitted
                    continue;
                }
                wr_st = dequeue_del(op);
                has_writes = wr_st > 0 ? 1 : 2;
            }
            else if (op->opcode == BS_OP_SYNC)
            {
                // wait for all small writes to be submitted
                // wait for all big writes to complete, submit data device fsync
                // wait for the data device fsync to complete, then submit journal writes for big writes
                // then submit an fsync operation
                if (has_writes)
                {
                    // Can't submit SYNC before previous writes
                    continue;
                }
                wr_st = continue_sync(op, false);
                if (wr_st != 2)
                {
                    has_writes = wr_st > 0 ? 1 : 2;
                }
            }
            else if (op->opcode == BS_OP_STABLE)
            {
                wr_st = dequeue_stable(op);
            }
            else if (op->opcode == BS_OP_ROLLBACK)
            {
                wr_st = dequeue_rollback(op);
            }
            else if (op->opcode == BS_OP_LIST)
            {
                // LIST doesn't need to be blocked by previous modifications
                process_list(op);
                wr_st = 2;
            }
            if (wr_st == 2)
            {
                new_idx--;
            }
            if (wr_st == 0)
            {
                ringloop->restore(prev_sqe_pos);
                if (PRIV(op)->wait_for == WAIT_SQE)
                {
                    PRIV(op)->wait_detail = 1 + ring_space;
                    // ring is full, stop submission
                    break;
                }
            }
        }
        if (op_idx != new_idx)
        {
            while (op_idx < submit_queue.size())
            {
                submit_queue[new_idx++] = submit_queue[op_idx++];
            }
            submit_queue.resize(new_idx);
        }
        if (!readonly)
        {
            flusher->loop();
        }
        int ret = ringloop->submit();
        if (ret < 0)
        {
            throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret));
        }
        if ((initial_ring_space - ringloop->space_left()) > 0)
        {
            live = true;
        }
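        // If this pass consumed no SQEs and the ring loop has no other
        // pending work, the queue is considered stalled (see is_stalled()).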
        queue_stall = !live && !ringloop->has_work();
        live = false;
    }
}
bool blockstore_impl_t::is_safe_to_stop()
{
    // It's safe to stop the blockstore when there are no in-flight operations,
    // no in-progress syncs and the flusher isn't doing anything
    if (submit_queue.size() > 0 || !readonly && flusher->is_active())
    {
        return false;
    }
    if (unsynced_big_writes.size() > 0 || unsynced_small_writes.size() > 0)
    {
        if (!readonly && !stop_sync_submitted)
        {
            // We should sync the blockstore before unmounting
            blockstore_op_t *op = new blockstore_op_t;
            op->opcode = BS_OP_SYNC;
            op->buf = NULL;
            op->callback = [](blockstore_op_t *op)
            {
                delete op;
            };
            enqueue_op(op);
            stop_sync_submitted = true;
        }
        return false;
    }
    return true;
}
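
// Re-checks the condition a queued op is waiting for and clears wait_for
// once the op may be retried:
//   WAIT_SQE            - not enough io_uring submission queue entries
//   WAIT_JOURNAL        - waiting for the journal to be trimmed past an offset
//   WAIT_JOURNAL_BUFFER - the next in-memory journal sector is still in flight
//   WAIT_FREE           - no free data blocks until the flusher frees some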
void blockstore_impl_t::check_wait(blockstore_op_t *op)
{
    if (PRIV(op)->wait_for == WAIT_SQE)
    {
        if (ringloop->space_left() < PRIV(op)->wait_detail)
        {
            // stop submission if there's still no free space
#ifdef BLOCKSTORE_DEBUG
            printf("Still waiting for %lu SQE(s)\n", PRIV(op)->wait_detail);
#endif
            return;
        }
        PRIV(op)->wait_for = 0;
    }
    else if (PRIV(op)->wait_for == WAIT_JOURNAL)
    {
        if (journal.used_start == PRIV(op)->wait_detail)
        {
            // do not submit
#ifdef BLOCKSTORE_DEBUG
            printf("Still waiting to flush journal offset %08lx\n", PRIV(op)->wait_detail);
#endif
            return;
        }
        flusher->release_trim();
        PRIV(op)->wait_for = 0;
    }
    else if (PRIV(op)->wait_for == WAIT_JOURNAL_BUFFER)
    {
        int next = ((journal.cur_sector + 1) % journal.sector_count);
        if (journal.sector_info[next].flush_count > 0 ||
            journal.sector_info[next].dirty)
        {
            // do not submit
#ifdef BLOCKSTORE_DEBUG
            printf("Still waiting for a journal buffer\n");
#endif
            return;
        }
        PRIV(op)->wait_for = 0;
    }
    else if (PRIV(op)->wait_for == WAIT_FREE)
    {
        if (!data_alloc->get_free_count() && flusher->is_active())
        {
#ifdef BLOCKSTORE_DEBUG
            printf("Still waiting for free space on the data device\n");
#endif
            return;
        }
        PRIV(op)->wait_for = 0;
    }
    else
    {
        throw std::runtime_error("BUG: op->wait_for value is unexpected");
    }
}
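
// Accepts an operation for execution. Basic validation: the opcode must be
// known, reads and writes must fit into one block and be aligned to
// disk_alignment, and a readonly blockstore only accepts READ and LIST;
// invalid ops complete immediately with retval == -EINVAL.
//
// A minimal usage sketch (hypothetical caller, not part of this file;
// assumes `bs` is a blockstore_impl_t* and `buf` is a suitably aligned
// buffer of at least op->len bytes):
//
//     blockstore_op_t *op = new blockstore_op_t;
//     op->opcode = BS_OP_READ;
//     op->oid = { .inode = 1, .stripe = 0 };
//     op->version = UINT64_MAX; // assumption: read the latest version
//     op->offset = 0;
//     op->len = 131072;         // assumption: equal to the configured block_size
//     op->buf = buf;
//     op->callback = [](blockstore_op_t *op) { /* op->retval holds the result */ delete op; };
//     bs->enqueue_op(op);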
void blockstore_impl_t::enqueue_op(blockstore_op_t *op)
{
    if (op->opcode < BS_OP_MIN || op->opcode > BS_OP_MAX ||
        ((op->opcode == BS_OP_READ || op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE) && (
            op->offset >= block_size ||
            op->len > block_size-op->offset ||
            (op->len % disk_alignment)
        )) ||
        readonly && op->opcode != BS_OP_READ && op->opcode != BS_OP_LIST)
    {
        // Basic verification not passed
        op->retval = -EINVAL;
        std::function<void (blockstore_op_t*)>(op->callback)(op);
        return;
    }
    if (op->opcode == BS_OP_SYNC_STAB_ALL)
    {
        std::function<void(blockstore_op_t*)> *old_callback = new std::function<void(blockstore_op_t*)>(op->callback);
        op->opcode = BS_OP_SYNC;
        op->callback = [this, old_callback](blockstore_op_t *op)
        {
            if (op->retval >= 0 && unstable_writes.size() > 0)
            {
                op->opcode = BS_OP_STABLE;
                op->len = unstable_writes.size();
                obj_ver_id *vers = new obj_ver_id[op->len];
                op->buf = vers;
                int i = 0;
                for (auto it = unstable_writes.begin(); it != unstable_writes.end(); it++, i++)
                {
                    vers[i] = {
                        .oid = it->first,
                        .version = it->second,
                    };
                }
                unstable_writes.clear();
                op->callback = [this, old_callback](blockstore_op_t *op)
                {
                    obj_ver_id *vers = (obj_ver_id*)op->buf;
                    delete[] vers;
                    op->buf = NULL;
                    (*old_callback)(op);
                    delete old_callback;
                };
                this->enqueue_op(op);
            }
            else
            {
                (*old_callback)(op);
                delete old_callback;
            }
        };
    }
    if ((op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE || op->opcode == BS_OP_DELETE) && !enqueue_write(op))
    {
        std::function<void (blockstore_op_t*)>(op->callback)(op);
        return;
    }
    // Call the constructor without allocating memory. We'll call the destructor before returning the op back
    new ((void*)op->private_data) blockstore_op_private_t;
    PRIV(op)->wait_for = 0;
    PRIV(op)->op_state = 0;
    PRIV(op)->pending_ops = 0;
    submit_queue.push_back(op);
    ringloop->wakeup();
}
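
// Binary search over a sorted obj_ver_id list: if `oid` is present in
// [search_start, search_end), overwrite its version with `version` and
// return true; otherwise return false.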
static bool replace_stable(object_id oid, uint64_t version, int search_start, int search_end, obj_ver_id* list)
{
    while (search_start < search_end)
    {
        int pos = search_start+(search_end-search_start)/2;
        if (oid < list[pos].oid)
        {
            search_end = pos;
        }
        else if (list[pos].oid < oid)
        {
            search_start = pos+1;
        }
        else
        {
            list[pos].version = version;
            return true;
        }
    }
    return false;
}
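
// LIST reuses the generic op fields as parameters: op->offset is the PG
// number to list, op->len is the PG count, op->oid.stripe is the PG stripe
// size, and op->oid.inode / op->version bound the inode range (0/0 = all).
// On success op->buf holds a malloc'ed obj_ver_id array: the first
// op->version entries are stable, the rest unstable, op->retval in total.
//
// A sketch of a hypothetical consumer of the result (free() matches the
// malloc/realloc allocation below):
//
//     obj_ver_id *list = (obj_ver_id*)op->buf;
//     uint64_t stable_count = op->version;
//     for (uint64_t i = 0; i < stable_count; i++)
//         ; // list[i] is a stable object version
//     for (int64_t i = stable_count; i < op->retval; i++)
//         ; // list[i] is an unstable object version
//     free(op->buf);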
void blockstore_impl_t::process_list(blockstore_op_t *op)
{
    uint32_t list_pg = op->offset;
    uint32_t pg_count = op->len;
    uint64_t pg_stripe_size = op->oid.stripe;
    uint64_t min_inode = op->oid.inode;
    uint64_t max_inode = op->version;
    // Check PG
    if (pg_count != 0 && (pg_stripe_size < MIN_BLOCK_SIZE || list_pg >= pg_count))
    {
        op->retval = -EINVAL;
        FINISH_OP(op);
        return;
    }
    // Copy clean_db entries (sorted)
    int stable_count = 0, stable_alloc = clean_db.size() / (pg_count ? pg_count : 1);
    obj_ver_id *stable = (obj_ver_id*)malloc(sizeof(obj_ver_id) * stable_alloc);
    if (!stable)
    {
        op->retval = -ENOMEM;
        FINISH_OP(op);
        return;
    }
    {
        auto clean_it = clean_db.begin(), clean_end = clean_db.end();
        if ((min_inode != 0 || max_inode != 0) && min_inode <= max_inode)
        {
            clean_it = clean_db.lower_bound({
                .inode = min_inode,
                .stripe = 0,
            });
            clean_end = clean_db.upper_bound({
                .inode = max_inode,
                .stripe = UINT64_MAX,
            });
        }
        for (; clean_it != clean_end; clean_it++)
        {
            if (!pg_count || ((clean_it->first.stripe / pg_stripe_size) % pg_count) == list_pg) // like map_to_pg()
            {
                if (stable_count >= stable_alloc)
                {
                    stable_alloc += 32768;
                    stable = (obj_ver_id*)realloc(stable, sizeof(obj_ver_id) * stable_alloc);
                    if (!stable)
                    {
                        op->retval = -ENOMEM;
                        FINISH_OP(op);
                        return;
                    }
                }
                stable[stable_count++] = {
                    .oid = clean_it->first,
                    .version = clean_it->second.version,
                };
            }
        }
    }
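    // clean_stable_count marks the end of the clean run. The dirty pass below
    // may update versions in that run in place (via replace_stable()), append
    // newer stable versions after it, and zero out deleted entries, which are
    // compacted away afterwards.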
    int clean_stable_count = stable_count;
    // Copy dirty_db entries (sorted, too)
    int unstable_count = 0, unstable_alloc = 0;
    obj_ver_id *unstable = NULL;
    {
        auto dirty_it = dirty_db.begin(), dirty_end = dirty_db.end();
        if ((min_inode != 0 || max_inode != 0) && min_inode <= max_inode)
        {
            dirty_it = dirty_db.lower_bound({
                .oid = {
                    .inode = min_inode,
                    .stripe = 0,
                },
                .version = 0,
            });
            dirty_end = dirty_db.upper_bound({
                .oid = {
                    .inode = max_inode,
                    .stripe = UINT64_MAX,
                },
                .version = UINT64_MAX,
            });
        }
        for (; dirty_it != dirty_end; dirty_it++)
        {
            if (!pg_count || ((dirty_it->first.oid.stripe / pg_stripe_size) % pg_count) == list_pg) // like map_to_pg()
            {
                if (IS_DELETE(dirty_it->second.state))
                {
                    // Deletions are always stable, so try to zero out two possible entries
                    if (!replace_stable(dirty_it->first.oid, 0, 0, clean_stable_count, stable))
                    {
                        replace_stable(dirty_it->first.oid, 0, clean_stable_count, stable_count, stable);
                    }
                }
                else if (IS_STABLE(dirty_it->second.state))
                {
                    // First try to replace a clean stable version in the first part of the list
                    if (!replace_stable(dirty_it->first.oid, dirty_it->first.version, 0, clean_stable_count, stable))
                    {
                        // Then try to replace the last dirty stable version in the second part of the list
                        if (stable_count > 0 && stable[stable_count-1].oid == dirty_it->first.oid)
                        {
                            stable[stable_count-1].version = dirty_it->first.version;
                        }
                        else
                        {
                            if (stable_count >= stable_alloc)
                            {
                                stable_alloc += 32768;
                                stable = (obj_ver_id*)realloc(stable, sizeof(obj_ver_id) * stable_alloc);
                                if (!stable)
                                {
                                    if (unstable)
                                        free(unstable);
                                    op->retval = -ENOMEM;
                                    FINISH_OP(op);
                                    return;
                                }
                            }
                            stable[stable_count++] = dirty_it->first;
                        }
                    }
                }
                else
                {
                    if (unstable_count >= unstable_alloc)
                    {
                        unstable_alloc += 32768;
                        unstable = (obj_ver_id*)realloc(unstable, sizeof(obj_ver_id) * unstable_alloc);
                        if (!unstable)
                        {
                            if (stable)
                                free(stable);
                            op->retval = -ENOMEM;
                            FINISH_OP(op);
                            return;
                        }
                    }
                    unstable[unstable_count++] = dirty_it->first;
                }
            }
        }
    }
    // Remove zeroed out stable entries
    int j = 0;
    for (int i = 0; i < stable_count; i++)
    {
        if (stable[i].version != 0)
        {
            stable[j++] = stable[i];
        }
    }
    stable_count = j;
    if (stable_count+unstable_count > stable_alloc)
    {
        stable_alloc = stable_count+unstable_count;
        stable = (obj_ver_id*)realloc(stable, sizeof(obj_ver_id) * stable_alloc);
        if (!stable)
        {
            if (unstable)
                free(unstable);
            op->retval = -ENOMEM;
            FINISH_OP(op);
            return;
        }
    }
    // Copy unstable entries
    for (int i = 0; i < unstable_count; i++)
    {
        stable[j++] = unstable[i];
    }
    free(unstable);
    op->version = stable_count;
    op->retval = stable_count+unstable_count;
    op->buf = stable;
    FINISH_OP(op);
}