Simplified distributed block storage with strong consistency, like in Ceph
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.0 (see README.md for details)

#include "blockstore_impl.h"

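// Validate the requested object version, detect conflicts with unfinished
// writes/deletes of the same object and insert the new write into dirty_db.
// Returns false if the operation completes (or fails) immediately instead of
// being queued.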
bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
{
    // Check or assign version number
    bool found = false, deleted = false, is_del = (op->opcode == BS_OP_DELETE);
    bool wait_big = false, wait_del = false;
    uint64_t version = 1;
    if (dirty_db.size() > 0)
    {
        auto dirty_it = dirty_db.upper_bound((obj_ver_id){
            .oid = op->oid,
            .version = UINT64_MAX,
        });
        dirty_it--; // segfaults when dirty_db is empty
        if (dirty_it != dirty_db.end() && dirty_it->first.oid == op->oid)
        {
            found = true;
            version = dirty_it->first.version + 1;
            deleted = IS_DELETE(dirty_it->second.state);
            wait_del = ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_DEL);
            wait_big = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE
                ? !IS_SYNCED(dirty_it->second.state)
                : ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG);
        }
    }
    if (!found)
    {
        auto clean_it = clean_db.find(op->oid);
        if (clean_it != clean_db.end())
        {
            version = clean_it->second.version + 1;
        }
        else
        {
            deleted = true;
        }
    }
    if (deleted && is_del)
    {
        // Already deleted
        op->retval = 0;
        return false;
    }
    PRIV(op)->real_version = 0;
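    // op->version == 0 means "assign the next version automatically".
    // A version lower than the expected one is only accepted on top of a
    // pending delete: that delete is flushed first and the write is queued
    // under a temporary higher version, with the real one saved in real_version.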
    if (op->version == 0)
    {
        op->version = version;
    }
    else if (op->version < version)
    {
        // Implicit operations must be added like that: DEL [FLUSH] BIG [SYNC] SMALL SMALL
        if (deleted || wait_del)
        {
            // It's allowed to write versions with low numbers over deletes
            // However, we have to flush those deletes first as we use version number for ordering
#ifdef BLOCKSTORE_DEBUG
            printf("Write %lx:%lx v%lu over delete (real v%lu) offset=%u len=%u\n", op->oid.inode, op->oid.stripe, version, op->version, op->offset, op->len);
#endif
            wait_del = true;
            PRIV(op)->real_version = op->version;
            op->version = version;
            flusher->unshift_flush((obj_ver_id){
                .oid = op->oid,
                .version = version-1,
            }, true);
        }
        else
        {
            // Invalid version requested
            op->retval = -EEXIST;
            return false;
        }
    }
    if (wait_big && !is_del && !deleted && op->len < block_size &&
        immediate_commit != IMMEDIATE_ALL)
    {
        // Issue an additional sync so that the previous big write can reach the journal
        blockstore_op_t *sync_op = new blockstore_op_t;
        sync_op->opcode = BS_OP_SYNC;
        sync_op->callback = [this, op](blockstore_op_t *sync_op)
        {
            delete sync_op;
        };
        enqueue_op(sync_op);
    }
#ifdef BLOCKSTORE_DEBUG
    if (is_del)
        printf("Delete %lx:%lx v%lu\n", op->oid.inode, op->oid.stripe, op->version);
    else if (!wait_del)
        printf("Write %lx:%lx v%lu offset=%u len=%u\n", op->oid.inode, op->oid.stripe, op->version, op->offset, op->len);
#endif
    // FIXME No strict need to add it into dirty_db here, it's just left
    // from the previous implementation where reads waited for writes
    void *bmp = NULL;
    uint32_t state;
    if (is_del)
        state = BS_ST_DELETE | BS_ST_IN_FLIGHT;
    else
    {
        state = (op->len == block_size || deleted ? BS_ST_BIG_WRITE : BS_ST_SMALL_WRITE);
        if (wait_del)
            state |= BS_ST_WAIT_DEL;
        else if (state == BS_ST_SMALL_WRITE && wait_big)
            state |= BS_ST_WAIT_BIG;
        else
            state |= BS_ST_IN_FLIGHT;
        if (op->opcode == BS_OP_WRITE_STABLE)
            state |= BS_ST_INSTANT;
        if (entry_attr_size > sizeof(void*))
            bmp = calloc_or_die(1, entry_attr_size);
        if (op->bitmap)
            memcpy((entry_attr_size > sizeof(void*) ? bmp : &bmp), op->bitmap, entry_attr_size);
    }
    dirty_db.emplace((obj_ver_id){
        .oid = op->oid,
        .version = op->version,
    }, (dirty_entry){
        .state = state,
        .flags = 0,
        .location = 0,
        .offset = is_del ? 0 : op->offset,
        .len = is_del ? 0 : op->len,
        .journal_sector = 0,
        .bitmap = bmp,
    });
    return true;
}

// First step of the write algorithm: dequeue operation and submit initial write(s)
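// op_state acts as a simple state machine, driven by continue_write():
//   1 = big write data submitted (immediate_commit), waiting for completion
//   2 = data write done, the big_write journal entry can be submitted
//   3 = submitted journal/data writes are in flight, waiting for completion
//   4 = all I/O finished, switch the object state and acknowledge the op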
int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
{
    if (PRIV(op)->op_state)
    {
        return continue_write(op);
    }
    auto dirty_it = dirty_db.find((obj_ver_id){
        .oid = op->oid,
        .version = op->version,
    });
    assert(dirty_it != dirty_db.end());
    if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) < BS_ST_IN_FLIGHT)
    {
        // Don't dequeue
        return 0;
    }
    if (PRIV(op)->real_version != 0)
    {
        // Restore original low version number for unblocked operations
#ifdef BLOCKSTORE_DEBUG
        printf("Restoring %lx:%lx version: v%lu -> v%lu\n", op->oid.inode, op->oid.stripe, op->version, PRIV(op)->real_version);
#endif
        auto prev_it = dirty_it;
        prev_it--;
        if (prev_it->first.oid == op->oid && prev_it->first.version >= PRIV(op)->real_version)
        {
            // Original version is still invalid
            // FIXME Oops. Successive small writes will currently break in an unexpected way. Fix it
            if (entry_attr_size > sizeof(void*))
                free(dirty_it->second.bitmap);
            dirty_db.erase(dirty_it);
            op->retval = -EEXIST;
            FINISH_OP(op);
            return 1;
        }
        op->version = PRIV(op)->real_version;
        PRIV(op)->real_version = 0;
        dirty_entry e = dirty_it->second;
        dirty_db.erase(dirty_it);
        dirty_it = dirty_db.emplace((obj_ver_id){
            .oid = op->oid,
            .version = op->version,
        }, e).first;
    }
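    // Choose the write path: a full-block ("big") write goes directly into a
    // newly allocated block of the data area, a smaller ("small") write is
    // copied into the journal.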
    if ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE)
    {
        blockstore_journal_check_t space_check(this);
        if (!space_check.check_available(op, unsynced_big_writes.size() + 1, sizeof(journal_entry_big_write), JOURNAL_STABILIZE_RESERVATION))
        {
            return 0;
        }
        // Big (redirect) write
        uint64_t loc = data_alloc->find_free();
        if (loc == UINT64_MAX)
        {
            // no space
            if (flusher->is_active())
            {
                // hope that some space will be available after flush
                PRIV(op)->wait_for = WAIT_FREE;
                return 0;
            }
            // FIXME Oops. Successive small writes will currently break in an unexpected way. Fix it
            if (entry_attr_size > sizeof(void*))
                free(dirty_it->second.bitmap);
            dirty_db.erase(dirty_it);
            op->retval = -ENOSPC;
            FINISH_OP(op);
            return 1;
        }
        BS_SUBMIT_GET_SQE(sqe, data);
        dirty_it->second.location = loc << block_order;
        dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SUBMITTED;
#ifdef BLOCKSTORE_DEBUG
        printf("Allocate block %lu\n", loc);
#endif
        data_alloc->set(loc, true);
        uint64_t stripe_offset = (op->offset % bitmap_granularity);
        uint64_t stripe_end = (op->offset + op->len) % bitmap_granularity;
        // Zero fill up to bitmap_granularity
        int vcnt = 0;
        if (stripe_offset)
        {
            PRIV(op)->iov_zerofill[vcnt++] = (struct iovec){ zero_object, stripe_offset };
        }
        PRIV(op)->iov_zerofill[vcnt++] = (struct iovec){ op->buf, op->len };
        if (stripe_end)
        {
            stripe_end = bitmap_granularity - stripe_end;
            PRIV(op)->iov_zerofill[vcnt++] = (struct iovec){ zero_object, stripe_end };
        }
        data->iov.iov_len = op->len + stripe_offset + stripe_end; // to check it in the callback
        data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); };
        my_uring_prep_writev(
            sqe, data_fd, PRIV(op)->iov_zerofill, vcnt, data_offset + (loc << block_order) + op->offset - stripe_offset
        );
        PRIV(op)->pending_ops = 1;
        PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
        if (immediate_commit != IMMEDIATE_ALL)
        {
            // Remember big write as unsynced
            unsynced_big_writes.push_back((obj_ver_id){
                .oid = op->oid,
                .version = op->version,
            });
            PRIV(op)->op_state = 3;
        }
        else
        {
            PRIV(op)->op_state = 1;
        }
    }
    else /* if ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_SMALL_WRITE) */
    {
        // Small (journaled) write
        // First check if the journal has sufficient space
        blockstore_journal_check_t space_check(this);
        if ((unsynced_big_writes.size() && !space_check.check_available(op, unsynced_big_writes.size(), sizeof(journal_entry_big_write), 0)) ||
            !space_check.check_available(op, 1, sizeof(journal_entry_small_write), op->len + JOURNAL_STABILIZE_RESERVATION))
        {
            return 0;
        }
        // There is sufficient space. Get SQE(s)
        struct io_uring_sqe *sqe1 = NULL;
        if (immediate_commit != IMMEDIATE_NONE ||
            ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_small_write) &&
            journal.sector_info[journal.cur_sector].dirty))
        {
            // Write current journal sector only if it's dirty and full, or in the immediate_commit mode
            BS_SUBMIT_GET_SQE_DECL(sqe1);
        }
        struct io_uring_sqe *sqe2 = NULL;
        if (op->len > 0)
        {
            BS_SUBMIT_GET_SQE_DECL(sqe2);
        }
        // Got SQEs. Prepare previous journal sector write if required
        auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
        if (immediate_commit == IMMEDIATE_NONE)
        {
            if (sqe1)
            {
                prepare_journal_sector_write(journal, journal.cur_sector, sqe1, cb);
                PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
                PRIV(op)->pending_ops++;
            }
            else
            {
                PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
            }
        }
        // Then pre-fill journal entry
        journal_entry_small_write *je = (journal_entry_small_write*)prefill_single_journal_entry(
            journal, op->opcode == BS_OP_WRITE_STABLE ? JE_SMALL_WRITE_INSTANT : JE_SMALL_WRITE,
            sizeof(journal_entry_small_write) + entry_attr_size
        );
        dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset;
        journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
#ifdef BLOCKSTORE_DEBUG
        printf(
            "journal offset %08lx is used by %lx:%lx v%lu (%lu refs)\n",
            dirty_it->second.journal_sector, dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version,
            journal.used_sectors[journal.sector_info[journal.cur_sector].offset]
        );
#endif
        // Figure out where data will be
        journal.next_free = (journal.next_free + op->len) <= journal.len ? journal.next_free : journal_block_size;
        je->oid = op->oid;
        je->version = op->version;
        je->offset = op->offset;
        je->len = op->len;
        je->data_offset = journal.next_free;
        je->crc32_data = crc32c(0, op->buf, op->len);
        memcpy((void*)(je+1), (entry_attr_size > sizeof(void*) ? dirty_it->second.bitmap : &dirty_it->second.bitmap), entry_attr_size);
        je->crc32 = je_crc32((journal_entry*)je);
        journal.crc32_last = je->crc32;
        if (immediate_commit != IMMEDIATE_NONE)
        {
            prepare_journal_sector_write(journal, journal.cur_sector, sqe1, cb);
            PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
            PRIV(op)->pending_ops++;
        }
        if (op->len > 0)
        {
            // Prepare journal data write
            if (journal.inmemory)
            {
                // Copy data
                memcpy(journal.buffer + journal.next_free, op->buf, op->len);
            }
            ring_data_t *data2 = ((ring_data_t*)sqe2->user_data);
            data2->iov = (struct iovec){ op->buf, op->len };
            data2->callback = cb;
            my_uring_prep_writev(
                sqe2, journal.fd, &data2->iov, 1, journal.offset + journal.next_free
            );
            PRIV(op)->pending_ops++;
        }
        else
        {
            // Zero-length overwrite. Allowed to bump object version in EC placement groups without actually writing data
        }
        dirty_it->second.location = journal.next_free;
        dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SUBMITTED;
        journal.next_free += op->len;
        if (journal.next_free >= journal.len)
        {
            journal.next_free = journal_block_size;
        }
        if (immediate_commit == IMMEDIATE_NONE)
        {
            // Remember small write as unsynced
            unsynced_small_writes.push_back((obj_ver_id){
                .oid = op->oid,
                .version = op->version,
            });
        }
        if (!PRIV(op)->pending_ops)
        {
            PRIV(op)->op_state = 4;
            continue_write(op);
        }
        else
        {
            PRIV(op)->op_state = 3;
        }
    }
    return 1;
}

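// Continuation of a write: called again after previously submitted I/O
// completes or when the operation is re-queued. Resumes from op_state 2
// (submit the big_write journal entry) or 4 (finalize and acknowledge).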
int blockstore_impl_t::continue_write(blockstore_op_t *op)
{
    io_uring_sqe *sqe = NULL;
    journal_entry_big_write *je;
    auto dirty_it = dirty_db.find((obj_ver_id){
        .oid = op->oid,
        .version = op->version,
    });
    assert(dirty_it != dirty_db.end());
    if (PRIV(op)->op_state == 2)
        goto resume_2;
    else if (PRIV(op)->op_state == 4)
        goto resume_4;
    else
        return 1;
resume_2:
    // Only for the immediate_commit mode: prepare and submit big_write journal entry
    sqe = get_sqe();
    if (!sqe)
    {
        return 0;
    }
    je = (journal_entry_big_write*)prefill_single_journal_entry(
        journal, op->opcode == BS_OP_WRITE_STABLE ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE,
        sizeof(journal_entry_big_write) + entry_attr_size
    );
    dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset;
    journal.sector_info[journal.cur_sector].dirty = false;
    journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
#ifdef BLOCKSTORE_DEBUG
    printf(
        "journal offset %08lx is used by %lx:%lx v%lu (%lu refs)\n",
        journal.sector_info[journal.cur_sector].offset, op->oid.inode, op->oid.stripe, op->version,
        journal.used_sectors[journal.sector_info[journal.cur_sector].offset]
    );
#endif
    je->oid = op->oid;
    je->version = op->version;
    je->offset = op->offset;
    je->len = op->len;
    je->location = dirty_it->second.location;
    memcpy((void*)(je+1), (entry_attr_size > sizeof(void*) ? dirty_it->second.bitmap : &dirty_it->second.bitmap), entry_attr_size);
    je->crc32 = je_crc32((journal_entry*)je);
    journal.crc32_last = je->crc32;
    prepare_journal_sector_write(journal, journal.cur_sector, sqe,
        [this, op](ring_data_t *data) { handle_write_event(data, op); });
    PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
    PRIV(op)->pending_ops = 1;
    PRIV(op)->op_state = 3;
    return 1;
resume_4:
    // Switch object state
#ifdef BLOCKSTORE_DEBUG
    printf("Ack write %lx:%lx v%lu = state %x\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state);
#endif
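    // The write counts as already committed ("imm") for big writes only when
    // immediate_commit == all, and for small writes and deletes unless it's none.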
    bool imm = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE
        ? (immediate_commit == IMMEDIATE_ALL)
        : (immediate_commit != IMMEDIATE_NONE);
    if (imm)
    {
        auto & unstab = unstable_writes[op->oid];
        unstab = unstab < op->version ? op->version : unstab;
    }
    dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK)
        | (imm ? BS_ST_SYNCED : BS_ST_WRITTEN);
    if (imm && ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE || (dirty_it->second.state & BS_ST_INSTANT)))
    {
        // Deletions are treated as immediately stable
        mark_stable(dirty_it->first);
    }
    if (immediate_commit == IMMEDIATE_ALL)
    {
        dirty_it++;
        while (dirty_it != dirty_db.end() && dirty_it->first.oid == op->oid)
        {
            if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG)
            {
                dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_IN_FLIGHT;
            }
            dirty_it++;
        }
    }
    // Acknowledge write
    op->retval = op->len;
    FINISH_OP(op);
    return 1;
}

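// io_uring completion callback for all writes submitted above. Decrements the
// pending I/O counter and advances the write state machine when it reaches zero.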
void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *op)
{
    live = true;
    if (data->res != data->iov.iov_len)
    {
        // FIXME: our state becomes corrupted after a write error. maybe do something better than just die
        throw std::runtime_error(
            "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
            "). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
        );
    }
    PRIV(op)->pending_ops--;
    if (PRIV(op)->pending_ops == 0)
    {
        release_journal_sectors(op);
        PRIV(op)->op_state++;
        if (!continue_write(op))
        {
            submit_queue.push_front(op);
        }
    }
}

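// Decrease usage counters of the journal sectors flushed by this operation and
// advance journal.dirty_start past sectors that are no longer referenced.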
void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op)
{
    // Release flushed journal sectors
    if (PRIV(op)->min_flushed_journal_sector > 0 &&
        PRIV(op)->max_flushed_journal_sector > 0)
    {
        uint64_t s = PRIV(op)->min_flushed_journal_sector;
        while (1)
        {
            journal.sector_info[s-1].usage_count--;
            if (s != (1+journal.cur_sector) && journal.sector_info[s-1].usage_count == 0)
            {
                // We know for sure that we won't write into this sector anymore
                uint64_t new_ds = journal.sector_info[s-1].offset + journal.block_size;
                if (new_ds >= journal.len)
                {
                    new_ds = journal.block_size;
                }
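                // The journal is circular, so positions are compared relative to
                // used_start: only move dirty_start forward, accounting for wrap-around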
                if ((journal.dirty_start + (journal.dirty_start >= journal.used_start ? 0 : journal.len)) <
                    (new_ds + (new_ds >= journal.used_start ? 0 : journal.len)))
                {
                    journal.dirty_start = new_ds;
                }
            }
            if (s == PRIV(op)->max_flushed_journal_sector)
                break;
            s = 1 + s % journal.sector_count;
        }
        PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
    }
}

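// Submit a delete: it only writes a small journal entry (JE_DELETE), no data,
// and otherwise follows the same state machine as a small journaled write.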
int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
{
    auto dirty_it = dirty_db.find((obj_ver_id){
        .oid = op->oid,
        .version = op->version,
    });
    assert(dirty_it != dirty_db.end());
    blockstore_journal_check_t space_check(this);
    if (!space_check.check_available(op, 1, sizeof(journal_entry_del), JOURNAL_STABILIZE_RESERVATION))
    {
        return 0;
    }
    io_uring_sqe *sqe = NULL;
    if (immediate_commit != IMMEDIATE_NONE ||
        ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_del) &&
        journal.sector_info[journal.cur_sector].dirty))
    {
        // Write current journal sector only if it's dirty and full, or in the immediate_commit mode
        BS_SUBMIT_GET_SQE_DECL(sqe);
    }
    auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
    // Prepare journal sector write
    if (immediate_commit == IMMEDIATE_NONE)
    {
        if (sqe)
        {
            prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb);
            PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
            PRIV(op)->pending_ops++;
        }
        else
        {
            PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
        }
    }
    // Pre-fill journal entry
    journal_entry_del *je = (journal_entry_del*)prefill_single_journal_entry(
        journal, JE_DELETE, sizeof(struct journal_entry_del)
    );
    dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset;
    journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
#ifdef BLOCKSTORE_DEBUG
    printf(
        "journal offset %08lx is used by %lx:%lx v%lu (%lu refs)\n",
        dirty_it->second.journal_sector, dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version,
        journal.used_sectors[journal.sector_info[journal.cur_sector].offset]
    );
#endif
    je->oid = op->oid;
    je->version = op->version;
    je->crc32 = je_crc32((journal_entry*)je);
    journal.crc32_last = je->crc32;
    dirty_it->second.state = BS_ST_DELETE | BS_ST_SUBMITTED;
    if (immediate_commit != IMMEDIATE_NONE)
    {
        prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb);
        PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
        PRIV(op)->pending_ops++;
    }
    else
    {
        // Remember delete as unsynced
        unsynced_small_writes.push_back((obj_ver_id){
            .oid = op->oid,
            .version = op->version,
        });
    }
    if (!PRIV(op)->pending_ops)
    {
        PRIV(op)->op_state = 4;
        continue_write(op);
    }
    else
    {
        PRIV(op)->op_state = 3;
    }
    return 1;
}