diff --git a/src/osd_cluster.cpp b/src/osd_cluster.cpp index 4ff3ac52..a3bc7696 100644 --- a/src/osd_cluster.cpp +++ b/src/osd_cluster.cpp @@ -382,30 +382,6 @@ void osd_t::on_change_etcd_state_hook(std::map & changes } } -void osd_t::on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num) -{ - auto pg_it = pgs.find({ - .pool_id = pool_id, - .pg_num = pg_num, - }); - if (pg_it != pgs.end() && pg_it->second.epoch > pg_it->second.reported_epoch && - st_cli.pool_config[pool_id].pg_config[pg_num].epoch >= pg_it->second.epoch) - { - pg_it->second.reported_epoch = st_cli.pool_config[pool_id].pg_config[pg_num].epoch; - object_id oid = { 0 }; - bool first = true; - for (auto op: pg_it->second.write_queue) - { - if (first || oid != op.first) - { - oid = op.first; - first = false; - continue_primary_write(op.second); - } - } - } -} - void osd_t::on_load_config_hook(json11::Json::object & global_config) { json11::Json::object osd_config = this->config; diff --git a/src/osd_primary_write.cpp b/src/osd_primary_write.cpp index 60870458..c8016ae4 100644 --- a/src/osd_primary_write.cpp +++ b/src/osd_primary_write.cpp @@ -174,7 +174,8 @@ resume_3: resume_10: if (pg.epoch > pg.reported_epoch) { - op_data->st = 10; +#define PG_EPOCH_WAIT_STATE 10 + op_data->st = PG_EPOCH_WAIT_STATE; return; } } @@ -305,6 +306,50 @@ continue_others: } } +void osd_t::on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num) +{ + auto pg_it = pgs.find({ + .pool_id = pool_id, + .pg_num = pg_num, + }); + if (pg_it == pgs.end()) + { + return; + } + auto & pg = pg_it->second; + if (pg.epoch > pg.reported_epoch && + st_cli.pool_config[pool_id].pg_config[pg_num].epoch >= pg.epoch) + { + pg.reported_epoch = st_cli.pool_config[pool_id].pg_config[pg_num].epoch; + std::vector resume_oids; + for (auto & op: pg.write_queue) + { + if (op.second->op_data->st == PG_EPOCH_WAIT_STATE) + { + // Run separately to prevent side effects + resume_oids.push_back(op.first); + } + } + for (auto & oid: resume_oids) + { + auto pg_it = pgs.find({ + .pool_id = pool_id, + .pg_num = pg_num, + }); + if (pg_it != pgs.end()) + { + auto & pg = pg_it->second; + auto op_it = pg.write_queue.find(oid); + if (op_it != pg.write_queue.end() && + op_it->second->op_data->st == PG_EPOCH_WAIT_STATE) + { + continue_primary_write(op_it->second); + } + } + } + } +} + bool osd_t::remember_unstable_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state) { osd_primary_op_data_t *op_data = cur_op->op_data;