Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

388 lines
14 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.1 (see README.md for details)
  3. #include "osd_primary.h"
  4. #include "allocator.h"
  5. // read: read directly or read paired stripe(s), reconstruct, return
  6. // write: read paired stripe(s), reconstruct, modify, calculate parity, write
  7. //
  8. // nuance: take care to read the same version from paired stripes!
  9. // to do so, we remember "last readable" version until a write request completes
  10. // and we postpone other write requests to the same stripe until completion of previous ones
  11. //
  12. // sync: sync peers, get unstable versions, stabilize them
  13. bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
  14. {
  15. // PG number is calculated from the offset
  16. // Our EC scheme stores data in fixed chunks equal to (K*block size)
  17. // K = (pg_size-parity_chunks) in case of EC/XOR, or 1 for replicated pools
  18. pool_id_t pool_id = INODE_POOL(cur_op->req.rw.inode);
  19. // Note: We read pool config here, so we must NOT change it when PGs are active
  20. auto pool_cfg_it = st_cli.pool_config.find(pool_id);
  21. if (pool_cfg_it == st_cli.pool_config.end())
  22. {
  23. // Pool config is not loaded yet
  24. finish_op(cur_op, -EPIPE);
  25. return false;
  26. }
  27. auto & pool_cfg = pool_cfg_it->second;
  28. // FIXME: op_data->pg_data_size can probably be removed (there's pg.pg_data_size)
  29. uint64_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks);
  30. uint64_t pg_block_size = bs_block_size * pg_data_size;
  31. object_id oid = {
  32. .inode = cur_op->req.rw.inode,
  33. // oid.stripe = starting offset of the parity stripe
  34. .stripe = (cur_op->req.rw.offset/pg_block_size)*pg_block_size,
  35. };
  36. pg_num_t pg_num = (oid.stripe/pool_cfg.pg_stripe_size) % pg_counts[pool_id] + 1; // like map_to_pg()
  37. auto pg_it = pgs.find({ .pool_id = pool_id, .pg_num = pg_num });
  38. if (pg_it == pgs.end() || !(pg_it->second.state & PG_ACTIVE))
  39. {
  40. // This OSD is not primary for this PG or the PG is inactive
  41. // FIXME: Allow reads from PGs degraded under pg_minsize, but don't allow writes
  42. finish_op(cur_op, -EPIPE);
  43. return false;
  44. }
  45. if ((cur_op->req.rw.offset + cur_op->req.rw.len) > (oid.stripe + pg_block_size) ||
  46. (cur_op->req.rw.offset % bs_bitmap_granularity) != 0 ||
  47. (cur_op->req.rw.len % bs_bitmap_granularity) != 0)
  48. {
  49. finish_op(cur_op, -EINVAL);
  50. return false;
  51. }
  52. int stripe_count = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg_it->second.pg_size);
  53. int chain_size = 0;
  54. if (cur_op->req.hdr.opcode == OSD_OP_READ && cur_op->req.rw.meta_revision > 0)
  55. {
  56. // Chained read
  57. auto inode_it = st_cli.inode_config.find(cur_op->req.rw.inode);
  58. if (inode_it->second.mod_revision != cur_op->req.rw.meta_revision)
  59. {
  60. // Client view of the metadata differs from OSD's view
  61. // Operation can't be completed correctly, client should retry later
  62. finish_op(cur_op, -EPIPE);
  63. return false;
  64. }
  65. // Find parents from the same pool. Optimized reads only work within pools
  66. while (inode_it != st_cli.inode_config.end() && inode_it->second.parent_id &&
  67. INODE_POOL(inode_it->second.parent_id) == pg_it->second.pool_id)
  68. {
  69. chain_size++;
  70. inode_it = st_cli.inode_config.find(inode_it->second.parent_id);
  71. }
  72. if (chain_size)
  73. {
  74. // Add the original inode
  75. chain_size++;
  76. }
  77. }
  78. osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)calloc_or_die(
  79. // Allocate:
  80. // - op_data
  81. 1, sizeof(osd_primary_op_data_t) +
  82. // - stripes
  83. // - resulting bitmap buffers
  84. stripe_count * (clean_entry_bitmap_size + sizeof(osd_rmw_stripe_t)) +
  85. chain_size * (
  86. // - copy of the chain
  87. sizeof(inode_t) +
  88. // - bitmap buffers for chained read
  89. stripe_count * clean_entry_bitmap_size +
  90. // - 'missing' flags for chained reads
  91. (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 0 : pg_it->second.pg_size)
  92. )
  93. );
  94. void *data_buf = ((void*)op_data) + sizeof(osd_primary_op_data_t);
  95. op_data->pg_num = pg_num;
  96. op_data->oid = oid;
  97. op_data->stripes = (osd_rmw_stripe_t*)data_buf;
  98. data_buf += sizeof(osd_rmw_stripe_t) * stripe_count;
  99. op_data->scheme = pool_cfg.scheme;
  100. op_data->pg_data_size = pg_data_size;
  101. op_data->pg_size = pg_it->second.pg_size;
  102. cur_op->op_data = op_data;
  103. split_stripes(pg_data_size, bs_block_size, (uint32_t)(cur_op->req.rw.offset - oid.stripe), cur_op->req.rw.len, op_data->stripes);
  104. // Allocate bitmaps along with stripes to avoid extra allocations and fragmentation
  105. for (int i = 0; i < stripe_count; i++)
  106. {
  107. op_data->stripes[i].bmp_buf = data_buf;
  108. data_buf += clean_entry_bitmap_size;
  109. }
  110. op_data->chain_size = chain_size;
  111. if (chain_size > 0)
  112. {
  113. op_data->read_chain = (inode_t*)data_buf;
  114. data_buf += sizeof(inode_t) * chain_size;
  115. op_data->snapshot_bitmaps = data_buf;
  116. data_buf += chain_size * stripe_count * clean_entry_bitmap_size;
  117. op_data->missing_flags = (uint8_t*)data_buf;
  118. data_buf += chain_size * (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 0 : pg_it->second.pg_size);
  119. // Copy chain
  120. int chain_num = 0;
  121. op_data->read_chain[chain_num++] = cur_op->req.rw.inode;
  122. auto inode_it = st_cli.inode_config.find(cur_op->req.rw.inode);
  123. while (inode_it != st_cli.inode_config.end() && inode_it->second.parent_id)
  124. {
  125. op_data->read_chain[chain_num++] = inode_it->second.parent_id;
  126. inode_it = st_cli.inode_config.find(inode_it->second.parent_id);
  127. }
  128. }
  129. pg_it->second.inflight++;
  130. return true;
  131. }
  132. uint64_t* osd_t::get_object_osd_set(pg_t &pg, object_id &oid, uint64_t *def, pg_osd_set_state_t **object_state)
  133. {
  134. if (!(pg.state & (PG_HAS_INCOMPLETE | PG_HAS_DEGRADED | PG_HAS_MISPLACED)))
  135. {
  136. *object_state = NULL;
  137. return def;
  138. }
  139. auto st_it = pg.incomplete_objects.find(oid);
  140. if (st_it != pg.incomplete_objects.end())
  141. {
  142. *object_state = st_it->second;
  143. return st_it->second->read_target.data();
  144. }
  145. st_it = pg.degraded_objects.find(oid);
  146. if (st_it != pg.degraded_objects.end())
  147. {
  148. *object_state = st_it->second;
  149. return st_it->second->read_target.data();
  150. }
  151. st_it = pg.misplaced_objects.find(oid);
  152. if (st_it != pg.misplaced_objects.end())
  153. {
  154. *object_state = st_it->second;
  155. return st_it->second->read_target.data();
  156. }
  157. *object_state = NULL;
  158. return def;
  159. }
// Resumable state machine for a primary (client-facing) read.
// op_data->st records the resume point between invocations:
//   0 -> submit sub-reads, 1 -> waiting for subops, 2 -> assemble the reply.
// (st is presumably advanced to 2 by the subop completion path — it is not
// set to 2 anywhere in this function; confirm against submit_primary_subops.)
void osd_t::continue_primary_read(osd_op_t *cur_op)
{
    // First entry: allocate and validate op_data; bail out if the request is invalid
    if (!cur_op->op_data && !prepare_primary_rw(cur_op))
    {
        return;
    }
    osd_primary_op_data_t *op_data = cur_op->op_data;
    if (op_data->chain_size)
    {
        // Snapshot-chained reads are handled by a separate state machine
        continue_chained_read(cur_op);
        return;
    }
    if (op_data->st == 1)
        goto resume_1;
    else if (op_data->st == 2)
        goto resume_2;
    cur_op->reply.rw.bitmap_len = 0;
    {
        auto & pg = pgs.at({ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num });
        // Read exactly the requested range of each data stripe
        for (int role = 0; role < op_data->pg_data_size; role++)
        {
            op_data->stripes[role].read_start = op_data->stripes[role].req_start;
            op_data->stripes[role].read_end = op_data->stripes[role].req_end;
        }
        // Determine version: a pending write leaves a "version override" so
        // parallel reads see a consistent version; otherwise read the latest
        auto vo_it = pg.ver_override.find(op_data->oid);
        op_data->target_ver = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX;
        if (pg.state == PG_ACTIVE || op_data->scheme == POOL_SCHEME_REPLICATED)
        {
            // Fast happy-path
            cur_op->buf = alloc_read_buffer(op_data->stripes, op_data->pg_data_size, 0);
            submit_primary_subops(SUBMIT_READ, op_data->target_ver, pg.cur_set.data(), cur_op);
            op_data->st = 1;
        }
        else
        {
            // PG may be degraded or have misplaced objects:
            // use the per-object OSD set and plan extra reads for reconstruction
            uint64_t* cur_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
            if (extend_missing_stripes(op_data->stripes, cur_set, op_data->pg_data_size, pg.pg_size) < 0)
            {
                // Not enough live chunks to reconstruct the requested range
                finish_op(cur_op, -EIO);
                return;
            }
            // Submit reads
            op_data->pg_size = pg.pg_size;
            op_data->scheme = pg.scheme;
            op_data->degraded = 1;
            cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_size, 0);
            submit_primary_subops(SUBMIT_READ, op_data->target_ver, cur_set, cur_op);
            op_data->st = 1;
        }
    }
resume_1:
    // Subops are in flight; this function will be re-entered later
    return;
resume_2:
    if (op_data->errors > 0)
    {
        // epipe counts lost-peer failures (retriable by the client); other errors are EIO
        finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
        return;
    }
    cur_op->reply.rw.bitmap_len = op_data->pg_data_size * clean_entry_bitmap_size;
    if (op_data->degraded)
    {
        // Reconstruct missing stripes
        osd_rmw_stripe_t *stripes = op_data->stripes;
        if (op_data->scheme == POOL_SCHEME_XOR)
        {
            reconstruct_stripes_xor(stripes, op_data->pg_size, clean_entry_bitmap_size);
        }
        else if (op_data->scheme == POOL_SCHEME_JERASURE)
        {
            reconstruct_stripes_jerasure(stripes, op_data->pg_size, op_data->pg_data_size, clean_entry_bitmap_size);
        }
        // Reply layout: bitmaps first, then the requested sub-ranges of each stripe
        cur_op->iov.push_back(op_data->stripes[0].bmp_buf, cur_op->reply.rw.bitmap_len);
        for (int role = 0; role < op_data->pg_size; role++)
        {
            if (stripes[role].req_end != 0)
            {
                // Send buffer in parts to avoid copying
                cur_op->iov.push_back(
                    stripes[role].read_buf + (stripes[role].req_start - stripes[role].read_start),
                    stripes[role].req_end - stripes[role].req_start
                );
            }
        }
    }
    else
    {
        // Non-degraded path: bitmaps followed by the whole contiguous read buffer
        cur_op->iov.push_back(op_data->stripes[0].bmp_buf, cur_op->reply.rw.bitmap_len);
        cur_op->iov.push_back(cur_op->buf, cur_op->req.rw.len);
    }
    finish_op(cur_op, cur_op->req.rw.len);
}
  253. // Decrement pg_osd_set_state_t's object_count and change PG state accordingly
  254. void osd_t::remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t & pg)
  255. {
  256. if (object_state->state & OBJ_INCOMPLETE)
  257. {
  258. // Successful write means that object is not incomplete anymore
  259. this->incomplete_objects--;
  260. pg.incomplete_objects.erase(oid);
  261. if (!pg.incomplete_objects.size())
  262. {
  263. pg.state = pg.state & ~PG_HAS_INCOMPLETE;
  264. report_pg_state(pg);
  265. }
  266. }
  267. else if (object_state->state & OBJ_DEGRADED)
  268. {
  269. this->degraded_objects--;
  270. pg.degraded_objects.erase(oid);
  271. if (!pg.degraded_objects.size())
  272. {
  273. pg.state = pg.state & ~PG_HAS_DEGRADED;
  274. report_pg_state(pg);
  275. }
  276. }
  277. else if (object_state->state & OBJ_MISPLACED)
  278. {
  279. this->misplaced_objects--;
  280. pg.misplaced_objects.erase(oid);
  281. if (!pg.misplaced_objects.size())
  282. {
  283. pg.state = pg.state & ~PG_HAS_MISPLACED;
  284. report_pg_state(pg);
  285. }
  286. }
  287. else
  288. {
  289. throw std::runtime_error("BUG: Invalid object state: "+std::to_string(object_state->state));
  290. }
  291. }
  292. void osd_t::free_object_state(pg_t & pg, pg_osd_set_state_t **object_state)
  293. {
  294. if (*object_state && !(--(*object_state)->object_count))
  295. {
  296. pg.state_dict.erase((*object_state)->osd_set);
  297. *object_state = NULL;
  298. }
  299. }
// Resumable state machine for a primary delete.
// op_data->st resume points:
//   0 -> validate and enqueue, 1/2 -> version-read submitted/awaited,
//   3 -> version known, submit deletes, 4/5 -> deletes submitted/awaited.
// Note the fall-through pattern: resume_2/resume_4 set st and return, so a
// fresh pass falls from the submit step straight into "record state & wait".
void osd_t::continue_primary_del(osd_op_t *cur_op)
{
    // First entry: allocate op_data (deletes share the read/write preparation)
    if (!cur_op->op_data && !prepare_primary_rw(cur_op))
    {
        return;
    }
    osd_primary_op_data_t *op_data = cur_op->op_data;
    auto & pg = pgs.at({ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num });
    if (op_data->st == 1) goto resume_1;
    else if (op_data->st == 2) goto resume_2;
    else if (op_data->st == 3) goto resume_3;
    else if (op_data->st == 4) goto resume_4;
    else if (op_data->st == 5) goto resume_5;
    assert(op_data->st == 0);
    // Delete is forbidden even in active PGs if they're also degraded or have previous dead OSDs
    if (pg.state & (PG_DEGRADED | PG_LEFT_ON_DEAD))
    {
        finish_op(cur_op, -EBUSY);
        return;
    }
    // Serialize with other writes to the same object; if queued, we will be
    // re-entered at resume_1 when it's our turn
    if (!check_write_queue(cur_op, pg))
    {
        return;
    }
resume_1:
    // Determine which OSDs contain this object and delete it
    op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
    // Submit 1 read to determine the actual version number
    submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, op_data->prev_set, cur_op);
resume_2:
    // Wait for the version read; completion re-enters at resume_3
    op_data->st = 2;
    return;
resume_3:
    if (op_data->errors > 0)
    {
        // Abort: fail this op and everything queued behind it for the object
        pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
        return;
    }
    // Save version override for parallel reads
    pg.ver_override[op_data->oid] = op_data->fact_ver;
    // Submit deletes
    op_data->fact_ver++;
    submit_primary_del_subops(cur_op, NULL, 0, op_data->object_state ? op_data->object_state->osd_set : pg.cur_loc_set);
resume_4:
    // Wait for the delete subops; completion re-enters at resume_5
    op_data->st = 4;
    return;
resume_5:
    if (op_data->errors > 0)
    {
        pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
        return;
    }
    // Remove version override
    pg.ver_override.erase(op_data->oid);
    // Adjust PG stats after "instant stabilize", because we need object_state above
    if (!op_data->object_state)
    {
        // Object was clean (no per-object state) — just one less clean object
        pg.clean_count--;
    }
    else
    {
        remove_object_from_state(op_data->oid, op_data->object_state, pg);
        free_object_state(pg, &op_data->object_state);
    }
    pg.total_count--;
    // Pop this op from the per-object write queue and pick up the next writer, if any
    osd_op_t *next_op = NULL;
    auto next_it = pg.write_queue.find(op_data->oid);
    if (next_it != pg.write_queue.end() && next_it->second == cur_op)
    {
        pg.write_queue.erase(next_it++);
        if (next_it != pg.write_queue.end() && next_it->first == op_data->oid)
            next_op = next_it->second;
    }
    finish_op(cur_op, cur_op->req.rw.len);
    if (next_op)
    {
        // Continue next write to the same object
        continue_primary_write(next_op);
    }
}