Browse Source

Implement immediate commit mode

trace-sqes
Vitaliy Filippov 2 years ago
parent
commit
3f522c66e6
  1. 2
      blockstore_impl.cpp
  2. 17
      blockstore_impl.h
  3. 4
      blockstore_init.cpp
  4. 26
      blockstore_open.cpp
  5. 2
      blockstore_stable.cpp
  6. 2
      blockstore_sync.cpp
  7. 230
      blockstore_write.cpp
  8. 2
      test.cpp

2
blockstore_impl.cpp

@ -355,7 +355,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
op->callback(op);
return;
}
if (0 && op->opcode == BS_OP_SYNC && immediate_commit)
if (op->opcode == BS_OP_SYNC && immediate_commit == IMMEDIATE_ALL)
{
op->retval = 0;
op->callback(op);

17
blockstore_impl.h

@ -34,8 +34,7 @@
#define ST_D_IN_FLIGHT 15
#define ST_D_SUBMITTED 16
#define ST_D_WRITTEN 17
#define ST_D_META_WRITTEN 19
#define ST_D_META_SYNCED 20
#define ST_D_SYNCED 20
#define ST_D_STABLE 21
#define ST_DEL_IN_FLIGHT 31
@ -46,13 +45,17 @@
#define ST_CURRENT 48
#define IMMEDIATE_NONE 0
#define IMMEDIATE_SMALL 1
#define IMMEDIATE_ALL 2
#define IS_IN_FLIGHT(st) (st == ST_J_IN_FLIGHT || st == ST_D_IN_FLIGHT || st == ST_DEL_IN_FLIGHT || st == ST_J_SUBMITTED || st == ST_D_SUBMITTED || st == ST_DEL_SUBMITTED)
#define IS_STABLE(st) (st == ST_J_STABLE || st == ST_D_STABLE || st == ST_DEL_STABLE || st == ST_CURRENT)
#define IS_SYNCED(st) (IS_STABLE(st) || st == ST_J_SYNCED || st == ST_D_META_SYNCED || st == ST_DEL_SYNCED)
#define IS_SYNCED(st) (IS_STABLE(st) || st == ST_J_SYNCED || st == ST_D_SYNCED || st == ST_DEL_SYNCED)
#define IS_JOURNAL(st) (st >= ST_J_SUBMITTED && st <= ST_J_STABLE)
#define IS_BIG_WRITE(st) (st >= ST_D_SUBMITTED && st <= ST_D_STABLE)
#define IS_DELETE(st) (st >= ST_DEL_SUBMITTED && st <= ST_DEL_STABLE)
#define IS_UNSYNCED(st) (st >= ST_J_SUBMITTED && st <= ST_J_WRITTEN || st >= ST_D_SUBMITTED && st <= ST_D_META_WRITTEN || st >= ST_DEL_SUBMITTED && st <= ST_DEL_WRITTEN)
#define IS_UNSYNCED(st) (st >= ST_J_SUBMITTED && st <= ST_J_WRITTEN || st >= ST_D_SUBMITTED && st <= ST_D_WRITTEN|| st >= ST_DEL_SUBMITTED && st <= ST_DEL_WRITTEN)
#define BS_SUBMIT_GET_SQE(sqe, data) \
BS_SUBMIT_GET_ONLY_SQE(sqe); \
@ -195,8 +198,8 @@ class blockstore_impl_t
// It is safe to disable fsync() if drive write cache is writethrough
bool disable_data_fsync = false, disable_meta_fsync = false, disable_journal_fsync = false;
// Enable if you want every operation to be executed with an "implicit fsync"
// FIXME Not implemented yet
bool immediate_commit = false;
// Suitable only for server SSDs with capacitors, requires disabled data and journal fsyncs
int immediate_commit = IMMEDIATE_NONE;
bool inmemory_meta = false;
int flusher_count;
/******* END OF OPTIONS *******/
@ -268,7 +271,7 @@ class blockstore_impl_t
bool enqueue_write(blockstore_op_t *op);
int dequeue_write(blockstore_op_t *op);
int dequeue_del(blockstore_op_t *op);
void ack_write(blockstore_op_t *op);
int continue_write(blockstore_op_t *op);
void release_journal_sectors(blockstore_op_t *op);
void handle_write_event(ring_data_t *data, blockstore_op_t *op);

4
blockstore_init.cpp

@ -558,7 +558,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
.version = je->big_write.version,
};
bs->dirty_db.emplace(ov, (dirty_entry){
.state = ST_D_META_SYNCED,
.state = ST_D_SYNCED,
.flags = 0,
.location = je->big_write.location,
.offset = je->big_write.offset,
@ -595,7 +595,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
{
while (1)
{
it->second.state = (it->second.state == ST_D_META_SYNCED
it->second.state = (it->second.state == ST_D_SYNCED
? ST_D_STABLE
: (it->second.state == ST_DEL_SYNCED ? ST_DEL_STABLE : ST_J_STABLE));
if (it == bs->dirty_db.begin())

26
blockstore_open.cpp

@ -34,6 +34,14 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config)
{
disable_journal_fsync = true;
}
if (config["immediate_commit"] == "all")
{
immediate_commit = IMMEDIATE_ALL;
}
else if (config["immediate_commit"] == "small")
{
immediate_commit = IMMEDIATE_SMALL;
}
metadata_buf_size = strtoull(config["meta_buf_size"].c_str(), NULL, 10);
cfg_journal_size = strtoull(config["journal_size"].c_str(), NULL, 10);
data_device = config["data_device"];
@ -129,6 +137,22 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config)
{
metadata_buf_size = 4*1024*1024;
}
if (meta_device == "")
{
disable_meta_fsync = disable_data_fsync;
}
if (journal_device == "")
{
disable_journal_fsync = disable_meta_fsync;
}
if (immediate_commit != IMMEDIATE_NONE && !disable_journal_fsync)
{
throw std::runtime_error("immediate_commit requires disable_journal_fsync");
}
if (immediate_commit == IMMEDIATE_ALL && !disable_data_fsync)
{
throw std::runtime_error("immediate_commit=all requires disable_journal_fsync and disable_data_fsync");
}
// init some fields
clean_entry_bitmap_size = block_size / bitmap_granularity / 8;
clean_entry_size = sizeof(clean_disk_entry) + clean_entry_bitmap_size;
@ -283,7 +307,6 @@ void blockstore_impl_t::open_meta()
else
{
meta_fd = data_fd;
disable_meta_fsync = disable_data_fsync;
meta_size = 0;
if (meta_offset >= data_size)
{
@ -306,7 +329,6 @@ void blockstore_impl_t::open_journal()
else
{
journal.fd = meta_fd;
disable_journal_fsync = disable_meta_fsync;
journal.device_size = 0;
if (journal.offset >= data_size)
{

2
blockstore_stable.cpp

@ -181,7 +181,7 @@ resume_5:
{
dirty_it->second.state = ST_J_STABLE;
}
else if (dirty_it->second.state == ST_D_META_SYNCED)
else if (dirty_it->second.state == ST_D_SYNCED)
{
dirty_it->second.state = ST_D_STABLE;
}

2
blockstore_sync.cpp

@ -252,7 +252,7 @@ void blockstore_impl_t::ack_one_sync(blockstore_op_t *op)
#endif
auto & unstab = unstable_writes[it->oid];
unstab = unstab < it->version ? it->version : unstab;
dirty_db[*it].state = ST_D_META_SYNCED;
dirty_db[*it].state = ST_D_SYNCED;
}
for (auto it = PRIV(op)->sync_small_writes.begin(); it != PRIV(op)->sync_small_writes.end(); it++)
{

230
blockstore_write.cpp

@ -75,6 +75,10 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
// First step of the write algorithm: dequeue operation and submit initial write(s)
int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
{
if (PRIV(op)->op_state)
{
return continue_write(op);
}
auto dirty_it = dirty_db.find((obj_ver_id){
.oid = op->oid,
.version = op->version,
@ -129,11 +133,19 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
);
PRIV(op)->pending_ops = 1;
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
// Remember big write as unsynced
unsynced_big_writes.push_back((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
if (immediate_commit != IMMEDIATE_ALL)
{
// Remember big write as unsynced
unsynced_big_writes.push_back((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
PRIV(op)->op_state = 3;
}
else
{
PRIV(op)->op_state = 1;
}
}
else
{
@ -147,10 +159,11 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
}
// There is sufficient space. Get SQE(s)
struct io_uring_sqe *sqe1 = NULL;
if ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_small_write) &&
if (immediate_commit != IMMEDIATE_NONE ||
(journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_small_write) &&
journal.sector_info[journal.cur_sector].dirty)
{
// Write current journal sector only if it's dirty and full
// Write current journal sector only if it's dirty and full, or in the immediate_commit mode
BS_SUBMIT_GET_SQE_DECL(sqe1);
}
struct io_uring_sqe *sqe2 = NULL;
@ -160,15 +173,18 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
}
// Got SQEs. Prepare previous journal sector write if required
auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
if (sqe1)
{
prepare_journal_sector_write(journal, journal.cur_sector, sqe1, cb);
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops++;
}
else
if (immediate_commit == IMMEDIATE_NONE)
{
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
if (sqe1)
{
prepare_journal_sector_write(journal, journal.cur_sector, sqe1, cb);
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops++;
}
else
{
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
}
}
// Then pre-fill journal entry
journal_entry_small_write *je = (journal_entry_small_write*)
@ -188,6 +204,12 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
je->crc32_data = crc32c(0, op->buf, op->len);
je->crc32 = je_crc32((journal_entry*)je);
journal.crc32_last = je->crc32;
if (immediate_commit != IMMEDIATE_NONE)
{
prepare_journal_sector_write(journal, journal.cur_sector, sqe1, cb);
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops++;
}
if (op->len > 0)
{
// Prepare journal data write
@ -215,16 +237,96 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
{
journal.next_free = journal_block_size;
}
// Remember small write as unsynced
unsynced_small_writes.push_back((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
if (immediate_commit == IMMEDIATE_NONE)
{
// Remember small write as unsynced
unsynced_small_writes.push_back((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
}
if (!PRIV(op)->pending_ops)
{
ack_write(op);
PRIV(op)->op_state = 4;
continue_write(op);
}
else
{
PRIV(op)->op_state = 3;
}
}
return 1;
}
int blockstore_impl_t::continue_write(blockstore_op_t *op)
{
io_uring_sqe *sqe = NULL;
journal_entry_big_write *je;
auto & dirty_entry = dirty_db[(obj_ver_id){
.oid = op->oid,
.version = op->version,
}];
if (PRIV(op)->op_state == 2)
goto resume_2;
else if (PRIV(op)->op_state == 4)
goto resume_4;
else
return 1;
resume_2:
// Only for the immediate_commit mode: prepare and submit big_write journal entry
sqe = get_sqe();
if (!sqe)
{
return 0;
}
je = (journal_entry_big_write*)prefill_single_journal_entry(journal, JE_BIG_WRITE, sizeof(journal_entry_big_write));
dirty_entry.journal_sector = journal.sector_info[journal.cur_sector].offset;
journal.sector_info[journal.cur_sector].dirty = false;
journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
#ifdef BLOCKSTORE_DEBUG
printf("journal offset %lu is used by %lu:%lu v%lu\n", journal.sector_info[journal.cur_sector].offset, op->oid.inode, op->oid.stripe, op->version);
#endif
je->oid = op->oid;
je->version = op->version;
je->offset = op->offset;
je->len = op->len;
je->location = dirty_entry.location;
je->crc32 = je_crc32((journal_entry*)je);
journal.crc32_last = je->crc32;
prepare_journal_sector_write(journal, journal.cur_sector, sqe,
[this, op](ring_data_t *data) { handle_write_event(data, op); });
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = 3;
return 1;
resume_4:
// Switch object state
#ifdef BLOCKSTORE_DEBUG
printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_entry.state);
#endif
bool imm = dirty_entry.state == ST_D_SUBMITTED
? (immediate_commit == IMMEDIATE_ALL)
: (immediate_commit != IMMEDIATE_NONE);
if (imm)
{
auto & unstab = unstable_writes[op->oid];
unstab = unstab < op->version ? op->version : unstab;
}
if (dirty_entry.state == ST_J_SUBMITTED)
{
dirty_entry.state = imm ? ST_J_SYNCED : ST_J_WRITTEN;
}
else if (dirty_entry.state == ST_D_SUBMITTED)
{
dirty_entry.state = imm ? ST_D_SYNCED : ST_D_WRITTEN;
}
else if (dirty_entry.state == ST_DEL_SUBMITTED)
{
dirty_entry.state = imm ? ST_DEL_SYNCED : ST_DEL_WRITTEN;
}
// Acknowledge write
op->retval = op->len;
FINISH_OP(op);
return 1;
}
@ -243,7 +345,11 @@ void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *o
if (PRIV(op)->pending_ops == 0)
{
release_journal_sectors(op);
ack_write(op);
PRIV(op)->op_state++;
if (!continue_write(op))
{
submit_queue.push_front(op);
}
}
}
@ -275,33 +381,6 @@ void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op)
}
}
void blockstore_impl_t::ack_write(blockstore_op_t *op)
{
// Switch object state
auto & dirty_entry = dirty_db[(obj_ver_id){
.oid = op->oid,
.version = op->version,
}];
#ifdef BLOCKSTORE_DEBUG
printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_entry.state);
#endif
if (dirty_entry.state == ST_J_SUBMITTED)
{
dirty_entry.state = ST_J_WRITTEN;
}
else if (dirty_entry.state == ST_D_SUBMITTED)
{
dirty_entry.state = ST_D_WRITTEN;
}
else if (dirty_entry.state == ST_DEL_SUBMITTED)
{
dirty_entry.state = ST_DEL_WRITTEN;
}
// Acknowledge write without sync
op->retval = op->len;
FINISH_OP(op);
}
int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
{
auto dirty_it = dirty_db.find((obj_ver_id){
@ -313,8 +392,30 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
{
return 0;
}
BS_SUBMIT_GET_ONLY_SQE(sqe);
io_uring_sqe *sqe = NULL;
if (immediate_commit != IMMEDIATE_NONE ||
(journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_del) &&
journal.sector_info[journal.cur_sector].dirty)
{
// Write current journal sector only if it's dirty and full, or in the immediate_commit mode
BS_SUBMIT_GET_SQE_DECL(sqe);
}
auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
// Prepare journal sector write
if (immediate_commit == IMMEDIATE_NONE)
{
if (sqe)
{
prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb);
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops++;
}
else
{
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
}
}
// Pre-fill journal entry
journal_entry_del *je = (journal_entry_del*)
prefill_single_journal_entry(journal, JE_DELETE, sizeof(struct journal_entry_del));
dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset;
@ -326,15 +427,26 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
je->version = op->version;
je->crc32 = je_crc32((journal_entry*)je);
journal.crc32_last = je->crc32;
auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb);
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = 1;
dirty_it->second.state = ST_DEL_SUBMITTED;
// Remember small write as unsynced
unsynced_small_writes.push_back((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
if (immediate_commit != IMMEDIATE_NONE)
{
prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb);
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops++;
// Remember small write as unsynced
unsynced_small_writes.push_back((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
}
if (!PRIV(op)->pending_ops)
{
PRIV(op)->op_state = 4;
continue_write(op);
}
else
{
PRIV(op)->op_state = 3;
}
return 1;
}

2
test.cpp

@ -181,7 +181,7 @@ int main0(int argc, char *argv[])
},
.version = 1,
}] = (dirty_entry){
.state = ST_D_META_SYNCED,
.state = ST_D_SYNCED,
.flags = 0,
.location = (uint64_t)i << 17,
.offset = 0,

Loading…
Cancel
Save