diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp index 4fe78c5e..2d9525f0 100644 --- a/blockstore_stable.cpp +++ b/blockstore_stable.cpp @@ -67,7 +67,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op) else if (IS_UNSYNCED(dirty_it->second.state)) { // Object not synced yet. Caller must sync it first - op->retval = EAGAIN; + op->retval = -EAGAIN; FINISH_OP(op); return 1; } diff --git a/osd.h b/osd.h index af1bcf8c..5e4122e7 100644 --- a/osd.h +++ b/osd.h @@ -169,10 +169,10 @@ struct osd_object_id_t struct osd_recovery_state_t { - int st; - pg_num_t pg_num; - object_id oid; - osd_op_t *op; + int st = 0; + pg_num_t pg_num = 0; + object_id oid = { 0 }; + osd_op_t *op = NULL; }; class osd_t @@ -197,7 +197,7 @@ class osd_t int peering_state = 0; unsigned pg_count = 0; uint64_t next_subop_id = 1; - osd_recovery_state_t *recovery_state; + osd_recovery_state_t recovery_state; // Unstable writes std::map unstable_writes; @@ -276,7 +276,7 @@ class osd_t void continue_primary_write(osd_op_t *cur_op); void continue_primary_sync(osd_op_t *cur_op); void finish_op(osd_op_t *cur_op, int retval); - void handle_primary_subop(osd_op_t *cur_op, int ok, uint64_t version); + void handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int ok, uint64_t version); void submit_primary_subops(int submit_type, int read_pg_size, const uint64_t* osd_set, osd_op_t *cur_op); void submit_primary_sync_subops(osd_op_t *cur_op); void submit_primary_stab_subops(osd_op_t *cur_op); diff --git a/osd_flush.cpp b/osd_flush.cpp index c63120e4..96b74565 100644 --- a/osd_flush.cpp +++ b/osd_flush.cpp @@ -190,67 +190,68 @@ void osd_t::submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback bool osd_t::continue_recovery() { pg_t *pg = NULL; - if (recovery_state->st == 0) goto resume_0; - else if (recovery_state->st == 1) goto resume_1; - else if (recovery_state->st == 2) goto resume_2; - else if (recovery_state->st == 3) goto resume_3; - else if (recovery_state->st == 4) goto resume_4; + if (recovery_state.st == 0) goto resume_0; + else if (recovery_state.st == 1) goto resume_1; + else if (recovery_state.st == 2) goto resume_2; + else if (recovery_state.st == 3) goto resume_3; + else if (recovery_state.st == 4) goto resume_4; resume_0: for (auto p: pgs) { if (p.second.state & PG_HAS_DEGRADED) { - recovery_state->pg_num = p.first; + recovery_state.pg_num = p.first; goto resume_1; } } - recovery_state->st = 0; + recovery_state.st = 0; return false; resume_1: - pg = &pgs[recovery_state->pg_num]; + pg = &pgs[recovery_state.pg_num]; if (!pg->degraded_objects.size()) { pg->state = pg->state & ~PG_HAS_DEGRADED; + pg->print_state(); goto resume_0; } - recovery_state->oid = pg->degraded_objects.begin()->first; - recovery_state->op = new osd_op_t(); - recovery_state->op->op_type = OSD_OP_OUT; - recovery_state->op->req = { + recovery_state.oid = pg->degraded_objects.begin()->first; + recovery_state.op = new osd_op_t(); + recovery_state.op->op_type = OSD_OP_OUT; + recovery_state.op->req = { .rw = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = 0, + .id = 1, .opcode = OSD_OP_WRITE, }, - .inode = recovery_state->oid.inode, - .offset = recovery_state->oid.stripe, + .inode = recovery_state.oid.inode, + .offset = recovery_state.oid.stripe, .len = 0, }, }; - recovery_state->op->callback = [this](osd_op_t *op) + recovery_state.op->callback = [this](osd_op_t *op) { if (op->reply.hdr.retval < 0) - recovery_state->st += 1; // error + recovery_state.st += 1; // error else - recovery_state->st += 2; // ok + recovery_state.st += 2; // ok continue_recovery(); }; - exec_op(recovery_state->op); - recovery_state->st = 2; + exec_op(recovery_state.op); + recovery_state.st = 2; resume_2: return true; resume_3: // FIXME handle error throw std::runtime_error("failed to recover an object"); resume_4: - delete recovery_state->op; - recovery_state->op = NULL; + delete recovery_state.op; + recovery_state.op = NULL; // Don't sync the write, it will be synced by our regular sync coroutine - pg = &pgs[recovery_state->pg_num]; + pg = &pgs[recovery_state.pg_num]; pg_osd_set_state_t *st; { - auto st_it = pg->degraded_objects.find(recovery_state->oid); + auto st_it = pg->degraded_objects.find(recovery_state.oid); st = st_it->second; pg->degraded_objects.erase(st_it); } @@ -311,12 +312,12 @@ resume_4: } new_st = &st_it->second; new_st->object_count++; - pg->misplaced_objects[recovery_state->oid] = new_st; + pg->misplaced_objects[recovery_state.oid] = new_st; } if (!st->object_count) { pg->state_dict.erase(st->osd_set); } - recovery_state->st = 0; + recovery_state.st = 0; goto resume_0; } diff --git a/osd_peering.cpp b/osd_peering.cpp index 30c83597..09b9e9df 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -177,9 +177,9 @@ void osd_t::handle_peers() { p.second.calc_object_states(); if (p.second.state & PG_HAS_UNCLEAN) - { peering_state = peering_state | OSD_FLUSHING_PGS; - } + else + peering_state = peering_state | OSD_RECOVERING; } else { diff --git a/osd_primary.cpp b/osd_primary.cpp index 5abca5a9..2798779c 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -233,7 +233,9 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* { zero_read = -1; } + uint64_t op_version = w ? op_data->fact_ver+1 : (submit_type == SUBMIT_RMW_READ ? UINT64_MAX : op_data->target_ver); osd_op_t *subops = new osd_op_t[n_subops]; + op_data->fact_ver = 0; op_data->done = op_data->errors = 0; op_data->n_subops = n_subops; op_data->subops = subops; @@ -254,13 +256,14 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* .opcode = (uint64_t)(w ? BS_OP_WRITE : BS_OP_READ), .callback = [cur_op, this](blockstore_op_t *subop) { - handle_primary_subop(cur_op, subop->retval == subop->len, subop->version); + handle_primary_subop(subop->opcode == BS_OP_WRITE ? OSD_OP_SECONDARY_WRITE : OSD_OP_SECONDARY_READ, + cur_op, subop->retval == subop->len, subop->version); }, .oid = { .inode = op_data->oid.inode, .stripe = op_data->oid.stripe | role, }, - .version = w ? 0 : (submit_type == SUBMIT_RMW_READ ? UINT64_MAX : op_data->target_ver), + .version = op_version, .offset = w ? stripes[role].write_start : stripes[role].read_start, .len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start, .buf = w ? stripes[role].write_buf : stripes[role].read_buf, @@ -282,7 +285,7 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* .inode = op_data->oid.inode, .stripe = op_data->oid.stripe | role, }, - .version = w ? 0 : (submit_type == SUBMIT_RMW_READ ? UINT64_MAX : op_data->target_ver), + .version = op_version, .offset = w ? stripes[role].write_start : stripes[role].read_start, .len = w ? stripes[role].write_end - stripes[role].write_start : stripes[role].read_end - stripes[role].read_start, }; @@ -295,7 +298,7 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* { // so it doesn't get freed subop->buf = NULL; - handle_primary_subop(cur_op, subop->reply.hdr.retval == subop->req.sec_rw.len, subop->reply.sec_rw.version); + handle_primary_subop(subop->req.hdr.opcode, cur_op, subop->reply.hdr.retval == subop->req.sec_rw.len, subop->reply.sec_rw.version); }; outbox_push(clients[subops[subop].peer_fd], &subops[subop]); } @@ -304,14 +307,22 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* } } -void osd_t::handle_primary_subop(osd_op_t *cur_op, int ok, uint64_t version) +void osd_t::handle_primary_subop(uint64_t opcode, osd_op_t *cur_op, int ok, uint64_t version) { osd_primary_op_data_t *op_data = cur_op->op_data; - op_data->fact_ver = version; + if (opcode == OSD_OP_SECONDARY_READ || opcode == OSD_OP_SECONDARY_WRITE) + { + if (op_data->fact_ver != 0 && op_data->fact_ver != version) + { + throw std::runtime_error("different fact_versions returned from subops: "+std::to_string(version)+" vs "+std::to_string(op_data->fact_ver)); + } + op_data->fact_ver = version; + } if (!ok) { // FIXME: Handle errors op_data->errors++; + throw std::runtime_error("subop error for op "+std::to_string(cur_op->req.hdr.opcode)+": "+std::to_string(op_data->st)); } else { @@ -413,6 +424,11 @@ void osd_t::continue_primary_write(osd_op_t *cur_op) resume_8: return; resume_9: + for (int i = 0; i < pg.pg_minsize; i++) + { + op_data->stripes[i].read_start = 0; + op_data->stripes[i].read_end = 0; + } memcpy( op_data->recovery_buf + cur_op->req.rw.offset - op_data->oid.stripe, cur_op->buf, cur_op->req.rw.len @@ -420,8 +436,11 @@ resume_9: free(cur_op->buf); cur_op->buf = op_data->recovery_buf; op_data->recovery_buf = NULL; + // Determine blocks to write, bypass RMW_READ + cur_op->rmw_buf = calc_rmw_reads(cur_op->buf, op_data->stripes, pg.cur_set.data(), pg.pg_size, pg.pg_minsize, pg.pg_cursize); + goto resume_3; resume_1: - // Determine blocks to read + // Determine blocks to read and write cur_op->rmw_buf = calc_rmw_reads(cur_op->buf, op_data->stripes, pg.cur_set.data(), pg.pg_size, pg.pg_minsize, pg.pg_cursize); // Read required blocks submit_primary_subops(SUBMIT_RMW_READ, pg.pg_size, pg.cur_set.data(), cur_op); @@ -641,7 +660,7 @@ void osd_t::submit_primary_sync_subops(osd_op_t *cur_op) .opcode = BS_OP_SYNC, .callback = [cur_op, this](blockstore_op_t *subop) { - handle_primary_subop(cur_op, subop->retval == 0, 0); + handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->retval == 0, 0); }, }); bs->enqueue_op(subops[i].bs_op); @@ -660,7 +679,7 @@ void osd_t::submit_primary_sync_subops(osd_op_t *cur_op) }; subops[i].callback = [cur_op, this](osd_op_t *subop) { - handle_primary_subop(cur_op, subop->reply.hdr.retval == 0, 0); + handle_primary_subop(OSD_OP_SECONDARY_SYNC, cur_op, subop->reply.hdr.retval == 0, 0); }; outbox_push(clients[subops[i].peer_fd], &subops[i]); } @@ -684,7 +703,7 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op) .opcode = BS_OP_STABLE, .callback = [cur_op, this](blockstore_op_t *subop) { - handle_primary_subop(cur_op, subop->retval == 0, 0); + handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->retval == 0, 0); }, .len = (uint32_t)stab_osd.len, .buf = (void*)(op_data->unstable_writes + stab_osd.start), @@ -707,7 +726,7 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op) subops[i].send_list.push_back(op_data->unstable_writes + stab_osd.start, stab_osd.len * sizeof(obj_ver_id)); subops[i].callback = [cur_op, this](osd_op_t *subop) { - handle_primary_subop(cur_op, subop->reply.hdr.retval == 0, 0); + handle_primary_subop(OSD_OP_SECONDARY_STABILIZE, cur_op, subop->reply.hdr.retval == 0, 0); }; outbox_push(clients[subops[i].peer_fd], &subops[i]); }