Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

311 lines
12 KiB

// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.0 (see README.md for details)
#include "blockstore_impl.h"
// States of the SYNC operation state machine (PRIV(op)->op_state).
// A sync progresses monotonically through (a subset of) these values;
// the numeric order matters: continue_sync() compares and advances
// op_state step by step until SYNC_DONE is reached.
#define SYNC_HAS_SMALL 1
#define SYNC_HAS_BIG 2
#define SYNC_DATA_SYNC_SENT 3
#define SYNC_DATA_SYNC_DONE 4
#define SYNC_JOURNAL_WRITE_SENT 5
#define SYNC_JOURNAL_WRITE_DONE 6
#define SYNC_JOURNAL_SYNC_SENT 7
#define SYNC_DONE 8
  12. int blockstore_impl_t::dequeue_sync(blockstore_op_t *op)
  13. {
  14. if (PRIV(op)->op_state == 0)
  15. {
  16. stop_sync_submitted = false;
  17. PRIV(op)->sync_big_writes.swap(unsynced_big_writes);
  18. PRIV(op)->sync_small_writes.swap(unsynced_small_writes);
  19. PRIV(op)->sync_small_checked = 0;
  20. PRIV(op)->sync_big_checked = 0;
  21. unsynced_big_writes.clear();
  22. unsynced_small_writes.clear();
  23. if (PRIV(op)->sync_big_writes.size() > 0)
  24. PRIV(op)->op_state = SYNC_HAS_BIG;
  25. else if (PRIV(op)->sync_small_writes.size() > 0)
  26. PRIV(op)->op_state = SYNC_HAS_SMALL;
  27. else
  28. PRIV(op)->op_state = SYNC_DONE;
  29. // Always add sync to in_progress_syncs because we clear unsynced_big_writes and unsynced_small_writes
  30. PRIV(op)->prev_sync_count = in_progress_syncs.size();
  31. PRIV(op)->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op);
  32. }
  33. continue_sync(op);
  34. // Always dequeue because we always add syncs to in_progress_syncs
  35. return 1;
  36. }
// Advance the SYNC state machine from its current op_state.
// Returns 0 when the sync must wait (in-flight writes still pending, or no
// journal space / SQEs available — the BS_SUBMIT_* macros return 0 on
// failure), 1 when I/O was submitted or a step was completed inline, and
// the result of ack_sync() once the sync reaches SYNC_DONE.
int blockstore_impl_t::continue_sync(blockstore_op_t *op)
{
    // All io_uring completions for this sync funnel into handle_sync_event()
    auto cb = [this, op](ring_data_t *data) { handle_sync_event(data, op); };
    if (PRIV(op)->op_state == SYNC_HAS_SMALL)
    {
        // No big writes, just fsync the journal
        // sync_small_checked persists across calls so re-entry resumes where it stopped
        for (; PRIV(op)->sync_small_checked < PRIV(op)->sync_small_writes.size(); PRIV(op)->sync_small_checked++)
        {
            if (IS_IN_FLIGHT(dirty_db[PRIV(op)->sync_small_writes[PRIV(op)->sync_small_checked]].state))
            {
                // Wait for small inflight writes to complete
                return 0;
            }
        }
        if (journal.sector_info[journal.cur_sector].dirty)
        {
            // Write out the last journal sector if it happens to be dirty
            BS_SUBMIT_GET_ONLY_SQE(sqe);
            prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb);
            // Sector numbers are recorded as 1+index; 0 apparently means
            // "no sector flushed" (see the fsync branch below) — confirm
            // against release_journal_sectors()
            PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
            PRIV(op)->pending_ops = 1;
            PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT;
            return 1;
        }
        else
        {
            PRIV(op)->op_state = SYNC_JOURNAL_WRITE_DONE;
        }
    }
    if (PRIV(op)->op_state == SYNC_HAS_BIG)
    {
        for (; PRIV(op)->sync_big_checked < PRIV(op)->sync_big_writes.size(); PRIV(op)->sync_big_checked++)
        {
            if (IS_IN_FLIGHT(dirty_db[PRIV(op)->sync_big_writes[PRIV(op)->sync_big_checked]].state))
            {
                // Wait for big inflight writes to complete
                return 0;
            }
        }
        // 1st step: fsync data
        if (!disable_data_fsync)
        {
            BS_SUBMIT_GET_SQE(sqe, data);
            my_uring_prep_fsync(sqe, data_fd, IORING_FSYNC_DATASYNC);
            // Zero iov so handle_sync_event()'s res == iov_len check passes for fsync
            data->iov = { 0 };
            data->callback = cb;
            PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
            PRIV(op)->pending_ops = 1;
            PRIV(op)->op_state = SYNC_DATA_SYNC_SENT;
            return 1;
        }
        else
        {
            // fsync disabled: skip straight to the post-data-sync step
            PRIV(op)->op_state = SYNC_DATA_SYNC_DONE;
        }
    }
    if (PRIV(op)->op_state == SYNC_DATA_SYNC_DONE)
    {
        for (; PRIV(op)->sync_small_checked < PRIV(op)->sync_small_writes.size(); PRIV(op)->sync_small_checked++)
        {
            if (IS_IN_FLIGHT(dirty_db[PRIV(op)->sync_small_writes[PRIV(op)->sync_small_checked]].state))
            {
                // Wait for small inflight writes to complete
                return 0;
            }
        }
        // 2nd step: Data device is synced, prepare & write journal entries
        // Check space in the journal and journal memory buffers
        blockstore_journal_check_t space_check(this);
        if (!space_check.check_available(op, PRIV(op)->sync_big_writes.size(), sizeof(journal_entry_big_write), JOURNAL_STABILIZE_RESERVATION))
        {
            return 0;
        }
        // Get SQEs. Don't bother about merging, submit each journal sector as a separate request
        struct io_uring_sqe *sqe[space_check.sectors_required];
        for (int i = 0; i < space_check.sectors_required; i++)
        {
            BS_SUBMIT_GET_SQE_DECL(sqe[i]);
        }
        // Prepare and submit journal entries
        auto it = PRIV(op)->sync_big_writes.begin();
        int s = 0, cur_sector = -1;
        // If a big-write entry won't fit into the current (dirty) sector,
        // flush that sector first so prefill advances to a fresh one
        if ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_big_write) &&
            journal.sector_info[journal.cur_sector].dirty)
        {
            // NOTE: cur_sector was initialized to -1 just above, so this
            // condition is always true here (the check is redundant)
            if (cur_sector == -1)
                PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
            cur_sector = journal.cur_sector;
            prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
        }
        while (it != PRIV(op)->sync_big_writes.end())
        {
            // Fill one JE_BIG_WRITE(_INSTANT) entry per synced big write
            journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry(
                journal, (dirty_db[*it].state & BS_ST_INSTANT) ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE,
                sizeof(journal_entry_big_write)
            );
            // Remember which journal sector refers to this dirty entry and
            // bump that sector's reference count
            dirty_db[*it].journal_sector = journal.sector_info[journal.cur_sector].offset;
            journal.sector_info[journal.cur_sector].dirty = false;
            journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
#ifdef BLOCKSTORE_DEBUG
            printf(
                "journal offset %08lx is used by %lx:%lx v%lu (%lu refs)\n",
                dirty_db[*it].journal_sector, it->oid.inode, it->oid.stripe, it->version,
                journal.used_sectors[journal.sector_info[journal.cur_sector].offset]
            );
#endif
            je->oid = it->oid;
            je->version = it->version;
            je->offset = dirty_db[*it].offset;
            je->len = dirty_db[*it].len;
            je->location = dirty_db[*it].location;
            // CRCs are chained: each entry's checksum feeds the journal's crc32_last
            je->crc32 = je_crc32((journal_entry*)je);
            journal.crc32_last = je->crc32;
            it++;
            if (cur_sector != journal.cur_sector)
            {
                // Write previous sector. We should write the sector only after filling it,
                // because otherwise we'll write a lot more sectors in the "no_same_sector_overwrite" mode
                if (cur_sector != -1)
                    prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
                else
                    PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
                cur_sector = journal.cur_sector;
            }
        }
        // Flush the final (possibly partially filled) sector
        if (cur_sector != -1)
            prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
        PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
        PRIV(op)->pending_ops = s;
        PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT;
        return 1;
    }
    if (PRIV(op)->op_state == SYNC_JOURNAL_WRITE_DONE)
    {
        // 3rd step: fsync the journal device (unless disabled)
        if (!disable_journal_fsync)
        {
            BS_SUBMIT_GET_SQE(sqe, data);
            my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
            data->iov = { 0 };
            data->callback = cb;
            PRIV(op)->pending_ops = 1;
            PRIV(op)->op_state = SYNC_JOURNAL_SYNC_SENT;
            return 1;
        }
        else
        {
            PRIV(op)->op_state = SYNC_DONE;
        }
    }
    if (PRIV(op)->op_state == SYNC_DONE)
    {
        // Everything flushed — try to acknowledge (may still wait on older syncs)
        return ack_sync(op);
    }
    return 1;
}
  192. void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op)
  193. {
  194. live = true;
  195. if (data->res != data->iov.iov_len)
  196. {
  197. throw std::runtime_error(
  198. "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
  199. "). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
  200. );
  201. }
  202. PRIV(op)->pending_ops--;
  203. if (PRIV(op)->pending_ops == 0)
  204. {
  205. // Release used journal sectors
  206. release_journal_sectors(op);
  207. // Handle states
  208. if (PRIV(op)->op_state == SYNC_DATA_SYNC_SENT)
  209. {
  210. PRIV(op)->op_state = SYNC_DATA_SYNC_DONE;
  211. }
  212. else if (PRIV(op)->op_state == SYNC_JOURNAL_WRITE_SENT)
  213. {
  214. PRIV(op)->op_state = SYNC_JOURNAL_WRITE_DONE;
  215. }
  216. else if (PRIV(op)->op_state == SYNC_JOURNAL_SYNC_SENT)
  217. {
  218. PRIV(op)->op_state = SYNC_DONE;
  219. ack_sync(op);
  220. }
  221. else
  222. {
  223. throw std::runtime_error("BUG: unexpected sync op state");
  224. }
  225. }
  226. }
  227. int blockstore_impl_t::ack_sync(blockstore_op_t *op)
  228. {
  229. if (PRIV(op)->op_state == SYNC_DONE && PRIV(op)->prev_sync_count == 0)
  230. {
  231. // Remove dependency of subsequent syncs
  232. auto it = PRIV(op)->in_progress_ptr;
  233. int done_syncs = 1;
  234. ++it;
  235. // Acknowledge sync
  236. ack_one_sync(op);
  237. while (it != in_progress_syncs.end())
  238. {
  239. auto & next_sync = *it++;
  240. PRIV(next_sync)->prev_sync_count -= done_syncs;
  241. if (PRIV(next_sync)->prev_sync_count == 0 && PRIV(next_sync)->op_state == SYNC_DONE)
  242. {
  243. done_syncs++;
  244. // Acknowledge next_sync
  245. ack_one_sync(next_sync);
  246. }
  247. }
  248. return 2;
  249. }
  250. return 0;
  251. }
  252. void blockstore_impl_t::ack_one_sync(blockstore_op_t *op)
  253. {
  254. // Handle states
  255. for (auto it = PRIV(op)->sync_big_writes.begin(); it != PRIV(op)->sync_big_writes.end(); it++)
  256. {
  257. #ifdef BLOCKSTORE_DEBUG
  258. printf("Ack sync big %lx:%lx v%lu\n", it->oid.inode, it->oid.stripe, it->version);
  259. #endif
  260. auto & unstab = unstable_writes[it->oid];
  261. unstab = unstab < it->version ? it->version : unstab;
  262. auto dirty_it = dirty_db.find(*it);
  263. dirty_it->second.state = ((dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SYNCED);
  264. if (dirty_it->second.state & BS_ST_INSTANT)
  265. {
  266. mark_stable(dirty_it->first);
  267. }
  268. dirty_it++;
  269. while (dirty_it != dirty_db.end() && dirty_it->first.oid == it->oid)
  270. {
  271. if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG)
  272. {
  273. dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_IN_FLIGHT;
  274. }
  275. dirty_it++;
  276. }
  277. }
  278. for (auto it = PRIV(op)->sync_small_writes.begin(); it != PRIV(op)->sync_small_writes.end(); it++)
  279. {
  280. #ifdef BLOCKSTORE_DEBUG
  281. printf("Ack sync small %lx:%lx v%lu\n", it->oid.inode, it->oid.stripe, it->version);
  282. #endif
  283. auto & unstab = unstable_writes[it->oid];
  284. unstab = unstab < it->version ? it->version : unstab;
  285. if (dirty_db[*it].state == (BS_ST_DELETE | BS_ST_WRITTEN))
  286. {
  287. dirty_db[*it].state = (BS_ST_DELETE | BS_ST_SYNCED);
  288. // Deletions are treated as immediately stable
  289. mark_stable(*it);
  290. }
  291. else /* (BS_ST_INSTANT?) | BS_ST_SMALL_WRITE | BS_ST_WRITTEN */
  292. {
  293. dirty_db[*it].state = (dirty_db[*it].state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SYNCED;
  294. if (dirty_db[*it].state & BS_ST_INSTANT)
  295. {
  296. mark_stable(*it);
  297. }
  298. }
  299. }
  300. in_progress_syncs.erase(PRIV(op)->in_progress_ptr);
  301. op->retval = 0;
  302. FINISH_OP(op);
  303. }