Fix stripe reconstruction in recovery, only write modified object parts

trace-sqes
Vitaliy Filippov 2020-03-28 13:51:30 +03:00
parent c0a22d825d
commit 1b30120918
3 changed files with 82 additions and 41 deletions

View File

@ -32,6 +32,7 @@ struct osd_primary_op_data_t
osd_rmw_stripe_t *stripes;
osd_op_t *subops = NULL;
void *recovery_buf = NULL;
uint64_t *prev_set = NULL;
// for sync. oops, requires freeing
std::vector<unstable_osd_num_t> *unstable_write_osds = NULL;
obj_ver_id *unstable_writes = NULL;
@ -396,49 +397,82 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
}
pg.write_queue.emplace(op_data->oid, cur_op);
}
if (pg.state != PG_ACTIVE)
if (pg.state & PG_HAS_DEGRADED)
{
// If the object is degraded, read and write the whole object
auto st_it = pg.degraded_objects.find(op_data->oid);
if (st_it != pg.degraded_objects.end())
op_data->prev_set = NULL;
{
uint64_t* cur_set = st_it->second->read_target.data();
for (int i = 0; i < pg.pg_minsize; i++)
auto st_it = pg.degraded_objects.find(op_data->oid);
if (st_it != pg.degraded_objects.end())
op_data->prev_set = st_it->second->read_target.data();
}
if (op_data->prev_set != NULL)
{
// Read the whole object
op_data->recovery_buf = memalign(512, pg.pg_size * bs_block_size);
for (int i = 0; i < pg.pg_size; i++)
{
op_data->stripes[i] = {
.req_start = 0,
.req_end = bs_block_size,
.read_start = 0,
.read_end = bs_block_size,
};
op_data->stripes[i].read_buf = op_data->recovery_buf + i*bs_block_size;
op_data->stripes[i].read_start = 0;
op_data->stripes[i].read_end = bs_block_size;
op_data->stripes[i].missing = op_data->prev_set[i] == 0;
}
assert(extend_missing_stripes(op_data->stripes, cur_set, pg.pg_minsize, pg.pg_size) >= 0);
op_data->degraded = 1;
op_data->recovery_buf = alloc_read_buffer(op_data->stripes, pg.pg_size, 0);
submit_primary_subops(SUBMIT_READ, pg.pg_size, cur_set, cur_op);
submit_primary_subops(SUBMIT_READ, pg.pg_size, op_data->prev_set, cur_op);
resume_8:
op_data->st = 8;
return;
resume_9:
reconstruct_stripes(op_data->stripes, pg.pg_size);
if (cur_op->req.rw.len > 0)
{
memcpy(
op_data->recovery_buf + cur_op->req.rw.offset - op_data->oid.stripe,
cur_op->buf, cur_op->req.rw.len
);
// Write modified parts
uint32_t start = 0, end = 0;
for (int role = 0; role < pg.pg_minsize; role++)
{
if (op_data->stripes[role].req_end != 0)
{
start = !end || op_data->stripes[role].req_start < start ? op_data->stripes[role].req_start : start;
end = std::max(op_data->stripes[role].req_end, end);
op_data->stripes[role].write_start = op_data->stripes[role].req_start;
op_data->stripes[role].write_end = op_data->stripes[role].req_end;
op_data->stripes[role].write_buf = op_data->recovery_buf + role*bs_block_size + op_data->stripes[role].write_start;
}
}
for (int role = pg.pg_minsize; role < pg.pg_size; role++)
{
op_data->stripes[role].write_start = start;
op_data->stripes[role].write_end = end;
op_data->stripes[role].write_buf = op_data->recovery_buf + role*bs_block_size + op_data->stripes[role].write_start;
}
}
if (cur_op->buf)
{
free(cur_op->buf);
}
cur_op->buf = op_data->recovery_buf;
op_data->recovery_buf = NULL;
// Also write recovered parts
uint64_t *cur_set = pg.cur_set.data();
for (int role = 0; role < pg.pg_size; role++)
{
if (cur_set[role] != op_data->prev_set[role])
{
op_data->stripes[role].write_start = 0;
op_data->stripes[role].write_end = bs_block_size;
op_data->stripes[role].write_buf = op_data->recovery_buf + role*bs_block_size;
}
}
pg.ver_override[op_data->oid] = op_data->fact_ver;
// Send writes
submit_primary_subops(SUBMIT_WRITE, pg.pg_size, cur_set, cur_op);
op_data->st = 4;
return;
}
}
goto resume_1;
resume_8:
return;
resume_9:
for (int i = 0; i < pg.pg_minsize; i++)
{
op_data->stripes[i].read_start = 0;
op_data->stripes[i].read_end = 0;
}
memcpy(
op_data->recovery_buf + cur_op->req.rw.offset - op_data->oid.stripe,
cur_op->buf, cur_op->req.rw.len
);
free(cur_op->buf);
cur_op->buf = op_data->recovery_buf;
op_data->recovery_buf = NULL;
// Determine blocks to write, bypass RMW_READ
cur_op->rmw_buf = calc_rmw_reads(cur_op->buf, op_data->stripes, pg.cur_set.data(), pg.pg_size, pg.pg_minsize, pg.pg_cursize);
goto resume_3;
resume_1:
// Determine blocks to read and write
cur_op->rmw_buf = calc_rmw_reads(cur_op->buf, op_data->stripes, pg.cur_set.data(), pg.pg_size, pg.pg_minsize, pg.pg_cursize);
@ -450,7 +484,7 @@ resume_2:
resume_3:
// Save version override for parallel reads
pg.ver_override[op_data->oid] = op_data->fact_ver;
// Calculate parity
// Recover missing stripes, calculate parity
calc_rmw_parity(op_data->stripes, pg.pg_size);
// Send writes
submit_primary_subops(SUBMIT_WRITE, pg.pg_size, pg.cur_set.data(), cur_op);

View File

@ -321,13 +321,8 @@ static void xor_multiple_buffers(buf_len_t *xor1, int n1, buf_len_t *xor2, int n
}
}
void calc_rmw_parity(osd_rmw_stripe_t *stripes, int pg_size)
void reconstruct_stripes(osd_rmw_stripe_t *stripes, int pg_size)
{
if (stripes[pg_size-1].missing)
{
// Parity OSD is unavailable
return;
}
for (int role = 0; role < pg_size; role++)
{
if (stripes[role].read_end != 0 && stripes[role].missing)
@ -337,6 +332,16 @@ void calc_rmw_parity(osd_rmw_stripe_t *stripes, int pg_size)
break;
}
}
}
void calc_rmw_parity(osd_rmw_stripe_t *stripes, int pg_size)
{
if (stripes[pg_size-1].missing)
{
// Parity OSD is unavailable
return;
}
reconstruct_stripes(stripes, pg_size);
// Calculate new parity (EC k+1)
int parity = pg_size-1, prev = -2;
auto wr_end = stripes[parity].write_end;

View File

@ -25,6 +25,8 @@ struct osd_rmw_stripe_t
void split_stripes(uint64_t pg_minsize, uint32_t bs_block_size, uint32_t start, uint32_t len, osd_rmw_stripe_t *stripes);
void reconstruct_stripes(osd_rmw_stripe_t *stripes, int pg_size);
void reconstruct_stripe(osd_rmw_stripe_t *stripes, int pg_size, int role);
int extend_missing_stripes(osd_rmw_stripe_t *stripes, osd_num_t *osd_set, int minsize, int size);