// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.0 (see README.md for details)

#include "blockstore_impl.h"

journal_flusher_t::journal_flusher_t(int flusher_count, blockstore_impl_t *bs)
{
    this->bs = bs;
    this->flusher_count = flusher_count;
    dequeuing = false;
    trimming = false;
    active_flushers = 0;
    syncing_flushers = 0;
    // FIXME: allow to configure flusher_start_threshold and journal_trim_interval
    flusher_start_threshold = bs->journal_block_size / sizeof(journal_entry_stable);
    journal_trim_interval = 512;
    journal_trim_counter = 0;
    trim_wanted = 0;
    journal_superblock = bs->journal.inmemory ? bs->journal.buffer : memalign_or_die(MEM_ALIGNMENT, bs->journal_block_size);
    co = new journal_flusher_co[flusher_count];
    for (int i = 0; i < flusher_count; i++)
    {
        co[i].bs = bs;
        co[i].flusher = this;
    }
}

journal_flusher_co::journal_flusher_co()
{
    wait_state = 0;
    simple_callback_r = [this](ring_data_t* data)
    {
        bs->live = true;
        if (data->res != data->iov.iov_len)
        {
            throw std::runtime_error(
                "data read operation failed during flush ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
                "). can't continue, sorry :-("
            );
        }
        wait_count--;
    };
    simple_callback_w = [this](ring_data_t* data)
    {
        bs->live = true;
        if (data->res != data->iov.iov_len)
        {
            throw std::runtime_error(
                "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
                "). state "+std::to_string(wait_state)+". in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
            );
        }
        wait_count--;
    };
}

journal_flusher_t::~journal_flusher_t()
{
    if (!bs->journal.inmemory)
        free(journal_superblock);
    delete[] co;
}

bool journal_flusher_t::is_active()
{
    return active_flushers > 0 || dequeuing;
}

void journal_flusher_t::loop()
{
    for (int i = 0; (active_flushers > 0 || dequeuing) && i < flusher_count; i++)
    {
        co[i].loop();
    }
}
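
// flush_queue holds the order in which objects are picked up by flusher coroutines;
// flush_versions only remembers the highest version requested per object, so repeated
// enqueues of the same object collapse into a single queue entry.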
void journal_flusher_t::enqueue_flush(obj_ver_id ov)
{
#ifdef BLOCKSTORE_DEBUG
    printf("enqueue_flush %lx:%lx v%lu\n", ov.oid.inode, ov.oid.stripe, ov.version);
#endif
    auto it = flush_versions.find(ov.oid);
    if (it != flush_versions.end())
    {
        if (it->second < ov.version)
            it->second = ov.version;
    }
    else
    {
        flush_versions[ov.oid] = ov.version;
        flush_queue.push_back(ov.oid);
    }
    if (!dequeuing && (flush_queue.size() >= flusher_start_threshold || trim_wanted > 0))
    {
        dequeuing = true;
        bs->ringloop->wakeup();
    }
}

void journal_flusher_t::unshift_flush(obj_ver_id ov, bool force)
{
#ifdef BLOCKSTORE_DEBUG
    printf("unshift_flush %lx:%lx v%lu\n", ov.oid.inode, ov.oid.stripe, ov.version);
#endif
    auto it = flush_versions.find(ov.oid);
    if (it != flush_versions.end())
    {
        if (it->second < ov.version)
            it->second = ov.version;
    }
    else
    {
        flush_versions[ov.oid] = ov.version;
        if (!force)
            flush_queue.push_front(ov.oid);
    }
    if (force)
        flush_queue.push_front(ov.oid);
    if (force || !dequeuing && (flush_queue.size() >= flusher_start_threshold || trim_wanted > 0))
    {
        dequeuing = true;
        bs->ringloop->wakeup();
    }
}

void journal_flusher_t::remove_flush(object_id oid)
{
#ifdef BLOCKSTORE_DEBUG
    printf("undo_flush %lx:%lx\n", oid.inode, oid.stripe);
#endif
    auto v_it = flush_versions.find(oid);
    if (v_it != flush_versions.end())
    {
        flush_versions.erase(v_it);
        for (auto q_it = flush_queue.begin(); q_it != flush_queue.end(); q_it++)
        {
            if (*q_it == oid)
            {
                flush_queue.erase(q_it);
                break;
            }
        }
    }
}

void journal_flusher_t::request_trim()
{
    dequeuing = true;
    trim_wanted++;
    bs->ringloop->wakeup();
}

void journal_flusher_t::mark_trim_possible()
{
    if (trim_wanted > 0)
    {
        dequeuing = true;
        journal_trim_counter++;
        bs->ringloop->wakeup();
    }
}

void journal_flusher_t::release_trim()
{
    trim_wanted--;
}
#define await_sqe(label) \
    resume_##label:\
    sqe = bs->get_sqe();\
    if (!sqe)\
    {\
        wait_state = label;\
        return false;\
    }\
    data = ((ring_data_t*)sqe->user_data);

// FIXME: Implement batch flushing
bool journal_flusher_co::loop()
{
    // This is much better than implementing the whole function as an FSM
    // Maybe I should consider a coroutine library like https://github.com/hnes/libaco ...
    if (wait_state == 1)
        goto resume_1;
    else if (wait_state == 2)
        goto resume_2;
    else if (wait_state == 3)
        goto resume_3;
    else if (wait_state == 4)
        goto resume_4;
    else if (wait_state == 5)
        goto resume_5;
    else if (wait_state == 6)
        goto resume_6;
    else if (wait_state == 7)
        goto resume_7;
    else if (wait_state == 8)
        goto resume_8;
    else if (wait_state == 9)
        goto resume_9;
    else if (wait_state == 10)
        goto resume_10;
    else if (wait_state == 12)
        goto resume_12;
    else if (wait_state == 13)
        goto resume_13;
    else if (wait_state == 14)
        goto resume_14;
    else if (wait_state == 15)
        goto resume_15;
    else if (wait_state == 16)
        goto resume_16;
    else if (wait_state == 17)
        goto resume_17;
    else if (wait_state == 18)
        goto resume_18;
    else if (wait_state == 19)
        goto resume_19;
    else if (wait_state == 20)
        goto resume_20;
    else if (wait_state == 21)
        goto resume_21;
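// resume_0 is the top of the per-object cycle: take the next object from the queue,
// flush it completely, then return here until the queue is drained.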
resume_0:
    if (!flusher->flush_queue.size() || !flusher->dequeuing)
    {
        if (flusher->trim_wanted > 0 && flusher->journal_trim_counter > 0)
        {
            // Attempt forced trim
            flusher->active_flushers++;
            goto trim_journal;
        }
        flusher->dequeuing = false;
        wait_state = 0;
        return true;
    }
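    // Take the next object and the newest version that was requested to be flushed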
    cur.oid = flusher->flush_queue.front();
    cur.version = flusher->flush_versions[cur.oid];
    flusher->flush_queue.pop_front();
    flusher->flush_versions.erase(cur.oid);
    dirty_end = bs->dirty_db.find(cur);
    if (dirty_end != bs->dirty_db.end())
    {
        repeat_it = flusher->sync_to_repeat.find(cur.oid);
        if (repeat_it != flusher->sync_to_repeat.end())
        {
#ifdef BLOCKSTORE_DEBUG
            printf("Postpone %lx:%lx v%lu\n", cur.oid.inode, cur.oid.stripe, cur.version);
#endif
            // We don't flush different parts of history of the same object in parallel
            // So we check if someone is already flushing this object
            // In that case we set sync_to_repeat and pick another object
            // Another coroutine will see it and re-queue the object after it finishes
            if (repeat_it->second < cur.version)
                repeat_it->second = cur.version;
            wait_state = 0;
            goto resume_0;
        }
        else
            flusher->sync_to_repeat[cur.oid] = 0;
        if (dirty_end->second.journal_sector >= bs->journal.dirty_start &&
            (bs->journal.dirty_start >= bs->journal.used_start ||
            dirty_end->second.journal_sector < bs->journal.used_start))
        {
            flusher->enqueue_flush(cur);
            // We can't flush journal sectors that are still written to
            // However, as we group flushes by oid, current oid may have older writes to flush!
            // And it may even block writes if we don't flush the older version
            // (if it's in the beginning of the journal)...
            // So first try to find an older version of the same object to flush.
            bool found = false;
            while (dirty_end != bs->dirty_db.begin())
            {
                dirty_end--;
                if (dirty_end->first.oid != cur.oid)
                {
                    break;
                }
                if (!(dirty_end->second.journal_sector >= bs->journal.dirty_start &&
                    (bs->journal.dirty_start >= bs->journal.used_start ||
                    dirty_end->second.journal_sector < bs->journal.used_start)))
                {
                    found = true;
                    cur.version = dirty_end->first.version;
                    break;
                }
            }
            if (!found)
            {
                // Try other objects
                flusher->sync_to_repeat.erase(cur.oid);
                int search_left = flusher->flush_queue.size() - 1;
#ifdef BLOCKSTORE_DEBUG
                printf("Flusher overran writers (dirty_start=%08lx) - searching for older flushes (%d left)\n", bs->journal.dirty_start, search_left);
#endif
                while (search_left > 0)
                {
                    cur.oid = flusher->flush_queue.front();
                    cur.version = flusher->flush_versions[cur.oid];
                    flusher->flush_queue.pop_front();
                    flusher->flush_versions.erase(cur.oid);
                    dirty_end = bs->dirty_db.find(cur);
                    if (dirty_end != bs->dirty_db.end())
                    {
                        if (dirty_end->second.journal_sector >= bs->journal.dirty_start &&
                            (bs->journal.dirty_start >= bs->journal.used_start ||
                            dirty_end->second.journal_sector < bs->journal.used_start))
                        {
#ifdef BLOCKSTORE_DEBUG
                            printf("Write %lx:%lx v%lu is too new: offset=%08lx\n", cur.oid.inode, cur.oid.stripe, cur.version, dirty_end->second.journal_sector);
#endif
                            flusher->enqueue_flush(cur);
                        }
                        else
                        {
                            repeat_it = flusher->sync_to_repeat.find(cur.oid);
                            if (repeat_it == flusher->sync_to_repeat.end())
                            {
                                flusher->sync_to_repeat[cur.oid] = 0;
                                break;
                            }
                        }
                    }
                    search_left--;
                }
                if (search_left <= 0)
                {
#ifdef BLOCKSTORE_DEBUG
                    printf("No older flushes, stopping\n");
#endif
                    flusher->dequeuing = false;
                    wait_state = 0;
                    return true;
                }
            }
        }
#ifdef BLOCKSTORE_DEBUG
        printf("Flushing %lx:%lx v%lu\n", cur.oid.inode, cur.oid.stripe, cur.version);
#endif
        flusher->active_flushers++;
    resume_1:
        // Find it in clean_db
        clean_it = bs->clean_db.find(cur.oid);
        old_clean_loc = (clean_it != bs->clean_db.end() ? clean_it->second.location : UINT64_MAX);
        // Scan dirty versions of the object
        if (!scan_dirty(1))
        {
            wait_state += 1;
            return false;
        }
        // Writes and deletes shouldn't happen at the same time
        assert(!has_writes || !has_delete);
        if (!has_writes && !has_delete || has_delete && old_clean_loc == UINT64_MAX)
        {
            // Nothing to flush
            bs->erase_dirty(dirty_start, std::next(dirty_end), clean_loc);
            goto release_oid;
        }
        if (clean_loc == UINT64_MAX)
        {
            if (old_clean_loc == UINT64_MAX)
            {
                // Object not allocated. This is a bug.
                char err[1024];
                snprintf(
                    err, 1024, "BUG: Object %lx:%lx v%lu that we are trying to flush is not allocated on the data device",
                    cur.oid.inode, cur.oid.stripe, cur.version
                );
                throw std::runtime_error(err);
            }
            else
            {
                clean_loc = old_clean_loc;
            }
        }
        // Also we need to submit metadata read(s). We do read-modify-write cycle(s) for every operation.
    resume_2:
        if (!modify_meta_read(clean_loc, meta_new, 2))
        {
            wait_state += 2;
            return false;
        }
        if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
        {
        resume_14:
            if (!modify_meta_read(old_clean_loc, meta_old, 14))
            {
                wait_state += 14;
                return false;
            }
        }
        else
            meta_old.submitted = false;
    resume_3:
        if (wait_count > 0)
        {
            wait_state = 3;
            return false;
        }
        if (meta_new.submitted)
        {
            meta_new.it->second.state = 1;
            bs->ringloop->wakeup();
        }
        if (meta_old.submitted)
        {
            meta_old.it->second.state = 1;
            bs->ringloop->wakeup();
        }
        // Reads completed, submit writes and set bitmap bits
        if (bs->clean_entry_bitmap_size)
        {
            new_clean_bitmap = (bs->inmemory_meta
                ? meta_new.buf + meta_new.pos*bs->clean_entry_size + sizeof(clean_disk_entry)
                : bs->clean_bitmap + (clean_loc >> bs->block_order)*(bs->clean_entry_bitmap_size + bs->entry_attr_size));
            if (clean_init_bitmap)
            {
                memset(new_clean_bitmap, 0, bs->clean_entry_bitmap_size);
                bitmap_set(new_clean_bitmap, clean_bitmap_offset, clean_bitmap_len, bs->bitmap_granularity);
            }
        }
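        // Submit data writes for all regions collected by scan_dirty() into the object's data block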
        for (it = v.begin(); it != v.end(); it++)
        {
            if (new_clean_bitmap)
            {
                bitmap_set(new_clean_bitmap, it->offset, it->len, bs->bitmap_granularity);
            }
            await_sqe(4);
            data->iov = (struct iovec){ it->buf, (size_t)it->len };
            data->callback = simple_callback_w;
            my_uring_prep_writev(
                sqe, bs->data_fd, &data->iov, 1, bs->data_offset + clean_loc + it->offset
            );
            wait_count++;
        }
        // Sync data before writing metadata
    resume_16:
    resume_17:
    resume_18:
        if (copy_count && !fsync_batch(false, 16))
        {
            wait_state += 16;
            return false;
        }
    resume_5:
        // And metadata writes, but only after data writes complete
        if (!bs->inmemory_meta && meta_new.it->second.state == 0 || wait_count > 0)
        {
            // metadata sector is still being read or data is still being written, wait for it
            wait_state = 5;
            return false;
        }
        if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
        {
            if (!bs->inmemory_meta && meta_old.it->second.state == 0)
            {
                wait_state = 5;
                return false;
            }
            // zero out old metadata entry
            memset(meta_old.buf + meta_old.pos*bs->clean_entry_size, 0, bs->clean_entry_size);
            await_sqe(15);
            data->iov = (struct iovec){ meta_old.buf, bs->meta_block_size };
            data->callback = simple_callback_w;
            my_uring_prep_writev(
                sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_old.sector
            );
            wait_count++;
        }
        if (has_delete)
        {
            // zero out new metadata entry
            memset(meta_new.buf + meta_new.pos*bs->clean_entry_size, 0, bs->clean_entry_size);
        }
        else
        {
            clean_disk_entry *new_entry = (clean_disk_entry*)(meta_new.buf + meta_new.pos*bs->clean_entry_size);
            if (new_entry->oid.inode != 0 && new_entry->oid != cur.oid)
            {
                printf("Fatal error (metadata corruption or bug): tried to overwrite non-zero metadata entry %lu (%lx:%lx) with %lx:%lx\n",
                    clean_loc >> bs->block_order, new_entry->oid.inode, new_entry->oid.stripe, cur.oid.inode, cur.oid.stripe);
                exit(1);
            }
            new_entry->oid = cur.oid;
            new_entry->version = cur.version;
            if (!bs->inmemory_meta)
            {
                memcpy(&new_entry->bitmap, new_clean_bitmap, bs->clean_entry_bitmap_size);
            }
            if (bs->entry_attr_size)
            {
                // copy latest external bitmap/attributes
                void *bmp_ptr = bs->entry_attr_size > sizeof(void*) ? dirty_end->second.bitmap : &dirty_end->second.bitmap;
                memcpy((void*)(new_entry+1) + bs->clean_entry_bitmap_size, bmp_ptr, bs->entry_attr_size);
            }
        }
        await_sqe(6);
        data->iov = (struct iovec){ meta_new.buf, bs->meta_block_size };
        data->callback = simple_callback_w;
        my_uring_prep_writev(
            sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_new.sector
        );
        wait_count++;
    resume_7:
        if (wait_count > 0)
        {
            wait_state = 7;
            return false;
        }
        // Done, free all buffers
        if (!bs->inmemory_meta)
        {
            meta_new.it->second.usage_count--;
            if (meta_new.it->second.usage_count == 0)
            {
                free(meta_new.it->second.buf);
                flusher->meta_sectors.erase(meta_new.it);
            }
            if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
            {
                meta_old.it->second.usage_count--;
                if (meta_old.it->second.usage_count == 0)
                {
                    free(meta_old.it->second.buf);
                    flusher->meta_sectors.erase(meta_old.it);
                }
            }
        }
        for (it = v.begin(); it != v.end(); it++)
        {
            free(it->buf);
        }
        v.clear();
        // And sync metadata (in batches - not per each operation!)
    resume_8:
    resume_9:
    resume_10:
        if (!fsync_batch(true, 8))
        {
            wait_state += 8;
            return false;
        }
        // Update clean_db and dirty_db, free old data locations
        update_clean_db();
#ifdef BLOCKSTORE_DEBUG
        printf("Flushed %lx:%lx v%lu (%d copies, wr:%d, del:%d), %ld left\n", cur.oid.inode, cur.oid.stripe, cur.version,
            copy_count, has_writes, has_delete, flusher->flush_queue.size());
#endif
    release_oid:
        repeat_it = flusher->sync_to_repeat.find(cur.oid);
        if (repeat_it != flusher->sync_to_repeat.end() && repeat_it->second > cur.version)
        {
            // Requeue version
            flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second }, false);
        }
        flusher->sync_to_repeat.erase(repeat_it);
    trim_journal:
        // Clear unused part of the journal every <journal_trim_interval> flushes
        if (!((++flusher->journal_trim_counter) % flusher->journal_trim_interval) || flusher->trim_wanted > 0)
        {
            flusher->journal_trim_counter = 0;
            new_trim_pos = bs->journal.get_trim_pos();
            if (new_trim_pos != bs->journal.used_start)
            {
            resume_19:
                // Wait for other coroutines trimming the journal, if any
                if (flusher->trimming)
                {
                    wait_state = 19;
                    return false;
                }
                flusher->trimming = true;
                // First update journal "superblock" and only then update <used_start> in memory
                await_sqe(12);
                *((journal_entry_start*)flusher->journal_superblock) = {
                    .crc32 = 0,
                    .magic = JOURNAL_MAGIC,
                    .type = JE_START,
                    .size = sizeof(journal_entry_start),
                    .reserved = 0,
                    .journal_start = new_trim_pos,
                };
                ((journal_entry_start*)flusher->journal_superblock)->crc32 = je_crc32((journal_entry*)flusher->journal_superblock);
                data->iov = (struct iovec){ flusher->journal_superblock, bs->journal_block_size };
                data->callback = simple_callback_w;
                my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
                wait_count++;
            resume_13:
                if (wait_count > 0)
                {
                    wait_state = 13;
                    return false;
                }
                if (!bs->disable_journal_fsync)
                {
                    await_sqe(20);
                    my_uring_prep_fsync(sqe, bs->journal.fd, IORING_FSYNC_DATASYNC);
                    data->iov = { 0 };
                    data->callback = simple_callback_w;
                    // Count the submitted fsync so resume_21 actually waits for it
                    wait_count++;
                resume_21:
                    if (wait_count > 0)
                    {
                        wait_state = 21;
                        return false;
                    }
                }
                bs->journal.used_start = new_trim_pos;
#ifdef BLOCKSTORE_DEBUG
                printf("Journal trimmed to %08lx (next_free=%08lx)\n", bs->journal.used_start, bs->journal.next_free);
#endif
                flusher->trimming = false;
            }
        }
        // All done
        flusher->active_flushers--;
        wait_state = 0;
        goto resume_0;
    }
    return true;
}
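
// Walk dirty versions of the current object backwards from dirty_end and collect into v
// the regions that have to be copied from the journal into the object's data block.
// Once a big write or delete is found, older entries don't need copying anymore (skip_copy).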
bool journal_flusher_co::scan_dirty(int wait_base)
{
    if (wait_state == wait_base)
    {
        goto resume_0;
    }
    dirty_it = dirty_start = dirty_end;
    v.clear();
    wait_count = 0;
    copy_count = 0;
    clean_loc = UINT64_MAX;
    has_delete = false;
    has_writes = false;
    skip_copy = false;
    clean_init_bitmap = false;
    while (1)
    {
        if (!IS_STABLE(dirty_it->second.state))
        {
            char err[1024];
            snprintf(
                err, 1024, "BUG: Unexpected dirty_entry %lx:%lx v%lu unstable state during flush: %d",
                dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version, dirty_it->second.state
            );
            throw std::runtime_error(err);
        }
        else if (IS_JOURNAL(dirty_it->second.state) && !skip_copy)
        {
            // First we submit all reads
            has_writes = true;
            if (dirty_it->second.len != 0)
            {
                offset = dirty_it->second.offset;
                end_offset = dirty_it->second.offset + dirty_it->second.len;
                it = v.begin();
                while (1)
                {
                    for (; it != v.end(); it++)
                        if (it->offset >= offset)
                            break;
                    if (it == v.end() || it->offset > offset && it->len > 0)
                    {
                        submit_offset = dirty_it->second.location + offset - dirty_it->second.offset;
                        submit_len = it == v.end() || it->offset >= end_offset ? end_offset-offset : it->offset-offset;
                        it = v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign_or_die(MEM_ALIGNMENT, submit_len) });
                        copy_count++;
                        if (bs->journal.inmemory)
                        {
                            // Take it from memory
                            memcpy(it->buf, bs->journal.buffer + submit_offset, submit_len);
                        }
                        else
                        {
                            // Read it from disk
                            await_sqe(0);
                            data->iov = (struct iovec){ it->buf, (size_t)submit_len };
                            data->callback = simple_callback_r;
                            my_uring_prep_readv(
                                sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + submit_offset
                            );
                            wait_count++;
                        }
                    }
                    offset = it->offset+it->len;
                    if (it == v.end() || offset >= end_offset)
                        break;
                }
            }
        }
        else if (IS_BIG_WRITE(dirty_it->second.state) && !skip_copy)
        {
            // There is an unflushed big write. Copy small writes in its position
            has_writes = true;
            clean_loc = dirty_it->second.location;
            clean_init_bitmap = true;
            clean_bitmap_offset = dirty_it->second.offset;
            clean_bitmap_len = dirty_it->second.len;
            skip_copy = true;
        }
        else if (IS_DELETE(dirty_it->second.state) && !skip_copy)
        {
            // There is an unflushed delete
            has_delete = true;
            skip_copy = true;
        }
        dirty_start = dirty_it;
        if (dirty_it == bs->dirty_db.begin())
        {
            break;
        }
        dirty_it--;
        if (dirty_it->first.oid != cur.oid)
        {
            break;
        }
    }
    return true;
}
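
// Locate the metadata block and entry slot for a given data location and, when metadata
// isn't kept in memory, read that block into the shared meta_sectors cache (reference-counted
// so concurrent flushes of entries in the same block reuse one buffer).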
bool journal_flusher_co::modify_meta_read(uint64_t meta_loc, flusher_meta_write_t &wr, int wait_base)
{
    if (wait_state == wait_base)
    {
        goto resume_0;
    }
    // We must check if the same sector is already in memory if we don't keep all metadata in memory all the time.
    // And yet another option is to use LSM trees for metadata, but it complicates everything a lot,
    // so I'll avoid it as long as I can.
    wr.submitted = false;
    wr.sector = ((meta_loc >> bs->block_order) / (bs->meta_block_size / bs->clean_entry_size)) * bs->meta_block_size;
    wr.pos = ((meta_loc >> bs->block_order) % (bs->meta_block_size / bs->clean_entry_size));
    if (bs->inmemory_meta)
    {
        wr.buf = bs->metadata_buffer + wr.sector;
        return true;
    }
    wr.it = flusher->meta_sectors.find(wr.sector);
    if (wr.it == flusher->meta_sectors.end())
    {
        // Not in memory yet, read it
        wr.buf = memalign_or_die(MEM_ALIGNMENT, bs->meta_block_size);
        wr.it = flusher->meta_sectors.emplace(wr.sector, (meta_sector_t){
            .offset = wr.sector,
            .len = bs->meta_block_size,
            .state = 0, // 0 = not read yet
            .buf = wr.buf,
            .usage_count = 1,
        }).first;
        await_sqe(0);
        data->iov = (struct iovec){ wr.it->second.buf, bs->meta_block_size };
        data->callback = simple_callback_r;
        wr.submitted = true;
        my_uring_prep_readv(
            sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + wr.sector
        );
        wait_count++;
    }
    else
    {
        wr.buf = wr.it->second.buf;
        wr.it->second.usage_count++;
    }
    return true;
}
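
// Move the flushed object version into clean_db, free the old data block if the object
// was relocated or deleted, and erase the flushed dirty entries.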
void journal_flusher_co::update_clean_db()
{
    if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
    {
#ifdef BLOCKSTORE_DEBUG
        printf("Free block %lu (new location is %lu)\n", old_clean_loc >> bs->block_order, clean_loc >> bs->block_order);
#endif
        bs->data_alloc->set(old_clean_loc >> bs->block_order, false);
    }
    if (has_delete)
    {
        auto clean_it = bs->clean_db.find(cur.oid);
        bs->clean_db.erase(clean_it);
        bs->data_alloc->set(clean_loc >> bs->block_order, false);
        clean_loc = UINT64_MAX;
    }
    else
    {
        bs->clean_db[cur.oid] = {
            .version = cur.version,
            .location = clean_loc,
        };
    }
    bs->erase_dirty(dirty_start, std::next(dirty_end), clean_loc);
}
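
// Group fsyncs from all flusher coroutines into one batch: each coroutine joins an open
// flusher_sync_t entry, and the one that makes the batch "full" (all flushers syncing or
// the flush queue empty) submits the actual fsync and wakes the others when it completes.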
bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base)
{
    if (wait_state == wait_base)
        goto resume_0;
    else if (wait_state == wait_base+1)
        goto resume_1;
    else if (wait_state == wait_base+2)
        goto resume_2;
    if (!(fsync_meta ? bs->disable_meta_fsync : bs->disable_journal_fsync))
    {
        cur_sync = flusher->syncs.end();
        while (cur_sync != flusher->syncs.begin())
        {
            cur_sync--;
            if (cur_sync->fsync_meta == fsync_meta && cur_sync->state == 0)
            {
                goto sync_found;
            }
        }
        cur_sync = flusher->syncs.emplace(flusher->syncs.end(), (flusher_sync_t){
            .fsync_meta = fsync_meta,
            .ready_count = 0,
            .state = 0,
        });
    sync_found:
        cur_sync->ready_count++;
        flusher->syncing_flushers++;
        if (flusher->syncing_flushers >= flusher->flusher_count || !flusher->flush_queue.size())
        {
            // Sync batch is ready. Do it.
            await_sqe(0);
            data->iov = { 0 };
            data->callback = simple_callback_w;
            my_uring_prep_fsync(sqe, fsync_meta ? bs->meta_fd : bs->data_fd, IORING_FSYNC_DATASYNC);
            cur_sync->state = 1;
            wait_count++;
        resume_1:
            if (wait_count > 0)
            {
                wait_state = 1;
                return false;
            }
            // Sync completed. All previous coroutines waiting for it must be resumed
            cur_sync->state = 2;
            bs->ringloop->wakeup();
        }
        // Wait until someone else sends and completes a sync.
    resume_2:
        if (!cur_sync->state)
        {
            wait_state = 2;
            return false;
        }
        flusher->syncing_flushers--;
        cur_sync->ready_count--;
        if (cur_sync->ready_count == 0)
        {
            flusher->syncs.erase(cur_sync);
        }
    }
    return true;
}