Postpone op callbacks to the end of handle_read(), fix a bug where primary OSD could reply -EPIPE with data to a read operation

Vitaliy Filippov 2020-06-16 01:36:38 +03:00
parent 27ee14a4e6
commit 0f6d193d73
4 changed files with 50 additions and 16 deletions

View File

@ -39,6 +39,13 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
continue_ops(); continue_ops();
} }
}; };
msgr.exec_op = [this](osd_op_t *op)
{
// Garbage in
printf("Incoming garbage from peer %d\n", op->peer_fd);
msgr.stop_client(op->peer_fd);
delete op;
};
st_cli.tfd = tfd; st_cli.tfd = tfd;
st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); }; st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };

View File

@ -179,6 +179,7 @@ struct osd_messenger_t
std::map<int, osd_client_t> clients; std::map<int, osd_client_t> clients;
std::vector<int> read_ready_clients; std::vector<int> read_ready_clients;
std::vector<int> write_ready_clients; std::vector<int> write_ready_clients;
std::vector<std::function<void()>> set_immediate;
// op statistics // op statistics
osd_op_stats_t stats; osd_op_stats_t stats;
@ -207,7 +208,7 @@ protected:
void handle_send(int result, int peer_fd); void handle_send(int result, int peer_fd);
bool handle_read(int result, int peer_fd); bool handle_read(int result, int peer_fd);
void handle_finished_read(osd_client_t & cl); bool handle_finished_read(osd_client_t & cl);
void handle_op_hdr(osd_client_t *cl); void handle_op_hdr(osd_client_t *cl);
void handle_reply_hdr(osd_client_t *cl); bool handle_reply_hdr(osd_client_t *cl);
}; };

View File

@ -33,6 +33,7 @@ void osd_messenger_t::read_requests()
bool osd_messenger_t::handle_read(int result, int peer_fd) bool osd_messenger_t::handle_read(int result, int peer_fd)
{ {
bool ret = false;
auto cl_it = clients.find(peer_fd); auto cl_it = clients.find(peer_fd);
if (cl_it != clients.end()) if (cl_it != clients.end())
{ {
@ -79,7 +80,12 @@ bool osd_messenger_t::handle_read(int result, int peer_fd)
cl.read_buf += remain; cl.read_buf += remain;
remain = 0; remain = 0;
if (cl.read_remaining <= 0) if (cl.read_remaining <= 0)
handle_finished_read(cl); {
if (!handle_finished_read(cl))
{
goto fin;
}
}
} }
else else
{ {
@ -88,7 +94,10 @@ bool osd_messenger_t::handle_read(int result, int peer_fd)
remain -= cl.read_remaining; remain -= cl.read_remaining;
cl.read_remaining = 0; cl.read_remaining = 0;
cl.read_buf = NULL; cl.read_buf = NULL;
handle_finished_read(cl); if (!handle_finished_read(cl))
{
goto fin;
}
} }
} }
} }
@ -104,19 +113,25 @@ bool osd_messenger_t::handle_read(int result, int peer_fd)
} }
if (result >= cl.read_iov.iov_len) if (result >= cl.read_iov.iov_len)
{ {
return true; ret = true;
} }
} }
} }
return false; fin:
for (auto cb: set_immediate)
{
cb();
}
set_immediate.clear();
return ret;
} }
void osd_messenger_t::handle_finished_read(osd_client_t & cl) bool osd_messenger_t::handle_finished_read(osd_client_t & cl)
{ {
if (cl.read_state == CL_READ_HDR) if (cl.read_state == CL_READ_HDR)
{ {
if (cl.read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC) if (cl.read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC)
handle_reply_hdr(&cl); return handle_reply_hdr(&cl);
else else
handle_op_hdr(&cl); handle_op_hdr(&cl);
} }
@ -124,7 +139,7 @@ void osd_messenger_t::handle_finished_read(osd_client_t & cl)
{ {
// Operation is ready // Operation is ready
cl.received_ops.push_back(cl.read_op); cl.received_ops.push_back(cl.read_op);
exec_op(cl.read_op); set_immediate.push_back([this, op = cl.read_op]() { exec_op(op); });
cl.read_op = NULL; cl.read_op = NULL;
cl.read_state = 0; cl.read_state = 0;
} }
@ -151,12 +166,16 @@ void osd_messenger_t::handle_finished_read(osd_client_t & cl)
(tv_end.tv_sec - request->tv_begin.tv_sec)*1000000 + (tv_end.tv_sec - request->tv_begin.tv_sec)*1000000 +
(tv_end.tv_nsec - request->tv_begin.tv_nsec)/1000 (tv_end.tv_nsec - request->tv_begin.tv_nsec)/1000
); );
request->callback(request); set_immediate.push_back([this, request]()
{
std::function<void(osd_op_t*)>(request->callback)(request);
});
} }
else else
{ {
assert(0); assert(0);
} }
return true;
} }
void osd_messenger_t::handle_op_hdr(osd_client_t *cl) void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
@ -205,11 +224,11 @@ void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
cl->read_op = NULL; cl->read_op = NULL;
cl->read_state = 0; cl->read_state = 0;
cl->received_ops.push_back(cur_op); cl->received_ops.push_back(cur_op);
exec_op(cur_op); set_immediate.push_back([this, cur_op]() { exec_op(cur_op); });
} }
} }
void osd_messenger_t::handle_reply_hdr(osd_client_t *cl) bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
{ {
osd_op_t *cur_op = cl->read_op; osd_op_t *cur_op = cl->read_op;
auto req_it = cl->sent_ops.find(cur_op->req.hdr.id); auto req_it = cl->sent_ops.find(cur_op->req.hdr.id);
@ -218,7 +237,7 @@ void osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
// Command out of sync. Drop connection // Command out of sync. Drop connection
printf("Client %d command out of sync: id %lu\n", cl->peer_fd, cur_op->req.hdr.id); printf("Client %d command out of sync: id %lu\n", cl->peer_fd, cur_op->req.hdr.id);
stop_client(cl->peer_fd); stop_client(cl->peer_fd);
return; return false;
} }
osd_op_t *op = req_it->second; osd_op_t *op = req_it->second;
memcpy(op->reply.buf, cur_op->req.buf, OSD_PACKET_SIZE); memcpy(op->reply.buf, cur_op->req.buf, OSD_PACKET_SIZE);
@ -267,7 +286,11 @@ void osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
(tv_end.tv_sec - op->tv_begin.tv_sec)*1000000 + (tv_end.tv_sec - op->tv_begin.tv_sec)*1000000 +
(tv_end.tv_nsec - op->tv_begin.tv_nsec)/1000 (tv_end.tv_nsec - op->tv_begin.tv_nsec)/1000
); );
// Copy lambda to be unaffected by `delete op` set_immediate.push_back([this, op]()
std::function<void(osd_op_t*)>(op->callback)(op); {
// Copy lambda to be unaffected by `delete op`
std::function<void(osd_op_t*)>(op->callback)(op);
});
} }
return true;
} }

View File

@ -100,7 +100,6 @@ void osd_t::continue_primary_read(osd_op_t *cur_op)
// Fast happy-path // Fast happy-path
cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_minsize, 0); cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_minsize, 0);
submit_primary_subops(SUBMIT_READ, pg.pg_minsize, pg.cur_set.data(), cur_op); submit_primary_subops(SUBMIT_READ, pg.pg_minsize, pg.cur_set.data(), cur_op);
cur_op->send_list.push_back(cur_op->buf, cur_op->req.rw.len);
op_data->st = 1; op_data->st = 1;
} }
else else
@ -150,6 +149,10 @@ resume_2:
} }
} }
} }
else
{
cur_op->send_list.push_back(cur_op->buf, cur_op->req.rw.len);
}
finish_op(cur_op, cur_op->req.rw.len); finish_op(cur_op, cur_op->req.rw.len);
} }