From 95c55da0ad638651065dbaeb151e419ffd18d5c9 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sun, 1 Aug 2021 20:06:01 +0300 Subject: [PATCH] Implement merge with CAS --- src/cmd.cpp | 180 ++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 125 insertions(+), 55 deletions(-) diff --git a/src/cmd.cpp b/src/cmd.cpp index c85784e0..7fb83af5 100644 --- a/src/cmd.cpp +++ b/src/cmd.cpp @@ -94,8 +94,8 @@ public: "USAGE:\n" " %s rm [--etcd_address ] --pool --inode \n" " [--wait-list] [--iodepth 32] [--parallel_osds 4] [--progress 1]\n" - " %s merge [--etcd_address ] [--target ]\n" - " [--iodepth 128] [--progress 1]\n", + " %s merge [--etcd_address ] [--target ]\n" + " [--iodepth 128] [--progress 1] [--cas 0|1]\n", exe_name, exe_name ); exit(0); @@ -237,6 +237,15 @@ struct rm_inode_t } }; +struct snap_rw_op_t +{ + uint64_t offset = 0; + void *buf = NULL; + cluster_op_t op; + int todo = 0; + uint32_t start = 0, end = 0; +}; + // Layer merge is the base for multiple operations: // 1) Delete snapshot "up" = merge child layer into the parent layer, remove the child // and rename the parent to the child @@ -257,6 +266,8 @@ struct snap_merger_t int target_rank; // delete merged source inode data during merge bool delete_source = false; + // use CAS writes (0 = never, 1 = auto, 2 = always) + int use_cas = 1; // don't necessarily delete source data, but perform checks as if we were to do it bool check_delete_source = false; // interval between fsyncs @@ -340,6 +351,7 @@ struct snap_merger_t target = target_cfg->num; target_rank = sources.at(target); int to_rank = sources.at(to_cfg->num); + bool to_has_children = false; // Check that there are no other inodes dependent on altered layers // // 1) everything between and except is not allowed @@ -373,11 +385,24 @@ struct snap_merger_t ); exit(1); } + if (parent_rank >= to_rank) + { + to_has_children = true; + } } } } + if ((target_rank < to_rank || to_has_children) && use_cas == 1) + { + // has children itself, no need for CAS + use_cas = 0; + } sources.erase(target); - printf("Merging %ld layers into target inode %lx\n", sources.size(), target); + printf( + "Merging %ld layer(s) into target %s%s (inode %lx)\n", + sources.size(), target_cfg->name.c_str(), + use_cas ? " online (with CAS)" : "", target + ); target_block_size = get_block_size(target); continue_merge_reent(); } @@ -528,7 +553,8 @@ struct snap_merger_t } if (status & INODE_LIST_DONE) { - printf("Got listing of inode %lx\n", src); + auto & name = parent->cli->st_cli.inode_config.at(src).name; + printf("Got listing of layer %s (inode %lx)\n", name.c_str(), src); if (delete_source) { // Sort the inode listing @@ -589,69 +615,112 @@ struct snap_merger_t // from all layers except after fsync'ing void read_and_write(uint64_t offset) { - void *buf = malloc(target_block_size); - cluster_op_t *op = new cluster_op_t; + snap_rw_op_t *rwo = new snap_rw_op_t; + // Initialize counter to 1 to later allow write_subop() to return immediately + // (even though it shouldn't really do that) + rwo->todo = 1; + rwo->buf = malloc(target_block_size); + rwo->offset = offset; + rwo_read(rwo); + } + + void rwo_read(snap_rw_op_t *rwo) + { + cluster_op_t *op = &rwo->op; op->opcode = OSD_OP_READ; op->inode = target; - op->offset = offset; + op->offset = rwo->offset; op->len = target_block_size; - op->iov.push_back(buf, target_block_size); - op->callback = [this](cluster_op_t *op) + op->iov.push_back(rwo->buf, target_block_size); + op->callback = [this, rwo](cluster_op_t *op) { - // Write each non-empty range using an individual operation - // FIXME: Allow to use a single write with bitmap (OSDs don't allow it yet) - uint32_t gran = parent->cli->get_bs_bitmap_granularity(); - uint64_t bitmap_size = target_block_size/gran; - uint32_t start = 0, end = 0; - int i; - // Track pending subops allowing write_subop() to return immediately (just in case) - op->version = bitmap_size; - for (i = 0; i < bitmap_size; i++) + if (op->retval != op->len) { - auto bit = ((*(uint8_t*)(op->bitmap_buf + (i >> 3))) & (1 << (i & 0x7))); - if (!bit && end > start) - { - // write end->start - op->version++; - write_subop(op, start, end); - } - end += gran; - if (!bit) - { - start = end; - } + fprintf(stderr, "error reading target at offset %lx: %s\n", op->offset, strerror(-op->retval)); + exit(1); } - if (end > start) - { - // write end->start - op->version++; - write_subop(op, start, end); - } - op->version -= bitmap_size; - // Just in case - autofree_op(op); + next_write(rwo); }; parent->cli->execute(op); } - void write_subop(cluster_op_t *op, uint32_t start, uint32_t end) + void next_write(snap_rw_op_t *rwo) + { + // Write each non-empty range using an individual operation + // FIXME: Allow to use single write with "holes" (OSDs don't allow it yet) + uint32_t gran = parent->cli->get_bs_bitmap_granularity(); + uint64_t bitmap_size = target_block_size / gran; + while (rwo->end < bitmap_size) + { + auto bit = ((*(uint8_t*)(rwo->op.bitmap_buf + (rwo->end >> 3))) & (1 << (rwo->end & 0x7))); + if (!bit) + { + if (rwo->end > rwo->start) + { + // write start->end + rwo->todo++; + write_subop(rwo, rwo->start*gran, rwo->end*gran, use_cas ? 1+rwo->op.version : 0); + rwo->start = rwo->end; + if (use_cas) + { + // Submit one by one if using CAS writes + return; + } + } + rwo->start = rwo->end = rwo->end+1; + } + else + { + rwo->end++; + } + } + if (rwo->end > rwo->start) + { + // write start->end + rwo->todo++; + write_subop(rwo, rwo->start*gran, rwo->end*gran, use_cas ? 1+rwo->op.version : 0); + rwo->start = rwo->end; + if (use_cas) + { + return; + } + } + rwo->todo--; + // Just in case, if everything is done + autofree_op(rwo); + } + + void write_subop(snap_rw_op_t *rwo, uint32_t start, uint32_t end, uint64_t version) { - void *buf = op->iov.buf[0].iov_base; cluster_op_t *subop = new cluster_op_t; subop->opcode = OSD_OP_WRITE; subop->inode = target; - subop->offset = op->offset+start; + subop->offset = rwo->offset+start; subop->len = end-start; - subop->iov.push_back(buf+start, end-start); - subop->callback = [this, op](cluster_op_t *subop) + subop->version = version; + subop->iov.push_back(rwo->buf+start, end-start); + subop->callback = [this, rwo](cluster_op_t *subop) { + rwo->todo--; if (subop->retval != subop->len) { + if (use_cas && subop->retval == -EINTR) + { + // CAS failure - reread and repeat optimistically + rwo->start = subop->offset - rwo->offset; + rwo_read(rwo); + delete subop; + return; + } fprintf(stderr, "error writing target at offset %lx: %s\n", subop->offset, strerror(-subop->retval)); exit(1); } - op->version--; - autofree_op(op); + // Increment CAS version + rwo->op.version++; + if (use_cas) + next_write(rwo); + else + autofree_op(rwo); delete subop; }; parent->cli->execute(subop); @@ -675,13 +744,13 @@ struct snap_merger_t parent->cli->execute(subop); } - void autofree_op(cluster_op_t *op) + void autofree_op(snap_rw_op_t *rwo) { - if (!op->version) + if (!rwo->todo) { - if (last_written_offset < op->offset+target_block_size) + if (last_written_offset < rwo->op.offset+target_block_size) { - last_written_offset = op->offset+target_block_size; + last_written_offset = rwo->op.offset+target_block_size; } if (delete_source) { @@ -712,9 +781,8 @@ struct snap_merger_t parent->cli->execute(subop); } } - void *buf = op->iov.buf[0].iov_base; - free(buf); - delete op; + free(rwo->buf); + delete rwo; in_flight--; continue_merge_reent(); } @@ -747,7 +815,7 @@ void cli_tool_t::run(json11::Json cfg) } else if (cmd[0] == "merge") { - // Merge layers + // Merge layer data without affecting metadata merger = new snap_merger_t(); merger->parent = this; merger->from_name = cmd[1].string_value(); @@ -758,10 +826,12 @@ void cli_tool_t::run(json11::Json cfg) fprintf(stderr, "Beginning or end of the merge sequence is missing\n"); exit(1); } - merger->delete_source = cfg["delete"].string_value() != ""; + merger->delete_source = cfg["delete-source"].string_value() != ""; merger->fsync_interval = cfg["fsync-interval"].uint64_value(); if (!merger->fsync_interval) merger->fsync_interval = 128; + if (!cfg["cas"].is_null()) + merger->use_cas = cfg["cas"].uint64_value() ? 2 : 0; } else {