From 3bfa2f5f39b2976b3f11e499ce1da3607acd8b7f Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Tue, 19 Nov 2019 18:07:40 +0300 Subject: [PATCH] Fix io_uring submission, journal sector selection --- blockstore.cpp | 10 ++++----- blockstore_journal.cpp | 50 ++++++++++++++++++++++++++++++++++++++---- blockstore_journal.h | 26 ++-------------------- blockstore_sync.cpp | 3 ++- test_blockstore.cpp | 47 +++++++++++++++++++++++++-------------- 5 files changed, 86 insertions(+), 50 deletions(-) diff --git a/blockstore.cpp b/blockstore.cpp index 4a9a77ed..6377cd09 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -154,11 +154,6 @@ void blockstore::loop() } if (dequeue_op) { - int ret = ringloop->submit(); - if (ret < 0) - { - throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret)); - } submit_queue.erase(op_ptr); } else @@ -173,6 +168,11 @@ void blockstore::loop() } } flusher->loop(); + int ret = ringloop->submit(); + if (ret < 0) + { + throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret)); + } } } diff --git a/blockstore_journal.cpp b/blockstore_journal.cpp index fec6f2be..937c19ec 100644 --- a/blockstore_journal.cpp +++ b/blockstore_journal.cpp @@ -12,6 +12,7 @@ blockstore_journal_check_t::blockstore_journal_check_t(blockstore *bs) // Check if we can write entries of bytes and data bytes after them to the journal int blockstore_journal_check_t::check_available(blockstore_operation *op, int required, int size, int data_after) { + bool wrapped = false; while (1) { int fits = (512 - next_in_pos) / size; @@ -22,11 +23,21 @@ int blockstore_journal_check_t::check_available(blockstore_operation *op, int re sectors_required++; } if (required <= 0) + { break; - next_pos = (next_pos+512) < bs->journal.len ? next_pos+512 : 512; - next_sector = ((next_sector + 1) % bs->journal.sector_count); + } + next_pos = next_pos+512; + if (next_pos >= bs->journal.len) + { + next_pos = 512; + wrapped = true; + } next_in_pos = 0; if (bs->journal.sector_info[next_sector].usage_count > 0) + { + next_sector = ((next_sector + 1) % bs->journal.sector_count); + } + if (bs->journal.sector_info[next_sector].usage_count > 0) { // No memory buffer available. Wait for it. op->wait_for = WAIT_JOURNAL_BUFFER; @@ -35,9 +46,14 @@ int blockstore_journal_check_t::check_available(blockstore_operation *op, int re } if (data_after > 0) { - next_pos = (bs->journal.len - next_pos < data_after ? 512 : next_pos) + data_after; + next_pos = next_pos + data_after; + if (next_pos > bs->journal.len) + { + wrapped = true; + next_pos = 512 + data_after; + } } - if (next_pos >= bs->journal.used_start) + if (wrapped && next_pos >= bs->journal.used_start) { // No space in the journal. Wait for it. op->wait_for = WAIT_JOURNAL; @@ -47,6 +63,32 @@ int blockstore_journal_check_t::check_available(blockstore_operation *op, int re return 1; } +journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size) +{ + if (512 - journal.in_sector_pos < size) + { + // Move to the next journal sector + if (journal.sector_info[journal.cur_sector].usage_count > 0) + { + // Also select next sector buffer in memory + journal.cur_sector = ((journal.cur_sector + 1) % journal.sector_count); + } + journal.sector_info[journal.cur_sector].offset = journal.next_free; + journal.in_sector_pos = 0; + journal.next_free = (journal.next_free+512) < journal.len ? journal.next_free + 512 : 512; + memset(journal.sector_buf + 512*journal.cur_sector, 0, 512); + } + journal_entry *je = (struct journal_entry*)( + journal.sector_buf + 512*journal.cur_sector + journal.in_sector_pos + ); + journal.in_sector_pos += size; + je->magic = JOURNAL_MAGIC; + je->type = type; + je->size = size; + je->crc32_prev = journal.crc32_last; + return je; +} + void prepare_journal_sector_write(journal_t & journal, io_uring_sqe *sqe, std::function cb) { journal.sector_info[journal.cur_sector].usage_count++; diff --git a/blockstore_journal.h b/blockstore_journal.h index 9a21ada4..81933ec9 100644 --- a/blockstore_journal.h +++ b/blockstore_journal.h @@ -120,7 +120,7 @@ struct journal_t journal_sector_info_t *sector_info; uint64_t sector_count; int cur_sector = 0; - int in_sector_pos = 0; + int in_sector_pos = 512; // no free space because sector is initially inmapped // Used sector map // May use ~ 80 MB per 1 GB of used journal space in the worst case @@ -137,28 +137,6 @@ struct blockstore_journal_check_t int check_available(blockstore_operation *op, int required, int size, int data_after); }; -inline journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size) -{ - if (512 - journal.in_sector_pos < size) - { - // Move to the next journal sector - // Also select next sector buffer in memory - journal.cur_sector = ((journal.cur_sector + 1) % journal.sector_count); - journal.sector_info[journal.cur_sector].offset = journal.next_free; - journal.in_sector_pos = 0; - journal.next_free = (journal.next_free+512) < journal.len ? journal.next_free + 512 : 512; - memset(journal.sector_buf + 512*journal.cur_sector, 0, 512); - } - journal_entry *je = (struct journal_entry*)( - journal.sector_buf + 512*journal.cur_sector + journal.in_sector_pos - ); - journal.in_sector_pos += size; - je->magic = JOURNAL_MAGIC; - je->type = type; - je->size = size; - je->crc32_prev = journal.crc32_last; - return je; -} +journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size); -// FIXME: make inline void prepare_journal_sector_write(journal_t & journal, io_uring_sqe *sqe, std::function cb); diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp index 1ea6d009..4af5471a 100644 --- a/blockstore_sync.cpp +++ b/blockstore_sync.cpp @@ -101,6 +101,7 @@ int blockstore::continue_sync(blockstore_operation *op) data->callback = cb; op->pending_ops = 1 + s; op->sync_state = SYNC_JOURNAL_SYNC_SENT; + ringloop->submit(); } else { @@ -150,12 +151,12 @@ void blockstore::handle_sync_event(ring_data_t *data, blockstore_operation *op) { dirty_db[*it].state = ST_J_SYNCED; } + ack_sync(op); } else { throw std::runtime_error("BUG: unexpected sync op state"); } - ack_sync(op); } } diff --git a/test_blockstore.cpp b/test_blockstore.cpp index 96b3e6f6..3c6dd301 100644 --- a/test_blockstore.cpp +++ b/test_blockstore.cpp @@ -82,30 +82,45 @@ int main(int narg, char *args[]) { printf("tick 1s\n"); }); + blockstore_operation op; - op.flags = OP_WRITE; - op.oid = { .inode = 1, .stripe = 0 }; - op.version = 0; - op.offset = 4096; - op.len = 4096; - op.buf = (uint8_t*)memalign(512, 4096); - memset(op.buf, 0xaa, 4096); - op.callback = [](blockstore_operation *op) - { - printf("completed %d\n", op->retval); - }; + int main_state = 0; ring_consumer_t main_cons; - bool bs_was_done = false; + op.callback = [&](blockstore_operation *op) + { + printf("op completed %d\n", op->retval); + if (main_state == 1) + main_state = 2; + else if (main_state == 3) + main_state = 4; + }; main_cons.loop = [&]() { - bool bs_done = bs->is_started(); - if (bs_done && !bs_was_done) + if (main_state == 0) { - printf("init completed\n"); + if (bs->is_started()) + { + printf("init completed\n"); + op.flags = OP_WRITE; + op.oid = { .inode = 1, .stripe = 0 }; + op.version = 0; + op.offset = 4096; + op.len = 4096; + op.buf = (uint8_t*)memalign(512, 4096); + memset(op.buf, 0xaa, 4096); + bs->enqueue_op(&op); + main_state = 1; + } + } + else if (main_state == 2) + { + printf("syncing\n"); + op.flags = OP_SYNC; bs->enqueue_op(&op); - bs_was_done = true; + main_state = 3; } }; + ringloop->register_consumer(main_cons); while (true) {