diff --git a/blockstore.cpp b/blockstore.cpp index 7bef9d17..bd197b13 100644 --- a/blockstore.cpp +++ b/blockstore.cpp @@ -92,7 +92,6 @@ void blockstore::handle_event(ring_data_t *data) // write error // FIXME: our state becomes corrupted after a write error. maybe do something better than just die throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"); - op->retval = data->res; } if (op->used_journal_sector > 0) { @@ -106,9 +105,26 @@ void blockstore::handle_event(ring_data_t *data) } if (op->pending_ops == 0) { - + // Acknowledge write without sync + auto dirty_it = dirty_db.find((obj_ver_id){ + .oid = op->oid, + .version = op->version, + }); + dirty_it->second.state = (dirty_it->second.state == ST_J_SUBMITTED + ? ST_J_WRITTEN : (dirty_it->second.state == ST_DEL_SUBMITTED ? ST_DEL_WRITTEN : ST_D_WRITTEN)); + op->retval = op->len; + op->callback(op); + in_process_ops.erase(op); } } + else if ((op->flags & OP_TYPE_MASK) == OP_SYNC) + { + + } + else if ((op->flags & OP_TYPE_MASK) == OP_STABLE) + { + + } } } @@ -152,6 +168,14 @@ void blockstore::loop() while (op != submit_queue.end()) { auto cur = op++; + if ((*cur)->wait_for == WAIT_SQE) + { + + } + else if ((*cur)->wait_for == WAIT_IN_FLIGHT) + { + + } if (((*cur)->flags & OP_TYPE_MASK) == OP_READ_DIRTY || ((*cur)->flags & OP_TYPE_MASK) == OP_READ) { @@ -201,6 +225,7 @@ int blockstore::enqueue_op(blockstore_operation *op) // Basic verification not passed return -EINVAL; } + op->wait_for = 0; submit_queue.push_back(op); if ((op->flags & OP_TYPE_MASK) == OP_WRITE) { diff --git a/blockstore.h b/blockstore.h index cb266349..9705385e 100644 --- a/blockstore.h +++ b/blockstore.h @@ -25,25 +25,34 @@ // States are not stored on disk. Instead, they're deduced from the journal #define ST_IN_FLIGHT 1 -#define ST_J_WRITTEN 2 -#define ST_J_SYNCED 3 -#define ST_J_STABLE 4 -#define ST_J_MOVED 5 -#define ST_J_MOVE_SYNCED 6 -#define ST_D_WRITTEN 16 -#define ST_D_SYNCED 17 -#define ST_D_META_WRITTEN 18 -#define ST_D_META_SYNCED 19 -#define ST_D_STABLE 20 -#define ST_D_META_MOVED 21 -#define ST_D_META_COMMITTED 22 -#define ST_DEL_WRITTEN 23 -#define ST_DEL_SYNCED 24 -#define ST_DEL_STABLE 25 -#define ST_DEL_MOVED 26 -#define ST_CURRENT 32 -#define IS_STABLE(st) ((st) == 4 || (st) == 5 || (st) == 6 || (st) == 20 || (st) == 21 || (st) == 22 || (st) == 32 || (st) == 24 || (st) == 25) -#define IS_JOURNAL(st) (st >= 2 && st <= 6) + +#define ST_J_SUBMITTED 2 +#define ST_J_WRITTEN 3 +#define ST_J_SYNCED 4 +#define ST_J_STABLE 5 +#define ST_J_MOVED 6 +#define ST_J_MOVE_SYNCED 7 + +#define ST_D_SUBMITTED 16 +#define ST_D_WRITTEN 17 +#define ST_D_SYNCED 18 +#define ST_D_META_WRITTEN 19 +#define ST_D_META_SYNCED 20 +#define ST_D_STABLE 21 +#define ST_D_META_MOVED 22 +#define ST_D_META_COMMITTED 23 + +#define ST_DEL_SUBMITTED 32 +#define ST_DEL_WRITTEN 33 +#define ST_DEL_SYNCED 34 +#define ST_DEL_STABLE 35 +#define ST_DEL_MOVED 36 + +#define ST_CURRENT 48 + +#define IS_IN_FLIGHT(st) (st == ST_IN_FLIGHT || st == ST_J_SUBMITTED || st == ST_D_SUBMITTED || st == ST_DEL_SUBMITTED) +#define IS_STABLE(st) (st >= ST_J_STABLE && st <= ST_J_MOVE_SYNCED || st >= ST_D_STABLE && st <= ST_D_META_COMMITTED || st >= ST_DEL_STABLE && st <= ST_DEL_MOVED || st == ST_CURRENT) +#define IS_JOURNAL(st) (st >= ST_J_SUBMITTED && st <= ST_J_MOVE_SYNCED) // Default object size is 128 KB #define DEFAULT_ORDER 17 diff --git a/blockstore_read.cpp b/blockstore_read.cpp index c826bbd4..43eadbda 100644 --- a/blockstore_read.cpp +++ b/blockstore_read.cpp @@ -5,7 +5,7 @@ int blockstore::fulfill_read_push(blockstore_operation *read_op, uint32_t item_s { if (cur_end > cur_start) { - if (item_state == ST_IN_FLIGHT) + if (IS_IN_FLIGHT(item_state)) { // Pause until it's written somewhere read_op->wait_for = WAIT_IN_FLIGHT; @@ -78,6 +78,7 @@ int blockstore::fulfill_read(blockstore_operation *read_op, uint32_t item_start, int blockstore::dequeue_read(blockstore_operation *read_op) { + // FIXME: allow to read specific version auto clean_it = object_db.find(read_op->oid); auto dirty_it = dirty_db.upper_bound((obj_ver_id){ .oid = read_op->oid, diff --git a/blockstore_write.cpp b/blockstore_write.cpp index 6997f60e..5814c66f 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -27,7 +27,7 @@ int blockstore::dequeue_write(blockstore_operation *op) } struct ring_data_t *data = ((ring_data_t*)sqe->user_data); dirty_it->second.location = loc << block_order; - //dirty_it->second.state = ST_D_SUBMITTED; + dirty_it->second.state = ST_D_SUBMITTED; allocator_set(data_alloc, loc, true); data->iov = (struct iovec){ op->buf, op->len }; data->op = op; @@ -120,7 +120,7 @@ int blockstore::dequeue_write(blockstore_operation *op) sqe2, journal.fd, &data2->iov, 1, journal.offset + journal.next_free ); dirty_it->second.location = journal.next_free; - //dirty_it->second.state = ST_J_SUBMITTED; + dirty_it->second.state = ST_J_SUBMITTED; // Move journal.next_free and save last write for current sector journal.next_free += op->len; if (journal.next_free >= journal.len)