Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

573 lines
18 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.1 (see README.md for details)
  3. #include <netinet/tcp.h>
  4. #include <sys/epoll.h>
  5. #include <algorithm>
  6. #include "base64.h"
  7. #include "osd.h"
  8. // Peering loop
  9. void osd_t::handle_peers()
  10. {
  11. if (peering_state & OSD_PEERING_PGS)
  12. {
  13. bool still = false;
  14. for (auto & p: pgs)
  15. {
  16. if (p.second.state == PG_PEERING)
  17. {
  18. if (!p.second.peering_state->list_ops.size())
  19. {
  20. p.second.calc_object_states(log_level);
  21. report_pg_state(p.second);
  22. incomplete_objects += p.second.incomplete_objects.size();
  23. misplaced_objects += p.second.misplaced_objects.size();
  24. // FIXME: degraded objects may currently include misplaced, too! Report them separately?
  25. degraded_objects += p.second.degraded_objects.size();
  26. if ((p.second.state & (PG_ACTIVE | PG_HAS_UNCLEAN)) == (PG_ACTIVE | PG_HAS_UNCLEAN))
  27. peering_state = peering_state | OSD_FLUSHING_PGS;
  28. else if (p.second.state & PG_ACTIVE)
  29. peering_state = peering_state | OSD_RECOVERING;
  30. }
  31. else
  32. {
  33. still = true;
  34. }
  35. }
  36. }
  37. if (!still)
  38. {
  39. // Done all PGs
  40. peering_state = peering_state & ~OSD_PEERING_PGS;
  41. }
  42. }
  43. if ((peering_state & OSD_FLUSHING_PGS) && !readonly)
  44. {
  45. bool still = false;
  46. for (auto & p: pgs)
  47. {
  48. if ((p.second.state & (PG_ACTIVE | PG_HAS_UNCLEAN)) == (PG_ACTIVE | PG_HAS_UNCLEAN))
  49. {
  50. if (!p.second.flush_batch)
  51. {
  52. submit_pg_flush_ops(p.second);
  53. }
  54. still = true;
  55. }
  56. }
  57. if (!still)
  58. {
  59. peering_state = peering_state & ~OSD_FLUSHING_PGS | OSD_RECOVERING;
  60. }
  61. }
  62. if ((peering_state & OSD_RECOVERING) && !readonly)
  63. {
  64. if (!continue_recovery())
  65. {
  66. peering_state = peering_state & ~OSD_RECOVERING;
  67. }
  68. }
  69. }
  70. void osd_t::repeer_pgs(osd_num_t peer_osd)
  71. {
  72. // Re-peer affected PGs
  73. for (auto & p: pgs)
  74. {
  75. auto & pg = p.second;
  76. bool repeer = false;
  77. if (pg.state & (PG_PEERING | PG_ACTIVE | PG_INCOMPLETE))
  78. {
  79. for (osd_num_t pg_osd: pg.all_peers)
  80. {
  81. if (pg_osd == peer_osd)
  82. {
  83. repeer = true;
  84. break;
  85. }
  86. }
  87. if (repeer)
  88. {
  89. // Repeer this pg
  90. printf("[PG %u/%u] Repeer because of OSD %lu\n", pg.pool_id, pg.pg_num, peer_osd);
  91. if (!(pg.state & (PG_ACTIVE | PG_REPEERING)) || pg.inflight == 0 && !pg.flush_batch)
  92. {
  93. start_pg_peering(pg);
  94. }
  95. else
  96. {
  97. // Stop accepting new operations, wait for current ones to finish or fail
  98. pg.state = pg.state & ~PG_ACTIVE | PG_REPEERING;
  99. report_pg_state(pg);
  100. }
  101. }
  102. }
  103. }
  104. }
  105. // Reset PG state (when peering or stopping)
  106. void osd_t::reset_pg(pg_t & pg)
  107. {
  108. pg.cur_peers.clear();
  109. pg.state_dict.clear();
  110. copies_to_delete_after_sync_count -= pg.copies_to_delete_after_sync.size();
  111. pg.copies_to_delete_after_sync.clear();
  112. incomplete_objects -= pg.incomplete_objects.size();
  113. misplaced_objects -= pg.misplaced_objects.size();
  114. degraded_objects -= pg.degraded_objects.size();
  115. pg.incomplete_objects.clear();
  116. pg.misplaced_objects.clear();
  117. pg.degraded_objects.clear();
  118. pg.flush_actions.clear();
  119. pg.ver_override.clear();
  120. if (pg.flush_batch)
  121. {
  122. delete pg.flush_batch;
  123. }
  124. pg.flush_batch = NULL;
  125. for (auto p: pg.write_queue)
  126. {
  127. cancel_primary_write(p.second);
  128. }
  129. pg.write_queue.clear();
  130. uint64_t pg_stripe_size = st_cli.pool_config[pg.pool_id].pg_stripe_size;
  131. for (auto it = unstable_writes.begin(); it != unstable_writes.end(); )
  132. {
  133. // Forget this PG's unstable writes
  134. if (INODE_POOL(it->first.oid.inode) == pg.pool_id && map_to_pg(it->first.oid, pg_stripe_size) == pg.pg_num)
  135. unstable_writes.erase(it++);
  136. else
  137. it++;
  138. }
  139. dirty_pgs.erase({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
  140. }
// Repeer on each connect/disconnect peer event
//
// (Re)starts peering for one PG: resets its runtime state, computes the
// current write OSD set from connected peers, checks availability against
// the PG's history, and submits sync+list subops to all reachable peers.
// May leave the PG in PG_INCOMPLETE (and return early) if not enough
// peers are available.
void osd_t::start_pg_peering(pg_t & pg)
{
    pg.state = PG_PEERING;
    this->peering_state |= OSD_PEERING_PGS;
    reset_pg(pg);
    report_pg_state(pg);
    // Drop connections of clients who have this PG in dirty_pgs
    // (their unsynced writes to this PG can no longer be guaranteed)
    if (immediate_commit != IMMEDIATE_ALL)
    {
        std::vector<int> to_stop;
        for (auto & cp: msgr.clients)
        {
            if (cp.second->dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) != cp.second->dirty_pgs.end())
            {
                to_stop.push_back(cp.first);
            }
        }
        // Collected first, stopped second: stop_client() mutates msgr.clients,
        // so dropping inside the iteration above would invalidate the loop
        for (auto peer_fd: to_stop)
        {
            msgr.stop_client(peer_fd);
        }
    }
    // Calculate current write OSD set: a target OSD participates if it is
    // this OSD itself or currently has an open peer connection
    pg.pg_cursize = 0;
    pg.cur_set.resize(pg.target_set.size());
    pg.cur_loc_set.clear();
    for (int role = 0; role < pg.target_set.size(); role++)
    {
        pg.cur_set[role] = pg.target_set[role] == this->osd_num ||
            msgr.osd_peer_fds.find(pg.target_set[role]) != msgr.osd_peer_fds.end() ? pg.target_set[role] : 0;
        if (pg.cur_set[role] != 0)
        {
            pg.pg_cursize++;
            pg.cur_loc_set.push_back({
                .role = (uint64_t)role,
                .osd_num = pg.cur_set[role],
                .outdated = false,
            });
        }
    }
    if (pg.target_history.size())
    {
        // Refuse to start PG if no peers are available from any of the historical OSD sets
        // (PG history is kept up to the latest active+clean state)
        for (auto & history_set: pg.target_history)
        {
            bool found = true;
            for (auto history_osd: history_set)
            {
                if (history_osd != 0)
                {
                    found = false;
                    if (history_osd == this->osd_num ||
                        msgr.osd_peer_fds.find(history_osd) != msgr.osd_peer_fds.end())
                    {
                        found = true;
                        break;
                    }
                }
            }
            if (!found)
            {
                pg.state = PG_INCOMPLETE;
                report_pg_state(pg);
                return;
            }
        }
    }
    // Not enough live OSDs to write at minimum redundancy
    if (pg.pg_cursize < pg.pg_minsize)
    {
        pg.state = PG_INCOMPLETE;
        report_pg_state(pg);
        return;
    }
    // Build the set of peers we can list objects from; start connecting
    // to the unreachable ones for the next peering attempt
    std::set<osd_num_t> cur_peers;
    for (auto pg_osd: pg.all_peers)
    {
        if (pg_osd == this->osd_num || msgr.osd_peer_fds.find(pg_osd) != msgr.osd_peer_fds.end())
        {
            cur_peers.insert(pg_osd);
        }
        else if (msgr.wanted_peers.find(pg_osd) == msgr.wanted_peers.end())
        {
            msgr.connect_peer(pg_osd, st_cli.peer_states[pg_osd]);
        }
    }
    pg.cur_peers.insert(pg.cur_peers.begin(), cur_peers.begin(), cur_peers.end());
    if (pg.peering_state)
    {
        // Adjust the peering operation that's still in progress - discard unneeded results
        // NOTE(review): pg.state was set to PG_PEERING above and the INCOMPLETE
        // paths return early, so the PG_INCOMPLETE checks below look defensive -
        // confirm before removing
        for (auto it = pg.peering_state->list_ops.begin(); it != pg.peering_state->list_ops.end();)
        {
            if (pg.state == PG_INCOMPLETE || cur_peers.find(it->first) == cur_peers.end())
            {
                // Discard the result after completion, which, chances are, will be unsuccessful
                discard_list_subop(it->second);
                pg.peering_state->list_ops.erase(it++);
            }
            else
                it++;
        }
        for (auto it = pg.peering_state->list_results.begin(); it != pg.peering_state->list_results.end();)
        {
            if (pg.state == PG_INCOMPLETE || cur_peers.find(it->first) == cur_peers.end())
            {
                if (it->second.buf)
                {
                    free(it->second.buf);
                }
                pg.peering_state->list_results.erase(it++);
            }
            else
                it++;
        }
    }
    if (pg.state == PG_INCOMPLETE)
    {
        if (pg.peering_state)
        {
            delete pg.peering_state;
            pg.peering_state = NULL;
        }
        return;
    }
    if (!pg.peering_state)
    {
        pg.peering_state = new pg_peering_state_t();
        pg.peering_state->pool_id = pg.pool_id;
        pg.peering_state->pg_num = pg.pg_num;
    }
    // Request object listings from peers we don't have a pending op
    // or a finished result for yet
    for (osd_num_t peer_osd: cur_peers)
    {
        if (pg.peering_state->list_ops.find(peer_osd) != pg.peering_state->list_ops.end() ||
            pg.peering_state->list_results.find(peer_osd) != pg.peering_state->list_results.end())
        {
            continue;
        }
        submit_sync_and_list_subop(peer_osd, pg.peering_state);
    }
    ringloop->wakeup();
}
// Submit a SYNC to <role_osd> (local blockstore or remote peer) and, once it
// succeeds, follow up with a LIST subop via submit_list_subop(). In readonly
// mode the sync is skipped. The pending op is tracked in ps->list_ops and
// owns itself: each callback erases the tracking entry and deletes the op.
void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
{
    // Sync before listing, if not readonly
    if (readonly)
    {
        submit_list_subop(role_osd, ps);
    }
    else if (role_osd == this->osd_num)
    {
        // Self
        osd_op_t *op = new osd_op_t();
        op->op_type = 0;
        op->peer_fd = 0; // peer_fd == 0 marks a local (blockstore) op
        clock_gettime(CLOCK_REALTIME, &op->tv_begin);
        op->bs_op = new blockstore_op_t();
        op->bs_op->opcode = BS_OP_SYNC;
        op->bs_op->callback = [this, ps, op, role_osd](blockstore_op_t *bs_op)
        {
            if (bs_op->retval < 0)
            {
                // A failing local sync is fatal for the OSD
                printf("Local OP_SYNC failed: %d (%s)\n", bs_op->retval, strerror(-bs_op->retval));
                force_stop(1);
                return;
            }
            add_bs_subop_stats(op);
            delete op->bs_op;
            op->bs_op = NULL;
            delete op;
            // Remove the sync from tracking, then submit the list
            // (which re-registers itself in ps->list_ops)
            ps->list_ops.erase(role_osd);
            submit_list_subop(role_osd, ps);
        };
        bs->enqueue_op(op->bs_op);
        ps->list_ops[role_osd] = op;
    }
    else
    {
        // Peer
        auto & cl = msgr.clients.at(msgr.osd_peer_fds[role_osd]);
        osd_op_t *op = new osd_op_t();
        op->op_type = OSD_OP_OUT;
        op->peer_fd = cl->peer_fd;
        op->req = (osd_any_op_t){
            .sec_sync = {
                .header = {
                    .magic = SECONDARY_OSD_OP_MAGIC,
                    .id = msgr.next_subop_id++,
                    .opcode = OSD_OP_SEC_SYNC,
                },
            },
        };
        op->callback = [this, ps, role_osd](osd_op_t *op)
        {
            if (op->reply.hdr.retval < 0)
            {
                // FIXME: Mark peer as failed and don't reconnect immediately after dropping the connection
                printf("Failed to sync OSD %lu: %ld (%s), disconnecting peer\n", role_osd, op->reply.hdr.retval, strerror(-op->reply.hdr.retval));
                // Save the fd before deleting the op that owns it
                int fail_fd = op->peer_fd;
                ps->list_ops.erase(role_osd);
                delete op;
                msgr.stop_client(fail_fd);
                return;
            }
            delete op;
            ps->list_ops.erase(role_osd);
            submit_list_subop(role_osd, ps);
        };
        msgr.outbox_push(op);
        ps->list_ops[role_osd] = op;
    }
}
// Submit an object-listing subop to <role_osd> (local blockstore or remote
// peer) for the PG described by *ps. On success the listing buffer and
// counts are stored in ps->list_results[role_osd]; the op tracks itself in
// ps->list_ops until completion and deletes itself in its callback.
void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
{
    if (role_osd == this->osd_num)
    {
        // Self
        osd_op_t *op = new osd_op_t();
        op->op_type = 0;
        op->peer_fd = 0; // peer_fd == 0 marks a local (blockstore) op
        clock_gettime(CLOCK_REALTIME, &op->tv_begin);
        op->bs_op = new blockstore_op_t();
        op->bs_op->opcode = BS_OP_LIST;
        // NOTE(review): BS_OP_LIST apparently reuses generic op fields as
        // listing parameters - oid.stripe = stripe size, oid.inode/version =
        // min/max inode (same expressions as min_inode/max_inode in the peer
        // branch below), len = PG count, offset = PG index. Confirm against
        // the blockstore API.
        op->bs_op->oid.stripe = st_cli.pool_config[ps->pool_id].pg_stripe_size;
        op->bs_op->oid.inode = ((uint64_t)ps->pool_id << (64 - POOL_ID_BITS));
        op->bs_op->version = ((uint64_t)(ps->pool_id+1) << (64 - POOL_ID_BITS)) - 1;
        op->bs_op->len = pg_counts[ps->pool_id];
        op->bs_op->offset = ps->pg_num-1;
        op->bs_op->callback = [this, ps, op, role_osd](blockstore_op_t *bs_op)
        {
            if (op->bs_op->retval < 0)
            {
                throw std::runtime_error("local OP_LIST failed");
            }
            add_bs_subop_stats(op);
            printf(
                "[PG %u/%u] Got object list from OSD %lu (local): %d object versions (%lu of them stable)\n",
                ps->pool_id, ps->pg_num, role_osd, bs_op->retval, bs_op->version
            );
            // Transfer buffer ownership to the peering state;
            // retval = total count, version = stable count here
            ps->list_results[role_osd] = {
                .buf = (obj_ver_id*)op->bs_op->buf,
                .total_count = (uint64_t)op->bs_op->retval,
                .stable_count = op->bs_op->version,
            };
            ps->list_ops.erase(role_osd);
            delete op->bs_op;
            op->bs_op = NULL;
            delete op;
        };
        bs->enqueue_op(op->bs_op);
        ps->list_ops[role_osd] = op;
    }
    else
    {
        // Peer
        osd_op_t *op = new osd_op_t();
        op->op_type = OSD_OP_OUT;
        op->peer_fd = msgr.osd_peer_fds[role_osd];
        op->req = (osd_any_op_t){
            .sec_list = {
                .header = {
                    .magic = SECONDARY_OSD_OP_MAGIC,
                    .id = msgr.next_subop_id++,
                    .opcode = OSD_OP_SEC_LIST,
                },
                .list_pg = ps->pg_num,
                .pg_count = pg_counts[ps->pool_id],
                .pg_stripe_size = st_cli.pool_config[ps->pool_id].pg_stripe_size,
                // Inode range covering exactly this pool
                .min_inode = ((uint64_t)(ps->pool_id) << (64 - POOL_ID_BITS)),
                .max_inode = ((uint64_t)(ps->pool_id+1) << (64 - POOL_ID_BITS)) - 1,
            },
        };
        op->callback = [this, ps, role_osd](osd_op_t *op)
        {
            if (op->reply.hdr.retval < 0)
            {
                printf("Failed to get object list from OSD %lu (retval=%ld), disconnecting peer\n", role_osd, op->reply.hdr.retval);
                // Save the fd before deleting the op that owns it
                int fail_fd = op->peer_fd;
                ps->list_ops.erase(role_osd);
                delete op;
                msgr.stop_client(fail_fd);
                return;
            }
            printf(
                "[PG %u/%u] Got object list from OSD %lu: %ld object versions (%lu of them stable)\n",
                ps->pool_id, ps->pg_num, role_osd, op->reply.hdr.retval, op->reply.sec_list.stable_count
            );
            ps->list_results[role_osd] = {
                .buf = (obj_ver_id*)op->buf,
                .total_count = (uint64_t)op->reply.hdr.retval,
                .stable_count = op->reply.sec_list.stable_count,
            };
            // set op->buf to NULL so it doesn't get freed
            op->buf = NULL;
            ps->list_ops.erase(role_osd);
            delete op;
        };
        msgr.outbox_push(op);
        ps->list_ops[role_osd] = op;
    }
}
  442. void osd_t::discard_list_subop(osd_op_t *list_op)
  443. {
  444. if (list_op->peer_fd == 0)
  445. {
  446. // Self
  447. list_op->bs_op->callback = [list_op](blockstore_op_t *bs_op)
  448. {
  449. if (list_op->bs_op->buf)
  450. free(list_op->bs_op->buf);
  451. delete list_op->bs_op;
  452. list_op->bs_op = NULL;
  453. delete list_op;
  454. };
  455. }
  456. else
  457. {
  458. // Peer
  459. list_op->callback = [](osd_op_t *list_op)
  460. {
  461. delete list_op;
  462. };
  463. }
  464. }
  465. bool osd_t::stop_pg(pg_t & pg)
  466. {
  467. if (pg.peering_state)
  468. {
  469. // Stop peering
  470. for (auto it = pg.peering_state->list_ops.begin(); it != pg.peering_state->list_ops.end(); it++)
  471. {
  472. discard_list_subop(it->second);
  473. }
  474. for (auto it = pg.peering_state->list_results.begin(); it != pg.peering_state->list_results.end(); it++)
  475. {
  476. if (it->second.buf)
  477. {
  478. free(it->second.buf);
  479. }
  480. }
  481. delete pg.peering_state;
  482. pg.peering_state = NULL;
  483. }
  484. if (pg.state & (PG_STOPPING | PG_OFFLINE))
  485. {
  486. return false;
  487. }
  488. if (!(pg.state & (PG_ACTIVE | PG_REPEERING)))
  489. {
  490. finish_stop_pg(pg);
  491. return true;
  492. }
  493. pg.state = pg.state & ~PG_ACTIVE & ~PG_REPEERING | PG_STOPPING;
  494. if (pg.inflight == 0 && !pg.flush_batch)
  495. {
  496. finish_stop_pg(pg);
  497. }
  498. else
  499. {
  500. report_pg_state(pg);
  501. }
  502. return true;
  503. }
  504. void osd_t::finish_stop_pg(pg_t & pg)
  505. {
  506. pg.state = PG_OFFLINE;
  507. reset_pg(pg);
  508. report_pg_state(pg);
  509. }
  510. void osd_t::report_pg_state(pg_t & pg)
  511. {
  512. pg.print_state();
  513. this->pg_state_dirty.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
  514. if (pg.state == PG_ACTIVE && (pg.target_history.size() > 0 || pg.all_peers.size() > pg.target_set.size()))
  515. {
  516. // Clear history of active+clean PGs
  517. pg.history_changed = true;
  518. pg.target_history.clear();
  519. pg.all_peers = pg.target_set;
  520. pg.cur_peers = pg.target_set;
  521. }
  522. else if (pg.state == (PG_ACTIVE|PG_LEFT_ON_DEAD))
  523. {
  524. // Clear history of active+left_on_dead PGs, but leave dead OSDs in all_peers
  525. pg.history_changed = true;
  526. pg.target_history.clear();
  527. std::set<osd_num_t> dead_peers;
  528. for (auto pg_osd: pg.all_peers)
  529. {
  530. dead_peers.insert(pg_osd);
  531. }
  532. for (auto pg_osd: pg.cur_peers)
  533. {
  534. dead_peers.erase(pg_osd);
  535. }
  536. for (auto pg_osd: pg.target_set)
  537. {
  538. if (pg_osd)
  539. {
  540. dead_peers.insert(pg_osd);
  541. }
  542. }
  543. pg.all_peers.clear();
  544. pg.all_peers.insert(pg.all_peers.begin(), dead_peers.begin(), dead_peers.end());
  545. pg.cur_peers.clear();
  546. for (auto pg_osd: pg.target_set)
  547. {
  548. if (pg_osd)
  549. {
  550. pg.cur_peers.push_back(pg_osd);
  551. }
  552. }
  553. }
  554. if (pg.state == PG_OFFLINE && !this->pg_config_applied)
  555. {
  556. apply_pg_config();
  557. }
  558. report_pg_states();
  559. }