From 82cf0a170eabece1b50cdb7e067699c43e265183 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Tue, 5 Nov 2019 02:43:21 +0300 Subject: [PATCH] Port everything to ring_loop --- blockstore.cpp | 121 +++++++++++++-------------- blockstore.h | 21 ++--- blockstore_init.cpp | 193 +++++++++++++++++++++----------------------- blockstore_init.h | 10 +-- blockstore_read.cpp | 8 +- ringloop.h | 8 +- 6 files changed, 173 insertions(+), 188 deletions(-) diff --git a/blockstore.cpp b/blockstore.cpp index 97e6845e..5c0d6590 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -1,9 +1,11 @@ #include "blockstore.h" -blockstore::blockstore(spp::sparse_hash_map & config, io_uring *ring) +blockstore::blockstore(spp::sparse_hash_map & config, ring_loop_t *ringloop) { - this->ring = ring; - ring_data = (struct ring_data_t*)malloc(sizeof(ring_data_t) * ring->sq.ring_sz); + this->ringloop = ringloop; + ring_consumer.handle_event = [this](ring_data_t *d) { handle_event(d); }; + ring_consumer.loop = [this]() { loop(); }; + ringloop->register_consumer(ring_consumer); initialized = 0; block_order = stoull(config["block_size_order"]); block_size = 1 << block_order; @@ -36,7 +38,7 @@ blockstore::blockstore(spp::sparse_hash_map & config, blockstore::~blockstore() { - free(ring_data); + ringloop->unregister_consumer(ring_consumer.number); if (data_fd >= 0) close(data_fd); if (meta_fd >= 0 && meta_fd != data_fd) @@ -45,68 +47,61 @@ blockstore::~blockstore() close(journal_fd); } -struct io_uring_sqe* blockstore::get_sqe() +// main event loop - handle requests +void blockstore::handle_event(ring_data_t *data) { - struct io_uring_sqe* sqe = io_uring_get_sqe(ring); - if (sqe) + if (initialized != 0) { - io_uring_sqe_set_data(sqe, ring_data + (sqe - ring->sq.sqes)); - } - return sqe; -} - -// must be called in the event loop until it returns 0 -int blockstore::init_loop() -{ - // read metadata, then journal - if (initialized) - { - return 0; - } - if (!metadata_init_reader) - { - metadata_init_reader = new blockstore_init_meta(this); - } - if (metadata_init_reader->read_loop()) - { - return 1; - } - if (!journal_init_reader) - { - journal_init_reader = new blockstore_init_journal(this); - } - if (journal_init_reader->read_loop()) - { - return 1; - } - initialized = true; - delete metadata_init_reader; - delete journal_init_reader; - metadata_init_reader = NULL; - journal_init_reader = NULL; - return 0; -} - -// main event loop -int blockstore::main_loop() -{ - while (true) - { - struct io_uring_cqe *cqe; - io_uring_peek_cqe(ring, &cqe); - if (cqe) + if (metadata_init_reader) { - struct ring_data *d = cqe->user_data; - if (d->source == SRC_BLOCKSTORE) - { - handle_event(); - } - else - { - // someone else - } - io_uring_cqe_seen(ring, cqe); + metadata_init_reader->handle_event(data); + } + else if (journal_init_reader) + { + journal_init_reader->handle_event(data); } } - return 0; + else + { + + } +} + +// main event loop - produce requests +void blockstore::loop() +{ + if (initialized != 10) + { + // read metadata, then journal + if (initialized == 0) + { + metadata_init_reader = new blockstore_init_meta(this); + initialized = 1; + } + else if (initialized == 1) + { + int res = metadata_init_reader->loop(); + if (!res) + { + delete metadata_init_reader; + metadata_init_reader = NULL; + journal_init_reader = new blockstore_init_journal(this); + initialized = 2; + } + } + else if (initialized == 2) + { + int res = journal_init_reader->loop(); + if (!res) + { + delete journal_init_reader; + journal_init_reader = NULL; + initialized = 10; + } + } + } + else + { + + } } diff --git a/blockstore.h b/blockstore.h index 18787234..551fbe04 100644 --- a/blockstore.h +++ b/blockstore.h @@ -17,9 +17,11 @@ #include #include -#include "allocator.h" #include "sparsepp/sparsepp/spp.h" +#include "allocator.h" +#include "ringloop.h" + // States are not stored on disk. Instead, they're deduced from the journal #define ST_IN_FLIGHT 1 @@ -140,19 +142,13 @@ struct blockstore_operation uint64_t wait_version; }; -/*struct ring_data_t -{ - uint64_t source; - struct iovec iov; // for single-entry read/write operations - void *op; -};*/ - class blockstore; #include "blockstore_init.h" class blockstore { + struct ring_consumer_t ring_consumer; public: spp::sparse_hash_map object_db; spp::sparse_hash_map dirty_queue; @@ -173,12 +169,11 @@ public: uint64_t journal_start, journal_end; uint32_t journal_crc32_last; - struct io_uring *ring; - struct ring_data_t *ring_data; + ring_loop_t *ringloop; struct io_uring_sqe* get_sqe(); - blockstore(spp::sparse_hash_map & config, struct io_uring *ring); + blockstore(spp::sparse_hash_map & config, ring_loop_t *ringloop); ~blockstore(); void calc_lengths(spp::sparse_hash_map & config); @@ -191,10 +186,10 @@ public: int metadata_buf_size; blockstore_init_meta* metadata_init_reader; blockstore_init_journal* journal_init_reader; - int init_loop(); // Event loop - int main_loop(); + void handle_event(ring_data_t* data); + void loop(); // Read int read(blockstore_operation *read_op); diff --git a/blockstore_init.cpp b/blockstore_init.cpp index f00d73ad..5ecca4c0 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -5,7 +5,22 @@ blockstore_init_meta::blockstore_init_meta(blockstore *bs) this->bs = bs; } -int blockstore_init_meta::read_loop() +void blockstore_init_meta::handle_event(ring_data_t *data) +{ + if (data->res < 0) + { + throw new std::runtime_error( + std::string("read metadata failed at offset ") + std::to_string(metadata_read) + + std::string(": ") + strerror(-data->res) + ); + } + prev_done = data->res > 0 ? submitted : 0; + done_len = data->res; + metadata_read += data->res; + submitted = 0; +} + +int blockstore_init_meta::loop() { if (metadata_read >= bs->meta_len) { @@ -15,39 +30,20 @@ int blockstore_init_meta::read_loop() { metadata_buffer = (uint8_t*)memalign(512, 2*bs->metadata_buf_size); } - if (submitted) - { - struct io_uring_cqe *cqe; - io_uring_peek_cqe(bs->ring, &cqe); - if (cqe) - { - if (cqe->res < 0) - { - throw new std::runtime_error( - std::string("read metadata failed at offset ") + std::to_string(metadata_read) + - std::string(": ") + strerror(-cqe->res) - ); - } - prev_done = cqe->res > 0 ? submitted : 0; - done_len = cqe->res; - metadata_read += cqe->res; - submitted = 0; - io_uring_cqe_seen(bs->ring, cqe); - } - } if (!submitted) { - struct io_uring_sqe *sqe = io_uring_get_sqe(bs->ring); + struct io_uring_sqe *sqe = bs->ringloop->get_sqe(); if (!sqe) { throw new std::runtime_error("io_uring is full while trying to read metadata"); } - submit_iov = { + struct ring_data_t *data = ((ring_data_t*)sqe->user_data); + data->iov = { metadata_buffer + (prev == 1 ? bs->metadata_buf_size : 0), bs->meta_len - metadata_read > bs->metadata_buf_size ? bs->metadata_buf_size : bs->meta_len - metadata_read, }; - io_uring_prep_readv(sqe, bs->meta_fd, &submit_iov, 1, bs->meta_offset + metadata_read); - io_uring_submit(bs->ring); + io_uring_prep_readv(sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + metadata_read); + bs->ringloop->submit(); submitted = (prev == 1 ? 2 : 1); prev = submitted; } @@ -101,7 +97,67 @@ bool iszero(uint64_t *buf, int len) return true; } -int blockstore_init_journal::read_loop() +void blockstore_init_journal::handle_event(ring_data_t *data) +{ + if (step == 1) + { + // Step 1: Read first block of the journal + if (data->res < 0) + { + throw new std::runtime_error( + std::string("read journal failed at offset ") + std::to_string(0) + + std::string(": ") + strerror(-data->res) + ); + } + if (iszero((uint64_t*)journal_buffer, 3)) + { + // Journal is empty + bs->journal_start = 512; + bs->journal_end = 512; + step = 99; + } + else + { + // First block always contains a single JE_START entry + journal_entry_start *je = (journal_entry_start*)journal_buffer; + if (je->magic != JOURNAL_MAGIC || + je->type != JE_START || + je->size != sizeof(journal_entry_start) || + je_crc32((journal_entry*)je) != je->crc32) + { + // Entry is corrupt + throw new std::runtime_error("first entry of the journal is corrupt"); + } + journal_pos = bs->journal_start = je->journal_start; + crc32_last = je->crc32_replaced; + step = 2; + } + } + else if (step == 2 || step == 3) + { + // Step 3: Read journal + if (data->res < 0) + { + throw new std::runtime_error( + std::string("read journal failed at offset ") + std::to_string(journal_pos) + + std::string(": ") + strerror(-data->res) + ); + } + done_pos = journal_pos; + done_buf = submitted; + done_len = data->res; + journal_pos += data->res; + if (journal_pos >= bs->journal_len) + { + // Continue from the beginning + journal_pos = 512; + wrapped = true; + } + submitted = 0; + } +} + +int blockstore_init_journal::loop() { if (step == 100) { @@ -114,86 +170,20 @@ int blockstore_init_journal::read_loop() if (step == 0) { // Step 1: Read first block of the journal - struct io_uring_sqe *sqe = io_uring_get_sqe(bs->ring); + struct io_uring_sqe *sqe = bs->ringloop->get_sqe(); if (!sqe) { throw new std::runtime_error("io_uring is full while trying to read journal"); } - submit_iov = { journal_buffer, 512 }; - io_uring_prep_readv(sqe, bs->journal_fd, &submit_iov, 1, bs->journal_offset); - io_uring_submit(bs->ring); + struct ring_data_t *data = ((ring_data_t*)sqe->user_data); + data->iov = { journal_buffer, 512 }; + io_uring_prep_readv(sqe, bs->journal_fd, &data->iov, 1, bs->journal_offset); + bs->ringloop->submit(); step = 1; } - if (step == 1) - { - // Step 2: Get the completion event and check the beginning for entry - struct io_uring_cqe *cqe; - io_uring_peek_cqe(bs->ring, &cqe); - if (cqe) - { - if (cqe->res < 0) - { - throw new std::runtime_error( - std::string("read journal failed at offset ") + std::to_string(0) + - std::string(": ") + strerror(-cqe->res) - ); - } - if (iszero((uint64_t*)journal_buffer, 3)) - { - // Journal is empty - bs->journal_start = 512; - bs->journal_end = 512; - step = 99; - } - else - { - // First block always contains a single JE_START entry - journal_entry_start *je = (journal_entry_start*)journal_buffer; - if (je->magic != JOURNAL_MAGIC || - je->type != JE_START || - je->size != sizeof(journal_entry_start) || - je_crc32((journal_entry*)je) != je->crc32) - { - // Entry is corrupt - throw new std::runtime_error("first entry of the journal is corrupt"); - } - journal_pos = bs->journal_start = je->journal_start; - crc32_last = je->crc32_replaced; - step = 2; - } - io_uring_cqe_seen(bs->ring, cqe); - } - } if (step == 2 || step == 3) { // Step 3: Read journal - if (submitted) - { - struct io_uring_cqe *cqe; - io_uring_peek_cqe(bs->ring, &cqe); - if (cqe) - { - if (cqe->res < 0) - { - throw new std::runtime_error( - std::string("read journal failed at offset ") + std::to_string(journal_pos) + - std::string(": ") + strerror(-cqe->res) - ); - } - done_pos = journal_pos; - done_buf = submitted; - done_len = cqe->res; - journal_pos += cqe->res; - if (journal_pos >= bs->journal_len) - { - // Continue from the beginning - journal_pos = 512; - wrapped = true; - } - submitted = 0; - io_uring_cqe_seen(bs->ring, cqe); - } - } if (!submitted) { if (step != 3) @@ -204,22 +194,23 @@ int blockstore_init_journal::read_loop() } else { - struct io_uring_sqe *sqe = io_uring_get_sqe(bs->ring); + struct io_uring_sqe *sqe = bs->ringloop->get_sqe(); if (!sqe) { throw new std::runtime_error("io_uring is full while trying to read journal"); } + struct ring_data_t *data = ((ring_data_t*)sqe->user_data); uint64_t end = bs->journal_len; if (journal_pos < bs->journal_start) { end = bs->journal_start; } - submit_iov = { + data->iov = { journal_buffer + (done_buf == 1 ? JOURNAL_BUFFER_SIZE : 0), end - journal_pos < JOURNAL_BUFFER_SIZE ? end - journal_pos : JOURNAL_BUFFER_SIZE, }; - io_uring_prep_readv(sqe, bs->journal_fd, &submit_iov, 1, bs->journal_offset + journal_pos); - io_uring_submit(bs->ring); + io_uring_prep_readv(sqe, bs->journal_fd, &data->iov, 1, bs->journal_offset + journal_pos); + bs->ringloop->submit(); submitted = done_buf == 1 ? 2 : 1; } } diff --git a/blockstore_init.h b/blockstore_init.h index e240ee93..477f5a67 100644 --- a/blockstore_init.h +++ b/blockstore_init.h @@ -5,12 +5,12 @@ class blockstore_init_meta blockstore *bs; uint8_t *metadata_buffer = NULL; uint64_t metadata_read = 0; - struct iovec submit_iov; int prev = 0, prev_done = 0, done_len = 0, submitted = 0, done_cnt = 0; void handle_entries(struct clean_disk_entry* entries, int count); public: - blockstore_init_meta(blockstore* bs); - int read_loop(); + blockstore_init_meta(blockstore *bs); + void handle_event(ring_data_t *data); + int loop(); }; class blockstore_init_journal @@ -19,7 +19,6 @@ class blockstore_init_journal uint8_t *journal_buffer = NULL; int step = 0; uint32_t crc32_last = 0; - struct iovec submit_iov; uint64_t done_pos = 0, journal_pos = 0; uint64_t cur_skip = 0; bool wrapped = false; @@ -27,5 +26,6 @@ class blockstore_init_journal int handle_journal_part(void *buf, uint64_t len); public: blockstore_init_journal(blockstore* bs); - int read_loop(); + void handle_event(ring_data_t *data); + int loop(); }; diff --git a/blockstore_read.cpp b/blockstore_read.cpp index 8bdaf91a..93bb0aad 100644 --- a/blockstore_read.cpp +++ b/blockstore_read.cpp @@ -87,7 +87,7 @@ int blockstore::read(blockstore_operation *read_op) read_op->callback(read_op); return 0; } - unsigned prev_sqe_pos = ring->sq.sqe_tail; + unsigned prev_sqe_pos = ringloop->ring->sq.sqe_tail; uint64_t fulfilled = 0; if (dirty_it != object_db.end()) { @@ -99,7 +99,7 @@ int blockstore::read(blockstore_operation *read_op) if (fulfill_read(read_op, dirty[i].offset, dirty[i].offset + dirty[i].size, dirty[i].state, dirty[i].version, dirty[i].location) < 0) { // need to wait for something, undo added requests and requeue op - ring->sq.sqe_tail = prev_sqe_pos; + ringloop->ring->sq.sqe_tail = prev_sqe_pos; read_op->read_vec.clear(); submit_queue.push_front(read_op); return 0; @@ -112,7 +112,7 @@ int blockstore::read(blockstore_operation *read_op) if (fulfill_read(read_op, 0, block_size, ST_CURRENT, 0, clean_it->second.location) < 0) { // need to wait for something, undo added requests and requeue op - ring->sq.sqe_tail = prev_sqe_pos; + ringloop->ring->sq.sqe_tail = prev_sqe_pos; read_op->read_vec.clear(); // FIXME: bad implementation submit_queue.push_front(read_op); @@ -127,7 +127,7 @@ int blockstore::read(blockstore_operation *read_op) return 0; } // FIXME reap events! - int ret = io_uring_submit(ring); + int ret = ringloop->submit(); if (ret < 0) { throw new std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret)); diff --git a/ringloop.h b/ringloop.h index c066675e..69f7d7c2 100644 --- a/ringloop.h +++ b/ringloop.h @@ -21,8 +21,8 @@ struct ring_data_t struct ring_consumer_t { int number; - std::function handle_event; - std::function loop; + std::function handle_event; + std::function loop; }; class ring_loop_t @@ -37,4 +37,8 @@ public: int register_consumer(ring_consumer_t & consumer); void unregister_consumer(int number); void loop(bool sleep); + inline int submit() + { + return io_uring_submit(ring); + } };