Extract peer connect and read-write loop into a separate file (to be shared with the client library)
parent
fa98be6bc0
commit
0aca6e9ca8
@ -0,0 +1,358 @@ |
||||
#include <unistd.h> |
||||
#include <fcntl.h> |
||||
#include <sys/socket.h> |
||||
#include <sys/epoll.h> |
||||
#include <netinet/tcp.h> |
||||
|
||||
#include "cluster_client.h" |
||||
|
||||
// Releases every heap buffer owned by the operation.
// The blockstore sub-operation must already be detached (bs_op == NULL).
osd_op_t::~osd_op_t()
{
    assert(!bs_op);
    // free() is a no-op for NULL, so the pointers need no individual checks
    free(op_data);
    free(rmw_buf);
    // Note: reusing osd_op_t WILL currently lead to memory leaks,
    // so it is never reused - its buffer is freed on every destruction
    free(buf);
}
||||
|
||||
void cluster_client_t::connect_peer(uint64_t peer_osd, json11::Json address_list, int port) |
||||
{ |
||||
if (wanted_peers.find(peer_osd) == wanted_peers.end()) |
||||
{ |
||||
wanted_peers[peer_osd] = (osd_wanted_peer_t){ |
||||
.address_list = address_list, |
||||
.port = port, |
||||
}; |
||||
} |
||||
else |
||||
{ |
||||
wanted_peers[peer_osd].address_list = address_list; |
||||
wanted_peers[peer_osd].port = port; |
||||
} |
||||
wanted_peers[peer_osd].address_changed = true; |
||||
if (!wanted_peers[peer_osd].connecting && |
||||
(time(NULL) - wanted_peers[peer_osd].last_connect_attempt) >= peer_connect_interval) |
||||
{ |
||||
try_connect_peer(peer_osd); |
||||
} |
||||
} |
||||
|
||||
void cluster_client_t::try_connect_peer(uint64_t peer_osd) |
||||
{ |
||||
auto wp_it = wanted_peers.find(peer_osd); |
||||
if (wp_it == wanted_peers.end()) |
||||
{ |
||||
return; |
||||
} |
||||
if (osd_peer_fds.find(peer_osd) != osd_peer_fds.end()) |
||||
{ |
||||
wanted_peers.erase(peer_osd); |
||||
return; |
||||
} |
||||
auto & wp = wp_it->second; |
||||
if (wp.address_index >= wp.address_list.array_items().size()) |
||||
{ |
||||
return; |
||||
} |
||||
wp.cur_addr = wp.address_list[wp.address_index].string_value(); |
||||
wp.cur_port = wp.port; |
||||
try_connect_peer_addr(peer_osd, wp.cur_addr.c_str(), wp.cur_port); |
||||
} |
||||
|
||||
// Begins a non-blocking TCP connection to <peer_host>:<peer_port> for OSD <peer_osd>.
//
// peer_host must be a dotted IPv4 address (it is parsed with inet_pton(AF_INET)).
// peer_port == 0 selects the default port 11203.
//
// On immediate failure on_connect_peer() is called with a negated errno value.
// Otherwise a client entry in PEER_CONNECTING state is registered and
// handle_connect_epoll() is invoked when the socket reports an event.
void cluster_client_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port)
{
    // Zero the whole structure so that padding (sin_zero) is well-defined
    struct sockaddr_in addr = {};
    if (inet_pton(AF_INET, peer_host, &addr.sin_addr) != 1)
    {
        on_connect_peer(peer_osd, -EINVAL);
        return;
    }
    addr.sin_family = AF_INET;
    addr.sin_port = htons(peer_port ? peer_port : 11203);
    int peer_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (peer_fd < 0)
    {
        on_connect_peer(peer_osd, -errno);
        return;
    }
    fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
    int r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
    if (r < 0 && errno != EINPROGRESS)
    {
        // Save errno: close() is allowed to overwrite it
        int connect_errno = errno;
        close(peer_fd);
        on_connect_peer(peer_osd, -connect_errno);
        return;
    }
    assert(peer_osd != this->osd_num);
    // Arm the timeout only after connect() was accepted. Arming it earlier left
    // a live timer behind when connect() failed immediately, and that timer would
    // later stop whatever client happened to own the (reused) fd number.
    int timeout_id = -1;
    if (peer_connect_timeout > 0)
    {
        timeout_id = tfd->set_timer(1000*peer_connect_timeout, false, [this, peer_fd, peer_osd](int timer_id)
        {
            stop_client(peer_fd);
            on_connect_peer(peer_osd, -EIO);
            return;
        });
    }
    clients[peer_fd] = (osd_client_t){
        .peer_addr = addr,
        .peer_port = peer_port,
        .peer_fd = peer_fd,
        .peer_state = PEER_CONNECTING,
        .connect_timeout_id = timeout_id,
        .osd_num = peer_osd,
        .in_buf = malloc(receive_buffer_size),
    };
    tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events)
    {
        // Either OUT (connected) or HUP
        handle_connect_epoll(peer_fd);
    });
}
||||
|
||||
void cluster_client_t::handle_connect_epoll(int peer_fd) |
||||
{ |
||||
auto & cl = clients[peer_fd]; |
||||
if (cl.connect_timeout_id >= 0) |
||||
{ |
||||
tfd->clear_timer(cl.connect_timeout_id); |
||||
cl.connect_timeout_id = -1; |
||||
} |
||||
osd_num_t peer_osd = cl.osd_num; |
||||
int result = 0; |
||||
socklen_t result_len = sizeof(result); |
||||
if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) |
||||
{ |
||||
result = errno; |
||||
} |
||||
if (result != 0) |
||||
{ |
||||
stop_client(peer_fd); |
||||
on_connect_peer(peer_osd, -result); |
||||
return; |
||||
} |
||||
int one = 1; |
||||
setsockopt(peer_fd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); |
||||
cl.peer_state = PEER_CONNECTED; |
||||
// FIXME Disable EPOLLOUT on this fd
|
||||
tfd->set_fd_handler(peer_fd, [this](int peer_fd, int epoll_events) |
||||
{ |
||||
handle_peer_epoll(peer_fd, epoll_events); |
||||
}); |
||||
// Check OSD number
|
||||
check_peer_config(cl); |
||||
} |
||||
|
||||
void cluster_client_t::handle_peer_epoll(int peer_fd, int epoll_events) |
||||
{ |
||||
// Mark client as ready (i.e. some data is available)
|
||||
if (epoll_events & EPOLLRDHUP) |
||||
{ |
||||
// Stop client
|
||||
printf("[OSD %lu] client %d disconnected\n", this->osd_num, peer_fd); |
||||
stop_client(peer_fd); |
||||
} |
||||
else if (epoll_events & EPOLLIN) |
||||
{ |
||||
// Mark client as ready (i.e. some data is available)
|
||||
auto & cl = clients[peer_fd]; |
||||
cl.read_ready++; |
||||
if (cl.read_ready == 1) |
||||
{ |
||||
read_ready_clients.push_back(cl.peer_fd); |
||||
ringloop->wakeup(); |
||||
} |
||||
} |
||||
} |
||||
|
||||
// Handles the outcome of a connection attempt to OSD <peer_osd>.
//
// peer_fd >= 0: the connection succeeded; the peer is removed from wanted_peers
//               and repeer_pgs() is invoked for it.
// peer_fd <  0: the attempt failed and -peer_fd is the error code. The next step is:
//   - restart from the first address if the address list changed meanwhile,
//   - otherwise try the next address in the list if there is one,
//   - otherwise retry from the first address in <peer_connect_interval> seconds.
//
// Throws std::out_of_range (from std::map::at) if <peer_osd> is not in wanted_peers.
void cluster_client_t::on_connect_peer(osd_num_t peer_osd, int peer_fd)
{
    auto & wp = wanted_peers.at(peer_osd);
    wp.connecting = false;
    if (peer_fd < 0)
    {
        printf("Failed to connect to peer OSD %lu address %s port %d: %s\n", peer_osd, wp.cur_addr.c_str(), wp.cur_port, strerror(-peer_fd));
        if (wp.address_changed)
        {
            wp.address_changed = false;
            wp.address_index = 0;
            // May erase the wanted_peers entry - <wp> must not be used afterwards
            try_connect_peer(peer_osd);
        }
        // "index + 1 < size" instead of "index < size - 1": the latter wraps
        // around for an empty list (unsigned 0 - 1) and would skip the retry timer
        else if ((size_t)wp.address_index + 1 < wp.address_list.array_items().size())
        {
            // Try other addresses
            wp.address_index++;
            try_connect_peer(peer_osd);
        }
        else
        {
            // Retry again in <peer_connect_interval> seconds
            wp.last_connect_attempt = time(NULL);
            wp.address_index = 0;
            tfd->set_timer(1000*peer_connect_interval, false, [this, peer_osd](int)
            {
                try_connect_peer(peer_osd);
            });
        }
        return;
    }
    printf("Connected with peer OSD %lu (fd %d)\n", peer_osd, peer_fd);
    wanted_peers.erase(peer_osd);
    repeer_pgs(peer_osd);
}
||||
|
||||
// Sends an OSD_OP_SHOW_CONFIG request to a freshly connected peer and, when the
// reply arrives, verifies that the peer reports the expected OSD number.
//
// On success the connection is registered in osd_peer_fds and reported through
// on_connect_peer(). On any failure the client is stopped; an OSD number
// mismatch is additionally reported through on_connect_peer() as a failure.
// The operation object is always deleted by its callback.
void cluster_client_t::check_peer_config(osd_client_t & cl)
{
    osd_op_t *op = new osd_op_t();
    op->op_type = OSD_OP_OUT;
    // The send list points at the request buffer, which is filled in below
    op->send_list.push_back(op->req.buf, OSD_PACKET_SIZE);
    op->peer_fd = cl.peer_fd;
    op->req = {
        .show_conf = {
            .header = {
                .magic = SECONDARY_OSD_OP_MAGIC,
                .id = this->next_subop_id++,
                .opcode = OSD_OP_SHOW_CONFIG,
            },
        },
    };
    // Remembered for logging when the client entry no longer exists
    const osd_num_t expected_osd = cl.osd_num;
    op->callback = [this, expected_osd](osd_op_t *op)
    {
        auto cl_it = clients.find(op->peer_fd);
        if (cl_it == clients.end())
        {
            // The callback is running from stop_client()'s cancellation path:
            // the client entry is already erased and the fd is being closed.
            // Looking it up with operator[] would re-create a bogus entry and the
            // nested stop_client() would close the descriptor a second time.
            printf("Failed to get config from OSD %lu (retval=%ld), disconnecting peer\n", expected_osd, op->reply.hdr.retval);
            delete op;
            return;
        }
        osd_client_t & cl = cl_it->second;
        std::string json_err;
        json11::Json config;
        bool err = false;
        if (op->reply.hdr.retval < 0)
        {
            err = true;
            printf("Failed to get config from OSD %lu (retval=%ld), disconnecting peer\n", cl.osd_num, op->reply.hdr.retval);
        }
        else
        {
            // A reply without a payload leaves op->buf NULL; parse an empty string
            // instead (which yields a JSON error) rather than dereferencing NULL
            const char *reply_text = (const char*)op->buf;
            config = json11::Json::parse(reply_text ? std::string(reply_text) : std::string(), json_err);
            if (json_err != "")
            {
                err = true;
                printf("Failed to get config from OSD %lu: bad JSON: %s, disconnecting peer\n", cl.osd_num, json_err.c_str());
            }
            else if (config["osd_num"].uint64_value() != cl.osd_num)
            {
                err = true;
                printf("Connected to OSD %lu instead of OSD %lu, peer state is outdated, disconnecting peer\n", config["osd_num"].uint64_value(), cl.osd_num);
                on_connect_peer(cl.osd_num, -1);
            }
        }
        if (err)
        {
            stop_client(op->peer_fd);
            delete op;
            return;
        }
        osd_peer_fds[cl.osd_num] = cl.peer_fd;
        on_connect_peer(cl.osd_num, cl.peer_fd);
        delete op;
    };
    outbox_push(op);
}
||||
|
||||
// Cancels every outbound operation associated with the client: operations
// already sent, operations still queued in the outbox and the operation
// currently being written. Each one is completed through cancel_out_op().
void cluster_client_t::cancel_osd_ops(osd_client_t & cl)
{
    for (const auto & sent : cl.sent_ops)
    {
        cancel_out_op(sent.second);
    }
    cl.sent_ops.clear();
    for (osd_op_t *queued : cl.outbox)
    {
        cancel_out_op(queued);
    }
    cl.outbox.clear();
    if (cl.write_op != NULL)
    {
        osd_op_t *in_flight = cl.write_op;
        cancel_out_op(in_flight);
        cl.write_op = NULL;
    }
}
||||
|
||||
// Completes an outbound operation with a synthetic -EPIPE reply that mirrors
// the id and opcode of the request, then invokes the operation's callback.
void cluster_client_t::cancel_out_op(osd_op_t *op)
{
    auto & reply_hdr = op->reply.hdr;
    reply_hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
    reply_hdr.id = op->req.hdr.id;
    reply_hdr.opcode = op->req.hdr.opcode;
    reply_hdr.retval = -EPIPE;
    // Run a copy of the callback so it stays valid even if it does `delete op`
    std::function<void(osd_op_t*)> on_cancel(op->callback);
    on_cancel(op);
}
||||
|
||||
void cluster_client_t::stop_client(int peer_fd) |
||||
{ |
||||
assert(peer_fd != 0); |
||||
auto it = clients.find(peer_fd); |
||||
if (it == clients.end()) |
||||
{ |
||||
return; |
||||
} |
||||
uint64_t repeer_osd = 0; |
||||
osd_client_t cl = it->second; |
||||
if (cl.peer_state == PEER_CONNECTED) |
||||
{ |
||||
if (cl.osd_num) |
||||
{ |
||||
// Reload configuration from etcd when the connection is dropped
|
||||
printf("[OSD %lu] Stopping client %d (OSD peer %lu)\n", osd_num, peer_fd, cl.osd_num); |
||||
repeer_osd = cl.osd_num; |
||||
} |
||||
else |
||||
{ |
||||
printf("[OSD %lu] Stopping client %d (regular client)\n", osd_num, peer_fd); |
||||
} |
||||
} |
||||
clients.erase(it); |
||||
tfd->set_fd_handler(peer_fd, NULL); |
||||
if (cl.osd_num) |
||||
{ |
||||
osd_peer_fds.erase(cl.osd_num); |
||||
// Cancel outbound operations
|
||||
cancel_osd_ops(cl); |
||||
} |
||||
if (cl.read_op) |
||||
{ |
||||
delete cl.read_op; |
||||
cl.read_op = NULL; |
||||
} |
||||
for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++) |
||||
{ |
||||
if (*rit == peer_fd) |
||||
{ |
||||
read_ready_clients.erase(rit); |
||||
break; |
||||
} |
||||
} |
||||
for (auto wit = write_ready_clients.begin(); wit != write_ready_clients.end(); wit++) |
||||
{ |
||||
if (*wit == peer_fd) |
||||
{ |
||||
write_ready_clients.erase(wit); |
||||
break; |
||||
} |
||||
} |
||||
free(cl.in_buf); |
||||
assert(peer_fd != 0); |
||||
close(peer_fd); |
||||
if (repeer_osd) |
||||
{ |
||||
repeer_pgs(repeer_osd); |
||||
} |
||||
} |
@ -0,0 +1,209 @@ |
||||
#pragma once |
||||
|
||||
#include <sys/types.h> |
||||
#include <stdint.h> |
||||
#include <arpa/inet.h> |
||||
#include <malloc.h> |
||||
|
||||
#include <set> |
||||
#include <map> |
||||
#include <deque> |
||||
#include <vector> |
||||
|
||||
#include "json11/json11.hpp" |
||||
#include "osd_ops.h" |
||||
#include "timerfd_manager.h" |
||||
#include "ringloop.h" |
||||
|
||||
#define OSD_OP_IN 0 |
||||
#define OSD_OP_OUT 1 |
||||
|
||||
#define CL_READ_HDR 1 |
||||
#define CL_READ_DATA 2 |
||||
#define CL_READ_REPLY_DATA 3 |
||||
#define CL_WRITE_READY 1 |
||||
#define CL_WRITE_REPLY 2 |
||||
#define MAX_EPOLL_EVENTS 64 |
||||
#define OSD_OP_INLINE_BUF_COUNT 16 |
||||
|
||||
#define PEER_CONNECTING 1 |
||||
#define PEER_CONNECTED 2 |
||||
|
||||
// Growable list of iovec entries describing the buffers of one message.
// The first OSD_OP_INLINE_BUF_COUNT entries live in inline_buf; once that is
// full the list moves to a heap array.
// <sent> is the number of leading entries that were already transmitted.
// NOTE(review): the implicitly generated copy operations copy <buf> verbatim,
// so a copy would alias the source's inline_buf or free the heap array twice -
// confirm that instances are never copied.
struct osd_op_buf_list_t
{
    int count = 0, alloc = 0, sent = 0;
    iovec *buf = NULL;
    iovec inline_buf[OSD_OP_INLINE_BUF_COUNT];

    ~osd_op_buf_list_t()
    {
        if (buf && buf != inline_buf)
        {
            free(buf);
        }
    }

    // Returns a pointer to the first entry that has not been sent yet
    inline iovec* get_iovec()
    {
        return (buf ? buf : inline_buf) + sent;
    }

    // Returns the number of entries that have not been sent yet
    inline int get_size()
    {
        return count - sent;
    }

    // Appends the buffer <nbuf> of <len> bytes, growing the storage when needed
    inline void push_back(void *nbuf, size_t len)
    {
        if (count >= alloc)
        {
            if (!alloc)
            {
                // First use: start with the inline storage
                alloc = OSD_OP_INLINE_BUF_COUNT;
                buf = inline_buf;
            }
            else
            {
                // Grow by one step of 16 entries. The previous formula
                // ((alloc/16)*16 + 1) stopped growing at 17 entries, so the
                // 18th push_back wrote past the end of the heap array.
                int new_alloc = (alloc/16 + 1) * 16;
                if (buf == inline_buf)
                {
                    buf = (iovec*)malloc(sizeof(iovec) * new_alloc);
                    memcpy(buf, inline_buf, sizeof(iovec) * alloc);
                }
                else
                {
                    buf = (iovec*)realloc(buf, sizeof(iovec) * new_alloc);
                }
                alloc = new_alloc;
            }
        }
        buf[count].iov_base = nbuf;
        buf[count].iov_len = len;
        count++;
    }
};
||||
|
||||
struct blockstore_op_t; |
||||
|
||||
struct osd_primary_op_data_t; |
||||
|
||||
struct osd_op_t |
||||
{ |
||||
timespec tv_begin; |
||||
uint64_t op_type = OSD_OP_IN; |
||||
int peer_fd; |
||||
osd_any_op_t req; |
||||
osd_any_reply_t reply; |
||||
blockstore_op_t *bs_op = NULL; |
||||
void *buf = NULL; |
||||
void *rmw_buf = NULL; |
||||
osd_primary_op_data_t* op_data = NULL; |
||||
std::function<void(osd_op_t*)> callback; |
||||
|
||||
osd_op_buf_list_t send_list; |
||||
|
||||
~osd_op_t(); |
||||
}; |
||||
|
||||
struct osd_client_t |
||||
{ |
||||
sockaddr_in peer_addr; |
||||
int peer_port; |
||||
int peer_fd; |
||||
int peer_state; |
||||
int connect_timeout_id = -1; |
||||
osd_num_t osd_num = 0; |
||||
|
||||
void *in_buf = NULL; |
||||
|
||||
// Read state
|
||||
int read_ready = 0; |
||||
osd_op_t *read_op = NULL; |
||||
int read_reply_id = 0; |
||||
iovec read_iov; |
||||
msghdr read_msg; |
||||
void *read_buf = NULL; |
||||
int read_remaining = 0; |
||||
int read_state = 0; |
||||
|
||||
// Outbound operations sent to this peer
|
||||
std::map<int, osd_op_t*> sent_ops; |
||||
|
||||
// Outbound messages (replies or requests)
|
||||
std::deque<osd_op_t*> outbox; |
||||
|
||||
// PGs dirtied by this client's primary-writes (FIXME to drop the connection)
|
||||
std::set<pg_num_t> dirty_pgs; |
||||
|
||||
// Write state
|
||||
osd_op_t *write_op = NULL; |
||||
msghdr write_msg; |
||||
int write_state = 0; |
||||
}; |
||||
|
||||
struct osd_wanted_peer_t |
||||
{ |
||||
json11::Json address_list; |
||||
int port; |
||||
time_t last_connect_attempt; |
||||
bool connecting, address_changed; |
||||
int address_index; |
||||
std::string cur_addr; |
||||
int cur_port; |
||||
}; |
||||
|
||||
struct osd_op_stats_t |
||||
{ |
||||
uint64_t op_stat_sum[OSD_OP_MAX+1] = { 0 }; |
||||
uint64_t op_stat_count[OSD_OP_MAX+1] = { 0 }; |
||||
uint64_t op_stat_bytes[OSD_OP_MAX+1] = { 0 }; |
||||
uint64_t subop_stat_sum[OSD_OP_MAX+1] = { 0 }; |
||||
uint64_t subop_stat_count[OSD_OP_MAX+1] = { 0 }; |
||||
}; |
||||
|
||||
struct cluster_client_t |
||||
{ |
||||
timerfd_manager_t *tfd; |
||||
ring_loop_t *ringloop; |
||||
|
||||
// osd_num_t is only for logging and asserts
|
||||
osd_num_t osd_num; |
||||
int receive_buffer_size = 9000; |
||||
int peer_connect_interval = 5; |
||||
int peer_connect_timeout = 5; |
||||
int log_level = 0; |
||||
|
||||
std::map<osd_num_t, osd_wanted_peer_t> wanted_peers; |
||||
std::map<uint64_t, int> osd_peer_fds; |
||||
uint64_t next_subop_id = 1; |
||||
|
||||
std::map<int, osd_client_t> clients; |
||||
std::vector<int> read_ready_clients; |
||||
std::vector<int> write_ready_clients; |
||||
|
||||
// op statistics
|
||||
osd_op_stats_t stats; |
||||
|
||||
// public
|
||||
void connect_peer(uint64_t osd_num, json11::Json address_list, int port); |
||||
void stop_client(int peer_fd); |
||||
void outbox_push(osd_op_t *cur_op); |
||||
std::function<void(osd_op_t*)> exec_op; |
||||
std::function<void(osd_num_t)> repeer_pgs; |
||||
|
||||
// private
|
||||
void try_connect_peer(uint64_t osd_num); |
||||
void try_connect_peer_addr(osd_num_t peer_osd, const char *peer_host, int peer_port); |
||||
void handle_connect_epoll(int peer_fd); |
||||
void handle_peer_epoll(int peer_fd, int epoll_events); |
||||
void on_connect_peer(osd_num_t peer_osd, int peer_fd); |
||||
void check_peer_config(osd_client_t & cl); |
||||
void cancel_osd_ops(osd_client_t & cl); |
||||
void cancel_out_op(osd_op_t *op); |
||||
|
||||
bool try_send(osd_client_t & cl); |
||||
void send_replies(); |
||||
void handle_send(ring_data_t *data, int peer_fd); |
||||
|
||||
void read_requests(); |
||||
void handle_read(ring_data_t *data, int peer_fd); |
||||
void handle_finished_read(osd_client_t & cl); |
||||
void handle_op_hdr(osd_client_t *cl); |
||||
void handle_reply_hdr(osd_client_t *cl); |
||||
}; |