From 2b09710d6ff7e247942fce7353677e74701af2bd Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 24 Jan 2020 20:10:18 +0300 Subject: [PATCH] Implement blockstore rollback operation Rollback operation is required for the primary OSD to kill unstable object versions in OSD peers so they don't occupy journal space --- Makefile | 2 +- blockstore.h | 4 +- blockstore_flush.cpp | 36 +------- blockstore_flush.h | 2 +- blockstore_impl.cpp | 29 ++++--- blockstore_impl.h | 9 +- blockstore_init.cpp | 53 +++++++++++- blockstore_journal.h | 13 +++ blockstore_rollback.cpp | 187 ++++++++++++++++++++++++++++++++++++++++ blockstore_stable.cpp | 2 +- blockstore_write.cpp | 2 +- osd_exec_secondary.cpp | 4 + osd_ops.h | 3 + 13 files changed, 291 insertions(+), 55 deletions(-) create mode 100644 blockstore_rollback.cpp diff --git a/Makefile b/Makefile index 80864b64..1048b7b5 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_impl.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \ - blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_flush.o crc32c.o ringloop.o timerfd_interval.o + blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_rollback.o blockstore_flush.o crc32c.o ringloop.o timerfd_interval.o CXXFLAGS := -g -O3 -Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fPIC -fdiagnostics-color=always all: $(BLOCKSTORE_OBJS) libfio_blockstore.so osd libfio_sec_osd.so test_blockstore stub_osd clean: diff --git a/blockstore.h b/blockstore.h index c1a75c9c..6377e00c 100644 --- a/blockstore.h +++ b/blockstore.h @@ -25,8 +25,8 @@ #define BS_OP_STABLE 4 #define BS_OP_DELETE 5 #define BS_OP_LIST 6 -#define BS_OP_MAX 6 -#define BS_OP_TYPE_MASK 0x7 +#define BS_OP_ROLLBACK 7 +#define BS_OP_MAX 7 #define BS_OP_PRIVATE_DATA_SIZE 256 diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 529eb647..b2798055 100644 --- a/blockstore_flush.cpp +++ 
b/blockstore_flush.cpp @@ -438,7 +438,7 @@ bool journal_flusher_co::scan_dirty(int wait_base) { goto resume_0; } - dirty_it = dirty_end; + dirty_it = dirty_start = dirty_end; v.clear(); wait_count = 0; copy_count = 0; @@ -511,6 +511,7 @@ bool journal_flusher_co::scan_dirty(int wait_base) ); throw std::runtime_error(err); } + dirty_start = dirty_it; if (dirty_it == bs->dirty_db.begin()) { break; @@ -593,38 +594,7 @@ void journal_flusher_co::update_clean_db() .location = clean_loc, }; } - dirty_it = dirty_end; - while (1) - { - if (IS_BIG_WRITE(dirty_it->second.state) && dirty_it->second.location != clean_loc) - { -#ifdef BLOCKSTORE_DEBUG - printf("Free block %lu\n", dirty_it->second.location >> bs->block_order); -#endif - bs->data_alloc->set(dirty_it->second.location >> bs->block_order, false); - } -#ifdef BLOCKSTORE_DEBUG - printf("remove usage of journal offset %lu by %lu:%lu v%lu\n", dirty_it->second.journal_sector, dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version); -#endif - int used = --bs->journal.used_sectors[dirty_it->second.journal_sector]; - if (used == 0) - { - bs->journal.used_sectors.erase(dirty_it->second.journal_sector); - } - if (dirty_it == bs->dirty_db.begin()) - { - break; - } - dirty_it--; - if (dirty_it->first.oid != cur.oid) - { - break; - } - } - // Then, basically, remove everything up to the current version from dirty_db... 
- if (dirty_it->first.oid != cur.oid) - dirty_it++; - bs->dirty_db.erase(dirty_it, std::next(dirty_end)); + bs->erase_dirty(dirty_start, std::next(dirty_end), clean_loc); } bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base) diff --git a/blockstore_flush.h b/blockstore_flush.h index dfa514ba..cdf726e4 100644 --- a/blockstore_flush.h +++ b/blockstore_flush.h @@ -41,7 +41,7 @@ class journal_flusher_co std::list::iterator cur_sync; obj_ver_id cur; - std::map::iterator dirty_it, dirty_end; + std::map::iterator dirty_it, dirty_start, dirty_end; std::map::iterator repeat_it; std::function simple_callback_r, simple_callback_w; diff --git a/blockstore_impl.cpp b/blockstore_impl.cpp index 1f36e0cc..9f4cb496 100644 --- a/blockstore_impl.cpp +++ b/blockstore_impl.cpp @@ -127,8 +127,7 @@ void blockstore_impl_t::loop() } else if (PRIV(op)->wait_for) { - if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_WRITE || - (op->opcode & BS_OP_TYPE_MASK) == BS_OP_DELETE) + if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_DELETE) { has_writes = 2; } @@ -138,12 +137,11 @@ void blockstore_impl_t::loop() unsigned ring_space = ringloop->space_left(); unsigned prev_sqe_pos = ringloop->save(); int dequeue_op = 0; - if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_READ) + if (op->opcode == BS_OP_READ) { dequeue_op = dequeue_read(op); } - else if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_WRITE || - (op->opcode & BS_OP_TYPE_MASK) == BS_OP_DELETE) + else if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_DELETE) { if (has_writes == 2) { @@ -153,7 +151,7 @@ void blockstore_impl_t::loop() dequeue_op = dequeue_write(op); has_writes = dequeue_op ? 
1 : 2; } - else if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_SYNC) + else if (op->opcode == BS_OP_SYNC) { // wait for all small writes to be submitted // wait for all big writes to complete, submit data device fsync @@ -166,11 +164,15 @@ void blockstore_impl_t::loop() } dequeue_op = dequeue_sync(op); } - else if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_STABLE) + else if (op->opcode == BS_OP_STABLE) { dequeue_op = dequeue_stable(op); } - else if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_LIST) + else if (op->opcode == BS_OP_ROLLBACK) + { + dequeue_op = dequeue_rollback(op); + } + else if (op->opcode == BS_OP_LIST) { process_list(op); dequeue_op = true; @@ -296,15 +298,14 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op) void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first) { - int type = op->opcode & BS_OP_TYPE_MASK; - if (type < BS_OP_MIN || type > BS_OP_MAX || - ((type == BS_OP_READ || type == BS_OP_WRITE) && ( + if (op->opcode < BS_OP_MIN || op->opcode > BS_OP_MAX || + ((op->opcode == BS_OP_READ || op->opcode == BS_OP_WRITE) && ( op->offset >= block_size || op->len > block_size-op->offset || (op->len % disk_alignment) )) || - readonly && type != BS_OP_READ || - first && type == BS_OP_WRITE) + readonly && op->opcode != BS_OP_READ || + first && op->opcode == BS_OP_WRITE) { // Basic verification not passed op->retval = -EINVAL; @@ -324,7 +325,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first) { submit_queue.push_front(op); } - if (type == BS_OP_WRITE) + if (op->opcode == BS_OP_WRITE) { enqueue_write(op); } diff --git a/blockstore_impl.h b/blockstore_impl.h index 9c259116..721c4d76 100644 --- a/blockstore_impl.h +++ b/blockstore_impl.h @@ -167,6 +167,8 @@ struct blockstore_op_private_t int sync_state, prev_sync_count; }; +typedef std::map blockstore_dirty_db_t; + #include "blockstore_init.h" #include "blockstore_flush.h" @@ -199,7 +201,7 @@ class blockstore_impl_t // Another option is 
https://github.com/algorithm-ninja/cpp-btree spp::sparse_hash_map clean_db; uint8_t *clean_bitmap = NULL; - std::map dirty_db; + blockstore_dirty_db_t dirty_db; std::list submit_queue; // FIXME: funny thing is that vector is better here std::vector unsynced_big_writes, unsynced_small_writes; std::list in_progress_syncs; // ...and probably here, too @@ -278,6 +280,11 @@ class blockstore_impl_t void handle_stable_event(ring_data_t *data, blockstore_op_t *op); void stabilize_object(object_id oid, uint64_t max_ver); + // Rollback + int dequeue_rollback(blockstore_op_t *op); + void handle_rollback_event(ring_data_t *data, blockstore_op_t *op); + void erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc); + // List void process_list(blockstore_op_t *op); diff --git a/blockstore_init.cpp b/blockstore_init.cpp index 4a068a39..bdbe12d6 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -559,7 +559,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u if (it == bs->dirty_db.end()) { // journal contains a legitimate STABLE entry for a non-existing dirty write - // this probably means that journal was trimmed between WRITTEN and STABLE entries + // this probably means that journal was trimmed between WRITE and STABLE entries // skip it } else @@ -583,6 +583,57 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u bs->unstable_writes.erase(unstab_it); } } + else if (je->type == JE_ROLLBACK) + { +#ifdef BLOCKSTORE_DEBUG + printf("je_rollback oid=%lu:%lu ver=%lu\n", je->rollback.oid.inode, je->rollback.oid.stripe, je->rollback.version); +#endif + // rollback dirty writes of the object down to (but not including) the given version + auto it = bs->dirty_db.lower_bound((obj_ver_id){ + .oid = je->rollback.oid, + .version = UINT64_MAX, + }); + if (it != bs->dirty_db.begin()) + { + uint64_t max_unstable = 0; + auto rm_start = it; + auto rm_end = it; + it--;
+ // Walk down from the newest version. The stop conditions live inside the loop + // (not in the loop condition) so that the "version <= rollback version" branch + // is reachable and max_unstable is really computed, like dequeue_rollback() does + while (1) + { + if (it->first.oid != je->rollback.oid) + break; + else if (it->first.version <= je->rollback.version) + { + if (!IS_STABLE(it->second.state)) + max_unstable = it->first.version; + break; + } + else if (IS_IN_FLIGHT(it->second.state) || IS_STABLE(it->second.state)) + break; + // Remove entry + rm_start = it; + if (it == bs->dirty_db.begin()) + break; + it--; + } + if (rm_start != rm_end) + { + bs->erase_dirty(rm_start, rm_end, UINT64_MAX); + } + auto unstab_it = bs->unstable_writes.find(je->rollback.oid); + if (unstab_it != bs->unstable_writes.end()) + { + if (max_unstable == 0) + bs->unstable_writes.erase(unstab_it); + else + unstab_it->second = max_unstable; + } + } + } else if (je->type == JE_DELETE) { #ifdef BLOCKSTORE_DEBUG diff --git a/blockstore_journal.h b/blockstore_journal.h index af15dd57..5d14985a 100644 --- a/blockstore_journal.h +++ b/blockstore_journal.h @@ -17,6 +17,7 @@ #define JE_BIG_WRITE 0x03 #define JE_STABLE 0x04 #define JE_DELETE 0x05 +#define JE_ROLLBACK 0x06 // crc32c comes first to ease calculation and is equal to crc32() struct __attribute__((__packed__)) journal_entry_start @@ -71,6 +72,17 @@ struct __attribute__((__packed__)) journal_entry_stable uint64_t version; }; +struct __attribute__((__packed__)) journal_entry_rollback +{ + uint32_t crc32; + uint16_t magic; + uint16_t type; + uint32_t size; + uint32_t crc32_prev; + object_id oid; + uint64_t version; +}; + struct __attribute__((__packed__)) journal_entry_del { uint32_t crc32; @@ -98,6 +110,7 @@ struct __attribute__((__packed__)) journal_entry journal_entry_small_write small_write; journal_entry_big_write big_write; journal_entry_stable stable; + journal_entry_rollback rollback; journal_entry_del del; }; }; diff --git a/blockstore_rollback.cpp b/blockstore_rollback.cpp new file mode 100644 index 00000000..aa259196 --- /dev/null +++ b/blockstore_rollback.cpp @@ -0,0 +1,187 @@ +#include
"blockstore_impl.h" + +int blockstore_impl_t::dequeue_rollback(blockstore_op_t *op) +{ + obj_ver_id* v; + int i, todo = op->len; + for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) + { + // Check that there are some versions greater than v->version (which may be zero), + // check that they're unstable, synced, and not currently written to + auto dirty_it = dirty_db.lower_bound((obj_ver_id){ + .oid = v->oid, + .version = UINT64_MAX, + }); + if (dirty_it == dirty_db.begin()) + { + bad_op: + op->retval = -EINVAL; + FINISH_OP(op); + return 1; + } + else + { + dirty_it--; + if (dirty_it->first.oid != v->oid || dirty_it->first.version < v->version) + { + goto bad_op; + } + while (dirty_it->first.oid == v->oid && dirty_it->first.version > v->version) + { + if (!IS_SYNCED(dirty_it->second.state) || + IS_STABLE(dirty_it->second.state)) + { + goto bad_op; + } + if (dirty_it == dirty_db.begin()) + { + break; + } + dirty_it--; + } + } + } + // Check journal space + blockstore_journal_check_t space_check(this); + if (!space_check.check_available(op, todo, sizeof(journal_entry_rollback), 0)) + { + return 0; + } + // There is sufficient space. Get SQEs + struct io_uring_sqe *sqe[space_check.sectors_required]; + for (i = 0; i < space_check.sectors_required; i++) + { + BS_SUBMIT_GET_SQE_DECL(sqe[i]); + } + // Prepare and submit journal entries + auto cb = [this, op](ring_data_t *data) { handle_rollback_event(data, op); }; + int s = 0, cur_sector = -1; + if ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_rollback) && + journal.sector_info[journal.cur_sector].dirty) + { + if (cur_sector == -1) + PRIV(op)->min_used_journal_sector = 1 + journal.cur_sector; + cur_sector = journal.cur_sector; + prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb); + } + for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) + { + // FIXME This is here only for the purpose of tracking unstable_writes. 
Remove if not required + // FIXME ...aaaand this is similar to blockstore_init.cpp - maybe dedup it? + auto dirty_it = dirty_db.lower_bound((obj_ver_id){ + .oid = v->oid, + .version = UINT64_MAX, + }); + uint64_t max_unstable = 0; + while (dirty_it != dirty_db.begin()) + { + dirty_it--; + if (dirty_it->first.oid != v->oid) + break; + else if (dirty_it->first.version <= v->version) + { + if (!IS_STABLE(dirty_it->second.state)) + max_unstable = dirty_it->first.version; + break; + } + } + auto unstab_it = unstable_writes.find(v->oid); + if (unstab_it != unstable_writes.end()) + { + if (max_unstable == 0) + unstable_writes.erase(unstab_it); + else + unstab_it->second = max_unstable; + } + journal_entry_rollback *je = (journal_entry_rollback*) + prefill_single_journal_entry(journal, JE_ROLLBACK, sizeof(journal_entry_rollback)); + journal.sector_info[journal.cur_sector].dirty = false; + je->oid = v->oid; + je->version = v->version; + je->crc32 = je_crc32((journal_entry*)je); + journal.crc32_last = je->crc32; + if (cur_sector != journal.cur_sector) + { + if (cur_sector == -1) + PRIV(op)->min_used_journal_sector = 1 + journal.cur_sector; + cur_sector = journal.cur_sector; + prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb); + } + } + PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector; + PRIV(op)->pending_ops = s; + return 1; +} + +void blockstore_impl_t::handle_rollback_event(ring_data_t *data, blockstore_op_t *op) +{ + live = true; + if (data->res != data->iov.iov_len) + { + throw std::runtime_error( + "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+ + "). in-memory state is corrupted. 
AAAAAAAaaaaaaaaa!!!111" + ); + } + PRIV(op)->pending_ops--; + if (PRIV(op)->pending_ops == 0) + { + // Release used journal sectors + release_journal_sectors(op); + obj_ver_id* v; + int i; + for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) + { + // Erase dirty_db entries of this object with version > v->version. + // rm_end stays fixed as the exclusive end (first entry past the object) and only + // rm_start walks backwards, so [rm_start, rm_end) is always a valid forward range + auto rm_end = dirty_db.lower_bound((obj_ver_id){ + .oid = v->oid, + .version = UINT64_MAX, + }); + auto rm_start = rm_end; + while (rm_start != dirty_db.begin()) + { + auto prev_it = rm_start; + prev_it--; + if (prev_it->first.oid != v->oid) + break; + else if (prev_it->first.version <= v->version) + break; + rm_start = prev_it; + } + if (rm_start != rm_end) + erase_dirty(rm_start, rm_end, UINT64_MAX); + } + journal.trim(); + // Acknowledge op + op->retval = 0; + FINISH_OP(op); + } +} + +void blockstore_impl_t::erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc) +{ + auto dirty_it = dirty_end; + while (dirty_it != dirty_start) + { + dirty_it--; + if (IS_BIG_WRITE(dirty_it->second.state) && dirty_it->second.location != clean_loc) + { +#ifdef BLOCKSTORE_DEBUG + printf("Free block %lu\n", dirty_it->second.location >> block_order); +#endif + data_alloc->set(dirty_it->second.location >> block_order, false); + } +#ifdef BLOCKSTORE_DEBUG + printf("remove usage of journal offset %lu by %lu:%lu v%lu\n", dirty_it->second.journal_sector, + dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version); +#endif + int used = --journal.used_sectors[dirty_it->second.journal_sector]; + if (used == 0) + { + journal.used_sectors.erase(dirty_it->second.journal_sector); + } + } + dirty_db.erase(dirty_start, dirty_end); +} diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp index d98e959c..781fe58b 100644 --- a/blockstore_stable.cpp +++ b/blockstore_stable.cpp @@ -145,7 +145,7 @@ void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t * { // Release used journal sectors
release_journal_sectors(op); - // First step: mark dirty_db entries as stable, acknowledge op completion + // Mark dirty_db entries as stable, acknowledge op completion obj_ver_id* v; int i; for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 1e83f9f2..00ca6d0c 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -3,7 +3,7 @@ void blockstore_impl_t::enqueue_write(blockstore_op_t *op) { // Check or assign version number - bool found = false, deleted = false, is_del = (op->opcode & BS_OP_TYPE_MASK) == BS_OP_DELETE; + bool found = false, deleted = false, is_del = (op->opcode == BS_OP_DELETE); uint64_t version = 1; if (dirty_db.size() > 0) { diff --git a/osd_exec_secondary.cpp b/osd_exec_secondary.cpp index d7f68e32..26cc2518 100644 --- a/osd_exec_secondary.cpp +++ b/osd_exec_secondary.cpp @@ -139,5 +139,9 @@ void osd_t::make_reply(osd_op_t *op) op->reply.hdr.retval = op->bs_op.retval; if (op->op.hdr.opcode == OSD_OP_SECONDARY_LIST) op->reply.sec_list.stable_count = op->bs_op.version; + else if (op->op.hdr.opcode == OSD_OP_SECONDARY_WRITE) + op->reply.sec_rw.version = op->bs_op.version; + else if (op->op.hdr.opcode == OSD_OP_SECONDARY_DELETE) + op->reply.sec_del.version = op->bs_op.version; } } diff --git a/osd_ops.h b/osd_ops.h index 204ed36b..a87a4b2f 100644 --- a/osd_ops.h +++ b/osd_ops.h @@ -64,6 +64,8 @@ struct __attribute__((__packed__)) osd_op_secondary_rw_t struct __attribute__((__packed__)) osd_reply_secondary_rw_t { osd_reply_header_t header; + // for writes: assigned version number + uint64_t version; }; // delete object on the secondary OSD @@ -79,6 +81,7 @@ struct __attribute__((__packed__)) osd_op_secondary_del_t struct __attribute__((__packed__)) osd_reply_secondary_del_t { osd_reply_header_t header; + uint64_t version; }; // sync to the secondary OSD