React to down OSDs instantly, set timer to recheck PGs after <osd_out_time>

Vitaliy Filippov 2020-09-13 12:23:11 +03:00
parent 18692517be
commit ed26c33f85
1 changed file with 111 additions and 17 deletions

lp/mon.js
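In short, the monitor now reacts to a down OSD in two stages: PG primaries are re-elected among still-up OSDs right away (the data distribution itself is not recalculated, because up/down state is not part of the hashed tree configuration), and a timer is armed so that PGs are rechecked again once <osd_out_time> seconds have passed since the OSD's last statistics report. A made-up timeline for orientation only, assuming osd_out_time = 600 seconds:

// t = 0    osd/state/3 disappears -> recheck_pgs() runs, the tree hash is unchanged,
//          so only PGs whose primary was OSD 3 get a new primary from their up set
// t = 0    schedule_next_recheck_at(stats[3].time + osd_out_time) arms a timer
// t = 600  the timer fires -> schedule_recheck() -> recheck_pgs() runs again,
//          this time with OSD 3 treated as out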

@@ -443,6 +443,7 @@ class Mon
levels.host = levels.host || 100;
levels.osd = levels.osd || 101;
const tree = { '': { children: [] } };
let up_osds = {};
for (const node_id in this.state.config.node_placement||{})
{
const node_cfg = this.state.config.node_placement[node_id];
@@ -465,6 +466,11 @@ class Mon
let reweight = this.state.config.osd[osd_num] && Number(this.state.config.osd[osd_num].reweight);
if (reweight < 0 || isNaN(reweight))
reweight = 1;
if (this.state.osd.state[osd_num] && reweight > 0)
{
// React to down OSDs immediately
up_osds[osd_num] = true;
}
tree[osd_num] = tree[osd_num] || { id: osd_num, parent: stat.host };
tree[osd_num].level = 'osd';
tree[osd_num].size = reweight * stat.size / 1024 / 1024 / 1024 / 1024; // terabytes
@@ -487,7 +493,7 @@ class Mon
tree[parent].children.push(tree[node_id]);
delete node_cfg.parent;
}
return { up_osds, osd_tree: LPOptimizer.flatten_tree(tree[''].children, levels, this.config.failure_domain, 'osd') };
}
async stop_all_pgs(pool_id)
@@ -562,6 +568,8 @@ class Mon
});
for (let i = 0; i < new_pgs.length || i < prev_pgs.length; i++)
{
// FIXME: etcd has max_txn_ops limit, and it's 128 by default
// Sooo we probably want to change our storage scheme for PG histories...
request.compare.push({
key: b64(this.etcd_prefix+'/pg/history/'+pool_id+'/'+(i+1)),
target: 'MOD',
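The FIXME above refers to etcd's --max-txn-ops limit (128 operations per transaction by default). One stop-gap alternative to changing the storage scheme would be to split the success operations across several transactions; a minimal hypothetical sketch (etcd_txn_chunked is not part of the commit, it assumes the compare list itself stays under the limit, and the update is no longer atomic if more than one chunk is needed):

// Hypothetical helper: send `success` operations in chunks of at most max_txn_ops,
// repeating the same compare conditions in every transaction.
async function etcd_txn_chunked(mon, compare, success, max_txn_ops = 128)
{
    for (let i = 0; i < success.length; i += max_txn_ops)
    {
        const res = await mon.etcd_call('/kv/txn', {
            compare,
            success: success.slice(i, i + max_txn_ops),
        }, mon.config.etcd_mon_timeout, 0);
        if (!res.succeeded)
        {
            return false;
        }
    }
    return true;
}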
@@ -649,8 +657,9 @@ class Mon
// Take configuration and state, check it against the stored configuration hash
// Recalculate PGs and save them to etcd if the configuration is changed
// FIXME: Also do not change anything if the distribution is good enough and no PGs are degraded
const { up_osds, osd_tree } = this.get_osd_tree();
const tree_cfg = {
osd_tree,
pools: this.state.config.pools,
};
const tree_hash = sha1hex(stableStringify(tree_cfg));
@@ -706,32 +715,102 @@ class Mon
if (old_pg_count != optimize_result.int_pgs.length)
{
console.log(
`PG count for pool ${pool_id} (${pool_cfg.name || 'unnamed'})`+
` changed from: ${old_pg_count} to ${optimize_result.int_pgs.length}`
);
}
LPOptimizer.print_change_stats(optimize_result);
this.save_new_pgs_txn(etcd_request, pool_id, prev_pgs, optimize_result.int_pgs, pg_history);
}
this.state.config.pgs.hash = tree_hash;
await this.save_pg_config(etcd_request);
}
else
{
// Nothing changed, but we still want to check for down OSDs
let changed = false;
for (const pool_id in this.state.config.pools)
{
const pool_cfg = this.state.config.pools[pool_id];
if (!this.validate_pool_cfg(pool_id, pool_cfg, false))
{
continue;
}
for (const pg_num in ((this.state.config.pgs.items||{})[pool_id]||{}))
{
const pg_cfg = this.state.config.pgs.items[pool_id][pg_num];
if (!Number(pg_cfg.primary) || !up_osds[pg_cfg.primary])
{
const alive_set = pg_cfg.osd_set.filter(osd_num => up_osds[osd_num]);
const new_primary = alive_set.length ? alive_set[Math.floor(Math.random()*alive_set.length)] : 0;
if (pg_cfg.primary != new_primary)
{
console.log(
`Moving pool ${pool_id} (${pool_cfg.name || 'unnamed'}) PG ${pg_num}`+
` primary OSD from ${pg_cfg.primary} to ${new_primary}`
);
changed = true;
pg_cfg.primary = new_primary;
}
}
}
}
if (changed)
{
await this.save_pg_config();
}
}
}
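The else branch above only re-elects primaries, it never touches the OSD sets themselves. The selection rule, pulled out into a standalone form for illustration (not code from the commit): take a random member of the PG's OSD set that is still up, or 0 if none are:

// Illustration: choose a new primary for one PG given the set of up OSDs.
// Returns 0 ("no primary") when every OSD of the PG is down.
function choose_primary(osd_set, up_osds)
{
    const alive_set = osd_set.filter(osd_num => up_osds[osd_num]);
    return alive_set.length ? alive_set[Math.floor(Math.random()*alive_set.length)] : 0;
}

// choose_primary([ 1, 2, 5 ], { 2: true, 5: true }) -> 2 or 5
// choose_primary([ 1, 3 ],    { 2: true, 5: true }) -> 0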
async save_pg_config(etcd_request = { compare: [], success: [] })
{
etcd_request.compare.push(
{ key: b64(this.etcd_prefix+'/mon/master'), target: 'LEASE', lease: ''+this.etcd_lease_id },
{ key: b64(this.etcd_prefix+'/config/pgs'), target: 'MOD', mod_revision: ''+this.etcd_watch_revision, result: 'LESS' },
);
etcd_request.success.push(
{ requestPut: { key: b64(this.etcd_prefix+'/config/pgs'), value: b64(JSON.stringify(this.state.config.pgs)) } },
);
const res = await this.etcd_call('/kv/txn', etcd_request, this.config.etcd_mon_timeout, 0);
if (!res.succeeded)
{
console.log('Someone changed PG configuration while we also tried to change it. Retrying in '+this.config.mon_change_timeout+' ms');
this.schedule_recheck();
return;
}
console.log('PG configuration successfully changed');
}
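For reference, the transaction built by save_pg_config() boils down to the following shape (keys and values go through b64() before being sent; they are shown decoded here, and the default /vitastor etcd_prefix is assumed). The two compare conditions mean the write only succeeds while this monitor still holds the master lease and while /config/pgs has not been modified since the revision the monitor is watching from:

// Decoded sketch of the etcd /kv/txn request body (illustration only):
{
    "compare": [
        { "key": "/vitastor/mon/master", "target": "LEASE", "lease": "<etcd_lease_id>" },
        { "key": "/vitastor/config/pgs", "target": "MOD", "mod_revision": "<etcd_watch_revision>", "result": "LESS" }
    ],
    "success": [
        { "requestPut": { "key": "/vitastor/config/pgs", "value": "<JSON of this.state.config.pgs>" } }
    ]
}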
// Schedule next recheck at least at <unixtime>
schedule_next_recheck_at(unixtime)
{
this.next_recheck_at = !this.next_recheck_at || this.next_recheck_at > unixtime
? unixtime : this.next_recheck_at;
const now = Date.now()/1000;
if (this.next_recheck_timer)
{
clearTimeout(this.next_recheck_timer);
this.next_recheck_timer = null;
}
if (this.next_recheck_at < now)
{
this.next_recheck_at = 0;
this.schedule_recheck();
}
else
{
this.next_recheck_timer = setTimeout(() =>
{
this.next_recheck_timer = null;
this.next_recheck_at = 0;
this.schedule_recheck();
}, (this.next_recheck_at-now)*1000);
}
}
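A short usage note (illustration, not part of the commit): repeated calls keep whichever requested deadline is earliest, and a deadline that is already in the past turns into an immediate schedule_recheck():

// this.schedule_next_recheck_at(now + 600); // arm a timer for 10 minutes from now
// this.schedule_next_recheck_at(now + 60);  // earlier deadline wins, re-armed for 1 minute
// this.schedule_next_recheck_at(now + 300); // later deadline, the 1-minute timer is kept
// this.schedule_next_recheck_at(now - 10);  // already due -> schedule_recheck() right away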
// Schedule a recheck to run after a small timeout (1s)
// If already scheduled, cancel previous timer and schedule it again
// This is required for multiple change events to trigger at most 1 recheck in 1s
schedule_recheck()
{
if (this.recheck_timer)
@@ -917,15 +996,30 @@ class Mon
this.state.config.global = this.state.config.global || {};
this.config = this.state.config.global;
this.check_config();
for (const osd_num in this.state.osd.stats)
{
// Recheck PGs <osd_out_time> later
this.schedule_next_recheck_at(
!this.state.osd.stats[osd_num] ? 0 : this.state.osd.stats[osd_num].time+this.config.osd_out_time
);
}
}
else if (key.join('/') === 'config/pools')
{
for (const pool_id in this.state.config.pools)
{
// Adjust pool configuration so PG distribution hash doesn't change on recheck()
const pool_cfg = this.state.config.pools[pool_id];
this.validate_pool_cfg(pool_id, pool_cfg, true);
}
}
else if (key[0] === 'osd' && key[1] === 'stats')
{
// Recheck PGs <osd_out_time> later
this.schedule_next_recheck_at(
!this.state.osd.stats[key[2]] ? 0 : this.state.osd.stats[key[2]].time+this.config.osd_out_time
);
}
}
async etcd_call(path, body, timeout, retries)
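A note on the key matched in the osd/stats branch above: key is assumed to be the etcd key split into path components after the etcd_prefix is stripped (an assumption about the surrounding watcher code, consistent with how key[2] is used as the OSD number):

// Example, assuming the default '/vitastor' etcd_prefix:
// an update of '/vitastor/osd/stats/3' arrives with key = [ 'osd', 'stats', '3' ],
// so key[2] is '3' and this.state.osd.stats['3'].time + osd_out_time becomes
// the recheck deadline passed to schedule_next_recheck_at().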