From 351366d2281b8a6eaa63679f892d547287d07910 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Tue, 5 Nov 2019 02:12:04 +0300 Subject: [PATCH] Add ring_loop --- Makefile | 2 +- blockstore.cpp | 36 ++++++++++++++++++++++ blockstore.h | 17 +++++++++-- blockstore_init.cpp | 3 ++ blockstore_read.cpp | 41 ++++++++++++------------- ringloop.cpp | 73 +++++++++++++++++++++++++++++++++++++++++++++ ringloop.h | 40 +++++++++++++++++++++++++ test.cpp | 1 + 8 files changed, 190 insertions(+), 23 deletions(-) create mode 100644 ringloop.cpp create mode 100644 ringloop.h diff --git a/Makefile b/Makefile index 51f82cf7..798ea4be 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -all: allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_read.o crc32c.o test +all: allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_read.o crc32c.o ringloop.o test crc32c.o: crc32c.c gcc -c -o $@ $< %.o: %.cpp diff --git a/blockstore.cpp b/blockstore.cpp index 133f841b..97e6845e 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -3,6 +3,7 @@ blockstore::blockstore(spp::sparse_hash_map & config, io_uring *ring) { this->ring = ring; + ring_data = (struct ring_data_t*)malloc(sizeof(ring_data_t) * ring->sq.ring_sz); initialized = 0; block_order = stoull(config["block_size_order"]); block_size = 1 << block_order; @@ -35,6 +36,7 @@ blockstore::blockstore(spp::sparse_hash_map & config, blockstore::~blockstore() { + free(ring_data); if (data_fd >= 0) close(data_fd); if (meta_fd >= 0 && meta_fd != data_fd) @@ -43,6 +45,16 @@ blockstore::~blockstore() close(journal_fd); } +struct io_uring_sqe* blockstore::get_sqe() +{ + struct io_uring_sqe* sqe = io_uring_get_sqe(ring); + if (sqe) + { + io_uring_sqe_set_data(sqe, ring_data + (sqe - ring->sq.sqes)); + } + return sqe; +} + // must be called in the event loop until it returns 0 int blockstore::init_loop() { @@ -74,3 +86,27 @@ int blockstore::init_loop() journal_init_reader = NULL; return 0; } + +// main event loop +int blockstore::main_loop() +{ + while (true) + { + struct io_uring_cqe *cqe; + io_uring_peek_cqe(ring, &cqe); + if (cqe) + { + struct ring_data *d = cqe->user_data; + if (d->source == SRC_BLOCKSTORE) + { + handle_event(); + } + else + { + // someone else + } + io_uring_cqe_seen(ring, cqe); + } + } + return 0; +} diff --git a/blockstore.h b/blockstore.h index c01ce067..18787234 100644 --- a/blockstore.h +++ b/blockstore.h @@ -140,6 +140,13 @@ struct blockstore_operation uint64_t wait_version; }; +/*struct ring_data_t +{ + uint64_t source; + struct iovec iov; // for single-entry read/write operations + void *op; +};*/ + class blockstore; #include "blockstore_init.h" @@ -167,6 +174,9 @@ public: uint32_t journal_crc32_last; struct io_uring *ring; + struct ring_data_t *ring_data; + + struct io_uring_sqe* get_sqe(); blockstore(spp::sparse_hash_map & config, struct io_uring *ring); ~blockstore(); @@ -183,10 +193,13 @@ public: blockstore_init_journal* journal_init_reader; int init_loop(); + // Event loop + int main_loop(); + // Read int read(blockstore_operation *read_op); - int fulfill_read(blockstore_operation & read_op, uint32_t item_start, uint32_t item_end, + int fulfill_read(blockstore_operation *read_op, uint32_t item_start, uint32_t item_end, uint32_t item_state, uint64_t item_version, uint64_t item_location); - int fulfill_read_push(blockstore_operation & read_op, uint32_t item_start, + int fulfill_read_push(blockstore_operation *read_op, uint32_t item_start, uint32_t item_state, uint64_t item_version, uint64_t item_location, uint32_t cur_start, uint32_t cur_end); }; diff --git a/blockstore_init.cpp b/blockstore_init.cpp index 02555175..f00d73ad 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -32,6 +32,7 @@ int blockstore_init_meta::read_loop() done_len = cqe->res; metadata_read += cqe->res; submitted = 0; + io_uring_cqe_seen(bs->ring, cqe); } } if (!submitted) @@ -160,6 +161,7 @@ int blockstore_init_journal::read_loop() crc32_last = je->crc32_replaced; step = 2; } + io_uring_cqe_seen(bs->ring, cqe); } } if (step == 2 || step == 3) @@ -189,6 +191,7 @@ int blockstore_init_journal::read_loop() wrapped = true; } submitted = 0; + io_uring_cqe_seen(bs->ring, cqe); } } if (!submitted) diff --git a/blockstore_read.cpp b/blockstore_read.cpp index 51744679..8bdaf91a 100644 --- a/blockstore_read.cpp +++ b/blockstore_read.cpp @@ -1,6 +1,6 @@ #include "blockstore.h" -int blockstore::fulfill_read_push(blockstore_operation & read_op, uint32_t item_start, +int blockstore::fulfill_read_push(blockstore_operation *read_op, uint32_t item_start, uint32_t item_state, uint64_t item_version, uint64_t item_location, uint32_t cur_start, uint32_t cur_end) { if (cur_end > cur_start) @@ -8,49 +8,50 @@ int blockstore::fulfill_read_push(blockstore_operation & read_op, uint32_t item_ if (item_state == ST_IN_FLIGHT) { // Pause until it's written somewhere - read_op.wait_for = WAIT_IN_FLIGHT; - read_op.wait_version = item_version; + read_op->wait_for = WAIT_IN_FLIGHT; + read_op->wait_version = item_version; return -1; } else if (item_state == ST_DEL_WRITTEN || item_state == ST_DEL_SYNCED || item_state == ST_DEL_MOVED) { // item is unallocated - return zeroes - memset(read_op.buf + cur_start - read_op.offset, 0, cur_end - cur_start); + memset(read_op->buf + cur_start - read_op->offset, 0, cur_end - cur_start); return 0; } - struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + struct io_uring_sqe *sqe = get_sqe(); if (!sqe) { // Pause until there are more requests available - read_op.wait_for = WAIT_SQE; + read_op->wait_for = WAIT_SQE; return -1; } - read_op.read_vec[cur_start] = (struct iovec){ - read_op.buf + cur_start - read_op.offset, + read_op->read_vec[cur_start] = (struct iovec){ + read_op->buf + cur_start - read_op->offset, cur_end - cur_start }; + // Тут 2 вопроса - 1) куда сохранить iovec 2) как потом сопоставить i/o и cqe io_uring_prep_readv( sqe, IS_JOURNAL(item_state) ? journal_fd : data_fd, - // FIXME: &read_op.read_vec is forbidden - &read_op.read_vec[cur_start], 1, + // FIXME: &read_op->read_vec is forbidden + &read_op->read_vec[cur_start], 1, (IS_JOURNAL(item_state) ? journal_offset : data_offset) + item_location + cur_start - item_start ); - io_uring_sqe_set_data(sqe, 0/*read op link*/); + ((ring_data_t*)(sqe->user_data))->op = read_op; } return 0; } -int blockstore::fulfill_read(blockstore_operation & read_op, uint32_t item_start, uint32_t item_end, +int blockstore::fulfill_read(blockstore_operation *read_op, uint32_t item_start, uint32_t item_end, uint32_t item_state, uint64_t item_version, uint64_t item_location) { uint32_t cur_start = item_start; - if (cur_start < read_op.offset + read_op.len && item_end > read_op.offset) + if (cur_start < read_op->offset + read_op->len && item_end > read_op->offset) { - cur_start = cur_start < read_op.offset ? read_op.offset : cur_start; - item_end = item_end > read_op.offset + read_op.len ? read_op.offset + read_op.len : item_end; - auto fulfill_near = read_op.read_vec.lower_bound(cur_start); - if (fulfill_near != read_op.read_vec.begin()) + cur_start = cur_start < read_op->offset ? read_op->offset : cur_start; + item_end = item_end > read_op->offset + read_op->len ? read_op->offset + read_op->len : item_end; + auto fulfill_near = read_op->read_vec.lower_bound(cur_start); + if (fulfill_near != read_op->read_vec.begin()) { fulfill_near--; if (fulfill_near->first + fulfill_near->second.iov_len <= cur_start) @@ -58,7 +59,7 @@ int blockstore::fulfill_read(blockstore_operation & read_op, uint32_t item_start fulfill_near++; } } - while (fulfill_near != read_op.read_vec.end() && fulfill_near->first < item_end) + while (fulfill_near != read_op->read_vec.end() && fulfill_near->first < item_end) { if (fulfill_read_push(read_op, item_start, item_state, item_version, item_location, cur_start, fulfill_near->first) < 0) { @@ -95,7 +96,7 @@ int blockstore::read(blockstore_operation *read_op) { if (read_op->flags == OP_READ_DIRTY || IS_STABLE(dirty[i].state)) { - if (fulfill_read(*read_op, dirty[i].offset, dirty[i].offset + dirty[i].size, dirty[i].state, dirty[i].version, dirty[i].location) < 0) + if (fulfill_read(read_op, dirty[i].offset, dirty[i].offset + dirty[i].size, dirty[i].state, dirty[i].version, dirty[i].location) < 0) { // need to wait for something, undo added requests and requeue op ring->sq.sqe_tail = prev_sqe_pos; @@ -108,7 +109,7 @@ int blockstore::read(blockstore_operation *read_op) } if (clean_it != object_db.end()) { - if (fulfill_read(*read_op, 0, block_size, ST_CURRENT, 0, clean_it->second.location) < 0) + if (fulfill_read(read_op, 0, block_size, ST_CURRENT, 0, clean_it->second.location) < 0) { // need to wait for something, undo added requests and requeue op ring->sq.sqe_tail = prev_sqe_pos; diff --git a/ringloop.cpp b/ringloop.cpp new file mode 100644 index 00000000..5b231aa4 --- /dev/null +++ b/ringloop.cpp @@ -0,0 +1,73 @@ +#include "ringloop.h" + +ring_loop_t::ring_loop_t(int qd) +{ + int ret = io_uring_queue_init(qd, ring, 0); + if (ret < 0) + { + throw new std::runtime_error(std::string("io_uring_queue_init: ") + strerror(-ret)); + } + ring_data = (struct ring_data_t*)malloc(sizeof(ring_data_t) * ring->sq.ring_sz); + if (!ring_data) + { + throw new std::bad_alloc(); + } +} + +ring_loop_t::~ring_loop_t() +{ + free(ring_data); +} + +struct io_uring_sqe* ring_loop_t::get_sqe() +{ + struct io_uring_sqe* sqe = io_uring_get_sqe(ring); + if (sqe) + { + io_uring_sqe_set_data(sqe, ring_data + (sqe - ring->sq.sqes)); + } + return sqe; +} + +int ring_loop_t::register_consumer(ring_consumer_t & consumer) +{ + consumer.number = consumers.size(); + consumers.push_back(consumer); + return consumer.number; +} + +void ring_loop_t::unregister_consumer(int number) +{ + if (number < consumers.size()) + { + consumers[number].handle_event = NULL; + consumers[number].loop = NULL; + } +} + +void ring_loop_t::loop(bool sleep) +{ + struct io_uring_cqe *cqe; + if (sleep) + { + io_uring_wait_cqe(ring, &cqe); + } + while ((io_uring_peek_cqe(ring, &cqe), cqe)) + { + struct ring_data_t *d = (struct ring_data_t*)cqe->user_data; + if (d->source < consumers.size()) + { + d->res = cqe->res; + ring_consumer_t & c = consumers[d->source]; + if (c.handle_event != NULL) + { + c.handle_event(d); + } + } + io_uring_cqe_seen(ring, cqe); + } + for (int i = 0; i < consumers.size(); i++) + { + consumers[i].loop(); + } +} diff --git a/ringloop.h b/ringloop.h new file mode 100644 index 00000000..c066675e --- /dev/null +++ b/ringloop.h @@ -0,0 +1,40 @@ +#pragma once + +#ifndef _LARGEFILE64_SOURCE +#define _LARGEFILE64_SOURCE +#endif + +#include +#include + +#include +#include + +struct ring_data_t +{ + uint64_t source; + struct iovec iov; // for single-entry read/write operations + int res; + void *op; +}; + +struct ring_consumer_t +{ + int number; + std::function handle_event; + std::function loop; +}; + +class ring_loop_t +{ + std::vector consumers; + struct ring_data_t *ring_data; +public: + struct io_uring *ring; + ring_loop_t(int qd); + ~ring_loop_t(); + struct io_uring_sqe* get_sqe(); + int register_consumer(ring_consumer_t & consumer); + void unregister_consumer(int number); + void loop(bool sleep); +}; diff --git a/test.cpp b/test.cpp index 79dd27ac..f006a3ce 100644 --- a/test.cpp +++ b/test.cpp @@ -41,6 +41,7 @@ static void test_write(struct io_uring *ring, int fd) printf("cqe failed: %d %s\n", ret, strerror(-ret)); else printf("result: %d\n", ret); + io_uring_cqe_seen(ring, cqe); free(buf); }