Browse Source

Rework qemu_proxy into a C wrapper library with public header

allow-etcd-address-option
Vitaliy Filippov 3 months ago
parent
commit
511a89948b
  1. 1
      rpm/vitastor-el7.spec
  2. 1
      rpm/vitastor-el8.spec
  3. 112
      src/CMakeLists.txt
  4. 128
      src/fio_cluster.cpp
  5. 32
      src/qemu_driver.c
  6. 177
      src/qemu_proxy.cpp
  7. 35
      src/qemu_proxy.h
  8. 245
      src/vitastor_c.cpp
  9. 54
      src/vitastor_c.h

1
rpm/vitastor-el7.spec

@ -64,6 +64,7 @@ cp -r mon %buildroot/usr/lib/vitastor/mon
%_libdir/libfio_vitastor_sec.so
%_libdir/libvitastor_blk.so*
%_libdir/libvitastor_client.so*
%_includedir/vitastor_c.h
/usr/lib/vitastor

1
rpm/vitastor-el8.spec

@ -61,6 +61,7 @@ cp -r mon %buildroot/usr/lib/vitastor
%_libdir/libfio_vitastor_sec.so
%_libdir/libvitastor_blk.so*
%_libdir/libvitastor_client.so*
%_includedir/vitastor_c.h
/usr/lib/vitastor

112
src/CMakeLists.txt

@ -4,6 +4,8 @@ project(vitastor)
include(GNUInstallDirs)
set(WITH_QEMU true CACHE BOOL "Build QEMU driver")
set(WITH_FIO true CACHE BOOL "Build FIO driver")
set(QEMU_PLUGINDIR qemu CACHE STRING "QEMU plugin directory suffix (qemu-kvm on RHEL)")
set(WITH_ASAN false CACHE BOOL "Build with AddressSanitizer")
if("${CMAKE_INSTALL_PREFIX}" MATCHES "^/usr/local/?$")
@ -36,7 +38,9 @@ string(REGEX REPLACE "([\\/\\-]D) *NDEBUG" "" CMAKE_C_FLAGS_RELWITHDEBINFO "${CM
find_package(PkgConfig)
pkg_check_modules(LIBURING REQUIRED liburing)
pkg_check_modules(GLIB REQUIRED glib-2.0)
if (${WITH_QEMU})
pkg_check_modules(GLIB REQUIRED glib-2.0)
endif (${WITH_QEMU})
pkg_check_modules(IBVERBS libibverbs)
if (IBVERBS_LIBRARIES)
add_definitions(-DWITH_RDMA)
@ -62,14 +66,16 @@ target_link_libraries(vitastor_blk
)
set_target_properties(vitastor_blk PROPERTIES VERSION ${VERSION} SOVERSION 0)
# libfio_vitastor_blk.so
add_library(fio_vitastor_blk SHARED
fio_engine.cpp
../json11/json11.cpp
)
target_link_libraries(fio_vitastor_blk
vitastor_blk
)
if (${WITH_FIO})
# libfio_vitastor_blk.so
add_library(fio_vitastor_blk SHARED
fio_engine.cpp
../json11/json11.cpp
)
target_link_libraries(fio_vitastor_blk
vitastor_blk
)
endif (${WITH_FIO})
# libvitastor_common.a
set(MSGR_RDMA "")
@ -96,19 +102,23 @@ target_link_libraries(vitastor-osd
${IBVERBS_LIBRARIES}
)
# libfio_vitastor_sec.so
add_library(fio_vitastor_sec SHARED
fio_sec_osd.cpp
rw_blocking.cpp
)
target_link_libraries(fio_vitastor_sec
tcmalloc_minimal
)
if (${WITH_FIO})
# libfio_vitastor_sec.so
add_library(fio_vitastor_sec SHARED
fio_sec_osd.cpp
rw_blocking.cpp
)
target_link_libraries(fio_vitastor_sec
tcmalloc_minimal
)
endif (${WITH_FIO})
# libvitastor_client.so
add_library(vitastor_client SHARED
cluster_client.cpp
vitastor_c.cpp
)
set_target_properties(vitastor_client PROPERTIES PUBLIC_HEADER "vitastor_c.h")
target_link_libraries(vitastor_client
vitastor_common
tcmalloc_minimal
@ -117,13 +127,15 @@ target_link_libraries(vitastor_client
)
set_target_properties(vitastor_client PROPERTIES VERSION ${VERSION} SOVERSION 0)
# libfio_vitastor.so
add_library(fio_vitastor SHARED
fio_cluster.cpp
)
target_link_libraries(fio_vitastor
vitastor_client
)
if (${WITH_FIO})
# libfio_vitastor.so
add_library(fio_vitastor SHARED
fio_cluster.cpp
)
target_link_libraries(fio_vitastor
vitastor_client
)
endif (${WITH_FIO})
# vitastor-nbd
add_executable(vitastor-nbd
@ -146,27 +158,24 @@ add_executable(vitastor-dump-journal
dump_journal.cpp crc32c.c
)
# qemu_driver.so
add_library(qemu_proxy STATIC qemu_proxy.cpp)
target_compile_options(qemu_proxy PUBLIC -fPIC)
target_include_directories(qemu_proxy PUBLIC
../qemu/b/qemu
../qemu/include
${GLIB_INCLUDE_DIRS}
)
target_link_libraries(qemu_proxy
vitastor_client
)
add_library(qemu_vitastor SHARED
qemu_driver.c
)
target_link_libraries(qemu_vitastor
qemu_proxy
)
set_target_properties(qemu_vitastor PROPERTIES
PREFIX ""
OUTPUT_NAME "block-vitastor"
)
if (${WITH_QEMU})
# qemu_driver.so
add_library(qemu_vitastor SHARED
qemu_driver.c
)
target_include_directories(qemu_vitastor PUBLIC
../qemu/b/qemu
../qemu/include
${GLIB_INCLUDE_DIRS}
)
target_link_libraries(qemu_vitastor
vitastor_client
)
set_target_properties(qemu_vitastor PROPERTIES
PREFIX ""
OUTPUT_NAME "block-vitastor"
)
endif (${WITH_QEMU})
### Test stubs
@ -226,5 +235,14 @@ target_include_directories(test_cluster_client PUBLIC ${CMAKE_SOURCE_DIR}/src/mo
### Install
install(TARGETS vitastor-osd vitastor-dump-journal vitastor-nbd vitastor-rm RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR})
install(TARGETS fio_vitastor fio_vitastor_blk fio_vitastor_sec vitastor_blk vitastor_client LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR})
install(TARGETS qemu_vitastor LIBRARY DESTINATION /usr/${CMAKE_INSTALL_LIBDIR}/${QEMU_PLUGINDIR})
install(
TARGETS vitastor_blk vitastor_client
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}
)
if (${WITH_FIO})
install(TARGETS fio_vitastor fio_vitastor_blk fio_vitastor_sec LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR})
endif (${WITH_FIO})
if (${WITH_QEMU})
install(TARGETS qemu_vitastor LIBRARY DESTINATION /usr/${CMAKE_INSTALL_LIBDIR}/${QEMU_PLUGINDIR})
endif (${WITH_QEMU})

128
src/fio_cluster.cpp

@ -25,20 +25,17 @@
#include <vector>
#include "epoll_manager.h"
#include "cluster_client.h"
#include "vitastor_c.h"
#include "fio_headers.h"
struct sec_data
{
ring_loop_t *ringloop = NULL;
epoll_manager_t *epmgr = NULL;
cluster_client_t *cli = NULL;
inode_watch_t *watch = NULL;
vitastor_c *cli = NULL;
void *watch = NULL;
bool last_sync = false;
/* The list of completed io_u structs. */
std::vector<io_u*> completed;
uint64_t op_n = 0, inflight = 0;
uint64_t inflight = 0;
bool trace = false;
};
@ -189,6 +186,12 @@ static struct fio_option options[] = {
},
};
static void watch_callback(long retval, void *opaque)
{
struct sec_data *bsd = (struct sec_data*)opaque;
bsd->watch = (void*)retval;
}
static int sec_setup(struct thread_data *td)
{
sec_options *o = (sec_options*)td->eo;
@ -209,27 +212,6 @@ static int sec_setup(struct thread_data *td)
td->o.open_files++;
}
json11::Json::object cfg;
if (o->config_path)
cfg["config_path"] = std::string(o->config_path);
if (o->etcd_host)
cfg["etcd_address"] = std::string(o->etcd_host);
if (o->etcd_prefix)
cfg["etcd_prefix"] = std::string(o->etcd_prefix);
if (o->rdma_device)
cfg["rdma_device"] = std::string(o->rdma_device);
if (o->rdma_port_num)
cfg["rdma_port_num"] = o->rdma_port_num;
if (o->rdma_gid_index)
cfg["rdma_gid_index"] = o->rdma_gid_index;
if (o->rdma_mtu)
cfg["rdma_mtu"] = o->rdma_mtu;
if (o->cluster_log)
cfg["log_level"] = o->cluster_log;
if (o->use_rdma != -1)
cfg["use_rdma"] = o->use_rdma;
json11::Json cfg_json(cfg);
if (!o->image)
{
if (!(o->inode & ((1l << (64-POOL_ID_BITS)) - 1)))
@ -251,20 +233,20 @@ static int sec_setup(struct thread_data *td)
{
o->inode = 0;
}
bsd->ringloop = new ring_loop_t(512);
bsd->epmgr = new epoll_manager_t(bsd->ringloop);
bsd->cli = new cluster_client_t(bsd->ringloop, bsd->epmgr->tfd, cfg_json);
bsd->cli = vitastor_c_create_uring(o->config_path, o->etcd_host, o->etcd_prefix,
o->use_rdma, o->rdma_device, o->rdma_port_num, o->rdma_gid_index, o->rdma_mtu, o->cluster_log);
if (o->image)
{
while (!bsd->cli->is_ready())
bsd->watch = NULL;
vitastor_c_watch_inode(bsd->cli, o->image, watch_callback, bsd);
while (true)
{
bsd->ringloop->loop();
if (bsd->cli->is_ready())
vitastor_c_uring_handle_events(bsd->cli);
if (bsd->watch)
break;
bsd->ringloop->wait();
vitastor_c_uring_wait_events(bsd->cli);
}
bsd->watch = bsd->cli->st_cli.watch_inode(std::string(o->image));
td->files[0]->real_file_size = bsd->watch->cfg.size;
td->files[0]->real_file_size = vitastor_c_inode_get_size(bsd->watch);
}
bsd->trace = o->trace ? true : false;
@ -279,11 +261,9 @@ static void sec_cleanup(struct thread_data *td)
{
if (bsd->watch)
{
bsd->cli->st_cli.close_watch(bsd->watch);
vitastor_c_close_watch(bsd->cli, bsd->watch);
}
delete bsd->cli;
delete bsd->epmgr;
delete bsd->ringloop;
vitastor_c_destroy(bsd->cli);
delete bsd;
}
}
@ -294,12 +274,26 @@ static int sec_init(struct thread_data *td)
return 0;
}
static void io_callback(long retval, void *opaque)
{
struct io_u *io = (struct io_u*)opaque;
io->error = retval < 0 ? -retval : 0;
sec_data *bsd = (sec_data*)io->engine_data;
bsd->inflight--;
bsd->completed.push_back(io);
if (bsd->trace)
{
printf("--- %s 0x%lx retval=%ld\n", io->ddir == DDIR_READ ? "READ" :
(io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), (uint64_t)io, retval);
}
}
/* Begin read or write request. */
static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
{
sec_options *opt = (sec_options*)td->eo;
sec_data *bsd = (sec_data*)td->io_ops_data;
int n = bsd->op_n;
struct iovec iov;
fio_ro_check(td, io);
if (io->ddir == DDIR_SYNC && bsd->last_sync)
@ -308,32 +302,29 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
}
io->engine_data = bsd;
cluster_op_t *op = new cluster_op_t;
io->error = 0;
bsd->inflight++;
op->inode = opt->image ? bsd->watch->cfg.num : opt->inode;
uint64_t inode = opt->image ? vitastor_c_inode_get_num(bsd->watch) : opt->inode;
switch (io->ddir)
{
case DDIR_READ:
op->opcode = OSD_OP_READ;
op->offset = io->offset;
op->len = io->xfer_buflen;
op->iov.push_back(io->xfer_buf, io->xfer_buflen);
iov = { .iov_base = io->xfer_buf, .iov_len = io->xfer_buflen };
vitastor_c_read(bsd->cli, inode, io->offset, io->xfer_buflen, &iov, 1, io_callback, io);
bsd->last_sync = false;
break;
case DDIR_WRITE:
if (opt->image && bsd->watch->cfg.readonly)
if (opt->image && vitastor_c_inode_get_readonly(bsd->watch))
{
io->error = EROFS;
return FIO_Q_COMPLETED;
}
op->opcode = OSD_OP_WRITE;
op->offset = io->offset;
op->len = io->xfer_buflen;
op->iov.push_back(io->xfer_buf, io->xfer_buflen);
iov = { .iov_base = io->xfer_buf, .iov_len = io->xfer_buflen };
vitastor_c_write(bsd->cli, inode, io->offset, io->xfer_buflen, &iov, 1, io_callback, io);
bsd->last_sync = false;
break;
case DDIR_SYNC:
op->opcode = OSD_OP_SYNC;
vitastor_c_sync(bsd->cli, io_callback, io);
bsd->last_sync = true;
break;
default:
@ -341,39 +332,20 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
return FIO_Q_COMPLETED;
}
op->callback = [io, n](cluster_op_t *op)
{
io->error = op->retval < 0 ? -op->retval : 0;
sec_data *bsd = (sec_data*)io->engine_data;
bsd->inflight--;
bsd->completed.push_back(io);
if (bsd->trace)
{
printf("--- %s n=%d retval=%d\n", io->ddir == DDIR_READ ? "READ" :
(io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), n, op->retval);
}
delete op;
};
if (opt->trace)
{
if (io->ddir == DDIR_SYNC)
{
printf("+++ SYNC # %d\n", n);
printf("+++ SYNC 0x%lx\n", (uint64_t)io);
}
else
{
printf("+++ %s # %d 0x%llx+%llx\n",
printf("+++ %s 0x%lx 0x%llx+%llx\n",
io->ddir == DDIR_READ ? "READ" : "WRITE",
n, io->offset, io->xfer_buflen);
(uint64_t)io, io->offset, io->xfer_buflen);
}
}
io->error = 0;
bsd->inflight++;
bsd->op_n++;
bsd->cli->execute(op);
if (io->error != 0)
return FIO_Q_COMPLETED;
return FIO_Q_QUEUED;
@ -384,10 +356,10 @@ static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int
sec_data *bsd = (sec_data*)td->io_ops_data;
while (true)
{
bsd->ringloop->loop();
vitastor_c_uring_handle_events(bsd->cli);
if (bsd->completed.size() >= min)
break;
bsd->ringloop->wait();
vitastor_c_uring_wait_events(bsd->cli);
}
return bsd->completed.size();
}

32
src/qemu_driver.c

@ -26,7 +26,7 @@
#define qobject_unref QDECREF
#endif
#include "qemu_proxy.h"
#include "vitastor_c.h"
void qemu_module_dummy(void)
{
@ -48,6 +48,7 @@ typedef struct VitastorClient
uint64_t pool;
uint64_t size;
long readonly;
int use_rdma;
char *rdma_device;
int rdma_port_num;
int rdma_gid_index;
@ -132,6 +133,7 @@ static void vitastor_parse_filename(const char *filename, QDict *options, Error
if (!strcmp(name, "inode") ||
!strcmp(name, "pool") ||
!strcmp(name, "size") ||
!strcmp(name, "use_rdma") ||
!strcmp(name, "rdma_port_num") ||
!strcmp(name, "rdma_gid_index") ||
!strcmp(name, "rdma_mtu"))
@ -181,7 +183,7 @@ static void coroutine_fn vitastor_co_get_metadata(VitastorRPC *task)
task->co = qemu_coroutine_self();
qemu_mutex_lock(&client->mutex);
vitastor_proxy_watch_metadata(client->proxy, client->image, vitastor_co_generic_bh_cb, task);
vitastor_c_watch_inode(client->proxy, client->image, vitastor_co_generic_bh_cb, task);
qemu_mutex_unlock(&client->mutex);
while (!task->complete)
@ -198,13 +200,14 @@ static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, E
client->config_path = g_strdup(qdict_get_try_str(options, "config_path"));
client->etcd_host = g_strdup(qdict_get_try_str(options, "etcd_host"));
client->etcd_prefix = g_strdup(qdict_get_try_str(options, "etcd_prefix"));
client->use_rdma = qdict_get_try_int(options, "use_rdma", -1);
client->rdma_device = g_strdup(qdict_get_try_str(options, "rdma_device"));
client->rdma_port_num = qdict_get_try_int(options, "rdma_port_num", 0);
client->rdma_gid_index = qdict_get_try_int(options, "rdma_gid_index", 0);
client->rdma_mtu = qdict_get_try_int(options, "rdma_mtu", 0);
client->proxy = vitastor_proxy_create(
bdrv_get_aio_context(bs), client->config_path, client->etcd_host, client->etcd_prefix,
client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu
client->proxy = vitastor_c_create_qemu(
(QEMUSetFDHandler*)aio_set_fd_handler, bdrv_get_aio_context(bs), client->config_path, client->etcd_host, client->etcd_prefix,
client->use_rdma, client->rdma_device, client->rdma_port_num, client->rdma_gid_index, client->rdma_mtu, 0
);
client->image = g_strdup(qdict_get_try_str(options, "image"));
client->readonly = (flags & BDRV_O_RDWR) ? 1 : 0;
@ -224,9 +227,9 @@ static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, E
}
BDRV_POLL_WHILE(bs, !task.complete);
client->watch = (void*)task.ret;
client->readonly = client->readonly || vitastor_proxy_get_readonly(client->watch);
client->size = vitastor_proxy_get_size(client->watch);
if (!vitastor_proxy_get_inode_num(client->watch))
client->readonly = client->readonly || vitastor_c_inode_get_readonly(client->watch);
client->size = vitastor_c_inode_get_size(client->watch);
if (!vitastor_c_inode_get_num(client->watch))
{
error_setg(errp, "image does not exist");
vitastor_close(bs);
@ -255,6 +258,7 @@ static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, E
}
bs->total_sectors = client->size / BDRV_SECTOR_SIZE;
//client->aio_context = bdrv_get_aio_context(bs);
qdict_del(options, "use_rdma");
qdict_del(options, "rdma_mtu");
qdict_del(options, "rdma_gid_index");
qdict_del(options, "rdma_port_num");
@ -272,7 +276,7 @@ static int vitastor_file_open(BlockDriverState *bs, QDict *options, int flags, E
static void vitastor_close(BlockDriverState *bs)
{
VitastorClient *client = bs->opaque;
vitastor_proxy_destroy(client->proxy);
vitastor_c_destroy(client->proxy);
qemu_mutex_destroy(&client->mutex);
if (client->config_path)
g_free(client->config_path);
@ -410,9 +414,9 @@ static int coroutine_fn vitastor_co_preadv(BlockDriverState *bs, uint64_t offset
vitastor_co_init_task(bs, &task);
task.iov = iov;
uint64_t inode = client->watch ? vitastor_proxy_get_inode_num(client->watch) : client->inode;
uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode;
qemu_mutex_lock(&client->mutex);
vitastor_proxy_rw(0, client->proxy, inode, offset, bytes, iov->iov, iov->niov, vitastor_co_generic_bh_cb, &task);
vitastor_c_read(client->proxy, inode, offset, bytes, iov->iov, iov->niov, vitastor_co_generic_bh_cb, &task);
qemu_mutex_unlock(&client->mutex);
while (!task.complete)
@ -430,9 +434,9 @@ static int coroutine_fn vitastor_co_pwritev(BlockDriverState *bs, uint64_t offse
vitastor_co_init_task(bs, &task);
task.iov = iov;
uint64_t inode = client->watch ? vitastor_proxy_get_inode_num(client->watch) : client->inode;
uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode;
qemu_mutex_lock(&client->mutex);
vitastor_proxy_rw(1, client->proxy, inode, offset, bytes, iov->iov, iov->niov, vitastor_co_generic_bh_cb, &task);
vitastor_c_write(client->proxy, inode, offset, bytes, iov->iov, iov->niov, vitastor_co_generic_bh_cb, &task);
qemu_mutex_unlock(&client->mutex);
while (!task.complete)
@ -462,7 +466,7 @@ static int coroutine_fn vitastor_co_flush(BlockDriverState *bs)
vitastor_co_init_task(bs, &task);
qemu_mutex_lock(&client->mutex);
vitastor_proxy_sync(client->proxy, vitastor_co_generic_bh_cb, &task);
vitastor_c_sync(client->proxy, vitastor_co_generic_bh_cb, &task);
qemu_mutex_unlock(&client->mutex);
while (!task.complete)

177
src/qemu_proxy.cpp

@ -1,177 +0,0 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
// C-C++ proxy for the QEMU driver
// (QEMU headers don't compile with g++)
#include <sys/epoll.h>
#include "cluster_client.h"
typedef void* AioContext;
#include "qemu_proxy.h"
extern "C"
{
// QEMU
typedef void IOHandler(void *opaque);
void aio_set_fd_handler(AioContext *ctx, int fd, int is_external, IOHandler *fd_read, IOHandler *fd_write, void *poll_fn, void *opaque);
}
struct QemuProxyData
{
int fd;
std::function<void(int, int)> callback;
};
class QemuProxy
{
std::map<int, QemuProxyData> handlers;
public:
timerfd_manager_t *tfd;
cluster_client_t *cli;
AioContext *ctx;
QemuProxy(AioContext *ctx, const char *config_path, const char *etcd_host, const char *etcd_prefix,
const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu)
{
this->ctx = ctx;
json11::Json::object cfg;
if (config_path)
cfg["config_path"] = std::string(config_path);
if (etcd_host)
cfg["etcd_address"] = std::string(etcd_host);
if (etcd_prefix)
cfg["etcd_prefix"] = std::string(etcd_prefix);
if (rdma_device)
cfg["rdma_device"] = std::string(rdma_device);
if (rdma_port_num)
cfg["rdma_port_num"] = rdma_port_num;
if (rdma_gid_index)
cfg["rdma_gid_index"] = rdma_gid_index;
if (rdma_mtu)
cfg["rdma_mtu"] = rdma_mtu;
json11::Json cfg_json(cfg);
tfd = new timerfd_manager_t([this](int fd, bool wr, std::function<void(int, int)> callback) { set_fd_handler(fd, wr, callback); });
cli = new cluster_client_t(NULL, tfd, cfg_json);
}
~QemuProxy()
{
delete cli;
delete tfd;
}
void set_fd_handler(int fd, bool wr, std::function<void(int, int)> callback)
{
if (callback != NULL)
{
handlers[fd] = { .fd = fd, .callback = callback };
aio_set_fd_handler(ctx, fd, false, &QemuProxy::read_handler, wr ? &QemuProxy::write_handler : NULL, NULL, &handlers[fd]);
}
else
{
handlers.erase(fd);
aio_set_fd_handler(ctx, fd, false, NULL, NULL, NULL, NULL);
}
}
static void read_handler(void *opaque)
{
QemuProxyData *data = (QemuProxyData *)opaque;
data->callback(data->fd, EPOLLIN);
}
static void write_handler(void *opaque)
{
QemuProxyData *data = (QemuProxyData *)opaque;
data->callback(data->fd, EPOLLOUT);
}
};
extern "C" {
void* vitastor_proxy_create(AioContext *ctx, const char *config_path, const char *etcd_host, const char *etcd_prefix,
const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu)
{
QemuProxy *p = new QemuProxy(ctx, config_path, etcd_host, etcd_prefix, rdma_device, rdma_port_num, rdma_gid_index, rdma_mtu);
return p;
}
void vitastor_proxy_destroy(void *client)
{
QemuProxy *p = (QemuProxy*)client;
delete p;
}
void vitastor_proxy_rw(int write, void *client, uint64_t inode, uint64_t offset, uint64_t len,
iovec *iov, int iovcnt, VitastorIOHandler cb, void *opaque)
{
QemuProxy *p = (QemuProxy*)client;
cluster_op_t *op = new cluster_op_t;
op->opcode = write ? OSD_OP_WRITE : OSD_OP_READ;
op->inode = inode;
op->offset = offset;
op->len = len;
for (int i = 0; i < iovcnt; i++)
{
op->iov.push_back(iov[i].iov_base, iov[i].iov_len);
}
op->callback = [cb, opaque](cluster_op_t *op)
{
cb(op->retval, opaque);
delete op;
};
p->cli->execute(op);
}
void vitastor_proxy_sync(void *client, VitastorIOHandler cb, void *opaque)
{
QemuProxy *p = (QemuProxy*)client;
cluster_op_t *op = new cluster_op_t;
op->opcode = OSD_OP_SYNC;
op->callback = [cb, opaque](cluster_op_t *op)
{
cb(op->retval, opaque);
delete op;
};
p->cli->execute(op);
}
void vitastor_proxy_watch_metadata(void *client, char *image, VitastorIOHandler cb, void *opaque)
{
QemuProxy *p = (QemuProxy*)client;
p->cli->on_ready([=]()
{
auto watch = p->cli->st_cli.watch_inode(std::string(image));
cb((long)watch, opaque);
});
}
void vitastor_proxy_close_watch(void *client, void *watch)
{
QemuProxy *p = (QemuProxy*)client;
p->cli->st_cli.close_watch((inode_watch_t*)watch);
}
uint64_t vitastor_proxy_get_size(void *watch_ptr)
{
inode_watch_t *watch = (inode_watch_t*)watch_ptr;
return watch->cfg.size;
}
uint64_t vitastor_proxy_get_inode_num(void *watch_ptr)
{
inode_watch_t *watch = (inode_watch_t*)watch_ptr;
return watch->cfg.num;
}
int vitastor_proxy_get_readonly(void *watch_ptr)
{
inode_watch_t *watch = (inode_watch_t*)watch_ptr;
return watch->cfg.readonly;
}
}

35
src/qemu_proxy.h

@ -1,35 +0,0 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
#ifndef VITASTOR_QEMU_PROXY_H
#define VITASTOR_QEMU_PROXY_H
#ifndef POOL_ID_BITS
#define POOL_ID_BITS 16
#endif
#include <stdint.h>
#include <sys/uio.h>
#ifdef __cplusplus
extern "C" {
#endif
// Our exports
typedef void VitastorIOHandler(long retval, void *opaque);
void* vitastor_proxy_create(AioContext *ctx, const char *config_path, const char *etcd_host, const char *etcd_prefix,
const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu);
void vitastor_proxy_destroy(void *client);
void vitastor_proxy_rw(int write, void *client, uint64_t inode, uint64_t offset, uint64_t len,
struct iovec *iov, int iovcnt, VitastorIOHandler cb, void *opaque);
void vitastor_proxy_sync(void *client, VitastorIOHandler cb, void *opaque);
void vitastor_proxy_watch_metadata(void *client, char *image, VitastorIOHandler cb, void *opaque);
void vitastor_proxy_close_watch(void *client, void *watch);
uint64_t vitastor_proxy_get_size(void *watch);
uint64_t vitastor_proxy_get_inode_num(void *watch);
int vitastor_proxy_get_readonly(void *watch);
#ifdef __cplusplus
}
#endif
#endif

245
src/vitastor_c.cpp

@ -0,0 +1,245 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
// Simplified C client library for QEMU, fio and other external drivers
// Also acts as a C-C++ proxy for the QEMU driver (QEMU headers don't compile with g++)
#include <sys/epoll.h>
#include "ringloop.h"
#include "epoll_manager.h"
#include "cluster_client.h"
#include "vitastor_c.h"
struct vitastor_qemu_fd_t
{
int fd;
std::function<void(int, int)> callback;
};
struct vitastor_c
{
std::map<int, vitastor_qemu_fd_t> handlers;
ring_loop_t *ringloop = NULL;
epoll_manager_t *epmgr = NULL;
timerfd_manager_t *tfd = NULL;
cluster_client_t *cli = NULL;
QEMUSetFDHandler *aio_set_fd_handler = NULL;
void *aio_ctx = NULL;
};
extern "C" {
static json11::Json vitastor_c_common_config(const char *config_path, const char *etcd_host, const char *etcd_prefix,
int use_rdma, const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu, int log_level)
{
json11::Json::object cfg;
if (config_path)
cfg["config_path"] = std::string(config_path);
if (etcd_host)
cfg["etcd_address"] = std::string(etcd_host);
if (etcd_prefix)
cfg["etcd_prefix"] = std::string(etcd_prefix);
// -1 means unspecified
if (use_rdma >= 0)
cfg["use_rdma"] = use_rdma > 0;
if (rdma_device)
cfg["rdma_device"] = std::string(rdma_device);
if (rdma_port_num)
cfg["rdma_port_num"] = rdma_port_num;
if (rdma_gid_index)
cfg["rdma_gid_index"] = rdma_gid_index;
if (rdma_mtu)
cfg["rdma_mtu"] = rdma_mtu;
if (log_level)
cfg["log_level"] = log_level;
return json11::Json(cfg);
}
static void vitastor_c_read_handler(void *opaque)
{
vitastor_qemu_fd_t *data = (vitastor_qemu_fd_t *)opaque;
data->callback(data->fd, EPOLLIN);
}
static void vitastor_c_write_handler(void *opaque)
{
vitastor_qemu_fd_t *data = (vitastor_qemu_fd_t *)opaque;
data->callback(data->fd, EPOLLOUT);
}
vitastor_c *vitastor_c_create_qemu(QEMUSetFDHandler *aio_set_fd_handler, void *aio_context,
const char *config_path, const char *etcd_host, const char *etcd_prefix,
bool use_rdma, const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu, int log_level)
{
json11::Json cfg_json = vitastor_c_common_config(
config_path, etcd_host, etcd_prefix, use_rdma,
rdma_device, rdma_port_num, rdma_gid_index, rdma_mtu, log_level
);
vitastor_c *self = new vitastor_c;
self->aio_set_fd_handler = aio_set_fd_handler;
self->aio_ctx = aio_context;
self->tfd = new timerfd_manager_t([self](int fd, bool wr, std::function<void(int, int)> callback)
{
if (callback != NULL)
{
self->handlers[fd] = { .fd = fd, .callback = callback };
self->aio_set_fd_handler(self->aio_ctx, fd, false,
vitastor_c_read_handler, wr ? vitastor_c_write_handler : NULL, NULL, &self->handlers[fd]);
}
else
{
self->handlers.erase(fd);
self->aio_set_fd_handler(self->aio_ctx, fd, false, NULL, NULL, NULL, NULL);
}
});
self->cli = new cluster_client_t(NULL, self->tfd, cfg_json);
return self;
}
vitastor_c *vitastor_c_create_uring(const char *config_path, const char *etcd_host, const char *etcd_prefix,
int use_rdma, const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu, int log_level)
{
json11::Json cfg_json = vitastor_c_common_config(
config_path, etcd_host, etcd_prefix, use_rdma,
rdma_device, rdma_port_num, rdma_gid_index, rdma_mtu, log_level
);
vitastor_c *self = new vitastor_c;
self->ringloop = new ring_loop_t(512);
self->epmgr = new epoll_manager_t(self->ringloop);
self->cli = new cluster_client_t(self->ringloop, self->epmgr->tfd, cfg_json);
return self;
}
vitastor_c *vitastor_c_create_uring_json(const char **options, int options_len)
{
json11::Json::object cfg;
for (int i = 0; i < options_len-1; i += 2)
{
cfg[options[i]] = std::string(options[i+1]);
}
json11::Json cfg_json(cfg);
vitastor_c *self = new vitastor_c;
self->ringloop = new ring_loop_t(512);
self->epmgr = new epoll_manager_t(self->ringloop);
self->cli = new cluster_client_t(self->ringloop, self->epmgr->tfd, cfg_json);
return self;
}
void vitastor_c_destroy(vitastor_c *client)
{
delete client->cli;
if (client->epmgr)
delete client->epmgr;
else
delete client->tfd;
if (client->ringloop)
delete client->ringloop;
delete client;
}
int vitastor_c_is_ready(vitastor_c *client)
{
return client->cli->is_ready();
}
void vitastor_c_uring_wait_ready(vitastor_c *client)
{
while (!client->cli->is_ready())
{
client->ringloop->loop();
if (client->cli->is_ready())
break;
client->ringloop->wait();
}
}
void vitastor_c_uring_handle_events(vitastor_c *client)
{
client->ringloop->loop();
}
void vitastor_c_uring_wait_events(vitastor_c *client)
{
client->ringloop->wait();
}
static inline void vitastor_c_rw(bool write, vitastor_c *p, uint64_t inode, uint64_t offset, uint64_t len,
struct iovec *iov, int iovcnt, VitastorIOHandler cb, void *opaque)
{
cluster_op_t *op = new cluster_op_t;
op->opcode = write ? OSD_OP_WRITE : OSD_OP_READ;
op->inode = inode;
op->offset = offset;
op->len = len;
for (int i = 0; i < iovcnt; i++)
{
op->iov.push_back(iov[i].iov_base, iov[i].iov_len);
}
op->callback = [cb, opaque](cluster_op_t *op)
{
cb(op->retval, opaque);
delete op;
};
p->cli->execute(op);
}
void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len,
struct iovec *iov, int iovcnt, VitastorIOHandler cb, void *opaque)
{
vitastor_c_rw(0, client, inode, offset, len, iov, iovcnt, cb, opaque);
}
void vitastor_c_write(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len,
struct iovec *iov, int iovcnt, VitastorIOHandler cb, void *opaque)
{
vitastor_c_rw(1, client, inode, offset, len, iov, iovcnt, cb, opaque);
}
void vitastor_c_sync(vitastor_c *client, VitastorIOHandler cb, void *opaque)
{
cluster_op_t *op = new cluster_op_t;
op->opcode = OSD_OP_SYNC;
op->callback = [cb, opaque](cluster_op_t *op)
{
cb(op->retval, opaque);
delete op;
};
client->cli->execute(op);
}
void vitastor_c_watch_inode(vitastor_c *client, char *image, VitastorIOHandler cb, void *opaque)
{
client->cli->on_ready([=]()
{
auto watch = client->cli->st_cli.watch_inode(std::string(image));
cb((long)watch, opaque);
});
}
void vitastor_c_close_watch(vitastor_c *client, void *handle)
{
client->cli->st_cli.close_watch((inode_watch_t*)handle);
}
uint64_t vitastor_c_inode_get_size(void *handle)
{
inode_watch_t *watch = (inode_watch_t*)handle;
return watch->cfg.size;
}
uint64_t vitastor_c_inode_get_num(void *handle)
{
inode_watch_t *watch = (inode_watch_t*)handle;
return watch->cfg.num;
}
int vitastor_c_inode_get_readonly(void *handle)
{
inode_watch_t *watch = (inode_watch_t*)handle;
return watch->cfg.readonly;
}
}

54
src/vitastor_c.h

@ -0,0 +1,54 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 or GNU GPL-2.0+ (see README.md for details)
// Simplified C client library for QEMU, fio and other external drivers
#ifndef VITASTOR_QEMU_PROXY_H
#define VITASTOR_QEMU_PROXY_H
#ifndef POOL_ID_BITS
#define POOL_ID_BITS 16
#endif
#include <stdint.h>
#include <sys/uio.h>
#ifdef __cplusplus
extern "C" {
#endif
struct vitastor_c;
typedef struct vitastor_c vitastor_c;
typedef void VitastorIOHandler(long retval, void *opaque);
// QEMU
typedef void IOHandler(void *opaque);
typedef void QEMUSetFDHandler(void *ctx, int fd, int is_external, IOHandler *fd_read, IOHandler *fd_write, void *poll_fn, void *opaque);
vitastor_c *vitastor_c_create_qemu(QEMUSetFDHandler *aio_set_fd_handler, void *aio_context,
const char *config_path, const char *etcd_host, const char *etcd_prefix,
bool use_rdma, const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu, int log_level);
vitastor_c *vitastor_c_create_uring(const char *config_path, const char *etcd_host, const char *etcd_prefix,
int use_rdma, const char *rdma_device, int rdma_port_num, int rdma_gid_index, int rdma_mtu, int log_level);
vitastor_c *vitastor_c_create_uring_json(const char **options, int options_len);
void vitastor_c_destroy(vitastor_c *client);
int vitastor_c_is_ready(vitastor_c *client);
void vitastor_c_uring_wait_ready(vitastor_c *client);
void vitastor_c_uring_handle_events(vitastor_c *client);
void vitastor_c_uring_wait_events(vitastor_c *client);
void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len,
struct iovec *iov, int iovcnt, VitastorIOHandler cb, void *opaque);
void vitastor_c_write(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len,
struct iovec *iov, int iovcnt, VitastorIOHandler cb, void *opaque);
void vitastor_c_sync(vitastor_c *client, VitastorIOHandler cb, void *opaque);
void vitastor_c_watch_inode(vitastor_c *client, char *image, VitastorIOHandler cb, void *opaque);
void vitastor_c_close_watch(vitastor_c *client, void *handle);
uint64_t vitastor_c_inode_get_size(void *handle);
uint64_t vitastor_c_inode_get_num(void *handle);
int vitastor_c_inode_get_readonly(void *handle);
#ifdef __cplusplus
}
#endif
#endif
Loading…
Cancel
Save