From 38a3df4a0e540fca30cd5b09b69116fc18029f6a Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Tue, 23 Mar 2021 00:26:22 +0300 Subject: [PATCH] Implement chained (optimized) read in the primary OSD code --- src/CMakeLists.txt | 2 +- src/cluster_client.cpp | 1 + src/osd.cpp | 5 + src/osd.h | 37 ++- src/osd_ops.h | 4 + src/osd_primary.cpp | 88 +++++- src/osd_primary.h | 34 ++- src/osd_primary_chain.cpp | 549 +++++++++++++++++++++++++++++++++++++ src/osd_primary_subops.cpp | 99 ++++--- src/osd_primary_write.cpp | 4 +- src/osd_rmw.cpp | 23 +- 11 files changed, 768 insertions(+), 78 deletions(-) create mode 100644 src/osd_primary_chain.cpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b2b902de..b980f245 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -66,7 +66,7 @@ target_link_libraries(fio_vitastor_blk # vitastor-osd add_executable(vitastor-osd osd_main.cpp osd.cpp osd_secondary.cpp msgr_receive.cpp msgr_send.cpp osd_peering.cpp osd_flush.cpp osd_peering_pg.cpp - osd_primary.cpp osd_primary_sync.cpp osd_primary_write.cpp osd_primary_subops.cpp + osd_primary.cpp osd_primary_chain.cpp osd_primary_sync.cpp osd_primary_write.cpp osd_primary_subops.cpp etcd_state_client.cpp messenger.cpp msgr_stop.cpp msgr_op.cpp osd_cluster.cpp http_client.cpp osd_ops.cpp pg_states.cpp osd_rmw.cpp base64.cpp timerfd_manager.cpp epoll_manager.cpp ../json11/json11.cpp ) diff --git a/src/cluster_client.cpp b/src/cluster_client.cpp index 050f55be..8f91deb7 100644 --- a/src/cluster_client.cpp +++ b/src/cluster_client.cpp @@ -811,6 +811,7 @@ bool cluster_client_t::try_send(cluster_op_t *op, int i) .inode = op->cur_inode, .offset = part->offset, .len = part->len, + .meta_revision = 0, } }, .bitmap = op->opcode == OSD_OP_WRITE ? NULL : op->part_bitmaps + pg_bitmap_size*i, .bitmap_len = (unsigned)(op->opcode == OSD_OP_WRITE ? 0 : pg_bitmap_size), diff --git a/src/osd.cpp b/src/osd.cpp index 523f9c11..db7b05a0 100644 --- a/src/osd.cpp +++ b/src/osd.cpp @@ -20,6 +20,10 @@ osd_t::osd_t(blockstore_config_t & config, ring_loop_t *ringloop) bs_bitmap_granularity = DEFAULT_BITMAP_GRANULARITY; clean_entry_bitmap_size = bs_block_size / bs_bitmap_granularity / 8; + zero_buffer_size = 1<<20; + zero_buffer = malloc_or_die(zero_buffer_size); + memset(zero_buffer, 0, zero_buffer_size); + this->config = config; this->ringloop = ringloop; @@ -58,6 +62,7 @@ osd_t::~osd_t() delete epmgr; delete bs; close(listen_fd); + free(zero_buffer); } void osd_t::parse_config(blockstore_config_t & config) diff --git a/src/osd.h b/src/osd.h index 749ab0cd..7a67c6e9 100644 --- a/src/osd.h +++ b/src/osd.h @@ -66,6 +66,28 @@ struct inode_stats_t uint64_t op_bytes[3] = { 0 }; }; +struct bitmap_request_t +{ + osd_num_t osd_num; + object_id oid; + uint64_t version; + void *bmp_buf; +}; + +inline bool operator < (const bitmap_request_t & a, const bitmap_request_t & b) +{ + return a.osd_num < b.osd_num || a.osd_num == b.osd_num && a.oid < b.oid; +} + +struct osd_chain_read_t +{ + int chain_pos; + inode_t inode; + uint32_t offset, len; +}; + +struct osd_rmw_stripe_t; + class osd_t { // config @@ -126,6 +148,8 @@ class osd_t bool stopping = false; int inflight_ops = 0; blockstore_t *bs; + void *zero_buffer = NULL; + uint64_t zero_buffer_size = 0; uint32_t bs_block_size, bs_bitmap_granularity, clean_entry_bitmap_size; ring_loop_t *ringloop; timerfd_manager_t *tfd = NULL; @@ -216,7 +240,10 @@ class osd_t void handle_primary_bs_subop(osd_op_t *subop); void add_bs_subop_stats(osd_op_t *subop); void pg_cancel_write_queue(pg_t & pg, osd_op_t *first_op, object_id oid, int retval); - void submit_primary_subops(int submit_type, uint64_t op_version, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op); + + void submit_primary_subops(int submit_type, uint64_t op_version, const uint64_t* osd_set, osd_op_t *cur_op); + int submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t op_version, + osd_rmw_stripe_t *stripes, const uint64_t* osd_set, osd_op_t *cur_op, int subop_idx, int zero_read); void submit_primary_del_subops(osd_op_t *cur_op, uint64_t *cur_set, uint64_t set_size, pg_osd_set_t & loc_set); void submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_delete, int chunks_to_delete_count); int submit_primary_sync_subops(osd_op_t *cur_op); @@ -224,6 +251,14 @@ class osd_t uint64_t* get_object_osd_set(pg_t &pg, object_id &oid, uint64_t *def, pg_osd_set_state_t **object_state); + void continue_chained_read(osd_op_t *cur_op); + int submit_chained_read_requests(pg_t & pg, osd_op_t *cur_op); + void send_chained_read_results(pg_t & pg, osd_op_t *cur_op); + std::vector collect_chained_read_requests(osd_op_t *cur_op); + int collect_bitmap_requests(osd_op_t *cur_op, pg_t & pg, std::vector & bitmap_requests); + int submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg); + int read_bitmaps(osd_op_t *cur_op, pg_t & pg, int base_state); + inline pg_num_t map_to_pg(object_id oid, uint64_t pg_stripe_size) { uint64_t pg_count = pg_counts[INODE_POOL(oid.inode)]; diff --git a/src/osd_ops.h b/src/osd_ops.h index 94b185d0..ee80d742 100644 --- a/src/osd_ops.h +++ b/src/osd_ops.h @@ -184,6 +184,10 @@ struct __attribute__((__packed__)) osd_op_rw_t uint64_t offset; // length uint32_t len; + // flags (for future) + uint32_t flags; + // inode metadata revision + uint64_t meta_revision; }; struct __attribute__((__packed__)) osd_reply_rw_t diff --git a/src/osd_primary.cpp b/src/osd_primary.cpp index 52a30ee4..3a3e685e 100644 --- a/src/osd_primary.cpp +++ b/src/osd_primary.cpp @@ -28,6 +28,7 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op) return false; } auto & pool_cfg = pool_cfg_it->second; + // FIXME: op_data->pg_data_size can probably be removed (there's pg.pg_data_size) uint64_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks); uint64_t pg_block_size = bs_block_size * pg_data_size; object_id oid = { @@ -52,20 +53,81 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op) return false; } int stripe_count = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg_it->second.pg_size); + int chain_size = 0; + if (cur_op->req.hdr.opcode == OSD_OP_READ && cur_op->req.rw.meta_revision > 0) + { + // Chained read + auto inode_it = st_cli.inode_config.find(cur_op->req.rw.inode); + if (inode_it->second.mod_revision != cur_op->req.rw.meta_revision) + { + // Client view of the metadata differs from OSD's view + // Operation can't be completed correctly, client should retry later + finish_op(cur_op, -EPIPE); + return false; + } + // Find parents from the same pool. Optimized reads only work within pools + while (inode_it != st_cli.inode_config.end() && inode_it->second.parent_id && + INODE_POOL(inode_it->second.parent_id) == pg_it->second.pool_id) + { + chain_size++; + inode_it = st_cli.inode_config.find(inode_it->second.parent_id); + } + if (chain_size) + { + // Add the original inode + chain_size++; + } + } osd_primary_op_data_t *op_data = (osd_primary_op_data_t*)calloc_or_die( - 1, sizeof(osd_primary_op_data_t) + (clean_entry_bitmap_size + sizeof(osd_rmw_stripe_t)) * stripe_count + // Allocate: + // - op_data + 1, sizeof(osd_primary_op_data_t) + + // - stripes + // - resulting bitmap buffers + stripe_count * (clean_entry_bitmap_size + sizeof(osd_rmw_stripe_t)) + + chain_size * ( + // - copy of the chain + sizeof(inode_t) + + // - bitmap buffers for chained read + stripe_count * clean_entry_bitmap_size + + // - 'missing' flags for chained reads + (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 0 : pg_it->second.pg_size) + ) ); + void *data_buf = ((void*)op_data) + sizeof(osd_primary_op_data_t); op_data->pg_num = pg_num; op_data->oid = oid; - op_data->stripes = ((osd_rmw_stripe_t*)(op_data+1)); + op_data->stripes = (osd_rmw_stripe_t*)data_buf; + data_buf += sizeof(osd_rmw_stripe_t) * stripe_count; op_data->scheme = pool_cfg.scheme; op_data->pg_data_size = pg_data_size; + op_data->pg_size = pg_it->second.pg_size; cur_op->op_data = op_data; split_stripes(pg_data_size, bs_block_size, (uint32_t)(cur_op->req.rw.offset - oid.stripe), cur_op->req.rw.len, op_data->stripes); // Allocate bitmaps along with stripes to avoid extra allocations and fragmentation for (int i = 0; i < stripe_count; i++) { - op_data->stripes[i].bmp_buf = (void*)(op_data->stripes+stripe_count) + clean_entry_bitmap_size*i; + op_data->stripes[i].bmp_buf = data_buf; + data_buf += clean_entry_bitmap_size; + } + op_data->chain_size = chain_size; + if (chain_size > 0) + { + op_data->read_chain = (inode_t*)data_buf; + data_buf += sizeof(inode_t) * chain_size; + op_data->snapshot_bitmaps = data_buf; + data_buf += chain_size * stripe_count * clean_entry_bitmap_size; + op_data->missing_flags = (uint8_t*)data_buf; + data_buf += chain_size * (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 0 : pg_it->second.pg_size); + // Copy chain + int chain_num = 0; + op_data->read_chain[chain_num++] = cur_op->req.rw.inode; + auto inode_it = st_cli.inode_config.find(cur_op->req.rw.inode); + while (inode_it != st_cli.inode_config.end() && inode_it->second.parent_id) + { + op_data->read_chain[chain_num++] = inode_it->second.parent_id; + inode_it = st_cli.inode_config.find(inode_it->second.parent_id); + } } pg_it->second.inflight++; return true; @@ -106,10 +168,17 @@ void osd_t::continue_primary_read(osd_op_t *cur_op) { return; } - cur_op->reply.rw.bitmap_len = 0; osd_primary_op_data_t *op_data = cur_op->op_data; - if (op_data->st == 1) goto resume_1; - else if (op_data->st == 2) goto resume_2; + if (op_data->chain_size) + { + continue_chained_read(cur_op); + return; + } + if (op_data->st == 1) + goto resume_1; + else if (op_data->st == 2) + goto resume_2; + cur_op->reply.rw.bitmap_len = 0; { auto & pg = pgs.at({ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num }); for (int role = 0; role < op_data->pg_data_size; role++) @@ -124,8 +193,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op) { // Fast happy-path cur_op->buf = alloc_read_buffer(op_data->stripes, op_data->pg_data_size, 0); - submit_primary_subops(SUBMIT_READ, op_data->target_ver, - (op_data->scheme == POOL_SCHEME_REPLICATED ? pg.pg_size : op_data->pg_data_size), pg.cur_set.data(), cur_op); + submit_primary_subops(SUBMIT_READ, op_data->target_ver, pg.cur_set.data(), cur_op); op_data->st = 1; } else @@ -142,7 +210,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op) op_data->scheme = pg.scheme; op_data->degraded = 1; cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_size, 0); - submit_primary_subops(SUBMIT_READ, op_data->target_ver, pg.pg_size, cur_set, cur_op); + submit_primary_subops(SUBMIT_READ, op_data->target_ver, cur_set, cur_op); op_data->st = 1; } } @@ -265,7 +333,7 @@ resume_1: // Determine which OSDs contain this object and delete it op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state); // Submit 1 read to determine the actual version number - submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, pg.pg_size, op_data->prev_set, cur_op); + submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, op_data->prev_set, cur_op); resume_2: op_data->st = 2; return; diff --git a/src/osd_primary.h b/src/osd_primary.h index b53310f0..22593b1a 100644 --- a/src/osd_primary.h +++ b/src/osd_primary.h @@ -31,15 +31,31 @@ struct osd_primary_op_data_t uint64_t *prev_set = NULL; pg_osd_set_state_t *object_state = NULL; - // for sync. oops, requires freeing - std::vector *unstable_write_osds = NULL; - pool_pg_num_t *dirty_pgs = NULL; - int dirty_pg_count = 0; - osd_num_t *dirty_osds = NULL; - int dirty_osd_count = 0; - obj_ver_id *unstable_writes = NULL; - obj_ver_osd_t *copies_to_delete = NULL; - int copies_to_delete_count = 0; + union + { + struct + { + // for sync. oops, requires freeing + std::vector *unstable_write_osds; + pool_pg_num_t *dirty_pgs; + int dirty_pg_count; + osd_num_t *dirty_osds; + int dirty_osd_count; + obj_ver_id *unstable_writes; + obj_ver_osd_t *copies_to_delete; + int copies_to_delete_count; + }; + struct + { + // for read_bitmaps + void *snapshot_bitmaps; + inode_t *read_chain; + uint8_t *missing_flags; + int chain_size; + osd_chain_read_t *chain_reads; + int chain_read_count; + }; + }; }; bool contains_osd(osd_num_t *osd_set, uint64_t size, osd_num_t osd_num); diff --git a/src/osd_primary_chain.cpp b/src/osd_primary_chain.cpp new file mode 100644 index 00000000..6340fff1 --- /dev/null +++ b/src/osd_primary_chain.cpp @@ -0,0 +1,549 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 (see README.md for details) + +#include "osd_primary.h" +#include "allocator.h" + +void osd_t::continue_chained_read(osd_op_t *cur_op) +{ + osd_primary_op_data_t *op_data = cur_op->op_data; + auto & pg = pgs.at({ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num }); + if (op_data->st == 1) + goto resume_1; + else if (op_data->st == 2) + goto resume_2; + else if (op_data->st == 3) + goto resume_3; + else if (op_data->st == 4) + goto resume_4; + cur_op->reply.rw.bitmap_len = 0; + for (int role = 0; role < op_data->pg_data_size; role++) + { + op_data->stripes[role].read_start = op_data->stripes[role].req_start; + op_data->stripes[role].read_end = op_data->stripes[role].req_end; + } +resume_1: +resume_2: + // Read bitmaps + if (read_bitmaps(cur_op, pg, 1) != 0) + return; + // Prepare & submit reads + if (submit_chained_read_requests(pg, cur_op) != 0) + return; + if (op_data->n_subops > 0) + { + // Wait for reads + op_data->st = 3; +resume_3: + return; + } +resume_4: + if (op_data->errors > 0) + { + free(op_data->chain_reads); + op_data->chain_reads = NULL; + finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO); + return; + } + send_chained_read_results(pg, cur_op); + finish_op(cur_op, cur_op->req.rw.len); +} + +int osd_t::read_bitmaps(osd_op_t *cur_op, pg_t & pg, int base_state) +{ + osd_primary_op_data_t *op_data = cur_op->op_data; + if (op_data->st == base_state) + goto resume_0; + else if (op_data->st == base_state+1) + goto resume_1; + if (pg.state == PG_ACTIVE && pg.scheme == POOL_SCHEME_REPLICATED) + { + // Happy path for clean replicated PGs (all bitmaps are available locally) + for (int chain_num = 0; chain_num < op_data->chain_size; chain_num++) + { + object_id cur_oid = { .inode = op_data->read_chain[chain_num], .stripe = op_data->oid.stripe }; + auto vo_it = pg.ver_override.find(cur_oid); + auto read_version = (vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX); + // Read bitmap synchronously from the local database + bs->read_bitmap(cur_oid, read_version, op_data->snapshot_bitmaps + chain_num*clean_entry_bitmap_size, NULL); + } + } + else + { + if (submit_bitmap_subops(cur_op, pg) < 0) + { + // Failure + finish_op(cur_op, -EIO); + return -1; + } +resume_0: + if (op_data->n_subops > 0) + { + // Wait for subops + op_data->st = base_state; + return 1; + } +resume_1: + if (pg.scheme != POOL_SCHEME_REPLICATED) + { + for (int chain_num = 0; chain_num < op_data->chain_size; chain_num++) + { + // Check if we need to reconstruct any bitmaps + for (int i = 0; i < pg.pg_size; i++) + { + if (op_data->missing_flags[chain_num*pg.pg_size + i]) + { + osd_rmw_stripe_t local_stripes[pg.pg_size] = { 0 }; + for (i = 0; i < pg.pg_size; i++) + { + local_stripes[i].missing = op_data->missing_flags[chain_num*pg.pg_size + i] && true; + local_stripes[i].bmp_buf = op_data->snapshot_bitmaps + (chain_num*pg.pg_size + i)*clean_entry_bitmap_size; + local_stripes[i].read_start = local_stripes[i].read_end = 1; + } + if (pg.scheme == POOL_SCHEME_XOR) + { + reconstruct_stripes_xor(local_stripes, pg.pg_size, clean_entry_bitmap_size); + } + else if (pg.scheme == POOL_SCHEME_JERASURE) + { + reconstruct_stripes_jerasure(local_stripes, pg.pg_size, pg.pg_data_size, clean_entry_bitmap_size); + } + break; + } + } + } + } + } + return 0; +} + +int osd_t::collect_bitmap_requests(osd_op_t *cur_op, pg_t & pg, std::vector & bitmap_requests) +{ + osd_primary_op_data_t *op_data = cur_op->op_data; + for (int chain_num = 0; chain_num < op_data->chain_size; chain_num++) + { + object_id cur_oid = { .inode = op_data->read_chain[chain_num], .stripe = op_data->oid.stripe }; + auto vo_it = pg.ver_override.find(cur_oid); + uint64_t target_version = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX; + pg_osd_set_state_t *object_state; + uint64_t* cur_set = get_object_osd_set(pg, cur_oid, pg.cur_set.data(), &object_state); + if (pg.scheme == POOL_SCHEME_REPLICATED) + { + osd_num_t read_target = 0; + for (int i = 0; i < pg.pg_size; i++) + { + if (cur_set[i] == this->osd_num || cur_set[i] != 0 && read_target == 0) + { + // Select local or any other available OSD for reading + read_target = cur_set[i]; + } + } + assert(read_target != 0); + bitmap_requests.push_back((bitmap_request_t){ + .osd_num = read_target, + .oid = cur_oid, + .version = target_version, + .bmp_buf = op_data->snapshot_bitmaps + chain_num*clean_entry_bitmap_size, + }); + } + else + { + osd_rmw_stripe_t local_stripes[pg.pg_size]; + memcpy(local_stripes, op_data->stripes, sizeof(osd_rmw_stripe_t) * pg.pg_size); + if (extend_missing_stripes(local_stripes, cur_set, pg.pg_data_size, pg.pg_size) < 0) + { + free(op_data->snapshot_bitmaps); + return -1; + } + int need_at_least = 0; + for (int i = 0; i < pg.pg_size; i++) + { + if (local_stripes[i].read_end != 0 && cur_set[i] == 0) + { + // We need this part of the bitmap, but it's unavailable + need_at_least = pg.pg_data_size; + op_data->missing_flags[chain_num*pg.pg_size + i] = 1; + } + else + { + op_data->missing_flags[chain_num*pg.pg_size + i] = 0; + } + } + int found = 0; + for (int i = 0; i < pg.pg_size; i++) + { + if (cur_set[i] != 0 && (local_stripes[i].read_end != 0 || found < need_at_least)) + { + // Read part of the bitmap + bitmap_requests.push_back((bitmap_request_t){ + .osd_num = cur_set[i], + .oid = { + .inode = cur_oid.inode, + .stripe = cur_oid.stripe | i, + }, + .version = target_version, + .bmp_buf = op_data->snapshot_bitmaps + (chain_num*pg.pg_size + i)*clean_entry_bitmap_size, + }); + found++; + } + } + // Already checked by extend_missing_stripes, so it's fine to use assert + assert(found >= need_at_least); + } + } + std::sort(bitmap_requests.begin(), bitmap_requests.end()); + return 0; +} + +int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg) +{ + osd_primary_op_data_t *op_data = cur_op->op_data; + std::vector *bitmap_requests = new std::vector(); + if (collect_bitmap_requests(cur_op, pg, *bitmap_requests) < 0) + { + return -1; + } + op_data->n_subops = 0; + for (int i = 0; i < bitmap_requests->size(); i++) + { + if ((i == bitmap_requests->size()-1 || (*bitmap_requests)[i+1].osd_num != (*bitmap_requests)[i].osd_num) && + (*bitmap_requests)[i].osd_num != this->osd_num) + { + op_data->n_subops++; + } + } + if (op_data->n_subops) + { + op_data->fact_ver = 0; + op_data->done = op_data->errors = 0; + op_data->subops = new osd_op_t[op_data->n_subops]; + } + for (int i = 0, subop_idx = 0, prev = 0; i < bitmap_requests->size(); i++) + { + if (i == bitmap_requests->size()-1 || (*bitmap_requests)[i+1].osd_num != (*bitmap_requests)[i].osd_num) + { + osd_num_t subop_osd_num = (*bitmap_requests)[i].osd_num; + if (subop_osd_num == this->osd_num) + { + // Read bitmap synchronously from the local database + for (int j = prev; j <= i; j++) + { + bs->read_bitmap((*bitmap_requests)[j].oid, (*bitmap_requests)[j].version, (*bitmap_requests)[j].bmp_buf, NULL); + } + } + else + { + // Send to a remote OSD + osd_op_t *subop = op_data->subops+subop_idx; + subop->op_type = OSD_OP_OUT; + subop->peer_fd = c_cli.osd_peer_fds.at(subop_osd_num); + // FIXME: Use the pre-allocated buffer + subop->buf = malloc_or_die(sizeof(obj_ver_id)*(i+1-prev)); + subop->req = (osd_any_op_t){ + .sec_read_bmp = { + .header = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = c_cli.next_subop_id++, + .opcode = OSD_OP_SEC_READ_BMP, + }, + .len = sizeof(obj_ver_id)*(i+1-prev), + } + }; + obj_ver_id *ov = (obj_ver_id*)subop->buf; + for (int j = prev; j <= i; j++, ov++) + { + ov->oid = (*bitmap_requests)[j].oid; + ov->version = (*bitmap_requests)[j].version; + } + subop->callback = [cur_op, bitmap_requests, prev, i, this](osd_op_t *subop) + { + int requested_count = subop->req.sec_read_bmp.len / sizeof(obj_ver_id); + if (subop->reply.hdr.retval == requested_count * (8 + clean_entry_bitmap_size)) + { + void *cur_buf = subop->buf + 8; + for (int j = prev; j <= i; j++) + { + memcpy((*bitmap_requests)[j].bmp_buf, cur_buf, clean_entry_bitmap_size); + cur_buf += 8 + clean_entry_bitmap_size; + } + } + if ((cur_op->op_data->errors + cur_op->op_data->done + 1) >= cur_op->op_data->n_subops) + { + delete bitmap_requests; + } + handle_primary_subop(subop, cur_op); + }; + c_cli.outbox_push(subop); + subop_idx++; + } + prev = i+1; + } + } + if (!op_data->n_subops) + { + delete bitmap_requests; + } + return 0; +} + +std::vector osd_t::collect_chained_read_requests(osd_op_t *cur_op) +{ + osd_primary_op_data_t *op_data = cur_op->op_data; + std::vector chain_reads; + int stripe_count = (op_data->scheme == POOL_SCHEME_REPLICATED ? 1 : op_data->pg_size); + memset(op_data->stripes[0].bmp_buf, 0, stripe_count * clean_entry_bitmap_size); + uint8_t *global_bitmap = (uint8_t*)op_data->stripes[0].bmp_buf; + // We always use at most 1 read request per layer + for (int chain_pos = 0; chain_pos < op_data->chain_size; chain_pos++) + { + uint8_t *part_bitmap = ((uint8_t*)op_data->snapshot_bitmaps) + chain_pos*stripe_count*clean_entry_bitmap_size; + int start = (cur_op->req.rw.offset - op_data->oid.stripe)/bs_bitmap_granularity; + int end = start + cur_op->req.rw.len/bs_bitmap_granularity; + // Skip unneeded part in the beginning + while (start < end && ( + ((global_bitmap[start>>3] >> (start&7)) & 1) || + !((part_bitmap[start>>3] >> (start&7)) & 1))) + { + start++; + } + // Skip unneeded part in the end + while (start < end && ( + ((global_bitmap[(end-1)>>3] >> ((end-1)&7)) & 1) || + !((part_bitmap[(end-1)>>3] >> ((end-1)&7)) & 1))) + { + end--; + } + if (start < end) + { + // Copy (OR) bits in between + int cur = start; + for (; cur < end && (cur & 0x7); cur++) + { + global_bitmap[cur>>3] = global_bitmap[cur>>3] | (part_bitmap[cur>>3] & (1 << (cur&7))); + } + for (; cur <= end-8; cur += 8) + { + global_bitmap[cur>>3] = global_bitmap[cur>>3] | part_bitmap[cur>>3]; + } + for (; cur < end; cur++) + { + global_bitmap[cur>>3] = global_bitmap[cur>>3] | (part_bitmap[cur>>3] & (1 << (cur&7))); + } + // Add request + chain_reads.push_back((osd_chain_read_t){ + .chain_pos = chain_pos, + .inode = op_data->read_chain[chain_pos], + .offset = start*bs_bitmap_granularity, + .len = (end-start)*bs_bitmap_granularity, + }); + } + } + return chain_reads; +} + +int osd_t::submit_chained_read_requests(pg_t & pg, osd_op_t *cur_op) +{ + // Decide which parts of which objects we need to read based on bitmaps + osd_primary_op_data_t *op_data = cur_op->op_data; + auto chain_reads = collect_chained_read_requests(cur_op); + int stripe_count = (pg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_size); + op_data->chain_read_count = chain_reads.size(); + op_data->chain_reads = (osd_chain_read_t*)calloc_or_die( + 1, (sizeof(osd_chain_read_t) + sizeof(osd_rmw_stripe_t)*stripe_count) * chain_reads.size() + ); + osd_rmw_stripe_t *chain_stripes = (osd_rmw_stripe_t*)(((void*)op_data->chain_reads) + sizeof(osd_chain_read_t) * op_data->chain_read_count); + // Now process each subrequest as a separate read, including reconstruction if needed + // Prepare reads + int n_subops = 0; + uint64_t read_buffer_size = 0; + for (int cri = 0; cri < chain_reads.size(); cri++) + { + op_data->chain_reads[cri] = chain_reads[cri]; + object_id cur_oid = { .inode = chain_reads[cri].inode, .stripe = op_data->oid.stripe }; + // FIXME: maybe introduce split_read_stripes to shorten these lines and to remove read_start=req_start + osd_rmw_stripe_t *stripes = chain_stripes + cri*stripe_count; + split_stripes(pg.pg_data_size, bs_block_size, chain_reads[cri].offset, chain_reads[cri].len, stripes); + if (op_data->scheme == POOL_SCHEME_REPLICATED && !stripes[0].req_end) + { + continue; + } + for (int role = 0; role < op_data->pg_data_size; role++) + { + stripes[role].read_start = stripes[role].req_start; + stripes[role].read_end = stripes[role].req_end; + } + uint64_t *cur_set = pg.cur_set.data(); + if (pg.state != PG_ACTIVE && op_data->scheme != POOL_SCHEME_REPLICATED) + { + pg_osd_set_state_t *object_state; + cur_set = get_object_osd_set(pg, cur_oid, pg.cur_set.data(), &object_state); + if (extend_missing_stripes(stripes, cur_set, pg.pg_data_size, pg.pg_size) < 0) + { + free(op_data->chain_reads); + op_data->chain_reads = NULL; + finish_op(cur_op, -EIO); + return -1; + } + op_data->degraded = 1; + } + if (op_data->scheme == POOL_SCHEME_REPLICATED) + { + n_subops++; + read_buffer_size += stripes[0].read_end - stripes[0].read_start; + } + else + { + for (int role = 0; role < pg.pg_size; role++) + { + if (stripes[role].read_end > 0 && cur_set[role] != 0) + n_subops++; + if (stripes[role].read_end > 0) + read_buffer_size += stripes[role].read_end - stripes[role].read_start; + } + } + } + cur_op->buf = memalign_or_die(MEM_ALIGNMENT, read_buffer_size); + void *cur_buf = cur_op->buf; + for (int cri = 0; cri < chain_reads.size(); cri++) + { + osd_rmw_stripe_t *stripes = chain_stripes + cri*stripe_count; + for (int role = 0; role < stripe_count; role++) + { + if (stripes[role].read_end > 0) + { + stripes[role].read_buf = cur_buf; + stripes[role].bmp_buf = op_data->snapshot_bitmaps + (chain_reads[cri].chain_pos*stripe_count + role)*clean_entry_bitmap_size; + cur_buf += stripes[role].read_end - stripes[role].read_start; + } + } + } + // Submit all reads + op_data->fact_ver = UINT64_MAX; + op_data->done = op_data->errors = 0; + op_data->n_subops = n_subops; + if (!n_subops) + { + return 0; + } + op_data->subops = new osd_op_t[n_subops]; + int cur_subops = 0; + for (int cri = 0; cri < chain_reads.size(); cri++) + { + osd_rmw_stripe_t *stripes = chain_stripes + cri*stripe_count; + if (op_data->scheme == POOL_SCHEME_REPLICATED && !stripes[0].req_end) + { + continue; + } + object_id cur_oid = { .inode = chain_reads[cri].inode, .stripe = op_data->oid.stripe }; + auto vo_it = pg.ver_override.find(cur_oid); + uint64_t target_ver = vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX; + uint64_t *cur_set = pg.cur_set.data(); + if (pg.state != PG_ACTIVE && op_data->scheme != POOL_SCHEME_REPLICATED) + { + pg_osd_set_state_t *object_state; + cur_set = get_object_osd_set(pg, cur_oid, pg.cur_set.data(), &object_state); + } + int zero_read = -1; + if (op_data->scheme == POOL_SCHEME_REPLICATED) + { + for (int role = 0; role < op_data->pg_size; role++) + if (cur_set[role] == this->osd_num || zero_read == -1) + zero_read = role; + } + cur_subops += submit_primary_subop_batch(SUBMIT_READ, chain_reads[cri].inode, target_ver, stripes, cur_set, cur_op, cur_subops, zero_read); + } + assert(cur_subops == n_subops); + return 0; +} + +void osd_t::send_chained_read_results(pg_t & pg, osd_op_t *cur_op) +{ + osd_primary_op_data_t *op_data = cur_op->op_data; + int stripe_count = (pg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_size); + osd_rmw_stripe_t *chain_stripes = (osd_rmw_stripe_t*)(((void*)op_data->chain_reads) + sizeof(osd_chain_read_t) * op_data->chain_read_count); + // Reconstruct parts if needed + if (op_data->degraded) + { + int stripe_count = (pg.scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_size); + for (int cri = 0; cri < op_data->chain_read_count; cri++) + { + // Reconstruct missing stripes + osd_rmw_stripe_t *stripes = chain_stripes + cri*stripe_count; + if (op_data->scheme == POOL_SCHEME_XOR) + { + reconstruct_stripes_xor(stripes, pg.pg_size, clean_entry_bitmap_size); + } + else if (op_data->scheme == POOL_SCHEME_JERASURE) + { + reconstruct_stripes_jerasure(stripes, pg.pg_size, pg.pg_data_size, clean_entry_bitmap_size); + } + } + } + // Send bitmap + cur_op->reply.rw.bitmap_len = op_data->pg_data_size * clean_entry_bitmap_size; + cur_op->iov.push_back(op_data->stripes[0].bmp_buf, cur_op->reply.rw.bitmap_len); + // And finally compose the result + uint64_t sent = 0; + int prev_pos = 0, pos = 0; + bool prev_set = false; + int prev = (cur_op->req.rw.offset - op_data->oid.stripe) / bs_bitmap_granularity; + int end = prev + cur_op->req.rw.len/bs_bitmap_granularity; + int cur = prev; + while (cur <= end) + { + bool has_bit = false; + if (cur < end) + { + for (pos = 0; pos < op_data->chain_size; pos++) + { + has_bit = (((uint8_t*)op_data->snapshot_bitmaps)[pos*stripe_count*clean_entry_bitmap_size + cur/8] >> (cur%8)) & 1; + if (has_bit) + break; + } + } + if (has_bit != prev_set || pos != prev_pos || cur == end) + { + if (cur > prev) + { + // Send buffer in parts to avoid copying + if (!prev_set) + { + while ((cur-prev) > zero_buffer_size/bs_bitmap_granularity) + { + cur_op->iov.push_back(zero_buffer, zero_buffer_size); + sent += zero_buffer_size; + prev += zero_buffer_size/bs_bitmap_granularity; + } + cur_op->iov.push_back(zero_buffer, (cur-prev)*bs_bitmap_granularity); + sent += (cur-prev)*bs_bitmap_granularity; + } + else + { + osd_rmw_stripe_t *stripes = chain_stripes + prev_pos*stripe_count; + while (cur > prev) + { + int role = prev*bs_bitmap_granularity/bs_block_size; + int role_start = prev*bs_bitmap_granularity - role*bs_block_size; + int role_end = cur*bs_bitmap_granularity - role*bs_block_size; + if (role_end > bs_block_size) + role_end = bs_block_size; + assert(stripes[role].read_buf); + cur_op->iov.push_back( + stripes[role].read_buf + (role_start - stripes[role].read_start), + role_end - role_start + ); + sent += role_end - role_start; + prev += (role_end - role_start)/bs_bitmap_granularity; + } + } + } + prev = cur; + prev_pos = pos; + prev_set = has_bit; + } + cur++; + } + assert(sent == cur_op->req.rw.len); + free(op_data->chain_reads); + op_data->chain_reads = NULL; +} diff --git a/src/osd_primary_subops.cpp b/src/osd_primary_subops.cpp index d52a7e93..68f8ae73 100644 --- a/src/osd_primary_subops.cpp +++ b/src/osd_primary_subops.cpp @@ -76,9 +76,6 @@ void osd_t::finish_op(osd_op_t *cur_op, int retval) } } assert(!cur_op->op_data->subops); - assert(!cur_op->op_data->unstable_write_osds); - assert(!cur_op->op_data->unstable_writes); - assert(!cur_op->op_data->dirty_pgs); free(cur_op->op_data); cur_op->op_data = NULL; } @@ -106,7 +103,7 @@ void osd_t::finish_op(osd_op_t *cur_op, int retval) } } -void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_size, const uint64_t* osd_set, osd_op_t *cur_op) +void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, const uint64_t* osd_set, osd_op_t *cur_op) { bool wr = submit_type == SUBMIT_WRITE; osd_primary_op_data_t *op_data = cur_op->op_data; @@ -114,32 +111,34 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s bool rep = op_data->scheme == POOL_SCHEME_REPLICATED; // Allocate subops int n_subops = 0, zero_read = -1; - for (int role = 0; role < pg_size; role++) + for (int role = 0; role < op_data->pg_size; role++) { if (osd_set[role] == this->osd_num || osd_set[role] != 0 && zero_read == -1) - { zero_read = role; - } if (osd_set[role] != 0 && (wr || !rep && stripes[role].read_end != 0)) - { n_subops++; - } } if (!n_subops && (submit_type == SUBMIT_RMW_READ || rep)) - { n_subops = 1; - } else - { zero_read = -1; - } osd_op_t *subops = new osd_op_t[n_subops]; op_data->fact_ver = 0; op_data->done = op_data->errors = 0; op_data->n_subops = n_subops; op_data->subops = subops; - int i = 0; - for (int role = 0; role < pg_size; role++) + int sent = submit_primary_subop_batch(submit_type, op_data->oid.inode, op_version, op_data->stripes, osd_set, cur_op, 0, zero_read); + assert(sent == n_subops); +} + +int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t op_version, + osd_rmw_stripe_t *stripes, const uint64_t* osd_set, osd_op_t *cur_op, int subop_idx, int zero_read) +{ + bool wr = submit_type == SUBMIT_WRITE; + osd_primary_op_data_t *op_data = cur_op->op_data; + bool rep = op_data->scheme == POOL_SCHEME_REPLICATED; + int i = subop_idx; + for (int role = 0; role < op_data->pg_size; role++) { // We always submit zero-length writes to all replicas, even if the stripe is not modified if (!(wr || !rep && stripes[role].read_end != 0 || zero_read == role)) @@ -150,20 +149,21 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s if (role_osd_num != 0) { int stripe_num = rep ? 0 : role; + osd_op_t *subop = op_data->subops + i; if (role_osd_num == this->osd_num) { - clock_gettime(CLOCK_REALTIME, &subops[i].tv_begin); - subops[i].op_type = (uint64_t)cur_op; - subops[i].bitmap = stripes[stripe_num].bmp_buf; - subops[i].bitmap_len = clean_entry_bitmap_size; - subops[i].bs_op = new blockstore_op_t({ + clock_gettime(CLOCK_REALTIME, &subop->tv_begin); + subop->op_type = (uint64_t)cur_op; + subop->bitmap = stripes[stripe_num].bmp_buf; + subop->bitmap_len = clean_entry_bitmap_size; + subop->bs_op = new blockstore_op_t({ .opcode = (uint64_t)(wr ? (rep ? BS_OP_WRITE_STABLE : BS_OP_WRITE) : BS_OP_READ), - .callback = [subop = &subops[i], this](blockstore_op_t *bs_subop) + .callback = [subop, this](blockstore_op_t *bs_subop) { handle_primary_bs_subop(subop); }, .oid = { - .inode = op_data->oid.inode, + .inode = inode, .stripe = op_data->oid.stripe | stripe_num, }, .version = op_version, @@ -175,26 +175,26 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s #ifdef OSD_DEBUG printf( "Submit %s to local: %lx:%lx v%lu %u-%u\n", wr ? "write" : "read", - op_data->oid.inode, op_data->oid.stripe | stripe_num, op_version, - subops[i].bs_op->offset, subops[i].bs_op->len + inode, op_data->oid.stripe | stripe_num, op_version, + subop->bs_op->offset, subop->bs_op->len ); #endif - bs->enqueue_op(subops[i].bs_op); + bs->enqueue_op(subop->bs_op); } else { - subops[i].op_type = OSD_OP_OUT; - subops[i].peer_fd = c_cli.osd_peer_fds.at(role_osd_num); - subops[i].bitmap = stripes[stripe_num].bmp_buf; - subops[i].bitmap_len = clean_entry_bitmap_size; - subops[i].req.sec_rw = { + subop->op_type = OSD_OP_OUT; + subop->peer_fd = c_cli.osd_peer_fds.at(role_osd_num); + subop->bitmap = stripes[stripe_num].bmp_buf; + subop->bitmap_len = clean_entry_bitmap_size; + subop->req.sec_rw = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, .id = c_cli.next_subop_id++, .opcode = (uint64_t)(wr ? (rep ? OSD_OP_SEC_WRITE_STABLE : OSD_OP_SEC_WRITE) : OSD_OP_SEC_READ), }, .oid = { - .inode = op_data->oid.inode, + .inode = inode, .stripe = op_data->oid.stripe | stripe_num, }, .version = op_version, @@ -205,33 +205,34 @@ void osd_t::submit_primary_subops(int submit_type, uint64_t op_version, int pg_s #ifdef OSD_DEBUG printf( "Submit %s to osd %lu: %lx:%lx v%lu %u-%u\n", wr ? "write" : "read", role_osd_num, - op_data->oid.inode, op_data->oid.stripe | stripe_num, op_version, - subops[i].req.sec_rw.offset, subops[i].req.sec_rw.len + inode, op_data->oid.stripe | stripe_num, op_version, + subop->req.sec_rw.offset, subop->req.sec_rw.len ); #endif if (wr) { if (stripes[stripe_num].write_end > stripes[stripe_num].write_start) { - subops[i].iov.push_back(stripes[stripe_num].write_buf, stripes[stripe_num].write_end - stripes[stripe_num].write_start); + subop->iov.push_back(stripes[stripe_num].write_buf, stripes[stripe_num].write_end - stripes[stripe_num].write_start); } } else { if (stripes[stripe_num].read_end > stripes[stripe_num].read_start) { - subops[i].iov.push_back(stripes[stripe_num].read_buf, stripes[stripe_num].read_end - stripes[stripe_num].read_start); + subop->iov.push_back(stripes[stripe_num].read_buf, stripes[stripe_num].read_end - stripes[stripe_num].read_start); } } - subops[i].callback = [cur_op, this](osd_op_t *subop) + subop->callback = [cur_op, this](osd_op_t *subop) { handle_primary_subop(subop, cur_op); }; - c_cli.outbox_push(&subops[i]); + c_cli.outbox_push(subop); } i++; } } + return i-subop_idx; } static uint64_t bs_op_to_osd_op[] = { @@ -302,8 +303,13 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op) { uint64_t opcode = subop->req.hdr.opcode; int retval = subop->reply.hdr.retval; - int expected = opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE - || opcode == OSD_OP_SEC_WRITE_STABLE ? subop->req.sec_rw.len : 0; + int expected; + if (opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE || opcode == OSD_OP_SEC_WRITE_STABLE) + expected = subop->req.sec_rw.len; + else if (opcode == OSD_OP_SEC_READ_BMP) + expected = subop->req.sec_read_bmp.len / sizeof(obj_ver_id) * (8 + clean_entry_bitmap_size); + else + expected = 0; osd_primary_op_data_t *op_data = cur_op->op_data; if (retval != expected) { @@ -330,14 +336,17 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op) ? c_cli.clients[subop->peer_fd]->osd_num : osd_num; printf("subop %lu from osd %lu: version = %lu\n", opcode, peer_osd, version); #endif - if (op_data->fact_ver != 0 && op_data->fact_ver != version) + if (op_data->fact_ver != UINT64_MAX) { - throw std::runtime_error( - "different fact_versions returned from "+std::string(osd_op_names[opcode])+ - " subops: "+std::to_string(version)+" vs "+std::to_string(op_data->fact_ver) - ); + if (op_data->fact_ver != 0 && op_data->fact_ver != version) + { + throw std::runtime_error( + "different fact_versions returned from "+std::string(osd_op_names[opcode])+ + " subops: "+std::to_string(version)+" vs "+std::to_string(op_data->fact_ver) + ); + } + op_data->fact_ver = version; } - op_data->fact_ver = version; } } if ((op_data->errors + op_data->done) >= op_data->n_subops) diff --git a/src/osd_primary_write.cpp b/src/osd_primary_write.cpp index fa434388..28edf9d1 100644 --- a/src/osd_primary_write.cpp +++ b/src/osd_primary_write.cpp @@ -87,7 +87,7 @@ resume_1: } } // Read required blocks - submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, pg.pg_size, op_data->prev_set, cur_op); + submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, op_data->prev_set, cur_op); resume_2: op_data->st = 2; return; @@ -158,7 +158,7 @@ resume_10: return; } } - submit_primary_subops(SUBMIT_WRITE, op_data->target_ver, pg.pg_size, pg.cur_set.data(), cur_op); + submit_primary_subops(SUBMIT_WRITE, op_data->target_ver, pg.cur_set.data(), cur_op); resume_4: op_data->st = 4; return; diff --git a/src/osd_rmw.cpp b/src/osd_rmw.cpp index 27e297d7..8795e14c 100644 --- a/src/osd_rmw.cpp +++ b/src/osd_rmw.cpp @@ -246,20 +246,23 @@ void reconstruct_stripes_jerasure(osd_rmw_stripe_t *stripes, int pg_size, int pg { if (stripes[role].read_end != 0 && stripes[role].missing) { - for (int other = 0; other < pg_size; other++) + if (stripes[role].read_end > stripes[role].read_start) { - if (stripes[other].read_end != 0 && !stripes[other].missing) + for (int other = 0; other < pg_size; other++) { - assert(stripes[other].read_start <= stripes[role].read_start); - assert(stripes[other].read_end >= stripes[role].read_end); - data_ptrs[other] = (char*)(stripes[other].read_buf + (stripes[role].read_start - stripes[other].read_start)); + if (stripes[other].read_end != 0 && !stripes[other].missing) + { + assert(stripes[other].read_start <= stripes[role].read_start); + assert(stripes[other].read_end >= stripes[role].read_end); + data_ptrs[other] = (char*)(stripes[other].read_buf + (stripes[role].read_start - stripes[other].read_start)); + } } + data_ptrs[role] = (char*)stripes[role].read_buf; + jerasure_matrix_dotprod( + pg_minsize, OSD_JERASURE_W, decoding_matrix+(role*pg_minsize), dm_ids, role, + data_ptrs, data_ptrs+pg_minsize, stripes[role].read_end - stripes[role].read_start + ); } - data_ptrs[role] = (char*)stripes[role].read_buf; - jerasure_matrix_dotprod( - pg_minsize, OSD_JERASURE_W, decoding_matrix+(role*pg_minsize), dm_ids, role, - data_ptrs, data_ptrs+pg_minsize, stripes[role].read_end - stripes[role].read_start - ); for (int other = 0; other < pg_size; other++) { if (stripes[other].read_end != 0 && !stripes[other].missing)