Allow writes with low version numbers after a delete

Vitaliy Filippov 2020-12-04 11:37:01 +03:00
parent 089f138e0c
commit 44656fbf67
4 changed files with 117 additions and 32 deletions

View File

@ -105,8 +105,8 @@ void journal_flusher_t::unshift_flush(obj_ver_id ov)
else else
{ {
flush_versions[ov.oid] = ov.version; flush_versions[ov.oid] = ov.version;
flush_queue.push_front(ov.oid);
} }
flush_queue.push_front(ov.oid);
if (!dequeuing && (flush_queue.size() >= flusher_start_threshold || trim_wanted > 0)) if (!dequeuing && (flush_queue.size() >= flusher_start_threshold || trim_wanted > 0))
{ {
dequeuing = true; dequeuing = true;

View File

@ -30,12 +30,13 @@
#define BS_ST_BIG_WRITE 0x02 #define BS_ST_BIG_WRITE 0x02
#define BS_ST_DELETE 0x03 #define BS_ST_DELETE 0x03
#define BS_ST_WAIT_BIG 0x10 #define BS_ST_WAIT_DEL 0x10
#define BS_ST_IN_FLIGHT 0x20 #define BS_ST_WAIT_BIG 0x20
#define BS_ST_SUBMITTED 0x30 #define BS_ST_IN_FLIGHT 0x30
#define BS_ST_WRITTEN 0x40 #define BS_ST_SUBMITTED 0x40
#define BS_ST_SYNCED 0x50 #define BS_ST_WRITTEN 0x50
#define BS_ST_STABLE 0x60 #define BS_ST_SYNCED 0x60
#define BS_ST_STABLE 0x70
#define BS_ST_INSTANT 0x100 #define BS_ST_INSTANT 0x100
@ -153,6 +154,8 @@ struct blockstore_op_private_t
// Write // Write
struct iovec iov_zerofill[3]; struct iovec iov_zerofill[3];
// Warning: must not have a default value here, because it is written to before the constructor is called in blockstore_write.cpp
uint64_t real_version;
// Sync // Sync
std::vector<obj_ver_id> sync_big_writes, sync_small_writes; std::vector<obj_ver_id> sync_big_writes, sync_small_writes;

View File

@ -234,10 +234,35 @@ void blockstore_impl_t::handle_rollback_event(ring_data_t *data, blockstore_op_t
void blockstore_impl_t::erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc) void blockstore_impl_t::erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc)
{ {
auto dirty_it = dirty_end; if (dirty_end == dirty_start)
while (dirty_it != dirty_start)
{ {
return;
}
auto dirty_it = dirty_end;
dirty_it--;
if (IS_DELETE(dirty_it->second.state))
{
object_id oid = dirty_it->first.oid;
dirty_it = dirty_end;
// Unblock operations blocked by delete flushing
uint32_t next_state = BS_ST_IN_FLIGHT;
while (dirty_it != dirty_db.end() && dirty_it->first.oid == oid)
{
if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_DEL)
{
dirty_it->second.state = (dirty_it->second.state & ~BS_ST_WORKFLOW_MASK) | next_state;
if (IS_BIG_WRITE(dirty_it->second.state))
{
next_state = BS_ST_WAIT_BIG;
}
}
dirty_it++;
}
dirty_it = dirty_end;
dirty_it--; dirty_it--;
}
while (1)
{
if (IS_BIG_WRITE(dirty_it->second.state) && dirty_it->second.location != clean_loc) if (IS_BIG_WRITE(dirty_it->second.state) && dirty_it->second.location != clean_loc)
{ {
#ifdef BLOCKSTORE_DEBUG #ifdef BLOCKSTORE_DEBUG
@ -256,6 +281,11 @@ void blockstore_impl_t::erase_dirty(blockstore_dirty_db_t::iterator dirty_start,
{ {
journal.used_sectors.erase(dirty_it->second.journal_sector); journal.used_sectors.erase(dirty_it->second.journal_sector);
} }
if (dirty_it == dirty_start)
{
break;
}
dirty_it--;
} }
dirty_db.erase(dirty_start, dirty_end); dirty_db.erase(dirty_start, dirty_end);
} }

View File

@ -7,7 +7,7 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
{ {
// Check or assign version number // Check or assign version number
bool found = false, deleted = false, is_del = (op->opcode == BS_OP_DELETE); bool found = false, deleted = false, is_del = (op->opcode == BS_OP_DELETE);
bool is_inflight_big = false; bool wait_big = false, wait_del = false;
uint64_t version = 1; uint64_t version = 1;
if (dirty_db.size() > 0) if (dirty_db.size() > 0)
{ {
@ -21,7 +21,8 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
found = true; found = true;
version = dirty_it->first.version + 1; version = dirty_it->first.version + 1;
deleted = IS_DELETE(dirty_it->second.state); deleted = IS_DELETE(dirty_it->second.state);
is_inflight_big = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE wait_del = ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_DEL);
wait_big = (dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE
? !IS_SYNCED(dirty_it->second.state) ? !IS_SYNCED(dirty_it->second.state)
: ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG); : ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG);
} }
@ -38,23 +39,40 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
deleted = true; deleted = true;
} }
} }
if (op->version == 0)
{
op->version = version;
}
else if (op->version < version)
{
// Invalid version requested
op->retval = -EEXIST;
return false;
}
if (deleted && is_del) if (deleted && is_del)
{ {
// Already deleted // Already deleted
op->retval = 0; op->retval = 0;
return false; return false;
} }
if (is_inflight_big && !is_del && !deleted && op->len < block_size && PRIV(op)->real_version = 0;
if (op->version == 0)
{
op->version = version;
}
else if (op->version < version)
{
// Implicit operations must be added in this order: DEL [FLUSH] BIG [SYNC] SMALL SMALL
if (deleted || wait_del)
{
// It's allowed to write versions with low numbers over deletes
// However, we have to flush those deletes first as we use version number for ordering
wait_del = true;
PRIV(op)->real_version = op->version;
op->version = version;
flusher->unshift_flush((obj_ver_id){
.oid = op->oid,
.version = version-1,
});
}
else
{
// Invalid version requested
op->retval = -EEXIST;
return false;
}
}
if (wait_big && !is_del && !deleted && op->len < block_size &&
immediate_commit != IMMEDIATE_ALL) immediate_commit != IMMEDIATE_ALL)
{ {
// Issue an additional sync so that the previous big write can reach the journal // Issue an additional sync so that the previous big write can reach the journal
@ -72,19 +90,28 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
else else
printf("Write %lx:%lx v%lu offset=%u len=%u\n", op->oid.inode, op->oid.stripe, op->version, op->offset, op->len); printf("Write %lx:%lx v%lu offset=%u len=%u\n", op->oid.inode, op->oid.stripe, op->version, op->offset, op->len);
#endif #endif
// No strict need to add it into dirty_db here, it's just left // FIXME No strict need to add it into dirty_db here, it's just left
// from the previous implementation where reads waited for writes // from the previous implementation where reads waited for writes
uint32_t state;
if (is_del)
state = BS_ST_DELETE | BS_ST_IN_FLIGHT;
else
{
state = (op->len == block_size || deleted ? BS_ST_BIG_WRITE : BS_ST_SMALL_WRITE);
if (wait_del)
state |= BS_ST_WAIT_DEL;
else if (state == BS_ST_SMALL_WRITE && wait_big)
state |= BS_ST_WAIT_BIG;
else
state |= BS_ST_IN_FLIGHT;
if (op->opcode == BS_OP_WRITE_STABLE)
state |= BS_ST_INSTANT;
}
dirty_db.emplace((obj_ver_id){ dirty_db.emplace((obj_ver_id){
.oid = op->oid, .oid = op->oid,
.version = op->version, .version = op->version,
}, (dirty_entry){ }, (dirty_entry){
.state = (uint32_t)( .state = state,
is_del
? (BS_ST_DELETE | BS_ST_IN_FLIGHT)
: (op->opcode == BS_OP_WRITE_STABLE ? BS_ST_INSTANT : 0) | (op->len == block_size || deleted
? (BS_ST_BIG_WRITE | BS_ST_IN_FLIGHT)
: (is_inflight_big ? (BS_ST_SMALL_WRITE | BS_ST_WAIT_BIG) : (BS_ST_SMALL_WRITE | BS_ST_IN_FLIGHT)))
),
.flags = 0, .flags = 0,
.location = 0, .location = 0,
.offset = is_del ? 0 : op->offset, .offset = is_del ? 0 : op->offset,
@ -106,12 +133,35 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
.version = op->version, .version = op->version,
}); });
assert(dirty_it != dirty_db.end()); assert(dirty_it != dirty_db.end());
if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) == BS_ST_WAIT_BIG) if ((dirty_it->second.state & BS_ST_WORKFLOW_MASK) < BS_ST_IN_FLIGHT)
{ {
// Don't dequeue // Don't dequeue
return 0; return 0;
} }
else if ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE) if (PRIV(op)->real_version != 0)
{
// Restore original low version number for unblocked operations
auto prev_it = dirty_it;
prev_it--;
if (prev_it->first.oid == op->oid && prev_it->first.version >= PRIV(op)->real_version)
{
// Original version is still invalid
// FIXME Oops. Successive small writes will currently break in an unexpected way. Fix it
dirty_db.erase(dirty_it);
op->retval = -EEXIST;
FINISH_OP(op);
return 1;
}
op->version = PRIV(op)->real_version;
PRIV(op)->real_version = 0;
dirty_entry e = dirty_it->second;
dirty_db.erase(dirty_it);
dirty_it = dirty_db.emplace((obj_ver_id){
.oid = op->oid,
.version = op->version,
}, e).first;
}
if ((dirty_it->second.state & BS_ST_TYPE_MASK) == BS_ST_BIG_WRITE)
{ {
blockstore_journal_check_t space_check(this); blockstore_journal_check_t space_check(this);
if (!space_check.check_available(op, unsynced_big_writes.size() + 1, sizeof(journal_entry_big_write), JOURNAL_STABILIZE_RESERVATION)) if (!space_check.check_available(op, unsynced_big_writes.size() + 1, sizeof(journal_entry_big_write), JOURNAL_STABILIZE_RESERVATION))
@ -129,6 +179,8 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
PRIV(op)->wait_for = WAIT_FREE; PRIV(op)->wait_for = WAIT_FREE;
return 0; return 0;
} }
// FIXME Oops. Successive small writes will currently break in an unexpected way. Fix it
dirty_db.erase(dirty_it);
op->retval = -ENOSPC; op->retval = -ENOSPC;
FINISH_OP(op); FINISH_OP(op);
return 1; return 1;