Implement parallel recovery

trace-sqes
Vitaliy Filippov 2020-04-04 02:18:29 +03:00
parent dfb6e15eaa
commit 6212195440
4 changed files with 154 additions and 105 deletions

View File

@ -103,6 +103,9 @@ void osd_t::parse_config(blockstore_config_t & config)
autosync_interval = strtoull(config["autosync_interval"].c_str(), NULL, 10); autosync_interval = strtoull(config["autosync_interval"].c_str(), NULL, 10);
if (autosync_interval < 0 || autosync_interval > MAX_AUTOSYNC_INTERVAL) if (autosync_interval < 0 || autosync_interval > MAX_AUTOSYNC_INTERVAL)
autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; autosync_interval = DEFAULT_AUTOSYNC_INTERVAL;
recovery_queue_depth = strtoull(config["recovery_queue_depth"].c_str(), NULL, 10);
if (recovery_queue_depth < 1 || recovery_queue_depth > MAX_RECOVERY_QUEUE)
recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
} }
void osd_t::bind_socket() void osd_t::bind_socket()

13
osd.h
View File

@ -44,6 +44,8 @@
#define MAX_AUTOSYNC_INTERVAL 3600 #define MAX_AUTOSYNC_INTERVAL 3600
#define DEFAULT_AUTOSYNC_INTERVAL 5 #define DEFAULT_AUTOSYNC_INTERVAL 5
#define MAX_RECOVERY_QUEUE 2048
#define DEFAULT_RECOVERY_QUEUE 4
//#define OSD_STUB //#define OSD_STUB
@ -172,12 +174,13 @@ struct osd_object_id_t
object_id oid; object_id oid;
}; };
struct osd_recovery_state_t struct osd_recovery_op_t
{ {
int st = 0; int st = 0;
bool degraded = false;
pg_num_t pg_num = 0; pg_num_t pg_num = 0;
object_id oid = { 0 }; object_id oid = { 0 };
osd_op_t *op = NULL; osd_op_t *osd_op = NULL;
}; };
class osd_t class osd_t
@ -195,6 +198,7 @@ class osd_t
int receive_buffer_size = 9000; int receive_buffer_size = 9000;
int immediate_commit = IMMEDIATE_NONE; int immediate_commit = IMMEDIATE_NONE;
int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds
int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE;
// peer OSDs // peer OSDs
@ -205,7 +209,7 @@ class osd_t
int peering_state = 0; int peering_state = 0;
unsigned pg_count = 0; unsigned pg_count = 0;
uint64_t next_subop_id = 1; uint64_t next_subop_id = 1;
osd_recovery_state_t recovery_state; std::map<object_id, osd_recovery_op_t> recovery_ops;
osd_op_t *autosync_op = NULL; osd_op_t *autosync_op = NULL;
// Unstable writes // Unstable writes
@ -273,7 +277,10 @@ class osd_t
void submit_pg_flush_ops(pg_num_t pg_num); void submit_pg_flush_ops(pg_num_t pg_num);
void handle_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t osd_num, bool ok); void handle_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, osd_num_t osd_num, bool ok);
void submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t osd_num, int count, obj_ver_id *data); void submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback, osd_num_t osd_num, int count, obj_ver_id *data);
bool pick_next_recovery(osd_recovery_op_t &op);
void submit_recovery_op(osd_recovery_op_t *op);
bool continue_recovery(); bool continue_recovery();
pg_osd_set_state_t* change_osd_set(pg_osd_set_state_t *st, pg_t *pg);
// op execution // op execution
void exec_op(osd_op_t *cur_op); void exec_op(osd_op_t *cur_op);

View File

@ -186,139 +186,177 @@ void osd_t::submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback
} }
} }
// Just trigger write requests for degraded objects. They'll be recovered during writing bool osd_t::pick_next_recovery(osd_recovery_op_t &op)
bool osd_t::continue_recovery()
{ {
pg_t *pg = NULL; for (auto pg_it = pgs.begin(); pg_it != pgs.end(); pg_it++)
if (recovery_state.st == 0) goto resume_0;
else if (recovery_state.st == 1) goto resume_1;
else if (recovery_state.st == 2) goto resume_2;
else if (recovery_state.st == 3) goto resume_3;
else if (recovery_state.st == 4) goto resume_4;
resume_0:
for (auto p: pgs)
{ {
if (p.second.state & PG_HAS_DEGRADED) if ((pg_it->second.state & (PG_ACTIVE | PG_HAS_DEGRADED)) == (PG_ACTIVE | PG_HAS_DEGRADED))
{ {
recovery_state.pg_num = p.first; for (auto obj_it = pg_it->second.degraded_objects.begin(); obj_it != pg_it->second.degraded_objects.end(); obj_it++)
goto resume_1; {
if (recovery_ops.find(obj_it->first) == recovery_ops.end())
{
op.degraded = true;
op.pg_num = pg_it->first;
op.oid = obj_it->first;
return true;
}
}
} }
} }
recovery_state.st = 0;
return false; return false;
resume_1: }
pg = &pgs[recovery_state.pg_num];
if (!pg->degraded_objects.size()) void osd_t::submit_recovery_op(osd_recovery_op_t *op)
{ {
pg->state = pg->state & ~PG_HAS_DEGRADED; op->osd_op = new osd_op_t();
pg->print_state(); op->osd_op->op_type = OSD_OP_OUT;
goto resume_0; op->osd_op->req = {
}
recovery_state.oid = pg->degraded_objects.begin()->first;
recovery_state.op = new osd_op_t();
recovery_state.op->op_type = OSD_OP_OUT;
recovery_state.op->req = {
.rw = { .rw = {
.header = { .header = {
.magic = SECONDARY_OSD_OP_MAGIC, .magic = SECONDARY_OSD_OP_MAGIC,
.id = 1, .id = 1,
.opcode = OSD_OP_WRITE, .opcode = OSD_OP_WRITE,
}, },
.inode = recovery_state.oid.inode, .inode = op->oid.inode,
.offset = recovery_state.oid.stripe, .offset = op->oid.stripe,
.len = 0, .len = 0,
}, },
}; };
recovery_state.op->callback = [this](osd_op_t *op) op->osd_op->callback = [this, op](osd_op_t *osd_op)
{ {
if (op->reply.hdr.retval < 0) // Don't sync the write, it will be synced by our regular sync coroutine
recovery_state.st += 1; // error if (osd_op->reply.hdr.retval < 0)
{
// Error recovering object
if (osd_op->reply.hdr.retval == -EPIPE)
{
// PG is stopped or one of the OSDs is gone, error is harmless
}
else
{
throw std::runtime_error("Failed to recover an object");
}
}
else else
recovery_state.st += 2; // ok {
pg_t *pg = &pgs[op->pg_num];
pg_osd_set_state_t *st;
if (op->degraded)
{
auto st_it = pg->degraded_objects.find(op->oid);
st = st_it->second;
pg->degraded_objects.erase(st_it);
degraded_objects--;
if (!pg->degraded_objects.size())
{
pg->state = pg->state & ~PG_HAS_DEGRADED;
pg->print_state();
}
}
else
{
auto st_it = pg->misplaced_objects.find(op->oid);
st = st_it->second;
pg->misplaced_objects.erase(st_it);
misplaced_objects--;
if (!pg->misplaced_objects.size())
{
pg->state = pg->state & ~PG_HAS_MISPLACED;
pg->print_state();
}
}
if (st->state == OBJ_DEGRADED)
{
pg->clean_count++;
}
else
{
assert(st->state == (OBJ_DEGRADED|OBJ_MISPLACED));
pg->misplaced_objects[op->oid] = change_osd_set(st, pg);
}
st->object_count--;
if (!st->object_count)
{
pg->state_dict.erase(st->osd_set);
}
}
recovery_ops.erase(op->oid);
delete osd_op;
op->osd_op = NULL;
continue_recovery(); continue_recovery();
}; };
exec_op(recovery_state.op); exec_op(op->osd_op);
recovery_state.st = 2; }
resume_2:
return true; // Just trigger write requests for degraded objects. They'll be recovered during writing
resume_3: bool osd_t::continue_recovery()
// FIXME handle error {
throw std::runtime_error("failed to recover an object"); while (recovery_ops.size() < recovery_queue_depth)
resume_4:
delete recovery_state.op;
recovery_state.op = NULL;
// Don't sync the write, it will be synced by our regular sync coroutine
pg = &pgs[recovery_state.pg_num];
pg_osd_set_state_t *st;
{ {
auto st_it = pg->degraded_objects.find(recovery_state.oid); osd_recovery_op_t op;
st = st_it->second; if (pick_next_recovery(op))
pg->degraded_objects.erase(st_it);
degraded_objects--;
}
st->object_count--;
if (st->state == OBJ_DEGRADED)
{
pg->clean_count++;
}
else
{
assert(st->state == (OBJ_DEGRADED|OBJ_MISPLACED));
pg_osd_set_state_t *new_st;
pg_osd_set_t new_set(st->osd_set);
for (uint64_t role = 0; role < pg->pg_size; role++)
{ {
if (pg->cur_set[role] != 0) recovery_ops[op.oid] = op;
submit_recovery_op(&recovery_ops[op.oid]);
}
else
return false;
}
return true;
}
// This is likely not needed at all, because we'll always recover objects to the clean state
pg_osd_set_state_t* osd_t::change_osd_set(pg_osd_set_state_t *st, pg_t *pg)
{
pg_osd_set_state_t *new_st;
pg_osd_set_t new_set(st->osd_set);
for (uint64_t role = 0; role < pg->pg_size; role++)
{
if (pg->cur_set[role] != 0)
{
// Maintain order (outdated -> role -> osd_num)
int added = 0;
for (int j = 0; j < new_set.size(); j++)
{ {
// Maintain order (outdated -> role -> osd_num) if (new_set[j].role == role && new_set[j].osd_num == pg->cur_set[role])
int added = 0;
for (int j = 0; j < new_set.size(); j++)
{ {
if (new_set[j].role == role && new_set[j].osd_num == pg->cur_set[role]) if (new_set[j].outdated)
{ {
if (new_set[j].outdated) if (!added)
new_set[j].outdated = false;
else
{ {
if (!added) new_set.erase(new_set.begin()+j);
new_set[j].outdated = false; j--;
else
{
new_set.erase(new_set.begin()+j);
j--;
}
} }
break;
}
else if (!added && (new_set[j].outdated || new_set[j].role > role ||
new_set[j].role == role && new_set[j].osd_num > pg->cur_set[role]))
{
new_set.insert(new_set.begin()+j, (pg_obj_loc_t){
.role = role,
.osd_num = pg->cur_set[role],
.outdated = false,
});
added = 1;
} }
break;
}
else if (!added && (new_set[j].outdated || new_set[j].role > role ||
new_set[j].role == role && new_set[j].osd_num > pg->cur_set[role]))
{
new_set.insert(new_set.begin()+j, (pg_obj_loc_t){
.role = role,
.osd_num = pg->cur_set[role],
.outdated = false,
});
added = 1;
} }
} }
} }
auto st_it = pg->state_dict.find(new_set);
if (st_it != pg->state_dict.end())
{
st_it = pg->state_dict.emplace(new_set, (pg_osd_set_state_t){
.read_target = pg->cur_set,
.osd_set = new_set,
.state = OBJ_MISPLACED,
.object_count = 0,
}).first;
}
new_st = &st_it->second;
new_st->object_count++;
pg->misplaced_objects[recovery_state.oid] = new_st;
} }
if (!st->object_count) auto st_it = pg->state_dict.find(new_set);
if (st_it != pg->state_dict.end())
{ {
pg->state_dict.erase(st->osd_set); st_it = pg->state_dict.emplace(new_set, (pg_osd_set_state_t){
.read_target = pg->cur_set,
.osd_set = new_set,
.state = OBJ_MISPLACED,
.object_count = 0,
}).first;
} }
recovery_state.st = 0; new_st = &st_it->second;
goto resume_0; new_st->object_count++;
return new_st;
} }

View File

@ -185,6 +185,7 @@ void osd_t::handle_peers()
p.second.calc_object_states(); p.second.calc_object_states();
incomplete_objects += p.second.incomplete_objects.size(); incomplete_objects += p.second.incomplete_objects.size();
misplaced_objects += p.second.misplaced_objects.size(); misplaced_objects += p.second.misplaced_objects.size();
// FIXME: degraded objects may currently include misplaced, too! Report them separately?
degraded_objects += p.second.degraded_objects.size(); degraded_objects += p.second.degraded_objects.size();
if (p.second.state & PG_HAS_UNCLEAN) if (p.second.state & PG_HAS_UNCLEAN)
peering_state = peering_state | OSD_FLUSHING_PGS; peering_state = peering_state | OSD_FLUSHING_PGS;