Do not overwrite same journal sector multiple times

It doesn't reduce the actual write amplification (WA), but it does reduce tail latency (Q=32; 10%/50%/90%/99%/99.95% percentiles, new vs old):
- write: 766us/979us/1090us/1303us/1729us vs 1074us/1450us/2212us/3261us/4113us
- sync: 701us/881us/1188us/1762us/2540us vs 269us/955us/1663us/2638us/4146us
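In other words: at the 99.95th percentile, write latency drops from 4113us to 1729us (about 2.4x) and sync latency from 4146us to 2540us (about 1.6x); the one regression is at the low end of sync latency (269us -> 701us at the 10th percentile).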
blocking-uring-test
Vitaliy Filippov 2020-01-15 01:55:30 +03:00
parent 111516381f
commit a3d3949dce
7 changed files with 163 additions and 91 deletions


@@ -125,6 +125,12 @@ void blockstore_impl_t::loop()
if (PRIV(op)->wait_for)
{
check_wait(op);
#ifdef BLOCKSTORE_DEBUG
if (PRIV(op)->wait_for)
{
printf("still waiting for %d\n", PRIV(op)->wait_for);
}
#endif
if (PRIV(op)->wait_for == WAIT_SQE)
{
break;
@@ -270,7 +276,9 @@ void blockstore_impl_t::check_wait(blockstore_op_t *op)
}
else if (PRIV(op)->wait_for == WAIT_JOURNAL_BUFFER)
{
if (journal.sector_info[((journal.cur_sector + 1) % journal.sector_count)].usage_count > 0)
int next = ((journal.cur_sector + 1) % journal.sector_count);
if (journal.sector_info[next].usage_count > 0 ||
journal.sector_info[next].dirty)
{
// do not submit
return;
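
The extended condition above is the core of the change: a journal sector buffer may be reused only when no in-flight writes still reference it (usage_count == 0) and it holds no entries that were filled in memory but not yet submitted (dirty == false). A minimal sketch of the same gating, with assumed simplified types rather than the real blockstore structures:

#include <cstdint>

// Sketch with assumed, simplified types (not the real blockstore structures):
struct sector_state
{
    uint64_t usage_count; // in-flight writes referencing this sector buffer
    bool dirty;           // entries filled in memory, write not yet submitted
};

// A sector buffer in the journal ring may be overwritten only when it is
// neither referenced by an in-flight write nor holding unsubmitted entries.
static bool sector_busy(const sector_state & s)
{
    return s.usage_count > 0 || s.dirty;
}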


@@ -256,6 +256,8 @@ class blockstore_impl_t
void enqueue_write(blockstore_op_t *op);
int dequeue_write(blockstore_op_t *op);
int dequeue_del(blockstore_op_t *op);
void ack_write(blockstore_op_t *op);
void release_journal_sectors(blockstore_op_t *op);
void handle_write_event(ring_data_t *data, blockstore_op_t *op);
// Sync


@@ -22,6 +22,11 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int require
next_in_pos += fits * size;
sectors_required++;
}
else if (bs->journal.sector_info[next_sector].dirty)
{
// sectors_required is more like "sectors to write"
sectors_required++;
}
if (required <= 0)
{
break;
@@ -33,13 +38,19 @@
right_dir = false;
}
next_in_pos = 0;
if (bs->journal.sector_info[next_sector].usage_count > 0)
if (bs->journal.sector_info[next_sector].usage_count > 0 ||
bs->journal.sector_info[next_sector].dirty)
{
next_sector = ((next_sector + 1) % bs->journal.sector_count);
}
if (bs->journal.sector_info[next_sector].usage_count > 0)
if (bs->journal.sector_info[next_sector].usage_count > 0 ||
bs->journal.sector_info[next_sector].dirty)
{
// No memory buffer available. Wait for it.
#ifdef BLOCKSTORE_DEBUG
printf("next journal buffer %d is still dirty=%d used=%d\n", next_sector,
bs->journal.sector_info[next_sector].dirty, bs->journal.sector_info[next_sector].usage_count);
#endif
PRIV(op)->wait_for = WAIT_JOURNAL_BUFFER;
return 0;
}
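
As the comment earlier in this hunk notes, sectors_required is really a count of physical sector writes: a dirty sector that cannot fit the next entry contributes no capacity, yet still costs one write because it has to be flushed before the ring can advance past it. A rough sketch of that accounting, with assumed simplified logic rather than the real check:

// Rough sketch of the accounting (assumed, simplified; not the real check).
// Returns how many physical sector writes one step of the space check adds.
static int sector_write_cost(int entries_fitting, bool dirty)
{
    if (entries_fitting > 0)
        return 1; // sector receives new entries and must be written
    if (dirty)
        return 1; // no new entries fit, but a flush of old ones is still due
    return 0;     // clean sector with no entries for us: skipped at no cost
}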
@@ -68,6 +79,7 @@ journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type,
{
if (JOURNAL_BLOCK_SIZE - journal.in_sector_pos < size)
{
assert(!journal.sector_info[journal.cur_sector].dirty);
// Move to the next journal sector
if (journal.sector_info[journal.cur_sector].usage_count > 0)
{
@@ -91,22 +103,24 @@ journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type,
je->type = type;
je->size = size;
je->crc32_prev = journal.crc32_last;
journal.sector_info[journal.cur_sector].dirty = true;
return je;
}
void prepare_journal_sector_write(journal_t & journal, io_uring_sqe *sqe, std::function<void(ring_data_t*)> cb)
void prepare_journal_sector_write(journal_t & journal, int cur_sector, io_uring_sqe *sqe, std::function<void(ring_data_t*)> cb)
{
journal.sector_info[journal.cur_sector].usage_count++;
journal.sector_info[cur_sector].dirty = false;
journal.sector_info[cur_sector].usage_count++;
ring_data_t *data = ((ring_data_t*)sqe->user_data);
data->iov = (struct iovec){
(journal.inmemory
? journal.buffer + journal.sector_info[journal.cur_sector].offset
: journal.sector_buf + JOURNAL_BLOCK_SIZE*journal.cur_sector),
? journal.buffer + journal.sector_info[cur_sector].offset
: journal.sector_buf + JOURNAL_BLOCK_SIZE*cur_sector),
JOURNAL_BLOCK_SIZE
};
data->callback = cb;
my_uring_prep_writev(
sqe, journal.fd, &data->iov, 1, journal.offset + journal.sector_info[journal.cur_sector].offset
sqe, journal.fd, &data->iov, 1, journal.offset + journal.sector_info[cur_sector].offset
);
}
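
The division of labour after this change: prefill_single_journal_entry() appends an entry to the current in-memory sector and marks it dirty, while prepare_journal_sector_write() now takes an explicit sector index, clears dirty, bumps usage_count and queues the actual writev. Note the new assert above: because a dirty sector must be flushed before the ring advances, callers submit a full dirty sector first and only then prefill the next entry. A sketch of the expected call order (the SQE and callback are assumed to be obtained as usual):

// Sketch of the call order (sqe and cb are assumed, obtained as usual):
journal_entry_small_write *je = (journal_entry_small_write*)
    prefill_single_journal_entry(journal, JE_SMALL_WRITE, sizeof(journal_entry_small_write));
// ... fill *je, set je->crc32 = je_crc32((journal_entry*)je), etc ...
// The sector is now dirty; it is submitted explicitly, either because it
// filled up or when a SYNC forces it out:
prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb);
// prepare_journal_sector_write() clears dirty and increments usage_count.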


@@ -112,6 +112,7 @@ struct journal_sector_info_t
{
uint64_t offset;
uint64_t usage_count;
bool dirty;
};
struct journal_t
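
With the new flag, each sector buffer cycles through three states (a sketch of the lifecycle, not literal code from the commit):

// clean:     usage_count == 0 && !dirty -> the buffer may be overwritten
// dirty:     entries prefilled in memory, sector write not yet submitted
//            (dirty is set by prefill_single_journal_entry)
// in flight: usage_count > 0 && !dirty  -> writev submitted;
//            usage_count is dropped in release_journal_sectors()
//            when the op's last completion arrives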
@@ -154,4 +155,4 @@ struct blockstore_journal_check_t
journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size);
void prepare_journal_sector_write(journal_t & journal, io_uring_sqe *sqe, std::function<void(ring_data_t*)> cb);
void prepare_journal_sector_write(journal_t & journal, int sector, io_uring_sqe *sqe, std::function<void(ring_data_t*)> cb);


@@ -94,6 +94,14 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
// Prepare and submit journal entries
auto cb = [this, op](ring_data_t *data) { handle_stable_event(data, op); };
int s = 0, cur_sector = -1;
if ((JOURNAL_BLOCK_SIZE - journal.in_sector_pos) < sizeof(journal_entry_stable) &&
journal.sector_info[journal.cur_sector].dirty)
{
if (cur_sector == -1)
PRIV(op)->min_used_journal_sector = 1 + journal.cur_sector;
cur_sector = journal.cur_sector;
prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
}
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
{
auto unstab_it = unstable_writes.find(v->oid);
@@ -104,6 +112,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
}
journal_entry_stable *je = (journal_entry_stable*)
prefill_single_journal_entry(journal, JE_STABLE, sizeof(journal_entry_stable));
journal.sector_info[journal.cur_sector].dirty = false;
je->oid = v->oid;
je->version = v->version;
je->crc32 = je_crc32((journal_entry*)je);
@@ -113,7 +122,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
if (cur_sector == -1)
PRIV(op)->min_used_journal_sector = 1 + journal.cur_sector;
cur_sector = journal.cur_sector;
prepare_journal_sector_write(journal, sqe[s++], cb);
prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
}
}
PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector;
@@ -135,18 +144,7 @@ void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t *
if (PRIV(op)->pending_ops == 0)
{
// Release used journal sectors
if (PRIV(op)->min_used_journal_sector > 0)
{
uint64_t s = PRIV(op)->min_used_journal_sector;
while (1)
{
journal.sector_info[s-1].usage_count--;
if (s == PRIV(op)->max_used_journal_sector)
break;
s = 1 + s % journal.sector_count;
}
PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0;
}
release_journal_sectors(op);
// First step: mark dirty_db entries as stable, acknowledge op completion
obj_ver_id* v;
int i;


@@ -39,14 +39,36 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
if (PRIV(op)->sync_state == SYNC_HAS_SMALL)
{
// No big writes, just fsync the journal
if (!disable_fsync)
int n_sqes = disable_fsync ? 0 : 1;
if (journal.sector_info[journal.cur_sector].dirty)
{
BS_SUBMIT_GET_SQE(sqe, data);
my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 };
data->callback = cb;
PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0;
PRIV(op)->pending_ops = 1;
n_sqes++;
}
if (n_sqes > 0)
{
io_uring_sqe* sqes[n_sqes];
for (int i = 0; i < n_sqes; i++)
{
BS_SUBMIT_GET_SQE_DECL(sqes[i]);
}
int s = 0;
if (journal.sector_info[journal.cur_sector].dirty)
{
prepare_journal_sector_write(journal, journal.cur_sector, sqes[s++], cb);
PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector;
}
else
{
PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0;
}
if (!disable_fsync)
{
ring_data_t *data = ((ring_data_t*)sqes[s]->user_data);
my_uring_prep_fsync(sqes[s++], journal.fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 };
data->callback = cb;
}
PRIV(op)->pending_ops = s;
PRIV(op)->sync_state = SYNC_JOURNAL_SYNC_SENT;
}
else
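
One detail worth noting in the hunk above: io_uring_sqe* sqes[n_sqes] is a variable-length array, which is a GCC/Clang extension in C++ rather than standard. Since n_sqes is at most 2 on this path (one journal sector write plus one fsync), a fixed-size array would be an equivalent standard-conforming alternative, e.g.:

// Equivalent without the VLA extension (n_sqes <= 2 on this path):
io_uring_sqe* sqes[2] = {};
for (int i = 0; i < n_sqes; i++)
{
    BS_SUBMIT_GET_SQE_DECL(sqes[i]);
}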
@@ -90,11 +112,20 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
// Prepare and submit journal entries
auto it = PRIV(op)->sync_big_writes.begin();
int s = 0, cur_sector = -1;
if ((JOURNAL_BLOCK_SIZE - journal.in_sector_pos) < sizeof(journal_entry_big_write) &&
journal.sector_info[journal.cur_sector].dirty)
{
if (cur_sector == -1)
PRIV(op)->min_used_journal_sector = 1 + journal.cur_sector;
cur_sector = journal.cur_sector;
prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
}
while (it != PRIV(op)->sync_big_writes.end())
{
journal_entry_big_write *je = (journal_entry_big_write*)
prefill_single_journal_entry(journal, JE_BIG_WRITE, sizeof(journal_entry_big_write));
dirty_db[*it].journal_sector = journal.sector_info[journal.cur_sector].offset;
journal.sector_info[journal.cur_sector].dirty = false;
journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
#ifdef BLOCKSTORE_DEBUG
printf("journal offset %lu is used by %lu:%lu v%lu\n", dirty_db[*it].journal_sector, it->oid.inode, it->oid.stripe, it->version);
@@ -112,7 +143,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
if (cur_sector == -1)
PRIV(op)->min_used_journal_sector = 1 + journal.cur_sector;
cur_sector = journal.cur_sector;
prepare_journal_sector_write(journal, sqe[s++], cb);
prepare_journal_sector_write(journal, cur_sector, sqe[s++], cb);
}
}
PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector;
@@ -147,18 +178,7 @@ void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op
if (PRIV(op)->pending_ops == 0)
{
// Release used journal sectors
if (PRIV(op)->min_used_journal_sector > 0)
{
uint64_t s = PRIV(op)->min_used_journal_sector;
while (1)
{
journal.sector_info[s-1].usage_count--;
if (s == PRIV(op)->max_used_journal_sector)
break;
s = 1 + s % journal.sector_count;
}
PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0;
}
release_journal_sectors(op);
// Handle states
if (PRIV(op)->sync_state == SYNC_DATA_SYNC_SENT)
{


@@ -137,7 +137,6 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
{
// Small (journaled) write
// First check if the journal has sufficient space
// FIXME Always two SQEs for now. Although it's possible to send 1 sometimes
blockstore_journal_check_t space_check(this);
if (unsynced_big_writes.size() && !space_check.check_available(op, unsynced_big_writes.size(), sizeof(journal_entry_big_write), 0)
|| !space_check.check_available(op, 1, sizeof(journal_entry_small_write), op->len + JOURNAL_STABILIZE_RESERVATION))
@@ -145,18 +144,34 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
return 0;
}
// There is sufficient space. Get SQE(s)
BS_SUBMIT_GET_ONLY_SQE(sqe1);
struct io_uring_sqe *sqe1 = NULL;
if ((JOURNAL_BLOCK_SIZE - journal.in_sector_pos) < sizeof(journal_entry_small_write) &&
journal.sector_info[journal.cur_sector].dirty)
{
// Write current journal sector only if it's dirty and full
BS_SUBMIT_GET_SQE_DECL(sqe1);
}
struct io_uring_sqe *sqe2 = NULL;
struct ring_data_t *data2 = NULL;
if (op->len > 0)
{
BS_SUBMIT_GET_SQE_DECL(sqe2);
data2 = ((ring_data_t*)sqe2->user_data);
}
// FIXME: Write journal sector here only if it is full. Otherwise, defer it until SYNC. This will help reduce WA
// Got SQEs. Prepare journal sector write
// Got SQEs. Prepare previous journal sector write if required
auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
if (sqe1)
{
prepare_journal_sector_write(journal, journal.cur_sector, sqe1, cb);
// FIXME rename to min/max _flushing
PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops++;
}
else
{
PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0;
}
// Then pre-fill journal entry
journal_entry_small_write *je = (journal_entry_small_write*)
prefill_single_journal_entry(journal, JE_SMALL_WRITE, sizeof(struct journal_entry_small_write));
prefill_single_journal_entry(journal, JE_SMALL_WRITE, sizeof(journal_entry_small_write));
dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset;
journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
#ifdef BLOCKSTORE_DEBUG
@@ -172,9 +187,6 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
je->crc32_data = crc32c(0, op->buf, op->len);
je->crc32 = je_crc32((journal_entry*)je);
journal.crc32_last = je->crc32;
auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
prepare_journal_sector_write(journal, sqe1, cb);
PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector;
if (op->len > 0)
{
// Prepare journal data write
@@ -183,28 +195,34 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
// Copy data
memcpy(journal.buffer + journal.next_free, op->buf, op->len);
}
ring_data_t *data2 = ((ring_data_t*)sqe2->user_data);
data2->iov = (struct iovec){ op->buf, op->len };
data2->callback = cb;
my_uring_prep_writev(
sqe2, journal.fd, &data2->iov, 1, journal.offset + journal.next_free
);
PRIV(op)->pending_ops = 2;
PRIV(op)->pending_ops++;
}
else
{
// Zero-length overwrite. Allowed to bump object version in EC placement groups without actually writing data
PRIV(op)->pending_ops = 1;
}
dirty_it->second.location = journal.next_free;
dirty_it->second.state = ST_J_SUBMITTED;
journal.next_free += op->len;
if (journal.next_free >= journal.len)
{
journal.next_free = JOURNAL_BLOCK_SIZE;
}
// Remember small write as unsynced
unsynced_small_writes.push_back((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
if (!PRIV(op)->pending_ops)
{
ack_write(op);
}
}
return 1;
}
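
The point of pulling ack_write() out (see below) shows here: with deferred sector writes a small write may submit nothing at all, e.g. a zero-length overwrite while the current sector is dirty but not yet full. In that case pending_ops stays 0, no completion event will ever arrive, and the op has to be acknowledged synchronously, as in the hunk above:

// From the hunk above: complete the op inline when nothing was submitted.
if (!PRIV(op)->pending_ops)
{
    ack_write(op); // no io_uring completion is coming for this op
}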
@@ -223,45 +241,56 @@ void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *o
PRIV(op)->pending_ops--;
if (PRIV(op)->pending_ops == 0)
{
// Release used journal sectors
if (PRIV(op)->min_used_journal_sector > 0)
{
uint64_t s = PRIV(op)->min_used_journal_sector;
while (1)
{
journal.sector_info[s-1].usage_count--;
if (s == PRIV(op)->max_used_journal_sector)
break;
s = 1 + s % journal.sector_count;
}
PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0;
}
// Switch object state
auto & dirty_entry = dirty_db[(obj_ver_id){
.oid = op->oid,
.version = op->version,
}];
#ifdef BLOCKSTORE_DEBUG
printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_entry.state);
#endif
if (dirty_entry.state == ST_J_SUBMITTED)
{
dirty_entry.state = ST_J_WRITTEN;
}
else if (dirty_entry.state == ST_D_SUBMITTED)
{
dirty_entry.state = ST_D_WRITTEN;
}
else if (dirty_entry.state == ST_DEL_SUBMITTED)
{
dirty_entry.state = ST_DEL_WRITTEN;
}
// Acknowledge write without sync
op->retval = op->len;
FINISH_OP(op);
release_journal_sectors(op);
ack_write(op);
}
}
void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op)
{
// Release used journal sectors
if (PRIV(op)->min_used_journal_sector > 0 &&
PRIV(op)->max_used_journal_sector > 0)
{
uint64_t s = PRIV(op)->min_used_journal_sector;
while (1)
{
journal.sector_info[s-1].usage_count--;
if (s == PRIV(op)->max_used_journal_sector)
break;
s = 1 + s % journal.sector_count;
}
PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0;
}
}
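
min_used_journal_sector and max_used_journal_sector are stored 1-based, with 0 meaning "none", and s = 1 + s % journal.sector_count walks the ring in that 1-based form. For example, with sector_count = 8, min = 7 and max = 2, the loop releases sectors 6, 7, 0 and 1 (0-based), wrapping via 8 -> 1. A standalone illustration with those hypothetical values:

#include <cstdio>

int main()
{
    const unsigned long sector_count = 8;
    unsigned long s = 7, max = 2; // 1-based range wrapping across the ring end
    while (1)
    {
        printf("release sector %lu\n", s - 1); // prints 6, 7, 0, 1
        if (s == max)
            break;
        s = 1 + s % sector_count;
    }
    return 0;
}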
void blockstore_impl_t::ack_write(blockstore_op_t *op)
{
// Switch object state
auto & dirty_entry = dirty_db[(obj_ver_id){
.oid = op->oid,
.version = op->version,
}];
#ifdef BLOCKSTORE_DEBUG
printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_entry.state);
#endif
if (dirty_entry.state == ST_J_SUBMITTED)
{
dirty_entry.state = ST_J_WRITTEN;
}
else if (dirty_entry.state == ST_D_SUBMITTED)
{
dirty_entry.state = ST_D_WRITTEN;
}
else if (dirty_entry.state == ST_DEL_SUBMITTED)
{
dirty_entry.state = ST_DEL_WRITTEN;
}
// Acknowledge write without sync
op->retval = op->len;
FINISH_OP(op);
}
int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
{
auto dirty_it = dirty_db.find((obj_ver_id){
@@ -287,7 +316,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
je->crc32 = je_crc32((journal_entry*)je);
journal.crc32_last = je->crc32;
auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
prepare_journal_sector_write(journal, sqe, cb);
prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb);
PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = 1;
dirty_it->second.state = ST_DEL_SUBMITTED;