diff --git a/blockstore.h b/blockstore.h index eec27b86..670bea33 100644 --- a/blockstore.h +++ b/blockstore.h @@ -93,7 +93,7 @@ Input: - buf = pre-allocated obj_ver_id array units long Output: -- retval = 0 or negative error number (-EINVAL) +- retval = 0 or negative error number (-EINVAL or -EBUSY if not synced) ## BS_OP_SYNC_STAB_ALL diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp index 2d9525f0..e5fba5ab 100644 --- a/blockstore_stable.cpp +++ b/blockstore_stable.cpp @@ -67,7 +67,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op) else if (IS_UNSYNCED(dirty_it->second.state)) { // Object not synced yet. Caller must sync it first - op->retval = -EAGAIN; + op->retval = -EBUSY; FINISH_OP(op); return 1; } diff --git a/osd.cpp b/osd.cpp index 233b4139..d23b2233 100644 --- a/osd.cpp +++ b/osd.cpp @@ -305,7 +305,7 @@ void osd_t::cancel_op(osd_op_t *op) } else { - delete op; + finish_op(op, -EPIPE); } } diff --git a/osd.h b/osd.h index 5e4122e7..24d18c25 100644 --- a/osd.h +++ b/osd.h @@ -150,7 +150,7 @@ struct osd_client_t // Outbound messages (replies or requests) std::deque outbox; - // PGs dirtied by this client's primary-writes + // PGs dirtied by this client's primary-writes (FIXME to drop the connection) std::set dirty_pgs; // Write state @@ -276,7 +276,8 @@ class osd_t void continue_primary_write(osd_op_t *cur_op); void continue_primary_sync(osd_op_t *cur_op); void finish_op(osd_op_t *cur_op, int retval); - void handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int ok, uint64_t version); + void handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version); + void pg_cancel_write_queue(pg_t & pg, object_id oid, int retval); void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op); void submit_primary_sync_subops(osd_op_t *cur_op); void submit_primary_stab_subops(osd_op_t *cur_op); diff --git a/osd_peering.cpp b/osd_peering.cpp index 
09b9e9df..a9c48fdd 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -256,13 +256,21 @@ void osd_t::start_pg_peering(pg_num_t pg_num) pg.state = PG_PEERING; pg.print_state(); pg.state_dict.clear(); + pg.incomplete_objects.clear(); pg.misplaced_objects.clear(); pg.degraded_objects.clear(); - pg.ver_override.clear(); pg.flush_actions.clear(); + pg.ver_override.clear(); if (pg.flush_batch) + { delete pg.flush_batch; + } pg.flush_batch = NULL; + for (auto p: pg.write_queue) + { + cancel_op(p.second); + } + pg.write_queue.clear(); pg.pg_cursize = 0; for (int role = 0; role < pg.cur_set.size(); role++) { diff --git a/osd_peering_pg.h b/osd_peering_pg.h index 60ed31a3..6d07fe7e 100644 --- a/osd_peering_pg.h +++ b/osd_peering_pg.h @@ -14,12 +14,13 @@ #define PG_PEERING (1<<1) #define PG_INCOMPLETE (1<<2) #define PG_ACTIVE (1<<3) +#define PG_STOPPING (1<<4) // Plus any of these: -#define PG_DEGRADED (1<<4) -#define PG_HAS_INCOMPLETE (1<<5) -#define PG_HAS_DEGRADED (1<<6) -#define PG_HAS_MISPLACED (1<<7) -#define PG_HAS_UNCLEAN (1<<8) +#define PG_DEGRADED (1<<5) +#define PG_HAS_INCOMPLETE (1<<6) +#define PG_HAS_DEGRADED (1<<7) +#define PG_HAS_MISPLACED (1<<8) +#define PG_HAS_UNCLEAN (1<<9) // FIXME: Safe default that doesn't depend on parity_block_size or pg_parity_size #define STRIPE_MASK ((uint64_t)4096 - 1) @@ -110,7 +111,6 @@ struct pg_t btree::btree_map ver_override; pg_peering_state_t *peering_state = NULL; pg_flush_batch_t *flush_batch = NULL; - int flush_actions_in_progress = 0; std::multimap write_queue; diff --git a/osd_primary.cpp b/osd_primary.cpp index 5fa28163..36411ee5 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -27,7 +27,7 @@ struct osd_primary_op_data_t object_id oid; uint64_t target_ver; uint64_t fact_ver = 0; - int n_subops = 0, done = 0, errors = 0; + int n_subops = 0, done = 0, errors = 0, epipe = 0; int degraded = 0, pg_size, pg_minsize; osd_rmw_stripe_t *stripes; osd_op_t *subops = NULL; @@ -175,7 +175,7 @@ resume_1: resume_2: if 
(op_data->errors > 0) { - finish_op(cur_op, -EIO); + finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO); return; } if (op_data->degraded) @@ -251,8 +251,15 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* .opcode = (uint64_t)(w ? BS_OP_WRITE : BS_OP_READ), .callback = [cur_op, this](blockstore_op_t *subop) { - handle_primary_subop(subop->opcode == BS_OP_WRITE ? OSD_OP_SECONDARY_WRITE : OSD_OP_SECONDARY_READ, - cur_op, subop->retval == subop->len, subop->version); + if (subop->opcode == BS_OP_WRITE && subop->retval != subop->len) + { + // die + throw std::runtime_error("local write operation failed"); + } + handle_primary_subop( + subop->opcode == BS_OP_WRITE ? OSD_OP_SECONDARY_WRITE : OSD_OP_SECONDARY_READ, + cur_op, subop->retval, subop->len, subop->version + ); }, .oid = { .inode = op_data->oid.inode, @@ -293,7 +300,15 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* { // so it doesn't get freed subop->buf = NULL; - handle_primary_subop(subop->req.hdr.opcode, cur_op, subop->reply.hdr.retval == subop->req.sec_rw.len, subop->reply.sec_rw.version); + if (subop->req.hdr.opcode == OSD_OP_SECONDARY_WRITE && subop->reply.hdr.retval != subop->req.sec_rw.len) + { + // secondary write failed (reply retval differs from the requested length), drop the connection + stop_client(subop->peer_fd); + } + handle_primary_subop( + subop->req.hdr.opcode, cur_op, subop->reply.hdr.retval, + subop->req.sec_rw.len, subop->reply.sec_rw.version + ); }; outbox_push(clients[subops[subop].peer_fd], &subops[subop]); } @@ -302,26 +317,26 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* } } -void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int ok, uint64_t version) +void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int retval, int expected, uint64_t version) { osd_primary_op_data_t *op_data = cur_op->op_data; - if (opcode == OSD_OP_SECONDARY_READ || opcode == OSD_OP_SECONDARY_WRITE) + if (retval != 
expected) { - if (op_data->fact_ver != 0 && op_data->fact_ver != version) - { - throw std::runtime_error("different fact_versions returned from subops: "+std::to_string(version)+" vs "+std::to_string(op_data->fact_ver)); - } - op_data->fact_ver = version; - } - if (!ok) - { - // FIXME: Handle errors + if (retval == -EPIPE) + op_data->epipe++; op_data->errors++; - throw std::runtime_error("subop error for op "+std::to_string(cur_op->req.hdr.opcode)+": "+std::to_string(op_data->st)); } else { op_data->done++; + if (opcode == OSD_OP_SECONDARY_READ || opcode == OSD_OP_SECONDARY_WRITE) + { + if (op_data->fact_ver != 0 && op_data->fact_ver != version) + { + throw std::runtime_error("different fact_versions returned from subops: "+std::to_string(version)+" vs "+std::to_string(op_data->fact_ver)); + } + op_data->fact_ver = version; + } } if ((op_data->errors + op_data->done) >= op_data->n_subops) { @@ -347,6 +362,20 @@ void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int ok, uint } } +void osd_t::pg_cancel_write_queue(pg_t & pg, object_id oid, int retval) +{ + auto st_it = pg.write_queue.find(oid), it = st_it; + while (it != pg.write_queue.end() && it->first == oid) + { + finish_op(it->second, retval); + it++; + } + if (st_it != it) + { + pg.write_queue.erase(st_it, it); + } +} + void osd_t::continue_primary_write(osd_op_t *cur_op) { if (!cur_op->op_data && !prepare_primary_rw(cur_op)) @@ -417,6 +446,11 @@ resume_8: op_data->st = 8; return; resume_9: + if (op_data->errors > 0) + { + pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); + return; + } reconstruct_stripes(op_data->stripes, pg.pg_size); if (cur_op->req.rw.len > 0) { @@ -477,6 +511,11 @@ resume_2: op_data->st = 2; return; resume_3: + if (op_data->errors > 0) + { + pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? 
-EPIPE : -EIO); + return; + } // Save version override for parallel reads pg.ver_override[op_data->oid] = op_data->fact_ver; // Recover missing stripes, calculate parity @@ -487,6 +526,11 @@ resume_4: op_data->st = 4; return; resume_5: + if (op_data->errors > 0) + { + pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); + return; + } // FIXME: Check for immediate_commit == IMMEDIATE_SMALL if (immediate_commit == IMMEDIATE_ALL) { @@ -526,6 +570,11 @@ resume_7: delete[] op_data->unstable_writes; op_data->unstable_writes = NULL; op_data->unstable_write_osds = NULL; + if (op_data->errors > 0) + { + pg_cancel_write_queue(pg, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); + return; + } } else { @@ -553,17 +602,15 @@ resume_7: pg.ver_override.erase(oid); finish_op(cur_op, cur_op->req.rw.len); // Continue other write operations to the same object + auto next_it = pg.write_queue.find(oid); + auto this_it = next_it; + next_it++; + pg.write_queue.erase(this_it); + if (next_it != pg.write_queue.end() && + next_it->first == oid) { - auto next_it = pg.write_queue.find(oid); - auto this_it = next_it; - next_it++; - pg.write_queue.erase(this_it); - if (next_it != pg.write_queue.end() && - next_it->first == oid) - { - osd_op_t *next_op = next_it->second; - continue_primary_write(next_op); - } + osd_op_t *next_op = next_it->second; + continue_primary_write(next_op); } } @@ -575,19 +622,20 @@ void osd_t::continue_primary_sync(osd_op_t *cur_op) { cur_op->op_data = (osd_primary_op_data_t*)calloc(sizeof(osd_primary_op_data_t), 1); } - if (cur_op->op_data->st == 1) goto resume_1; - else if (cur_op->op_data->st == 2) goto resume_2; - else if (cur_op->op_data->st == 3) goto resume_3; - else if (cur_op->op_data->st == 4) goto resume_4; - else if (cur_op->op_data->st == 5) goto resume_5; - else if (cur_op->op_data->st == 6) goto resume_6; - assert(cur_op->op_data->st == 0); + osd_primary_op_data_t *op_data = cur_op->op_data; + if (op_data->st == 1) goto resume_1; 
+ else if (op_data->st == 2) goto resume_2; + else if (op_data->st == 3) goto resume_3; + else if (op_data->st == 4) goto resume_4; + else if (op_data->st == 5) goto resume_5; + else if (op_data->st == 6) goto resume_6; + assert(op_data->st == 0); if (syncs_in_progress.size() > 0) { // Wait for previous syncs, if any // FIXME: We may try to execute the current one in parallel, like in Blockstore, but I'm not sure if it matters at all syncs_in_progress.push_back(cur_op); - cur_op->op_data->st = 1; + op_data->st = 1; resume_1: return; } @@ -605,18 +653,18 @@ resume_2: // Save and clear unstable_writes // FIXME: This is possible to do it on a per-client basis // It would be cool not to copy them here at all, but someone has to deduplicate them by object IDs anyway - cur_op->op_data->unstable_write_osds = new std::vector(); - cur_op->op_data->unstable_writes = new obj_ver_id[unstable_writes.size()]; + op_data->unstable_write_osds = new std::vector(); + op_data->unstable_writes = new obj_ver_id[this->unstable_writes.size()]; { osd_num_t last_osd = 0; int last_start = 0, last_end = 0; - for (auto it = unstable_writes.begin(); it != unstable_writes.end(); it++) + for (auto it = this->unstable_writes.begin(); it != this->unstable_writes.end(); it++) { if (last_osd != it->first.osd_num) { if (last_osd != 0) { - cur_op->op_data->unstable_write_osds->push_back((unstable_osd_num_t){ + op_data->unstable_write_osds->push_back((unstable_osd_num_t){ .osd_num = last_osd, .start = last_start, .len = last_end - last_start, @@ -625,7 +673,7 @@ resume_2: last_osd = it->first.osd_num; last_start = last_end; } - cur_op->op_data->unstable_writes[last_end] = (obj_ver_id){ + op_data->unstable_writes[last_end] = (obj_ver_id){ .oid = it->first.oid, .version = it->second, }; @@ -633,42 +681,72 @@ resume_2: } if (last_osd != 0) { - cur_op->op_data->unstable_write_osds->push_back((unstable_osd_num_t){ + op_data->unstable_write_osds->push_back((unstable_osd_num_t){ .osd_num = last_osd, .start = 
last_start, .len = last_end - last_start, }); } } - unstable_writes.clear(); + this->unstable_writes.clear(); if (immediate_commit != IMMEDIATE_ALL) { // SYNC submit_primary_sync_subops(cur_op); resume_3: - cur_op->op_data->st = 3; + op_data->st = 3; return; - } resume_4: + if (op_data->errors > 0) + { + goto resume_6; + } + } // Stabilize version sets submit_primary_stab_subops(cur_op); resume_5: - cur_op->op_data->st = 5; + op_data->st = 5; return; resume_6: // FIXME: Free them correctly (via a destructor or so) - delete cur_op->op_data->unstable_write_osds; - delete[] cur_op->op_data->unstable_writes; - cur_op->op_data->unstable_writes = NULL; - cur_op->op_data->unstable_write_osds = NULL; + if (op_data->errors > 0) + { + // Return objects back into the unstable write set (each OSD's entries begin at unstable_osd.start) + for (auto unstable_osd: *(op_data->unstable_write_osds)) + { + for (int i = 0; i < unstable_osd.len; i++) + { + uint64_t & uv = this->unstable_writes[(osd_object_id_t){ + .osd_num = unstable_osd.osd_num, + .oid = op_data->unstable_writes[unstable_osd.start + i].oid, + }]; + uv = uv < op_data->unstable_writes[unstable_osd.start + i].version ? op_data->unstable_writes[unstable_osd.start + i].version : uv; + } + // FIXME: But filter those from peered PGs + } + } + delete op_data->unstable_write_osds; + delete[] op_data->unstable_writes; + op_data->unstable_writes = NULL; + op_data->unstable_write_osds = NULL; + if (op_data->errors > 0) + { + finish_op(cur_op, op_data->epipe > 0 ? 
-EPIPE : -EIO); + } + else + { finish: + auto it = clients.find(cur_op->peer_fd); + if (it != clients.end()) + it->second.dirty_pgs.clear(); + finish_op(cur_op, 0); + } assert(syncs_in_progress.front() == cur_op); syncs_in_progress.pop_front(); - finish_op(cur_op, 0); if (syncs_in_progress.size() > 0) { cur_op = syncs_in_progress.front(); - cur_op->op_data->st++; + op_data = cur_op->op_data; op_data->st++; goto resume_2; } } @@ -690,7 +768,12 @@ void osd_t::submit_primary_sync_subops(osd_op_t *cur_op) .opcode = BS_OP_SYNC, .callback = [cur_op, this](blockstore_op_t *subop) { - handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->retval == 0, 0); + if (subop->retval != 0) + { + // die + throw std::runtime_error("local sync operation failed"); + } + handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->retval, 0, 0); }, }); bs->enqueue_op(subops[i].bs_op); @@ -709,7 +792,12 @@ void osd_t::submit_primary_sync_subops(osd_op_t *cur_op) }; subops[i].callback = [cur_op, this](osd_op_t *subop) { - handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->reply.hdr.retval == 0, 0); + if (subop->reply.hdr.retval != 0) + { + // secondary sync failed (non-zero reply retval), drop the connection + stop_client(subop->peer_fd); + } + handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->reply.hdr.retval, 0, 0); }; outbox_push(clients[subops[i].peer_fd], &subops[i]); } @@ -733,7 +821,12 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op) .opcode = BS_OP_STABLE, .callback = [cur_op, this](blockstore_op_t *subop) { - handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->retval == 0, 0); + if (subop->retval != 0) + { + // die + throw std::runtime_error("local stabilize operation failed"); + } + handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->retval, 0, 0); }, .len = (uint32_t)stab_osd.len, .buf = (void*)(op_data->unstable_writes + stab_osd.start), @@ -756,7 +849,12 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op) subops[i].send_list.push_back(op_data->unstable_writes + 
stab_osd.start, stab_osd.len * sizeof(obj_ver_id)); subops[i].callback = [cur_op, this](osd_op_t *subop) { - handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->reply.hdr.retval == 0, 0); + if (subop->reply.hdr.retval != 0) + { + // secondary stabilize failed (non-zero reply retval), drop the connection + stop_client(subop->peer_fd); + } + handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->reply.hdr.retval, 0, 0); }; outbox_push(clients[subops[i].peer_fd], &subops[i]); }