diff --git a/mon/mon.js b/mon/mon.js index 3e198ad2..68b1b5d8 100644 --- a/mon/mon.js +++ b/mon/mon.js @@ -342,6 +342,9 @@ class Mon this.etcd_start_timeout = (config.etcd_start_timeout || 5) * 1000; this.state = JSON.parse(JSON.stringify(this.constructor.etcd_tree)); this.signals_set = false; + this.ws = null; + this.ws_alive = false; + this.ws_keepalive_timer = null; this.on_stop_cb = () => this.on_stop(0).catch(console.error); } @@ -462,8 +465,20 @@ class Mon restart_watcher(cur_addr) { + if (this.ws) + { + this.ws.close(); + this.ws = null; + } + if (this.ws_keepalive_timer) + { + clearInterval(this.ws_keepalive_timer); + this.ws_keepalive_timer = null; + } if (this.selected_etcd_url == cur_addr) + { this.selected_etcd_url = null; + } this.start_watcher(this.config.etcd_mon_retries).catch(this.die); } @@ -483,6 +498,7 @@ class Mon const timer_id = setTimeout(() => { this.ws.close(); + this.ws = null; ok(false); }, this.config.etcd_mon_timeout); this.ws = new WebSocket(base+'/watch'); @@ -511,6 +527,20 @@ class Mon this.die('Failed to open etcd watch websocket'); } const cur_addr = this.selected_etcd_url; + this.ws_alive = true; + this.ws_keepalive_timer = setInterval(() => + { + if (this.ws_alive) + { + this.ws_alive = false; + this.ws.send(JSON.stringify({ progress_request: {} })); + } + else + { + console.log('etcd websocket timed out, restarting it'); + this.restart_watcher(cur_addr); + } + }, (Number(this.config.etcd_keepalive_interval) || 30)*1000); this.ws.on('error', () => this.restart_watcher(cur_addr)); this.ws.send(JSON.stringify({ create_request: { @@ -523,6 +553,7 @@ class Mon })); this.ws.on('message', (msg) => { + this.ws_alive = true; let data; try {