Browse Source

Implement chained (optimized) read in the primary OSD code

tags/v0.6.0
Vitaliy Filippov 1 month ago
parent
commit
38a3df4a0e
11 changed files with 768 additions and 78 deletions
  1. +1
    -1
      src/CMakeLists.txt
  2. +1
    -0
      src/cluster_client.cpp
  3. +5
    -0
      src/osd.cpp
  4. +36
    -1
      src/osd.h
  5. +4
    -0
      src/osd_ops.h
  6. +78
    -10
      src/osd_primary.cpp
  7. +25
    -9
      src/osd_primary.h
  8. +549
    -0
      src/osd_primary_chain.cpp
  9. +54
    -45
      src/osd_primary_subops.cpp
  10. +2
    -2
      src/osd_primary_write.cpp
  11. +13
    -10
      src/osd_rmw.cpp

+ 1
- 1
src/CMakeLists.txt View File

@@ -66,7 +66,7 @@ target_link_libraries(fio_vitastor_blk
# vitastor-osd
add_executable(vitastor-osd
osd_main.cpp osd.cpp osd_secondary.cpp msgr_receive.cpp msgr_send.cpp osd_peering.cpp osd_flush.cpp osd_peering_pg.cpp
osd_primary.cpp osd_primary_sync.cpp osd_primary_write.cpp osd_primary_subops.cpp
osd_primary.cpp osd_primary_chain.cpp osd_primary_sync.cpp osd_primary_write.cpp osd_primary_subops.cpp
etcd_state_client.cpp messenger.cpp msgr_stop.cpp msgr_op.cpp osd_cluster.cpp http_client.cpp osd_ops.cpp pg_states.cpp
osd_rmw.cpp base64.cpp timerfd_manager.cpp epoll_manager.cpp ../json11/json11.cpp
)


+ 1
- 0
src/cluster_client.cpp View File

@@ -811,6 +811,7 @@ bool cluster_client_t::try_send(cluster_op_t *op, int i)
.inode = op->cur_inode,
.offset = part->offset,
.len = part->len,
.meta_revision = 0,
} },
.bitmap = op->opcode == OSD_OP_WRITE ? NULL : op->part_bitmaps + pg_bitmap_size*i,
.bitmap_len = (unsigned)(op->opcode == OSD_OP_WRITE ? 0 : pg_bitmap_size),


+ 5
- 0
src/osd.cpp View File

@@ -20,6 +20,10 @@ osd_t::osd_t(blockstore_config_t & config, ring_loop_t *ringloop)
bs_bitmap_granularity = DEFAULT_BITMAP_GRANULARITY;
clean_entry_bitmap_size = bs_block_size / bs_bitmap_granularity / 8;

zero_buffer_size = 1<<20;
zero_buffer = malloc_or_die(zero_buffer_size);
memset(zero_buffer, 0, zero_buffer_size);

this->config = config;
this->ringloop = ringloop;

@@ -58,6 +62,7 @@ osd_t::~osd_t()
delete epmgr;
delete bs;
close(listen_fd);
free(zero_buffer);
}

void osd_t::parse_config(blockstore_config_t & config)


+ 36
- 1
src/osd.h View File

@@ -66,6 +66,28 @@ struct inode_stats_t
uint64_t op_bytes[3] = { 0 };
};

struct bitmap_request_t
{
osd_num_t osd_num;
object_id oid;
uint64_t version;
void *bmp_buf;
};

inline bool operator < (const bitmap_request_t & a, const bitmap_request_t & b)
{
return a.osd_num < b.osd_num || a.osd_num == b.osd_num && a.oid < b.oid;
}

struct osd_chain_read_t
{
int chain_pos;
inode_t inode;
uint32_t offset, len;
};

struct osd_rmw_stripe_t;

class osd_t
{
// config
@@ -126,6 +148,8 @@ class osd_t
bool stopping = false;
int inflight_ops = 0;
blockstore_t *bs;
void *zero_buffer = NULL;
uint64_t zero_buffer_size = 0;
uint32_t bs_block_size, bs_bitmap_granularity, clean_entry_bitmap_size;
ring_loop_t *ringloop;
timerfd_manager_t *tfd = NULL;
@@ -216,7 +240,10 @@ class osd_t
void handle_primary_bs_subop(osd_op_t *subop);
void add_bs_subop_stats(osd_op_t *subop);
void pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval);
void submit_primary_subops(int submit_type, uint64_t op_version, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op);

void submit_primary_subops(int submit_type, uint64_t op_version, const uint64_t* osd_set, osd_op_t *cur_op);
int submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t op_version,
osd_rmw_stripe_t *stripes, const uint64_t* osd_set, osd_op_t *cur_op, int subop_idx, int zero_read);
void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, uint64_t set_size, pg_osd_set_t & loc_set);
void submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_delete, int chunks_to_delete_count);
int submit_primary_sync_subops(osd_op_t *cur_op);
@@ -224,6 +251,14 @@ class osd_t

uint64_t* get_object_osd_set(pg_t &pg, object_id &oid, uint64_t *def, pg_osd_set_state_t **object_state);

void continue_chained_read(osd_op_t *cur_op);
int submit_chained_read_requests(pg_t & pg, osd_op_t *cur_op);
void send_chained_read_results(pg_t & pg, osd_op_t *cur_op);
std::vector<osd_chain_read_t> collect_chained_read_requests(osd_op_t *cur_op);
int collect_bitmap_requests(osd_op_t *cur_op, pg_t & pg, std::vector<bitmap_request_t> & bitmap_requests);
int submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg);
int read_bitmaps(osd_op_t *cur_op, pg_t & pg, int base_state);

inline pg_num_t map_to_pg(object_id oid, uint64_t pg_stripe_size)
{
uint64_t pg_count = pg_counts[INODE_POOL(oid.inode)];


+ 4
- 0
src/osd_ops.h View File

@@ -184,6 +184,10 @@ struct __attribute__((__packed__)) osd_op_rw_t
uint64_t offset;
// length
uint32_t len;
// flags (for future)
uint32_t flags;
// inode metadata revision
uint64_t meta_revision;
};

struct __attribute__((__packed__)) osd_reply_rw_t


+ 78
- 10
src/osd_primary.cpp View File

@@ -28,6 +28,7 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
return false;
}
auto & pool_cfg = pool_cfg_it->second;
// FIXME: op_data->pg_data_size can probably be removed (there's pg.pg_data_size)
uint64_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks);
uint64_t pg_block_size = bs_block_size * pg_data_size;
object_id oid = {
@@ -52,20 +53,81 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
return false;
}
int stripe_count = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg_it->second.pg_size);
int chain_size = 0;
if (cur_op->req.hdr.opcode == OSD_OP_READ && cur_op->req.rw.meta_revision > 0)
{
// Chained read
auto inode_it = st_cli.inode_config.find(cur_op->req.rw.inode);
if (inode_it->second.mod_revision != cur_op->req.rw.meta_revision)
{
// Client view of the metadata differs from OSD's view
// Operation can't be completed correctly, client should retry later
finish_op(cur_op, -EPIPE);
return false;
}
// Find parents from the same pool. Optimized reads only work within pools
while (inode_it != st_cli.inode_config.end() && inode_it->second.parent_id &&
INODE_POOL(inode_it->second.parent_id) == pg_it->second.pool_id)
{
chain_size++;
inode_it = st_cli.inode_config.find(inode_it->second.parent_id);
}
if (chain_size)
{
// Add the original inode
chain_size++;
}
}
osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)calloc_or_die(
1, sizeof(osd_primary_op_data_t) + (clean_entry_bitmap_size + sizeof(osd_rmw_stripe_t)) * stripe_count
// Allocate:
// - op_data
1, sizeof(osd_primary_op_data_t) +
// - stripes
// - resulting bitmap buffers
stripe_count * (clean_entry_bitmap_size + sizeof(osd_rmw_stripe_t)) +
chain_size * (
// - copy of the chain
sizeof(inode_t) +
// - bitmap buffers for chained read
stripe_count * clean_entry_bitmap_size +
// - 'missing' flags for chained reads
(pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 0 : pg_it->second.pg_size)
)
);
void *data_buf = ((void*)op_data) + sizeof(osd_primary_op_data_t);
op_data->pg_num = pg_num;
op_data->oid = oid;
op_data->stripes = ((osd_rmw_stripe_t*)(op_data+1));
op_data->stripes = (osd_rmw_stripe_t*)data_buf;
data_buf += sizeof(osd_rmw_stripe_t) * stripe_count;
op_data->scheme = pool_cfg.scheme;
op_data->pg_data_size = pg_data_size;
op_data->pg_size = pg_it->second.pg_size;
cur_op->op_data = op_data;
split_stripes(pg_data_size, bs_block_size, (uint32_t)(cur_op->req.rw.offset - oid.stripe), cur_op->req.rw.len, op_data->stripes);
// Allocate bitmaps along with stripes to avoid extra allocations and fragmentation
for (int i = 0; i < stripe_count; i++)
{
op_data->stripes[i].bmp_buf = (void*)(op_data->stripes+stripe_count) + clean_entry_bitmap_size*i;
op_data->stripes[i].bmp_buf = data_buf;
data_buf += clean_entry_bitmap_size;
}
op_data->chain_size = chain_size;
if (chain_size > 0)
{
op_data->read_chain = (inode_t*)data_buf;
data_buf += sizeof(inode_t) * chain_size;
op_data->snapshot_bitmaps = data_buf;
data_buf += chain_size * stripe_count * clean_entry_bitmap_size;
op_data->missing_flags = (uint8_t*)data_buf;
data_buf += chain_size * (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 0 : pg_it->second.pg_size);
// Copy chain
int chain_num = 0;
op_data->read_chain[chain_num++] = cur_op->req.rw.inode;
auto inode_it = st_cli.inode_config.find(cur_op->req.rw.inode);
while (inode_it != st_cli.inode_config.end() && inode_it->second.parent_id)
{
op_data->read_chain[chain_num++] = inode_it->second.parent_id;
inode_it = st_cli.inode_config.find(inode_it->second.parent_id);
}
}
pg_it->second.inflight++;
return true;
@@ -106,10 +168,17 @@ void osd_t::continue_primary_read(osd_op_t *cur_op)
{
return;
}
cur_op->reply.rw.bitmap_len = 0;
osd_primary_op_data_t *op_data = cur_op->op_data;
if (op_data->st == 1) goto resume_1;
else if (op_data->st == 2) goto resume_2;
if (op_data->chain_size)
{
continue_chained_read(cur_op);
return;
}
if (op_data->st == 1)
goto resume_1;
else if (op_data->st == 2)
goto resume_2;
cur_op->reply.rw.bitmap_len = 0;
{
auto & pg = pgs.at({ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num });
for (int role = 0; role < op_data->pg_data_size; role++)
@@ -124,8 +193,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op)
{
// Fast happy-path
cur_op->buf = alloc_read_buffer(op_data->stripes, op_data->pg_data_size, 0);
submit_primary_subops(SUBMIT_READ, op_data->target_ver,
(op_data->scheme == POOL_SCHEME_REPLICATED ? pg.pg_size : op_data->pg_data_size), pg.cur_set.data(), cur_op);
submit_primary_subops(SUBMIT_READ, op_data->target_ver, pg.cur_set.data(), cur_op);
op_data->st = 1;
}
else
@@ -142,7 +210,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op)
op_data->scheme = pg.scheme;
op_data->degraded = 1;
cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_size, 0);
submit_primary_subops(SUBMIT_READ, op_data->target_ver, pg.pg_size, cur_set, cur_op);
submit_primary_subops(SUBMIT_READ, op_data->target_ver, cur_set, cur_op);
op_data->st = 1;
}
}
@@ -265,7 +333,7 @@ resume_1:
// Determine which OSDs contain this object and delete it
op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
// Submit 1 read to determine the actual version number
submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, pg.pg_size, op_data->prev_set, cur_op);
submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, op_data->prev_set, cur_op);
resume_2:
op_data->st = 2;
return;


+ 25
- 9
src/osd_primary.h View File

@@ -31,15 +31,31 @@ struct osd_primary_op_data_t
uint64_t *prev_set = NULL;
pg_osd_set_state_t *object_state = NULL;

// for sync. oops, requires freeing
std::vector<unstable_osd_num_t> *unstable_write_osds = NULL;
pool_pg_num_t *dirty_pgs = NULL;
int dirty_pg_count = 0;
osd_num_t *dirty_osds = NULL;
int dirty_osd_count = 0;
obj_ver_id *unstable_writes = NULL;
obj_ver_osd_t *copies_to_delete = NULL;
int copies_to_delete_count = 0;
union
{
struct
{
// for sync. oops, requires freeing
std::vector<unstable_osd_num_t> *unstable_write_osds;
pool_pg_num_t *dirty_pgs;
int dirty_pg_count;
osd_num_t *dirty_osds;
int dirty_osd_count;
obj_ver_id *unstable_writes;
obj_ver_osd_t *copies_to_delete;
int copies_to_delete_count;
};
struct
{
// for read_bitmaps
void *snapshot_bitmaps;
inode_t *read_chain;
uint8_t *missing_flags;
int chain_size;
osd_chain_read_t *chain_reads;
int chain_read_count;
};
};
};

bool contains_osd(osd_num_t *osd_set, uint64_t size, osd_num_t osd_num);

+ 549
- 0
src/osd_primary_chain.cpp View File

@@ -0,0 +1,549 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)

#include "osd_primary.h"
#include "allocator.h"

void osd_t::continue_chained_read(osd_op_t *cur_op)
{
osd_primary_op_data_t *op_data = cur_op->op_data;
auto & pg = pgs.at({ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num });
if (op_data->st == 1)
goto resume_1;
else if (op_data->st == 2)
goto resume_2;
else if (op_data->st == 3)
goto resume_3;
else if (op_data->st == 4)
goto resume_4;
cur_op->reply.rw.bitmap_len = 0;
for (int role = 0; role < op_data->pg_data_size; role++)
{
op_data->stripes[role].read_start = op_data->stripes[role].req_start;
op_data->stripes[role].read_end = op_data->stripes[role].req_end;
}
resume_1:
resume_2:
// Read bitmaps
if (read_bitmaps(cur_op, pg, 1) != 0)
return;
// Prepare & submit reads
if (submit_chained_read_requests(pg, cur_op) != 0)
return;
if (op_data->n_subops > 0)
{
// Wait for reads
op_data->st = 3;
resume_3:
return;
}
resume_4:
if (op_data->errors > 0)
{
free(op_data->chain_reads);
op_data->chain_reads = NULL;
finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
return;
}
send_chained_read_results(pg, cur_op);
finish_op(cur_op, cur_op->req.rw.len);
}

int osd_t::read_bitmaps(osd_op_t *cur_op, pg_t & pg, int base_state)
{
osd_primary_op_data_t *op_data = cur_op->op_data;
if (op_data->st == base_state)
goto resume_0;
else if (op_data->st == base_state+1)
goto resume_1;
if (pg.state == PG_ACTIVE && pg.scheme == POOL_SCHEME_REPLICATED)
{
// Happy path for clean replicated PGs (all bitmaps are available locally)
for (int chain_num = 0; chain_num < op_data->chain_size; chain_num++)
{
object_id cur_oid = { .inode = op_data->read_chain[chain_num], .stripe = op_data->oid.stripe };
auto vo_it = pg.ver_override.find(cur_oid);
auto read_version = (vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX);
// Read bitmap synchronously from the local database
bs->read_bitmap(cur_oid, read_version, op_data->snapshot_bitmaps + chain_num*clean_entry_bitmap_size, NULL);
}
}
else
{
if (submit_bitmap_subops(cur_op, pg) < 0)
{
// Failure
finish_op(cur_op, -EIO);
return -1;
}
resume_0:
if (op_data->n_subops > 0)
{
// Wait for subops
op_data->st = base_state;
return 1;
}
resume_1:
if (pg.scheme != POOL_SCHEME_REPLICATED)
{
for (int chain_num = 0; chain_num < op_data->chain_size; chain_num++)
{
// Check if we need to reconstruct any bitmaps
for (int i = 0; i < pg.pg_size; i++)
{
if (op_data->missing_flags[chain_num*pg.pg_size + i])
{
osd_rmw_stripe_t local_stripes[pg.pg_size] = { 0 };
for (i = 0; i < pg.pg_size; i++)
{
local_stripes[i].missing = op_data->missing_flags[chain_num*pg.pg_size + i] && true;
local_stripes[i].bmp_buf = op_data->snapshot_bitmaps + (chain_num*pg.pg_size + i)*clean_entry_bitmap_size;
local_stripes[i].read_start = local_stripes[i].read_end = 1;
}
if (pg.scheme == POOL_SCHEME_XOR)
{
reconstruct_stripes_xor(local_stripes, pg.pg_size, clean_entry_bitmap_size);
}
else if (pg.scheme == POOL_SCHEME_JERASURE)
{
reconstruct_stripes_jerasure(local_stripes, pg.pg_size, pg.pg_data_size, clean_entry_bitmap_size);
}
break;
}
}
}
}
}
return 0;
}

int osd_t::collect_bitmap_requests(osd_op_t *cur_op, pg_t & pg, std::vector<bitmap_request_t> & bitmap_requests)
{
osd_primary_op_data_t *op_data = cur_op->op_data;
for (int chain_num = 0; chain_num < op_data->chain_size; chain_num++)
{
object_id cur_oid = { .inode = op_data->read_chain[chain_num], .stripe = op_data->oid.stripe };
auto vo_it = pg.ver_override.find(cur_oid);
uint64_t target_version = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX;
pg_osd_set_state_t *object_state;
uint64_t* cur_set = get_object_osd_set(pg, cur_oid, pg.cur_set.data(), &object_state);
if (pg.scheme == POOL_SCHEME_REPLICATED)
{
osd_num_t read_target = 0;
for (int i = 0; i < pg.pg_size; i++)
{
if (cur_set[i] == this->osd_num || cur_set[i] != 0 && read_target == 0)
{
// Select local or any other available OSD for reading
read_target = cur_set[i];
}
}
assert(read_target != 0);
bitmap_requests.push_back((bitmap_request_t){
.osd_num = read_target,
.oid = cur_oid,
.version = target_version,
.bmp_buf = op_data->snapshot_bitmaps + chain_num*clean_entry_bitmap_size,
});
}
else
{
osd_rmw_stripe_t local_stripes[pg.pg_size];
memcpy(local_stripes, op_data->stripes, sizeof(osd_rmw_stripe_t) * pg.pg_size);
if (extend_missing_stripes(local_stripes, cur_set, pg.pg_data_size, pg.pg_size) < 0)
{
free(op_data->snapshot_bitmaps);
return -1;
}
int need_at_least = 0;
for (int i = 0; i < pg.pg_size; i++)
{
if (local_stripes[i].read_end != 0 && cur_set[i] == 0)
{
// We need this part of the bitmap, but it's unavailable
need_at_least = pg.pg_data_size;
op_data->missing_flags[chain_num*pg.pg_size + i] = 1;
}
else
{
op_data->missing_flags[chain_num*pg.pg_size + i] = 0;
}
}
int found = 0;
for (int i = 0; i < pg.pg_size; i++)
{
if (cur_set[i] != 0 && (local_stripes[i].read_end != 0 || found < need_at_least))
{
// Read part of the bitmap
bitmap_requests.push_back((bitmap_request_t){
.osd_num = cur_set[i],
.oid = {
.inode = cur_oid.inode,
.stripe = cur_oid.stripe | i,
},
.version = target_version,
.bmp_buf = op_data->snapshot_bitmaps + (chain_num*pg.pg_size + i)*clean_entry_bitmap_size,
});
found++;
}
}
// Already checked by extend_missing_stripes, so it's fine to use assert
assert(found >= need_at_least);
}
}
std::sort(bitmap_requests.begin(), bitmap_requests.end());
return 0;
}

int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg)
{
osd_primary_op_data_t *op_data = cur_op->op_data;
std::vector<bitmap_request_t> *bitmap_requests = new std::vector<bitmap_request_t>();
if (collect_bitmap_requests(cur_op, pg, *bitmap_requests) < 0)
{
return -1;
}
op_data->n_subops = 0;
for (int i = 0; i < bitmap_requests->size(); i++)
{
if ((i == bitmap_requests->size()-1 || (*bitmap_requests)[i+1].osd_num != (*bitmap_requests)[i].osd_num) &&
(*bitmap_requests)[i].osd_num != this->osd_num)
{
op_data->n_subops++;
}
}
if (op_data->n_subops)
{
op_data->fact_ver = 0;
op_data->done = op_data->errors = 0;
op_data->subops = new osd_op_t[op_data->n_subops];
}
for (int i = 0, subop_idx = 0, prev = 0; i < bitmap_requests->size(); i++)
{
if (i == bitmap_requests->size()-1 || (*bitmap_requests)[i+1].osd_num != (*bitmap_requests)[i].osd_num)
{
osd_num_t subop_osd_num = (*bitmap_requests)[i].osd_num;
if (subop_osd_num == this->osd_num)
{
// Read bitmap synchronously from the local database
for (int j = prev; j <= i; j++)
{
bs->read_bitmap((*bitmap_requests)[j].oid, (*bitmap_requests)[j].version, (*bitmap_requests)[j].bmp_buf, NULL);
}
}
else
{
// Send to a remote OSD
osd_op_t *subop = op_data->subops+subop_idx;
subop->op_type = OSD_OP_OUT;
subop->peer_fd = c_cli.osd_peer_fds.at(subop_osd_num);
// FIXME: Use the pre-allocated buffer
subop->buf = malloc_or_die(sizeof(obj_ver_id)*(i+1-prev));
subop->req = (osd_any_op_t){
.sec_read_bmp = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = c_cli.next_subop_id++,
.opcode = OSD_OP_SEC_READ_BMP,
},
.len = sizeof(obj_ver_id)*(i+1-prev),
}
};
obj_ver_id *ov = (obj_ver_id*)subop->buf;
for (int j = prev; j <= i; j++, ov++)
{
ov->oid = (*bitmap_requests)[j].oid;
ov->version = (*bitmap_requests)[j].version;
}
subop->callback = [cur_op, bitmap_requests, prev, i, this](osd_op_t *subop)
{
int requested_count = subop->req.sec_read_bmp.len / sizeof(obj_ver_id);
if (subop->reply.hdr.retval == requested_count * (8 + clean_entry_bitmap_size))
{
void *cur_buf = subop->buf + 8;
for (int j = prev; j <= i; j++)
{
memcpy((*bitmap_requests)[j].bmp_buf, cur_buf, clean_entry_bitmap_size);
cur_buf += 8 + clean_entry_bitmap_size;
}
}
if ((cur_op->op_data->errors + cur_op->op_data->done + 1) >= cur_op->op_data->n_subops)
{
delete bitmap_requests;
}
handle_primary_subop(subop, cur_op);
};
c_cli.outbox_push(subop);
subop_idx++;
}
prev = i+1;
}
}
if (!op_data->n_subops)
{
delete bitmap_requests;
}
return 0;
}

std::vector<osd_chain_read_t> osd_t::collect_chained_read_requests(osd_op_t *cur_op)
{
osd_primary_op_data_t *op_data = cur_op->op_data;
std::vector<osd_chain_read_t> chain_reads;
int stripe_count = (op_data->scheme == POOL_SCHEME_REPLICATED ? 1 : op_data->pg_size);
memset(op_data->stripes[0].bmp_buf, 0, stripe_count * clean_entry_bitmap_size);
uint8_t *global_bitmap = (uint8_t*)op_data->stripes[0].bmp_buf;
// We always use at most 1 read request per layer
for (int chain_pos = 0; chain_pos < op_data->chain_size; chain_pos++)
{
uint8_t *part_bitmap = ((uint8_t*)op_data->snapshot_bitmaps) + chain_pos*stripe_count*clean_entry_bitmap_size;
int start = (cur_op->req.rw.offset - op_data->oid.stripe)/bs_bitmap_granularity;
int end = start + cur_op->req.rw.len/bs_bitmap_granularity;
// Skip unneeded part in the beginning
while (start < end && (
((global_bitmap[start>>3] >> (start&7)) & 1) ||
!((part_bitmap[start>>3] >> (start&7)) & 1)))
{
start++;
}
// Skip unneeded part in the end
while (start < end && (
((global_bitmap[(end-1)>>3] >> ((end-1)&7)) & 1) ||
!((part_bitmap[(end-1)>>3] >> ((end-1)&7)) & 1)))
{
end--;
}
if (start < end)
{
// Copy (OR) bits in between
int cur = start;
for (; cur < end && (cur & 0x7); cur++)
{
global_bitmap[cur>>3] = global_bitmap[cur>>3] | (part_bitmap[cur>>3] & (1 << (cur&7)));
}
for (; cur <= end-8; cur += 8)
{
global_bitmap[cur>>3] = global_bitmap[cur>>3] | part_bitmap[cur>>3];
}
for (; cur < end; cur++)
{
global_bitmap[cur>>3] = global_bitmap[cur>>3] | (part_bitmap[cur>>3] & (1 << (cur&7)));
}
// Add request
chain_reads.push_back((osd_chain_read_t){
.chain_pos = chain_pos,
.inode = op_data->read_chain[chain_pos],
.offset = start*bs_bitmap_granularity,
.len = (end-start)*bs_bitmap_granularity,
});
}
}
return chain_reads;
}

int osd_t::submit_chained_read_requests(pg_t & pg, osd_op_t *cur_op)
{
// Decide which parts of which objects we need to read based on bitmaps
osd_primary_op_data_t *op_data = cur_op->op_data;
auto chain_reads = collect_chained_read_requests(cur_op);
int stripe_count = (pg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_size);
op_data->chain_read_count = chain_reads.size();
op_data->chain_reads = (osd_chain_read_t*)calloc_or_die(
1, (sizeof(osd_chain_read_t) + sizeof(osd_rmw_stripe_t)*stripe_count) * chain_reads.size()
);
osd_rmw_stripe_t *chain_stripes = (osd_rmw_stripe_t*)(((void*)op_data->chain_reads) + sizeof(osd_chain_read_t) * op_data->chain_read_count);
// Now process each subrequest as a separate read, including reconstruction if needed
// Prepare reads
int n_subops = 0;
uint64_t read_buffer_size = 0;
for (int cri = 0; cri < chain_reads.size(); cri++)
{
op_data->chain_reads[cri] = chain_reads[cri];
object_id cur_oid = { .inode = chain_reads[cri].inode, .stripe = op_data->oid.stripe };
// FIXME: maybe introduce split_read_stripes to shorten these lines and to remove read_start=req_start
osd_rmw_stripe_t *stripes = chain_stripes + cri*stripe_count;
split_stripes(pg.pg_data_size, bs_block_size, chain_reads[cri].offset, chain_reads[cri].len, stripes);
if (op_data->scheme == POOL_SCHEME_REPLICATED && !stripes[0].req_end)
{
continue;
}
for (int role = 0; role < op_data->pg_data_size; role++)
{
stripes[role].read_start = stripes[role].req_start;
stripes[role].read_end = stripes[role].req_end;
}
uint64_t *cur_set = pg.cur_set.data();
if (pg.state != PG_ACTIVE && op_data->scheme != POOL_SCHEME_REPLICATED)
{
pg_osd_set_state_t *object_state;
cur_set = get_object_osd_set(pg, cur_oid, pg.cur_set.data(), &object_state);
if (extend_missing_stripes(stripes, cur_set, pg.pg_data_size, pg.pg_size) < 0)
{
free(op_data->chain_reads);
op_data->chain_reads = NULL;
finish_op(cur_op, -EIO);
return -1;
}
op_data->degraded = 1;
}
if (op_data->scheme == POOL_SCHEME_REPLICATED)
{
n_subops++;
read_buffer_size += stripes[0].read_end - stripes[0].read_start;
}
else
{
for (int role = 0; role < pg.pg_size; role++)
{
if (stripes[role].read_end > 0 && cur_set[role] != 0)
n_subops++;
if (stripes[role].read_end > 0)
read_buffer_size += stripes[role].read_end - stripes[role].read_start;
}
}
}
cur_op->buf = memalign_or_die(MEM_ALIGNMENT, read_buffer_size);
void *cur_buf = cur_op->buf;
for (int cri = 0; cri < chain_reads.size(); cri++)
{
osd_rmw_stripe_t *stripes = chain_stripes + cri*stripe_count;
for (int role = 0; role < stripe_count; role++)
{
if (stripes[role].read_end > 0)
{
stripes[role].read_buf = cur_buf;
stripes[role].bmp_buf = op_data->snapshot_bitmaps + (chain_reads[cri].chain_pos*stripe_count + role)*clean_entry_bitmap_size;
cur_buf += stripes[role].read_end - stripes[role].read_start;
}
}
}
// Submit all reads
op_data->fact_ver = UINT64_MAX;
op_data->done = op_data->errors = 0;
op_data->n_subops = n_subops;
if (!n_subops)
{
return 0;
}
op_data->subops = new osd_op_t[n_subops];
int cur_subops = 0;
for (int cri = 0; cri < chain_reads.size(); cri++)
{
osd_rmw_stripe_t *stripes = chain_stripes + cri*stripe_count;
if (op_data->scheme == POOL_SCHEME_REPLICATED && !stripes[0].req_end)
{
continue;
}
object_id cur_oid = { .inode = chain_reads[cri].inode, .stripe = op_data->oid.stripe };
auto vo_it = pg.ver_override.find(cur_oid);
uint64_t target_ver = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX;
uint64_t *cur_set = pg.cur_set.data();
if (pg.state != PG_ACTIVE && op_data->scheme != POOL_SCHEME_REPLICATED)
{
pg_osd_set_state_t *object_state;
cur_set = get_object_osd_set(pg, cur_oid, pg.cur_set.data(), &object_state);
}
int zero_read = -1;
if (op_data->scheme == POOL_SCHEME_REPLICATED)
{
for (int role = 0; role < op_data->pg_size; role++)
if (cur_set[role] == this->osd_num || zero_read == -1)
zero_read = role;
}
cur_subops += submit_primary_subop_batch(SUBMIT_READ, chain_reads[cri].inode, target_ver, stripes, cur_set, cur_op, cur_subops, zero_read);
}
assert(cur_subops == n_subops);
return 0;
}

void osd_t::send_chained_read_results(pg_t & pg, osd_op_t *cur_op)
{
osd_primary_op_data_t *op_data = cur_op->op_data;
int stripe_count = (pg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_size);
osd_rmw_stripe_t *chain_stripes = (osd_rmw_stripe_t*)(((void*)op_data->chain_reads) + sizeof(osd_chain_read_t) * op_data->chain_read_count);
// Reconstruct parts if needed
if (op_data->degraded)
{
int stripe_count = (pg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_size);
for (int cri = 0; cri < op_data->chain_read_count; cri++)
{
// Reconstruct missing stripes
osd_rmw_stripe_t *stripes = chain_stripes + cri*stripe_count;
if (op_data->scheme == POOL_SCHEME_XOR)
{
reconstruct_stripes_xor(stripes, pg.pg_size, clean_entry_bitmap_size);
}
else if (op_data->scheme == POOL_SCHEME_JERASURE)
{
reconstruct_stripes_jerasure(stripes, pg.pg_size, pg.pg_data_size, clean_entry_bitmap_size);
}
}
}
// Send bitmap
cur_op->reply.rw.bitmap_len = op_data->pg_data_size * clean_entry_bitmap_size;
cur_op->iov.push_back(op_data->stripes[0].bmp_buf, cur_op->reply.rw.bitmap_len);
// And finally compose the result
uint64_t sent = 0;
int prev_pos = 0, pos = 0;
bool prev_set = false;
int prev = (cur_op->req.rw.offset - op_data->oid.stripe) / bs_bitmap_granularity;
int end = prev + cur_op->req.rw.len/bs_bitmap_granularity;
int cur = prev;
while (cur <= end)
{
bool has_bit = false;
if (cur < end)
{
for (pos = 0; pos < op_data->chain_size; pos++)
{
has_bit = (((uint8_t*)op_data->snapshot_bitmaps)[pos*stripe_count*clean_entry_bitmap_size + cur/8] >> (cur%8)) & 1;
if (has_bit)
break;
}
}
if (has_bit != prev_set || pos != prev_pos || cur == end)
{
if (cur > prev)
{
// Send buffer in parts to avoid copying
if (!prev_set)
{
while ((cur-prev) > zero_buffer_size/bs_bitmap_granularity)
{
cur_op->iov.push_back(zero_buffer, zero_buffer_size);
sent += zero_buffer_size;
prev += zero_buffer_size/bs_bitmap_granularity;
}
cur_op->iov.push_back(zero_buffer, (cur-prev)*bs_bitmap_granularity);
sent += (cur-prev)*bs_bitmap_granularity;
}
else
{
osd_rmw_stripe_t *stripes = chain_stripes + prev_pos*stripe_count;
while (cur > prev)
{
int role = prev*bs_bitmap_granularity/bs_block_size;
int role_start = prev*bs_bitmap_granularity - role*bs_block_size;
int role_end = cur*bs_bitmap_granularity - role*bs_block_size;
if (role_end > bs_block_size)
role_end = bs_block_size;
assert(stripes[role].read_buf);
cur_op->iov.push_back(
stripes[role].read_buf + (role_start - stripes[role].read_start),
role_end - role_start
);
sent += role_end - role_start;
prev += (role_end - role_start)/bs_bitmap_granularity;
}
}
}
prev = cur;
prev_pos = pos;
prev_set = has_bit;
}
cur++;
}
assert(sent == cur_op->req.rw.len);
free(op_data->chain_reads);
op_data->chain_reads = NULL;
}

+ 54
- 45
src/osd_primary_subops.cpp View File

@@ -76,9 +76,6 @@ void osd_t::finish_op(osd_op_t *cur_op, int retval)
}
}
assert(!cur_op->op_data->subops);
assert(!cur_op->op_data->unstable_write_osds);
assert(!cur_op->op_data->unstable_writes);
assert(!cur_op->op_data->dirty_pgs);
free(cur_op->op_data);
cur_op->op_data = NULL;
}
@@ -106,7 +103,7 @@ void osd_t::finish_op(osd_op_t *cur_op, int retval)
}
}

void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op)
void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, const uint64_t* osd_set, osd_op_t *cur_op)
{
bool wr = submit_type == SUBMIT_WRITE;
osd_primary_op_data_t *op_data = cur_op->op_data;
@@ -114,32 +111,34 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s
bool rep = op_data->scheme == POOL_SCHEME_REPLICATED;
// Allocate subops
int n_subops = 0, zero_read = -1;
for (int role = 0; role < pg_size; role++)
for (int role = 0; role < op_data->pg_size; role++)
{
if (osd_set[role] == this->osd_num || osd_set[role] != 0 && zero_read == -1)
{
zero_read = role;
}
if (osd_set[role] != 0 && (wr || !rep && stripes[role].read_end != 0))
{
n_subops++;
}
}
if (!n_subops && (submit_type == SUBMIT_RMW_READ || rep))
{
n_subops = 1;
}
else
{
zero_read = -1;
}
osd_op_t *subops = new osd_op_t[n_subops];
op_data->fact_ver = 0;
op_data->done = op_data->errors = 0;
op_data->n_subops = n_subops;
op_data->subops = subops;
int i = 0;
for (int role = 0; role < pg_size; role++)
int sent = submit_primary_subop_batch(submit_type, op_data->oid.inode, op_version, op_data->stripes, osd_set, cur_op, 0, zero_read);
assert(sent == n_subops);
}

int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t op_version,
osd_rmw_stripe_t *stripes, const uint64_t* osd_set, osd_op_t *cur_op, int subop_idx, int zero_read)
{
bool wr = submit_type == SUBMIT_WRITE;
osd_primary_op_data_t *op_data = cur_op->op_data;
bool rep = op_data->scheme == POOL_SCHEME_REPLICATED;
int i = subop_idx;
for (int role = 0; role < op_data->pg_size; role++)
{
// We always submit zero-length writes to all replicas, even if the stripe is not modified
if (!(wr || !rep && stripes[role].read_end != 0 || zero_read == role))
@@ -150,20 +149,21 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s
if (role_osd_num != 0)
{
int stripe_num = rep ? 0 : role;
osd_op_t *subop = op_data->subops + i;
if (role_osd_num == this->osd_num)
{
clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin);
subops[i].op_type = (uint64_t)cur_op;
subops[i].bitmap = stripes[stripe_num].bmp_buf;
subops[i].bitmap_len = clean_entry_bitmap_size;
subops[i].bs_op = new blockstore_op_t({
clock_gettime(CLOCK_REALTIME, &subop->tv_begin);
subop->op_type = (uint64_t)cur_op;
subop->bitmap = stripes[stripe_num].bmp_buf;
subop->bitmap_len = clean_entry_bitmap_size;
subop->bs_op = new blockstore_op_t({
.opcode = (uint64_t)(wr ? (rep ? BS_OP_WRITE_STABLE : BS_OP_WRITE) : BS_OP_READ),
.callback = [subop = &subops[i], this](blockstore_op_t *bs_subop)
.callback = [subop, this](blockstore_op_t *bs_subop)
{
handle_primary_bs_subop(subop);
},
.oid = {
.inode = op_data->oid.inode,
.inode = inode,
.stripe = op_data->oid.stripe | stripe_num,
},
.version = op_version,
@@ -175,26 +175,26 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s
#ifdef OSD_DEBUG
printf(
"Submit %s to local: %lx:%lx v%lu %u-%u\n", wr ? "write" : "read",
op_data->oid.inode, op_data->oid.stripe | stripe_num, op_version,
subops[i].bs_op->offset, subops[i].bs_op->len
inode, op_data->oid.stripe | stripe_num, op_version,
subop->bs_op->offset, subop->bs_op->len
);
#endif
bs->enqueue_op(subops[i].bs_op);
bs->enqueue_op(subop->bs_op);
}
else
{
subops[i].op_type = OSD_OP_OUT;
subops[i].peer_fd = c_cli.osd_peer_fds.at(role_osd_num);
subops[i].bitmap = stripes[stripe_num].bmp_buf;
subops[i].bitmap_len = clean_entry_bitmap_size;
subops[i].req.sec_rw = {
subop->op_type = OSD_OP_OUT;
subop->peer_fd = c_cli.osd_peer_fds.at(role_osd_num);
subop->bitmap = stripes[stripe_num].bmp_buf;
subop->bitmap_len = clean_entry_bitmap_size;
subop->req.sec_rw = {
.header = {
.magic = SECONDARY_OSD_OP_MAGIC,
.id = c_cli.next_subop_id++,
.opcode = (uint64_t)(wr ? (rep ? OSD_OP_SEC_WRITE_STABLE : OSD_OP_SEC_WRITE) : OSD_OP_SEC_READ),
},
.oid = {
.inode = op_data->oid.inode,
.inode = inode,
.stripe = op_data->oid.stripe | stripe_num,
},
.version = op_version,
@@ -205,33 +205,34 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s
#ifdef OSD_DEBUG
printf(
"Submit %s to osd %lu: %lx:%lx v%lu %u-%u\n", wr ? "write" : "read", role_osd_num,
op_data->oid.inode, op_data->oid.stripe | stripe_num, op_version,
subops[i].req.sec_rw.offset, subops[i].req.sec_rw.len
inode, op_data->oid.stripe | stripe_num, op_version,
subop->req.sec_rw.offset, subop->req.sec_rw.len
);
#endif
if (wr)
{
if (stripes[stripe_num].write_end > stripes[stripe_num].write_start)
{
subops[i].iov.push_back(stripes[stripe_num].write_buf, stripes[stripe_num].write_end - stripes[stripe_num].write_start);
subop->iov.push_back(stripes[stripe_num].write_buf, stripes[stripe_num].write_end - stripes[stripe_num].write_start);
}
}
else
{
if (stripes[stripe_num].read_end > stripes[stripe_num].read_start)
{
subops[i].iov.push_back(stripes[stripe_num].read_buf, stripes[stripe_num].read_end - stripes[stripe_num].read_start);
subop->iov.push_back(stripes[stripe_num].read_buf, stripes[stripe_num].read_end - stripes[stripe_num].read_start);
}
}
subops[i].callback = [cur_op, this](osd_op_t *subop)
subop->callback = [cur_op, this](osd_op_t *subop)
{
handle_primary_subop(subop, cur_op);
};
c_cli.outbox_push(&subops[i]);
c_cli.outbox_push(subop);
}
i++;
}
}
return i-subop_idx;
}

static uint64_t bs_op_to_osd_op[] = {
@@ -302,8 +303,13 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op)
{
uint64_t opcode = subop->req.hdr.opcode;
int retval = subop->reply.hdr.retval;
int expected = opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE
|| opcode == OSD_OP_SEC_WRITE_STABLE ? subop->req.sec_rw.len : 0;
int expected;
if (opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE || opcode == OSD_OP_SEC_WRITE_STABLE)
expected = subop->req.sec_rw.len;
else if (opcode == OSD_OP_SEC_READ_BMP)
expected = subop->req.sec_read_bmp.len / sizeof(obj_ver_id) * (8 + clean_entry_bitmap_size);
else
expected = 0;
osd_primary_op_data_t *op_data = cur_op->op_data;
if (retval != expected)
{
@@ -330,14 +336,17 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op)
? c_cli.clients[subop->peer_fd]->osd_num : osd_num;
printf("subop %lu from osd %lu: version = %lu\n", opcode, peer_osd, version);
#endif
if (op_data->fact_ver != 0 && op_data->fact_ver != version)
if (op_data->fact_ver != UINT64_MAX)
{
throw std::runtime_error(
"different fact_versions returned from "+std::string(osd_op_names[opcode])+
" subops: "+std::to_string(version)+" vs "+std::to_string(op_data->fact_ver)
);
if (op_data->fact_ver != 0 && op_data->fact_ver != version)
{
throw std::runtime_error(
"different fact_versions returned from "+std::string(osd_op_names[opcode])+
" subops: "+std::to_string(version)+" vs "+std::to_string(op_data->fact_ver)
);
}
op_data->fact_ver = version;
}
op_data->fact_ver = version;
}
}
if ((op_data->errors + op_data->done) >= op_data->n_subops)


+ 2
- 2
src/osd_primary_write.cpp View File

@@ -87,7 +87,7 @@ resume_1:
}
}
// Read required blocks
submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, pg.pg_size, op_data->prev_set, cur_op);
submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, op_data->prev_set, cur_op);
resume_2:
op_data->st = 2;
return;
@@ -158,7 +158,7 @@ resume_10:
return;
}
}
submit_primary_subops(SUBMIT_WRITE, op_data->target_ver, pg.pg_size, pg.cur_set.data(), cur_op);
submit_primary_subops(SUBMIT_WRITE, op_data->target_ver, pg.cur_set.data(), cur_op);
resume_4:
op_data->st = 4;
return;


+ 13
- 10
src/osd_rmw.cpp View File

@@ -246,20 +246,23 @@ void reconstruct_stripes_jerasure(osd_rmw_stripe_t *stripes, int pg_size, int pg
{
if (stripes[role].read_end != 0 && stripes[role].missing)
{
for (int other = 0; other < pg_size; other++)
if (stripes[role].read_end > stripes[role].read_start)
{
if (stripes[other].read_end != 0 && !stripes[other].missing)
for (int other = 0; other < pg_size; other++)
{
assert(stripes[other].read_start <= stripes[role].read_start);
assert(stripes[other].read_end >= stripes[role].read_end);
data_ptrs[other] = (char*)(stripes[other].read_buf + (stripes[role].read_start - stripes[other].read_start));
if (stripes[other].read_end != 0 && !stripes[other].missing)
{
assert(stripes[other].read_start <= stripes[role].read_start);
assert(stripes[other].read_end >= stripes[role].read_end);
data_ptrs[other] = (char*)(stripes[other].read_buf + (stripes[role].read_start - stripes[other].read_start));
}
}
data_ptrs[role] = (char*)stripes[role].read_buf;
jerasure_matrix_dotprod(
pg_minsize, OSD_JERASURE_W, decoding_matrix+(role*pg_minsize), dm_ids, role,
data_ptrs, data_ptrs+pg_minsize, stripes[role].read_end - stripes[role].read_start
);
}
data_ptrs[role] = (char*)stripes[role].read_buf;
jerasure_matrix_dotprod(
pg_minsize, OSD_JERASURE_W, decoding_matrix+(role*pg_minsize), dm_ids, role,
data_ptrs, data_ptrs+pg_minsize, stripes[role].read_end - stripes[role].read_start
);
for (int other = 0; other < pg_size; other++)
{
if (stripes[other].read_end != 0 && !stripes[other].missing)


Loading…
Cancel
Save