diff --git a/mon/mon.js b/mon/mon.js index 036e9b26..5fd202b6 100644 --- a/mon/mon.js +++ b/mon/mon.js @@ -183,7 +183,7 @@ const etcd_tree = { /* : { : { primary: osd_num_t, - state: ("starting"|"peering"|"incomplete"|"active"|"stopping"|"offline"| + state: ("starting"|"peering"|"incomplete"|"active"|"repeering"|"stopping"|"offline"| "degraded"|"has_incomplete"|"has_degraded"|"has_misplaced"|"has_unclean"| "has_invalid"|"left_on_dead")[], } diff --git a/src/osd_cluster.cpp b/src/osd_cluster.cpp index 607b9591..e5355db8 100644 --- a/src/osd_cluster.cpp +++ b/src/osd_cluster.cpp @@ -558,7 +558,7 @@ void osd_t::apply_pg_config() } if (currently_taken) { - if (pg_it->second.state & (PG_ACTIVE | PG_INCOMPLETE | PG_PEERING)) + if (pg_it->second.state & (PG_ACTIVE | PG_INCOMPLETE | PG_PEERING | PG_REPEERING)) { if (pg_it->second.target_set == pg_cfg.target_set) { diff --git a/src/osd_flush.cpp b/src/osd_flush.cpp index 37ecc40b..62ab408f 100644 --- a/src/osd_flush.cpp +++ b/src/osd_flush.cpp @@ -149,10 +149,14 @@ void osd_t::handle_flush_op(bool rollback, pool_id_t pool_id, pg_num_t pg_num, p { continue_primary_write(op); } - if (pg.inflight == 0 && (pg.state & PG_STOPPING)) + if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch) { finish_stop_pg(pg); } + else if ((pg.state & PG_REPEERING) && pg.inflight == 0 && !pg.flush_batch) + { + start_pg_peering(pg); + } } } diff --git a/src/osd_peering.cpp b/src/osd_peering.cpp index 38e5bb2d..d8fac206 100644 --- a/src/osd_peering.cpp +++ b/src/osd_peering.cpp @@ -77,10 +77,11 @@ void osd_t::repeer_pgs(osd_num_t peer_osd) // Re-peer affected PGs for (auto & p: pgs) { + auto & pg = p.second; bool repeer = false; - if (p.second.state & (PG_PEERING | PG_ACTIVE | PG_INCOMPLETE)) + if (pg.state & (PG_PEERING | PG_ACTIVE | PG_INCOMPLETE)) { - for (osd_num_t pg_osd: p.second.all_peers) + for (osd_num_t pg_osd: pg.all_peers) { if (pg_osd == peer_osd) { @@ -91,8 +92,17 @@ void osd_t::repeer_pgs(osd_num_t peer_osd) if (repeer) { // Repeer this pg - printf("[PG %u/%u] Repeer because of OSD %lu\n", p.second.pool_id, p.second.pg_num, peer_osd); - start_pg_peering(p.second); + printf("[PG %u/%u] Repeer because of OSD %lu\n", pg.pool_id, pg.pg_num, peer_osd); + if (!(pg.state & (PG_ACTIVE | PG_REPEERING)) || pg.inflight == 0 && !pg.flush_batch) + { + start_pg_peering(pg); + } + else + { + // Stop accepting new operations, wait for current ones to finish or fail + pg.state = pg.state & ~PG_ACTIVE | PG_REPEERING; + report_pg_state(pg); + } } } } @@ -334,9 +344,10 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p { // FIXME: Mark peer as failed and don't reconnect immediately after dropping the connection printf("Failed to sync OSD %lu: %ld (%s), disconnecting peer\n", role_osd, op->reply.hdr.retval, strerror(-op->reply.hdr.retval)); + int fail_fd = op->peer_fd; ps->list_ops.erase(role_osd); - c_cli.stop_client(op->peer_fd); delete op; + c_cli.stop_client(fail_fd); return; } delete op; @@ -413,9 +424,10 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps) if (op->reply.hdr.retval < 0) { printf("Failed to get object list from OSD %lu (retval=%ld), disconnecting peer\n", role_osd, op->reply.hdr.retval); + int fail_fd = op->peer_fd; ps->list_ops.erase(role_osd); - c_cli.stop_client(op->peer_fd); delete op; + c_cli.stop_client(fail_fd); return; } printf( @@ -484,15 +496,13 @@ bool osd_t::stop_pg(pg_t & pg) { return false; } - if (!(pg.state & PG_ACTIVE)) + if (!(pg.state & (PG_ACTIVE | PG_REPEERING))) { finish_stop_pg(pg); return true; } - pg.state = pg.state & ~PG_ACTIVE | PG_STOPPING; - if (pg.inflight == 0 && !pg.flush_batch && - // We must either forget all PG's unstable writes or wait for it to become clean - dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) == dirty_pgs.end()) + pg.state = pg.state & ~PG_ACTIVE & ~PG_REPEERING | PG_STOPPING; + if (pg.inflight == 0 && !pg.flush_batch) { finish_stop_pg(pg); } diff --git a/src/osd_peering_pg.cpp b/src/osd_peering_pg.cpp index 77da3a18..9f631d93 100644 --- a/src/osd_peering_pg.cpp +++ b/src/osd_peering_pg.cpp @@ -430,12 +430,13 @@ void pg_t::calc_object_states(int log_level) void pg_t::print_state() { printf( - "[PG %u/%u] is %s%s%s%s%s%s%s%s%s%s%s%s%s (%lu objects)\n", pool_id, pg_num, + "[PG %u/%u] is %s%s%s%s%s%s%s%s%s%s%s%s%s%s (%lu objects)\n", pool_id, pg_num, (state & PG_STARTING) ? "starting" : "", (state & PG_OFFLINE) ? "offline" : "", (state & PG_PEERING) ? "peering" : "", (state & PG_INCOMPLETE) ? "incomplete" : "", (state & PG_ACTIVE) ? "active" : "", + (state & PG_REPEERING) ? "repeering" : "", (state & PG_STOPPING) ? "stopping" : "", (state & PG_DEGRADED) ? " + degraded" : "", (state & PG_HAS_INCOMPLETE) ? " + has_incomplete" : "", diff --git a/src/osd_primary.cpp b/src/osd_primary.cpp index 26129a14..d62434ca 100644 --- a/src/osd_primary.cpp +++ b/src/osd_primary.cpp @@ -291,20 +291,18 @@ resume_5: free_object_state(pg, &op_data->object_state); } pg.total_count--; - object_id oid = op_data->oid; - finish_op(cur_op, cur_op->req.rw.len); - // Continue other write operations to the same object - auto next_it = pg.write_queue.find(oid); - auto this_it = next_it; - if (this_it != pg.write_queue.end() && this_it->second == cur_op) + osd_op_t *next_op = NULL; + auto next_it = pg.write_queue.find(op_data->oid); + if (next_it != pg.write_queue.end() && next_it->second == cur_op) { - next_it++; - pg.write_queue.erase(this_it); - if (next_it != pg.write_queue.end() && - next_it->first == oid) - { - osd_op_t *next_op = next_it->second; - continue_primary_write(next_op); - } + pg.write_queue.erase(next_it++); + if (next_it != pg.write_queue.end() && next_it->first == op_data->oid) + next_op = next_it->second; + } + finish_op(cur_op, cur_op->req.rw.len); + if (next_op) + { + // Continue next write to the same object + continue_primary_write(next_op); } } diff --git a/src/osd_primary_subops.cpp b/src/osd_primary_subops.cpp index 7f1c793e..47aa0a0c 100644 --- a/src/osd_primary_subops.cpp +++ b/src/osd_primary_subops.cpp @@ -43,12 +43,14 @@ void osd_t::finish_op(osd_op_t *cur_op, int retval) auto & pg = pgs.at({ .pool_id = INODE_POOL(cur_op->op_data->oid.inode), .pg_num = cur_op->op_data->pg_num }); pg.inflight--; assert(pg.inflight >= 0); - if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch && - // We must either forget all PG's unstable writes or wait for it to become clean - dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) == dirty_pgs.end()) + if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch) { finish_stop_pg(pg); } + else if ((pg.state & PG_REPEERING) && pg.inflight == 0 && !pg.flush_batch) + { + start_pg_peering(pg); + } } assert(!cur_op->op_data->subops); assert(!cur_op->op_data->unstable_write_osds); @@ -194,14 +196,7 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s } subops[i].callback = [cur_op, this](osd_op_t *subop) { - int fail_fd = subop->req.hdr.opcode == OSD_OP_SEC_WRITE && - subop->reply.hdr.retval != subop->req.sec_rw.len ? subop->peer_fd : -1; handle_primary_subop(subop, cur_op); - if (fail_fd >= 0) - { - // write operation failed, drop the connection - c_cli.stop_client(fail_fd); - } }; c_cli.outbox_push(&subops[i]); } @@ -247,6 +242,7 @@ void osd_t::handle_primary_bs_subop(osd_op_t *subop) } delete bs_op; subop->bs_op = NULL; + subop->peer_fd = -1; handle_primary_subop(subop, cur_op); } @@ -288,6 +284,11 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op) op_data->epipe++; } op_data->errors++; + if (subop->peer_fd >= 0) + { + // Drop connection on any error + c_cli.stop_client(subop->peer_fd); + } } else { @@ -438,13 +439,7 @@ void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_ } }; subops[i].callback = [cur_op, this](osd_op_t *subop) { - int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1; handle_primary_subop(subop, cur_op); - if (fail_fd >= 0) - { - // delete operation failed, drop the connection - c_cli.stop_client(fail_fd); - } }; c_cli.outbox_push(&subops[i]); } @@ -489,13 +484,7 @@ int osd_t::submit_primary_sync_subops(osd_op_t *cur_op) } }; subops[i].callback = [cur_op, this](osd_op_t *subop) { - int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1; handle_primary_subop(subop, cur_op); - if (fail_fd >= 0) - { - // sync operation failed, drop the connection - c_cli.stop_client(fail_fd); - } }; c_cli.outbox_push(&subops[i]); } @@ -554,13 +543,7 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op) subops[i].iov.push_back(op_data->unstable_writes + stab_osd.start, stab_osd.len * sizeof(obj_ver_id)); subops[i].callback = [cur_op, this](osd_op_t *subop) { - int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1; handle_primary_subop(subop, cur_op); - if (fail_fd >= 0) - { - // sync operation failed, drop the connection - c_cli.stop_client(fail_fd); - } }; c_cli.outbox_push(&subops[i]); } @@ -578,7 +561,7 @@ void osd_t::pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, return; } std::vector cancel_ops; - while (it != pg.write_queue.end()) + while (it != pg.write_queue.end() && it->first == oid) { cancel_ops.push_back(it->second); it++; diff --git a/src/osd_primary_sync.cpp b/src/osd_primary_sync.cpp index 536e0b29..376181eb 100644 --- a/src/osd_primary_sync.cpp +++ b/src/osd_primary_sync.cpp @@ -218,12 +218,14 @@ resume_8: { auto & pg = pgs.at(op_data->dirty_pgs[i]); pg.inflight--; - if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch && - // We must either forget all PG's unstable writes or wait for it to become clean - dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) == dirty_pgs.end()) + if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch) { finish_stop_pg(pg); } + else if ((pg.state & PG_REPEERING) && pg.inflight == 0 && !pg.flush_batch) + { + start_pg_peering(pg); + } } // FIXME: Free those in the destructor? free(op_data->dirty_pgs); diff --git a/src/osd_primary_write.cpp b/src/osd_primary_write.cpp index 17945e88..d40257cf 100644 --- a/src/osd_primary_write.cpp +++ b/src/osd_primary_write.cpp @@ -252,19 +252,20 @@ resume_9: } cur_op->reply.hdr.retval = cur_op->req.rw.len; continue_others: - object_id oid = op_data->oid; + osd_op_t *next_op = NULL; + auto next_it = pg.write_queue.find(op_data->oid); // Remove the operation from queue before calling finish_op so it doesn't see the completed operation in queue - auto next_it = pg.write_queue.find(oid); if (next_it != pg.write_queue.end() && next_it->second == cur_op) { pg.write_queue.erase(next_it++); + if (next_it != pg.write_queue.end() && next_it->first == op_data->oid) + next_op = next_it->second; } // finish_op would invalidate next_it if it cleared pg.write_queue, but it doesn't do that :) - finish_op(cur_op, cur_op->reply.hdr.retval); - // Continue other write operations to the same object - if (next_it != pg.write_queue.end() && next_it->first == oid) + finish_op(cur_op, cur_op->req.rw.len); + if (next_op) { - osd_op_t *next_op = next_it->second; + // Continue next write to the same object continue_primary_write(next_op); } } diff --git a/src/pg_states.cpp b/src/pg_states.cpp index 16938cf2..cb1da2c1 100644 --- a/src/pg_states.cpp +++ b/src/pg_states.cpp @@ -3,13 +3,14 @@ #include "pg_states.h" -const int pg_state_bit_count = 14; +const int pg_state_bit_count = 15; -const int pg_state_bits[14] = { +const int pg_state_bits[15] = { PG_STARTING, PG_PEERING, PG_INCOMPLETE, PG_ACTIVE, + PG_REPEERING, PG_STOPPING, PG_OFFLINE, PG_DEGRADED, @@ -21,11 +22,12 @@ const int pg_state_bits[14] = { PG_LEFT_ON_DEAD, }; -const char *pg_state_names[14] = { +const char *pg_state_names[15] = { "starting", "peering", "incomplete", "active", + "repeering", "stopping", "offline", "degraded", diff --git a/src/pg_states.h b/src/pg_states.h index 3d1281f1..554d754a 100644 --- a/src/pg_states.h +++ b/src/pg_states.h @@ -10,16 +10,17 @@ #define PG_PEERING (1<<1) #define PG_INCOMPLETE (1<<2) #define PG_ACTIVE (1<<3) -#define PG_STOPPING (1<<4) -#define PG_OFFLINE (1<<5) +#define PG_REPEERING (1<<4) +#define PG_STOPPING (1<<5) +#define PG_OFFLINE (1<<6) // Plus any of these: -#define PG_DEGRADED (1<<6) -#define PG_HAS_INCOMPLETE (1<<7) -#define PG_HAS_DEGRADED (1<<8) -#define PG_HAS_MISPLACED (1<<9) -#define PG_HAS_UNCLEAN (1<<10) -#define PG_HAS_INVALID (1<<11) -#define PG_LEFT_ON_DEAD (1<<12) +#define PG_DEGRADED (1<<7) +#define PG_HAS_INCOMPLETE (1<<8) +#define PG_HAS_DEGRADED (1<<9) +#define PG_HAS_MISPLACED (1<<10) +#define PG_HAS_UNCLEAN (1<<11) +#define PG_HAS_INVALID (1<<12) +#define PG_LEFT_ON_DEAD (1<<13) // Lower bits that represent object role (EC 0/1/2... or always 0 with replication) // 12 bits is a safe default that doesn't depend on pg_stripe_size or pg_block_size