Move blockstore journal fields to journal_t, implement multiple write buffers for journal sectors

blocking-uring-test
Vitaliy Filippov 2019-11-07 16:58:30 +03:00
parent 84c62840bd
commit 5330461029
7 changed files with 167 additions and 84 deletions
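
In short: all journal state that used to live as loose fields on blockstore (journal_fd, journal_offset, journal_size, journal_len, journal_start, journal_end, journal_crc32_last) moves into a single journal_t, and the single in-memory journal sector buffer is replaced by a ring of sector_count 512-byte buffers, each tracking how many submitted writes still reference it. A standalone sketch of that buffer ring, using the journal_t field names introduced below (simplified, not code from the diff):

    // Sketch: ring of journal sector buffers with per-sector usage counts.
    #include <stdint.h>
    #include <string.h>

    struct sector_ring
    {
        uint8_t *bufs;          // sector_count contiguous 512-byte sector buffers
        uint64_t *usage_count;  // writes still in flight against each buffer
        uint64_t sector_count;
        uint64_t cur_sector = 0;

        // Advance to the next buffer, or return NULL if it still has writes
        // in flight (the caller would then suspend with WAIT_JOURNAL_BUFFER).
        uint8_t* next_sector()
        {
            uint64_t next = (cur_sector + 1) % sector_count;
            if (usage_count[next] > 0)
                return NULL;
            cur_sector = next;
            memset(bufs + 512*cur_sector, 0, 512);
            return bufs + 512*cur_sector;
        }
    };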

View File

@@ -13,7 +13,7 @@ blockstore::blockstore(spp::sparse_hash_map<std::string, std::string> & config,
     {
         throw new std::runtime_error("Bad block size");
     }
-    data_fd = meta_fd = journal_fd = -1;
+    data_fd = meta_fd = journal.fd = -1;
     try
     {
         open_data(config);
@@ -30,8 +30,8 @@ blockstore::blockstore(spp::sparse_hash_map<std::string, std::string> & config,
         close(data_fd);
         if (meta_fd >= 0 && meta_fd != data_fd)
             close(meta_fd);
-        if (journal_fd >= 0 && journal_fd != meta_fd)
-            close(journal_fd);
+        if (journal.fd >= 0 && journal.fd != meta_fd)
+            close(journal.fd);
         throw e;
     }
 }
@@ -43,8 +43,10 @@ blockstore::~blockstore()
     close(data_fd);
     if (meta_fd >= 0 && meta_fd != data_fd)
         close(meta_fd);
-    if (journal_fd >= 0 && journal_fd != meta_fd)
-        close(journal_fd);
+    if (journal.fd >= 0 && journal.fd != meta_fd)
+        close(journal.fd);
+    free(journal.sector_buf);
+    free(journal.sector_info);
 }
 
 // main event loop - handle requests
@@ -81,6 +83,32 @@ void blockstore::handle_event(ring_data_t *data)
             in_process_ops.erase(op);
         }
     }
+    else if ((op->flags & OP_TYPE_MASK) == OP_WRITE ||
+        (op->flags & OP_TYPE_MASK) == OP_DELETE)
+    {
+        op->pending_ops--;
+        if (data->res < 0)
+        {
+            // write error
+            // FIXME: our state becomes corrupted after a write error. maybe do something better than just die
+            op->retval = data->res;
+            throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111");
+        }
+        if (op->used_journal_sector > 0)
+        {
+            uint64_t s = op->used_journal_sector-1;
+            if (journal.sector_info[s].usage_count > 0)
+            {
+                // The last write to this journal sector was made by this op, release the buffer
+                journal.sector_info[s].usage_count--;
+            }
+            op->used_journal_sector = 0;
+        }
+        if (op->pending_ops == 0)
+        {
+        }
+    }
 }
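
used_journal_sector is 1-based on purpose: 0 means "this op holds no journal sector buffer", so the submit side stores 1 + journal.cur_sector and the completion path above subtracts 1 before dropping the reference. The acquire/release pair, condensed into two helpers (a sketch mirroring dequeue_write below and this handler):

    void acquire_sector(journal_t & journal, blockstore_operation *op)
    {
        journal.sector_info[journal.cur_sector].usage_count++;
        op->used_journal_sector = 1 + journal.cur_sector; // 0 is reserved for "none"
    }

    void release_sector(journal_t & journal, blockstore_operation *op)
    {
        if (op->used_journal_sector > 0)
        {
            uint64_t s = op->used_journal_sector - 1;
            if (journal.sector_info[s].usage_count > 0)
                journal.sector_info[s].usage_count--; // buffer reusable once this hits 0
            op->used_journal_sector = 0;
        }
    }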
@@ -180,14 +208,14 @@ int blockstore::enqueue_op(blockstore_operation *op)
     auto dirty_it = dirty_queue.find(op->oid);
     if (dirty_it != dirty_queue.end())
     {
-        op->version = (*dirty_it).back().version + 1;
+        op->version = dirty_it->second.back().version + 1;
     }
     else
     {
         auto clean_it = object_db.find(op->oid);
         if (clean_it != object_db.end())
         {
-            op->version = (*clean_it).version + 1;
+            op->version = clean_it->second.version + 1;
         }
         else
         {
@@ -196,7 +224,7 @@ int blockstore::enqueue_op(blockstore_operation *op)
         dirty_it = dirty_queue.emplace(op->oid, dirty_list()).first;
     }
     // Immediately add the operation into the dirty queue, so subsequent reads could see it
-    (*dirty_it).push_back((dirty_entry){
+    dirty_it->second.push_back((dirty_entry){
         .version = op->version,
         .state = ST_IN_FLIGHT,
         .flags = 0,
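
The `(*dirty_it).back()` form this hunk replaces would not compile if dirty_queue is an associative container mapping object ids to dirty_lists: dereferencing a map-style iterator yields a key/value pair, so the list has to be reached through ->second. For instance (the container type is illustrative):

    // std::map<object_id_t, dirty_list> dirty_queue;    (illustrative type)
    auto dirty_it = dirty_queue.find(op->oid);
    // (*dirty_it).back();   // ill-formed: *dirty_it is a std::pair, which has no back()
    dirty_it->second.back();  // correct: back() on the mapped dirty_list itself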

View File

@@ -139,8 +139,14 @@ public:
 #define OP_DELETE 6
 #define OP_TYPE_MASK 0x7
 
 // Suspend operation until there are more free SQEs
 #define WAIT_SQE 1
+// Suspend operation until version <wait_detail> of object <oid> is written
+#define WAIT_IN_FLIGHT 2
+// Suspend operation until there are <wait_detail> bytes of free space in the journal on disk
+#define WAIT_JOURNAL 3
+// Suspend operation until the next journal sector buffer is free
+#define WAIT_JOURNAL_BUFFER 4
 
 struct blockstore_operation
 {
@@ -158,6 +164,7 @@ struct blockstore_operation
     int pending_ops;
     int wait_for;
     uint64_t wait_detail;
+    uint64_t used_journal_sector;
 };
 
 class blockstore;
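
Only the suspend side is in this diff: dequeue_write sets wait_for and returns without submitting. A hypothetical sketch of the matching resume check (can_resume and journal_free_space are illustrative names, not from this codebase, and journal trimming, which is what frees WAIT_JOURNAL space, is not shown in this commit either):

    // Illustrative only: free space from the write head to the used tail,
    // assuming a ring based at offset 512.
    static uint64_t journal_free_space(journal_t & journal)
    {
        return journal.next_free <= journal.used_start
            ? journal.used_start - journal.next_free
            : journal.len - journal.next_free + journal.used_start - 512;
    }

    // Hypothetical: re-check a suspended op's wait condition.
    bool can_resume(blockstore & bs, blockstore_operation *op)
    {
        if (op->wait_for == WAIT_JOURNAL_BUFFER)
            // the next in-memory sector buffer has no writes in flight anymore
            return bs.journal.sector_info[(bs.journal.cur_sector + 1) % bs.journal.sector_count].usage_count == 0;
        if (op->wait_for == WAIT_JOURNAL)
            // enough journal space was freed on disk
            return journal_free_space(bs.journal) >= op->wait_detail;
        return true;
    }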
@@ -176,16 +183,13 @@ public:
     uint64_t block_count;
     allocator *data_alloc;
 
-    int journal_fd;
     int meta_fd;
     int data_fd;
 
-    uint64_t journal_offset, journal_size, journal_len;
     uint64_t meta_offset, meta_size, meta_area, meta_len;
     uint64_t data_offset, data_size, data_len;
-    uint64_t journal_start, journal_end;
-    uint32_t journal_crc32_last;
+
+    struct journal_t journal;
 
     ring_loop_t *ringloop;

View File

@@ -112,8 +112,9 @@ void blockstore_init_journal::handle_event(ring_data_t *data)
         if (iszero((uint64_t*)journal_buffer, 3))
         {
             // Journal is empty
-            bs->journal_start = 512;
-            bs->journal_end = 512;
+            // FIXME handle this wrapping to 512 better
+            bs->journal.used_start = 512;
+            bs->journal.next_free = 512;
             step = 99;
         }
         else
@@ -128,7 +129,7 @@ void blockstore_init_journal::handle_event(ring_data_t *data)
                 // Entry is corrupt
                 throw new std::runtime_error("first entry of the journal is corrupt");
             }
-            journal_pos = bs->journal_start = je->journal_start;
+            journal_pos = bs->journal.used_start = je->journal_start;
             crc32_last = je->crc32_replaced;
             step = 2;
         }
@@ -147,7 +148,7 @@ void blockstore_init_journal::handle_event(ring_data_t *data)
             done_buf = submitted;
             done_len = data->res;
             journal_pos += data->res;
-            if (journal_pos >= bs->journal_len)
+            if (journal_pos >= bs->journal.len)
             {
                 // Continue from the beginning
                 journal_pos = 512;
@@ -177,7 +178,7 @@ int blockstore_init_journal::loop()
         }
         struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
         data->iov = { journal_buffer, 512 };
-        io_uring_prep_readv(sqe, bs->journal_fd, &data->iov, 1, bs->journal_offset);
+        io_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
         bs->ringloop->submit();
         step = 1;
     }
@@ -188,7 +189,7 @@ int blockstore_init_journal::loop()
     {
         if (step != 3)
         {
-            if (journal_pos == bs->journal_start && wrapped)
+            if (journal_pos == bs->journal.used_start && wrapped)
             {
                 step = 3;
             }
@@ -200,16 +201,16 @@ int blockstore_init_journal::loop()
                 throw new std::runtime_error("io_uring is full while trying to read journal");
             }
             struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
-            uint64_t end = bs->journal_len;
-            if (journal_pos < bs->journal_start)
+            uint64_t end = bs->journal.len;
+            if (journal_pos < bs->journal.used_start)
             {
-                end = bs->journal_start;
+                end = bs->journal.used_start;
             }
             data->iov = {
                 journal_buffer + (done_buf == 1 ? JOURNAL_BUFFER_SIZE : 0),
                 end - journal_pos < JOURNAL_BUFFER_SIZE ? end - journal_pos : JOURNAL_BUFFER_SIZE,
             };
-            io_uring_prep_readv(sqe, bs->journal_fd, &data->iov, 1, bs->journal_offset + journal_pos);
+            io_uring_prep_readv(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + journal_pos);
             bs->ringloop->submit();
             submitted = done_buf == 1 ? 2 : 1;
         }
@@ -233,7 +234,7 @@ int blockstore_init_journal::loop()
     if (step == 99)
     {
         free(journal_buffer);
-        bs->journal_crc32_last = crc32_last;
+        bs->journal.crc32_last = crc32_last;
         journal_buffer = NULL;
         step = 100;
     }
@@ -261,7 +262,8 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
         if (pos == 0)
         {
             // invalid entry in the beginning, this is definitely the end of the journal
-            bs->journal_end = done_pos + total_pos + pos;
+            // FIXME handle the edge case when the journal is full
+            bs->journal.next_free = done_pos + total_pos;
             return 0;
         }
         else
@@ -276,7 +278,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
         {
             // oid, version, offset, len
             uint64_t location;
-            if (cur_skip > 0 || done_pos + total_pos + je->small_write.len > bs->journal_len)
+            if (cur_skip > 0 || done_pos + total_pos + je->small_write.len > bs->journal.len)
             {
                 // data continues from the beginning of the journal
                 location = 512 + cur_skip;
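
Note that every wrap in this scan goes back to 512, not 0: the journal's first 512-byte sector holds the entry that records journal_start, so regular entries and data never occupy it. The recurring pattern, as a helper (a sketch; the code above inlines it):

    // Advance a position inside the journal, wrapping past the end back to 512
    // (sector 0 is reserved for the journal-start record).
    static inline uint64_t advance_journal_pos(uint64_t pos, uint64_t add, uint64_t journal_len)
    {
        pos += add;
        if (pos >= journal_len)
            pos = 512;
        return pos;
    }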

View File

@@ -98,3 +98,27 @@ inline uint32_t je_crc32(journal_entry *je)
 {
     return crc32c_zero4(((uint8_t*)je)+4, je->size-4);
 }
+
+struct journal_sector_info_t
+{
+    uint64_t offset;
+    uint64_t usage_count;
+};
+
+struct journal_t
+{
+    int fd;
+    uint64_t device_size;
+    uint64_t offset, len;
+    uint64_t next_free = 512;
+    uint64_t used_start = 512;
+    uint32_t crc32_last = 0;
+
+    // Current sector(s) used for writing
+    uint8_t *sector_buf;
+    journal_sector_info_t *sector_info;
+    uint64_t sector_count;
+    uint64_t cur_sector = 0;
+    uint64_t in_sector_pos = 0;
+};
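
journal_t now carries both the on-disk ring (offset/len, with next_free chasing used_start and 512 as the wrap point) and the in-memory ring of sector buffers: buffer i lives at sector_buf + 512*i, and sector_info[i] remembers which disk offset that buffer is bound to plus how many submitted writes still reference it. Two access helpers as a sketch (these specific functions are not in the source, only the expressions they wrap):

    // Address of in-memory sector buffer i (buffers are contiguous, 512 bytes each).
    static inline uint8_t* journal_sector_ptr(journal_t & journal, uint64_t i)
    {
        return journal.sector_buf + 512 * i;
    }

    // Absolute device offset the current sector buffer will be flushed to.
    static inline uint64_t journal_cur_sector_offset(journal_t & journal)
    {
        return journal.offset + journal.sector_info[journal.cur_sector].offset;
    }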

View File

@@ -8,10 +8,10 @@ void blockstore::calc_lengths(spp::sparse_hash_map<std::string, std::string> & c
     {
         data_len = meta_offset - data_offset;
     }
-    if (data_fd == journal_fd && data_offset < journal_offset)
+    if (data_fd == journal.fd && data_offset < journal.offset)
     {
-        data_len = data_len < journal_offset-data_offset
-            ? data_len : journal_offset-data_offset;
+        data_len = data_len < journal.offset-data_offset
+            ? data_len : journal.offset-data_offset;
     }
     // meta
     meta_area = (meta_fd == data_fd ? data_size : meta_size) - meta_offset;
@@ -19,21 +19,21 @@ void blockstore::calc_lengths(spp::sparse_hash_map<std::string, std::string> & c
     {
         meta_area = data_offset - meta_offset;
     }
-    if (meta_fd == journal_fd && meta_offset < journal_offset)
+    if (meta_fd == journal.fd && meta_offset < journal.offset)
     {
-        meta_area = meta_area < journal_offset-meta_offset
-            ? meta_area : journal_offset-meta_offset;
+        meta_area = meta_area < journal.offset-meta_offset
+            ? meta_area : journal.offset-meta_offset;
     }
     // journal
-    journal_len = (journal_fd == data_fd ? data_size : (journal_fd == meta_fd ? meta_size : journal_size)) - journal_offset;
-    if (journal_fd == data_fd && journal_offset < data_offset)
+    journal.len = (journal.fd == data_fd ? data_size : (journal.fd == meta_fd ? meta_size : journal.device_size)) - journal.offset;
+    if (journal.fd == data_fd && journal.offset < data_offset)
     {
-        journal_len = data_offset - journal_offset;
+        journal.len = data_offset - journal.offset;
     }
-    if (journal_fd == meta_fd && journal_offset < meta_offset)
+    if (journal.fd == meta_fd && journal.offset < meta_offset)
     {
-        journal_len = journal_len < meta_offset-journal_offset
-            ? journal_len : meta_offset-journal_offset;
+        journal.len = journal.len < meta_offset-journal.offset
+            ? journal.len : meta_offset-journal.offset;
     }
     // required metadata size
     block_count = data_len / block_size;
@@ -49,15 +49,15 @@ void blockstore::calc_lengths(spp::sparse_hash_map<std::string, std::string> & c
     }
     // requested journal size
     uint64_t journal_wanted = stoull(config["journal_size"]);
-    if (journal_wanted > journal_len)
+    if (journal_wanted > journal.len)
     {
         throw new std::runtime_error("Requested journal_size is too large");
     }
     else if (journal_wanted > 0)
     {
-        journal_len = journal_wanted;
+        journal.len = journal_wanted;
     }
-    if (journal_len < MIN_JOURNAL_SIZE)
+    if (journal.len < MIN_JOURNAL_SIZE)
     {
         throw new std::runtime_error("Journal is too small");
     }
@@ -129,20 +129,20 @@ void blockstore::open_meta(spp::sparse_hash_map<std::string, std::string> & conf
 void blockstore::open_journal(spp::sparse_hash_map<std::string, std::string> & config)
 {
     int sectsize;
-    journal_offset = stoull(config["journal_offset"]);
-    if (journal_offset % DISK_ALIGNMENT)
+    journal.offset = stoull(config["journal_offset"]);
+    if (journal.offset % DISK_ALIGNMENT)
     {
         throw new std::runtime_error("journal_offset not aligned");
     }
     if (config["journal_device"] != "")
     {
-        journal_fd = open(config["journal_device"].c_str(), O_DIRECT|O_RDWR);
-        if (journal_fd == -1)
+        journal.fd = open(config["journal_device"].c_str(), O_DIRECT|O_RDWR);
+        if (journal.fd == -1)
         {
             throw new std::runtime_error("Failed to open journal device");
         }
-        if (ioctl(journal_fd, BLKSSZGET, &sectsize) < 0 ||
-            ioctl(journal_fd, BLKGETSIZE64, &journal_size) < 0 ||
+        if (ioctl(journal.fd, BLKSSZGET, &sectsize) < 0 ||
+            ioctl(journal.fd, BLKGETSIZE64, &journal.device_size) < 0 ||
             sectsize != 512)
         {
             throw new std::runtime_error("Journal device sector size is not 512 bytes");
@@ -150,11 +150,22 @@ void blockstore::open_journal(spp::sparse_hash_map<std::string, std::string> & c
     }
     else
     {
-        journal_fd = meta_fd;
-        journal_size = 0;
-        if (journal_offset >= data_size)
+        journal.fd = meta_fd;
+        journal.device_size = 0;
+        if (journal.offset >= data_size)
         {
             throw new std::runtime_error("journal_offset exceeds device size");
         }
     }
+    journal.sector_count = stoull(config["journal_sector_buffer_count"]);
+    if (!journal.sector_count)
+    {
+        journal.sector_count = 32;
+    }
+    journal.sector_buf = (uint8_t*)memalign(512, journal.sector_count * 512);
+    journal.sector_info = (journal_sector_info_t*)calloc(journal.sector_count, sizeof(journal_sector_info_t));
+    if (!journal.sector_buf || !journal.sector_info)
+    {
+        throw new std::bad_alloc();
+    }
 }
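
memalign(512, ...) rather than malloc() is needed here because the journal device is opened with O_DIRECT, and direct I/O on Linux requires sector-aligned buffers, offsets and lengths; an unaligned buffer would make the journal readv/writev calls fail with EINVAL. The sector_info array is never handed to the kernel, so plain calloc() suffices for it. Schematically:

    #include <malloc.h>
    // Usable with O_DIRECT: 512-byte aligned.
    uint8_t *ok  = (uint8_t*)memalign(512, journal.sector_count * 512);
    // NOT generally usable with O_DIRECT: alignment unspecified, so the kernel
    // may reject I/O on it with EINVAL.
    uint8_t *bad = (uint8_t*)malloc(journal.sector_count * 512);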

View File

@@ -33,9 +33,9 @@ int blockstore::fulfill_read_push(blockstore_operation *read_op, uint32_t item_s
         read_op->read_vec[cur_start] = data->iov;
         io_uring_prep_readv(
             sqe,
-            IS_JOURNAL(item_state) ? journal_fd : data_fd,
+            IS_JOURNAL(item_state) ? journal.fd : data_fd,
             &data->iov, 1,
-            (IS_JOURNAL(item_state) ? journal_offset : data_offset) + item_location + cur_start - item_start
+            (IS_JOURNAL(item_state) ? journal.offset : data_offset) + item_location + cur_start - item_start
         );
         data->op = read_op;
     }

View File

@@ -32,29 +32,39 @@ int blockstore::dequeue_write(blockstore_operation *op)
             sqe, data_fd, &data->iov, 1, data_offset + (loc << block_order)
         );
         op->pending_ops = 1;
+        op->used_journal_sector = 0;
     }
     else
     {
         // Small (journaled) write
         // First check if the journal has sufficient space
-        bool two_sqes = false;
-        uint64_t next_pos = journal_data_pos;
-        if (512 - journal_sector_pos < sizeof(struct journal_entry_small_write))
+        // FIXME Always two SQEs for now. Although it's possible to send 1
+        bool two_sqes = true;
+        uint64_t next_pos = journal.next_free;
+        if (512 - journal.in_sector_pos < sizeof(struct journal_entry_small_write))
         {
             next_pos = next_pos + 512;
-            if (journal_len - next_pos < op->len)
-                two_sqes = true;
-            if (next_pos >= journal_len)
+            //if (journal.len - next_pos < op->len)
+            //    two_sqes = true;
+            if (next_pos >= journal.len)
                 next_pos = 512;
+            // Also check if we have an unused memory buffer for the journal sector
+            if (journal.sector_info[((journal.cur_sector + 1) % journal.sector_count)].usage_count > 0)
+            {
+                // No memory buffer available. Wait for it.
+                op->wait_for = WAIT_JOURNAL_BUFFER;
+                return 0;
+            }
         }
-        else if (journal_sector + 512 != journal_data_pos || journal_len - journal_data_pos < op->len)
-            two_sqes = true;
-        next_pos = (journal_len - next_pos < op->len ? 512 : next_pos) + op->len;
-        if (next_pos >= journal_start)
+        //else if (journal.sector_info[journal.cur_sector].offset + 512 != journal.next_free ||
+        //    journal.len - next_pos < op->len)
+        //    two_sqes = true;
+        next_pos = (journal.len - next_pos < op->len ? 512 : next_pos) + op->len;
+        if (next_pos >= journal.used_start)
         {
-            // No space in the journal. Wait until it's available
+            // No space in the journal. Wait for it.
             op->wait_for = WAIT_JOURNAL;
-            op->wait_detail = next_pos - journal_start;
+            op->wait_detail = next_pos - journal.used_start;
             return 0;
         }
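
A worked instance of the space check above, assuming next_free has wrapped around behind used_start (the geometry the check is written for; the unwrapped case is among the edge cases the FIXMEs in this commit flag):

    // journal.len = 16 MB, journal.used_start = 8 MB, journal.next_free = 4 MB,
    // op->len = 4 KB, and the current sector still has room for the entry:
    //   next_pos = journal.next_free = 4 MB
    //   journal.len - next_pos = 12 MB >= op->len, so no wrap to 512:
    //   next_pos = 4 MB + 4 KB < journal.used_start   => enough space, proceed
    // With journal.next_free = 8 MB - 2 KB instead:
    //   next_pos = 8 MB - 2 KB + 4 KB = 8 MB + 2 KB >= journal.used_start
    //   => suspend with WAIT_JOURNAL,
    //      op->wait_detail = next_pos - journal.used_start = 2 KB to be freed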
// There is sufficient space. Get SQE(s)
@ -68,50 +78,54 @@ int blockstore::dequeue_write(blockstore_operation *op)
struct ring_data_t *data1 = ((ring_data_t*)sqe1->user_data);
struct ring_data_t *data2 = two_sqes ? ((ring_data_t*)sqe2->user_data) : NULL;
// Got SQEs. Prepare journal sector write
if (512 - journal_sector_pos < sizeof(struct journal_entry_small_write))
if (512 - journal.in_sector_pos < sizeof(struct journal_entry_small_write))
{
// Move to the next journal sector
next_pos = journal_data_pos + 512;
if (next_pos >= journal_len)
next_pos = 512;
journal_sector = journal_data_pos;
journal_sector_pos = 0;
journal_data_pos = next_pos;
memset(journal_sector_buf, 0, 512);
// Also select next sector buffer in memory
journal.cur_sector = ((journal.cur_sector + 1) % journal.sector_count);
journal.sector_info[journal.cur_sector].offset = journal.next_free;
journal.in_sector_pos = 0;
journal.next_free = (journal.next_free + 512) >= journal.len ? journal.next_free + 512 : 512;
memset(journal.sector_buf + 512*journal.cur_sector, 0, 512);
}
journal_entry_small_write *je = (struct journal_entry_small_write*)(journal_sector_buf + journal_sector_pos);
journal_entry_small_write *je = (struct journal_entry_small_write*)(
journal.sector_buf + 512*journal.cur_sector + journal.in_sector_pos
);
*je = {
.crc32 = 0,
.magic = JOURNAL_MAGIC,
.type = JE_SMALL_WRITE,
.size = sizeof(struct journal_entry_small_write),
.crc32_prev = journal_crc32_last,
.crc32_prev = journal.crc32_last,
.oid = op->oid,
.version = op->version,
.offset = op->offset,
.len = op->len,
};
je.crc32 = je_crc32((journal_entry*)je);
data1->iov = (struct iovec){ journal_sector_buf, 512 };
je->crc32 = je_crc32((journal_entry*)je);
data1->iov = (struct iovec){ journal.sector_buf + 512*journal.cur_sector, 512 };
data1->op = op;
io_uring_prep_writev(
sqe1, journal_fd, &data1->iov, 1, journal_offset + journal_sector
sqe1, journal.fd, &data1->iov, 1, journal.offset + journal.sector_info[journal.cur_sector].offset
);
// Prepare journal data write
if (journal_len - journal_data_pos < op->len)
journal_data_pos = 512;
if (journal.len - journal.next_free < op->len)
journal.next_free = 512;
data2->iov = (struct iovec){ op->buf, op->len };
data2->op = op;
io_uring_prep_writev(
sqe2, journal_fd, &data2->iov, 1, journal_offset + journal_data_pos
sqe2, journal.fd, &data2->iov, 1, journal.offset + journal.next_free
);
(*dirty_it).location = journal_data_pos;
(*dirty_it).location = journal.next_free;
//(*dirty_it).state = ST_J_SUBMITTED;
// Move journal_data_pos
journal_data_pos += op->len;
if (journal_data_pos >= journal_len)
journal_data_pos = 512;
// Move journal.next_free and save last write for current sector
journal.next_free += op->len;
if (journal.next_free >= journal.len)
journal.next_free = 512;
journal.sector_info[journal.cur_sector].usage_count++;
journal.crc32_last = je->crc32;
op->pending_ops = 2;
op->used_journal_sector = 1 + journal.cur_sector;
}
in_process_ops.insert(op);
int ret = ringloop->submit();