From 37b27c30254577bbd8980c7181859b343e309bd5 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Tue, 14 Apr 2020 14:37:50 +0300 Subject: [PATCH] Implement basic OSD status reporting to Consul --- blockstore.cpp | 5 ++ blockstore.h | 1 + blockstore_impl.h | 1 + osd.cpp | 68 +++++++++++++++++----- osd.h | 23 +++++--- osd_cluster.cpp | 141 ++++++++++++++++++++++++++++++++++++++++----- osd_peering.cpp | 7 +++ osd_peering_pg.cpp | 26 +++++++++ osd_peering_pg.h | 4 ++ osd_primary.cpp | 1 + osd_receive.cpp | 8 +-- osd_send.cpp | 14 +---- 12 files changed, 246 insertions(+), 53 deletions(-) diff --git a/blockstore.cpp b/blockstore.cpp index 05c4c413..adc4e448 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -55,6 +55,11 @@ uint64_t blockstore_t::get_block_count() return impl->get_block_count(); } +uint64_t blockstore_t::get_free_block_count() +{ + return impl->get_free_block_count(); +} + uint32_t blockstore_t::get_disk_alignment() { return impl->get_disk_alignment(); diff --git a/blockstore.h b/blockstore.h index 3d99c29c..d97a67d1 100644 --- a/blockstore.h +++ b/blockstore.h @@ -176,6 +176,7 @@ public: // FIXME rename to object_size uint32_t get_block_size(); uint64_t get_block_count(); + uint64_t get_free_block_count(); uint32_t get_disk_alignment(); }; diff --git a/blockstore_impl.h b/blockstore_impl.h index 745ac261..a3914975 100644 --- a/blockstore_impl.h +++ b/blockstore_impl.h @@ -325,5 +325,6 @@ public: inline uint32_t get_block_size() { return block_size; } inline uint64_t get_block_count() { return block_count; } + inline uint64_t get_free_block_count() { return data_alloc->get_free_count(); } inline uint32_t get_disk_alignment() { return disk_alignment; } }; diff --git a/osd.cpp b/osd.cpp index ca0b9401..2fcaab56 100644 --- a/osd.cpp +++ b/osd.cpp @@ -37,7 +37,7 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo bind_socket(); - this->stats_tfd = new timerfd_interval(ringloop, 3, [this]() + this->stats_tfd = new timerfd_interval(ringloop, print_stats_interval, [this]() { print_stats(); }); @@ -51,12 +51,21 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo osd_t::~osd_t() { - delete stats_tfd; + if (stats_tfd) + { + delete stats_tfd; + stats_tfd = NULL; + } if (sync_tfd) { delete sync_tfd; sync_tfd = NULL; } + if (consul_tfd) + { + delete consul_tfd; + consul_tfd = NULL; + } ringloop->unregister_consumer(&consumer); close(epoll_fd); close(listen_fd); @@ -86,6 +95,13 @@ osd_op_t::~osd_op_t() void osd_t::parse_config(blockstore_config_t & config) { + consul_address = config["consul_address"]; + consul_prefix = config["consul_prefix"]; + if (consul_prefix == "") + consul_prefix = "microceph"; + consul_report_interval = strtoull(config["consul_report_interval"].c_str(), NULL, 10); + if (consul_report_interval <= 0) + consul_report_interval = 30; bind_address = config["bind_address"]; if (bind_address == "") bind_address = "0.0.0.0"; @@ -108,6 +124,9 @@ void osd_t::parse_config(blockstore_config_t & config) recovery_queue_depth = DEFAULT_RECOVERY_QUEUE; if (config["readonly"] == "true" || config["readonly"] == "1" || config["readonly"] == "yes") readonly = true; + print_stats_interval = strtoull(config["print_stats_interval"].c_str(), NULL, 10); + if (!print_stats_interval) + print_stats_interval = 3; } void osd_t::bind_socket() @@ -424,32 +443,49 @@ void osd_t::exec_op(osd_op_t *cur_op) } } +void osd_t::reset_stats() +{ + for (int p = 0; p < 2; p++) + { + for (int i = 0; i <= OSD_OP_MAX; i++) + { + if (op_stat_count[p][i] != 0) + { + op_stat_count[p][i] = 0; + op_stat_sum[p][i] = 0; + } + } + for (int i = 0; i <= OSD_OP_MAX; i++) + { + if (subop_stat_count[p][i] != 0) + { + subop_stat_count[p][i] = 0; + subop_stat_sum[p][i] = 0; + } + } + } +} + void osd_t::print_stats() { for (int i = 0; i <= OSD_OP_MAX; i++) { - if (op_stat_count[i] != 0) + if (op_stat_count[0][i] != op_stat_count[1][i]) { - printf("avg latency for op %d (%s): %ld us\n", i, osd_op_names[i], op_stat_sum[i]/op_stat_count[i]); - op_stat_count[i] = 0; - op_stat_sum[i] = 0; + uint64_t avg = (op_stat_sum[0][i] - op_stat_sum[1][i])/(op_stat_count[0][i] - op_stat_count[1][i]); + printf("avg latency for op %d (%s): %ld us\n", i, osd_op_names[i], avg); + op_stat_count[1][i] = op_stat_count[0][i]; + op_stat_sum[1][i] = op_stat_sum[0][i]; } } for (int i = 0; i <= OSD_OP_MAX; i++) { - if (subop_stat_count[i] != 0) + if (subop_stat_count[0][i] != subop_stat_count[1][i]) { - printf("avg latency for subop %d (%s): %ld us\n", i, osd_op_names[i], subop_stat_sum[i]/subop_stat_count[i]); - subop_stat_count[i] = 0; - subop_stat_sum[i] = 0; + uint64_t avg = (subop_stat_sum[0][i] - subop_stat_sum[1][i])/(subop_stat_count[0][i] - subop_stat_count[1][i]); + printf("avg latency for subop %d (%s): %ld us\n", i, osd_op_names[i], avg); } } - if (send_stat_count != 0) - { - printf("avg latency to send stabilize subop: %ld us\n", send_stat_sum/send_stat_count); - send_stat_count = 0; - send_stat_sum = 0; - } if (incomplete_objects > 0) { printf("%lu object(s) incomplete\n", incomplete_objects); diff --git a/osd.h b/osd.h index c3fd1f85..d2ddd457 100644 --- a/osd.h +++ b/osd.h @@ -18,6 +18,7 @@ #include "timerfd_interval.h" #include "osd_ops.h" #include "osd_peering_pg.h" +#include "json11/json11.hpp" #define OSD_OP_IN 0 #define OSD_OP_OUT 1 @@ -190,7 +191,7 @@ class osd_t // config bool readonly = false; - std::string consul_address; + std::string consul_address, consul_host, consul_prefix = "microceph"; osd_num_t osd_num = 1; // OSD numbers start with 1 bool run_primary = false; std::vector peers; @@ -200,12 +201,15 @@ class osd_t int client_queue_depth = 128; bool allow_test_ops = true; int receive_buffer_size = 9000; + int print_stats_interval = 3; + int consul_report_interval = 30; int immediate_commit = IMMEDIATE_NONE; int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE; // peer OSDs + std::vector bind_addresses; std::map osd_peer_fds; std::map pgs; std::set dirty_pgs; @@ -228,7 +232,7 @@ class osd_t uint32_t bs_block_size, bs_disk_alignment; uint64_t pg_stripe_size = 4*1024*1024; // 4 MB by default ring_loop_t *ringloop; - timerfd_interval *stats_tfd = NULL, *sync_tfd = NULL; + timerfd_interval *stats_tfd = NULL, *sync_tfd = NULL, *consul_tfd = NULL; int wait_state = 0; int epoll_fd = 0; @@ -239,17 +243,20 @@ class osd_t std::unordered_map clients; std::vector read_ready_clients; std::vector write_ready_clients; - uint64_t op_stat_sum[OSD_OP_MAX+1] = { 0 }; - uint64_t op_stat_count[OSD_OP_MAX+1] = { 0 }; - uint64_t subop_stat_sum[OSD_OP_MAX+1] = { 0 }; - uint64_t subop_stat_count[OSD_OP_MAX+1] = { 0 }; - uint64_t send_stat_sum = 0; - uint64_t send_stat_count = 0; + + // op statistics + uint64_t op_stat_sum[2][OSD_OP_MAX+1] = { 0 }; + uint64_t op_stat_count[2][OSD_OP_MAX+1] = { 0 }; + uint64_t subop_stat_sum[2][OSD_OP_MAX+1] = { 0 }; + uint64_t subop_stat_count[2][OSD_OP_MAX+1] = { 0 }; // methods void parse_config(blockstore_config_t & config); void bind_socket(); void print_stats(); + void reset_stats(); + json11::Json get_status(); + void report_status(); // event loop, socket read/write void loop(); diff --git a/osd_cluster.cpp b/osd_cluster.cpp index 08d848c6..0da3f7bb 100644 --- a/osd_cluster.cpp +++ b/osd_cluster.cpp @@ -1,9 +1,12 @@ #include #include +#include +#include + #include "osd.h" -int get_port(std::string & host) +static int extract_port(std::string & host) { int port = 0; int pos = 0; @@ -19,12 +22,129 @@ int get_port(std::string & host) return port; } +std::vector getifaddr_list() +{ + std::vector addresses; + ifaddrs *list, *ifa; + if (getifaddrs(&list) == -1) + { + throw std::runtime_error(std::string("getifaddrs: ") + strerror(errno)); + } + for (ifa = list; ifa != NULL; ifa = ifa->ifa_next) + { + if (!ifa->ifa_addr) + { + continue; + } + int family = ifa->ifa_addr->sa_family; + if ((family == AF_INET || family == AF_INET6) && + (ifa->ifa_flags & (IFF_UP | IFF_RUNNING | IFF_LOOPBACK)) == (IFF_UP | IFF_RUNNING)) + { + void *addr_ptr; + if (family == AF_INET) + addr_ptr = &((sockaddr_in *)ifa->ifa_addr)->sin_addr; + else + addr_ptr = &((sockaddr_in6 *)ifa->ifa_addr)->sin6_addr; + char addr[INET6_ADDRSTRLEN]; + if (!inet_ntop(family, addr_ptr, addr, INET6_ADDRSTRLEN)) + { + throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno)); + } + addresses.push_back(std::string(addr)); + } + } + freeifaddrs(list); + return addresses; +} + +json11::Json osd_t::get_status() +{ + json11::Json::object st; + st["state"] = "up"; + if (bind_address != "0.0.0.0") + st["addresses"] = { bind_address }; + else + { + if (bind_addresses.size() == 0) + bind_addresses = getifaddr_list(); + st["addresses"] = bind_addresses; + } + st["port"] = bind_port; + st["blockstore_enabled"] = bs ? true : false; + if (bs) + { + st["size"] = bs->get_block_count() * bs->get_block_size(); + st["free"] = bs->get_free_block_count() * bs->get_block_size(); + } + json11::Json::object pg_status; + for (auto & p: pgs) + { + auto & pg = p.second; + json11::Json::object pg_st; + json11::Json::array pg_state; + for (int i = 0; i < pg_state_bit_count; i++) + if (pg.state & pg_state_bits[i]) + pg_state.push_back(pg_state_names[i]); + pg_st["state"] = pg_state; + pg_st["object_count"] = pg.total_count; + pg_st["clean_count"] = pg.clean_count; + pg_st["misplaced_count"] = pg.misplaced_objects.size(); + pg_st["degraded_count"] = pg.degraded_objects.size(); + pg_st["incomplete_count"] = pg.incomplete_objects.size(); + pg_st["write_osd_set"] = pg.cur_set; + pg_status[std::to_string(pg.pg_num)] = pg_st; + } + st["pgs"] = pg_status; + json11::Json::object op_stats, subop_stats; + for (int i = 0; i <= OSD_OP_MAX; i++) + { + op_stats[osd_op_names[i]] = json11::Json::object { + { "count", op_stat_count[0][i] }, + { "sum", op_stat_sum[0][i] }, + }; + } + for (int i = 0; i <= OSD_OP_MAX; i++) + { + subop_stats[osd_op_names[i]] = json11::Json::object { + { "count", subop_stat_count[0][i] }, + { "sum", subop_stat_sum[0][i] }, + }; + } + st["op_latency"] = op_stats; + st["subop_latency"] = subop_stats; + return st; +} + +void osd_t::report_status() +{ + if (consul_host == "") + { + consul_host = consul_address; + extract_port(consul_host); + } + std::string st = get_status().dump(); + std::string req = "PUT /v1/kv/"+consul_prefix+"/osd/"+std::to_string(osd_num)+" HTTP/1.1\r\n"+ + "Host: "+consul_host+"\r\n"+ + "Content-Length: "+std::to_string(st.size())+"\r\n"+ + "Connection: close\r\n" + "\r\n"+st; + http_request(consul_address, req, [this](int err, std::string res) + { + int pos = res.find("\r\n\r\n"); + if (pos >= 0) + res = res.substr(pos+4); + if (err != 0 || res != "true") + printf("Error reporting state to Consul: code %d (%s), response text: %s\n", err, strerror(err), res.c_str()); + }); +} + struct http_co_t { osd_t *osd; std::string host; std::string request; - std::vector response; + std::string response; + std::vector rbuf; int st = 0; int peer_fd = -1; @@ -59,9 +179,7 @@ void osd_t::http_request(std::string host, std::string request, std::function= 0) { osd->epoll_handlers.erase(peer_fd); @@ -75,7 +193,7 @@ void http_co_t::resume() { if (st == 0) { - int port = get_port(host); + int port = extract_port(host); struct sockaddr_in addr; int r; if ((r = inet_pton(AF_INET, host.c_str(), &addr.sin_addr)) != 1) @@ -208,12 +326,13 @@ void http_co_t::resume() } else if (epoll_events & EPOLLIN) { - response.resize(received + 9000); + if (rbuf.size() != 9000) + rbuf.resize(9000); io_uring_sqe *sqe = osd->ringloop->get_sqe(); if (!sqe) return; ring_data_t* data = ((ring_data_t*)sqe->user_data); - iov = { .iov_base = response.data()+received, .iov_len = 9000 }; + iov = { .iov_base = rbuf.data(), .iov_len = 9000 }; msg.msg_iov = &iov; msg.msg_iovlen = 1; data->callback = [this](ring_data_t *data) @@ -238,14 +357,10 @@ void http_co_t::resume() delete this; return; } + response += std::string(rbuf.data(), cqe_res); received += cqe_res; st = 5; resume(); return; } } - -/*void osd_t::get_pgs() -{ - //consul_address -}*/ diff --git a/osd_peering.cpp b/osd_peering.cpp index 69a4f536..c14d516d 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -31,6 +31,13 @@ void osd_t::init_primary() pgs[1].print_state(); pg_count = 1; peering_state = OSD_CONNECTING_PEERS; + if (consul_address != "") + { + this->consul_tfd = new timerfd_interval(ringloop, consul_report_interval, [this]() + { + report_status(); + }); + } if (autosync_interval > 0) { this->sync_tfd = new timerfd_interval(ringloop, 3, [this]() diff --git a/osd_peering_pg.cpp b/osd_peering_pg.cpp index 0c39b01d..21d2a7d7 100644 --- a/osd_peering_pg.cpp +++ b/osd_peering_pg.cpp @@ -372,3 +372,29 @@ void pg_t::print_state() total_count ); } + +const int pg_state_bit_count = 10; + +const int pg_state_bits[10] = { + PG_OFFLINE, + PG_PEERING, + PG_INCOMPLETE, + PG_ACTIVE, + PG_DEGRADED, + PG_HAS_INCOMPLETE, + PG_HAS_DEGRADED, + PG_HAS_MISPLACED, + PG_HAS_UNCLEAN, +}; + +const char *pg_state_names[10] = { + "offline", + "peering", + "incomplete", + "active", + "degraded", + "has_incomplete", + "has_degraded", + "has_misplaced", + "has_unclean", +}; diff --git a/osd_peering_pg.h b/osd_peering_pg.h index 61b92ade..c9593980 100644 --- a/osd_peering_pg.h +++ b/osd_peering_pg.h @@ -33,6 +33,10 @@ #define OBJ_NEEDS_ROLLBACK 0x20000 #define OBJ_BUGGY 0x80000 +extern const int pg_state_bits[10]; +extern const char *pg_state_names[10]; +extern const int pg_state_bit_count; + struct pg_obj_loc_t { uint64_t role; diff --git a/osd_primary.cpp b/osd_primary.cpp index f44e852c..3fda3d01 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -484,6 +484,7 @@ resume_5: { // Object is created pg.clean_count++; + pg.total_count++; } if (op_data->object_state) { diff --git a/osd_receive.cpp b/osd_receive.cpp index 3b9c3d0d..84ec1430 100644 --- a/osd_receive.cpp +++ b/osd_receive.cpp @@ -139,8 +139,8 @@ void osd_t::handle_finished_read(osd_client_t & cl) // Measure subop latency timespec tv_end; clock_gettime(CLOCK_REALTIME, &tv_end); - subop_stat_count[request->req.hdr.opcode]++; - subop_stat_sum[request->req.hdr.opcode] += ( + subop_stat_count[0][request->req.hdr.opcode]++; + subop_stat_sum[0][request->req.hdr.opcode] += ( (tv_end.tv_sec - request->tv_begin.tv_sec)*1000000 + (tv_end.tv_nsec - request->tv_begin.tv_nsec)/1000 ); @@ -242,8 +242,8 @@ void osd_t::handle_reply_hdr(osd_client_t *cl) // Measure subop latency timespec tv_end; clock_gettime(CLOCK_REALTIME, &tv_end); - subop_stat_count[op->req.hdr.opcode]++; - subop_stat_sum[op->req.hdr.opcode] += ( + subop_stat_count[0][op->req.hdr.opcode]++; + subop_stat_sum[0][op->req.hdr.opcode] += ( (tv_end.tv_sec - op->tv_begin.tv_sec)*1000000 + (tv_end.tv_nsec - op->tv_begin.tv_nsec)/1000 ); diff --git a/osd_send.cpp b/osd_send.cpp index 56d79ca2..991c1fc3 100644 --- a/osd_send.cpp +++ b/osd_send.cpp @@ -39,8 +39,8 @@ bool osd_t::try_send(osd_client_t & cl) { // Measure execution latency timespec tv_end = cl.write_op->tv_send; - op_stat_count[cl.write_op->req.hdr.opcode]++; - op_stat_sum[cl.write_op->req.hdr.opcode] += ( + op_stat_count[0][cl.write_op->req.hdr.opcode]++; + op_stat_sum[0][cl.write_op->req.hdr.opcode] += ( (tv_end.tv_sec - cl.write_op->tv_begin.tv_sec)*1000000 + (tv_end.tv_nsec - cl.write_op->tv_begin.tv_nsec)/1000 ); @@ -101,16 +101,6 @@ void osd_t::handle_send(ring_data_t *data, int peer_fd) if (cur_op->send_list.sent >= cur_op->send_list.count) { // Done - if (cur_op->req.hdr.opcode == OSD_OP_SECONDARY_STABILIZE) - { - timespec tv_end; - clock_gettime(CLOCK_REALTIME, &tv_end); - send_stat_count++; - send_stat_sum += ( - (tv_end.tv_sec - cl.write_op->tv_send.tv_sec)*1000000 + - (tv_end.tv_nsec - cl.write_op->tv_send.tv_nsec)/1000 - ); - } if (cur_op->op_type == OSD_OP_IN) { delete cur_op;