Browse Source

Linked list experiment

Rework client operation queue from a vector to a linked list.
This is required to rework continue_ops() as its current implementation
consumes ~25% of client process CPU.
rdma-zerocopy
Vitaliy Filippov 2 months ago
parent
commit
d749159585
  1. 74
      src/cluster_client.cpp
  2. 5
      src/cluster_client.h

74
src/cluster_client.cpp

@ -108,6 +108,20 @@ cluster_op_t::~cluster_op_t()
}
}
void cluster_client_t::erase_op(cluster_op_t *op)
{
if (op->prev)
op->prev->next = op->next;
if (op->next)
op->next->prev = op->prev;
if (op_queue_head == op)
op_queue_head = op->next;
if (op_queue_tail == op)
op_queue_tail = op->prev;
op->next = op->prev = NULL;
std::function<void(cluster_op_t*)>(op->callback)(op);
}
void cluster_client_t::continue_ops(bool up_retry)
{
if (!pgs_loaded)
@ -118,16 +132,14 @@ void cluster_client_t::continue_ops(bool up_retry)
if (continuing_ops)
{
// Attempt to reenter the function
continuing_ops = 2;
return;
}
restart:
continuing_ops = 1;
op_queue_pos = 0;
bool has_flushes = false, has_writes = false;
while (op_queue_pos < op_queue.size())
for (auto op = op_queue_head; op; )
{
auto op = op_queue[op_queue_pos];
cluster_op_t *next_op = op->next;
bool rm = false, is_flush = op->flags & OP_FLUSH_BUFFER;
auto opcode = op->opcode;
if (!op->up_wait || up_retry)
@ -164,14 +176,7 @@ restart:
// ...so dirty_writes can't contain anything newer than SYNC
has_flushes = has_flushes || !rm;
}
if (rm)
{
op_queue.erase(op_queue.begin()+op_queue_pos, op_queue.begin()+op_queue_pos+1);
}
else
{
op_queue_pos++;
}
op = next_op;
if (continuing_ops == 2)
{
goto restart;
@ -281,7 +286,7 @@ void cluster_client_t::on_change_hook(std::map<std::string, etcd_kv_t> & changes
{
// At this point, all pool operations should have been suspended
// And now they have to be resliced!
for (auto op: op_queue)
for (auto op = op_queue_head; op; op = op->next)
{
if ((op->opcode == OSD_OP_WRITE || op->opcode == OSD_OP_READ) &&
INODE_POOL(op->cur_inode) == pool_item.first)
@ -362,7 +367,14 @@ void cluster_client_t::execute(cluster_op_t *op)
{
delete sync_op;
};
op_queue.push_back(sync_op);
sync_op->prev = op_queue_tail;
if (op_queue_tail)
{
op_queue_tail->next = sync_op;
op_queue_tail = sync_op;
}
else
op_queue_tail = op_queue_head = sync_op;
dirty_bytes = 0;
dirty_ops = 0;
}
@ -374,7 +386,14 @@ void cluster_client_t::execute(cluster_op_t *op)
dirty_bytes = 0;
dirty_ops = 0;
}
op_queue.push_back(op);
op->prev = op_queue_tail;
if (op_queue_tail)
{
op_queue_tail->next = op;
op_queue_tail = op;
}
else
op_queue_tail = op_queue_head = op;
continue_ops();
}
@ -474,11 +493,18 @@ void cluster_client_t::flush_buffer(const object_id & oid, cluster_buffer_t *wr)
}
delete op;
};
op_queue.insert(op_queue.begin(), op);
op->next = op_queue_head;
if (op_queue_head)
{
op_queue_head->prev = op;
op_queue_head = op;
}
else
op_queue_tail = op_queue_head = op;
if (continuing_ops)
{
// Rescan queue from the beginning
continuing_ops = 2;
op_queue_pos++;
}
}
@ -496,7 +522,7 @@ resume_0:
if (!op->len || op->offset % bs_bitmap_granularity || op->len % bs_bitmap_granularity)
{
op->retval = -EINVAL;
std::function<void(cluster_op_t*)>(op->callback)(op);
erase_op(op);
return 1;
}
{
@ -504,7 +530,7 @@ resume_0:
if (!pool_id)
{
op->retval = -EINVAL;
std::function<void(cluster_op_t*)>(op->callback)(op);
erase_op(op);
return 1;
}
if (st_cli.pool_config.find(pool_id) == st_cli.pool_config.end() ||
@ -520,7 +546,7 @@ resume_0:
if (ino_it != st_cli.inode_config.end() && ino_it->second.readonly)
{
op->retval = -EINVAL;
std::function<void(cluster_op_t*)>(op->callback)(op);
erase_op(op);
return 1;
}
if (!immediate_commit && !(op->flags & OP_FLUSH_BUFFER))
@ -603,13 +629,13 @@ resume_3:
}
}
op->retval = op->len;
std::function<void(cluster_op_t*)>(op->callback)(op);
erase_op(op);
return 1;
}
else if (op->retval != 0 && op->retval != -EPIPE)
{
// Fatal error (not -EPIPE)
std::function<void(cluster_op_t*)>(op->callback)(op);
erase_op(op);
return 1;
}
else
@ -849,7 +875,7 @@ int cluster_client_t::continue_sync(cluster_op_t *op)
{
// Sync is not required in the immediate_commit mode or if there are no dirty_osds
op->retval = 0;
std::function<void(cluster_op_t*)>(op->callback)(op);
erase_op(op);
return 1;
}
// Check that all OSD connections are still alive
@ -924,7 +950,7 @@ resume_1:
uw_it++;
}
}
std::function<void(cluster_op_t*)>(op->callback)(op);
erase_op(op);
return 1;
}

5
src/cluster_client.h

@ -47,6 +47,7 @@ protected:
std::vector<cluster_op_part_t> parts;
void *bitmap_buf = NULL, *part_bitmaps = NULL;
unsigned bitmap_buf_size = 0;
cluster_op_t *prev = NULL, *next = NULL;
friend class cluster_client_t;
};
@ -76,7 +77,7 @@ class cluster_client_t
int retry_timeout_id = 0;
uint64_t op_id = 1;
std::vector<cluster_op_t*> offline_ops;
std::vector<cluster_op_t*> op_queue;
cluster_op_t *op_queue_head = NULL, *op_queue_tail = NULL;
std::map<object_id, cluster_buffer_t> dirty_buffers;
std::set<osd_num_t> dirty_osds;
uint64_t dirty_bytes = 0, dirty_ops = 0;
@ -88,7 +89,6 @@ class cluster_client_t
ring_consumer_t consumer;
std::vector<std::function<void(void)>> on_ready_hooks;
int continuing_ops = 0;
int op_queue_pos = 0;
public:
etcd_state_client_t st_cli;
@ -117,4 +117,5 @@ protected:
void send_sync(cluster_op_t *op, cluster_op_part_t *part);
void handle_op_part(cluster_op_part_t *part);
void copy_part_bitmap(cluster_op_t *op, cluster_op_part_t *part);
void erase_op(cluster_op_t *op);
};
Loading…
Cancel
Save