From 7456f0f7e237ebb5d23dda7fe6346804e6ecb8f2 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 8 Nov 2019 19:54:31 +0300 Subject: [PATCH] Remove duplicate code --- blockstore.cpp | 56 +++++++++++++++++++-------------------- blockstore.h | 10 +++++++ blockstore_read.cpp | 62 ++++++++++++++------------------------------ blockstore_write.cpp | 58 ++++------------------------------------- 4 files changed, 60 insertions(+), 126 deletions(-) diff --git a/blockstore.cpp b/blockstore.cpp index 8ec4486e..0088220c 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -221,34 +221,19 @@ void blockstore::loop() throw new std::runtime_error("BUG: op->wait_for value is unexpected"); } } + unsigned ring_space = io_uring_sq_space_left(ringloop->ring); + unsigned prev_sqe_pos = ringloop->ring->sq.sqe_tail; + int dequeue_op = 0; if ((op->flags & OP_TYPE_MASK) == OP_READ_DIRTY || (op->flags & OP_TYPE_MASK) == OP_READ) { - int dequeue_op = dequeue_read(op); - if (dequeue_op) - { - submit_queue.erase(op_ptr); - } - else if (op->wait_for == WAIT_SQE) - { - // ring is full, stop submission - break; - } + dequeue_op = dequeue_read(op); } else if ((op->flags & OP_TYPE_MASK) == OP_WRITE || (op->flags & OP_TYPE_MASK) == OP_DELETE) { - int dequeue_op = dequeue_write(op); - if (dequeue_op) - { - submit_queue.erase(op_ptr); - } - else if (op->wait_for == WAIT_SQE) - { - // ring is full, stop submission - break; - } has_writes = true; + dequeue_op = dequeue_write(op); } else if ((op->flags & OP_TYPE_MASK) == OP_SYNC) { @@ -261,21 +246,32 @@ void blockstore::loop() // Can't submit SYNC before previous writes continue; } - int dequeue_op = dequeue_sync(op); - if (dequeue_op) - { - submit_queue.erase(op_ptr); - } - else if (op->wait_for == WAIT_SQE) - { - // ring is full, stop submission - break; - } + dequeue_op = dequeue_sync(op); } else if ((op->flags & OP_TYPE_MASK) == OP_STABLE) { } + if (dequeue_op) + { + int ret = ringloop->submit(); + if (ret < 0) + { + throw new std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret)); + } + submit_queue.erase(op_ptr); + in_process_ops.insert(op); + } + else + { + ringloop->ring->sq.sqe_tail = prev_sqe_pos; + if (op->wait_for == WAIT_SQE) + { + op->wait_detail = 1 + ring_space; + // ring is full, stop submission + break; + } + } } } } diff --git a/blockstore.h b/blockstore.h index 28badf76..b625ce67 100644 --- a/blockstore.h +++ b/blockstore.h @@ -65,6 +65,16 @@ #define STRIPE_NUM(oid) ((oid) >> 4) #define STRIPE_REPLICA(oid) ((oid) & 0xf) +#define BS_SUBMIT_GET_SQE(sqe, data) \ + struct io_uring_sqe *sqe = get_sqe();\ + if (!sqe)\ + {\ + // Pause until there are more requests available\ + op->wait_for = WAIT_SQE;\ + return 0;\ + }\ + struct ring_data_t *data = ((ring_data_t*)sqe->user_data); + // 16 bytes per object/stripe id // stripe includes replica number in 4 least significant bits struct __attribute__((__packed__)) object_id diff --git a/blockstore_read.cpp b/blockstore_read.cpp index 9e1dbc6b..79a9e9d2 100644 --- a/blockstore_read.cpp +++ b/blockstore_read.cpp @@ -1,6 +1,6 @@ #include "blockstore.h" -int blockstore::fulfill_read_push(blockstore_operation *read_op, uint32_t item_start, +int blockstore::fulfill_read_push(blockstore_operation *op, uint32_t item_start, uint32_t item_state, uint64_t item_version, uint64_t item_location, uint32_t cur_start, uint32_t cur_end) { if (cur_end > cur_start) @@ -8,38 +8,31 @@ int blockstore::fulfill_read_push(blockstore_operation *read_op, uint32_t item_s if (IS_IN_FLIGHT(item_state)) { // Pause until it's written somewhere - read_op->wait_for = WAIT_IN_FLIGHT; - read_op->wait_detail = item_version; - return -1; + op->wait_for = WAIT_IN_FLIGHT; + op->wait_detail = item_version; + return 0; } else if (item_state == ST_DEL_WRITTEN || item_state == ST_DEL_SYNCED || item_state == ST_DEL_MOVED) { // item is unallocated - return zeroes - memset(read_op->buf + cur_start - read_op->offset, 0, cur_end - cur_start); - return 0; + memset(op->buf + cur_start - op->offset, 0, cur_end - cur_start); + return 1; } - struct io_uring_sqe *sqe = get_sqe(); - if (!sqe) - { - // Pause until there are more requests available - read_op->wait_for = WAIT_SQE; - return -1; - } - struct ring_data_t *data = ((ring_data_t*)sqe->user_data); + BS_SUBMIT_GET_SQE(sqe, data); data->iov = (struct iovec){ - read_op->buf + cur_start - read_op->offset, + op->buf + cur_start - op->offset, cur_end - cur_start }; - read_op->read_vec[cur_start] = data->iov; + op->read_vec[cur_start] = data->iov; io_uring_prep_readv( sqe, IS_JOURNAL(item_state) ? journal.fd : data_fd, &data->iov, 1, (IS_JOURNAL(item_state) ? journal.offset : data_offset) + item_location + cur_start - item_start ); - data->op = read_op; + data->op = op; } - return 0; + return 1; } int blockstore::fulfill_read(blockstore_operation *read_op, uint32_t item_start, uint32_t item_end, @@ -61,19 +54,19 @@ int blockstore::fulfill_read(blockstore_operation *read_op, uint32_t item_start, } while (fulfill_near != read_op->read_vec.end() && fulfill_near->first < item_end) { - if (fulfill_read_push(read_op, item_start, item_state, item_version, item_location, cur_start, fulfill_near->first) < 0) + if (!fulfill_read_push(read_op, item_start, item_state, item_version, item_location, cur_start, fulfill_near->first)) { - return -1; + return 0; } cur_start = fulfill_near->first + fulfill_near->second.iov_len; fulfill_near++; } - if (fulfill_read_push(read_op, item_start, item_state, item_version, item_location, cur_start, item_end) < 0) + if (!fulfill_read_push(read_op, item_start, item_state, item_version, item_location, cur_start, item_end)) { - return -1; + return 0; } } - return 0; + return 1; } int blockstore::dequeue_read(blockstore_operation *read_op) @@ -95,8 +88,6 @@ int blockstore::dequeue_read(blockstore_operation *read_op) read_op->callback(read_op); return 1; } - unsigned prev_sqe_pos = ringloop->ring->sq.sqe_tail; - unsigned ring_space = io_uring_sq_space_left(ringloop->ring); uint64_t fulfilled = 0; if (dirty_found) { @@ -105,15 +96,10 @@ int blockstore::dequeue_read(blockstore_operation *read_op) dirty_entry& dirty = dirty_it->second; if ((read_op->flags & OP_TYPE_MASK) == OP_READ_DIRTY || IS_STABLE(dirty.state)) { - if (fulfill_read(read_op, dirty.offset, dirty.offset + dirty.size, - dirty.state, dirty_it->first.version, dirty.location) < 0) + if (!fulfill_read(read_op, dirty.offset, dirty.offset + dirty.size, + dirty.state, dirty_it->first.version, dirty.location)) { // need to wait. undo added requests, don't dequeue op - if (read_op->wait_for == WAIT_SQE) - { - read_op->wait_detail = 1 + ring_space; - } - ringloop->ring->sq.sqe_tail = prev_sqe_pos; read_op->read_vec.clear(); return 0; } @@ -123,14 +109,9 @@ int blockstore::dequeue_read(blockstore_operation *read_op) } if (clean_it != object_db.end()) { - if (fulfill_read(read_op, 0, block_size, ST_CURRENT, 0, clean_it->second.location) < 0) + if (!fulfill_read(read_op, 0, block_size, ST_CURRENT, 0, clean_it->second.location)) { // need to wait. undo added requests, don't dequeue op - if (read_op->wait_for == WAIT_SQE) - { - read_op->wait_detail = 1 + ring_space; - } - ringloop->ring->sq.sqe_tail = prev_sqe_pos; read_op->read_vec.clear(); return 0; } @@ -146,10 +127,5 @@ int blockstore::dequeue_read(blockstore_operation *read_op) read_op->retval = 0; read_op->pending_ops = read_op->read_vec.size(); in_process_ops.insert(read_op); - int ret = ringloop->submit(); - if (ret < 0) - { - throw new std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret)); - } return 1; } diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 25b95ebe..dc645879 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -18,15 +18,7 @@ int blockstore::dequeue_write(blockstore_operation *op) op->callback(op); return 1; } - struct io_uring_sqe *sqe = get_sqe(); - if (!sqe) - { - // Pause until there are more requests available - op->wait_for = WAIT_SQE; - op->wait_detail = 1; - return 0; - } - struct ring_data_t *data = ((ring_data_t*)sqe->user_data); + BS_GET_SQE(sqe, data); dirty_it->second.location = loc << block_order; dirty_it->second.state = ST_D_SUBMITTED; allocator_set(data_alloc, loc, true); @@ -43,7 +35,6 @@ int blockstore::dequeue_write(blockstore_operation *op) // Small (journaled) write // First check if the journal has sufficient space // FIXME Always two SQEs for now. Although it's possible to send 1 sometimes - bool two_sqes = true; uint64_t next_pos = journal.next_free; if (512 - journal.in_sector_pos < sizeof(struct journal_entry_small_write)) { @@ -73,17 +64,8 @@ int blockstore::dequeue_write(blockstore_operation *op) } // There is sufficient space. Get SQE(s) unsigned prev_sqe_pos = ringloop->ring->sq.sqe_tail; - struct io_uring_sqe *sqe1 = get_sqe(), *sqe2 = two_sqes ? get_sqe() : NULL; - if (!sqe1 || two_sqes && !sqe2) - { - // Pause until there are more requests available - op->wait_for = WAIT_SQE; - op->wait_detail = two_sqes ? 2 : 1; - ringloop->ring->sq.sqe_tail = prev_sqe_pos; - return 0; - } - struct ring_data_t *data1 = ((ring_data_t*)sqe1->user_data); - struct ring_data_t *data2 = two_sqes ? ((ring_data_t*)sqe2->user_data) : NULL; + BS_GET_SQE(sqe1, data1); + BS_GET_SQE(sqe2, data2); // Got SQEs. Prepare journal sector write if (512 - journal.in_sector_pos < sizeof(struct journal_entry_small_write)) { @@ -134,12 +116,6 @@ int blockstore::dequeue_write(blockstore_operation *op) op->pending_ops = 2; op->used_journal_sector = 1 + journal.cur_sector; } - in_process_ops.insert(op); - int ret = ringloop->submit(); - if (ret < 0) - { - throw new std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret)); - } return 1; } @@ -161,32 +137,15 @@ int blockstore::dequeue_sync(blockstore_operation *op) if (op->has_big_writes == 0x10000 || op->has_big_writes == ST_D_META_WRITTEN) { // Just fsync the journal - struct io_uring_sqe *sqe = get_sqe(); - if (!sqe) - { - // Pause until there are more requests available - op->wait_for = WAIT_SQE; - op->wait_detail = 1; - return 0; - } - struct ring_data_t *data = ((ring_data_t*)sqe->user_data); + BS_SUBMIT_GET_SQE(sqe, data); io_uring_prep_fsync(sqe, journal.fd, 0); data->op = op; op->pending_ops = 1; } else if (op->has_big_writes == ST_D_WRITTEN) { - // FIXME: try to remove duplicated get_sqe+!sqe+data code // 1st step: fsync data - struct io_uring_sqe *sqe = get_sqe(); - if (!sqe) - { - // Pause until there are more requests available - op->wait_for = WAIT_SQE; - op->wait_detail = 1; - return 0; - } - struct ring_data_t *data = ((ring_data_t*)sqe->user_data); + BS_SUBMIT_GET_SQE(sqe, data); io_uring_prep_fsync(sqe, data_fd, 0); data->op = op; op->pending_ops = 1; @@ -196,12 +155,5 @@ int blockstore::dequeue_sync(blockstore_operation *op) // 2nd step: Data device is synced, prepare & write journal entries } - // FIXME: try to remove this duplicated code, too - in_process_ops.insert(op); - int ret = ringloop->submit(); - if (ret < 0) - { - throw new std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret)); - } return 1; }