Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

607 lines
22 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 (see README.md for details)
  3. #include "osd_primary.h"
  4. void osd_t::autosync()
  5. {
  6. // FIXME Autosync based on the number of unstable writes to prevent
  7. // "journal_sector_buffer_count is too low for this batch" errors
  8. if (immediate_commit != IMMEDIATE_ALL && !autosync_op)
  9. {
  10. autosync_op = new osd_op_t();
  11. autosync_op->op_type = OSD_OP_IN;
  12. autosync_op->req = (osd_any_op_t){
  13. .sync = {
  14. .header = {
  15. .magic = SECONDARY_OSD_OP_MAGIC,
  16. .id = 1,
  17. .opcode = OSD_OP_SYNC,
  18. },
  19. },
  20. };
  21. autosync_op->callback = [this](osd_op_t *op)
  22. {
  23. if (op->reply.hdr.retval < 0)
  24. {
  25. printf("Warning: automatic sync resulted in an error: %ld (%s)\n", -op->reply.hdr.retval, strerror(-op->reply.hdr.retval));
  26. }
  27. delete autosync_op;
  28. autosync_op = NULL;
  29. };
  30. exec_op(autosync_op);
  31. }
  32. }
  33. void osd_t::finish_op(osd_op_t *cur_op, int retval)
  34. {
  35. inflight_ops--;
  36. if (cur_op->req.hdr.opcode == OSD_OP_READ ||
  37. cur_op->req.hdr.opcode == OSD_OP_WRITE ||
  38. cur_op->req.hdr.opcode == OSD_OP_DELETE)
  39. {
  40. // Track inode statistics
  41. if (!cur_op->tv_end.tv_sec)
  42. {
  43. clock_gettime(CLOCK_REALTIME, &cur_op->tv_end);
  44. }
  45. uint64_t usec = (
  46. (cur_op->tv_end.tv_sec - cur_op->tv_begin.tv_sec)*1000000 +
  47. (cur_op->tv_end.tv_nsec - cur_op->tv_begin.tv_nsec)/1000
  48. );
  49. int inode_st_op = cur_op->req.hdr.opcode == OSD_OP_DELETE
  50. ? INODE_STATS_DELETE
  51. : (cur_op->req.hdr.opcode == OSD_OP_READ ? INODE_STATS_READ : INODE_STATS_WRITE);
  52. inode_stats[cur_op->req.rw.inode].op_count[inode_st_op]++;
  53. inode_stats[cur_op->req.rw.inode].op_sum[inode_st_op] += usec;
  54. if (cur_op->req.hdr.opcode == OSD_OP_DELETE)
  55. inode_stats[cur_op->req.rw.inode].op_bytes[inode_st_op] += cur_op->op_data->pg_data_size * bs_block_size;
  56. else
  57. inode_stats[cur_op->req.rw.inode].op_bytes[inode_st_op] += cur_op->req.rw.len;
  58. }
  59. if (cur_op->op_data)
  60. {
  61. if (cur_op->op_data->pg_num > 0)
  62. {
  63. auto & pg = pgs[{ .pool_id = INODE_POOL(cur_op->op_data->oid.inode), .pg_num = cur_op->op_data->pg_num }];
  64. pg.inflight--;
  65. assert(pg.inflight >= 0);
  66. if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch)
  67. {
  68. finish_stop_pg(pg);
  69. }
  70. }
  71. assert(!cur_op->op_data->subops);
  72. assert(!cur_op->op_data->unstable_write_osds);
  73. assert(!cur_op->op_data->unstable_writes);
  74. assert(!cur_op->op_data->dirty_pgs);
  75. free(cur_op->op_data);
  76. cur_op->op_data = NULL;
  77. }
  78. if (!cur_op->peer_fd)
  79. {
  80. // Copy lambda to be unaffected by `delete op`
  81. std::function<void(osd_op_t*)>(cur_op->callback)(cur_op);
  82. }
  83. else
  84. {
  85. // FIXME add separate magic number for primary ops
  86. auto cl_it = c_cli.clients.find(cur_op->peer_fd);
  87. if (cl_it != c_cli.clients.end())
  88. {
  89. cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
  90. cur_op->reply.hdr.id = cur_op->req.hdr.id;
  91. cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode;
  92. cur_op->reply.hdr.retval = retval;
  93. c_cli.outbox_push(cur_op);
  94. }
  95. else
  96. {
  97. delete cur_op;
  98. }
  99. }
  100. }
// Submit read/write sub-operations for a primary op to all participating OSDs
// (either the local blockstore or remote secondary OSDs).
// submit_type: SUBMIT_WRITE / SUBMIT_RMW_READ / plain read; op_version: object
// version to read or write; osd_set: OSD number per role (0 = missing chunk),
// pg_size entries; cur_op: the parent primary operation (must have op_data).
void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op)
{
    bool wr = submit_type == SUBMIT_WRITE;
    osd_primary_op_data_t *op_data = cur_op->op_data;
    osd_rmw_stripe_t *stripes = op_data->stripes;
    bool rep = op_data->scheme == POOL_SCHEME_REPLICATED;
    // Allocate subops
    int n_subops = 0, zero_read = -1;
    for (int role = 0; role < pg_size; role++)
    {
        // Candidate role for a fallback zero-length read, preferring the local
        // OSD over the first live one (note: && binds tighter than ||)
        if (osd_set[role] == this->osd_num || osd_set[role] != 0 && zero_read == -1)
        {
            zero_read = role;
        }
        // Writes go to every live chunk; reads only to stripes with a non-empty read range
        if (osd_set[role] != 0 && (wr || !rep && stripes[role].read_end != 0))
        {
            n_subops++;
        }
    }
    if (!n_subops && (submit_type == SUBMIT_RMW_READ || rep))
    {
        // No regular subops - submit a single subop for the zero_read role,
        // presumably to still obtain the object version (TODO confirm)
        n_subops = 1;
    }
    else
    {
        // zero_read is only used when no regular subops were selected
        zero_read = -1;
    }
    osd_op_t *subops = new osd_op_t[n_subops];
    op_data->fact_ver = 0;
    op_data->done = op_data->errors = 0;
    op_data->n_subops = n_subops;
    op_data->subops = subops;
    int i = 0;
    for (int role = 0; role < pg_size; role++)
    {
        // We always submit zero-length writes to all replicas, even if the stripe is not modified
        if (!(wr || !rep && stripes[role].read_end != 0 || zero_read == role))
        {
            continue;
        }
        osd_num_t role_osd_num = osd_set[role];
        if (role_osd_num != 0)
        {
            // Replicated pools always address stripe 0; EC/XOR use the role number
            int stripe_num = rep ? 0 : role;
            if (role_osd_num == this->osd_num)
            {
                // Execute on the local blockstore; op_type of a local subop
                // stores a pointer back to the parent op (see handle_primary_bs_subop)
                clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);
                subops[i].op_type = (uint64_t)cur_op;
                subops[i].bs_op = new blockstore_op_t({
                    .opcode = (uint64_t)(wr ? (rep ? BS_OP_WRITE_STABLE : BS_OP_WRITE) : BS_OP_READ),
                    .callback = [subop = &subops[i], this](blockstore_op_t *bs_subop)
                    {
                        handle_primary_bs_subop(subop);
                    },
                    .oid = {
                        .inode = op_data->oid.inode,
                        .stripe = op_data->oid.stripe | stripe_num,
                    },
                    .version = op_version,
                    .offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start,
                    .len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start,
                    .buf = wr ? stripes[stripe_num].write_buf : stripes[stripe_num].read_buf,
                    .bitmap = stripes[stripe_num].bmp_buf,
                });
#ifdef OSD_DEBUG
                printf(
                    "Submit %s to local: %lx:%lx v%lu %u-%u\n", wr ? "write" : "read",
                    op_data->oid.inode, op_data->oid.stripe | stripe_num, op_version,
                    subops[i].bs_op->offset, subops[i].bs_op->len
                );
#endif
                bs->enqueue_op(subops[i].bs_op);
            }
            else
            {
                // Send a secondary read/write to a remote peer OSD
                subops[i].op_type = OSD_OP_OUT;
                subops[i].peer_fd = c_cli.osd_peer_fds.at(role_osd_num);
                subops[i].bitmap = stripes[stripe_num].bmp_buf;
                subops[i].bitmap_len = entry_attr_size;
                subops[i].req.sec_rw = {
                    .header = {
                        .magic = SECONDARY_OSD_OP_MAGIC,
                        .id = c_cli.next_subop_id++,
                        .opcode = (uint64_t)(wr ? (rep ? OSD_OP_SEC_WRITE_STABLE : OSD_OP_SEC_WRITE) : OSD_OP_SEC_READ),
                    },
                    .oid = {
                        .inode = op_data->oid.inode,
                        .stripe = op_data->oid.stripe | stripe_num,
                    },
                    .version = op_version,
                    .offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start,
                    .len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start,
                    .attr_len = entry_attr_size,
                };
#ifdef OSD_DEBUG
                printf(
                    "Submit %s to osd %lu: %lx:%lx v%lu %u-%u\n", wr ? "write" : "read", role_osd_num,
                    op_data->oid.inode, op_data->oid.stripe | stripe_num, op_version,
                    subops[i].req.sec_rw.offset, subops[i].req.sec_rw.len
                );
#endif
                // Attach the data buffer only when the range is non-empty
                if (wr)
                {
                    if (stripes[stripe_num].write_end > stripes[stripe_num].write_start)
                    {
                        subops[i].iov.push_back(stripes[stripe_num].write_buf, stripes[stripe_num].write_end - stripes[stripe_num].write_start);
                    }
                }
                else
                {
                    if (stripes[stripe_num].read_end > stripes[stripe_num].read_start)
                    {
                        subops[i].iov.push_back(stripes[stripe_num].read_buf, stripes[stripe_num].read_end - stripes[stripe_num].read_start);
                    }
                }
                subops[i].callback = [cur_op, this](osd_op_t *subop)
                {
                    // Capture the fd before handle_primary_subop() - it may free the subop
                    int fail_fd = subop->req.hdr.opcode == OSD_OP_SEC_WRITE &&
                        subop->reply.hdr.retval != subop->req.sec_rw.len ? subop->peer_fd : -1;
                    handle_primary_subop(subop, cur_op);
                    if (fail_fd >= 0)
                    {
                        // write operation failed, drop the connection
                        c_cli.stop_client(fail_fd);
                    }
                };
                c_cli.outbox_push(&subops[i]);
            }
            i++;
        }
    }
}
  233. static uint64_t bs_op_to_osd_op[] = {
  234. 0,
  235. OSD_OP_SEC_READ, // BS_OP_READ = 1
  236. OSD_OP_SEC_WRITE, // BS_OP_WRITE = 2
  237. OSD_OP_SEC_WRITE_STABLE, // BS_OP_WRITE_STABLE = 3
  238. OSD_OP_SEC_SYNC, // BS_OP_SYNC = 4
  239. OSD_OP_SEC_STABILIZE, // BS_OP_STABLE = 5
  240. OSD_OP_SEC_DELETE, // BS_OP_DELETE = 6
  241. OSD_OP_SEC_LIST, // BS_OP_LIST = 7
  242. OSD_OP_SEC_ROLLBACK, // BS_OP_ROLLBACK = 8
  243. OSD_OP_TEST_SYNC_STAB_ALL, // BS_OP_SYNC_STAB_ALL = 9
  244. };
  245. void osd_t::handle_primary_bs_subop(osd_op_t *subop)
  246. {
  247. osd_op_t *cur_op = (osd_op_t*)subop->op_type;
  248. blockstore_op_t *bs_op = subop->bs_op;
  249. int expected = bs_op->opcode == BS_OP_READ || bs_op->opcode == BS_OP_WRITE
  250. || bs_op->opcode == BS_OP_WRITE_STABLE ? bs_op->len : 0;
  251. if (bs_op->retval != expected && bs_op->opcode != BS_OP_READ)
  252. {
  253. // die
  254. throw std::runtime_error(
  255. "local blockstore modification failed (opcode = "+std::to_string(bs_op->opcode)+
  256. " retval = "+std::to_string(bs_op->retval)+")"
  257. );
  258. }
  259. add_bs_subop_stats(subop);
  260. subop->req.hdr.opcode = bs_op_to_osd_op[bs_op->opcode];
  261. subop->reply.hdr.retval = bs_op->retval;
  262. if (bs_op->opcode == BS_OP_READ || bs_op->opcode == BS_OP_WRITE || bs_op->opcode == BS_OP_WRITE_STABLE)
  263. {
  264. subop->req.sec_rw.len = bs_op->len;
  265. subop->reply.sec_rw.version = bs_op->version;
  266. }
  267. delete bs_op;
  268. subop->bs_op = NULL;
  269. handle_primary_subop(subop, cur_op);
  270. }
  271. void osd_t::add_bs_subop_stats(osd_op_t *subop)
  272. {
  273. // Include local blockstore ops in statistics
  274. uint64_t opcode = bs_op_to_osd_op[subop->bs_op->opcode];
  275. timespec tv_end;
  276. clock_gettime(CLOCK_REALTIME, &tv_end);
  277. c_cli.stats.op_stat_count[opcode]++;
  278. if (!c_cli.stats.op_stat_count[opcode])
  279. {
  280. c_cli.stats.op_stat_count[opcode] = 1;
  281. c_cli.stats.op_stat_sum[opcode] = 0;
  282. c_cli.stats.op_stat_bytes[opcode] = 0;
  283. }
  284. c_cli.stats.op_stat_sum[opcode] += (
  285. (tv_end.tv_sec - subop->tv_begin.tv_sec)*1000000 +
  286. (tv_end.tv_nsec - subop->tv_begin.tv_nsec)/1000
  287. );
  288. if (opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE)
  289. {
  290. c_cli.stats.op_stat_bytes[opcode] += subop->bs_op->len;
  291. }
  292. }
  293. void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op)
  294. {
  295. uint64_t opcode = subop->req.hdr.opcode;
  296. int retval = subop->reply.hdr.retval;
  297. int expected = opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE
  298. || opcode == OSD_OP_SEC_WRITE_STABLE ? subop->req.sec_rw.len : 0;
  299. osd_primary_op_data_t *op_data = cur_op->op_data;
  300. if (retval != expected)
  301. {
  302. printf("%s subop failed: retval = %d (expected %d)\n", osd_op_names[opcode], retval, expected);
  303. if (retval == -EPIPE)
  304. {
  305. op_data->epipe++;
  306. }
  307. op_data->errors++;
  308. }
  309. else
  310. {
  311. op_data->done++;
  312. if (opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE || opcode == OSD_OP_SEC_WRITE_STABLE)
  313. {
  314. uint64_t version = subop->reply.sec_rw.version;
  315. #ifdef OSD_DEBUG
  316. uint64_t peer_osd = c_cli.clients.find(subop->peer_fd) != c_cli.clients.end()
  317. ? c_cli.clients[subop->peer_fd].osd_num : osd_num;
  318. printf("subop %lu from osd %lu: version = %lu\n", opcode, peer_osd, version);
  319. #endif
  320. if (op_data->fact_ver != 0 && op_data->fact_ver != version)
  321. {
  322. throw std::runtime_error(
  323. "different fact_versions returned from "+std::string(osd_op_names[opcode])+
  324. " subops: "+std::to_string(version)+" vs "+std::to_string(op_data->fact_ver)
  325. );
  326. }
  327. op_data->fact_ver = version;
  328. }
  329. }
  330. if ((op_data->errors + op_data->done) >= op_data->n_subops)
  331. {
  332. delete[] op_data->subops;
  333. op_data->subops = NULL;
  334. op_data->st++;
  335. if (cur_op->req.hdr.opcode == OSD_OP_READ)
  336. {
  337. continue_primary_read(cur_op);
  338. }
  339. else if (cur_op->req.hdr.opcode == OSD_OP_WRITE)
  340. {
  341. continue_primary_write(cur_op);
  342. }
  343. else if (cur_op->req.hdr.opcode == OSD_OP_SYNC)
  344. {
  345. continue_primary_sync(cur_op);
  346. }
  347. else if (cur_op->req.hdr.opcode == OSD_OP_DELETE)
  348. {
  349. continue_primary_del(cur_op);
  350. }
  351. else
  352. {
  353. throw std::runtime_error("BUG: unknown opcode");
  354. }
  355. }
  356. }
  357. void osd_t::cancel_primary_write(osd_op_t *cur_op)
  358. {
  359. if (cur_op->op_data && cur_op->op_data->subops)
  360. {
  361. // Primary-write operation is waiting for subops, subops
  362. // are sent to peer OSDs, so we can't just throw them away.
  363. // Mark them with an extra EPIPE.
  364. cur_op->op_data->errors++;
  365. cur_op->op_data->epipe++;
  366. cur_op->op_data->done--; // Caution: `done` must be signed because may become -1 here
  367. }
  368. else
  369. {
  370. finish_op(cur_op, -EPIPE);
  371. }
  372. }
  373. static bool contains_osd(osd_num_t *osd_set, uint64_t size, osd_num_t osd_num)
  374. {
  375. for (uint64_t i = 0; i < size; i++)
  376. {
  377. if (osd_set[i] == osd_num)
  378. {
  379. return true;
  380. }
  381. }
  382. return false;
  383. }
  384. void osd_t::submit_primary_del_subops(osd_op_t *cur_op, osd_num_t *cur_set, uint64_t set_size, pg_osd_set_t & loc_set)
  385. {
  386. osd_primary_op_data_t *op_data = cur_op->op_data;
  387. bool rep = op_data->scheme == POOL_SCHEME_REPLICATED;
  388. int extra_chunks = 0;
  389. // ordered comparison for EC/XOR, unordered for replicated pools
  390. for (auto & chunk: loc_set)
  391. {
  392. if (!cur_set || (rep ? !contains_osd(cur_set, set_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role]))
  393. {
  394. extra_chunks++;
  395. }
  396. }
  397. op_data->n_subops = extra_chunks;
  398. op_data->done = op_data->errors = 0;
  399. if (!extra_chunks)
  400. {
  401. return;
  402. }
  403. osd_op_t *subops = new osd_op_t[extra_chunks];
  404. op_data->subops = subops;
  405. int i = 0;
  406. for (auto & chunk: loc_set)
  407. {
  408. if (!cur_set || (rep ? !contains_osd(cur_set, set_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role]))
  409. {
  410. int stripe_num = op_data->scheme == POOL_SCHEME_REPLICATED ? 0 : chunk.role;
  411. if (chunk.osd_num == this->osd_num)
  412. {
  413. clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);
  414. subops[i].op_type = (uint64_t)cur_op;
  415. subops[i].bs_op = new blockstore_op_t({
  416. .opcode = BS_OP_DELETE,
  417. .callback = [subop = &subops[i], this](blockstore_op_t *bs_subop)
  418. {
  419. handle_primary_bs_subop(subop);
  420. },
  421. .oid = {
  422. .inode = op_data->oid.inode,
  423. .stripe = op_data->oid.stripe | stripe_num,
  424. },
  425. // Same version as write
  426. .version = op_data->fact_ver,
  427. });
  428. bs->enqueue_op(subops[i].bs_op);
  429. }
  430. else
  431. {
  432. subops[i].op_type = OSD_OP_OUT;
  433. subops[i].peer_fd = c_cli.osd_peer_fds.at(chunk.osd_num);
  434. subops[i].req.sec_del = {
  435. .header = {
  436. .magic = SECONDARY_OSD_OP_MAGIC,
  437. .id = c_cli.next_subop_id++,
  438. .opcode = OSD_OP_SEC_DELETE,
  439. },
  440. .oid = {
  441. .inode = op_data->oid.inode,
  442. .stripe = op_data->oid.stripe | stripe_num,
  443. },
  444. // Same version as write
  445. .version = op_data->fact_ver,
  446. };
  447. subops[i].callback = [cur_op, this](osd_op_t *subop)
  448. {
  449. int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1;
  450. handle_primary_subop(subop, cur_op);
  451. if (fail_fd >= 0)
  452. {
  453. // delete operation failed, drop the connection
  454. c_cli.stop_client(fail_fd);
  455. }
  456. };
  457. c_cli.outbox_push(&subops[i]);
  458. }
  459. i++;
  460. }
  461. }
  462. }
  463. void osd_t::submit_primary_sync_subops(osd_op_t *cur_op)
  464. {
  465. osd_primary_op_data_t *op_data = cur_op->op_data;
  466. int n_osds = op_data->dirty_osd_count;
  467. osd_op_t *subops = new osd_op_t[n_osds];
  468. op_data->done = op_data->errors = 0;
  469. op_data->n_subops = n_osds;
  470. op_data->subops = subops;
  471. for (int i = 0; i < n_osds; i++)
  472. {
  473. osd_num_t sync_osd = op_data->dirty_osds[i];
  474. if (sync_osd == this->osd_num)
  475. {
  476. clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);
  477. subops[i].op_type = (uint64_t)cur_op;
  478. subops[i].bs_op = new blockstore_op_t({
  479. .opcode = BS_OP_SYNC,
  480. .callback = [subop = &subops[i], this](blockstore_op_t *bs_subop)
  481. {
  482. handle_primary_bs_subop(subop);
  483. },
  484. });
  485. bs->enqueue_op(subops[i].bs_op);
  486. }
  487. else
  488. {
  489. subops[i].op_type = OSD_OP_OUT;
  490. subops[i].peer_fd = c_cli.osd_peer_fds.at(sync_osd);
  491. subops[i].req.sec_sync = {
  492. .header = {
  493. .magic = SECONDARY_OSD_OP_MAGIC,
  494. .id = c_cli.next_subop_id++,
  495. .opcode = OSD_OP_SEC_SYNC,
  496. },
  497. };
  498. subops[i].callback = [cur_op, this](osd_op_t *subop)
  499. {
  500. int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1;
  501. handle_primary_subop(subop, cur_op);
  502. if (fail_fd >= 0)
  503. {
  504. // sync operation failed, drop the connection
  505. c_cli.stop_client(fail_fd);
  506. }
  507. };
  508. c_cli.outbox_push(&subops[i]);
  509. }
  510. }
  511. }
  512. void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
  513. {
  514. osd_primary_op_data_t *op_data = cur_op->op_data;
  515. int n_osds = op_data->unstable_write_osds->size();
  516. osd_op_t *subops = new osd_op_t[n_osds];
  517. op_data->done = op_data->errors = 0;
  518. op_data->n_subops = n_osds;
  519. op_data->subops = subops;
  520. for (int i = 0; i < n_osds; i++)
  521. {
  522. auto & stab_osd = (*(op_data->unstable_write_osds))[i];
  523. if (stab_osd.osd_num == this->osd_num)
  524. {
  525. clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);
  526. subops[i].op_type = (uint64_t)cur_op;
  527. subops[i].bs_op = new blockstore_op_t((blockstore_op_t){
  528. .opcode = BS_OP_STABLE,
  529. .callback = [subop = &subops[i], this](blockstore_op_t *bs_subop)
  530. {
  531. handle_primary_bs_subop(subop);
  532. },
  533. .len = (uint32_t)stab_osd.len,
  534. .buf = (void*)(op_data->unstable_writes + stab_osd.start),
  535. });
  536. bs->enqueue_op(subops[i].bs_op);
  537. }
  538. else
  539. {
  540. subops[i].op_type = OSD_OP_OUT;
  541. subops[i].peer_fd = c_cli.osd_peer_fds.at(stab_osd.osd_num);
  542. subops[i].req.sec_stab = {
  543. .header = {
  544. .magic = SECONDARY_OSD_OP_MAGIC,
  545. .id = c_cli.next_subop_id++,
  546. .opcode = OSD_OP_SEC_STABILIZE,
  547. },
  548. .len = (uint64_t)(stab_osd.len * sizeof(obj_ver_id)),
  549. };
  550. subops[i].iov.push_back(op_data->unstable_writes + stab_osd.start, stab_osd.len * sizeof(obj_ver_id));
  551. subops[i].callback = [cur_op, this](osd_op_t *subop)
  552. {
  553. int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1;
  554. handle_primary_subop(subop, cur_op);
  555. if (fail_fd >= 0)
  556. {
  557. // sync operation failed, drop the connection
  558. c_cli.stop_client(fail_fd);
  559. }
  560. };
  561. c_cli.outbox_push(&subops[i]);
  562. }
  563. }
  564. }
  565. void osd_t::pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval)
  566. {
  567. auto st_it = pg.write_queue.find(oid), it = st_it;
  568. if (it == pg.write_queue.end() || it->second != first_op)
  569. {
  570. // Write queue doesn't match the first operation.
  571. // first_op is a leftover operation from the previous peering of the same PG.
  572. finish_op(first_op, retval);
  573. return;
  574. }
  575. std::vector<osd_op_t*> cancel_ops;
  576. while (it != pg.write_queue.end())
  577. {
  578. cancel_ops.push_back(it->second);
  579. it++;
  580. }
  581. if (st_it != it)
  582. {
  583. // First erase them and then run finish_op() for the sake of reenterability
  584. // Calling finish_op() on a live iterator previously triggered a bug where some
  585. // of the OSDs were looping infinitely if you stopped all of them with kill -INT during recovery
  586. pg.write_queue.erase(st_it, it);
  587. for (auto op: cancel_ops)
  588. {
  589. finish_op(op, retval);
  590. }
  591. }
  592. }