From 19abe6227e088954e9d28ae01caf61ffd3bac03b Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Tue, 17 Dec 2019 01:44:08 +0300 Subject: [PATCH] Fix submission ring overflow & ring_data_t reuse conflicts --- blockstore_flush.cpp | 6 +++--- blockstore_impl.cpp | 8 ++++---- blockstore_impl.h | 1 + fio_engine.cpp | 2 +- osd.cpp | 19 ++++++++++++------- ringloop.cpp | 29 ++++++++++++++++++++++++++--- ringloop.h | 37 ++++++++++++++++++++++++++----------- 7 files changed, 73 insertions(+), 29 deletions(-) diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 8d1a16479..505864e3b 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -37,7 +37,7 @@ journal_flusher_co::journal_flusher_co() { throw std::runtime_error( "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+ - "). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111" + "). state "+std::to_string(wait_state)+". in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111" ); } wait_count--; @@ -443,7 +443,6 @@ bool journal_flusher_co::loop() { // Update journal "superblock" await_sqe(12); - data->callback = simple_callback_w; *((journal_entry_start*)flusher->journal_superblock) = { .crc32 = 0, .magic = JOURNAL_MAGIC, @@ -454,6 +453,7 @@ bool journal_flusher_co::loop() }; ((journal_entry_start*)flusher->journal_superblock)->crc32 = je_crc32((journal_entry*)flusher->journal_superblock); data->iov = (struct iovec){ flusher->journal_superblock, 512 }; + data->callback = simple_callback_w; my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset); wait_count++; resume_13: @@ -611,8 +611,8 @@ bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base) { // Sync batch is ready. Do it. await_sqe(0); - data->callback = simple_callback_w; data->iov = { 0 }; + data->callback = simple_callback_w; my_uring_prep_fsync(sqe, fsync_meta ? bs->meta_fd : bs->data_fd, IORING_FSYNC_DATASYNC); cur_sync->state = 1; wait_count++; diff --git a/blockstore_impl.cpp b/blockstore_impl.cpp index abdb851e3..c9d412d72 100644 --- a/blockstore_impl.cpp +++ b/blockstore_impl.cpp @@ -131,8 +131,8 @@ void blockstore_impl_t::loop() continue; } } - unsigned ring_space = io_uring_sq_space_left(&ringloop->ring); - unsigned prev_sqe_pos = ringloop->ring.sq.sqe_tail; + unsigned ring_space = ringloop->space_left(); + unsigned prev_sqe_pos = ringloop->save(); int dequeue_op = 0; if ((op->flags & OP_TYPE_MASK) == OP_READ) { @@ -172,7 +172,7 @@ void blockstore_impl_t::loop() } else { - ringloop->ring.sq.sqe_tail = prev_sqe_pos; + ringloop->restore(prev_sqe_pos); if (PRIV(op)->wait_for == WAIT_SQE) { PRIV(op)->wait_detail = 1 + ring_space; @@ -225,7 +225,7 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op) { if (PRIV(op)->wait_for == WAIT_SQE) { - if (io_uring_sq_space_left(&ringloop->ring) < PRIV(op)->wait_detail) + if (ringloop->space_left() < PRIV(op)->wait_detail) { // stop submission if there's still no free space return; diff --git a/blockstore_impl.h b/blockstore_impl.h index 68a4f1cd6..7da131a48 100644 --- a/blockstore_impl.h +++ b/blockstore_impl.h @@ -1,6 +1,7 @@ #pragma once #include "blockstore.h" +#include "timerfd_interval.h" #include #include diff --git a/fio_engine.cpp b/fio_engine.cpp index ab43ecba5..2ce796200 100644 --- a/fio_engine.cpp +++ b/fio_engine.cpp @@ -133,7 +133,7 @@ static void bs_cleanup(struct thread_data *td) bsd->ringloop->loop(); if (bsd->bs->is_safe_to_stop()) goto safe; - } while (bsd->ringloop->loop_again); + } while (bsd->ringloop->get_loop_again()); bsd->ringloop->wait(); } safe: diff --git a/osd.cpp b/osd.cpp index af7a87a55..0d54b2030 100644 --- a/osd.cpp +++ b/osd.cpp @@ -194,15 +194,20 @@ void osd_t::stop_client(int peer_fd) throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); } auto it = clients.find(peer_fd); - if (it->second.read_ready) + for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++) { - for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++) + if (*rit == peer_fd) { - if (*rit == peer_fd) - { - read_ready_clients.erase(rit); - break; - } + read_ready_clients.erase(rit); + break; + } + } + for (auto wit = write_ready_clients.begin(); wit != write_ready_clients.end(); wit++) + { + if (*wit == peer_fd) + { + write_ready_clients.erase(wit); + break; } } clients.erase(it); diff --git a/ringloop.cpp b/ringloop.cpp index 11358efca..7c1598be2 100644 --- a/ringloop.cpp +++ b/ringloop.cpp @@ -7,16 +7,23 @@ ring_loop_t::ring_loop_t(int qd) { throw std::runtime_error(std::string("io_uring_queue_init: ") + strerror(-ret)); } - ring_data = (struct ring_data_t*)malloc(sizeof(ring_data_t) * ring.sq.ring_sz); - if (!ring_data) + free_ring_data_ptr = *ring.cq.kring_entries; + ring_datas = (struct ring_data_t*)malloc(sizeof(ring_data_t) * free_ring_data_ptr); + free_ring_data = (int*)malloc(sizeof(int) * free_ring_data_ptr); + if (!ring_datas || !free_ring_data) { throw std::bad_alloc(); } + for (int i = 0; i < free_ring_data_ptr; i++) + { + free_ring_data[i] = i; + } } ring_loop_t::~ring_loop_t() { - free(ring_data); + free(free_ring_data); + free(ring_datas); io_uring_queue_exit(&ring); } @@ -52,6 +59,7 @@ void ring_loop_t::loop() d->res = cqe->res; d->callback(d); } + free_ring_data[free_ring_data_ptr++] = d - ring_datas; io_uring_cqe_seen(&ring, cqe); } do @@ -63,3 +71,18 @@ void ring_loop_t::loop() } } while (loop_again); } + +unsigned ring_loop_t::save() +{ + return ring.sq.sqe_tail; +} + +void ring_loop_t::restore(unsigned sqe_tail) +{ + assert(ring.sq.sqe_tail >= sqe_tail); + for (unsigned i = sqe_tail; i < ring.sq.sqe_tail; i++) + { + free_ring_data[free_ring_data_ptr++] = ((ring_data_t*)ring.sq.sqes[i & *ring.sq.kring_mask].user_data) - ring_datas; + } + ring.sq.sqe_tail = sqe_tail; +} diff --git a/ringloop.h b/ringloop.h index df232e285..4014cfb42 100644 --- a/ringloop.h +++ b/ringloop.h @@ -4,8 +4,9 @@ #define _LARGEFILE64_SOURCE #endif -#include #include +#include +#include #include #include @@ -119,26 +120,26 @@ struct ring_consumer_t class ring_loop_t { std::vector consumers; - struct ring_data_t *ring_data; -public: + struct ring_data_t *ring_datas; + int *free_ring_data; + unsigned free_ring_data_ptr; bool loop_again; struct io_uring ring; +public: ring_loop_t(int qd); ~ring_loop_t(); + int register_consumer(ring_consumer_t & consumer); + void unregister_consumer(ring_consumer_t & consumer); + inline struct io_uring_sqe* get_sqe() { - // FIXME: Limit inflight ops count to not overflow the completion ring + if (free_ring_data_ptr == 0) + return NULL; struct io_uring_sqe* sqe = io_uring_get_sqe(&ring); if (sqe) - { - io_uring_sqe_set_data(sqe, ring_data + (sqe - ring.sq.sqes)); - } + io_uring_sqe_set_data(sqe, ring_datas + free_ring_data[--free_ring_data_ptr]); return sqe; } - int register_consumer(ring_consumer_t & consumer); - void wakeup(); - void unregister_consumer(ring_consumer_t & consumer); - void loop(); inline int submit() { return io_uring_submit(&ring); @@ -148,4 +149,18 @@ public: struct io_uring_cqe *cqe; return io_uring_wait_cqe(&ring, &cqe); } + inline unsigned space_left() + { + return free_ring_data_ptr; + } + inline bool get_loop_again() + { + return loop_again; + } + + void loop(); + void wakeup(); + + unsigned save(); + void restore(unsigned sqe_tail); };