diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 77744675..c9439e70 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -141,14 +141,15 @@ resume_0: { // We don't flush different parts of history of the same object in parallel // So we check if someone is already flushing this object - // In that case we set sync_to_repeat to 2 and pick another object - // Another coroutine will see this "2" and re-queue the object after it finishes - repeat_it->second = cur.version; + // In that case we set sync_to_repeat and pick another object + // Another coroutine will see it and re-queue the object after it finishes + if (repeat_it->second < cur.version) + repeat_it->second = cur.version; wait_state = 0; goto resume_0; } else - repeat_it->second = 0; + flusher->sync_to_repeat[cur.oid] = 0; dirty_it = dirty_end; flusher->active_flushers++; flusher->active_until_sync++; diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp index 2cc87c3f..c5ab19db 100644 --- a/blockstore_sync.cpp +++ b/blockstore_sync.cpp @@ -26,12 +26,9 @@ int blockstore::dequeue_sync(blockstore_operation *op) int r = continue_sync(op); if (r) { - int done = ack_sync(op); - if (!done) - { - op->prev_sync_count = in_progress_syncs.size(); - op->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op); - } + op->prev_sync_count = in_progress_syncs.size(); + op->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op); + ack_sync(op); } return r; } @@ -39,7 +36,6 @@ int blockstore::dequeue_sync(blockstore_operation *op) int blockstore::continue_sync(blockstore_operation *op) { auto cb = [this, op](ring_data_t *data) { handle_sync_event(data, op); }; - op->min_used_journal_sector = op->max_used_journal_sector = 0; if (op->sync_state == SYNC_HAS_SMALL) { // No big writes, just fsync the journal @@ -47,6 +43,7 @@ int blockstore::continue_sync(blockstore_operation *op) my_uring_prep_fsync(sqe, journal.fd, 0); data->iov = { 0 }; data->callback = cb; + op->min_used_journal_sector = op->max_used_journal_sector = 0; op->pending_ops = 1; op->sync_state = SYNC_JOURNAL_SYNC_SENT; } @@ -57,6 +54,7 @@ int blockstore::continue_sync(blockstore_operation *op) my_uring_prep_fsync(sqe, data_fd, 0); data->iov = { 0 }; data->callback = cb; + op->min_used_journal_sector = op->max_used_journal_sector = 0; op->pending_ops = 1; op->sync_state = SYNC_DATA_SYNC_SENT; } @@ -108,10 +106,6 @@ int blockstore::continue_sync(blockstore_operation *op) op->sync_state = SYNC_JOURNAL_SYNC_SENT; ringloop->submit(); } - else - { - return 0; - } return 1; } diff --git a/fio_engine.cpp b/fio_engine.cpp index 52a1cc5d..bdb0ea9d 100644 --- a/fio_engine.cpp +++ b/fio_engine.cpp @@ -15,7 +15,7 @@ struct bs_data ring_loop_t *ringloop; /* The list of completed io_u structs. */ std::vector completed; - int op_n = 0; + int op_n = 0, inflight = 0; }; struct bs_options @@ -151,6 +151,7 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io) { io->error = op->retval < 0 ? -op->retval : 0; bs_data *bsd = (bs_data*)io->engine_data; + bsd->inflight--; bsd->completed.push_back(io); if (DEBUG) printf("--- OP_WRITE %llx n=%d retval=%d\n", io, n, op->retval); @@ -182,6 +183,7 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io) io->error = op->retval < 0 ? -op->retval : 0; bs_data *bsd = (bs_data*)io->engine_data; bsd->completed.push_back(io); + bsd->inflight--; obj_ver_id *vers = (obj_ver_id*)op->buf; delete[] vers; if (DEBUG) @@ -193,6 +195,7 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io) { io->error = op->retval < 0 ? -op->retval : 0; bsd->completed.push_back(io); + bsd->inflight--; if (DEBUG) printf("--- OP_SYNC %llx n=%d retval=%d\n", io, n, op->retval); delete op; @@ -207,6 +210,7 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io) if (DEBUG) printf("+++ %s %llx\n", op->flags == OP_WRITE ? "OP_WRITE" : "OP_SYNC", io); io->error = 0; + bsd->inflight++; bsd->bs->enqueue_op(op); bsd->op_n++;