Browse Source

Check for op->wait_for conditions

It's almost identical to just re-submit... so maybe it was pointless
blocking-uring-test
Vitaliy Filippov 2 years ago
parent
commit
90f081f398
  1. 86
      blockstore.cpp
  2. 9
      blockstore_read.cpp
  3. 6
      blockstore_write.cpp

86
blockstore.cpp

@ -164,51 +164,95 @@ void blockstore::loop()
else
{
// try to submit ops
auto op = submit_queue.begin();
while (op != submit_queue.end())
auto cur = submit_queue.begin();
while (cur != submit_queue.end())
{
auto cur = op++;
if ((*cur)->wait_for == WAIT_SQE)
auto op_ptr = cur;
auto op = *(cur++);
if (op->wait_for)
{
}
else if ((*cur)->wait_for == WAIT_IN_FLIGHT)
{
if (op->wait_for == WAIT_SQE)
{
if (io_uring_sq_space_left(ringloop->ring) < op->wait_detail)
{
// stop submission if there's still no free space
break;
}
op->wait_for = 0;
}
else if (op->wait_for == WAIT_IN_FLIGHT)
{
auto dirty_it = dirty_db.find((obj_ver_id){
.oid = op->oid,
.version = op->wait_detail,
});
if (dirty_it != dirty_db.end() && IS_IN_FLIGHT(dirty_it->second.state))
{
// do not submit
continue;
}
op->wait_for = 0;
}
else if (op->wait_for == WAIT_JOURNAL)
{
if (journal.used_start < op->wait_detail)
{
// do not submit
continue;
}
op->wait_for = 0;
}
else if (op->wait_for == WAIT_JOURNAL_BUFFER)
{
if (journal.sector_info[((journal.cur_sector + 1) % journal.sector_count)].usage_count > 0)
{
// do not submit
continue;
}
op->wait_for = 0;
}
else
{
throw new std::runtime_error("BUG: op->wait_for value is unexpected");
}
}
if (((*cur)->flags & OP_TYPE_MASK) == OP_READ_DIRTY ||
((*cur)->flags & OP_TYPE_MASK) == OP_READ)
if ((op->flags & OP_TYPE_MASK) == OP_READ_DIRTY ||
(op->flags & OP_TYPE_MASK) == OP_READ)
{
int dequeue_op = dequeue_read(*cur);
int dequeue_op = dequeue_read(op);
if (dequeue_op)
{
submit_queue.erase(cur);
submit_queue.erase(op_ptr);
}
else if ((*cur)->wait_for == WAIT_SQE)
else if (op->wait_for == WAIT_SQE)
{
// ring is full, stop submission
break;
}
}
else if (((*cur)->flags & OP_TYPE_MASK) == OP_WRITE ||
((*cur)->flags & OP_TYPE_MASK) == OP_DELETE)
else if ((op->flags & OP_TYPE_MASK) == OP_WRITE ||
(op->flags & OP_TYPE_MASK) == OP_DELETE)
{
int dequeue_op = dequeue_write(*cur);
int dequeue_op = dequeue_write(op);
if (dequeue_op)
{
submit_queue.erase(cur);
submit_queue.erase(op_ptr);
}
else if ((*cur)->wait_for == WAIT_SQE)
else if (op->wait_for == WAIT_SQE)
{
// ring is full, stop submission
break;
}
}
else if (((*cur)->flags & OP_TYPE_MASK) == OP_SYNC)
else if ((op->flags & OP_TYPE_MASK) == OP_SYNC)
{
// wait for all small writes to be submitted
// wait for all big writes to complete, submit data device fsync
// wait for the data device fsync to complete, then submit journal writes for big writes
// then submit an fsync operation
}
else if (((*cur)->flags & OP_TYPE_MASK) == OP_STABLE)
else if ((op->flags & OP_TYPE_MASK) == OP_STABLE)
{
}

9
blockstore_read.cpp

@ -96,6 +96,7 @@ int blockstore::dequeue_read(blockstore_operation *read_op)
return 1;
}
unsigned prev_sqe_pos = ringloop->ring->sq.sqe_tail;
unsigned ring_space = io_uring_sq_space_left(ringloop->ring);
uint64_t fulfilled = 0;
if (dirty_found)
{
@ -108,6 +109,10 @@ int blockstore::dequeue_read(blockstore_operation *read_op)
dirty.state, dirty_it->first.version, dirty.location) < 0)
{
// need to wait. undo added requests, don't dequeue op
if (read_op->wait_for == WAIT_SQE)
{
read_op->wait_detail = 1 + ring_space;
}
ringloop->ring->sq.sqe_tail = prev_sqe_pos;
read_op->read_vec.clear();
return 0;
@ -121,6 +126,10 @@ int blockstore::dequeue_read(blockstore_operation *read_op)
if (fulfill_read(read_op, 0, block_size, ST_CURRENT, 0, clean_it->second.location) < 0)
{
// need to wait. undo added requests, don't dequeue op
if (read_op->wait_for == WAIT_SQE)
{
read_op->wait_detail = 1 + ring_space;
}
ringloop->ring->sq.sqe_tail = prev_sqe_pos;
read_op->read_vec.clear();
return 0;

6
blockstore_write.cpp

@ -23,6 +23,7 @@ int blockstore::dequeue_write(blockstore_operation *op)
{
// Pause until there are more requests available
op->wait_for = WAIT_SQE;
op->wait_detail = 1;
return 0;
}
struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
@ -67,15 +68,18 @@ int blockstore::dequeue_write(blockstore_operation *op)
{
// No space in the journal. Wait for it.
op->wait_for = WAIT_JOURNAL;
op->wait_detail = next_pos - journal.used_start;
op->wait_detail = next_pos;
return 0;
}
// There is sufficient space. Get SQE(s)
unsigned prev_sqe_pos = ringloop->ring->sq.sqe_tail;
struct io_uring_sqe *sqe1 = get_sqe(), *sqe2 = two_sqes ? get_sqe() : NULL;
if (!sqe1 || two_sqes && !sqe2)
{
// Pause until there are more requests available
op->wait_for = WAIT_SQE;
op->wait_detail = two_sqes ? 2 : 1;
ringloop->ring->sq.sqe_tail = prev_sqe_pos;
return 0;
}
struct ring_data_t *data1 = ((ring_data_t*)sqe1->user_data);

Loading…
Cancel
Save