Browse Source

Begin to debug ... all of this stuff :)

blocking-uring-test
Vitaliy Filippov 3 years ago
parent
commit
eec1c35ea4
  1. 11
      Makefile
  2. 2
      allocator.cpp
  3. 26
      blockstore.cpp
  4. 8
      blockstore_flush.cpp
  5. 14
      blockstore_init.cpp
  6. 46
      blockstore_open.cpp
  7. 2
      blockstore_stable.cpp
  8. 4
      blockstore_sync.cpp
  9. 2
      blockstore_write.cpp
  10. 18
      ringloop.cpp
  11. 4
      ringloop.h
  12. 2
      test.cpp
  13. 16
      test_blockstore.cpp

11
Makefile

@ -1,10 +1,13 @@
all: allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \
blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_flush.o crc32c.o ringloop.o test
BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \
blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_flush.o crc32c.o ringloop.o
all: $(BLOCKSTORE_OBJS) test test_blockstore
clean:
rm -f *.o
crc32c.o: crc32c.c
g++ -c -o $@ $<
%.o: %.cpp blockstore.h
g++ -Wall -Wno-sign-compare -Wno-parentheses -c -o $@ $<
g++ -g -Wall -Wno-sign-compare -Wno-parentheses -c -o $@ $<
test: test.cpp
g++ -O3 -o test -luring test.cpp
g++ -g -O3 -o test -luring test.cpp
test_blockstore: $(BLOCKSTORE_OBJS) test_blockstore.cpp
g++ -g -o test_blockstore -luring test_blockstore.cpp $(BLOCKSTORE_OBJS)

2
allocator.cpp

@ -22,7 +22,7 @@ allocator *allocator_create(uint64_t blocks)
buf->last_one_mask = (blocks % 64) == 0
? UINT64_MAX
: ~(UINT64_MAX << (64 - blocks % 64));
for (uint64_t i = 0; i < blocks; i++)
for (uint64_t i = 0; i < total; i++)
{
buf->mask[i] = 0;
}

26
blockstore.cpp

@ -6,11 +6,15 @@ blockstore::blockstore(spp::sparse_hash_map<std::string, std::string> & config,
ring_consumer.loop = [this]() { loop(); };
ringloop->register_consumer(ring_consumer);
initialized = 0;
block_order = stoull(config["block_size_order"]);
block_order = strtoull(config["block_size_order"].c_str(), NULL, 10);
if (block_order == 0)
{
block_order = DEFAULT_ORDER;
}
block_size = 1 << block_order;
if (block_size <= 1 || block_size >= MAX_BLOCK_SIZE)
{
throw new std::runtime_error("Bad block size");
throw std::runtime_error("Bad block size");
}
zero_object = (uint8_t*)memalign(DISK_ALIGNMENT, block_size);
data_fd = meta_fd = journal.fd = -1;
@ -22,7 +26,7 @@ blockstore::blockstore(spp::sparse_hash_map<std::string, std::string> & config,
calc_lengths(config);
data_alloc = allocator_create(block_count);
if (!data_alloc)
throw new std::bad_alloc();
throw std::bad_alloc();
}
catch (std::exception & e)
{
@ -32,9 +36,9 @@ blockstore::blockstore(spp::sparse_hash_map<std::string, std::string> & config,
close(meta_fd);
if (journal.fd >= 0 && journal.fd != meta_fd)
close(journal.fd);
throw e;
throw;
}
int flusher_count = stoull(config["flusher_count"]);
int flusher_count = strtoull(config["flusher_count"].c_str(), NULL, 10);
if (!flusher_count)
flusher_count = 32;
flusher = new journal_flusher_t(flusher_count, this);
@ -110,8 +114,8 @@ void blockstore::loop()
else if (op->wait_for)
continue;
}
unsigned ring_space = io_uring_sq_space_left(ringloop->ring);
unsigned prev_sqe_pos = ringloop->ring->sq.sqe_tail;
unsigned ring_space = io_uring_sq_space_left(&ringloop->ring);
unsigned prev_sqe_pos = ringloop->ring.sq.sqe_tail;
int dequeue_op = 0;
if ((op->flags & OP_TYPE_MASK) == OP_READ)
{
@ -145,13 +149,13 @@ void blockstore::loop()
int ret = ringloop->submit();
if (ret < 0)
{
throw new std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret));
throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret));
}
submit_queue.erase(op_ptr);
}
else
{
ringloop->ring->sq.sqe_tail = prev_sqe_pos;
ringloop->ring.sq.sqe_tail = prev_sqe_pos;
if (op->wait_for == WAIT_SQE)
{
op->wait_detail = 1 + ring_space;
@ -173,7 +177,7 @@ void blockstore::check_wait(blockstore_operation *op)
{
if (op->wait_for == WAIT_SQE)
{
if (io_uring_sq_space_left(ringloop->ring) < op->wait_detail)
if (io_uring_sq_space_left(&ringloop->ring) < op->wait_detail)
{
// stop submission if there's still no free space
return;
@ -213,7 +217,7 @@ void blockstore::check_wait(blockstore_operation *op)
}
else
{
throw new std::runtime_error("BUG: op->wait_for value is unexpected");
throw std::runtime_error("BUG: op->wait_for value is unexpected");
}
}

8
blockstore_flush.cpp

@ -26,7 +26,7 @@ journal_flusher_co::journal_flusher_co()
{
if (data->res < 0)
{
throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111");
throw std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111");
}
wait_count--;
};
@ -190,7 +190,7 @@ resume_0:
}
else if (!IS_STABLE(dirty_it->second.state))
{
throw new std::runtime_error("BUG: Unexpected dirty_entry state during flush: " + std::to_string(dirty_it->second.state));
throw std::runtime_error("BUG: Unexpected dirty_entry state during flush: " + std::to_string(dirty_it->second.state));
}
dirty_it--;
} while (dirty_it != bs->dirty_db.begin() && dirty_it->first.oid == cur.oid);
@ -216,7 +216,7 @@ resume_0:
if (clean_it == bs->clean_db.end())
{
// Object not present at all. This is a bug.
throw new std::runtime_error("BUG: Object we are trying to flush not allocated on the data device");
throw std::runtime_error("BUG: Object we are trying to flush not allocated on the data device");
}
else
clean_loc = clean_it->second.location;
@ -245,7 +245,7 @@ resume_0:
{
if (data->res < 0)
{
throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111");
throw std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111");
}
meta_it->second.state = 1;
wait_count--;

14
blockstore_init.cpp

@ -9,7 +9,7 @@ void blockstore_init_meta::handle_event(ring_data_t *data)
{
if (data->res < 0)
{
throw new std::runtime_error(
throw std::runtime_error(
std::string("read metadata failed at offset ") + std::to_string(metadata_read) +
std::string(": ") + strerror(-data->res)
);
@ -35,7 +35,7 @@ int blockstore_init_meta::loop()
struct io_uring_sqe *sqe = bs->get_sqe();
if (!sqe)
{
throw new std::runtime_error("io_uring is full while trying to read metadata");
throw std::runtime_error("io_uring is full while trying to read metadata");
}
struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
data->iov = {
@ -116,7 +116,7 @@ void blockstore_init_journal::handle_event(ring_data_t *data)
// Step 1: Read first block of the journal
if (data->res < 0)
{
throw new std::runtime_error(
throw std::runtime_error(
std::string("read journal failed at offset ") + std::to_string(0) +
std::string(": ") + strerror(-data->res)
);
@ -139,7 +139,7 @@ void blockstore_init_journal::handle_event(ring_data_t *data)
je_crc32((journal_entry*)je) != je->crc32)
{
// Entry is corrupt
throw new std::runtime_error("first entry of the journal is corrupt");
throw std::runtime_error("first entry of the journal is corrupt");
}
journal_pos = bs->journal.used_start = je->journal_start;
crc32_last = 0;
@ -152,7 +152,7 @@ void blockstore_init_journal::handle_event(ring_data_t *data)
// Step 3: Read journal
if (data->res < 0)
{
throw new std::runtime_error(
throw std::runtime_error(
std::string("read journal failed at offset ") + std::to_string(journal_pos) +
std::string(": ") + strerror(-data->res)
);
@ -187,7 +187,7 @@ int blockstore_init_journal::loop()
struct io_uring_sqe *sqe = bs->get_sqe();
if (!sqe)
{
throw new std::runtime_error("io_uring is full while trying to read journal");
throw std::runtime_error("io_uring is full while trying to read journal");
}
struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
data->iov = { journal_buffer, 512 };
@ -212,7 +212,7 @@ int blockstore_init_journal::loop()
struct io_uring_sqe *sqe = bs->get_sqe();
if (!sqe)
{
throw new std::runtime_error("io_uring is full while trying to read journal");
throw std::runtime_error("io_uring is full while trying to read journal");
}
struct ring_data_t *data = ((ring_data_t*)sqe->user_data);
uint64_t end = bs->journal.len;

46
blockstore_open.cpp

@ -40,18 +40,18 @@ void blockstore::calc_lengths(spp::sparse_hash_map<std::string, std::string> & c
meta_len = block_count * sizeof(clean_disk_entry);
if (meta_area < meta_len)
{
throw new std::runtime_error("Metadata area is too small");
throw std::runtime_error("Metadata area is too small");
}
metadata_buf_size = stoull(config["meta_buf_size"]);
metadata_buf_size = strtoull(config["meta_buf_size"].c_str(), NULL, 10);
if (metadata_buf_size < 65536)
{
metadata_buf_size = 4*1024*1024;
}
// requested journal size
uint64_t journal_wanted = stoull(config["journal_size"]);
uint64_t journal_wanted = strtoull(config["journal_size"].c_str(), NULL, 10);
if (journal_wanted > journal.len)
{
throw new std::runtime_error("Requested journal_size is too large");
throw std::runtime_error("Requested journal_size is too large");
}
else if (journal_wanted > 0)
{
@ -59,7 +59,7 @@ void blockstore::calc_lengths(spp::sparse_hash_map<std::string, std::string> & c
}
if (journal.len < MIN_JOURNAL_SIZE)
{
throw new std::runtime_error("Journal is too small");
throw std::runtime_error("Journal is too small");
}
}
@ -69,7 +69,7 @@ void check_size(int fd, uint64_t *size, std::string name)
struct stat st;
if (fstat(fd, &st) < 0)
{
throw new std::runtime_error("Failed to stat "+name);
throw std::runtime_error("Failed to stat "+name);
}
if (S_ISREG(st.st_mode))
{
@ -81,40 +81,40 @@ void check_size(int fd, uint64_t *size, std::string name)
ioctl(fd, BLKGETSIZE64, size) < 0 ||
sectsize != 512)
{
throw new std::runtime_error(name+" sector is not equal to 512 bytes");
throw std::runtime_error(name+" sector is not equal to 512 bytes");
}
}
else
{
throw new std::runtime_error(name+" is neither a file nor a block device");
throw std::runtime_error(name+" is neither a file nor a block device");
}
}
void blockstore::open_data(spp::sparse_hash_map<std::string, std::string> & config)
{
data_offset = stoull(config["data_offset"]);
data_offset = strtoull(config["data_offset"].c_str(), NULL, 10);
if (data_offset % DISK_ALIGNMENT)
{
throw new std::runtime_error("data_offset not aligned");
throw std::runtime_error("data_offset not aligned");
}
data_fd = open(config["data_device"].c_str(), O_DIRECT|O_RDWR);
if (data_fd == -1)
{
throw new std::runtime_error("Failed to open data device");
throw std::runtime_error("Failed to open data device");
}
check_size(data_fd, &data_size, "data device");
if (data_offset >= data_size)
{
throw new std::runtime_error("data_offset exceeds device size");
throw std::runtime_error("data_offset exceeds device size");
}
}
void blockstore::open_meta(spp::sparse_hash_map<std::string, std::string> & config)
{
meta_offset = stoull(config["meta_offset"]);
meta_offset = strtoull(config["meta_offset"].c_str(), NULL, 10);
if (meta_offset % DISK_ALIGNMENT)
{
throw new std::runtime_error("meta_offset not aligned");
throw std::runtime_error("meta_offset not aligned");
}
if (config["meta_device"] != "")
{
@ -122,12 +122,12 @@ void blockstore::open_meta(spp::sparse_hash_map<std::string, std::string> & conf
meta_fd = open(config["meta_device"].c_str(), O_DIRECT|O_RDWR);
if (meta_fd == -1)
{
throw new std::runtime_error("Failed to open metadata device");
throw std::runtime_error("Failed to open metadata device");
}
check_size(meta_fd, &meta_size, "metadata device");
if (meta_offset >= meta_size)
{
throw new std::runtime_error("meta_offset exceeds device size");
throw std::runtime_error("meta_offset exceeds device size");
}
}
else
@ -136,24 +136,24 @@ void blockstore::open_meta(spp::sparse_hash_map<std::string, std::string> & conf
meta_size = 0;
if (meta_offset >= data_size)
{
throw new std::runtime_error("meta_offset exceeds device size");
throw std::runtime_error("meta_offset exceeds device size");
}
}
}
void blockstore::open_journal(spp::sparse_hash_map<std::string, std::string> & config)
{
journal.offset = stoull(config["journal_offset"]);
journal.offset = strtoull(config["journal_offset"].c_str(), NULL, 10);
if (journal.offset % DISK_ALIGNMENT)
{
throw new std::runtime_error("journal_offset not aligned");
throw std::runtime_error("journal_offset not aligned");
}
if (config["journal_device"] != "")
{
journal.fd = open(config["journal_device"].c_str(), O_DIRECT|O_RDWR);
if (journal.fd == -1)
{
throw new std::runtime_error("Failed to open journal device");
throw std::runtime_error("Failed to open journal device");
}
check_size(journal.fd, &journal.device_size, "metadata device");
}
@ -163,10 +163,10 @@ void blockstore::open_journal(spp::sparse_hash_map<std::string, std::string> & c
journal.device_size = 0;
if (journal.offset >= data_size)
{
throw new std::runtime_error("journal_offset exceeds device size");
throw std::runtime_error("journal_offset exceeds device size");
}
}
journal.sector_count = stoull(config["journal_sector_buffer_count"]);
journal.sector_count = strtoull(config["journal_sector_buffer_count"].c_str(), NULL, 10);
if (!journal.sector_count)
{
journal.sector_count = 32;
@ -175,6 +175,6 @@ void blockstore::open_journal(spp::sparse_hash_map<std::string, std::string> & c
journal.sector_info = (journal_sector_info_t*)calloc(journal.sector_count, sizeof(journal_sector_info_t));
if (!journal.sector_buf || !journal.sector_info)
{
throw new std::bad_alloc();
throw std::bad_alloc();
}
}

2
blockstore_stable.cpp

@ -109,7 +109,7 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op
{
if (data->res < 0)
{
throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111");
throw std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111");
}
op->pending_ops--;
if (op->pending_ops == 0)

4
blockstore_sync.cpp

@ -113,7 +113,7 @@ void blockstore::handle_sync_event(ring_data_t *data, blockstore_operation *op)
{
if (data->res < 0)
{
throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111");
throw std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111");
}
op->pending_ops--;
if (op->pending_ops == 0)
@ -150,7 +150,7 @@ void blockstore::handle_sync_event(ring_data_t *data, blockstore_operation *op)
}
else
{
throw new std::runtime_error("BUG: unexpected sync op state");
throw std::runtime_error("BUG: unexpected sync op state");
}
ack_sync(op);
}

2
blockstore_write.cpp

@ -153,7 +153,7 @@ void blockstore::handle_write_event(ring_data_t *data, blockstore_operation *op)
if (data->res < 0)
{
// FIXME: our state becomes corrupted after a write error. maybe do something better than just die
throw new std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111");
throw std::runtime_error("write operation failed. in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111");
}
op->pending_ops--;
if (op->pending_ops == 0)

18
ringloop.cpp

@ -2,15 +2,15 @@
ring_loop_t::ring_loop_t(int qd)
{
int ret = io_uring_queue_init(qd, ring, 0);
int ret = io_uring_queue_init(qd, &ring, 0);
if (ret < 0)
{
throw new std::runtime_error(std::string("io_uring_queue_init: ") + strerror(-ret));
throw std::runtime_error(std::string("io_uring_queue_init: ") + strerror(-ret));
}
ring_data = (struct ring_data_t*)malloc(sizeof(ring_data_t) * ring->sq.ring_sz);
ring_data = (struct ring_data_t*)malloc(sizeof(ring_data_t) * ring.sq.ring_sz);
if (!ring_data)
{
throw new std::bad_alloc();
throw std::bad_alloc();
}
}
@ -21,10 +21,10 @@ ring_loop_t::~ring_loop_t()
struct io_uring_sqe* ring_loop_t::get_sqe()
{
struct io_uring_sqe* sqe = io_uring_get_sqe(ring);
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
if (sqe)
{
io_uring_sqe_set_data(sqe, ring_data + (sqe - ring->sq.sqes));
io_uring_sqe_set_data(sqe, ring_data + (sqe - ring.sq.sqes));
}
return sqe;
}
@ -49,16 +49,16 @@ void ring_loop_t::loop(bool sleep)
struct io_uring_cqe *cqe;
if (sleep)
{
io_uring_wait_cqe(ring, &cqe);
io_uring_wait_cqe(&ring, &cqe);
}
while ((io_uring_peek_cqe(ring, &cqe), cqe))
while ((io_uring_peek_cqe(&ring, &cqe), cqe))
{
struct ring_data_t *d = (struct ring_data_t*)cqe->user_data;
if (d->callback)
{
d->callback(d);
}
io_uring_cqe_seen(ring, cqe);
io_uring_cqe_seen(&ring, cqe);
}
for (int i = 0; i < consumers.size(); i++)
{

4
ringloop.h

@ -28,7 +28,7 @@ class ring_loop_t
std::vector<ring_consumer_t> consumers;
struct ring_data_t *ring_data;
public:
struct io_uring *ring;
struct io_uring ring;
ring_loop_t(int qd);
~ring_loop_t();
struct io_uring_sqe* get_sqe();
@ -37,6 +37,6 @@ public:
void loop(bool sleep);
inline int submit()
{
return io_uring_submit(ring);
return io_uring_submit(&ring);
}
};

2
test.cpp

@ -178,7 +178,7 @@ int main0(int argc, char *argv[])
.flags = 0,
.location = (uint64_t)i << 17,
.offset = 0,
.size = 1 << 17,
.len = 1 << 17,
};
}
return 0;

16
test_blockstore.cpp

@ -0,0 +1,16 @@
#include <iostream>
#include "blockstore.h"
int main(int narg, char *args[])
{
spp::sparse_hash_map<std::string, std::string> config;
config["meta_device"] = "./test_meta.bin";
config["journal_device"] = "./test_journal.bin";
config["data_device"] = "./test_data.bin";
ring_loop_t *ringloop = new ring_loop_t(512);
blockstore *bs = new blockstore(config, ringloop);
delete bs;
delete ringloop;
return 0;
}
Loading…
Cancel
Save