Simplified distributed block storage with strong consistency, like in Ceph
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

601 lines
23 KiB

  1. // Copyright (c) Vitaliy Filippov, 2019+
  2. // License: VNPL-1.0 or GNU GPL-2.0+ (see README.md for details)
  3. #include "osd_ops.h"
  4. #include "pg_states.h"
  5. #include "etcd_state_client.h"
  6. #include "http_client.h"
  7. #include "base64.h"
  8. json_kv_t etcd_state_client_t::parse_etcd_kv(const json11::Json & kv_json)
  9. {
  10. json_kv_t kv;
  11. kv.key = base64_decode(kv_json["key"].string_value());
  12. std::string json_err, json_text = base64_decode(kv_json["value"].string_value());
  13. kv.value = json_text == "" ? json11::Json() : json11::Json::parse(json_text, json_err);
  14. if (json_err != "")
  15. {
  16. printf("Bad JSON in etcd key %s: %s (value: %s)\n", kv.key.c_str(), json_err.c_str(), json_text.c_str());
  17. kv.key = "";
  18. }
  19. return kv;
  20. }
  21. void etcd_state_client_t::etcd_txn(json11::Json txn, int timeout, std::function<void(std::string, json11::Json)> callback)
  22. {
  23. etcd_call("/kv/txn", txn, timeout, callback);
  24. }
  25. void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int timeout, std::function<void(std::string, json11::Json)> callback)
  26. {
  27. std::string etcd_address = etcd_addresses[rand() % etcd_addresses.size()];
  28. std::string etcd_api_path;
  29. int pos = etcd_address.find('/');
  30. if (pos >= 0)
  31. {
  32. etcd_api_path = etcd_address.substr(pos);
  33. etcd_address = etcd_address.substr(0, pos);
  34. }
  35. std::string req = payload.dump();
  36. req = "POST "+etcd_api_path+api+" HTTP/1.1\r\n"
  37. "Host: "+etcd_address+"\r\n"
  38. "Content-Type: application/json\r\n"
  39. "Content-Length: "+std::to_string(req.size())+"\r\n"
  40. "Connection: close\r\n"
  41. "\r\n"+req;
  42. http_request_json(tfd, etcd_address, req, timeout, callback);
  43. }
  44. void etcd_state_client_t::parse_config(json11::Json & config)
  45. {
  46. this->etcd_addresses.clear();
  47. if (config["etcd_address"].is_string())
  48. {
  49. std::string ea = config["etcd_address"].string_value();
  50. while (1)
  51. {
  52. int pos = ea.find(',');
  53. std::string addr = pos >= 0 ? ea.substr(0, pos) : ea;
  54. if (addr.length() > 0)
  55. {
  56. if (addr.find('/') < 0)
  57. addr += "/v3";
  58. this->etcd_addresses.push_back(addr);
  59. }
  60. if (pos >= 0)
  61. ea = ea.substr(pos+1);
  62. else
  63. break;
  64. }
  65. }
  66. else if (config["etcd_address"].array_items().size())
  67. {
  68. for (auto & ea: config["etcd_address"].array_items())
  69. {
  70. std::string addr = ea.string_value();
  71. if (addr != "")
  72. {
  73. if (addr.find('/') < 0)
  74. addr += "/v3";
  75. this->etcd_addresses.push_back(addr);
  76. }
  77. }
  78. }
  79. this->etcd_prefix = config["etcd_prefix"].string_value();
  80. if (this->etcd_prefix == "")
  81. {
  82. this->etcd_prefix = "/vitastor";
  83. }
  84. else if (this->etcd_prefix[0] != '/')
  85. {
  86. this->etcd_prefix = "/"+this->etcd_prefix;
  87. }
  88. this->log_level = config["log_level"].int64_value();
  89. }
  90. void etcd_state_client_t::start_etcd_watcher()
  91. {
  92. std::string etcd_address = etcd_addresses[rand() % etcd_addresses.size()];
  93. std::string etcd_api_path;
  94. int pos = etcd_address.find('/');
  95. if (pos >= 0)
  96. {
  97. etcd_api_path = etcd_address.substr(pos);
  98. etcd_address = etcd_address.substr(0, pos);
  99. }
  100. etcd_watches_initialised = 0;
  101. etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", ETCD_SLOW_TIMEOUT, [this](const http_response_t *msg)
  102. {
  103. if (msg->body.length())
  104. {
  105. std::string json_err;
  106. json11::Json data = json11::Json::parse(msg->body, json_err);
  107. if (json_err != "")
  108. {
  109. printf("Bad JSON in etcd event: %s, ignoring event\n", json_err.c_str());
  110. }
  111. else
  112. {
  113. if (data["result"]["created"].bool_value())
  114. {
  115. etcd_watches_initialised++;
  116. }
  117. if (etcd_watches_initialised == 4)
  118. {
  119. etcd_watch_revision = data["result"]["header"]["revision"].uint64_value();
  120. }
  121. // First gather all changes into a hash to remove multiple overwrites
  122. json11::Json::object changes;
  123. for (auto & ev: data["result"]["events"].array_items())
  124. {
  125. auto kv = parse_etcd_kv(ev["kv"]);
  126. if (kv.key != "")
  127. {
  128. changes[kv.key] = kv.value;
  129. }
  130. }
  131. for (auto & kv: changes)
  132. {
  133. if (this->log_level > 3)
  134. {
  135. printf("Incoming event: %s -> %s\n", kv.first.c_str(), kv.second.dump().c_str());
  136. }
  137. parse_state(kv.first, kv.second);
  138. }
  139. // React to changes
  140. if (on_change_hook != NULL)
  141. {
  142. on_change_hook(changes);
  143. }
  144. }
  145. }
  146. if (msg->eof)
  147. {
  148. etcd_watch_ws = NULL;
  149. if (etcd_watches_initialised == 0)
  150. {
  151. // Connection not established, retry in <ETCD_SLOW_TIMEOUT>
  152. tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int)
  153. {
  154. start_etcd_watcher();
  155. });
  156. }
  157. else
  158. {
  159. // Connection was live, retry immediately
  160. start_etcd_watcher();
  161. }
  162. }
  163. });
  164. etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
  165. { "create_request", json11::Json::object {
  166. { "key", base64_encode(etcd_prefix+"/config/") },
  167. { "range_end", base64_encode(etcd_prefix+"/config0") },
  168. { "start_revision", etcd_watch_revision+1 },
  169. { "watch_id", ETCD_CONFIG_WATCH_ID },
  170. { "progress_notify", true },
  171. } }
  172. }).dump());
  173. etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
  174. { "create_request", json11::Json::object {
  175. { "key", base64_encode(etcd_prefix+"/osd/state/") },
  176. { "range_end", base64_encode(etcd_prefix+"/osd/state0") },
  177. { "start_revision", etcd_watch_revision+1 },
  178. { "watch_id", ETCD_OSD_STATE_WATCH_ID },
  179. { "progress_notify", true },
  180. } }
  181. }).dump());
  182. etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
  183. { "create_request", json11::Json::object {
  184. { "key", base64_encode(etcd_prefix+"/pg/state/") },
  185. { "range_end", base64_encode(etcd_prefix+"/pg/state0") },
  186. { "start_revision", etcd_watch_revision+1 },
  187. { "watch_id", ETCD_PG_STATE_WATCH_ID },
  188. { "progress_notify", true },
  189. } }
  190. }).dump());
  191. etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
  192. { "create_request", json11::Json::object {
  193. { "key", base64_encode(etcd_prefix+"/pg/history/") },
  194. { "range_end", base64_encode(etcd_prefix+"/pg/history0") },
  195. { "start_revision", etcd_watch_revision+1 },
  196. { "watch_id", ETCD_PG_HISTORY_WATCH_ID },
  197. { "progress_notify", true },
  198. } }
  199. }).dump());
  200. }
  201. void etcd_state_client_t::load_global_config()
  202. {
  203. etcd_call("/kv/range", json11::Json::object {
  204. { "key", base64_encode(etcd_prefix+"/config/global") }
  205. }, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data)
  206. {
  207. if (err != "")
  208. {
  209. printf("Error reading OSD configuration from etcd: %s\n", err.c_str());
  210. tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id)
  211. {
  212. load_global_config();
  213. });
  214. return;
  215. }
  216. json11::Json::object global_config;
  217. if (data["kvs"].array_items().size() > 0)
  218. {
  219. auto kv = parse_etcd_kv(data["kvs"][0]);
  220. if (kv.value.is_object())
  221. {
  222. global_config = kv.value.object_items();
  223. }
  224. }
  225. bs_block_size = global_config["block_size"].uint64_value();
  226. if (!bs_block_size)
  227. {
  228. bs_block_size = DEFAULT_BLOCK_SIZE;
  229. }
  230. on_load_config_hook(global_config);
  231. });
  232. }
  233. void etcd_state_client_t::load_pgs()
  234. {
  235. json11::Json::array txn = {
  236. json11::Json::object {
  237. { "request_range", json11::Json::object {
  238. { "key", base64_encode(etcd_prefix+"/config/pools") },
  239. } }
  240. },
  241. json11::Json::object {
  242. { "request_range", json11::Json::object {
  243. { "key", base64_encode(etcd_prefix+"/config/pgs") },
  244. } }
  245. },
  246. json11::Json::object {
  247. { "request_range", json11::Json::object {
  248. { "key", base64_encode(etcd_prefix+"/pg/history/") },
  249. { "range_end", base64_encode(etcd_prefix+"/pg/history0") },
  250. } }
  251. },
  252. json11::Json::object {
  253. { "request_range", json11::Json::object {
  254. { "key", base64_encode(etcd_prefix+"/pg/state/") },
  255. { "range_end", base64_encode(etcd_prefix+"/pg/state0") },
  256. } }
  257. },
  258. json11::Json::object {
  259. { "request_range", json11::Json::object {
  260. { "key", base64_encode(etcd_prefix+"/osd/state/") },
  261. { "range_end", base64_encode(etcd_prefix+"/osd/state0") },
  262. } }
  263. },
  264. };
  265. json11::Json::object req = { { "success", txn } };
  266. json11::Json checks = load_pgs_checks_hook != NULL ? load_pgs_checks_hook() : json11::Json();
  267. if (checks.array_items().size() > 0)
  268. {
  269. req["compare"] = checks;
  270. }
  271. etcd_txn(req, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data)
  272. {
  273. if (err != "")
  274. {
  275. printf("Error loading PGs from etcd: %s\n", err.c_str());
  276. tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id)
  277. {
  278. load_pgs();
  279. });
  280. return;
  281. }
  282. if (!data["succeeded"].bool_value())
  283. {
  284. on_load_pgs_hook(false);
  285. return;
  286. }
  287. if (!etcd_watch_revision)
  288. {
  289. etcd_watch_revision = data["header"]["revision"].uint64_value();
  290. }
  291. for (auto & res: data["responses"].array_items())
  292. {
  293. for (auto & kv_json: res["response_range"]["kvs"].array_items())
  294. {
  295. auto kv = parse_etcd_kv(kv_json);
  296. parse_state(kv.key, kv.value);
  297. }
  298. }
  299. on_load_pgs_hook(true);
  300. start_etcd_watcher();
  301. });
  302. }
  303. void etcd_state_client_t::parse_state(const std::string & key, const json11::Json & value)
  304. {
  305. if (key == etcd_prefix+"/config/pools")
  306. {
  307. for (auto & pool_item: this->pool_config)
  308. {
  309. pool_item.second.exists = false;
  310. }
  311. for (auto & pool_item: value.object_items())
  312. {
  313. pool_config_t pc;
  314. // ID
  315. pool_id_t pool_id = stoull_full(pool_item.first);
  316. if (!pool_id || pool_id >= POOL_ID_MAX)
  317. {
  318. printf("Pool ID %s is invalid (must be a number less than 0x%x), skipping pool\n", pool_item.first.c_str(), POOL_ID_MAX);
  319. continue;
  320. }
  321. pc.id = pool_id;
  322. // Pool Name
  323. pc.name = pool_item.second["name"].string_value();
  324. if (pc.name == "")
  325. {
  326. printf("Pool %u has empty name, skipping pool\n", pool_id);
  327. continue;
  328. }
  329. // Failure Domain
  330. pc.failure_domain = pool_item.second["failure_domain"].string_value();
  331. // Coding Scheme
  332. if (pool_item.second["scheme"] == "replicated")
  333. pc.scheme = POOL_SCHEME_REPLICATED;
  334. else if (pool_item.second["scheme"] == "xor")
  335. pc.scheme = POOL_SCHEME_XOR;
  336. else if (pool_item.second["scheme"] == "jerasure")
  337. pc.scheme = POOL_SCHEME_JERASURE;
  338. else
  339. {
  340. printf("Pool %u has invalid coding scheme (one of \"xor\", \"replicated\" or \"jerasure\" required), skipping pool\n", pool_id);
  341. continue;
  342. }
  343. // PG Size
  344. pc.pg_size = pool_item.second["pg_size"].uint64_value();
  345. if (pc.pg_size < 1 ||
  346. pool_item.second["pg_size"].uint64_value() < 3 &&
  347. (pc.scheme == POOL_SCHEME_XOR || pc.scheme == POOL_SCHEME_JERASURE) ||
  348. pool_item.second["pg_size"].uint64_value() > 256)
  349. {
  350. printf("Pool %u has invalid pg_size, skipping pool\n", pool_id);
  351. continue;
  352. }
  353. // Parity Chunks
  354. pc.parity_chunks = pool_item.second["parity_chunks"].uint64_value();
  355. if (pc.scheme == POOL_SCHEME_XOR)
  356. {
  357. if (pc.parity_chunks > 1)
  358. {
  359. printf("Pool %u has invalid parity_chunks (must be 1), skipping pool\n", pool_id);
  360. continue;
  361. }
  362. pc.parity_chunks = 1;
  363. }
  364. if (pc.scheme == POOL_SCHEME_JERASURE &&
  365. (pc.parity_chunks < 1 || pc.parity_chunks > pc.pg_size-2))
  366. {
  367. printf("Pool %u has invalid parity_chunks (must be between 1 and pg_size-2), skipping pool\n", pool_id);
  368. continue;
  369. }
  370. // PG MinSize
  371. pc.pg_minsize = pool_item.second["pg_minsize"].uint64_value();
  372. if (pc.pg_minsize < 1 || pc.pg_minsize > pc.pg_size ||
  373. (pc.scheme == POOL_SCHEME_XOR || pc.scheme == POOL_SCHEME_JERASURE) &&
  374. pc.pg_minsize < (pc.pg_size-pc.parity_chunks))
  375. {
  376. printf("Pool %u has invalid pg_minsize, skipping pool\n", pool_id);
  377. continue;
  378. }
  379. // PG Count
  380. pc.pg_count = pool_item.second["pg_count"].uint64_value();
  381. if (pc.pg_count < 1)
  382. {
  383. printf("Pool %u has invalid pg_count, skipping pool\n", pool_id);
  384. continue;
  385. }
  386. // Max OSD Combinations
  387. pc.max_osd_combinations = pool_item.second["max_osd_combinations"].uint64_value();
  388. if (!pc.max_osd_combinations)
  389. pc.max_osd_combinations = 10000;
  390. if (pc.max_osd_combinations > 0 && pc.max_osd_combinations < 100)
  391. {
  392. printf("Pool %u has invalid max_osd_combinations (must be at least 100), skipping pool\n", pool_id);
  393. continue;
  394. }
  395. // PG Stripe Size
  396. pc.pg_stripe_size = pool_item.second["pg_stripe_size"].uint64_value();
  397. uint64_t min_stripe_size = bs_block_size * (pc.scheme == POOL_SCHEME_REPLICATED ? 1 : (pc.pg_size-pc.parity_chunks));
  398. if (pc.pg_stripe_size < min_stripe_size)
  399. pc.pg_stripe_size = min_stripe_size;
  400. // Save
  401. pc.real_pg_count = this->pool_config[pool_id].real_pg_count;
  402. std::swap(pc.pg_config, this->pool_config[pool_id].pg_config);
  403. std::swap(this->pool_config[pool_id], pc);
  404. auto & parsed_cfg = this->pool_config[pool_id];
  405. parsed_cfg.exists = true;
  406. for (auto & pg_item: parsed_cfg.pg_config)
  407. {
  408. if (pg_item.second.target_set.size() != parsed_cfg.pg_size)
  409. {
  410. printf("Pool %u PG %u configuration is invalid: osd_set size %lu != pool pg_size %lu\n",
  411. pool_id, pg_item.first, pg_item.second.target_set.size(), parsed_cfg.pg_size);
  412. pg_item.second.pause = true;
  413. }
  414. }
  415. }
  416. }
  417. else if (key == etcd_prefix+"/config/pgs")
  418. {
  419. for (auto & pool_item: this->pool_config)
  420. {
  421. for (auto & pg_item: pool_item.second.pg_config)
  422. {
  423. pg_item.second.exists = false;
  424. }
  425. }
  426. for (auto & pool_item: value["items"].object_items())
  427. {
  428. pool_id_t pool_id = stoull_full(pool_item.first);
  429. if (!pool_id || pool_id >= POOL_ID_MAX)
  430. {
  431. printf("Pool ID %s is invalid in PG configuration (must be a number less than 0x%x), skipping pool\n", pool_item.first.c_str(), POOL_ID_MAX);
  432. continue;
  433. }
  434. for (auto & pg_item: pool_item.second.object_items())
  435. {
  436. pg_num_t pg_num = stoull_full(pg_item.first);
  437. if (!pg_num)
  438. {
  439. printf("Bad key in pool %u PG configuration: %s (must be a number), skipped\n", pool_id, pg_item.first.c_str());
  440. continue;
  441. }
  442. auto & parsed_cfg = this->pool_config[pool_id].pg_config[pg_num];
  443. parsed_cfg.exists = true;
  444. parsed_cfg.pause = pg_item.second["pause"].bool_value();
  445. parsed_cfg.primary = pg_item.second["primary"].uint64_value();
  446. parsed_cfg.target_set.clear();
  447. for (auto & pg_osd: pg_item.second["osd_set"].array_items())
  448. {
  449. parsed_cfg.target_set.push_back(pg_osd.uint64_value());
  450. }
  451. if (parsed_cfg.target_set.size() != pool_config[pool_id].pg_size)
  452. {
  453. printf("Pool %u PG %u configuration is invalid: osd_set size %lu != pool pg_size %lu\n",
  454. pool_id, pg_num, parsed_cfg.target_set.size(), pool_config[pool_id].pg_size);
  455. parsed_cfg.pause = true;
  456. }
  457. }
  458. }
  459. for (auto & pool_item: this->pool_config)
  460. {
  461. int n = 0;
  462. for (auto pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); pg_it++)
  463. {
  464. if (pg_it->second.exists && pg_it->first != ++n)
  465. {
  466. printf(
  467. "Invalid pool %u PG configuration: PG numbers don't cover whole 1..%lu range\n",
  468. pool_item.second.id, pool_item.second.pg_config.size()
  469. );
  470. for (pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); pg_it++)
  471. {
  472. pg_it->second.exists = false;
  473. }
  474. n = 0;
  475. break;
  476. }
  477. }
  478. pool_item.second.real_pg_count = n;
  479. }
  480. }
  481. else if (key.substr(0, etcd_prefix.length()+12) == etcd_prefix+"/pg/history/")
  482. {
  483. // <etcd_prefix>/pg/history/%d/%d
  484. pool_id_t pool_id = 0;
  485. pg_num_t pg_num = 0;
  486. char null_byte = 0;
  487. sscanf(key.c_str() + etcd_prefix.length()+12, "%u/%u%c", &pool_id, &pg_num, &null_byte);
  488. if (!pool_id || pool_id >= POOL_ID_MAX || !pg_num || null_byte != 0)
  489. {
  490. printf("Bad etcd key %s, ignoring\n", key.c_str());
  491. }
  492. else
  493. {
  494. auto & pg_cfg = this->pool_config[pool_id].pg_config[pg_num];
  495. pg_cfg.target_history.clear();
  496. pg_cfg.all_peers.clear();
  497. // Refuse to start PG if any set of the <osd_sets> has no live OSDs
  498. for (auto hist_item: value["osd_sets"].array_items())
  499. {
  500. std::vector<osd_num_t> history_set;
  501. for (auto pg_osd: hist_item.array_items())
  502. {
  503. history_set.push_back(pg_osd.uint64_value());
  504. }
  505. pg_cfg.target_history.push_back(history_set);
  506. }
  507. // Include these additional OSDs when peering the PG
  508. for (auto pg_osd: value["all_peers"].array_items())
  509. {
  510. pg_cfg.all_peers.push_back(pg_osd.uint64_value());
  511. }
  512. // Read epoch
  513. pg_cfg.epoch = value["epoch"].uint64_value();
  514. if (on_change_pg_history_hook != NULL)
  515. {
  516. on_change_pg_history_hook(pool_id, pg_num);
  517. }
  518. }
  519. }
  520. else if (key.substr(0, etcd_prefix.length()+10) == etcd_prefix+"/pg/state/")
  521. {
  522. // <etcd_prefix>/pg/state/%d/%d
  523. pool_id_t pool_id = 0;
  524. pg_num_t pg_num = 0;
  525. char null_byte = 0;
  526. sscanf(key.c_str() + etcd_prefix.length()+10, "%u/%u%c", &pool_id, &pg_num, &null_byte);
  527. if (!pool_id || pool_id >= POOL_ID_MAX || !pg_num || null_byte != 0)
  528. {
  529. printf("Bad etcd key %s, ignoring\n", key.c_str());
  530. }
  531. else if (value.is_null())
  532. {
  533. this->pool_config[pool_id].pg_config[pg_num].cur_primary = 0;
  534. this->pool_config[pool_id].pg_config[pg_num].cur_state = 0;
  535. }
  536. else
  537. {
  538. osd_num_t cur_primary = value["primary"].uint64_value();
  539. int state = 0;
  540. for (auto & e: value["state"].array_items())
  541. {
  542. int i;
  543. for (i = 0; i < pg_state_bit_count; i++)
  544. {
  545. if (e.string_value() == pg_state_names[i])
  546. {
  547. state = state | pg_state_bits[i];
  548. break;
  549. }
  550. }
  551. if (i >= pg_state_bit_count)
  552. {
  553. printf("Unexpected pool %u PG %u state keyword in etcd: %s\n", pool_id, pg_num, e.dump().c_str());
  554. return;
  555. }
  556. }
  557. if (!cur_primary || !value["state"].is_array() || !state ||
  558. (state & PG_OFFLINE) && state != PG_OFFLINE ||
  559. (state & PG_PEERING) && state != PG_PEERING ||
  560. (state & PG_INCOMPLETE) && state != PG_INCOMPLETE)
  561. {
  562. printf("Unexpected pool %u PG %u state in etcd: primary=%lu, state=%s\n", pool_id, pg_num, cur_primary, value["state"].dump().c_str());
  563. return;
  564. }
  565. this->pool_config[pool_id].pg_config[pg_num].cur_primary = cur_primary;
  566. this->pool_config[pool_id].pg_config[pg_num].cur_state = state;
  567. }
  568. }
  569. else if (key.substr(0, etcd_prefix.length()+11) == etcd_prefix+"/osd/state/")
  570. {
  571. // <etcd_prefix>/osd/state/%d
  572. osd_num_t peer_osd = std::stoull(key.substr(etcd_prefix.length()+11));
  573. if (peer_osd > 0)
  574. {
  575. if (value.is_object() && value["state"] == "up" &&
  576. value["addresses"].is_array() &&
  577. value["port"].int64_value() > 0 && value["port"].int64_value() < 65536)
  578. {
  579. this->peer_states[peer_osd] = value;
  580. }
  581. else
  582. {
  583. this->peer_states.erase(peer_osd);
  584. }
  585. if (on_change_osd_state_hook != NULL)
  586. {
  587. on_change_osd_state_hook(peer_osd);
  588. }
  589. }
  590. }
  591. }