diff --git a/README-ru.md b/README-ru.md index 70ba4f2c..492a4681 100644 --- a/README-ru.md +++ b/README-ru.md @@ -40,7 +40,7 @@ Vitastor на данный момент находится в статусе п - Драйвер диска для QEMU (собирается вне дерева исходников QEMU) - Драйвер диска для утилиты тестирования производительности fio (также собирается вне дерева исходников fio) - NBD-прокси для монтирования образов ядром ("блочное устройство в режиме пользователя") -- Утилита удаления образов/инодов (vitastor-rm) +- Утилита для удаления образов/инодов (vitastor-cmd rm) - Пакеты для Debian и CentOS - Статистика операций ввода/вывода и занятого места в разрезе инодов - Именование инодов через хранение их метаданных в etcd @@ -49,6 +49,7 @@ Vitastor на данный момент находится в статусе п - Поддержка RDMA/RoCEv2 через libibverbs - CSI-плагин для Kubernetes - Базовая поддержка OpenStack: драйвер Cinder, патчи для Nova и libvirt +- Слияние снапшотов (vitastor-cmd merge) ## Планы развития @@ -491,10 +492,10 @@ qemu-system-x86_64 -enable-kvm -m 1024 ### Удалить образ -Используйте утилиту vitastor-rm. Например: +Используйте утилиту vitastor-cmd rm. Например: ``` -vitastor-rm --etcd_address 10.115.0.10:2379/v3 --pool 1 --inode 1 --parallel_osds 16 --iodepth 32 +vitastor-cmd rm --etcd_address 10.115.0.10:2379/v3 --pool 1 --inode 1 --parallel_osds 16 --iodepth 32 ``` ### NBD diff --git a/README.md b/README.md index 5f73eb0d..7ccbf072 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ breaking changes in the future. However, the following is implemented: - QEMU driver (built out-of-tree) - Loadable fio engine for benchmarks (also built out-of-tree) - NBD proxy for kernel mounts -- Inode removal tool (vitastor-rm) +- Inode removal tool (vitastor-cmd rm) - Packaging for Debian and CentOS - Per-inode I/O and space usage statistics - Inode metadata storage in etcd @@ -43,6 +43,7 @@ breaking changes in the future. However, the following is implemented: - RDMA/RoCEv2 support via libibverbs - CSI plugin for Kubernetes - Basic OpenStack support: Cinder driver, Nova and libvirt patches +- Snapshot merge tool (vitastor-cmd merge) ## Roadmap diff --git a/csi/src/controllerserver.go b/csi/src/controllerserver.go index 78b199bf..261a5f54 100644 --- a/csi/src/controllerserver.go +++ b/csi/src/controllerserver.go @@ -354,9 +354,9 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return nil, status.Error(codes.Internal, "invalid "+inodeCfgKey+" key in etcd: "+err.Error()) } - // Delete inode data by invoking vitastor-rm + // Delete inode data by invoking vitastor-cmd args := []string{ - "--etcd_address", strings.Join(etcdUrl, ","), + "rm", "--etcd_address", strings.Join(etcdUrl, ","), "--pool", fmt.Sprintf("%d", idx.PoolId), "--inode", fmt.Sprintf("%d", idx.Id), } @@ -364,7 +364,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol { args = append(args, "--config_path", ctxVars["configPath"]) } - c := exec.Command("/usr/bin/vitastor-rm", args...) + c := exec.Command("/usr/bin/vitastor-cmd", args...) var stderr bytes.Buffer c.Stdout = nil c.Stderr = &stderr @@ -372,7 +372,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol stderrStr := string(stderr.Bytes()) if (err != nil) { - klog.Errorf("vitastor-rm failed: %s, status %s\n", stderrStr, err) + klog.Errorf("vitastor-cmd rm failed: %s, status %s\n", stderrStr, err) return nil, status.Error(codes.Internal, stderrStr+" (status "+err.Error()+")") } diff --git a/patches/cinder-vitastor.py b/patches/cinder-vitastor.py index 8284db88..2042ebdf 100644 --- a/patches/cinder-vitastor.py +++ b/patches/cinder-vitastor.py @@ -514,7 +514,7 @@ class VitastorDriver(driver.CloneableImageVD, # Clear data for kv in layers: args = [ - 'vitastor-rm', '--pool', str(kv['value']['pool_id']), + 'vitastor-cmd', 'rm', '--pool', str(kv['value']['pool_id']), '--inode', str(kv['value']['id']), '--progress', '0', *(self._vitastor_args()) ] diff --git a/rpm/vitastor-el7.spec b/rpm/vitastor-el7.spec index 7e8bd8ec..b3594cc8 100644 --- a/rpm/vitastor-el7.spec +++ b/rpm/vitastor-el7.spec @@ -57,6 +57,7 @@ cp -r mon %buildroot/usr/lib/vitastor/mon %_bindir/vitastor-dump-journal %_bindir/vitastor-nbd %_bindir/vitastor-osd +%_bindir/vitastor-cmd %_bindir/vitastor-rm %_libdir/qemu-kvm/block-vitastor.so %_libdir/libfio_vitastor.so diff --git a/rpm/vitastor-el8.spec b/rpm/vitastor-el8.spec index bdacb57f..607ede29 100644 --- a/rpm/vitastor-el8.spec +++ b/rpm/vitastor-el8.spec @@ -54,6 +54,7 @@ cp -r mon %buildroot/usr/lib/vitastor %_bindir/vitastor-dump-journal %_bindir/vitastor-nbd %_bindir/vitastor-osd +%_bindir/vitastor-cmd %_bindir/vitastor-rm %_libdir/qemu-kvm/block-vitastor.so %_libdir/libfio_vitastor.so diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 0f8e70de..c108831b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -36,6 +36,11 @@ string(REGEX REPLACE "([\\/\\-]D) *NDEBUG" "" CMAKE_C_FLAGS_RELEASE "${CMAKE_C_F string(REGEX REPLACE "([\\/\\-]D) *NDEBUG" "" CMAKE_C_FLAGS_MINSIZEREL "${CMAKE_C_FLAGS_MINSIZEREL}") string(REGEX REPLACE "([\\/\\-]D) *NDEBUG" "" CMAKE_C_FLAGS_RELWITHDEBINFO "${CMAKE_C_FLAGS_RELWITHDEBINFO}") +macro(install_symlink filepath sympath) + install(CODE "execute_process(COMMAND ${CMAKE_COMMAND} -E create_symlink ${filepath} ${sympath})") + install(CODE "message(\"-- Created symlink: ${sympath} -> ${filepath}\")") +endmacro(install_symlink) + find_package(PkgConfig) pkg_check_modules(LIBURING REQUIRED liburing) if (${WITH_QEMU}) @@ -146,11 +151,11 @@ target_link_libraries(vitastor-nbd vitastor_client ) -# vitastor-rm -add_executable(vitastor-rm - rm_inode.cpp +# vitastor-cmd +add_executable(vitastor-cmd + cmd.cpp ) -target_link_libraries(vitastor-rm +target_link_libraries(vitastor-cmd vitastor_client ) @@ -235,7 +240,8 @@ target_include_directories(test_cluster_client PUBLIC ${CMAKE_SOURCE_DIR}/src/mo ### Install -install(TARGETS vitastor-osd vitastor-dump-journal vitastor-nbd vitastor-rm RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}) +install(TARGETS vitastor-osd vitastor-dump-journal vitastor-nbd vitastor-cmd RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}) +install_symlink(${CMAKE_INSTALL_BINDIR}/vitastor-rm vitastor-cmd) install( TARGETS vitastor_blk vitastor_client LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} diff --git a/src/cluster_client.cpp b/src/cluster_client.cpp index 772310d4..b6e077ee 100644 --- a/src/cluster_client.cpp +++ b/src/cluster_client.cpp @@ -140,7 +140,7 @@ void cluster_client_t::calc_wait(cluster_op_t *op) if (!op->prev_wait && pgs_loaded) continue_sync(op); } - else + else /* if (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_READ_BITMAP) */ { for (auto prev = op->prev; prev; prev = prev->prev) { @@ -148,7 +148,7 @@ void cluster_client_t::calc_wait(cluster_op_t *op) { op->prev_wait++; } - else if (prev->opcode == OSD_OP_WRITE || prev->opcode == OSD_OP_READ) + else if (prev->opcode == OSD_OP_WRITE || prev->opcode == OSD_OP_READ || prev->opcode == OSD_OP_READ_BITMAP) { // Flushes are always in the beginning break; @@ -168,7 +168,7 @@ void cluster_client_t::inc_wait(uint64_t opcode, uint64_t flags, cluster_op_t *n auto n2 = next->next; if (next->opcode == OSD_OP_SYNC || next->opcode == OSD_OP_WRITE && (flags & OP_FLUSH_BUFFER) && !(next->flags & OP_FLUSH_BUFFER) || - next->opcode == OSD_OP_READ && (flags & OP_FLUSH_BUFFER)) + (next->opcode == OSD_OP_READ || next->opcode == OSD_OP_READ_BITMAP) && (flags & OP_FLUSH_BUFFER)) { next->prev_wait += inc; if (!next->prev_wait) @@ -358,7 +358,7 @@ void cluster_client_t::on_change_hook(std::map & changes // And now they have to be resliced! for (auto op = op_queue_head; op; op = op->next) { - if ((op->opcode == OSD_OP_WRITE || op->opcode == OSD_OP_READ) && + if ((op->opcode == OSD_OP_WRITE || op->opcode == OSD_OP_READ || op->opcode == OSD_OP_READ_BITMAP) && INODE_POOL(op->cur_inode) == pool_item.first) { op->needs_reslice = true; @@ -418,7 +418,8 @@ void cluster_client_t::on_ready(std::function fn) */ void cluster_client_t::execute(cluster_op_t *op) { - if (op->opcode != OSD_OP_SYNC && op->opcode != OSD_OP_READ && op->opcode != OSD_OP_WRITE) + if (op->opcode != OSD_OP_SYNC && op->opcode != OSD_OP_READ && + op->opcode != OSD_OP_READ_BITMAP && op->opcode != OSD_OP_WRITE) { op->retval = -EINVAL; std::function(op->callback)(op); @@ -595,7 +596,8 @@ int cluster_client_t::continue_rw(cluster_op_t *op) else if (op->state == 3) goto resume_3; resume_0: - if (!op->len || op->offset % bs_bitmap_granularity || op->len % bs_bitmap_granularity) + if ((op->opcode == OSD_OP_READ || op->opcode == OSD_OP_WRITE) && !op->len || + op->offset % bs_bitmap_granularity || op->len % bs_bitmap_granularity) { op->retval = -EINVAL; erase_op(op); @@ -616,7 +618,7 @@ resume_0: return 0; } } - if (op->opcode == OSD_OP_WRITE) + if (op->opcode == OSD_OP_WRITE || op->opcode == OSD_OP_DELETE) { auto ino_it = st_cli.inode_config.find(op->inode); if (ino_it != st_cli.inode_config.end() && ino_it->second.readonly) @@ -625,7 +627,7 @@ resume_0: erase_op(op); return 1; } - if (!immediate_commit && !(op->flags & OP_FLUSH_BUFFER)) + if (op->opcode == OSD_OP_WRITE && !immediate_commit && !(op->flags & OP_FLUSH_BUFFER)) { copy_write(op, dirty_buffers); } @@ -634,7 +636,7 @@ resume_1: // Slice the operation into parts slice_rw(op); op->needs_reslice = false; - if (op->opcode == OSD_OP_WRITE && op->version && op->parts.size() > 1) + if ((op->opcode == OSD_OP_WRITE || op->opcode == OSD_OP_DELETE) && op->version && op->parts.size() > 1) { // Atomic writes to multiple stripes are unsupported op->retval = -EINVAL; @@ -794,13 +796,13 @@ void cluster_client_t::slice_rw(cluster_op_t *op) uint32_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks); uint64_t pg_block_size = bs_block_size * pg_data_size; uint64_t first_stripe = (op->offset / pg_block_size) * pg_block_size; - uint64_t last_stripe = ((op->offset + op->len + pg_block_size - 1) / pg_block_size - 1) * pg_block_size; + uint64_t last_stripe = op->len > 0 ? ((op->offset + op->len - 1) / pg_block_size) * pg_block_size : first_stripe; op->retval = 0; op->parts.resize((last_stripe - first_stripe) / pg_block_size + 1); - if (op->opcode == OSD_OP_READ) + if (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_READ_BITMAP) { // Allocate memory for the bitmap - unsigned object_bitmap_size = ((op->len / bs_bitmap_granularity + 7) / 8); + unsigned object_bitmap_size = (((op->opcode == OSD_OP_READ_BITMAP ? pg_block_size : op->len) / bs_bitmap_granularity + 7) / 8); object_bitmap_size = (object_bitmap_size < 8 ? 8 : object_bitmap_size); unsigned bitmap_mem = object_bitmap_size + (bs_bitmap_size * pg_data_size) * op->parts.size(); if (op->bitmap_buf_size < bitmap_mem) @@ -864,13 +866,13 @@ void cluster_client_t::slice_rw(cluster_op_t *op) if (end == begin) op->done_count++; } - else + else if (op->opcode != OSD_OP_READ_BITMAP && op->opcode != OSD_OP_DELETE) { add_iov(end-begin, false, op, iov_idx, iov_pos, op->parts[i].iov, NULL, 0); } op->parts[i].parent = op; op->parts[i].offset = begin; - op->parts[i].len = (uint32_t)(end - begin); + op->parts[i].len = op->opcode == OSD_OP_READ_BITMAP || op->opcode == OSD_OP_DELETE ? 0 : (uint32_t)(end - begin); op->parts[i].pg_num = pg_num; op->parts[i].osd_num = 0; op->parts[i].flags = 0; @@ -884,7 +886,7 @@ bool cluster_client_t::affects_osd(uint64_t inode, uint64_t offset, uint64_t len uint32_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks); uint64_t pg_block_size = bs_block_size * pg_data_size; uint64_t first_stripe = (offset / pg_block_size) * pg_block_size; - uint64_t last_stripe = ((offset + len + pg_block_size - 1) / pg_block_size - 1) * pg_block_size; + uint64_t last_stripe = len > 0 ? ((offset + len - 1) / pg_block_size) * pg_block_size : first_stripe; for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size) { pg_num_t pg_num = (stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg() @@ -917,9 +919,12 @@ bool cluster_client_t::try_send(cluster_op_t *op, int i) pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks ); uint64_t meta_rev = 0; - auto ino_it = st_cli.inode_config.find(op->inode); - if (ino_it != st_cli.inode_config.end()) - meta_rev = ino_it->second.mod_revision; + if (op->opcode != OSD_OP_READ_BITMAP && op->opcode != OSD_OP_DELETE) + { + auto ino_it = st_cli.inode_config.find(op->inode); + if (ino_it != st_cli.inode_config.end()) + meta_rev = ino_it->second.mod_revision; + } part->op = (osd_op_t){ .op_type = OSD_OP_OUT, .peer_fd = peer_fd, @@ -927,16 +932,16 @@ bool cluster_client_t::try_send(cluster_op_t *op, int i) .header = { .magic = SECONDARY_OSD_OP_MAGIC, .id = op_id++, - .opcode = op->opcode, + .opcode = op->opcode == OSD_OP_READ_BITMAP ? OSD_OP_READ : op->opcode, }, .inode = op->cur_inode, .offset = part->offset, .len = part->len, .meta_revision = meta_rev, - .version = op->opcode == OSD_OP_WRITE ? op->version : 0, + .version = op->opcode == OSD_OP_WRITE || op->opcode == OSD_OP_DELETE ? op->version : 0, } }, - .bitmap = op->opcode == OSD_OP_WRITE ? NULL : op->part_bitmaps + pg_bitmap_size*i, - .bitmap_len = (unsigned)(op->opcode == OSD_OP_WRITE ? 0 : pg_bitmap_size), + .bitmap = (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_READ_BITMAP ? op->part_bitmaps + pg_bitmap_size*i : NULL), + .bitmap_len = (unsigned)(op->opcode == OSD_OP_READ || op->opcode == OSD_OP_READ_BITMAP ? pg_bitmap_size : 0), .callback = [this, part](osd_op_t *op_part) { handle_op_part(part); @@ -1118,7 +1123,7 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part) dirty_osds.insert(part->osd_num); part->flags |= PART_DONE; op->done_count++; - if (op->opcode == OSD_OP_READ) + if (op->opcode == OSD_OP_READ || op->opcode == OSD_OP_READ_BITMAP) { copy_part_bitmap(op, part); op->version = op->parts.size() == 1 ? part->op.reply.rw.version : 0; @@ -1142,7 +1147,7 @@ void cluster_client_t::copy_part_bitmap(cluster_op_t *op, cluster_op_part_t *par ); uint32_t object_offset = (part->op.req.rw.offset - op->offset) / bs_bitmap_granularity; uint32_t part_offset = (part->op.req.rw.offset % pg_block_size) / bs_bitmap_granularity; - uint32_t part_len = part->op.req.rw.len / bs_bitmap_granularity; + uint32_t part_len = (op->opcode == OSD_OP_READ_BITMAP ? pg_block_size : part->op.req.rw.len) / bs_bitmap_granularity; if (!(object_offset & 0x7) && !(part_offset & 0x7) && (part_len >= 8)) { // Copy bytes diff --git a/src/cluster_client.h b/src/cluster_client.h index 66892bfc..08b543d7 100644 --- a/src/cluster_client.h +++ b/src/cluster_client.h @@ -12,6 +12,7 @@ #define DEFAULT_CLIENT_MAX_DIRTY_OPS 1024 #define INODE_LIST_DONE 1 #define INODE_LIST_HAS_UNSTABLE 2 +#define OSD_OP_READ_BITMAP OSD_OP_SEC_READ_BMP struct cluster_op_t; @@ -29,7 +30,7 @@ struct cluster_op_part_t struct cluster_op_t { - uint64_t opcode; // OSD_OP_READ, OSD_OP_WRITE, OSD_OP_SYNC + uint64_t opcode; // OSD_OP_READ, OSD_OP_WRITE, OSD_OP_SYNC, OSD_OP_DELETE, OSD_OP_READ_BITMAP uint64_t inode; uint64_t offset; uint64_t len; @@ -38,6 +39,8 @@ struct cluster_op_t uint64_t version = 0; int retval; osd_op_buf_list_t iov; + // READ and READ_BITMAP return the bitmap here + void *bitmap_buf = NULL; std::function callback; ~cluster_op_t(); protected: @@ -50,7 +53,7 @@ protected: bool up_wait = false; int inflight_count = 0, done_count = 0; std::vector parts; - void *bitmap_buf = NULL, *part_bitmaps = NULL; + void *part_bitmaps = NULL; unsigned bitmap_buf_size = 0; cluster_op_t *prev = NULL, *next = NULL; int prev_wait = 0; @@ -115,9 +118,11 @@ public: static void copy_write(cluster_op_t *op, std::map & dirty_buffers); void continue_ops(bool up_retry = false); inode_list_t *list_inode_start(inode_t inode, - std::function&& objects, pg_num_t pg_num, osd_num_t primary_osd, int status)> callback); + std::function&& objects, pg_num_t pg_num, osd_num_t primary_osd, int status)> callback); int list_pg_count(inode_list_t *lst); void list_inode_next(inode_list_t *lst, int next_pgs); + inline uint32_t get_bs_bitmap_granularity() { return bs_bitmap_granularity; } + inline uint64_t get_bs_block_size() { return bs_block_size; } uint64_t next_op_id(); protected: diff --git a/src/cluster_client_list.cpp b/src/cluster_client_list.cpp index 5e3f2c64..779a6f76 100644 --- a/src/cluster_client_list.cpp +++ b/src/cluster_client_list.cpp @@ -37,11 +37,11 @@ struct inode_list_t int done_pgs = 0; int want = 0; std::vector pgs; - std::function&& objects, pg_num_t pg_num, osd_num_t primary_osd, int status)> callback; + std::function&& objects, pg_num_t pg_num, osd_num_t primary_osd, int status)> callback; }; inode_list_t* cluster_client_t::list_inode_start(inode_t inode, - std::function&& objects, pg_num_t pg_num, osd_num_t primary_osd, int status)> callback) + std::function&& objects, pg_num_t pg_num, osd_num_t primary_osd, int status)> callback) { int skipped_pgs = 0; pool_id_t pool_id = INODE_POOL(inode); @@ -264,7 +264,7 @@ void cluster_client_t::send_list(inode_list_osd_t *cur_list) { status |= INODE_LIST_HAS_UNSTABLE; } - lst->callback(std::move(pg->objects), pg->pg_num, pg->cur_primary, status); + lst->callback(lst, std::move(pg->objects), pg->pg_num, pg->cur_primary, status); lst->pgs[pg->pos] = NULL; delete pg; } diff --git a/src/cmd.cpp b/src/cmd.cpp new file mode 100644 index 00000000..c85784e0 --- /dev/null +++ b/src/cmd.cpp @@ -0,0 +1,834 @@ +// Copyright (c) Vitaliy Filippov, 2019+ +// License: VNPL-1.1 (see README.md for details) + +/** + * CLI tool + * Currently can (a) remove inodes and (b) merge snapshot/clone layers + */ + +#include +#include + +#include "cpp-btree/safe_btree_set.h" +#include "epoll_manager.h" +#include "cluster_client.h" +#include "pg_states.h" + +#define RM_LISTING 1 +#define RM_REMOVING 2 +#define RM_END 3 + +const char *exe_name = NULL; + +struct rm_pg_t +{ + pg_num_t pg_num; + osd_num_t rm_osd_num; + std::set objects; + std::set::iterator obj_pos; + uint64_t obj_count = 0, obj_done = 0, obj_prev_done = 0; + int state = 0; + int in_flight = 0; +}; + +struct rm_inode_t; +struct snap_merger_t; + +class cli_tool_t +{ +protected: + uint64_t iodepth = 0, parallel_osds = 0; + bool progress = true; + bool list_first = false; + int log_level = 0; + int mode = 0; + + ring_loop_t *ringloop = NULL; + epoll_manager_t *epmgr = NULL; + cluster_client_t *cli = NULL; + ring_consumer_t consumer; + bool started = false; + + rm_inode_t *remover = NULL; + snap_merger_t *merger = NULL; + +public: + static json11::Json::object parse_args(int narg, const char *args[]) + { + json11::Json::object cfg; + json11::Json::array cmd; + cfg["progress"] = "1"; + for (int i = 1; i < narg; i++) + { + if (!strcmp(args[i], "-h") || !strcmp(args[i], "--help")) + { + help(); + } + else if (args[i][0] == '-' && args[i][1] == '-') + { + const char *opt = args[i]+2; + cfg[opt] = !strcmp(opt, "json") || !strcmp(opt, "wait-list") || i == narg-1 ? "1" : args[++i]; + } + else + { + cmd.push_back(std::string(args[i])); + } + } + if (!cmd.size()) + { + std::string exe(exe_name); + if (exe.substr(exe.size()-11) == "vitastor-rm") + { + cmd.push_back("rm"); + } + } + cfg["command"] = cmd; + return cfg; + } + + static void help() + { + printf( + "Vitastor inode removal tool\n" + "(c) Vitaliy Filippov, 2020 (VNPL-1.1)\n\n" + "USAGE:\n" + " %s rm [--etcd_address ] --pool --inode \n" + " [--wait-list] [--iodepth 32] [--parallel_osds 4] [--progress 1]\n" + " %s merge [--etcd_address ] [--target ]\n" + " [--iodepth 128] [--progress 1]\n", + exe_name, exe_name + ); + exit(0); + } + + void run(json11::Json cfg); + void start_work(); + void continue_work(); + + friend struct rm_inode_t; + friend struct snap_merger_t; +}; + +struct rm_inode_t +{ + uint64_t inode = 0; + pool_id_t pool_id = 0; + + cli_tool_t *parent = NULL; + inode_list_t *lister = NULL; + std::vector lists; + uint64_t total_count = 0, total_done = 0, total_prev_pct = 0; + uint64_t pgs_to_list = 0; + bool lists_done = false; + + void start_delete() + { + lister = parent->cli->list_inode_start(inode, [this](inode_list_t *lst, std::set&& objects, pg_num_t pg_num, osd_num_t primary_osd, int status) + { + rm_pg_t *rm = new rm_pg_t({ + .pg_num = pg_num, + .rm_osd_num = primary_osd, + .objects = objects, + .obj_count = objects.size(), + .obj_done = 0, + .obj_prev_done = 0, + }); + rm->obj_pos = rm->objects.begin(); + lists.push_back(rm); + if (parent->list_first) + { + parent->cli->list_inode_next(lister, 1); + } + if (status & INODE_LIST_DONE) + { + lists_done = true; + } + pgs_to_list--; + continue_delete(); + }); + if (!lister) + { + fprintf(stderr, "Failed to list inode %lx objects\n", inode); + exit(1); + } + pgs_to_list = parent->cli->list_pg_count(lister); + parent->cli->list_inode_next(lister, parent->parallel_osds); + } + + void send_ops(rm_pg_t *cur_list) + { + if (parent->cli->msgr.osd_peer_fds.find(cur_list->rm_osd_num) == + parent->cli->msgr.osd_peer_fds.end()) + { + // Initiate connection + parent->cli->msgr.connect_peer(cur_list->rm_osd_num, parent->cli->st_cli.peer_states[cur_list->rm_osd_num]); + return; + } + while (cur_list->in_flight < parent->iodepth && cur_list->obj_pos != cur_list->objects.end()) + { + osd_op_t *op = new osd_op_t(); + op->op_type = OSD_OP_OUT; + op->peer_fd = parent->cli->msgr.osd_peer_fds[cur_list->rm_osd_num]; + op->req = (osd_any_op_t){ + .rw = { + .header = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = parent->cli->next_op_id(), + .opcode = OSD_OP_DELETE, + }, + .inode = cur_list->obj_pos->inode, + .offset = cur_list->obj_pos->stripe, + .len = 0, + }, + }; + op->callback = [this, cur_list](osd_op_t *op) + { + cur_list->in_flight--; + if (op->reply.hdr.retval < 0) + { + fprintf(stderr, "Failed to remove object %lx:%lx from PG %u (OSD %lu) (retval=%ld)\n", + op->req.rw.inode, op->req.rw.offset, + cur_list->pg_num, cur_list->rm_osd_num, op->reply.hdr.retval); + } + delete op; + cur_list->obj_done++; + total_done++; + continue_delete(); + }; + cur_list->obj_pos++; + cur_list->in_flight++; + parent->cli->msgr.outbox_push(op); + } + } + + void continue_delete() + { + if (parent->list_first && !lists_done) + { + return; + } + for (int i = 0; i < lists.size(); i++) + { + if (!lists[i]->in_flight && lists[i]->obj_pos == lists[i]->objects.end()) + { + delete lists[i]; + lists.erase(lists.begin()+i, lists.begin()+i+1); + i--; + if (!lists_done) + { + parent->cli->list_inode_next(lister, 1); + } + } + else + { + send_ops(lists[i]); + } + } + if (parent->progress && total_count > 0 && total_done*1000/total_count != total_prev_pct) + { + printf("\rRemoved %lu/%lu objects, %lu more PGs to list...", total_done, total_count, pgs_to_list); + total_prev_pct = total_done*1000/total_count; + } + if (lists_done && !lists.size()) + { + printf("Done, inode %lu in pool %u removed\n", (inode & ((1l << (64-POOL_ID_BITS)) - 1)), pool_id); + exit(0); + } + } +}; + +// Layer merge is the base for multiple operations: +// 1) Delete snapshot "up" = merge child layer into the parent layer, remove the child +// and rename the parent to the child +// 2) Delete snapshot "down" = merge parent layer into the child layer and remove the parent +// 3) Flatten image = merge parent layers into the child layer and break the connection +struct snap_merger_t +{ + cli_tool_t *parent; + + // merge from..to into target (target may be one of from..to) + std::string from_name, to_name, target_name; + + // inode=>rank (bigger rank means child layers) + std::map sources; + // target to merge data into + inode_t target; + // rank of the target + int target_rank; + // delete merged source inode data during merge + bool delete_source = false; + // don't necessarily delete source data, but perform checks as if we were to do it + bool check_delete_source = false; + // interval between fsyncs + int fsync_interval = 128; + + bool inside_continue = false; + int state = 0; + int lists_todo = 0; + uint64_t target_block_size = 0; + btree::safe_btree_set merge_offsets; + btree::safe_btree_set::iterator oit; + std::map> layer_lists; + std::map layer_block_size; + std::map layer_list_pos; + int in_flight = 0; + uint64_t last_fsync_offset = 0; + uint64_t last_written_offset = 0; + int deleted_unsynced = 0; + uint64_t processed = 0, to_process = 0; + + inode_config_t* get_inode_cfg(const std::string & name) + { + for (auto & ic: parent->cli->st_cli.inode_config) + { + if (ic.second.name == name) + { + return &ic.second; + } + } + fprintf(stderr, "Layer %s not found\n", name.c_str()); + exit(1); + } + + void start_merge() + { + check_delete_source = delete_source || check_delete_source; + inode_config_t *from_cfg = get_inode_cfg(from_name); + inode_config_t *to_cfg = get_inode_cfg(to_name); + inode_config_t *target_cfg = target_name == "" ? from_cfg : get_inode_cfg(target_name); + if (to_cfg->num == from_cfg->num) + { + fprintf(stderr, "Only one layer specified, nothing to merge\n"); + exit(1); + } + // Check that to_cfg is actually a child of from_cfg and target_cfg is somewhere between them + std::vector chain_list; + inode_config_t *cur = to_cfg; + chain_list.push_back(cur->num); + layer_block_size[cur->num] = get_block_size(cur->num); + while (cur->parent_id != from_cfg->num && + cur->parent_id != to_cfg->num && + cur->parent_id != 0) + { + auto it = parent->cli->st_cli.inode_config.find(cur->parent_id); + if (it == parent->cli->st_cli.inode_config.end()) + { + fprintf(stderr, "Parent inode of layer %s (%lx) not found\n", cur->name.c_str(), cur->parent_id); + exit(1); + } + cur = &it->second; + chain_list.push_back(cur->num); + layer_block_size[cur->num] = get_block_size(cur->num); + } + if (cur->parent_id != from_cfg->num) + { + fprintf(stderr, "Layer %s is not a child of %s\n", to_name.c_str(), from_name.c_str()); + exit(1); + } + chain_list.push_back(from_cfg->num); + layer_block_size[from_cfg->num] = get_block_size(from_cfg->num); + int i = chain_list.size()-1; + for (inode_t item: chain_list) + { + sources[item] = i--; + } + if (sources.find(target_cfg->num) == sources.end()) + { + fprintf(stderr, "Layer %s is not between %s and %s\n", target_name.c_str(), to_name.c_str(), from_name.c_str()); + exit(1); + } + target = target_cfg->num; + target_rank = sources.at(target); + int to_rank = sources.at(to_cfg->num); + // Check that there are no other inodes dependent on altered layers + // + // 1) everything between and except is not allowed + // to have children other than if is a child of : + // + // - - + // \- <--------X--------- NOT ALLOWED + // + // 2) everything between and , except , is not allowed + // to have children other than if sources are to be deleted after merging: + // + // - - - + // \- <---------X-------- NOT ALLOWED + for (auto & ic: parent->cli->st_cli.inode_config) + { + auto it = sources.find(ic.second.num); + if (it == sources.end() && ic.second.parent_id != 0) + { + it = sources.find(ic.second.parent_id); + if (it != sources.end()) + { + int parent_rank = it->second; + if (parent_rank < to_rank && (parent_rank >= target_rank || check_delete_source)) + { + fprintf( + stderr, "Layers at or above %s, but below %s are not allowed" + " to have other children, but %s is a child of %s\n", + (check_delete_source ? from_name.c_str() : target_name.c_str()), + to_name.c_str(), ic.second.name.c_str(), + parent->cli->st_cli.inode_config.at(ic.second.parent_id).name.c_str() + ); + exit(1); + } + } + } + } + sources.erase(target); + printf("Merging %ld layers into target inode %lx\n", sources.size(), target); + target_block_size = get_block_size(target); + continue_merge_reent(); + } + + uint64_t get_block_size(inode_t inode) + { + auto & pool_cfg = parent->cli->st_cli.pool_config.at(INODE_POOL(inode)); + uint64_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks); + return parent->cli->get_bs_block_size() * pg_data_size; + } + + void continue_merge_reent() + { + if (!inside_continue) + { + inside_continue = true; + continue_merge(); + inside_continue = false; + } + } + + void continue_merge() + { + if (state == 1) + goto resume_1; + else if (state == 2) + goto resume_2; + else if (state == 3) + goto resume_3; + else if (state == 4) + goto resume_4; + else if (state == 5) + goto resume_5; + else if (state == 6) + goto resume_6; + // First list lower layers + list_layers(true); + state = 1; + resume_1: + while (lists_todo > 0) + { + // Wait for lists + return; + } + if (merge_offsets.size() > 0) + { + state = 2; + oit = merge_offsets.begin(); + processed = 0; + to_process = merge_offsets.size(); + resume_2: + // Then remove blocks already filled in target by issuing zero-length reads and checking bitmaps + while (in_flight < parent->iodepth*parent->parallel_osds && oit != merge_offsets.end()) + { + in_flight++; + check_if_full(*oit); + oit++; + processed++; + if (parent->progress && !(processed % 128)) + { + printf("\rFiltering target blocks: %lu/%lu", processed, to_process); + } + } + if (in_flight > 0 || oit != merge_offsets.end()) + { + // Wait until reads finish + return; + } + if (parent->progress) + { + printf("\r%lu full blocks of target filtered out\n", to_process-merge_offsets.size()); + } + } + state = 3; + resume_3: + // Then list upper layers + list_layers(false); + state = 4; + resume_4: + while (lists_todo > 0) + { + // Wait for lists + return; + } + state = 5; + processed = 0; + to_process = merge_offsets.size(); + oit = merge_offsets.begin(); + resume_5: + // Now read, overwrite and optionally delete offsets one by one + while (in_flight < parent->iodepth*parent->parallel_osds && oit != merge_offsets.end()) + { + in_flight++; + read_and_write(*oit); + oit++; + processed++; + if (parent->progress && !(processed % 128)) + { + printf("\rOverwriting blocks: %lu/%lu", processed, to_process); + } + } + if (in_flight > 0 || oit != merge_offsets.end()) + { + // Wait until overwrites finish + return; + } + if (parent->progress) + { + printf("\rOverwriting blocks: %lu/%lu\n", to_process, to_process); + } + state = 6; + resume_6: + // Done + printf("Done, layers from %s to %s merged into %s\n", from_name.c_str(), to_name.c_str(), target_name.c_str()); + exit(0); + } + + void list_layers(bool lower) + { + for (auto & sp: sources) + { + inode_t src = sp.first; + if (lower ? (sp.second < target_rank) : (sp.second > target_rank)) + { + lists_todo++; + inode_list_t* lst = parent->cli->list_inode_start(src, [this, src]( + inode_list_t *lst, std::set&& objects, pg_num_t pg_num, osd_num_t primary_osd, int status) + { + uint64_t layer_block = layer_block_size.at(src); + for (object_id obj: objects) + { + merge_offsets.insert(obj.stripe - obj.stripe % target_block_size); + for (int i = target_block_size; i < layer_block; i += target_block_size) + { + merge_offsets.insert(obj.stripe - obj.stripe % target_block_size + i); + } + } + if (delete_source) + { + // Also store individual lists + auto & layer_list = layer_lists[src]; + int pos = layer_list.size(); + layer_list.resize(pos + objects.size()); + for (object_id obj: objects) + { + layer_list[pos++] = obj.stripe; + } + } + if (status & INODE_LIST_DONE) + { + printf("Got listing of inode %lx\n", src); + if (delete_source) + { + // Sort the inode listing + std::sort(layer_lists[src].begin(), layer_lists[src].end()); + } + lists_todo--; + continue_merge_reent(); + } + else + { + parent->cli->list_inode_next(lst, 1); + } + }); + parent->cli->list_inode_next(lst, parent->parallel_osds); + } + } + } + + // Check if is fully written in and remove it from merge_offsets if so + void check_if_full(uint64_t offset) + { + cluster_op_t *op = new cluster_op_t; + op->opcode = OSD_OP_READ_BITMAP; + op->inode = target; + op->offset = offset; + op->len = 0; + op->callback = [this](cluster_op_t *op) + { + if (op->retval < 0) + { + fprintf(stderr, "error reading target bitmap at offset %lx: %s\n", op->offset, strerror(-op->retval)); + } + else + { + uint64_t bitmap_bytes = target_block_size/parent->cli->get_bs_bitmap_granularity()/8; + int i; + for (i = 0; i < bitmap_bytes; i++) + { + if (((uint8_t*)op->bitmap_buf)[i] != 0xff) + { + break; + } + } + if (i == bitmap_bytes) + { + // full + merge_offsets.erase(op->offset); + } + } + delete op; + in_flight--; + continue_merge_reent(); + }; + parent->cli->execute(op); + } + + // Read from , write it to and optionally delete it + // from all layers except after fsync'ing + void read_and_write(uint64_t offset) + { + void *buf = malloc(target_block_size); + cluster_op_t *op = new cluster_op_t; + op->opcode = OSD_OP_READ; + op->inode = target; + op->offset = offset; + op->len = target_block_size; + op->iov.push_back(buf, target_block_size); + op->callback = [this](cluster_op_t *op) + { + // Write each non-empty range using an individual operation + // FIXME: Allow to use a single write with bitmap (OSDs don't allow it yet) + uint32_t gran = parent->cli->get_bs_bitmap_granularity(); + uint64_t bitmap_size = target_block_size/gran; + uint32_t start = 0, end = 0; + int i; + // Track pending subops allowing write_subop() to return immediately (just in case) + op->version = bitmap_size; + for (i = 0; i < bitmap_size; i++) + { + auto bit = ((*(uint8_t*)(op->bitmap_buf + (i >> 3))) & (1 << (i & 0x7))); + if (!bit && end > start) + { + // write end->start + op->version++; + write_subop(op, start, end); + } + end += gran; + if (!bit) + { + start = end; + } + } + if (end > start) + { + // write end->start + op->version++; + write_subop(op, start, end); + } + op->version -= bitmap_size; + // Just in case + autofree_op(op); + }; + parent->cli->execute(op); + } + + void write_subop(cluster_op_t *op, uint32_t start, uint32_t end) + { + void *buf = op->iov.buf[0].iov_base; + cluster_op_t *subop = new cluster_op_t; + subop->opcode = OSD_OP_WRITE; + subop->inode = target; + subop->offset = op->offset+start; + subop->len = end-start; + subop->iov.push_back(buf+start, end-start); + subop->callback = [this, op](cluster_op_t *subop) + { + if (subop->retval != subop->len) + { + fprintf(stderr, "error writing target at offset %lx: %s\n", subop->offset, strerror(-subop->retval)); + exit(1); + } + op->version--; + autofree_op(op); + delete subop; + }; + parent->cli->execute(subop); + } + + void delete_offset(inode_t inode_num, uint64_t offset) + { + cluster_op_t *subop = new cluster_op_t; + subop->opcode = OSD_OP_DELETE; + subop->inode = inode_num; + subop->offset = offset; + subop->len = 0; + subop->callback = [this](cluster_op_t *subop) + { + if (subop->retval != 0) + { + fprintf(stderr, "error deleting from layer 0x%lx at offset %lx: %s", subop->inode, subop->offset, strerror(-subop->retval)); + } + delete subop; + }; + parent->cli->execute(subop); + } + + void autofree_op(cluster_op_t *op) + { + if (!op->version) + { + if (last_written_offset < op->offset+target_block_size) + { + last_written_offset = op->offset+target_block_size; + } + if (delete_source) + { + deleted_unsynced++; + if (deleted_unsynced >= fsync_interval) + { + uint64_t from = last_fsync_offset, to = last_written_offset; + cluster_op_t *subop = new cluster_op_t; + subop->opcode = OSD_OP_SYNC; + subop->callback = [this, from, to](cluster_op_t *subop) + { + delete subop; + // We can now delete source data between and + // But to do this we have to keep all object lists in memory :-( + for (auto & lp: layer_list_pos) + { + auto & layer_list = layer_lists.at(lp.first); + uint64_t layer_block = layer_block_size.at(lp.first); + int cur_pos = lp.second; + while (cur_pos < layer_list.size() && layer_list[cur_pos]+layer_block < to) + { + delete_offset(lp.first, layer_list[cur_pos]); + cur_pos++; + } + lp.second = cur_pos; + } + }; + parent->cli->execute(subop); + } + } + void *buf = op->iov.buf[0].iov_base; + free(buf); + delete op; + in_flight--; + continue_merge_reent(); + } + } +}; + +void cli_tool_t::run(json11::Json cfg) +{ + json11::Json::array cmd = cfg["command"].array_items(); + if (!cmd.size()) + { + fprintf(stderr, "command is missing\n"); + exit(1); + } + else if (cmd[0] == "rm") + { + // Delete inode + remover = new rm_inode_t(); + remover->parent = this; + remover->inode = cfg["inode"].uint64_value(); + remover->pool_id = cfg["pool"].uint64_value(); + if (remover->pool_id) + remover->inode = (remover->inode & ((1l << (64-POOL_ID_BITS)) - 1)) | (((uint64_t)remover->pool_id) << (64-POOL_ID_BITS)); + remover->pool_id = INODE_POOL(remover->inode); + if (!remover->pool_id) + { + fprintf(stderr, "pool is missing\n"); + exit(1); + } + } + else if (cmd[0] == "merge") + { + // Merge layers + merger = new snap_merger_t(); + merger->parent = this; + merger->from_name = cmd[1].string_value(); + merger->to_name = cmd[2].string_value(); + merger->target_name = cfg["target"].string_value(); + if (merger->from_name == "" || merger->to_name == "") + { + fprintf(stderr, "Beginning or end of the merge sequence is missing\n"); + exit(1); + } + merger->delete_source = cfg["delete"].string_value() != ""; + merger->fsync_interval = cfg["fsync-interval"].uint64_value(); + if (!merger->fsync_interval) + merger->fsync_interval = 128; + } + else + { + fprintf(stderr, "unknown command: %s\n", cmd[0].string_value().c_str()); + exit(1); + } + iodepth = cfg["iodepth"].uint64_value(); + if (!iodepth) + iodepth = 32; + parallel_osds = cfg["parallel_osds"].uint64_value(); + if (!parallel_osds) + parallel_osds = 4; + log_level = cfg["log_level"].int64_value(); + progress = cfg["progress"].uint64_value() ? true : false; + list_first = cfg["wait-list"].uint64_value() ? true : false; + // Create client + ringloop = new ring_loop_t(512); + epmgr = new epoll_manager_t(ringloop); + cli = new cluster_client_t(ringloop, epmgr->tfd, cfg); + cli->on_ready([this]() { start_work(); }); + // Initialize job + consumer.loop = [this]() + { + if (started) + continue_work(); + ringloop->submit(); + }; + ringloop->register_consumer(&consumer); + // Loop until it completes + while (1) + { + ringloop->loop(); + ringloop->wait(); + } +} + +void cli_tool_t::start_work() +{ + if (remover) + { + remover->start_delete(); + } + else if (merger) + { + merger->start_merge(); + } + started = true; +} + +void cli_tool_t::continue_work() +{ + if (remover) + { + remover->continue_delete(); + } + else if (merger) + { + merger->continue_merge_reent(); + } +} + +int main(int narg, const char *args[]) +{ + setvbuf(stdout, NULL, _IONBF, 0); + setvbuf(stderr, NULL, _IONBF, 0); + exe_name = args[0]; + cli_tool_t *p = new cli_tool_t(); + p->run(cli_tool_t::parse_args(narg, args)); + return 0; +} diff --git a/src/osd_primary.cpp b/src/osd_primary.cpp index edc7c387..91fe9c08 100644 --- a/src/osd_primary.cpp +++ b/src/osd_primary.cpp @@ -198,7 +198,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op) { // Fast happy-path cur_op->buf = alloc_read_buffer(op_data->stripes, op_data->pg_data_size, 0); - submit_primary_subops(SUBMIT_READ, op_data->target_ver, pg.cur_set.data(), cur_op); + submit_primary_subops(SUBMIT_RMW_READ, op_data->target_ver, pg.cur_set.data(), cur_op); op_data->st = 1; } else @@ -215,7 +215,7 @@ void osd_t::continue_primary_read(osd_op_t *cur_op) op_data->scheme = pg.scheme; op_data->degraded = 1; cur_op->buf = alloc_read_buffer(op_data->stripes, pg.pg_size, 0); - submit_primary_subops(SUBMIT_READ, op_data->target_ver, cur_set, cur_op); + submit_primary_subops(SUBMIT_RMW_READ, op_data->target_ver, cur_set, cur_op); op_data->st = 1; } } diff --git a/src/rm_inode.cpp b/src/rm_inode.cpp deleted file mode 100644 index f4d66bf4..00000000 --- a/src/rm_inode.cpp +++ /dev/null @@ -1,253 +0,0 @@ -// Copyright (c) Vitaliy Filippov, 2019+ -// License: VNPL-1.1 (see README.md for details) - -/** - * Inode removal tool - * May be included into a bigger "command-line management interface" in the future - */ - -#include -#include - -#include "epoll_manager.h" -#include "cluster_client.h" -#include "pg_states.h" - -#define RM_LISTING 1 -#define RM_REMOVING 2 -#define RM_END 3 - -const char *exe_name = NULL; - -struct rm_pg_t -{ - pg_num_t pg_num; - osd_num_t rm_osd_num; - std::set objects; - std::set::iterator obj_pos; - uint64_t obj_count = 0, obj_done = 0, obj_prev_done = 0; - int state = 0; - int in_flight = 0; -}; - -class rm_inode_t -{ -protected: - uint64_t inode = 0; - pool_id_t pool_id = 0; - uint64_t iodepth = 0, parallel_osds = 0; - inode_list_t *lister = NULL; - - ring_loop_t *ringloop = NULL; - epoll_manager_t *epmgr = NULL; - cluster_client_t *cli = NULL; - ring_consumer_t consumer; - - std::vector lists; - uint64_t total_count = 0, total_done = 0, total_prev_pct = 0; - uint64_t pgs_to_list = 0; - bool started = false; - bool progress = true; - bool list_first = false, lists_done = false; - int log_level = 0; - -public: - static json11::Json::object parse_args(int narg, const char *args[]) - { - json11::Json::object cfg; - cfg["progress"] = "1"; - for (int i = 1; i < narg; i++) - { - if (!strcmp(args[i], "-h") || !strcmp(args[i], "--help")) - { - help(); - } - else if (args[i][0] == '-' && args[i][1] == '-') - { - const char *opt = args[i]+2; - cfg[opt] = !strcmp(opt, "json") || !strcmp(opt, "wait-list") || i == narg-1 ? "1" : args[++i]; - } - } - return cfg; - } - - static void help() - { - printf( - "Vitastor inode removal tool\n" - "(c) Vitaliy Filippov, 2020 (VNPL-1.1)\n\n" - "USAGE:\n" - " %s [--etcd_address ] --pool --inode [--wait-list]\n", - exe_name - ); - exit(0); - } - - void run(json11::Json cfg) - { - inode = cfg["inode"].uint64_value(); - pool_id = cfg["pool"].uint64_value(); - if (pool_id) - inode = (inode & ((1l << (64-POOL_ID_BITS)) - 1)) | (((uint64_t)pool_id) << (64-POOL_ID_BITS)); - pool_id = INODE_POOL(inode); - if (!pool_id) - { - fprintf(stderr, "pool is missing"); - exit(1); - } - iodepth = cfg["iodepth"].uint64_value(); - if (!iodepth) - iodepth = 32; - parallel_osds = cfg["parallel_osds"].uint64_value(); - if (!parallel_osds) - parallel_osds = 4; - log_level = cfg["log_level"].int64_value(); - progress = cfg["progress"].uint64_value() ? true : false; - list_first = cfg["wait-list"].uint64_value() ? true : false; - // Create client - ringloop = new ring_loop_t(512); - epmgr = new epoll_manager_t(ringloop); - cli = new cluster_client_t(ringloop, epmgr->tfd, cfg); - cli->on_ready([this]() { start_delete(); }); - // Initialize job - consumer.loop = [this]() - { - if (started) - continue_delete(); - ringloop->submit(); - }; - ringloop->register_consumer(&consumer); - // Loop until it completes - while (1) - { - ringloop->loop(); - ringloop->wait(); - } - } - - void start_delete() - { - lister = cli->list_inode_start(inode, [this](std::set&& objects, pg_num_t pg_num, osd_num_t primary_osd, int status) - { - rm_pg_t *rm = new rm_pg_t({ - .pg_num = pg_num, - .rm_osd_num = primary_osd, - .objects = objects, - .obj_count = objects.size(), - .obj_done = 0, - .obj_prev_done = 0, - }); - rm->obj_pos = rm->objects.begin(); - lists.push_back(rm); - if (list_first) - { - cli->list_inode_next(lister, 1); - } - if (status & INODE_LIST_DONE) - { - lists_done = true; - } - pgs_to_list--; - continue_delete(); - }); - if (!lister) - { - fprintf(stderr, "Failed to list inode %lx objects\n", inode); - exit(1); - } - pgs_to_list = cli->list_pg_count(lister); - cli->list_inode_next(lister, parallel_osds); - started = true; - } - - void send_ops(rm_pg_t *cur_list) - { - if (cli->msgr.osd_peer_fds.find(cur_list->rm_osd_num) == - cli->msgr.osd_peer_fds.end()) - { - // Initiate connection - cli->msgr.connect_peer(cur_list->rm_osd_num, cli->st_cli.peer_states[cur_list->rm_osd_num]); - return; - } - while (cur_list->in_flight < iodepth && cur_list->obj_pos != cur_list->objects.end()) - { - osd_op_t *op = new osd_op_t(); - op->op_type = OSD_OP_OUT; - op->peer_fd = cli->msgr.osd_peer_fds[cur_list->rm_osd_num]; - op->req = (osd_any_op_t){ - .rw = { - .header = { - .magic = SECONDARY_OSD_OP_MAGIC, - .id = cli->next_op_id(), - .opcode = OSD_OP_DELETE, - }, - .inode = cur_list->obj_pos->inode, - .offset = cur_list->obj_pos->stripe, - .len = 0, - }, - }; - op->callback = [this, cur_list](osd_op_t *op) - { - cur_list->in_flight--; - if (op->reply.hdr.retval < 0) - { - fprintf(stderr, "Failed to remove object %lx:%lx from PG %u (OSD %lu) (retval=%ld)\n", - op->req.rw.inode, op->req.rw.offset, - cur_list->pg_num, cur_list->rm_osd_num, op->reply.hdr.retval); - } - delete op; - cur_list->obj_done++; - total_done++; - continue_delete(); - }; - cur_list->obj_pos++; - cur_list->in_flight++; - cli->msgr.outbox_push(op); - } - } - - void continue_delete() - { - if (list_first && !lists_done) - { - return; - } - for (int i = 0; i < lists.size(); i++) - { - if (!lists[i]->in_flight && lists[i]->obj_pos == lists[i]->objects.end()) - { - delete lists[i]; - lists.erase(lists.begin()+i, lists.begin()+i+1); - i--; - if (!lists_done) - { - cli->list_inode_next(lister, 1); - } - } - else - { - send_ops(lists[i]); - } - } - if (progress && total_count > 0 && total_done*1000/total_count != total_prev_pct) - { - printf("\rRemoved %lu/%lu objects, %lu more PGs to list...", total_done, total_count, pgs_to_list); - total_prev_pct = total_done*1000/total_count; - } - if (lists_done && !lists.size()) - { - printf("Done, inode %lu in pool %u removed\n", (inode & ((1l << (64-POOL_ID_BITS)) - 1)), pool_id); - exit(0); - } - } -}; - -int main(int narg, const char *args[]) -{ - setvbuf(stdout, NULL, _IONBF, 0); - setvbuf(stderr, NULL, _IONBF, 0); - exe_name = args[0]; - rm_inode_t *p = new rm_inode_t(); - p->run(rm_inode_t::parse_args(narg, args)); - return 0; -} diff --git a/tests/test_rm.sh b/tests/test_rm.sh index edfe4ccc..c3c9fb0f 100755 --- a/tests/test_rm.sh +++ b/tests/test_rm.sh @@ -9,7 +9,6 @@ LD_PRELOAD=libasan.so.5 \ $ETCDCTL get --prefix '/vitastor/pg/state' -LD_PRELOAD=libasan.so.5 \ - build/src/vitastor-rm --etcd_address $ETCD_URL --pool 1 --inode 1 +build/src/vitastor-cmd rm --etcd_address $ETCD_URL --pool 1 --inode 1 format_green OK diff --git a/tests/test_snapshot.sh b/tests/test_snapshot.sh index 4723640f..9e8b3bcf 100755 --- a/tests/test_snapshot.sh +++ b/tests/test_snapshot.sh @@ -38,4 +38,18 @@ node mon/merge.js ./testdata/layer0.bin ./testdata/layer1.bin ./testdata/check.b cmp ./testdata/merged.bin ./testdata/check.bin +# Test merge + +$ETCDCTL put /vitastor/config/inode/1/3 '{"parent_id":2,"name":"testimg","size":'$((32*1024*1024))'}' + +build/src/vitastor-cmd merge --etcd_address $ETCD_URL testimg@0 testimg --target testimg + +$ETCDCTL put /vitastor/config/inode/1/3 '{"name":"testimg","size":'$((32*1024*1024))'}' + +qemu-img convert -S 4096 -p \ + -f raw "vitastor:etcd_host=127.0.0.1\:$ETCD_PORT/v3:image=testimg" \ + -O raw ./testdata/merged-by-tool.bin + +cmp ./testdata/merged.bin ./testdata/merged-by-tool.bin + format_green OK