Continue fio engine

blocking-uring-test
Vitaliy Filippov 2019-11-25 22:35:44 +03:00
parent b67406e764
commit 3e46728321
3 changed files with 88 additions and 100 deletions

View File

@ -85,6 +85,12 @@ int blockstore::dequeue_stable(blockstore_operation *op)
int s = 0, cur_sector = -1; int s = 0, cur_sector = -1;
for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++) for (i = 0, v = (obj_ver_id*)op->buf; i < op->len; i++, v++)
{ {
auto unstab_it = unstable_writes.find(v->oid);
if (unstab_it != unstable_writes.end() &&
unstab_it->second <= v->version)
{
unstable_writes.erase(unstab_it);
}
journal_entry_stable *je = (journal_entry_stable*) journal_entry_stable *je = (journal_entry_stable*)
prefill_single_journal_entry(journal, JE_STABLE, sizeof(journal_entry_stable)); prefill_single_journal_entry(journal, JE_STABLE, sizeof(journal_entry_stable));
je->oid = v->oid; je->oid = v->oid;

View File

@ -236,6 +236,9 @@ static void crc32c_init_hw(void)
/* Compute CRC-32C using the Intel hardware instruction. */ /* Compute CRC-32C using the Intel hardware instruction. */
static uint32_t crc32c_hw(uint32_t crc, const void *buf, size_t len) static uint32_t crc32c_hw(uint32_t crc, const void *buf, size_t len)
{ {
#ifndef __x86_64__
return 0;
#else
const unsigned char *next = (const unsigned char*)buf; const unsigned char *next = (const unsigned char*)buf;
const unsigned char *end; const unsigned char *end;
uint64_t crc0, crc1, crc2; /* need to be 64 bits for crc32q */ uint64_t crc0, crc1, crc2; /* need to be 64 bits for crc32q */
@ -338,6 +341,7 @@ static uint32_t crc32c_hw(uint32_t crc, const void *buf, size_t len)
/* return a post-processed crc */ /* return a post-processed crc */
return (uint32_t)crc0 ^ 0xffffffff; return (uint32_t)crc0 ^ 0xffffffff;
#endif
} }
/* Check for SSE 4.2. SSE 4.2 was first supported in Nehalem processors /* Check for SSE 4.2. SSE 4.2 was first supported in Nehalem processors
@ -360,8 +364,11 @@ static uint32_t crc32c_hw(uint32_t crc, const void *buf, size_t len)
version. Otherwise, use the software version. */ version. Otherwise, use the software version. */
uint32_t crc32c(uint32_t crc, const void *buf, size_t len) uint32_t crc32c(uint32_t crc, const void *buf, size_t len)
{ {
#ifndef __x86_64__
return crc32c_sw(crc, buf, len);
#else
int sse42; int sse42;
SSE42(sse42); SSE42(sse42);
return sse42 ? crc32c_hw(crc, buf, len) : crc32c_sw(crc, buf, len); return sse42 ? crc32c_hw(crc, buf, len) : crc32c_sw(crc, buf, len);
#endif
} }

View File

@ -7,10 +7,8 @@ struct bs_data
{ {
blockstore *bs; blockstore *bs;
ring_loop_t *ringloop; ring_loop_t *ringloop;
/* The list of completed io_u structs. */ /* The list of completed io_u structs. */
struct io_u **completed; std::vector<io_u*> completed;
size_t nr_completed;
}; };
struct bs_options struct bs_options
@ -95,48 +93,10 @@ static int bs_init(struct thread_data *td)
return 0; return 0;
} }
/* A command in flight has been completed. */
static int cmd_completed(void *vp, int *error)
{
struct io_u *io_u;
struct bs_data *bs_data;
struct io_u **completed;
io_u = vp;
bs_data = io_u->engine_data;
if (bs_data->debug)
log_info("fio: nbd: command completed\n");
if (*error != 0)
io_u->error = *error;
else
io_u->error = 0;
/* Add this completion to the list so it can be picked up
* later by ->event.
*/
completed = realloc(bs_data->completed,
sizeof(struct io_u *) *
(bs_data->nr_completed+1));
if (completed == NULL) {
io_u->error = errno;
return 0;
}
bs_data->completed = completed;
bs_data->completed[bs_data->nr_completed] = io_u;
bs_data->nr_completed++;
return 0;
}
/* Begin read or write request. */ /* Begin read or write request. */
static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io_u) static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io_u)
{ {
struct bs_data *bsd = td->io_ops_data; struct bs_data *bsd = td->io_ops_data;
bs_completion_callback completion = { .callback = cmd_completed, .user_data = io_u };
int r;
fio_ro_check(td, io_u); fio_ro_check(td, io_u);
@ -145,88 +105,103 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io_u)
if (io_u->ddir == DDIR_WRITE || io_u->ddir == DDIR_READ) if (io_u->ddir == DDIR_WRITE || io_u->ddir == DDIR_READ)
assert(io_u->xfer_buflen <= bsd->block_size); assert(io_u->xfer_buflen <= bsd->block_size);
switch (io_u->ddir) { blockstore_operation *op = new blockstore_operation;
switch (io_u->ddir)
{
case DDIR_READ: case DDIR_READ:
r = bs_aio_pread(bs_data->nbd, op->flags = OP_READ;
io_u->xfer_buf, io_u->xfer_buflen, op->buf = io_u->xfer_buf;
io_u->offset, completion, 0); op->oid = {
.inode = 1,
.stripe = io_u->offset >> bsd->block_order,
};
op->offset = io_u->offset % bsd->block_size;
op->len = io_u->xfer_buflen;
op->callback = [&](blockstore_operation *op)
{
bsd->completed.push_back(io_u);
delete op;
};
break; break;
case DDIR_WRITE: case DDIR_WRITE:
r = bs_aio_pwrite(bs_data->nbd, op->flags = OP_WRITE;
io_u->xfer_buf, io_u->xfer_buflen, op->buf = io_u->xfer_buf;
io_u->offset, completion, 0); op->oid = {
break; .inode = 1,
case DDIR_TRIM: .stripe = io_u->offset >> bsd->block_order,
r = bs_aio_trim(bs_data->nbd, io_u->xfer_buflen, };
io_u->offset, completion, 0); op->offset = io_u->offset % bsd->block_size;
op->len = io_u->xfer_buflen;
op->callback = [&](blockstore_operation *op)
{
bsd->completed.push_back(io_u);
delete op;
};
break; break;
case DDIR_SYNC: case DDIR_SYNC:
/* XXX We could probably also handle op->flags = OP_SYNC;
* DDIR_SYNC_FILE_RANGE with a bit of effort. op->callback = [&](blockstore_operation *op)
*/ {
r = bs_aio_flush(bs_data->nbd, completion, 0); if (bsd->bs->unstable_writes.size() > 0)
{
op->flags = OP_STABLE;
op->len = bsd->bs->unstable_writes.size();
op->buf = new obj_ver_id[op->len];
int i = 0;
for (auto it = bsd->bs->unstable_writes.begin(); it != bsd->bs->unstable_writes.end(); it++, i++)
{
op->buf[i] = {
.oid = it->first,
.version = it->second,
};
}
bsd->bs->enqueue_op(op);
op->callback = [&](blockstore_operation *op)
{
bsd->completed.push_back(io_u);
delete[] op->buf;
delete op;
};
}
else
{
bsd->completed.push_back(io_u);
delete op;
}
};
break; break;
default: default:
io_u->error = EINVAL; io_u->error = EINVAL;
return FIO_Q_COMPLETED; return FIO_Q_COMPLETED;
} }
if (r == -1) { bsd->bs->enqueue_op(op);
/* errno is optional information on libnbd error path;
* if it's 0, set it to a default value
*/
io_u->error = bs_get_errno();
if (io_u->error == 0)
io_u->error = EIO;
return FIO_Q_COMPLETED;
}
if (bs_data->debug)
log_info("fio: nbd: command issued\n");
io_u->error = 0; io_u->error = 0;
return FIO_Q_QUEUED; return FIO_Q_QUEUED;
} }
static int bs_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t) static int bs_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t)
{ {
struct bs_data *bs_data = td->io_ops_data; struct bs_data *bsd = td->io_ops_data;
int r; // FIXME timeout
unsigned events = 0; while (bsd->completed.size() < min)
int timeout; {
bsd->ringloop->loop();
/* XXX This handling of timeout is wrong because it will wait
* for up to loop iterations * timeout.
*/
timeout = !t ? -1 : t->tv_sec * 1000 + t->tv_nsec / 1000000;
while (events < min) {
r = bs_poll(bs_data->nbd, timeout);
if (r == -1) {
/* error in poll */
log_err("fio: bs_poll: %s\n", bs_get_error());
return -1;
}
else {
/* poll made progress */
events += retire_commands(bs_data->nbd);
}
} }
return bsd->completed.size();
return events;
} }
static struct io_u *bs_event(struct thread_data *td, int event) static struct io_u *bs_event(struct thread_data *td, int event)
{ {
struct bs_data *bs_data = td->io_ops_data; struct bs_data *bsd = td->io_ops_data;
if (bsd->completed.size() == 0)
if (bs_data->nr_completed == 0)
return NULL; return NULL;
/* FIXME We ignore the event number and assume fio calls us exactly once for [0..nr_events-1] */
/* XXX We ignore the event number and assume fio calls us struct io_u *ev = bsd->completed.back();
* exactly once for [0..nr_events-1]. bsd->completed.pop_back();
*/ return ev;
bs_data->nr_completed--;
return bs_data->completed[bs_data->nr_completed];
} }
static int bs_io_u_init(struct thread_data *td, struct io_u *io_u) static int bs_io_u_init(struct thread_data *td, struct io_u *io_u)