From 971aa4ae4f77a9a119b58e42d91837019b2afe8f Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Thu, 29 Apr 2021 00:58:55 +0300 Subject: [PATCH] Implement RDMA receive with memory copying (send remains zero-copy) This is the simplest and, as usual, the best implementation :) A 100% zero-copy implementation is also possible (see the rdma-zerocopy branch), but it requires creating A LOT of queues (~128 per client) to use the QPN as a 'tag', because there are no receive tags, and the server may simply run out of queues. The hardware limit is 262144 queues on Mellanox ConnectX-4, which amounts to only 262144/128 = 2048 such 'connections' per host. And even with that number of queues it's still less optimal than the non-zero-copy one. In fact, the newest hardware like Mellanox ConnectX-5 does have Tag Matching support, but it's still unsuitable for us because it doesn't support scatter/gather (tm_caps.max_sge=1). --- README-ru.md | 3 +- README.md | 3 +- src/messenger.cpp | 3 - src/messenger.h | 3 +- src/msgr_rdma.cpp | 136 +++++++++++-------------------------- src/msgr_rdma.h | 7 +++ src/msgr_receive.cpp | 89 +++++++++++++++------------- 7 files changed, 94 insertions(+), 150 deletions(-) diff --git a/README-ru.md b/README-ru.md index 88bb45e6..a18db93e 100644 --- a/README-ru.md +++ b/README-ru.md @@ -49,6 +49,7 @@ Vitastor на данный момент находится в статусе п - Именование инодов через хранение их метаданных в etcd - Снапшоты и copy-on-write клоны - Сглаживание производительности случайной записи в SSD+HDD конфигурациях +- Поддержка RDMA/RoCEv2 через libibverbs ## Планы развития @@ -60,7 +61,7 @@ Vitastor на данный момент находится в статусе п - Фоновая проверка целостности без контрольных сумм (сверка реплик) - Контрольные суммы - Поддержка SSD-кэширования (tiered storage) -- Поддержка RDMA и NVDIMM +- Поддержка NVDIMM - Web-интерфейс - Возможно, сжатие - Возможно, поддержка кэширования данных через системный page cache diff --git a/README.md b/README.md index 7235cdb6..66f0c06d 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,7 @@ breaking changes in the future. However, the following is implemented: - Inode metadata storage in etcd - Snapshots and copy-on-write image clones - Write throttling to smooth random write workloads in SSD+HDD configurations +- RDMA/RoCEv2 support via libibverbs ## Roadmap @@ -54,7 +55,7 @@ breaking changes in the future. 
However, the following is implemented: - Scrubbing without checksums (verification of replicas) - Checksums - Tiered storage -- RDMA and NVDIMM support +- NVDIMM support - Web GUI - Compression (possibly) - Read caching using system page cache (possibly) diff --git a/src/messenger.cpp b/src/messenger.cpp index c0003ab3..bfcd4b9b 100644 --- a/src/messenger.cpp +++ b/src/messenger.cpp @@ -139,9 +139,6 @@ void osd_messenger_t::parse_config(const json11::Json & config) this->rdma_gid_index = (uint8_t)config["rdma_gid_index"].uint64_value(); this->rdma_mtu = (uint32_t)config["rdma_mtu"].uint64_value(); #endif - this->bs_bitmap_granularity = strtoull(config["bitmap_granularity"].string_value().c_str(), NULL, 10); - if (!this->bs_bitmap_granularity) - this->bs_bitmap_granularity = DEFAULT_BITMAP_GRANULARITY; this->use_sync_send_recv = config["use_sync_send_recv"].bool_value() || config["use_sync_send_recv"].uint64_value(); this->peer_connect_interval = config["peer_connect_interval"].uint64_value(); diff --git a/src/messenger.h b/src/messenger.h index 4c485a08..2d2abef8 100644 --- a/src/messenger.h +++ b/src/messenger.h @@ -128,7 +128,6 @@ protected: int peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT; int osd_idle_timeout = DEFAULT_OSD_PING_TIMEOUT; int osd_ping_timeout = DEFAULT_OSD_PING_TIMEOUT; - uint32_t bs_bitmap_granularity = 0; int log_level = 0; bool use_sync_send_recv = false; @@ -137,6 +136,7 @@ protected: std::string rdma_device; uint64_t rdma_port_num = 1, rdma_gid_index = 0, rdma_mtu = 0; msgr_rdma_context_t *rdma_context = NULL; + // FIXME: Allow to configure these options uint64_t rdma_max_sge = 128, rdma_max_send = 32, rdma_max_recv = 32; #endif @@ -189,6 +189,7 @@ protected: void handle_send(int result, osd_client_t *cl); bool handle_read(int result, osd_client_t *cl); + bool handle_read_buffer(osd_client_t *cl, void *curbuf, int remain); bool handle_finished_read(osd_client_t *cl); void handle_op_hdr(osd_client_t *cl); bool handle_reply_hdr(osd_client_t *cl); diff --git a/src/msgr_rdma.cpp b/src/msgr_rdma.cpp index e9184374..061a0278 100644 --- a/src/msgr_rdma.cpp +++ b/src/msgr_rdma.cpp @@ -1,3 +1,6 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) + #include #include #include "msgr_rdma.h" @@ -355,57 +358,34 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl) // Only send one batch at a time return true; } - int op_size = 0, op_sge = 0, op_max = rc->max_sge*bs_bitmap_granularity; + uint64_t op_size = 0, op_sge = 0; ibv_sge sge[rc->max_sge]; while (rc->send_pos < cl->send_list.size()) { iovec & iov = cl->send_list[rc->send_pos]; - if (cl->outbox[rc->send_pos].flags & MSGR_SENDP_HDR) + if (op_size >= RDMA_MAX_MSG || op_sge >= rc->max_sge) { - if (op_sge > 0) - { - try_send_rdma_wr(cl, sge, op_sge); - op_sge = 0; - op_size = 0; - if (rc->cur_send >= rc->max_send) - break; - } - assert(rc->send_buf_pos == 0); - sge[0] = { - .addr = (uintptr_t)iov.iov_base, - .length = (uint32_t)iov.iov_len, - .lkey = rc->ctx->mr->lkey, - }; - try_send_rdma_wr(cl, sge, 1); - rc->send_pos++; + try_send_rdma_wr(cl, sge, op_sge); + op_sge = 0; + op_size = 0; if (rc->cur_send >= rc->max_send) + { break; + } } - else + uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < RDMA_MAX_MSG + ? 
iov.iov_len-rc->send_buf_pos : RDMA_MAX_MSG-op_size); + sge[op_sge++] = { + .addr = (uintptr_t)(iov.iov_base+rc->send_buf_pos), + .length = len, + .lkey = rc->ctx->mr->lkey, + }; + op_size += len; + rc->send_buf_pos += len; + if (rc->send_buf_pos >= iov.iov_len) { - if (op_size >= op_max || op_sge >= rc->max_sge) - { - try_send_rdma_wr(cl, sge, op_sge); - op_sge = 0; - op_size = 0; - if (rc->cur_send >= rc->max_send) - break; - } - // Fragment all messages into parts no longer than (max_sge*4k) = 120k on ConnectX-4 - // Otherwise the client may not be able to receive them in small parts - uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < op_max ? iov.iov_len-rc->send_buf_pos : op_max-op_size); - sge[op_sge++] = { - .addr = (uintptr_t)(iov.iov_base+rc->send_buf_pos), - .length = len, - .lkey = rc->ctx->mr->lkey, - }; - op_size += len; - rc->send_buf_pos += len; - if (rc->send_buf_pos >= iov.iov_len) - { - rc->send_pos++; - rc->send_buf_pos = 0; - } + rc->send_pos++; + rc->send_buf_pos = 0; } } if (op_sge > 0) @@ -435,52 +415,16 @@ static void try_recv_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge) bool osd_messenger_t::try_recv_rdma(osd_client_t *cl) { auto rc = cl->rdma_conn; - if (rc->cur_recv > 0) + while (rc->cur_recv < rc->max_recv) { - return true; - } - if (!cl->recv_list.get_size()) - { - cl->recv_list.reset(); - cl->read_op = new osd_op_t; - cl->read_op->peer_fd = cl->peer_fd; - cl->read_op->op_type = OSD_OP_IN; - cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE); - cl->read_remaining = OSD_PACKET_SIZE; - cl->read_state = CL_READ_HDR; - } - int op_size = 0, op_sge = 0, op_max = rc->max_sge*bs_bitmap_granularity; - iovec *segments = cl->recv_list.get_iovec(); - ibv_sge sge[rc->max_sge]; - while (rc->recv_pos < cl->recv_list.get_size()) - { - iovec & iov = segments[rc->recv_pos]; - if (op_size >= op_max || op_sge >= rc->max_sge) - { - try_recv_rdma_wr(cl, sge, op_sge); - op_sge = 0; - op_size = 0; - if (rc->cur_recv >= rc->max_recv) - break; - } - // Receive in identical (max_sge*4k) fragments - uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->recv_buf_pos < op_max ? 
iov.iov_len-rc->recv_buf_pos : op_max-op_size); - sge[op_sge++] = { - .addr = (uintptr_t)(iov.iov_base+rc->recv_buf_pos), - .length = len, + void *buf = malloc_or_die(RDMA_MAX_MSG); + rc->recv_buffers.push_back(buf); + ibv_sge sge = { + .addr = (uintptr_t)buf, + .length = RDMA_MAX_MSG, .lkey = rc->ctx->mr->lkey, }; - op_size += len; - rc->recv_buf_pos += len; - if (rc->recv_buf_pos >= iov.iov_len) - { - rc->recv_pos++; - rc->recv_buf_pos = 0; - } - } - if (op_sge > 0) - { - try_recv_rdma_wr(cl, sge, op_sge); + try_recv_rdma_wr(cl, &sge, 1); } return true; } @@ -531,24 +475,10 @@ void osd_messenger_t::handle_rdma_events() if (!is_send) { cl->rdma_conn->cur_recv--; - if (!cl->rdma_conn->cur_recv) - { - cl->recv_list.done += cl->rdma_conn->recv_pos; - cl->rdma_conn->recv_pos = 0; - if (!cl->recv_list.get_size()) - { - cl->read_remaining = 0; - if (handle_finished_read(cl)) - { - try_recv_rdma(cl); - } - } - else - { - // Continue to receive data - try_recv_rdma(cl); - } - } + handle_read_buffer(cl, cl->rdma_conn->recv_buffers[0], wc[i].byte_len); + free(cl->rdma_conn->recv_buffers[0]); + cl->rdma_conn->recv_buffers.erase(cl->rdma_conn->recv_buffers.begin(), cl->rdma_conn->recv_buffers.begin()+1); + try_recv_rdma(cl); } else { diff --git a/src/msgr_rdma.h b/src/msgr_rdma.h index 1ebec6fb..92573140 100644 --- a/src/msgr_rdma.h +++ b/src/msgr_rdma.h @@ -1,8 +1,14 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) + #pragma once #include #include #include +// FIXME: Allow to configure this option +#define RDMA_MAX_MSG 4194304 + struct msgr_rdma_address_t { ibv_gid gid; @@ -46,6 +52,7 @@ struct msgr_rdma_connection_t int send_pos = 0, send_buf_pos = 0; int recv_pos = 0, recv_buf_pos = 0; + std::vector recv_buffers; ~msgr_rdma_connection_t(); static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge); diff --git a/src/msgr_receive.cpp b/src/msgr_receive.cpp index bf8ed1b7..873527b4 100644 --- a/src/msgr_receive.cpp +++ b/src/msgr_receive.cpp @@ -91,48 +91,9 @@ bool osd_messenger_t::handle_read(int result, osd_client_t *cl) { if (cl->read_iov.iov_base == cl->in_buf) { - // Compose operation(s) from the buffer - int remain = result; - void *curbuf = cl->in_buf; - while (remain > 0) + if (!handle_read_buffer(cl, cl->in_buf, result)) { - if (!cl->read_op) - { - cl->read_op = new osd_op_t; - cl->read_op->peer_fd = cl->peer_fd; - cl->read_op->op_type = OSD_OP_IN; - cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE); - cl->read_remaining = OSD_PACKET_SIZE; - cl->read_state = CL_READ_HDR; - } - while (cl->recv_list.done < cl->recv_list.count && remain > 0) - { - iovec* cur = cl->recv_list.get_iovec(); - if (cur->iov_len > remain) - { - memcpy(cur->iov_base, curbuf, remain); - cl->read_remaining -= remain; - cur->iov_len -= remain; - cur->iov_base += remain; - remain = 0; - } - else - { - memcpy(cur->iov_base, curbuf, cur->iov_len); - curbuf += cur->iov_len; - cl->read_remaining -= cur->iov_len; - remain -= cur->iov_len; - cur->iov_len = 0; - cl->recv_list.done++; - } - } - if (cl->recv_list.done >= cl->recv_list.count) - { - if (!handle_finished_read(cl)) - { - goto fin; - } - } + goto fin; } } else @@ -159,6 +120,52 @@ fin: return ret; } +bool osd_messenger_t::handle_read_buffer(osd_client_t *cl, void *curbuf, int remain) +{ + // Compose operation(s) from the buffer + while (remain > 0) + { + if (!cl->read_op) + { + cl->read_op = new osd_op_t; + cl->read_op->peer_fd = 
cl->peer_fd; + cl->read_op->op_type = OSD_OP_IN; + cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE); + cl->read_remaining = OSD_PACKET_SIZE; + cl->read_state = CL_READ_HDR; + } + while (cl->recv_list.done < cl->recv_list.count && remain > 0) + { + iovec* cur = cl->recv_list.get_iovec(); + if (cur->iov_len > remain) + { + memcpy(cur->iov_base, curbuf, remain); + cl->read_remaining -= remain; + cur->iov_len -= remain; + cur->iov_base += remain; + remain = 0; + } + else + { + memcpy(cur->iov_base, curbuf, cur->iov_len); + curbuf += cur->iov_len; + cl->read_remaining -= cur->iov_len; + remain -= cur->iov_len; + cur->iov_len = 0; + cl->recv_list.done++; + } + } + if (cl->recv_list.done >= cl->recv_list.count) + { + if (!handle_finished_read(cl)) + { + return false; + } + } + } + return true; +} + bool osd_messenger_t::handle_finished_read(osd_client_t *cl) { cl->recv_list.reset();
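
Note on the new receive path, for reviewers: each client connection now keeps up to max_recv pre-posted receive buffers of RDMA_MAX_MSG bytes, covered by the RDMA context's memory region (rc->ctx->mr); when a receive completion arrives, handle_rdma_events() feeds the filled buffer through the regular message parser (handle_read_buffer()), frees it and posts a fresh buffer via try_recv_rdma(). The following is a simplified, self-contained sketch of that flow, not the patch's actual code: rdma_conn_t, post_recv_buffers, on_recv_completion and parse_messages are illustrative names.

// Sketch of the copy-based RDMA receive flow (illustrative, not the patch's exact code)
#include <infiniband/verbs.h>
#include <cstdint>
#include <cstdlib>
#include <cstdio>
#include <vector>

#define RDMA_MAX_MSG 4194304

struct rdma_conn_t
{
    ibv_qp *qp = NULL;
    ibv_mr *mr = NULL;                  // memory region that covers the receive buffers
    int cur_recv = 0, max_recv = 32;
    std::vector<void*> recv_buffers;
};

// Keep max_recv fixed-size receive buffers posted at all times
static void post_recv_buffers(rdma_conn_t *rc)
{
    while (rc->cur_recv < rc->max_recv)
    {
        void *buf = malloc(RDMA_MAX_MSG);
        rc->recv_buffers.push_back(buf);
        ibv_sge sge = {
            .addr = (uintptr_t)buf,
            .length = RDMA_MAX_MSG,
            .lkey = rc->mr->lkey,
        };
        ibv_recv_wr wr = {
            .wr_id = 1,                 // lets the completion handler identify the connection
            .sg_list = &sge,
            .num_sge = 1,
        }, *bad_wr = NULL;
        if (ibv_post_recv(rc->qp, &wr, &bad_wr) != 0)
        {
            fprintf(stderr, "ibv_post_recv failed\n");
            exit(1);
        }
        rc->cur_recv++;
    }
}

// Called from the completion handler for every successful receive:
// copy the data out through the normal message parser, then re-post a buffer
static void on_recv_completion(rdma_conn_t *rc, ibv_wc *wc,
    void (*parse_messages)(void *buf, int len))
{
    rc->cur_recv--;
    // On a reliable connection, receives complete in the order they were posted,
    // so the oldest posted buffer is the one that was just filled
    void *buf = rc->recv_buffers[0];
    parse_messages(buf, wc->byte_len);  // the memcpy into osd_op_t-s happens inside
    free(buf);
    rc->recv_buffers.erase(rc->recv_buffers.begin());
    post_recv_buffers(rc);              // refill the receive queue
}

The price is one extra memcpy per received message, but a single receive queue per client is enough, instead of the ~128 queues per client that the zero-copy variant needs.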
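
Note on the send path: it remains zero-copy, but outgoing iovecs are now fragmented into work requests of at most RDMA_MAX_MSG bytes and max_sge scatter/gather entries each, so that any single message fits into one of the fixed-size buffers pre-posted by the receiving side. A minimal sketch of that batching logic, again with illustrative types (sge_t, batch_t and fragment_for_send are not from the patch):

// Sketch of the send-side fragmentation (illustrative types, not the patch's exact code)
#include <sys/uio.h>
#include <cstdint>
#include <vector>

#define RDMA_MAX_MSG 4194304

struct sge_t { uint64_t addr; uint32_t length; };
struct batch_t { std::vector<sge_t> sge; };     // one batch = one send work request

// Split a send list into batches of at most RDMA_MAX_MSG bytes and max_sge entries
std::vector<batch_t> fragment_for_send(const std::vector<iovec> & send_list, size_t max_sge)
{
    std::vector<batch_t> batches(1);
    uint64_t op_size = 0;
    size_t pos = 0, buf_pos = 0;
    while (pos < send_list.size())
    {
        if (op_size >= RDMA_MAX_MSG || batches.back().sge.size() >= max_sge)
        {
            // The current work request is full - start a new one
            batches.push_back(batch_t());
            op_size = 0;
        }
        const iovec & iov = send_list[pos];
        // Take as much of the current iovec as still fits into this message
        uint32_t len = (uint32_t)(op_size + (iov.iov_len-buf_pos) < RDMA_MAX_MSG
            ? iov.iov_len-buf_pos : RDMA_MAX_MSG-op_size);
        batches.back().sge.push_back(sge_t{ (uintptr_t)iov.iov_base + buf_pos, len });
        op_size += len;
        buf_pos += len;
        if (buf_pos >= iov.iov_len)
        {
            pos++;
            buf_pos = 0;
        }
    }
    return batches;
}

Each returned batch corresponds to one posted work request, which is what try_send_rdma_wr() does in the patch.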