Browse Source

Add journal fsync to stabilize/rollback

trace-sqes
Vitaliy Filippov 2 years ago
parent
commit
c3737ae3ff
  1. 5
      blockstore_flush.cpp
  2. 1
      blockstore_flush.h
  3. 2
      blockstore_impl.cpp
  4. 5
      blockstore_impl.h
  5. 100
      blockstore_rollback.cpp
  6. 135
      blockstore_stable.cpp
  7. 48
      blockstore_sync.cpp

5
blockstore_flush.cpp

@ -6,6 +6,7 @@ journal_flusher_t::journal_flusher_t(int flusher_count, blockstore_impl_t *bs)
this->flusher_count = flusher_count;
dequeuing = false;
active_flushers = 0;
syncing_flushers = 0;
sync_threshold = bs->journal_block_size / sizeof(journal_entry_stable);
journal_trim_interval = sync_threshold;
journal_trim_counter = 0;
@ -649,7 +650,8 @@ bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base)
});
sync_found:
cur_sync->ready_count++;
if (cur_sync->ready_count >= flusher->sync_threshold || !flusher->flush_queue.size())
flusher->syncing_flushers++;
if (flusher->syncing_flushers >= flusher->flusher_count || !flusher->flush_queue.size())
{
// Sync batch is ready. Do it.
await_sqe(0);
@ -675,6 +677,7 @@ bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base)
wait_state = 2;
return false;
}
flusher->syncing_flushers--;
cur_sync->ready_count--;
if (cur_sync->ready_count == 0)
{

1
blockstore_flush.h

@ -84,6 +84,7 @@ class journal_flusher_t
void* journal_superblock;
int active_flushers;
int syncing_flushers;
std::list<flusher_sync_t> syncs;
std::map<object_id, uint64_t> sync_to_repeat;

2
blockstore_impl.cpp

@ -364,7 +364,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
// Call constructor without allocating memory. We'll call destructor before returning op back
new ((void*)op->private_data) blockstore_op_private_t;
PRIV(op)->wait_for = 0;
PRIV(op)->sync_state = 0;
PRIV(op)->op_state = 0;
PRIV(op)->pending_ops = 0;
if (!first)
{

5
blockstore_impl.h

@ -147,6 +147,7 @@ struct blockstore_op_private_t
int wait_for;
uint64_t wait_detail;
int pending_ops;
int op_state;
// Read
std::vector<fulfill_read_t> read_vec;
@ -161,7 +162,7 @@ struct blockstore_op_private_t
std::vector<obj_ver_id> sync_big_writes, sync_small_writes;
int sync_small_checked, sync_big_checked;
std::list<blockstore_op_t*>::iterator in_progress_ptr;
int sync_state, prev_sync_count;
int prev_sync_count;
};
// https://github.com/algorithm-ninja/cpp-btree
@ -280,11 +281,13 @@ class blockstore_impl_t
// Stabilize
int dequeue_stable(blockstore_op_t *op);
int continue_stable(blockstore_op_t *op);
void handle_stable_event(ring_data_t *data, blockstore_op_t *op);
void stabilize_object(object_id oid, uint64_t max_ver);
// Rollback
int dequeue_rollback(blockstore_op_t *op);
int continue_rollback(blockstore_op_t *op);
void handle_rollback_event(ring_data_t *data, blockstore_op_t *op);
void erase_dirty(blockstore_dirty_db_t::iterator dirty_start, blockstore_dirty_db_t::iterator dirty_end, uint64_t clean_loc);

100
blockstore_rollback.cpp

@ -2,6 +2,10 @@
int blockstore_impl_t::dequeue_rollback(blockstore_op_t *op)
{
if (PRIV(op)->op_state)
{
return continue_rollback(op);
}
obj_ver_id* v;
int i, todo = op->len;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
@ -110,6 +114,70 @@ int blockstore_impl_t::dequeue_rollback(blockstore_op_t *op)
}
PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = s;
PRIV(op)->op_state = 1;
return 1;
}
int blockstore_impl_t::continue_rollback(blockstore_op_t *op)
{
if (PRIV(op)->op_state == 2)
goto resume_2;
else if (PRIV(op)->op_state == 3)
goto resume_3;
else if (PRIV(op)->op_state == 5)
goto resume_5;
else
return 1;
resume_2:
// Release used journal sectors
release_journal_sectors(op);
resume_3:
if (!disable_journal_fsync)
{
io_uring_sqe *sqe = get_sqe();
if (!sqe)
{
return 0;
}
ring_data_t *data = ((ring_data_t*)sqe->user_data);
my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 };
data->callback = [this, op](ring_data_t *data) { handle_stable_event(data, op); };
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = 4;
return 1;
}
resume_5:
obj_ver_id* v;
int i;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
{
// Erase dirty_db entries
auto rm_end = dirty_db.lower_bound((obj_ver_id){
.oid = v->oid,
.version = UINT64_MAX,
});
rm_end--;
auto rm_start = rm_end;
while (1)
{
if (rm_end->first.oid != v->oid)
break;
else if (rm_end->first.version <= v->version)
break;
rm_start = rm_end;
if (rm_end == dirty_db.begin())
break;
rm_end--;
}
if (rm_end != rm_start)
erase_dirty(rm_start, rm_end, UINT64_MAX);
}
journal.trim();
// Acknowledge op
op->retval = 0;
FINISH_OP(op);
return 1;
}
@ -126,37 +194,11 @@ void blockstore_impl_t::handle_rollback_event(ring_data_t *data, blockstore_op_t
PRIV(op)->pending_ops--;
if (PRIV(op)->pending_ops == 0)
{
// Release used journal sectors
release_journal_sectors(op);
obj_ver_id* v;
int i;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
PRIV(op)->op_state++;
if (!continue_stable(op))
{
// Erase dirty_db entries
auto rm_end = dirty_db.lower_bound((obj_ver_id){
.oid = v->oid,
.version = UINT64_MAX,
});
rm_end--;
auto rm_start = rm_end;
while (1)
{
if (rm_end->first.oid != v->oid)
break;
else if (rm_end->first.version <= v->version)
break;
rm_start = rm_end;
if (rm_end == dirty_db.begin())
break;
rm_end--;
}
if (rm_end != rm_start)
erase_dirty(rm_start, rm_end, UINT64_MAX);
submit_queue.push_front(op);
}
journal.trim();
// Acknowledge op
op->retval = 0;
FINISH_OP(op);
}
}

135
blockstore_stable.cpp

@ -40,6 +40,10 @@
int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
{
if (PRIV(op)->op_state)
{
return continue_stable(op);
}
obj_ver_id* v;
int i, todo = 0;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
@ -127,6 +131,87 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
}
PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = s;
PRIV(op)->op_state = 1;
return 1;
}
int blockstore_impl_t::continue_stable(blockstore_op_t *op)
{
if (PRIV(op)->op_state == 2)
goto resume_2;
else if (PRIV(op)->op_state == 3)
goto resume_3;
else if (PRIV(op)->op_state == 5)
goto resume_5;
else
return 1;
resume_2:
// Release used journal sectors
release_journal_sectors(op);
resume_3:
if (!disable_journal_fsync)
{
io_uring_sqe *sqe = get_sqe();
if (!sqe)
{
return 0;
}
ring_data_t *data = ((ring_data_t*)sqe->user_data);
my_uring_prep_fsync(sqe, journal.fd, IORING_FSYNC_DATASYNC);
data->iov = { 0 };
data->callback = [this, op](ring_data_t *data) { handle_stable_event(data, op); };
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
PRIV(op)->pending_ops = 1;
PRIV(op)->op_state = 4;
return 1;
}
resume_5:
// Mark dirty_db entries as stable, acknowledge op completion
obj_ver_id* v;
int i;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
{
// Mark all dirty_db entries up to op->version as stable
auto dirty_it = dirty_db.find(*v);
if (dirty_it != dirty_db.end())
{
while (1)
{
if (dirty_it->second.state == ST_J_SYNCED)
{
dirty_it->second.state = ST_J_STABLE;
}
else if (dirty_it->second.state == ST_D_META_SYNCED)
{
dirty_it->second.state = ST_D_STABLE;
}
else if (dirty_it->second.state == ST_DEL_SYNCED)
{
dirty_it->second.state = ST_DEL_STABLE;
}
else if (IS_STABLE(dirty_it->second.state))
{
break;
}
if (dirty_it == dirty_db.begin())
{
break;
}
dirty_it--;
if (dirty_it->first.oid != v->oid)
{
break;
}
}
#ifdef BLOCKSTORE_DEBUG
printf("enqueue_flush %lu:%lu v%lu\n", v->oid.inode, v->oid.stripe, v->version);
#endif
flusher->enqueue_flush(*v);
}
}
// Acknowledge op
op->retval = 0;
FINISH_OP(op);
return 1;
}
@ -143,54 +228,10 @@ void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t *
PRIV(op)->pending_ops--;
if (PRIV(op)->pending_ops == 0)
{
// FIXME Oops. We must sync the device!
// Release used journal sectors
release_journal_sectors(op);
// Mark dirty_db entries as stable, acknowledge op completion
obj_ver_id* v;
int i;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
PRIV(op)->op_state++;
if (!continue_stable(op))
{
// Mark all dirty_db entries up to op->version as stable
auto dirty_it = dirty_db.find(*v);
if (dirty_it != dirty_db.end())
{
while (1)
{
if (dirty_it->second.state == ST_J_SYNCED)
{
dirty_it->second.state = ST_J_STABLE;
}
else if (dirty_it->second.state == ST_D_META_SYNCED)
{
dirty_it->second.state = ST_D_STABLE;
}
else if (dirty_it->second.state == ST_DEL_SYNCED)
{
dirty_it->second.state = ST_DEL_STABLE;
}
else if (IS_STABLE(dirty_it->second.state))
{
break;
}
if (dirty_it == dirty_db.begin())
{
break;
}
dirty_it--;
if (dirty_it->first.oid != v->oid)
{
break;
}
}
#ifdef BLOCKSTORE_DEBUG
printf("enqueue_flush %lu:%lu v%lu\n", v->oid.inode, v->oid.stripe, v->version);
#endif
flusher->enqueue_flush(*v);
}
submit_queue.push_front(op);
}
// Acknowledge op
op->retval = 0;
FINISH_OP(op);
}
}

48
blockstore_sync.cpp

@ -11,7 +11,7 @@
int blockstore_impl_t::dequeue_sync(blockstore_op_t *op)
{
if (PRIV(op)->sync_state == 0)
if (PRIV(op)->op_state == 0)
{
stop_sync_submitted = false;
PRIV(op)->sync_big_writes.swap(unsynced_big_writes);
@ -21,11 +21,11 @@ int blockstore_impl_t::dequeue_sync(blockstore_op_t *op)
unsynced_big_writes.clear();
unsynced_small_writes.clear();
if (PRIV(op)->sync_big_writes.size() > 0)
PRIV(op)->sync_state = SYNC_HAS_BIG;
PRIV(op)->op_state = SYNC_HAS_BIG;
else if (PRIV(op)->sync_small_writes.size() > 0)
PRIV(op)->sync_state = SYNC_HAS_SMALL;
PRIV(op)->op_state = SYNC_HAS_SMALL;
else
PRIV(op)->sync_state = SYNC_DONE;
PRIV(op)->op_state = SYNC_DONE;
// Always add sync to in_progress_syncs because we clear unsynced_big_writes and unsynced_small_writes
PRIV(op)->prev_sync_count = in_progress_syncs.size();
PRIV(op)->in_progress_ptr = in_progress_syncs.insert(in_progress_syncs.end(), op);
@ -38,7 +38,7 @@ int blockstore_impl_t::dequeue_sync(blockstore_op_t *op)
int blockstore_impl_t::continue_sync(blockstore_op_t *op)
{
auto cb = [this, op](ring_data_t *data) { handle_sync_event(data, op); };
if (PRIV(op)->sync_state == SYNC_HAS_SMALL)
if (PRIV(op)->op_state == SYNC_HAS_SMALL)
{
// No big writes, just fsync the journal
for (; PRIV(op)->sync_small_checked < PRIV(op)->sync_small_writes.size(); PRIV(op)->sync_small_checked++)
@ -56,15 +56,15 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb);
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = 1;
PRIV(op)->sync_state = SYNC_JOURNAL_WRITE_SENT;
PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT;
return 1;
}
else
{
PRIV(op)->sync_state = SYNC_JOURNAL_WRITE_DONE;
PRIV(op)->op_state = SYNC_JOURNAL_WRITE_DONE;
}
}
if (PRIV(op)->sync_state == SYNC_HAS_BIG)
if (PRIV(op)->op_state == SYNC_HAS_BIG)
{
for (; PRIV(op)->sync_big_checked < PRIV(op)->sync_big_writes.size(); PRIV(op)->sync_big_checked++)
{
@ -83,15 +83,15 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
data->callback = cb;
PRIV(op)->min_flushed_journal_sector = PRIV(op)->max_flushed_journal_sector = 0;
PRIV(op)->pending_ops = 1;
PRIV(op)->sync_state = SYNC_DATA_SYNC_SENT;
PRIV(op)->op_state = SYNC_DATA_SYNC_SENT;
return 1;
}
else
{
PRIV(op)->sync_state = SYNC_DATA_SYNC_DONE;
PRIV(op)->op_state = SYNC_DATA_SYNC_DONE;
}
}
if (PRIV(op)->sync_state == SYNC_DATA_SYNC_DONE)
if (PRIV(op)->op_state == SYNC_DATA_SYNC_DONE)
{
for (; PRIV(op)->sync_small_checked < PRIV(op)->sync_small_writes.size(); PRIV(op)->sync_small_checked++)
{
@ -153,10 +153,10 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
}
PRIV(op)->max_flushed_journal_sector = 1 + journal.cur_sector;
PRIV(op)->pending_ops = s;
PRIV(op)->sync_state = SYNC_JOURNAL_WRITE_SENT;
PRIV(op)->op_state = SYNC_JOURNAL_WRITE_SENT;
return 1;
}
if (PRIV(op)->sync_state == SYNC_JOURNAL_WRITE_DONE)
if (PRIV(op)->op_state == SYNC_JOURNAL_WRITE_DONE)
{
if (!disable_journal_fsync)
{
@ -165,15 +165,15 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
data->iov = { 0 };
data->callback = cb;
PRIV(op)->pending_ops = 1;
PRIV(op)->sync_state = SYNC_JOURNAL_SYNC_SENT;
PRIV(op)->op_state = SYNC_JOURNAL_SYNC_SENT;
return 1;
}
else
{
PRIV(op)->sync_state = SYNC_DONE;
PRIV(op)->op_state = SYNC_DONE;
}
}
if (PRIV(op)->sync_state == SYNC_DONE)
if (PRIV(op)->op_state == SYNC_DONE)
{
ack_sync(op);
}
@ -196,17 +196,17 @@ void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op
// Release used journal sectors
release_journal_sectors(op);
// Handle states
if (PRIV(op)->sync_state == SYNC_DATA_SYNC_SENT)
if (PRIV(op)->op_state == SYNC_DATA_SYNC_SENT)
{
PRIV(op)->sync_state = SYNC_DATA_SYNC_DONE;
PRIV(op)->op_state = SYNC_DATA_SYNC_DONE;
}
else if (PRIV(op)->sync_state == SYNC_JOURNAL_WRITE_SENT)
else if (PRIV(op)->op_state == SYNC_JOURNAL_WRITE_SENT)
{
PRIV(op)->sync_state = SYNC_JOURNAL_WRITE_DONE;
PRIV(op)->op_state = SYNC_JOURNAL_WRITE_DONE;
}
else if (PRIV(op)->sync_state == SYNC_JOURNAL_SYNC_SENT)
else if (PRIV(op)->op_state == SYNC_JOURNAL_SYNC_SENT)
{
PRIV(op)->sync_state = SYNC_DONE;
PRIV(op)->op_state = SYNC_DONE;
ack_sync(op);
}
else
@ -218,7 +218,7 @@ void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op
int blockstore_impl_t::ack_sync(blockstore_op_t *op)
{
if (PRIV(op)->sync_state == SYNC_DONE && PRIV(op)->prev_sync_count == 0)
if (PRIV(op)->op_state == SYNC_DONE && PRIV(op)->prev_sync_count == 0)
{
// Remove dependency of subsequent syncs
auto it = PRIV(op)->in_progress_ptr;
@ -230,7 +230,7 @@ int blockstore_impl_t::ack_sync(blockstore_op_t *op)
{
auto & next_sync = *it++;
PRIV(next_sync)->prev_sync_count -= done_syncs;
if (PRIV(next_sync)->prev_sync_count == 0 && PRIV(next_sync)->sync_state == SYNC_DONE)
if (PRIV(next_sync)->prev_sync_count == 0 && PRIV(next_sync)->op_state == SYNC_DONE)
{
done_syncs++;
// Acknowledge next_sync

Loading…
Cancel
Save