From ce777319c3ab438bf3f113c0879002a2382e10a0 Mon Sep 17 00:00:00 2001
From: Vitaliy Filippov <vitalif@yourcmc.ru>
Date: Fri, 16 Apr 2021 00:18:15 +0300
Subject: [PATCH] WIP RDMA support

Basic naive implementation works, but it's highly suboptimal as RNR
retransmissions occur all the time. RDMA expects the receiver to always
have space for incoming WRs...
---
 src/CMakeLists.txt     |  11 +
 src/cluster_client.cpp |   1 +
 src/fio_cluster.cpp    |  15 ++
 src/messenger.cpp      | 109 +++++++-
 src/messenger.h        |  44 ++-
 src/msgr_rdma.cpp      | 587 +++++++++++++++++++++++++++++++++++++++++
 src/msgr_rdma.h        |  53 ++++
 src/msgr_receive.cpp   |  12 +-
 src/msgr_send.cpp      |  39 ++-
 src/msgr_stop.cpp      |   6 +
 src/osd_ops.h          |   2 +
 src/osd_secondary.cpp  |  23 ++
 12 files changed, 884 insertions(+), 18 deletions(-)
 create mode 100644 src/msgr_rdma.cpp
 create mode 100644 src/msgr_rdma.h

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 8531a144..b0720a9c 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -37,11 +37,16 @@ string(REGEX REPLACE "([\\/\\-]D) *NDEBUG" "" CMAKE_C_FLAGS_RELWITHDEBINFO "${CM
 find_package(PkgConfig)
 pkg_check_modules(LIBURING REQUIRED liburing)
 pkg_check_modules(GLIB REQUIRED glib-2.0)
+pkg_check_modules(IBVERBS libibverbs)
+if (IBVERBS_LIBRARIES)
+    add_definitions(-DWITH_RDMA)
+endif (IBVERBS_LIBRARIES)
 
 include_directories(
     ../
     /usr/include/jerasure
     ${LIBURING_INCLUDE_DIRS}
+    ${IBVERBS_INCLUDE_DIRS}
 )
 
 # libvitastor_blk.so
@@ -72,6 +77,9 @@ add_library(vitastor_common STATIC
     messenger.cpp msgr_stop.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp ../json11/json11.cpp
     http_client.cpp osd_ops.cpp pg_states.cpp timerfd_manager.cpp base64.cpp
 )
+if (IBVERBS_LIBRARIES)
+    target_sources(vitastor_common PRIVATE msgr_rdma.cpp)
+endif (IBVERBS_LIBRARIES)
 target_compile_options(vitastor_common PUBLIC -fPIC)
 
 # vitastor-osd
@@ -84,6 +92,7 @@ target_link_libraries(vitastor-osd
     vitastor_common
     vitastor_blk
     Jerasure
+    ${IBVERBS_LIBRARIES}
 )
 
 # libfio_vitastor_sec.so
@@ -103,6 +112,7 @@ target_link_libraries(vitastor_client
     vitastor_common
     tcmalloc_minimal
    ${LIBURING_LIBRARIES}
+    ${IBVERBS_LIBRARIES}
 )
 set_target_properties(vitastor_client PROPERTIES VERSION ${VERSION} SOVERSION 0)
 
@@ -178,6 +188,7 @@ add_executable(stub_uring_osd
 target_link_libraries(stub_uring_osd
     vitastor_common
     ${LIBURING_LIBRARIES}
+    ${IBVERBS_LIBRARIES}
     tcmalloc_minimal
 )
diff --git a/src/cluster_client.cpp b/src/cluster_client.cpp
index dea96649..260d65ea 100644
--- a/src/cluster_client.cpp
+++ b/src/cluster_client.cpp
@@ -53,6 +53,7 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
         msgr.stop_client(op->peer_fd);
         delete op;
     };
+    msgr.parse_config(this->config);
     msgr.init();
 
     st_cli.tfd = tfd;
diff --git a/src/fio_cluster.cpp b/src/fio_cluster.cpp
index e7650f53..c6bc05c8 100644
--- a/src/fio_cluster.cpp
+++ b/src/fio_cluster.cpp
@@ -53,6 +53,10 @@ struct sec_options
     uint64_t inode = 0;
     int cluster_log = 0;
     int trace = 0;
+    int use_rdma = 0;
+    int rdma_port_num = 0;
+    int rdma_gid_index = 0;
+    int rdma_mtu = 0;
 };
 
 static struct fio_option options[] = {
@@ -121,6 +125,16 @@ static struct fio_option options[] = {
         .category = FIO_OPT_C_ENGINE,
         .group = FIO_OPT_G_FILENAME,
     },
+    {
+        .name = "use_rdma",
+        .lname = "Use RDMA",
+        .type = FIO_OPT_BOOL,
+        .off1 = offsetof(struct sec_options, use_rdma),
+        .help = "Use RDMA",
+        .def = "0",
+        .category = FIO_OPT_C_ENGINE,
+        .group = FIO_OPT_G_FILENAME,
+    },
     {
         .name = NULL,
     },
@@ -156,6 +170,7 @@ static int sec_setup(struct thread_data *td)
         { "etcd_address", std::string(o->etcd_host) },
         { "etcd_prefix", std::string(o->etcd_prefix ? o->etcd_prefix : "/vitastor") },
         { "log_level", o->cluster_log },
+        { "use_rdma", o->use_rdma },
     };
 
     if (!o->image)
diff --git a/src/messenger.cpp b/src/messenger.cpp
index 572bcf98..d76c70eb 100644
--- a/src/messenger.cpp
+++ b/src/messenger.cpp
@@ -12,6 +12,29 @@
 
 void osd_messenger_t::init()
 {
+#ifdef WITH_RDMA
+    if (use_rdma)
+    {
+        rdma_context = msgr_rdma_context_t::create(
+            rdma_device != "" ? rdma_device.c_str() : NULL,
+            rdma_port_num, rdma_gid_index, rdma_mtu
+        );
+        if (!rdma_context)
+        {
+            printf("[OSD %lu] Couldn't initialize RDMA, proceeding with TCP only\n", osd_num);
+        }
+        else
+        {
+            printf("[OSD %lu] RDMA initialized successfully\n", osd_num);
+            fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK);
+            tfd->set_fd_handler(rdma_context->channel->fd, false, [this](int notify_fd, int epoll_events)
+            {
+                handle_rdma_events();
+            });
+            handle_rdma_events();
+        }
+    }
+#endif
     keepalive_timer_id = tfd->set_timer(1000, true, [this](int)
     {
         std::vector<int> to_stop;
@@ -19,7 +42,7 @@ void osd_messenger_t::init()
         for (auto cl_it = clients.begin(); cl_it != clients.end(); cl_it++)
         {
             auto cl = cl_it->second;
-            if (!cl->osd_num || cl->peer_state != PEER_CONNECTED)
+            if (!cl->osd_num || (cl->peer_state != PEER_CONNECTED && cl->peer_state != PEER_RDMA))
             {
                 // Do not run keepalive on regular clients
                 continue;
             }
@@ -94,10 +117,29 @@ osd_messenger_t::~osd_messenger_t()
     {
         stop_client(clients.begin()->first, true);
     }
+#ifdef WITH_RDMA
+    if (rdma_context)
+    {
+        delete rdma_context;
+    }
+#endif
 }
 
 void osd_messenger_t::parse_config(const json11::Json & config)
 {
+#ifdef WITH_RDMA
+    if (!config["use_rdma"].is_null())
+        this->use_rdma = config["use_rdma"].bool_value() || config["use_rdma"].uint64_value() != 0;
+    this->rdma_device = config["rdma_device"].string_value();
+    this->rdma_port_num = (uint8_t)config["rdma_port_num"].uint64_value();
+    if (!this->rdma_port_num)
+        this->rdma_port_num = 1;
+    this->rdma_gid_index = (uint8_t)config["rdma_gid_index"].uint64_value();
+    this->rdma_mtu = (uint32_t)config["rdma_mtu"].uint64_value();
+#endif
+    this->bs_bitmap_granularity = strtoull(config["bitmap_granularity"].string_value().c_str(), NULL, 10);
+    if (!this->bs_bitmap_granularity)
+        this->bs_bitmap_granularity = DEFAULT_BITMAP_GRANULARITY;
     this->use_sync_send_recv = config["use_sync_send_recv"].bool_value() ||
         config["use_sync_send_recv"].uint64_value();
     this->peer_connect_interval = config["peer_connect_interval"].uint64_value();
@@ -314,6 +356,9 @@ void osd_messenger_t::on_connect_peer(osd_num_t peer_osd, int peer_fd)
 
 void osd_messenger_t::check_peer_config(osd_client_t *cl)
 {
+#ifdef WITH_RDMA
+    msgr_rdma_connection_t *rdma_conn = NULL;
+#endif
     osd_op_t *op = new osd_op_t();
     op->op_type = OSD_OP_OUT;
     op->peer_fd = cl->peer_fd;
     op->req = (osd_any_op_t){
         .show_conf = {
             .header = {
                 .magic = SECONDARY_OSD_OP_MAGIC,
                 .id = this->next_subop_id++,
                 .opcode = OSD_OP_SHOW_CONFIG,
             },
         },
     };
-    op->callback = [this, cl](osd_op_t *op)
+#ifdef WITH_RDMA
+    if (rdma_context)
+    {
+        cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, max_rdma_send, max_rdma_recv, max_rdma_sge);
+        if (cl->rdma_conn)
+        {
+            json11::Json payload = json11::Json::object {
+                { "connect_rdma", cl->rdma_conn->addr.to_string() },
+            };
+            std::string payload_str = payload.dump();
+            op->req.show_conf.json_len = payload_str.size();
+            op->buf = malloc_or_die(payload_str.size());
+            op->iov.push_back(op->buf, payload_str.size());
+            memcpy(op->buf, payload_str.c_str(), payload_str.size());
+        }
+    }
+#endif
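+    // The RDMA handshake piggybacks on OSD_OP_SHOW_CONFIG: the client creates
+    // its queue pair in advance and sends the local address ("lid:qpn:psn:gid")
+    // as a JSON request payload; the peer replies with its own "rdma_address"
+    // if it managed to create and connect a reciprocal queue pair.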
+    op->callback = [this, cl
+#ifdef WITH_RDMA
+        , rdma_conn
+#endif
+    ](osd_op_t *op)
     {
         std::string json_err;
         json11::Json config;
@@ -361,12 +427,42 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
         }
         if (err)
         {
-            osd_num_t osd_num = cl->osd_num;
+            osd_num_t peer_osd = cl->osd_num;
             stop_client(op->peer_fd);
-            on_connect_peer(osd_num, -1);
+            on_connect_peer(peer_osd, -1);
             delete op;
             return;
         }
+#ifdef WITH_RDMA
+        if (config["rdma_address"].is_string())
+        {
+            msgr_rdma_address_t addr;
+            if (!msgr_rdma_address_t::from_string(config["rdma_address"].string_value().c_str(), &addr) ||
+                cl->rdma_conn->connect(&addr) != 0)
+            {
+                printf(
+                    "Failed to connect to OSD %lu (address %s) using RDMA\n",
+                    cl->osd_num, config["rdma_address"].string_value().c_str()
+                );
+                delete cl->rdma_conn;
+                cl->rdma_conn = NULL;
+                // FIXME: Keep TCP connection in this case
+                osd_num_t peer_osd = cl->osd_num;
+                stop_client(cl->peer_fd);
+                on_connect_peer(peer_osd, -1);
+                delete op;
+                return;
+            }
+            else
+            {
+                printf("Connected to OSD %lu using RDMA\n", cl->osd_num);
+                cl->peer_state = PEER_RDMA;
+                tfd->set_fd_handler(cl->peer_fd, false, NULL);
+                // Add the initial receive request
+                try_recv_rdma(cl);
+            }
+        }
+#endif
         osd_peer_fds[cl->osd_num] = cl->peer_fd;
         on_connect_peer(cl->osd_num, cl->peer_fd);
         delete op;
@@ -408,3 +504,8 @@ void osd_messenger_t::accept_connections(int listen_fd)
             throw std::runtime_error(std::string("accept: ") + strerror(errno));
         }
     }
 }
+
+bool osd_messenger_t::is_rdma_enabled()
+{
+    return rdma_context != NULL;
+}
diff --git a/src/messenger.h b/src/messenger.h
index 34466d4f..6b312159 100644
--- a/src/messenger.h
+++ b/src/messenger.h
@@ -18,21 +18,35 @@
 #include "timerfd_manager.h"
 #include <ringloop.h>
 
+#ifdef WITH_RDMA
+#include "msgr_rdma.h"
+#endif
+
 #define CL_READ_HDR 1
 #define CL_READ_DATA 2
 #define CL_READ_REPLY_DATA 3
 
 #define CL_WRITE_READY 1
-#define CL_WRITE_REPLY 2
 
 #define PEER_CONNECTING 1
 #define PEER_CONNECTED 2
-#define PEER_STOPPED 3
+#define PEER_RDMA_CONNECTING 3
+#define PEER_RDMA 4
+#define PEER_STOPPED 5
 
 #define DEFAULT_PEER_CONNECT_INTERVAL 5
 #define DEFAULT_PEER_CONNECT_TIMEOUT 5
 #define DEFAULT_OSD_PING_TIMEOUT 5
 #define DEFAULT_BITMAP_GRANULARITY 4096
 
+#define MSGR_SENDP_HDR 1
+#define MSGR_SENDP_FREE 2
+
+struct msgr_sendp_t
+{
+    osd_op_t *op;
+    int flags;
+};
+
 struct osd_client_t
 {
     int refs = 0;
@@ -48,6 +62,10 @@ struct osd_client_t
 
     void *in_buf = NULL;
 
+#ifdef WITH_RDMA
+    msgr_rdma_connection_t *rdma_conn = NULL;
+#endif
+
     // Read state
     int read_ready = 0;
     osd_op_t *read_op = NULL;
@@ -70,7 +88,7 @@ struct osd_client_t
     msghdr write_msg = { 0 };
     int write_state = 0;
     std::vector<iovec> send_list, next_send_list;
-    std::vector<osd_op_t*> outbox, next_outbox;
+    std::vector<msgr_sendp_t> outbox, next_outbox;
 
     ~osd_client_t()
     {
@@ -110,9 +128,18 @@ protected:
     int peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
     int osd_idle_timeout = DEFAULT_OSD_PING_TIMEOUT;
     int osd_ping_timeout = DEFAULT_OSD_PING_TIMEOUT;
+    uint32_t bs_bitmap_granularity = 0;
     int log_level = 0;
     bool use_sync_send_recv = false;
 
+#ifdef WITH_RDMA
+    bool use_rdma = true;
+    std::string rdma_device;
+    uint64_t rdma_port_num = 1, rdma_gid_index = 0, rdma_mtu = 0;
+    msgr_rdma_context_t *rdma_context = NULL;
+    int max_rdma_sge = 128, max_rdma_send = 32, max_rdma_recv = 32;
+#endif
+
     std::vector<int> read_ready_clients;
     std::vector<int> write_ready_clients;
     std::vector<std::function<void()>> set_immediate;
@@ -141,6 +168,11 @@ public:
     void accept_connections(int listen_fd);
     ~osd_messenger_t();
 
+#ifdef WITH_RDMA
+    bool is_rdma_enabled();
+    bool connect_rdma(int peer_fd, std::string rdma_address);
+#endif
+
 protected:
     void try_connect_peer(uint64_t osd_num);
     void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port);
@@ -160,4 +192,10 @@ protected:
     void handle_op_hdr(osd_client_t *cl);
     bool handle_reply_hdr(osd_client_t *cl);
     void handle_reply_ready(osd_op_t *op);
+
+#ifdef WITH_RDMA
+    bool try_send_rdma(osd_client_t *cl);
+    bool try_recv_rdma(osd_client_t *cl);
+    void handle_rdma_events();
+#endif
 };
diff --git a/src/msgr_rdma.cpp b/src/msgr_rdma.cpp
new file mode 100644
index 00000000..bcc48ffb
--- /dev/null
+++ b/src/msgr_rdma.cpp
@@ -0,0 +1,587 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include "msgr_rdma.h"
+#include "messenger.h"
+
+std::string msgr_rdma_address_t::to_string()
+{
+    char msg[sizeof "0000:00000000:00000000:00000000000000000000000000000000"];
+    sprintf(
+        msg, "%04x:%06x:%06x:%016lx%016lx", lid, qpn, psn,
+        htobe64(((uint64_t*)&gid)[0]), htobe64(((uint64_t*)&gid)[1])
+    );
+    return std::string(msg);
+}
+
+bool msgr_rdma_address_t::from_string(const char *str, msgr_rdma_address_t *dest)
+{
+    uint64_t* gid = (uint64_t*)&dest->gid;
+    int n = sscanf(
+        str, "%hx:%x:%x:%16lx%16lx", &dest->lid, &dest->qpn, &dest->psn, gid, gid+1
+    );
+    gid[0] = be64toh(gid[0]);
+    gid[1] = be64toh(gid[1]);
+    return n == 5;
+}
+
+msgr_rdma_context_t::~msgr_rdma_context_t()
+{
+    if (cq)
+        ibv_destroy_cq(cq);
+    if (channel)
+        ibv_destroy_comp_channel(channel);
+    if (mr)
+        ibv_dereg_mr(mr);
+    if (pd)
+        ibv_dealloc_pd(pd);
+    if (context)
+        ibv_close_device(context);
+}
+
+msgr_rdma_connection_t::~msgr_rdma_connection_t()
+{
+    ctx->used_max_cqe -= max_send+max_recv;
+    if (qp)
+        ibv_destroy_qp(qp);
+}
+
+msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu)
+{
+    int res;
+    ibv_device **dev_list = NULL;
+    msgr_rdma_context_t *ctx = new msgr_rdma_context_t();
+    ctx->mtu = mtu;
+
+    dev_list = ibv_get_device_list(NULL);
+    if (!dev_list)
+    {
+        fprintf(stderr, "Failed to get RDMA device list: %s\n", strerror(errno));
+        goto cleanup;
+    }
+    if (!ib_devname)
+    {
+        ctx->dev = *dev_list;
+        if (!ctx->dev)
+        {
+            fprintf(stderr, "No RDMA devices found\n");
+            goto cleanup;
+        }
+    }
+    else
+    {
+        int i;
+        for (i = 0; dev_list[i]; ++i)
+            if (!strcmp(ibv_get_device_name(dev_list[i]), ib_devname))
+                break;
+        ctx->dev = dev_list[i];
+        if (!ctx->dev)
+        {
+            fprintf(stderr, "RDMA device %s not found\n", ib_devname);
+            goto cleanup;
+        }
+    }
+
+    ctx->context = ibv_open_device(ctx->dev);
+    if (!ctx->context)
+    {
+        fprintf(stderr, "Couldn't get RDMA context for %s\n", ibv_get_device_name(ctx->dev));
+        goto cleanup;
+    }
+
+    ctx->ib_port = ib_port;
+    ctx->gid_index = gid_index;
+    if ((res = ibv_query_port(ctx->context, ib_port, &ctx->portinfo)) != 0)
+    {
+        fprintf(stderr, "Couldn't get RDMA device %s port %d info: %s\n", ibv_get_device_name(ctx->dev), ib_port, strerror(res));
+        goto cleanup;
+    }
+    ctx->my_lid = ctx->portinfo.lid;
+    if (ctx->portinfo.link_layer != IBV_LINK_LAYER_ETHERNET && !ctx->my_lid)
+    {
+        fprintf(stderr, "RDMA device %s must have local LID because it's not Ethernet, but LID is zero\n", ibv_get_device_name(ctx->dev));
+        goto cleanup;
+    }
+    if (ibv_query_gid(ctx->context, ib_port, gid_index, &ctx->my_gid))
+    {
+        fprintf(stderr, "Couldn't read RDMA device %s GID index %d\n", ibv_get_device_name(ctx->dev), gid_index);
+        goto cleanup;
+    }
+
+    ctx->pd = ibv_alloc_pd(ctx->context);
+    if (!ctx->pd)
+    {
+        fprintf(stderr, "Couldn't allocate RDMA protection domain\n");
+        goto cleanup;
+    }
+
+    {
+        if (ibv_query_device_ex(ctx->context, NULL, &ctx->attrx))
+        {
+            fprintf(stderr, "Couldn't query RDMA device for its features\n");
+            goto cleanup;
+        }
+        if (!(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT) ||
+            !(ctx->attrx.odp_caps.general_caps & IBV_ODP_SUPPORT_IMPLICIT) ||
+            !(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_SEND) ||
+            !(ctx->attrx.odp_caps.per_transport_caps.rc_odp_caps & IBV_ODP_SUPPORT_RECV))
+        {
+            fprintf(stderr, "The RDMA device isn't implicit ODP (On-Demand Paging) capable or does not support RC send and receive with ODP\n");
+            goto cleanup;
+        }
+    }
+
+    ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND);
+    if (!ctx->mr)
+    {
+        fprintf(stderr, "Couldn't register RDMA memory region\n");
+        goto cleanup;
+    }
+
+    ctx->channel = ibv_create_comp_channel(ctx->context);
+    if (!ctx->channel)
+    {
+        fprintf(stderr, "Couldn't create RDMA completion channel\n");
+        goto cleanup;
+    }
+
+    ctx->max_cqe = 4096;
+    ctx->cq = ibv_create_cq(ctx->context, ctx->max_cqe, NULL, ctx->channel, 0);
+    if (!ctx->cq)
+    {
+        fprintf(stderr, "Couldn't create RDMA completion queue\n");
+        goto cleanup;
+    }
+
+    if (dev_list)
+        ibv_free_device_list(dev_list);
+    return ctx;
+
+cleanup:
+    delete ctx;
+    if (dev_list)
+        ibv_free_device_list(dev_list);
+    return NULL;
+}
+
+msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge)
+{
+    msgr_rdma_connection_t *conn = new msgr_rdma_connection_t;
+
+    max_sge = max_sge > ctx->attrx.orig_attr.max_sge ? ctx->attrx.orig_attr.max_sge : max_sge;
+
+    conn->ctx = ctx;
+    conn->max_send = max_send;
+    conn->max_recv = max_recv;
+    conn->max_sge = max_sge;
+
+    ctx->used_max_cqe += max_send+max_recv;
+    if (ctx->used_max_cqe > ctx->max_cqe)
+    {
+        // Resize CQ
+        // Mellanox ConnectX-4 supports up to 4194303 CQEs, so it's fine to put everything into a single CQ
+        int new_max_cqe = ctx->max_cqe;
+        while (ctx->used_max_cqe > new_max_cqe)
+        {
+            new_max_cqe *= 2;
+        }
+        if (ibv_resize_cq(ctx->cq, new_max_cqe) != 0)
+        {
+            fprintf(stderr, "Couldn't resize RDMA completion queue to %d entries\n", new_max_cqe);
+            delete conn;
+            return NULL;
+        }
+        ctx->max_cqe = new_max_cqe;
+    }
+
+    ibv_qp_init_attr init_attr = {
+        .send_cq = ctx->cq,
+        .recv_cq = ctx->cq,
+        .cap = {
+            .max_send_wr = max_send,
+            .max_recv_wr = max_recv,
+            .max_send_sge = max_sge,
+            .max_recv_sge = max_sge,
+        },
+        .qp_type = IBV_QPT_RC,
+    };
+    conn->qp = ibv_create_qp(ctx->pd, &init_attr);
+    if (!conn->qp)
+    {
+        fprintf(stderr, "Couldn't create RDMA queue pair\n");
+        delete conn;
+        return NULL;
+    }
+
+    conn->addr.lid = ctx->my_lid;
+    conn->addr.gid = ctx->my_gid;
+    conn->addr.qpn = conn->qp->qp_num;
+    conn->addr.psn = lrand48() & 0xffffff;
+
+    ibv_qp_attr attr = {
+        .qp_state = IBV_QPS_INIT,
+        .qp_access_flags = 0,
+        .pkey_index = 0,
+        .port_num = ctx->ib_port,
+    };
+
+    if (ibv_modify_qp(conn->qp, &attr, IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS))
+    {
+        fprintf(stderr, "Failed to switch RDMA queue pair to INIT state\n");
+        delete conn;
+        return NULL;
+    }
+
+    return conn;
+}
+
+static ibv_mtu mtu_to_ibv_mtu(uint32_t mtu)
+{
+    switch (mtu)
+    {
+        case 256: return IBV_MTU_256;
+        case 512: return IBV_MTU_512;
+        case 1024: return IBV_MTU_1024;
+        case 2048: return IBV_MTU_2048;
+        case 4096: return IBV_MTU_4096;
+    }
+    return IBV_MTU_4096;
+}
+
+int msgr_rdma_connection_t::connect(msgr_rdma_address_t *dest)
+{
+    auto conn = this;
+    ibv_qp_attr attr = {
+        .qp_state = IBV_QPS_RTR,
+        .path_mtu = mtu_to_ibv_mtu(conn->ctx->mtu),
+        .rq_psn = dest->psn,
+        .sq_psn = conn->addr.psn,
+        .dest_qp_num = dest->qpn,
+        .ah_attr = {
+            .grh = {
+                .dgid = dest->gid,
+                .sgid_index = conn->ctx->gid_index,
+                .hop_limit = 1, // FIXME can it vary?
+            },
+            .dlid = dest->lid,
+            .sl = 0, // service level
+            .src_path_bits = 0,
+            .is_global = (uint8_t)(dest->gid.global.interface_id ? 1 : 0),
+            .port_num = conn->ctx->ib_port,
+        },
+        .max_rd_atomic = 1,
+        .max_dest_rd_atomic = 1,
+        // ACK timeout is 4.096us * 2^timeout; min_rnr_timer uses its own table-based encoding (1 = 0.01ms RNR NAK delay)
+        .min_rnr_timer = 1,
+        .timeout = 14,
+        .retry_cnt = 7,
+        .rnr_retry = 7,
+    };
+    // FIXME No idea if ibv_modify_qp is a blocking operation or not. No idea if it has a timeout and what it is.
+    if (ibv_modify_qp(conn->qp, &attr, IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU |
+        IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER))
+    {
+        fprintf(stderr, "Failed to switch RDMA queue pair to RTR (ready-to-receive) state\n");
+        return 1;
+    }
+    attr.qp_state = IBV_QPS_RTS;
+    if (ibv_modify_qp(conn->qp, &attr, IBV_QP_STATE | IBV_QP_TIMEOUT |
+        IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC))
+    {
+        fprintf(stderr, "Failed to switch RDMA queue pair to RTS (ready-to-send) state\n");
+        return 1;
+    }
+    return 0;
+}
+
+bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address)
+{
+    // Try to connect to the peer using RDMA
+    msgr_rdma_address_t addr;
+    if (msgr_rdma_address_t::from_string(rdma_address.c_str(), &addr))
+    {
+        auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, max_rdma_send, max_rdma_recv, max_rdma_sge);
+        if (rdma_conn)
+        {
+            int r = rdma_conn->connect(&addr);
+            if (r != 0)
+            {
+                delete rdma_conn;
+                printf(
+                    "Failed to connect RDMA queue pair to %s (client %d)\n",
+                    addr.to_string().c_str(), peer_fd
+                );
+            }
+            else
+            {
+                // Remember connection, but switch to RDMA only after sending the configuration response
+                auto cl = clients.at(peer_fd);
+                cl->rdma_conn = rdma_conn;
+                cl->peer_state = PEER_RDMA_CONNECTING;
+                return true;
+            }
+        }
+    }
+    return false;
+}
+
+static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
+{
+    ibv_send_wr *bad_wr = NULL;
+    ibv_send_wr wr = {
+        .wr_id = (uint64_t)(cl->peer_fd*2+1),
+        .sg_list = sge,
+        .num_sge = op_sge,
+        .opcode = IBV_WR_SEND,
+        .send_flags = IBV_SEND_SIGNALED,
+    };
+    int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr);
+    if (err || bad_wr)
+    {
+        printf("RDMA send failed: %s\n", strerror(err));
+        exit(1);
+    }
+    cl->rdma_conn->cur_send++;
+}
+
+bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
+{
+    auto rc = cl->rdma_conn;
+    if (!cl->send_list.size() || rc->cur_send > 0)
+    {
+        // Only send one batch at a time
+        return true;
+    }
+    int op_size = 0, op_sge = 0, op_max = rc->max_sge*bs_bitmap_granularity;
+    // FIXME: rc->max_sge should be negotiated between client & server
+    ibv_sge sge[rc->max_sge];
+    while (rc->send_pos < cl->send_list.size())
+    {
+        iovec & iov = cl->send_list[rc->send_pos];
+        if (cl->outbox[rc->send_pos].flags & MSGR_SENDP_HDR)
+        {
+            if (op_sge > 0)
+            {
+                try_send_rdma_wr(cl, sge, op_sge);
+                op_sge = 0;
+                op_size = 0;
+                if (rc->cur_send >= rc->max_send)
+                    break;
+            }
+            assert(rc->send_buf_pos == 0);
+            sge[0] = {
+                .addr = (uintptr_t)iov.iov_base,
+                .length = (uint32_t)iov.iov_len,
+                .lkey = rc->ctx->mr->lkey,
+            };
+            try_send_rdma_wr(cl, sge, 1);
+            rc->send_pos++;
+            if (rc->cur_send >= rc->max_send)
+                break;
+        }
+        else
+        {
+            if (op_size >= op_max || op_sge >= rc->max_sge)
+            {
+                try_send_rdma_wr(cl, sge, op_sge);
+                op_sge = 0;
+                op_size = 0;
+                if (rc->cur_send >= rc->max_send)
+                    break;
+            }
+            // Fragment all messages into parts no longer than (max_sge*4k) = 120k on ConnectX-4
+            // Otherwise the client may not be able to receive them in small parts
+            uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < op_max ? iov.iov_len-rc->send_buf_pos : op_max-op_size);
+            sge[op_sge++] = {
+                .addr = (uintptr_t)(iov.iov_base+rc->send_buf_pos),
+                .length = len,
+                .lkey = rc->ctx->mr->lkey,
+            };
+            op_size += len;
+            rc->send_buf_pos += len;
+            if (rc->send_buf_pos >= iov.iov_len)
+            {
+                rc->send_pos++;
+                rc->send_buf_pos = 0;
+            }
+        }
+    }
+    if (op_sge > 0)
+    {
+        try_send_rdma_wr(cl, sge, op_sge);
+    }
+    return true;
+}
+
+static void try_recv_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
+{
+    ibv_recv_wr *bad_wr = NULL;
+    ibv_recv_wr wr = {
+        .wr_id = (uint64_t)(cl->peer_fd*2),
+        .sg_list = sge,
+        .num_sge = op_sge,
+    };
+    int err = ibv_post_recv(cl->rdma_conn->qp, &wr, &bad_wr);
+    if (err || bad_wr)
+    {
+        printf("RDMA receive failed: %s\n", strerror(err));
+        exit(1);
+    }
+    cl->rdma_conn->cur_recv++;
+}
+
+bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
+{
+    auto rc = cl->rdma_conn;
+    if (rc->cur_recv > 0)
+    {
+        return true;
+    }
+    if (!cl->recv_list.get_size())
+    {
+        cl->recv_list.reset();
+        cl->read_op = new osd_op_t;
+        cl->read_op->peer_fd = cl->peer_fd;
+        cl->read_op->op_type = OSD_OP_IN;
+        cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE);
+        cl->read_remaining = OSD_PACKET_SIZE;
+        cl->read_state = CL_READ_HDR;
+    }
+    int op_size = 0, op_sge = 0, op_max = rc->max_sge*bs_bitmap_granularity;
+    iovec *segments = cl->recv_list.get_iovec();
+    // FIXME: rc->max_sge should be negotiated between client & server
+    ibv_sge sge[rc->max_sge];
+    while (rc->recv_pos < cl->recv_list.get_size())
+    {
+        iovec & iov = segments[rc->recv_pos];
+        if (op_size >= op_max || op_sge >= rc->max_sge)
+        {
+            try_recv_rdma_wr(cl, sge, op_sge);
+            op_sge = 0;
+            op_size = 0;
+            if (rc->cur_recv >= rc->max_recv)
+                break;
+        }
+        // Receive in identical (max_sge*4k) fragments
+        uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->recv_buf_pos < op_max ? iov.iov_len-rc->recv_buf_pos : op_max-op_size);
+        sge[op_sge++] = {
+            .addr = (uintptr_t)(iov.iov_base+rc->recv_buf_pos),
+            .length = len,
+            .lkey = rc->ctx->mr->lkey,
+        };
+        op_size += len;
+        rc->recv_buf_pos += len;
+        if (rc->recv_buf_pos >= iov.iov_len)
+        {
+            rc->recv_pos++;
+            rc->recv_buf_pos = 0;
+        }
+    }
+    if (op_sge > 0)
+    {
+        try_recv_rdma_wr(cl, sge, op_sge);
+    }
+    return true;
+}
+
+#define RDMA_EVENTS_AT_ONCE 32
+
+void osd_messenger_t::handle_rdma_events()
+{
+    // Request next notification
+    ibv_cq *ev_cq;
+    void *ev_ctx;
+    // FIXME: This is inefficient as it calls read()...
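+    // A completion channel event only signals that the CQ is non-empty: read
+    // it with ibv_get_cq_event(), acknowledge it, then re-arm notifications
+    // via ibv_req_notify_cq() *before* polling, so that a completion arriving
+    // between the last ibv_poll_cq() and the re-arm can't be missed.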
+    if (ibv_get_cq_event(rdma_context->channel, &ev_cq, &ev_ctx) == 0)
+    {
+        ibv_ack_cq_events(rdma_context->cq, 1);
+    }
+    if (ibv_req_notify_cq(rdma_context->cq, 0) != 0)
+    {
+        printf("Failed to request RDMA completion notification, exiting\n");
+        exit(1);
+    }
+    ibv_wc wc[RDMA_EVENTS_AT_ONCE];
+    int event_count;
+    do
+    {
+        event_count = ibv_poll_cq(rdma_context->cq, RDMA_EVENTS_AT_ONCE, wc);
+        for (int i = 0; i < event_count; i++)
+        {
+            int client_id = wc[i].wr_id >> 1;
+            bool is_send = wc[i].wr_id & 1;
+            auto cl_it = clients.find(client_id);
+            if (cl_it == clients.end())
+            {
+                continue;
+            }
+            osd_client_t *cl = cl_it->second;
+            if (wc[i].status != IBV_WC_SUCCESS)
+            {
+                printf("RDMA work request failed for client %d", client_id);
+                if (cl->osd_num)
+                {
+                    printf(" (OSD %lu)", cl->osd_num);
+                }
+                printf(" with status: %s, stopping client\n", ibv_wc_status_str(wc[i].status));
+                stop_client(client_id);
+                continue;
+            }
+            if (!is_send)
+            {
+                cl->rdma_conn->cur_recv--;
+                if (!cl->rdma_conn->cur_recv)
+                {
+                    cl->recv_list.done += cl->rdma_conn->recv_pos;
+                    cl->rdma_conn->recv_pos = 0;
+                    if (!cl->recv_list.get_size())
+                    {
+                        cl->read_remaining = 0;
+                        if (handle_finished_read(cl))
+                        {
+                            try_recv_rdma(cl);
+                        }
+                    }
+                    else
+                    {
+                        // Continue to receive data
+                        try_recv_rdma(cl);
+                    }
+                }
+            }
+            else
+            {
+                cl->rdma_conn->cur_send--;
+                if (!cl->rdma_conn->cur_send)
+                {
+                    // Wait for the whole batch
+                    for (int j = 0; j < cl->rdma_conn->send_pos; j++)
+                    {
+                        if (cl->outbox[j].flags & MSGR_SENDP_FREE)
+                        {
+                            // Reply fully sent
+                            delete cl->outbox[j].op;
+                        }
+                    }
+                    if (cl->rdma_conn->send_pos > 0)
+                    {
+                        cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+cl->rdma_conn->send_pos);
+                        cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+cl->rdma_conn->send_pos);
+                        cl->rdma_conn->send_pos = 0;
+                    }
+                    if (cl->rdma_conn->send_buf_pos > 0)
+                    {
+                        cl->send_list[0].iov_base += cl->rdma_conn->send_buf_pos;
+                        cl->send_list[0].iov_len -= cl->rdma_conn->send_buf_pos;
+                        cl->rdma_conn->send_buf_pos = 0;
+                    }
+                    try_send_rdma(cl);
+                }
+            }
+        }
+    } while (event_count > 0);
+    for (auto cb: set_immediate)
+    {
+        cb();
+    }
+    set_immediate.clear();
+}
diff --git a/src/msgr_rdma.h b/src/msgr_rdma.h
new file mode 100644
index 00000000..1ebec6fb
--- /dev/null
+++ b/src/msgr_rdma.h
@@ -0,0 +1,53 @@
+#pragma once
+#include <infiniband/verbs.h>
+#include <string>
+#include <vector>
+
+struct msgr_rdma_address_t
+{
+    ibv_gid gid;
+    uint16_t lid;
+    uint32_t qpn;
+    uint32_t psn;
+
+    std::string to_string();
+    static bool from_string(const char *str, msgr_rdma_address_t *dest);
+};
+
+struct msgr_rdma_context_t
+{
+    ibv_context *context = NULL;
+    ibv_device *dev = NULL;
+    ibv_device_attr_ex attrx;
+    ibv_pd *pd = NULL;
+    ibv_mr *mr = NULL;
+    ibv_comp_channel *channel = NULL;
+    ibv_cq *cq = NULL;
+    ibv_port_attr portinfo;
+    uint8_t ib_port;
+    uint8_t gid_index;
+    uint16_t my_lid;
+    ibv_gid my_gid;
+    uint32_t mtu;
+    int max_cqe = 0;
+    int used_max_cqe = 0;
+
+    static msgr_rdma_context_t *create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu);
+    ~msgr_rdma_context_t();
+};
+
+struct msgr_rdma_connection_t
+{
+    msgr_rdma_context_t *ctx = NULL;
+    ibv_qp *qp = NULL;
+    msgr_rdma_address_t addr;
+    int max_send = 0, max_recv = 0, max_sge = 0;
+    int cur_send = 0, cur_recv = 0;
+
+    int send_pos = 0, send_buf_pos = 0;
+    int recv_pos = 0, recv_buf_pos = 0;
+
+    ~msgr_rdma_connection_t();
+    static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge);
+    int connect(msgr_rdma_address_t *dest);
+};
diff --git a/src/msgr_receive.cpp b/src/msgr_receive.cpp
index e5859d70..bf8ed1b7 100644
--- a/src/msgr_receive.cpp
+++ b/src/msgr_receive.cpp
@@ -254,6 +254,16 @@ void osd_messenger_t::handle_op_hdr(osd_client_t *cl)
         }
         cl->read_remaining = cur_op->req.rw.len;
     }
+    else if (cur_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG)
+    {
+        if (cur_op->req.show_conf.json_len > 0)
+        {
+            cur_op->buf = malloc_or_die(cur_op->req.show_conf.json_len+1);
+            ((uint8_t*)cur_op->buf)[cur_op->req.show_conf.json_len] = 0;
+            cl->recv_list.push_back(cur_op->buf, cur_op->req.show_conf.json_len);
+        }
+        cl->read_remaining = cur_op->req.show_conf.json_len;
+    }
     if (cl->read_remaining > 0)
     {
         // Read data
@@ -338,11 +348,11 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl)
     }
     else if (op->reply.hdr.opcode == OSD_OP_SHOW_CONFIG && op->reply.hdr.retval > 0)
     {
-        assert(!op->iov.count);
         delete cl->read_op;
         cl->read_op = op;
         cl->read_state = CL_READ_REPLY_DATA;
         cl->read_remaining = op->reply.hdr.retval;
+        free(op->buf);
         op->buf = malloc_or_die(op->reply.hdr.retval);
         cl->recv_list.push_back(op->buf, op->reply.hdr.retval);
     }
diff --git a/src/msgr_send.cpp b/src/msgr_send.cpp
index 8bdaf197..cd06c4ec 100644
--- a/src/msgr_send.cpp
+++ b/src/msgr_send.cpp
@@ -46,7 +46,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
         to_send_list.push_back((iovec){ .iov_base = cur_op->req.buf, .iov_len = OSD_PACKET_SIZE });
         cl->sent_ops[cur_op->req.hdr.id] = cur_op;
     }
-    to_outbox.push_back(NULL);
+    to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = MSGR_SENDP_HDR });
     // Bitmap
     if (cur_op->op_type == OSD_OP_IN &&
         cur_op->req.hdr.opcode == OSD_OP_SEC_READ &&
@@ -56,7 +56,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
             .iov_base = cur_op->bitmap,
             .iov_len = cur_op->reply.sec_rw.attr_len,
         });
-        to_outbox.push_back(NULL);
+        to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
     }
     else if (cur_op->op_type == OSD_OP_OUT &&
         (cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE || cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE) &&
@@ -66,7 +66,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
             .iov_base = cur_op->bitmap,
             .iov_len = cur_op->req.sec_rw.attr_len,
         });
-        to_outbox.push_back(NULL);
+        to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
     }
     // Operation data
     if ((cur_op->op_type == OSD_OP_IN
@@ -78,13 +78,14 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
         cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE ||
         cur_op->req.hdr.opcode == OSD_OP_SEC_WRITE_STABLE ||
         cur_op->req.hdr.opcode == OSD_OP_SEC_STABILIZE ||
-        cur_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK)) && cur_op->iov.count > 0)
+        cur_op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK ||
+        cur_op->req.hdr.opcode == OSD_OP_SHOW_CONFIG)) && cur_op->iov.count > 0)
     {
         for (int i = 0; i < cur_op->iov.count; i++)
         {
             assert(cur_op->iov.buf[i].iov_base);
             to_send_list.push_back(cur_op->iov.buf[i]);
-            to_outbox.push_back(NULL);
+            to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
         }
     }
     if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
     {
@@ -93,13 +94,19 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
             to_send_list.push_back((iovec){ .iov_base = cur_op->buf, .iov_len = (size_t)cur_op->reply.hdr.retval });
         else if (cur_op->op_type == OSD_OP_OUT && cur_op->req.sec_read_bmp.len > 0)
             to_send_list.push_back((iovec){ .iov_base = cur_op->buf, .iov_len = (size_t)cur_op->req.sec_read_bmp.len });
-        to_outbox.push_back(NULL);
+        to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 });
     }
     if (cur_op->op_type == OSD_OP_IN)
     {
-        // To free it later
-        to_outbox[to_outbox.size()-1] = cur_op;
+        to_outbox[to_outbox.size()-1].flags |= MSGR_SENDP_FREE;
     }
+#ifdef WITH_RDMA
+    if (cl->peer_state == PEER_RDMA)
+    {
+        try_send_rdma(cl);
+        return;
+    }
+#endif
     if (!ringloop)
     {
         // FIXME: It's worse because it doesn't allow batching
@@ -232,10 +239,10 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
             iovec & iov = cl->send_list[done];
             if (iov.iov_len <= result)
             {
-                if (cl->outbox[done])
+                if (cl->outbox[done].flags & MSGR_SENDP_FREE)
                 {
                     // Reply fully sent
-                    delete cl->outbox[done];
+                    delete cl->outbox[done].op;
                 }
                 result -= iov.iov_len;
                 done++;
@@ -260,6 +267,18 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl)
             cl->next_outbox.clear();
         }
         cl->write_state = cl->outbox.size() > 0 ? CL_WRITE_READY : 0;
+#ifdef WITH_RDMA
+        if (cl->rdma_conn && !cl->outbox.size() && cl->peer_state == PEER_RDMA_CONNECTING)
+        {
+            // FIXME: Do something better than just forgetting the FD
+            // FIXME: Ignore pings during RDMA state transition
+            printf("Successfully connected with client %d using RDMA\n", cl->peer_fd);
+            cl->peer_state = PEER_RDMA;
+            tfd->set_fd_handler(cl->peer_fd, false, NULL);
+            // Add the initial receive request
+            try_recv_rdma(cl);
+        }
+#endif
     }
     if (cl->write_state != 0)
     {
diff --git a/src/msgr_stop.cpp b/src/msgr_stop.cpp
index 5caa65ec..ae15a923 100644
--- a/src/msgr_stop.cpp
+++ b/src/msgr_stop.cpp
@@ -122,6 +122,12 @@ void osd_messenger_t::stop_client(int peer_fd, bool force)
     // And close the FD only when everything is done
     // ...because peer_fd number can get reused after close()
     close(peer_fd);
+#ifdef WITH_RDMA
+    if (cl->rdma_conn)
+    {
+        delete cl->rdma_conn;
+    }
+#endif
 #endif
     // Find the item again because it can be invalidated at this point
     it = clients.find(peer_fd);
diff --git a/src/osd_ops.h b/src/osd_ops.h
index 76b3ad3a..e8078b71 100644
--- a/src/osd_ops.h
+++ b/src/osd_ops.h
@@ -148,6 +148,8 @@ struct __attribute__((__packed__)) osd_reply_sec_read_bmp_t
 struct __attribute__((__packed__)) osd_op_show_config_t
 {
     osd_op_header_t header;
+    // JSON request length
+    uint64_t json_len;
 };
 
 struct __attribute__((__packed__)) osd_reply_show_config_t
diff --git a/src/osd_secondary.cpp b/src/osd_secondary.cpp
index f6c3eb11..106b61de 100644
--- a/src/osd_secondary.cpp
+++ b/src/osd_secondary.cpp
@@ -144,6 +144,10 @@ void osd_t::exec_secondary(osd_op_t *cur_op)
 
 void osd_t::exec_show_config(osd_op_t *cur_op)
 {
+    std::string json_err;
+    json11::Json req_json = cur_op->req.show_conf.json_len > 0
+        ? json11::Json::parse(std::string((char *)cur_op->buf), json_err)
+        : json11::Json();
     // Expose sensitive configuration values so peers can check them
     json11::Json::object wire_config = json11::Json::object {
         { "osd_num", osd_num },
@@ -157,6 +161,25 @@ void osd_t::exec_show_config(osd_op_t *cur_op)
"small" : "none")) }, { "lease_timeout", etcd_report_interval+(MAX_ETCD_ATTEMPTS*(2*ETCD_QUICK_TIMEOUT)+999)/1000 }, }; +#ifdef WITH_RDMA + if (msgr.is_rdma_enabled()) + { + // Indicate that RDMA is enabled + wire_config["rdma_enabled"] = true; + if (req_json["connect_rdma"].is_string()) + { + // Peer is trying to connect using RDMA, try to satisfy him + bool ok = msgr.connect_rdma(cur_op->peer_fd, req_json["connect_rdma"].string_value()); + if (ok) + { + wire_config["rdma_connected"] = true; + wire_config["rdma_address"] = msgr.clients.at(cur_op->peer_fd)->rdma_conn->addr.to_string(); + } + } + } +#endif + if (cur_op->buf) + free(cur_op->buf); std::string cfg_str = json11::Json(wire_config).dump(); cur_op->buf = malloc_or_die(cfg_str.size()+1); memcpy(cur_op->buf, cfg_str.c_str(), cfg_str.size()+1);