|
|
|
@ -50,19 +50,13 @@ void etcd_state_client_t::etcd_txn(json11::Json txn, int timeout, std::function< |
|
|
|
|
|
|
|
|
|
void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int timeout, std::function<void(std::string, json11::Json)> callback) |
|
|
|
|
{ |
|
|
|
|
if (!etcd_addresses.size()) |
|
|
|
|
if (!etcd_addresses.size() && !etcd_local.size()) |
|
|
|
|
{ |
|
|
|
|
fprintf(stderr, "etcd_address is missing in Vitastor configuration\n"); |
|
|
|
|
exit(1); |
|
|
|
|
} |
|
|
|
|
std::string etcd_address; |
|
|
|
|
// Always prefer local etcd
|
|
|
|
|
// FIXME: Handle partial etcd failures. Now OSDs select etcd instance
|
|
|
|
|
// randomly so all OSDs may crash if any of etcds crashes.
|
|
|
|
|
if (etcd_local.size() > 0) |
|
|
|
|
etcd_address = etcd_local[rand() % etcd_local.size()]; |
|
|
|
|
else |
|
|
|
|
etcd_address = etcd_addresses[rand() % etcd_addresses.size()]; |
|
|
|
|
pick_next_etcd(); |
|
|
|
|
std::string etcd_address = selected_etcd_address; |
|
|
|
|
std::string etcd_api_path; |
|
|
|
|
int pos = etcd_address.find('/'); |
|
|
|
|
if (pos >= 0) |
|
|
|
@ -77,7 +71,12 @@ void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int t |
|
|
|
|
"Content-Length: "+std::to_string(req.size())+"\r\n" |
|
|
|
|
"Connection: close\r\n" |
|
|
|
|
"\r\n"+req; |
|
|
|
|
http_request_json(tfd, etcd_address, req, timeout, callback); |
|
|
|
|
http_request_json(tfd, etcd_address, req, timeout, [this, cur_addr = selected_etcd_address, callback](std::string err, json11::Json data) |
|
|
|
|
{ |
|
|
|
|
if (err != "" && cur_addr == selected_etcd_address) |
|
|
|
|
selected_etcd_address = ""; |
|
|
|
|
callback(err, data); |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void etcd_state_client_t::add_etcd_url(std::string addr) |
|
|
|
@ -104,10 +103,17 @@ void etcd_state_client_t::add_etcd_url(std::string addr) |
|
|
|
|
check_addr = addr; |
|
|
|
|
if (pos == std::string::npos) |
|
|
|
|
addr += "/v3"; |
|
|
|
|
this->etcd_addresses.push_back(addr); |
|
|
|
|
for (int i = 0; i < local_ips.size(); i++) |
|
|
|
|
int i; |
|
|
|
|
for (i = 0; i < local_ips.size(); i++) |
|
|
|
|
{ |
|
|
|
|
if (local_ips[i] == check_addr) |
|
|
|
|
{ |
|
|
|
|
this->etcd_local.push_back(addr); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (i >= local_ips.size()) |
|
|
|
|
this->etcd_addresses.push_back(addr); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -146,14 +152,38 @@ void etcd_state_client_t::parse_config(const json11::Json & config) |
|
|
|
|
this->log_level = config["log_level"].int64_value(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void etcd_state_client_t::pick_next_etcd() |
|
|
|
|
{ |
|
|
|
|
if (selected_etcd_address != "") |
|
|
|
|
return; |
|
|
|
|
if (addresses_to_try.size() == 0) |
|
|
|
|
{ |
|
|
|
|
// Prefer local etcd, if any
|
|
|
|
|
for (int i = 0; i < etcd_local.size(); i++) |
|
|
|
|
addresses_to_try.push_back(etcd_local[i]); |
|
|
|
|
std::vector<int> ns; |
|
|
|
|
for (int i = 0; i < etcd_addresses.size(); i++) |
|
|
|
|
ns.push_back(i); |
|
|
|
|
while (ns.size()) |
|
|
|
|
{ |
|
|
|
|
int i = rand() % ns.size(); |
|
|
|
|
addresses_to_try.push_back(etcd_addresses[ns[i]]); |
|
|
|
|
ns.erase(ns.begin()+i, ns.begin()+i+1); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
selected_etcd_address = addresses_to_try[0]; |
|
|
|
|
addresses_to_try.erase(addresses_to_try.begin(), addresses_to_try.begin()+1); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void etcd_state_client_t::start_etcd_watcher() |
|
|
|
|
{ |
|
|
|
|
if (!etcd_addresses.size()) |
|
|
|
|
if (!etcd_addresses.size() && !etcd_local.size()) |
|
|
|
|
{ |
|
|
|
|
fprintf(stderr, "etcd_address is missing in Vitastor configuration\n"); |
|
|
|
|
exit(1); |
|
|
|
|
} |
|
|
|
|
std::string etcd_address = etcd_addresses[rand() % etcd_addresses.size()]; |
|
|
|
|
pick_next_etcd(); |
|
|
|
|
std::string etcd_address = selected_etcd_address; |
|
|
|
|
std::string etcd_api_path; |
|
|
|
|
int pos = etcd_address.find('/'); |
|
|
|
|
if (pos >= 0) |
|
|
|
@ -162,7 +192,8 @@ void etcd_state_client_t::start_etcd_watcher() |
|
|
|
|
etcd_address = etcd_address.substr(0, pos); |
|
|
|
|
} |
|
|
|
|
etcd_watches_initialised = 0; |
|
|
|
|
etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", ETCD_SLOW_TIMEOUT, [this](const http_response_t *msg) |
|
|
|
|
etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", ETCD_SLOW_TIMEOUT, |
|
|
|
|
[this, cur_addr = selected_etcd_address](const http_response_t *msg) |
|
|
|
|
{ |
|
|
|
|
if (msg->body.length()) |
|
|
|
|
{ |
|
|
|
@ -181,6 +212,7 @@ void etcd_state_client_t::start_etcd_watcher() |
|
|
|
|
if (etcd_watches_initialised == 4) |
|
|
|
|
{ |
|
|
|
|
etcd_watch_revision = data["result"]["header"]["revision"].uint64_value(); |
|
|
|
|
addresses_to_try.clear(); |
|
|
|
|
} |
|
|
|
|
// First gather all changes into a hash to remove multiple overwrites
|
|
|
|
|
std::map<std::string, etcd_kv_t> changes; |
|
|
|
@ -209,11 +241,13 @@ void etcd_state_client_t::start_etcd_watcher() |
|
|
|
|
} |
|
|
|
|
if (msg->eof) |
|
|
|
|
{ |
|
|
|
|
if (cur_addr == selected_etcd_address) |
|
|
|
|
selected_etcd_address = ""; |
|
|
|
|
etcd_watch_ws = NULL; |
|
|
|
|
if (etcd_watches_initialised == 0) |
|
|
|
|
{ |
|
|
|
|
// Connection not established, retry in <ETCD_SLOW_TIMEOUT>
|
|
|
|
|
tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int) |
|
|
|
|
// Connection not established, retry in <ETCD_QUICK_TIMEOUT>
|
|
|
|
|
tfd->set_timer(ETCD_QUICK_TIMEOUT, false, [this](int) |
|
|
|
|
{ |
|
|
|
|
start_etcd_watcher(); |
|
|
|
|
}); |
|
|
|
@ -805,3 +839,8 @@ json11::Json::object etcd_state_client_t::serialize_inode_cfg(inode_config_t *cf |
|
|
|
|
} |
|
|
|
|
return new_cfg; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int etcd_state_client_t::address_count() |
|
|
|
|
{ |
|
|
|
|
return etcd_addresses.size() + etcd_local.size(); |
|
|
|
|
} |
|
|
|
|