From 27ee14a4e6b14e05101b345c5c7d40e6a0063093 Mon Sep 17 00:00:00 2001
From: Vitaliy Filippov
Date: Mon, 15 Jun 2020 13:22:20 +0300
Subject: [PATCH] Fix bugs in cluster_client

---
 cluster_client.cpp | 151 ++++++++++++++++++++++++++-------------------
 cluster_client.h   |   1 +
 fio_cluster.cpp    |  12 +++-
 3 files changed, 99 insertions(+), 65 deletions(-)

diff --git a/cluster_client.cpp b/cluster_client.cpp
index 6587be5a..266f6856 100644
--- a/cluster_client.cpp
+++ b/cluster_client.cpp
@@ -5,6 +5,7 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
 {
     this->ringloop = ringloop;
     this->tfd = tfd;
+    msgr.osd_num = 0;
     msgr.tfd = tfd;
     msgr.ringloop = ringloop;
     msgr.repeer_pgs = [this](osd_num_t peer_osd)
@@ -48,6 +49,14 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
     log_level = config["log_level"].int64_value();
     st_cli.parse_config(config);
     st_cli.load_global_config();
+
+    consumer.loop = [this]()
+    {
+        msgr.read_requests();
+        msgr.send_replies();
+        this->ringloop->submit();
+    };
+    ringloop->register_consumer(&consumer);
 }
 
 void cluster_client_t::continue_ops()
@@ -108,10 +117,10 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config)
     if (config.find("pg_stripe_size") != config.end())
     {
         pg_stripe_size = config["pg_stripe_size"].uint64_value();
-        if (!pg_stripe_size)
-        {
-            pg_stripe_size = DEFAULT_PG_STRIPE_SIZE;
-        }
+    }
+    if (!pg_stripe_size)
+    {
+        pg_stripe_size = DEFAULT_PG_STRIPE_SIZE;
     }
     if (config["immediate_commit"] == "all")
     {
@@ -121,10 +130,10 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config)
     else if (config.find("client_dirty_limit") != config.end())
     {
         client_dirty_limit = config["client_dirty_limit"].uint64_value();
-        if (!client_dirty_limit)
-        {
-            client_dirty_limit = DEFAULT_CLIENT_DIRTY_LIMIT;
-        }
+    }
+    if (!client_dirty_limit)
+    {
+        client_dirty_limit = DEFAULT_CLIENT_DIRTY_LIMIT;
     }
     msgr.peer_connect_interval = config["peer_connect_interval"].uint64_value();
     if (!msgr.peer_connect_interval)
@@ -136,6 +145,7 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config)
     {
         msgr.peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
     }
+    st_cli.load_pgs();
 }
 
 void cluster_client_t::on_load_pgs_hook(bool success)
@@ -143,11 +153,14 @@ void cluster_client_t::on_load_pgs_hook(bool success)
     if (success)
    {
         pg_count = st_cli.pg_config.size();
-        for (auto op: offline_ops)
+        if (pg_count)
         {
-            execute(op);
+            for (auto op: offline_ops)
+            {
+                execute(op);
+            }
+            offline_ops.clear();
         }
-        offline_ops.clear();
     }
 }
 
@@ -189,24 +202,27 @@ void cluster_client_t::on_change_osd_state_hook(uint64_t peer_osd)
 /**
  * How writes are synced when immediate_commit is false
  *
- * WRITE:
- * 1) copy op to the queue
- * 2) slice op
- * 3) connect & send all parts
- * 4) if any of them fail due to disconnected peers, repeat after reconnecting
- * 5) if any of them fail due to other errors, fail the operation
+ * 1) accept up to <client_dirty_limit> write operations for execution,
+ *    queue all subsequent writes into <next_writes>
+ * 2) accept exactly one SYNC, queue all subsequent SYNCs into <next_writes>, too
+ * 3) "continue" all accepted writes
+ *
+ * "Continue" WRITE:
+ * 1) if the operation is not a copy yet - copy it (required for replay)
+ * 2) if the operation is not sliced yet - slice it
+ * 3) if the operation doesn't require reslice - try to connect & send all remaining parts
+ * 4) if any of them fail due to disconnected peers or PGs not up, repeat after reconnecting or small timeout
+ * 5) if any of them fail due to other errors, fail the operation and forget it from the current "unsynced batch"
  * 6) if PG count changes before all parts are done, wait for all in-progress parts to finish,
  *    throw all results away, reslice and resubmit op
- * 7) when all parts are done, try to continue SYNCs
+ * 7) when all parts are done, try to "continue" the current SYNC
+ * 8) if the operation succeeds, but then some OSDs drop their connections, repeat
+ *    parts from the current "unsynced batch" previously sent to those OSDs in any order
  *
- * SYNC:
- * 1) add SYNC to the queue after writes
- * 2) try to continue SYNCs
- *
- * Continue SYNCs:
- * 1) check the queue for a SYNC ready for submission. if there is one:
- * 2) slice it
- * 3) connect & send all SYNC parts
+ * "Continue" current SYNC:
+ * 1) take all unsynced operations from the current batch
+ * 2) check if all affected OSDs are still alive
+ * 3) if yes, send all SYNCs. otherwise, leave current SYNC as is.
  * 4) if any of them fail due to disconnected peers, repeat SYNC after repeating all writes
  * 5) if any of them fail due to other errors, fail the SYNC operation
  */
@@ -214,6 +230,12 @@ void cluster_client_t::on_change_osd_state_hook(uint64_t peer_osd)
 void cluster_client_t::execute(cluster_op_t *op)
 {
     op->retval = 0;
+    if (!pg_count)
+    {
+        // Config is not loaded yet, retry after connecting to etcd
+        offline_ops.push_back(op);
+        return;
+    }
     if (op->opcode != OSD_OP_SYNC && op->opcode != OSD_OP_READ && op->opcode != OSD_OP_WRITE ||
         (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_WRITE) && (!op->inode || !op->len ||
         op->offset % bs_disk_alignment || op->len % bs_disk_alignment))
@@ -222,12 +244,6 @@ void cluster_client_t::execute(cluster_op_t *op)
         std::function<void(cluster_op_t*)>(op->callback)(op);
         return;
     }
-    if (!pg_count)
-    {
-        // Config is not loaded yet, retry after connecting to etcd
-        offline_ops.push_back(op);
-        return;
-    }
     if (op->opcode == OSD_OP_SYNC)
     {
         execute_sync(op);
@@ -319,6 +335,7 @@ void cluster_client_t::continue_rw(cluster_op_t *op)
             op->retval = op->len;
             std::function<void(cluster_op_t*)>(op->callback)(op);
             continue_sync();
+            return;
         }
         else if (op->retval != 0 && op->retval != -EPIPE)
         {
@@ -339,18 +356,25 @@ void cluster_client_t::continue_rw(cluster_op_t *op)
             std::function<void(cluster_op_t*)>(op->callback)(op);
             if (del)
             {
+                if (op->buf)
+                    free(op->buf);
                 delete op;
             }
             continue_sync();
+            return;
         }
-        else if (op->needs_reslice)
+        else
         {
-            op->parts.clear();
-            op->done_count = 0;
-            op->needs_reslice = false;
-            continue_rw(op);
+            // -EPIPE or no error - clear the error
+            op->retval = 0;
+            if (op->needs_reslice)
+            {
+                op->parts.clear();
+                op->done_count = 0;
+                op->needs_reslice = false;
+                continue_rw(op);
+            }
         }
-        // else -EPIPE
     }
 }
 
@@ -361,34 +385,25 @@ void cluster_client_t::slice_rw(cluster_op_t *op)
     uint64_t pg_block_size = bs_block_size * pg_part_count;
     uint64_t first_stripe = (op->offset / pg_block_size) * pg_block_size;
     uint64_t last_stripe = ((op->offset + op->len + pg_block_size - 1) / pg_block_size - 1) * pg_block_size;
-    int part_count = 0;
-    for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
-    {
-        if (op->offset < (stripe+pg_block_size) && (op->offset+op->len) > stripe)
-        {
-            part_count++;
-        }
-    }
     op->retval = 0;
-    op->parts.resize(part_count);
+    op->parts.resize((last_stripe - first_stripe) / pg_block_size + 1);
     int i = 0;
     for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
     {
-        uint64_t stripe_end = stripe + pg_block_size;
-        if (op->offset < stripe_end && (op->offset+op->len) > stripe)
-        {
-            pg_num_t pg_num = (op->inode + stripe/pg_stripe_size) % pg_count + 1;
-            op->parts[i] = {
-                .parent = op,
-                .offset = op->offset < stripe ? stripe : op->offset,
-                .len = (uint32_t)((op->offset+op->len) > stripe_end ? pg_block_size : op->offset+op->len-stripe),
-                .pg_num = pg_num,
-                .buf = op->buf + (op->offset < stripe ? stripe-op->offset : 0),
-                .sent = false,
-                .done = false,
-            };
-            i++;
-        }
+        pg_num_t pg_num = (op->inode + stripe/pg_stripe_size) % pg_count + 1;
+        uint64_t begin = (op->offset < stripe ? stripe : op->offset);
+        uint64_t end = (op->offset + op->len) > (stripe + pg_block_size)
+            ? (stripe + pg_block_size) : (op->offset + op->len);
+        op->parts[i] = {
+            .parent = op,
+            .offset = begin,
+            .len = (uint32_t)(end - begin),
+            .pg_num = pg_num,
+            .buf = op->buf + begin - op->offset,
+            .sent = false,
+            .done = false,
+        };
+        i++;
     }
 }
 
@@ -539,6 +554,8 @@ void cluster_client_t::finish_sync()
     {
         if (op->is_internal)
         {
+            if (op->buf)
+                free(op->buf);
             delete op;
         }
     }
@@ -548,7 +565,15 @@ void cluster_client_t::finish_sync()
     int i;
     for (i = 0; i < next_writes.size() && !cur_sync; i++)
     {
-        execute(next_writes[i]);
+        if (next_writes[i]->opcode == OSD_OP_SYNC)
+        {
+            execute_sync(next_writes[i]);
+        }
+        else
+        {
+            cur_ops.insert(next_writes[i]);
+            continue_rw(next_writes[i]);
+        }
     }
     next_writes.erase(next_writes.begin(), next_writes.begin()+i);
 }
diff --git a/cluster_client.h b/cluster_client.h
index de4fa4b3..258ded36 100644
--- a/cluster_client.h
+++ b/cluster_client.h
@@ -64,6 +64,7 @@ class cluster_client_t
     uint64_t op_id = 1;
     etcd_state_client_t st_cli;
     osd_messenger_t msgr;
+    ring_consumer_t consumer;
     // operations currently in progress
     std::set<cluster_op_t*> cur_ops;
     int retry_timeout_id = 0;
diff --git a/fio_cluster.cpp b/fio_cluster.cpp
index fa153158..6c28a6a9 100644
--- a/fio_cluster.cpp
+++ b/fio_cluster.cpp
@@ -211,8 +211,16 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
 
     if (opt->trace)
     {
-        printf("+++ %s # %d\n", io->ddir == DDIR_READ ? "READ" :
-            (io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), n);
+        if (io->ddir == DDIR_SYNC)
+        {
+            printf("+++ SYNC # %d\n", n);
+        }
+        else
+        {
+            printf("+++ %s # %d 0x%llx+%llx\n",
+                io->ddir == DDIR_READ ? "READ" : "WRITE",
+                n, io->offset, io->xfer_buflen);
+        }
     }
 
     io->error = 0;
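
For illustration, a minimal standalone sketch of the slicing arithmetic used by the rewritten slice_rw() above: every PG block between first_stripe and last_stripe intersects the operation, so one part is emitted per block and the old per-stripe intersection test is unnecessary. The part struct, the slice() helper and the constants in main() are assumptions made up for this example, not Vitastor types or defaults.

// Standalone sketch, not part of the patch: mirrors the begin/end clipping done in slice_rw().
#include <cstdint>
#include <cstdio>
#include <vector>

struct part
{
    uint64_t offset, len, pg_num;
};

static std::vector<part> slice(uint64_t inode, uint64_t offset, uint64_t len,
    uint64_t pg_block_size, uint64_t pg_stripe_size, uint64_t pg_count)
{
    // First and last PG block ("stripe") touched by [offset, offset+len)
    uint64_t first_stripe = (offset / pg_block_size) * pg_block_size;
    uint64_t last_stripe = ((offset + len + pg_block_size - 1) / pg_block_size - 1) * pg_block_size;
    std::vector<part> parts;
    for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size)
    {
        // Clip the operation to the current block - every block in the range gets exactly one part
        uint64_t begin = offset < stripe ? stripe : offset;
        uint64_t end = (offset + len) > (stripe + pg_block_size)
            ? (stripe + pg_block_size) : (offset + len);
        parts.push_back({ begin, end - begin, (inode + stripe/pg_stripe_size) % pg_count + 1 });
    }
    return parts;
}

int main()
{
    // Example: a 384 KB write at offset 96 KB with 128 KB PG blocks touches 4 blocks
    for (auto & p: slice(1, 96*1024, 384*1024, 128*1024, 128*1024, 16))
    {
        printf("offset=%llu len=%llu pg_num=%llu\n",
            (unsigned long long)p.offset, (unsigned long long)p.len, (unsigned long long)p.pg_num);
    }
    return 0;
}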