diff --git a/osd.cpp b/osd.cpp index 784912516..5b7615cec 100644 --- a/osd.cpp +++ b/osd.cpp @@ -196,7 +196,6 @@ restart: { char peer_str[256]; printf("osd: new client %d: connection from %s port %d\n", peer_fd, inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port)); - fcntl(peer_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK); int one = 1; setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); clients[peer_fd] = { @@ -208,7 +207,7 @@ restart: // Add FD to epoll epoll_event ev; ev.data.fd = peer_fd; - ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP; + ev.events = EPOLLET | EPOLLRDHUP; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0) { throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); diff --git a/osd.h b/osd.h index 37a5283da..fbf0e996e 100644 --- a/osd.h +++ b/osd.h @@ -131,8 +131,8 @@ struct osd_client_t int read_ready = 0; osd_op_t *read_op = NULL; int read_reply_id = 0; - iovec read_iov; - msghdr read_msg; + iovec read_iov = { 0 }; + msghdr read_msg = { 0 }; void *read_buf = NULL; int read_remaining = 0; int read_state = 0; @@ -215,6 +215,7 @@ class osd_t // event loop, socket read/write void loop(); void handle_epoll_events(); + bool try_receive(osd_client_t & cl); void read_requests(); void handle_read(ring_data_t *data, int peer_fd); void handle_op_hdr(osd_client_t *cl); diff --git a/osd_peering.cpp b/osd_peering.cpp index 6abaa9632..404330f54 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -89,7 +89,7 @@ void osd_t::connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port // Add FD to epoll (EPOLLOUT for tracking connect() result) epoll_event ev; ev.data.fd = peer_fd; - ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET; + ev.events = EPOLLOUT | EPOLLRDHUP | EPOLLET; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0) { throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); @@ -115,12 +115,13 @@ void osd_t::handle_connect_result(int peer_fd) } int one = 1; setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); + fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) & ~O_NONBLOCK); // Disable EPOLLOUT on this fd cl.connect_callback = NULL; cl.peer_state = PEER_CONNECTED; epoll_event ev; ev.data.fd = peer_fd; - ev.events = EPOLLIN | EPOLLRDHUP | EPOLLET; + ev.events = EPOLLRDHUP | EPOLLET; if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peer_fd, &ev) < 0) { throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); diff --git a/osd_receive.cpp b/osd_receive.cpp index 9d5157a9a..e476619e9 100644 --- a/osd_receive.cpp +++ b/osd_receive.cpp @@ -1,40 +1,46 @@ #include "osd.h" +bool osd_t::try_receive(osd_client_t & cl) +{ + int peer_fd = cl.peer_fd; + io_uring_sqe* sqe = ringloop->get_sqe(); + if (!sqe) + { + return false; + } + ring_data_t* data = ((ring_data_t*)sqe->user_data); + if (!cl.read_buf) + { + // no reads in progress + // so this is either a new command or a reply to a previously sent command + if (!cl.read_op) + { + cl.read_op = new osd_op_t; + cl.read_op->peer_fd = peer_fd; + } + cl.read_op->op_type = OSD_OP_IN; + cl.read_buf = &cl.read_op->req.buf; + cl.read_remaining = OSD_PACKET_SIZE; + cl.read_state = CL_READ_OP; + } + cl.read_iov.iov_base = cl.read_buf; + cl.read_iov.iov_len = cl.read_remaining; + cl.read_msg.msg_iov = &cl.read_iov; + cl.read_msg.msg_iovlen = 1; + data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data, peer_fd); }; + my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0); + return true; +} + void osd_t::read_requests() { - for (int i = 0; i < read_ready_clients.size(); i++) + for (auto & p: clients) { - int peer_fd = read_ready_clients[i]; - auto & cl = clients[peer_fd]; - io_uring_sqe* sqe = ringloop->get_sqe(); - if (!sqe) + if (p.second.peer_state == PEER_CONNECTED && p.second.read_iov.iov_len == 0) { - read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i); - return; + try_receive(p.second); } - ring_data_t* data = ((ring_data_t*)sqe->user_data); - if (!cl.read_buf) - { - // no reads in progress - // so this is either a new command or a reply to a previously sent command - if (!cl.read_op) - { - cl.read_op = new osd_op_t; - cl.read_op->peer_fd = peer_fd; - } - cl.read_op->op_type = OSD_OP_IN; - cl.read_buf = &cl.read_op->req.buf; - cl.read_remaining = OSD_PACKET_SIZE; - cl.read_state = CL_READ_OP; - } - cl.read_iov.iov_base = cl.read_buf; - cl.read_iov.iov_len = cl.read_remaining; - cl.read_msg.msg_iov = &cl.read_iov; - cl.read_msg.msg_iovlen = 1; - data->callback = [this, peer_fd](ring_data_t *data) { handle_read(data, peer_fd); }; - my_uring_prep_recvmsg(sqe, peer_fd, &cl.read_msg, 0); } - read_ready_clients.clear(); } void osd_t::handle_read(ring_data_t *data, int peer_fd) @@ -43,11 +49,9 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd) if (cl_it != clients.end()) { auto & cl = cl_it->second; + cl.read_iov.iov_len = 0; if (data->res == -EAGAIN) { - cl.read_ready--; - if (cl.read_ready > 0) - read_ready_clients.push_back(peer_fd); return; } else if (data->res < 0) @@ -57,7 +61,6 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd) stop_client(peer_fd); return; } - read_ready_clients.push_back(peer_fd); if (data->res > 0) { cl.read_remaining -= data->res;