Fix client failover in case of etcd shutdown or crash

test-assert
Vitaliy Filippov 2021-12-01 00:33:02 +03:00
parent cac6a1d8d1
commit 5859f913fc
4 changed files with 78 additions and 13 deletions

View File

@ -64,6 +64,7 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
st_cli.on_change_osd_state_hook = [this](uint64_t peer_osd) { on_change_osd_state_hook(peer_osd); };
st_cli.on_change_hook = [this](std::map<std::string, etcd_kv_t> & changes) { on_change_hook(changes); };
st_cli.on_load_pgs_hook = [this](bool success) { on_load_pgs_hook(success); };
st_cli.on_reload_hook = [this]() { st_cli.load_global_config(); };
st_cli.parse_config(config);
st_cli.load_global_config();

View File

@ -17,6 +17,11 @@ etcd_state_client_t::~etcd_state_client_t()
}
watches.clear();
etcd_watches_initialised = -1;
if (ws_keepalive_timer >= 0)
{
tfd->clear_timer(ws_keepalive_timer);
ws_keepalive_timer = -1;
}
#ifndef __MOCK__
if (etcd_watch_ws)
{
@ -192,11 +197,18 @@ void etcd_state_client_t::start_etcd_watcher()
etcd_address = etcd_address.substr(0, pos);
}
etcd_watches_initialised = 0;
ws_alive = 1;
if (etcd_watch_ws)
{
etcd_watch_ws->close();
etcd_watch_ws = NULL;
}
etcd_watch_ws = open_websocket(tfd, etcd_address, etcd_api_path+"/watch", ETCD_SLOW_TIMEOUT,
[this, cur_addr = selected_etcd_address](const http_response_t *msg)
{
if (msg->body.length())
{
ws_alive = 1;
std::string json_err;
json11::Json data = json11::Json::parse(msg->body, json_err);
if (json_err != "")
@ -215,17 +227,32 @@ void etcd_state_client_t::start_etcd_watcher()
if (data["result"]["compact_revision"].uint64_value())
{
// we may miss events if we proceed
// FIXME: reload state and continue when used inside cluster_client
// so we should restart from the beginning if we can
if (on_reload_hook != NULL)
{
fprintf(stderr, "Revisions before %lu were compacted by etcd, reloading state\n",
data["result"]["compact_revision"].uint64_value());
etcd_watch_ws->close();
etcd_watch_ws = NULL;
etcd_watch_revision = 0;
on_reload_hook();
}
else
{
fprintf(stderr, "Revisions before %lu were compacted by etcd, exiting\n",
data["result"]["compact_revision"].uint64_value());
exit(1);
}
}
else
{
fprintf(stderr, "Watch canceled by etcd, reason: %s, exiting\n", data["result"]["cancel_reason"].string_value().c_str());
exit(1);
}
}
if (etcd_watches_initialised == 4)
{
etcd_watch_revision = data["result"]["header"]["revision"].uint64_value();
etcd_watch_revision = data["result"]["header"]["revision"].uint64_value()+1;
addresses_to_try.clear();
}
// First gather all changes into a hash to remove multiple overwrites
@ -256,7 +283,9 @@ void etcd_state_client_t::start_etcd_watcher()
if (msg->eof)
{
if (cur_addr == selected_etcd_address)
{
selected_etcd_address = "";
}
etcd_watch_ws = NULL;
if (etcd_watches_initialised == 0)
{
@ -277,7 +306,7 @@ void etcd_state_client_t::start_etcd_watcher()
{ "create_request", json11::Json::object {
{ "key", base64_encode(etcd_prefix+"/config/") },
{ "range_end", base64_encode(etcd_prefix+"/config0") },
{ "start_revision", etcd_watch_revision+1 },
{ "start_revision", etcd_watch_revision },
{ "watch_id", ETCD_CONFIG_WATCH_ID },
{ "progress_notify", true },
} }
@ -286,7 +315,7 @@ void etcd_state_client_t::start_etcd_watcher()
{ "create_request", json11::Json::object {
{ "key", base64_encode(etcd_prefix+"/osd/state/") },
{ "range_end", base64_encode(etcd_prefix+"/osd/state0") },
{ "start_revision", etcd_watch_revision+1 },
{ "start_revision", etcd_watch_revision },
{ "watch_id", ETCD_OSD_STATE_WATCH_ID },
{ "progress_notify", true },
} }
@ -295,7 +324,7 @@ void etcd_state_client_t::start_etcd_watcher()
{ "create_request", json11::Json::object {
{ "key", base64_encode(etcd_prefix+"/pg/state/") },
{ "range_end", base64_encode(etcd_prefix+"/pg/state0") },
{ "start_revision", etcd_watch_revision+1 },
{ "start_revision", etcd_watch_revision },
{ "watch_id", ETCD_PG_STATE_WATCH_ID },
{ "progress_notify", true },
} }
@ -304,11 +333,34 @@ void etcd_state_client_t::start_etcd_watcher()
{ "create_request", json11::Json::object {
{ "key", base64_encode(etcd_prefix+"/pg/history/") },
{ "range_end", base64_encode(etcd_prefix+"/pg/history0") },
{ "start_revision", etcd_watch_revision+1 },
{ "start_revision", etcd_watch_revision },
{ "watch_id", ETCD_PG_HISTORY_WATCH_ID },
{ "progress_notify", true },
} }
}).dump());
if (ws_keepalive_timer < 0)
{
ws_keepalive_timer = tfd->set_timer(ETCD_KEEPALIVE_TIMEOUT, true, [this](int)
{
if (!etcd_watch_ws)
{
// Do nothing
}
else if (!ws_alive)
{
etcd_watch_ws->close();
etcd_watch_ws = NULL;
start_etcd_watcher();
}
else
{
ws_alive = 0;
etcd_watch_ws->post_message(WS_TEXT, json11::Json(json11::Json::object {
{ "progress_request", json11::Json::object { } }
}).dump());
}
});
}
}
void etcd_state_client_t::load_global_config()
@ -406,7 +458,7 @@ void etcd_state_client_t::load_pgs()
}
if (!etcd_watch_revision)
{
etcd_watch_revision = data["header"]["revision"].uint64_value();
etcd_watch_revision = data["header"]["revision"].uint64_value()+1;
}
for (auto & res: data["responses"].array_items())
{

View File

@ -15,6 +15,7 @@
#define MAX_ETCD_ATTEMPTS 5
#define ETCD_SLOW_TIMEOUT 5000
#define ETCD_QUICK_TIMEOUT 1000
#define ETCD_KEEPALIVE_TIMEOUT 30000
#define DEFAULT_BLOCK_SIZE 128*1024
@ -82,6 +83,8 @@ protected:
std::vector<std::string> addresses_to_try;
std::vector<inode_watch_t*> watches;
websocket_t *etcd_watch_ws = NULL;
int ws_keepalive_timer = -1;
int ws_alive = 0;
uint64_t bs_block_size = DEFAULT_BLOCK_SIZE;
void add_etcd_url(std::string);
void pick_next_etcd();
@ -103,6 +106,7 @@ public:
std::function<void(bool)> on_load_pgs_hook;
std::function<void(pool_id_t, pg_num_t)> on_change_pg_history_hook;
std::function<void(osd_num_t)> on_change_osd_state_hook;
std::function<void()> on_reload_hook;
json11::Json::object serialize_inode_cfg(inode_config_t *cfg);
etcd_kv_t parse_etcd_kv(const json11::Json & kv_json);

View File

@ -451,7 +451,11 @@ bool http_co_t::handle_read()
}
if (want_streaming && parsed.body.size() > 0)
{
if (!ended)
{
// Don't deliver additional events after close()
callback(&parsed);
}
parsed.body = "";
}
}
@ -459,7 +463,11 @@ bool http_co_t::handle_read()
{
while (ws_parse_frame(response, parsed.ws_msg_type, parsed.body))
{
if (!ended)
{
// Don't deliver additional events after close()
callback(&parsed);
}
parsed.body = "";
}
}