From d749159585eb2e70d9c143346a14f49ce864827c Mon Sep 17 00:00:00 2001
From: Vitaliy Filippov
Date: Wed, 28 Apr 2021 01:39:27 +0300
Subject: [PATCH] Linked list experiment

Rework client operation queue from a vector to a linked list. This is
required to rework continue_ops() as its current implementation consumes
~25% of client process CPU.
---
 src/cluster_client.cpp | 74 ++++++++++++++++++++++++++++--------------
 src/cluster_client.h   |  5 +--
 2 files changed, 53 insertions(+), 26 deletions(-)

diff --git a/src/cluster_client.cpp b/src/cluster_client.cpp
index 8599aac7..4bf079af 100644
--- a/src/cluster_client.cpp
+++ b/src/cluster_client.cpp
@@ -108,6 +108,20 @@ cluster_op_t::~cluster_op_t()
     }
 }
 
+void cluster_client_t::erase_op(cluster_op_t *op)
+{
+    if (op->prev)
+        op->prev->next = op->next;
+    if (op->next)
+        op->next->prev = op->prev;
+    if (op_queue_head == op)
+        op_queue_head = op->next;
+    if (op_queue_tail == op)
+        op_queue_tail = op->prev;
+    op->next = op->prev = NULL;
+    std::function<void(cluster_op_t*)>(op->callback)(op);
+}
+
 void cluster_client_t::continue_ops(bool up_retry)
 {
     if (!pgs_loaded)
@@ -118,16 +132,14 @@ void cluster_client_t::continue_ops(bool up_retry)
     if (continuing_ops)
     {
         // Attempt to reenter the function
-        continuing_ops = 2;
         return;
     }
 restart:
     continuing_ops = 1;
-    op_queue_pos = 0;
     bool has_flushes = false, has_writes = false;
-    while (op_queue_pos < op_queue.size())
+    for (auto op = op_queue_head; op; )
     {
-        auto op = op_queue[op_queue_pos];
+        cluster_op_t *next_op = op->next;
         bool rm = false, is_flush = op->flags & OP_FLUSH_BUFFER;
         auto opcode = op->opcode;
         if (!op->up_wait || up_retry)
@@ -164,14 +176,7 @@ restart:
             // ...so dirty_writes can't contain anything newer than SYNC
             has_flushes = has_flushes || !rm;
         }
-        if (rm)
-        {
-            op_queue.erase(op_queue.begin()+op_queue_pos, op_queue.begin()+op_queue_pos+1);
-        }
-        else
-        {
-            op_queue_pos++;
-        }
+        op = next_op;
         if (continuing_ops == 2)
         {
             goto restart;
@@ -281,7 +286,7 @@ void cluster_client_t::on_change_hook(std::map<std::string, etcd_kv_t> & changes
         {
             // At this point, all pool operations should have been suspended
             // And now they have to be resliced!
-            for (auto op: op_queue)
+            for (auto op = op_queue_head; op; op = op->next)
             {
                 if ((op->opcode == OSD_OP_WRITE || op->opcode == OSD_OP_READ) &&
                     INODE_POOL(op->cur_inode) == pool_item.first)
@@ -362,7 +367,14 @@ void cluster_client_t::execute(cluster_op_t *op)
             {
                 delete sync_op;
             };
-            op_queue.push_back(sync_op);
+            sync_op->prev = op_queue_tail;
+            if (op_queue_tail)
+            {
+                op_queue_tail->next = sync_op;
+                op_queue_tail = sync_op;
+            }
+            else
+                op_queue_tail = op_queue_head = sync_op;
             dirty_bytes = 0;
             dirty_ops = 0;
         }
@@ -374,7 +386,14 @@ void cluster_client_t::execute(cluster_op_t *op)
         dirty_bytes = 0;
         dirty_ops = 0;
     }
-    op_queue.push_back(op);
+    op->prev = op_queue_tail;
+    if (op_queue_tail)
+    {
+        op_queue_tail->next = op;
+        op_queue_tail = op;
+    }
+    else
+        op_queue_tail = op_queue_head = op;
     continue_ops();
 }
 
@@ -474,11 +493,18 @@ void cluster_client_t::flush_buffer(const object_id & oid, cluster_buffer_t *wr)
         }
         delete op;
     };
-    op_queue.insert(op_queue.begin(), op);
+    op->next = op_queue_head;
+    if (op_queue_head)
+    {
+        op_queue_head->prev = op;
+        op_queue_head = op;
+    }
+    else
+        op_queue_tail = op_queue_head = op;
     if (continuing_ops)
     {
+        // Rescan queue from the beginning
         continuing_ops = 2;
-        op_queue_pos++;
     }
 }
 
@@ -496,7 +522,7 @@ resume_0:
     if (!op->len || op->offset % bs_bitmap_granularity || op->len % bs_bitmap_granularity)
     {
         op->retval = -EINVAL;
-        std::function<void(cluster_op_t*)>(op->callback)(op);
+        erase_op(op);
         return 1;
     }
     {
@@ -504,7 +530,7 @@ resume_0:
         if (!pool_id)
         {
            op->retval = -EINVAL;
-            std::function<void(cluster_op_t*)>(op->callback)(op);
+            erase_op(op);
            return 1;
         }
         if (st_cli.pool_config.find(pool_id) == st_cli.pool_config.end() ||
@@ -520,7 +546,7 @@ resume_0:
         if (ino_it != st_cli.inode_config.end() && ino_it->second.readonly)
         {
            op->retval = -EINVAL;
-            std::function<void(cluster_op_t*)>(op->callback)(op);
+            erase_op(op);
            return 1;
         }
         if (!immediate_commit && !(op->flags & OP_FLUSH_BUFFER))
@@ -603,13 +629,13 @@ resume_3:
             }
         }
         op->retval = op->len;
-        std::function<void(cluster_op_t*)>(op->callback)(op);
+        erase_op(op);
         return 1;
     }
     else if (op->retval != 0 && op->retval != -EPIPE)
     {
         // Fatal error (not -EPIPE)
-        std::function<void(cluster_op_t*)>(op->callback)(op);
+        erase_op(op);
         return 1;
     }
     else
@@ -849,7 +875,7 @@ int cluster_client_t::continue_sync(cluster_op_t *op)
     {
         // Sync is not required in the immediate_commit mode or if there are no dirty_osds
         op->retval = 0;
-        std::function<void(cluster_op_t*)>(op->callback)(op);
+        erase_op(op);
         return 1;
     }
     // Check that all OSD connections are still alive
@@ -924,7 +950,7 @@ resume_1:
             uw_it++;
         }
     }
-    std::function<void(cluster_op_t*)>(op->callback)(op);
+    erase_op(op);
     return 1;
 }
 
diff --git a/src/cluster_client.h b/src/cluster_client.h
index 14b0d474..7c4af1df 100644
--- a/src/cluster_client.h
+++ b/src/cluster_client.h
@@ -47,6 +47,7 @@ protected:
     std::vector<cluster_op_part_t> parts;
     void *bitmap_buf = NULL, *part_bitmaps = NULL;
     unsigned bitmap_buf_size = 0;
+    cluster_op_t *prev = NULL, *next = NULL;
     friend class cluster_client_t;
 };
 
@@ -76,7 +77,7 @@ class cluster_client_t
     int retry_timeout_id = 0;
     uint64_t op_id = 1;
     std::vector<cluster_op_t*> offline_ops;
-    std::vector<cluster_op_t*> op_queue;
+    cluster_op_t *op_queue_head = NULL, *op_queue_tail = NULL;
     std::map<object_id, cluster_buffer_t> dirty_buffers;
     std::set<osd_num_t> dirty_osds;
     uint64_t dirty_bytes = 0, dirty_ops = 0;
@@ -88,7 +89,6 @@ class cluster_client_t
     ring_consumer_t consumer;
     std::vector<std::function<void(void)>> on_ready_hooks;
     int continuing_ops = 0;
-    int op_queue_pos = 0;
 
 public:
     etcd_state_client_t st_cli;
@@ -117,4 +117,5 @@ protected:
     void send_sync(cluster_op_t *op, cluster_op_part_t *part);
     void handle_op_part(cluster_op_part_t *part);
     void copy_part_bitmap(cluster_op_t *op, cluster_op_part_t *part);
+    void erase_op(cluster_op_t *op);
 };
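
Note (illustration only, not part of the patch): the standalone sketch below condenses the
intrusive doubly-linked queue pattern this patch adopts into a minimal program. The op_t and
op_queue_t names are hypothetical and do not exist in the vitastor sources; the real change keeps
prev/next directly inside cluster_op_t and inlines these operations in execute(), flush_buffer()
and erase_op(). Treat it as a model of the technique under those assumptions, not as the actual
client code.

// Minimal sketch of an intrusive doubly-linked operation queue (hypothetical names).
#include <cassert>
#include <cstddef>

struct op_t
{
    // Intrusive links: the operations themselves are the list nodes,
    // so no separate container storage is needed and unlinking is O(1).
    op_t *prev = NULL, *next = NULL;
};

struct op_queue_t
{
    op_t *head = NULL, *tail = NULL;

    // Append at the tail, as execute() does for newly submitted operations.
    void push_back(op_t *op)
    {
        op->prev = tail;
        if (tail)
        {
            tail->next = op;
            tail = op;
        }
        else
            tail = head = op;
    }

    // Prepend at the head, as flush_buffer() does for internal flush writes.
    void push_front(op_t *op)
    {
        op->next = head;
        if (head)
        {
            head->prev = op;
            head = op;
        }
        else
            tail = head = op;
    }

    // Unlink in O(1) regardless of position; the vector-based queue needed an
    // O(n) erase() in the middle of continue_ops() for every finished operation.
    void erase(op_t *op)
    {
        if (op->prev)
            op->prev->next = op->next;
        if (op->next)
            op->next->prev = op->prev;
        if (head == op)
            head = op->next;
        if (tail == op)
            tail = op->prev;
        op->next = op->prev = NULL;
    }
};

int main()
{
    op_queue_t q;
    op_t a, b, c;
    q.push_back(&a);
    q.push_back(&b);
    q.push_front(&c);   // queue is now: c, a, b
    q.erase(&a);        // queue is now: c, b
    assert(q.head == &c && q.head->next == &b && q.tail == &b);
    // Iteration pattern used by the reworked continue_ops():
    // remember next before the current element may be erased.
    for (op_t *op = q.head, *next = NULL; op; op = next)
    {
        next = op->next;
        q.erase(op);
    }
    assert(!q.head && !q.tail);
    return 0;
}

The point of the intrusive layout is that removing a completed operation becomes a constant-time
pointer splice wherever it sits in the queue, whereas the previous std::vector had to shift every
later element and track op_queue_pos across reentrant continue_ops() calls.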