diff --git a/blockstore.cpp b/blockstore.cpp index 89969a0a..6b3f4f07 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -50,7 +50,7 @@ blockstore::~blockstore() // main event loop - handle requests void blockstore::handle_event(ring_data_t *data) { - if (initialized != 0) + if (initialized != 10) { if (metadata_init_reader) { @@ -119,6 +119,66 @@ void blockstore::loop() } else { - + // try to submit ops + auto op = submit_queue.begin(); + while (op != submit_queue.end()) + { + auto cur = op++; + if (((*cur)->flags & OP_TYPE_MASK) == OP_READ_DIRTY || + ((*cur)->flags & OP_TYPE_MASK) == OP_READ) + { + int dequeue_op = dequeue_read(*cur); + if (dequeue_op) + { + submit_queue.erase(cur); + } + else if ((*cur)->wait_for == WAIT_SQE) + { + // ring is full, stop submission + break; + } + } + } } } + +int blockstore::enqueue_op(blockstore_operation *op) +{ + if (op->offset >= block_size || op->len >= block_size-op->offset) + { + return -EINVAL; + } + submit_queue.push_back(op); + if ((op->flags & OP_TYPE_MASK) == OP_WRITE) + { + // Assign version number + auto dirty_it = dirty_queue.find(op->oid); + if (dirty_it != dirty_queue.end()) + { + op->version = (*dirty_it).back().version + 1; + } + else + { + auto clean_it = object_db.find(op->oid); + if (clean_it != object_db.end()) + { + op->version = (*clean_it).version + 1; + } + else + { + op->version = 1; + } + dirty_it = dirty_queue.emplace(op->oid, dirty_list()).first; + } + // Immediately add the operation into the dirty queue, so subsequent reads could see it + (*dirty_it).push_back((dirty_entry){ + .version = op->version, + .state = ST_IN_FLIGHT, + .flags = 0, + .location = 0, + .offset = op->offset, + .size = op->len, + }); + } + return 0; +} diff --git a/blockstore.h b/blockstore.h index c506f4e8..7e31514f 100644 --- a/blockstore.h +++ b/blockstore.h @@ -13,7 +13,7 @@ #include #include -#include +#include #include #include @@ -110,11 +110,26 @@ public: } }; -// SYNC must be submitted after previous WRITEs/DELETEs (not before!) -// READs to the same object must be submitted after previous WRITEs/DELETEs +// - Sync must be submitted after previous writes/deletes (not before!) +// - Reads to the same object must be submitted after previous writes/deletes +// are written (not necessarily synced) in their location. This is because we +// rely on read-modify-write for erasure coding and we must return new data +// to calculate parity for subsequent writes +// - Writes may be submitted in any order, because they don't overlap. Each write +// goes into a new location - either on the journal device or on the data device +// - Journal trim may be processed only after all versions are moved to +// the main storage AND after all read operations for older versions complete +// - If an operation can not be submitted because the ring is full +// we should stop submission of other operations. Otherwise some "scatter" reads +// may end up blocked for a long time. // Otherwise, the submit order is free, that is all operations may be submitted immediately // In fact, adding a write operation must immediately result in dirty_queue being populated +// write -> immediately add to dirty ops, immediately submit. postpone if ring full +// read -> check dirty ops, read or wait, remember max used journal offset, then unremember it +// sync -> take all current writes (inflight + pending), wait for them to finish, sync, move their state +// the question is: how to remember current writes. + #define OP_READ 1 #define OP_READ_DIRTY 2 #define OP_WRITE 3 @@ -154,7 +169,7 @@ class blockstore public: spp::sparse_hash_map object_db; spp::sparse_hash_map dirty_queue; - std::deque submit_queue; + std::list submit_queue; std::set in_process_ops; uint32_t block_order, block_size; uint64_t block_count; @@ -197,7 +212,8 @@ public: void loop(); // Read - int read(blockstore_operation *read_op); + int enqueue_op(blockstore_operation *op); + int dequeue_read(blockstore_operation *read_op); int fulfill_read(blockstore_operation *read_op, uint32_t item_start, uint32_t item_end, uint32_t item_state, uint64_t item_version, uint64_t item_location); int fulfill_read_push(blockstore_operation *read_op, uint32_t item_start, diff --git a/blockstore_read.cpp b/blockstore_read.cpp index 710858f8..b2d1305b 100644 --- a/blockstore_read.cpp +++ b/blockstore_read.cpp @@ -76,21 +76,21 @@ int blockstore::fulfill_read(blockstore_operation *read_op, uint32_t item_start, return 0; } -int blockstore::read(blockstore_operation *read_op) +int blockstore::dequeue_read(blockstore_operation *read_op) { auto clean_it = object_db.find(read_op->oid); auto dirty_it = dirty_queue.find(read_op->oid); - if (clean_it == object_db.end() && dirty_it == object_db.end()) + if (clean_it == object_db.end() && dirty_it == dirty_queue.end()) { // region is not allocated - return zeroes memset(read_op->buf, 0, read_op->len); read_op->retval = read_op->len; read_op->callback(read_op); - return 0; + return 1; } unsigned prev_sqe_pos = ringloop->ring->sq.sqe_tail; uint64_t fulfilled = 0; - if (dirty_it != object_db.end()) + if (dirty_it != dirty_queue.end()) { dirty_list dirty = dirty_it->second; for (int i = dirty.size()-1; i >= 0; i--) @@ -99,10 +99,9 @@ int blockstore::read(blockstore_operation *read_op) { if (fulfill_read(read_op, dirty[i].offset, dirty[i].offset + dirty[i].size, dirty[i].state, dirty[i].version, dirty[i].location) < 0) { - // need to wait for something, undo added requests and requeue op + // need to wait. undo added requests, don't dequeue op ringloop->ring->sq.sqe_tail = prev_sqe_pos; read_op->read_vec.clear(); - submit_queue.push_front(read_op); return 0; } } @@ -112,11 +111,9 @@ int blockstore::read(blockstore_operation *read_op) { if (fulfill_read(read_op, 0, block_size, ST_CURRENT, 0, clean_it->second.location) < 0) { - // need to wait for something, undo added requests and requeue op + // need to wait. undo added requests, don't dequeue op ringloop->ring->sq.sqe_tail = prev_sqe_pos; read_op->read_vec.clear(); - // FIXME: manage enqueue/dequeue/requeue - submit_queue.push_front(read_op); return 0; } } @@ -126,14 +123,15 @@ int blockstore::read(blockstore_operation *read_op) memset(read_op->buf, 0, read_op->len); read_op->retval = read_op->len; read_op->callback(read_op); - return 0; + return 1; } read_op->retval = 0; read_op->pending_ops = read_op->read_vec.size(); + in_process_ops.insert(read_op); int ret = ringloop->submit(); if (ret < 0) { throw new std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret)); } - return 0; + return 1; }