diff --git a/src/messenger.cpp b/src/messenger.cpp index 2aa5f11a..96da14a5 100644 --- a/src/messenger.cpp +++ b/src/messenger.cpp @@ -14,9 +14,11 @@ void osd_messenger_t::init() { keepalive_timer_id = tfd->set_timer(1000, true, [this](int) { - for (auto cl_it = clients.begin(); cl_it != clients.end();) + std::vector to_stop; + std::vector to_ping; + for (auto cl_it = clients.begin(); cl_it != clients.end(); cl_it++) { - auto cl = (cl_it++)->second; + auto cl = cl_it->second; if (!cl->osd_num || cl->peer_state != PEER_CONNECTED) { // Do not run keepalive on regular clients @@ -29,7 +31,7 @@ void osd_messenger_t::init() { // Ping timed out, stop the client printf("Ping timed out for OSD %lu (client %d), disconnecting peer\n", cl->osd_num, cl->peer_fd); - stop_client(cl->peer_fd, true); + to_stop.push_back(cl->peer_fd); } } else if (cl->idle_time_remaining > 0) @@ -59,7 +61,7 @@ void osd_messenger_t::init() stop_client(fail_fd, true); } }; - outbox_push(op); + to_ping.push_back(op); cl->ping_time_remaining = osd_ping_timeout; cl->idle_time_remaining = osd_idle_timeout; } @@ -69,6 +71,15 @@ void osd_messenger_t::init() cl->idle_time_remaining = osd_idle_timeout; } } + // Don't stop clients while a 'clients' iterator is still active + for (int peer_fd: to_stop) + { + stop_client(peer_fd, true); + } + for (auto op: to_ping) + { + outbox_push(op); + } }); } @@ -180,15 +191,14 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer on_connect_peer(peer_osd, -errno); return; } - clients[peer_fd] = new osd_client_t((osd_client_t){ - .peer_addr = addr, - .peer_port = peer_port, - .peer_fd = peer_fd, - .peer_state = PEER_CONNECTING, - .connect_timeout_id = -1, - .osd_num = peer_osd, - .in_buf = malloc_or_die(receive_buffer_size), - }); + clients[peer_fd] = new osd_client_t(); + clients[peer_fd]->peer_addr = addr; + clients[peer_fd]->peer_port = peer_port; + clients[peer_fd]->peer_fd = peer_fd; + clients[peer_fd]->peer_state = PEER_CONNECTING; + clients[peer_fd]->connect_timeout_id = -1; + clients[peer_fd]->osd_num = peer_osd; + clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size); tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events) { // Either OUT (connected) or HUP @@ -370,13 +380,12 @@ void osd_messenger_t::accept_connections(int listen_fd) fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); int one = 1; setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); - clients[peer_fd] = new osd_client_t((osd_client_t){ - .peer_addr = addr, - .peer_port = ntohs(addr.sin_port), - .peer_fd = peer_fd, - .peer_state = PEER_CONNECTED, - .in_buf = malloc_or_die(receive_buffer_size), - }); + clients[peer_fd] = new osd_client_t(); + clients[peer_fd]->peer_addr = addr; + clients[peer_fd]->peer_port = ntohs(addr.sin_port); + clients[peer_fd]->peer_fd = peer_fd; + clients[peer_fd]->peer_state = PEER_CONNECTED; + clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size); // Add FD to epoll tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events) { diff --git a/src/messenger.h b/src/messenger.h index 0c367970..119eb34b 100644 --- a/src/messenger.h +++ b/src/messenger.h @@ -70,6 +70,12 @@ struct osd_client_t int write_state = 0; std::vector send_list, next_send_list; std::vector outbox, next_outbox; + + ~osd_client_t() + { + free(in_buf); + in_buf = NULL; + } }; struct osd_wanted_peer_t diff --git a/src/msgr_send.cpp b/src/msgr_send.cpp index 536eeec4..0fae8969 100644 --- a/src/msgr_send.cpp +++ b/src/msgr_send.cpp @@ -180,7 +180,7 @@ void osd_messenger_t::handle_send(int result, osd_client_t *cl) cl->refs--; if (cl->peer_state == PEER_STOPPED) { - if (!cl->refs) + if (cl->refs <= 0) { delete cl; } diff --git a/src/msgr_stop.cpp b/src/msgr_stop.cpp index 57a8f585..5caa65ec 100644 --- a/src/msgr_stop.cpp +++ b/src/msgr_stop.cpp @@ -49,53 +49,38 @@ void osd_messenger_t::stop_client(int peer_fd, bool force) { return; } - uint64_t repeer_osd = 0; osd_client_t *cl = it->second; - if (cl->peer_state == PEER_CONNECTED) - { - if (cl->osd_num) - { - // Reload configuration from etcd when the connection is dropped - if (log_level > 0) - printf("[OSD %lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl->osd_num); - repeer_osd = cl->osd_num; - } - else - { - if (log_level > 0) - printf("[OSD %lu] Stopping client %d (regular client)\n", osd_num, peer_fd); - } - } - else if (!force) + if (cl->peer_state == PEER_CONNECTING && !force || cl->peer_state == PEER_STOPPED) { return; } + if (log_level > 0) + { + if (cl->osd_num) + { + printf("[OSD %lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl->osd_num); + } + else + { + printf("[OSD %lu] Stopping client %d (regular client)\n", osd_num, peer_fd); + } + } + // First set state to STOPPED so another stop_client() call doesn't try to free it again + cl->refs++; cl->peer_state = PEER_STOPPED; - clients.erase(it); + if (cl->osd_num) + { + // ...and forget OSD peer + osd_peer_fds.erase(cl->osd_num); + } #ifndef __MOCK__ + // Then remove FD from the eventloop so we don't accidentally read something tfd->set_fd_handler(peer_fd, false, NULL); if (cl->connect_timeout_id >= 0) { tfd->clear_timer(cl->connect_timeout_id); cl->connect_timeout_id = -1; } -#endif - if (cl->osd_num) - { - osd_peer_fds.erase(cl->osd_num); - } - if (cl->read_op) - { - if (cl->read_op->callback) - { - cancel_op(cl->read_op); - } - else - { - delete cl->read_op; - } - cl->read_op = NULL; - } for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++) { if (*rit == peer_fd) @@ -112,22 +97,39 @@ void osd_messenger_t::stop_client(int peer_fd, bool force) break; } } - free(cl->in_buf); - cl->in_buf = NULL; -#ifndef __MOCK__ - close(peer_fd); #endif - if (repeer_osd) + if (cl->osd_num) { - // First repeer PGs as canceling OSD ops may push new operations - // and we need correct PG states when we do that - repeer_pgs(repeer_osd); + // Then repeer PGs because cancel_op() callbacks can try to perform + // some actions and we need correct PG states to not do something silly + repeer_pgs(cl->osd_num); + } + // Then cancel all operations + if (cl->read_op) + { + if (!cl->read_op->callback) + { + delete cl->read_op; + } + cl->read_op = NULL; } if (cl->osd_num) { // Cancel outbound operations cancel_osd_ops(cl); } +#ifndef __MOCK__ + // And close the FD only when everything is done + // ...because peer_fd number can get reused after close() + close(peer_fd); +#endif + // Find the item again because it can be invalidated at this point + it = clients.find(peer_fd); + if (it != clients.end()) + { + clients.erase(it); + } + cl->refs--; if (cl->refs <= 0) { delete cl;