Add consul & connect timeouts, report state before loading PGs, move init_primary to osd_cluster

trace-sqes
Vitaliy Filippov 2020-04-20 15:43:07 +03:00
parent 663153713b
commit ff38b464a5
7 changed files with 150 additions and 93 deletions

View File

@ -44,9 +44,6 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
this->tfd = new timerfd_manager_t(ringloop);
if (run_primary)
init_primary();
init_cluster();
consumer.loop = [this]() { loop(); };
@ -141,6 +138,12 @@ void osd_t::parse_config(blockstore_config_t & config)
peer_connect_interval = strtoull(config["peer_connect_interval"].c_str(), NULL, 10);
if (!peer_connect_interval)
peer_connect_interval = 5;
http_request_timeout = strtoull(config["http_request_timeout"].c_str(), NULL, 10);
if (!http_request_timeout)
http_request_timeout = 5;
peer_connect_timeout = strtoull(config["peer_connect_timeout"].c_str(), NULL, 10);
if (!peer_connect_timeout)
peer_connect_timeout = 5;
}
void osd_t::bind_socket()

3
osd.h
View File

@ -136,6 +136,7 @@ struct osd_client_t
int peer_fd;
int peer_state;
std::function<void(osd_num_t, int)> connect_callback;
int connect_timeout_id = -1;
osd_num_t osd_num = 0;
void *in_buf = NULL;
@ -211,6 +212,8 @@ class osd_t
int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds
int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
int peer_connect_interval = 5;
int http_request_timeout = 5;
int peer_connect_timeout = 5;
// peer OSDs

View File

@ -4,10 +4,17 @@
void osd_t::init_cluster()
{
if (run_primary)
{
init_primary();
}
if (consul_address != "")
{
if (!run_primary)
{
report_status();
}
printf("OSD %lu reporting to Consul at %s each %d seconds\n", osd_num, consul_address.c_str(), consul_report_interval);
report_status();
this->consul_tfd = new timerfd_interval(ringloop, consul_report_interval, [this]()
{
report_status();
@ -15,6 +22,74 @@ void osd_t::init_cluster()
}
}
void osd_t::init_primary()
{
if (consul_address == "")
{
// Test version of clustering code with 1 PG and 2 peers
// Example: peers = 2:127.0.0.1:11204,3:127.0.0.1:11205
std::string peerstr = config["peers"];
while (peerstr.size())
{
int pos = peerstr.find(',');
parse_test_peer(pos < 0 ? peerstr : peerstr.substr(0, pos));
peerstr = pos < 0 ? std::string("") : peerstr.substr(pos+1);
}
if (peer_states.size() < 2)
{
throw std::runtime_error("run_primary requires at least 2 peers");
}
pgs[1] = (pg_t){
.state = PG_PEERING,
.pg_cursize = 0,
.pg_num = 1,
.target_set = { 1, 2, 3 },
.cur_set = { 0, 0, 0 },
};
pgs[1].print_state();
pg_count = 1;
peering_state = OSD_CONNECTING_PEERS;
}
else
{
peering_state = OSD_LOADING_PGS;
load_pgs();
}
if (autosync_interval > 0)
{
this->sync_tfd = new timerfd_interval(ringloop, autosync_interval, [this]()
{
autosync();
});
}
}
void osd_t::parse_test_peer(std::string peer)
{
// OSD_NUM:IP:PORT
int pos1 = peer.find(':');
int pos2 = peer.find(':', pos1+1);
if (pos1 < 0 || pos2 < 0)
throw new std::runtime_error("OSD peer string must be in the form OSD_NUM:IP:PORT");
std::string addr = peer.substr(pos1+1, pos2-pos1-1);
std::string osd_num_str = peer.substr(0, pos1);
std::string port_str = peer.substr(pos2+1);
osd_num_t osd_num = strtoull(osd_num_str.c_str(), NULL, 10);
if (!osd_num)
throw new std::runtime_error("Could not parse OSD peer osd_num");
else if (peer_states.find(osd_num) != peer_states.end())
throw std::runtime_error("Same osd number "+std::to_string(osd_num)+" specified twice in peers");
int port = strtoull(port_str.c_str(), NULL, 10);
if (!port)
throw new std::runtime_error("Could not parse OSD peer port");
peer_states[osd_num] = json11::Json::object {
{ "state", "up" },
{ "addresses", json11::Json::array { addr } },
{ "port", port },
};
wanted_peers[osd_num] = { 0 };
}
json11::Json osd_t::get_status()
{
json11::Json::object st;
@ -142,21 +217,6 @@ void osd_t::consul_txn(json11::Json txn, std::function<void(std::string, json11:
http_request_json(consul_address, req, callback);
}
uint64_t stoull_full(std::string str, int base = 10)
{
if (isspace(str[0]))
{
return 0;
}
size_t end = -1;
uint64_t r = std::stoull(str, &end, base);
if (end < str.length())
{
return 0;
}
return r;
}
// Start -> Load PGs -> Load peers -> Connect to peers -> Peer PGs
// Wait for PG changes -> Start/Stop PGs when requested
// Peer connection is lost -> Reload connection data -> Try to reconnect -> Repeat
@ -164,6 +224,14 @@ void osd_t::load_pgs()
{
assert(this->pgs.size() == 0);
json11::Json::array txn = {
// Update OSD state when loading PGs to allow "monitors" do CAS transactions when moving PGs
json11::Json::object {
{ "KV", json11::Json::object {
{ "Verb", "set" },
{ "Key", consul_prefix+"/osd/state/"+std::to_string(osd_num)+"." },
{ "Value", base64_encode(get_status().dump()) },
} }
},
json11::Json::object {
{ "KV", json11::Json::object {
{ "Verb", "get" },
@ -188,6 +256,7 @@ void osd_t::load_pgs()
});
return;
}
peering_state &= ~OSD_LOADING_PGS;
json11::Json pg_config;
std::map<pg_num_t, json11::Json> pg_history;
for (auto & res: data["Results"].array_items())
@ -203,10 +272,10 @@ void osd_t::load_pgs()
{
pg_config = value;
}
else
else if (key.substr(0, consul_prefix.length()+12) == consul_prefix+"/pg/history/")
{
// <consul_prefix>/pg/history/%d.
pg_num_t pg_num = stoull_full(key.substr(consul_prefix.length()+13, key.length()-consul_prefix.length()-14));
pg_num_t pg_num = stoull_full(key.substr(consul_prefix.length()+12, key.length()-consul_prefix.length()-13));
if (pg_num)
{
pg_history[pg_num] = value;

View File

@ -25,7 +25,7 @@ static int extract_port(std::string & host)
return port;
}
std::vector<std::string> getifaddr_list()
std::vector<std::string> getifaddr_list(bool include_v6)
{
std::vector<std::string> addresses;
ifaddrs *list, *ifa;
@ -40,7 +40,7 @@ std::vector<std::string> getifaddr_list()
continue;
}
int family = ifa->ifa_addr->sa_family;
if ((family == AF_INET || family == AF_INET6) &&
if ((family == AF_INET || family == AF_INET6 && include_v6) &&
(ifa->ifa_flags & (IFF_UP | IFF_RUNNING | IFF_LOOPBACK)) == (IFF_UP | IFF_RUNNING))
{
void *addr_ptr;
@ -70,6 +70,7 @@ struct http_co_t
int st = 0;
int peer_fd = -1;
int timeout_id = -1;
int epoll_events = 0;
int code = 0;
int sent = 0, received = 0;
@ -181,6 +182,11 @@ http_response_t *parse_http_response(std::string res)
http_co_t::~http_co_t()
{
if (timeout_id >= 0)
{
osd->tfd->clear_timer(timeout_id);
timeout_id = -1;
}
callback(code, response);
if (peer_fd >= 0)
{
@ -214,6 +220,17 @@ void http_co_t::resume()
return;
}
fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
if (osd->http_request_timeout > 0)
{
timeout_id = osd->tfd->set_timer(1000*osd->http_request_timeout, false, [this](int timer_id)
{
if (response.length() == 0)
{
code = EIO;
}
delete this;
});
}
r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
if (r < 0 && errno != EINPROGRESS)
{
@ -366,3 +383,18 @@ void http_co_t::resume()
return;
}
}
uint64_t stoull_full(std::string str, int base)
{
if (isspace(str[0]))
{
return 0;
}
size_t end = -1;
uint64_t r = std::stoull(str, &end, base);
if (end < str.length())
{
return 0;
}
return r;
}

View File

@ -13,4 +13,5 @@ struct http_response_t
};
http_response_t *parse_http_response(std::string res);
std::vector<std::string> getifaddr_list();
std::vector<std::string> getifaddr_list(bool include_v6 = false);
uint64_t stoull_full(std::string str, int base = 10);

View File

@ -6,74 +6,6 @@
#include "base64.h"
#include "osd.h"
void osd_t::init_primary()
{
if (consul_address == "")
{
// Test version of clustering code with 1 PG and 2 peers
// Example: peers = 2:127.0.0.1:11204,3:127.0.0.1:11205
std::string peerstr = config["peers"];
while (peerstr.size())
{
int pos = peerstr.find(',');
parse_test_peer(pos < 0 ? peerstr : peerstr.substr(0, pos));
peerstr = pos < 0 ? std::string("") : peerstr.substr(pos+1);
}
if (peer_states.size() < 2)
{
throw std::runtime_error("run_primary requires at least 2 peers");
}
pgs[1] = (pg_t){
.state = PG_PEERING,
.pg_cursize = 0,
.pg_num = 1,
.target_set = { 1, 2, 3 },
.cur_set = { 0, 0, 0 },
};
pgs[1].print_state();
pg_count = 1;
peering_state = OSD_CONNECTING_PEERS;
}
else
{
peering_state = OSD_LOADING_PGS;
load_pgs();
}
if (autosync_interval > 0)
{
this->sync_tfd = new timerfd_interval(ringloop, 3, [this]()
{
autosync();
});
}
}
void osd_t::parse_test_peer(std::string peer)
{
// OSD_NUM:IP:PORT
int pos1 = peer.find(':');
int pos2 = peer.find(':', pos1+1);
if (pos1 < 0 || pos2 < 0)
throw new std::runtime_error("OSD peer string must be in the form OSD_NUM:IP:PORT");
std::string addr = peer.substr(pos1+1, pos2-pos1-1);
std::string osd_num_str = peer.substr(0, pos1);
std::string port_str = peer.substr(pos2+1);
osd_num_t osd_num = strtoull(osd_num_str.c_str(), NULL, 10);
if (!osd_num)
throw new std::runtime_error("Could not parse OSD peer osd_num");
else if (peer_states.find(osd_num) != peer_states.end())
throw std::runtime_error("Same osd number "+std::to_string(osd_num)+" specified twice in peers");
int port = strtoull(port_str.c_str(), NULL, 10);
if (!port)
throw new std::runtime_error("Could not parse OSD peer port");
peer_states[osd_num] = json11::Json::object {
{ "state", "up" },
{ "addresses", json11::Json::array { addr } },
{ "port", port },
};
wanted_peers[osd_num] = { 0 };
}
void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function<void(osd_num_t, int)> callback)
{
struct sockaddr_in addr;
@ -92,6 +24,18 @@ void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port
return;
}
fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
int timeout_id = -1;
if (peer_connect_timeout > 0)
{
tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd](int timer_id)
{
auto callback = clients[peer_fd].connect_callback;
osd_num_t osd_num = clients[peer_fd].osd_num;
stop_client(peer_fd);
callback(osd_num, -EIO);
return;
});
}
r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
if (r < 0 && errno != EINPROGRESS)
{
@ -105,6 +49,7 @@ void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port
.peer_fd = peer_fd,
.peer_state = PEER_CONNECTING,
.connect_callback = callback,
.connect_timeout_id = timeout_id,
.osd_num = osd_num,
.in_buf = malloc(receive_buffer_size),
};
@ -122,6 +67,11 @@ void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port
void osd_t::handle_connect_result(int peer_fd)
{
auto & cl = clients[peer_fd];
if (cl.connect_timeout_id >= 0)
{
tfd->clear_timer(cl.connect_timeout_id);
cl.connect_timeout_id = -1;
}
osd_num_t osd_num = cl.osd_num;
auto callback = cl.connect_callback;
int result = 0;

View File

@ -110,7 +110,6 @@ void pg_obj_state_check_t::handle_version()
n_copies++;
if (replica >= pg->pg_size)
{
// FIXME In the future, check it against the PG epoch number to handle replication factor/scheme changes
n_buggy++;
}
else