Fix ringloop, implement first version of handle_event for reads

blocking-uring-test
Vitaliy Filippov 2019-11-05 14:10:23 +03:00
parent 82cf0a170e
commit 3f5ad16748
7 changed files with 54 additions and 14 deletions

View File

@ -1,4 +1,6 @@
all: allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_read.o crc32c.o ringloop.o test
clean:
rm -f *.o
crc32c.o: crc32c.c
gcc -c -o $@ $<
%.o: %.cpp

View File

@ -63,7 +63,24 @@ void blockstore::handle_event(ring_data_t *data)
}
else
{
struct blockstore_operation* op = (struct blockstore_operation*)data->op;
if ((op->flags & OP_TYPE_MASK) == OP_READ_DIRTY ||
(op->flags & OP_TYPE_MASK) == OP_READ)
{
op->pending_ops--;
if (data->res < 0)
{
// read error
op->retval = data->res;
}
if (op->pending_ops == 0)
{
if (op->retval == 0)
op->retval = op->len;
op->callback(op);
in_process_ops.erase(op);
}
}
}
}

View File

@ -121,6 +121,7 @@ public:
#define OP_SYNC 4
#define OP_STABLE 5
#define OP_DELETE 6
#define OP_TYPE_MASK 0x7
#define WAIT_SQE 1
#define WAIT_IN_FLIGHT 2
@ -135,9 +136,10 @@ struct blockstore_operation
uint32_t offset;
uint32_t len;
uint8_t *buf;
int retval;
std::map<uint64_t, struct iovec> read_vec;
int completed;
int pending_ops;
int wait_for;
uint64_t wait_version;
};
@ -171,7 +173,10 @@ public:
ring_loop_t *ringloop;
struct io_uring_sqe* get_sqe();
inline struct io_uring_sqe* get_sqe()
{
return ringloop->get_sqe(ring_consumer.number);
}
blockstore(spp::sparse_hash_map<std::string, std::string> & config, ring_loop_t *ringloop);
~blockstore();

View File

@ -32,7 +32,7 @@ int blockstore_init_meta::loop()
}
if (!submitted)
{
struct io_uring_sqe *sqe = bs->ringloop->get_sqe();
struct io_uring_sqe *sqe = bs->get_sqe();
if (!sqe)
{
throw new std::runtime_error("io_uring is full while trying to read metadata");
@ -170,7 +170,7 @@ int blockstore_init_journal::loop()
if (step == 0)
{
// Step 1: Read first block of the journal
struct io_uring_sqe *sqe = bs->ringloop->get_sqe();
struct io_uring_sqe *sqe = bs->get_sqe();
if (!sqe)
{
throw new std::runtime_error("io_uring is full while trying to read journal");
@ -194,7 +194,7 @@ int blockstore_init_journal::loop()
}
else
{
struct io_uring_sqe *sqe = bs->ringloop->get_sqe();
struct io_uring_sqe *sqe = bs->get_sqe();
if (!sqe)
{
throw new std::runtime_error("io_uring is full while trying to read journal");

View File

@ -25,19 +25,19 @@ int blockstore::fulfill_read_push(blockstore_operation *read_op, uint32_t item_s
read_op->wait_for = WAIT_SQE;
return -1;
}
read_op->read_vec[cur_start] = (struct iovec){
struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
data->iov = (struct iovec){
read_op->buf + cur_start - read_op->offset,
cur_end - cur_start
};
// Тут 2 вопроса - 1) куда сохранить iovec 2) как потом сопоставить i/o и cqe
read_op->read_vec[cur_start] = data->iov;
io_uring_prep_readv(
sqe,
IS_JOURNAL(item_state) ? journal_fd : data_fd,
// FIXME: &read_op->read_vec is forbidden
&read_op->read_vec[cur_start], 1,
&data->iov, 1,
(IS_JOURNAL(item_state) ? journal_offset : data_offset) + item_location + cur_start - item_start
);
((ring_data_t*)(sqe->user_data))->op = read_op;
data->op = read_op;
}
return 0;
}
@ -84,6 +84,7 @@ int blockstore::read(blockstore_operation *read_op)
{
// region is not allocated - return zeroes
memset(read_op->buf, 0, read_op->len);
read_op->retval = read_op->len;
read_op->callback(read_op);
return 0;
}
@ -94,7 +95,7 @@ int blockstore::read(blockstore_operation *read_op)
dirty_list dirty = dirty_it->second;
for (int i = dirty.size()-1; i >= 0; i--)
{
if (read_op->flags == OP_READ_DIRTY || IS_STABLE(dirty[i].state))
if ((read_op->flags & OP_TYPE_MASK) == OP_READ_DIRTY || IS_STABLE(dirty[i].state))
{
if (fulfill_read(read_op, dirty[i].offset, dirty[i].offset + dirty[i].size, dirty[i].state, dirty[i].version, dirty[i].location) < 0)
{
@ -114,7 +115,7 @@ int blockstore::read(blockstore_operation *read_op)
// need to wait for something, undo added requests and requeue op
ringloop->ring->sq.sqe_tail = prev_sqe_pos;
read_op->read_vec.clear();
// FIXME: bad implementation
// FIXME: manage enqueue/dequeue/requeue
submit_queue.push_front(read_op);
return 0;
}
@ -123,10 +124,12 @@ int blockstore::read(blockstore_operation *read_op)
{
// region is not allocated - return zeroes
memset(read_op->buf, 0, read_op->len);
read_op->retval = read_op->len;
read_op->callback(read_op);
return 0;
}
// FIXME reap events!
read_op->retval = 0;
read_op->pending_ops = read_op->read_vec.size();
int ret = ringloop->submit();
if (ret < 0)
{

View File

@ -29,6 +29,18 @@ struct io_uring_sqe* ring_loop_t::get_sqe()
return sqe;
}
struct io_uring_sqe* ring_loop_t::get_sqe(int consumer)
{
struct io_uring_sqe* sqe = io_uring_get_sqe(ring);
if (sqe)
{
struct ring_data_t *data = ring_data + (sqe - ring->sq.sqes);
io_uring_sqe_set_data(sqe, data);
data->source = consumer;
}
return sqe;
}
int ring_loop_t::register_consumer(ring_consumer_t & consumer)
{
consumer.number = consumers.size();

View File

@ -34,6 +34,7 @@ public:
ring_loop_t(int qd);
~ring_loop_t();
struct io_uring_sqe* get_sqe();
struct io_uring_sqe* get_sqe(int consumer);
int register_consumer(ring_consumer_t & consumer);
void unregister_consumer(int number);
void loop(bool sleep);