From 165c204555ef66a9e7298120ef31d9d1bfd91cd4 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Tue, 2 Jun 2020 14:02:39 +0300 Subject: [PATCH] Fix BS_OP_DELETE (the implementation was untested up to this point) --- blockstore_flush.cpp | 35 ++++++++++++++--------------------- blockstore_flush.h | 2 +- blockstore_impl.cpp | 14 ++++++++++++-- blockstore_init.cpp | 33 +++++++++++++++++++-------------- blockstore_write.cpp | 3 +++ 5 files changed, 49 insertions(+), 38 deletions(-) diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 9f98c060..e2fa936b 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -275,32 +275,26 @@ resume_0: #endif flusher->active_flushers++; resume_1: + // Find it in clean_db + clean_it = bs->clean_db.find(cur.oid); + old_clean_loc = (clean_it != bs->clean_db.end() ? clean_it->second.location : UINT64_MAX); // Scan dirty versions of the object if (!scan_dirty(1)) { wait_state += 1; return false; } - if (copy_count == 0 && clean_loc == UINT64_MAX && !has_delete && !has_empty) + // Writes and deletes shouldn't happen at the same time + assert(!(copy_count > 0 || has_writes) || !has_delete); + if (copy_count == 0 && !has_writes && !has_delete || has_delete && old_clean_loc == UINT64_MAX) { // Nothing to flush - flusher->active_flushers--; - repeat_it = flusher->sync_to_repeat.find(cur.oid); - if (repeat_it != flusher->sync_to_repeat.end() && repeat_it->second > cur.version) - { - // Requeue version - flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second }); - } - flusher->sync_to_repeat.erase(repeat_it); - wait_state = 0; - goto resume_0; + bs->erase_dirty(dirty_start, std::next(dirty_end), clean_loc); + goto trim_journal; } - // Find it in clean_db - clean_it = bs->clean_db.find(cur.oid); - old_clean_loc = (clean_it != bs->clean_db.end() ? clean_it->second.location : UINT64_MAX); if (clean_loc == UINT64_MAX) { - if (copy_count > 0 && has_delete || old_clean_loc == UINT64_MAX) + if (old_clean_loc == UINT64_MAX) { // Object not allocated. This is a bug. char err[1024]; @@ -471,6 +465,7 @@ resume_1: } // Update clean_db and dirty_db, free old data locations update_clean_db(); + trim_journal: // Clear unused part of the journal every flushes if (!((++flusher->journal_trim_counter) % flusher->journal_trim_interval) || flusher->trim_wanted > 0) { @@ -530,7 +525,7 @@ bool journal_flusher_co::scan_dirty(int wait_base) copy_count = 0; clean_loc = UINT64_MAX; has_delete = false; - has_empty = false; + has_writes = false; skip_copy = false; clean_init_bitmap = false; while (1) @@ -538,11 +533,8 @@ bool journal_flusher_co::scan_dirty(int wait_base) if (dirty_it->second.state == ST_J_STABLE && !skip_copy) { // First we submit all reads - if (dirty_it->second.len == 0) - { - has_empty = true; - } - else + has_writes = true; + if (dirty_it->second.len != 0) { offset = dirty_it->second.offset; end_offset = dirty_it->second.offset + dirty_it->second.len; @@ -584,6 +576,7 @@ bool journal_flusher_co::scan_dirty(int wait_base) else if (dirty_it->second.state == ST_D_STABLE && !skip_copy) { // There is an unflushed big write. Copy small writes in its position + has_writes = true; clean_loc = dirty_it->second.location; clean_init_bitmap = true; clean_bitmap_offset = dirty_it->second.offset; diff --git a/blockstore_flush.h b/blockstore_flush.h index 8705567f..1bd45178 100644 --- a/blockstore_flush.h +++ b/blockstore_flush.h @@ -45,7 +45,7 @@ class journal_flusher_co std::map::iterator repeat_it; std::function simple_callback_r, simple_callback_w; - bool skip_copy, has_delete, has_empty; + bool skip_copy, has_delete, has_writes; blockstore_clean_db_t::iterator clean_it; std::vector v; std::vector::iterator it; diff --git a/blockstore_impl.cpp b/blockstore_impl.cpp index 06f86e2b..a0aeb01a 100644 --- a/blockstore_impl.cpp +++ b/blockstore_impl.cpp @@ -144,7 +144,7 @@ void blockstore_impl_t::loop() { dequeue_op = dequeue_read(op); } - else if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_DELETE) + else if (op->opcode == BS_OP_WRITE) { if (has_writes == 2) { @@ -154,6 +154,16 @@ void blockstore_impl_t::loop() dequeue_op = dequeue_write(op); has_writes = dequeue_op ? 1 : 2; } + else if (op->opcode == BS_OP_DELETE) + { + if (has_writes == 2) + { + // Some writes could not be submitted + break; + } + dequeue_op = dequeue_del(op); + has_writes = dequeue_op ? 1 : 2; + } else if (op->opcode == BS_OP_SYNC) { // wait for all small writes to be submitted @@ -370,7 +380,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first) } }; } - if (op->opcode == BS_OP_WRITE && !enqueue_write(op)) + if ((op->opcode == BS_OP_WRITE || op->opcode == BS_OP_DELETE) && !enqueue_write(op)) { std::function(op->callback)(op); return; diff --git a/blockstore_init.cpp b/blockstore_init.cpp index 50afd339..7d6d6f87 100644 --- a/blockstore_init.cpp +++ b/blockstore_init.cpp @@ -671,20 +671,25 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u #ifdef BLOCKSTORE_DEBUG printf("je_delete oid=%lu:%lu ver=%lu\n", je->del.oid.inode, je->del.oid.stripe, je->del.version); #endif - // oid, version - obj_ver_id ov = { - .oid = je->del.oid, - .version = je->del.version, - }; - bs->dirty_db.emplace(ov, (dirty_entry){ - .state = ST_DEL_SYNCED, - .flags = 0, - .location = 0, - .offset = 0, - .len = 0, - .journal_sector = proc_pos, - }); - bs->journal.used_sectors[proc_pos]++; + auto clean_it = bs->clean_db.find(je->del.oid); + if (clean_it == bs->clean_db.end() || + clean_it->second.version < je->del.version) + { + // oid, version + obj_ver_id ov = { + .oid = je->del.oid, + .version = je->del.version, + }; + bs->dirty_db.emplace(ov, (dirty_entry){ + .state = ST_DEL_SYNCED, + .flags = 0, + .location = 0, + .offset = 0, + .len = 0, + .journal_sector = proc_pos, + }); + bs->journal.used_sectors[proc_pos]++; + } } started = true; pos += je->size; diff --git a/blockstore_write.cpp b/blockstore_write.cpp index bcbd6da5..fcb41bef 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -100,6 +100,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) .oid = op->oid, .version = op->version, }); + assert(dirty_it != dirty_db.end()); if (dirty_it->second.state == ST_J_WAIT_BIG) { return 0; @@ -292,6 +293,7 @@ int blockstore_impl_t::continue_write(blockstore_op_t *op) .oid = op->oid, .version = op->version, }); + assert(dirty_it != dirty_db.end()); if (PRIV(op)->op_state == 2) goto resume_2; else if (PRIV(op)->op_state == 4) @@ -435,6 +437,7 @@ int blockstore_impl_t::dequeue_del(blockstore_op_t *op) .oid = op->oid, .version = op->version, }); + assert(dirty_it != dirty_db.end()); blockstore_journal_check_t space_check(this); if (!space_check.check_available(op, 1, sizeof(journal_entry_del), 0)) {