From 5859f913fc613db814e30d977c67412c10598e75 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Wed, 1 Dec 2021 00:33:02 +0300 Subject: [PATCH] Fix client failover in case of etcd shutdown or crash --- src/cluster_client.cpp | 1 + src/etcd_state_client.cpp | 74 +++++++++++++++++++++++++++++++++------ src/etcd_state_client.h | 4 +++ src/http_client.cpp | 12 +++++-- 4 files changed, 78 insertions(+), 13 deletions(-) diff --git a/src/cluster_client.cpp b/src/cluster_client.cpp index 45688c24..e707b129 100644 --- a/src/cluster_client.cpp +++ b/src/cluster_client.cpp @@ -64,6 +64,7 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd st_cli.on_change_osd_state_hook = [this](uint64_t peer_osd) { on_change_osd_state_hook(peer_osd); }; st_cli.on_change_hook = [this](std::map & changes) { on_change_hook(changes); }; st_cli.on_load_pgs_hook = [this](bool success) { on_load_pgs_hook(success); }; + st_cli.on_reload_hook = [this]() { st_cli.load_global_config(); }; st_cli.parse_config(config); st_cli.load_global_config(); diff --git a/src/etcd_state_client.cpp b/src/etcd_state_client.cpp index b16ff97b..3d18b959 100644 --- a/src/etcd_state_client.cpp +++ b/src/etcd_state_client.cpp @@ -17,6 +17,11 @@ etcd_state_client_t::~etcd_state_client_t() } watches.clear(); etcd_watches_initialised = -1; + if (ws_keepalive_timer >= 0) + { + tfd->clear_timer(ws_keepalive_timer); + ws_keepalive_timer = -1; + } #ifndef __MOCK__ if (etcd_watch_ws) { @@ -192,11 +197,18 @@ void etcd_state_client_t::start_etcd_watcher() etcd_address = etcd_address.substr(0, pos); } etcd_watches_initialised = 0; + ws_alive = 1; + if (etcd_watch_ws) + { + etcd_watch_ws->close(); + etcd_watch_ws = NULL; + } etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", ETCD_SLOW_TIMEOUT, [this, cur_addr = selected_etcd_address](const http_response_t *msg) { if (msg->body.length()) { + ws_alive = 1; std::string json_err; json11::Json data = json11::Json::parse(msg->body, json_err); if (json_err != "") @@ -215,17 +227,32 @@ void etcd_state_client_t::start_etcd_watcher() if (data["result"]["compact_revision"].uint64_value()) { // we may miss events if we proceed - // FIXME: reload state and continue when used inside cluster_client - fprintf(stderr, "Revisions before %lu were compacted by etcd, exiting\n", - data["result"]["compact_revision"].uint64_value()); + // so we should restart from the beginning if we can + if (on_reload_hook != NULL) + { + fprintf(stderr, "Revisions before %lu were compacted by etcd, reloading state\n", + data["result"]["compact_revision"].uint64_value()); + etcd_watch_ws->close(); + etcd_watch_ws = NULL; + etcd_watch_revision = 0; + on_reload_hook(); + } + else + { + fprintf(stderr, "Revisions before %lu were compacted by etcd, exiting\n", + data["result"]["compact_revision"].uint64_value()); + exit(1); + } + } + else + { + fprintf(stderr, "Watch canceled by etcd, reason: %s, exiting\n", data["result"]["cancel_reason"].string_value().c_str()); exit(1); } - fprintf(stderr, "Watch canceled by etcd, reason: %s, exiting\n", data["result"]["cancel_reason"].string_value().c_str()); - exit(1); } if (etcd_watches_initialised == 4) { - etcd_watch_revision = data["result"]["header"]["revision"].uint64_value(); + etcd_watch_revision = data["result"]["header"]["revision"].uint64_value()+1; addresses_to_try.clear(); } // First gather all changes into a hash to remove multiple overwrites @@ -256,7 +283,9 @@ void etcd_state_client_t::start_etcd_watcher() if (msg->eof) { if (cur_addr == selected_etcd_address) + { selected_etcd_address = ""; + } etcd_watch_ws = NULL; if (etcd_watches_initialised == 0) { @@ -277,7 +306,7 @@ void etcd_state_client_t::start_etcd_watcher() { "create_request", json11::Json::object { { "key", base64_encode(etcd_prefix+"/config/") }, { "range_end", base64_encode(etcd_prefix+"/config0") }, - { "start_revision", etcd_watch_revision+1 }, + { "start_revision", etcd_watch_revision }, { "watch_id", ETCD_CONFIG_WATCH_ID }, { "progress_notify", true }, } } @@ -286,7 +315,7 @@ void etcd_state_client_t::start_etcd_watcher() { "create_request", json11::Json::object { { "key", base64_encode(etcd_prefix+"/osd/state/") }, { "range_end", base64_encode(etcd_prefix+"/osd/state0") }, - { "start_revision", etcd_watch_revision+1 }, + { "start_revision", etcd_watch_revision }, { "watch_id", ETCD_OSD_STATE_WATCH_ID }, { "progress_notify", true }, } } @@ -295,7 +324,7 @@ void etcd_state_client_t::start_etcd_watcher() { "create_request", json11::Json::object { { "key", base64_encode(etcd_prefix+"/pg/state/") }, { "range_end", base64_encode(etcd_prefix+"/pg/state0") }, - { "start_revision", etcd_watch_revision+1 }, + { "start_revision", etcd_watch_revision }, { "watch_id", ETCD_PG_STATE_WATCH_ID }, { "progress_notify", true }, } } @@ -304,11 +333,34 @@ void etcd_state_client_t::start_etcd_watcher() { "create_request", json11::Json::object { { "key", base64_encode(etcd_prefix+"/pg/history/") }, { "range_end", base64_encode(etcd_prefix+"/pg/history0") }, - { "start_revision", etcd_watch_revision+1 }, + { "start_revision", etcd_watch_revision }, { "watch_id", ETCD_PG_HISTORY_WATCH_ID }, { "progress_notify", true }, } } }).dump()); + if (ws_keepalive_timer < 0) + { + ws_keepalive_timer = tfd->set_timer(ETCD_KEEPALIVE_TIMEOUT, true, [this](int) + { + if (!etcd_watch_ws) + { + // Do nothing + } + else if (!ws_alive) + { + etcd_watch_ws->close(); + etcd_watch_ws = NULL; + start_etcd_watcher(); + } + else + { + ws_alive = 0; + etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object { + { "progress_request", json11::Json::object { } } + }).dump()); + } + }); + } } void etcd_state_client_t::load_global_config() @@ -406,7 +458,7 @@ void etcd_state_client_t::load_pgs() } if (!etcd_watch_revision) { - etcd_watch_revision = data["header"]["revision"].uint64_value(); + etcd_watch_revision = data["header"]["revision"].uint64_value()+1; } for (auto & res: data["responses"].array_items()) { diff --git a/src/etcd_state_client.h b/src/etcd_state_client.h index d9e3f9f5..fbd96c75 100644 --- a/src/etcd_state_client.h +++ b/src/etcd_state_client.h @@ -15,6 +15,7 @@ #define MAX_ETCD_ATTEMPTS 5 #define ETCD_SLOW_TIMEOUT 5000 #define ETCD_QUICK_TIMEOUT 1000 +#define ETCD_KEEPALIVE_TIMEOUT 30000 #define DEFAULT_BLOCK_SIZE 128*1024 @@ -82,6 +83,8 @@ protected: std::vector addresses_to_try; std::vector watches; websocket_t *etcd_watch_ws = NULL; + int ws_keepalive_timer = -1; + int ws_alive = 0; uint64_t bs_block_size = DEFAULT_BLOCK_SIZE; void add_etcd_url(std::string); void pick_next_etcd(); @@ -103,6 +106,7 @@ public: std::function on_load_pgs_hook; std::function on_change_pg_history_hook; std::function on_change_osd_state_hook; + std::function on_reload_hook; json11::Json::object serialize_inode_cfg(inode_config_t *cfg); etcd_kv_t parse_etcd_kv(const json11::Json & kv_json); diff --git a/src/http_client.cpp b/src/http_client.cpp index b99c011a..7b23122f 100644 --- a/src/http_client.cpp +++ b/src/http_client.cpp @@ -451,7 +451,11 @@ bool http_co_t::handle_read() } if (want_streaming && parsed.body.size() > 0) { - callback(&parsed); + if (!ended) + { + // Don't deliver additional events after close() + callback(&parsed); + } parsed.body = ""; } } @@ -459,7 +463,11 @@ bool http_co_t::handle_read() { while (ws_parse_frame(response, parsed.ws_msg_type, parsed.body)) { - callback(&parsed); + if (!ended) + { + // Don't deliver additional events after close() + callback(&parsed); + } parsed.body = ""; } }