Fsync data before writing metadata

blocking-uring-test
Vitaliy Filippov 2019-12-02 23:16:19 +03:00
parent a165909745
commit aa3b252327
3 changed files with 49 additions and 43 deletions
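Editorial note: the commit enforces a simple write-ordering rule in the flusher — an object's data must be durable on the data device before the metadata entry that points at it is written, otherwise a crash between the two steps could leave metadata referencing blocks that never reached the disk. A minimal sketch of that ordering using plain POSIX calls rather than the io_uring machinery the flusher actually uses (all names, descriptors and offsets below are invented for illustration):

#include <unistd.h>

// Hypothetical flush step: sync the data block first, write and sync
// the metadata that references it only afterwards.
int flush_object(int data_fd, int meta_fd,
                 const void *data, size_t data_len, off_t data_off,
                 const void *meta, size_t meta_len, off_t meta_off)
{
    if (pwrite(data_fd, data, data_len, data_off) != (ssize_t)data_len)
        return -1;
    if (fdatasync(data_fd) != 0)    // 1) make the data durable first
        return -1;
    if (pwrite(meta_fd, meta, meta_len, meta_off) != (ssize_t)meta_len)
        return -1;
    if (fdatasync(meta_fd) != 0)    // 2) only then persist the metadata
        return -1;
    return 0;
}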

View File

@@ -272,6 +272,7 @@ class blockstore
uint64_t data_offset, data_size, data_len;
bool readonly = false;
// FIXME: separate flags for data, metadata and journal
bool disable_fsync = false;
bool inmemory_meta = false;
void *metadata_buffer = NULL;

View File

@@ -5,7 +5,6 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore *bs)
this->bs = bs;
this->flusher_count = flusher_count;
active_flushers = 0;
active_until_sync = 0;
sync_threshold = flusher_count == 1 ? 1 : flusher_count/2;
journal_trim_interval = sync_threshold;
journal_trim_counter = 0;
@@ -133,8 +132,6 @@ bool journal_flusher_co::loop()
goto resume_9;
else if (wait_state == 10)
goto resume_10;
else if (wait_state == 11)
goto resume_11;
else if (wait_state == 12)
goto resume_12;
else if (wait_state == 13)
@@ -143,6 +140,12 @@ bool journal_flusher_co::loop()
goto resume_14;
else if (wait_state == 15)
goto resume_15;
else if (wait_state == 16)
goto resume_16;
else if (wait_state == 17)
goto resume_17;
else if (wait_state == 18)
goto resume_18;
resume_0:
if (!flusher->flush_queue.size())
{
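Editorial note: the goto chain above is the dispatcher of a hand-rolled stackless coroutine — whenever loop() has to wait for I/O it records a resume point in wait_state and returns false, and the next invocation jumps straight back to that point; the commit extends the table with states 16-18 for the new data fsync. A simplified, self-contained sketch of the pattern (not the actual flusher code; names and the fake "pending" counter are invented):

#include <cstdio>

struct resumable_op
{
    int wait_state = 0;
    int pending = 0;                // pretend number of unfinished I/O events

    // Returns false when the operation must wait and wants to be re-entered.
    bool loop()
    {
        if (wait_state == 1)
            goto resume_1;
        else if (wait_state == 2)
            goto resume_2;
        pending = 2;                // "submit" two asynchronous requests
    resume_1:
        if (pending > 1)
        {
            pending--;              // one completion arrived
            wait_state = 1;         // remember where to continue next time
            return false;
        }
    resume_2:
        if (pending > 0)
        {
            pending--;
            wait_state = 2;
            return false;
        }
        wait_state = 0;             // finished, start from the top next time
        return true;
    }
};

int main()
{
    resumable_op op;
    while (!op.loop())
        printf("still waiting\n");
    printf("done\n");
    return 0;
}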
@@ -177,7 +180,6 @@ resume_0:
#endif
dirty_it = dirty_end;
flusher->active_flushers++;
flusher->active_until_sync++;
v.clear();
wait_count = 0;
copy_count = 0;
@@ -261,7 +263,6 @@ resume_0:
{
// Nothing to flush
flusher->active_flushers--;
flusher->active_until_sync--;
repeat_it = flusher->sync_to_repeat.find(cur.oid);
if (repeat_it->second > cur.version)
{
@@ -339,11 +340,17 @@ resume_0:
);
wait_count++;
}
// Sync data before writing metadata
resume_16:
resume_17:
resume_18:
if (copy_count && !fsync_batch(false, 16))
{
wait_state += 16;
return false;
}
resume_5:
// And metadata writes, but only after data writes complete
// FIXME: Oops. We must sync data before writing metadata.
// Because this will result in more fsyncs we should reduce flushers activity by postponing
// flushes until there is sufficient amount of work to do (for example, at least ~32 operations in the queue).
if (!bs->inmemory_meta && meta_new.it->second.state == 0 || wait_count > 0)
{
// metadata sector is still being read or data is still being written, wait for it
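Editorial note: the inserted fsync_batch(false, 16) call is the core of the fix — the data device is synced (in a shared batch) before resume_5 submits any metadata writes. The second argument is a wait_base: fsync_batch() records small local resume states, and the caller shifts them into its own numbering range by adding 16 here (or 8 for the later metadata sync), so one helper can be suspended and resumed from several call sites. A toy model of that convention (illustrative only, not the real helper):

// Toy model of the wait_base convention: the helper reports local states,
// the caller maps them into its own wait_state range.
struct toy_flusher
{
    int wait_state = 0;
    int pending = 0;

    // Helper with one suspension point; local states are 0 (enter) and 1 (waiting).
    bool helper(int wait_base)
    {
        if (wait_state == wait_base + 1)
            goto resume_1;
        pending = 1;                // "submit" an fsync
    resume_1:
        if (pending > 0)
        {
            pending--;
            wait_state = 1;         // local state; the caller adds wait_base
            return false;
        }
        wait_state = 0;
        return true;
    }

    // Mirrors: if (copy_count && !fsync_batch(false, 16)) { wait_state += 16; return false; }
    bool step()
    {
        if (!helper(16))
        {
            wait_state += 16;       // becomes 17 == 16 + local state 1
            return false;
        }
        return true;
    }
};

int main()
{
    toy_flusher f;
    while (!f.step())
        ;                           // a real flusher would return to the event loop here
    return 0;
}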
@@ -409,14 +416,13 @@ resume_0:
free(it->buf);
}
v.clear();
// And sync everything (in batches - not per each operation!)
flusher->active_until_sync--;
// And sync metadata (in batches - not per each operation!)
resume_8:
resume_9:
resume_10:
resume_11:
if (!fsync_batch())
if (!fsync_batch(true, 8))
{
wait_state += 8;
return false;
}
// Update clean_db and dirty_db, free old data locations
@@ -568,56 +574,54 @@ void journal_flusher_co::update_clean_db()
bs->dirty_db.erase(dirty_it, std::next(dirty_end));
}
bool journal_flusher_co::fsync_batch()
bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base)
{
if (wait_state == 8)
goto resume_8;
else if (wait_state == 9)
goto resume_9;
else if (wait_state == 10)
goto resume_10;
else if (wait_state == 11)
goto resume_11;
if (wait_state == wait_base)
goto resume_0;
else if (wait_state == wait_base+1)
goto resume_1;
else if (wait_state == wait_base+2)
goto resume_2;
if (!bs->disable_fsync)
{
cur_sync = flusher->syncs.end();
if (cur_sync == flusher->syncs.begin() || cur_sync->state == 1)
cur_sync = flusher->syncs.emplace(flusher->syncs.end(), (flusher_sync_t){ .ready_count = 0, .state = 0 });
else
while (cur_sync != flusher->syncs.begin())
{
cur_sync--;
if (cur_sync->fsync_meta == fsync_meta && cur_sync->state == 0)
goto sync_found;
}
cur_sync = flusher->syncs.emplace(flusher->syncs.end(), (flusher_sync_t){
.fsync_meta = fsync_meta,
.ready_count = 0,
.state = 0,
});
sync_found:
cur_sync->ready_count++;
if (cur_sync->ready_count >= flusher->sync_threshold ||
!flusher->active_until_sync && (!flusher->flush_queue.size() || flusher->active_flushers >= flusher->flusher_count))
if (cur_sync->ready_count >= flusher->sync_threshold || !flusher->flush_queue.size())
{
// Sync batch is ready. Do it.
await_sqe(9);
await_sqe(0);
data->callback = simple_callback_w;
data->iov = { 0 };
my_uring_prep_fsync(sqe, bs->data_fd, IORING_FSYNC_DATASYNC);
my_uring_prep_fsync(sqe, fsync_meta ? bs->meta_fd : bs->data_fd, IORING_FSYNC_DATASYNC);
cur_sync->state = 1;
wait_count++;
if (bs->meta_fd != bs->data_fd)
{
await_sqe(10);
data->callback = simple_callback_w;
data->iov = { 0 };
my_uring_prep_fsync(sqe, bs->meta_fd, IORING_FSYNC_DATASYNC);
wait_count++;
}
resume_11:
resume_1:
if (wait_count > 0)
{
wait_state = 11;
wait_state = 1;
return false;
}
// Sync completed. All previous coroutines waiting for it must be resumed
cur_sync->state = 1;
cur_sync->state = 2;
bs->ringloop->wakeup(bs->ring_consumer);
}
// Wait until someone else sends and completes a sync.
resume_8:
resume_2:
if (!cur_sync->state)
{
wait_state = 8;
wait_state = 2;
return false;
}
cur_sync->ready_count--;
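Editorial note: fsync_batch() batches syncs across flusher coroutines — each participant joins (or opens) a flusher_sync_t entry matching the requested fsync_meta flag and bumps ready_count; whichever participant reaches sync_threshold, or finds no more queued work, issues the single fsync for the whole batch and sets state to 2, waking the others. A condensed sketch of the join-or-create step (the flusher_sync_t fields are taken from this diff; everything around them is invented for illustration):

#include <list>

// Same fields as the flusher_sync_t defined in this commit.
struct flusher_sync_t
{
    bool fsync_meta;                // separate batches for the data and metadata devices
    int ready_count;                // how many flushers have joined this batch
    int state;                      // 0 = collecting, 1 = fsync in flight, 2 = completed
};

std::list<flusher_sync_t> syncs;

// Join the newest still-open batch for the requested device, or open a new one.
flusher_sync_t *join_sync_batch(bool fsync_meta)
{
    for (auto it = syncs.rbegin(); it != syncs.rend(); it++)
    {
        if (it->fsync_meta == fsync_meta && it->state == 0)
        {
            it->ready_count++;
            return &*it;
        }
    }
    syncs.push_back({ fsync_meta, 1, 0 });
    return &syncs.back();
}

int main()
{
    flusher_sync_t *a = join_sync_batch(false);
    flusher_sync_t *b = join_sync_batch(false);   // joins the same batch as 'a'
    flusher_sync_t *c = join_sync_batch(true);    // metadata syncs get their own batch
    return (a == b && a != c) ? 0 : 1;
}

The participant that submits the fsync marks the entry in-flight, and everyone else simply waits until state reaches 2 before decrementing ready_count, as in the diff above.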

View File

@@ -14,6 +14,7 @@ struct meta_sector_t
struct flusher_sync_t
{
bool fsync_meta;
int ready_count;
int state;
};
@@ -50,7 +51,7 @@ class journal_flusher_co
friend class journal_flusher_t;
bool modify_meta_read(uint64_t meta_loc, flusher_meta_write_t &wr, int wait_base);
void update_clean_db();
bool fsync_batch();
bool fsync_batch(bool fsync_meta, int wait_base);
public:
journal_flusher_co();
bool loop();
@@ -68,7 +69,7 @@ class journal_flusher_t
int journal_trim_counter, journal_trim_interval;
void* journal_superblock;
int active_flushers, active_until_sync;
int active_flushers;
std::list<flusher_sync_t> syncs;
std::map<object_id, uint64_t> sync_to_repeat;