From 90f081f398109127a96565bc6fc3744cb74d09cc Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 8 Nov 2019 11:36:08 +0300 Subject: [PATCH] Check for op->wait_for conditions It's almost identical to just re-submit... so maybe it was pointless --- blockstore.cpp | 86 +++++++++++++++++++++++++++++++++----------- blockstore_read.cpp | 9 +++++ blockstore_write.cpp | 6 +++- 3 files changed, 79 insertions(+), 22 deletions(-) diff --git a/blockstore.cpp b/blockstore.cpp index bd197b13..c4e66afc 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -164,51 +164,95 @@ void blockstore::loop() else { // try to submit ops - auto op = submit_queue.begin(); - while (op != submit_queue.end()) + auto cur = submit_queue.begin(); + while (cur != submit_queue.end()) { - auto cur = op++; - if ((*cur)->wait_for == WAIT_SQE) + auto op_ptr = cur; + auto op = *(cur++); + if (op->wait_for) { - + if (op->wait_for == WAIT_SQE) + { + if (io_uring_sq_space_left(ringloop->ring) < op->wait_detail) + { + // stop submission if there's still no free space + break; + } + op->wait_for = 0; + } + else if (op->wait_for == WAIT_IN_FLIGHT) + { + auto dirty_it = dirty_db.find((obj_ver_id){ + .oid = op->oid, + .version = op->wait_detail, + }); + if (dirty_it != dirty_db.end() && IS_IN_FLIGHT(dirty_it->second.state)) + { + // do not submit + continue; + } + op->wait_for = 0; + } + else if (op->wait_for == WAIT_JOURNAL) + { + if (journal.used_start < op->wait_detail) + { + // do not submit + continue; + } + op->wait_for = 0; + } + else if (op->wait_for == WAIT_JOURNAL_BUFFER) + { + if (journal.sector_info[((journal.cur_sector + 1) % journal.sector_count)].usage_count > 0) + { + // do not submit + continue; + } + op->wait_for = 0; + } + else + { + throw new std::runtime_error("BUG: op->wait_for value is unexpected"); + } } - else if ((*cur)->wait_for == WAIT_IN_FLIGHT) + if ((op->flags & OP_TYPE_MASK) == OP_READ_DIRTY || + (op->flags & OP_TYPE_MASK) == OP_READ) { - - } - if (((*cur)->flags & OP_TYPE_MASK) == OP_READ_DIRTY || - ((*cur)->flags & OP_TYPE_MASK) == OP_READ) - { - int dequeue_op = dequeue_read(*cur); + int dequeue_op = dequeue_read(op); if (dequeue_op) { - submit_queue.erase(cur); + submit_queue.erase(op_ptr); } - else if ((*cur)->wait_for == WAIT_SQE) + else if (op->wait_for == WAIT_SQE) { // ring is full, stop submission break; } } - else if (((*cur)->flags & OP_TYPE_MASK) == OP_WRITE || - ((*cur)->flags & OP_TYPE_MASK) == OP_DELETE) + else if ((op->flags & OP_TYPE_MASK) == OP_WRITE || + (op->flags & OP_TYPE_MASK) == OP_DELETE) { - int dequeue_op = dequeue_write(*cur); + int dequeue_op = dequeue_write(op); if (dequeue_op) { - submit_queue.erase(cur); + submit_queue.erase(op_ptr); } - else if ((*cur)->wait_for == WAIT_SQE) + else if (op->wait_for == WAIT_SQE) { // ring is full, stop submission break; } } - else if (((*cur)->flags & OP_TYPE_MASK) == OP_SYNC) + else if ((op->flags & OP_TYPE_MASK) == OP_SYNC) { + // wait for all small writes to be submitted + // wait for all big writes to complete, submit data device fsync + // wait for the data device fsync to complete, then submit journal writes for big writes + // then submit an fsync operation } - else if (((*cur)->flags & OP_TYPE_MASK) == OP_STABLE) + else if ((op->flags & OP_TYPE_MASK) == OP_STABLE) { } diff --git a/blockstore_read.cpp b/blockstore_read.cpp index 43eadbda..9e1dbc6b 100644 --- a/blockstore_read.cpp +++ b/blockstore_read.cpp @@ -96,6 +96,7 @@ int blockstore::dequeue_read(blockstore_operation *read_op) return 1; } unsigned prev_sqe_pos = ringloop->ring->sq.sqe_tail; + unsigned ring_space = io_uring_sq_space_left(ringloop->ring); uint64_t fulfilled = 0; if (dirty_found) { @@ -108,6 +109,10 @@ int blockstore::dequeue_read(blockstore_operation *read_op) dirty.state, dirty_it->first.version, dirty.location) < 0) { // need to wait. undo added requests, don't dequeue op + if (read_op->wait_for == WAIT_SQE) + { + read_op->wait_detail = 1 + ring_space; + } ringloop->ring->sq.sqe_tail = prev_sqe_pos; read_op->read_vec.clear(); return 0; @@ -121,6 +126,10 @@ int blockstore::dequeue_read(blockstore_operation *read_op) if (fulfill_read(read_op, 0, block_size, ST_CURRENT, 0, clean_it->second.location) < 0) { // need to wait. undo added requests, don't dequeue op + if (read_op->wait_for == WAIT_SQE) + { + read_op->wait_detail = 1 + ring_space; + } ringloop->ring->sq.sqe_tail = prev_sqe_pos; read_op->read_vec.clear(); return 0; diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 5814c66f..463ef09a 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -23,6 +23,7 @@ int blockstore::dequeue_write(blockstore_operation *op) { // Pause until there are more requests available op->wait_for = WAIT_SQE; + op->wait_detail = 1; return 0; } struct ring_data_t *data = ((ring_data_t*)sqe->user_data); @@ -67,15 +68,18 @@ int blockstore::dequeue_write(blockstore_operation *op) { // No space in the journal. Wait for it. op->wait_for = WAIT_JOURNAL; - op->wait_detail = next_pos - journal.used_start; + op->wait_detail = next_pos; return 0; } // There is sufficient space. Get SQE(s) + unsigned prev_sqe_pos = ringloop->ring->sq.sqe_tail; struct io_uring_sqe *sqe1 = get_sqe(), *sqe2 = two_sqes ? get_sqe() : NULL; if (!sqe1 || two_sqes && !sqe2) { // Pause until there are more requests available op->wait_for = WAIT_SQE; + op->wait_detail = two_sqes ? 2 : 1; + ringloop->ring->sq.sqe_tail = prev_sqe_pos; return 0; } struct ring_data_t *data1 = ((ring_data_t*)sqe1->user_data);