// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)

#include "blockstore_impl.h"

#define META_BLOCK_UNREAD 0
#define META_BLOCK_READ 1

journal_flusher_t::journal_flusher_t(blockstore_impl_t *bs)
{
    this->bs = bs;
    this->max_flusher_count = bs->max_flusher_count;
    this->min_flusher_count = bs->min_flusher_count;
    this->cur_flusher_count = bs->min_flusher_count;
    this->target_flusher_count = bs->min_flusher_count;
    dequeuing = false;
    trimming = false;
    active_flushers = 0;
    syncing_flushers = 0;
    // FIXME: allow to configure flusher_start_threshold and journal_trim_interval
    flusher_start_threshold = bs->dsk.journal_block_size / sizeof(journal_entry_stable);
    journal_trim_counter = bs->journal.flush_journal ? 1 : 0;
    trim_wanted = bs->journal.flush_journal ? 1 : 0;
    journal_superblock = bs->journal.inmemory ? bs->journal.buffer : memalign_or_die(MEM_ALIGNMENT, bs->dsk.journal_block_size);
    co = new journal_flusher_co[max_flusher_count];
    for (int i = 0; i < max_flusher_count; i++)
    {
        co[i].bs = bs;
        co[i].flusher = this;
    }
}

journal_flusher_co::journal_flusher_co()
{
    wait_state = 0;
    simple_callback_r = [this](ring_data_t* data)
    {
        bs->live = true;
        if (data->res != data->iov.iov_len)
            bs->disk_error_abort("read operation during flush", data->res, data->iov.iov_len);
        wait_count--;
    };
    simple_callback_rj = [this](ring_data_t* data)
    {
        bs->live = true;
        if (data->res != data->iov.iov_len)
            bs->disk_error_abort("read operation during flush", data->res, data->iov.iov_len);
        wait_journal_count--;
    };
    simple_callback_w = [this](ring_data_t* data)
    {
        bs->live = true;
        if (data->res != data->iov.iov_len)
            bs->disk_error_abort("write operation during flush", data->res, data->iov.iov_len);
        wait_count--;
    };
}

journal_flusher_t::~journal_flusher_t()
{
    if (!bs->journal.inmemory)
        free(journal_superblock);
    delete[] co;
}

bool journal_flusher_t::is_active()
{
    return active_flushers > 0 || dequeuing;
}

void journal_flusher_t::loop()
{
    target_flusher_count = bs->write_iodepth*2;
    if (target_flusher_count < min_flusher_count)
        target_flusher_count = min_flusher_count;
    else if (target_flusher_count > max_flusher_count)
        target_flusher_count = max_flusher_count;
    if (target_flusher_count > cur_flusher_count)
        cur_flusher_count = target_flusher_count;
    else if (target_flusher_count < cur_flusher_count)
    {
        while (target_flusher_count < cur_flusher_count)
        {
            if (co[cur_flusher_count-1].wait_state)
                break;
            cur_flusher_count--;
        }
    }
    if (trim_wanted)
        co[0].try_trim = true;
    for (int i = 0; (active_flushers > 0 || dequeuing || trim_wanted > 0) && i < cur_flusher_count; i++)
        co[i].loop();
}

void journal_flusher_t::enqueue_flush(obj_ver_id ov)
{
#ifdef BLOCKSTORE_DEBUG
    printf("enqueue_flush %jx:%jx v%ju\n", ov.oid.inode, ov.oid.stripe, ov.version);
#endif
    auto it = flush_versions.find(ov.oid);
    if (it != flush_versions.end())
    {
        if (it->second < ov.version)
            it->second = ov.version;
    }
    else
    {
        flush_versions[ov.oid] = ov.version;
        flush_queue.push_back(ov.oid);
    }
    if (!dequeuing && (flush_queue.size() >= flusher_start_threshold || trim_wanted > 0))
    {
        dequeuing = true;
        bs->ringloop->wakeup();
    }
}

void journal_flusher_t::unshift_flush(obj_ver_id ov, bool force)
{
#ifdef BLOCKSTORE_DEBUG
    printf("unshift_flush %jx:%jx v%ju\n", ov.oid.inode, ov.oid.stripe, ov.version);
#endif
    auto it = flush_versions.find(ov.oid);
    if (it != flush_versions.end())
    {
        if (it->second < ov.version)
            it->second = ov.version;
    }
    else
    {
        flush_versions[ov.oid] = ov.version;
        if (!force)
            flush_queue.push_front(ov.oid);
    }
    if (force)
        flush_queue.push_front(ov.oid);
    if (force || !dequeuing && (flush_queue.size() >= flusher_start_threshold || trim_wanted > 0))
    {
        dequeuing = true;
        bs->ringloop->wakeup();
    }
}

void journal_flusher_t::remove_flush(object_id oid)
{
#ifdef BLOCKSTORE_DEBUG
    printf("undo_flush %jx:%jx\n", oid.inode, oid.stripe);
#endif
    auto v_it = flush_versions.find(oid);
    if (v_it != flush_versions.end())
    {
        flush_versions.erase(v_it);
        for (auto q_it = flush_queue.begin(); q_it != flush_queue.end(); q_it++)
        {
            if (*q_it == oid)
            {
                flush_queue.erase(q_it);
                break;
            }
        }
    }
}

bool journal_flusher_t::is_mutated(uint64_t clean_loc)
{
    for (int i = 0; i < cur_flusher_count; i++)
    {
        if (co[i].clean_loc == clean_loc && co[i].copy_count > 0)
        {
            return true;
        }
    }
    return false;
}

void journal_flusher_t::request_trim()
{
    dequeuing = true;
    trim_wanted++;
    bs->ringloop->wakeup();
}

void journal_flusher_t::mark_trim_possible()
{
    if (trim_wanted > 0)
    {
        dequeuing = true;
        journal_trim_counter = 0;
        bs->ringloop->wakeup();
    }
}

void journal_flusher_t::release_trim()
{
    trim_wanted--;
}

void journal_flusher_t::dump_diagnostics()
{
    const char *unflushable_type = "";
    obj_ver_id unflushable = {};
    // Try to find out if there is a flushable object for information
    for (object_id cur_oid: flush_queue)
    {
        obj_ver_id cur = { .oid = cur_oid, .version = flush_versions[cur_oid] };
        auto dirty_end = bs->dirty_db.find(cur);
        if (dirty_end == bs->dirty_db.end())
        {
            // Already flushed
            continue;
        }
        auto repeat_it = sync_to_repeat.find(cur.oid);
        if (repeat_it != sync_to_repeat.end())
        {
            // Someone is already flushing it
            unflushable_type = "locked,";
            unflushable = cur;
            break;
        }
        if (dirty_end->second.journal_sector >= bs->journal.dirty_start &&
            (bs->journal.dirty_start >= bs->journal.used_start ||
            dirty_end->second.journal_sector < bs->journal.used_start))
        {
            // Object is more recent than possible to flush
            bool found = try_find_older(dirty_end, cur);
            if (!found)
            {
                unflushable_type = "dirty,";
                unflushable = cur;
                break;
            }
        }
        unflushable_type = "ok,";
        unflushable = cur;
        break;
    }
    printf(
        "Flusher: queued=%zd first=%s%jx:%jx trim_wanted=%d dequeuing=%d trimming=%d cur=%d target=%d active=%d syncing=%d\n",
        flush_queue.size(), unflushable_type, unflushable.oid.inode, unflushable.oid.stripe,
        trim_wanted, dequeuing, trimming, cur_flusher_count, target_flusher_count,
        active_flushers, syncing_flushers
    );
}

bool journal_flusher_t::try_find_older(std::map<obj_ver_id, dirty_entry>::iterator & dirty_end, obj_ver_id & cur)
{
    bool found = false;
    while (dirty_end != bs->dirty_db.begin())
    {
        dirty_end--;
        if (dirty_end->first.oid != cur.oid)
        {
            break;
        }
        if (!(dirty_end->second.journal_sector >= bs->journal.dirty_start &&
            (bs->journal.dirty_start >= bs->journal.used_start ||
            dirty_end->second.journal_sector < bs->journal.used_start)))
        {
            found = true;
            cur.version = dirty_end->first.version;
            break;
        }
    }
    return found;
}

bool journal_flusher_t::try_find_other(std::map<obj_ver_id, dirty_entry>::iterator & dirty_end, obj_ver_id & cur)
{
    int search_left = flush_queue.size() - 1;
#ifdef BLOCKSTORE_DEBUG
    printf("Flusher overran writers (%jx:%jx v%ju, dirty_start=%08jx) - searching for older flushes (%d left)\n",
        cur.oid.inode, cur.oid.stripe, cur.version, bs->journal.dirty_start, search_left);
#endif
    while (search_left > 0)
    {
        cur.oid = flush_queue.front();
        cur.version = flush_versions[cur.oid];
        flush_queue.pop_front();
        flush_versions.erase(cur.oid);
        dirty_end = bs->dirty_db.find(cur);
        if (dirty_end != bs->dirty_db.end())
        {
            if (dirty_end->second.journal_sector >= bs->journal.dirty_start &&
                (bs->journal.dirty_start >= bs->journal.used_start ||
                dirty_end->second.journal_sector < bs->journal.used_start))
            {
#ifdef BLOCKSTORE_DEBUG
                printf("Write %jx:%jx v%ju is too new: offset=%08jx\n", cur.oid.inode, cur.oid.stripe, cur.version, dirty_end->second.journal_sector);
#endif
                enqueue_flush(cur);
            }
            else
            {
                auto repeat_it = sync_to_repeat.find(cur.oid);
                if (repeat_it != sync_to_repeat.end())
                {
                    if (repeat_it->second < cur.version)
                        repeat_it->second = cur.version;
                }
                else
                {
                    sync_to_repeat[cur.oid] = 0;
                    break;
                }
            }
        }
        search_left--;
    }
    if (search_left <= 0)
    {
#ifdef BLOCKSTORE_DEBUG
        printf("No older flushes, stopping\n");
#endif
    }
    return search_left > 0;
}

#define await_sqe(label) \
    resume_##label:\
    sqe = bs->get_sqe();\
    if (!sqe)\
    {\
        wait_state = wait_base+label;\
        return false;\
    }\
    data = ((ring_data_t*)sqe->user_data);

bool journal_flusher_co::loop()
{
    int wait_base = 0;
    // This is much better than implementing the whole function as an FSM
    // Maybe I should consider a coroutine library like https://github.com/hnes/libaco ...
    // Or just C++ coroutines, but they require some wrappers
    if (wait_state == 1) goto resume_1;
    else if (wait_state == 2) goto resume_2;
    else if (wait_state == 3) goto resume_3;
    else if (wait_state == 4) goto resume_4;
    else if (wait_state == 5) goto resume_5;
    else if (wait_state == 6) goto resume_6;
    else if (wait_state == 7) goto resume_7;
    else if (wait_state == 8) goto resume_8;
    else if (wait_state == 9) goto resume_9;
    else if (wait_state == 10) goto resume_10;
    else if (wait_state == 11) goto resume_11;
    else if (wait_state == 12) goto resume_12;
    else if (wait_state == 13) goto resume_13;
    else if (wait_state == 14) goto resume_14;
    else if (wait_state == 15) goto resume_15;
    else if (wait_state == 16) goto resume_16;
    else if (wait_state == 17) goto resume_17;
    else if (wait_state == 18) goto resume_18;
    else if (wait_state == 19) goto resume_19;
    else if (wait_state == 20) goto resume_20;
    else if (wait_state == 21) goto resume_21;
    else if (wait_state == 22) goto resume_22;
    else if (wait_state == 23) goto resume_23;
    else if (wait_state == 24) goto resume_24;
    else if (wait_state == 25) goto resume_25;
    else if (wait_state == 26) goto resume_26;
    else if (wait_state == 27) goto resume_27;
    else if (wait_state == 28) goto resume_28;
    else if (wait_state == 29) goto resume_29;
    else if (wait_state == 30) goto resume_30;
resume_0:
    if (flusher->flush_queue.size() < flusher->min_flusher_count && !flusher->trim_wanted ||
        !flusher->flush_queue.size() || !flusher->dequeuing)
    {
stop_flusher:
        if (flusher->trim_wanted > 0 && try_trim)
        {
            // Attempt forced trim
            try_trim = false;
            flusher->active_flushers++;
            goto trim_journal;
        }
        flusher->dequeuing = false;
        wait_state = 0;
        return true;
    }
    try_trim = true;
    cur.oid = flusher->flush_queue.front();
    cur.version = flusher->flush_versions[cur.oid];
    flusher->flush_queue.pop_front();
    flusher->flush_versions.erase(cur.oid);
    dirty_end = bs->dirty_db.find(cur);
    if (dirty_end != bs->dirty_db.end())
    {
        repeat_it = flusher->sync_to_repeat.find(cur.oid);
        if (repeat_it != flusher->sync_to_repeat.end())
        {
#ifdef BLOCKSTORE_DEBUG
            printf("Postpone %jx:%jx v%ju\n", cur.oid.inode, cur.oid.stripe, cur.version);
#endif
            // We don't flush different parts of history of the same object in parallel
            // So we check if someone is already flushing this object
            // In that case we set sync_to_repeat and pick another object
            // Another coroutine will see it and re-queue the object after it finishes
            if (repeat_it->second < cur.version)
                repeat_it->second = cur.version;
            wait_state = 0;
            goto resume_0;
        }
        else
            flusher->sync_to_repeat[cur.oid] = 0;
        if (dirty_end->second.journal_sector >= bs->journal.dirty_start &&
            (bs->journal.dirty_start >= bs->journal.used_start ||
            dirty_end->second.journal_sector < bs->journal.used_start))
        {
            flusher->enqueue_flush(cur);
            // We can't flush journal sectors that are still written to
            // However, as we group flushes by oid, current oid may have older writes to flush!
            // And it may even block writes if we don't flush the older version
            // (if it's in the beginning of the journal)...
            // So first try to find an older version of the same object to flush.
            if (!flusher->try_find_older(dirty_end, cur))
            {
                // Try other objects
                flusher->sync_to_repeat.erase(cur.oid);
                if (!flusher->try_find_other(dirty_end, cur))
                {
                    cur.oid = {};
                    goto stop_flusher;
                }
            }
        }
#ifdef BLOCKSTORE_DEBUG
        printf("Flushing %jx:%jx v%ju\n", cur.oid.inode, cur.oid.stripe, cur.version);
#endif
        flusher->active_flushers++;
        // Find it in clean_db
        {
            auto & clean_db = bs->clean_db_shard(cur.oid);
            auto clean_it = clean_db.find(cur.oid);
            old_clean_ver = (clean_it != clean_db.end() ? clean_it->second.version : 0);
            old_clean_loc = (clean_it != clean_db.end() ? clean_it->second.location : UINT64_MAX);
        }
        // Scan dirty versions of the object to determine what we need to read
        scan_dirty();
        // Writes and deletes shouldn't happen at the same time
        assert(!has_writes || !has_delete);
        if (!has_writes && !has_delete || has_delete && old_clean_loc == UINT64_MAX)
        {
            // Nothing to flush
            bs->erase_dirty(dirty_start, std::next(dirty_end), clean_loc);
            goto release_oid;
        }
        if (clean_loc == UINT64_MAX)
        {
            if (old_clean_loc == UINT64_MAX)
            {
                // Object not allocated. This is a bug.
                char err[1024];
                snprintf(
                    err, 1024, "BUG: Object %jx:%jx v%ju that we are trying to flush is not allocated on the data device",
                    cur.oid.inode, cur.oid.stripe, cur.version
                );
                throw std::runtime_error(err);
            }
            else
            {
                clean_loc = old_clean_loc;
                clean_ver = old_clean_ver;
            }
        }
        // Submit dirty data and old checksum data reads
    resume_1:
    resume_2:
        if (!read_dirty(1))
            return false;
        // Also we may need to read metadata. We do read-modify-write cycle(s) for every operation.
    resume_3:
    resume_4:
        if (!modify_meta_do_reads(3))
            return false;
        // Now, if csum_block_size is > bitmap_granularity and if we are doing partial checksum block updates,
        // perform a trick: clear bitmap bits in the metadata entry and recalculate block checksum with zeros
        // in place of overwritten parts. Then, even if the actual partial update fully or partially fails,
        // we'll have a correct checksum because it won't include overwritten parts!
        // The same thing actually happens even when csum_block_size == bitmap_granularity, but in that case
        // we never need to read (and thus verify) overwritten parts from the data device.
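        // Illustrative example (not from the original source), assuming csum_block_size=4096 and
        // bitmap_granularity=512: a small write covering bytes 512..1023 of a 4 KB checksum block
        // first gets that bitmap bit cleared and the block checksum recomputed as if those 512
        // bytes were zero (clear_incomplete_csum_block_bits below), and only then is the data
        // write submitted - so a torn partial write is never covered by a checksum that already
        // includes it.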
    resume_5:
    resume_6:
    resume_7:
    resume_8:
    resume_9:
    resume_10:
    resume_11:
    resume_12:
        if (fill_incomplete && !clear_incomplete_csum_block_bits(5))
            return false;
        // Wait for journal data reads if the journal is not inmemory
    resume_13:
        if (wait_journal_count > 0)
        {
            wait_state = wait_base+13;
            return false;
        }
        if (bs->dsk.csum_block_size)
        {
            // Mark objects used by reads as modified
            auto uo_it = bs->used_clean_objects.find(clean_loc);
            if (uo_it != bs->used_clean_objects.end())
            {
                uo_it->second.was_changed = true;
            }
        }
        // Submit data writes
        for (it = v.begin(); it != v.end(); it++)
        {
            if (it->copy_flags == COPY_BUF_JOURNAL || it->copy_flags == (COPY_BUF_JOURNAL|COPY_BUF_COALESCED))
            {
                await_sqe(14);
                data->iov = (struct iovec){ it->buf, (size_t)it->len };
                data->callback = simple_callback_w;
                my_uring_prep_writev(
                    sqe, bs->dsk.data_fd, &data->iov, 1, bs->dsk.data_offset + clean_loc + it->offset
                );
                wait_count++;
            }
        }
        // Wait for data writes and metadata reads
    resume_15:
    resume_16:
        if (!wait_meta_reads(15))
            return false;
        // Sync data before writing metadata
    resume_17:
    resume_18:
    resume_19:
        if (copy_count && !fsync_batch(false, 17))
            return false;
        // Modify the new metadata entry
        update_metadata_entry();
        // Update clean_db - it must be equal to the metadata entry
        update_clean_db();
        // And write metadata entries
        if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
        {
            // zero out old metadata entry
            {
                clean_disk_entry *old_entry = (clean_disk_entry*)((uint8_t*)meta_old.buf + meta_old.pos*bs->dsk.clean_entry_size);
                if (old_entry->oid.inode != 0 && old_entry->oid != cur.oid)
                {
                    printf("Fatal error (metadata corruption or bug): tried to wipe metadata entry %ju (%jx:%jx v%ju) as old location of %jx:%jx\n",
                        old_clean_loc >> bs->dsk.block_order, old_entry->oid.inode, old_entry->oid.stripe,
                        old_entry->version, cur.oid.inode, cur.oid.stripe);
                    exit(1);
                }
            }
            memset((uint8_t*)meta_old.buf + meta_old.pos*bs->dsk.clean_entry_size, 0, bs->dsk.clean_entry_size);
    resume_20:
            if (meta_old.sector != meta_new.sector && !write_meta_block(meta_old, 20))
                return false;
        }
    resume_21:
        if (!write_meta_block(meta_new, 21))
            return false;
    resume_22:
        if (wait_count > 0)
        {
            wait_state = wait_base+22;
            return false;
        }
        // Done, free all buffers
        free_buffers();
        // And sync metadata (in batches - not per each operation!)
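        // Note (added comment): fsync_batch() below coalesces this metadata fsync with other
        // flusher coroutines - it only submits the fsync once syncing_flushers catches up with
        // active_flushers or the flush queue drains, so a single fsync covers a whole batch.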
    resume_23:
    resume_24:
    resume_25:
        if (!fsync_batch(true, 23))
            return false;
        // Free the data block only when metadata is synced
        free_data_blocks();
        // Erase dirty_db entries
        bs->erase_dirty(dirty_start, std::next(dirty_end), clean_loc);
#ifdef BLOCKSTORE_DEBUG
        printf("Flushed %jx:%jx v%ju (%d copies, wr:%d, del:%d), %jd left\n", cur.oid.inode, cur.oid.stripe, cur.version,
            copy_count, has_writes, has_delete, flusher->flush_queue.size());
#endif
    release_oid:
        repeat_it = flusher->sync_to_repeat.find(cur.oid);
        if (repeat_it != flusher->sync_to_repeat.end() && repeat_it->second > cur.version)
        {
            // Requeue version
            flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second }, false);
        }
        flusher->sync_to_repeat.erase(repeat_it);
    trim_journal:
        // Clear unused part of the journal every journal_trim_interval flushes
        if (bs->journal_trim_interval && !((++flusher->journal_trim_counter) % bs->journal_trim_interval) ||
            flusher->trim_wanted > 0)
        {
    resume_26:
    resume_27:
    resume_28:
    resume_29:
    resume_30:
            if (!trim_journal(26))
                return false;
        }
        // All done
        flusher->active_flushers--;
        wait_state = 0;
        goto resume_0;
    }
    return true;
}

void journal_flusher_co::update_metadata_entry()
{
    clean_disk_entry *new_entry = (clean_disk_entry*)((uint8_t*)meta_new.buf + meta_new.pos*bs->dsk.clean_entry_size);
    if (new_entry->oid.inode != 0 && new_entry->oid != cur.oid)
    {
        printf(
            has_delete
                ? "Fatal error (metadata corruption or bug): tried to delete metadata entry %ju (%jx:%jx v%ju) while deleting %jx:%jx v%ju\n"
                : "Fatal error (metadata corruption or bug): tried to overwrite non-zero metadata entry %ju (%jx:%jx v%ju) with %jx:%jx v%ju\n",
            clean_loc >> bs->dsk.block_order, new_entry->oid.inode, new_entry->oid.stripe, new_entry->version,
            cur.oid.inode, cur.oid.stripe, cur.version
        );
        exit(1);
    }
    if (has_delete)
    {
        // Zero out the new metadata entry
        memset((uint8_t*)meta_new.buf + meta_new.pos*bs->dsk.clean_entry_size, 0, bs->dsk.clean_entry_size);
    }
    else
    {
        // Set initial internal bitmap bits from the big write
        if (clean_init_bitmap)
        {
            memset(new_clean_bitmap, 0, bs->dsk.clean_entry_bitmap_size);
            bitmap_set(new_clean_bitmap, clean_bitmap_offset, clean_bitmap_len, bs->dsk.bitmap_granularity);
        }
        for (auto it = v.begin(); it != v.end(); it++)
        {
            // Set internal bitmap bits from small writes
            if (it->copy_flags == COPY_BUF_JOURNAL || it->copy_flags == (COPY_BUF_JOURNAL|COPY_BUF_COALESCED))
                bitmap_set(new_clean_bitmap, it->offset, it->len, bs->dsk.bitmap_granularity);
        }
        // Copy latest external bitmap/attributes
        {
            void *dyn_ptr = bs->alloc_dyn_data
                ? (uint8_t*)dirty_end->second.dyn_data+sizeof(int) : (uint8_t*)&dirty_end->second.dyn_data;
            memcpy(new_clean_bitmap + bs->dsk.clean_entry_bitmap_size, dyn_ptr, bs->dsk.clean_entry_bitmap_size);
        }
        // Copy initial (big_write) data checksums
        if (bs->dsk.csum_block_size && clean_init_bitmap)
        {
            uint8_t *new_clean_data_csum = new_clean_bitmap + 2*bs->dsk.clean_entry_bitmap_size;
            // big_write partial checksums are calculated from a padded csum_block_size, we can just copy them
            memset(new_clean_data_csum, 0, bs->dsk.data_block_size / bs->dsk.csum_block_size * (bs->dsk.data_csum_type & 0xFF));
            uint64_t dyn_size = bs->dsk.dirty_dyn_size(clean_bitmap_offset, clean_bitmap_len);
            uint32_t *csums = (uint32_t*)(clean_init_dyn_ptr + bs->dsk.clean_entry_bitmap_size);
            memcpy(new_clean_data_csum + clean_bitmap_offset / bs->dsk.csum_block_size * (bs->dsk.data_csum_type & 0xFF),
                csums, dyn_size - bs->dsk.clean_entry_bitmap_size);
        }
        // Calculate or copy small_write checksums
        uint32_t *new_data_csums = (uint32_t*)(new_clean_bitmap + 2*bs->dsk.clean_entry_bitmap_size);
        if (bs->dsk.csum_block_size)
            calc_block_checksums(new_data_csums, false);
        // Update entry
        new_entry->oid = cur.oid;
        new_entry->version = cur.version;
        if (!bs->inmemory_meta)
        {
            auto inmem_bmp = (uint8_t*)bs->clean_bitmaps + (clean_loc >> bs->dsk.block_order)*2*bs->dsk.clean_entry_bitmap_size;
            memcpy(inmem_bmp, new_clean_bitmap, 2*bs->dsk.clean_entry_bitmap_size);
        }
        if (bs->dsk.meta_format >= BLOCKSTORE_META_FORMAT_V2)
        {
            // Calculate metadata entry checksum
            uint32_t *new_entry_csum = (uint32_t*)((uint8_t*)new_entry + bs->dsk.clean_entry_size - 4);
            *new_entry_csum = crc32c(0, new_entry, bs->dsk.clean_entry_size - 4);
        }
    }
}

void journal_flusher_co::free_buffers()
{
    if (!bs->inmemory_meta)
    {
        meta_new.it->second.usage_count--;
        if (meta_new.it->second.usage_count == 0)
        {
            free(meta_new.it->second.buf);
            flusher->meta_sectors.erase(meta_new.it);
        }
        if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
        {
            meta_old.it->second.usage_count--;
            if (meta_old.it->second.usage_count == 0)
            {
                free(meta_old.it->second.buf);
                flusher->meta_sectors.erase(meta_old.it);
            }
        }
    }
    for (auto it = v.begin(); it != v.end(); it++)
    {
        // Free it if it's not taken from the journal
        if (it->buf && (it->copy_flags == COPY_BUF_JOURNAL || (it->copy_flags & COPY_BUF_CSUM_FILL)) &&
            (!bs->journal.inmemory || it->buf < bs->journal.buffer || it->buf >= (uint8_t*)bs->journal.buffer + bs->journal.len))
        {
            free(it->buf);
        }
    }
    v.clear();
}

bool journal_flusher_co::write_meta_block(flusher_meta_write_t & meta_block, int wait_base)
{
    if (wait_state == wait_base)
        goto resume_0;
    await_sqe(0);
    data->iov = (struct iovec){ meta_block.buf, (size_t)bs->dsk.meta_block_size };
    data->callback = simple_callback_w;
    my_uring_prep_writev(
        sqe, bs->dsk.meta_fd, &data->iov, 1, bs->dsk.meta_offset + bs->dsk.meta_block_size + meta_block.sector
    );
    wait_count++;
    return true;
}

// Punch holes in incomplete checksum blocks
bool journal_flusher_co::clear_incomplete_csum_block_bits(int wait_base)
{
    if (wait_state == wait_base) goto resume_0;
    else if (wait_state == wait_base+1) goto resume_1;
    else if (wait_state == wait_base+2) goto resume_2;
    else if (wait_state == wait_base+3) goto resume_3;
    else if (wait_state == wait_base+4) goto resume_4;
    else if (wait_state == wait_base+5) goto resume_5;
    else if (wait_state == wait_base+6) goto resume_6;
    else if (wait_state == wait_base+7) goto resume_7;
    cleared_incomplete = false;
    for (auto it = v.begin(); it != v.end(); it++)
    {
        if ((it->copy_flags == COPY_BUF_JOURNAL || it->copy_flags == (COPY_BUF_JOURNAL|COPY_BUF_COALESCED)) &&
            bitmap_check(new_clean_bitmap, it->offset, it->len, bs->dsk.bitmap_granularity))
        {
            cleared_incomplete = true;
            break;
        }
    }
    if (cleared_incomplete)
    {
        // This modification may only happen in place
        assert(old_clean_loc == clean_loc);
        // Wait for data writes and metadata reads
    resume_0:
    resume_1:
        if (!wait_meta_reads(wait_base+0))
            return false;
    resume_2:
        if (wait_journal_count > 0)
        {
            wait_state = wait_base+2;
            return false;
        }
        // Verify data checksums
        for (i = v.size()-1; i >= 0 && (v[i].copy_flags & COPY_BUF_CSUM_FILL); i--)
        {
            // If we encounter bad checksums during flush, we still update the bad block,
            // but intentionally mangle checksums to avoid hiding the corruption.
            iovec iov = { .iov_base = v[i].buf, .iov_len = (size_t)v[i].len };
            if (!(v[i].copy_flags & COPY_BUF_JOURNAL))
            {
                assert(!(v[i].offset % bs->dsk.csum_block_size));
                assert(!(v[i].len % bs->dsk.csum_block_size));
                bs->verify_padded_checksums(new_clean_bitmap, new_clean_bitmap + 2*bs->dsk.clean_entry_bitmap_size, v[i].offset, &iov, 1,
                    [&](uint32_t bad_block, uint32_t calc_csum, uint32_t stored_csum)
                {
                    printf("Checksum mismatch in object %jx:%jx v%ju in data area at offset 0x%jx+0x%x: got %08x, expected %08x\n",
                        cur.oid.inode, cur.oid.stripe, old_clean_ver,
                        old_clean_loc, bad_block, calc_csum, stored_csum);
                    for (uint32_t j = 0; j < bs->dsk.csum_block_size; j += bs->dsk.bitmap_granularity)
                    {
                        // Simplest method of mangling: flip one byte in every sector
                        ((uint8_t*)v[i].buf)[j+bad_block-v[i].offset] ^= 0xff;
                    }
                });
            }
            else
            {
                bs->verify_journal_checksums(v[i].csum_buf, v[i].offset, &iov, 1,
                    [&](uint32_t bad_block, uint32_t calc_csum, uint32_t stored_csum)
                {
                    printf("Checksum mismatch in object %jx:%jx v%ju in journal at offset 0x%jx+0x%x (block offset 0x%jx): got %08x, expected %08x\n",
                        cur.oid.inode, cur.oid.stripe, old_clean_ver,
                        v[i].disk_offset, bad_block, v[i].offset, calc_csum, stored_csum);
                    bad_block += (v[i].offset/bs->dsk.csum_block_size) * bs->dsk.csum_block_size;
                    uint32_t bad_block_end = bad_block + bs->dsk.csum_block_size + (v[i].offset/bs->dsk.csum_block_size) * bs->dsk.csum_block_size;
                    if (bad_block < v[i].offset)
                        bad_block = v[i].offset;
                    if (bad_block_end > v[i].offset+v[i].len)
                        bad_block_end = v[i].offset+v[i].len;
                    bad_block -= v[i].offset;
                    bad_block_end -= v[i].offset;
                    for (uint32_t j = bad_block; j < bad_block_end; j += bs->dsk.bitmap_granularity)
                    {
                        // Simplest method of mangling: flip one byte in every sector
                        ((uint8_t*)v[i].buf)[j] ^= 0xff;
                    }
                });
            }
        }
        {
            clean_disk_entry *new_entry = (clean_disk_entry*)((uint8_t*)meta_new.buf + meta_new.pos*bs->dsk.clean_entry_size);
            if (new_entry->oid != cur.oid)
            {
                printf(
                    "Fatal error (metadata corruption or bug): tried to make holes in %ju (%jx:%jx v%ju) with %jx:%jx v%ju\n",
                    clean_loc >> bs->dsk.block_order, new_entry->oid.inode, new_entry->oid.stripe,
                    new_entry->version, cur.oid.inode, cur.oid.stripe, cur.version
                );
            }
            assert(new_entry->oid == cur.oid);
            // Actually clear bits
            for (auto it = v.begin(); it != v.end(); it++)
            {
                if (it->copy_flags == COPY_BUF_JOURNAL || it->copy_flags == (COPY_BUF_JOURNAL|COPY_BUF_COALESCED))
                    bitmap_clear(new_clean_bitmap, it->offset, it->len, bs->dsk.bitmap_granularity);
            }
            // Calculate block checksums with new holes
            uint32_t *new_data_csums = (uint32_t*)(new_clean_bitmap + 2*bs->dsk.clean_entry_bitmap_size);
            calc_block_checksums(new_data_csums, true);
            if (!bs->inmemory_meta)
            {
                auto inmem_bmp = (uint8_t*)bs->clean_bitmaps + (clean_loc >> bs->dsk.block_order)*2*bs->dsk.clean_entry_bitmap_size;
                memcpy(inmem_bmp, new_clean_bitmap, 2*bs->dsk.clean_entry_bitmap_size);
            }
            if (bs->dsk.meta_format >= BLOCKSTORE_META_FORMAT_V2)
            {
                // calculate metadata entry checksum
                uint32_t *new_entry_csum = (uint32_t*)((uint8_t*)new_entry + bs->dsk.clean_entry_size - 4);
                *new_entry_csum = crc32c(0, new_entry, bs->dsk.clean_entry_size - 4);
            }
        }
        // Write and fsync the modified metadata entry
    resume_3:
        if (!write_meta_block(meta_new, wait_base+3))
            return false;
    resume_4:
        if (wait_count > 0)
        {
            wait_state = wait_base+4;
            return false;
        }
    resume_5:
    resume_6:
    resume_7:
        if (!fsync_batch(true, wait_base+5))
            return false;
    }
    return true;
}

void journal_flusher_co::calc_block_checksums(uint32_t *new_data_csums, bool skip_overwrites)
{
    uint64_t block_offset = 0;
    uint32_t block_done = 0;
    uint32_t block_csum = 0;
    for (auto it = v.begin(); it != v.end(); it++)
    {
        if (it->copy_flags & COPY_BUF_CSUM_FILL)
            break;
        if (block_done == 0)
        {
            // `v` should contain aligned items, possibly split into pieces
            assert(!(it->offset % bs->dsk.csum_block_size));
            block_offset = it->offset;
        }
        bool zero = (it->copy_flags & COPY_BUF_ZERO) || (skip_overwrites && (it->copy_flags & COPY_BUF_JOURNAL));
        auto len = it->len;
        while ((block_done+len) >= bs->dsk.csum_block_size)
        {
            if (!skip_overwrites && !block_done && it->csum_buf)
            {
                // We may take existing checksums if an overwrite contains a full block
                auto full_csum_offset = (it->offset+it->len-len+bs->dsk.csum_block_size-1) / bs->dsk.csum_block_size
                    - it->offset / bs->dsk.csum_block_size;
                auto full_csum_count = len/bs->dsk.csum_block_size;
                memcpy(new_data_csums + block_offset/bs->dsk.csum_block_size, it->csum_buf + full_csum_offset*4, full_csum_count*4);
                len -= full_csum_count*bs->dsk.csum_block_size;
                block_offset += full_csum_count*bs->dsk.csum_block_size;
            }
            else
            {
                auto cur_len = bs->dsk.csum_block_size-block_done;
                block_csum = zero
                    ? crc32c_pad(block_csum, NULL, 0, cur_len, 0)
                    : crc32c(block_csum, (uint8_t*)it->buf+(it->len-len), cur_len);
                new_data_csums[block_offset / bs->dsk.csum_block_size] = block_csum;
                block_csum = 0;
                block_done = 0;
                block_offset += bs->dsk.csum_block_size;
                len -= cur_len;
            }
        }
        if (len > 0)
        {
            block_csum = zero
                ? crc32c_pad(block_csum, NULL, 0, len, 0)
                : crc32c(block_csum, (uint8_t*)it->buf+(it->len-len), len);
            block_done += len;
        }
    }
    // `v` should contain aligned items, possibly split into pieces
    assert(!block_done);
}

void journal_flusher_co::scan_dirty()
{
    dirty_it = dirty_start = dirty_end;
    v.clear();
    copy_count = 0;
    clean_loc = UINT64_MAX;
    clean_ver = 0;
    has_delete = false;
    has_writes = false;
    skip_copy = false;
    clean_init_bitmap = false;
    fill_incomplete = false;
    read_to_fill_incomplete = 0;
    while (1)
    {
        if (!IS_STABLE(dirty_it->second.state))
        {
            char err[1024];
            snprintf(
                err, 1024, "BUG: Unexpected dirty_entry %jx:%jx v%ju unstable state during flush: 0x%x",
                dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version, dirty_it->second.state
            );
            throw std::runtime_error(err);
        }
        else if (IS_JOURNAL(dirty_it->second.state) && !skip_copy)
        {
            // Partial dirty overwrite
            has_writes = true;
            if (dirty_it->second.len != 0)
            {
                uint64_t blk_begin = 0, blk_end = 0;
                uint8_t *blk_buf = NULL;
                bs->find_holes(
                    v, dirty_it->second.offset, dirty_it->second.offset + dirty_it->second.len,
                    [&](int pos, bool alloc, uint32_t cur_start, uint32_t cur_end)
                    {
                        if (alloc)
                            return 0;
                        copy_count++;
                        uint64_t submit_offset = dirty_it->second.location + cur_start - dirty_it->second.offset;
                        auto it = v.insert(v.begin()+pos, (copy_buffer_t){
                            .copy_flags = COPY_BUF_JOURNAL,
                            .offset = cur_start,
                            .len = cur_end-cur_start,
                            .disk_offset = submit_offset,
                        });
                        if (bs->journal.inmemory)
                        {
                            // Take it from memory, don't copy it
                            it->buf = (uint8_t*)bs->journal.buffer + submit_offset;
                        }
                        if (bs->dsk.csum_block_size)
                        {
                            // FIXME Remove this > sizeof(void*) inline perversion from everywhere.
                            // I think it doesn't matter but I couldn't stop myself from implementing it :)
                            uint8_t* dyn_from = (uint8_t*)(bs->alloc_dyn_data
                                ? (uint8_t*)dirty_it->second.dyn_data+sizeof(int)
                                : (uint8_t*)&dirty_it->second.dyn_data) + bs->dsk.clean_entry_bitmap_size;
                            it->csum_buf = dyn_from + (it->offset/bs->dsk.csum_block_size -
                                dirty_it->second.offset/bs->dsk.csum_block_size) * (bs->dsk.data_csum_type & 0xFF);
                            if (cur_start % bs->dsk.csum_block_size || cur_end % bs->dsk.csum_block_size)
                            {
                                // Small write not aligned for checksums. We may have to pad it
                                fill_incomplete = true;
                                if (!bs->journal.inmemory)
                                {
                                    bs->pad_journal_read(v, *it, dirty_it->second.offset,
                                        dirty_it->second.offset + dirty_it->second.len, dirty_it->second.location,
                                        dyn_from, NULL, cur_start, cur_end-cur_start, blk_begin, blk_end, blk_buf);
                                }
                            }
                        }
                        return 0;
                    }
                );
            }
        }
        else if (IS_BIG_WRITE(dirty_it->second.state) && !skip_copy)
        {
            // There is an unflushed big write. Copy small writes in its position
            has_writes = true;
            clean_loc = dirty_it->second.location;
            clean_ver = dirty_it->first.version;
            clean_init_bitmap = true;
            clean_bitmap_offset = dirty_it->second.offset;
            clean_bitmap_len = dirty_it->second.len;
            clean_init_dyn_ptr = bs->alloc_dyn_data
                ? (uint8_t*)dirty_it->second.dyn_data+sizeof(int) : (uint8_t*)&dirty_it->second.dyn_data;
            skip_copy = true;
        }
        else if (IS_DELETE(dirty_it->second.state) && !skip_copy)
        {
            // There is an unflushed delete
            has_delete = true;
            skip_copy = true;
        }
        dirty_start = dirty_it;
        if (dirty_it == bs->dirty_db.begin())
        {
            break;
        }
        dirty_it--;
        if (dirty_it->first.oid != cur.oid)
        {
            break;
        }
    }
    if (fill_incomplete && !clean_init_bitmap)
    {
        // Rescan and fill incomplete writes with old data to calculate checksums
        if (old_clean_loc == UINT64_MAX)
        {
            // May happen if the metadata entry is corrupt, but journal isn't
            // FIXME: Report corrupted object to the upper layer (OSD)
            printf(
                "Warning: object %jx:%jx has overwrites, but doesn't have a clean version."
                " Metadata is likely corrupted. Dropping object from the DB.\n",
                cur.oid.inode, cur.oid.stripe
            );
            v.clear();
            has_writes = false;
            has_delete = skip_copy = true;
            copy_count = 0;
            fill_incomplete = false;
            read_to_fill_incomplete = 0;
            return;
        }
        uint8_t *bmp_ptr = bs->get_clean_entry_bitmap(old_clean_loc, 0);
        uint64_t fulfilled = 0;
        int last = v.size()-1;
        while (last >= 0 && (v[last].copy_flags & COPY_BUF_CSUM_FILL))
            last--;
        read_to_fill_incomplete = bs->fill_partial_checksum_blocks(
            v, fulfilled, bmp_ptr, NULL, false, NULL,
            v[0].offset/bs->dsk.csum_block_size * bs->dsk.csum_block_size,
            ((v[last].offset+v[last].len-1) / bs->dsk.csum_block_size + 1) * bs->dsk.csum_block_size
        );
    }
    else if (fill_incomplete && clean_init_bitmap)
    {
        // If we actually have partial checksum block overwrites AND a new clean_loc
        // at the same time then we can't use our fancy checksum block mutation algorithm.
        // So in this case we'll have to first flush the clean write separately.
        while (!IS_BIG_WRITE(dirty_end->second.state))
        {
            assert(dirty_end != bs->dirty_db.begin());
            dirty_end--;
        }
        flusher->enqueue_flush(cur);
        cur.version = dirty_end->first.version;
#ifdef BLOCKSTORE_DEBUG
        printf("Partial checksum block overwrites found - rewinding flush back to %jx:%jx v%ju\n", cur.oid.inode, cur.oid.stripe, cur.version);
#endif
        v.clear();
        copy_count = 0;
        fill_incomplete = false;
        read_to_fill_incomplete = 0;
    }
}

bool journal_flusher_co::read_dirty(int wait_base)
{
    if (wait_state == wait_base)
        goto resume_0;
    else if (wait_state == wait_base+1)
        goto resume_1;
    wait_count = wait_journal_count = 0;
    if (bs->journal.inmemory && !read_to_fill_incomplete)
    {
        // Happy path: nothing to read :)
        return true;
    }
    for (i = 1; i <= v.size() && (v[v.size()-i].copy_flags & COPY_BUF_CSUM_FILL); i++)
    {
        if (v[v.size()-i].copy_flags & COPY_BUF_JOURNAL)
            continue;
        // Read old data from disk to calculate checksums
        await_sqe(0);
        auto & vi = v[v.size()-i];
        assert(vi.len != 0);
        vi.buf = memalign_or_die(MEM_ALIGNMENT, vi.len);
        data->iov = (struct iovec){ vi.buf, (size_t)vi.len };
        data->callback = simple_callback_r;
        my_uring_prep_readv(
            sqe, bs->dsk.data_fd, &data->iov, 1, bs->dsk.data_offset + old_clean_loc + vi.offset
        );
        wait_count++;
        bs->find_holes(v, vi.offset, vi.offset+vi.len, [this, buf = (uint8_t*)vi.buf-vi.offset](int pos, bool alloc, uint32_t cur_start, uint32_t cur_end)
        {
            if (!alloc)
            {
                v.insert(v.begin()+pos, (copy_buffer_t){
                    .copy_flags = COPY_BUF_DATA,
                    .offset = cur_start,
                    .len = cur_end-cur_start,
                    .buf = buf+cur_start,
                });
                return 1;
            }
            return 0;
        });
    }
    if (!bs->journal.inmemory)
    {
        for (i = 0; i < v.size(); i++)
        {
            if (v[i].copy_flags == COPY_BUF_JOURNAL || v[i].copy_flags == (COPY_BUF_JOURNAL | COPY_BUF_CSUM_FILL))
            {
                // Read journal data from disk
                if (!v[i].buf)
                    v[i].buf = memalign_or_die(MEM_ALIGNMENT, v[i].len);
                await_sqe(1);
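                // (added comment) The SQE prepared by await_sqe() is pointed at this copy's
                // on-disk journal region; completion is counted via wait_journal_count so the
                // flush can later wait for journal reads separately from data reads/writes.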
                data->iov = (struct iovec){ v[i].buf, (size_t)v[i].len };
                data->callback = simple_callback_rj;
                my_uring_prep_readv(
                    sqe, bs->dsk.journal_fd, &data->iov, 1, bs->journal.offset + v[i].disk_offset
                );
                wait_journal_count++;
            }
        }
    }
    return true;
}

bool journal_flusher_co::modify_meta_do_reads(int wait_base)
{
    if (wait_state == wait_base)
        goto resume_0;
    else if (wait_state == wait_base+1)
        goto resume_1;
resume_0:
    if (!modify_meta_read(clean_loc, meta_new, wait_base+0))
        return false;
    new_clean_bitmap = (uint8_t*)meta_new.buf + meta_new.pos*bs->dsk.clean_entry_size + sizeof(clean_disk_entry);
    if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
    {
resume_1:
        if (!modify_meta_read(old_clean_loc, meta_old, wait_base+1))
            return false;
    }
    else
        meta_old.submitted = false;
    return true;
}

bool journal_flusher_co::wait_meta_reads(int wait_base)
{
    if (wait_state == wait_base)
        goto resume_0;
    else if (wait_state == wait_base+1)
        goto resume_1;
resume_0:
    if (wait_count > 0)
    {
        wait_state = wait_base+0;
        return false;
    }
    // Our own reads completed
    if (meta_new.submitted)
    {
        meta_new.it->second.state = META_BLOCK_READ;
        bs->ringloop->wakeup();
    }
    if (meta_old.submitted)
    {
        meta_old.it->second.state = META_BLOCK_READ;
        bs->ringloop->wakeup();
    }
resume_1:
    if (!bs->inmemory_meta && (meta_new.it->second.state == META_BLOCK_UNREAD ||
        (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc) && meta_old.it->second.state == META_BLOCK_UNREAD))
    {
        // Metadata block is being read by another coroutine
        wait_state = wait_base+1;
        return false;
    }
    // All reads completed
    return true;
}

bool journal_flusher_co::modify_meta_read(uint64_t meta_loc, flusher_meta_write_t &wr, int wait_base)
{
    if (wait_state == wait_base)
        goto resume_0;
    // We must check if the same sector is already in memory if we don't keep all metadata in memory all the time.
    // And yet another option is to use LSM trees for metadata, but it sophisticates everything a lot,
    // so I'll avoid it as long as I can.
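    // (added comment) Worked example of the sector/pos math below, assuming meta_block_size=4096
    // and clean_entry_size=128: each metadata block holds 4096/128 = 32 entries, so object block
    // number (meta_loc >> block_order) = 100 maps to sector 3*4096 and position 100 % 32 = 4.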
    wr.submitted = false;
    wr.sector = ((meta_loc >> bs->dsk.block_order) / (bs->dsk.meta_block_size / bs->dsk.clean_entry_size)) * bs->dsk.meta_block_size;
    wr.pos = ((meta_loc >> bs->dsk.block_order) % (bs->dsk.meta_block_size / bs->dsk.clean_entry_size));
    if (bs->inmemory_meta)
    {
        wr.buf = (uint8_t*)bs->metadata_buffer + wr.sector;
        return true;
    }
    wr.it = flusher->meta_sectors.find(wr.sector);
    if (wr.it == flusher->meta_sectors.end())
    {
        // Not in memory yet, read it
        wr.buf = memalign_or_die(MEM_ALIGNMENT, bs->dsk.meta_block_size);
        wr.it = flusher->meta_sectors.emplace(wr.sector, (meta_sector_t){
            .offset = wr.sector,
            .len = bs->dsk.meta_block_size,
            .state = META_BLOCK_UNREAD, // 0 = not read yet
            .buf = wr.buf,
            .usage_count = 1,
        }).first;
        await_sqe(0);
        data->iov = (struct iovec){ wr.it->second.buf, (size_t)bs->dsk.meta_block_size };
        data->callback = simple_callback_r;
        wr.submitted = true;
        my_uring_prep_readv(
            sqe, bs->dsk.meta_fd, &data->iov, 1, bs->dsk.meta_offset + bs->dsk.meta_block_size + wr.sector
        );
        wait_count++;
    }
    else
    {
        wr.buf = wr.it->second.buf;
        wr.it->second.usage_count++;
    }
    return true;
}

void journal_flusher_co::update_clean_db()
{
    auto & clean_db = bs->clean_db_shard(cur.oid);
    if (has_delete)
    {
        clean_db.erase(cur.oid);
    }
    else
    {
        clean_db[cur.oid] = {
            .version = cur.version,
            .location = clean_loc,
        };
    }
}

void journal_flusher_co::free_data_blocks()
{
    if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
    {
        auto uo_it = bs->used_clean_objects.find(old_clean_loc);
        bool used = uo_it != bs->used_clean_objects.end();
#ifdef BLOCKSTORE_DEBUG
        printf("%s block %ju from %jx:%jx v%ju (new location is %ju)\n",
            used ? "Postpone free" : "Free", old_clean_loc >> bs->dsk.block_order,
            cur.oid.inode, cur.oid.stripe, cur.version, clean_loc >> bs->dsk.block_order);
#endif
        if (used)
            uo_it->second.was_freed = true;
        else
            bs->data_alloc->set(old_clean_loc >> bs->dsk.block_order, false);
    }
    if (has_delete)
    {
        assert(clean_loc == old_clean_loc);
        auto uo_it = bs->used_clean_objects.find(old_clean_loc);
        bool used = uo_it != bs->used_clean_objects.end();
#ifdef BLOCKSTORE_DEBUG
        printf("%s block %ju from %jx:%jx v%ju (delete)\n",
            used ? "Postpone free" : "Free", old_clean_loc >> bs->dsk.block_order,
            cur.oid.inode, cur.oid.stripe, cur.version);
#endif
        if (used)
            uo_it->second.was_freed = true;
        else
            bs->data_alloc->set(old_clean_loc >> bs->dsk.block_order, false);
    }
}

bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base)
{
    if (wait_state == wait_base)
        goto resume_0;
    else if (wait_state == wait_base+1)
        goto resume_1;
    else if (wait_state == wait_base+2)
        goto resume_2;
    if (!(fsync_meta ? bs->disable_meta_fsync : bs->disable_data_fsync))
    {
        cur_sync = flusher->syncs.end();
        while (cur_sync != flusher->syncs.begin())
        {
            cur_sync--;
            if (cur_sync->fsync_meta == fsync_meta && cur_sync->state == 0)
            {
                goto sync_found;
            }
        }
        cur_sync = flusher->syncs.emplace(flusher->syncs.end(), (flusher_sync_t){
            .fsync_meta = fsync_meta,
            .ready_count = 0,
            .state = 0,
        });
    sync_found:
        cur_sync->ready_count++;
        flusher->syncing_flushers++;
    resume_1:
        if (!cur_sync->state)
        {
            if (flusher->syncing_flushers >= flusher->active_flushers || !flusher->flush_queue.size())
            {
                // Sync batch is ready. Do it.
                await_sqe(0);
                data->iov = { 0 };
                data->callback = simple_callback_w;
                my_uring_prep_fsync(sqe, fsync_meta ? bs->dsk.meta_fd : bs->dsk.data_fd, IORING_FSYNC_DATASYNC);
                cur_sync->state = 1;
                wait_count++;
    resume_2:
                if (wait_count > 0)
                {
                    wait_state = wait_base+2;
                    return false;
                }
                // Sync completed. All previous coroutines waiting for it must be resumed
                cur_sync->state = 2;
                bs->ringloop->wakeup();
            }
            else
            {
                // Wait until someone else sends and completes a sync.
                wait_state = wait_base+1;
                return false;
            }
        }
        flusher->syncing_flushers--;
        cur_sync->ready_count--;
        if (cur_sync->ready_count == 0)
        {
            flusher->syncs.erase(cur_sync);
        }
    }
    return true;
}

bool journal_flusher_co::trim_journal(int wait_base)
{
    if (wait_state == wait_base)
        goto resume_0;
    else if (wait_state == wait_base+1)
        goto resume_1;
    else if (wait_state == wait_base+2)
        goto resume_2;
    else if (wait_state == wait_base+3)
        goto resume_3;
    else if (wait_state == wait_base+4)
        goto resume_4;
    new_trim_pos = bs->journal.get_trim_pos();
    if (new_trim_pos != bs->journal.used_start)
    {
    resume_0:
        // Wait for other coroutines trimming the journal, if any
        if (flusher->trimming)
        {
            wait_state = wait_base+0;
            return false;
        }
        flusher->trimming = true;
        // Recheck the position with the "lock" taken
        new_trim_pos = bs->journal.get_trim_pos();
        if (new_trim_pos != bs->journal.used_start)
        {
            // First update journal "superblock" and only then update used_start in memory
            await_sqe(1);
            *((journal_entry_start*)flusher->journal_superblock) = {
                .crc32 = 0,
                .magic = JOURNAL_MAGIC,
                .type = JE_START,
                .size = ((!bs->dsk.data_csum_type && ((journal_entry_start*)flusher->journal_superblock)->version == JOURNAL_VERSION_V1)
                    ? (uint32_t)JE_START_V1_SIZE : (uint32_t)JE_START_V2_SIZE),
                .reserved = 0,
                .journal_start = new_trim_pos,
                .version = (uint64_t)(!bs->dsk.data_csum_type && ((journal_entry_start*)flusher->journal_superblock)->version == JOURNAL_VERSION_V1
                    ? JOURNAL_VERSION_V1 : JOURNAL_VERSION_V2),
                .data_csum_type = bs->dsk.data_csum_type,
                .csum_block_size = bs->dsk.csum_block_size,
            };
            ((journal_entry_start*)flusher->journal_superblock)->crc32 = je_crc32((journal_entry*)flusher->journal_superblock);
            data->iov = (struct iovec){ flusher->journal_superblock, (size_t)bs->dsk.journal_block_size };
            data->callback = simple_callback_w;
            my_uring_prep_writev(sqe, bs->dsk.journal_fd, &data->iov, 1, bs->journal.offset);
            wait_count++;
    resume_2:
            if (wait_count > 0)
            {
                wait_state = wait_base+2;
                return false;
            }
            if (!bs->disable_journal_fsync)
            {
                await_sqe(3);
                my_uring_prep_fsync(sqe, bs->dsk.journal_fd, IORING_FSYNC_DATASYNC);
                data->iov = { 0 };
                data->callback = simple_callback_w;
                wait_count++;
    resume_4:
                if (wait_count > 0)
                {
                    wait_state = wait_base+4;
                    return false;
                }
            }
            if (new_trim_pos < bs->journal.used_start
                ? (bs->journal.dirty_start >= bs->journal.used_start || bs->journal.dirty_start < new_trim_pos)
                : (bs->journal.dirty_start >= bs->journal.used_start && bs->journal.dirty_start < new_trim_pos))
            {
                bs->journal.dirty_start = new_trim_pos;
            }
            bs->journal.used_start = new_trim_pos;
#ifdef BLOCKSTORE_DEBUG
            printf("Journal trimmed to %08jx (next_free=%08jx dirty_start=%08jx)\n", bs->journal.used_start, bs->journal.next_free, bs->journal.dirty_start);
#endif
            if (bs->journal.flush_journal && !flusher->flush_queue.size())
            {
                assert(bs->journal.used_start == bs->journal.next_free);
                printf("Journal flushed\n");
                exit(0);
            }
        }
        flusher->journal_trim_counter = 0;
        flusher->trimming = false;
    }
    return true;
}