From 9126ffb0f9fa9285ff11bd5644e396695ecf9c69 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 17 Apr 2020 02:33:44 +0300 Subject: [PATCH] Fix PG loading - now it works, at least once --- base64.cpp | 12 +++++++----- osd_cluster.cpp | 51 ++++++++++++++++++++++++------------------------- osd_http.cpp | 23 +++++++++++++++++----- osd_peering.cpp | 12 +++++++++--- 4 files changed, 59 insertions(+), 39 deletions(-) diff --git a/base64.cpp b/base64.cpp index 79546452..dfe366eb 100644 --- a/base64.cpp +++ b/base64.cpp @@ -3,10 +3,11 @@ std::string base64_encode(const std::string &in) { std::string out; - unsigned val = 0, valb = -6; + unsigned val = 0; + int valb = -6; for (unsigned char c: in) { - val = (val<<8) + c; + val = (val << 8) + c; valb += 8; while (valb >= 0) { @@ -33,16 +34,17 @@ std::string base64_decode(const std::string &in) for (int i = 0; i < 64; i++) T[(unsigned char)("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"[i])] = i; } - unsigned val = 0, valb = -8; + unsigned val = 0; + int valb = -8; for (unsigned char c: in) { if (T[c] == -1) break; val = (val<<6) + T[c]; valb += 6; - if (valb>=0) + if (valb >= 0) { - out.push_back(char((val>>valb)&0xFF)); + out.push_back(char((val >> valb) & 0xFF)); valb -= 8; } } diff --git a/osd_cluster.cpp b/osd_cluster.cpp index 192f6783..8eea1710 100644 --- a/osd_cluster.cpp +++ b/osd_cluster.cpp @@ -159,30 +159,25 @@ void osd_t::parse_pgs(json11::Json data) throw std::runtime_error("Bad key in PG hash: "+pg_item.first); } auto & pg_json = pg_item.second; - osd_num_t primary_osd = 0; - std::vector target_set; - for (auto pg_osd_num: pg_json["osd_set"].array_items()) - { - osd_num_t pg_osd = pg_osd_num.uint64_value(); - target_set.push_back(pg_osd); - if (pg_osd != 0 && primary_osd == 0) - { - primary_osd = pg_osd; - } - } - if (target_set.size() != 3) - { - throw std::runtime_error("Bad PG "+std::to_string(pg_num)+" config format: incorrect osd_set"); - } + osd_num_t primary_osd = pg_json["primary"].uint64_value(); if (primary_osd == this->osd_num) { // Take this PG + std::vector target_set; + for (auto pg_osd_num: pg_json["osd_set"].array_items()) + { + osd_num_t pg_osd = pg_osd_num.uint64_value(); + target_set.push_back(pg_osd); + } + if (target_set.size() != 3) + { + throw std::runtime_error("Bad PG "+std::to_string(pg_num)+" config format: incorrect osd_set"); + } this->pgs[pg_num] = (pg_t){ .state = PG_PEERING, .pg_cursize = 0, .pg_num = pg_num, .target_set = target_set, - .cur_set = target_set, }; this->pgs[pg_num].print_state(); // Add peers @@ -216,10 +211,9 @@ void osd_t::load_and_connect_peers() peering_state = peering_state & ~OSD_CONNECTING_PEERS; } } - else if (peer_states.find(osd_num) == peer_states.end() && - time(NULL) - wp_it->second.last_load_attempt >= peer_connect_interval) + else if (peer_states.find(osd_num) == peer_states.end()) { - if (!loading_peer_config) + if (!loading_peer_config && (time(NULL) - wp_it->second.last_load_attempt >= peer_connect_interval)) { // (Re)load OSD state from Consul wp_it->second.last_load_attempt = time(NULL); @@ -237,7 +231,7 @@ void osd_t::load_and_connect_peers() { // Try to connect wp_it->second.connecting = true; - const std::string & addr = peer_states[osd_num]["addresses"][wp_it->second.address_index].string_value(); + const std::string addr = peer_states[osd_num]["addresses"][wp_it->second.address_index].string_value(); int64_t port = peer_states[osd_num]["port"].int64_value(); wp_it++; connect_peer(osd_num, addr.c_str(), port, [this](osd_num_t osd_num, int peer_fd) @@ -266,6 +260,7 @@ void osd_t::load_and_connect_peers() if (!wanted_peers.size()) { // Connected to all peers + printf("Connected to all peers\n"); peering_state = peering_state & ~OSD_CONNECTING_PEERS; } repeer_pgs(osd_num, true); @@ -292,20 +287,24 @@ void osd_t::load_and_connect_peers() loading_peer_config = false; if (err != "") { - printf("Failed to load peer configuration from Consul"); + printf("Failed to load peer configuration from Consul: %s\n", err.c_str()); return; } for (auto & res: data["Results"].array_items()) { std::string key = res["KV"]["Key"].string_value(); // /osd/state/. - osd_num_t osd_num = std::stoull(key.substr(consul_prefix.length()+11, key.length()-consul_prefix.length()-12)); + osd_num_t peer_osd = std::stoull(key.substr(consul_prefix.length()+11, key.length()-consul_prefix.length()-12)); std::string json_err; - json11::Json data = json11::Json::parse(base64_decode(res["KV"]["Value"].string_value()), json_err); - if (osd_num > 0 && data.is_object() && data["state"] == "up" && - data["addresses"].is_array() && data["port"].is_number()) + json11::Json st = json11::Json::parse(base64_decode(res["KV"]["Value"].string_value()), json_err); + if (json_err != "") { - peer_states[osd_num] = data; + printf("Bad JSON in Consul key %s: %s\n", key.c_str(), json_err.c_str()); + } + if (peer_osd > 0 && st.is_object() && st["state"] == "up" && + st["addresses"].is_array() && st["port"].is_number()) + { + peer_states[peer_osd] = st; } } }); diff --git a/osd_http.cpp b/osd_http.cpp index dcdf3d69..cbf00e2f 100644 --- a/osd_http.cpp +++ b/osd_http.cpp @@ -119,7 +119,7 @@ void osd_t::http_request_json(std::string host, std::string request, json11::Json data = json11::Json::parse(res->body, json_err); if (json_err != "") { - callback("Bad JSON: "+json_err+" (response: "+res->body+")", json11::Json()); + callback("Bad JSON: "+json_err+" (response: "+(res->body == "" ? txt : res->body)+")", json11::Json()); return; } callback(std::string(), data); @@ -142,14 +142,27 @@ http_response_t *parse_http_response(std::string res) status_text = NULL; } int prev = pos; - while ((pos = res.find("\r\n", prev)) > prev) + while ((pos = res.find("\r\n", prev)) >= prev) { - if (pos == prev+2) + if (pos == prev) { - parsed->body = res.substr(pos+2); + if (parsed->headers["transfer-encoding"] == "chunked") + { + prev = pos+2; + while ((pos = res.find("\r\n", prev)) >= prev) + { + uint64_t len = strtoull(res.c_str()+prev, NULL, 16); + parsed->body += res.substr(pos+2, len); + prev = pos+2+len+2; + } + } + else + { + parsed->body = res.substr(pos+2); + } break; } - std::string header = res.substr(prev, pos); + std::string header = res.substr(prev, pos-prev); int p2 = header.find(":"); if (p2 >= 0) { diff --git a/osd_peering.cpp b/osd_peering.cpp index 28d69bc7..c0d60dea 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -230,9 +230,8 @@ void osd_t::repeer_pgs(osd_num_t osd_num, bool is_connected) for (int r = 0; r < p.second.target_set.size(); r++) { if (p.second.target_set[r] == osd_num && - p.second.cur_set[r] != real_osd) + (p.second.cur_set.size() < r || p.second.cur_set[r] != real_osd)) { - p.second.cur_set[r] = real_osd; repeer = true; break; } @@ -287,8 +286,11 @@ void osd_t::start_pg_peering(pg_num_t pg_num) dirty_pgs.erase(pg.pg_num); // Start peering pg.pg_cursize = 0; - for (int role = 0; role < pg.cur_set.size(); role++) + pg.cur_set.resize(pg.target_set.size()); + for (int role = 0; role < pg.target_set.size(); role++) { + pg.cur_set[role] = pg.target_set[role] == this->osd_num || + osd_peer_fds.find(pg.target_set[role]) != osd_peer_fds.end() ? pg.target_set[role] : 0; if (pg.cur_set[role] != 0) { pg.pg_cursize++; @@ -308,7 +310,9 @@ void osd_t::start_pg_peering(pg_num_t pg_num) for (role = 0; role < pg.cur_set.size(); role++) { if (pg.cur_set[role] == it->first) + { break; + } } if (pg.state == PG_INCOMPLETE || role >= pg.cur_set.size()) { @@ -342,7 +346,9 @@ void osd_t::start_pg_peering(pg_num_t pg_num) for (role = 0; role < pg.cur_set.size(); role++) { if (pg.cur_set[role] == it->first) + { break; + } } if (pg.state == PG_INCOMPLETE || role >= pg.cur_set.size()) {