From c71b67f2f7ce6ac0466d529b5447b3036ace4930 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 23 Feb 2020 20:34:37 +0300 Subject: [PATCH] Move SYNC_STAB_ALL into blockstore implementation --- blockstore.h | 3 ++- blockstore_impl.cpp | 39 +++++++++++++++++++++++++++++++++++ blockstore_write.cpp | 2 -- fio_engine.cpp | 46 ++++++------------------------------------ osd_exec_secondary.cpp | 32 ++--------------------------- 5 files changed, 49 insertions(+), 73 deletions(-) diff --git a/blockstore.h b/blockstore.h index 0bdab53a..3c2637de 100644 --- a/blockstore.h +++ b/blockstore.h @@ -30,7 +30,8 @@ #define BS_OP_DELETE 5 #define BS_OP_LIST 6 #define BS_OP_ROLLBACK 7 -#define BS_OP_MAX 7 +#define BS_OP_SYNC_STAB_ALL 8 +#define BS_OP_MAX 8 #define BS_OP_PRIVATE_DATA_SIZE 256 diff --git a/blockstore_impl.cpp b/blockstore_impl.cpp index 2eb76cca..6af26a8c 100644 --- a/blockstore_impl.cpp +++ b/blockstore_impl.cpp @@ -312,8 +312,47 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first) op->callback(op); return; } + if (op->opcode == BS_OP_SYNC_STAB_ALL) + { + std::function *old_callback = new std::function(op->callback); + op->opcode = BS_OP_SYNC; + op->callback = [this, old_callback](blockstore_op_t *op) + { + if (op->retval >= 0 && unstable_writes.size() > 0) + { + op->opcode = BS_OP_STABLE; + op->len = unstable_writes.size(); + obj_ver_id *vers = new obj_ver_id[op->len]; + op->buf = vers; + int i = 0; + for (auto it = unstable_writes.begin(); it != unstable_writes.end(); it++, i++) + { + vers[i] = { + .oid = it->first, + .version = it->second, + }; + } + unstable_writes.clear(); + op->callback = [this, old_callback](blockstore_op_t *op) + { + obj_ver_id *vers = (obj_ver_id*)op->buf; + delete[] vers; + op->buf = NULL; + (*old_callback)(op); + delete old_callback; + }; + this->enqueue_op(op); + } + else + { + (*old_callback)(op); + delete old_callback; + } + }; + } if (op->opcode == BS_OP_WRITE && !enqueue_write(op)) { + op->callback(op); return; } if (0 && op->opcode == BS_OP_SYNC && immediate_commit) diff --git a/blockstore_write.cpp b/blockstore_write.cpp index d2761e89..0270f0b7 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -39,14 +39,12 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) { // Invalid version requested op->retval = -EINVAL; - FINISH_OP(op); return false; } if (deleted && is_del) { // Already deleted op->retval = 0; - FINISH_OP(op); return false; } // Immediately add the operation into dirty_db, so subsequent reads could see it diff --git a/fio_engine.cpp b/fio_engine.cpp index 5c470167..80870245 100644 --- a/fio_engine.cpp +++ b/fio_engine.cpp @@ -211,51 +211,17 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io) bsd->last_sync = false; break; case DDIR_SYNC: - op->opcode = BS_OP_SYNC; + op->opcode = BS_OP_SYNC_STAB_ALL; op->callback = [io, n](blockstore_op_t *op) { bs_data *bsd = (bs_data*)io->engine_data; - auto & unstable_writes = bsd->bs->get_unstable_writes(); - if (op->retval >= 0 && unstable_writes.size() > 0) - { - op->opcode = BS_OP_STABLE; - op->len = unstable_writes.size(); - obj_ver_id *vers = new obj_ver_id[op->len]; - op->buf = vers; - int i = 0; - for (auto it = unstable_writes.begin(); it != unstable_writes.end(); it++, i++) - { - vers[i] = { - .oid = it->first, - .version = it->second, - }; - } - unstable_writes.clear(); - op->callback = [io, n](blockstore_op_t *op) - { - io->error = op->retval < 0 ? -op->retval : 0; - bs_data *bsd = (bs_data*)io->engine_data; - bsd->completed.push_back(io); - bsd->inflight--; - obj_ver_id *vers = (obj_ver_id*)op->buf; - delete[] vers; + io->error = op->retval < 0 ? -op->retval : 0; + bsd->completed.push_back(io); + bsd->inflight--; #ifdef BLOCKSTORE_DEBUG - printf("--- OP_SYNC %llx n=%d retval=%d\n", io, n, op->retval); + printf("--- OP_SYNC %llx n=%d retval=%d\n", io, n, op->retval); #endif - delete op; - }; - bsd->bs->enqueue_op(op); - } - else - { - io->error = op->retval < 0 ? -op->retval : 0; - bsd->completed.push_back(io); - bsd->inflight--; -#ifdef BLOCKSTORE_DEBUG - printf("--- OP_SYNC %llx n=%d retval=%d\n", io, n, op->retval); -#endif - delete op; - } + delete op; }; bsd->last_sync = true; break; diff --git a/osd_exec_secondary.cpp b/osd_exec_secondary.cpp index f1e883a7..0702f820 100644 --- a/osd_exec_secondary.cpp +++ b/osd_exec_secondary.cpp @@ -84,44 +84,16 @@ void osd_t::exec_sync_stab_all(osd_op_t *cur_op) { // Sync and stabilize all objects // This command is only valid for tests - // FIXME: Dedup between here & fio_engine if (!allow_test_ops) { cur_op->bs_op.retval = -EINVAL; secondary_op_callback(cur_op); return; } - cur_op->bs_op.opcode = BS_OP_SYNC; + cur_op->bs_op.opcode = BS_OP_SYNC_STAB_ALL; cur_op->bs_op.callback = [this, cur_op](blockstore_op_t *bs_op) { - auto & unstable_writes = bs->get_unstable_writes(); - if (bs_op->retval >= 0 && unstable_writes.size() > 0) - { - bs_op->opcode = BS_OP_STABLE; - bs_op->len = unstable_writes.size(); - obj_ver_id *vers = new obj_ver_id[bs_op->len]; - bs_op->buf = vers; - int i = 0; - for (auto it = unstable_writes.begin(); it != unstable_writes.end(); it++, i++) - { - vers[i] = { - .oid = it->first, - .version = it->second, - }; - } - unstable_writes.clear(); - bs_op->callback = [this, cur_op](blockstore_op_t *bs_op) - { - secondary_op_callback(cur_op); - obj_ver_id *vers = (obj_ver_id*)bs_op->buf; - delete[] vers; - }; - bs->enqueue_op(bs_op); - } - else - { - secondary_op_callback(cur_op); - } + secondary_op_callback(cur_op); }; #ifdef OSD_STUB cur_op->bs_op.retval = 0;