Fix metadata area size calculation, print free space, wait for free space

FIXME: Now it crashes with -ENOSPC on linear overwrite
blocking-uring-test
Vitaliy Filippov 2019-11-28 20:23:26 +03:00
parent 9fa0d3325f
commit b6fff5a77e
8 changed files with 38 additions and 11 deletions

View File

@ -19,7 +19,7 @@ allocator::allocator(uint64_t blocks)
total -= p2; total -= p2;
total += (blocks+63) / 64; total += (blocks+63) / 64;
mask = new uint64_t[2 + total]; mask = new uint64_t[2 + total];
size = blocks; size = free = blocks;
last_one_mask = (blocks % 64) == 0 last_one_mask = (blocks % 64) == 0
? UINT64_MAX ? UINT64_MAX
: ~(UINT64_MAX << (64 - blocks % 64)); : ~(UINT64_MAX << (64 - blocks % 64));
@ -55,6 +55,10 @@ void allocator::set(uint64_t addr, bool value)
uint64_t bit = cur_addr % 64; uint64_t bit = cur_addr % 64;
if (((mask[last] >> bit) & 1) != value64) if (((mask[last] >> bit) & 1) != value64)
{ {
if (is_last)
{
free += value ? -1 : 1;
}
if (value) if (value)
{ {
mask[last] = mask[last] | (1l << bit); mask[last] = mask[last] | (1l << bit);
@ -120,3 +124,8 @@ uint64_t allocator::find_free()
} }
return addr; return addr;
} }
// Returns the number of data blocks currently unallocated.
// NOTE(review): `free` appears to be maintained incrementally by
// allocator::set() whenever a block's allocation bit actually flips
// (see the `free += value ? -1 : 1` hunk) — confirm against full source.
uint64_t allocator::get_free_count()
{
return free;
}

View File

@ -6,6 +6,7 @@
class allocator class allocator
{ {
uint64_t size; uint64_t size;
uint64_t free;
uint64_t last_one_mask; uint64_t last_one_mask;
uint64_t *mask; uint64_t *mask;
public: public:
@ -13,4 +14,5 @@ public:
~allocator(); ~allocator();
void set(uint64_t addr, bool value); void set(uint64_t addr, bool value);
uint64_t find_free(); uint64_t find_free();
uint64_t get_free_count();
}; };

View File

@ -260,6 +260,14 @@ void blockstore::check_wait(blockstore_operation *op)
} }
op->wait_for = 0; op->wait_for = 0;
} }
else if (op->wait_for == WAIT_FREE)
{
if (!data_alloc->get_free_count() && !flusher->is_active())
{
return;
}
op->wait_for = 0;
}
else else
{ {
throw std::runtime_error("BUG: op->wait_for value is unexpected"); throw std::runtime_error("BUG: op->wait_for value is unexpected");

View File

@ -198,6 +198,8 @@ public:
#define WAIT_JOURNAL 3 #define WAIT_JOURNAL 3
// Suspend operation until the next journal sector buffer is free // Suspend operation until the next journal sector buffer is free
#define WAIT_JOURNAL_BUFFER 4 #define WAIT_JOURNAL_BUFFER 4
// Suspend operation until there is some free space on the data device
#define WAIT_FREE 5
struct blockstore_operation struct blockstore_operation
{ {

View File

@ -72,7 +72,7 @@ int blockstore_init_meta::loop()
} }
} }
// metadata read finished // metadata read finished
printf("Metadata entries loaded: %d\n", entries_loaded); printf("Metadata entries loaded: %lu, free blocks: %lu / %lu\n", entries_loaded, bs->data_alloc->get_free_count(), bs->block_count);
free(metadata_buffer); free(metadata_buffer);
metadata_buffer = NULL; metadata_buffer = NULL;
return 0; return 0;
@ -93,10 +93,9 @@ void blockstore_init_meta::handle_entries(struct clean_disk_entry* entries, int
if (clean_it != bs->clean_db.end()) if (clean_it != bs->clean_db.end())
{ {
// free the previous block // free the previous block
bs->data_alloc->set(clean_it->second.version >> block_order, false); bs->data_alloc->set(clean_it->second.location >> block_order, false);
} }
else entries_loaded++;
entries_loaded++;
bs->data_alloc->set(done_cnt+i, true); bs->data_alloc->set(done_cnt+i, true);
bs->clean_db[entries[i].oid] = (struct clean_entry){ bs->clean_db[entries[i].oid] = (struct clean_entry){
.version = entries[i].version, .version = entries[i].version,
@ -287,7 +286,7 @@ resume_1:
} }
} }
} }
printf("Journal entries loaded: %d\n", entries_loaded); printf("Journal entries loaded: %lu, free blocks: %lu / %lu\n", entries_loaded, bs->data_alloc->get_free_count(), bs->block_count);
if (!bs->journal.inmemory) if (!bs->journal.inmemory)
{ {
free(journal_buffer); free(journal_buffer);
@ -397,6 +396,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
.len = bs->block_size, .len = bs->block_size,
.journal_sector = proc_pos, .journal_sector = proc_pos,
}); });
bs->data_alloc->set(je->big_write.location >> bs->block_order, true);
bs->journal.used_sectors[proc_pos]++; bs->journal.used_sectors[proc_pos]++;
auto & unstab = bs->unstable_writes[ov.oid]; auto & unstab = bs->unstable_writes[ov.oid];
unstab = unstab < ov.version ? ov.version : unstab; unstab = unstab < ov.version ? ov.version : unstab;

View File

@ -7,7 +7,7 @@ class blockstore_init_meta
uint8_t *metadata_buffer = NULL; uint8_t *metadata_buffer = NULL;
uint64_t metadata_read = 0; uint64_t metadata_read = 0;
int prev = 0, prev_done = 0, done_len = 0, submitted = 0, done_cnt = 0; int prev = 0, prev_done = 0, done_len = 0, submitted = 0, done_cnt = 0;
int entries_loaded = 0; uint64_t entries_loaded = 0;
struct io_uring_sqe *sqe; struct io_uring_sqe *sqe;
struct ring_data_t *data; struct ring_data_t *data;
void handle_entries(struct clean_disk_entry* entries, int count, int block_order); void handle_entries(struct clean_disk_entry* entries, int count, int block_order);
@ -21,7 +21,7 @@ class blockstore_init_journal
{ {
blockstore *bs; blockstore *bs;
int wait_state = 0, wait_count = 0; int wait_state = 0, wait_count = 0;
int entries_loaded = 0; uint64_t entries_loaded = 0;
void *journal_buffer = NULL; void *journal_buffer = NULL;
uint32_t crc32_last = 0; uint32_t crc32_last = 0;
bool started = false; bool started = false;

View File

@ -37,7 +37,7 @@ void blockstore::calc_lengths(blockstore_config_t & config)
} }
// required metadata size // required metadata size
block_count = data_len / block_size; block_count = data_len / block_size;
meta_len = (block_count / (512 / sizeof(clean_disk_entry))) * 512; meta_len = ((block_count - 1 + 512 / sizeof(clean_disk_entry)) / (512 / sizeof(clean_disk_entry))) * 512;
if (meta_area < meta_len) if (meta_area < meta_len)
{ {
throw std::runtime_error("Metadata area is too small"); throw std::runtime_error("Metadata area is too small");

View File

@ -49,7 +49,6 @@ void blockstore::enqueue_write(blockstore_operation *op)
// First step of the write algorithm: dequeue operation and submit initial write(s) // First step of the write algorithm: dequeue operation and submit initial write(s)
int blockstore::dequeue_write(blockstore_operation *op) int blockstore::dequeue_write(blockstore_operation *op)
{ {
auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
auto dirty_it = dirty_db.find((obj_ver_id){ auto dirty_it = dirty_db.find((obj_ver_id){
.oid = op->oid, .oid = op->oid,
.version = op->version, .version = op->version,
@ -61,6 +60,12 @@ int blockstore::dequeue_write(blockstore_operation *op)
if (loc == UINT64_MAX) if (loc == UINT64_MAX)
{ {
// no space // no space
if (flusher->is_active())
{
// hope that some space will be available after flush
op->wait_for = WAIT_FREE;
return 0;
}
op->retval = -ENOSPC; op->retval = -ENOSPC;
op->callback(op); op->callback(op);
return 1; return 1;
@ -87,7 +92,7 @@ int blockstore::dequeue_write(blockstore_operation *op)
op->iov_zerofill[0] = (struct iovec){ op->buf, op->len }; op->iov_zerofill[0] = (struct iovec){ op->buf, op->len };
data->iov.iov_len = op->len; // to check it in the callback data->iov.iov_len = op->len; // to check it in the callback
} }
data->callback = cb; data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); };
my_uring_prep_writev( my_uring_prep_writev(
sqe, data_fd, op->iov_zerofill, vcnt, data_offset + (loc << block_order) sqe, data_fd, op->iov_zerofill, vcnt, data_offset + (loc << block_order)
); );
@ -134,6 +139,7 @@ int blockstore::dequeue_write(blockstore_operation *op)
je->crc32_data = crc32c(0, op->buf, op->len); je->crc32_data = crc32c(0, op->buf, op->len);
je->crc32 = je_crc32((journal_entry*)je); je->crc32 = je_crc32((journal_entry*)je);
journal.crc32_last = je->crc32; journal.crc32_last = je->crc32;
auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
prepare_journal_sector_write(journal, sqe1, cb); prepare_journal_sector_write(journal, sqe1, cb);
op->min_used_journal_sector = op->max_used_journal_sector = 1 + journal.cur_sector; op->min_used_journal_sector = op->max_used_journal_sector = 1 + journal.cur_sector;
// Prepare journal data write // Prepare journal data write