Simplified distributed block storage with strong consistency, like in Ceph

// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)

#include "osd_primary.h"

void osd_t::autosync()
{
    // FIXME Autosync based on the number of unstable writes to prevent
    // "journal_sector_buffer_count is too low for this batch" errors
    if (immediate_commit != IMMEDIATE_ALL && !autosync_op)
    {
        autosync_op = new osd_op_t();
        autosync_op->op_type = OSD_OP_IN;
        autosync_op->req = (osd_any_op_t){
            .sync = {
                .header = {
                    .magic = SECONDARY_OSD_OP_MAGIC,
                    .id = 1,
                    .opcode = OSD_OP_SYNC,
                },
            },
        };
        autosync_op->callback = [this](osd_op_t *op)
        {
            if (op->reply.hdr.retval < 0)
            {
                printf("Warning: automatic sync resulted in an error: %ld (%s)\n", -op->reply.hdr.retval, strerror(-op->reply.hdr.retval));
            }
            delete autosync_op;
            autosync_op = NULL;
        };
        exec_op(autosync_op);
    }
}
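
// Complete a primary operation: account per-inode statistics, decrement the
// PG's inflight counter (resuming a pending PG stop or repeer if it drops to
// zero), free the op data, then either invoke the callback (for internal ops
// without a client connection) or push the reply to the client.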
void osd_t::finish_op(osd_op_t *cur_op, int retval)
{
    inflight_ops--;
    if (cur_op->req.hdr.opcode == OSD_OP_READ ||
        cur_op->req.hdr.opcode == OSD_OP_WRITE ||
        cur_op->req.hdr.opcode == OSD_OP_DELETE)
    {
        // Track inode statistics
        if (!cur_op->tv_end.tv_sec)
        {
            clock_gettime(CLOCK_REALTIME, &cur_op->tv_end);
        }
        uint64_t usec = (
            (cur_op->tv_end.tv_sec - cur_op->tv_begin.tv_sec)*1000000 +
            (cur_op->tv_end.tv_nsec - cur_op->tv_begin.tv_nsec)/1000
        );
        int inode_st_op = cur_op->req.hdr.opcode == OSD_OP_DELETE
            ? INODE_STATS_DELETE
            : (cur_op->req.hdr.opcode == OSD_OP_READ ? INODE_STATS_READ : INODE_STATS_WRITE);
        inode_stats[cur_op->req.rw.inode].op_count[inode_st_op]++;
        inode_stats[cur_op->req.rw.inode].op_sum[inode_st_op] += usec;
        if (cur_op->req.hdr.opcode == OSD_OP_DELETE)
            inode_stats[cur_op->req.rw.inode].op_bytes[inode_st_op] += cur_op->op_data->pg_data_size * bs_block_size;
        else
            inode_stats[cur_op->req.rw.inode].op_bytes[inode_st_op] += cur_op->req.rw.len;
    }
    if (cur_op->op_data)
    {
        if (cur_op->op_data->pg_num > 0)
        {
            auto & pg = pgs.at({ .pool_id = INODE_POOL(cur_op->op_data->oid.inode), .pg_num = cur_op->op_data->pg_num });
            pg.inflight--;
            assert(pg.inflight >= 0);
            if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch)
            {
                finish_stop_pg(pg);
            }
            else if ((pg.state & PG_REPEERING) && pg.inflight == 0 && !pg.flush_batch)
            {
                start_pg_peering(pg);
            }
        }
        assert(!cur_op->op_data->subops);
        free(cur_op->op_data);
        cur_op->op_data = NULL;
    }
    if (!cur_op->peer_fd)
    {
        // Copy the lambda to be unaffected by `delete op`
        std::function<void(osd_op_t*)>(cur_op->callback)(cur_op);
    }
    else
    {
        // FIXME add separate magic number for primary ops
        auto cl_it = msgr.clients.find(cur_op->peer_fd);
        if (cl_it != msgr.clients.end())
        {
            cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
            cur_op->reply.hdr.id = cur_op->req.hdr.id;
            cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode;
            cur_op->reply.hdr.retval = retval;
            msgr.outbox_push(cur_op);
        }
        else
        {
            // The client is gone, the reply can be dropped
            delete cur_op;
        }
    }
}
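
// Allocate and submit read/write subops for a primary read, write or RMW read.
// One subop is created per participating OSD. If no OSD needs actual data,
// a single zero-length subop is still submitted (for RMW reads and replicated
// pools), which still returns the current object version.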
void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, const uint64_t* osd_set, osd_op_t *cur_op)
{
    bool wr = submit_type == SUBMIT_WRITE;
    osd_primary_op_data_t *op_data = cur_op->op_data;
    osd_rmw_stripe_t *stripes = op_data->stripes;
    bool rep = op_data->scheme == POOL_SCHEME_REPLICATED;
    // Allocate subops
    int n_subops = 0, zero_read = -1;
    for (int role = 0; role < op_data->pg_size; role++)
    {
        // zero_read is the role that will receive a zero-length subop if no
        // actual data needs to be transferred: this OSD itself, or else the
        // first live OSD in the set
        if (osd_set[role] == this->osd_num || (osd_set[role] != 0 && zero_read == -1))
            zero_read = role;
        if (osd_set[role] != 0 && (wr || (!rep && stripes[role].read_end != 0)))
            n_subops++;
    }
    if (!n_subops && (submit_type == SUBMIT_RMW_READ || rep))
        n_subops = 1;
    else
        zero_read = -1;
    osd_op_t *subops = new osd_op_t[n_subops];
    op_data->fact_ver = 0;
    op_data->done = op_data->errors = 0;
    op_data->n_subops = n_subops;
    op_data->subops = subops;
    int sent = submit_primary_subop_batch(submit_type, op_data->oid.inode, op_version, op_data->stripes, osd_set, cur_op, 0, zero_read);
    assert(sent == n_subops);
}
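
// Fill and submit a batch of subops starting at subop_idx. Subops targeting
// the local OSD are executed directly via the blockstore, others are sent to
// peer OSDs through the messenger. Returns the number of subops submitted.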
int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t op_version,
    osd_rmw_stripe_t *stripes, const uint64_t* osd_set, osd_op_t *cur_op, int subop_idx, int zero_read)
{
    bool wr = submit_type == SUBMIT_WRITE;
    osd_primary_op_data_t *op_data = cur_op->op_data;
    bool rep = op_data->scheme == POOL_SCHEME_REPLICATED;
    int i = subop_idx;
    for (int role = 0; role < op_data->pg_size; role++)
    {
        // We always submit zero-length writes to all replicas, even if the stripe is not modified
        if (!(wr || (!rep && stripes[role].read_end != 0) || zero_read == role))
        {
            continue;
        }
        osd_num_t role_osd_num = osd_set[role];
        if (role_osd_num != 0)
        {
            int stripe_num = rep ? 0 : role;
            osd_op_t *subop = op_data->subops + i;
            if (role_osd_num == this->osd_num)
            {
                // Execute the subop locally via the blockstore
                clock_gettime(CLOCK_REALTIME, &subop->tv_begin);
                subop->op_type = (uint64_t)cur_op;
                subop->bitmap = stripes[stripe_num].bmp_buf;
                subop->bitmap_len = clean_entry_bitmap_size;
                subop->bs_op = new blockstore_op_t({
                    .opcode = (uint64_t)(wr ? (rep ? BS_OP_WRITE_STABLE : BS_OP_WRITE) : BS_OP_READ),
                    .callback = [subop, this](blockstore_op_t *bs_subop)
                    {
                        handle_primary_bs_subop(subop);
                    },
                    .oid = {
                        .inode = inode,
                        .stripe = op_data->oid.stripe | stripe_num,
                    },
                    .version = op_version,
                    .offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start,
                    .len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start,
                    .buf = wr ? stripes[stripe_num].write_buf : stripes[stripe_num].read_buf,
                    .bitmap = stripes[stripe_num].bmp_buf,
                });
#ifdef OSD_DEBUG
                printf(
                    "Submit %s to local: %lx:%lx v%lu %u-%u\n", wr ? "write" : "read",
                    inode, op_data->oid.stripe | stripe_num, op_version,
                    subop->bs_op->offset, subop->bs_op->len
                );
#endif
                bs->enqueue_op(subop->bs_op);
            }
            else
            {
                // Send the subop to a peer OSD
                subop->op_type = OSD_OP_OUT;
                subop->peer_fd = msgr.osd_peer_fds.at(role_osd_num);
                subop->bitmap = stripes[stripe_num].bmp_buf;
                subop->bitmap_len = clean_entry_bitmap_size;
                subop->req.sec_rw = {
                    .header = {
                        .magic = SECONDARY_OSD_OP_MAGIC,
                        .id = msgr.next_subop_id++,
                        .opcode = (uint64_t)(wr ? (rep ? OSD_OP_SEC_WRITE_STABLE : OSD_OP_SEC_WRITE) : OSD_OP_SEC_READ),
                    },
                    .oid = {
                        .inode = inode,
                        .stripe = op_data->oid.stripe | stripe_num,
                    },
                    .version = op_version,
                    .offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start,
                    .len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start,
                    .attr_len = wr ? clean_entry_bitmap_size : 0,
                };
#ifdef OSD_DEBUG
                printf(
                    "Submit %s to osd %lu: %lx:%lx v%lu %u-%u\n", wr ? "write" : "read", role_osd_num,
                    inode, op_data->oid.stripe | stripe_num, op_version,
                    subop->req.sec_rw.offset, subop->req.sec_rw.len
                );
#endif
                if (wr)
                {
                    if (stripes[stripe_num].write_end > stripes[stripe_num].write_start)
                    {
                        subop->iov.push_back(stripes[stripe_num].write_buf, stripes[stripe_num].write_end - stripes[stripe_num].write_start);
                    }
                }
                else
                {
                    if (stripes[stripe_num].read_end > stripes[stripe_num].read_start)
                    {
                        subop->iov.push_back(stripes[stripe_num].read_buf, stripes[stripe_num].read_end - stripes[stripe_num].read_start);
                    }
                }
                subop->callback = [cur_op, this](osd_op_t *subop)
                {
                    handle_primary_subop(subop, cur_op);
                };
                msgr.outbox_push(subop);
            }
            i++;
        }
    }
    return i - subop_idx;
}
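
// Map blockstore opcodes to the corresponding OSD opcodes, used to account
// local blockstore subops in the same statistics as network subops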
static uint64_t bs_op_to_osd_op[] = {
    0,
    OSD_OP_SEC_READ,            // BS_OP_READ = 1
    OSD_OP_SEC_WRITE,           // BS_OP_WRITE = 2
    OSD_OP_SEC_WRITE_STABLE,    // BS_OP_WRITE_STABLE = 3
    OSD_OP_SEC_SYNC,            // BS_OP_SYNC = 4
    OSD_OP_SEC_STABILIZE,       // BS_OP_STABLE = 5
    OSD_OP_SEC_DELETE,          // BS_OP_DELETE = 6
    OSD_OP_SEC_LIST,            // BS_OP_LIST = 7
    OSD_OP_SEC_ROLLBACK,        // BS_OP_ROLLBACK = 8
    OSD_OP_TEST_SYNC_STAB_ALL,  // BS_OP_SYNC_STAB_ALL = 9
};
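
// Completion callback for local blockstore subops: a failed local
// modification is fatal, successful ops are converted into the equivalent
// OSD subop form and passed on to handle_primary_subop().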
void osd_t::handle_primary_bs_subop(osd_op_t *subop)
{
    osd_op_t *cur_op = (osd_op_t*)subop->op_type;
    blockstore_op_t *bs_op = subop->bs_op;
    int expected = bs_op->opcode == BS_OP_READ || bs_op->opcode == BS_OP_WRITE
        || bs_op->opcode == BS_OP_WRITE_STABLE ? bs_op->len : 0;
    if (bs_op->retval != expected && bs_op->opcode != BS_OP_READ)
    {
        // A failed modification of the local blockstore is fatal, so die
        throw std::runtime_error(
            "local blockstore modification failed (opcode = "+std::to_string(bs_op->opcode)+
            " retval = "+std::to_string(bs_op->retval)+")"
        );
    }
    add_bs_subop_stats(subop);
    subop->req.hdr.opcode = bs_op_to_osd_op[bs_op->opcode];
    subop->reply.hdr.retval = bs_op->retval;
    if (bs_op->opcode == BS_OP_READ || bs_op->opcode == BS_OP_WRITE || bs_op->opcode == BS_OP_WRITE_STABLE)
    {
        subop->req.sec_rw.len = bs_op->len;
        subop->reply.sec_rw.version = bs_op->version;
    }
    delete bs_op;
    subop->bs_op = NULL;
    subop->peer_fd = -1;
    handle_primary_subop(subop, cur_op);
}
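
// Account a completed local blockstore subop in the messenger statistics,
// as if it were a network subop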
void osd_t::add_bs_subop_stats(osd_op_t *subop)
{
    // Include local blockstore ops in statistics
    uint64_t opcode = bs_op_to_osd_op[subop->bs_op->opcode];
    timespec tv_end;
    clock_gettime(CLOCK_REALTIME, &tv_end);
    msgr.stats.op_stat_count[opcode]++;
    if (!msgr.stats.op_stat_count[opcode])
    {
        // The counter wrapped around, restart the running sums too
        msgr.stats.op_stat_count[opcode] = 1;
        msgr.stats.op_stat_sum[opcode] = 0;
        msgr.stats.op_stat_bytes[opcode] = 0;
    }
    msgr.stats.op_stat_sum[opcode] += (
        (tv_end.tv_sec - subop->tv_begin.tv_sec)*1000000 +
        (tv_end.tv_nsec - subop->tv_begin.tv_nsec)/1000
    );
    if (opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE)
    {
        msgr.stats.op_stat_bytes[opcode] += subop->bs_op->len;
    }
}
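
// Common completion handler for primary subops, both local and remote. Counts
// successes and errors, drops the peer connection on any error, checks that
// all subops of one operation return the same object version, and resumes
// the parent operation when the last subop completes.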
void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op)
{
    uint64_t opcode = subop->req.hdr.opcode;
    int retval = subop->reply.hdr.retval;
    int expected;
    if (opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE || opcode == OSD_OP_SEC_WRITE_STABLE)
        expected = subop->req.sec_rw.len;
    else if (opcode == OSD_OP_SEC_READ_BMP)
        expected = subop->req.sec_read_bmp.len / sizeof(obj_ver_id) * (8 + clean_entry_bitmap_size);
    else
        expected = 0;
    osd_primary_op_data_t *op_data = cur_op->op_data;
    if (retval != expected)
    {
        printf("%s subop failed: retval = %d (expected %d)\n", osd_op_names[opcode], retval, expected);
        if (retval == -EPIPE)
        {
            op_data->epipe++;
        }
        op_data->errors++;
        if (subop->peer_fd >= 0)
        {
            // Drop the connection on any error
            msgr.stop_client(subop->peer_fd);
        }
    }
    else
    {
        op_data->done++;
        if (opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE || opcode == OSD_OP_SEC_WRITE_STABLE)
        {
            uint64_t version = subop->reply.sec_rw.version;
#ifdef OSD_DEBUG
            uint64_t peer_osd = msgr.clients.find(subop->peer_fd) != msgr.clients.end()
                ? msgr.clients[subop->peer_fd]->osd_num : osd_num;
            printf("subop %lu from osd %lu: version = %lu\n", opcode, peer_osd, version);
#endif
            // fact_ver == UINT64_MAX means the version consistency check is disabled
            if (op_data->fact_ver != UINT64_MAX)
            {
                if (op_data->fact_ver != 0 && op_data->fact_ver != version)
                {
                    throw std::runtime_error(
                        "different fact_versions returned from "+std::string(osd_op_names[opcode])+
                        " subops: "+std::to_string(version)+" vs "+std::to_string(op_data->fact_ver)
                    );
                }
                op_data->fact_ver = version;
            }
        }
    }
    if ((op_data->errors + op_data->done) >= op_data->n_subops)
    {
        // All subops are finished, continue the parent operation
        delete[] op_data->subops;
        op_data->subops = NULL;
        op_data->st++;
        if (cur_op->req.hdr.opcode == OSD_OP_READ)
        {
            continue_primary_read(cur_op);
        }
        else if (cur_op->req.hdr.opcode == OSD_OP_WRITE)
        {
            continue_primary_write(cur_op);
        }
        else if (cur_op->req.hdr.opcode == OSD_OP_SYNC)
        {
            continue_primary_sync(cur_op);
        }
        else if (cur_op->req.hdr.opcode == OSD_OP_DELETE)
        {
            continue_primary_del(cur_op);
        }
        else
        {
            throw std::runtime_error("BUG: unknown opcode");
        }
    }
}
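
// Cancel a primary write. If subops are already in flight they can't simply
// be thrown away, so the operation is only marked with an extra EPIPE;
// otherwise it's finished immediately.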
void osd_t::cancel_primary_write(osd_op_t *cur_op)
{
    if (cur_op->op_data && cur_op->op_data->subops)
    {
        // The primary write operation is waiting for subops which are
        // already sent to peer OSDs, so we can't just throw them away.
        // Mark the operation with an extra EPIPE instead.
        cur_op->op_data->errors++;
        cur_op->op_data->epipe++;
        cur_op->op_data->done--; // Caution: `done` must be signed because it may become -1 here
    }
    else
    {
        finish_op(cur_op, -EPIPE);
    }
}
bool contains_osd(osd_num_t *osd_set, uint64_t size, osd_num_t osd_num)
{
    for (uint64_t i = 0; i < size; i++)
    {
        if (osd_set[i] == osd_num)
        {
            return true;
        }
    }
    return false;
}
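
// Submit deletion subops for object copies/chunks that no longer belong to
// the current OSD set, e.g. leftovers on old replicas after an object moves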
void osd_t::submit_primary_del_subops(osd_op_t *cur_op, osd_num_t *cur_set, uint64_t set_size, pg_osd_set_t & loc_set)
{
    osd_primary_op_data_t *op_data = cur_op->op_data;
    bool rep = op_data->scheme == POOL_SCHEME_REPLICATED;
    obj_ver_osd_t extra_chunks[loc_set.size()];
    int chunks_to_del = 0;
    for (auto & chunk: loc_set)
    {
        // Ordered comparison for EC/XOR, unordered for replicated pools
        if (!cur_set || (rep
            ? !contains_osd(cur_set, set_size, chunk.osd_num)
            : (chunk.osd_num != cur_set[chunk.role])))
        {
            extra_chunks[chunks_to_del++] = (obj_ver_osd_t){
                .osd_num = chunk.osd_num,
                .oid = {
                    .inode = op_data->oid.inode,
                    .stripe = op_data->oid.stripe | (rep ? 0 : chunk.role),
                },
                // Same version as the write
                .version = op_data->fact_ver,
            };
        }
    }
    submit_primary_del_batch(cur_op, extra_chunks, chunks_to_del);
}
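
// Submit a batch of deletion subops, local ones via the blockstore and
// remote ones via the messenger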
void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_delete, int chunks_to_delete_count)
{
    osd_primary_op_data_t *op_data = cur_op->op_data;
    op_data->n_subops = chunks_to_delete_count;
    op_data->done = op_data->errors = 0;
    if (!op_data->n_subops)
    {
        return;
    }
    osd_op_t *subops = new osd_op_t[chunks_to_delete_count];
    op_data->subops = subops;
    for (int i = 0; i < chunks_to_delete_count; i++)
    {
        auto & chunk = chunks_to_delete[i];
        if (chunk.osd_num == this->osd_num)
        {
            clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);
            subops[i].op_type = (uint64_t)cur_op;
            subops[i].bs_op = new blockstore_op_t({
                .opcode = BS_OP_DELETE,
                .callback = [subop = &subops[i], this](blockstore_op_t *bs_subop)
                {
                    handle_primary_bs_subop(subop);
                },
                .oid = chunk.oid,
                .version = chunk.version,
            });
            bs->enqueue_op(subops[i].bs_op);
        }
        else
        {
            subops[i].op_type = OSD_OP_OUT;
            subops[i].peer_fd = msgr.osd_peer_fds.at(chunk.osd_num);
            subops[i].req = (osd_any_op_t){ .sec_del = {
                .header = {
                    .magic = SECONDARY_OSD_OP_MAGIC,
                    .id = msgr.next_subop_id++,
                    .opcode = OSD_OP_SEC_DELETE,
                },
                .oid = chunk.oid,
                .version = chunk.version,
            } };
            subops[i].callback = [cur_op, this](osd_op_t *subop)
            {
                handle_primary_subop(subop, cur_op);
            };
            msgr.outbox_push(&subops[i]);
        }
    }
}
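
// Submit SYNC subops to all OSDs that have unsynced ("dirty") writes.
// Returns 0 if nothing was submitted, i.e. there is nothing to wait for,
// and 1 otherwise.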
int osd_t::submit_primary_sync_subops(osd_op_t *cur_op)
{
    osd_primary_op_data_t *op_data = cur_op->op_data;
    int n_osds = op_data->dirty_osd_count;
    osd_op_t *subops = new osd_op_t[n_osds];
    op_data->done = op_data->errors = 0;
    op_data->n_subops = n_osds;
    op_data->subops = subops;
    std::map<uint64_t, int>::iterator peer_it;
    for (int i = 0; i < n_osds; i++)
    {
        osd_num_t sync_osd = op_data->dirty_osds[i];
        if (sync_osd == this->osd_num)
        {
            clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);
            subops[i].op_type = (uint64_t)cur_op;
            subops[i].bs_op = new blockstore_op_t({
                .opcode = BS_OP_SYNC,
                .callback = [subop = &subops[i], this](blockstore_op_t *bs_subop)
                {
                    handle_primary_bs_subop(subop);
                },
            });
            bs->enqueue_op(subops[i].bs_op);
        }
        else if ((peer_it = msgr.osd_peer_fds.find(sync_osd)) != msgr.osd_peer_fds.end())
        {
            subops[i].op_type = OSD_OP_OUT;
            subops[i].peer_fd = peer_it->second;
            subops[i].req = (osd_any_op_t){ .sec_sync = {
                .header = {
                    .magic = SECONDARY_OSD_OP_MAGIC,
                    .id = msgr.next_subop_id++,
                    .opcode = OSD_OP_SEC_SYNC,
                },
            } };
            subops[i].callback = [cur_op, this](osd_op_t *subop)
            {
                handle_primary_subop(subop, cur_op);
            };
            msgr.outbox_push(&subops[i]);
        }
        else
        {
            // The peer is not connected anymore, just count the subop as done
            op_data->done++;
        }
    }
    if (op_data->done >= op_data->n_subops)
    {
        delete[] op_data->subops;
        op_data->subops = NULL;
        return 0;
    }
    return 1;
}
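
// Submit STABILIZE subops to mark unstable writes as stable on all OSDs
// listed in op_data->unstable_write_osds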
void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
{
    osd_primary_op_data_t *op_data = cur_op->op_data;
    int n_osds = op_data->unstable_write_osds->size();
    osd_op_t *subops = new osd_op_t[n_osds];
    op_data->done = op_data->errors = 0;
    op_data->n_subops = n_osds;
    op_data->subops = subops;
    for (int i = 0; i < n_osds; i++)
    {
        auto & stab_osd = (*(op_data->unstable_write_osds))[i];
        if (stab_osd.osd_num == this->osd_num)
        {
            clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);
            subops[i].op_type = (uint64_t)cur_op;
            subops[i].bs_op = new blockstore_op_t((blockstore_op_t){
                .opcode = BS_OP_STABLE,
                .callback = [subop = &subops[i], this](blockstore_op_t *bs_subop)
                {
                    handle_primary_bs_subop(subop);
                },
                .len = (uint32_t)stab_osd.len,
                .buf = (void*)(op_data->unstable_writes + stab_osd.start),
            });
            bs->enqueue_op(subops[i].bs_op);
        }
        else
        {
            subops[i].op_type = OSD_OP_OUT;
            subops[i].peer_fd = msgr.osd_peer_fds.at(stab_osd.osd_num);
            subops[i].req = (osd_any_op_t){ .sec_stab = {
                .header = {
                    .magic = SECONDARY_OSD_OP_MAGIC,
                    .id = msgr.next_subop_id++,
                    .opcode = OSD_OP_SEC_STABILIZE,
                },
                .len = (uint64_t)(stab_osd.len * sizeof(obj_ver_id)),
            } };
            subops[i].iov.push_back(op_data->unstable_writes + stab_osd.start, stab_osd.len * sizeof(obj_ver_id));
            subops[i].callback = [cur_op, this](osd_op_t *subop)
            {
                handle_primary_subop(subop, cur_op);
            };
            msgr.outbox_push(&subops[i]);
        }
    }
}
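
// Cancel all queued writes for one object within a PG, e.g. when the first
// write in the queue fails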
void osd_t::pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval)
{
    auto st_it = pg.write_queue.find(oid), it = st_it;
    if (it == pg.write_queue.end() || it->second != first_op)
    {
        // The write queue doesn't match the first operation.
        // first_op is a leftover operation from a previous peering of the same PG.
        finish_op(first_op, retval);
        return;
    }
    std::vector<osd_op_t*> cancel_ops;
    while (it != pg.write_queue.end() && it->first == oid)
    {
        cancel_ops.push_back(it->second);
        it++;
    }
    if (st_it != it)
    {
        // Erase them first and only then run finish_op() for reentrancy.
        // Calling finish_op() with live iterators previously triggered a bug
        // where some OSDs looped infinitely if you stopped all of them
        // with kill -INT during recovery.
        pg.write_queue.erase(st_it, it);
        for (auto op: cancel_ops)
        {
            finish_op(op, retval);
        }
    }
}