Fix reporting to Consul, report even if we are purely secondary

trace-sqes
Vitaliy Filippov 2020-04-17 01:59:06 +03:00
parent 309486d746
commit 2a8e40835e
6 changed files with 39 additions and 16 deletions

View File

@ -47,6 +47,8 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
if (run_primary) if (run_primary)
init_primary(); init_primary();
init_cluster();
consumer.loop = [this]() { loop(); }; consumer.loop = [this]() { loop(); };
ringloop->register_consumer(&consumer); ringloop->register_consumer(&consumer);
} }

1
osd.h
View File

@ -267,6 +267,7 @@ class osd_t
void print_stats(); void print_stats();
void reset_stats(); void reset_stats();
json11::Json get_status(); json11::Json get_status();
void init_cluster();
void report_status(); void report_status();
void load_pgs(); void load_pgs();
void parse_pgs(json11::Json data); void parse_pgs(json11::Json data);

View File

@ -2,6 +2,19 @@
#include "osd_http.h" #include "osd_http.h"
#include "base64.h" #include "base64.h"
void osd_t::init_cluster()
{
if (consul_address != "")
{
printf("OSD %lu reporting to Consul at %s each %d seconds\n", osd_num, consul_address.c_str(), consul_report_interval);
report_status();
this->consul_tfd = new timerfd_interval(ringloop, consul_report_interval, [this]()
{
report_status();
});
}
}
json11::Json osd_t::get_status() json11::Json osd_t::get_status()
{ {
json11::Json::object st; json11::Json::object st;
@ -18,6 +31,8 @@ json11::Json osd_t::get_status()
st["addresses"] = bind_addresses; st["addresses"] = bind_addresses;
} }
st["port"] = bind_port; st["port"] = bind_port;
st["primary_enabled"] = run_primary;
st["blockstore_ready"] = bs->is_started();
st["blockstore_enabled"] = bs ? true : false; st["blockstore_enabled"] = bs ? true : false;
if (bs) if (bs)
{ {
@ -69,9 +84,9 @@ json11::Json osd_t::get_status()
void osd_t::report_status() void osd_t::report_status()
{ {
std::string st = get_status().dump(); std::string st = get_status().dump();
// (!) Keys end with / to allow "select /osd/state/123/ by prefix" // (!) Keys end with . to allow "select /osd/state/123. by prefix"
// because Consul transactions fail if you try to read non-existing keys // because Consul transactions fail if you try to read non-existing keys
std::string req = "PUT /v1/kv/"+consul_prefix+"/osd/state/"+std::to_string(osd_num)+"/ HTTP/1.1\r\n"+ std::string req = "PUT /v1/kv/"+consul_prefix+"/osd/state/"+std::to_string(osd_num)+". HTTP/1.1\r\n"+
"Host: "+consul_host+"\r\n"+ "Host: "+consul_host+"\r\n"+
"Content-Length: "+std::to_string(st.size())+"\r\n"+ "Content-Length: "+std::to_string(st.size())+"\r\n"+
"Connection: close\r\n"+ "Connection: close\r\n"+
@ -211,7 +226,7 @@ void osd_t::load_and_connect_peers()
consul_txn.push_back(json11::Json::object { consul_txn.push_back(json11::Json::object {
{ "KV", json11::Json::object { { "KV", json11::Json::object {
{ "Verb", "get-tree" }, { "Verb", "get-tree" },
{ "Key", consul_prefix+"/osd/state/"+std::to_string(osd_num)+"/" }, { "Key", consul_prefix+"/osd/state/"+std::to_string(osd_num)+"." },
} } } }
}); });
} }
@ -283,7 +298,7 @@ void osd_t::load_and_connect_peers()
for (auto & res: data["Results"].array_items()) for (auto & res: data["Results"].array_items())
{ {
std::string key = res["KV"]["Key"].string_value(); std::string key = res["KV"]["Key"].string_value();
// <consul_prefix>/osd/state/<osd_num>/ // <consul_prefix>/osd/state/<osd_num>.
osd_num_t osd_num = std::stoull(key.substr(consul_prefix.length()+11, key.length()-consul_prefix.length()-12)); osd_num_t osd_num = std::stoull(key.substr(consul_prefix.length()+11, key.length()-consul_prefix.length()-12));
std::string json_err; std::string json_err;
json11::Json data = json11::Json::parse(base64_decode(res["KV"]["Value"].string_value()), json_err); json11::Json data = json11::Json::parse(base64_decode(res["KV"]["Value"].string_value()), json_err);

View File

@ -308,12 +308,7 @@ void http_co_t::resume()
// Read response // Read response
if (st == 5) if (st == 5)
{ {
if (epoll_events & (EPOLLRDHUP|EPOLLERR)) if (epoll_events & EPOLLIN)
{
delete this;
return;
}
else if (epoll_events & EPOLLIN)
{ {
if (rbuf.size() != 9000) if (rbuf.size() != 9000)
rbuf.resize(9000); rbuf.resize(9000);
@ -332,7 +327,12 @@ void http_co_t::resume()
}; };
my_uring_prep_recvmsg(sqe, peer_fd, &msg, 0); my_uring_prep_recvmsg(sqe, peer_fd, &msg, 0);
st = 6; st = 6;
epoll_events = 0; epoll_events = epoll_events & ~EPOLLIN;
}
else if (epoll_events & (EPOLLRDHUP|EPOLLERR))
{
delete this;
return;
} }
} }
if (st == 6) if (st == 6)

View File

@ -38,10 +38,6 @@ void osd_t::init_primary()
{ {
peering_state = OSD_LOADING_PGS; peering_state = OSD_LOADING_PGS;
load_pgs(); load_pgs();
this->consul_tfd = new timerfd_interval(ringloop, consul_report_interval, [this]()
{
report_status();
});
} }
if (autosync_interval > 0) if (autosync_interval > 0)
{ {

View File

@ -98,10 +98,19 @@ void timerfd_manager_t::set_nearest()
nearest = i; nearest = i;
} }
} }
timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
itimerspec exp = { itimerspec exp = {
.it_interval = { 0 }, .it_interval = { 0 },
.it_value = timers[nearest].next, .it_value = timers[nearest].next,
}; };
exp.it_value.tv_sec -= now.tv_sec;
exp.it_value.tv_nsec -= now.tv_nsec;
if (exp.it_value.tv_nsec < 0)
{
exp.it_value.tv_sec--;
exp.it_value.tv_nsec += 1000000000;
}
if (timerfd_settime(timerfd, 0, &exp, NULL)) if (timerfd_settime(timerfd, 0, &exp, NULL))
{ {
throw std::runtime_error(std::string("timerfd_settime: ") + strerror(errno)); throw std::runtime_error(std::string("timerfd_settime: ") + strerror(errno));
@ -130,7 +139,7 @@ void timerfd_manager_t::set_wait()
} }
ring_data_t *data = ((ring_data_t*)sqe->user_data); ring_data_t *data = ((ring_data_t*)sqe->user_data);
my_uring_prep_poll_add(sqe, timerfd, POLLIN); my_uring_prep_poll_add(sqe, timerfd, POLLIN);
data->callback = [&](ring_data_t *data) data->callback = [this](ring_data_t *data)
{ {
if (data->res < 0) if (data->res < 0)
{ {