Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

752 lines
26 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 (see README.md for details)
  3. #include <stdexcept>
  4. #include <string.h>
  5. #include <assert.h>
  6. #include <jerasure/reed_sol.h>
  7. #include <jerasure.h>
  8. #include <map>
  9. #include "allocator.h"
  10. #include "xor.h"
  11. #include "osd_rmw.h"
  12. #include "malloc_or_die.h"
  13. #define OSD_JERASURE_W 32
  14. static inline void extend_read(uint32_t start, uint32_t end, osd_rmw_stripe_t & stripe)
  15. {
  16. if (stripe.read_end == 0)
  17. {
  18. stripe.read_start = start;
  19. stripe.read_end = end;
  20. }
  21. else
  22. {
  23. if (stripe.read_end < end)
  24. stripe.read_end = end;
  25. if (stripe.read_start > start)
  26. stripe.read_start = start;
  27. }
  28. }
  29. static inline void cover_read(uint32_t start, uint32_t end, osd_rmw_stripe_t & stripe)
  30. {
  31. // Subtract <to> write request from <from> request
  32. if (start >= stripe.req_start &&
  33. end <= stripe.req_end)
  34. {
  35. return;
  36. }
  37. if (start <= stripe.req_start &&
  38. end >= stripe.req_start &&
  39. end <= stripe.req_end)
  40. {
  41. end = stripe.req_start;
  42. }
  43. else if (start >= stripe.req_start &&
  44. start <= stripe.req_end &&
  45. end >= stripe.req_end)
  46. {
  47. start = stripe.req_end;
  48. }
  49. if (stripe.read_end == 0)
  50. {
  51. stripe.read_start = start;
  52. stripe.read_end = end;
  53. }
  54. else
  55. {
  56. if (stripe.read_end < end)
  57. stripe.read_end = end;
  58. if (stripe.read_start > start)
  59. stripe.read_start = start;
  60. }
  61. }
  62. void split_stripes(uint64_t pg_minsize, uint32_t bs_block_size, uint32_t start, uint32_t end, osd_rmw_stripe_t *stripes)
  63. {
  64. if (end == 0)
  65. {
  66. // Zero length request - offset doesn't matter
  67. return;
  68. }
  69. end = start+end;
  70. for (int role = 0; role < pg_minsize; role++)
  71. {
  72. if (start < (1+role)*bs_block_size && end > role*bs_block_size)
  73. {
  74. stripes[role].req_start = start < role*bs_block_size ? 0 : start-role*bs_block_size;
  75. stripes[role].req_end = end > (role+1)*bs_block_size ? bs_block_size : end-role*bs_block_size;
  76. }
  77. }
  78. }
// Reconstruct missing stripes in an XOR (k+1) parity scheme by XORing the
// read ranges of all other stripes together. The bitmap (<bmp_buf>) is
// reconstructed the same way. Requires every other stripe's read range to
// cover the missing stripe's read range (checked by the asserts below).
void reconstruct_stripes_xor(osd_rmw_stripe_t *stripes, int pg_size, uint32_t bitmap_size)
{
    for (int role = 0; role < pg_size; role++)
    {
        if (stripes[role].read_end != 0 && stripes[role].missing)
        {
            // Reconstruct missing stripe (XOR k+1)
            // <prev> state machine: -2 = no source seen yet, >= 0 = one source
            // pending (its index), -1 = a partial result is already in <role>'s buffer
            int prev = -2;
            for (int other = 0; other < pg_size; other++)
            {
                if (other != role)
                {
                    if (prev == -2)
                    {
                        prev = other;
                    }
                    else if (prev >= 0)
                    {
                        // First pair of sources: XOR them directly into the target
                        assert(stripes[role].read_start >= stripes[prev].read_start &&
                            stripes[role].read_start >= stripes[other].read_start);
                        memxor(
                            stripes[prev].read_buf + (stripes[role].read_start - stripes[prev].read_start),
                            stripes[other].read_buf + (stripes[role].read_start - stripes[other].read_start),
                            stripes[role].read_buf, stripes[role].read_end - stripes[role].read_start
                        );
                        memxor(stripes[prev].bmp_buf, stripes[other].bmp_buf, stripes[role].bmp_buf, bitmap_size);
                        prev = -1;
                    }
                    else
                    {
                        // Subsequent sources: fold into the partially reconstructed target
                        assert(stripes[role].read_start >= stripes[other].read_start);
                        memxor(
                            stripes[role].read_buf,
                            stripes[other].read_buf + (stripes[role].read_start - stripes[other].read_start),
                            stripes[role].read_buf, stripes[role].read_end - stripes[role].read_start
                        );
                        memxor(stripes[role].bmp_buf, stripes[other].bmp_buf, stripes[role].bmp_buf, bitmap_size);
                    }
                }
            }
        }
    }
}
  122. struct reed_sol_erased_t
  123. {
  124. int *data;
  125. int size;
  126. };
  127. inline bool operator < (const reed_sol_erased_t &a, const reed_sol_erased_t &b)
  128. {
  129. for (int i = 0; i < a.size && i < b.size; i++)
  130. {
  131. if (a.data[i] < b.data[i])
  132. return -1;
  133. else if (a.data[i] > b.data[i])
  134. return 1;
  135. }
  136. return 0;
  137. }
// Cached Reed-Solomon coding matrix for one (pg_size, pg_minsize) pair,
// plus decoding matrices cached per erasure pattern.
struct reed_sol_matrix_t
{
    int refs = 0;  // reference count maintained by use_jerasure()
    int *data;     // coding matrix from reed_sol_vandermonde_coding_matrix()
    // erasure pattern -> cached [dm_ids | decoding matrix | erased copy] buffer
    std::map<reed_sol_erased_t, int*> decodings;
};
// Global matrix cache keyed by pg_size | (pg_minsize << 32)
std::map<uint64_t, reed_sol_matrix_t> matrices;
// Acquire (use=true) or release (use=false) a reference to the cached
// jerasure coding matrix for (pg_size, pg_minsize). The matrix is created
// on first acquire and freed - together with all cached decoding matrices -
// when the last reference is released.
void use_jerasure(int pg_size, int pg_minsize, bool use)
{
    uint64_t key = (uint64_t)pg_size | ((uint64_t)pg_minsize) << 32;
    auto rs_it = matrices.find(key);
    if (rs_it == matrices.end())
    {
        if (!use)
        {
            // Releasing a matrix that was never created - nothing to do
            return;
        }
        int *matrix = reed_sol_vandermonde_coding_matrix(pg_minsize, pg_size-pg_minsize, OSD_JERASURE_W);
        matrices[key] = (reed_sol_matrix_t){
            .refs = 0,
            .data = matrix,
        };
        rs_it = matrices.find(key);
    }
    rs_it->second.refs += (!use ? -1 : 1);
    if (rs_it->second.refs <= 0)
    {
        // Last reference released - free the coding matrix and all cached decodings
        free(rs_it->second.data);
        for (auto dec_it = rs_it->second.decodings.begin(); dec_it != rs_it->second.decodings.end();)
        {
            // Erase before free: the map key points into the <data> allocation
            int *data = dec_it->second;
            rs_it->second.decodings.erase(dec_it++);
            free(data);
        }
        matrices.erase(rs_it);
    }
}
  175. reed_sol_matrix_t* get_jerasure_matrix(int pg_size, int pg_minsize)
  176. {
  177. uint64_t key = (uint64_t)pg_size | ((uint64_t)pg_minsize) << 32;
  178. auto rs_it = matrices.find(key);
  179. if (rs_it == matrices.end())
  180. {
  181. throw std::runtime_error("jerasure matrix not initialized");
  182. }
  183. return &rs_it->second;
  184. }
  185. // jerasure_matrix_decode() decodes all chunks at once and tries to reencode all missing coding chunks.
  186. // we don't need it. also it makes an extra allocation of int *erased on every call and doesn't cache
  187. // the decoding matrix.
  188. // all these flaws are fixed in this function:
  189. int* get_jerasure_decoding_matrix(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize)
  190. {
  191. int edd = 0;
  192. int erased[pg_size] = { 0 };
  193. for (int i = 0; i < pg_size; i++)
  194. if (stripes[i].read_end == 0 || stripes[i].missing)
  195. erased[i] = 1;
  196. for (int i = 0; i < pg_minsize; i++)
  197. if (stripes[i].read_end != 0 && stripes[i].missing)
  198. edd++;
  199. if (edd == 0)
  200. return NULL;
  201. reed_sol_matrix_t *matrix = get_jerasure_matrix(pg_size, pg_minsize);
  202. auto dec_it = matrix->decodings.find((reed_sol_erased_t){ .data = erased, .size = pg_size });
  203. if (dec_it == matrix->decodings.end())
  204. {
  205. int *dm_ids = (int*)malloc_or_die(sizeof(int)*(pg_minsize + pg_minsize*pg_minsize + pg_size));
  206. int *decoding_matrix = dm_ids + pg_minsize;
  207. if (!dm_ids)
  208. throw std::bad_alloc();
  209. // we always use row_k_ones=1 and w=8 (OSD_JERASURE_W)
  210. if (jerasure_make_decoding_matrix(pg_minsize, pg_size-pg_minsize, OSD_JERASURE_W, matrix->data, erased, decoding_matrix, dm_ids) < 0)
  211. {
  212. free(dm_ids);
  213. throw std::runtime_error("jerasure_make_decoding_matrix() failed");
  214. }
  215. int *erased_copy = dm_ids + pg_minsize + pg_minsize*pg_minsize;
  216. memcpy(erased_copy, erased, pg_size*sizeof(int));
  217. matrix->decodings.emplace((reed_sol_erased_t){ .data = erased_copy, .size = pg_size }, dm_ids);
  218. return dm_ids;
  219. }
  220. return dec_it->second;
  221. }
// Reconstruct missing data stripes using the cached jerasure decoding
// matrix. Each missing stripe's read range (and then its bitmap) is
// recomputed via jerasure_matrix_dotprod() from the surviving stripes.
void reconstruct_stripes_jerasure(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize, uint32_t bitmap_size)
{
    int *dm_ids = get_jerasure_decoding_matrix(stripes, pg_size, pg_minsize);
    if (!dm_ids)
    {
        // Nothing to decode
        return;
    }
    // The decoding matrix follows dm_ids inside the same allocation
    int *decoding_matrix = dm_ids + pg_minsize;
    char *data_ptrs[pg_size] = { 0 };
    for (int role = 0; role < pg_minsize; role++)
    {
        if (stripes[role].read_end != 0 && stripes[role].missing)
        {
            // Point data_ptrs at every surviving stripe, offset so that all
            // pointers address the same logical range as the missing stripe
            for (int other = 0; other < pg_size; other++)
            {
                if (stripes[other].read_end != 0 && !stripes[other].missing)
                {
                    assert(stripes[other].read_start <= stripes[role].read_start);
                    assert(stripes[other].read_end >= stripes[role].read_end);
                    data_ptrs[other] = (char*)(stripes[other].read_buf + (stripes[role].read_start - stripes[other].read_start));
                }
            }
            data_ptrs[role] = (char*)stripes[role].read_buf;
            jerasure_matrix_dotprod(
                pg_minsize, OSD_JERASURE_W, decoding_matrix+(role*pg_minsize), dm_ids, role,
                data_ptrs, data_ptrs+pg_minsize, stripes[role].read_end - stripes[role].read_start
            );
            // Reconstruct the bitmap the same way
            for (int other = 0; other < pg_size; other++)
            {
                if (stripes[other].read_end != 0 && !stripes[other].missing)
                {
                    data_ptrs[other] = (char*)(stripes[other].bmp_buf);
                }
            }
            data_ptrs[role] = (char*)stripes[role].bmp_buf;
            jerasure_matrix_dotprod(
                pg_minsize, OSD_JERASURE_W, decoding_matrix+(role*pg_minsize), dm_ids, role,
                data_ptrs, data_ptrs+pg_minsize, bitmap_size
            );
        }
    }
}
  264. int extend_missing_stripes(osd_rmw_stripe_t *stripes, osd_num_t *osd_set, int pg_minsize, int pg_size)
  265. {
  266. for (int role = 0; role < pg_minsize; role++)
  267. {
  268. if (stripes[role].read_end != 0 && osd_set[role] == 0)
  269. {
  270. stripes[role].missing = true;
  271. // Stripe is missing. Extend read to other stripes.
  272. // We need at least pg_minsize stripes to recover the lost part.
  273. // FIXME: LRC EC and similar don't require to read all other stripes.
  274. int exist = 0;
  275. for (int j = 0; j < pg_size; j++)
  276. {
  277. if (osd_set[j] != 0)
  278. {
  279. extend_read(stripes[role].read_start, stripes[role].read_end, stripes[j]);
  280. exist++;
  281. if (exist >= pg_minsize)
  282. {
  283. break;
  284. }
  285. }
  286. }
  287. if (exist < pg_minsize)
  288. {
  289. // Less than pg_minsize stripes are available for this object
  290. return -1;
  291. }
  292. }
  293. }
  294. return 0;
  295. }
  296. void* alloc_read_buffer(osd_rmw_stripe_t *stripes, int read_pg_size, uint64_t add_size)
  297. {
  298. // Calculate buffer size
  299. uint64_t buf_size = add_size;
  300. for (int role = 0; role < read_pg_size; role++)
  301. {
  302. if (stripes[role].read_end != 0)
  303. {
  304. buf_size += stripes[role].read_end - stripes[role].read_start;
  305. }
  306. }
  307. // Allocate buffer
  308. void *buf = memalign_or_die(MEM_ALIGNMENT, buf_size);
  309. uint64_t buf_pos = add_size;
  310. for (int role = 0; role < read_pg_size; role++)
  311. {
  312. if (stripes[role].read_end != 0)
  313. {
  314. stripes[role].read_buf = buf + buf_pos;
  315. buf_pos += stripes[role].read_end - stripes[role].read_start;
  316. }
  317. }
  318. return buf;
  319. }
// Plan a read-modify-write cycle for a partial object update.
// <request_buf> holds the new data (laid out by split_stripes()); <stripes>
// carries per-chunk req_* ranges on entry. Computes read/write ranges for
// every chunk, allocates one buffer for all reads plus new parity, and
// positions each stripe's read_buf/write_buf. Returns the allocated buffer,
// or NULL if the object is incomplete (not enough chunks to reconstruct).
void* calc_rmw(void *request_buf, osd_rmw_stripe_t *stripes, uint64_t *read_osd_set,
    uint64_t pg_size, uint64_t pg_minsize, uint64_t pg_cursize, uint64_t *write_osd_set,
    uint64_t chunk_size, uint32_t bitmap_size)
{
    // Generic parity modification (read-modify-write) algorithm
    // Read -> Reconstruct missing chunks -> Calc parity chunks -> Write
    // Now we always read continuous ranges. This means that an update of the beginning
    // of one data stripe and the end of another will lead to a read of full paired stripes.
    // FIXME: (Maybe) read small individual ranges in that case instead.
    // [start, end) = union of all per-stripe request ranges; parity chunks
    // are (re)written for exactly this range
    uint32_t start = 0, end = 0;
    for (int role = 0; role < pg_minsize; role++)
    {
        if (stripes[role].req_end != 0)
        {
            start = !end || stripes[role].req_start < start ? stripes[role].req_start : start;
            end = std::max(stripes[role].req_end, end);
            stripes[role].write_start = stripes[role].req_start;
            stripes[role].write_end = stripes[role].req_end;
        }
    }
    int write_parity = 0;
    for (int role = pg_minsize; role < pg_size; role++)
    {
        if (write_osd_set[role] != 0)
        {
            write_parity = 1;
            if (write_osd_set[role] != read_osd_set[role])
            {
                // A parity chunk moves to another OSD - it must be written in
                // full, and so must the parity chunks already processed
                start = 0;
                end = chunk_size;
                for (int r2 = pg_minsize; r2 < role; r2++)
                {
                    stripes[r2].write_start = start;
                    stripes[r2].write_end = end;
                }
            }
            stripes[role].write_start = start;
            stripes[role].write_end = end;
        }
    }
    if (write_parity)
    {
        // To recalculate parity over [start, end) we must read the parts of
        // that range not already covered by each stripe's write request
        for (int role = 0; role < pg_minsize; role++)
        {
            cover_read(start, end, stripes[role]);
        }
    }
    if (write_osd_set != read_osd_set)
    {
        pg_cursize = 0;
        // Object is degraded/misplaced and will be moved to <write_osd_set>
        for (int role = 0; role < pg_size; role++)
        {
            if (role < pg_minsize && write_osd_set[role] != read_osd_set[role] && write_osd_set[role] != 0)
            {
                // We need to get data for any moved / recovered chunk
                // And we need a continuous write buffer so we'll only optimize
                // for the case when the whole chunk is ovewritten in the request
                if (stripes[role].req_start != 0 ||
                    stripes[role].req_end != chunk_size)
                {
                    stripes[role].read_start = 0;
                    stripes[role].read_end = chunk_size;
                    // Warning: We don't modify write_start/write_end here, we do it in calc_rmw_parity()
                }
            }
            if (read_osd_set[role] != 0)
            {
                pg_cursize++;
            }
        }
    }
    if (pg_cursize < pg_size)
    {
        // Some stripe(s) are missing, so we need to read parity
        for (int role = 0; role < pg_size; role++)
        {
            if (read_osd_set[role] == 0)
            {
                stripes[role].missing = true;
                if (stripes[role].read_end != 0)
                {
                    int found = 0;
                    for (int r2 = 0; r2 < pg_size && found < pg_minsize; r2++)
                    {
                        // Read the non-covered range of <role> from at least <minsize> other stripes to reconstruct it
                        if (read_osd_set[r2] != 0)
                        {
                            extend_read(stripes[role].read_start, stripes[role].read_end, stripes[r2]);
                            found++;
                        }
                    }
                    if (found < pg_minsize)
                    {
                        // Object is incomplete - refuse partial overwrite
                        return NULL;
                    }
                }
            }
        }
    }
    // Allocate read buffers (plus space for new parity at the head)
    void *rmw_buf = alloc_read_buffer(stripes, pg_size, (write_parity ? pg_size-pg_minsize : 0) * (end - start));
    // Position write buffers
    uint64_t buf_pos = 0, in_pos = 0;
    for (int role = 0; role < pg_size; role++)
    {
        if (stripes[role].req_end != 0)
        {
            // Data chunks with a request range write straight from request_buf
            stripes[role].write_buf = request_buf + in_pos;
            in_pos += stripes[role].req_end - stripes[role].req_start;
        }
        else if (role >= pg_minsize && write_osd_set[role] != 0 && end != 0)
        {
            // Parity chunks write from the extra space at the head of rmw_buf
            stripes[role].write_buf = rmw_buf + buf_pos;
            buf_pos += end - start;
        }
    }
    return rmw_buf;
}
// Collect the buffer fragments covering [wr_start, wr_end) for one stripe,
// preferring NEW data (the incoming write, write_buf over [req_start,
// req_end)) to OLD data (the read buffer). Appends up to 3 fragments to
// <bufs> in position order; the caller guarantees the request and read
// ranges together cover [wr_start, wr_end).
static void get_old_new_buffers(osd_rmw_stripe_t & stripe, uint32_t wr_start, uint32_t wr_end, buf_len_t *bufs, int & nbufs)
{
    // [ns, ne) = overlap with NEW data, [os, oe) = overlap with OLD data
    uint32_t ns = 0, ne = 0, os = 0, oe = 0;
    if (stripe.req_end > wr_start &&
        stripe.req_start < wr_end)
    {
        ns = std::max(stripe.req_start, wr_start);
        ne = std::min(stripe.req_end, wr_end);
    }
    if (stripe.read_end > wr_start &&
        stripe.read_start < wr_end)
    {
        os = std::max(stripe.read_start, wr_start);
        oe = std::min(stripe.read_end, wr_end);
    }
    if (ne && (!oe || ns <= os))
    {
        // NEW or NEW->OLD
        bufs[nbufs++] = { .buf = stripe.write_buf + ns - stripe.req_start, .len = ne-ns };
        // Trim the OLD range so it starts after the NEW fragment
        if (os < ne)
            os = ne;
        if (oe > os)
        {
            // NEW->OLD
            bufs[nbufs++] = { .buf = stripe.read_buf + os - stripe.read_start, .len = oe-os };
        }
    }
    else if (oe)
    {
        // OLD or OLD->NEW or OLD->NEW->OLD
        if (ne)
        {
            // OLD->NEW or OLD->NEW->OLD
            bufs[nbufs++] = { .buf = stripe.read_buf + os - stripe.read_start, .len = ns-os };
            bufs[nbufs++] = { .buf = stripe.write_buf + ns - stripe.req_start, .len = ne-ns };
            if (oe > ne)
            {
                // OLD->NEW->OLD
                bufs[nbufs++] = { .buf = stripe.read_buf + ne - stripe.read_start, .len = oe-ne };
            }
        }
        else
        {
            // OLD
            bufs[nbufs++] = { .buf = stripe.read_buf + os - stripe.read_start, .len = oe-os };
        }
    }
}
// XOR two fragmented buffer chains (xor1[0..n1), xor2[0..n2)) into <dest>.
// Both chains must cover exactly <len> bytes; the fragments are walked in
// lockstep, XORing the overlapping piece on each iteration.
static void xor_multiple_buffers(buf_len_t *xor1, int n1, buf_len_t *xor2, int n2, void *dest, uint32_t len)
{
    assert(n1 > 0 && n2 > 0);
    int i1 = 0, i2 = 0;
    // startN/endN = logical position range covered by chain N's current fragment
    uint32_t start1 = 0, start2 = 0, end1 = xor1[0].len, end2 = xor2[0].len;
    uint32_t pos = 0;
    while (pos < len)
    {
        // We know for sure that ranges overlap
        uint32_t end = std::min(end1, end2);
        memxor(xor1[i1].buf + pos-start1, xor2[i2].buf + pos-start2, dest+pos, end-pos);
        pos = end;
        if (pos >= end1)
        {
            // Advance chain 1 to its next fragment
            i1++;
            if (i1 >= n1)
            {
                // Chain 1 is exhausted - chain 2 must end here too
                assert(pos >= end2);
                return;
            }
            start1 = end1;
            end1 += xor1[i1].len;
        }
        if (pos >= end2)
        {
            // Advance chain 2 to its next fragment
            i2++;
            start2 = end2;
            end2 += xor2[i2].len;
        }
    }
}
// Common preamble for parity recalculation: recompute the parity write range
// [start, end), set bitmap bits for the written ranges, and - when the object
// is being moved to a different OSD set - merge partially overwritten data
// chunks into their read buffers so whole chunks can be written out.
static void calc_rmw_parity_copy_mod(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize,
    uint64_t *read_osd_set, uint64_t *write_osd_set, uint32_t chunk_size, uint32_t bitmap_granularity,
    uint32_t &start, uint32_t &end)
{
    if (write_osd_set[pg_minsize] != 0 || write_osd_set != read_osd_set)
    {
        // start & end are required for calc_rmw_parity
        for (int role = 0; role < pg_minsize; role++)
        {
            if (stripes[role].req_end != 0)
            {
                start = !end || stripes[role].req_start < start ? stripes[role].req_start : start;
                end = std::max(stripes[role].req_end, end);
            }
        }
        // A parity chunk moving to another OSD forces a full-chunk range
        for (int role = pg_minsize; role < pg_size; role++)
        {
            if (write_osd_set[role] != 0 && write_osd_set[role] != read_osd_set[role])
            {
                start = 0;
                end = chunk_size;
            }
        }
    }
    // Set bitmap bits accordingly
    if (bitmap_granularity > 0)
    {
        for (int role = 0; role < pg_minsize; role++)
        {
            if (stripes[role].req_end != 0)
            {
                bitmap_set(
                    stripes[role].bmp_buf, stripes[role].req_start,
                    stripes[role].req_end-stripes[role].req_start, bitmap_granularity
                );
            }
        }
    }
    if (write_osd_set != read_osd_set)
    {
        for (int role = 0; role < pg_minsize; role++)
        {
            if (write_osd_set[role] != read_osd_set[role] && write_osd_set[role] != 0 &&
                (stripes[role].req_start != 0 || stripes[role].req_end != chunk_size))
            {
                // Copy modified chunk into the read buffer to write it back
                memcpy(
                    stripes[role].read_buf + stripes[role].req_start,
                    stripes[role].write_buf,
                    stripes[role].req_end - stripes[role].req_start
                );
                // From here on, the whole chunk is written from the read buffer
                stripes[role].write_buf = stripes[role].read_buf;
                stripes[role].write_start = 0;
                stripes[role].write_end = chunk_size;
            }
        }
    }
}
// Common postamble for parity recalculation: when the object is being moved
// to a different OSD set and only part of the parity was recalculated, merge
// the new parity into the read buffer so the whole chunk can be written out.
// Also dumps the final stripe state when compiled with RMW_DEBUG.
static void calc_rmw_parity_copy_parity(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize,
    uint64_t *read_osd_set, uint64_t *write_osd_set, uint32_t chunk_size, uint32_t start, uint32_t end)
{
    if (write_osd_set != read_osd_set)
    {
        for (int role = pg_minsize; role < pg_size; role++)
        {
            if (write_osd_set[role] != read_osd_set[role] && (start != 0 || end != chunk_size))
            {
                // Copy new parity into the read buffer to write it back
                memcpy(
                    stripes[role].read_buf + start,
                    stripes[role].write_buf,
                    end - start
                );
                stripes[role].write_buf = stripes[role].read_buf;
                stripes[role].write_start = 0;
                stripes[role].write_end = chunk_size;
            }
        }
    }
#ifdef RMW_DEBUG
    printf("calc_rmw_parity:\n");
    for (int role = 0; role < pg_size; role++)
    {
        auto & s = stripes[role];
        printf(
            "Tr=%lu Tw=%lu Q=%x-%x R=%x-%x W=%x-%x Rb=%lx Wb=%lx\n",
            read_osd_set[role], write_osd_set[role],
            s.req_start, s.req_end,
            s.read_start, s.read_end,
            s.write_start, s.write_end,
            (uint64_t)s.read_buf,
            (uint64_t)s.write_buf
        );
    }
#endif
}
// Finish an RMW cycle for the XOR (k+1) scheme: reconstruct missing data
// chunks, then XOR all data stripes (mixing old and new fragments via
// get_old_new_buffers()) into the parity chunk's write buffer.
void calc_rmw_parity_xor(osd_rmw_stripe_t *stripes, int pg_size, uint64_t *read_osd_set, uint64_t *write_osd_set,
    uint32_t chunk_size, uint32_t bitmap_size)
{
    // Bytes of data represented by one bitmap bit
    uint32_t bitmap_granularity = bitmap_size > 0 ? chunk_size / bitmap_size / 8 : 0;
    int pg_minsize = pg_size-1;
    reconstruct_stripes_xor(stripes, pg_size, bitmap_size);
    uint32_t start = 0, end = 0;
    calc_rmw_parity_copy_mod(stripes, pg_size, pg_minsize, read_osd_set, write_osd_set, chunk_size, bitmap_granularity, start, end);
    if (write_osd_set[pg_minsize] != 0 && end != 0)
    {
        // Calculate new parity (XOR k+1)
        // <prev> works like in reconstruct_stripes_xor(): -2 = nothing seen,
        // >= 0 = one pending source, -1 = parity buffer holds a partial result
        int parity = pg_minsize, prev = -2;
        for (int other = 0; other < pg_minsize; other++)
        {
            if (prev == -2)
            {
                prev = other;
            }
            else
            {
                int n1 = 0, n2 = 0;
                buf_len_t xor1[3], xor2[3];
                if (prev == -1)
                {
                    // Accumulate into the parity buffer itself
                    xor1[n1++] = { .buf = stripes[parity].write_buf, .len = end-start };
                    memxor(stripes[parity].bmp_buf, stripes[other].bmp_buf, stripes[parity].bmp_buf, bitmap_size);
                }
                else
                {
                    memxor(stripes[prev].bmp_buf, stripes[other].bmp_buf, stripes[parity].bmp_buf, bitmap_size);
                    get_old_new_buffers(stripes[prev], start, end, xor1, n1);
                    prev = -1;
                }
                get_old_new_buffers(stripes[other], start, end, xor2, n2);
                xor_multiple_buffers(xor1, n1, xor2, n2, stripes[parity].write_buf, end-start);
            }
        }
    }
    calc_rmw_parity_copy_parity(stripes, pg_size, pg_minsize, read_osd_set, write_osd_set, chunk_size, start, end);
}
// Finish an RMW cycle for Reed-Solomon (jerasure): reconstruct missing data
// chunks, then re-encode the coding chunks over [start, end), walking the
// old/new buffer fragments of every stripe in lockstep.
void calc_rmw_parity_jerasure(osd_rmw_stripe_t *stripes, int pg_size, int pg_minsize,
    uint64_t *read_osd_set, uint64_t *write_osd_set, uint32_t chunk_size, uint32_t bitmap_size)
{
    // Bytes of data represented by one bitmap bit
    uint32_t bitmap_granularity = bitmap_size > 0 ? chunk_size / bitmap_size / 8 : 0;
    reed_sol_matrix_t *matrix = get_jerasure_matrix(pg_size, pg_minsize);
    reconstruct_stripes_jerasure(stripes, pg_size, pg_minsize, bitmap_size);
    uint32_t start = 0, end = 0;
    calc_rmw_parity_copy_mod(stripes, pg_size, pg_minsize, read_osd_set, write_osd_set, chunk_size, bitmap_granularity, start, end);
    if (end != 0)
    {
        // Check whether at least one coding chunk is actually written
        int i;
        for (i = pg_minsize; i < pg_size; i++)
        {
            if (write_osd_set[i] != 0)
                break;
        }
        if (i < pg_size)
        {
            // Calculate new coding chunks
            // Each data stripe contributes up to 3 fragments (old/new/old)
            buf_len_t bufs[pg_size][3];
            int nbuf[pg_size] = { 0 }, curbuf[pg_size] = { 0 };
            uint32_t positions[pg_size];
            void *data_ptrs[pg_size] = { 0 };
            for (int i = 0; i < pg_minsize; i++)
            {
                get_old_new_buffers(stripes[i], start, end, bufs[i], nbuf[i]);
                positions[i] = start;
            }
            for (int i = pg_minsize; i < pg_size; i++)
            {
                // Coding chunks are always one continuous output fragment
                bufs[i][nbuf[i]++] = { .buf = stripes[i].write_buf, .len = end-start };
                positions[i] = start;
            }
            // Encode piecewise: each pass covers the longest range in which
            // no stripe crosses one of its fragment boundaries
            uint32_t pos = start;
            while (pos < end)
            {
                uint32_t next_end = end;
                for (int i = 0; i < pg_size; i++)
                {
                    assert(curbuf[i] < nbuf[i]);
                    assert(bufs[i][curbuf[i]].buf);
                    data_ptrs[i] = bufs[i][curbuf[i]].buf + pos-positions[i];
                    uint32_t this_end = bufs[i][curbuf[i]].len + positions[i];
                    if (next_end > this_end)
                        next_end = this_end;
                }
                assert(next_end > pos);
                // Advance fragments that end exactly at next_end
                for (int i = 0; i < pg_size; i++)
                {
                    uint32_t this_end = bufs[i][curbuf[i]].len + positions[i];
                    if (next_end >= this_end)
                    {
                        positions[i] += bufs[i][curbuf[i]].len;
                        curbuf[i]++;
                    }
                }
                jerasure_matrix_encode(
                    pg_minsize, pg_size-pg_minsize, OSD_JERASURE_W, matrix->data,
                    (char**)data_ptrs, (char**)data_ptrs+pg_minsize, next_end-pos
                );
                pos = next_end;
            }
            // Encode the bitmaps with the same matrix
            for (int i = 0; i < pg_size; i++)
            {
                data_ptrs[i] = stripes[i].bmp_buf;
            }
            jerasure_matrix_encode(
                pg_minsize, pg_size-pg_minsize, OSD_JERASURE_W, matrix->data,
                (char**)data_ptrs, (char**)data_ptrs+pg_minsize, bitmap_size
            );
        }
    }
    calc_rmw_parity_copy_parity(stripes, pg_size, pg_minsize, read_osd_set, write_osd_set, chunk_size, start, end);
}