Browse Source

Support IPv6 OSD addresses

sec_osd_msgr
Vitaliy Filippov 5 months ago
parent
commit
20a4406acc
  1. 11
      src/CMakeLists.txt
  2. 60
      src/addr_util.cpp
  3. 7
      src/addr_util.h
  4. 12
      src/fio_sec_osd.cpp
  5. 30
      src/http_client.cpp
  6. 2
      src/http_client.h
  7. 23
      src/messenger.cpp
  8. 2
      src/messenger.h
  9. 33
      src/osd.cpp
  10. 12
      src/osd_test.cpp
  11. 13
      src/stub_bench.cpp
  12. 34
      src/stub_osd.cpp
  13. 25
      src/stub_uring_osd.cpp

11
src/CMakeLists.txt

@ -88,8 +88,8 @@ if (IBVERBS_LIBRARIES)
set(MSGR_RDMA "msgr_rdma.cpp")
endif (IBVERBS_LIBRARIES)
add_library(vitastor_common STATIC
epoll_manager.cpp etcd_state_client.cpp
messenger.cpp msgr_stop.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp ../json11/json11.cpp
epoll_manager.cpp etcd_state_client.cpp messenger.cpp addr_util.cpp
msgr_stop.cpp msgr_op.cpp msgr_send.cpp msgr_receive.cpp ringloop.cpp ../json11/json11.cpp
http_client.cpp osd_ops.cpp pg_states.cpp timerfd_manager.cpp base64.cpp ${MSGR_RDMA}
)
target_compile_options(vitastor_common PUBLIC -fPIC)
@ -112,6 +112,7 @@ if (${WITH_FIO})
add_library(fio_vitastor_sec SHARED
fio_sec_osd.cpp
rw_blocking.cpp
addr_util.cpp
)
target_link_libraries(fio_vitastor_sec
tcmalloc_minimal
@ -189,11 +190,11 @@ endif (${WITH_QEMU})
### Test stubs
# stub_osd, stub_bench, osd_test
add_executable(stub_osd stub_osd.cpp rw_blocking.cpp)
add_executable(stub_osd stub_osd.cpp rw_blocking.cpp addr_util.cpp)
target_link_libraries(stub_osd tcmalloc_minimal)
add_executable(stub_bench stub_bench.cpp rw_blocking.cpp)
add_executable(stub_bench stub_bench.cpp rw_blocking.cpp addr_util.cpp)
target_link_libraries(stub_bench tcmalloc_minimal)
add_executable(osd_test osd_test.cpp rw_blocking.cpp)
add_executable(osd_test osd_test.cpp rw_blocking.cpp addr_util.cpp)
target_link_libraries(osd_test tcmalloc_minimal)
# osd_rmw_test

60
src/addr_util.cpp

@ -0,0 +1,60 @@
#include <arpa/inet.h>
#include <string.h>
#include <stdio.h>
#include <stdexcept>
#include "addr_util.h"
bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr *addr)
{
if (parse_port)
{
int p = str.rfind(':');
if (p != std::string::npos && !(str.length() > 0 && str[p-1] == ']')) // "[ipv6]" which contains ':'
{
char null_byte = 0;
int n = sscanf(str.c_str()+p+1, "%d%c", &default_port, &null_byte);
if (n != 1 || default_port >= 0x10000)
return false;
str = str.substr(0, p);
}
}
if (inet_pton(AF_INET, str.c_str(), &((struct sockaddr_in*)addr)->sin_addr) == 1)
{
addr->sa_family = AF_INET;
((struct sockaddr_in*)addr)->sin_port = htons(default_port);
return true;
}
if (str.length() >= 2 && str[0] == '[' && str[str.length()-1] == ']')
str = str.substr(1, str.length()-2);
if (inet_pton(AF_INET6, str.c_str(), &((struct sockaddr_in6*)addr)->sin6_addr) == 1)
{
addr->sa_family = AF_INET6;
((struct sockaddr_in6*)addr)->sin6_port = htons(default_port);
return true;
}
return false;
}
std::string addr_to_string(const sockaddr &addr)
{
char peer_str[256];
bool ok = false;
int port;
if (addr.sa_family == AF_INET)
{
ok = !!inet_ntop(AF_INET, &((sockaddr_in*)&addr)->sin_addr, peer_str, 256);
port = ntohs(((sockaddr_in*)&addr)->sin_port);
}
else if (addr.sa_family == AF_INET6)
{
ok = !!inet_ntop(AF_INET6, &((sockaddr_in6*)&addr)->sin6_addr, peer_str, 256);
port = ntohs(((sockaddr_in6*)&addr)->sin6_port);
}
else
throw std::runtime_error("Unknown address family "+std::to_string(addr.sa_family));
if (!ok)
throw std::runtime_error(std::string("inet_ntop: ") + strerror(errno));
return std::string(peer_str)+":"+std::to_string(port);
}

7
src/addr_util.h

@ -0,0 +1,7 @@
#pragma once
#include <sys/socket.h>
#include <string>
bool string_to_addr(std::string str, bool parse_port, int default_port, struct sockaddr *addr);
std::string addr_to_string(const sockaddr &addr);

12
src/fio_sec_osd.cpp

@ -28,6 +28,7 @@
#include <vector>
#include <unordered_map>
#include "addr_util.h"
#include "rw_blocking.h"
#include "osd_ops.h"
#include "fio_headers.h"
@ -152,17 +153,14 @@ static int sec_init(struct thread_data *td)
bsd->block_order = o->block_order == 0 ? 17 : o->block_order;
bsd->block_size = 1 << o->block_order;
struct sockaddr_in addr;
int r;
if ((r = inet_pton(AF_INET, o->host ? o->host : "127.0.0.1", &addr.sin_addr)) != 1)
sockaddr addr;
if (!string_to_addr(std::string(o->host ? o->host : "127.0.0.1"), false, o->port > 0 ? o->port : 11203, &addr))
{
fprintf(stderr, "server address: %s%s\n", o->host ? o->host : "127.0.0.1", r == 0 ? " is not valid" : ": no ipv4 support");
fprintf(stderr, "server address: %s is not valid\n", o->host ? o->host : "127.0.0.1");
return 1;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(o->port ? o->port : 11203);
bsd->connect_fd = socket(AF_INET, SOCK_STREAM, 0);
bsd->connect_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (bsd->connect_fd < 0)
{
perror("socket");

30
src/http_client.cpp

@ -15,13 +15,13 @@
#include <stdexcept>
#include "addr_util.h"
#include "json11/json11.hpp"
#include "http_client.h"
#include "timerfd_manager.h"
#define READ_BUFFER_SIZE 9000
static int extract_port(std::string & host);
static std::string trim(const std::string & in);
static std::string ws_format_frame(int type, uint64_t size);
static bool ws_parse_frame(std::string & buf, int & type, std::string & res);
@ -185,19 +185,15 @@ http_co_t::~http_co_t()
void http_co_t::start_connection()
{
stackin();
int port = extract_port(host);
struct sockaddr_in addr;
int r;
if ((r = inet_pton(AF_INET, host.c_str(), &addr.sin_addr)) != 1)
struct sockaddr addr;
if (!string_to_addr(host.c_str(), 1, 80, &addr))
{
parsed.error_code = ENXIO;
stackout();
end();
return;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(port ? port : 80);
peer_fd = socket(AF_INET, SOCK_STREAM, 0);
peer_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (peer_fd < 0)
{
parsed.error_code = errno;
@ -219,7 +215,7 @@ void http_co_t::start_connection()
}
epoll_events = 0;
// Finally call connect
r = ::connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
int r = ::connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
if (r < 0 && errno != EINPROGRESS)
{
parsed.error_code = errno;
@ -759,22 +755,6 @@ std::vector<std::string> getifaddr_list(json11::Json mask_cfg, bool include_v6)
return addresses;
}
static int extract_port(std::string & host)
{
int port = 0;
int pos = 0;
if ((pos = host.find(':')) >= 0)
{
port = strtoull(host.c_str() + pos + 1, NULL, 10);
if (port >= 0x10000)
{
port = 0;
}
host = host.substr(0, pos);
}
return port;
}
std::string strtolower(const std::string & in)
{
std::string s = in;

2
src/http_client.h

@ -45,7 +45,7 @@ struct websocket_t
void parse_http_headers(std::string & res, http_response_t *parsed);
std::vector<std::string> getifaddr_list(json11::Json mask_cfg = json11::Json(), bool include_v6 = false);
std::vector<std::string> getifaddr_list(json11::Json mask_cfg = json11::Json(), bool include_v6 = true);
uint64_t stoull_full(const std::string & str, int base = 10);

23
src/messenger.cpp

@ -8,6 +8,7 @@
#include <netinet/tcp.h>
#include <stdexcept>
#include "addr_util.h"
#include "messenger.h"
void osd_messenger_t::init()
@ -220,23 +221,20 @@ void osd_messenger_t::try_connect_peer(uint64_t peer_osd)
void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port)
{
assert(peer_osd != this->osd_num);
struct sockaddr_in addr;
int r;
if ((r = inet_pton(AF_INET, peer_host, &addr.sin_addr)) != 1)
struct sockaddr addr;
if (!string_to_addr(peer_host, 0, peer_port, &addr))
{
on_connect_peer(peer_osd, -EINVAL);
return;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(peer_port ? peer_port : 11203);
int peer_fd = socket(AF_INET, SOCK_STREAM, 0);
int peer_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (peer_fd < 0)
{
on_connect_peer(peer_osd, -errno);
return;
}
fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
int r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
if (r < 0 && errno != EINPROGRESS)
{
close(peer_fd);
@ -485,21 +483,20 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl)
void osd_messenger_t::accept_connections(int listen_fd)
{
// Accept new connections
sockaddr_in addr;
sockaddr addr;
socklen_t peer_addr_size = sizeof(addr);
int peer_fd;
while ((peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size)) >= 0)
while ((peer_fd = accept(listen_fd, &addr, &peer_addr_size)) >= 0)
{
assert(peer_fd != 0);
char peer_str[256];
fprintf(stderr, "[OSD %lu] new client %d: connection from %s port %d\n", this->osd_num, peer_fd,
inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port));
fprintf(stderr, "[OSD %lu] new client %d: connection from %s\n", this->osd_num, peer_fd,
addr_to_string(addr).c_str());
fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
clients[peer_fd] = new osd_client_t();
clients[peer_fd]->peer_addr = addr;
clients[peer_fd]->peer_port = ntohs(addr.sin_port);
clients[peer_fd]->peer_port = ntohs(((sockaddr_in*)&addr)->sin_port);
clients[peer_fd]->peer_fd = peer_fd;
clients[peer_fd]->peer_state = PEER_CONNECTED;
clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size);

2
src/messenger.h

@ -49,7 +49,7 @@ struct osd_client_t
{
int refs = 0;
sockaddr_in peer_addr;
sockaddr peer_addr;
int peer_port;
int peer_fd;
int peer_state;

33
src/osd.cpp

@ -7,6 +7,7 @@
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include "addr_util.h"
#include "blockstore_impl.h"
#include "osd_primary.h"
#include "osd.h"
@ -156,14 +157,6 @@ void osd_t::parse_config(const json11::Json & config)
void osd_t::bind_socket()
{
listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
}
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
if (config["osd_network"].is_string() ||
config["osd_network"].is_array())
{
@ -173,7 +166,7 @@ void osd_t::bind_socket()
else
for (auto v: config["osd_network"].array_items())
mask.push_back(v.string_value());
auto matched_addrs = getifaddr_list(mask, false);
auto matched_addrs = getifaddr_list(mask);
if (matched_addrs.size() > 1)
{
fprintf(stderr, "More than 1 address matches requested network(s): %s\n", json11::Json(matched_addrs).dump().c_str());
@ -192,17 +185,21 @@ void osd_t::bind_socket()
// FIXME Support multiple listening sockets
sockaddr_in addr;
int r;
if ((r = inet_pton(AF_INET, bind_address.c_str(), &addr.sin_addr)) != 1)
sockaddr addr;
if (!string_to_addr(bind_address, 0, bind_port, &addr))
{
close(listen_fd);
throw std::runtime_error("bind address "+bind_address+(r == 0 ? " is not valid" : ": no ipv4 support"));
throw std::runtime_error("bind address "+bind_address+" is not valid");
}
addr.sin_family = AF_INET;
addr.sin_port = htons(bind_port);
if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
listen_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
}
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
if (bind(listen_fd, &addr, sizeof(addr)) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("bind: ") + strerror(errno));
@ -215,7 +212,7 @@ void osd_t::bind_socket()
close(listen_fd);
throw std::runtime_error(std::string("getsockname: ") + strerror(errno));
}
listening_port = ntohs(addr.sin_port);
listening_port = ntohs(((sockaddr_in*)&addr)->sin_port);
}
else
{

12
src/osd_test.cpp

@ -16,6 +16,7 @@
#include <stdexcept>
#include "addr_util.h"
#include "osd_ops.h"
#include "rw_blocking.h"
#include "test_pattern.h"
@ -133,17 +134,14 @@ int main(int narg, char *args[])
int connect_osd(const char *osd_address, int osd_port)
{
struct sockaddr_in addr;
int r;
if ((r = inet_pton(AF_INET, osd_address, &addr.sin_addr)) != 1)
struct sockaddr addr;
if (!string_to_addr(osd_address, 0, osd_port, &addr))
{
fprintf(stderr, "server address: %s%s\n", osd_address, r == 0 ? " is not valid" : ": no ipv4 support");
fprintf(stderr, "server address: %s is not valid\n", osd_address);
return -1;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(osd_port);
int connect_fd = socket(AF_INET, SOCK_STREAM, 0);
int connect_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (connect_fd < 0)
{
perror("socket");

13
src/stub_bench.cpp

@ -21,6 +21,7 @@
#include <stdexcept>
#include "addr_util.h"
#include "rw_blocking.h"
#include "osd_ops.h"
@ -66,16 +67,14 @@ int main(int narg, char *args[])
int connect_stub(const char *server_address, int server_port)
{
struct sockaddr_in addr;
int r;
if ((r = inet_pton(AF_INET, server_address, &addr.sin_addr)) != 1)
struct sockaddr addr;
if (!string_to_addr(server_address, 0, server_port, &addr))
{
fprintf(stderr, "server address: %s%s\n", server_address, r == 0 ? " is not valid" : ": no ipv4 support");
fprintf(stderr, "server address: %s is not valid\n", server_address);
return -1;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(server_port);
int connect_fd = socket(AF_INET, SOCK_STREAM, 0);
int connect_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (connect_fd < 0)
{
perror("socket");

34
src/stub_osd.cpp

@ -37,10 +37,11 @@
#include <stdexcept>
#include "addr_util.h"
#include "rw_blocking.h"
#include "osd_ops.h"
int bind_stub(const char *bind_address, int bind_port);
int bind_stub(std::string bind_address, int bind_port);
void run_stub(int peer_fd);
@ -48,13 +49,13 @@ int main(int narg, char *args[])
{
int listen_fd = bind_stub("0.0.0.0", 11203);
// Accept new connections
sockaddr_in addr;
sockaddr addr;
socklen_t peer_addr_size = sizeof(addr);
int peer_fd;
while (1)
{
printf("stub_osd: waiting for 1 client\n");
peer_fd = accept(listen_fd, (sockaddr*)&addr, &peer_addr_size);
peer_fd = accept(listen_fd, &addr, &peer_addr_size);
if (peer_fd == -1)
{
if (errno == EAGAIN)
@ -62,9 +63,8 @@ int main(int narg, char *args[])
else
throw std::runtime_error(std::string("accept: ") + strerror(errno));
}
char peer_str[256];
printf("stub_osd: new client %d: connection from %s port %d\n", peer_fd,
inet_ntop(AF_INET, &addr.sin_addr, peer_str, 256), ntohs(addr.sin_port));
printf("stub_osd: new client %d: connection from %s\n", peer_fd,
addr_to_string(addr).c_str());
int one = 1;
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one));
run_stub(peer_fd);
@ -76,11 +76,17 @@ int main(int narg, char *args[])
return 0;
}
int bind_stub(const char *bind_address, int bind_port)
int bind_stub(std::string bind_address, int bind_port)
{
int listen_backlog = 128;
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
sockaddr addr;
if (!string_to_addr(bind_address, 0, bind_port, &addr))
{
throw std::runtime_error("bind address "+bind_address+" is not valid");
}
int listen_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
@ -88,17 +94,7 @@ int bind_stub(const char *bind_address, int bind_port)
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
sockaddr_in addr;
int r;
if ((r = inet_pton(AF_INET, bind_address, &addr.sin_addr)) != 1)
{
close(listen_fd);
throw std::runtime_error("bind address "+std::string(bind_address)+(r == 0 ? " is not valid" : ": no ipv4 support"));
}
addr.sin_family = AF_INET;
addr.sin_port = htons(bind_port);
if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
if (bind(listen_fd, &addr, sizeof(addr)) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("bind: ") + strerror(errno));

25
src/stub_uring_osd.cpp

@ -20,11 +20,12 @@
#include <stdexcept>
#include "addr_util.h"
#include "ringloop.h"
#include "epoll_manager.h"
#include "messenger.h"
int bind_stub(const char *bind_address, int bind_port);
int bind_stub(std::string bind_address, int bind_port);
void stub_exec_op(osd_messenger_t *msgr, osd_op_t *op);
@ -66,11 +67,17 @@ int main(int narg, char *args[])
return 0;
}
int bind_stub(const char *bind_address, int bind_port)
int bind_stub(std::string bind_address, int bind_port)
{
int listen_backlog = 128;
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
sockaddr addr;
if (!string_to_addr(bind_address, 0, bind_port, &addr))
{
throw std::runtime_error("bind address "+bind_address+" is not valid");
}
int listen_fd = socket(addr.sa_family, SOCK_STREAM, 0);
if (listen_fd < 0)
{
throw std::runtime_error(std::string("socket: ") + strerror(errno));
@ -78,17 +85,7 @@ int bind_stub(const char *bind_address, int bind_port)
int enable = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
sockaddr_in addr;
int r;
if ((r = inet_pton(AF_INET, bind_address, &addr.sin_addr)) != 1)
{
close(listen_fd);
throw std::runtime_error("bind address "+std::string(bind_address)+(r == 0 ? " is not valid" : ": no ipv4 support"));
}
addr.sin_family = AF_INET;
addr.sin_port = htons(bind_port);
if (bind(listen_fd, (sockaddr*)&addr, sizeof(addr)) < 0)
if (bind(listen_fd, &addr, sizeof(addr)) < 0)
{
close(listen_fd);
throw std::runtime_error(std::string("bind: ") + strerror(errno));

Loading…
Cancel
Save