diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5a063f314..69dd915a8 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -88,8 +88,8 @@ if (IBVERBS_LIBRARIES) set(MSGR_RDMA "msgr_rdma.cpp") endif (IBVERBS_LIBRARIES) add_library(vitastor_common STATIC - epoll_manager.cpp etcd_state_client.cpp - messenger.cpp msgr_stop.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp ../json11/json11.cpp + epoll_manager.cpp etcd_state_client.cpp messenger.cpp addr_util.cpp + msgr_stop.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp ../json11/json11.cpp http_client.cpp osd_ops.cpp pg_states.cpp timerfd_manager.cpp base64.cpp ${MSGR_RDMA} ) target_compile_options(vitastor_common PUBLIC -fPIC) @@ -112,6 +112,7 @@ if (${WITH_FIO}) add_library(fio_vitastor_sec SHARED fio_sec_osd.cpp rw_blocking.cpp + addr_util.cpp ) target_link_libraries(fio_vitastor_sec tcmalloc_minimal @@ -189,11 +190,11 @@ endif (${WITH_QEMU}) ### Test stubs # stub_osd, stub_bench, osd_test -add_executable(stub_osd stub_osd.cpp rw_blocking.cpp) +add_executable(stub_osd stub_osd.cpp rw_blocking.cpp addr_util.cpp) target_link_libraries(stub_osd tcmalloc_minimal) -add_executable(stub_bench stub_bench.cpp rw_blocking.cpp) +add_executable(stub_bench stub_bench.cpp rw_blocking.cpp addr_util.cpp) target_link_libraries(stub_bench tcmalloc_minimal) -add_executable(osd_test osd_test.cpp rw_blocking.cpp) +add_executable(osd_test osd_test.cpp rw_blocking.cpp addr_util.cpp) target_link_libraries(osd_test tcmalloc_minimal) # osd_rmw_test diff --git a/src/addr_util.cpp b/src/addr_util.cpp new file mode 100644 index 000000000..fdab6c00e --- /dev/null +++ b/src/addr_util.cpp @@ -0,0 +1,60 @@ +#include +#include +#include + +#include + +#include "addr_util.h" + +bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr *addr) +{ + if (parse_port) + { + int p = str.rfind(':'); + if (p != std::string::npos && !(str.length() > 0 && str[p-1] == ']')) // "[ipv6]" which contains ':' + { + char null_byte = 0; + int n = sscanf(str.c_str()+p+1, "%d%c", &default_port, &null_byte); + if (n != 1 || default_port >= 0x10000) + return false; + str = str.substr(0, p); + } + } + if (inet_pton(AF_INET, str.c_str(), &((struct sockaddr_in*)addr)->sin_addr) == 1) + { + addr->sa_family = AF_INET; + ((struct sockaddr_in*)addr)->sin_port = htons(default_port); + return true; + } + if (str.length() >= 2 && str[0] == '[' && str[str.length()-1] == ']') + str = str.substr(1, str.length()-2); + if (inet_pton(AF_INET6, str.c_str(), &((struct sockaddr_in6*)addr)->sin6_addr) == 1) + { + addr->sa_family = AF_INET6; + ((struct sockaddr_in6*)addr)->sin6_port = htons(default_port); + return true; + } + return false; +} + +std::string addr_to_string(const sockaddr &addr) +{ + char peer_str[256]; + bool ok = false; + int port; + if (addr.sa_family == AF_INET) + { + ok = !!inet_ntop(AF_INET, &((sockaddr_in*)&addr)->sin_addr, peer_str, 256); + port = ntohs(((sockaddr_in*)&addr)->sin_port); + } + else if (addr.sa_family == AF_INET6) + { + ok = !!inet_ntop(AF_INET6, &((sockaddr_in6*)&addr)->sin6_addr, peer_str, 256); + port = ntohs(((sockaddr_in6*)&addr)->sin6_port); + } + else + throw std::runtime_error("Unknown address family "+std::to_string(addr.sa_family)); + if (!ok) + throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno)); + return std::string(peer_str)+":"+std::to_string(port); +} diff --git a/src/addr_util.h b/src/addr_util.h new file mode 100644 index 000000000..53c6eb31e --- /dev/null +++ b/src/addr_util.h @@ -0,0 +1,7 @@ +#pragma once + +#include +#include + +bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr *addr); +std::string addr_to_string(const sockaddr &addr); diff --git a/src/fio_sec_osd.cpp b/src/fio_sec_osd.cpp index 34af4ff30..0be2f7647 100644 --- a/src/fio_sec_osd.cpp +++ b/src/fio_sec_osd.cpp @@ -28,6 +28,7 @@ #include #include +#include "addr_util.h" #include "rw_blocking.h" #include "osd_ops.h" #include "fio_headers.h" @@ -152,17 +153,14 @@ static int sec_init(struct thread_data *td) bsd->block_order = o->block_order == 0 ? 17 : o->block_order; bsd->block_size = 1 << o->block_order; - struct sockaddr_in addr; - int r; - if ((r = inet_pton(AF_INET, o->host ? o->host : "127.0.0.1", &addr.sin_addr)) != 1) + sockaddr addr; + if (!string_to_addr(std::string(o->host ? o->host : "127.0.0.1"), false, o->port > 0 ? o->port : 11203, &addr)) { - fprintf(stderr, "server address: %s%s\n", o->host ? o->host : "127.0.0.1", r == 0 ? " is not valid" : ": no ipv4 support"); + fprintf(stderr, "server address: %s is not valid\n", o->host ? o->host : "127.0.0.1"); return 1; } - addr.sin_family = AF_INET; - addr.sin_port = htons(o->port ? o->port : 11203); - bsd->connect_fd = socket(AF_INET, SOCK_STREAM, 0); + bsd->connect_fd = socket(addr.sa_family, SOCK_STREAM, 0); if (bsd->connect_fd < 0) { perror("socket"); diff --git a/src/http_client.cpp b/src/http_client.cpp index 7b23122f8..82191f161 100644 --- a/src/http_client.cpp +++ b/src/http_client.cpp @@ -15,13 +15,13 @@ #include +#include "addr_util.h" #include "json11/json11.hpp" #include "http_client.h" #include "timerfd_manager.h" #define READ_BUFFER_SIZE 9000 -static int extract_port(std::string & host); static std::string trim(const std::string & in); static std::string ws_format_frame(int type, uint64_t size); static bool ws_parse_frame(std::string & buf, int & type, std::string & res); @@ -185,19 +185,15 @@ http_co_t::~http_co_t() void http_co_t::start_connection() { stackin(); - int port = extract_port(host); - struct sockaddr_in addr; - int r; - if ((r = inet_pton(AF_INET, host.c_str(), &addr.sin_addr)) != 1) + struct sockaddr addr; + if (!string_to_addr(host.c_str(), 1, 80, &addr)) { parsed.error_code = ENXIO; stackout(); end(); return; } - addr.sin_family = AF_INET; - addr.sin_port = htons(port ? port : 80); - peer_fd = socket(AF_INET, SOCK_STREAM, 0); + peer_fd = socket(addr.sa_family, SOCK_STREAM, 0); if (peer_fd < 0) { parsed.error_code = errno; @@ -219,7 +215,7 @@ void http_co_t::start_connection() } epoll_events = 0; // Finally call connect - r = ::connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); + int r = ::connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); if (r < 0 && errno != EINPROGRESS) { parsed.error_code = errno; @@ -759,22 +755,6 @@ std::vector getifaddr_list(json11::Json mask_cfg, bool include_v6) return addresses; } -static int extract_port(std::string & host) -{ - int port = 0; - int pos = 0; - if ((pos = host.find(':')) >= 0) - { - port = strtoull(host.c_str() + pos + 1, NULL, 10); - if (port >= 0x10000) - { - port = 0; - } - host = host.substr(0, pos); - } - return port; -} - std::string strtolower(const std::string & in) { std::string s = in; diff --git a/src/http_client.h b/src/http_client.h index ece1db695..d29965531 100644 --- a/src/http_client.h +++ b/src/http_client.h @@ -45,7 +45,7 @@ struct websocket_t void parse_http_headers(std::string & res, http_response_t *parsed); -std::vector getifaddr_list(json11::Json mask_cfg = json11::Json(), bool include_v6 = false); +std::vector getifaddr_list(json11::Json mask_cfg = json11::Json(), bool include_v6 = true); uint64_t stoull_full(const std::string & str, int base = 10); diff --git a/src/messenger.cpp b/src/messenger.cpp index 28ea48c63..a400b23c1 100644 --- a/src/messenger.cpp +++ b/src/messenger.cpp @@ -8,6 +8,7 @@ #include #include +#include "addr_util.h" #include "messenger.h" void osd_messenger_t::init() @@ -220,23 +221,20 @@ void osd_messenger_t::try_connect_peer(uint64_t peer_osd) void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port) { assert(peer_osd != this->osd_num); - struct sockaddr_in addr; - int r; - if ((r = inet_pton(AF_INET, peer_host, &addr.sin_addr)) != 1) + struct sockaddr addr; + if (!string_to_addr(peer_host, 0, peer_port, &addr)) { on_connect_peer(peer_osd, -EINVAL); return; } - addr.sin_family = AF_INET; - addr.sin_port = htons(peer_port ? peer_port : 11203); - int peer_fd = socket(AF_INET, SOCK_STREAM, 0); + int peer_fd = socket(addr.sa_family, SOCK_STREAM, 0); if (peer_fd < 0) { on_connect_peer(peer_osd, -errno); return; } fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); - r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); + int r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); if (r < 0 && errno != EINPROGRESS) { close(peer_fd); @@ -485,21 +483,20 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) void osd_messenger_t::accept_connections(int listen_fd) { // Accept new connections - sockaddr_in addr; + sockaddr addr; socklen_t peer_addr_size = sizeof(addr); int peer_fd; - while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0) + while ((peer_fd = accept(listen_fd, &addr, &peer_addr_size)) >= 0) { assert(peer_fd != 0); - char peer_str[256]; - fprintf(stderr, "[OSD %lu] new client %d: connection from %s port %d\n", this->osd_num, peer_fd, - inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port)); + fprintf(stderr, "[OSD %lu] new client %d: connection from %s\n", this->osd_num, peer_fd, + addr_to_string(addr).c_str()); fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); int one = 1; setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); clients[peer_fd] = new osd_client_t(); clients[peer_fd]->peer_addr = addr; - clients[peer_fd]->peer_port = ntohs(addr.sin_port); + clients[peer_fd]->peer_port = ntohs(((sockaddr_in*)&addr)->sin_port); clients[peer_fd]->peer_fd = peer_fd; clients[peer_fd]->peer_state = PEER_CONNECTED; clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size); diff --git a/src/messenger.h b/src/messenger.h index fcee30c9e..6b59325b2 100644 --- a/src/messenger.h +++ b/src/messenger.h @@ -49,7 +49,7 @@ struct osd_client_t { int refs = 0; - sockaddr_in peer_addr; + sockaddr peer_addr; int peer_port; int peer_fd; int peer_state; diff --git a/src/osd.cpp b/src/osd.cpp index 200541067..95e8b5849 100644 --- a/src/osd.cpp +++ b/src/osd.cpp @@ -7,6 +7,7 @@ #include #include +#include "addr_util.h" #include "blockstore_impl.h" #include "osd_primary.h" #include "osd.h" @@ -156,14 +157,6 @@ void osd_t::parse_config(const json11::Json & config) void osd_t::bind_socket() { - listen_fd = socket(AF_INET, SOCK_STREAM, 0); - if (listen_fd < 0) - { - throw std::runtime_error(std::string("socket: ") + strerror(errno)); - } - int enable = 1; - setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)); - if (config["osd_network"].is_string() || config["osd_network"].is_array()) { @@ -173,7 +166,7 @@ void osd_t::bind_socket() else for (auto v: config["osd_network"].array_items()) mask.push_back(v.string_value()); - auto matched_addrs = getifaddr_list(mask, false); + auto matched_addrs = getifaddr_list(mask); if (matched_addrs.size() > 1) { fprintf(stderr, "More than 1 address matches requested network(s): %s\n", json11::Json(matched_addrs).dump().c_str()); @@ -192,17 +185,21 @@ void osd_t::bind_socket() // FIXME Support multiple listening sockets - sockaddr_in addr; - int r; - if ((r = inet_pton(AF_INET, bind_address.c_str(), &addr.sin_addr)) != 1) + sockaddr addr; + if (!string_to_addr(bind_address, 0, bind_port, &addr)) { - close(listen_fd); - throw std::runtime_error("bind address "+bind_address+(r == 0 ? " is not valid" : ": no ipv4 support")); + throw std::runtime_error("bind address "+bind_address+" is not valid"); } - addr.sin_family = AF_INET; - addr.sin_port = htons(bind_port); - if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0) + listen_fd = socket(addr.sa_family, SOCK_STREAM, 0); + if (listen_fd < 0) + { + throw std::runtime_error(std::string("socket: ") + strerror(errno)); + } + int enable = 1; + setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)); + + if (bind(listen_fd, &addr, sizeof(addr)) < 0) { close(listen_fd); throw std::runtime_error(std::string("bind: ") + strerror(errno)); @@ -215,7 +212,7 @@ void osd_t::bind_socket() close(listen_fd); throw std::runtime_error(std::string("getsockname: ") + strerror(errno)); } - listening_port = ntohs(addr.sin_port); + listening_port = ntohs(((sockaddr_in*)&addr)->sin_port); } else { diff --git a/src/osd_test.cpp b/src/osd_test.cpp index 2581a34af..7d641389b 100644 --- a/src/osd_test.cpp +++ b/src/osd_test.cpp @@ -16,6 +16,7 @@ #include +#include "addr_util.h" #include "osd_ops.h" #include "rw_blocking.h" #include "test_pattern.h" @@ -133,17 +134,14 @@ int main(int narg, char *args[]) int connect_osd(const char *osd_address, int osd_port) { - struct sockaddr_in addr; - int r; - if ((r = inet_pton(AF_INET, osd_address, &addr.sin_addr)) != 1) + struct sockaddr addr; + if (!string_to_addr(osd_address, 0, osd_port, &addr)) { - fprintf(stderr, "server address: %s%s\n", osd_address, r == 0 ? " is not valid" : ": no ipv4 support"); + fprintf(stderr, "server address: %s is not valid\n", osd_address); return -1; } - addr.sin_family = AF_INET; - addr.sin_port = htons(osd_port); - int connect_fd = socket(AF_INET, SOCK_STREAM, 0); + int connect_fd = socket(addr.sa_family, SOCK_STREAM, 0); if (connect_fd < 0) { perror("socket"); diff --git a/src/stub_bench.cpp b/src/stub_bench.cpp index d995641a7..6ca922b5d 100644 --- a/src/stub_bench.cpp +++ b/src/stub_bench.cpp @@ -21,6 +21,7 @@ #include +#include "addr_util.h" #include "rw_blocking.h" #include "osd_ops.h" @@ -66,16 +67,14 @@ int main(int narg, char *args[]) int connect_stub(const char *server_address, int server_port) { - struct sockaddr_in addr; - int r; - if ((r = inet_pton(AF_INET, server_address, &addr.sin_addr)) != 1) + struct sockaddr addr; + if (!string_to_addr(server_address, 0, server_port, &addr)) { - fprintf(stderr, "server address: %s%s\n", server_address, r == 0 ? " is not valid" : ": no ipv4 support"); + fprintf(stderr, "server address: %s is not valid\n", server_address); return -1; } - addr.sin_family = AF_INET; - addr.sin_port = htons(server_port); - int connect_fd = socket(AF_INET, SOCK_STREAM, 0); + + int connect_fd = socket(addr.sa_family, SOCK_STREAM, 0); if (connect_fd < 0) { perror("socket"); diff --git a/src/stub_osd.cpp b/src/stub_osd.cpp index 2b1a6804e..485f4bbe8 100644 --- a/src/stub_osd.cpp +++ b/src/stub_osd.cpp @@ -37,10 +37,11 @@ #include +#include "addr_util.h" #include "rw_blocking.h" #include "osd_ops.h" -int bind_stub(const char *bind_address, int bind_port); +int bind_stub(std::string bind_address, int bind_port); void run_stub(int peer_fd); @@ -48,13 +49,13 @@ int main(int narg, char *args[]) { int listen_fd = bind_stub("0.0.0.0", 11203); // Accept new connections - sockaddr_in addr; + sockaddr addr; socklen_t peer_addr_size = sizeof(addr); int peer_fd; while (1) { printf("stub_osd: waiting for 1 client\n"); - peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size); + peer_fd = accept(listen_fd, &addr, &peer_addr_size); if (peer_fd == -1) { if (errno == EAGAIN) @@ -62,9 +63,8 @@ int main(int narg, char *args[]) else throw std::runtime_error(std::string("accept: ") + strerror(errno)); } - char peer_str[256]; - printf("stub_osd: new client %d: connection from %s port %d\n", peer_fd, - inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port)); + printf("stub_osd: new client %d: connection from %s\n", peer_fd, + addr_to_string(addr).c_str()); int one = 1; setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); run_stub(peer_fd); @@ -76,11 +76,17 @@ int main(int narg, char *args[]) return 0; } -int bind_stub(const char *bind_address, int bind_port) +int bind_stub(std::string bind_address, int bind_port) { int listen_backlog = 128; - int listen_fd = socket(AF_INET, SOCK_STREAM, 0); + sockaddr addr; + if (!string_to_addr(bind_address, 0, bind_port, &addr)) + { + throw std::runtime_error("bind address "+bind_address+" is not valid"); + } + + int listen_fd = socket(addr.sa_family, SOCK_STREAM, 0); if (listen_fd < 0) { throw std::runtime_error(std::string("socket: ") + strerror(errno)); @@ -88,17 +94,7 @@ int bind_stub(const char *bind_address, int bind_port) int enable = 1; setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)); - sockaddr_in addr; - int r; - if ((r = inet_pton(AF_INET, bind_address, &addr.sin_addr)) != 1) - { - close(listen_fd); - throw std::runtime_error("bind address "+std::string(bind_address)+(r == 0 ? " is not valid" : ": no ipv4 support")); - } - addr.sin_family = AF_INET; - addr.sin_port = htons(bind_port); - - if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0) + if (bind(listen_fd, &addr, sizeof(addr)) < 0) { close(listen_fd); throw std::runtime_error(std::string("bind: ") + strerror(errno)); diff --git a/src/stub_uring_osd.cpp b/src/stub_uring_osd.cpp index 355d657d3..84d94f221 100644 --- a/src/stub_uring_osd.cpp +++ b/src/stub_uring_osd.cpp @@ -20,11 +20,12 @@ #include +#include "addr_util.h" #include "ringloop.h" #include "epoll_manager.h" #include "messenger.h" -int bind_stub(const char *bind_address, int bind_port); +int bind_stub(std::string bind_address, int bind_port); void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op); @@ -66,11 +67,17 @@ int main(int narg, char *args[]) return 0; } -int bind_stub(const char *bind_address, int bind_port) +int bind_stub(std::string bind_address, int bind_port) { int listen_backlog = 128; - int listen_fd = socket(AF_INET, SOCK_STREAM, 0); + sockaddr addr; + if (!string_to_addr(bind_address, 0, bind_port, &addr)) + { + throw std::runtime_error("bind address "+bind_address+" is not valid"); + } + + int listen_fd = socket(addr.sa_family, SOCK_STREAM, 0); if (listen_fd < 0) { throw std::runtime_error(std::string("socket: ") + strerror(errno)); @@ -78,17 +85,7 @@ int bind_stub(const char *bind_address, int bind_port) int enable = 1; setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)); - sockaddr_in addr; - int r; - if ((r = inet_pton(AF_INET, bind_address, &addr.sin_addr)) != 1) - { - close(listen_fd); - throw std::runtime_error("bind address "+std::string(bind_address)+(r == 0 ? " is not valid" : ": no ipv4 support")); - } - addr.sin_family = AF_INET; - addr.sin_port = htons(bind_port); - - if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0) + if (bind(listen_fd, &addr, sizeof(addr)) < 0) { close(listen_fd); throw std::runtime_error(std::string("bind: ") + strerror(errno));