Add pool support to OSD, part 1

This just fixes all the code so that it builds and works as before,
but it does not yet add support for replicated pools.
Vitaliy Filippov 2020-09-04 10:54:21 +03:00
parent 4cdad634b5
commit 168cc2c803
12 changed files with 247 additions and 204 deletions

View File

@ -315,35 +315,35 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso
if (pool_item.second["pg_size"].uint64_value() < 1 ||
pool_item.second["scheme"] == "xor" && pool_item.second["pg_size"].uint64_value() < 3)
{
printf("Pool %lu has invalid pg_size, skipping pool\n", pool_id);
printf("Pool %u has invalid pg_size, skipping pool\n", pool_id);
continue;
}
if (pool_item.second["pg_minsize"].uint64_value() < 1 ||
pool_item.second["pg_minsize"].uint64_value() > pool_item.second["pg_size"].uint64_value() ||
pool_item.second["pg_minsize"].uint64_value() < (pool_item.second["pg_size"].uint64_value() - 1))
{
printf("Pool %lu has invalid pg_minsize, skipping pool\n", pool_id);
printf("Pool %u has invalid pg_minsize, skipping pool\n", pool_id);
continue;
}
if (pool_item.second["pg_count"].uint64_value() < 1)
{
printf("Pool %lu has invalid pg_count, skipping pool\n", pool_id);
printf("Pool %u has invalid pg_count, skipping pool\n", pool_id);
continue;
}
if (pool_item.second["name"].string_value() == "")
{
printf("Pool %lu has empty name, skipping pool\n", pool_id);
printf("Pool %u has empty name, skipping pool\n", pool_id);
continue;
}
if (pool_item.second["scheme"] != "replicated" && pool_item.second["scheme"] != "xor")
{
printf("Pool %lu has invalid coding scheme (only \"xor\" and \"replicated\" are allowed), skipping pool\n", pool_id);
printf("Pool %u has invalid coding scheme (only \"xor\" and \"replicated\" are allowed), skipping pool\n", pool_id);
continue;
}
if (pool_item.second["max_osd_combinations"].uint64_value() > 0 &&
pool_item.second["max_osd_combinations"].uint64_value() < 100)
{
printf("Pool %lu has invalid max_osd_combinations (must be at least 100), skipping pool\n", pool_id);
printf("Pool %u has invalid max_osd_combinations (must be at least 100), skipping pool\n", pool_id);
continue;
}
auto & parsed_cfg = this->pool_config[pool_id];
@ -360,6 +360,15 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso
{
parsed_cfg.max_osd_combinations = 10000;
}
for (auto & pg_item: parsed_cfg.pg_config)
{
if (pg_item.second.target_set.size() != parsed_cfg.pg_size)
{
printf("Pool %u PG %u configuration is invalid: osd_set size %lu != pool pg_size %lu\n",
pool_id, pg_item.first, pg_item.second.target_set.size(), parsed_cfg.pg_size);
pg_item.second.pause = true;
}
}
}
}
else if (key == etcd_prefix+"/config/pgs")
@ -384,7 +393,7 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso
pg_num_t pg_num = stoull_full(pg_item.first);
if (!pg_num)
{
printf("Bad key in pool %lu PG configuration: %s (must be a number), skipped\n", pool_id, pg_item.first.c_str());
printf("Bad key in pool %u PG configuration: %s (must be a number), skipped\n", pool_id, pg_item.first.c_str());
continue;
}
auto & parsed_cfg = this->pool_config[pool_id].pg_config[pg_num];
@ -396,6 +405,12 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso
{
parsed_cfg.target_set.push_back(pg_osd.uint64_value());
}
if (parsed_cfg.target_set.size() != pool_config[pool_id].pg_size)
{
printf("Pool %u PG %u configuration is invalid: osd_set size %lu != pool pg_size %lu\n",
pool_id, pg_num, parsed_cfg.target_set.size(), pool_config[pool_id].pg_size);
parsed_cfg.pause = true;
}
}
}
for (auto & pool_item: this->pool_config)
@ -406,7 +421,7 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso
if (pg_it->second.exists && pg_it->first != ++n)
{
printf(
"Invalid pool %lu PG configuration: PG numbers don't cover whole 1..%lu range\n",
"Invalid pool %u PG configuration: PG numbers don't cover whole 1..%lu range\n",
pool_item.second.id, pool_item.second.pg_config.size()
);
for (pg_it = pool_item.second.pg_config.begin(); pg_it != pool_item.second.pg_config.end(); pg_it++)
@ -426,7 +441,7 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso
pool_id_t pool_id = 0;
pg_num_t pg_num = 0;
char null_byte = 0;
sscanf(key.c_str() + etcd_prefix.length()+12, "%lu/%u%c", &pool_id, &pg_num, &null_byte);
sscanf(key.c_str() + etcd_prefix.length()+12, "%u/%u%c", &pool_id, &pg_num, &null_byte);
if (!pool_id || pool_id >= POOL_ID_MAX || !pg_num || null_byte != 0)
{
printf("Bad etcd key %s, ignoring\n", key.c_str());
@ -465,7 +480,7 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso
pool_id_t pool_id = 0;
pg_num_t pg_num = 0;
char null_byte = 0;
sscanf(key.c_str() + etcd_prefix.length()+10, "%lu/%u%c", &pool_id, &pg_num, &null_byte);
sscanf(key.c_str() + etcd_prefix.length()+10, "%u/%u%c", &pool_id, &pg_num, &null_byte);
if (!pool_id || pool_id >= POOL_ID_MAX || !pg_num || null_byte != 0)
{
printf("Bad etcd key %s, ignoring\n", key.c_str());
@ -492,7 +507,7 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso
}
if (i >= pg_state_bit_count)
{
printf("Unexpected PG %u state keyword in etcd: %s\n", pg_num, e.dump().c_str());
printf("Unexpected pool %u PG %u state keyword in etcd: %s\n", pool_id, pg_num, e.dump().c_str());
return;
}
}
@ -501,7 +516,7 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso
(state & PG_PEERING) && state != PG_PEERING ||
(state & PG_INCOMPLETE) && state != PG_INCOMPLETE)
{
printf("Unexpected PG %u state in etcd: primary=%lu, state=%s\n", pg_num, cur_primary, value["state"].dump().c_str());
printf("Unexpected pool %u PG %u state in etcd: primary=%lu, state=%s\n", pool_id, pg_num, cur_primary, value["state"].dump().c_str());
return;
}
this->pool_config[pool_id].pg_config[pg_num].cur_primary = cur_primary;

View File

@ -13,12 +13,6 @@
#define ETCD_SLOW_TIMEOUT 5000
#define ETCD_QUICK_TIMEOUT 1000
#define POOL_SCHEME_REPLICATED 1
#define POOL_SCHEME_XOR 2
#define POOL_ID_MAX 0x10000
#define POOL_ID_BITS 16
#define INODE_POOL(inode) ((inode) >> (64 - POOL_ID_BITS))
struct json_kv_t
{
std::string key;
@ -38,8 +32,6 @@ struct pg_config_t
uint64_t epoch;
};
typedef uint64_t pool_id_t;
struct pool_config_t
{
bool exists;

View File

@ -193,7 +193,7 @@ struct osd_client_t
std::map<int, osd_op_t*> sent_ops;
// PGs dirtied by this client's primary-writes
std::set<pg_num_t> dirty_pgs;
std::set<pool_pg_num_t> dirty_pgs;
// Write state
osd_op_t *write_op = NULL;

24
osd.h
View File

@ -48,7 +48,6 @@ struct osd_recovery_op_t
{
int st = 0;
bool degraded = false;
pg_num_t pg_num = 0;
object_id oid = { 0 };
osd_op_t *osd_op = NULL;
};
@ -82,18 +81,18 @@ class osd_t
std::string etcd_lease_id;
json11::Json self_state;
bool loading_peer_config = false;
std::set<pg_num_t> pg_state_dirty;
std::set<pool_pg_num_t> pg_state_dirty;
bool pg_config_applied = false;
bool etcd_reporting_pg_state = false;
bool etcd_reporting_stats = false;
// peers and PGs
std::map<pg_num_t, pg_t> pgs;
std::set<pg_num_t> dirty_pgs;
std::map<pool_id_t, pg_num_t> pg_counts;
std::map<pool_pg_num_t, pg_t> pgs;
std::set<pool_pg_num_t> dirty_pgs;
uint64_t misplaced_objects = 0, degraded_objects = 0, incomplete_objects = 0;
int peering_state = 0;
unsigned pg_count = 0;
std::map<object_id, osd_recovery_op_t> recovery_ops;
osd_op_t *autosync_op = NULL;
@ -126,7 +125,7 @@ class osd_t
void parse_config(blockstore_config_t & config);
void init_cluster();
void on_change_osd_state_hook(osd_num_t peer_osd);
void on_change_pg_history_hook(pg_num_t pg_num);
void on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num);
void on_change_etcd_state_hook(json11::Json::object & changes);
void on_load_config_hook(json11::Json::object & changes);
json11::Json on_load_pgs_checks_hook();
@ -152,17 +151,17 @@ class osd_t
void parse_test_peer(std::string peer);
void handle_peers();
void repeer_pgs(osd_num_t osd_num);
void start_pg_peering(pg_num_t pg_num);
void start_pg_peering(pg_t & pg);
void submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *ps);
void submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps);
void discard_list_subop(osd_op_t *list_op);
bool stop_pg(pg_num_t pg_num);
bool stop_pg(pg_t & pg);
void finish_stop_pg(pg_t & pg);
// flushing, recovery and backfill
void submit_pg_flush_ops(pg_num_t pg_num);
void handle_flush_op(bool rollback, pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t peer_osd, int retval);
void submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data);
void submit_pg_flush_ops(pg_t & pg);
void handle_flush_op(bool rollback, pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t peer_osd, int retval);
void submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data);
bool pick_next_recovery(osd_recovery_op_t &op);
void submit_recovery_op(osd_recovery_op_t *op);
bool continue_recovery();
@ -200,6 +199,9 @@ class osd_t
// Returns the 1-based number of the PG that object `oid` maps to inside its pool.
// The pool is derived from the inode number via INODE_POOL(). If the pool has
// no entry in pg_counts, or its PG count is zero, a PG count of 1 is assumed,
// so the result is always PG 1 for such pools.
inline pg_num_t map_to_pg(object_id oid)
{
    // Look the pool up with find() rather than operator[]: operator[] would
    // insert a zero entry into pg_counts for every unknown pool that is queried
    auto count_it = pg_counts.find(INODE_POOL(oid.inode));
    uint64_t pg_count = 1;
    if (count_it != pg_counts.end() && count_it->second != 0)
    {
        pg_count = count_it->second;
    }
    return (oid.inode + oid.stripe / pg_stripe_size) % pg_count + 1;
}

View File

@ -14,7 +14,7 @@ void osd_t::init_cluster()
{
if (run_primary)
{
// Test version of clustering code with 1 PG and 2 peers
// Test version of clustering code with 1 pool, 1 PG and 2 peers
// Example: peers = 2:127.0.0.1:11204,3:127.0.0.1:11205
std::string peerstr = config["peers"];
while (peerstr.size())
@ -27,15 +27,16 @@ void osd_t::init_cluster()
{
throw std::runtime_error("run_primary requires at least 2 peers");
}
pgs[1] = (pg_t){
pgs[{ 1, 1 }] = (pg_t){
.state = PG_PEERING,
.pg_cursize = 0,
.pool_id = 1,
.pg_num = 1,
.target_set = { 1, 2, 3 },
.cur_set = { 0, 0, 0 },
};
report_pg_state(pgs[1]);
pg_count = 1;
report_pg_state(pgs[{ 1, 1 }]);
pg_counts[1] = 1;
}
bind_socket();
}
@ -44,7 +45,7 @@ void osd_t::init_cluster()
st_cli.tfd = tfd;
st_cli.log_level = log_level;
st_cli.on_change_osd_state_hook = [this](osd_num_t peer_osd) { on_change_osd_state_hook(peer_osd); };
st_cli.on_change_pg_history_hook = [this](pg_num_t pg_num) { on_change_pg_history_hook(pg_num); };
st_cli.on_change_pg_history_hook = [this](pool_id_t pool_id, pg_num_t pg_num) { on_change_pg_history_hook(pool_id, pg_num); };
st_cli.on_change_hook = [this](json11::Json::object & changes) { on_change_etcd_state_hook(changes); };
st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };
st_cli.load_pgs_checks_hook = [this]() { return on_load_pgs_checks_hook(); };
@ -223,13 +224,16 @@ void osd_t::on_change_etcd_state_hook(json11::Json::object & changes)
apply_pg_config();
}
void osd_t::on_change_pg_history_hook(pg_num_t pg_num)
void osd_t::on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num)
{
auto pg_it = pgs.find(pg_num);
auto pg_it = pgs.find({
.pool_id = pool_id,
.pg_num = pg_num,
});
if (pg_it != pgs.end() && pg_it->second.epoch > pg_it->second.reported_epoch &&
st_cli.pg_config[pg_num].epoch >= pg_it->second.epoch)
st_cli.pool_config[pool_id].pg_config[pg_num].epoch >= pg_it->second.epoch)
{
pg_it->second.reported_epoch = st_cli.pg_config[pg_num].epoch;
pg_it->second.reported_epoch = st_cli.pool_config[pool_id].pg_config[pg_num].epoch;
object_id oid = { 0 };
bool first = true;
for (auto op: pg_it->second.write_queue)
@ -451,144 +455,156 @@ void osd_t::on_load_pgs_hook(bool success)
void osd_t::apply_pg_count()
{
pg_num_t pg_count = st_cli.pg_config.size();
if (this->pg_count != 0 && this->pg_count != pg_count)
for (auto & pool_item: st_cli.pool_config)
{
// Check that all PGs are offline. It is not allowed to change PG count when any PGs are online
// The external tool must wait for all PGs to come down before changing PG count
// If it doesn't wait, a restarted OSD may apply the new count immediately which will lead to bugs
// So an OSD just dies if it detects PG count change while there are active PGs
int still_active = 0;
for (auto & kv: pgs)
if (pool_item.second.real_pg_count != 0 &&
pool_item.second.real_pg_count != pg_counts[pool_item.first])
{
if (kv.second.state & PG_ACTIVE)
// Check that all pool PGs are offline. It is not allowed to change PG count when any PGs are online
// The external tool must wait for all PGs to come down before changing PG count
// If it doesn't wait, a restarted OSD may apply the new count immediately which will lead to bugs
// So an OSD just dies if it detects PG count change while there are active PGs
int still_active = 0;
for (auto & kv: pgs)
{
still_active++;
if (kv.first.pool_id == pool_item.first && (kv.second.state & PG_ACTIVE))
{
still_active++;
}
}
if (still_active > 0)
{
printf("[OSD %lu] PG count change detected, but %d PG(s) are still active. This is not allowed. Exiting\n", this->osd_num, still_active);
force_stop(1);
return;
}
}
if (still_active > 0)
{
printf("[OSD %lu] PG count change detected, but %d PG(s) are still active. This is not allowed. Exiting\n", this->osd_num, still_active);
force_stop(1);
return;
}
this->pg_counts[pool_item.first] = pool_item.second.real_pg_count;
}
this->pg_count = pg_count;
}
void osd_t::apply_pg_config()
{
bool all_applied = true;
for (auto & kv: st_cli.pg_config)
for (auto & pool_item: st_cli.pool_config)
{
pg_num_t pg_num = kv.first;
auto & pg_cfg = kv.second;
bool take = pg_cfg.exists && pg_cfg.primary == this->osd_num &&
!pg_cfg.pause && (!pg_cfg.cur_primary || pg_cfg.cur_primary == this->osd_num);
bool currently_taken = this->pgs.find(pg_num) != this->pgs.end() &&
this->pgs[pg_num].state != PG_OFFLINE;
if (currently_taken && !take)
auto pool_id = pool_item.first;
for (auto & kv: pool_item.second.pg_config)
{
// Stop this PG
stop_pg(pg_num);
}
else if (take)
{
// Take this PG
std::set<osd_num_t> all_peers;
for (osd_num_t pg_osd: pg_cfg.target_set)
pg_num_t pg_num = kv.first;
auto & pg_cfg = kv.second;
bool take = pg_cfg.exists && pg_cfg.primary == this->osd_num &&
!pg_cfg.pause && (!pg_cfg.cur_primary || pg_cfg.cur_primary == this->osd_num);
auto pg_it = this->pgs.find({ .pool_id = pool_id, .pg_num = pg_num });
bool currently_taken = pg_it != this->pgs.end() && pg_it->second.state != PG_OFFLINE;
if (currently_taken && !take)
{
if (pg_osd != 0)
{
all_peers.insert(pg_osd);
}
// Stop this PG
stop_pg(pg_it->second);
}
for (osd_num_t pg_osd: pg_cfg.all_peers)
else if (take)
{
if (pg_osd != 0)
{
all_peers.insert(pg_osd);
}
}
for (auto & hist_item: pg_cfg.target_history)
{
for (auto pg_osd: hist_item)
// Take this PG
std::set<osd_num_t> all_peers;
for (osd_num_t pg_osd: pg_cfg.target_set)
{
if (pg_osd != 0)
{
all_peers.insert(pg_osd);
}
}
}
if (currently_taken)
{
if (this->pgs[pg_num].state & (PG_ACTIVE | PG_INCOMPLETE | PG_PEERING))
for (osd_num_t pg_osd: pg_cfg.all_peers)
{
if (this->pgs[pg_num].target_set == pg_cfg.target_set)
if (pg_osd != 0)
{
// No change in osd_set; history changes are ignored
continue;
all_peers.insert(pg_osd);
}
else
}
for (auto & hist_item: pg_cfg.target_history)
{
for (auto pg_osd: hist_item)
{
// Stop PG, reapply change after stopping
stop_pg(pg_num);
if (pg_osd != 0)
{
all_peers.insert(pg_osd);
}
}
}
if (currently_taken)
{
if (pg_it->second.state & (PG_ACTIVE | PG_INCOMPLETE | PG_PEERING))
{
if (pg_it->second.target_set == pg_cfg.target_set)
{
// No change in osd_set; history changes are ignored
continue;
}
else
{
// Stop PG, reapply change after stopping
stop_pg(pg_it->second);
all_applied = false;
continue;
}
}
else if (pg_it->second.state & PG_STOPPING)
{
// Reapply change after stopping
all_applied = false;
continue;
}
}
else if (this->pgs[pg_num].state & PG_STOPPING)
{
// Reapply change after stopping
all_applied = false;
continue;
}
else if (this->pgs[pg_num].state & PG_STARTING)
{
if (pg_cfg.cur_primary == this->osd_num)
else if (pg_it->second.state & PG_STARTING)
{
// PG locked, continue
if (pg_cfg.cur_primary == this->osd_num)
{
// PG locked, continue
}
else
{
// Reapply change after locking the PG
all_applied = false;
continue;
}
}
else
{
// Reapply change after locking the PG
all_applied = false;
continue;
throw std::runtime_error("Unexpected PG "+std::to_string(pg_num)+" state: "+std::to_string(pg_it->second.state));
}
}
auto & pg = this->pgs[{ .pool_id = pool_id, .pg_num = pg_num }];
pg = (pg_t){
.state = pg_cfg.cur_primary == this->osd_num ? PG_PEERING : PG_STARTING,
.scheme = pool_item.second.scheme,
.pg_cursize = 0,
.pg_size = pool_item.second.pg_size,
.pg_minsize = pool_item.second.pg_minsize,
.pool_id = pool_id,
.pg_num = pg_num,
.reported_epoch = pg_cfg.epoch,
.target_history = pg_cfg.target_history,
.all_peers = std::vector<osd_num_t>(all_peers.begin(), all_peers.end()),
.target_set = pg_cfg.target_set,
};
this->pg_state_dirty.insert({ .pool_id = pool_id, .pg_num = pg_num });
pg.print_state();
if (pg_cfg.cur_primary == this->osd_num)
{
// Add peers
for (auto pg_osd: all_peers)
{
if (pg_osd != this->osd_num && c_cli.osd_peer_fds.find(pg_osd) == c_cli.osd_peer_fds.end())
{
c_cli.connect_peer(pg_osd, st_cli.peer_states[pg_osd]);
}
}
start_pg_peering(pg);
}
else
{
throw std::runtime_error("Unexpected PG "+std::to_string(pg_num)+" state: "+std::to_string(this->pgs[pg_num].state));
// Reapply change after locking the PG
all_applied = false;
}
}
this->pgs[pg_num] = (pg_t){
.state = pg_cfg.cur_primary == this->osd_num ? PG_PEERING : PG_STARTING,
.pg_cursize = 0,
.pg_num = pg_num,
.reported_epoch = pg_cfg.epoch,
.target_history = pg_cfg.target_history,
.all_peers = std::vector<osd_num_t>(all_peers.begin(), all_peers.end()),
.target_set = pg_cfg.target_set,
};
this->pg_state_dirty.insert(pg_num);
this->pgs[pg_num].print_state();
if (pg_cfg.cur_primary == this->osd_num)
{
// Add peers
for (auto pg_osd: all_peers)
{
if (pg_osd != this->osd_num && c_cli.osd_peer_fds.find(pg_osd) == c_cli.osd_peer_fds.end())
{
c_cli.connect_peer(pg_osd, st_cli.peer_states[pg_osd]);
}
}
start_pg_peering(pg_num);
}
else
{
// Reapply change after locking the PG
all_applied = false;
}
}
}
report_pg_states();
@ -601,7 +617,7 @@ void osd_t::report_pg_states()
{
return;
}
std::vector<std::pair<pg_num_t,bool>> reporting_pgs;
std::vector<std::pair<pool_pg_num_t,bool>> reporting_pgs;
json11::Json::array checks;
json11::Json::array success;
json11::Json::array failure;
@ -613,8 +629,8 @@ void osd_t::report_pg_states()
continue;
}
auto & pg = pg_it->second;
reporting_pgs.push_back({ pg.pg_num, pg.history_changed });
std::string state_key_base64 = base64_encode(st_cli.etcd_prefix+"/pg/state/"+std::to_string(pg.pg_num));
reporting_pgs.push_back({ *it, pg.history_changed });
std::string state_key_base64 = base64_encode(st_cli.etcd_prefix+"/pg/state/"+std::to_string(pg.pool_id)+"/"+std::to_string(pg.pg_num));
if (pg.state == PG_STARTING)
{
// Check that the PG key does not exist
@ -656,7 +672,7 @@ void osd_t::report_pg_states()
}
success.push_back(json11::Json::object {
{ "request_put", json11::Json::object {
{ "key", base64_encode(st_cli.etcd_prefix+"/pg/state/"+std::to_string(pg.pg_num)) },
{ "key", state_key_base64 },
{ "value", base64_encode(json11::Json(json11::Json::object {
{ "primary", this->osd_num },
{ "state", pg_state_keywords },
@ -669,7 +685,7 @@ void osd_t::report_pg_states()
{
// Prevent race conditions (for the case when the monitor is updating this key at the same time)
pg.history_changed = false;
std::string history_key = base64_encode(st_cli.etcd_prefix+"/pg/history/"+std::to_string(pg.pg_num));
std::string history_key = base64_encode(st_cli.etcd_prefix+"/pg/history/"+std::to_string(pg.pool_id)+"/"+std::to_string(pg.pg_num));
json11::Json::object history_value = {
{ "epoch", pg.epoch },
{ "all_peers", pg.all_peers },
@ -724,14 +740,20 @@ void osd_t::report_pg_states()
auto kv = st_cli.parse_etcd_kv(res["kvs"][0]);
if (kv.key.substr(st_cli.etcd_prefix.length()+10) == st_cli.etcd_prefix+"/pg/state/")
{
pg_num_t pg_num = stoull_full(kv.key.substr(st_cli.etcd_prefix.length()+10));
auto pg_it = pgs.find(pg_num);
if (pg_it != pgs.end() && pg_it->second.state != PG_OFFLINE && pg_it->second.state != PG_STARTING)
pool_id_t pool_id = 0;
pg_num_t pg_num = 0;
char null_byte = 0;
sscanf(kv.key.c_str() + st_cli.etcd_prefix.length()+10, "%u/%u%c", &pool_id, &pg_num, &null_byte);
if (null_byte == 0)
{
// Live PG state update failed
printf("Failed to report state of PG %u which is live. Race condition detected, exiting\n", pg_num);
force_stop(1);
return;
auto pg_it = pgs.find({ .pool_id = pool_id, .pg_num = pg_num });
if (pg_it != pgs.end() && pg_it->second.state != PG_OFFLINE && pg_it->second.state != PG_STARTING)
{
// Live PG state update failed
printf("Failed to report state of pool %u PG %u which is live. Race condition detected, exiting\n", pool_id, pg_num);
force_stop(1);
return;
}
}
}
}

View File

@ -2,9 +2,8 @@
#define FLUSH_BATCH 512
void osd_t::submit_pg_flush_ops(pg_num_t pg_num)
void osd_t::submit_pg_flush_ops(pg_t & pg)
{
pg_t & pg = pgs[pg_num];
pg_flush_batch_t *fb = new pg_flush_batch_t();
pg.flush_batch = fb;
auto it = pg.flush_actions.begin(), prev_it = pg.flush_actions.begin();
@ -45,7 +44,7 @@ void osd_t::submit_pg_flush_ops(pg_num_t pg_num)
if (l.second.size() > 0)
{
fb->flush_ops++;
submit_flush_op(pg.pg_num, fb, true, l.first, l.second.size(), l.second.data());
submit_flush_op(pg.pool_id, pg.pg_num, fb, true, l.first, l.second.size(), l.second.data());
}
}
for (auto & l: fb->stable_lists)
@ -53,14 +52,15 @@ void osd_t::submit_pg_flush_ops(pg_num_t pg_num)
if (l.second.size() > 0)
{
fb->flush_ops++;
submit_flush_op(pg.pg_num, fb, false, l.first, l.second.size(), l.second.data());
submit_flush_op(pg.pool_id, pg.pg_num, fb, false, l.first, l.second.size(), l.second.data());
}
}
}
void osd_t::handle_flush_op(bool rollback, pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t peer_osd, int retval)
void osd_t::handle_flush_op(bool rollback, pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t peer_osd, int retval)
{
if (pgs.find(pg_num) == pgs.end() || pgs[pg_num].flush_batch != fb)
pool_pg_num_t pg_id = { .pool_id = pool_id, .pg_num = pg_num };
if (pgs.find(pg_id) == pgs.end() || pgs[pg_id].flush_batch != fb)
{
// Throw the result away
return;
@ -92,7 +92,7 @@ void osd_t::handle_flush_op(bool rollback, pg_num_t pg_num, pg_flush_batch_t *fb
{
// This flush batch is done
std::vector<osd_op_t*> continue_ops;
auto & pg = pgs[pg_num];
auto & pg = pgs[pg_id];
auto it = pg.flush_actions.begin(), prev_it = it;
auto erase_start = it;
while (1)
@ -153,7 +153,7 @@ void osd_t::handle_flush_op(bool rollback, pg_num_t pg_num, pg_flush_batch_t *fb
}
}
void osd_t::submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data)
void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t peer_osd, int count, obj_ver_id *data)
{
osd_op_t *op = new osd_op_t();
// Copy buffer so it gets freed along with the operation
@ -165,10 +165,10 @@ void osd_t::submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback
clock_gettime(CLOCK_REALTIME, &op->tv_begin);
op->bs_op = new blockstore_op_t({
.opcode = (uint64_t)(rollback ? BS_OP_ROLLBACK : BS_OP_STABLE),
.callback = [this, op, pg_num, fb](blockstore_op_t *bs_op)
.callback = [this, op, pool_id, pg_num, fb](blockstore_op_t *bs_op)
{
add_bs_subop_stats(op);
handle_flush_op(bs_op->opcode == BS_OP_ROLLBACK, pg_num, fb, this->osd_num, bs_op->retval);
handle_flush_op(bs_op->opcode == BS_OP_ROLLBACK, pool_id, pg_num, fb, this->osd_num, bs_op->retval);
delete op->bs_op;
op->bs_op = NULL;
delete op;
@ -195,9 +195,9 @@ void osd_t::submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback
.len = count * sizeof(obj_ver_id),
},
};
op->callback = [this, pg_num, fb, peer_osd](osd_op_t *op)
op->callback = [this, pool_id, pg_num, fb, peer_osd](osd_op_t *op)
{
handle_flush_op(op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK, pg_num, fb, peer_osd, op->reply.hdr.retval);
handle_flush_op(op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK, pool_id, pg_num, fb, peer_osd, op->reply.hdr.retval);
delete op;
};
c_cli.outbox_push(op);
@ -215,7 +215,6 @@ bool osd_t::pick_next_recovery(osd_recovery_op_t &op)
if (recovery_ops.find(obj_it->first) == recovery_ops.end())
{
op.degraded = true;
op.pg_num = pg_it->first;
op.oid = obj_it->first;
return true;
}
@ -231,7 +230,6 @@ bool osd_t::pick_next_recovery(osd_recovery_op_t &op)
if (recovery_ops.find(obj_it->first) == recovery_ops.end())
{
op.degraded = false;
op.pg_num = pg_it->first;
op.oid = obj_it->first;
return true;
}

View File

@ -1,4 +1,24 @@
#pragma once
// Numeric identifiers of pool redundancy schemes
// (presumably matching the "replicated" and "xor" scheme names used in pool configuration - confirm)
#define POOL_SCHEME_REPLICATED 1
#define POOL_SCHEME_XOR 2
// Exclusive upper bound for pool IDs; equals 1 << POOL_ID_BITS
#define POOL_ID_MAX 0x10000
// Number of high-order inode bits that hold the pool ID
#define POOL_ID_BITS 16
// Extracts the pool ID from an inode number by taking its top POOL_ID_BITS bits
// (the shift assumes that inode is a 64-bit value)
#define INODE_POOL(inode) (pool_id_t)((inode) >> (64 - POOL_ID_BITS))
// Pool ID is 16 bits long, but it is stored in a 32-bit integer
using pool_id_t = uint32_t;
// OSD number
using osd_num_t = uint64_t;
// PG number; PGs are numbered separately inside each pool
using pg_num_t = uint32_t;

// Full PG identifier: the pool and the PG number within that pool
struct pool_pg_num_t
{
    pool_id_t pool_id;
    pg_num_t pg_num;
};

// Ordering for pool_pg_num_t: compares pool_id first, then pg_num.
// Makes the struct usable as a key of ordered containers (std::map, std::set)
inline bool operator < (const pool_pg_num_t & lhs, const pool_pg_num_t & rhs)
{
    if (lhs.pool_id != rhs.pool_id)
    {
        return lhs.pool_id < rhs.pool_id;
    }
    return lhs.pg_num < rhs.pg_num;
}

View File

@ -50,7 +50,7 @@ void osd_t::handle_peers()
{
if (!p.second.flush_batch)
{
submit_pg_flush_ops(p.first);
submit_pg_flush_ops(p.second);
}
still = true;
}
@ -89,25 +89,18 @@ void osd_t::repeer_pgs(osd_num_t peer_osd)
{
// Repeer this pg
printf("[PG %u] Repeer because of OSD %lu\n", p.second.pg_num, peer_osd);
start_pg_peering(p.second.pg_num);
start_pg_peering(p.second);
}
}
}
}
// Repeer on each connect/disconnect peer event
void osd_t::start_pg_peering(pg_num_t pg_num)
void osd_t::start_pg_peering(pg_t & pg)
{
auto & pg = pgs[pg_num];
pg.state = PG_PEERING;
this->peering_state |= OSD_PEERING_PGS;
report_pg_state(pg);
if (parsed_cfg.target_set.size() != 3)
{
printf("Bad PG %u config format: incorrect osd_set = %s\n", pg_num, pg_item.second["osd_set"].dump().c_str());
parsed_cfg.target_set.resize(3);
parsed_cfg.pause = true;
}
// Reset PG state
pg.cur_peers.clear();
pg.state_dict.clear();
@ -132,20 +125,19 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
for (auto it = unstable_writes.begin(); it != unstable_writes.end(); )
{
// Forget this PG's unstable writes
pg_num_t n = (it->first.oid.inode + it->first.oid.stripe / pg_stripe_size) % pg_count + 1;
if (n == pg.pg_num)
if (INODE_POOL(it->first.oid.inode) == pg.pool_id && map_to_pg(it->first.oid) == pg.pg_num)
unstable_writes.erase(it++);
else
it++;
}
dirty_pgs.erase(pg.pg_num);
dirty_pgs.erase({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
// Drop connections of clients who have this PG in dirty_pgs
if (immediate_commit != IMMEDIATE_ALL)
{
std::vector<int> to_stop;
for (auto & cp: c_cli.clients)
{
if (cp.second.dirty_pgs.find(pg_num) != cp.second.dirty_pgs.end())
if (cp.second.dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) != cp.second.dirty_pgs.end())
{
to_stop.push_back(cp.first);
}
@ -351,7 +343,9 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
op->bs_op = new blockstore_op_t();
op->bs_op->opcode = BS_OP_LIST;
op->bs_op->oid.stripe = pg_stripe_size;
op->bs_op->len = pg_count;
op->bs_op->oid.inode = ((uint64_t)ps->pool_id << (64 - POOL_ID_BITS));
op->bs_op->version = ((uint64_t)(ps->pool_id+1) << (64 - POOL_ID_BITS)) - 1;
op->bs_op->len = pg_counts[ps->pool_id];
op->bs_op->offset = ps->pg_num-1;
op->bs_op->callback = [this, ps, op, role_osd](blockstore_op_t *bs_op)
{
@ -391,8 +385,10 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps)
.opcode = OSD_OP_SEC_LIST,
},
.list_pg = ps->pg_num,
.pg_count = pg_count,
.pg_count = pg_counts[ps->pool_id],
.pg_stripe_size = pg_stripe_size,
.min_inode = ((uint64_t)(ps->pool_id) << (64 - POOL_ID_BITS)),
.max_inode = ((uint64_t)(ps->pool_id+1) << (64 - POOL_ID_BITS)) - 1,
},
};
op->callback = [this, ps, role_osd](osd_op_t *op)
@ -448,14 +444,8 @@ void osd_t::discard_list_subop(osd_op_t *list_op)
}
}
bool osd_t::stop_pg(pg_num_t pg_num)
bool osd_t::stop_pg(pg_t & pg)
{
auto pg_it = pgs.find(pg_num);
if (pg_it == pgs.end())
{
return false;
}
auto & pg = pg_it->second;
if (pg.peering_state)
{
// Stop peering
@ -498,7 +488,7 @@ void osd_t::finish_stop_pg(pg_t & pg)
void osd_t::report_pg_state(pg_t & pg)
{
pg.print_state();
this->pg_state_dirty.insert(pg.pg_num);
this->pg_state_dirty.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
if (pg.state == PG_ACTIVE && (pg.target_history.size() > 0 || pg.all_peers.size() > pg.target_set.size()))
{
// Clear history of active+clean PGs

View File

@ -44,6 +44,7 @@ struct pg_peering_state_t
// osd_num -> list result
std::unordered_map<osd_num_t, osd_op_t*> list_ops;
std::unordered_map<osd_num_t, pg_list_result_t> list_results;
pool_id_t pool_id = 0;
pg_num_t pg_num = 0;
};
@ -71,8 +72,10 @@ struct pg_flush_batch_t
struct pg_t
{
int state = 0;
uint64_t pg_cursize = 3, pg_size = 3, pg_minsize = 2;
pg_num_t pg_num;
uint64_t scheme = 0;
uint64_t pg_cursize = 0, pg_size = 0, pg_minsize = 0;
pool_id_t pool_id = 0;
pg_num_t pg_num = 0;
uint64_t clean_count = 0, total_count = 0;
// epoch number - should increase with each non-clean activation of the PG
uint64_t epoch = 0, reported_epoch = 0;

View File

@ -20,8 +20,9 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
// oid.stripe = starting offset of the parity stripe
.stripe = (cur_op->req.rw.offset/pg_block_size)*pg_block_size,
};
pg_num_t pg_num = (cur_op->req.rw.inode + oid.stripe/pg_stripe_size) % pg_count + 1;
auto pg_it = pgs.find(pg_num);
pool_id_t pool_id = INODE_POOL(oid.inode);
pg_num_t pg_num = (cur_op->req.rw.inode + oid.stripe/pg_stripe_size) % pg_counts[pool_id] + 1;
auto pg_it = pgs.find({ .pool_id = pool_id, .pg_num = pg_num });
if (pg_it == pgs.end() || !(pg_it->second.state & PG_ACTIVE))
{
// This OSD is not primary for this PG or the PG is inactive
@ -86,7 +87,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op)
if (op_data->st == 1) goto resume_1;
else if (op_data->st == 2) goto resume_2;
{
auto & pg = pgs[op_data->pg_num];
auto & pg = pgs[{ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num }];
for (int role = 0; role < pg.pg_minsize; role++)
{
op_data->stripes[role].read_start = op_data->stripes[role].req_start;
@ -190,7 +191,7 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
return;
}
osd_primary_op_data_t *op_data = cur_op->op_data;
auto & pg = pgs[op_data->pg_num];
auto & pg = pgs[{ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num }];
if (op_data->st == 1) goto resume_1;
else if (op_data->st == 2) goto resume_2;
else if (op_data->st == 3) goto resume_3;
@ -246,7 +247,7 @@ resume_3:
{
// Report newer epoch before writing
// FIXME: We may report only one PG state here...
this->pg_state_dirty.insert(pg.pg_num);
this->pg_state_dirty.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
pg.history_changed = true;
report_pg_states();
resume_10:
@ -399,8 +400,8 @@ resume_7:
}
// Remember PG as dirty to drop the connection when PG goes offline
// (this is required because of the "lazy sync")
c_cli.clients[cur_op->peer_fd].dirty_pgs.insert(op_data->pg_num);
dirty_pgs.insert(op_data->pg_num);
c_cli.clients[cur_op->peer_fd].dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
}
return true;
}
@ -445,7 +446,7 @@ resume_2:
{
op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
op_data->unstable_writes = new obj_ver_id[this->unstable_writes.size()];
op_data->dirty_pgs = new pg_num_t[dirty_pgs.size()];
op_data->dirty_pgs = new pool_pg_num_t[dirty_pgs.size()];
op_data->dirty_pg_count = dirty_pgs.size();
osd_num_t last_osd = 0;
int last_start = 0, last_end = 0;
@ -515,7 +516,7 @@ resume_6:
{
// Except those from peered PGs
auto & w = op_data->unstable_writes[i];
pg_num_t wpg = map_to_pg(w.oid);
pool_pg_num_t wpg = { .pool_id = INODE_POOL(w.oid.inode), .pg_num = map_to_pg(w.oid) };
if (pgs[wpg].state & PG_ACTIVE)
{
uint64_t & dest = this->unstable_writes[(osd_object_id_t){
@ -621,7 +622,7 @@ void osd_t::continue_primary_del(osd_op_t *cur_op)
return;
}
osd_primary_op_data_t *op_data = cur_op->op_data;
auto & pg = pgs[op_data->pg_num];
auto & pg = pgs[{ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num }];
if (op_data->st == 1) goto resume_1;
else if (op_data->st == 2) goto resume_2;
else if (op_data->st == 3) goto resume_3;

View File

@ -29,7 +29,7 @@ struct osd_primary_op_data_t
// for sync. oops, requires freeing
std::vector<unstable_osd_num_t> *unstable_write_osds = NULL;
pg_num_t *dirty_pgs = NULL;
pool_pg_num_t *dirty_pgs = NULL;
int dirty_pg_count = 0;
obj_ver_id *unstable_writes = NULL;
};

View File

@ -37,7 +37,7 @@ void osd_t::finish_op(osd_op_t *cur_op, int retval)
{
if (cur_op->op_data->pg_num > 0)
{
auto & pg = pgs[cur_op->op_data->pg_num];
auto & pg = pgs[{ .pool_id = INODE_POOL(cur_op->op_data->oid.inode), .pg_num = cur_op->op_data->pg_num }];
pg.inflight--;
assert(pg.inflight >= 0);
if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch)