From 9b33f598d31227d5eb83aa10c2e2a59b674dfbdc Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sat, 27 Jun 2020 02:13:33 +0300 Subject: [PATCH] Fix two more cluster client bugs 1) Sync could delete an unfinished write due to the lack of ordering (fixed by introducing syncing_writes) 2) Writes could be postponed indefinitely due to bad resuming of operations after a sync --- cluster_client.cpp | 63 ++++++++++++++++++++++++++++++---------------- cluster_client.h | 1 + 2 files changed, 43 insertions(+), 21 deletions(-) diff --git a/cluster_client.cpp b/cluster_client.cpp index 3b88b82a..96721808 100644 --- a/cluster_client.cpp +++ b/cluster_client.cpp @@ -18,6 +18,20 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd else if (unsynced_writes.size()) { // peer_osd just dropped connection + for (auto op: syncing_writes) + { + for (auto & part: op->parts) + { + if (part.osd_num == peer_osd && part.done) + { + // repeat this operation + part.osd_num = 0; + part.done = false; + assert(!part.sent); + op->done_count--; + } + } + } for (auto op: unsynced_writes) { for (auto & part: op->parts) @@ -206,6 +220,10 @@ void cluster_client_t::on_change_hook(json11::Json::object & changes) { op->needs_reslice = true; } + for (auto op: syncing_writes) + { + op->needs_reslice = true; + } pg_count = st_cli.pg_config.size(); if (pg_count) { @@ -281,19 +299,19 @@ void cluster_client_t::execute(cluster_op_t *op) { if (next_writes.size() > 0) { + assert(cur_sync); next_writes.push_back(op); return; } if (queued_bytes >= client_dirty_limit) { // Push an extra SYNC operation to flush previous writes + next_writes.push_back(op); cluster_op_t *sync_op = new cluster_op_t; sync_op->is_internal = true; sync_op->opcode = OSD_OP_SYNC; sync_op->callback = [](cluster_op_t* sync_op) {}; execute_sync(sync_op); - next_writes.push_back(op); - queued_bytes = 0; return; } queued_bytes += op->len; @@ -512,10 +530,8 @@ void cluster_client_t::execute_sync(cluster_op_t *op) op->retval = 0; std::function(op->callback)(op); } - else if (next_writes.size() > 0 || cur_sync != NULL) + else if (cur_sync != NULL) { - // There are some writes postponed to be executed after SYNC - // Push this SYNC after them next_writes.push_back(op); } else @@ -565,6 +581,7 @@ void cluster_client_t::continue_sync() return; } } + syncing_writes.swap(unsynced_writes); // Post sync to affected OSDs cur_sync->parts.resize(sync_osds.size()); int i = 0; @@ -584,6 +601,18 @@ void cluster_client_t::continue_sync() void cluster_client_t::finish_sync() { int retval = cur_sync->retval; + if (retval != 0) + { + for (auto op: syncing_writes) + { + if (op->done_count < op->parts.size()) + { + cur_ops.insert(op); + } + } + unsynced_writes.insert(unsynced_writes.begin(), syncing_writes.begin(), syncing_writes.end()); + syncing_writes.clear(); + } if (retval == -EPIPE) { // Retry later @@ -596,8 +625,9 @@ void cluster_client_t::finish_sync() std::function(cur_sync->callback)(cur_sync); if (!retval) { - for (auto op: unsynced_writes) + for (auto op: syncing_writes) { + assert(op->sent_count == 0); if (op->is_internal) { if (op->buf) @@ -605,24 +635,15 @@ void cluster_client_t::finish_sync() delete op; } } - unsynced_writes.clear(); + syncing_writes.clear(); } cur_sync = NULL; - while (next_writes.size() > 0) + queued_bytes = 0; + std::vector next_wr_copy; + next_wr_copy.swap(next_writes); + for (auto next_op: next_wr_copy) { - if (next_writes[0]->opcode == OSD_OP_SYNC) - { - cur_sync = next_writes[0]; - next_writes.erase(next_writes.begin(), next_writes.begin()+1); - continue_sync(); - } - else - { - auto wr = next_writes[0]; - cur_ops.insert(wr); - next_writes.erase(next_writes.begin(), next_writes.begin()+1); - continue_rw(wr); - } + execute(next_op); } } diff --git a/cluster_client.h b/cluster_client.h index 09349182..ac0c4a86 100644 --- a/cluster_client.h +++ b/cluster_client.h @@ -72,6 +72,7 @@ class cluster_client_t // unsynced operations are copied in memory to allow replay when cluster isn't in the immediate_commit mode // unsynced_writes are replayed in any order (because only the SYNC operation guarantees ordering) std::vector unsynced_writes; + std::vector syncing_writes; cluster_op_t* cur_sync = NULL; std::vector next_writes; std::vector offline_ops;