From 0f6d193d73a058c33ba143bd1c286a32a255f38f Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Tue, 16 Jun 2020 01:36:38 +0300 Subject: [PATCH] Postpone op callbacks to the end of handle_read(), fix a bug where primary OSD could reply -EPIPE with data to a read operation --- cluster_client.cpp | 7 +++++++ messenger.h | 5 +++-- msgr_receive.cpp | 49 ++++++++++++++++++++++++++++++++++------------ osd_primary.cpp | 5 ++++- 4 files changed, 50 insertions(+), 16 deletions(-) diff --git a/cluster_client.cpp b/cluster_client.cpp index 266f6856..324bb95d 100644 --- a/cluster_client.cpp +++ b/cluster_client.cpp @@ -39,6 +39,13 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd continue_ops(); } }; + msgr.exec_op = [this](osd_op_t *op) + { + // Garbage in + printf("Incoming garbage from peer %d\n", op->peer_fd); + msgr.stop_client(op->peer_fd); + delete op; + }; st_cli.tfd = tfd; st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); }; diff --git a/messenger.h b/messenger.h index 723a0dd8..7d790a44 100644 --- a/messenger.h +++ b/messenger.h @@ -179,6 +179,7 @@ struct osd_messenger_t std::map clients; std::vector read_ready_clients; std::vector write_ready_clients; + std::vector> set_immediate; // op statistics osd_op_stats_t stats; @@ -207,7 +208,7 @@ protected: void handle_send(int result, int peer_fd); bool handle_read(int result, int peer_fd); - void handle_finished_read(osd_client_t & cl); + bool handle_finished_read(osd_client_t & cl); void handle_op_hdr(osd_client_t *cl); - void handle_reply_hdr(osd_client_t *cl); + bool handle_reply_hdr(osd_client_t *cl); }; diff --git a/msgr_receive.cpp b/msgr_receive.cpp index 82fdfa70..8ddab772 100644 --- a/msgr_receive.cpp +++ b/msgr_receive.cpp @@ -33,6 +33,7 @@ void osd_messenger_t::read_requests() bool osd_messenger_t::handle_read(int result, int peer_fd) { + bool ret = false; auto cl_it = clients.find(peer_fd); if (cl_it != clients.end()) { @@ -79,7 +80,12 @@ bool osd_messenger_t::handle_read(int result, int peer_fd) cl.read_buf += remain; remain = 0; if (cl.read_remaining <= 0) - handle_finished_read(cl); + { + if (!handle_finished_read(cl)) + { + goto fin; + } + } } else { @@ -88,7 +94,10 @@ bool osd_messenger_t::handle_read(int result, int peer_fd) remain -= cl.read_remaining; cl.read_remaining = 0; cl.read_buf = NULL; - handle_finished_read(cl); + if (!handle_finished_read(cl)) + { + goto fin; + } } } } @@ -104,19 +113,25 @@ bool osd_messenger_t::handle_read(int result, int peer_fd) } if (result >= cl.read_iov.iov_len) { - return true; + ret = true; } } } - return false; +fin: + for (auto cb: set_immediate) + { + cb(); + } + set_immediate.clear(); + return ret; } -void osd_messenger_t::handle_finished_read(osd_client_t & cl) +bool osd_messenger_t::handle_finished_read(osd_client_t & cl) { if (cl.read_state == CL_READ_HDR) { if (cl.read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC) - handle_reply_hdr(&cl); + return handle_reply_hdr(&cl); else handle_op_hdr(&cl); } @@ -124,7 +139,7 @@ void osd_messenger_t::handle_finished_read(osd_client_t & cl) { // Operation is ready cl.received_ops.push_back(cl.read_op); - exec_op(cl.read_op); + set_immediate.push_back([this, op = cl.read_op]() { exec_op(op); }); cl.read_op = NULL; cl.read_state = 0; } @@ -151,12 +166,16 @@ void osd_messenger_t::handle_finished_read(osd_client_t & cl) (tv_end.tv_sec - request->tv_begin.tv_sec)*1000000 + (tv_end.tv_nsec - request->tv_begin.tv_nsec)/1000 ); - request->callback(request); + set_immediate.push_back([this, request]() + { + std::function(request->callback)(request); + }); } else { assert(0); } + return true; } void osd_messenger_t::handle_op_hdr(osd_client_t *cl) @@ -205,11 +224,11 @@ void osd_messenger_t::handle_op_hdr(osd_client_t *cl) cl->read_op = NULL; cl->read_state = 0; cl->received_ops.push_back(cur_op); - exec_op(cur_op); + set_immediate.push_back([this, cur_op]() { exec_op(cur_op); }); } } -void osd_messenger_t::handle_reply_hdr(osd_client_t *cl) +bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl) { osd_op_t *cur_op = cl->read_op; auto req_it = cl->sent_ops.find(cur_op->req.hdr.id); @@ -218,7 +237,7 @@ void osd_messenger_t::handle_reply_hdr(osd_client_t *cl) // Command out of sync. Drop connection printf("Client %d command out of sync: id %lu\n", cl->peer_fd, cur_op->req.hdr.id); stop_client(cl->peer_fd); - return; + return false; } osd_op_t *op = req_it->second; memcpy(op->reply.buf, cur_op->req.buf, OSD_PACKET_SIZE); @@ -267,7 +286,11 @@ void osd_messenger_t::handle_reply_hdr(osd_client_t *cl) (tv_end.tv_sec - op->tv_begin.tv_sec)*1000000 + (tv_end.tv_nsec - op->tv_begin.tv_nsec)/1000 ); - // Copy lambda to be unaffected by `delete op` - std::function(op->callback)(op); + set_immediate.push_back([this, op]() + { + // Copy lambda to be unaffected by `delete op` + std::function(op->callback)(op); + }); } + return true; } diff --git a/osd_primary.cpp b/osd_primary.cpp index dc8c4021..93928322 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -100,7 +100,6 @@ void osd_t::continue_primary_read(osd_op_t *cur_op) // Fast happy-path cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_minsize, 0); submit_primary_subops(SUBMIT_READ, pg.pg_minsize, pg.cur_set.data(), cur_op); - cur_op->send_list.push_back(cur_op->buf, cur_op->req.rw.len); op_data->st = 1; } else @@ -150,6 +149,10 @@ resume_2: } } } + else + { + cur_op->send_list.push_back(cur_op->buf, cur_op->req.rw.len); + } finish_op(cur_op, cur_op->req.rw.len); }