Refactor blockstore_init_meta into slightly more obvious code

Vitaliy Filippov 2022-08-21 01:21:13 +03:00
parent d83580bd68
commit 153c73574a
2 changed files with 80 additions and 69 deletions

View File

@ -22,20 +22,19 @@ blockstore_init_meta::blockstore_init_meta(blockstore_impl_t *bs)
this->bs = bs; this->bs = bs;
} }
void blockstore_init_meta::handle_event(ring_data_t *data) void blockstore_init_meta::handle_event(ring_data_t *data, int buf_num)
{ {
if (data->res < 0) if (data->res < 0)
{ {
throw std::runtime_error( throw std::runtime_error(
std::string("read metadata failed at offset ") + std::to_string(metadata_read) + std::string("read metadata failed at offset ") + std::to_string(bufs[buf_num].offset) +
std::string(": ") + strerror(-data->res) std::string(": ") + strerror(-data->res)
); );
} }
prev_done = data->res > 0 ? submitted : 0; if (buf_num >= 0)
done_len = data->res; bufs[buf_num].state = 2;
done_pos = metadata_read; submitted--;
metadata_read += data->res; bs->ringloop->wakeup();
submitted = 0;
} }
int blockstore_init_meta::loop() int blockstore_init_meta::loop()
@ -58,12 +57,12 @@ int blockstore_init_meta::loop()
// Read superblock // Read superblock
GET_SQE(); GET_SQE();
data->iov = { metadata_buffer, bs->dsk.meta_block_size }; data->iov = { metadata_buffer, bs->dsk.meta_block_size };
data->callback = [this](ring_data_t *data) { handle_event(data); }; data->callback = [this](ring_data_t *data) { handle_event(data, -1); };
my_uring_prep_readv(sqe, bs->dsk.meta_fd, &data->iov, 1, bs->dsk.meta_offset); my_uring_prep_readv(sqe, bs->dsk.meta_fd, &data->iov, 1, bs->dsk.meta_offset);
bs->ringloop->submit(); bs->ringloop->submit();
submitted = 1; submitted++;
resume_1: resume_1:
if (submitted) if (submitted > 0)
{ {
wait_state = 1; wait_state = 1;
return 1; return 1;
@ -88,10 +87,10 @@ resume_1:
printf("Initializing metadata area\n"); printf("Initializing metadata area\n");
GET_SQE(); GET_SQE();
data->iov = (struct iovec){ metadata_buffer, bs->dsk.meta_block_size }; data->iov = (struct iovec){ metadata_buffer, bs->dsk.meta_block_size };
data->callback = [this](ring_data_t *data) { handle_event(data); }; data->callback = [this](ring_data_t *data) { handle_event(data, -1); };
my_uring_prep_writev(sqe, bs->dsk.meta_fd, &data->iov, 1, bs->dsk.meta_offset); my_uring_prep_writev(sqe, bs->dsk.meta_fd, &data->iov, 1, bs->dsk.meta_offset);
bs->ringloop->submit(); bs->ringloop->submit();
submitted = 1; submitted++;
resume_3: resume_3:
if (submitted > 0) if (submitted > 0)
{ {
@ -131,61 +130,64 @@ resume_1:
} }
// Skip superblock // Skip superblock
md_offset = bs->dsk.meta_block_size; md_offset = bs->dsk.meta_block_size;
metadata_read = bs->dsk.meta_block_size; next_offset = md_offset;
prev_done = 0;
done_len = 0;
done_pos = 0;
// Read the rest of the metadata // Read the rest of the metadata
while (1) resume_2:
if (next_offset < bs->dsk.meta_len && submitted == 0)
{ {
resume_2: // Submit one read
if (submitted) for (int i = 0; i < 2; i++)
{ {
wait_state = 2; if (!bufs[i].state)
return 1;
}
if (metadata_read < bs->dsk.meta_len)
{
GET_SQE();
data->iov = {
(uint8_t*)metadata_buffer + (bs->inmemory_meta
? metadata_read-md_offset
: (prev == 1 ? bs->metadata_buf_size : 0)),
bs->dsk.meta_len - metadata_read > bs->metadata_buf_size ? bs->metadata_buf_size : bs->dsk.meta_len - metadata_read,
};
data->callback = [this](ring_data_t *data) { handle_event(data); };
if (!zero_on_init)
my_uring_prep_readv(sqe, bs->dsk.meta_fd, &data->iov, 1, bs->dsk.meta_offset + metadata_read);
else
{ {
// Fill metadata with zeroes bufs[i].buf = (uint8_t*)metadata_buffer + (bs->inmemory_meta
memset(data->iov.iov_base, 0, data->iov.iov_len); ? next_offset-md_offset
my_uring_prep_writev(sqe, bs->dsk.meta_fd, &data->iov, 1, bs->dsk.meta_offset + metadata_read); : i*bs->metadata_buf_size);
bufs[i].offset = next_offset;
bufs[i].size = bs->dsk.meta_len-next_offset > bs->metadata_buf_size
? bs->metadata_buf_size : bs->dsk.meta_len-next_offset;
bufs[i].state = 1;
submitted++;
next_offset += bufs[i].size;
GET_SQE();
data->iov = { bufs[i].buf, bufs[i].size };
data->callback = [this, i](ring_data_t *data) { handle_event(data, i); };
if (!zero_on_init)
my_uring_prep_readv(sqe, bs->dsk.meta_fd, &data->iov, 1, bs->dsk.meta_offset + bufs[i].offset);
else
{
// Fill metadata with zeroes
memset(data->iov.iov_base, 0, data->iov.iov_len);
my_uring_prep_writev(sqe, bs->dsk.meta_fd, &data->iov, 1, bs->dsk.meta_offset + bufs[i].offset);
}
bs->ringloop->submit();
break;
} }
bs->ringloop->submit();
submitted = (prev == 1 ? 2 : 1);
prev = submitted;
} }
if (prev_done) }
for (int i = 0; i < 2; i++)
{
if (bufs[i].state == 2)
{ {
void *done_buf = bs->inmemory_meta // Handle result
? ((uint8_t*)metadata_buffer + done_pos-md_offset) unsigned entries_per_block = bs->dsk.meta_block_size / bs->dsk.clean_entry_size;
: ((uint8_t*)metadata_buffer + (prev_done == 2 ? bs->metadata_buf_size : 0)); for (uint64_t sector = 0; sector < bufs[i].size; sector += bs->dsk.meta_block_size)
unsigned count = bs->dsk.meta_block_size / bs->dsk.clean_entry_size;
for (int sector = 0; sector < done_len; sector += bs->dsk.meta_block_size)
{ {
// handle <count> entries // handle <count> entries
handle_entries((uint8_t*)done_buf + sector, count, bs->dsk.block_order); handle_entries(
done_cnt += count; bufs[i].buf + sector, entries_per_block,
((bufs[i].offset + sector - md_offset) / bs->dsk.meta_block_size) * entries_per_block
);
} }
prev_done = 0; bufs[i].state = 0;
done_len = 0; bs->ringloop->wakeup();
}
if (!submitted)
{
break;
} }
} }
if (submitted > 0)
{
wait_state = 2;
return 1;
}
// metadata read finished // metadata read finished
printf("Metadata entries loaded: %lu, free blocks: %lu / %lu\n", entries_loaded, bs->data_alloc->get_free_count(), bs->dsk.block_count); printf("Metadata entries loaded: %lu, free blocks: %lu / %lu\n", entries_loaded, bs->data_alloc->get_free_count(), bs->dsk.block_count);
if (!bs->inmemory_meta) if (!bs->inmemory_meta)
@ -198,8 +200,8 @@ resume_1:
GET_SQE(); GET_SQE();
my_uring_prep_fsync(sqe, bs->dsk.meta_fd, IORING_FSYNC_DATASYNC); my_uring_prep_fsync(sqe, bs->dsk.meta_fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 }; data->iov = { 0 };
data->callback = [this](ring_data_t *data) { handle_event(data); }; data->callback = [this](ring_data_t *data) { handle_event(data, -1); };
submitted = 1; submitted++;
bs->ringloop->submit(); bs->ringloop->submit();
resume_4: resume_4:
if (submitted > 0) if (submitted > 0)
@ -211,12 +213,12 @@ resume_1:
return 0; return 0;
} }
bool blockstore_init_meta::handle_entries(void* entries, unsigned count, int block_order) bool blockstore_init_meta::handle_entries(uint8_t *buf, uint64_t count, uint64_t done_cnt)
{ {
bool updated = false; bool updated = false;
for (unsigned i = 0; i < count; i++) for (uint64_t i = 0; i < count; i++)
{ {
clean_disk_entry *entry = (clean_disk_entry*)((uint8_t*)entries + i*bs->dsk.clean_entry_size); clean_disk_entry *entry = (clean_disk_entry*)(buf + i*bs->dsk.clean_entry_size);
if (!bs->inmemory_meta && bs->dsk.clean_entry_bitmap_size) if (!bs->inmemory_meta && bs->dsk.clean_entry_bitmap_size)
{ {
memcpy(bs->clean_bitmap + (done_cnt+i)*2*bs->dsk.clean_entry_bitmap_size, &entry->bitmap, 2*bs->dsk.clean_entry_bitmap_size); memcpy(bs->clean_bitmap + (done_cnt+i)*2*bs->dsk.clean_entry_bitmap_size, &entry->bitmap, 2*bs->dsk.clean_entry_bitmap_size);
@ -237,11 +239,11 @@ bool blockstore_init_meta::handle_entries(void* entries, unsigned count, int blo
memset(entry, 0, bs->dsk.clean_entry_size); memset(entry, 0, bs->dsk.clean_entry_size);
#ifdef BLOCKSTORE_DEBUG #ifdef BLOCKSTORE_DEBUG
printf("Free block %lu from %lx:%lx v%lu (new location is %lu)\n", printf("Free block %lu from %lx:%lx v%lu (new location is %lu)\n",
clean_it->second.location >> block_order, clean_it->second.location >> bs->dsk.block_order,
clean_it->first.inode, clean_it->first.stripe, clean_it->second.version, clean_it->first.inode, clean_it->first.stripe, clean_it->second.version,
done_cnt+i); done_cnt+i);
#endif #endif
bs->data_alloc->set(clean_it->second.location >> block_order, false); bs->data_alloc->set(clean_it->second.location >> bs->dsk.block_order, false);
} }
else else
{ {
@ -254,7 +256,7 @@ bool blockstore_init_meta::handle_entries(void* entries, unsigned count, int blo
bs->data_alloc->set(done_cnt+i, true); bs->data_alloc->set(done_cnt+i, true);
clean_db[entry->oid] = (struct clean_entry){ clean_db[entry->oid] = (struct clean_entry){
.version = entry->version, .version = entry->version,
.location = (done_cnt+i) << block_order, .location = (done_cnt+i) << bs->dsk.block_order,
}; };
} }
else else

View File

@ -3,20 +3,29 @@
#pragma once #pragma once
// Bookkeeping for one of the buffers used while scanning the metadata area
struct blockstore_init_meta_buf
{
    // Memory the request reads into (or writes zeroes from)
    uint8_t *buf{nullptr};
    // Length of the request in bytes
    uint64_t size{0};
    // Position of the request inside the metadata area
    uint64_t offset{0};
    // 0 = unused, 1 = request submitted, 2 = request completed, result not handled yet
    int state{0};
};
class blockstore_init_meta class blockstore_init_meta
{ {
blockstore_impl_t *bs; blockstore_impl_t *bs;
int wait_state = 0; int wait_state = 0;
bool zero_on_init = false; bool zero_on_init = false;
void *metadata_buffer = NULL; void *metadata_buffer = NULL;
uint64_t metadata_read = 0, md_offset = 0; blockstore_init_meta_buf bufs[2] = {};
int prev = 0, prev_done = 0, done_len = 0, submitted = 0; int submitted = 0;
uint64_t done_cnt = 0, done_pos = 0;
uint64_t entries_loaded = 0;
struct io_uring_sqe *sqe; struct io_uring_sqe *sqe;
struct ring_data_t *data; struct ring_data_t *data;
bool handle_entries(void *entries, unsigned count, int block_order); uint64_t md_offset = 0;
void handle_event(ring_data_t *data); uint64_t next_offset = 0;
uint64_t entries_loaded = 0;
bool handle_entries(uint8_t *buf, uint64_t count, uint64_t done_cnt);
void handle_event(ring_data_t *data, int buf_num);
public: public:
blockstore_init_meta(blockstore_impl_t *bs); blockstore_init_meta(blockstore_impl_t *bs);
int loop(); int loop();