Browse Source

Implement RDMA receive with memory copying (send remains zero-copy)

This is the simplest and, as usual, the best implementation :)

100% zero-copy implementation is also possible (see rdma-zerocopy branch),
but it requires to create A LOT of queues (~128 per client) to use QPN as a 'tag'
because of the lack of receive tags and the server may simply run out of queues.
Hardware limit is 262144 on Mellanox ConnectX-4 which amounts to only 2048
'connections' per host. And even with that amount of queues it's still less optimal
than the non-zerocopy one.

In fact, newest hardware like Mellanox ConnectX-5 does have Tag Matching
support, but it's still unsuitable for us because it doesn't support scatter/gather
(tm_caps.max_sge=1).
allow-etcd-address-option
Vitaliy Filippov 5 months ago
parent
commit
971aa4ae4f
  1. 3
      README-ru.md
  2. 3
      README.md
  3. 3
      src/messenger.cpp
  4. 3
      src/messenger.h
  5. 136
      src/msgr_rdma.cpp
  6. 7
      src/msgr_rdma.h
  7. 89
      src/msgr_receive.cpp

3
README-ru.md

@ -49,6 +49,7 @@ Vitastor на данный момент находится в статусе п
- Именование инодов через хранение их метаданных в etcd
- Снапшоты и copy-on-write клоны
- Сглаживание производительности случайной записи в SSD+HDD конфигурациях
- Поддержка RDMA/RoCEv2 через libibverbs
## Планы развития
@ -60,7 +61,7 @@ Vitastor на данный момент находится в статусе п
- Фоновая проверка целостности без контрольных сумм (сверка реплик)
- Контрольные суммы
- Поддержка SSD-кэширования (tiered storage)
- Поддержка RDMA и NVDIMM
- Поддержка NVDIMM
- Web-интерфейс
- Возможно, сжатие
- Возможно, поддержка кэширования данных через системный page cache

3
README.md

@ -43,6 +43,7 @@ breaking changes in the future. However, the following is implemented:
- Inode metadata storage in etcd
- Snapshots and copy-on-write image clones
- Write throttling to smooth random write workloads in SSD+HDD configurations
- RDMA/RoCEv2 support via libibverbs
## Roadmap
@ -54,7 +55,7 @@ breaking changes in the future. However, the following is implemented:
- Scrubbing without checksums (verification of replicas)
- Checksums
- Tiered storage
- RDMA and NVDIMM support
- NVDIMM support
- Web GUI
- Compression (possibly)
- Read caching using system page cache (possibly)

3
src/messenger.cpp

@ -139,9 +139,6 @@ void osd_messenger_t::parse_config(const json11::Json & config)
this->rdma_gid_index = (uint8_t)config["rdma_gid_index"].uint64_value();
this->rdma_mtu = (uint32_t)config["rdma_mtu"].uint64_value();
#endif
this->bs_bitmap_granularity = strtoull(config["bitmap_granularity"].string_value().c_str(), NULL, 10);
if (!this->bs_bitmap_granularity)
this->bs_bitmap_granularity = DEFAULT_BITMAP_GRANULARITY;
this->use_sync_send_recv = config["use_sync_send_recv"].bool_value() ||
config["use_sync_send_recv"].uint64_value();
this->peer_connect_interval = config["peer_connect_interval"].uint64_value();

3
src/messenger.h

@ -128,7 +128,6 @@ protected:
int peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
int osd_idle_timeout = DEFAULT_OSD_PING_TIMEOUT;
int osd_ping_timeout = DEFAULT_OSD_PING_TIMEOUT;
uint32_t bs_bitmap_granularity = 0;
int log_level = 0;
bool use_sync_send_recv = false;
@ -137,6 +136,7 @@ protected:
std::string rdma_device;
uint64_t rdma_port_num = 1, rdma_gid_index = 0, rdma_mtu = 0;
msgr_rdma_context_t *rdma_context = NULL;
// FIXME: Allow to configure these options
uint64_t rdma_max_sge = 128, rdma_max_send = 32, rdma_max_recv = 32;
#endif
@ -189,6 +189,7 @@ protected:
void handle_send(int result, osd_client_t *cl);
bool handle_read(int result, osd_client_t *cl);
bool handle_read_buffer(osd_client_t *cl, void *curbuf, int remain);
bool handle_finished_read(osd_client_t *cl);
void handle_op_hdr(osd_client_t *cl);
bool handle_reply_hdr(osd_client_t *cl);

136
src/msgr_rdma.cpp

@ -1,3 +1,6 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#include <stdio.h>
#include <stdlib.h>
#include "msgr_rdma.h"
@ -355,57 +358,34 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl)
// Only send one batch at a time
return true;
}
int op_size = 0, op_sge = 0, op_max = rc->max_sge*bs_bitmap_granularity;
uint64_t op_size = 0, op_sge = 0;
ibv_sge sge[rc->max_sge];
while (rc->send_pos < cl->send_list.size())
{
iovec & iov = cl->send_list[rc->send_pos];
if (cl->outbox[rc->send_pos].flags & MSGR_SENDP_HDR)
if (op_size >= RDMA_MAX_MSG || op_sge >= rc->max_sge)
{
if (op_sge > 0)
{
try_send_rdma_wr(cl, sge, op_sge);
op_sge = 0;
op_size = 0;
if (rc->cur_send >= rc->max_send)
break;
}
assert(rc->send_buf_pos == 0);
sge[0] = {
.addr = (uintptr_t)iov.iov_base,
.length = (uint32_t)iov.iov_len,
.lkey = rc->ctx->mr->lkey,
};
try_send_rdma_wr(cl, sge, 1);
rc->send_pos++;
try_send_rdma_wr(cl, sge, op_sge);
op_sge = 0;
op_size = 0;
if (rc->cur_send >= rc->max_send)
{
break;
}
}
else
uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < RDMA_MAX_MSG
? iov.iov_len-rc->send_buf_pos : RDMA_MAX_MSG-op_size);
sge[op_sge++] = {
.addr = (uintptr_t)(iov.iov_base+rc->send_buf_pos),
.length = len,
.lkey = rc->ctx->mr->lkey,
};
op_size += len;
rc->send_buf_pos += len;
if (rc->send_buf_pos >= iov.iov_len)
{
if (op_size >= op_max || op_sge >= rc->max_sge)
{
try_send_rdma_wr(cl, sge, op_sge);
op_sge = 0;
op_size = 0;
if (rc->cur_send >= rc->max_send)
break;
}
// Fragment all messages into parts no longer than (max_sge*4k) = 120k on ConnectX-4
// Otherwise the client may not be able to receive them in small parts
uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < op_max ? iov.iov_len-rc->send_buf_pos : op_max-op_size);
sge[op_sge++] = {
.addr = (uintptr_t)(iov.iov_base+rc->send_buf_pos),
.length = len,
.lkey = rc->ctx->mr->lkey,
};
op_size += len;
rc->send_buf_pos += len;
if (rc->send_buf_pos >= iov.iov_len)
{
rc->send_pos++;
rc->send_buf_pos = 0;
}
rc->send_pos++;
rc->send_buf_pos = 0;
}
}
if (op_sge > 0)
@ -435,52 +415,16 @@ static void try_recv_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
bool osd_messenger_t::try_recv_rdma(osd_client_t *cl)
{
auto rc = cl->rdma_conn;
if (rc->cur_recv > 0)
while (rc->cur_recv < rc->max_recv)
{
return true;
}
if (!cl->recv_list.get_size())
{
cl->recv_list.reset();
cl->read_op = new osd_op_t;
cl->read_op->peer_fd = cl->peer_fd;
cl->read_op->op_type = OSD_OP_IN;
cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE);
cl->read_remaining = OSD_PACKET_SIZE;
cl->read_state = CL_READ_HDR;
}
int op_size = 0, op_sge = 0, op_max = rc->max_sge*bs_bitmap_granularity;
iovec *segments = cl->recv_list.get_iovec();
ibv_sge sge[rc->max_sge];
while (rc->recv_pos < cl->recv_list.get_size())
{
iovec & iov = segments[rc->recv_pos];
if (op_size >= op_max || op_sge >= rc->max_sge)
{
try_recv_rdma_wr(cl, sge, op_sge);
op_sge = 0;
op_size = 0;
if (rc->cur_recv >= rc->max_recv)
break;
}
// Receive in identical (max_sge*4k) fragments
uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->recv_buf_pos < op_max ? iov.iov_len-rc->recv_buf_pos : op_max-op_size);
sge[op_sge++] = {
.addr = (uintptr_t)(iov.iov_base+rc->recv_buf_pos),
.length = len,
void *buf = malloc_or_die(RDMA_MAX_MSG);
rc->recv_buffers.push_back(buf);
ibv_sge sge = {
.addr = (uintptr_t)buf,
.length = RDMA_MAX_MSG,
.lkey = rc->ctx->mr->lkey,
};
op_size += len;
rc->recv_buf_pos += len;
if (rc->recv_buf_pos >= iov.iov_len)
{
rc->recv_pos++;
rc->recv_buf_pos = 0;
}
}
if (op_sge > 0)
{
try_recv_rdma_wr(cl, sge, op_sge);
try_recv_rdma_wr(cl, &sge, 1);
}
return true;
}
@ -531,24 +475,10 @@ void osd_messenger_t::handle_rdma_events()
if (!is_send)
{
cl->rdma_conn->cur_recv--;
if (!cl->rdma_conn->cur_recv)
{
cl->recv_list.done += cl->rdma_conn->recv_pos;
cl->rdma_conn->recv_pos = 0;
if (!cl->recv_list.get_size())
{
cl->read_remaining = 0;
if (handle_finished_read(cl))
{
try_recv_rdma(cl);
}
}
else
{
// Continue to receive data
try_recv_rdma(cl);
}
}
handle_read_buffer(cl, cl->rdma_conn->recv_buffers[0], wc[i].byte_len);
free(cl->rdma_conn->recv_buffers[0]);
cl->rdma_conn->recv_buffers.erase(cl->rdma_conn->recv_buffers.begin(), cl->rdma_conn->recv_buffers.begin()+1);
try_recv_rdma(cl);
}
else
{

7
src/msgr_rdma.h

@ -1,8 +1,14 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#pragma once
#include <infiniband/verbs.h>
#include <string>
#include <vector>
// FIXME: Allow to configure this option
#define RDMA_MAX_MSG 4194304
struct msgr_rdma_address_t
{
ibv_gid gid;
@ -46,6 +52,7 @@ struct msgr_rdma_connection_t
int send_pos = 0, send_buf_pos = 0;
int recv_pos = 0, recv_buf_pos = 0;
std::vector<void*> recv_buffers;
~msgr_rdma_connection_t();
static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge);

89
src/msgr_receive.cpp

@ -91,48 +91,9 @@ bool osd_messenger_t::handle_read(int result, osd_client_t *cl)
{
if (cl->read_iov.iov_base == cl->in_buf)
{
// Compose operation(s) from the buffer
int remain = result;
void *curbuf = cl->in_buf;
while (remain > 0)
if (!handle_read_buffer(cl, cl->in_buf, result))
{
if (!cl->read_op)
{
cl->read_op = new osd_op_t;
cl->read_op->peer_fd = cl->peer_fd;
cl->read_op->op_type = OSD_OP_IN;
cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE);
cl->read_remaining = OSD_PACKET_SIZE;
cl->read_state = CL_READ_HDR;
}
while (cl->recv_list.done < cl->recv_list.count && remain > 0)
{
iovec* cur = cl->recv_list.get_iovec();
if (cur->iov_len > remain)
{
memcpy(cur->iov_base, curbuf, remain);
cl->read_remaining -= remain;
cur->iov_len -= remain;
cur->iov_base += remain;
remain = 0;
}
else
{
memcpy(cur->iov_base, curbuf, cur->iov_len);
curbuf += cur->iov_len;
cl->read_remaining -= cur->iov_len;
remain -= cur->iov_len;
cur->iov_len = 0;
cl->recv_list.done++;
}
}
if (cl->recv_list.done >= cl->recv_list.count)
{
if (!handle_finished_read(cl))
{
goto fin;
}
}
goto fin;
}
}
else
@ -159,6 +120,52 @@ fin:
return ret;
}
bool osd_messenger_t::handle_read_buffer(osd_client_t *cl, void *curbuf, int remain)
{
// Compose operation(s) from the buffer
while (remain > 0)
{
if (!cl->read_op)
{
cl->read_op = new osd_op_t;
cl->read_op->peer_fd = cl->peer_fd;
cl->read_op->op_type = OSD_OP_IN;
cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE);
cl->read_remaining = OSD_PACKET_SIZE;
cl->read_state = CL_READ_HDR;
}
while (cl->recv_list.done < cl->recv_list.count && remain > 0)
{
iovec* cur = cl->recv_list.get_iovec();
if (cur->iov_len > remain)
{
memcpy(cur->iov_base, curbuf, remain);
cl->read_remaining -= remain;
cur->iov_len -= remain;
cur->iov_base += remain;
remain = 0;
}
else
{
memcpy(cur->iov_base, curbuf, cur->iov_len);
curbuf += cur->iov_len;
cl->read_remaining -= cur->iov_len;
remain -= cur->iov_len;
cur->iov_len = 0;
cl->recv_list.done++;
}
}
if (cl->recv_list.done >= cl->recv_list.count)
{
if (!handle_finished_read(cl))
{
return false;
}
}
}
return true;
}
bool osd_messenger_t::handle_finished_read(osd_client_t *cl)
{
cl->recv_list.reset();

Loading…
Cancel
Save