From 98f1e2c277b189917554d7d3b5ffb82b3f509f03 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sat, 6 Mar 2021 17:42:09 +0300 Subject: [PATCH] Rework write/sync ordering Make syncs wait for all previous writes because it's the only way to make sure that OSDs do not receive incomplete writes in LIST results during peering when some writes are still in progress. Also simplify blockstore submission queue logic. --- src/blockstore_impl.cpp | 68 ++++++++++++++-------------- src/blockstore_impl.h | 9 +--- src/blockstore_rollback.cpp | 11 ++--- src/blockstore_stable.cpp | 13 +++--- src/blockstore_sync.cpp | 90 ++++++++----------------------------- src/blockstore_write.cpp | 33 ++++++++------ 6 files changed, 83 insertions(+), 141 deletions(-) diff --git a/src/blockstore_impl.cpp b/src/blockstore_impl.cpp index 390fa119..c2bcb5b3 100644 --- a/src/blockstore_impl.cpp +++ b/src/blockstore_impl.cpp @@ -101,21 +101,11 @@ void blockstore_impl_t::loop() { // try to submit ops unsigned initial_ring_space = ringloop->space_left(); - // FIXME: rework this "sync polling" - auto cur_sync = in_progress_syncs.begin(); - while (cur_sync != in_progress_syncs.end()) - { - if (continue_sync(*cur_sync) != 2) - { - // List is unmodified - cur_sync++; - } - else - { - cur_sync = in_progress_syncs.begin(); - } - } auto cur = submit_queue.begin(); + // has_writes == 0 - no writes before the current queue item + // has_writes == 1 - some writes in progress + // has_writes == 2 - tried to submit some writes, but failed int has_writes = 0; + bool has_in_progress_sync = false; while (cur != submit_queue.end()) { @@ -142,10 +132,11 @@ void blockstore_impl_t::loop() } unsigned ring_space = ringloop->space_left(); unsigned prev_sqe_pos = ringloop->save(); - bool dequeue_op = false; + bool dequeue_op = false, cancel_op = false; if (op->opcode == BS_OP_READ) { dequeue_op = dequeue_read(op); + cancel_op = !dequeue_op; } else if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE) { @@ 
-154,8 +145,13 @@ void blockstore_impl_t::loop() // Some writes already could not be submitted continue; } - dequeue_op = dequeue_write(op); - has_writes = dequeue_op ? 1 : 2; + int wr_st = dequeue_write(op); + // 0 = can't submit + // 1 = in progress + // 2 = completed, remove from queue + dequeue_op = wr_st == 2; + cancel_op = wr_st == 0; + has_writes = wr_st > 0 ? 1 : 2; } else if (op->opcode == BS_OP_DELETE) { @@ -164,8 +160,10 @@ void blockstore_impl_t::loop() // Some writes already could not be submitted continue; } - dequeue_op = dequeue_del(op); - has_writes = dequeue_op ? 1 : 2; + int wr_st = dequeue_del(op); + dequeue_op = wr_st == 2; + cancel_op = wr_st == 0; + has_writes = wr_st > 0 ? 1 : 2; } else if (op->opcode == BS_OP_SYNC) { @@ -178,29 +176,39 @@ void blockstore_impl_t::loop() // Can't submit SYNC before previous writes continue; } - dequeue_op = dequeue_sync(op); + int wr_st = continue_sync(op, has_in_progress_sync); + dequeue_op = wr_st == 2; + cancel_op = wr_st == 0; + if (wr_st != 2) + { + // Sync is not finished yet, so later syncs in the queue must not be acked before it. Or we could just set has_writes=1... 
+ has_in_progress_sync = true; + } } else if (op->opcode == BS_OP_STABLE) { - dequeue_op = dequeue_stable(op); + int wr_st = dequeue_stable(op); + dequeue_op = wr_st == 2; + cancel_op = wr_st == 0; } else if (op->opcode == BS_OP_ROLLBACK) { - dequeue_op = dequeue_rollback(op); + int wr_st = dequeue_rollback(op); + dequeue_op = wr_st == 2; + cancel_op = wr_st == 0; } else if (op->opcode == BS_OP_LIST) { - // LIST doesn't need to be blocked by previous modifications, - // it only needs to include all in-progress writes as they're guaranteed - // to be readable and stabilizable/rollbackable by subsequent operations + // LIST doesn't need to be blocked by previous modifications process_list(op); dequeue_op = true; + cancel_op = false; } if (dequeue_op) { submit_queue.erase(op_ptr); } - else + if (cancel_op) { ringloop->restore(prev_sqe_pos); if (PRIV(op)->wait_for == WAIT_SQE) @@ -233,7 +241,7 @@ bool blockstore_impl_t::is_safe_to_stop() { // It's safe to stop blockstore when there are no in-flight operations, // no in-progress syncs and flusher isn't doing anything - if (submit_queue.size() > 0 || in_progress_syncs.size() > 0 || !readonly && flusher->is_active()) + if (submit_queue.size() > 0 || !readonly && flusher->is_active()) { return false; } @@ -374,12 +382,6 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first) std::function(op->callback)(op); return; } - if (op->opcode == BS_OP_SYNC && immediate_commit == IMMEDIATE_ALL) - { - op->retval = 0; - std::function(op->callback)(op); - return; - } // Call constructor without allocating memory. 
We'll call destructor before returning op back new ((void*)op->private_data) blockstore_op_private_t; PRIV(op)->wait_for = 0; diff --git a/src/blockstore_impl.h b/src/blockstore_impl.h index 94636b12..f589c3ba 100644 --- a/src/blockstore_impl.h +++ b/src/blockstore_impl.h @@ -160,8 +160,6 @@ struct blockstore_op_private_t // Sync std::vector sync_big_writes, sync_small_writes; int sync_small_checked, sync_big_checked; - std::list::iterator in_progress_ptr; - int prev_sync_count; }; // https://github.com/algorithm-ninja/cpp-btree @@ -212,7 +210,6 @@ class blockstore_impl_t blockstore_dirty_db_t dirty_db; std::list submit_queue; // FIXME: funny thing is that vector is better here std::vector unsynced_big_writes, unsynced_small_writes; - std::list in_progress_syncs; // ...and probably here, too allocator *data_alloc = NULL; uint8_t *zero_object; @@ -279,11 +276,9 @@ class blockstore_impl_t void handle_write_event(ring_data_t *data, blockstore_op_t *op); // Sync - int dequeue_sync(blockstore_op_t *op); + int continue_sync(blockstore_op_t *op, bool queue_has_in_progress_sync); void handle_sync_event(ring_data_t *data, blockstore_op_t *op); - int continue_sync(blockstore_op_t *op); - void ack_one_sync(blockstore_op_t *op); - int ack_sync(blockstore_op_t *op); + void ack_sync(blockstore_op_t *op); // Stabilize int dequeue_stable(blockstore_op_t *op); diff --git a/src/blockstore_rollback.cpp b/src/blockstore_rollback.cpp index 3040a360..58249719 100644 --- a/src/blockstore_rollback.cpp +++ b/src/blockstore_rollback.cpp @@ -50,7 +50,7 @@ skip_ov: { op->retval = -EBUSY; FINISH_OP(op); - return 1; + return 2; } if (dirty_it == dirty_db.begin()) { @@ -66,7 +66,7 @@ skip_ov: // Already rolled back op->retval = 0; FINISH_OP(op); - return 1; + return 2; } // Check journal space blockstore_journal_check_t space_check(this); @@ -151,7 +151,7 @@ resume_5: // Acknowledge op op->retval = 0; FINISH_OP(op); - return 1; + return 2; } void blockstore_impl_t::mark_rolled_back(const 
obj_ver_id & ov) @@ -216,10 +216,7 @@ void blockstore_impl_t::handle_rollback_event(ring_data_t *data, blockstore_op_t if (PRIV(op)->pending_ops == 0) { PRIV(op)->op_state++; - if (!continue_rollback(op)) - { - submit_queue.push_front(op); - } + ringloop->wakeup(); } } diff --git a/src/blockstore_stable.cpp b/src/blockstore_stable.cpp index 3a520ee9..86bc31c3 100644 --- a/src/blockstore_stable.cpp +++ b/src/blockstore_stable.cpp @@ -60,7 +60,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op) // No such object version op->retval = -ENOENT; FINISH_OP(op); - return 1; + return 2; } else { @@ -77,7 +77,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op) // Object not synced yet. Caller must sync it first op->retval = -EBUSY; FINISH_OP(op); - return 1; + return 2; } else if (!IS_STABLE(dirty_it->second.state)) { @@ -89,7 +89,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op) // Already stable op->retval = 0; FINISH_OP(op); - return 1; + return 2; } // Check journal space blockstore_journal_check_t space_check(this); @@ -176,7 +176,7 @@ resume_5: // Acknowledge op op->retval = 0; FINISH_OP(op); - return 1; + return 2; } void blockstore_impl_t::mark_stable(const obj_ver_id & v) @@ -228,9 +228,6 @@ void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t * if (PRIV(op)->pending_ops == 0) { PRIV(op)->op_state++; - if (!continue_stable(op)) - { - submit_queue.push_front(op); - } + ringloop->wakeup(); } } diff --git a/src/blockstore_sync.cpp b/src/blockstore_sync.cpp index 348548f5..bcce8607 100644 --- a/src/blockstore_sync.cpp +++ b/src/blockstore_sync.cpp @@ -12,8 +12,15 @@ #define SYNC_JOURNAL_SYNC_SENT 7 #define SYNC_DONE 8 -int blockstore_impl_t::dequeue_sync(blockstore_op_t *op) +int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_progress_sync) { + if (immediate_commit == IMMEDIATE_ALL) + { + // We can return immediately because sync is only dequeued after all previous writes + 
op->retval = 0; + FINISH_OP(op); + return 2; + } if (PRIV(op)->op_state == 0) { stop_sync_submitted = false; @@ -29,34 +36,15 @@ int blockstore_impl_t::dequeue_sync(blockstore_op_t *op) PRIV(op)->op_state = SYNC_HAS_SMALL; else PRIV(op)->op_state = SYNC_DONE; - // Always add sync to in_progress_syncs because we clear unsynced_big_writes and unsynced_small_writes - PRIV(op)->prev_sync_count = in_progress_syncs.size(); - PRIV(op)->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op); } - continue_sync(op); - // Always dequeue because we always add syncs to in_progress_syncs - return 1; -} - -int blockstore_impl_t::continue_sync(blockstore_op_t *op) -{ - auto cb = [this, op](ring_data_t *data) { handle_sync_event(data, op); }; if (PRIV(op)->op_state == SYNC_HAS_SMALL) { // No big writes, just fsync the journal - for (; PRIV(op)->sync_small_checked < PRIV(op)->sync_small_writes.size(); PRIV(op)->sync_small_checked++) - { - if (IS_IN_FLIGHT(dirty_db[PRIV(op)->sync_small_writes[PRIV(op)->sync_small_checked]].state)) - { - // Wait for small inflight writes to complete - return 0; - } - } if (journal.sector_info[journal.cur_sector].dirty) { // Write out the last journal sector if it happens to be dirty BS_SUBMIT_GET_ONLY_SQE(sqe); - prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb); + prepare_journal_sector_write(journal, journal.cur_sector, sqe, [this, op](ring_data_t *data) { handle_sync_event(data, op); }); PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; PRIV(op)->pending_ops = 1; PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT; @@ -69,21 +57,13 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) } if (PRIV(op)->op_state == SYNC_HAS_BIG) { - for (; PRIV(op)->sync_big_checked < PRIV(op)->sync_big_writes.size(); PRIV(op)->sync_big_checked++) - { - if (IS_IN_FLIGHT(dirty_db[PRIV(op)->sync_big_writes[PRIV(op)->sync_big_checked]].state)) - { - // Wait for big inflight writes to 
complete - return 0; - } - } // 1st step: fsync data if (!disable_data_fsync) { BS_SUBMIT_GET_SQE(sqe, data); my_uring_prep_fsync(sqe, data_fd, IORING_FSYNC_DATASYNC); data->iov = { 0 }; - data->callback = cb; + data->callback = [this, op](ring_data_t *data) { handle_sync_event(data, op); }; PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0; PRIV(op)->pending_ops = 1; PRIV(op)->op_state = SYNC_DATA_SYNC_SENT; @@ -96,14 +76,6 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) } if (PRIV(op)->op_state == SYNC_DATA_SYNC_DONE) { - for (; PRIV(op)->sync_small_checked < PRIV(op)->sync_small_writes.size(); PRIV(op)->sync_small_checked++) - { - if (IS_IN_FLIGHT(dirty_db[PRIV(op)->sync_small_writes[PRIV(op)->sync_small_checked]].state)) - { - // Wait for small inflight writes to complete - return 0; - } - } // 2nd step: Data device is synced, prepare & write journal entries // Check space in the journal and journal memory buffers blockstore_journal_check_t space_check(this); @@ -127,7 +99,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) { if (cur_sector == -1) PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector; - prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb); + prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], [this, op](ring_data_t *data) { handle_sync_event(data, op); }); cur_sector = journal.cur_sector; } journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry( @@ -152,7 +124,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) journal.crc32_last = je->crc32; it++; } - prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb); + prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], [this, op](ring_data_t *data) { handle_sync_event(data, op); }); assert(s == space_check.sectors_to_write); if (cur_sector == -1) PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector; @@ -168,7 +140,7 @@ int 
blockstore_impl_t::continue_sync(blockstore_op_t *op) BS_SUBMIT_GET_SQE(sqe, data); my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC); data->iov = { 0 }; - data->callback = cb; + data->callback = [this, op](ring_data_t *data) { handle_sync_event(data, op); }; PRIV(op)->pending_ops = 1; PRIV(op)->op_state = SYNC_JOURNAL_SYNC_SENT; return 1; @@ -178,9 +150,10 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) PRIV(op)->op_state = SYNC_DONE; } } - if (PRIV(op)->op_state == SYNC_DONE) + if (PRIV(op)->op_state == SYNC_DONE && !queue_has_in_progress_sync) { - return ack_sync(op); + ack_sync(op); + return 2; } return 1; } @@ -212,42 +185,16 @@ void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op else if (PRIV(op)->op_state == SYNC_JOURNAL_SYNC_SENT) { PRIV(op)->op_state = SYNC_DONE; - ack_sync(op); } else { throw std::runtime_error("BUG: unexpected sync op state"); } + ringloop->wakeup(); } } -int blockstore_impl_t::ack_sync(blockstore_op_t *op) -{ - if (PRIV(op)->op_state == SYNC_DONE && PRIV(op)->prev_sync_count == 0) - { - // Remove dependency of subsequent syncs - auto it = PRIV(op)->in_progress_ptr; - int done_syncs = 1; - ++it; - // Acknowledge sync - ack_one_sync(op); - while (it != in_progress_syncs.end()) - { - auto & next_sync = *it++; - PRIV(next_sync)->prev_sync_count -= done_syncs; - if (PRIV(next_sync)->prev_sync_count == 0 && PRIV(next_sync)->op_state == SYNC_DONE) - { - done_syncs++; - // Acknowledge next_sync - ack_one_sync(next_sync); - } - } - return 2; - } - return 0; -} - -void blockstore_impl_t::ack_one_sync(blockstore_op_t *op) +void blockstore_impl_t::ack_sync(blockstore_op_t *op) { // Handle states for (auto it = PRIV(op)->sync_big_writes.begin(); it != PRIV(op)->sync_big_writes.end(); it++) @@ -295,7 +242,6 @@ void blockstore_impl_t::ack_one_sync(blockstore_op_t *op) } } } - in_progress_syncs.erase(PRIV(op)->in_progress_ptr); op->retval = 0; FINISH_OP(op); } diff --git a/src/blockstore_write.cpp 
b/src/blockstore_write.cpp index deb877f5..42f1689a 100644 --- a/src/blockstore_write.cpp +++ b/src/blockstore_write.cpp @@ -170,7 +170,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) { // This is the flag value used to cancel operations FINISH_OP(op); - return 1; + return 2; } // Restore original low version number for unblocked operations #ifdef BLOCKSTORE_DEBUG @@ -183,7 +183,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) // Original version is still invalid // All subsequent writes to the same object must be canceled too cancel_all_writes(op, dirty_it, -EEXIST); - return 1; + return 2; } op->version = PRIV(op)->real_version; PRIV(op)->real_version = 0; @@ -217,7 +217,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) return 0; } cancel_all_writes(op, dirty_it, -ENOSPC); - return 1; + return 2; } write_iodepth++; BS_SUBMIT_GET_SQE(sqe, data); @@ -370,7 +370,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) if (!PRIV(op)->pending_ops) { PRIV(op)->op_state = 4; - continue_write(op); + return continue_write(op); } else { @@ -384,17 +384,21 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op) { io_uring_sqe *sqe = NULL; journal_entry_big_write *je; + int op_state = PRIV(op)->op_state; + if (op_state != 2 && op_state != 4) + { + // In progress + return 1; + } auto dirty_it = dirty_db.find((obj_ver_id){ .oid = op->oid, .version = op->version, }); assert(dirty_it != dirty_db.end()); - if (PRIV(op)->op_state == 2) + if (op_state == 2) goto resume_2; - else if (PRIV(op)->op_state == 4) + else if (op_state == 4) goto resume_4; - else - return 1; resume_2: // Only for the immediate_commit mode: prepare and submit big_write journal entry sqe = get_sqe(); @@ -464,7 +468,7 @@ resume_4: op->retval = op->len; write_iodepth--; FINISH_OP(op); - return 1; + return 2; } void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *op) @@ -483,10 +487,7 @@ void 
blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *o { release_journal_sectors(op); PRIV(op)->op_state++; - if (!continue_write(op)) - { - submit_queue.push_front(op); - } + ringloop->wakeup(); } } @@ -524,6 +525,10 @@ void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op) int blockstore_impl_t::dequeue_del(blockstore_op_t *op) { + if (PRIV(op)->op_state) + { + return continue_write(op); + } auto dirty_it = dirty_db.find((obj_ver_id){ .oid = op->oid, .version = op->version, @@ -593,7 +598,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op) if (!PRIV(op)->pending_ops) { PRIV(op)->op_state = 4; - continue_write(op); + return continue_write(op); } else {