Browse Source

Begin sync implementation

blocking-uring-test
Vitaliy Filippov 3 years ago
parent
commit
153de65ce7
  1. 22
      blockstore.cpp
  2. 15
      blockstore.h
  3. 63
      blockstore_write.cpp

22
blockstore.cpp

@ -115,6 +115,10 @@ void blockstore::handle_event(ring_data_t *data)
op->retval = op->len;
op->callback(op);
in_process_ops.erase(op);
unsynced_writes.push_back((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
}
}
else if ((op->flags & OP_TYPE_MASK) == OP_SYNC)
@ -165,6 +169,7 @@ void blockstore::loop()
{
// try to submit ops
auto cur = submit_queue.begin();
bool has_writes = false;
while (cur != submit_queue.end())
{
auto op_ptr = cur;
@ -243,6 +248,7 @@ void blockstore::loop()
// ring is full, stop submission
break;
}
has_writes = true;
}
else if ((op->flags & OP_TYPE_MASK) == OP_SYNC)
{
@ -250,7 +256,21 @@ void blockstore::loop()
// wait for all big writes to complete, submit data device fsync
// wait for the data device fsync to complete, then submit journal writes for big writes
// then submit an fsync operation
if (has_writes)
{
// Can't submit SYNC before previous writes
continue;
}
int dequeue_op = dequeue_sync(op);
if (dequeue_op)
{
submit_queue.erase(op_ptr);
}
else if (op->wait_for == WAIT_SQE)
{
// ring is full, stop submission
break;
}
}
else if ((op->flags & OP_TYPE_MASK) == OP_STABLE)
{

15
blockstore.h

@ -13,6 +13,7 @@
#include <vector>
#include <map>
#include <deque>
#include <list>
#include <set>
#include <functional>
@ -53,6 +54,8 @@
#define IS_IN_FLIGHT(st) (st == ST_IN_FLIGHT || st == ST_J_SUBMITTED || st == ST_D_SUBMITTED || st == ST_DEL_SUBMITTED)
#define IS_STABLE(st) (st >= ST_J_STABLE && st <= ST_J_MOVE_SYNCED || st >= ST_D_STABLE && st <= ST_D_META_COMMITTED || st >= ST_DEL_STABLE && st <= ST_DEL_MOVED || st == ST_CURRENT)
#define IS_JOURNAL(st) (st >= ST_J_SUBMITTED && st <= ST_J_MOVE_SYNCED)
#define IS_BIG_WRITE(st) (st >= ST_D_SUBMITTED && st <= ST_D_META_COMMITTED)
#define IS_UNSYNCED(st) (st == ST_J_WRITTEN || st >= ST_D_WRITTEN && st <= ST_D_META_WRITTEN || st == ST_DEL_WRITTEN)
// Default object size is 128 KB
#define DEFAULT_ORDER 17
@ -182,11 +185,16 @@ struct blockstore_operation
uint8_t *buf;
int retval;
std::map<uint64_t, struct iovec> read_vec;
int pending_ops;
// Wait status
int wait_for;
uint64_t wait_detail;
int pending_ops;
// FIXME make all of these pointers and put them into a union
std::map<uint64_t, struct iovec> read_vec;
uint64_t used_journal_sector;
std::deque<obj_ver_id> sync_writes;
bool has_big_writes;
};
class blockstore;
@ -197,9 +205,11 @@ class blockstore
{
struct ring_consumer_t ring_consumer;
public:
// Another option is https://github.com/algorithm-ninja/cpp-btree
spp::sparse_hash_map<object_id, clean_entry, oid_hash> object_db;
std::map<obj_ver_id, dirty_entry> dirty_db;
std::list<blockstore_operation*> submit_queue;
std::deque<obj_ver_id> unsynced_writes;
std::set<blockstore_operation*> in_process_ops;
uint32_t block_order, block_size;
uint64_t block_count;
@ -252,4 +262,5 @@ public:
int dequeue_write(blockstore_operation *op);
// Sync
int dequeue_sync(blockstore_operation *op);
};

63
blockstore_write.cpp

@ -142,3 +142,66 @@ int blockstore::dequeue_write(blockstore_operation *op)
}
return 1;
}
int blockstore::dequeue_sync(blockstore_operation *op)
{
op->has_big_writes = 0x10000;
op->sync_writes.swap(unsynced_writes);
unsynced_writes.clear();
auto it = sync_writes.begin();
while (it != sync_writes.end())
{
uint32_t state = dirty_db[*it].state;
if (IS_BIG_WRITE(state))
{
op->has_big_writes = op->has_big_writes < state ? op->has_big_writes : state;
}
it++;
}
if (op->has_big_writes == 0x10000 || op->has_big_writes == ST_D_META_WRITTEN)
{
// Just fsync the journal
struct io_uring_sqe *sqe = get_sqe();
if (!sqe)
{
// Pause until there are more requests available
op->wait_for = WAIT_SQE;
op->wait_detail = 1;
return 0;
}
struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
io_uring_prep_fsync(sqe, journal.fd, 0);
data->op = op;
op->pending_ops = 1;
}
else if (op->has_big_writes == ST_D_WRITTEN)
{
// FIXME: try to remove duplicated get_sqe+!sqe+data code
// 1st step: fsync data
struct io_uring_sqe *sqe = get_sqe();
if (!sqe)
{
// Pause until there are more requests available
op->wait_for = WAIT_SQE;
op->wait_detail = 1;
return 0;
}
struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
io_uring_prep_fsync(sqe, data_fd, 0);
data->op = op;
op->pending_ops = 1;
}
else if (op->has_big_writes == ST_D_SYNCED)
{
// 2nd step: Data device is synced, prepare & write journal entries
}
// FIXME: try to remove this duplicated code, too
in_process_ops.insert(op);
int ret = ringloop->submit();
if (ret < 0)
{
throw new std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret));
}
return 1;
}

Loading…
Cancel
Save