Simplified distributed block storage with strong consistency, like in Ceph
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)

#include "blockstore_impl.h"
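
// enqueue_write() checks or assigns the version number for an incoming write or
// delete, copies the object's current allocation bitmap into the new dirty entry
// and inserts the operation into dirty_db. It returns false when the operation
// finishes or fails right away (op->retval is already set in that case).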
bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
{
    // Check or assign version number
    bool found = false, deleted = false, is_del = (op->opcode == BS_OP_DELETE);
    bool wait_big = false, wait_del = false;
    void *bmp = NULL;
    uint64_t version = 1;
    if (!is_del && clean_entry_bitmap_size > sizeof(void*))
    {
        bmp = calloc_or_die(1, clean_entry_bitmap_size);
    }
    if (dirty_db.size() > 0)
    {
        auto dirty_it = dirty_db.upper_bound((obj_ver_id){
            .oid = op->oid,
            .version = UINT64_MAX,
        });
        dirty_it--; // segfaults when dirty_db is empty
        if (dirty_it != dirty_db.end() && dirty_it->first.oid == op->oid)
        {
            found = true;
            version = dirty_it->first.version + 1;
            deleted = IS_DELETE(dirty_it->second.state);
            wait_del = ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_DEL);
            wait_big = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE
                ? !IS_SYNCED(dirty_it->second.state)
                : ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG);
            if (!is_del && !deleted)
            {
                if (clean_entry_bitmap_size > sizeof(void*))
                    memcpy(bmp, dirty_it->second.bitmap, clean_entry_bitmap_size);
                else
                    bmp = dirty_it->second.bitmap;
            }
        }
    }
    if (!found)
    {
        auto clean_it = clean_db.find(op->oid);
        if (clean_it != clean_db.end())
        {
            version = clean_it->second.version + 1;
            if (!is_del)
            {
                void *bmp_ptr = get_clean_entry_bitmap(clean_it->second.location, clean_entry_bitmap_size);
                memcpy((clean_entry_bitmap_size > sizeof(void*) ? bmp : &bmp), bmp_ptr, clean_entry_bitmap_size);
            }
        }
        else
        {
            deleted = true;
        }
    }
    if (deleted && is_del)
    {
        // Already deleted
        op->retval = 0;
        return false;
    }
    PRIV(op)->real_version = 0;
    if (op->version == 0)
    {
        op->version = version;
    }
    else if (op->version < version)
    {
        // Implicit operations must be added like that: DEL [FLUSH] BIG [SYNC] SMALL SMALL
        if (deleted || wait_del)
        {
            // It's allowed to write versions with low numbers over deletes
            // However, we have to flush those deletes first as we use version number for ordering
#ifdef BLOCKSTORE_DEBUG
            printf("Write %lx:%lx v%lu over delete (real v%lu) offset=%u len=%u\n", op->oid.inode, op->oid.stripe, version, op->version, op->offset, op->len);
#endif
            wait_del = true;
            PRIV(op)->real_version = op->version;
            op->version = version;
            flusher->unshift_flush((obj_ver_id){
                .oid = op->oid,
                .version = version-1,
            }, true);
        }
        else
        {
            // Invalid version requested
            op->retval = -EEXIST;
            if (!is_del && clean_entry_bitmap_size > sizeof(void*))
            {
                free(bmp);
            }
            return false;
        }
    }
    if (wait_big && !is_del && !deleted && op->len < block_size &&
        immediate_commit != IMMEDIATE_ALL)
    {
        // Issue an additional sync so that the previous big write can reach the journal
        blockstore_op_t *sync_op = new blockstore_op_t;
        sync_op->opcode = BS_OP_SYNC;
        sync_op->callback = [this, op](blockstore_op_t *sync_op)
        {
            delete sync_op;
        };
        enqueue_op(sync_op);
    }
#ifdef BLOCKSTORE_DEBUG
    if (is_del)
        printf("Delete %lx:%lx v%lu\n", op->oid.inode, op->oid.stripe, op->version);
    else if (!wait_del)
        printf("Write %lx:%lx v%lu offset=%u len=%u\n", op->oid.inode, op->oid.stripe, op->version, op->offset, op->len);
#endif
    // FIXME No strict need to add it into dirty_db here, it's just left
    // from the previous implementation where reads waited for writes
    uint32_t state;
    if (is_del)
        state = BS_ST_DELETE | BS_ST_IN_FLIGHT;
    else
    {
        state = (op->len == block_size || deleted ? BS_ST_BIG_WRITE : BS_ST_SMALL_WRITE);
        if (state == BS_ST_SMALL_WRITE && throttle_small_writes)
            clock_gettime(CLOCK_REALTIME, &PRIV(op)->tv_begin);
        if (wait_del)
            state |= BS_ST_WAIT_DEL;
        else if (state == BS_ST_SMALL_WRITE && wait_big)
            state |= BS_ST_WAIT_BIG;
        else
            state |= BS_ST_IN_FLIGHT;
        if (op->opcode == BS_OP_WRITE_STABLE)
            state |= BS_ST_INSTANT;
        if (op->bitmap)
        {
            // Only allow to overwrite part of the object bitmap respective to the write's offset/len
            uint8_t *bmp_ptr = (uint8_t*)(clean_entry_bitmap_size > sizeof(void*) ? bmp : &bmp);
            uint32_t bit = op->offset/bitmap_granularity;
            uint32_t bits_left = op->len/bitmap_granularity;
            while (!(bit % 8) && bits_left > 8)
            {
                // Copy bytes
                bmp_ptr[bit/8] = ((uint8_t*)op->bitmap)[bit/8];
                bit += 8;
                bits_left -= 8;
            }
            while (bits_left > 0)
            {
                // Copy bits
                bmp_ptr[bit/8] = (bmp_ptr[bit/8] & ~(1 << (bit%8)))
                    | (((uint8_t*)op->bitmap)[bit/8] & (1 << bit%8));
                bit++;
                bits_left--;
            }
        }
    }
    dirty_db.emplace((obj_ver_id){
        .oid = op->oid,
        .version = op->version,
    }, (dirty_entry){
        .state = state,
        .flags = 0,
        .location = 0,
        .offset = is_del ? 0 : op->offset,
        .len = is_del ? 0 : op->len,
        .journal_sector = 0,
        .bitmap = bmp,
    });
    return true;
}
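
// Drop the object's remaining dirty entries (starting from dirty_it) and mark all
// later queued writes of the same object so that dequeue_write() finishes them
// with the same error code; real_version == UINT64_MAX is the cancellation flag.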
void blockstore_impl_t::cancel_all_writes(blockstore_op_t *op, blockstore_dirty_db_t::iterator dirty_it, int retval)
{
    while (dirty_it != dirty_db.end() && dirty_it->first.oid == op->oid)
    {
        if (clean_entry_bitmap_size > sizeof(void*))
            free(dirty_it->second.bitmap);
        dirty_db.erase(dirty_it++);
    }
    bool found = false;
    for (auto other_op: submit_queue)
    {
        if (!found && other_op == op)
            found = true;
        else if (found && other_op->oid == op->oid &&
            (other_op->opcode == BS_OP_WRITE || other_op->opcode == BS_OP_WRITE_STABLE))
        {
            // Mark operations to cancel them
            PRIV(other_op)->real_version = UINT64_MAX;
            other_op->retval = retval;
        }
    }
    op->retval = retval;
    FINISH_OP(op);
}

// First step of the write algorithm: dequeue operation and submit initial write(s)
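// Return value convention here and in continue_write()/dequeue_del(): 0 = can't
// proceed yet (stay in the queue), 1 = I/O submitted and in progress, 2 = finished.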
int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
{
    if (PRIV(op)->op_state)
    {
        return continue_write(op);
    }
    auto dirty_it = dirty_db.find((obj_ver_id){
        .oid = op->oid,
        .version = op->version,
    });
    assert(dirty_it != dirty_db.end());
    if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) < BS_ST_IN_FLIGHT)
    {
        // Don't dequeue
        return 0;
    }
    if (PRIV(op)->real_version != 0)
    {
        if (PRIV(op)->real_version == UINT64_MAX)
        {
            // This is the flag value used to cancel operations
            FINISH_OP(op);
            return 2;
        }
        // Restore original low version number for unblocked operations
#ifdef BLOCKSTORE_DEBUG
        printf("Restoring %lx:%lx version: v%lu -> v%lu\n", op->oid.inode, op->oid.stripe, op->version, PRIV(op)->real_version);
#endif
        auto prev_it = dirty_it;
        prev_it--;
        if (prev_it->first.oid == op->oid && prev_it->first.version >= PRIV(op)->real_version)
        {
            // Original version is still invalid
            // All subsequent writes to the same object must be canceled too
            cancel_all_writes(op, dirty_it, -EEXIST);
            return 2;
        }
        op->version = PRIV(op)->real_version;
        PRIV(op)->real_version = 0;
        dirty_entry e = dirty_it->second;
        dirty_db.erase(dirty_it);
        dirty_it = dirty_db.emplace((obj_ver_id){
            .oid = op->oid,
            .version = op->version,
        }, e).first;
    }
    if (write_iodepth >= max_write_iodepth)
    {
        return 0;
    }
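    // Two submission paths: a "big" write allocates a fresh data block and only its
    // metadata goes into the journal (journal_entry_big_write), while a "small"
    // write stores both the entry and the data itself in the journal.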
    if ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE)
    {
        blockstore_journal_check_t space_check(this);
        if (!space_check.check_available(op, unsynced_big_write_count + 1,
            sizeof(journal_entry_big_write) + clean_entry_bitmap_size, JOURNAL_STABILIZE_RESERVATION))
        {
            return 0;
        }
        // Big (redirect) write
        uint64_t loc = data_alloc->find_free();
        if (loc == UINT64_MAX)
        {
            // no space
            if (flusher->is_active())
            {
                // hope that some space will be available after flush
                PRIV(op)->wait_for = WAIT_FREE;
                return 0;
            }
            cancel_all_writes(op, dirty_it, -ENOSPC);
            return 2;
        }
        write_iodepth++;
        BS_SUBMIT_GET_SQE(sqe, data);
        dirty_it->second.location = loc << block_order;
        dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SUBMITTED;
#ifdef BLOCKSTORE_DEBUG
        printf(
            "Allocate block %lu for %lx:%lx v%lu\n",
            loc, op->oid.inode, op->oid.stripe, op->version
        );
#endif
        data_alloc->set(loc, true);
        uint64_t stripe_offset = (op->offset % bitmap_granularity);
        uint64_t stripe_end = (op->offset + op->len) % bitmap_granularity;
        // Zero fill up to bitmap_granularity
        int vcnt = 0;
        if (stripe_offset)
        {
            PRIV(op)->iov_zerofill[vcnt++] = (struct iovec){ zero_object, stripe_offset };
        }
        PRIV(op)->iov_zerofill[vcnt++] = (struct iovec){ op->buf, op->len };
        if (stripe_end)
        {
            stripe_end = bitmap_granularity - stripe_end;
            PRIV(op)->iov_zerofill[vcnt++] = (struct iovec){ zero_object, stripe_end };
        }
        data->iov.iov_len = op->len + stripe_offset + stripe_end; // to check it in the callback
        data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); };
        my_uring_prep_writev(
            sqe, data_fd, PRIV(op)->iov_zerofill, vcnt, data_offset + (loc << block_order) + op->offset - stripe_offset
        );
        PRIV(op)->pending_ops = 1;
        PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
        if (immediate_commit != IMMEDIATE_ALL)
        {
            // Increase the counter, but don't save into unsynced_writes yet (can't sync until the write is finished)
            unsynced_big_write_count++;
            PRIV(op)->op_state = 3;
        }
        else
        {
            PRIV(op)->op_state = 1;
        }
    }
    else /* if ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_SMALL_WRITE) */
    {
        // Small (journaled) write
        // First check if the journal has sufficient space
        blockstore_journal_check_t space_check(this);
        if (unsynced_big_write_count &&
            !space_check.check_available(op, unsynced_big_write_count,
                sizeof(journal_entry_big_write) + clean_entry_bitmap_size, 0)
            || !space_check.check_available(op, 1,
                sizeof(journal_entry_small_write) + clean_entry_bitmap_size, op->len + JOURNAL_STABILIZE_RESERVATION))
        {
            return 0;
        }
        write_iodepth++;
        // There is sufficient space. Get SQE(s)
        struct io_uring_sqe *sqe1 = NULL;
        if (immediate_commit != IMMEDIATE_NONE ||
            !journal.entry_fits(sizeof(journal_entry_small_write) + clean_entry_bitmap_size))
        {
            // Write current journal sector only if it's dirty and full, or in the immediate_commit mode
            BS_SUBMIT_GET_SQE_DECL(sqe1);
        }
        struct io_uring_sqe *sqe2 = NULL;
        if (op->len > 0)
        {
            BS_SUBMIT_GET_SQE_DECL(sqe2);
        }
        // Got SQEs. Prepare previous journal sector write if required
        auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
        if (immediate_commit == IMMEDIATE_NONE)
        {
            if (sqe1)
            {
                prepare_journal_sector_write(journal, journal.cur_sector, sqe1, cb);
                PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
                PRIV(op)->pending_ops++;
            }
            else
            {
                PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
            }
        }
        // Then pre-fill journal entry
        journal_entry_small_write *je = (journal_entry_small_write*)prefill_single_journal_entry(
            journal, op->opcode == BS_OP_WRITE_STABLE ? JE_SMALL_WRITE_INSTANT : JE_SMALL_WRITE,
            sizeof(journal_entry_small_write) + clean_entry_bitmap_size
        );
        dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset;
        journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
#ifdef BLOCKSTORE_DEBUG
        printf(
            "journal offset %08lx is used by %lx:%lx v%lu (%lu refs)\n",
            dirty_it->second.journal_sector, dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version,
            journal.used_sectors[journal.sector_info[journal.cur_sector].offset]
        );
#endif
        // Figure out where data will be
        journal.next_free = (journal.next_free + op->len) <= journal.len ? journal.next_free : journal_block_size;
        je->oid = op->oid;
        je->version = op->version;
        je->offset = op->offset;
        je->len = op->len;
        je->data_offset = journal.next_free;
        je->crc32_data = crc32c(0, op->buf, op->len);
        memcpy((void*)(je+1), (clean_entry_bitmap_size > sizeof(void*) ? dirty_it->second.bitmap : &dirty_it->second.bitmap), clean_entry_bitmap_size);
        je->crc32 = je_crc32((journal_entry*)je);
        journal.crc32_last = je->crc32;
        if (immediate_commit != IMMEDIATE_NONE)
        {
            prepare_journal_sector_write(journal, journal.cur_sector, sqe1, cb);
            PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
            PRIV(op)->pending_ops++;
        }
        if (op->len > 0)
        {
            // Prepare journal data write
            if (journal.inmemory)
            {
                // Copy data
                memcpy(journal.buffer + journal.next_free, op->buf, op->len);
            }
            ring_data_t *data2 = ((ring_data_t*)sqe2->user_data);
            data2->iov = (struct iovec){ op->buf, op->len };
            data2->callback = cb;
            my_uring_prep_writev(
                sqe2, journal.fd, &data2->iov, 1, journal.offset + journal.next_free
            );
            PRIV(op)->pending_ops++;
        }
        else
        {
            // Zero-length overwrite. Allowed to bump object version in EC placement groups without actually writing data
        }
        dirty_it->second.location = journal.next_free;
        dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SUBMITTED;
        journal.next_free += op->len;
        if (journal.next_free >= journal.len)
        {
            journal.next_free = journal_block_size;
        }
        if (!PRIV(op)->pending_ops)
        {
            PRIV(op)->op_state = 4;
            return continue_write(op);
        }
        else
        {
            PRIV(op)->op_state = 3;
        }
    }
    return 1;
}
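
// Resume point of the write state machine. op_state values as used in this file:
// 1 = big write data submitted, big_write journal entry still pending (immediate_commit),
// 2 = ready to write that journal entry, 3 = waiting for submitted I/O to complete,
// 4 = I/O done, switch the object state, 5 = paused by the small-write throttle,
// 6 = acknowledge the operation.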
int blockstore_impl_t::continue_write(blockstore_op_t *op)
{
    int op_state = PRIV(op)->op_state;
    if (op_state == 2)
        goto resume_2;
    else if (op_state == 4)
        goto resume_4;
    else if (op_state == 6)
        goto resume_6;
    else
    {
        // In progress
        return 1;
    }
resume_2:
    // Only for the immediate_commit mode: prepare and submit big_write journal entry
    {
        auto dirty_it = dirty_db.find((obj_ver_id){
            .oid = op->oid,
            .version = op->version,
        });
        assert(dirty_it != dirty_db.end());
        io_uring_sqe *sqe = NULL;
        BS_SUBMIT_GET_SQE_DECL(sqe);
        journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry(
            journal, op->opcode == BS_OP_WRITE_STABLE ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE,
            sizeof(journal_entry_big_write) + clean_entry_bitmap_size
        );
        dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset;
        journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
#ifdef BLOCKSTORE_DEBUG
        printf(
            "journal offset %08lx is used by %lx:%lx v%lu (%lu refs)\n",
            journal.sector_info[journal.cur_sector].offset, op->oid.inode, op->oid.stripe, op->version,
            journal.used_sectors[journal.sector_info[journal.cur_sector].offset]
        );
#endif
        je->oid = op->oid;
        je->version = op->version;
        je->offset = op->offset;
        je->len = op->len;
        je->location = dirty_it->second.location;
        memcpy((void*)(je+1), (clean_entry_bitmap_size > sizeof(void*) ? dirty_it->second.bitmap : &dirty_it->second.bitmap), clean_entry_bitmap_size);
        je->crc32 = je_crc32((journal_entry*)je);
        journal.crc32_last = je->crc32;
        prepare_journal_sector_write(journal, journal.cur_sector, sqe,
            [this, op](ring_data_t *data) { handle_write_event(data, op); });
        PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
        PRIV(op)->pending_ops = 1;
        PRIV(op)->op_state = 3;
        return 1;
    }
resume_4:
    // Switch object state
    {
        auto dirty_it = dirty_db.find((obj_ver_id){
            .oid = op->oid,
            .version = op->version,
        });
        assert(dirty_it != dirty_db.end());
#ifdef BLOCKSTORE_DEBUG
        // dirty_it is only in scope inside this block, so the debug print has to live here
        printf("Ack write %lx:%lx v%lu = state 0x%x\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state);
#endif
        bool is_big = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE;
        bool imm = is_big ? (immediate_commit == IMMEDIATE_ALL) : (immediate_commit != IMMEDIATE_NONE);
        if (imm)
        {
            auto & unstab = unstable_writes[op->oid];
            unstab = unstab < op->version ? op->version : unstab;
        }
        dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK)
            | (imm ? BS_ST_SYNCED : BS_ST_WRITTEN);
        if (imm && ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE || (dirty_it->second.state & BS_ST_INSTANT)))
        {
            // Deletions and 'instant' operations are treated as immediately stable
            mark_stable(dirty_it->first);
        }
        if (!imm)
        {
            if (is_big)
            {
                // Remember big write as unsynced
                unsynced_big_writes.push_back((obj_ver_id){
                    .oid = op->oid,
                    .version = op->version,
                });
            }
            else
            {
                // Remember small write as unsynced
                unsynced_small_writes.push_back((obj_ver_id){
                    .oid = op->oid,
                    .version = op->version,
                });
            }
        }
        if (imm && (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE)
        {
            // Unblock small writes
            dirty_it++;
            while (dirty_it != dirty_db.end() && dirty_it->first.oid == op->oid)
            {
                if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG)
                {
                    dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_IN_FLIGHT;
                }
                dirty_it++;
            }
        }
        // Apply throttling to not fill the journal too fast for the SSD+HDD case
        if (!is_big && throttle_small_writes)
        {
            // Apply throttling
            timespec tv_end;
            clock_gettime(CLOCK_REALTIME, &tv_end);
            uint64_t exec_us =
                (tv_end.tv_sec - PRIV(op)->tv_begin.tv_sec)*1000000 +
                (tv_end.tv_nsec - PRIV(op)->tv_begin.tv_nsec)/1000;
            // Compare with target execution time
            // 100% free -> target time = 0
            // 0% free -> target time = iodepth/parallelism * (iops + size/bw) / write per second
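            // ref_us below is that target time in microseconds: per-op latency (1s/throttle_target_iops)
            // plus transfer time at throttle_target_mbs, scaled up when write_iodepth exceeds
            // throttle_target_parallelism and reduced in proportion to the free journal space.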
            uint64_t used_start = journal.get_trim_pos();
            uint64_t journal_free_space = journal.next_free < used_start
                ? (used_start - journal.next_free)
                : (journal.len - journal.next_free + used_start - journal.block_size);
            uint64_t ref_us =
                (write_iodepth <= throttle_target_parallelism ? 100 : 100*write_iodepth/throttle_target_parallelism)
                * (1000000/throttle_target_iops + op->len*1000000/throttle_target_mbs/1024/1024)
                / 100;
            ref_us -= ref_us * journal_free_space / journal.len;
            if (ref_us > exec_us + throttle_threshold_us)
            {
                // Pause reply
                tfd->set_timer_us(ref_us-exec_us, false, [this, op](int timer_id)
                {
                    PRIV(op)->op_state++;
                    ringloop->wakeup();
                });
                PRIV(op)->op_state = 5;
                return 1;
            }
        }
    }
resume_6:
    // Acknowledge write
    op->retval = op->len;
    write_iodepth--;
    FINISH_OP(op);
    return 2;
}
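
// Completion callback for all write-path I/O: a short write is treated as fatal,
// and when the last pending submission of the operation completes, flushed journal
// sectors are released and the state machine advances.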
void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *op)
{
    live = true;
    if (data->res != data->iov.iov_len)
    {
        // FIXME: our state becomes corrupted after a write error. maybe do something better than just die
        throw std::runtime_error(
            "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
            "). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
        );
    }
    PRIV(op)->pending_ops--;
    if (PRIV(op)->pending_ops == 0)
    {
        release_journal_sectors(op);
        PRIV(op)->op_state++;
        ringloop->wakeup();
    }
}
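
// Decrease flush_count of the journal sectors flushed by this operation; sectors
// that are fully flushed and are not the currently filled one allow journal.dirty_start
// to be advanced past them.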
void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op)
{
    // Release flushed journal sectors
    if (PRIV(op)->min_flushed_journal_sector > 0 &&
        PRIV(op)->max_flushed_journal_sector > 0)
    {
        uint64_t s = PRIV(op)->min_flushed_journal_sector;
        while (1)
        {
            journal.sector_info[s-1].flush_count--;
            if (s != (1+journal.cur_sector) && journal.sector_info[s-1].flush_count == 0)
            {
                // We know for sure that we won't write into this sector anymore
                uint64_t new_ds = journal.sector_info[s-1].offset + journal.block_size;
                if (new_ds >= journal.len)
                {
                    new_ds = journal.block_size;
                }
                if ((journal.dirty_start + (journal.dirty_start >= journal.used_start ? 0 : journal.len)) <
                    (new_ds + (new_ds >= journal.used_start ? 0 : journal.len)))
                {
                    journal.dirty_start = new_ds;
                }
            }
            if (s == PRIV(op)->max_flushed_journal_sector)
                break;
            s = 1 + s % journal.sector_count;
        }
        PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
    }
}
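
// Deletions reuse the same resume path (continue_write), but only need a single
// JE_DELETE journal entry and never write any data.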
int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
{
    if (PRIV(op)->op_state)
    {
        return continue_write(op);
    }
    auto dirty_it = dirty_db.find((obj_ver_id){
        .oid = op->oid,
        .version = op->version,
    });
    assert(dirty_it != dirty_db.end());
    blockstore_journal_check_t space_check(this);
    if (!space_check.check_available(op, 1, sizeof(journal_entry_del), JOURNAL_STABILIZE_RESERVATION))
    {
        return 0;
    }
    write_iodepth++;
    io_uring_sqe *sqe = NULL;
    if (immediate_commit != IMMEDIATE_NONE ||
        (journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_del) &&
        journal.sector_info[journal.cur_sector].dirty)
    {
        // Write current journal sector only if it's dirty and full, or in the immediate_commit mode
        BS_SUBMIT_GET_SQE_DECL(sqe);
    }
    auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
    // Prepare journal sector write
    if (immediate_commit == IMMEDIATE_NONE)
    {
        if (sqe)
        {
            prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb);
            PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
            PRIV(op)->pending_ops++;
        }
        else
        {
            PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
        }
    }
    // Pre-fill journal entry
    journal_entry_del *je = (journal_entry_del*)prefill_single_journal_entry(
        journal, JE_DELETE, sizeof(struct journal_entry_del)
    );
    dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset;
    journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
#ifdef BLOCKSTORE_DEBUG
    printf(
        "journal offset %08lx is used by %lx:%lx v%lu (%lu refs)\n",
        dirty_it->second.journal_sector, dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version,
        journal.used_sectors[journal.sector_info[journal.cur_sector].offset]
    );
#endif
    je->oid = op->oid;
    je->version = op->version;
    je->crc32 = je_crc32((journal_entry*)je);
    journal.crc32_last = je->crc32;
    dirty_it->second.state = BS_ST_DELETE | BS_ST_SUBMITTED;
    if (immediate_commit != IMMEDIATE_NONE)
    {
        prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb);
        PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
        PRIV(op)->pending_ops++;
    }
    if (!PRIV(op)->pending_ops)
    {
        PRIV(op)->op_state = 4;
        return continue_write(op);
    }
    else
    {
        PRIV(op)->op_state = 3;
    }
    return 1;
}