Async connection to peers

blocking-uring-test
Vitaliy Filippov 2020-01-04 01:23:25 +03:00
parent b87092fcfe
commit b6f0c1cde5
2 changed files with 113 additions and 10 deletions

88
osd.cpp
View File

@ -82,6 +82,7 @@ osd_op_t::~osd_op_t()
if (buf)
{
// Note: reusing osd_op_t WILL currently lead to memory leaks
// So we don't reuse it, but free it every time
if (op_type == OSD_OP_IN &&
op.hdr.opcode == OSD_OP_SHOW_CONFIG)
{
@ -134,6 +135,79 @@ void osd_t::loop()
ringloop->submit();
}
void osd_t::connect_peer(unsigned osd_num, char *peer_host, int peer_port, std::function<void(int)> callback)
{
struct sockaddr_in addr;
int r;
if ((r = inet_pton(AF_INET, peer_host, &addr.sin_addr)) != 1)
{
callback(-EINVAL);
return;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(peer_port ? peer_port : 11203);
int peer_fd = socket(AF_INET, SOCK_STREAM, 0);
if (peer_fd < 0)
{
callback(-errno);
return;
}
fcntl(peer_fd, F_SETFL, fcntl(peer_fd, F_GETFL, 0) | O_NONBLOCK);
r = connect(peer_fd, (sockaddr*)&addr, sizeof(addr));
if (r < 0 && r != EINPROGRESS)
{
close(peer_fd);
callback(-errno);
return;
}
clients[peer_fd] = (osd_client_t){
.peer_addr = addr,
.peer_port = peer_port,
.peer_fd = peer_fd,
.peer_state = PEER_CONNECTING,
.connect_callback = callback,
.osd_num = osd_num,
};
osd_peer_fds[osd_num] = peer_fd;
// Add FD to epoll (EPOLLOUT for tracking connect() result)
epoll_event ev;
ev.data.fd = peer_fd;
ev.events = EPOLLOUT | EPOLLIN | EPOLLRDHUP;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peer_fd, &ev) < 0)
{
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
}
}
void osd_t::handle_connect_result(int peer_fd)
{
auto & cl = clients[peer_fd];
std::function<void(int)> callback = cl.connect_callback;
int result = 0;
socklen_t result_len = sizeof(result);
if (getsockopt(peer_fd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0)
{
result = errno;
}
if (result != 0)
{
stop_client(peer_fd);
callback(-result);
return;
}
// Disable EPOLLOUT on this fd
cl.connect_callback = NULL;
cl.peer_state = PEER_CONNECTED;
epoll_event ev;
ev.data.fd = peer_fd;
ev.events = EPOLLIN | EPOLLRDHUP;
if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, peer_fd, &ev) < 0)
{
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
}
callback(peer_fd);
}
int osd_t::handle_epoll_events()
{
epoll_event events[MAX_EPOLL_EVENTS];
@ -153,8 +227,9 @@ int osd_t::handle_epoll_events()
fcntl(peer_fd, F_SETFL, fcntl(listen_fd, F_GETFL, 0) | O_NONBLOCK);
clients[peer_fd] = {
.peer_addr = addr,
.peer_addr_size = peer_addr_size,
.peer_port = ntohs(addr.sin_port),
.peer_fd = peer_fd,
.peer_state = PEER_CONNECTED,
};
// Add FD to epoll
epoll_event ev;
@ -181,6 +256,13 @@ int osd_t::handle_epoll_events()
printf("osd: client %d disconnected\n", cl.peer_fd);
stop_client(cl.peer_fd);
}
else if (cl.peer_state == PEER_CONNECTING)
{
if (events[i].events & EPOLLOUT)
{
handle_connect_result(cl.peer_fd);
}
}
else if (!cl.read_ready)
{
// Mark client as ready (i.e. some data is available)
@ -207,6 +289,10 @@ void osd_t::stop_client(int peer_fd)
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
}
auto it = clients.find(peer_fd);
if (it->second.osd_num)
{
osd_peer_fds.erase(it->second.osd_num);
}
for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++)
{
if (*rit == peer_fd)

35
osd.h
View File

@ -53,11 +53,17 @@ struct osd_op_t
~osd_op_t();
};
#define PEER_CONNECTING 1
#define PEER_CONNECTED 2
struct osd_client_t
{
sockaddr_in peer_addr;
socklen_t peer_addr_size;
int peer_port;
int peer_fd;
int peer_state;
std::function<void(int)> connect_callback;
uint64_t osd_num = 0;
//int in_flight_ops = 0;
// Read state
@ -112,17 +118,22 @@ namespace std
};
}
#define PG_ST_OFFLINE 1
#define PG_ST_PEERING 2
#define PG_ST_INCOMPLETE 3
#define PG_ST_DEGRADED 4
#define PG_ST_MISPLACED 5
#define PG_ST_ACTIVE 6
// Placement group state:
// Exactly one of these:
#define PG_OFFLINE (1<<0)
#define PG_PEERING (1<<1)
#define PG_INCOMPLETE (1<<2)
#define PG_ACTIVE (1<<3)
// Plus any of these:
#define PG_HAS_UNFOUND (1<<4)
#define PG_HAS_DEGRADED (1<<5)
#define PG_HAS_MISPLACED (1<<6)
struct osd_pg_t
{
int state;
unsigned num;
uint64_t n_unfound = 0, n_degraded = 0, n_misplaced = 0;
std::vector<osd_pg_role_t> target_set;
// moved object map. by default, each object is considered to reside on the target_set.
// this map stores all objects that differ.
@ -137,7 +148,7 @@ class osd_t
{
// config
uint64_t osd_num = 0;
uint64_t osd_num = 1; // OSD numbers start with 1
blockstore_config_t config;
std::string bind_address;
int bind_port, listen_backlog;
@ -168,9 +179,9 @@ class osd_t
// methods
// event loop, socket read/write
void loop();
int handle_epoll_events();
void stop_client(int peer_fd);
void read_requests();
void handle_read(ring_data_t *data, int peer_fd);
void handle_read_op(osd_client_t *cl);
@ -179,6 +190,12 @@ class osd_t
void make_reply(osd_op_t *op);
void handle_send(ring_data_t *data, int peer_fd);
// connect/disconnect
void connect_peer(unsigned osd_num, char *peer_host, int peer_port, std::function<void(int)> callback);
void handle_connect_result(int peer_fd);
void stop_client(int peer_fd);
// op execution
void handle_reply(osd_op_t *cur_op);
void exec_op(osd_op_t *cur_op);
void exec_sync_stab_all(osd_op_t *cur_op);