diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 35dec441..f2f0e7a9 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -8,10 +8,12 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore_impl_t *bs) this->bs = bs; this->flusher_count = flusher_count; dequeuing = false; + trimming = false; active_flushers = 0; syncing_flushers = 0; + // FIXME: allow to configure flusher_start_threshold and journal_trim_interval flusher_start_threshold = bs->journal_block_size / sizeof(journal_entry_stable); - journal_trim_interval = flusher_start_threshold; + journal_trim_interval = 512; journal_trim_counter = 0; journal_superblock = bs->journal.inmemory ? bs->journal.buffer : memalign_or_die(MEM_ALIGNMENT, bs->journal_block_size); co = new journal_flusher_co[flusher_count]; @@ -172,6 +174,12 @@ bool journal_flusher_co::loop() goto resume_17; else if (wait_state == 18) goto resume_18; + else if (wait_state == 19) + goto resume_19; + else if (wait_state == 20) + goto resume_20; + else if (wait_state == 21) + goto resume_21; resume_0: if (!flusher->flush_queue.size() || !flusher->dequeuing) { @@ -484,9 +492,18 @@ resume_1: if (!((++flusher->journal_trim_counter) % flusher->journal_trim_interval) || flusher->trim_wanted > 0) { flusher->journal_trim_counter = 0; - if (bs->journal.trim()) + new_trim_pos = bs->journal.get_trim_pos(); + if (new_trim_pos != bs->journal.used_start) { - // Update journal "superblock" + resume_19: + // Wait for other coroutines trimming the journal, if any + if (flusher->trimming) + { + wait_state = 19; + return false; + } + flusher->trimming = true; + // First update journal "superblock" and only then update in memory await_sqe(12); *((journal_entry_start*)flusher->journal_superblock) = { .crc32 = 0, @@ -494,7 +511,7 @@ resume_1: .type = JE_START, .size = sizeof(journal_entry_start), .reserved = 0, - .journal_start = bs->journal.used_start, + .journal_start = new_trim_pos, }; ((journal_entry_start*)flusher->journal_superblock)->crc32 = je_crc32((journal_entry*)flusher->journal_superblock); data->iov = (struct iovec){ flusher->journal_superblock, bs->journal_block_size }; @@ -507,6 +524,24 @@ resume_1: wait_state = 13; return false; } + if (!bs->disable_journal_fsync) + { + await_sqe(20); + my_uring_prep_fsync(sqe, bs->journal.fd, IORING_FSYNC_DATASYNC); + data->iov = { 0 }; + data->callback = simple_callback_w; + resume_21: + if (wait_count > 0) + { + wait_state = 21; + return false; + } + } + bs->journal.used_start = new_trim_pos; +#ifdef BLOCKSTORE_DEBUG + printf("Journal trimmed to %08lx (next_free=%08lx)\n", bs->journal.used_start, bs->journal.next_free); +#endif + flusher->trimming = false; } } // All done diff --git a/blockstore_flush.h b/blockstore_flush.h index f8a1be3c..b0dc3c18 100644 --- a/blockstore_flush.h +++ b/blockstore_flush.h @@ -59,6 +59,8 @@ class journal_flusher_co uint64_t clean_bitmap_offset, clean_bitmap_len; void *new_clean_bitmap; + uint64_t new_trim_pos; + // local: scan_dirty() uint64_t offset, end_offset, submit_offset, submit_len; @@ -85,6 +87,7 @@ class journal_flusher_t friend class journal_flusher_co; int journal_trim_counter, journal_trim_interval; + bool trimming; void* journal_superblock; int active_flushers; diff --git a/blockstore_init.cpp b/blockstore_init.cpp index 3d061628..caed4190 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -399,8 +399,6 @@ resume_1: } } } - // Trim journal on start so we don't stall when all entries are older - bs->journal.trim(); bs->journal.dirty_start = bs->journal.next_free; printf( "Journal entries loaded: %lu, free journal space: %lu bytes (%08lx..%08lx is used), free blocks: %lu / %lu\n", diff --git a/blockstore_journal.cpp b/blockstore_journal.cpp index a9be5f03..22ea483e 100644 --- a/blockstore_journal.cpp +++ b/blockstore_journal.cpp @@ -184,7 +184,7 @@ journal_t::~journal_t() buffer = NULL; } -bool journal_t::trim() +uint64_t journal_t::get_trim_pos() { auto journal_used_it = used_sectors.lower_bound(used_start); #ifdef BLOCKSTORE_DEBUG @@ -202,26 +202,19 @@ bool journal_t::trim() if (journal_used_it == used_sectors.end()) { // Journal is empty - used_start = next_free; + return next_free; } else { - used_start = journal_used_it->first; - // next_free does not need updating here + // next_free does not need updating during trim + return journal_used_it->first; } } else if (journal_used_it->first > used_start) { // Journal is cleared up to - used_start = journal_used_it->first; + return journal_used_it->first; } - else - { - // Can't trim journal - return false; - } -#ifdef BLOCKSTORE_DEBUG - printf("Journal trimmed to %08lx (next_free=%08lx)\n", used_start, next_free); -#endif - return true; + // Can't trim journal + return used_start; } diff --git a/blockstore_journal.h b/blockstore_journal.h index bda2cd77..9fe91061 100644 --- a/blockstore_journal.h +++ b/blockstore_journal.h @@ -169,6 +169,7 @@ struct journal_t ~journal_t(); bool trim(); + uint64_t get_trim_pos(); }; struct blockstore_journal_check_t diff --git a/blockstore_rollback.cpp b/blockstore_rollback.cpp index 35f2ddfa..717e77ae 100644 --- a/blockstore_rollback.cpp +++ b/blockstore_rollback.cpp @@ -148,7 +148,6 @@ resume_5: { mark_rolled_back(*v); } - journal.trim(); // Acknowledge op op->retval = 0; FINISH_OP(op);