Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

271 lines
9.5 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 (see README.md for details)
  3. #include "blockstore_impl.h"
// Fulfill one fragment of a read request: copy <len> bytes of the object version
// described by (item_state, item_version) into <buf>, either directly from memory
// or by submitting an io_uring read from the journal/data device at <offset>.
// Returns 1 when the fragment is handled (copied, zero-filled, skipped, or a read
// submitted). NOTE(review): BS_SUBMIT_GET_SQE presumably returns 0 from this
// function when no submission queue entry is available — confirm macro definition.
int blockstore_impl_t::fulfill_read_push(blockstore_op_t *op, void *buf, uint64_t offset, uint64_t len,
    uint32_t item_state, uint64_t item_version)
{
    if (!len)
    {
        // Zero-length version - skip
        return 1;
    }
    else if (IS_IN_FLIGHT(item_state))
    {
        // Write not finished yet - skip
        return 1;
    }
    else if (IS_DELETE(item_state))
    {
        // item is unallocated - return zeroes
        memset(buf, 0, len);
        return 1;
    }
    if (journal.inmemory && IS_JOURNAL(item_state))
    {
        // Journal is cached in memory - serve the read with a plain memcpy,
        // no disk I/O needed
        memcpy(buf, journal.buffer + offset, len);
        return 1;
    }
    // Otherwise submit an asynchronous read from the journal or data device.
    // May return 0 (wait) if no SQE is available right now.
    BS_SUBMIT_GET_SQE(sqe, data);
    data->iov = (struct iovec){ buf, len };
    PRIV(op)->pending_ops++;
    my_uring_prep_readv(
        sqe,
        IS_JOURNAL(item_state) ? journal.fd : data_fd,
        &data->iov, 1,
        // <offset> is relative to the item's area; add the device base offset
        (IS_JOURNAL(item_state) ? journal.offset : data_offset) + offset
    );
    // Completion decrements pending_ops and finishes the op when it hits zero
    data->callback = [this, op](ring_data_t *data) { handle_read_event(data, op); };
    return 1;
}
// FIXME I've seen a bug here so I want some tests
// Fulfill the portion of <read_op> that intersects the object version spanning
// [item_start, item_end) (object-relative offsets), whose data lives at
// <item_location>. PRIV(read_op)->read_vec holds the already-fulfilled subranges
// of the request, kept sorted by offset; only the gaps between them are filled
// here (via fulfill_read_push) and then recorded in read_vec. <fulfilled> is
// incremented by the number of newly covered bytes.
// Returns 1 on success, 0 if a push could not be submitted — the caller must
// undo (clear read_vec) and retry the whole op later.
int blockstore_impl_t::fulfill_read(blockstore_op_t *read_op, uint64_t &fulfilled, uint32_t item_start, uint32_t item_end,
    uint32_t item_state, uint64_t item_version, uint64_t item_location)
{
    uint32_t cur_start = item_start;
    if (cur_start < read_op->offset + read_op->len && item_end > read_op->offset)
    {
        // Clamp [cur_start, item_end) to the requested range
        cur_start = cur_start < read_op->offset ? read_op->offset : cur_start;
        item_end = item_end > read_op->offset + read_op->len ? read_op->offset + read_op->len : item_end;
        auto it = PRIV(read_op)->read_vec.begin();
        while (1)
        {
            // Skip already-fulfilled ranges that lie entirely before cur_start,
            // advancing cur_start past any range that overlaps it
            for (; it != PRIV(read_op)->read_vec.end(); it++)
            {
                if (it->offset >= cur_start)
                {
                    break;
                }
                else if (it->offset + it->len > cur_start)
                {
                    cur_start = it->offset + it->len;
                    if (cur_start >= item_end)
                    {
                        // Nothing left of this item inside the request
                        goto endwhile;
                    }
                }
            }
            if (it == PRIV(read_op)->read_vec.end() || it->offset > cur_start)
            {
                // Gap found: fulfill [cur_start, min(item_end, it->offset))
                fulfill_read_t el = {
                    .offset = cur_start,
                    .len = it == PRIV(read_op)->read_vec.end() || it->offset >= item_end ? item_end-cur_start : it->offset-cur_start,
                };
                it = PRIV(read_op)->read_vec.insert(it, el);
                if (!fulfill_read_push(read_op,
                    read_op->buf + el.offset - read_op->offset,
                    item_location + el.offset - item_start,
                    el.len, item_state, item_version))
                {
                    // Submission queue full - caller will undo and retry
                    return 0;
                }
                fulfilled += el.len;
            }
            // <it> now points at either the newly inserted element or the
            // pre-existing range starting at cur_start; continue after it
            cur_start = it->offset + it->len;
            if (it == PRIV(read_op)->read_vec.end() || cur_start >= item_end)
            {
                break;
            }
        }
    }
endwhile:
    return 1;
}
  93. uint8_t* blockstore_impl_t::get_clean_entry_bitmap(uint64_t block_loc, int offset)
  94. {
  95. uint8_t *clean_entry_bitmap;
  96. uint64_t meta_loc = block_loc >> block_order;
  97. if (inmemory_meta)
  98. {
  99. uint64_t sector = (meta_loc / (meta_block_size / clean_entry_size)) * meta_block_size;
  100. uint64_t pos = (meta_loc % (meta_block_size / clean_entry_size));
  101. clean_entry_bitmap = (uint8_t*)(metadata_buffer + sector + pos*clean_entry_size + sizeof(clean_disk_entry) + offset);
  102. }
  103. else
  104. clean_entry_bitmap = (uint8_t*)(clean_bitmap + meta_loc*(clean_entry_bitmap_size + entry_attr_size) + offset);
  105. return clean_entry_bitmap;
  106. }
// Start executing a read operation: assemble the object's data from dirty
// versions (journal / big writes) and the clean (stable) version, newest first,
// submitting asynchronous reads where the data is not in memory.
// Returns 1 if the op was dequeued (either finished immediately or left waiting
// for submitted I/O), 0 if it must stay queued (couldn't get an SQE; all
// partially added requests are undone before returning).
int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
{
    auto clean_it = clean_db.find(read_op->oid);
    // Find the newest dirty version of this object: seek past (oid, UINT64_MAX)
    // and step back one entry
    auto dirty_it = dirty_db.upper_bound((obj_ver_id){
        .oid = read_op->oid,
        .version = UINT64_MAX,
    });
    if (dirty_it != dirty_db.begin())
        dirty_it--;
    bool clean_found = clean_it != clean_db.end();
    bool dirty_found = (dirty_it != dirty_db.end() && dirty_it->first.oid == read_op->oid);
    if (!clean_found && !dirty_found)
    {
        // region is not allocated - return zeroes
        memset(read_op->buf, 0, read_op->len);
        read_op->version = 0;
        read_op->retval = read_op->len;
        FINISH_OP(read_op);
        return 1;
    }
    uint64_t fulfilled = 0;
    PRIV(read_op)->pending_ops = 0;
    uint64_t result_version = 0;
    if (dirty_found)
    {
        // Walk dirty versions of the object from newest to oldest
        while (dirty_it->first.oid == read_op->oid)
        {
            dirty_entry& dirty = dirty_it->second;
            // Only versions <= the requested one are eligible
            // (read_op->version == 0 means "read the latest")
            bool version_ok = read_op->version >= dirty_it->first.version;
            if (IS_SYNCED(dirty.state))
            {
                // Synced versions are always readable; clamp the requested
                // version down to this one if it was newer than requested
                if (!version_ok && read_op->version != 0)
                    read_op->version = dirty_it->first.version;
                version_ok = true;
            }
            if (version_ok)
            {
                if (!result_version)
                {
                    // First (newest) eligible version determines the result
                    // version and the returned bitmap
                    result_version = dirty_it->first.version;
                    if (read_op->bitmap)
                    {
                        // Bitmap is stored inline in the pointer field when it
                        // fits, otherwise behind a pointer
                        void *bmp_ptr = (entry_attr_size > sizeof(void*) ? dirty_it->second.bitmap : &dirty_it->second.bitmap);
                        memcpy(read_op->bitmap, bmp_ptr, entry_attr_size);
                    }
                }
                // Journal entries read from dirty.location directly; big writes
                // are full blocks, so add the intra-block offset
                if (!fulfill_read(read_op, fulfilled, dirty.offset, dirty.offset + dirty.len,
                    dirty.state, dirty_it->first.version, dirty.location + (IS_JOURNAL(dirty.state) ? 0 : dirty.offset)))
                {
                    // need to wait. undo added requests, don't dequeue op
                    PRIV(read_op)->read_vec.clear();
                    return 0;
                }
            }
            if (fulfilled == read_op->len || dirty_it == dirty_db.begin())
            {
                break;
            }
            dirty_it--;
        }
    }
    if (clean_it != clean_db.end())
    {
        if (!result_version)
        {
            // No eligible dirty version - the clean version is the result
            result_version = clean_it->second.version;
            if (read_op->bitmap)
            {
                void *bmp_ptr = get_clean_entry_bitmap(clean_it->second.location, clean_entry_bitmap_size);
                memcpy(read_op->bitmap, bmp_ptr, entry_attr_size);
            }
        }
        if (fulfilled < read_op->len)
        {
            if (!clean_entry_bitmap_size)
            {
                // No allocation bitmap - the whole clean block is valid data
                if (!fulfill_read(read_op, fulfilled, 0, block_size, (BS_ST_BIG_WRITE | BS_ST_STABLE), 0, clean_it->second.location))
                {
                    // need to wait. undo added requests, don't dequeue op
                    PRIV(read_op)->read_vec.clear();
                    return 0;
                }
            }
            else
            {
                // Walk the clean entry's allocation bitmap: zero-fill
                // unallocated granule runs, read allocated ones from disk
                uint8_t *clean_entry_bitmap = get_clean_entry_bitmap(clean_it->second.location, 0);
                uint64_t bmp_start = 0, bmp_end = 0, bmp_size = block_size/bitmap_granularity;
                while (bmp_start < bmp_size)
                {
                    // Extend over a run of zero (unallocated) bits
                    while (!(clean_entry_bitmap[bmp_end >> 3] & (1 << (bmp_end & 0x7))) && bmp_end < bmp_size)
                    {
                        bmp_end++;
                    }
                    if (bmp_end > bmp_start)
                    {
                        // fill with zeroes
                        // (return value ignored: BS_ST_DELETE ranges are
                        // fulfilled by memset and never need an SQE)
                        fulfill_read(read_op, fulfilled, bmp_start * bitmap_granularity,
                            bmp_end * bitmap_granularity, (BS_ST_DELETE | BS_ST_STABLE), 0, 0);
                    }
                    bmp_start = bmp_end;
                    // Extend over a run of set (allocated) bits
                    while (clean_entry_bitmap[bmp_end >> 3] & (1 << (bmp_end & 0x7)) && bmp_end < bmp_size)
                    {
                        bmp_end++;
                    }
                    if (bmp_end > bmp_start)
                    {
                        if (!fulfill_read(read_op, fulfilled, bmp_start * bitmap_granularity,
                            bmp_end * bitmap_granularity, (BS_ST_BIG_WRITE | BS_ST_STABLE), 0,
                            clean_it->second.location + bmp_start * bitmap_granularity))
                        {
                            // need to wait. undo added requests, don't dequeue op
                            PRIV(read_op)->read_vec.clear();
                            return 0;
                        }
                        bmp_start = bmp_end;
                    }
                }
            }
        }
    }
    else if (fulfilled < read_op->len)
    {
        // fill remaining parts with zeroes
        // (zero-fill is in-memory, so the return value cannot be 0 here)
        fulfill_read(read_op, fulfilled, 0, block_size, (BS_ST_DELETE | BS_ST_STABLE), 0, 0);
    }
    assert(fulfilled == read_op->len);
    read_op->version = result_version;
    if (!PRIV(read_op)->pending_ops)
    {
        // everything is fulfilled from memory
        if (!PRIV(read_op)->read_vec.size())
        {
            // region is not allocated - return zeroes
            memset(read_op->buf, 0, read_op->len);
        }
        read_op->retval = read_op->len;
        FINISH_OP(read_op);
        return 1;
    }
    // Reads were submitted - handle_read_event() will finish the op
    read_op->retval = 0;
    return 1;
}
  249. void blockstore_impl_t::handle_read_event(ring_data_t *data, blockstore_op_t *op)
  250. {
  251. live = true;
  252. PRIV(op)->pending_ops--;
  253. if (data->res != data->iov.iov_len)
  254. {
  255. // read error
  256. op->retval = data->res;
  257. }
  258. if (PRIV(op)->pending_ops == 0)
  259. {
  260. if (op->retval == 0)
  261. op->retval = op->len;
  262. FINISH_OP(op);
  263. }
  264. }