Implement object listing with controllable parallelism in cluster_client

nbd-vmsplice
Vitaliy Filippov 2021-07-10 21:47:23 +03:00
parent 712576ca75
commit 7d3d696110
4 changed files with 308 additions and 1 deletions

View File

@ -116,6 +116,7 @@ endif (${WITH_FIO})
# libvitastor_client.so
add_library(vitastor_client SHARED
cluster_client.cpp
cluster_client_list.cpp
vitastor_c.cpp
)
set_target_properties(vitastor_client PROPERTIES PUBLIC_HEADER "vitastor_c.h")
@ -220,7 +221,7 @@ target_link_libraries(test_cas
# test_cluster_client
add_executable(test_cluster_client
test_cluster_client.cpp
pg_states.cpp osd_ops.cpp cluster_client.cpp msgr_op.cpp mock/messenger.cpp msgr_stop.cpp
pg_states.cpp osd_ops.cpp cluster_client.cpp cluster_client_list.cpp msgr_op.cpp mock/messenger.cpp msgr_stop.cpp
etcd_state_client.cpp timerfd_manager.cpp ../json11/json11.cpp
)
target_compile_definitions(test_cluster_client PUBLIC -D__MOCK__)

View File

@ -31,6 +31,7 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
{
// peer_osd just connected
continue_ops();
continue_lists();
}
else if (dirty_buffers.size())
{
@ -1161,3 +1162,8 @@ void cluster_client_t::copy_part_bitmap(cluster_op_t *op, cluster_op_part_t *par
part_len--;
}
}
uint64_t cluster_client_t::next_op_id()
{
return op_id++;
}

View File

@ -10,6 +10,8 @@
#define MAX_BLOCK_SIZE 128*1024*1024
#define DEFAULT_CLIENT_MAX_DIRTY_BYTES 32*1024*1024
#define DEFAULT_CLIENT_MAX_DIRTY_OPS 1024
#define INODE_LIST_DONE 1
#define INODE_LIST_HAS_UNSTABLE 2
struct cluster_op_t;
@ -62,6 +64,9 @@ struct cluster_buffer_t
int state;
};
struct inode_list_t;
struct inode_list_osd_t;
// FIXME: Split into public and private interfaces
class cluster_client_t
{
@ -93,6 +98,7 @@ class cluster_client_t
bool pgs_loaded = false;
ring_consumer_t consumer;
std::vector<std::function<void(void)>> on_ready_hooks;
std::vector<inode_list_t*> lists;
int continuing_ops = 0;
public:
@ -108,6 +114,12 @@ public:
static void copy_write(cluster_op_t *op, std::map<object_id, cluster_buffer_t> & dirty_buffers);
void continue_ops(bool up_retry = false);
inode_list_t *list_inode_start(inode_t inode,
std::function<void(std::set<object_id>&& objects, pg_num_t pg_num, osd_num_t primary_osd, int status)> callback);
int list_pg_count(inode_list_t *lst);
void list_inode_next(inode_list_t *lst, int next_pgs);
uint64_t next_op_id();
protected:
bool affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd);
void flush_buffer(const object_id & oid, cluster_buffer_t *wr);
@ -125,4 +137,7 @@ protected:
void erase_op(cluster_op_t *op);
void calc_wait(cluster_op_t *op);
void inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *next, int inc);
void continue_lists();
void continue_listing(inode_list_t *lst);
void send_list(inode_list_osd_t *cur_list);
};

285
src/cluster_client_list.cpp Normal file
View File

@ -0,0 +1,285 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#include <algorithm>
#include "pg_states.h"
#include "cluster_client.h"
struct inode_list_t;
struct inode_list_pg_t;
struct inode_list_osd_t
{
inode_list_pg_t *pg = NULL;
osd_num_t osd_num = 0;
bool sent = false;
};
struct inode_list_pg_t
{
inode_list_t *lst = NULL;
int pos = 0;
pg_num_t pg_num;
osd_num_t cur_primary;
bool has_unstable = false;
int sent = 0;
int done = 0;
std::vector<inode_list_osd_t> list_osds;
std::set<object_id> objects;
};
struct inode_list_t
{
cluster_client_t *cli = NULL;
pool_id_t pool_id = 0;
inode_t inode = 0;
int done_pgs = 0;
int want = 0;
std::vector<inode_list_pg_t*> pgs;
std::function<void(std::set<object_id>&& objects, pg_num_t pg_num, osd_num_t primary_osd, int status)> callback;
};
inode_list_t* cluster_client_t::list_inode_start(inode_t inode,
std::function<void(std::set<object_id>&& objects, pg_num_t pg_num, osd_num_t primary_osd, int status)> callback)
{
int skipped_pgs = 0;
pool_id_t pool_id = INODE_POOL(inode);
if (!pool_id || st_cli.pool_config.find(pool_id) == st_cli.pool_config.end())
{
if (log_level > 0)
{
fprintf(stderr, "Pool %u does not exist\n", pool_id);
}
return NULL;
}
inode_list_t *lst = new inode_list_t();
lst->cli = this;
lst->pool_id = pool_id;
lst->inode = inode;
lst->callback = callback;
auto pool_cfg = st_cli.pool_config[pool_id];
for (auto & pg_item: pool_cfg.pg_config)
{
auto & pg = pg_item.second;
if (pg.pause || !pg.cur_primary || !(pg.cur_state & PG_ACTIVE))
{
skipped_pgs++;
if (log_level > 0)
{
fprintf(stderr, "PG %u is inactive, skipping\n", pg_item.first);
}
continue;
}
inode_list_pg_t *r = new inode_list_pg_t();
r->lst = lst;
r->pg_num = pg_item.first;
r->cur_primary = pg.cur_primary;
if (pg.cur_state != PG_ACTIVE)
{
// Not clean
std::set<osd_num_t> all_peers;
for (osd_num_t pg_osd: pg.target_set)
{
if (pg_osd != 0)
{
all_peers.insert(pg_osd);
}
}
for (osd_num_t pg_osd: pg.all_peers)
{
if (pg_osd != 0)
{
all_peers.insert(pg_osd);
}
}
for (auto & hist_item: pg.target_history)
{
for (auto pg_osd: hist_item)
{
if (pg_osd != 0)
{
all_peers.insert(pg_osd);
}
}
}
for (osd_num_t peer_osd: all_peers)
{
r->list_osds.push_back((inode_list_osd_t){
.pg = r,
.osd_num = peer_osd,
.sent = false,
});
}
}
else
{
// Clean
r->list_osds.push_back((inode_list_osd_t){
.pg = r,
.osd_num = pg.cur_primary,
.sent = false,
});
}
lst->pgs.push_back(r);
}
std::sort(lst->pgs.begin(), lst->pgs.end(), [](inode_list_pg_t *a, inode_list_pg_t *b)
{
return a->cur_primary < b->cur_primary ? true : false;
});
for (int i = 0; i < lst->pgs.size(); i++)
{
lst->pgs[i]->pos = i;
}
lists.push_back(lst);
return lst;
}
int cluster_client_t::list_pg_count(inode_list_t *lst)
{
return lst->pgs.size();
}
void cluster_client_t::list_inode_next(inode_list_t *lst, int next_pgs)
{
if (next_pgs >= 0)
{
lst->want += next_pgs;
}
continue_listing(lst);
}
void cluster_client_t::continue_listing(inode_list_t *lst)
{
if (lst->done_pgs >= lst->pgs.size())
{
// All done
for (int i = 0; i < lists.size(); i++)
{
if (lists[i] == lst)
{
lists.erase(lists.begin()+i, lists.begin()+i+1);
break;
}
}
delete lst;
return;
}
if (lst->want <= 0)
{
return;
}
for (int i = 0; i < lst->pgs.size(); i++)
{
if (lst->pgs[i] && lst->pgs[i]->sent < lst->pgs[i]->list_osds.size())
{
for (int j = 0; j < lst->pgs[i]->list_osds.size(); j++)
{
send_list(&lst->pgs[i]->list_osds[j]);
if (lst->want <= 0)
{
break;
}
}
}
}
}
void cluster_client_t::send_list(inode_list_osd_t *cur_list)
{
if (cur_list->sent)
{
return;
}
if (msgr.osd_peer_fds.find(cur_list->osd_num) == msgr.osd_peer_fds.end())
{
// Initiate connection
msgr.connect_peer(cur_list->osd_num, st_cli.peer_states[cur_list->osd_num]);
return;
}
auto & pool_cfg = st_cli.pool_config[cur_list->pg->lst->pool_id];
osd_op_t *op = new osd_op_t();
op->op_type = OSD_OP_OUT;
op->peer_fd = msgr.osd_peer_fds[cur_list->osd_num];
op->req = (osd_any_op_t){
.sec_list = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = op_id++,
.opcode = OSD_OP_SEC_LIST,
},
.list_pg = cur_list->pg->pg_num,
.pg_count = (pg_num_t)pool_cfg.real_pg_count,
.pg_stripe_size = pool_cfg.pg_stripe_size,
.min_inode = cur_list->pg->lst->inode,
.max_inode = cur_list->pg->lst->inode,
},
};
op->callback = [this, cur_list](osd_op_t *op)
{
if (op->reply.hdr.retval < 0)
{
fprintf(stderr, "Failed to get PG %u/%u object list from OSD %lu (retval=%ld), skipping\n",
cur_list->pg->lst->pool_id, cur_list->pg->pg_num, cur_list->osd_num, op->reply.hdr.retval);
}
else
{
if (op->reply.sec_list.stable_count < op->reply.hdr.retval)
{
// Unstable objects, if present, mean that someone still writes into the inode. Warn the user about it.
cur_list->pg->has_unstable = true;
fprintf(
stderr, "[PG %u/%u] Inode still has %lu unstable object versions out of total %lu - is it still open?\n",
cur_list->pg->lst->pool_id, cur_list->pg->pg_num, op->reply.hdr.retval - op->reply.sec_list.stable_count,
op->reply.hdr.retval
);
}
if (log_level > 0)
{
fprintf(
stderr, "[PG %u/%u] Got inode object list from OSD %lu: %ld object versions\n",
cur_list->pg->lst->pool_id, cur_list->pg->pg_num, cur_list->osd_num, op->reply.hdr.retval
);
}
for (uint64_t i = 0; i < op->reply.hdr.retval; i++)
{
object_id oid = ((obj_ver_id*)op->buf)[i].oid;
oid.stripe = oid.stripe & ~STRIPE_MASK;
cur_list->pg->objects.insert(oid);
}
}
delete op;
auto lst = cur_list->pg->lst;
auto pg = cur_list->pg;
pg->done++;
if (pg->done >= pg->list_osds.size())
{
int status = 0;
lst->done_pgs++;
if (lst->done_pgs >= lst->pgs.size())
{
status |= INODE_LIST_DONE;
}
if (pg->has_unstable)
{
status |= INODE_LIST_HAS_UNSTABLE;
}
lst->callback(std::move(pg->objects), pg->pg_num, pg->cur_primary, status);
lst->pgs[pg->pos] = NULL;
delete pg;
}
continue_listing(lst);
};
msgr.outbox_push(op);
cur_list->sent = true;
cur_list->pg->sent++;
cur_list->pg->lst->want--;
}
void cluster_client_t::continue_lists()
{
for (auto lst: lists)
{
continue_listing(lst);
}
}