Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

564 lines
22 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.1 (see README.md for details)
  3. #include "osd_primary.h"
  4. #include "allocator.h"
// Entry point for a "chained" (snapshot/layered) primary read: the requested
// object may be composed of data from several inodes in a snapshot chain, so
// the reply is assembled from per-layer allocation bitmaps and per-layer reads.
// Continuation-style state machine: op_data->st records the resume point and
// the function is re-entered by subop completion callbacks.
void osd_t::continue_chained_read(osd_op_t *cur_op)
{
    osd_primary_op_data_t *op_data = cur_op->op_data;
    auto & pg = pgs.at({ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num });
    // Jump to the step this operation was suspended at
    if (op_data->st == 1)
        goto resume_1;
    else if (op_data->st == 2)
        goto resume_2;
    else if (op_data->st == 3)
        goto resume_3;
    else if (op_data->st == 4)
        goto resume_4;
    cur_op->reply.rw.bitmap_len = 0;
    for (int role = 0; role < op_data->pg_data_size; role++)
    {
        op_data->stripes[role].read_start = op_data->stripes[role].req_start;
        op_data->stripes[role].read_end = op_data->stripes[role].req_end;
    }
resume_1:
resume_2:
    // Steps 1-2: read allocation bitmaps of all layers in the chain.
    // Non-zero return means either "suspended, waiting for bitmap subops" (1)
    // or "failed, operation already finished" (-1) - stop in both cases.
    if (read_bitmaps(cur_op, pg, 1) != 0)
        return;
    // Prepare & submit the actual data reads (at most one per chain layer);
    // -1 means failure and the operation is already finished
    if (submit_chained_read_requests(pg, cur_op) != 0)
        return;
    if (op_data->n_subops > 0)
    {
        // Wait for reads
        op_data->st = 3;
resume_3:
        return;
    }
resume_4:
    // Step 4: all sub-reads have completed (or none were needed)
    if (op_data->errors > 0)
    {
        // At least one sub-read failed - drop chained read state and fail the op.
        // EPIPE (peer disconnect) is reported separately so the client may retry.
        free(op_data->chain_reads);
        op_data->chain_reads = NULL;
        finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
        return;
    }
    send_chained_read_results(pg, cur_op);
    finish_op(cur_op, cur_op->req.rw.len);
}
// Read allocation bitmaps for every inode in the snapshot chain of cur_op.
// base_state / base_state+1 are the op_data->st values used for the two
// suspension points, so this helper can be embedded at different positions
// of a parent state machine.
// Returns 0 when all bitmaps are available (reconstructed if necessary),
// 1 when suspended waiting for remote subops, and -1 on failure (the
// operation is already finished with -EIO in that case).
int osd_t::read_bitmaps(osd_op_t *cur_op, pg_t & pg, int base_state)
{
    osd_primary_op_data_t *op_data = cur_op->op_data;
    if (op_data->st == base_state)
        goto resume_0;
    else if (op_data->st == base_state+1)
        goto resume_1;
    if (pg.state == PG_ACTIVE && pg.scheme == POOL_SCHEME_REPLICATED)
    {
        // Happy path for clean replicated PGs (all bitmaps are available locally)
        for (int chain_num = 0; chain_num < op_data->chain_size; chain_num++)
        {
            object_id cur_oid = { .inode = op_data->read_chain[chain_num], .stripe = op_data->oid.stripe };
            auto vo_it = pg.ver_override.find(cur_oid);
            // UINT64_MAX = no version override, read the latest version
            auto read_version = (vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX);
            // Read bitmap synchronously from the local database
            bs->read_bitmap(
                cur_oid, read_version, op_data->snapshot_bitmaps + chain_num*clean_entry_bitmap_size,
                // Report the object version only for the first (head) chain entry
                !chain_num ? &cur_op->reply.rw.version : NULL
            );
        }
    }
    else
    {
        // Some bitmaps live on other OSDs (or parts must be reconstructed) -
        // submit batched bitmap subops
        if (submit_bitmap_subops(cur_op, pg) < 0)
        {
            // Failure
            finish_op(cur_op, -EIO);
            return -1;
        }
resume_0:
        if (op_data->n_subops > 0)
        {
            // Wait for subops
            op_data->st = base_state;
            return 1;
        }
resume_1:
        if (pg.scheme != POOL_SCHEME_REPLICATED)
        {
            for (int chain_num = 0; chain_num < op_data->chain_size; chain_num++)
            {
                // Check if we need to reconstruct any bitmaps
                for (int i = 0; i < pg.pg_size; i++)
                {
                    if (op_data->missing_flags[chain_num*pg.pg_size + i])
                    {
                        // At least one part of this layer's bitmap was unavailable -
                        // reconstruct it from the parts that were read.
                        // NOTE(review): pg.pg_size is not a compile-time constant -
                        // this relies on a compiler VLA extension; verify it builds
                        // with the project's supported compilers.
                        osd_rmw_stripe_t local_stripes[pg.pg_size] = { 0 };
                        // Deliberately reuses the outer loop variable <i>; the outer
                        // loop is exited via the break below regardless.
                        for (i = 0; i < pg.pg_size; i++)
                        {
                            local_stripes[i].missing = op_data->missing_flags[chain_num*pg.pg_size + i] && true;
                            local_stripes[i].bmp_buf = op_data->snapshot_bitmaps + (chain_num*pg.pg_size + i)*clean_entry_bitmap_size;
                            // Mark a 1-byte "range" so the reconstruction routines
                            // treat every stripe as participating
                            local_stripes[i].read_start = local_stripes[i].read_end = 1;
                        }
                        if (pg.scheme == POOL_SCHEME_XOR)
                        {
                            reconstruct_stripes_xor(local_stripes, pg.pg_size, clean_entry_bitmap_size);
                        }
                        else if (pg.scheme == POOL_SCHEME_JERASURE)
                        {
                            reconstruct_stripes_jerasure(local_stripes, pg.pg_size, pg.pg_data_size, clean_entry_bitmap_size);
                        }
                        break;
                    }
                }
            }
        }
    }
    return 0;
}
  119. int osd_t::collect_bitmap_requests(osd_op_t *cur_op, pg_t & pg, std::vector<bitmap_request_t> & bitmap_requests)
  120. {
  121. osd_primary_op_data_t *op_data = cur_op->op_data;
  122. for (int chain_num = 0; chain_num < op_data->chain_size; chain_num++)
  123. {
  124. object_id cur_oid = { .inode = op_data->read_chain[chain_num], .stripe = op_data->oid.stripe };
  125. auto vo_it = pg.ver_override.find(cur_oid);
  126. uint64_t target_version = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX;
  127. pg_osd_set_state_t *object_state;
  128. uint64_t* cur_set = get_object_osd_set(pg, cur_oid, pg.cur_set.data(), &object_state);
  129. if (pg.scheme == POOL_SCHEME_REPLICATED)
  130. {
  131. osd_num_t read_target = 0;
  132. for (int i = 0; i < pg.pg_size; i++)
  133. {
  134. if (cur_set[i] == this->osd_num || cur_set[i] != 0 && read_target == 0)
  135. {
  136. // Select local or any other available OSD for reading
  137. read_target = cur_set[i];
  138. }
  139. }
  140. assert(read_target != 0);
  141. bitmap_requests.push_back((bitmap_request_t){
  142. .osd_num = read_target,
  143. .oid = cur_oid,
  144. .version = target_version,
  145. .bmp_buf = op_data->snapshot_bitmaps + chain_num*clean_entry_bitmap_size,
  146. });
  147. }
  148. else
  149. {
  150. osd_rmw_stripe_t local_stripes[pg.pg_size];
  151. memcpy(local_stripes, op_data->stripes, sizeof(osd_rmw_stripe_t) * pg.pg_size);
  152. if (extend_missing_stripes(local_stripes, cur_set, pg.pg_data_size, pg.pg_size) < 0)
  153. {
  154. free(op_data->snapshot_bitmaps);
  155. return -1;
  156. }
  157. int need_at_least = 0;
  158. for (int i = 0; i < pg.pg_size; i++)
  159. {
  160. if (local_stripes[i].read_end != 0 && cur_set[i] == 0)
  161. {
  162. // We need this part of the bitmap, but it's unavailable
  163. need_at_least = pg.pg_data_size;
  164. op_data->missing_flags[chain_num*pg.pg_size + i] = 1;
  165. }
  166. else
  167. {
  168. op_data->missing_flags[chain_num*pg.pg_size + i] = 0;
  169. }
  170. }
  171. int found = 0;
  172. for (int i = 0; i < pg.pg_size; i++)
  173. {
  174. if (cur_set[i] != 0 && (local_stripes[i].read_end != 0 || found < need_at_least))
  175. {
  176. // Read part of the bitmap
  177. bitmap_requests.push_back((bitmap_request_t){
  178. .osd_num = cur_set[i],
  179. .oid = {
  180. .inode = cur_oid.inode,
  181. .stripe = cur_oid.stripe | i,
  182. },
  183. .version = target_version,
  184. .bmp_buf = op_data->snapshot_bitmaps + (chain_num*pg.pg_size + i)*clean_entry_bitmap_size,
  185. });
  186. found++;
  187. }
  188. }
  189. // Already checked by extend_missing_stripes, so it's fine to use assert
  190. assert(found >= need_at_least);
  191. }
  192. }
  193. std::sort(bitmap_requests.begin(), bitmap_requests.end());
  194. return 0;
  195. }
  196. int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg)
  197. {
  198. osd_primary_op_data_t *op_data = cur_op->op_data;
  199. std::vector<bitmap_request_t> *bitmap_requests = new std::vector<bitmap_request_t>();
  200. if (collect_bitmap_requests(cur_op, pg, *bitmap_requests) < 0)
  201. {
  202. return -1;
  203. }
  204. op_data->n_subops = 0;
  205. for (int i = 0; i < bitmap_requests->size(); i++)
  206. {
  207. if ((i == bitmap_requests->size()-1 || (*bitmap_requests)[i+1].osd_num != (*bitmap_requests)[i].osd_num) &&
  208. (*bitmap_requests)[i].osd_num != this->osd_num)
  209. {
  210. op_data->n_subops++;
  211. }
  212. }
  213. if (op_data->n_subops)
  214. {
  215. op_data->fact_ver = 0;
  216. op_data->done = op_data->errors = 0;
  217. op_data->subops = new osd_op_t[op_data->n_subops];
  218. }
  219. for (int i = 0, subop_idx = 0, prev = 0; i < bitmap_requests->size(); i++)
  220. {
  221. if (i == bitmap_requests->size()-1 || (*bitmap_requests)[i+1].osd_num != (*bitmap_requests)[i].osd_num)
  222. {
  223. osd_num_t subop_osd_num = (*bitmap_requests)[i].osd_num;
  224. if (subop_osd_num == this->osd_num)
  225. {
  226. // Read bitmap synchronously from the local database
  227. for (int j = prev; j <= i; j++)
  228. {
  229. bs->read_bitmap(
  230. (*bitmap_requests)[j].oid, (*bitmap_requests)[j].version, (*bitmap_requests)[j].bmp_buf,
  231. (*bitmap_requests)[j].oid.inode == cur_op->req.rw.inode ? &cur_op->reply.rw.version : NULL
  232. );
  233. }
  234. }
  235. else
  236. {
  237. // Send to a remote OSD
  238. osd_op_t *subop = op_data->subops+subop_idx;
  239. subop->op_type = OSD_OP_OUT;
  240. subop->peer_fd = msgr.osd_peer_fds.at(subop_osd_num);
  241. // FIXME: Use the pre-allocated buffer
  242. subop->buf = malloc_or_die(sizeof(obj_ver_id)*(i+1-prev));
  243. subop->req = (osd_any_op_t){
  244. .sec_read_bmp = {
  245. .header = {
  246. .magic = SECONDARY_OSD_OP_MAGIC,
  247. .id = msgr.next_subop_id++,
  248. .opcode = OSD_OP_SEC_READ_BMP,
  249. },
  250. .len = sizeof(obj_ver_id)*(i+1-prev),
  251. }
  252. };
  253. obj_ver_id *ov = (obj_ver_id*)subop->buf;
  254. for (int j = prev; j <= i; j++, ov++)
  255. {
  256. ov->oid = (*bitmap_requests)[j].oid;
  257. ov->version = (*bitmap_requests)[j].version;
  258. }
  259. subop->callback = [cur_op, bitmap_requests, prev, i, this](osd_op_t *subop)
  260. {
  261. int requested_count = subop->req.sec_read_bmp.len / sizeof(obj_ver_id);
  262. if (subop->reply.hdr.retval == requested_count * (8 + clean_entry_bitmap_size))
  263. {
  264. void *cur_buf = subop->buf + 8;
  265. for (int j = prev; j <= i; j++)
  266. {
  267. memcpy((*bitmap_requests)[j].bmp_buf, cur_buf, clean_entry_bitmap_size);
  268. if ((*bitmap_requests)[j].oid.inode == cur_op->req.rw.inode)
  269. {
  270. memcpy(&cur_op->reply.rw.version, cur_buf-8, 8);
  271. }
  272. cur_buf += 8 + clean_entry_bitmap_size;
  273. }
  274. }
  275. if ((cur_op->op_data->errors + cur_op->op_data->done + 1) >= cur_op->op_data->n_subops)
  276. {
  277. delete bitmap_requests;
  278. }
  279. handle_primary_subop(subop, cur_op);
  280. };
  281. msgr.outbox_push(subop);
  282. subop_idx++;
  283. }
  284. prev = i+1;
  285. }
  286. }
  287. if (!op_data->n_subops)
  288. {
  289. delete bitmap_requests;
  290. }
  291. return 0;
  292. }
// Decide which byte range (if any) must be read from each layer of the
// snapshot chain. Earlier read_chain entries take priority: a granule already
// covered by a previously processed layer is never taken from a later one.
// Coverage is accumulated in the "global" bitmap (stripes[0].bmp_buf), which
// doubles as the allocation bitmap later returned to the client.
// Returns one osd_chain_read_t per layer that has anything to read.
std::vector<osd_chain_read_t> osd_t::collect_chained_read_requests(osd_op_t *cur_op)
{
    osd_primary_op_data_t *op_data = cur_op->op_data;
    std::vector<osd_chain_read_t> chain_reads;
    int stripe_count = (op_data->scheme == POOL_SCHEME_REPLICATED ? 1 : op_data->pg_size);
    memset(op_data->stripes[0].bmp_buf, 0, stripe_count * clean_entry_bitmap_size);
    uint8_t *global_bitmap = (uint8_t*)op_data->stripes[0].bmp_buf;
    // We always use at most 1 read request per layer
    for (int chain_pos = 0; chain_pos < op_data->chain_size; chain_pos++)
    {
        uint8_t *part_bitmap = ((uint8_t*)op_data->snapshot_bitmaps) + chain_pos*stripe_count*clean_entry_bitmap_size;
        // Requested range expressed in bitmap granules, relative to object start
        int start = (cur_op->req.rw.offset - op_data->oid.stripe)/bs_bitmap_granularity;
        int end = start + cur_op->req.rw.len/bs_bitmap_granularity;
        // Skip unneeded part in the beginning
        // (granules already covered globally, or not allocated in this layer)
        while (start < end && (
            ((global_bitmap[start>>3] >> (start&7)) & 1) ||
            !((part_bitmap[start>>3] >> (start&7)) & 1)))
        {
            start++;
        }
        // Skip unneeded part in the end
        while (start < end && (
            ((global_bitmap[(end-1)>>3] >> ((end-1)&7)) & 1) ||
            !((part_bitmap[(end-1)>>3] >> ((end-1)&7)) & 1)))
        {
            end--;
        }
        if (start < end)
        {
            // Copy (OR) bits in between
            // (the single read range [start, end) may still include granules
            // covered by earlier layers - those are read but not sent)
            int cur = start;
            // Head: bit by bit until <cur> is byte-aligned
            for (; cur < end && (cur & 0x7); cur++)
            {
                global_bitmap[cur>>3] = global_bitmap[cur>>3] | (part_bitmap[cur>>3] & (1 << (cur&7)));
            }
            // Middle: whole bytes at a time
            for (; cur <= end-8; cur += 8)
            {
                global_bitmap[cur>>3] = global_bitmap[cur>>3] | part_bitmap[cur>>3];
            }
            // Tail: remaining bits one by one
            for (; cur < end; cur++)
            {
                global_bitmap[cur>>3] = global_bitmap[cur>>3] | (part_bitmap[cur>>3] & (1 << (cur&7)));
            }
            // Add request
            chain_reads.push_back((osd_chain_read_t){
                .chain_pos = chain_pos,
                .inode = op_data->read_chain[chain_pos],
                .offset = start*bs_bitmap_granularity,
                .len = (end-start)*bs_bitmap_granularity,
            });
        }
    }
    return chain_reads;
}
// Prepare and submit the actual data reads for a chained read.
// Converts the per-layer byte ranges produced by collect_chained_read_requests()
// into stripe descriptors and per-OSD read subops, carving one contiguous
// aligned buffer (cur_op->buf) into per-stripe read buffers.
// Returns 0 on success (op_data->n_subops reads in flight, possibly 0),
// -1 on failure (the operation is already finished with -EIO in that case).
int osd_t::submit_chained_read_requests(pg_t & pg, osd_op_t *cur_op)
{
    // Decide which parts of which objects we need to read based on bitmaps
    osd_primary_op_data_t *op_data = cur_op->op_data;
    auto chain_reads = collect_chained_read_requests(cur_op);
    int stripe_count = (pg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_size);
    op_data->chain_read_count = chain_reads.size();
    // Single allocation: the chain_read array immediately followed by
    // chain_size*stripe_count stripe descriptors; freed as one chunk later
    op_data->chain_reads = (osd_chain_read_t*)calloc_or_die(
        1, sizeof(osd_chain_read_t) * chain_reads.size()
        + sizeof(osd_rmw_stripe_t) * stripe_count * op_data->chain_size
    );
    osd_rmw_stripe_t *chain_stripes = (osd_rmw_stripe_t*)(
        ((void*)op_data->chain_reads) + sizeof(osd_chain_read_t) * op_data->chain_read_count
    );
    // Now process each subrequest as a separate read, including reconstruction if needed
    // Prepare reads
    int n_subops = 0;
    uint64_t read_buffer_size = 0;
    for (int cri = 0; cri < chain_reads.size(); cri++)
    {
        op_data->chain_reads[cri] = chain_reads[cri];
        object_id cur_oid = { .inode = chain_reads[cri].inode, .stripe = op_data->oid.stripe };
        // FIXME: maybe introduce split_read_stripes to shorten these lines and to remove read_start=req_start
        osd_rmw_stripe_t *stripes = chain_stripes + chain_reads[cri].chain_pos*stripe_count;
        split_stripes(pg.pg_data_size, bs_block_size, chain_reads[cri].offset, chain_reads[cri].len, stripes);
        // Nothing to read from this layer for replicated pools
        if (op_data->scheme == POOL_SCHEME_REPLICATED && !stripes[0].req_end)
        {
            continue;
        }
        for (int role = 0; role < op_data->pg_data_size; role++)
        {
            stripes[role].read_start = stripes[role].req_start;
            stripes[role].read_end = stripes[role].req_end;
        }
        uint64_t *cur_set = pg.cur_set.data();
        if (pg.state != PG_ACTIVE && op_data->scheme != POOL_SCHEME_REPLICATED)
        {
            // Non-clean PG: the object may live on a different OSD set and may
            // need extra stripes read for reconstruction
            pg_osd_set_state_t *object_state;
            cur_set = get_object_osd_set(pg, cur_oid, pg.cur_set.data(), &object_state);
            if (extend_missing_stripes(stripes, cur_set, pg.pg_data_size, pg.pg_size) < 0)
            {
                // Not enough parts available to reconstruct the object
                free(op_data->chain_reads);
                op_data->chain_reads = NULL;
                finish_op(cur_op, -EIO);
                return -1;
            }
            op_data->degraded = 1;
        }
        // First pass only counts subops and sums up the reply buffer size
        if (op_data->scheme == POOL_SCHEME_REPLICATED)
        {
            n_subops++;
            read_buffer_size += stripes[0].read_end - stripes[0].read_start;
        }
        else
        {
            for (int role = 0; role < pg.pg_size; role++)
            {
                if (stripes[role].read_end > 0 && cur_set[role] != 0)
                    n_subops++;
                if (stripes[role].read_end > 0)
                    read_buffer_size += stripes[role].read_end - stripes[role].read_start;
            }
        }
    }
    // One aligned buffer serves all reads; second pass carves it into
    // per-stripe read buffers and wires up the per-stripe bitmap pointers
    cur_op->buf = memalign_or_die(MEM_ALIGNMENT, read_buffer_size);
    void *cur_buf = cur_op->buf;
    for (int cri = 0; cri < chain_reads.size(); cri++)
    {
        osd_rmw_stripe_t *stripes = chain_stripes + chain_reads[cri].chain_pos*stripe_count;
        for (int role = 0; role < stripe_count; role++)
        {
            if (stripes[role].read_end > 0)
            {
                stripes[role].read_buf = cur_buf;
                stripes[role].bmp_buf = op_data->snapshot_bitmaps + (chain_reads[cri].chain_pos*stripe_count + role)*clean_entry_bitmap_size;
                cur_buf += stripes[role].read_end - stripes[role].read_start;
            }
        }
    }
    // Submit all reads
    op_data->fact_ver = UINT64_MAX;
    op_data->done = op_data->errors = 0;
    op_data->n_subops = n_subops;
    if (!n_subops)
    {
        return 0;
    }
    op_data->subops = new osd_op_t[n_subops];
    int cur_subops = 0;
    for (int cri = 0; cri < chain_reads.size(); cri++)
    {
        osd_rmw_stripe_t *stripes = chain_stripes + chain_reads[cri].chain_pos*stripe_count;
        if (op_data->scheme == POOL_SCHEME_REPLICATED && !stripes[0].req_end)
        {
            continue;
        }
        object_id cur_oid = { .inode = chain_reads[cri].inode, .stripe = op_data->oid.stripe };
        auto vo_it = pg.ver_override.find(cur_oid);
        // UINT64_MAX = no version override, read the latest version
        uint64_t target_ver = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX;
        uint64_t *cur_set = pg.cur_set.data();
        if (pg.state != PG_ACTIVE && op_data->scheme != POOL_SCHEME_REPLICATED)
        {
            pg_osd_set_state_t *object_state;
            cur_set = get_object_osd_set(pg, cur_oid, pg.cur_set.data(), &object_state);
        }
        // For replicated pools pick the replica role to read from:
        // defaults to role 0, overridden by the local OSD's role if present
        int zero_read = -1;
        if (op_data->scheme == POOL_SCHEME_REPLICATED)
        {
            for (int role = 0; role < op_data->pg_size; role++)
                if (cur_set[role] == this->osd_num || zero_read == -1)
                    zero_read = role;
        }
        cur_subops += submit_primary_subop_batch(SUBMIT_READ, chain_reads[cri].inode, target_ver, stripes, cur_set, cur_op, cur_subops, zero_read);
    }
    // Every counted subop must have been submitted
    assert(cur_subops == n_subops);
    return 0;
}
  464. void osd_t::send_chained_read_results(pg_t & pg, osd_op_t *cur_op)
  465. {
  466. osd_primary_op_data_t *op_data = cur_op->op_data;
  467. int stripe_count = (pg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_size);
  468. osd_rmw_stripe_t *chain_stripes = (osd_rmw_stripe_t*)(
  469. ((void*)op_data->chain_reads) + sizeof(osd_chain_read_t) * op_data->chain_read_count
  470. );
  471. // Reconstruct parts if needed
  472. if (op_data->degraded)
  473. {
  474. int stripe_count = (pg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_size);
  475. for (int cri = 0; cri < op_data->chain_read_count; cri++)
  476. {
  477. // Reconstruct missing stripes
  478. osd_rmw_stripe_t *stripes = chain_stripes + op_data->chain_reads[cri].chain_pos*stripe_count;
  479. if (op_data->scheme == POOL_SCHEME_XOR)
  480. {
  481. reconstruct_stripes_xor(stripes, pg.pg_size, clean_entry_bitmap_size);
  482. }
  483. else if (op_data->scheme == POOL_SCHEME_JERASURE)
  484. {
  485. reconstruct_stripes_jerasure(stripes, pg.pg_size, pg.pg_data_size, clean_entry_bitmap_size);
  486. }
  487. }
  488. }
  489. // Send bitmap
  490. cur_op->reply.rw.bitmap_len = op_data->pg_data_size * clean_entry_bitmap_size;
  491. cur_op->iov.push_back(op_data->stripes[0].bmp_buf, cur_op->reply.rw.bitmap_len);
  492. // And finally compose the result
  493. uint64_t sent = 0;
  494. int prev_pos = 0, pos = 0;
  495. bool prev_set = false;
  496. int prev = (cur_op->req.rw.offset - op_data->oid.stripe) / bs_bitmap_granularity;
  497. int end = prev + cur_op->req.rw.len/bs_bitmap_granularity;
  498. int cur = prev;
  499. while (cur <= end)
  500. {
  501. bool has_bit = false;
  502. if (cur < end)
  503. {
  504. for (pos = 0; pos < op_data->chain_size; pos++)
  505. {
  506. has_bit = (((uint8_t*)op_data->snapshot_bitmaps)[pos*stripe_count*clean_entry_bitmap_size + cur/8] >> (cur%8)) & 1;
  507. if (has_bit)
  508. break;
  509. }
  510. }
  511. if (has_bit != prev_set || pos != prev_pos || cur == end)
  512. {
  513. if (cur > prev)
  514. {
  515. // Send buffer in parts to avoid copying
  516. if (!prev_set)
  517. {
  518. while ((cur-prev) > zero_buffer_size/bs_bitmap_granularity)
  519. {
  520. cur_op->iov.push_back(zero_buffer, zero_buffer_size);
  521. sent += zero_buffer_size;
  522. prev += zero_buffer_size/bs_bitmap_granularity;
  523. }
  524. cur_op->iov.push_back(zero_buffer, (cur-prev)*bs_bitmap_granularity);
  525. sent += (cur-prev)*bs_bitmap_granularity;
  526. }
  527. else
  528. {
  529. osd_rmw_stripe_t *stripes = chain_stripes + prev_pos*stripe_count;
  530. while (cur > prev)
  531. {
  532. int role = prev*bs_bitmap_granularity/bs_block_size;
  533. int role_start = prev*bs_bitmap_granularity - role*bs_block_size;
  534. int role_end = cur*bs_bitmap_granularity - role*bs_block_size;
  535. if (role_end > bs_block_size)
  536. role_end = bs_block_size;
  537. assert(stripes[role].read_buf);
  538. cur_op->iov.push_back(
  539. stripes[role].read_buf + (role_start - stripes[role].read_start),
  540. role_end - role_start
  541. );
  542. sent += role_end - role_start;
  543. prev += (role_end - role_start)/bs_bitmap_granularity;
  544. }
  545. }
  546. }
  547. prev = cur;
  548. prev_pos = pos;
  549. prev_set = has_bit;
  550. }
  551. cur++;
  552. }
  553. assert(sent == cur_op->req.rw.len);
  554. free(op_data->chain_reads);
  555. op_data->chain_reads = NULL;
  556. }