Use a separate thread for epoll

The simplest, though highly inefficient, way to test epoll with openonload
openonload
Vitaliy Filippov 2020-05-30 10:24:50 +03:00
parent 2498e504c2
commit 85010fed38
3 changed files with 71 additions and 31 deletions
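
For context: the patch takes epoll out of the io_uring event loop entirely. A dedicated thread blocks in epoll_wait(), accumulates readiness per fd under a mutex, and wakes the main loop through an eventfd that the loop watches via io_uring. A rough standalone sketch of that pattern follows; the names epoll_ready, epoll_mutex and event_fd mirror the patch, while everything else (epoll_forwarder, the fixed 64-event batch) is purely illustrative.

// Rough sketch of the "separate epoll thread + eventfd wakeup" pattern.
// Member names mirror the patch where possible; the rest is illustrative.
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <thread>
#include <mutex>
#include <map>
#include <cstdint>

struct epoll_forwarder
{
    int epoll_fd = epoll_create1(0);
    int event_fd = eventfd(0, EFD_NONBLOCK);  // the io_uring loop polls this fd
    std::mutex epoll_mutex;
    std::map<int, int> epoll_ready;           // fd -> accumulated epoll event mask
    std::thread thr;

    void start()
    {
        thr = std::thread([this]()
        {
            epoll_event events[64];
            while (true)
            {
                // Block indefinitely; the main loop never calls epoll_wait() itself
                int nfds = epoll_wait(epoll_fd, events, 64, -1);
                std::lock_guard<std::mutex> guard(epoll_mutex);
                for (int i = 0; i < nfds; i++)
                    epoll_ready[events[i].data.fd] |= events[i].events;
                // Wake the main loop: it polls event_fd, drains epoll_ready
                // under the same mutex and runs the per-fd handlers
                uint64_t one = 1;
                write(event_fd, &one, sizeof(one));
            }
        });
    }
};

With this split, the io_uring loop only ever has to watch a single descriptor (event_fd) instead of multiplexing epoll itself.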

Makefile

@@ -18,7 +18,7 @@ OSD_OBJS := osd.o osd_secondary.o msgr_receive.o msgr_send.o osd_peering.o osd_f
 	osd_primary.o osd_primary_subops.o etcd_state_client.o messenger.o osd_cluster.o http_client.o pg_states.o \
 	osd_rmw.o json11.o base64.o timerfd_manager.o
 osd: ./libblockstore.so osd_main.cpp osd.h osd_ops.h $(OSD_OBJS)
-	g++ $(CXXFLAGS) -o $@ osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring
+	g++ $(CXXFLAGS) -o $@ osd_main.cpp $(OSD_OBJS) ./libblockstore.so -ltcmalloc_minimal -luring -lpthread
 stub_osd: stub_osd.o rw_blocking.o
 	g++ $(CXXFLAGS) -o $@ stub_osd.o rw_blocking.o -ltcmalloc_minimal

osd.cpp

@@ -1,5 +1,6 @@
 #include <sys/socket.h>
 #include <sys/epoll.h>
+#include <sys/eventfd.h>
 #include <sys/poll.h>
 #include <netinet/in.h>
 #include <netinet/tcp.h>
@@ -43,6 +44,11 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
     {
         throw std::runtime_error(std::string("epoll_create: ") + strerror(errno));
     }
+    event_fd = eventfd(0, EFD_NONBLOCK);
+    if (event_fd < 0)
+    {
+        throw std::runtime_error(std::string("eventfd: ") + strerror(errno));
+    }
     this->tfd = new timerfd_manager_t(ringloop);
     this->tfd->set_fd_handler = [this](int fd, std::function<void(int, int)> handler) { set_fd_handler(fd, handler); };
@@ -60,17 +66,40 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
     consumer.loop = [this]() { loop(); };
     ringloop->register_consumer(&consumer);
+
+    epoll_thread = new std::thread([this]()
+    {
+        int nfds;
+        epoll_event events[MAX_EPOLL_EVENTS];
+        while (1)
+        {
+            nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, -1);
+            {
+                std::lock_guard<std::mutex> guard(epoll_mutex);
+                for (int i = 0; i < nfds; i++)
+                {
+                    int fd = events[i].data.fd;
+                    int ev = events[i].events;
+                    epoll_ready[fd] |= ev;
+                }
+                uint64_t n = 1;
+                write(event_fd, &n, 8);
+            }
+        }
+    });
 }
 
 osd_t::~osd_t()
 {
+    close(epoll_fd);
+    epoll_thread->join();
+    delete epoll_thread;
     if (tfd)
     {
         delete tfd;
         tfd = NULL;
     }
     ringloop->unregister_consumer(&consumer);
-    close(epoll_fd);
+    close(event_fd);
     close(listen_fd);
 }
@@ -189,8 +218,13 @@ void osd_t::bind_socket()
     {
         close(listen_fd);
         close(epoll_fd);
-        throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
+        throw std::runtime_error(std::string("epoll_ctl (add listen_fd): ") + strerror(errno));
     }
+
+    epoll_handlers[listen_fd] = [this](int peer_fd, int epoll_events)
+    {
+        c_cli.accept_connections(listen_fd);
+    };
 }
 
 bool osd_t::shutdown()
@@ -205,10 +239,23 @@ bool osd_t::shutdown()
 void osd_t::loop()
 {
-    if (!wait_state)
+    std::map<int,int> cur_epoll;
     {
-        handle_epoll_events();
-        wait_state = 1;
+        std::lock_guard<std::mutex> guard(epoll_mutex);
+        cur_epoll.swap(epoll_ready);
+    }
+    for (auto p: cur_epoll)
+    {
+        auto cb_it = epoll_handlers.find(p.first);
+        if (cb_it != epoll_handlers.end())
+        {
+            cb_it->second(p.first, p.second);
+        }
+    }
+    if (!(wait_state & 2))
+    {
+        handle_eventfd();
+        wait_state = wait_state | 2;
     }
     handle_peers();
     c_cli.read_requests();
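
Note on the drain above: loop() swaps the shared epoll_ready map into a local one while holding epoll_mutex and only then runs the handlers, so callbacks never execute under the lock and the epoll thread is blocked for no longer than an O(1) swap. A minimal illustration of that swap-under-lock idiom (hypothetical helper, not part of the patch):

// Hypothetical helper showing the drain idiom used in loop():
// take the ready set atomically, dispatch outside the lock.
#include <map>
#include <mutex>

std::map<int, int> take_ready(std::mutex & epoll_mutex, std::map<int, int> & epoll_ready)
{
    std::map<int, int> cur;
    std::lock_guard<std::mutex> guard(epoll_mutex);
    cur.swap(epoll_ready);  // leaves the shared map empty for the epoll thread
    return cur;
}
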
@@ -226,7 +273,7 @@ void osd_t::set_fd_handler(int fd, std::function<void(int, int)> handler)
         ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET;
         if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0)
         {
-            throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
+            throw std::runtime_error(std::string(exists ? "epoll_ctl (mod fd): " : "epoll_ctl (add fd): ") + strerror(errno));
         }
         epoll_handlers[fd] = handler;
     }
@@ -234,49 +281,36 @@ void osd_t::set_fd_handler(int fd, std::function<void(int, int)> handler)
     {
         if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0 && errno != ENOENT)
         {
-            throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
+            throw std::runtime_error(std::string("epoll_ctl (remove fd): ") + strerror(errno));
         }
         epoll_handlers.erase(fd);
     }
 }
 
-void osd_t::handle_epoll_events()
+void osd_t::handle_eventfd()
 {
     io_uring_sqe *sqe = ringloop->get_sqe();
     if (!sqe)
     {
-        throw std::runtime_error("can't get SQE, will fall out of sync with EPOLLET");
+        throw std::runtime_error("can't get SQE, will fall out of sync with eventfd");
     }
     ring_data_t *data = ((ring_data_t*)sqe->user_data);
-    my_uring_prep_poll_add(sqe, epoll_fd, POLLIN);
+    my_uring_prep_poll_add(sqe, event_fd, POLLIN);
     data->callback = [this](ring_data_t *data)
     {
         if (data->res < 0)
        {
             throw std::runtime_error(std::string("epoll failed: ") + strerror(-data->res));
         }
-        handle_epoll_events();
+        handle_eventfd();
     };
     ringloop->submit();
-    int nfds;
-    epoll_event events[MAX_EPOLL_EVENTS];
-restart:
-    nfds = epoll_wait(epoll_fd, events, MAX_EPOLL_EVENTS, 0);
-    for (int i = 0; i < nfds; i++)
-    {
-        if (events[i].data.fd == listen_fd)
-        {
-            c_cli.accept_connections(listen_fd);
-        }
-        else
-        {
-            auto & cb = epoll_handlers[events[i].data.fd];
-            cb(events[i].data.fd, events[i].events);
-        }
-    }
-    if (nfds == MAX_EPOLL_EVENTS)
-    {
-        goto restart;
-    }
+    uint64_t n = 0;
+    size_t res = read(event_fd, &n, 8);
+    if (res == 8)
+    {
+        // No need to do anything, the loop has already woken up
+        ringloop->wakeup();
+    }
 }
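
my_uring_prep_poll_add() and ring_loop_t are vitastor internals; assuming they are thin wrappers over liburing, the re-arming one-shot poll on event_fd in handle_eventfd() corresponds roughly to the following plain-liburing sketch (all names and the helper structure here are illustrative assumptions, not the project's API):

// Sketch of the same re-arming eventfd poll using plain liburing.
// io_uring poll requests are one-shot, so each completion re-arms the watch,
// mirroring how handle_eventfd() re-submits itself from its own callback.
#include <liburing.h>
#include <sys/eventfd.h>
#include <poll.h>
#include <unistd.h>
#include <cstdint>
#include <stdexcept>

static void arm_eventfd_poll(io_uring *ring, int event_fd)
{
    io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe)
        throw std::runtime_error("SQ ring is full");
    io_uring_prep_poll_add(sqe, event_fd, POLLIN);
    io_uring_sqe_set_data(sqe, (void*)(uintptr_t)event_fd);
    io_uring_submit(ring);
}

static void on_eventfd_cqe(io_uring *ring, io_uring_cqe *cqe, int event_fd)
{
    if (cqe->res < 0)
        throw std::runtime_error("poll on eventfd failed");
    uint64_t n = 0;
    // Drain the counter so the next write() from the epoll thread
    // makes the fd readable again
    read(event_fd, &n, sizeof(n));
    io_uring_cqe_seen(ring, cqe);
    arm_eventfd_poll(ring, event_fd);  // re-arm for the next wakeup
}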

osd.h

@@ -12,6 +12,8 @@
 #include <set>
 #include <deque>
+#include <mutex>
+#include <thread>
 
 #include "blockstore.h"
 #include "ringloop.h"
@@ -114,6 +116,10 @@ class osd_t
     int wait_state = 0;
     int epoll_fd = 0;
+    int event_fd = 0;
+    std::thread *epoll_thread = NULL;
+    std::mutex epoll_mutex;
+    std::map<int, int> epoll_ready;
     int listening_port = 0;
     int listen_fd = 0;
     ring_consumer_t consumer;
@@ -150,7 +156,7 @@ class osd_t
     // event loop, socket read/write
     void loop();
     void set_fd_handler(int fd, std::function<void(int, int)> handler);
-    void handle_epoll_events();
+    void handle_eventfd();
 
     // peer handling (primary OSD logic)
     void parse_test_peer(std::string peer);