Experiment: vmsplice+splice "zero-copy" read in NBD

nbd-vmsplice
Vitaliy Filippov 2021-11-21 22:52:24 +03:00
parent a863013cb2
commit 6c3248a36c
4 changed files with 209 additions and 22 deletions

View File

@ -145,7 +145,7 @@ endif (${WITH_FIO})
# vitastor-nbd
add_executable(vitastor-nbd
nbd_proxy.cpp
nbd_proxy.cpp mmap_manager.cpp
)
target_link_libraries(vitastor-nbd
vitastor_client

82
src/mmap_manager.cpp Normal file
View File

@ -0,0 +1,82 @@
#include <stdexcept>
#include <cassert>
#include <sys/mman.h>
#include "mmap_manager.h"
mmap_manager_t::mmap_manager_t(uint64_t mmap_size)
{
this->mmap_size = mmap_size;
}
mmap_manager_t::~mmap_manager_t()
{
for (auto & kv: past_buffers)
{
munmap(kv.second.addr, kv.second.size);
}
if (active_buffer.addr != NULL)
{
munmap(active_buffer.addr, active_buffer.size);
}
}
void *mmap_manager_t::alloc(uint64_t size)
{
if (!active_buffer.addr || (active_buffer.pos + size) > active_buffer.size)
{
if (active_buffer.addr)
{
if (active_buffer.freed >= active_buffer.pos)
munmap(active_buffer.addr, active_buffer.size);
else
past_buffers[active_buffer.addr] = active_buffer;
active_buffer = { 0 };
}
uint64_t new_size = size < mmap_size ? mmap_size : size;
void *buf = mmap(NULL, new_size, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0);
if (!buf)
throw std::runtime_error(std::string("can't mmap "+std::to_string(new_size)+" bytes"));
active_buffer = {
.addr = buf,
.size = new_size,
.freed = 0,
.pos = 0,
};
}
void *res = active_buffer.addr + active_buffer.pos;
active_buffer.pos += size;
return res;
}
void mmap_manager_t::free(void *addr, uint64_t size)
{
auto it = past_buffers.upper_bound(addr);
if (it != past_buffers.begin())
{
if (it == past_buffers.end())
{
it--;
if (addr < it->second.addr || addr >= it->second.addr+it->second.size)
it = past_buffers.end();
}
else
it--;
}
else
it = past_buffers.end();
if (it != past_buffers.end())
{
assert(addr >= it->second.addr && addr+size <= it->second.addr+it->second.size);
it->second.freed += size;
if (it->second.freed >= it->second.pos)
{
munmap(it->second.addr, it->second.size);
past_buffers.erase(it);
}
}
else
{
assert(addr < active_buffer.addr+active_buffer.size);
active_buffer.freed += size;
}
}

26
src/mmap_manager.h Normal file
View File

@ -0,0 +1,26 @@
#pragma once
#include <stdint.h>
#include <map>
struct mmap_buffer_t
{
void *addr = NULL;
uint64_t size = 0;
uint64_t freed = 0;
uint64_t pos = 0;
};
class mmap_manager_t
{
protected:
uint64_t mmap_size = 32*1024*1024;
std::map<void*, mmap_buffer_t> past_buffers;
mmap_buffer_t active_buffer;
public:
mmap_manager_t(uint64_t mmap_size = 32*1024*1024);
~mmap_manager_t();
void *alloc(uint64_t size);
void free(void *addr, uint64_t size);
};

View File

@ -17,6 +17,8 @@
#include "epoll_manager.h"
#include "cluster_client.h"
#include "mmap_manager.h"
#include <stdexcept>
#ifndef MSG_ZEROCOPY
#define MSG_ZEROCOPY 0
@ -24,6 +26,12 @@
const char *exe_name = NULL;
struct buf_to_free_t
{
void *buf = NULL;
uint64_t unmap = 0;
};
class nbd_proxy
{
protected:
@ -38,7 +46,7 @@ protected:
ring_consumer_t consumer;
std::vector<iovec> send_list, next_send_list;
std::vector<void*> to_free;
std::vector<buf_to_free_t> to_free;
int nbd_fd = -1;
void *recv_buf = NULL;
int receive_buffer_size = 9000;
@ -51,6 +59,9 @@ protected:
msghdr read_msg = { 0 }, send_msg = { 0 };
iovec read_iov = { 0 };
mmap_manager_t mm;
int pipe_fd[2];
public:
static json11::Json::object parse_args(int narg, const char *args[])
{
@ -174,6 +185,12 @@ public:
exit(1);
}
}
// Create pipe for splicing
if (pipe(pipe_fd) < 0)
{
fprintf(stderr, "pipe failed: %s\n", strerror(errno));
exit(1);
}
// Create client
ringloop = new ring_loop_t(512);
epmgr = new epoll_manager_t(ringloop);
@ -522,16 +539,59 @@ protected:
{
return;
}
io_uring_sqe* sqe = ringloop->get_sqe();
int i;
//uint64_t len = 0;
for (i = 0; i < send_list.size(); i++)
{
if (to_free[i].unmap)
{
break;
}
//len += send_list[i].iov_len;
}
//if (true)
if (i > 0)
{
/*io_uring_sqe* sqe = ringloop->get_sqe();
if (!sqe)
{
return;
}
ring_data_t* data = ((ring_data_t*)sqe->user_data);
data->callback = [this](ring_data_t *data) { handle_send(data->res); };
data->callback = [this](ring_data_t *data) { handle_send(data->res); };*/
send_msg.msg_iov = send_list.data();
send_msg.msg_iovlen = send_list.size();
my_uring_prep_sendmsg(sqe, nbd_fd, &send_msg, MSG_ZEROCOPY);
//send_msg.msg_iovlen = send_list.size();
send_msg.msg_iovlen = i;
//my_uring_prep_sendmsg(sqe, nbd_fd, &send_msg, MSG_ZEROCOPY);
int res = sendmsg(nbd_fd, &send_msg, MSG_ZEROCOPY);
if (res < 0)
res = -errno;
handle_send(res);
//int r = sendmsg(int sockfd, const struct msghdr *msg, int flags);
}
else
{
int res = vmsplice(pipe_fd[1], send_list.data(), 1, SPLICE_F_GIFT);
if (res < 0)
{
throw std::runtime_error(std::string("vmsplice: ")+strerror(errno));
}
int sent = res, spl = res;
while (spl > 0)
{
res = splice(pipe_fd[0], NULL, nbd_fd, NULL, spl, SPLICE_F_MOVE);
if (res < 0)
{
if (errno != EAGAIN)
throw std::runtime_error(std::string("splice: ")+strerror(errno));
}
else
{
spl -= res;
}
}
handle_send(sent);
}
}
void handle_send(int result)
@ -547,7 +607,10 @@ protected:
{
if (result >= send_list[to_eat].iov_len)
{
free(to_free[to_eat]);
if (to_free[to_eat].unmap)
mm.free(to_free[to_eat].buf, to_free[to_eat].unmap);
else
free(to_free[to_eat].buf);
result -= send_list[to_eat].iov_len;
to_eat++;
}
@ -659,6 +722,7 @@ protected:
printf("request %lx +%x %lx\n", be64toh(cur_req.from), be32toh(cur_req.len), handle);
#endif
void *buf = NULL;
nbd_reply *reply = NULL;
cluster_op_t *op = new cluster_op_t;
if (req_type == NBD_CMD_READ || req_type == NBD_CMD_WRITE)
{
@ -666,36 +730,51 @@ protected:
op->inode = inode ? inode : watch->cfg.num;
op->offset = be64toh(cur_req.from);
op->len = be32toh(cur_req.len);
if (req_type == NBD_CMD_WRITE)
{
buf = malloc_or_die(sizeof(nbd_reply) + op->len);
reply = (nbd_reply*)buf;
op->iov.push_back(buf + sizeof(nbd_reply), op->len);
}
else
{
buf = mm.alloc(op->len);
reply = (nbd_reply*)malloc_or_die(sizeof(nbd_reply));
op->iov.push_back(buf, op->len);
}
}
else if (req_type == NBD_CMD_FLUSH)
{
op->opcode = OSD_OP_SYNC;
buf = malloc_or_die(sizeof(nbd_reply));
reply = (nbd_reply*)malloc_or_die(sizeof(nbd_reply));
}
op->callback = [this, buf, handle](cluster_op_t *op)
op->callback = [this, buf, reply, handle](cluster_op_t *op)
{
#ifdef DEBUG
printf("reply %lx e=%d\n", handle, op->retval);
#endif
nbd_reply *reply = (nbd_reply*)buf;
reply->magic = htobe32(NBD_REPLY_MAGIC);
memcpy(reply->handle, &handle, 8);
reply->error = htobe32(op->retval < 0 ? -op->retval : 0);
auto & to_list = send_msg.msg_iovlen > 0 ? next_send_list : send_list;
if (op->retval < 0 || op->opcode != OSD_OP_READ)
to_list.push_back({ .iov_base = buf, .iov_len = sizeof(nbd_reply) });
else
to_list.push_back({ .iov_base = buf, .iov_len = sizeof(nbd_reply) + op->len });
to_free.push_back(buf);
to_list.push_back((iovec){ .iov_base = reply, .iov_len = sizeof(nbd_reply) });
to_free.push_back((buf_to_free_t){ .buf = reply, .unmap = 0 });
if (op->retval >= 0 && op->opcode == OSD_OP_READ)
{
to_list.push_back((iovec){ .iov_base = buf, .iov_len = op->len });
to_free.push_back((buf_to_free_t){ .buf = buf, .unmap = op->len });
}
else if (op->opcode == OSD_OP_READ)
{
mm.free(buf, op->len);
}
delete op;
ringloop->wakeup();
};
if (req_type == NBD_CMD_WRITE)
{
cur_op = op;
cur_buf = buf + sizeof(nbd_reply);
cur_buf = buf;
cur_left = op->len;
read_state = CL_READ_DATA;
}