Rework write/sync ordering

Make syncs wait for all previous writes to complete, because this is the
only way to make sure that OSDs do not receive incomplete writes in LIST
results during peering while some writes are still in progress.

Also simplify blockstore submission queue logic.
Vitaliy Filippov 2021-03-06 17:42:09 +03:00
parent 21e7686037
commit 98f1e2c277
6 changed files with 83 additions and 141 deletions

View File

@ -101,21 +101,10 @@ void blockstore_impl_t::loop()
{ {
// try to submit ops // try to submit ops
unsigned initial_ring_space = ringloop->space_left(); unsigned initial_ring_space = ringloop->space_left();
// FIXME: rework this "sync polling"
auto cur_sync = in_progress_syncs.begin();
while (cur_sync != in_progress_syncs.end())
{
if (continue_sync(*cur_sync) != 2)
{
// List is unmodified
cur_sync++;
}
else
{
cur_sync = in_progress_syncs.begin();
}
}
auto cur = submit_queue.begin(); auto cur = submit_queue.begin();
// has_writes == 0 - no writes before the current queue item
// has_writes == 1 - some writes in progress
// has_writes == 2 - tried to submit some writes, but failed
int has_writes = 0; int has_writes = 0;
while (cur != submit_queue.end()) while (cur != submit_queue.end())
{ {
@ -142,10 +131,12 @@ void blockstore_impl_t::loop()
} }
unsigned ring_space = ringloop->space_left(); unsigned ring_space = ringloop->space_left();
unsigned prev_sqe_pos = ringloop->save(); unsigned prev_sqe_pos = ringloop->save();
bool dequeue_op = false; bool dequeue_op = false, cancel_op = false;
bool has_in_progress_sync = false;
if (op->opcode == BS_OP_READ) if (op->opcode == BS_OP_READ)
{ {
dequeue_op = dequeue_read(op); dequeue_op = dequeue_read(op);
cancel_op = !dequeue_op;
} }
else if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE) else if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE)
{ {
@ -154,8 +145,13 @@ void blockstore_impl_t::loop()
// Some writes already could not be submitted // Some writes already could not be submitted
continue; continue;
} }
dequeue_op = dequeue_write(op); int wr_st = dequeue_write(op);
has_writes = dequeue_op ? 1 : 2; // 0 = can't submit
// 1 = in progress
// 2 = completed, remove from queue
dequeue_op = wr_st == 2;
cancel_op = wr_st == 0;
has_writes = wr_st > 0 ? 1 : 2;
} }
else if (op->opcode == BS_OP_DELETE) else if (op->opcode == BS_OP_DELETE)
{ {
@ -164,8 +160,10 @@ void blockstore_impl_t::loop()
// Some writes already could not be submitted // Some writes already could not be submitted
continue; continue;
} }
dequeue_op = dequeue_del(op); int wr_st = dequeue_del(op);
has_writes = dequeue_op ? 1 : 2; dequeue_op = wr_st == 2;
cancel_op = wr_st == 0;
has_writes = wr_st > 0 ? 1 : 2;
} }
else if (op->opcode == BS_OP_SYNC) else if (op->opcode == BS_OP_SYNC)
{ {
@ -178,29 +176,39 @@ void blockstore_impl_t::loop()
// Can't submit SYNC before previous writes // Can't submit SYNC before previous writes
continue; continue;
} }
dequeue_op = dequeue_sync(op); int wr_st = continue_sync(op, has_in_progress_sync);
dequeue_op = wr_st == 2;
cancel_op = wr_st == 0;
if (dequeue_op != 2)
{
// Or we could just set has_writes=1...
has_in_progress_sync = true;
}
} }
else if (op->opcode == BS_OP_STABLE) else if (op->opcode == BS_OP_STABLE)
{ {
dequeue_op = dequeue_stable(op); int wr_st = dequeue_stable(op);
dequeue_op = wr_st == 2;
cancel_op = wr_st == 0;
} }
else if (op->opcode == BS_OP_ROLLBACK) else if (op->opcode == BS_OP_ROLLBACK)
{ {
dequeue_op = dequeue_rollback(op); int wr_st = dequeue_rollback(op);
dequeue_op = wr_st == 2;
cancel_op = wr_st == 0;
} }
else if (op->opcode == BS_OP_LIST) else if (op->opcode == BS_OP_LIST)
{ {
// LIST doesn't need to be blocked by previous modifications, // LIST doesn't need to be blocked by previous modifications
// it only needs to include all in-progress writes as they're guaranteed
// to be readable and stabilizable/rollbackable by subsequent operations
process_list(op); process_list(op);
dequeue_op = true; dequeue_op = true;
cancel_op = false;
} }
if (dequeue_op) if (dequeue_op)
{ {
submit_queue.erase(op_ptr); submit_queue.erase(op_ptr);
} }
else if (cancel_op)
{ {
ringloop->restore(prev_sqe_pos); ringloop->restore(prev_sqe_pos);
if (PRIV(op)->wait_for == WAIT_SQE) if (PRIV(op)->wait_for == WAIT_SQE)
@ -233,7 +241,7 @@ bool blockstore_impl_t::is_safe_to_stop()
{ {
// It's safe to stop blockstore when there are no in-flight operations, // It's safe to stop blockstore when there are no in-flight operations,
// no in-progress syncs and flusher isn't doing anything // no in-progress syncs and flusher isn't doing anything
if (submit_queue.size() > 0 || in_progress_syncs.size() > 0 || !readonly && flusher->is_active()) if (submit_queue.size() > 0 || !readonly && flusher->is_active())
{ {
return false; return false;
} }
@ -374,12 +382,6 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
std::function<void (blockstore_op_t*)>(op->callback)(op); std::function<void (blockstore_op_t*)>(op->callback)(op);
return; return;
} }
if (op->opcode == BS_OP_SYNC && immediate_commit == IMMEDIATE_ALL)
{
op->retval = 0;
std::function<void (blockstore_op_t*)>(op->callback)(op);
return;
}
// Call constructor without allocating memory. We'll call destructor before returning op back // Call constructor without allocating memory. We'll call destructor before returning op back
new ((void*)op->private_data) blockstore_op_private_t; new ((void*)op->private_data) blockstore_op_private_t;
PRIV(op)->wait_for = 0; PRIV(op)->wait_for = 0;

View File

@ -160,8 +160,6 @@ struct blockstore_op_private_t
// Sync // Sync
std::vector<obj_ver_id> sync_big_writes, sync_small_writes; std::vector<obj_ver_id> sync_big_writes, sync_small_writes;
int sync_small_checked, sync_big_checked; int sync_small_checked, sync_big_checked;
std::list<blockstore_op_t*>::iterator in_progress_ptr;
int prev_sync_count;
}; };
// https://github.com/algorithm-ninja/cpp-btree // https://github.com/algorithm-ninja/cpp-btree
@ -212,7 +210,6 @@ class blockstore_impl_t
blockstore_dirty_db_t dirty_db; blockstore_dirty_db_t dirty_db;
std::list<blockstore_op_t*> submit_queue; // FIXME: funny thing is that vector is better here std::list<blockstore_op_t*> submit_queue; // FIXME: funny thing is that vector is better here
std::vector<obj_ver_id> unsynced_big_writes, unsynced_small_writes; std::vector<obj_ver_id> unsynced_big_writes, unsynced_small_writes;
std::list<blockstore_op_t*> in_progress_syncs; // ...and probably here, too
allocator *data_alloc = NULL; allocator *data_alloc = NULL;
uint8_t *zero_object; uint8_t *zero_object;
@ -279,11 +276,9 @@ class blockstore_impl_t
void handle_write_event(ring_data_t *data, blockstore_op_t *op); void handle_write_event(ring_data_t *data, blockstore_op_t *op);
// Sync // Sync
int dequeue_sync(blockstore_op_t *op); int continue_sync(blockstore_op_t *op, bool queue_has_in_progress_sync);
void handle_sync_event(ring_data_t *data, blockstore_op_t *op); void handle_sync_event(ring_data_t *data, blockstore_op_t *op);
int continue_sync(blockstore_op_t *op); void ack_sync(blockstore_op_t *op);
void ack_one_sync(blockstore_op_t *op);
int ack_sync(blockstore_op_t *op);
// Stabilize // Stabilize
int dequeue_stable(blockstore_op_t *op); int dequeue_stable(blockstore_op_t *op);

View File

@ -50,7 +50,7 @@ skip_ov:
{ {
op->retval = -EBUSY; op->retval = -EBUSY;
FINISH_OP(op); FINISH_OP(op);
return 1; return 2;
} }
if (dirty_it == dirty_db.begin()) if (dirty_it == dirty_db.begin())
{ {
@ -66,7 +66,7 @@ skip_ov:
// Already rolled back // Already rolled back
op->retval = 0; op->retval = 0;
FINISH_OP(op); FINISH_OP(op);
return 1; return 2;
} }
// Check journal space // Check journal space
blockstore_journal_check_t space_check(this); blockstore_journal_check_t space_check(this);
@ -151,7 +151,7 @@ resume_5:
// Acknowledge op // Acknowledge op
op->retval = 0; op->retval = 0;
FINISH_OP(op); FINISH_OP(op);
return 1; return 2;
} }
void blockstore_impl_t::mark_rolled_back(const obj_ver_id & ov) void blockstore_impl_t::mark_rolled_back(const obj_ver_id & ov)
@ -216,10 +216,7 @@ void blockstore_impl_t::handle_rollback_event(ring_data_t *data, blockstore_op_t
if (PRIV(op)->pending_ops == 0) if (PRIV(op)->pending_ops == 0)
{ {
PRIV(op)->op_state++; PRIV(op)->op_state++;
if (!continue_rollback(op)) ringloop->wakeup();
{
submit_queue.push_front(op);
}
} }
} }

View File

@ -60,7 +60,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
// No such object version // No such object version
op->retval = -ENOENT; op->retval = -ENOENT;
FINISH_OP(op); FINISH_OP(op);
return 1; return 2;
} }
else else
{ {
@ -77,7 +77,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
// Object not synced yet. Caller must sync it first // Object not synced yet. Caller must sync it first
op->retval = -EBUSY; op->retval = -EBUSY;
FINISH_OP(op); FINISH_OP(op);
return 1; return 2;
} }
else if (!IS_STABLE(dirty_it->second.state)) else if (!IS_STABLE(dirty_it->second.state))
{ {
@ -89,7 +89,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
// Already stable // Already stable
op->retval = 0; op->retval = 0;
FINISH_OP(op); FINISH_OP(op);
return 1; return 2;
} }
// Check journal space // Check journal space
blockstore_journal_check_t space_check(this); blockstore_journal_check_t space_check(this);
@ -176,7 +176,7 @@ resume_5:
// Acknowledge op // Acknowledge op
op->retval = 0; op->retval = 0;
FINISH_OP(op); FINISH_OP(op);
return 1; return 2;
} }
void blockstore_impl_t::mark_stable(const obj_ver_id & v) void blockstore_impl_t::mark_stable(const obj_ver_id & v)
@ -228,9 +228,6 @@ void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t *
if (PRIV(op)->pending_ops == 0) if (PRIV(op)->pending_ops == 0)
{ {
PRIV(op)->op_state++; PRIV(op)->op_state++;
if (!continue_stable(op)) ringloop->wakeup();
{
submit_queue.push_front(op);
}
} }
} }

View File

@ -12,8 +12,15 @@
#define SYNC_JOURNAL_SYNC_SENT 7 #define SYNC_JOURNAL_SYNC_SENT 7
#define SYNC_DONE 8 #define SYNC_DONE 8
int blockstore_impl_t::dequeue_sync(blockstore_op_t *op) int blockstore_impl_t::continue_sync(blockstore_op_t *op, bool queue_has_in_progress_sync)
{ {
if (immediate_commit == IMMEDIATE_ALL)
{
// We can return immediately because sync is only dequeued after all previous writes
op->retval = 0;
FINISH_OP(op);
return 2;
}
if (PRIV(op)->op_state == 0) if (PRIV(op)->op_state == 0)
{ {
stop_sync_submitted = false; stop_sync_submitted = false;
@ -29,34 +36,15 @@ int blockstore_impl_t::dequeue_sync(blockstore_op_t *op)
PRIV(op)->op_state = SYNC_HAS_SMALL; PRIV(op)->op_state = SYNC_HAS_SMALL;
else else
PRIV(op)->op_state = SYNC_DONE; PRIV(op)->op_state = SYNC_DONE;
// Always add sync to in_progress_syncs because we clear unsynced_big_writes and unsynced_small_writes
PRIV(op)->prev_sync_count = in_progress_syncs.size();
PRIV(op)->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op);
} }
continue_sync(op);
// Always dequeue because we always add syncs to in_progress_syncs
return 1;
}
int blockstore_impl_t::continue_sync(blockstore_op_t *op)
{
auto cb = [this, op](ring_data_t *data) { handle_sync_event(data, op); };
if (PRIV(op)->op_state == SYNC_HAS_SMALL) if (PRIV(op)->op_state == SYNC_HAS_SMALL)
{ {
// No big writes, just fsync the journal // No big writes, just fsync the journal
for (; PRIV(op)->sync_small_checked < PRIV(op)->sync_small_writes.size(); PRIV(op)->sync_small_checked++)
{
if (IS_IN_FLIGHT(dirty_db[PRIV(op)->sync_small_writes[PRIV(op)->sync_small_checked]].state))
{
// Wait for small inflight writes to complete
return 0;
}
}
if (journal.sector_info[journal.cur_sector].dirty) if (journal.sector_info[journal.cur_sector].dirty)
{ {
// Write out the last journal sector if it happens to be dirty // Write out the last journal sector if it happens to be dirty
BS_SUBMIT_GET_ONLY_SQE(sqe); BS_SUBMIT_GET_ONLY_SQE(sqe);
prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb); prepare_journal_sector_write(journal, journal.cur_sector, sqe, [this, op](ring_data_t *data) { handle_sync_event(data, op); });
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = 1; PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT; PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT;
@ -69,21 +57,13 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
} }
if (PRIV(op)->op_state == SYNC_HAS_BIG) if (PRIV(op)->op_state == SYNC_HAS_BIG)
{ {
for (; PRIV(op)->sync_big_checked < PRIV(op)->sync_big_writes.size(); PRIV(op)->sync_big_checked++)
{
if (IS_IN_FLIGHT(dirty_db[PRIV(op)->sync_big_writes[PRIV(op)->sync_big_checked]].state))
{
// Wait for big inflight writes to complete
return 0;
}
}
// 1st step: fsync data // 1st step: fsync data
if (!disable_data_fsync) if (!disable_data_fsync)
{ {
BS_SUBMIT_GET_SQE(sqe, data); BS_SUBMIT_GET_SQE(sqe, data);
my_uring_prep_fsync(sqe, data_fd, IORING_FSYNC_DATASYNC); my_uring_prep_fsync(sqe, data_fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 }; data->iov = { 0 };
data->callback = cb; data->callback = [this, op](ring_data_t *data) { handle_sync_event(data, op); };
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0; PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
PRIV(op)->pending_ops = 1; PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = SYNC_DATA_SYNC_SENT; PRIV(op)->op_state = SYNC_DATA_SYNC_SENT;
@ -96,14 +76,6 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
} }
if (PRIV(op)->op_state == SYNC_DATA_SYNC_DONE) if (PRIV(op)->op_state == SYNC_DATA_SYNC_DONE)
{ {
for (; PRIV(op)->sync_small_checked < PRIV(op)->sync_small_writes.size(); PRIV(op)->sync_small_checked++)
{
if (IS_IN_FLIGHT(dirty_db[PRIV(op)->sync_small_writes[PRIV(op)->sync_small_checked]].state))
{
// Wait for small inflight writes to complete
return 0;
}
}
// 2nd step: Data device is synced, prepare & write journal entries // 2nd step: Data device is synced, prepare & write journal entries
// Check space in the journal and journal memory buffers // Check space in the journal and journal memory buffers
blockstore_journal_check_t space_check(this); blockstore_journal_check_t space_check(this);
@ -127,7 +99,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
{ {
if (cur_sector == -1) if (cur_sector == -1)
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector; PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb); prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], [this, op](ring_data_t *data) { handle_sync_event(data, op); });
cur_sector = journal.cur_sector; cur_sector = journal.cur_sector;
} }
journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry( journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry(
@ -152,7 +124,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
journal.crc32_last = je->crc32; journal.crc32_last = je->crc32;
it++; it++;
} }
prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], cb); prepare_journal_sector_write(journal, journal.cur_sector, sqe[s++], [this, op](ring_data_t *data) { handle_sync_event(data, op); });
assert(s == space_check.sectors_to_write); assert(s == space_check.sectors_to_write);
if (cur_sector == -1) if (cur_sector == -1)
PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector; PRIV(op)->min_flushed_journal_sector = 1 + journal.cur_sector;
@ -168,7 +140,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
BS_SUBMIT_GET_SQE(sqe, data); BS_SUBMIT_GET_SQE(sqe, data);
my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC); my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 }; data->iov = { 0 };
data->callback = cb; data->callback = [this, op](ring_data_t *data) { handle_sync_event(data, op); };
PRIV(op)->pending_ops = 1; PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = SYNC_JOURNAL_SYNC_SENT; PRIV(op)->op_state = SYNC_JOURNAL_SYNC_SENT;
return 1; return 1;
@ -178,9 +150,10 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
PRIV(op)->op_state = SYNC_DONE; PRIV(op)->op_state = SYNC_DONE;
} }
} }
if (PRIV(op)->op_state == SYNC_DONE) if (PRIV(op)->op_state == SYNC_DONE && !queue_has_in_progress_sync)
{ {
return ack_sync(op); ack_sync(op);
return 2;
} }
return 1; return 1;
} }
@ -212,42 +185,16 @@ void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op
else if (PRIV(op)->op_state == SYNC_JOURNAL_SYNC_SENT) else if (PRIV(op)->op_state == SYNC_JOURNAL_SYNC_SENT)
{ {
PRIV(op)->op_state = SYNC_DONE; PRIV(op)->op_state = SYNC_DONE;
ack_sync(op);
} }
else else
{ {
throw std::runtime_error("BUG: unexpected sync op state"); throw std::runtime_error("BUG: unexpected sync op state");
} }
ringloop->wakeup();
} }
} }
int blockstore_impl_t::ack_sync(blockstore_op_t *op) void blockstore_impl_t::ack_sync(blockstore_op_t *op)
{
if (PRIV(op)->op_state == SYNC_DONE && PRIV(op)->prev_sync_count == 0)
{
// Remove dependency of subsequent syncs
auto it = PRIV(op)->in_progress_ptr;
int done_syncs = 1;
++it;
// Acknowledge sync
ack_one_sync(op);
while (it != in_progress_syncs.end())
{
auto & next_sync = *it++;
PRIV(next_sync)->prev_sync_count -= done_syncs;
if (PRIV(next_sync)->prev_sync_count == 0 && PRIV(next_sync)->op_state == SYNC_DONE)
{
done_syncs++;
// Acknowledge next_sync
ack_one_sync(next_sync);
}
}
return 2;
}
return 0;
}
void blockstore_impl_t::ack_one_sync(blockstore_op_t *op)
{ {
// Handle states // Handle states
for (auto it = PRIV(op)->sync_big_writes.begin(); it != PRIV(op)->sync_big_writes.end(); it++) for (auto it = PRIV(op)->sync_big_writes.begin(); it != PRIV(op)->sync_big_writes.end(); it++)
@ -295,7 +242,6 @@ void blockstore_impl_t::ack_one_sync(blockstore_op_t *op)
} }
} }
} }
in_progress_syncs.erase(PRIV(op)->in_progress_ptr);
op->retval = 0; op->retval = 0;
FINISH_OP(op); FINISH_OP(op);
} }

View File

@ -170,7 +170,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
{ {
// This is the flag value used to cancel operations // This is the flag value used to cancel operations
FINISH_OP(op); FINISH_OP(op);
return 1; return 2;
} }
// Restore original low version number for unblocked operations // Restore original low version number for unblocked operations
#ifdef BLOCKSTORE_DEBUG #ifdef BLOCKSTORE_DEBUG
@ -183,7 +183,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
// Original version is still invalid // Original version is still invalid
// All subsequent writes to the same object must be canceled too // All subsequent writes to the same object must be canceled too
cancel_all_writes(op, dirty_it, -EEXIST); cancel_all_writes(op, dirty_it, -EEXIST);
return 1; return 2;
} }
op->version = PRIV(op)->real_version; op->version = PRIV(op)->real_version;
PRIV(op)->real_version = 0; PRIV(op)->real_version = 0;
@ -217,7 +217,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
return 0; return 0;
} }
cancel_all_writes(op, dirty_it, -ENOSPC); cancel_all_writes(op, dirty_it, -ENOSPC);
return 1; return 2;
} }
write_iodepth++; write_iodepth++;
BS_SUBMIT_GET_SQE(sqe, data); BS_SUBMIT_GET_SQE(sqe, data);
@ -370,7 +370,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
if (!PRIV(op)->pending_ops) if (!PRIV(op)->pending_ops)
{ {
PRIV(op)->op_state = 4; PRIV(op)->op_state = 4;
continue_write(op); return continue_write(op);
} }
else else
{ {
@ -384,17 +384,21 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op)
{ {
io_uring_sqe *sqe = NULL; io_uring_sqe *sqe = NULL;
journal_entry_big_write *je; journal_entry_big_write *je;
int op_state = PRIV(op)->op_state;
if (op_state != 2 && op_state != 4)
{
// In progress
return 1;
}
auto dirty_it = dirty_db.find((obj_ver_id){ auto dirty_it = dirty_db.find((obj_ver_id){
.oid = op->oid, .oid = op->oid,
.version = op->version, .version = op->version,
}); });
assert(dirty_it != dirty_db.end()); assert(dirty_it != dirty_db.end());
if (PRIV(op)->op_state == 2) if (op_state == 2)
goto resume_2; goto resume_2;
else if (PRIV(op)->op_state == 4) else if (op_state == 4)
goto resume_4; goto resume_4;
else
return 1;
resume_2: resume_2:
// Only for the immediate_commit mode: prepare and submit big_write journal entry // Only for the immediate_commit mode: prepare and submit big_write journal entry
sqe = get_sqe(); sqe = get_sqe();
@ -464,7 +468,7 @@ resume_4:
op->retval = op->len; op->retval = op->len;
write_iodepth--; write_iodepth--;
FINISH_OP(op); FINISH_OP(op);
return 1; return 2;
} }
void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *op) void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *op)
@ -483,10 +487,7 @@ void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *o
{ {
release_journal_sectors(op); release_journal_sectors(op);
PRIV(op)->op_state++; PRIV(op)->op_state++;
if (!continue_write(op)) ringloop->wakeup();
{
submit_queue.push_front(op);
}
} }
} }
@ -524,6 +525,10 @@ void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op)
int blockstore_impl_t::dequeue_del(blockstore_op_t *op) int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
{ {
if (PRIV(op)->op_state)
{
return continue_write(op);
}
auto dirty_it = dirty_db.find((obj_ver_id){ auto dirty_it = dirty_db.find((obj_ver_id){
.oid = op->oid, .oid = op->oid,
.version = op->version, .version = op->version,
@ -593,7 +598,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
if (!PRIV(op)->pending_ops) if (!PRIV(op)->pending_ops)
{ {
PRIV(op)->op_state = 4; PRIV(op)->op_state = 4;
continue_write(op); return continue_write(op);
} }
else else
{ {