From ec7acc8f3ab1d9d1ef61082e8852cc223e293c7a Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 5 Jul 2020 01:48:02 +0300 Subject: [PATCH] Add WRITE_STABLE operation for future replication support --- blockstore.h | 19 ++++++++++--------- blockstore_impl.cpp | 10 +++++----- blockstore_impl.h | 2 ++ blockstore_init.cpp | 25 +++++++++++++++++++++---- blockstore_journal.h | 4 +++- blockstore_sync.cpp | 20 +++++++++++++++----- blockstore_write.cpp | 20 +++++++++++++------- dump_journal.cpp | 13 +++++++++---- 8 files changed, 78 insertions(+), 35 deletions(-) diff --git a/blockstore.h b/blockstore.h index 97849ab3..50cdb5c5 100644 --- a/blockstore.h +++ b/blockstore.h @@ -27,13 +27,14 @@ #define BS_OP_MIN 1 #define BS_OP_READ 1 #define BS_OP_WRITE 2 -#define BS_OP_SYNC 3 -#define BS_OP_STABLE 4 -#define BS_OP_DELETE 5 -#define BS_OP_LIST 6 -#define BS_OP_ROLLBACK 7 -#define BS_OP_SYNC_STAB_ALL 8 -#define BS_OP_MAX 8 +#define BS_OP_WRITE_STABLE 3 +#define BS_OP_SYNC 4 +#define BS_OP_STABLE 5 +#define BS_OP_DELETE 6 +#define BS_OP_LIST 7 +#define BS_OP_ROLLBACK 8 +#define BS_OP_SYNC_STAB_ALL 9 +#define BS_OP_MAX 9 #define BS_OP_PRIVATE_DATA_SIZE 256 @@ -41,9 +42,9 @@ Blockstore opcode documentation: -## BS_OP_READ / BS_OP_WRITE +## BS_OP_READ / BS_OP_WRITE / BS_OP_WRITE_STABLE -Read or write object data. +Read or write object data. WRITE_STABLE writes a version that doesn't require marking as stable. Input: - oid = requested object diff --git a/blockstore_impl.cpp b/blockstore_impl.cpp index c6c60f15..c4d77f49 100644 --- a/blockstore_impl.cpp +++ b/blockstore_impl.cpp @@ -130,7 +130,7 @@ void blockstore_impl_t::loop() } else if (PRIV(op)->wait_for) { - if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_DELETE) + if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE || op->opcode == BS_OP_DELETE) { has_writes = 2; } @@ -144,7 +144,7 @@ void blockstore_impl_t::loop() { dequeue_op = dequeue_read(op); } - else if (op->opcode == BS_OP_WRITE) + else if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE) { if (has_writes == 2) { @@ -329,13 +329,13 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op) void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first) { if (op->opcode < BS_OP_MIN || op->opcode > BS_OP_MAX || - ((op->opcode == BS_OP_READ || op->opcode == BS_OP_WRITE) && ( + ((op->opcode == BS_OP_READ || op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE) && ( op->offset >= block_size || op->len > block_size-op->offset || (op->len % disk_alignment) )) || readonly && op->opcode != BS_OP_READ && op->opcode != BS_OP_LIST || - first && op->opcode == BS_OP_WRITE) + first && (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE)) { // Basic verification not passed op->retval = -EINVAL; @@ -380,7 +380,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first) } }; } - if ((op->opcode == BS_OP_WRITE || op->opcode == BS_OP_DELETE) && !enqueue_write(op)) + if ((op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE || op->opcode == BS_OP_DELETE) && !enqueue_write(op)) { std::function(op->callback)(op); return; diff --git a/blockstore_impl.h b/blockstore_impl.h index b7e5906d..50216d57 100644 --- a/blockstore_impl.h +++ b/blockstore_impl.h @@ -34,6 +34,8 @@ #define BS_ST_SYNCED 0x50 #define BS_ST_STABLE 0x60 +#define BS_ST_INSTANT 0x100 + #define IMMEDIATE_NONE 0 #define IMMEDIATE_SMALL 1 #define IMMEDIATE_ALL 2 diff --git a/blockstore_init.cpp b/blockstore_init.cpp index bf10bbef..a177e493 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -454,10 +454,15 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u break; } } - if (je->type == JE_SMALL_WRITE) + if (je->type == JE_SMALL_WRITE || je->type == JE_SMALL_WRITE_INSTANT) { #ifdef BLOCKSTORE_DEBUG - printf("je_small_write oid=%lu:%lu ver=%lu offset=%u len=%u\n", je->small_write.oid.inode, je->small_write.oid.stripe, je->small_write.version, je->small_write.offset, je->small_write.len); + printf( + "je_small_write%s oid=%lu:%lu ver=%lu offset=%u len=%u\n", + je->type == JE_SMALL_WRITE_INSTANT ? "_instant" : "", + je->small_write.oid.inode, je->small_write.oid.stripe, je->small_write.version, + je->small_write.offset, je->small_write.len + ); #endif // oid, version, offset, len uint64_t prev_free = next_free; @@ -544,12 +549,20 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u #endif auto & unstab = bs->unstable_writes[ov.oid]; unstab = unstab < ov.version ? ov.version : unstab; + if (je->type == JE_SMALL_WRITE_INSTANT) + { + bs->mark_stable(ov); + } } } - else if (je->type == JE_BIG_WRITE) + else if (je->type == JE_BIG_WRITE || je->type == JE_BIG_WRITE_INSTANT) { #ifdef BLOCKSTORE_DEBUG - printf("je_big_write oid=%lu:%lu ver=%lu loc=%lu\n", je->big_write.oid.inode, je->big_write.oid.stripe, je->big_write.version, je->big_write.location); + printf( + "je_big_write%s oid=%lu:%lu ver=%lu loc=%lu\n", + je->type == JE_BIG_WRITE_INSTANT ? "_instant" : "", + je->big_write.oid.inode, je->big_write.oid.stripe, je->big_write.version, je->big_write.location + ); #endif auto clean_it = bs->clean_db.find(je->big_write.oid); if (clean_it == bs->clean_db.end() || @@ -575,6 +588,10 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u bs->journal.used_sectors[proc_pos]++; auto & unstab = bs->unstable_writes[ov.oid]; unstab = unstab < ov.version ? ov.version : unstab; + if (je->type == JE_BIG_WRITE_INSTANT) + { + bs->mark_stable(ov); + } } } else if (je->type == JE_STABLE) diff --git a/blockstore_journal.h b/blockstore_journal.h index ef6af539..b7ddf430 100644 --- a/blockstore_journal.h +++ b/blockstore_journal.h @@ -19,7 +19,9 @@ #define JE_STABLE 0x04 #define JE_DELETE 0x05 #define JE_ROLLBACK 0x06 -#define JE_MAX 0x06 +#define JE_SMALL_WRITE_INSTANT 0x07 +#define JE_BIG_WRITE_INSTANT 0x08 +#define JE_MAX 0x08 // crc32c comes first to ease calculation and is equal to crc32() struct __attribute__((__packed__)) journal_entry_start diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp index 37f6dd02..acfb5792 100644 --- a/blockstore_sync.cpp +++ b/blockstore_sync.cpp @@ -127,8 +127,10 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) } while (it != PRIV(op)->sync_big_writes.end()) { - journal_entry_big_write *je = (journal_entry_big_write*) - prefill_single_journal_entry(journal, JE_BIG_WRITE, sizeof(journal_entry_big_write)); + journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry( + journal, (dirty_db[*it].state & BS_ST_INSTANT) ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE, + sizeof(journal_entry_big_write) + ); dirty_db[*it].journal_sector = journal.sector_info[journal.cur_sector].offset; journal.sector_info[journal.cur_sector].dirty = false; journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++; @@ -257,7 +259,11 @@ void blockstore_impl_t::ack_one_sync(blockstore_op_t *op) auto & unstab = unstable_writes[it->oid]; unstab = unstab < it->version ? it->version : unstab; auto dirty_it = dirty_db.find(*it); - dirty_it->second.state = (BS_ST_BIG_WRITE | BS_ST_SYNCED); + dirty_it->second.state = ((dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SYNCED); + if (dirty_it->second.state & BS_ST_INSTANT) + { + mark_stable(dirty_it->first); + } dirty_it++; while (dirty_it != dirty_db.end() && dirty_it->first.oid == it->oid) { @@ -281,9 +287,13 @@ void blockstore_impl_t::ack_one_sync(blockstore_op_t *op) // Deletions are treated as immediately stable mark_stable(*it); } - else /* BS_ST_SMALL_WRITE | BS_ST_WRITTEN */ + else /* (BS_ST_INSTANT?) | BS_ST_SMALL_WRITE | BS_ST_WRITTEN */ { - dirty_db[*it].state = (BS_ST_SMALL_WRITE | BS_ST_SYNCED); + dirty_db[*it].state = (dirty_db[*it].state & ~BS_ST_WORKFLOW_MASK) | BS_ST_SYNCED; + if (dirty_db[*it].state & BS_ST_INSTANT) + { + mark_stable(*it); + } } } in_progress_syncs.erase(PRIV(op)->in_progress_ptr); diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 5636915f..59997d97 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -78,7 +78,7 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) .state = (uint32_t)( is_del ? (BS_ST_DELETE | BS_ST_IN_FLIGHT) - : (op->len == block_size || deleted + : (op->opcode == BS_OP_WRITE_STABLE ? BS_ST_INSTANT : 0) | (op->len == block_size || deleted ? (BS_ST_BIG_WRITE | BS_ST_IN_FLIGHT) : (is_inflight_big ? (BS_ST_SMALL_WRITE | BS_ST_WAIT_BIG) : (BS_ST_SMALL_WRITE | BS_ST_IN_FLIGHT))) ), @@ -212,8 +212,10 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) } } // Then pre-fill journal entry - journal_entry_small_write *je = (journal_entry_small_write*) - prefill_single_journal_entry(journal, JE_SMALL_WRITE, sizeof(journal_entry_small_write)); + journal_entry_small_write *je = (journal_entry_small_write*)prefill_single_journal_entry( + journal, op->opcode == BS_OP_WRITE_STABLE ? JE_SMALL_WRITE_INSTANT : JE_SMALL_WRITE, + sizeof(journal_entry_small_write) + ); dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset; journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++; #ifdef BLOCKSTORE_DEBUG @@ -310,7 +312,10 @@ resume_2: { return 0; } - je = (journal_entry_big_write*)prefill_single_journal_entry(journal, JE_BIG_WRITE, sizeof(journal_entry_big_write)); + je = (journal_entry_big_write*)prefill_single_journal_entry( + journal, op->opcode == BS_OP_WRITE_STABLE ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE, + sizeof(journal_entry_big_write) + ); dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset; journal.sector_info[journal.cur_sector].dirty = false; journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++; @@ -349,7 +354,7 @@ resume_4: } dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | (imm ? BS_ST_SYNCED : BS_ST_WRITTEN); - if (imm && (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE) + if (imm && ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE || (dirty_it->second.state & BS_ST_INSTANT))) { // Deletions are treated as immediately stable mark_stable(dirty_it->first); @@ -465,8 +470,9 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op) } } // Pre-fill journal entry - journal_entry_del *je = (journal_entry_del*) - prefill_single_journal_entry(journal, JE_DELETE, sizeof(struct journal_entry_del)); + journal_entry_del *je = (journal_entry_del*)prefill_single_journal_entry( + journal, JE_DELETE, sizeof(struct journal_entry_del) + ); dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset; journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++; #ifdef BLOCKSTORE_DEBUG diff --git a/dump_journal.cpp b/dump_journal.cpp index bb266094..37f86509 100644 --- a/dump_journal.cpp +++ b/dump_journal.cpp @@ -104,10 +104,11 @@ void journal_dump_t::dump_block(void *buf) { printf("je_start start=%08lx\n", je->start.journal_start); } - else if (je->type == JE_SMALL_WRITE) + else if (je->type == JE_SMALL_WRITE || je->type == JE_SMALL_WRITE_INSTANT) { printf( - "je_small_write oid=%lu:%lu ver=%lu offset=%u len=%u loc=%08lx", + "je_small_write%s oid=%lu:%lu ver=%lu offset=%u len=%u loc=%08lx", + je->type == JE_SMALL_WRITE_INSTANT ? "_instant" : "", je->small_write.oid.inode, je->small_write.oid.stripe, je->small_write.version, je->small_write.offset, je->small_write.len, je->small_write.data_offset @@ -139,9 +140,13 @@ void journal_dump_t::dump_block(void *buf) ); printf("\n"); } - else if (je->type == JE_BIG_WRITE) + else if (je->type == JE_BIG_WRITE || je->type == JE_BIG_WRITE_INSTANT) { - printf("je_big_write oid=%lu:%lu ver=%lu loc=%08lx\n", je->big_write.oid.inode, je->big_write.oid.stripe, je->big_write.version, je->big_write.location); + printf( + "je_big_write%s oid=%lu:%lu ver=%lu loc=%08lx\n", + je->type == JE_BIG_WRITE_INSTANT ? "_instant" : "", + je->big_write.oid.inode, je->big_write.oid.stripe, je->big_write.version, je->big_write.location + ); } else if (je->type == JE_STABLE) {