From 3e46728321a29b29f331833933c0eca157104007 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Mon, 25 Nov 2019 22:35:44 +0300 Subject: [PATCH] Continue fio engine --- blockstore_stable.cpp | 6 ++ crc32c.c | 9 ++- fio_engine.cpp | 173 ++++++++++++++++++------------------------ 3 files changed, 88 insertions(+), 100 deletions(-) diff --git a/blockstore_stable.cpp b/blockstore_stable.cpp index 971778e3..2c2023c3 100644 --- a/blockstore_stable.cpp +++ b/blockstore_stable.cpp @@ -85,6 +85,12 @@ int blockstore::dequeue_stable(blockstore_operation *op) int s = 0, cur_sector = -1; for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) { + auto unstab_it = unstable_writes.find(v->oid); + if (unstab_it != unstable_writes.end() && + unstab_it->second <= v->version) + { + unstable_writes.erase(unstab_it); + } journal_entry_stable *je = (journal_entry_stable*) prefill_single_journal_entry(journal, JE_STABLE, sizeof(journal_entry_stable)); je->oid = v->oid; diff --git a/crc32c.c b/crc32c.c index d0029ca4..deba6f9c 100644 --- a/crc32c.c +++ b/crc32c.c @@ -236,6 +236,9 @@ static void crc32c_init_hw(void) /* Compute CRC-32C using the Intel hardware instruction. */ static uint32_t crc32c_hw(uint32_t crc, const void *buf, size_t len) { +#ifndef __x86_64__ + return 0; +#else const unsigned char *next = (const unsigned char*)buf; const unsigned char *end; uint64_t crc0, crc1, crc2; /* need to be 64 bits for crc32q */ @@ -338,6 +341,7 @@ static uint32_t crc32c_hw(uint32_t crc, const void *buf, size_t len) /* return a post-processed crc */ return (uint32_t)crc0 ^ 0xffffffff; +#endif } /* Check for SSE 4.2. SSE 4.2 was first supported in Nehalem processors @@ -360,8 +364,11 @@ static uint32_t crc32c_hw(uint32_t crc, const void *buf, size_t len) version. Otherwise, use the software version. */ uint32_t crc32c(uint32_t crc, const void *buf, size_t len) { +#ifndef __x86_64__ + return crc32c_sw(crc, buf, len); +#else int sse42; - SSE42(sse42); return sse42 ? crc32c_hw(crc, buf, len) : crc32c_sw(crc, buf, len); +#endif } diff --git a/fio_engine.cpp b/fio_engine.cpp index a6d7285f..0c1d93ed 100644 --- a/fio_engine.cpp +++ b/fio_engine.cpp @@ -7,10 +7,8 @@ struct bs_data { blockstore *bs; ring_loop_t *ringloop; - /* The list of completed io_u structs. */ - struct io_u **completed; - size_t nr_completed; + std::vector completed; }; struct bs_options @@ -95,48 +93,10 @@ static int bs_init(struct thread_data *td) return 0; } -/* A command in flight has been completed. */ -static int cmd_completed(void *vp, int *error) -{ - struct io_u *io_u; - struct bs_data *bs_data; - struct io_u **completed; - - io_u = vp; - bs_data = io_u->engine_data; - - if (bs_data->debug) - log_info("fio: nbd: command completed\n"); - - if (*error != 0) - io_u->error = *error; - else - io_u->error = 0; - - /* Add this completion to the list so it can be picked up - * later by ->event. - */ - completed = realloc(bs_data->completed, - sizeof(struct io_u *) * - (bs_data->nr_completed+1)); - if (completed == NULL) { - io_u->error = errno; - return 0; - } - - bs_data->completed = completed; - bs_data->completed[bs_data->nr_completed] = io_u; - bs_data->nr_completed++; - - return 0; -} - /* Begin read or write request. */ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io_u) { struct bs_data *bsd = td->io_ops_data; - bs_completion_callback completion = { .callback = cmd_completed, .user_data = io_u }; - int r; fio_ro_check(td, io_u); @@ -145,88 +105,103 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io_u) if (io_u->ddir == DDIR_WRITE || io_u->ddir == DDIR_READ) assert(io_u->xfer_buflen <= bsd->block_size); - switch (io_u->ddir) { + blockstore_operation *op = new blockstore_operation; + + switch (io_u->ddir) + { case DDIR_READ: - r = bs_aio_pread(bs_data->nbd, - io_u->xfer_buf, io_u->xfer_buflen, - io_u->offset, completion, 0); + op->flags = OP_READ; + op->buf = io_u->xfer_buf; + op->oid = { + .inode = 1, + .stripe = io_u->offset >> bsd->block_order, + }; + op->offset = io_u->offset % bsd->block_size; + op->len = io_u->xfer_buflen; + op->callback = [&](blockstore_operation *op) + { + bsd->completed.push_back(io_u); + delete op; + }; break; case DDIR_WRITE: - r = bs_aio_pwrite(bs_data->nbd, - io_u->xfer_buf, io_u->xfer_buflen, - io_u->offset, completion, 0); - break; - case DDIR_TRIM: - r = bs_aio_trim(bs_data->nbd, io_u->xfer_buflen, - io_u->offset, completion, 0); + op->flags = OP_WRITE; + op->buf = io_u->xfer_buf; + op->oid = { + .inode = 1, + .stripe = io_u->offset >> bsd->block_order, + }; + op->offset = io_u->offset % bsd->block_size; + op->len = io_u->xfer_buflen; + op->callback = [&](blockstore_operation *op) + { + bsd->completed.push_back(io_u); + delete op; + }; break; case DDIR_SYNC: - /* XXX We could probably also handle - * DDIR_SYNC_FILE_RANGE with a bit of effort. - */ - r = bs_aio_flush(bs_data->nbd, completion, 0); + op->flags = OP_SYNC; + op->callback = [&](blockstore_operation *op) + { + if (bsd->bs->unstable_writes.size() > 0) + { + op->flags = OP_STABLE; + op->len = bsd->bs->unstable_writes.size(); + op->buf = new obj_ver_id[op->len]; + int i = 0; + for (auto it = bsd->bs->unstable_writes.begin(); it != bsd->bs->unstable_writes.end(); it++, i++) + { + op->buf[i] = { + .oid = it->first, + .version = it->second, + }; + } + bsd->bs->enqueue_op(op); + op->callback = [&](blockstore_operation *op) + { + bsd->completed.push_back(io_u); + delete[] op->buf; + delete op; + }; + } + else + { + bsd->completed.push_back(io_u); + delete op; + } + }; break; default: io_u->error = EINVAL; return FIO_Q_COMPLETED; } - if (r == -1) { - /* errno is optional information on libnbd error path; - * if it's 0, set it to a default value - */ - io_u->error = bs_get_errno(); - if (io_u->error == 0) - io_u->error = EIO; - return FIO_Q_COMPLETED; - } + bsd->bs->enqueue_op(op); - if (bs_data->debug) - log_info("fio: nbd: command issued\n"); io_u->error = 0; return FIO_Q_QUEUED; } static int bs_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t) { - struct bs_data *bs_data = td->io_ops_data; - int r; - unsigned events = 0; - int timeout; - - /* XXX This handling of timeout is wrong because it will wait - * for up to loop iterations * timeout. - */ - timeout = !t ? -1 : t->tv_sec * 1000 + t->tv_nsec / 1000000; - - while (events < min) { - r = bs_poll(bs_data->nbd, timeout); - if (r == -1) { - /* error in poll */ - log_err("fio: bs_poll: %s\n", bs_get_error()); - return -1; - } - else { - /* poll made progress */ - events += retire_commands(bs_data->nbd); - } + struct bs_data *bsd = td->io_ops_data; + // FIXME timeout + while (bsd->completed.size() < min) + { + bsd->ringloop->loop(); } - - return events; + return bsd->completed.size(); } static struct io_u *bs_event(struct thread_data *td, int event) { - struct bs_data *bs_data = td->io_ops_data; - - if (bs_data->nr_completed == 0) + struct bs_data *bsd = td->io_ops_data; + if (bsd->completed.size() == 0) return NULL; - - /* XXX We ignore the event number and assume fio calls us - * exactly once for [0..nr_events-1]. - */ - bs_data->nr_completed--; - return bs_data->completed[bs_data->nr_completed]; + /* FIXME We ignore the event number and assume fio calls us exactly once for [0..nr_events-1] */ + struct io_u *ev = bsd->completed.back(); + bsd->completed.pop_back(); + return ev; } static int bs_io_u_init(struct thread_data *td, struct io_u *io_u)