From fa98be6bc085c47635f1b9aa1ef39ff1456fd413 Mon Sep 17 00:00:00 2001
From: Vitaliy Filippov
Date: Mon, 25 May 2020 15:53:48 +0300
Subject: [PATCH] Allow to specify multiple etcd addresses

---
Review notes (everything between the "---" line and the first "diff --git"
is ignored when the patch is applied):

* osd.cpp, osd_t::parse_config(): the default "/v3" API path is appended when
  an address contains no '/'. The check compares std::string::find() against
  std::string::npos; find() returns an unsigned size_t, so a "< 0" comparison
  can never be true and the default path would never be added. The osd.cpp
  post-image blob id on the "index" line predates this one-line correction.
* etcd_state_client.cpp: "int pos = etcd_address.find('/'); if (pos >= 0)"
  depends on npos narrowing to a negative int, and
  "rand() % etcd_addresses.size()" is undefined for an empty list. Visible
  callers check the list in init_cluster()/report_pg_states() first.
* NOTE(review): line breaks, indentation, blank context lines and the
  template arguments (std::vector<std::string>,
  std::function<void(std::string, json11::Json)>) were reconstructed from a
  flattened copy of this patch. Confirm against the tree, or apply with
  --ignore-whitespace.

 etcd_state_client.cpp | 26 ++++++++++++++++++-----
 etcd_state_client.h   |  4 ++--
 osd.cpp               | 40 +++++++++++++++++-------------------
 osd.h                 |  2 --
 osd_cluster.cpp       | 48 ++++++++++++++++++++++++-------------------
 osd_ops.h             |  2 +-
 6 files changed, 70 insertions(+), 52 deletions(-)

diff --git a/etcd_state_client.cpp b/etcd_state_client.cpp
index f857e5d8..ee61463d 100644
--- a/etcd_state_client.cpp
+++ b/etcd_state_client.cpp
@@ -18,8 +18,21 @@ json_kv_t etcd_state_client_t::parse_etcd_kv(const json11::Json & kv_json)
     return kv;
 }
 
+void etcd_state_client_t::etcd_txn(json11::Json txn, int timeout, std::function<void(std::string, json11::Json)> callback)
+{
+    etcd_call("/kv/txn", txn, timeout, callback);
+}
+
 void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int timeout, std::function<void(std::string, json11::Json)> callback)
 {
+    std::string etcd_address = etcd_addresses[rand() % etcd_addresses.size()];
+    std::string etcd_api_path;
+    int pos = etcd_address.find('/');
+    if (pos >= 0)
+    {
+        etcd_api_path = etcd_address.substr(pos);
+        etcd_address = etcd_address.substr(0, pos);
+    }
     std::string req = payload.dump();
     req = "POST "+etcd_api_path+api+" HTTP/1.1\r\n"
         "Host: "+etcd_address+"\r\n"
@@ -30,13 +43,16 @@ void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int t
     http_request_json(tfd, etcd_address, req, timeout, callback);
 }
 
-void etcd_state_client_t::etcd_txn(json11::Json txn, int timeout, std::function<void(std::string, json11::Json)> callback)
-{
-    etcd_call("/kv/txn", txn, timeout, callback);
-}
-
 void etcd_state_client_t::start_etcd_watcher()
 {
+    std::string etcd_address = etcd_addresses[rand() % etcd_addresses.size()];
+    std::string etcd_api_path;
+    int pos = etcd_address.find('/');
+    if (pos >= 0)
+    {
+        etcd_api_path = etcd_address.substr(pos);
+        etcd_address = etcd_address.substr(0, pos);
+    }
     etcd_watches_initialised = 0;
     etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", ETCD_SLOW_TIMEOUT, [this](const http_response_t *msg)
     {
diff --git a/etcd_state_client.h b/etcd_state_client.h
index 96a0b019..e4a5439a 100644
--- a/etcd_state_client.h
+++ b/etcd_state_client.h
@@ -32,8 +32,8 @@ struct json_kv_t
 
 struct etcd_state_client_t
 {
-    // FIXME Allow multiple etcd addresses and select random address
-    std::string etcd_address, etcd_prefix, etcd_api_path;
+    std::vector<std::string> etcd_addresses;
+    std::string etcd_prefix;
     int log_level = 0;
     timerfd_manager_t *tfd = NULL;
 
diff --git a/osd.cpp b/osd.cpp
index 95675360..4c8e30c0 100644
--- a/osd.cpp
+++ b/osd.cpp
@@ -89,29 +89,27 @@ void osd_t::parse_config(blockstore_config_t & config)
 {
     int pos;
     // Initial startup configuration
-    etcd_address = config["etcd_address"];
-    etcd_prefix = config["etcd_prefix"];
-    if (etcd_prefix == "")
-        etcd_prefix = "/microceph";
-    if ((pos = etcd_address.find('/')) >= 0)
     {
-        etcd_api_path = etcd_address.substr(pos);
-        etcd_address = etcd_address.substr(0, pos);
+        std::string ea = config["etcd_address"];
+        while (1)
+        {
+            pos = ea.find(',');
+            std::string addr = pos >= 0 ? ea.substr(0, pos) : ea;
+            if (addr.length() > 0)
+            {
+                if (addr.find('/') == std::string::npos)
+                    addr += "/v3";
+                st_cli.etcd_addresses.push_back(addr);
+            }
+            if (pos >= 0)
+                ea = ea.substr(pos+1);
+            else
+                break;
+        }
     }
-    else if (config.find("etcd_version") != config.end())
-    {
-        int major, minor;
-        if (sscanf(config["etcd_version"].c_str(), "%d.%d", &major, &minor) < 2)
-            throw std::runtime_error("etcd_version should be in the form MAJOR.MINOR (for example, 3.2)");
-        if (major < 3 || major == 3 && minor < 3)
-            throw std::runtime_error("Your etcd is too old, minimum required version is 3.3");
-        else if (major == 3 && minor == 3)
-            etcd_api_path = "/v3beta";
-        else
-            etcd_api_path = "/v3";
-    }
-    else
-        etcd_api_path = "/v3";
+    st_cli.etcd_prefix = config["etcd_prefix"];
+    if (st_cli.etcd_prefix == "")
+        st_cli.etcd_prefix = "/microceph";
     etcd_report_interval = strtoull(config["etcd_report_interval"].c_str(), NULL, 10);
     if (etcd_report_interval <= 0)
         etcd_report_interval = 30;
diff --git a/osd.h b/osd.h
index 9eb4dcb3..8338d1cc 100644
--- a/osd.h
+++ b/osd.h
@@ -189,8 +189,6 @@ class osd_t
     // config
 
     blockstore_config_t config;
-    // FIXME Allow multiple etcd addresses and select random address
-    std::string etcd_address, etcd_prefix, etcd_api_path;
     int etcd_report_interval = 30;
 
     bool readonly = false;
diff --git a/osd_cluster.cpp b/osd_cluster.cpp
index c9c4f131..267add65 100644
--- a/osd_cluster.cpp
+++ b/osd_cluster.cpp
@@ -10,7 +10,7 @@
 // Peer connection is lost -> Reload connection data -> Try to reconnect
 void osd_t::init_cluster()
 {
-    if (etcd_address == "")
+    if (!st_cli.etcd_addresses.size())
     {
         if (run_primary)
         {
@@ -43,9 +43,6 @@ void osd_t::init_cluster()
     else
     {
         st_cli.tfd = tfd;
-        st_cli.etcd_address = etcd_address;
-        st_cli.etcd_prefix = etcd_prefix;
-        st_cli.etcd_api_path = etcd_api_path;
         st_cli.log_level = log_level;
         st_cli.on_change_hook = [this](json11::Json::object & changes) { on_change_etcd_state_hook(changes); };
         st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };
@@ -164,7 +161,7 @@ void osd_t::report_statistics()
     etcd_reporting_stats = true;
     json11::Json::array txn = { json11::Json::object {
         { "request_put", json11::Json::object {
-            { "key", base64_encode(etcd_prefix+"/osd/stats/"+std::to_string(osd_num)) },
+            { "key", base64_encode(st_cli.etcd_prefix+"/osd/stats/"+std::to_string(osd_num)) },
             { "value", base64_encode(get_statistics().dump()) },
         } }
     } };
@@ -185,7 +182,7 @@ void osd_t::report_statistics()
             pg_stats["write_osd_set"] = pg.cur_set;
             txn.push_back(json11::Json::object {
                 { "request_put", json11::Json::object {
-                    { "key", base64_encode(etcd_prefix+"/pg/stats/"+std::to_string(pg.pg_num)) },
+                    { "key", base64_encode(st_cli.etcd_prefix+"/pg/stats/"+std::to_string(pg.pg_num)) },
                     { "value", base64_encode(json11::Json(pg_stats).dump()) },
                 } }
             });
@@ -240,12 +237,12 @@ void osd_t::acquire_lease()
     // Maximum lease TTL is (report interval) + retries * (timeout + repeat interval)
     st_cli.etcd_call("/lease/grant", json11::Json::object {
         { "TTL", etcd_report_interval+(MAX_ETCD_ATTEMPTS*(2*ETCD_QUICK_TIMEOUT)+999)/1000 }
-    }, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data)
+    }, ETCD_QUICK_TIMEOUT, [this](std::string err, json11::Json data)
     {
         if (err != "" || data["ID"].string_value() == "")
         {
             printf("Error acquiring a lease from etcd: %s\n", err.c_str());
-            tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id)
+            tfd->set_timer(ETCD_QUICK_TIMEOUT, false, [this](int timer_id)
             {
                 acquire_lease();
             });
@@ -254,7 +251,7 @@ void osd_t::acquire_lease()
         etcd_lease_id = data["ID"].string_value();
         create_osd_state();
     });
-    printf("[OSD %lu] reporting to etcd at %s every %d seconds\n", this->osd_num, etcd_address.c_str(), etcd_report_interval);
+    printf("[OSD %lu] reporting to etcd at %s every %d seconds\n", this->osd_num, config["etcd_address"].c_str(), etcd_report_interval);
     tfd->set_timer(etcd_report_interval*1000, true, [this](int timer_id)
     {
         renew_lease();
@@ -265,7 +262,7 @@ void osd_t::acquire_lease()
 // Do it first to allow "monitors" check it when moving PGs
 void osd_t::create_osd_state()
 {
-    std::string state_key = base64_encode(etcd_prefix+"/osd/state/"+std::to_string(osd_num));
+    std::string state_key = base64_encode(st_cli.etcd_prefix+"/osd/state/"+std::to_string(osd_num));
     self_state = get_osd_state();
     st_cli.etcd_txn(json11::Json::object {
         // Check that the state key does not exist
@@ -296,9 +293,18 @@ void osd_t::create_osd_state()
     {
         if (err != "")
         {
-            // FIXME Retry? But etcd should be already UP because we've just got the lease
-            printf("Error reporting OSD state to etcd: %s\n", err.c_str());
-            force_stop(1);
+            etcd_failed_attempts++;
+            printf("Error creating OSD state key: %s\n", err.c_str());
+            if (etcd_failed_attempts > MAX_ETCD_ATTEMPTS)
+            {
+                // Die
+                throw std::runtime_error("Cluster connection failed");
+            }
+            // Retry
+            tfd->set_timer(ETCD_QUICK_TIMEOUT, false, [this](int timer_id)
+            {
+                create_osd_state();
+            });
             return;
         }
         if (!data["succeeded"].bool_value())
@@ -386,7 +392,7 @@ json11::Json osd_t::on_load_pgs_checks_hook()
         json11::Json::object {
             { "target", "LEASE" },
             { "lease", etcd_lease_id },
-            { "key", base64_encode(etcd_prefix+"/osd/state/"+std::to_string(osd_num)) },
+            { "key", base64_encode(st_cli.etcd_prefix+"/osd/state/"+std::to_string(osd_num)) },
         }
     };
     return checks;
@@ -564,7 +570,7 @@ void osd_t::apply_pg_config()
 
 void osd_t::report_pg_states()
 {
-    if (etcd_reporting_pg_state || !this->pg_state_dirty.size() || !etcd_address.length())
+    if (etcd_reporting_pg_state || !this->pg_state_dirty.size() || !st_cli.etcd_addresses.size())
     {
         return;
     }
@@ -582,7 +588,7 @@ void osd_t::report_pg_states()
         }
         auto & pg = pg_it->second;
         reporting_pgs.push_back({ pg.pg_num, pg.history_changed });
-        std::string state_key_base64 = base64_encode(etcd_prefix+"/pg/state/"+std::to_string(pg.pg_num));
+        std::string state_key_base64 = base64_encode(st_cli.etcd_prefix+"/pg/state/"+std::to_string(pg.pg_num));
         if (pg.state == PG_STARTING)
         {
             // Check that the PG key does not exist
@@ -624,7 +630,7 @@ void osd_t::report_pg_states()
             }
             success.push_back(json11::Json::object {
                 { "request_put", json11::Json::object {
-                    { "key", base64_encode(etcd_prefix+"/pg/state/"+std::to_string(pg.pg_num)) },
+                    { "key", base64_encode(st_cli.etcd_prefix+"/pg/state/"+std::to_string(pg.pg_num)) },
                     { "value", base64_encode(json11::Json(json11::Json::object {
                         { "primary", this->osd_num },
                         { "state", pg_state_keywords },
@@ -640,7 +646,7 @@ void osd_t::report_pg_states()
                 {
                     success.push_back(json11::Json::object {
                         { "request_delete_range", json11::Json::object {
-                            { "key", base64_encode(etcd_prefix+"/pg/history/"+std::to_string(pg.pg_num)) },
+                            { "key", base64_encode(st_cli.etcd_prefix+"/pg/history/"+std::to_string(pg.pg_num)) },
                         } }
                     });
                 }
@@ -648,7 +654,7 @@ void osd_t::report_pg_states()
                 {
                     success.push_back(json11::Json::object {
                         { "request_put", json11::Json::object {
-                            { "key", base64_encode(etcd_prefix+"/pg/history/"+std::to_string(pg.pg_num)) },
+                            { "key", base64_encode(st_cli.etcd_prefix+"/pg/history/"+std::to_string(pg.pg_num)) },
                             { "value", base64_encode(json11::Json(json11::Json::object {
                                 { "all_peers", pg.all_peers },
                             }).dump()) },
@@ -689,7 +695,7 @@ void osd_t::report_pg_states()
                 if (res["kvs"].array_items().size())
                 {
                     auto kv = st_cli.parse_etcd_kv(res["kvs"][0]);
-                    pg_num_t pg_num = stoull_full(kv.key.substr(etcd_prefix.length()+10));
+                    pg_num_t pg_num = stoull_full(kv.key.substr(st_cli.etcd_prefix.length()+10));
                     auto pg_it = pgs.find(pg_num);
                     if (pg_it != pgs.end() && pg_it->second.state != PG_OFFLINE && pg_it->second.state != PG_STARTING)
                     {
@@ -785,7 +791,7 @@ void osd_t::load_and_connect_peers()
                 wp_it->second.last_load_attempt = time(NULL);
                 load_peer_txn.push_back(json11::Json::object {
                     { "request_range", json11::Json::object {
-                        { "key", base64_encode(etcd_prefix+"/osd/state/"+std::to_string(peer_osd)) },
+                        { "key", base64_encode(st_cli.etcd_prefix+"/osd/state/"+std::to_string(peer_osd)) },
                     } }
                 });
             }
diff --git a/osd_ops.h b/osd_ops.h
index 2b5d6817..d35e121a 100644
--- a/osd_ops.h
+++ b/osd_ops.h
@@ -146,7 +146,7 @@ struct __attribute__((__packed__)) osd_reply_secondary_list_t
 };
 
 // read or write to the primary OSD (must be within individual stripe)
-// FIXME: allow to return used block bitmap (required for snaphots)
+// FIXME: allow to return used block bitmap (required for snapshots)
 struct __attribute__((__packed__)) osd_op_rw_t
 {
     osd_op_header_t header;