From 0949f0840707f16b6ea1935bf294155a0e27a6bf Mon Sep 17 00:00:00 2001
From: Vitaliy Filippov <vitalif@yourcmc.ru>
Date: Tue, 23 Mar 2021 00:34:46 +0300
Subject: [PATCH] Extract osd_primary write and sync code into separate files

---
 src/CMakeLists.txt        |   3 +-
 src/osd_primary.cpp       | 619 --------------------------------------
 src/osd_primary_sync.cpp  | 263 ++++++++++++++++
 src/osd_primary_write.cpp | 365 ++++++++++++++++++++++
 4 files changed, 630 insertions(+), 620 deletions(-)
 create mode 100644 src/osd_primary_sync.cpp
 create mode 100644 src/osd_primary_write.cpp

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index aa33e395..a87f8161 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -66,7 +66,8 @@ target_link_libraries(fio_vitastor_blk
 # vitastor-osd
 add_executable(vitastor-osd
     osd_main.cpp osd.cpp osd_secondary.cpp msgr_receive.cpp msgr_send.cpp osd_peering.cpp osd_flush.cpp osd_peering_pg.cpp
-    osd_primary.cpp osd_primary_subops.cpp etcd_state_client.cpp messenger.cpp osd_cluster.cpp http_client.cpp osd_ops.cpp pg_states.cpp
+    osd_primary.cpp osd_primary_sync.cpp osd_primary_write.cpp osd_primary_subops.cpp
+    etcd_state_client.cpp messenger.cpp osd_cluster.cpp http_client.cpp osd_ops.cpp pg_states.cpp
     osd_rmw.cpp base64.cpp timerfd_manager.cpp epoll_manager.cpp ../json11/json11.cpp
 )
 target_link_libraries(vitastor-osd
diff --git a/src/osd_primary.cpp b/src/osd_primary.cpp
index 3c6cbf0b..fed7de8a 100644
--- a/src/osd_primary.cpp
+++ b/src/osd_primary.cpp
@@ -177,625 +177,6 @@ resume_2:
     finish_op(cur_op, cur_op->req.rw.len);
 }
 
-bool osd_t::check_write_queue(osd_op_t *cur_op, pg_t & pg)
-{
-    osd_primary_op_data_t *op_data = cur_op->op_data;
-    // Check if actions are pending for this object
-    auto act_it = pg.flush_actions.lower_bound((obj_piece_id_t){
-        .oid = op_data->oid,
-        .osd_num = 0,
-    });
-    if (act_it != pg.flush_actions.end() &&
-        act_it->first.oid.inode == op_data->oid.inode &&
-        (act_it->first.oid.stripe & ~STRIPE_MASK) == op_data->oid.stripe)
-    {
-        pg.write_queue.emplace(op_data->oid, cur_op);
-        return false;
-    }
-    // Check if there are other write requests to the same object
-    auto vo_it = pg.write_queue.find(op_data->oid);
-    if (vo_it != pg.write_queue.end())
-    {
-        op_data->st = 1;
-        pg.write_queue.emplace(op_data->oid, cur_op);
-        return false;
-    }
-    pg.write_queue.emplace(op_data->oid, cur_op);
-    return true;
-}
-
-void osd_t::continue_primary_write(osd_op_t *cur_op)
-{
-    if (!cur_op->op_data && !prepare_primary_rw(cur_op))
-    {
-        return;
-    }
-    osd_primary_op_data_t *op_data = cur_op->op_data;
-    auto & pg = pgs.at({ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num });
-    if (op_data->st == 1) goto resume_1;
-    else if (op_data->st == 2) goto resume_2;
-    else if (op_data->st == 3) goto resume_3;
-    else if (op_data->st == 4) goto resume_4;
-    else if (op_data->st == 5) goto resume_5;
-    else if (op_data->st == 6) goto resume_6;
-    else if (op_data->st == 7) goto resume_7;
-    else if (op_data->st == 8) goto resume_8;
-    else if (op_data->st == 9) goto resume_9;
-    else if (op_data->st == 10) goto resume_10;
-    assert(op_data->st == 0);
-    if (!check_write_queue(cur_op, pg))
-    {
-        return;
-    }
-resume_1:
-    // Determine blocks to read and write
-    // Missing chunks are allowed to be overwritten even in incomplete objects
-    // FIXME: Allow to do small writes to the old (degraded/misplaced) OSD set for lower performance impact
-    op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
-    if (op_data->scheme == POOL_SCHEME_REPLICATED)
-    {
-        // Simplified algorithm
-        op_data->stripes[0].write_start = op_data->stripes[0].req_start;
-        op_data->stripes[0].write_end = op_data->stripes[0].req_end;
-        op_data->stripes[0].write_buf = cur_op->buf;
-        if (pg.cur_set.data() != op_data->prev_set && (op_data->stripes[0].write_start != 0 ||
-            op_data->stripes[0].write_end != bs_block_size))
-        {
-            // Object is degraded/misplaced and will be moved to <write_osd_set>
-            op_data->stripes[0].read_start = 0;
-            op_data->stripes[0].read_end = bs_block_size;
-            cur_op->rmw_buf = op_data->stripes[0].read_buf = memalign_or_die(MEM_ALIGNMENT, bs_block_size);
-        }
-    }
-    else
-    {
-        cur_op->rmw_buf = calc_rmw(cur_op->buf, op_data->stripes, op_data->prev_set,
-            pg.pg_size, op_data->pg_data_size, pg.pg_cursize, pg.cur_set.data(), bs_block_size);
-        if (!cur_op->rmw_buf)
-        {
-            // Refuse partial overwrite of an incomplete object
-            cur_op->reply.hdr.retval = -EINVAL;
-            goto continue_others;
-        }
-    }
-    // Read required blocks
-    submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, pg.pg_size, op_data->prev_set, cur_op);
-resume_2:
-    op_data->st = 2;
-    return;
-resume_3:
-    if (op_data->errors > 0)
-    {
-        pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
-        return;
-    }
-    if (op_data->scheme == POOL_SCHEME_REPLICATED)
-    {
-        // Only (possibly) copy new data from the request into the recovery buffer
-        if (pg.cur_set.data() != op_data->prev_set && (op_data->stripes[0].write_start != 0 ||
-            op_data->stripes[0].write_end != bs_block_size))
-        {
-            memcpy(
-                op_data->stripes[0].read_buf + op_data->stripes[0].req_start,
-                op_data->stripes[0].write_buf,
-                op_data->stripes[0].req_end - op_data->stripes[0].req_start
-            );
-            op_data->stripes[0].write_buf = op_data->stripes[0].read_buf;
-            op_data->stripes[0].write_start = 0;
-            op_data->stripes[0].write_end = bs_block_size;
-        }
-    }
-    else
-    {
-        // For EC/XOR pools, save version override to make it impossible
-        // for parallel reads to read different versions of data and parity
-        pg.ver_override[op_data->oid] = op_data->fact_ver;
-        // Recover missing stripes, calculate parity
-        if (pg.scheme == POOL_SCHEME_XOR)
-        {
-            calc_rmw_parity_xor(op_data->stripes, pg.pg_size, op_data->prev_set, pg.cur_set.data(), bs_block_size);
-        }
-        else if (pg.scheme == POOL_SCHEME_JERASURE)
-        {
-            calc_rmw_parity_jerasure(op_data->stripes, pg.pg_size, op_data->pg_data_size, op_data->prev_set, pg.cur_set.data(), bs_block_size);
-        }
-    }
-    // Send writes
-    if ((op_data->fact_ver >> (64-PG_EPOCH_BITS)) < pg.epoch)
-    {
-        op_data->target_ver = ((uint64_t)pg.epoch << (64-PG_EPOCH_BITS)) | 1;
-    }
-    else
-    {
-        if ((op_data->fact_ver & (1ul<<(64-PG_EPOCH_BITS) - 1)) == (1ul<<(64-PG_EPOCH_BITS) - 1))
-        {
-            assert(pg.epoch != ((1ul << PG_EPOCH_BITS)-1));
-            pg.epoch++;
-        }
-        op_data->target_ver = op_data->fact_ver + 1;
-    }
-    if (pg.epoch > pg.reported_epoch)
-    {
-        // Report newer epoch before writing
-        // FIXME: We may report only one PG state here...
-        this->pg_state_dirty.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
-        pg.history_changed = true;
-        report_pg_states();
-resume_10:
-        if (pg.epoch > pg.reported_epoch)
-        {
-            op_data->st = 10;
-            return;
-        }
-    }
-    submit_primary_subops(SUBMIT_WRITE, op_data->target_ver, pg.pg_size, pg.cur_set.data(), cur_op);
-resume_4:
-    op_data->st = 4;
-    return;
-resume_5:
-    if (op_data->scheme != POOL_SCHEME_REPLICATED)
-    {
-        // Remove version override just after the write, but before stabilizing
-        pg.ver_override.erase(op_data->oid);
-    }
-    if (op_data->object_state)
-    {
-        // We must forget the unclean state of the object before deleting it
-        // so the next reads don't accidentally read a deleted version
-        // And it should be done at the same time as the removal of the version override
-        remove_object_from_state(op_data->oid, op_data->object_state, pg);
-        pg.clean_count++;
-    }
-    if (op_data->errors > 0)
-    {
-        free_object_state(pg, &op_data->object_state);
-        pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
-        return;
-    }
-resume_6:
-resume_7:
-    if (!remember_unstable_write(cur_op, pg, pg.cur_loc_set, 6))
-    {
-        // FIXME: Check for immediate_commit == IMMEDIATE_SMALL
-        free_object_state(pg, &op_data->object_state);
-        return;
-    }
-    if (op_data->fact_ver == 1)
-    {
-        // Object is created
-        pg.clean_count++;
-        pg.total_count++;
-    }
-    if (op_data->object_state)
-    {
-        {
-            int recovery_type = op_data->object_state->state & (OBJ_DEGRADED|OBJ_INCOMPLETE) ? 0 : 1;
-            recovery_stat_count[0][recovery_type]++;
-            if (!recovery_stat_count[0][recovery_type])
-            {
-                recovery_stat_count[0][recovery_type]++;
-                recovery_stat_bytes[0][recovery_type] = 0;
-            }
-            for (int role = 0; role < (op_data->scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_size); role++)
-            {
-                recovery_stat_bytes[0][recovery_type] += op_data->stripes[role].write_end - op_data->stripes[role].write_start;
-            }
-        }
-        // Any kind of a non-clean object can have extra chunks, because we don't record objects
-        // as degraded & misplaced or incomplete & misplaced at the same time. So try to remove extra chunks
-        if (immediate_commit != IMMEDIATE_ALL)
-        {
-            // We can't remove extra chunks yet if fsyncs are explicit, because
-            // new copies may not be committed to stable storage yet
-            // We can only remove extra chunks after a successful SYNC for this PG
-            for (auto & chunk: op_data->object_state->osd_set)
-            {
-                // Check is the same as in submit_primary_del_subops()
-                if (op_data->scheme == POOL_SCHEME_REPLICATED
-                    ? !contains_osd(pg.cur_set.data(), pg.pg_size, chunk.osd_num)
-                    : (chunk.osd_num != pg.cur_set[chunk.role]))
-                {
-                    pg.copies_to_delete_after_sync.push_back((obj_ver_osd_t){
-                        .osd_num = chunk.osd_num,
-                        .oid = {
-                            .inode = op_data->oid.inode,
-                            .stripe = op_data->oid.stripe | (op_data->scheme == POOL_SCHEME_REPLICATED ? 0 : chunk.role),
-                        },
-                        .version = op_data->fact_ver,
-                    });
-                    copies_to_delete_after_sync_count++;
-                }
-            }
-            free_object_state(pg, &op_data->object_state);
-        }
-        else
-        {
-            submit_primary_del_subops(cur_op, pg.cur_set.data(), pg.pg_size, op_data->object_state->osd_set);
-            free_object_state(pg, &op_data->object_state);
-            if (op_data->n_subops > 0)
-            {
-resume_8:
-                op_data->st = 8;
-                return;
-resume_9:
-                if (op_data->errors > 0)
-                {
-                    pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
-                    return;
-                }
-            }
-        }
-    }
-    cur_op->reply.hdr.retval = cur_op->req.rw.len;
-continue_others:
-    object_id oid = op_data->oid;
-    // Remove the operation from queue before calling finish_op so it doesn't see the completed operation in queue
-    auto next_it = pg.write_queue.find(oid);
-    if (next_it != pg.write_queue.end() && next_it->second == cur_op)
-    {
-        pg.write_queue.erase(next_it++);
-    }
-    // finish_op would invalidate next_it if it cleared pg.write_queue, but it doesn't do that :)
-    finish_op(cur_op, cur_op->reply.hdr.retval);
-    // Continue other write operations to the same object
-    if (next_it != pg.write_queue.end() && next_it->first == oid)
-    {
-        osd_op_t *next_op = next_it->second;
-        continue_primary_write(next_op);
-    }
-}
-
-bool osd_t::remember_unstable_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state)
-{
-    osd_primary_op_data_t *op_data = cur_op->op_data;
-    if (op_data->st == base_state)
-    {
-        goto resume_6;
-    }
-    else if (op_data->st == base_state+1)
-    {
-        goto resume_7;
-    }
-    // FIXME: Check for immediate_commit == IMMEDIATE_SMALL
-    if (immediate_commit == IMMEDIATE_ALL)
-    {
-        if (op_data->scheme != POOL_SCHEME_REPLICATED)
-        {
-            // Send STABILIZE ops immediately
-            op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
-            op_data->unstable_writes = new obj_ver_id[loc_set.size()];
-            {
-                int last_start = 0;
-                for (auto & chunk: loc_set)
-                {
-                    op_data->unstable_writes[last_start] = (obj_ver_id){
-                        .oid = {
-                            .inode = op_data->oid.inode,
-                            .stripe = op_data->oid.stripe | chunk.role,
-                        },
-                        .version = op_data->fact_ver,
-                    };
-                    op_data->unstable_write_osds->push_back((unstable_osd_num_t){
-                        .osd_num = chunk.osd_num,
-                        .start = last_start,
-                        .len = 1,
-                    });
-                    last_start++;
-                }
-            }
-            submit_primary_stab_subops(cur_op);
-resume_6:
-            op_data->st = 6;
-            return false;
-resume_7:
-            // FIXME: Free those in the destructor?
-            delete op_data->unstable_write_osds;
-            delete[] op_data->unstable_writes;
-            op_data->unstable_writes = NULL;
-            op_data->unstable_write_osds = NULL;
-            if (op_data->errors > 0)
-            {
-                pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
-                return false;
-            }
-        }
-    }
-    else
-    {
-        if (op_data->scheme != POOL_SCHEME_REPLICATED)
-        {
-            // Remember version as unstable for EC/XOR
-            for (auto & chunk: loc_set)
-            {
-                this->dirty_osds.insert(chunk.osd_num);
-                this->unstable_writes[(osd_object_id_t){
-                    .osd_num = chunk.osd_num,
-                    .oid = {
-                        .inode = op_data->oid.inode,
-                        .stripe = op_data->oid.stripe | chunk.role,
-                    },
-                }] = op_data->fact_ver;
-            }
-        }
-        else
-        {
-            // Only remember to sync OSDs for replicated pools
-            for (auto & chunk: loc_set)
-            {
-                this->dirty_osds.insert(chunk.osd_num);
-            }
-        }
-        // Remember PG as dirty to drop the connection when PG goes offline
-        // (this is required because of the "lazy sync")
-        auto cl_it = c_cli.clients.find(cur_op->peer_fd);
-        if (cl_it != c_cli.clients.end())
-        {
-            cl_it->second->dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
-        }
-        dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
-    }
-    return true;
-}
-
-// Save and clear unstable_writes -> SYNC all -> STABLE all
-void osd_t::continue_primary_sync(osd_op_t *cur_op)
-{
-    if (!cur_op->op_data)
-    {
-        cur_op->op_data = (osd_primary_op_data_t*)calloc_or_die(1, sizeof(osd_primary_op_data_t));
-    }
-    osd_primary_op_data_t *op_data = cur_op->op_data;
-    if (op_data->st == 1) goto resume_1;
-    else if (op_data->st == 2) goto resume_2;
-    else if (op_data->st == 3) goto resume_3;
-    else if (op_data->st == 4) goto resume_4;
-    else if (op_data->st == 5) goto resume_5;
-    else if (op_data->st == 6) goto resume_6;
-    else if (op_data->st == 7) goto resume_7;
-    else if (op_data->st == 8) goto resume_8;
-    assert(op_data->st == 0);
-    if (syncs_in_progress.size() > 0)
-    {
-        // Wait for previous syncs, if any
-        // FIXME: We may try to execute the current one in parallel, like in Blockstore, but I'm not sure if it matters at all
-        syncs_in_progress.push_back(cur_op);
-        op_data->st = 1;
-resume_1:
-        return;
-    }
-    else
-    {
-        syncs_in_progress.push_back(cur_op);
-    }
-resume_2:
-    if (dirty_osds.size() == 0)
-    {
-        // Nothing to sync
-        goto finish;
-    }
-    // Save and clear unstable_writes
-    // In theory it is possible to do it on a per-client basis, but this seems to be an unnecessary complication
-    // It would be cool not to copy these here at all, but someone has to deduplicate them by object IDs anyway
-    if (unstable_writes.size() > 0)
-    {
-        op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
-        op_data->unstable_writes = new obj_ver_id[this->unstable_writes.size()];
-        osd_num_t last_osd = 0;
-        int last_start = 0, last_end = 0;
-        for (auto it = this->unstable_writes.begin(); it != this->unstable_writes.end(); it++)
-        {
-            if (last_osd != it->first.osd_num)
-            {
-                if (last_osd != 0)
-                {
-                    op_data->unstable_write_osds->push_back((unstable_osd_num_t){
-                        .osd_num = last_osd,
-                        .start = last_start,
-                        .len = last_end - last_start,
-                    });
-                }
-                last_osd = it->first.osd_num;
-                last_start = last_end;
-            }
-            op_data->unstable_writes[last_end] = (obj_ver_id){
-                .oid = it->first.oid,
-                .version = it->second,
-            };
-            last_end++;
-        }
-        if (last_osd != 0)
-        {
-            op_data->unstable_write_osds->push_back((unstable_osd_num_t){
-                .osd_num = last_osd,
-                .start = last_start,
-                .len = last_end - last_start,
-            });
-        }
-        this->unstable_writes.clear();
-    }
-    {
-        void *dirty_buf = malloc_or_die(
-            sizeof(pool_pg_num_t)*dirty_pgs.size() +
-            sizeof(osd_num_t)*dirty_osds.size() +
-            sizeof(obj_ver_osd_t)*this->copies_to_delete_after_sync_count
-        );
-        op_data->dirty_pgs = (pool_pg_num_t*)dirty_buf;
-        op_data->dirty_osds = (osd_num_t*)(dirty_buf + sizeof(pool_pg_num_t)*dirty_pgs.size());
-        op_data->dirty_pg_count = dirty_pgs.size();
-        op_data->dirty_osd_count = dirty_osds.size();
-        if (this->copies_to_delete_after_sync_count)
-        {
-            op_data->copies_to_delete_count = 0;
-            op_data->copies_to_delete = (obj_ver_osd_t*)(op_data->dirty_osds + op_data->dirty_osd_count);
-            for (auto dirty_pg_num: dirty_pgs)
-            {
-                auto & pg = pgs.at(dirty_pg_num);
-                assert(pg.copies_to_delete_after_sync.size() <= this->copies_to_delete_after_sync_count);
-                memcpy(
-                    op_data->copies_to_delete + op_data->copies_to_delete_count,
-                    pg.copies_to_delete_after_sync.data(),
-                    sizeof(obj_ver_osd_t)*pg.copies_to_delete_after_sync.size()
-                );
-                op_data->copies_to_delete_count += pg.copies_to_delete_after_sync.size();
-                this->copies_to_delete_after_sync_count -= pg.copies_to_delete_after_sync.size();
-                pg.copies_to_delete_after_sync.clear();
-            }
-            assert(this->copies_to_delete_after_sync_count == 0);
-        }
-        int dpg = 0;
-        for (auto dirty_pg_num: dirty_pgs)
-        {
-            pgs.at(dirty_pg_num).inflight++;
-            op_data->dirty_pgs[dpg++] = dirty_pg_num;
-        }
-        dirty_pgs.clear();
-        dpg = 0;
-        for (auto osd_num: dirty_osds)
-        {
-            op_data->dirty_osds[dpg++] = osd_num;
-        }
-        dirty_osds.clear();
-    }
-    if (immediate_commit != IMMEDIATE_ALL)
-    {
-        // SYNC
-        if (!submit_primary_sync_subops(cur_op))
-        {
-            goto resume_4;
-        }
-resume_3:
-        op_data->st = 3;
-        return;
-resume_4:
-        if (op_data->errors > 0)
-        {
-            goto resume_6;
-        }
-    }
-    if (op_data->unstable_writes)
-    {
-        // Stabilize version sets, if any
-        submit_primary_stab_subops(cur_op);
-resume_5:
-        op_data->st = 5;
-        return;
-    }
-resume_6:
-    if (op_data->errors > 0)
-    {
-        // Return PGs and OSDs back into their dirty sets
-        for (int i = 0; i < op_data->dirty_pg_count; i++)
-        {
-            dirty_pgs.insert(op_data->dirty_pgs[i]);
-        }
-        for (int i = 0; i < op_data->dirty_osd_count; i++)
-        {
-            dirty_osds.insert(op_data->dirty_osds[i]);
-        }
-        if (op_data->unstable_writes)
-        {
-            // Return objects back into the unstable write set
-            for (auto unstable_osd: *(op_data->unstable_write_osds))
-            {
-                for (int i = 0; i < unstable_osd.len; i++)
-                {
-                    // Except those from peered PGs
-                    auto & w = op_data->unstable_writes[i];
-                    pool_pg_num_t wpg = {
-                        .pool_id = INODE_POOL(w.oid.inode),
-                        .pg_num = map_to_pg(w.oid, st_cli.pool_config.at(INODE_POOL(w.oid.inode)).pg_stripe_size),
-                    };
-                    if (pgs.at(wpg).state & PG_ACTIVE)
-                    {
-                        uint64_t & dest = this->unstable_writes[(osd_object_id_t){
-                            .osd_num = unstable_osd.osd_num,
-                            .oid = w.oid,
-                        }];
-                        dest = dest < w.version ? w.version : dest;
-                        dirty_pgs.insert(wpg);
-                    }
-                }
-            }
-        }
-        if (op_data->copies_to_delete)
-        {
-            // Return 'copies to delete' back into respective PGs
-            for (int i = 0; i < op_data->copies_to_delete_count; i++)
-            {
-                auto & w = op_data->copies_to_delete[i];
-                auto & pg = pgs.at((pool_pg_num_t){
-                    .pool_id = INODE_POOL(w.oid.inode),
-                    .pg_num = map_to_pg(w.oid, st_cli.pool_config.at(INODE_POOL(w.oid.inode)).pg_stripe_size),
-                });
-                if (pg.state & PG_ACTIVE)
-                {
-                    pg.copies_to_delete_after_sync.push_back(w);
-                    copies_to_delete_after_sync_count++;
-                }
-            }
-        }
-    }
-    else if (op_data->copies_to_delete)
-    {
-        // Actually delete copies which we wanted to delete
-        submit_primary_del_batch(cur_op, op_data->copies_to_delete, op_data->copies_to_delete_count);
-resume_7:
-        op_data->st = 7;
-        return;
-resume_8:
-        if (op_data->errors > 0)
-        {
-            goto resume_6;
-        }
-    }
-    for (int i = 0; i < op_data->dirty_pg_count; i++)
-    {
-        auto & pg = pgs.at(op_data->dirty_pgs[i]);
-        pg.inflight--;
-        if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch &&
-            // We must either forget all PG's unstable writes or wait for it to become clean
-            dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) == dirty_pgs.end())
-        {
-            finish_stop_pg(pg);
-        }
-    }
-    // FIXME: Free those in the destructor?
-    free(op_data->dirty_pgs);
-    op_data->dirty_pgs = NULL;
-    op_data->dirty_osds = NULL;
-    if (op_data->unstable_writes)
-    {
-        delete op_data->unstable_write_osds;
-        delete[] op_data->unstable_writes;
-        op_data->unstable_writes = NULL;
-        op_data->unstable_write_osds = NULL;
-    }
-    if (op_data->errors > 0)
-    {
-        finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
-    }
-    else
-    {
-finish:
-        if (cur_op->peer_fd)
-        {
-            auto it = c_cli.clients.find(cur_op->peer_fd);
-            if (it != c_cli.clients.end())
-                it->second->dirty_pgs.clear();
-        }
-        finish_op(cur_op, 0);
-    }
-    assert(syncs_in_progress.front() == cur_op);
-    syncs_in_progress.pop_front();
-    if (syncs_in_progress.size() > 0)
-    {
-        cur_op = syncs_in_progress.front();
-        op_data = cur_op->op_data;
-        op_data->st++;
-        goto resume_2;
-    }
-}
-
 // Decrement pg_osd_set_state_t's object_count and change PG state accordingly
 void osd_t::remove_object_from_state(object_id & oid, pg_osd_set_state_t *object_state, pg_t & pg)
 {
diff --git a/src/osd_primary_sync.cpp b/src/osd_primary_sync.cpp
new file mode 100644
index 00000000..536e0b29
--- /dev/null
+++ b/src/osd_primary_sync.cpp
@@ -0,0 +1,263 @@
+// Copyright (c) Vitaliy Filippov, 2019+
+// License: VNPL-1.1 (see README.md for details)
+
+#include "osd_primary.h"
+
+// Save and clear unstable_writes -> SYNC all -> STABLE all
+void osd_t::continue_primary_sync(osd_op_t *cur_op)
+{
+    if (!cur_op->op_data)
+    {
+        cur_op->op_data = (osd_primary_op_data_t*)calloc_or_die(1, sizeof(osd_primary_op_data_t));
+    }
+    osd_primary_op_data_t *op_data = cur_op->op_data;
+    if (op_data->st == 1) goto resume_1;
+    else if (op_data->st == 2) goto resume_2;
+    else if (op_data->st == 3) goto resume_3;
+    else if (op_data->st == 4) goto resume_4;
+    else if (op_data->st == 5) goto resume_5;
+    else if (op_data->st == 6) goto resume_6;
+    else if (op_data->st == 7) goto resume_7;
+    else if (op_data->st == 8) goto resume_8;
+    assert(op_data->st == 0);
+    if (syncs_in_progress.size() > 0)
+    {
+        // Wait for previous syncs, if any
+        // FIXME: We may try to execute the current one in parallel, like in Blockstore, but I'm not sure if it matters at all
+        syncs_in_progress.push_back(cur_op);
+        op_data->st = 1;
+resume_1:
+        return;
+    }
+    else
+    {
+        syncs_in_progress.push_back(cur_op);
+    }
+resume_2:
+    if (dirty_osds.size() == 0)
+    {
+        // Nothing to sync
+        goto finish;
+    }
+    // Save and clear unstable_writes
+    // In theory it is possible to do it on a per-client basis, but this seems to be an unnecessary complication
+    // It would be cool not to copy these here at all, but someone has to deduplicate them by object IDs anyway
+    if (unstable_writes.size() > 0)
+    {
+        op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
+        op_data->unstable_writes = new obj_ver_id[this->unstable_writes.size()];
+        osd_num_t last_osd = 0;
+        int last_start = 0, last_end = 0;
+        for (auto it = this->unstable_writes.begin(); it != this->unstable_writes.end(); it++)
+        {
+            if (last_osd != it->first.osd_num)
+            {
+                if (last_osd != 0)
+                {
+                    op_data->unstable_write_osds->push_back((unstable_osd_num_t){
+                        .osd_num = last_osd,
+                        .start = last_start,
+                        .len = last_end - last_start,
+                    });
+                }
+                last_osd = it->first.osd_num;
+                last_start = last_end;
+            }
+            op_data->unstable_writes[last_end] = (obj_ver_id){
+                .oid = it->first.oid,
+                .version = it->second,
+            };
+            last_end++;
+        }
+        if (last_osd != 0)
+        {
+            op_data->unstable_write_osds->push_back((unstable_osd_num_t){
+                .osd_num = last_osd,
+                .start = last_start,
+                .len = last_end - last_start,
+            });
+        }
+        this->unstable_writes.clear();
+    }
+    {
+        void *dirty_buf = malloc_or_die(
+            sizeof(pool_pg_num_t)*dirty_pgs.size() +
+            sizeof(osd_num_t)*dirty_osds.size() +
+            sizeof(obj_ver_osd_t)*this->copies_to_delete_after_sync_count
+        );
+        op_data->dirty_pgs = (pool_pg_num_t*)dirty_buf;
+        op_data->dirty_osds = (osd_num_t*)(dirty_buf + sizeof(pool_pg_num_t)*dirty_pgs.size());
+        op_data->dirty_pg_count = dirty_pgs.size();
+        op_data->dirty_osd_count = dirty_osds.size();
+        if (this->copies_to_delete_after_sync_count)
+        {
+            op_data->copies_to_delete_count = 0;
+            op_data->copies_to_delete = (obj_ver_osd_t*)(op_data->dirty_osds + op_data->dirty_osd_count);
+            for (auto dirty_pg_num: dirty_pgs)
+            {
+                auto & pg = pgs.at(dirty_pg_num);
+                assert(pg.copies_to_delete_after_sync.size() <= this->copies_to_delete_after_sync_count);
+                memcpy(
+                    op_data->copies_to_delete + op_data->copies_to_delete_count,
+                    pg.copies_to_delete_after_sync.data(),
+                    sizeof(obj_ver_osd_t)*pg.copies_to_delete_after_sync.size()
+                );
+                op_data->copies_to_delete_count += pg.copies_to_delete_after_sync.size();
+                this->copies_to_delete_after_sync_count -= pg.copies_to_delete_after_sync.size();
+                pg.copies_to_delete_after_sync.clear();
+            }
+            assert(this->copies_to_delete_after_sync_count == 0);
+        }
+        int dpg = 0;
+        for (auto dirty_pg_num: dirty_pgs)
+        {
+            pgs.at(dirty_pg_num).inflight++;
+            op_data->dirty_pgs[dpg++] = dirty_pg_num;
+        }
+        dirty_pgs.clear();
+        dpg = 0;
+        for (auto osd_num: dirty_osds)
+        {
+            op_data->dirty_osds[dpg++] = osd_num;
+        }
+        dirty_osds.clear();
+    }
+    if (immediate_commit != IMMEDIATE_ALL)
+    {
+        // SYNC
+        if (!submit_primary_sync_subops(cur_op))
+        {
+            goto resume_4;
+        }
+resume_3:
+        op_data->st = 3;
+        return;
+resume_4:
+        if (op_data->errors > 0)
+        {
+            goto resume_6;
+        }
+    }
+    if (op_data->unstable_writes)
+    {
+        // Stabilize version sets, if any
+        submit_primary_stab_subops(cur_op);
+resume_5:
+        op_data->st = 5;
+        return;
+    }
+resume_6:
+    if (op_data->errors > 0)
+    {
+        // Return PGs and OSDs back into their dirty sets
+        for (int i = 0; i < op_data->dirty_pg_count; i++)
+        {
+            dirty_pgs.insert(op_data->dirty_pgs[i]);
+        }
+        for (int i = 0; i < op_data->dirty_osd_count; i++)
+        {
+            dirty_osds.insert(op_data->dirty_osds[i]);
+        }
+        if (op_data->unstable_writes)
+        {
+            // Return objects back into the unstable write set
+            for (auto unstable_osd: *(op_data->unstable_write_osds))
+            {
+                for (int i = 0; i < unstable_osd.len; i++)
+                {
+                    // Except those from peered PGs
+                    auto & w = op_data->unstable_writes[i];
+                    pool_pg_num_t wpg = {
+                        .pool_id = INODE_POOL(w.oid.inode),
+                        .pg_num = map_to_pg(w.oid, st_cli.pool_config.at(INODE_POOL(w.oid.inode)).pg_stripe_size),
+                    };
+                    if (pgs.at(wpg).state & PG_ACTIVE)
+                    {
+                        uint64_t & dest = this->unstable_writes[(osd_object_id_t){
+                            .osd_num = unstable_osd.osd_num,
+                            .oid = w.oid,
+                        }];
+                        dest = dest < w.version ? w.version : dest;
+                        dirty_pgs.insert(wpg);
+                    }
+                }
+            }
+        }
+        if (op_data->copies_to_delete)
+        {
+            // Return 'copies to delete' back into respective PGs
+            for (int i = 0; i < op_data->copies_to_delete_count; i++)
+            {
+                auto & w = op_data->copies_to_delete[i];
+                auto & pg = pgs.at((pool_pg_num_t){
+                    .pool_id = INODE_POOL(w.oid.inode),
+                    .pg_num = map_to_pg(w.oid, st_cli.pool_config.at(INODE_POOL(w.oid.inode)).pg_stripe_size),
+                });
+                if (pg.state & PG_ACTIVE)
+                {
+                    pg.copies_to_delete_after_sync.push_back(w);
+                    copies_to_delete_after_sync_count++;
+                }
+            }
+        }
+    }
+    else if (op_data->copies_to_delete)
+    {
+        // Actually delete copies which we wanted to delete
+        submit_primary_del_batch(cur_op, op_data->copies_to_delete, op_data->copies_to_delete_count);
+resume_7:
+        op_data->st = 7;
+        return;
+resume_8:
+        if (op_data->errors > 0)
+        {
+            goto resume_6;
+        }
+    }
+    for (int i = 0; i < op_data->dirty_pg_count; i++)
+    {
+        auto & pg = pgs.at(op_data->dirty_pgs[i]);
+        pg.inflight--;
+        if ((pg.state & PG_STOPPING) && pg.inflight == 0 && !pg.flush_batch &&
+            // We must either forget all PG's unstable writes or wait for it to become clean
+            dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) == dirty_pgs.end())
+        {
+            finish_stop_pg(pg);
+        }
+    }
+    // FIXME: Free those in the destructor?
+    free(op_data->dirty_pgs);
+    op_data->dirty_pgs = NULL;
+    op_data->dirty_osds = NULL;
+    if (op_data->unstable_writes)
+    {
+        delete op_data->unstable_write_osds;
+        delete[] op_data->unstable_writes;
+        op_data->unstable_writes = NULL;
+        op_data->unstable_write_osds = NULL;
+    }
+    if (op_data->errors > 0)
+    {
+        finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
+    }
+    else
+    {
+finish:
+        if (cur_op->peer_fd)
+        {
+            auto it = c_cli.clients.find(cur_op->peer_fd);
+            if (it != c_cli.clients.end())
+                it->second->dirty_pgs.clear();
+        }
+        finish_op(cur_op, 0);
+    }
+    assert(syncs_in_progress.front() == cur_op);
+    syncs_in_progress.pop_front();
+    if (syncs_in_progress.size() > 0)
+    {
+        cur_op = syncs_in_progress.front();
+        op_data = cur_op->op_data;
+        op_data->st++;
+        goto resume_2;
+    }
+}
diff --git a/src/osd_primary_write.cpp b/src/osd_primary_write.cpp
new file mode 100644
index 00000000..540e4da3
--- /dev/null
+++ b/src/osd_primary_write.cpp
@@ -0,0 +1,365 @@
+// Copyright (c) Vitaliy Filippov, 2019+
+// License: VNPL-1.1 (see README.md for details)
+
+#include "osd_primary.h"
+#include "allocator.h"
+
+bool osd_t::check_write_queue(osd_op_t *cur_op, pg_t & pg)
+{
+    osd_primary_op_data_t *op_data = cur_op->op_data;
+    // Check if actions are pending for this object
+    auto act_it = pg.flush_actions.lower_bound((obj_piece_id_t){
+        .oid = op_data->oid,
+        .osd_num = 0,
+    });
+    if (act_it != pg.flush_actions.end() &&
+        act_it->first.oid.inode == op_data->oid.inode &&
+        (act_it->first.oid.stripe & ~STRIPE_MASK) == op_data->oid.stripe)
+    {
+        pg.write_queue.emplace(op_data->oid, cur_op);
+        return false;
+    }
+    // Check if there are other write requests to the same object
+    auto vo_it = pg.write_queue.find(op_data->oid);
+    if (vo_it != pg.write_queue.end())
+    {
+        op_data->st = 1;
+        pg.write_queue.emplace(op_data->oid, cur_op);
+        return false;
+    }
+    pg.write_queue.emplace(op_data->oid, cur_op);
+    return true;
+}
+
+void osd_t::continue_primary_write(osd_op_t *cur_op)
+{
+    if (!cur_op->op_data && !prepare_primary_rw(cur_op))
+    {
+        return;
+    }
+    osd_primary_op_data_t *op_data = cur_op->op_data;
+    auto & pg = pgs.at({ .pool_id = INODE_POOL(op_data->oid.inode), .pg_num = op_data->pg_num });
+    if (op_data->st == 1) goto resume_1;
+    else if (op_data->st == 2) goto resume_2;
+    else if (op_data->st == 3) goto resume_3;
+    else if (op_data->st == 4) goto resume_4;
+    else if (op_data->st == 5) goto resume_5;
+    else if (op_data->st == 6) goto resume_6;
+    else if (op_data->st == 7) goto resume_7;
+    else if (op_data->st == 8) goto resume_8;
+    else if (op_data->st == 9) goto resume_9;
+    else if (op_data->st == 10) goto resume_10;
+    assert(op_data->st == 0);
+    if (!check_write_queue(cur_op, pg))
+    {
+        return;
+    }
+resume_1:
+    // Determine blocks to read and write
+    // Missing chunks are allowed to be overwritten even in incomplete objects
+    // FIXME: Allow to do small writes to the old (degraded/misplaced) OSD set for lower performance impact
+    op_data->prev_set = get_object_osd_set(pg, op_data->oid, pg.cur_set.data(), &op_data->object_state);
+    if (op_data->scheme == POOL_SCHEME_REPLICATED)
+    {
+        // Simplified algorithm
+        op_data->stripes[0].write_start = op_data->stripes[0].req_start;
+        op_data->stripes[0].write_end = op_data->stripes[0].req_end;
+        op_data->stripes[0].write_buf = cur_op->buf;
+        if (pg.cur_set.data() != op_data->prev_set && (op_data->stripes[0].write_start != 0 ||
+            op_data->stripes[0].write_end != bs_block_size))
+        {
+            // Object is degraded/misplaced and will be moved to <write_osd_set>
+            op_data->stripes[0].read_start = 0;
+            op_data->stripes[0].read_end = bs_block_size;
+            cur_op->rmw_buf = op_data->stripes[0].read_buf = memalign_or_die(MEM_ALIGNMENT, bs_block_size);
+        }
+    }
+    else
+    {
+        cur_op->rmw_buf = calc_rmw(cur_op->buf, op_data->stripes, op_data->prev_set,
+            pg.pg_size, op_data->pg_data_size, pg.pg_cursize, pg.cur_set.data(), bs_block_size);
+        if (!cur_op->rmw_buf)
+        {
+            // Refuse partial overwrite of an incomplete object
+            cur_op->reply.hdr.retval = -EINVAL;
+            goto continue_others;
+        }
+    }
+    // Read required blocks
+    submit_primary_subops(SUBMIT_RMW_READ, UINT64_MAX, pg.pg_size, op_data->prev_set, cur_op);
+resume_2:
+    op_data->st = 2;
+    return;
+resume_3:
+    if (op_data->errors > 0)
+    {
+        pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
+        return;
+    }
+    if (op_data->scheme == POOL_SCHEME_REPLICATED)
+    {
+        // Only (possibly) copy new data from the request into the recovery buffer
+        if (pg.cur_set.data() != op_data->prev_set && (op_data->stripes[0].write_start != 0 ||
+            op_data->stripes[0].write_end != bs_block_size))
+        {
+            memcpy(
+                op_data->stripes[0].read_buf + op_data->stripes[0].req_start,
+                op_data->stripes[0].write_buf,
+                op_data->stripes[0].req_end - op_data->stripes[0].req_start
+            );
+            op_data->stripes[0].write_buf = op_data->stripes[0].read_buf;
+            op_data->stripes[0].write_start = 0;
+            op_data->stripes[0].write_end = bs_block_size;
+        }
+    }
+    else
+    {
+        // For EC/XOR pools, save version override to make it impossible
+        // for parallel reads to read different versions of data and parity
+        pg.ver_override[op_data->oid] = op_data->fact_ver;
+        // Recover missing stripes, calculate parity
+        if (pg.scheme == POOL_SCHEME_XOR)
+        {
+            calc_rmw_parity_xor(op_data->stripes, pg.pg_size, op_data->prev_set, pg.cur_set.data(), bs_block_size);
+        }
+        else if (pg.scheme == POOL_SCHEME_JERASURE)
+        {
+            calc_rmw_parity_jerasure(op_data->stripes, pg.pg_size, op_data->pg_data_size, op_data->prev_set, pg.cur_set.data(), bs_block_size);
+        }
+    }
+    // Send writes
+    if ((op_data->fact_ver >> (64-PG_EPOCH_BITS)) < pg.epoch)
+    {
+        op_data->target_ver = ((uint64_t)pg.epoch << (64-PG_EPOCH_BITS)) | 1;
+    }
+    else
+    {
+        if ((op_data->fact_ver & (1ul<<(64-PG_EPOCH_BITS) - 1)) == (1ul<<(64-PG_EPOCH_BITS) - 1))
+        {
+            assert(pg.epoch != ((1ul << PG_EPOCH_BITS)-1));
+            pg.epoch++;
+        }
+        op_data->target_ver = op_data->fact_ver + 1;
+    }
+    if (pg.epoch > pg.reported_epoch)
+    {
+        // Report newer epoch before writing
+        // FIXME: We may report only one PG state here...
+        this->pg_state_dirty.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
+        pg.history_changed = true;
+        report_pg_states();
+resume_10:
+        if (pg.epoch > pg.reported_epoch)
+        {
+            op_data->st = 10;
+            return;
+        }
+    }
+    submit_primary_subops(SUBMIT_WRITE, op_data->target_ver, pg.pg_size, pg.cur_set.data(), cur_op);
+resume_4:
+    op_data->st = 4;
+    return;
+resume_5:
+    if (op_data->scheme != POOL_SCHEME_REPLICATED)
+    {
+        // Remove version override just after the write, but before stabilizing
+        pg.ver_override.erase(op_data->oid);
+    }
+    if (op_data->object_state)
+    {
+        // We must forget the unclean state of the object before deleting it
+        // so the next reads don't accidentally read a deleted version
+        // And it should be done at the same time as the removal of the version override
+        remove_object_from_state(op_data->oid, op_data->object_state, pg);
+        pg.clean_count++;
+    }
+    if (op_data->errors > 0)
+    {
+        free_object_state(pg, &op_data->object_state);
+        pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
+        return;
+    }
+resume_6:
+resume_7:
+    if (!remember_unstable_write(cur_op, pg, pg.cur_loc_set, 6))
+    {
+        // FIXME: Check for immediate_commit == IMMEDIATE_SMALL
+        free_object_state(pg, &op_data->object_state);
+        return;
+    }
+    if (op_data->fact_ver == 1)
+    {
+        // Object is created
+        pg.clean_count++;
+        pg.total_count++;
+    }
+    if (op_data->object_state)
+    {
+        {
+            int recovery_type = op_data->object_state->state & (OBJ_DEGRADED|OBJ_INCOMPLETE) ? 0 : 1;
+            recovery_stat_count[0][recovery_type]++;
+            if (!recovery_stat_count[0][recovery_type])
+            {
+                recovery_stat_count[0][recovery_type]++;
+                recovery_stat_bytes[0][recovery_type] = 0;
+            }
+            for (int role = 0; role < (op_data->scheme == POOL_SCHEME_REPLICATED ? 1 : pg.pg_size); role++)
+            {
+                recovery_stat_bytes[0][recovery_type] += op_data->stripes[role].write_end - op_data->stripes[role].write_start;
+            }
+        }
+        // Any kind of a non-clean object can have extra chunks, because we don't record objects
+        // as degraded & misplaced or incomplete & misplaced at the same time. So try to remove extra chunks
+        if (immediate_commit != IMMEDIATE_ALL)
+        {
+            // We can't remove extra chunks yet if fsyncs are explicit, because
+            // new copies may not be committed to stable storage yet
+            // We can only remove extra chunks after a successful SYNC for this PG
+            for (auto & chunk: op_data->object_state->osd_set)
+            {
+                // Check is the same as in submit_primary_del_subops()
+                if (op_data->scheme == POOL_SCHEME_REPLICATED
+                    ? !contains_osd(pg.cur_set.data(), pg.pg_size, chunk.osd_num)
+                    : (chunk.osd_num != pg.cur_set[chunk.role]))
+                {
+                    pg.copies_to_delete_after_sync.push_back((obj_ver_osd_t){
+                        .osd_num = chunk.osd_num,
+                        .oid = {
+                            .inode = op_data->oid.inode,
+                            .stripe = op_data->oid.stripe | (op_data->scheme == POOL_SCHEME_REPLICATED ? 0 : chunk.role),
+                        },
+                        .version = op_data->fact_ver,
+                    });
+                    copies_to_delete_after_sync_count++;
+                }
+            }
+            free_object_state(pg, &op_data->object_state);
+        }
+        else
+        {
+            submit_primary_del_subops(cur_op, pg.cur_set.data(), pg.pg_size, op_data->object_state->osd_set);
+            free_object_state(pg, &op_data->object_state);
+            if (op_data->n_subops > 0)
+            {
+resume_8:
+                op_data->st = 8;
+                return;
+resume_9:
+                if (op_data->errors > 0)
+                {
+                    pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
+                    return;
+                }
+            }
+        }
+    }
+    cur_op->reply.hdr.retval = cur_op->req.rw.len;
+continue_others:
+    object_id oid = op_data->oid;
+    // Remove the operation from queue before calling finish_op so it doesn't see the completed operation in queue
+    auto next_it = pg.write_queue.find(oid);
+    if (next_it != pg.write_queue.end() && next_it->second == cur_op)
+    {
+        pg.write_queue.erase(next_it++);
+    }
+    // finish_op would invalidate next_it if it cleared pg.write_queue, but it doesn't do that :)
+    finish_op(cur_op, cur_op->reply.hdr.retval);
+    // Continue other write operations to the same object
+    if (next_it != pg.write_queue.end() && next_it->first == oid)
+    {
+        osd_op_t *next_op = next_it->second;
+        continue_primary_write(next_op);
+    }
+}
+
+bool osd_t::remember_unstable_write(osd_op_t *cur_op, pg_t & pg, pg_osd_set_t & loc_set, int base_state)
+{
+    osd_primary_op_data_t *op_data = cur_op->op_data;
+    if (op_data->st == base_state)
+    {
+        goto resume_6;
+    }
+    else if (op_data->st == base_state+1)
+    {
+        goto resume_7;
+    }
+    // FIXME: Check for immediate_commit == IMMEDIATE_SMALL
+    if (immediate_commit == IMMEDIATE_ALL)
+    {
+        if (op_data->scheme != POOL_SCHEME_REPLICATED)
+        {
+            // Send STABILIZE ops immediately
+            op_data->unstable_write_osds = new std::vector<unstable_osd_num_t>();
+            op_data->unstable_writes = new obj_ver_id[loc_set.size()];
+            {
+                int last_start = 0;
+                for (auto & chunk: loc_set)
+                {
+                    op_data->unstable_writes[last_start] = (obj_ver_id){
+                        .oid = {
+                            .inode = op_data->oid.inode,
+                            .stripe = op_data->oid.stripe | chunk.role,
+                        },
+                        .version = op_data->fact_ver,
+                    };
+                    op_data->unstable_write_osds->push_back((unstable_osd_num_t){
+                        .osd_num = chunk.osd_num,
+                        .start = last_start,
+                        .len = 1,
+                    });
+                    last_start++;
+                }
+            }
+            submit_primary_stab_subops(cur_op);
+resume_6:
+            op_data->st = 6;
+            return false;
+resume_7:
+            // FIXME: Free those in the destructor?
+            delete op_data->unstable_write_osds;
+            delete[] op_data->unstable_writes;
+            op_data->unstable_writes = NULL;
+            op_data->unstable_write_osds = NULL;
+            if (op_data->errors > 0)
+            {
+                pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
+                return false;
+            }
+        }
+    }
+    else
+    {
+        if (op_data->scheme != POOL_SCHEME_REPLICATED)
+        {
+            // Remember version as unstable for EC/XOR
+            for (auto & chunk: loc_set)
+            {
+                this->dirty_osds.insert(chunk.osd_num);
+                this->unstable_writes[(osd_object_id_t){
+                    .osd_num = chunk.osd_num,
+                    .oid = {
+                        .inode = op_data->oid.inode,
+                        .stripe = op_data->oid.stripe | chunk.role,
+                    },
+                }] = op_data->fact_ver;
+            }
+        }
+        else
+        {
+            // Only remember to sync OSDs for replicated pools
+            for (auto & chunk: loc_set)
+            {
+                this->dirty_osds.insert(chunk.osd_num);
+            }
+        }
+        // Remember PG as dirty to drop the connection when PG goes offline
+        // (this is required because of the "lazy sync")
+        auto cl_it = c_cli.clients.find(cur_op->peer_fd);
+        if (cl_it != c_cli.clients.end())
+        {
+            cl_it->second->dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
+        }
+        dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
+    }
+    return true;
+}
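
For readers following the control flow: check_write_queue() serializes writes per object. Every write is queued into pg.write_queue (a multimap keyed by object ID), only the head entry for an object actually runs, and the continue_others path at the end of continue_primary_write() starts the next queued write for the same object. The following minimal sketch shows the same pattern with toy stand-ins (op_t, plain uint64_t object IDs) instead of the real osd_op_t/pg_t types:

// toy per-object write serialization in the spirit of pg_t::write_queue
#include <cstdint>
#include <cstdio>
#include <map>

struct op_t { int id; };

std::multimap<uint64_t, op_t*> write_queue; // object id -> queued ops, in arrival order

// Returns true if op may start right away (no other op queued for oid)
bool enqueue(uint64_t oid, op_t *op)
{
    bool first = write_queue.find(oid) == write_queue.end();
    write_queue.emplace(oid, op);
    return first;
}

// On completion: remove the finished op, return the next waiter for oid, if any
op_t *finish(uint64_t oid, op_t *op)
{
    auto it = write_queue.find(oid);
    if (it != write_queue.end() && it->second == op)
        write_queue.erase(it++);
    return (it != write_queue.end() && it->first == oid) ? it->second : NULL;
}

int main()
{
    op_t a = { 1 }, b = { 2 };
    printf("a starts: %d\n", enqueue(10, &a)); // 1 - runs immediately
    printf("b starts: %d\n", enqueue(10, &b)); // 0 - waits behind a
    op_t *next = finish(10, &a);
    printf("next after a: %d\n", next ? next->id : 0); // 2 - b continues
    return 0;
}

A multimap keeps equal keys in insertion order, so the head entry for an object is always its oldest waiter.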
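
The target_ver arithmetic in continue_primary_write() packs two fields into the 64-bit object version: the PG epoch in the top PG_EPOCH_BITS bits and a per-epoch write counter in the remaining low bits. The sketch below shows the layout; the PG_EPOCH_BITS value used here is illustrative only, the real constant is defined in the OSD headers:

// illustrative sketch of the epoch/counter version layout
#include <cstdint>
#include <cinttypes>
#include <cstdio>

static const int PG_EPOCH_BITS = 16; // assumption for illustration, not vitastor's actual value

static uint64_t ver_epoch(uint64_t ver) { return ver >> (64 - PG_EPOCH_BITS); }
// Explicit parentheses matter here: '<<' binds looser than '-', so
// "1ul << (64-PG_EPOCH_BITS) - 1" would shift by 64-PG_EPOCH_BITS-1 instead
static uint64_t low_mask() { return (1ull << (64 - PG_EPOCH_BITS)) - 1; }

static uint64_t next_target_ver(uint64_t fact_ver, uint64_t pg_epoch)
{
    if (ver_epoch(fact_ver) < pg_epoch)
    {
        // PG epoch grew since the last write - restart the counter at 1
        return (pg_epoch << (64 - PG_EPOCH_BITS)) | 1;
    }
    // Same epoch - just increment (the real code additionally bumps the
    // epoch when the low counter is about to overflow)
    return fact_ver + 1;
}

int main()
{
    uint64_t fact_ver = ((uint64_t)4 << (64 - PG_EPOCH_BITS)) | 1000; // epoch 4, counter 1000
    uint64_t v = next_target_ver(fact_ver, 7); // PG already moved to epoch 7
    printf("epoch=%" PRIu64 " counter=%" PRIu64 "\n", ver_epoch(v), v & low_mask()); // epoch=7 counter=1
    return 0;
}

As the resume_10 step above shows, a grown epoch is reported to etcd (reported_epoch) before any write under the new epoch is sent.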
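
The unstable-writes snapshot at the top of continue_primary_sync() relies on map ordering: this->unstable_writes is keyed by (osd_num, oid), so all entries of one OSD are contiguous, and a single linear pass flattens them into one obj_ver_id array plus per-OSD (start, len) ranges, which is the shape submit_primary_stab_subops() consumes. A standalone sketch of that grouping with simplified stand-in types (the real osd_object_id_t / obj_ver_id / unstable_osd_num_t live in the OSD headers):

// grouping a map ordered by (osd_num, oid) into per-OSD ranges
#include <cstdint>
#include <cinttypes>
#include <cstdio>
#include <map>
#include <vector>

struct obj_id { uint64_t inode, stripe; };
struct osd_obj_id { uint64_t osd_num; obj_id oid; };
struct obj_ver { obj_id oid; uint64_t version; };
struct osd_range { uint64_t osd_num; int start, len; };

// Ordered by OSD first, so each OSD's entries are contiguous in the map
static bool operator<(const osd_obj_id & a, const osd_obj_id & b)
{
    if (a.osd_num != b.osd_num) return a.osd_num < b.osd_num;
    if (a.oid.inode != b.oid.inode) return a.oid.inode < b.oid.inode;
    return a.oid.stripe < b.oid.stripe;
}

int main()
{
    std::map<osd_obj_id, uint64_t> unstable_writes = {
        { { 1, { 10, 0 } }, 3 },
        { { 1, { 10, 128 } }, 5 },
        { { 2, { 10, 0 } }, 3 },
    };
    std::vector<obj_ver> flat;      // all (oid, version) pairs, OSD by OSD
    std::vector<osd_range> per_osd; // which slice of 'flat' belongs to which OSD
    uint64_t last_osd = 0;
    int last_start = 0;
    for (auto & kv: unstable_writes)
    {
        if (kv.first.osd_num != last_osd)
        {
            if (last_osd != 0)
                per_osd.push_back({ last_osd, last_start, (int)flat.size() - last_start });
            last_osd = kv.first.osd_num;
            last_start = (int)flat.size();
        }
        flat.push_back({ kv.first.oid, kv.second });
    }
    if (last_osd != 0)
        per_osd.push_back({ last_osd, last_start, (int)flat.size() - last_start });
    for (auto & r: per_osd)
        printf("OSD %" PRIu64 ": %d entries at offset %d\n", r.osd_num, r.len, r.start);
    return 0;
}

Deduplication by object ID comes for free here: the map holds only one version per (osd_num, oid) key, which is why the sync path copies unstable writes into this structure instead of keeping a raw list.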