|
|
@ -1,5 +1,5 @@ |
|
|
|
#include "osd.h" |
|
|
|
#include "xor.h" |
|
|
|
#include "osd_rmw.h" |
|
|
|
|
|
|
|
// read: read directly or read paired stripe(s), reconstruct, return
|
|
|
|
// write: read paired stripe(s), modify, write
|
|
|
@ -10,19 +10,7 @@ |
|
|
|
//
|
|
|
|
// sync: sync peers, get unstable versions from somewhere, stabilize them
|
|
|
|
|
|
|
|
// A pair of unsigned 64-bit values named offset and len.
// NOTE(review): presumably describes a byte range (offset + length); no user of this
// struct is visible in this view, so confirm against the callers.
struct off_len_t |
|
|
|
{ |
|
|
|
uint64_t offset, len; |
|
|
|
}; |
|
|
|
|
|
|
|
// Per-role read descriptor used by the primary read path.
// One element exists for every role of the PG (the array is indexed by role).
struct osd_read_stripe_t |
|
|
|
{ |
|
|
|
// Offset of this role's data inside the operation buffer (used as cur_op->buf + pos)
uint64_t pos; |
|
|
|
// Requested range inside this role's block, as filled by split_stripes();
// end == 0 is used as "nothing requested from this role"
uint32_t start, end; |
|
|
|
// Range that is actually read from this role; extend_missing_stripes() may widen it
// or reset it to 0/0 when the role has no OSD. Passed to read subops as offset and
// (real_end - real_start) as length.
uint32_t real_start, real_end; |
|
|
|
}; |
|
|
|
|
|
|
|
struct osd_primary_read_t |
|
|
|
struct osd_primary_op_data_t |
|
|
|
{ |
|
|
|
pg_num_t pg_num; |
|
|
|
object_id oid; |
|
|
@ -31,6 +19,9 @@ struct osd_primary_read_t |
|
|
|
int degraded = 0, pg_size, pg_minsize; |
|
|
|
osd_read_stripe_t *stripes; |
|
|
|
osd_op_t *subops = NULL; |
|
|
|
void *rmw_buf = NULL; |
|
|
|
|
|
|
|
bool should_read_version = false; |
|
|
|
}; |
|
|
|
|
|
|
|
void osd_t::finish_primary_op(osd_op_t *cur_op, int retval) |
|
|
@ -43,35 +34,19 @@ void osd_t::finish_primary_op(osd_op_t *cur_op, int retval) |
|
|
|
outbox_push(this->clients[cur_op->peer_fd], cur_op); |
|
|
|
} |
|
|
|
|
|
|
|
// Splits the range [start, end) into per-role pieces of bs_block_size each.
//
// Role N covers [N*bs_block_size, (N+1)*bs_block_size). For every role below pg_minsize
// whose block overlaps the range, both (start, real_start) and (end, real_end) of
// stripes[role] are set to the overlapping part, expressed relative to the beginning
// of that role's block. Entries of roles that do not overlap are left untouched, so the
// caller has to pass zero-initialized stripes.
//
// NOTE(review): start/end are compared against role*bs_block_size directly, so they are
// presumably offsets relative to the beginning of the object, not absolute offsets -
// confirm against the caller.
inline void split_stripes(uint64_t pg_minsize, uint32_t bs_block_size, uint64_t start, uint64_t end, osd_read_stripe_t *stripes) |
|
|
|
{ |
|
|
|
for (int role = 0; role < pg_minsize; role++) |
|
|
|
{ |
|
|
|
// Does the requested range intersect this role's block?
if (start < (1+role)*bs_block_size && end > role*bs_block_size) |
|
|
|
{ |
|
|
|
// Clip the beginning of the range to the beginning of the block
stripes[role].real_start = stripes[role].start |
|
|
|
= start < role*bs_block_size ? 0 : start-role*bs_block_size; |
|
|
|
// Clip the end of the range to the end of the block
stripes[role].real_end = stripes[role].end |
|
|
|
= end > (role+1)*bs_block_size ? bs_block_size : end-role*bs_block_size; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
void osd_t::exec_primary_read(osd_op_t *cur_op) |
|
|
|
bool osd_t::prepare_primary_rw(osd_op_t *cur_op) |
|
|
|
{ |
|
|
|
// PG number is calculated from the offset
|
|
|
|
// Our EC scheme stores data in fixed chunks equal to (K*block size)
|
|
|
|
// But we must not use K in the process of calculating the PG number
|
|
|
|
// So we calculate the PG number using a separate setting which should be per-inode (FIXME)
|
|
|
|
uint64_t start = cur_op->op.rw.offset; |
|
|
|
uint64_t end = cur_op->op.rw.offset + cur_op->op.rw.len; |
|
|
|
// FIXME Real pg_num should equal the below expression + 1
|
|
|
|
pg_num_t pg_num = (cur_op->op.rw.inode + cur_op->op.rw.offset / parity_block_size) % pg_count; |
|
|
|
// FIXME: Postpone operations in inactive PGs
|
|
|
|
if (pg_num > pgs.size() || !(pgs[pg_num].state & PG_ACTIVE)) |
|
|
|
{ |
|
|
|
finish_primary_op(cur_op, -EINVAL); |
|
|
|
return; |
|
|
|
return false; |
|
|
|
} |
|
|
|
uint64_t pg_parity_size = bs_block_size * pgs[pg_num].pg_minsize; |
|
|
|
object_id oid = { |
|
|
@ -80,56 +55,72 @@ void osd_t::exec_primary_read(osd_op_t *cur_op) |
|
|
|
.stripe = (cur_op->op.rw.offset / parity_block_size) * parity_block_size + |
|
|
|
((cur_op->op.rw.offset % parity_block_size) / pg_parity_size) * pg_parity_size |
|
|
|
}; |
|
|
|
if (end > (oid.stripe + pg_parity_size) || |
|
|
|
(start % bs_disk_alignment) != 0 || |
|
|
|
(end % bs_disk_alignment) != 0) |
|
|
|
if ((cur_op->op.rw.offset + cur_op->op.rw.len) > (oid.stripe + pg_parity_size) || |
|
|
|
(cur_op->op.rw.offset % bs_disk_alignment) != 0 || |
|
|
|
(cur_op->op.rw.len % bs_disk_alignment) != 0) |
|
|
|
{ |
|
|
|
finish_primary_op(cur_op, -EINVAL); |
|
|
|
return; |
|
|
|
return false; |
|
|
|
} |
|
|
|
osd_primary_read_t *op_data = (osd_primary_read_t*)calloc( |
|
|
|
sizeof(osd_primary_read_t) + sizeof(osd_read_stripe_t) * pgs[pg_num].pg_size, 1 |
|
|
|
osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)calloc( |
|
|
|
sizeof(osd_primary_op_data_t) + sizeof(osd_read_stripe_t) * pgs[pg_num].pg_size, 1 |
|
|
|
); |
|
|
|
op_data->pg_num = pg_num; |
|
|
|
op_data->oid = oid; |
|
|
|
op_data->stripes = ((osd_read_stripe_t*)(op_data+1)); |
|
|
|
cur_op->op_data = op_data; |
|
|
|
split_stripes(pgs[pg_num].pg_minsize, bs_block_size, start, end, op_data->stripes); |
|
|
|
// Determine version
|
|
|
|
split_stripes(pgs[pg_num].pg_minsize, bs_block_size, (uint32_t)(cur_op->op.rw.offset - oid.stripe), cur_op->op.rw.len, op_data->stripes); |
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
void osd_t::exec_primary_read(osd_op_t *cur_op) |
|
|
|
{ |
|
|
|
if (!prepare_primary_rw(cur_op)) |
|
|
|
{ |
|
|
|
auto vo_it = pgs[pg_num].ver_override.find(oid); |
|
|
|
op_data->target_ver = vo_it != pgs[pg_num].ver_override.end() ? vo_it->second : UINT64_MAX; |
|
|
|
return; |
|
|
|
} |
|
|
|
if (pgs[pg_num].state == PG_ACTIVE) |
|
|
|
osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)cur_op->op_data; |
|
|
|
auto & pg = pgs[op_data->pg_num]; |
|
|
|
for (int role = 0; role < pg.pg_minsize; role++) |
|
|
|
{ |
|
|
|
op_data->stripes[role].read_start = op_data->stripes[role].req_start; |
|
|
|
op_data->stripes[role].read_end = op_data->stripes[role].req_end; |
|
|
|
} |
|
|
|
// Determine version
|
|
|
|
auto vo_it = pg.ver_override.find(op_data->oid); |
|
|
|
op_data->target_ver = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX; |
|
|
|
if (pg.state == PG_ACTIVE) |
|
|
|
{ |
|
|
|
// Fast happy-path
|
|
|
|
submit_read_subops(pgs[pg_num].pg_minsize, pgs[pg_num].cur_set.data(), cur_op); |
|
|
|
cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_minsize, 0); |
|
|
|
submit_read_subops(pg.pg_minsize, pg.cur_set.data(), cur_op); |
|
|
|
cur_op->send_list.push_back(cur_op->buf, cur_op->op.rw.len); |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
// PG may be degraded or have misplaced objects
|
|
|
|
spp::sparse_hash_map<object_id, pg_osd_set_state_t*> obj_states; |
|
|
|
auto st_it = pgs[pg_num].obj_states.find(oid); |
|
|
|
uint64_t* cur_set = (st_it != pgs[pg_num].obj_states.end() |
|
|
|
auto st_it = pg.obj_states.find(op_data->oid); |
|
|
|
uint64_t* cur_set = (st_it != pg.obj_states.end() |
|
|
|
? st_it->second->read_target.data() |
|
|
|
: pgs[pg_num].cur_set.data()); |
|
|
|
if (extend_missing_stripes(op_data->stripes, cur_set, pgs[pg_num].pg_minsize, pgs[pg_num].pg_size) < 0) |
|
|
|
: pg.cur_set.data()); |
|
|
|
if (extend_missing_stripes(op_data->stripes, cur_set, pg.pg_minsize, pg.pg_size) < 0) |
|
|
|
{ |
|
|
|
free(op_data); |
|
|
|
finish_primary_op(cur_op, -EIO); |
|
|
|
return; |
|
|
|
} |
|
|
|
// Submit reads
|
|
|
|
submit_read_subops(pgs[pg_num].pg_size, cur_set, cur_op); |
|
|
|
op_data->pg_minsize = pgs[pg_num].pg_minsize; |
|
|
|
op_data->pg_size = pgs[pg_num].pg_size; |
|
|
|
cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_size, 0); |
|
|
|
submit_read_subops(pg.pg_size, cur_set, cur_op); |
|
|
|
op_data->pg_minsize = pg.pg_minsize; |
|
|
|
op_data->pg_size = pg.pg_size; |
|
|
|
op_data->degraded = 1; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
void osd_t::handle_primary_read_subop(osd_op_t *cur_op, int ok) |
|
|
|
{ |
|
|
|
osd_primary_read_t *op_data = (osd_primary_read_t*)cur_op->op_data; |
|
|
|
osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)cur_op->op_data; |
|
|
|
if (!ok) |
|
|
|
op_data->errors++; |
|
|
|
else |
|
|
@ -152,42 +143,15 @@ void osd_t::handle_primary_read_subop(osd_op_t *cur_op, int ok) |
|
|
|
osd_read_stripe_t *stripes = op_data->stripes; |
|
|
|
for (int role = 0; role < op_data->pg_minsize; role++) |
|
|
|
{ |
|
|
|
if (stripes[role].end != 0 && stripes[role].real_end == 0) |
|
|
|
if (stripes[role].read_end != 0 && stripes[role].missing) |
|
|
|
{ |
|
|
|
int prev = -2; |
|
|
|
for (int other = 0; other < op_data->pg_size; other++) |
|
|
|
{ |
|
|
|
if (other != role) |
|
|
|
{ |
|
|
|
if (prev == -2) |
|
|
|
{ |
|
|
|
prev = other; |
|
|
|
} |
|
|
|
else if (prev >= 0) |
|
|
|
{ |
|
|
|
memxor( |
|
|
|
cur_op->buf + stripes[prev].pos + (stripes[prev].real_start - stripes[role].start), |
|
|
|
cur_op->buf + stripes[other].pos + (stripes[other].real_start - stripes[other].start), |
|
|
|
cur_op->buf + stripes[role].pos, stripes[role].end - stripes[role].start |
|
|
|
); |
|
|
|
prev = -1; |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
memxor( |
|
|
|
cur_op->buf + stripes[role].pos, |
|
|
|
cur_op->buf + stripes[other].pos + (stripes[other].real_start - stripes[role].start), |
|
|
|
cur_op->buf + stripes[role].pos, stripes[role].end - stripes[role].start |
|
|
|
); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
reconstruct_stripe(stripes, op_data->pg_size, role); |
|
|
|
} |
|
|
|
if (stripes[role].end != 0) |
|
|
|
if (stripes[role].req_end != 0) |
|
|
|
{ |
|
|
|
// Send buffer in parts to avoid copying
|
|
|
|
cur_op->send_list.push_back( |
|
|
|
cur_op->buf + stripes[role].pos + (stripes[role].real_start - stripes[role].start), stripes[role].end |
|
|
|
stripes[role].read_buf + (stripes[role].read_start - stripes[role].req_start), stripes[role].req_end |
|
|
|
); |
|
|
|
} |
|
|
|
} |
|
|
@ -198,75 +162,38 @@ void osd_t::handle_primary_read_subop(osd_op_t *cur_op, int ok) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Extends reads to other stripes when a requested stripe has no OSD to read it from.
//
// For every role below minsize that is requested (stripes[role].end != 0) but has
// osd_set[role] == 0, the role's real_start/real_end are reset to 0 and the real range
// of up to minsize roles that do have an OSD (scanned in order over all `size` roles)
// is widened to cover the requested range of the missing role.
//
// stripes - per-role read descriptors, modified in place
// osd_set - per-role OSD numbers; 0 is treated as "no OSD for this role"
// minsize - number of roles scanned for missing stripes, and the number of
//           available roles required to proceed
// size    - total number of roles scanned for available stripes
//
// Returns 0 on success, -1 when fewer than minsize roles are available for some
// missing requested stripe.
int osd_t::extend_missing_stripes(osd_read_stripe_t *stripes, osd_num_t *osd_set, int minsize, int size) |
|
|
|
{ |
|
|
|
for (int role = 0; role < minsize; role++) |
|
|
|
{ |
|
|
|
if (stripes[role].end != 0 && osd_set[role] == 0) |
|
|
|
{ |
|
|
|
// Nothing can be read from the missing role itself
stripes[role].real_start = stripes[role].real_end = 0; |
|
|
|
// Stripe is missing. Extend read to other stripes.
|
|
|
|
// We need at least pg_minsize stripes to recover the lost part.
|
|
|
|
int exist = 0; |
|
|
|
for (int j = 0; j < size; j++) |
|
|
|
{ |
|
|
|
if (osd_set[j] != 0) |
|
|
|
{ |
|
|
|
// Roles with no read range yet, and all roles >= minsize, simply take
// the range of the missing stripe
if (stripes[j].real_end == 0 || j >= minsize) |
|
|
|
{ |
|
|
|
stripes[j].real_start = stripes[role].start; |
|
|
|
stripes[j].real_end = stripes[role].end; |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
// Union of the role's own requested range and the missing stripe's range.
// NOTE(review): this is computed from start/end, not from real_start/real_end,
// so a range widened earlier for another missing role is not preserved - confirm
// whether more than one missing role is possible here.
stripes[j].real_start = stripes[j].start < stripes[role].start ? stripes[j].start : stripes[role].start; |
|
|
|
stripes[j].real_end = stripes[j].end > stripes[role].end ? stripes[j].end : stripes[role].end; |
|
|
|
} |
|
|
|
exist++; |
|
|
|
if (exist >= minsize) |
|
|
|
{ |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
if (exist < minsize) |
|
|
|
{ |
|
|
|
// Less than minsize stripes are available for this object
|
|
|
|
return -1; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
return 0; |
|
|
|
} |
|
|
|
|
|
|
|
void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op) |
|
|
|
{ |
|
|
|
osd_primary_read_t *op_data = (osd_primary_read_t*)cur_op->op_data; |
|
|
|
osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)cur_op->op_data; |
|
|
|
osd_read_stripe_t *stripes = op_data->stripes; |
|
|
|
uint64_t buf_size = 0; |
|
|
|
int n_subops = 0; |
|
|
|
// Allocate subops
|
|
|
|
int n_subops = 0, force_read = -1; |
|
|
|
for (int role = 0; role < read_pg_size; role++) |
|
|
|
{ |
|
|
|
if (stripes[role].real_end != 0) |
|
|
|
if (osd_set[role] == this->osd_num || osd_set[role] != 0 && force_read == -1) |
|
|
|
{ |
|
|
|
n_subops++; |
|
|
|
stripes[role].pos = buf_size; |
|
|
|
buf_size += stripes[role].real_end - stripes[role].real_start; |
|
|
|
force_read = role; |
|
|
|
} |
|
|
|
else if (stripes[role].end != 0) |
|
|
|
if (osd_set[role] != 0 && stripes[role].read_end != 0) |
|
|
|
{ |
|
|
|
stripes[role].pos = buf_size; |
|
|
|
buf_size += stripes[role].end - stripes[role].start; |
|
|
|
n_subops++; |
|
|
|
} |
|
|
|
} |
|
|
|
if (!n_subops && op_data->should_read_version) |
|
|
|
{ |
|
|
|
n_subops = 1; |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
force_read = -1; |
|
|
|
} |
|
|
|
osd_op_t *subops = new osd_op_t[n_subops]; |
|
|
|
cur_op->buf = memalign(MEM_ALIGNMENT, buf_size); |
|
|
|
op_data->n_subops = n_subops; |
|
|
|
op_data->subops = subops; |
|
|
|
int subop = 0; |
|
|
|
for (int role = 0; role < read_pg_size; role++) |
|
|
|
{ |
|
|
|
if (stripes[role].real_end == 0) |
|
|
|
if (stripes[role].read_end == 0 && force_read != role) |
|
|
|
{ |
|
|
|
continue; |
|
|
|
} |
|
|
@ -277,7 +204,7 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op |
|
|
|
{ |
|
|
|
subops[subop].bs_op = { |
|
|
|
.opcode = BS_OP_READ, |
|
|
|
.callback = [this, cur_op](blockstore_op_t *subop) |
|
|
|
.callback = [cur_op, this](blockstore_op_t *subop) |
|
|
|
{ |
|
|
|
handle_primary_read_subop(cur_op, subop->retval == subop->len); |
|
|
|
}, |
|
|
@ -286,9 +213,9 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op |
|
|
|
.stripe = op_data->oid.stripe | role, |
|
|
|
}, |
|
|
|
.version = op_data->target_ver, |
|
|
|
.offset = stripes[role].real_start, |
|
|
|
.len = stripes[role].real_end - stripes[role].real_start, |
|
|
|
.buf = cur_op->buf + stripes[role].pos, |
|
|
|
.offset = stripes[role].read_start, |
|
|
|
.len = stripes[role].read_end - stripes[role].read_start, |
|
|
|
.buf = stripes[role].read_buf, |
|
|
|
}; |
|
|
|
bs->enqueue_op(&subops[subop].bs_op); |
|
|
|
} |
|
|
@ -307,11 +234,11 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op |
|
|
|
.stripe = op_data->oid.stripe | role, |
|
|
|
}, |
|
|
|
.version = op_data->target_ver, |
|
|
|
.offset = stripes[role].real_start, |
|
|
|
.len = stripes[role].real_end - stripes[role].real_start, |
|
|
|
.offset = stripes[role].read_start, |
|
|
|
.len = stripes[role].read_end - stripes[role].read_start, |
|
|
|
}; |
|
|
|
subops[subop].buf = cur_op->buf + stripes[role].pos; |
|
|
|
subops[subop].callback = [this, cur_op](osd_op_t *subop) |
|
|
|
subops[subop].buf = stripes[role].read_buf; |
|
|
|
subops[subop].callback = [cur_op, this](osd_op_t *subop) |
|
|
|
{ |
|
|
|
// so it doesn't get freed. FIXME: do it better
|
|
|
|
subop->buf = NULL; |
|
|
@ -326,31 +253,44 @@ void osd_t::submit_read_subops(int read_pg_size, const uint64_t* osd_set, osd_op |
|
|
|
|
|
|
|
void osd_t::exec_primary_write(osd_op_t *cur_op) |
|
|
|
{ |
|
|
|
// "RAID5" EC(k+1) parity modification variants (Px = previous, Nx = new):
|
|
|
|
// 1,2,3 write N1 -> read P2 -> write N3 = N1^P2
|
|
|
|
// _,2,3 write N1 -> read P2 -> write N3 = N1^P2
|
|
|
|
// 1,_,3 write N1 -> read P1,P3 -> write N3 = N1^P3^P1
|
|
|
|
// 1,2,_ write N1 -> read nothing
|
|
|
|
// 1,2,3,4 write N1 -> read P2,P3 -> write N4 = N1^P2^P3
|
|
|
|
// (or read P1,P4 -> write N4 = N1^P4^P1)
|
|
|
|
// 1,_,3,4 write N1 -> read P1,P4 -> write N4 = N1^P4^P1
|
|
|
|
// _,2,3,4 write N1 -> read P2,P3 -> write N4 = N1^P3^P2
|
|
|
|
// 1,2,3,4,5 write N1 -> read P1,P5 -> write N5 = N1^P5^P1
|
|
|
|
// 1,_,3,4,5 write N1 -> read P1,P5 -> write N5 = N1^P5^P1
|
|
|
|
// _,2,3,4,5 write N1 -> read P2,P3,P4 -> write N5 = N1^P2^P3^P4
|
|
|
|
//
|
|
|
|
// I.e., when we write a part:
|
|
|
|
// 1) If parity is missing and all other parts are available:
|
|
|
|
// just overwrite the part
|
|
|
|
// 2) If the modified part is missing and all other parts are available:
|
|
|
|
// read all other parts except parity, xor them all with the new data
|
|
|
|
// 3) If all parts are available and size=3:
|
|
|
|
// read the paired data stripe, xor it with the new data
|
|
|
|
// 4) Otherwise:
|
|
|
|
// read old parity and old data of the modified part, xor them both with the new data
|
|
|
|
// Ouch. Scary. But faster than the generic variant.
|
|
|
|
//
|
|
|
|
// Generic variant for jerasure is a simple RMW process: read all -> decode -> modify -> encode -> write
|
|
|
|
if (!prepare_primary_rw(cur_op)) |
|
|
|
{ |
|
|
|
return; |
|
|
|
} |
|
|
|
osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)cur_op->op_data; |
|
|
|
auto & pg = pgs[op_data->pg_num]; |
|
|
|
// Check if actions are pending for this object
|
|
|
|
auto act_it = pg.obj_stab_actions.lower_bound((obj_piece_id_t){ |
|
|
|
.oid = op_data->oid, |
|
|
|
.osd_num = 0, |
|
|
|
}); |
|
|
|
if (act_it != pg.obj_stab_actions.end() && |
|
|
|
act_it->first.oid.inode == op_data->oid.inode && |
|
|
|
(act_it->first.oid.stripe & ~STRIPE_MASK) == op_data->oid.stripe) |
|
|
|
{ |
|
|
|
// FIXME postpone the request until actions are done
|
|
|
|
free(op_data); |
|
|
|
finish_primary_op(cur_op, -EIO); |
|
|
|
return; |
|
|
|
} |
|
|
|
// Check if there are other write requests to the same object
|
|
|
|
|
|
|
|
// Determine blocks to read
|
|
|
|
op_data->rmw_buf = calc_rmw_reads(cur_op->buf, op_data->stripes, pg.cur_set.data(), pg.pg_size, pg.pg_minsize, pg.pg_cursize); |
|
|
|
op_data->should_read_version = true; |
|
|
|
// Read required blocks
|
|
|
|
submit_read_subops(pg.pg_size, pg.cur_set.data(), cur_op); |
|
|
|
// ->>>>> Continue from the callback
|
|
|
|
// Calculate parity
|
|
|
|
calc_rmw_parity(op_data->stripes, op_data->pg_size); |
|
|
|
// Save version override if degraded
|
|
|
|
|
|
|
|
// Send writes
|
|
|
|
|
|
|
|
// ->>>>> Continue from the callback
|
|
|
|
// Remember version as unstable
|
|
|
|
|
|
|
|
// Remove version override if degraded
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|