Browse Source

Implement journaled write throttling for the SSD+HDD case

rdma-zerocopy
Vitaliy Filippov 6 months ago
parent
commit
2ab423d4ef
  1. 4
      src/blockstore.cpp
  2. 3
      src/blockstore.h
  3. 3
      src/blockstore_impl.cpp
  4. 13
      src/blockstore_impl.h
  5. 21
      src/blockstore_open.cpp
  6. 193
      src/blockstore_write.cpp
  7. 6
      src/fio_engine.cpp
  8. 9
      src/osd.cpp
  9. 5
      src/test_blockstore.cpp

4
src/blockstore.cpp

@ -3,9 +3,9 @@
#include "blockstore_impl.h"
blockstore_t::blockstore_t(blockstore_config_t & config, ring_loop_t *ringloop)
blockstore_t::blockstore_t(blockstore_config_t & config, ring_loop_t *ringloop, timerfd_manager_t *tfd)
{
impl = new blockstore_impl_t(config, ringloop);
impl = new blockstore_impl_t(config, ringloop, tfd);
}
blockstore_t::~blockstore_t()

3
src/blockstore.h

@ -16,6 +16,7 @@
#include "object_id.h"
#include "ringloop.h"
#include "timerfd_manager.h"
// Memory alignment for direct I/O (usually 512 bytes)
// All other alignments must be a multiple of this one
@ -158,7 +159,7 @@ class blockstore_t
{
blockstore_impl_t *impl;
public:
blockstore_t(blockstore_config_t & config, ring_loop_t *ringloop);
blockstore_t(blockstore_config_t & config, ring_loop_t *ringloop, timerfd_manager_t *tfd);
~blockstore_t();
// Event loop

3
src/blockstore_impl.cpp

@ -3,9 +3,10 @@
#include "blockstore_impl.h"
blockstore_impl_t::blockstore_impl_t(blockstore_config_t & config, ring_loop_t *ringloop)
blockstore_impl_t::blockstore_impl_t(blockstore_config_t & config, ring_loop_t *ringloop, timerfd_manager_t *tfd)
{
assert(sizeof(blockstore_op_private_t) <= BS_OP_PRIVATE_DATA_SIZE);
this->tfd = tfd;
this->ringloop = ringloop;
ring_consumer.loop = [this]() { loop(); };
ringloop->register_consumer(&ring_consumer);

13
src/blockstore_impl.h

@ -9,6 +9,7 @@
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <time.h>
#include <unistd.h>
#include <linux/fs.h>
@ -158,6 +159,7 @@ struct blockstore_op_private_t
struct iovec iov_zerofill[3];
// Warning: must not have a default value here because it's written to before calling constructor in blockstore_write.cpp O_o
uint64_t real_version;
timespec tv_begin;
// Sync
std::vector<obj_ver_id> sync_big_writes, sync_small_writes;
@ -203,6 +205,14 @@ class blockstore_impl_t
unsigned max_flusher_count, min_flusher_count;
// Maximum queue depth
unsigned max_write_iodepth = 128;
// Enable small (journaled) write throttling, useful for the SSD+HDD case
bool throttle_small_writes = false;
// Target data device iops, bandwidth and parallelism for throttling (100/100/1 is the default for HDD)
int throttle_target_iops = 100;
int throttle_target_mbs = 100;
int throttle_target_parallelism = 1;
// Minimum difference in microseconds between target and real execution times to throttle the response
int throttle_threshold_us = 50;
/******* END OF OPTIONS *******/
struct ring_consumer_t ring_consumer;
@ -233,6 +243,7 @@ class blockstore_impl_t
bool live = false, queue_stall = false;
ring_loop_t *ringloop;
timerfd_manager_t *tfd;
bool stop_sync_submitted;
@ -303,7 +314,7 @@ class blockstore_impl_t
public:
blockstore_impl_t(blockstore_config_t & config, ring_loop_t *ringloop);
blockstore_impl_t(blockstore_config_t & config, ring_loop_t *ringloop, timerfd_manager_t *tfd);
~blockstore_impl_t();
// Event loop

21
src/blockstore_open.cpp

@ -79,6 +79,11 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config)
max_flusher_count = strtoull(config["flusher_count"].c_str(), NULL, 10);
min_flusher_count = strtoull(config["min_flusher_count"].c_str(), NULL, 10);
max_write_iodepth = strtoull(config["max_write_iodepth"].c_str(), NULL, 10);
throttle_small_writes = config["throttle_small_writes"] == "true" || config["throttle_small_writes"] == "1" || config["throttle_small_writes"] == "yes";
throttle_target_iops = strtoull(config["throttle_target_iops"].c_str(), NULL, 10);
throttle_target_mbs = strtoull(config["throttle_target_mbs"].c_str(), NULL, 10);
throttle_target_parallelism = strtoull(config["throttle_target_parallelism"].c_str(), NULL, 10);
throttle_threshold_us = strtoull(config["throttle_threshold_us"].c_str(), NULL, 10);
// Validate
if (!block_size)
{
@ -180,6 +185,22 @@ void blockstore_impl_t::parse_config(blockstore_config_t & config)
{
throw std::runtime_error("immediate_commit=all requires disable_journal_fsync and disable_data_fsync");
}
if (!throttle_target_iops)
{
throttle_target_iops = 100;
}
if (!throttle_target_mbs)
{
throttle_target_mbs = 100;
}
if (!throttle_target_parallelism)
{
throttle_target_parallelism = 1;
}
if (!throttle_threshold_us)
{
throttle_threshold_us = 50;
}
// init some fields
clean_entry_bitmap_size = block_size / bitmap_granularity / 8;
clean_entry_size = sizeof(clean_disk_entry) + 2*clean_entry_bitmap_size;

193
src/blockstore_write.cpp

@ -122,6 +122,8 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
else
{
state = (op->len == block_size || deleted ? BS_ST_BIG_WRITE : BS_ST_SMALL_WRITE);
if (state == BS_ST_SMALL_WRITE && throttle_small_writes)
clock_gettime(CLOCK_REALTIME, &PRIV(op)->tv_begin);
if (wait_del)
state |= BS_ST_WAIT_DEL;
else if (state == BS_ST_SMALL_WRITE && wait_big)
@ -424,105 +426,148 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
int blockstore_impl_t::continue_write(blockstore_op_t *op)
{
io_uring_sqe *sqe = NULL;
journal_entry_big_write *je;
int op_state = PRIV(op)->op_state;
if (op_state != 2 && op_state != 4)
{
// In progress
return 1;
}
auto dirty_it = dirty_db.find((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
assert(dirty_it != dirty_db.end());
if (op_state == 2)
goto resume_2;
else if (op_state == 4)
goto resume_4;
else if (op_state == 6)
goto resume_6;
else
{
// In progress
return 1;
}
resume_2:
// Only for the immediate_commit mode: prepare and submit big_write journal entry
BS_SUBMIT_GET_SQE_DECL(sqe);
je = (journal_entry_big_write*)prefill_single_journal_entry(
journal, op->opcode == BS_OP_WRITE_STABLE ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE,
sizeof(journal_entry_big_write) + clean_entry_bitmap_size
);
dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset;
journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
{
auto dirty_it = dirty_db.find((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
assert(dirty_it != dirty_db.end());
io_uring_sqe *sqe = NULL;
BS_SUBMIT_GET_SQE_DECL(sqe);
journal_entry_big_write *je = (journal_entry_big_write*)prefill_single_journal_entry(
journal, op->opcode == BS_OP_WRITE_STABLE ? JE_BIG_WRITE_INSTANT : JE_BIG_WRITE,
sizeof(journal_entry_big_write) + clean_entry_bitmap_size
);
dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset;
journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
#ifdef BLOCKSTORE_DEBUG
printf(
"journal offset %08lx is used by %lx:%lx v%lu (%lu refs)\n",
journal.sector_info[journal.cur_sector].offset, op->oid.inode, op->oid.stripe, op->version,
journal.used_sectors[journal.sector_info[journal.cur_sector].offset]
);
printf(
"journal offset %08lx is used by %lx:%lx v%lu (%lu refs)\n",
journal.sector_info[journal.cur_sector].offset, op->oid.inode, op->oid.stripe, op->version,
journal.used_sectors[journal.sector_info[journal.cur_sector].offset]
);
#endif
je->oid = op->oid;
je->version = op->version;
je->offset = op->offset;
je->len = op->len;
je->location = dirty_it->second.location;
memcpy((void*)(je+1), (clean_entry_bitmap_size > sizeof(void*) ? dirty_it->second.bitmap : &dirty_it->second.bitmap), clean_entry_bitmap_size);
je->crc32 = je_crc32((journal_entry*)je);
journal.crc32_last = je->crc32;
prepare_journal_sector_write(journal, journal.cur_sector, sqe,
[this, op](ring_data_t *data) { handle_write_event(data, op); });
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = 3;
return 1;
je->oid = op->oid;
je->version = op->version;
je->offset = op->offset;
je->len = op->len;
je->location = dirty_it->second.location;
memcpy((void*)(je+1), (clean_entry_bitmap_size > sizeof(void*) ? dirty_it->second.bitmap : &dirty_it->second.bitmap), clean_entry_bitmap_size);
je->crc32 = je_crc32((journal_entry*)je);
journal.crc32_last = je->crc32;
prepare_journal_sector_write(journal, journal.cur_sector, sqe,
[this, op](ring_data_t *data) { handle_write_event(data, op); });
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = 3;
return 1;
}
resume_4:
// Switch object state
#ifdef BLOCKSTORE_DEBUG
printf("Ack write %lx:%lx v%lu = state 0x%x\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state);
#endif
bool imm = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE
? (immediate_commit == IMMEDIATE_ALL)
: (immediate_commit != IMMEDIATE_NONE);
if (imm)
{
auto & unstab = unstable_writes[op->oid];
unstab = unstab < op->version ? op->version : unstab;
}
dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK)
| (imm ? BS_ST_SYNCED : BS_ST_WRITTEN);
if (imm && ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE || (dirty_it->second.state & BS_ST_INSTANT)))
{
// Deletions and 'instant' operations are treated as immediately stable
mark_stable(dirty_it->first);
}
if (!imm)
{
if ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE)
auto dirty_it = dirty_db.find((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
assert(dirty_it != dirty_db.end());
bool is_big = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE;
bool imm = is_big ? (immediate_commit == IMMEDIATE_ALL) : (immediate_commit != IMMEDIATE_NONE);
if (imm)
{
// Remember big write as unsynced
unsynced_big_writes.push_back((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
auto & unstab = unstable_writes[op->oid];
unstab = unstab < op->version ? op->version : unstab;
}
else
dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK)
| (imm ? BS_ST_SYNCED : BS_ST_WRITTEN);
if (imm && ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE || (dirty_it->second.state & BS_ST_INSTANT)))
{
// Remember small write as unsynced
unsynced_small_writes.push_back((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
// Deletions and 'instant' operations are treated as immediately stable
mark_stable(dirty_it->first);
}
}
if (imm && (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE)
{
// Unblock small writes
dirty_it++;
while (dirty_it != dirty_db.end() && dirty_it->first.oid == op->oid)
if (!imm)
{
if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG)
if (is_big)
{
dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_IN_FLIGHT;
// Remember big write as unsynced
unsynced_big_writes.push_back((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
}
else
{
// Remember small write as unsynced
unsynced_small_writes.push_back((obj_ver_id){
.oid = op->oid,
.version = op->version,
});
}
}
if (imm && (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE)
{
// Unblock small writes
dirty_it++;
while (dirty_it != dirty_db.end() && dirty_it->first.oid == op->oid)
{
if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG)
{
dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | BS_ST_IN_FLIGHT;
}
dirty_it++;
}
}
// Apply throttling to not fill the journal too fast for the SSD+HDD case
if (!is_big && throttle_small_writes)
{
// Apply throttling
timespec tv_end;
clock_gettime(CLOCK_REALTIME, &tv_end);
uint64_t exec_us =
(tv_end.tv_sec - PRIV(op)->tv_begin.tv_sec)*1000000 +
(tv_end.tv_nsec - PRIV(op)->tv_begin.tv_nsec)/1000;
// Compare with target execution time
// 100% free -> target time = 0
// 0% free -> target time = iodepth/parallelism * (iops + size/bw) / write per second
uint64_t used_start = journal.get_trim_pos();
uint64_t journal_free_space = journal.next_free < used_start
? (used_start - journal.next_free)
: (journal.len - journal.next_free + used_start - journal.block_size);
uint64_t ref_us =
(write_iodepth <= throttle_target_parallelism ? 100 : 100*write_iodepth/throttle_target_parallelism)
* (1000000/throttle_target_iops + op->len*1000000/throttle_target_mbs/1024/1024)
/ 100;
ref_us -= ref_us * journal_free_space / journal.len;
if (ref_us > exec_us + throttle_threshold_us)
{
// Pause reply
tfd->set_timer_us(ref_us-exec_us, false, [this, op](int timer_id)
{
PRIV(op)->op_state++;
ringloop->wakeup();
});
PRIV(op)->op_state = 5;
return 1;
}
}
}
resume_6:
// Acknowledge write
op->retval = op->len;
write_iodepth--;

6
src/fio_engine.cpp

@ -25,6 +25,7 @@
// -bs_config='{"data_device":"./test_data.bin"}' -size=1000M
#include "blockstore.h"
#include "epoll_manager.h"
#include "fio_headers.h"
#include "json11/json11.hpp"
@ -32,6 +33,7 @@
struct bs_data
{
blockstore_t *bs;
epoll_manager_t *epmgr;
ring_loop_t *ringloop;
/* The list of completed io_u structs. */
std::vector<io_u*> completed;
@ -104,6 +106,7 @@ static void bs_cleanup(struct thread_data *td)
}
safe:
delete bsd->bs;
delete bsd->epmgr;
delete bsd->ringloop;
delete bsd;
}
@ -129,7 +132,8 @@ static int bs_init(struct thread_data *td)
}
}
bsd->ringloop = new ring_loop_t(512);
bsd->bs = new blockstore_t(config, bsd->ringloop);
bsd->epmgr = new epoll_manager_t(bsd->ringloop);
bsd->bs = new blockstore_t(config, bsd->ringloop, bsd->epmgr->tfd);
while (1)
{
bsd->ringloop->loop();

9
src/osd.cpp

@ -27,14 +27,15 @@ osd_t::osd_t(blockstore_config_t & config, ring_loop_t *ringloop)
this->config = config;
this->ringloop = ringloop;
epmgr = new epoll_manager_t(ringloop);
// FIXME: Use timerfd_interval based directly on io_uring
this->tfd = epmgr->tfd;
// FIXME: Create Blockstore from on-disk superblock config and check it against the OSD cluster config
this->bs = new blockstore_t(config, ringloop);
this->bs = new blockstore_t(config, ringloop, tfd);
parse_config(config);
epmgr = new epoll_manager_t(ringloop);
this->tfd = epmgr->tfd;
this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id)
{
print_stats();

5
src/test_blockstore.cpp

@ -3,6 +3,7 @@
#include <malloc.h>
#include "blockstore.h"
#include "epoll_manager.h"
int main(int narg, char *args[])
{
@ -11,7 +12,8 @@ int main(int narg, char *args[])
config["journal_device"] = "./test_journal.bin";
config["data_device"] = "./test_data.bin";
ring_loop_t *ringloop = new ring_loop_t(512);
blockstore_t *bs = new blockstore_t(config, ringloop);
epoll_manager_t *epmgr = new epoll_manager_t(ringloop);
blockstore_t *bs = new blockstore_t(config, ringloop, epmgr->tfd);
blockstore_op_t op;
int main_state = 0;
@ -120,6 +122,7 @@ int main(int narg, char *args[])
ringloop->wait();
}
delete bs;
delete epmgr;
delete ringloop;
return 0;
}
Loading…
Cancel
Save