From b4b6407716428ce76218957ccc604c840243f819 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 24 Feb 2023 17:19:55 +0300 Subject: [PATCH] WIP Implement RDMA v2 based on IBV_WR_RDMA_WRITE with remote buffer management One BIG FIXME remaining - handling large operations :)) --- src/CMakeLists.txt | 7 +- src/freelist.cpp | 63 +++++++ src/freelist.h | 23 +++ src/messenger.cpp | 32 ++-- src/messenger.h | 12 +- src/msgr_rdma.cpp | 415 +++++++++++++++++++++++++++++++----------- src/msgr_rdma.h | 37 +++- src/msgr_receive.cpp | 21 ++- src/msgr_send.cpp | 1 + src/msgr_stop.cpp | 1 + src/osd_secondary.cpp | 11 +- src/test_freelist.cpp | 64 +++++++ 12 files changed, 548 insertions(+), 139 deletions(-) create mode 100644 src/freelist.cpp create mode 100644 src/freelist.h create mode 100644 src/test_freelist.cpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index bc0d3cc9..32a36513 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -98,7 +98,7 @@ endif (${WITH_FIO}) # libvitastor_common.a set(MSGR_RDMA "") if (IBVERBS_LIBRARIES) - set(MSGR_RDMA "msgr_rdma.cpp") + set(MSGR_RDMA msgr_rdma.cpp freelist.cpp allocator.cpp) endif (IBVERBS_LIBRARIES) add_library(vitastor_common STATIC epoll_manager.cpp etcd_state_client.cpp messenger.cpp addr_util.cpp @@ -278,6 +278,11 @@ add_executable(test_allocator EXCLUDE_FROM_ALL test_allocator.cpp allocator.cpp) add_dependencies(build_tests test_allocator) add_test(NAME test_allocator COMMAND test_allocator) +# test_freelist +add_executable(test_freelist EXCLUDE_FROM_ALL test_freelist.cpp) +add_dependencies(build_tests test_freelist) +add_test(NAME test_freelist COMMAND test_freelist) + # test_cas add_executable(test_cas test_cas.cpp diff --git a/src/freelist.cpp b/src/freelist.cpp new file mode 100644 index 00000000..8463c576 --- /dev/null +++ b/src/freelist.cpp @@ -0,0 +1,63 @@ +// Copyright (c) Vitaliy Filippov, 2023+ +// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) + +#include +#include "freelist.h" + +uint64_t freelist_allocator_t::alloc(uint64_t data_size) +{ + for (int i = 0; i < freelist.size(); i++) + { + if (freelist[i].size >= data_size) + { + uint64_t r = freelist[i].start; + freelist[i].start += data_size; + freelist[i].size -= data_size; + return r; + } + } + return UINT64_MAX; +} + +void freelist_allocator_t::free(uint64_t start, uint64_t size) +{ + int min = 0, max = freelist.size(); + if (max && freelist[freelist.size()-1].start < start) + { + min = max; + } + if (max && freelist[0].start >= start) + { + max = 0; + } + while (max-min > 1) + { + int mid = (min+max)/2; + if (freelist[mid].start >= start) + max = mid; + else + min = mid; + } + // max = the first item where freelist[max].start >= start + if (max > 0 && freelist[max-1].start+freelist[max-1].size >= start) + { + assert(freelist[max-1].start+freelist[max-1].size == start); + freelist[max-1].size += size; + } + else if (max < freelist.size() && freelist[max].start <= size+start) + { + assert(freelist[max].start == size+start); + freelist[max].start -= size; + freelist[max].size += size; + } + else + { + freelist.insert(freelist.begin()+min, (freelist_item_t){ .start = start, .size = size }); + max = min; // to skip the if below + } + if (min != max && max < freelist.size() && freelist[max].start == freelist[min].start+freelist[min].size) + { + freelist[min].size += freelist[max].size; + freelist.erase(freelist.begin()+max, freelist.begin()+max+1); + } +} diff --git a/src/freelist.h b/src/freelist.h new file mode 100644 index 00000000..76da1e65 --- /dev/null +++ b/src/freelist.h @@ -0,0 +1,23 @@ +// Copyright (c) Vitaliy Filippov, 2023+ +// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) + +#pragma once + +#include +#include + +struct freelist_item_t +{ + uint64_t start, size; +}; + +// Really trivial freelist allocator +// Should be fine for remote RDMA memory management because +// most of the time fragmentation shouldn't be an issue as all +// memory regions are short-lived +struct freelist_allocator_t +{ + std::vector freelist; + uint64_t alloc(uint64_t data_size); + void free(uint64_t start, uint64_t size); +}; diff --git a/src/messenger.cpp b/src/messenger.cpp index 50b557b1..2a39bf74 100644 --- a/src/messenger.cpp +++ b/src/messenger.cpp @@ -157,13 +157,16 @@ void osd_messenger_t::parse_config(const json11::Json & config) this->rdma_max_sge = 128; this->rdma_max_send = config["rdma_max_send"].uint64_value(); if (!this->rdma_max_send) - this->rdma_max_send = 1; + this->rdma_max_send = 128; this->rdma_max_recv = config["rdma_max_recv"].uint64_value(); if (!this->rdma_max_recv) this->rdma_max_recv = 128; - this->rdma_max_msg = config["rdma_max_msg"].uint64_value(); - if (!this->rdma_max_msg || this->rdma_max_msg > 128*1024*1024) - this->rdma_max_msg = 129*1024; + this->rdma_op_slots = config["rdma_op_slots"].uint64_value(); + if (!this->rdma_op_slots || this->rdma_op_slots >= 1024*1024) + this->rdma_op_slots = 4096; + this->rdma_op_memory = config["rdma_op_memory"].uint64_value(); + if (!this->rdma_op_memory || this->rdma_op_memory >= 1024*1024*1024) + this->rdma_op_memory = 16*1024*1024; #endif this->receive_buffer_size = (uint32_t)config["tcp_header_buffer_size"].uint64_value(); if (!this->receive_buffer_size || this->receive_buffer_size > 1024*1024*1024) @@ -388,12 +391,16 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) #ifdef WITH_RDMA if (rdma_context) { - cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg); + cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_op_slots, rdma_op_memory); if (cl->rdma_conn) { + clients_by_qp[cl->rdma_conn->qp->qp_num] = cl->peer_fd; json11::Json payload = json11::Json::object { { "connect_rdma", cl->rdma_conn->addr.to_string() }, - { "rdma_max_msg", cl->rdma_conn->max_msg }, + { "rdma_data_rkey", (uint64_t)cl->rdma_conn->in_data_mr->rkey }, + { "rdma_op_rkey", (uint64_t)cl->rdma_conn->in_op_mr->rkey }, + { "rdma_op_slots", cl->rdma_conn->op_slots }, + { "rdma_op_memory", cl->rdma_conn->op_memory }, }; std::string payload_str = payload.dump(); op->req.show_conf.json_len = payload_str.size(); @@ -453,12 +460,14 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) { msgr_rdma_address_t addr; if (!msgr_rdma_address_t::from_string(config["rdma_address"].string_value().c_str(), &addr) || + config["rdma_op_memory"].uint64_value() == 0 || cl->rdma_conn->connect(&addr) != 0) { fprintf( stderr, "Failed to connect to OSD %lu (address %s) using RDMA\n", cl->osd_num, config["rdma_address"].string_value().c_str() ); + clients_by_qp.erase(cl->rdma_conn->qp->qp_num); delete cl->rdma_conn; cl->rdma_conn = NULL; // FIXME: Keep TCP connection in this case @@ -470,11 +479,12 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) } else { - uint64_t server_max_msg = config["rdma_max_msg"].uint64_value(); - if (cl->rdma_conn->max_msg > server_max_msg) - { - cl->rdma_conn->max_msg = server_max_msg; - } + cl->rdma_conn->set_out_capacity( + config["rdma_data_rkey"].uint64_value(), + config["rdma_op_rkey"].uint64_value(), + config["rdma_op_slots"].uint64_value(), + config["rdma_op_memory"].uint64_value() + ); if (log_level > 0) { fprintf(stderr, "Connected to OSD %lu using RDMA\n", cl->osd_num); diff --git a/src/messenger.h b/src/messenger.h index 238e5164..0af18797 100644 --- a/src/messenger.h +++ b/src/messenger.h @@ -37,6 +37,7 @@ #define MSGR_SENDP_HDR 1 #define MSGR_SENDP_FREE 2 +#define MSGR_SENDP_LAST 4 struct msgr_sendp_t { @@ -131,9 +132,10 @@ protected: bool use_rdma = true; std::string rdma_device; uint64_t rdma_port_num = 1, rdma_gid_index = 0, rdma_mtu = 0; - msgr_rdma_context_t *rdma_context = NULL; uint64_t rdma_max_sge = 0, rdma_max_send = 0, rdma_max_recv = 0; - uint64_t rdma_max_msg = 0; + uint64_t rdma_op_slots = 0, rdma_op_memory = 0; + msgr_rdma_context_t *rdma_context = NULL; + std::map clients_by_qp; #endif std::vector read_ready_clients; @@ -170,7 +172,8 @@ public: #ifdef WITH_RDMA bool is_rdma_enabled(); - bool connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg); + bool connect_rdma(int peer_fd, std::string rdma_address, + uint32_t out_data_rkey, uint32_t out_op_rkey, uint64_t out_op_slots, uint64_t out_op_memory); #endif protected: @@ -191,12 +194,13 @@ protected: bool handle_read_buffer(osd_client_t *cl, void *curbuf, int remain); bool handle_finished_read(osd_client_t *cl); void handle_op_hdr(osd_client_t *cl); - bool handle_reply_hdr(osd_client_t *cl); + bool handle_reply_hdr(void *reply_hdr, osd_client_t *cl); void handle_reply_ready(osd_op_t *op); #ifdef WITH_RDMA bool try_send_rdma(osd_client_t *cl); bool try_recv_rdma(osd_client_t *cl); void handle_rdma_events(); + bool rdma_handle_op(osd_client_t *cl, uint32_t op_slot); #endif }; diff --git a/src/msgr_rdma.cpp b/src/msgr_rdma.cpp index 1d267274..745be67b 100644 --- a/src/msgr_rdma.cpp +++ b/src/msgr_rdma.cpp @@ -46,9 +46,20 @@ msgr_rdma_connection_t::~msgr_rdma_connection_t() ctx->used_max_cqe -= max_send+max_recv; if (qp) ibv_destroy_qp(qp); - if (recv_buffers.size()) - for (auto b: recv_buffers) - free(b); + if (in_data_mr) + ibv_dereg_mr(in_data_mr); + if (in_op_mr) + ibv_dereg_mr(in_op_mr); + if (in_data_buf) + free(in_data_buf); + if (in_ops) + free(in_ops); + if (out_op_alloc) + delete out_op_alloc; + if (out_slot_data) + free(out_slot_data); + if (out_slot_ops) + free(out_slot_ops); } msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, int log_level) @@ -149,7 +160,7 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND); if (!ctx->mr) { - fprintf(stderr, "Couldn't register RDMA memory region\n"); + fprintf(stderr, "Couldn't register global RDMA memory region: %s\n", strerror(errno)); goto cleanup; } @@ -180,7 +191,7 @@ cleanup: } msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, uint32_t max_send, - uint32_t max_recv, uint32_t max_sge, uint32_t max_msg) + uint32_t max_recv, uint32_t max_sge, uint64_t op_slots, uint64_t op_memory) { msgr_rdma_connection_t *conn = new msgr_rdma_connection_t; @@ -190,7 +201,6 @@ msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, conn->max_send = max_send; conn->max_recv = max_recv; conn->max_sge = max_sge; - conn->max_msg = max_msg; ctx->used_max_cqe += max_send+max_recv; if (ctx->used_max_cqe > ctx->max_cqe) @@ -211,6 +221,30 @@ msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, ctx->max_cqe = new_max_cqe; } + conn->op_memory = op_memory; + conn->in_data_buf = memalign_or_die(MEM_ALIGNMENT, op_memory); + conn->in_data_mr = ibv_reg_mr(ctx->pd, conn->in_data_buf, op_memory, + IBV_ACCESS_ZERO_BASED | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_ON_DEMAND); + if (!conn->in_data_mr) + { + fprintf(stderr, "Couldn't register %lu MB RDMA memory region for incoming data: %s\n", + (op_memory+1024*1024-1)/1024/1024, strerror(errno)); + delete conn; + return NULL; + } + + conn->op_slots = op_slots; + conn->in_ops = (msgr_rdma_cmd_t *)malloc_or_die(sizeof(msgr_rdma_cmd_t) * op_slots); + conn->in_op_mr = ibv_reg_mr(ctx->pd, conn->in_ops, sizeof(msgr_rdma_cmd_t) * op_slots, + IBV_ACCESS_ZERO_BASED | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_ON_DEMAND); + if (!conn->in_op_mr) + { + fprintf(stderr, "Couldn't register %lu KB RDMA memory region for incoming operation headers: %s\n", + (sizeof(msgr_rdma_cmd_t) * op_slots + 1023)/1024, strerror(errno)); + delete conn; + return NULL; + } + ibv_qp_init_attr init_attr = { .send_cq = ctx->cq, .recv_cq = ctx->cq, @@ -237,7 +271,7 @@ msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, ibv_qp_attr attr = { .qp_state = IBV_QPS_INIT, - .qp_access_flags = 0, + .qp_access_flags = IBV_ACCESS_REMOTE_WRITE, .pkey_index = 0, .port_num = ctx->ib_port, }; @@ -265,6 +299,19 @@ static ibv_mtu mtu_to_ibv_mtu(uint32_t mtu) return IBV_MTU_4096; } +void msgr_rdma_connection_t::set_out_capacity(uint32_t out_data_rkey, uint32_t out_op_rkey, uint64_t out_op_slots, uint64_t out_op_memory) +{ + assert(!out_op_alloc); + this->out_data_rkey = out_data_rkey; + this->out_op_rkey = out_op_rkey; + this->out_op_slots = out_op_slots; + this->out_op_memory = out_op_memory; + out_op_alloc = new allocator(out_op_slots); + out_data_alloc.free(0, out_op_memory); + out_slot_data = (msgr_rdma_out_pos_t *)malloc_or_die(sizeof(msgr_rdma_out_pos_t) * out_op_slots); + out_slot_ops = (osd_op_t **)malloc_or_die(sizeof(osd_op_t *) * out_op_slots); +} + int msgr_rdma_connection_t::connect(msgr_rdma_address_t *dest) { auto conn = this; @@ -311,17 +358,14 @@ int msgr_rdma_connection_t::connect(msgr_rdma_address_t *dest) return 0; } -bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg) +bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, + uint32_t out_data_rkey, uint32_t out_op_rkey, uint64_t out_op_slots, uint64_t out_op_memory) { // Try to connect to the peer using RDMA msgr_rdma_address_t addr; if (msgr_rdma_address_t::from_string(rdma_address.c_str(), &addr)) { - if (client_max_msg > rdma_max_msg) - { - client_max_msg = rdma_max_msg; - } - auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, client_max_msg); + auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_op_slots, rdma_op_memory); if (rdma_conn) { int r = rdma_conn->connect(&addr); @@ -336,6 +380,8 @@ bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, uint64 else { // Remember connection, but switch to RDMA only after sending the configuration response + clients_by_qp[rdma_conn->qp->qp_num] = peer_fd; + rdma_conn->set_out_capacity(out_data_rkey, out_op_rkey, out_op_slots, out_op_memory); auto cl = clients.at(peer_fd); cl->rdma_conn = rdma_conn; cl->peer_state = PEER_RDMA_CONNECTING; @@ -346,66 +392,161 @@ bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, uint64 return false; } -static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge) -{ - ibv_send_wr *bad_wr = NULL; - ibv_send_wr wr = { - .wr_id = (uint64_t)(cl->peer_fd*2+1), - .sg_list = sge, - .num_sge = op_sge, - .opcode = IBV_WR_SEND, - .send_flags = IBV_SEND_SIGNALED, - }; - int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr); - if (err || bad_wr) - { - fprintf(stderr, "RDMA send failed: %s\n", strerror(err)); - exit(1); - } - cl->rdma_conn->cur_send++; -} - bool osd_messenger_t::try_send_rdma(osd_client_t *cl) { auto rc = cl->rdma_conn; - if (!cl->send_list.size() || rc->cur_send > 0) + if (!cl->send_list.size() && !rc->in_slots_freed.size() || rc->cur_send >= rc->max_send) { - // Only send one batch at a time return true; } - uint64_t op_size = 0, op_sge = 0; - ibv_sge sge[rc->max_sge]; - while (rc->send_pos < cl->send_list.size()) + int i = 0; + while (i < rc->in_slots_freed.size()) { - iovec & iov = cl->send_list[rc->send_pos]; - if (op_size >= rc->max_msg || op_sge >= rc->max_sge) - { - try_send_rdma_wr(cl, sge, op_sge); - op_sge = 0; - op_size = 0; - if (rc->cur_send >= rc->max_send) - { - break; - } - } - uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < rc->max_msg - ? iov.iov_len-rc->send_buf_pos : rc->max_msg-op_size); - sge[op_sge++] = { - .addr = (uintptr_t)((uint8_t*)iov.iov_base+rc->send_buf_pos), - .length = len, - .lkey = rc->ctx->mr->lkey, + auto op_slot = rc->in_slots_freed[i++]; + assert(op_slot < 0x80000000); + ibv_send_wr *bad_wr = NULL; + ibv_send_wr wr = { + .wr_id = 0, + .opcode = IBV_WR_RDMA_WRITE_WITH_IMM, + .imm_data = 0x80000000 | op_slot, }; - op_size += len; - rc->send_buf_pos += len; - if (rc->send_buf_pos >= iov.iov_len) + int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr); + if (err || bad_wr) { - rc->send_pos++; - rc->send_buf_pos = 0; + fprintf(stderr, "RDMA send failed: %s\n", strerror(err)); + exit(1); + } + rc->cur_send++; + if (rc->cur_send >= rc->max_send) + { + break; } } - if (op_sge > 0) + rc->in_slots_freed.erase(rc->in_slots_freed.begin(), rc->in_slots_freed.begin()+i); + if (!cl->send_list.size() || rc->cur_send >= rc->max_send) { - try_send_rdma_wr(cl, sge, op_sge); + return true; + } + ibv_sge sge[rc->max_sge]; + int op_start = 0; + while (op_start < cl->send_list.size()) + { + uint64_t op_data_size = 0; + int op_end = op_start; + while (!(cl->outbox[op_end].flags & MSGR_SENDP_LAST)) + { + op_data_size += cl->send_list[op_end].iov_len; + op_end++; + } + op_data_size += cl->send_list[op_end].iov_len; + op_end++; + op_data_size -= cl->send_list[op_start].iov_len; + // Operation boundaries in send_list: op_start..op_end, first iovec is the header + uint64_t op_slot = rc->out_op_alloc->find_free(); + if (op_slot == UINT64_MAX) + { + // op queue is full + return true; + } + uint64_t data_pos = UINT64_MAX; + if (op_data_size >= 0) + { + if (rc->cur_send > rc->max_send-1-(op_end-op_start-1+rc->max_sge)/rc->max_sge) + { + // RDMA queue is full + return true; + } + // FIXME: Oops, and what if op data is larger than the whole buffer... :) + data_pos = rc->out_data_alloc.alloc(op_data_size); + if (data_pos == UINT64_MAX) + { + // data buffers are full + return true; + } + int cur_sge = 0; + for (int data_sent = 1; data_sent < op_end; data_sent++) + { + sge[cur_sge++] = { + .addr = (uintptr_t)cl->send_list[data_sent].iov_base, + .length = (uint32_t)cl->send_list[data_sent].iov_len, + .lkey = rc->ctx->mr->lkey, + }; + if (data_sent == op_end-1 || cur_sge >= rc->max_sge) + { + ibv_send_wr *bad_wr = NULL; + ibv_send_wr wr = { + .wr_id = op_slot, + .next = NULL, + .sg_list = sge, + .num_sge = cur_sge, + .opcode = IBV_WR_RDMA_WRITE, + .send_flags = 0, + .wr = { + .rdma = { + .remote_addr = data_pos, + .rkey = rc->out_data_rkey, + }, + }, + }; + int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr); + if (err || bad_wr) + { + fprintf(stderr, "RDMA send failed: %s\n", strerror(err)); + exit(1); + } + rc->cur_send++; + cur_sge = 0; + } + } + } + if (rc->cur_send > rc->max_send-1) + { + // RDMA queue is full + return true; + } + rc->out_op_alloc->set(op_slot, true); + assert(cl->send_list[op_start].iov_len == OSD_PACKET_SIZE); + sge[0] = { + .addr = (uintptr_t)cl->send_list[op_start].iov_base, + .length = (uint32_t)cl->send_list[op_start].iov_len, + .lkey = rc->ctx->mr->lkey, + }; + rc->out_slot_data[op_slot] = { .data_pos = data_pos, .data_size = op_data_size }; + rc->out_slot_ops[op_slot] = (cl->outbox[op_end-1].flags & MSGR_SENDP_FREE) + ? cl->outbox[op_end-1].op : NULL; + sge[1] = { + .addr = (uintptr_t)(rc->out_slot_data+op_slot), + .length = sizeof(rc->out_slot_data[op_slot]), + .lkey = rc->ctx->mr->lkey, + }; + ibv_send_wr *bad_wr = NULL; + ibv_send_wr wr = { + .wr_id = op_slot, + .next = NULL, + .sg_list = sge, + .num_sge = 2, + .opcode = IBV_WR_RDMA_WRITE_WITH_IMM, + .send_flags = IBV_SEND_SIGNALED, + .imm_data = (uint32_t)op_slot, + .wr = { + .rdma = { + .remote_addr = op_slot*sizeof(msgr_rdma_cmd_t), + .rkey = rc->out_op_rkey, + }, + }, + }; + int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr); + if (err || bad_wr) + { + fprintf(stderr, "RDMA send failed: %s\n", strerror(err)); + exit(1); + } + rc->cur_send++; + op_start = op_end; + } + if (op_start > 0) + { + cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+op_start); } return true; } @@ -427,23 +568,87 @@ static void try_recv_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge) cl->rdma_conn->cur_recv++; } +static void copy_data_to_recv_list(uint8_t *data_buf, uint64_t data_size, osd_client_t *cl) +{ + uint64_t pos = 0; + while (cl->recv_list.done < cl->recv_list.count) + { + uint64_t cur = cl->recv_list.buf[cl->recv_list.done].iov_len; + assert(cur <= data_size-pos); + memcpy(cl->recv_list.buf[cl->recv_list.done].iov_base, data_buf+pos, cur); + pos += cur; + } + cl->recv_list.reset(); +} + bool osd_messenger_t::try_recv_rdma(osd_client_t *cl) { auto rc = cl->rdma_conn; while (rc->cur_recv < rc->max_recv) { - void *buf = malloc_or_die(rc->max_msg); - rc->recv_buffers.push_back(buf); - ibv_sge sge = { - .addr = (uintptr_t)buf, - .length = (uint32_t)rc->max_msg, - .lkey = rc->ctx->mr->lkey, - }; - try_recv_rdma_wr(cl, &sge, 1); + try_recv_rdma_wr(cl, NULL, 0); } return true; } +bool osd_messenger_t::rdma_handle_op(osd_client_t *cl, uint32_t op_slot) +{ + auto rc = cl->rdma_conn; + if (op_slot >= rc->in_op_cap) + { + // Invalid incoming index + fprintf(stderr, "Client %d invalid incoming RDMA op slot: %u, dropping connection\n", cl->peer_fd, op_slot); + stop_client(cl->peer_fd); + return false; + } + osd_op_header_t *hdr = (osd_op_header_t *)rc->in_ops[op_slot].header; + uint8_t *data_buf = (uint8_t*)rc->in_data_buf + rc->in_ops[op_slot].pos.data_pos; + uint64_t data_size = rc->in_ops[op_slot].pos.data_size; + if (hdr->magic == SECONDARY_OSD_REPLY_MAGIC) + { + // Reply + if (cl->read_op) + { + delete cl->read_op; + cl->read_op = NULL; + } + if (!handle_reply_hdr(rc->in_ops[op_slot].header, cl)) + return false; + if (cl->read_state == CL_READ_REPLY_DATA) + { + // copy reply data to cl->recv_list + copy_data_to_recv_list(data_buf, data_size, cl); + // and handle reply with data + handle_reply_ready(cl->read_op); + cl->read_op = NULL; + cl->read_state = 0; + cl->read_remaining = 0; + } + } + else + { + // Operation + cl->read_op = new osd_op_t; + cl->read_op->peer_fd = cl->peer_fd; + cl->read_op->op_type = OSD_OP_IN; + memcpy(&cl->read_op->req, hdr, OSD_PACKET_SIZE); + handle_op_hdr(cl); + if (cl->read_state == CL_READ_DATA) + { + copy_data_to_recv_list(data_buf, data_size, cl); + // And handle the incoming op with data + cl->received_ops.push_back(cl->read_op); + set_immediate.push_back([this, op = cl->read_op]() { exec_op(op); }); + cl->read_op = NULL; + cl->read_state = 0; + } + } + // We don't need the incoming data buffer anymore, notify peer about it + // FIXME: Allow to pass memory to the internal layer without copying and notify after handling it + rc->in_slots_freed.push_back(op_slot); + return true; +} + #define RDMA_EVENTS_AT_ONCE 32 void osd_messenger_t::handle_rdma_events() @@ -468,9 +673,9 @@ void osd_messenger_t::handle_rdma_events() event_count = ibv_poll_cq(rdma_context->cq, RDMA_EVENTS_AT_ONCE, wc); for (int i = 0; i < event_count; i++) { - int client_id = wc[i].wr_id >> 1; - bool is_send = wc[i].wr_id & 1; - auto cl_it = clients.find(client_id); + auto cqp_it = clients_by_qp.find(wc[i].qp_num); + int peer_fd = cqp_it != clients_by_qp.end() ? cqp_it->second : -1; + auto cl_it = clients.find(peer_fd); if (cl_it == clients.end()) { continue; @@ -478,55 +683,51 @@ void osd_messenger_t::handle_rdma_events() osd_client_t *cl = cl_it->second; if (wc[i].status != IBV_WC_SUCCESS) { - fprintf(stderr, "RDMA work request failed for client %d", client_id); + fprintf(stderr, "RDMA work request failed for client %d", peer_fd); if (cl->osd_num) - { fprintf(stderr, " (OSD %lu)", cl->osd_num); - } fprintf(stderr, " with status: %s, stopping client\n", ibv_wc_status_str(wc[i].status)); - stop_client(client_id); + if (peer_fd >= 0) + stop_client(peer_fd); continue; } - if (!is_send) + auto rc = cl->rdma_conn; + if (wc[i].opcode == IBV_WC_RDMA_WRITE) { - cl->rdma_conn->cur_recv--; - if (!handle_read_buffer(cl, cl->rdma_conn->recv_buffers[0], wc[i].byte_len)) + // Operation or reply is sent, we can free it + auto & op = rc->out_slot_ops[wc[i].wr_id]; + if (op) { - // handle_read_buffer may stop the client - continue; + delete op; + op = NULL; } - free(cl->rdma_conn->recv_buffers[0]); - cl->rdma_conn->recv_buffers.erase(cl->rdma_conn->recv_buffers.begin(), cl->rdma_conn->recv_buffers.begin()+1); - try_recv_rdma(cl); + rc->cur_send--; + try_send_rdma(cl); } - else + else if (wc[i].opcode == IBV_WC_RECV) { - cl->rdma_conn->cur_send--; - if (!cl->rdma_conn->cur_send) + if (!(wc[i].imm_data & 0x80000000)) { - // Wait for the whole batch - for (int i = 0; i < cl->rdma_conn->send_pos; i++) + // Operation or reply received. Handle it + if (!rdma_handle_op(cl, wc[i].imm_data)) { - if (cl->outbox[i].flags & MSGR_SENDP_FREE) - { - // Reply fully sent - delete cl->outbox[i].op; - } + // false means that the client is stopped due to invalid operation + continue; } - if (cl->rdma_conn->send_pos > 0) - { - cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+cl->rdma_conn->send_pos); - cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+cl->rdma_conn->send_pos); - cl->rdma_conn->send_pos = 0; - } - if (cl->rdma_conn->send_buf_pos > 0) - { - cl->send_list[0].iov_base = (uint8_t*)cl->send_list[0].iov_base + cl->rdma_conn->send_buf_pos; - cl->send_list[0].iov_len -= cl->rdma_conn->send_buf_pos; - cl->rdma_conn->send_buf_pos = 0; - } - try_send_rdma(cl); + rc->cur_recv--; + try_recv_rdma(cl); } + else + { + // Outbox slot is marked as free (the remote side doesn't need it anymore) + uint32_t op_slot = wc[i].imm_data & 0x7FFFFFFF; + auto & pos = rc->in_ops[op_slot].pos; + if (pos.data_size > 0) + rc->out_data_alloc.free(pos.data_pos, pos.data_size); + rc->out_op_alloc->set(op_slot, false); + } + // Try to continue sending + try_send_rdma(cl); } } } while (event_count > 0); diff --git a/src/msgr_rdma.h b/src/msgr_rdma.h index 0789a02d..6da56b22 100644 --- a/src/msgr_rdma.h +++ b/src/msgr_rdma.h @@ -5,6 +5,11 @@ #include #include #include +#include "allocator.h" +#include "freelist.h" +#include "osd_ops.h" + +struct osd_op_t; struct msgr_rdma_address_t { @@ -39,6 +44,17 @@ struct msgr_rdma_context_t ~msgr_rdma_context_t(); }; +struct msgr_rdma_out_pos_t +{ + uint64_t data_pos, data_size; +}; + +struct msgr_rdma_cmd_t +{ + uint8_t header[OSD_PACKET_SIZE]; + msgr_rdma_out_pos_t pos; +}; + struct msgr_rdma_connection_t { msgr_rdma_context_t *ctx = NULL; @@ -46,13 +62,24 @@ struct msgr_rdma_connection_t msgr_rdma_address_t addr; int max_send = 0, max_recv = 0, max_sge = 0; int cur_send = 0, cur_recv = 0; - uint64_t max_msg = 0; + uint64_t op_slots = 0, op_memory = 0; - int send_pos = 0, send_buf_pos = 0; - int recv_pos = 0, recv_buf_pos = 0; - std::vector recv_buffers; + ibv_mr *in_data_mr = NULL, *in_op_mr = NULL; + msgr_rdma_cmd_t *in_ops = NULL; + int in_op_cap = 0; + void *in_data_buf = NULL; + std::vector in_slots_freed; + + uint32_t out_data_rkey = 0, out_op_rkey = 0; + uint64_t out_op_slots = 0, out_op_memory = 0; + allocator *out_op_alloc = NULL; + freelist_allocator_t out_data_alloc; + msgr_rdma_out_pos_t *out_slot_data = NULL; + osd_op_t **out_slot_ops = NULL; ~msgr_rdma_connection_t(); - static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint32_t max_msg); + static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, + uint32_t max_recv, uint32_t max_sge, uint64_t op_slots, uint64_t op_memory); int connect(msgr_rdma_address_t *dest); + void set_out_capacity(uint32_t out_data_rkey, uint32_t out_op_rkey, uint64_t out_op_slots, uint64_t out_op_memory); }; diff --git a/src/msgr_receive.cpp b/src/msgr_receive.cpp index b150f3c3..e2b866e2 100644 --- a/src/msgr_receive.cpp +++ b/src/msgr_receive.cpp @@ -172,7 +172,7 @@ bool osd_messenger_t::handle_finished_read(osd_client_t *cl) if (cl->read_state == CL_READ_HDR) { if (cl->read_op->req.hdr.magic == SECONDARY_OSD_REPLY_MAGIC) - return handle_reply_hdr(cl); + return handle_reply_hdr(cl->read_op->req.buf, cl); else if (cl->read_op->req.hdr.magic == SECONDARY_OSD_OP_MAGIC) handle_op_hdr(cl); else @@ -286,7 +286,7 @@ void osd_messenger_t::handle_op_hdr(osd_client_t *cl) } } -bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl) +bool osd_messenger_t::handle_reply_hdr(void *reply_hdr, osd_client_t *cl) { auto req_it = cl->sent_ops.find(cl->read_op->req.hdr.id); if (req_it == cl->sent_ops.end()) @@ -297,7 +297,7 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl) return false; } osd_op_t *op = req_it->second; - memcpy(op->reply.buf, cl->read_op->req.buf, OSD_PACKET_SIZE); + memcpy(op->reply.buf, reply_hdr, OSD_PACKET_SIZE); cl->sent_ops.erase(req_it); if (op->reply.hdr.opcode == OSD_OP_SEC_READ || op->reply.hdr.opcode == OSD_OP_READ) { @@ -328,14 +328,16 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl) { goto reuse; } - delete cl->read_op; + if (cl->read_op) + delete cl->read_op; cl->read_op = op; cl->read_state = CL_READ_REPLY_DATA; } else if (op->reply.hdr.opcode == OSD_OP_SEC_LIST && op->reply.hdr.retval > 0) { assert(!op->iov.count); - delete cl->read_op; + if (cl->read_op) + delete cl->read_op; cl->read_op = op; cl->read_state = CL_READ_REPLY_DATA; cl->read_remaining = sizeof(obj_ver_id) * op->reply.hdr.retval; @@ -345,7 +347,8 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl) else if (op->reply.hdr.opcode == OSD_OP_SEC_READ_BMP && op->reply.hdr.retval > 0) { assert(!op->iov.count); - delete cl->read_op; + if (cl->read_op) + delete cl->read_op; cl->read_op = op; cl->read_state = CL_READ_REPLY_DATA; cl->read_remaining = op->reply.hdr.retval; @@ -355,7 +358,8 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl) } else if (op->reply.hdr.opcode == OSD_OP_SHOW_CONFIG && op->reply.hdr.retval > 0) { - delete cl->read_op; + if (cl->read_op) + delete cl->read_op; cl->read_op = op; cl->read_state = CL_READ_REPLY_DATA; cl->read_remaining = op->reply.hdr.retval; @@ -368,7 +372,8 @@ bool osd_messenger_t::handle_reply_hdr(osd_client_t *cl) reuse: // It's fine to reuse cl->read_op for the next reply handle_reply_ready(op); - cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE); + if (cl->read_op) + cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE); cl->read_remaining = OSD_PACKET_SIZE; cl->read_state = CL_READ_HDR; } diff --git a/src/msgr_send.cpp b/src/msgr_send.cpp index 5248e641..9f987cc0 100644 --- a/src/msgr_send.cpp +++ b/src/msgr_send.cpp @@ -96,6 +96,7 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op) to_send_list.push_back((iovec){ .iov_base = cur_op->buf, .iov_len = (size_t)cur_op->req.sec_read_bmp.len }); to_outbox.push_back((msgr_sendp_t){ .op = cur_op, .flags = 0 }); } + to_outbox[to_outbox.size()-1].flags |= MSGR_SENDP_LAST; if (cur_op->op_type == OSD_OP_IN) { to_outbox[to_outbox.size()-1].flags |= MSGR_SENDP_FREE; diff --git a/src/msgr_stop.cpp b/src/msgr_stop.cpp index 4e952a0e..b62a0e69 100644 --- a/src/msgr_stop.cpp +++ b/src/msgr_stop.cpp @@ -129,6 +129,7 @@ void osd_messenger_t::stop_client(int peer_fd, bool force, bool force_delete) #ifdef WITH_RDMA if (cl->rdma_conn) { + clients_by_qp.erase(cl->rdma_conn->qp->qp_num); delete cl->rdma_conn; } #endif diff --git a/src/osd_secondary.cpp b/src/osd_secondary.cpp index 2fe77f69..79a039c8 100644 --- a/src/osd_secondary.cpp +++ b/src/osd_secondary.cpp @@ -166,15 +166,20 @@ void osd_t::exec_show_config(osd_op_t *cur_op) { // Indicate that RDMA is enabled wire_config["rdma_enabled"] = true; - if (req_json["connect_rdma"].is_string()) + if (req_json["connect_rdma"].is_string() && req_json["rdma_op_memory"].uint64_value() != 0) { // Peer is trying to connect using RDMA, try to satisfy him - bool ok = msgr.connect_rdma(cur_op->peer_fd, req_json["connect_rdma"].string_value(), req_json["rdma_max_msg"].uint64_value()); + bool ok = msgr.connect_rdma(cur_op->peer_fd, req_json["connect_rdma"].string_value(), + req_json["rdma_data_rkey"].uint64_value(), req_json["rdma_op_rkey"].uint64_value(), + req_json["rdma_op_slots"].uint64_value(), req_json["rdma_op_memory"].uint64_value()); if (ok) { auto rc = msgr.clients.at(cur_op->peer_fd)->rdma_conn; wire_config["rdma_address"] = rc->addr.to_string(); - wire_config["rdma_max_msg"] = rc->max_msg; + wire_config["rdma_data_rkey"] = (uint64_t)rc->in_data_mr->rkey; + wire_config["rdma_op_rkey"] = (uint64_t)rc->in_op_mr->rkey; + wire_config["rdma_op_slots"] = rc->op_slots; + wire_config["rdma_op_memory"] = rc->op_memory; } } } diff --git a/src/test_freelist.cpp b/src/test_freelist.cpp new file mode 100644 index 00000000..bf0889bc --- /dev/null +++ b/src/test_freelist.cpp @@ -0,0 +1,64 @@ +// Copyright (c) Vitaliy Filippov, 2023+ +// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details) + +#include +#include +#include +#include "freelist.cpp" + +inline bool operator == (const freelist_item_t & a, const freelist_item_t & b) +{ + return a.start == b.start && a.size == b.size; +} + +void dump(std::vector & freelist) +{ + printf("free: "); + for (auto & item: freelist) + { + printf("%lx+%lx ", item.start, item.size); + } + printf("\n"); +} + +void dump(freelist_allocator_t &alloc) +{ + dump(alloc.freelist); +} + +uint64_t test_alloc(freelist_allocator_t &alloc, uint64_t size) +{ + uint64_t r = alloc.alloc(size); + printf("alloc %lx: %lx\n", size, r); + return r; +} + +void assert_eq(freelist_allocator_t &alloc, std::vector v) +{ + if (alloc.freelist != v) + { + printf("expected "); + dump(v); + printf("got "); + dump(alloc); + throw std::runtime_error("test failed"); + } + dump(alloc); +} + +int main(int narg, char *args[]) +{ + freelist_allocator_t alloc; + alloc.free(0, 0x1000000); + assert_eq(alloc, { { 0, 0x1000000 } }); + assert(test_alloc(alloc, 0x1000) == 0); + assert_eq(alloc, { { 0x1000, 0xfff000 } }); + assert(test_alloc(alloc, 0x4000) == 0x1000); + alloc.free(0x1000000, 0x4000); + assert_eq(alloc, { { 0x5000, 0xfff000 } }); + alloc.free(0, 0x1000); + assert_eq(alloc, { { 0, 0x1000 }, { 0x5000, 0xfff000 } }); + alloc.free(0x1000, 0x4000); + assert_eq(alloc, { { 0, 0x1004000 } }); + return 0; +}