Simplified distributed block storage with strong consistency, like in Ceph
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

297 lines
9.3 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 (see README.md for details)
  3. #include "blockstore_impl.h"
// Start (or resume) a rollback operation: discard all dirty versions of each
// object in op->buf (an array of op->len obj_ver_id's) that are newer than the
// listed version. Returns 0 if the op can't make progress yet (caller must
// retry), 1 if it was finished or its first journal writes were submitted.
int blockstore_impl_t::dequeue_rollback(blockstore_op_t *op)
{
    if (PRIV(op)->op_state)
    {
        // Op is already in progress - continue its state machine
        return continue_rollback(op);
    }
    obj_ver_id *v, *nv;
    int i, todo = op->len;
    // Compact the version list in place (v = read cursor, nv = write cursor),
    // dropping entries that turn out to be already rolled back
    for (i = 0, v = (obj_ver_id*)op->buf, nv = (obj_ver_id*)op->buf; i < op->len; i++, v++, nv++)
    {
        if (nv != v)
        {
            *nv = *v;
        }
        // Check that there are some versions greater than v->version (which may be zero),
        // check that they're unstable, synced, and not currently written to
        auto dirty_it = dirty_db.lower_bound((obj_ver_id){
            .oid = v->oid,
            .version = UINT64_MAX,
        });
        if (dirty_it == dirty_db.begin())
        {
        skip_ov:
            // Already rolled back, skip this object version
            todo--;
            nv--;
            continue;
        }
        else
        {
            dirty_it--;
            if (dirty_it->first.oid != v->oid || dirty_it->first.version < v->version)
            {
                // No dirty entries of this object newer than v->version exist
                goto skip_ov;
            }
            // Scan this object's dirty entries newer than v->version, from newest down
            while (dirty_it->first.oid == v->oid && dirty_it->first.version > v->version)
            {
                if (IS_IN_FLIGHT(dirty_it->second.state))
                {
                    // Object write is still in progress. Wait until the write request completes
                    return 0;
                }
                else if (!IS_SYNCED(dirty_it->second.state) ||
                    IS_STABLE(dirty_it->second.state))
                {
                    // Unsynced or already-stable versions can't be rolled back
                    op->retval = -EBUSY;
                    FINISH_OP(op);
                    return 1;
                }
                if (dirty_it == dirty_db.begin())
                {
                    break;
                }
                dirty_it--;
            }
        }
    }
    op->len = todo;
    if (!todo)
    {
        // Already rolled back
        op->retval = 0;
        FINISH_OP(op);
        return 1;
    }
    // Check journal space
    blockstore_journal_check_t space_check(this);
    if (!space_check.check_available(op, todo, sizeof(journal_entry_rollback), 0))
    {
        return 0;
    }
    // There is sufficient space. Get SQEs
    struct io_uring_sqe *sqe[space_check.sectors_required];
    for (i = 0; i < space_check.sectors_required; i++)
    {
        BS_SUBMIT_GET_SQE_DECL(sqe[i]);
    }
    // Prepare and submit journal entries
    auto cb = [this, op](ring_data_t *data) { handle_rollback_event(data, op); };
    int s = 0, cur_sector = -1;
    // If the current journal sector is dirty and has no room for one more
    // rollback entry, submit its write first - presumably the next
    // prefill_single_journal_entry() will then start a new sector
    if ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_rollback) &&
        journal.sector_info[journal.cur_sector].dirty)
    {
        if (cur_sector == -1)
            PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
        cur_sector = journal.cur_sector;
        prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
    }
    // Fill one JE_ROLLBACK journal entry per (oid, version) pair
    for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
    {
        journal_entry_rollback *je = (journal_entry_rollback*)
            prefill_single_journal_entry(journal, JE_ROLLBACK, sizeof(journal_entry_rollback));
        journal.sector_info[journal.cur_sector].dirty = false;
        je->oid = v->oid;
        je->version = v->version;
        je->crc32 = je_crc32((journal_entry*)je);
        journal.crc32_last = je->crc32;
        if (cur_sector != journal.cur_sector)
        {
            // Write previous sector. We should write the sector only after filling it,
            // because otherwise we'll write a lot more sectors in the "no_same_sector_overwrite" mode
            if (cur_sector != -1)
                prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
            else
                PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
            cur_sector = journal.cur_sector;
        }
    }
    // Submit the write of the last (possibly partially filled) sector
    if (cur_sector != -1)
        prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
    PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
    PRIV(op)->pending_ops = s;
    PRIV(op)->op_state = 1;
    return 1;
}
// Resume a rollback op after its journal writes / fsync complete.
// op_state meanings (advanced by handle_rollback_event on completion):
//   2 = journal entry writes done -> release sectors, then fsync
//   3 = sectors released -> submit journal fsync (unless disabled)
//   4 = fsync in flight (waiting for handle_rollback_event)
//   5 = fsync done (or skipped) -> apply rollback to in-memory state
// Returns 0 when it can't proceed (no free SQE) and must be retried.
int blockstore_impl_t::continue_rollback(blockstore_op_t *op)
{
    if (PRIV(op)->op_state == 2)
        goto resume_2;
    else if (PRIV(op)->op_state == 3)
        goto resume_3;
    else if (PRIV(op)->op_state == 5)
        goto resume_5;
    else
        return 1;
resume_2:
    // Release used journal sectors
    release_journal_sectors(op);
resume_3:
    if (!disable_journal_fsync)
    {
        io_uring_sqe *sqe = get_sqe();
        if (!sqe)
        {
            // No free SQE right now - retry later
            return 0;
        }
        // Submit an fdatasync of the journal; handle_rollback_event()
        // will bump op_state to 5 when it completes
        ring_data_t *data = ((ring_data_t*)sqe->user_data);
        my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
        data->iov = { 0 };
        data->callback = [this, op](ring_data_t *data) { handle_rollback_event(data, op); };
        PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
        PRIV(op)->pending_ops = 1;
        PRIV(op)->op_state = 4;
        return 1;
    }
resume_5:
    obj_ver_id* v;
    int i;
    // Journal is durable - now apply the rollback to the in-memory state
    for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
    {
        mark_rolled_back(*v);
    }
    // Rolled-back journal entries may now be trimmed
    flusher->mark_trim_possible();
    // Acknowledge op
    op->retval = 0;
    FINISH_OP(op);
    return 1;
}
// Apply the rollback of a single object to the in-memory state: erase all
// dirty entries of ov.oid with version > ov.version (stopping at stable or
// in-flight entries) and adjust the unstable_writes record accordingly.
void blockstore_impl_t::mark_rolled_back(const obj_ver_id & ov)
{
    // Position just past the newest possible version of this object
    auto it = dirty_db.lower_bound((obj_ver_id){
        .oid = ov.oid,
        .version = UINT64_MAX,
    });
    if (it != dirty_db.begin())
    {
        uint64_t max_unstable = 0;
        // [rm_start, rm_end) will become the range of entries to erase
        auto rm_start = it;
        auto rm_end = it;
        it--;
        // Walk this object's entries backwards, from newest to oldest
        while (1)
        {
            if (it->first.oid != ov.oid)
                break;
            else if (it->first.version <= ov.version)
            {
                // First surviving entry; if it's unstable, remember its
                // version as the object's new max unstable version
                if (!IS_STABLE(it->second.state))
                    max_unstable = it->first.version;
                break;
            }
            else if (IS_IN_FLIGHT(it->second.state) || IS_STABLE(it->second.state))
                break;
            // Remove entry
            rm_start = it;
            if (it == dirty_db.begin())
                break;
            it--;
        }
        if (rm_start != rm_end)
        {
            // clean_loc = UINT64_MAX so erase_dirty() frees the data blocks
            // of all removed big writes
            erase_dirty(rm_start, rm_end, UINT64_MAX);
            // Lower (or drop) the recorded unstable version of this object
            auto unstab_it = unstable_writes.find(ov.oid);
            if (unstab_it != unstable_writes.end())
            {
                if (max_unstable == 0)
                    unstable_writes.erase(unstab_it);
                else
                    unstab_it->second = max_unstable;
            }
        }
    }
}
  206. void blockstore_impl_t::handle_rollback_event(ring_data_t *data, blockstore_op_t *op)
  207. {
  208. live = true;
  209. if (data->res != data->iov.iov_len)
  210. {
  211. throw std::runtime_error(
  212. "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
  213. "). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
  214. );
  215. }
  216. PRIV(op)->pending_ops--;
  217. if (PRIV(op)->pending_ops == 0)
  218. {
  219. PRIV(op)->op_state++;
  220. if (!continue_rollback(op))
  221. {
  222. submit_queue.push_front(op);
  223. }
  224. }
  225. }
// Erase the dirty entry range [dirty_start, dirty_end) from dirty_db and
// release the resources those entries hold: data blocks of big writes
// (except the one located at clean_loc), journal sector references, and
// heap-allocated bitmaps. If the last erased entry is a delete, also
// unblocks newer writes that were waiting for that delete to be flushed.
void blockstore_impl_t::erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc)
{
    if (dirty_end == dirty_start)
    {
        return;
    }
    // Step onto the last entry of the range
    auto dirty_it = dirty_end;
    dirty_it--;
    if (IS_DELETE(dirty_it->second.state))
    {
        object_id oid = dirty_it->first.oid;
#ifdef BLOCKSTORE_DEBUG
        printf("Unblock writes-after-delete %lx:%lx v%lx\n", oid.inode, oid.stripe, dirty_it->first.version);
#endif
        dirty_it = dirty_end;
        // Unblock operations blocked by delete flushing
        // Scan newer entries of the same object: the first waiting write is
        // moved to BS_ST_IN_FLIGHT; after an unblocked big write, subsequent
        // waiters are set to BS_ST_WAIT_BIG instead
        uint32_t next_state = BS_ST_IN_FLIGHT;
        while (dirty_it != dirty_db.end() && dirty_it->first.oid == oid)
        {
            if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_DEL)
            {
                dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | next_state;
                if (IS_BIG_WRITE(dirty_it->second.state))
                {
                    next_state = BS_ST_WAIT_BIG;
                }
            }
            dirty_it++;
        }
        // Return to the last entry of the erased range
        dirty_it = dirty_end;
        dirty_it--;
    }
    // Walk the range backwards, releasing each entry's resources
    while (1)
    {
        if (IS_BIG_WRITE(dirty_it->second.state) && dirty_it->second.location != clean_loc)
        {
#ifdef BLOCKSTORE_DEBUG
            printf("Free block %lu\n", dirty_it->second.location >> block_order);
#endif
            // Free the data block unless it's the preserved clean location
            data_alloc->set(dirty_it->second.location >> block_order, false);
        }
        // Drop this entry's reference to its journal sector; forget the
        // sector entirely when the last reference goes away
        int used = --journal.used_sectors[dirty_it->second.journal_sector];
#ifdef BLOCKSTORE_DEBUG
        printf(
            "remove usage of journal offset %08lx by %lx:%lx v%lu (%d refs)\n", dirty_it->second.journal_sector,
            dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version, used
        );
#endif
        if (used == 0)
        {
            journal.used_sectors.erase(dirty_it->second.journal_sector);
        }
        if (entry_attr_size > sizeof(void*))
        {
            // Attributes too large to live inline - bitmap was heap-allocated
            free(dirty_it->second.bitmap);
            dirty_it->second.bitmap = NULL;
        }
        if (dirty_it == dirty_start)
        {
            break;
        }
        dirty_it--;
    }
    dirty_db.erase(dirty_start, dirty_end);
}
  290. }