Add replicated pool support to OSD logic

...in theory :-D now it needs some testing
Vitaliy Filippov 2020-09-04 22:17:44 +03:00
parent 168cc2c803
commit 4f9b5286a0
11 changed files with 195 additions and 83 deletions

View File

@ -15,7 +15,7 @@ for (let i = 2; i < process.argv.length; i++)
if (!options.etcd_url)
{
console.error('USAGE: '+process.argv[0]+' '+process.argv[1]+' --etcd_url "http://127.0.0.1:2379,..." --etcd_prefix "/rage" --etcd_start_timeout 5');
console.error('USAGE: '+process.argv[0]+' '+process.argv[1]+' --etcd_url "http://127.0.0.1:2379,..." --etcd_prefix "/vitastor" --etcd_start_timeout 5');
process.exit();
}

View File

@ -136,7 +136,9 @@ class Mon
/* <pool_id>: {
<pg_id>: {
primary: osd_num_t,
state: ("starting"|"peering"|"incomplete"|"active"|"stopping"|"offline"|"degraded"|"has_incomplete"|"has_degraded"|"has_misplaced"|"has_unclean"|"left_on_dead")[],
state: ("starting"|"peering"|"incomplete"|"active"|"stopping"|"offline"|
"degraded"|"has_incomplete"|"has_degraded"|"has_misplaced"|"has_unclean"|
"has_invalid"|"left_on_dead")[],
}
}, */
},

osd.h
View File

@ -193,7 +193,7 @@ class osd_t
void add_bs_subop_stats(osd_op_t *subop);
void pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval);
void submit_primary_subops(int submit_type, uint64_t op_version, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_t & loc_set);
void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, uint64_t pg_size, pg_osd_set_t & loc_set);
void submit_primary_sync_subops(osd_op_t *cur_op);
void submit_primary_stab_subops(osd_op_t *cur_op);

View File

@ -26,7 +26,7 @@ void osd_t::handle_peers()
degraded_objects += p.second.degraded_objects.size();
if ((p.second.state & (PG_ACTIVE | PG_HAS_UNCLEAN)) == (PG_ACTIVE | PG_HAS_UNCLEAN))
peering_state = peering_state | OSD_FLUSHING_PGS;
else
else if (p.second.state & PG_ACTIVE)
peering_state = peering_state | OSD_RECOVERING;
}
else
@ -184,6 +184,7 @@ void osd_t::start_pg_peering(pg_t & pg)
{
pg.state = PG_INCOMPLETE;
report_pg_state(pg);
return;
}
}
}
@ -191,6 +192,7 @@ void osd_t::start_pg_peering(pg_t & pg)
{
pg.state = PG_INCOMPLETE;
report_pg_state(pg);
return;
}
std::set<osd_num_t> cur_peers;
for (auto pg_osd: pg.all_peers)

View File

@ -33,6 +33,7 @@ struct obj_piece_ver_t
struct pg_obj_state_check_t
{
pg_t *pg;
bool replicated = false;
std::vector<obj_ver_role> list;
int list_pos;
int obj_start = 0, obj_end = 0, ver_start = 0, ver_end = 0;
@ -41,7 +42,7 @@ struct pg_obj_state_check_t
uint64_t last_ver = 0;
uint64_t target_ver = 0;
uint64_t n_copies = 0, has_roles = 0, n_roles = 0, n_stable = 0, n_mismatched = 0;
uint64_t n_unstable = 0, n_buggy = 0;
uint64_t n_unstable = 0, n_invalid = 0;
pg_osd_set_t osd_set;
int log_level;
@ -73,6 +74,12 @@ void pg_obj_state_check_t::walk()
{
finish_object();
}
if (pg->state & PG_HAS_INVALID)
{
// Stop PGs with "invalid" objects
pg->state = PG_INCOMPLETE | PG_HAS_INVALID;
return;
}
if (pg->pg_cursize < pg->pg_size)
{
pg->state |= PG_DEGRADED;
@ -92,7 +99,7 @@ void pg_obj_state_check_t::start_object()
target_ver = 0;
ver_start = list_pos;
has_roles = n_copies = n_roles = n_stable = n_mismatched = 0;
n_unstable = n_buggy = 0;
n_unstable = n_invalid = 0;
}
void pg_obj_state_check_t::handle_version()
@ -111,11 +118,11 @@ void pg_obj_state_check_t::handle_version()
has_roles = n_copies = n_roles = n_stable = n_mismatched = 0;
last_ver = list[list_pos].version;
}
int replica = (list[list_pos].oid.stripe & STRIPE_MASK);
unsigned replica = (list[list_pos].oid.stripe & STRIPE_MASK);
n_copies++;
if (replica >= pg->pg_size)
if (replicated && replica > 0 || replica >= pg->pg_size)
{
n_buggy++;
n_invalid++;
}
else
{
@ -123,14 +130,32 @@ void pg_obj_state_check_t::handle_version()
{
n_stable++;
}
if (pg->cur_set[replica] != list[list_pos].osd_num)
if (replicated)
{
n_mismatched++;
int i;
for (i = 0; i < pg->cur_set.size(); i++)
{
if (pg->cur_set[i] == list[list_pos].osd_num)
{
break;
}
}
if (i == pg->cur_set.size())
{
n_mismatched++;
}
}
if (!(has_roles & (1 << replica)))
else
{
has_roles = has_roles | (1 << replica);
n_roles++;
if (pg->cur_set[replica] != list[list_pos].osd_num)
{
n_mismatched++;
}
if (!(has_roles & (1 << replica)))
{
has_roles = has_roles | (1 << replica);
n_roles++;
}
}
}
}
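
For reference, the two mismatch checks above can be condensed into a standalone sketch (simplified, assumed types; only STRIPE_MASK and the cur_set semantics are taken from this diff, the helper name and everything else is illustrative):

#include <cstdint>
#include <vector>

#define STRIPE_MASK ((uint64_t)4096 - 1) // low bits of oid.stripe encode the chunk role

// Sketch: decide whether one listed copy is "mismatched" (i.e. misplaced).
// For EC/XOR the role must sit exactly on cur_set[role]; for replicated pools
// any OSD from the current set may hold the single role-0 copy.
static bool is_mismatched(bool replicated, uint64_t stripe, uint64_t osd_num,
    const std::vector<uint64_t> & cur_set)
{
    unsigned replica = stripe & STRIPE_MASK;
    if (replica >= cur_set.size())
        return true; // invalid role; the real code counts these as n_invalid
    if (replicated)
    {
        // unordered check: any OSD of the current set is a valid location
        for (uint64_t osd: cur_set)
            if (osd == osd_num)
                return false;
        return true;
    }
    // ordered check for EC/XOR
    return cur_set[replica] != osd_num;
}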
@ -151,11 +176,14 @@ void pg_obj_state_check_t::finish_object()
obj_end = list_pos;
// Remember the decision
uint64_t state = 0;
if (n_buggy > 0)
if (n_invalid > 0)
{
state = OBJ_BUGGY;
// FIXME: bring pg offline
throw std::runtime_error("buggy object state");
// It's not allowed to change the replication scheme for a pool other than by recreating it
// So we must bring the PG offline
state = OBJ_INCOMPLETE;
pg->state |= PG_HAS_INVALID;
pg->total_count++;
return;
}
if (n_unstable > 0)
{
@ -201,7 +229,7 @@ void pg_obj_state_check_t::finish_object()
{
return;
}
if (n_roles < pg->pg_minsize)
if (!replicated && n_roles < pg->pg_minsize)
{
if (log_level > 1)
{
@ -221,7 +249,7 @@ void pg_obj_state_check_t::finish_object()
}
if (n_mismatched > 0)
{
if (n_roles >= pg->pg_cursize && log_level > 1)
if (log_level > 1 && (replicated || n_roles >= pg->pg_cursize))
{
printf("Object is misplaced: inode=%lu stripe=%lu version=%lu/%lu\n", oid.inode, oid.stripe, target_ver, max_ver);
}
@ -234,14 +262,16 @@ void pg_obj_state_check_t::finish_object()
{
for (int i = obj_start; i < obj_end; i++)
{
printf("v%lu present on: osd %lu, role %ld%s\n", list[i].version, list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : "");
printf("v%lu present on: osd %lu, role %ld%s\n", list[i].version, list[i].osd_num,
(list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : "");
}
}
else
{
for (int i = ver_start; i < ver_end; i++)
{
printf("Target version present on: osd %lu, role %ld%s\n", list[i].osd_num, (list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : "");
printf("Target version present on: osd %lu, role %ld%s\n", list[i].osd_num,
(list[i].oid.stripe & STRIPE_MASK), list[i].is_stable ? " (stable)" : "");
}
}
}
@ -343,6 +373,7 @@ void pg_t::calc_object_states(int log_level)
pg_obj_state_check_t st;
st.log_level = log_level;
st.pg = this;
st.replicated = (this->scheme == POOL_SCHEME_REPLICATED);
auto ps = peering_state;
epoch = 0;
for (auto it: ps->list_results)
@ -384,7 +415,7 @@ void pg_t::calc_object_states(int log_level)
void pg_t::print_state()
{
printf(
"[PG %u] is %s%s%s%s%s%s%s%s%s%s%s (%lu objects)\n", pg_num,
"[PG %u] is %s%s%s%s%s%s%s%s%s%s%s%s%s (%lu objects)\n", pg_num,
(state & PG_STARTING) ? "starting" : "",
(state & PG_OFFLINE) ? "offline" : "",
(state & PG_PEERING) ? "peering" : "",
@ -396,6 +427,8 @@ void pg_t::print_state()
(state & PG_HAS_DEGRADED) ? " + has_degraded" : "",
(state & PG_HAS_MISPLACED) ? " + has_misplaced" : "",
(state & PG_HAS_UNCLEAN) ? " + has_unclean" : "",
(state & PG_HAS_INVALID) ? " + has_invalid" : "",
(state & PG_LEFT_ON_DEAD) ? " + left_on_dead" : "",
total_count
);
}

View File

@ -13,14 +13,16 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
{
// PG number is calculated from the offset
// Our EC scheme stores data in fixed chunks equal to (K*block size)
// K = pg_minsize and will be a property of the inode. Now it's hardcoded (FIXME)
uint64_t pg_block_size = bs_block_size * 2;
// K = pg_minsize in case of EC/XOR, or 1 for replicated pools
pool_id_t pool_id = INODE_POOL(cur_op->req.rw.inode);
auto & pool_cfg = st_cli.pool_config[pool_id];
uint64_t pg_block_size = bs_block_size * (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_minsize);
object_id oid = {
.inode = cur_op->req.rw.inode,
// oid.stripe = starting offset of the parity stripe
.stripe = (cur_op->req.rw.offset/pg_block_size)*pg_block_size,
};
pool_id_t pool_id = INODE_POOL(oid.inode);
// FIXME: pg_stripe_size may be a per-pool config
pg_num_t pg_num = (cur_op->req.rw.inode + oid.stripe/pg_stripe_size) % pg_counts[pool_id] + 1;
auto pg_it = pgs.find({ .pool_id = pool_id, .pg_num = pg_num });
if (pg_it == pgs.end() || !(pg_it->second.state & PG_ACTIVE))
@ -37,13 +39,15 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
return false;
}
osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)calloc(
sizeof(osd_primary_op_data_t) + sizeof(osd_rmw_stripe_t) * pg_it->second.pg_size, 1
sizeof(osd_primary_op_data_t) + sizeof(osd_rmw_stripe_t) * (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg_it->second.pg_size), 1
);
op_data->pg_num = pg_num;
op_data->oid = oid;
op_data->stripes = ((osd_rmw_stripe_t*)(op_data+1));
op_data->stripes = pool_cfg.scheme == POOL_SCHEME_REPLICATED ? NULL : ((osd_rmw_stripe_t*)(op_data+1));
op_data->scheme = pool_cfg.scheme;
cur_op->op_data = op_data;
split_stripes(pg_it->second.pg_minsize, bs_block_size, (uint32_t)(cur_op->req.rw.offset - oid.stripe), cur_op->req.rw.len, op_data->stripes);
split_stripes((pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg_it->second.pg_minsize),
bs_block_size, (uint32_t)(cur_op->req.rw.offset - oid.stripe), cur_op->req.rw.len, op_data->stripes);
pg_it->second.inflight++;
return true;
}
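
The address math above maps a client offset to its parity-stripe start and PG number. A minimal standalone sketch with made-up pool parameters (in the OSD the real values come from st_cli.pool_config and pg_counts):

#include <cstdint>
#include <cstdio>

int main()
{
    // Assumed example values, not taken from any real pool
    const uint64_t bs_block_size = 131072;   // blockstore object size
    const uint64_t pg_stripe_size = 4194304; // PG stripe (per the FIXME, may become per-pool)
    const uint64_t pg_count = 256;
    const uint64_t pg_minsize = 2;
    const bool replicated = false;           // scheme == POOL_SCHEME_REPLICATED ?
    const uint64_t inode = 5, offset = 9437184;
    // One "object" covers K = pg_minsize data chunks for EC/XOR, a single block for replicas
    uint64_t pg_block_size = bs_block_size * (replicated ? 1 : pg_minsize);
    // oid.stripe = starting offset of the parity stripe the request falls into
    uint64_t stripe = (offset / pg_block_size) * pg_block_size;
    uint64_t pg_num = (inode + stripe / pg_stripe_size) % pg_count + 1;
    printf("stripe=%lu pg_num=%lu\n", stripe, pg_num);
    return 0;
}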
@ -88,7 +92,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op)
else if (op_data->st == 2) goto resume_2;
{
auto & pg = pgs[{ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num }];
for (int role = 0; role < pg.pg_minsize; role++)
for (int role = 0; role < (op_data->scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_minsize); role++)
{
op_data->stripes[role].read_start = op_data->stripes[role].req_start;
op_data->stripes[role].read_end = op_data->stripes[role].req_end;
@ -96,7 +100,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op)
// Determine version
auto vo_it = pg.ver_override.find(op_data->oid);
op_data->target_ver = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX;
if (pg.state == PG_ACTIVE)
if (pg.state == PG_ACTIVE || op_data->scheme == POOL_SCHEME_REPLICATED)
{
// Fast happy-path
cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_minsize, 0);
@ -210,10 +214,33 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
resume_1:
// Determine blocks to read and write
// Missing chunks are allowed to be overwritten even in incomplete objects
// FIXME: Allow to do small writes to the old (degraded/misplaced) OSD set for the lower performance impact
// FIXME: Allow to do small writes to the old (degraded/misplaced) OSD set for lower performance impact
op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
cur_op->rmw_buf = calc_rmw(cur_op->buf, op_data->stripes, op_data->prev_set,
pg.pg_size, pg.pg_minsize, pg.pg_cursize, pg.cur_set.data(), bs_block_size);
if (op_data->scheme == POOL_SCHEME_REPLICATED)
{
// Simplified algorithm
op_data->stripes[0].write_start = op_data->stripes[0].req_start;
op_data->stripes[0].write_end = op_data->stripes[0].req_end;
op_data->stripes[0].write_buf = cur_op->buf;
if (pg.cur_set.data() != op_data->prev_set && (op_data->stripes[0].write_start != 0 ||
op_data->stripes[0].write_end != bs_block_size))
{
// Object is degraded/misplaced and will be moved to <write_osd_set>
op_data->stripes[0].read_start = 0;
op_data->stripes[0].read_end = bs_block_size;
cur_op->rmw_buf = op_data->stripes[0].read_buf = memalign(MEM_ALIGNMENT, bs_block_size);
if (!cur_op->rmw_buf)
{
printf("Failed to allocate %u bytes\n", bs_block_size);
exit(1);
}
}
}
else
{
cur_op->rmw_buf = calc_rmw(cur_op->buf, op_data->stripes, op_data->prev_set,
pg.pg_size, pg.pg_minsize, pg.pg_cursize, pg.cur_set.data(), bs_block_size);
}
// Read required blocks
submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, pg.pg_size, op_data->prev_set, cur_op);
resume_2:
@ -227,8 +254,27 @@ resume_3:
}
// Save version override for parallel reads
pg.ver_override[op_data->oid] = op_data->fact_ver;
// Recover missing stripes, calculate parity
calc_rmw_parity_xor(op_data->stripes, pg.pg_size, op_data->prev_set, pg.cur_set.data(), bs_block_size);
if (op_data->scheme == POOL_SCHEME_REPLICATED)
{
// Only (possibly) copy new data from the request into the recovery buffer
if (pg.cur_set.data() != op_data->prev_set && (op_data->stripes[0].write_start != 0 ||
op_data->stripes[0].write_end != bs_block_size))
{
memcpy(
op_data->stripes[0].read_buf + op_data->stripes[0].req_start,
op_data->stripes[0].write_buf,
op_data->stripes[0].req_end - op_data->stripes[0].req_start
);
op_data->stripes[0].write_buf = op_data->stripes[0].read_buf;
op_data->stripes[0].write_start = 0;
op_data->stripes[0].write_end = bs_block_size;
}
}
else
{
// Recover missing stripes, calculate parity
calc_rmw_parity_xor(op_data->stripes, pg.pg_size, op_data->prev_set, pg.cur_set.data(), bs_block_size);
}
// Send writes
if ((op_data->fact_ver >> (64-PG_EPOCH_BITS)) < pg.epoch)
{
@ -291,7 +337,7 @@ resume_5:
if (op_data->object_state->state & OBJ_MISPLACED)
{
// Remove extra chunks
submit_primary_del_subops(cur_op, pg.cur_set.data(), op_data->object_state->osd_set);
submit_primary_del_subops(cur_op, pg.cur_set.data(), pg.pg_size, op_data->object_state->osd_set);
if (op_data->n_subops > 0)
{
resume_8:
@ -314,7 +360,9 @@ resume_9:
// FIXME: Check for immediate_commit == IMMEDIATE_SMALL
resume_6:
resume_7:
if (!remember_unstable_write(cur_op, pg, pg.cur_loc_set, 6))
// FIXME: Replicated writes are always "immediate"
if (op_data->scheme != POOL_SCHEME_REPLICATED &&
!remember_unstable_write(cur_op, pg, pg.cur_loc_set, 6))
{
return;
}
@ -657,7 +705,7 @@ resume_3:
pg.ver_override[op_data->oid] = op_data->fact_ver;
// Submit deletes
op_data->fact_ver++;
submit_primary_del_subops(cur_op, NULL, op_data->object_state ? op_data->object_state->osd_set : pg.cur_loc_set);
submit_primary_del_subops(cur_op, NULL, 0, op_data->object_state ? op_data->object_state->osd_set : pg.cur_loc_set);
resume_4:
op_data->st = 4;
return;
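
To summarize the replicated write path added in this file: a full-block write goes straight to all OSDs of the current set, while a partial write to an object that still has to move to the current set is turned into a read-modify-write of the whole block. A self-contained sketch of that overlay step (assumed MEM_ALIGNMENT value and helper name, simplified error handling):

#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <malloc.h>

#define MEM_ALIGNMENT 512 // assumed; the real constant lives in the blockstore headers

// Sketch: build the full block that will be written (stable) to the new OSD set.
// old_block is the object data read from the old location, req_* describe the
// client write inside the block.
void *overlay_partial_write(const void *old_block, uint64_t block_size,
    const void *req_buf, uint64_t req_start, uint64_t req_end)
{
    void *full = memalign(MEM_ALIGNMENT, block_size);
    if (!full)
    {
        printf("Failed to allocate %lu bytes\n", block_size);
        exit(1);
    }
    memcpy(full, old_block, block_size);                              // old object contents
    memcpy((uint8_t*)full + req_start, req_buf, req_end - req_start); // overlay the new data
    return full; // written as one full-block stable write, as in the code above
}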

View File

@ -20,6 +20,7 @@ struct osd_primary_op_data_t
object_id oid;
uint64_t target_ver;
uint64_t fact_ver = 0;
uint64_t scheme = 0;
int n_subops = 0, done = 0, errors = 0, epipe = 0;
int degraded = 0, pg_size, pg_minsize;
osd_rmw_stripe_t *stripes;

View File

@ -78,9 +78,10 @@ void osd_t::finish_op(osd_op_t *cur_op, int retval)
void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op)
{
bool w = submit_type == SUBMIT_WRITE;
bool wr = submit_type == SUBMIT_WRITE;
osd_primary_op_data_t *op_data = cur_op->op_data;
osd_rmw_stripe_t *stripes = op_data->stripes;
bool rep = op_data->scheme == POOL_SCHEME_REPLICATED;
// Allocate subops
int n_subops = 0, zero_read = -1;
for (int role = 0; role < pg_size; role++)
@ -89,12 +90,12 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s
{
zero_read = role;
}
if (osd_set[role] != 0 && (w || stripes[role].read_end != 0))
if (osd_set[role] != 0 && (wr || !rep && stripes[role].read_end != 0))
{
n_subops++;
}
}
if (!n_subops && submit_type == SUBMIT_RMW_READ)
if (!n_subops && (submit_type == SUBMIT_RMW_READ || rep))
{
n_subops = 1;
}
@ -111,36 +112,37 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s
for (int role = 0; role < pg_size; role++)
{
// We always submit zero-length writes to all replicas, even if the stripe is not modified
if (!(w || stripes[role].read_end != 0 || zero_read == role))
if (!(wr || !rep && stripes[role].read_end != 0 || zero_read == role))
{
continue;
}
osd_num_t role_osd_num = osd_set[role];
if (role_osd_num != 0)
{
int stripe_num = rep ? 0 : role;
if (role_osd_num == this->osd_num)
{
clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);
subops[i].op_type = (uint64_t)cur_op;
subops[i].bs_op = new blockstore_op_t({
.opcode = (uint64_t)(w ? BS_OP_WRITE : BS_OP_READ),
.opcode = (uint64_t)(wr ? (rep ? BS_OP_WRITE_STABLE : BS_OP_WRITE) : BS_OP_READ),
.callback = [subop = &subops[i], this](blockstore_op_t *bs_subop)
{
handle_primary_bs_subop(subop);
},
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | role,
.stripe = op_data->oid.stripe | stripe_num,
},
.version = op_version,
.offset = w ? stripes[role].write_start : stripes[role].read_start,
.len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start,
.buf = w ? stripes[role].write_buf : stripes[role].read_buf,
.offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start,
.len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start,
.buf = wr ? stripes[stripe_num].write_buf : stripes[stripe_num].read_buf,
});
#ifdef OSD_DEBUG
printf(
"Submit %s to local: %lu:%lu v%lu %u-%u\n", w ? "write" : "read",
op_data->oid.inode, op_data->oid.stripe | role, op_version,
"Submit %s to local: %lu:%lu v%lu %u-%u\n", wr ? "write" : "read",
op_data->oid.inode, op_data->oid.stripe | stripe_num, op_version,
subops[i].bs_op->offset, subops[i].bs_op->len
);
#endif
@ -154,35 +156,35 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = c_cli.next_subop_id++,
.opcode = (uint64_t)(w ? OSD_OP_SEC_WRITE : OSD_OP_SEC_READ),
.opcode = (uint64_t)(wr ? (rep ? OSD_OP_SEC_WRITE_STABLE : OSD_OP_SEC_WRITE) : OSD_OP_SEC_READ),
},
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | role,
.stripe = op_data->oid.stripe | stripe_num,
},
.version = op_version,
.offset = w ? stripes[role].write_start : stripes[role].read_start,
.len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start,
.offset = wr ? stripes[stripe_num].write_start : stripes[stripe_num].read_start,
.len = wr ? stripes[stripe_num].write_end - stripes[stripe_num].write_start : stripes[stripe_num].read_end - stripes[stripe_num].read_start,
};
#ifdef OSD_DEBUG
printf(
"Submit %s to osd %lu: %lu:%lu v%lu %u-%u\n", w ? "write" : "read", role_osd_num,
op_data->oid.inode, op_data->oid.stripe | role, op_version,
"Submit %s to osd %lu: %lu:%lu v%lu %u-%u\n", wr ? "write" : "read", role_osd_num,
op_data->oid.inode, op_data->oid.stripe | stripe_num, op_version,
subops[i].req.sec_rw.offset, subops[i].req.sec_rw.len
);
#endif
if (w)
if (wr)
{
if (stripes[role].write_end > stripes[role].write_start)
if (stripes[stripe_num].write_end > stripes[stripe_num].write_start)
{
subops[i].iov.push_back(stripes[role].write_buf, stripes[role].write_end - stripes[role].write_start);
subops[i].iov.push_back(stripes[stripe_num].write_buf, stripes[stripe_num].write_end - stripes[stripe_num].write_start);
}
}
else
{
if (stripes[role].read_end > stripes[role].read_start)
if (stripes[stripe_num].read_end > stripes[stripe_num].read_start)
{
subops[i].iov.push_back(stripes[role].read_buf, stripes[role].read_end - stripes[role].read_start);
subops[i].iov.push_back(stripes[stripe_num].read_buf, stripes[stripe_num].read_end - stripes[stripe_num].read_start);
}
}
subops[i].callback = [cur_op, this](osd_op_t *subop)
@ -205,21 +207,23 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s
static uint64_t bs_op_to_osd_op[] = {
0,
OSD_OP_SEC_READ, // BS_OP_READ
OSD_OP_SEC_WRITE, // BS_OP_WRITE
OSD_OP_SEC_SYNC, // BS_OP_SYNC
OSD_OP_SEC_STABILIZE, // BS_OP_STABLE
OSD_OP_SEC_DELETE, // BS_OP_DELETE
OSD_OP_SEC_LIST, // BS_OP_LIST
OSD_OP_SEC_ROLLBACK, // BS_OP_ROLLBACK
OSD_OP_TEST_SYNC_STAB_ALL, // BS_OP_SYNC_STAB_ALL
OSD_OP_SEC_READ, // BS_OP_READ = 1
OSD_OP_SEC_WRITE, // BS_OP_WRITE = 2
OSD_OP_SEC_WRITE_STABLE, // BS_OP_WRITE_STABLE = 3
OSD_OP_SEC_SYNC, // BS_OP_SYNC = 4
OSD_OP_SEC_STABILIZE, // BS_OP_STABLE = 5
OSD_OP_SEC_DELETE, // BS_OP_DELETE = 6
OSD_OP_SEC_LIST, // BS_OP_LIST = 7
OSD_OP_SEC_ROLLBACK, // BS_OP_ROLLBACK = 8
OSD_OP_TEST_SYNC_STAB_ALL, // BS_OP_SYNC_STAB_ALL = 9
};
void osd_t::handle_primary_bs_subop(osd_op_t *subop)
{
osd_op_t *cur_op = (osd_op_t*)subop->op_type;
blockstore_op_t *bs_op = subop->bs_op;
int expected = bs_op->opcode == BS_OP_READ || bs_op->opcode == BS_OP_WRITE ? bs_op->len : 0;
int expected = bs_op->opcode == BS_OP_READ || bs_op->opcode == BS_OP_WRITE
|| bs_op->opcode == BS_OP_WRITE_STABLE ? bs_op->len : 0;
if (bs_op->retval != expected && bs_op->opcode != BS_OP_READ)
{
// die
@ -231,7 +235,7 @@ void osd_t::handle_primary_bs_subop(osd_op_t *subop)
add_bs_subop_stats(subop);
subop->req.hdr.opcode = bs_op_to_osd_op[bs_op->opcode];
subop->reply.hdr.retval = bs_op->retval;
if (bs_op->opcode == BS_OP_READ || bs_op->opcode == BS_OP_WRITE)
if (bs_op->opcode == BS_OP_READ || bs_op->opcode == BS_OP_WRITE || bs_op->opcode == BS_OP_WRITE_STABLE)
{
subop->req.sec_rw.len = bs_op->len;
subop->reply.sec_rw.version = bs_op->version;
@ -269,7 +273,7 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op)
uint64_t opcode = subop->req.hdr.opcode;
int retval = subop->reply.hdr.retval;
int expected = opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE
? subop->req.sec_rw.len : 0;
|| opcode == OSD_OP_SEC_WRITE_STABLE ? subop->req.sec_rw.len : 0;
osd_primary_op_data_t *op_data = cur_op->op_data;
if (retval != expected)
{
@ -283,7 +287,7 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op)
else
{
op_data->done++;
if (opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE)
if (opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE || opcode == OSD_OP_SEC_WRITE_STABLE)
{
uint64_t version = subop->reply.sec_rw.version;
#ifdef OSD_DEBUG
@ -346,13 +350,27 @@ void osd_t::cancel_primary_write(osd_op_t *cur_op)
}
}
void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_osd_set_t & loc_set)
static bool contains_osd(osd_num_t *osd_set, uint64_t size, osd_num_t osd_num)
{
for (uint64_t i = 0; i < size; i++)
{
if (osd_set[i] == osd_num)
{
return true;
}
}
return false;
}
void osd_t::submit_primary_del_subops(osd_op_t *cur_op, osd_num_t *cur_set, uint64_t pg_size, pg_osd_set_t & loc_set)
{
osd_primary_op_data_t *op_data = cur_op->op_data;
bool rep = op_data->scheme == POOL_SCHEME_REPLICATED;
int extra_chunks = 0;
// ordered comparison for EC/XOR, unordered for replicated pools
for (auto & chunk: loc_set)
{
if (!cur_set || chunk.osd_num != cur_set[chunk.role])
if (!cur_set || (rep ? !contains_osd(cur_set, pg_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role]))
{
extra_chunks++;
}
@ -368,8 +386,9 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_os
int i = 0;
for (auto & chunk: loc_set)
{
if (!cur_set || chunk.osd_num != cur_set[chunk.role])
if (!cur_set || (rep ? !contains_osd(cur_set, pg_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role]))
{
int stripe_num = op_data->scheme == POOL_SCHEME_REPLICATED ? 0 : chunk.role;
if (chunk.osd_num == this->osd_num)
{
clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);
@ -382,7 +401,7 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_os
},
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | chunk.role,
.stripe = op_data->oid.stripe | stripe_num,
},
// Same version as write
.version = op_data->fact_ver,
@ -401,7 +420,7 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_os
},
.oid = {
.inode = op_data->oid.inode,
.stripe = op_data->oid.stripe | chunk.role,
.stripe = op_data->oid.stripe | stripe_num,
},
// Same version as write
.version = op_data->fact_ver,

View File

@ -153,6 +153,11 @@ void* alloc_read_buffer(osd_rmw_stripe_t *stripes, int read_pg_size, uint64_t ad
}
// Allocate buffer
void *buf = memalign(MEM_ALIGNMENT, buf_size);
if (!buf)
{
printf("Failed to allocate %lu bytes\n", buf_size);
exit(1);
}
uint64_t buf_pos = add_size;
for (int role = 0; role < read_pg_size; role++)
{

View File

@ -1,8 +1,8 @@
#include "pg_states.h"
const int pg_state_bit_count = 13;
const int pg_state_bit_count = 14;
const int pg_state_bits[13] = {
const int pg_state_bits[14] = {
PG_STARTING,
PG_PEERING,
PG_INCOMPLETE,
@ -14,10 +14,11 @@ const int pg_state_bits[13] = {
PG_HAS_DEGRADED,
PG_HAS_MISPLACED,
PG_HAS_UNCLEAN,
PG_HAS_INVALID,
PG_LEFT_ON_DEAD,
};
const char *pg_state_names[13] = {
const char *pg_state_names[14] = {
"starting",
"peering",
"incomplete",
@ -29,5 +30,6 @@ const char *pg_state_names[13] = {
"has_degraded",
"has_misplaced",
"has_unclean",
"has_invalid",
"left_on_dead",
};

View File

@ -15,7 +15,8 @@
#define PG_HAS_DEGRADED (1<<8)
#define PG_HAS_MISPLACED (1<<9)
#define PG_HAS_UNCLEAN (1<<10)
#define PG_LEFT_ON_DEAD (1<<11)
#define PG_HAS_INVALID (1<<11)
#define PG_LEFT_ON_DEAD (1<<12)
// FIXME: Safe default that doesn't depend on pg_stripe_size or pg_block_size
#define STRIPE_MASK ((uint64_t)4096 - 1)
@ -26,7 +27,6 @@
#define OBJ_MISPLACED 0x08
#define OBJ_NEEDS_STABLE 0x10000
#define OBJ_NEEDS_ROLLBACK 0x20000
#define OBJ_BUGGY 0x80000
extern const int pg_state_bits[];
extern const char *pg_state_names[];