Simplified distributed block storage with strong consistency, like in Ceph
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.0 (see README.md for details)

#include "blockstore_impl.h"

blockstore_init_meta::blockstore_init_meta(blockstore_impl_t *bs)
{
    this->bs = bs;
}

void blockstore_init_meta::handle_event(ring_data_t *data)
{
    if (data->res <= 0)
    {
        throw std::runtime_error(
            std::string("read metadata failed at offset ") + std::to_string(metadata_read) +
            std::string(": ") + strerror(-data->res)
        );
    }
    prev_done = data->res > 0 ? submitted : 0;
    done_len = data->res;
    done_pos = metadata_read;
    metadata_read += data->res;
    submitted = 0;
}

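// Metadata is read in large chunks. When the metadata copy is kept in memory
// (bs->inmemory_meta), each chunk is read directly into its final position in
// metadata_buffer. Otherwise a temporary double buffer of 2*metadata_buf_size
// is allocated so that one half can be parsed while the next chunk is being
// read into the other half: <submitted> and <prev> take the values 1 and 2 to
// select the half used by the in-flight read, and <prev_done> remembers which
// half has completed and is ready for handle_entries().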
int blockstore_init_meta::loop()
{
    if (wait_state == 1)
        goto resume_1;
    printf("Reading blockstore metadata\n");
    if (bs->inmemory_meta)
        metadata_buffer = bs->metadata_buffer;
    else
        metadata_buffer = memalign(MEM_ALIGNMENT, 2*bs->metadata_buf_size);
    if (!metadata_buffer)
        throw std::runtime_error("Failed to allocate metadata read buffer");
    while (1)
    {
    resume_1:
        if (submitted)
        {
            wait_state = 1;
            return 1;
        }
        if (metadata_read < bs->meta_len)
        {
            sqe = bs->get_sqe();
            if (!sqe)
            {
                throw std::runtime_error("io_uring is full while trying to read metadata");
            }
            data = ((ring_data_t*)sqe->user_data);
            data->iov = {
                metadata_buffer + (bs->inmemory_meta
                    ? metadata_read
                    : (prev == 1 ? bs->metadata_buf_size : 0)),
                bs->meta_len - metadata_read > bs->metadata_buf_size ? bs->metadata_buf_size : bs->meta_len - metadata_read,
            };
            data->callback = [this](ring_data_t *data) { handle_event(data); };
            my_uring_prep_readv(sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + metadata_read);
            bs->ringloop->submit();
            submitted = (prev == 1 ? 2 : 1);
            prev = submitted;
        }
        if (prev_done)
        {
            void *done_buf = bs->inmemory_meta
                ? (metadata_buffer + done_pos)
                : (metadata_buffer + (prev_done == 2 ? bs->metadata_buf_size : 0));
            unsigned count = bs->meta_block_size / bs->clean_entry_size;
            for (int sector = 0; sector < done_len; sector += bs->meta_block_size)
            {
                // handle <count> entries
                handle_entries(done_buf + sector, count, bs->block_order);
                done_cnt += count;
            }
            prev_done = 0;
            done_len = 0;
        }
        if (!submitted)
        {
            break;
        }
    }
    // metadata read finished
    printf("Metadata entries loaded: %lu, free blocks: %lu / %lu\n", entries_loaded, bs->data_alloc->get_free_count(), bs->block_count);
    if (!bs->inmemory_meta)
    {
        free(metadata_buffer);
        metadata_buffer = NULL;
    }
    return 0;
}

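// Parse one metadata block worth of clean object entries. The entry index
// (done_cnt+i) doubles as the data block number: data block N is described by
// metadata entry N, so an object's location is simply (done_cnt+i) << block_order.
// A newer version of the same object wins; the block occupied by the older
// version is returned to the allocator.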
void blockstore_init_meta::handle_entries(void* entries, unsigned count, int block_order)
{
    for (unsigned i = 0; i < count; i++)
    {
        clean_disk_entry *entry = (clean_disk_entry*)(entries + i*bs->clean_entry_size);
        if (!bs->inmemory_meta && (bs->clean_entry_bitmap_size || bs->entry_attr_size))
        {
            memcpy(bs->clean_bitmap + (done_cnt+i)*(bs->clean_entry_bitmap_size + bs->entry_attr_size), &entry->bitmap, (bs->clean_entry_bitmap_size + bs->entry_attr_size));
        }
        if (entry->oid.inode > 0)
        {
            auto clean_it = bs->clean_db.find(entry->oid);
            if (clean_it == bs->clean_db.end() || clean_it->second.version < entry->version)
            {
                if (clean_it != bs->clean_db.end())
                {
                    // free the previous block
#ifdef BLOCKSTORE_DEBUG
                    printf("Free block %lu (new location is %lu)\n", clean_it->second.location >> block_order, done_cnt+i);
#endif
                    bs->data_alloc->set(clean_it->second.location >> block_order, false);
                }
                else
                {
                    bs->inode_space_stats[entry->oid.inode] += bs->block_size;
                }
                entries_loaded++;
#ifdef BLOCKSTORE_DEBUG
                printf("Allocate block (clean entry) %lu: %lx:%lx v%lu\n", done_cnt+i, entry->oid.inode, entry->oid.stripe, entry->version);
#endif
                bs->data_alloc->set(done_cnt+i, true);
                bs->clean_db[entry->oid] = (struct clean_entry){
                    .version = entry->version,
                    .location = (done_cnt+i) << block_order,
                };
            }
            else
            {
#ifdef BLOCKSTORE_DEBUG
                printf("Old clean entry %lu: %lx:%lx v%lu\n", done_cnt+i, entry->oid.inode, entry->oid.stripe, entry->version);
#endif
            }
        }
    }
}

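// Journal layout, as assumed by the code below:
//
//   [ 0 .. block_size )           one JE_START "superblock" entry pointing
//                                 at the start of the used journal area
//   [ block_size .. journal.len ) a ring of journal blocks packed with
//                                 entries; small-write data is interleaved
//                                 into the same ring
//
// Both replay and allocation of new journal space therefore begin at
// journal.block_size, and every wrap-around returns to it, never to offset 0.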
blockstore_init_journal::blockstore_init_journal(blockstore_impl_t *bs)
{
    this->bs = bs;
    next_free = bs->journal.block_size;
    simple_callback = [this](ring_data_t *data1)
    {
        if (data1->res != data1->iov.iov_len)
        {
            throw std::runtime_error(std::string("I/O operation failed while reading journal: ") + strerror(-data1->res));
        }
        wait_count--;
    };
}

bool iszero(uint64_t *buf, int len)
{
    for (int i = 0; i < len; i++)
        if (buf[i] != 0)
            return false;
    return true;
}

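// Completion callback for journal chunk reads. The journal is a ring: when
// the read position reaches journal.len it wraps back to the first data block
// (journal.block_size, right after the superblock) and <wrapped> is set so
// the read loop stops at used_start instead of journal.len.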
void blockstore_init_journal::handle_event(ring_data_t *data1)
{
    if (data1->res <= 0)
    {
        throw std::runtime_error(
            std::string("read journal failed at offset ") + std::to_string(journal_pos) +
            std::string(": ") + strerror(-data1->res)
        );
    }
    done.push_back({
        .buf = submitted_buf,
        .pos = journal_pos,
        .len = (uint64_t)data1->res,
    });
    journal_pos += data1->res;
    if (journal_pos >= bs->journal.len)
    {
        // Continue from the beginning
        journal_pos = bs->journal.block_size;
        wrapped = true;
    }
    submitted_buf = NULL;
}

#define GET_SQE() \
    sqe = bs->get_sqe();\
    if (!sqe)\
        throw std::runtime_error("io_uring is full while trying to read journal");\
    data = ((ring_data_t*)sqe->user_data)

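// loop() is a hand-rolled coroutine: whenever it has to wait for io_uring
// completions it records a resume point in wait_state and returns 1; the ring
// loop calls it again after processing completions and execution jumps back
// to the matching resume_N label. It returns 0 once journal replay is done.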
int blockstore_init_journal::loop()
{
    if (wait_state == 1)
        goto resume_1;
    else if (wait_state == 2)
        goto resume_2;
    else if (wait_state == 3)
        goto resume_3;
    else if (wait_state == 4)
        goto resume_4;
    else if (wait_state == 5)
        goto resume_5;
    else if (wait_state == 6)
        goto resume_6;
    else if (wait_state == 7)
        goto resume_7;
    printf("Reading blockstore journal\n");
    if (!bs->journal.inmemory)
        submitted_buf = memalign_or_die(MEM_ALIGNMENT, 2*bs->journal.block_size);
    else
        submitted_buf = bs->journal.buffer;
    // Read first block of the journal
    sqe = bs->get_sqe();
    if (!sqe)
        throw std::runtime_error("io_uring is full while trying to read journal");
    data = ((ring_data_t*)sqe->user_data);
    data->iov = { submitted_buf, bs->journal.block_size };
    data->callback = simple_callback;
    my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
    bs->ringloop->submit();
    wait_count = 1;
    resume_1:
    if (wait_count > 0)
    {
        wait_state = 1;
        return 1;
    }
    if (iszero((uint64_t*)submitted_buf, 3))
    {
        // Journal is empty
        // FIXME handle this wrapping to journal_block_size better (maybe)
        bs->journal.used_start = bs->journal.block_size;
        bs->journal.next_free = bs->journal.block_size;
        // Initialize journal "superblock" and the first block
        memset(submitted_buf, 0, 2*bs->journal.block_size);
        *((journal_entry_start*)submitted_buf) = {
            .crc32 = 0,
            .magic = JOURNAL_MAGIC,
            .type = JE_START,
            .size = sizeof(journal_entry_start),
            .reserved = 0,
            .journal_start = bs->journal.block_size,
        };
        ((journal_entry_start*)submitted_buf)->crc32 = je_crc32((journal_entry*)submitted_buf);
        if (bs->readonly)
        {
            printf("Skipping journal initialization because blockstore is readonly\n");
        }
        else
        {
            // Cool effect. Same operations result in journal replay.
            // FIXME: Randomize initial crc32. Track crc32 when trimming.
            printf("Resetting journal\n");
            GET_SQE();
            data->iov = (struct iovec){ submitted_buf, 2*bs->journal.block_size };
            data->callback = simple_callback;
            my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
            wait_count++;
            bs->ringloop->submit();
            resume_6:
            if (wait_count > 0)
            {
                wait_state = 6;
                return 1;
            }
            if (!bs->disable_journal_fsync)
            {
                GET_SQE();
                my_uring_prep_fsync(sqe, bs->journal.fd, IORING_FSYNC_DATASYNC);
                data->iov = { 0 };
                data->callback = simple_callback;
                wait_count++;
                bs->ringloop->submit();
            }
            resume_4:
            if (wait_count > 0)
            {
                wait_state = 4;
                return 1;
            }
        }
        if (!bs->journal.inmemory)
        {
            free(submitted_buf);
        }
    }
    else
    {
        // First block always contains a single JE_START entry
        je_start = (journal_entry_start*)submitted_buf;
        if (je_start->magic != JOURNAL_MAGIC ||
            je_start->type != JE_START ||
            je_start->size != sizeof(journal_entry_start) ||
            je_crc32((journal_entry*)je_start) != je_start->crc32)
        {
            // Entry is corrupt
            throw std::runtime_error("first entry of the journal is corrupt");
        }
        next_free = journal_pos = bs->journal.used_start = je_start->journal_start;
        if (!bs->journal.inmemory)
            free(submitted_buf);
        submitted_buf = NULL;
        crc32_last = 0;
        // Read journal
        while (1)
        {
            resume_2:
            if (submitted_buf)
            {
                wait_state = 2;
                return 1;
            }
            if (!wrapped || journal_pos < bs->journal.used_start)
            {
                GET_SQE();
                uint64_t end = bs->journal.len;
                if (journal_pos < bs->journal.used_start)
                    end = bs->journal.used_start;
                if (!bs->journal.inmemory)
                    submitted_buf = memalign_or_die(MEM_ALIGNMENT, JOURNAL_BUFFER_SIZE);
                else
                    submitted_buf = bs->journal.buffer + journal_pos;
                data->iov = {
                    submitted_buf,
                    end - journal_pos < JOURNAL_BUFFER_SIZE ? end - journal_pos : JOURNAL_BUFFER_SIZE,
                };
                data->callback = [this](ring_data_t *data1) { handle_event(data1); };
                my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + journal_pos);
                bs->ringloop->submit();
            }
            while (done.size() > 0)
            {
                handle_res = handle_journal_part(done[0].buf, done[0].pos, done[0].len);
                if (handle_res == 0)
                {
                    // journal ended
                    // zero out corrupted entry, if required
                    if (init_write_buf && !bs->readonly)
                    {
                        GET_SQE();
                        data->iov = { init_write_buf, bs->journal.block_size };
                        data->callback = simple_callback;
                        my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + init_write_sector);
                        wait_count++;
                        bs->ringloop->submit();
                        resume_7:
                        if (wait_count > 0)
                        {
                            wait_state = 7;
                            return 1;
                        }
                        if (!bs->disable_journal_fsync)
                        {
                            GET_SQE();
                            data->iov = { 0 };
                            data->callback = simple_callback;
                            my_uring_prep_fsync(sqe, bs->journal.fd, IORING_FSYNC_DATASYNC);
                            wait_count++;
                            bs->ringloop->submit();
                        }
                        resume_5:
                        if (wait_count > 0)
                        {
                            wait_state = 5;
                            return 1;
                        }
                    }
                    // wait for the next read to complete, then stop
                    resume_3:
                    if (submitted_buf)
                    {
                        wait_state = 3;
                        return 1;
                    }
                    // free buffers
                    if (!bs->journal.inmemory)
                        for (auto & e: done)
                            free(e.buf);
                    done.clear();
                    break;
                }
                else if (handle_res == 1)
                {
                    // OK, remove it
                    if (!bs->journal.inmemory)
                    {
                        free(done[0].buf);
                    }
                    done.erase(done.begin());
                }
                else if (handle_res == 2)
                {
                    // Need to wait for more reads
                    break;
                }
            }
            if (!submitted_buf)
            {
                break;
            }
        }
    }
    bs->flusher->mark_trim_possible();
    bs->journal.dirty_start = bs->journal.next_free;
    printf(
        "Journal entries loaded: %lu, free journal space: %lu bytes (%08lx..%08lx is used), free blocks: %lu / %lu\n",
        entries_loaded,
        (bs->journal.next_free >= bs->journal.used_start
            ? bs->journal.len-bs->journal.block_size - (bs->journal.next_free-bs->journal.used_start)
            : bs->journal.used_start - bs->journal.next_free),
        bs->journal.used_start, bs->journal.next_free,
        bs->data_alloc->get_free_count(), bs->block_count
    );
    bs->journal.crc32_last = crc32_last;
    return 0;
}

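// Parse entries from one completed read buffer. Return values:
//   0 - end of the journal was reached (an invalid entry at the start of a
//       block, or corrupt small-write data), replay is finished;
//   1 - the buffer was fully processed and may be freed;
//   2 - a small write's data extends past the buffers read so far, so parsing
//       must pause (position saved in continue_pos) until more reads complete.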
int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, uint64_t len)
{
    uint64_t proc_pos, pos;
    if (continue_pos != 0)
    {
        proc_pos = (continue_pos / bs->journal.block_size) * bs->journal.block_size;
        pos = continue_pos % bs->journal.block_size;
        continue_pos = 0;
        goto resume;
    }
    while (next_free >= done_pos && next_free < done_pos+len)
    {
        proc_pos = next_free;
        pos = 0;
        next_free += bs->journal.block_size;
        if (next_free >= bs->journal.len)
        {
            next_free = bs->journal.block_size;
        }
        resume:
        while (pos < bs->journal.block_size)
        {
            journal_entry *je = (journal_entry*)(buf + proc_pos - done_pos + pos);
            if (je->magic != JOURNAL_MAGIC || je_crc32(je) != je->crc32 ||
                je->type < JE_MIN || je->type > JE_MAX || (started && je->crc32_prev != crc32_last))
            {
                if (pos == 0)
                {
                    // invalid entry in the beginning, this is definitely the end of the journal
                    bs->journal.next_free = proc_pos;
                    return 0;
                }
                else
                {
                    // allow partially filled sectors
                    break;
                }
            }
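            // A small write's payload is not stored inside the journal entry:
            // it is written into the journal data area at next_free (with
            // wrap-around past journal.len back to block_size), and the entry
            // records only oid, version, offset, length and a data crc32.
            // Replay recomputes the data location and cross-checks it against
            // the data_offset stored in the entry.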
            if (je->type == JE_SMALL_WRITE || je->type == JE_SMALL_WRITE_INSTANT)
            {
#ifdef BLOCKSTORE_DEBUG
                printf(
                    "je_small_write%s oid=%lx:%lx ver=%lu offset=%u len=%u\n",
                    je->type == JE_SMALL_WRITE_INSTANT ? "_instant" : "",
                    je->small_write.oid.inode, je->small_write.oid.stripe, je->small_write.version,
                    je->small_write.offset, je->small_write.len
                );
#endif
                // oid, version, offset, len
                uint64_t prev_free = next_free;
                if (next_free + je->small_write.len > bs->journal.len)
                {
                    // data continues from the beginning of the journal
                    next_free = bs->journal.block_size;
                }
                uint64_t location = next_free;
                next_free += je->small_write.len;
                if (next_free >= bs->journal.len)
                {
                    next_free = bs->journal.block_size;
                }
                if (location != je->small_write.data_offset)
                {
                    char err[1024];
                    snprintf(err, 1024, "BUG: calculated journal data offset (%08lx) != stored journal data offset (%08lx)", location, je->small_write.data_offset);
                    throw std::runtime_error(err);
                }
                uint32_t data_crc32 = 0;
                if (location >= done_pos && location+je->small_write.len <= done_pos+len)
                {
                    // data is within this buffer
                    data_crc32 = crc32c(0, buf + location - done_pos, je->small_write.len);
                }
                else
                {
                    // this case is even more interesting because we must carry data crc32 check to next buffer(s)
                    uint64_t covered = 0;
                    for (size_t i = 0; i < done.size(); i++)
                    {
                        if (location+je->small_write.len > done[i].pos &&
                            location < done[i].pos+done[i].len)
                        {
                            uint64_t part_end = (location+je->small_write.len < done[i].pos+done[i].len
                                ? location+je->small_write.len : done[i].pos+done[i].len);
                            uint64_t part_begin = (location < done[i].pos ? done[i].pos : location);
                            covered += part_end - part_begin;
                            data_crc32 = crc32c(data_crc32, done[i].buf + part_begin - done[i].pos, part_end - part_begin);
                        }
                    }
                    if (covered < je->small_write.len)
                    {
                        continue_pos = proc_pos+pos;
                        next_free = prev_free;
                        return 2;
                    }
                }
                if (data_crc32 != je->small_write.crc32_data)
                {
                    // journal entry data is corrupt, stop here
                    // interesting thing is that we must clear the corrupt entry if we're not readonly,
                    // because we don't write next entries in the same journal block
                    printf("Journal entry data is corrupt (data crc32 %x != %x)\n", data_crc32, je->small_write.crc32_data);
                    memset(buf + proc_pos - done_pos + pos, 0, bs->journal.block_size - pos);
                    bs->journal.next_free = prev_free;
                    init_write_buf = buf + proc_pos - done_pos;
                    init_write_sector = proc_pos;
                    return 0;
                }
                auto clean_it = bs->clean_db.find(je->small_write.oid);
                if (clean_it == bs->clean_db.end() ||
                    clean_it->second.version < je->small_write.version)
                {
                    obj_ver_id ov = {
                        .oid = je->small_write.oid,
                        .version = je->small_write.version,
                    };
                    void *bmp = (void*)je + sizeof(journal_entry_small_write);
                    if (bs->entry_attr_size <= sizeof(void*))
                    {
                        memcpy(&bmp, bmp, bs->entry_attr_size);
                    }
                    else if (!bs->journal.inmemory)
                    {
                        // FIXME Using large blockstore objects and not keeping journal in memory
                        // will result in a lot of small allocations for entry bitmaps. This can
                        // only be fixed by using a patched map with dynamic entry size, but not
                        // the btree_map, because it doesn't keep iterators valid all the time.
                        void *bmp_cp = malloc_or_die(bs->entry_attr_size);
                        memcpy(bmp_cp, bmp, bs->entry_attr_size);
                        bmp = bmp_cp;
                    }
                    bs->dirty_db.emplace(ov, (dirty_entry){
                        .state = (BS_ST_SMALL_WRITE | BS_ST_SYNCED),
                        .flags = 0,
                        .location = location,
                        .offset = je->small_write.offset,
                        .len = je->small_write.len,
                        .journal_sector = proc_pos,
                        .bitmap = bmp,
                    });
                    bs->journal.used_sectors[proc_pos]++;
#ifdef BLOCKSTORE_DEBUG
                    printf(
                        "journal offset %08lx is used by %lx:%lx v%lu (%lu refs)\n",
                        proc_pos, ov.oid.inode, ov.oid.stripe, ov.version, bs->journal.used_sectors[proc_pos]
                    );
#endif
                    auto & unstab = bs->unstable_writes[ov.oid];
                    unstab = unstab < ov.version ? ov.version : unstab;
                    if (je->type == JE_SMALL_WRITE_INSTANT)
                    {
                        bs->mark_stable(ov);
                    }
                }
            }
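            // A big write's payload lives in a separate data block outside the
            // journal; the entry stores its location, so replay only needs to
            // mark that block as allocated and register the dirty version.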
            else if (je->type == JE_BIG_WRITE || je->type == JE_BIG_WRITE_INSTANT)
            {
#ifdef BLOCKSTORE_DEBUG
                printf(
                    "je_big_write%s oid=%lx:%lx ver=%lu loc=%lu\n",
                    je->type == JE_BIG_WRITE_INSTANT ? "_instant" : "",
                    je->big_write.oid.inode, je->big_write.oid.stripe, je->big_write.version, je->big_write.location >> bs->block_order
                );
#endif
                auto dirty_it = bs->dirty_db.upper_bound((obj_ver_id){
                    .oid = je->big_write.oid,
                    .version = UINT64_MAX,
                });
                if (dirty_it != bs->dirty_db.begin() && bs->dirty_db.size() > 0)
                {
                    dirty_it--;
                    if (dirty_it->first.oid == je->big_write.oid &&
                        dirty_it->first.version >= je->big_write.version &&
                        (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE)
                    {
                        // It is allowed to overwrite a deleted object with a
                        // version number smaller than deletion version number,
                        // because the presence of a BIG_WRITE entry means that
                        // its data and metadata are already flushed.
                        // We don't know if newer versions are flushed, but
                        // the previous delete definitely is.
                        // So we flush previous dirty entries, but retain the clean one.
                        // This feature is required for writes happening shortly
                        // after deletes.
                        auto dirty_end = dirty_it;
                        dirty_end++;
                        while (1)
                        {
                            if (dirty_it == bs->dirty_db.begin())
                            {
                                break;
                            }
                            dirty_it--;
                            if (dirty_it->first.oid != je->big_write.oid)
                            {
                                dirty_it++;
                                break;
                            }
                        }
                        auto clean_it = bs->clean_db.find(je->big_write.oid);
                        bs->erase_dirty(
                            dirty_it, dirty_end,
                            clean_it != bs->clean_db.end() ? clean_it->second.location : UINT64_MAX
                        );
                        // Remove it from the flusher's queue, too
                        // Otherwise it may end up referring to a small unstable write after reading the rest of the journal
                        bs->flusher->remove_flush(je->big_write.oid);
                    }
                }
                auto clean_it = bs->clean_db.find(je->big_write.oid);
                if (clean_it == bs->clean_db.end() ||
                    clean_it->second.version < je->big_write.version)
                {
                    // oid, version, block
                    obj_ver_id ov = {
                        .oid = je->big_write.oid,
                        .version = je->big_write.version,
                    };
                    void *bmp = (void*)je + sizeof(journal_entry_big_write);
                    if (bs->entry_attr_size <= sizeof(void*))
                    {
                        memcpy(&bmp, bmp, bs->entry_attr_size);
                    }
                    else if (!bs->journal.inmemory)
                    {
                        // FIXME Using large blockstore objects and not keeping journal in memory
                        // will result in a lot of small allocations for entry bitmaps. This can
                        // only be fixed by using a patched map with dynamic entry size, but not
                        // the btree_map, because it doesn't keep iterators valid all the time.
                        void *bmp_cp = malloc_or_die(bs->entry_attr_size);
                        memcpy(bmp_cp, bmp, bs->entry_attr_size);
                        bmp = bmp_cp;
                    }
                    bs->dirty_db.emplace(ov, (dirty_entry){
                        .state = (BS_ST_BIG_WRITE | BS_ST_SYNCED),
                        .flags = 0,
                        .location = je->big_write.location,
                        .offset = je->big_write.offset,
                        .len = je->big_write.len,
                        .journal_sector = proc_pos,
                        .bitmap = bmp,
                    });
#ifdef BLOCKSTORE_DEBUG
                    printf("Allocate block %lu\n", je->big_write.location >> bs->block_order);
#endif
                    bs->data_alloc->set(je->big_write.location >> bs->block_order, true);
                    bs->journal.used_sectors[proc_pos]++;
#ifdef BLOCKSTORE_DEBUG
                    printf(
                        "journal offset %08lx is used by %lx:%lx v%lu (%lu refs)\n",
                        proc_pos, ov.oid.inode, ov.oid.stripe, ov.version, bs->journal.used_sectors[proc_pos]
                    );
#endif
                    auto & unstab = bs->unstable_writes[ov.oid];
                    unstab = unstab < ov.version ? ov.version : unstab;
                    if (je->type == JE_BIG_WRITE_INSTANT)
                    {
                        bs->mark_stable(ov);
                    }
                }
            }
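            // JE_STABLE, JE_ROLLBACK and JE_DELETE entries carry no payload;
            // replaying them repeats the corresponding state transition for
            // the recorded <oid, version> pair.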
            else if (je->type == JE_STABLE)
            {
#ifdef BLOCKSTORE_DEBUG
                printf("je_stable oid=%lx:%lx ver=%lu\n", je->stable.oid.inode, je->stable.oid.stripe, je->stable.version);
#endif
                // oid, version
                obj_ver_id ov = {
                    .oid = je->stable.oid,
                    .version = je->stable.version,
                };
                bs->mark_stable(ov);
            }
            else if (je->type == JE_ROLLBACK)
            {
#ifdef BLOCKSTORE_DEBUG
                printf("je_rollback oid=%lx:%lx ver=%lu\n", je->rollback.oid.inode, je->rollback.oid.stripe, je->rollback.version);
#endif
                // rollback dirty writes of <oid> up to <version>
                obj_ver_id ov = {
                    .oid = je->rollback.oid,
                    .version = je->rollback.version,
                };
                bs->mark_rolled_back(ov);
            }
            else if (je->type == JE_DELETE)
            {
#ifdef BLOCKSTORE_DEBUG
                printf("je_delete oid=%lx:%lx ver=%lu\n", je->del.oid.inode, je->del.oid.stripe, je->del.version);
#endif
                auto clean_it = bs->clean_db.find(je->del.oid);
                if (clean_it != bs->clean_db.end() &&
                    clean_it->second.version < je->del.version)
                {
                    // oid, version
                    obj_ver_id ov = {
                        .oid = je->del.oid,
                        .version = je->del.version,
                    };
                    bs->dirty_db.emplace(ov, (dirty_entry){
                        .state = (BS_ST_DELETE | BS_ST_SYNCED),
                        .flags = 0,
                        .location = 0,
                        .offset = 0,
                        .len = 0,
                        .journal_sector = proc_pos,
                    });
                    bs->journal.used_sectors[proc_pos]++;
                    // Deletions are treated as immediately stable, because
                    // "2-phase commit" (write->stabilize) isn't sufficient for them anyway
                    bs->mark_stable(ov);
                }
            }
            started = true;
            pos += je->size;
            crc32_last = je->crc32;
            entries_loaded++;
        }
    }
    bs->journal.next_free = next_free;
    return 1;
}