Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

858 lines
31 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 (see README.md for details)
  3. #include "osd.h"
  4. #include "base64.h"
  5. #include "etcd_state_client.h"
  6. #include "osd_rmw.h"
  7. // Startup sequence:
  8. // Start etcd watcher -> Load global OSD configuration -> Bind socket -> Acquire lease -> Report&lock OSD state
  9. // -> Load PG config -> Report&lock PG states -> Load peers -> Connect to peers -> Peer PGs
  10. // Event handling
  11. // Wait for PG changes -> Start/Stop PGs when requested
  12. // Peer connection is lost -> Reload connection data -> Try to reconnect
  13. void osd_t::init_cluster()
  14. {
  15. if (!st_cli.etcd_addresses.size())
  16. {
  17. if (run_primary)
  18. {
  19. // Test version of clustering code with 1 pool, 1 PG and 2 peers
  20. // Example: peers = 2:127.0.0.1:11204,3:127.0.0.1:11205
  21. std::string peerstr = config["peers"];
  22. while (peerstr.size())
  23. {
  24. int pos = peerstr.find(',');
  25. parse_test_peer(pos < 0 ? peerstr : peerstr.substr(0, pos));
  26. peerstr = pos < 0 ? std::string("") : peerstr.substr(pos+1);
  27. }
  28. if (st_cli.peer_states.size() < 2)
  29. {
  30. throw std::runtime_error("run_primary requires at least 2 peers");
  31. }
  32. pgs[{ 1, 1 }] = (pg_t){
  33. .state = PG_PEERING,
  34. .scheme = POOL_SCHEME_XOR,
  35. .pg_cursize = 0,
  36. .pg_size = 3,
  37. .pg_minsize = 2,
  38. .parity_chunks = 1,
  39. .pool_id = 1,
  40. .pg_num = 1,
  41. .target_set = { 1, 2, 3 },
  42. .cur_set = { 0, 0, 0 },
  43. };
  44. st_cli.pool_config[1] = (pool_config_t){
  45. .exists = true,
  46. .id = 1,
  47. .name = "testpool",
  48. .scheme = POOL_SCHEME_XOR,
  49. .pg_size = 3,
  50. .pg_minsize = 2,
  51. .pg_count = 1,
  52. .real_pg_count = 1,
  53. };
  54. report_pg_state(pgs[{ 1, 1 }]);
  55. pg_counts[1] = 1;
  56. }
  57. bind_socket();
  58. }
  59. else
  60. {
  61. st_cli.tfd = tfd;
  62. st_cli.log_level = log_level;
  63. st_cli.on_change_osd_state_hook = [this](osd_num_t peer_osd) { on_change_osd_state_hook(peer_osd); };
  64. st_cli.on_change_pg_history_hook = [this](pool_id_t pool_id, pg_num_t pg_num) { on_change_pg_history_hook(pool_id, pg_num); };
  65. st_cli.on_change_hook = [this](json11::Json::object & changes) { on_change_etcd_state_hook(changes); };
  66. st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };
  67. st_cli.load_pgs_checks_hook = [this]() { return on_load_pgs_checks_hook(); };
  68. st_cli.on_load_pgs_hook = [this](bool success) { on_load_pgs_hook(success); };
  69. peering_state = OSD_LOADING_PGS;
  70. st_cli.load_global_config();
  71. }
  72. if (run_primary && autosync_interval > 0)
  73. {
  74. this->tfd->set_timer(autosync_interval*1000, true, [this](int timer_id)
  75. {
  76. autosync();
  77. });
  78. }
  79. }
  80. void osd_t::parse_test_peer(std::string peer)
  81. {
  82. // OSD_NUM:IP:PORT
  83. int pos1 = peer.find(':');
  84. int pos2 = peer.find(':', pos1+1);
  85. if (pos1 < 0 || pos2 < 0)
  86. throw new std::runtime_error("OSD peer string must be in the form OSD_NUM:IP:PORT");
  87. std::string addr = peer.substr(pos1+1, pos2-pos1-1);
  88. std::string osd_num_str = peer.substr(0, pos1);
  89. std::string port_str = peer.substr(pos2+1);
  90. osd_num_t peer_osd = strtoull(osd_num_str.c_str(), NULL, 10);
  91. if (!peer_osd)
  92. throw new std::runtime_error("Could not parse OSD peer osd_num");
  93. else if (st_cli.peer_states.find(peer_osd) != st_cli.peer_states.end())
  94. throw std::runtime_error("Same osd number "+std::to_string(peer_osd)+" specified twice in peers");
  95. int port = strtoull(port_str.c_str(), NULL, 10);
  96. if (!port)
  97. throw new std::runtime_error("Could not parse OSD peer port");
  98. st_cli.peer_states[peer_osd] = json11::Json::object {
  99. { "state", "up" },
  100. { "addresses", json11::Json::array { addr } },
  101. { "port", port },
  102. };
  103. c_cli.connect_peer(peer_osd, st_cli.peer_states[peer_osd]);
  104. }
  105. json11::Json osd_t::get_osd_state()
  106. {
  107. std::vector<char> hostname;
  108. hostname.resize(1024);
  109. while (gethostname(hostname.data(), hostname.size()) < 0 && errno == ENAMETOOLONG)
  110. hostname.resize(hostname.size()+1024);
  111. hostname.resize(strnlen(hostname.data(), hostname.size()));
  112. json11::Json::object st;
  113. st["state"] = "up";
  114. if (bind_address != "0.0.0.0")
  115. st["addresses"] = json11::Json::array { bind_address };
  116. else
  117. st["addresses"] = getifaddr_list();
  118. st["host"] = std::string(hostname.data(), hostname.size());
  119. st["port"] = listening_port;
  120. st["primary_enabled"] = run_primary;
  121. st["blockstore_enabled"] = bs ? true : false;
  122. return st;
  123. }
  124. json11::Json osd_t::get_statistics()
  125. {
  126. json11::Json::object st;
  127. timespec ts;
  128. clock_gettime(CLOCK_REALTIME, &ts);
  129. char time_str[50] = { 0 };
  130. sprintf(time_str, "%ld.%03ld", ts.tv_sec, ts.tv_nsec/1000000);
  131. st["time"] = time_str;
  132. st["blockstore_ready"] = bs->is_started();
  133. if (bs)
  134. {
  135. st["size"] = bs->get_block_count() * bs->get_block_size();
  136. st["free"] = bs->get_free_block_count() * bs->get_block_size();
  137. }
  138. st["host"] = self_state["host"];
  139. json11::Json::object op_stats, subop_stats;
  140. for (int i = 0; i <= OSD_OP_MAX; i++)
  141. {
  142. op_stats[osd_op_names[i]] = json11::Json::object {
  143. { "count", c_cli.stats.op_stat_count[i] },
  144. { "usec", c_cli.stats.op_stat_sum[i] },
  145. { "bytes", c_cli.stats.op_stat_bytes[i] },
  146. };
  147. }
  148. for (int i = 0; i <= OSD_OP_MAX; i++)
  149. {
  150. subop_stats[osd_op_names[i]] = json11::Json::object {
  151. { "count", c_cli.stats.subop_stat_count[i] },
  152. { "usec", c_cli.stats.subop_stat_sum[i] },
  153. };
  154. }
  155. st["op_stats"] = op_stats;
  156. st["subop_stats"] = subop_stats;
  157. st["recovery_stats"] = json11::Json::object {
  158. { recovery_stat_names[0], json11::Json::object {
  159. { "count", recovery_stat_count[0][0] },
  160. { "bytes", recovery_stat_bytes[0][0] },
  161. } },
  162. { recovery_stat_names[1], json11::Json::object {
  163. { "count", recovery_stat_count[0][1] },
  164. { "bytes", recovery_stat_bytes[0][1] },
  165. } },
  166. };
  167. return st;
  168. }
// Push OSD statistics to etcd in one transaction: OSD-level stats,
// per-inode space usage, per-inode op counters, and per-PG object counts.
// Re-entry is prevented with the etcd_reporting_stats flag; on etcd error
// the report is retried indefinitely on a timer.
void osd_t::report_statistics()
{
    if (etcd_reporting_stats)
    {
        // A report is already in flight — it will be re-triggered later
        return;
    }
    etcd_reporting_stats = true;
    // Report space usage statistics as a whole
    // Maybe we'll report it using deltas if we tune for a lot of inodes at some point
    // NOTE(review): bs is dereferenced here without the NULL check used in
    // get_osd_state()/get_statistics() — presumably this path only runs when
    // the blockstore is enabled; confirm against callers
    json11::Json::object inode_space;
    for (auto kv: bs->get_inode_space_stats())
    {
        inode_space[std::to_string(kv.first)] = kv.second;
    }
    // Per-inode read/write/delete counters collected by the primary OSD code
    json11::Json::object inode_ops;
    for (auto kv: inode_stats)
    {
        inode_ops[std::to_string(kv.first)] = json11::Json::object {
            { "read", json11::Json::object {
                { "count", kv.second.op_count[INODE_STATS_READ] },
                { "usec", kv.second.op_sum[INODE_STATS_READ] },
                { "bytes", kv.second.op_bytes[INODE_STATS_READ] },
            } },
            { "write", json11::Json::object {
                { "count", kv.second.op_count[INODE_STATS_WRITE] },
                { "usec", kv.second.op_sum[INODE_STATS_WRITE] },
                { "bytes", kv.second.op_bytes[INODE_STATS_WRITE] },
            } },
            { "delete", json11::Json::object {
                { "count", kv.second.op_count[INODE_STATS_DELETE] },
                { "usec", kv.second.op_sum[INODE_STATS_DELETE] },
                { "bytes", kv.second.op_bytes[INODE_STATS_DELETE] },
            } },
        };
    }
    // All statistics keys are written in a single etcd transaction
    json11::Json::array txn = { json11::Json::object {
        { "request_put", json11::Json::object {
            { "key", base64_encode(st_cli.etcd_prefix+"/osd/stats/"+std::to_string(osd_num)) },
            { "value", base64_encode(get_statistics().dump()) },
        } },
        { "request_put", json11::Json::object {
            { "key", base64_encode(st_cli.etcd_prefix+"/osd/space/"+std::to_string(osd_num)) },
            { "value", base64_encode(json11::Json(inode_space).dump()) },
        } },
        { "request_put", json11::Json::object {
            { "key", base64_encode(st_cli.etcd_prefix+"/osd/inodestats/"+std::to_string(osd_num)) },
            { "value", base64_encode(json11::Json(inode_ops).dump()) },
        } },
    } };
    for (auto & p: pgs)
    {
        auto & pg = p.second;
        if (pg.state & (PG_OFFLINE | PG_STARTING))
        {
            // Don't report statistics for offline PGs
            continue;
        }
        json11::Json::object pg_stats;
        pg_stats["object_count"] = pg.total_count;
        pg_stats["clean_count"] = pg.clean_count;
        pg_stats["misplaced_count"] = pg.misplaced_objects.size();
        pg_stats["degraded_count"] = pg.degraded_objects.size();
        pg_stats["incomplete_count"] = pg.incomplete_objects.size();
        pg_stats["write_osd_set"] = pg.cur_set;
        txn.push_back(json11::Json::object {
            { "request_put", json11::Json::object {
                { "key", base64_encode(st_cli.etcd_prefix+"/pg/stats/"+std::to_string(pg.pool_id)+"/"+std::to_string(pg.pg_num)) },
                { "value", base64_encode(json11::Json(pg_stats).dump()) },
            } }
        });
    }
    st_cli.etcd_txn(json11::Json::object { { "success", txn } }, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json res)
    {
        etcd_reporting_stats = false;
        if (err != "")
        {
            printf("[OSD %lu] Error reporting state to etcd: %s\n", this->osd_num, err.c_str());
            // Retry indefinitely
            tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id)
            {
                report_statistics();
            });
        }
        else if (res["error"].string_value() != "")
        {
            // A transaction-level error is unexpected and fatal
            printf("[OSD %lu] Error reporting state to etcd: %s\n", this->osd_num, res["error"].string_value().c_str());
            force_stop(1);
        }
    });
}
  259. void osd_t::on_change_osd_state_hook(osd_num_t peer_osd)
  260. {
  261. if (c_cli.wanted_peers.find(peer_osd) != c_cli.wanted_peers.end())
  262. {
  263. c_cli.connect_peer(peer_osd, st_cli.peer_states[peer_osd]);
  264. }
  265. }
  266. void osd_t::on_change_etcd_state_hook(json11::Json::object & changes)
  267. {
  268. // FIXME apply config changes in runtime (maybe, some)
  269. if (run_primary)
  270. {
  271. apply_pg_count();
  272. apply_pg_config();
  273. }
  274. }
// Called when the PG history key (/pg/history/<pool>/<pg>) changes in etcd.
// If our locally bumped epoch has now been persisted by the monitor
// (etcd epoch >= our epoch), record the persisted epoch and resume the
// writes that were parked waiting for the history update.
void osd_t::on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num)
{
    auto pg_it = pgs.find({
        .pool_id = pool_id,
        .pg_num = pg_num,
    });
    if (pg_it != pgs.end() && pg_it->second.epoch > pg_it->second.reported_epoch &&
        st_cli.pool_config[pool_id].pg_config[pg_num].epoch >= pg_it->second.epoch)
    {
        pg_it->second.reported_epoch = st_cli.pool_config[pool_id].pg_config[pg_num].epoch;
        object_id oid = { 0 };
        bool first = true;
        // write_queue is keyed by object id; resume only the first queued
        // write per object — subsequent ones are chained after it completes
        for (auto op: pg_it->second.write_queue)
        {
            if (first || oid != op.first)
            {
                oid = op.first;
                first = false;
                continue_primary_write(op.second);
            }
        }
    }
}
  298. void osd_t::on_load_config_hook(json11::Json::object & global_config)
  299. {
  300. blockstore_config_t osd_config = this->config;
  301. for (auto & cfg_var: global_config)
  302. {
  303. if (this->config.find(cfg_var.first) == this->config.end())
  304. {
  305. if (cfg_var.second.is_string())
  306. {
  307. osd_config[cfg_var.first] = cfg_var.second.string_value();
  308. }
  309. else
  310. {
  311. osd_config[cfg_var.first] = cfg_var.second.dump();
  312. }
  313. }
  314. }
  315. parse_config(osd_config);
  316. bind_socket();
  317. acquire_lease();
  318. }
// Acquire lease
// Grants an etcd lease that all of this OSD's state keys will be attached to,
// so that they disappear automatically if the OSD dies. Retries on failure,
// then proceeds to create_osd_state(). Also starts the periodic lease-renewal
// timer immediately (renew_lease() handles the not-yet-acquired case via
// its own failure counting).
void osd_t::acquire_lease()
{
    // Maximum lease TTL is (report interval) + retries * (timeout + repeat interval)
    st_cli.etcd_call("/lease/grant", json11::Json::object {
        { "TTL", etcd_report_interval+(MAX_ETCD_ATTEMPTS*(2*ETCD_QUICK_TIMEOUT)+999)/1000 }
    }, ETCD_QUICK_TIMEOUT, [this](std::string err, json11::Json data)
    {
        if (err != "" || data["ID"].string_value() == "")
        {
            // Lease not granted — retry after a short pause
            printf("Error acquiring a lease from etcd: %s\n", err.c_str());
            tfd->set_timer(ETCD_QUICK_TIMEOUT, false, [this](int timer_id)
            {
                acquire_lease();
            });
            return;
        }
        etcd_lease_id = data["ID"].string_value();
        create_osd_state();
    });
    printf("[OSD %lu] reporting to etcd at %s every %d seconds\n", this->osd_num, config["etcd_address"].c_str(), etcd_report_interval);
    tfd->set_timer(etcd_report_interval*1000, true, [this](int timer_id)
    {
        renew_lease();
    });
}
// Report "up" state once, then keep it alive using the lease
// Do it first to allow "monitors" check it when moving PGs
// Writes /osd/state/<osd_num> with a compare-and-create transaction:
// if the key already exists, another OSD instance with the same number
// is still alive, and this process stops itself.
void osd_t::create_osd_state()
{
    std::string state_key = base64_encode(st_cli.etcd_prefix+"/osd/state/"+std::to_string(osd_num));
    self_state = get_osd_state();
    st_cli.etcd_txn(json11::Json::object {
        // Check that the state key does not exist
        { "compare", json11::Json::array {
            json11::Json::object {
                { "target", "CREATE" },
                { "create_revision", 0 },
                { "key", state_key },
            }
        } },
        // On success: create the key, attached to our lease
        { "success", json11::Json::array {
            json11::Json::object {
                { "request_put", json11::Json::object {
                    { "key", state_key },
                    { "value", base64_encode(self_state.dump()) },
                    { "lease", etcd_lease_id },
                } }
            },
        } },
        // On failure: read the existing key to report who holds it
        { "failure", json11::Json::array {
            json11::Json::object {
                { "request_range", json11::Json::object {
                    { "key", state_key },
                } }
            },
        } },
    }, ETCD_QUICK_TIMEOUT, [this](std::string err, json11::Json data)
    {
        if (err != "")
        {
            // Communication error — retry, but give up (and die) after
            // MAX_ETCD_ATTEMPTS consecutive failures
            etcd_failed_attempts++;
            printf("Error creating OSD state key: %s\n", err.c_str());
            if (etcd_failed_attempts > MAX_ETCD_ATTEMPTS)
            {
                // Die
                throw std::runtime_error("Cluster connection failed");
            }
            // Retry
            tfd->set_timer(ETCD_QUICK_TIMEOUT, false, [this](int timer_id)
            {
                create_osd_state();
            });
            return;
        }
        if (!data["succeeded"].bool_value())
        {
            // OSD is already up
            auto kv = st_cli.parse_etcd_kv(data["responses"][0]["response_range"]["kvs"][0]);
            printf("Key %s already exists in etcd, OSD %lu is still up\n", kv.key.c_str(), this->osd_num);
            int64_t port = kv.value["port"].int64_value();
            for (auto & addr: kv.value["addresses"].array_items())
            {
                printf(" listening at: %s:%ld\n", addr.string_value().c_str(), port);
            }
            force_stop(0);
            return;
        }
        // State key created — continue startup by loading PG configuration
        if (run_primary)
        {
            st_cli.load_pgs();
        }
    });
}
// Renew lease
// Keeps the etcd lease (and thus all keys attached to it) alive.
// Called periodically by the timer started in acquire_lease().
void osd_t::renew_lease()
{
    st_cli.etcd_call("/lease/keepalive", json11::Json::object {
        { "ID", etcd_lease_id }
    }, ETCD_QUICK_TIMEOUT, [this](std::string err, json11::Json data)
    {
        // An empty TTL in a successful response means etcd no longer knows
        // the lease — it expired, so our state keys are gone. Fatal.
        if (err == "" && data["result"]["TTL"].string_value() == "")
        {
            // Die
            throw std::runtime_error("etcd lease has expired");
        }
        if (err != "")
        {
            // Communication error — retry, dying after MAX_ETCD_ATTEMPTS
            // consecutive failures (the lease would expire anyway)
            etcd_failed_attempts++;
            printf("Error renewing etcd lease: %s\n", err.c_str());
            if (etcd_failed_attempts > MAX_ETCD_ATTEMPTS)
            {
                // Die
                throw std::runtime_error("Cluster connection failed");
            }
            // Retry
            tfd->set_timer(ETCD_QUICK_TIMEOUT, false, [this](int timer_id)
            {
                renew_lease();
            });
        }
        else
        {
            // Successful renewal also doubles as the statistics heartbeat
            etcd_failed_attempts = 0;
            report_statistics();
        }
    });
}
  447. void osd_t::force_stop(int exitcode)
  448. {
  449. if (etcd_lease_id != "")
  450. {
  451. st_cli.etcd_call("/kv/lease/revoke", json11::Json::object {
  452. { "ID", etcd_lease_id }
  453. }, ETCD_QUICK_TIMEOUT, [this, exitcode](std::string err, json11::Json data)
  454. {
  455. if (err != "")
  456. {
  457. printf("Error revoking etcd lease: %s\n", err.c_str());
  458. }
  459. printf("[OSD %lu] Force stopping\n", this->osd_num);
  460. exit(exitcode);
  461. });
  462. }
  463. else
  464. {
  465. printf("[OSD %lu] Force stopping\n", this->osd_num);
  466. exit(exitcode);
  467. }
  468. }
  469. json11::Json osd_t::on_load_pgs_checks_hook()
  470. {
  471. assert(this->pgs.size() == 0);
  472. json11::Json::array checks = {
  473. json11::Json::object {
  474. { "target", "LEASE" },
  475. { "lease", etcd_lease_id },
  476. { "key", base64_encode(st_cli.etcd_prefix+"/osd/state/"+std::to_string(osd_num)) },
  477. }
  478. };
  479. return checks;
  480. }
  481. void osd_t::on_load_pgs_hook(bool success)
  482. {
  483. if (!success)
  484. {
  485. printf("Error loading PGs from etcd: lease expired\n");
  486. force_stop(1);
  487. }
  488. else
  489. {
  490. peering_state &= ~OSD_LOADING_PGS;
  491. apply_pg_count();
  492. apply_pg_config();
  493. }
  494. }
// Apply per-pool PG counts loaded from etcd.
// A PG count change is only legal while all of the pool's PGs are inactive;
// otherwise the OSD stops itself to avoid operating on a stale layout.
void osd_t::apply_pg_count()
{
    for (auto & pool_item: st_cli.pool_config)
    {
        if (pool_item.second.real_pg_count != 0 &&
            pool_item.second.real_pg_count != pg_counts[pool_item.first])
        {
            // Check that all pool PGs are offline. It is not allowed to change PG count when any PGs are online
            // The external tool must wait for all PGs to come down before changing PG count
            // If it doesn't wait, a restarted OSD may apply the new count immediately which will lead to bugs
            // So an OSD just dies if it detects PG count change while there are active PGs
            int still_active = 0;
            for (auto & kv: pgs)
            {
                if (kv.first.pool_id == pool_item.first && (kv.second.state & PG_ACTIVE))
                {
                    still_active++;
                }
            }
            if (still_active > 0)
            {
                printf(
                    "[OSD %lu] PG count change detected for pool %u (new is %lu, old is %u),"
                    " but %u PG(s) are still active. This is not allowed. Exiting\n",
                    this->osd_num, pool_item.first, pool_item.second.real_pg_count, pg_counts[pool_item.first], still_active
                );
                force_stop(1);
                return;
            }
        }
        // Safe to (re)record the pool's PG count
        this->pg_counts[pool_item.first] = pool_item.second.real_pg_count;
    }
}
// Apply the PG configuration loaded from etcd: start PGs assigned to this
// OSD as primary, stop PGs that were moved away, and restart PGs whose
// target OSD set changed. Sets pg_config_applied = false when some change
// could not be applied yet (e.g. must wait for a PG to stop or to be locked)
// — it will be retried on the next configuration event.
void osd_t::apply_pg_config()
{
    bool all_applied = true;
    for (auto & pool_item: st_cli.pool_config)
    {
        auto pool_id = pool_item.first;
        for (auto & kv: pool_item.second.pg_config)
        {
            pg_num_t pg_num = kv.first;
            auto & pg_cfg = kv.second;
            // "take" — this OSD should run the PG: it's the configured primary,
            // the PG isn't paused, and no other OSD currently holds the PG lock
            bool take = pg_cfg.exists && pg_cfg.primary == this->osd_num &&
                !pg_cfg.pause && (!pg_cfg.cur_primary || pg_cfg.cur_primary == this->osd_num);
            auto pg_it = this->pgs.find({ .pool_id = pool_id, .pg_num = pg_num });
            bool currently_taken = pg_it != this->pgs.end() && pg_it->second.state != PG_OFFLINE;
            if (currently_taken && !take)
            {
                // Stop this PG
                stop_pg(pg_it->second);
            }
            else if (take)
            {
                // Take this PG
                // Collect the full peer set: current targets, all known peers,
                // and every OSD mentioned in the PG's target history
                std::set<osd_num_t> all_peers;
                for (osd_num_t pg_osd: pg_cfg.target_set)
                {
                    if (pg_osd != 0)
                    {
                        all_peers.insert(pg_osd);
                    }
                }
                for (osd_num_t pg_osd: pg_cfg.all_peers)
                {
                    if (pg_osd != 0)
                    {
                        all_peers.insert(pg_osd);
                    }
                }
                for (auto & hist_item: pg_cfg.target_history)
                {
                    for (auto pg_osd: hist_item)
                    {
                        if (pg_osd != 0)
                        {
                            all_peers.insert(pg_osd);
                        }
                    }
                }
                if (currently_taken)
                {
                    // The PG is already running here — decide whether the new
                    // config requires a restart
                    if (pg_it->second.state & (PG_ACTIVE | PG_INCOMPLETE | PG_PEERING))
                    {
                        if (pg_it->second.target_set == pg_cfg.target_set)
                        {
                            // No change in osd_set; history changes are ignored
                            continue;
                        }
                        else
                        {
                            // Stop PG, reapply change after stopping
                            stop_pg(pg_it->second);
                            all_applied = false;
                            continue;
                        }
                    }
                    else if (pg_it->second.state & PG_STOPPING)
                    {
                        // Reapply change after stopping
                        all_applied = false;
                        continue;
                    }
                    else if (pg_it->second.state & PG_STARTING)
                    {
                        if (pg_cfg.cur_primary == this->osd_num)
                        {
                            // PG locked, continue
                        }
                        else
                        {
                            // Reapply change after locking the PG
                            all_applied = false;
                            continue;
                        }
                    }
                    else
                    {
                        throw std::runtime_error("Unexpected PG "+std::to_string(pg_num)+" state: "+std::to_string(pg_it->second.state));
                    }
                }
                // (Re)create the local PG object from the pool/PG configuration
                auto & pg = this->pgs[{ .pool_id = pool_id, .pg_num = pg_num }];
                pg = (pg_t){
                    .state = pg_cfg.cur_primary == this->osd_num ? PG_PEERING : PG_STARTING,
                    .scheme = pool_item.second.scheme,
                    .pg_cursize = 0,
                    .pg_size = pool_item.second.pg_size,
                    .pg_minsize = pool_item.second.pg_minsize,
                    .parity_chunks = pool_item.second.parity_chunks,
                    .pool_id = pool_id,
                    .pg_num = pg_num,
                    .reported_epoch = pg_cfg.epoch,
                    .target_history = pg_cfg.target_history,
                    .all_peers = std::vector<osd_num_t>(all_peers.begin(), all_peers.end()),
                    .target_set = pg_cfg.target_set,
                };
                if (pg.scheme == POOL_SCHEME_JERASURE)
                {
                    // Pre-initialize jerasure matrices for this layout
                    use_jerasure(pg.pg_size, pg.pg_size-pg.parity_chunks, true);
                }
                this->pg_state_dirty.insert({ .pool_id = pool_id, .pg_num = pg_num });
                pg.print_state();
                if (pg_cfg.cur_primary == this->osd_num)
                {
                    // Add peers
                    for (auto pg_osd: all_peers)
                    {
                        if (pg_osd != this->osd_num && c_cli.osd_peer_fds.find(pg_osd) == c_cli.osd_peer_fds.end())
                        {
                            c_cli.connect_peer(pg_osd, st_cli.peer_states[pg_osd]);
                        }
                    }
                    start_pg_peering(pg);
                }
                else
                {
                    // Reapply change after locking the PG
                    all_applied = false;
                }
            }
        }
    }
    // Publish new PG states (dirty flags were set above)
    report_pg_states();
    this->pg_config_applied = all_applied;
}
  660. void osd_t::report_pg_states()
  661. {
  662. if (etcd_reporting_pg_state || !this->pg_state_dirty.size() || !st_cli.etcd_addresses.size())
  663. {
  664. return;
  665. }
  666. std::vector<std::pair<pool_pg_num_t,bool>> reporting_pgs;
  667. json11::Json::array checks;
  668. json11::Json::array success;
  669. json11::Json::array failure;
  670. for (auto it = pg_state_dirty.begin(); it != pg_state_dirty.end(); it++)
  671. {
  672. auto pg_it = this->pgs.find(*it);
  673. if (pg_it == this->pgs.end())
  674. {
  675. continue;
  676. }
  677. auto & pg = pg_it->second;
  678. reporting_pgs.push_back({ *it, pg.history_changed });
  679. std::string state_key_base64 = base64_encode(st_cli.etcd_prefix+"/pg/state/"+std::to_string(pg.pool_id)+"/"+std::to_string(pg.pg_num));
  680. if (pg.state == PG_STARTING)
  681. {
  682. // Check that the PG key does not exist
  683. // Failed check indicates an unsuccessful PG lock attempt in this case
  684. checks.push_back(json11::Json::object {
  685. { "target", "VERSION" },
  686. { "version", 0 },
  687. { "key", state_key_base64 },
  688. });
  689. }
  690. else
  691. {
  692. // Check that the key is ours
  693. // Failed check indicates success for OFFLINE pgs (PG lock is already deleted)
  694. // and an unexpected race condition for started pgs (PG lock is held by someone else)
  695. checks.push_back(json11::Json::object {
  696. { "target", "LEASE" },
  697. { "lease", etcd_lease_id },
  698. { "key", state_key_base64 },
  699. });
  700. }
  701. if (pg.state == PG_OFFLINE)
  702. {
  703. success.push_back(json11::Json::object {
  704. { "request_delete_range", json11::Json::object {
  705. { "key", state_key_base64 },
  706. } }
  707. });
  708. }
  709. else
  710. {
  711. json11::Json::array pg_state_keywords;
  712. for (int i = 0; i < pg_state_bit_count; i++)
  713. {
  714. if (pg.state & pg_state_bits[i])
  715. {
  716. pg_state_keywords.push_back(pg_state_names[i]);
  717. }
  718. }
  719. success.push_back(json11::Json::object {
  720. { "request_put", json11::Json::object {
  721. { "key", state_key_base64 },
  722. { "value", base64_encode(json11::Json(json11::Json::object {
  723. { "primary", this->osd_num },
  724. { "state", pg_state_keywords },
  725. { "peers", pg.cur_peers },
  726. }).dump()) },
  727. { "lease", etcd_lease_id },
  728. } }
  729. });
  730. if (pg.history_changed)
  731. {
  732. // Prevent race conditions (for the case when the monitor is updating this key at the same time)
  733. pg.history_changed = false;
  734. std::string history_key = base64_encode(st_cli.etcd_prefix+"/pg/history/"+std::to_string(pg.pool_id)+"/"+std::to_string(pg.pg_num));
  735. json11::Json::object history_value = {
  736. { "epoch", pg.epoch },
  737. { "all_peers", pg.all_peers },
  738. { "osd_sets", pg.target_history },
  739. };
  740. checks.push_back(json11::Json::object {
  741. { "target", "MOD" },
  742. { "key", history_key },
  743. { "result", "LESS" },
  744. { "mod_revision", st_cli.etcd_watch_revision+1 },
  745. });
  746. success.push_back(json11::Json::object {
  747. { "request_put", json11::Json::object {
  748. { "key", history_key },
  749. { "value", base64_encode(json11::Json(history_value).dump()) },
  750. } }
  751. });
  752. }
  753. }
  754. failure.push_back(json11::Json::object {
  755. { "request_range", json11::Json::object {
  756. { "key", state_key_base64 },
  757. } }
  758. });
  759. }
  760. pg_state_dirty.clear();
  761. etcd_reporting_pg_state = true;
  762. st_cli.etcd_txn(json11::Json::object {
  763. { "compare", checks }, { "success", success }, { "failure", failure }
  764. }, ETCD_QUICK_TIMEOUT, [this, reporting_pgs](std::string err, json11::Json data)
  765. {
  766. etcd_reporting_pg_state = false;
  767. if (!data["succeeded"].bool_value())
  768. {
  769. // One of PG state updates failed, put dirty flags back
  770. for (auto pp: reporting_pgs)
  771. {
  772. this->pg_state_dirty.insert(pp.first);
  773. if (pp.second)
  774. {
  775. auto pg_it = this->pgs.find(pp.first);
  776. if (pg_it != this->pgs.end())
  777. {
  778. pg_it->second.history_changed = true;
  779. }
  780. }
  781. }
  782. for (auto & res: data["responses"].array_items())
  783. {
  784. if (res["kvs"].array_items().size())
  785. {
  786. auto kv = st_cli.parse_etcd_kv(res["kvs"][0]);
  787. if (kv.key.substr(st_cli.etcd_prefix.length()+10) == st_cli.etcd_prefix+"/pg/state/")
  788. {
  789. pool_id_t pool_id = 0;
  790. pg_num_t pg_num = 0;
  791. char null_byte = 0;
  792. sscanf(kv.key.c_str() + st_cli.etcd_prefix.length()+10, "%u/%u%c", &pool_id, &pg_num, &null_byte);
  793. if (null_byte == 0)
  794. {
  795. auto pg_it = pgs.find({ .pool_id = pool_id, .pg_num = pg_num });
  796. if (pg_it != pgs.end() && pg_it->second.state != PG_OFFLINE && pg_it->second.state != PG_STARTING)
  797. {
  798. // Live PG state update failed
  799. printf("Failed to report state of pool %u PG %u which is live. Race condition detected, exiting\n", pool_id, pg_num);
  800. force_stop(1);
  801. return;
  802. }
  803. }
  804. }
  805. }
  806. }
  807. // Retry after a short pause (hope we'll get some updates and update PG states accordingly)
  808. tfd->set_timer(500, false, [this](int) { report_pg_states(); });
  809. }
  810. else
  811. {
  812. // Success. We'll get our changes back via the watcher and react to them
  813. for (auto pp: reporting_pgs)
  814. {
  815. auto pg_it = this->pgs.find(pp.first);
  816. if (pg_it != this->pgs.end())
  817. {
  818. if (pg_it->second.state == PG_OFFLINE)
  819. {
  820. // Remove offline PGs after reporting their state
  821. this->pgs.erase(pg_it);
  822. if (pg_it->second.scheme == POOL_SCHEME_JERASURE)
  823. {
  824. use_jerasure(pg_it->second.pg_size, pg_it->second.pg_size-pg_it->second.parity_chunks, false);
  825. }
  826. }
  827. }
  828. }
  829. // Push other PG state updates, if any
  830. report_pg_states();
  831. if (!this->pg_state_dirty.size())
  832. {
  833. // Update statistics
  834. report_statistics();
  835. }
  836. }
  837. });
  838. }