Browse Source

Do not start small writes before finishing the last big write to the same object

trace-sqes
Vitaliy Filippov 2 years ago
parent
commit
eba053febe
  1. 21
      blockstore_impl.h
  2. 2
      blockstore_init.cpp
  3. 12
      blockstore_sync.cpp
  4. 60
      blockstore_write.cpp

21
blockstore_impl.h

@ -25,11 +25,12 @@
// States are not stored on disk. Instead, they're deduced from the journal
// FIXME: Rename to BS_ST_*
#define ST_J_IN_FLIGHT 1
#define ST_J_SUBMITTED 2
#define ST_J_WRITTEN 3
#define ST_J_SYNCED 4
#define ST_J_STABLE 5
#define ST_J_WAIT_BIG 1
#define ST_J_IN_FLIGHT 2
#define ST_J_SUBMITTED 3
#define ST_J_WRITTEN 4
#define ST_J_SYNCED 5
#define ST_J_STABLE 6
#define ST_D_IN_FLIGHT 15
#define ST_D_SUBMITTED 16
@ -49,13 +50,13 @@
#define IMMEDIATE_SMALL 1
#define IMMEDIATE_ALL 2
#define IS_IN_FLIGHT(st) (st == ST_J_IN_FLIGHT || st == ST_D_IN_FLIGHT || st == ST_DEL_IN_FLIGHT || st == ST_J_SUBMITTED || st == ST_D_SUBMITTED || st == ST_DEL_SUBMITTED)
#define IS_IN_FLIGHT(st) (st == ST_J_WAIT_BIG || st == ST_J_IN_FLIGHT || st == ST_D_IN_FLIGHT || st == ST_DEL_IN_FLIGHT || st == ST_J_SUBMITTED || st == ST_D_SUBMITTED || st == ST_DEL_SUBMITTED)
#define IS_STABLE(st) (st == ST_J_STABLE || st == ST_D_STABLE || st == ST_DEL_STABLE || st == ST_CURRENT)
#define IS_SYNCED(st) (IS_STABLE(st) || st == ST_J_SYNCED || st == ST_D_SYNCED || st == ST_DEL_SYNCED)
#define IS_JOURNAL(st) (st >= ST_J_SUBMITTED && st <= ST_J_STABLE)
#define IS_BIG_WRITE(st) (st >= ST_D_SUBMITTED && st <= ST_D_STABLE)
#define IS_DELETE(st) (st >= ST_DEL_SUBMITTED && st <= ST_DEL_STABLE)
#define IS_UNSYNCED(st) (st >= ST_J_SUBMITTED && st <= ST_J_WRITTEN || st >= ST_D_SUBMITTED && st <= ST_D_WRITTEN|| st >= ST_DEL_SUBMITTED && st <= ST_DEL_WRITTEN)
#define IS_JOURNAL(st) (st >= ST_J_WAIT_BIG && st <= ST_J_STABLE)
#define IS_BIG_WRITE(st) (st >= ST_D_IN_FLIGHT && st <= ST_D_STABLE)
#define IS_DELETE(st) (st >= ST_DEL_IN_FLIGHT && st <= ST_DEL_STABLE)
#define IS_UNSYNCED(st) (st >= ST_J_WAIT_BIG && st <= ST_J_WRITTEN || st >= ST_D_IN_FLIGHT && st <= ST_D_WRITTEN|| st >= ST_DEL_IN_FLIGHT && st <= ST_DEL_WRITTEN)
#define BS_SUBMIT_GET_SQE(sqe, data) \
BS_SUBMIT_GET_ONLY_SQE(sqe); \

2
blockstore_init.cpp

@ -521,7 +521,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
}
auto clean_it = bs->clean_db.find(je->small_write.oid);
if (clean_it == bs->clean_db.end() ||
clean_it->second.version < je->big_write.version)
clean_it->second.version < je->small_write.version)
{
obj_ver_id ov = {
.oid = je->small_write.oid,

12
blockstore_sync.cpp

@ -252,7 +252,17 @@ void blockstore_impl_t::ack_one_sync(blockstore_op_t *op)
#endif
auto & unstab = unstable_writes[it->oid];
unstab = unstab < it->version ? it->version : unstab;
dirty_db[*it].state = ST_D_SYNCED;
auto dirty_it = dirty_db.find(*it);
dirty_it->second.state = ST_D_SYNCED;
dirty_it++;
while (dirty_it != dirty_db.end() && dirty_it->first.oid == it->oid)
{
if (dirty_it->second.state == ST_J_WAIT_BIG)
{
dirty_it->second.state = ST_J_IN_FLIGHT;
}
dirty_it++;
}
}
for (auto it = PRIV(op)->sync_small_writes.begin(); it != PRIV(op)->sync_small_writes.end(); it++)
{

60
blockstore_write.cpp

@ -4,6 +4,7 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
{
// Check or assign version number
bool found = false, deleted = false, is_del = (op->opcode == BS_OP_DELETE);
bool is_inflight_big = false;
uint64_t version = 1;
if (dirty_db.size() > 0)
{
@ -17,6 +18,9 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
found = true;
version = dirty_it->first.version + 1;
deleted = IS_DELETE(dirty_it->second.state);
is_inflight_big = dirty_it->second.state >= ST_D_IN_FLIGHT &&
dirty_it->second.state < ST_D_SYNCED ||
dirty_it->second.state == ST_J_WAIT_BIG;
}
}
if (!found)
@ -47,6 +51,18 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
op->retval = 0;
return false;
}
if (is_inflight_big && !is_del && !deleted && op->len < block_size &&
immediate_commit != IMMEDIATE_ALL)
{
// Issue an additional sync so that the previous big write can reach the journal
blockstore_op_t *sync_op = new blockstore_op_t;
sync_op->opcode = BS_OP_SYNC;
sync_op->callback = [this, op](blockstore_op_t *sync_op)
{
delete sync_op;
};
enqueue_op(sync_op);
}
// Immediately add the operation into dirty_db, so subsequent reads could see it
#ifdef BLOCKSTORE_DEBUG
if (is_del)
@ -61,7 +77,7 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
.state = (uint32_t)(
is_del
? ST_DEL_IN_FLIGHT
: (op->len == block_size || deleted ? ST_D_IN_FLIGHT : ST_J_IN_FLIGHT)
: (op->len == block_size || deleted ? ST_D_IN_FLIGHT : (is_inflight_big ? ST_J_WAIT_BIG : ST_J_IN_FLIGHT))
),
.flags = 0,
.location = 0,
@ -83,7 +99,11 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
.oid = op->oid,
.version = op->version,
});
if (dirty_it->second.state == ST_D_IN_FLIGHT)
if (dirty_it->second.state == ST_J_WAIT_BIG)
{
return 0;
}
else if (dirty_it->second.state == ST_D_IN_FLIGHT)
{
blockstore_journal_check_t space_check(this);
if (!space_check.check_available(op, unsynced_big_writes.size() + 1, sizeof(journal_entry_big_write), JOURNAL_STABILIZE_RESERVATION))
@ -262,10 +282,10 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op)
{
io_uring_sqe *sqe = NULL;
journal_entry_big_write *je;
auto & dirty_entry = dirty_db[(obj_ver_id){
auto dirty_it = dirty_db.find((obj_ver_id){
.oid = op->oid,
.version = op->version,
}];
});
if (PRIV(op)->op_state == 2)
goto resume_2;
else if (PRIV(op)->op_state == 4)
@ -280,7 +300,7 @@ resume_2:
return 0;
}
je = (journal_entry_big_write*)prefill_single_journal_entry(journal, JE_BIG_WRITE, sizeof(journal_entry_big_write));
dirty_entry.journal_sector = journal.sector_info[journal.cur_sector].offset;
dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset;
journal.sector_info[journal.cur_sector].dirty = false;
journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
#ifdef BLOCKSTORE_DEBUG
@ -290,7 +310,7 @@ resume_2:
je->version = op->version;
je->offset = op->offset;
je->len = op->len;
je->location = dirty_entry.location;
je->location = dirty_it->second.location;
je->crc32 = je_crc32((journal_entry*)je);
journal.crc32_last = je->crc32;
prepare_journal_sector_write(journal, journal.cur_sector, sqe,
@ -302,9 +322,9 @@ resume_2:
resume_4:
// Switch object state
#ifdef BLOCKSTORE_DEBUG
printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_entry.state);
printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state);
#endif
bool imm = dirty_entry.state == ST_D_SUBMITTED
bool imm = dirty_it->second.state == ST_D_SUBMITTED
? (immediate_commit == IMMEDIATE_ALL)
: (immediate_commit != IMMEDIATE_NONE);
if (imm)
@ -312,17 +332,29 @@ resume_4:
auto & unstab = unstable_writes[op->oid];
unstab = unstab < op->version ? op->version : unstab;
}
if (dirty_entry.state == ST_J_SUBMITTED)
if (dirty_it->second.state == ST_J_SUBMITTED)
{
dirty_it->second.state = imm ? ST_J_SYNCED : ST_J_WRITTEN;
}
else if (dirty_it->second.state == ST_D_SUBMITTED)
{
dirty_entry.state = imm ? ST_J_SYNCED : ST_J_WRITTEN;
dirty_it->second.state = imm ? ST_D_SYNCED : ST_D_WRITTEN;
}
else if (dirty_entry.state == ST_D_SUBMITTED)
else if (dirty_it->second.state == ST_DEL_SUBMITTED)
{
dirty_entry.state = imm ? ST_D_SYNCED : ST_D_WRITTEN;
dirty_it->second.state = imm ? ST_DEL_SYNCED : ST_DEL_WRITTEN;
}
else if (dirty_entry.state == ST_DEL_SUBMITTED)
if (immediate_commit == IMMEDIATE_ALL)
{
dirty_entry.state = imm ? ST_DEL_SYNCED : ST_DEL_WRITTEN;
dirty_it++;
while (dirty_it != dirty_db.end() && dirty_it->first.oid == op->oid)
{
if (dirty_it->second.state == ST_J_WAIT_BIG)
{
dirty_it->second.state = ST_J_IN_FLIGHT;
}
dirty_it++;
}
}
// Acknowledge write
op->retval = op->len;

Loading…
Cancel
Save