From 9b5d8b9ad47ca5fcb5b531ecaea28d83efb67e83 Mon Sep 17 00:00:00 2001
From: Vitaliy Filippov
Date: Tue, 2 Feb 2021 01:29:11 +0300
Subject: [PATCH] Fix multiple-sector journal writes, add assertions to not miss any SQEs

---
 blockstore_journal.cpp  |  9 ++++++---
 blockstore_journal.h    |  9 +++++++--
 blockstore_rollback.cpp | 32 ++++++++++++--------------------
 blockstore_stable.cpp   | 32 ++++++++++++--------------------
 blockstore_sync.cpp     | 32 ++++++++++++--------------------
 blockstore_write.cpp    |  1 -
 6 files changed, 49 insertions(+), 66 deletions(-)

diff --git a/blockstore_journal.cpp b/blockstore_journal.cpp
index 22ea483e..6819c9b2 100644
--- a/blockstore_journal.cpp
+++ b/blockstore_journal.cpp
@@ -25,6 +25,10 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int entries
             : (bs->journal.block_size - next_in_pos) / size;
         if (fits > 0)
         {
+            if (fits > required)
+            {
+                fits = required;
+            }
             if (first_sector == -1)
             {
                 first_sector = next_sector;
@@ -116,12 +120,10 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int entries
 
 journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size)
 {
-    if (journal.block_size - journal.in_sector_pos < size ||
-        journal.no_same_sector_overwrites && journal.sector_info[journal.cur_sector].written)
+    if (!journal.entry_fits(size))
     {
         assert(!journal.sector_info[journal.cur_sector].dirty);
         // Move to the next journal sector
-        journal.sector_info[journal.cur_sector].written = false;
         if (journal.sector_info[journal.cur_sector].usage_count > 0)
         {
             // Also select next sector buffer in memory
@@ -132,6 +134,7 @@ journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type,
         {
             journal.dirty_start = journal.next_free;
         }
+        journal.sector_info[journal.cur_sector].written = false;
         journal.sector_info[journal.cur_sector].offset = journal.next_free;
         journal.in_sector_pos = 0;
         journal.next_free = (journal.next_free+journal.block_size) < journal.len ? journal.next_free + journal.block_size : journal.block_size;
diff --git a/blockstore_journal.h b/blockstore_journal.h
index 9fe91061..cfd5f210 100644
--- a/blockstore_journal.h
+++ b/blockstore_journal.h
@@ -133,7 +133,7 @@ inline uint32_t je_crc32(journal_entry *je)
 struct journal_sector_info_t
 {
     uint64_t offset;
-    uint64_t usage_count;
+    uint64_t usage_count; // flusher_count!
     bool written;
     bool dirty;
 };
@@ -170,13 +170,18 @@ struct journal_t
     ~journal_t();
     bool trim();
     uint64_t get_trim_pos();
+    inline bool entry_fits(int size)
+    {
+        return !(block_size - in_sector_pos < size ||
+            no_same_sector_overwrites && sector_info[cur_sector].written);
+    }
 };
 
 struct blockstore_journal_check_t
 {
     blockstore_impl_t *bs;
     uint64_t next_pos, next_sector, next_in_pos;
-    int sectors_required, first_sector;
+    int sectors_required, first_sector; // "sectors to write"
     bool right_dir; // writing to the end or the beginning of the ring buffer
 
     blockstore_journal_check_t(blockstore_impl_t *bs);
diff --git a/blockstore_rollback.cpp b/blockstore_rollback.cpp
index c06e40b2..1a521f4e 100644
--- a/blockstore_rollback.cpp
+++ b/blockstore_rollback.cpp
@@ -83,35 +83,27 @@ skip_ov:
     // Prepare and submit journal entries
     auto cb = [this, op](ring_data_t *data) { handle_rollback_event(data, op); };
     int s = 0, cur_sector = -1;
-    if ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_rollback) &&
-        journal.sector_info[journal.cur_sector].dirty)
-    {
-        PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
-        prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
-        cur_sector = ((journal.cur_sector + 1) % journal.sector_count);
-    }
     for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
     {
+        if (!journal.entry_fits(sizeof(journal_entry_rollback)) &&
+            journal.sector_info[journal.cur_sector].dirty)
+        {
+            if (cur_sector == -1)
+                PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
+            prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
+            cur_sector = journal.cur_sector;
+        }
         journal_entry_rollback *je = (journal_entry_rollback*)
             prefill_single_journal_entry(journal, JE_ROLLBACK, sizeof(journal_entry_rollback));
-        journal.sector_info[journal.cur_sector].dirty = false;
         je->oid = v->oid;
         je->version = v->version;
         je->crc32 = je_crc32((journal_entry*)je);
         journal.crc32_last = je->crc32;
-        if (cur_sector != journal.cur_sector)
-        {
-            // Write previous sector. We should write the sector only after filling it,
-            // because otherwise we'll write a lot more sectors in the "no_same_sector_overwrite" mode
-            if (cur_sector != -1)
-                prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
-            else
-                PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
-            cur_sector = journal.cur_sector;
-        }
     }
-    if (cur_sector != -1)
-        prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
+    prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
+    assert(s == space_check.sectors_required);
+    if (cur_sector == -1)
+        PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
     PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
     PRIV(op)->pending_ops = s;
     PRIV(op)->op_state = 1;
diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp
index 5b507e50..b065603d 100644
--- a/blockstore_stable.cpp
+++ b/blockstore_stable.cpp
@@ -106,36 +106,28 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
     // Prepare and submit journal entries
     auto cb = [this, op](ring_data_t *data) { handle_stable_event(data, op); };
     int s = 0, cur_sector = -1;
-    if ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_stable) &&
-        journal.sector_info[journal.cur_sector].dirty)
-    {
-        PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
-        prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
-        cur_sector = ((journal.cur_sector + 1) % journal.sector_count);
-    }
     for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
     {
         // FIXME: Only stabilize versions that aren't stable yet
+        if (!journal.entry_fits(sizeof(journal_entry_stable)) &&
+            journal.sector_info[journal.cur_sector].dirty)
+        {
+            if (cur_sector == -1)
+                PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
+            prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
+            cur_sector = journal.cur_sector;
+        }
         journal_entry_stable *je = (journal_entry_stable*)
             prefill_single_journal_entry(journal, JE_STABLE, sizeof(journal_entry_stable));
-        journal.sector_info[journal.cur_sector].dirty = false;
         je->oid = v->oid;
         je->version = v->version;
         je->crc32 = je_crc32((journal_entry*)je);
         journal.crc32_last = je->crc32;
-        if (cur_sector != journal.cur_sector)
-        {
-            // Write previous sector. We should write the sector only after filling it,
-            // because otherwise we'll write a lot more sectors in the "no_same_sector_overwrite" mode
-            if (cur_sector != -1)
-                prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
-            else
-                PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
-            cur_sector = journal.cur_sector;
-        }
     }
-    if (cur_sector != -1)
-        prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
+    prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
+    assert(s == space_check.sectors_required);
+    if (cur_sector == -1)
+        PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
     PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
     PRIV(op)->pending_ops = s;
     PRIV(op)->op_state = 1;
diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp
index 1ba99f01..59694437 100644
--- a/blockstore_sync.cpp
+++ b/blockstore_sync.cpp
@@ -120,21 +120,21 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
     // Prepare and submit journal entries
     auto it = PRIV(op)->sync_big_writes.begin();
     int s = 0, cur_sector = -1;
-    if ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_big_write) &&
-        journal.sector_info[journal.cur_sector].dirty)
-    {
-        PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
-        prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
-        cur_sector = ((journal.cur_sector + 1) % journal.sector_count);
-    }
     while (it != PRIV(op)->sync_big_writes.end())
     {
+        if (!journal.entry_fits(sizeof(journal_entry_big_write)) &&
+            journal.sector_info[journal.cur_sector].dirty)
+        {
+            if (cur_sector == -1)
+                PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
+            prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
+            cur_sector = journal.cur_sector;
+        }
         journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry(
             journal, (dirty_db[*it].state & BS_ST_INSTANT) ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE,
             sizeof(journal_entry_big_write)
         );
         dirty_db[*it].journal_sector = journal.sector_info[journal.cur_sector].offset;
-        journal.sector_info[journal.cur_sector].dirty = false;
         journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
 #ifdef BLOCKSTORE_DEBUG
         printf(
@@ -151,19 +151,11 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
         je->crc32 = je_crc32((journal_entry*)je);
         journal.crc32_last = je->crc32;
         it++;
-        if (cur_sector != journal.cur_sector)
-        {
-            // Write the previous sector. We should write the sector only after filling it,
-            // because otherwise we'll write a lot more sectors in the "no_same_sector_overwrite" mode
-            if (cur_sector != -1)
-                prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
-            else
-                PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
-            cur_sector = journal.cur_sector;
-        }
     }
-    if (cur_sector != -1)
-        prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
+    prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb);
+    assert(s == space_check.sectors_required);
+    if (cur_sector == -1)
+        PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
     PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
     PRIV(op)->pending_ops = s;
     PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT;
diff --git a/blockstore_write.cpp b/blockstore_write.cpp
index e18905b5..e7bb92c3 100644
--- a/blockstore_write.cpp
+++ b/blockstore_write.cpp
@@ -377,7 +377,6 @@ resume_2:
             sizeof(journal_entry_big_write)
         );
         dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset;
-        journal.sector_info[journal.cur_sector].dirty = false;
         journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
 #ifdef BLOCKSTORE_DEBUG
         printf(