Simplified distributed block storage with strong consistency, like in Ceph
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.0 (see README.md for details)

#include "osd_primary.h"
#include "allocator.h"

// read: read directly or read paired stripe(s), reconstruct, return
// write: read paired stripe(s), reconstruct, modify, calculate parity, write
//
// nuance: take care to read the same version from paired stripes!
// to do so, we remember "last readable" version until a write request completes
// and we postpone other write requests to the same stripe until completion of previous ones
//
// sync: sync peers, get unstable versions, stabilize them

bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
{
    // PG number is calculated from the offset
    // Our EC scheme stores data in fixed chunks equal to (K*block size)
    // K = (pg_size-parity_chunks) in case of EC/XOR, or 1 for replicated pools
    pool_id_t pool_id = INODE_POOL(cur_op->req.rw.inode);
    // FIXME: We have to access pool config here, so make sure that it doesn't change while its PGs are active...
    auto pool_cfg_it = st_cli.pool_config.find(pool_id);
    if (pool_cfg_it == st_cli.pool_config.end())
    {
        // Pool config is not loaded yet
        finish_op(cur_op, -EPIPE);
        return false;
    }
    auto & pool_cfg = pool_cfg_it->second;
    uint64_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks);
    uint64_t pg_block_size = bs_block_size * pg_data_size;
    object_id oid = {
        .inode = cur_op->req.rw.inode,
        // oid.stripe = starting offset of the parity stripe
        .stripe = (cur_op->req.rw.offset/pg_block_size)*pg_block_size,
    };
    pg_num_t pg_num = (cur_op->req.rw.inode + oid.stripe/pool_cfg.pg_stripe_size) % pg_counts[pool_id] + 1;
    auto pg_it = pgs.find({ .pool_id = pool_id, .pg_num = pg_num });
    if (pg_it == pgs.end() || !(pg_it->second.state & PG_ACTIVE))
    {
        // This OSD is not primary for this PG or the PG is inactive
        // FIXME: Allow reads from PGs degraded under pg_minsize, but don't allow writes
        finish_op(cur_op, -EPIPE);
        return false;
    }
    if ((cur_op->req.rw.offset + cur_op->req.rw.len) > (oid.stripe + pg_block_size) ||
        (cur_op->req.rw.offset % bs_bitmap_granularity) != 0 ||
        (cur_op->req.rw.len % bs_bitmap_granularity) != 0)
    {
        finish_op(cur_op, -EINVAL);
        return false;
    }
    int stripe_count = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg_it->second.pg_size);
    osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)calloc_or_die(
        1, sizeof(osd_primary_op_data_t) + (entry_attr_size + sizeof(osd_rmw_stripe_t)) * stripe_count
    );
    op_data->pg_num = pg_num;
    op_data->oid = oid;
    op_data->stripes = ((osd_rmw_stripe_t*)(op_data+1));
    op_data->scheme = pool_cfg.scheme;
    op_data->pg_data_size = pg_data_size;
    cur_op->op_data = op_data;
    split_stripes(pg_data_size, bs_block_size, (uint32_t)(cur_op->req.rw.offset - oid.stripe), cur_op->req.rw.len, op_data->stripes);
    // Allocate bitmaps along with stripes to avoid extra allocations and fragmentation
    for (int i = 0; i < stripe_count; i++)
    {
        op_data->stripes[i].bmp_buf = (void*)(op_data->stripes+stripe_count) + entry_attr_size*i;
    }
    pg_it->second.inflight++;
    return true;
}
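
// Worked example of the addressing math above. The numbers are purely illustrative,
// not from any real pool config: assume bs_block_size = 131072 (128 KiB),
// pg_data_size = 2 (so pg_block_size = 262144), pg_stripe_size = 4194304,
// pg_counts[pool_id] = 256, inode = 5, and a request with offset = 5517312, len = 8192.
// Then:
//   oid.stripe = (5517312 / 262144) * 262144 = 21 * 262144 = 5505024
//   pg_num     = (5 + 5505024/4194304) % 256 + 1 = (5 + 1) % 256 + 1 = 7
// split_stripes() then maps the in-stripe range [12288, 20480) entirely onto data
// chunk 0, which covers bytes [0, 131072) of the parity stripe.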

static uint64_t* get_object_osd_set(pg_t &pg, object_id &oid, uint64_t *def, pg_osd_set_state_t **object_state)
{
    if (!(pg.state & (PG_HAS_INCOMPLETE | PG_HAS_DEGRADED | PG_HAS_MISPLACED)))
    {
        *object_state = NULL;
        return def;
    }
    auto st_it = pg.incomplete_objects.find(oid);
    if (st_it != pg.incomplete_objects.end())
    {
        *object_state = st_it->second;
        return st_it->second->read_target.data();
    }
    st_it = pg.degraded_objects.find(oid);
    if (st_it != pg.degraded_objects.end())
    {
        *object_state = st_it->second;
        return st_it->second->read_target.data();
    }
    st_it = pg.misplaced_objects.find(oid);
    if (st_it != pg.misplaced_objects.end())
    {
        *object_state = st_it->second;
        return st_it->second->read_target.data();
    }
    *object_state = NULL;
    return def;
}
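
// Note: the lookups above are effectively prioritized - an entry in incomplete_objects
// wins over degraded_objects, which wins over misplaced_objects, and the object's
// read_target set is returned instead of the default one. For a clean PG (no PG_HAS_*
// bits set) or an object not listed in any of the maps, <def> (normally pg.cur_set)
// is returned and *object_state is left NULL.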

void osd_t::continue_primary_read(osd_op_t *cur_op)
{
    if (!cur_op->op_data && !prepare_primary_rw(cur_op))
    {
        return;
    }
    osd_primary_op_data_t *op_data = cur_op->op_data;
    if (op_data->st == 1) goto resume_1;
    else if (op_data->st == 2) goto resume_2;
    {
        auto & pg = pgs[{ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num }];
        for (int role = 0; role < op_data->pg_data_size; role++)
        {
            op_data->stripes[role].read_start = op_data->stripes[role].req_start;
            op_data->stripes[role].read_end = op_data->stripes[role].req_end;
        }
        // Determine version
        auto vo_it = pg.ver_override.find(op_data->oid);
        op_data->target_ver = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX;
        if (pg.state == PG_ACTIVE || op_data->scheme == POOL_SCHEME_REPLICATED)
        {
            // Fast happy-path
            cur_op->buf = alloc_read_buffer(op_data->stripes, op_data->pg_data_size, 0);
            submit_primary_subops(SUBMIT_READ, op_data->target_ver,
                (op_data->scheme == POOL_SCHEME_REPLICATED ? pg.pg_size : op_data->pg_data_size), pg.cur_set.data(), cur_op);
            op_data->st = 1;
        }
        else
        {
            // PG may be degraded or have misplaced objects
            uint64_t* cur_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
            if (extend_missing_stripes(op_data->stripes, cur_set, op_data->pg_data_size, pg.pg_size) < 0)
            {
                finish_op(cur_op, -EIO);
                return;
            }
            // Submit reads
            op_data->pg_size = pg.pg_size;
            op_data->scheme = pg.scheme;
            op_data->degraded = 1;
            cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_size, 0);
            submit_primary_subops(SUBMIT_READ, op_data->target_ver, pg.pg_size, cur_set, cur_op);
            op_data->st = 1;
        }
    }
resume_1:
    return;
resume_2:
    if (op_data->errors > 0)
    {
        finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
        return;
    }
    if (op_data->degraded)
    {
        // Reconstruct missing stripes
        osd_rmw_stripe_t *stripes = op_data->stripes;
        if (op_data->scheme == POOL_SCHEME_XOR)
        {
            reconstruct_stripes_xor(stripes, op_data->pg_size, entry_attr_size);
        }
        else if (op_data->scheme == POOL_SCHEME_JERASURE)
        {
            reconstruct_stripes_jerasure(stripes, op_data->pg_size, op_data->pg_data_size, entry_attr_size);
        }
        for (int role = 0; role < op_data->pg_size; role++)
        {
            if (stripes[role].req_end != 0)
            {
                // Send buffer in parts to avoid copying
                cur_op->iov.push_back(
                    stripes[role].read_buf + (stripes[role].req_start - stripes[role].read_start),
                    stripes[role].req_end - stripes[role].req_start
                );
            }
        }
    }
    else
    {
        cur_op->reply.rw.bitmap_len = op_data->pg_data_size * entry_attr_size;
        cur_op->iov.push_back(op_data->stripes[0].bmp_buf, cur_op->reply.rw.bitmap_len);
        cur_op->iov.push_back(cur_op->buf, cur_op->req.rw.len);
    }
    finish_op(cur_op, cur_op->req.rw.len);
}
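
// Note on the control flow used by every continue_primary_*() function in this file:
// they are hand-rolled coroutines. op_data->st stores the current step; after
// submitting sub-operations the function records a "waiting" st value and returns,
// and the resume_N labels handle the step after completion. Judging by the label
// numbering, the sub-operation completion path (outside this file) advances
// op_data->st before re-entering the function, so re-entry jumps straight to the
// matching resume_N label via the goto chain at the top.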

bool osd_t::check_write_queue(osd_op_t *cur_op, pg_t & pg)
{
    osd_primary_op_data_t *op_data = cur_op->op_data;
    // Check if actions are pending for this object
    auto act_it = pg.flush_actions.lower_bound((obj_piece_id_t){
        .oid = op_data->oid,
        .osd_num = 0,
    });
    if (act_it != pg.flush_actions.end() &&
        act_it->first.oid.inode == op_data->oid.inode &&
        (act_it->first.oid.stripe & ~STRIPE_MASK) == op_data->oid.stripe)
    {
        pg.write_queue.emplace(op_data->oid, cur_op);
        return false;
    }
    // Check if there are other write requests to the same object
    auto vo_it = pg.write_queue.find(op_data->oid);
    if (vo_it != pg.write_queue.end())
    {
        op_data->st = 1;
        pg.write_queue.emplace(op_data->oid, cur_op);
        return false;
    }
    pg.write_queue.emplace(op_data->oid, cur_op);
    return true;
}
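
// Per-object write serialization: check_write_queue() always enqueues the operation
// into pg.write_queue, but only returns true (letting the caller proceed immediately)
// when there are no pending flush actions and no earlier write to the same object.
// Parked operations (st = 1 when another writer is already ahead) are resumed from
// the tail of continue_primary_write() / continue_primary_del() once the previous
// operation on the same object finishes.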

void osd_t::continue_primary_write(osd_op_t *cur_op)
{
    if (!cur_op->op_data && !prepare_primary_rw(cur_op))
    {
        return;
    }
    osd_primary_op_data_t *op_data = cur_op->op_data;
    auto & pg = pgs[{ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num }];
    if (op_data->st == 1) goto resume_1;
    else if (op_data->st == 2) goto resume_2;
    else if (op_data->st == 3) goto resume_3;
    else if (op_data->st == 4) goto resume_4;
    else if (op_data->st == 5) goto resume_5;
    else if (op_data->st == 6) goto resume_6;
    else if (op_data->st == 7) goto resume_7;
    else if (op_data->st == 8) goto resume_8;
    else if (op_data->st == 9) goto resume_9;
    else if (op_data->st == 10) goto resume_10;
    assert(op_data->st == 0);
    if (!check_write_queue(cur_op, pg))
    {
        return;
    }
resume_1:
    // Determine blocks to read and write
    // Missing chunks are allowed to be overwritten even in incomplete objects
    // FIXME: Allow to do small writes to the old (degraded/misplaced) OSD set for lower performance impact
    op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
    if (op_data->scheme == POOL_SCHEME_REPLICATED)
    {
        // Simplified algorithm
        op_data->stripes[0].write_start = op_data->stripes[0].req_start;
        op_data->stripes[0].write_end = op_data->stripes[0].req_end;
        op_data->stripes[0].write_buf = cur_op->buf;
        op_data->stripes[0].bmp_buf = (void*)(op_data->stripes+1);
        if (pg.cur_set.data() != op_data->prev_set && (op_data->stripes[0].write_start != 0 ||
            op_data->stripes[0].write_end != bs_block_size))
        {
            // Object is degraded/misplaced and will be moved to <write_osd_set>
            op_data->stripes[0].read_start = 0;
            op_data->stripes[0].read_end = bs_block_size;
            cur_op->rmw_buf = op_data->stripes[0].read_buf = memalign_or_die(MEM_ALIGNMENT, bs_block_size);
        }
    }
    else
    {
        cur_op->rmw_buf = calc_rmw(cur_op->buf, op_data->stripes, op_data->prev_set,
            pg.pg_size, op_data->pg_data_size, pg.pg_cursize, pg.cur_set.data(), bs_block_size, entry_attr_size);
        if (!cur_op->rmw_buf)
        {
            // Refuse partial overwrite of an incomplete object
            cur_op->reply.hdr.retval = -EINVAL;
            goto continue_others;
        }
    }
    // Read required blocks
    submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, pg.pg_size, op_data->prev_set, cur_op);
resume_2:
    op_data->st = 2;
    return;
resume_3:
    if (op_data->errors > 0)
    {
        pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
        return;
    }
    // Save version override for parallel reads
    pg.ver_override[op_data->oid] = op_data->fact_ver;
    if (op_data->scheme == POOL_SCHEME_REPLICATED)
    {
        // Set bitmap bits
        bitmap_set(op_data->stripes[0].bmp_buf, op_data->stripes[0].write_start, op_data->stripes[0].write_end, bs_bitmap_granularity);
        // Possibly copy new data from the request into the recovery buffer
        if (pg.cur_set.data() != op_data->prev_set && (op_data->stripes[0].write_start != 0 ||
            op_data->stripes[0].write_end != bs_block_size))
        {
            memcpy(
                op_data->stripes[0].read_buf + op_data->stripes[0].req_start,
                op_data->stripes[0].write_buf,
                op_data->stripes[0].req_end - op_data->stripes[0].req_start
            );
            op_data->stripes[0].write_buf = op_data->stripes[0].read_buf;
            op_data->stripes[0].write_start = 0;
            op_data->stripes[0].write_end = bs_block_size;
        }
    }
    else
    {
        // Recover missing stripes, calculate parity
        if (pg.scheme == POOL_SCHEME_XOR)
        {
            calc_rmw_parity_xor(op_data->stripes, pg.pg_size, op_data->prev_set, pg.cur_set.data(), bs_block_size, entry_attr_size);
        }
        else if (pg.scheme == POOL_SCHEME_JERASURE)
        {
            calc_rmw_parity_jerasure(op_data->stripes, pg.pg_size, op_data->pg_data_size, op_data->prev_set, pg.cur_set.data(), bs_block_size, entry_attr_size);
        }
    }
    // Send writes
    if ((op_data->fact_ver >> (64-PG_EPOCH_BITS)) < pg.epoch)
    {
        op_data->target_ver = ((uint64_t)pg.epoch << (64-PG_EPOCH_BITS)) | 1;
    }
    else
    {
        if ((op_data->fact_ver & ((1ul << (64-PG_EPOCH_BITS)) - 1)) == ((1ul << (64-PG_EPOCH_BITS)) - 1))
        {
            assert(pg.epoch != ((1ul << PG_EPOCH_BITS)-1));
            pg.epoch++;
        }
        op_data->target_ver = op_data->fact_ver + 1;
    }
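    // Sketch of the version layout implied by the shifts above: the top PG_EPOCH_BITS
    // of the 64-bit object version hold the PG epoch, the remaining low bits hold a
    // per-epoch write counter. Assuming PG_EPOCH_BITS = 48 purely for illustration
    // (the real value is defined in the PG headers), fact_ver = (epoch << 16) | counter:
    //   - if the object's stored epoch is older than pg.epoch, the counter restarts:
    //     target_ver = (pg.epoch << 16) | 1;
    //   - if the counter is exhausted (all low 16 bits set), pg.epoch is bumped first
    //     and fact_ver + 1 then carries over into the epoch field.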
    if (pg.epoch > pg.reported_epoch)
    {
        // Report newer epoch before writing
        // FIXME: We may report only one PG state here...
        this->pg_state_dirty.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
        pg.history_changed = true;
        report_pg_states();
resume_10:
        if (pg.epoch > pg.reported_epoch)
        {
            op_data->st = 10;
            return;
        }
    }
    submit_primary_subops(SUBMIT_WRITE, op_data->target_ver, pg.pg_size, pg.cur_set.data(), cur_op);
resume_4:
    op_data->st = 4;
    return;
resume_5:
    if (op_data->errors > 0)
    {
        pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
        return;
    }
resume_6:
resume_7:
    if (!remember_unstable_write(cur_op, pg, pg.cur_loc_set, 6))
    {
        // FIXME: Check for immediate_commit == IMMEDIATE_SMALL
        return;
    }
    if (op_data->fact_ver == 1)
    {
        // Object is created
        pg.clean_count++;
        pg.total_count++;
    }
    if (op_data->object_state)
    {
        {
            int recovery_type = op_data->object_state->state & (OBJ_DEGRADED|OBJ_INCOMPLETE) ? 0 : 1;
            recovery_stat_count[0][recovery_type]++;
            if (!recovery_stat_count[0][recovery_type])
            {
                recovery_stat_count[0][recovery_type]++;
                recovery_stat_bytes[0][recovery_type] = 0;
            }
            for (int role = 0; role < (op_data->scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_size); role++)
            {
                recovery_stat_bytes[0][recovery_type] += op_data->stripes[role].write_end - op_data->stripes[role].write_start;
            }
        }
        if (op_data->object_state->state & OBJ_MISPLACED)
        {
            // Remove extra chunks
            submit_primary_del_subops(cur_op, pg.cur_set.data(), pg.pg_size, op_data->object_state->osd_set);
            if (op_data->n_subops > 0)
            {
resume_8:
                op_data->st = 8;
                return;
resume_9:
                if (op_data->errors > 0)
                {
                    pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
                    return;
                }
            }
        }
        // Clear object state
        remove_object_from_state(op_data->oid, op_data->object_state, pg);
        pg.clean_count++;
    }
    cur_op->reply.hdr.retval = cur_op->req.rw.len;
continue_others:
    // Remove version override
    pg.ver_override.erase(op_data->oid);
    object_id oid = op_data->oid;
    finish_op(cur_op, cur_op->reply.hdr.retval);
    // Continue other write operations to the same object
    auto next_it = pg.write_queue.find(oid);
    auto this_it = next_it;
    if (this_it != pg.write_queue.end() && this_it->second == cur_op)
    {
        next_it++;
        pg.write_queue.erase(this_it);
        if (next_it != pg.write_queue.end() && next_it->first == oid)
        {
            osd_op_t *next_op = next_it->second;
            continue_primary_write(next_op);
        }
    }
}

bool osd_t::remember_unstable_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state)
{
    osd_primary_op_data_t *op_data = cur_op->op_data;
    if (op_data->st == base_state)
    {
        goto resume_6;
    }
    else if (op_data->st == base_state+1)
    {
        goto resume_7;
    }
    // FIXME: Check for immediate_commit == IMMEDIATE_SMALL
    if (immediate_commit == IMMEDIATE_ALL)
    {
        if (op_data->scheme != POOL_SCHEME_REPLICATED)
        {
            // Send STABILIZE ops immediately
            op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
            op_data->unstable_writes = new obj_ver_id[loc_set.size()];
            {
                int last_start = 0;
                for (auto & chunk: loc_set)
                {
                    op_data->unstable_writes[last_start] = (obj_ver_id){
                        .oid = {
                            .inode = op_data->oid.inode,
                            .stripe = op_data->oid.stripe | chunk.role,
                        },
                        .version = op_data->fact_ver,
                    };
                    op_data->unstable_write_osds->push_back((unstable_osd_num_t){
                        .osd_num = chunk.osd_num,
                        .start = last_start,
                        .len = 1,
                    });
                    last_start++;
                }
            }
            submit_primary_stab_subops(cur_op);
resume_6:
            op_data->st = 6;
            return false;
resume_7:
            // FIXME: Free those in the destructor?
            delete op_data->unstable_write_osds;
            delete[] op_data->unstable_writes;
            op_data->unstable_writes = NULL;
            op_data->unstable_write_osds = NULL;
            if (op_data->errors > 0)
            {
                pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
                return false;
            }
        }
    }
    else
    {
        if (op_data->scheme != POOL_SCHEME_REPLICATED)
        {
            // Remember version as unstable for EC/XOR
            for (auto & chunk: loc_set)
            {
                this->dirty_osds.insert(chunk.osd_num);
                this->unstable_writes[(osd_object_id_t){
                    .osd_num = chunk.osd_num,
                    .oid = {
                        .inode = op_data->oid.inode,
                        .stripe = op_data->oid.stripe | chunk.role,
                    },
                }] = op_data->fact_ver;
            }
        }
        else
        {
            // Only remember to sync OSDs for replicated pools
            for (auto & chunk: loc_set)
            {
                this->dirty_osds.insert(chunk.osd_num);
            }
        }
        // Remember PG as dirty to drop the connection when PG goes offline
        // (this is required because of the "lazy sync")
        c_cli.clients[cur_op->peer_fd]->dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
        dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
    }
    return true;
}
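
// Summary of the two branches above: with immediate_commit == IMMEDIATE_ALL the
// secondary writes are assumed to be committed on their own, so EC/XOR objects only
// need an immediate STABILIZE round here (replicated objects need nothing extra).
// In the lazy mode the new versions are instead remembered in this->unstable_writes,
// the participating OSDs in dirty_osds and the PG in the client's dirty_pgs set, to
// be flushed and stabilized by a later SYNC (continue_primary_sync below).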

// Save and clear unstable_writes -> SYNC all -> STABLE all
void osd_t::continue_primary_sync(osd_op_t *cur_op)
{
    if (!cur_op->op_data)
    {
        cur_op->op_data = (osd_primary_op_data_t*)calloc_or_die(1, sizeof(osd_primary_op_data_t));
    }
    osd_primary_op_data_t *op_data = cur_op->op_data;
    if (op_data->st == 1) goto resume_1;
    else if (op_data->st == 2) goto resume_2;
    else if (op_data->st == 3) goto resume_3;
    else if (op_data->st == 4) goto resume_4;
    else if (op_data->st == 5) goto resume_5;
    else if (op_data->st == 6) goto resume_6;
    assert(op_data->st == 0);
    if (syncs_in_progress.size() > 0)
    {
        // Wait for previous syncs, if any
        // FIXME: We may try to execute the current one in parallel, like in Blockstore, but I'm not sure if it matters at all
        syncs_in_progress.push_back(cur_op);
        op_data->st = 1;
resume_1:
        return;
    }
    else
    {
        syncs_in_progress.push_back(cur_op);
    }
resume_2:
    if (dirty_osds.size() == 0)
    {
        // Nothing to sync
        goto finish;
    }
    // Save and clear unstable_writes
    // In theory it is possible to do it on a per-client basis, but this seems to be an unnecessary complication
    // It would be cool not to copy these here at all, but someone has to deduplicate them by object IDs anyway
    if (unstable_writes.size() > 0)
    {
        op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
        op_data->unstable_writes = new obj_ver_id[this->unstable_writes.size()];
        osd_num_t last_osd = 0;
        int last_start = 0, last_end = 0;
        for (auto it = this->unstable_writes.begin(); it != this->unstable_writes.end(); it++)
        {
            if (last_osd != it->first.osd_num)
            {
                if (last_osd != 0)
                {
                    op_data->unstable_write_osds->push_back((unstable_osd_num_t){
                        .osd_num = last_osd,
                        .start = last_start,
                        .len = last_end - last_start,
                    });
                }
                last_osd = it->first.osd_num;
                last_start = last_end;
            }
            op_data->unstable_writes[last_end] = (obj_ver_id){
                .oid = it->first.oid,
                .version = it->second,
            };
            last_end++;
        }
        if (last_osd != 0)
        {
            op_data->unstable_write_osds->push_back((unstable_osd_num_t){
                .osd_num = last_osd,
                .start = last_start,
                .len = last_end - last_start,
            });
        }
        this->unstable_writes.clear();
    }
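    // Illustrative sketch of the flattening above (values are made up, and it relies on
    // the iteration order of this->unstable_writes grouping entries by OSD): given
    //   (osd 1, A) -> v5, (osd 1, B) -> v2, (osd 2, A) -> v5
    // op_data->unstable_writes becomes the flat array [A v5, B v2, A v5] and
    // op_data->unstable_write_osds becomes [{osd 1, start 0, len 2}, {osd 2, start 2, len 1}],
    // i.e. one contiguous slice of the array per destination OSD for the STABILIZE subops.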
    {
        void *dirty_buf = malloc_or_die(sizeof(pool_pg_num_t)*dirty_pgs.size() + sizeof(osd_num_t)*dirty_osds.size());
        op_data->dirty_pgs = (pool_pg_num_t*)dirty_buf;
        op_data->dirty_osds = (osd_num_t*)(dirty_buf + sizeof(pool_pg_num_t)*dirty_pgs.size());
        op_data->dirty_pg_count = dirty_pgs.size();
        op_data->dirty_osd_count = dirty_osds.size();
        int dpg = 0;
        for (auto dirty_pg_num: dirty_pgs)
        {
            pgs[dirty_pg_num].inflight++;
            op_data->dirty_pgs[dpg++] = dirty_pg_num;
        }
        dirty_pgs.clear();
        dpg = 0;
        for (auto osd_num: dirty_osds)
        {
            op_data->dirty_osds[dpg++] = osd_num;
        }
        dirty_osds.clear();
    }
    if (immediate_commit != IMMEDIATE_ALL)
    {
        // SYNC
        submit_primary_sync_subops(cur_op);
resume_3:
        op_data->st = 3;
        return;
resume_4:
        if (op_data->errors > 0)
        {
            goto resume_6;
        }
    }
    if (op_data->unstable_writes)
    {
        // Stabilize version sets, if any
        submit_primary_stab_subops(cur_op);
resume_5:
        op_data->st = 5;
        return;
    }
resume_6:
    if (op_data->errors > 0)
    {
        // Return PGs and OSDs back into their dirty sets
        for (int i = 0; i < op_data->dirty_pg_count; i++)
        {
            dirty_pgs.insert(op_data->dirty_pgs[i]);
        }
        for (int i = 0; i < op_data->dirty_osd_count; i++)
        {
            dirty_osds.insert(op_data->dirty_osds[i]);
        }
        if (op_data->unstable_writes)
        {
            // Return objects back into the unstable write set
            for (auto unstable_osd: *(op_data->unstable_write_osds))
            {
                for (int i = 0; i < unstable_osd.len; i++)
                {
                    // Except those from peered PGs
                    auto & w = op_data->unstable_writes[unstable_osd.start + i];
                    pool_pg_num_t wpg = {
                        .pool_id = INODE_POOL(w.oid.inode),
                        .pg_num = map_to_pg(w.oid, st_cli.pool_config.at(INODE_POOL(w.oid.inode)).pg_stripe_size),
                    };
                    if (pgs[wpg].state & PG_ACTIVE)
                    {
                        uint64_t & dest = this->unstable_writes[(osd_object_id_t){
                            .osd_num = unstable_osd.osd_num,
                            .oid = w.oid,
                        }];
                        dest = dest < w.version ? w.version : dest;
                        dirty_pgs.insert(wpg);
                    }
                }
            }
        }
    }
    for (int i = 0; i < op_data->dirty_pg_count; i++)
    {
        auto & pg = pgs.at(op_data->dirty_pgs[i]);
        pg.inflight--;
        if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch)
        {
            finish_stop_pg(pg);
        }
    }
    // FIXME: Free those in the destructor?
    free(op_data->dirty_pgs);
    op_data->dirty_pgs = NULL;
    op_data->dirty_osds = NULL;
    if (op_data->unstable_writes)
    {
        delete op_data->unstable_write_osds;
        delete[] op_data->unstable_writes;
        op_data->unstable_writes = NULL;
        op_data->unstable_write_osds = NULL;
    }
    if (op_data->errors > 0)
    {
        finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
    }
    else
    {
finish:
        if (cur_op->peer_fd)
        {
            auto it = c_cli.clients.find(cur_op->peer_fd);
            if (it != c_cli.clients.end())
                it->second->dirty_pgs.clear();
        }
        finish_op(cur_op, 0);
    }
    assert(syncs_in_progress.front() == cur_op);
    syncs_in_progress.pop_front();
    if (syncs_in_progress.size() > 0)
    {
        cur_op = syncs_in_progress.front();
        op_data = cur_op->op_data;
        op_data->st++;
        goto resume_2;
    }
}

// Decrement pg_osd_set_state_t's object_count and change PG state accordingly
void osd_t::remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t & pg)
{
    if (object_state->state & OBJ_INCOMPLETE)
    {
        // Successful write means that object is not incomplete anymore
        this->incomplete_objects--;
        pg.incomplete_objects.erase(oid);
        if (!pg.incomplete_objects.size())
        {
            pg.state = pg.state & ~PG_HAS_INCOMPLETE;
            report_pg_state(pg);
        }
    }
    else if (object_state->state & OBJ_DEGRADED)
    {
        this->degraded_objects--;
        pg.degraded_objects.erase(oid);
        if (!pg.degraded_objects.size())
        {
            pg.state = pg.state & ~PG_HAS_DEGRADED;
            report_pg_state(pg);
        }
    }
    else if (object_state->state & OBJ_MISPLACED)
    {
        this->misplaced_objects--;
        pg.misplaced_objects.erase(oid);
        if (!pg.misplaced_objects.size())
        {
            pg.state = pg.state & ~PG_HAS_MISPLACED;
            report_pg_state(pg);
        }
    }
    else
    {
        throw std::runtime_error("BUG: Invalid object state: "+std::to_string(object_state->state));
    }
    object_state->object_count--;
    if (!object_state->object_count)
    {
        pg.state_dict.erase(object_state->osd_set);
    }
}

void osd_t::continue_primary_del(osd_op_t *cur_op)
{
    if (!cur_op->op_data && !prepare_primary_rw(cur_op))
    {
        return;
    }
    osd_primary_op_data_t *op_data = cur_op->op_data;
    auto & pg = pgs[{ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num }];
    if (op_data->st == 1) goto resume_1;
    else if (op_data->st == 2) goto resume_2;
    else if (op_data->st == 3) goto resume_3;
    else if (op_data->st == 4) goto resume_4;
    else if (op_data->st == 5) goto resume_5;
    assert(op_data->st == 0);
    // Delete is forbidden even in active PGs if they're also degraded or have previous dead OSDs
    if (pg.state & (PG_DEGRADED | PG_LEFT_ON_DEAD))
    {
        finish_op(cur_op, -EBUSY);
        return;
    }
    if (!check_write_queue(cur_op, pg))
    {
        return;
    }
resume_1:
    // Determine which OSDs contain this object and delete it
    op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
    // Submit 1 read to determine the actual version number
    submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, pg.pg_size, op_data->prev_set, cur_op);
resume_2:
    op_data->st = 2;
    return;
resume_3:
    if (op_data->errors > 0)
    {
        pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
        return;
    }
    // Save version override for parallel reads
    pg.ver_override[op_data->oid] = op_data->fact_ver;
    // Submit deletes
    op_data->fact_ver++;
    submit_primary_del_subops(cur_op, NULL, 0, op_data->object_state ? op_data->object_state->osd_set : pg.cur_loc_set);
resume_4:
    op_data->st = 4;
    return;
resume_5:
    if (op_data->errors > 0)
    {
        pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
        return;
    }
    // Remove version override
    pg.ver_override.erase(op_data->oid);
    // Adjust PG stats after "instant stabilize", because we need object_state above
    if (!op_data->object_state)
    {
        pg.clean_count--;
    }
    else
    {
        remove_object_from_state(op_data->oid, op_data->object_state, pg);
    }
    pg.total_count--;
    object_id oid = op_data->oid;
    finish_op(cur_op, cur_op->req.rw.len);
    // Continue other write operations to the same object
    auto next_it = pg.write_queue.find(oid);
    auto this_it = next_it;
    if (this_it != pg.write_queue.end() && this_it->second == cur_op)
    {
        next_it++;
        pg.write_queue.erase(this_it);
        if (next_it != pg.write_queue.end() &&
            next_it->first == oid)
        {
            osd_op_t *next_op = next_it->second;
            continue_primary_write(next_op);
        }
    }
}