7 Commits

Author SHA1 Message Date
Vitaliy Filippov dfdf5c1f9c Fix comments in mon.js 3 days ago
Vitaliy Filippov aad7792d3f Check for loops in parent inode chains 3 days ago
Vitaliy Filippov 6ca8afffe5 Add CAS version parameter to the C wrapper 4 days ago
Vitaliy Filippov 511a89948b Rework qemu_proxy into a C wrapper library with public header 4 days ago
Vitaliy Filippov 3de553ecd7 Add a test for CAS write operation 1 week ago
Vitaliy Filippov 9c45d43e74 Extract common 3 OSD code from several test scripts 1 week ago
Vitaliy Filippov 891250d355 Implement CAS writes 1 week ago
  1. 14
      mon/mon.js
  2. 1
      rpm/vitastor-el7.spec
  3. 1
      rpm/vitastor-el8.spec
  4. 120
      src/CMakeLists.txt
  5. 29
      src/cluster_client.cpp
  6. 3
      src/cluster_client.h
  7. 133
      src/fio_cluster.cpp
  8. 2
      src/messenger.cpp
  9. 5
      src/osd_ops.h
  10. 20
      src/osd_primary.cpp
  11. 14
      src/osd_primary_chain.cpp
  12. 8
      src/osd_primary_write.cpp
  13. 42
      src/qemu_driver.c
  14. 177
      src/qemu_proxy.cpp
  15. 35
      src/qemu_proxy.h
  16. 135
      src/test_cas.cpp
  17. 254
      src/vitastor_c.cpp
  18. 55
      src/vitastor_c.h
  19. 43
      tests/run_3osds.sh
  20. 7
      tests/test_cas.sh
  21. 36
      tests/test_snapshot.sh
  22. 36
      tests/test_vm_start.sh
  23. 36
      tests/test_write.sh
  24. 36
      tests/test_write_no_same.sh

14
mon/mon.js

@ -42,7 +42,7 @@ const etcd_tree = {
config: {
/* global: {
// WARNING: NOT ALL OF THESE ARE ACTUALLY CONFIGURABLE HERE
// THIS IS JUST A POOR'S MAN CONFIG DOCUMENTATION
// THIS IS JUST A POOR MAN'S CONFIG DOCUMENTATION
// etcd connection
config_path: "/etc/vitastor/vitastor.conf",
etcd_address: "10.0.115.10:2379/v3",
@ -257,11 +257,13 @@ const etcd_tree = {
},
inode: {
stats: {
/* <inode_t>: {
raw_used: uint64_t, // raw used bytes on OSDs
read: { count: uint64_t, usec: uint64_t, bytes: uint64_t },
write: { count: uint64_t, usec: uint64_t, bytes: uint64_t },
delete: { count: uint64_t, usec: uint64_t, bytes: uint64_t },
/* <pool_id>: {
<inode_t>: {
raw_used: uint64_t, // raw used bytes on OSDs
read: { count: uint64_t, usec: uint64_t, bytes: uint64_t },
write: { count: uint64_t, usec: uint64_t, bytes: uint64_t },
delete: { count: uint64_t, usec: uint64_t, bytes: uint64_t },
},
}, */
},
},

1
rpm/vitastor-el7.spec

@ -64,6 +64,7 @@ cp -r mon %buildroot/usr/lib/vitastor/mon
%_libdir/libfio_vitastor_sec.so
%_libdir/libvitastor_blk.so*
%_libdir/libvitastor_client.so*
%_includedir/vitastor_c.h
/usr/lib/vitastor

1
rpm/vitastor-el8.spec

@ -61,6 +61,7 @@ cp -r mon %buildroot/usr/lib/vitastor
%_libdir/libfio_vitastor_sec.so
%_libdir/libvitastor_blk.so*
%_libdir/libvitastor_client.so*
%_includedir/vitastor_c.h
/usr/lib/vitastor

120
src/CMakeLists.txt

@ -4,6 +4,8 @@ project(vitastor)
include(GNUInstallDirs)
set(WITH_QEMU true CACHE BOOL "Build QEMU driver")
set(WITH_FIO true CACHE BOOL "Build FIO driver")
set(QEMU_PLUGINDIR qemu CACHE STRING "QEMU plugin directory suffix (qemu-kvm on RHEL)")
set(WITH_ASAN false CACHE BOOL "Build with AddressSanitizer")
if("${CMAKE_INSTALL_PREFIX}" MATCHES "^/usr/local/?$")
@ -36,7 +38,9 @@ string(REGEX REPLACE "([\\/\\-]D) *NDEBUG" "" CMAKE_C_FLAGS_RELWITHDEBINFO "${CM
find_package(PkgConfig)
pkg_check_modules(LIBURING REQUIRED liburing)
pkg_check_modules(GLIB REQUIRED glib-2.0)
if (${WITH_QEMU})
pkg_check_modules(GLIB REQUIRED glib-2.0)
endif (${WITH_QEMU})
pkg_check_modules(IBVERBS libibverbs)
if (IBVERBS_LIBRARIES)
add_definitions(-DWITH_RDMA)
@ -62,14 +66,16 @@ target_link_libraries(vitastor_blk
)
set_target_properties(vitastor_blk PROPERTIES VERSION ${VERSION} SOVERSION 0)
# libfio_vitastor_blk.so
add_library(fio_vitastor_blk SHARED
fio_engine.cpp
../json11/json11.cpp
)
target_link_libraries(fio_vitastor_blk
vitastor_blk
)
if (${WITH_FIO})
# libfio_vitastor_blk.so
add_library(fio_vitastor_blk SHARED
fio_engine.cpp
../json11/json11.cpp
)
target_link_libraries(fio_vitastor_blk
vitastor_blk
)
endif (${WITH_FIO})
# libvitastor_common.a
set(MSGR_RDMA "")
@ -96,19 +102,23 @@ target_link_libraries(vitastor-osd
${IBVERBS_LIBRARIES}
)
# libfio_vitastor_sec.so
add_library(fio_vitastor_sec SHARED
fio_sec_osd.cpp
rw_blocking.cpp
)
target_link_libraries(fio_vitastor_sec
tcmalloc_minimal
)
if (${WITH_FIO})
# libfio_vitastor_sec.so
add_library(fio_vitastor_sec SHARED
fio_sec_osd.cpp
rw_blocking.cpp
)
target_link_libraries(fio_vitastor_sec
tcmalloc_minimal
)
endif (${WITH_FIO})
# libvitastor_client.so
add_library(vitastor_client SHARED
cluster_client.cpp
vitastor_c.cpp
)
set_target_properties(vitastor_client PROPERTIES PUBLIC_HEADER "vitastor_c.h")
target_link_libraries(vitastor_client
vitastor_common
tcmalloc_minimal
@ -117,13 +127,15 @@ target_link_libraries(vitastor_client
)
set_target_properties(vitastor_client PROPERTIES VERSION ${VERSION} SOVERSION 0)
# libfio_vitastor.so
add_library(fio_vitastor SHARED
fio_cluster.cpp
)
target_link_libraries(fio_vitastor
vitastor_client
)
if (${WITH_FIO})
# libfio_vitastor.so
add_library(fio_vitastor SHARED
fio_cluster.cpp
)
target_link_libraries(fio_vitastor
vitastor_client
)
endif (${WITH_FIO})
# vitastor-nbd
add_executable(vitastor-nbd
@ -146,27 +158,24 @@ add_executable(vitastor-dump-journal
dump_journal.cpp crc32c.c
)
# qemu_driver.so
add_library(qemu_proxy STATIC qemu_proxy.cpp)
target_compile_options(qemu_proxy PUBLIC -fPIC)
target_include_directories(qemu_proxy PUBLIC
../qemu/b/qemu
../qemu/include
${GLIB_INCLUDE_DIRS}
)
target_link_libraries(qemu_proxy
vitastor_client
)
add_library(qemu_vitastor SHARED
qemu_driver.c
)
target_link_libraries(qemu_vitastor
qemu_proxy
)
set_target_properties(qemu_vitastor PROPERTIES
PREFIX ""
OUTPUT_NAME "block-vitastor"
)
if (${WITH_QEMU})
# qemu_driver.so
add_library(qemu_vitastor SHARED
qemu_driver.c
)
target_include_directories(qemu_vitastor PUBLIC
../qemu/b/qemu
../qemu/include
${GLIB_INCLUDE_DIRS}
)
target_link_libraries(qemu_vitastor
vitastor_client
)
set_target_properties(qemu_vitastor PROPERTIES
PREFIX ""
OUTPUT_NAME "block-vitastor"
)
endif (${WITH_QEMU})
### Test stubs
@ -200,6 +209,14 @@ target_link_libraries(osd_peering_pg_test tcmalloc_minimal)
# test_allocator
add_executable(test_allocator test_allocator.cpp allocator.cpp)
# test_cas
add_executable(test_cas
test_cas.cpp
)
target_link_libraries(test_cas
vitastor_client
)
# test_cluster_client
add_executable(test_cluster_client
test_cluster_client.cpp
@ -218,5 +235,14 @@ target_include_directories(test_cluster_client PUBLIC ${CMAKE_SOURCE_DIR}/src/mo
### Install
install(TARGETS vitastor-osd vitastor-dump-journal vitastor-nbd vitastor-rm RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR})
install(TARGETS fio_vitastor fio_vitastor_blk fio_vitastor_sec vitastor_blk vitastor_client LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR})
install(TARGETS qemu_vitastor LIBRARY DESTINATION /usr/${CMAKE_INSTALL_LIBDIR}/${QEMU_PLUGINDIR})
install(
TARGETS vitastor_blk vitastor_client
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}
)
if (${WITH_FIO})
install(TARGETS fio_vitastor fio_vitastor_blk fio_vitastor_sec LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR})
endif (${WITH_FIO})
if (${WITH_QEMU})
install(TARGETS qemu_vitastor LIBRARY DESTINATION /usr/${CMAKE_INSTALL_LIBDIR}/${QEMU_PLUGINDIR})
endif (${WITH_QEMU})

29
src/cluster_client.cpp

@ -633,6 +633,13 @@ resume_1:
// Slice the operation into parts
slice_rw(op);
op->needs_reslice = false;
if (op->opcode == OSD_OP_WRITE && op->version && op->parts.size() > 1)
{
// Atomic writes to multiple stripes are unsupported
op->retval = -EINVAL;
erase_op(op);
return 1;
}
resume_2:
// Send unsent parts, if they're not subject to change
op->state = 3;
@ -688,13 +695,16 @@ resume_3:
// Check parent inode
auto ino_it = st_cli.inode_config.find(op->cur_inode);
while (ino_it != st_cli.inode_config.end() && ino_it->second.parent_id &&
INODE_POOL(ino_it->second.parent_id) == INODE_POOL(op->cur_inode))
INODE_POOL(ino_it->second.parent_id) == INODE_POOL(op->cur_inode) &&
// Check for loops
ino_it->second.parent_id != op->inode)
{
// Skip parents from the same pool
ino_it = st_cli.inode_config.find(ino_it->second.parent_id);
}
if (ino_it != st_cli.inode_config.end() &&
ino_it->second.parent_id)
ino_it->second.parent_id &&
ino_it->second.parent_id != op->inode)
{
// Continue reading from the parent inode
op->cur_inode = ino_it->second.parent_id;
@ -922,6 +932,7 @@ bool cluster_client_t::try_send(cluster_op_t *op, int i)
.offset = part->offset,
.len = part->len,
.meta_revision = meta_rev,
.version = op->opcode == OSD_OP_WRITE ? op->version : 0,
} },
.bitmap = op->opcode == OSD_OP_WRITE ? NULL : op->part_bitmaps + pg_bitmap_size*i,
.bitmap_len = (unsigned)(op->opcode == OSD_OP_WRITE ? 0 : pg_bitmap_size),
@ -1072,10 +1083,6 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
if (part->op.reply.hdr.retval != expected)
{
// Operation failed, retry
fprintf(
stderr, "%s operation failed on OSD %lu: retval=%ld (expected %d), dropping connection\n",
osd_op_names[part->op.req.hdr.opcode], part->osd_num, part->op.reply.hdr.retval, expected
);
if (part->op.reply.hdr.retval == -EPIPE)
{
// Mark op->up_wait = true before stopping the client
@ -1094,7 +1101,14 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
// Don't overwrite other errors with -EPIPE
op->retval = part->op.reply.hdr.retval;
}
msgr.stop_client(part->op.peer_fd);
if (op->retval != -EINTR && op->retval != -EIO)
{
fprintf(
stderr, "%s operation failed on OSD %lu: retval=%ld (expected %d), dropping connection\n",
osd_op_names[part->op.req.hdr.opcode], part->osd_num, part->op.reply.hdr.retval, expected
);
msgr.stop_client(part->op.peer_fd);
}
part->flags |= PART_ERROR;
}
else
@ -1106,6 +1120,7 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
if (op->opcode == OSD_OP_READ)
{
copy_part_bitmap(op, part);
op->version = op->parts.size() == 1 ? part->op.reply.rw.version : 0;
}
}
if (op->inflight_count == 0)

3
src/cluster_client.h

@ -31,6 +31,9 @@ struct cluster_op_t
uint64_t inode;
uint64_t offset;
uint64_t len;
// for reads and writes within a single object (stripe),
// reads can return current version and writes can use "CAS" semantics
uint64_t version = 0;
int retval;
osd_op_buf_list_t iov;
std::function<void(cluster_op_t*)> callback;

133
src/fio_cluster.cpp

@ -25,20 +25,17 @@
#include <vector>
#include "epoll_manager.h"
#include "cluster_client.h"
#include "vitastor_c.h"
#include "fio_headers.h"
struct sec_data
{
ring_loop_t *ringloop = NULL;
epoll_manager_t *epmgr = NULL;
cluster_client_t *cli = NULL;
inode_watch_t *watch = NULL;
vitastor_c *cli = NULL;
void *watch = NULL;
bool last_sync = false;
/* The list of completed io_u structs. */
std::vector<io_u*> completed;
uint64_t op_n = 0, inflight = 0;
uint64_t inflight = 0;
bool trace = false;
};
@ -189,6 +186,12 @@ static struct fio_option options[] = {
},
};
static void watch_callback(void *opaque, long watch)
{
struct sec_data *bsd = (struct sec_data*)opaque;
bsd->watch = (void*)watch;
}
static int sec_setup(struct thread_data *td)
{
sec_options *o = (sec_options*)td->eo;
@ -209,27 +212,6 @@ static int sec_setup(struct thread_data *td)
td->o.open_files++;
}
json11::Json::object cfg;
if (o->config_path)
cfg["config_path"] = std::string(o->config_path);
if (o->etcd_host)
cfg["etcd_address"] = std::string(o->etcd_host);
if (o->etcd_prefix)
cfg["etcd_prefix"] = std::string(o->etcd_prefix);
if (o->rdma_device)
cfg["rdma_device"] = std::string(o->rdma_device);
if (o->rdma_port_num)
cfg["rdma_port_num"] = o->rdma_port_num;
if (o->rdma_gid_index)
cfg["rdma_gid_index"] = o->rdma_gid_index;
if (o->rdma_mtu)
cfg["rdma_mtu"] = o->rdma_mtu;
if (o->cluster_log)
cfg["log_level"] = o->cluster_log;
if (o->use_rdma != -1)
cfg["use_rdma"] = o->use_rdma;
json11::Json cfg_json(cfg);
if (!o->image)
{
if (!(o->inode & ((1l << (64-POOL_ID_BITS)) - 1)))
@ -251,20 +233,20 @@ static int sec_setup(struct thread_data *td)
{
o->inode = 0;
}
bsd->ringloop = new ring_loop_t(512);
bsd->epmgr = new epoll_manager_t(bsd->ringloop);
bsd->cli = new cluster_client_t(bsd->ringloop, bsd->epmgr->tfd, cfg_json);
bsd->cli = vitastor_c_create_uring(o->config_path, o->etcd_host, o->etcd_prefix,
o->use_rdma, o->rdma_device, o->rdma_port_num, o->rdma_gid_index, o->rdma_mtu, o->cluster_log);
if (o->image)
{
while (!bsd->cli->is_ready())
bsd->watch = NULL;
vitastor_c_watch_inode(bsd->cli, o->image, watch_callback, bsd);
while (true)
{
bsd->ringloop->loop();
if (bsd->cli->is_ready())
vitastor_c_uring_handle_events(bsd->cli);
if (bsd->watch)
break;
bsd->ringloop->wait();
vitastor_c_uring_wait_events(bsd->cli);
}
bsd->watch = bsd->cli->st_cli.watch_inode(std::string(o->image));
td->files[0]->real_file_size = bsd->watch->cfg.size;
td->files[0]->real_file_size = vitastor_c_inode_get_size(bsd->watch);
}
bsd->trace = o->trace ? true : false;
@ -279,11 +261,9 @@ static void sec_cleanup(struct thread_data *td)
{
if (bsd->watch)
{
bsd->cli->st_cli.close_watch(bsd->watch);
vitastor_c_close_watch(bsd->cli, bsd->watch);
}
delete bsd->cli;
delete bsd->epmgr;
delete bsd->ringloop;
vitastor_c_destroy(bsd->cli);
delete bsd;
}
}
@ -294,12 +274,31 @@ static int sec_init(struct thread_data *td)
return 0;
}
static void io_callback(void *opaque, long retval)
{
struct io_u *io = (struct io_u*)opaque;
io->error = retval < 0 ? -retval : 0;
sec_data *bsd = (sec_data*)io->engine_data;
bsd->inflight--;
bsd->completed.push_back(io);
if (bsd->trace)
{
printf("--- %s 0x%lx retval=%ld\n", io->ddir == DDIR_READ ? "READ" :
(io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), (uint64_t)io, retval);
}
}
// Read completion hook. The object version reported with the read result is
// not used by the fio engine, so everything is delegated to io_callback().
static void read_callback(void *opaque, long retval, uint64_t version)
{
    (void)version;
    io_callback(opaque, retval);
}
/* Begin read or write request. */
static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
{
sec_options *opt = (sec_options*)td->eo;
sec_data *bsd = (sec_data*)td->io_ops_data;
int n = bsd->op_n;
struct iovec iov;
fio_ro_check(td, io);
if (io->ddir == DDIR_SYNC && bsd->last_sync)
@ -308,32 +307,29 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
}
io->engine_data = bsd;
cluster_op_t *op = new cluster_op_t;
io->error = 0;
bsd->inflight++;
op->inode = opt->image ? bsd->watch->cfg.num : opt->inode;
uint64_t inode = opt->image ? vitastor_c_inode_get_num(bsd->watch) : opt->inode;
switch (io->ddir)
{
case DDIR_READ:
op->opcode = OSD_OP_READ;
op->offset = io->offset;
op->len = io->xfer_buflen;
op->iov.push_back(io->xfer_buf, io->xfer_buflen);
iov = { .iov_base = io->xfer_buf, .iov_len = io->xfer_buflen };
vitastor_c_read(bsd->cli, inode, io->offset, io->xfer_buflen, &iov, 1, read_callback, io);
bsd->last_sync = false;
break;
case DDIR_WRITE:
if (opt->image && bsd->watch->cfg.readonly)
if (opt->image && vitastor_c_inode_get_readonly(bsd->watch))
{
io->error = EROFS;
return FIO_Q_COMPLETED;
}
op->opcode = OSD_OP_WRITE;
op->offset = io->offset;
op->len = io->xfer_buflen;
op->iov.push_back(io->xfer_buf, io->xfer_buflen);
iov = { .iov_base = io->xfer_buf, .iov_len = io->xfer_buflen };
vitastor_c_write(bsd->cli, inode, io->offset, io->xfer_buflen, 0, &iov, 1, io_callback, io);
bsd->last_sync = false;
break;
case DDIR_SYNC:
op->opcode = OSD_OP_SYNC;
vitastor_c_sync(bsd->cli, io_callback, io);
bsd->last_sync = true;
break;
default:
@ -341,39 +337,20 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
return FIO_Q_COMPLETED;
}
op->callback = [io, n](cluster_op_t *op)
{
io->error = op->retval < 0 ? -op->retval : 0;
sec_data *bsd = (sec_data*)io->engine_data;
bsd->inflight--;
bsd->completed.push_back(io);
if (bsd->trace)
{
printf("--- %s n=%d retval=%d\n", io->ddir == DDIR_READ ? "READ" :
(io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), n, op->retval);
}
delete op;
};
if (opt->trace)
{
if (io->ddir == DDIR_SYNC)
{
printf("+++ SYNC # %d\n", n);
printf("+++ SYNC 0x%lx\n", (uint64_t)io);
}
else
{
printf("+++ %s # %d 0x%llx+%llx\n",
printf("+++ %s 0x%lx 0x%llx+%llx\n",
io->ddir == DDIR_READ ? "READ" : "WRITE",
n, io->offset, io->xfer_buflen);
(uint64_t)io, io->offset, io->xfer_buflen);
}
}
io->error = 0;
bsd->inflight++;
bsd->op_n++;
bsd->cli->execute(op);
if (io->error != 0)
return FIO_Q_COMPLETED;
return FIO_Q_QUEUED;
@ -384,10 +361,10 @@ static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int
sec_data *bsd = (sec_data*)td->io_ops_data;
while (true)
{
bsd->ringloop->loop();
vitastor_c_uring_handle_events(bsd->cli);
if (bsd->completed.size() >= min)
break;
bsd->ringloop->wait();
vitastor_c_uring_wait_events(bsd->cli);
}
return bsd->completed.size();
}

2
src/messenger.cpp

@ -261,7 +261,7 @@ void osd_messenger_t::try_connect_peer_addr(osd_num_t peer_osd, const char *peer
{
osd_num_t peer_osd = clients.at(peer_fd)->osd_num;
stop_client(peer_fd, true);
on_connect_peer(peer_osd, -EIO);
on_connect_peer(peer_osd, -EPIPE);
return;
});
}

5
src/osd_ops.h

@ -191,6 +191,9 @@ struct __attribute__((__packed__)) osd_op_rw_t
uint32_t flags;
// inode metadata revision
uint64_t meta_revision;
// object version for atomic "CAS" (compare-and-set) writes
// writes and deletes fail with -EINTR if object version differs from (version-1)
uint64_t version;
};
struct __attribute__((__packed__)) osd_reply_rw_t
@ -199,6 +202,8 @@ struct __attribute__((__packed__)) osd_reply_rw_t
// for reads: bitmap length
uint32_t bitmap_len;
uint32_t pad0;
// for reads: object version
uint64_t version;
};
// sync to the primary OSD

20
src/osd_primary.cpp

@ -67,7 +67,9 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
}
// Find parents from the same pool. Optimized reads only work within pools
while (inode_it != st_cli.inode_config.end() && inode_it->second.parent_id &&
INODE_POOL(inode_it->second.parent_id) == pg_it->second.pool_id)
INODE_POOL(inode_it->second.parent_id) == pg_it->second.pool_id &&
// Check for loops
inode_it->second.parent_id != cur_op->req.rw.inode)
{
chain_size++;
inode_it = st_cli.inode_config.find(inode_it->second.parent_id);
@ -123,7 +125,10 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
int chain_num = 0;
op_data->read_chain[chain_num++] = cur_op->req.rw.inode;
auto inode_it = st_cli.inode_config.find(cur_op->req.rw.inode);
while (inode_it != st_cli.inode_config.end() && inode_it->second.parent_id)
while (inode_it != st_cli.inode_config.end() && inode_it->second.parent_id &&
INODE_POOL(inode_it->second.parent_id) == pg_it->second.pool_id &&
// Check for loops
inode_it->second.parent_id != cur_op->req.rw.inode)
{
op_data->read_chain[chain_num++] = inode_it->second.parent_id;
inode_it = st_cli.inode_config.find(inode_it->second.parent_id);
@ -222,6 +227,7 @@ resume_2:
finish_op(cur_op, op_data->epipe > 0 ? -EPIPE : -EIO);
return;
}
cur_op->reply.rw.version = op_data->fact_ver;
cur_op->reply.rw.bitmap_len = op_data->pg_data_size * clean_entry_bitmap_size;
if (op_data->degraded)
{
@ -343,6 +349,12 @@ resume_3:
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
return;
}
// Check CAS version
if (cur_op->req.rw.version && op_data->fact_ver != (cur_op->req.rw.version-1))
{
cur_op->reply.hdr.retval = -EINTR;
goto continue_others;
}
// Save version override for parallel reads
pg.ver_override[op_data->oid] = op_data->fact_ver;
// Submit deletes
@ -370,6 +382,8 @@ resume_5:
free_object_state(pg, &op_data->object_state);
}
pg.total_count--;
cur_op->reply.hdr.retval = 0;
continue_others:
osd_op_t *next_op = NULL;
auto next_it = pg.write_queue.find(op_data->oid);
if (next_it != pg.write_queue.end() && next_it->second == cur_op)
@ -378,7 +392,7 @@ resume_5:
if (next_it != pg.write_queue.end() && next_it->first == op_data->oid)
next_op = next_it->second;
}
finish_op(cur_op, cur_op->req.rw.len);
finish_op(cur_op, cur_op->reply.hdr.retval);
if (next_op)
{
// Continue next write to the same object

14
src/osd_primary_chain.cpp

@ -65,7 +65,10 @@ int osd_t::read_bitmaps(osd_op_t *cur_op, pg_t & pg, int base_state)
auto vo_it = pg.ver_override.find(cur_oid);
auto read_version = (vo_it != pg.ver_override.end() ? vo_it->second : UINT64_MAX);
// Read bitmap synchronously from the local database
bs->read_bitmap(cur_oid, read_version, op_data->snapshot_bitmaps + chain_num*clean_entry_bitmap_size, NULL);
bs->read_bitmap(
cur_oid, read_version, op_data->snapshot_bitmaps + chain_num*clean_entry_bitmap_size,
!chain_num ? &cur_op->reply.rw.version : NULL
);
}
}
else
@ -228,7 +231,10 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg)
// Read bitmap synchronously from the local database
for (int j = prev; j <= i; j++)
{
bs->read_bitmap((*bitmap_requests)[j].oid, (*bitmap_requests)[j].version, (*bitmap_requests)[j].bmp_buf, NULL);
bs->read_bitmap(
(*bitmap_requests)[j].oid, (*bitmap_requests)[j].version, (*bitmap_requests)[j].bmp_buf,
(*bitmap_requests)[j].oid.inode == cur_op->req.rw.inode ? &cur_op->reply.rw.version : NULL
);
}
}
else
@ -264,6 +270,10 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg)
for (int j = prev; j <= i; j++)
{
memcpy((*bitmap_requests)[j].bmp_buf, cur_buf, clean_entry_bitmap_size);
if ((*bitmap_requests)[j].oid.inode == cur_op->req.rw.inode)
{
memcpy(&cur_op->reply.rw.version, cur_buf-8, 8);
}
cur_buf += 8 + clean_entry_bitmap_size;
}
}

8
src/osd_primary_write.cpp

@ -96,6 +96,12 @@ resume_3:
pg_cancel_write_queue(pg, cur_op, op_data->oid, op_data->epipe > 0 ? -EPIPE : -EIO);
return;
}
// Check CAS version
if (cur_op->req.rw.version && op_data->fact_ver != (cur_op->req.rw.version-1))
{
cur_op->reply.hdr.retval = -EINTR;
goto continue_others;
}
if (op_data->scheme == POOL_SCHEME_REPLICATED)
{
// Set bitmap bits
@ -265,7 +271,7 @@ continue_others:
next_op = next_it->second;
}
// finish_op would invalidate next_it if it cleared pg.write_queue, but it doesn't do that :)
finish_op(cur_op, cur_op->req.rw.len);
finish_op(cur_op, cur_op->reply.hdr.retval);
if (next_op)
{
// Continue next write to the same object

42
src/qemu_driver.c

@ -26,7 +26,7 @@
#define qobject_unref QDECREF
#endif
#include "qemu_proxy.h"
#include "vitastor_c.h"
void qemu_module_dummy(void)
{
@ -48,6 +48,7 @@ typedef struct VitastorClient
uint64_t pool;
uint64_t size;
long readonly;
int use_rdma;
char *rdma_device;
int rdma_port_num;
int rdma_gid_index;
@ -65,7 +66,8 @@ typedef struct VitastorRPC
} VitastorRPC;
static void vitastor_co_init_task(BlockDriverState *bs, VitastorRPC *task);
static void vitastor_co_generic_bh_cb(long retval, void *opaque);
static void vitastor_co_generic_bh_cb(void *opaque, long retval);
static void vitastor_co_read_cb(void *opaque, long retval, uint64_t version);
static void vitastor_close(BlockDriverState *bs);
static char *qemu_rbd_next_tok(char *src, char delim, char **p)
@ -132,6 +134,7 @@ static void vitastor_parse_filename(const char *filename, QDict *options, Error
if (!strcmp(name, "inode") ||
!strcmp(name, "pool") ||
!strcmp(name, "size") ||
!strcmp(name, "use_rdma") ||
!strcmp(name, "rdma_port_num") ||
!strcmp(name, "rdma_gid_index") ||
!strcmp(name, "rdma_mtu"))
@ -181,7 +184,7 @@ static void coroutine_fn vitastor_co_get_metadata(VitastorRPC *task)
task->co = qemu_coroutine_self();
qemu_mutex_lock(&client->mutex);
vitastor_proxy_watch_metadata(client->proxy, client->image, vitastor_co_generic_bh_cb, task);
vitastor_c_watch_inode(client->proxy, client->image, vitastor_co_generic_bh_cb, task);
qemu_mutex_unlock(&client->mutex);
while (!task->complete)
@ -198,13 +201,14 @@ static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, E
client->config_path = g_strdup(qdict_get_try_str(options, "config_path"));
client->etcd_host = g_strdup(qdict_get_try_str(options, "etcd_host"));
client->etcd_prefix = g_strdup(qdict_get_try_str(options, "etcd_prefix"));
client->use_rdma = qdict_get_try_int(options, "use_rdma", -1);
client->rdma_device = g_strdup(qdict_get_try_str(options, "rdma_device"));
client->rdma_port_num = qdict_get_try_int(options, "rdma_port_num", 0);
client->rdma_gid_index = qdict_get_try_int(options, "rdma_gid_index", 0);
client->rdma_mtu = qdict_get_try_int(options, "rdma_mtu", 0);
client->proxy = vitastor_proxy_create(
bdrv_get_aio_context(bs), client->config_path, client->etcd_host, client->etcd_prefix,
client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu
client->proxy = vitastor_c_create_qemu(
(QEMUSetFDHandler*)aio_set_fd_handler, bdrv_get_aio_context(bs), client->config_path, client->etcd_host, client->etcd_prefix,
client->use_rdma, client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu, 0
);
client->image = g_strdup(qdict_get_try_str(options, "image"));
client->readonly = (flags & BDRV_O_RDWR) ? 1 : 0;
@ -224,9 +228,9 @@ static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, E
}
BDRV_POLL_WHILE(bs, !task.complete);
client->watch = (void*)task.ret;
client->readonly = client->readonly || vitastor_proxy_get_readonly(client->watch);
client->size = vitastor_proxy_get_size(client->watch);
if (!vitastor_proxy_get_inode_num(client->watch))
client->readonly = client->readonly || vitastor_c_inode_get_readonly(client->watch);
client->size = vitastor_c_inode_get_size(client->watch);
if (!vitastor_c_inode_get_num(client->watch))
{
error_setg(errp, "image does not exist");
vitastor_close(bs);
@ -255,6 +259,7 @@ static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, E
}
bs->total_sectors = client->size / BDRV_SECTOR_SIZE;
//client->aio_context = bdrv_get_aio_context(bs);
qdict_del(options, "use_rdma");
qdict_del(options, "rdma_mtu");
qdict_del(options, "rdma_gid_index");
qdict_del(options, "rdma_port_num");
@ -272,7 +277,7 @@ static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, E
static void vitastor_close(BlockDriverState *bs)
{
VitastorClient *client = bs->opaque;
vitastor_proxy_destroy(client->proxy);
vitastor_c_destroy(client->proxy);
qemu_mutex_destroy(&client->mutex);
if (client->config_path)
g_free(client->config_path);
@ -387,7 +392,7 @@ static void vitastor_co_init_task(BlockDriverState *bs, VitastorRPC *task)
};
}
static void vitastor_co_generic_bh_cb(long retval, void *opaque)
static void vitastor_co_generic_bh_cb(void *opaque, long retval)
{
VitastorRPC *task = opaque;
task->ret = retval;
@ -403,6 +408,11 @@ static void vitastor_co_generic_bh_cb(long retval, void *opaque)
}
}
// Read completion adapter: the reported object version is ignored and the
// result is handed over to the generic completion callback.
static void vitastor_co_read_cb(void *opaque, long retval, uint64_t version)
{
    (void)version;
    vitastor_co_generic_bh_cb(opaque, retval);
}
static int coroutine_fn vitastor_co_preadv(BlockDriverState *bs, uint64_t offset, uint64_t bytes, QEMUIOVector *iov, int flags)
{
VitastorClient *client = bs->opaque;
@ -410,9 +420,9 @@ static int coroutine_fn vitastor_co_preadv(BlockDriverState *bs, uint64_t offset
vitastor_co_init_task(bs, &task);
task.iov = iov;
uint64_t inode = client->watch ? vitastor_proxy_get_inode_num(client->watch) : client->inode;
uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode;
qemu_mutex_lock(&client->mutex);
vitastor_proxy_rw(0, client->proxy, inode, offset, bytes, iov->iov, iov->niov, vitastor_co_generic_bh_cb, &task);
vitastor_c_read(client->proxy, inode, offset, bytes, iov->iov, iov->niov, vitastor_co_read_cb, &task);
qemu_mutex_unlock(&client->mutex);
while (!task.complete)
@ -430,9 +440,9 @@ static int coroutine_fn vitastor_co_pwritev(BlockDriverState *bs, uint64_t offse
vitastor_co_init_task(bs, &task);
task.iov = iov;
uint64_t inode = client->watch ? vitastor_proxy_get_inode_num(client->watch) : client->inode;
uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode;
qemu_mutex_lock(&client->mutex);
vitastor_proxy_rw(1, client->proxy, inode, offset, bytes, iov->iov, iov->niov, vitastor_co_generic_bh_cb, &task);
vitastor_c_write(client->proxy, inode, offset, bytes, 0, iov->iov, iov->niov, vitastor_co_generic_bh_cb, &task);
qemu_mutex_unlock(&client->mutex);
while (!task.complete)
@ -462,7 +472,7 @@ static int coroutine_fn vitastor_co_flush(BlockDriverState *bs)
vitastor_co_init_task(bs, &task);
qemu_mutex_lock(&client->mutex);
vitastor_proxy_sync(client->proxy, vitastor_co_generic_bh_cb, &task);
vitastor_c_sync(client->proxy, vitastor_co_generic_bh_cb, &task);
qemu_mutex_unlock(&client->mutex);
while (!task.complete)

177
src/qemu_proxy.cpp

@ -1,177 +0,0 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
// C-C++ proxy for the QEMU driver
// (QEMU headers don't compile with g++)
#include <sys/epoll.h>
#include "cluster_client.h"
typedef void* AioContext;
#include "qemu_proxy.h"
extern "C"
{
// QEMU
typedef void IOHandler(void *opaque);
void aio_set_fd_handler(AioContext *ctx, int fd, int is_external, IOHandler *fd_read, IOHandler *fd_write, void *poll_fn, void *opaque);
}
// Per-descriptor handler record kept by QemuProxy.
// A pointer to it is passed to aio_set_fd_handler() as the opaque argument
// and is received back by QemuProxy::read_handler() / write_handler().
struct QemuProxyData
{
// file descriptor the record belongs to; passed as the first callback argument
int fd;
// invoked as callback(fd, EPOLLIN) on read events and callback(fd, EPOLLOUT) on write events
std::function<void(int, int)> callback;
};
// Glue between cluster_client_t and a QEMU AioContext.
// The client is created without a ring loop (NULL is passed instead); file descriptor
// callbacks requested through timerfd_manager_t are registered in the AioContext
// with aio_set_fd_handler().
class QemuProxy
{
    // Registered handlers by file descriptor.
    // QEMU is given pointers to the map entries as opaque handler arguments.
    std::map<int, QemuProxyData> handlers;

public:
    timerfd_manager_t *tfd;
    cluster_client_t *cli;
    AioContext *ctx;

    // Builds the client configuration from the supplied options and creates the client.
    // String options are used only when non-NULL, numeric options only when non-zero.
    QemuProxy(AioContext *ctx, const char *config_path, const char *etcd_host, const char *etcd_prefix,
        const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu)
    {
        this->ctx = ctx;
        json11::Json::object cfg;
        if (config_path)
            cfg["config_path"] = std::string(config_path);
        if (etcd_host)
            cfg["etcd_address"] = std::string(etcd_host);
        if (etcd_prefix)
            cfg["etcd_prefix"] = std::string(etcd_prefix);
        if (rdma_device)
            cfg["rdma_device"] = std::string(rdma_device);
        if (rdma_port_num)
            cfg["rdma_port_num"] = rdma_port_num;
        if (rdma_gid_index)
            cfg["rdma_gid_index"] = rdma_gid_index;
        if (rdma_mtu)
            cfg["rdma_mtu"] = rdma_mtu;
        json11::Json cfg_json(cfg);
        // The lambda captures `this`, so the object must stay at the same address for its whole lifetime
        tfd = new timerfd_manager_t([this](int fd, bool wr, std::function<void(int, int)> callback) { set_fd_handler(fd, wr, callback); });
        cli = new cluster_client_t(NULL, tfd, cfg_json);
    }

    // The object owns <tfd> and <cli> and hands out pointers to itself and to its
    // handler records, so a copy would free the same objects twice. Forbid copying.
    QemuProxy(const QemuProxy &) = delete;
    QemuProxy & operator = (const QemuProxy &) = delete;

    ~QemuProxy()
    {
        delete cli;
        delete tfd;
    }

    // Register <callback> for events on <fd> (write events only when <wr> is true),
    // or unregister the descriptor when <callback> is empty.
    void set_fd_handler(int fd, bool wr, std::function<void(int, int)> callback)
    {
        if (callback != NULL)
        {
            handlers[fd] = { .fd = fd, .callback = callback };
            aio_set_fd_handler(ctx, fd, false, &QemuProxy::read_handler, wr ? &QemuProxy::write_handler : NULL, NULL, &handlers[fd]);
        }
        else
        {
            handlers.erase(fd);
            aio_set_fd_handler(ctx, fd, false, NULL, NULL, NULL, NULL);
        }
    }

    // QEMU read handler: <opaque> is the QemuProxyData registered for the descriptor
    static void read_handler(void *opaque)
    {
        QemuProxyData *data = (QemuProxyData *)opaque;
        data->callback(data->fd, EPOLLIN);
    }

    // QEMU write handler: <opaque> is the QemuProxyData registered for the descriptor
    static void write_handler(void *opaque)
    {
        QemuProxyData *data = (QemuProxyData *)opaque;
        data->callback(data->fd, EPOLLOUT);
    }
};
extern "C" {
// Create a proxy bound to the given QEMU AioContext.
// The returned opaque handle must be released with vitastor_proxy_destroy().
void* vitastor_proxy_create(AioContext *ctx, const char *config_path, const char *etcd_host, const char *etcd_prefix,
    const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu)
{
    return new QemuProxy(
        ctx, config_path, etcd_host, etcd_prefix,
        rdma_device, rdma_port_num, rdma_gid_index, rdma_mtu
    );
}
void vitastor_proxy_destroy(void *client)
{
QemuProxy *p = (QemuProxy*)client;
delete p;
}
void vitastor_proxy_rw(int write, void *client, uint64_t inode, uint64_t offset, uint64_t len,
iovec *iov, int iovcnt, VitastorIOHandler cb, void *opaque)
{
QemuProxy *p = (QemuProxy*)client;
cluster_op_t *op = new cluster_op_t;
op->opcode = write ? OSD_OP_WRITE : OSD_OP_READ;
op->inode = inode;
op->offset = offset;
op->len = len;
for (int i = 0; i < iovcnt; i++)
{
op->iov.push_back(iov[i].iov_base, iov[i].iov_len);
}
op->callback = [cb, opaque](cluster_op_t *op)
{
cb(op->retval, opaque);
delete op;
};
p->cli->execute(op);
}
void vitastor_proxy_sync(void *client, VitastorIOHandler cb, void *opaque)
{
QemuProxy *p = (QemuProxy*)client;
cluster_op_t *op = new cluster_op_t;
op->opcode = OSD_OP_SYNC;
op->callback = [cb, opaque](cluster_op_t *op)
{
cb(op->retval, opaque);
delete op;
};
p->cli->execute(op);
}
void vitastor_proxy_watch_metadata(void *client, char *image, VitastorIOHandler cb, void *opaque)
{
QemuProxy *p = (QemuProxy*)client;
p->cli->on_ready([=]()
{
auto watch = p->cli->st_cli.watch_inode(std::string(image));
cb((long)watch, opaque);
});
}
void vitastor_proxy_close_watch(void *client, void *watch)
{
QemuProxy *p = (QemuProxy*)client;
p->cli->st_cli.close_watch((inode_watch_t*)watch);
}
uint64_t vitastor_proxy_get_size(void *watch_ptr)
{
inode_watch_t *watch = (inode_watch_t*)watch_ptr;
return watch->cfg.size;
}
uint64_t vitastor_proxy_get_inode_num(void *watch_ptr)
{
inode_watch_t *watch = (inode_watch_t*)watch_ptr;
return watch->cfg.num;
}
int vitastor_proxy_get_readonly(void *watch_ptr)
{
inode_watch_t *watch = (inode_watch_t*)watch_ptr;
return watch->cfg.readonly;
}
}

35
src/qemu_proxy.h

@ -1,35 +0,0 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#ifndef VITASTOR_QEMU_PROXY_H
#define VITASTOR_QEMU_PROXY_H
#ifndef POOL_ID_BITS
#define POOL_ID_BITS 16
#endif
#include <stdint.h>
#include <sys/uio.h>
#ifdef __cplusplus
extern "C" {
#endif
// Our exports
// Completion callback type: receives a result value and the caller-supplied opaque pointer
typedef void VitastorIOHandler(long retval, void *opaque);
// Creates a proxy client bound to the given AioContext and returns it as an opaque handle
void* vitastor_proxy_create(AioContext *ctx, const char *config_path, const char *etcd_host, const char *etcd_prefix,
const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu);
// Destroys a handle returned by vitastor_proxy_create()
void vitastor_proxy_destroy(void *client);
// Submits an asynchronous write (write != 0) or read (write == 0) of <len> bytes at <offset> of <inode>;
// cb is invoked with the operation's return value on completion
void vitastor_proxy_rw(int write, void *client, uint64_t inode, uint64_t offset, uint64_t len,
struct iovec *iov, int iovcnt, VitastorIOHandler cb, void *opaque);
// Submits an asynchronous sync operation; cb is invoked with its return value on completion
void vitastor_proxy_sync(void *client, VitastorIOHandler cb, void *opaque);
// Once the client is ready, starts watching the inode named <image>;
// cb receives the watch pointer cast to long as <retval>
void vitastor_proxy_watch_metadata(void *client, char *image, VitastorIOHandler cb, void *opaque);
// Closes a watch delivered by vitastor_proxy_watch_metadata()
void vitastor_proxy_close_watch(void *client, void *watch);
// Accessors for the inode configuration held by a watch: size, inode number and read-only flag
uint64_t vitastor_proxy_get_size(void *watch);
uint64_t vitastor_proxy_get_inode_num(void *watch);
int vitastor_proxy_get_readonly(void *watch);
#ifdef __cplusplus
}
#endif
#endif

135
src/test_cas.cpp

@ -0,0 +1,135 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
#include <stdio.h>
#include <stdlib.h>
#include "epoll_manager.h"
#include "cluster_client.h"
// Issues a 4096-byte read at offset 0 of <inode> into a freshly allocated buffer.
// On completion the buffer and the operation are released, then cb (if set) is
// called as cb(result, version): result is 0 when the whole length was read,
// otherwise the raw return value of the operation.
void send_read(cluster_client_t *cli, uint64_t inode, std::function<void(int, uint64_t)> cb)
{
    auto *rd = new cluster_op_t();
    rd->opcode = OSD_OP_READ;
    rd->inode = inode;
    rd->offset = 0;
    rd->len = 4096;
    rd->iov.push_back(malloc_or_die(rd->len), rd->len);
    rd->callback = [cb](cluster_op_t *done)
    {
        // Save everything we need before the operation is destroyed
        const uint64_t ver = done->version;
        const int raw = done->retval;
        const int result = (raw == done->len) ? 0 : raw;
        free(done->iov.buf[0].iov_base);
        delete done;
        if (cb)
        {
            cb(result, ver);
        }
    };
    cli->execute(rd);
}
// Issues a 4096-byte write at offset 0 of <inode>, filled with <byte>, carrying
// <version> in the operation's version field (used for CAS by the callers).
// On completion the buffer and the operation are released, then cb (if set) is
// called as cb(result): result is 0 when the whole length was written,
// otherwise the raw return value of the operation.
void send_write(cluster_client_t *cli, uint64_t inode, int byte, uint64_t version, std::function<void(int)> cb)
{
    auto *wr = new cluster_op_t();
    wr->opcode = OSD_OP_WRITE;
    wr->inode = inode;
    wr->offset = 0;
    wr->len = 4096;
    wr->version = version;
    wr->iov.push_back(malloc_or_die(wr->len), wr->len);
    memset(wr->iov.buf[0].iov_base, byte, wr->len);
    wr->callback = [cb](cluster_op_t *done)
    {
        // Save the result before the operation is destroyed
        const int raw = done->retval;
        const int result = (raw == done->len) ? 0 : raw;
        free(done->iov.buf[0].iov_base);
        delete done;
        if (cb)
        {
            cb(result);
        }
    };
    cli->execute(wr);
}
// Basic CAS (compare-and-set) write test.
// Command line: a list of "--key value" pairs that become the client configuration;
// "pool_id" and "inode_id" select the inode to test.
// Sequence: read (get base version) -> CAS write base+1 -> read (expect base+1)
// -> CAS write base+2 -> CAS write base+2 again (expect -EINTR).
// The process terminates through exit() inside the callbacks: 0 on success, 1 on any failure.
int main(int narg, char *args[])
{
json11::Json::object cfgo;
for (int i = 1; i < narg; i++)
{
if (args[i][0] == '-' && args[i][1] == '-')
{
// "--key value" -> cfgo[key] = value; a trailing "--key" without a value gets "1"
const char *opt = args[i]+2;
cfgo[opt] = i == narg-1 ? "1" : args[++i];
}
}
json11::Json cfg(cfgo);
// The pool ID occupies the top POOL_ID_BITS bits of the 64-bit inode number
uint64_t inode = (cfg["pool_id"].uint64_value() << (64-POOL_ID_BITS))
| cfg["inode_id"].uint64_value();
uint64_t base_ver = 0;
// Create client
auto ringloop = new ring_loop_t(512);
auto epmgr = new epoll_manager_t(ringloop);
auto cli = new cluster_client_t(ringloop, epmgr->tfd, cfg);
// All lambdas below capture main()'s locals by reference; main() never returns
// before exit() is called, so those locals stay alive for the callbacks
cli->on_ready([&]()
{
send_read(cli, inode, [&](int r, uint64_t v)
{
if (r < 0)
{
fprintf(stderr, "Initial read operation failed\n");
exit(1);
}
// Remember the version returned by the initial read; all CAS versions are relative to it
base_ver = v;
// CAS v=1 = compare with zero, non-existing object
send_write(cli, inode, 0x01, base_ver+1, [&](int r)
{
if (r < 0)
{
fprintf(stderr, "CAS for non-existing object failed\n");
exit(1);
}
// Check that read returns the new version
send_read(cli, inode, [&](int r, uint64_t v)
{
if (r < 0)
{
fprintf(stderr, "Read operation failed after write\n");
exit(1);
}
if (v != base_ver+1)
{
fprintf(stderr, "Read operation failed to return the new version number\n");
exit(1);
}
// CAS v=2 = compare with v=1, existing object
send_write(cli, inode, 0x02, base_ver+2, [&](int r)
{
if (r < 0)
{
fprintf(stderr, "CAS for existing object failed\n");
exit(1);
}
// CAS v=2 again = compare with v=1, but version is 2. Must fail with -EINTR
send_write(cli, inode, 0x03, base_ver+2, [&](int r)
{
if (r != -EINTR)
{
fprintf(stderr, "CAS conflict detection failed\n");
exit(1);
}
printf("Basic CAS test succeeded\n");
exit(0);
});
});
});
});
});
});
// Drive the event loop forever; the callbacks above end the process via exit()
while (1)
{
ringloop->loop();
ringloop->wait();
}
// Not reached: the loop above never breaks
return 0;
}

254
src/vitastor_c.cpp

@ -0,0 +1,254 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
// Simplified C client library for QEMU, fio and other external drivers
// Also acts as a C-C++ proxy for the QEMU driver (QEMU headers don't compile with g++)
#include <sys/epoll.h>
#include "ringloop.h"
#include "epoll_manager.h"
#include "cluster_client.h"
#include "vitastor_c.h"
// Per-descriptor handler record. A pointer to it is passed as <opaque> to
// vitastor_c_read_handler() / vitastor_c_write_handler().
struct vitastor_qemu_fd_t
{
// Descriptor this record belongs to; passed back as the callback's first argument
int fd;
// Invoked as callback(fd, EPOLLIN) or callback(fd, EPOLLOUT) by the handlers
std::function<void(int, int)> callback;
};
// State of one C client instance.
struct vitastor_c
{
    // fd -> handler record registered through the QEMU fd handler setter
    std::map<int, vitastor_qemu_fd_t> handlers;
    // Event loop objects; NOTE(review): ringloop and epmgr are not set in the
    // part of the file visible here - confirm which creation mode uses them
    ring_loop_t *ringloop{nullptr};
    epoll_manager_t *epmgr{nullptr};
    timerfd_manager_t *tfd{nullptr};
    // NOTE(review): presumably the cluster client all operations go through - confirm
    cluster_client_t *cli{nullptr};
    // QEMU callback used to (un)register descriptors, and the context passed to it
    QEMUSetFDHandler *aio_set_fd_handler{nullptr};
    void *aio_ctx{nullptr};
};
extern "C" {
// Builds the client configuration object from the C-level parameters.
// String parameters are skipped when NULL, integer parameters when 0;
// use_rdma is skipped when negative (-1 means unspecified) and stored as
// a boolean (use_rdma > 0) otherwise.
static json11::Json vitastor_c_common_config(const char *config_path, const char *etcd_host, const char *etcd_prefix,
    int use_rdma, const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu, int log_level)
{
    json11::Json::object options;
    // Store a string option unless it is NULL
    auto put_str = [&options](const char *key, const char *val)
    {
        if (val)
            options[key] = std::string(val);
    };
    // Store an integer option unless it is zero
    auto put_int = [&options](const char *key, int val)
    {
        if (val)
            options[key] = val;
    };
    put_str("config_path", config_path);
    put_str("etcd_address", etcd_host);
    put_str("etcd_prefix", etcd_prefix);
    // -1 means unspecified
    if (use_rdma >= 0)
    {
        options["use_rdma"] = use_rdma > 0;
    }
    put_str("rdma_device", rdma_device);
    put_int("rdma_port_num", rdma_port_num);
    put_int("rdma_gid_index", rdma_gid_index);
    put_int("rdma_mtu", rdma_mtu);
    put_int("log_level", log_level);
    return json11::Json(options);
}
static void vitastor_c_read_handler(void *opaque)
{
vitastor_qemu_fd_t *data = (vitastor_qemu_fd_t *)opaque;
data->callback(data->fd, EPOLLIN);
}
static void vitastor_c_write_handler(void *opaque)
{
vitastor_qemu_fd_t *data = (vitastor_qemu_fd_t *)opaque;
data->callback(data->fd, EPOLLOUT);
}
vitastor_c *vitastor_c_create_qemu(QEMUSetFDHandler *aio_set_fd_handler, void *aio_context,
const char *config_path, const char *etcd_host, const char *etcd_prefix,
bool use_rdma, const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu, int log_level)
{
json11::Json cfg_json = vitastor_c_common_config(
config_path, etcd_host, etcd_prefix, use_rdma,
rdma_device, rdma_port_num, rdma_gid_index, rdma_mtu, log_level
);
vitastor_c *self = new vitastor_c;
self->aio_set_fd_handler = aio_set_fd_handler;
self->aio_ctx = aio_context;
self->tfd = new timerfd_manager_t([self](int fd, bool wr, std::function<void(int, int)> callback)
{
if (callback != NULL)
{
self->handlers[fd] = { .fd = fd, .callback = callback };
self->aio_set_fd_handler(self->aio_ctx, fd, false,
vitastor_c_read_handler, wr ? vitastor_c_write_handler : NULL, NULL, &self->handlers[fd]);</