Exit if unable to restart watches

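FIXME: Exiting is probably not acceptable for the client in this case (a watch canceled by etcd, e.g. because its revision was compacted); it should reload its state and continue instead.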
test-assert
Vitaliy Filippov 2021-11-28 01:43:31 +03:00
parent a8f5c71ae8
commit 7a0b5212fe
2 changed files with 33 additions and 6 deletions

View File

@ -341,7 +341,7 @@ class Mon
this.etcd_start_timeout = (config.etcd_start_timeout || 5) * 1000; this.etcd_start_timeout = (config.etcd_start_timeout || 5) * 1000;
this.state = JSON.parse(JSON.stringify(this.constructor.etcd_tree)); this.state = JSON.parse(JSON.stringify(this.constructor.etcd_tree));
this.signals_set = false; this.signals_set = false;
this.on_stop_cb = () => this.on_stop().catch(console.error); this.on_stop_cb = () => this.on_stop(0).catch(console.error);
} }
parse_etcd_addresses(addrs) parse_etcd_addresses(addrs)
@ -530,12 +530,25 @@ class Mon
catch (e) catch (e)
{ {
} }
if (!data || !data.result || !data.result.events) if (!data || !data.result)
{ {
if (!data || !data.result || !data.result.watch_id) console.error('Unknown message received from watch websocket: '+msg);
}
else if (data.result.canceled)
{
// etcd watch canceled
if (data.result.compact_revision)
{ {
console.error('Garbage received from watch websocket: '+msg); // we may miss events if we proceed
console.error('Revisions before '+data.result.compact_revision+' were compacted by etcd, exiting');
this.on_stop(1);
} }
console.error('Watch canceled by etcd, reason: '+data.result.cancel_reason+', exiting');
this.on_stop(1);
}
else if (data.result.created)
{
// etcd watch created
} }
else else
{ {
@ -639,11 +652,11 @@ class Mon
} }
} }
async on_stop() async on_stop(status)
{ {
clearInterval(this.lease_timer); clearInterval(this.lease_timer);
await this.etcd_call('/lease/revoke', { ID: this.etcd_lease_id }, this.config.etcd_mon_timeout, this.config.etcd_mon_retries); await this.etcd_call('/lease/revoke', { ID: this.etcd_lease_id }, this.config.etcd_mon_timeout, this.config.etcd_mon_retries);
process.exit(0); process.exit(status);
} }
async become_master() async become_master()

View File

@ -209,6 +209,20 @@ void etcd_state_client_t::start_etcd_watcher()
{ {
etcd_watches_initialised++; etcd_watches_initialised++;
} }
if (data["result"]["canceled"].bool_value())
{
// etcd watch canceled, maybe because the revision was compacted
if (data["result"]["compact_revision"].uint64_value())
{
// we may miss events if we proceed
// FIXME: reload state and continue when used inside cluster_client
fprintf(stderr, "Revisions before %lu were compacted by etcd, exiting\n",
data["result"]["compact_revision"].uint64_value());
exit(1);
}
fprintf(stderr, "Watch canceled by etcd, reason: %s, exiting\n", data["result"]["cancel_reason"].string_value().c_str());
exit(1);
}
if (etcd_watches_initialised == 4) if (etcd_watches_initialised == 4)
{ {
etcd_watch_revision = data["result"]["header"]["revision"].uint64_value(); etcd_watch_revision = data["result"]["header"]["revision"].uint64_value();