Browse Source

Implement merge with CAS

listing
Vitaliy Filippov 2 months ago
parent
commit
95c55da0ad
  1. 176
      src/cmd.cpp

176
src/cmd.cpp

@ -94,8 +94,8 @@ public:
"USAGE:\n"
" %s rm [--etcd_address <etcd_address>] --pool <pool> --inode <inode>\n"
" [--wait-list] [--iodepth 32] [--parallel_osds 4] [--progress 1]\n"
" %s merge [--etcd_address <etcd_address>] <from> <to> [--target <name>]\n"
" [--iodepth 128] [--progress 1]\n",
" %s merge [--etcd_address <etcd_address>] <from> <to> [--target <from>]\n"
" [--iodepth 128] [--progress 1] [--cas 0|1]\n",
exe_name, exe_name
);
exit(0);
@ -237,6 +237,15 @@ struct rm_inode_t
}
};
struct snap_rw_op_t
{
uint64_t offset = 0;
void *buf = NULL;
cluster_op_t op;
int todo = 0;
uint32_t start = 0, end = 0;
};
// Layer merge is the base for multiple operations:
// 1) Delete snapshot "up" = merge child layer into the parent layer, remove the child
// and rename the parent to the child
@ -257,6 +266,8 @@ struct snap_merger_t
int target_rank;
// delete merged source inode data during merge
bool delete_source = false;
// use CAS writes (0 = never, 1 = auto, 2 = always)
int use_cas = 1;
// don't necessarily delete source data, but perform checks as if we were to do it
bool check_delete_source = false;
// interval between fsyncs
@ -340,6 +351,7 @@ struct snap_merger_t
target = target_cfg->num;
target_rank = sources.at(target);
int to_rank = sources.at(to_cfg->num);
bool to_has_children = false;
// Check that there are no other inodes dependent on altered layers
//
// 1) everything between <target> and <to> except <to> is not allowed
@ -373,11 +385,24 @@ struct snap_merger_t
);
exit(1);
}
if (parent_rank >= to_rank)
{
to_has_children = true;
}
}
}
}
if ((target_rank < to_rank || to_has_children) && use_cas == 1)
{
// <to> has children itself, no need for CAS
use_cas = 0;
}
sources.erase(target);
printf("Merging %ld layers into target inode %lx\n", sources.size(), target);
printf(
"Merging %ld layer(s) into target %s%s (inode %lx)\n",
sources.size(), target_cfg->name.c_str(),
use_cas ? " online (with CAS)" : "", target
);
target_block_size = get_block_size(target);
continue_merge_reent();
}
@ -528,7 +553,8 @@ struct snap_merger_t
}
if (status & INODE_LIST_DONE)
{
printf("Got listing of inode %lx\n", src);
auto & name = parent->cli->st_cli.inode_config.at(src).name;
printf("Got listing of layer %s (inode %lx)\n", name.c_str(), src);
if (delete_source)
{
// Sort the inode listing
@ -589,69 +615,112 @@ struct snap_merger_t
// from all layers except <target> after fsync'ing
void read_and_write(uint64_t offset)
{
void *buf = malloc(target_block_size);
cluster_op_t *op = new cluster_op_t;
snap_rw_op_t *rwo = new snap_rw_op_t;
// Initialize counter to 1 to later allow write_subop() to return immediately
// (even though it shouldn't really do that)
rwo->todo = 1;
rwo->buf = malloc(target_block_size);
rwo->offset = offset;
rwo_read(rwo);
}
void rwo_read(snap_rw_op_t *rwo)
{
cluster_op_t *op = &rwo->op;
op->opcode = OSD_OP_READ;
op->inode = target;
op->offset = offset;
op->offset = rwo->offset;
op->len = target_block_size;
op->iov.push_back(buf, target_block_size);
op->callback = [this](cluster_op_t *op)
op->iov.push_back(rwo->buf, target_block_size);
op->callback = [this, rwo](cluster_op_t *op)
{
// Write each non-empty range using an individual operation
// FIXME: Allow to use a single write with bitmap (OSDs don't allow it yet)
uint32_t gran = parent->cli->get_bs_bitmap_granularity();
uint64_t bitmap_size = target_block_size/gran;
uint32_t start = 0, end = 0;
int i;
// Track pending subops allowing write_subop() to return immediately (just in case)
op->version = bitmap_size;
for (i = 0; i < bitmap_size; i++)
if (op->retval != op->len)
{
auto bit = ((*(uint8_t*)(op->bitmap_buf + (i >> 3))) & (1 << (i & 0x7)));
if (!bit && end > start)
{
// write end->start
op->version++;
write_subop(op, start, end);
}
end += gran;
if (!bit)
fprintf(stderr, "error reading target at offset %lx: %s\n", op->offset, strerror(-op->retval));
exit(1);
}
next_write(rwo);
};
parent->cli->execute(op);
}
void next_write(snap_rw_op_t *rwo)
{
// Write each non-empty range using an individual operation
// FIXME: Allow to use single write with "holes" (OSDs don't allow it yet)
uint32_t gran = parent->cli->get_bs_bitmap_granularity();
uint64_t bitmap_size = target_block_size / gran;
while (rwo->end < bitmap_size)
{
auto bit = ((*(uint8_t*)(rwo->op.bitmap_buf + (rwo->end >> 3))) & (1 << (rwo->end & 0x7)));
if (!bit)
{
if (rwo->end > rwo->start)
{
start = end;
// write start->end
rwo->todo++;
write_subop(rwo, rwo->start*gran, rwo->end*gran, use_cas ? 1+rwo->op.version : 0);
rwo->start = rwo->end;
if (use_cas)
{
// Submit one by one if using CAS writes
return;
}
}
rwo->start = rwo->end = rwo->end+1;
}
if (end > start)
else
{
// write end->start
op->version++;
write_subop(op, start, end);
rwo->end++;
}
op->version -= bitmap_size;
// Just in case
autofree_op(op);
};
parent->cli->execute(op);
}
if (rwo->end > rwo->start)
{
// write start->end
rwo->todo++;
write_subop(rwo, rwo->start*gran, rwo->end*gran, use_cas ? 1+rwo->op.version : 0);
rwo->start = rwo->end;
if (use_cas)
{
return;
}
}
rwo->todo--;
// Just in case, if everything is done
autofree_op(rwo);
}
void write_subop(cluster_op_t *op, uint32_t start, uint32_t end)
void write_subop(snap_rw_op_t *rwo, uint32_t start, uint32_t end, uint64_t version)
{
void *buf = op->iov.buf[0].iov_base;
cluster_op_t *subop = new cluster_op_t;
subop->opcode = OSD_OP_WRITE;
subop->inode = target;
subop->offset = op->offset+start;
subop->offset = rwo->offset+start;
subop->len = end-start;
subop->iov.push_back(buf+start, end-start);
subop->callback = [this, op](cluster_op_t *subop)
subop->version = version;
subop->iov.push_back(rwo->buf+start, end-start);
subop->callback = [this, rwo](cluster_op_t *subop)
{
rwo->todo--;
if (subop->retval != subop->len)
{
if (use_cas && subop->retval == -EINTR)
{
// CAS failure - reread and repeat optimistically
rwo->start = subop->offset - rwo->offset;
rwo_read(rwo);
delete subop;
return;
}
fprintf(stderr, "error writing target at offset %lx: %s\n", subop->offset, strerror(-subop->retval));
exit(1);
}
op->version--;
autofree_op(op);
// Increment CAS version
rwo->op.version++;
if (use_cas)
next_write(rwo);
else
autofree_op(rwo);
delete subop;
};
parent->cli->execute(subop);
@ -675,13 +744,13 @@ struct snap_merger_t
parent->cli->execute(subop);
}
void autofree_op(cluster_op_t *op)
void autofree_op(snap_rw_op_t *rwo)
{
if (!op->version)
if (!rwo->todo)
{
if (last_written_offset < op->offset+target_block_size)
if (last_written_offset < rwo->op.offset+target_block_size)
{
last_written_offset = op->offset+target_block_size;
last_written_offset = rwo->op.offset+target_block_size;
}
if (delete_source)
{
@ -712,9 +781,8 @@ struct snap_merger_t
parent->cli->execute(subop);
}
}
void *buf = op->iov.buf[0].iov_base;
free(buf);
delete op;
free(rwo->buf);
delete rwo;
in_flight--;
continue_merge_reent();
}
@ -747,7 +815,7 @@ void cli_tool_t::run(json11::Json cfg)
}
else if (cmd[0] == "merge")
{
// Merge layers
// Merge layer data without affecting metadata
merger = new snap_merger_t();
merger->parent = this;
merger->from_name = cmd[1].string_value();
@ -758,10 +826,12 @@ void cli_tool_t::run(json11::Json cfg)
fprintf(stderr, "Beginning or end of the merge sequence is missing\n");
exit(1);
}
merger->delete_source = cfg["delete"].string_value() != "";
merger->delete_source = cfg["delete-source"].string_value() != "";
merger->fsync_interval = cfg["fsync-interval"].uint64_value();
if (!merger->fsync_interval)
merger->fsync_interval = 128;
if (!cfg["cas"].is_null())
merger->use_cas = cfg["cas"].uint64_value() ? 2 : 0;
}
else
{

Loading…
Cancel
Save