Allow to turn synchronous recvmsg/sendmsg on with a config option

Vitaliy Filippov 2020-06-22 23:44:32 +03:00
parent 9abaf5b735
commit 62343c8022
6 changed files with 51 additions and 21 deletions

View File

@ -46,6 +46,8 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
msgr.stop_client(op->peer_fd);
delete op;
};
msgr.use_sync_send_recv = config["use_sync_send_recv"].bool_value() ||
config["use_sync_send_recv"].uint64_value();
st_cli.tfd = tfd;
st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };

View File

@ -3,17 +3,17 @@
// Random write:
//
// fio -thread -ioengine=./libfio_cluster.so -name=test -bs=4k -direct=1 -fsync=16 -iodepth=16 -rw=randwrite \
// -etcd=127.0.0.1:2379 [-etcd_prefix=/microceph] -size=1000M
// -etcd=127.0.0.1:2379 [-etcd_prefix=/microceph] -inode=1 -size=1000M
//
// Linear write:
//
// fio -thread -ioengine=./libfio_cluster.so -name=test -bs=128k -direct=1 -fsync=32 -iodepth=32 -rw=write \
// -etcd=127.0.0.1:2379 [-etcd_prefix=/microceph] -size=1000M
// -etcd=127.0.0.1:2379 [-etcd_prefix=/microceph] -inode=1 -size=1000M
//
// Random read (run with -iodepth=32 or -iodepth=1):
//
// fio -thread -ioengine=./libfio_cluster.so -name=test -bs=4k -direct=1 -iodepth=32 -rw=randread \
// -etcd=127.0.0.1:2379 [-etcd_prefix=/microceph] -size=1000M
// -etcd=127.0.0.1:2379 [-etcd_prefix=/microceph] -inode=1 -size=1000M
#include <sys/types.h>
#include <sys/socket.h>

View File

@ -169,7 +169,10 @@ void osd_messenger_t::handle_peer_epoll(int peer_fd, int epoll_events)
if (cl.read_ready == 1)
{
read_ready_clients.push_back(cl.peer_fd);
ringloop->wakeup();
if (ringloop)
ringloop->wakeup();
else
read_requests();
}
}
}

View File

@ -233,6 +233,7 @@ struct osd_messenger_t
int peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL;
int peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
int log_level = 0;
bool use_sync_send_recv = false;
std::map<osd_num_t, osd_wanted_peer_t> wanted_peers;
std::map<uint64_t, int> osd_peer_fds;

View File

@ -6,13 +6,6 @@ void osd_messenger_t::read_requests()
{
int peer_fd = read_ready_clients[i];
auto & cl = clients[peer_fd];
io_uring_sqe* sqe = ringloop->get_sqe();
if (!sqe)
{
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
return;
}
ring_data_t* data = ((ring_data_t*)sqe->user_data);
if (cl.read_remaining < receive_buffer_size)
{
cl.read_iov.iov_base = cl.in_buf;
@ -27,8 +20,27 @@ void osd_messenger_t::read_requests()
cl.read_msg.msg_iov = cl.recv_list.get_iovec();
cl.read_msg.msg_iovlen = cl.recv_list.get_size();
}
data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data->res, peer_fd); };
my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0);
if (ringloop && !use_sync_send_recv)
{
io_uring_sqe* sqe = ringloop->get_sqe();
if (!sqe)
{
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
return;
}
ring_data_t* data = ((ring_data_t*)sqe->user_data);
data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data->res, peer_fd); };
my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0);
}
else
{
int result = recvmsg(peer_fd, &cl.read_msg, 0);
if (result < 0)
{
result = -errno;
}
handle_read(result, peer_fd);
}
}
read_ready_clients.clear();
}

View File

@ -42,12 +42,6 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
bool osd_messenger_t::try_send(osd_client_t & cl)
{
int peer_fd = cl.peer_fd;
io_uring_sqe* sqe = ringloop->get_sqe();
if (!sqe)
{
return false;
}
ring_data_t* data = ((ring_data_t*)sqe->user_data);
if (!cl.write_op)
{
// pick next command
@ -103,8 +97,26 @@ bool osd_messenger_t::try_send(osd_client_t & cl)
}
cl.write_msg.msg_iov = cl.send_list.get_iovec();
cl.write_msg.msg_iovlen = cl.send_list.get_size();
data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data->res, peer_fd); };
my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0);
if (ringloop && !use_sync_send_recv)
{
io_uring_sqe* sqe = ringloop->get_sqe();
if (!sqe)
{
return false;
}
ring_data_t* data = ((ring_data_t*)sqe->user_data);
data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data->res, peer_fd); };
my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0);
}
else
{
int result = sendmsg(peer_fd, &cl.write_msg, MSG_NOSIGNAL);
if (result < 0)
{
result = -errno;
}
handle_send(result, peer_fd);
}
return true;
}