Collapse create+delete journal entry pairs if they're already flushed

Old journal replay mechanism could lead to a double allocation of the same
block and a "Fatal error: tried to overwrite non-zero metadata entry"
rel-0.5
Vitaliy Filippov 2021-03-27 23:21:18 +03:00
parent 843b7052d2
commit f4769ba7c7
2 changed files with 43 additions and 29 deletions

View File

@ -582,32 +582,10 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
// its data and metadata are already flushed.
// We don't know if newer versions are flushed, but
// the previous delete definitely is.
// So we flush previous dirty entries, but retain the clean one.
// So we forget previous dirty entries, but retain the clean one.
// This feature is required for writes happening shortly
// after deletes.
auto dirty_end = dirty_it;
dirty_end++;
while (1)
{
if (dirty_it == bs->dirty_db.begin())
{
break;
}
dirty_it--;
if (dirty_it->first.oid != je->big_write.oid)
{
dirty_it++;
break;
}
}
auto clean_it = bs->clean_db.find(je->big_write.oid);
bs->erase_dirty(
dirty_it, dirty_end,
clean_it != bs->clean_db.end() ? clean_it->second.location : UINT64_MAX
);
// Remove it from the flusher's queue, too
// Otherwise it may end up referring to a small unstable write after reading the rest of the journal
bs->flusher->remove_flush(je->big_write.oid);
erase_dirty_object(dirty_it);
}
}
auto clean_it = bs->clean_db.find(je->big_write.oid);
@ -679,16 +657,26 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
#ifdef BLOCKSTORE_DEBUG
printf("je_delete oid=%lx:%lx ver=%lu\n", je->del.oid.inode, je->del.oid.stripe, je->del.version);
#endif
bool dirty_exists = false;
auto dirty_it = bs->dirty_db.upper_bound((obj_ver_id){
.oid = je->del.oid,
.version = UINT64_MAX,
});
if (dirty_it != bs->dirty_db.begin())
{
dirty_it--;
dirty_exists = dirty_it->first.oid == je->del.oid;
}
auto clean_it = bs->clean_db.find(je->del.oid);
// Ignore delete if neither preceding dirty entries nor the clean one are present
if ((clean_it != bs->clean_db.end() &&
clean_it->second.version < je->del.version) ||
(dirty_it != bs->dirty_db.begin() &&
std::prev(dirty_it)->first.oid == je->del.oid))
bool clean_exists = (clean_it != bs->clean_db.end() &&
clean_it->second.version < je->del.version);
if (!clean_exists && dirty_exists)
{
// Clean entry doesn't exist. This means that the delete is already flushed.
// So we must not flush this object anymore.
erase_dirty_object(dirty_it);
}
else if (clean_exists || dirty_exists)
{
// oid, version
obj_ver_id ov = {
@ -708,6 +696,7 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
// "2-phase commit" (write->stabilize) isn't sufficient for them anyway
bs->mark_stable(ov);
}
// Ignore delete if neither preceding dirty entries nor the clean one are present
}
started = true;
pos += je->size;
@ -718,3 +707,27 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
bs->journal.next_free = next_free;
return 1;
}
void blockstore_init_journal::erase_dirty_object(blockstore_dirty_db_t::iterator dirty_it)
{
auto oid = dirty_it->first.oid;
auto dirty_end = dirty_it;
dirty_end++;
while (1)
{
if (dirty_it == bs->dirty_db.begin())
{
break;
}
dirty_it--;
if (dirty_it->first.oid != oid)
{
dirty_it++;
break;
}
}
bs->erase_dirty(dirty_it, dirty_end, UINT64_MAX);
// Remove it from the flusher's queue, too
// Otherwise it may end up referring to a small unstable write after reading the rest of the journal
bs->flusher->remove_flush(oid);
}

View File

@ -48,6 +48,7 @@ class blockstore_init_journal
std::function<void(ring_data_t*)> simple_callback;
int handle_journal_part(void *buf, uint64_t done_pos, uint64_t len);
void handle_event(ring_data_t *data);
void erase_dirty_object(blockstore_dirty_db_t::iterator dirty_it);
public:
blockstore_init_journal(blockstore_impl_t* bs);
int loop();