diff --git a/cluster_client.cpp b/cluster_client.cpp index 96721808..7b12da6e 100644 --- a/cluster_client.cpp +++ b/cluster_client.cpp @@ -108,11 +108,6 @@ void cluster_client_t::continue_ops() tfd->clear_timer(retry_timeout_id); retry_timeout_id = 0; } - if (!pg_count) - { - // Config is not loaded yet - return; - } for (auto op_it = cur_ops.begin(); op_it != cur_ops.end(); ) { continue_rw(*op_it++); @@ -156,6 +151,7 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config) { throw std::runtime_error("Bad block size"); } + // FIXME: pg_stripe_size may be a per-pool config if (config.find("pg_stripe_size") != config.end()) { pg_stripe_size = config["pg_stripe_size"].uint64_value(); @@ -192,46 +188,47 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config) void cluster_client_t::on_load_pgs_hook(bool success) { - if (success) + for (auto pool_item: st_cli.pool_config) { - pg_count = st_cli.pg_config.size(); - if (pg_count) - { - for (auto op: offline_ops) - { - execute(op); - } - offline_ops.clear(); - } + pg_counts[pool_item.first] = pool_item.second.real_pg_count; } + for (auto op: offline_ops) + { + execute(op); + } + offline_ops.clear(); } void cluster_client_t::on_change_hook(json11::Json::object & changes) { - if (pg_count != st_cli.pg_config.size()) + for (auto pool_item: st_cli.pool_config) { - // At this point, all operations should be suspended - // And they have to be resliced! - for (auto op: cur_ops) + if (pg_counts[pool_item.first] != pool_item.second.real_pg_count) { - op->needs_reslice = true; - } - for (auto op: unsynced_writes) - { - op->needs_reslice = true; - } - for (auto op: syncing_writes) - { - op->needs_reslice = true; - } - pg_count = st_cli.pg_config.size(); - if (pg_count) - { - for (auto op: offline_ops) + // At this point, all pool operations should have been suspended + // And now they have to be resliced! 
+ for (auto op: cur_ops) { - execute(op); + if (INODE_POOL(op->inode) == pool_item.first) + { + op->needs_reslice = true; + } } - offline_ops.clear(); + for (auto op: unsynced_writes) + { + if (INODE_POOL(op->inode) == pool_item.first) + { + op->needs_reslice = true; + } + } + for (auto op: syncing_writes) + { + if (INODE_POOL(op->inode) == pool_item.first) + { + op->needs_reslice = true; + } + } + pg_counts[pool_item.first] = pool_item.second.real_pg_count; } } continue_ops(); @@ -275,13 +272,13 @@ void cluster_client_t::on_change_osd_state_hook(uint64_t peer_osd) void cluster_client_t::execute(cluster_op_t *op) { - op->retval = 0; - if (!pg_count) + if (!bs_disk_alignment) { - // Config is not loaded yet, retry after connecting to etcd + // We're offline offline_ops.push_back(op); return; } + op->retval = 0; if (op->opcode != OSD_OP_SYNC && op->opcode != OSD_OP_READ && op->opcode != OSD_OP_WRITE || (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_WRITE) && (!op->inode || !op->len || op->offset % bs_disk_alignment || op->len % bs_disk_alignment)) @@ -322,8 +319,11 @@ void cluster_client_t::execute(cluster_op_t *op) void cluster_client_t::continue_rw(cluster_op_t *op) { - if (!pg_count) + pool_id_t pool_id = INODE_POOL(op->inode); + if (st_cli.pool_config.find(pool_id) == st_cli.pool_config.end() || + st_cli.pool_config[pool_id].real_pg_count == 0) { + // Postpone operations to unknown pools return; } if (op->opcode == OSD_OP_WRITE && !immediate_commit && !op->is_internal) @@ -434,7 +434,10 @@ void cluster_client_t::slice_rw(cluster_op_t *op) { // Slice the request into individual object stripe requests // Primary OSDs still operate individual stripes, but their size is multiplied by PG minsize in case of EC - uint64_t pg_block_size = bs_block_size * pg_part_count; + auto & pool_cfg = st_cli.pool_config[INODE_POOL(op->inode)]; + uint64_t pg_block_size = bs_block_size * ( + pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 
1 : pool_cfg.pg_minsize + ); uint64_t first_stripe = (op->offset / pg_block_size) * pg_block_size; uint64_t last_stripe = ((op->offset + op->len + pg_block_size - 1) / pg_block_size - 1) * pg_block_size; op->retval = 0; @@ -444,7 +447,7 @@ void cluster_client_t::slice_rw(cluster_op_t *op) int i = 0; for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size) { - pg_num_t pg_num = (op->inode + stripe/pg_stripe_size) % pg_count + 1; + pg_num_t pg_num = (op->inode + stripe/pg_stripe_size) % pool_cfg.real_pg_count + 1; uint64_t begin = (op->offset < stripe ? stripe : op->offset); uint64_t end = (op->offset + op->len) > (stripe + pg_block_size) ? (stripe + pg_block_size) : (op->offset + op->len); @@ -480,8 +483,9 @@ void cluster_client_t::slice_rw(cluster_op_t *op) bool cluster_client_t::try_send(cluster_op_t *op, cluster_op_part_t *part) { - auto pg_it = st_cli.pg_config.find(part->pg_num); - if (pg_it != st_cli.pg_config.end() && + auto & pool_cfg = st_cli.pool_config[INODE_POOL(op->inode)]; + auto pg_it = pool_cfg.pg_config.find(part->pg_num); + if (pg_it != pool_cfg.pg_config.end() && !pg_it->second.pause && pg_it->second.cur_primary) { osd_num_t primary_osd = pg_it->second.cur_primary; diff --git a/cluster_client.h b/cluster_client.h index ac0c4a86..7571c65b 100644 --- a/cluster_client.h +++ b/cluster_client.h @@ -50,12 +50,11 @@ class cluster_client_t timerfd_manager_t *tfd; ring_loop_t *ringloop; - uint64_t pg_part_count = 2; uint64_t pg_stripe_size = 0; uint64_t bs_block_size = 0; uint64_t bs_disk_alignment = 0; uint64_t bs_bitmap_granularity = 0; - uint64_t pg_count = 0; + std::map<pool_id_t, uint64_t> pg_counts; bool immediate_commit = false; // FIXME: Implement inmemory_commit mode. Note that it requires to return overlapping reads from memory. 
uint64_t client_dirty_limit = 0; diff --git a/etcd_state_client.cpp b/etcd_state_client.cpp index c73f5e0b..9a95fb34 100644 --- a/etcd_state_client.cpp +++ b/etcd_state_client.cpp @@ -346,18 +346,19 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso printf("Pool %lu has invalid max_osd_combinations (must be at least 100), skipping pool\n", pool_id); continue; } - this->pool_config[pool_id].exists = true; - this->pool_config[pool_id].id = pool_id; - this->pool_config[pool_id].name = pool_item.second["name"].string_value(); - this->pool_config[pool_id].scheme = pool_item.second["scheme"] == "replicated" ? POOL_SCHEME_REPLICATED : POOL_SCHEME_XOR; - this->pool_config[pool_id].pg_size = pool_item.second["pg_size"].uint64_value(); - this->pool_config[pool_id].pg_minsize = pool_item.second["pg_minsize"].uint64_value(); - this->pool_config[pool_id].pg_count = pool_item.second["pg_count"].uint64_value(); - this->pool_config[pool_id].failure_domain = pool_item.second["failure_domain"].string_value(); - this->pool_config[pool_id].max_osd_combinations = pool_item.second["max_osd_combinations"].uint64_value(); - if (!this->pool_config[pool_id].max_osd_combinations) + auto & parsed_cfg = this->pool_config[pool_id]; + parsed_cfg.exists = true; + parsed_cfg.id = pool_id; + parsed_cfg.name = pool_item.second["name"].string_value(); + parsed_cfg.scheme = pool_item.second["scheme"] == "replicated" ? 
POOL_SCHEME_REPLICATED : POOL_SCHEME_XOR; + parsed_cfg.pg_size = pool_item.second["pg_size"].uint64_value(); + parsed_cfg.pg_minsize = pool_item.second["pg_minsize"].uint64_value(); + parsed_cfg.pg_count = pool_item.second["pg_count"].uint64_value(); + parsed_cfg.failure_domain = pool_item.second["failure_domain"].string_value(); + parsed_cfg.max_osd_combinations = pool_item.second["max_osd_combinations"].uint64_value(); + if (!parsed_cfg.max_osd_combinations) { - this->pool_config[pool_id].max_osd_combinations = 10000; + parsed_cfg.max_osd_combinations = 10000; } } } @@ -397,6 +398,27 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso } } } + for (auto & pool_item: this->pool_config) + { + int n = 0; + for (auto pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); pg_it++) + { + if (pg_it->second.exists && pg_it->first != ++n) + { + printf( + "Invalid pool %lu PG configuration: PG numbers don't cover whole 1..%lu range\n", + pool_item.second.id, pool_item.second.pg_config.size() + ); + for (pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); pg_it++) + { + pg_it->second.exists = false; + } + n = 0; + break; + } + } + pool_item.second.real_pg_count = n; + } } else if (key.substr(0, etcd_prefix.length()+12) == etcd_prefix+"/pg/history/") { diff --git a/etcd_state_client.h b/etcd_state_client.h index c608ea62..a6b27b11 100644 --- a/etcd_state_client.h +++ b/etcd_state_client.h @@ -17,6 +17,7 @@ #define POOL_SCHEME_XOR 2 #define POOL_ID_MAX 0x10000 #define POOL_ID_BITS 16 +#define INODE_POOL(inode) ((inode) >> (64 - POOL_ID_BITS)) struct json_kv_t { @@ -47,6 +48,7 @@ struct pool_config_t uint64_t scheme; uint64_t pg_size, pg_minsize; uint64_t pg_count; + uint64_t real_pg_count; std::string failure_domain; uint64_t max_osd_combinations; std::map pg_config; diff --git a/osd_cluster.cpp b/osd_cluster.cpp index 772f70d1..c1b6277c 100644 --- a/osd_cluster.cpp +++ 
b/osd_cluster.cpp @@ -452,12 +452,6 @@ void osd_t::on_load_pgs_hook(bool success) void osd_t::apply_pg_count() { pg_num_t pg_count = st_cli.pg_config.size(); - if (pg_count > 0 && (st_cli.pg_config.begin()->first != 1 || std::prev(st_cli.pg_config.end())->first != pg_count)) - { - printf("Invalid PG configuration: PG numbers don't cover the whole 1..%d range\n", pg_count); - force_stop(1); - return; - } if (this->pg_count != 0 && this->pg_count != pg_count) { // Check that all PGs are offline. It is not allowed to change PG count when any PGs are online