diff --git a/src/cluster_client.cpp b/src/cluster_client.cpp index 4bf079af..dea96649 100644 --- a/src/cluster_client.cpp +++ b/src/cluster_client.cpp @@ -108,8 +108,101 @@ cluster_op_t::~cluster_op_t() } } +void cluster_client_t::calc_wait(cluster_op_t *op) +{ + op->prev_wait = 0; + if (op->opcode == OSD_OP_WRITE) + { + for (auto prev = op->prev; prev; prev = prev->prev) + { + if (prev->opcode == OSD_OP_SYNC || + prev->opcode == OSD_OP_WRITE && !(op->flags & OP_FLUSH_BUFFER) && (prev->flags & OP_FLUSH_BUFFER)) + { + op->prev_wait++; + } + } + if (!op->prev_wait && pgs_loaded) + continue_rw(op); + } + else if (op->opcode == OSD_OP_SYNC) + { + for (auto prev = op->prev; prev; prev = prev->prev) + { + if (prev->opcode == OSD_OP_SYNC || prev->opcode == OSD_OP_WRITE) + { + op->prev_wait++; + } + } + if (!op->prev_wait && pgs_loaded) + continue_sync(op); + } + else + { + for (auto prev = op->prev; prev; prev = prev->prev) + { + if (prev->opcode == OSD_OP_WRITE && prev->flags & OP_FLUSH_BUFFER) + { + op->prev_wait++; + } + else if (prev->opcode == OSD_OP_WRITE || prev->opcode == OSD_OP_READ) + { + // Flushes are always in the beginning + break; + } + } + if (!op->prev_wait && pgs_loaded) + continue_rw(op); + } +} + +void cluster_client_t::inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *next, int inc) +{ + if (opcode == OSD_OP_WRITE) + { + while (next) + { + auto n2 = next->next; + if (next->opcode == OSD_OP_SYNC || + next->opcode == OSD_OP_WRITE && (flags & OP_FLUSH_BUFFER) && !(next->flags & OP_FLUSH_BUFFER) || + next->opcode == OSD_OP_READ && (flags & OP_FLUSH_BUFFER)) + { + next->prev_wait += inc; + if (!next->prev_wait) + { + if (next->opcode == OSD_OP_SYNC) + continue_sync(next); + else + continue_rw(next); + } + } + next = n2; + } + } + else if (opcode == OSD_OP_SYNC) + { + while (next) + { + auto n2 = next->next; + if (next->opcode == OSD_OP_SYNC || next->opcode == OSD_OP_WRITE) + { + next->prev_wait += inc; + if (!next->prev_wait) + { + if (next->opcode == OSD_OP_SYNC) + continue_sync(next); + else + continue_rw(next); + } + } + next = n2; + } + } +} + void cluster_client_t::erase_op(cluster_op_t *op) { + uint64_t opcode = op->opcode, flags = op->flags; + cluster_op_t *next = op->next; if (op->prev) op->prev->next = op->next; if (op->next) @@ -120,6 +213,8 @@ void cluster_client_t::erase_op(cluster_op_t *op) op_queue_tail = op->prev; op->next = op->prev = NULL; std::function(op->callback)(op); + if (!immediate_commit) + inc_wait(opcode, flags, next, -1); } void cluster_client_t::continue_ops(bool up_retry) @@ -136,45 +231,19 @@ void cluster_client_t::continue_ops(bool up_retry) } restart: continuing_ops = 1; - bool has_flushes = false, has_writes = false; for (auto op = op_queue_head; op; ) { cluster_op_t *next_op = op->next; - bool rm = false, is_flush = op->flags & OP_FLUSH_BUFFER; - auto opcode = op->opcode; if (!op->up_wait || up_retry) { op->up_wait = false; - if (opcode == OSD_OP_READ || opcode == OSD_OP_WRITE) + if (!op->prev_wait) { - if (is_flush || !has_flushes) - { - // Regular writes can't proceed before buffer flushes - rm = continue_rw(op); - } + if (op->opcode == OSD_OP_SYNC) + continue_sync(op); + else + continue_rw(op); } - else if (opcode == OSD_OP_SYNC) - { - if (!has_writes) - { - // SYNC can't proceed before previous writes - rm = continue_sync(op); - } - } - } - if (opcode == OSD_OP_WRITE) - { - has_writes = has_writes || !rm; - if (is_flush) - { - has_flushes = has_flushes || !rm; - } - } - else if (opcode == OSD_OP_SYNC) - { - // Postpone writes until previous SYNC completes - // ...so dirty_writes can't contain anything newer than SYNC - has_flushes = has_flushes || !rm; } op = next_op; if (continuing_ops == 2) @@ -218,11 +287,8 @@ void cluster_client_t::on_load_config_hook(json11::Json::object & config) { throw std::runtime_error("Bad block size"); } - if (config["immediate_commit"] == "all") - { - // Cluster-wide immediate_commit mode - immediate_commit = true; - } + // Cluster-wide immediate_commit mode + immediate_commit = (config["immediate_commit"] == "all"); if (config.find("client_max_dirty_bytes") != config.end()) { client_max_dirty_bytes = config["client_max_dirty_bytes"].uint64_value(); @@ -377,6 +443,7 @@ void cluster_client_t::execute(cluster_op_t *op) op_queue_tail = op_queue_head = sync_op; dirty_bytes = 0; dirty_ops = 0; + calc_wait(sync_op); } dirty_bytes += op->len; dirty_ops++; @@ -394,7 +461,15 @@ void cluster_client_t::execute(cluster_op_t *op) } else op_queue_tail = op_queue_head = op; - continue_ops(); + if (!immediate_commit) + calc_wait(op); + else if (pgs_loaded) + { + if (op->opcode == OSD_OP_SYNC) + continue_sync(op); + else + continue_rw(op); + } } void cluster_client_t::copy_write(cluster_op_t *op, std::map & dirty_buffers) @@ -501,11 +576,8 @@ void cluster_client_t::flush_buffer(const object_id & oid, cluster_buffer_t *wr) } else op_queue_tail = op_queue_head = op; - if (continuing_ops) - { - // Rescan queue from the beginning - continuing_ops = 2; - } + inc_wait(op->opcode, op->flags, op->next, 1); + continue_rw(op); } int cluster_client_t::continue_rw(cluster_op_t *op) @@ -1034,7 +1106,10 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part) } if (op->inflight_count == 0) { - continue_ops(); + if (op->opcode == OSD_OP_SYNC) + continue_sync(op); + else + continue_rw(op); } } diff --git a/src/cluster_client.h b/src/cluster_client.h index 7c4af1df..dcf3db32 100644 --- a/src/cluster_client.h +++ b/src/cluster_client.h @@ -36,7 +36,7 @@ struct cluster_op_t std::function callback; ~cluster_op_t(); protected: - int flags = 0; + uint64_t flags = 0; int state = 0; uint64_t cur_inode; // for snapshot reads void *buf = NULL; @@ -48,6 +48,7 @@ protected: void *bitmap_buf = NULL, *part_bitmaps = NULL; unsigned bitmap_buf_size = 0; cluster_op_t *prev = NULL, *next = NULL; + int prev_wait = 0; friend class cluster_client_t; }; @@ -67,7 +68,8 @@ class cluster_client_t uint64_t bs_block_size = 0; uint32_t bs_bitmap_granularity = 0, bs_bitmap_size = 0; std::map pg_counts; - bool immediate_commit = false; + // WARNING: initially true so execute() doesn't create fake sync + bool immediate_commit = true; // FIXME: Implement inmemory_commit mode. Note that it requires to return overlapping reads from memory. uint64_t client_max_dirty_bytes = 0; uint64_t client_max_dirty_ops = 0; @@ -118,4 +120,6 @@ protected: void handle_op_part(cluster_op_part_t *part); void copy_part_bitmap(cluster_op_t *op, cluster_op_part_t *part); void erase_op(cluster_op_t *op); + void calc_wait(cluster_op_t *op); + void inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *next, int inc); };