diff --git a/allocator.cpp b/allocator.cpp
index 991aadd1..9eedf351 100644
--- a/allocator.cpp
+++ b/allocator.cpp
@@ -19,7 +19,7 @@ allocator::allocator(uint64_t blocks)
     total -= p2;
     total += (blocks+63) / 64;
     mask = new uint64_t[2 + total];
-    size = blocks;
+    size = free = blocks;
     last_one_mask = (blocks % 64) == 0
         ? UINT64_MAX
         : ~(UINT64_MAX << (64 - blocks % 64));
@@ -55,6 +55,10 @@ void allocator::set(uint64_t addr, bool value)
         uint64_t bit = cur_addr % 64;
         if (((mask[last] >> bit) & 1) != value64)
         {
+            if (is_last)
+            {
+                free += value ? -1 : 1;
+            }
             if (value)
             {
                 mask[last] = mask[last] | (1l << bit);
@@ -120,3 +124,8 @@ uint64_t allocator::find_free()
     }
     return addr;
 }
+
+uint64_t allocator::get_free_count()
+{
+    return free;
+}
diff --git a/allocator.h b/allocator.h
index 4adb2dfa..835071fd 100644
--- a/allocator.h
+++ b/allocator.h
@@ -6,6 +6,7 @@
 class allocator
 {
     uint64_t size;
+    uint64_t free;
     uint64_t last_one_mask;
     uint64_t *mask;
 public:
@@ -13,4 +14,5 @@ public:
     ~allocator();
     void set(uint64_t addr, bool value);
     uint64_t find_free();
+    uint64_t get_free_count();
 };
diff --git a/blockstore.cpp b/blockstore.cpp
index 4cbcaee0..ea06d9d8 100644
--- a/blockstore.cpp
+++ b/blockstore.cpp
@@ -260,6 +260,14 @@ void blockstore::check_wait(blockstore_operation *op)
         }
         op->wait_for = 0;
     }
+    else if (op->wait_for == WAIT_FREE)
+    {
+        if (!data_alloc->get_free_count() && flusher->is_active())
+        {
+            return;
+        }
+        op->wait_for = 0;
+    }
     else
     {
         throw std::runtime_error("BUG: op->wait_for value is unexpected");
diff --git a/blockstore.h b/blockstore.h
index 5bef2c8f..8101398a 100644
--- a/blockstore.h
+++ b/blockstore.h
@@ -198,6 +198,8 @@ public:
 #define WAIT_JOURNAL 3
 // Suspend operation until the next journal sector buffer is free
 #define WAIT_JOURNAL_BUFFER 4
+// Suspend operation until there is some free space on the data device
+#define WAIT_FREE 5
 
 struct blockstore_operation
 {
diff --git a/blockstore_init.cpp b/blockstore_init.cpp
index 9638f522..f526099c 100644
--- a/blockstore_init.cpp
+++ b/blockstore_init.cpp
@@ -72,7 +72,7 @@ int blockstore_init_meta::loop()
         }
     }
     // metadata read finished
-    printf("Metadata entries loaded: %d\n", entries_loaded);
+    printf("Metadata entries loaded: %lu, free blocks: %lu / %lu\n", entries_loaded, bs->data_alloc->get_free_count(), bs->block_count);
     free(metadata_buffer);
     metadata_buffer = NULL;
     return 0;
@@ -93,10 +93,9 @@ void blockstore_init_meta::handle_entries(struct clean_disk_entry* entries, int
             if (clean_it != bs->clean_db.end())
             {
                 // free the previous block
-                bs->data_alloc->set(clean_it->second.version >> block_order, false);
+                bs->data_alloc->set(clean_it->second.location >> block_order, false);
             }
-            else
-                entries_loaded++;
+            entries_loaded++;
             bs->data_alloc->set(done_cnt+i, true);
             bs->clean_db[entries[i].oid] = (struct clean_entry){
                 .version = entries[i].version,
@@ -287,7 +286,7 @@ resume_1:
             }
         }
     }
-    printf("Journal entries loaded: %d\n", entries_loaded);
+    printf("Journal entries loaded: %lu, free blocks: %lu / %lu\n", entries_loaded, bs->data_alloc->get_free_count(), bs->block_count);
     if (!bs->journal.inmemory)
     {
         free(journal_buffer);
@@ -397,6 +396,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t len)
                     .len = bs->block_size,
                     .journal_sector = proc_pos,
                 });
+                bs->data_alloc->set(je->big_write.location >> bs->block_order, true);
                 bs->journal.used_sectors[proc_pos]++;
                 auto & unstab = bs->unstable_writes[ov.oid];
                 unstab = unstab < ov.version ? ov.version : unstab;
diff --git a/blockstore_init.h b/blockstore_init.h
index db9ee77c..e85705fd 100644
--- a/blockstore_init.h
+++ b/blockstore_init.h
@@ -7,7 +7,7 @@ class blockstore_init_meta
     uint8_t *metadata_buffer = NULL;
     uint64_t metadata_read = 0;
     int prev = 0, prev_done = 0, done_len = 0, submitted = 0, done_cnt = 0;
-    int entries_loaded = 0;
+    uint64_t entries_loaded = 0;
     struct io_uring_sqe *sqe;
     struct ring_data_t *data;
     void handle_entries(struct clean_disk_entry* entries, int count, int block_order);
@@ -21,7 +21,7 @@ class blockstore_init_journal
 {
     blockstore *bs;
     int wait_state = 0, wait_count = 0;
-    int entries_loaded = 0;
+    uint64_t entries_loaded = 0;
     void *journal_buffer = NULL;
     uint32_t crc32_last = 0;
     bool started = false;
diff --git a/blockstore_open.cpp b/blockstore_open.cpp
index 8e0b70ee..0b34d252 100644
--- a/blockstore_open.cpp
+++ b/blockstore_open.cpp
@@ -37,7 +37,7 @@ void blockstore::calc_lengths(blockstore_config_t & config)
     }
     // required metadata size
     block_count = data_len / block_size;
-    meta_len = (block_count / (512 / sizeof(clean_disk_entry))) * 512;
+    meta_len = ((block_count - 1 + 512 / sizeof(clean_disk_entry)) / (512 / sizeof(clean_disk_entry))) * 512;
     if (meta_area < meta_len)
     {
         throw std::runtime_error("Metadata area is too small");
diff --git a/blockstore_write.cpp b/blockstore_write.cpp
index ee0857a6..498607bc 100644
--- a/blockstore_write.cpp
+++ b/blockstore_write.cpp
@@ -49,7 +49,6 @@ void blockstore::enqueue_write(blockstore_operation *op)
 // First step of the write algorithm: dequeue operation and submit initial write(s)
 int blockstore::dequeue_write(blockstore_operation *op)
 {
-    auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
     auto dirty_it = dirty_db.find((obj_ver_id){
         .oid = op->oid,
         .version = op->version,
@@ -61,6 +60,12 @@ int blockstore::dequeue_write(blockstore_operation *op)
         if (loc == UINT64_MAX)
         {
             // no space
+            if (flusher->is_active())
+            {
+                // hope that some space will be available after flush
+                op->wait_for = WAIT_FREE;
+                return 0;
+            }
             op->retval = -ENOSPC;
             op->callback(op);
             return 1;
@@ -87,7 +92,7 @@ int blockstore::dequeue_write(blockstore_operation *op)
             op->iov_zerofill[0] = (struct iovec){ op->buf, op->len };
             data->iov.iov_len = op->len; // to check it in the callback
         }
-        data->callback = cb;
+        data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); };
         my_uring_prep_writev(
             sqe, data_fd, op->iov_zerofill, vcnt, data_offset + (loc << block_order)
         );
@@ -134,6 +139,7 @@ int blockstore::dequeue_write(blockstore_operation *op)
         je->crc32_data = crc32c(0, op->buf, op->len);
         je->crc32 = je_crc32((journal_entry*)je);
         journal.crc32_last = je->crc32;
+        auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
         prepare_journal_sector_write(journal, sqe1, cb);
        op->min_used_journal_sector = op->max_used_journal_sector = 1 + journal.cur_sector;
         // Prepare journal data write
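
The allocator hunks above trade a bitmap rescan for a cached counter: free starts at the block count, is adjusted only at the leaf level of the hierarchical bitmap (the is_last check) and only when a bit actually flips, so setting an already-set bit or clearing an already-clear one leaves it untouched, and get_free_count() becomes an O(1) query that the WAIT_FREE path can poll. The following is a minimal single-level sketch of that accounting, not the Vitastor allocator itself; the names bitmap_alloc_sketch and free_count are illustrative.

// Minimal sketch of the free-block accounting added above. Assumptions: a
// single-level bitmap and illustrative names (bitmap_alloc_sketch, free_count);
// the real Vitastor allocator is a hierarchical 64-way bitmap, which is why it
// only touches its counter at the leaf level (is_last).
#include <cstdint>
#include <cstdio>
#include <vector>

class bitmap_alloc_sketch
{
    std::vector<uint64_t> mask;  // one bit per block, 1 = used
    uint64_t size;               // total number of blocks
    uint64_t free_count;         // cached number of free (zero) bits
public:
    explicit bitmap_alloc_sketch(uint64_t blocks)
        : mask((blocks + 63) / 64, 0), size(blocks), free_count(blocks)
    {
    }
    void set(uint64_t addr, bool used)
    {
        uint64_t word = addr / 64, bit = addr % 64;
        if (((mask[word] >> bit) & 1) != (used ? 1 : 0))
        {
            // Adjust the counter only when the bit actually flips,
            // mirroring the "if (is_last) free += value ? -1 : 1;" hunk.
            free_count += used ? -1 : 1;
            mask[word] ^= (uint64_t)1 << bit;
        }
    }
    uint64_t get_free_count() const
    {
        return free_count;
    }
    // Linear scan; the real allocator walks its 64-way tree instead.
    uint64_t find_free() const
    {
        for (uint64_t a = 0; a < size; a++)
            if (!((mask[a / 64] >> (a % 64)) & 1))
                return a;
        return UINT64_MAX;
    }
};

int main()
{
    bitmap_alloc_sketch alloc(128);
    alloc.set(alloc.find_free(), true);  // allocate block 0 -> 127 free
    alloc.set(5, true);                  // allocate block 5 -> 126 free
    alloc.set(5, true);                  // no-op: bit already set, counter unchanged
    alloc.set(5, false);                 // free block 5 -> 127 free
    printf("free blocks: %lu / 128\n", (unsigned long)alloc.get_free_count());
    return 0;
}

Updating the counter only where a state change is actually observed is what keeps it exact; it is also why the journal-replay hunk re-marks big-write locations as used, so the counter agrees with the bitmap by the time the init code prints the free-block totals.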