Wait for writes to complete before issuing an fsync in blockstore_sync

Also fix a dormant bug (OP_SYNC could clear unsynced_*_writes and not be added into syncs_in_progress)
blocking-uring-test
Vitaliy Filippov 2020-01-29 16:40:11 +03:00
parent dcc9e75c63
commit 1e286eed08
2 changed files with 64 additions and 51 deletions

View File

@ -163,6 +163,7 @@ struct blockstore_op_private_t
// Sync // Sync
std::vector<obj_ver_id> sync_big_writes, sync_small_writes; std::vector<obj_ver_id> sync_big_writes, sync_small_writes;
int sync_small_checked, sync_big_checked;
std::list<blockstore_op_t*>::iterator in_progress_ptr; std::list<blockstore_op_t*>::iterator in_progress_ptr;
int sync_state, prev_sync_count; int sync_state, prev_sync_count;
}; };

View File

@ -4,8 +4,10 @@
#define SYNC_HAS_BIG 2 #define SYNC_HAS_BIG 2
#define SYNC_DATA_SYNC_SENT 3 #define SYNC_DATA_SYNC_SENT 3
#define SYNC_DATA_SYNC_DONE 4 #define SYNC_DATA_SYNC_DONE 4
#define SYNC_JOURNAL_SYNC_SENT 5 #define SYNC_JOURNAL_WRITE_SENT 5
#define SYNC_DONE 6 #define SYNC_JOURNAL_WRITE_DONE 6
#define SYNC_JOURNAL_SYNC_SENT 7
#define SYNC_DONE 8
int blockstore_impl_t::dequeue_sync(blockstore_op_t *op) int blockstore_impl_t::dequeue_sync(blockstore_op_t *op)
{ {
@ -14,23 +16,25 @@ int blockstore_impl_t::dequeue_sync(blockstore_op_t *op)
stop_sync_submitted = false; stop_sync_submitted = false;
PRIV(op)->sync_big_writes.swap(unsynced_big_writes); PRIV(op)->sync_big_writes.swap(unsynced_big_writes);
PRIV(op)->sync_small_writes.swap(unsynced_small_writes); PRIV(op)->sync_small_writes.swap(unsynced_small_writes);
unsynced_big_writes.clear();
unsynced_small_writes.clear();
if (PRIV(op)->sync_big_writes.size() > 0) if (PRIV(op)->sync_big_writes.size() > 0)
PRIV(op)->sync_state = SYNC_HAS_BIG; PRIV(op)->sync_state = SYNC_HAS_BIG;
else if (PRIV(op)->sync_small_writes.size() > 0) else if (PRIV(op)->sync_small_writes.size() > 0)
PRIV(op)->sync_state = SYNC_HAS_SMALL; PRIV(op)->sync_state = SYNC_HAS_SMALL;
else else
PRIV(op)->sync_state = SYNC_DONE; PRIV(op)->sync_state = SYNC_DONE;
unsynced_big_writes.clear(); // Always add sync to in_progress_syncs because we clear unsynced_big_writes and unsynced_small_writes
unsynced_small_writes.clear(); PRIV(op)->prev_sync_count = in_progress_syncs.size();
PRIV(op)->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op);
} }
int r = continue_sync(op); int r = continue_sync(op);
if (r) if (r)
{ {
PRIV(op)->prev_sync_count = in_progress_syncs.size();
PRIV(op)->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op);
ack_sync(op); ack_sync(op);
} }
return r; // Always dequeue because we always add syncs to in_progress_syncs
return 1;
} }
int blockstore_impl_t::continue_sync(blockstore_op_t *op) int blockstore_impl_t::continue_sync(blockstore_op_t *op)
@ -39,51 +43,42 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
if (PRIV(op)->sync_state == SYNC_HAS_SMALL) if (PRIV(op)->sync_state == SYNC_HAS_SMALL)
{ {
// No big writes, just fsync the journal // No big writes, just fsync the journal
int n_sqes = disable_journal_fsync ? 0 : 1; for (; PRIV(op)->sync_small_checked < PRIV(op)->sync_small_writes.size(); PRIV(op)->sync_small_checked++)
{
if (IS_IN_FLIGHT(dirty_db[PRIV(op)->sync_small_writes[PRIV(op)->sync_small_checked]].state))
{
// Wait for small inflight writes to complete
return 0;
}
}
if (journal.sector_info[journal.cur_sector].dirty) if (journal.sector_info[journal.cur_sector].dirty)
{ {
n_sqes++; // Write out the last journal sector if it happens to be dirty
} BS_SUBMIT_GET_ONLY_SQE(sqe);
if (n_sqes > 0) prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb);
{ PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector;
io_uring_sqe* sqes[n_sqes]; PRIV(op)->pending_ops = 1;
for (int i = 0; i < n_sqes; i++) PRIV(op)->sync_state = SYNC_JOURNAL_WRITE_SENT;
{ return 1;
BS_SUBMIT_GET_SQE_DECL(sqes[i]);
}
int s = 0;
if (journal.sector_info[journal.cur_sector].dirty)
{
prepare_journal_sector_write(journal, journal.cur_sector, sqes[s++], cb);
PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector;
}
else
{
PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0;
}
if (!disable_journal_fsync)
{
// FIXME: Wait for completion of writes before issuing an fsync
// Fsync and write requests posted at the same time can be reordered
ring_data_t *data = ((ring_data_t*)sqes[s]->user_data);
my_uring_prep_fsync(sqes[s++], journal.fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 };
data->callback = cb;
}
PRIV(op)->pending_ops = s;
PRIV(op)->sync_state = SYNC_JOURNAL_SYNC_SENT;
} }
else else
{ {
PRIV(op)->sync_state = SYNC_DONE; PRIV(op)->sync_state = SYNC_JOURNAL_WRITE_DONE;
} }
} }
else if (PRIV(op)->sync_state == SYNC_HAS_BIG) if (PRIV(op)->sync_state == SYNC_HAS_BIG)
{ {
for (; PRIV(op)->sync_big_checked < PRIV(op)->sync_big_writes.size(); PRIV(op)->sync_big_checked++)
{
if (IS_IN_FLIGHT(dirty_db[PRIV(op)->sync_big_writes[PRIV(op)->sync_big_checked]].state))
{
// Wait for big inflight writes to complete
return 0;
}
}
// 1st step: fsync data // 1st step: fsync data
if (!disable_data_fsync) if (!disable_data_fsync)
{ {
// FIXME Wait for completion of writes before issuing an fsync
BS_SUBMIT_GET_SQE(sqe, data); BS_SUBMIT_GET_SQE(sqe, data);
my_uring_prep_fsync(sqe, data_fd, IORING_FSYNC_DATASYNC); my_uring_prep_fsync(sqe, data_fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 }; data->iov = { 0 };
@ -91,6 +86,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0; PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0;
PRIV(op)->pending_ops = 1; PRIV(op)->pending_ops = 1;
PRIV(op)->sync_state = SYNC_DATA_SYNC_SENT; PRIV(op)->sync_state = SYNC_DATA_SYNC_SENT;
return 1;
} }
else else
{ {
@ -99,6 +95,14 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
} }
if (PRIV(op)->sync_state == SYNC_DATA_SYNC_DONE) if (PRIV(op)->sync_state == SYNC_DATA_SYNC_DONE)
{ {
for (; PRIV(op)->sync_small_checked < PRIV(op)->sync_small_writes.size(); PRIV(op)->sync_small_checked++)
{
if (IS_IN_FLIGHT(dirty_db[PRIV(op)->sync_small_writes[PRIV(op)->sync_small_checked]].state))
{
// Wait for small inflight writes to complete
return 0;
}
}
// 2nd step: Data device is synced, prepare & write journal entries // 2nd step: Data device is synced, prepare & write journal entries
// Check space in the journal and journal memory buffers // Check space in the journal and journal memory buffers
blockstore_journal_check_t space_check(this); blockstore_journal_check_t space_check(this);
@ -107,8 +111,8 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
return 0; return 0;
} }
// Get SQEs. Don't bother about merging, submit each journal sector as a separate request // Get SQEs. Don't bother about merging, submit each journal sector as a separate request
struct io_uring_sqe *sqe[space_check.sectors_required + (disable_journal_fsync ? 0 : 1)]; struct io_uring_sqe *sqe[space_check.sectors_required];
for (int i = 0; i < space_check.sectors_required + (disable_journal_fsync ? 0 : 1); i++) for (int i = 0; i < space_check.sectors_required; i++)
{ {
BS_SUBMIT_GET_SQE_DECL(sqe[i]); BS_SUBMIT_GET_SQE_DECL(sqe[i]);
} }
@ -150,22 +154,26 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
} }
} }
PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector; PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector;
// ... And a journal fsync PRIV(op)->pending_ops = s;
PRIV(op)->sync_state = SYNC_JOURNAL_WRITE_SENT;
return 1;
}
if (PRIV(op)->sync_state == SYNC_JOURNAL_WRITE_DONE)
{
if (!disable_journal_fsync) if (!disable_journal_fsync)
{ {
// FIXME Wait for completion of writes before issuing an fsync BS_SUBMIT_GET_SQE(sqe, data);
my_uring_prep_fsync(sqe[s], journal.fd, IORING_FSYNC_DATASYNC); my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
struct ring_data_t *data = ((ring_data_t*)sqe[s]->user_data);
data->iov = { 0 }; data->iov = { 0 };
data->callback = cb; data->callback = cb;
PRIV(op)->pending_ops = 1 + s; PRIV(op)->pending_ops = 1;
PRIV(op)->sync_state = SYNC_JOURNAL_SYNC_SENT;
return 1;
} }
else else
{ {
PRIV(op)->pending_ops = s; PRIV(op)->sync_state = SYNC_DONE;
} }
PRIV(op)->sync_state = SYNC_JOURNAL_SYNC_SENT;
ringloop->submit();
} }
return 1; return 1;
} }
@ -190,6 +198,10 @@ void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op
{ {
PRIV(op)->sync_state = SYNC_DATA_SYNC_DONE; PRIV(op)->sync_state = SYNC_DATA_SYNC_DONE;
} }
else if (PRIV(op)->sync_state == SYNC_JOURNAL_WRITE_SENT)
{
PRIV(op)->sync_state = SYNC_JOURNAL_WRITE_DONE;
}
else if (PRIV(op)->sync_state == SYNC_JOURNAL_SYNC_SENT) else if (PRIV(op)->sync_state == SYNC_JOURNAL_SYNC_SENT)
{ {
PRIV(op)->sync_state = SYNC_DONE; PRIV(op)->sync_state = SYNC_DONE;