|
- // Copyright (c) Vitaliy Filippov, 2019+
- // License: VNPL-1.0 or GNU GPL-2.0+ (see README.md for details)
-
- #include <unistd.h>
- #include <fcntl.h>
- #include <sys/socket.h>
- #include <sys/epoll.h>
- #include <netinet/tcp.h>
- #include <stdexcept>
-
- #include "messenger.h"
-
- osd_op_t::~osd_op_t()
- {
- assert(!bs_op);
- assert(!op_data);
- if (rmw_buf)
- {
- free(rmw_buf);
- }
- if (buf)
- {
- // Note: reusing osd_op_t WILL currently lead to memory leaks
- // So we don't reuse it, but free it every time
- free(buf);
- }
- }
-
- osd_messenger_t::~osd_messenger_t()
- {
- while (clients.size() > 0)
- {
- stop_client(clients.begin()->first);
- }
- }
-
- void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state)
- {
- if (wanted_peers.find(peer_osd) == wanted_peers.end())
- {
- wanted_peers[peer_osd] = (osd_wanted_peer_t){
- .address_list = peer_state["addresses"],
- .port = (int)peer_state["port"].int64_value(),
- };
- }
- else
- {
- wanted_peers[peer_osd].address_list = peer_state["addresses"];
- wanted_peers[peer_osd].port = (int)peer_state["port"].int64_value();
- }
- wanted_peers[peer_osd].address_changed = true;
- if (!wanted_peers[peer_osd].connecting &&
- (time(NULL) - wanted_peers[peer_osd].last_connect_attempt) >= peer_connect_interval)
- {
- try_connect_peer(peer_osd);
- }
- }
-
- void osd_messenger_t::try_connect_peer(uint64_t peer_osd)
- {
- auto wp_it = wanted_peers.find(peer_osd);
- if (wp_it == wanted_peers.end())
- {
- return;
- }
- if (osd_peer_fds.find(peer_osd) != osd_peer_fds.end())
- {
- wanted_peers.erase(peer_osd);
- return;
- }
- auto & wp = wp_it->second;
- if (wp.address_index >= wp.address_list.array_items().size())
- {
- return;
- }
- wp.cur_addr = wp.address_list[wp.address_index].string_value();
- wp.cur_port = wp.port;
- wp.connecting = true;
- try_connect_peer_addr(peer_osd, wp.cur_addr.c_str(), wp.cur_port);
- }
-
- void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port)
- {
- assert(peer_osd != this->osd_num);
- struct sockaddr_in addr;
- int r;
- if ((r = inet_pton(AF_INET, peer_host, &addr.sin_addr)) != 1)
- {
- on_connect_peer(peer_osd, -EINVAL);
- return;
- }
- addr.sin_family = AF_INET;
- addr.sin_port = htons(peer_port ? peer_port : 11203);
- int peer_fd = socket(AF_INET, SOCK_STREAM, 0);
- if (peer_fd < 0)
- {
- on_connect_peer(peer_osd, -errno);
- return;
- }
- fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
- r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
- if (r < 0 && errno != EINPROGRESS)
- {
- close(peer_fd);
- on_connect_peer(peer_osd, -errno);
- return;
- }
- int timeout_id = -1;
- if (peer_connect_timeout > 0)
- {
- timeout_id = tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd](int timer_id)
- {
- osd_num_t peer_osd = clients.at(peer_fd)->osd_num;
- stop_client(peer_fd);
- on_connect_peer(peer_osd, -EIO);
- return;
- });
- }
- clients[peer_fd] = new osd_client_t((osd_client_t){
- .peer_addr = addr,
- .peer_port = peer_port,
- .peer_fd = peer_fd,
- .peer_state = PEER_CONNECTING,
- .connect_timeout_id = timeout_id,
- .osd_num = peer_osd,
- .in_buf = malloc_or_die(receive_buffer_size),
- });
- tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events)
- {
- // Either OUT (connected) or HUP
- handle_connect_epoll(peer_fd);
- });
- }
-
- void osd_messenger_t::handle_connect_epoll(int peer_fd)
- {
- auto cl = clients[peer_fd];
- if (cl->connect_timeout_id >= 0)
- {
- tfd->clear_timer(cl->connect_timeout_id);
- cl->connect_timeout_id = -1;
- }
- osd_num_t peer_osd = cl->osd_num;
- int result = 0;
- socklen_t result_len = sizeof(result);
- if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0)
- {
- result = errno;
- }
- if (result != 0)
- {
- stop_client(peer_fd);
- on_connect_peer(peer_osd, -result);
- return;
- }
- int one = 1;
- setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
- cl->peer_state = PEER_CONNECTED;
- tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
- {
- handle_peer_epoll(peer_fd, epoll_events);
- });
- // Check OSD number
- check_peer_config(cl);
- }
-
- void osd_messenger_t::handle_peer_epoll(int peer_fd, int epoll_events)
- {
- // Mark client as ready (i.e. some data is available)
- if (epoll_events & EPOLLRDHUP)
- {
- // Stop client
- printf("[OSD %lu] client %d disconnected\n", this->osd_num, peer_fd);
- stop_client(peer_fd);
- }
- else if (epoll_events & EPOLLIN)
- {
- // Mark client as ready (i.e. some data is available)
- auto cl = clients[peer_fd];
- cl->read_ready++;
- if (cl->read_ready == 1)
- {
- read_ready_clients.push_back(cl->peer_fd);
- if (ringloop)
- ringloop->wakeup();
- else
- read_requests();
- }
- }
- }
-
- void osd_messenger_t::on_connect_peer(osd_num_t peer_osd, int peer_fd)
- {
- auto & wp = wanted_peers.at(peer_osd);
- wp.connecting = false;
- if (peer_fd < 0)
- {
- printf("Failed to connect to peer OSD %lu address %s port %d: %s\n", peer_osd, wp.cur_addr.c_str(), wp.cur_port, strerror(-peer_fd));
- if (wp.address_changed)
- {
- wp.address_changed = false;
- wp.address_index = 0;
- try_connect_peer(peer_osd);
- }
- else if (wp.address_index < wp.address_list.array_items().size()-1)
- {
- // Try other addresses
- wp.address_index++;
- try_connect_peer(peer_osd);
- }
- else
- {
- // Retry again in <peer_connect_interval> seconds
- wp.last_connect_attempt = time(NULL);
- wp.address_index = 0;
- tfd->set_timer(1000*peer_connect_interval, false, [this, peer_osd](int)
- {
- try_connect_peer(peer_osd);
- });
- }
- return;
- }
- if (log_level > 0)
- {
- printf("[OSD %lu] Connected with peer OSD %lu (client %d)\n", osd_num, peer_osd, peer_fd);
- }
- wanted_peers.erase(peer_osd);
- repeer_pgs(peer_osd);
- }
-
- void osd_messenger_t::check_peer_config(osd_client_t *cl)
- {
- osd_op_t *op = new osd_op_t();
- op->op_type = OSD_OP_OUT;
- op->peer_fd = cl->peer_fd;
- op->req = (osd_any_op_t){
- .show_conf = {
- .header = {
- .magic = SECONDARY_OSD_OP_MAGIC,
- .id = this->next_subop_id++,
- .opcode = OSD_OP_SHOW_CONFIG,
- },
- },
- };
- op->callback = [this, cl](osd_op_t *op)
- {
- std::string json_err;
- json11::Json config;
- bool err = false;
- if (op->reply.hdr.retval < 0)
- {
- err = true;
- printf("Failed to get config from OSD %lu (retval=%ld), disconnecting peer\n", cl->osd_num, op->reply.hdr.retval);
- }
- else
- {
- config = json11::Json::parse(std::string((char*)op->buf), json_err);
- if (json_err != "")
- {
- err = true;
- printf("Failed to get config from OSD %lu: bad JSON: %s, disconnecting peer\n", cl->osd_num, json_err.c_str());
- }
- else if (config["osd_num"].uint64_value() != cl->osd_num)
- {
- err = true;
- printf("Connected to OSD %lu instead of OSD %lu, peer state is outdated, disconnecting peer\n", config["osd_num"].uint64_value(), cl->osd_num);
- }
- }
- if (err)
- {
- osd_num_t osd_num = cl->osd_num;
- stop_client(op->peer_fd);
- on_connect_peer(osd_num, -1);
- delete op;
- return;
- }
- osd_peer_fds[cl->osd_num] = cl->peer_fd;
- on_connect_peer(cl->osd_num, cl->peer_fd);
- delete op;
- };
- outbox_push(op);
- }
-
- void osd_messenger_t::cancel_osd_ops(osd_client_t *cl)
- {
- for (auto p: cl->sent_ops)
- {
- cancel_op(p.second);
- }
- cl->sent_ops.clear();
- cl->outbox.clear();
- }
-
- void osd_messenger_t::cancel_op(osd_op_t *op)
- {
- if (op->op_type == OSD_OP_OUT)
- {
- op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
- op->reply.hdr.id = op->req.hdr.id;
- op->reply.hdr.opcode = op->req.hdr.opcode;
- op->reply.hdr.retval = -EPIPE;
- // Copy lambda to be unaffected by `delete op`
- std::function<void(osd_op_t*)>(op->callback)(op);
- }
- else
- {
- // This function is only called in stop_client(), so it's fine to destroy the operation
- delete op;
- }
- }
-
- void osd_messenger_t::stop_client(int peer_fd)
- {
- assert(peer_fd != 0);
- auto it = clients.find(peer_fd);
- if (it == clients.end())
- {
- return;
- }
- uint64_t repeer_osd = 0;
- osd_client_t *cl = it->second;
- if (cl->peer_state == PEER_CONNECTED)
- {
- if (cl->osd_num)
- {
- // Reload configuration from etcd when the connection is dropped
- if (log_level > 0)
- printf("[OSD %lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl->osd_num);
- repeer_osd = cl->osd_num;
- }
- else
- {
- if (log_level > 0)
- printf("[OSD %lu] Stopping client %d (regular client)\n", osd_num, peer_fd);
- }
- }
- cl->peer_state = PEER_STOPPED;
- clients.erase(it);
- tfd->set_fd_handler(peer_fd, false, NULL);
- if (cl->connect_timeout_id >= 0)
- {
- tfd->clear_timer(cl->connect_timeout_id);
- cl->connect_timeout_id = -1;
- }
- if (cl->osd_num)
- {
- osd_peer_fds.erase(cl->osd_num);
- }
- if (cl->read_op)
- {
- if (cl->read_op->callback)
- {
- cancel_op(cl->read_op);
- }
- else
- {
- delete cl->read_op;
- }
- cl->read_op = NULL;
- }
- for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++)
- {
- if (*rit == peer_fd)
- {
- read_ready_clients.erase(rit);
- break;
- }
- }
- for (auto wit = write_ready_clients.begin(); wit != write_ready_clients.end(); wit++)
- {
- if (*wit == peer_fd)
- {
- write_ready_clients.erase(wit);
- break;
- }
- }
- free(cl->in_buf);
- cl->in_buf = NULL;
- close(peer_fd);
- if (repeer_osd)
- {
- // First repeer PGs as canceling OSD ops may push new operations
- // and we need correct PG states when we do that
- repeer_pgs(repeer_osd);
- }
- if (cl->osd_num)
- {
- // Cancel outbound operations
- cancel_osd_ops(cl);
- }
- if (cl->refs <= 0)
- {
- delete cl;
- }
- }
-
- void osd_messenger_t::accept_connections(int listen_fd)
- {
- // Accept new connections
- sockaddr_in addr;
- socklen_t peer_addr_size = sizeof(addr);
- int peer_fd;
- while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0)
- {
- assert(peer_fd != 0);
- char peer_str[256];
- printf("[OSD %lu] new client %d: connection from %s port %d\n", this->osd_num, peer_fd,
- inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port));
- fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
- int one = 1;
- setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
- clients[peer_fd] = new osd_client_t((osd_client_t){
- .peer_addr = addr,
- .peer_port = ntohs(addr.sin_port),
- .peer_fd = peer_fd,
- .peer_state = PEER_CONNECTED,
- .in_buf = malloc_or_die(receive_buffer_size),
- });
- // Add FD to epoll
- tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events)
- {
- handle_peer_epoll(peer_fd, epoll_events);
- });
- // Try to accept next connection
- peer_addr_size = sizeof(addr);
- }
- if (peer_fd == -1 && errno != EAGAIN)
- {
- throw std::runtime_error(std::string("accept: ") + strerror(errno));
- }
- }
|