From 7b57eeeeb37d3d1cbe2809401b12972c406a79ba Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Mon, 27 Apr 2020 14:32:59 +0300 Subject: [PATCH] Implement PG state locking and PG moving in response to etcd events --- Makefile | 2 + osd.cpp | 18 +- osd.h | 70 +++- osd_cluster.cpp | 764 ++++++++++++++++++++++++++++++---------- osd_flush.cpp | 6 +- osd_http.cpp | 37 +- osd_http.h | 6 + osd_main.cpp | 2 +- osd_peering.cpp | 94 +++-- osd_peering_pg.cpp | 41 ++- osd_peering_pg.h | 22 +- osd_peering_pg_test.cpp | 3 +- osd_primary.cpp | 8 +- 13 files changed, 784 insertions(+), 289 deletions(-) diff --git a/Makefile b/Makefile index 10504d87..dd0484be 100644 --- a/Makefile +++ b/Makefile @@ -67,6 +67,8 @@ rw_blocking.o: rw_blocking.cpp rw_blocking.h g++ $(CXXFLAGS) -c -o $@ $< osd_test: osd_test.cpp osd_ops.h rw_blocking.o g++ $(CXXFLAGS) -o osd_test osd_test.cpp rw_blocking.o -ltcmalloc_minimal +osd_peering_pg_test: osd_peering_pg_test.cpp osd_peering_pg.o + g++ $(CXXFLAGS) -o $@ $< osd_peering_pg.o -ltcmalloc_minimal libfio_sec_osd.so: fio_sec_osd.cpp osd_ops.h rw_blocking.o g++ $(CXXFLAGS) -ltcmalloc_minimal -shared -o libfio_sec_osd.so fio_sec_osd.cpp rw_blocking.o -luring diff --git a/osd.cpp b/osd.cpp index 01266915..beda6dbe 100644 --- a/osd.cpp +++ b/osd.cpp @@ -106,10 +106,8 @@ void osd_t::parse_config(blockstore_config_t & config) int major, minor; if (sscanf(config["etcd_version"].c_str(), "%d.%d", &major, &minor) < 2) throw std::runtime_error("etcd_version should be in the form MAJOR.MINOR (for example, 3.2)"); - if (major < 3 || major == 3 && minor < 2) - throw std::runtime_error("Your etcd is too old, minimum required version is 3.2"); - else if (major == 3 && minor == 2) - etcd_api_path = "/v3alpha"; + if (major < 3 || major == 3 && minor < 3) + throw std::runtime_error("Your etcd is too old, minimum required version is 3.3"); else if (major == 3 && minor == 3) etcd_api_path = "/v3beta"; else @@ -117,10 +115,6 @@ void osd_t::parse_config(blockstore_config_t & config) } else etcd_api_path = "/v3"; - if ((pos = etcd_address.find(':')) >= 0) - etcd_host = etcd_address.substr(0, pos); - else - etcd_host = etcd_address; etcd_report_interval = strtoull(config["etcd_report_interval"].c_str(), NULL, 10); if (etcd_report_interval <= 0) etcd_report_interval = 30; @@ -153,12 +147,10 @@ void osd_t::parse_config(blockstore_config_t & config) peer_connect_interval = strtoull(config["peer_connect_interval"].c_str(), NULL, 10); if (!peer_connect_interval) peer_connect_interval = 5; - http_request_timeout = strtoull(config["http_request_timeout"].c_str(), NULL, 10); - if (!http_request_timeout) - http_request_timeout = 5; peer_connect_timeout = strtoull(config["peer_connect_timeout"].c_str(), NULL, 10); if (!peer_connect_timeout) peer_connect_timeout = 5; + log_level = strtoull(config["log_level"].c_str(), NULL, 10); } void osd_t::bind_socket() @@ -394,13 +386,13 @@ void osd_t::stop_client(int peer_fd) if (cl.osd_num) { // Reload configuration from etcd when the connection is dropped - printf("[%lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl.osd_num); + printf("[OSD %lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl.osd_num); peer_states.erase(cl.osd_num); repeer_pgs(cl.osd_num); } else { - printf("[%lu] Stopping client %d (regular client)\n", osd_num, peer_fd); + printf("[OSD %lu] Stopping client %d (regular client)\n", osd_num, peer_fd); } } clients.erase(it); diff --git a/osd.h b/osd.h index 2030ed88..7f436fc9 100644 --- a/osd.h +++ b/osd.h @@ -18,6 +18,7 @@ #include "timerfd_manager.h" #include "osd_ops.h" #include "osd_peering_pg.h" +#include "osd_http.h" #include "json11/json11.hpp" #define OSD_OP_IN 0 @@ -49,10 +50,6 @@ #define MAX_RECOVERY_QUEUE 2048 #define DEFAULT_RECOVERY_QUEUE 4 -#define MAX_ETCD_ATTEMPTS 5 -#define ETCD_START_INTERVAL 5000 -#define ETCD_RETRY_INTERVAL 1000 - //#define OSD_STUB extern const char* osd_op_names[]; @@ -189,16 +186,30 @@ struct osd_wanted_peer_t int address_index; }; -struct http_response_t; +struct pg_config_t +{ + bool exists; + osd_num_t primary; + std::vector target_set; + std::vector> target_history; + bool pause; + osd_num_t cur_primary; + int cur_state; +}; -struct websocket_t; +struct json_kv_t +{ + std::string key; + json11::Json value; +}; class osd_t { // config blockstore_config_t config; - std::string etcd_address, etcd_host, etcd_prefix, etcd_api_path; + // FIXME Allow multiple etcd addresses and select random address + std::string etcd_address, etcd_prefix, etcd_api_path; int etcd_report_interval = 30; bool readonly = false; @@ -214,16 +225,26 @@ class osd_t int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE; int peer_connect_interval = 5; - int http_request_timeout = 5; int peer_connect_timeout = 5; + int log_level = 0; - // peer OSDs + // cluster state - std::string etcd_lease_id, etcd_watch_revision; + std::string etcd_lease_id; + int etcd_watches_initialised = 0; + uint64_t etcd_watch_revision = 0; + websocket_t *etcd_watch_ws = NULL; std::map peer_states; std::map wanted_peers; bool loading_peer_config = false; int etcd_failed_attempts = 0; + std::map pg_config; + std::set pg_state_dirty; + bool pg_config_applied = false; + bool etcd_reporting_pg_state = false; + bool etcd_reporting_stats = false; + + // peers and PGs std::map osd_peer_fds; std::map pgs; @@ -267,26 +288,36 @@ class osd_t uint64_t subop_stat_count[2][OSD_OP_MAX+1] = { 0 }; // cluster connection - void http_request(std::string host, std::string request, bool streaming, std::function callback); - void http_request_json(std::string host, std::string request, std::function callback); - websocket_t* open_websocket(std::string host, std::string path, std::function callback); - void etcd_call(std::string api, json11::Json payload, std::function callback); - void etcd_txn(json11::Json txn, std::function callback); + void http_request(const std::string & host, const std::string & request, + const http_options_t & options, std::function callback); + void http_request_json(const std::string & host, const std::string & request, int timeout, + std::function callback); + websocket_t* open_websocket(const std::string & host, const std::string & path, int timeout, + std::function callback); + void etcd_call(std::string api, json11::Json payload, int timeout, std::function callback); + void etcd_txn(json11::Json txn, int timeout, std::function callback); + json_kv_t parse_etcd_kv(const json11::Json & kv_json); void parse_config(blockstore_config_t & config); void init_cluster(); + void start_etcd_watcher(); void load_global_config(); void bind_socket(); void acquire_lease(); - void create_state(); + json11::Json get_osd_state(); + void create_osd_state(); void renew_lease(); void print_stats(); void reset_stats(); - json11::Json get_status(); json11::Json get_statistics(); void report_statistics(); + void report_pg_state(pg_t & pg); + void report_pg_states(); void load_pgs(); - void parse_pgs(const json11::Json & pg_config, const std::map & pg_history); + void parse_pg_state(const std::string & key, const json11::Json & value); + void apply_pg_count(); + void apply_pg_config(); void load_and_connect_peers(); + void parse_etcd_osd_state(const std::string & key, const json11::Json & value); // event loop, socket read/write void loop(); @@ -313,6 +344,7 @@ class osd_t void repeer_pgs(osd_num_t osd_num); void start_pg_peering(pg_num_t pg_num); void submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps); + void discard_list_subop(osd_op_t *list_op); bool stop_pg(pg_num_t pg_num); void finish_stop_pg(pg_t & pg); @@ -356,7 +388,7 @@ class osd_t public: osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringloop); ~osd_t(); - void force_stop(); + void force_stop(int exitcode); bool shutdown(); }; diff --git a/osd_cluster.cpp b/osd_cluster.cpp index 0222df37..1c0fa760 100644 --- a/osd_cluster.cpp +++ b/osd_cluster.cpp @@ -2,26 +2,49 @@ #include "osd_http.h" #include "base64.h" -void osd_t::etcd_txn(json11::Json txn, std::function callback) +#define ETCD_CONFIG_WATCH_ID 1 +#define ETCD_PG_STATE_WATCH_ID 2 +#define ETCD_PG_HISTORY_WATCH_ID 3 +#define ETCD_OSD_STATE_WATCH_ID 4 + +#define MAX_ETCD_ATTEMPTS 5 +#define ETCD_SLOW_TIMEOUT 5000 +#define ETCD_QUICK_TIMEOUT 1000 + +json_kv_t osd_t::parse_etcd_kv(const json11::Json & kv_json) { - etcd_call("/kv/txn", txn, callback); + json_kv_t kv; + kv.key = base64_decode(kv_json["key"].string_value()); + std::string json_err, json_text = base64_decode(kv_json["value"].string_value()); + kv.value = json_text == "" ? json11::Json() : json11::Json::parse(json_text, json_err); + if (json_err != "") + { + printf("Bad JSON in etcd key %s: %s (value: %s)\n", kv.key.c_str(), json_err.c_str(), json_text.c_str()); + kv.key = ""; + } + return kv; } -void osd_t::etcd_call(std::string api, json11::Json payload, std::function callback) +void osd_t::etcd_txn(json11::Json txn, int timeout, std::function callback) +{ + etcd_call("/kv/txn", txn, timeout, callback); +} + +void osd_t::etcd_call(std::string api, json11::Json payload, int timeout, std::function callback) { std::string req = payload.dump(); req = "POST "+etcd_api_path+api+" HTTP/1.1\r\n" - "Host: "+etcd_host+"\r\n" + "Host: "+etcd_address+"\r\n" "Content-Type: application/json\r\n" "Content-Length: "+std::to_string(req.size())+"\r\n" "Connection: close\r\n" "\r\n"+req; - http_request_json(etcd_address, req, callback); + http_request_json(etcd_address, req, timeout, callback); } // Startup sequence: -// Load global OSD configuration -> Bind socket -> Acquire lease -> Report state -// -> Load PGs -> Load peers -> Connect to peers -> Peer PGs +// Start etcd watcher -> Load global OSD configuration -> Bind socket -> Acquire lease -> Report&lock OSD state +// -> Load PG config -> Report&lock PG states -> Load peers -> Connect to peers -> Peer PGs // Event handling // Wait for PG changes -> Start/Stop PGs when requested // Peer connection is lost -> Reload connection data -> Try to reconnect @@ -51,7 +74,7 @@ void osd_t::init_cluster() .target_set = { 1, 2, 3 }, .cur_set = { 0, 0, 0 }, }; - pgs[1].print_state(); + report_pg_state(pgs[1]); pg_count = 1; peering_state = OSD_CONNECTING_PEERS; } @@ -97,7 +120,7 @@ void osd_t::parse_test_peer(std::string peer) wanted_peers[peer_osd] = { 0 }; } -json11::Json osd_t::get_status() +json11::Json osd_t::get_osd_state() { json11::Json::object st; st["state"] = "up"; @@ -145,6 +168,11 @@ json11::Json osd_t::get_statistics() void osd_t::report_statistics() { + if (etcd_reporting_stats) + { + return; + } + etcd_reporting_stats = true; json11::Json::array txn = { json11::Json::object { { "request_put", json11::Json::object { { "key", base64_encode(etcd_prefix+"/osd/stats/"+std::to_string(osd_num)) }, @@ -154,10 +182,11 @@ void osd_t::report_statistics() for (auto & p: pgs) { auto & pg = p.second; - json11::Json::array pg_state; - for (int i = 0; i < pg_state_bit_count; i++) - if (pg.state & pg_state_bits[i]) - pg_state.push_back(pg_state_names[i]); + if (pg.state & (PG_OFFLINE | PG_STARTING)) + { + // Don't report statistics for offline PGs + continue; + } json11::Json::object pg_stats; pg_stats["object_count"] = pg.total_count; pg_stats["clean_count"] = pg.clean_count; @@ -165,45 +194,21 @@ void osd_t::report_statistics() pg_stats["degraded_count"] = pg.degraded_objects.size(); pg_stats["incomplete_count"] = pg.incomplete_objects.size(); pg_stats["write_osd_set"] = pg.cur_set; - txn.push_back(json11::Json::object { - { "request_put", json11::Json::object { - { "key", base64_encode(etcd_prefix+"/pg/state/"+std::to_string(pg.pg_num)) }, - { "value", base64_encode(json11::Json(json11::Json::object { - { "primary", this->osd_num }, - { "state", pg_state }, - }).dump()) }, - { "lease", etcd_lease_id }, - } } - }); txn.push_back(json11::Json::object { { "request_put", json11::Json::object { { "key", base64_encode(etcd_prefix+"/pg/stats/"+std::to_string(pg.pg_num)) }, { "value", base64_encode(json11::Json(pg_stats).dump()) }, } } }); - if (pg.state == PG_ACTIVE && pg.target_history.size()) - { - pg.target_history.clear(); - pg.all_peers = pg.target_set; - txn.push_back(json11::Json::object { - { "request_delete_range", json11::Json::object { - { "key", base64_encode(etcd_prefix+"/pg/history/"+std::to_string(pg.pg_num)) }, - } } - }); - } } - etcd_txn(json11::Json::object { { "success", txn } }, [this](std::string err, json11::Json res) + etcd_txn(json11::Json::object { { "success", txn } }, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json res) { + etcd_reporting_stats = false; if (err != "") { - etcd_failed_attempts++; - printf("Error reporting state to etcd: %s\n", err.c_str()); - if (etcd_failed_attempts > MAX_ETCD_ATTEMPTS) - { - throw std::runtime_error("Cluster connection failed"); - } - // Retry - tfd->set_timer(ETCD_RETRY_INTERVAL, false, [this](int timer_id) + printf("[OSD %lu] Error reporting state to etcd: %s\n", this->osd_num, err.c_str()); + // Retry indefinitely + tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id) { report_statistics(); }); @@ -211,45 +216,141 @@ void osd_t::report_statistics() else if (res["error"].string_value() != "") { printf("[OSD %lu] Error reporting state to etcd: %s\n", this->osd_num, res["error"].string_value().c_str()); - exit(1); - } - else - { - etcd_failed_attempts = 0; + force_stop(1); } }); } +void osd_t::start_etcd_watcher() +{ + etcd_watches_initialised = 0; + etcd_watch_ws = open_websocket(etcd_address, etcd_api_path+"/watch", ETCD_SLOW_TIMEOUT, [this](const http_response_t *msg) + { + if (msg->body.length()) + { + std::string json_err; + json11::Json data = json11::Json::parse(msg->body, json_err); + if (json_err != "") + { + printf("Bad JSON in etcd event: %s, ignoring event\n", json_err.c_str()); + } + else + { + if (data["result"]["created"].bool_value()) + { + etcd_watches_initialised++; + } + if (etcd_watches_initialised == 4) + { + etcd_watch_revision = data["result"]["header"]["revision"].uint64_value(); + } + // First gather all changes into a hash to remove multiple overwrites + json11::Json::object changes; + for (auto & ev: data["result"]["events"].array_items()) + { + auto kv = parse_etcd_kv(ev["kv"]); + if (kv.key != "") + { + changes[kv.key] = kv.value; + } + } + for (auto & kv: changes) + { + if (this->log_level > 0) + { + printf("Incoming event: %s -> %s\n", kv.first.c_str(), kv.second.dump().c_str()); + } + if (kv.first.substr(0, etcd_prefix.length()+11) == etcd_prefix+"/osd/state/") + { + parse_etcd_osd_state(kv.first, kv.second); + } + else + { + parse_pg_state(kv.first, kv.second); + } + } + apply_pg_count(); + apply_pg_config(); + } + } + if (msg->eof) + { + etcd_watch_ws = NULL; + if (etcd_watches_initialised == 0) + { + // Connection not established, retry in + tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int) + { + start_etcd_watcher(); + }); + } + else + { + // Connection was live, retry immediately + start_etcd_watcher(); + } + } + }); + etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object { + { "create_request", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/config/") }, + { "range_end", base64_encode(etcd_prefix+"/config0") }, + { "start_revision", etcd_watch_revision+1 }, + { "watch_id", ETCD_CONFIG_WATCH_ID }, + } } + }).dump()); + etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object { + { "create_request", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/osd/state/") }, + { "range_end", base64_encode(etcd_prefix+"/osd/state0") }, + { "start_revision", etcd_watch_revision+1 }, + { "watch_id", ETCD_OSD_STATE_WATCH_ID }, + } } + }).dump()); + etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object { + { "create_request", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/pg/state/") }, + { "range_end", base64_encode(etcd_prefix+"/pg/state0") }, + { "start_revision", etcd_watch_revision+1 }, + { "watch_id", ETCD_PG_STATE_WATCH_ID }, + } } + }).dump()); + etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object { + { "create_request", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/pg/history/") }, + { "range_end", base64_encode(etcd_prefix+"/pg/history0") }, + { "start_revision", etcd_watch_revision+1 }, + { "watch_id", ETCD_PG_HISTORY_WATCH_ID }, + } } + }).dump()); +} + void osd_t::load_global_config() { etcd_call("/kv/range", json11::Json::object { { "key", base64_encode(etcd_prefix+"/config/osd/all") } - }, [this](std::string err, json11::Json data) + }, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data) { if (err != "") { printf("Error reading OSD configuration from etcd: %s\n", err.c_str()); - tfd->set_timer(ETCD_START_INTERVAL, false, [this](int timer_id) + tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id) { load_global_config(); }); return; } - etcd_watch_revision = data["header"]["revision"].string_value(); + if (!etcd_watch_revision) + { + etcd_watch_revision = data["header"]["revision"].uint64_value(); + } if (data["kvs"].array_items().size() > 0) { - std::string key = base64_decode(data["kvs"][0]["key"].string_value()); - std::string json_text = base64_decode(data["kvs"][0]["value"].string_value()); - std::string json_err; - json11::Json value = json11::Json::parse(json_text, json_err); - if (json_err != "") - { - printf("Bad JSON in etcd key %s: %s (value: %s)\n", key.c_str(), json_err.c_str(), json_text.c_str()); - } - else + auto kv = parse_etcd_kv(data["kvs"][0]); + if (kv.value.is_object()) { blockstore_config_t osd_config = this->config; - for (auto & cfg_var: value.object_items()) + for (auto & cfg_var: kv.value.object_items()) { if (this->config.find(cfg_var.first) == this->config.end()) { @@ -260,6 +361,7 @@ void osd_t::load_global_config() } } bind_socket(); + start_etcd_watcher(); acquire_lease(); }); } @@ -267,21 +369,22 @@ void osd_t::load_global_config() // Acquire lease void osd_t::acquire_lease() { + // Maximum lease TTL is (report interval) + retries * (timeout + repeat interval) etcd_call("/lease/grant", json11::Json::object { - { "TTL", etcd_report_interval+(MAX_ETCD_ATTEMPTS*ETCD_RETRY_INTERVAL+999)/1000 } - }, [this](std::string err, json11::Json data) + { "TTL", etcd_report_interval+(MAX_ETCD_ATTEMPTS*(2*ETCD_QUICK_TIMEOUT)+999)/1000 } + }, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data) { if (err != "" || data["ID"].string_value() == "") { printf("Error acquiring a lease from etcd: %s\n", err.c_str()); - tfd->set_timer(ETCD_START_INTERVAL, false, [this](int timer_id) + tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id) { acquire_lease(); }); return; } etcd_lease_id = data["ID"].string_value(); - create_state(); + create_osd_state(); }); printf("[OSD %lu] reporting to etcd at %s every %d seconds\n", this->osd_num, etcd_address.c_str(), etcd_report_interval); tfd->set_timer(etcd_report_interval*1000, true, [this](int timer_id) @@ -292,7 +395,7 @@ void osd_t::acquire_lease() // Report "up" state once, then keep it alive using the lease // Do it first to allow "monitors" check it when moving PGs -void osd_t::create_state() +void osd_t::create_osd_state() { std::string state_key = base64_encode(etcd_prefix+"/osd/state/"+std::to_string(osd_num)); etcd_txn(json11::Json::object { @@ -308,7 +411,7 @@ void osd_t::create_state() json11::Json::object { { "request_put", json11::Json::object { { "key", state_key }, - { "value", base64_encode(get_status().dump()) }, + { "value", base64_encode(get_osd_state().dump()) }, { "lease", etcd_lease_id }, } } }, @@ -320,28 +423,27 @@ void osd_t::create_state() } } }, } }, - }, [this](std::string err, json11::Json data) + }, ETCD_QUICK_TIMEOUT, [this](std::string err, json11::Json data) { if (err != "") { - // FIXME Retry? + // FIXME Retry? But etcd should be already UP because we've just got the lease printf("Error reporting OSD state to etcd: %s\n", err.c_str()); - exit(1); + force_stop(1); + return; } - if (data["responses"][0]["response_range"].is_object()) + if (!data["succeeded"].bool_value()) { // OSD is already up - auto & kv = data["responses"][0]["response_range"]["kvs"][0]; - std::string key = base64_decode(kv["key"].string_value()); - std::string json_err; - json11::Json state = json11::Json::parse(base64_decode(kv["value"].string_value()), json_err); - printf("Key %s already exists in etcd, OSD %lu is still up\n", key.c_str(), this->osd_num); - int64_t port = state["port"].int64_value(); - for (auto & addr: state["addresses"].array_items()) + auto kv = parse_etcd_kv(data["responses"][0]["response_range"]["kvs"][0]); + printf("Key %s already exists in etcd, OSD %lu is still up\n", kv.key.c_str(), this->osd_num); + int64_t port = kv.value["port"].int64_value(); + for (auto & addr: kv.value["addresses"].array_items()) { printf(" listening at: %s:%ld\n", addr.string_value().c_str(), port); } - exit(0); + force_stop(0); + return; } if (run_primary) { @@ -355,7 +457,7 @@ void osd_t::renew_lease() { etcd_call("/lease/keepalive", json11::Json::object { { "ID", etcd_lease_id } - }, [this](std::string err, json11::Json data) + }, ETCD_QUICK_TIMEOUT, [this](std::string err, json11::Json data) { if (err == "" && data["result"]["TTL"].string_value() == "") { @@ -372,7 +474,7 @@ void osd_t::renew_lease() throw std::runtime_error("Cluster connection failed"); } // Retry - tfd->set_timer(ETCD_RETRY_INTERVAL, false, [this](int timer_id) + tfd->set_timer(ETCD_QUICK_TIMEOUT, false, [this](int timer_id) { renew_lease(); }); @@ -385,38 +487,40 @@ void osd_t::renew_lease() }); } -void osd_t::force_stop() +void osd_t::force_stop(int exitcode) { if (etcd_lease_id != "") { etcd_call("/kv/lease/revoke", json11::Json::object { { "ID", etcd_lease_id } - }, [this](std::string err, json11::Json data) + }, ETCD_QUICK_TIMEOUT, [this, exitcode](std::string err, json11::Json data) { if (err != "") { printf("Error revoking etcd lease: %s\n", err.c_str()); } printf("[OSD %lu] Force stopping\n", this->osd_num); - exit(0); + exit(exitcode); }); } else { printf("[OSD %lu] Force stopping\n", this->osd_num); - exit(0); + exit(exitcode); } } void osd_t::load_pgs() { assert(this->pgs.size() == 0); - json11::Json::array txn = { + json11::Json::array checks = { json11::Json::object { - { "request_range", json11::Json::object { - { "key", base64_encode(etcd_prefix+"/osd/state/"+std::to_string(osd_num)) }, - } } - }, + { "target", "LEASE" }, + { "lease", etcd_lease_id }, + { "key", base64_encode(etcd_prefix+"/osd/state/"+std::to_string(osd_num)) }, + } + }; + json11::Json::array txn = { json11::Json::object { { "request_range", json11::Json::object { { "key", base64_encode(etcd_prefix+"/config/pgs") }, @@ -428,141 +532,436 @@ void osd_t::load_pgs() { "range_end", base64_encode(etcd_prefix+"/pg/history0") }, } } }, + json11::Json::object { + { "request_range", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/pg/state/") }, + { "range_end", base64_encode(etcd_prefix+"/pg/state0") }, + } } + }, }; - etcd_txn(json11::Json::object { { "success", txn } }, [this](std::string err, json11::Json data) + etcd_txn(json11::Json::object { + { "compare", checks }, { "success", txn } + }, ETCD_SLOW_TIMEOUT, [this](std::string err, json11::Json data) { if (err != "") { printf("Error loading PGs from etcd: %s\n", err.c_str()); - tfd->set_timer(ETCD_START_INTERVAL, false, [this](int timer_id) + tfd->set_timer(ETCD_SLOW_TIMEOUT, false, [this](int timer_id) { load_pgs(); }); return; } - if (!data["responses"].array_items().size()) + if (!data["succeeded"].bool_value()) { printf("Error loading PGs from etcd: lease expired\n"); - exit(1); + force_stop(1); + return; } peering_state &= ~OSD_LOADING_PGS; - json11::Json pg_config; - std::map pg_history; - bool lease_valid = false; for (auto & res: data["responses"].array_items()) { - for (auto & kvs: res["response_range"]["kvs"].array_items()) + for (auto & kv_json: res["response_range"]["kvs"].array_items()) { - std::string key = base64_decode(kvs["key"].string_value()); - std::string json_err, json_text = base64_decode(kvs["value"].string_value()); - json11::Json value = json11::Json::parse(json_text, json_err); - if (json_err != "") - { - printf("Bad JSON in etcd key %s: %s (value: %s)\n", key.c_str(), json_err.c_str(), json_text.c_str()); - } - else if (key == etcd_prefix+"/osd/state/"+std::to_string(osd_num)) - { - lease_valid = kvs["lease"].string_value() == etcd_lease_id; - } - else if (key == etcd_prefix+"/config/pgs") - { - pg_config = value; - } - else if (key.substr(0, etcd_prefix.length()+12) == etcd_prefix+"/pg/history/") - { - // /pg/history/%d - pg_num_t pg_num = stoull_full(key.substr(etcd_prefix.length()+12)); - if (pg_num) - { - pg_history[pg_num] = value; - } - } + auto kv = parse_etcd_kv(kv_json); + parse_pg_state(kv.key, kv.value); } } - if (!lease_valid) - { - printf("Error loading PGs from etcd: lease expired\n"); - exit(1); - } - parse_pgs(pg_config, pg_history); - report_statistics(); + apply_pg_count(); + apply_pg_config(); }); } -void osd_t::parse_pgs(const json11::Json & pg_config, const std::map & pg_history) +void osd_t::parse_pg_state(const std::string & key, const json11::Json & value) { - uint64_t pg_count = 0; - for (auto pg_item: pg_config.object_items()) + if (key == etcd_prefix+"/config/pgs") { - pg_num_t pg_num = stoull_full(pg_item.first); + for (auto & pg_item: this->pg_config) + { + pg_item.second.exists = false; + } + for (auto & pg_item: value.object_items()) + { + pg_num_t pg_num = stoull_full(pg_item.first); + if (!pg_num) + { + printf("Bad key in PG configuration: %s (must be a number), skipped\n", pg_item.first.c_str()); + continue; + } + this->pg_config[pg_num].exists = true; + this->pg_config[pg_num].pause = pg_item.second["pause"].bool_value(); + this->pg_config[pg_num].primary = pg_item.second["primary"].uint64_value(); + this->pg_config[pg_num].target_set.clear(); + for (auto pg_osd: pg_item.second["osd_set"].array_items()) + { + this->pg_config[pg_num].target_set.push_back(pg_osd.uint64_value()); + } + if (this->pg_config[pg_num].target_set.size() != 3) + { + printf("Bad PG %u config format: incorrect osd_set = %s\n", pg_num, pg_item.second["osd_set"].dump().c_str()); + this->pg_config[pg_num].target_set.resize(3); + this->pg_config[pg_num].pause = true; + } + } + } + else if (key.substr(0, etcd_prefix.length()+12) == etcd_prefix+"/pg/history/") + { + // /pg/history/%d + pg_num_t pg_num = stoull_full(key.substr(etcd_prefix.length()+12)); if (!pg_num) { - throw std::runtime_error("Bad key in PG hash: "+pg_item.first); + printf("Bad etcd key %s, ignoring\n", key.c_str()); } - auto & pg_json = pg_item.second; - osd_num_t primary_osd = pg_json["primary"].uint64_value(); - if (primary_osd != 0 && primary_osd == this->osd_num) + else + { + this->pg_config[pg_num].target_history.clear(); + for (auto hist_item: value.array_items()) + { + std::vector history_set; + for (auto pg_osd: hist_item["osd_set"].array_items()) + { + history_set.push_back(pg_osd.uint64_value()); + } + this->pg_config[pg_num].target_history.push_back(history_set); + } + } + } + else if (key.substr(0, etcd_prefix.length()+10) == etcd_prefix+"/pg/state/") + { + // /pg/state/%d + pg_num_t pg_num = stoull_full(key.substr(etcd_prefix.length()+10)); + if (!pg_num) + { + printf("Bad etcd key %s, ignoring\n", key.c_str()); + } + else if (value.is_null()) + { + this->pg_config[pg_num].cur_primary = 0; + this->pg_config[pg_num].cur_state = 0; + } + else + { + osd_num_t cur_primary = value["primary"].uint64_value(); + int state = 0; + for (auto & e: value["state"].array_items()) + { + int i; + for (i = 0; i < pg_state_bit_count; i++) + { + if (e.string_value() == pg_state_names[i]) + { + state = state | pg_state_bits[i]; + break; + } + } + if (i >= pg_state_bit_count) + { + printf("Unexpected PG %u state keyword in etcd: %s\n", pg_num, e.dump().c_str()); + force_stop(1); + return; + } + } + if (!cur_primary || !value["state"].is_array() || !state || + (state & PG_OFFLINE) && state != PG_OFFLINE || + (state & PG_PEERING) && state != PG_PEERING || + (state & PG_INCOMPLETE) && state != PG_INCOMPLETE) + { + printf("Unexpected PG %u state in etcd: primary=%lu, state=%s\n", pg_num, cur_primary, value["state"].dump().c_str()); + force_stop(1); + return; + } + this->pg_config[pg_num].cur_primary = cur_primary; + this->pg_config[pg_num].cur_state = state; + } + } +} + +void osd_t::apply_pg_count() +{ + pg_num_t pg_count = pg_config.size(); + if (pg_count > 0 && (pg_config.begin()->first != 1 || std::prev(pg_config.end())->first != pg_count)) + { + printf("Invalid PG configuration: PG numbers don't cover the whole 1..%d range\n", pg_count); + force_stop(1); + return; + } + if (this->pg_count != 0 && this->pg_count != pg_count) + { + // Check that all PGs are offline. It is not allowed to change PG count when any PGs are online + // The external tool must wait for all PGs to come down before changing PG count + // If it doesn't wait, a restarted OSD may apply the new count immediately which will lead to bugs + // So an OSD just dies if it detects PG count change while there are active PGs + int still_active = 0; + for (auto & kv: pgs) + { + if (kv.second.state & PG_ACTIVE) + { + still_active++; + } + } + if (still_active > 0) + { + printf("[OSD %lu] PG count change detected, but %d PG(s) are still active. This is not allowed. Exiting\n", this->osd_num, still_active); + force_stop(1); + return; + } + } + this->pg_count = pg_count; +} + +void osd_t::apply_pg_config() +{ + bool all_applied = true; + for (auto & kv: pg_config) + { + pg_num_t pg_num = kv.first; + auto & pg_cfg = kv.second; + bool take = pg_cfg.exists && pg_cfg.primary == this->osd_num && + !pg_cfg.pause && (!pg_cfg.cur_primary || pg_cfg.cur_primary == this->osd_num); + bool currently_taken = this->pgs.find(pg_num) != this->pgs.end() && + this->pgs[pg_num].state != PG_OFFLINE; + if (currently_taken && !take) + { + // Stop this PG + stop_pg(pg_num); + } + else if (take) { // Take this PG std::set all_peers; - std::vector target_set; - for (auto pg_osd_num: pg_json["osd_set"].array_items()) + for (osd_num_t pg_osd: pg_cfg.target_set) { - osd_num_t pg_osd = pg_osd_num.uint64_value(); - target_set.push_back(pg_osd); if (pg_osd != 0) { all_peers.insert(pg_osd); } } - if (target_set.size() != 3) + for (auto & hist_item: pg_cfg.target_history) { - throw std::runtime_error("Bad PG "+std::to_string(pg_num)+" config format: incorrect osd_set"); - } - std::vector> target_history; - auto hist_it = pg_history.find(pg_num); - if (hist_it != pg_history.end()) - { - for (auto hist_item: hist_it->second.array_items()) + for (auto pg_osd: hist_item) { - std::vector history_set; - for (auto pg_osd_num: hist_item["osd_set"].array_items()) + if (pg_osd != 0) { - osd_num_t pg_osd = pg_osd_num.uint64_value(); - history_set.push_back(pg_osd); - if (pg_osd != 0) - { - all_peers.insert(pg_osd); - } + all_peers.insert(pg_osd); } - target_history.push_back(history_set); + } + } + if (currently_taken) + { + if (this->pgs[pg_num].state & (PG_ACTIVE | PG_INCOMPLETE | PG_PEERING)) + { + if (this->pgs[pg_num].target_set == pg_cfg.target_set) + { + // No change in osd_set; history changes are ignored + continue; + } + else + { + // Stop PG, reapply change after stopping + stop_pg(pg_num); + all_applied = false; + continue; + } + } + else if (this->pgs[pg_num].state & PG_STOPPING) + { + // Reapply change after stopping + all_applied = false; + continue; + } + else if (this->pgs[pg_num].state & PG_STARTING) + { + if (pg_cfg.cur_primary == this->osd_num) + { + // PG locked, continue + } + else + { + // Reapply change after locking the PG + all_applied = false; + continue; + } + } + else + { + throw std::runtime_error("Unexpected PG "+std::to_string(pg_num)+" state: "+std::to_string(this->pgs[pg_num].state)); } } this->pgs[pg_num] = (pg_t){ - .state = PG_PEERING, + .state = pg_cfg.cur_primary == this->osd_num ? PG_PEERING : PG_STARTING, .pg_cursize = 0, .pg_num = pg_num, .all_peers = std::vector(all_peers.begin(), all_peers.end()), - .target_history = target_history, - .target_set = target_set, + .target_history = pg_cfg.target_history, + .target_set = pg_cfg.target_set, }; - // Add peers - for (auto pg_osd: all_peers) + this->pg_state_dirty.insert(pg_num); + if (pg_cfg.cur_primary == this->osd_num) { - if (pg_osd != this->osd_num && osd_peer_fds.find(pg_osd) == osd_peer_fds.end()) + // Add peers + for (auto pg_osd: all_peers) { - wanted_peers[pg_osd] = { 0 }; + if (pg_osd != this->osd_num && osd_peer_fds.find(pg_osd) == osd_peer_fds.end()) + { + wanted_peers[pg_osd] = { 0 }; + } } + start_pg_peering(pg_num); + } + else + { + // Reapply change after locking the PG + all_applied = false; } - start_pg_peering(pg_num); } - pg_count++; } - this->pg_count = pg_count; if (wanted_peers.size() > 0) { peering_state |= OSD_CONNECTING_PEERS; } + report_pg_states(); + this->pg_config_applied = all_applied; +} + +void osd_t::report_pg_states() +{ + if (etcd_reporting_pg_state || !this->pg_state_dirty.size() || !etcd_address.length()) + { + return; + } + etcd_reporting_pg_state = true; + std::vector reporting_pgs(pg_state_dirty.begin(), pg_state_dirty.end()); + pg_state_dirty.clear(); + json11::Json::array checks; + json11::Json::array success; + json11::Json::array failure; + for (auto pg_num: reporting_pgs) + { + auto & pg = this->pgs[pg_num]; + std::string state_key_base64 = base64_encode(etcd_prefix+"/pg/state/"+std::to_string(pg_num)); + if (pg.state == PG_STARTING) + { + // Check that the PG key does not exist + // Failed check indicates an unsuccessful PG lock attempt in this case + checks.push_back(json11::Json::object { + { "target", "VERSION" }, + { "version", 0 }, + { "key", state_key_base64 }, + }); + } + else + { + // Check that the key is ours + // Failed check indicates success for OFFLINE pgs (PG lock is already deleted) + // and an unexpected race condition for started pgs (PG lock is held by someone else) + checks.push_back(json11::Json::object { + { "target", "LEASE" }, + { "lease", etcd_lease_id }, + { "key", state_key_base64 }, + }); + } + if (pg.state == PG_OFFLINE) + { + success.push_back(json11::Json::object { + { "request_delete_range", json11::Json::object { + { "key", state_key_base64 }, + } } + }); + } + else + { + json11::Json::array pg_state_keywords; + for (int i = 0; i < pg_state_bit_count; i++) + { + if (pg.state & pg_state_bits[i]) + { + pg_state_keywords.push_back(pg_state_names[i]); + } + } + success.push_back(json11::Json::object { + { "request_put", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/pg/state/"+std::to_string(pg.pg_num)) }, + { "value", base64_encode(json11::Json(json11::Json::object { + { "primary", this->osd_num }, + { "state", pg_state_keywords }, + }).dump()) }, + { "lease", etcd_lease_id }, + } } + }); + if (pg.state == PG_ACTIVE && pg.target_history.size() > 0) + { + // Clear history of active+clean PGs + success.push_back(json11::Json::object { + { "request_delete_range", json11::Json::object { + { "key", base64_encode(etcd_prefix+"/pg/history/"+std::to_string(pg.pg_num)) }, + } } + }); + } + } + failure.push_back(json11::Json::object { + { "request_range", json11::Json::object { + { "key", state_key_base64 }, + } } + }); + } + etcd_txn(json11::Json::object { + { "compare", checks }, { "success", success }, { "failure", failure } + }, ETCD_QUICK_TIMEOUT, [this, reporting_pgs](std::string err, json11::Json data) + { + etcd_reporting_pg_state = false; + if (!data["succeeded"].bool_value()) + { + // One of PG state updates failed + for (auto pg_num: reporting_pgs) + { + this->pg_state_dirty.insert(pg_num); + } + for (auto & res: data["responses"].array_items()) + { + auto kv = parse_etcd_kv(res["kvs"][0]); + pg_num_t pg_num = stoull_full(kv.key.substr(etcd_prefix.length()+10)); + auto pg_it = pgs.find(pg_num); + if (pg_it != pgs.end() && pg_it->second.state != PG_OFFLINE && pg_it->second.state != PG_STARTING) + { + // Live PG state update failed + printf("Failed to report state of PG %u which is live. Race condition detected, exiting\n", pg_num); + force_stop(1); + return; + } + } + // Retry after a short pause (hope we'll get some updates and update PG states accordingly) + tfd->set_timer(500, false, [this](int) { report_pg_states(); }); + } + else + { + // Success. We'll get our changes back via the watcher and react to them + for (auto pg_num: reporting_pgs) + { + auto pg_it = this->pgs.find(pg_num); + if (pg_it != this->pgs.end()) + { + if (pg_it->second.state == PG_OFFLINE) + { + // Remove offline PGs after reporting their state + this->pgs.erase(pg_it); + } + else if (pg_it->second.state == PG_ACTIVE && pg_it->second.target_history.size() > 0) + { + // Clear history of active+clean PGs + pg_it->second.target_history.clear(); + pg_it->second.all_peers = pg_it->second.target_set; + } + } + } + // Push other PG state updates, if any + report_pg_states(); + if (!this->pg_state_dirty.size()) + { + // Update statistics + report_statistics(); + } + } + }); } void osd_t::load_and_connect_peers() @@ -643,7 +1042,7 @@ void osd_t::load_and_connect_peers() } if (load_peer_txn.size() > 0) { - etcd_txn(json11::Json::object { { "success", load_peer_txn } }, [this](std::string err, json11::Json data) + etcd_txn(json11::Json::object { { "success", load_peer_txn } }, ETCD_QUICK_TIMEOUT, [this](std::string err, json11::Json data) { // Ugly, but required to wake up the loop tfd->set_timer(peer_connect_interval*1000, false, [](int timer_id){}); @@ -657,23 +1056,22 @@ void osd_t::load_and_connect_peers() { if (res["response_range"]["kvs"].array_items().size()) { - std::string key = base64_decode(res["response_range"]["kvs"][0]["key"].string_value()); - // /osd/state/ - osd_num_t peer_osd = std::stoull(key.substr(etcd_prefix.length()+11)); - std::string json_err; - std::string json_text = base64_decode(res["response_range"]["kvs"][0]["value"].string_value()); - json11::Json st = json11::Json::parse(json_text, json_err); - if (json_err != "") - { - printf("Bad JSON in etcd key %s: %s (value: %s)\n", key.c_str(), json_err.c_str(), json_text.c_str()); - } - if (peer_osd > 0 && st.is_object() && st["state"] == "up" && - st["addresses"].is_array() && st["port"].int64_value() > 0 && st["port"].int64_value() < 65536) - { - peer_states[peer_osd] = st; - } + auto kv = parse_etcd_kv(res["response_range"]["kvs"][0]); + parse_etcd_osd_state(kv.key, kv.value); } } }); } } + +void osd_t::parse_etcd_osd_state(const std::string & key, const json11::Json & value) +{ + // /osd/state/ + osd_num_t peer_osd = std::stoull(key.substr(etcd_prefix.length()+11)); + if (peer_osd > 0 && value.is_object() && value["state"] == "up" && + value["addresses"].is_array() && + value["port"].int64_value() > 0 && value["port"].int64_value() < 65536) + { + peer_states[peer_osd] = value; + } +} diff --git a/osd_flush.cpp b/osd_flush.cpp index 0264ceb8..80437aaa 100644 --- a/osd_flush.cpp +++ b/osd_flush.cpp @@ -129,12 +129,16 @@ void osd_t::handle_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t osd if (!pg.flush_actions.size()) { pg.state = pg.state & ~PG_HAS_UNCLEAN; - pg.print_state(); + report_pg_state(pg); } for (osd_op_t *op: continue_ops) { continue_primary_write(op); } + if (pg.inflight == 0 && (pg.state & PG_STOPPING)) + { + finish_stop_pg(pg); + } } } diff --git a/osd_http.cpp b/osd_http.cpp index 076ab492..16622340 100644 --- a/osd_http.cpp +++ b/osd_http.cpp @@ -18,6 +18,7 @@ static std::string trim(const std::string & in); static std::string ws_format_frame(int type, uint64_t size); static bool ws_parse_frame(std::string & buf, int & type, std::string & res); +// FIXME: Use keepalive struct http_co_t { ring_loop_t *ringloop; @@ -66,15 +67,19 @@ struct http_co_t #define HTTP_CO_WEBSOCKET 5 #define HTTP_CO_CHUNKED 6 -void osd_t::http_request(std::string host, std::string request, bool streaming, std::function callback) +#define DEFAULT_TIMEOUT 5000 + +// FIXME: Remove osd_t dependency from here +void osd_t::http_request(const std::string & host, const std::string & request, + const http_options_t & options, std::function callback) { http_co_t *handler = new http_co_t(); handler->ringloop = ringloop; handler->epoll_fd = epoll_fd; handler->epoll_handlers = &epoll_handlers; - handler->request_timeout = http_request_timeout; + handler->request_timeout = options.timeout < 0 ? 0 : (options.timeout == 0 ? DEFAULT_TIMEOUT : options.timeout); + handler->want_streaming = options.want_streaming; handler->tfd = tfd; - handler->want_streaming = streaming; handler->host = host; handler->request = request; handler->callback = callback; @@ -82,10 +87,10 @@ void osd_t::http_request(std::string host, std::string request, bool streaming, handler->start_connection(); } -void osd_t::http_request_json(std::string host, std::string request, - std::function callback) +void osd_t::http_request_json(const std::string & host, const std::string & request, + int timeout, std::function callback) { - http_request(host, request, false, [this, callback](const http_response_t* res) + http_request(host, request, { .timeout = timeout }, [this, callback](const http_response_t* res) { if (res->error_code != 0) { @@ -108,7 +113,8 @@ void osd_t::http_request_json(std::string host, std::string request, }); } -websocket_t* osd_t::open_websocket(std::string host, std::string path, std::function callback) +websocket_t* osd_t::open_websocket(const std::string & host, const std::string & path, + int timeout, std::function callback) { std::string request = "GET "+path+" HTTP/1.1\r\n" "Host: "+host+"\r\n" @@ -121,9 +127,9 @@ websocket_t* osd_t::open_websocket(std::string host, std::string path, std::func handler->ringloop = ringloop; handler->epoll_fd = epoll_fd; handler->epoll_handlers = &epoll_handlers; - handler->request_timeout = http_request_timeout; - handler->tfd = tfd; + handler->request_timeout = timeout < 0 ? -1 : (timeout == 0 ? DEFAULT_TIMEOUT : timeout); handler->want_streaming = false; + handler->tfd = tfd; handler->host = host; handler->request = request; handler->callback = callback; @@ -215,11 +221,11 @@ void http_co_t::start_connection() fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); if (request_timeout > 0) { - timeout_id = tfd->set_timer(1000*request_timeout, false, [this](int timer_id) + timeout_id = tfd->set_timer(request_timeout, false, [this](int timer_id) { if (response.length() == 0) { - parsed.error_code = EIO; + parsed.error_code = ETIME; } delete this; }); @@ -440,8 +446,8 @@ void http_co_t::handle_read() if (!len) { // Zero length chunk indicates EOF - delete this; - return; + parsed.eof = true; + break; } if (response.size() < pos+2+len+2) { @@ -454,6 +460,11 @@ void http_co_t::handle_read() { response = response.substr(prev); } + if (parsed.eof) + { + delete this; + return; + } if (want_streaming && parsed.body.size() > 0) { callback(&parsed); diff --git a/osd_http.h b/osd_http.h index 10c1e866..d2bb439a 100644 --- a/osd_http.h +++ b/osd_http.h @@ -10,6 +10,12 @@ #define WS_PING 9 #define WS_PONG 10 +struct http_options_t +{ + int timeout; + bool want_streaming; +}; + struct http_response_t { bool eof = false; diff --git a/osd_main.cpp b/osd_main.cpp index 15085be3..dbf673af 100644 --- a/osd_main.cpp +++ b/osd_main.cpp @@ -10,7 +10,7 @@ static void handle_sigint(int sig) if (osd && !force_stopping) { force_stopping = true; - osd->force_stop(); + osd->force_stop(0); return; } exit(0); diff --git a/osd_peering.cpp b/osd_peering.cpp index 5e6ebae0..aa82151a 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -168,12 +168,13 @@ void osd_t::handle_peers() { if (!p.second.peering_state->list_ops.size()) { - p.second.calc_object_states(); + p.second.calc_object_states(log_level); + report_pg_state(p.second); incomplete_objects += p.second.incomplete_objects.size(); misplaced_objects += p.second.misplaced_objects.size(); // FIXME: degraded objects may currently include misplaced, too! Report them separately? degraded_objects += p.second.degraded_objects.size(); - if (p.second.state & PG_HAS_UNCLEAN) + if (p.second.state & (PG_ACTIVE | PG_HAS_UNCLEAN) == (PG_ACTIVE | PG_HAS_UNCLEAN)) peering_state = peering_state | OSD_FLUSHING_PGS; else peering_state = peering_state | OSD_RECOVERING; @@ -195,7 +196,7 @@ void osd_t::handle_peers() bool still = false; for (auto & p: pgs) { - if (p.second.state & PG_HAS_UNCLEAN) + if (p.second.state & (PG_ACTIVE | PG_HAS_UNCLEAN) == (PG_ACTIVE | PG_HAS_UNCLEAN)) { if (!p.second.flush_batch) { @@ -224,7 +225,7 @@ void osd_t::repeer_pgs(osd_num_t peer_osd) for (auto & p: pgs) { bool repeer = false; - if (p.second.state != PG_OFFLINE) + if (p.second.state & (PG_PEERING | PG_ACTIVE | PG_INCOMPLETE)) { for (osd_num_t pg_osd: p.second.all_peers) { @@ -239,7 +240,6 @@ void osd_t::repeer_pgs(osd_num_t peer_osd) // Repeer this pg printf("[PG %u] Repeer because of OSD %lu\n", p.second.pg_num, peer_osd); start_pg_peering(p.second.pg_num); - peering_state |= OSD_PEERING_PGS; } } } @@ -250,7 +250,8 @@ void osd_t::start_pg_peering(pg_num_t pg_num) { auto & pg = pgs[pg_num]; pg.state = PG_PEERING; - pg.print_state(); + this->peering_state |= OSD_PEERING_PGS; + report_pg_state(pg); // Reset PG state pg.state_dict.clear(); incomplete_objects -= pg.incomplete_objects.size(); @@ -312,14 +313,14 @@ void osd_t::start_pg_peering(pg_num_t pg_num) if (!found) { pg.state = PG_INCOMPLETE; - pg.print_state(); + report_pg_state(pg); } } } if (pg.pg_cursize < pg.pg_minsize) { pg.state = PG_INCOMPLETE; - pg.print_state(); + report_pg_state(pg); } std::set cur_peers; for (auto peer_osd: pg.all_peers) @@ -342,25 +343,7 @@ void osd_t::start_pg_peering(pg_num_t pg_num) if (pg.state == PG_INCOMPLETE || cur_peers.find(it->first) == cur_peers.end()) { // Discard the result after completion, which, chances are, will be unsuccessful - auto list_op = it->second; - if (list_op->peer_fd == 0) - { - // Self - list_op->bs_op->callback = [list_op](blockstore_op_t *bs_op) - { - if (list_op->bs_op->buf) - free(list_op->bs_op->buf); - delete list_op; - }; - } - else - { - // Peer - list_op->callback = [](osd_op_t *list_op) - { - delete list_op; - }; - } + discard_list_subop(it->second); pg.peering_state->list_ops.erase(it); it = pg.peering_state->list_ops.begin(); } @@ -491,6 +474,28 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps) } } +void osd_t::discard_list_subop(osd_op_t *list_op) +{ + if (list_op->peer_fd == 0) + { + // Self + list_op->bs_op->callback = [list_op](blockstore_op_t *bs_op) + { + if (list_op->bs_op->buf) + free(list_op->bs_op->buf); + delete list_op; + }; + } + else + { + // Peer + list_op->callback = [](osd_op_t *list_op) + { + delete list_op; + }; + } +} + bool osd_t::stop_pg(pg_num_t pg_num) { auto pg_it = pgs.find(pg_num); @@ -499,19 +504,52 @@ bool osd_t::stop_pg(pg_num_t pg_num) return false; } auto & pg = pg_it->second; + if (pg.peering_state) + { + // Stop peering + for (auto it = pg.peering_state->list_ops.begin(); it != pg.peering_state->list_ops.end();) + { + discard_list_subop(it->second); + } + for (auto it = pg.peering_state->list_results.begin(); it != pg.peering_state->list_results.end();) + { + if (it->second.buf) + { + free(it->second.buf); + } + } + delete pg.peering_state; + pg.peering_state = NULL; + } if (!(pg.state & PG_ACTIVE)) { return false; } pg.state = pg.state & ~PG_ACTIVE | PG_STOPPING; - if (pg.inflight == 0) + if (pg.inflight == 0 && !pg.flush_batch) { finish_stop_pg(pg); } + else + { + report_pg_state(pg); + } return true; } void osd_t::finish_stop_pg(pg_t & pg) { pg.state = PG_OFFLINE; + report_pg_state(pg); +} + +void osd_t::report_pg_state(pg_t & pg) +{ + pg.print_state(); + this->pg_state_dirty.insert(pg.pg_num); + if (pg.state == PG_OFFLINE && !this->pg_config_applied) + { + apply_pg_config(); + } + report_pg_states(); } diff --git a/osd_peering_pg.cpp b/osd_peering_pg.cpp index 6cfa0a0d..e846642c 100644 --- a/osd_peering_pg.cpp +++ b/osd_peering_pg.cpp @@ -43,6 +43,7 @@ struct pg_obj_state_check_t uint64_t n_copies = 0, has_roles = 0, n_roles = 0, n_stable = 0, n_mismatched = 0; uint64_t n_unstable = 0, n_buggy = 0; pg_osd_set_t osd_set; + int log_level; void walk(); void start_object(); @@ -198,14 +199,16 @@ void pg_obj_state_check_t::finish_object() } if (n_roles < pg->pg_minsize) { - printf("Object is incomplete: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver); - for (int i = ver_start; i < ver_end; i++) + if (log_level > 1) { - printf("Present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : ""); + printf("Object is incomplete: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver); + for (int i = ver_start; i < ver_end; i++) + { + printf("Present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : ""); + } } - if (0) + if (log_level > 2) { - // For future debug level for (int i = obj_start; i < obj_end; i++) { printf("v%lu present on: osd %lu, role %ld%s\n", list[i].version, list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : ""); @@ -216,10 +219,13 @@ void pg_obj_state_check_t::finish_object() } else if (n_roles < pg->pg_cursize) { - printf("Object is degraded: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver); - for (int i = ver_start; i < ver_end; i++) + if (log_level > 1) { - printf("Present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : ""); + printf("Object is degraded: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver); + for (int i = ver_start; i < ver_end; i++) + { + printf("Present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : ""); + } } state = OBJ_DEGRADED; pg->state = pg->state | PG_HAS_DEGRADED; @@ -321,10 +327,11 @@ void pg_obj_state_check_t::finish_object() } // FIXME: Write at least some tests for this function -void pg_t::calc_object_states() +void pg_t::calc_object_states(int log_level) { // Copy all object lists into one array pg_obj_state_check_t st; + st.log_level = log_level; st.pg = this; auto ps = peering_state; for (auto it: ps->list_results) @@ -352,12 +359,10 @@ void pg_t::calc_object_states() std::sort(st.list.begin(), st.list.end()); // Walk over it and check object states st.walk(); - print_state(); } void pg_t::print_state() { - // FIXME Immediately report state on each change printf( "[PG %u] is %s%s%s%s%s%s%s%s%s (%lu objects)\n", pg_num, (state & PG_OFFLINE) ? "offline" : "", @@ -373,13 +378,15 @@ void pg_t::print_state() ); } -const int pg_state_bit_count = 10; +const int pg_state_bit_count = 12; -const int pg_state_bits[10] = { - PG_OFFLINE, +const int pg_state_bits[12] = { + PG_STARTING, PG_PEERING, PG_INCOMPLETE, PG_ACTIVE, + PG_STOPPING, + PG_OFFLINE, PG_DEGRADED, PG_HAS_INCOMPLETE, PG_HAS_DEGRADED, @@ -387,11 +394,13 @@ const int pg_state_bits[10] = { PG_HAS_UNCLEAN, }; -const char *pg_state_names[10] = { - "offline", +const char *pg_state_names[12] = { + "starting", "peering", "incomplete", "active", + "stopping", + "offline", "degraded", "has_incomplete", "has_degraded", diff --git a/osd_peering_pg.h b/osd_peering_pg.h index d07ecc17..10f3d8af 100644 --- a/osd_peering_pg.h +++ b/osd_peering_pg.h @@ -9,18 +9,20 @@ #include "osd_ops.h" // Placement group states +// STARTING -> [acquire lock] -> PEERING -> INCOMPLETE|ACTIVE -> STOPPING -> OFFLINE -> [release lock] // Exactly one of these: -#define PG_OFFLINE (1<<0) +#define PG_STARTING (1<<0) #define PG_PEERING (1<<1) #define PG_INCOMPLETE (1<<2) #define PG_ACTIVE (1<<3) #define PG_STOPPING (1<<4) +#define PG_OFFLINE (1<<5) // Plus any of these: -#define PG_DEGRADED (1<<5) -#define PG_HAS_INCOMPLETE (1<<6) -#define PG_HAS_DEGRADED (1<<7) -#define PG_HAS_MISPLACED (1<<8) -#define PG_HAS_UNCLEAN (1<<9) +#define PG_DEGRADED (1<<6) +#define PG_HAS_INCOMPLETE (1<<7) +#define PG_HAS_DEGRADED (1<<8) +#define PG_HAS_MISPLACED (1<<9) +#define PG_HAS_UNCLEAN (1<<10) // FIXME: Safe default that doesn't depend on pg_stripe_size or pg_block_size #define STRIPE_MASK ((uint64_t)4096 - 1) @@ -33,8 +35,8 @@ #define OBJ_NEEDS_ROLLBACK 0x20000 #define OBJ_BUGGY 0x80000 -extern const int pg_state_bits[10]; -extern const char *pg_state_names[10]; +extern const int pg_state_bits[]; +extern const char *pg_state_names[]; extern const int pg_state_bit_count; struct pg_obj_loc_t @@ -96,7 +98,7 @@ struct pg_flush_batch_t struct pg_t { - int state = PG_OFFLINE; + int state = 0; uint64_t pg_cursize = 3, pg_size = 3, pg_minsize = 2; pg_num_t pg_num; uint64_t clean_count = 0, total_count = 0; @@ -122,7 +124,7 @@ struct pg_t int inflight = 0; // including write_queue std::multimap write_queue; - void calc_object_states(); + void calc_object_states(int log_level); void print_state(); }; diff --git a/osd_peering_pg_test.cpp b/osd_peering_pg_test.cpp index f4b890c8..2c1f4075 100644 --- a/osd_peering_pg_test.cpp +++ b/osd_peering_pg_test.cpp @@ -1,6 +1,7 @@ #define _LARGEFILE64_SOURCE #include "osd_peering_pg.h" +#define STRIPE_SHIFT 12 /** * TODO tests for object & pg state calculation. @@ -43,7 +44,7 @@ int main(int argc, char *argv[]) } pg.peering_state->list_results[osd_num] = r; } - pg.calc_object_states(); + pg.calc_object_states(0); printf("deviation variants=%ld clean=%lu\n", pg.state_dict.size(), pg.clean_count); for (auto it: pg.state_dict) { diff --git a/osd_primary.cpp b/osd_primary.cpp index 3fda3d01..88a49152 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -51,7 +51,7 @@ void osd_t::finish_op(osd_op_t *cur_op, int retval) auto & pg = pgs[cur_op->op_data->pg_num]; int n = --pg.inflight; assert(n >= 0); - if (n == 0 && (pg.state & PG_STOPPING)) + if ((pg.state & PG_STOPPING) && n == 0 && !pg.flush_batch) { finish_stop_pg(pg); } @@ -513,7 +513,7 @@ resume_8: if (!pg.incomplete_objects.size()) { pg.state = pg.state & ~PG_HAS_INCOMPLETE; - pg.print_state(); + report_pg_state(pg); } } else if (op_data->object_state->state & OBJ_DEGRADED) @@ -523,7 +523,7 @@ resume_8: if (!pg.degraded_objects.size()) { pg.state = pg.state & ~PG_HAS_DEGRADED; - pg.print_state(); + report_pg_state(pg); } } else if (op_data->object_state->state & OBJ_MISPLACED) @@ -533,7 +533,7 @@ resume_8: if (!pg.misplaced_objects.size()) { pg.state = pg.state & ~PG_HAS_MISPLACED; - pg.print_state(); + report_pg_state(pg); } } else