|
|
@ -4,10 +4,12 @@ |
|
|
|
#include <unistd.h> |
|
|
|
#include <fcntl.h> |
|
|
|
#include <sys/socket.h> |
|
|
|
#include <sys/random.h> |
|
|
|
#include <sys/epoll.h> |
|
|
|
#include <netinet/tcp.h> |
|
|
|
#include <stdexcept> |
|
|
|
|
|
|
|
#include "base64.h" |
|
|
|
#include "addr_util.h" |
|
|
|
#include "messenger.h" |
|
|
|
|
|
|
@ -194,7 +196,7 @@ void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state) |
|
|
|
try_connect_peer(peer_osd); |
|
|
|
} |
|
|
|
|
|
|
|
void osd_messenger_t::try_connect_peer(uint64_t peer_osd) |
|
|
|
void osd_messenger_t::try_connect_peer(osd_num_t peer_osd) |
|
|
|
{ |
|
|
|
auto wp_it = wanted_peers.find(peer_osd); |
|
|
|
if (wp_it == wanted_peers.end() || wp_it->second.connecting || |
|
|
@ -215,40 +217,75 @@ void osd_messenger_t::try_connect_peer(uint64_t peer_osd) |
|
|
|
wp.cur_addr = wp.address_list[wp.address_index].string_value(); |
|
|
|
wp.cur_port = wp.port; |
|
|
|
wp.connecting = true; |
|
|
|
try_connect_peer_addr(peer_osd, wp.cur_addr.c_str(), wp.cur_port); |
|
|
|
try_connect_peer_addr(peer_osd, wp.cur_addr.c_str(), wp.cur_port, NULL, [this](osd_num_t peer_osd, int peer_fd) |
|
|
|
{ |
|
|
|
if (peer_fd >= 0) |
|
|
|
osd_peer_fds[peer_osd] = peer_fd; |
|
|
|
on_connect_peer(peer_osd, peer_fd); |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
static std::string urandom_str(int bytes) |
|
|
|
{ |
|
|
|
std::string str; |
|
|
|
str.resize(bytes); |
|
|
|
char *buf = (char*)str.data(); |
|
|
|
while (bytes > 0) |
|
|
|
{ |
|
|
|
int r = getrandom(buf, bytes, 0); |
|
|
|
if (r < 0) |
|
|
|
throw std::runtime_error(std::string("getrandom: ") + strerror(errno)); |
|
|
|
buf += r; |
|
|
|
bytes -= r; |
|
|
|
} |
|
|
|
return str; |
|
|
|
} |
|
|
|
|
|
|
|
void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port) |
|
|
|
void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port, |
|
|
|
osd_client_t *meta_cl, std::function<void(osd_num_t, int)> connect_callback) |
|
|
|
{ |
|
|
|
assert(peer_osd != this->osd_num); |
|
|
|
struct sockaddr addr; |
|
|
|
if (!string_to_addr(peer_host, 0, peer_port, &addr)) |
|
|
|
if (!meta_cl) |
|
|
|
{ |
|
|
|
on_connect_peer(peer_osd, -EINVAL); |
|
|
|
return; |
|
|
|
if (!string_to_addr(peer_host, 0, peer_port, &addr)) |
|
|
|
{ |
|
|
|
connect_callback(peer_osd, -EINVAL); |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
addr = meta_cl->peer_addr; |
|
|
|
} |
|
|
|
int peer_fd = socket(addr.sa_family, SOCK_STREAM, 0); |
|
|
|
if (peer_fd < 0) |
|
|
|
if (peer_fd >= 0) |
|
|
|
{ |
|
|
|
on_connect_peer(peer_osd, -errno); |
|
|
|
return; |
|
|
|
fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); |
|
|
|
int r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); |
|
|
|
if (r < 0 && errno != EINPROGRESS) |
|
|
|
{ |
|
|
|
close(peer_fd); |
|
|
|
peer_fd = -1; |
|
|
|
} |
|
|
|
} |
|
|
|
fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK); |
|
|
|
int r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr)); |
|
|
|
if (r < 0 && errno != EINPROGRESS) |
|
|
|
if (peer_fd < 0) |
|
|
|
{ |
|
|
|
close(peer_fd); |
|
|
|
on_connect_peer(peer_osd, -errno); |
|
|
|
connect_callback(peer_osd, -errno); |
|
|
|
return; |
|
|
|
} |
|
|
|
clients[peer_fd] = new osd_client_t(); |
|
|
|
clients[peer_fd]->peer_addr = addr; |
|
|
|
clients[peer_fd]->peer_port = peer_port; |
|
|
|
clients[peer_fd]->peer_port = ((struct sockaddr_in*)&addr)->sin_port; |
|
|
|
clients[peer_fd]->peer_fd = peer_fd; |
|
|
|
clients[peer_fd]->peer_state = PEER_CONNECTING; |
|
|
|
clients[peer_fd]->connect_timeout_id = -1; |
|
|
|
clients[peer_fd]->connect_callback = connect_callback; |
|
|
|
clients[peer_fd]->osd_num = peer_osd; |
|
|
|
clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size); |
|
|
|
clients[peer_fd]->data_for = meta_cl ? addr_to_string(meta_cl->peer_addr) : ""; |
|
|
|
clients[peer_fd]->data_connection_cookie = meta_cl |
|
|
|
? meta_cl->data_connection_cookie : base64_encode(urandom_str(12)); |
|
|
|
tfd->set_fd_handler(peer_fd, true, [this](int peer_fd, int epoll_events) |
|
|
|
{ |
|
|
|
// Either OUT (connected) or HUP
|
|
|
@ -258,10 +295,12 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer |
|
|
|
{ |
|
|
|
clients[peer_fd]->connect_timeout_id = tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd](int timer_id) |
|
|
|
{ |
|
|
|
osd_num_t peer_osd = clients.at(peer_fd)->osd_num; |
|
|
|
auto cl = clients.at(peer_fd); |
|
|
|
auto connect_callback = cl->connect_callback; |
|
|
|
cl->connect_callback = NULL; |
|
|
|
osd_num_t peer_osd = cl->osd_num; |
|
|
|
stop_client(peer_fd, true); |
|
|
|
on_connect_peer(peer_osd, -EPIPE); |
|
|
|
return; |
|
|
|
connect_callback(peer_osd, -EPIPE); |
|
|
|
}); |
|
|
|
} |
|
|
|
} |
|
|
@ -283,8 +322,10 @@ void osd_messenger_t::handle_connect_epoll(int peer_fd) |
|
|
|
} |
|
|
|
if (result != 0) |
|
|
|
{ |
|
|
|
auto connect_callback = cl->connect_callback; |
|
|
|
cl->connect_callback = NULL; |
|
|
|
stop_client(peer_fd, true); |
|
|
|
on_connect_peer(peer_osd, -result); |
|
|
|
connect_callback(peer_osd, -result); |
|
|
|
return; |
|
|
|
} |
|
|
|
int one = 1; |
|
|
@ -364,6 +405,11 @@ void osd_messenger_t::on_connect_peer(osd_num_t peer_osd, int peer_fd) |
|
|
|
|
|
|
|
void osd_messenger_t::check_peer_config(osd_client_t *cl) |
|
|
|
{ |
|
|
|
json11::Json::object payload; |
|
|
|
if (cl->data_connection_cookie != "") |
|
|
|
{ |
|
|
|
payload["data_cookie"] = cl->data_connection_cookie; |
|
|
|
} |
|
|
|
osd_op_t *op = new osd_op_t(); |
|
|
|
op->op_type = OSD_OP_OUT; |
|
|
|
op->peer_fd = cl->peer_fd; |
|
|
@ -376,24 +422,33 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) |
|
|
|
}, |
|
|
|
}, |
|
|
|
}; |
|
|
|
#ifdef WITH_RDMA |
|
|
|
if (rdma_context) |
|
|
|
if (cl->data_for == "") |
|
|
|
{ |
|
|
|
cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg); |
|
|
|
if (cl->rdma_conn) |
|
|
|
#ifdef WITH_RDMA |
|
|
|
if (rdma_context) |
|
|
|
{ |
|
|
|
json11::Json payload = json11::Json::object { |
|
|
|
{ "connect_rdma", cl->rdma_conn->addr.to_string() }, |
|
|
|
{ "rdma_max_msg", cl->rdma_conn->max_msg }, |
|
|
|
}; |
|
|
|
std::string payload_str = payload.dump(); |
|
|
|
op->req.show_conf.json_len = payload_str.size(); |
|
|
|
op->buf = malloc_or_die(payload_str.size()); |
|
|
|
op->iov.push_back(op->buf, payload_str.size()); |
|
|
|
memcpy(op->buf, payload_str.c_str(), payload_str.size()); |
|
|
|
cl->rdma_conn = msgr_rdma_connection_t::create(rdma_context, rdma_max_send, rdma_max_recv, rdma_max_sge, rdma_max_msg); |
|
|
|
if (cl->rdma_conn) |
|
|
|
{ |
|
|
|
payload["connect_rdma"] = cl->rdma_conn->addr.to_string(); |
|
|
|
payload["rdma_max_msg"] = cl->rdma_conn->max_msg; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
#endif |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
// Mark it as a data connection
|
|
|
|
payload["data_for"] = cl->data_for; |
|
|
|
} |
|
|
|
if (payload.size()) |
|
|
|
{ |
|
|
|
std::string payload_str = json11::Json(payload).dump(); |
|
|
|
op->req.show_conf.json_len = payload_str.size(); |
|
|
|
op->buf = malloc_or_die(payload_str.size()); |
|
|
|
op->iov.push_back(op->buf, payload_str.size()); |
|
|
|
memcpy(op->buf, payload_str.c_str(), payload_str.size()); |
|
|
|
} |
|
|
|
op->callback = [this, cl](osd_op_t *op) |
|
|
|
{ |
|
|
|
std::string json_err; |
|
|
@ -426,18 +481,30 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) |
|
|
|
cl->osd_num, config["protocol_version"].uint64_value(), OSD_PROTOCOL_VERSION |
|
|
|
); |
|
|
|
} |
|
|
|
else if (cl->data_for != "" && config["data_for"] != cl->data_for) |
|
|
|
{ |
|
|
|
err = true; |
|
|
|
fprintf( |
|
|
|
stderr, "OSD %lu does not support separate data connections." |
|
|
|
" Proceeding with a single connection\n", cl->osd_num |
|
|
|
); |
|
|
|
} |
|
|
|
} |
|
|
|
if (err) |
|
|
|
{ |
|
|
|
osd_num_t peer_osd = cl->osd_num; |
|
|
|
auto connect_callback = cl->connect_callback; |
|
|
|
cl->connect_callback = NULL; |
|
|
|
stop_client(op->peer_fd); |
|
|
|
on_connect_peer(peer_osd, -1); |
|
|
|
connect_callback(peer_osd, -EINVAL); |
|
|
|
delete op; |
|
|
|
return; |
|
|
|
} |
|
|
|
#ifdef WITH_RDMA |
|
|
|
if (config["rdma_address"].is_string()) |
|
|
|
if (rdma_context && cl->rdma_conn && config["rdma_address"].is_string()) |
|
|
|
{ |
|
|
|
// Prevent creating data connection - we are trying RDMA
|
|
|
|
cl->data_connection_cookie = ""; |
|
|
|
msgr_rdma_address_t addr; |
|
|
|
if (!msgr_rdma_address_t::from_string(config["rdma_address"].string_value().c_str(), &addr) || |
|
|
|
cl->rdma_conn->connect(&addr) != 0) |
|
|
@ -450,8 +517,10 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) |
|
|
|
cl->rdma_conn = NULL; |
|
|
|
// FIXME: Keep TCP connection in this case
|
|
|
|
osd_num_t peer_osd = cl->osd_num; |
|
|
|
auto connect_callback = cl->connect_callback; |
|
|
|
cl->connect_callback = NULL; |
|
|
|
stop_client(cl->peer_fd); |
|
|
|
on_connect_peer(peer_osd, -1); |
|
|
|
connect_callback(peer_osd, -EPIPE); |
|
|
|
delete op; |
|
|
|
return; |
|
|
|
} |
|
|
@ -473,8 +542,37 @@ void osd_messenger_t::check_peer_config(osd_client_t *cl) |
|
|
|
} |
|
|
|
} |
|
|
|
#endif |
|
|
|
osd_peer_fds[cl->osd_num] = cl->peer_fd; |
|
|
|
on_connect_peer(cl->osd_num, cl->peer_fd); |
|
|
|
if (cl->data_connection_cookie != "") |
|
|
|
{ |
|
|
|
// Try to open second connection to the same address
|
|
|
|
try_connect_peer_addr(cl->osd_num, NULL, 0, cl, [this, peer_fd = cl->peer_fd](osd_num_t data_peer, int data_peer_fd) |
|
|
|
{ |
|
|
|
auto cl_it = clients.find(peer_fd); |
|
|
|
if (cl_it != clients.end()) |
|
|
|
{ |
|
|
|
// Proceed with or without the data connection
|
|
|
|
auto cl = cl_it->second; |
|
|
|
if (data_peer_fd >= 0) |
|
|
|
{ |
|
|
|
cl->data_connection_fd = data_peer_fd; |
|
|
|
auto data_cl = clients.at(data_peer_fd); |
|
|
|
data_cl->meta_connection_fd = cl->peer_fd; |
|
|
|
} |
|
|
|
osd_peer_fds[cl->osd_num] = cl->peer_fd; |
|
|
|
on_connect_peer(cl->osd_num, cl->peer_fd); |
|
|
|
} |
|
|
|
else if (data_peer_fd >= 0) |
|
|
|
{ |
|
|
|
stop_client(data_peer_fd); |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
auto connect_callback = cl->connect_callback; |
|
|
|
cl->connect_callback = NULL; |
|
|
|
connect_callback(cl->osd_num, cl->peer_fd); |
|
|
|
} |
|
|
|
delete op; |
|
|
|
}; |
|
|
|
outbox_push(op); |
|
|
@ -500,6 +598,7 @@ void osd_messenger_t::accept_connections(int listen_fd) |
|
|
|
clients[peer_fd]->peer_fd = peer_fd; |
|
|
|
clients[peer_fd]->peer_state = PEER_CONNECTED; |
|
|
|
clients[peer_fd]->in_buf = malloc_or_die(receive_buffer_size); |
|
|
|
clients_by_addr[addr_to_string(addr)] = peer_fd; |
|
|
|
// Add FD to epoll
|
|
|
|
tfd->set_fd_handler(peer_fd, false, [this](int peer_fd, int epoll_events) |
|
|
|
{ |
|
|
|