forked from vitalif/vitastor
Read object lists from peers and own blockstore
parent
8c05ee252c
commit
a8bc44064d
|
@ -62,6 +62,7 @@ bool blockstore_impl_t::is_stalled()
|
|||
// main event loop - produce requests
|
||||
void blockstore_impl_t::loop()
|
||||
{
|
||||
// FIXME: initialized == 10 is ugly
|
||||
if (initialized != 10)
|
||||
{
|
||||
// read metadata, then journal
|
||||
|
@ -89,6 +90,7 @@ void blockstore_impl_t::loop()
|
|||
delete journal_init_reader;
|
||||
journal_init_reader = NULL;
|
||||
initialized = 10;
|
||||
ringloop->wakeup();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -168,6 +170,11 @@ void blockstore_impl_t::loop()
|
|||
{
|
||||
dequeue_op = dequeue_stable(op);
|
||||
}
|
||||
else if ((op->opcode & BS_OP_TYPE_MASK) == BS_OP_LIST)
|
||||
{
|
||||
process_list(op);
|
||||
dequeue_op = true;
|
||||
}
|
||||
if (dequeue_op)
|
||||
{
|
||||
submit_queue.erase(op_ptr);
|
||||
|
@ -304,13 +311,6 @@ void blockstore_impl_t::enqueue_op(blockstore_op_t *op, bool first)
|
|||
op->callback(op);
|
||||
return;
|
||||
}
|
||||
else if (type == BS_OP_LIST)
|
||||
{
|
||||
// List operation is processed synchronously
|
||||
process_list(op);
|
||||
op->callback(op);
|
||||
return;
|
||||
}
|
||||
// Call constructor without allocating memory. We'll call destructor before returning op back
|
||||
new ((void*)op->private_data) blockstore_op_private_t;
|
||||
PRIV(op)->wait_for = 0;
|
||||
|
@ -397,4 +397,5 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
|
|||
}
|
||||
}
|
||||
}
|
||||
FINISH_OP(op);
|
||||
}
|
||||
|
|
15
osd.h
15
osd.h
|
@ -52,6 +52,7 @@ struct osd_op_t
|
|||
};
|
||||
blockstore_op_t bs_op;
|
||||
void *buf = NULL;
|
||||
std::function<void(osd_op_t*)> callback;
|
||||
|
||||
~osd_op_t();
|
||||
};
|
||||
|
@ -83,8 +84,8 @@ struct osd_client_t
|
|||
// Outbound operations sent to this client (which is probably an OSD peer)
|
||||
std::map<int, osd_op_t*> sent_ops;
|
||||
|
||||
// Completed operations to send replies back to the client
|
||||
std::deque<osd_op_t*> completions;
|
||||
// Outbound messages (replies or requests)
|
||||
std::deque<osd_op_t*> outbox;
|
||||
|
||||
// Write state
|
||||
osd_op_t *write_op = NULL;
|
||||
|
@ -141,9 +142,14 @@ namespace std
|
|||
#define OSD_HALF_STABLE 0x10000
|
||||
#define OSD_NEEDS_ROLLBACK 0x20000
|
||||
|
||||
class osd_t;
|
||||
|
||||
struct osd_pg_peering_state_t
|
||||
{
|
||||
osd_t* self;
|
||||
uint64_t pg_num;
|
||||
std::unordered_map<uint64_t, osd_op_t*> list_ops;
|
||||
int list_done = 0;
|
||||
};
|
||||
|
||||
struct osd_pg_t
|
||||
|
@ -187,7 +193,7 @@ class osd_t
|
|||
|
||||
std::map<uint64_t, int> osd_peer_fds;
|
||||
std::vector<osd_pg_t> pgs;
|
||||
bool needs_peering = false;
|
||||
int peering_state = 0;
|
||||
unsigned pg_count = 0;
|
||||
|
||||
// client & peer I/O
|
||||
|
@ -218,6 +224,7 @@ class osd_t
|
|||
void send_replies();
|
||||
void make_reply(osd_op_t *op);
|
||||
void handle_send(ring_data_t *data, int peer_fd);
|
||||
void outbox_push(osd_client_t & cl, osd_op_t *op);
|
||||
|
||||
// peer handling (primary OSD logic)
|
||||
void connect_peer(unsigned osd_num, const char *peer_host, int peer_port, std::function<void(int)> callback);
|
||||
|
@ -226,9 +233,9 @@ class osd_t
|
|||
osd_peer_def_t parse_peer(std::string peer);
|
||||
void init_primary();
|
||||
void handle_peers();
|
||||
void start_pg_peering(int i);
|
||||
|
||||
// op execution
|
||||
void handle_reply(osd_op_t *cur_op);
|
||||
void exec_op(osd_op_t *cur_op);
|
||||
void exec_sync_stab_all(osd_op_t *cur_op);
|
||||
void exec_show_config(osd_op_t *cur_op);
|
||||
|
|
|
@ -2,11 +2,6 @@
|
|||
|
||||
#include "json11/json11.hpp"
|
||||
|
||||
void osd_t::handle_reply(osd_op_t *cur_op)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
void osd_t::secondary_op_callback(osd_op_t *cur_op)
|
||||
{
|
||||
inflight_ops--;
|
||||
|
@ -14,14 +9,8 @@ void osd_t::secondary_op_callback(osd_op_t *cur_op)
|
|||
if (cl_it != clients.end())
|
||||
{
|
||||
auto & cl = cl_it->second;
|
||||
if (cl.write_state == 0)
|
||||
{
|
||||
cl.write_state = CL_WRITE_READY;
|
||||
write_ready_clients.push_back(cur_op->peer_fd);
|
||||
}
|
||||
make_reply(cur_op);
|
||||
cl.completions.push_back(cur_op);
|
||||
ringloop->wakeup();
|
||||
outbox_push(cl, cur_op);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -80,7 +69,7 @@ void osd_t::exec_show_config(osd_op_t *cur_op)
|
|||
cl.write_state = CL_WRITE_READY;
|
||||
write_ready_clients.push_back(cur_op->peer_fd);
|
||||
make_reply(cur_op);
|
||||
cl.completions.push_back(cur_op);
|
||||
cl.outbox.push_back(cur_op);
|
||||
ringloop->wakeup();
|
||||
}
|
||||
|
||||
|
|
103
osd_peering.cpp
103
osd_peering.cpp
|
@ -18,7 +18,7 @@ void osd_t::init_primary()
|
|||
.object_map = spp::sparse_hash_map<object_id, int>(),
|
||||
});
|
||||
pg_count = 1;
|
||||
needs_peering = true;
|
||||
peering_state = 1;
|
||||
}
|
||||
|
||||
osd_peer_def_t osd_t::parse_peer(std::string peer)
|
||||
|
@ -120,7 +120,7 @@ void osd_t::handle_connect_result(int peer_fd)
|
|||
// Ideally: Connect -> Ask & check config -> Start PG peering
|
||||
void osd_t::handle_peers()
|
||||
{
|
||||
if (needs_peering)
|
||||
if (peering_state & 1)
|
||||
{
|
||||
for (int i = 0; i < peers.size(); i++)
|
||||
{
|
||||
|
@ -131,27 +131,100 @@ void osd_t::handle_peers()
|
|||
connect_peer(peers[i].osd_num, peers[i].addr.c_str(), peers[i].port, [this](int peer_fd)
|
||||
{
|
||||
printf("Connected with peer OSD %lu (fd %d)\n", clients[peer_fd].osd_num, peer_fd);
|
||||
// Restart PG peering
|
||||
pgs[0].state = PG_PEERING;
|
||||
pgs[0].acting_set_ids.clear();
|
||||
pgs[0].acting_sets.clear();
|
||||
pgs[0].object_map.clear();
|
||||
if (pgs[0].peering_state)
|
||||
delete pgs[0].peering_state;
|
||||
ringloop->wakeup();
|
||||
int i;
|
||||
for (i = 0; i < peers.size(); i++)
|
||||
{
|
||||
auto it = osd_peer_fds.find(peers[i].osd_num);
|
||||
if (it == osd_peer_fds.end() || clients[it->second].peer_state != PEER_CONNECTED)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (i >= peers.size())
|
||||
{
|
||||
// Start PG peering
|
||||
pgs[0].state = PG_PEERING;
|
||||
pgs[0].acting_set_ids.clear();
|
||||
pgs[0].acting_sets.clear();
|
||||
pgs[0].object_map.clear();
|
||||
if (pgs[0].peering_state)
|
||||
delete pgs[0].peering_state;
|
||||
peering_state = 2;
|
||||
ringloop->wakeup();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < pgs.size(); i++)
|
||||
if (peering_state & 2)
|
||||
{
|
||||
if (pgs[i].state == PG_PEERING)
|
||||
for (int i = 0; i < pgs.size(); i++)
|
||||
{
|
||||
if (!pgs[i].peering_state)
|
||||
if (pgs[i].state == PG_PEERING)
|
||||
{
|
||||
pgs[i].peering_state = new osd_pg_peering_state_t();
|
||||
|
||||
if (!pgs[i].peering_state)
|
||||
{
|
||||
start_pg_peering(i);
|
||||
}
|
||||
else if (pgs[i].peering_state->list_done >= 3)
|
||||
{
|
||||
// FIXME
|
||||
peering_state = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void osd_t::start_pg_peering(int pg_idx)
|
||||
{
|
||||
auto & pg = pgs[pg_idx];
|
||||
auto ps = pg.peering_state = new osd_pg_peering_state_t();
|
||||
ps->self = this;
|
||||
ps->pg_num = pg_idx; // FIXME probably shouldn't be pg_idx
|
||||
{
|
||||
osd_op_t *op = new osd_op_t();
|
||||
op->op_type = 0;
|
||||
op->peer_fd = 0;
|
||||
op->bs_op.opcode = BS_OP_LIST;
|
||||
op->bs_op.callback = [ps, op](blockstore_op_t *bs_op)
|
||||
{
|
||||
printf(
|
||||
"Got object list from OSD %lu (local): %d objects (%lu of them stable)\n",
|
||||
ps->self->osd_num, bs_op->retval, bs_op->version
|
||||
);
|
||||
ps->list_done++;
|
||||
};
|
||||
pg.peering_state->list_ops[osd_num] = op;
|
||||
bs->enqueue_op(&op->bs_op);
|
||||
}
|
||||
for (int i = 0; i < peers.size(); i++)
|
||||
{
|
||||
auto & cl = clients[osd_peer_fds[peers[i].osd_num]];
|
||||
osd_op_t *op = new osd_op_t();
|
||||
op->op_type = OSD_OP_OUT;
|
||||
op->peer_fd = cl.peer_fd;
|
||||
op->op = {
|
||||
.sec_list = {
|
||||
.header = {
|
||||
.magic = SECONDARY_OSD_OP_MAGIC,
|
||||
.id = 1,
|
||||
.opcode = OSD_OP_SECONDARY_LIST,
|
||||
},
|
||||
.pgnum = 1,
|
||||
.pgtotal = 1,
|
||||
},
|
||||
};
|
||||
op->callback = [ps](osd_op_t *op)
|
||||
{
|
||||
printf(
|
||||
"Got object list from OSD %lu: %ld objects (%lu of them stable)\n",
|
||||
ps->self->clients[op->peer_fd].osd_num, op->reply.hdr.retval,
|
||||
op->reply.sec_list.stable_count
|
||||
);
|
||||
ps->list_done++;
|
||||
};
|
||||
pg.peering_state->list_ops[cl.osd_num] = op;
|
||||
outbox_push(cl, op);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -90,7 +90,7 @@ void osd_t::handle_read(ring_data_t *data, int peer_fd)
|
|||
cl.sent_ops.erase(req_it);
|
||||
cl.read_reply_id = 0;
|
||||
cl.read_state = 0;
|
||||
handle_reply(request);
|
||||
request->callback(request);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -168,6 +168,6 @@ void osd_t::handle_read_reply(osd_client_t *cl)
|
|||
{
|
||||
cl->read_state = 0;
|
||||
cl->sent_ops.erase(req_it);
|
||||
handle_reply(request);
|
||||
request->callback(request);
|
||||
}
|
||||
}
|
||||
|
|
17
osd_send.cpp
17
osd_send.cpp
|
@ -1,5 +1,16 @@
|
|||
#include "osd.h"
|
||||
|
||||
void osd_t::outbox_push(osd_client_t & cl, osd_op_t *cur_op)
|
||||
{
|
||||
if (cl.write_state == 0)
|
||||
{
|
||||
cl.write_state = CL_WRITE_READY;
|
||||
write_ready_clients.push_back(cur_op->peer_fd);
|
||||
}
|
||||
cl.outbox.push_back(cur_op);
|
||||
ringloop->wakeup();
|
||||
}
|
||||
|
||||
void osd_t::send_replies()
|
||||
{
|
||||
for (int i = 0; i < write_ready_clients.size(); i++)
|
||||
|
@ -16,8 +27,8 @@ void osd_t::send_replies()
|
|||
if (!cl.write_buf)
|
||||
{
|
||||
// pick next command
|
||||
cl.write_op = cl.completions.front();
|
||||
cl.completions.pop_front();
|
||||
cl.write_op = cl.outbox.front();
|
||||
cl.outbox.pop_front();
|
||||
if (cl.write_op->op_type == OSD_OP_OUT)
|
||||
{
|
||||
cl.write_buf = &cl.write_op->op_buf;
|
||||
|
@ -135,7 +146,7 @@ void osd_t::handle_send(ring_data_t *data, int peer_fd)
|
|||
cl.sent_ops[cl.write_op->op.hdr.id] = cl.write_op;
|
||||
}
|
||||
cl.write_op = NULL;
|
||||
cl.write_state = cl.completions.size() > 0 ? CL_WRITE_READY : 0;
|
||||
cl.write_state = cl.outbox.size() > 0 ? CL_WRITE_READY : 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue