Fix PG loading - now it works, at least once

trace-sqes
Vitaliy Filippov 2020-04-17 02:33:44 +03:00
parent 2a8e40835e
commit 9126ffb0f9
4 changed files with 59 additions and 39 deletions

View File

@ -3,10 +3,11 @@
std::string base64_encode(const std::string &in)
{
std::string out;
unsigned val = 0, valb = -6;
unsigned val = 0;
int valb = -6;
for (unsigned char c: in)
{
val = (val<<8) + c;
val = (val << 8) + c;
valb += 8;
while (valb >= 0)
{
@ -33,16 +34,17 @@ std::string base64_decode(const std::string &in)
for (int i = 0; i < 64; i++)
T[(unsigned char)("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"[i])] = i;
}
unsigned val = 0, valb = -8;
unsigned val = 0;
int valb = -8;
for (unsigned char c: in)
{
if (T[c] == -1)
break;
val = (val<<6) + T[c];
valb += 6;
if (valb>=0)
if (valb >= 0)
{
out.push_back(char((val>>valb)&0xFF));
out.push_back(char((val >> valb) & 0xFF));
valb -= 8;
}
}

View File

@ -159,30 +159,25 @@ void osd_t::parse_pgs(json11::Json data)
throw std::runtime_error("Bad key in PG hash: "+pg_item.first);
}
auto & pg_json = pg_item.second;
osd_num_t primary_osd = 0;
std::vector<osd_num_t> target_set;
for (auto pg_osd_num: pg_json["osd_set"].array_items())
{
osd_num_t pg_osd = pg_osd_num.uint64_value();
target_set.push_back(pg_osd);
if (pg_osd != 0 && primary_osd == 0)
{
primary_osd = pg_osd;
}
}
if (target_set.size() != 3)
{
throw std::runtime_error("Bad PG "+std::to_string(pg_num)+" config format: incorrect osd_set");
}
osd_num_t primary_osd = pg_json["primary"].uint64_value();
if (primary_osd == this->osd_num)
{
// Take this PG
std::vector<osd_num_t> target_set;
for (auto pg_osd_num: pg_json["osd_set"].array_items())
{
osd_num_t pg_osd = pg_osd_num.uint64_value();
target_set.push_back(pg_osd);
}
if (target_set.size() != 3)
{
throw std::runtime_error("Bad PG "+std::to_string(pg_num)+" config format: incorrect osd_set");
}
this->pgs[pg_num] = (pg_t){
.state = PG_PEERING,
.pg_cursize = 0,
.pg_num = pg_num,
.target_set = target_set,
.cur_set = target_set,
};
this->pgs[pg_num].print_state();
// Add peers
@ -216,10 +211,9 @@ void osd_t::load_and_connect_peers()
peering_state = peering_state & ~OSD_CONNECTING_PEERS;
}
}
else if (peer_states.find(osd_num) == peer_states.end() &&
time(NULL) - wp_it->second.last_load_attempt >= peer_connect_interval)
else if (peer_states.find(osd_num) == peer_states.end())
{
if (!loading_peer_config)
if (!loading_peer_config && (time(NULL) - wp_it->second.last_load_attempt >= peer_connect_interval))
{
// (Re)load OSD state from Consul
wp_it->second.last_load_attempt = time(NULL);
@ -237,7 +231,7 @@ void osd_t::load_and_connect_peers()
{
// Try to connect
wp_it->second.connecting = true;
const std::string & addr = peer_states[osd_num]["addresses"][wp_it->second.address_index].string_value();
const std::string addr = peer_states[osd_num]["addresses"][wp_it->second.address_index].string_value();
int64_t port = peer_states[osd_num]["port"].int64_value();
wp_it++;
connect_peer(osd_num, addr.c_str(), port, [this](osd_num_t osd_num, int peer_fd)
@ -266,6 +260,7 @@ void osd_t::load_and_connect_peers()
if (!wanted_peers.size())
{
// Connected to all peers
printf("Connected to all peers\n");
peering_state = peering_state & ~OSD_CONNECTING_PEERS;
}
repeer_pgs(osd_num, true);
@ -292,20 +287,24 @@ void osd_t::load_and_connect_peers()
loading_peer_config = false;
if (err != "")
{
printf("Failed to load peer configuration from Consul");
printf("Failed to load peer configuration from Consul: %s\n", err.c_str());
return;
}
for (auto & res: data["Results"].array_items())
{
std::string key = res["KV"]["Key"].string_value();
// <consul_prefix>/osd/state/<osd_num>.
osd_num_t osd_num = std::stoull(key.substr(consul_prefix.length()+11, key.length()-consul_prefix.length()-12));
osd_num_t peer_osd = std::stoull(key.substr(consul_prefix.length()+11, key.length()-consul_prefix.length()-12));
std::string json_err;
json11::Json data = json11::Json::parse(base64_decode(res["KV"]["Value"].string_value()), json_err);
if (osd_num > 0 && data.is_object() && data["state"] == "up" &&
data["addresses"].is_array() && data["port"].is_number())
json11::Json st = json11::Json::parse(base64_decode(res["KV"]["Value"].string_value()), json_err);
if (json_err != "")
{
peer_states[osd_num] = data;
printf("Bad JSON in Consul key %s: %s\n", key.c_str(), json_err.c_str());
}
if (peer_osd > 0 && st.is_object() && st["state"] == "up" &&
st["addresses"].is_array() && st["port"].is_number())
{
peer_states[peer_osd] = st;
}
}
});

View File

@ -119,7 +119,7 @@ void osd_t::http_request_json(std::string host, std::string request,
json11::Json data = json11::Json::parse(res->body, json_err);
if (json_err != "")
{
callback("Bad JSON: "+json_err+" (response: "+res->body+")", json11::Json());
callback("Bad JSON: "+json_err+" (response: "+(res->body == "" ? txt : res->body)+")", json11::Json());
return;
}
callback(std::string(), data);
@ -142,14 +142,27 @@ http_response_t *parse_http_response(std::string res)
status_text = NULL;
}
int prev = pos;
while ((pos = res.find("\r\n", prev)) > prev)
while ((pos = res.find("\r\n", prev)) >= prev)
{
if (pos == prev+2)
if (pos == prev)
{
parsed->body = res.substr(pos+2);
if (parsed->headers["transfer-encoding"] == "chunked")
{
prev = pos+2;
while ((pos = res.find("\r\n", prev)) >= prev)
{
uint64_t len = strtoull(res.c_str()+prev, NULL, 16);
parsed->body += res.substr(pos+2, len);
prev = pos+2+len+2;
}
}
else
{
parsed->body = res.substr(pos+2);
}
break;
}
std::string header = res.substr(prev, pos);
std::string header = res.substr(prev, pos-prev);
int p2 = header.find(":");
if (p2 >= 0)
{

View File

@ -230,9 +230,8 @@ void osd_t::repeer_pgs(osd_num_t osd_num, bool is_connected)
for (int r = 0; r < p.second.target_set.size(); r++)
{
if (p.second.target_set[r] == osd_num &&
p.second.cur_set[r] != real_osd)
(p.second.cur_set.size() < r || p.second.cur_set[r] != real_osd))
{
p.second.cur_set[r] = real_osd;
repeer = true;
break;
}
@ -287,8 +286,11 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
dirty_pgs.erase(pg.pg_num);
// Start peering
pg.pg_cursize = 0;
for (int role = 0; role < pg.cur_set.size(); role++)
pg.cur_set.resize(pg.target_set.size());
for (int role = 0; role < pg.target_set.size(); role++)
{
pg.cur_set[role] = pg.target_set[role] == this->osd_num ||
osd_peer_fds.find(pg.target_set[role]) != osd_peer_fds.end() ? pg.target_set[role] : 0;
if (pg.cur_set[role] != 0)
{
pg.pg_cursize++;
@ -308,7 +310,9 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
for (role = 0; role < pg.cur_set.size(); role++)
{
if (pg.cur_set[role] == it->first)
{
break;
}
}
if (pg.state == PG_INCOMPLETE || role >= pg.cur_set.size())
{
@ -342,7 +346,9 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
for (role = 0; role < pg.cur_set.size(); role++)
{
if (pg.cur_set[role] == it->first)
{
break;
}
}
if (pg.state == PG_INCOMPLETE || role >= pg.cur_set.size())
{