|
|
|
@ -46,9 +46,20 @@ msgr_rdma_connection_t::~msgr_rdma_connection_t() |
|
|
|
|
ctx->used_max_cqe -= max_send+max_recv; |
|
|
|
|
if (qp) |
|
|
|
|
ibv_destroy_qp(qp); |
|
|
|
|
if (recv_buffers.size()) |
|
|
|
|
for (auto b: recv_buffers) |
|
|
|
|
free(b); |
|
|
|
|
if (in_data_mr) |
|
|
|
|
ibv_dereg_mr(in_data_mr); |
|
|
|
|
if (in_op_mr) |
|
|
|
|
ibv_dereg_mr(in_op_mr); |
|
|
|
|
if (in_data_buf) |
|
|
|
|
free(in_data_buf); |
|
|
|
|
if (in_ops) |
|
|
|
|
free(in_ops); |
|
|
|
|
if (out_op_alloc) |
|
|
|
|
delete out_op_alloc; |
|
|
|
|
if (out_slot_data) |
|
|
|
|
free(out_slot_data); |
|
|
|
|
if (out_slot_ops) |
|
|
|
|
free(out_slot_ops); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t ib_port, uint8_t gid_index, uint32_t mtu, int log_level) |
|
|
|
@ -149,7 +160,7 @@ msgr_rdma_context_t *msgr_rdma_context_t::create(const char *ib_devname, uint8_t |
|
|
|
|
ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_ON_DEMAND); |
|
|
|
|
if (!ctx->mr) |
|
|
|
|
{ |
|
|
|
|
fprintf(stderr, "Couldn't register RDMA memory region\n"); |
|
|
|
|
fprintf(stderr, "Couldn't register global RDMA memory region: %s\n", strerror(errno)); |
|
|
|
|
goto cleanup; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -180,7 +191,7 @@ cleanup: |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, uint32_t max_send, |
|
|
|
|
uint32_t max_recv, uint32_t max_sge, uint32_t max_msg) |
|
|
|
|
uint32_t max_recv, uint32_t max_sge, uint64_t op_slots, uint64_t op_memory) |
|
|
|
|
{ |
|
|
|
|
msgr_rdma_connection_t *conn = new msgr_rdma_connection_t; |
|
|
|
|
|
|
|
|
@ -190,7 +201,6 @@ msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, |
|
|
|
|
conn->max_send = max_send; |
|
|
|
|
conn->max_recv = max_recv; |
|
|
|
|
conn->max_sge = max_sge; |
|
|
|
|
conn->max_msg = max_msg; |
|
|
|
|
|
|
|
|
|
ctx->used_max_cqe += max_send+max_recv; |
|
|
|
|
if (ctx->used_max_cqe > ctx->max_cqe) |
|
|
|
@ -211,6 +221,30 @@ msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, |
|
|
|
|
ctx->max_cqe = new_max_cqe; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
conn->op_memory = op_memory; |
|
|
|
|
conn->in_data_buf = memalign_or_die(MEM_ALIGNMENT, op_memory); |
|
|
|
|
conn->in_data_mr = ibv_reg_mr(ctx->pd, conn->in_data_buf, op_memory, |
|
|
|
|
IBV_ACCESS_ZERO_BASED | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_ON_DEMAND); |
|
|
|
|
if (!conn->in_data_mr) |
|
|
|
|
{ |
|
|
|
|
fprintf(stderr, "Couldn't register %lu MB RDMA memory region for incoming data: %s\n", |
|
|
|
|
(op_memory+1024*1024-1)/1024/1024, strerror(errno)); |
|
|
|
|
delete conn; |
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
conn->op_slots = op_slots; |
|
|
|
|
conn->in_ops = (msgr_rdma_cmd_t *)malloc_or_die(sizeof(msgr_rdma_cmd_t) * op_slots); |
|
|
|
|
conn->in_op_mr = ibv_reg_mr(ctx->pd, conn->in_ops, sizeof(msgr_rdma_cmd_t) * op_slots, |
|
|
|
|
IBV_ACCESS_ZERO_BASED | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_ON_DEMAND); |
|
|
|
|
if (!conn->in_op_mr) |
|
|
|
|
{ |
|
|
|
|
fprintf(stderr, "Couldn't register %lu KB RDMA memory region for incoming operation headers: %s\n", |
|
|
|
|
(sizeof(msgr_rdma_cmd_t) * op_slots + 1023)/1024, strerror(errno)); |
|
|
|
|
delete conn; |
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ibv_qp_init_attr init_attr = { |
|
|
|
|
.send_cq = ctx->cq, |
|
|
|
|
.recv_cq = ctx->cq, |
|
|
|
@ -237,7 +271,7 @@ msgr_rdma_connection_t *msgr_rdma_connection_t::create(msgr_rdma_context_t *ctx, |
|
|
|
|
|
|
|
|
|
ibv_qp_attr attr = { |
|
|
|
|
.qp_state = IBV_QPS_INIT, |
|
|
|
|
.qp_access_flags = 0, |
|
|
|
|
.qp_access_flags = IBV_ACCESS_REMOTE_WRITE, |
|
|
|
|
.pkey_index = 0, |
|
|
|
|
.port_num = ctx->ib_port, |
|
|
|
|
}; |
|
|
|
@ -265,6 +299,19 @@ static ibv_mtu mtu_to_ibv_mtu(uint32_t mtu) |
|
|
|
|
return IBV_MTU_4096; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void msgr_rdma_connection_t::set_out_capacity(uint32_t out_data_rkey, uint32_t out_op_rkey, uint64_t out_op_slots, uint64_t out_op_memory) |
|
|
|
|
{ |
|
|
|
|
assert(!out_op_alloc); |
|
|
|
|
this->out_data_rkey = out_data_rkey; |
|
|
|
|
this->out_op_rkey = out_op_rkey; |
|
|
|
|
this->out_op_slots = out_op_slots; |
|
|
|
|
this->out_op_memory = out_op_memory; |
|
|
|
|
out_op_alloc = new allocator(out_op_slots); |
|
|
|
|
out_data_alloc.free(0, out_op_memory); |
|
|
|
|
out_slot_data = (msgr_rdma_out_pos_t *)malloc_or_die(sizeof(msgr_rdma_out_pos_t) * out_op_slots); |
|
|
|
|
out_slot_ops = (osd_op_t **)malloc_or_die(sizeof(osd_op_t *) * out_op_slots); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int msgr_rdma_connection_t::connect(msgr_rdma_address_t *dest) |
|
|
|
|
{ |
|
|
|
|
auto conn = this; |
|
|
|
@ -311,17 +358,14 @@ int msgr_rdma_connection_t::connect(msgr_rdma_address_t *dest) |
|
|
|
|
return 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, uint64_t client_max_msg) |
|
|
|
|
bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, |
|
|
|
|
uint32_t out_data_rkey, uint32_t out_op_rkey, uint64_t out_op_slots, uint64_t out_op_memory) |
|
|
|
|
{ |
|
|
|
|
// Try to connect to the peer using RDMA
|
|
|
|
|
msgr_rdma_address_t addr; |
|
|
|
|
if (msgr_rdma_address_t::from_string(rdma_address.c_str(), &addr)) |
|
|
|
|
{ |
|
|
|
|
if (client_max_msg > rdma_max_msg) |
|
|
|
|
{ |
|
|
|
|
client_max_msg = rdma_max_msg; |
|
|
|
|
} |
|
|
|
|
auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, client_max_msg); |
|
|
|
|
auto rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_op_slots, rdma_op_memory); |
|
|
|
|
if (rdma_conn) |
|
|
|
|
{ |
|
|
|
|
int r = rdma_conn->connect(&addr); |
|
|
|
@ -336,6 +380,8 @@ bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, uint64 |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
// Remember connection, but switch to RDMA only after sending the configuration response
|
|
|
|
|
clients_by_qp[rdma_conn->qp->qp_num] = peer_fd; |
|
|
|
|
rdma_conn->set_out_capacity(out_data_rkey, out_op_rkey, out_op_slots, out_op_memory); |
|
|
|
|
auto cl = clients.at(peer_fd); |
|
|
|
|
cl->rdma_conn = rdma_conn; |
|
|
|
|
cl->peer_state = PEER_RDMA_CONNECTING; |
|
|
|
@ -346,66 +392,161 @@ bool osd_messenger_t::connect_rdma(int peer_fd, std::string rdma_address, uint64 |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Post one signaled IBV_WR_SEND work request that carries the first <op_sge>
// entries of <sge> on the client's RDMA queue pair, then count it in cur_send.
// If the request cannot be posted, an error is printed and the process exits.
static void try_send_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge)
{
    auto rdma = cl->rdma_conn;
    // Start from an all-zero request and fill in only the fields we need
    ibv_send_wr request = {};
    // Work request id is derived from the peer descriptor and is always odd
    request.wr_id = (uint64_t)(cl->peer_fd*2+1);
    request.sg_list = sge;
    request.num_sge = op_sge;
    request.opcode = IBV_WR_SEND;
    request.send_flags = IBV_SEND_SIGNALED;
    ibv_send_wr *rejected = NULL;
    int res = ibv_post_send(rdma->qp, &request, &rejected);
    if (res != 0 || rejected != NULL)
    {
        fprintf(stderr, "RDMA send failed: %s\n", strerror(res));
        exit(1);
    }
    rdma->cur_send++;
}
|
|
|
|
|
|
|
|
|
bool osd_messenger_t::try_send_rdma(osd_client_t *cl) |
|
|
|
|
{ |
|
|
|
|
auto rc = cl->rdma_conn; |
|
|
|
|
if (!cl->send_list.size() || rc->cur_send > 0) |
|
|
|
|
if (!cl->send_list.size() && !rc->in_slots_freed.size() || rc->cur_send >= rc->max_send) |
|
|
|
|
{ |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
int i = 0; |
|
|
|
|
while (i < rc->in_slots_freed.size()) |
|
|
|
|
{ |
|
|
|
|
auto op_slot = rc->in_slots_freed[i++]; |
|
|
|
|
assert(op_slot < 0x80000000); |
|
|
|
|
ibv_send_wr *bad_wr = NULL; |
|
|
|
|
ibv_send_wr wr = { |
|
|
|
|
.wr_id = 0, |
|
|
|
|
.opcode = IBV_WR_RDMA_WRITE_WITH_IMM, |
|
|
|
|
.imm_data = 0x80000000 | op_slot, |
|
|
|
|
}; |
|
|
|
|
int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr); |
|
|
|
|
if (err || bad_wr) |
|
|
|
|
{ |
|
|
|
|
fprintf(stderr, "RDMA send failed: %s\n", strerror(err)); |
|
|
|
|
exit(1); |
|
|
|
|
} |
|
|
|
|
rc->cur_send++; |
|
|
|
|
if (rc->cur_send >= rc->max_send) |
|
|
|
|
{ |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
rc->in_slots_freed.erase(rc->in_slots_freed.begin(), rc->in_slots_freed.begin()+i); |
|
|
|
|
if (!cl->send_list.size() || rc->cur_send >= rc->max_send) |
|
|
|
|
{ |
|
|
|
|
// Only send one batch at a time
|
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
uint64_t op_size = 0, op_sge = 0; |
|
|
|
|
ibv_sge sge[rc->max_sge]; |
|
|
|
|
while (rc->send_pos < cl->send_list.size()) |
|
|
|
|
int op_start = 0; |
|
|
|
|
while (op_start < cl->send_list.size()) |
|
|
|
|
{ |
|
|
|
|
iovec & iov = cl->send_list[rc->send_pos]; |
|
|
|
|
if (op_size >= rc->max_msg || op_sge >= rc->max_sge) |
|
|
|
|
uint64_t op_data_size = 0; |
|
|
|
|
int op_end = op_start; |
|
|
|
|
while (!(cl->outbox[op_end].flags & MSGR_SENDP_LAST)) |
|
|
|
|
{ |
|
|
|
|
op_data_size += cl->send_list[op_end].iov_len; |
|
|
|
|
op_end++; |
|
|
|
|
} |
|
|
|
|
op_data_size += cl->send_list[op_end].iov_len; |
|
|
|
|
op_end++; |
|
|
|
|
op_data_size -= cl->send_list[op_start].iov_len; |
|
|
|
|
// Operation boundaries in send_list: op_start..op_end, first iovec is the header
|
|
|
|
|
uint64_t op_slot = rc->out_op_alloc->find_free(); |
|
|
|
|
if (op_slot == UINT64_MAX) |
|
|
|
|
{ |
|
|
|
|
try_send_rdma_wr(cl, sge, op_sge); |
|
|
|
|
op_sge = 0; |
|
|
|
|
op_size = 0; |
|
|
|
|
if (rc->cur_send >= rc->max_send) |
|
|
|
|
// op queue is full
|
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
uint64_t data_pos = UINT64_MAX; |
|
|
|
|
if (op_data_size >= 0) |
|
|
|
|
{ |
|
|
|
|
if (rc->cur_send > rc->max_send-1-(op_end-op_start-1+rc->max_sge)/rc->max_sge) |
|
|
|
|
{ |
|
|
|
|
break; |
|
|
|
|
// RDMA queue is full
|
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
// FIXME: Oops, and what if op data is larger than the whole buffer... :)
|
|
|
|
|
data_pos = rc->out_data_alloc.alloc(op_data_size); |
|
|
|
|
if (data_pos == UINT64_MAX) |
|
|
|
|
{ |
|
|
|
|
// data buffers are full
|
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
int cur_sge = 0; |
|
|
|
|
for (int data_sent = 1; data_sent < op_end; data_sent++) |
|
|
|
|
{ |
|
|
|
|
sge[cur_sge++] = { |
|
|
|
|
.addr = (uintptr_t)cl->send_list[data_sent].iov_base, |
|
|
|
|
.length = (uint32_t)cl->send_list[data_sent].iov_len, |
|
|
|
|
.lkey = rc->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
if (data_sent == op_end-1 || cur_sge >= rc->max_sge) |
|
|
|
|
{ |
|
|
|
|
ibv_send_wr *bad_wr = NULL; |
|
|
|
|
ibv_send_wr wr = { |
|
|
|
|
.wr_id = op_slot, |
|
|
|
|
.next = NULL, |
|
|
|
|
.sg_list = sge, |
|
|
|
|
.num_sge = cur_sge, |
|
|
|
|
.opcode = IBV_WR_RDMA_WRITE, |
|
|
|
|
.send_flags = 0, |
|
|
|
|
.wr = { |
|
|
|
|
.rdma = { |
|
|
|
|
.remote_addr = data_pos, |
|
|
|
|
.rkey = rc->out_data_rkey, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}; |
|
|
|
|
int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr); |
|
|
|
|
if (err || bad_wr) |
|
|
|
|
{ |
|
|
|
|
fprintf(stderr, "RDMA send failed: %s\n", strerror(err)); |
|
|
|
|
exit(1); |
|
|
|
|
} |
|
|
|
|
rc->cur_send++; |
|
|
|
|
cur_sge = 0; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
uint32_t len = (uint32_t)(op_size+iov.iov_len-rc->send_buf_pos < rc->max_msg |
|
|
|
|
? iov.iov_len-rc->send_buf_pos : rc->max_msg-op_size); |
|
|
|
|
sge[op_sge++] = { |
|
|
|
|
.addr = (uintptr_t)((uint8_t*)iov.iov_base+rc->send_buf_pos), |
|
|
|
|
.length = len, |
|
|
|
|
if (rc->cur_send > rc->max_send-1) |
|
|
|
|
{ |
|
|
|
|
// RDMA queue is full
|
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
rc->out_op_alloc->set(op_slot, true); |
|
|
|
|
assert(cl->send_list[op_start].iov_len == OSD_PACKET_SIZE); |
|
|
|
|
sge[0] = { |
|
|
|
|
.addr = (uintptr_t)cl->send_list[op_start].iov_base, |
|
|
|
|
.length = (uint32_t)cl->send_list[op_start].iov_len, |
|
|
|
|
.lkey = rc->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
rc->out_slot_data[op_slot] = { .data_pos = data_pos, .data_size = op_data_size }; |
|
|
|
|
rc->out_slot_ops[op_slot] = (cl->outbox[op_end-1].flags & MSGR_SENDP_FREE) |
|
|
|
|
? cl->outbox[op_end-1].op : NULL; |
|
|
|
|
sge[1] = { |
|
|
|
|
.addr = (uintptr_t)(rc->out_slot_data+op_slot), |
|
|
|
|
.length = sizeof(rc->out_slot_data[op_slot]), |
|
|
|
|
.lkey = rc->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
op_size += len; |
|
|
|
|
rc->send_buf_pos += len; |
|
|
|
|
if (rc->send_buf_pos >= iov.iov_len) |
|
|
|
|
ibv_send_wr *bad_wr = NULL; |
|
|
|
|
ibv_send_wr wr = { |
|
|
|
|
.wr_id = op_slot, |
|
|
|
|
.next = NULL, |
|
|
|
|
.sg_list = sge, |
|
|
|
|
.num_sge = 2, |
|
|
|
|
.opcode = IBV_WR_RDMA_WRITE_WITH_IMM, |
|
|
|
|
.send_flags = IBV_SEND_SIGNALED, |
|
|
|
|
.imm_data = (uint32_t)op_slot, |
|
|
|
|
.wr = { |
|
|
|
|
.rdma = { |
|
|
|
|
.remote_addr = op_slot*sizeof(msgr_rdma_cmd_t), |
|
|
|
|
.rkey = rc->out_op_rkey, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}; |
|
|
|
|
int err = ibv_post_send(cl->rdma_conn->qp, &wr, &bad_wr); |
|
|
|
|
if (err || bad_wr) |
|
|
|
|
{ |
|
|
|
|
rc->send_pos++; |
|
|
|
|
rc->send_buf_pos = 0; |
|
|
|
|
fprintf(stderr, "RDMA send failed: %s\n", strerror(err)); |
|
|
|
|
exit(1); |
|
|
|
|
} |
|
|
|
|
rc->cur_send++; |
|
|
|
|
op_start = op_end; |
|
|
|
|
} |
|
|
|
|
if (op_sge > 0) |
|
|
|
|
if (op_start > 0) |
|
|
|
|
{ |
|
|
|
|
try_send_rdma_wr(cl, sge, op_sge); |
|
|
|
|
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+op_start); |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
@ -427,23 +568,87 @@ static void try_recv_rdma_wr(osd_client_t *cl, ibv_sge *sge, int op_sge) |
|
|
|
|
cl->rdma_conn->cur_recv++; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void copy_data_to_recv_list(uint8_t *data_buf, uint64_t data_size, osd_client_t *cl) |
|
|
|
|
{ |
|
|
|
|
uint64_t pos = 0; |
|
|
|
|
while (cl->recv_list.done < cl->recv_list.count) |
|
|
|
|
{ |
|
|
|
|
uint64_t cur = cl->recv_list.buf[cl->recv_list.done].iov_len; |
|
|
|
|
assert(cur <= data_size-pos); |
|
|
|
|
memcpy(cl->recv_list.buf[cl->recv_list.done].iov_base, data_buf+pos, cur); |
|
|
|
|
pos += cur; |
|
|
|
|
} |
|
|
|
|
cl->recv_list.reset(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool osd_messenger_t::try_recv_rdma(osd_client_t *cl) |
|
|
|
|
{ |
|
|
|
|
auto rc = cl->rdma_conn; |
|
|
|
|
while (rc->cur_recv < rc->max_recv) |
|
|
|
|
{ |
|
|
|
|
void *buf = malloc_or_die(rc->max_msg); |
|
|
|
|
rc->recv_buffers.push_back(buf); |
|
|
|
|
ibv_sge sge = { |
|
|
|
|
.addr = (uintptr_t)buf, |
|
|
|
|
.length = (uint32_t)rc->max_msg, |
|
|
|
|
.lkey = rc->ctx->mr->lkey, |
|
|
|
|
}; |
|
|
|
|
try_recv_rdma_wr(cl, &sge, 1); |
|
|
|
|
try_recv_rdma_wr(cl, NULL, 0); |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Handle one operation or reply that the peer placed into incoming slot
// <op_slot> of the client's RDMA connection.
//
// cl      - client the slot belongs to
// op_slot - index into rc->in_ops taken from the received immediate value
//
// The slot holds a packet header plus the position and size of the attached
// data inside rc->in_data_buf. A header with SECONDARY_OSD_REPLY_MAGIC is
// processed as a reply, anything else as a new incoming operation. Attached
// data is copied into cl->recv_list, after which the slot is queued in
// rc->in_slots_freed.
//
// Returns true when the slot was processed. Returns false when the slot index
// or the data range is invalid (the client is stopped in that case) or when
// handle_reply_hdr() returns false.
bool osd_messenger_t::rdma_handle_op(osd_client_t *cl, uint32_t op_slot)
{
    auto rc = cl->rdma_conn;
    // in_ops is allocated with exactly op_slots entries, so that is the bound
    if (op_slot >= rc->op_slots)
    {
        // Invalid incoming index
        fprintf(stderr, "Client %d invalid incoming RDMA op slot: %u, dropping connection\n", cl->peer_fd, op_slot);
        stop_client(cl->peer_fd);
        return false;
    }
    osd_op_header_t *hdr = (osd_op_header_t *)rc->in_ops[op_slot].header;
    // Read position and size once into locals: the slot is registered with
    // remote write access, so its content must not be re-read after validation
    uint64_t data_pos = rc->in_ops[op_slot].pos.data_pos;
    uint64_t data_size = rc->in_ops[op_slot].pos.data_size;
    // in_data_buf is op_memory bytes long; reject ranges that leave it.
    // The position is only meaningful when there is data attached
    if (data_size > 0 && (data_pos > rc->op_memory || data_size > rc->op_memory - data_pos))
    {
        fprintf(stderr, "Client %d invalid incoming RDMA data range: position %lu, size %lu, dropping connection\n",
            cl->peer_fd, data_pos, data_size);
        stop_client(cl->peer_fd);
        return false;
    }
    uint8_t *data_buf = (uint8_t*)rc->in_data_buf + (data_size > 0 ? data_pos : 0);
    if (hdr->magic == SECONDARY_OSD_REPLY_MAGIC)
    {
        // Reply
        if (cl->read_op)
        {
            delete cl->read_op;
            cl->read_op = NULL;
        }
        if (!handle_reply_hdr(rc->in_ops[op_slot].header, cl))
            return false;
        if (cl->read_state == CL_READ_REPLY_DATA)
        {
            // copy reply data to cl->recv_list
            copy_data_to_recv_list(data_buf, data_size, cl);
            // and handle reply with data
            handle_reply_ready(cl->read_op);
            cl->read_op = NULL;
            cl->read_state = 0;
            cl->read_remaining = 0;
        }
    }
    else
    {
        // Operation
        cl->read_op = new osd_op_t;
        cl->read_op->peer_fd = cl->peer_fd;
        cl->read_op->op_type = OSD_OP_IN;
        memcpy(&cl->read_op->req, hdr, OSD_PACKET_SIZE);
        handle_op_hdr(cl);
        if (cl->read_state == CL_READ_DATA)
        {
            copy_data_to_recv_list(data_buf, data_size, cl);
            // And handle the incoming op with data
            cl->received_ops.push_back(cl->read_op);
            set_immediate.push_back([this, op = cl->read_op]() { exec_op(op); });
            cl->read_op = NULL;
            cl->read_state = 0;
        }
    }
    // We don't need the incoming data buffer anymore, notify peer about it
    // FIXME: Allow to pass memory to the internal layer without copying and notify after handling it
    rc->in_slots_freed.push_back(op_slot);
    return true;
}
|
|
|
|
|
|
|
|
|
#define RDMA_EVENTS_AT_ONCE 32 |
|
|
|
|
|
|
|
|
|
void osd_messenger_t::handle_rdma_events() |
|
|
|
@ -468,9 +673,9 @@ void osd_messenger_t::handle_rdma_events() |
|
|
|
|
event_count = ibv_poll_cq(rdma_context->cq, RDMA_EVENTS_AT_ONCE, wc); |
|
|
|
|
for (int i = 0; i < event_count; i++) |
|
|
|
|
{ |
|
|
|
|
int client_id = wc[i].wr_id >> 1; |
|
|
|
|
bool is_send = wc[i].wr_id & 1; |
|
|
|
|
auto cl_it = clients.find(client_id); |
|
|
|
|
auto cqp_it = clients_by_qp.find(wc[i].qp_num); |
|
|
|
|
int peer_fd = cqp_it != clients_by_qp.end() ? cqp_it->second : -1; |
|
|
|
|
auto cl_it = clients.find(peer_fd); |
|
|
|
|
if (cl_it == clients.end()) |
|
|
|
|
{ |
|
|
|
|
continue; |
|
|
|
@ -478,55 +683,51 @@ void osd_messenger_t::handle_rdma_events() |
|
|
|
|
osd_client_t *cl = cl_it->second; |
|
|
|
|
if (wc[i].status != IBV_WC_SUCCESS) |
|
|
|
|
{ |
|
|
|
|
fprintf(stderr, "RDMA work request failed for client %d", client_id); |
|
|
|
|
fprintf(stderr, "RDMA work request failed for client %d", peer_fd); |
|
|
|
|
if (cl->osd_num) |
|
|
|
|
{ |
|
|
|
|
fprintf(stderr, " (OSD %lu)", cl->osd_num); |
|
|
|
|
} |
|
|
|
|
fprintf(stderr, " with status: %s, stopping client\n", ibv_wc_status_str(wc[i].status)); |
|
|
|
|
stop_client(client_id); |
|
|
|
|
if (peer_fd >= 0) |
|
|
|
|
stop_client(peer_fd); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
if (!is_send) |
|
|
|
|
auto rc = cl->rdma_conn; |
|
|
|
|
if (wc[i].opcode == IBV_WC_RDMA_WRITE) |
|
|
|
|
{ |
|
|
|
|
cl->rdma_conn->cur_recv--; |
|
|
|
|
if (!handle_read_buffer(cl, cl->rdma_conn->recv_buffers[0], wc[i].byte_len)) |
|
|
|
|
// Operation or reply is sent, we can free it
|
|
|
|
|
auto & op = rc->out_slot_ops[wc[i].wr_id]; |
|
|
|
|
if (op) |
|
|
|
|
{ |
|
|
|
|
// handle_read_buffer may stop the client
|
|
|
|
|
continue; |
|
|
|
|
delete op; |
|
|
|
|
op = NULL; |
|
|
|
|
} |
|
|
|
|
free(cl->rdma_conn->recv_buffers[0]); |
|
|
|
|
cl->rdma_conn->recv_buffers.erase(cl->rdma_conn->recv_buffers.begin(), cl->rdma_conn->recv_buffers.begin()+1); |
|
|
|
|
try_recv_rdma(cl); |
|
|
|
|
rc->cur_send--; |
|
|
|
|
try_send_rdma(cl); |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
else if (wc[i].opcode == IBV_WC_RECV) |
|
|
|
|
{ |
|
|
|
|
cl->rdma_conn->cur_send--; |
|
|
|
|
if (!cl->rdma_conn->cur_send) |
|
|
|
|
if (!(wc[i].imm_data & 0x80000000)) |
|
|
|
|
{ |
|
|
|
|
// Wait for the whole batch
|
|
|
|
|
for (int i = 0; i < cl->rdma_conn->send_pos; i++) |
|
|
|
|
// Operation or reply received. Handle it
|
|
|
|
|
if (!rdma_handle_op(cl, wc[i].imm_data)) |
|
|
|
|
{ |
|
|
|
|
if (cl->outbox[i].flags & MSGR_SENDP_FREE) |
|
|
|
|
{ |
|
|
|
|
// Reply fully sent
|
|
|
|
|
delete cl->outbox[i].op; |
|
|
|
|
} |
|
|
|
|
// false means that the client is stopped due to invalid operation
|
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
if (cl->rdma_conn->send_pos > 0) |
|
|
|
|
{ |
|
|
|
|
cl->send_list.erase(cl->send_list.begin(), cl->send_list.begin()+cl->rdma_conn->send_pos); |
|
|
|
|
cl->outbox.erase(cl->outbox.begin(), cl->outbox.begin()+cl->rdma_conn->send_pos); |
|
|
|
|
cl->rdma_conn->send_pos = 0; |
|
|
|
|
} |
|
|
|
|
if (cl->rdma_conn->send_buf_pos > 0) |
|
|
|
|
{ |
|
|
|
|
cl->send_list[0].iov_base = (uint8_t*)cl->send_list[0].iov_base + cl->rdma_conn->send_buf_pos; |
|
|
|
|
cl->send_list[0].iov_len -= cl->rdma_conn->send_buf_pos; |
|
|
|
|
cl->rdma_conn->send_buf_pos = 0; |
|
|
|
|
} |
|
|
|
|
try_send_rdma(cl); |
|
|
|
|
rc->cur_recv--; |
|
|
|
|
try_recv_rdma(cl); |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
// Outbox slot is marked as free (the remote side doesn't need it anymore)
|
|
|
|
|
uint32_t op_slot = wc[i].imm_data & 0x7FFFFFFF; |
|
|
|
|
auto & pos = rc->in_ops[op_slot].pos; |
|
|
|
|
if (pos.data_size > 0) |
|
|
|
|
rc->out_data_alloc.free(pos.data_pos, pos.data_size); |
|
|
|
|
rc->out_op_alloc->set(op_slot, false); |
|
|
|
|
} |
|
|
|
|
// Try to continue sending
|
|
|
|
|
try_send_rdma(cl); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} while (event_count > 0); |
|
|
|
|