Handle secondary OSD connection errors [in theory]

trace-sqes
Vitaliy Filippov 2020-03-30 19:43:12 +03:00
parent 43fe1d88e7
commit 8a8b619875
7 changed files with 174 additions and 67 deletions

View File

@ -93,7 +93,7 @@ Input:
- buf = pre-allocated obj_ver_id array <len> units long - buf = pre-allocated obj_ver_id array <len> units long
Output: Output:
- retval = 0 or negative error number (-EINVAL) - retval = 0 or negative error number (-EINVAL or -EBUSY if not synced)
## BS_OP_SYNC_STAB_ALL ## BS_OP_SYNC_STAB_ALL

View File

@ -67,7 +67,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
else if (IS_UNSYNCED(dirty_it->second.state)) else if (IS_UNSYNCED(dirty_it->second.state))
{ {
// Object not synced yet. Caller must sync it first // Object not synced yet. Caller must sync it first
op->retval = -EAGAIN; op->retval = -EBUSY;
FINISH_OP(op); FINISH_OP(op);
return 1; return 1;
} }

View File

@ -305,7 +305,7 @@ void osd_t::cancel_op(osd_op_t *op)
} }
else else
{ {
delete op; finish_op(op, -EPIPE);
} }
} }

5
osd.h
View File

@ -150,7 +150,7 @@ struct osd_client_t
// Outbound messages (replies or requests) // Outbound messages (replies or requests)
std::deque<osd_op_t*> outbox; std::deque<osd_op_t*> outbox;
// PGs dirtied by this client's primary-writes // PGs dirtied by this client's primary-writes (FIXME to drop the connection)
std::set<pg_num_t> dirty_pgs; std::set<pg_num_t> dirty_pgs;
// Write state // Write state
@ -276,7 +276,8 @@ class osd_t
void continue_primary_write(osd_op_t *cur_op); void continue_primary_write(osd_op_t *cur_op);
void continue_primary_sync(osd_op_t *cur_op); void continue_primary_sync(osd_op_t *cur_op);
void finish_op(osd_op_t *cur_op, int retval); void finish_op(osd_op_t *cur_op, int retval);
void handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int ok, uint64_t version); void handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version);
void pg_cancel_write_queue(pg_t & pg, object_id oid, int retval);
void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op); void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op);
void submit_primary_sync_subops(osd_op_t *cur_op); void submit_primary_sync_subops(osd_op_t *cur_op);
void submit_primary_stab_subops(osd_op_t *cur_op); void submit_primary_stab_subops(osd_op_t *cur_op);

View File

@ -256,13 +256,21 @@ void osd_t::start_pg_peering(pg_num_t pg_num)
pg.state = PG_PEERING; pg.state = PG_PEERING;
pg.print_state(); pg.print_state();
pg.state_dict.clear(); pg.state_dict.clear();
pg.incomplete_objects.clear();
pg.misplaced_objects.clear(); pg.misplaced_objects.clear();
pg.degraded_objects.clear(); pg.degraded_objects.clear();
pg.ver_override.clear();
pg.flush_actions.clear(); pg.flush_actions.clear();
pg.ver_override.clear();
if (pg.flush_batch) if (pg.flush_batch)
{
delete pg.flush_batch; delete pg.flush_batch;
}
pg.flush_batch = NULL; pg.flush_batch = NULL;
for (auto p: pg.write_queue)
{
cancel_op(p.second);
}
pg.write_queue.clear();
pg.pg_cursize = 0; pg.pg_cursize = 0;
for (int role = 0; role < pg.cur_set.size(); role++) for (int role = 0; role < pg.cur_set.size(); role++)
{ {

View File

@ -14,12 +14,13 @@
#define PG_PEERING (1<<1) #define PG_PEERING (1<<1)
#define PG_INCOMPLETE (1<<2) #define PG_INCOMPLETE (1<<2)
#define PG_ACTIVE (1<<3) #define PG_ACTIVE (1<<3)
#define PG_STOPPING (1<<4)
// Plus any of these: // Plus any of these:
#define PG_DEGRADED (1<<4) #define PG_DEGRADED (1<<5)
#define PG_HAS_INCOMPLETE (1<<5) #define PG_HAS_INCOMPLETE (1<<6)
#define PG_HAS_DEGRADED (1<<6) #define PG_HAS_DEGRADED (1<<7)
#define PG_HAS_MISPLACED (1<<7) #define PG_HAS_MISPLACED (1<<8)
#define PG_HAS_UNCLEAN (1<<8) #define PG_HAS_UNCLEAN (1<<9)
// FIXME: Safe default that doesn't depend on parity_block_size or pg_parity_size // FIXME: Safe default that doesn't depend on parity_block_size or pg_parity_size
#define STRIPE_MASK ((uint64_t)4096 - 1) #define STRIPE_MASK ((uint64_t)4096 - 1)
@ -110,7 +111,6 @@ struct pg_t
btree::btree_map<object_id, uint64_t> ver_override; btree::btree_map<object_id, uint64_t> ver_override;
pg_peering_state_t *peering_state = NULL; pg_peering_state_t *peering_state = NULL;
pg_flush_batch_t *flush_batch = NULL; pg_flush_batch_t *flush_batch = NULL;
int flush_actions_in_progress = 0;
std::multimap<object_id, osd_op_t*> write_queue; std::multimap<object_id, osd_op_t*> write_queue;

View File

@ -27,7 +27,7 @@ struct osd_primary_op_data_t
object_id oid; object_id oid;
uint64_t target_ver; uint64_t target_ver;
uint64_t fact_ver = 0; uint64_t fact_ver = 0;
int n_subops = 0, done = 0, errors = 0; int n_subops = 0, done = 0, errors = 0, epipe = 0;
int degraded = 0, pg_size, pg_minsize; int degraded = 0, pg_size, pg_minsize;
osd_rmw_stripe_t *stripes; osd_rmw_stripe_t *stripes;
osd_op_t *subops = NULL; osd_op_t *subops = NULL;
@ -175,7 +175,7 @@ resume_1:
resume_2: resume_2:
if (op_data->errors > 0) if (op_data->errors > 0)
{ {
finish_op(cur_op, -EIO); finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
return; return;
} }
if (op_data->degraded) if (op_data->degraded)
@ -251,8 +251,15 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t*
.opcode = (uint64_t)(w ? BS_OP_WRITE : BS_OP_READ), .opcode = (uint64_t)(w ? BS_OP_WRITE : BS_OP_READ),
.callback = [cur_op, this](blockstore_op_t *subop) .callback = [cur_op, this](blockstore_op_t *subop)
{ {
handle_primary_subop(subop->opcode == BS_OP_WRITE ? OSD_OP_SECONDARY_WRITE : OSD_OP_SECONDARY_READ, if (subop->opcode == BS_OP_WRITE && subop->retval != subop->len)
cur_op, subop->retval == subop->len, subop->version); {
// die
throw std::runtime_error("local write operation failed");
}
handle_primary_subop(
subop->opcode == BS_OP_WRITE ? OSD_OP_SECONDARY_WRITE : OSD_OP_SECONDARY_READ,
cur_op, subop->retval, subop->len, subop->version
);
}, },
.oid = { .oid = {
.inode = op_data->oid.inode, .inode = op_data->oid.inode,
@ -293,7 +300,15 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t*
{ {
// so it doesn't get freed // so it doesn't get freed
subop->buf = NULL; subop->buf = NULL;
handle_primary_subop(subop->req.hdr.opcode, cur_op, subop->reply.hdr.retval == subop->req.sec_rw.len, subop->reply.sec_rw.version); if (subop->req.hdr.opcode == OSD_OP_SECONDARY_WRITE && cur_op->reply.hdr.retval != cur_op->req.sec_rw.len)
{
// write operation failed, drop the connection
stop_client(subop->peer_fd);
}
handle_primary_subop(
subop->req.hdr.opcode, cur_op, subop->reply.hdr.retval,
subop->req.sec_rw.len, subop->reply.sec_rw.version
);
}; };
outbox_push(clients[subops[subop].peer_fd], &subops[subop]); outbox_push(clients[subops[subop].peer_fd], &subops[subop]);
} }
@ -302,26 +317,26 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t*
} }
} }
void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int ok, uint64_t version) void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version)
{ {
osd_primary_op_data_t *op_data = cur_op->op_data; osd_primary_op_data_t *op_data = cur_op->op_data;
if (opcode == OSD_OP_SECONDARY_READ || opcode == OSD_OP_SECONDARY_WRITE) if (retval != expected)
{ {
if (op_data->fact_ver != 0 && op_data->fact_ver != version) if (retval == -EPIPE)
{ op_data->epipe++;
throw std::runtime_error("different fact_versions returned from subops: "+std::to_string(version)+" vs "+std::to_string(op_data->fact_ver));
}
op_data->fact_ver = version;
}
if (!ok)
{
// FIXME: Handle errors
op_data->errors++; op_data->errors++;
throw std::runtime_error("subop error for op "+std::to_string(cur_op->req.hdr.opcode)+": "+std::to_string(op_data->st));
} }
else else
{ {
op_data->done++; op_data->done++;
if (opcode == OSD_OP_SECONDARY_READ || opcode == OSD_OP_SECONDARY_WRITE)
{
if (op_data->fact_ver != 0 && op_data->fact_ver != version)
{
throw std::runtime_error("different fact_versions returned from subops: "+std::to_string(version)+" vs "+std::to_string(op_data->fact_ver));
}
op_data->fact_ver = version;
}
} }
if ((op_data->errors + op_data->done) >= op_data->n_subops) if ((op_data->errors + op_data->done) >= op_data->n_subops)
{ {
@ -347,6 +362,20 @@ void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int ok, uint
} }
} }
void osd_t::pg_cancel_write_queue(pg_t & pg, object_id oid, int retval)
{
auto st_it = pg.write_queue.find(oid), it = st_it;
while (it != pg.write_queue.end() && it->first == oid)
{
finish_op(it->second, retval);
it++;
}
if (st_it != it)
{
pg.write_queue.erase(st_it, it);
}
}
void osd_t::continue_primary_write(osd_op_t *cur_op) void osd_t::continue_primary_write(osd_op_t *cur_op)
{ {
if (!cur_op->op_data && !prepare_primary_rw(cur_op)) if (!cur_op->op_data && !prepare_primary_rw(cur_op))
@ -417,6 +446,11 @@ resume_8:
op_data->st = 8; op_data->st = 8;
return; return;
resume_9: resume_9:
if (op_data->errors > 0)
{
pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
return;
}
reconstruct_stripes(op_data->stripes, pg.pg_size); reconstruct_stripes(op_data->stripes, pg.pg_size);
if (cur_op->req.rw.len > 0) if (cur_op->req.rw.len > 0)
{ {
@ -477,6 +511,11 @@ resume_2:
op_data->st = 2; op_data->st = 2;
return; return;
resume_3: resume_3:
if (op_data->errors > 0)
{
pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
return;
}
// Save version override for parallel reads // Save version override for parallel reads
pg.ver_override[op_data->oid] = op_data->fact_ver; pg.ver_override[op_data->oid] = op_data->fact_ver;
// Recover missing stripes, calculate parity // Recover missing stripes, calculate parity
@ -487,6 +526,11 @@ resume_4:
op_data->st = 4; op_data->st = 4;
return; return;
resume_5: resume_5:
if (op_data->errors > 0)
{
pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
return;
}
// FIXME: Check for immediate_commit == IMMEDIATE_SMALL // FIXME: Check for immediate_commit == IMMEDIATE_SMALL
if (immediate_commit == IMMEDIATE_ALL) if (immediate_commit == IMMEDIATE_ALL)
{ {
@ -526,6 +570,11 @@ resume_7:
delete[] op_data->unstable_writes; delete[] op_data->unstable_writes;
op_data->unstable_writes = NULL; op_data->unstable_writes = NULL;
op_data->unstable_write_osds = NULL; op_data->unstable_write_osds = NULL;
if (op_data->errors > 0)
{
pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
return;
}
} }
else else
{ {
@ -553,17 +602,15 @@ resume_7:
pg.ver_override.erase(oid); pg.ver_override.erase(oid);
finish_op(cur_op, cur_op->req.rw.len); finish_op(cur_op, cur_op->req.rw.len);
// Continue other write operations to the same object // Continue other write operations to the same object
auto next_it = pg.write_queue.find(oid);
auto this_it = next_it;
next_it++;
pg.write_queue.erase(this_it);
if (next_it != pg.write_queue.end() &&
next_it->first == oid)
{ {
auto next_it = pg.write_queue.find(oid); osd_op_t *next_op = next_it->second;
auto this_it = next_it; continue_primary_write(next_op);
next_it++;
pg.write_queue.erase(this_it);
if (next_it != pg.write_queue.end() &&
next_it->first == oid)
{
osd_op_t *next_op = next_it->second;
continue_primary_write(next_op);
}
} }
} }
@ -575,19 +622,20 @@ void osd_t::continue_primary_sync(osd_op_t *cur_op)
{ {
cur_op->op_data = (osd_primary_op_data_t*)calloc(sizeof(osd_primary_op_data_t), 1); cur_op->op_data = (osd_primary_op_data_t*)calloc(sizeof(osd_primary_op_data_t), 1);
} }
if (cur_op->op_data->st == 1) goto resume_1; osd_primary_op_data_t *op_data = cur_op->op_data;
else if (cur_op->op_data->st == 2) goto resume_2; if (op_data->st == 1) goto resume_1;
else if (cur_op->op_data->st == 3) goto resume_3; else if (op_data->st == 2) goto resume_2;
else if (cur_op->op_data->st == 4) goto resume_4; else if (op_data->st == 3) goto resume_3;
else if (cur_op->op_data->st == 5) goto resume_5; else if (op_data->st == 4) goto resume_4;
else if (cur_op->op_data->st == 6) goto resume_6; else if (op_data->st == 5) goto resume_5;
assert(cur_op->op_data->st == 0); else if (op_data->st == 6) goto resume_6;
assert(op_data->st == 0);
if (syncs_in_progress.size() > 0) if (syncs_in_progress.size() > 0)
{ {
// Wait for previous syncs, if any // Wait for previous syncs, if any
// FIXME: We may try to execute the current one in parallel, like in Blockstore, but I'm not sure if it matters at all // FIXME: We may try to execute the current one in parallel, like in Blockstore, but I'm not sure if it matters at all
syncs_in_progress.push_back(cur_op); syncs_in_progress.push_back(cur_op);
cur_op->op_data->st = 1; op_data->st = 1;
resume_1: resume_1:
return; return;
} }
@ -605,18 +653,18 @@ resume_2:
// Save and clear unstable_writes // Save and clear unstable_writes
// FIXME: This is possible to do it on a per-client basis // FIXME: This is possible to do it on a per-client basis
// It would be cool not to copy them here at all, but someone has to deduplicate them by object IDs anyway // It would be cool not to copy them here at all, but someone has to deduplicate them by object IDs anyway
cur_op->op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>(); op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
cur_op->op_data->unstable_writes = new obj_ver_id[unstable_writes.size()]; op_data->unstable_writes = new obj_ver_id[this->unstable_writes.size()];
{ {
osd_num_t last_osd = 0; osd_num_t last_osd = 0;
int last_start = 0, last_end = 0; int last_start = 0, last_end = 0;
for (auto it = unstable_writes.begin(); it != unstable_writes.end(); it++) for (auto it = this->unstable_writes.begin(); it != this->unstable_writes.end(); it++)
{ {
if (last_osd != it->first.osd_num) if (last_osd != it->first.osd_num)
{ {
if (last_osd != 0) if (last_osd != 0)
{ {
cur_op->op_data->unstable_write_osds->push_back((unstable_osd_num_t){ op_data->unstable_write_osds->push_back((unstable_osd_num_t){
.osd_num = last_osd, .osd_num = last_osd,
.start = last_start, .start = last_start,
.len = last_end - last_start, .len = last_end - last_start,
@ -625,7 +673,7 @@ resume_2:
last_osd = it->first.osd_num; last_osd = it->first.osd_num;
last_start = last_end; last_start = last_end;
} }
cur_op->op_data->unstable_writes[last_end] = (obj_ver_id){ op_data->unstable_writes[last_end] = (obj_ver_id){
.oid = it->first.oid, .oid = it->first.oid,
.version = it->second, .version = it->second,
}; };
@ -633,42 +681,72 @@ resume_2:
} }
if (last_osd != 0) if (last_osd != 0)
{ {
cur_op->op_data->unstable_write_osds->push_back((unstable_osd_num_t){ op_data->unstable_write_osds->push_back((unstable_osd_num_t){
.osd_num = last_osd, .osd_num = last_osd,
.start = last_start, .start = last_start,
.len = last_end - last_start, .len = last_end - last_start,
}); });
} }
} }
unstable_writes.clear(); this->unstable_writes.clear();
if (immediate_commit != IMMEDIATE_ALL) if (immediate_commit != IMMEDIATE_ALL)
{ {
// SYNC // SYNC
submit_primary_sync_subops(cur_op); submit_primary_sync_subops(cur_op);
resume_3: resume_3:
cur_op->op_data->st = 3; op_data->st = 3;
return; return;
}
resume_4: resume_4:
if (op_data->errors > 0)
{
goto resume_6;
}
}
// Stabilize version sets // Stabilize version sets
submit_primary_stab_subops(cur_op); submit_primary_stab_subops(cur_op);
resume_5: resume_5:
cur_op->op_data->st = 5; op_data->st = 5;
return; return;
resume_6: resume_6:
// FIXME: Free them correctly (via a destructor or so) // FIXME: Free them correctly (via a destructor or so)
delete cur_op->op_data->unstable_write_osds; if (op_data->errors > 0)
delete[] cur_op->op_data->unstable_writes; {
cur_op->op_data->unstable_writes = NULL; // Return objects back into the unstable write set
cur_op->op_data->unstable_write_osds = NULL; for (auto unstable_osd: *(op_data->unstable_write_osds))
{
for (int i = 0; i < unstable_osd.len; i++)
{
uint64_t & uv = this->unstable_writes[(osd_object_id_t){
.osd_num = unstable_osd.osd_num,
.oid = op_data->unstable_writes[i].oid,
}];
uv = uv < op_data->unstable_writes[i].version ? op_data->unstable_writes[i].version : uv;
}
// FIXME: But filter those from peered PGs
}
}
delete op_data->unstable_write_osds;
delete[] op_data->unstable_writes;
op_data->unstable_writes = NULL;
op_data->unstable_write_osds = NULL;
if (op_data->errors > 0)
{
finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
}
else
{
finish: finish:
auto it = clients.find(cur_op->peer_fd);
if (it != clients.end())
it->second.dirty_pgs.clear();
finish_op(cur_op, 0);
}
assert(syncs_in_progress.front() == cur_op); assert(syncs_in_progress.front() == cur_op);
syncs_in_progress.pop_front(); syncs_in_progress.pop_front();
finish_op(cur_op, 0);
if (syncs_in_progress.size() > 0) if (syncs_in_progress.size() > 0)
{ {
cur_op = syncs_in_progress.front(); cur_op = syncs_in_progress.front();
cur_op->op_data->st++; op_data->st++;
goto resume_2; goto resume_2;
} }
} }
@ -690,7 +768,12 @@ void osd_t::submit_primary_sync_subops(osd_op_t *cur_op)
.opcode = BS_OP_SYNC, .opcode = BS_OP_SYNC,
.callback = [cur_op, this](blockstore_op_t *subop) .callback = [cur_op, this](blockstore_op_t *subop)
{ {
handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->retval == 0, 0); if (subop->retval != 0)
{
// die
throw std::runtime_error("local sync operation failed");
}
handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->retval, 0, 0);
}, },
}); });
bs->enqueue_op(subops[i].bs_op); bs->enqueue_op(subops[i].bs_op);
@ -709,7 +792,12 @@ void osd_t::submit_primary_sync_subops(osd_op_t *cur_op)
}; };
subops[i].callback = [cur_op, this](osd_op_t *subop) subops[i].callback = [cur_op, this](osd_op_t *subop)
{ {
handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->reply.hdr.retval == 0, 0); if (cur_op->reply.hdr.retval != 0)
{
// sync operation failed, drop the connection
stop_client(subop->peer_fd);
}
handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->reply.hdr.retval, 0, 0);
}; };
outbox_push(clients[subops[i].peer_fd], &subops[i]); outbox_push(clients[subops[i].peer_fd], &subops[i]);
} }
@ -733,7 +821,12 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
.opcode = BS_OP_STABLE, .opcode = BS_OP_STABLE,
.callback = [cur_op, this](blockstore_op_t *subop) .callback = [cur_op, this](blockstore_op_t *subop)
{ {
handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->retval == 0, 0); if (subop->retval != 0)
{
// die
throw std::runtime_error("local stabilize operation failed");
}
handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->retval, 0, 0);
}, },
.len = (uint32_t)stab_osd.len, .len = (uint32_t)stab_osd.len,
.buf = (void*)(op_data->unstable_writes + stab_osd.start), .buf = (void*)(op_data->unstable_writes + stab_osd.start),
@ -756,7 +849,12 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
subops[i].send_list.push_back(op_data->unstable_writes + stab_osd.start, stab_osd.len * sizeof(obj_ver_id)); subops[i].send_list.push_back(op_data->unstable_writes + stab_osd.start, stab_osd.len * sizeof(obj_ver_id));
subops[i].callback = [cur_op, this](osd_op_t *subop) subops[i].callback = [cur_op, this](osd_op_t *subop)
{ {
handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->reply.hdr.retval == 0, 0); if (cur_op->reply.hdr.retval != 0)
{
// sync operation failed, drop the connection
stop_client(subop->peer_fd);
}
handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->reply.hdr.retval, 0, 0);
}; };
outbox_push(clients[subops[i].peer_fd], &subops[i]); outbox_push(clients[subops[i].peer_fd], &subops[i]);
} }