Browse Source

Fix read_fulfill, use vector

blocking-uring-test
Vitaliy Filippov 3 years ago
parent
commit
aaea3e1f99
  1. 11
      blockstore.h
  2. 9
      blockstore_flush.cpp
  3. 2
      blockstore_flush.h
  4. 121
      blockstore_read.cpp
  5. 8
      fio_engine.cpp

11
blockstore.h

@ -202,6 +202,11 @@ public:
// Suspend operation until there is some free space on the data device
#define WAIT_FREE 5
struct fulfill_read_t
{
uint64_t offset, len;
};
struct blockstore_operation
{
// flags contain operation type and possibly other flags
@ -231,7 +236,7 @@ private:
int pending_ops;
// Read
std::map<uint64_t, struct iovec> read_vec;
std::vector<fulfill_read_t> read_vec;
// Sync, write
uint64_t min_used_journal_sector, max_used_journal_sector;
@ -312,8 +317,8 @@ class blockstore
int dequeue_read(blockstore_operation *read_op);
int fulfill_read(blockstore_operation *read_op, uint64_t &fulfilled, uint32_t item_start, uint32_t item_end,
uint32_t item_state, uint64_t item_version, uint64_t item_location);
int fulfill_read_push(blockstore_operation *read_op, uint64_t &fulfilled, uint32_t item_start,
uint32_t item_state, uint64_t item_version, uint64_t item_location, uint32_t cur_start, uint32_t cur_end);
int fulfill_read_push(blockstore_operation *op, void *buf, uint64_t offset, uint64_t len,
uint32_t item_state, uint64_t item_version);
void handle_read_event(ring_data_t *data, blockstore_operation *op);
// Write

9
blockstore_flush.cpp

@ -200,7 +200,7 @@ bool journal_flusher_co::loop()
{
// First we submit all reads
offset = dirty_it->second.offset;
len = dirty_it->second.len;
end_offset = dirty_it->second.offset + dirty_it->second.len;
it = v.begin();
while (1)
{
@ -210,7 +210,7 @@ bool journal_flusher_co::loop()
if (it == v.end() || it->offset > offset)
{
submit_offset = dirty_it->second.location + offset - dirty_it->second.offset;
submit_len = it == v.end() || it->offset >= offset+len ? len : it->offset-offset;
submit_len = it == v.end() || it->offset >= end_offset ? end_offset-offset : it->offset-offset;
it = v.insert(it, (copy_buffer_t){ .offset = offset, .len = submit_len, .buf = memalign(512, submit_len) });
copy_count++;
if (bs->journal.inmemory)
@ -230,10 +230,9 @@ bool journal_flusher_co::loop()
wait_count++;
}
}
if (it == v.end() || it->offset+it->len >= offset+len)
{
offset = it->offset+it->len;
if (it == v.end() || offset >= end_offset)
break;
}
}
}
else if (dirty_it->second.state == ST_D_STABLE && !skip_copy)

2
blockstore_flush.h

@ -43,7 +43,7 @@ class journal_flusher_co
std::vector<copy_buffer_t> v;
std::vector<copy_buffer_t>::iterator it;
int copy_count;
uint64_t offset, len, submit_offset, submit_len, clean_loc, old_clean_loc, old_clean_ver;
uint64_t offset, end_offset, submit_offset, submit_len, clean_loc, old_clean_loc, old_clean_ver;
flusher_meta_write_t meta_old, meta_new;
std::map<object_id, uint64_t>::iterator repeat_it;
std::function<void(ring_data_t*)> simple_callback_r, simple_callback_w;

121
blockstore_read.cpp

@ -1,50 +1,36 @@
#include "blockstore.h"
int blockstore::fulfill_read_push(blockstore_operation *op, uint64_t &fulfilled, uint32_t item_start,
uint32_t item_state, uint64_t item_version, uint64_t item_location, uint32_t cur_start, uint32_t cur_end)
int blockstore::fulfill_read_push(blockstore_operation *op, void *buf, uint64_t offset, uint64_t len,
uint32_t item_state, uint64_t item_version)
{
if (cur_end > cur_start)
if (IS_IN_FLIGHT(item_state))
{
if (IS_IN_FLIGHT(item_state))
{
// Pause until it's written somewhere
op->wait_for = WAIT_IN_FLIGHT;
op->wait_detail = item_version;
return 0;
}
else if (IS_DELETE(item_state))
{
// item is unallocated - return zeroes
memset((uint8_t*)op->buf + cur_start - op->offset, 0, cur_end - cur_start);
return 1;
}
if (journal.inmemory && IS_JOURNAL(item_state))
{
iovec v = {
(uint8_t*)op->buf + cur_start - op->offset,
cur_end - cur_start
};
op->read_vec[cur_start] = v;
memcpy(v.iov_base, journal.buffer + item_location + cur_start - item_start, v.iov_len);
return 1;
}
BS_SUBMIT_GET_SQE(sqe, data);
data->iov = (struct iovec){
(uint8_t*)op->buf + cur_start - op->offset,
cur_end - cur_start
};
// FIXME: use simple std::vector instead of map for read_vec
op->read_vec[cur_start] = data->iov;
op->pending_ops++;
my_uring_prep_readv(
sqe,
IS_JOURNAL(item_state) ? journal.fd : data_fd,
&data->iov, 1,
(IS_JOURNAL(item_state) ? journal.offset : data_offset) + item_location + cur_start - item_start
);
data->callback = [this, op](ring_data_t *data) { handle_read_event(data, op); };
fulfilled += cur_end-cur_start;
// Pause until it's written somewhere
op->wait_for = WAIT_IN_FLIGHT;
op->wait_detail = item_version;
return 0;
}
else if (IS_DELETE(item_state))
{
// item is unallocated - return zeroes
memset(buf, 0, len);
return 1;
}
if (journal.inmemory && IS_JOURNAL(item_state))
{
memcpy(buf, journal.buffer + offset, len);
return 1;
}
BS_SUBMIT_GET_SQE(sqe, data);
data->iov = (struct iovec){ buf, len };
op->pending_ops++;
my_uring_prep_readv(
sqe,
IS_JOURNAL(item_state) ? journal.fd : data_fd,
&data->iov, 1,
(IS_JOURNAL(item_state) ? journal.offset : data_offset) + offset
);
data->callback = [this, op](ring_data_t *data) { handle_read_event(data, op); };
return 1;
}
@ -56,27 +42,28 @@ int blockstore::fulfill_read(blockstore_operation *read_op, uint64_t &fulfilled,
{
cur_start = cur_start < read_op->offset ? read_op->offset : cur_start;
item_end = item_end > read_op->offset + read_op->len ? read_op->offset + read_op->len : item_end;
auto fulfill_near = read_op->read_vec.lower_bound(cur_start);
if (fulfill_near != read_op->read_vec.begin())
{
fulfill_near--;
if (fulfill_near->first + fulfill_near->second.iov_len <= cur_start)
{
fulfill_near++;
}
}
while (fulfill_near != read_op->read_vec.end() && fulfill_near->first < item_end)
auto it = read_op->read_vec.begin();
while (1)
{
if (!fulfill_read_push(read_op, fulfilled, item_start, item_state, item_version, item_location, cur_start, fulfill_near->first))
for (; it != read_op->read_vec.end(); it++)
if (it->offset >= cur_start)
break;
if (it == read_op->read_vec.end() || it->offset > cur_start)
{
return 0;
fulfill_read_t el = {
.offset = cur_start,
.len = it == read_op->read_vec.end() || it->offset >= item_end ? item_end-cur_start : it->offset-cur_start,
};
it = read_op->read_vec.insert(it, el);
fulfilled += el.len;
if (!fulfill_read_push(read_op, read_op->buf + el.offset - read_op->offset, item_location + el.offset - item_start, el.len, item_state, item_version))
{
return 0;
}
}
cur_start = fulfill_near->first + fulfill_near->second.iov_len;
fulfill_near++;
}
if (!fulfill_read_push(read_op, fulfilled, item_start, item_state, item_version, item_location, cur_start, item_end))
{
return 0;
cur_start = it->offset + it->len;
if (it == read_op->read_vec.end() || cur_start >= item_end)
break;
}
}
return 1;
@ -141,10 +128,18 @@ int blockstore::dequeue_read(blockstore_operation *read_op)
return 0;
}
}
if (!read_op->read_vec.size())
if (!read_op->pending_ops)
{
// region is not allocated - return zeroes
memset(read_op->buf, 0, read_op->len);
// everything is fulfilled from memory
if (!read_op->read_vec.size())
{
// region is not allocated - return zeroes
memset(read_op->buf, 0, read_op->len);
}
if (fulfilled != read_op->len)
{
printf("BUG: fulfilled %lu < %d read bytes\n", fulfilled, read_op->len);
}
read_op->retval = read_op->len;
read_op->callback(read_op);
return 1;

8
fio_engine.cpp

@ -194,11 +194,15 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
};
op->offset = io->offset % bsd->bs->block_size;
op->len = io->xfer_buflen;
op->callback = [io](blockstore_operation *op)
op->callback = [io, n](blockstore_operation *op)
{
io->error = op->retval < 0 ? -op->retval : 0;
bs_data *bsd = (bs_data*)io->engine_data;
bsd->inflight--;
bsd->completed.push_back(io);
#ifdef BLOCKSTORE_DEBUG
printf("--- OP_READ %llx n=%d retval=%d\n", io, n, op->retval);
#endif
delete op;
};
break;
@ -276,7 +280,7 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
}
#ifdef BLOCKSTORE_DEBUG
printf("+++ %s %llx\n", op->flags == OP_WRITE ? "OP_WRITE" : "OP_SYNC", io);
printf("+++ %s %llx n=%d\n", op->flags == OP_READ ? "OP_READ" : (op->flags == OP_WRITE ? "OP_WRITE" : "OP_SYNC"), io, n);
#endif
io->error = 0;
bsd->inflight++;

Loading…
Cancel
Save