Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

755 lines
26 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.1 (see README.md for details)
  3. #include <stdexcept>
  4. #include <string.h>
  5. #include <assert.h>
  6. #include <jerasure/reed_sol.h>
  7. #include <jerasure.h>
  8. #include <map>
  9. #include "allocator.h"
  10. #include "xor.h"
  11. #include "osd_rmw.h"
  12. #include "malloc_or_die.h"
  13. #define OSD_JERASURE_W 8
  14. static inline void extend_read(uint32_t start, uint32_t end, osd_rmw_stripe_t & stripe)
  15. {
  16. if (stripe.read_end == 0)
  17. {
  18. stripe.read_start = start;
  19. stripe.read_end = end;
  20. }
  21. else
  22. {
  23. if (stripe.read_end < end)
  24. stripe.read_end = end;
  25. if (stripe.read_start > start)
  26. stripe.read_start = start;
  27. }
  28. }
  29. static inline void cover_read(uint32_t start, uint32_t end, osd_rmw_stripe_t & stripe)
  30. {
  31. // Subtract <to> write request from <from> request
  32. if (start >= stripe.req_start &&
  33. end <= stripe.req_end)
  34. {
  35. return;
  36. }
  37. if (start <= stripe.req_start &&
  38. end >= stripe.req_start &&
  39. end <= stripe.req_end)
  40. {
  41. end = stripe.req_start;
  42. }
  43. else if (start >= stripe.req_start &&
  44. start <= stripe.req_end &&
  45. end >= stripe.req_end)
  46. {
  47. start = stripe.req_end;
  48. }
  49. if (stripe.read_end == 0)
  50. {
  51. stripe.read_start = start;
  52. stripe.read_end = end;
  53. }
  54. else
  55. {
  56. if (stripe.read_end < end)
  57. stripe.read_end = end;
  58. if (stripe.read_start > start)
  59. stripe.read_start = start;
  60. }
  61. }
  62. void split_stripes(uint64_t pg_minsize, uint32_t bs_block_size, uint32_t start, uint32_t end, osd_rmw_stripe_t *stripes)
  63. {
  64. if (end == 0)
  65. {
  66. // Zero length request - offset doesn't matter
  67. return;
  68. }
  69. end = start+end;
  70. for (int role = 0; role < pg_minsize; role++)
  71. {
  72. if (start < (1+role)*bs_block_size && end > role*bs_block_size)
  73. {
  74. stripes[role].req_start = start < role*bs_block_size ? 0 : start-role*bs_block_size;
  75. stripes[role].req_end = end > (role+1)*bs_block_size ? bs_block_size : end-role*bs_block_size;
  76. }
  77. }
  78. }
// Reconstruct the missing stripe(s) for an XOR (k+1) scheme by XOR-ing
// together the overlapping parts of all other stripes' read buffers.
// The bitmap buffer (<bmp_buf>, <bitmap_size> bytes) is reconstructed the same way.
// <prev> acts as a small state machine: -2 = no source stripe seen yet,
// >= 0 = one pending source stripe, -1 = result already accumulated in place.
void reconstruct_stripes_xor(osd_rmw_stripe_t *stripes, int pg_size, uint32_t bitmap_size)
{
    for (int role = 0; role < pg_size; role++)
    {
        if (stripes[role].read_end != 0 && stripes[role].missing)
        {
            // Reconstruct missing stripe (XOR k+1)
            int prev = -2;
            for (int other = 0; other < pg_size; other++)
            {
                if (other != role)
                {
                    if (prev == -2)
                    {
                        // First source - just remember it
                        prev = other;
                    }
                    else if (prev >= 0)
                    {
                        // Two sources pending - XOR them into the target buffer.
                        // All sources must start at or before the target's read_start.
                        assert(stripes[role].read_start >= stripes[prev].read_start &&
                            stripes[role].read_start >= stripes[other].read_start);
                        memxor(
                            stripes[prev].read_buf + (stripes[role].read_start - stripes[prev].read_start),
                            stripes[other].read_buf + (stripes[role].read_start - stripes[other].read_start),
                            stripes[role].read_buf, stripes[role].read_end - stripes[role].read_start
                        );
                        memxor(stripes[prev].bmp_buf, stripes[other].bmp_buf, stripes[role].bmp_buf, bitmap_size);
                        prev = -1;
                    }
                    else
                    {
                        // Accumulate each further source into the target in place
                        assert(stripes[role].read_start >= stripes[other].read_start);
                        memxor(
                            stripes[role].read_buf,
                            stripes[other].read_buf + (stripes[role].read_start - stripes[other].read_start),
                            stripes[role].read_buf, stripes[role].read_end - stripes[role].read_start
                        );
                        memxor(stripes[role].bmp_buf, stripes[other].bmp_buf, stripes[role].bmp_buf, bitmap_size);
                    }
                }
            }
        }
    }
}
  122. struct reed_sol_erased_t
  123. {
  124. int *data;
  125. int size;
  126. };
  127. inline bool operator < (const reed_sol_erased_t &a, const reed_sol_erased_t &b)
  128. {
  129. for (int i = 0; i < a.size && i < b.size; i++)
  130. {
  131. if (a.data[i] < b.data[i])
  132. return -1;
  133. else if (a.data[i] > b.data[i])
  134. return 1;
  135. }
  136. return 0;
  137. }
// A cached Reed-Solomon (Vandermonde) coding matrix for one
// (pg_size, pg_minsize) combination, with reference counting and a cache of
// decoding matrices keyed by the set of erased chunks.
struct reed_sol_matrix_t
{
    int refs = 0;   // number of users registered via use_jerasure()
    int *data;      // coding matrix (allocated by jerasure, freed with free())
    std::map<reed_sol_erased_t, int*> decodings; // erasure pattern -> cached decoding matrix
};
// Global matrix cache, keyed by pg_size | (pg_minsize << 32)
std::map<uint64_t, reed_sol_matrix_t> matrices;
  145. void use_jerasure(int pg_size, int pg_minsize, bool use)
  146. {
  147. uint64_t key = (uint64_t)pg_size | ((uint64_t)pg_minsize) << 32;
  148. auto rs_it = matrices.find(key);
  149. if (rs_it == matrices.end())
  150. {
  151. if (!use)
  152. {
  153. return;
  154. }
  155. int *matrix = reed_sol_vandermonde_coding_matrix(pg_minsize, pg_size-pg_minsize, OSD_JERASURE_W);
  156. matrices[key] = (reed_sol_matrix_t){
  157. .refs = 0,
  158. .data = matrix,
  159. };
  160. rs_it = matrices.find(key);
  161. }
  162. rs_it->second.refs += (!use ? -1 : 1);
  163. if (rs_it->second.refs <= 0)
  164. {
  165. free(rs_it->second.data);
  166. for (auto dec_it = rs_it->second.decodings.begin(); dec_it != rs_it->second.decodings.end();)
  167. {
  168. int *data = dec_it->second;
  169. rs_it->second.decodings.erase(dec_it++);
  170. free(data);
  171. }
  172. matrices.erase(rs_it);
  173. }
  174. }
  175. reed_sol_matrix_t* get_jerasure_matrix(int pg_size, int pg_minsize)
  176. {
  177. uint64_t key = (uint64_t)pg_size | ((uint64_t)pg_minsize) << 32;
  178. auto rs_it = matrices.find(key);
  179. if (rs_it == matrices.end())
  180. {
  181. throw std::runtime_error("jerasure matrix not initialized");
  182. }
  183. return &rs_it->second;
  184. }
// jerasure_matrix_decode() decodes all chunks at once and tries to reencode all missing coding chunks.
// we don't need it. also it makes an extra allocation of int *erased on every call and doesn't cache
// the decoding matrix.
// all these flaws are fixed in this function:
//
// Returns the (cached or freshly computed) decoding matrix for the current
// erasure pattern, or NULL when no data chunk actually needs decoding.
// The returned allocation is laid out as a single block:
//   [0 .. pg_minsize)                           dm_ids (source row ids)
//   [pg_minsize .. pg_minsize*(1+pg_minsize))   decoding matrix rows
//   [.. .. +pg_size)                            copy of the erasure vector (the map key)
int* get_jerasure_decoding_matrix(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize)
{
    int edd = 0;
    int erased[pg_size] = { 0 };
    // Mark chunks that are not read at all, or are missing, as "erased"
    for (int i = 0; i < pg_size; i++)
        if (stripes[i].read_end == 0 || stripes[i].missing)
            erased[i] = 1;
    // Count data chunks that actually have to be decoded
    for (int i = 0; i < pg_minsize; i++)
        if (stripes[i].read_end != 0 && stripes[i].missing)
            edd++;
    if (edd == 0)
        return NULL;
    reed_sol_matrix_t *matrix = get_jerasure_matrix(pg_size, pg_minsize);
    // Look up the cache with the on-stack erasure vector as a transient key
    auto dec_it = matrix->decodings.find((reed_sol_erased_t){ .data = erased, .size = pg_size });
    if (dec_it == matrix->decodings.end())
    {
        int *dm_ids = (int*)malloc_or_die(sizeof(int)*(pg_minsize + pg_minsize*pg_minsize + pg_size));
        int *decoding_matrix = dm_ids + pg_minsize;
        // NOTE(review): malloc_or_die presumably aborts on failure, which would
        // make this check unreachable - confirm against malloc_or_die.h
        if (!dm_ids)
            throw std::bad_alloc();
        // we always use row_k_ones=1 and w=8 (OSD_JERASURE_W)
        if (jerasure_make_decoding_matrix(pg_minsize, pg_size-pg_minsize, OSD_JERASURE_W, matrix->data, erased, decoding_matrix, dm_ids) < 0)
        {
            free(dm_ids);
            throw std::runtime_error("jerasure_make_decoding_matrix() failed");
        }
        // Persist a private copy of <erased> (the stack array dies with this
        // call) at the tail of the same allocation and use it as the map key
        int *erased_copy = dm_ids + pg_minsize + pg_minsize*pg_minsize;
        memcpy(erased_copy, erased, pg_size*sizeof(int));
        matrix->decodings.emplace((reed_sol_erased_t){ .data = erased_copy, .size = pg_size }, dm_ids);
        return dm_ids;
    }
    return dec_it->second;
}
// Reconstruct missing data stripes (and their bitmaps) using the cached
// jerasure decoding matrix. No-op when nothing needs decoding.
void reconstruct_stripes_jerasure(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize, uint32_t bitmap_size)
{
    int *dm_ids = get_jerasure_decoding_matrix(stripes, pg_size, pg_minsize);
    if (!dm_ids)
    {
        return;
    }
    // The decoding matrix follows dm_ids in the same allocation
    // (see get_jerasure_decoding_matrix for the layout)
    int *decoding_matrix = dm_ids + pg_minsize;
    char *data_ptrs[pg_size] = { 0 };
    for (int role = 0; role < pg_minsize; role++)
    {
        if (stripes[role].read_end != 0 && stripes[role].missing)
        {
            if (stripes[role].read_end > stripes[role].read_start)
            {
                // Collect source pointers, shifted so they all start at the
                // target's read_start (sources must cover the target range)
                for (int other = 0; other < pg_size; other++)
                {
                    if (stripes[other].read_end != 0 && !stripes[other].missing)
                    {
                        assert(stripes[other].read_start <= stripes[role].read_start);
                        assert(stripes[other].read_end >= stripes[role].read_end);
                        data_ptrs[other] = (char*)(stripes[other].read_buf + (stripes[role].read_start - stripes[other].read_start));
                    }
                }
                data_ptrs[role] = (char*)stripes[role].read_buf;
                // Recompute just the single missing row <role>
                jerasure_matrix_dotprod(
                    pg_minsize, OSD_JERASURE_W, decoding_matrix+(role*pg_minsize), dm_ids, role,
                    data_ptrs, data_ptrs+pg_minsize, stripes[role].read_end - stripes[role].read_start
                );
            }
            // Reconstruct the missing stripe's bitmap the same way
            for (int other = 0; other < pg_size; other++)
            {
                if (stripes[other].read_end != 0 && !stripes[other].missing)
                {
                    data_ptrs[other] = (char*)(stripes[other].bmp_buf);
                }
            }
            data_ptrs[role] = (char*)stripes[role].bmp_buf;
            jerasure_matrix_dotprod(
                pg_minsize, OSD_JERASURE_W, decoding_matrix+(role*pg_minsize), dm_ids, role,
                data_ptrs, data_ptrs+pg_minsize, bitmap_size
            );
        }
    }
}
  267. int extend_missing_stripes(osd_rmw_stripe_t *stripes, osd_num_t *osd_set, int pg_minsize, int pg_size)
  268. {
  269. for (int role = 0; role < pg_minsize; role++)
  270. {
  271. if (stripes[role].read_end != 0 && osd_set[role] == 0)
  272. {
  273. stripes[role].missing = true;
  274. // Stripe is missing. Extend read to other stripes.
  275. // We need at least pg_minsize stripes to recover the lost part.
  276. // FIXME: LRC EC and similar don't require to read all other stripes.
  277. int exist = 0;
  278. for (int j = 0; j < pg_size; j++)
  279. {
  280. if (osd_set[j] != 0)
  281. {
  282. extend_read(stripes[role].read_start, stripes[role].read_end, stripes[j]);
  283. exist++;
  284. if (exist >= pg_minsize)
  285. {
  286. break;
  287. }
  288. }
  289. }
  290. if (exist < pg_minsize)
  291. {
  292. // Less than pg_minsize stripes are available for this object
  293. return -1;
  294. }
  295. }
  296. }
  297. return 0;
  298. }
  299. void* alloc_read_buffer(osd_rmw_stripe_t *stripes, int read_pg_size, uint64_t add_size)
  300. {
  301. // Calculate buffer size
  302. uint64_t buf_size = add_size;
  303. for (int role = 0; role < read_pg_size; role++)
  304. {
  305. if (stripes[role].read_end != 0)
  306. {
  307. buf_size += stripes[role].read_end - stripes[role].read_start;
  308. }
  309. }
  310. // Allocate buffer
  311. void *buf = memalign_or_die(MEM_ALIGNMENT, buf_size);
  312. uint64_t buf_pos = add_size;
  313. for (int role = 0; role < read_pg_size; role++)
  314. {
  315. if (stripes[role].read_end != 0)
  316. {
  317. stripes[role].read_buf = buf + buf_pos;
  318. buf_pos += stripes[role].read_end - stripes[role].read_start;
  319. }
  320. }
  321. return buf;
  322. }
// Plan a read-modify-write: fill in read/write ranges and buffer pointers for
// all stripes so the caller can Read -> Reconstruct -> Calc parity -> Write.
// Returns the allocated read/parity buffer (caller frees), or NULL when the
// object is incomplete and a partial overwrite must be refused.
// NOTE(review): <request_buf> appears to hold the client's write data laid
// out sequentially in stripe order (see the write-buffer positioning loop
// below) - confirm against callers.
void* calc_rmw(void *request_buf, osd_rmw_stripe_t *stripes, uint64_t *read_osd_set,
    uint64_t pg_size, uint64_t pg_minsize, uint64_t pg_cursize, uint64_t *write_osd_set,
    uint64_t chunk_size, uint32_t bitmap_size)
{
    // Generic parity modification (read-modify-write) algorithm
    // Read -> Reconstruct missing chunks -> Calc parity chunks -> Write
    // Now we always read continuous ranges. This means that an update of the beginning
    // of one data stripe and the end of another will lead to a read of full paired stripes.
    // FIXME: (Maybe) read small individual ranges in that case instead.
    // [start, end) = union of all per-stripe write requests
    uint32_t start = 0, end = 0;
    for (int role = 0; role < pg_minsize; role++)
    {
        if (stripes[role].req_end != 0)
        {
            start = !end || stripes[role].req_start < start ? stripes[role].req_start : start;
            end = std::max(stripes[role].req_end, end);
            stripes[role].write_start = stripes[role].req_start;
            stripes[role].write_end = stripes[role].req_end;
        }
    }
    int write_parity = 0;
    for (int role = pg_minsize; role < pg_size; role++)
    {
        if (write_osd_set[role] != 0)
        {
            write_parity = 1;
            if (write_osd_set[role] != read_osd_set[role])
            {
                // A parity chunk moves to another OSD - it must be written in
                // full, which also widens all previously seen parity ranges
                start = 0;
                end = chunk_size;
                for (int r2 = pg_minsize; r2 < role; r2++)
                {
                    stripes[r2].write_start = start;
                    stripes[r2].write_end = end;
                }
            }
            stripes[role].write_start = start;
            stripes[role].write_end = end;
        }
    }
    if (write_parity)
    {
        // Read the data needed to recalculate parity over [start, end),
        // minus the parts the request itself overwrites
        for (int role = 0; role < pg_minsize; role++)
        {
            cover_read(start, end, stripes[role]);
        }
    }
    if (write_osd_set != read_osd_set)
    {
        pg_cursize = 0;
        // Object is degraded/misplaced and will be moved to <write_osd_set>
        for (int role = 0; role < pg_size; role++)
        {
            if (role < pg_minsize && write_osd_set[role] != read_osd_set[role] && write_osd_set[role] != 0)
            {
                // We need to get data for any moved / recovered chunk
                // And we need a continuous write buffer so we'll only optimize
                // for the case when the whole chunk is ovewritten in the request
                if (stripes[role].req_start != 0 ||
                    stripes[role].req_end != chunk_size)
                {
                    stripes[role].read_start = 0;
                    stripes[role].read_end = chunk_size;
                    // Warning: We don't modify write_start/write_end here, we do it in calc_rmw_parity()
                }
            }
            if (read_osd_set[role] != 0)
            {
                pg_cursize++;
            }
        }
    }
    if (pg_cursize < pg_size)
    {
        // Some stripe(s) are missing, so we need to read parity
        for (int role = 0; role < pg_size; role++)
        {
            if (read_osd_set[role] == 0)
            {
                stripes[role].missing = true;
                if (stripes[role].read_end != 0)
                {
                    int found = 0;
                    for (int r2 = 0; r2 < pg_size && found < pg_minsize; r2++)
                    {
                        // Read the non-covered range of <role> from at least <minsize> other stripes to reconstruct it
                        if (read_osd_set[r2] != 0)
                        {
                            extend_read(stripes[role].read_start, stripes[role].read_end, stripes[r2]);
                            found++;
                        }
                    }
                    if (found < pg_minsize)
                    {
                        // Object is incomplete - refuse partial overwrite
                        return NULL;
                    }
                }
            }
        }
    }
    // Allocate read buffers
    // (extra area at the front holds the new parity chunks, one [start, end) slice each)
    void *rmw_buf = alloc_read_buffer(stripes, pg_size, (write_parity ? pg_size-pg_minsize : 0) * (end - start));
    // Position write buffers
    uint64_t buf_pos = 0, in_pos = 0;
    for (int role = 0; role < pg_size; role++)
    {
        if (stripes[role].req_end != 0)
        {
            // Client data is written straight from request_buf
            stripes[role].write_buf = request_buf + in_pos;
            in_pos += stripes[role].req_end - stripes[role].req_start;
        }
        else if (role >= pg_minsize && write_osd_set[role] != 0 && end != 0)
        {
            // Parity chunks get slices of the extra area at the start of rmw_buf
            stripes[role].write_buf = rmw_buf + buf_pos;
            buf_pos += end - start;
        }
    }
    return rmw_buf;
}
// Split the range [wr_start, wr_end) of one stripe into up to 3 segments
// taken either from the NEW data (write_buf, valid over [req_start, req_end))
// or from the OLD data (read_buf, valid over [read_start, read_end)),
// appending them in ascending offset order to <bufs> and advancing <nbufs>.
static void get_old_new_buffers(osd_rmw_stripe_t & stripe, uint32_t wr_start, uint32_t wr_end, buf_len_t *bufs, int & nbufs)
{
    // [ns, ne) = overlap with the new (written) data, [os, oe) = with the old (read) data
    uint32_t ns = 0, ne = 0, os = 0, oe = 0;
    if (stripe.req_end > wr_start &&
        stripe.req_start < wr_end)
    {
        ns = std::max(stripe.req_start, wr_start);
        ne = std::min(stripe.req_end, wr_end);
    }
    if (stripe.read_end > wr_start &&
        stripe.read_start < wr_end)
    {
        os = std::max(stripe.read_start, wr_start);
        oe = std::min(stripe.read_end, wr_end);
    }
    if (ne && (!oe || ns <= os))
    {
        // NEW or NEW->OLD
        bufs[nbufs++] = { .buf = stripe.write_buf + ns - stripe.req_start, .len = ne-ns };
        // Drop any old range hidden behind the new segment
        if (os < ne)
            os = ne;
        if (oe > os)
        {
            // NEW->OLD
            bufs[nbufs++] = { .buf = stripe.read_buf + os - stripe.read_start, .len = oe-os };
        }
    }
    else if (oe)
    {
        // OLD or OLD->NEW or OLD->NEW->OLD
        if (ne)
        {
            // OLD->NEW or OLD->NEW->OLD
            bufs[nbufs++] = { .buf = stripe.read_buf + os - stripe.read_start, .len = ns-os };
            bufs[nbufs++] = { .buf = stripe.write_buf + ns - stripe.req_start, .len = ne-ns };
            if (oe > ne)
            {
                // OLD->NEW->OLD
                bufs[nbufs++] = { .buf = stripe.read_buf + ne - stripe.read_start, .len = oe-ne };
            }
        }
        else
        {
            // OLD
            bufs[nbufs++] = { .buf = stripe.read_buf + os - stripe.read_start, .len = oe-os };
        }
    }
}
// XOR two scatter-gather lists (<xor1>/<n1> and <xor2>/<n2>, each covering
// the same <len> logical bytes in consecutive segments) into contiguous <dest>.
static void xor_multiple_buffers(buf_len_t *xor1, int n1, buf_len_t *xor2, int n2, void *dest, uint32_t len)
{
    assert(n1 > 0 && n2 > 0);
    int i1 = 0, i2 = 0;
    // [start1, end1) / [start2, end2) = logical extent of the current segment of each list
    uint32_t start1 = 0, start2 = 0, end1 = xor1[0].len, end2 = xor2[0].len;
    uint32_t pos = 0;
    while (pos < len)
    {
        // We know for sure that ranges overlap
        uint32_t end = std::min(end1, end2);
        memxor(xor1[i1].buf + pos-start1, xor2[i2].buf + pos-start2, dest+pos, end-pos);
        pos = end;
        if (pos >= end1)
        {
            // Advance to the next segment of the first list
            i1++;
            if (i1 >= n1)
            {
                // First list exhausted - the second must end here too
                assert(pos >= end2);
                return;
            }
            start1 = end1;
            end1 += xor1[i1].len;
        }
        if (pos >= end2)
        {
            // Advance to the next segment of the second list
            i2++;
            start2 = end2;
            end2 += xor2[i2].len;
        }
    }
}
// Common pre-parity step shared by the XOR and jerasure paths:
// (re)compute the parity range [start, end) when parity will be written,
// set bitmap bits for the written ranges, and for moved/recovered data
// chunks merge the modified part into the read buffer so the whole chunk
// can be written back in one piece.
static void calc_rmw_parity_copy_mod(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize,
    uint64_t *read_osd_set, uint64_t *write_osd_set, uint32_t chunk_size, uint32_t bitmap_granularity,
    uint32_t &start, uint32_t &end)
{
    if (write_osd_set[pg_minsize] != 0 || write_osd_set != read_osd_set)
    {
        // start & end are required for calc_rmw_parity
        for (int role = 0; role < pg_minsize; role++)
        {
            if (stripes[role].req_end != 0)
            {
                start = !end || stripes[role].req_start < start ? stripes[role].req_start : start;
                end = std::max(stripes[role].req_end, end);
            }
        }
        // A parity chunk moving to another OSD forces a full-chunk range
        for (int role = pg_minsize; role < pg_size; role++)
        {
            if (write_osd_set[role] != 0 && write_osd_set[role] != read_osd_set[role])
            {
                start = 0;
                end = chunk_size;
            }
        }
    }
    // Set bitmap bits accordingly
    if (bitmap_granularity > 0)
    {
        for (int role = 0; role < pg_minsize; role++)
        {
            if (stripes[role].req_end != 0)
            {
                bitmap_set(
                    stripes[role].bmp_buf, stripes[role].req_start,
                    stripes[role].req_end-stripes[role].req_start, bitmap_granularity
                );
            }
        }
    }
    if (write_osd_set != read_osd_set)
    {
        for (int role = 0; role < pg_minsize; role++)
        {
            if (write_osd_set[role] != read_osd_set[role] && write_osd_set[role] != 0 &&
                (stripes[role].req_start != 0 || stripes[role].req_end != chunk_size))
            {
                // Copy modified chunk into the read buffer to write it back
                memcpy(
                    stripes[role].read_buf + stripes[role].req_start,
                    stripes[role].write_buf,
                    stripes[role].req_end - stripes[role].req_start
                );
                // The full chunk is then written from the read buffer
                stripes[role].write_buf = stripes[role].read_buf;
                stripes[role].write_start = 0;
                stripes[role].write_end = chunk_size;
            }
        }
    }
}
  580. static void calc_rmw_parity_copy_parity(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize,
  581. uint64_t *read_osd_set, uint64_t *write_osd_set, uint32_t chunk_size, uint32_t start, uint32_t end)
  582. {
  583. if (write_osd_set != read_osd_set)
  584. {
  585. for (int role = pg_minsize; role < pg_size; role++)
  586. {
  587. if (write_osd_set[role] != read_osd_set[role] && (start != 0 || end != chunk_size))
  588. {
  589. // Copy new parity into the read buffer to write it back
  590. memcpy(
  591. stripes[role].read_buf + start,
  592. stripes[role].write_buf,
  593. end - start
  594. );
  595. stripes[role].write_buf = stripes[role].read_buf;
  596. stripes[role].write_start = 0;
  597. stripes[role].write_end = chunk_size;
  598. }
  599. }
  600. }
  601. #ifdef RMW_DEBUG
  602. printf("calc_rmw_parity:\n");
  603. for (int role = 0; role < pg_size; role++)
  604. {
  605. auto & s = stripes[role];
  606. printf(
  607. "Tr=%lu Tw=%lu Q=%x-%x R=%x-%x W=%x-%x Rb=%lx Wb=%lx\n",
  608. read_osd_set[role], write_osd_set[role],
  609. s.req_start, s.req_end,
  610. s.read_start, s.read_end,
  611. s.write_start, s.write_end,
  612. (uint64_t)s.read_buf,
  613. (uint64_t)s.write_buf
  614. );
  615. }
  616. #endif
  617. }
// Finish a read-modify-write for the XOR (k+1) scheme: reconstruct missing
// chunks, recompute the single parity chunk over [start, end) by XOR-ing the
// old+new data of all k data stripes, then prepare moved chunks for writeback.
void calc_rmw_parity_xor(osd_rmw_stripe_t *stripes, int pg_size, uint64_t *read_osd_set, uint64_t *write_osd_set,
    uint32_t chunk_size, uint32_t bitmap_size)
{
    // One bitmap bit per <bitmap_granularity> bytes of the chunk
    uint32_t bitmap_granularity = bitmap_size > 0 ? chunk_size / bitmap_size / 8 : 0;
    int pg_minsize = pg_size-1;
    reconstruct_stripes_xor(stripes, pg_size, bitmap_size);
    uint32_t start = 0, end = 0;
    calc_rmw_parity_copy_mod(stripes, pg_size, pg_minsize, read_osd_set, write_osd_set, chunk_size, bitmap_granularity, start, end);
    if (write_osd_set[pg_minsize] != 0 && end != 0)
    {
        // Calculate new parity (XOR k+1)
        // <prev> state machine: -2 = no source yet, >= 0 = one pending source,
        // -1 = partial parity already accumulated in the parity write buffer
        int parity = pg_minsize, prev = -2;
        for (int other = 0; other < pg_minsize; other++)
        {
            if (prev == -2)
            {
                prev = other;
            }
            else
            {
                int n1 = 0, n2 = 0;
                buf_len_t xor1[3], xor2[3];
                if (prev == -1)
                {
                    // Fold the next stripe into the already-computed partial parity
                    xor1[n1++] = { .buf = stripes[parity].write_buf, .len = end-start };
                    memxor(stripes[parity].bmp_buf, stripes[other].bmp_buf, stripes[parity].bmp_buf, bitmap_size);
                }
                else
                {
                    // First pair of sources - XOR their bitmaps and use both
                    // stripes' old/new segments as inputs
                    memxor(stripes[prev].bmp_buf, stripes[other].bmp_buf, stripes[parity].bmp_buf, bitmap_size);
                    get_old_new_buffers(stripes[prev], start, end, xor1, n1);
                    prev = -1;
                }
                get_old_new_buffers(stripes[other], start, end, xor2, n2);
                xor_multiple_buffers(xor1, n1, xor2, n2, stripes[parity].write_buf, end-start);
            }
        }
    }
    calc_rmw_parity_copy_parity(stripes, pg_size, pg_minsize, read_osd_set, write_osd_set, chunk_size, start, end);
}
// Finish a read-modify-write for Reed-Solomon (jerasure): reconstruct missing
// chunks, then re-encode all coding chunks over [start, end) by walking the
// old/new data segments of every stripe in lockstep, then prepare moved
// chunks for writeback.
void calc_rmw_parity_jerasure(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize,
    uint64_t *read_osd_set, uint64_t *write_osd_set, uint32_t chunk_size, uint32_t bitmap_size)
{
    // One bitmap bit per <bitmap_granularity> bytes of the chunk
    uint32_t bitmap_granularity = bitmap_size > 0 ? chunk_size / bitmap_size / 8 : 0;
    reed_sol_matrix_t *matrix = get_jerasure_matrix(pg_size, pg_minsize);
    reconstruct_stripes_jerasure(stripes, pg_size, pg_minsize, bitmap_size);
    uint32_t start = 0, end = 0;
    calc_rmw_parity_copy_mod(stripes, pg_size, pg_minsize, read_osd_set, write_osd_set, chunk_size, bitmap_granularity, start, end);
    if (end != 0)
    {
        // Check whether at least one coding chunk has to be written
        int i;
        for (i = pg_minsize; i < pg_size; i++)
        {
            if (write_osd_set[i] != 0)
                break;
        }
        if (i < pg_size)
        {
            // Calculate new coding chunks
            // Per-stripe segment lists: data stripes are composed of old/new
            // pieces, coding stripes are a single contiguous write buffer
            buf_len_t bufs[pg_size][3];
            int nbuf[pg_size] = { 0 }, curbuf[pg_size] = { 0 };
            uint32_t positions[pg_size];
            void *data_ptrs[pg_size] = { 0 };
            for (int i = 0; i < pg_minsize; i++)
            {
                get_old_new_buffers(stripes[i], start, end, bufs[i], nbuf[i]);
                positions[i] = start;
            }
            for (int i = pg_minsize; i < pg_size; i++)
            {
                bufs[i][nbuf[i]++] = { .buf = stripes[i].write_buf, .len = end-start };
                positions[i] = start;
            }
            // Encode in pieces bounded by the nearest segment boundary
            // across all stripes
            uint32_t pos = start;
            while (pos < end)
            {
                uint32_t next_end = end;
                for (int i = 0; i < pg_size; i++)
                {
                    assert(curbuf[i] < nbuf[i]);
                    assert(bufs[i][curbuf[i]].buf);
                    data_ptrs[i] = bufs[i][curbuf[i]].buf + pos-positions[i];
                    uint32_t this_end = bufs[i][curbuf[i]].len + positions[i];
                    if (next_end > this_end)
                        next_end = this_end;
                }
                assert(next_end > pos);
                // Step past every segment that ends at <next_end>
                for (int i = 0; i < pg_size; i++)
                {
                    uint32_t this_end = bufs[i][curbuf[i]].len + positions[i];
                    if (next_end >= this_end)
                    {
                        positions[i] += bufs[i][curbuf[i]].len;
                        curbuf[i]++;
                    }
                }
                jerasure_matrix_encode(
                    pg_minsize, pg_size-pg_minsize, OSD_JERASURE_W, matrix->data,
                    (char**)data_ptrs, (char**)data_ptrs+pg_minsize, next_end-pos
                );
                pos = next_end;
            }
            // Also re-encode the bitmaps of the coding chunks
            for (int i = 0; i < pg_size; i++)
            {
                data_ptrs[i] = stripes[i].bmp_buf;
            }
            jerasure_matrix_encode(
                pg_minsize, pg_size-pg_minsize, OSD_JERASURE_W, matrix->data,
                (char**)data_ptrs, (char**)data_ptrs+pg_minsize, bitmap_size
            );
        }
    }
    calc_rmw_parity_copy_parity(stripes, pg_size, pg_minsize, read_osd_set, write_osd_set, chunk_size, start, end);
}