diff --git a/Makefile b/Makefile index c3bb314d..c2458e87 100644 --- a/Makefile +++ b/Makefile @@ -30,7 +30,8 @@ libfio_blockstore.so: ./libblockstore.so fio_engine.cpp json11.o g++ $(CXXFLAGS) -shared -o libfio_blockstore.so fio_engine.cpp json11.o ./libblockstore.so -ltcmalloc_minimal -luring OSD_OBJS := osd.o osd_secondary.o osd_receive.o osd_send.o osd_peering.o osd_flush.o osd_peering_pg.o \ - osd_primary.o osd_primary_subops.o osd_cluster.o http_client.o osd_rmw.o json11.o timerfd_interval.o base64.o timerfd_manager.o + osd_primary.o osd_primary_subops.o etcd_state_client.o osd_cluster.o http_client.o pg_states.o \ + osd_rmw.o json11.o timerfd_interval.o base64.o timerfd_manager.o base64.o: base64.cpp base64.h g++ $(CXXFLAGS) -c -o $@ $< osd_secondary.o: osd_secondary.cpp osd.h osd_ops.h ringloop.h @@ -43,11 +44,15 @@ osd_peering.o: osd_peering.cpp osd.h osd_ops.h osd_peering_pg.h ringloop.h g++ $(CXXFLAGS) -c -o $@ $< osd_cluster.o: osd_cluster.cpp osd.h osd_ops.h ringloop.h g++ $(CXXFLAGS) -c -o $@ $< -http_client.o: http_client.cpp http_client.h osd.h osd_ops.h ringloop.h +http_client.o: http_client.cpp http_client.h osd.h osd_ops.h + g++ $(CXXFLAGS) -c -o $@ $< +etcd_state_client.o: etcd_state_client.cpp etcd_state_client.h http_client.h pg_states.h g++ $(CXXFLAGS) -c -o $@ $< osd_flush.o: osd_flush.cpp osd.h osd_ops.h osd_peering_pg.h ringloop.h g++ $(CXXFLAGS) -c -o $@ $< -osd_peering_pg.o: osd_peering_pg.cpp object_id.h osd_peering_pg.h +osd_peering_pg.o: osd_peering_pg.cpp object_id.h osd_peering_pg.h pg_states.h + g++ $(CXXFLAGS) -c -o $@ $< +pg_states.o: pg_states.cpp pg_states.h g++ $(CXXFLAGS) -c -o $@ $< osd_rmw.o: osd_rmw.cpp osd_rmw.h xor.h g++ $(CXXFLAGS) -c -o $@ $< diff --git a/etcd_state_client.cpp b/etcd_state_client.cpp new file mode 100644 index 00000000..f857e5d8 --- /dev/null +++ b/etcd_state_client.cpp @@ -0,0 +1,354 @@ +#include "osd_ops.h" +#include "pg_states.h" +#include "etcd_state_client.h" +#include "http_client.h" +#include "base64.h" + +json_kv_t etcd_state_client_t::parse_etcd_kv(const json11::Json & kv_json) +{ + json_kv_t kv; + kv.key = base64_decode(kv_json["key"].string_value()); + std::string json_err, json_text = base64_decode(kv_json["value"].string_value()); + kv.value = json_text == "" ? json11::Json() : json11::Json::parse(json_text, json_err); + if (json_err != "") + { + printf("Bad JSON in etcd key %s: %s (value: %s)\n", kv.key.c_str(), json_err.c_str(), json_text.c_str()); + kv.key = ""; + } + return kv; +} + +void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int timeout, std::function callback) +{ + std::string req = payload.dump(); + req = "POST "+etcd_api_path+api+" HTTP/1.1\r\n" + "Host: "+etcd_address+"\r\n" + "Content-Type: application/json\r\n" + "Content-Length: "+std::to_string(req.size())+"\r\n" + "Connection: close\r\n" + "\r\n"+req; + http_request_json(tfd, etcd_address, req, timeout, callback); +} + +void etcd_state_client_t::etcd_txn(json11::Json txn, int timeout, std::function callback) +{ + etcd_call("/kv/txn", txn, timeout, callback); +} + +void etcd_state_client_t::start_etcd_watcher() +{ + etcd_watches_initialised = 0; + etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", ETCD_SLOW_TIMEOUT, [this](const http_response_t *msg) + { + if (msg->body.length()) + { + std::string json_err; + json11::Json data = json11::Json::parse(msg->body, json_err); + if (json_err != "") + { + printf("Bad JSON in etcd event: %s, ignoring event\n", json_err.c_str()); + } + else + { + if (data["result"]["created"].bool_value()) + { + etcd_watches_initialised++; + } + if (etcd_watches_initialised == 4) + { + etcd_watch_revision = data["result"]["header"]["revision"].uint64_value(); + } + // First gather all changes into a hash to remove multiple overwrites + json11::Json::object changes; + for (auto & ev: data["result"]["events"].array_items()) + { + auto kv = parse_etcd_kv(ev["kv"]); + if (kv.key != "") + { + changes[kv.key] = kv.value; + } + } + for (auto & kv: changes) + { + if (this->log_level > 0) + { + printf("Incoming event: %s -> %s\n", kv.first.c_str(), kv.second.dump().c_str()); + } + parse_state(kv.first, kv.second); + } + // React to changes + on_change_hook(changes); + } + } + if (msg->eof) + { + etcd_watch_ws = NULL; + if (etcd_watches_initialised == 0) + { + // Connection not established, retry in + tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int) + { + start_etcd_watcher(); + }); + } + else + { + // Connection was live, retry immediately + start_etcd_watcher(); + } + } + }); + etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object { + { "create_request", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/config/") }, + { "range_end", base64_encode(etcd_prefix+"/config0") }, + { "start_revision", etcd_watch_revision+1 }, + { "watch_id", ETCD_CONFIG_WATCH_ID }, + } } + }).dump()); + etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object { + { "create_request", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/osd/state/") }, + { "range_end", base64_encode(etcd_prefix+"/osd/state0") }, + { "start_revision", etcd_watch_revision+1 }, + { "watch_id", ETCD_OSD_STATE_WATCH_ID }, + } } + }).dump()); + etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object { + { "create_request", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/pg/state/") }, + { "range_end", base64_encode(etcd_prefix+"/pg/state0") }, + { "start_revision", etcd_watch_revision+1 }, + { "watch_id", ETCD_PG_STATE_WATCH_ID }, + } } + }).dump()); + etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object { + { "create_request", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/pg/history/") }, + { "range_end", base64_encode(etcd_prefix+"/pg/history0") }, + { "start_revision", etcd_watch_revision+1 }, + { "watch_id", ETCD_PG_HISTORY_WATCH_ID }, + } } + }).dump()); +} + +void etcd_state_client_t::load_global_config() +{ + etcd_call("/kv/range", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/config/global") } + }, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data) + { + if (err != "") + { + printf("Error reading OSD configuration from etcd: %s\n", err.c_str()); + tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id) + { + load_global_config(); + }); + return; + } + if (!etcd_watch_revision) + { + etcd_watch_revision = data["header"]["revision"].uint64_value(); + } + json11::Json::object global_config; + if (data["kvs"].array_items().size() > 0) + { + auto kv = parse_etcd_kv(data["kvs"][0]); + if (kv.value.is_object()) + { + global_config = kv.value.object_items(); + } + } + on_load_config_hook(global_config); + }); +} + +void etcd_state_client_t::load_pgs() +{ + json11::Json::array txn = { + json11::Json::object { + { "request_range", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/config/pgs") }, + } } + }, + json11::Json::object { + { "request_range", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/pg/history/") }, + { "range_end", base64_encode(etcd_prefix+"/pg/history0") }, + } } + }, + json11::Json::object { + { "request_range", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/pg/state/") }, + { "range_end", base64_encode(etcd_prefix+"/pg/state0") }, + } } + }, + json11::Json::object { + { "request_range", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/osd/state/") }, + { "range_end", base64_encode(etcd_prefix+"/osd/state0") }, + } } + }, + }; + json11::Json::object req = { { "success", txn } }; + json11::Json checks = load_pgs_checks_hook(); + if (checks.array_items().size() > 0) + { + req["compare"] = checks; + } + etcd_txn(req, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data) + { + if (err != "") + { + printf("Error loading PGs from etcd: %s\n", err.c_str()); + tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id) + { + load_pgs(); + }); + return; + } + if (!data["succeeded"].bool_value()) + { + on_load_pgs_hook(false); + return; + } + for (auto & res: data["responses"].array_items()) + { + for (auto & kv_json: res["response_range"]["kvs"].array_items()) + { + auto kv = parse_etcd_kv(kv_json); + parse_state(kv.key, kv.value); + } + } + on_load_pgs_hook(true); + }); +} + +void etcd_state_client_t::parse_state(const std::string & key, const json11::Json & value) +{ + if (key == etcd_prefix+"/config/pgs") + { + for (auto & pg_item: this->pg_config) + { + pg_item.second.exists = false; + } + for (auto & pg_item: value["items"].object_items()) + { + pg_num_t pg_num = stoull_full(pg_item.first); + if (!pg_num) + { + printf("Bad key in PG configuration: %s (must be a number), skipped\n", pg_item.first.c_str()); + continue; + } + this->pg_config[pg_num].exists = true; + this->pg_config[pg_num].pause = pg_item.second["pause"].bool_value(); + this->pg_config[pg_num].primary = pg_item.second["primary"].uint64_value(); + this->pg_config[pg_num].target_set.clear(); + for (auto pg_osd: pg_item.second["osd_set"].array_items()) + { + this->pg_config[pg_num].target_set.push_back(pg_osd.uint64_value()); + } + if (this->pg_config[pg_num].target_set.size() != 3) + { + printf("Bad PG %u config format: incorrect osd_set = %s\n", pg_num, pg_item.second["osd_set"].dump().c_str()); + this->pg_config[pg_num].target_set.resize(3); + this->pg_config[pg_num].pause = true; + } + } + } + else if (key.substr(0, etcd_prefix.length()+12) == etcd_prefix+"/pg/history/") + { + // /pg/history/%d + pg_num_t pg_num = stoull_full(key.substr(etcd_prefix.length()+12)); + if (!pg_num) + { + printf("Bad etcd key %s, ignoring\n", key.c_str()); + } + else + { + auto & pg_cfg = this->pg_config[pg_num]; + pg_cfg.target_history.clear(); + pg_cfg.all_peers.clear(); + // Refuse to start PG if any set of the has no live OSDs + for (auto hist_item: value["osd_sets"].array_items()) + { + std::vector history_set; + for (auto pg_osd: hist_item.array_items()) + { + history_set.push_back(pg_osd.uint64_value()); + } + pg_cfg.target_history.push_back(history_set); + } + // Include these additional OSDs when peering the PG + for (auto pg_osd: value["all_peers"].array_items()) + { + pg_cfg.all_peers.push_back(pg_osd.uint64_value()); + } + } + } + else if (key.substr(0, etcd_prefix.length()+10) == etcd_prefix+"/pg/state/") + { + // /pg/state/%d + pg_num_t pg_num = stoull_full(key.substr(etcd_prefix.length()+10)); + if (!pg_num) + { + printf("Bad etcd key %s, ignoring\n", key.c_str()); + } + else if (value.is_null()) + { + this->pg_config[pg_num].cur_primary = 0; + this->pg_config[pg_num].cur_state = 0; + } + else + { + osd_num_t cur_primary = value["primary"].uint64_value(); + int state = 0; + for (auto & e: value["state"].array_items()) + { + int i; + for (i = 0; i < pg_state_bit_count; i++) + { + if (e.string_value() == pg_state_names[i]) + { + state = state | pg_state_bits[i]; + break; + } + } + if (i >= pg_state_bit_count) + { + printf("Unexpected PG %u state keyword in etcd: %s\n", pg_num, e.dump().c_str()); + return; + } + } + if (!cur_primary || !value["state"].is_array() || !state || + (state & PG_OFFLINE) && state != PG_OFFLINE || + (state & PG_PEERING) && state != PG_PEERING || + (state & PG_INCOMPLETE) && state != PG_INCOMPLETE) + { + printf("Unexpected PG %u state in etcd: primary=%lu, state=%s\n", pg_num, cur_primary, value["state"].dump().c_str()); + return; + } + this->pg_config[pg_num].cur_primary = cur_primary; + this->pg_config[pg_num].cur_state = state; + } + } + else if (key.substr(0, etcd_prefix.length()+11) == etcd_prefix+"/osd/state/") + { + // /osd/state/%d + osd_num_t peer_osd = std::stoull(key.substr(etcd_prefix.length()+11)); + if (peer_osd > 0) + { + if (value.is_object() && value["state"] == "up" && + value["addresses"].is_array() && + value["port"].int64_value() > 0 && value["port"].int64_value() < 65536) + { + this->peer_states[peer_osd] = value; + } + else + { + this->peer_states.erase(peer_osd); + } + } + } +} diff --git a/etcd_state_client.h b/etcd_state_client.h new file mode 100644 index 00000000..96a0b019 --- /dev/null +++ b/etcd_state_client.h @@ -0,0 +1,58 @@ +#pragma once + +#include "http_client.h" +#include "timerfd_manager.h" + +#define ETCD_CONFIG_WATCH_ID 1 +#define ETCD_PG_STATE_WATCH_ID 2 +#define ETCD_PG_HISTORY_WATCH_ID 3 +#define ETCD_OSD_STATE_WATCH_ID 4 + +#define MAX_ETCD_ATTEMPTS 5 +#define ETCD_SLOW_TIMEOUT 5000 +#define ETCD_QUICK_TIMEOUT 1000 + +struct pg_config_t +{ + bool exists; + osd_num_t primary; + std::vector target_set; + std::vector> target_history; + std::vector all_peers; + bool pause; + osd_num_t cur_primary; + int cur_state; +}; + +struct json_kv_t +{ + std::string key; + json11::Json value; +}; + +struct etcd_state_client_t +{ + // FIXME Allow multiple etcd addresses and select random address + std::string etcd_address, etcd_prefix, etcd_api_path; + int log_level = 0; + timerfd_manager_t *tfd = NULL; + + int etcd_watches_initialised = 0; + uint64_t etcd_watch_revision = 0; + websocket_t *etcd_watch_ws = NULL; + std::map pg_config; + std::map peer_states; + + std::function on_change_hook; + std::function on_load_config_hook; + std::function load_pgs_checks_hook; + std::function on_load_pgs_hook; + + json_kv_t parse_etcd_kv(const json11::Json & kv_json); + void etcd_call(std::string api, json11::Json payload, int timeout, std::function callback); + void etcd_txn(json11::Json txn, int timeout, std::function callback); + void start_etcd_watcher(); + void load_global_config(); + void load_pgs(); + void parse_state(const std::string & key, const json11::Json & value); +}; diff --git a/osd.cpp b/osd.cpp index 080623d8..a3e8d6e9 100644 --- a/osd.cpp +++ b/osd.cpp @@ -407,7 +407,7 @@ void osd_t::stop_client(int peer_fd) { // Reload configuration from etcd when the connection is dropped printf("[OSD %lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl.osd_num); - peer_states.erase(cl.osd_num); + st_cli.peer_states.erase(cl.osd_num); repeer_pgs(cl.osd_num); } else diff --git a/osd.h b/osd.h index a83696e6..09899399 100644 --- a/osd.h +++ b/osd.h @@ -18,8 +18,7 @@ #include "timerfd_manager.h" #include "osd_ops.h" #include "osd_peering_pg.h" -#include "http_client.h" -#include "json11/json11.hpp" +#include "etcd_state_client.h" #define OSD_OP_IN 0 #define OSD_OP_OUT 1 @@ -186,24 +185,6 @@ struct osd_wanted_peer_t int address_index; }; -struct pg_config_t -{ - bool exists; - osd_num_t primary; - std::vector target_set; - std::vector> target_history; - std::vector all_peers; - bool pause; - osd_num_t cur_primary; - int cur_state; -}; - -struct json_kv_t -{ - std::string key; - json11::Json value; -}; - class osd_t { // config @@ -231,16 +212,12 @@ class osd_t // cluster state + etcd_state_client_t st_cli; + int etcd_failed_attempts = 0; std::string etcd_lease_id; - int etcd_watches_initialised = 0; - uint64_t etcd_watch_revision = 0; - websocket_t *etcd_watch_ws = NULL; json11::Json self_state; - std::map peer_states; std::map wanted_peers; bool loading_peer_config = false; - int etcd_failed_attempts = 0; - std::map pg_config; std::set pg_state_dirty; bool pg_config_applied = false; bool etcd_reporting_pg_state = false; @@ -294,13 +271,12 @@ class osd_t uint64_t recovery_stat_bytes[2][2] = { 0 }; // cluster connection - void etcd_call(std::string api, json11::Json payload, int timeout, std::function callback); - void etcd_txn(json11::Json txn, int timeout, std::function callback); - json_kv_t parse_etcd_kv(const json11::Json & kv_json); void parse_config(blockstore_config_t & config); void init_cluster(); - void start_etcd_watcher(); - void load_global_config(); + void on_change_etcd_state_hook(json11::Json::object & changes); + void on_load_config_hook(json11::Json::object & changes); + json11::Json on_load_pgs_checks_hook(); + void on_load_pgs_hook(bool success); void bind_socket(); void acquire_lease(); json11::Json get_osd_state(); @@ -312,12 +288,9 @@ class osd_t void report_statistics(); void report_pg_state(pg_t & pg); void report_pg_states(); - void load_pgs(); - void parse_pg_state(const std::string & key, const json11::Json & value); void apply_pg_count(); void apply_pg_config(); void load_and_connect_peers(); - void parse_etcd_osd_state(const std::string & key, const json11::Json & value); // event loop, socket read/write void loop(); diff --git a/osd_cluster.cpp b/osd_cluster.cpp index 649e9c29..d65e2606 100644 --- a/osd_cluster.cpp +++ b/osd_cluster.cpp @@ -1,45 +1,6 @@ #include "osd.h" #include "base64.h" - -#define ETCD_CONFIG_WATCH_ID 1 -#define ETCD_PG_STATE_WATCH_ID 2 -#define ETCD_PG_HISTORY_WATCH_ID 3 -#define ETCD_OSD_STATE_WATCH_ID 4 - -#define MAX_ETCD_ATTEMPTS 5 -#define ETCD_SLOW_TIMEOUT 5000 -#define ETCD_QUICK_TIMEOUT 1000 - -json_kv_t osd_t::parse_etcd_kv(const json11::Json & kv_json) -{ - json_kv_t kv; - kv.key = base64_decode(kv_json["key"].string_value()); - std::string json_err, json_text = base64_decode(kv_json["value"].string_value()); - kv.value = json_text == "" ? json11::Json() : json11::Json::parse(json_text, json_err); - if (json_err != "") - { - printf("Bad JSON in etcd key %s: %s (value: %s)\n", kv.key.c_str(), json_err.c_str(), json_text.c_str()); - kv.key = ""; - } - return kv; -} - -void osd_t::etcd_txn(json11::Json txn, int timeout, std::function callback) -{ - etcd_call("/kv/txn", txn, timeout, callback); -} - -void osd_t::etcd_call(std::string api, json11::Json payload, int timeout, std::function callback) -{ - std::string req = payload.dump(); - req = "POST "+etcd_api_path+api+" HTTP/1.1\r\n" - "Host: "+etcd_address+"\r\n" - "Content-Type: application/json\r\n" - "Content-Length: "+std::to_string(req.size())+"\r\n" - "Connection: close\r\n" - "\r\n"+req; - http_request_json(tfd, etcd_address, req, timeout, callback); -} +#include "etcd_state_client.h" // Startup sequence: // Start etcd watcher -> Load global OSD configuration -> Bind socket -> Acquire lease -> Report&lock OSD state @@ -62,7 +23,7 @@ void osd_t::init_cluster() parse_test_peer(pos < 0 ? peerstr : peerstr.substr(0, pos)); peerstr = pos < 0 ? std::string("") : peerstr.substr(pos+1); } - if (peer_states.size() < 2) + if (st_cli.peer_states.size() < 2) { throw std::runtime_error("run_primary requires at least 2 peers"); } @@ -81,8 +42,17 @@ void osd_t::init_cluster() } else { + st_cli.tfd = tfd; + st_cli.etcd_address = etcd_address; + st_cli.etcd_prefix = etcd_prefix; + st_cli.etcd_api_path = etcd_api_path; + st_cli.log_level = log_level; + st_cli.on_change_hook = [this](json11::Json::object & changes) { on_change_etcd_state_hook(changes); }; + st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); }; + st_cli.load_pgs_checks_hook = [this]() { return on_load_pgs_checks_hook(); }; + st_cli.on_load_pgs_hook = [this](bool success) { on_load_pgs_hook(success); }; peering_state = OSD_LOADING_PGS; - load_global_config(); + st_cli.load_global_config(); } if (run_primary && autosync_interval > 0) { @@ -106,12 +76,12 @@ void osd_t::parse_test_peer(std::string peer) osd_num_t peer_osd = strtoull(osd_num_str.c_str(), NULL, 10); if (!peer_osd) throw new std::runtime_error("Could not parse OSD peer osd_num"); - else if (peer_states.find(peer_osd) != peer_states.end()) + else if (st_cli.peer_states.find(peer_osd) != st_cli.peer_states.end()) throw std::runtime_error("Same osd number "+std::to_string(peer_osd)+" specified twice in peers"); int port = strtoull(port_str.c_str(), NULL, 10); if (!port) throw new std::runtime_error("Could not parse OSD peer port"); - peer_states[peer_osd] = json11::Json::object { + st_cli.peer_states[peer_osd] = json11::Json::object { { "state", "up" }, { "addresses", json11::Json::array { addr } }, { "port", port }, @@ -220,7 +190,7 @@ void osd_t::report_statistics() } } }); } - etcd_txn(json11::Json::object { { "success", txn } }, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json res) + st_cli.etcd_txn(json11::Json::object { { "success", txn } }, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json res) { etcd_reporting_stats = false; if (err != "") @@ -240,157 +210,35 @@ void osd_t::report_statistics() }); } -void osd_t::start_etcd_watcher() +void osd_t::on_change_etcd_state_hook(json11::Json::object & changes) { - etcd_watches_initialised = 0; - etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", ETCD_SLOW_TIMEOUT, [this](const http_response_t *msg) - { - if (msg->body.length()) - { - std::string json_err; - json11::Json data = json11::Json::parse(msg->body, json_err); - if (json_err != "") - { - printf("Bad JSON in etcd event: %s, ignoring event\n", json_err.c_str()); - } - else - { - if (data["result"]["created"].bool_value()) - { - etcd_watches_initialised++; - } - if (etcd_watches_initialised == 4) - { - etcd_watch_revision = data["result"]["header"]["revision"].uint64_value(); - } - // First gather all changes into a hash to remove multiple overwrites - json11::Json::object changes; - for (auto & ev: data["result"]["events"].array_items()) - { - auto kv = parse_etcd_kv(ev["kv"]); - if (kv.key != "") - { - changes[kv.key] = kv.value; - } - } - for (auto & kv: changes) - { - if (this->log_level > 0) - { - printf("Incoming event: %s -> %s\n", kv.first.c_str(), kv.second.dump().c_str()); - } - if (kv.first.substr(0, etcd_prefix.length()+11) == etcd_prefix+"/osd/state/") - { - parse_etcd_osd_state(kv.first, kv.second); - } - else - { - parse_pg_state(kv.first, kv.second); - } - } - apply_pg_count(); - apply_pg_config(); - } - } - if (msg->eof) - { - etcd_watch_ws = NULL; - if (etcd_watches_initialised == 0) - { - // Connection not established, retry in - tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int) - { - start_etcd_watcher(); - }); - } - else - { - // Connection was live, retry immediately - start_etcd_watcher(); - } - } - }); - // FIXME apply config changes in runtime - etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object { - { "create_request", json11::Json::object { - { "key", base64_encode(etcd_prefix+"/config/") }, - { "range_end", base64_encode(etcd_prefix+"/config0") }, - { "start_revision", etcd_watch_revision+1 }, - { "watch_id", ETCD_CONFIG_WATCH_ID }, - } } - }).dump()); - etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object { - { "create_request", json11::Json::object { - { "key", base64_encode(etcd_prefix+"/osd/state/") }, - { "range_end", base64_encode(etcd_prefix+"/osd/state0") }, - { "start_revision", etcd_watch_revision+1 }, - { "watch_id", ETCD_OSD_STATE_WATCH_ID }, - } } - }).dump()); - etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object { - { "create_request", json11::Json::object { - { "key", base64_encode(etcd_prefix+"/pg/state/") }, - { "range_end", base64_encode(etcd_prefix+"/pg/state0") }, - { "start_revision", etcd_watch_revision+1 }, - { "watch_id", ETCD_PG_STATE_WATCH_ID }, - } } - }).dump()); - etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object { - { "create_request", json11::Json::object { - { "key", base64_encode(etcd_prefix+"/pg/history/") }, - { "range_end", base64_encode(etcd_prefix+"/pg/history0") }, - { "start_revision", etcd_watch_revision+1 }, - { "watch_id", ETCD_PG_HISTORY_WATCH_ID }, - } } - }).dump()); + // FIXME apply config changes in runtime (maybe, some) + apply_pg_count(); + apply_pg_config(); } -void osd_t::load_global_config() +void osd_t::on_load_config_hook(json11::Json::object & global_config) { - etcd_call("/kv/range", json11::Json::object { - { "key", base64_encode(etcd_prefix+"/config/global") } - }, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data) + blockstore_config_t osd_config = this->config; + for (auto & cfg_var: global_config) { - if (err != "") + if (this->config.find(cfg_var.first) == this->config.end()) { - printf("Error reading OSD configuration from etcd: %s\n", err.c_str()); - tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id) - { - load_global_config(); - }); - return; + // FIXME Convert int to str + osd_config[cfg_var.first] = cfg_var.second.string_value(); } - if (!etcd_watch_revision) - { - etcd_watch_revision = data["header"]["revision"].uint64_value(); - } - if (data["kvs"].array_items().size() > 0) - { - auto kv = parse_etcd_kv(data["kvs"][0]); - if (kv.value.is_object()) - { - blockstore_config_t osd_config = this->config; - for (auto & cfg_var: kv.value.object_items()) - { - if (this->config.find(cfg_var.first) == this->config.end()) - { - osd_config[cfg_var.first] = cfg_var.second.string_value(); - } - } - parse_config(osd_config); - } - } - bind_socket(); - start_etcd_watcher(); - acquire_lease(); - }); + } + parse_config(osd_config); + bind_socket(); + st_cli.start_etcd_watcher(); + acquire_lease(); } // Acquire lease void osd_t::acquire_lease() { // Maximum lease TTL is (report interval) + retries * (timeout + repeat interval) - etcd_call("/lease/grant", json11::Json::object { + st_cli.etcd_call("/lease/grant", json11::Json::object { { "TTL", etcd_report_interval+(MAX_ETCD_ATTEMPTS*(2*ETCD_QUICK_TIMEOUT)+999)/1000 } }, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data) { @@ -419,7 +267,7 @@ void osd_t::create_osd_state() { std::string state_key = base64_encode(etcd_prefix+"/osd/state/"+std::to_string(osd_num)); self_state = get_osd_state(); - etcd_txn(json11::Json::object { + st_cli.etcd_txn(json11::Json::object { // Check that the state key does not exist { "compare", json11::Json::array { json11::Json::object { @@ -456,7 +304,7 @@ void osd_t::create_osd_state() if (!data["succeeded"].bool_value()) { // OSD is already up - auto kv = parse_etcd_kv(data["responses"][0]["response_range"]["kvs"][0]); + auto kv = st_cli.parse_etcd_kv(data["responses"][0]["response_range"]["kvs"][0]); printf("Key %s already exists in etcd, OSD %lu is still up\n", kv.key.c_str(), this->osd_num); int64_t port = kv.value["port"].int64_value(); for (auto & addr: kv.value["addresses"].array_items()) @@ -468,7 +316,7 @@ void osd_t::create_osd_state() } if (run_primary) { - load_pgs(); + st_cli.load_pgs(); } }); } @@ -476,7 +324,7 @@ void osd_t::create_osd_state() // Renew lease void osd_t::renew_lease() { - etcd_call("/lease/keepalive", json11::Json::object { + st_cli.etcd_call("/lease/keepalive", json11::Json::object { { "ID", etcd_lease_id } }, ETCD_QUICK_TIMEOUT, [this](std::string err, json11::Json data) { @@ -512,7 +360,7 @@ void osd_t::force_stop(int exitcode) { if (etcd_lease_id != "") { - etcd_call("/kv/lease/revoke", json11::Json::object { + st_cli.etcd_call("/kv/lease/revoke", json11::Json::object { { "ID", etcd_lease_id } }, ETCD_QUICK_TIMEOUT, [this, exitcode](std::string err, json11::Json data) { @@ -531,7 +379,7 @@ void osd_t::force_stop(int exitcode) } } -void osd_t::load_pgs() +json11::Json osd_t::on_load_pgs_checks_hook() { assert(this->pgs.size() == 0); json11::Json::array checks = { @@ -541,174 +389,28 @@ void osd_t::load_pgs() { "key", base64_encode(etcd_prefix+"/osd/state/"+std::to_string(osd_num)) }, } }; - json11::Json::array txn = { - json11::Json::object { - { "request_range", json11::Json::object { - { "key", base64_encode(etcd_prefix+"/config/pgs") }, - } } - }, - json11::Json::object { - { "request_range", json11::Json::object { - { "key", base64_encode(etcd_prefix+"/pg/history/") }, - { "range_end", base64_encode(etcd_prefix+"/pg/history0") }, - } } - }, - json11::Json::object { - { "request_range", json11::Json::object { - { "key", base64_encode(etcd_prefix+"/pg/state/") }, - { "range_end", base64_encode(etcd_prefix+"/pg/state0") }, - } } - }, - }; - etcd_txn(json11::Json::object { - { "compare", checks }, { "success", txn } - }, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data) - { - if (err != "") - { - printf("Error loading PGs from etcd: %s\n", err.c_str()); - tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id) - { - load_pgs(); - }); - return; - } - if (!data["succeeded"].bool_value()) - { - printf("Error loading PGs from etcd: lease expired\n"); - force_stop(1); - return; - } - peering_state &= ~OSD_LOADING_PGS; - for (auto & res: data["responses"].array_items()) - { - for (auto & kv_json: res["response_range"]["kvs"].array_items()) - { - auto kv = parse_etcd_kv(kv_json); - parse_pg_state(kv.key, kv.value); - } - } - apply_pg_count(); - apply_pg_config(); - }); + return checks; } -void osd_t::parse_pg_state(const std::string & key, const json11::Json & value) +void osd_t::on_load_pgs_hook(bool success) { - if (key == etcd_prefix+"/config/pgs") + if (!success) { - for (auto & pg_item: this->pg_config) - { - pg_item.second.exists = false; - } - for (auto & pg_item: value["items"].object_items()) - { - pg_num_t pg_num = stoull_full(pg_item.first); - if (!pg_num) - { - printf("Bad key in PG configuration: %s (must be a number), skipped\n", pg_item.first.c_str()); - continue; - } - this->pg_config[pg_num].exists = true; - this->pg_config[pg_num].pause = pg_item.second["pause"].bool_value(); - this->pg_config[pg_num].primary = pg_item.second["primary"].uint64_value(); - this->pg_config[pg_num].target_set.clear(); - for (auto pg_osd: pg_item.second["osd_set"].array_items()) - { - this->pg_config[pg_num].target_set.push_back(pg_osd.uint64_value()); - } - if (this->pg_config[pg_num].target_set.size() != 3) - { - printf("Bad PG %u config format: incorrect osd_set = %s\n", pg_num, pg_item.second["osd_set"].dump().c_str()); - this->pg_config[pg_num].target_set.resize(3); - this->pg_config[pg_num].pause = true; - } - } + printf("Error loading PGs from etcd: lease expired\n"); + force_stop(1); } - else if (key.substr(0, etcd_prefix.length()+12) == etcd_prefix+"/pg/history/") + else { - // /pg/history/%d - pg_num_t pg_num = stoull_full(key.substr(etcd_prefix.length()+12)); - if (!pg_num) - { - printf("Bad etcd key %s, ignoring\n", key.c_str()); - } - else - { - auto & pg_cfg = this->pg_config[pg_num]; - pg_cfg.target_history.clear(); - pg_cfg.all_peers.clear(); - // Refuse to start PG if any set of the has no live OSDs - for (auto hist_item: value["osd_sets"].array_items()) - { - std::vector history_set; - for (auto pg_osd: hist_item.array_items()) - { - history_set.push_back(pg_osd.uint64_value()); - } - pg_cfg.target_history.push_back(history_set); - } - // Include these additional OSDs when peering the PG - for (auto pg_osd: value["all_peers"].array_items()) - { - pg_cfg.all_peers.push_back(pg_osd.uint64_value()); - } - } - } - else if (key.substr(0, etcd_prefix.length()+10) == etcd_prefix+"/pg/state/") - { - // /pg/state/%d - pg_num_t pg_num = stoull_full(key.substr(etcd_prefix.length()+10)); - if (!pg_num) - { - printf("Bad etcd key %s, ignoring\n", key.c_str()); - } - else if (value.is_null()) - { - this->pg_config[pg_num].cur_primary = 0; - this->pg_config[pg_num].cur_state = 0; - } - else - { - osd_num_t cur_primary = value["primary"].uint64_value(); - int state = 0; - for (auto & e: value["state"].array_items()) - { - int i; - for (i = 0; i < pg_state_bit_count; i++) - { - if (e.string_value() == pg_state_names[i]) - { - state = state | pg_state_bits[i]; - break; - } - } - if (i >= pg_state_bit_count) - { - printf("Unexpected PG %u state keyword in etcd: %s\n", pg_num, e.dump().c_str()); - force_stop(1); - return; - } - } - if (!cur_primary || !value["state"].is_array() || !state || - (state & PG_OFFLINE) && state != PG_OFFLINE || - (state & PG_PEERING) && state != PG_PEERING || - (state & PG_INCOMPLETE) && state != PG_INCOMPLETE) - { - printf("Unexpected PG %u state in etcd: primary=%lu, state=%s\n", pg_num, cur_primary, value["state"].dump().c_str()); - force_stop(1); - return; - } - this->pg_config[pg_num].cur_primary = cur_primary; - this->pg_config[pg_num].cur_state = state; - } + peering_state &= ~OSD_LOADING_PGS; + apply_pg_count(); + apply_pg_config(); } } void osd_t::apply_pg_count() { - pg_num_t pg_count = pg_config.size(); - if (pg_count > 0 && (pg_config.begin()->first != 1 || std::prev(pg_config.end())->first != pg_count)) + pg_num_t pg_count = st_cli.pg_config.size(); + if (pg_count > 0 && (st_cli.pg_config.begin()->first != 1 || std::prev(st_cli.pg_config.end())->first != pg_count)) { printf("Invalid PG configuration: PG numbers don't cover the whole 1..%d range\n", pg_count); force_stop(1); @@ -741,7 +443,7 @@ void osd_t::apply_pg_count() void osd_t::apply_pg_config() { bool all_applied = true; - for (auto & kv: pg_config) + for (auto & kv: st_cli.pg_config) { pg_num_t pg_num = kv.first; auto & pg_cfg = kv.second; @@ -962,7 +664,7 @@ void osd_t::report_pg_states() }); } pg_state_dirty.clear(); - etcd_txn(json11::Json::object { + st_cli.etcd_txn(json11::Json::object { { "compare", checks }, { "success", success }, { "failure", failure } }, ETCD_QUICK_TIMEOUT, [this, reporting_pgs](std::string err, json11::Json data) { @@ -984,15 +686,18 @@ void osd_t::report_pg_states() } for (auto & res: data["responses"].array_items()) { - auto kv = parse_etcd_kv(res["kvs"][0]); - pg_num_t pg_num = stoull_full(kv.key.substr(etcd_prefix.length()+10)); - auto pg_it = pgs.find(pg_num); - if (pg_it != pgs.end() && pg_it->second.state != PG_OFFLINE && pg_it->second.state != PG_STARTING) + if (res["kvs"].array_items().size()) { - // Live PG state update failed - printf("Failed to report state of PG %u which is live. Race condition detected, exiting\n", pg_num); - force_stop(1); - return; + auto kv = st_cli.parse_etcd_kv(res["kvs"][0]); + pg_num_t pg_num = stoull_full(kv.key.substr(etcd_prefix.length()+10)); + auto pg_it = pgs.find(pg_num); + if (pg_it != pgs.end() && pg_it->second.state != PG_OFFLINE && pg_it->second.state != PG_STARTING) + { + // Live PG state update failed + printf("Failed to report state of PG %u which is live. Race condition detected, exiting\n", pg_num); + force_stop(1); + return; + } } } // Retry after a short pause (hope we'll get some updates and update PG states accordingly) @@ -1032,7 +737,7 @@ void osd_t::load_and_connect_peers() osd_num_t peer_osd = wp_it->first; if (osd_peer_fds.find(peer_osd) != osd_peer_fds.end()) { - // It shouldn't be here + // Peer is already connected, it shouldn't be in wanted_peers wanted_peers.erase(wp_it++); if (!wanted_peers.size()) { @@ -1040,7 +745,7 @@ void osd_t::load_and_connect_peers() peering_state = peering_state & ~OSD_CONNECTING_PEERS; } } - else if (peer_states.find(peer_osd) == peer_states.end()) + else if (st_cli.peer_states.find(peer_osd) == st_cli.peer_states.end()) { if (!loading_peer_config && (time(NULL) - wp_it->second.last_load_attempt >= peer_connect_interval)) { @@ -1059,16 +764,16 @@ void osd_t::load_and_connect_peers() { // Try to connect wp_it->second.connecting = true; - const std::string addr = peer_states[peer_osd]["addresses"][wp_it->second.address_index].string_value(); - int64_t peer_port = peer_states[peer_osd]["port"].int64_value(); + const std::string addr = st_cli.peer_states[peer_osd]["addresses"][wp_it->second.address_index].string_value(); + int64_t peer_port = st_cli.peer_states[peer_osd]["port"].int64_value(); wp_it++; connect_peer(peer_osd, addr.c_str(), peer_port, [this](osd_num_t peer_osd, int peer_fd) { wanted_peers[peer_osd].connecting = false; if (peer_fd < 0) { - int64_t peer_port = peer_states[peer_osd]["port"].int64_value(); - auto & addrs = peer_states[peer_osd]["addresses"].array_items(); + int64_t peer_port = st_cli.peer_states[peer_osd]["port"].int64_value(); + auto & addrs = st_cli.peer_states[peer_osd]["addresses"].array_items(); const char *addr = addrs[wanted_peers[peer_osd].address_index].string_value().c_str(); printf("Failed to connect to peer OSD %lu address %s port %ld: %s\n", peer_osd, addr, peer_port, strerror(-peer_fd)); if (wanted_peers[peer_osd].address_index < addrs.size()-1) @@ -1079,7 +784,7 @@ void osd_t::load_and_connect_peers() else { wanted_peers[peer_osd].last_connect_attempt = time(NULL); - peer_states.erase(peer_osd); + st_cli.peer_states.erase(peer_osd); } return; } @@ -1102,7 +807,7 @@ void osd_t::load_and_connect_peers() } if (load_peer_txn.size() > 0) { - etcd_txn(json11::Json::object { { "success", load_peer_txn } }, ETCD_QUICK_TIMEOUT, [this](std::string err, json11::Json data) + st_cli.etcd_txn(json11::Json::object { { "success", load_peer_txn } }, ETCD_QUICK_TIMEOUT, [this](std::string err, json11::Json data) { // Ugly, but required to wake up the loop and retry connecting after seconds tfd->set_timer(peer_connect_interval*1000, false, [](int timer_id){}); @@ -1116,22 +821,10 @@ void osd_t::load_and_connect_peers() { if (res["response_range"]["kvs"].array_items().size()) { - auto kv = parse_etcd_kv(res["response_range"]["kvs"][0]); - parse_etcd_osd_state(kv.key, kv.value); + auto kv = st_cli.parse_etcd_kv(res["response_range"]["kvs"][0]); + st_cli.parse_state(kv.key, kv.value); } } }); } } - -void osd_t::parse_etcd_osd_state(const std::string & key, const json11::Json & value) -{ - // /osd/state/ - osd_num_t peer_osd = std::stoull(key.substr(etcd_prefix.length()+11)); - if (peer_osd > 0 && value.is_object() && value["state"] == "up" && - value["addresses"].is_array() && - value["port"].int64_value() > 0 && value["port"].int64_value() < 65536) - { - peer_states[peer_osd] = value; - } -} diff --git a/osd_peering_pg.cpp b/osd_peering_pg.cpp index 8d07eb46..02ac2499 100644 --- a/osd_peering_pg.cpp +++ b/osd_peering_pg.cpp @@ -383,35 +383,3 @@ void pg_t::print_state() total_count ); } - -const int pg_state_bit_count = 13; - -const int pg_state_bits[13] = { - PG_STARTING, - PG_PEERING, - PG_INCOMPLETE, - PG_ACTIVE, - PG_STOPPING, - PG_OFFLINE, - PG_DEGRADED, - PG_HAS_INCOMPLETE, - PG_HAS_DEGRADED, - PG_HAS_MISPLACED, - PG_HAS_UNCLEAN, - PG_LEFT_ON_DEAD, -}; - -const char *pg_state_names[13] = { - "starting", - "peering", - "incomplete", - "active", - "stopping", - "offline", - "degraded", - "has_incomplete", - "has_degraded", - "has_misplaced", - "has_unclean", - "left_on_dead", -}; diff --git a/osd_peering_pg.h b/osd_peering_pg.h index 54a44dcd..41e2122d 100644 --- a/osd_peering_pg.h +++ b/osd_peering_pg.h @@ -7,38 +7,7 @@ #include "object_id.h" #include "osd_ops.h" - -// Placement group states -// STARTING -> [acquire lock] -> PEERING -> INCOMPLETE|ACTIVE -> STOPPING -> OFFLINE -> [release lock] -// Exactly one of these: -#define PG_STARTING (1<<0) -#define PG_PEERING (1<<1) -#define PG_INCOMPLETE (1<<2) -#define PG_ACTIVE (1<<3) -#define PG_STOPPING (1<<4) -#define PG_OFFLINE (1<<5) -// Plus any of these: -#define PG_DEGRADED (1<<6) -#define PG_HAS_INCOMPLETE (1<<7) -#define PG_HAS_DEGRADED (1<<8) -#define PG_HAS_MISPLACED (1<<9) -#define PG_HAS_UNCLEAN (1<<10) -#define PG_LEFT_ON_DEAD (1<<11) - -// FIXME: Safe default that doesn't depend on pg_stripe_size or pg_block_size -#define STRIPE_MASK ((uint64_t)4096 - 1) - -// OSD object states -#define OBJ_DEGRADED 0x02 -#define OBJ_INCOMPLETE 0x04 -#define OBJ_MISPLACED 0x08 -#define OBJ_NEEDS_STABLE 0x10000 -#define OBJ_NEEDS_ROLLBACK 0x20000 -#define OBJ_BUGGY 0x80000 - -extern const int pg_state_bits[]; -extern const char *pg_state_names[]; -extern const int pg_state_bit_count; +#include "pg_states.h" struct pg_obj_loc_t { diff --git a/pg_states.cpp b/pg_states.cpp new file mode 100644 index 00000000..e2f909a4 --- /dev/null +++ b/pg_states.cpp @@ -0,0 +1,33 @@ +#include "pg_states.h" + +const int pg_state_bit_count = 13; + +const int pg_state_bits[13] = { + PG_STARTING, + PG_PEERING, + PG_INCOMPLETE, + PG_ACTIVE, + PG_STOPPING, + PG_OFFLINE, + PG_DEGRADED, + PG_HAS_INCOMPLETE, + PG_HAS_DEGRADED, + PG_HAS_MISPLACED, + PG_HAS_UNCLEAN, + PG_LEFT_ON_DEAD, +}; + +const char *pg_state_names[13] = { + "starting", + "peering", + "incomplete", + "active", + "stopping", + "offline", + "degraded", + "has_incomplete", + "has_degraded", + "has_misplaced", + "has_unclean", + "left_on_dead", +}; diff --git a/pg_states.h b/pg_states.h new file mode 100644 index 00000000..ff826d5b --- /dev/null +++ b/pg_states.h @@ -0,0 +1,33 @@ +#pragma once + +// Placement group states +// STARTING -> [acquire lock] -> PEERING -> INCOMPLETE|ACTIVE -> STOPPING -> OFFLINE -> [release lock] +// Exactly one of these: +#define PG_STARTING (1<<0) +#define PG_PEERING (1<<1) +#define PG_INCOMPLETE (1<<2) +#define PG_ACTIVE (1<<3) +#define PG_STOPPING (1<<4) +#define PG_OFFLINE (1<<5) +// Plus any of these: +#define PG_DEGRADED (1<<6) +#define PG_HAS_INCOMPLETE (1<<7) +#define PG_HAS_DEGRADED (1<<8) +#define PG_HAS_MISPLACED (1<<9) +#define PG_HAS_UNCLEAN (1<<10) +#define PG_LEFT_ON_DEAD (1<<11) + +// FIXME: Safe default that doesn't depend on pg_stripe_size or pg_block_size +#define STRIPE_MASK ((uint64_t)4096 - 1) + +// OSD object states +#define OBJ_DEGRADED 0x02 +#define OBJ_INCOMPLETE 0x04 +#define OBJ_MISPLACED 0x08 +#define OBJ_NEEDS_STABLE 0x10000 +#define OBJ_NEEDS_ROLLBACK 0x20000 +#define OBJ_BUGGY 0x80000 + +extern const int pg_state_bits[]; +extern const char *pg_state_names[]; +extern const int pg_state_bit_count;