Browse Source

Fix op validation, fix journal sector usage tracking

Now it runs for a while with fio but then crashes with ENOSPC because
of an apparent bug in the allocator
blocking-uring-test
Vitaliy Filippov 3 years ago
parent
commit
b8e53f8c67
  1. 2
      blockstore.cpp
  2. 13
      blockstore_stable.cpp
  3. 6
      blockstore_sync.cpp
  4. 6
      blockstore_write.cpp
  5. 84
      fio_engine.cpp

2
blockstore.cpp

@ -256,7 +256,7 @@ void blockstore::enqueue_op(blockstore_operation *op)
{
int type = op->flags & OP_TYPE_MASK;
if (type < OP_READ || type > OP_DELETE || (type == OP_READ || type == OP_WRITE) &&
(op->offset >= block_size || op->len >= block_size-op->offset || (op->len % DISK_ALIGNMENT)))
(op->offset >= block_size || op->len > block_size-op->offset || (op->len % DISK_ALIGNMENT)))
{
// Basic verification not passed
op->retval = -EINVAL;

13
blockstore_stable.cpp

@ -122,6 +122,19 @@ void blockstore::handle_stable_event(ring_data_t *data, blockstore_operation *op
op->pending_ops--;
if (op->pending_ops == 0)
{
// Release used journal sectors
if (op->min_used_journal_sector > 0)
{
uint64_t s = op->min_used_journal_sector;
while (1)
{
journal.sector_info[s-1].usage_count--;
if (s == op->max_used_journal_sector)
break;
s = (s + 1) % journal.sector_count;
}
op->min_used_journal_sector = op->max_used_journal_sector = 0;
}
// First step: mark dirty_db entries as stable, acknowledge op completion
obj_ver_id* v;
int i;

6
blockstore_sync.cpp

@ -129,9 +129,13 @@ void blockstore::handle_sync_event(ring_data_t *data, blockstore_operation *op)
// Release used journal sectors
if (op->min_used_journal_sector > 0)
{
for (uint64_t s = op->min_used_journal_sector; s != op->max_used_journal_sector; s = (s + 1) % journal.sector_count)
uint64_t s = op->min_used_journal_sector;
while (1)
{
journal.sector_info[s-1].usage_count--;
if (s == op->max_used_journal_sector)
break;
s = (s + 1) % journal.sector_count;
}
op->min_used_journal_sector = op->max_used_journal_sector = 0;
}

6
blockstore_write.cpp

@ -173,9 +173,13 @@ void blockstore::handle_write_event(ring_data_t *data, blockstore_operation *op)
// Release used journal sectors
if (op->min_used_journal_sector > 0)
{
for (uint64_t s = op->min_used_journal_sector; s <= op->max_used_journal_sector; s++)
uint64_t s = op->min_used_journal_sector;
while (1)
{
journal.sector_info[s-1].usage_count--;
if (s == op->max_used_journal_sector)
break;
s = (s + 1) % journal.sector_count;
}
op->min_used_journal_sector = op->max_used_journal_sector = 0;
}

84
fio_engine.cpp

@ -7,12 +7,15 @@ extern "C" {
#include "fio/optgroup.h"
}
static const int DEBUG = 0;
struct bs_data
{
blockstore *bs;
ring_loop_t *ringloop;
/* The list of completed io_u structs. */
std::vector<io_u*> completed;
int op_n = 0;
};
struct bs_options
@ -101,59 +104,65 @@ static int bs_init(struct thread_data *td)
}
/* Begin read or write request. */
static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io_u)
static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io)
{
bs_data *bsd = (bs_data*)td->io_ops_data;
int n = bsd->op_n;
fio_ro_check(td, io_u);
fio_ro_check(td, io);
io_u->engine_data = bsd;
io->engine_data = bsd;
if (io_u->ddir == DDIR_WRITE || io_u->ddir == DDIR_READ)
assert(io_u->xfer_buflen <= bsd->bs->block_size);
if (io->ddir == DDIR_WRITE || io->ddir == DDIR_READ)
assert(io->xfer_buflen <= bsd->bs->block_size);
blockstore_operation *op = new blockstore_operation;
op->callback = NULL;
switch (io_u->ddir)
switch (io->ddir)
{
case DDIR_READ:
op->flags = OP_READ;
op->buf = io_u->xfer_buf;
op->buf = io->xfer_buf;
op->oid = {
.inode = 1,
.stripe = io_u->offset >> bsd->bs->block_order,
.stripe = io->offset >> bsd->bs->block_order,
};
op->offset = io_u->offset % bsd->bs->block_size;
op->len = io_u->xfer_buflen;
op->callback = [io_u](blockstore_operation *op)
op->offset = io->offset % bsd->bs->block_size;
op->len = io->xfer_buflen;
op->callback = [io](blockstore_operation *op)
{
bs_data *bsd = (bs_data*)io_u->engine_data;
bsd->completed.push_back(io_u);
io->error = op->retval < 0 ? -op->retval : 0;
bs_data *bsd = (bs_data*)io->engine_data;
bsd->completed.push_back(io);
delete op;
};
break;
case DDIR_WRITE:
op->flags = OP_WRITE;
op->buf = io_u->xfer_buf;
op->buf = io->xfer_buf;
op->oid = {
.inode = 1,
.stripe = io_u->offset >> bsd->bs->block_order,
.stripe = io->offset >> bsd->bs->block_order,
};
op->offset = io_u->offset % bsd->bs->block_size;
op->len = io_u->xfer_buflen;
op->callback = [io_u](blockstore_operation *op)
op->offset = io->offset % bsd->bs->block_size;
op->len = io->xfer_buflen;
op->callback = [io, n](blockstore_operation *op)
{
bs_data *bsd = (bs_data*)io_u->engine_data;
bsd->completed.push_back(io_u);
io->error = op->retval < 0 ? -op->retval : 0;
bs_data *bsd = (bs_data*)io->engine_data;
bsd->completed.push_back(io);
if (DEBUG)
printf("--- OP_WRITE %llx n=%d retval=%d\n", io, n, op->retval);
delete op;
};
break;
case DDIR_SYNC:
op->flags = OP_SYNC;
op->callback = [io_u](blockstore_operation *op)
op->callback = [io, n](blockstore_operation *op)
{
bs_data *bsd = (bs_data*)io_u->engine_data;
if (bsd->bs->unstable_writes.size() > 0)
bs_data *bsd = (bs_data*)io->engine_data;
if (op->retval >= 0 && bsd->bs->unstable_writes.size() > 0)
{
op->flags = OP_STABLE;
op->len = bsd->bs->unstable_writes.size();
@ -168,30 +177,41 @@ static enum fio_q_status bs_queue(struct thread_data *td, struct io_u *io_u)
};
}
bsd->bs->enqueue_op(op);
op->callback = [io_u](blockstore_operation *op)
op->callback = [io, n](blockstore_operation *op)
{
bs_data *bsd = (bs_data*)io_u->engine_data;
bsd->completed.push_back(io_u);
io->error = op->retval < 0 ? -op->retval : 0;
bs_data *bsd = (bs_data*)io->engine_data;
bsd->completed.push_back(io);
obj_ver_id *vers = (obj_ver_id*)op->buf;
delete[] vers;
if (DEBUG)
printf("--- OP_SYNC %llx n=%d retval=%d\n", io, n, op->retval);
delete op;
};
}
else
{
bsd->completed.push_back(io_u);
io->error = op->retval < 0 ? -op->retval : 0;
bsd->completed.push_back(io);
if (DEBUG)
printf("--- OP_SYNC %llx n=%d retval=%d\n", io, n, op->retval);
delete op;
}
};
break;
default:
io_u->error = EINVAL;
io->error = EINVAL;
return FIO_Q_COMPLETED;
}
if (DEBUG)
printf("+++ %s %llx\n", op->flags == OP_WRITE ? "OP_WRITE" : "OP_SYNC", io);
io->error = 0;
bsd->bs->enqueue_op(op);
bsd->op_n++;
io_u->error = 0;
if (io->error != 0)
return FIO_Q_COMPLETED;
return FIO_Q_QUEUED;
}
@ -220,13 +240,13 @@ static struct io_u *bs_event(struct thread_data *td, int event)
return ev;
}
static int bs_io_u_init(struct thread_data *td, struct io_u *io_u)
static int bs_io_u_init(struct thread_data *td, struct io_u *io)
{
io_u->engine_data = NULL;
io->engine_data = NULL;
return 0;
}
static void bs_io_u_free(struct thread_data *td, struct io_u *io_u)
static void bs_io_u_free(struct thread_data *td, struct io_u *io)
{
}

Loading…
Cancel
Save