forked from vitalif/vitastor
Cancel outbound operations when disconnecting a peer
parent
a66b34e04d
commit
1100ad44bd
49
osd.cpp
49
osd.cpp
|
@ -222,20 +222,59 @@ int osd_t::handle_epoll_events()
|
|||
return nfds;
|
||||
}
|
||||
|
||||
void osd_t::cancel_osd_ops(osd_client_t & cl)
|
||||
{
|
||||
for (auto p: cl.sent_ops)
|
||||
{
|
||||
cancel_op(p.second);
|
||||
}
|
||||
cl.sent_ops.clear();
|
||||
for (auto op: cl.outbox)
|
||||
{
|
||||
cancel_op(op);
|
||||
}
|
||||
cl.outbox.clear();
|
||||
if (cl.write_op)
|
||||
{
|
||||
cancel_op(cl.write_op);
|
||||
cl.write_op = NULL;
|
||||
cl.write_buf = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::cancel_op(osd_op_t *op)
|
||||
{
|
||||
op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
|
||||
op->reply.hdr.id = op->op.hdr.id;
|
||||
op->reply.hdr.opcode = op->op.hdr.opcode;
|
||||
op->reply.hdr.retval = -EPIPE;
|
||||
op->callback(op);
|
||||
}
|
||||
|
||||
void osd_t::stop_client(int peer_fd)
|
||||
{
|
||||
auto it = clients.find(peer_fd);
|
||||
if (it == clients.end())
|
||||
{
|
||||
return;
|
||||
}
|
||||
auto & cl = it->second;
|
||||
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, peer_fd, NULL) < 0)
|
||||
{
|
||||
throw std::runtime_error(std::string("epoll_ctl: ") + strerror(errno));
|
||||
}
|
||||
auto it = clients.find(peer_fd);
|
||||
if (it->second.osd_num)
|
||||
if (cl.osd_num)
|
||||
{
|
||||
// FIXME cancel outbound operations
|
||||
osd_peer_fds.erase(it->second.osd_num);
|
||||
repeer_pgs(it->second.osd_num, false);
|
||||
// Cancel outbound operations
|
||||
cancel_osd_ops(cl);
|
||||
osd_peer_fds.erase(cl.osd_num);
|
||||
repeer_pgs(cl.osd_num, false);
|
||||
peering_state |= OSD_PEERING_PEERS;
|
||||
}
|
||||
if (cl.read_op)
|
||||
{
|
||||
delete cl.read_op;
|
||||
}
|
||||
for (auto rit = read_ready_clients.begin(); rit != read_ready_clients.end(); rit++)
|
||||
{
|
||||
if (*rit == peer_fd)
|
||||
|
|
2
osd.h
2
osd.h
|
@ -217,6 +217,8 @@ class osd_t
|
|||
// peer handling (primary OSD logic)
|
||||
void connect_peer(osd_num_t osd_num, const char *peer_host, int peer_port, std::function<void(osd_num_t, int)> callback);
|
||||
void handle_connect_result(int peer_fd);
|
||||
void cancel_osd_ops(osd_client_t & cl);
|
||||
void cancel_op(osd_op_t *op);
|
||||
void stop_client(int peer_fd);
|
||||
osd_peer_def_t parse_peer(std::string peer);
|
||||
void init_primary();
|
||||
|
|
|
@ -141,7 +141,6 @@ void osd_t::handle_read_reply(osd_client_t *cl)
|
|||
if (req_it == cl->sent_ops.end())
|
||||
{
|
||||
// Command out of sync. Drop connection
|
||||
// FIXME This is probably a peer, so handle all previously sent operations carefully
|
||||
stop_client(cl->peer_fd);
|
||||
return;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue