forked from vitalif/vitastor
Add queue stall tracking
parent
522a9db0e2
commit
b3f2102f33
|
@ -20,6 +20,11 @@ bool blockstore_t::is_started()
|
||||||
return impl->is_started();
|
return impl->is_started();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Public wrapper: reports whether the blockstore is currently stalled.
// Simply forwards to the implementation object (impl->is_stalled()) and returns its result.
bool blockstore_t::is_stalled()
|
||||||
|
{
|
||||||
|
return impl->is_stalled();
|
||||||
|
}
|
||||||
|
|
||||||
bool blockstore_t::is_safe_to_stop()
|
bool blockstore_t::is_safe_to_stop()
|
||||||
{
|
{
|
||||||
return impl->is_safe_to_stop();
|
return impl->is_safe_to_stop();
|
||||||
|
|
|
@ -82,6 +82,9 @@ public:
|
||||||
// (Although you're free to enqueue them before that)
|
// (Although you're free to enqueue them before that)
|
||||||
bool is_started();
|
bool is_started();
|
||||||
|
|
||||||
|
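// Returns true when blockstore is stalled, i.e. the last loop() pass made no progress:
// no ring space was consumed, no completion handler ran and ringloop->get_loop_again() was false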
|
||||||
|
bool is_stalled();
|
||||||
|
|
||||||
// Returns true when it's safe to destroy the instance. If destroying the instance
|
// Returns true when it's safe to destroy the instance. If destroying the instance
|
||||||
// requires purging some queues, starts that process. Should be called in the event
|
// requires purging some queues, starts that process. Should be called in the event
|
||||||
// loop until it returns true.
|
// loop until it returns true.
|
||||||
|
|
|
@ -22,6 +22,7 @@ journal_flusher_co::journal_flusher_co()
|
||||||
wait_state = 0;
|
wait_state = 0;
|
||||||
simple_callback_r = [this](ring_data_t* data)
|
simple_callback_r = [this](ring_data_t* data)
|
||||||
{
|
{
|
||||||
|
bs->live = true;
|
||||||
if (data->res != data->iov.iov_len)
|
if (data->res != data->iov.iov_len)
|
||||||
{
|
{
|
||||||
throw std::runtime_error(
|
throw std::runtime_error(
|
||||||
|
@ -33,6 +34,7 @@ journal_flusher_co::journal_flusher_co()
|
||||||
};
|
};
|
||||||
simple_callback_w = [this](ring_data_t* data)
|
simple_callback_w = [this](ring_data_t* data)
|
||||||
{
|
{
|
||||||
|
bs->live = true;
|
||||||
if (data->res != data->iov.iov_len)
|
if (data->res != data->iov.iov_len)
|
||||||
{
|
{
|
||||||
throw std::runtime_error(
|
throw std::runtime_error(
|
||||||
|
|
|
@ -64,6 +64,11 @@ bool blockstore_impl_t::is_started()
|
||||||
return initialized == 10;
|
return initialized == 10;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns the current value of the queue_stall flag.
// The flag is recomputed in loop() as `!live && !ringloop->get_loop_again()`, where `live`
// is set when ring space was consumed during the pass or when a completion handler ran,
// and is cleared again right after queue_stall is updated.
bool blockstore_impl_t::is_stalled()
|
||||||
|
{
|
||||||
|
return queue_stall;
|
||||||
|
}
|
||||||
|
|
||||||
// main event loop - produce requests
|
// main event loop - produce requests
|
||||||
void blockstore_impl_t::loop()
|
void blockstore_impl_t::loop()
|
||||||
{
|
{
|
||||||
|
@ -100,6 +105,7 @@ void blockstore_impl_t::loop()
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// try to submit ops
|
// try to submit ops
|
||||||
|
unsigned initial_ring_space = ringloop->space_left();
|
||||||
auto cur_sync = in_progress_syncs.begin();
|
auto cur_sync = in_progress_syncs.begin();
|
||||||
while (cur_sync != in_progress_syncs.end())
|
while (cur_sync != in_progress_syncs.end())
|
||||||
{
|
{
|
||||||
|
@ -190,6 +196,12 @@ void blockstore_impl_t::loop()
|
||||||
{
|
{
|
||||||
throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret));
|
throw std::runtime_error(std::string("io_uring_submit: ") + strerror(-ret));
|
||||||
}
|
}
|
||||||
|
if ((initial_ring_space - ringloop->space_left()) > 0)
|
||||||
|
{
|
||||||
|
live = true;
|
||||||
|
}
|
||||||
|
queue_stall = !live && !ringloop->get_loop_again();
|
||||||
|
live = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -207,6 +207,7 @@ class blockstore_impl_t
|
||||||
struct journal_t journal;
|
struct journal_t journal;
|
||||||
journal_flusher_t *flusher;
|
journal_flusher_t *flusher;
|
||||||
|
|
||||||
|
bool live = false, queue_stall = false;
|
||||||
ring_loop_t *ringloop;
|
ring_loop_t *ringloop;
|
||||||
|
|
||||||
bool stop_sync_submitted;
|
bool stop_sync_submitted;
|
||||||
|
@ -281,6 +282,9 @@ public:
|
||||||
// loop until it returns true.
|
// loop until it returns true.
|
||||||
bool is_safe_to_stop();
|
bool is_safe_to_stop();
|
||||||
|
|
||||||
|
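// Returns true if stalled: the last loop() pass consumed no ring space, no completion
// handler set `live` since the previous pass and ringloop->get_loop_again() was false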
|
||||||
|
bool is_stalled();
|
||||||
|
|
||||||
// Submission
|
// Submission
|
||||||
void enqueue_op(blockstore_op_t *op);
|
void enqueue_op(blockstore_op_t *op);
|
||||||
|
|
||||||
|
|
|
@ -155,6 +155,7 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
|
||||||
|
|
||||||
void blockstore_impl_t::handle_read_event(ring_data_t *data, blockstore_op_t *op)
|
void blockstore_impl_t::handle_read_event(ring_data_t *data, blockstore_op_t *op)
|
||||||
{
|
{
|
||||||
|
live = true;
|
||||||
PRIV(op)->pending_ops--;
|
PRIV(op)->pending_ops--;
|
||||||
if (data->res != data->iov.iov_len)
|
if (data->res != data->iov.iov_len)
|
||||||
{
|
{
|
||||||
|
|
|
@ -123,6 +123,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
|
||||||
|
|
||||||
void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t *op)
|
void blockstore_impl_t::handle_stable_event(ring_data_t *data, blockstore_op_t *op)
|
||||||
{
|
{
|
||||||
|
live = true;
|
||||||
if (data->res != data->iov.iov_len)
|
if (data->res != data->iov.iov_len)
|
||||||
{
|
{
|
||||||
throw std::runtime_error(
|
throw std::runtime_error(
|
||||||
|
|
|
@ -133,6 +133,7 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
|
||||||
|
|
||||||
void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op)
|
void blockstore_impl_t::handle_sync_event(ring_data_t *data, blockstore_op_t *op)
|
||||||
{
|
{
|
||||||
|
live = true;
|
||||||
if (data->res != data->iov.iov_len)
|
if (data->res != data->iov.iov_len)
|
||||||
{
|
{
|
||||||
throw std::runtime_error(
|
throw std::runtime_error(
|
||||||
|
|
|
@ -212,6 +212,7 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
|
||||||
|
|
||||||
void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *op)
|
void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *op)
|
||||||
{
|
{
|
||||||
|
live = true;
|
||||||
if (data->res != data->iov.iov_len)
|
if (data->res != data->iov.iov_len)
|
||||||
{
|
{
|
||||||
// FIXME: our state becomes corrupted after a write error. maybe do something better than just die
|
// FIXME: our state becomes corrupted after a write error. maybe do something better than just die
|
||||||
|
|
Loading…
Reference in New Issue