diff --git a/src/fio_cluster.cpp b/src/fio_cluster.cpp index 78419eaa..edc82822 100644 --- a/src/fio_cluster.cpp +++ b/src/fio_cluster.cpp @@ -186,10 +186,10 @@ static struct fio_option options[] = { }, }; -static void watch_callback(long retval, void *opaque) +static void watch_callback(void *opaque, long watch) { struct sec_data *bsd = (struct sec_data*)opaque; - bsd->watch = (void*)retval; + bsd->watch = (void*)watch; } static int sec_setup(struct thread_data *td) @@ -274,7 +274,7 @@ static int sec_init(struct thread_data *td) return 0; } -static void io_callback(long retval, void *opaque) +static void io_callback(void *opaque, long retval) { struct io_u *io = (struct io_u*)opaque; io->error = retval < 0 ? -retval : 0; @@ -288,6 +288,11 @@ static void io_callback(long retval, void *opaque) } } +static void read_callback(void *opaque, long retval, uint64_t version) +{ + io_callback(opaque, retval); +} + /* Begin read or write request. */ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) { @@ -310,7 +315,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) { case DDIR_READ: iov = { .iov_base = io->xfer_buf, .iov_len = io->xfer_buflen }; - vitastor_c_read(bsd->cli, inode, io->offset, io->xfer_buflen, &iov, 1, io_callback, io); + vitastor_c_read(bsd->cli, inode, io->offset, io->xfer_buflen, &iov, 1, read_callback, io); bsd->last_sync = false; break; case DDIR_WRITE: @@ -320,7 +325,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) return FIO_Q_COMPLETED; } iov = { .iov_base = io->xfer_buf, .iov_len = io->xfer_buflen }; - vitastor_c_write(bsd->cli, inode, io->offset, io->xfer_buflen, &iov, 1, io_callback, io); + vitastor_c_write(bsd->cli, inode, io->offset, io->xfer_buflen, 0, &iov, 1, io_callback, io); bsd->last_sync = false; break; case DDIR_SYNC: diff --git a/src/qemu_driver.c b/src/qemu_driver.c index 4e647f26..5c282da1 100644 --- a/src/qemu_driver.c +++ b/src/qemu_driver.c @@ -66,7 +66,8 @@ typedef struct VitastorRPC } VitastorRPC; static void vitastor_co_init_task(BlockDriverState *bs, VitastorRPC *task); -static void vitastor_co_generic_bh_cb(long retval, void *opaque); +static void vitastor_co_generic_bh_cb(void *opaque, long retval); +static void vitastor_co_read_cb(void *opaque, long retval, uint64_t version); static void vitastor_close(BlockDriverState *bs); static char *qemu_rbd_next_tok(char *src, char delim, char **p) @@ -391,7 +392,7 @@ static void vitastor_co_init_task(BlockDriverState *bs, VitastorRPC *task) }; } -static void vitastor_co_generic_bh_cb(long retval, void *opaque) +static void vitastor_co_generic_bh_cb(void *opaque, long retval) { VitastorRPC *task = opaque; task->ret = retval; @@ -407,6 +408,11 @@ static void vitastor_co_generic_bh_cb(long retval, void *opaque) } } +static void vitastor_co_read_cb(void *opaque, long retval, uint64_t version) +{ + vitastor_co_generic_bh_cb(opaque, retval); +} + static int coroutine_fn vitastor_co_preadv(BlockDriverState *bs, uint64_t offset, uint64_t bytes, QEMUIOVector *iov, int flags) { VitastorClient *client = bs->opaque; @@ -416,7 +422,7 @@ static int coroutine_fn vitastor_co_preadv(BlockDriverState *bs, uint64_t offset uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode; qemu_mutex_lock(&client->mutex); - vitastor_c_read(client->proxy, inode, offset, bytes, iov->iov, iov->niov, vitastor_co_generic_bh_cb, &task); + vitastor_c_read(client->proxy, inode, offset, bytes, iov->iov, iov->niov, vitastor_co_read_cb, &task); qemu_mutex_unlock(&client->mutex); while (!task.complete) @@ -436,7 +442,7 @@ static int coroutine_fn vitastor_co_pwritev(BlockDriverState *bs, uint64_t offse uint64_t inode = client->watch ? vitastor_c_inode_get_num(client->watch) : client->inode; qemu_mutex_lock(&client->mutex); - vitastor_c_write(client->proxy, inode, offset, bytes, iov->iov, iov->niov, vitastor_co_generic_bh_cb, &task); + vitastor_c_write(client->proxy, inode, offset, bytes, 0, iov->iov, iov->niov, vitastor_co_generic_bh_cb, &task); qemu_mutex_unlock(&client->mutex); while (!task.complete) diff --git a/src/vitastor_c.cpp b/src/vitastor_c.cpp index 9f6cfa6d..9300888e 100644 --- a/src/vitastor_c.cpp +++ b/src/vitastor_c.cpp @@ -166,11 +166,11 @@ void vitastor_c_uring_wait_events(vitastor_c *client) client->ringloop->wait(); } -static inline void vitastor_c_rw(bool write, vitastor_c *p, uint64_t inode, uint64_t offset, uint64_t len, - struct iovec *iov, int iovcnt, VitastorIOHandler cb, void *opaque) +void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len, + struct iovec *iov, int iovcnt, VitastorReadHandler cb, void *opaque) { cluster_op_t *op = new cluster_op_t; - op->opcode = write ? OSD_OP_WRITE : OSD_OP_READ; + op->opcode = OSD_OP_READ; op->inode = inode; op->offset = offset; op->len = len; @@ -180,22 +180,31 @@ static inline void vitastor_c_rw(bool write, vitastor_c *p, uint64_t inode, uint } op->callback = [cb, opaque](cluster_op_t *op) { - cb(op->retval, opaque); + cb(opaque, op->retval, op->version); delete op; }; - p->cli->execute(op); + client->cli->execute(op); } -void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len, +void vitastor_c_write(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len, uint64_t check_version, struct iovec *iov, int iovcnt, VitastorIOHandler cb, void *opaque) { - vitastor_c_rw(0, client, inode, offset, len, iov, iovcnt, cb, opaque); -} - -void vitastor_c_write(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len, - struct iovec *iov, int iovcnt, VitastorIOHandler cb, void *opaque) -{ - vitastor_c_rw(1, client, inode, offset, len, iov, iovcnt, cb, opaque); + cluster_op_t *op = new cluster_op_t; + op->opcode = OSD_OP_WRITE; + op->inode = inode; + op->offset = offset; + op->len = len; + op->version = check_version; + for (int i = 0; i < iovcnt; i++) + { + op->iov.push_back(iov[i].iov_base, iov[i].iov_len); + } + op->callback = [cb, opaque](cluster_op_t *op) + { + cb(opaque, op->retval); + delete op; + }; + client->cli->execute(op); } void vitastor_c_sync(vitastor_c *client, VitastorIOHandler cb, void *opaque) @@ -204,7 +213,7 @@ void vitastor_c_sync(vitastor_c *client, VitastorIOHandler cb, void *opaque) op->opcode = OSD_OP_SYNC; op->callback = [cb, opaque](cluster_op_t *op) { - cb(op->retval, opaque); + cb(opaque, op->retval); delete op; }; client->cli->execute(op); @@ -215,7 +224,7 @@ void vitastor_c_watch_inode(vitastor_c *client, char *image, VitastorIOHandler c client->cli->on_ready([=]() { auto watch = client->cli->st_cli.watch_inode(std::string(image)); - cb((long)watch, opaque); + cb(opaque, (long)watch); }); } diff --git a/src/vitastor_c.h b/src/vitastor_c.h index 7c816dff..f964c642 100644 --- a/src/vitastor_c.h +++ b/src/vitastor_c.h @@ -19,7 +19,8 @@ extern "C" { struct vitastor_c; typedef struct vitastor_c vitastor_c; -typedef void VitastorIOHandler(long retval, void *opaque); +typedef void VitastorReadHandler(void *opaque, long retval, uint64_t version); +typedef void VitastorIOHandler(void *opaque, long retval); // QEMU typedef void IOHandler(void *opaque); @@ -37,8 +38,8 @@ void vitastor_c_uring_wait_ready(vitastor_c *client); void vitastor_c_uring_handle_events(vitastor_c *client); void vitastor_c_uring_wait_events(vitastor_c *client); void vitastor_c_read(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len, - struct iovec *iov, int iovcnt, VitastorIOHandler cb, void *opaque); -void vitastor_c_write(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len, + struct iovec *iov, int iovcnt, VitastorReadHandler cb, void *opaque); +void vitastor_c_write(vitastor_c *client, uint64_t inode, uint64_t offset, uint64_t len, uint64_t check_version, struct iovec *iov, int iovcnt, VitastorIOHandler cb, void *opaque); void vitastor_c_sync(vitastor_c *client, VitastorIOHandler cb, void *opaque); void vitastor_c_watch_inode(vitastor_c *client, char *image, VitastorIOHandler cb, void *opaque);