diff --git a/blockstore.h b/blockstore.h index bc0cd069..0dd85935 100644 --- a/blockstore.h +++ b/blockstore.h @@ -202,6 +202,11 @@ public: // Suspend operation until there is some free space on the data device #define WAIT_FREE 5 +struct fulfill_read_t +{ + uint64_t offset, len; +}; + struct blockstore_operation { // flags contain operation type and possibly other flags @@ -231,7 +236,7 @@ private: int pending_ops; // Read - std::map read_vec; + std::vector read_vec; // Sync, write uint64_t min_used_journal_sector, max_used_journal_sector; @@ -312,8 +317,8 @@ class blockstore int dequeue_read(blockstore_operation *read_op); int fulfill_read(blockstore_operation *read_op, uint64_t &fulfilled, uint32_t item_start, uint32_t item_end, uint32_t item_state, uint64_t item_version, uint64_t item_location); - int fulfill_read_push(blockstore_operation *read_op, uint64_t &fulfilled, uint32_t item_start, - uint32_t item_state, uint64_t item_version, uint64_t item_location, uint32_t cur_start, uint32_t cur_end); + int fulfill_read_push(blockstore_operation *op, void *buf, uint64_t offset, uint64_t len, + uint32_t item_state, uint64_t item_version); void handle_read_event(ring_data_t *data, blockstore_operation *op); // Write diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index a528d218..412516ec 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -200,7 +200,7 @@ bool journal_flusher_co::loop() { // First we submit all reads offset = dirty_it->second.offset; - len = dirty_it->second.len; + end_offset = dirty_it->second.offset + dirty_it->second.len; it = v.begin(); while (1) { @@ -210,7 +210,7 @@ bool journal_flusher_co::loop() if (it == v.end() || it->offset > offset) { submit_offset = dirty_it->second.location + offset - dirty_it->second.offset; - submit_len = it == v.end() || it->offset >= offset+len ? len : it->offset-offset; + submit_len = it == v.end() || it->offset >= end_offset ? end_offset-offset : it->offset-offset; it = v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) }); copy_count++; if (bs->journal.inmemory) @@ -230,10 +230,9 @@ bool journal_flusher_co::loop() wait_count++; } } - if (it == v.end() || it->offset+it->len >= offset+len) - { + offset = it->offset+it->len; + if (it == v.end() || offset >= end_offset) break; - } } } else if (dirty_it->second.state == ST_D_STABLE && !skip_copy) diff --git a/blockstore_flush.h b/blockstore_flush.h index f1c5f54f..f6e20eac 100644 --- a/blockstore_flush.h +++ b/blockstore_flush.h @@ -43,7 +43,7 @@ class journal_flusher_co std::vector v; std::vector::iterator it; int copy_count; - uint64_t offset, len, submit_offset, submit_len, clean_loc, old_clean_loc, old_clean_ver; + uint64_t offset, end_offset, submit_offset, submit_len, clean_loc, old_clean_loc, old_clean_ver; flusher_meta_write_t meta_old, meta_new; std::map::iterator repeat_it; std::function simple_callback_r, simple_callback_w; diff --git a/blockstore_read.cpp b/blockstore_read.cpp index 7d6d9ee0..0002f3b2 100644 --- a/blockstore_read.cpp +++ b/blockstore_read.cpp @@ -1,50 +1,36 @@ #include "blockstore.h" -int blockstore::fulfill_read_push(blockstore_operation *op, uint64_t &fulfilled, uint32_t item_start, - uint32_t item_state, uint64_t item_version, uint64_t item_location, uint32_t cur_start, uint32_t cur_end) +int blockstore::fulfill_read_push(blockstore_operation *op, void *buf, uint64_t offset, uint64_t len, + uint32_t item_state, uint64_t item_version) { - if (cur_end > cur_start) + if (IS_IN_FLIGHT(item_state)) { - if (IS_IN_FLIGHT(item_state)) - { - // Pause until it's written somewhere - op->wait_for = WAIT_IN_FLIGHT; - op->wait_detail = item_version; - return 0; - } - else if (IS_DELETE(item_state)) - { - // item is unallocated - return zeroes - memset((uint8_t*)op->buf + cur_start - op->offset, 0, cur_end - cur_start); - return 1; - } - if (journal.inmemory && IS_JOURNAL(item_state)) - { - iovec v = { - (uint8_t*)op->buf + cur_start - op->offset, - cur_end - cur_start - }; - op->read_vec[cur_start] = v; - memcpy(v.iov_base, journal.buffer + item_location + cur_start - item_start, v.iov_len); - return 1; - } - BS_SUBMIT_GET_SQE(sqe, data); - data->iov = (struct iovec){ - (uint8_t*)op->buf + cur_start - op->offset, - cur_end - cur_start - }; - // FIXME: use simple std::vector instead of map for read_vec - op->read_vec[cur_start] = data->iov; - op->pending_ops++; - my_uring_prep_readv( - sqe, - IS_JOURNAL(item_state) ? journal.fd : data_fd, - &data->iov, 1, - (IS_JOURNAL(item_state) ? journal.offset : data_offset) + item_location + cur_start - item_start - ); - data->callback = [this, op](ring_data_t *data) { handle_read_event(data, op); }; - fulfilled += cur_end-cur_start; + // Pause until it's written somewhere + op->wait_for = WAIT_IN_FLIGHT; + op->wait_detail = item_version; + return 0; } + else if (IS_DELETE(item_state)) + { + // item is unallocated - return zeroes + memset(buf, 0, len); + return 1; + } + if (journal.inmemory && IS_JOURNAL(item_state)) + { + memcpy(buf, journal.buffer + offset, len); + return 1; + } + BS_SUBMIT_GET_SQE(sqe, data); + data->iov = (struct iovec){ buf, len }; + op->pending_ops++; + my_uring_prep_readv( + sqe, + IS_JOURNAL(item_state) ? journal.fd : data_fd, + &data->iov, 1, + (IS_JOURNAL(item_state) ? journal.offset : data_offset) + offset + ); + data->callback = [this, op](ring_data_t *data) { handle_read_event(data, op); }; return 1; } @@ -56,27 +42,28 @@ int blockstore::fulfill_read(blockstore_operation *read_op, uint64_t &fulfilled, { cur_start = cur_start < read_op->offset ? read_op->offset : cur_start; item_end = item_end > read_op->offset + read_op->len ? read_op->offset + read_op->len : item_end; - auto fulfill_near = read_op->read_vec.lower_bound(cur_start); - if (fulfill_near != read_op->read_vec.begin()) + auto it = read_op->read_vec.begin(); + while (1) { - fulfill_near--; - if (fulfill_near->first + fulfill_near->second.iov_len <= cur_start) + for (; it != read_op->read_vec.end(); it++) + if (it->offset >= cur_start) + break; + if (it == read_op->read_vec.end() || it->offset > cur_start) { - fulfill_near++; + fulfill_read_t el = { + .offset = cur_start, + .len = it == read_op->read_vec.end() || it->offset >= item_end ? item_end-cur_start : it->offset-cur_start, + }; + it = read_op->read_vec.insert(it, el); + fulfilled += el.len; + if (!fulfill_read_push(read_op, read_op->buf + el.offset - read_op->offset, item_location + el.offset - item_start, el.len, item_state, item_version)) + { + return 0; + } } - } - while (fulfill_near != read_op->read_vec.end() && fulfill_near->first < item_end) - { - if (!fulfill_read_push(read_op, fulfilled, item_start, item_state, item_version, item_location, cur_start, fulfill_near->first)) - { - return 0; - } - cur_start = fulfill_near->first + fulfill_near->second.iov_len; - fulfill_near++; - } - if (!fulfill_read_push(read_op, fulfilled, item_start, item_state, item_version, item_location, cur_start, item_end)) - { - return 0; + cur_start = it->offset + it->len; + if (it == read_op->read_vec.end() || cur_start >= item_end) + break; } } return 1; @@ -141,10 +128,18 @@ int blockstore::dequeue_read(blockstore_operation *read_op) return 0; } } - if (!read_op->read_vec.size()) + if (!read_op->pending_ops) { - // region is not allocated - return zeroes - memset(read_op->buf, 0, read_op->len); + // everything is fulfilled from memory + if (!read_op->read_vec.size()) + { + // region is not allocated - return zeroes + memset(read_op->buf, 0, read_op->len); + } + if (fulfilled != read_op->len) + { + printf("BUG: fulfilled %lu < %d read bytes\n", fulfilled, read_op->len); + } read_op->retval = read_op->len; read_op->callback(read_op); return 1; diff --git a/fio_engine.cpp b/fio_engine.cpp index e84c70a2..a41c8d73 100644 --- a/fio_engine.cpp +++ b/fio_engine.cpp @@ -194,11 +194,15 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io) }; op->offset = io->offset % bsd->bs->block_size; op->len = io->xfer_buflen; - op->callback = [io](blockstore_operation *op) + op->callback = [io, n](blockstore_operation *op) { io->error = op->retval < 0 ? -op->retval : 0; bs_data *bsd = (bs_data*)io->engine_data; + bsd->inflight--; bsd->completed.push_back(io); +#ifdef BLOCKSTORE_DEBUG + printf("--- OP_READ %llx n=%d retval=%d\n", io, n, op->retval); +#endif delete op; }; break; @@ -276,7 +280,7 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io) } #ifdef BLOCKSTORE_DEBUG - printf("+++ %s %llx\n", op->flags == OP_WRITE ? "OP_WRITE" : "OP_SYNC", io); + printf("+++ %s %llx n=%d\n", op->flags == OP_READ ? "OP_READ" : (op->flags == OP_WRITE ? "OP_WRITE" : "OP_SYNC"), io, n); #endif io->error = 0; bsd->inflight++;