Browse Source

Port everything to ring_loop

blocking-uring-test
Vitaliy Filippov 3 years ago
parent
commit
82cf0a170e
  1. 99
      blockstore.cpp
  2. 21
      blockstore.h
  3. 193
      blockstore_init.cpp
  4. 10
      blockstore_init.h
  5. 8
      blockstore_read.cpp
  6. 8
      ringloop.h

99
blockstore.cpp

@ -1,9 +1,11 @@
#include "blockstore.h"
blockstore::blockstore(spp::sparse_hash_map<std::string, std::string> & config, io_uring *ring)
blockstore::blockstore(spp::sparse_hash_map<std::string, std::string> & config, ring_loop_t *ringloop)
{
this->ring = ring;
ring_data = (struct ring_data_t*)malloc(sizeof(ring_data_t) * ring->sq.ring_sz);
this->ringloop = ringloop;
ring_consumer.handle_event = [this](ring_data_t *d) { handle_event(d); };
ring_consumer.loop = [this]() { loop(); };
ringloop->register_consumer(ring_consumer);
initialized = 0;
block_order = stoull(config["block_size_order"]);
block_size = 1 << block_order;
@ -36,7 +38,7 @@ blockstore::blockstore(spp::sparse_hash_map<std::string, std::string> & config,
blockstore::~blockstore()
{
free(ring_data);
ringloop->unregister_consumer(ring_consumer.number);
if (data_fd >= 0)
close(data_fd);
if (meta_fd >= 0 && meta_fd != data_fd)
@ -45,68 +47,61 @@ blockstore::~blockstore()
close(journal_fd);
}
struct io_uring_sqe* blockstore::get_sqe()
// main event loop - handle requests
void blockstore::handle_event(ring_data_t *data)
{
struct io_uring_sqe* sqe = io_uring_get_sqe(ring);
if (sqe)
if (initialized != 0)
{
io_uring_sqe_set_data(sqe, ring_data + (sqe - ring->sq.sqes));
}
return sqe;
}
// must be called in the event loop until it returns 0
int blockstore::init_loop()
{
// read metadata, then journal
if (initialized)
{
return 0;
}
if (!metadata_init_reader)
{
metadata_init_reader = new blockstore_init_meta(this);
}
if (metadata_init_reader->read_loop())
{
return 1;
}
if (!journal_init_reader)
{
journal_init_reader = new blockstore_init_journal(this);
if (metadata_init_reader)
{
metadata_init_reader->handle_event(data);
}
else if (journal_init_reader)
{
journal_init_reader->handle_event(data);
}
}
if (journal_init_reader->read_loop())
else
{
return 1;
}
initialized = true;
delete metadata_init_reader;
delete journal_init_reader;
metadata_init_reader = NULL;
journal_init_reader = NULL;
return 0;
}
// main event loop
int blockstore::main_loop()
// main event loop - produce requests
void blockstore::loop()
{
while (true)
if (initialized != 10)
{
struct io_uring_cqe *cqe;
io_uring_peek_cqe(ring, &cqe);
if (cqe)
// read metadata, then journal
if (initialized == 0)
{
metadata_init_reader = new blockstore_init_meta(this);
initialized = 1;
}
else if (initialized == 1)
{
struct ring_data *d = cqe->user_data;
if (d->source == SRC_BLOCKSTORE)
int res = metadata_init_reader->loop();
if (!res)
{
handle_event();
delete metadata_init_reader;
metadata_init_reader = NULL;
journal_init_reader = new blockstore_init_journal(this);
initialized = 2;
}
else
}
else if (initialized == 2)
{
int res = journal_init_reader->loop();
if (!res)
{
// someone else
delete journal_init_reader;
journal_init_reader = NULL;
initialized = 10;
}
io_uring_cqe_seen(ring, cqe);
}
}
return 0;
else
{
}
}

21
blockstore.h

@ -17,9 +17,11 @@
#include <set>
#include <functional>
#include "allocator.h"
#include "sparsepp/sparsepp/spp.h"
#include "allocator.h"
#include "ringloop.h"
// States are not stored on disk. Instead, they're deduced from the journal
#define ST_IN_FLIGHT 1
@ -140,19 +142,13 @@ struct blockstore_operation
uint64_t wait_version;
};
/*struct ring_data_t
{
uint64_t source;
struct iovec iov; // for single-entry read/write operations
void *op;
};*/
class blockstore;
#include "blockstore_init.h"
class blockstore
{
struct ring_consumer_t ring_consumer;
public:
spp::sparse_hash_map<object_id, clean_entry, oid_hash> object_db;
spp::sparse_hash_map<object_id, dirty_list, oid_hash> dirty_queue;
@ -173,12 +169,11 @@ public:
uint64_t journal_start, journal_end;
uint32_t journal_crc32_last;
struct io_uring *ring;
struct ring_data_t *ring_data;
ring_loop_t *ringloop;
struct io_uring_sqe* get_sqe();
blockstore(spp::sparse_hash_map<std::string, std::string> & config, struct io_uring *ring);
blockstore(spp::sparse_hash_map<std::string, std::string> & config, ring_loop_t *ringloop);
~blockstore();
void calc_lengths(spp::sparse_hash_map<std::string, std::string> & config);
@ -191,10 +186,10 @@ public:
int metadata_buf_size;
blockstore_init_meta* metadata_init_reader;
blockstore_init_journal* journal_init_reader;
int init_loop();
// Event loop
int main_loop();
void handle_event(ring_data_t* data);
void loop();
// Read
int read(blockstore_operation *read_op);

193
blockstore_init.cpp

@ -5,7 +5,22 @@ blockstore_init_meta::blockstore_init_meta(blockstore *bs)
this->bs = bs;
}
int blockstore_init_meta::read_loop()
void blockstore_init_meta::handle_event(ring_data_t *data)
{
if (data->res < 0)
{
throw new std::runtime_error(
std::string("read metadata failed at offset ") + std::to_string(metadata_read) +
std::string(": ") + strerror(-data->res)
);
}
prev_done = data->res > 0 ? submitted : 0;
done_len = data->res;
metadata_read += data->res;
submitted = 0;
}
int blockstore_init_meta::loop()
{
if (metadata_read >= bs->meta_len)
{
@ -15,39 +30,20 @@ int blockstore_init_meta::read_loop()
{
metadata_buffer = (uint8_t*)memalign(512, 2*bs->metadata_buf_size);
}
if (submitted)
{
struct io_uring_cqe *cqe;
io_uring_peek_cqe(bs->ring, &cqe);
if (cqe)
{
if (cqe->res < 0)
{
throw new std::runtime_error(
std::string("read metadata failed at offset ") + std::to_string(metadata_read) +
std::string(": ") + strerror(-cqe->res)
);
}
prev_done = cqe->res > 0 ? submitted : 0;
done_len = cqe->res;
metadata_read += cqe->res;
submitted = 0;
io_uring_cqe_seen(bs->ring, cqe);
}
}
if (!submitted)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(bs->ring);
struct io_uring_sqe *sqe = bs->ringloop->get_sqe();
if (!sqe)
{
throw new std::runtime_error("io_uring is full while trying to read metadata");
}
submit_iov = {
struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
data->iov = {
metadata_buffer + (prev == 1 ? bs->metadata_buf_size : 0),
bs->meta_len - metadata_read > bs->metadata_buf_size ? bs->metadata_buf_size : bs->meta_len - metadata_read,
};
io_uring_prep_readv(sqe, bs->meta_fd, &submit_iov, 1, bs->meta_offset + metadata_read);
io_uring_submit(bs->ring);
io_uring_prep_readv(sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + metadata_read);
bs->ringloop->submit();
submitted = (prev == 1 ? 2 : 1);
prev = submitted;
}
@ -101,7 +97,67 @@ bool iszero(uint64_t *buf, int len)
return true;
}
int blockstore_init_journal::read_loop()
void blockstore_init_journal::handle_event(ring_data_t *data)
{
if (step == 1)
{
// Step 1: Read first block of the journal
if (data->res < 0)
{
throw new std::runtime_error(
std::string("read journal failed at offset ") + std::to_string(0) +
std::string(": ") + strerror(-data->res)
);
}
if (iszero((uint64_t*)journal_buffer, 3))
{
// Journal is empty
bs->journal_start = 512;
bs->journal_end = 512;
step = 99;
}
else
{
// First block always contains a single JE_START entry
journal_entry_start *je = (journal_entry_start*)journal_buffer;
if (je->magic != JOURNAL_MAGIC ||
je->type != JE_START ||
je->size != sizeof(journal_entry_start) ||
je_crc32((journal_entry*)je) != je->crc32)
{
// Entry is corrupt
throw new std::runtime_error("first entry of the journal is corrupt");
}
journal_pos = bs->journal_start = je->journal_start;
crc32_last = je->crc32_replaced;
step = 2;
}
}
else if (step == 2 || step == 3)
{
// Step 3: Read journal
if (data->res < 0)
{
throw new std::runtime_error(
std::string("read journal failed at offset ") + std::to_string(journal_pos) +
std::string(": ") + strerror(-data->res)
);
}
done_pos = journal_pos;
done_buf = submitted;
done_len = data->res;
journal_pos += data->res;
if (journal_pos >= bs->journal_len)
{
// Continue from the beginning
journal_pos = 512;
wrapped = true;
}
submitted = 0;
}
}
int blockstore_init_journal::loop()
{
if (step == 100)
{
@ -114,86 +170,20 @@ int blockstore_init_journal::read_loop()
if (step == 0)
{
// Step 1: Read first block of the journal
struct io_uring_sqe *sqe = io_uring_get_sqe(bs->ring);
struct io_uring_sqe *sqe = bs->ringloop->get_sqe();
if (!sqe)
{
throw new std::runtime_error("io_uring is full while trying to read journal");
}
submit_iov = { journal_buffer, 512 };
io_uring_prep_readv(sqe, bs->journal_fd, &submit_iov, 1, bs->journal_offset);
io_uring_submit(bs->ring);
struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
data->iov = { journal_buffer, 512 };
io_uring_prep_readv(sqe, bs->journal_fd, &data->iov, 1, bs->journal_offset);
bs->ringloop->submit();
step = 1;
}
if (step == 1)
{
// Step 2: Get the completion event and check the beginning for <START> entry
struct io_uring_cqe *cqe;
io_uring_peek_cqe(bs->ring, &cqe);
if (cqe)
{
if (cqe->res < 0)
{
throw new std::runtime_error(
std::string("read journal failed at offset ") + std::to_string(0) +
std::string(": ") + strerror(-cqe->res)
);
}
if (iszero((uint64_t*)journal_buffer, 3))
{
// Journal is empty
bs->journal_start = 512;
bs->journal_end = 512;
step = 99;
}
else
{
// First block always contains a single JE_START entry
journal_entry_start *je = (journal_entry_start*)journal_buffer;
if (je->magic != JOURNAL_MAGIC ||
je->type != JE_START ||
je->size != sizeof(journal_entry_start) ||
je_crc32((journal_entry*)je) != je->crc32)
{
// Entry is corrupt
throw new std::runtime_error("first entry of the journal is corrupt");
}
journal_pos = bs->journal_start = je->journal_start;
crc32_last = je->crc32_replaced;
step = 2;
}
io_uring_cqe_seen(bs->ring, cqe);
}
}
if (step == 2 || step == 3)
{
// Step 3: Read journal
if (submitted)
{
struct io_uring_cqe *cqe;
io_uring_peek_cqe(bs->ring, &cqe);
if (cqe)
{
if (cqe->res < 0)
{
throw new std::runtime_error(
std::string("read journal failed at offset ") + std::to_string(journal_pos) +
std::string(": ") + strerror(-cqe->res)
);
}
done_pos = journal_pos;
done_buf = submitted;
done_len = cqe->res;
journal_pos += cqe->res;
if (journal_pos >= bs->journal_len)
{
// Continue from the beginning
journal_pos = 512;
wrapped = true;
}
submitted = 0;
io_uring_cqe_seen(bs->ring, cqe);
}
}
if (!submitted)
{
if (step != 3)
@ -204,22 +194,23 @@ int blockstore_init_journal::read_loop()
}
else
{
struct io_uring_sqe *sqe = io_uring_get_sqe(bs->ring);
struct io_uring_sqe *sqe = bs->ringloop->get_sqe();
if (!sqe)
{
throw new std::runtime_error("io_uring is full while trying to read journal");
}
struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
uint64_t end = bs->journal_len;
if (journal_pos < bs->journal_start)
{
end = bs->journal_start;
}
submit_iov = {
data->iov = {
journal_buffer + (done_buf == 1 ? JOURNAL_BUFFER_SIZE : 0),
end - journal_pos < JOURNAL_BUFFER_SIZE ? end - journal_pos : JOURNAL_BUFFER_SIZE,
};
io_uring_prep_readv(sqe, bs->journal_fd, &submit_iov, 1, bs->journal_offset + journal_pos);
io_uring_submit(bs->ring);
io_uring_prep_readv(sqe, bs->journal_fd, &data->iov, 1, bs->journal_offset + journal_pos);
bs->ringloop->submit();
submitted = done_buf == 1 ? 2 : 1;
}
}

10
blockstore_init.h

@ -5,12 +5,12 @@ class blockstore_init_meta
blockstore *bs;
uint8_t *metadata_buffer = NULL;
uint64_t metadata_read = 0;
struct iovec submit_iov;
int prev = 0, prev_done = 0, done_len = 0, submitted = 0, done_cnt = 0;
void handle_entries(struct clean_disk_entry* entries, int count);
public:
blockstore_init_meta(blockstore* bs);
int read_loop();
blockstore_init_meta(blockstore *bs);
void handle_event(ring_data_t *data);
int loop();
};
class blockstore_init_journal
@ -19,7 +19,6 @@ class blockstore_init_journal
uint8_t *journal_buffer = NULL;
int step = 0;
uint32_t crc32_last = 0;
struct iovec submit_iov;
uint64_t done_pos = 0, journal_pos = 0;
uint64_t cur_skip = 0;
bool wrapped = false;
@ -27,5 +26,6 @@ class blockstore_init_journal
int handle_journal_part(void *buf, uint64_t len);
public:
blockstore_init_journal(blockstore* bs);
int read_loop();
void handle_event(ring_data_t *data);
int loop();
};

8
blockstore_read.cpp

@ -87,7 +87,7 @@ int blockstore::read(blockstore_operation *read_op)
read_op->callback(read_op);
return 0;
}
unsigned prev_sqe_pos = ring->sq.sqe_tail;
unsigned prev_sqe_pos = ringloop->ring->sq.sqe_tail;
uint64_t fulfilled = 0;
if (dirty_it != object_db.end())
{
@ -99,7 +99,7 @@ int blockstore::read(blockstore_operation *read_op)
if (fulfill_read(read_op, dirty[i].offset, dirty[i].offset + dirty[i].size, dirty[i].state, dirty[i].version, dirty[i].location) < 0)
{
// need to wait for something, undo added requests and requeue op
ring->sq.sqe_tail = prev_sqe_pos;
ringloop->ring->sq.sqe_tail = prev_sqe_pos;
read_op->read_vec.clear();
submit_queue.push_front(read_op);
return 0;
@ -112,7 +112,7 @@ int blockstore::read(blockstore_operation *read_op)
if (fulfill_read(read_op, 0, block_size, ST_CURRENT, 0, clean_it->second.location) < 0)
{
// need to wait for something, undo added requests and requeue op
ring->sq.sqe_tail = prev_sqe_pos;
ringloop->ring->sq.sqe_tail = prev_sqe_pos;
read_op->read_vec.clear();
// FIXME: bad implementation
submit_queue.push_front(read_op);
@ -127,7 +127,7 @@ int blockstore::read(blockstore_operation *read_op)
return 0;
}
// FIXME reap events!
int ret = io_uring_submit(ring);
int ret = ringloop->submit();
if (ret < 0)
{
throw new std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret));

8
ringloop.h

@ -21,8 +21,8 @@ struct ring_data_t
struct ring_consumer_t
{
int number;
std::function<void (ring_data_t*)> handle_event;
std::function<void ()> loop;
std::function<void(ring_data_t*)> handle_event;
std::function<void(void)> loop;
};
class ring_loop_t
@ -37,4 +37,8 @@ public:
int register_consumer(ring_consumer_t & consumer);
void unregister_consumer(int number);
void loop(bool sleep);
inline int submit()
{
return io_uring_submit(ring);
}
};

Loading…
Cancel
Save