From ad577c4aac09922a880e1a4b4e2e04b2c49080bb Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Tue, 9 Mar 2021 01:17:42 +0300 Subject: [PATCH] Add PING operation and timeouts to detect OSD failures when a host goes down --- src/cluster_client.cpp | 19 ++------- src/cluster_client.h | 1 + src/messenger.cpp | 92 ++++++++++++++++++++++++++++++++++++++++++ src/messenger.h | 8 ++++ src/osd.cpp | 16 ++++---- src/osd_ops.cpp | 1 + src/osd_ops.h | 3 +- 7 files changed, 116 insertions(+), 24 deletions(-) diff --git a/src/cluster_client.cpp b/src/cluster_client.cpp index 1095bdaf..9c48ecc9 100644 --- a/src/cluster_client.cpp +++ b/src/cluster_client.cpp @@ -8,13 +8,11 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd { this->ringloop = ringloop; this->tfd = tfd; - - log_level = config["log_level"].int64_value(); + this->config = config; msgr.osd_num = 0; msgr.tfd = tfd; msgr.ringloop = ringloop; - msgr.log_level = log_level; msgr.repeer_pgs = [this](osd_num_t peer_osd) { if (msgr.osd_peer_fds.find(peer_osd) != msgr.osd_peer_fds.end()) @@ -67,8 +65,7 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd msgr.stop_client(op->peer_fd); delete op; }; - msgr.use_sync_send_recv = config["use_sync_send_recv"].bool_value() || - config["use_sync_send_recv"].uint64_value(); + msgr.init(); st_cli.tfd = tfd; st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); }; @@ -185,16 +182,8 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config) { up_wait_retry_interval = 50; } - msgr.peer_connect_interval = config["peer_connect_interval"].uint64_value(); - if (!msgr.peer_connect_interval) - { - msgr.peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL; - } - msgr.peer_connect_timeout = config["peer_connect_timeout"].uint64_value(); - if (!msgr.peer_connect_timeout) - { - msgr.peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT; - } + msgr.parse_config(config); + msgr.parse_config(this->config); st_cli.load_pgs(); } diff --git a/src/cluster_client.h b/src/cluster_client.h index a46daad5..df263ebe 100644 --- a/src/cluster_client.h +++ b/src/cluster_client.h @@ -82,6 +82,7 @@ class cluster_client_t public: etcd_state_client_t st_cli; osd_messenger_t msgr; + json11::Json config; cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd, json11::Json & config); ~cluster_client_t(); diff --git a/src/messenger.cpp b/src/messenger.cpp index d2f08a49..9e6588e1 100644 --- a/src/messenger.cpp +++ b/src/messenger.cpp @@ -26,14 +26,106 @@ osd_op_t::~osd_op_t() } } +void osd_messenger_t::init() +{ + keepalive_timer_id = tfd->set_timer(1000, true, [this](int) + { + for (auto cl_it = clients.begin(); cl_it != clients.end();) + { + auto cl = (cl_it++)->second; + if (!cl->osd_num) + { + // Do not run keepalive on regular clients + continue; + } + if (cl->ping_time_remaining > 0) + { + cl->ping_time_remaining--; + if (!cl->ping_time_remaining) + { + // Ping timed out, stop the client + stop_client(cl->peer_fd, true); + } + } + else if (cl->idle_time_remaining > 0) + { + cl->idle_time_remaining--; + if (!cl->idle_time_remaining) + { + // Connection is idle for , send ping + osd_op_t *op = new osd_op_t(); + op->op_type = OSD_OP_OUT; + op->peer_fd = cl->peer_fd; + op->req = (osd_any_op_t){ + .hdr = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = this->next_subop_id++, + .opcode = OSD_OP_PING, + }, + }; + op->callback = [this, cl](osd_op_t *op) + { + int fail_fd = (op->reply.hdr.retval != 0 ? op->peer_fd : -1); + cl->ping_time_remaining = 0; + delete op; + if (fail_fd >= 0) + { + stop_client(fail_fd, true); + } + }; + outbox_push(op); + cl->ping_time_remaining = osd_ping_timeout; + cl->idle_time_remaining = osd_idle_timeout; + } + } + else + { + cl->idle_time_remaining = osd_idle_timeout; + } + } + }); +} + osd_messenger_t::~osd_messenger_t() { + if (keepalive_timer_id >= 0) + { + tfd->clear_timer(keepalive_timer_id); + keepalive_timer_id = -1; + } while (clients.size() > 0) { stop_client(clients.begin()->first, true); } } +void osd_messenger_t::parse_config(const json11::Json & config) +{ + this->use_sync_send_recv = config["use_sync_send_recv"].bool_value() || + config["use_sync_send_recv"].uint64_value(); + this->peer_connect_interval = config["peer_connect_interval"].uint64_value(); + if (!this->peer_connect_interval) + { + this->peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL; + } + this->peer_connect_timeout = config["peer_connect_timeout"].uint64_value(); + if (!this->peer_connect_timeout) + { + this->peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT; + } + this->osd_idle_timeout = config["osd_idle_timeout"].uint64_value(); + if (!this->osd_idle_timeout) + { + this->osd_idle_timeout = DEFAULT_OSD_PING_TIMEOUT; + } + this->osd_ping_timeout = config["osd_ping_timeout"].uint64_value(); + if (!this->osd_ping_timeout) + { + this->osd_ping_timeout = DEFAULT_OSD_PING_TIMEOUT; + } + this->log_level = config["log_level"].uint64_value(); +} + void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state) { if (wanted_peers.find(peer_osd) == wanted_peers.end()) diff --git a/src/messenger.h b/src/messenger.h index 54a9af55..3a13efee 100644 --- a/src/messenger.h +++ b/src/messenger.h @@ -34,6 +34,7 @@ #define DEFAULT_PEER_CONNECT_INTERVAL 5 #define DEFAULT_PEER_CONNECT_TIMEOUT 5 +#define DEFAULT_OSD_PING_TIMEOUT 5 // Kind of a vector with small-list-optimisation struct osd_op_buf_list_t @@ -198,6 +199,8 @@ struct osd_client_t int peer_fd; int peer_state; int connect_timeout_id = -1; + int ping_time_remaining = 0; + int idle_time_remaining = 0; osd_num_t osd_num = 0; void *in_buf = NULL; @@ -251,6 +254,7 @@ struct osd_messenger_t { timerfd_manager_t *tfd; ring_loop_t *ringloop; + int keepalive_timer_id = -1; // osd_num_t is only for logging and asserts osd_num_t osd_num; @@ -258,6 +262,8 @@ struct osd_messenger_t int receive_buffer_size = 64*1024; int peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL; int peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT; + int osd_idle_timeout = DEFAULT_OSD_PING_TIMEOUT; + int osd_ping_timeout = DEFAULT_OSD_PING_TIMEOUT; int log_level = 0; bool use_sync_send_recv = false; @@ -274,6 +280,8 @@ struct osd_messenger_t osd_op_stats_t stats; public: + void init(); + void parse_config(const json11::Json & config); void connect_peer(uint64_t osd_num, json11::Json peer_state); void stop_client(int peer_fd, bool force = false); void outbox_push(osd_op_t *cur_op); diff --git a/src/osd.cpp b/src/osd.cpp index 16c9ac85..14a2a8a2 100644 --- a/src/osd.cpp +++ b/src/osd.cpp @@ -37,6 +37,7 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo c_cli.ringloop = this->ringloop; c_cli.exec_op = [this](osd_op_t *op) { exec_op(op); }; c_cli.repeer_pgs = [this](osd_num_t peer_osd) { repeer_pgs(peer_osd); }; + c_cli.init(); init_cluster(); @@ -100,14 +101,7 @@ void osd_t::parse_config(blockstore_config_t & config) slow_log_interval = strtoull(config["slow_log_interval"].c_str(), NULL, 10); if (!slow_log_interval) slow_log_interval = 10; - c_cli.peer_connect_interval = strtoull(config["peer_connect_interval"].c_str(), NULL, 10); - if (!c_cli.peer_connect_interval) - c_cli.peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL; - c_cli.peer_connect_timeout = strtoull(config["peer_connect_timeout"].c_str(), NULL, 10); - if (!c_cli.peer_connect_timeout) - c_cli.peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT; - log_level = strtoull(config["log_level"].c_str(), NULL, 10); - c_cli.log_level = log_level; + c_cli.parse_config(json_config); } void osd_t::bind_socket() @@ -211,6 +205,12 @@ void osd_t::exec_op(osd_op_t *cur_op) finish_op(cur_op, -EINVAL); return; } + if (cur_op->req.hdr.opcode == OSD_OP_PING) + { + // Pong + finish_op(cur_op, 0); + return; + } if (readonly && cur_op->req.hdr.opcode != OSD_OP_SEC_READ && cur_op->req.hdr.opcode != OSD_OP_SEC_LIST && diff --git a/src/osd_ops.cpp b/src/osd_ops.cpp index 3689cb8f..0d8b1214 100644 --- a/src/osd_ops.cpp +++ b/src/osd_ops.cpp @@ -19,4 +19,5 @@ const char* osd_op_names[] = { "primary_write", "primary_sync", "primary_delete", + "ping", }; diff --git a/src/osd_ops.h b/src/osd_ops.h index 2ddad111..f429ecdf 100644 --- a/src/osd_ops.h +++ b/src/osd_ops.h @@ -27,7 +27,8 @@ #define OSD_OP_WRITE 12 #define OSD_OP_SYNC 13 #define OSD_OP_DELETE 14 -#define OSD_OP_MAX 14 +#define OSD_OP_PING 15 +#define OSD_OP_MAX 15 // Alignment & limit for read/write operations #ifndef MEM_ALIGNMENT #define MEM_ALIGNMENT 512