Fix "address already in use" in test example, begin client read loop

blocking-uring-test
Vitaliy Filippov 2019-12-13 14:05:11 +03:00
parent 6239f6748a
commit 283d03ef18
3 changed files with 103 additions and 26 deletions

112
osd.cpp
View File

@ -12,6 +12,12 @@ struct osd_client_t
socklen_t peer_addr_size;
int peer_fd;
bool ready;
bool reading;
iovec iov;
msghdr msg;
void *cur_buf = NULL;
int cur_len = 0;
};
class osd_t
@ -44,6 +50,8 @@ osd_t::osd_t(ring_loop_t *ringloop)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
}
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
sockaddr_in addr;
if ((int r = inet_pton(AF_INET, bind_address.c_str(), &addr.sin_addr)) != 1)
@ -52,7 +60,7 @@ osd_t::osd_t(ring_loop_t *ringloop)
throw std::runtime_error("bind address "+bind_address+(r == 0 ? " is not valid" : ": no ipv4 support"));
}
addr.sin_family = AF_INET;
addr.sin_port = bind_port;
addr.sin_port = htons(bind_port);
if (bind(listen_fd, &addr, sizeof(addr)) < 0)
{
@ -142,6 +150,7 @@ int osd_t::handle_epoll_events()
int peer_fd;
while ((peer_fd = accept(listen_fd, &addr, &peer_addr_size)) >= 0)
{
fcntl(peer_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
clients[peer_fd] = {
.peer_addr = addr,
.peer_addr_size = peer_addr_size,
@ -170,32 +179,14 @@ int osd_t::handle_epoll_events()
if (events[i].events & EPOLLHUP)
{
// Stop client
struct epoll_event ev;
ev.data.fd = cl.peer_fd;
ev.events = EPOLLIN | EPOLLHUP;
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, cl.peer_fd, &ev) < 0)
{
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
}
auto it = clients.find(cl.peer_fd);
clients.erase(it);
if (cl.ready)
{
for (auto it = ready_clients.begin(); it != ready_clients.end(); it++)
{
if (*it == cl.peer_fd)
{
ready_clients.erase(it);
break;
}
}
}
stop_client(cl.peer_fd);
}
else if (!cl.ready)
{
// Mark client as ready (i.e. some commands are available)
// Mark client as ready (i.e. some data is available)
cl.ready = true;
ready_clients.push_back(cl.peer_fd);
if (!cl.reading)
ready_clients.push_back(cl.peer_fd);
}
}
count++;
@ -203,3 +194,78 @@ int osd_t::handle_epoll_events()
}
return count;
}
void osd_t::stop_client(int peer_fd)
{
struct epoll_event ev;
ev.data.fd = peer_fd;
ev.events = EPOLLIN | EPOLLHUP;
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, peer_fd, &ev) < 0)
{
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
}
auto it = clients.find(peer_fd);
clients.erase(it);
if (cl.ready)
{
for (auto it = ready_clients.begin(); it != ready_clients.end(); it++)
{
if (*it == peer_fd)
{
ready_clients.erase(it);
break;
}
}
}
close(peer_fd);
}
void osd_t::read_commands()
{
for (int i = 0; i < ready_clients.size(); i++)
{
int peer_fd = ready_clients[i];
auto & cl = clients[peer_fd];
if (!cl.cur_buf)
{
// no reads in progress, this is probably a new command
cl.cur_buf = cl.command_buffer;
cl.cur_len = OSD_OP_PACKET_SIZE;
}
struct io_uring_sqe* sqe = ringloop->get_sqe();
if (!sqe)
{
ready_clients.erase(ready_clients.begin(), ready_clients().begin() + i);
return;
}
struct ring_data_t* data = ((ring_data_t*)sqe->user_data);
cl.iov.iov_base = cl.cur_buf;
cl.iov.iov_len = cl.cur_len;
cl.msg.msg_iov = &cl.iov;
cl.msg.msg_iovlen = 1;
data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data, peer_fd); };
my_uring_prep_recvmsg(sqe, peer_fd, &cl.msg, 0);
ringloop->submit();
cl.reading = true;
}
ready_clients.clear();
}
void osd_t::handle_read(ring_data_t *data, int peer_fd)
{
auto cl = clients.find(peer_fd);
if (cl != clients.end())
{
if (data->res < 0 && data->res != -EAGAIN)
{
// this is a client socket, so don't panic. just disconnect it
printf("Client %d socket read error: %d (%s). Disconnecting client\n", peer_fd, -data->res, strerror(-data->res));
stop_client(peer_fd);
return;
}
cl->reading = false;
if (cl->ready)
ready_clients.push_back(peer_fd);
}
}

View File

@ -127,6 +127,7 @@ public:
~ring_loop_t();
inline struct io_uring_sqe* get_sqe()
{
// FIXME: Limit inflight ops count to not overflow the completion ring
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
if (sqe)
{

View File

@ -239,12 +239,19 @@ int main02(int argc, char *argv[])
int main(int argc, char *argv[])
{
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
int listen_fd = socket(AF_INET, SOCK_STREAM, 0), enable = 1;
assert(listen_fd >= 0);
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
struct sockaddr_in bind_addr;
assert(inet_pton(AF_INET, "0.0.0.0", &bind_addr.sin_addr) == 1);
bind_addr.sin_family = AF_INET;
bind_addr.sin_port = 13892;
assert(bind(listen_fd, (sockaddr*)&bind_addr, sizeof(bind_addr)) == 0);
bind_addr.sin_port = htons(13892);
int r = bind(listen_fd, (sockaddr*)&bind_addr, sizeof(bind_addr));
if (r)
{
perror("bind");
return 1;
}
assert(listen(listen_fd, 128) == 0);
struct sockaddr_in peer_addr;
socklen_t peer_addr_size = sizeof(peer_addr);
@ -271,6 +278,9 @@ int main(int argc, char *argv[])
printf("cqe result: %d\n", ret);
// ok, io_uring's sendmsg always reads as much data as is available and finishes
io_uring_cqe_seen(&ring, cqe);
close(peer_fd);
close(listen_fd);
io_uring_queue_exit(&ring);
return 0;
}