Shard clean_db by PGs to speedup listings
parent
7cbfdff41a
commit
839ec9e6e0
|
@ -415,8 +415,11 @@ stop_flusher:
|
|||
flusher->active_flushers++;
|
||||
resume_1:
|
||||
// Find it in clean_db
|
||||
clean_it = bs->clean_db.find(cur.oid);
|
||||
old_clean_loc = (clean_it != bs->clean_db.end() ? clean_it->second.location : UINT64_MAX);
|
||||
{
|
||||
auto & clean_db = bs->clean_db_shard(cur.oid);
|
||||
auto clean_it = clean_db.find(cur.oid);
|
||||
old_clean_loc = (clean_it != clean_db.end() ? clean_it->second.location : UINT64_MAX);
|
||||
}
|
||||
// Scan dirty versions of the object
|
||||
if (!scan_dirty(1))
|
||||
{
|
||||
|
@ -870,10 +873,11 @@ void journal_flusher_co::update_clean_db()
|
|||
#endif
|
||||
bs->data_alloc->set(old_clean_loc >> bs->block_order, false);
|
||||
}
|
||||
auto & clean_db = bs->clean_db_shard(cur.oid);
|
||||
if (has_delete)
|
||||
{
|
||||
auto clean_it = bs->clean_db.find(cur.oid);
|
||||
bs->clean_db.erase(clean_it);
|
||||
auto clean_it = clean_db.find(cur.oid);
|
||||
clean_db.erase(clean_it);
|
||||
#ifdef BLOCKSTORE_DEBUG
|
||||
printf("Free block %lu from %lx:%lx v%lu (delete)\n",
|
||||
clean_loc >> bs->block_order,
|
||||
|
@ -884,7 +888,7 @@ void journal_flusher_co::update_clean_db()
|
|||
}
|
||||
else
|
||||
{
|
||||
bs->clean_db[cur.oid] = {
|
||||
clean_db[cur.oid] = {
|
||||
.version = cur.version,
|
||||
.location = clean_loc,
|
||||
};
|
||||
|
|
|
@ -49,7 +49,6 @@ class journal_flusher_co
|
|||
std::function<void(ring_data_t*)> simple_callback_r, simple_callback_w;
|
||||
|
||||
bool skip_copy, has_delete, has_writes;
|
||||
blockstore_clean_db_t::iterator clean_it;
|
||||
std::vector<copy_buffer_t> v;
|
||||
std::vector<copy_buffer_t>::iterator it;
|
||||
int copy_count;
|
||||
|
|
|
@ -428,22 +428,104 @@ static bool replace_stable(object_id oid, uint64_t version, int search_start, in
|
|||
return false;
|
||||
}
|
||||
|
||||
blockstore_clean_db_t& blockstore_impl_t::clean_db_shard(object_id oid)
|
||||
{
|
||||
uint64_t pg_num = 0;
|
||||
uint64_t pool_id = (oid.inode >> (64-POOL_ID_BITS));
|
||||
auto sh_it = clean_db_settings.find(pool_id);
|
||||
if (sh_it != clean_db_settings.end())
|
||||
{
|
||||
// like map_to_pg()
|
||||
pg_num = (oid.stripe / sh_it->second.pg_stripe_size) % sh_it->second.pg_count + 1;
|
||||
}
|
||||
return clean_db_shards[(pool_id << (64-POOL_ID_BITS)) | pg_num];
|
||||
}
|
||||
|
||||
void blockstore_impl_t::reshard_clean_db(pool_id_t pool, uint32_t pg_count, uint32_t pg_stripe_size)
|
||||
{
|
||||
uint64_t pool_id = (uint64_t)pool;
|
||||
std::map<pool_pg_id_t, blockstore_clean_db_t> new_shards;
|
||||
auto sh_it = clean_db_shards.lower_bound((pool_id << (64-POOL_ID_BITS)));
|
||||
while (sh_it != clean_db_shards.end() &&
|
||||
(sh_it->first >> (64-POOL_ID_BITS)) == pool_id)
|
||||
{
|
||||
for (auto & pair: sh_it->second)
|
||||
{
|
||||
// like map_to_pg()
|
||||
uint64_t pg_num = (pair.first.stripe / pg_stripe_size) % pg_count + 1;
|
||||
uint64_t shard_id = (pool_id << (64-POOL_ID_BITS)) | pg_num;
|
||||
new_shards[shard_id][pair.first] = pair.second;
|
||||
}
|
||||
clean_db_shards.erase(sh_it++);
|
||||
}
|
||||
for (sh_it = new_shards.begin(); sh_it != new_shards.end(); sh_it++)
|
||||
{
|
||||
auto & to = clean_db_shards[sh_it->first];
|
||||
to.swap(sh_it->second);
|
||||
}
|
||||
clean_db_settings[pool_id] = (pool_shard_settings_t){
|
||||
.pg_count = pg_count,
|
||||
.pg_stripe_size = pg_stripe_size,
|
||||
};
|
||||
}
|
||||
|
||||
void blockstore_impl_t::process_list(blockstore_op_t *op)
|
||||
{
|
||||
uint32_t list_pg = op->offset;
|
||||
uint32_t list_pg = op->offset+1;
|
||||
uint32_t pg_count = op->len;
|
||||
uint64_t pg_stripe_size = op->oid.stripe;
|
||||
uint64_t min_inode = op->oid.inode;
|
||||
uint64_t max_inode = op->version;
|
||||
// Check PG
|
||||
if (pg_count != 0 && (pg_stripe_size < MIN_BLOCK_SIZE || list_pg >= pg_count))
|
||||
if (pg_count != 0 && (pg_stripe_size < MIN_BLOCK_SIZE || list_pg > pg_count))
|
||||
{
|
||||
op->retval = -EINVAL;
|
||||
FINISH_OP(op);
|
||||
return;
|
||||
}
|
||||
// Copy clean_db entries (sorted)
|
||||
int stable_count = 0, stable_alloc = clean_db.size() / (pg_count ? pg_count : 1);
|
||||
// Check if the DB needs resharding
|
||||
// (we don't know about PGs from the beginning, we only create "shards" here)
|
||||
uint64_t first_shard = 0, last_shard = UINT64_MAX;
|
||||
if (min_inode != 0 &&
|
||||
// Check if min_inode == max_inode == pool_id<<N, i.e. this is a pool listing
|
||||
(min_inode >> (64-POOL_ID_BITS)) == (max_inode >> (64-POOL_ID_BITS)))
|
||||
{
|
||||
pool_id_t pool_id = (min_inode >> (64-POOL_ID_BITS));
|
||||
if (pg_count > 1)
|
||||
{
|
||||
// Per-pg listing
|
||||
auto sh_it = clean_db_settings.find(pool_id);
|
||||
if (sh_it == clean_db_settings.end() ||
|
||||
sh_it->second.pg_count != pg_count ||
|
||||
sh_it->second.pg_stripe_size != pg_stripe_size)
|
||||
{
|
||||
reshard_clean_db(pool_id, pg_count, pg_stripe_size);
|
||||
}
|
||||
first_shard = last_shard = ((uint64_t)pool_id << (64-POOL_ID_BITS)) | list_pg;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Per-pool listing
|
||||
first_shard = ((uint64_t)pool_id << (64-POOL_ID_BITS));
|
||||
last_shard = ((uint64_t)(pool_id+1) << (64-POOL_ID_BITS)) - 1;
|
||||
}
|
||||
}
|
||||
// Copy clean_db entries
|
||||
int stable_count = 0, stable_alloc = 0;
|
||||
if (min_inode != max_inode)
|
||||
{
|
||||
for (auto shard_it = clean_db_shards.lower_bound(first_shard);
|
||||
shard_it != clean_db_shards.end() && shard_it->first <= last_shard;
|
||||
shard_it++)
|
||||
{
|
||||
auto & clean_db = shard_it->second;
|
||||
stable_alloc += clean_db.size();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
stable_alloc = 32768;
|
||||
}
|
||||
obj_ver_id *stable = (obj_ver_id*)malloc(sizeof(obj_ver_id) * stable_alloc);
|
||||
if (!stable)
|
||||
{
|
||||
|
@ -451,7 +533,11 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
|
|||
FINISH_OP(op);
|
||||
return;
|
||||
}
|
||||
for (auto shard_it = clean_db_shards.lower_bound(first_shard);
|
||||
shard_it != clean_db_shards.end() && shard_it->first <= last_shard;
|
||||
shard_it++)
|
||||
{
|
||||
auto & clean_db = shard_it->second;
|
||||
auto clean_it = clean_db.begin(), clean_end = clean_db.end();
|
||||
if ((min_inode != 0 || max_inode != 0) && min_inode <= max_inode)
|
||||
{
|
||||
|
@ -466,26 +552,28 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
|
|||
}
|
||||
for (; clean_it != clean_end; clean_it++)
|
||||
{
|
||||
if (!pg_count || ((clean_it->first.stripe / pg_stripe_size) % pg_count) == list_pg) // like map_to_pg()
|
||||
if (stable_count >= stable_alloc)
|
||||
{
|
||||
if (stable_count >= stable_alloc)
|
||||
stable_alloc *= 2;
|
||||
stable = (obj_ver_id*)realloc(stable, sizeof(obj_ver_id) * stable_alloc);
|
||||
if (!stable)
|
||||
{
|
||||
stable_alloc += 32768;
|
||||
stable = (obj_ver_id*)realloc(stable, sizeof(obj_ver_id) * stable_alloc);
|
||||
if (!stable)
|
||||
{
|
||||
op->retval = -ENOMEM;
|
||||
FINISH_OP(op);
|
||||
return;
|
||||
}
|
||||
op->retval = -ENOMEM;
|
||||
FINISH_OP(op);
|
||||
return;
|
||||
}
|
||||
stable[stable_count++] = {
|
||||
.oid = clean_it->first,
|
||||
.version = clean_it->second.version,
|
||||
};
|
||||
}
|
||||
stable[stable_count++] = {
|
||||
.oid = clean_it->first,
|
||||
.version = clean_it->second.version,
|
||||
};
|
||||
}
|
||||
}
|
||||
if (first_shard != last_shard)
|
||||
{
|
||||
// If that's not a per-PG listing, sort clean entries
|
||||
std::sort(stable, stable+stable_count);
|
||||
}
|
||||
int clean_stable_count = stable_count;
|
||||
// Copy dirty_db entries (sorted, too)
|
||||
int unstable_count = 0, unstable_alloc = 0;
|
||||
|
@ -511,7 +599,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
|
|||
}
|
||||
for (; dirty_it != dirty_end; dirty_it++)
|
||||
{
|
||||
if (!pg_count || ((dirty_it->first.oid.stripe / pg_stripe_size) % pg_count) == list_pg) // like map_to_pg()
|
||||
if (!pg_count || ((dirty_it->first.oid.stripe / pg_stripe_size) % pg_count + 1) == list_pg) // like map_to_pg()
|
||||
{
|
||||
if (IS_DELETE(dirty_it->second.state))
|
||||
{
|
||||
|
|
|
@ -204,6 +204,17 @@ typedef std::map<obj_ver_id, dirty_entry> blockstore_dirty_db_t;
|
|||
|
||||
#include "blockstore_flush.h"
|
||||
|
||||
typedef uint32_t pool_id_t;
|
||||
typedef uint64_t pool_pg_id_t;
|
||||
|
||||
#define POOL_ID_BITS 16
|
||||
|
||||
struct pool_shard_settings_t
|
||||
{
|
||||
uint32_t pg_count;
|
||||
uint32_t pg_stripe_size;
|
||||
};
|
||||
|
||||
class blockstore_impl_t
|
||||
{
|
||||
/******* OPTIONS *******/
|
||||
|
@ -247,7 +258,8 @@ class blockstore_impl_t
|
|||
|
||||
struct ring_consumer_t ring_consumer;
|
||||
|
||||
blockstore_clean_db_t clean_db;
|
||||
std::map<pool_id_t, pool_shard_settings_t> clean_db_settings;
|
||||
std::map<pool_pg_id_t, blockstore_clean_db_t> clean_db_shards;
|
||||
uint8_t *clean_bitmap = NULL;
|
||||
blockstore_dirty_db_t dirty_db;
|
||||
std::vector<blockstore_op_t*> submit_queue;
|
||||
|
@ -296,6 +308,9 @@ class blockstore_impl_t
|
|||
void open_journal();
|
||||
uint8_t* get_clean_entry_bitmap(uint64_t block_loc, int offset);
|
||||
|
||||
blockstore_clean_db_t& clean_db_shard(object_id oid);
|
||||
void reshard_clean_db(pool_id_t pool_id, uint32_t pg_count, uint32_t pg_stripe_size);
|
||||
|
||||
// Journaling
|
||||
void prepare_journal_sector_write(int sector, blockstore_op_t *op);
|
||||
void handle_journal_write(ring_data_t *data, uint64_t flush_id);
|
||||
|
|
|
@ -222,10 +222,11 @@ void blockstore_init_meta::handle_entries(void* entries, unsigned count, int blo
|
|||
}
|
||||
if (entry->oid.inode > 0)
|
||||
{
|
||||
auto clean_it = bs->clean_db.find(entry->oid);
|
||||
if (clean_it == bs->clean_db.end() || clean_it->second.version < entry->version)
|
||||
auto & clean_db = bs->clean_db_shard(entry->oid);
|
||||
auto clean_it = clean_db.find(entry->oid);
|
||||
if (clean_it == clean_db.end() || clean_it->second.version < entry->version)
|
||||
{
|
||||
if (clean_it != bs->clean_db.end())
|
||||
if (clean_it != clean_db.end())
|
||||
{
|
||||
// free the previous block
|
||||
#ifdef BLOCKSTORE_DEBUG
|
||||
|
@ -245,7 +246,7 @@ void blockstore_init_meta::handle_entries(void* entries, unsigned count, int blo
|
|||
printf("Allocate block (clean entry) %lu: %lx:%lx v%lu\n", done_cnt+i, entry->oid.inode, entry->oid.stripe, entry->version);
|
||||
#endif
|
||||
bs->data_alloc->set(done_cnt+i, true);
|
||||
bs->clean_db[entry->oid] = (struct clean_entry){
|
||||
clean_db[entry->oid] = (struct clean_entry){
|
||||
.version = entry->version,
|
||||
.location = (done_cnt+i) << block_order,
|
||||
};
|
||||
|
@ -656,8 +657,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
|
|||
init_write_sector = proc_pos;
|
||||
return 0;
|
||||
}
|
||||
auto clean_it = bs->clean_db.find(je->small_write.oid);
|
||||
if (clean_it == bs->clean_db.end() ||
|
||||
auto & clean_db = bs->clean_db_shard(je->small_write.oid);
|
||||
auto clean_it = clean_db.find(je->small_write.oid);
|
||||
if (clean_it == clean_db.end() ||
|
||||
clean_it->second.version < je->small_write.version)
|
||||
{
|
||||
obj_ver_id ov = {
|
||||
|
@ -735,8 +737,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
|
|||
erase_dirty_object(dirty_it);
|
||||
}
|
||||
}
|
||||
auto clean_it = bs->clean_db.find(je->big_write.oid);
|
||||
if (clean_it == bs->clean_db.end() ||
|
||||
auto & clean_db = bs->clean_db_shard(je->big_write.oid);
|
||||
auto clean_it = clean_db.find(je->big_write.oid);
|
||||
if (clean_it == clean_db.end() ||
|
||||
clean_it->second.version < je->big_write.version)
|
||||
{
|
||||
// oid, version, block
|
||||
|
@ -841,8 +844,9 @@ int blockstore_init_journal::handle_journal_part(void *buf, uint64_t done_pos, u
|
|||
dirty_it--;
|
||||
dirty_exists = dirty_it->first.oid == je->del.oid;
|
||||
}
|
||||
auto clean_it = bs->clean_db.find(je->del.oid);
|
||||
bool clean_exists = (clean_it != bs->clean_db.end() &&
|
||||
auto & clean_db = bs->clean_db_shard(je->del.oid);
|
||||
auto clean_it = clean_db.find(je->del.oid);
|
||||
bool clean_exists = (clean_it != clean_db.end() &&
|
||||
clean_it->second.version < je->del.version);
|
||||
if (!clean_exists && dirty_exists)
|
||||
{
|
||||
|
@ -901,8 +905,9 @@ void blockstore_init_journal::erase_dirty_object(blockstore_dirty_db_t::iterator
|
|||
break;
|
||||
}
|
||||
}
|
||||
auto clean_it = bs->clean_db.find(oid);
|
||||
uint64_t clean_loc = clean_it != bs->clean_db.end()
|
||||
auto & clean_db = bs->clean_db_shard(oid);
|
||||
auto clean_it = clean_db.find(oid);
|
||||
uint64_t clean_loc = clean_it != clean_db.end()
|
||||
? clean_it->second.location : UINT64_MAX;
|
||||
if (exists && clean_loc == UINT64_MAX)
|
||||
{
|
||||
|
|
|
@ -111,6 +111,7 @@ uint8_t* blockstore_impl_t::get_clean_entry_bitmap(uint64_t block_loc, int offse
|
|||
|
||||
int blockstore_impl_t::dequeue_read(blockstore_op_t *read_op)
|
||||
{
|
||||
auto & clean_db = clean_db_shard(read_op->oid);
|
||||
auto clean_it = clean_db.find(read_op->oid);
|
||||
auto dirty_it = dirty_db.upper_bound((obj_ver_id){
|
||||
.oid = read_op->oid,
|
||||
|
@ -297,6 +298,7 @@ int blockstore_impl_t::read_bitmap(object_id oid, uint64_t target_version, void
|
|||
dirty_it--;
|
||||
}
|
||||
}
|
||||
auto & clean_db = clean_db_shard(oid);
|
||||
auto clean_it = clean_db.find(oid);
|
||||
if (clean_it != clean_db.end())
|
||||
{
|
||||
|
|
|
@ -54,6 +54,7 @@ int blockstore_impl_t::dequeue_stable(blockstore_op_t *op)
|
|||
auto dirty_it = dirty_db.find(*v);
|
||||
if (dirty_it == dirty_db.end())
|
||||
{
|
||||
auto & clean_db = clean_db_shard(v->oid);
|
||||
auto clean_it = clean_db.find(v->oid);
|
||||
if (clean_it == clean_db.end() || clean_it->second.version < v->version)
|
||||
{
|
||||
|
@ -188,6 +189,7 @@ void blockstore_impl_t::mark_stable(const obj_ver_id & v, bool forget_dirty)
|
|||
}
|
||||
if (exists == -1)
|
||||
{
|
||||
auto & clean_db = clean_db_shard(v.oid);
|
||||
auto clean_it = clean_db.find(v.oid);
|
||||
exists = clean_it != clean_db.end() ? 1 : 0;
|
||||
}
|
||||
|
@ -215,6 +217,7 @@ void blockstore_impl_t::mark_stable(const obj_ver_id & v, bool forget_dirty)
|
|||
break;
|
||||
}
|
||||
}
|
||||
auto & clean_db = clean_db_shard(v.oid);
|
||||
auto clean_it = clean_db.find(v.oid);
|
||||
uint64_t clean_loc = clean_it != clean_db.end()
|
||||
? clean_it->second.location : UINT64_MAX;
|
||||
|
|
|
@ -41,6 +41,7 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
|
|||
}
|
||||
if (!found)
|
||||
{
|
||||
auto & clean_db = clean_db_shard(op->oid);
|
||||
auto clean_it = clean_db.find(op->oid);
|
||||
if (clean_it != clean_db.end())
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue