From 3f5ad167482ac1c6ad421ff244c75c0b040888c4 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Tue, 5 Nov 2019 14:10:23 +0300 Subject: [PATCH] Fix ringloop, implement first version of handle_event for reads --- Makefile | 2 ++ blockstore.cpp | 19 ++++++++++++++++++- blockstore.h | 9 +++++++-- blockstore_init.cpp | 6 +++--- blockstore_read.cpp | 19 +++++++++++-------- ringloop.cpp | 12 ++++++++++++ ringloop.h | 1 + 7 files changed, 54 insertions(+), 14 deletions(-) diff --git a/Makefile b/Makefile index 798ea4be..f22f92a4 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,6 @@ all: allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_read.o crc32c.o ringloop.o test +clean: + rm -f *.o crc32c.o: crc32c.c gcc -c -o $@ $< %.o: %.cpp diff --git a/blockstore.cpp b/blockstore.cpp index 5c0d6590..89969a0a 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -63,7 +63,24 @@ void blockstore::handle_event(ring_data_t *data) } else { - + struct blockstore_operation* op = (struct blockstore_operation*)data->op; + if ((op->flags & OP_TYPE_MASK) == OP_READ_DIRTY || + (op->flags & OP_TYPE_MASK) == OP_READ) + { + op->pending_ops--; + if (data->res < 0) + { + // read error + op->retval = data->res; + } + if (op->pending_ops == 0) + { + if (op->retval == 0) + op->retval = op->len; + op->callback(op); + in_process_ops.erase(op); + } + } } } diff --git a/blockstore.h b/blockstore.h index 551fbe04..c506f4e8 100644 --- a/blockstore.h +++ b/blockstore.h @@ -121,6 +121,7 @@ public: #define OP_SYNC 4 #define OP_STABLE 5 #define OP_DELETE 6 +#define OP_TYPE_MASK 0x7 #define WAIT_SQE 1 #define WAIT_IN_FLIGHT 2 @@ -135,9 +136,10 @@ struct blockstore_operation uint32_t offset; uint32_t len; uint8_t *buf; + int retval; std::map read_vec; - int completed; + int pending_ops; int wait_for; uint64_t wait_version; }; @@ -171,7 +173,10 @@ public: ring_loop_t *ringloop; - struct io_uring_sqe* get_sqe(); + inline struct io_uring_sqe* get_sqe() + { + return ringloop->get_sqe(ring_consumer.number); + } blockstore(spp::sparse_hash_map & config, ring_loop_t *ringloop); ~blockstore(); diff --git a/blockstore_init.cpp b/blockstore_init.cpp index 5ecca4c0..ab4141c3 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -32,7 +32,7 @@ int blockstore_init_meta::loop() } if (!submitted) { - struct io_uring_sqe *sqe = bs->ringloop->get_sqe(); + struct io_uring_sqe *sqe = bs->get_sqe(); if (!sqe) { throw new std::runtime_error("io_uring is full while trying to read metadata"); @@ -170,7 +170,7 @@ int blockstore_init_journal::loop() if (step == 0) { // Step 1: Read first block of the journal - struct io_uring_sqe *sqe = bs->ringloop->get_sqe(); + struct io_uring_sqe *sqe = bs->get_sqe(); if (!sqe) { throw new std::runtime_error("io_uring is full while trying to read journal"); @@ -194,7 +194,7 @@ int blockstore_init_journal::loop() } else { - struct io_uring_sqe *sqe = bs->ringloop->get_sqe(); + struct io_uring_sqe *sqe = bs->get_sqe(); if (!sqe) { throw new std::runtime_error("io_uring is full while trying to read journal"); diff --git a/blockstore_read.cpp b/blockstore_read.cpp index 93bb0aad..710858f8 100644 --- a/blockstore_read.cpp +++ b/blockstore_read.cpp @@ -25,19 +25,19 @@ int blockstore::fulfill_read_push(blockstore_operation *read_op, uint32_t item_s read_op->wait_for = WAIT_SQE; return -1; } - read_op->read_vec[cur_start] = (struct iovec){ + struct ring_data_t *data = ((ring_data_t*)sqe->user_data); + data->iov = (struct iovec){ read_op->buf + cur_start - read_op->offset, cur_end - cur_start }; - // Тут 2 вопроса - 1) куда сохранить iovec 2) как потом сопоставить i/o и cqe + read_op->read_vec[cur_start] = data->iov; io_uring_prep_readv( sqe, IS_JOURNAL(item_state) ? journal_fd : data_fd, - // FIXME: &read_op->read_vec is forbidden - &read_op->read_vec[cur_start], 1, + &data->iov, 1, (IS_JOURNAL(item_state) ? journal_offset : data_offset) + item_location + cur_start - item_start ); - ((ring_data_t*)(sqe->user_data))->op = read_op; + data->op = read_op; } return 0; } @@ -84,6 +84,7 @@ int blockstore::read(blockstore_operation *read_op) { // region is not allocated - return zeroes memset(read_op->buf, 0, read_op->len); + read_op->retval = read_op->len; read_op->callback(read_op); return 0; } @@ -94,7 +95,7 @@ int blockstore::read(blockstore_operation *read_op) dirty_list dirty = dirty_it->second; for (int i = dirty.size()-1; i >= 0; i--) { - if (read_op->flags == OP_READ_DIRTY || IS_STABLE(dirty[i].state)) + if ((read_op->flags & OP_TYPE_MASK) == OP_READ_DIRTY || IS_STABLE(dirty[i].state)) { if (fulfill_read(read_op, dirty[i].offset, dirty[i].offset + dirty[i].size, dirty[i].state, dirty[i].version, dirty[i].location) < 0) { @@ -114,7 +115,7 @@ int blockstore::read(blockstore_operation *read_op) // need to wait for something, undo added requests and requeue op ringloop->ring->sq.sqe_tail = prev_sqe_pos; read_op->read_vec.clear(); - // FIXME: bad implementation + // FIXME: manage enqueue/dequeue/requeue submit_queue.push_front(read_op); return 0; } @@ -123,10 +124,12 @@ int blockstore::read(blockstore_operation *read_op) { // region is not allocated - return zeroes memset(read_op->buf, 0, read_op->len); + read_op->retval = read_op->len; read_op->callback(read_op); return 0; } - // FIXME reap events! + read_op->retval = 0; + read_op->pending_ops = read_op->read_vec.size(); int ret = ringloop->submit(); if (ret < 0) { diff --git a/ringloop.cpp b/ringloop.cpp index 5b231aa4..521fa6c0 100644 --- a/ringloop.cpp +++ b/ringloop.cpp @@ -29,6 +29,18 @@ struct io_uring_sqe* ring_loop_t::get_sqe() return sqe; } +struct io_uring_sqe* ring_loop_t::get_sqe(int consumer) +{ + struct io_uring_sqe* sqe = io_uring_get_sqe(ring); + if (sqe) + { + struct ring_data_t *data = ring_data + (sqe - ring->sq.sqes); + io_uring_sqe_set_data(sqe, data); + data->source = consumer; + } + return sqe; +} + int ring_loop_t::register_consumer(ring_consumer_t & consumer) { consumer.number = consumers.size(); diff --git a/ringloop.h b/ringloop.h index 69f7d7c2..42ea265b 100644 --- a/ringloop.h +++ b/ringloop.h @@ -34,6 +34,7 @@ public: ring_loop_t(int qd); ~ring_loop_t(); struct io_uring_sqe* get_sqe(); + struct io_uring_sqe* get_sqe(int consumer); int register_consumer(ring_consumer_t & consumer); void unregister_consumer(int number); void loop(bool sleep);