Add pool support to the cluster client

Vitaliy Filippov 2020-09-03 00:52:41 +03:00
parent 293cb5bd1d
commit 4cdad634b5
5 changed files with 83 additions and 62 deletions

View File

@ -108,11 +108,6 @@ void cluster_client_t::continue_ops()
tfd->clear_timer(retry_timeout_id);
retry_timeout_id = 0;
}
if (!pg_count)
{
// Config is not loaded yet
return;
}
for (auto op_it = cur_ops.begin(); op_it != cur_ops.end(); )
{
continue_rw(*op_it++);
@ -156,6 +151,7 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config)
{
throw std::runtime_error("Bad block size");
}
// FIXME: pg_stripe_size may be a per-pool config
if (config.find("pg_stripe_size") != config.end())
{
pg_stripe_size = config["pg_stripe_size"].uint64_value();
@ -192,46 +188,47 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config)
void cluster_client_t::on_load_pgs_hook(bool success)
{
if (success)
for (auto pool_item: st_cli.pool_config)
{
pg_count = st_cli.pg_config.size();
if (pg_count)
{
for (auto op: offline_ops)
{
execute(op);
}
offline_ops.clear();
}
pg_counts[pool_item.first] = pool_item.second.real_pg_count;
}
for (auto op: offline_ops)
{
execute(op);
}
offline_ops.clear();
}
void cluster_client_t::on_change_hook(json11::Json::object & changes)
{
if (pg_count != st_cli.pg_config.size())
for (auto pool_item: st_cli.pool_config)
{
// At this point, all operations should be suspended
// And they have to be resliced!
for (auto op: cur_ops)
if (pg_counts[pool_item.first] != pool_item.second.real_pg_count)
{
op->needs_reslice = true;
}
for (auto op: unsynced_writes)
{
op->needs_reslice = true;
}
for (auto op: syncing_writes)
{
op->needs_reslice = true;
}
pg_count = st_cli.pg_config.size();
if (pg_count)
{
for (auto op: offline_ops)
// At this point, all pool operations should have been suspended
// And now they have to be resliced!
for (auto op: cur_ops)
{
execute(op);
if (INODE_POOL(op->inode) == pool_item.first)
{
op->needs_reslice = true;
}
}
offline_ops.clear();
for (auto op: unsynced_writes)
{
if (INODE_POOL(op->inode) == pool_item.first)
{
op->needs_reslice = true;
}
}
for (auto op: syncing_writes)
{
if (INODE_POOL(op->inode) == pool_item.first)
{
op->needs_reslice = true;
}
}
pg_counts[pool_item.first] = pool_item.second.real_pg_count;
}
}
continue_ops();
@ -275,13 +272,13 @@ void cluster_client_t::on_change_osd_state_hook(uint64_t peer_osd)
void cluster_client_t::execute(cluster_op_t *op)
{
op->retval = 0;
if (!pg_count)
if (!bs_disk_alignment)
{
// Config is not loaded yet, retry after connecting to etcd
// We're offline
offline_ops.push_back(op);
return;
}
op->retval = 0;
if (op->opcode != OSD_OP_SYNC && op->opcode != OSD_OP_READ && op->opcode != OSD_OP_WRITE ||
(op->opcode == OSD_OP_READ || op->opcode == OSD_OP_WRITE) && (!op->inode || !op->len ||
op->offset % bs_disk_alignment || op->len % bs_disk_alignment))
@ -322,8 +319,11 @@ void cluster_client_t::execute(cluster_op_t *op)
void cluster_client_t::continue_rw(cluster_op_t *op)
{
if (!pg_count)
pool_id_t pool_id = INODE_POOL(op->inode);
if (st_cli.pool_config.find(pool_id) == st_cli.pool_config.end() ||
st_cli.pool_config[pool_id].real_pg_count == 0)
{
// Postpone operations on pools that are unknown or have no valid PGs yet
return;
}
if (op->opcode == OSD_OP_WRITE && !immediate_commit && !op->is_internal)
@ -434,7 +434,10 @@ void cluster_client_t::slice_rw(cluster_op_t *op)
{
// Slice the request into individual object stripe requests
// Primary OSDs still operate on individual stripes, but the stripe size is multiplied by PG minsize in case of EC
uint64_t pg_block_size = bs_block_size * pg_part_count;
auto & pool_cfg = st_cli.pool_config[INODE_POOL(op->inode)];
uint64_t pg_block_size = bs_block_size * (
pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_minsize
);
uint64_t first_stripe = (op->offset / pg_block_size) * pg_block_size;
uint64_t last_stripe = ((op->offset + op->len + pg_block_size - 1) / pg_block_size - 1) * pg_block_size;
op->retval = 0;
@ -444,7 +447,7 @@ void cluster_client_t::slice_rw(cluster_op_t *op)
int i = 0;
for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
{
pg_num_t pg_num = (op->inode + stripe/pg_stripe_size) % pg_count + 1;
pg_num_t pg_num = (op->inode + stripe/pg_stripe_size) % pool_cfg.real_pg_count + 1;
uint64_t begin = (op->offset < stripe ? stripe : op->offset);
uint64_t end = (op->offset + op->len) > (stripe + pg_block_size)
? (stripe + pg_block_size) : (op->offset + op->len);
@ -480,8 +483,9 @@ void cluster_client_t::slice_rw(cluster_op_t *op)
bool cluster_client_t::try_send(cluster_op_t *op, cluster_op_part_t *part)
{
auto pg_it = st_cli.pg_config.find(part->pg_num);
if (pg_it != st_cli.pg_config.end() &&
auto & pool_cfg = st_cli.pool_config[INODE_POOL(op->inode)];
auto pg_it = pool_cfg.pg_config.find(part->pg_num);
if (pg_it != pool_cfg.pg_config.end() &&
!pg_it->second.pause && pg_it->second.cur_primary)
{
osd_num_t primary_osd = pg_it->second.cur_primary;

View File

@ -50,12 +50,11 @@ class cluster_client_t
timerfd_manager_t *tfd;
ring_loop_t *ringloop;
uint64_t pg_part_count = 2;
uint64_t pg_stripe_size = 0;
uint64_t bs_block_size = 0;
uint64_t bs_disk_alignment = 0;
uint64_t bs_bitmap_granularity = 0;
uint64_t pg_count = 0;
std::map<pool_id_t, uint64_t> pg_counts;
bool immediate_commit = false;
// FIXME: Implement inmemory_commit mode. Note that it requires returning overlapping reads from memory.
uint64_t client_dirty_limit = 0;

View File

@ -346,18 +346,19 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso
printf("Pool %lu has invalid max_osd_combinations (must be at least 100), skipping pool\n", pool_id);
continue;
}
this->pool_config[pool_id].exists = true;
this->pool_config[pool_id].id = pool_id;
this->pool_config[pool_id].name = pool_item.second["name"].string_value();
this->pool_config[pool_id].scheme = pool_item.second["scheme"] == "replicated" ? POOL_SCHEME_REPLICATED : POOL_SCHEME_XOR;
this->pool_config[pool_id].pg_size = pool_item.second["pg_size"].uint64_value();
this->pool_config[pool_id].pg_minsize = pool_item.second["pg_minsize"].uint64_value();
this->pool_config[pool_id].pg_count = pool_item.second["pg_count"].uint64_value();
this->pool_config[pool_id].failure_domain = pool_item.second["failure_domain"].string_value();
this->pool_config[pool_id].max_osd_combinations = pool_item.second["max_osd_combinations"].uint64_value();
if (!this->pool_config[pool_id].max_osd_combinations)
auto & parsed_cfg = this->pool_config[pool_id];
parsed_cfg.exists = true;
parsed_cfg.id = pool_id;
parsed_cfg.name = pool_item.second["name"].string_value();
parsed_cfg.scheme = pool_item.second["scheme"] == "replicated" ? POOL_SCHEME_REPLICATED : POOL_SCHEME_XOR;
parsed_cfg.pg_size = pool_item.second["pg_size"].uint64_value();
parsed_cfg.pg_minsize = pool_item.second["pg_minsize"].uint64_value();
parsed_cfg.pg_count = pool_item.second["pg_count"].uint64_value();
parsed_cfg.failure_domain = pool_item.second["failure_domain"].string_value();
parsed_cfg.max_osd_combinations = pool_item.second["max_osd_combinations"].uint64_value();
if (!parsed_cfg.max_osd_combinations)
{
this->pool_config[pool_id].max_osd_combinations = 10000;
parsed_cfg.max_osd_combinations = 10000;
}
}
}
@ -397,6 +398,27 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso
}
}
}
for (auto & pool_item: this->pool_config)
{
int n = 0;
for (auto pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); pg_it++)
{
if (pg_it->second.exists && pg_it->first != ++n)
{
printf(
"Invalid pool %lu PG configuration: PG numbers don't cover whole 1..%lu range\n",
pool_item.second.id, pool_item.second.pg_config.size()
);
for (pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); pg_it++)
{
pg_it->second.exists = false;
}
n = 0;
break;
}
}
pool_item.second.real_pg_count = n;
}
}
else if (key.substr(0, etcd_prefix.length()+12) == etcd_prefix+"/pg/history/")
{

View File

@ -17,6 +17,7 @@
#define POOL_SCHEME_XOR 2
#define POOL_ID_MAX 0x10000
#define POOL_ID_BITS 16
#define INODE_POOL(inode) ((inode) >> (64 - POOL_ID_BITS))
struct json_kv_t
{
@ -47,6 +48,7 @@ struct pool_config_t
uint64_t scheme;
uint64_t pg_size, pg_minsize;
uint64_t pg_count;
uint64_t real_pg_count;
std::string failure_domain;
uint64_t max_osd_combinations;
std::map<pg_num_t, pg_config_t> pg_config;

View File

@ -452,12 +452,6 @@ void osd_t::on_load_pgs_hook(bool success)
void osd_t::apply_pg_count()
{
pg_num_t pg_count = st_cli.pg_config.size();
if (pg_count > 0 && (st_cli.pg_config.begin()->first != 1 || std::prev(st_cli.pg_config.end())->first != pg_count))
{
printf("Invalid PG configuration: PG numbers don't cover the whole 1..%d range\n", pg_count);
force_stop(1);
return;
}
if (this->pg_count != 0 && this->pg_count != pg_count)
{
// Check that all PGs are offline. It is not allowed to change PG count when any PGs are online