|
|
|
@ -293,75 +293,237 @@ int msgr_rdma_connection_t::connect(msgr_rdma_address_t *dest) |
|
|
|
|
return 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address) |
|
|
|
|
// Being the client, connect all server's RDMA queues to our local (client) queues
|
|
|
|
|
bool osd_messenger_t::connect_rdma_server(osd_client_t *cl, json11::Json rdma_addresses, uint64_t server_max_sge) |
|
|
|
|
{ |
|
|
|
|
// Try to connect to the peer using RDMA
|
|
|
|
|
msgr_rdma_address_t addr; |
|
|
|
|
if (msgr_rdma_address_t::from_string(rdma_address.c_str(), &addr)) |
|
|
|
|
if (rdma_addresses.array_items().size() > 0) |
|
|
|
|
{ |
|
|
|
|
auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, max_rdma_send, max_rdma_recv, max_rdma_sge); |
|
|
|
|
if (rdma_conn) |
|
|
|
|
if (!server_max_sge || server_max_sge > rdma_max_sge) |
|
|
|
|
{ |
|
|
|
|
int r = rdma_conn->connect(&addr); |
|
|
|
|
if (r != 0) |
|
|
|
|
server_max_sge = rdma_max_sge; |
|
|
|
|
} |
|
|
|
|
int n_conn = rdma_addresses.array_items().size(); |
|
|
|
|
if (n_conn < cl->rdma_queues.size()) |
|
|
|
|
{ |
|
|
|
|
for (int i = n_conn; i < cl->rdma_queues.size(); i++) |
|
|
|
|
{ |
|
|
|
|
delete cl->rdma_queues[i]; |
|
|
|
|
} |
|
|
|
|
cl->rdma_queues.resize(n_conn); |
|
|
|
|
} |
|
|
|
|
else if (n_conn > cl->rdma_queues.size()) |
|
|
|
|
{ |
|
|
|
|
n_conn = cl->rdma_queues.size(); |
|
|
|
|
} |
|
|
|
|
for (int i = 0; i < n_conn; i++) |
|
|
|
|
{ |
|
|
|
|
msgr_rdma_address_t addr; |
|
|
|
|
if (!msgr_rdma_address_t::from_string(rdma_addresses[i].string_value().c_str(), &addr) || |
|
|
|
|
cl->rdma_queues[i]->connect(&addr) != 0) |
|
|
|
|
{ |
|
|
|
|
delete rdma_conn; |
|
|
|
|
printf( |
|
|
|
|
"Failed to connect RDMA queue pair to %s (client %d)\n", |
|
|
|
|
addr.to_string().c_str(), peer_fd |
|
|
|
|
"Failed to connect to OSD %lu (address %s) using RDMA\n", |
|
|
|
|
cl->osd_num, rdma_addresses[i].string_value().c_str() |
|
|
|
|
); |
|
|
|
|
// FIXME: Keep TCP connection in this case
|
|
|
|
|
osd_num_t peer_osd = cl->osd_num; |
|
|
|
|
stop_client(cl->peer_fd); |
|
|
|
|
on_connect_peer(peer_osd, -1); |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
// Remember connection, but switch to RDMA only after sending the configuration response
|
|
|
|
|
auto cl = clients.at(peer_fd); |
|
|
|
|
cl->rdma_conn = rdma_conn; |
|
|
|
|
cl->peer_state = PEER_RDMA_CONNECTING; |
|
|
|
|
return true; |
|
|
|
|
printf("Connected local queue %d to OSD %lu queue %d using RDMA\n", cl->rdma_queues[i]->qp->qp_num, cl->osd_num, addr.qpn); |
|
|
|
|
if (cl->rdma_queues[i]->max_sge > server_max_sge) |
|
|
|
|
{ |
|
|
|
|
cl->rdma_queues[i]->max_sge = server_max_sge; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
cl->peer_state = PEER_RDMA; |
|
|
|
|
tfd->set_fd_handler(cl->peer_fd, false, NULL); |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
for (auto rdma_conn: cl->rdma_queues) |
|
|
|
|
{ |
|
|
|
|
delete rdma_conn; |
|
|
|
|
} |
|
|
|
|
cl->rdma_queues.resize(0); |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Being the server, try to connect all client's RDMA queues to our local (server) queues
|
|
|
|
|
bool osd_messenger_t::connect_rdma_client(osd_client_t *cl, json11::Json rdma_addresses, uint64_t client_max_sge) |
|
|
|
|
{ |
|
|
|
|
if (rdma_addresses.array_items().size() > 0) |
|
|
|
|
{ |
|
|
|
|
if (!client_max_sge || client_max_sge > rdma_max_sge) |
|
|
|
|
{ |
|
|
|
|
client_max_sge = rdma_max_sge; |
|
|
|
|
} |
|
|
|
|
int n_conn = rdma_addresses.array_items().size(); |
|
|
|
|
if (n_conn > rdma_queues_per_connection) |
|
|
|
|
{ |
|
|
|
|
n_conn = rdma_queues_per_connection; |
|
|
|
|
} |
|
|
|
|
for (int i = 0; i < n_conn; i++) |
|
|
|
|
{ |
|
|
|
|
msgr_rdma_address_t addr; |
|
|
|
|
if (msgr_rdma_address_t::from_string(rdma_addresses[i].string_value().c_str(), &addr)) |
|
|
|
|
{ |
|
|
|
|
auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, client_max_sge); |
|
|
|
|
if (rdma_conn && rdma_conn->connect(&addr) == 0) |
|
|
|
|
{ |
|
|
|
|
printf("Connected local queue %d to client %d queue %d using RDMA\n", rdma_conn->qp->qp_num, cl->peer_fd, addr.qpn); |
|
|
|
|
cl->rdma_queues.push_back(rdma_conn); |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
if (rdma_conn) |
|
|
|
|
{ |
|
|
|
|
delete rdma_conn; |
|
|
|
|
} |
|
|
|
|
printf( |
|
|
|
|
"Failed to connect RDMA queue pair to %s (client %d queue %d)\n", |
|
|
|
|
addr.to_string().c_str(), cl->peer_fd, i+1 |
|
|
|
|
); |
|
|
|
|
// Delete all RDMA queues to keep the TCP connection
|
|
|
|
|
for (int j = 0; j < cl->rdma_queues.size(); j++) |
|
|
|
|
{ |
|
|
|
|
delete cl->rdma_queues[j]; |
|
|
|
|
} |
|
|
|
|
cl->rdma_queues.resize(0); |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Switch to RDMA state only after sending the configuration response
|
|
|
|
|
cl->peer_state = PEER_RDMA_CONNECTING; |
|
|
|
|
for (int i = 0; i < cl->rdma_queues.size(); i++) |
|
|
|
|
{ |
|
|
|
|
try_recv_rdma(cl, cl->rdma_queues[i]); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return false; |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge) |
|
|
|
|
static void try_send_rdma_wr(msgr_rdma_connection_t *rc, uint64_t wr_id, ibv_sge *sge, int op_sge) |
|
|
|
|
{ |
|
|
|
|
timespec tv; |
|
|
|
|
clock_gettime(CLOCK_REALTIME, &tv); |
|
|
|
|
uint64_t total = 0; |
|
|
|
|
for (int i = 0; i < op_sge; i++) |
|
|
|
|
total += sge[i].length; |
|
|
|
|
printf("%lu.%09lu RDMA send to queue %d: %lu bytes\n", tv.tv_sec, tv.tv_nsec, rc->qp->qp_num, total); |
|
|
|
|
ibv_send_wr *bad_wr = NULL; |
|
|
|
|
ibv_send_wr wr = { |
|
|
|
|
.wr_id = (uint64_t)(cl->peer_fd*2+1), |
|
|
|
|
.wr_id = wr_id, |
|
|
|
|
.sg_list = sge, |
|
|
|
|
.num_sge = op_sge, |
|
|
|
|
.opcode = IBV_WR_SEND, |
|
|
|
|
.send_flags = IBV_SEND_SIGNALED, |
|
|
|
|
}; |
|
|
|
|
int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr); |
|
|
|
|
int err = ibv_post_send(rc->qp, &wr, &bad_wr); |
|
|
|
|
if (err || bad_wr) |
|
|
|
|
{ |
|
|
|
|
printf("RDMA send failed: %s\n", strerror(err)); |
|
|
|
|
exit(1); |
|
|
|
|
} |
|
|
|
|
cl->rdma_conn->cur_send++; |
|
|
|
|
rc->cur_send++; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool osd_messenger_t::try_send_rdma(osd_client_t *cl) |
|
|
|
|
static void try_recv_rdma_wr(msgr_rdma_connection_t *rc, uint64_t wr_id, ibv_sge *sge, int op_sge) |
|
|
|
|
{ |
|
|
|
|
auto rc = cl->rdma_conn; |
|
|
|
|
if (!cl->send_list.size() || rc->cur_send > 0) |
|
|
|
|
timespec tv; |
|
|
|
|
clock_gettime(CLOCK_REALTIME, &tv); |
|
|
|
|
uint64_t total = 0; |
|
|
|
|
for (int i = 0; i < op_sge; i++) |
|
|
|
|
total += sge[i].length; |
|
|
|
|
printf("%lu.%09lu RDMA receive from queue %d: %lu bytes\n", tv.tv_sec, tv.tv_nsec, rc->qp->qp_num, total); |
|
|
|
|
ibv_recv_wr *bad_wr = NULL; |
|
|
|
|
ibv_recv_wr wr = { |
|
|
|
|
.wr_id = wr_id, |
|
|
|
|
.sg_list = sge, |
|
|
|
|
.num_sge = op_sge, |
|
|
|
|
}; |
|
|
|
|
int err = ibv_post_recv(rc->qp, &wr, &bad_wr); |
|
|
|
|
if (err || bad_wr) |
|
|
|
|
{ |
|
|
|
|
// Only send one batch at a time
|
|
|
|
|
return true; |
|
|
|
|
printf("RDMA receive failed: %s\n", strerror(err)); |
|
|
|
|
exit(1); |
|
|
|
|
} |
|
|
|
|
int op_size = 0, op_sge = 0, op_max = rc->max_sge*bs_bitmap_granularity; |
|
|
|
|
// FIXME: rc->max_sge should be negotiated between client & server
|
|
|
|
|
rc->cur_recv++; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static bool try_recv_rdma_read(osd_client_t *cl, msgr_rdma_connection_t *rc, osd_op_t *cur_op, uint32_t bs_bitmap_granularity) |
|
|
|
|
{ |
|
|
|
|
int op_size = bs_bitmap_granularity, op_sge = 1, op_max = rc->max_sge*bs_bitmap_granularity; |
|
|
|
|
iovec *segments = cur_op->iov.get_iovec(); |
|
|
|
|
ibv_sge sge[rc->max_sge]; |
|
|
|
|
while (rc->send_pos < cl->send_list.size()) |
|
|
|
|
sge[0] = { |
|
|
|
|
.addr = (uintptr_t)cur_op->reply.buf, |
|
|
|
|
.length = (uint32_t)OSD_PACKET_SIZE, |
|
|
|
|
.lkey = rc->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
while (rc->recv_pos < cur_op->iov.get_size()) |
|
|
|
|
{ |
|
|
|
|
iovec & iov = segments[rc->recv_pos]; |
|
|
|
|
if (op_size >= op_max || op_sge >= rc->max_sge) |
|
|
|
|
{ |
|
|
|
|
try_recv_rdma_wr(rc, cl->peer_fd, sge, op_sge); |
|
|
|
|
op_sge = 0; |
|
|
|
|
op_size = 0; |
|
|
|
|
if (rc->cur_recv >= rc->max_recv) |
|
|
|
|
{ |
|
|
|
|
// FIXME
|
|
|
|
|
exit(1); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Receive in (max_sge*4k) fragments
|
|
|
|
|
uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->recv_buf_pos < op_max |
|
|
|
|
? iov.iov_len-rc->recv_buf_pos : op_max-op_size); |
|
|
|
|
sge[op_sge++] = { |
|
|
|
|
.addr = (uintptr_t)(iov.iov_base+rc->recv_buf_pos), |
|
|
|
|
.length = len, |
|
|
|
|
.lkey = rc->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
op_size += len; |
|
|
|
|
rc->recv_buf_pos += len; |
|
|
|
|
if (rc->recv_buf_pos >= iov.iov_len) |
|
|
|
|
{ |
|
|
|
|
rc->recv_pos++; |
|
|
|
|
rc->recv_buf_pos = 0; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (op_sge > 0) |
|
|
|
|
{ |
|
|
|
|
try_recv_rdma_wr(rc, cl->peer_fd, sge, op_sge); |
|
|
|
|
} |
|
|
|
|
rc->recv_pos = 0; |
|
|
|
|
rc->recv_buf_pos = 0; |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static bool try_send_rdma_read(osd_client_t *cl, msgr_rdma_connection_t *rc, osd_op_t *cur_op, int op_list_size, uint32_t bs_bitmap_granularity) |
|
|
|
|
{ |
|
|
|
|
ibv_sge sge[rc->max_sge]; |
|
|
|
|
int op_size = bs_bitmap_granularity, op_sge = 1, op_max = rc->max_sge*bs_bitmap_granularity; |
|
|
|
|
sge[0] = { |
|
|
|
|
.addr = (uintptr_t)cl->send_list[0].iov_base, |
|
|
|
|
.length = (uint32_t)cl->send_list[0].iov_len, |
|
|
|
|
.lkey = rc->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
rc->send_pos = 1; |
|
|
|
|
while (rc->send_pos < op_list_size) |
|
|
|
|
{ |
|
|
|
|
iovec & iov = cl->send_list[rc->send_pos]; |
|
|
|
|
if (cl->outbox[rc->send_pos].flags & MSGR_SENDP_HDR) |
|
|
|
|
{ |
|
|
|
|
if (op_sge > 0) |
|
|
|
|
{ |
|
|
|
|
try_send_rdma_wr(cl, sge, op_sge); |
|
|
|
|
try_send_rdma_wr(rc, cl->peer_fd, sge, op_sge); |
|
|
|
|
op_sge = 0; |
|
|
|
|
op_size = 0; |
|
|
|
|
if (rc->cur_send >= rc->max_send) |
|
|
|
@ -373,7 +535,7 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl) |
|
|
|
|
.length = (uint32_t)iov.iov_len, |
|
|
|
|
.lkey = rc->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
try_send_rdma_wr(cl, sge, 1); |
|
|
|
|
try_send_rdma_wr(rc, cl->peer_fd, sge, 1); |
|
|
|
|
rc->send_pos++; |
|
|
|
|
if (rc->cur_send >= rc->max_send) |
|
|
|
|
break; |
|
|
|
@ -382,7 +544,7 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl) |
|
|
|
|
{ |
|
|
|
|
if (op_size >= op_max || op_sge >= rc->max_sge) |
|
|
|
|
{ |
|
|
|
|
try_send_rdma_wr(cl, sge, op_sge); |
|
|
|
|
try_send_rdma_wr(rc, cl->peer_fd, sge, op_sge); |
|
|
|
|
op_sge = 0; |
|
|
|
|
op_size = 0; |
|
|
|
|
if (rc->cur_send >= rc->max_send) |
|
|
|
@ -407,80 +569,222 @@ bool osd_messenger_t::try_send_rdma(osd_client_t *cl) |
|
|
|
|
} |
|
|
|
|
if (op_sge > 0) |
|
|
|
|
{ |
|
|
|
|
try_send_rdma_wr(cl, sge, op_sge); |
|
|
|
|
try_send_rdma_wr(rc, cl->peer_fd, sge, op_sge); |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void try_recv_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge) |
|
|
|
|
{ |
|
|
|
|
ibv_recv_wr *bad_wr = NULL; |
|
|
|
|
ibv_recv_wr wr = { |
|
|
|
|
.wr_id = (uint64_t)(cl->peer_fd*2), |
|
|
|
|
.sg_list = sge, |
|
|
|
|
.num_sge = op_sge, |
|
|
|
|
}; |
|
|
|
|
int err = ibv_post_recv(cl->rdma_conn->qp, &wr, &bad_wr); |
|
|
|
|
if (err || bad_wr) |
|
|
|
|
if (op_list_size == 1) |
|
|
|
|
{ |
|
|
|
|
printf("RDMA receive failed: %s\n", strerror(err)); |
|
|
|
|
exit(1); |
|
|
|
|
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ || |
|
|
|
|
cur_op->req.hdr.opcode == OSD_OP_READ) |
|
|
|
|
{ |
|
|
|
|
sge[0] = { |
|
|
|
|
.addr = 0, |
|
|
|
|
.length = 0, |
|
|
|
|
.lkey = rc->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
uint64_t data_size = cur_op->req.hdr.opcode == OSD_OP_SEC_READ |
|
|
|
|
? cur_op->req.sec_rw.len |
|
|
|
|
: cur_op->req.rw.len; |
|
|
|
|
while (data_size >= op_max) |
|
|
|
|
{ |
|
|
|
|
try_send_rdma_wr(rc, cl->peer_fd, sge, 1); |
|
|
|
|
data_size -= op_max; |
|
|
|
|
} |
|
|
|
|
if (data_size > 0) |
|
|
|
|
try_send_rdma_wr(rc, cl->peer_fd, sge, 1); |
|
|
|
|
} |
|
|
|
|
else if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP) |
|
|
|
|
{ |
|
|
|
|
sge[0] = { |
|
|
|
|
.addr = 0, |
|
|
|
|
.length = 0, |
|
|
|
|
.lkey = rc->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
try_send_rdma_wr(rc, cl->peer_fd, sge, 1); |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
cl->rdma_conn->cur_recv++; |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool osd_messenger_t::try_recv_rdma(osd_client_t *cl) |
|
|
|
|
void osd_messenger_t::try_send_rdma(osd_client_t *cl) |
|
|
|
|
{ |
|
|
|
|
auto rc = cl->rdma_conn; |
|
|
|
|
if (rc->cur_recv > 0) |
|
|
|
|
{ |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
if (!cl->recv_list.get_size()) |
|
|
|
|
// Two different algorithms for outgoing and incoming operations
|
|
|
|
|
while (cl->outbox.size() > 0) |
|
|
|
|
{ |
|
|
|
|
cl->recv_list.reset(); |
|
|
|
|
cl->read_op = new osd_op_t; |
|
|
|
|
cl->read_op->peer_fd = cl->peer_fd; |
|
|
|
|
cl->read_op->op_type = OSD_OP_IN; |
|
|
|
|
cl->recv_list.push_back(cl->read_op->req.buf, OSD_PACKET_SIZE); |
|
|
|
|
cl->read_remaining = OSD_PACKET_SIZE; |
|
|
|
|
cl->read_state = CL_READ_HDR; |
|
|
|
|
} |
|
|
|
|
int op_size = 0, op_sge = 0, op_max = rc->max_sge*bs_bitmap_granularity; |
|
|
|
|
iovec *segments = cl->recv_list.get_iovec(); |
|
|
|
|
// FIXME: rc->max_sge should be negotiated between client & server
|
|
|
|
|
ibv_sge sge[rc->max_sge]; |
|
|
|
|
while (rc->recv_pos < cl->recv_list.get_size()) |
|
|
|
|
{ |
|
|
|
|
iovec & iov = segments[rc->recv_pos]; |
|
|
|
|
if (op_size >= op_max || op_sge >= rc->max_sge) |
|
|
|
|
osd_op_t *cur_op = cl->outbox[0].op; |
|
|
|
|
if (cur_op->op_type == OSD_OP_OUT) |
|
|
|
|
{ |
|
|
|
|
try_recv_rdma_wr(cl, sge, op_sge); |
|
|
|
|
op_sge = 0; |
|
|
|
|
op_size = 0; |
|
|
|
|
if (rc->cur_recv >= rc->max_recv) |
|
|
|
|
break; |
|
|
|
|
// Pick a queue. Send operation to it in one part.
|
|
|
|
|
int qi; |
|
|
|
|
for (qi = 0; qi < cl->rdma_queues.size() && cl->rdma_queues[qi]->cur_op != NULL; qi++) {} |
|
|
|
|
if (qi >= cl->rdma_queues.size()) |
|
|
|
|
{ |
|
|
|
|
// No free queues, retry later.
|
|
|
|
|
// We only post 1 operation per queue to use the queue pair number as a 'tag'.
|
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Pick all entries for the operation from the queue
|
|
|
|
|
int op_list_size = 0; |
|
|
|
|
while (op_list_size < cl->outbox.size() && cl->outbox[op_list_size].op == cur_op) |
|
|
|
|
{ |
|
|
|
|
op_list_size++; |
|
|
|
|
} |
|
|
|
|
auto rq = cl->rdma_queues[qi]; |
|
|
|
|
rq->cur_op = cur_op; |
|
|
|
|
ibv_sge sge[rq->max_sge]; |
|
|
|
|
// FIXME: This won't work with long bitmaps. But I don't care, I want to finally test fucking RDMA
|
|
|
|
|
// header or header+data
|
|
|
|
|
sge[0] = { |
|
|
|
|
.addr = (uintptr_t)cl->send_list[0].iov_base, |
|
|
|
|
.length = (uint32_t)cl->send_list[0].iov_len, |
|
|
|
|
.lkey = rq->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
if (op_list_size == 2) |
|
|
|
|
{ |
|
|
|
|
auto & iov = cl->send_list[1]; |
|
|
|
|
sge[1] = { |
|
|
|
|
.addr = (uintptr_t)iov.iov_base, |
|
|
|
|
.length = (uint32_t)iov.iov_len, |
|
|
|
|
.lkey = rq->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
try_send_rdma_wr(rq, cl->peer_fd, sge, 2); |
|
|
|
|
} |
|
|
|
|
else if (op_list_size == 1) |
|
|
|
|
{ |
|
|
|
|
try_send_rdma_wr(rq, cl->peer_fd, sge, 1); |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
printf("unexpected long send_list for opcode %lu: %lu entries\n", cur_op->req.hdr.opcode, cl->send_list.size()); |
|
|
|
|
exit(1); |
|
|
|
|
} |
|
|
|
|
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+op_list_size); |
|
|
|
|
cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+op_list_size); |
|
|
|
|
// Post a receive request for the reply at the same time
|
|
|
|
|
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ || |
|
|
|
|
cur_op->req.hdr.opcode == OSD_OP_READ) |
|
|
|
|
{ |
|
|
|
|
try_recv_rdma_read(cl, rq, cur_op, bs_bitmap_granularity); |
|
|
|
|
} |
|
|
|
|
else if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP) |
|
|
|
|
{ |
|
|
|
|
assert(!cur_op->iov.count); |
|
|
|
|
// FIXME: hardcode
|
|
|
|
|
#define clean_entry_bitmap_size 4 |
|
|
|
|
// Reply size is known
|
|
|
|
|
uint64_t data_size = cur_op->req.sec_read_bmp.len / sizeof(obj_ver_id) * (8 + clean_entry_bitmap_size); |
|
|
|
|
cur_op->rmw_buf = malloc_or_die(data_size); |
|
|
|
|
sge[0] = { |
|
|
|
|
.addr = (uintptr_t)cur_op->reply.buf, |
|
|
|
|
.length = (uint32_t)OSD_PACKET_SIZE, |
|
|
|
|
.lkey = rq->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
sge[1] = { |
|
|
|
|
.addr = (uintptr_t)cur_op->rmw_buf, |
|
|
|
|
.length = (uint32_t)data_size, |
|
|
|
|
.lkey = rq->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
try_recv_rdma_wr(rq, cl->peer_fd, sge, 2); |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
// No reply or reply size is unknown
|
|
|
|
|
sge[0] = { |
|
|
|
|
.addr = (uintptr_t)cur_op->reply.buf, |
|
|
|
|
.length = (uint32_t)OSD_PACKET_SIZE, |
|
|
|
|
.lkey = rq->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
try_recv_rdma_wr(rq, cl->peer_fd, sge, 1); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Receive in identical (max_sge*4k) fragments
|
|
|
|
|
uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->recv_buf_pos < op_max ? iov.iov_len-rc->recv_buf_pos : op_max-op_size); |
|
|
|
|
sge[op_sge++] = { |
|
|
|
|
.addr = (uintptr_t)(iov.iov_base+rc->recv_buf_pos), |
|
|
|
|
.length = len, |
|
|
|
|
.lkey = rc->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
op_size += len; |
|
|
|
|
rc->recv_buf_pos += len; |
|
|
|
|
if (rc->recv_buf_pos >= iov.iov_len) |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
rc->recv_pos++; |
|
|
|
|
rc->recv_buf_pos = 0; |
|
|
|
|
// Send reply to the same queue the operation came from.
|
|
|
|
|
// Fragment it into parts no longer than (max_sge*4k) to always
|
|
|
|
|
// be able to send and receive them correctly.
|
|
|
|
|
int qi; |
|
|
|
|
for (qi = 0; qi < cl->rdma_queues.size() && cl->rdma_queues[qi]->cur_op != cur_op; qi++) {} |
|
|
|
|
if (qi >= cl->rdma_queues.size()) |
|
|
|
|
{ |
|
|
|
|
printf("Unknown incoming operation for client %d\n", cl->peer_fd); |
|
|
|
|
exit(1); |
|
|
|
|
} |
|
|
|
|
// Pick all entries for the operation from the queue
|
|
|
|
|
int op_list_size = 0; |
|
|
|
|
while (op_list_size < cl->outbox.size() && cl->outbox[op_list_size].op == cur_op) |
|
|
|
|
{ |
|
|
|
|
op_list_size++; |
|
|
|
|
} |
|
|
|
|
auto rq = cl->rdma_queues[qi]; |
|
|
|
|
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ || |
|
|
|
|
cur_op->req.hdr.opcode == OSD_OP_READ) |
|
|
|
|
{ |
|
|
|
|
try_send_rdma_read(cl, rq, cur_op, op_list_size, bs_bitmap_granularity); |
|
|
|
|
rq->send_pos = 0; |
|
|
|
|
rq->send_buf_pos = 0; |
|
|
|
|
} |
|
|
|
|
else if (op_list_size == 1) |
|
|
|
|
{ |
|
|
|
|
ibv_sge sge[1]; |
|
|
|
|
sge[0] = { |
|
|
|
|
.addr = (uintptr_t)cl->send_list[0].iov_base, |
|
|
|
|
.length = (uint32_t)cl->send_list[0].iov_len, |
|
|
|
|
.lkey = rq->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
try_send_rdma_wr(rq, cl->peer_fd, sge, 1); |
|
|
|
|
} |
|
|
|
|
else if (op_list_size == 2) |
|
|
|
|
{ |
|
|
|
|
ibv_sge sge[2]; |
|
|
|
|
sge[0] = { |
|
|
|
|
.addr = (uintptr_t)cl->send_list[0].iov_base, |
|
|
|
|
.length = (uint32_t)cl->send_list[0].iov_len, |
|
|
|
|
.lkey = rq->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
sge[1] = { |
|
|
|
|
.addr = (uintptr_t)cl->send_list[1].iov_base, |
|
|
|
|
.length = (uint32_t)cl->send_list[1].iov_len, |
|
|
|
|
.lkey = rq->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP) |
|
|
|
|
try_send_rdma_wr(rq, cl->peer_fd, sge, 2); |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
try_send_rdma_wr(rq, cl->peer_fd, sge, 1); |
|
|
|
|
try_send_rdma_wr(rq, cl->peer_fd, sge+1, 1); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
else if (op_list_size > 2) |
|
|
|
|
{ |
|
|
|
|
printf("Unexpected long send_list for opcode %lu: %lu entries\n", cur_op->req.hdr.opcode, cl->send_list.size()); |
|
|
|
|
exit(1); |
|
|
|
|
} |
|
|
|
|
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+op_list_size); |
|
|
|
|
cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+op_list_size); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (op_sge > 0) |
|
|
|
|
{ |
|
|
|
|
try_recv_rdma_wr(cl, sge, op_sge); |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Try to receive an incoming operation via RDMA
|
|
|
|
|
void osd_messenger_t::try_recv_rdma(osd_client_t *cl, msgr_rdma_connection_t *rc) |
|
|
|
|
{ |
|
|
|
|
rc->cur_op = new osd_op_t; |
|
|
|
|
rc->cur_op->peer_fd = cl->peer_fd; |
|
|
|
|
rc->cur_op->op_type = OSD_OP_IN; |
|
|
|
|
rc->cur_op->buf = memalign_or_die(MEM_ALIGNMENT, 128*1024); // FIXME hardcode for tests
|
|
|
|
|
ibv_sge sge[2]; |
|
|
|
|
sge[0] = { |
|
|
|
|
.addr = (uintptr_t)rc->cur_op->req.buf, |
|
|
|
|
.length = (uint32_t)OSD_PACKET_SIZE, |
|
|
|
|
.lkey = rc->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
sge[1] = { |
|
|
|
|
.addr = (uintptr_t)rc->cur_op->buf, |
|
|
|
|
.length = (uint32_t)128*1024, |
|
|
|
|
.lkey = rc->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
try_recv_rdma_wr(rc, cl->peer_fd, sge, 2); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#define RDMA_EVENTS_AT_ONCE 32 |
|
|
|
@ -491,6 +795,7 @@ void osd_messenger_t::handle_rdma_events() |
|
|
|
|
ibv_cq *ev_cq; |
|
|
|
|
void *ev_ctx; |
|
|
|
|
// FIXME: This is inefficient as it calls read()...
|
|
|
|
|
timespec tv; |
|
|
|
|
if (ibv_get_cq_event(rdma_context->channel, &ev_cq, &ev_ctx) == 0) |
|
|
|
|
{ |
|
|
|
|
ibv_ack_cq_events(rdma_context->cq, 1); |
|
|
|
@ -507,8 +812,8 @@ void osd_messenger_t::handle_rdma_events() |
|
|
|
|
event_count = ibv_poll_cq(rdma_context->cq, RDMA_EVENTS_AT_ONCE, wc); |
|
|
|
|
for (int i = 0; i < event_count; i++) |
|
|
|
|
{ |
|
|
|
|
int client_id = wc[i].wr_id >> 1; |
|
|
|
|
bool is_send = wc[i].wr_id & 1; |
|
|
|
|
int client_id = wc[i].wr_id; |
|
|
|
|
bool is_send = wc[i].opcode == IBV_WC_SEND; |
|
|
|
|
auto cl_it = clients.find(client_id); |
|
|
|
|
if (cl_it == clients.end()) |
|
|
|
|
{ |
|
|
|
@ -526,55 +831,120 @@ void osd_messenger_t::handle_rdma_events() |
|
|
|
|
stop_client(client_id); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
int q; |
|
|
|
|
for (q = 0; q < cl->rdma_queues.size() && cl->rdma_queues[q]->qp->qp_num != wc[i].qp_num; q++) {} |
|
|
|
|
if (q >= cl->rdma_queues.size()) |
|
|
|
|
{ |
|
|
|
|
printf("Unknown queue %d for client %d\n", wc[i].qp_num, cl->peer_fd); |
|
|
|
|
exit(1); |
|
|
|
|
} |
|
|
|
|
auto rc = cl->rdma_queues[q]; |
|
|
|
|
if (is_send) |
|
|
|
|
{ |
|
|
|
|
clock_gettime(CLOCK_REALTIME, &tv); |
|
|
|
|
printf("%lu.%09lu Done RDMA send on queue %d\n", tv.tv_sec, tv.tv_nsec, wc[i].qp_num); |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
clock_gettime(CLOCK_REALTIME, &tv); |
|
|
|
|
printf("%lu.%09lu Done RDMA recv on queue %d, %d bytes\n", tv.tv_sec, tv.tv_nsec, wc[i].qp_num, wc[i].byte_len); |
|
|
|
|
} |
|
|
|
|
if (!is_send) |
|
|
|
|
{ |
|
|
|
|
cl->rdma_conn->cur_recv--; |
|
|
|
|
if (!cl->rdma_conn->cur_recv) |
|
|
|
|
rc->cur_recv--; |
|
|
|
|
if (!rc->cur_recv) |
|
|
|
|
{ |
|
|
|
|
cl->recv_list.done += cl->rdma_conn->recv_pos; |
|
|
|
|
cl->rdma_conn->recv_pos = 0; |
|
|
|
|
if (!cl->recv_list.get_size()) |
|
|
|
|
// Fucking shit...
|
|
|
|
|
if (rc->cur_op->op_type == OSD_OP_IN) |
|
|
|
|
{ |
|
|
|
|
cl->read_remaining = 0; |
|
|
|
|
if (handle_finished_read(cl)) |
|
|
|
|
if (wc[i].byte_len <= OSD_PACKET_SIZE) |
|
|
|
|
{ |
|
|
|
|
try_recv_rdma(cl); |
|
|
|
|
free(rc->cur_op->buf); |
|
|
|
|
rc->cur_op->buf = NULL; |
|
|
|
|
} |
|
|
|
|
cl->received_ops.push_back(rc->cur_op); |
|
|
|
|
set_immediate.push_back([this, op = rc->cur_op]() { exec_op(op); }); |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
else /* if (rc->cur_op->op_type == OSD_OP_OUT) */ |
|
|
|
|
{ |
|
|
|
|
// Continue to receive data
|
|
|
|
|
try_recv_rdma(cl); |
|
|
|
|
if (rc->cur_op->reply.hdr.opcode == OSD_OP_SEC_READ || |
|
|
|
|
rc->cur_op->reply.hdr.opcode == OSD_OP_READ) |
|
|
|
|
{ |
|
|
|
|
// Data is already received
|
|
|
|
|
cl->sent_ops.erase(rc->cur_op->req.hdr.id); |
|
|
|
|
handle_reply_ready(rc->cur_op); |
|
|
|
|
rc->cur_op = NULL; |
|
|
|
|
try_send_rdma(cl); |
|
|
|
|
} |
|
|
|
|
else if (rc->cur_op->reply.hdr.opcode == OSD_OP_SEC_READ_BMP) |
|
|
|
|
{ |
|
|
|
|
// Data is already received, but we need to switch buffers
|
|
|
|
|
cl->sent_ops.erase(rc->cur_op->req.hdr.id); |
|
|
|
|
free(rc->cur_op->buf); |
|
|
|
|
rc->cur_op->buf = rc->cur_op->rmw_buf; |
|
|
|
|
handle_reply_ready(rc->cur_op); |
|
|
|
|
rc->cur_op = NULL; |
|
|
|
|
try_send_rdma(cl); |
|
|
|
|
} |
|
|
|
|
else if (rc->cur_op->reply.hdr.opcode == OSD_OP_SEC_LIST && rc->cur_op->reply.hdr.retval > 0 || |
|
|
|
|
rc->cur_op->reply.hdr.opcode == OSD_OP_SHOW_CONFIG && rc->cur_op->reply.hdr.retval > 0) |
|
|
|
|
{ |
|
|
|
|
if (rc->recv_pos != 1) |
|
|
|
|
{ |
|
|
|
|
// Data is not received yet (RNR)
|
|
|
|
|
uint32_t len; |
|
|
|
|
if (rc->cur_op->reply.hdr.opcode == OSD_OP_SEC_LIST) |
|
|
|
|
len = sizeof(obj_ver_id) * rc->cur_op->reply.hdr.retval; |
|
|
|
|
else |
|
|
|
|
len = rc->cur_op->reply.hdr.retval; |
|
|
|
|
rc->cur_op->buf = malloc_or_die(len); |
|
|
|
|
ibv_sge sge[1]; |
|
|
|
|
sge[0] = { |
|
|
|
|
.addr = (uintptr_t)rc->cur_op->buf, |
|
|
|
|
.length = len, |
|
|
|
|
.lkey = rc->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
try_recv_rdma_wr(rc, cl->peer_fd, sge, 1); |
|
|
|
|
rc->recv_pos = 1; |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
// Done
|
|
|
|
|
cl->sent_ops.erase(rc->cur_op->req.hdr.id); |
|
|
|
|
handle_reply_ready(rc->cur_op); |
|
|
|
|
rc->cur_op = NULL; |
|
|
|
|
rc->recv_pos = 0; |
|
|
|
|
try_send_rdma(cl); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
// No data
|
|
|
|
|
cl->sent_ops.erase(rc->cur_op->req.hdr.id); |
|
|
|
|
handle_reply_ready(rc->cur_op); |
|
|
|
|
rc->cur_op = NULL; |
|
|
|
|
try_send_rdma(cl); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
cl->rdma_conn->cur_send--; |
|
|
|
|
if (!cl->rdma_conn->cur_send) |
|
|
|
|
rc->cur_send--; |
|
|
|
|
if (!rc->cur_send) |
|
|
|
|
{ |
|
|
|
|
// Wait for the whole batch
|
|
|
|
|
for (int i = 0; i < cl->rdma_conn->send_pos; i++) |
|
|
|
|
{ |
|
|
|
|
if (cl->outbox[i].flags & MSGR_SENDP_FREE) |
|
|
|
|
{ |
|
|
|
|
// Reply fully sent
|
|
|
|
|
delete cl->outbox[i].op; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (cl->rdma_conn->send_pos > 0) |
|
|
|
|
if (rc->cur_op->op_type == OSD_OP_OUT) |
|
|
|
|
{ |
|
|
|
|
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+cl->rdma_conn->send_pos); |
|
|
|
|
cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+cl->rdma_conn->send_pos); |
|
|
|
|
cl->rdma_conn->send_pos = 0; |
|
|
|
|
// Nothing
|
|
|
|
|
} |
|
|
|
|
if (cl->rdma_conn->send_buf_pos > 0) |
|
|
|
|
else /* if (rc->cur_op->op_type == OSD_OP_IN) */ |
|
|
|
|
{ |
|
|
|
|
cl->send_list[0].iov_base += cl->rdma_conn->send_buf_pos; |
|
|
|
|
cl->send_list[0].iov_len -= cl->rdma_conn->send_buf_pos; |
|
|
|
|
cl->rdma_conn->send_buf_pos = 0; |
|
|
|
|
// Reply fully sent
|
|
|
|
|
delete rc->cur_op; |
|
|
|
|
rc->cur_op = NULL; |
|
|
|
|
// Post receive for the next incoming op
|
|
|
|
|
try_recv_rdma(cl, rc); |
|
|
|
|
} |
|
|
|
|
try_send_rdma(cl); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -585,3 +955,8 @@ void osd_messenger_t::handle_rdma_events() |
|
|
|
|
} |
|
|
|
|
set_immediate.clear(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int osd_messenger_t::get_rdma_max_sge() |
|
|
|
|
{ |
|
|
|
|
return rdma_max_sge; |
|
|
|
|
} |
|
|
|
|