Use etcd instead of Consul

Vitaliy Filippov 2020-04-24 01:03:55 +03:00
parent 7cf71a8031
commit 4f42e9659e
3 changed files with 131 additions and 113 deletions

View File

@ -72,11 +72,6 @@ osd_t::~osd_t()
delete sync_tfd;
sync_tfd = NULL;
if (consul_tfd)
delete consul_tfd;
consul_tfd = NULL;
@ -106,15 +101,40 @@ osd_op_t::~osd_op_t()
void osd_t::parse_config(blockstore_config_t & config)
int pos;
// Initial startup configuration
consul_address = config["consul_address"];
consul_host = consul_address.find(':') >= 0 ? consul_address.substr(0, consul_address.find(':')) : consul_address;
consul_prefix = config["consul_prefix"];
if (consul_prefix == "")
consul_prefix = "microceph";
consul_report_interval = strtoull(config["consul_report_interval"].c_str(), NULL, 10);
if (consul_report_interval <= 0)
consul_report_interval = 30;
etcd_address = config["etcd_address"];
etcd_prefix = config["etcd_prefix"];
if (etcd_prefix == "")
etcd_prefix = "/microceph";
if ((pos = etcd_address.find('/')) >= 0)
etcd_api_path = etcd_address.substr(pos);
etcd_address = etcd_address.substr(0, pos);
else if (config.find("etcd_version") != config.end())
int major, minor;
if (sscanf(config["etcd_version"].c_str(), "%d.%d", &major, &minor) < 2)
throw std::runtime_error("etcd_version should be in the form MAJOR.MINOR (for example, 3.2)");
if (major < 3 || major == 3 && minor < 2)
throw std::runtime_error("Your etcd is too old, minimum required version is 3.2");
else if (major == 3 && minor == 2)
etcd_api_path = "/v3alpha";
else if (major == 3 && minor == 3)
etcd_api_path = "/v3beta";
etcd_api_path = "/v3";
etcd_api_path = "/v3";
if ((pos = etcd_address.find(':')) >= 0)
etcd_host = etcd_address.substr(0, pos);
etcd_host = etcd_address;
etcd_report_interval = strtoull(config["etcd_report_interval"].c_str(), NULL, 10);
if (etcd_report_interval <= 0)
etcd_report_interval = 30;
osd_num = strtoull(config["osd_num"].c_str(), NULL, 10);
if (!osd_num)
throw std::runtime_error("osd_num is required in the configuration");
@ -384,7 +404,7 @@ void osd_t::stop_client(int peer_fd)
if (cl.osd_num)
// Reload configuration from Consul when the connection is dropped
// Reload configuration from etcd when the connection is dropped
printf("[%lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl.osd_num);

View File

@ -50,9 +50,9 @@
//#define OSD_STUB
@ -197,8 +197,10 @@ class osd_t
// config
blockstore_config_t config;
std::string etcd_address, etcd_host, etcd_prefix, etcd_api_path;
int etcd_report_interval = 30;
bool readonly = false;
std::string consul_address, consul_host, consul_prefix = "microceph";
osd_num_t osd_num = 1; // OSD numbers start with 1
bool run_primary = false;
std::string bind_address;
@ -207,7 +209,6 @@ class osd_t
bool allow_test_ops = true;
int receive_buffer_size = 9000;
int print_stats_interval = 3;
int consul_report_interval = 30;
int immediate_commit = IMMEDIATE_NONE;
int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds
int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
@ -221,7 +222,7 @@ class osd_t
std::map<osd_num_t, osd_wanted_peer_t> wanted_peers;
bool loading_peer_config = false;
std::vector<std::string> bind_addresses;
int consul_failed_attempts = 0;
int etcd_failed_attempts = 0;
std::map<uint64_t, int> osd_peer_fds;
std::map<pg_num_t, pg_t> pgs;
@ -245,7 +246,7 @@ class osd_t
uint32_t bs_block_size, bs_disk_alignment;
uint64_t pg_stripe_size = 4*1024*1024; // 4 MB by default
ring_loop_t *ringloop;
timerfd_interval *stats_tfd = NULL, *sync_tfd = NULL, *consul_tfd = NULL;
timerfd_interval *stats_tfd = NULL, *sync_tfd = NULL;
timerfd_manager_t *tfd = NULL;
int wait_state = 0;
@ -271,7 +272,7 @@ class osd_t
void print_stats();
void reset_stats();
json11::Json get_status();
void consul_txn(json11::Json txn, std::function<void(std::string, json11::Json)> callback);
void etcd_txn(json11::Json txn, std::function<void(std::string, json11::Json)> callback);
void init_cluster();
void report_status();
void load_pgs();

View File

@ -8,14 +8,14 @@ void osd_t::init_cluster()
if (consul_address != "")
if (etcd_address != "")
if (!run_primary)
printf("[OSD %lu] reporting to Consul at %s each %d seconds\n", osd_num, consul_address.c_str(), consul_report_interval);
this->consul_tfd = new timerfd_interval(ringloop, consul_report_interval, [this]()
printf("[OSD %lu] reporting to etcd at %s each %d seconds\n", osd_num, etcd_address.c_str(), etcd_report_interval);
tfd->set_timer(etcd_report_interval*1000, true, [this](int timer_id)
@ -28,7 +28,7 @@ void osd_t::init_cluster()
void osd_t::init_primary()
if (consul_address == "")
if (etcd_address == "")
// Test version of clustering code with 1 PG and 2 peers
// Example: peers = 2:,3:
@ -142,16 +142,13 @@ void osd_t::report_status()
std::string st = get_status().dump();
// (!) Keys end with . to allow "select /osd/state/123. by prefix"
// because Consul transactions fail if you try to read non-existing keys
json11::Json::array txn = {
json11::Json::object {
{ "KV", json11::Json::object {
{ "Verb", "set" },
{ "Key", consul_prefix+"/osd/state/"+std::to_string(osd_num)+"." },
{ "Value", base64_encode(st) },
} }
// because etcd transactions fail if you try to read non-existing keys
json11::Json::array txn = { json11::Json::object {
{ "requestPut", json11::Json::object {
{ "key", base64_encode(etcd_prefix+"/osd/state/"+std::to_string(osd_num)) },
{ "value", base64_encode(st) },
} }
} };
for (auto & p: pgs)
auto & pg = p.second;
@ -168,10 +165,9 @@ void osd_t::report_status()
pg_st["incomplete_count"] = pg.incomplete_objects.size();
pg_st["write_osd_set"] = pg.cur_set;
txn.push_back(json11::Json::object {
{ "KV", json11::Json::object {
{ "Verb", "set" },
{ "Key", consul_prefix+"/pg/state/"+std::to_string(pg.pg_num)+"." },
{ "Value", base64_encode(json11::Json(pg_st).dump()) },
{ "requestPut", json11::Json::object {
{ "key", base64_encode(etcd_prefix+"/pg/state/"+std::to_string(pg.pg_num)) },
{ "value", base64_encode(json11::Json(pg_st).dump()) },
} }
if (pg.state == PG_ACTIVE && pg.target_history.size())
@ -179,46 +175,45 @@ void osd_t::report_status()
pg.all_peers = pg.target_set;
txn.push_back(json11::Json::object {
{ "KV", json11::Json::object {
{ "Verb", "delete" },
{ "Key", consul_prefix+"/pg/history/"+std::to_string(pg.pg_num)+"." },
{ "requestDeleteRange", json11::Json::object {
{ "key", base64_encode(etcd_prefix+"/pg/history/"+std::to_string(pg.pg_num)) },
} }
consul_txn(txn, [this](std::string err, json11::Json res)
etcd_txn(json11::Json::object { { "success", txn } }, [this](std::string err, json11::Json res)
if (err != "")
printf("Error reporting state to Consul: %s\n", err.c_str());
if (consul_failed_attempts > MAX_CONSUL_ATTEMPTS)
printf("Error reporting state to etcd: %s\n", err.c_str());
if (etcd_failed_attempts > MAX_ETCD_ATTEMPTS)
throw std::runtime_error("Cluster connection failed");
// Retry
tfd->set_timer(CONSUL_RETRY_INTERVAL, false, [this](int timer_id)
tfd->set_timer(ETCD_RETRY_INTERVAL, false, [this](int timer_id)
consul_failed_attempts = 0;
etcd_failed_attempts = 0;
void osd_t::consul_txn(json11::Json txn, std::function<void(std::string, json11::Json)> callback)
void osd_t::etcd_txn(json11::Json txn, std::function<void(std::string, json11::Json)> callback)
std::string req = txn.dump();
req = "PUT /v1/txn HTTP/1.1\r\n"
"Host: "+consul_host+"\r\n"
req = "POST "+etcd_api_path+"/kv/txn HTTP/1.1\r\n"
"Host: "+etcd_host+"\r\n"
"Content-Type: application/json\r\n"
"Content-Length: "+std::to_string(req.size())+"\r\n"
"Connection: close\r\n"
http_request_json(consul_address, req, callback);
http_request_json(etcd_address, req, callback);
// Start -> Load config & PGs -> Load peers -> Connect to peers -> Peer PGs
@ -230,37 +225,34 @@ void osd_t::load_pgs()
json11::Json::array txn = {
// Update OSD state when loading PGs to allow "monitors" do CAS transactions when moving PGs
json11::Json::object {
{ "KV", json11::Json::object {
{ "Verb", "set" },
{ "Key", consul_prefix+"/osd/state/"+std::to_string(osd_num)+"." },
{ "Value", base64_encode(get_status().dump()) },
{ "requestPut", json11::Json::object {
{ "key", base64_encode(etcd_prefix+"/osd/state/"+std::to_string(osd_num)) },
{ "value", base64_encode(get_status().dump()) },
} }
json11::Json::object {
{ "KV", json11::Json::object {
{ "Verb", "get-tree" },
{ "Key", consul_prefix+"/config/osd/all" },
{ "requestRange", json11::Json::object {
{ "key", base64_encode(etcd_prefix+"/config/osd/all") },
} }
json11::Json::object {
{ "KV", json11::Json::object {
{ "Verb", "get" },
{ "Key", consul_prefix+"/config/pgs" },
{ "requestRange", json11::Json::object {
{ "key", base64_encode(etcd_prefix+"/config/pgs") },
} }
json11::Json::object {
{ "KV", json11::Json::object {
{ "Verb", "get-tree" },
{ "Key", consul_prefix+"/pg/history/" },
{ "requestRange", json11::Json::object {
{ "key", base64_encode(etcd_prefix+"/pg/history/") },
{ "range_end", base64_encode(etcd_prefix+"/pg/history0") },
} }
consul_txn(txn, [this](std::string err, json11::Json data)
etcd_txn(json11::Json::object { { "success", txn } }, [this](std::string err, json11::Json data)
if (err != "")
printf("Error loading PGs from Consul: %s\n", err.c_str());
tfd->set_timer(CONSUL_START_INTERVAL, false, [this](int timer_id)
printf("Error loading PGs from etcd: %s\n", err.c_str());
tfd->set_timer(ETCD_START_INTERVAL, false, [this](int timer_id)
@ -270,40 +262,43 @@ void osd_t::load_pgs()
blockstore_config_t osd_config = this->config;
json11::Json pg_config;
std::map<pg_num_t, json11::Json> pg_history;
for (auto & res: data["Results"].array_items())
for (auto & res: data["responses"].array_items())
std::string key = res["KV"]["Key"].string_value();
if (key == (consul_prefix+"/osd/state/"+std::to_string(osd_num)+"."))
if (!res["response_range"].is_object())
std::string json_err, json_text = base64_decode(res["KV"]["Value"].string_value());
json11::Json value = json11::Json::parse(json_text, json_err);
if (json_err != "")
for (auto & kvs: res["response_range"]["kvs"].array_items())
printf("Bad JSON in Consul key %s: %s (value: %s)\n", key.c_str(), json_err.c_str(), json_text.c_str());
if (key == consul_prefix+"/config/osd/all")
for (auto & cfg_var: value.object_items())
std::string key = base64_decode(kvs["key"].string_value());
std::string json_err, json_text = base64_decode(kvs["value"].string_value());
json11::Json value = json11::Json::parse(json_text, json_err);
if (json_err != "")
if (this->config.find(cfg_var.first) == this->config.end())
printf("Bad JSON in etcd key %s: %s (value: %s)\n", key.c_str(), json_err.c_str(), json_text.c_str());
if (key == etcd_prefix+"/config/osd/all")
for (auto & cfg_var: value.object_items())
osd_config[cfg_var.first] = cfg_var.second.string_value();
if (this->config.find(cfg_var.first) == this->config.end())
osd_config[cfg_var.first] = cfg_var.second.string_value();
else if (key == consul_prefix+"/config/pgs")
pg_config = value;
else if (key.substr(0, consul_prefix.length()+12) == consul_prefix+"/pg/history/")
// <consul_prefix>/pg/history/%d.
pg_num_t pg_num = stoull_full(key.substr(consul_prefix.length()+12, key.length()-consul_prefix.length()-13));
if (pg_num)
else if (key == etcd_prefix+"/config/pgs")
pg_history[pg_num] = value;
pg_config = value;
else if (key.substr(0, etcd_prefix.length()+12) == etcd_prefix+"/pg/history/")
// <etcd_prefix>/pg/history/%d.
pg_num_t pg_num = stoull_full(key.substr(etcd_prefix.length()+12));
if (pg_num)
pg_history[pg_num] = value;
@ -410,12 +405,11 @@ void osd_t::load_and_connect_peers()
if (!loading_peer_config && (time(NULL) - wp_it->second.last_load_attempt >= peer_connect_interval))
// (Re)load OSD state from Consul
// (Re)load OSD state from etcd
wp_it->second.last_load_attempt = time(NULL);
load_peer_txn.push_back(json11::Json::object {
{ "KV", json11::Json::object {
{ "Verb", "get-tree" },
{ "Key", consul_prefix+"/osd/state/"+std::to_string(peer_osd)+"." },
{ "requestRange", json11::Json::object {
{ "key", base64_encode(etcd_prefix+"/osd/state/"+std::to_string(peer_osd)) },
} }
@ -469,32 +463,35 @@ void osd_t::load_and_connect_peers()
if (load_peer_txn.size() > 0)
consul_txn(load_peer_txn, [this](std::string err, json11::Json data)
etcd_txn(json11::Json::object { { "success", load_peer_txn } }, [this](std::string err, json11::Json data)
// Ugly, but required to wake up the loop
tfd->set_timer(peer_connect_interval*1000, false, [](int timer_id){});
loading_peer_config = false;
if (err != "")
printf("Failed to load peer configuration from Consul: %s\n", err.c_str());
printf("Failed to load peer configuration from etcd: %s\n", err.c_str());
for (auto & res: data["Results"].array_items())
for (auto & res: data["responses"].array_items())
std::string key = res["KV"]["Key"].string_value();
// <consul_prefix>/osd/state/<osd_num>.
osd_num_t peer_osd = std::stoull(key.substr(consul_prefix.length()+11, key.length()-consul_prefix.length()-12));
std::string json_err;
std::string json_text = base64_decode(res["KV"]["Value"].string_value());
json11::Json st = json11::Json::parse(json_text, json_err);
if (json_err != "")
if (res["response_range"]["kvs"].array_items().size())
printf("Bad JSON in Consul key %s: %s (value: %s)\n", key.c_str(), json_err.c_str(), json_text.c_str());
if (peer_osd > 0 && st.is_object() && st["state"] == "up" &&
st["addresses"].is_array() && st["port"].int64_value() > 0 && st["port"].int64_value() < 65536)
peer_states[peer_osd] = st;
std::string key = base64_decode(res["response_range"]["kvs"][0]["key"].string_value());
// <etcd_prefix>/osd/state/<osd_num>
osd_num_t peer_osd = std::stoull(key.substr(etcd_prefix.length()+11));
std::string json_err;
std::string json_text = base64_decode(res["response_range"]["kvs"][0]["value"].string_value());
json11::Json st = json11::Json::parse(json_text, json_err);
if (json_err != "")
printf("Bad JSON in etcd key %s: %s (value: %s)\n", key.c_str(), json_err.c_str(), json_text.c_str());
if (peer_osd > 0 && st.is_object() && st["state"] == "up" &&
st["addresses"].is_array() && st["port"].int64_value() > 0 && st["port"].int64_value() < 65536)
peer_states[peer_osd] = st;