From f4d06ba1029a34c383f9d36268bbee6d3009675b Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Mon, 2 Dec 2019 02:11:23 +0300 Subject: [PATCH] OP_DELETE flushing --- blockstore_flush.cpp | 448 ++++++++++++++++++++++++++---------------- blockstore_flush.h | 18 +- blockstore_init.cpp | 8 +- blockstore_init.h | 2 +- blockstore_stable.cpp | 11 +- 5 files changed, 301 insertions(+), 186 deletions(-) diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index aab441b6..d4973097 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -105,11 +105,11 @@ void journal_flusher_t::unshift_flush(obj_ver_id ov) if (!sqe)\ {\ wait_state = label;\ - return;\ + return false;\ }\ data = ((ring_data_t*)sqe->user_data); -void journal_flusher_co::loop() +bool journal_flusher_co::loop() { // This is much better than implementing the whole function as an FSM // Maybe I should consider a coroutine library like https://github.com/hnes/libaco ... @@ -139,11 +139,15 @@ void journal_flusher_co::loop() goto resume_12; else if (wait_state == 13) goto resume_13; + else if (wait_state == 14) + goto resume_14; + else if (wait_state == 15) + goto resume_15; resume_0: if (!flusher->flush_queue.size()) { wait_state = 0; - return; + return true; } cur.oid = flusher->flush_queue.front(); cur.version = flusher->flush_versions[cur.oid]; @@ -178,6 +182,7 @@ resume_0: wait_count = 0; copy_count = 0; clean_loc = UINT64_MAX; + has_delete = false; skip_copy = false; while (1) { @@ -221,13 +226,16 @@ resume_0: } } } - else if (dirty_it->second.state == ST_D_STABLE) + else if (dirty_it->second.state == ST_D_STABLE && !skip_copy) { // There is an unflushed big write. Copy small writes in its position - if (!skip_copy) - { - clean_loc = dirty_it->second.location; - } + clean_loc = dirty_it->second.location; + skip_copy = true; + } + else if (dirty_it->second.state == ST_DEL_STABLE && !skip_copy) + { + // There is an unflushed delete + has_delete = true; skip_copy = true; } else if (!IS_STABLE(dirty_it->second.state)) @@ -249,7 +257,7 @@ resume_0: break; } } - if (copy_count == 0 && clean_loc == UINT64_MAX) + if (copy_count == 0 && clean_loc == UINT64_MAX && !has_delete) { // Nothing to flush flusher->active_flushers--; @@ -267,10 +275,11 @@ resume_0: { auto clean_it = bs->clean_db.find(cur.oid); old_clean_loc = (clean_it != bs->clean_db.end() ? clean_it->second.location : UINT64_MAX); + old_clean_ver = (clean_it != bs->clean_db.end() ? clean_it->second.version : 0); } if (clean_loc == UINT64_MAX) { - if (old_clean_loc == UINT64_MAX) + if (copy_count > 0 && has_delete || old_clean_loc == UINT64_MAX) { // Object not present at all. This is a bug. char err[1024]; @@ -283,50 +292,41 @@ resume_0: else clean_loc = old_clean_loc; } - // Also we need to submit the metadata read. We do a read-modify-write for every operation. - // But we must check if the same sector is already in memory. - // Another option is to keep all raw metadata in memory all the time. FIXME: Maybe add this mode. - // And yet another option is to use LSM trees for metadata, but it sophisticates everything a lot, - // so I'll avoid it as long as I can. - meta_sector = ((clean_loc >> bs->block_order) / (512 / sizeof(clean_disk_entry))) * 512; - meta_pos = ((clean_loc >> bs->block_order) % (512 / sizeof(clean_disk_entry))); - meta_it = flusher->meta_sectors.find(meta_sector); - if (meta_it == flusher->meta_sectors.end()) + else + has_delete = false; + // Also we need to submit metadata read(s). We do read-modify-write cycle(s) for every operation. 
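+ // Note (added for clarity): a single flush may now touch up to two metadata sectors -
+ // the one for the object's new clean location (meta_new) and, if the object was moved
+ // or deleted, the one for its old location (meta_old), whose entry is zeroed out below.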
+ resume_2: + if (!modify_meta_read(clean_loc, meta_new, 2)) { - // Not in memory yet, read it - meta_it = flusher->meta_sectors.emplace(meta_sector, (meta_sector_t){ - .offset = meta_sector, - .len = 512, - .state = 0, // 0 = not read yet - .buf = memalign(512, 512), - .usage_count = 1, - }).first; - await_sqe(2); - data->iov = (struct iovec){ meta_it->second.buf, 512 }; - data->callback = [this](ring_data_t* data) + wait_state += 2; + return false; + } + if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc) + { + resume_14: + if (!modify_meta_read(old_clean_loc, meta_old, 14)) { - if (data->res != data->iov.iov_len) - { - throw std::runtime_error( - "metadata read operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+ - "). can't continue, sorry :-(" - ); - } - meta_it->second.state = 1; - wait_count--; - }; - my_uring_prep_readv( - sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector - ); - wait_count++; + wait_state += 14; + return false; + } } else - meta_it->second.usage_count++; + meta_old.submitted = false; resume_3: if (wait_count > 0) { wait_state = 3; - return; + return false; + } + if (meta_new.submitted) + { + meta_new.it->second.state = 1; + bs->ringloop->wakeup(bs->ring_consumer); + } + if (meta_old.submitted) + { + meta_old.it->second.state = 1; + bs->ringloop->wakeup(bs->ring_consumer); } // Reads completed, submit writes for (it = v.begin(); it != v.end(); it++) @@ -340,167 +340,110 @@ resume_0: wait_count++; } resume_5: - // And a metadata write, but only after data writes complete - if (meta_it->second.state == 0 || wait_count > 0) + // And metadata writes, but only after data writes complete + if (meta_new.it->second.state == 0 || wait_count > 0) { // metadata sector is still being read or data is still being written, wait for it wait_state = 5; - return; + return false; } - ((clean_disk_entry*)meta_it->second.buf)[meta_pos] = { - .oid = cur.oid, - .version = cur.version, - }; + if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc) + { + if (meta_old.it->second.state == 0) + { + wait_state = 5; + return false; + } + ((clean_disk_entry*)meta_old.it->second.buf)[meta_old.pos] = { 0 }; + await_sqe(15); + data->iov = (struct iovec){ meta_old.it->second.buf, 512 }; + data->callback = simple_callback_w; + my_uring_prep_writev( + sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_old.sector + ); + wait_count++; + } + ((clean_disk_entry*)meta_new.it->second.buf)[meta_new.pos] = has_delete + ? 
(clean_disk_entry){ 0 } + : (clean_disk_entry){ + .oid = cur.oid, + .version = cur.version, + }; await_sqe(6); - data->iov = (struct iovec){ meta_it->second.buf, 512 }; + data->iov = (struct iovec){ meta_new.it->second.buf, 512 }; data->callback = simple_callback_w; my_uring_prep_writev( - sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_sector + sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_new.sector ); wait_count++; resume_7: if (wait_count > 0) { wait_state = 7; - return; + return false; } // Done, free all buffers - meta_it->second.usage_count--; - if (meta_it->second.usage_count == 0) + meta_new.it->second.usage_count--; + if (meta_new.it->second.usage_count == 0) { - free(meta_it->second.buf); - flusher->meta_sectors.erase(meta_it); + free(meta_new.it->second.buf); + flusher->meta_sectors.erase(meta_new.it); + } + if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc) + { + meta_old.it->second.usage_count--; + if (meta_old.it->second.usage_count == 0) + { + free(meta_old.it->second.buf); + flusher->meta_sectors.erase(meta_old.it); + } } for (it = v.begin(); it != v.end(); it++) { free(it->buf); } v.clear(); + // And sync everything (in batches - not per each operation!) flusher->active_until_sync--; - if (!bs->disable_fsync) + resume_8: + resume_9: + resume_10: + resume_11: + if (!fsync_batch()) { - // And sync everything (in batches - not per each operation!) - cur_sync = flusher->syncs.end(); - if (cur_sync == flusher->syncs.begin() || cur_sync->state == 1) - cur_sync = flusher->syncs.emplace(flusher->syncs.end(), (flusher_sync_t){ .ready_count = 0, .state = 0 }); - else - cur_sync--; - cur_sync->ready_count++; - if (cur_sync->ready_count >= flusher->sync_threshold || - !flusher->active_until_sync && (!flusher->flush_queue.size() || flusher->active_flushers >= flusher->flusher_count)) - { - // Sync batch is ready. Do it. - await_sqe(9); - data->callback = simple_callback_w; - data->iov = { 0 }; - my_uring_prep_fsync(sqe, bs->data_fd, IORING_FSYNC_DATASYNC); - wait_count++; - if (bs->meta_fd != bs->data_fd) - { - await_sqe(10); - data->callback = simple_callback_w; - data->iov = { 0 }; - my_uring_prep_fsync(sqe, bs->meta_fd, IORING_FSYNC_DATASYNC); - wait_count++; - } - resume_11: - if (wait_count > 0) - { - wait_state = 11; - return; - } - // Sync completed. All previous coroutines waiting for it must be resumed - cur_sync->state = 1; - bs->ringloop->wakeup(bs->ring_consumer); - } - // Wait until someone else sends and completes a sync. 
- resume_8:
- if (!cur_sync->state)
- {
- wait_state = 8;
- return;
- }
- cur_sync->ready_count--;
- if (cur_sync->ready_count == 0)
- {
- flusher->syncs.erase(cur_sync);
- }
+ return false;
 }
 // Update clean_db and dirty_db, free old data locations
- if (old_clean_loc != clean_loc)
- {
-#ifdef BLOCKSTORE_DEBUG
- printf("Free block %lu\n", old_clean_loc >> bs->block_order);
-#endif
- bs->data_alloc->set(old_clean_loc >> bs->block_order, false);
- }
- bs->clean_db[cur.oid] = {
- .version = cur.version,
- .location = clean_loc,
- };
- dirty_it = dirty_end;
- while (1)
- {
- if (IS_BIG_WRITE(dirty_it->second.state) && dirty_it->second.location != clean_loc)
- {
-#ifdef BLOCKSTORE_DEBUG
- printf("Free block %lu\n", dirty_it->second.location >> bs->block_order);
-#endif
- bs->data_alloc->set(dirty_it->second.location >> bs->block_order, false);
- }
-#ifdef BLOCKSTORE_DEBUG
- printf("remove usage of journal offset %lu by %lu:%lu v%lu\n", dirty_it->second.journal_sector, dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version);
-#endif
- int used = --bs->journal.used_sectors[dirty_it->second.journal_sector];
- if (used == 0)
- {
- bs->journal.used_sectors.erase(dirty_it->second.journal_sector);
- }
- if (dirty_it == bs->dirty_db.begin())
- {
- break;
- }
- dirty_it--;
- if (dirty_it->first.oid != cur.oid)
- {
- break;
- }
- }
- // Then, basically, remove everything up to the current version from dirty_db...
- if (dirty_it->first.oid != cur.oid)
- dirty_it++;
- bs->dirty_db.erase(dirty_it, std::next(dirty_end));
+ update_clean_db();
 // Clear unused part of the journal every <journal_trim_interval> flushes
 if (!((++flusher->journal_trim_counter) % flusher->journal_trim_interval))
 {
 flusher->journal_trim_counter = 0;
- if (!bs->journal.trim())
+ if (bs->journal.trim())
 {
- goto do_not_trim;
- }
- // Update journal "superblock"
- await_sqe(12);
- data->callback = simple_callback_w;
- *((journal_entry_start*)flusher->journal_superblock) = {
- .crc32 = 0,
- .magic = JOURNAL_MAGIC,
- .type = JE_START,
- .size = sizeof(journal_entry_start),
- .reserved = 0,
- .journal_start = bs->journal.used_start,
- };
- ((journal_entry_start*)flusher->journal_superblock)->crc32 = je_crc32((journal_entry*)flusher->journal_superblock);
- data->iov = (struct iovec){ flusher->journal_superblock, 512 };
- my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
- wait_count++;
- resume_13:
- if (wait_count > 0)
- {
- wait_state = 13;
- return;
+ // Update journal "superblock"
+ await_sqe(12);
+ data->callback = simple_callback_w;
+ *((journal_entry_start*)flusher->journal_superblock) = {
+ .crc32 = 0,
+ .magic = JOURNAL_MAGIC,
+ .type = JE_START,
+ .size = sizeof(journal_entry_start),
+ .reserved = 0,
+ .journal_start = bs->journal.used_start,
+ };
+ ((journal_entry_start*)flusher->journal_superblock)->crc32 = je_crc32((journal_entry*)flusher->journal_superblock);
+ data->iov = (struct iovec){ flusher->journal_superblock, 512 };
+ my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
+ wait_count++;
+ resume_13:
+ if (wait_count > 0)
+ {
+ wait_state = 13;
+ return false;
+ }
 }
 }
- do_not_trim:
 // All done
#ifdef BLOCKSTORE_DEBUG
 printf("Flushed %lu:%lu v%lu\n", cur.oid.inode, cur.oid.stripe, cur.version);
@@ -515,4 +458,161 @@ resume_0:
 flusher->sync_to_repeat.erase(repeat_it);
 goto resume_0;
 }
+ return true;
+}
+
+bool journal_flusher_co::modify_meta_read(uint64_t meta_loc, flusher_meta_write_t &wr, int wait_base)
+{
+ if (wait_state == wait_base)
+ goto resume_0;
+ // We must check if the same sector is already in memory.
+ // Another option is to keep all raw metadata in memory all the time. FIXME: Maybe add this mode.
+ // And yet another option is to use LSM trees for metadata, but it complicates everything a lot,
+ // so I'll avoid it as long as I can.
+ wr.sector = ((meta_loc >> bs->block_order) / (512 / sizeof(clean_disk_entry))) * 512;
+ wr.pos = ((meta_loc >> bs->block_order) % (512 / sizeof(clean_disk_entry)));
+ wr.it = flusher->meta_sectors.find(wr.sector);
+ if (wr.it == flusher->meta_sectors.end())
+ {
+ // Not in memory yet, read it
+ wr.it = flusher->meta_sectors.emplace(wr.sector, (meta_sector_t){
+ .offset = wr.sector,
+ .len = 512,
+ .state = 0, // 0 = not read yet
+ .buf = memalign(512, 512),
+ .usage_count = 1,
+ }).first;
+ await_sqe(0);
+ data->iov = (struct iovec){ wr.it->second.buf, 512 };
+ data->callback = simple_callback_r;
+ wr.submitted = true;
+ my_uring_prep_readv(
+ sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + wr.sector
+ );
+ wait_count++;
+ }
+ else
+ {
+ wr.submitted = false;
+ wr.it->second.usage_count++;
+ }
+ return true;
+}
+
+void journal_flusher_co::update_clean_db()
+{
+ if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
+ {
+#ifdef BLOCKSTORE_DEBUG
+ printf("Free block %lu\n", old_clean_loc >> bs->block_order);
+#endif
+ bs->data_alloc->set(old_clean_loc >> bs->block_order, false);
+ }
+ if (has_delete)
+ {
+ auto clean_it = bs->clean_db.find(cur.oid);
+ bs->clean_db.erase(clean_it);
+ bs->data_alloc->set(clean_loc >> bs->block_order, false);
+ clean_loc = UINT64_MAX;
+ }
+ else
+ {
+ bs->clean_db[cur.oid] = {
+ .version = cur.version,
+ .location = clean_loc,
+ };
+ }
+ dirty_it = dirty_end;
+ while (1)
+ {
+ if (IS_BIG_WRITE(dirty_it->second.state) && dirty_it->second.location != clean_loc)
+ {
+#ifdef BLOCKSTORE_DEBUG
+ printf("Free block %lu\n", dirty_it->second.location >> bs->block_order);
+#endif
+ bs->data_alloc->set(dirty_it->second.location >> bs->block_order, false);
+ }
+#ifdef BLOCKSTORE_DEBUG
+ printf("remove usage of journal offset %lu by %lu:%lu v%lu\n", dirty_it->second.journal_sector, dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version);
+#endif
+ int used = --bs->journal.used_sectors[dirty_it->second.journal_sector];
+ if (used == 0)
+ {
+ bs->journal.used_sectors.erase(dirty_it->second.journal_sector);
+ }
+ if (dirty_it == bs->dirty_db.begin())
+ {
+ break;
+ }
+ dirty_it--;
+ if (dirty_it->first.oid != cur.oid)
+ {
+ break;
+ }
+ }
+ // Then, basically, remove everything up to the current version from dirty_db...
+ if (dirty_it->first.oid != cur.oid)
+ dirty_it++;
+ bs->dirty_db.erase(dirty_it, std::next(dirty_end));
+}
+
+bool journal_flusher_co::fsync_batch()
+{
+ if (wait_state == 8)
+ goto resume_8;
+ else if (wait_state == 9)
+ goto resume_9;
+ else if (wait_state == 10)
+ goto resume_10;
+ else if (wait_state == 11)
+ goto resume_11;
+ if (!bs->disable_fsync)
+ {
+ cur_sync = flusher->syncs.end();
+ if (cur_sync == flusher->syncs.begin() || cur_sync->state == 1)
+ cur_sync = flusher->syncs.emplace(flusher->syncs.end(), (flusher_sync_t){ .ready_count = 0, .state = 0 });
+ else
+ cur_sync--;
+ cur_sync->ready_count++;
+ if (cur_sync->ready_count >= flusher->sync_threshold ||
+ !flusher->active_until_sync && (!flusher->flush_queue.size() || flusher->active_flushers >= flusher->flusher_count))
+ {
+ // Sync batch is ready. Do it.
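+ // Note (added for clarity): one pair of fsyncs below completes the whole batch -
+ // every flusher that incremented ready_count is covered by it and only waits
+ // on cur_sync->state instead of issuing its own fsync.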
+ await_sqe(9);
+ data->callback = simple_callback_w;
+ data->iov = { 0 };
+ my_uring_prep_fsync(sqe, bs->data_fd, IORING_FSYNC_DATASYNC);
+ wait_count++;
+ if (bs->meta_fd != bs->data_fd)
+ {
+ await_sqe(10);
+ data->callback = simple_callback_w;
+ data->iov = { 0 };
+ my_uring_prep_fsync(sqe, bs->meta_fd, IORING_FSYNC_DATASYNC);
+ wait_count++;
+ }
+ resume_11:
+ if (wait_count > 0)
+ {
+ wait_state = 11;
+ return false;
+ }
+ // Sync completed. All previous coroutines waiting for it must be resumed
+ cur_sync->state = 1;
+ bs->ringloop->wakeup(bs->ring_consumer);
+ }
+ // Wait until someone else sends and completes a sync.
+ resume_8:
+ if (!cur_sync->state)
+ {
+ wait_state = 8;
+ return false;
+ }
+ cur_sync->ready_count--;
+ if (cur_sync->ready_count == 0)
+ {
+ flusher->syncs.erase(cur_sync);
+ }
+ }
+ return true;
 }
diff --git a/blockstore_flush.h b/blockstore_flush.h
index 5393717a..e1432a0c 100644
--- a/blockstore_flush.h
+++ b/blockstore_flush.h
@@ -18,6 +18,13 @@ struct flusher_sync_t
 int state;
};
+struct flusher_meta_write_t
+{
+ uint64_t sector, pos;
+ bool submitted;
+ std::map<uint64_t, meta_sector_t>::iterator it;
+};
+
class journal_flusher_t;
// Journal flusher coroutine
@@ -28,21 +35,24 @@ class journal_flusher_co
 int wait_state, wait_count;
 struct io_uring_sqe *sqe;
 struct ring_data_t *data;
- bool skip_copy;
+ bool skip_copy, has_delete;
 obj_ver_id cur;
 std::map<obj_ver_id, dirty_entry>::iterator dirty_it, dirty_start, dirty_end;
 std::vector<copy_buffer_t> v;
 std::vector<copy_buffer_t>::iterator it;
 int copy_count;
- uint64_t offset, len, submit_offset, submit_len, clean_loc, old_clean_loc, meta_sector, meta_pos;
- std::map<uint64_t, meta_sector_t>::iterator meta_it;
+ uint64_t offset, len, submit_offset, submit_len, clean_loc, old_clean_loc, old_clean_ver;
+ flusher_meta_write_t meta_old, meta_new;
 std::map<object_id, uint64_t>::iterator repeat_it;
 std::function<void(ring_data_t*)> simple_callback_r, simple_callback_w;
 std::list<flusher_sync_t>::iterator cur_sync;
 friend class journal_flusher_t;
+ bool modify_meta_read(uint64_t meta_loc, flusher_meta_write_t &wr, int wait_base);
+ void update_clean_db();
+ bool fsync_batch();
public:
 journal_flusher_co();
- void loop();
+ bool loop();
};
// Journal flusher itself
diff --git a/blockstore_init.cpp b/blockstore_init.cpp
index 0f2cd806..ca7bf1d9 100644
--- a/blockstore_init.cpp
+++ b/blockstore_init.cpp
@@ -56,7 +56,7 @@ int blockstore_init_meta::loop()
 }
 if (prev_done)
 {
- int count = 512 / sizeof(clean_disk_entry);
+ unsigned count = 512 / sizeof(clean_disk_entry);
 for (int sector = 0; sector < done_len; sector += 512)
 {
 clean_disk_entry *entries = (clean_disk_entry*)(metadata_buffer + (prev_done == 2 ? bs->metadata_buf_size : 0) + sector);
@@ -79,7 +79,7 @@ int blockstore_init_meta::loop()
 return 0;
}
-void blockstore_init_meta::handle_entries(struct clean_disk_entry* entries, int count, int block_order)
+void blockstore_init_meta::handle_entries(struct clean_disk_entry* entries, unsigned count, int block_order)
{
 for (unsigned i = 0; i < count; i++)
 {
@@ -106,10 +106,12 @@ void blockstore_init_meta::handle_entries(struct clean_disk_entry* entries, int
 .location = (done_cnt+i) << block_order,
 };
 }
-#ifdef BLOCKSTORE_DEBUG
 else
+ {
+#ifdef BLOCKSTORE_DEBUG
 printf("Old clean entry %lu: %lu:%lu v%lu\n", done_cnt+i, entries[i].oid.inode, entries[i].oid.stripe, entries[i].version);
 #endif
+ }
 }
 }
}
diff --git a/blockstore_init.h b/blockstore_init.h
index 5d213889..e0666d24 100644
--- a/blockstore_init.h
+++ b/blockstore_init.h
@@ -11,7 +11,7 @@ class blockstore_init_meta
 uint64_t entries_loaded = 0;
 struct io_uring_sqe *sqe;
 struct ring_data_t *data;
- void handle_entries(struct clean_disk_entry* entries, int count, int block_order);
+ void handle_entries(struct clean_disk_entry* entries, unsigned count, int block_order);
 void handle_event(ring_data_t *data);
public:
 blockstore_init_meta(blockstore *bs);
diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp
index 8fe9eb44..7d53830f 100644
--- a/blockstore_stable.cpp
+++ b/blockstore_stable.cpp
@@ -22,10 +22,13 @@
 // Stabilize delete:
 // 1) Remove metadata entry and sync it
 // 2) Remove dirty_db entry and clear previous journal entries
-// Note that it will lead to problems in a degraded cluster, because deleting 2 of 3 replicas
-// and restarting the last replica will then result in extra "missing" objects. To solve that
-// we need to store the "tombstones" of deleted objects. We can't do that with current simple
-// metadata storage so we'll skip TRIM implementation for now.
+// We have 2 problems here:
+// - In a cluster environment, we must store the "tombstones" of deleted objects until
+// all replicas (not just a quorum) agree about their deletion. That is, "stabilize" is
+// not possible for deletes in degraded placement groups.
+// - With simple "fixed" metadata tables we can't just clear the metadata entry of the latest
+// object version. We must clear all previous entries, too.
+// FIXME: Fix both problems - probably by switching from "fixed" metadata tables to "dynamic" ones
 // AND We must do it in batches, for the sake of reduced fsync call count
 // AND We must know what we stabilize. Basic workflow is like: