Simplified distributed block storage with strong consistency, like in Ceph
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

251 lines
7.1 KiB

#include "blockstore.h"
// Construct a blockstore from <config> and attach it to <ringloop>.
//
// Config keys read here:
//   "block_size_order" - log2 of the block size; 0 or missing selects DEFAULT_ORDER
//   "flusher_count"    - value passed to journal_flusher_t; 0 or missing selects 32
// <config> is also handed to open_data(), open_meta(), open_journal() and calc_lengths().
//
// Throws std::runtime_error("Bad block size") for an unusable block size,
// std::bad_alloc when a buffer or the allocator cannot be created, and rethrows
// whatever the open_*/calc_lengths helpers throw. Because the destructor does not
// run for a partially constructed object, the failure path below undoes everything
// this constructor acquired itself: file descriptors, zero_object and the ring
// consumer registration.
blockstore::blockstore(spp::sparse_hash_map<std::string, std::string> & config, ring_loop_t *ringloop)
{
    this->ringloop = ringloop;
    initialized = 0;
    // Put every handle into a known "empty" state first so that the failure path
    // can release them unconditionally
    zero_object = NULL;
    data_fd = meta_fd = journal.fd = -1;
    ring_consumer.loop = [this]() { loop(); };
    ringloop->register_consumer(ring_consumer);
    try
    {
        block_order = strtoull(config["block_size_order"].c_str(), NULL, 10);
        if (block_order == 0)
        {
            block_order = DEFAULT_ORDER;
        }
        // "1 << block_order" is an int shift: orders of 31 and above are undefined
        // behaviour, so reject them before shifting instead of checking the result
        if (block_order >= 31)
        {
            throw std::runtime_error("Bad block size");
        }
        block_size = 1 << block_order;
        if (block_size <= 1 || block_size >= MAX_BLOCK_SIZE)
        {
            throw std::runtime_error("Bad block size");
        }
        // NOTE(review): the buffer is not cleared here - presumably it is zeroed
        // elsewhere before use; confirm
        zero_object = (uint8_t*)memalign(DISK_ALIGNMENT, block_size);
        if (!zero_object)
            throw std::bad_alloc();
        open_data(config);
        open_meta(config);
        open_journal(config);
        calc_lengths(config);
        data_alloc = allocator_create(block_count);
        if (!data_alloc)
            throw std::bad_alloc();
        int flusher_count = strtoull(config["flusher_count"].c_str(), NULL, 10);
        if (!flusher_count)
            flusher_count = 32;
        flusher = new journal_flusher_t(flusher_count, this);
    }
    catch (...)
    {
        // The descriptors may be shared between data, metadata and journal,
        // so close each distinct one only once
        if (data_fd >= 0)
            close(data_fd);
        if (meta_fd >= 0 && meta_fd != data_fd)
            close(meta_fd);
        if (journal.fd >= 0 && journal.fd != meta_fd)
            close(journal.fd);
        free(zero_object);
        // The registered callback captures "this", which is about to become invalid
        ringloop->unregister_consumer(ring_consumer);
        throw;
    }
}
// Release everything owned by the blockstore: the flusher, an init reader that is
// still alive, the zero_object buffer, the ring consumer registration, the file
// descriptors and the journal sector buffers.
blockstore::~blockstore()
{
    delete flusher;
    // loop() creates the metadata reader when entering state 1 and replaces it with
    // the journal reader when entering state 2; each is deleted only on leaving its
    // state, so an object destroyed mid-initialization still owns one of them
    if (initialized == 1)
        delete metadata_init_reader;
    else if (initialized == 2)
        delete journal_init_reader;
    free(zero_object);
    ringloop->unregister_consumer(ring_consumer);
    // The descriptors may be shared between data, metadata and journal,
    // so close each distinct one only once
    if (data_fd >= 0)
        close(data_fd);
    if (meta_fd >= 0 && meta_fd != data_fd)
        close(meta_fd);
    if (journal.fd >= 0 && journal.fd != meta_fd)
        close(journal.fd);
    free(journal.sector_buf);
    free(journal.sector_info);
}
// Tells whether initialization has finished, i.e. loop() has advanced
// the "initialized" state to its final value (10).
bool blockstore::is_started()
{
    const bool init_finished = (initialized == 10);
    return init_finished;
}
// main event loop - produce requests
//
// While "initialized" is not 10 this drives the startup state machine:
//   0 -> 1   create the metadata reader
//   1 -> 2   run the metadata reader until its loop() returns 0, then create the journal reader
//   2 -> 10  run the journal reader until its loop() returns 0
// Once initialized, every call continues in-progress syncs, walks the submit queue
// trying to dequeue operations, runs the flusher and submits the ring.
// Throws std::runtime_error when the ring submission returns a negative value.
void blockstore::loop()
{
    if (initialized != 10)
    {
        // read metadata, then journal
        if (initialized == 0)
        {
            metadata_init_reader = new blockstore_init_meta(this);
            initialized = 1;
            printf("reading blockstore metadata\n");
        }
        if (initialized == 1)
        {
            int res = metadata_init_reader->loop();
            if (!res)
            {
                delete metadata_init_reader;
                metadata_init_reader = NULL;
                journal_init_reader = new blockstore_init_journal(this);
                initialized = 2;
                printf("reading blockstore journal\n");
            }
        }
        if (initialized == 2)
        {
            int res = journal_init_reader->loop();
            if (!res)
            {
                delete journal_init_reader;
                journal_init_reader = NULL;
                initialized = 10;
                printf("journal read\n");
            }
        }
    }
    else
    {
        // try to submit ops
        auto cur_sync = in_progress_syncs.begin();
        while (cur_sync != in_progress_syncs.end())
        {
            // Advance before the call so the iterator no longer points at the
            // element being processed
            continue_sync(*cur_sync++);
        }
        auto cur = submit_queue.begin();
        // Set as soon as a write/delete is seen earlier in the queue during this pass,
        // whether it was attempted or is still waiting
        bool has_writes = false;
        while (cur != submit_queue.end())
        {
            auto op_ptr = cur;
            auto op = *(cur++);
            const auto op_type = op->flags & OP_TYPE_MASK;
            const bool is_write = (op_type == OP_WRITE || op_type == OP_DELETE);
            if (op->wait_for)
            {
                check_wait(op);
                if (op->wait_for == WAIT_SQE)
                    break;
                else if (op->wait_for)
                {
                    // A write that is still blocked has not been submitted yet, so a
                    // SYNC queued after it must not be dequeued in this pass either
                    if (is_write)
                        has_writes = true;
                    continue;
                }
            }
            // Remember the ring state so a failed dequeue can be rolled back
            unsigned ring_space = io_uring_sq_space_left(&ringloop->ring);
            unsigned prev_sqe_pos = ringloop->ring.sq.sqe_tail;
            int dequeue_op = 0;
            if (op_type == OP_READ)
            {
                dequeue_op = dequeue_read(op);
            }
            else if (is_write)
            {
                has_writes = true;
                dequeue_op = dequeue_write(op);
            }
            else if (op_type == OP_SYNC)
            {
                // wait for all small writes to be submitted
                // wait for all big writes to complete, submit data device fsync
                // wait for the data device fsync to complete, then submit journal writes for big writes
                // then submit an fsync operation
                if (has_writes)
                {
                    // Can't submit SYNC before previous writes
                    continue;
                }
                dequeue_op = dequeue_sync(op);
            }
            else if (op_type == OP_STABLE)
            {
                dequeue_op = dequeue_stable(op);
            }
            if (dequeue_op)
            {
                submit_queue.erase(op_ptr);
            }
            else
            {
                // Drop any submission queue entries taken by the failed attempt
                ringloop->ring.sq.sqe_tail = prev_sqe_pos;
                if (op->wait_for == WAIT_SQE)
                {
                    // Retry only when more ring space is free than there was this time
                    op->wait_detail = 1 + ring_space;
                    // ring is full, stop submission
                    break;
                }
            }
        }
        flusher->loop();
        int ret = ringloop->submit();
        if (ret < 0)
        {
            throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret));
        }
    }
}
// Currently a stub: performs no work and always reports false.
bool blockstore::stop()
{
    const bool result = false;
    return result;
}
// Re-evaluate the condition <op> is waiting for and clear op->wait_for (set it to 0)
// once that condition no longer holds. op->wait_for is left untouched while the
// operation is still blocked.
//
// Handled wait reasons and what keeps the operation blocked:
//   WAIT_SQE            - free submission queue space is below op->wait_detail
//   WAIT_IN_FLIGHT      - dirty_db entry {op->oid, op->wait_detail} exists and is in flight
//   WAIT_JOURNAL        - journal.used_start is below op->wait_detail
//   WAIT_JOURNAL_BUFFER - the journal sector following cur_sector is still in use
// Any other value of op->wait_for throws std::runtime_error.
void blockstore::check_wait(blockstore_operation *op)
{
    bool still_blocked = false;
    if (op->wait_for == WAIT_SQE)
    {
        // stop submission if there's still no free space
        still_blocked = io_uring_sq_space_left(&ringloop->ring) < op->wait_detail;
    }
    else if (op->wait_for == WAIT_IN_FLIGHT)
    {
        auto dirty_it = dirty_db.find((obj_ver_id){
            .oid = op->oid,
            .version = op->wait_detail,
        });
        still_blocked = dirty_it != dirty_db.end() && IS_IN_FLIGHT(dirty_it->second.state);
    }
    else if (op->wait_for == WAIT_JOURNAL)
    {
        still_blocked = journal.used_start < op->wait_detail;
    }
    else if (op->wait_for == WAIT_JOURNAL_BUFFER)
    {
        still_blocked = journal.sector_info[((journal.cur_sector + 1) % journal.sector_count)].usage_count > 0;
    }
    else
    {
        throw std::runtime_error("BUG: op->wait_for value is unexpected");
    }
    if (!still_blocked)
    {
        // the wait condition is gone, the operation may be submitted
        op->wait_for = 0;
    }
}
// Validate <op>, reset its internal state and append it to the submit queue.
//
// Returns 0 when the operation was queued, or -EINVAL (nothing is queued) when:
//   - op->offset is not inside the block,
//   - op->offset + op->len extends past the end of the block,
//   - op->len is not a multiple of DISK_ALIGNMENT,
//   - the operation type is outside the OP_READ..OP_DELETE range.
// OP_WRITE operations are additionally passed to enqueue_write(). The ring loop
// consumer is woken up after queueing.
int blockstore::enqueue_op(blockstore_operation *op)
{
    const auto op_type = op->flags & OP_TYPE_MASK;
    // "len > block_size - offset", not ">=": a request that ends exactly at the
    // block boundary (including a full-block one at offset 0) is valid.
    // offset < block_size is checked first, so the subtraction cannot wrap.
    if (op->offset >= block_size || op->len > block_size-op->offset ||
        (op->len % DISK_ALIGNMENT) ||
        op_type < OP_READ || op_type > OP_DELETE)
    {
        // Basic verification not passed
        return -EINVAL;
    }
    op->wait_for = 0;
    op->sync_state = 0;
    op->pending_ops = 0;
    submit_queue.push_back(op);
    if (op_type == OP_WRITE)
    {
        enqueue_write(op);
    }
    ringloop->wakeup(ring_consumer);
    return 0;
}