Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

767 lines
28 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
  3. #include "osd_ops.h"
  4. #include "pg_states.h"
  5. #include "etcd_state_client.h"
  6. #ifndef __MOCK__
  7. #include "http_client.h"
  8. #include "base64.h"
  9. #endif
  10. etcd_state_client_t::~etcd_state_client_t()
  11. {
  12. for (auto watch: watches)
  13. {
  14. delete watch;
  15. }
  16. watches.clear();
  17. etcd_watches_initialised = -1;
  18. #ifndef __MOCK__
  19. if (etcd_watch_ws)
  20. {
  21. etcd_watch_ws->close();
  22. etcd_watch_ws = NULL;
  23. }
  24. #endif
  25. }
  26. #ifndef __MOCK__
  27. etcd_kv_t etcd_state_client_t::parse_etcd_kv(const json11::Json & kv_json)
  28. {
  29. etcd_kv_t kv;
  30. kv.key = base64_decode(kv_json["key"].string_value());
  31. std::string json_err, json_text = base64_decode(kv_json["value"].string_value());
  32. kv.value = json_text == "" ? json11::Json() : json11::Json::parse(json_text, json_err);
  33. if (json_err != "")
  34. {
  35. fprintf(stderr, "Bad JSON in etcd key %s: %s (value: %s)\n", kv.key.c_str(), json_err.c_str(), json_text.c_str());
  36. kv.key = "";
  37. }
  38. else
  39. kv.mod_revision = kv_json["mod_revision"].uint64_value();
  40. return kv;
  41. }
  42. void etcd_state_client_t::etcd_txn(json11::Json txn, int timeout, std::function<void(std::string, json11::Json)> callback)
  43. {
  44. etcd_call("/kv/txn", txn, timeout, callback);
  45. }
  46. void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int timeout, std::function<void(std::string, json11::Json)> callback)
  47. {
  48. if (!etcd_addresses.size())
  49. {
  50. fprintf(stderr, "etcd_address is missing in Vitastor configuration\n");
  51. exit(1);
  52. }
  53. std::string etcd_address = etcd_addresses[rand() % etcd_addresses.size()];
  54. std::string etcd_api_path;
  55. int pos = etcd_address.find('/');
  56. if (pos >= 0)
  57. {
  58. etcd_api_path = etcd_address.substr(pos);
  59. etcd_address = etcd_address.substr(0, pos);
  60. }
  61. std::string req = payload.dump();
  62. req = "POST "+etcd_api_path+api+" HTTP/1.1\r\n"
  63. "Host: "+etcd_address+"\r\n"
  64. "Content-Type: application/json\r\n"
  65. "Content-Length: "+std::to_string(req.size())+"\r\n"
  66. "Connection: close\r\n"
  67. "\r\n"+req;
  68. http_request_json(tfd, etcd_address, req, timeout, callback);
  69. }
  70. void etcd_state_client_t::add_etcd_url(std::string addr)
  71. {
  72. if (addr.length() > 0)
  73. {
  74. if (strtolower(addr.substr(0, 7)) == "http://")
  75. addr = addr.substr(7);
  76. else if (strtolower(addr.substr(0, 8)) == "https://")
  77. {
  78. fprintf(stderr, "HTTPS is unsupported for etcd. Either use plain HTTP or setup a local proxy for etcd interaction\n");
  79. exit(1);
  80. }
  81. if (addr.find('/') == std::string::npos)
  82. addr += "/v3";
  83. this->etcd_addresses.push_back(addr);
  84. }
  85. }
  86. void etcd_state_client_t::parse_config(const json11::Json & config)
  87. {
  88. this->etcd_addresses.clear();
  89. if (config["etcd_address"].is_string())
  90. {
  91. std::string ea = config["etcd_address"].string_value();
  92. while (1)
  93. {
  94. int pos = ea.find(',');
  95. add_etcd_url(pos >= 0 ? ea.substr(0, pos) : ea);
  96. if (pos >= 0)
  97. ea = ea.substr(pos+1);
  98. else
  99. break;
  100. }
  101. }
  102. else if (config["etcd_address"].array_items().size())
  103. {
  104. for (auto & ea: config["etcd_address"].array_items())
  105. {
  106. add_etcd_url(ea.string_value());
  107. }
  108. }
  109. this->etcd_prefix = config["etcd_prefix"].string_value();
  110. if (this->etcd_prefix == "")
  111. {
  112. this->etcd_prefix = "/vitastor";
  113. }
  114. else if (this->etcd_prefix[0] != '/')
  115. {
  116. this->etcd_prefix = "/"+this->etcd_prefix;
  117. }
  118. this->log_level = config["log_level"].int64_value();
  119. }
  120. void etcd_state_client_t::start_etcd_watcher()
  121. {
  122. if (!etcd_addresses.size())
  123. {
  124. fprintf(stderr, "etcd_address is missing in Vitastor configuration\n");
  125. exit(1);
  126. }
  127. std::string etcd_address = etcd_addresses[rand() % etcd_addresses.size()];
  128. std::string etcd_api_path;
  129. int pos = etcd_address.find('/');
  130. if (pos >= 0)
  131. {
  132. etcd_api_path = etcd_address.substr(pos);
  133. etcd_address = etcd_address.substr(0, pos);
  134. }
  135. etcd_watches_initialised = 0;
  136. etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", ETCD_SLOW_TIMEOUT, [this](const http_response_t *msg)
  137. {
  138. if (msg->body.length())
  139. {
  140. std::string json_err;
  141. json11::Json data = json11::Json::parse(msg->body, json_err);
  142. if (json_err != "")
  143. {
  144. fprintf(stderr, "Bad JSON in etcd event: %s, ignoring event\n", json_err.c_str());
  145. }
  146. else
  147. {
  148. if (data["result"]["created"].bool_value())
  149. {
  150. etcd_watches_initialised++;
  151. }
  152. if (etcd_watches_initialised == 4)
  153. {
  154. etcd_watch_revision = data["result"]["header"]["revision"].uint64_value();
  155. }
  156. // First gather all changes into a hash to remove multiple overwrites
  157. std::map<std::string, etcd_kv_t> changes;
  158. for (auto & ev: data["result"]["events"].array_items())
  159. {
  160. auto kv = parse_etcd_kv(ev["kv"]);
  161. if (kv.key != "")
  162. {
  163. changes[kv.key] = kv;
  164. }
  165. }
  166. for (auto & kv: changes)
  167. {
  168. if (this->log_level > 3)
  169. {
  170. fprintf(stderr, "Incoming event: %s -> %s\n", kv.first.c_str(), kv.second.value.dump().c_str());
  171. }
  172. parse_state(kv.second);
  173. }
  174. // React to changes
  175. if (on_change_hook != NULL)
  176. {
  177. on_change_hook(changes);
  178. }
  179. }
  180. }
  181. if (msg->eof)
  182. {
  183. etcd_watch_ws = NULL;
  184. if (etcd_watches_initialised == 0)
  185. {
  186. // Connection not established, retry in <ETCD_SLOW_TIMEOUT>
  187. tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int)
  188. {
  189. start_etcd_watcher();
  190. });
  191. }
  192. else if (etcd_watches_initialised > 0)
  193. {
  194. // Connection was live, retry immediately
  195. start_etcd_watcher();
  196. }
  197. }
  198. });
  199. etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
  200. { "create_request", json11::Json::object {
  201. { "key", base64_encode(etcd_prefix+"/config/") },
  202. { "range_end", base64_encode(etcd_prefix+"/config0") },
  203. { "start_revision", etcd_watch_revision+1 },
  204. { "watch_id", ETCD_CONFIG_WATCH_ID },
  205. { "progress_notify", true },
  206. } }
  207. }).dump());
  208. etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
  209. { "create_request", json11::Json::object {
  210. { "key", base64_encode(etcd_prefix+"/osd/state/") },
  211. { "range_end", base64_encode(etcd_prefix+"/osd/state0") },
  212. { "start_revision", etcd_watch_revision+1 },
  213. { "watch_id", ETCD_OSD_STATE_WATCH_ID },
  214. { "progress_notify", true },
  215. } }
  216. }).dump());
  217. etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
  218. { "create_request", json11::Json::object {
  219. { "key", base64_encode(etcd_prefix+"/pg/state/") },
  220. { "range_end", base64_encode(etcd_prefix+"/pg/state0") },
  221. { "start_revision", etcd_watch_revision+1 },
  222. { "watch_id", ETCD_PG_STATE_WATCH_ID },
  223. { "progress_notify", true },
  224. } }
  225. }).dump());
  226. etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
  227. { "create_request", json11::Json::object {
  228. { "key", base64_encode(etcd_prefix+"/pg/history/") },
  229. { "range_end", base64_encode(etcd_prefix+"/pg/history0") },
  230. { "start_revision", etcd_watch_revision+1 },
  231. { "watch_id", ETCD_PG_HISTORY_WATCH_ID },
  232. { "progress_notify", true },
  233. } }
  234. }).dump());
  235. }
  236. void etcd_state_client_t::load_global_config()
  237. {
  238. etcd_call("/kv/range", json11::Json::object {
  239. { "key", base64_encode(etcd_prefix+"/config/global") }
  240. }, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data)
  241. {
  242. if (err != "")
  243. {
  244. fprintf(stderr, "Error reading OSD configuration from etcd: %s\n", err.c_str());
  245. tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id)
  246. {
  247. load_global_config();
  248. });
  249. return;
  250. }
  251. json11::Json::object global_config;
  252. if (data["kvs"].array_items().size() > 0)
  253. {
  254. auto kv = parse_etcd_kv(data["kvs"][0]);
  255. if (kv.value.is_object())
  256. {
  257. global_config = kv.value.object_items();
  258. }
  259. }
  260. bs_block_size = global_config["block_size"].uint64_value();
  261. if (!bs_block_size)
  262. {
  263. bs_block_size = DEFAULT_BLOCK_SIZE;
  264. }
  265. on_load_config_hook(global_config);
  266. });
  267. }
  268. void etcd_state_client_t::load_pgs()
  269. {
  270. json11::Json::array txn = {
  271. json11::Json::object {
  272. { "request_range", json11::Json::object {
  273. { "key", base64_encode(etcd_prefix+"/config/pools") },
  274. } }
  275. },
  276. json11::Json::object {
  277. { "request_range", json11::Json::object {
  278. { "key", base64_encode(etcd_prefix+"/config/pgs") },
  279. } }
  280. },
  281. json11::Json::object {
  282. { "request_range", json11::Json::object {
  283. { "key", base64_encode(etcd_prefix+"/config/inode/") },
  284. { "range_end", base64_encode(etcd_prefix+"/config/inode0") },
  285. } }
  286. },
  287. json11::Json::object {
  288. { "request_range", json11::Json::object {
  289. { "key", base64_encode(etcd_prefix+"/pg/history/") },
  290. { "range_end", base64_encode(etcd_prefix+"/pg/history0") },
  291. } }
  292. },
  293. json11::Json::object {
  294. { "request_range", json11::Json::object {
  295. { "key", base64_encode(etcd_prefix+"/pg/state/") },
  296. { "range_end", base64_encode(etcd_prefix+"/pg/state0") },
  297. } }
  298. },
  299. json11::Json::object {
  300. { "request_range", json11::Json::object {
  301. { "key", base64_encode(etcd_prefix+"/osd/state/") },
  302. { "range_end", base64_encode(etcd_prefix+"/osd/state0") },
  303. } }
  304. },
  305. };
  306. json11::Json::object req = { { "success", txn } };
  307. json11::Json checks = load_pgs_checks_hook != NULL ? load_pgs_checks_hook() : json11::Json();
  308. if (checks.array_items().size() > 0)
  309. {
  310. req["compare"] = checks;
  311. }
  312. etcd_txn(req, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data)
  313. {
  314. if (err != "")
  315. {
  316. fprintf(stderr, "Error loading PGs from etcd: %s\n", err.c_str());
  317. tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id)
  318. {
  319. load_pgs();
  320. });
  321. return;
  322. }
  323. if (!data["succeeded"].bool_value())
  324. {
  325. on_load_pgs_hook(false);
  326. return;
  327. }
  328. if (!etcd_watch_revision)
  329. {
  330. etcd_watch_revision = data["header"]["revision"].uint64_value();
  331. }
  332. for (auto & res: data["responses"].array_items())
  333. {
  334. for (auto & kv_json: res["response_range"]["kvs"].array_items())
  335. {
  336. auto kv = parse_etcd_kv(kv_json);
  337. parse_state(kv);
  338. }
  339. }
  340. on_load_pgs_hook(true);
  341. start_etcd_watcher();
  342. });
  343. }
  344. #else
  345. void etcd_state_client_t::parse_config(const json11::Json & config)
  346. {
  347. }
  348. void etcd_state_client_t::load_global_config()
  349. {
  350. json11::Json::object global_config;
  351. on_load_config_hook(global_config);
  352. }
  353. void etcd_state_client_t::load_pgs()
  354. {
  355. }
  356. #endif
  357. void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
  358. {
  359. const std::string & key = kv.key;
  360. const json11::Json & value = kv.value;
  361. if (key == etcd_prefix+"/config/pools")
  362. {
  363. for (auto & pool_item: this->pool_config)
  364. {
  365. pool_item.second.exists = false;
  366. }
  367. for (auto & pool_item: value.object_items())
  368. {
  369. pool_config_t pc;
  370. // ID
  371. pool_id_t pool_id;
  372. char null_byte = 0;
  373. sscanf(pool_item.first.c_str(), "%u%c", &pool_id, &null_byte);
  374. if (!pool_id || pool_id >= POOL_ID_MAX || null_byte != 0)
  375. {
  376. fprintf(stderr, "Pool ID %s is invalid (must be a number less than 0x%x), skipping pool\n", pool_item.first.c_str(), POOL_ID_MAX);
  377. continue;
  378. }
  379. pc.id = pool_id;
  380. // Pool Name
  381. pc.name = pool_item.second["name"].string_value();
  382. if (pc.name == "")
  383. {
  384. fprintf(stderr, "Pool %u has empty name, skipping pool\n", pool_id);
  385. continue;
  386. }
  387. // Failure Domain
  388. pc.failure_domain = pool_item.second["failure_domain"].string_value();
  389. // Coding Scheme
  390. if (pool_item.second["scheme"] == "replicated")
  391. pc.scheme = POOL_SCHEME_REPLICATED;
  392. else if (pool_item.second["scheme"] == "xor")
  393. pc.scheme = POOL_SCHEME_XOR;
  394. else if (pool_item.second["scheme"] == "jerasure")
  395. pc.scheme = POOL_SCHEME_JERASURE;
  396. else
  397. {
  398. fprintf(stderr, "Pool %u has invalid coding scheme (one of \"xor\", \"replicated\" or \"jerasure\" required), skipping pool\n", pool_id);
  399. continue;
  400. }
  401. // PG Size
  402. pc.pg_size = pool_item.second["pg_size"].uint64_value();
  403. if (pc.pg_size < 1 ||
  404. pool_item.second["pg_size"].uint64_value() < 3 &&
  405. (pc.scheme == POOL_SCHEME_XOR || pc.scheme == POOL_SCHEME_JERASURE) ||
  406. pool_item.second["pg_size"].uint64_value() > 256)
  407. {
  408. fprintf(stderr, "Pool %u has invalid pg_size, skipping pool\n", pool_id);
  409. continue;
  410. }
  411. // Parity Chunks
  412. pc.parity_chunks = pool_item.second["parity_chunks"].uint64_value();
  413. if (pc.scheme == POOL_SCHEME_XOR)
  414. {
  415. if (pc.parity_chunks > 1)
  416. {
  417. fprintf(stderr, "Pool %u has invalid parity_chunks (must be 1), skipping pool\n", pool_id);
  418. continue;
  419. }
  420. pc.parity_chunks = 1;
  421. }
  422. if (pc.scheme == POOL_SCHEME_JERASURE &&
  423. (pc.parity_chunks < 1 || pc.parity_chunks > pc.pg_size-2))
  424. {
  425. fprintf(stderr, "Pool %u has invalid parity_chunks (must be between 1 and pg_size-2), skipping pool\n", pool_id);
  426. continue;
  427. }
  428. // PG MinSize
  429. pc.pg_minsize = pool_item.second["pg_minsize"].uint64_value();
  430. if (pc.pg_minsize < 1 || pc.pg_minsize > pc.pg_size ||
  431. (pc.scheme == POOL_SCHEME_XOR || pc.scheme == POOL_SCHEME_JERASURE) &&
  432. pc.pg_minsize < (pc.pg_size-pc.parity_chunks))
  433. {
  434. fprintf(stderr, "Pool %u has invalid pg_minsize, skipping pool\n", pool_id);
  435. continue;
  436. }
  437. // PG Count
  438. pc.pg_count = pool_item.second["pg_count"].uint64_value();
  439. if (pc.pg_count < 1)
  440. {
  441. fprintf(stderr, "Pool %u has invalid pg_count, skipping pool\n", pool_id);
  442. continue;
  443. }
  444. // Max OSD Combinations
  445. pc.max_osd_combinations = pool_item.second["max_osd_combinations"].uint64_value();
  446. if (!pc.max_osd_combinations)
  447. pc.max_osd_combinations = 10000;
  448. if (pc.max_osd_combinations > 0 && pc.max_osd_combinations < 100)
  449. {
  450. fprintf(stderr, "Pool %u has invalid max_osd_combinations (must be at least 100), skipping pool\n", pool_id);
  451. continue;
  452. }
  453. // PG Stripe Size
  454. pc.pg_stripe_size = pool_item.second["pg_stripe_size"].uint64_value();
  455. uint64_t min_stripe_size = bs_block_size * (pc.scheme == POOL_SCHEME_REPLICATED ? 1 : (pc.pg_size-pc.parity_chunks));
  456. if (pc.pg_stripe_size < min_stripe_size)
  457. pc.pg_stripe_size = min_stripe_size;
  458. // Save
  459. pc.real_pg_count = this->pool_config[pool_id].real_pg_count;
  460. std::swap(pc.pg_config, this->pool_config[pool_id].pg_config);
  461. std::swap(this->pool_config[pool_id], pc);
  462. auto & parsed_cfg = this->pool_config[pool_id];
  463. parsed_cfg.exists = true;
  464. for (auto & pg_item: parsed_cfg.pg_config)
  465. {
  466. if (pg_item.second.target_set.size() != parsed_cfg.pg_size)
  467. {
  468. fprintf(stderr, "Pool %u PG %u configuration is invalid: osd_set size %lu != pool pg_size %lu\n",
  469. pool_id, pg_item.first, pg_item.second.target_set.size(), parsed_cfg.pg_size);
  470. pg_item.second.pause = true;
  471. }
  472. }
  473. }
  474. }
  475. else if (key == etcd_prefix+"/config/pgs")
  476. {
  477. for (auto & pool_item: this->pool_config)
  478. {
  479. for (auto & pg_item: pool_item.second.pg_config)
  480. {
  481. pg_item.second.exists = false;
  482. }
  483. }
  484. for (auto & pool_item: value["items"].object_items())
  485. {
  486. pool_id_t pool_id;
  487. char null_byte = 0;
  488. sscanf(pool_item.first.c_str(), "%u%c", &pool_id, &null_byte);
  489. if (!pool_id || pool_id >= POOL_ID_MAX || null_byte != 0)
  490. {
  491. fprintf(stderr, "Pool ID %s is invalid in PG configuration (must be a number less than 0x%x), skipping pool\n", pool_item.first.c_str(), POOL_ID_MAX);
  492. continue;
  493. }
  494. for (auto & pg_item: pool_item.second.object_items())
  495. {
  496. pg_num_t pg_num = 0;
  497. sscanf(pg_item.first.c_str(), "%u%c", &pg_num, &null_byte);
  498. if (!pg_num || null_byte != 0)
  499. {
  500. fprintf(stderr, "Bad key in pool %u PG configuration: %s (must be a number), skipped\n", pool_id, pg_item.first.c_str());
  501. continue;
  502. }
  503. auto & parsed_cfg = this->pool_config[pool_id].pg_config[pg_num];
  504. parsed_cfg.exists = true;
  505. parsed_cfg.pause = pg_item.second["pause"].bool_value();
  506. parsed_cfg.primary = pg_item.second["primary"].uint64_value();
  507. parsed_cfg.target_set.clear();
  508. for (auto & pg_osd: pg_item.second["osd_set"].array_items())
  509. {
  510. parsed_cfg.target_set.push_back(pg_osd.uint64_value());
  511. }
  512. if (parsed_cfg.target_set.size() != pool_config[pool_id].pg_size)
  513. {
  514. fprintf(stderr, "Pool %u PG %u configuration is invalid: osd_set size %lu != pool pg_size %lu\n",
  515. pool_id, pg_num, parsed_cfg.target_set.size(), pool_config[pool_id].pg_size);
  516. parsed_cfg.pause = true;
  517. }
  518. }
  519. }
  520. for (auto & pool_item: this->pool_config)
  521. {
  522. int n = 0;
  523. for (auto pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); pg_it++)
  524. {
  525. if (pg_it->second.exists && pg_it->first != ++n)
  526. {
  527. fprintf(
  528. stderr, "Invalid pool %u PG configuration: PG numbers don't cover whole 1..%lu range\n",
  529. pool_item.second.id, pool_item.second.pg_config.size()
  530. );
  531. for (pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); pg_it++)
  532. {
  533. pg_it->second.exists = false;
  534. }
  535. n = 0;
  536. break;
  537. }
  538. }
  539. pool_item.second.real_pg_count = n;
  540. }
  541. }
  542. else if (key.substr(0, etcd_prefix.length()+12) == etcd_prefix+"/pg/history/")
  543. {
  544. // <etcd_prefix>/pg/history/%d/%d
  545. pool_id_t pool_id = 0;
  546. pg_num_t pg_num = 0;
  547. char null_byte = 0;
  548. sscanf(key.c_str() + etcd_prefix.length()+12, "%u/%u%c", &pool_id, &pg_num, &null_byte);
  549. if (!pool_id || pool_id >= POOL_ID_MAX || !pg_num || null_byte != 0)
  550. {
  551. fprintf(stderr, "Bad etcd key %s, ignoring\n", key.c_str());
  552. }
  553. else
  554. {
  555. auto & pg_cfg = this->pool_config[pool_id].pg_config[pg_num];
  556. pg_cfg.target_history.clear();
  557. pg_cfg.all_peers.clear();
  558. // Refuse to start PG if any set of the <osd_sets> has no live OSDs
  559. for (auto hist_item: value["osd_sets"].array_items())
  560. {
  561. std::vector<osd_num_t> history_set;
  562. for (auto pg_osd: hist_item.array_items())
  563. {
  564. history_set.push_back(pg_osd.uint64_value());
  565. }
  566. pg_cfg.target_history.push_back(history_set);
  567. }
  568. // Include these additional OSDs when peering the PG
  569. for (auto pg_osd: value["all_peers"].array_items())
  570. {
  571. pg_cfg.all_peers.push_back(pg_osd.uint64_value());
  572. }
  573. // Read epoch
  574. pg_cfg.epoch = value["epoch"].uint64_value();
  575. if (on_change_pg_history_hook != NULL)
  576. {
  577. on_change_pg_history_hook(pool_id, pg_num);
  578. }
  579. }
  580. }
  581. else if (key.substr(0, etcd_prefix.length()+10) == etcd_prefix+"/pg/state/")
  582. {
  583. // <etcd_prefix>/pg/state/%d/%d
  584. pool_id_t pool_id = 0;
  585. pg_num_t pg_num = 0;
  586. char null_byte = 0;
  587. sscanf(key.c_str() + etcd_prefix.length()+10, "%u/%u%c", &pool_id, &pg_num, &null_byte);
  588. if (!pool_id || pool_id >= POOL_ID_MAX || !pg_num || null_byte != 0)
  589. {
  590. fprintf(stderr, "Bad etcd key %s, ignoring\n", key.c_str());
  591. }
  592. else if (value.is_null())
  593. {
  594. this->pool_config[pool_id].pg_config[pg_num].cur_primary = 0;
  595. this->pool_config[pool_id].pg_config[pg_num].cur_state = 0;
  596. }
  597. else
  598. {
  599. osd_num_t cur_primary = value["primary"].uint64_value();
  600. int state = 0;
  601. for (auto & e: value["state"].array_items())
  602. {
  603. int i;
  604. for (i = 0; i < pg_state_bit_count; i++)
  605. {
  606. if (e.string_value() == pg_state_names[i])
  607. {
  608. state = state | pg_state_bits[i];
  609. break;
  610. }
  611. }
  612. if (i >= pg_state_bit_count)
  613. {
  614. fprintf(stderr, "Unexpected pool %u PG %u state keyword in etcd: %s\n", pool_id, pg_num, e.dump().c_str());
  615. return;
  616. }
  617. }
  618. if (!cur_primary || !value["state"].is_array() || !state ||
  619. (state & PG_OFFLINE) && state != PG_OFFLINE ||
  620. (state & PG_PEERING) && state != PG_PEERING ||
  621. (state & PG_INCOMPLETE) && state != PG_INCOMPLETE)
  622. {
  623. fprintf(stderr, "Unexpected pool %u PG %u state in etcd: primary=%lu, state=%s\n", pool_id, pg_num, cur_primary, value["state"].dump().c_str());
  624. return;
  625. }
  626. this->pool_config[pool_id].pg_config[pg_num].cur_primary = cur_primary;
  627. this->pool_config[pool_id].pg_config[pg_num].cur_state = state;
  628. }
  629. }
  630. else if (key.substr(0, etcd_prefix.length()+11) == etcd_prefix+"/osd/state/")
  631. {
  632. // <etcd_prefix>/osd/state/%d
  633. osd_num_t peer_osd = std::stoull(key.substr(etcd_prefix.length()+11));
  634. if (peer_osd > 0)
  635. {
  636. if (value.is_object() && value["state"] == "up" &&
  637. value["addresses"].is_array() &&
  638. value["port"].int64_value() > 0 && value["port"].int64_value() < 65536)
  639. {
  640. this->peer_states[peer_osd] = value;
  641. }
  642. else
  643. {
  644. this->peer_states.erase(peer_osd);
  645. }
  646. if (on_change_osd_state_hook != NULL)
  647. {
  648. on_change_osd_state_hook(peer_osd);
  649. }
  650. }
  651. }
  652. else if (key.substr(0, etcd_prefix.length()+14) == etcd_prefix+"/config/inode/")
  653. {
  654. // <etcd_prefix>/config/inode/%d/%d
  655. uint64_t pool_id = 0;
  656. uint64_t inode_num = 0;
  657. char null_byte = 0;
  658. sscanf(key.c_str() + etcd_prefix.length()+14, "%lu/%lu%c", &pool_id, &inode_num, &null_byte);
  659. if (!pool_id || pool_id >= POOL_ID_MAX || !inode_num || (inode_num >> (64-POOL_ID_BITS)) || null_byte != 0)
  660. {
  661. fprintf(stderr, "Bad etcd key %s, ignoring\n", key.c_str());
  662. }
  663. else
  664. {
  665. inode_num |= (pool_id << (64-POOL_ID_BITS));
  666. auto it = this->inode_config.find(inode_num);
  667. if (it != this->inode_config.end() && it->second.name != "")
  668. {
  669. auto n_it = this->inode_by_name.find(it->second.name);
  670. if (n_it->second == inode_num)
  671. {
  672. this->inode_by_name.erase(n_it);
  673. for (auto w: watches)
  674. {
  675. if (w->name == it->second.name)
  676. {
  677. w->cfg = { 0 };
  678. }
  679. }
  680. }
  681. }
  682. if (!value.is_object())
  683. {
  684. this->inode_config.erase(inode_num);
  685. }
  686. else
  687. {
  688. inode_t parent_inode_num = value["parent_id"].uint64_value();
  689. if (parent_inode_num && !(parent_inode_num >> (64-POOL_ID_BITS)))
  690. {
  691. uint64_t parent_pool_id = value["parent_pool"].uint64_value();
  692. if (!parent_pool_id)
  693. parent_inode_num |= pool_id << (64-POOL_ID_BITS);
  694. else if (parent_pool_id >= POOL_ID_MAX)
  695. {
  696. fprintf(
  697. stderr, "Inode %lu/%lu parent_pool value is invalid, ignoring parent setting\n",
  698. inode_num >> (64-POOL_ID_BITS), inode_num & ((1l << (64-POOL_ID_BITS)) - 1)
  699. );
  700. parent_inode_num = 0;
  701. }
  702. else
  703. parent_inode_num |= parent_pool_id << (64-POOL_ID_BITS);
  704. }
  705. inode_config_t cfg = (inode_config_t){
  706. .num = inode_num,
  707. .name = value["name"].string_value(),
  708. .size = value["size"].uint64_value(),
  709. .parent_id = parent_inode_num,
  710. .readonly = value["readonly"].bool_value(),
  711. .mod_revision = kv.mod_revision,
  712. };
  713. this->inode_config[inode_num] = cfg;
  714. if (cfg.name != "")
  715. {
  716. this->inode_by_name[cfg.name] = inode_num;
  717. for (auto w: watches)
  718. {
  719. if (w->name == value["name"].string_value())
  720. {
  721. w->cfg = cfg;
  722. }
  723. }
  724. }
  725. }
  726. }
  727. }
  728. }
  729. inode_watch_t* etcd_state_client_t::watch_inode(std::string name)
  730. {
  731. inode_watch_t *watch = new inode_watch_t;
  732. watch->name = name;
  733. watches.push_back(watch);
  734. auto it = inode_by_name.find(name);
  735. if (it != inode_by_name.end())
  736. {
  737. watch->cfg = inode_config[it->second];
  738. }
  739. return watch;
  740. }
  741. void etcd_state_client_t::close_watch(inode_watch_t* watch)
  742. {
  743. for (int i = 0; i < watches.size(); i++)
  744. {
  745. if (watches[i] == watch)
  746. {
  747. watches.erase(watches.begin()+i, watches.begin()+i+1);
  748. break;
  749. }
  750. }
  751. delete watch;
  752. }