diff --git a/src/blockstore_impl.cpp b/src/blockstore_impl.cpp index 29c8cd29..f26f1fdb 100644 --- a/src/blockstore_impl.cpp +++ b/src/blockstore_impl.cpp @@ -325,7 +325,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op) { // Basic verification not passed op->retval = -EINVAL; - std::function(op->callback)(op); + ringloop->set_immediate([op]() { std::function(op->callback)(op); }); return; } if (op->opcode == BS_OP_SYNC_STAB_ALL) @@ -368,7 +368,7 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op) } if ((op->opcode == BS_OP_WRITE || op->opcode == BS_OP_WRITE_STABLE || op->opcode == BS_OP_DELETE) && !enqueue_write(op)) { - std::function(op->callback)(op); + ringloop->set_immediate([op]() { std::function(op->callback)(op); }); return; } // Call constructor without allocating memory. We'll call destructor before returning op back diff --git a/src/messenger.h b/src/messenger.h index f1800cef..238e5164 100644 --- a/src/messenger.h +++ b/src/messenger.h @@ -138,6 +138,7 @@ protected: std::vector read_ready_clients; std::vector write_ready_clients; + // We don't use ringloop->set_immediate here because we may have no ringloop in client :) std::vector> set_immediate; public: diff --git a/src/osd_primary_chain.cpp b/src/osd_primary_chain.cpp index 2ae51fdb..64b061e8 100644 --- a/src/osd_primary_chain.cpp +++ b/src/osd_primary_chain.cpp @@ -297,7 +297,7 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg) // Fail it immediately subop->peer_fd = -1; subop->reply.hdr.retval = -EPIPE; - subop->callback(subop); + ringloop->set_immediate([subop]() { std::function(subop->callback)(subop); }); } subop_idx++; } diff --git a/src/osd_primary_subops.cpp b/src/osd_primary_subops.cpp index bba5d231..0d23eac4 100644 --- a/src/osd_primary_subops.cpp +++ b/src/osd_primary_subops.cpp @@ -235,7 +235,7 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o // Fail it immediately subop->peer_fd = -1; subop->reply.hdr.retval = -EPIPE; - subop->callback(subop); + ringloop->set_immediate([subop]() { std::function(subop->callback)(subop); }); } } i++; @@ -520,7 +520,7 @@ void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_ // Fail it immediately subops[i].peer_fd = -1; subops[i].reply.hdr.retval = -EPIPE; - subops[i].callback(&subops[i]); + ringloop->set_immediate([subop = &subops[i]]() { std::function(subop->callback)(subop); }); } } } @@ -635,7 +635,7 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op) // Fail it immediately subops[i].peer_fd = -1; subops[i].reply.hdr.retval = -EPIPE; - subops[i].callback(&subops[i]); + ringloop->set_immediate([subop = &subops[i]]() { std::function(subop->callback)(subop); }); } } } diff --git a/src/ringloop.cpp b/src/ringloop.cpp index 13fbbe19..28da77c2 100644 --- a/src/ringloop.cpp +++ b/src/ringloop.cpp @@ -88,6 +88,13 @@ void ring_loop_t::loop() for (int i = 0; i < consumers.size(); i++) { consumers[i]->loop(); + if (immediate_queue.size()) + { + immediate_queue2.swap(immediate_queue); + for (auto & cb: immediate_queue2) + cb(); + immediate_queue2.clear(); + } } } while (loop_again); } diff --git a/src/ringloop.h b/src/ringloop.h index de9343d8..8d9c15c8 100644 --- a/src/ringloop.h +++ b/src/ringloop.h @@ -119,6 +119,7 @@ struct ring_consumer_t class ring_loop_t { + std::vector> immediate_queue, immediate_queue2; std::vector consumers; struct ring_data_t *ring_datas; int *free_ring_data; @@ -143,6 +144,10 @@ public: } return sqe; } + inline void set_immediate(const std::function cb) + { + immediate_queue.push_back(cb); + } inline int submit() { return io_uring_submit(&ring);