diff --git a/mon/mon.js b/mon/mon.js index d0cf1be0..1150e584 100644 --- a/mon/mon.js +++ b/mon/mon.js @@ -70,9 +70,9 @@ const etcd_tree = { rdma_gid_index: 0, rdma_mtu: 4096, rdma_max_sge: 128, - rdma_max_send: 32, - rdma_max_recv: 8, - rdma_max_msg: 1048576, + rdma_max_send: 64, + rdma_max_recv: 128, + rdma_max_msg: 132096, log_level: 0, block_size: 131072, disk_alignment: 4096, diff --git a/src/messenger.cpp b/src/messenger.cpp index 50b557b1..6f4fa80d 100644 --- a/src/messenger.cpp +++ b/src/messenger.cpp @@ -157,7 +157,7 @@ void osd_messenger_t::parse_config(const json11::Json & config) this->rdma_max_sge = 128; this->rdma_max_send = config["rdma_max_send"].uint64_value(); if (!this->rdma_max_send) - this->rdma_max_send = 1; + this->rdma_max_send = 64; this->rdma_max_recv = config["rdma_max_recv"].uint64_value(); if (!this->rdma_max_recv) this->rdma_max_recv = 128; diff --git a/src/msgr_rdma.cpp b/src/msgr_rdma.cpp index 08c5efa1..8e63195f 100644 --- a/src/msgr_rdma.cpp +++ b/src/msgr_rdma.cpp @@ -368,9 +368,8 @@ static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge) bool osd_messenger_t::try_send_rdma(osd_client_t *cl) { auto rc = cl->rdma_conn; - if (!cl->send_list.size() || rc->cur_send > 0) + if (!cl->send_list.size() || rc->cur_send >= rc->max_send) { - // Only send one batch at a time return true; } uint64_t op_size = 0, op_sge = 0; @@ -380,6 +379,7 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl) iovec & iov = cl->send_list[rc->send_pos]; if (op_size >= rc->max_msg || op_sge >= rc->max_sge) { + rc->send_sizes.push_back(op_size); try_send_rdma_wr(cl, sge, op_sge); op_sge = 0; op_size = 0; @@ -405,6 +405,7 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl) } if (op_sge > 0) { + rc->send_sizes.push_back(op_size); try_send_rdma_wr(cl, sge, op_sge); } return true; @@ -476,6 +477,7 @@ void osd_messenger_t::handle_rdma_events() continue; } osd_client_t *cl = cl_it->second; + auto rc = cl->rdma_conn; if (wc[i].status != IBV_WC_SUCCESS) { fprintf(stderr, "RDMA work request failed for client %d", client_id); @@ -489,43 +491,59 @@ void osd_messenger_t::handle_rdma_events() } if (!is_send) { - cl->rdma_conn->cur_recv--; - if (!handle_read_buffer(cl, cl->rdma_conn->recv_buffers[cl->rdma_conn->next_recv_buf], wc[i].byte_len)) + rc->cur_recv--; + if (!handle_read_buffer(cl, rc->recv_buffers[rc->next_recv_buf], wc[i].byte_len)) { // handle_read_buffer may stop the client continue; } - try_recv_rdma_wr(cl, cl->rdma_conn->recv_buffers[cl->rdma_conn->next_recv_buf]); - cl->rdma_conn->next_recv_buf = (cl->rdma_conn->next_recv_buf+1) % cl->rdma_conn->recv_buffers.size(); + try_recv_rdma_wr(cl, rc->recv_buffers[rc->next_recv_buf]); + rc->next_recv_buf = (rc->next_recv_buf+1) % rc->recv_buffers.size(); } else { - cl->rdma_conn->cur_send--; - if (!cl->rdma_conn->cur_send) + rc->cur_send--; + uint64_t sent_size = rc->send_sizes.at(0); + rc->send_sizes.erase(rc->send_sizes.begin(), rc->send_sizes.begin()+1); + int send_pos = 0, send_buf_pos = 0; + while (sent_size > 0) { - // Wait for the whole batch - for (int i = 0; i < cl->rdma_conn->send_pos; i++) + if (sent_size >= cl->send_list.at(send_pos).iov_len) { - if (cl->outbox[i].flags & MSGR_SENDP_FREE) - { - // Reply fully sent - delete cl->outbox[i].op; - } + sent_size -= cl->send_list[send_pos].iov_len; + send_pos++; } - if (cl->rdma_conn->send_pos > 0) + else { - cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+cl->rdma_conn->send_pos); - cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+cl->rdma_conn->send_pos); - cl->rdma_conn->send_pos = 0; + send_buf_pos = sent_size; + sent_size = 0; } - if (cl->rdma_conn->send_buf_pos > 0) - { - cl->send_list[0].iov_base = (uint8_t*)cl->send_list[0].iov_base + cl->rdma_conn->send_buf_pos; - cl->send_list[0].iov_len -= cl->rdma_conn->send_buf_pos; - cl->rdma_conn->send_buf_pos = 0; - } - try_send_rdma(cl); } + assert(rc->send_pos >= send_pos); + if (rc->send_pos == send_pos) + { + rc->send_buf_pos -= send_buf_pos; + } + rc->send_pos -= send_pos; + for (int i = 0; i < send_pos; i++) + { + if (cl->outbox[i].flags & MSGR_SENDP_FREE) + { + // Reply fully sent + delete cl->outbox[i].op; + } + } + if (send_pos > 0) + { + cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+send_pos); + cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+send_pos); + } + if (send_buf_pos > 0) + { + cl->send_list[0].iov_base = (uint8_t*)cl->send_list[0].iov_base + send_buf_pos; + cl->send_list[0].iov_len -= send_buf_pos; + } + try_send_rdma(cl); } } } while (event_count > 0); diff --git a/src/msgr_rdma.h b/src/msgr_rdma.h index c8e79e78..7c56f4a1 100644 --- a/src/msgr_rdma.h +++ b/src/msgr_rdma.h @@ -51,6 +51,7 @@ struct msgr_rdma_connection_t int send_pos = 0, send_buf_pos = 0; int next_recv_buf = 0; std::vector recv_buffers; + std::vector send_sizes; ~msgr_rdma_connection_t(); static msgr_rdma_connection_t *create(msgr_rdma_context_t *ctx, uint32_t max_send, uint32_t max_recv, uint32_t max_sge, uint32_t max_msg);