Safe stop procedure

blocking-uring-test
Vitaliy Filippov 2019-11-25 01:29:07 +03:00
parent 50cf3667fa
commit be3015169f
5 changed files with 37 additions and 3 deletions

View File

@ -176,9 +176,32 @@ void blockstore::loop()
} }
} }
bool blockstore::stop() bool blockstore::is_safe_to_stop()
{ {
return false; // It's safe to stop blockstore when there are no in-flight operations,
// no in-progress syncs and flusher isn't doing anything
if (submit_queue.size() > 0 || in_progress_syncs.size() > 0 || flusher->is_active())
{
return false;
}
if (unsynced_big_writes.size() > 0 || unsynced_small_writes.size() > 0)
{
if (!stop_sync_submitted)
{
// We should sync the blockstore before unmounting
blockstore_operation *op = new blockstore_operation;
op->flags = OP_SYNC;
op->buf = NULL;
op->callback = [&](blockstore_operation *op)
{
delete op;
};
enqueue_op(op);
stop_sync_submitted = true;
}
return false;
}
return true;
} }
void blockstore::check_wait(blockstore_operation *op) void blockstore::check_wait(blockstore_operation *op)

View File

@ -271,6 +271,8 @@ class blockstore
ring_loop_t *ringloop; ring_loop_t *ringloop;
bool stop_sync_submitted;
inline struct io_uring_sqe* get_sqe() inline struct io_uring_sqe* get_sqe()
{ {
return ringloop->get_sqe(); return ringloop->get_sqe();
@ -327,12 +329,14 @@ public:
// Event loop // Event loop
void loop(); void loop();
// Returns true when blockstore is ready to process operations
// (Although you're free to enqueue them before that)
bool is_started(); bool is_started();
// Returns true when it's safe to destroy the instance. If destroying the instance // Returns true when it's safe to destroy the instance. If destroying the instance
// requires to purge some queues, starts that process. Should be called in the event // requires to purge some queues, starts that process. Should be called in the event
// loop until it returns true. // loop until it returns true.
bool stop(); bool is_safe_to_stop();
// Submission // Submission
void enqueue_op(blockstore_operation *op); void enqueue_op(blockstore_operation *op);

View File

@ -41,6 +41,11 @@ journal_flusher_t::~journal_flusher_t()
delete[] co; delete[] co;
} }
bool journal_flusher_t::is_active()
{
return active_flushers > 0 || flush_queue.size() > 0;
}
void journal_flusher_t::loop() void journal_flusher_t::loop()
{ {
for (int i = 0; i < flusher_count; i++) for (int i = 0; i < flusher_count; i++)

View File

@ -70,6 +70,7 @@ public:
journal_flusher_t(int flusher_count, blockstore *bs); journal_flusher_t(int flusher_count, blockstore *bs);
~journal_flusher_t(); ~journal_flusher_t();
void loop(); void loop();
bool is_active();
void queue_flush(obj_ver_id oid); void queue_flush(obj_ver_id oid);
void unshift_flush(obj_ver_id oid); void unshift_flush(obj_ver_id oid);
}; };

View File

@ -11,6 +11,7 @@ int blockstore::dequeue_sync(blockstore_operation *op)
{ {
if (op->sync_state == 0) if (op->sync_state == 0)
{ {
stop_sync_submitted = false;
op->sync_big_writes.swap(unsynced_big_writes); op->sync_big_writes.swap(unsynced_big_writes);
op->sync_small_writes.swap(unsynced_small_writes); op->sync_small_writes.swap(unsynced_small_writes);
if (op->sync_big_writes.size() > 0) if (op->sync_big_writes.size() > 0)