diff --git a/mon/mon.js b/mon/mon.js index 9692dd23..e49e6bf1 100644 --- a/mon/mon.js +++ b/mon/mon.js @@ -66,6 +66,7 @@ const etcd_tree = { autosync_interval: 5, client_queue_depth: 128, // unused recovery_queue_depth: 4, + recovery_sync_batch: 16, readonly: false, no_recovery: false, no_rebalance: false, diff --git a/src/osd.cpp b/src/osd.cpp index 5eea63c6..025d9bfc 100644 --- a/src/osd.cpp +++ b/src/osd.cpp @@ -96,6 +96,9 @@ void osd_t::parse_config(blockstore_config_t & config) recovery_queue_depth = strtoull(config["recovery_queue_depth"].c_str(), NULL, 10); if (recovery_queue_depth < 1 || recovery_queue_depth > MAX_RECOVERY_QUEUE) recovery_queue_depth = DEFAULT_RECOVERY_QUEUE; + recovery_sync_batch = strtoull(config["recovery_sync_batch"].c_str(), NULL, 10); + if (recovery_sync_batch < 1 || recovery_sync_batch > MAX_RECOVERY_QUEUE) + recovery_sync_batch = DEFAULT_RECOVERY_BATCH; if (config["readonly"] == "true" || config["readonly"] == "1" || config["readonly"] == "yes") readonly = true; print_stats_interval = strtoull(config["print_stats_interval"].c_str(), NULL, 10); diff --git a/src/osd.h b/src/osd.h index 3673a587..3a86efad 100644 --- a/src/osd.h +++ b/src/osd.h @@ -37,6 +37,7 @@ #define DEFAULT_AUTOSYNC_INTERVAL 5 #define MAX_RECOVERY_QUEUE 2048 #define DEFAULT_RECOVERY_QUEUE 4 +#define DEFAULT_RECOVERY_BATCH 16 //#define OSD_STUB @@ -76,6 +77,7 @@ class osd_t int immediate_commit = IMMEDIATE_NONE; int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE; + int recovery_sync_batch = DEFAULT_RECOVERY_BATCH; int log_level = 0; // cluster state @@ -97,9 +99,11 @@ class osd_t std::map pgs; std::set dirty_pgs; std::set dirty_osds; + int copies_to_delete_after_sync_count = 0; uint64_t misplaced_objects = 0, degraded_objects = 0, incomplete_objects = 0; int peering_state = 0; std::map recovery_ops; + int recovery_done = 0; osd_op_t *autosync_op = NULL; // Unstable writes @@ -201,6 +205,7 @@ class osd_t void pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval); void submit_primary_subops(int submit_type, uint64_t op_version, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op); void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, uint64_t set_size, pg_osd_set_t & loc_set); + void submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_delete, int chunks_to_delete_count); void submit_primary_sync_subops(osd_op_t *cur_op); void submit_primary_stab_subops(osd_op_t *cur_op); diff --git a/src/osd_flush.cpp b/src/osd_flush.cpp index c62023ae..bf0401ba 100644 --- a/src/osd_flush.cpp +++ b/src/osd_flush.cpp @@ -270,7 +270,6 @@ void osd_t::submit_recovery_op(osd_recovery_op_t *op) } op->osd_op->callback = [this, op](osd_op_t *osd_op) { - // Don't sync the write, it will be synced by our regular sync coroutine if (osd_op->reply.hdr.retval < 0) { // Error recovering object @@ -292,6 +291,17 @@ void osd_t::submit_recovery_op(osd_recovery_op_t *op) op->osd_op = NULL; recovery_ops.erase(op->oid); delete osd_op; + if (immediate_commit != IMMEDIATE_ALL) + { + recovery_done++; + if (recovery_done >= recovery_sync_batch) + { + // Force sync every operations + // This is required not to pile up an excessive amount of delete operations + autosync(); + recovery_done = 0; + } + } continue_recovery(); }; exec_op(op->osd_op); diff --git a/src/osd_peering.cpp b/src/osd_peering.cpp index 64facb96..38e5bb2d 100644 --- a/src/osd_peering.cpp +++ b/src/osd_peering.cpp @@ -103,6 +103,8 @@ void osd_t::reset_pg(pg_t & pg) { pg.cur_peers.clear(); pg.state_dict.clear(); + copies_to_delete_after_sync_count -= pg.copies_to_delete_after_sync.size(); + pg.copies_to_delete_after_sync.clear(); incomplete_objects -= pg.incomplete_objects.size(); misplaced_objects -= pg.misplaced_objects.size(); degraded_objects -= pg.degraded_objects.size(); diff --git a/src/osd_peering_pg.h b/src/osd_peering_pg.h index 6c5979cb..d0e729ee 100644 --- a/src/osd_peering_pg.h +++ b/src/osd_peering_pg.h @@ -56,6 +56,13 @@ struct obj_piece_id_t uint64_t osd_num; }; +struct obj_ver_osd_t +{ + uint64_t osd_num; + object_id oid; + uint64_t version; +}; + struct flush_action_t { bool rollback = false, make_stable = false; @@ -101,6 +108,7 @@ struct pg_t std::map state_dict; btree::btree_map incomplete_objects, misplaced_objects, degraded_objects; std::map flush_actions; + std::vector copies_to_delete_after_sync; btree::btree_map ver_override; pg_peering_state_t *peering_state = NULL; pg_flush_batch_t *flush_batch = NULL; diff --git a/src/osd_primary.cpp b/src/osd_primary.cpp index 960d4f6c..cd12f395 100644 --- a/src/osd_primary.cpp +++ b/src/osd_primary.cpp @@ -367,17 +367,44 @@ resume_7: } // Any kind of a non-clean object can have extra chunks, because we don't record objects // as degraded & misplaced or incomplete & misplaced at the same time. So try to remove extra chunks - submit_primary_del_subops(cur_op, pg.cur_set.data(), pg.pg_size, op_data->object_state->osd_set); - if (op_data->n_subops > 0) + if (immediate_commit != IMMEDIATE_ALL) { -resume_8: - op_data->st = 8; - return; -resume_9: - if (op_data->errors > 0) + // We can't remove extra chunks yet if fsyncs are explicit, because + // new copies may not be committed to stable storage yet + // We can only remove extra chunks after a successful SYNC for this PG + for (auto & chunk: op_data->object_state->osd_set) { - pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); + // Check is the same as in submit_primary_del_subops() + if (op_data->scheme == POOL_SCHEME_REPLICATED + ? !contains_osd(pg.cur_set.data(), pg.pg_size, chunk.osd_num) + : (chunk.osd_num != pg.cur_set[chunk.role])) + { + pg.copies_to_delete_after_sync.push_back((obj_ver_osd_t){ + .osd_num = chunk.osd_num, + .oid = { + .inode = op_data->oid.inode, + .stripe = op_data->oid.stripe | (op_data->scheme == POOL_SCHEME_REPLICATED ? 0 : chunk.role), + }, + .version = op_data->fact_ver, + }); + copies_to_delete_after_sync_count++; + } + } + } + else + { + submit_primary_del_subops(cur_op, pg.cur_set.data(), pg.pg_size, op_data->object_state->osd_set); + if (op_data->n_subops > 0) + { +resume_8: + op_data->st = 8; return; +resume_9: + if (op_data->errors > 0) + { + pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO); + return; + } } } // Clear object state @@ -511,6 +538,8 @@ void osd_t::continue_primary_sync(osd_op_t *cur_op) else if (op_data->st == 4) goto resume_4; else if (op_data->st == 5) goto resume_5; else if (op_data->st == 6) goto resume_6; + else if (op_data->st == 7) goto resume_7; + else if (op_data->st == 8) goto resume_8; assert(op_data->st == 0); if (syncs_in_progress.size() > 0) { @@ -572,11 +601,34 @@ resume_2: this->unstable_writes.clear(); } { - void *dirty_buf = malloc_or_die(sizeof(pool_pg_num_t)*dirty_pgs.size() + sizeof(osd_num_t)*dirty_osds.size()); + void *dirty_buf = malloc_or_die( + sizeof(pool_pg_num_t)*dirty_pgs.size() + + sizeof(osd_num_t)*dirty_osds.size() + + sizeof(obj_ver_osd_t)*this->copies_to_delete_after_sync_count + ); op_data->dirty_pgs = (pool_pg_num_t*)dirty_buf; op_data->dirty_osds = (osd_num_t*)(dirty_buf + sizeof(pool_pg_num_t)*dirty_pgs.size()); op_data->dirty_pg_count = dirty_pgs.size(); op_data->dirty_osd_count = dirty_osds.size(); + if (this->copies_to_delete_after_sync_count) + { + op_data->copies_to_delete_count = 0; + op_data->copies_to_delete = (obj_ver_osd_t*)(op_data->dirty_osds + op_data->dirty_osd_count); + for (auto dirty_pg_num: dirty_pgs) + { + auto & pg = pgs.at(dirty_pg_num); + assert(pg.copies_to_delete_after_sync.size() <= this->copies_to_delete_after_sync_count); + memcpy( + op_data->copies_to_delete + op_data->copies_to_delete_count, + pg.copies_to_delete_after_sync.data(), + sizeof(obj_ver_osd_t)*pg.copies_to_delete_after_sync.size() + ); + op_data->copies_to_delete_count += pg.copies_to_delete_after_sync.size(); + this->copies_to_delete_after_sync_count -= pg.copies_to_delete_after_sync.size(); + pg.copies_to_delete_after_sync.clear(); + } + assert(this->copies_to_delete_after_sync_count == 0); + } int dpg = 0; for (auto dirty_pg_num: dirty_pgs) { @@ -649,6 +701,36 @@ resume_6: } } } + if (op_data->copies_to_delete) + { + // Return 'copies to delete' back into respective PGs + for (int i = 0; i < op_data->copies_to_delete_count; i++) + { + auto & w = op_data->copies_to_delete[i]; + auto & pg = pgs.at((pool_pg_num_t){ + .pool_id = INODE_POOL(w.oid.inode), + .pg_num = map_to_pg(w.oid, st_cli.pool_config.at(INODE_POOL(w.oid.inode)).pg_stripe_size), + }); + if (pg.state & PG_ACTIVE) + { + pg.copies_to_delete_after_sync.push_back(w); + copies_to_delete_after_sync_count++; + } + } + } + } + else if (op_data->copies_to_delete) + { + // Actually delete copies which we wanted to delete + submit_primary_del_batch(cur_op, op_data->copies_to_delete, op_data->copies_to_delete_count); +resume_7: + op_data->st = 7; + return; +resume_8: + if (op_data->errors > 0) + { + goto resume_6; + } } for (int i = 0; i < op_data->dirty_pg_count; i++) { diff --git a/src/osd_primary.h b/src/osd_primary.h index 66ed5645..b53310f0 100644 --- a/src/osd_primary.h +++ b/src/osd_primary.h @@ -38,4 +38,8 @@ struct osd_primary_op_data_t osd_num_t *dirty_osds = NULL; int dirty_osd_count = 0; obj_ver_id *unstable_writes = NULL; + obj_ver_osd_t *copies_to_delete = NULL; + int copies_to_delete_count = 0; }; + +bool contains_osd(osd_num_t *osd_set, uint64_t size, osd_num_t osd_num); diff --git a/src/osd_primary_subops.cpp b/src/osd_primary_subops.cpp index 8f0988b8..a923996b 100644 --- a/src/osd_primary_subops.cpp +++ b/src/osd_primary_subops.cpp @@ -355,7 +355,7 @@ void osd_t::cancel_primary_write(osd_op_t *cur_op) } } -static bool contains_osd(osd_num_t *osd_set, uint64_t size, osd_num_t osd_num) +bool contains_osd(osd_num_t *osd_set, uint64_t size, osd_num_t osd_num) { for (uint64_t i = 0; i < size; i++) { @@ -371,78 +371,82 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, osd_num_t *cur_set, uint { osd_primary_op_data_t *op_data = cur_op->op_data; bool rep = op_data->scheme == POOL_SCHEME_REPLICATED; - int extra_chunks = 0; - // ordered comparison for EC/XOR, unordered for replicated pools + obj_ver_osd_t extra_chunks[loc_set.size()]; + int chunks_to_del = 0; for (auto & chunk: loc_set) { - if (!cur_set || (rep ? !contains_osd(cur_set, set_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role])) + // ordered comparison for EC/XOR, unordered for replicated pools + if (!cur_set || (rep + ? !contains_osd(cur_set, set_size, chunk.osd_num) + : (chunk.osd_num != cur_set[chunk.role]))) { - extra_chunks++; + extra_chunks[chunks_to_del++] = (obj_ver_osd_t){ + .osd_num = chunk.osd_num, + .oid = { + .inode = op_data->oid.inode, + .stripe = op_data->oid.stripe | (rep ? 0 : chunk.role), + }, + // Same version as write + .version = op_data->fact_ver, + }; } } - op_data->n_subops = extra_chunks; + submit_primary_del_batch(cur_op, extra_chunks, chunks_to_del); +} + +void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_delete, int chunks_to_delete_count) +{ + osd_primary_op_data_t *op_data = cur_op->op_data; + op_data->n_subops = chunks_to_delete_count; op_data->done = op_data->errors = 0; - if (!extra_chunks) + if (!op_data->n_subops) { return; } - osd_op_t *subops = new osd_op_t[extra_chunks]; + osd_op_t *subops = new osd_op_t[chunks_to_delete_count]; op_data->subops = subops; - int i = 0; - for (auto & chunk: loc_set) + for (int i = 0; i < chunks_to_delete_count; i++) { - if (!cur_set || (rep ? !contains_osd(cur_set, set_size, chunk.osd_num) : chunk.osd_num != cur_set[chunk.role])) + auto & chunk = chunks_to_delete[i]; + if (chunk.osd_num == this->osd_num) { - int stripe_num = op_data->scheme == POOL_SCHEME_REPLICATED ? 0 : chunk.role; - if (chunk.osd_num == this->osd_num) - { - clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin); - subops[i].op_type = (uint64_t)cur_op; - subops[i].bs_op = new blockstore_op_t({ - .opcode = BS_OP_DELETE, - .callback = [subop = &subops[i], this](blockstore_op_t *bs_subop) - { - handle_primary_bs_subop(subop); - }, - .oid = { - .inode = op_data->oid.inode, - .stripe = op_data->oid.stripe | stripe_num, - }, - // Same version as write - .version = op_data->fact_ver, - }); - bs->enqueue_op(subops[i].bs_op); - } - else - { - subops[i].op_type = OSD_OP_OUT; - subops[i].peer_fd = c_cli.osd_peer_fds.at(chunk.osd_num); - subops[i].req.sec_del = { - .header = { - .magic = SECONDARY_OSD_OP_MAGIC, - .id = c_cli.next_subop_id++, - .opcode = OSD_OP_SEC_DELETE, - }, - .oid = { - .inode = op_data->oid.inode, - .stripe = op_data->oid.stripe | stripe_num, - }, - // Same version as write - .version = op_data->fact_ver, - }; - subops[i].callback = [cur_op, this](osd_op_t *subop) + clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin); + subops[i].op_type = (uint64_t)cur_op; + subops[i].bs_op = new blockstore_op_t({ + .opcode = BS_OP_DELETE, + .callback = [subop = &subops[i], this](blockstore_op_t *bs_subop) { - int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1; - handle_primary_subop(subop, cur_op); - if (fail_fd >= 0) - { - // delete operation failed, drop the connection - c_cli.stop_client(fail_fd); - } - }; - c_cli.outbox_push(&subops[i]); - } - i++; + handle_primary_bs_subop(subop); + }, + .oid = chunk.oid, + .version = chunk.version, + }); + bs->enqueue_op(subops[i].bs_op); + } + else + { + subops[i].op_type = OSD_OP_OUT; + subops[i].peer_fd = c_cli.osd_peer_fds.at(chunk.osd_num); + subops[i].req.sec_del = { + .header = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = c_cli.next_subop_id++, + .opcode = OSD_OP_SEC_DELETE, + }, + .oid = chunk.oid, + .version = chunk.version, + }; + subops[i].callback = [cur_op, this](osd_op_t *subop) + { + int fail_fd = subop->reply.hdr.retval != 0 ? subop->peer_fd : -1; + handle_primary_subop(subop, cur_op); + if (fail_fd >= 0) + { + // delete operation failed, drop the connection + c_cli.stop_client(fail_fd); + } + }; + c_cli.outbox_push(&subops[i]); } } }