Remove io_uring usage from osd_http and timerfd_manager

For better future interoperability with external event loops such as QEMU's one
trace-sqes
Vitaliy Filippov 2020-05-21 01:23:43 +03:00
parent f57731f8ca
commit a61ede9951
5 changed files with 122 additions and 179 deletions

22
osd.cpp
View File

@ -43,7 +43,7 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo
throw std::runtime_error(std::string("epoll_create: ") + strerror(errno));
}
this->tfd = new timerfd_manager_t(ringloop);
this->tfd = new timerfd_manager_t([this](int fd, std::function<void(int, int)> handler) { set_fd_handler(fd, handler); });
this->tfd->set_timer(print_stats_interval*1000, true, [this](int timer_id)
{
print_stats();
@ -236,6 +236,26 @@ void osd_t::loop()
ringloop->submit();
}
void osd_t::set_fd_handler(int fd, std::function<void(int, int)> handler)
{
if (handler != NULL)
{
epoll_event ev;
ev.data.fd = fd;
ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0)
{
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
}
epoll_handlers[fd] = handler;
}
else
{
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL);
epoll_handlers.erase(fd);
}
}
void osd_t::handle_epoll_events()
{
io_uring_sqe *sqe = ringloop->get_sqe();

1
osd.h
View File

@ -327,6 +327,7 @@ class osd_t
// event loop, socket read/write
void loop();
void set_fd_handler(int fd, std::function<void(int, int)> handler);
void handle_epoll_events();
void read_requests();
void handle_read(ring_data_t *data, int peer_fd);

View File

@ -21,10 +21,7 @@ static bool ws_parse_frame(std::string & buf, int & type, std::string & res);
// FIXME: Use keepalive
struct http_co_t
{
ring_loop_t *ringloop;
timerfd_manager_t *tfd;
int epoll_fd;
std::map<int, std::function<void(int, int)>> *epoll_handlers;
int request_timeout = 0;
std::string host;
@ -40,12 +37,10 @@ struct http_co_t
int peer_fd = -1;
int timeout_id = -1;
int epoll_events = 0;
ring_data_t *send_data = NULL, *read_data = NULL;
int sent = 0;
std::vector<char> rbuf;
iovec read_iov, send_iov;
msghdr read_msg = { 0 }, send_msg = { 0 };
int waiting_read_sqe = 0, waiting_send_sqe = 0;
std::function<void(const http_response_t*)> callback;
@ -74,9 +69,6 @@ void osd_t::http_request(const std::string & host, const std::string & request,
const http_options_t & options, std::function<void(const http_response_t *response)> callback)
{
http_co_t *handler = new http_co_t();
handler->ringloop = ringloop;
handler->epoll_fd = epoll_fd;
handler->epoll_handlers = &epoll_handlers;
handler->request_timeout = options.timeout < 0 ? 0 : (options.timeout == 0 ? DEFAULT_TIMEOUT : options.timeout);
handler->want_streaming = options.want_streaming;
handler->tfd = tfd;
@ -124,9 +116,6 @@ websocket_t* osd_t::open_websocket(const std::string & host, const std::string &
"Sec-WebSocket-Version: 13\r\n"
"\r\n";
http_co_t *handler = new http_co_t();
handler->ringloop = ringloop;
handler->epoll_fd = epoll_fd;
handler->epoll_handlers = &epoll_handlers;
handler->request_timeout = timeout < 0 ? -1 : (timeout == 0 ? DEFAULT_TIMEOUT : timeout);
handler->want_streaming = false;
handler->tfd = tfd;
@ -155,28 +144,9 @@ http_co_t::~http_co_t()
tfd->clear_timer(timeout_id);
timeout_id = -1;
}
if (read_data)
{
// Ignore CQE result
read_data->callback = [](ring_data_t *data) {};
}
else if (waiting_read_sqe)
{
ringloop->cancel_wait_sqe(waiting_read_sqe);
}
if (send_data)
{
// Ignore CQE result
send_data->callback = [](ring_data_t *data) {};
}
else if (waiting_send_sqe)
{
ringloop->cancel_wait_sqe(waiting_send_sqe);
}
if (peer_fd >= 0)
{
epoll_handlers->erase(peer_fd);
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, peer_fd, NULL);
tfd->set_fd_handler(peer_fd, NULL);
close(peer_fd);
peer_fd = -1;
}
@ -231,7 +201,7 @@ void http_co_t::start_connection()
delete this;
});
}
(*epoll_handlers)[peer_fd] = [this](int peer_fd, int epoll_events)
tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events)
{
this->epoll_events |= epoll_events;
if (state == HTTP_CO_CONNECTING)
@ -249,17 +219,7 @@ void http_co_t::start_connection()
delete this;
}
}
};
// Add FD to epoll (EPOLLOUT for tracking connect() result)
epoll_event ev;
ev.data.fd = peer_fd;
ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0)
{
parsed.error_code = errno;
delete this;
return;
}
});
epoll_events = 0;
// Finally call connect
r = ::connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
@ -301,96 +261,83 @@ void http_co_t::handle_connect_result()
void http_co_t::submit_read()
{
if (!read_data && !waiting_read_sqe)
int res;
again:
if (rbuf.size() != READ_BUFFER_SIZE)
{
if (rbuf.size() != READ_BUFFER_SIZE)
{
rbuf.resize(READ_BUFFER_SIZE);
}
io_uring_sqe *sqe = ringloop->get_sqe();
if (!sqe)
{
waiting_read_sqe = ringloop->wait_sqe([this]() { waiting_read_sqe = 0; submit_read(); });
return;
}
read_data = ((ring_data_t*)sqe->user_data);
read_iov = { .iov_base = rbuf.data(), .iov_len = READ_BUFFER_SIZE };
read_msg.msg_iov = &read_iov;
read_msg.msg_iovlen = 1;
epoll_events = epoll_events & ~EPOLLIN;
read_data->callback = [this](ring_data_t *data)
{
read_data = NULL;
if (data->res == -EAGAIN)
{
data->res = 0;
}
if (data->res < 0)
{
delete this;
return;
}
response += std::string(rbuf.data(), data->res);
if (data->res == READ_BUFFER_SIZE)
{
submit_read();
}
if (!handle_read())
{
return;
}
if (data->res < READ_BUFFER_SIZE && (epoll_events & (EPOLLRDHUP|EPOLLERR)))
{
delete this;
return;
}
};
my_uring_prep_recvmsg(sqe, peer_fd, &read_msg, 0);
rbuf.resize(READ_BUFFER_SIZE);
}
read_iov = { .iov_base = rbuf.data(), .iov_len = READ_BUFFER_SIZE };
read_msg.msg_iov = &read_iov;
read_msg.msg_iovlen = 1;
epoll_events = epoll_events & ~EPOLLIN;
res = recvmsg(peer_fd, &read_msg, 0);
if (res < 0)
{
res = -errno;
}
if (res == -EAGAIN)
{
res = 0;
}
if (res < 0)
{
delete this;
return;
}
response += std::string(rbuf.data(), res);
if (res == READ_BUFFER_SIZE)
{
goto again;
}
if (!handle_read())
{
return;
}
if (res < READ_BUFFER_SIZE && (epoll_events & (EPOLLRDHUP|EPOLLERR)))
{
delete this;
return;
}
}
void http_co_t::submit_send()
{
if (sent < request.size() && !send_data && !waiting_send_sqe)
int res;
again:
if (sent < request.size())
{
io_uring_sqe *sqe = ringloop->get_sqe();
if (!sqe)
{
waiting_send_sqe = ringloop->wait_sqe([this]() { waiting_send_sqe = 0; submit_send(); });
return;
}
send_data = ((ring_data_t*)sqe->user_data);
send_iov = (iovec){ .iov_base = (void*)(request.c_str()+sent), .iov_len = request.size()-sent };
send_msg.msg_iov = &send_iov;
send_msg.msg_iovlen = 1;
send_data->callback = [this](ring_data_t *data)
res = sendmsg(peer_fd, &send_msg, 0);
if (res < 0)
{
send_data = NULL;
if (data->res == -EAGAIN)
{
data->res = 0;
}
else if (data->res < 0)
{
delete this;
return;
}
sent += data->res;
if (state == HTTP_CO_SENDING_REQUEST)
{
if (sent >= request.size())
state = HTTP_CO_REQUEST_SENT;
else
submit_send();
}
else if (state == HTTP_CO_WEBSOCKET)
{
request = request.substr(sent);
sent = 0;
submit_send();
}
};
my_uring_prep_sendmsg(sqe, peer_fd, &send_msg, 0);
res = -errno;
}
if (res == -EAGAIN)
{
res = 0;
}
else if (res < 0)
{
delete this;
return;
}
sent += res;
if (state == HTTP_CO_SENDING_REQUEST)
{
if (sent >= request.size())
state = HTTP_CO_REQUEST_SENT;
else
goto again;
}
else if (state == HTTP_CO_WEBSOCKET)
{
request = request.substr(sent);
sent = 0;
goto again;
}
}
}

View File

@ -1,24 +1,29 @@
#include <sys/timerfd.h>
#include <sys/poll.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include "timerfd_manager.h"
timerfd_manager_t::timerfd_manager_t(ring_loop_t *ringloop)
timerfd_manager_t::timerfd_manager_t(std::function<void(int, std::function<void(int, int)>)> set_fd_handler)
{
this->set_fd_handler = set_fd_handler;
wait_state = 0;
timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
if (timerfd < 0)
{
throw std::runtime_error(std::string("timerfd_create: ") + strerror(errno));
}
consumer.loop = [this]() { loop(); };
ringloop->register_consumer(&consumer);
this->ringloop = ringloop;
set_fd_handler(timerfd, [this](int fd, int events)
{
handle_readable();
});
}
timerfd_manager_t::~timerfd_manager_t()
{
ringloop->unregister_consumer(&consumer);
set_fd_handler(timerfd, NULL);
close(timerfd);
}
@ -48,7 +53,6 @@ int timerfd_manager_t::set_timer(uint64_t millis, bool repeat, std::function<voi
});
inc_timer(timers[timers.size()-1]);
set_nearest();
set_wait();
return timer_id;
}
@ -69,7 +73,6 @@ void timerfd_manager_t::clear_timer(int timer_id)
nearest--;
}
set_nearest();
set_wait();
break;
}
}
@ -120,53 +123,25 @@ void timerfd_manager_t::set_nearest()
}
}
void timerfd_manager_t::loop()
void timerfd_manager_t::handle_readable()
{
if (!(wait_state & 1) && timers.size())
uint64_t n;
size_t res = read(timerfd, &n, 8);
if (res == 8 && nearest >= 0)
{
set_nearest();
}
set_wait();
}
void timerfd_manager_t::set_wait()
{
if ((wait_state & 3) == 1)
{
io_uring_sqe *sqe = ringloop->get_sqe();
if (!sqe)
int nearest_id = timers[nearest].id;
auto cb = timers[nearest].callback;
if (timers[nearest].repeat)
{
return;
inc_timer(timers[nearest]);
}
ring_data_t *data = ((ring_data_t*)sqe->user_data);
my_uring_prep_poll_add(sqe, timerfd, POLLIN);
data->callback = [this](ring_data_t *data)
else
{
if (data->res < 0)
{
throw std::runtime_error(std::string("waiting for timer failed: ") + strerror(-data->res));
}
uint64_t n;
read(timerfd, &n, 8);
if (nearest >= 0)
{
int nearest_id = timers[nearest].id;
auto cb = timers[nearest].callback;
if (timers[nearest].repeat)
{
inc_timer(timers[nearest]);
}
else
{
timers.erase(timers.begin()+nearest, timers.begin()+nearest+1);
}
cb(nearest_id);
nearest = -1;
}
wait_state = 0;
set_nearest();
set_wait();
};
wait_state = 3;
timers.erase(timers.begin()+nearest, timers.begin()+nearest+1);
}
cb(nearest_id);
nearest = -1;
}
wait_state = 0;
set_nearest();
}

View File

@ -1,7 +1,8 @@
#pragma once
#include <time.h>
#include "ringloop.h"
#include <vector>
#include <functional>
struct timerfd_timer_t
{
@ -19,16 +20,15 @@ class timerfd_manager_t
int nearest = -1;
int id = 1;
std::vector<timerfd_timer_t> timers;
ring_loop_t *ringloop;
ring_consumer_t consumer;
void inc_timer(timerfd_timer_t & t);
void set_nearest();
void set_wait();
void loop();
public:
timerfd_manager_t(ring_loop_t *ringloop);
std::function<void(int, std::function<void(int, int)>)> set_fd_handler;
timerfd_manager_t(std::function<void(int, std::function<void(int, int)>)> set_fd_handler);
~timerfd_manager_t();
int set_timer(uint64_t millis, bool repeat, std::function<void(int)> callback);
void clear_timer(int timer_id);
void handle_readable();
};