Add set_immediate to ringloop and use it for bs/osd ops to prevent reenterability issues

rdma-v2
Vitaliy Filippov 2023-02-07 01:59:34 +03:00
parent 3d09c9cec7
commit 1a1ba0d1e7
6 changed files with 19 additions and 6 deletions

View File

@ -325,7 +325,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op)
{ {
// Basic verification not passed // Basic verification not passed
op->retval = -EINVAL; op->retval = -EINVAL;
std::function<void (blockstore_op_t*)>(op->callback)(op); ringloop->set_immediate([op]() { std::function<void (blockstore_op_t*)>(op->callback)(op); });
return; return;
} }
if (op->opcode == BS_OP_SYNC_STAB_ALL) if (op->opcode == BS_OP_SYNC_STAB_ALL)
@ -368,7 +368,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op)
} }
if ((op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE || op->opcode == BS_OP_DELETE) && !enqueue_write(op)) if ((op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE || op->opcode == BS_OP_DELETE) && !enqueue_write(op))
{ {
std::function<void (blockstore_op_t*)>(op->callback)(op); ringloop->set_immediate([op]() { std::function<void (blockstore_op_t*)>(op->callback)(op); });
return; return;
} }
// Call constructor without allocating memory. We'll call destructor before returning op back // Call constructor without allocating memory. We'll call destructor before returning op back

View File

@ -138,6 +138,7 @@ protected:
std::vector<int> read_ready_clients; std::vector<int> read_ready_clients;
std::vector<int> write_ready_clients; std::vector<int> write_ready_clients;
// We don't use ringloop->set_immediate here because we may have no ringloop in client :)
std::vector<std::function<void()>> set_immediate; std::vector<std::function<void()>> set_immediate;
public: public:

View File

@ -297,7 +297,7 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg)
// Fail it immediately // Fail it immediately
subop->peer_fd = -1; subop->peer_fd = -1;
subop->reply.hdr.retval = -EPIPE; subop->reply.hdr.retval = -EPIPE;
subop->callback(subop); ringloop->set_immediate([subop]() { std::function<void(osd_op_t*)>(subop->callback)(subop); });
} }
subop_idx++; subop_idx++;
} }

View File

@ -235,7 +235,7 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o
// Fail it immediately // Fail it immediately
subop->peer_fd = -1; subop->peer_fd = -1;
subop->reply.hdr.retval = -EPIPE; subop->reply.hdr.retval = -EPIPE;
subop->callback(subop); ringloop->set_immediate([subop]() { std::function<void(osd_op_t*)>(subop->callback)(subop); });
} }
} }
i++; i++;
@ -520,7 +520,7 @@ void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_
// Fail it immediately // Fail it immediately
subops[i].peer_fd = -1; subops[i].peer_fd = -1;
subops[i].reply.hdr.retval = -EPIPE; subops[i].reply.hdr.retval = -EPIPE;
subops[i].callback(&subops[i]); ringloop->set_immediate([subop = &subops[i]]() { std::function<void(osd_op_t*)>(subop->callback)(subop); });
} }
} }
} }
@ -635,7 +635,7 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op)
// Fail it immediately // Fail it immediately
subops[i].peer_fd = -1; subops[i].peer_fd = -1;
subops[i].reply.hdr.retval = -EPIPE; subops[i].reply.hdr.retval = -EPIPE;
subops[i].callback(&subops[i]); ringloop->set_immediate([subop = &subops[i]]() { std::function<void(osd_op_t*)>(subop->callback)(subop); });
} }
} }
} }

View File

@ -88,6 +88,13 @@ void ring_loop_t::loop()
for (int i = 0; i < consumers.size(); i++) for (int i = 0; i < consumers.size(); i++)
{ {
consumers[i]->loop(); consumers[i]->loop();
if (immediate_queue.size())
{
immediate_queue2.swap(immediate_queue);
for (auto & cb: immediate_queue2)
cb();
immediate_queue2.clear();
}
} }
} while (loop_again); } while (loop_again);
} }

View File

@ -119,6 +119,7 @@ struct ring_consumer_t
class ring_loop_t class ring_loop_t
{ {
std::vector<std::function<void()>> immediate_queue, immediate_queue2;
std::vector<ring_consumer_t*> consumers; std::vector<ring_consumer_t*> consumers;
struct ring_data_t *ring_datas; struct ring_data_t *ring_datas;
int *free_ring_data; int *free_ring_data;
@ -143,6 +144,10 @@ public:
} }
return sqe; return sqe;
} }
inline void set_immediate(const std::function<void()> cb)
{
immediate_queue.push_back(cb);
}
inline int submit() inline int submit()
{ {
return io_uring_submit(&ring); return io_uring_submit(&ring);