diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 635e6259..57bbcaa1 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -145,7 +145,7 @@ endif (${WITH_FIO})
 
 # vitastor-nbd
 add_executable(vitastor-nbd
-    nbd_proxy.cpp
+    nbd_proxy.cpp mmap_manager.cpp
 )
 target_link_libraries(vitastor-nbd
     vitastor_client
diff --git a/src/mmap_manager.cpp b/src/mmap_manager.cpp
new file mode 100644
index 00000000..2cb8be76
--- /dev/null
+++ b/src/mmap_manager.cpp
@@ -0,0 +1,91 @@
+#include <sys/mman.h>
+#include <stdexcept>
+#include <assert.h>
+#include "mmap_manager.h"
+
+// Remember the minimum size of future mappings; nothing is mapped until alloc()
+mmap_manager_t::mmap_manager_t(uint64_t mmap_size)
+{
+    this->mmap_size = mmap_size;
+}
+
+// Unmap every retired mapping and the active one, if any
+mmap_manager_t::~mmap_manager_t()
+{
+    for (auto & kv: past_buffers)
+    {
+        munmap(kv.second.addr, kv.second.size);
+    }
+    if (active_buffer.addr != NULL)
+    {
+        munmap(active_buffer.addr, active_buffer.size);
+    }
+}
+
+// Return <size> bytes carved sequentially out of the active mapping.
+// When the active mapping has no room left it is retired (unmapped right away if
+// everything handed out from it was already freed, otherwise moved to past_buffers)
+// and a new mapping of max(size, mmap_size) bytes is created.
+// Throws std::runtime_error if mmap() fails.
+void *mmap_manager_t::alloc(uint64_t size)
+{
+    if (!active_buffer.addr || (active_buffer.pos + size) > active_buffer.size)
+    {
+        if (active_buffer.addr)
+        {
+            if (active_buffer.freed >= active_buffer.pos)
+                munmap(active_buffer.addr, active_buffer.size);
+            else
+                past_buffers[active_buffer.addr] = active_buffer;
+            active_buffer = { 0 };
+        }
+        uint64_t new_size = size < mmap_size ? mmap_size : size;
+        void *buf = mmap(NULL, new_size, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0);
+        // mmap() reports failure with MAP_FAILED, not with NULL
+        if (buf == MAP_FAILED)
+            throw std::runtime_error(std::string("can't mmap "+std::to_string(new_size)+" bytes"));
+        active_buffer = {
+            .addr = buf,
+            .size = new_size,
+            .freed = 0,
+            .pos = 0,
+        };
+    }
+    void *res = active_buffer.addr + active_buffer.pos;
+    active_buffer.pos += size;
+    return res;
+}
+
+// Account <size> bytes at <addr> as freed in the mapping that contains <addr>.
+// A retired mapping is unmapped once its freed byte count reaches the amount handed out;
+// the active mapping is only accounted here and is released by alloc() or the destructor.
+void mmap_manager_t::free(void *addr, uint64_t size)
+{
+    // Candidate: the retired mapping with the greatest start address <= addr
+    auto it = past_buffers.upper_bound(addr);
+    if (it != past_buffers.begin())
+    {
+        it--;
+        // The active mapping may lie between two retired ones, so always verify
+        // that <addr> really falls inside the candidate
+        if (addr < it->second.addr || addr >= it->second.addr+it->second.size)
+            it = past_buffers.end();
+    }
+    else
+        it = past_buffers.end();
+    if (it != past_buffers.end())
+    {
+        assert(addr >= it->second.addr && addr+size <= it->second.addr+it->second.size);
+        it->second.freed += size;
+        if (it->second.freed >= it->second.pos)
+        {
+            munmap(it->second.addr, it->second.size);
+            past_buffers.erase(it);
+        }
+    }
+    else
+    {
+        assert(addr >= active_buffer.addr && addr+size <= active_buffer.addr+active_buffer.size);
+        active_buffer.freed += size;
+    }
+}
diff --git a/src/mmap_manager.h b/src/mmap_manager.h
new file mode 100644
index 00000000..764865ed
--- /dev/null
+++ b/src/mmap_manager.h
@@ -0,0 +1,36 @@
+#pragma once
+
+#include <stdint.h>
+#include <map>
+
+// Bookkeeping for one anonymous mapping
+struct mmap_buffer_t
+{
+    // Start address returned by mmap(), NULL when there is no mapping
+    void *addr = NULL;
+    // Length of the mapping in bytes
+    uint64_t size = 0;
+    // Total bytes returned through mmap_manager_t::free()
+    uint64_t freed = 0;
+    // Total bytes handed out through mmap_manager_t::alloc()
+    uint64_t pos = 0;
+};
+
+// Sequential allocator on top of anonymous mmap() regions: a retired region is
+// unmapped as a whole once every byte handed out from it has been freed
+class mmap_manager_t
+{
+protected:
+    // Minimum size of a newly created mapping
+    uint64_t mmap_size = 32*1024*1024;
+    // Retired mappings that still have unfreed allocations, keyed by start address
+    std::map<void*, mmap_buffer_t> past_buffers;
+    // Mapping that alloc() currently carves from
+    mmap_buffer_t active_buffer;
+
+public:
+    mmap_manager_t(uint64_t mmap_size = 32*1024*1024);
+    ~mmap_manager_t();
+    void *alloc(uint64_t size);
+    void free(void *addr, uint64_t size);
+};
diff --git a/src/nbd_proxy.cpp b/src/nbd_proxy.cpp
index e4d34e3c..956185a6 100644
--- a/src/nbd_proxy.cpp
+++ b/src/nbd_proxy.cpp
@@ -17,6 +17,8 @@
 
 #include "epoll_manager.h"
 #include "cluster_client.h"
+#include "mmap_manager.h"
+#include <fcntl.h>
 
 #ifndef MSG_ZEROCOPY
 #define MSG_ZEROCOPY 0
@@ -24,6 +26,15 @@
 
 const char *exe_name = NULL;
 
+// Entry of nbd_proxy::to_free: how a buffer must be released after it is sent
+struct buf_to_free_t
+{
+    void *buf = NULL;
+    // 0 if <buf> is released with free(), otherwise the byte count
+    // passed to mmap_manager_t::free()
+    uint64_t unmap = 0;
+};
+
 class nbd_proxy
 {
 protected:
@@ -38,7 +49,7 @@ protected:
     ring_consumer_t consumer;
 
     std::vector<iovec> send_list, next_send_list;
-    std::vector<void*> to_free;
+    std::vector<buf_to_free_t> to_free;
     int nbd_fd = -1;
     void *recv_buf = NULL;
     int receive_buffer_size = 9000;
@@ -51,6 +62,11 @@ protected:
     msghdr read_msg = { 0 }, send_msg = { 0 };
     iovec read_iov = { 0 };
 
+    // Allocator of the read buffers that are passed to vmsplice()
+    mmap_manager_t mm;
+    // Pipe between vmsplice() and splice()
+    int pipe_fd[2];
+
 public:
     static json11::Json::object parse_args(int narg, const char *args[])
     {
@@ -174,6 +190,12 @@ public:
                 exit(1);
             }
         }
+        // Create pipe for splicing
+        if (pipe(pipe_fd) < 0)
+        {
+            fprintf(stderr, "pipe failed: %s\n", strerror(errno));
+            exit(1);
+        }
         // Create client
         ringloop = new ring_loop_t(512);
         epmgr = new epoll_manager_t(ringloop);
@@ -522,16 +544,61 @@ protected:
         {
             return;
         }
-        io_uring_sqe* sqe = ringloop->get_sqe();
-        if (!sqe)
+        int i;
+        //uint64_t len = 0;
+        for (i = 0; i < send_list.size(); i++)
         {
-            return;
+            if (to_free[i].unmap)
+            {
+                break;
+            }
+            //len += send_list[i].iov_len;
+        }
+        //if (true)
+        // Leading buffers that do not come from mm are sent with sendmsg()
+        if (i > 0)
+        {
+            /*io_uring_sqe* sqe = ringloop->get_sqe();
+            if (!sqe)
+            {
+                return;
+            }
+            ring_data_t* data = ((ring_data_t*)sqe->user_data);
+            data->callback = [this](ring_data_t *data) { handle_send(data->res); };*/
+            send_msg.msg_iov = send_list.data();
+            //send_msg.msg_iovlen = send_list.size();
+            send_msg.msg_iovlen = i;
+            //my_uring_prep_sendmsg(sqe, nbd_fd, &send_msg, MSG_ZEROCOPY);
+            int res = sendmsg(nbd_fd, &send_msg, MSG_ZEROCOPY);
+            if (res < 0)
+                res = -errno;
+            handle_send(res);
+            //int r = sendmsg(int sockfd, const struct msghdr *msg, int flags);
+        }
+        else
+        {
+            // First buffer comes from mm: vmsplice() it into the pipe, then splice() the pipe into the socket
+            int res = vmsplice(pipe_fd[1], send_list.data(), 1, SPLICE_F_GIFT);
+            if (res < 0)
+            {
+                throw std::runtime_error(std::string("vmsplice: ")+strerror(errno));
+            }
+            int sent = res, spl = res;
+            while (spl > 0)
+            {
+                res = splice(pipe_fd[0], NULL, nbd_fd, NULL, spl, SPLICE_F_MOVE);
+                if (res < 0)
+                {
+                    if (errno != EAGAIN)
+                        throw std::runtime_error(std::string("splice: ")+strerror(errno));
+                }
+                else
+                {
+                    spl -= res;
+                }
+            }
+            handle_send(sent);
         }
-        ring_data_t* data = ((ring_data_t*)sqe->user_data);
-        data->callback = [this](ring_data_t *data) { handle_send(data->res); };
-        send_msg.msg_iov = send_list.data();
-        send_msg.msg_iovlen = send_list.size();
-        my_uring_prep_sendmsg(sqe, nbd_fd, &send_msg, MSG_ZEROCOPY);
     }
 
     void handle_send(int result)
@@ -547,7 +614,10 @@ protected:
         {
             if (result >= send_list[to_eat].iov_len)
             {
-                free(to_free[to_eat]);
+                if (to_free[to_eat].unmap)
+                    mm.free(to_free[to_eat].buf, to_free[to_eat].unmap);
+                else
+                    free(to_free[to_eat].buf);
                 result -= send_list[to_eat].iov_len;
                 to_eat++;
             }
@@ -659,6 +729,7 @@ protected:
                     printf("request %lx +%x %lx\n", be64toh(cur_req.from), be32toh(cur_req.len), handle);
 #endif
                     void *buf = NULL;
+                    nbd_reply *reply = NULL;
                     cluster_op_t *op = new cluster_op_t;
                     if (req_type == NBD_CMD_READ || req_type == NBD_CMD_WRITE)
                     {
@@ -666,36 +737,53 @@ protected:
                         op->inode = inode ? inode : watch->cfg.num;
                         op->offset = be64toh(cur_req.from);
                         op->len = be32toh(cur_req.len);
-                        buf = malloc_or_die(sizeof(nbd_reply) + op->len);
-                        op->iov.push_back(buf + sizeof(nbd_reply), op->len);
+                        if (req_type == NBD_CMD_WRITE)
+                        {
+                            buf = malloc_or_die(sizeof(nbd_reply) + op->len);
+                            reply = (nbd_reply*)buf;
+                            op->iov.push_back(buf + sizeof(nbd_reply), op->len);
+                        }
+                        else
+                        {
+                            // Reads: data goes into a buffer from mm, the reply header is malloc()ed separately
+                            buf = mm.alloc(op->len);
+                            reply = (nbd_reply*)malloc_or_die(sizeof(nbd_reply));
+                            op->iov.push_back(buf, op->len);
+                        }
                     }
                     else if (req_type == NBD_CMD_FLUSH)
                     {
                         op->opcode = OSD_OP_SYNC;
-                        buf = malloc_or_die(sizeof(nbd_reply));
+                        reply = (nbd_reply*)malloc_or_die(sizeof(nbd_reply));
                     }
-                    op->callback = [this, buf, handle](cluster_op_t *op)
+                    op->callback = [this, buf, reply, handle](cluster_op_t *op)
                     {
 #ifdef DEBUG
                         printf("reply %lx e=%d\n", handle, op->retval);
 #endif
-                        nbd_reply *reply = (nbd_reply*)buf;
                         reply->magic = htobe32(NBD_REPLY_MAGIC);
                         memcpy(reply->handle, &handle, 8);
                         reply->error = htobe32(op->retval < 0 ? -op->retval : 0);
                         auto & to_list = send_msg.msg_iovlen > 0 ? next_send_list : send_list;
-                        if (op->retval < 0 || op->opcode != OSD_OP_READ)
-                            to_list.push_back({ .iov_base = buf, .iov_len = sizeof(nbd_reply) });
-                        else
-                            to_list.push_back({ .iov_base = buf, .iov_len = sizeof(nbd_reply) + op->len });
-                        to_free.push_back(buf);
+                        to_list.push_back((iovec){ .iov_base = reply, .iov_len = sizeof(nbd_reply) });
+                        to_free.push_back((buf_to_free_t){ .buf = reply, .unmap = 0 });
+                        if (op->retval >= 0 && op->opcode == OSD_OP_READ)
+                        {
+                            to_list.push_back((iovec){ .iov_base = buf, .iov_len = op->len });
+                            to_free.push_back((buf_to_free_t){ .buf = buf, .unmap = op->len });
+                        }
+                        else if (op->opcode == OSD_OP_READ)
+                        {
+                            mm.free(buf, op->len);
+                        }
                         delete op;
                         ringloop->wakeup();
                     };
                     if (req_type == NBD_CMD_WRITE)
                     {
                         cur_op = op;
+                        // Must equal the address pushed into op->iov for writes (data follows the reply header)
                         cur_buf = buf + sizeof(nbd_reply);
                         cur_left = op->len;
                         read_state = CL_READ_DATA;
                     }