Begin implementation of enqueue/dequeue

blocking-uring-test
Vitaliy Filippov 2019-11-06 19:27:48 +03:00
parent 3f5ad16748
commit 2f18a3d19e
3 changed files with 92 additions and 18 deletions


@@ -50,7 +50,7 @@ blockstore::~blockstore()
// main event loop - handle requests
void blockstore::handle_event(ring_data_t *data)
{
if (initialized != 0)
if (initialized != 10)
{
if (metadata_init_reader)
{
@@ -119,6 +119,66 @@ void blockstore::loop()
}
else
{
// try to submit ops
auto op = submit_queue.begin();
while (op != submit_queue.end())
{
auto cur = op++;
if (((*cur)->flags & OP_TYPE_MASK) == OP_READ_DIRTY ||
((*cur)->flags & OP_TYPE_MASK) == OP_READ)
{
int dequeue_op = dequeue_read(*cur);
if (dequeue_op)
{
submit_queue.erase(cur);
}
else if ((*cur)->wait_for == WAIT_SQE)
{
// ring is full, stop submission
break;
}
}
}
}
}
int blockstore::enqueue_op(blockstore_operation *op)
{
if (op->offset >= block_size || op->len > block_size-op->offset)
{
return -EINVAL;
}
submit_queue.push_back(op);
if ((op->flags & OP_TYPE_MASK) == OP_WRITE)
{
// Assign version number
auto dirty_it = dirty_queue.find(op->oid);
if (dirty_it != dirty_queue.end())
{
op->version = dirty_it->second.back().version + 1;
}
else
{
auto clean_it = object_db.find(op->oid);
if (clean_it != object_db.end())
{
op->version = clean_it->second.version + 1;
}
else
{
op->version = 1;
}
dirty_it = dirty_queue.emplace(op->oid, dirty_list()).first;
}
// Immediately add the operation to the dirty queue, so subsequent reads can see it
dirty_it->second.push_back((dirty_entry){
.version = op->version,
.state = ST_IN_FLIGHT,
.flags = 0,
.location = 0,
.offset = op->offset,
.size = op->len,
});
}
return 0;
}
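
Below is a usage sketch, not part of the commit, of how a caller might drive the new enqueue path. enqueue_op(), loop(), OP_WRITE and the flags/oid/offset/len/buf/retval/callback fields come from the diff itself; the object_id layout (inode/stripe), the buf type, the lambda-compatible callback and the surrounding setup are assumptions for illustration only.

#include <cstdio>
#include "blockstore.h"   // the header changed by this commit

// hypothetical helper: enqueue one 4 KB write and let the event loop submit it
void example_write(blockstore *bs, void *data)
{
    blockstore_operation *op = new blockstore_operation();
    op->flags = OP_WRITE;
    op->oid.inode = 1;     // assumption: object_id has inode/stripe fields
    op->oid.stripe = 0;
    op->offset = 0;
    op->len = 4096;
    op->buf = data;        // assumption: buf is a caller-owned buffer of op->len bytes
    op->callback = [](blockstore_operation *op)
    {
        // assumption: like the read path, retval is op->len on success
        printf("write finished: retval=%d\n", op->retval);
        delete op;
    };
    if (bs->enqueue_op(op) < 0)
    {
        // -EINVAL: offset/len do not fit into a single block
        delete op;
        return;
    }
    bs->loop();            // the operation is actually submitted from the event loop
}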


@@ -13,7 +13,7 @@
#include <vector>
#include <map>
#include <deque>
#include <list>
#include <set>
#include <functional>
@@ -110,11 +110,26 @@ public:
}
};
// SYNC must be submitted after previous WRITEs/DELETEs (not before!)
// READs to the same object must be submitted after previous WRITEs/DELETEs
// - Sync must be submitted after previous writes/deletes (not before!)
// - Reads to the same object must be submitted after the previous writes/deletes
// are written (not necessarily synced) to their location. This is because we
// rely on read-modify-write for erasure coding and must return the new data
// to calculate parity for subsequent writes
// - Writes may be submitted in any order, because they don't overlap. Each write
// goes into a new location - either on the journal device or on the data device
// - Journal trim may be processed only after all versions are moved to
// the main storage AND after all read operations for older versions complete
// - If an operation cannot be submitted because the ring is full,
// we should stop submitting other operations. Otherwise some "scatter" reads
// may end up blocked for a long time.
// Otherwise, the submit order is free, i.e. all operations may be submitted immediately.
// In fact, adding a write operation must immediately populate dirty_queue.
// write -> immediately add to dirty ops, immediately submit; postpone if the ring is full
// read -> check dirty ops, read or wait, remember the max used journal offset, then unremember it
// sync -> take all current writes (in-flight + pending), wait for them to finish, sync, move their state
// The open question is how to remember the current writes (one possible approach is sketched after this hunk)
#define OP_READ 1
#define OP_READ_DIRTY 2
#define OP_WRITE 3
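
The "how to remember current writes" question above can be answered in several ways. Here is a stand-alone sketch, not what the commit implements and with purely hypothetical names: every sync snapshots the set of writes that are still unsynced at enqueue time, and each finishing write removes itself from those snapshots, so a sync becomes submittable once its set is empty.

#include <set>
#include <list>

struct op_t
{
    std::set<op_t*> wait_for_writes;         // SYNC only: writes that must finish first
};

struct sync_tracker
{
    std::set<op_t*> unsynced_writes;         // writes enqueued but not yet covered by a sync
    std::list<op_t*> pending_syncs;

    void on_write_enqueued(op_t *w) { unsynced_writes.insert(w); }

    void on_sync_enqueued(op_t *s)
    {
        s->wait_for_writes = unsynced_writes;    // snapshot of the writes preceding this sync
        pending_syncs.push_back(s);
    }

    void on_write_finished(op_t *w)
    {
        unsynced_writes.erase(w);
        for (auto s: pending_syncs)
            s->wait_for_writes.erase(w);         // a sync with an empty set may now be submitted
    }
};

A real implementation would additionally have to move the dirty_queue entries of the synced writes into a new state afterwards, as the comment above notes.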
@@ -154,7 +169,7 @@ class blockstore
public:
spp::sparse_hash_map<object_id, clean_entry, oid_hash> object_db;
spp::sparse_hash_map<object_id, dirty_list, oid_hash> dirty_queue;
std::deque<blockstore_operation*> submit_queue;
std::list<blockstore_operation*> submit_queue;
std::set<blockstore_operation*> in_process_ops;
uint32_t block_order, block_size;
uint64_t block_count;
@@ -197,7 +212,8 @@ public:
void loop();
// Read
int read(blockstore_operation *read_op);
int enqueue_op(blockstore_operation *op);
int dequeue_read(blockstore_operation *read_op);
int fulfill_read(blockstore_operation *read_op, uint32_t item_start, uint32_t item_end,
uint32_t item_state, uint64_t item_version, uint64_t item_location);
int fulfill_read_push(blockstore_operation *read_op, uint32_t item_start,
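
To make the read path below easier to follow, here is a stand-alone model, not the commit's code, of what fulfill_read() has to do: dirty versions are visited newest-first, each one may only contribute the bytes of the request that no newer version has already covered, and the clean object fills whatever remains. A per-byte bitmap is used here purely for clarity; the real code records the pieces in read_vec instead.

#include <cstdint>
#include <vector>
#include <algorithm>

// hypothetical model of covering one read request from several object versions
struct read_model
{
    uint32_t offset, len;
    std::vector<bool> covered;               // one flag per requested byte

    read_model(uint32_t o, uint32_t l): offset(o), len(l), covered(l, false) {}

    // account the overlap of [item_start, item_end) with the request,
    // skipping bytes that a newer version has already supplied
    uint32_t fulfill(uint32_t item_start, uint32_t item_end)
    {
        uint32_t start = std::max(item_start, offset);
        uint32_t end = std::min(item_end, offset + len);
        uint32_t added = 0;
        for (uint32_t b = start; b < end; b++)
        {
            if (!covered[b - offset])
            {
                covered[b - offset] = true;  // the real code would queue an io_uring read here
                added++;
            }
        }
        return added;
    }
};

In these terms, dequeue_read() calls fulfill() for each dirty entry from newest to oldest and finally for the clean object over [0, block_size); the request is complete once every byte is covered.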


@@ -76,21 +76,21 @@ int blockstore::fulfill_read(blockstore_operation *read_op, uint32_t item_start,
return 0;
}
int blockstore::read(blockstore_operation *read_op)
int blockstore::dequeue_read(blockstore_operation *read_op)
{
auto clean_it = object_db.find(read_op->oid);
auto dirty_it = dirty_queue.find(read_op->oid);
if (clean_it == object_db.end() && dirty_it == object_db.end())
if (clean_it == object_db.end() && dirty_it == dirty_queue.end())
{
// region is not allocated - return zeroes
memset(read_op->buf, 0, read_op->len);
read_op->retval = read_op->len;
read_op->callback(read_op);
return 0;
return 1;
}
unsigned prev_sqe_pos = ringloop->ring->sq.sqe_tail;
uint64_t fulfilled = 0;
if (dirty_it != object_db.end())
if (dirty_it != dirty_queue.end())
{
dirty_list dirty = dirty_it->second;
for (int i = dirty.size()-1; i >= 0; i--)
@@ -99,10 +99,9 @@ int blockstore::read(blockstore_operation *read_op)
{
if (fulfill_read(read_op, dirty[i].offset, dirty[i].offset + dirty[i].size, dirty[i].state, dirty[i].version, dirty[i].location) < 0)
{
// need to wait for something, undo added requests and requeue op
// need to wait. undo added requests, don't dequeue op
ringloop->ring->sq.sqe_tail = prev_sqe_pos;
read_op->read_vec.clear();
submit_queue.push_front(read_op);
return 0;
}
}
@@ -112,11 +111,9 @@ int blockstore::read(blockstore_operation *read_op)
{
if (fulfill_read(read_op, 0, block_size, ST_CURRENT, 0, clean_it->second.location) < 0)
{
// need to wait for something, undo added requests and requeue op
// need to wait. undo added requests, don't dequeue op
ringloop->ring->sq.sqe_tail = prev_sqe_pos;
read_op->read_vec.clear();
// FIXME: manage enqueue/dequeue/requeue
submit_queue.push_front(read_op);
return 0;
}
}
@@ -126,14 +123,15 @@ int blockstore::read(blockstore_operation *read_op)
memset(read_op->buf, 0, read_op->len);
read_op->retval = read_op->len;
read_op->callback(read_op);
return 0;
return 1;
}
read_op->retval = 0;
read_op->pending_ops = read_op->read_vec.size();
in_process_ops.insert(read_op);
int ret = ringloop->submit();
if (ret < 0)
{
throw new std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret));
}
return 0;
return 1;
}
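
Finally, a stand-alone sketch of the rollback trick dequeue_read() uses above: remember sq.sqe_tail before preparing a batch of SQEs, and if the batch cannot be completed because the ring is full, restore the tail so the half-prepared batch is never submitted. Plain liburing is assumed here instead of the ringloop wrapper; the file descriptor, iovecs and offsets are illustrative only.

#include <liburing.h>
#include <sys/uio.h>
#include <stdexcept>
#include <string>
#include <cstring>

// prepare <count> reads as one all-or-nothing batch; returns false if the ring is full
bool try_submit_batch(io_uring *ring, int fd, iovec *iov, int count)
{
    unsigned prev_sqe_pos = ring->sq.sqe_tail;        // remember where the batch starts
    for (int i = 0; i < count; i++)
    {
        io_uring_sqe *sqe = io_uring_get_sqe(ring);
        if (!sqe)
        {
            // ring is full: undo the SQEs prepared so far and let the caller retry later
            ring->sq.sqe_tail = prev_sqe_pos;
            return false;
        }
        io_uring_prep_readv(sqe, fd, &iov[i], 1, i * 4096);   // illustrative offsets
    }
    int ret = io_uring_submit(ring);
    if (ret < 0)
    {
        throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret));
    }
    return true;
}

This mirrors the prev_sqe_pos bookkeeping in dequeue_read(): the operation stays in submit_queue, sets wait_for, and the loop stops submitting further operations until SQEs become available again.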