Patch summary (text before the first "diff --git" header is ignored by git apply / patch).

  * journal_flusher_t::unshift_flush() gains a `force` flag and a new
    journal_flusher_t::remove_flush() is added.
  * enqueue_write() passes force=true when a low-versioned write is queued over a delete;
    the flusher's own requeue path passes force=false.
  * Journal replay of a BIG_WRITE over a delete now requires the delete's version to be
    >= the write's version, erases the dirty entries while keeping the clean location,
    and drops the object from the flusher's queue.
  * BLOCKSTORE_DEBUG traces are added/adjusted.

NOTE: line breaks and indentation were reconstructed from a whitespace-collapsed copy of
this patch. Hunk line counts were re-checked against the @@ headers, but the exact leading
whitespace of context lines is a best guess (apply with `git apply --ignore-whitespace`
or `patch -l`). The index hashes are the original ones and do not reflect the small
edits made to added lines (debug label, format specifier, parentheses, doc comments).

diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp
index f94f359a..e4550e9a 100644
--- a/blockstore_flush.cpp
+++ b/blockstore_flush.cpp
@@ -76,6 +76,9 @@ void journal_flusher_t::loop()
 
 void journal_flusher_t::enqueue_flush(obj_ver_id ov)
 {
+#ifdef BLOCKSTORE_DEBUG
+    printf("enqueue_flush %lx:%lx v%lu\n", ov.oid.inode, ov.oid.stripe, ov.version);
+#endif
     auto it = flush_versions.find(ov.oid);
     if (it != flush_versions.end())
     {
@@ -94,8 +97,11 @@ void journal_flusher_t::enqueue_flush(obj_ver_id ov)
     }
 }
 
-void journal_flusher_t::unshift_flush(obj_ver_id ov)
+void journal_flusher_t::unshift_flush(obj_ver_id ov, bool force)
 {
+#ifdef BLOCKSTORE_DEBUG
+    printf("unshift_flush %lx:%lx v%lu\n", ov.oid.inode, ov.oid.stripe, ov.version);
+#endif
     auto it = flush_versions.find(ov.oid);
     if (it != flush_versions.end())
     {
@@ -105,15 +111,38 @@ void journal_flusher_t::unshift_flush(obj_ver_id ov)
     else
     {
         flush_versions[ov.oid] = ov.version;
+        if (!force)
+            flush_queue.push_front(ov.oid);
     }
-    flush_queue.push_front(ov.oid);
-    if (!dequeuing && (flush_queue.size() >= flusher_start_threshold || trim_wanted > 0))
+    if (force)
+        flush_queue.push_front(ov.oid);
+    if (force || (!dequeuing && (flush_queue.size() >= flusher_start_threshold || trim_wanted > 0)))
     {
         dequeuing = true;
         bs->ringloop->wakeup();
     }
 }
 
+void journal_flusher_t::remove_flush(object_id oid)
+{
+#ifdef BLOCKSTORE_DEBUG
+    printf("remove_flush %lx:%lx\n", oid.inode, oid.stripe);
+#endif
+    auto v_it = flush_versions.find(oid);
+    if (v_it != flush_versions.end())
+    {
+        flush_versions.erase(v_it);
+        for (auto q_it = flush_queue.begin(); q_it != flush_queue.end(); ++q_it)
+        {
+            if (*q_it == oid)
+            {
+                flush_queue.erase(q_it);
+                break;
+            }
+        }
+    }
+}
+
 void journal_flusher_t::request_trim()
 {
     dequeuing = true;
@@ -319,8 +348,8 @@ resume_1:
         return false;
     }
     // Writes and deletes shouldn't happen at the same time
-    assert(!(copy_count > 0 || has_writes) || !has_delete);
-    if (copy_count == 0 && !has_writes && !has_delete || has_delete && old_clean_loc == UINT64_MAX)
+    assert(!has_writes || !has_delete);
+    if ((!has_writes && !has_delete) || (has_delete && old_clean_loc == UINT64_MAX))
     {
         // Nothing to flush
         bs->erase_dirty(dirty_start, std::next(dirty_end), clean_loc);
@@ -445,8 +474,8 @@ resume_1:
             clean_disk_entry *new_entry = (clean_disk_entry*)(meta_new.buf + meta_new.pos*bs->clean_entry_size);
             if (new_entry->oid.inode != 0 && new_entry->oid != cur.oid)
             {
-                printf("Fatal error (metadata corruption or bug): tried to overwrite non-zero metadata entry %lx (%lx:%lx) with %lx:%lx\n",
-                    clean_loc, new_entry->oid.inode, new_entry->oid.stripe, cur.oid.inode, cur.oid.stripe);
+                printf("Fatal error (metadata corruption or bug): tried to overwrite non-zero metadata entry %lu (%lx:%lx) with %lx:%lx\n",
+                    clean_loc >> bs->block_order, new_entry->oid.inode, new_entry->oid.stripe, cur.oid.inode, cur.oid.stripe);
                 exit(1);
             }
             new_entry->oid = cur.oid;
@@ -513,7 +542,7 @@ resume_1:
         if (repeat_it != flusher->sync_to_repeat.end() && repeat_it->second > cur.version)
         {
             // Requeue version
-            flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second });
+            flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second }, false);
         }
         flusher->sync_to_repeat.erase(repeat_it);
     trim_journal:
@@ -602,7 +631,7 @@ bool journal_flusher_co::scan_dirty(int wait_base)
             {
                 char err[1024];
                 snprintf(
-                    err, 1024, "BUG: Unexpected dirty_entry %lx:%lx v%lu state during flush: %d",
+                    err, 1024, "BUG: Unexpected dirty_entry %lx:%lx v%lu unstable state during flush: %d",
                     dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version, dirty_it->second.state
                 );
                 throw std::runtime_error(err);
diff --git a/blockstore_flush.h b/blockstore_flush.h
index 1d5e0d07..73bc058a 100644
--- a/blockstore_flush.h
+++ b/blockstore_flush.h
@@ -107,5 +107,11 @@ public:
     void request_trim();
     void release_trim();
     void enqueue_flush(obj_ver_id oid);
-    void unshift_flush(obj_ver_id oid);
+    // Queue <oid> at the front of flush_queue. With force=true it is pushed
+    // unconditionally and the ring loop is always woken up; with force=false it
+    // is pushed only on the path where the object was not in flush_versions yet.
+    void unshift_flush(obj_ver_id oid, bool force);
+    // Cancel a pending flush: erase <oid> from flush_versions and remove its
+    // first entry (if any) from flush_queue. No-op for untracked objects.
+    void remove_flush(object_id oid);
 };
diff --git a/blockstore_init.cpp b/blockstore_init.cpp
index cf80080f..64ee86f5 100644
--- a/blockstore_init.cpp
+++ b/blockstore_init.cpp
@@ -111,7 +111,7 @@ void blockstore_init_meta::handle_entries(void* entries, unsigned count, int blo
                 {
                     // free the previous block
 #ifdef BLOCKSTORE_DEBUG
-                    printf("Free block %lu (new location is %lu)\n", clean_it->second.location >> block_order, done_cnt+i >> block_order);
+                    printf("Free block %lu (new location is %lu)\n", clean_it->second.location >> block_order, done_cnt+i);
 #endif
                     bs->data_alloc->set(clean_it->second.location >> block_order, false);
                 }
@@ -557,9 +557,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
             {
 #ifdef BLOCKSTORE_DEBUG
                 printf(
-                    "je_big_write%s oid=%lx:%lx ver=%lu loc=%08lx\n",
+                    "je_big_write%s oid=%lx:%lx ver=%lu loc=%lu\n",
                     je->type == JE_BIG_WRITE_INSTANT ? "_instant" : "",
-                    je->big_write.oid.inode, je->big_write.oid.stripe, je->big_write.version, je->big_write.location
+                    je->big_write.oid.inode, je->big_write.oid.stripe, je->big_write.version, je->big_write.location >> bs->block_order
                 );
 #endif
                 auto dirty_it = bs->dirty_db.upper_bound((obj_ver_id){
@@ -570,13 +570,18 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
                 {
                     dirty_it--;
                     if (dirty_it->first.oid == je->big_write.oid &&
+                        dirty_it->first.version >= je->big_write.version &&
                         (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_DELETE)
                     {
                         // It is allowed to overwrite a deleted object with a
-                        // version number less than deletion version number,
+                        // version number smaller than deletion version number,
                         // because the presence of a BIG_WRITE entry means that
-                        // the data for it is already on disk.
-                        // Purge all dirty and clean entries for this object.
+                        // its data and metadata are already flushed.
+                        // We don't know if newer versions are flushed, but
+                        // the previous delete definitely is.
+                        // So we flush previous dirty entries, but retain the clean one.
+                        // This feature is required for writes happening shortly
+                        // after deletes.
                         auto dirty_end = dirty_it;
                         dirty_end++;
                         while (1)
@@ -592,13 +597,14 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
                                 break;
                             }
                         }
-                        bs->erase_dirty(dirty_it, dirty_end, UINT64_MAX);
                         auto clean_it = bs->clean_db.find(je->big_write.oid);
-                        if (clean_it != bs->clean_db.end())
-                        {
-                            bs->data_alloc->set(clean_it->second.location >> bs->block_order, false);
-                            bs->clean_db.erase(clean_it);
-                        }
+                        bs->erase_dirty(
+                            dirty_it, dirty_end,
+                            clean_it != bs->clean_db.end() ? clean_it->second.location : UINT64_MAX
+                        );
+                        // Remove it from the flusher's queue, too
+                        // Otherwise it may end up referring to a small unstable write after reading the rest of the journal
+                        bs->flusher->remove_flush(je->big_write.oid);
                     }
                 }
                 auto clean_it = bs->clean_db.find(je->big_write.oid);
diff --git a/blockstore_rollback.cpp b/blockstore_rollback.cpp
index ae1b5a1c..6e757bcd 100644
--- a/blockstore_rollback.cpp
+++ b/blockstore_rollback.cpp
@@ -243,6 +243,9 @@ void blockstore_impl_t::erase_dirty(blockstore_dirty_db_t::iterator dirty_start,
     if (IS_DELETE(dirty_it->second.state))
     {
         object_id oid = dirty_it->first.oid;
+#ifdef BLOCKSTORE_DEBUG
+        printf("Unblock writes-after-delete %lx:%lx v%lu\n", oid.inode, oid.stripe, dirty_it->first.version);
+#endif
         dirty_it = dirty_end;
         // Unblock operations blocked by delete flushing
         uint32_t next_state = BS_ST_IN_FLIGHT;
diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp
index 02bf885a..df2f0978 100644
--- a/blockstore_stable.cpp
+++ b/blockstore_stable.cpp
@@ -213,9 +213,6 @@ void blockstore_impl_t::mark_stable(const obj_ver_id & v)
                     break;
                 }
             }
-#ifdef BLOCKSTORE_DEBUG
-            printf("enqueue_flush %lx:%lx v%lu\n", v.oid.inode, v.oid.stripe, v.version);
-#endif
             flusher->enqueue_flush(v);
         }
         auto unstab_it = unstable_writes.find(v.oid);
diff --git a/blockstore_write.cpp b/blockstore_write.cpp
index c43ca239..e18905b5 100644
--- a/blockstore_write.cpp
+++ b/blockstore_write.cpp
@@ -57,13 +57,16 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
         {
             // It's allowed to write versions with low numbers over deletes
             // However, we have to flush those deletes first as we use version number for ordering
+#ifdef BLOCKSTORE_DEBUG
+            printf("Write %lx:%lx v%lu over delete (real v%lu) offset=%u len=%u\n", op->oid.inode, op->oid.stripe, version, op->version, op->offset, op->len);
+#endif
             wait_del = true;
             PRIV(op)->real_version = op->version;
             op->version = version;
             flusher->unshift_flush((obj_ver_id){
                 .oid = op->oid,
                 .version = version-1,
-            });
+            }, true);
         }
         else
         {
@@ -87,7 +90,7 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
 #ifdef BLOCKSTORE_DEBUG
     if (is_del)
         printf("Delete %lx:%lx v%lu\n", op->oid.inode, op->oid.stripe, op->version);
-    else
+    else if (!wait_del)
         printf("Write %lx:%lx v%lu offset=%u len=%u\n", op->oid.inode, op->oid.stripe, op->version, op->offset, op->len);
 #endif
     // FIXME No strict need to add it into dirty_db here, it's just left
@@ -141,6 +144,9 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
     if (PRIV(op)->real_version != 0)
     {
         // Restore original low version number for unblocked operations
+#ifdef BLOCKSTORE_DEBUG
+        printf("Restoring %lx:%lx version: v%lu -> v%lu\n", op->oid.inode, op->oid.stripe, op->version, PRIV(op)->real_version);
+#endif
         auto prev_it = dirty_it;
         prev_it--;
         if (prev_it->first.oid == op->oid && prev_it->first.version >= PRIV(op)->real_version)
@@ -396,7 +402,7 @@ resume_2:
 resume_4:
     // Switch object state
 #ifdef BLOCKSTORE_DEBUG
-    printf("Ack write %lx:%lx v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state);
+    printf("Ack write %lx:%lx v%lu = state %x\n", op->oid.inode, op->oid.stripe, op->version, dirty_it->second.state);
 #endif
     bool imm = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE
         ? (immediate_commit == IMMEDIATE_ALL)