From 416a80b099f3ac948b19ab5964e12d19e3c2ad4b Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sat, 4 Jul 2020 22:15:58 +0300 Subject: [PATCH] Make blockstore object state a combination of type and workflow --- blockstore_flush.cpp | 6 ++--- blockstore_impl.h | 44 +++++++++++++++---------------------- blockstore_init.cpp | 6 ++--- blockstore_read.cpp | 9 ++++---- blockstore_stable.cpp | 14 +++--------- blockstore_sync.cpp | 14 ++++++------ blockstore_write.cpp | 51 +++++++++++++++++++------------------------ 7 files changed, 60 insertions(+), 84 deletions(-) diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 08cd8545..ef11717b 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -530,7 +530,7 @@ bool journal_flusher_co::scan_dirty(int wait_base) clean_init_bitmap = false; while (1) { - if (dirty_it->second.state == ST_J_STABLE && !skip_copy) + if (dirty_it->second.state == (BS_ST_SMALL_WRITE | BS_ST_STABLE) && !skip_copy) { // First we submit all reads has_writes = true; @@ -573,7 +573,7 @@ bool journal_flusher_co::scan_dirty(int wait_base) } } } - else if (dirty_it->second.state == ST_D_STABLE && !skip_copy) + else if (dirty_it->second.state == (BS_ST_BIG_WRITE | BS_ST_STABLE) && !skip_copy) { // There is an unflushed big write. Copy small writes in its position has_writes = true; @@ -583,7 +583,7 @@ bool journal_flusher_co::scan_dirty(int wait_base) clean_bitmap_len = dirty_it->second.len; skip_copy = true; } - else if (dirty_it->second.state == ST_DEL_STABLE && !skip_copy) + else if (dirty_it->second.state == (BS_ST_DELETE | BS_ST_STABLE) && !skip_copy) { // There is an unflushed delete has_delete = true; diff --git a/blockstore_impl.h b/blockstore_impl.h index 7a0528f1..b7e5906d 100644 --- a/blockstore_impl.h +++ b/blockstore_impl.h @@ -22,40 +22,30 @@ //#define BLOCKSTORE_DEBUG // States are not stored on disk. Instead, they're deduced from the journal -// FIXME: Rename to BS_ST_* -#define ST_J_WAIT_BIG 1 -#define ST_J_IN_FLIGHT 2 -#define ST_J_SUBMITTED 3 -#define ST_J_WRITTEN 4 -#define ST_J_SYNCED 5 -#define ST_J_STABLE 6 +#define BS_ST_SMALL_WRITE 0x01 +#define BS_ST_BIG_WRITE 0x02 +#define BS_ST_DELETE 0x03 -#define ST_D_IN_FLIGHT 15 -#define ST_D_SUBMITTED 16 -#define ST_D_WRITTEN 17 -#define ST_D_SYNCED 20 -#define ST_D_STABLE 21 - -#define ST_DEL_IN_FLIGHT 31 -#define ST_DEL_SUBMITTED 32 -#define ST_DEL_WRITTEN 33 -#define ST_DEL_SYNCED 34 -#define ST_DEL_STABLE 35 - -#define ST_CURRENT 48 +#define BS_ST_WAIT_BIG 0x10 +#define BS_ST_IN_FLIGHT 0x20 +#define BS_ST_SUBMITTED 0x30 +#define BS_ST_WRITTEN 0x40 +#define BS_ST_SYNCED 0x50 +#define BS_ST_STABLE 0x60 #define IMMEDIATE_NONE 0 #define IMMEDIATE_SMALL 1 #define IMMEDIATE_ALL 2 -#define IS_IN_FLIGHT(st) (st == ST_J_WAIT_BIG || st == ST_J_IN_FLIGHT || st == ST_D_IN_FLIGHT || st == ST_DEL_IN_FLIGHT || st == ST_J_SUBMITTED || st == ST_D_SUBMITTED || st == ST_DEL_SUBMITTED) -#define IS_STABLE(st) (st == ST_J_STABLE || st == ST_D_STABLE || st == ST_DEL_STABLE || st == ST_CURRENT) -#define IS_SYNCED(st) (IS_STABLE(st) || st == ST_J_SYNCED || st == ST_D_SYNCED || st == ST_DEL_SYNCED) -#define IS_JOURNAL(st) (st >= ST_J_WAIT_BIG && st <= ST_J_STABLE) -#define IS_BIG_WRITE(st) (st >= ST_D_IN_FLIGHT && st <= ST_D_STABLE) -#define IS_DELETE(st) (st >= ST_DEL_IN_FLIGHT && st <= ST_DEL_STABLE) -#define IS_UNSYNCED(st) (st >= ST_J_WAIT_BIG && st <= ST_J_WRITTEN || st >= ST_D_IN_FLIGHT && st <= ST_D_WRITTEN|| st >= ST_DEL_IN_FLIGHT && st <= ST_DEL_WRITTEN) +#define BS_ST_TYPE_MASK 0x0F +#define BS_ST_WORKFLOW_MASK 0xF0 +#define IS_IN_FLIGHT(st) (((st) & 0xF0) <= BS_ST_SUBMITTED) +#define IS_STABLE(st) (((st) & 0xF0) == BS_ST_STABLE) +#define IS_SYNCED(st) (((st) & 0xF0) >= BS_ST_SYNCED) +#define IS_JOURNAL(st) (((st) & 0x0F) == BS_ST_SMALL_WRITE) +#define IS_BIG_WRITE(st) (((st) & 0x0F) == BS_ST_BIG_WRITE) +#define IS_DELETE(st) (((st) & 0x0F) == BS_ST_DELETE) #define BS_SUBMIT_GET_SQE(sqe, data) \ BS_SUBMIT_GET_ONLY_SQE(sqe); \ diff --git a/blockstore_init.cpp b/blockstore_init.cpp index c3dbcc67..bf10bbef 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -528,7 +528,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u .version = je->small_write.version, }; bs->dirty_db.emplace(ov, (dirty_entry){ - .state = ST_J_SYNCED, + .state = (BS_ST_SMALL_WRITE | BS_ST_SYNCED), .flags = 0, .location = location, .offset = je->small_write.offset, @@ -561,7 +561,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u .version = je->big_write.version, }; bs->dirty_db.emplace(ov, (dirty_entry){ - .state = ST_D_SYNCED, + .state = (BS_ST_BIG_WRITE | BS_ST_SYNCED), .flags = 0, .location = je->big_write.location, .offset = je->big_write.offset, @@ -616,7 +616,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u .version = je->del.version, }; bs->dirty_db.emplace(ov, (dirty_entry){ - .state = ST_DEL_SYNCED, + .state = (BS_ST_DELETE | BS_ST_SYNCED), .flags = 0, .location = 0, .offset = 0, diff --git a/blockstore_read.cpp b/blockstore_read.cpp index 49f69bdf..f60fbdd4 100644 --- a/blockstore_read.cpp +++ b/blockstore_read.cpp @@ -157,7 +157,7 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op) { if (!clean_entry_bitmap_size) { - if (!fulfill_read(read_op, fulfilled, 0, block_size, ST_CURRENT, 0, clean_it->second.location)) + if (!fulfill_read(read_op, fulfilled, 0, block_size, (BS_ST_BIG_WRITE | BS_ST_STABLE), 0, clean_it->second.location)) { // need to wait. undo added requests, don't dequeue op PRIV(read_op)->read_vec.clear(); @@ -189,7 +189,7 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op) { // fill with zeroes fulfill_read(read_op, fulfilled, bmp_start * bitmap_granularity, - bmp_end * bitmap_granularity, ST_DEL_STABLE, 0, 0); + bmp_end * bitmap_granularity, (BS_ST_DELETE | BS_ST_STABLE), 0, 0); } bmp_start = bmp_end; while (clean_entry_bitmap[bmp_end >> 3] & (1 << (bmp_end & 0x7)) && bmp_end < bmp_size) @@ -199,7 +199,8 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op) if (bmp_end > bmp_start) { if (!fulfill_read(read_op, fulfilled, bmp_start * bitmap_granularity, - bmp_end * bitmap_granularity, ST_CURRENT, 0, clean_it->second.location + bmp_start * bitmap_granularity)) + bmp_end * bitmap_granularity, (BS_ST_BIG_WRITE | BS_ST_STABLE), 0, + clean_it->second.location + bmp_start * bitmap_granularity)) { // need to wait. undo added requests, don't dequeue op PRIV(read_op)->read_vec.clear(); @@ -214,7 +215,7 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op) else if (fulfilled < read_op->len) { // fill remaining parts with zeroes - fulfill_read(read_op, fulfilled, 0, block_size, ST_DEL_STABLE, 0, 0); + fulfill_read(read_op, fulfilled, 0, block_size, (BS_ST_DELETE | BS_ST_STABLE), 0, 0); } assert(fulfilled == read_op->len); read_op->version = result_version; diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp index a8dda0e9..0e1ca8e8 100644 --- a/blockstore_stable.cpp +++ b/blockstore_stable.cpp @@ -64,7 +64,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op) // Already stable } } - else if (IS_UNSYNCED(dirty_it->second.state)) + else if (!IS_SYNCED(dirty_it->second.state)) { // Object not synced yet. Caller must sync it first op->retval = -EBUSY; @@ -184,17 +184,9 @@ void blockstore_impl_t::mark_stable(const obj_ver_id & v) { while (1) { - if (dirty_it->second.state == ST_J_SYNCED) + if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_SYNCED) { - dirty_it->second.state = ST_J_STABLE; - } - else if (dirty_it->second.state == ST_D_SYNCED) - { - dirty_it->second.state = ST_D_STABLE; - } - else if (dirty_it->second.state == ST_DEL_SYNCED) - { - dirty_it->second.state = ST_DEL_STABLE; + dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_STABLE; } else if (IS_STABLE(dirty_it->second.state)) { diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp index 2e97660c..37f6dd02 100644 --- a/blockstore_sync.cpp +++ b/blockstore_sync.cpp @@ -257,13 +257,13 @@ void blockstore_impl_t::ack_one_sync(blockstore_op_t *op) auto & unstab = unstable_writes[it->oid]; unstab = unstab < it->version ? it->version : unstab; auto dirty_it = dirty_db.find(*it); - dirty_it->second.state = ST_D_SYNCED; + dirty_it->second.state = (BS_ST_BIG_WRITE | BS_ST_SYNCED); dirty_it++; while (dirty_it != dirty_db.end() && dirty_it->first.oid == it->oid) { - if (dirty_it->second.state == ST_J_WAIT_BIG) + if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG) { - dirty_it->second.state = ST_J_IN_FLIGHT; + dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_IN_FLIGHT; } dirty_it++; } @@ -275,15 +275,15 @@ void blockstore_impl_t::ack_one_sync(blockstore_op_t *op) #endif auto & unstab = unstable_writes[it->oid]; unstab = unstab < it->version ? it->version : unstab; - if (dirty_db[*it].state == ST_DEL_WRITTEN) + if (dirty_db[*it].state == (BS_ST_DELETE | BS_ST_WRITTEN)) { - dirty_db[*it].state = ST_DEL_SYNCED; + dirty_db[*it].state = (BS_ST_DELETE | BS_ST_SYNCED); // Deletions are treated as immediately stable mark_stable(*it); } - else /* == ST_J_WRITTEN */ + else /* BS_ST_SMALL_WRITE | BS_ST_WRITTEN */ { - dirty_db[*it].state = ST_J_SYNCED; + dirty_db[*it].state = (BS_ST_SMALL_WRITE | BS_ST_SYNCED); } } in_progress_syncs.erase(PRIV(op)->in_progress_ptr); diff --git a/blockstore_write.cpp b/blockstore_write.cpp index b4f674cd..5636915f 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -18,9 +18,9 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) found = true; version = dirty_it->first.version + 1; deleted = IS_DELETE(dirty_it->second.state); - is_inflight_big = dirty_it->second.state >= ST_D_IN_FLIGHT && - dirty_it->second.state < ST_D_SYNCED || - dirty_it->second.state == ST_J_WAIT_BIG; + is_inflight_big = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE + ? !IS_SYNCED(dirty_it->second.state) + : ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG); } } if (!found) @@ -77,8 +77,10 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) }, (dirty_entry){ .state = (uint32_t)( is_del - ? ST_DEL_IN_FLIGHT - : (op->len == block_size || deleted ? ST_D_IN_FLIGHT : (is_inflight_big ? ST_J_WAIT_BIG : ST_J_IN_FLIGHT)) + ? (BS_ST_DELETE | BS_ST_IN_FLIGHT) + : (op->len == block_size || deleted + ? (BS_ST_BIG_WRITE | BS_ST_IN_FLIGHT) + : (is_inflight_big ? (BS_ST_SMALL_WRITE | BS_ST_WAIT_BIG) : (BS_ST_SMALL_WRITE | BS_ST_IN_FLIGHT))) ), .flags = 0, .location = 0, @@ -101,11 +103,12 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) .version = op->version, }); assert(dirty_it != dirty_db.end()); - if (dirty_it->second.state == ST_J_WAIT_BIG) + if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG) { + // Don't dequeue return 0; } - else if (dirty_it->second.state == ST_D_IN_FLIGHT) + else if ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE) { blockstore_journal_check_t space_check(this); if (!space_check.check_available(op, unsynced_big_writes.size() + 1, sizeof(journal_entry_big_write), JOURNAL_STABILIZE_RESERVATION)) @@ -129,7 +132,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) } BS_SUBMIT_GET_SQE(sqe, data); dirty_it->second.location = loc << block_order; - dirty_it->second.state = ST_D_SUBMITTED; + dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SUBMITTED; #ifdef BLOCKSTORE_DEBUG printf("Allocate block %lu\n", loc); #endif @@ -169,7 +172,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) PRIV(op)->op_state = 1; } } - else + else /* if ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_SMALL_WRITE) */ { // Small (journaled) write // First check if the journal has sufficient space @@ -257,7 +260,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) // Zero-length overwrite. Allowed to bump object version in EC placement groups without actually writing data } dirty_it->second.location = journal.next_free; - dirty_it->second.state = ST_J_SUBMITTED; + dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SUBMITTED; journal.next_free += op->len; if (journal.next_free >= journal.len) { @@ -336,7 +339,7 @@ resume_4: #ifdef BLOCKSTORE_DEBUG printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state); #endif - bool imm = dirty_it->second.state == ST_D_SUBMITTED + bool imm = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE ? (immediate_commit == IMMEDIATE_ALL) : (immediate_commit != IMMEDIATE_NONE); if (imm) @@ -344,31 +347,21 @@ resume_4: auto & unstab = unstable_writes[op->oid]; unstab = unstab < op->version ? op->version : unstab; } - if (dirty_it->second.state == ST_J_SUBMITTED) + dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) + | (imm ? BS_ST_SYNCED : BS_ST_WRITTEN); + if (imm && (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE) { - dirty_it->second.state = imm ? ST_J_SYNCED : ST_J_WRITTEN; - } - else if (dirty_it->second.state == ST_D_SUBMITTED) - { - dirty_it->second.state = imm ? ST_D_SYNCED : ST_D_WRITTEN; - } - else if (dirty_it->second.state == ST_DEL_SUBMITTED) - { - dirty_it->second.state = imm ? ST_DEL_SYNCED : ST_DEL_WRITTEN; - if (imm) - { - // Deletions are treated as immediately stable - mark_stable(dirty_it->first); - } + // Deletions are treated as immediately stable + mark_stable(dirty_it->first); } if (immediate_commit == IMMEDIATE_ALL) { dirty_it++; while (dirty_it != dirty_db.end() && dirty_it->first.oid == op->oid) { - if (dirty_it->second.state == ST_J_WAIT_BIG) + if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG) { - dirty_it->second.state = ST_J_IN_FLIGHT; + dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_IN_FLIGHT; } dirty_it++; } @@ -487,7 +480,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op) je->version = op->version; je->crc32 = je_crc32((journal_entry*)je); journal.crc32_last = je->crc32; - dirty_it->second.state = ST_DEL_SUBMITTED; + dirty_it->second.state = BS_ST_DELETE | BS_ST_SUBMITTED; if (immediate_commit != IMMEDIATE_NONE) { prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb);