Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

698 lines
26 KiB

  1. #include "blockstore_impl.h"
  2. blockstore_init_meta::blockstore_init_meta(blockstore_impl_t *bs)
  3. {
  4. this->bs = bs;
  5. }
  6. void blockstore_init_meta::handle_event(ring_data_t *data)
  7. {
  8. if (data->res <= 0)
  9. {
  10. throw std::runtime_error(
  11. std::string("read metadata failed at offset ") + std::to_string(metadata_read) +
  12. std::string(": ") + strerror(-data->res)
  13. );
  14. }
  15. prev_done = data->res > 0 ? submitted : 0;
  16. done_len = data->res;
  17. done_pos = metadata_read;
  18. metadata_read += data->res;
  19. submitted = 0;
  20. }
// Coroutine-style loader for the on-disk metadata area: reads it in large
// sequential chunks and feeds every metadata block to handle_entries().
// Returns 1 while a read is in flight (the caller re-enters later through
// wait_state / resume_1), 0 once the whole metadata area is processed.
int blockstore_init_meta::loop()
{
if (wait_state == 1)
goto resume_1;
printf("Reading blockstore metadata\n");
if (bs->inmemory_meta)
// Metadata stays resident in memory — read straight into the final buffer
metadata_buffer = bs->metadata_buffer;
else
// Temporary double buffer: one half is parsed while the other is being read
metadata_buffer = memalign(MEM_ALIGNMENT, 2*bs->metadata_buf_size);
if (!metadata_buffer)
throw std::runtime_error("Failed to allocate metadata read buffer");
while (1)
{
resume_1:
if (submitted)
{
// A read is in flight — suspend until handle_event() clears <submitted>
wait_state = 1;
return 1;
}
if (metadata_read < bs->meta_len)
{
// Submit the next sequential read (up to metadata_buf_size bytes)
sqe = bs->get_sqe();
if (!sqe)
{
throw std::runtime_error("io_uring is full while trying to read metadata");
}
data = ((ring_data_t*)sqe->user_data);
data->iov = {
metadata_buffer + (bs->inmemory_meta
? metadata_read
: (prev == 1 ? bs->metadata_buf_size : 0)),
bs->meta_len - metadata_read > bs->metadata_buf_size ? bs->metadata_buf_size : bs->meta_len - metadata_read,
};
data->callback = [this](ring_data_t *data) { handle_event(data); };
my_uring_prep_readv(sqe, bs->meta_fd_index, &data->iov, 1, bs->meta_offset + metadata_read);
sqe->flags |= IOSQE_FIXED_FILE;
bs->ringloop->submit();
// 1 or 2 marks which half of the double buffer this read targets
submitted = (prev == 1 ? 2 : 1);
prev = submitted;
}
if (prev_done)
{
// Parse the chunk completed by the previous read (set up by handle_event)
void *done_buf = bs->inmemory_meta
? (metadata_buffer + done_pos)
: (metadata_buffer + (prev_done == 2 ? bs->metadata_buf_size : 0));
unsigned count = bs->meta_block_size / bs->clean_entry_size;
for (int sector = 0; sector < done_len; sector += bs->meta_block_size)
{
// handle <count> entries
handle_entries(done_buf + sector, count, bs->block_order);
done_cnt += count;
}
prev_done = 0;
done_len = 0;
}
if (!submitted)
{
break;
}
}
// metadata read finished
printf("Metadata entries loaded: %lu, free blocks: %lu / %lu\n", entries_loaded, bs->data_alloc->get_free_count(), bs->block_count);
if (!bs->inmemory_meta)
{
// The temporary double buffer is only needed during initialization
free(metadata_buffer);
metadata_buffer = NULL;
}
return 0;
}
  90. void blockstore_init_meta::handle_entries(void* entries, unsigned count, int block_order)
  91. {
  92. for (unsigned i = 0; i < count; i++)
  93. {
  94. clean_disk_entry *entry = (clean_disk_entry*)(entries + i*bs->clean_entry_size);
  95. if (!bs->inmemory_meta && bs->clean_entry_bitmap_size)
  96. {
  97. memcpy(bs->clean_bitmap + (done_cnt+i)*bs->clean_entry_bitmap_size, &entry->bitmap, bs->clean_entry_bitmap_size);
  98. }
  99. if (entry->oid.inode > 0)
  100. {
  101. auto clean_it = bs->clean_db.find(entry->oid);
  102. if (clean_it == bs->clean_db.end() || clean_it->second.version < entry->version)
  103. {
  104. if (clean_it != bs->clean_db.end())
  105. {
  106. // free the previous block
  107. #ifdef BLOCKSTORE_DEBUG
  108. printf("Free block %lu\n", clean_it->second.location >> bs->block_order);
  109. #endif
  110. bs->data_alloc->set(clean_it->second.location >> block_order, false);
  111. }
  112. entries_loaded++;
  113. #ifdef BLOCKSTORE_DEBUG
  114. printf("Allocate block (clean entry) %lu: %lu:%lu v%lu\n", done_cnt+i, entry->oid.inode, entry->oid.stripe, entry->version);
  115. #endif
  116. bs->data_alloc->set(done_cnt+i, true);
  117. bs->clean_db[entry->oid] = (struct clean_entry){
  118. .version = entry->version,
  119. .location = (done_cnt+i) << block_order,
  120. };
  121. }
  122. else
  123. {
  124. #ifdef BLOCKSTORE_DEBUG
  125. printf("Old clean entry %lu: %lu:%lu v%lu\n", done_cnt+i, entry->oid.inode, entry->oid.stripe, entry->version);
  126. #endif
  127. }
  128. }
  129. }
  130. }
  131. blockstore_init_journal::blockstore_init_journal(blockstore_impl_t *bs)
  132. {
  133. this->bs = bs;
  134. next_free = bs->journal.block_size;
  135. simple_callback = [this](ring_data_t *data1)
  136. {
  137. if (data1->res != data1->iov.iov_len)
  138. {
  139. throw std::runtime_error(std::string("I/O operation failed while reading journal: ") + strerror(-data1->res));
  140. }
  141. wait_count--;
  142. };
  143. }
  144. bool iszero(uint64_t *buf, int len)
  145. {
  146. for (int i = 0; i < len; i++)
  147. if (buf[i] != 0)
  148. return false;
  149. return true;
  150. }
  151. void blockstore_init_journal::handle_event(ring_data_t *data1)
  152. {
  153. if (data1->res <= 0)
  154. {
  155. throw std::runtime_error(
  156. std::string("read journal failed at offset ") + std::to_string(journal_pos) +
  157. std::string(": ") + strerror(-data1->res)
  158. );
  159. }
  160. done.push_back({
  161. .buf = submitted_buf,
  162. .pos = journal_pos,
  163. .len = (uint64_t)data1->res,
  164. });
  165. journal_pos += data1->res;
  166. if (journal_pos >= bs->journal.len)
  167. {
  168. // Continue from the beginning
  169. journal_pos = bs->journal.block_size;
  170. wrapped = true;
  171. }
  172. submitted_buf = NULL;
  173. }
// Grab a free io_uring SQE and its associated ring_data_t into the caller's
// local `sqe` / `data` variables; throws if the submission queue is full.
#define GET_SQE() \
sqe = bs->get_sqe();\
if (!sqe)\
throw std::runtime_error("io_uring is full while trying to read journal");\
data = ((ring_data_t*)sqe->user_data)
// Coroutine-style journal loader/replayer. Reads the journal superblock,
// then either initializes an empty journal (writing a fresh JE_START) or
// scans all journal entries via handle_journal_part(). Returns 1 while
// waiting for I/O (the caller re-enters via wait_state -> resume_N),
// 0 when replay is complete.
int blockstore_init_journal::loop()
{
if (wait_state == 1)
goto resume_1;
else if (wait_state == 2)
goto resume_2;
else if (wait_state == 3)
goto resume_3;
else if (wait_state == 4)
goto resume_4;
else if (wait_state == 5)
goto resume_5;
else if (wait_state == 6)
goto resume_6;
else if (wait_state == 7)
goto resume_7;
printf("Reading blockstore journal\n");
if (!bs->journal.inmemory)
{
// Temporary buffer for the superblock + first journal block
submitted_buf = memalign(MEM_ALIGNMENT, 2*bs->journal.block_size);
if (!submitted_buf)
throw std::bad_alloc();
}
else
submitted_buf = bs->journal.buffer;
// Read first block of the journal
sqe = bs->get_sqe();
if (!sqe)
throw std::runtime_error("io_uring is full while trying to read journal");
data = ((ring_data_t*)sqe->user_data);
data->iov = { submitted_buf, bs->journal.block_size };
data->callback = simple_callback;
my_uring_prep_readv(sqe, bs->journal_fd_index, &data->iov, 1, bs->journal.offset);
sqe->flags |= IOSQE_FIXED_FILE;
bs->ringloop->submit();
wait_count = 1;
resume_1:
if (wait_count > 0)
{
// Waiting for the superblock read to finish
wait_state = 1;
return 1;
}
// First 3 qwords zero => the journal was never written to => treat as empty
if (iszero((uint64_t*)submitted_buf, 3))
{
// Journal is empty
// FIXME handle this wrapping to journal_block_size better (maybe)
bs->journal.used_start = bs->journal.block_size;
bs->journal.next_free = bs->journal.block_size;
// Initialize journal "superblock" and the first block
memset(submitted_buf, 0, 2*bs->journal.block_size);
*((journal_entry_start*)submitted_buf) = {
.crc32 = 0,
.magic = JOURNAL_MAGIC,
.type = JE_START,
.size = sizeof(journal_entry_start),
.reserved = 0,
.journal_start = bs->journal.block_size,
};
((journal_entry_start*)submitted_buf)->crc32 = je_crc32((journal_entry*)submitted_buf);
if (bs->readonly)
{
printf("Skipping journal initialization because blockstore is readonly\n");
}
else
{
// Cool effect. Same operations result in journal replay.
// FIXME: Randomize initial crc32. Track crc32 when trimming.
printf("Resetting journal\n");
GET_SQE();
data->iov = (struct iovec){ submitted_buf, 2*bs->journal.block_size };
data->callback = simple_callback;
my_uring_prep_writev(sqe, bs->journal_fd_index, &data->iov, 1, bs->journal.offset);
sqe->flags |= IOSQE_FIXED_FILE;
wait_count++;
bs->ringloop->submit();
resume_6:
if (wait_count > 0)
{
// Waiting for the initial superblock write
wait_state = 6;
return 1;
}
if (!bs->disable_journal_fsync)
{
GET_SQE();
my_uring_prep_fsync(sqe, bs->journal_fd_index, IORING_FSYNC_DATASYNC);
sqe->flags |= IOSQE_FIXED_FILE;
data->iov = { 0 };
data->callback = simple_callback;
wait_count++;
bs->ringloop->submit();
}
resume_4:
if (wait_count > 0)
{
// Waiting for the fsync after initialization
wait_state = 4;
return 1;
}
}
if (!bs->journal.inmemory)
{
free(submitted_buf);
}
}
else
{
// First block always contains a single JE_START entry
je_start = (journal_entry_start*)submitted_buf;
if (je_start->magic != JOURNAL_MAGIC ||
je_start->type != JE_START ||
je_start->size != sizeof(journal_entry_start) ||
je_crc32((journal_entry*)je_start) != je_start->crc32)
{
// Entry is corrupt
throw std::runtime_error("first entry of the journal is corrupt");
}
next_free = journal_pos = bs->journal.used_start = je_start->journal_start;
if (!bs->journal.inmemory)
free(submitted_buf);
submitted_buf = NULL;
crc32_last = 0;
// Read journal
while (1)
{
resume_2:
if (submitted_buf)
{
// Previous chunk read still in flight
wait_state = 2;
return 1;
}
// Keep reading until we wrap around and reach used_start again
if (!wrapped || journal_pos < bs->journal.used_start)
{
GET_SQE();
uint64_t end = bs->journal.len;
if (journal_pos < bs->journal.used_start)
end = bs->journal.used_start;
if (!bs->journal.inmemory)
submitted_buf = memalign(MEM_ALIGNMENT, JOURNAL_BUFFER_SIZE);
else
submitted_buf = bs->journal.buffer + journal_pos;
data->iov = {
submitted_buf,
end - journal_pos < JOURNAL_BUFFER_SIZE ? end - journal_pos : JOURNAL_BUFFER_SIZE,
};
data->callback = [this](ring_data_t *data1) { handle_event(data1); };
my_uring_prep_readv(sqe, bs->journal_fd_index, &data->iov, 1, bs->journal.offset + journal_pos);
sqe->flags |= IOSQE_FIXED_FILE;
bs->ringloop->submit();
}
// Parse all chunks that have already completed
while (done.size() > 0)
{
handle_res = handle_journal_part(done[0].buf, done[0].pos, done[0].len);
if (handle_res == 0)
{
// journal ended
// zero out corrupted entry, if required
if (init_write_buf && !bs->readonly)
{
GET_SQE();
data->iov = { init_write_buf, bs->journal.block_size };
data->callback = simple_callback;
my_uring_prep_writev(sqe, bs->journal_fd_index, &data->iov, 1, bs->journal.offset + init_write_sector);
sqe->flags |= IOSQE_FIXED_FILE;
wait_count++;
bs->ringloop->submit();
resume_7:
if (wait_count > 0)
{
// Waiting for the corrupted-sector overwrite
wait_state = 7;
return 1;
}
if (!bs->disable_journal_fsync)
{
GET_SQE();
data->iov = { 0 };
data->callback = simple_callback;
my_uring_prep_fsync(sqe, bs->journal_fd_index, IORING_FSYNC_DATASYNC);
sqe->flags |= IOSQE_FIXED_FILE;
wait_count++;
bs->ringloop->submit();
}
resume_5:
if (wait_count > 0)
{
// Waiting for the fsync after the overwrite
wait_state = 5;
return 1;
}
}
// wait for the next read to complete, then stop
resume_3:
if (submitted_buf)
{
wait_state = 3;
return 1;
}
// free buffers
if (!bs->journal.inmemory)
for (auto & e: done)
free(e.buf);
done.clear();
break;
}
else if (handle_res == 1)
{
// OK, remove it
if (!bs->journal.inmemory)
{
free(done[0].buf);
}
done.erase(done.begin());
}
else if (handle_res == 2)
{
// Need to wait for more reads
break;
}
}
if (!submitted_buf)
{
// Nothing submitted and nothing queued => the scan is finished
break;
}
}
}
// Trim journal on start so we don't stall when all entries are older
bs->journal.trim();
printf(
"Journal entries loaded: %lu, free journal space: %lu bytes (%lu..%lu is used), free blocks: %lu / %lu\n",
entries_loaded,
(bs->journal.next_free >= bs->journal.used_start
? bs->journal.len-bs->journal.block_size - (bs->journal.next_free-bs->journal.used_start)
: bs->journal.used_start - bs->journal.next_free),
bs->journal.used_start, bs->journal.next_free,
bs->data_alloc->get_free_count(), bs->block_count
);
bs->journal.crc32_last = crc32_last;
return 0;
}
  415. int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, uint64_t len)
  416. {
  417. uint64_t proc_pos, pos;
  418. if (continue_pos != 0)
  419. {
  420. proc_pos = (continue_pos / bs->journal.block_size) * bs->journal.block_size;
  421. pos = continue_pos % bs->journal.block_size;
  422. continue_pos = 0;
  423. goto resume;
  424. }
  425. while (next_free >= done_pos && next_free < done_pos+len)
  426. {
  427. proc_pos = next_free;
  428. pos = 0;
  429. next_free += bs->journal.block_size;
  430. if (next_free >= bs->journal.len)
  431. {
  432. next_free = bs->journal.block_size;
  433. }
  434. resume:
  435. while (pos < bs->journal.block_size)
  436. {
  437. journal_entry *je = (journal_entry*)(buf + proc_pos - done_pos + pos);
  438. if (je->magic != JOURNAL_MAGIC || je_crc32(je) != je->crc32 ||
  439. je->type < JE_SMALL_WRITE || je->type > JE_DELETE || started && je->crc32_prev != crc32_last)
  440. {
  441. if (pos == 0)
  442. {
  443. // invalid entry in the beginning, this is definitely the end of the journal
  444. bs->journal.next_free = proc_pos;
  445. return 0;
  446. }
  447. else
  448. {
  449. // allow partially filled sectors
  450. break;
  451. }
  452. }
  453. if (je->type == JE_SMALL_WRITE)
  454. {
  455. #ifdef BLOCKSTORE_DEBUG
  456. printf("je_small_write oid=%lu:%lu ver=%lu offset=%u len=%u\n", je->small_write.oid.inode, je->small_write.oid.stripe, je->small_write.version, je->small_write.offset, je->small_write.len);
  457. #endif
  458. // oid, version, offset, len
  459. uint64_t prev_free = next_free;
  460. if (next_free + je->small_write.len > bs->journal.len)
  461. {
  462. // data continues from the beginning of the journal
  463. next_free = bs->journal.block_size;
  464. }
  465. uint64_t location = next_free;
  466. next_free += je->small_write.len;
  467. if (next_free >= bs->journal.len)
  468. {
  469. next_free = bs->journal.block_size;
  470. }
  471. if (location != je->small_write.data_offset)
  472. {
  473. char err[1024];
  474. snprintf(err, 1024, "BUG: calculated journal data offset (%lu) != stored journal data offset (%lu)", location, je->small_write.data_offset);
  475. throw std::runtime_error(err);
  476. }
  477. uint32_t data_crc32 = 0;
  478. if (location >= done_pos && location+je->small_write.len <= done_pos+len)
  479. {
  480. // data is within this buffer
  481. data_crc32 = crc32c(0, buf + location - done_pos, je->small_write.len);
  482. }
  483. else
  484. {
  485. // this case is even more interesting because we must carry data crc32 check to next buffer(s)
  486. uint64_t covered = 0;
  487. for (int i = 0; i < done.size(); i++)
  488. {
  489. if (location+je->small_write.len > done[i].pos &&
  490. location < done[i].pos+done[i].len)
  491. {
  492. uint64_t part_end = (location+je->small_write.len < done[i].pos+done[i].len
  493. ? location+je->small_write.len : done[i].pos+done[i].len);
  494. uint64_t part_begin = (location < done[i].pos ? done[i].pos : location);
  495. covered += part_end - part_begin;
  496. data_crc32 = crc32c(data_crc32, done[i].buf + part_begin - done[i].pos, part_end - part_begin);
  497. }
  498. }
  499. if (covered < je->small_write.len)
  500. {
  501. continue_pos = proc_pos+pos;
  502. next_free = prev_free;
  503. return 2;
  504. }
  505. }
  506. if (data_crc32 != je->small_write.crc32_data)
  507. {
  508. // journal entry is corrupt, stop here
  509. // interesting thing is that we must clear the corrupt entry if we're not readonly
  510. memset(buf + proc_pos - done_pos + pos, 0, bs->journal.block_size - pos);
  511. bs->journal.next_free = prev_free;
  512. init_write_buf = buf + proc_pos - done_pos;
  513. init_write_sector = proc_pos;
  514. return 0;
  515. }
  516. auto clean_it = bs->clean_db.find(je->small_write.oid);
  517. if (clean_it == bs->clean_db.end() ||
  518. clean_it->second.version < je->big_write.version)
  519. {
  520. obj_ver_id ov = {
  521. .oid = je->small_write.oid,
  522. .version = je->small_write.version,
  523. };
  524. bs->dirty_db.emplace(ov, (dirty_entry){
  525. .state = ST_J_SYNCED,
  526. .flags = 0,
  527. .location = location,
  528. .offset = je->small_write.offset,
  529. .len = je->small_write.len,
  530. .journal_sector = proc_pos,
  531. });
  532. bs->journal.used_sectors[proc_pos]++;
  533. #ifdef BLOCKSTORE_DEBUG
  534. printf("journal offset %lu is used by %lu:%lu v%lu\n", proc_pos, ov.oid.inode, ov.oid.stripe, ov.version);
  535. #endif
  536. auto & unstab = bs->unstable_writes[ov.oid];
  537. unstab = unstab < ov.version ? ov.version : unstab;
  538. }
  539. }
  540. else if (je->type == JE_BIG_WRITE)
  541. {
  542. #ifdef BLOCKSTORE_DEBUG
  543. printf("je_big_write oid=%lu:%lu ver=%lu loc=%lu\n", je->big_write.oid.inode, je->big_write.oid.stripe, je->big_write.version, je->big_write.location);
  544. #endif
  545. auto clean_it = bs->clean_db.find(je->big_write.oid);
  546. if (clean_it == bs->clean_db.end() ||
  547. clean_it->second.version < je->big_write.version)
  548. {
  549. // oid, version, block
  550. obj_ver_id ov = {
  551. .oid = je->big_write.oid,
  552. .version = je->big_write.version,
  553. };
  554. bs->dirty_db.emplace(ov, (dirty_entry){
  555. .state = ST_D_META_SYNCED,
  556. .flags = 0,
  557. .location = je->big_write.location,
  558. .offset = je->big_write.offset,
  559. .len = je->big_write.len,
  560. .journal_sector = proc_pos,
  561. });
  562. #ifdef BLOCKSTORE_DEBUG
  563. printf("Allocate block %lu\n", je->big_write.location >> bs->block_order);
  564. #endif
  565. bs->data_alloc->set(je->big_write.location >> bs->block_order, true);
  566. bs->journal.used_sectors[proc_pos]++;
  567. auto & unstab = bs->unstable_writes[ov.oid];
  568. unstab = unstab < ov.version ? ov.version : unstab;
  569. }
  570. }
  571. else if (je->type == JE_STABLE)
  572. {
  573. #ifdef BLOCKSTORE_DEBUG
  574. printf("je_stable oid=%lu:%lu ver=%lu\n", je->stable.oid.inode, je->stable.oid.stripe, je->stable.version);
  575. #endif
  576. // oid, version
  577. obj_ver_id ov = {
  578. .oid = je->stable.oid,
  579. .version = je->stable.version,
  580. };
  581. auto it = bs->dirty_db.find(ov);
  582. if (it == bs->dirty_db.end())
  583. {
  584. // journal contains a legitimate STABLE entry for a non-existing dirty write
  585. // this probably means that journal was trimmed between WRITE and STABLE entries
  586. // skip it
  587. }
  588. else
  589. {
  590. while (1)
  591. {
  592. it->second.state = (it->second.state == ST_D_META_SYNCED
  593. ? ST_D_STABLE
  594. : (it->second.state == ST_DEL_SYNCED ? ST_DEL_STABLE : ST_J_STABLE));
  595. if (it == bs->dirty_db.begin())
  596. break;
  597. it--;
  598. if (it->first.oid != ov.oid || IS_STABLE(it->second.state))
  599. break;
  600. }
  601. bs->flusher->enqueue_flush(ov);
  602. }
  603. auto unstab_it = bs->unstable_writes.find(ov.oid);
  604. if (unstab_it != bs->unstable_writes.end() && unstab_it->second <= ov.version)
  605. {
  606. bs->unstable_writes.erase(unstab_it);
  607. }
  608. }
  609. else if (je->type == JE_ROLLBACK)
  610. {
  611. #ifdef BLOCKSTORE_DEBUG
  612. printf("je_rollback oid=%lu:%lu ver=%lu\n", je->rollback.oid.inode, je->rollback.oid.stripe, je->rollback.version);
  613. #endif
  614. // rollback dirty writes of <oid> up to <version>
  615. auto it = bs->dirty_db.lower_bound((obj_ver_id){
  616. .oid = je->rollback.oid,
  617. .version = UINT64_MAX,
  618. });
  619. if (it != bs->dirty_db.begin())
  620. {
  621. uint64_t max_unstable = 0;
  622. auto rm_start = it;
  623. auto rm_end = it;
  624. it--;
  625. while (it->first.oid == je->rollback.oid &&
  626. it->first.version > je->rollback.version &&
  627. !IS_IN_FLIGHT(it->second.state) &&
  628. !IS_STABLE(it->second.state))
  629. {
  630. if (it->first.oid != je->rollback.oid)
  631. break;
  632. else if (it->first.version <= je->rollback.version)
  633. {
  634. if (!IS_STABLE(it->second.state))
  635. max_unstable = it->first.version;
  636. break;
  637. }
  638. else if (IS_STABLE(it->second.state))
  639. break;
  640. // Remove entry
  641. rm_start = it;
  642. if (it == bs->dirty_db.begin())
  643. break;
  644. it--;
  645. }
  646. if (rm_start != rm_end)
  647. {
  648. bs->erase_dirty(rm_start, rm_end, UINT64_MAX);
  649. }
  650. auto unstab_it = bs->unstable_writes.find(je->rollback.oid);
  651. if (unstab_it != bs->unstable_writes.end())
  652. {
  653. if (max_unstable == 0)
  654. bs->unstable_writes.erase(unstab_it);
  655. else
  656. unstab_it->second = max_unstable;
  657. }
  658. }
  659. }
  660. else if (je->type == JE_DELETE)
  661. {
  662. #ifdef BLOCKSTORE_DEBUG
  663. printf("je_delete oid=%lu:%lu ver=%lu\n", je->del.oid.inode, je->del.oid.stripe, je->del.version);
  664. #endif
  665. // oid, version
  666. obj_ver_id ov = {
  667. .oid = je->del.oid,
  668. .version = je->del.version,
  669. };
  670. bs->dirty_db.emplace(ov, (dirty_entry){
  671. .state = ST_DEL_SYNCED,
  672. .flags = 0,
  673. .location = 0,
  674. .offset = 0,
  675. .len = 0,
  676. .journal_sector = proc_pos,
  677. });
  678. bs->journal.used_sectors[proc_pos]++;
  679. }
  680. started = true;
  681. pos += je->size;
  682. crc32_last = je->crc32;
  683. entries_loaded++;
  684. }
  685. }
  686. bs->journal.next_free = next_free;
  687. return 1;
  688. }