diff --git a/Makefile b/Makefile index 2db81d70..1e2679ba 100644 --- a/Makefile +++ b/Makefile @@ -30,7 +30,7 @@ libfio_blockstore.so: ./libblockstore.so fio_engine.cpp json11.o g++ $(CXXFLAGS) -shared -o libfio_blockstore.so fio_engine.cpp json11.o ./libblockstore.so -ltcmalloc_minimal -luring OSD_OBJS := osd.o osd_secondary.o osd_receive.o osd_send.o osd_peering.o osd_flush.o osd_peering_pg.o \ - osd_primary.o osd_primary_subops.o etcd_state_client.o osd_cluster.o http_client.o pg_states.o \ + osd_primary.o osd_primary_subops.o etcd_state_client.o cluster_client.o osd_cluster.o http_client.o pg_states.o \ osd_rmw.o json11.o base64.o timerfd_manager.o base64.o: base64.cpp base64.h g++ $(CXXFLAGS) -c -o $@ $< @@ -48,6 +48,8 @@ http_client.o: http_client.cpp http_client.h g++ $(CXXFLAGS) -c -o $@ $< etcd_state_client.o: etcd_state_client.cpp etcd_state_client.h http_client.h pg_states.h g++ $(CXXFLAGS) -c -o $@ $< +cluster_client.o: cluster_client.cpp cluster_client.h osd_ops.h timerfd_manager.h ringloop.h + g++ $(CXXFLAGS) -c -o $@ $< osd_flush.o: osd_flush.cpp osd.h osd_ops.h osd_peering_pg.h ringloop.h g++ $(CXXFLAGS) -c -o $@ $< osd_peering_pg.o: osd_peering_pg.cpp object_id.h osd_peering_pg.h pg_states.h diff --git a/cluster_client.cpp b/cluster_client.cpp new file mode 100644 index 00000000..e2f6d403 --- /dev/null +++ b/cluster_client.cpp @@ -0,0 +1,358 @@ +#include +#include +#include +#include +#include + +#include "cluster_client.h" + +osd_op_t::~osd_op_t() +{ + assert(!bs_op); + if (op_data) + { + free(op_data); + } + if (rmw_buf) + { + free(rmw_buf); + } + if (buf) + { + // Note: reusing osd_op_t WILL currently lead to memory leaks + // So we don't reuse it, but free it every time + free(buf); + } +} + +void cluster_client_t::connect_peer(uint64_t peer_osd, json11::Json address_list, int port) +{ + if (wanted_peers.find(peer_osd) == wanted_peers.end()) + { + wanted_peers[peer_osd] = (osd_wanted_peer_t){ + .address_list = address_list, + .port = port, + }; + } + else + { + wanted_peers[peer_osd].address_list = address_list; + wanted_peers[peer_osd].port = port; + } + wanted_peers[peer_osd].address_changed = true; + if (!wanted_peers[peer_osd].connecting && + (time(NULL) - wanted_peers[peer_osd].last_connect_attempt) >= peer_connect_interval) + { + try_connect_peer(peer_osd); + } +} + +void cluster_client_t::try_connect_peer(uint64_t peer_osd) +{ + auto wp_it = wanted_peers.find(peer_osd); + if (wp_it == wanted_peers.end()) + { + return; + } + if (osd_peer_fds.find(peer_osd) != osd_peer_fds.end()) + { + wanted_peers.erase(peer_osd); + return; + } + auto & wp = wp_it->second; + if (wp.address_index >= wp.address_list.array_items().size()) + { + return; + } + wp.cur_addr = wp.address_list[wp.address_index].string_value(); + wp.cur_port = wp.port; + try_connect_peer_addr(peer_osd, wp.cur_addr.c_str(), wp.cur_port); +} + +void cluster_client_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port) +{ + struct sockaddr_in addr; + int r; + if ((r = inet_pton(AF_INET, peer_host, &addr.sin_addr)) != 1) + { + on_connect_peer(peer_osd, -EINVAL); + return; + } + addr.sin_family = AF_INET; + addr.sin_port = htons(peer_port ? peer_port : 11203); + int peer_fd = socket(AF_INET, SOCK_STREAM, 0); + if (peer_fd < 0) + { + on_connect_peer(peer_osd, -errno); + return; + } + fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); + int timeout_id = -1; + if (peer_connect_timeout > 0) + { + timeout_id = tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd](int timer_id) + { + osd_num_t peer_osd = clients[peer_fd].osd_num; + stop_client(peer_fd); + on_connect_peer(peer_osd, -EIO); + return; + }); + } + r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); + if (r < 0 && errno != EINPROGRESS) + { + close(peer_fd); + on_connect_peer(peer_osd, -errno); + return; + } + assert(peer_osd != this->osd_num); + clients[peer_fd] = (osd_client_t){ + .peer_addr = addr, + .peer_port = peer_port, + .peer_fd = peer_fd, + .peer_state = PEER_CONNECTING, + .connect_timeout_id = timeout_id, + .osd_num = peer_osd, + .in_buf = malloc(receive_buffer_size), + }; + tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events) + { + // Either OUT (connected) or HUP + handle_connect_epoll(peer_fd); + }); +} + +void cluster_client_t::handle_connect_epoll(int peer_fd) +{ + auto & cl = clients[peer_fd]; + if (cl.connect_timeout_id >= 0) + { + tfd->clear_timer(cl.connect_timeout_id); + cl.connect_timeout_id = -1; + } + osd_num_t peer_osd = cl.osd_num; + int result = 0; + socklen_t result_len = sizeof(result); + if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) + { + result = errno; + } + if (result != 0) + { + stop_client(peer_fd); + on_connect_peer(peer_osd, -result); + return; + } + int one = 1; + setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); + cl.peer_state = PEER_CONNECTED; + // FIXME Disable EPOLLOUT on this fd + tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events) + { + handle_peer_epoll(peer_fd, epoll_events); + }); + // Check OSD number + check_peer_config(cl); +} + +void cluster_client_t::handle_peer_epoll(int peer_fd, int epoll_events) +{ + // Mark client as ready (i.e. some data is available) + if (epoll_events & EPOLLRDHUP) + { + // Stop client + printf("[OSD %lu] client %d disconnected\n", this->osd_num, peer_fd); + stop_client(peer_fd); + } + else if (epoll_events & EPOLLIN) + { + // Mark client as ready (i.e. some data is available) + auto & cl = clients[peer_fd]; + cl.read_ready++; + if (cl.read_ready == 1) + { + read_ready_clients.push_back(cl.peer_fd); + ringloop->wakeup(); + } + } +} + +void cluster_client_t::on_connect_peer(osd_num_t peer_osd, int peer_fd) +{ + auto & wp = wanted_peers.at(peer_osd); + wp.connecting = false; + if (peer_fd < 0) + { + printf("Failed to connect to peer OSD %lu address %s port %d: %s\n", peer_osd, wp.cur_addr.c_str(), wp.cur_port, strerror(-peer_fd)); + if (wp.address_changed) + { + wp.address_changed = false; + wp.address_index = 0; + try_connect_peer(peer_osd); + } + else if (wp.address_index < wp.address_list.array_items().size()-1) + { + // Try other addresses + wp.address_index++; + try_connect_peer(peer_osd); + } + else + { + // Retry again in seconds + wp.last_connect_attempt = time(NULL); + wp.address_index = 0; + tfd->set_timer(1000*peer_connect_interval, false, [this, peer_osd](int) + { + try_connect_peer(peer_osd); + }); + } + return; + } + printf("Connected with peer OSD %lu (fd %d)\n", peer_osd, peer_fd); + wanted_peers.erase(peer_osd); + repeer_pgs(peer_osd); +} + +void cluster_client_t::check_peer_config(osd_client_t & cl) +{ + osd_op_t *op = new osd_op_t(); + op->op_type = OSD_OP_OUT; + op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE); + op->peer_fd = cl.peer_fd; + op->req = { + .show_conf = { + .header = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = this->next_subop_id++, + .opcode = OSD_OP_SHOW_CONFIG, + }, + }, + }; + op->callback = [this](osd_op_t *op) + { + osd_client_t & cl = clients[op->peer_fd]; + std::string json_err; + json11::Json config; + bool err = false; + if (op->reply.hdr.retval < 0) + { + err = true; + printf("Failed to get config from OSD %lu (retval=%ld), disconnecting peer\n", cl.osd_num, op->reply.hdr.retval); + } + else + { + config = json11::Json::parse(std::string((char*)op->buf), json_err); + if (json_err != "") + { + err = true; + printf("Failed to get config from OSD %lu: bad JSON: %s, disconnecting peer\n", cl.osd_num, json_err.c_str()); + } + else if (config["osd_num"].uint64_value() != cl.osd_num) + { + err = true; + printf("Connected to OSD %lu instead of OSD %lu, peer state is outdated, disconnecting peer\n", config["osd_num"].uint64_value(), cl.osd_num); + on_connect_peer(cl.osd_num, -1); + } + } + if (err) + { + stop_client(op->peer_fd); + delete op; + return; + } + osd_peer_fds[cl.osd_num] = cl.peer_fd; + on_connect_peer(cl.osd_num, cl.peer_fd); + delete op; + }; + outbox_push(op); +} + +void cluster_client_t::cancel_osd_ops(osd_client_t & cl) +{ + for (auto p: cl.sent_ops) + { + cancel_out_op(p.second); + } + cl.sent_ops.clear(); + for (auto op: cl.outbox) + { + cancel_out_op(op); + } + cl.outbox.clear(); + if (cl.write_op) + { + cancel_out_op(cl.write_op); + cl.write_op = NULL; + } +} + +void cluster_client_t::cancel_out_op(osd_op_t *op) +{ + op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; + op->reply.hdr.id = op->req.hdr.id; + op->reply.hdr.opcode = op->req.hdr.opcode; + op->reply.hdr.retval = -EPIPE; + // Copy lambda to be unaffected by `delete op` + std::function(op->callback)(op); +} + +void cluster_client_t::stop_client(int peer_fd) +{ + assert(peer_fd != 0); + auto it = clients.find(peer_fd); + if (it == clients.end()) + { + return; + } + uint64_t repeer_osd = 0; + osd_client_t cl = it->second; + if (cl.peer_state == PEER_CONNECTED) + { + if (cl.osd_num) + { + // Reload configuration from etcd when the connection is dropped + printf("[OSD %lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl.osd_num); + repeer_osd = cl.osd_num; + } + else + { + printf("[OSD %lu] Stopping client %d (regular client)\n", osd_num, peer_fd); + } + } + clients.erase(it); + tfd->set_fd_handler(peer_fd, NULL); + if (cl.osd_num) + { + osd_peer_fds.erase(cl.osd_num); + // Cancel outbound operations + cancel_osd_ops(cl); + } + if (cl.read_op) + { + delete cl.read_op; + cl.read_op = NULL; + } + for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++) + { + if (*rit == peer_fd) + { + read_ready_clients.erase(rit); + break; + } + } + for (auto wit = write_ready_clients.begin(); wit != write_ready_clients.end(); wit++) + { + if (*wit == peer_fd) + { + write_ready_clients.erase(wit); + break; + } + } + free(cl.in_buf); + assert(peer_fd != 0); + close(peer_fd); + if (repeer_osd) + { + repeer_pgs(repeer_osd); + } +} diff --git a/cluster_client.h b/cluster_client.h new file mode 100644 index 00000000..066fc3cb --- /dev/null +++ b/cluster_client.h @@ -0,0 +1,209 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "json11/json11.hpp" +#include "osd_ops.h" +#include "timerfd_manager.h" +#include "ringloop.h" + +#define OSD_OP_IN 0 +#define OSD_OP_OUT 1 + +#define CL_READ_HDR 1 +#define CL_READ_DATA 2 +#define CL_READ_REPLY_DATA 3 +#define CL_WRITE_READY 1 +#define CL_WRITE_REPLY 2 +#define MAX_EPOLL_EVENTS 64 +#define OSD_OP_INLINE_BUF_COUNT 16 + +#define PEER_CONNECTING 1 +#define PEER_CONNECTED 2 + +struct osd_op_buf_list_t +{ + int count = 0, alloc = 0, sent = 0; + iovec *buf = NULL; + iovec inline_buf[OSD_OP_INLINE_BUF_COUNT]; + + ~osd_op_buf_list_t() + { + if (buf && buf != inline_buf) + { + free(buf); + } + } + + inline iovec* get_iovec() + { + return (buf ? buf : inline_buf) + sent; + } + + inline int get_size() + { + return count - sent; + } + + inline void push_back(void *nbuf, size_t len) + { + if (count >= alloc) + { + if (!alloc) + { + alloc = OSD_OP_INLINE_BUF_COUNT; + buf = inline_buf; + } + else if (buf == inline_buf) + { + int old = alloc; + alloc = ((alloc/16)*16 + 1); + buf = (iovec*)malloc(sizeof(iovec) * alloc); + memcpy(buf, inline_buf, sizeof(iovec)*old); + } + else + { + alloc = ((alloc/16)*16 + 1); + buf = (iovec*)realloc(buf, sizeof(iovec) * alloc); + } + } + buf[count++] = { .iov_base = nbuf, .iov_len = len }; + } +}; + +struct blockstore_op_t; + +struct osd_primary_op_data_t; + +struct osd_op_t +{ + timespec tv_begin; + uint64_t op_type = OSD_OP_IN; + int peer_fd; + osd_any_op_t req; + osd_any_reply_t reply; + blockstore_op_t *bs_op = NULL; + void *buf = NULL; + void *rmw_buf = NULL; + osd_primary_op_data_t* op_data = NULL; + std::function callback; + + osd_op_buf_list_t send_list; + + ~osd_op_t(); +}; + +struct osd_client_t +{ + sockaddr_in peer_addr; + int peer_port; + int peer_fd; + int peer_state; + int connect_timeout_id = -1; + osd_num_t osd_num = 0; + + void *in_buf = NULL; + + // Read state + int read_ready = 0; + osd_op_t *read_op = NULL; + int read_reply_id = 0; + iovec read_iov; + msghdr read_msg; + void *read_buf = NULL; + int read_remaining = 0; + int read_state = 0; + + // Outbound operations sent to this peer + std::map sent_ops; + + // Outbound messages (replies or requests) + std::deque outbox; + + // PGs dirtied by this client's primary-writes (FIXME to drop the connection) + std::set dirty_pgs; + + // Write state + osd_op_t *write_op = NULL; + msghdr write_msg; + int write_state = 0; +}; + +struct osd_wanted_peer_t +{ + json11::Json address_list; + int port; + time_t last_connect_attempt; + bool connecting, address_changed; + int address_index; + std::string cur_addr; + int cur_port; +}; + +struct osd_op_stats_t +{ + uint64_t op_stat_sum[OSD_OP_MAX+1] = { 0 }; + uint64_t op_stat_count[OSD_OP_MAX+1] = { 0 }; + uint64_t op_stat_bytes[OSD_OP_MAX+1] = { 0 }; + uint64_t subop_stat_sum[OSD_OP_MAX+1] = { 0 }; + uint64_t subop_stat_count[OSD_OP_MAX+1] = { 0 }; +}; + +struct cluster_client_t +{ + timerfd_manager_t *tfd; + ring_loop_t *ringloop; + + // osd_num_t is only for logging and asserts + osd_num_t osd_num; + int receive_buffer_size = 9000; + int peer_connect_interval = 5; + int peer_connect_timeout = 5; + int log_level = 0; + + std::map wanted_peers; + std::map osd_peer_fds; + uint64_t next_subop_id = 1; + + std::map clients; + std::vector read_ready_clients; + std::vector write_ready_clients; + + // op statistics + osd_op_stats_t stats; + + // public + void connect_peer(uint64_t osd_num, json11::Json address_list, int port); + void stop_client(int peer_fd); + void outbox_push(osd_op_t *cur_op); + std::function exec_op; + std::function repeer_pgs; + + // private + void try_connect_peer(uint64_t osd_num); + void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port); + void handle_connect_epoll(int peer_fd); + void handle_peer_epoll(int peer_fd, int epoll_events); + void on_connect_peer(osd_num_t peer_osd, int peer_fd); + void check_peer_config(osd_client_t & cl); + void cancel_osd_ops(osd_client_t & cl); + void cancel_out_op(osd_op_t *op); + + bool try_send(osd_client_t & cl); + void send_replies(); + void handle_send(ring_data_t *data, int peer_fd); + + void read_requests(); + void handle_read(ring_data_t *data, int peer_fd); + void handle_finished_read(osd_client_t & cl); + void handle_op_hdr(osd_client_t *cl); + void handle_reply_hdr(osd_client_t *cl); +}; diff --git a/etcd_state_client.cpp b/etcd_state_client.cpp index ee61463d..82be8f6d 100644 --- a/etcd_state_client.cpp +++ b/etcd_state_client.cpp @@ -365,6 +365,10 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso { this->peer_states.erase(peer_osd); } + if (on_change_osd_state_hook != NULL) + { + on_change_osd_state_hook(peer_osd); + } } } } diff --git a/etcd_state_client.h b/etcd_state_client.h index e4a5439a..91b35938 100644 --- a/etcd_state_client.h +++ b/etcd_state_client.h @@ -47,6 +47,7 @@ struct etcd_state_client_t std::function on_load_config_hook; std::function load_pgs_checks_hook; std::function on_load_pgs_hook; + std::function on_change_osd_state_hook; json_kv_t parse_etcd_kv(const json11::Json & kv_json); void etcd_call(std::string api, json11::Json payload, int timeout, std::function callback); diff --git a/osd.cpp b/osd.cpp index 4c8e30c0..8aab34d7 100644 --- a/osd.cpp +++ b/osd.cpp @@ -48,6 +48,11 @@ osd_t::osd_t(blockstore_config_t & config, blockstore_t *bs, ring_loop_t *ringlo print_stats(); }); + c_cli.tfd = this->tfd; + c_cli.ringloop = this->ringloop; + c_cli.exec_op = [this](osd_op_t *op) { exec_op(op); }; + c_cli.repeer_pgs = [this](osd_num_t peer_osd) { repeer_pgs(peer_osd); }; + init_cluster(); consumer.loop = [this]() { loop(); }; @@ -66,25 +71,6 @@ osd_t::~osd_t() close(listen_fd); } -osd_op_t::~osd_op_t() -{ - assert(!bs_op); - if (op_data) - { - free(op_data); - } - if (rmw_buf) - { - free(rmw_buf); - } - if (buf) - { - // Note: reusing osd_op_t WILL currently lead to memory leaks - // So we don't reuse it, but free it every time - free(buf); - } -} - void osd_t::parse_config(blockstore_config_t & config) { int pos; @@ -116,6 +102,7 @@ void osd_t::parse_config(blockstore_config_t & config) osd_num = strtoull(config["osd_num"].c_str(), NULL, 10); if (!osd_num) throw std::runtime_error("osd_num is required in the configuration"); + c_cli.osd_num = osd_num; run_primary = config["run_primary"] != "false" && config["run_primary"] != "0" && config["run_primary"] != "no"; // Cluster configuration bind_address = config["bind_address"]; @@ -154,13 +141,15 @@ void osd_t::parse_config(blockstore_config_t & config) print_stats_interval = strtoull(config["print_stats_interval"].c_str(), NULL, 10); if (!print_stats_interval) print_stats_interval = 3; - peer_connect_interval = strtoull(config["peer_connect_interval"].c_str(), NULL, 10); - if (!peer_connect_interval) - peer_connect_interval = 5; - peer_connect_timeout = strtoull(config["peer_connect_timeout"].c_str(), NULL, 10); - if (!peer_connect_timeout) - peer_connect_timeout = 5; + c_cli.peer_connect_interval = strtoull(config["peer_connect_interval"].c_str(), NULL, 10); + if (!c_cli.peer_connect_interval) + c_cli.peer_connect_interval = 5; + c_cli.peer_connect_timeout = strtoull(config["peer_connect_timeout"].c_str(), NULL, 10); + if (!c_cli.peer_connect_timeout) + c_cli.peer_connect_timeout = 5; log_level = strtoull(config["log_level"].c_str(), NULL, 10); + st_cli.log_level = log_level; + c_cli.log_level = log_level; } void osd_t::bind_socket() @@ -240,8 +229,8 @@ void osd_t::loop() wait_state = 1; } handle_peers(); - read_requests(); - send_replies(); + c_cli.read_requests(); + c_cli.send_replies(); ringloop->submit(); } @@ -249,10 +238,11 @@ void osd_t::set_fd_handler(int fd, std::function handler) { if (handler != NULL) { + bool exists = epoll_handlers.find(fd) != epoll_handlers.end(); epoll_event ev; ev.data.fd = fd; ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET; - if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) + if (epoll_ctl(epoll_fd, exists ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) < 0) { throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); } @@ -307,21 +297,18 @@ restart: fcntl(peer_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK); int one = 1; setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); - clients[peer_fd] = { + c_cli.clients[peer_fd] = { .peer_addr = addr, .peer_port = ntohs(addr.sin_port), .peer_fd = peer_fd, .peer_state = PEER_CONNECTED, - .in_buf = malloc(receive_buffer_size), + .in_buf = malloc(c_cli.receive_buffer_size), }; // Add FD to epoll - epoll_event ev; - ev.data.fd = peer_fd; - ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP; - if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0) + set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events) { - throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); - } + c_cli.handle_peer_epoll(peer_fd, epoll_events); + }); // Try to accept next connection peer_addr_size = sizeof(addr); } @@ -332,37 +319,8 @@ restart: } else { - auto cl_it = clients.find(events[i].data.fd); - if (cl_it != clients.end()) - { - auto & cl = cl_it->second; - if (cl.peer_state == PEER_CONNECTING) - { - // Either OUT (connected) or HUP - handle_connect_result(cl.peer_fd); - } - else if (events[i].events & EPOLLRDHUP) - { - // Stop client - printf("[OSD %lu] client %d disconnected\n", this->osd_num, cl.peer_fd); - stop_client(cl.peer_fd); - } - else - { - // Mark client as ready (i.e. some data is available) - cl.read_ready++; - if (cl.read_ready == 1) - { - read_ready_clients.push_back(cl.peer_fd); - ringloop->wakeup(); - } - } - } - else - { - auto & cb = epoll_handlers[events[i].data.fd]; - cb(events[i].data.fd, events[i].events); - } + auto & cb = epoll_handlers[events[i].data.fd]; + cb(events[i].data.fd, events[i].events); } } if (nfds == MAX_EPOLL_EVENTS) @@ -371,108 +329,6 @@ restart: } } -void osd_t::cancel_osd_ops(osd_client_t & cl) -{ - for (auto p: cl.sent_ops) - { - cancel_op(p.second); - } - cl.sent_ops.clear(); - for (auto op: cl.outbox) - { - cancel_op(op); - } - cl.outbox.clear(); - if (cl.write_op) - { - cancel_op(cl.write_op); - cl.write_op = NULL; - } -} - -void osd_t::cancel_op(osd_op_t *op) -{ - if (op->op_type == OSD_OP_OUT) - { - op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; - op->reply.hdr.id = op->req.hdr.id; - op->reply.hdr.opcode = op->req.hdr.opcode; - op->reply.hdr.retval = -EPIPE; - // Copy lambda to be unaffected by `delete op` - std::function(op->callback)(op); - } - else - { - finish_op(op, -EPIPE); - } -} - -void osd_t::stop_client(int peer_fd) -{ - assert(peer_fd != 0); - auto it = clients.find(peer_fd); - if (it == clients.end()) - { - return; - } - uint64_t repeer_osd = 0; - osd_client_t cl = it->second; - if (cl.peer_state == PEER_CONNECTED) - { - if (cl.osd_num) - { - // Reload configuration from etcd when the connection is dropped - printf("[OSD %lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl.osd_num); - st_cli.peer_states.erase(cl.osd_num); - repeer_osd = cl.osd_num; - peering_state |= OSD_CONNECTING_PEERS; - } - else - { - printf("[OSD %lu] Stopping client %d (regular client)\n", osd_num, peer_fd); - } - } - clients.erase(it); - if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, peer_fd, NULL) < 0 && errno != ENOENT) - { - throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); - } - if (cl.osd_num) - { - osd_peer_fds.erase(cl.osd_num); - // Cancel outbound operations - cancel_osd_ops(cl); - } - if (cl.read_op) - { - delete cl.read_op; - cl.read_op = NULL; - } - for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++) - { - if (*rit == peer_fd) - { - read_ready_clients.erase(rit); - break; - } - } - for (auto wit = write_ready_clients.begin(); wit != write_ready_clients.end(); wit++) - { - if (*wit == peer_fd) - { - write_ready_clients.erase(wit); - break; - } - } - free(cl.in_buf); - assert(peer_fd != 0); - close(peer_fd); - if (repeer_osd) - { - repeer_pgs(repeer_osd); - } -} - void osd_t::exec_op(osd_op_t *cur_op) { clock_gettime(CLOCK_REALTIME, &cur_op->tv_begin); @@ -537,36 +393,21 @@ void osd_t::exec_op(osd_op_t *cur_op) void osd_t::reset_stats() { - for (int p = 0; p < 2; p++) - { - for (int i = 0; i <= OSD_OP_MAX; i++) - { - if (op_stat_count[p][i] != 0) - { - op_stat_count[p][i] = 0; - op_stat_sum[p][i] = 0; - } - } - for (int i = 0; i <= OSD_OP_MAX; i++) - { - if (subop_stat_count[p][i] != 0) - { - subop_stat_count[p][i] = 0; - subop_stat_sum[p][i] = 0; - } - } - } + c_cli.stats = { 0 }; + prev_stats = { 0 }; + memset(recovery_stat_count, 0, sizeof(recovery_stat_count)); + memset(recovery_stat_bytes, 0, sizeof(recovery_stat_bytes)); } void osd_t::print_stats() { for (int i = 0; i <= OSD_OP_MAX; i++) { - if (op_stat_count[0][i] != op_stat_count[1][i]) + if (c_cli.stats.op_stat_count[i] != prev_stats.op_stat_count[i]) { - uint64_t avg = (op_stat_sum[0][i] - op_stat_sum[1][i])/(op_stat_count[0][i] - op_stat_count[1][i]); - uint64_t bw = (op_stat_bytes[0][i] - op_stat_bytes[1][i]) / print_stats_interval; - if (op_stat_bytes[0][i] != 0) + uint64_t avg = (c_cli.stats.op_stat_sum[i] - prev_stats.op_stat_sum[i])/(c_cli.stats.op_stat_count[i] - prev_stats.op_stat_count[i]); + uint64_t bw = (c_cli.stats.op_stat_bytes[i] - prev_stats.op_stat_bytes[i]) / print_stats_interval; + if (c_cli.stats.op_stat_bytes[i] != 0) { printf( "[OSD %lu] avg latency for op %d (%s): %lu us, B/W: %.2f %s\n", osd_num, i, osd_op_names[i], avg, @@ -578,19 +419,19 @@ void osd_t::print_stats() { printf("[OSD %lu] avg latency for op %d (%s): %lu us\n", osd_num, i, osd_op_names[i], avg); } - op_stat_count[1][i] = op_stat_count[0][i]; - op_stat_sum[1][i] = op_stat_sum[0][i]; - op_stat_bytes[1][i] = op_stat_bytes[0][i]; + prev_stats.op_stat_count[i] = c_cli.stats.op_stat_count[i]; + prev_stats.op_stat_sum[i] = c_cli.stats.op_stat_sum[i]; + prev_stats.op_stat_bytes[i] = c_cli.stats.op_stat_bytes[i]; } } for (int i = 0; i <= OSD_OP_MAX; i++) { - if (subop_stat_count[0][i] != subop_stat_count[1][i]) + if (c_cli.stats.subop_stat_count[i] != prev_stats.subop_stat_count[i]) { - uint64_t avg = (subop_stat_sum[0][i] - subop_stat_sum[1][i])/(subop_stat_count[0][i] - subop_stat_count[1][i]); + uint64_t avg = (c_cli.stats.subop_stat_sum[i] - prev_stats.subop_stat_sum[i])/(c_cli.stats.subop_stat_count[i] - prev_stats.subop_stat_count[i]); printf("[OSD %lu] avg latency for subop %d (%s): %ld us\n", osd_num, i, osd_op_names[i], avg); - subop_stat_count[1][i] = subop_stat_count[0][i]; - subop_stat_sum[1][i] = subop_stat_sum[0][i]; + prev_stats.subop_stat_count[i] = c_cli.stats.subop_stat_count[i]; + prev_stats.subop_stat_sum[i] = c_cli.stats.subop_stat_sum[i]; } } for (int i = 0; i < 2; i++) diff --git a/osd.h b/osd.h index 8338d1cc..f808e26b 100644 --- a/osd.h +++ b/osd.h @@ -16,26 +16,11 @@ #include "blockstore.h" #include "ringloop.h" #include "timerfd_manager.h" -#include "osd_ops.h" #include "osd_peering_pg.h" +#include "cluster_client.h" #include "etcd_state_client.h" -#define OSD_OP_IN 0 -#define OSD_OP_OUT 1 - -#define CL_READ_HDR 1 -#define CL_READ_DATA 2 -#define CL_READ_REPLY_DATA 3 -#define CL_WRITE_READY 1 -#define CL_WRITE_REPLY 2 -#define MAX_EPOLL_EVENTS 64 -#define OSD_OP_INLINE_BUF_COUNT 16 - -#define PEER_CONNECTING 1 -#define PEER_CONNECTED 2 - #define OSD_LOADING_PGS 0x01 -#define OSD_CONNECTING_PEERS 0x02 #define OSD_PEERING_PGS 0x04 #define OSD_FLUSHING_PGS 0x08 #define OSD_RECOVERING 0x10 @@ -54,114 +39,6 @@ extern const char* osd_op_names[]; -struct osd_op_buf_list_t -{ - int count = 0, alloc = 0, sent = 0; - iovec *buf = NULL; - iovec inline_buf[OSD_OP_INLINE_BUF_COUNT]; - - ~osd_op_buf_list_t() - { - if (buf && buf != inline_buf) - { - free(buf); - } - } - - inline iovec* get_iovec() - { - return (buf ? buf : inline_buf) + sent; - } - - inline int get_size() - { - return count - sent; - } - - inline void push_back(void *nbuf, size_t len) - { - if (count >= alloc) - { - if (!alloc) - { - alloc = OSD_OP_INLINE_BUF_COUNT; - buf = inline_buf; - } - else if (buf == inline_buf) - { - int old = alloc; - alloc = ((alloc/16)*16 + 1); - buf = (iovec*)malloc(sizeof(iovec) * alloc); - memcpy(buf, inline_buf, sizeof(iovec)*old); - } - else - { - alloc = ((alloc/16)*16 + 1); - buf = (iovec*)realloc(buf, sizeof(iovec) * alloc); - } - } - buf[count++] = { .iov_base = nbuf, .iov_len = len }; - } -}; - -struct osd_primary_op_data_t; - -struct osd_op_t -{ - timespec tv_begin; - uint64_t op_type = OSD_OP_IN; - int peer_fd; - osd_any_op_t req; - osd_any_reply_t reply; - blockstore_op_t *bs_op = NULL; - void *buf = NULL; - void *rmw_buf = NULL; - osd_primary_op_data_t* op_data = NULL; - std::function callback; - - osd_op_buf_list_t send_list; - - ~osd_op_t(); -}; - -struct osd_client_t -{ - sockaddr_in peer_addr; - int peer_port; - int peer_fd; - int peer_state; - int connect_timeout_id = -1; - osd_num_t osd_num = 0; - - void *in_buf = NULL; - - // Read state - int read_ready = 0; - osd_op_t *read_op = NULL; - int read_reply_id = 0; - iovec read_iov; - msghdr read_msg; - void *read_buf = NULL; - int read_remaining = 0; - int read_state = 0; - - // Outbound operations sent to this client (which is probably an OSD peer) - std::map sent_ops; - - // Outbound messages (replies or requests) - std::deque outbox; - - // PGs dirtied by this client's primary-writes (FIXME to drop the connection) - std::set dirty_pgs; - - // Write state - osd_op_t *write_op = NULL; - msghdr write_msg; - int write_state = 0; -}; - -struct osd_rmw_stripe_t; - struct osd_object_id_t { osd_num_t osd_num; @@ -177,13 +54,6 @@ struct osd_recovery_op_t osd_op_t *osd_op = NULL; }; -struct osd_wanted_peer_t -{ - bool connecting; - time_t last_connect_attempt, last_load_attempt; - int address_index; -}; - class osd_t { // config @@ -199,22 +69,19 @@ class osd_t // FIXME: Implement client queue depth limit int client_queue_depth = 128; bool allow_test_ops = true; - int receive_buffer_size = 9000; int print_stats_interval = 3; int immediate_commit = IMMEDIATE_NONE; int autosync_interval = DEFAULT_AUTOSYNC_INTERVAL; // sync every 5 seconds int recovery_queue_depth = DEFAULT_RECOVERY_QUEUE; - int peer_connect_interval = 5; - int peer_connect_timeout = 5; int log_level = 0; // cluster state etcd_state_client_t st_cli; + cluster_client_t c_cli; int etcd_failed_attempts = 0; std::string etcd_lease_id; json11::Json self_state; - std::map wanted_peers; bool loading_peer_config = false; std::set pg_state_dirty; bool pg_config_applied = false; @@ -223,13 +90,11 @@ class osd_t // peers and PGs - std::map osd_peer_fds; std::map pgs; std::set dirty_pgs; uint64_t misplaced_objects = 0, degraded_objects = 0, incomplete_objects = 0; int peering_state = 0; unsigned pg_count = 0; - uint64_t next_subop_id = 1; std::map recovery_ops; osd_op_t *autosync_op = NULL; @@ -254,16 +119,8 @@ class osd_t ring_consumer_t consumer; std::map> epoll_handlers; - std::unordered_map clients; - std::vector read_ready_clients; - std::vector write_ready_clients; - // op statistics - uint64_t op_stat_sum[2][OSD_OP_MAX+1] = { 0 }; - uint64_t op_stat_count[2][OSD_OP_MAX+1] = { 0 }; - uint64_t op_stat_bytes[2][OSD_OP_MAX+1] = { 0 }; - uint64_t subop_stat_sum[2][OSD_OP_MAX+1] = { 0 }; - uint64_t subop_stat_count[2][OSD_OP_MAX+1] = { 0 }; + osd_op_stats_t prev_stats; const char* recovery_stat_names[2] = { "degraded", "misplaced" }; uint64_t recovery_stat_count[2][2] = { 0 }; uint64_t recovery_stat_bytes[2][2] = { 0 }; @@ -271,6 +128,7 @@ class osd_t // cluster connection void parse_config(blockstore_config_t & config); void init_cluster(); + void on_change_osd_state_hook(uint64_t osd_num); void on_change_etcd_state_hook(json11::Json::object & changes); void on_load_config_hook(json11::Json::object & changes); json11::Json on_load_pgs_checks_hook(); @@ -288,30 +146,13 @@ class osd_t void report_pg_states(); void apply_pg_count(); void apply_pg_config(); - void on_connect_peer(osd_num_t peer_osd, int peer_fd); - void load_and_connect_peers(); // event loop, socket read/write void loop(); void set_fd_handler(int fd, std::function handler); void handle_epoll_events(); - void read_requests(); - void handle_read(ring_data_t *data, int peer_fd); - void handle_finished_read(osd_client_t & cl); - void handle_op_hdr(osd_client_t *cl); - void handle_reply_hdr(osd_client_t *cl); - bool try_send(osd_client_t & cl); - void send_replies(); - void handle_send(ring_data_t *data, int peer_fd); - void outbox_push(osd_client_t & cl, osd_op_t *op); // peer handling (primary OSD logic) - void connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port); - void handle_connect_result(int peer_fd); - void check_peer_config(osd_client_t & cl); - void cancel_osd_ops(osd_client_t & cl); - void cancel_op(osd_op_t *op); - void stop_client(int peer_fd); void parse_test_peer(std::string peer); void handle_peers(); void repeer_pgs(osd_num_t osd_num); diff --git a/osd_cluster.cpp b/osd_cluster.cpp index 267add65..8ef07ed7 100644 --- a/osd_cluster.cpp +++ b/osd_cluster.cpp @@ -36,7 +36,6 @@ void osd_t::init_cluster() }; report_pg_state(pgs[1]); pg_count = 1; - peering_state = OSD_CONNECTING_PEERS; } bind_socket(); } @@ -44,6 +43,7 @@ void osd_t::init_cluster() { st_cli.tfd = tfd; st_cli.log_level = log_level; + st_cli.on_change_osd_state_hook = [this](uint64_t peer_osd) { on_change_osd_state_hook(peer_osd); }; st_cli.on_change_hook = [this](json11::Json::object & changes) { on_change_etcd_state_hook(changes); }; st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); }; st_cli.load_pgs_checks_hook = [this]() { return on_load_pgs_checks_hook(); }; @@ -83,7 +83,7 @@ void osd_t::parse_test_peer(std::string peer) { "addresses", json11::Json::array { addr } }, { "port", port }, }; - wanted_peers[peer_osd] = { 0 }; + c_cli.connect_peer(peer_osd, json11::Json::array { addr }, port); } json11::Json osd_t::get_osd_state() @@ -125,16 +125,16 @@ json11::Json osd_t::get_statistics() for (int i = 0; i <= OSD_OP_MAX; i++) { op_stats[osd_op_names[i]] = json11::Json::object { - { "count", op_stat_count[0][i] }, - { "usec", op_stat_sum[0][i] }, - { "bytes", op_stat_bytes[0][i] }, + { "count", c_cli.stats.op_stat_count[i] }, + { "usec", c_cli.stats.op_stat_sum[i] }, + { "bytes", c_cli.stats.op_stat_bytes[i] }, }; } for (int i = 0; i <= OSD_OP_MAX; i++) { subop_stats[osd_op_names[i]] = json11::Json::object { - { "count", subop_stat_count[0][i] }, - { "usec", subop_stat_sum[0][i] }, + { "count", c_cli.stats.subop_stat_count[i] }, + { "usec", c_cli.stats.subop_stat_sum[i] }, }; } st["op_stats"] = op_stats; @@ -207,6 +207,14 @@ void osd_t::report_statistics() }); } +void osd_t::on_change_osd_state_hook(uint64_t peer_osd) +{ + if (c_cli.wanted_peers.find(peer_osd) != c_cli.wanted_peers.end()) + { + c_cli.connect_peer(peer_osd, st_cli.peer_states[peer_osd]["addresses"], st_cli.peer_states[peer_osd]["port"].int64_value()); + } +} + void osd_t::on_change_etcd_state_hook(json11::Json::object & changes) { // FIXME apply config changes in runtime (maybe, some) @@ -546,9 +554,9 @@ void osd_t::apply_pg_config() // Add peers for (auto pg_osd: all_peers) { - if (pg_osd != this->osd_num && osd_peer_fds.find(pg_osd) == osd_peer_fds.end()) + if (pg_osd != this->osd_num && c_cli.osd_peer_fds.find(pg_osd) == c_cli.osd_peer_fds.end()) { - wanted_peers[pg_osd] = { 0 }; + c_cli.connect_peer(pg_osd, st_cli.peer_states[pg_osd]["addresses"], st_cli.peer_states[pg_osd]["port"].int64_value()); } } start_pg_peering(pg_num); @@ -560,10 +568,6 @@ void osd_t::apply_pg_config() } } } - if (wanted_peers.size() > 0) - { - peering_state |= OSD_CONNECTING_PEERS; - } report_pg_states(); this->pg_config_applied = all_applied; } @@ -734,105 +738,3 @@ void osd_t::report_pg_states() } }); } - -void osd_t::on_connect_peer(osd_num_t peer_osd, int peer_fd) -{ - wanted_peers[peer_osd].connecting = false; - if (peer_fd < 0) - { - int64_t peer_port = st_cli.peer_states[peer_osd]["port"].int64_value(); - auto & addrs = st_cli.peer_states[peer_osd]["addresses"].array_items(); - const char *addr = addrs[wanted_peers[peer_osd].address_index].string_value().c_str(); - printf("Failed to connect to peer OSD %lu address %s port %ld: %s\n", peer_osd, addr, peer_port, strerror(-peer_fd)); - if (wanted_peers[peer_osd].address_index < addrs.size()-1) - { - // Try all addresses - wanted_peers[peer_osd].address_index++; - } - else - { - wanted_peers[peer_osd].last_connect_attempt = time(NULL); - st_cli.peer_states.erase(peer_osd); - } - return; - } - printf("Connected with peer OSD %lu (fd %d)\n", clients[peer_fd].osd_num, peer_fd); - wanted_peers.erase(peer_osd); - if (!wanted_peers.size()) - { - // Connected to all peers - printf("Connected to all peers\n"); - peering_state = peering_state & ~OSD_CONNECTING_PEERS; - } - repeer_pgs(peer_osd); -} - -void osd_t::load_and_connect_peers() -{ - json11::Json::array load_peer_txn; - for (auto wp_it = wanted_peers.begin(); wp_it != wanted_peers.end();) - { - osd_num_t peer_osd = wp_it->first; - if (osd_peer_fds.find(peer_osd) != osd_peer_fds.end()) - { - // Peer is already connected, it shouldn't be in wanted_peers - wanted_peers.erase(wp_it++); - if (!wanted_peers.size()) - { - // Connected to all peers - peering_state = peering_state & ~OSD_CONNECTING_PEERS; - } - } - else if (st_cli.peer_states.find(peer_osd) == st_cli.peer_states.end()) - { - if (!loading_peer_config && (time(NULL) - wp_it->second.last_load_attempt >= peer_connect_interval)) - { - // (Re)load OSD state from etcd - wp_it->second.last_load_attempt = time(NULL); - load_peer_txn.push_back(json11::Json::object { - { "request_range", json11::Json::object { - { "key", base64_encode(st_cli.etcd_prefix+"/osd/state/"+std::to_string(peer_osd)) }, - } } - }); - } - wp_it++; - } - else if (!wp_it->second.connecting && - time(NULL) - wp_it->second.last_connect_attempt >= peer_connect_interval) - { - // Try to connect - wp_it->second.connecting = true; - const std::string addr = st_cli.peer_states[peer_osd]["addresses"][wp_it->second.address_index].string_value(); - int64_t peer_port = st_cli.peer_states[peer_osd]["port"].int64_value(); - wp_it++; - connect_peer(peer_osd, addr.c_str(), peer_port); - } - else - { - // Skip - wp_it++; - } - } - if (load_peer_txn.size() > 0) - { - st_cli.etcd_txn(json11::Json::object { { "success", load_peer_txn } }, ETCD_QUICK_TIMEOUT, [this](std::string err, json11::Json data) - { - // Ugly, but required to wake up the loop and retry connecting after seconds - tfd->set_timer(peer_connect_interval*1000, false, [](int timer_id){}); - loading_peer_config = false; - if (err != "") - { - printf("Failed to load peer states from etcd: %s\n", err.c_str()); - return; - } - for (auto & res: data["responses"].array_items()) - { - if (res["response_range"]["kvs"].array_items().size()) - { - auto kv = st_cli.parse_etcd_kv(res["response_range"]["kvs"][0]); - st_cli.parse_state(kv.key, kv.value); - } - } - }); - } -} diff --git a/osd_flush.cpp b/osd_flush.cpp index ca208a6a..b10e464b 100644 --- a/osd_flush.cpp +++ b/osd_flush.cpp @@ -79,8 +79,8 @@ void osd_t::handle_flush_op(bool rollback, pg_num_t pg_num, pg_flush_batch_t *fb else { printf("Error while doing flush on OSD %lu: %s\n", osd_num, strerror(-retval)); - assert(osd_peer_fds.find(peer_osd) != osd_peer_fds.end()); - stop_client(osd_peer_fds[peer_osd]); + assert(c_cli.osd_peer_fds.find(peer_osd) != c_cli.osd_peer_fds.end()); + c_cli.stop_client(c_cli.osd_peer_fds[peer_osd]); return; } } @@ -178,7 +178,7 @@ void osd_t::submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback else { // Peer - int peer_fd = osd_peer_fds[peer_osd]; + int peer_fd = c_cli.osd_peer_fds[peer_osd]; op->op_type = OSD_OP_OUT; op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE); op->send_list.push_back(op->buf, count * sizeof(obj_ver_id)); @@ -187,18 +187,18 @@ void osd_t::submit_flush_op(pg_num_t pg_num, pg_flush_batch_t *fb, bool rollback .sec_stab = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = this->next_subop_id++, + .id = c_cli.next_subop_id++, .opcode = (uint64_t)(rollback ? OSD_OP_SECONDARY_ROLLBACK : OSD_OP_SECONDARY_STABILIZE), }, .len = count * sizeof(obj_ver_id), }, }; - op->callback = [this, pg_num, fb](osd_op_t *op) + op->callback = [this, pg_num, fb, peer_osd](osd_op_t *op) { - handle_flush_op(op->req.hdr.opcode == OSD_OP_SECONDARY_ROLLBACK, pg_num, fb, clients[op->peer_fd].osd_num, op->reply.hdr.retval); + handle_flush_op(op->req.hdr.opcode == OSD_OP_SECONDARY_ROLLBACK, pg_num, fb, peer_osd, op->reply.hdr.retval); delete op; }; - outbox_push(clients[peer_fd], op); + c_cli.outbox_push(op); } } diff --git a/osd_peering.cpp b/osd_peering.cpp index 6c755572..c4c97427 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -6,154 +6,9 @@ #include "base64.h" #include "osd.h" -void osd_t::connect_peer(osd_num_t peer_osd, const char *peer_host, int peer_port) -{ - struct sockaddr_in addr; - int r; - if ((r = inet_pton(AF_INET, peer_host, &addr.sin_addr)) != 1) - { - on_connect_peer(peer_osd, -EINVAL); - return; - } - addr.sin_family = AF_INET; - addr.sin_port = htons(peer_port ? peer_port : 11203); - int peer_fd = socket(AF_INET, SOCK_STREAM, 0); - if (peer_fd < 0) - { - on_connect_peer(peer_osd, -errno); - return; - } - fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); - int timeout_id = -1; - if (peer_connect_timeout > 0) - { - timeout_id = tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd](int timer_id) - { - osd_num_t peer_osd = clients[peer_fd].osd_num; - stop_client(peer_fd); - on_connect_peer(peer_osd, -EIO); - return; - }); - } - r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); - if (r < 0 && errno != EINPROGRESS) - { - close(peer_fd); - on_connect_peer(peer_osd, -errno); - return; - } - assert(peer_osd != osd_num); - clients[peer_fd] = (osd_client_t){ - .peer_addr = addr, - .peer_port = peer_port, - .peer_fd = peer_fd, - .peer_state = PEER_CONNECTING, - .connect_timeout_id = timeout_id, - .osd_num = peer_osd, - .in_buf = malloc(receive_buffer_size), - }; - // Add FD to epoll (EPOLLOUT for tracking connect() result) - epoll_event ev; - ev.data.fd = peer_fd; - ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP | EPOLLET; - if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0) - { - throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); - } -} - -void osd_t::handle_connect_result(int peer_fd) -{ - auto & cl = clients[peer_fd]; - if (cl.connect_timeout_id >= 0) - { - tfd->clear_timer(cl.connect_timeout_id); - cl.connect_timeout_id = -1; - } - osd_num_t peer_osd = cl.osd_num; - int result = 0; - socklen_t result_len = sizeof(result); - if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) - { - result = errno; - } - if (result != 0) - { - stop_client(peer_fd); - on_connect_peer(peer_osd, -result); - return; - } - int one = 1; - setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); - // Disable EPOLLOUT on this fd - cl.peer_state = PEER_CONNECTED; - epoll_event ev; - ev.data.fd = peer_fd; - ev.events = EPOLLIN | EPOLLRDHUP | EPOLLET; - if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peer_fd, &ev) < 0) - { - throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno)); - } - // Check OSD number - check_peer_config(cl); -} - -void osd_t::check_peer_config(osd_client_t & cl) -{ - osd_op_t *op = new osd_op_t(); - op->op_type = OSD_OP_OUT; - op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE); - op->peer_fd = cl.peer_fd; - op->req = { - .show_conf = { - .header = { - .magic = SECONDARY_OSD_OP_MAGIC, - .id = this->next_subop_id++, - .opcode = OSD_OP_SHOW_CONFIG, - }, - }, - }; - op->callback = [this](osd_op_t *op) - { - std::string json_err; - json11::Json config = json11::Json::parse(std::string((char*)op->buf), json_err); - osd_client_t & cl = clients[op->peer_fd]; - bool err = false; - if (op->reply.hdr.retval < 0) - { - err = true; - printf("Failed to get config from OSD %lu (retval=%ld), disconnecting peer\n", cl.osd_num, op->reply.hdr.retval); - } - else if (json_err != "") - { - err = true; - printf("Failed to get config from OSD %lu: bad JSON: %s, disconnecting peer\n", cl.osd_num, json_err.c_str()); - } - else if (config["osd_num"].uint64_value() != cl.osd_num) - { - err = true; - printf("Connected to OSD %lu instead of OSD %lu, peer state is outdated, disconnecting peer\n", config["osd_num"].uint64_value(), cl.osd_num); - } - if (err) - { - stop_client(op->peer_fd); - delete op; - return; - } - osd_peer_fds[cl.osd_num] = cl.peer_fd; - on_connect_peer(cl.osd_num, cl.peer_fd); - delete op; - }; - outbox_push(cl, op); -} - // Peering loop void osd_t::handle_peers() { - if (peering_state & OSD_CONNECTING_PEERS) - { - load_and_connect_peers(); - } if (peering_state & OSD_PEERING_PGS) { bool still = false; @@ -265,7 +120,7 @@ void osd_t::start_pg_peering(pg_num_t pg_num) pg.flush_batch = NULL; for (auto p: pg.write_queue) { - cancel_op(p.second); + finish_op(p.second, -EPIPE); } pg.write_queue.clear(); for (auto it = unstable_writes.begin(); it != unstable_writes.end(); ) @@ -286,7 +141,7 @@ void osd_t::start_pg_peering(pg_num_t pg_num) for (int role = 0; role < pg.target_set.size(); role++) { pg.cur_set[role] = pg.target_set[role] == this->osd_num || - osd_peer_fds.find(pg.target_set[role]) != osd_peer_fds.end() ? pg.target_set[role] : 0; + c_cli.osd_peer_fds.find(pg.target_set[role]) != c_cli.osd_peer_fds.end() ? pg.target_set[role] : 0; if (pg.cur_set[role] != 0) { pg.pg_cursize++; @@ -306,7 +161,7 @@ void osd_t::start_pg_peering(pg_num_t pg_num) bool found = false; for (auto history_osd: history_set) { - if (history_osd != 0 && osd_peer_fds.find(history_osd) != osd_peer_fds.end()) + if (history_osd != 0 && c_cli.osd_peer_fds.find(history_osd) != c_cli.osd_peer_fds.end()) { found = true; break; @@ -325,16 +180,15 @@ void osd_t::start_pg_peering(pg_num_t pg_num) report_pg_state(pg); } std::set cur_peers; - for (auto peer_osd: pg.all_peers) + for (auto pg_osd: pg.all_peers) { - if (peer_osd == this->osd_num || osd_peer_fds.find(peer_osd) != osd_peer_fds.end()) + if (pg_osd == this->osd_num || c_cli.osd_peer_fds.find(pg_osd) != c_cli.osd_peer_fds.end()) { - cur_peers.insert(peer_osd); + cur_peers.insert(pg_osd); } - else if (wanted_peers.find(peer_osd) == wanted_peers.end()) + else if (c_cli.wanted_peers.find(pg_osd) == c_cli.wanted_peers.end()) { - wanted_peers[peer_osd] = { 0 }; - peering_state |= OSD_CONNECTING_PEERS; + c_cli.connect_peer(pg_osd, st_cli.peer_states[pg_osd]["addresses"], st_cli.peer_states[pg_osd]["port"].int64_value()); } } pg.cur_peers.insert(pg.cur_peers.begin(), cur_peers.begin(), cur_peers.end()); @@ -431,7 +285,7 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p else { // Peer - auto & cl = clients[osd_peer_fds[role_osd]]; + auto & cl = c_cli.clients.at(c_cli.osd_peer_fds[role_osd]); osd_op_t *op = new osd_op_t(); op->op_type = OSD_OP_OUT; op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE); @@ -440,7 +294,7 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p .sec_sync = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = this->next_subop_id++, + .id = c_cli.next_subop_id++, .opcode = OSD_OP_SECONDARY_SYNC, }, }, @@ -452,7 +306,7 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p // FIXME: Mark peer as failed and don't reconnect immediately after dropping the connection printf("Failed to sync OSD %lu: %ld (%s), disconnecting peer\n", role_osd, op->reply.hdr.retval, strerror(-op->reply.hdr.retval)); ps->list_ops.erase(role_osd); - stop_client(op->peer_fd); + c_cli.stop_client(op->peer_fd); delete op; return; } @@ -460,7 +314,7 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p ps->list_ops.erase(role_osd); submit_list_subop(role_osd, ps); }; - outbox_push(cl, op); + c_cli.outbox_push(op); ps->list_ops[role_osd] = op; } } @@ -506,16 +360,15 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps) else { // Peer - auto & cl = clients[osd_peer_fds[role_osd]]; osd_op_t *op = new osd_op_t(); op->op_type = OSD_OP_OUT; op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE); - op->peer_fd = cl.peer_fd; + op->peer_fd = c_cli.osd_peer_fds[role_osd]; op->req = { .sec_list = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = this->next_subop_id++, + .id = c_cli.next_subop_id++, .opcode = OSD_OP_SECONDARY_LIST, }, .list_pg = ps->pg_num, @@ -529,7 +382,7 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps) { printf("Failed to get object list from OSD %lu (retval=%ld), disconnecting peer\n", role_osd, op->reply.hdr.retval); ps->list_ops.erase(role_osd); - stop_client(op->peer_fd); + c_cli.stop_client(op->peer_fd); delete op; return; } @@ -547,7 +400,7 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps) ps->list_ops.erase(role_osd); delete op; }; - outbox_push(cl, op); + c_cli.outbox_push(op); ps->list_ops[role_osd] = op; } } diff --git a/osd_primary.cpp b/osd_primary.cpp index 6196bac0..aee21a6a 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -366,7 +366,7 @@ resume_7: } // Remember PG as dirty to drop the connection when PG goes offline // (this is required because of the "lazy sync") - this->clients[cur_op->peer_fd].dirty_pgs.insert(op_data->pg_num); + c_cli.clients[cur_op->peer_fd].dirty_pgs.insert(op_data->pg_num); dirty_pgs.insert(op_data->pg_num); } return true; @@ -519,8 +519,8 @@ resume_6: finish: if (cur_op->peer_fd) { - auto it = clients.find(cur_op->peer_fd); - if (it != clients.end()) + auto it = c_cli.clients.find(cur_op->peer_fd); + if (it != c_cli.clients.end()) it->second.dirty_pgs.clear(); } finish_op(cur_op, 0); diff --git a/osd_primary_subops.cpp b/osd_primary_subops.cpp index fe5b10be..d763e9f0 100644 --- a/osd_primary_subops.cpp +++ b/osd_primary_subops.cpp @@ -51,14 +51,14 @@ void osd_t::finish_op(osd_op_t *cur_op, int retval) else { // FIXME add separate magic number - auto cl_it = clients.find(cur_op->peer_fd); - if (cl_it != clients.end()) + auto cl_it = c_cli.clients.find(cur_op->peer_fd); + if (cl_it != c_cli.clients.end()) { cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; cur_op->reply.hdr.id = cur_op->req.hdr.id; cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode; cur_op->reply.hdr.retval = retval; - outbox_push(cl_it->second, cur_op); + c_cli.outbox_push(cur_op); } else { @@ -135,11 +135,11 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* { subops[i].op_type = OSD_OP_OUT; subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE); - subops[i].peer_fd = this->osd_peer_fds.at(role_osd_num); + subops[i].peer_fd = c_cli.osd_peer_fds.at(role_osd_num); subops[i].req.sec_rw = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = this->next_subop_id++, + .id = c_cli.next_subop_id++, .opcode = (uint64_t)(w ? OSD_OP_SECONDARY_WRITE : OSD_OP_SECONDARY_READ), }, .oid = { @@ -168,10 +168,10 @@ void osd_t::submit_primary_subops(int submit_type, int pg_size, const uint64_t* if (fail_fd >= 0) { // write operation failed, drop the connection - stop_client(fail_fd); + c_cli.stop_client(fail_fd); } }; - outbox_push(clients[subops[i].peer_fd], &subops[i]); + c_cli.outbox_push(&subops[i]); } i++; } @@ -218,20 +218,20 @@ void osd_t::add_bs_subop_stats(osd_op_t *subop) uint64_t opcode = bs_op_to_osd_op[subop->bs_op->opcode]; timespec tv_end; clock_gettime(CLOCK_REALTIME, &tv_end); - op_stat_count[0][opcode]++; - if (!op_stat_count[0][opcode]) + c_cli.stats.op_stat_count[opcode]++; + if (!c_cli.stats.op_stat_count[opcode]) { - op_stat_count[0][opcode] = 1; - op_stat_sum[0][opcode] = 0; - op_stat_bytes[0][opcode] = 0; + c_cli.stats.op_stat_count[opcode] = 1; + c_cli.stats.op_stat_sum[opcode] = 0; + c_cli.stats.op_stat_bytes[opcode] = 0; } - op_stat_sum[0][opcode] += ( + c_cli.stats.op_stat_sum[opcode] += ( (tv_end.tv_sec - subop->tv_begin.tv_sec)*1000000 + (tv_end.tv_nsec - subop->tv_begin.tv_nsec)/1000 ); if (opcode == OSD_OP_SECONDARY_READ || opcode == OSD_OP_SECONDARY_WRITE) { - op_stat_bytes[0][opcode] += subop->bs_op->len; + c_cli.stats.op_stat_bytes[opcode] += subop->bs_op->len; } } @@ -337,11 +337,11 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_os { subops[i].op_type = OSD_OP_OUT; subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE); - subops[i].peer_fd = osd_peer_fds.at(chunk.osd_num); + subops[i].peer_fd = c_cli.osd_peer_fds.at(chunk.osd_num); subops[i].req.sec_del = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = this->next_subop_id++, + .id = c_cli.next_subop_id++, .opcode = OSD_OP_SECONDARY_DELETE, }, .oid = { @@ -358,10 +358,10 @@ void osd_t::submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, pg_os if (fail_fd >= 0) { // delete operation failed, drop the connection - stop_client(fail_fd); + c_cli.stop_client(fail_fd); } }; - outbox_push(clients[subops[i].peer_fd], &subops[i]); + c_cli.outbox_push(&subops[i]); } i++; } @@ -396,11 +396,11 @@ void osd_t::submit_primary_sync_subops(osd_op_t *cur_op) { subops[i].op_type = OSD_OP_OUT; subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE); - subops[i].peer_fd = osd_peer_fds.at(sync_osd); + subops[i].peer_fd = c_cli.osd_peer_fds.at(sync_osd); subops[i].req.sec_sync = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = this->next_subop_id++, + .id = c_cli.next_subop_id++, .opcode = OSD_OP_SECONDARY_SYNC, }, }; @@ -411,10 +411,10 @@ void osd_t::submit_primary_sync_subops(osd_op_t *cur_op) if (fail_fd >= 0) { // sync operation failed, drop the connection - stop_client(fail_fd); + c_cli.stop_client(fail_fd); } }; - outbox_push(clients[subops[i].peer_fd], &subops[i]); + c_cli.outbox_push(&subops[i]); } } } @@ -449,11 +449,11 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op) { subops[i].op_type = OSD_OP_OUT; subops[i].send_list.push_back(subops[i].req.buf, OSD_PACKET_SIZE); - subops[i].peer_fd = osd_peer_fds.at(stab_osd.osd_num); + subops[i].peer_fd = c_cli.osd_peer_fds.at(stab_osd.osd_num); subops[i].req.sec_stab = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = this->next_subop_id++, + .id = c_cli.next_subop_id++, .opcode = OSD_OP_SECONDARY_STABILIZE, }, .len = (uint64_t)(stab_osd.len * sizeof(obj_ver_id)), @@ -466,10 +466,10 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op) if (fail_fd >= 0) { // sync operation failed, drop the connection - stop_client(fail_fd); + c_cli.stop_client(fail_fd); } }; - outbox_push(clients[subops[i].peer_fd], &subops[i]); + c_cli.outbox_push(&subops[i]); } } } diff --git a/osd_receive.cpp b/osd_receive.cpp index 4dbf27f7..a86e5f9e 100644 --- a/osd_receive.cpp +++ b/osd_receive.cpp @@ -1,6 +1,6 @@ -#include "osd.h" +#include "cluster_client.h" -void osd_t::read_requests() +void cluster_client_t::read_requests() { for (int i = 0; i < read_ready_clients.size(); i++) { @@ -31,7 +31,7 @@ void osd_t::read_requests() read_ready_clients.clear(); } -void osd_t::handle_read(ring_data_t *data, int peer_fd) +void cluster_client_t::handle_read(ring_data_t *data, int peer_fd) { auto cl_it = clients.find(peer_fd); if (cl_it != clients.end()) @@ -110,7 +110,7 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd) } } -void osd_t::handle_finished_read(osd_client_t & cl) +void cluster_client_t::handle_finished_read(osd_client_t & cl) { if (cl.read_state == CL_READ_HDR) { @@ -139,13 +139,13 @@ void osd_t::handle_finished_read(osd_client_t & cl) // Measure subop latency timespec tv_end; clock_gettime(CLOCK_REALTIME, &tv_end); - subop_stat_count[0][request->req.hdr.opcode]++; - if (!subop_stat_count[0][request->req.hdr.opcode]) + stats.subop_stat_count[request->req.hdr.opcode]++; + if (!stats.subop_stat_count[request->req.hdr.opcode]) { - subop_stat_count[0][request->req.hdr.opcode]++; - subop_stat_sum[0][request->req.hdr.opcode] = 0; + stats.subop_stat_count[request->req.hdr.opcode]++; + stats.subop_stat_sum[request->req.hdr.opcode] = 0; } - subop_stat_sum[0][request->req.hdr.opcode] += ( + stats.subop_stat_sum[request->req.hdr.opcode] += ( (tv_end.tv_sec - request->tv_begin.tv_sec)*1000000 + (tv_end.tv_nsec - request->tv_begin.tv_nsec)/1000 ); @@ -157,7 +157,7 @@ void osd_t::handle_finished_read(osd_client_t & cl) } } -void osd_t::handle_op_hdr(osd_client_t *cl) +void cluster_client_t::handle_op_hdr(osd_client_t *cl) { osd_op_t *cur_op = cl->read_op; if (cur_op->req.hdr.opcode == OSD_OP_SECONDARY_READ) @@ -206,7 +206,7 @@ void osd_t::handle_op_hdr(osd_client_t *cl) } } -void osd_t::handle_reply_hdr(osd_client_t *cl) +void cluster_client_t::handle_reply_hdr(osd_client_t *cl) { osd_op_t *cur_op = cl->read_op; auto req_it = cl->sent_ops.find(cur_op->req.hdr.id); @@ -256,13 +256,13 @@ void osd_t::handle_reply_hdr(osd_client_t *cl) // Measure subop latency timespec tv_end; clock_gettime(CLOCK_REALTIME, &tv_end); - subop_stat_count[0][op->req.hdr.opcode]++; - if (!subop_stat_count[0][op->req.hdr.opcode]) + stats.subop_stat_count[op->req.hdr.opcode]++; + if (!stats.subop_stat_count[op->req.hdr.opcode]) { - subop_stat_count[0][op->req.hdr.opcode]++; - subop_stat_sum[0][op->req.hdr.opcode] = 0; + stats.subop_stat_count[op->req.hdr.opcode]++; + stats.subop_stat_sum[op->req.hdr.opcode] = 0; } - subop_stat_sum[0][op->req.hdr.opcode] += ( + stats.subop_stat_sum[op->req.hdr.opcode] += ( (tv_end.tv_sec - op->tv_begin.tv_sec)*1000000 + (tv_end.tv_nsec - op->tv_begin.tv_nsec)/1000 ); diff --git a/osd_send.cpp b/osd_send.cpp index 47d5d468..43fb3bd5 100644 --- a/osd_send.cpp +++ b/osd_send.cpp @@ -1,8 +1,9 @@ -#include "osd.h" +#include "cluster_client.h" -void osd_t::outbox_push(osd_client_t & cl, osd_op_t *cur_op) +void cluster_client_t::outbox_push(osd_op_t *cur_op) { assert(cur_op->peer_fd); + auto & cl = clients.at(cur_op->peer_fd); if (cur_op->op_type == OSD_OP_OUT) { clock_gettime(CLOCK_REALTIME, &cur_op->tv_begin); @@ -19,7 +20,7 @@ void osd_t::outbox_push(osd_client_t & cl, osd_op_t *cur_op) } } -bool osd_t::try_send(osd_client_t & cl) +bool cluster_client_t::try_send(osd_client_t & cl) { int peer_fd = cl.peer_fd; io_uring_sqe* sqe = ringloop->get_sqe(); @@ -39,26 +40,26 @@ bool osd_t::try_send(osd_client_t & cl) // Measure execution latency timespec tv_end; clock_gettime(CLOCK_REALTIME, &tv_end); - op_stat_count[0][cl.write_op->req.hdr.opcode]++; - if (!op_stat_count[0][cl.write_op->req.hdr.opcode]) + stats.op_stat_count[cl.write_op->req.hdr.opcode]++; + if (!stats.op_stat_count[cl.write_op->req.hdr.opcode]) { - op_stat_count[0][cl.write_op->req.hdr.opcode]++; - op_stat_sum[0][cl.write_op->req.hdr.opcode] = 0; - op_stat_bytes[0][cl.write_op->req.hdr.opcode] = 0; + stats.op_stat_count[cl.write_op->req.hdr.opcode]++; + stats.op_stat_sum[cl.write_op->req.hdr.opcode] = 0; + stats.op_stat_bytes[cl.write_op->req.hdr.opcode] = 0; } - op_stat_sum[0][cl.write_op->req.hdr.opcode] += ( + stats.op_stat_sum[cl.write_op->req.hdr.opcode] += ( (tv_end.tv_sec - cl.write_op->tv_begin.tv_sec)*1000000 + (tv_end.tv_nsec - cl.write_op->tv_begin.tv_nsec)/1000 ); if (cl.write_op->req.hdr.opcode == OSD_OP_READ || cl.write_op->req.hdr.opcode == OSD_OP_WRITE) { - op_stat_bytes[0][cl.write_op->req.hdr.opcode] += cl.write_op->req.rw.len; + stats.op_stat_bytes[cl.write_op->req.hdr.opcode] += cl.write_op->req.rw.len; } else if (cl.write_op->req.hdr.opcode == OSD_OP_SECONDARY_READ || cl.write_op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE) { - op_stat_bytes[0][cl.write_op->req.hdr.opcode] += cl.write_op->req.sec_rw.len; + stats.op_stat_bytes[cl.write_op->req.hdr.opcode] += cl.write_op->req.sec_rw.len; } } } @@ -69,7 +70,7 @@ bool osd_t::try_send(osd_client_t & cl) return true; } -void osd_t::send_replies() +void cluster_client_t::send_replies() { for (int i = 0; i < write_ready_clients.size(); i++) { @@ -83,7 +84,7 @@ void osd_t::send_replies() write_ready_clients.clear(); } -void osd_t::handle_send(ring_data_t *data, int peer_fd) +void cluster_client_t::handle_send(ring_data_t *data, int peer_fd) { auto cl_it = clients.find(peer_fd); if (cl_it != clients.end())