Browse Source

Ack writes

blocking-uring-test
Vitaliy Filippov 2 years ago
parent
commit
a5f2d8b85e
  1. 29
      blockstore.cpp
  2. 47
      blockstore.h
  3. 3
      blockstore_read.cpp
  4. 4
      blockstore_write.cpp

29
blockstore.cpp

@ -92,7 +92,6 @@ void blockstore::handle_event(ring_data_t *data)
// write error
// FIXME: our state becomes corrupted after a write error. maybe do something better than just die
throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111");
op->retval = data->res;
}
if (op->used_journal_sector > 0)
{
@ -106,9 +105,26 @@ void blockstore::handle_event(ring_data_t *data)
}
if (op->pending_ops == 0)
{
// Acknowledge write without sync
auto dirty_it = dirty_db.find((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
dirty_it->second.state = (dirty_it->second.state == ST_J_SUBMITTED
? ST_J_WRITTEN : (dirty_it->second.state == ST_DEL_SUBMITTED ? ST_DEL_WRITTEN : ST_D_WRITTEN));
op->retval = op->len;
op->callback(op);
in_process_ops.erase(op);
}
}
else if ((op->flags & OP_TYPE_MASK) == OP_SYNC)
{
}
else if ((op->flags & OP_TYPE_MASK) == OP_STABLE)
{
}
}
}
@ -152,6 +168,14 @@ void blockstore::loop()
while (op != submit_queue.end())
{
auto cur = op++;
if ((*cur)->wait_for == WAIT_SQE)
{
}
else if ((*cur)->wait_for == WAIT_IN_FLIGHT)
{
}
if (((*cur)->flags & OP_TYPE_MASK) == OP_READ_DIRTY ||
((*cur)->flags & OP_TYPE_MASK) == OP_READ)
{
@ -201,6 +225,7 @@ int blockstore::enqueue_op(blockstore_operation *op)
// Basic verification not passed
return -EINVAL;
}
op->wait_for = 0;
submit_queue.push_back(op);
if ((op->flags & OP_TYPE_MASK) == OP_WRITE)
{

47
blockstore.h

@ -25,25 +25,34 @@
// States are not stored on disk. Instead, they're deduced from the journal
#define ST_IN_FLIGHT 1
#define ST_J_WRITTEN 2
#define ST_J_SYNCED 3
#define ST_J_STABLE 4
#define ST_J_MOVED 5
#define ST_J_MOVE_SYNCED 6
#define ST_D_WRITTEN 16
#define ST_D_SYNCED 17
#define ST_D_META_WRITTEN 18
#define ST_D_META_SYNCED 19
#define ST_D_STABLE 20
#define ST_D_META_MOVED 21
#define ST_D_META_COMMITTED 22
#define ST_DEL_WRITTEN 23
#define ST_DEL_SYNCED 24
#define ST_DEL_STABLE 25
#define ST_DEL_MOVED 26
#define ST_CURRENT 32
#define IS_STABLE(st) ((st) == 4 || (st) == 5 || (st) == 6 || (st) == 20 || (st) == 21 || (st) == 22 || (st) == 32 || (st) == 24 || (st) == 25)
#define IS_JOURNAL(st) (st >= 2 && st <= 6)
#define ST_J_SUBMITTED 2
#define ST_J_WRITTEN 3
#define ST_J_SYNCED 4
#define ST_J_STABLE 5
#define ST_J_MOVED 6
#define ST_J_MOVE_SYNCED 7
#define ST_D_SUBMITTED 16
#define ST_D_WRITTEN 17
#define ST_D_SYNCED 18
#define ST_D_META_WRITTEN 19
#define ST_D_META_SYNCED 20
#define ST_D_STABLE 21
#define ST_D_META_MOVED 22
#define ST_D_META_COMMITTED 23
#define ST_DEL_SUBMITTED 32
#define ST_DEL_WRITTEN 33
#define ST_DEL_SYNCED 34
#define ST_DEL_STABLE 35
#define ST_DEL_MOVED 36
#define ST_CURRENT 48
#define IS_IN_FLIGHT(st) (st == ST_IN_FLIGHT || st == ST_J_SUBMITTED || st == ST_D_SUBMITTED || st == ST_DEL_SUBMITTED)
#define IS_STABLE(st) (st >= ST_J_STABLE && st <= ST_J_MOVE_SYNCED || st >= ST_D_STABLE && st <= ST_D_META_COMMITTED || st >= ST_DEL_STABLE && st <= ST_DEL_MOVED || st == ST_CURRENT)
#define IS_JOURNAL(st) (st >= ST_J_SUBMITTED && st <= ST_J_MOVE_SYNCED)
// Default object size is 128 KB
#define DEFAULT_ORDER 17

3
blockstore_read.cpp

@ -5,7 +5,7 @@ int blockstore::fulfill_read_push(blockstore_operation *read_op, uint32_t item_s
{
if (cur_end > cur_start)
{
if (item_state == ST_IN_FLIGHT)
if (IS_IN_FLIGHT(item_state))
{
// Pause until it's written somewhere
read_op->wait_for = WAIT_IN_FLIGHT;
@ -78,6 +78,7 @@ int blockstore::fulfill_read(blockstore_operation *read_op, uint32_t item_start,
int blockstore::dequeue_read(blockstore_operation *read_op)
{
// FIXME: allow to read specific version
auto clean_it = object_db.find(read_op->oid);
auto dirty_it = dirty_db.upper_bound((obj_ver_id){
.oid = read_op->oid,

4
blockstore_write.cpp

@ -27,7 +27,7 @@ int blockstore::dequeue_write(blockstore_operation *op)
}
struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
dirty_it->second.location = loc << block_order;
//dirty_it->second.state = ST_D_SUBMITTED;
dirty_it->second.state = ST_D_SUBMITTED;
allocator_set(data_alloc, loc, true);
data->iov = (struct iovec){ op->buf, op->len };
data->op = op;
@ -120,7 +120,7 @@ int blockstore::dequeue_write(blockstore_operation *op)
sqe2, journal.fd, &data2->iov, 1, journal.offset + journal.next_free
);
dirty_it->second.location = journal.next_free;
//dirty_it->second.state = ST_J_SUBMITTED;
dirty_it->second.state = ST_J_SUBMITTED;
// Move journal.next_free and save last write for current sector
journal.next_free += op->len;
if (journal.next_free >= journal.len)

Loading…
Cancel
Save