From 45f34fb3b2fa484f314ad6cf2df7debf07dc9ed1 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Thu, 28 Nov 2019 22:36:38 +0300 Subject: [PATCH] Fix linear overwrite, make metadata writes ordered, ignore older entries when recovering journal --- allocator.cpp | 4 -- blockstore_flush.cpp | 14 ++++--- blockstore_init.cpp | 97 +++++++++++++++++++++++++++----------------- blockstore_init.h | 3 +- blockstore_write.cpp | 3 ++ fio_engine.cpp | 15 ++++++- 6 files changed, 87 insertions(+), 49 deletions(-) diff --git a/allocator.cpp b/allocator.cpp index 9eedf3510..d41c55824 100644 --- a/allocator.cpp +++ b/allocator.cpp @@ -71,10 +71,6 @@ void allocator::set(uint64_t addr, bool value) else { mask[last] = mask[last] & ~(1l << bit); - if (mask[last] != 0) - { - break; - } } is_last = false; if (p2 > 1) diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 297095176..976a02a7e 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -340,11 +340,11 @@ resume_0: ); wait_count++; } - // And a metadata write resume_5: - if (meta_it->second.state == 0) + // And a metadata write, but only after data writes complete + if (meta_it->second.state == 0 || wait_count > 0) { - // metadata sector is still being read, wait for it + // metadata sector is still being read or data is still being written, wait for it wait_state = 5; return; } @@ -352,8 +352,6 @@ resume_0: .oid = cur.oid, .version = cur.version, }; - // I consider unordered writes to data & metadata safe here - // BUT it requires that journal entries even older than clean_db are replayed after restart await_sqe(6); data->iov = (struct iovec){ meta_it->second.buf, 512 }; data->callback = simple_callback_w; @@ -432,6 +430,9 @@ resume_0: // Update clean_db and dirty_db, free old data locations if (old_clean_loc != clean_loc) { +#ifdef BLOCKSTORE_DEBUG + printf("Free block %lu\n", old_clean_loc >> bs->block_order); +#endif bs->data_alloc->set(old_clean_loc >> bs->block_order, false); } bs->clean_db[cur.oid] = { 
@@ -443,6 +444,9 @@ resume_0: { if (IS_BIG_WRITE(dirty_it->second.state) && dirty_it->second.location != clean_loc) { +#ifdef BLOCKSTORE_DEBUG + printf("Free block %lu\n", dirty_it->second.location >> bs->block_order); +#endif bs->data_alloc->set(dirty_it->second.location >> bs->block_order, false); } #ifdef BLOCKSTORE_DEBUG diff --git a/blockstore_init.cpp b/blockstore_init.cpp index f526099c2..a43180196 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -85,23 +85,30 @@ void blockstore_init_meta::handle_entries(struct clean_disk_entry* entries, int if (entries[i].oid.inode > 0) { auto clean_it = bs->clean_db.find(entries[i].oid); -#ifdef BLOCKSTORE_DEBUG - printf("Clean entry %u: %lu:%lu v%lu\n", done_cnt+i, entries[i].oid.inode, entries[i].oid.stripe, entries[i].version); -#endif if (clean_it == bs->clean_db.end() || clean_it->second.version < entries[i].version) { if (clean_it != bs->clean_db.end()) { // free the previous block +#ifdef BLOCKSTORE_DEBUG + printf("Free block %lu\n", clean_it->second.location >> bs->block_order); +#endif bs->data_alloc->set(clean_it->second.location >> block_order, false); } entries_loaded++; +#ifdef BLOCKSTORE_DEBUG + printf("Allocate block (clean entry) %lu: %lu:%lu v%lu\n", done_cnt+i, entries[i].oid.inode, entries[i].oid.stripe, entries[i].version); +#endif bs->data_alloc->set(done_cnt+i, true); bs->clean_db[entries[i].oid] = (struct clean_entry){ .version = entries[i].version, .location = (done_cnt+i) << block_order, }; } +#ifdef BLOCKSTORE_DEBUG + else + printf("Old clean entry %lu: %lu:%lu v%lu\n", done_cnt+i, entries[i].oid.inode, entries[i].oid.stripe, entries[i].version); +#endif } } } @@ -286,6 +293,7 @@ resume_1: } } } + // FIXME Trim journal on start so we don't stall when all entries are older printf("Journal entries loaded: %lu, free blocks: %lu / %lu\n", entries_loaded, bs->data_alloc->get_free_count(), bs->block_count); if (!bs->journal.inmemory) { @@ -356,50 +364,63 @@ int 
blockstore_init_journal::handle_journal_part(void *buf, uint64_t len) snprintf(err, 1024, "BUG: calculated journal data offset (%lu) != stored journal data offset (%lu)", location, je->small_write.data_offset); throw std::runtime_error(err); } - obj_ver_id ov = { - .oid = je->small_write.oid, - .version = je->small_write.version, - }; + auto clean_it = bs->clean_db.find(je->small_write.oid); + if (clean_it == bs->clean_db.end() || + clean_it->second.version < je->small_write.version) + { + obj_ver_id ov = { + .oid = je->small_write.oid, + .version = je->small_write.version, + }; #ifdef BLOCKSTORE_DEBUG - printf("je_small_write oid=%lu:%lu ver=%lu offset=%u len=%u\n", ov.oid.inode, ov.oid.stripe, ov.version, je->small_write.offset, je->small_write.len); + printf("je_small_write oid=%lu:%lu ver=%lu offset=%u len=%u\n", ov.oid.inode, ov.oid.stripe, ov.version, je->small_write.offset, je->small_write.len); #endif - bs->dirty_db.emplace(ov, (dirty_entry){ - .state = ST_J_SYNCED, - .flags = 0, - .location = location, - .offset = je->small_write.offset, - .len = je->small_write.len, - .journal_sector = proc_pos, - }); - bs->journal.used_sectors[proc_pos]++; + bs->dirty_db.emplace(ov, (dirty_entry){ + .state = ST_J_SYNCED, + .flags = 0, + .location = location, + .offset = je->small_write.offset, + .len = je->small_write.len, + .journal_sector = proc_pos, + }); + bs->journal.used_sectors[proc_pos]++; #ifdef BLOCKSTORE_DEBUG - printf("journal offset %lu is used by %lu:%lu v%lu\n", proc_pos, ov.oid.inode, ov.oid.stripe, ov.version); + printf("journal offset %lu is used by %lu:%lu v%lu\n", proc_pos, ov.oid.inode, ov.oid.stripe, ov.version); #endif - auto & unstab = bs->unstable_writes[ov.oid]; - unstab = unstab < ov.version ? ov.version : unstab; + auto & unstab = bs->unstable_writes[ov.oid]; + unstab = unstab < ov.version ? 
ov.version : unstab; + } } else if (je->type == JE_BIG_WRITE) { - // oid, version, block - obj_ver_id ov = { - .oid = je->big_write.oid, - .version = je->big_write.version, - }; + auto clean_it = bs->clean_db.find(je->big_write.oid); + if (clean_it == bs->clean_db.end() || + clean_it->second.version < je->big_write.version) + { + // oid, version, block + obj_ver_id ov = { + .oid = je->big_write.oid, + .version = je->big_write.version, + }; #ifdef BLOCKSTORE_DEBUG - printf("je_big_write oid=%lu:%lu ver=%lu\n", ov.oid.inode, ov.oid.stripe, ov.version); + printf("je_big_write oid=%lu:%lu ver=%lu\n", ov.oid.inode, ov.oid.stripe, ov.version); #endif - bs->dirty_db.emplace(ov, (dirty_entry){ - .state = ST_D_META_SYNCED, - .flags = 0, - .location = je->big_write.location, - .offset = 0, - .len = bs->block_size, - .journal_sector = proc_pos, - }); - bs->data_alloc->set(je->big_write.location >> bs->block_order, true); - bs->journal.used_sectors[proc_pos]++; - auto & unstab = bs->unstable_writes[ov.oid]; - unstab = unstab < ov.version ? ov.version : unstab; + bs->dirty_db.emplace(ov, (dirty_entry){ + .state = ST_D_META_SYNCED, + .flags = 0, + .location = je->big_write.location, + .offset = 0, + .len = bs->block_size, + .journal_sector = proc_pos, + }); +#ifdef BLOCKSTORE_DEBUG + printf("Allocate block %lu\n", je->big_write.location >> bs->block_order); +#endif + bs->data_alloc->set(je->big_write.location >> bs->block_order, true); + bs->journal.used_sectors[proc_pos]++; + auto & unstab = bs->unstable_writes[ov.oid]; + unstab = unstab < ov.version ? 
ov.version : unstab; + } } else if (je->type == JE_STABLE) { diff --git a/blockstore_init.h b/blockstore_init.h index e85705fdb..8b17657cc 100644 --- a/blockstore_init.h +++ b/blockstore_init.h @@ -6,7 +6,8 @@ class blockstore_init_meta int wait_state = 0, wait_count = 0; uint8_t *metadata_buffer = NULL; uint64_t metadata_read = 0; - int prev = 0, prev_done = 0, done_len = 0, submitted = 0, done_cnt = 0; + int prev = 0, prev_done = 0, done_len = 0, submitted = 0; + uint64_t done_cnt = 0; uint64_t entries_loaded = 0; struct io_uring_sqe *sqe; struct ring_data_t *data; diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 498607bcd..a6e078465 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -73,6 +73,9 @@ int blockstore::dequeue_write(blockstore_operation *op) BS_SUBMIT_GET_SQE(sqe, data); dirty_it->second.location = loc << block_order; dirty_it->second.state = ST_D_SUBMITTED; +#ifdef BLOCKSTORE_DEBUG + printf("Allocate block %lu\n", loc); +#endif data_alloc->set(loc, true); int vcnt = 0; if (op->version == 1 && op->len != block_size) diff --git a/fio_engine.cpp b/fio_engine.cpp index 8743b9271..416ade0e2 100644 --- a/fio_engine.cpp +++ b/fio_engine.cpp @@ -1,6 +1,19 @@ // FIO engine to test Blockstore +// +// Random write: +// // fio -thread -ioengine=./libfio_blockstore.so -name=test -bs=4k -direct=1 -fsync=16 -iodepth=16 -rw=randwrite \ -// -data_device=./test_data.bin -meta_device=./test_meta.bin -journal_device=./test_journal.bin -size=1G +// -data_device=./test_data.bin -meta_device=./test_meta.bin -journal_device=./test_journal.bin -size=1000M +// +// Linear write: +// +// fio -thread -ioengine=./libfio_blockstore.so -name=test -bs=128k -direct=1 -fsync=32 -iodepth=32 -rw=write \ +// -data_device=./test_data.bin -meta_device=./test_meta.bin -journal_device=./test_journal.bin -size=1000M +// +// Random read (run with -iodepth=32 or -iodepth=1): +// +// fio -thread -ioengine=./libfio_blockstore.so -name=test -bs=4k -direct=1 -iodepth=32 
-rw=randread \ +// -data_device=./test_data.bin -meta_device=./test_meta.bin -journal_device=./test_journal.bin -size=1000M #include "blockstore.h" extern "C" {