From 5fab6fc5ed196ff0f2a1f5a26cfe9ecad2485c94 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 19 Dec 2021 12:25:47 +0300 Subject: [PATCH] WIP Separate data connections --- src/messenger.cpp | 177 ++++++++++++++++++++++++++++++++---------- src/messenger.h | 9 ++- src/msgr_stop.cpp | 52 +++++++++++-- src/osd_secondary.cpp | 31 ++++++++ 4 files changed, 224 insertions(+), 45 deletions(-) diff --git a/src/messenger.cpp b/src/messenger.cpp index a400b23c..fb268d8a 100644 --- a/src/messenger.cpp +++ b/src/messenger.cpp @@ -4,10 +4,12 @@ #include #include #include +#include #include #include #include +#include "base64.h" #include "addr_util.h" #include "messenger.h" @@ -194,7 +196,7 @@ void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state) try_connect_peer(peer_osd); } -void osd_messenger_t::try_connect_peer(uint64_t peer_osd) +void osd_messenger_t::try_connect_peer(osd_num_t peer_osd) { auto wp_it = wanted_peers.find(peer_osd); if (wp_it == wanted_peers.end() || wp_it->second.connecting || @@ -215,40 +217,75 @@ void osd_messenger_t::try_connect_peer(uint64_t peer_osd) wp.cur_addr = wp.address_list[wp.address_index].string_value(); wp.cur_port = wp.port; wp.connecting = true; - try_connect_peer_addr(peer_osd, wp.cur_addr.c_str(), wp.cur_port); + try_connect_peer_addr(peer_osd, wp.cur_addr.c_str(), wp.cur_port, NULL, [this](osd_num_t peer_osd, int peer_fd) + { + if (peer_fd >= 0) + osd_peer_fds[peer_osd] = peer_fd; + on_connect_peer(peer_osd, peer_fd); + }); } -void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port) +static std::string urandom_str(int bytes) +{ + std::string str; + str.resize(bytes); + char *buf = (char*)str.data(); + while (bytes > 0) + { + int r = getrandom(buf, bytes, 0); + if (r < 0) + throw std::runtime_error(std::string("getrandom: ") + strerror(errno)); + buf += r; + bytes -= r; + } + return str; +} + +void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port, + osd_client_t *meta_cl, std::function connect_callback) { assert(peer_osd != this->osd_num); struct sockaddr addr; - if (!string_to_addr(peer_host, 0, peer_port, &addr)) + if (!meta_cl) { - on_connect_peer(peer_osd, -EINVAL); - return; + if (!string_to_addr(peer_host, 0, peer_port, &addr)) + { + connect_callback(peer_osd, -EINVAL); + return; + } + } + else + { + addr = meta_cl->peer_addr; } int peer_fd = socket(addr.sa_family, SOCK_STREAM, 0); + if (peer_fd >= 0) + { + fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); + int r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); + if (r < 0 && errno != EINPROGRESS) + { + close(peer_fd); + peer_fd = -1; + } + } if (peer_fd < 0) { - on_connect_peer(peer_osd, -errno); - return; - } - fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); - int r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); - if (r < 0 && errno != EINPROGRESS) - { - close(peer_fd); - on_connect_peer(peer_osd, -errno); + connect_callback(peer_osd, -errno); return; } clients[peer_fd] = new osd_client_t(); clients[peer_fd]->peer_addr = addr; - clients[peer_fd]->peer_port = peer_port; + clients[peer_fd]->peer_port = ((struct sockaddr_in*)&addr)->sin_port; clients[peer_fd]->peer_fd = peer_fd; clients[peer_fd]->peer_state = PEER_CONNECTING; clients[peer_fd]->connect_timeout_id = -1; + clients[peer_fd]->connect_callback = connect_callback; clients[peer_fd]->osd_num = peer_osd; clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size); + clients[peer_fd]->data_for = meta_cl ? addr_to_string(meta_cl->peer_addr) : ""; + clients[peer_fd]->data_connection_cookie = meta_cl + ? meta_cl->data_connection_cookie : base64_encode(urandom_str(12)); tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events) { // Either OUT (connected) or HUP @@ -258,10 +295,12 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer { clients[peer_fd]->connect_timeout_id = tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd](int timer_id) { - osd_num_t peer_osd = clients.at(peer_fd)->osd_num; + auto cl = clients.at(peer_fd); + auto connect_callback = cl->connect_callback; + cl->connect_callback = NULL; + osd_num_t peer_osd = cl->osd_num; stop_client(peer_fd, true); - on_connect_peer(peer_osd, -EPIPE); - return; + connect_callback(peer_osd, -EPIPE); }); } } @@ -283,8 +322,10 @@ void osd_messenger_t::handle_connect_epoll(int peer_fd) } if (result != 0) { + auto connect_callback = cl->connect_callback; + cl->connect_callback = NULL; stop_client(peer_fd, true); - on_connect_peer(peer_osd, -result); + connect_callback(peer_osd, -result); return; } int one = 1; @@ -364,6 +405,11 @@ void osd_messenger_t::on_connect_peer(osd_num_t peer_osd, int peer_fd) void osd_messenger_t::check_peer_config(osd_client_t *cl) { + json11::Json::object payload; + if (cl->data_connection_cookie != "") + { + payload["data_cookie"] = cl->data_connection_cookie; + } osd_op_t *op = new osd_op_t(); op->op_type = OSD_OP_OUT; op->peer_fd = cl->peer_fd; @@ -376,24 +422,33 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) }, }, }; -#ifdef WITH_RDMA - if (rdma_context) + if (cl->data_for == "") { - cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg); - if (cl->rdma_conn) +#ifdef WITH_RDMA + if (rdma_context) { - json11::Json payload = json11::Json::object { - { "connect_rdma", cl->rdma_conn->addr.to_string() }, - { "rdma_max_msg", cl->rdma_conn->max_msg }, - }; - std::string payload_str = payload.dump(); - op->req.show_conf.json_len = payload_str.size(); - op->buf = malloc_or_die(payload_str.size()); - op->iov.push_back(op->buf, payload_str.size()); - memcpy(op->buf, payload_str.c_str(), payload_str.size()); + cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg); + if (cl->rdma_conn) + { + payload["connect_rdma"] = cl->rdma_conn->addr.to_string(); + payload["rdma_max_msg"] = cl->rdma_conn->max_msg; + } } - } #endif + } + else + { + // Mark it as a data connection + payload["data_for"] = cl->data_for; + } + if (payload.size()) + { + std::string payload_str = json11::Json(payload).dump(); + op->req.show_conf.json_len = payload_str.size(); + op->buf = malloc_or_die(payload_str.size()); + op->iov.push_back(op->buf, payload_str.size()); + memcpy(op->buf, payload_str.c_str(), payload_str.size()); + } op->callback = [this, cl](osd_op_t *op) { std::string json_err; @@ -426,18 +481,30 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) cl->osd_num, config["protocol_version"].uint64_value(), OSD_PROTOCOL_VERSION ); } + else if (cl->data_for != "" && config["data_for"] != cl->data_for) + { + err = true; + fprintf( + stderr, "OSD %lu does not support separate data connections." + " Proceeding with a single connection\n", cl->osd_num + ); + } } if (err) { osd_num_t peer_osd = cl->osd_num; + auto connect_callback = cl->connect_callback; + cl->connect_callback = NULL; stop_client(op->peer_fd); - on_connect_peer(peer_osd, -1); + connect_callback(peer_osd, -EINVAL); delete op; return; } #ifdef WITH_RDMA - if (config["rdma_address"].is_string()) + if (rdma_context && cl->rdma_conn && config["rdma_address"].is_string()) { + // Prevent creating data connection - we are trying RDMA + cl->data_connection_cookie = ""; msgr_rdma_address_t addr; if (!msgr_rdma_address_t::from_string(config["rdma_address"].string_value().c_str(), &addr) || cl->rdma_conn->connect(&addr) != 0) @@ -450,8 +517,10 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) cl->rdma_conn = NULL; // FIXME: Keep TCP connection in this case osd_num_t peer_osd = cl->osd_num; + auto connect_callback = cl->connect_callback; + cl->connect_callback = NULL; stop_client(cl->peer_fd); - on_connect_peer(peer_osd, -1); + connect_callback(peer_osd, -EPIPE); delete op; return; } @@ -473,8 +542,37 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) } } #endif - osd_peer_fds[cl->osd_num] = cl->peer_fd; - on_connect_peer(cl->osd_num, cl->peer_fd); + if (cl->data_connection_cookie != "") + { + // Try to open second connection to the same address + try_connect_peer_addr(cl->osd_num, NULL, 0, cl, [this, peer_fd = cl->peer_fd](osd_num_t data_peer, int data_peer_fd) + { + auto cl_it = clients.find(peer_fd); + if (cl_it != clients.end()) + { + // Proceed with or without the data connection + auto cl = cl_it->second; + if (data_peer_fd >= 0) + { + cl->data_connection_fd = data_peer_fd; + auto data_cl = clients.at(data_peer_fd); + data_cl->meta_connection_fd = cl->peer_fd; + } + osd_peer_fds[cl->osd_num] = cl->peer_fd; + on_connect_peer(cl->osd_num, cl->peer_fd); + } + else if (data_peer_fd >= 0) + { + stop_client(data_peer_fd); + } + }); + } + else + { + auto connect_callback = cl->connect_callback; + cl->connect_callback = NULL; + connect_callback(cl->osd_num, cl->peer_fd); + } delete op; }; outbox_push(op); @@ -500,6 +598,7 @@ void osd_messenger_t::accept_connections(int listen_fd) clients[peer_fd]->peer_fd = peer_fd; clients[peer_fd]->peer_state = PEER_CONNECTED; clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size); + clients_by_addr[addr_to_string(addr)] = peer_fd; // Add FD to epoll tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events) { diff --git a/src/messenger.h b/src/messenger.h index 6b59325b..75ce1b63 100644 --- a/src/messenger.h +++ b/src/messenger.h @@ -57,6 +57,10 @@ struct osd_client_t int ping_time_remaining = 0; int idle_time_remaining = 0; osd_num_t osd_num = 0; + std::function connect_callback; + + int data_connection_fd = -1, meta_connection_fd = -1; + std::string data_connection_cookie, data_for; void *in_buf = NULL; @@ -148,6 +152,7 @@ public: osd_num_t osd_num; uint64_t next_subop_id = 1; std::map clients; + std::map clients_by_addr; std::map wanted_peers; std::map osd_peer_fds; // op statistics @@ -157,6 +162,7 @@ public: void parse_config(const json11::Json & config); void connect_peer(uint64_t osd_num, json11::Json peer_state); void stop_client(int peer_fd, bool force = false, bool force_delete = false); + void break_data_client_pair(osd_client_t *cl); void outbox_push(osd_op_t *cur_op); std::function exec_op; std::function repeer_pgs; @@ -174,7 +180,8 @@ public: protected: void try_connect_peer(uint64_t osd_num); - void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port); + void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port, + osd_client_t *meta_cl, std::function connect_callback); void handle_peer_epoll(int peer_fd, int epoll_events); void handle_connect_epoll(int peer_fd); void on_connect_peer(osd_num_t peer_osd, int peer_fd); diff --git a/src/msgr_stop.cpp b/src/msgr_stop.cpp index 29776666..27e0f868 100644 --- a/src/msgr_stop.cpp +++ b/src/msgr_stop.cpp @@ -4,6 +4,7 @@ #include #include +#include "addr_util.h" #include "messenger.h" void osd_messenger_t::cancel_osd_ops(osd_client_t *cl) @@ -58,7 +59,8 @@ void osd_messenger_t::stop_client(int peer_fd, bool force, bool force_delete) { if (cl->osd_num) { - fprintf(stderr, "[OSD %lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl->osd_num); + fprintf(stderr, "[OSD %lu] Stopping client %d (OSD %speer %lu)\n", + osd_num, peer_fd, cl->meta_connection_fd >= 0 ? " data" : "", cl->osd_num); } else { @@ -68,7 +70,7 @@ void osd_messenger_t::stop_client(int peer_fd, bool force, bool force_delete) // First set state to STOPPED so another stop_client() call doesn't try to free it again cl->refs++; cl->peer_state = PEER_STOPPED; - if (cl->osd_num) + if (cl->osd_num && cl->meta_connection_fd < 0) { // ...and forget OSD peer osd_peer_fds.erase(cl->osd_num); @@ -100,9 +102,17 @@ void osd_messenger_t::stop_client(int peer_fd, bool force, bool force_delete) #endif if (cl->osd_num) { - // Then repeer PGs because cancel_op() callbacks can try to perform - // some actions and we need correct PG states to not do something silly - repeer_pgs(cl->osd_num); + if (cl->meta_connection_fd < 0) + { + // Then repeer PGs because cancel_op() callbacks can try to perform + // some actions and we need correct PG states to not do something silly + repeer_pgs(cl->osd_num); + } + else + { + // FIXME Try to re-establish data connection + // Only when the connection is outbound, but here it's always outbound + } } // Then cancel all operations if (cl->read_op) @@ -128,6 +138,7 @@ void osd_messenger_t::stop_client(int peer_fd, bool force, bool force_delete) delete cl->rdma_conn; } #endif + clients_by_addr.erase(addr_to_string(cl->peer_addr)); #endif // Find the item again because it can be invalidated at this point it = clients.find(peer_fd); @@ -135,9 +146,40 @@ void osd_messenger_t::stop_client(int peer_fd, bool force, bool force_delete) { clients.erase(it); } + // Break metadata/data connection pair + if (cl->data_connection_fd >= 0) + { + // No sense to keep data connection when metadata connection is stopped + auto dc_it = clients.find(cl->data_connection_fd); + cl->data_connection_fd = -1; + if (dc_it != clients.end() && dc_it->second->meta_connection_fd == cl->peer_fd) + { + stop_client(dc_it->second->peer_fd); + } + } + break_data_client_pair(cl); + // Refcount and delete cl->refs--; if (cl->refs <= 0 || force_delete) { delete cl; } } + +void osd_messenger_t::break_data_client_pair(osd_client_t *cl) +{ + if (cl->meta_connection_fd >= 0) + { + auto dc_it = clients.find(cl->meta_connection_fd); + if (dc_it != clients.end() && dc_it->second->data_connection_fd == cl->peer_fd) + dc_it->second->data_connection_fd = -1; + cl->meta_connection_fd = -1; + } + if (cl->data_connection_fd >= 0) + { + auto dc_it = clients.find(cl->data_connection_fd); + if (dc_it != clients.end() && dc_it->second->meta_connection_fd == cl->peer_fd) + dc_it->second->meta_connection_fd = -1; + cl->data_connection_fd = -1; + } +} diff --git a/src/osd_secondary.cpp b/src/osd_secondary.cpp index 3487bedc..2b3b3c76 100644 --- a/src/osd_secondary.cpp +++ b/src/osd_secondary.cpp @@ -178,6 +178,37 @@ void osd_t::exec_show_config(osd_op_t *cur_op) } } } + else + { +#endif + if (req_json["data_for"].is_string()) + { + auto cli = msgr.clients.at(cur_op->peer_fd); + auto md_it = msgr.clients_by_addr.find(req_json["data_for"].string_value()); + if (md_it != msgr.clients_by_addr.end()) + { + int md_peer_fd = md_it->second; + auto md_it = msgr.clients.find(md_peer_fd); + if (md_it != msgr.clients.end() && md_it->second->data_connection_cookie != "" && + req_json["data_cookie"].string_value() == md_it->second->data_connection_cookie) + { + // Break previous metadata/data connections for both FDs, if present + msgr.break_data_client_pair(cli); + msgr.break_data_client_pair(md_it->second); + // And setup the new pair + cli->meta_connection_fd = md_it->second->peer_fd; + md_it->second->data_connection_fd = cli->peer_fd; + wire_config["data_for"] = req_json["data_for"]; + } + } + } + else if (req_json["data_cookie"].is_string()) + { + auto cli = msgr.clients.at(cur_op->peer_fd); + cli->data_connection_cookie = req_json["data_cookie"].string_value(); + } +#ifdef WITH_RDMA + } #endif if (cur_op->buf) free(cur_op->buf);