Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

543 lines
17 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 (see README.md for details)
  3. #include <netinet/tcp.h>
  4. #include <sys/epoll.h>
  5. #include <algorithm>
  6. #include "base64.h"
  7. #include "osd.h"
  8. // Peering loop
  9. void osd_t::handle_peers()
  10. {
  11. if (peering_state & OSD_PEERING_PGS)
  12. {
  13. bool still = false;
  14. for (auto & p: pgs)
  15. {
  16. if (p.second.state == PG_PEERING)
  17. {
  18. if (!p.second.peering_state->list_ops.size())
  19. {
  20. p.second.calc_object_states(log_level);
  21. report_pg_state(p.second);
  22. incomplete_objects += p.second.incomplete_objects.size();
  23. misplaced_objects += p.second.misplaced_objects.size();
  24. // FIXME: degraded objects may currently include misplaced, too! Report them separately?
  25. degraded_objects += p.second.degraded_objects.size();
  26. if ((p.second.state & (PG_ACTIVE | PG_HAS_UNCLEAN)) == (PG_ACTIVE | PG_HAS_UNCLEAN))
  27. peering_state = peering_state | OSD_FLUSHING_PGS;
  28. else if (p.second.state & PG_ACTIVE)
  29. peering_state = peering_state | OSD_RECOVERING;
  30. }
  31. else
  32. {
  33. still = true;
  34. }
  35. }
  36. }
  37. if (!still)
  38. {
  39. // Done all PGs
  40. peering_state = peering_state & ~OSD_PEERING_PGS;
  41. }
  42. }
  43. if ((peering_state & OSD_FLUSHING_PGS) && !readonly)
  44. {
  45. bool still = false;
  46. for (auto & p: pgs)
  47. {
  48. if ((p.second.state & (PG_ACTIVE | PG_HAS_UNCLEAN)) == (PG_ACTIVE | PG_HAS_UNCLEAN))
  49. {
  50. if (!p.second.flush_batch)
  51. {
  52. submit_pg_flush_ops(p.second);
  53. }
  54. still = true;
  55. }
  56. }
  57. if (!still)
  58. {
  59. peering_state = peering_state & ~OSD_FLUSHING_PGS | OSD_RECOVERING;
  60. }
  61. }
  62. if ((peering_state & OSD_RECOVERING) && !readonly)
  63. {
  64. if (!continue_recovery())
  65. {
  66. peering_state = peering_state & ~OSD_RECOVERING;
  67. }
  68. }
  69. }
  70. void osd_t::repeer_pgs(osd_num_t peer_osd)
  71. {
  72. // Re-peer affected PGs
  73. for (auto & p: pgs)
  74. {
  75. bool repeer = false;
  76. if (p.second.state & (PG_PEERING | PG_ACTIVE | PG_INCOMPLETE))
  77. {
  78. for (osd_num_t pg_osd: p.second.all_peers)
  79. {
  80. if (pg_osd == peer_osd)
  81. {
  82. repeer = true;
  83. break;
  84. }
  85. }
  86. if (repeer)
  87. {
  88. // Repeer this pg
  89. printf("[PG %u/%u] Repeer because of OSD %lu\n", p.second.pool_id, p.second.pg_num, peer_osd);
  90. start_pg_peering(p.second);
  91. }
  92. }
  93. }
  94. }
// Repeer on each connect/disconnect peer event
// (Re)start peering of one PG: reset its per-epoch state, compute the
// current write OSD set, and submit sync+list subops to all available peers.
void osd_t::start_pg_peering(pg_t & pg)
{
    pg.state = PG_PEERING;
    this->peering_state |= OSD_PEERING_PGS;
    report_pg_state(pg);
    // Reset PG state
    pg.cur_peers.clear();
    pg.state_dict.clear();
    // Undo this PG's contribution to the global object counters;
    // they are re-added in handle_peers() after calc_object_states()
    incomplete_objects -= pg.incomplete_objects.size();
    misplaced_objects -= pg.misplaced_objects.size();
    degraded_objects -= pg.degraded_objects.size();
    pg.incomplete_objects.clear();
    pg.misplaced_objects.clear();
    pg.degraded_objects.clear();
    pg.flush_actions.clear();
    pg.ver_override.clear();
    if (pg.flush_batch)
    {
        delete pg.flush_batch;
    }
    pg.flush_batch = NULL;
    // Cancel all queued primary writes for this PG
    for (auto p: pg.write_queue)
    {
        cancel_primary_write(p.second);
    }
    pg.write_queue.clear();
    uint64_t pg_stripe_size = st_cli.pool_config[pg.pool_id].pg_stripe_size;
    for (auto it = unstable_writes.begin(); it != unstable_writes.end(); )
    {
        // Forget this PG's unstable writes
        if (INODE_POOL(it->first.oid.inode) == pg.pool_id && map_to_pg(it->first.oid, pg_stripe_size) == pg.pg_num)
            unstable_writes.erase(it++);
        else
            it++;
    }
    dirty_pgs.erase({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
    // Drop connections of clients who have this PG in dirty_pgs
    if (immediate_commit != IMMEDIATE_ALL)
    {
        // Collect FDs first: stop_client() mutates c_cli.clients,
        // so stopping inside the iteration would invalidate iterators
        std::vector<int> to_stop;
        for (auto & cp: c_cli.clients)
        {
            if (cp.second->dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) != cp.second->dirty_pgs.end())
            {
                to_stop.push_back(cp.first);
            }
        }
        for (auto peer_fd: to_stop)
        {
            c_cli.stop_client(peer_fd);
        }
    }
    // Calculate current write OSD set
    pg.pg_cursize = 0;
    pg.cur_set.resize(pg.target_set.size());
    pg.cur_loc_set.clear();
    for (int role = 0; role < pg.target_set.size(); role++)
    {
        // A target OSD is usable if it's this OSD itself or we hold a connection to it
        pg.cur_set[role] = pg.target_set[role] == this->osd_num ||
            c_cli.osd_peer_fds.find(pg.target_set[role]) != c_cli.osd_peer_fds.end() ? pg.target_set[role] : 0;
        if (pg.cur_set[role] != 0)
        {
            pg.pg_cursize++;
            pg.cur_loc_set.push_back({
                .role = (uint64_t)role,
                .osd_num = pg.cur_set[role],
                .outdated = false,
            });
        }
    }
    if (pg.target_history.size())
    {
        // Refuse to start PG if no peers are available from any of the historical OSD sets
        // (PG history is kept up to the latest active+clean state)
        for (auto & history_set: pg.target_history)
        {
            bool found = false;
            for (auto history_osd: history_set)
            {
                if (history_osd != 0 && c_cli.osd_peer_fds.find(history_osd) != c_cli.osd_peer_fds.end())
                {
                    found = true;
                    break;
                }
            }
            if (!found)
            {
                // NOTE(review): this early return (and the pg_minsize one below) skips
                // the pg.peering_state adjustment/cleanup further down, leaving any
                // in-flight list ops from a previous attempt untouched - confirm intended
                pg.state = PG_INCOMPLETE;
                report_pg_state(pg);
                return;
            }
        }
    }
    if (pg.pg_cursize < pg.pg_minsize)
    {
        // Not enough live target OSDs to serve this PG
        pg.state = PG_INCOMPLETE;
        report_pg_state(pg);
        return;
    }
    // Gather the set of reachable peers; request connections to the rest
    std::set<osd_num_t> cur_peers;
    for (auto pg_osd: pg.all_peers)
    {
        if (pg_osd == this->osd_num || c_cli.osd_peer_fds.find(pg_osd) != c_cli.osd_peer_fds.end())
        {
            cur_peers.insert(pg_osd);
        }
        else if (c_cli.wanted_peers.find(pg_osd) == c_cli.wanted_peers.end())
        {
            c_cli.connect_peer(pg_osd, st_cli.peer_states[pg_osd]);
        }
    }
    pg.cur_peers.insert(pg.cur_peers.begin(), cur_peers.begin(), cur_peers.end());
    if (pg.peering_state)
    {
        // Adjust the peering operation that's still in progress - discard unneeded results
        // NOTE(review): pg.state was set to PG_PEERING above and the incomplete paths
        // return early, so the PG_INCOMPLETE checks below look unreachable - verify
        for (auto it = pg.peering_state->list_ops.begin(); it != pg.peering_state->list_ops.end();)
        {
            if (pg.state == PG_INCOMPLETE || cur_peers.find(it->first) == cur_peers.end())
            {
                // Discard the result after completion, which, chances are, will be unsuccessful
                discard_list_subop(it->second);
                pg.peering_state->list_ops.erase(it++);
            }
            else
                it++;
        }
        for (auto it = pg.peering_state->list_results.begin(); it != pg.peering_state->list_results.end();)
        {
            if (pg.state == PG_INCOMPLETE || cur_peers.find(it->first) == cur_peers.end())
            {
                // Free the listing buffer of a peer we no longer need
                if (it->second.buf)
                {
                    free(it->second.buf);
                }
                pg.peering_state->list_results.erase(it++);
            }
            else
                it++;
        }
    }
    if (pg.state == PG_INCOMPLETE)
    {
        if (pg.peering_state)
        {
            delete pg.peering_state;
            pg.peering_state = NULL;
        }
        return;
    }
    if (!pg.peering_state)
    {
        pg.peering_state = new pg_peering_state_t();
        pg.peering_state->pool_id = pg.pool_id;
        pg.peering_state->pg_num = pg.pg_num;
    }
    // Request an object listing from every reachable peer, unless its
    // listing is already in flight or already received
    for (osd_num_t peer_osd: cur_peers)
    {
        if (pg.peering_state->list_ops.find(peer_osd) != pg.peering_state->list_ops.end() ||
            pg.peering_state->list_results.find(peer_osd) != pg.peering_state->list_results.end())
        {
            continue;
        }
        submit_sync_and_list_subop(peer_osd, pg.peering_state);
    }
    ringloop->wakeup();
}
// Submit a sync (BS_OP_SYNC locally, OSD_OP_SEC_SYNC to a peer) followed by an
// object listing for one OSD participating in peering. The in-flight op is
// tracked in ps->list_ops[role_osd]; the listing itself is submitted from the
// sync completion callback.
void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
{
    // Sync before listing, if not readonly
    if (readonly)
    {
        // Read-only mode: nothing to sync, list right away
        submit_list_subop(role_osd, ps);
    }
    else if (role_osd == this->osd_num)
    {
        // Self
        osd_op_t *op = new osd_op_t();
        op->op_type = 0;
        op->peer_fd = 0; // peer_fd == 0 marks a local blockstore op (see discard_list_subop)
        clock_gettime(CLOCK_REALTIME, &op->tv_begin);
        op->bs_op = new blockstore_op_t();
        op->bs_op->opcode = BS_OP_SYNC;
        op->bs_op->callback = [this, ps, op, role_osd](blockstore_op_t *bs_op)
        {
            if (bs_op->retval < 0)
            {
                // A failed local sync is fatal for the whole OSD
                printf("Local OP_SYNC failed: %d (%s)\n", bs_op->retval, strerror(-bs_op->retval));
                force_stop(1);
                return;
            }
            add_bs_subop_stats(op);
            delete op->bs_op;
            op->bs_op = NULL;
            delete op;
            // Sync done - replace this tracked op with the listing op
            ps->list_ops.erase(role_osd);
            submit_list_subop(role_osd, ps);
        };
        bs->enqueue_op(op->bs_op);
        // NOTE(review): the op is registered in list_ops after enqueue_op(); this
        // assumes the callback cannot fire synchronously inside enqueue_op() - confirm
        ps->list_ops[role_osd] = op;
    }
    else
    {
        // Peer
        auto & cl = c_cli.clients.at(c_cli.osd_peer_fds[role_osd]);
        osd_op_t *op = new osd_op_t();
        op->op_type = OSD_OP_OUT;
        op->peer_fd = cl->peer_fd;
        op->req = (osd_any_op_t){
            .sec_sync = {
                .header = {
                    .magic = SECONDARY_OSD_OP_MAGIC,
                    .id = c_cli.next_subop_id++,
                    .opcode = OSD_OP_SEC_SYNC,
                },
            },
        };
        op->callback = [this, ps, role_osd](osd_op_t *op)
        {
            if (op->reply.hdr.retval < 0)
            {
                // FIXME: Mark peer as failed and don't reconnect immediately after dropping the connection
                printf("Failed to sync OSD %lu: %ld (%s), disconnecting peer\n", role_osd, op->reply.hdr.retval, strerror(-op->reply.hdr.retval));
                ps->list_ops.erase(role_osd);
                c_cli.stop_client(op->peer_fd);
                delete op;
                return;
            }
            delete op;
            // Sync done - replace this tracked op with the listing op
            ps->list_ops.erase(role_osd);
            submit_list_subop(role_osd, ps);
        };
        c_cli.outbox_push(op);
        ps->list_ops[role_osd] = op;
    }
}
// Submit an object listing for this PG: BS_OP_LIST to the local blockstore or
// OSD_OP_SEC_LIST to a peer. On success the result buffer and its counts are
// stored in ps->list_results[role_osd]; the op is tracked in ps->list_ops.
void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
{
    if (role_osd == this->osd_num)
    {
        // Self
        osd_op_t *op = new osd_op_t();
        op->op_type = 0;
        op->peer_fd = 0; // peer_fd == 0 marks a local blockstore op
        clock_gettime(CLOCK_REALTIME, &op->tv_begin);
        op->bs_op = new blockstore_op_t();
        op->bs_op->opcode = BS_OP_LIST;
        // For BS_OP_LIST the generic fields carry listing parameters:
        // oid.stripe = pg_stripe_size, oid.inode / version = min/max inode of
        // the pool, len = PG count, offset = zero-based PG index
        // (presumably mirrors the .sec_list request below - confirm with blockstore API)
        op->bs_op->oid.stripe = st_cli.pool_config[ps->pool_id].pg_stripe_size;
        op->bs_op->oid.inode = ((uint64_t)ps->pool_id << (64 - POOL_ID_BITS));
        op->bs_op->version = ((uint64_t)(ps->pool_id+1) << (64 - POOL_ID_BITS)) - 1;
        op->bs_op->len = pg_counts[ps->pool_id];
        op->bs_op->offset = ps->pg_num-1;
        op->bs_op->callback = [this, ps, op, role_osd](blockstore_op_t *bs_op)
        {
            if (op->bs_op->retval < 0)
            {
                // NOTE(review): a failed local LIST throws, while a failed local
                // SYNC calls force_stop() - confirm the inconsistency is intentional
                throw std::runtime_error("local OP_LIST failed");
            }
            add_bs_subop_stats(op);
            // retval = total object version count, version = stable version count
            printf(
                "[PG %u/%u] Got object list from OSD %lu (local): %d object versions (%lu of them stable)\n",
                ps->pool_id, ps->pg_num, role_osd, bs_op->retval, bs_op->version
            );
            ps->list_results[role_osd] = {
                .buf = (obj_ver_id*)op->bs_op->buf, // buffer ownership moves to list_results
                .total_count = (uint64_t)op->bs_op->retval,
                .stable_count = op->bs_op->version,
            };
            ps->list_ops.erase(role_osd);
            delete op->bs_op;
            op->bs_op = NULL;
            delete op;
        };
        bs->enqueue_op(op->bs_op);
        ps->list_ops[role_osd] = op;
    }
    else
    {
        // Peer
        osd_op_t *op = new osd_op_t();
        op->op_type = OSD_OP_OUT;
        op->peer_fd = c_cli.osd_peer_fds[role_osd];
        op->req = (osd_any_op_t){
            .sec_list = {
                .header = {
                    .magic = SECONDARY_OSD_OP_MAGIC,
                    .id = c_cli.next_subop_id++,
                    .opcode = OSD_OP_SEC_LIST,
                },
                .list_pg = ps->pg_num,
                .pg_count = pg_counts[ps->pool_id],
                .pg_stripe_size = st_cli.pool_config[ps->pool_id].pg_stripe_size,
                .min_inode = ((uint64_t)(ps->pool_id) << (64 - POOL_ID_BITS)),
                .max_inode = ((uint64_t)(ps->pool_id+1) << (64 - POOL_ID_BITS)) - 1,
            },
        };
        op->callback = [this, ps, role_osd](osd_op_t *op)
        {
            if (op->reply.hdr.retval < 0)
            {
                // Listing failure from a peer is non-fatal: drop the connection,
                // a re-peer will be triggered by the disconnect
                printf("Failed to get object list from OSD %lu (retval=%ld), disconnecting peer\n", role_osd, op->reply.hdr.retval);
                ps->list_ops.erase(role_osd);
                c_cli.stop_client(op->peer_fd);
                delete op;
                return;
            }
            printf(
                "[PG %u/%u] Got object list from OSD %lu: %ld object versions (%lu of them stable)\n",
                ps->pool_id, ps->pg_num, role_osd, op->reply.hdr.retval, op->reply.sec_list.stable_count
            );
            ps->list_results[role_osd] = {
                .buf = (obj_ver_id*)op->buf,
                .total_count = (uint64_t)op->reply.hdr.retval,
                .stable_count = op->reply.sec_list.stable_count,
            };
            // set op->buf to NULL so it doesn't get freed
            op->buf = NULL;
            ps->list_ops.erase(role_osd);
            delete op;
        };
        c_cli.outbox_push(op);
        ps->list_ops[role_osd] = op;
    }
}
  419. void osd_t::discard_list_subop(osd_op_t *list_op)
  420. {
  421. if (list_op->peer_fd == 0)
  422. {
  423. // Self
  424. list_op->bs_op->callback = [list_op](blockstore_op_t *bs_op)
  425. {
  426. if (list_op->bs_op->buf)
  427. free(list_op->bs_op->buf);
  428. delete list_op->bs_op;
  429. list_op->bs_op = NULL;
  430. delete list_op;
  431. };
  432. }
  433. else
  434. {
  435. // Peer
  436. list_op->callback = [](osd_op_t *list_op)
  437. {
  438. delete list_op;
  439. };
  440. }
  441. }
  442. bool osd_t::stop_pg(pg_t & pg)
  443. {
  444. if (pg.peering_state)
  445. {
  446. // Stop peering
  447. for (auto it = pg.peering_state->list_ops.begin(); it != pg.peering_state->list_ops.end();)
  448. {
  449. discard_list_subop(it->second);
  450. }
  451. for (auto it = pg.peering_state->list_results.begin(); it != pg.peering_state->list_results.end();)
  452. {
  453. if (it->second.buf)
  454. {
  455. free(it->second.buf);
  456. }
  457. }
  458. delete pg.peering_state;
  459. pg.peering_state = NULL;
  460. }
  461. if (!(pg.state & PG_ACTIVE))
  462. {
  463. return false;
  464. }
  465. pg.state = pg.state & ~PG_ACTIVE | PG_STOPPING;
  466. if (pg.inflight == 0 && !pg.flush_batch)
  467. {
  468. finish_stop_pg(pg);
  469. }
  470. else
  471. {
  472. report_pg_state(pg);
  473. }
  474. return true;
  475. }
// Final stage of stopping a PG: mark it offline and report the state change
void osd_t::finish_stop_pg(pg_t & pg)
{
    pg.state = PG_OFFLINE;
    report_pg_state(pg);
}
// Record a PG state change: print it, queue it for reporting via
// report_pg_states(), and maintain PG history invariants for the
// active+clean and active+left_on_dead states.
void osd_t::report_pg_state(pg_t & pg)
{
    pg.print_state();
    this->pg_state_dirty.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
    if (pg.state == PG_ACTIVE && (pg.target_history.size() > 0 || pg.all_peers.size() > pg.target_set.size()))
    {
        // Clear history of active+clean PGs
        pg.history_changed = true;
        pg.target_history.clear();
        pg.all_peers = pg.target_set;
        pg.cur_peers = pg.target_set;
    }
    else if (pg.state == (PG_ACTIVE|PG_LEFT_ON_DEAD))
    {
        // Clear history of active+left_on_dead PGs, but leave dead OSDs in all_peers
        pg.history_changed = true;
        pg.target_history.clear();
        // dead_peers := (all_peers - cur_peers) + nonzero members of target_set
        std::set<osd_num_t> dead_peers;
        for (auto pg_osd: pg.all_peers)
        {
            dead_peers.insert(pg_osd);
        }
        for (auto pg_osd: pg.cur_peers)
        {
            dead_peers.erase(pg_osd);
        }
        for (auto pg_osd: pg.target_set)
        {
            if (pg_osd)
            {
                dead_peers.insert(pg_osd);
            }
        }
        // Replace all_peers with the deduplicated, sorted result
        pg.all_peers.clear();
        pg.all_peers.insert(pg.all_peers.begin(), dead_peers.begin(), dead_peers.end());
        // cur_peers := nonzero members of target_set, in target order
        pg.cur_peers.clear();
        for (auto pg_osd: pg.target_set)
        {
            if (pg_osd)
            {
                pg.cur_peers.push_back(pg_osd);
            }
        }
    }
    if (pg.state == PG_OFFLINE && !this->pg_config_applied)
    {
        // A pending PG configuration change can be applied now that the PG is offline
        apply_pg_config();
    }
    report_pg_states();
}
  530. }