Use std::vector for the blockstore submission queue

Vitaliy Filippov 2021-03-07 00:48:34 +03:00
parent 0d8b5e2ef9
commit 36c935ace6
3 changed files with 32 additions and 39 deletions

View File

@ -101,15 +101,14 @@ void blockstore_impl_t::loop()
{ {
// try to submit ops // try to submit ops
unsigned initial_ring_space = ringloop->space_left(); unsigned initial_ring_space = ringloop->space_left();
auto cur = submit_queue.begin();
// has_writes == 0 - no writes before the current queue item // has_writes == 0 - no writes before the current queue item
// has_writes == 1 - some writes in progress // has_writes == 1 - some writes in progress
// has_writes == 2 - tried to submit some writes, but failed // has_writes == 2 - tried to submit some writes, but failed
int has_writes = 0; int has_writes = 0, op_idx = 0, new_idx = 0;
while (cur != submit_queue.end()) for (; op_idx < submit_queue.size(); op_idx++)
{ {
auto op_ptr = cur; auto op = submit_queue[op_idx];
auto op = *(cur++); submit_queue[new_idx++] = op;
// FIXME: This needs some simplification // FIXME: This needs some simplification
// Writes should not block reads if the ring is not full and reads don't depend on them // Writes should not block reads if the ring is not full and reads don't depend on them
// In all other cases we should stop submission // In all other cases we should stop submission
@ -131,12 +130,13 @@ void blockstore_impl_t::loop()
} }
unsigned ring_space = ringloop->space_left(); unsigned ring_space = ringloop->space_left();
unsigned prev_sqe_pos = ringloop->save(); unsigned prev_sqe_pos = ringloop->save();
bool dequeue_op = false, cancel_op = false; // 0 = can't submit
bool has_in_progress_sync = false; // 1 = in progress
// 2 = can be removed from queue
int wr_st = 0;
if (op->opcode == BS_OP_READ) if (op->opcode == BS_OP_READ)
{ {
dequeue_op = dequeue_read(op); wr_st = dequeue_read(op);
cancel_op = !dequeue_op;
} }
else if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE) else if (op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE)
{ {
@ -145,12 +145,7 @@ void blockstore_impl_t::loop()
// Some writes already could not be submitted // Some writes already could not be submitted
continue; continue;
} }
int wr_st = dequeue_write(op); wr_st = dequeue_write(op);
// 0 = can't submit
// 1 = in progress
// 2 = completed, remove from queue
dequeue_op = wr_st == 2;
cancel_op = wr_st == 0;
has_writes = wr_st > 0 ? 1 : 2; has_writes = wr_st > 0 ? 1 : 2;
} }
else if (op->opcode == BS_OP_DELETE) else if (op->opcode == BS_OP_DELETE)
@ -160,9 +155,7 @@ void blockstore_impl_t::loop()
// Some writes already could not be submitted // Some writes already could not be submitted
continue; continue;
} }
int wr_st = dequeue_del(op); wr_st = dequeue_del(op);
dequeue_op = wr_st == 2;
cancel_op = wr_st == 0;
has_writes = wr_st > 0 ? 1 : 2; has_writes = wr_st > 0 ? 1 : 2;
} }
else if (op->opcode == BS_OP_SYNC) else if (op->opcode == BS_OP_SYNC)
@ -176,39 +169,31 @@ void blockstore_impl_t::loop()
// Can't submit SYNC before previous writes // Can't submit SYNC before previous writes
continue; continue;
} }
int wr_st = continue_sync(op, has_in_progress_sync); wr_st = continue_sync(op, false);
dequeue_op = wr_st == 2; if (wr_st != 2)
cancel_op = wr_st == 0;
if (dequeue_op != 2)
{ {
// Or we could just set has_writes=1... has_writes = wr_st > 0 ? 1 : 2;
has_in_progress_sync = true;
} }
} }
else if (op->opcode == BS_OP_STABLE) else if (op->opcode == BS_OP_STABLE)
{ {
int wr_st = dequeue_stable(op); wr_st = dequeue_stable(op);
dequeue_op = wr_st == 2;
cancel_op = wr_st == 0;
} }
else if (op->opcode == BS_OP_ROLLBACK) else if (op->opcode == BS_OP_ROLLBACK)
{ {
int wr_st = dequeue_rollback(op); wr_st = dequeue_rollback(op);
dequeue_op = wr_st == 2;
cancel_op = wr_st == 0;
} }
else if (op->opcode == BS_OP_LIST) else if (op->opcode == BS_OP_LIST)
{ {
// LIST doesn't need to be blocked by previous modifications // LIST doesn't need to be blocked by previous modifications
process_list(op); process_list(op);
dequeue_op = true; wr_st = 2;
cancel_op = false;
} }
if (dequeue_op) if (wr_st == 2)
{ {
submit_queue.erase(op_ptr); new_idx--;
} }
if (cancel_op) if (wr_st == 0)
{ {
ringloop->restore(prev_sqe_pos); ringloop->restore(prev_sqe_pos);
if (PRIV(op)->wait_for == WAIT_SQE) if (PRIV(op)->wait_for == WAIT_SQE)
@ -219,6 +204,14 @@ void blockstore_impl_t::loop()
} }
} }
} }
if (op_idx != new_idx)
{
while (op_idx < submit_queue.size())
{
submit_queue[new_idx++] = submit_queue[op_idx++];
}
submit_queue.resize(new_idx);
}
if (!readonly) if (!readonly)
{ {
flusher->loop(); flusher->loop();

View File

@ -208,7 +208,7 @@ class blockstore_impl_t
blockstore_clean_db_t clean_db; blockstore_clean_db_t clean_db;
uint8_t *clean_bitmap = NULL; uint8_t *clean_bitmap = NULL;
blockstore_dirty_db_t dirty_db; blockstore_dirty_db_t dirty_db;
std::list<blockstore_op_t*> submit_queue; // FIXME: funny thing is that vector is better here std::vector<blockstore_op_t*> submit_queue;
std::vector<obj_ver_id> unsynced_big_writes, unsynced_small_writes; std::vector<obj_ver_id> unsynced_big_writes, unsynced_small_writes;
allocator *data_alloc = NULL; allocator *data_alloc = NULL;
uint8_t *zero_object; uint8_t *zero_object;

View File

@ -112,7 +112,7 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
read_op->version = 0; read_op->version = 0;
read_op->retval = read_op->len; read_op->retval = read_op->len;
FINISH_OP(read_op); FINISH_OP(read_op);
return 1; return 2;
} }
uint64_t fulfilled = 0; uint64_t fulfilled = 0;
PRIV(read_op)->pending_ops = 0; PRIV(read_op)->pending_ops = 0;
@ -232,10 +232,10 @@ int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
} }
read_op->retval = read_op->len; read_op->retval = read_op->len;
FINISH_OP(read_op); FINISH_OP(read_op);
return 1; return 2;
} }
read_op->retval = 0; read_op->retval = 0;
return 1; return 2;
} }
void blockstore_impl_t::handle_read_event(ring_data_t *data, blockstore_op_t *op) void blockstore_impl_t::handle_read_event(ring_data_t *data, blockstore_op_t *op)