From 2ab423d4ef323de1df92317bc38efac03df537c6 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 28 Mar 2021 22:47:35 +0300 Subject: [PATCH] Implement journaled write throttling for the SSD+HDD case --- src/blockstore.cpp | 4 +- src/blockstore.h | 3 +- src/blockstore_impl.cpp | 3 +- src/blockstore_impl.h | 13 ++- src/blockstore_open.cpp | 21 +++++ src/blockstore_write.cpp | 193 ++++++++++++++++++++++++--------------- src/fio_engine.cpp | 6 +- src/osd.cpp | 9 +- src/test_blockstore.cpp | 5 +- 9 files changed, 172 insertions(+), 85 deletions(-) diff --git a/src/blockstore.cpp b/src/blockstore.cpp index e4a70f4c..bb4b4112 100644 --- a/src/blockstore.cpp +++ b/src/blockstore.cpp @@ -3,9 +3,9 @@ #include "blockstore_impl.h" -blockstore_t::blockstore_t(blockstore_config_t & config, ring_loop_t *ringloop) +blockstore_t::blockstore_t(blockstore_config_t & config, ring_loop_t *ringloop, timerfd_manager_t *tfd) { - impl = new blockstore_impl_t(config, ringloop); + impl = new blockstore_impl_t(config, ringloop, tfd); } blockstore_t::~blockstore_t() diff --git a/src/blockstore.h b/src/blockstore.h index 8ce67097..07817bf8 100644 --- a/src/blockstore.h +++ b/src/blockstore.h @@ -16,6 +16,7 @@ #include "object_id.h" #include "ringloop.h" +#include "timerfd_manager.h" // Memory alignment for direct I/O (usually 512 bytes) // All other alignments must be a multiple of this one @@ -158,7 +159,7 @@ class blockstore_t { blockstore_impl_t *impl; public: - blockstore_t(blockstore_config_t & config, ring_loop_t *ringloop); + blockstore_t(blockstore_config_t & config, ring_loop_t *ringloop, timerfd_manager_t *tfd); ~blockstore_t(); // Event loop diff --git a/src/blockstore_impl.cpp b/src/blockstore_impl.cpp index 5a27c306..be20c512 100644 --- a/src/blockstore_impl.cpp +++ b/src/blockstore_impl.cpp @@ -3,9 +3,10 @@ #include "blockstore_impl.h" -blockstore_impl_t::blockstore_impl_t(blockstore_config_t & config, ring_loop_t *ringloop) +blockstore_impl_t::blockstore_impl_t(blockstore_config_t & config, ring_loop_t *ringloop, timerfd_manager_t *tfd) { assert(sizeof(blockstore_op_private_t) <= BS_OP_PRIVATE_DATA_SIZE); + this->tfd = tfd; this->ringloop = ringloop; ring_consumer.loop = [this]() { loop(); }; ringloop->register_consumer(&ring_consumer); diff --git a/src/blockstore_impl.h b/src/blockstore_impl.h index 43b95f63..f4e9e734 100644 --- a/src/blockstore_impl.h +++ b/src/blockstore_impl.h @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -158,6 +159,7 @@ struct blockstore_op_private_t struct iovec iov_zerofill[3]; // Warning: must not have a default value here because it's written to before calling constructor in blockstore_write.cpp O_o uint64_t real_version; + timespec tv_begin; // Sync std::vector sync_big_writes, sync_small_writes; @@ -203,6 +205,14 @@ class blockstore_impl_t unsigned max_flusher_count, min_flusher_count; // Maximum queue depth unsigned max_write_iodepth = 128; + // Enable small (journaled) write throttling, useful for the SSD+HDD case + bool throttle_small_writes = false; + // Target data device iops, bandwidth and parallelism for throttling (100/100/1 is the default for HDD) + int throttle_target_iops = 100; + int throttle_target_mbs = 100; + int throttle_target_parallelism = 1; + // Minimum difference in microseconds between target and real execution times to throttle the response + int throttle_threshold_us = 50; /******* END OF OPTIONS *******/ struct ring_consumer_t ring_consumer; @@ -233,6 +243,7 @@ class blockstore_impl_t bool live = false, queue_stall = false; ring_loop_t *ringloop; + timerfd_manager_t *tfd; bool stop_sync_submitted; @@ -303,7 +314,7 @@ class blockstore_impl_t public: - blockstore_impl_t(blockstore_config_t & config, ring_loop_t *ringloop); + blockstore_impl_t(blockstore_config_t & config, ring_loop_t *ringloop, timerfd_manager_t *tfd); ~blockstore_impl_t(); // Event loop diff --git a/src/blockstore_open.cpp b/src/blockstore_open.cpp index 429a9de0..5c97c842 100644 --- a/src/blockstore_open.cpp +++ b/src/blockstore_open.cpp @@ -79,6 +79,11 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config) max_flusher_count = strtoull(config["flusher_count"].c_str(), NULL, 10); min_flusher_count = strtoull(config["min_flusher_count"].c_str(), NULL, 10); max_write_iodepth = strtoull(config["max_write_iodepth"].c_str(), NULL, 10); + throttle_small_writes = config["throttle_small_writes"] == "true" || config["throttle_small_writes"] == "1" || config["throttle_small_writes"] == "yes"; + throttle_target_iops = strtoull(config["throttle_target_iops"].c_str(), NULL, 10); + throttle_target_mbs = strtoull(config["throttle_target_mbs"].c_str(), NULL, 10); + throttle_target_parallelism = strtoull(config["throttle_target_parallelism"].c_str(), NULL, 10); + throttle_threshold_us = strtoull(config["throttle_threshold_us"].c_str(), NULL, 10); // Validate if (!block_size) { @@ -180,6 +185,22 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config) { throw std::runtime_error("immediate_commit=all requires disable_journal_fsync and disable_data_fsync"); } + if (!throttle_target_iops) + { + throttle_target_iops = 100; + } + if (!throttle_target_mbs) + { + throttle_target_mbs = 100; + } + if (!throttle_target_parallelism) + { + throttle_target_parallelism = 1; + } + if (!throttle_threshold_us) + { + throttle_threshold_us = 50; + } // init some fields clean_entry_bitmap_size = block_size / bitmap_granularity / 8; clean_entry_size = sizeof(clean_disk_entry) + 2*clean_entry_bitmap_size; diff --git a/src/blockstore_write.cpp b/src/blockstore_write.cpp index fc670ed1..2a8b6b47 100644 --- a/src/blockstore_write.cpp +++ b/src/blockstore_write.cpp @@ -122,6 +122,8 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) else { state = (op->len == block_size || deleted ? BS_ST_BIG_WRITE : BS_ST_SMALL_WRITE); + if (state == BS_ST_SMALL_WRITE && throttle_small_writes) + clock_gettime(CLOCK_REALTIME, &PRIV(op)->tv_begin); if (wait_del) state |= BS_ST_WAIT_DEL; else if (state == BS_ST_SMALL_WRITE && wait_big) @@ -424,105 +426,148 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) int blockstore_impl_t::continue_write(blockstore_op_t *op) { - io_uring_sqe *sqe = NULL; - journal_entry_big_write *je; int op_state = PRIV(op)->op_state; - if (op_state != 2 && op_state != 4) - { - // In progress - return 1; - } - auto dirty_it = dirty_db.find((obj_ver_id){ - .oid = op->oid, - .version = op->version, - }); - assert(dirty_it != dirty_db.end()); if (op_state == 2) goto resume_2; else if (op_state == 4) goto resume_4; + else if (op_state == 6) + goto resume_6; + else + { + // In progress + return 1; + } resume_2: // Only for the immediate_commit mode: prepare and submit big_write journal entry - BS_SUBMIT_GET_SQE_DECL(sqe); - je = (journal_entry_big_write*)prefill_single_journal_entry( - journal, op->opcode == BS_OP_WRITE_STABLE ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE, - sizeof(journal_entry_big_write) + clean_entry_bitmap_size - ); - dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset; - journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++; + { + auto dirty_it = dirty_db.find((obj_ver_id){ + .oid = op->oid, + .version = op->version, + }); + assert(dirty_it != dirty_db.end()); + io_uring_sqe *sqe = NULL; + BS_SUBMIT_GET_SQE_DECL(sqe); + journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry( + journal, op->opcode == BS_OP_WRITE_STABLE ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE, + sizeof(journal_entry_big_write) + clean_entry_bitmap_size + ); + dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset; + journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++; #ifdef BLOCKSTORE_DEBUG - printf( - "journal offset %08lx is used by %lx:%lx v%lu (%lu refs)\n", - journal.sector_info[journal.cur_sector].offset, op->oid.inode, op->oid.stripe, op->version, - journal.used_sectors[journal.sector_info[journal.cur_sector].offset] - ); + printf( + "journal offset %08lx is used by %lx:%lx v%lu (%lu refs)\n", + journal.sector_info[journal.cur_sector].offset, op->oid.inode, op->oid.stripe, op->version, + journal.used_sectors[journal.sector_info[journal.cur_sector].offset] + ); #endif - je->oid = op->oid; - je->version = op->version; - je->offset = op->offset; - je->len = op->len; - je->location = dirty_it->second.location; - memcpy((void*)(je+1), (clean_entry_bitmap_size > sizeof(void*) ? dirty_it->second.bitmap : &dirty_it->second.bitmap), clean_entry_bitmap_size); - je->crc32 = je_crc32((journal_entry*)je); - journal.crc32_last = je->crc32; - prepare_journal_sector_write(journal, journal.cur_sector, sqe, - [this, op](ring_data_t *data) { handle_write_event(data, op); }); - PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; - PRIV(op)->pending_ops = 1; - PRIV(op)->op_state = 3; - return 1; + je->oid = op->oid; + je->version = op->version; + je->offset = op->offset; + je->len = op->len; + je->location = dirty_it->second.location; + memcpy((void*)(je+1), (clean_entry_bitmap_size > sizeof(void*) ? dirty_it->second.bitmap : &dirty_it->second.bitmap), clean_entry_bitmap_size); + je->crc32 = je_crc32((journal_entry*)je); + journal.crc32_last = je->crc32; + prepare_journal_sector_write(journal, journal.cur_sector, sqe, + [this, op](ring_data_t *data) { handle_write_event(data, op); }); + PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector; + PRIV(op)->pending_ops = 1; + PRIV(op)->op_state = 3; + return 1; + } resume_4: // Switch object state #ifdef BLOCKSTORE_DEBUG printf("Ack write %lx:%lx v%lu = state 0x%x\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state); #endif - bool imm = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE - ? (immediate_commit == IMMEDIATE_ALL) - : (immediate_commit != IMMEDIATE_NONE); - if (imm) { - auto & unstab = unstable_writes[op->oid]; - unstab = unstab < op->version ? op->version : unstab; - } - dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) - | (imm ? BS_ST_SYNCED : BS_ST_WRITTEN); - if (imm && ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE || (dirty_it->second.state & BS_ST_INSTANT))) - { - // Deletions and 'instant' operations are treated as immediately stable - mark_stable(dirty_it->first); - } - if (!imm) - { - if ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE) + auto dirty_it = dirty_db.find((obj_ver_id){ + .oid = op->oid, + .version = op->version, + }); + assert(dirty_it != dirty_db.end()); + bool is_big = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE; + bool imm = is_big ? (immediate_commit == IMMEDIATE_ALL) : (immediate_commit != IMMEDIATE_NONE); + if (imm) { - // Remember big write as unsynced - unsynced_big_writes.push_back((obj_ver_id){ - .oid = op->oid, - .version = op->version, - }); + auto & unstab = unstable_writes[op->oid]; + unstab = unstab < op->version ? op->version : unstab; } - else + dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) + | (imm ? BS_ST_SYNCED : BS_ST_WRITTEN); + if (imm && ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE || (dirty_it->second.state & BS_ST_INSTANT))) { - // Remember small write as unsynced - unsynced_small_writes.push_back((obj_ver_id){ - .oid = op->oid, - .version = op->version, - }); + // Deletions and 'instant' operations are treated as immediately stable + mark_stable(dirty_it->first); } - } - if (imm && (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE) - { - // Unblock small writes - dirty_it++; - while (dirty_it != dirty_db.end() && dirty_it->first.oid == op->oid) + if (!imm) { - if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG) + if (is_big) { - dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_IN_FLIGHT; + // Remember big write as unsynced + unsynced_big_writes.push_back((obj_ver_id){ + .oid = op->oid, + .version = op->version, + }); } + else + { + // Remember small write as unsynced + unsynced_small_writes.push_back((obj_ver_id){ + .oid = op->oid, + .version = op->version, + }); + } + } + if (imm && (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE) + { + // Unblock small writes dirty_it++; + while (dirty_it != dirty_db.end() && dirty_it->first.oid == op->oid) + { + if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG) + { + dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_IN_FLIGHT; + } + dirty_it++; + } + } + // Apply throttling to not fill the journal too fast for the SSD+HDD case + if (!is_big && throttle_small_writes) + { + // Apply throttling + timespec tv_end; + clock_gettime(CLOCK_REALTIME, &tv_end); + uint64_t exec_us = + (tv_end.tv_sec - PRIV(op)->tv_begin.tv_sec)*1000000 + + (tv_end.tv_nsec - PRIV(op)->tv_begin.tv_nsec)/1000; + // Compare with target execution time + // 100% free -> target time = 0 + // 0% free -> target time = iodepth/parallelism * (iops + size/bw) / write per second + uint64_t used_start = journal.get_trim_pos(); + uint64_t journal_free_space = journal.next_free < used_start + ? (used_start - journal.next_free) + : (journal.len - journal.next_free + used_start - journal.block_size); + uint64_t ref_us = + (write_iodepth <= throttle_target_parallelism ? 100 : 100*write_iodepth/throttle_target_parallelism) + * (1000000/throttle_target_iops + op->len*1000000/throttle_target_mbs/1024/1024) + / 100; + ref_us -= ref_us * journal_free_space / journal.len; + if (ref_us > exec_us + throttle_threshold_us) + { + // Pause reply + tfd->set_timer_us(ref_us-exec_us, false, [this, op](int timer_id) + { + PRIV(op)->op_state++; + ringloop->wakeup(); + }); + PRIV(op)->op_state = 5; + return 1; + } } } +resume_6: // Acknowledge write op->retval = op->len; write_iodepth--; diff --git a/src/fio_engine.cpp b/src/fio_engine.cpp index ffc58e9d..a2f73bd0 100644 --- a/src/fio_engine.cpp +++ b/src/fio_engine.cpp @@ -25,6 +25,7 @@ // -bs_config='{"data_device":"./test_data.bin"}' -size=1000M #include "blockstore.h" +#include "epoll_manager.h" #include "fio_headers.h" #include "json11/json11.hpp" @@ -32,6 +33,7 @@ struct bs_data { blockstore_t *bs; + epoll_manager_t *epmgr; ring_loop_t *ringloop; /* The list of completed io_u structs. */ std::vector completed; @@ -104,6 +106,7 @@ static void bs_cleanup(struct thread_data *td) } safe: delete bsd->bs; + delete bsd->epmgr; delete bsd->ringloop; delete bsd; } @@ -129,7 +132,8 @@ static int bs_init(struct thread_data *td) } } bsd->ringloop = new ring_loop_t(512); - bsd->bs = new blockstore_t(config, bsd->ringloop); + bsd->epmgr = new epoll_manager_t(bsd->ringloop); + bsd->bs = new blockstore_t(config, bsd->ringloop, bsd->epmgr->tfd); while (1) { bsd->ringloop->loop(); diff --git a/src/osd.cpp b/src/osd.cpp index db7b05a0..5ddb508f 100644 --- a/src/osd.cpp +++ b/src/osd.cpp @@ -27,14 +27,15 @@ osd_t::osd_t(blockstore_config_t & config, ring_loop_t *ringloop) this->config = config; this->ringloop = ringloop; + epmgr = new epoll_manager_t(ringloop); + // FIXME: Use timerfd_interval based directly on io_uring + this->tfd = epmgr->tfd; + // FIXME: Create Blockstore from on-disk superblock config and check it against the OSD cluster config - this->bs = new blockstore_t(config, ringloop); + this->bs = new blockstore_t(config, ringloop, tfd); parse_config(config); - epmgr = new epoll_manager_t(ringloop); - this->tfd = epmgr->tfd; - this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id) { print_stats(); diff --git a/src/test_blockstore.cpp b/src/test_blockstore.cpp index 16af03ef..ddcf43b3 100644 --- a/src/test_blockstore.cpp +++ b/src/test_blockstore.cpp @@ -3,6 +3,7 @@ #include #include "blockstore.h" +#include "epoll_manager.h" int main(int narg, char *args[]) { @@ -11,7 +12,8 @@ int main(int narg, char *args[]) config["journal_device"] = "./test_journal.bin"; config["data_device"] = "./test_data.bin"; ring_loop_t *ringloop = new ring_loop_t(512); - blockstore_t *bs = new blockstore_t(config, ringloop); + epoll_manager_t *epmgr = new epoll_manager_t(ringloop); + blockstore_t *bs = new blockstore_t(config, ringloop, epmgr->tfd); blockstore_op_t op; int main_state = 0; @@ -120,6 +122,7 @@ int main(int narg, char *args[]) ringloop->wait(); } delete bs; + delete epmgr; delete ringloop; return 0; }