Implement stop_all_pgs()

trace-sqes
Vitaliy Filippov 2020-05-14 19:29:35 +03:00
parent 7bda66b866
commit 2c3e84cc41
1 changed files with 291 additions and 199 deletions

490
lp/mon.js
View File

@ -46,18 +46,13 @@ class Mon
this.etcd_urls.push(scheme+'://'+url);
}
this.etcd_prefix = config.etcd_prefix || '/rage';
if (!/^\/+/.exec(this.etcd_prefix))
{
this.etcd_prefix = '/' + this.etcd_prefix;
}
this.etcd_prefix = this.etcd_prefix.replace(/\/\/+/g, '/').replace(/^\/?(.*[^\/])\/?$/, '/$1');
this.etcd_start_timeout = (config.etcd_start_timeout || 5) * 1000;
this.data = JSON.parse(JSON.stringify(Mon.etcd_tree));
this.state = JSON.parse(JSON.stringify(Mon.etcd_tree));
}
async start()
{
await this.load_cluster_state();
return;
await this.load_config();
await this.get_lease();
await this.become_master();
@ -71,11 +66,12 @@ class Mon
const res = await this.etcd_call('/txn', { success: [
{ requestRange: { key: b64(this.etcd_prefix+'/config/global') } }
] }, this.etcd_start_timeout, -1);
this.config = this.parse_kv(res.responses[0].response_range.kvs[0]).value || {};
if (!(this.config instanceof Object) || this.config instanceof Array)
{
throw new Error(this.etcd_prefix+'/config/global is not a JSON object');
}
this.parse_kv(res.responses[0].response_range.kvs[0]);
this.check_config();
}
check_config()
{
this.config.etcd_mon_timeout = Number(this.config.etcd_mon_timeout) || 0;
if (this.config.etcd_mon_timeout <= 0)
{
@ -97,6 +93,11 @@ class Mon
{
this.config.osd_out_time = 30*60; // 30 minutes by default
}
this.config.max_osd_combinations = Number(this.config.max_osd_combinations) || 10000;
if (this.config.max_osd_combinations < 100)
{
this.config.max_osd_combinations = 100;
}
}
async start_watcher(retries)
@ -138,7 +139,7 @@ class Mon
create_request: {
key: b64(this.etcd_prefix+'/'),
range_end: b64(this.etcd_prefix+'0'),
start_revision: this.etcd_watch_revision,
start_revision: ''+this.etcd_watch_revision,
watch_id: 1,
},
}));
@ -158,22 +159,22 @@ class Mon
}
else
{
let changed = false;
console.log('Revision '+data.result.header.revision+' events: ');
for (const e of data.result.events)
{
this.parse_kv(e.kv);
const key = e.kv.key.substr(this.etcd_prefix.length);
if (key.substr(0, 11) != '/osd/stats/' && key.substr(0, 10) != '/pg/stats/')
{
changed = true;
}
console.log(e);
}
if (this.changeTimer)
if (changed)
{
clearTimeout(this.changeTimer);
this.changeTimer = null;
this.schedule_recheck();
}
this.changeTimer = setTimeout(() =>
{
this.changeTimer = null;
this.recheck_pgs().catch(console.error);
}, this.config.mon_change_timeout || 1000);
}
});
}
@ -223,19 +224,24 @@ class Mon
this.parse_kv(kv);
}
}
this.data = data;
this.data.config.placement_tree = this.data.config.placement_tree || {};
this.state = data;
}
all_osds()
{
return Object.keys(this.state.osd.stats);
}
get_osd_tree()
{
const levels = this.data.config.placement_tree.levels || {};
this.state.config.placement_tree = this.state.config.placement_tree||{};
const levels = this.state.config.placement_tree.levels||{};
levels.host = levels.host || 100;
levels.osd = levels.osd || 101;
const tree = { '': { children: [] } };
for (const node_id in this.data.config.placement_tree)
for (const node_id in this.state.config.placement_tree.nodes||{})
{
const node_cfg = this.data.config.placement_tree[node_id];
const node_cfg = this.state.config.placement_tree.nodes[node_id];
if (!node_id || /^\d/.exec(node_id) ||
!node_cfg.level || !levels[node_cfg.level])
{
@ -244,11 +250,12 @@ class Mon
}
tree[node_id] = { id: node_id, level: node_cfg.level, parent: node_cfg.parent, children: [] };
}
// This requires monitor system time to be in sync with OSD system times (at least to some extent)
const down_time = Date.now()/1000 - this.config.osd_out_time;
for (const osd_num of Object.keys(this.data.osd.stats).sort((a, b) => a - b))
for (const osd_num of this.all_osds().sort((a, b) => a - b))
{
const stat = this.data.osd.stats[osd_num];
if (stat.size && (this.data.osd.state[osd_num] || Number(stat.time) >= down_time))
const stat = this.state.osd.stats[osd_num];
if (stat.size && (this.state.osd.state[osd_num] || Number(stat.time) >= down_time))
{
// Numeric IDs are reserved for OSDs
tree[osd_num] = tree[osd_num] || { id: osd_num, parent: stat.host };
@ -273,7 +280,217 @@ class Mon
tree[parent].children.push(tree[node_id]);
delete node_cfg.parent;
}
return LPOptimizer.flatten_tree(tree[''].children, levels, this.data.config.failure_domain, 'osd');
return LPOptimizer.flatten_tree(tree[''].children, levels, this.state.config.failure_domain, 'osd');
}
async stop_all_pgs()
{
let has_online = false, paused = true;
for (const pg in this.state.config.pgs.items||{})
{
const cur_state = ((this.state.pg.state[pg]||{}).state||[]).join(',');
if (cur_state != '' && cur_state != 'offline')
{
has_online = true;
}
if (!this.state.config.pgs.items[pg].pause)
{
paused = false;
}
}
if (!paused)
{
console.log('Stopping all PGs before changing PG count');
const new_cfg = JSON.parse(JSON.stringify(this.state.config.pgs));
for (const pg in new_cfg.items)
{
new_cfg.items[pg].pause = true;
}
// Check that no OSDs change their state before we pause PGs
// Doing this we make sure that OSDs don't wake up in the middle of our "transaction"
// and can't see the old PG configuration
const checks = [];
for (const osd_num of this.all_osds())
{
const key = b64(this.etcd_prefix+'/osd/state/'+osd_num);
checks.push({ key, target: 'MOD', result: 'LESS', mod_revision: ''+this.etcd_watch_revision });
}
const res = await this.etcd_call('/txn', {
compare: [
{ key: b64(this.etcd_prefix+'/mon/master'), target: 'LEASE', lease: ''+this.etcd_lease_id },
{ key: b64(this.etcd_prefix+'/config/pgs'), target: 'MOD', mod_revision: ''+this.etcd_watch_revision, result: 'LESS' },
...checks,
],
success: [
{ requestPut: { key: b64(this.etcd_prefix+'/config/pgs'), value: b64(JSON.stringify(new_cfg)) } },
],
}, this.config.etcd_mon_timeout, 0);
if (!res.succeeded)
{
return false;
}
this.state.config.pgs = new_cfg;
}
return !has_online;
}
// Adapt the previous PG layout and the PG history to a new PG count.
//
// prev_pgs     - OSD sets of the current PGs, index = PG number - 1. Modified in place:
//                on return it holds exactly new_pg_count entries, each taken from the
//                old PG at the proportionally scaled index.
// pg_history   - output array, index = PG number - 1. Receives the history for every new PG
//                and null for every PG number that no longer exists.
// new_pg_count - target number of PGs.
//
// Existing history is read from this.state.pg.history, which is keyed by PG number (1-based).
scale_pg_count(prev_pgs, pg_history, new_pg_count)
{
    const old_pg_count = prev_pgs.length;
    // Add all possibly intersecting PGs into the history of new PGs
    if (!(new_pg_count % old_pg_count))
    {
        // New PG count is a multiple of the old PG count
        const mul = (new_pg_count / old_pg_count);
        for (let i = 0; i < new_pg_count; i++)
        {
            // New PG i descends from old PG floor(i/mul) - the same mapping that is
            // applied to prev_pgs at the end of this method
            const old_i = Math.floor(i / mul);
            const hist = this.state.pg.history[1+old_i];
            // An old PG without a history entry simply has nothing to inherit
            pg_history[i] = hist ? JSON.parse(JSON.stringify(hist)) : null;
        }
    }
    else if (!(old_pg_count % new_pg_count))
    {
        // Old PG count is a multiple of the new PG count
        const mul = (old_pg_count / new_pg_count);
        for (let i = 0; i < new_pg_count; i++)
        {
            const merged = {
                osd_sets: [],
                all_peers: [],
            };
            for (let j = 0; j < mul; j++)
            {
                // Each of the `mul` old PGs folded into new PG i contributes
                // its own current OSD set and its own history
                const old_i = i*mul+j;
                if (prev_pgs[old_i])
                {
                    merged.osd_sets.push(prev_pgs[old_i]);
                }
                const hist = this.state.pg.history[1+old_i];
                if (hist && hist.osd_sets && hist.osd_sets.length)
                {
                    merged.osd_sets.push(...hist.osd_sets);
                }
                if (hist && hist.all_peers && hist.all_peers.length)
                {
                    merged.all_peers.push(...hist.all_peers);
                }
            }
            pg_history[i] = merged;
        }
    }
    else
    {
        // Any PG may intersect with any PG after non-multiple PG count change
        // So, merge ALL PGs history
        const sets_by_key = {};
        const peers_by_num = {};
        for (const osd_set of prev_pgs)
        {
            if (osd_set)
            {
                sets_by_key[osd_set.join(' ')] = osd_set;
            }
        }
        for (const pg_num in this.state.pg.history)
        {
            const hist = this.state.pg.history[pg_num];
            if (hist && hist.osd_sets)
            {
                for (const osd_set of hist.osd_sets)
                {
                    sets_by_key[osd_set.join(' ')] = osd_set;
                }
            }
            if (hist && hist.all_peers)
            {
                for (const osd_num of hist.all_peers)
                {
                    peers_by_num[osd_num] = Number(osd_num);
                }
            }
        }
        const all_sets = Object.values(sets_by_key);
        const all_peers = Object.values(peers_by_num);
        for (let i = 0; i < new_pg_count; i++)
        {
            // Give every PG its own arrays: the history entries are appended to
            // individually later, which must not leak into the other PGs
            pg_history[i] = { osd_sets: [ ...all_sets ], all_peers: [ ...all_peers ] };
        }
    }
    // Mark history keys for removed PGs as removed
    for (let i = new_pg_count; i < old_pg_count; i++)
    {
        pg_history[i] = null;
    }
    // Multiply before dividing in the index calculations below: i/new*old may end up
    // slightly below an exact integer because of floating point rounding
    if (old_pg_count < new_pg_count)
    {
        // Walk downwards so that every source slot is read before it is overwritten
        for (let i = new_pg_count-1; i >= 0; i--)
        {
            prev_pgs[i] = prev_pgs[Math.floor(i*old_pg_count/new_pg_count)];
        }
    }
    else if (old_pg_count > new_pg_count)
    {
        for (let i = 0; i < new_pg_count; i++)
        {
            prev_pgs[i] = prev_pgs[Math.round(i*old_pg_count/new_pg_count)];
        }
        prev_pgs.splice(new_pg_count, old_pg_count-new_pg_count);
    }
}
async save_new_pgs(prev_pgs, new_pgs, pg_history, tree_hash)
{
const txn = [], checks = [];
const pg_items = {};
new_pgs.map((osd_set, i) =>
{
osd_set = osd_set.map(osd_num => osd_num === LPOptimizer.NO_OSD ? 0 : osd_num);
const alive_set = osd_set.filter(osd_num => osd_num);
pg_items[i+1] = {
osd_set,
primary: alive_set.length ? alive_set[Math.floor(Math.random()*alive_set.length)] : 0,
};
if (prev_pgs[i] && prev_pgs[i].join(' ') != osd_set.join(' '))
{
pg_history[i] = pg_history[i] || {};
pg_history[i].osd_sets = pg_history[i].osd_sets || [];
pg_history[i].osd_sets.push(prev_pgs[i]);
}
});
for (let i = 0; i < new_pgs.length || i < prev_pgs.length; i++)
{
checks.push({
key: b64(this.etcd_prefix+'/pg/history/'+(i+1)),
target: 'MOD',
mod_revision: ''+this.etcd_watch_revision,
result: 'LESS',
});
if (pg_history[i])
{
txn.push({
requestPut: {
key: b64(this.etcd_prefix+'/pg/history/'+(i+1)),
value: b64(JSON.stringify(pg_history[i])),
},
});
}
else
{
txn.push({
requestDeleteRange: {
key: b64(this.etcd_prefix+'/pg/history/'+(i+1)),
},
});
}
}
this.state.config.pgs = {
hash: tree_hash,
items: pg_items,
};
const res = await this.etcd_call('/txn', {
compare: [
{ key: b64(this.etcd_prefix+'/mon/master'), target: 'LEASE', lease: ''+this.etcd_lease_id },
{ key: b64(this.etcd_prefix+'/config/pgs'), target: 'MOD', mod_revision: ''+this.etcd_watch_revision, result: 'LESS' },
...checks,
],
success: [
{ requestPut: { key: b64(this.etcd_prefix+'/config/pgs'), value: b64(JSON.stringify(this.state.config.pgs)) } },
...txn,
],
}, this.config.etcd_mon_timeout, 0);
return res.succeeded;
}
async recheck_pgs()
@ -282,190 +499,68 @@ class Mon
// Recalculate PGs and save them to etcd if the configuration is changed
const tree_cfg = {
osd_tree: this.get_osd_tree(),
pg_count: this.data.config.global.pg_count || Object.keys(this.data.config.pgs.items||{}).length || 128,
max_osd_combinations: this.data.config.global.max_osd_combinations,
pg_count: this.config.pg_count || Object.keys(this.state.config.pgs.items||{}).length || 128,
max_osd_combinations: this.config.max_osd_combinations,
};
const tree_hash = sha1hex(stableStringify(tree_cfg));
if (this.data.config.pgs.hash != tree_hash)
if (this.state.config.pgs.hash != tree_hash)
{
// Something has changed
const pg_history = [];
const prev_pgs = [];
for (const pg in this.data.config.pgs.items||{})
for (const pg in this.state.config.pgs.items||{})
{
prev_pgs[pg-1] = this.data.config.pgs.items[pg].osd_set;
prev_pgs[pg-1] = this.state.config.pgs.items[pg].osd_set;
}
let pgs;
if (prev_pgs.length > 0)
const pg_history = [];
const old_pg_count = prev_pgs.length;
let optimize_result;
if (old_pg_count > 0)
{
if (prev_pgs.length != tree_cfg.pg_count)
if (old_pg_count != tree_cfg.pg_count)
{
// PG count changed. Need to bring all PGs down.
for (const pg in this.data.config.pgs.items)
if (!await this.stop_all_pgs())
{
const cur_state = ((this.data.pg.state[pg]||{}).state||[]).join(',');
if (cur_state != '' && cur_state != 'offline')
{
await this.stop_all_pgs();
return;
}
}
all_osds = Object.keys(all_osds);
// ...and add all possibly intersecting PGs into the history of new PGs
if (!(tree_cfg.pg_count % prev_pgs.length))
{
// New PG count is a multiple of the old PG count
const mul = (tree_cfg.pg_count / prev_pgs.length);
for (let i = 0; i < tree_cfg.pg_count; i++)
{
const old_i = Math.floor(tree_cfg.pg_count / mul);
pg_history[i] = JSON.parse(JSON.stringify(this.data.pg.history[1+old_i]));
}
}
else if (!(prev_pgs.length % tree_cfg.pg_count))
{
// Old PG count is a multiple of the new PG count
const mul = (prev_pgs.length / tree_cfg.pg_count);
for (let i = 0; i < tree_cfg.pg_count; i++)
{
pg_history[i] = {
osd_sets: [],
all_peers: [],
};
for (let j = 0; j < mul; j++)
{
pg_history[i].osd_sets.push(prev_pgs[i*mul]);
const hist = this.data.pg.history[1+i*mul+j];
if (hist && hist.osd_sets && hist.osd_sets.length)
{
Array.prototype.push.apply(pg_history[i].osd_sets, hist.osd_sets);
}
if (hist && hist.all_peers && hist.all_peers.length)
{
Array.prototype.push.apply(pg_history[i].all_peers, hist.all_peers);
}
}
}
}
else
{
// Any PG may intersect with any PG after non-multiple PG count change
// So, merge ALL PGs history
let all_sets = {};
let all_peers = {};
for (const pg of prev_pgs)
{
all_sets[pg.join(' ')] = pg;
}
for (const pg in this.data.pg.history)
{
const hist = this.data.pg.history[pg];
if (hist && hist.osd_sets)
{
for (const pg of hist.osd_sets)
{
all_sets[pg.join(' ')] = pg;
}
}
if (hist && hist.all_peers)
{
for (const osd_num of hist.all_peers)
{
all_peers[osd_num] = Number(osd_num);
}
}
}
all_sets = Object.values(all_sets);
all_peers = Object.values(all_peers);
for (let i = 0; i < tree_cfg.pg_count; i++)
{
pg_history[i] = { osd_sets: all_sets, all_peers };
}
}
// Mark history keys for removed PGs as removed
for (let i = tree_cfg.pg_count; i < prev_pgs.length; i++)
{
pg_history[i] = null;
this.schedule_recheck();
return;
}
this.scale_pg_count(prev_pgs, pg_history, new_pg_count);
}
if (prev_pgs.length < tree_cfg.pg_count)
{
for (let i = tree_cfg.pg_count-1; i >= 0; i--)
{
prev_pgs[i] = prev_pgs[Math.floor(i/tree_cfg.pg_count*prev_pgs.length)];
}
}
else if (prev_pgs.length > tree_cfg.pg_count)
{
for (let i = 0; i < tree_cfg.pg_count; i++)
{
prev_pgs[i] = prev_pgs[Math.round(i/tree_cfg.pg_count*prev_pgs.length)];
}
prev_pgs.splice(tree_cfg.pg_count, prev_pgs.length-tree_cfg.pg_count);
}
pgs = LPOptimizer.optimize_change(prev_pgs, tree_cfg.osd_tree, tree_cfg.max_osd_combinations);
optimize_result = await LPOptimizer.optimize_change(prev_pgs, tree_cfg.osd_tree, tree_cfg.max_osd_combinations);
}
else
{
pgs = LPOptimizer.optimize_initial(tree_cfg.osd_tree, tree_cfg.pg_count, tree_cfg.max_osd_combinations);
optimize_result = await LPOptimizer.optimize_initial(tree_cfg.osd_tree, tree_cfg.pg_count, tree_cfg.max_osd_combinations);
}
// FIXME: Handle insufficient failure domain count
const txn = [];
const pg_items = {};
pgs.map((osd_set, i) =>
if (!await this.save_new_pgs(prev_pgs, optimize_result.int_pgs, pg_history, tree_hash))
{
const alive_set = osd_set.filter(osd_num => osd_num);
pg_items[i+1] = {
osd_set,
primary: alive_set.length ? alive_set[Math.floor(Math.random()*alive_set.length)] : 0,
};
if (prev_pgs[i] && prev_pgs[i].join(' ') != osd_set.join(' '))
{
pg_history[i] = pg_history[i] || {};
pg_history[i].osd_sets = pg_history[i].osd_sets || [];
pg_history[i].osd_sets.push(prev_pgs[i]);
}
});
for (let i = 0; i < tree_cfg.pg_count || i < prev_pgs.length; i++)
{
if (pg_history[i])
{
txn.push({
requestPut: {
key: b64(this.etcd_prefix+'/pg/history/'+(i+1)),
value: b64(JSON.stringify(pg_history[i])),
},
});
}
else
{
txn.push({
requestDeleteRange: {
key: b64(this.etcd_prefix+'/pg/history/'+(i+1)),
},
});
}
console.log('Someone changed PG configuration while we also tried to change it. Retrying in '+this.config.mon_change_timeout+' ms');
this.schedule_recheck();
return;
}
this.data.config.pgs = {
hash: tree_hash,
count: tree_cfg.pg_count,
items: pg_items,
};
const res = await this.etcd_call('/txn', {
compare: [
{ key: b64(this.etcd_prefix+'/mon/master'), target: 'LEASE', lease: ''+this.etcd_lease_id },
{ key: b64(this.etcd_prefix+'/config/pgs'), target: 'MOD', mod_revision: this.etcd_watch_revision, result: 'LESS' },
{ key: b64(this.etcd_prefix+'/pg/change_stamp'), target: 'MOD', mod_revision: this.etcd_watch_revision, result: 'LESS' },
],
success: [
{ requestPut: { key: b64(this.etcd_prefix+'/config/pgs'), value: b64(JSON.stringify(this.data.config.pgs)) } },
...txn,
],
}, this.config.etcd_mon_timeout, 0);
console.log('PG configuration successfully changed');
if (old_pg_count != optimize_result.int_pgs.length)
{
console.log(`PG count changed from: ${old_pg_count} to ${optimize_result.int_pgs.length}`);
}
LPOptimizer.print_change_stats(optimize_result);
}
}
// (Re)arm the delayed PG recheck.
// A pending timer, if any, is cancelled first, so repeated calls collapse into a single
// recheck_pgs() run that happens this.config.mon_change_timeout (1000 if that is not set)
// milliseconds after the most recent call. A rejection of recheck_pgs() is printed
// with console.error.
schedule_recheck()
{
    const pending = this.changeTimer;
    if (pending)
    {
        clearTimeout(pending);
        this.changeTimer = null;
    }
    const run_recheck = () =>
    {
        this.changeTimer = null;
        this.recheck_pgs().catch(console.error);
    };
    this.changeTimer = setTimeout(run_recheck, this.config.mon_change_timeout || 1000);
}
parse_kv(kv)
{
if (!kv || !kv.key)
@ -475,7 +570,7 @@ class Mon
kv.key = de64(kv.key);
kv.value = kv.value ? JSON.parse(de64(kv.value)) : null;
const key = kv.key.substr(this.etcd_prefix.length).replace(/^\/+/, '').split('/');
const cur = this.data, orig = Mon.etcd_tree;
const cur = this.state, orig = Mon.etcd_tree;
for (let i = 0; i < key.length-1; i++)
{
if (!orig[key[i]])
@ -494,12 +589,9 @@ class Mon
cur[key[key.length-1]] = kv.value;
if (key.join('/') === 'config/global')
{
this.data.config.global = this.data.config.global || {};
this.data.config.global.max_osd_combinations = Number(this.data.config.global.max_osd_combinations) || 10000;
if (this.data.config.global.max_osd_combinations < 100)
{
this.data.config.global.max_osd_combinations = 100;
}
this.state.config.global = this.state.config.global || {};
this.config = this.state.config.global;
this.check_config();
}
}