From 9681b62204d463774411c44c37e0888de9752b09 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Tue, 27 Apr 2021 00:16:49 +0300 Subject: [PATCH] WIP multi-queue RDMA --- src/fio_cluster.cpp | 13 +- src/messenger.cpp | 61 ++--- src/messenger.h | 14 +- src/msgr_rdma.cpp | 618 +++++++++++++++++++++++++++++++++--------- src/msgr_rdma.h | 3 + src/msgr_send.cpp | 8 +- src/msgr_stop.cpp | 5 +- src/osd_secondary.cpp | 14 +- 8 files changed, 549 insertions(+), 187 deletions(-) diff --git a/src/fio_cluster.cpp b/src/fio_cluster.cpp index c6bc05c8..058d8820 100644 --- a/src/fio_cluster.cpp +++ b/src/fio_cluster.cpp @@ -127,7 +127,7 @@ static struct fio_option options[] = { }, { .name = "use_rdma", - .lname = "OSD trace", + .lname = "Use RDMA", .type = FIO_OPT_BOOL, .off1 = offsetof(struct sec_options, use_rdma), .help = "Use RDMA", @@ -135,6 +135,16 @@ static struct fio_option options[] = { .category = FIO_OPT_C_ENGINE, .group = FIO_OPT_G_FILENAME, }, + { + .name = "rdma_gid_index", + .lname = "RDMA gid index", + .type = FIO_OPT_INT, + .off1 = offsetof(struct sec_options, rdma_gid_index), + .help = "RDMA gid index", + .def = "0", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_FILENAME, + }, { .name = NULL, }, @@ -171,6 +181,7 @@ static int sec_setup(struct thread_data *td) { "etcd_prefix", std::string(o->etcd_prefix ? o->etcd_prefix : "/vitastor") }, { "log_level", o->cluster_log }, { "use_rdma", o->use_rdma }, + { "rdma_gid_index", o->rdma_gid_index }, }; if (!o->image) diff --git a/src/messenger.cpp b/src/messenger.cpp index d76c70eb..e15cdf89 100644 --- a/src/messenger.cpp +++ b/src/messenger.cpp @@ -25,6 +25,8 @@ void osd_messenger_t::init() } else { + rdma_max_sge = rdma_max_sge < rdma_context->attrx.orig_attr.max_sge + ? rdma_max_sge : rdma_context->attrx.orig_attr.max_sge; printf("[OSD %lu] RDMA initialized successfully\n", osd_num); fcntl(rdma_context->channel->fd, F_SETFL, fcntl(rdma_context->channel->fd, F_GETFL, 0) | O_NONBLOCK); tfd->set_fd_handler(rdma_context->channel->fd, false, [this](int notify_fd, int epoll_events) @@ -356,9 +358,6 @@ void osd_messenger_t::on_connect_peer(osd_num_t peer_osd, int peer_fd) void osd_messenger_t::check_peer_config(osd_client_t *cl) { -#ifdef WITH_RDMA - msgr_rdma_connection_t *rdma_conn = NULL; -#endif osd_op_t *op = new osd_op_t(); op->op_type = OSD_OP_OUT; op->peer_fd = cl->peer_fd; @@ -374,11 +373,25 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) #ifdef WITH_RDMA if (rdma_context) { - cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, max_rdma_send, max_rdma_recv, max_rdma_sge); - if (cl->rdma_conn) + for (int i = 0; i < rdma_queues_per_connection; i++) { + auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge); + if (!rdma_conn) + { + break; + } + cl->rdma_queues.push_back(rdma_conn); + } + if (cl->rdma_queues.size()) + { + json11::Json::array addresses; + for (auto rdma_conn: cl->rdma_queues) + { + addresses.push_back(rdma_conn->addr.to_string()); + } json11::Json payload = json11::Json::object { - { "connect_rdma", cl->rdma_conn->addr.to_string() }, + { "rdma_queues", addresses }, + { "rdma_max_sge", rdma_max_sge }, }; std::string payload_str = payload.dump(); op->req.show_conf.json_len = payload_str.size(); @@ -388,11 +401,7 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) } } #endif - op->callback = [this, cl -#ifdef WITH_RDMA - , rdma_conn -#endif - ](osd_op_t *op) + op->callback = [this, cl](osd_op_t *op) { std::string json_err; json11::Json config; @@ -434,33 +443,11 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) return; } #ifdef WITH_RDMA - if (config["rdma_address"].is_string()) + if (!connect_rdma_server(cl, config["rdma_queues"], config["rdma_max_sge"].uint64_value())) { - msgr_rdma_address_t addr; - if (!msgr_rdma_address_t::from_string(config["rdma_address"].string_value().c_str(), &addr) || - cl->rdma_conn->connect(&addr) != 0) - { - printf( - "Failed to connect to OSD %lu (address %s) using RDMA\n", - cl->osd_num, config["rdma_address"].string_value().c_str() - ); - delete cl->rdma_conn; - cl->rdma_conn = NULL; - // FIXME: Keep TCP connection in this case - osd_num_t peer_osd = cl->osd_num; - stop_client(cl->peer_fd); - on_connect_peer(peer_osd, -1); - delete op; - return; - } - else - { - printf("Connected to OSD %lu using RDMA\n", cl->osd_num); - cl->peer_state = PEER_RDMA; - tfd->set_fd_handler(cl->peer_fd, false, NULL); - // Add the initial receive request - try_recv_rdma(cl); - } + // FIXME: Keep TCP connection in this case + delete op; + return; } #endif osd_peer_fds[cl->osd_num] = cl->peer_fd; diff --git a/src/messenger.h b/src/messenger.h index 6b312159..b4bb8925 100644 --- a/src/messenger.h +++ b/src/messenger.h @@ -40,6 +40,7 @@ #define MSGR_SENDP_HDR 1 #define MSGR_SENDP_FREE 2 +#define MSGR_SENDP_BMP 4 struct msgr_sendp_t { @@ -63,7 +64,7 @@ struct osd_client_t void *in_buf = NULL; #ifdef WITH_RDMA - msgr_rdma_connection_t *rdma_conn = NULL; + std::vector rdma_queues; #endif // Read state @@ -137,7 +138,8 @@ protected: std::string rdma_device; uint64_t rdma_port_num = 1, rdma_gid_index = 0, rdma_mtu = 0; msgr_rdma_context_t *rdma_context = NULL; - int max_rdma_sge = 128, max_rdma_send = 32, max_rdma_recv = 32; + int rdma_queues_per_connection = 128; + int rdma_max_sge = 128, rdma_max_send = 32, rdma_max_recv = 32; #endif std::vector read_ready_clients; @@ -170,7 +172,8 @@ public: #ifdef WITH_RDMA bool is_rdma_enabled(); - bool connect_rdma(int peer_fd, std::string rdma_address); + bool connect_rdma_client(osd_client_t *cl, json11::Json rdma_addresses, uint64_t client_max_sge); + int get_rdma_max_sge(); #endif protected: @@ -194,8 +197,9 @@ protected: void handle_reply_ready(osd_op_t *op); #ifdef WITH_RDMA - bool try_send_rdma(osd_client_t *cl); - bool try_recv_rdma(osd_client_t *cl); + void try_send_rdma(osd_client_t *cl); + void try_recv_rdma(osd_client_t *cl, msgr_rdma_connection_t *rc); void handle_rdma_events(); + bool connect_rdma_server(osd_client_t *cl, json11::Json rdma_addresses, uint64_t server_max_sge); #endif }; diff --git a/src/msgr_rdma.cpp b/src/msgr_rdma.cpp index bcc48ffb..e1ae1529 100644 --- a/src/msgr_rdma.cpp +++ b/src/msgr_rdma.cpp @@ -293,75 +293,225 @@ int msgr_rdma_connection_t::connect(msgr_rdma_address_t *dest) return 0; } -bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address) +// Being the client, connect all server's RDMA queues to our local (client) queues +bool osd_messenger_t::connect_rdma_server(osd_client_t *cl, json11::Json rdma_addresses, uint64_t server_max_sge) { - // Try to connect to the peer using RDMA - msgr_rdma_address_t addr; - if (msgr_rdma_address_t::from_string(rdma_address.c_str(), &addr)) + if (rdma_addresses.array_items().size() > 0) { - auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, max_rdma_send, max_rdma_recv, max_rdma_sge); - if (rdma_conn) + if (!server_max_sge || server_max_sge > rdma_max_sge) { - int r = rdma_conn->connect(&addr); - if (r != 0) + server_max_sge = rdma_max_sge; + } + int n_conn = rdma_addresses.array_items().size(); + if (n_conn < cl->rdma_queues.size()) + { + for (int i = n_conn; i < cl->rdma_queues.size(); i++) + { + delete cl->rdma_queues[i]; + } + cl->rdma_queues.resize(n_conn); + } + else if (n_conn > cl->rdma_queues.size()) + { + n_conn = cl->rdma_queues.size(); + } + for (int i = 0; i < n_conn; i++) + { + msgr_rdma_address_t addr; + if (!msgr_rdma_address_t::from_string(rdma_addresses[i].string_value().c_str(), &addr) || + cl->rdma_queues[i]->connect(&addr) != 0) { - delete rdma_conn; printf( - "Failed to connect RDMA queue pair to %s (client %d)\n", - addr.to_string().c_str(), peer_fd + "Failed to connect to OSD %lu (address %s) using RDMA\n", + cl->osd_num, rdma_addresses[i].string_value().c_str() ); + // FIXME: Keep TCP connection in this case + osd_num_t peer_osd = cl->osd_num; + stop_client(cl->peer_fd); + on_connect_peer(peer_osd, -1); + return false; } else { - // Remember connection, but switch to RDMA only after sending the configuration response - auto cl = clients.at(peer_fd); - cl->rdma_conn = rdma_conn; - cl->peer_state = PEER_RDMA_CONNECTING; - return true; + printf("Connected local queue %d to OSD %lu queue %d using RDMA\n", cl->rdma_queues[i]->qp->qp_num, cl->osd_num, addr.qpn); + if (cl->rdma_queues[i]->max_sge > server_max_sge) + { + cl->rdma_queues[i]->max_sge = server_max_sge; + } } } + cl->peer_state = PEER_RDMA; + tfd->set_fd_handler(cl->peer_fd, false, NULL); } - return false; + else + { + for (auto rdma_conn: cl->rdma_queues) + { + delete rdma_conn; + } + cl->rdma_queues.resize(0); + } + return true; } -static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge) +// Being the server, try to connect all client's RDMA queues to our local (server) queues +bool osd_messenger_t::connect_rdma_client(osd_client_t *cl, json11::Json rdma_addresses, uint64_t client_max_sge) +{ + if (rdma_addresses.array_items().size() > 0) + { + if (!client_max_sge || client_max_sge > rdma_max_sge) + { + client_max_sge = rdma_max_sge; + } + int n_conn = rdma_addresses.array_items().size(); + if (n_conn > rdma_queues_per_connection) + { + n_conn = rdma_queues_per_connection; + } + for (int i = 0; i < n_conn; i++) + { + msgr_rdma_address_t addr; + if (msgr_rdma_address_t::from_string(rdma_addresses[i].string_value().c_str(), &addr)) + { + auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, client_max_sge); + if (rdma_conn && rdma_conn->connect(&addr) == 0) + { + printf("Connected local queue %d to client %d queue %d using RDMA\n", rdma_conn->qp->qp_num, cl->peer_fd, addr.qpn); + cl->rdma_queues.push_back(rdma_conn); + } + else + { + if (rdma_conn) + { + delete rdma_conn; + } + printf( + "Failed to connect RDMA queue pair to %s (client %d queue %d)\n", + addr.to_string().c_str(), cl->peer_fd, i+1 + ); + // Delete all RDMA queues to keep the TCP connection + for (int j = 0; j < cl->rdma_queues.size(); j++) + { + delete cl->rdma_queues[j]; + } + cl->rdma_queues.resize(0); + return false; + } + } + } + // Switch to RDMA state only after sending the configuration response + cl->peer_state = PEER_RDMA_CONNECTING; + for (int i = 0; i < cl->rdma_queues.size(); i++) + { + try_recv_rdma(cl, cl->rdma_queues[i]); + } + } + return true; +} + +static void try_send_rdma_wr(msgr_rdma_connection_t *rc, uint64_t wr_id, ibv_sge *sge, int op_sge) { ibv_send_wr *bad_wr = NULL; ibv_send_wr wr = { - .wr_id = (uint64_t)(cl->peer_fd*2+1), + .wr_id = wr_id, .sg_list = sge, .num_sge = op_sge, .opcode = IBV_WR_SEND, .send_flags = IBV_SEND_SIGNALED, }; - int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr); + int err = ibv_post_send(rc->qp, &wr, &bad_wr); if (err || bad_wr) { printf("RDMA send failed: %s\n", strerror(err)); exit(1); } - cl->rdma_conn->cur_send++; + rc->cur_send++; } -bool osd_messenger_t::try_send_rdma(osd_client_t *cl) +static void try_recv_rdma_wr(msgr_rdma_connection_t *rc, uint64_t wr_id, ibv_sge *sge, int op_sge) { - auto rc = cl->rdma_conn; - if (!cl->send_list.size() || rc->cur_send > 0) + ibv_recv_wr *bad_wr = NULL; + ibv_recv_wr wr = { + .wr_id = wr_id, + .sg_list = sge, + .num_sge = op_sge, + }; + int err = ibv_post_recv(rc->qp, &wr, &bad_wr); + if (err || bad_wr) { - // Only send one batch at a time - return true; + printf("RDMA receive failed: %s\n", strerror(err)); + exit(1); } - int op_size = 0, op_sge = 0, op_max = rc->max_sge*bs_bitmap_granularity; - // FIXME: rc->max_sge should be negotiated between client & server + rc->cur_recv++; +} + +static bool try_recv_rdma_read(osd_client_t *cl, msgr_rdma_connection_t *rc, osd_op_t *cur_op, uint32_t bs_bitmap_granularity) +{ + int op_size = bs_bitmap_granularity, op_sge = 1, op_max = rc->max_sge*bs_bitmap_granularity; + iovec *segments = cur_op->iov.get_iovec(); ibv_sge sge[rc->max_sge]; - while (rc->send_pos < cl->send_list.size()) + sge[0] = { + .addr = (uintptr_t)cur_op->reply.buf, + .length = (uint32_t)OSD_PACKET_SIZE, + .lkey = rc->ctx->mr->lkey, + }; + while (rc->recv_pos < cur_op->iov.get_size()) + { + iovec & iov = segments[rc->recv_pos]; + if (op_size >= op_max || op_sge >= rc->max_sge) + { + try_recv_rdma_wr(rc, cl->peer_fd, sge, op_sge); + op_sge = 0; + op_size = 0; + if (rc->cur_recv >= rc->max_recv) + { + // FIXME + exit(1); + } + } + // Receive in (max_sge*4k) fragments + uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->recv_buf_pos < op_max + ? iov.iov_len-rc->recv_buf_pos : op_max-op_size); + sge[op_sge++] = { + .addr = (uintptr_t)(iov.iov_base+rc->recv_buf_pos), + .length = len, + .lkey = rc->ctx->mr->lkey, + }; + op_size += len; + rc->recv_buf_pos += len; + if (rc->recv_buf_pos >= iov.iov_len) + { + rc->recv_pos++; + rc->recv_buf_pos = 0; + } + } + if (op_sge > 0) + { + try_recv_rdma_wr(rc, cl->peer_fd, sge, op_sge); + } + rc->recv_pos = 0; + rc->recv_buf_pos = 0; + return true; +} + +static bool try_send_rdma_read(osd_client_t *cl, msgr_rdma_connection_t *rc, osd_op_t *cur_op, int op_list_size, uint32_t bs_bitmap_granularity) +{ + ibv_sge sge[rc->max_sge]; + int op_size = bs_bitmap_granularity, op_sge = 1, op_max = rc->max_sge*bs_bitmap_granularity; + sge[0] = { + .addr = (uintptr_t)cl->send_list[0].iov_base, + .length = (uint32_t)cl->send_list[0].iov_len, + .lkey = rc->ctx->mr->lkey, + }; + rc->send_pos = 1; + while (rc->send_pos < op_list_size) { iovec & iov = cl->send_list[rc->send_pos]; if (cl->outbox[rc->send_pos].flags & MSGR_SENDP_HDR) { if (op_sge > 0) { - try_send_rdma_wr(cl, sge, op_sge); + try_send_rdma_wr(rc, cl->peer_fd, sge, op_sge); op_sge = 0; op_size = 0; if (rc->cur_send >= rc->max_send) @@ -373,7 +523,7 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl) .length = (uint32_t)iov.iov_len, .lkey = rc->ctx->mr->lkey, }; - try_send_rdma_wr(cl, sge, 1); + try_send_rdma_wr(rc, cl->peer_fd, sge, 1); rc->send_pos++; if (rc->cur_send >= rc->max_send) break; @@ -382,7 +532,7 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl) { if (op_size >= op_max || op_sge >= rc->max_sge) { - try_send_rdma_wr(cl, sge, op_sge); + try_send_rdma_wr(rc, cl->peer_fd, sge, op_sge); op_sge = 0; op_size = 0; if (rc->cur_send >= rc->max_send) @@ -407,80 +557,222 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl) } if (op_sge > 0) { - try_send_rdma_wr(cl, sge, op_sge); + try_send_rdma_wr(rc, cl->peer_fd, sge, op_sge); + } + if (op_list_size == 1) + { + if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ || + cur_op->req.hdr.opcode == OSD_OP_READ) + { + sge[0] = { + .addr = 0, + .length = 0, + .lkey = rc->ctx->mr->lkey, + }; + uint64_t data_size = cur_op->req.hdr.opcode == OSD_OP_SEC_READ + ? cur_op->req.sec_rw.len + : cur_op->req.rw.len; + while (data_size >= op_max) + { + try_send_rdma_wr(rc, cl->peer_fd, sge, 1); + data_size -= op_max; + } + if (data_size > 0) + try_send_rdma_wr(rc, cl->peer_fd, sge, 1); + } + else if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP) + { + sge[0] = { + .addr = 0, + .length = 0, + .lkey = rc->ctx->mr->lkey, + }; + try_send_rdma_wr(rc, cl->peer_fd, sge, 1); + } + else + return true; } return true; } -static void try_recv_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge) +void osd_messenger_t::try_send_rdma(osd_client_t *cl) { - ibv_recv_wr *bad_wr = NULL; - ibv_recv_wr wr = { - .wr_id = (uint64_t)(cl->peer_fd*2), - .sg_list = sge, - .num_sge = op_sge, + // Two different algorithms for outgoing and incoming operations + while (cl->outbox.size() > 0) + { + osd_op_t *cur_op = cl->outbox[0].op; + if (cur_op->op_type == OSD_OP_OUT) + { + // Pick a queue. Send operation to it in one part. + int qi; + for (qi = 0; qi < cl->rdma_queues.size() && cl->rdma_queues[qi]->cur_op != NULL; qi++) {} + if (qi >= cl->rdma_queues.size()) + { + // No free queues, retry later. + // We only post 1 operation per queue to use the queue pair number as a 'tag'. + return; + } + // Pick all entries for the operation from the queue + int op_list_size = 0; + while (op_list_size < cl->outbox.size() && cl->outbox[op_list_size].op == cur_op) + { + op_list_size++; + } + auto rq = cl->rdma_queues[qi]; + rq->cur_op = cur_op; + ibv_sge sge[rq->max_sge]; + // FIXME: This won't work with long bitmaps. But I don't care, I want to finally test fucking RDMA + // header or header+data + sge[0] = { + .addr = (uintptr_t)cl->send_list[0].iov_base, + .length = (uint32_t)cl->send_list[0].iov_len, + .lkey = rq->ctx->mr->lkey, + }; + if (op_list_size == 2) + { + auto & iov = cl->send_list[1]; + sge[1] = { + .addr = (uintptr_t)iov.iov_base, + .length = (uint32_t)iov.iov_len, + .lkey = rq->ctx->mr->lkey, + }; + try_send_rdma_wr(rq, cl->peer_fd, sge, 2); + } + else if (op_list_size == 1) + { + try_send_rdma_wr(rq, cl->peer_fd, sge, 1); + } + else + { + printf("unexpected long send_list for opcode %lu: %lu entries\n", cur_op->req.hdr.opcode, cl->send_list.size()); + exit(1); + } + cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+op_list_size); + cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+op_list_size); + // Post a receive request for the reply at the same time + if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ || + cur_op->req.hdr.opcode == OSD_OP_READ) + { + try_recv_rdma_read(cl, rq, cur_op, bs_bitmap_granularity); + } + else if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP) + { + assert(!cur_op->iov.count); +// FIXME: hardcode +#define clean_entry_bitmap_size 4 + // Reply size is known + uint64_t data_size = cur_op->req.sec_read_bmp.len / sizeof(obj_ver_id) * (8 + clean_entry_bitmap_size); + cur_op->rmw_buf = malloc_or_die(data_size); + sge[0] = { + .addr = (uintptr_t)cur_op->reply.buf, + .length = (uint32_t)OSD_PACKET_SIZE, + .lkey = rq->ctx->mr->lkey, + }; + sge[1] = { + .addr = (uintptr_t)cur_op->rmw_buf, + .length = (uint32_t)data_size, + .lkey = rq->ctx->mr->lkey, + }; + try_recv_rdma_wr(rq, cl->peer_fd, sge, 2); + } + else + { + // No reply or reply size is unknown + sge[0] = { + .addr = (uintptr_t)cur_op->reply.buf, + .length = (uint32_t)OSD_PACKET_SIZE, + .lkey = rq->ctx->mr->lkey, + }; + try_recv_rdma_wr(rq, cl->peer_fd, sge, 1); + } + } + else + { + // Send reply to the same queue the operation came from. + // Fragment it into parts no longer than (max_sge*4k) to always + // be able to send and receive them correctly. + int qi; + for (qi = 0; qi < cl->rdma_queues.size() && cl->rdma_queues[qi]->cur_op != cur_op; qi++) {} + if (qi >= cl->rdma_queues.size()) + { + printf("Unknown incoming operation for client %d\n", cl->peer_fd); + exit(1); + } + // Pick all entries for the operation from the queue + int op_list_size = 0; + while (op_list_size < cl->outbox.size() && cl->outbox[op_list_size].op == cur_op) + { + op_list_size++; + } + auto rq = cl->rdma_queues[qi]; + if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ || + cur_op->req.hdr.opcode == OSD_OP_READ) + { + try_send_rdma_read(cl, rq, cur_op, op_list_size, bs_bitmap_granularity); + rq->send_pos = 0; + rq->send_buf_pos = 0; + } + else if (op_list_size == 1) + { + ibv_sge sge[1]; + sge[0] = { + .addr = (uintptr_t)cl->send_list[0].iov_base, + .length = (uint32_t)cl->send_list[0].iov_len, + .lkey = rq->ctx->mr->lkey, + }; + try_send_rdma_wr(rq, cl->peer_fd, sge, 1); + } + else if (op_list_size == 2) + { + ibv_sge sge[2]; + sge[0] = { + .addr = (uintptr_t)cl->send_list[0].iov_base, + .length = (uint32_t)cl->send_list[0].iov_len, + .lkey = rq->ctx->mr->lkey, + }; + sge[1] = { + .addr = (uintptr_t)cl->send_list[1].iov_base, + .length = (uint32_t)cl->send_list[1].iov_len, + .lkey = rq->ctx->mr->lkey, + }; + if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP) + try_send_rdma_wr(rq, cl->peer_fd, sge, 2); + else + { + try_send_rdma_wr(rq, cl->peer_fd, sge, 1); + try_send_rdma_wr(rq, cl->peer_fd, sge+1, 1); + } + } + else if (op_list_size > 2) + { + printf("Unexpected long send_list for opcode %lu: %lu entries\n", cur_op->req.hdr.opcode, cl->send_list.size()); + exit(1); + } + cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+op_list_size); + cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+op_list_size); + } + } +} + +// Try to receive an incoming operation via RDMA +void osd_messenger_t::try_recv_rdma(osd_client_t *cl, msgr_rdma_connection_t *rc) +{ + rc->cur_op = new osd_op_t; + rc->cur_op->peer_fd = cl->peer_fd; + rc->cur_op->op_type = OSD_OP_IN; + rc->cur_op->buf = memalign_or_die(MEM_ALIGNMENT, 128*1024); // FIXME hardcode for tests + ibv_sge sge[2]; + sge[0] = { + .addr = (uintptr_t)rc->cur_op->req.buf, + .length = (uint32_t)OSD_PACKET_SIZE, + .lkey = rc->ctx->mr->lkey, }; - int err = ibv_post_recv(cl->rdma_conn->qp, &wr, &bad_wr); - if (err || bad_wr) - { - printf("RDMA receive failed: %s\n", strerror(err)); - exit(1); - } - cl->rdma_conn->cur_recv++; -} - -bool osd_messenger_t::try_recv_rdma(osd_client_t *cl) -{ - auto rc = cl->rdma_conn; - if (rc->cur_recv > 0) - { - return true; - } - if (!cl->recv_list.get_size()) - { - cl->recv_list.reset(); - cl->read_op = new osd_op_t; - cl->read_op->peer_fd = cl->peer_fd; - cl->read_op->op_type = OSD_OP_IN; - cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE); - cl->read_remaining = OSD_PACKET_SIZE; - cl->read_state = CL_READ_HDR; - } - int op_size = 0, op_sge = 0, op_max = rc->max_sge*bs_bitmap_granularity; - iovec *segments = cl->recv_list.get_iovec(); - // FIXME: rc->max_sge should be negotiated between client & server - ibv_sge sge[rc->max_sge]; - while (rc->recv_pos < cl->recv_list.get_size()) - { - iovec & iov = segments[rc->recv_pos]; - if (op_size >= op_max || op_sge >= rc->max_sge) - { - try_recv_rdma_wr(cl, sge, op_sge); - op_sge = 0; - op_size = 0; - if (rc->cur_recv >= rc->max_recv) - break; - } - // Receive in identical (max_sge*4k) fragments - uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->recv_buf_pos < op_max ? iov.iov_len-rc->recv_buf_pos : op_max-op_size); - sge[op_sge++] = { - .addr = (uintptr_t)(iov.iov_base+rc->recv_buf_pos), - .length = len, - .lkey = rc->ctx->mr->lkey, - }; - op_size += len; - rc->recv_buf_pos += len; - if (rc->recv_buf_pos >= iov.iov_len) - { - rc->recv_pos++; - rc->recv_buf_pos = 0; - } - } - if (op_sge > 0) - { - try_recv_rdma_wr(cl, sge, op_sge); - } - return true; + sge[1] = { + .addr = (uintptr_t)rc->cur_op->buf, + .length = (uint32_t)128*1024, + .lkey = rc->ctx->mr->lkey, + }; + try_recv_rdma_wr(rc, cl->peer_fd, sge, 2); } #define RDMA_EVENTS_AT_ONCE 32 @@ -507,8 +799,8 @@ void osd_messenger_t::handle_rdma_events() event_count = ibv_poll_cq(rdma_context->cq, RDMA_EVENTS_AT_ONCE, wc); for (int i = 0; i < event_count; i++) { - int client_id = wc[i].wr_id >> 1; - bool is_send = wc[i].wr_id & 1; + int client_id = wc[i].wr_id; + bool is_send = wc[i].opcode == IBV_WC_SEND; auto cl_it = clients.find(client_id); if (cl_it == clients.end()) { @@ -526,55 +818,110 @@ void osd_messenger_t::handle_rdma_events() stop_client(client_id); continue; } + int q; + for (q = 0; q < cl->rdma_queues.size() && cl->rdma_queues[q]->qp->qp_num != wc[i].qp_num; q++) {} + if (q >= cl->rdma_queues.size()) + { + printf("Unknown queue %d for client %d\n", wc[i].qp_num, cl->peer_fd); + exit(1); + } + auto rc = cl->rdma_queues[q]; if (!is_send) { - cl->rdma_conn->cur_recv--; - if (!cl->rdma_conn->cur_recv) + rc->cur_recv--; + if (!rc->cur_recv) { - cl->recv_list.done += cl->rdma_conn->recv_pos; - cl->rdma_conn->recv_pos = 0; - if (!cl->recv_list.get_size()) + // Fucking shit... + if (rc->cur_op->op_type == OSD_OP_IN) { - cl->read_remaining = 0; - if (handle_finished_read(cl)) + if (wc[i].byte_len <= OSD_PACKET_SIZE) { - try_recv_rdma(cl); + free(rc->cur_op->buf); + rc->cur_op->buf = NULL; } + cl->received_ops.push_back(rc->cur_op); + set_immediate.push_back([this, op = rc->cur_op]() { exec_op(op); }); } - else + else /* if (rc->cur_op->op_type == OSD_OP_OUT) */ { - // Continue to receive data - try_recv_rdma(cl); + if (rc->cur_op->reply.hdr.opcode == OSD_OP_SEC_READ || + rc->cur_op->reply.hdr.opcode == OSD_OP_READ) + { + // Data is already received + cl->sent_ops.erase(rc->cur_op->req.hdr.id); + handle_reply_ready(rc->cur_op); + rc->cur_op = NULL; + try_send_rdma(cl); + } + else if (rc->cur_op->reply.hdr.opcode == OSD_OP_SEC_READ_BMP) + { + // Data is already received, but we need to switch buffers + cl->sent_ops.erase(rc->cur_op->req.hdr.id); + free(rc->cur_op->buf); + rc->cur_op->buf = rc->cur_op->rmw_buf; + handle_reply_ready(rc->cur_op); + rc->cur_op = NULL; + try_send_rdma(cl); + } + else if (rc->cur_op->reply.hdr.opcode == OSD_OP_SEC_LIST && rc->cur_op->reply.hdr.retval > 0 || + rc->cur_op->reply.hdr.opcode == OSD_OP_SHOW_CONFIG && rc->cur_op->reply.hdr.retval > 0) + { + if (rc->recv_pos != 1) + { + // Data is not received yet (RNR) + uint32_t len; + if (rc->cur_op->reply.hdr.opcode == OSD_OP_SEC_LIST) + len = sizeof(obj_ver_id) * rc->cur_op->reply.hdr.retval; + else + len = rc->cur_op->reply.hdr.retval; + rc->cur_op->buf = malloc_or_die(len); + ibv_sge sge[1]; + sge[0] = { + .addr = (uintptr_t)rc->cur_op->buf, + .length = len, + .lkey = rc->ctx->mr->lkey, + }; + try_recv_rdma_wr(rc, cl->peer_fd, sge, 1); + rc->recv_pos = 1; + } + else + { + // Done + cl->sent_ops.erase(rc->cur_op->req.hdr.id); + handle_reply_ready(rc->cur_op); + rc->cur_op = NULL; + rc->recv_pos = 0; + try_send_rdma(cl); + } + } + else + { + // No data + cl->sent_ops.erase(rc->cur_op->req.hdr.id); + handle_reply_ready(rc->cur_op); + rc->cur_op = NULL; + try_send_rdma(cl); + } } } } else { - cl->rdma_conn->cur_send--; - if (!cl->rdma_conn->cur_send) + rc->cur_send--; + if (!rc->cur_send) { - // Wait for the whole batch - for (int i = 0; i < cl->rdma_conn->send_pos; i++) + if (rc->cur_op->op_type == OSD_OP_OUT) { - if (cl->outbox[i].flags & MSGR_SENDP_FREE) - { - // Reply fully sent - delete cl->outbox[i].op; - } + // Nothing } - if (cl->rdma_conn->send_pos > 0) + else /* if (rc->cur_op->op_type == OSD_OP_IN) */ { - cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+cl->rdma_conn->send_pos); - cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+cl->rdma_conn->send_pos); - cl->rdma_conn->send_pos = 0; + // Reply fully sent + delete rc->cur_op; + rc->cur_op = NULL; + // Post receive for the next incoming op + try_recv_rdma(cl, rc); } - if (cl->rdma_conn->send_buf_pos > 0) - { - cl->send_list[0].iov_base += cl->rdma_conn->send_buf_pos; - cl->send_list[0].iov_len -= cl->rdma_conn->send_buf_pos; - cl->rdma_conn->send_buf_pos = 0; - } - try_send_rdma(cl); } } } @@ -585,3 +932,8 @@ void osd_messenger_t::handle_rdma_events() } set_immediate.clear(); } + +int osd_messenger_t::get_rdma_max_sge() +{ + return rdma_max_sge; +} diff --git a/src/msgr_rdma.h b/src/msgr_rdma.h index 1ebec6fb..52704c5a 100644 --- a/src/msgr_rdma.h +++ b/src/msgr_rdma.h @@ -3,6 +3,8 @@ #include #include +struct osd_op_t; + struct msgr_rdma_address_t { ibv_gid gid; @@ -44,6 +46,7 @@ struct msgr_rdma_connection_t int max_send = 0, max_recv = 0, max_sge = 0; int cur_send = 0, cur_recv = 0; + osd_op_t *cur_op = NULL; int send_pos = 0, send_buf_pos = 0; int recv_pos = 0, recv_buf_pos = 0; diff --git a/src/msgr_send.cpp b/src/msgr_send.cpp index 564c72fc..f4a4afa2 100644 --- a/src/msgr_send.cpp +++ b/src/msgr_send.cpp @@ -62,7 +62,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op) .iov_base = cur_op->bitmap, .iov_len = cur_op->reply.sec_rw.bitmap_len, }); - to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 }); + to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = MSGR_SENDP_BMP }); } } else if (cur_op->op_type == OSD_OP_OUT && @@ -79,7 +79,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op) .iov_base = cur_op->bitmap, .iov_len = cur_op->req.sec_rw.attr_len, }); - to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 }); + to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = MSGR_SENDP_BMP }); } } // Operation data @@ -282,15 +282,13 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl) } cl->write_state = cl->outbox.size() > 0 ? CL_WRITE_READY : 0; #ifdef WITH_RDMA - if (cl->rdma_conn && !cl->outbox.size() && cl->peer_state == PEER_RDMA_CONNECTING) + if (cl->peer_state == PEER_RDMA_CONNECTING && cl->rdma_queues.size() > 0 && !cl->outbox.size()) { // FIXME: Do something better than just forgetting the FD // FIXME: Ignore pings during RDMA state transition printf("Successfully connected with client %d using RDMA\n", cl->peer_fd); cl->peer_state = PEER_RDMA; tfd->set_fd_handler(cl->peer_fd, false, NULL); - // Add the initial receive request - try_recv_rdma(cl); } #endif } diff --git a/src/msgr_stop.cpp b/src/msgr_stop.cpp index ae15a923..a3a240b0 100644 --- a/src/msgr_stop.cpp +++ b/src/msgr_stop.cpp @@ -123,10 +123,11 @@ void osd_messenger_t::stop_client(int peer_fd, bool force) // ...because peer_fd number can get reused after close() close(peer_fd); #ifdef WITH_RDMA - if (cl->rdma_conn) + for (auto rdma_conn: cl->rdma_queues) { - delete cl->rdma_conn; + delete rdma_conn; } + cl->rdma_queues.resize(0); #endif #endif // Find the item again because it can be invalidated at this point diff --git a/src/osd_secondary.cpp b/src/osd_secondary.cpp index 6dc5a6ca..185e2ccb 100644 --- a/src/osd_secondary.cpp +++ b/src/osd_secondary.cpp @@ -166,14 +166,20 @@ void osd_t::exec_show_config(osd_op_t *cur_op) { // Indicate that RDMA is enabled wire_config["rdma_enabled"] = true; - if (req_json["connect_rdma"].is_string()) + if (req_json["rdma_queues"].array_items().size()) { // Peer is trying to connect using RDMA, try to satisfy him - bool ok = msgr.connect_rdma(cur_op->peer_fd, req_json["connect_rdma"].string_value()); + auto cl = msgr.clients.at(cur_op->peer_fd); + bool ok = msgr.connect_rdma_client(cl, req_json["rdma_queues"], req_json["rdma_max_sge"].uint64_value()); if (ok) { - wire_config["rdma_connected"] = true; - wire_config["rdma_address"] = msgr.clients.at(cur_op->peer_fd)->rdma_conn->addr.to_string(); + json11::Json::array rdma_queues; + for (auto rdma_conn: cl->rdma_queues) + { + rdma_queues.push_back(rdma_conn->addr.to_string()); + } + wire_config["rdma_queues"] = rdma_queues; + wire_config["rdma_max_sge"] = msgr.get_rdma_max_sge(); } } }