Compare commits

...

3 Commits

Author SHA1 Message Date
Vitaliy Filippov 85010fed38 Use a separate thread for epoll
Simplest, but absolutely inefficient, way to test openonload epoll
2020-06-09 00:52:43 +03:00
Vitaliy Filippov 2498e504c2 Add ringloop back to timerfd 2020-06-09 00:52:43 +03:00
Vitaliy Filippov d56633843f Replace io_uring sendmsg/recvmsg with synchronous sendmsg/recvmsg 2020-06-09 00:52:29 +03:00
7 changed files with 140 additions and 76 deletions

View File

@ -18,7 +18,7 @@ OSD_OBJS := osd.o osd_secondary.o msgr_receive.o msgr_send.o osd_peering.o osd_f
osd_primary.o osd_primary_subops.o etcd_state_client.o messenger.o osd_cluster.o http_client.o pg_states.o \ osd_primary.o osd_primary_subops.o etcd_state_client.o messenger.o osd_cluster.o http_client.o pg_states.o \
osd_rmw.o json11.o base64.o timerfd_manager.o osd_rmw.o json11.o base64.o timerfd_manager.o
osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h $(OSD_OBJS) osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h $(OSD_OBJS)
g++ $(CXXFLAGS) -o $@ osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring g++ $(CXXFLAGS) -o $@ osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring -lpthread
stub_osd: stub_osd.o rw_blocking.o stub_osd: stub_osd.o rw_blocking.o
g++ $(CXXFLAGS) -o $@ stub_osd.o rw_blocking.o -ltcmalloc_minimal g++ $(CXXFLAGS) -o $@ stub_osd.o rw_blocking.o -ltcmalloc_minimal
@ -87,7 +87,7 @@ dump_journal.o: dump_journal.cpp allocator.h blockstore.h blockstore_flush.h blo
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
epoll_manager.o: epoll_manager.cpp epoll_manager.h ringloop.h timerfd_manager.h epoll_manager.o: epoll_manager.cpp epoll_manager.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
etcd_state_client.o: etcd_state_client.cpp base64.h etcd_state_client.h http_client.h json11/json11.hpp object_id.h osd_id.h osd_ops.h pg_states.h timerfd_manager.h etcd_state_client.o: etcd_state_client.cpp base64.h etcd_state_client.h http_client.h json11/json11.hpp object_id.h osd_id.h osd_ops.h pg_states.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
fio_cluster.o: fio_cluster.cpp cluster_client.h epoll_manager.h etcd_state_client.h fio/fio.h fio/optgroup.h http_client.h json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h fio_cluster.o: fio_cluster.cpp cluster_client.h epoll_manager.h etcd_state_client.h fio/fio.h fio/optgroup.h http_client.h json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
@ -95,7 +95,7 @@ fio_engine.o: fio_engine.cpp blockstore.h fio/fio.h fio/optgroup.h json11/json11
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
fio_sec_osd.o: fio_sec_osd.cpp fio/fio.h fio/optgroup.h object_id.h osd_id.h osd_ops.h rw_blocking.h fio_sec_osd.o: fio_sec_osd.cpp fio/fio.h fio/optgroup.h object_id.h osd_id.h osd_ops.h rw_blocking.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
http_client.o: http_client.cpp http_client.h json11/json11.hpp timerfd_manager.h http_client.o: http_client.cpp http_client.h json11/json11.hpp ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
messenger.o: messenger.cpp json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h messenger.o: messenger.cpp json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
@ -149,5 +149,5 @@ test_blockstore.o: test_blockstore.cpp blockstore.h object_id.h ringloop.h timer
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
timerfd_interval.o: timerfd_interval.cpp ringloop.h timerfd_interval.h timerfd_interval.o: timerfd_interval.cpp ringloop.h timerfd_interval.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<
timerfd_manager.o: timerfd_manager.cpp timerfd_manager.h timerfd_manager.o: timerfd_manager.cpp ringloop.h timerfd_manager.h
g++ $(CXXFLAGS) -c -o $@ $< g++ $(CXXFLAGS) -c -o $@ $<

View File

@ -2,17 +2,10 @@
void osd_messenger_t::read_requests() void osd_messenger_t::read_requests()
{ {
for (int i = 0; i < read_ready_clients.size(); i++) while (read_ready_clients.size() > 0)
{ {
int peer_fd = read_ready_clients[i]; int peer_fd = read_ready_clients[0];
auto & cl = clients[peer_fd]; auto & cl = clients[peer_fd];
io_uring_sqe* sqe = ringloop->get_sqe();
if (!sqe)
{
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
return;
}
ring_data_t* data = ((ring_data_t*)sqe->user_data);
if (!cl.read_op || cl.read_remaining < receive_buffer_size) if (!cl.read_op || cl.read_remaining < receive_buffer_size)
{ {
cl.read_iov.iov_base = cl.in_buf; cl.read_iov.iov_base = cl.in_buf;
@ -25,10 +18,14 @@ void osd_messenger_t::read_requests()
} }
cl.read_msg.msg_iov = &cl.read_iov; cl.read_msg.msg_iov = &cl.read_iov;
cl.read_msg.msg_iovlen = 1; cl.read_msg.msg_iovlen = 1;
data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data->res, peer_fd); }; read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + 1);
my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0); int result = recvmsg(peer_fd, &cl.read_msg, 0);
if (result < 0)
{
result = -errno;
}
handle_read(result, peer_fd);
} }
read_ready_clients.clear();
} }
bool osd_messenger_t::handle_read(int result, int peer_fd) bool osd_messenger_t::handle_read(int result, int peer_fd)

View File

@ -42,12 +42,6 @@ void osd_messenger_t::outbox_push(osd_op_t *cur_op)
bool osd_messenger_t::try_send(osd_client_t & cl) bool osd_messenger_t::try_send(osd_client_t & cl)
{ {
int peer_fd = cl.peer_fd; int peer_fd = cl.peer_fd;
io_uring_sqe* sqe = ringloop->get_sqe();
if (!sqe)
{
return false;
}
ring_data_t* data = ((ring_data_t*)sqe->user_data);
if (!cl.write_op) if (!cl.write_op)
{ {
// pick next command // pick next command
@ -84,23 +78,21 @@ bool osd_messenger_t::try_send(osd_client_t & cl)
} }
cl.write_msg.msg_iov = cl.write_op->send_list.get_iovec(); cl.write_msg.msg_iov = cl.write_op->send_list.get_iovec();
cl.write_msg.msg_iovlen = cl.write_op->send_list.get_size(); cl.write_msg.msg_iovlen = cl.write_op->send_list.get_size();
data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data->res, peer_fd); }; int result = sendmsg(peer_fd, &cl.write_msg, MSG_NOSIGNAL);
my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0); if (result < 0)
result = -errno;
handle_send(result, peer_fd);
return true; return true;
} }
void osd_messenger_t::send_replies() void osd_messenger_t::send_replies()
{ {
for (int i = 0; i < write_ready_clients.size(); i++) while (write_ready_clients.size() > 0)
{ {
int peer_fd = write_ready_clients[i]; auto & cl = clients[write_ready_clients[0]];
if (!try_send(clients[peer_fd])) write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + 1);
{ try_send(cl);
write_ready_clients.erase(write_ready_clients.begin(), write_ready_clients.begin() + i);
return;
}
} }
write_ready_clients.clear();
} }
void osd_messenger_t::handle_send(int result, int peer_fd) void osd_messenger_t::handle_send(int result, int peer_fd)

95
osd.cpp
View File

@ -1,5 +1,6 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/epoll.h> #include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/poll.h> #include <sys/poll.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <netinet/tcp.h> #include <netinet/tcp.h>
@ -43,8 +44,14 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
{ {
throw std::runtime_error(std::string("epoll_create: ") + strerror(errno)); throw std::runtime_error(std::string("epoll_create: ") + strerror(errno));
} }
event_fd = eventfd(0, EFD_NONBLOCK);
if (event_fd < 0)
{
throw std::runtime_error(std::string("eventfd: ") + strerror(errno));
}
this->tfd = new timerfd_manager_t([this](int fd, std::function<void(int, int)> handler) { set_fd_handler(fd, handler); }); this->tfd = new timerfd_manager_t(ringloop);
this->tfd->set_fd_handler = [this](int fd, std::function<void(int, int)> handler) { set_fd_handler(fd, handler); };
this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id) this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id)
{ {
print_stats(); print_stats();
@ -59,17 +66,40 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
consumer.loop = [this]() { loop(); }; consumer.loop = [this]() { loop(); };
ringloop->register_consumer(&consumer); ringloop->register_consumer(&consumer);
epoll_thread = new std::thread([this]()
{
int nfds;
epoll_event events[MAX_EPOLL_EVENTS];
while (1)
{
nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, -1);
{
std::lock_guard<std::mutex> guard(epoll_mutex);
for (int i = 0; i < nfds; i++)
{
int fd = events[i].data.fd;
int ev = events[i].events;
epoll_ready[fd] |= ev;
}
uint64_t n = 1;
write(event_fd, &n, 8);
}
}
});
} }
osd_t::~osd_t() osd_t::~osd_t()
{ {
close(epoll_fd);
epoll_thread->join();
delete epoll_thread;
if (tfd) if (tfd)
{ {
delete tfd; delete tfd;
tfd = NULL; tfd = NULL;
} }
ringloop->unregister_consumer(&consumer); ringloop->unregister_consumer(&consumer);
close(epoll_fd); close(event_fd);
close(listen_fd); close(listen_fd);
} }
@ -188,8 +218,13 @@ void osd_t::bind_socket()
{ {
close(listen_fd); close(listen_fd);
close(epoll_fd); close(epoll_fd);
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); throw std::runtime_error(std::string("epoll_ctl (add listen_fd): ") + strerror(errno));
} }
epoll_handlers[listen_fd] = [this](int peer_fd, int epoll_events)
{
c_cli.accept_connections(listen_fd);
};
} }
bool osd_t::shutdown() bool osd_t::shutdown()
@ -204,10 +239,23 @@ bool osd_t::shutdown()
void osd_t::loop() void osd_t::loop()
{ {
if (!wait_state) std::map<int,int> cur_epoll;
{ {
handle_epoll_events(); std::lock_guard<std::mutex> guard(epoll_mutex);
wait_state = 1; cur_epoll.swap(epoll_ready);
}
for (auto p: cur_epoll)
{
auto cb_it = epoll_handlers.find(p.first);
if (cb_it != epoll_handlers.end())
{
cb_it->second(p.first, p.second);
}
}
if (!(wait_state & 2))
{
handle_eventfd();
wait_state = wait_state | 2;
} }
handle_peers(); handle_peers();
c_cli.read_requests(); c_cli.read_requests();
@ -225,7 +273,7 @@ void osd_t::set_fd_handler(int fd, std::function<void(int, int)> handler)
ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET; ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET;
if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0) if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0)
{ {
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); throw std::runtime_error(std::string(exists ? "epoll_ctl (mod fd): " : "epoll_ctl (add fd): ") + strerror(errno));
} }
epoll_handlers[fd] = handler; epoll_handlers[fd] = handler;
} }
@ -233,49 +281,36 @@ void osd_t::set_fd_handler(int fd, std::function<void(int, int)> handler)
{ {
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0 && errno != ENOENT) if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0 && errno != ENOENT)
{ {
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); throw std::runtime_error(std::string("epoll_ctl (remove fd): ") + strerror(errno));
} }
epoll_handlers.erase(fd); epoll_handlers.erase(fd);
} }
} }
void osd_t::handle_epoll_events() void osd_t::handle_eventfd()
{ {
io_uring_sqe *sqe = ringloop->get_sqe(); io_uring_sqe *sqe = ringloop->get_sqe();
if (!sqe) if (!sqe)
{ {
throw std::runtime_error("can't get SQE, will fall out of sync with EPOLLET"); throw std::runtime_error("can't get SQE, will fall out of sync with eventfd");
} }
ring_data_t *data = ((ring_data_t*)sqe->user_data); ring_data_t *data = ((ring_data_t*)sqe->user_data);
my_uring_prep_poll_add(sqe, epoll_fd, POLLIN); my_uring_prep_poll_add(sqe, event_fd, POLLIN);
data->callback = [this](ring_data_t *data) data->callback = [this](ring_data_t *data)
{ {
if (data->res < 0) if (data->res < 0)
{ {
throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res)); throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res));
} }
handle_epoll_events(); handle_eventfd();
}; };
ringloop->submit(); ringloop->submit();
int nfds; uint64_t n = 0;
epoll_event events[MAX_EPOLL_EVENTS]; size_t res = read(event_fd, &n, 8);
restart: if (res == 8)
nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, 0);
for (int i = 0; i < nfds; i++)
{ {
if (events[i].data.fd == listen_fd) // No need to do anything, the loop has already woken up
{ ringloop->wakeup();
c_cli.accept_connections(listen_fd);
}
else
{
auto & cb = epoll_handlers[events[i].data.fd];
cb(events[i].data.fd, events[i].events);
}
}
if (nfds == MAX_EPOLL_EVENTS)
{
goto restart;
} }
} }

8
osd.h
View File

@ -12,6 +12,8 @@
#include <set> #include <set>
#include <deque> #include <deque>
#include <mutex>
#include <thread>
#include "blockstore.h" #include "blockstore.h"
#include "ringloop.h" #include "ringloop.h"
@ -114,6 +116,10 @@ class osd_t
int wait_state = 0; int wait_state = 0;
int epoll_fd = 0; int epoll_fd = 0;
int event_fd = 0;
std::thread *epoll_thread = NULL;
std::mutex epoll_mutex;
std::map<int, int> epoll_ready;
int listening_port = 0; int listening_port = 0;
int listen_fd = 0; int listen_fd = 0;
ring_consumer_t consumer; ring_consumer_t consumer;
@ -150,7 +156,7 @@ class osd_t
// event loop, socket read/write // event loop, socket read/write
void loop(); void loop();
void set_fd_handler(int fd, std::function<void(int, int)> handler); void set_fd_handler(int fd, std::function<void(int, int)> handler);
void handle_epoll_events(); void handle_eventfd();
// peer handling (primary OSD logic) // peer handling (primary OSD logic)
void parse_test_peer(std::string peer); void parse_test_peer(std::string peer);

View File

@ -1,29 +1,24 @@
#include <sys/timerfd.h> #include <sys/timerfd.h>
#include <sys/poll.h> #include <sys/poll.h>
#include <sys/epoll.h>
#include <unistd.h> #include <unistd.h>
#include <errno.h>
#include <string.h>
#include "timerfd_manager.h" #include "timerfd_manager.h"
timerfd_manager_t::timerfd_manager_t(std::function<void(int, std::function<void(int, int)>)> set_fd_handler) timerfd_manager_t::timerfd_manager_t(ring_loop_t *ringloop)
{ {
this->set_fd_handler = set_fd_handler;
wait_state = 0; wait_state = 0;
timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
if (timerfd < 0) if (timerfd < 0)
{ {
throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno)); throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno));
} }
set_fd_handler(timerfd, [this](int fd, int events) consumer.loop = [this]() { loop(); };
{ ringloop->register_consumer(&consumer);
handle_readable(); this->ringloop = ringloop;
});
} }
timerfd_manager_t::~timerfd_manager_t() timerfd_manager_t::~timerfd_manager_t()
{ {
set_fd_handler(timerfd, NULL); ringloop->unregister_consumer(&consumer);
close(timerfd); close(timerfd);
} }
@ -53,6 +48,7 @@ int timerfd_manager_t::set_timer(uint64_t millis, bool repeat, std::function<voi
}); });
inc_timer(timers[timers.size()-1]); inc_timer(timers[timers.size()-1]);
set_nearest(); set_nearest();
set_wait();
return timer_id; return timer_id;
} }
@ -73,6 +69,7 @@ void timerfd_manager_t::clear_timer(int timer_id)
nearest--; nearest--;
} }
set_nearest(); set_nearest();
set_wait();
break; break;
} }
} }
@ -157,3 +154,36 @@ void timerfd_manager_t::trigger_nearest()
cb(nearest_id); cb(nearest_id);
nearest = -1; nearest = -1;
} }
void timerfd_manager_t::loop()
{
if (!(wait_state & 1) && timers.size())
{
set_nearest();
}
set_wait();
}
void timerfd_manager_t::set_wait()
{
if ((wait_state & 3) == 1)
{
io_uring_sqe *sqe = ringloop->get_sqe();
if (!sqe)
{
return;
}
ring_data_t *data = ((ring_data_t*)sqe->user_data);
my_uring_prep_poll_add(sqe, timerfd, POLLIN);
data->callback = [this](ring_data_t *data)
{
if (data->res < 0)
{
throw std::runtime_error(std::string("waiting for timer failed: ") + strerror(-data->res));
}
handle_readable();
set_wait();
};
wait_state = 3;
}
}

View File

@ -1,8 +1,7 @@
#pragma once #pragma once
#include <time.h> #include <time.h>
#include <vector> #include "ringloop.h"
#include <functional>
struct timerfd_timer_t struct timerfd_timer_t
{ {
@ -20,15 +19,20 @@ class timerfd_manager_t
int nearest = -1; int nearest = -1;
int id = 1; int id = 1;
std::vector<timerfd_timer_t> timers; std::vector<timerfd_timer_t> timers;
ring_loop_t *ringloop;
ring_consumer_t consumer;
void inc_timer(timerfd_timer_t & t); void inc_timer(timerfd_timer_t & t);
void set_nearest(); void set_nearest();
void trigger_nearest(); void trigger_nearest();
void handle_readable(); void handle_readable();
void set_wait();
void loop();
public: public:
// FIXME shouldn't be here
std::function<void(int, std::function<void(int, int)>)> set_fd_handler; std::function<void(int, std::function<void(int, int)>)> set_fd_handler;
timerfd_manager_t(std::function<void(int, std::function<void(int, int)>)> set_fd_handler); timerfd_manager_t(ring_loop_t *ringloop);
~timerfd_manager_t(); ~timerfd_manager_t();
int set_timer(uint64_t millis, bool repeat, std::function<void(int)> callback); int set_timer(uint64_t millis, bool repeat, std::function<void(int)> callback);
void clear_timer(int timer_id); void clear_timer(int timer_id);