From 82a2b8e7d9cce498f34e992861d5bf622077124a Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Thu, 21 Nov 2019 23:45:19 +0300 Subject: [PATCH] Fix some extra bugs and it seems now it is even able to trim the journal --- blockstore_flush.cpp | 34 ++++++++++++++++++++++++---------- blockstore_init.cpp | 8 ++++---- blockstore_journal.h | 2 +- blockstore_stable.cpp | 12 ++++++++++-- test_blockstore.cpp | 2 +- 5 files changed, 40 insertions(+), 18 deletions(-) diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 133d4647..c0420f6b 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -172,7 +172,7 @@ resume_0: data->iov = (struct iovec){ v.back().buf, (size_t)submit_len }; data->callback = simple_callback; my_uring_prep_readv( - sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + dirty_it->second.location + offset + sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset + dirty_it->second.location + offset - dirty_it->second.offset ); wait_count++; } @@ -185,7 +185,6 @@ resume_0: else if (dirty_it->second.state == ST_D_STABLE) { // There is an unflushed big write. Copy small writes in its position - printf("found "); if (!skip_copy) { clean_loc = dirty_it->second.location; @@ -274,10 +273,12 @@ resume_0: } else meta_it->second.usage_count++; - wait_state = 3; resume_3: if (wait_count > 0) + { + wait_state = 3; return; + } // Reads completed, submit writes for (it = v.begin(); it != v.end(); it++) { @@ -297,12 +298,12 @@ resume_0: wait_state = 5; return; } - *((clean_disk_entry*)meta_it->second.buf + meta_pos) = { + ((clean_disk_entry*)meta_it->second.buf)[meta_pos] = { .oid = cur.oid, .version = cur.version, }; // I consider unordered writes to data & metadata safe here - // BUT it requires that journal entries even older than clean_db should be replayed after restart + // BUT it requires that journal entries even older than clean_db are replayed after restart await_sqe(6); data->iov = (struct iovec){ meta_it->second.buf, 512 }; data->callback = simple_callback; @@ -310,10 +311,12 @@ resume_0: sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector ); wait_count++; - wait_state = 7; resume_7: if (wait_count > 0) + { + wait_state = 7; return; + } // Done, free all buffers meta_it->second.usage_count--; if (meta_it->second.usage_count == 0) @@ -383,7 +386,7 @@ resume_0: .location = clean_loc, }; dirty_it = dirty_end; - do + while (1) { if (IS_BIG_WRITE(dirty_it->second.state) && dirty_it->second.location != clean_loc) { @@ -394,8 +397,16 @@ resume_0: { bs->journal.used_sectors.erase(dirty_it->second.journal_sector); } + if (dirty_it == bs->dirty_db.begin()) + { + break; + } dirty_it--; - } while (dirty_it != bs->dirty_db.begin() && dirty_it->first.oid == cur.oid); + if (dirty_it->first.oid != cur.oid) + { + break; + } + } // Then, basically, remove everything up to the current version from dirty_db... if (dirty_it->first.oid != cur.oid) dirty_it++; @@ -417,6 +428,7 @@ resume_0: else { bs->journal.used_start = journal_used_it->first; + // next_free does not need updating here } } else if (journal_used_it->first > bs->journal.used_start) @@ -444,10 +456,12 @@ resume_0: data->iov = (struct iovec){ flusher->journal_superblock, 512 }; my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset); wait_count++; - wait_state = 13; resume_13: if (wait_count > 0) + { + wait_state = 13; return; + } } do_not_trim: // All done @@ -458,8 +472,8 @@ resume_0: { // Requeue version flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second }); + flusher->sync_to_repeat.erase(repeat_it); } - flusher->sync_to_repeat.erase(repeat_it); goto resume_0; } } diff --git a/blockstore_init.cpp b/blockstore_init.cpp index 0da111e5..0cc61d3f 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -58,7 +58,7 @@ int blockstore_init_meta::loop() int count = 512 / sizeof(clean_disk_entry); for (int sector = 0; sector < done_len; sector += 512) { - clean_disk_entry *entries = (clean_disk_entry*)(metadata_buffer + (prev_done == 1 ? bs->metadata_buf_size : 0) + sector); + clean_disk_entry *entries = (clean_disk_entry*)(metadata_buffer + (prev_done == 2 ? bs->metadata_buf_size : 0) + sector); // handle entries handle_entries(entries, count, bs->block_order); done_cnt += count; @@ -88,12 +88,13 @@ void blockstore_init_meta::handle_entries(struct clean_disk_entry* entries, int auto clean_it = bs->clean_db.find(entries[i].oid); if (clean_it == end || clean_it->second.version < entries[i].version) { - entries_loaded++; if (clean_it != end) { // free the previous block allocator_set(bs->data_alloc, clean_it->second.version >> block_order, false); } + else + entries_loaded++; allocator_set(bs->data_alloc, done_cnt+i, true); bs->clean_db[entries[i].oid] = (struct clean_entry){ .version = entries[i].version, @@ -109,7 +110,6 @@ blockstore_init_journal::blockstore_init_journal(blockstore *bs) this->bs = bs; simple_callback = [this](ring_data_t *data1) { - printf("%d %d\n", data1->res, data1->iov.iov_len); if (data1->res != data1->iov.iov_len) { throw std::runtime_error(std::string("I/O operation failed while reading journal: ") + strerror(-data1->res)); @@ -235,7 +235,7 @@ resume_1: journal_pos = bs->journal.used_start = je_start->journal_start; crc32_last = 0; // Read journal - while (true) + while (1) { resume_2: if (submitted) diff --git a/blockstore_journal.h b/blockstore_journal.h index a68de6d5..0185dbe9 100644 --- a/blockstore_journal.h +++ b/blockstore_journal.h @@ -122,7 +122,7 @@ struct journal_t journal_sector_info_t *sector_info; uint64_t sector_count; int cur_sector = 0; - int in_sector_pos = 512; // no free space because sector is initially inmapped + int in_sector_pos = 512; // no free space because sector is initially unmapped // Used sector map // May use ~ 80 MB per 1 GB of used journal space in the worst case diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp index a5d299af..3bcc7bd5 100644 --- a/blockstore_stable.cpp +++ b/blockstore_stable.cpp @@ -126,7 +126,7 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op auto dirty_it = dirty_db.find(*v); if (dirty_it != dirty_db.end()) { - do + while (1) { if (dirty_it->second.state == ST_J_SYNCED) { @@ -140,8 +140,16 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op { break; } + if (dirty_it == dirty_db.begin()) + { + break; + } dirty_it--; - } while (dirty_it != dirty_db.begin() && dirty_it->first.oid == v->oid); + if (dirty_it->first.oid != v->oid) + { + break; + } + } flusher->queue_flush(*v); } } diff --git a/test_blockstore.cpp b/test_blockstore.cpp index aae3d760..8afdf0cf 100644 --- a/test_blockstore.cpp +++ b/test_blockstore.cpp @@ -138,7 +138,7 @@ int main(int narg, char *args[]) }; ringloop->register_consumer(main_cons); - while (true) + while (1) { ringloop->loop(); }