diff --git a/mon/mon.js b/mon/mon.js index 297413d4..470c7454 100644 --- a/mon/mon.js +++ b/mon/mon.js @@ -677,6 +677,11 @@ class Mon }, this.etcd_start_timeout, 0); } + get_mon_state() + { + return { ip: this.local_ips(), hostname: os.hostname() }; + } + async get_lease() { const max_ttl = this.config.etcd_mon_ttl + this.config.etcd_mon_timeout/1000*this.config.etcd_mon_retries; @@ -684,7 +689,7 @@ class Mon let res = await this.etcd_call('/lease/grant', { TTL: max_ttl }, this.config.etcd_mon_timeout, -1); this.etcd_lease_id = res.ID; // Register in /mon/member, just for the information - const state = { ip: this.local_ips() }; + const state = this.get_mon_state(); res = await this.etcd_call('/kv/put', { key: b64(this.etcd_prefix+'/mon/member/'+this.etcd_lease_id), value: b64(JSON.stringify(state)), @@ -716,7 +721,7 @@ class Mon async become_master() { - const state = { ip: this.local_ips(), id: ''+this.etcd_lease_id }; + const state = { ...this.get_mon_state(), id: ''+this.etcd_lease_id }; while (1) { const res = await this.etcd_call('/kv/txn', { diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 31d9e6c8..12dc5c19 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -154,7 +154,7 @@ target_link_libraries(vitastor-nbd # vitastor-cli add_executable(vitastor-cli - cli.cpp cli_alloc_osd.cpp cli_simple_offsets.cpp cli_df.cpp + cli.cpp cli_alloc_osd.cpp cli_simple_offsets.cpp cli_status.cpp cli_df.cpp cli_ls.cpp cli_create.cpp cli_modify.cpp cli_flatten.cpp cli_merge.cpp cli_rm_data.cpp cli_rm.cpp ) target_link_libraries(vitastor-cli diff --git a/src/cli.cpp b/src/cli.cpp index 010cbcc7..78f646fa 100644 --- a/src/cli.cpp +++ b/src/cli.cpp @@ -86,6 +86,9 @@ void cli_tool_t::help() "(c) Vitaliy Filippov, 2019+ (VNPL-1.1)\n" "\n" "USAGE:\n" + "%s status\n" + " Show cluster status\n" + "\n" "%s df\n" " Show pool space statistics\n" "\n" @@ -155,7 +158,7 @@ void cli_tool_t::help() " --no-color Disable colored output\n" " --json JSON output\n" , - exe_name, exe_name, exe_name, exe_name, exe_name, exe_name, + exe_name, exe_name, exe_name, exe_name, exe_name, exe_name, exe_name, exe_name, exe_name, exe_name, exe_name, exe_name, exe_name ); exit(0); @@ -266,6 +269,11 @@ void cli_tool_t::run(json11::Json cfg) fprintf(stderr, "command is missing\n"); exit(1); } + else if (cmd[0] == "status") + { + // Show cluster status + action_cb = start_status(cfg); + } else if (cmd[0] == "df") { // Show pool space stats @@ -340,6 +348,8 @@ void cli_tool_t::run(json11::Json cfg) ringloop = new ring_loop_t(512); epmgr = new epoll_manager_t(ringloop); cli = new cluster_client_t(ringloop, epmgr->tfd, cfg); + // Smaller timeout by default for more interactiveness + cli->st_cli.etcd_slow_timeout = cli->st_cli.etcd_quick_timeout; cli->on_ready([this]() { // Initialize job diff --git a/src/cli.h b/src/cli.h index ae272caa..f7a9b8f3 100644 --- a/src/cli.h +++ b/src/cli.h @@ -51,6 +51,7 @@ public: friend struct snap_flattener_t; friend struct snap_remover_t; + std::function start_status(json11::Json cfg); std::function start_df(json11::Json); std::function start_ls(json11::Json); std::function start_create(json11::Json); @@ -69,7 +70,7 @@ uint64_t parse_size(std::string size_str); std::string print_table(json11::Json items, json11::Json header, bool use_esc); -std::string format_size(uint64_t size); +std::string format_size(uint64_t size, bool nobytes = false); std::string format_lat(uint64_t lat); diff --git a/src/cli_ls.cpp b/src/cli_ls.cpp index 98e67436..f126659e 100644 --- a/src/cli_ls.cpp +++ b/src/cli_ls.cpp @@ -437,22 +437,25 @@ std::string print_table(json11::Json items, json11::Json header, bool use_esc) } static uint64_t size_thresh[] = { 1024l*1024*1024*1024, 1024l*1024*1024, 1024l*1024, 1024, 0 }; +static uint64_t size_thresh_d[] = { 1000000000000l, 1000000000l, 1000000l, 1000l, 0 }; +static const int size_thresh_n = sizeof(size_thresh)/sizeof(size_thresh[0]); static const char *size_unit = "TGMKB"; -std::string format_size(uint64_t size) +std::string format_size(uint64_t size, bool nobytes) { + uint64_t *thr = nobytes ? size_thresh_d : size_thresh; char buf[256]; - for (int i = 0; i < sizeof(size_thresh)/sizeof(size_thresh[0]); i++) + for (int i = 0; i < size_thresh_n; i++) { - if (size >= size_thresh[i] || i >= sizeof(size_thresh)/sizeof(size_thresh[0])-1) + if (size >= thr[i] || i >= size_thresh_n-1) { - double value = size_thresh[i] ? (double)size/size_thresh[i] : size; + double value = thr[i] ? (double)size/thr[i] : size; int l = snprintf(buf, sizeof(buf), "%.1f", value); assert(l < sizeof(buf)-2); if (buf[l-1] == '0') l -= 2; - buf[l] = ' '; - buf[l+1] = size_unit[i]; + buf[l] = i == size_thresh_n-1 && nobytes ? 0 : ' '; + buf[l+1] = i == size_thresh_n-1 && nobytes ? 0 : size_unit[i]; buf[l+2] = 0; break; } diff --git a/src/cli_status.cpp b/src/cli_status.cpp new file mode 100644 index 00000000..0aa7284b --- /dev/null +++ b/src/cli_status.cpp @@ -0,0 +1,295 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 (see README.md for details) + +#include "cli.h" +#include "cluster_client.h" +#include "base64.h" +#include "pg_states.h" + +// Print cluster status: +// etcd, mon, osd states +// raw/used space, object states, pool states, pg states +// client io, recovery io, rebalance io +struct status_printer_t +{ + cli_tool_t *parent; + + int state = 0; + json11::Json::array mon_members, osd_stats; + json11::Json agg_stats; + std::map pool_stats; + json11::Json::array etcd_states; + + bool is_done() + { + return state == 100; + } + + void loop() + { + if (state == 1) + goto resume_1; + else if (state == 2) + goto resume_2; + // etcd states + { + auto addrs = parent->cli->st_cli.get_addresses(); + etcd_states.resize(addrs.size()); + for (int i = 0; i < etcd_states.size(); i++) + { + parent->waiting++; + parent->cli->st_cli.etcd_call_oneshot( + addrs[i], "/maintenance/status", json11::Json::object(), + parent->cli->st_cli.etcd_quick_timeout, [this, i](std::string err, json11::Json res) + { + parent->waiting--; + etcd_states[i] = err != "" ? json11::Json::object{ { "error", err } } : res; + parent->ringloop->wakeup(); + } + ); + } + } + state = 1; +resume_1: + if (parent->waiting > 0) + return; + // Monitors, OSD states + parent->etcd_txn(json11::Json::object { + { "success", json11::Json::array { + json11::Json::object { + { "request_range", json11::Json::object { + { "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/mon/") }, + { "range_end", base64_encode(parent->cli->st_cli.etcd_prefix+"/mon0") }, + } }, + }, + json11::Json::object { + { "request_range", json11::Json::object { + { "key", base64_encode( + parent->cli->st_cli.etcd_prefix+"/osd/stats/" + ) }, + { "range_end", base64_encode( + parent->cli->st_cli.etcd_prefix+"/osd/stats0" + ) }, + } }, + }, + json11::Json::object { + { "request_range", json11::Json::object { + { "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/stats") }, + } }, + }, + } }, + }); + state = 2; +resume_2: + if (parent->waiting > 0) + return; + mon_members = parent->etcd_result["responses"][0]["response_range"]["kvs"].array_items(); + osd_stats = parent->etcd_result["responses"][1]["response_range"]["kvs"].array_items(); + if (parent->etcd_result["responses"][2]["response_range"]["kvs"].array_items().size() > 0) + { + agg_stats = parent->cli->st_cli.parse_etcd_kv(parent->etcd_result["responses"][2]["response_range"]["kvs"][0]).value; + } + int etcd_alive = 0; + uint64_t etcd_db_size = 0; + std::string etcd_detail; + for (int i = 0; i < etcd_states.size(); i++) + { + if (etcd_states[i]["error"].is_null()) + { + etcd_alive++; + etcd_db_size = etcd_states[i]["dbSizeInUse"].uint64_value(); + } + } + int mon_count = 0; + std::string mon_master; + for (int i = 0; i < mon_members.size(); i++) + { + auto kv = parent->cli->st_cli.parse_etcd_kv(mon_members[i]); + kv.key = kv.key.substr(parent->cli->st_cli.etcd_prefix.size()); + if (kv.key.substr(0, 12) == "/mon/member/") + mon_count++; + else if (kv.key == "/mon/master") + { + if (kv.value["hostname"].is_string()) + mon_master = kv.value["hostname"].string_value(); + else + mon_master = kv.value["ip"][0].string_value(); + } + } + int osd_count = 0, osd_up = 0; + uint64_t total_raw = 0, free_raw = 0, free_down_raw = 0, down_raw = 0; + for (int i = 0; i < osd_stats.size(); i++) + { + auto kv = parent->cli->st_cli.parse_etcd_kv(osd_stats[i]); + osd_num_t stat_osd_num = 0; + char null_byte = 0; + sscanf(kv.key.c_str() + parent->cli->st_cli.etcd_prefix.size(), "/osd/stats/%lu%c", &stat_osd_num, &null_byte); + if (!stat_osd_num || null_byte != 0) + { + fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str()); + continue; + } + osd_count++; + total_raw += kv.value["size"].uint64_value(); + free_raw += kv.value["free"].uint64_value(); + auto peer_it = parent->cli->st_cli.peer_states.find(stat_osd_num); + if (peer_it != parent->cli->st_cli.peer_states.end()) + { + osd_up++; + } + else + { + down_raw += kv.value["size"].uint64_value(); + free_down_raw += kv.value["size"].uint64_value(); + } + } + int pool_count = 0, pools_active = 0; + std::map pgs_by_state; + std::string pgs_by_state_str; + for (auto & pool_pair: parent->cli->st_cli.pool_config) + { + auto & pool_cfg = pool_pair.second; + bool active = true; + if (pool_cfg.pg_config.size() != pool_cfg.pg_count) + { + active = false; + pgs_by_state["offline"] += pool_cfg.pg_count-pool_cfg.pg_config.size(); + } + pool_count++; + for (auto pg_it = pool_cfg.pg_config.begin(); pg_it != pool_cfg.pg_config.end(); pg_it++) + { + if (!(pg_it->second.cur_state & PG_ACTIVE)) + { + active = false; + } + std::string pg_state_str; + for (int i = 0; i < pg_state_bit_count; i++) + { + if (pg_it->second.cur_state & pg_state_bits[i]) + { + pg_state_str += "+"; + pg_state_str += pg_state_names[i]; + } + } + if (pg_state_str.size()) + pgs_by_state[pg_state_str.substr(1)]++; + else + pgs_by_state["offline"]++; + } + if (active) + { + pools_active++; + } + } + for (auto & kv: pgs_by_state) + { + if (pgs_by_state_str.size()) + { + pgs_by_state_str += "\n "; + } + pgs_by_state_str += std::to_string(kv.second)+" "+kv.first; + } + uint64_t object_size = parent->cli->get_bs_block_size(); + std::string more_states; + uint64_t obj_n; + obj_n = agg_stats["object_counts"]["misplaced"].uint64_value(); + if (obj_n > 0) + more_states += ", "+format_size(obj_n*object_size)+" misplaced"; + obj_n = agg_stats["object_counts"]["degraded"].uint64_value(); + if (obj_n > 0) + more_states += ", "+format_size(obj_n*object_size)+" degraded"; + obj_n = agg_stats["object_counts"]["incomplete"].uint64_value(); + if (obj_n > 0) + more_states += ", "+format_size(obj_n*object_size)+" incomplete"; + std::string recovery_io; + { + uint64_t deg_bps = agg_stats["recovery_stats"]["degraded"]["bps"].uint64_value(); + uint64_t deg_iops = agg_stats["recovery_stats"]["degraded"]["iops"].uint64_value(); + uint64_t misp_bps = agg_stats["recovery_stats"]["misplaced"]["bps"].uint64_value(); + uint64_t misp_iops = agg_stats["recovery_stats"]["misplaced"]["iops"].uint64_value(); + if (deg_iops > 0 || deg_bps > 0) + recovery_io += " recovery: "+format_size(deg_bps)+"/s, "+format_size(deg_iops, true)+" op/s\n"; + if (misp_iops > 0 || misp_bps > 0) + recovery_io += " rebalance: "+format_size(misp_bps)+"/s, "+format_size(misp_iops, true)+" op/s\n"; + } + if (parent->json_output) + { + // JSON output + printf("%s\n", json11::Json(json11::Json::object { + { "etcd_alive", etcd_alive }, + { "etcd_count", etcd_states.size() }, + { "etcd_db_size", etcd_db_size }, + { "mon_count", mon_count }, + { "mon_master", mon_master }, + { "osd_up", osd_up }, + { "osd_count", osd_count }, + { "total_raw", total_raw }, + { "free_raw", free_raw }, + { "down_raw", down_raw }, + { "free_down_raw", free_down_raw }, + { "clean_data", agg_stats["object_counts"]["clean"].uint64_value() * object_size }, + { "misplaced_data", agg_stats["object_counts"]["misplaced"].uint64_value() * object_size }, + { "degraded_data", agg_stats["object_counts"]["degraded"].uint64_value() * object_size }, + { "incomplete_data", agg_stats["object_counts"]["incomplete"].uint64_value() * object_size }, + { "pool_count", pool_count }, + { "active_pool_count", pools_active }, + { "pg_states", pgs_by_state }, + { "op_stats", agg_stats["op_stats"] }, + { "recovery_stats", agg_stats["recovery_stats"] }, + { "object_counts", agg_stats["object_counts"] }, + }).dump().c_str()); + state = 100; + return; + } + printf( + " cluster:\n" + " etcd: %d / %ld up, %s database size\n" + " mon: %d up%s\n" + " osd: %d / %d up\n" + " \n" + " data:\n" + " raw: %s used, %s / %s available%s\n" + " state: %s clean%s\n" + " pools: %d / %d active\n" + " pgs: %s\n" + " \n" + " io:\n" + " client:%s %s/s rd, %s op/s rd, %s/s wr, %s op/s wr\n" + "%s", + etcd_alive, etcd_states.size(), format_size(etcd_db_size).c_str(), + mon_count, mon_master == "" ? "" : (", master "+mon_master).c_str(), + osd_up, osd_count, + format_size(total_raw-free_raw).c_str(), + format_size(free_raw-free_down_raw).c_str(), + format_size(total_raw-down_raw).c_str(), + (down_raw > 0 ? (", "+format_size(down_raw)+" down").c_str() : ""), + format_size(agg_stats["object_counts"]["clean"].uint64_value() * object_size).c_str(), more_states.c_str(), + pools_active, pool_count, + pgs_by_state_str.c_str(), + recovery_io.size() > 0 ? " " : "", + format_size(agg_stats["op_stats"]["primary_read"]["bps"].uint64_value()).c_str(), + format_size(agg_stats["op_stats"]["primary_read"]["iops"].uint64_value(), true).c_str(), + format_size(agg_stats["op_stats"]["primary_write"]["bps"].uint64_value()).c_str(), + format_size(agg_stats["op_stats"]["primary_write"]["iops"].uint64_value(), true).c_str(), + recovery_io.c_str() + ); + state = 100; + } +}; + +std::function cli_tool_t::start_status(json11::Json cfg) +{ + json11::Json::array cmd = cfg["command"].array_items(); + auto printer = new status_printer_t(); + printer->parent = this; + return [printer]() + { + printer->loop(); + if (printer->is_done()) + { + delete printer; + return true; + } + return false; + }; +} diff --git a/src/etcd_state_client.cpp b/src/etcd_state_client.cpp index 7681e30e..2ffc5123 100644 --- a/src/etcd_state_client.cpp +++ b/src/etcd_state_client.cpp @@ -64,6 +64,42 @@ void etcd_state_client_t::etcd_txn_slow(json11::Json txn, std::function etcd_state_client_t::get_addresses() +{ + auto addrs = etcd_local; + addrs.insert(addrs.end(), etcd_addresses.begin(), etcd_addresses.end()); + return addrs; +} + +void etcd_state_client_t::etcd_call_oneshot(std::string etcd_address, std::string api, json11::Json payload, + int timeout, std::function callback) +{ + std::string etcd_api_path; + int pos = etcd_address.find('/'); + if (pos >= 0) + { + etcd_api_path = etcd_address.substr(pos); + etcd_address = etcd_address.substr(0, pos); + } + std::string req = payload.dump(); + req = "POST "+etcd_api_path+api+" HTTP/1.1\r\n" + "Host: "+etcd_address+"\r\n" + "Content-Type: application/json\r\n" + "Content-Length: "+std::to_string(req.size())+"\r\n" + "Connection: close\r\n" + "\r\n"+req; + auto http_cli = http_init(tfd); + auto cb = [this, http_cli, callback](const http_response_t *response) + { + std::string err; + json11::Json data; + response->parse_json_response(err, data); + callback(err, data); + http_close(http_cli); + }; + http_request(http_cli, etcd_address, req, { .timeout = timeout }, cb); +} + void etcd_state_client_t::etcd_call(std::string api, json11::Json payload, int timeout, int retries, int interval, std::function callback) { diff --git a/src/etcd_state_client.h b/src/etcd_state_client.h index 2cb53d24..6ebc2ae7 100644 --- a/src/etcd_state_client.h +++ b/src/etcd_state_client.h @@ -112,6 +112,8 @@ public: json11::Json::object serialize_inode_cfg(inode_config_t *cfg); etcd_kv_t parse_etcd_kv(const json11::Json & kv_json); + std::vector get_addresses(); + void etcd_call_oneshot(std::string etcd_address, std::string api, json11::Json payload, int timeout, std::function callback); void etcd_call(std::string api, json11::Json payload, int timeout, int retries, int interval, std::function callback); void etcd_txn(json11::Json txn, int timeout, int retries, int interval, std::function callback); void etcd_txn_slow(json11::Json txn, std::function callback); diff --git a/src/http_client.cpp b/src/http_client.cpp index b255c603..ffd36ed4 100644 --- a/src/http_client.cpp +++ b/src/http_client.cpp @@ -342,6 +342,8 @@ void http_co_t::handle_events() } else if (epoll_events & (EPOLLRDHUP|EPOLLERR)) { + if (state == HTTP_CO_HEADERS_RECEIVED) + std::swap(parsed.body, response); close_connection(); run_cb_and_clear(); break; @@ -465,6 +467,8 @@ again: { // < 0 means error, 0 means EOF epoll_events = epoll_events & ~EPOLLIN; + if (state == HTTP_CO_HEADERS_RECEIVED) + std::swap(parsed.body, response); close_connection(); if (res < 0) parsed = { .error = std::string("recvmsg: ")+strerror(-res) };