diff --git a/src/blockstore_impl.cpp b/src/blockstore_impl.cpp index 390fa119..c2bcb5b3 100644 --- a/src/blockstore_impl.cpp +++ b/src/blockstore_impl.cpp @@ -101,21 +101,10 @@ void blockstore_impl_t::loop() { // try to submit ops unsigned initial_ring_space = ringloop->space_left(); - // FIXME: rework this "sync polling" - auto cur_sync = in_progress_syncs.begin(); - while (cur_sync != in_progress_syncs.end()) - { - if (continue_sync(*cur_sync) != 2) - { - // List is unmodified - cur_sync++; - } - else - { - cur_sync = in_progress_syncs.begin(); - } - } auto cur = submit_queue.begin(); + // has_writes == 0 - no writes before the current queue item + // has_writes == 1 - some writes in progress + // has_writes == 2 - tried to submit some writes, but failed int has_writes = 0; while (cur != submit_queue.end()) { @@ -142,10 +131,12 @@ void blockstore_impl_t::loop() } unsigned ring_space = ringloop->space_left(); unsigned prev_sqe_pos = ringloop->save(); - bool dequeue_op = false; + bool dequeue_op = false, cancel_op = false; + bool has_in_progress_sync = false; if (op->opcode == BS_OP_READ) { dequeue_op = dequeue_read(op); + cancel_op = !dequeue_op; } else if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE) { @@ -154,8 +145,13 @@ void blockstore_impl_t::loop() // Some writes already could not be submitted continue; } - dequeue_op = dequeue_write(op); - has_writes = dequeue_op ? 1 : 2; + int wr_st = dequeue_write(op); + // 0 = can't submit + // 1 = in progress + // 2 = completed, remove from queue + dequeue_op = wr_st == 2; + cancel_op = wr_st == 0; + has_writes = wr_st > 0 ? 1 : 2; } else if (op->opcode == BS_OP_DELETE) { @@ -164,8 +160,10 @@ void blockstore_impl_t::loop() // Some writes already could not be submitted continue; } - dequeue_op = dequeue_del(op); - has_writes = dequeue_op ? 1 : 2; + int wr_st = dequeue_del(op); + dequeue_op = wr_st == 2; + cancel_op = wr_st == 0; + has_writes = wr_st > 0 ? 
1 : 2; } else if (op->opcode == BS_OP_SYNC) { @@ -178,29 +176,39 @@ void blockstore_impl_t::loop() // Can't submit SYNC before previous writes continue; } - dequeue_op = dequeue_sync(op); + int wr_st = continue_sync(op, has_in_progress_sync); + dequeue_op = wr_st == 2; + cancel_op = wr_st == 0; + if (wr_st != 2) + { + // Sync is not completed yet (wr_st is 0 or 1). Or we could just set has_writes=1... + has_in_progress_sync = true; + } } else if (op->opcode == BS_OP_STABLE) { - dequeue_op = dequeue_stable(op); + int wr_st = dequeue_stable(op); + dequeue_op = wr_st == 2; + cancel_op = wr_st == 0; } else if (op->opcode == BS_OP_ROLLBACK) { - dequeue_op = dequeue_rollback(op); + int wr_st = dequeue_rollback(op); + dequeue_op = wr_st == 2; + cancel_op = wr_st == 0; } else if (op->opcode == BS_OP_LIST) { - // LIST doesn't need to be blocked by previous modifications, - // it only needs to include all in-progress writes as they're guaranteed - // to be readable and stabilizable/rollbackable by subsequent operations + // LIST doesn't need to be blocked by previous modifications process_list(op); dequeue_op = true; + cancel_op = false; } if (dequeue_op) { submit_queue.erase(op_ptr); } - else + if (cancel_op) { ringloop->restore(prev_sqe_pos); if (PRIV(op)->wait_for == WAIT_SQE) @@ -233,7 +241,7 @@ bool blockstore_impl_t::is_safe_to_stop() { // It's safe to stop blockstore when there are no in-flight operations, // no in-progress syncs and flusher isn't doing anything - if (submit_queue.size() > 0 || in_progress_syncs.size() > 0 || !readonly && flusher->is_active()) + if (submit_queue.size() > 0 || !readonly && flusher->is_active()) { return false; } @@ -374,12 +382,6 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first) std::function(op->callback)(op); return; } - if (op->opcode == BS_OP_SYNC && immediate_commit == IMMEDIATE_ALL) - { - op->retval = 0; - std::function(op->callback)(op); - return; - } // Call constructor without allocating memory. 
We'll call destructor before returning op back new ((void*)op->private_data) blockstore_op_private_t; PRIV(op)->wait_for = 0; diff --git a/src/blockstore_impl.h b/src/blockstore_impl.h index 94636b12..f589c3ba 100644 --- a/src/blockstore_impl.h +++ b/src/blockstore_impl.h @@ -160,8 +160,6 @@ struct blockstore_op_private_t // Sync std::vector sync_big_writes, sync_small_writes; int sync_small_checked, sync_big_checked; - std::list::iterator in_progress_ptr; - int prev_sync_count; }; // https://github.com/algorithm-ninja/cpp-btree @@ -212,7 +210,6 @@ class blockstore_impl_t blockstore_dirty_db_t dirty_db; std::list submit_queue; // FIXME: funny thing is that vector is better here std::vector unsynced_big_writes, unsynced_small_writes; - std::list in_progress_syncs; // ...and probably here, too allocator *data_alloc = NULL; uint8_t *zero_object; @@ -279,11 +276,9 @@ class blockstore_impl_t void handle_write_event(ring_data_t *data, blockstore_op_t *op); // Sync - int dequeue_sync(blockstore_op_t *op); + int continue_sync(blockstore_op_t *op, bool queue_has_in_progress_sync); void handle_sync_event(ring_data_t *data, blockstore_op_t *op); - int continue_sync(blockstore_op_t *op); - void ack_one_sync(blockstore_op_t *op); - int ack_sync(blockstore_op_t *op); + void ack_sync(blockstore_op_t *op); // Stabilize int dequeue_stable(blockstore_op_t *op); diff --git a/src/blockstore_rollback.cpp b/src/blockstore_rollback.cpp index 3040a360..58249719 100644 --- a/src/blockstore_rollback.cpp +++ b/src/blockstore_rollback.cpp @@ -50,7 +50,7 @@ skip_ov: { op->retval = -EBUSY; FINISH_OP(op); - return 1; + return 2; } if (dirty_it == dirty_db.begin()) { @@ -66,7 +66,7 @@ skip_ov: // Already rolled back op->retval = 0; FINISH_OP(op); - return 1; + return 2; } // Check journal space blockstore_journal_check_t space_check(this); @@ -151,7 +151,7 @@ resume_5: // Acknowledge op op->retval = 0; FINISH_OP(op); - return 1; + return 2; } void blockstore_impl_t::mark_rolled_back(const 
obj_ver_id & ov) @@ -216,10 +216,7 @@ void blockstore_impl_t::handle_rollback_event(ring_data_t *data, blockstore_op_t if (PRIV(op)->pending_ops == 0) { PRIV(op)->op_state++; - if (!continue_rollback(op)) - { - submit_queue.push_front(op); - } + ringloop->wakeup(); } } diff --git a/src/blockstore_stable.cpp b/src/blockstore_stable.cpp index 3a520ee9..86bc31c3 100644 --- a/src/blockstore_stable.cpp +++ b/src/blockstore_stable.cpp @@ -60,7 +60,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op) // No such object version op->retval = -ENOENT; FINISH_OP(op); - return 1; + return 2; } else { @@ -77,7 +77,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op) // Object not synced yet. Caller must sync it first op->retval = -EBUSY; FINISH_OP(op); - return 1; + return 2; } else if (!IS_STABLE(dirty_it->second.state)) { @@ -89,7 +89,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op) // Already stable op->retval = 0; FINISH_OP(op); - return 1; + return 2; } // Check journal space blockstore_journal_check_t space_check(this); @@ -176,7 +176,7 @@ resume_5: // Acknowledge op op->retval = 0; FINISH_OP(op); - return 1; + return 2; } void blockstore_impl_t::mark_stable(const obj_ver_id & v) @@ -228,9 +228,6 @@ void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t * if (PRIV(op)->pending_ops == 0) { PRIV(op)->op_state++; - if (!continue_stable(op)) - { - submit_queue.push_front(op); - } + ringloop->wakeup(); } } diff --git a/src/blockstore_sync.cpp b/src/blockstore_sync.cpp index 348548f5..bcce8607 100644 --- a/src/blockstore_sync.cpp +++ b/src/blockstore_sync.cpp @@ -12,8 +12,15 @@ #define SYNC_JOURNAL_SYNC_SENT 7 #define SYNC_DONE 8 -int blockstore_impl_t::dequeue_sync(blockstore_op_t *op) +int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_progress_sync) { + if (immediate_commit == IMMEDIATE_ALL) + { + // We can return immediately because sync is only dequeued after all previous writes + 
op->retval = 0; + FINISH_OP(op); + return 2; + } if (PRIV(op)->op_state == 0) { stop_sync_submitted = false; @@ -29,34 +36,15 @@ int blockstore_impl_t::dequeue_sync(blockstore_op_t *op) PRIV(op)->op_state = SYNC_HAS_SMALL; else PRIV(op)->op_state = SYNC_DONE; - // Always add sync to in_progress_syncs because we clear unsynced_big_writes and unsynced_small_writes - PRIV(op)->prev_sync_count = in_progress_syncs.size(); - PRIV(op)->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op); } - continue_sync(op); - // Always dequeue because we always add syncs to in_progress_syncs - return 1; -} - -int blockstore_impl_t::continue_sync(blockstore_op_t *op) -{ - auto cb = [this, op](ring_data_t *data) { handle_sync_event(data, op); }; if (PRIV(op)->op_state == SYNC_HAS_SMALL) { // No big writes, just fsync the journal - for (; PRIV(op)->sync_small_checked < PRIV(op)->sync_small_writes.size(); PRIV(op)->sync_small_checked++) - { - if (IS_IN_FLIGHT(dirty_db[PRIV(op)->sync_small_writes[PRIV(op)->sync_small_checked]].state)) - { - // Wait for small inflight writes to complete - return 0; - } - } if (journal.sector_info[journal.cur_sector].dirty) { // Write out the last journal sector if it happens to be dirty BS_SUBMIT_GET_ONLY_SQE(sqe); - prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb); + prepare_journal_sector_write(journal, journal.cur_sector, sqe, [this, op](ring_data_t *data) { handle_sync_event(data, op); }); PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; PRIV(op)->pending_ops = 1; PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT; @@ -69,21 +57,13 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) } if (PRIV(op)->op_state == SYNC_HAS_BIG) { - for (; PRIV(op)->sync_big_checked < PRIV(op)->sync_big_writes.size(); PRIV(op)->sync_big_checked++) - { - if (IS_IN_FLIGHT(dirty_db[PRIV(op)->sync_big_writes[PRIV(op)->sync_big_checked]].state)) - { - // Wait for big inflight writes to 
complete - return 0; - } - } // 1st step: fsync data if (!disable_data_fsync) { BS_SUBMIT_GET_SQE(sqe, data); my_uring_prep_fsync(sqe, data_fd, IORING_FSYNC_DATASYNC); data->iov = { 0 }; - data->callback = cb; + data->callback = [this, op](ring_data_t *data) { handle_sync_event(data, op); }; PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0; PRIV(op)->pending_ops = 1; PRIV(op)->op_state = SYNC_DATA_SYNC_SENT; @@ -96,14 +76,6 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) } if (PRIV(op)->op_state == SYNC_DATA_SYNC_DONE) { - for (; PRIV(op)->sync_small_checked < PRIV(op)->sync_small_writes.size(); PRIV(op)->sync_small_checked++) - { - if (IS_IN_FLIGHT(dirty_db[PRIV(op)->sync_small_writes[PRIV(op)->sync_small_checked]].state)) - { - // Wait for small inflight writes to complete - return 0; - } - } // 2nd step: Data device is synced, prepare & write journal entries // Check space in the journal and journal memory buffers blockstore_journal_check_t space_check(this); @@ -127,7 +99,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) { if (cur_sector == -1) PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector; - prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb); + prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], [this, op](ring_data_t *data) { handle_sync_event(data, op); }); cur_sector = journal.cur_sector; } journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry( @@ -152,7 +124,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) journal.crc32_last = je->crc32; it++; } - prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb); + prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], [this, op](ring_data_t *data) { handle_sync_event(data, op); }); assert(s == space_check.sectors_to_write); if (cur_sector == -1) PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector; @@ -168,7 +140,7 @@ int 
blockstore_impl_t::continue_sync(blockstore_op_t *op) BS_SUBMIT_GET_SQE(sqe, data); my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC); data->iov = { 0 }; - data->callback = cb; + data->callback = [this, op](ring_data_t *data) { handle_sync_event(data, op); }; PRIV(op)->pending_ops = 1; PRIV(op)->op_state = SYNC_JOURNAL_SYNC_SENT; return 1; @@ -178,9 +150,10 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) PRIV(op)->op_state = SYNC_DONE; } } - if (PRIV(op)->op_state == SYNC_DONE) + if (PRIV(op)->op_state == SYNC_DONE && !queue_has_in_progress_sync) { - return ack_sync(op); + ack_sync(op); + return 2; } return 1; } @@ -212,42 +185,16 @@ void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op else if (PRIV(op)->op_state == SYNC_JOURNAL_SYNC_SENT) { PRIV(op)->op_state = SYNC_DONE; - ack_sync(op); } else { throw std::runtime_error("BUG: unexpected sync op state"); } + ringloop->wakeup(); } } -int blockstore_impl_t::ack_sync(blockstore_op_t *op) -{ - if (PRIV(op)->op_state == SYNC_DONE && PRIV(op)->prev_sync_count == 0) - { - // Remove dependency of subsequent syncs - auto it = PRIV(op)->in_progress_ptr; - int done_syncs = 1; - ++it; - // Acknowledge sync - ack_one_sync(op); - while (it != in_progress_syncs.end()) - { - auto & next_sync = *it++; - PRIV(next_sync)->prev_sync_count -= done_syncs; - if (PRIV(next_sync)->prev_sync_count == 0 && PRIV(next_sync)->op_state == SYNC_DONE) - { - done_syncs++; - // Acknowledge next_sync - ack_one_sync(next_sync); - } - } - return 2; - } - return 0; -} - -void blockstore_impl_t::ack_one_sync(blockstore_op_t *op) +void blockstore_impl_t::ack_sync(blockstore_op_t *op) { // Handle states for (auto it = PRIV(op)->sync_big_writes.begin(); it != PRIV(op)->sync_big_writes.end(); it++) @@ -295,7 +242,6 @@ void blockstore_impl_t::ack_one_sync(blockstore_op_t *op) } } } - in_progress_syncs.erase(PRIV(op)->in_progress_ptr); op->retval = 0; FINISH_OP(op); } diff --git a/src/blockstore_write.cpp 
b/src/blockstore_write.cpp index deb877f5..42f1689a 100644 --- a/src/blockstore_write.cpp +++ b/src/blockstore_write.cpp @@ -170,7 +170,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) { // This is the flag value used to cancel operations FINISH_OP(op); - return 1; + return 2; } // Restore original low version number for unblocked operations #ifdef BLOCKSTORE_DEBUG @@ -183,7 +183,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) // Original version is still invalid // All subsequent writes to the same object must be canceled too cancel_all_writes(op, dirty_it, -EEXIST); - return 1; + return 2; } op->version = PRIV(op)->real_version; PRIV(op)->real_version = 0; @@ -217,7 +217,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) return 0; } cancel_all_writes(op, dirty_it, -ENOSPC); - return 1; + return 2; } write_iodepth++; BS_SUBMIT_GET_SQE(sqe, data); @@ -370,7 +370,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) if (!PRIV(op)->pending_ops) { PRIV(op)->op_state = 4; - continue_write(op); + return continue_write(op); } else { @@ -384,17 +384,21 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op) { io_uring_sqe *sqe = NULL; journal_entry_big_write *je; + int op_state = PRIV(op)->op_state; + if (op_state != 2 && op_state != 4) + { + // In progress + return 1; + } auto dirty_it = dirty_db.find((obj_ver_id){ .oid = op->oid, .version = op->version, }); assert(dirty_it != dirty_db.end()); - if (PRIV(op)->op_state == 2) + if (op_state == 2) goto resume_2; - else if (PRIV(op)->op_state == 4) + else if (op_state == 4) goto resume_4; - else - return 1; resume_2: // Only for the immediate_commit mode: prepare and submit big_write journal entry sqe = get_sqe(); @@ -464,7 +468,7 @@ resume_4: op->retval = op->len; write_iodepth--; FINISH_OP(op); - return 1; + return 2; } void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *op) @@ -483,10 +487,7 @@ void 
blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *o { release_journal_sectors(op); PRIV(op)->op_state++; - if (!continue_write(op)) - { - submit_queue.push_front(op); - } + ringloop->wakeup(); } } @@ -524,6 +525,10 @@ void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op) int blockstore_impl_t::dequeue_del(blockstore_op_t *op) { + if (PRIV(op)->op_state) + { + return continue_write(op); + } auto dirty_it = dirty_db.find((obj_ver_id){ .oid = op->oid, .version = op->version, @@ -593,7 +598,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op) if (!PRIV(op)->pending_ops) { PRIV(op)->op_state = 4; - continue_write(op); + return continue_write(op); } else {