|
|
@ -17,6 +17,8 @@ |
|
|
|
|
|
|
|
#include "epoll_manager.h" |
|
|
|
#include "cluster_client.h" |
|
|
|
#include "mmap_manager.h" |
|
|
|
#include <stdexcept> |
|
|
|
|
|
|
|
#ifndef MSG_ZEROCOPY |
|
|
|
#define MSG_ZEROCOPY 0 |
|
|
@ -24,6 +26,24 @@ |
|
|
|
|
|
|
|
const char *exe_name = NULL; |
|
|
|
|
|
|
|
static inline void my_uring_prep_splice(struct io_uring_sqe *sqe, |
|
|
|
int fd_in, int64_t off_in, |
|
|
|
int fd_out, int64_t off_out, |
|
|
|
unsigned int nbytes, |
|
|
|
unsigned int splice_flags) |
|
|
|
{ |
|
|
|
my_uring_prep_rw(IORING_OP_SPLICE, sqe, fd_out, NULL, nbytes, (__u64) off_out); |
|
|
|
sqe->splice_off_in = (__u64) off_in; |
|
|
|
sqe->splice_fd_in = fd_in; |
|
|
|
sqe->splice_flags = splice_flags; |
|
|
|
} |
|
|
|
|
|
|
|
/**
 * Describes one buffer that has to be released after it has been sent.
 *
 * Kept as a plain aggregate: the file creates instances with designated
 * initializers, so member names and their order must stay as they are.
 */
struct buf_to_free_t
{
    // Start of the buffer to release
    void *buf = NULL;
    // When non-zero, handle_send() releases the buffer with
    // mm.free(buf, unmap); when zero it uses plain free(buf)
    uint64_t unmap = 0;
};
|
|
|
|
|
|
|
class nbd_proxy |
|
|
|
{ |
|
|
|
protected: |
|
|
@ -38,7 +58,7 @@ protected: |
|
|
|
ring_consumer_t consumer; |
|
|
|
|
|
|
|
std::vector<iovec> send_list, next_send_list; |
|
|
|
std::vector<void*> to_free; |
|
|
|
std::vector<buf_to_free_t> to_free; |
|
|
|
int nbd_fd = -1; |
|
|
|
void *recv_buf = NULL; |
|
|
|
int receive_buffer_size = 9000; |
|
|
@ -51,6 +71,10 @@ protected: |
|
|
|
msghdr read_msg = { 0 }, send_msg = { 0 }; |
|
|
|
iovec read_iov = { 0 }; |
|
|
|
|
|
|
|
mmap_manager_t mm; |
|
|
|
int pipe_fd[2]; |
|
|
|
int vmspliced = 0; |
|
|
|
|
|
|
|
public: |
|
|
|
static json11::Json::object parse_args(int narg, const char *args[]) |
|
|
|
{ |
|
|
@ -174,6 +198,12 @@ public: |
|
|
|
exit(1); |
|
|
|
} |
|
|
|
} |
|
|
|
// Create pipe for splicing
|
|
|
|
if (pipe(pipe_fd) < 0) |
|
|
|
{ |
|
|
|
fprintf(stderr, "pipe failed: %s\n", strerror(errno)); |
|
|
|
exit(1); |
|
|
|
} |
|
|
|
// Create client
|
|
|
|
ringloop = new ring_loop_t(512); |
|
|
|
epmgr = new epoll_manager_t(ringloop); |
|
|
@ -522,16 +552,76 @@ protected: |
|
|
|
{ |
|
|
|
return; |
|
|
|
} |
|
|
|
io_uring_sqe* sqe = ringloop->get_sqe(); |
|
|
|
if (!sqe) |
|
|
|
int i; |
|
|
|
//uint64_t len = 0;
|
|
|
|
for (i = 0; i < send_list.size(); i++) |
|
|
|
{ |
|
|
|
return; |
|
|
|
if (to_free[i].unmap) |
|
|
|
{ |
|
|
|
break; |
|
|
|
} |
|
|
|
//len += send_list[i].iov_len;
|
|
|
|
} |
|
|
|
//if (true)
|
|
|
|
if (i > 0) |
|
|
|
{ |
|
|
|
/*io_uring_sqe* sqe = ringloop->get_sqe();
|
|
|
|
if (!sqe) |
|
|
|
{ |
|
|
|
return; |
|
|
|
} |
|
|
|
ring_data_t* data = ((ring_data_t*)sqe->user_data); |
|
|
|
data->callback = [this](ring_data_t *data) { handle_send(data->res); };*/ |
|
|
|
send_msg.msg_iov = send_list.data(); |
|
|
|
//send_msg.msg_iovlen = send_list.size();
|
|
|
|
send_msg.msg_iovlen = i; |
|
|
|
//my_uring_prep_sendmsg(sqe, nbd_fd, &send_msg, MSG_ZEROCOPY);
|
|
|
|
int res = sendmsg(nbd_fd, &send_msg, MSG_ZEROCOPY); |
|
|
|
if (res < 0) |
|
|
|
res = -errno; |
|
|
|
handle_send(res); |
|
|
|
//int r = sendmsg(int sockfd, const struct msghdr *msg, int flags);
|
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
io_uring_sqe* sqe = ringloop->get_sqe(); |
|
|
|
if (!sqe) |
|
|
|
{ |
|
|
|
return; |
|
|
|
} |
|
|
|
if (vmspliced <= 0) |
|
|
|
{ |
|
|
|
vmspliced = vmsplice(pipe_fd[1], send_list.data(), 1, SPLICE_F_GIFT); |
|
|
|
if (vmspliced < 0) |
|
|
|
{ |
|
|
|
throw std::runtime_error(std::string("vmsplice: ")+strerror(errno)); |
|
|
|
} |
|
|
|
} |
|
|
|
send_msg.msg_iovlen = 1; |
|
|
|
ring_data_t* data = ((ring_data_t*)sqe->user_data); |
|
|
|
data->callback = [this](ring_data_t *data) |
|
|
|
{ |
|
|
|
if (data->res > 0) |
|
|
|
vmspliced -= data->res; |
|
|
|
handle_send(data->res); |
|
|
|
}; |
|
|
|
my_uring_prep_splice(sqe, pipe_fd[0], -1l, nbd_fd, -1l, vmspliced, SPLICE_F_MOVE); |
|
|
|
/*int sent = res, spl = res;
|
|
|
|
while (spl > 0) |
|
|
|
{ |
|
|
|
res = splice(pipe_fd[0], NULL, nbd_fd, NULL, spl, SPLICE_F_MOVE); |
|
|
|
if (res < 0) |
|
|
|
{ |
|
|
|
if (errno != EAGAIN) |
|
|
|
throw std::runtime_error(std::string("splice: ")+strerror(errno)); |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
spl -= res; |
|
|
|
} |
|
|
|
} |
|
|
|
handle_send(sent);*/ |
|
|
|
} |
|
|
|
ring_data_t* data = ((ring_data_t*)sqe->user_data); |
|
|
|
data->callback = [this](ring_data_t *data) { handle_send(data->res); }; |
|
|
|
send_msg.msg_iov = send_list.data(); |
|
|
|
send_msg.msg_iovlen = send_list.size(); |
|
|
|
my_uring_prep_sendmsg(sqe, nbd_fd, &send_msg, MSG_ZEROCOPY); |
|
|
|
} |
|
|
|
|
|
|
|
void handle_send(int result) |
|
|
@ -547,7 +637,10 @@ protected: |
|
|
|
{ |
|
|
|
if (result >= send_list[to_eat].iov_len) |
|
|
|
{ |
|
|
|
free(to_free[to_eat]); |
|
|
|
if (to_free[to_eat].unmap) |
|
|
|
mm.free(to_free[to_eat].buf, to_free[to_eat].unmap); |
|
|
|
else |
|
|
|
free(to_free[to_eat].buf); |
|
|
|
result -= send_list[to_eat].iov_len; |
|
|
|
to_eat++; |
|
|
|
} |
|
|
@ -659,6 +752,7 @@ protected: |
|
|
|
printf("request %lx +%x %lx\n", be64toh(cur_req.from), be32toh(cur_req.len), handle); |
|
|
|
#endif |
|
|
|
void *buf = NULL; |
|
|
|
nbd_reply *reply = NULL; |
|
|
|
cluster_op_t *op = new cluster_op_t; |
|
|
|
if (req_type == NBD_CMD_READ || req_type == NBD_CMD_WRITE) |
|
|
|
{ |
|
|
@ -666,36 +760,51 @@ protected: |
|
|
|
op->inode = inode ? inode : watch->cfg.num; |
|
|
|
op->offset = be64toh(cur_req.from); |
|
|
|
op->len = be32toh(cur_req.len); |
|
|
|
buf = malloc_or_die(sizeof(nbd_reply) + op->len); |
|
|
|
op->iov.push_back(buf + sizeof(nbd_reply), op->len); |
|
|
|
if (req_type == NBD_CMD_WRITE) |
|
|
|
{ |
|
|
|
buf = malloc_or_die(sizeof(nbd_reply) + op->len); |
|
|
|
reply = (nbd_reply*)buf; |
|
|
|
op->iov.push_back(buf + sizeof(nbd_reply), op->len); |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
buf = mm.alloc(op->len); |
|
|
|
reply = (nbd_reply*)malloc_or_die(sizeof(nbd_reply)); |
|
|
|
op->iov.push_back(buf, op->len); |
|
|
|
} |
|
|
|
} |
|
|
|
else if (req_type == NBD_CMD_FLUSH) |
|
|
|
{ |
|
|
|
op->opcode = OSD_OP_SYNC; |
|
|
|
buf = malloc_or_die(sizeof(nbd_reply)); |
|
|
|
reply = (nbd_reply*)malloc_or_die(sizeof(nbd_reply)); |
|
|
|
} |
|
|
|
op->callback = [this, buf, handle](cluster_op_t *op) |
|
|
|
op->callback = [this, buf, reply, handle](cluster_op_t *op) |
|
|
|
{ |
|
|
|
#ifdef DEBUG |
|
|
|
printf("reply %lx e=%d\n", handle, op->retval); |
|
|
|
#endif |
|
|
|
nbd_reply *reply = (nbd_reply*)buf; |
|
|
|
reply->magic = htobe32(NBD_REPLY_MAGIC); |
|
|
|
memcpy(reply->handle, &handle, 8); |
|
|
|
reply->error = htobe32(op->retval < 0 ? -op->retval : 0); |
|
|
|
auto & to_list = send_msg.msg_iovlen > 0 ? next_send_list : send_list; |
|
|
|
if (op->retval < 0 || op->opcode != OSD_OP_READ) |
|
|
|
to_list.push_back({ .iov_base = buf, .iov_len = sizeof(nbd_reply) }); |
|
|
|
else |
|
|
|
to_list.push_back({ .iov_base = buf, .iov_len = sizeof(nbd_reply) + op->len }); |
|
|
|
to_free.push_back(buf); |
|
|
|
to_list.push_back((iovec){ .iov_base = reply, .iov_len = sizeof(nbd_reply) }); |
|
|
|
to_free.push_back((buf_to_free_t){ .buf = reply, .unmap = 0 }); |
|
|
|
if (op->retval >= 0 && op->opcode == OSD_OP_READ) |
|
|
|
{ |
|
|
|
to_list.push_back((iovec){ .iov_base = buf, .iov_len = op->len }); |
|
|
|
to_free.push_back((buf_to_free_t){ .buf = buf, .unmap = op->len }); |
|
|
|
} |
|
|
|
else if (op->opcode == OSD_OP_READ) |
|
|
|
{ |
|
|
|
mm.free(buf, op->len); |
|
|
|
} |
|
|
|
delete op; |
|
|
|
ringloop->wakeup(); |
|
|
|
}; |
|
|
|
if (req_type == NBD_CMD_WRITE) |
|
|
|
{ |
|
|
|
cur_op = op; |
|
|
|
cur_buf = buf + sizeof(nbd_reply); |
|
|
|
cur_buf = buf; |
|
|
|
cur_left = op->len; |
|
|
|
read_state = CL_READ_DATA; |
|
|
|
} |
|
|
|