From c3737ae3ff0e93d817746d9f26a3c1ebbd8c419e Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Mon, 9 Mar 2020 00:35:54 +0300 Subject: [PATCH] Add journal fsync to stabilize/rollback --- blockstore_flush.cpp | 5 +- blockstore_flush.h | 1 + blockstore_impl.cpp | 2 +- blockstore_impl.h | 5 +- blockstore_rollback.cpp | 100 ++++++++++++++++++++--------- blockstore_stable.cpp | 135 ++++++++++++++++++++++++++-------------- blockstore_sync.cpp | 48 +++++++------- 7 files changed, 193 insertions(+), 103 deletions(-) diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 1f72d30e..de28da93 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -6,6 +6,7 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore_impl_t *bs) this->flusher_count = flusher_count; dequeuing = false; active_flushers = 0; + syncing_flushers = 0; sync_threshold = bs->journal_block_size / sizeof(journal_entry_stable); journal_trim_interval = sync_threshold; journal_trim_counter = 0; @@ -649,7 +650,8 @@ bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base) }); sync_found: cur_sync->ready_count++; - if (cur_sync->ready_count >= flusher->sync_threshold || !flusher->flush_queue.size()) + flusher->syncing_flushers++; + if (flusher->syncing_flushers >= flusher->flusher_count || !flusher->flush_queue.size()) { // Sync batch is ready. Do it. 
await_sqe(0); @@ -675,6 +677,7 @@ bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base) wait_state = 2; return false; } + flusher->syncing_flushers--; cur_sync->ready_count--; if (cur_sync->ready_count == 0) { diff --git a/blockstore_flush.h b/blockstore_flush.h index 6451b151..a09bc6f4 100644 --- a/blockstore_flush.h +++ b/blockstore_flush.h @@ -84,6 +84,7 @@ class journal_flusher_t void* journal_superblock; int active_flushers; + int syncing_flushers; std::list syncs; std::map sync_to_repeat; diff --git a/blockstore_impl.cpp b/blockstore_impl.cpp index 96a4052d..4aa715ee 100644 --- a/blockstore_impl.cpp +++ b/blockstore_impl.cpp @@ -364,7 +364,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first) // Call constructor without allocating memory. We'll call destructor before returning op back new ((void*)op->private_data) blockstore_op_private_t; PRIV(op)->wait_for = 0; - PRIV(op)->sync_state = 0; + PRIV(op)->op_state = 0; PRIV(op)->pending_ops = 0; if (!first) { diff --git a/blockstore_impl.h b/blockstore_impl.h index ec9d2383..de4a5a7b 100644 --- a/blockstore_impl.h +++ b/blockstore_impl.h @@ -147,6 +147,7 @@ struct blockstore_op_private_t int wait_for; uint64_t wait_detail; int pending_ops; + int op_state; // Read std::vector read_vec; @@ -161,7 +162,7 @@ struct blockstore_op_private_t std::vector sync_big_writes, sync_small_writes; int sync_small_checked, sync_big_checked; std::list::iterator in_progress_ptr; - int sync_state, prev_sync_count; + int prev_sync_count; }; // https://github.com/algorithm-ninja/cpp-btree @@ -280,11 +281,13 @@ class blockstore_impl_t // Stabilize int dequeue_stable(blockstore_op_t *op); + int continue_stable(blockstore_op_t *op); void handle_stable_event(ring_data_t *data, blockstore_op_t *op); void stabilize_object(object_id oid, uint64_t max_ver); // Rollback int dequeue_rollback(blockstore_op_t *op); + int continue_rollback(blockstore_op_t *op); void handle_rollback_event(ring_data_t *data, 
blockstore_op_t *op); void erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc); diff --git a/blockstore_rollback.cpp b/blockstore_rollback.cpp index dfd04286..78aa47b6 100644 --- a/blockstore_rollback.cpp +++ b/blockstore_rollback.cpp @@ -2,6 +2,10 @@ int blockstore_impl_t::dequeue_rollback(blockstore_op_t *op) { + if (PRIV(op)->op_state) + { + return continue_rollback(op); + } obj_ver_id* v; int i, todo = op->len; for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) @@ -110,6 +114,70 @@ int blockstore_impl_t::dequeue_rollback(blockstore_op_t *op) } PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; PRIV(op)->pending_ops = s; + PRIV(op)->op_state = 1; + return 1; +} + +int blockstore_impl_t::continue_rollback(blockstore_op_t *op) +{ + if (PRIV(op)->op_state == 2) + goto resume_2; + else if (PRIV(op)->op_state == 3) + goto resume_3; + else if (PRIV(op)->op_state == 5) + goto resume_5; + else + return 1; +resume_2: + // Release used journal sectors + release_journal_sectors(op); +resume_3: + if (!disable_journal_fsync) + { + io_uring_sqe *sqe = get_sqe(); + if (!sqe) + { + return 0; + } + ring_data_t *data = ((ring_data_t*)sqe->user_data); + my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC); + data->iov = { 0 }; + data->callback = [this, op](ring_data_t *data) { handle_rollback_event(data, op); }; + PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0; + PRIV(op)->pending_ops = 1; + PRIV(op)->op_state = 4; + return 1; + } +resume_5: + obj_ver_id* v; + int i; + for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) + { + // Erase dirty_db entries + auto rm_end = dirty_db.lower_bound((obj_ver_id){ + .oid = v->oid, + .version = UINT64_MAX, + }); + rm_end--; + auto rm_start = rm_end; + while (1) + { + if (rm_end->first.oid != v->oid) + break; + else if (rm_end->first.version <= v->version) + break; + rm_start = rm_end; + if (rm_end == 
dirty_db.begin()) + break; + rm_end--; + } + if (rm_end != rm_start) + erase_dirty(rm_start, rm_end, UINT64_MAX); + } + journal.trim(); + // Acknowledge op + op->retval = 0; + FINISH_OP(op); return 1; } @@ -126,37 +194,11 @@ void blockstore_impl_t::handle_rollback_event(ring_data_t *data, blockstore_op_t PRIV(op)->pending_ops--; if (PRIV(op)->pending_ops == 0) { - // Release used journal sectors - release_journal_sectors(op); - obj_ver_id* v; - int i; - for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) + PRIV(op)->op_state++; + if (!continue_rollback(op)) { - // Erase dirty_db entries - auto rm_end = dirty_db.lower_bound((obj_ver_id){ - .oid = v->oid, - .version = UINT64_MAX, - }); - rm_end--; - auto rm_start = rm_end; - while (1) - { - if (rm_end->first.oid != v->oid) - break; - else if (rm_end->first.version <= v->version) - break; - rm_start = rm_end; - if (rm_end == dirty_db.begin()) - break; - rm_end--; - } - if (rm_end != rm_start) - erase_dirty(rm_start, rm_end, UINT64_MAX); + submit_queue.push_front(op); } - journal.trim(); - // Acknowledge op - op->retval = 0; - FINISH_OP(op); } } diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp index fdca50e6..f849ac9c 100644 --- a/blockstore_stable.cpp +++ b/blockstore_stable.cpp @@ -40,6 +40,10 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op) { + if (PRIV(op)->op_state) + { + return continue_stable(op); + } obj_ver_id* v; int i, todo = 0; for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) @@ -127,6 +131,87 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op) } PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; PRIV(op)->pending_ops = s; + PRIV(op)->op_state = 1; + return 1; +} + +int blockstore_impl_t::continue_stable(blockstore_op_t *op) +{ + if (PRIV(op)->op_state == 2) + goto resume_2; + else if (PRIV(op)->op_state == 3) + goto resume_3; + else if (PRIV(op)->op_state == 5) + goto resume_5; + else + return 1; +resume_2: + // Release used journal sectors + 
release_journal_sectors(op); +resume_3: + if (!disable_journal_fsync) + { + io_uring_sqe *sqe = get_sqe(); + if (!sqe) + { + return 0; + } + ring_data_t *data = ((ring_data_t*)sqe->user_data); + my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC); + data->iov = { 0 }; + data->callback = [this, op](ring_data_t *data) { handle_stable_event(data, op); }; + PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0; + PRIV(op)->pending_ops = 1; + PRIV(op)->op_state = 4; + return 1; + } +resume_5: + // Mark dirty_db entries as stable, acknowledge op completion + obj_ver_id* v; + int i; + for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) + { + // Mark all dirty_db entries up to op->version as stable + auto dirty_it = dirty_db.find(*v); + if (dirty_it != dirty_db.end()) + { + while (1) + { + if (dirty_it->second.state == ST_J_SYNCED) + { + dirty_it->second.state = ST_J_STABLE; + } + else if (dirty_it->second.state == ST_D_META_SYNCED) + { + dirty_it->second.state = ST_D_STABLE; + } + else if (dirty_it->second.state == ST_DEL_SYNCED) + { + dirty_it->second.state = ST_DEL_STABLE; + } + else if (IS_STABLE(dirty_it->second.state)) + { + break; + } + if (dirty_it == dirty_db.begin()) + { + break; + } + dirty_it--; + if (dirty_it->first.oid != v->oid) + { + break; + } + } +#ifdef BLOCKSTORE_DEBUG + printf("enqueue_flush %lu:%lu v%lu\n", v->oid.inode, v->oid.stripe, v->version); +#endif + flusher->enqueue_flush(*v); + } + } + // Acknowledge op + op->retval = 0; + FINISH_OP(op); return 1; } @@ -143,54 +228,10 @@ void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t * PRIV(op)->pending_ops--; if (PRIV(op)->pending_ops == 0) { - // FIXME Oops. We must sync the device! 
- // Release used journal sectors - release_journal_sectors(op); - // Mark dirty_db entries as stable, acknowledge op completion - obj_ver_id* v; - int i; - for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) + PRIV(op)->op_state++; + if (!continue_stable(op)) { - // Mark all dirty_db entries up to op->version as stable - auto dirty_it = dirty_db.find(*v); - if (dirty_it != dirty_db.end()) - { - while (1) - { - if (dirty_it->second.state == ST_J_SYNCED) - { - dirty_it->second.state = ST_J_STABLE; - } - else if (dirty_it->second.state == ST_D_META_SYNCED) - { - dirty_it->second.state = ST_D_STABLE; - } - else if (dirty_it->second.state == ST_DEL_SYNCED) - { - dirty_it->second.state = ST_DEL_STABLE; - } - else if (IS_STABLE(dirty_it->second.state)) - { - break; - } - if (dirty_it == dirty_db.begin()) - { - break; - } - dirty_it--; - if (dirty_it->first.oid != v->oid) - { - break; - } - } -#ifdef BLOCKSTORE_DEBUG - printf("enqueue_flush %lu:%lu v%lu\n", v->oid.inode, v->oid.stripe, v->version); -#endif - flusher->enqueue_flush(*v); - } + submit_queue.push_front(op); } - // Acknowledge op - op->retval = 0; - FINISH_OP(op); } } diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp index 48928429..2c5f5e35 100644 --- a/blockstore_sync.cpp +++ b/blockstore_sync.cpp @@ -11,7 +11,7 @@ int blockstore_impl_t::dequeue_sync(blockstore_op_t *op) { - if (PRIV(op)->sync_state == 0) + if (PRIV(op)->op_state == 0) { stop_sync_submitted = false; PRIV(op)->sync_big_writes.swap(unsynced_big_writes); @@ -21,11 +21,11 @@ int blockstore_impl_t::dequeue_sync(blockstore_op_t *op) unsynced_big_writes.clear(); unsynced_small_writes.clear(); if (PRIV(op)->sync_big_writes.size() > 0) - PRIV(op)->sync_state = SYNC_HAS_BIG; + PRIV(op)->op_state = SYNC_HAS_BIG; else if (PRIV(op)->sync_small_writes.size() > 0) - PRIV(op)->sync_state = SYNC_HAS_SMALL; + PRIV(op)->op_state = SYNC_HAS_SMALL; else - PRIV(op)->sync_state = SYNC_DONE; + PRIV(op)->op_state = SYNC_DONE; // Always add sync to 
in_progress_syncs because we clear unsynced_big_writes and unsynced_small_writes PRIV(op)->prev_sync_count = in_progress_syncs.size(); PRIV(op)->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op); @@ -38,7 +38,7 @@ int blockstore_impl_t::dequeue_sync(blockstore_op_t *op) int blockstore_impl_t::continue_sync(blockstore_op_t *op) { auto cb = [this, op](ring_data_t *data) { handle_sync_event(data, op); }; - if (PRIV(op)->sync_state == SYNC_HAS_SMALL) + if (PRIV(op)->op_state == SYNC_HAS_SMALL) { // No big writes, just fsync the journal for (; PRIV(op)->sync_small_checked < PRIV(op)->sync_small_writes.size(); PRIV(op)->sync_small_checked++) @@ -56,15 +56,15 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb); PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; PRIV(op)->pending_ops = 1; - PRIV(op)->sync_state = SYNC_JOURNAL_WRITE_SENT; + PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT; return 1; } else { - PRIV(op)->sync_state = SYNC_JOURNAL_WRITE_DONE; + PRIV(op)->op_state = SYNC_JOURNAL_WRITE_DONE; } } - if (PRIV(op)->sync_state == SYNC_HAS_BIG) + if (PRIV(op)->op_state == SYNC_HAS_BIG) { for (; PRIV(op)->sync_big_checked < PRIV(op)->sync_big_writes.size(); PRIV(op)->sync_big_checked++) { @@ -83,15 +83,15 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) data->callback = cb; PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0; PRIV(op)->pending_ops = 1; - PRIV(op)->sync_state = SYNC_DATA_SYNC_SENT; + PRIV(op)->op_state = SYNC_DATA_SYNC_SENT; return 1; } else { - PRIV(op)->sync_state = SYNC_DATA_SYNC_DONE; + PRIV(op)->op_state = SYNC_DATA_SYNC_DONE; } } - if (PRIV(op)->sync_state == SYNC_DATA_SYNC_DONE) + if (PRIV(op)->op_state == SYNC_DATA_SYNC_DONE) { for (; PRIV(op)->sync_small_checked < PRIV(op)->sync_small_writes.size(); PRIV(op)->sync_small_checked++) { @@ -153,10 +153,10 @@ int 
blockstore_impl_t::continue_sync(blockstore_op_t *op) } PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; PRIV(op)->pending_ops = s; - PRIV(op)->sync_state = SYNC_JOURNAL_WRITE_SENT; + PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT; return 1; } - if (PRIV(op)->sync_state == SYNC_JOURNAL_WRITE_DONE) + if (PRIV(op)->op_state == SYNC_JOURNAL_WRITE_DONE) { if (!disable_journal_fsync) { @@ -165,15 +165,15 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) data->iov = { 0 }; data->callback = cb; PRIV(op)->pending_ops = 1; - PRIV(op)->sync_state = SYNC_JOURNAL_SYNC_SENT; + PRIV(op)->op_state = SYNC_JOURNAL_SYNC_SENT; return 1; } else { - PRIV(op)->sync_state = SYNC_DONE; + PRIV(op)->op_state = SYNC_DONE; } } - if (PRIV(op)->sync_state == SYNC_DONE) + if (PRIV(op)->op_state == SYNC_DONE) { ack_sync(op); } @@ -196,17 +196,17 @@ void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op // Release used journal sectors release_journal_sectors(op); // Handle states - if (PRIV(op)->sync_state == SYNC_DATA_SYNC_SENT) + if (PRIV(op)->op_state == SYNC_DATA_SYNC_SENT) { - PRIV(op)->sync_state = SYNC_DATA_SYNC_DONE; + PRIV(op)->op_state = SYNC_DATA_SYNC_DONE; } - else if (PRIV(op)->sync_state == SYNC_JOURNAL_WRITE_SENT) + else if (PRIV(op)->op_state == SYNC_JOURNAL_WRITE_SENT) { - PRIV(op)->sync_state = SYNC_JOURNAL_WRITE_DONE; + PRIV(op)->op_state = SYNC_JOURNAL_WRITE_DONE; } - else if (PRIV(op)->sync_state == SYNC_JOURNAL_SYNC_SENT) + else if (PRIV(op)->op_state == SYNC_JOURNAL_SYNC_SENT) { - PRIV(op)->sync_state = SYNC_DONE; + PRIV(op)->op_state = SYNC_DONE; ack_sync(op); } else @@ -218,7 +218,7 @@ void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op int blockstore_impl_t::ack_sync(blockstore_op_t *op) { - if (PRIV(op)->sync_state == SYNC_DONE && PRIV(op)->prev_sync_count == 0) + if (PRIV(op)->op_state == SYNC_DONE && PRIV(op)->prev_sync_count == 0) { // Remove dependency of subsequent syncs auto it = 
PRIV(op)->in_progress_ptr; @@ -230,7 +230,7 @@ int blockstore_impl_t::ack_sync(blockstore_op_t *op) { auto & next_sync = *it++; PRIV(next_sync)->prev_sync_count -= done_syncs; - if (PRIV(next_sync)->prev_sync_count == 0 && PRIV(next_sync)->sync_state == SYNC_DONE) + if (PRIV(next_sync)->prev_sync_count == 0 && PRIV(next_sync)->op_state == SYNC_DONE) { done_syncs++; // Acknowledge next_sync