Simplified distributed block storage with strong consistency, like in Ceph

// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)

#include "blockstore_impl.h"

#define SYNC_HAS_SMALL 1
#define SYNC_HAS_BIG 2
#define SYNC_DATA_SYNC_SENT 3
#define SYNC_DATA_SYNC_DONE 4
#define SYNC_JOURNAL_WRITE_SENT 5
#define SYNC_JOURNAL_WRITE_DONE 6
#define SYNC_JOURNAL_SYNC_SENT 7
#define SYNC_DONE 8
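
// Sync is a state machine driven by continue_sync(). A sync that only covers
// small (journaled) writes goes SYNC_HAS_SMALL -> SYNC_JOURNAL_WRITE_SENT ->
// SYNC_JOURNAL_WRITE_DONE -> SYNC_JOURNAL_SYNC_SENT -> SYNC_DONE. A sync that
// covers big (block data) writes first syncs the data device and journals the
// new data locations: SYNC_HAS_BIG -> SYNC_DATA_SYNC_SENT ->
// SYNC_DATA_SYNC_DONE -> SYNC_JOURNAL_WRITE_SENT and onwards as above.
// continue_sync() returns 0 when it has to wait (for example, for journal
// space), 1 while the sync is still in progress, and 2 when it's finished.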
int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_progress_sync)
{
    if (immediate_commit == IMMEDIATE_ALL)
    {
        // We can return immediately because sync is only dequeued after all previous writes complete
        op->retval = 0;
        FINISH_OP(op);
        return 2;
    }
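    // First call for this operation: take ownership of all currently
    // unsynced writes so that this sync commits them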
    if (PRIV(op)->op_state == 0)
    {
        stop_sync_submitted = false;
        unsynced_big_write_count -= unsynced_big_writes.size();
        PRIV(op)->sync_big_writes.swap(unsynced_big_writes);
        PRIV(op)->sync_small_writes.swap(unsynced_small_writes);
        PRIV(op)->sync_small_checked = 0;
        PRIV(op)->sync_big_checked = 0;
        unsynced_big_writes.clear();
        unsynced_small_writes.clear();
        if (PRIV(op)->sync_big_writes.size() > 0)
            PRIV(op)->op_state = SYNC_HAS_BIG;
        else if (PRIV(op)->sync_small_writes.size() > 0)
            PRIV(op)->op_state = SYNC_HAS_SMALL;
        else
            PRIV(op)->op_state = SYNC_DONE;
    }
    if (PRIV(op)->op_state == SYNC_HAS_SMALL)
    {
        // No big writes, just fsync the journal
        if (journal.sector_info[journal.cur_sector].dirty)
        {
            // Write out the last journal sector if it happens to be dirty
            BS_SUBMIT_GET_ONLY_SQE(sqe);
            prepare_journal_sector_write(journal, journal.cur_sector, sqe, [this, op](ring_data_t *data) { handle_sync_event(data, op); });
            PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
            PRIV(op)->pending_ops = 1;
            PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT;
            return 1;
        }
        else
        {
            PRIV(op)->op_state = SYNC_JOURNAL_WRITE_DONE;
        }
    }
    if (PRIV(op)->op_state == SYNC_HAS_BIG)
    {
        // 1st step: fsync data
        if (!disable_data_fsync)
        {
            BS_SUBMIT_GET_SQE(sqe, data);
            my_uring_prep_fsync(sqe, data_fd, IORING_FSYNC_DATASYNC);
            data->iov = { 0 };
            data->callback = [this, op](ring_data_t *data) { handle_sync_event(data, op); };
            PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
            PRIV(op)->pending_ops = 1;
            PRIV(op)->op_state = SYNC_DATA_SYNC_SENT;
            return 1;
        }
        else
        {
            PRIV(op)->op_state = SYNC_DATA_SYNC_DONE;
        }
    }
    if (PRIV(op)->op_state == SYNC_DATA_SYNC_DONE)
    {
        // 2nd step: the data device is synced, prepare & write journal entries
        // Check for space in the journal and in the journal memory buffers
        blockstore_journal_check_t space_check(this);
        if (!space_check.check_available(op, PRIV(op)->sync_big_writes.size(),
            sizeof(journal_entry_big_write) + clean_entry_bitmap_size, JOURNAL_STABILIZE_RESERVATION))
        {
            return 0;
        }
        // Get SQEs. Don't bother about merging, submit each journal sector as a separate request
        struct io_uring_sqe *sqe[space_check.sectors_to_write];
        for (int i = 0; i < space_check.sectors_to_write; i++)
        {
            BS_SUBMIT_GET_SQE_DECL(sqe[i]);
        }
        // Prepare and submit journal entries
        auto it = PRIV(op)->sync_big_writes.begin();
        int s = 0, cur_sector = -1;
        while (it != PRIV(op)->sync_big_writes.end())
        {
            if (!journal.entry_fits(sizeof(journal_entry_big_write) + clean_entry_bitmap_size) &&
                journal.sector_info[journal.cur_sector].dirty)
            {
                // The next entry doesn't fit into the current sector, write the sector out
                if (cur_sector == -1)
                    PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
                prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], [this, op](ring_data_t *data) { handle_sync_event(data, op); });
                cur_sector = journal.cur_sector;
            }
            auto & dirty_entry = dirty_db.at(*it);
            journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry(
                journal, (dirty_entry.state & BS_ST_INSTANT) ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE,
                sizeof(journal_entry_big_write) + clean_entry_bitmap_size
            );
            dirty_entry.journal_sector = journal.sector_info[journal.cur_sector].offset;
            journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
#ifdef BLOCKSTORE_DEBUG
            printf(
                "journal offset %08lx is used by %lx:%lx v%lu (%lu refs)\n",
                dirty_entry.journal_sector, it->oid.inode, it->oid.stripe, it->version,
                journal.used_sectors[journal.sector_info[journal.cur_sector].offset]
            );
#endif
            je->oid = it->oid;
            je->version = it->version;
            je->offset = dirty_entry.offset;
            je->len = dirty_entry.len;
            je->location = dirty_entry.location;
            memcpy((void*)(je+1), (clean_entry_bitmap_size > sizeof(void*)
                ? dirty_entry.bitmap : &dirty_entry.bitmap), clean_entry_bitmap_size);
            je->crc32 = je_crc32((journal_entry*)je);
            journal.crc32_last = je->crc32;
            it++;
        }
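        // Write out the last journal sector touched by this sync even if it isn't full yet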
        prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], [this, op](ring_data_t *data) { handle_sync_event(data, op); });
        assert(s == space_check.sectors_to_write);
        if (cur_sector == -1)
            PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
        PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
        PRIV(op)->pending_ops = s;
        PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT;
        return 1;
    }
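    // Journal entries are written out, fsync the journal device (unless disabled)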
    if (PRIV(op)->op_state == SYNC_JOURNAL_WRITE_DONE)
    {
        if (!disable_journal_fsync)
        {
            BS_SUBMIT_GET_SQE(sqe, data);
            my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
            data->iov = { 0 };
            data->callback = [this, op](ring_data_t *data) { handle_sync_event(data, op); };
            PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
            PRIV(op)->pending_ops = 1;
            PRIV(op)->op_state = SYNC_JOURNAL_SYNC_SENT;
            return 1;
        }
        else
        {
            PRIV(op)->op_state = SYNC_DONE;
        }
    }
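    // Complete the operation, unless an earlier sync in the queue is still in progress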
    if (PRIV(op)->op_state == SYNC_DONE && !queue_has_in_progress_sync)
    {
        ack_sync(op);
        return 2;
    }
    return 1;
}
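
// Completion callback for all io_uring requests submitted by continue_sync().
// It checks that the whole request was written, and when the last pending
// request of the current step completes, it advances the sync state machine
// and wakes up the ring loop so that continue_sync() is called again.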
void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op)
{
    live = true;
    if (data->res != data->iov.iov_len)
    {
        throw std::runtime_error(
            "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
            "). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
        );
    }
    PRIV(op)->pending_ops--;
    if (PRIV(op)->pending_ops == 0)
    {
        // Release used journal sectors
        release_journal_sectors(op);
        // Handle states
        if (PRIV(op)->op_state == SYNC_DATA_SYNC_SENT)
        {
            PRIV(op)->op_state = SYNC_DATA_SYNC_DONE;
        }
        else if (PRIV(op)->op_state == SYNC_JOURNAL_WRITE_SENT)
        {
            PRIV(op)->op_state = SYNC_JOURNAL_WRITE_DONE;
        }
        else if (PRIV(op)->op_state == SYNC_JOURNAL_SYNC_SENT)
        {
            PRIV(op)->op_state = SYNC_DONE;
        }
        else
        {
            throw std::runtime_error("BUG: unexpected sync op state");
        }
        ringloop->wakeup();
    }
}
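
// Finalize a completed sync: mark all writes covered by it as synced, record
// their newest versions as unstable (awaiting stabilization), and unblock
// writes that were waiting for their preceding big writes to be synced.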
void blockstore_impl_t::ack_sync(blockstore_op_t *op)
{
    // Mark big writes as synced and unblock writes that were waiting for them
    for (auto it = PRIV(op)->sync_big_writes.begin(); it != PRIV(op)->sync_big_writes.end(); it++)
    {
#ifdef BLOCKSTORE_DEBUG
        printf("Ack sync big %lx:%lx v%lu\n", it->oid.inode, it->oid.stripe, it->version);
#endif
        auto & unstab = unstable_writes[it->oid];
        unstab = unstab < it->version ? it->version : unstab;
        auto dirty_it = dirty_db.find(*it);
        dirty_it->second.state = ((dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SYNCED);
        if (dirty_it->second.state & BS_ST_INSTANT)
        {
            mark_stable(dirty_it->first);
        }
        dirty_it++;
        while (dirty_it != dirty_db.end() && dirty_it->first.oid == it->oid)
        {
            if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG)
            {
                dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_IN_FLIGHT;
            }
            dirty_it++;
        }
    }
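    // Same for small (journaled) writes; synced deletions are stabilized immediately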
    for (auto it = PRIV(op)->sync_small_writes.begin(); it != PRIV(op)->sync_small_writes.end(); it++)
    {
#ifdef BLOCKSTORE_DEBUG
        printf("Ack sync small %lx:%lx v%lu\n", it->oid.inode, it->oid.stripe, it->version);
#endif
        auto & unstab = unstable_writes[it->oid];
        unstab = unstab < it->version ? it->version : unstab;
        if (dirty_db[*it].state == (BS_ST_DELETE | BS_ST_WRITTEN))
        {
            dirty_db[*it].state = (BS_ST_DELETE | BS_ST_SYNCED);
            // Deletions are treated as immediately stable
            mark_stable(*it);
        }
        else /* (BS_ST_INSTANT?) | BS_ST_SMALL_WRITE | BS_ST_WRITTEN */
        {
            dirty_db[*it].state = (dirty_db[*it].state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SYNCED;
            if (dirty_db[*it].state & BS_ST_INSTANT)
            {
                mark_stable(*it);
            }
        }
    }
    op->retval = 0;
    FINISH_OP(op);
}