Slightly cleanup socket send/receive code

sync-io-test
Vitaliy Filippov 2020-05-31 15:03:27 +03:00
parent b466e215f0
commit 5feff1ffb9
6 changed files with 115 additions and 112 deletions

View File

@ -199,10 +199,10 @@ struct cluster_client_t
bool try_send(osd_client_t & cl);
void send_replies();
void handle_send(ring_data_t *data, int peer_fd);
void handle_send(int result, int peer_fd);
void read_requests();
void handle_read(ring_data_t *data, int peer_fd);
bool handle_read(int result, int peer_fd);
void handle_finished_read(osd_client_t & cl);
void handle_op_hdr(osd_client_t *cl);
void handle_reply_hdr(osd_client_t *cl);

View File

@ -52,6 +52,7 @@ struct http_co_t
~http_co_t();
void start_connection();
void handle_events();
void handle_connect_result();
void submit_read();
void submit_send();
@ -142,6 +143,7 @@ void websocket_t::close()
http_co_t::~http_co_t()
{
epoll_events = 0;
if (timeout_id >= 0)
{
tfd->clear_timer(timeout_id);
@ -204,25 +206,6 @@ void http_co_t::start_connection()
delete this;
});
}
tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events)
{
this->epoll_events |= epoll_events;
if (state == HTTP_CO_CONNECTING)
{
handle_connect_result();
}
else
{
if (this->epoll_events & EPOLLIN)
{
submit_read();
}
else if (this->epoll_events & (EPOLLRDHUP|EPOLLERR))
{
delete this;
}
}
});
epoll_events = 0;
// Finally call connect
r = ::connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
@ -232,40 +215,61 @@ void http_co_t::start_connection()
delete this;
return;
}
tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events)
{
this->epoll_events |= epoll_events;
handle_events();
});
state = HTTP_CO_CONNECTING;
}
void http_co_t::handle_events()
{
while (epoll_events)
{
if (state == HTTP_CO_CONNECTING)
{
handle_connect_result();
}
else
{
epoll_events &= ~EPOLLOUT;
if (epoll_events & EPOLLIN)
{
submit_read();
}
else if (epoll_events & (EPOLLRDHUP|EPOLLERR))
{
delete this;
return;
}
}
}
}
void http_co_t::handle_connect_result()
{
if (epoll_events & (EPOLLOUT | EPOLLERR))
int result = 0;
socklen_t result_len = sizeof(result);
if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0)
{
int result = 0;
socklen_t result_len = sizeof(result);
if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0)
{
result = errno;
}
if (result != 0)
{
parsed.error_code = result;
delete this;
return;
}
int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
state = HTTP_CO_SENDING_REQUEST;
submit_send();
result = errno;
}
else
if (result != 0)
{
parsed.error_code = result;
delete this;
return;
}
int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
state = HTTP_CO_SENDING_REQUEST;
submit_send();
}
void http_co_t::submit_read()
{
int res;
again:
if (rbuf.size() != READ_BUFFER_SIZE)
{
rbuf.resize(READ_BUFFER_SIZE);
@ -273,34 +277,24 @@ again:
read_iov = { .iov_base = rbuf.data(), .iov_len = READ_BUFFER_SIZE };
read_msg.msg_iov = &read_iov;
read_msg.msg_iovlen = 1;
epoll_events = epoll_events & ~EPOLLIN;
res = recvmsg(peer_fd, &read_msg, 0);
if (res < 0)
{
res = -errno;
}
if (res == -EAGAIN)
if (res == -EAGAIN || res == 0)
{
res = 0;
epoll_events = epoll_events & ~EPOLLIN;
}
if (res < 0)
else if (res < 0)
{
delete this;
return;
}
response += std::string(rbuf.data(), res);
if (res == READ_BUFFER_SIZE)
else if (res > 0)
{
goto again;
}
if (!handle_read())
{
return;
}
if (res < READ_BUFFER_SIZE && (epoll_events & (EPOLLRDHUP|EPOLLERR)))
{
delete this;
return;
response += std::string(rbuf.data(), res);
handle_read();
}
}
@ -331,7 +325,9 @@ again:
if (state == HTTP_CO_SENDING_REQUEST)
{
if (sent >= request.size())
{
state = HTTP_CO_REQUEST_SENT;
}
else
goto again;
}

69
osd.cpp
View File

@ -284,38 +284,7 @@ restart:
{
if (events[i].data.fd == listen_fd)
{
// Accept new connections
sockaddr_in addr;
socklen_t peer_addr_size = sizeof(addr);
int peer_fd;
while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0)
{
assert(peer_fd != 0);
char peer_str[256];
printf("[OSD %lu] new client %d: connection from %s port %d\n", this->osd_num, peer_fd,
inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port));
fcntl(peer_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
c_cli.clients[peer_fd] = {
.peer_addr = addr,
.peer_port = ntohs(addr.sin_port),
.peer_fd = peer_fd,
.peer_state = PEER_CONNECTED,
.in_buf = malloc(c_cli.receive_buffer_size),
};
// Add FD to epoll
set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events)
{
c_cli.handle_peer_epoll(peer_fd, epoll_events);
});
// Try to accept next connection
peer_addr_size = sizeof(addr);
}
if (peer_fd == -1 && errno != EAGAIN)
{
throw std::runtime_error(std::string("accept: ") + strerror(errno));
}
accept_connections();
}
else
{
@ -329,6 +298,42 @@ restart:
}
}
void osd_t::accept_connections()
{
// Accept new connections
sockaddr_in addr;
socklen_t peer_addr_size = sizeof(addr);
int peer_fd;
while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0)
{
assert(peer_fd != 0);
char peer_str[256];
printf("[OSD %lu] new client %d: connection from %s port %d\n", this->osd_num, peer_fd,
inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port));
fcntl(peer_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
c_cli.clients[peer_fd] = {
.peer_addr = addr,
.peer_port = ntohs(addr.sin_port),
.peer_fd = peer_fd,
.peer_state = PEER_CONNECTED,
.in_buf = malloc(c_cli.receive_buffer_size),
};
// Add FD to epoll
set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events)
{
c_cli.handle_peer_epoll(peer_fd, epoll_events);
});
// Try to accept next connection
peer_addr_size = sizeof(addr);
}
if (peer_fd == -1 && errno != EAGAIN)
{
throw std::runtime_error(std::string("accept: ") + strerror(errno));
}
}
void osd_t::exec_op(osd_op_t *cur_op)
{
clock_gettime(CLOCK_REALTIME, &cur_op->tv_begin);

1
osd.h
View File

@ -149,6 +149,7 @@ class osd_t
// event loop, socket read/write
void loop();
void accept_connections();
void set_fd_handler(int fd, std::function<void(int, int)> handler);
void handle_epoll_events();

View File

@ -25,26 +25,26 @@ void cluster_client_t::read_requests()
}
cl.read_msg.msg_iov = &cl.read_iov;
cl.read_msg.msg_iovlen = 1;
data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data, peer_fd); };
data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data->res, peer_fd); };
my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0);
}
read_ready_clients.clear();
}
void cluster_client_t::handle_read(ring_data_t *data, int peer_fd)
bool cluster_client_t::handle_read(int result, int peer_fd)
{
auto cl_it = clients.find(peer_fd);
if (cl_it != clients.end())
{
auto & cl = cl_it->second;
if (data->res < 0 && data->res != -EAGAIN)
if (result < 0 && result != -EAGAIN)
{
// this is a client socket, so don't panic. just disconnect it
printf("Client %d socket read error: %d (%s). Disconnecting client\n", peer_fd, -data->res, strerror(-data->res));
printf("Client %d socket read error: %d (%s). Disconnecting client\n", peer_fd, -result, strerror(-result));
stop_client(peer_fd);
return;
return false;
}
if (data->res == -EAGAIN || cl.read_iov.iov_base == cl.in_buf && data->res < receive_buffer_size)
if (result == -EAGAIN || result < cl.read_iov.iov_len)
{
cl.read_ready--;
if (cl.read_ready > 0)
@ -54,16 +54,12 @@ void cluster_client_t::handle_read(ring_data_t *data, int peer_fd)
{
read_ready_clients.push_back(peer_fd);
}
if (data->res == -EAGAIN)
{
return;
}
if (data->res > 0)
if (result > 0)
{
if (cl.read_iov.iov_base == cl.in_buf)
{
// Compose operation(s) from the buffer
int remain = data->res;
int remain = result;
void *curbuf = cl.in_buf;
while (remain > 0)
{
@ -99,15 +95,20 @@ void cluster_client_t::handle_read(ring_data_t *data, int peer_fd)
else
{
// Long data
cl.read_remaining -= data->res;
cl.read_buf += data->res;
cl.read_remaining -= result;
cl.read_buf += result;
if (cl.read_remaining <= 0)
{
handle_finished_read(cl);
}
}
if (result >= cl.read_iov.iov_len)
{
return true;
}
}
}
return false;
}
void cluster_client_t::handle_finished_read(osd_client_t & cl)

View File

@ -65,7 +65,7 @@ bool cluster_client_t::try_send(osd_client_t & cl)
}
cl.write_msg.msg_iov = cl.write_op->send_list.get_iovec();
cl.write_msg.msg_iovlen = cl.write_op->send_list.get_size();
data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data, peer_fd); };
data->callback = [this, peer_fd](ring_data_t *data) { handle_send(data->res, peer_fd); };
my_uring_prep_sendmsg(sqe, peer_fd, &cl.write_msg, 0);
return true;
}
@ -84,34 +84,34 @@ void cluster_client_t::send_replies()
write_ready_clients.clear();
}
void cluster_client_t::handle_send(ring_data_t *data, int peer_fd)
void cluster_client_t::handle_send(int result, int peer_fd)
{
auto cl_it = clients.find(peer_fd);
if (cl_it != clients.end())
{
auto & cl = cl_it->second;
if (data->res < 0 && data->res != -EAGAIN)
if (result < 0 && result != -EAGAIN)
{
// this is a client socket, so don't panic. just disconnect it
printf("Client %d socket write error: %d (%s). Disconnecting client\n", peer_fd, -data->res, strerror(-data->res));
printf("Client %d socket write error: %d (%s). Disconnecting client\n", peer_fd, -result, strerror(-result));
stop_client(peer_fd);
return;
}
if (data->res >= 0)
if (result >= 0)
{
osd_op_t *cur_op = cl.write_op;
while (data->res > 0 && cur_op->send_list.sent < cur_op->send_list.count)
while (result > 0 && cur_op->send_list.sent < cur_op->send_list.count)
{
iovec & iov = cur_op->send_list.buf[cur_op->send_list.sent];
if (iov.iov_len <= data->res)
if (iov.iov_len <= result)
{
data->res -= iov.iov_len;
result -= iov.iov_len;
cur_op->send_list.sent++;
}
else
{
iov.iov_len -= data->res;
iov.iov_base += data->res;
iov.iov_len -= result;
iov.iov_base += result;
break;
}
}