Extract alignments to options

blocking-uring-test
Vitaliy Filippov 2020-01-16 00:35:35 +03:00
parent 57ecbb2cda
commit 43f6cfeb73
16 changed files with 274 additions and 263 deletions
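This commit turns the former compile-time constants DISK_ALIGNMENT, JOURNAL_BLOCK_SIZE, META_BLOCK_SIZE and BITMAP_GRANULARITY into per-instance options parsed from the config. A minimal sketch of passing the new options, assuming blockstore_config_t is the string-to-string map that the strtoull(config[...]) parsing below implies:

#include <map>
#include <string>

typedef std::map<std::string, std::string> blockstore_config_t;

blockstore_config_t make_config()
{
    blockstore_config_t config;
    config["data_device"] = "./test_data.bin";
    config["disk_alignment"] = "4096";     // was #define DISK_ALIGNMENT 512
    config["journal_block_size"] = "4096"; // was #define JOURNAL_BLOCK_SIZE 512
    config["meta_block_size"] = "4096";    // was #define META_BLOCK_SIZE 512
    config["bitmap_granularity"] = "4096"; // was #define BITMAP_GRANULARITY 4096
    return config;
}

Unset options keep the old defaults (512/512/512/4096), so existing setups behave the same.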

Makefile

@@ -21,8 +21,8 @@ timerfd_interval.o: timerfd_interval.cpp timerfd_interval.h
libblockstore.so: $(BLOCKSTORE_OBJS)
g++ $(CXXFLAGS) -o libblockstore.so -shared $(BLOCKSTORE_OBJS) -ltcmalloc_minimal -luring
libfio_blockstore.so: ./libblockstore.so fio_engine.cpp
g++ $(CXXFLAGS) -shared -o libfio_blockstore.so fio_engine.cpp ./libblockstore.so -ltcmalloc_minimal -luring
libfio_blockstore.so: ./libblockstore.so fio_engine.cpp json11.o
g++ $(CXXFLAGS) -shared -o libfio_blockstore.so fio_engine.cpp json11.o ./libblockstore.so -ltcmalloc_minimal -luring
osd_exec_secondary.o: osd_exec_secondary.cpp osd.h osd_ops.h
g++ $(CXXFLAGS) -c -o $@ $<

blockstore.cpp

@@ -50,11 +50,6 @@ uint32_t blockstore_t::get_block_size()
return impl->get_block_size();
}
uint32_t blockstore_t::get_block_order()
{
return impl->get_block_order();
}
uint64_t blockstore_t::get_block_count()
{
return impl->get_block_count();

blockstore.h

@@ -101,6 +101,5 @@ public:
std::map<object_id, uint64_t> & get_unstable_writes();
uint32_t get_block_size();
uint32_t get_block_order();
uint64_t get_block_count();
};

blockstore_flush.cpp

@@ -8,7 +8,7 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore_impl_t *bs)
sync_threshold = flusher_count == 1 ? 1 : flusher_count/2;
journal_trim_interval = sync_threshold;
journal_trim_counter = 0;
journal_superblock = bs->journal.inmemory ? bs->journal.buffer : memalign(MEM_ALIGNMENT, JOURNAL_BLOCK_SIZE);
journal_superblock = bs->journal.inmemory ? bs->journal.buffer : memalign(MEM_ALIGNMENT, bs->journal_block_size);
co = new journal_flusher_co[flusher_count];
for (int i = 0; i < flusher_count; i++)
{
@@ -316,7 +316,7 @@ resume_1:
}
memset(meta_old.buf + meta_old.pos*bs->clean_entry_size, 0, bs->clean_entry_size);
await_sqe(15);
data->iov = (struct iovec){ meta_old.buf, META_BLOCK_SIZE };
data->iov = (struct iovec){ meta_old.buf, bs->meta_block_size };
data->callback = simple_callback_w;
my_uring_prep_writev(
sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_old.sector
@@ -338,7 +338,7 @@ resume_1:
}
}
await_sqe(6);
data->iov = (struct iovec){ meta_new.buf, META_BLOCK_SIZE };
data->iov = (struct iovec){ meta_new.buf, bs->meta_block_size };
data->callback = simple_callback_w;
my_uring_prep_writev(
sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_new.sector
@@ -402,7 +402,7 @@ resume_1:
.journal_start = bs->journal.used_start,
};
((journal_entry_start*)flusher->journal_superblock)->crc32 = je_crc32((journal_entry*)flusher->journal_superblock);
data->iov = (struct iovec){ flusher->journal_superblock, JOURNAL_BLOCK_SIZE };
data->iov = (struct iovec){ flusher->journal_superblock, bs->journal_block_size };
data->callback = simple_callback_w;
my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
wait_count++;
@@ -533,8 +533,8 @@ bool journal_flusher_co::modify_meta_read(uint64_t meta_loc, flusher_meta_write_
// We must check if the same sector is already in memory if we don't keep all metadata in memory all the time.
// And yet another option is to use LSM trees for metadata, but it complicates everything a lot,
// so I'll avoid it as long as I can.
wr.sector = ((meta_loc >> bs->block_order) / (META_BLOCK_SIZE / bs->clean_entry_size)) * META_BLOCK_SIZE;
wr.pos = ((meta_loc >> bs->block_order) % (META_BLOCK_SIZE / bs->clean_entry_size));
wr.sector = ((meta_loc >> bs->block_order) / (bs->meta_block_size / bs->clean_entry_size)) * bs->meta_block_size;
wr.pos = ((meta_loc >> bs->block_order) % (bs->meta_block_size / bs->clean_entry_size));
if (bs->inmemory_meta)
{
wr.buf = bs->metadata_buffer + wr.sector;
@@ -544,16 +544,16 @@ bool journal_flusher_co::modify_meta_read(uint64_t meta_loc, flusher_meta_write_
if (wr.it == flusher->meta_sectors.end())
{
// Not in memory yet, read it
wr.buf = memalign(MEM_ALIGNMENT, META_BLOCK_SIZE);
wr.buf = memalign(MEM_ALIGNMENT, bs->meta_block_size);
wr.it = flusher->meta_sectors.emplace(wr.sector, (meta_sector_t){
.offset = wr.sector,
.len = META_BLOCK_SIZE,
.len = bs->meta_block_size,
.state = 0, // 0 = not read yet
.buf = wr.buf,
.usage_count = 1,
}).first;
await_sqe(0);
data->iov = (struct iovec){ wr.it->second.buf, META_BLOCK_SIZE };
data->iov = (struct iovec){ wr.it->second.buf, bs->meta_block_size };
data->callback = simple_callback_r;
wr.submitted = true;
my_uring_prep_readv(
@@ -690,19 +690,19 @@ void journal_flusher_co::bitmap_set(void *bitmap, uint64_t start, uint64_t len)
{
if (start == 0)
{
if (len == 32*BITMAP_GRANULARITY)
if (len == 32*bs->bitmap_granularity)
{
*((uint32_t*)bitmap) = UINT32_MAX;
return;
}
else if (len == 64*BITMAP_GRANULARITY)
else if (len == 64*bs->bitmap_granularity)
{
*((uint64_t*)bitmap) = UINT64_MAX;
return;
}
}
unsigned bit_start = start / BITMAP_GRANULARITY;
unsigned bit_end = ((start + len) + BITMAP_GRANULARITY - 1) / BITMAP_GRANULARITY;
unsigned bit_start = start / bs->bitmap_granularity;
unsigned bit_end = ((start + len) + bs->bitmap_granularity - 1) / bs->bitmap_granularity;
while (bit_start < bit_end)
{
if (!(bit_start & 7) && bit_end >= bit_start+8)

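The bitmap_set() hunk above is a mechanical constant-to-field swap, but the bit-range arithmetic is worth seeing whole. A standalone sketch of the same logic with the granularity as a parameter (illustrative helper, not the blockstore API):

#include <stdint.h>

// Mark [start, start+len) as written in a sparse-write bitmap where one bit
// covers `granularity` bytes (formerly the BITMAP_GRANULARITY constant).
static void bitmap_set(uint8_t *bitmap, uint64_t start, uint64_t len, uint64_t granularity)
{
    unsigned bit_start = start / granularity;
    unsigned bit_end = ((start + len) + granularity - 1) / granularity;
    while (bit_start < bit_end)
    {
        if (!(bit_start & 7) && bit_end >= bit_start + 8)
        {
            // Aligned run of 8+ bits: set a whole byte at once
            bitmap[bit_start / 8] = 0xFF;
            bit_start += 8;
        }
        else
        {
            bitmap[bit_start / 8] |= 1 << (bit_start & 7);
            bit_start++;
        }
    }
}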
blockstore_impl.cpp

@@ -7,24 +7,15 @@ blockstore_impl_t::blockstore_impl_t(blockstore_config_t & config, ring_loop_t *
ring_consumer.loop = [this]() { loop(); };
ringloop->register_consumer(ring_consumer);
initialized = 0;
block_order = strtoull(config["block_size_order"].c_str(), NULL, 10);
if (block_order == 0)
{
block_order = DEFAULT_ORDER;
}
block_size = 1 << block_order;
if (block_size < MIN_BLOCK_SIZE || block_size >= MAX_BLOCK_SIZE)
{
throw std::runtime_error("Bad block size");
}
zero_object = (uint8_t*)memalign(MEM_ALIGNMENT, block_size);
data_fd = meta_fd = journal.fd = -1;
parse_config(config);
try
{
open_data(config);
open_meta(config);
open_journal(config);
calc_lengths(config);
open_data();
open_meta();
open_journal();
calc_lengths();
data_alloc = new allocator(block_count);
}
catch (std::exception & e)
@@ -37,9 +28,6 @@ blockstore_impl_t::blockstore_impl_t(blockstore_config_t & config, ring_loop_t *
close(journal.fd);
throw;
}
int flusher_count = strtoull(config["flusher_count"].c_str(), NULL, 10);
if (!flusher_count)
flusher_count = 32;
flusher = new journal_flusher_t(flusher_count, this);
}
@@ -306,7 +294,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
((type == BS_OP_READ || type == BS_OP_WRITE) && (
op->offset >= block_size ||
op->len > block_size-op->offset ||
(op->len % DISK_ALIGNMENT)
(op->len % disk_alignment)
)) ||
readonly && type != BS_OP_READ ||
first && type == BS_OP_WRITE)

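The bounds check in enqueue_op() now validates against the runtime disk_alignment instead of the DISK_ALIGNMENT constant. Restated as a standalone predicate (hypothetical helper; the negation of the rejection condition above):

#include <stdint.h>

static bool valid_rw(uint64_t offset, uint64_t len, uint64_t block_size, uint64_t disk_alignment)
{
    return offset < block_size
        && len <= block_size - offset
        && len % disk_alignment == 0;
}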
blockstore_impl.h

@@ -26,16 +26,6 @@
// Memory alignment for direct I/O (usually 512 bytes)
// All other alignments must be a multiple of this one
#define MEM_ALIGNMENT 512
// FIXME: Make following constants configurable
// Required write alignment and journal/metadata/data areas' location alignment
#define DISK_ALIGNMENT 512
// Journal block size - minimum_io_size of the journal device is the best choice
#define JOURNAL_BLOCK_SIZE 512
// Metadata block size - minimum_io_size of the metadata device is the best choice
#define META_BLOCK_SIZE 512
// Sparse write tracking granularity. 4 KB is a good choice. Must be a multiple
// of the write alignment.
#define BITMAP_GRANULARITY 4096
// States are not stored on disk. Instead, they're deduced from the journal
@@ -181,8 +171,36 @@ struct blockstore_op_private_t
#include "blockstore_flush.h"
struct blockstore_params_t
{
uint32_t block_size;
};
class blockstore_impl_t
{
/******* OPTIONS *******/
std::string data_device, meta_device, journal_device;
uint32_t block_size;
uint64_t meta_offset;
uint64_t data_offset;
uint64_t cfg_journal_size;
// Required write alignment and journal/metadata/data areas' location alignment
uint64_t disk_alignment = 512;
// Journal block size - minimum_io_size of the journal device is the best choice
uint64_t journal_block_size = 512;
// Metadata block size - minimum_io_size of the metadata device is the best choice
uint64_t meta_block_size = 512;
// Sparse write tracking granularity. 4 KB is a good choice. Must be a multiple of disk_alignment
uint64_t bitmap_granularity = 4096;
bool readonly = false;
// FIXME: separate flags for data, metadata and journal
// It is safe to disable fsync() if drive write cache is writethrough
bool disable_fsync = false;
bool inmemory_meta = false;
int flusher_count;
/******* END OF OPTIONS *******/
struct ring_consumer_t ring_consumer;
// Another option is https://github.com/algorithm-ninja/cpp-btree
@@ -195,21 +213,15 @@ class blockstore_impl_t
allocator *data_alloc = NULL;
uint8_t *zero_object;
uint32_t block_order;
uint64_t block_count;
uint32_t block_order, block_size;
uint32_t clean_entry_bitmap_size = 0, clean_entry_size = 0;
int meta_fd;
int data_fd;
uint64_t meta_size, meta_area, meta_len;
uint64_t data_size, data_len;
uint64_t meta_offset, meta_size, meta_area, meta_len;
uint64_t data_offset, data_size, data_len;
bool readonly = false;
// FIXME: separate flags for data, metadata and journal
// It is safe to disable fsync() if drive write cache is writethrough
bool disable_fsync = false;
bool inmemory_meta = false;
void *metadata_buffer = NULL;
struct journal_t journal;
@@ -231,10 +243,11 @@ class blockstore_impl_t
friend class journal_flusher_t;
friend class journal_flusher_co;
void calc_lengths(blockstore_config_t & config);
void open_data(blockstore_config_t & config);
void open_meta(blockstore_config_t & config);
void open_journal(blockstore_config_t & config);
void parse_config(blockstore_config_t & config);
void calc_lengths();
void open_data();
void open_meta();
void open_journal();
// Asynchronous init
int initialized;
@@ -302,6 +315,5 @@ public:
std::map<object_id, uint64_t> unstable_writes;
inline uint32_t get_block_size() { return block_size; }
inline uint32_t get_block_order() { return block_order; }
inline uint64_t get_block_count() { return block_count; }
};

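For a feel of the derived sizes that parse_config() computes from these options, a worked example under assumed defaults (128 KiB block, 4096-byte granularity; the real default block size comes from DEFAULT_ORDER):

#include <stdint.h>
#include <stdio.h>

int main()
{
    uint32_t block_size = 131072;        // assumed 2^17-byte block
    uint64_t bitmap_granularity = 4096;
    // One bit per granularity unit, packed into bytes:
    uint32_t clean_entry_bitmap_size = block_size / bitmap_granularity / 8;
    printf("%u bytes of bitmap per clean entry\n", clean_entry_bitmap_size); // 4
    return 0;
}

Each on-disk metadata entry then occupies sizeof(clean_disk_entry) plus these bitmap bytes.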
blockstore_init.cpp

@@ -65,8 +65,8 @@ int blockstore_init_meta::loop()
void *done_buf = bs->inmemory_meta
? (metadata_buffer + done_pos)
: (metadata_buffer + (prev_done == 2 ? bs->metadata_buf_size : 0));
unsigned count = META_BLOCK_SIZE / bs->clean_entry_size;
for (int sector = 0; sector < done_len; sector += META_BLOCK_SIZE)
unsigned count = bs->meta_block_size / bs->clean_entry_size;
for (int sector = 0; sector < done_len; sector += bs->meta_block_size)
{
// handle <count> entries
handle_entries(done_buf + sector, count, bs->block_order);
@@ -135,6 +135,7 @@ void blockstore_init_meta::handle_entries(void* entries, unsigned count, int blo
blockstore_init_journal::blockstore_init_journal(blockstore_impl_t *bs)
{
this->bs = bs;
next_free = bs->journal.block_size;
simple_callback = [this](ring_data_t *data1)
{
if (data1->res != data1->iov.iov_len)
@@ -171,7 +172,7 @@ void blockstore_init_journal::handle_event(ring_data_t *data1)
if (journal_pos >= bs->journal.len)
{
// Continue from the beginning
journal_pos = JOURNAL_BLOCK_SIZE;
journal_pos = bs->journal.block_size;
wrapped = true;
}
submitted_buf = NULL;
@@ -198,7 +199,7 @@ int blockstore_init_journal::loop()
printf("Reading blockstore journal\n");
if (!bs->journal.inmemory)
{
submitted_buf = memalign(MEM_ALIGNMENT, 2*JOURNAL_BLOCK_SIZE);
submitted_buf = memalign(MEM_ALIGNMENT, 2*bs->journal.block_size);
if (!submitted_buf)
throw std::bad_alloc();
}
@@ -209,7 +210,7 @@ int blockstore_init_journal::loop()
if (!sqe)
throw std::runtime_error("io_uring is full while trying to read journal");
data = ((ring_data_t*)sqe->user_data);
data->iov = { submitted_buf, JOURNAL_BLOCK_SIZE };
data->iov = { submitted_buf, bs->journal.block_size };
data->callback = simple_callback;
my_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
bs->ringloop->submit();
@@ -223,18 +224,18 @@ resume_1:
if (iszero((uint64_t*)submitted_buf, 3))
{
// Journal is empty
// FIXME handle this wrapping to JOURNAL_BLOCK_SIZE better (maybe)
bs->journal.used_start = JOURNAL_BLOCK_SIZE;
bs->journal.next_free = JOURNAL_BLOCK_SIZE;
// FIXME handle this wrapping to journal_block_size better (maybe)
bs->journal.used_start = bs->journal.block_size;
bs->journal.next_free = bs->journal.block_size;
// Initialize journal "superblock" and the first block
memset(submitted_buf, 0, 2*JOURNAL_BLOCK_SIZE);
memset(submitted_buf, 0, 2*bs->journal.block_size);
*((journal_entry_start*)submitted_buf) = {
.crc32 = 0,
.magic = JOURNAL_MAGIC,
.type = JE_START,
.size = sizeof(journal_entry_start),
.reserved = 0,
.journal_start = JOURNAL_BLOCK_SIZE,
.journal_start = bs->journal.block_size,
};
((journal_entry_start*)submitted_buf)->crc32 = je_crc32((journal_entry*)submitted_buf);
if (bs->readonly)
@@ -246,7 +247,7 @@ resume_1:
// Cool effect. Same operations result in journal replay.
// FIXME: Randomize initial crc32. Track crc32 when trimming.
GET_SQE();
data->iov = (struct iovec){ submitted_buf, 2*JOURNAL_BLOCK_SIZE };
data->iov = (struct iovec){ submitted_buf, 2*bs->journal.block_size };
data->callback = simple_callback;
my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
wait_count++;
@@ -326,7 +327,7 @@ resume_1:
if (init_write_buf && !bs->readonly)
{
GET_SQE();
data->iov = { init_write_buf, JOURNAL_BLOCK_SIZE };
data->iov = { init_write_buf, bs->journal.block_size };
data->callback = simple_callback;
wait_count++;
my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + init_write_sector);
@@ -393,8 +394,8 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
uint64_t proc_pos, pos;
if (continue_pos != 0)
{
proc_pos = (continue_pos / JOURNAL_BLOCK_SIZE) * JOURNAL_BLOCK_SIZE;
pos = continue_pos % JOURNAL_BLOCK_SIZE;
proc_pos = (continue_pos / bs->journal.block_size) * bs->journal.block_size;
pos = continue_pos % bs->journal.block_size;
continue_pos = 0;
goto resume;
}
@@ -402,13 +403,13 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
{
proc_pos = next_free;
pos = 0;
next_free += JOURNAL_BLOCK_SIZE;
next_free += bs->journal.block_size;
if (next_free >= bs->journal.len)
{
next_free = JOURNAL_BLOCK_SIZE;
next_free = bs->journal.block_size;
}
resume:
while (pos < JOURNAL_BLOCK_SIZE)
while (pos < bs->journal.block_size)
{
journal_entry *je = (journal_entry*)(buf + proc_pos - done_pos + pos);
if (je->magic != JOURNAL_MAGIC || je_crc32(je) != je->crc32 ||
@@ -436,13 +437,13 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
if (next_free + je->small_write.len > bs->journal.len)
{
// data continues from the beginning of the journal
next_free = JOURNAL_BLOCK_SIZE;
next_free = bs->journal.block_size;
}
uint64_t location = next_free;
next_free += je->small_write.len;
if (next_free >= bs->journal.len)
{
next_free = JOURNAL_BLOCK_SIZE;
next_free = bs->journal.block_size;
}
if (location != je->small_write.data_offset)
{
@@ -483,7 +484,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
{
// journal entry is corrupt, stop here
// interesting thing is that we must clear the corrupt entry if we're not readonly
memset(buf + proc_pos - done_pos + pos, 0, JOURNAL_BLOCK_SIZE - pos);
memset(buf + proc_pos - done_pos + pos, 0, bs->journal.block_size - pos);
bs->journal.next_free = prev_free;
init_write_buf = buf + proc_pos - done_pos;
init_write_sector = proc_pos;

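Throughout the replay code above, positions wrap to bs->journal.block_size rather than to 0, because block 0 holds the JE_START superblock. The wrap rule, restated as a tiny hypothetical helper:

#include <stdint.h>

static uint64_t advance_journal_pos(uint64_t pos, uint64_t step, uint64_t block_size, uint64_t journal_len)
{
    pos += step;
    if (pos >= journal_len)
        pos = block_size; // skip block 0, which holds the superblock
    return pos;
}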
blockstore_init.h

@@ -31,7 +31,7 @@ class blockstore_init_journal
uint64_t entries_loaded = 0;
uint32_t crc32_last = 0;
bool started = false;
uint64_t next_free = JOURNAL_BLOCK_SIZE;
uint64_t next_free;
std::vector<bs_init_journal_done> done;
uint64_t journal_pos = 0;
uint64_t continue_pos = 0;

blockstore_journal.cpp

@@ -15,7 +15,7 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int require
{
while (1)
{
int fits = (JOURNAL_BLOCK_SIZE - next_in_pos) / size;
int fits = (bs->journal.block_size - next_in_pos) / size;
if (fits > 0)
{
required -= fits;
@@ -31,10 +31,10 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int require
{
break;
}
next_pos = next_pos + JOURNAL_BLOCK_SIZE;
next_pos = next_pos + bs->journal.block_size;
if (next_pos >= bs->journal.len)
{
next_pos = JOURNAL_BLOCK_SIZE;
next_pos = bs->journal.block_size;
right_dir = false;
}
next_in_pos = 0;
@@ -60,11 +60,11 @@ int blockstore_journal_check_t::check_available(blockstore_op_t *op, int require
next_pos = next_pos + data_after;
if (next_pos > bs->journal.len)
{
next_pos = JOURNAL_BLOCK_SIZE + data_after;
next_pos = bs->journal.block_size + data_after;
right_dir = false;
}
}
if (!right_dir && next_pos >= bs->journal.used_start-JOURNAL_BLOCK_SIZE)
if (!right_dir && next_pos >= bs->journal.used_start-bs->journal.block_size)
{
// No space in the journal. Wait until used_start changes.
PRIV(op)->wait_for = WAIT_JOURNAL;
@@ -77,7 +77,7 @@
journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type, uint32_t size)
{
if (JOURNAL_BLOCK_SIZE - journal.in_sector_pos < size)
if (journal.block_size - journal.in_sector_pos < size)
{
assert(!journal.sector_info[journal.cur_sector].dirty);
// Move to the next journal sector
@@ -88,15 +88,15 @@ journal_entry* prefill_single_journal_entry(journal_t & journal, uint16_t type,
}
journal.sector_info[journal.cur_sector].offset = journal.next_free;
journal.in_sector_pos = 0;
journal.next_free = (journal.next_free+JOURNAL_BLOCK_SIZE) < journal.len ? journal.next_free + JOURNAL_BLOCK_SIZE : JOURNAL_BLOCK_SIZE;
journal.next_free = (journal.next_free+journal.block_size) < journal.len ? journal.next_free + journal.block_size : journal.block_size;
memset(journal.inmemory
? journal.buffer + journal.sector_info[journal.cur_sector].offset
: journal.sector_buf + JOURNAL_BLOCK_SIZE*journal.cur_sector, 0, JOURNAL_BLOCK_SIZE);
: journal.sector_buf + journal.block_size*journal.cur_sector, 0, journal.block_size);
}
journal_entry *je = (struct journal_entry*)(
(journal.inmemory
? journal.buffer + journal.sector_info[journal.cur_sector].offset
: journal.sector_buf + JOURNAL_BLOCK_SIZE*journal.cur_sector) + journal.in_sector_pos
: journal.sector_buf + journal.block_size*journal.cur_sector) + journal.in_sector_pos
);
journal.in_sector_pos += size;
je->magic = JOURNAL_MAGIC;
@@ -115,8 +115,8 @@ void prepare_journal_sector_write(journal_t & journal, int cur_sector, io_uring_
data->iov = (struct iovec){
(journal.inmemory
? journal.buffer + journal.sector_info[cur_sector].offset
: journal.sector_buf + JOURNAL_BLOCK_SIZE*cur_sector),
JOURNAL_BLOCK_SIZE
: journal.sector_buf + journal.block_size*cur_sector),
journal.block_size
};
data->callback = cb;
my_uring_prep_writev(

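check_available() packs fixed-size entries into whole journal blocks. A quick worked example of the fits computation, with an assumed 20-byte entry (real journal_entry sizes differ):

#include <stdint.h>
#include <stdio.h>

int main()
{
    uint64_t block_size = 512;  // journal_block_size default
    uint64_t next_in_pos = 40;  // bytes already used in the current block
    uint64_t size = 20;         // assumed entry size, illustration only
    // Mirrors `fits = (bs->journal.block_size - next_in_pos) / size`
    printf("%lu entries fit\n", (unsigned long)((block_size - next_in_pos) / size)); // 23
    return 0;
}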
blockstore_journal.h

@@ -122,9 +122,10 @@ struct journal_t
bool inmemory = false;
void *buffer = NULL;
uint64_t block_size = 512;
uint64_t offset, len;
uint64_t next_free = JOURNAL_BLOCK_SIZE;
uint64_t used_start = JOURNAL_BLOCK_SIZE;
uint64_t next_free = 0;
uint64_t used_start = 0;
uint32_t crc32_last = 0;
// Current sector(s) used for writing
@@ -132,7 +133,7 @@ struct journal_t
journal_sector_info_t *sector_info = NULL;
uint64_t sector_count;
int cur_sector = 0;
int in_sector_pos = JOURNAL_BLOCK_SIZE; // no free space because sector is initially unmapped
int in_sector_pos = 0;
// Used sector map
// May use ~ 80 MB per 1 GB of used journal space in the worst case

blockstore_open.cpp

@@ -1,7 +1,23 @@
#include "blockstore_impl.h"
void blockstore_impl_t::calc_lengths(blockstore_config_t & config)
static uint32_t is_power_of_two(uint64_t value)
{
uint32_t l = 0;
while (value > 1)
{
if (value & 1)
{
return 64;
}
value = value >> 1;
l++;
}
return l;
}
void blockstore_impl_t::parse_config(blockstore_config_t & config)
{
// Parse
if (config["readonly"] == "true" || config["readonly"] == "1" || config["readonly"] == "yes")
{
readonly = true;
@@ -10,6 +26,112 @@ void blockstore_impl_t::calc_lengths(blockstore_config_t & config)
{
disable_fsync = true;
}
metadata_buf_size = strtoull(config["meta_buf_size"].c_str(), NULL, 10);
cfg_journal_size = strtoull(config["journal_size"].c_str(), NULL, 10);
data_device = config["data_device"];
data_offset = strtoull(config["data_offset"].c_str(), NULL, 10);
meta_device = config["meta_device"];
meta_offset = strtoull(config["meta_offset"].c_str(), NULL, 10);
block_size = strtoull(config["block_size"].c_str(), NULL, 10);
inmemory_meta = config["inmemory_metadata"] != "false";
journal_device = config["journal_device"];
journal.offset = strtoull(config["journal_offset"].c_str(), NULL, 10);
journal.sector_count = strtoull(config["journal_sector_buffer_count"].c_str(), NULL, 10);
journal.inmemory = config["inmemory_journal"] != "false";
disk_alignment = strtoull(config["disk_alignment"].c_str(), NULL, 10);
journal_block_size = strtoull(config["journal_block_size"].c_str(), NULL, 10);
meta_block_size = strtoull(config["meta_block_size"].c_str(), NULL, 10);
bitmap_granularity = strtoull(config["bitmap_granularity"].c_str(), NULL, 10);
flusher_count = strtoull(config["flusher_count"].c_str(), NULL, 10);
// Validate
if (!block_size)
{
block_size = (1 << DEFAULT_ORDER);
}
if ((block_order = is_power_of_two(block_size)) >= 64 || block_size < MIN_BLOCK_SIZE || block_size >= MAX_BLOCK_SIZE)
{
throw std::runtime_error("Bad block size");
}
if (!flusher_count)
{
flusher_count = 32;
}
if (!disk_alignment)
{
disk_alignment = 512;
}
else if (disk_alignment % MEM_ALIGNMENT)
{
throw std::runtime_error("disk_alingment must be a multiple of "+std::to_string(MEM_ALIGNMENT));
}
if (!journal_block_size)
{
journal_block_size = 512;
}
else if (journal_block_size % MEM_ALIGNMENT)
{
throw std::runtime_error("journal_block_size must be a multiple of "+std::to_string(MEM_ALIGNMENT));
}
if (!meta_block_size)
{
meta_block_size = 512;
}
else if (meta_block_size % MEM_ALIGNMENT)
{
throw std::runtime_error("meta_block_size must be a multiple of "+std::to_string(MEM_ALIGNMENT));
}
if (data_offset % disk_alignment)
{
throw std::runtime_error("data_offset must be a multiple of disk_alignment = "+std::to_string(disk_alignment));
}
if (!bitmap_granularity)
{
bitmap_granularity = 4096;
}
else if (bitmap_granularity % disk_alignment)
{
throw std::runtime_error("Sparse write tracking granularity must be a multiple of disk_alignment = "+std::to_string(disk_alignment));
}
if (block_size % bitmap_granularity)
{
throw std::runtime_error("Block size must be a multiple of sparse write tracking granularity");
}
if (journal_device == meta_device || meta_device == "" && journal_device == data_device)
{
journal_device = "";
}
if (meta_device == data_device)
{
meta_device = "";
}
if (meta_offset % meta_block_size)
{
throw std::runtime_error("meta_offset must be a multiple of meta_block_size = "+std::to_string(meta_block_size));
}
if (journal.offset % journal_block_size)
{
throw std::runtime_error("journal_offset must be a multiple of journal_block_size = "+std::to_string(journal_block_size));
}
if (journal.sector_count < 2)
{
journal.sector_count = 32;
}
if (metadata_buf_size < 65536)
{
metadata_buf_size = 4*1024*1024;
}
// init some fields
clean_entry_bitmap_size = block_size / bitmap_granularity / 8;
clean_entry_size = sizeof(clean_disk_entry) + clean_entry_bitmap_size;
journal.block_size = journal_block_size;
journal.next_free = journal_block_size;
journal.used_start = journal_block_size;
// no free space because sector is initially unmapped
journal.in_sector_pos = journal_block_size;
}
void blockstore_impl_t::calc_lengths()
{
// data
data_len = data_size - data_offset;
if (data_fd == meta_fd && data_offset < meta_offset)
@@ -44,28 +166,12 @@ void blockstore_impl_t::calc_lengths(blockstore_config_t & config)
? journal.len : meta_offset-journal.offset;
}
// required metadata size
if (BITMAP_GRANULARITY % DISK_ALIGNMENT)
{
throw std::runtime_error("Sparse write tracking granularity must be a multiple of write alignment");
}
if (block_size % BITMAP_GRANULARITY)
{
throw std::runtime_error("Block size must be a multiple of sparse write tracking granularity");
}
clean_entry_bitmap_size = block_size / BITMAP_GRANULARITY / 8;
clean_entry_size = sizeof(clean_disk_entry) + clean_entry_bitmap_size;
block_count = data_len / block_size;
meta_len = ((block_count - 1 + META_BLOCK_SIZE / clean_entry_size) / (META_BLOCK_SIZE / clean_entry_size)) * META_BLOCK_SIZE;
meta_len = ((block_count - 1 + meta_block_size / clean_entry_size) / (meta_block_size / clean_entry_size)) * meta_block_size;
if (meta_area < meta_len)
{
throw std::runtime_error("Metadata area is too small, need at least "+std::to_string(meta_len)+" bytes");
}
metadata_buf_size = strtoull(config["meta_buf_size"].c_str(), NULL, 10);
if (metadata_buf_size < 65536)
{
metadata_buf_size = 4*1024*1024;
}
inmemory_meta = config["inmemory_metadata"] != "false";
if (inmemory_meta)
{
metadata_buffer = memalign(MEM_ALIGNMENT, meta_len);
@@ -79,14 +185,13 @@ void blockstore_impl_t::calc_lengths(blockstore_config_t & config)
throw std::runtime_error("Failed to allocate memory for the metadata sparse write bitmap");
}
// requested journal size
uint64_t journal_wanted = strtoull(config["journal_size"].c_str(), NULL, 10);
if (journal_wanted > journal.len)
if (cfg_journal_size > journal.len)
{
throw std::runtime_error("Requested journal_size is too large");
}
else if (journal_wanted > 0)
else if (cfg_journal_size > 0)
{
journal.len = journal_wanted;
journal.len = cfg_journal_size;
}
if (journal.len < MIN_JOURNAL_SIZE)
{
@@ -127,14 +232,9 @@ void check_size(int fd, uint64_t *size, std::string name)
}
}
void blockstore_impl_t::open_data(blockstore_config_t & config)
void blockstore_impl_t::open_data()
{
data_offset = strtoull(config["data_offset"].c_str(), NULL, 10);
if (data_offset % DISK_ALIGNMENT)
{
throw std::runtime_error("data_offset not aligned");
}
data_fd = open(config["data_device"].c_str(), O_DIRECT|O_RDWR);
data_fd = open(data_device.c_str(), O_DIRECT|O_RDWR);
if (data_fd == -1)
{
throw std::runtime_error("Failed to open data device");
@@ -142,21 +242,16 @@ void blockstore_impl_t::open_data(blockstore_config_t & config)
check_size(data_fd, &data_size, "data device");
if (data_offset >= data_size)
{
throw std::runtime_error("data_offset exceeds device size");
throw std::runtime_error("data_offset exceeds device size = "+std::to_string(data_size));
}
}
void blockstore_impl_t::open_meta(blockstore_config_t & config)
void blockstore_impl_t::open_meta()
{
meta_offset = strtoull(config["meta_offset"].c_str(), NULL, 10);
if (meta_offset % DISK_ALIGNMENT)
{
throw std::runtime_error("meta_offset not aligned");
}
if (config["meta_device"] != "" && config["meta_device"] != config["data_device"])
if (meta_device != "")
{
meta_offset = 0;
meta_fd = open(config["meta_device"].c_str(), O_DIRECT|O_RDWR);
meta_fd = open(meta_device.c_str(), O_DIRECT|O_RDWR);
if (meta_fd == -1)
{
throw std::runtime_error("Failed to open metadata device");
@@ -164,7 +259,7 @@ void blockstore_impl_t::open_meta(blockstore_config_t & config)
check_size(meta_fd, &meta_size, "metadata device");
if (meta_offset >= meta_size)
{
throw std::runtime_error("meta_offset exceeds device size");
throw std::runtime_error("meta_offset exceeds device size = "+std::to_string(meta_size));
}
}
else
@@ -173,21 +268,16 @@ void blockstore_impl_t::open_meta(blockstore_config_t & config)
meta_size = 0;
if (meta_offset >= data_size)
{
throw std::runtime_error("meta_offset exceeds device size");
throw std::runtime_error("meta_offset exceeds device size = "+std::to_string(data_size));
}
}
}
void blockstore_impl_t::open_journal(blockstore_config_t & config)
void blockstore_impl_t::open_journal()
{
journal.offset = strtoull(config["journal_offset"].c_str(), NULL, 10);
if (journal.offset % DISK_ALIGNMENT)
if (journal_device != "")
{
throw std::runtime_error("journal_offset not aligned");
}
if (config["journal_device"] != "" && config["journal_device"] != config["meta_device"])
{
journal.fd = open(config["journal_device"].c_str(), O_DIRECT|O_RDWR);
journal.fd = open(journal_device.c_str(), O_DIRECT|O_RDWR);
if (journal.fd == -1)
{
throw std::runtime_error("Failed to open journal device");
@@ -203,25 +293,15 @@ void blockstore_impl_t::open_journal(blockstore_config_t & config)
throw std::runtime_error("journal_offset exceeds device size");
}
}
journal.sector_count = strtoull(config["journal_sector_buffer_count"].c_str(), NULL, 10);
if (!journal.sector_count)
{
journal.sector_count = 32;
}
journal.sector_info = (journal_sector_info_t*)calloc(journal.sector_count, sizeof(journal_sector_info_t));
if (!journal.sector_info)
{
throw std::bad_alloc();
}
if (config["inmemory_journal"] == "false")
if (!journal.inmemory)
{
journal.inmemory = false;
journal.sector_buf = (uint8_t*)memalign(MEM_ALIGNMENT, journal.sector_count * JOURNAL_BLOCK_SIZE);
journal.sector_buf = (uint8_t*)memalign(MEM_ALIGNMENT, journal.sector_count * journal_block_size);
if (!journal.sector_buf)
throw std::bad_alloc();
}
else
{
journal.inmemory = true;
}
}

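Note that is_power_of_two() above doubles as a log2: it returns the exponent for powers of two and 64 otherwise, which is what lets the one-line block-size check assign block_order and validate at the same time. A quick standalone check (hypothetical values):

#include <stdint.h>
#include <assert.h>

static uint32_t is_power_of_two(uint64_t value)
{
    uint32_t l = 0;
    while (value > 1)
    {
        if (value & 1)
            return 64; // not a power of two
        value = value >> 1;
        l++;
    }
    return l;
}

int main()
{
    assert(is_power_of_two(131072) == 17); // 128 KiB -> order 17
    assert(is_power_of_two(4096) == 12);
    assert(is_power_of_two(12345) == 64);
    return 0;
}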
blockstore_read.cpp

@@ -144,15 +144,15 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
uint8_t *clean_entry_bitmap;
if (inmemory_meta)
{
uint64_t sector = (meta_loc / (META_BLOCK_SIZE / clean_entry_size)) * META_BLOCK_SIZE;
uint64_t pos = (meta_loc % (META_BLOCK_SIZE / clean_entry_size));
uint64_t sector = (meta_loc / (meta_block_size / clean_entry_size)) * meta_block_size;
uint64_t pos = (meta_loc % (meta_block_size / clean_entry_size));
clean_entry_bitmap = (uint8_t*)(metadata_buffer + sector + pos*clean_entry_size + sizeof(clean_disk_entry));
}
else
{
clean_entry_bitmap = (uint8_t*)(clean_bitmap + meta_loc*clean_entry_bitmap_size);
}
uint64_t bmp_start = 0, bmp_end = 0, bmp_size = block_size/BITMAP_GRANULARITY;
uint64_t bmp_start = 0, bmp_end = 0, bmp_size = block_size/bitmap_granularity;
while (bmp_start < bmp_size)
{
while (!(clean_entry_bitmap[bmp_end >> 3] & (1 << (bmp_end & 0x7))) && bmp_end < bmp_size)
@@ -162,8 +162,8 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
if (bmp_end > bmp_start)
{
// fill with zeroes
fulfill_read(read_op, fulfilled, bmp_start * BITMAP_GRANULARITY,
bmp_end * BITMAP_GRANULARITY, ST_DEL_STABLE, 0, 0);
fulfill_read(read_op, fulfilled, bmp_start * bitmap_granularity,
bmp_end * bitmap_granularity, ST_DEL_STABLE, 0, 0);
}
bmp_start = bmp_end;
while (clean_entry_bitmap[bmp_end >> 3] & (1 << (bmp_end & 0x7)) && bmp_end < bmp_size)
@@ -172,8 +172,8 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
}
if (bmp_end > bmp_start)
{
if (!fulfill_read(read_op, fulfilled, bmp_start * BITMAP_GRANULARITY,
bmp_end * BITMAP_GRANULARITY, ST_CURRENT, 0, clean_it->second.location + bmp_start * BITMAP_GRANULARITY))
if (!fulfill_read(read_op, fulfilled, bmp_start * bitmap_granularity,
bmp_end * bitmap_granularity, ST_CURRENT, 0, clean_it->second.location + bmp_start * bitmap_granularity))
{
// need to wait. undo added requests, don't dequeue op
PRIV(read_op)->read_vec.clear();

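dequeue_read() walks the per-object bitmap in runs: clear bits are served as zeroes, set bits as reads from the clean data location, with byte extents obtained by multiplying bit indexes by bitmap_granularity. A minimal sketch of the run-splitting loop (fulfill is a stand-in for fulfill_read(); bounds are checked before the bit test):

#include <stdint.h>
#include <functional>

static void scan_bitmap(const uint8_t *bitmap, uint64_t bmp_size,
    std::function<void(uint64_t, uint64_t, bool)> fulfill)
{
    uint64_t bmp_start = 0, bmp_end = 0;
    while (bmp_start < bmp_size)
    {
        while (bmp_end < bmp_size && !(bitmap[bmp_end >> 3] & (1 << (bmp_end & 7))))
            bmp_end++;
        if (bmp_end > bmp_start)
            fulfill(bmp_start, bmp_end, false); // unwritten range -> zero-fill
        bmp_start = bmp_end;
        while (bmp_end < bmp_size && (bitmap[bmp_end >> 3] & (1 << (bmp_end & 7))))
            bmp_end++;
        if (bmp_end > bmp_start)
            fulfill(bmp_start, bmp_end, true); // written range -> read data
        bmp_start = bmp_end;
    }
}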
blockstore_stable.cpp

@@ -94,7 +94,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
// Prepare and submit journal entries
auto cb = [this, op](ring_data_t *data) { handle_stable_event(data, op); };
int s = 0, cur_sector = -1;
if ((JOURNAL_BLOCK_SIZE - journal.in_sector_pos) < sizeof(journal_entry_stable) &&
if ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_stable) &&
journal.sector_info[journal.cur_sector].dirty)
{
if (cur_sector == -1)

blockstore_sync.cpp

@@ -112,7 +112,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
// Prepare and submit journal entries
auto it = PRIV(op)->sync_big_writes.begin();
int s = 0, cur_sector = -1;
if ((JOURNAL_BLOCK_SIZE - journal.in_sector_pos) < sizeof(journal_entry_big_write) &&
if ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_big_write) &&
journal.sector_info[journal.cur_sector].dirty)
{
if (cur_sector == -1)

blockstore_write.cpp

@@ -106,9 +106,9 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
printf("Allocate block %lu\n", loc);
#endif
data_alloc->set(loc, true);
uint64_t stripe_offset = (op->offset % BITMAP_GRANULARITY);
uint64_t stripe_end = (op->offset + op->len) % BITMAP_GRANULARITY;
// Zero fill up to BITMAP_GRANULARITY
uint64_t stripe_offset = (op->offset % bitmap_granularity);
uint64_t stripe_end = (op->offset + op->len) % bitmap_granularity;
// Zero fill up to bitmap_granularity
int vcnt = 0;
if (stripe_offset)
{
@@ -117,7 +117,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
PRIV(op)->iov_zerofill[vcnt++] = (struct iovec){ op->buf, op->len };
if (stripe_end)
{
stripe_end = BITMAP_GRANULARITY - stripe_end;
stripe_end = bitmap_granularity - stripe_end;
PRIV(op)->iov_zerofill[vcnt++] = (struct iovec){ zero_object, stripe_end };
}
data->iov.iov_len = op->len + stripe_offset + stripe_end; // to check it in the callback
@@ -145,7 +145,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
}
// There is sufficient space. Get SQE(s)
struct io_uring_sqe *sqe1 = NULL;
if ((JOURNAL_BLOCK_SIZE - journal.in_sector_pos) < sizeof(journal_entry_small_write) &&
if ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_small_write) &&
journal.sector_info[journal.cur_sector].dirty)
{
// Write current journal sector only if it's dirty and full
@@ -178,7 +178,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
printf("journal offset %lu is used by %lu:%lu v%lu\n", dirty_it->second.journal_sector, dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version);
#endif
// Figure out where data will be
journal.next_free = (journal.next_free + op->len) <= journal.len ? journal.next_free : JOURNAL_BLOCK_SIZE;
journal.next_free = (journal.next_free + op->len) <= journal.len ? journal.next_free : journal_block_size;
je->oid = op->oid;
je->version = op->version;
je->offset = op->offset;
@@ -212,7 +212,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
journal.next_free += op->len;
if (journal.next_free >= journal.len)
{
journal.next_free = JOURNAL_BLOCK_SIZE;
journal.next_free = journal_block_size;
}
// Remember small write as unsynced
unsynced_small_writes.push_back((obj_ver_id){

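The small-write path pads the payload with zeroes on both sides so that the on-disk extent covers whole bitmap_granularity units, keeping the allocation bitmap exact. A worked example of the padding math (illustrative numbers):

#include <stdint.h>
#include <stdio.h>

int main()
{
    uint64_t bitmap_granularity = 4096;
    uint64_t offset = 5000, len = 3000;                        // unaligned write
    uint64_t stripe_offset = offset % bitmap_granularity;      // 904 zero bytes before
    uint64_t stripe_end = (offset + len) % bitmap_granularity; // 3904
    if (stripe_end)
        stripe_end = bitmap_granularity - stripe_end;          // 192 zero bytes after
    // 904 + 3000 + 192 = 4096: exactly one granularity unit is written
    printf("%lu\n", (unsigned long)(stripe_offset + len + stripe_end));
    return 0;
}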
fio_engine.cpp

@@ -9,17 +9,17 @@
// Random write:
//
// fio -thread -ioengine=./libfio_blockstore.so -name=test -bs=4k -direct=1 -fsync=16 -iodepth=16 -rw=randwrite \
// -data_device=./test_data.bin -meta_device=./test_meta.bin -journal_device=./test_journal.bin -size=1000M
// -bs_config='{"data_device":"./test_data.bin"}' -size=1000M
//
// Linear write:
//
// fio -thread -ioengine=./libfio_blockstore.so -name=test -bs=128k -direct=1 -fsync=32 -iodepth=32 -rw=write \
// -data_device=./test_data.bin -meta_device=./test_meta.bin -journal_device=./test_journal.bin -size=1000M
// -bs_config='{"data_device":"./test_data.bin"}' -size=1000M
//
// Random read (run with -iodepth=32 or -iodepth=1):
//
// fio -thread -ioengine=./libfio_blockstore.so -name=test -bs=4k -direct=1 -iodepth=32 -rw=randread \
// -data_device=./test_data.bin -meta_device=./test_meta.bin -journal_device=./test_journal.bin -size=1000M
// -bs_config='{"data_device":"./test_data.bin"}' -size=1000M
#include "blockstore.h"
extern "C" {
@@ -28,6 +28,8 @@ extern "C" {
#include "fio/optgroup.h"
}
#include "json11/json11.hpp"
struct bs_data
{
blockstore_t *bs;
@@ -40,80 +42,16 @@ struct bs_data
struct bs_options
{
int __pad;
char *data_device = NULL, *meta_device = NULL, *journal_device = NULL, *disable_fsync = NULL, *block_size_order = NULL;
char *data_offset = NULL, *meta_offset = NULL, *journal_offset = NULL;
char *json_config = NULL;
};
static struct fio_option options[] = {
{
.name = "data_device",
.lname = "Data device",
.name = "bs_config",
.lname = "JSON config for Blockstore",
.type = FIO_OPT_STR_STORE,
.off1 = offsetof(struct bs_options, data_device),
.help = "Name of the data device/file",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = "meta_device",
.lname = "Metadata device",
.type = FIO_OPT_STR_STORE,
.off1 = offsetof(struct bs_options, meta_device),
.help = "Name of the metadata device/file",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = "journal_device",
.lname = "Journal device",
.type = FIO_OPT_STR_STORE,
.off1 = offsetof(struct bs_options, journal_device),
.help = "Name of the journal device/file",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = "data_offset",
.lname = "Data offset",
.type = FIO_OPT_STR_STORE,
.off1 = offsetof(struct bs_options, data_offset),
.help = "Data offset",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = "meta_offset",
.lname = "Metadata offset",
.type = FIO_OPT_STR_STORE,
.off1 = offsetof(struct bs_options, meta_offset),
.help = "Metadata offset",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = "journal_offset",
.lname = "Journal offset",
.type = FIO_OPT_STR_STORE,
.off1 = offsetof(struct bs_options, journal_offset),
.help = "Journal offset",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = "disable_fsync",
.lname = "Disable fsync",
.type = FIO_OPT_STR_STORE,
.off1 = offsetof(struct bs_options, disable_fsync),
.help = "Disable fsyncs for blockstore (unsafe if your disk has cache)",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = "block_size_order",
.lname = "Power of 2 for blockstore block size",
.type = FIO_OPT_STR_STORE,
.off1 = offsetof(struct bs_options, block_size_order),
.help = "Set blockstore block size to 2^this value (from 12 to 27)",
.off1 = offsetof(struct bs_options, json_config),
.help = "JSON config for Blockstore",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
@@ -178,21 +116,18 @@ static int bs_init(struct thread_data *td)
bs_data *bsd = (bs_data*)td->io_ops_data;
blockstore_config_t config;
config["journal_device"] = o->journal_device;
config["meta_device"] = o->meta_device;
config["data_device"] = o->data_device;
if (o->block_size_order)
config["block_size_order"] = o->block_size_order;
if (o->disable_fsync)
config["disable_fsync"] = o->disable_fsync;
if (o->data_offset)
config["data_offset"] = o->data_offset;
if (o->meta_offset)
config["meta_offset"] = o->meta_offset;
if (o->journal_offset)
config["journal_offset"] = o->journal_offset;
if (read_only)
config["readonly"] = "true";
if (o->json_config)
{
std::string json_err;
auto json_cfg = json11::Json::parse(o->json_config, json_err);
for (auto p: json_cfg.object_items())
{
if (p.second.is_string())
config[p.first] = p.second.string_value();
else
config[p.first] = p.second.dump();
}
}
bsd->ringloop = new ring_loop_t(512);
bsd->bs = new blockstore_t(config, bsd->ringloop);
while (1)
@@ -230,7 +165,7 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
op->buf = io->xfer_buf;
op->oid = {
.inode = 1,
.stripe = io->offset >> bsd->bs->get_block_order(),
.stripe = io->offset / bsd->bs->get_block_size(),
};
op->version = UINT64_MAX; // last unstable
op->offset = io->offset % bsd->bs->get_block_size();
@@ -252,7 +187,7 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
op->buf = io->xfer_buf;
op->oid = {
.inode = 1,
.stripe = io->offset >> bsd->bs->get_block_order(),
.stripe = io->offset / bsd->bs->get_block_size(),
};
op->version = 0; // assign automatically
op->offset = io->offset % bsd->bs->get_block_size();
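
With get_block_order() removed from the public API, the engine derives the stripe by division; for power-of-two block sizes this matches the old shift exactly. A one-line sanity check (assumed 128 KiB block):

#include <stdint.h>
#include <assert.h>

int main()
{
    uint64_t block_size = 131072; // 2^17, assumed default
    uint64_t offset = 5 * block_size + 123;
    assert(offset / block_size == offset >> 17); // same stripe either way
    return 0;
}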