Simplified distributed block storage with strong consistency, like in Ceph

// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)

#include "blockstore_impl.h"

journal_flusher_t::journal_flusher_t(blockstore_impl_t *bs)
{
    this->bs = bs;
    this->max_flusher_count = bs->max_flusher_count;
    this->min_flusher_count = bs->min_flusher_count;
    this->cur_flusher_count = bs->min_flusher_count;
    this->target_flusher_count = bs->min_flusher_count;
    dequeuing = false;
    trimming = false;
    active_flushers = 0;
    syncing_flushers = 0;
    // FIXME: allow to configure flusher_start_threshold and journal_trim_interval
    flusher_start_threshold = bs->journal_block_size / sizeof(journal_entry_stable);
    journal_trim_interval = 512;
    journal_trim_counter = bs->journal.flush_journal ? 1 : 0;
    trim_wanted = bs->journal.flush_journal ? 1 : 0;
    journal_superblock = bs->journal.inmemory ? bs->journal.buffer : memalign_or_die(MEM_ALIGNMENT, bs->journal_block_size);
    co = new journal_flusher_co[max_flusher_count];
    for (int i = 0; i < max_flusher_count; i++)
    {
        co[i].bs = bs;
        co[i].flusher = this;
    }
}

journal_flusher_co::journal_flusher_co()
{
    wait_state = 0;
    simple_callback_r = [this](ring_data_t* data)
    {
        bs->live = true;
        if (data->res != data->iov.iov_len)
        {
            throw std::runtime_error(
                "data read operation failed during flush ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
                "). can't continue, sorry :-("
            );
        }
        wait_count--;
    };
    simple_callback_w = [this](ring_data_t* data)
    {
        bs->live = true;
        if (data->res != data->iov.iov_len)
        {
            throw std::runtime_error(
                "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
                "). state "+std::to_string(wait_state)+". in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
            );
        }
        wait_count--;
    };
}

journal_flusher_t::~journal_flusher_t()
{
    if (!bs->journal.inmemory)
        free(journal_superblock);
    delete[] co;
}

bool journal_flusher_t::is_active()
{
    return active_flushers > 0 || dequeuing;
}
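
// Scale the number of running flusher coroutines with the current write load:
// target roughly 2x the write iodepth, clamped to [min_flusher_count, max_flusher_count],
// then let every active coroutine make progress.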
void journal_flusher_t::loop()
{
    target_flusher_count = bs->write_iodepth*2;
    if (target_flusher_count < min_flusher_count)
        target_flusher_count = min_flusher_count;
    else if (target_flusher_count > max_flusher_count)
        target_flusher_count = max_flusher_count;
    if (target_flusher_count > cur_flusher_count)
        cur_flusher_count = target_flusher_count;
    else if (target_flusher_count < cur_flusher_count)
    {
        while (target_flusher_count < cur_flusher_count)
        {
            if (co[cur_flusher_count-1].wait_state)
                break;
            cur_flusher_count--;
        }
    }
    for (int i = 0; (active_flushers > 0 || dequeuing) && i < cur_flusher_count; i++)
        co[i].loop();
}
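
// Queue an object version for flushing. Requests are deduplicated per object:
// only the highest requested version is kept, so an object appears in flush_queue
// at most once. Dequeuing only starts once the queue grows to flusher_start_threshold
// entries or a journal trim has been requested.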
void journal_flusher_t::enqueue_flush(obj_ver_id ov)
{
#ifdef BLOCKSTORE_DEBUG
    printf("enqueue_flush %lx:%lx v%lu\n", ov.oid.inode, ov.oid.stripe, ov.version);
#endif
    auto it = flush_versions.find(ov.oid);
    if (it != flush_versions.end())
    {
        if (it->second < ov.version)
            it->second = ov.version;
    }
    else
    {
        flush_versions[ov.oid] = ov.version;
        flush_queue.push_back(ov.oid);
    }
    if (!dequeuing && (flush_queue.size() >= flusher_start_threshold || trim_wanted > 0))
    {
        dequeuing = true;
        bs->ringloop->wakeup();
    }
}
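
// Re-queue an object version at the head of the flush queue. With force=true the
// object is pushed to the front even if some version of it is already queued, and
// dequeuing is restarted unconditionally.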
void journal_flusher_t::unshift_flush(obj_ver_id ov, bool force)
{
#ifdef BLOCKSTORE_DEBUG
    printf("unshift_flush %lx:%lx v%lu\n", ov.oid.inode, ov.oid.stripe, ov.version);
#endif
    auto it = flush_versions.find(ov.oid);
    if (it != flush_versions.end())
    {
        if (it->second < ov.version)
            it->second = ov.version;
    }
    else
    {
        flush_versions[ov.oid] = ov.version;
        if (!force)
            flush_queue.push_front(ov.oid);
    }
    if (force)
        flush_queue.push_front(ov.oid);
    if (force || !dequeuing && (flush_queue.size() >= flusher_start_threshold || trim_wanted > 0))
    {
        dequeuing = true;
        bs->ringloop->wakeup();
    }
}

void journal_flusher_t::remove_flush(object_id oid)
{
#ifdef BLOCKSTORE_DEBUG
    printf("undo_flush %lx:%lx\n", oid.inode, oid.stripe);
#endif
    auto v_it = flush_versions.find(oid);
    if (v_it != flush_versions.end())
    {
        flush_versions.erase(v_it);
        for (auto q_it = flush_queue.begin(); q_it != flush_queue.end(); q_it++)
        {
            if (*q_it == oid)
            {
                flush_queue.erase(q_it);
                break;
            }
        }
    }
}
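
// Journal trim control: request_trim() wakes the flushers and forces a trim attempt
// even when the flush queue is short, mark_trim_possible() nudges them when journal
// space may have become reclaimable, and release_trim() drops the request.
// trim_wanted is a counter, so several simultaneous requests are allowed.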
void journal_flusher_t::request_trim()
{
    dequeuing = true;
    trim_wanted++;
    bs->ringloop->wakeup();
}

void journal_flusher_t::mark_trim_possible()
{
    if (trim_wanted > 0)
    {
        dequeuing = true;
        journal_trim_counter++;
        bs->ringloop->wakeup();
    }
}

void journal_flusher_t::release_trim()
{
    trim_wanted--;
}

#define await_sqe(label) \
    resume_##label:\
    sqe = bs->get_sqe();\
    if (!sqe)\
    {\
        wait_state = label;\
        return false;\
    }\
    data = ((ring_data_t*)sqe->user_data);
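
// await_sqe(label) is one suspension point of these hand-rolled coroutines: it defines
// a resume_<label> target, tries to get a free io_uring SQE and, if the ring is full,
// stores the label in wait_state and returns false; on the next call the dispatch at
// the top of the function jumps straight back to resume_<label> and retries.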

// FIXME: Implement batch flushing
bool journal_flusher_co::loop()
{
    // This is much better than implementing the whole function as an FSM
    // Maybe I should consider a coroutine library like https://github.com/hnes/libaco ...
    if (wait_state == 1)
        goto resume_1;
    else if (wait_state == 2)
        goto resume_2;
    else if (wait_state == 3)
        goto resume_3;
    else if (wait_state == 4)
        goto resume_4;
    else if (wait_state == 5)
        goto resume_5;
    else if (wait_state == 6)
        goto resume_6;
    else if (wait_state == 7)
        goto resume_7;
    else if (wait_state == 8)
        goto resume_8;
    else if (wait_state == 9)
        goto resume_9;
    else if (wait_state == 10)
        goto resume_10;
    else if (wait_state == 12)
        goto resume_12;
    else if (wait_state == 13)
        goto resume_13;
    else if (wait_state == 14)
        goto resume_14;
    else if (wait_state == 15)
        goto resume_15;
    else if (wait_state == 16)
        goto resume_16;
    else if (wait_state == 17)
        goto resume_17;
    else if (wait_state == 18)
        goto resume_18;
    else if (wait_state == 19)
        goto resume_19;
    else if (wait_state == 20)
        goto resume_20;
    else if (wait_state == 21)
        goto resume_21;
resume_0:
    if (flusher->flush_queue.size() < flusher->min_flusher_count && !flusher->trim_wanted ||
        !flusher->flush_queue.size() || !flusher->dequeuing)
    {
    stop_flusher:
        if (flusher->trim_wanted > 0 && flusher->journal_trim_counter > 0)
        {
            // Attempt forced trim
            flusher->active_flushers++;
            goto trim_journal;
        }
        flusher->dequeuing = false;
        wait_state = 0;
        return true;
    }
    cur.oid = flusher->flush_queue.front();
    cur.version = flusher->flush_versions[cur.oid];
    flusher->flush_queue.pop_front();
    flusher->flush_versions.erase(cur.oid);
    dirty_end = bs->dirty_db.find(cur);
    if (dirty_end != bs->dirty_db.end())
    {
        repeat_it = flusher->sync_to_repeat.find(cur.oid);
        if (repeat_it != flusher->sync_to_repeat.end())
        {
#ifdef BLOCKSTORE_DEBUG
            printf("Postpone %lx:%lx v%lu\n", cur.oid.inode, cur.oid.stripe, cur.version);
#endif
            // We don't flush different parts of history of the same object in parallel
            // So we check if someone is already flushing this object
            // In that case we set sync_to_repeat and pick another object
            // Another coroutine will see it and re-queue the object after it finishes
            if (repeat_it->second < cur.version)
                repeat_it->second = cur.version;
            wait_state = 0;
            goto resume_0;
        }
        else
            flusher->sync_to_repeat[cur.oid] = 0;
        if (dirty_end->second.journal_sector >= bs->journal.dirty_start &&
            (bs->journal.dirty_start >= bs->journal.used_start ||
            dirty_end->second.journal_sector < bs->journal.used_start))
        {
            flusher->enqueue_flush(cur);
            // We can't flush journal sectors that are still written to
            // However, as we group flushes by oid, current oid may have older writes to flush!
            // And it may even block writes if we don't flush the older version
            // (if it's in the beginning of the journal)...
            // So first try to find an older version of the same object to flush.
            bool found = false;
            while (dirty_end != bs->dirty_db.begin())
            {
                dirty_end--;
                if (dirty_end->first.oid != cur.oid)
                {
                    break;
                }
                if (!(dirty_end->second.journal_sector >= bs->journal.dirty_start &&
                    (bs->journal.dirty_start >= bs->journal.used_start ||
                    dirty_end->second.journal_sector < bs->journal.used_start)))
                {
                    found = true;
                    cur.version = dirty_end->first.version;
                    break;
                }
            }
            if (!found)
            {
                // Try other objects
                flusher->sync_to_repeat.erase(cur.oid);
                int search_left = flusher->flush_queue.size() - 1;
#ifdef BLOCKSTORE_DEBUG
                printf("Flusher overran writers (dirty_start=%08lx) - searching for older flushes (%d left)\n", bs->journal.dirty_start, search_left);
#endif
                while (search_left > 0)
                {
                    cur.oid = flusher->flush_queue.front();
                    cur.version = flusher->flush_versions[cur.oid];
                    flusher->flush_queue.pop_front();
                    flusher->flush_versions.erase(cur.oid);
                    dirty_end = bs->dirty_db.find(cur);
                    if (dirty_end != bs->dirty_db.end())
                    {
                        if (dirty_end->second.journal_sector >= bs->journal.dirty_start &&
                            (bs->journal.dirty_start >= bs->journal.used_start ||
                            dirty_end->second.journal_sector < bs->journal.used_start))
                        {
#ifdef BLOCKSTORE_DEBUG
                            printf("Write %lx:%lx v%lu is too new: offset=%08lx\n", cur.oid.inode, cur.oid.stripe, cur.version, dirty_end->second.journal_sector);
#endif
                            flusher->enqueue_flush(cur);
                        }
                        else
                        {
                            repeat_it = flusher->sync_to_repeat.find(cur.oid);
                            if (repeat_it == flusher->sync_to_repeat.end())
                            {
                                flusher->sync_to_repeat[cur.oid] = 0;
                                break;
                            }
                        }
                    }
                    search_left--;
                }
                if (search_left <= 0)
                {
#ifdef BLOCKSTORE_DEBUG
                    printf("No older flushes, stopping\n");
#endif
                    goto stop_flusher;
                }
            }
        }
#ifdef BLOCKSTORE_DEBUG
        printf("Flushing %lx:%lx v%lu\n", cur.oid.inode, cur.oid.stripe, cur.version);
#endif
        flusher->active_flushers++;
    resume_1:
        // Find it in clean_db
        clean_it = bs->clean_db.find(cur.oid);
        old_clean_loc = (clean_it != bs->clean_db.end() ? clean_it->second.location : UINT64_MAX);
        // Scan dirty versions of the object
        if (!scan_dirty(1))
        {
            wait_state += 1;
            return false;
        }
        // Writes and deletes shouldn't happen at the same time
        assert(!has_writes || !has_delete);
        if (!has_writes && !has_delete || has_delete && old_clean_loc == UINT64_MAX)
        {
            // Nothing to flush
            bs->erase_dirty(dirty_start, std::next(dirty_end), clean_loc);
            goto release_oid;
        }
        if (clean_loc == UINT64_MAX)
        {
            if (old_clean_loc == UINT64_MAX)
            {
                // Object not allocated. This is a bug.
                char err[1024];
                snprintf(
                    err, 1024, "BUG: Object %lx:%lx v%lu that we are trying to flush is not allocated on the data device",
                    cur.oid.inode, cur.oid.stripe, cur.version
                );
                throw std::runtime_error(err);
            }
            else
            {
                clean_loc = old_clean_loc;
            }
        }
        // Also we need to submit metadata read(s). We do read-modify-write cycle(s) for every operation.
    resume_2:
        if (!modify_meta_read(clean_loc, meta_new, 2))
        {
            wait_state += 2;
            return false;
        }
        if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
        {
        resume_14:
            if (!modify_meta_read(old_clean_loc, meta_old, 14))
            {
                wait_state += 14;
                return false;
            }
        }
        else
            meta_old.submitted = false;
    resume_3:
        if (wait_count > 0)
        {
            wait_state = 3;
            return false;
        }
        if (meta_new.submitted)
        {
            meta_new.it->second.state = 1;
            bs->ringloop->wakeup();
        }
        if (meta_old.submitted)
        {
            meta_old.it->second.state = 1;
            bs->ringloop->wakeup();
        }
        // Reads completed, submit writes and set bitmap bits
        if (bs->clean_entry_bitmap_size)
        {
            new_clean_bitmap = (bs->inmemory_meta
                ? meta_new.buf + meta_new.pos*bs->clean_entry_size + sizeof(clean_disk_entry)
                : bs->clean_bitmap + (clean_loc >> bs->block_order)*(2*bs->clean_entry_bitmap_size));
            if (clean_init_bitmap)
            {
                memset(new_clean_bitmap, 0, bs->clean_entry_bitmap_size);
                bitmap_set(new_clean_bitmap, clean_bitmap_offset, clean_bitmap_len, bs->bitmap_granularity);
            }
        }
        for (it = v.begin(); it != v.end(); it++)
        {
            if (new_clean_bitmap)
            {
                bitmap_set(new_clean_bitmap, it->offset, it->len, bs->bitmap_granularity);
            }
            await_sqe(4);
            data->iov = (struct iovec){ it->buf, (size_t)it->len };
            data->callback = simple_callback_w;
            my_uring_prep_writev(
                sqe, bs->data_fd, &data->iov, 1, bs->data_offset + clean_loc + it->offset
            );
            wait_count++;
        }
        // Sync data before writing metadata
    resume_16:
    resume_17:
    resume_18:
        if (copy_count && !fsync_batch(false, 16))
        {
            wait_state += 16;
            return false;
        }
    resume_5:
        // And metadata writes, but only after data writes complete
        if (!bs->inmemory_meta && meta_new.it->second.state == 0 || wait_count > 0)
        {
            // metadata sector is still being read or data is still being written, wait for it
            wait_state = 5;
            return false;
        }
        if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
        {
            if (!bs->inmemory_meta && meta_old.it->second.state == 0)
            {
                wait_state = 5;
                return false;
            }
            // zero out old metadata entry
            memset(meta_old.buf + meta_old.pos*bs->clean_entry_size, 0, bs->clean_entry_size);
            await_sqe(15);
            data->iov = (struct iovec){ meta_old.buf, bs->meta_block_size };
            data->callback = simple_callback_w;
            my_uring_prep_writev(
                sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_old.sector
            );
            wait_count++;
        }
        if (has_delete)
        {
            clean_disk_entry *new_entry = (clean_disk_entry*)(meta_new.buf + meta_new.pos*bs->clean_entry_size);
            if (new_entry->oid.inode != 0 && new_entry->oid != cur.oid)
            {
                printf("Fatal error (metadata corruption or bug): tried to delete metadata entry %lu (%lx:%lx) while deleting %lx:%lx\n",
                    clean_loc >> bs->block_order, new_entry->oid.inode, new_entry->oid.stripe, cur.oid.inode, cur.oid.stripe);
                exit(1);
            }
            // zero out new metadata entry
            memset(meta_new.buf + meta_new.pos*bs->clean_entry_size, 0, bs->clean_entry_size);
        }
        else
        {
            clean_disk_entry *new_entry = (clean_disk_entry*)(meta_new.buf + meta_new.pos*bs->clean_entry_size);
            if (new_entry->oid.inode != 0 && new_entry->oid != cur.oid)
            {
                printf("Fatal error (metadata corruption or bug): tried to overwrite non-zero metadata entry %lu (%lx:%lx) with %lx:%lx\n",
                    clean_loc >> bs->block_order, new_entry->oid.inode, new_entry->oid.stripe, cur.oid.inode, cur.oid.stripe);
                exit(1);
            }
            new_entry->oid = cur.oid;
            new_entry->version = cur.version;
            if (!bs->inmemory_meta)
            {
                memcpy(&new_entry->bitmap, new_clean_bitmap, bs->clean_entry_bitmap_size);
            }
            // copy latest external bitmap/attributes
            if (bs->clean_entry_bitmap_size)
            {
                void *bmp_ptr = bs->clean_entry_bitmap_size > sizeof(void*) ? dirty_end->second.bitmap : &dirty_end->second.bitmap;
                memcpy((void*)(new_entry+1) + bs->clean_entry_bitmap_size, bmp_ptr, bs->clean_entry_bitmap_size);
            }
        }
        await_sqe(6);
        data->iov = (struct iovec){ meta_new.buf, bs->meta_block_size };
        data->callback = simple_callback_w;
        my_uring_prep_writev(
            sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_new.sector
        );
        wait_count++;
    resume_7:
        if (wait_count > 0)
        {
            wait_state = 7;
            return false;
        }
        // Done, free all buffers
        if (!bs->inmemory_meta)
        {
            meta_new.it->second.usage_count--;
            if (meta_new.it->second.usage_count == 0)
            {
                free(meta_new.it->second.buf);
                flusher->meta_sectors.erase(meta_new.it);
            }
            if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
            {
                meta_old.it->second.usage_count--;
                if (meta_old.it->second.usage_count == 0)
                {
                    free(meta_old.it->second.buf);
                    flusher->meta_sectors.erase(meta_old.it);
                }
            }
        }
        for (it = v.begin(); it != v.end(); it++)
        {
            free(it->buf);
        }
        v.clear();
        // And sync metadata (in batches - not per each operation!)
    resume_8:
    resume_9:
    resume_10:
        if (!fsync_batch(true, 8))
        {
            wait_state += 8;
            return false;
        }
        // Update clean_db and dirty_db, free old data locations
        update_clean_db();
#ifdef BLOCKSTORE_DEBUG
        printf("Flushed %lx:%lx v%lu (%d copies, wr:%d, del:%d), %ld left\n", cur.oid.inode, cur.oid.stripe, cur.version,
            copy_count, has_writes, has_delete, flusher->flush_queue.size());
#endif
    release_oid:
        repeat_it = flusher->sync_to_repeat.find(cur.oid);
        if (repeat_it != flusher->sync_to_repeat.end() && repeat_it->second > cur.version)
        {
            // Requeue version
            flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second }, false);
        }
        flusher->sync_to_repeat.erase(repeat_it);
    trim_journal:
        // Clear unused part of the journal every <journal_trim_interval> flushes
        if (!((++flusher->journal_trim_counter) % flusher->journal_trim_interval) || flusher->trim_wanted > 0)
        {
            flusher->journal_trim_counter = 0;
            new_trim_pos = bs->journal.get_trim_pos();
            if (new_trim_pos != bs->journal.used_start)
            {
            resume_19:
                // Wait for other coroutines trimming the journal, if any
                if (flusher->trimming)
                {
                    wait_state = 19;
                    return false;
                }
                flusher->trimming = true;
                // First update journal "superblock" and only then update <used_start> in memory
                await_sqe(12);
                *((journal_entry_start*)flusher->journal_superblock) = {
                    .crc32 = 0,
                    .magic = JOURNAL_MAGIC,
                    .type = JE_START,
                    .size = sizeof(journal_entry_start),
                    .reserved = 0,
                    .journal_start = new_trim_pos,
                    .version = JOURNAL_VERSION,
                };
                ((journal_entry_start*)flusher->journal_superblock)->crc32 = je_crc32((journal_entry*)flusher->journal_superblock);
                data->iov = (struct iovec){ flusher->journal_superblock, bs->journal_block_size };
                data->callback = simple_callback_w;
                my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
                wait_count++;
            resume_13:
                if (wait_count > 0)
                {
                    wait_state = 13;
                    return false;
                }
                if (!bs->disable_journal_fsync)
                {
                    await_sqe(20);
                    my_uring_prep_fsync(sqe, bs->journal.fd, IORING_FSYNC_DATASYNC);
                    data->iov = { 0 };
                    data->callback = simple_callback_w;
                    // Wait for the fsync to complete (its callback decrements wait_count)
                    wait_count++;
                resume_21:
                    if (wait_count > 0)
                    {
                        wait_state = 21;
                        return false;
                    }
                }
                bs->journal.used_start = new_trim_pos;
#ifdef BLOCKSTORE_DEBUG
                printf("Journal trimmed to %08lx (next_free=%08lx)\n", bs->journal.used_start, bs->journal.next_free);
#endif
                flusher->trimming = false;
            }
            if (bs->journal.flush_journal && !flusher->flush_queue.size())
            {
                assert(bs->journal.used_start == bs->journal.next_free);
                printf("Journal flushed\n");
                exit(0);
            }
        }
        // All done
        flusher->active_flushers--;
        wait_state = 0;
        goto resume_0;
    }
    return true;
}
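
// Walk the dirty versions of the current object from dirty_end backwards (newest to
// oldest) and collect everything needed for the flush: regions of small journal writes
// to copy into the data block (read from the journal device unless the journal is kept
// in memory), an unflushed big write whose location becomes clean_loc, or a pending
// delete. Versions older than a big write or delete are skipped.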
bool journal_flusher_co::scan_dirty(int wait_base)
{
    if (wait_state == wait_base)
    {
        goto resume_0;
    }
    dirty_it = dirty_start = dirty_end;
    v.clear();
    wait_count = 0;
    copy_count = 0;
    clean_loc = UINT64_MAX;
    has_delete = false;
    has_writes = false;
    skip_copy = false;
    clean_init_bitmap = false;
    while (1)
    {
        if (!IS_STABLE(dirty_it->second.state))
        {
            char err[1024];
            snprintf(
                err, 1024, "BUG: Unexpected dirty_entry %lx:%lx v%lu unstable state during flush: 0x%x",
                dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version, dirty_it->second.state
            );
            throw std::runtime_error(err);
        }
        else if (IS_JOURNAL(dirty_it->second.state) && !skip_copy)
        {
            // First we submit all reads
            has_writes = true;
            if (dirty_it->second.len != 0)
            {
                offset = dirty_it->second.offset;
                end_offset = dirty_it->second.offset + dirty_it->second.len;
                it = v.begin();
                while (1)
                {
                    for (; it != v.end(); it++)
                        if (it->offset >= offset)
                            break;
                    if (it == v.end() || it->offset > offset && it->len > 0)
                    {
                        submit_offset = dirty_it->second.location + offset - dirty_it->second.offset;
                        submit_len = it == v.end() || it->offset >= end_offset ? end_offset-offset : it->offset-offset;
                        it = v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign_or_die(MEM_ALIGNMENT, submit_len) });
                        copy_count++;
                        if (bs->journal.inmemory)
                        {
                            // Take it from memory
                            memcpy(it->buf, bs->journal.buffer + submit_offset, submit_len);
                        }
                        else
                        {
                            // Read it from disk
                            await_sqe(0);
                            data->iov = (struct iovec){ it->buf, (size_t)submit_len };
                            data->callback = simple_callback_r;
                            my_uring_prep_readv(
                                sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + submit_offset
                            );
                            wait_count++;
                        }
                    }
                    offset = it->offset+it->len;
                    if (it == v.end() || offset >= end_offset)
                        break;
                }
            }
        }
        else if (IS_BIG_WRITE(dirty_it->second.state) && !skip_copy)
        {
            // There is an unflushed big write. Copy small writes in its position
            has_writes = true;
            clean_loc = dirty_it->second.location;
            clean_init_bitmap = true;
            clean_bitmap_offset = dirty_it->second.offset;
            clean_bitmap_len = dirty_it->second.len;
            skip_copy = true;
        }
        else if (IS_DELETE(dirty_it->second.state) && !skip_copy)
        {
            // There is an unflushed delete
            has_delete = true;
            skip_copy = true;
        }
        dirty_start = dirty_it;
        if (dirty_it == bs->dirty_db.begin())
        {
            break;
        }
        dirty_it--;
        if (dirty_it->first.oid != cur.oid)
        {
            break;
        }
    }
    return true;
}
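
// Prepare the read half of a metadata read-modify-write for the block at meta_loc:
// compute the metadata sector and the entry position inside it, then either point
// wr.buf at the in-memory metadata copy or read the sector from the metadata device,
// caching it in meta_sectors with a usage counter so concurrent flushers can share it.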
bool journal_flusher_co::modify_meta_read(uint64_t meta_loc, flusher_meta_write_t &wr, int wait_base)
{
    if (wait_state == wait_base)
    {
        goto resume_0;
    }
    // We must check if the same sector is already in memory if we don't keep all metadata in memory all the time.
    // And yet another option is to use LSM trees for metadata, but it complicates everything a lot,
    // so I'll avoid it as long as I can.
    wr.submitted = false;
    wr.sector = ((meta_loc >> bs->block_order) / (bs->meta_block_size / bs->clean_entry_size)) * bs->meta_block_size;
    wr.pos = ((meta_loc >> bs->block_order) % (bs->meta_block_size / bs->clean_entry_size));
    if (bs->inmemory_meta)
    {
        wr.buf = bs->metadata_buffer + wr.sector;
        return true;
    }
    wr.it = flusher->meta_sectors.find(wr.sector);
    if (wr.it == flusher->meta_sectors.end())
    {
        // Not in memory yet, read it
        wr.buf = memalign_or_die(MEM_ALIGNMENT, bs->meta_block_size);
        wr.it = flusher->meta_sectors.emplace(wr.sector, (meta_sector_t){
            .offset = wr.sector,
            .len = bs->meta_block_size,
            .state = 0, // 0 = not read yet
            .buf = wr.buf,
            .usage_count = 1,
        }).first;
        await_sqe(0);
        data->iov = (struct iovec){ wr.it->second.buf, bs->meta_block_size };
        data->callback = simple_callback_r;
        wr.submitted = true;
        my_uring_prep_readv(
            sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + wr.sector
        );
        wait_count++;
    }
    else
    {
        wr.buf = wr.it->second.buf;
        wr.it->second.usage_count++;
    }
    return true;
}
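
// Apply the completed flush to the in-memory state: free the old data block if the
// object moved, update or erase the clean_db entry, and drop the flushed versions
// from dirty_db.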
void journal_flusher_co::update_clean_db()
{
    if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
    {
#ifdef BLOCKSTORE_DEBUG
        printf("Free block %lu from %lx:%lx v%lu (new location is %lu)\n",
            old_clean_loc >> bs->block_order,
            cur.oid.inode, cur.oid.stripe, cur.version,
            clean_loc >> bs->block_order);
#endif
        bs->data_alloc->set(old_clean_loc >> bs->block_order, false);
    }
    if (has_delete)
    {
        auto clean_it = bs->clean_db.find(cur.oid);
        bs->clean_db.erase(clean_it);
#ifdef BLOCKSTORE_DEBUG
        printf("Free block %lu from %lx:%lx v%lu (delete)\n",
            clean_loc >> bs->block_order,
            cur.oid.inode, cur.oid.stripe, cur.version);
#endif
        bs->data_alloc->set(clean_loc >> bs->block_order, false);
        clean_loc = UINT64_MAX;
    }
    else
    {
        bs->clean_db[cur.oid] = {
            .version = cur.version,
            .location = clean_loc,
        };
    }
    bs->erase_dirty(dirty_start, std::next(dirty_end), clean_loc);
}
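
// Batched fsync of the data or metadata device: each coroutine joins (or creates) a
// pending flusher_sync_t batch; the fsync is submitted only when all running flushers
// are waiting for it or the flush queue is empty, and the other coroutines suspend
// until the batch is marked completed.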
bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base)
{
    if (wait_state == wait_base)
        goto resume_0;
    else if (wait_state == wait_base+1)
        goto resume_1;
    else if (wait_state == wait_base+2)
        goto resume_2;
    if (!(fsync_meta ? bs->disable_meta_fsync : bs->disable_data_fsync))
    {
        cur_sync = flusher->syncs.end();
        while (cur_sync != flusher->syncs.begin())
        {
            cur_sync--;
            if (cur_sync->fsync_meta == fsync_meta && cur_sync->state == 0)
            {
                goto sync_found;
            }
        }
        cur_sync = flusher->syncs.emplace(flusher->syncs.end(), (flusher_sync_t){
            .fsync_meta = fsync_meta,
            .ready_count = 0,
            .state = 0,
        });
    sync_found:
        cur_sync->ready_count++;
        flusher->syncing_flushers++;
    resume_1:
        if (!cur_sync->state)
        {
            if (flusher->syncing_flushers >= flusher->cur_flusher_count || !flusher->flush_queue.size())
            {
                // Sync batch is ready. Do it.
                await_sqe(0);
                data->iov = { 0 };
                data->callback = simple_callback_w;
                my_uring_prep_fsync(sqe, fsync_meta ? bs->meta_fd : bs->data_fd, IORING_FSYNC_DATASYNC);
                cur_sync->state = 1;
                wait_count++;
            resume_2:
                if (wait_count > 0)
                {
                    wait_state = 2;
                    return false;
                }
                // Sync completed. All previous coroutines waiting for it must be resumed
                cur_sync->state = 2;
                bs->ringloop->wakeup();
            }
            else
            {
                // Wait until someone else sends and completes a sync.
                wait_state = 1;
                return false;
            }
        }
        flusher->syncing_flushers--;
        cur_sync->ready_count--;
        if (cur_sync->ready_count == 0)
        {
            flusher->syncs.erase(cur_sync);
        }
    }
    return true;
}