Browse Source

Fix a small memory leak and BS_OP_SYNC mishandling, now fio does not hang during primary-osd test

blocking-uring-test
Vitaliy Filippov 1 year ago
parent
commit
2be4824a7a
  1. 1
      Makefile
  2. 2
      blockstore_flush.cpp
  3. 2
      blockstore_impl.cpp
  4. 5
      blockstore_sync.cpp
  5. 25
      fio_sec_osd.cpp
  6. 17
      osd.cpp
  7. 2
      osd.h
  8. 35
      osd_primary.cpp
  9. 32
      osd_secondary.cpp

1
Makefile

@ -1,5 +1,6 @@
BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_impl.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \
blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_rollback.o blockstore_flush.o crc32c.o ringloop.o timerfd_interval.o
# -fsanitize=address
CXXFLAGS := -g -O3 -Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fPIC -fdiagnostics-color=always
all: $(BLOCKSTORE_OBJS) libfio_blockstore.so osd libfio_sec_osd.so test_blockstore stub_osd osd_test
clean:

2
blockstore_flush.cpp

@ -543,6 +543,7 @@ bool journal_flusher_co::modify_meta_read(uint64_t meta_loc, flusher_meta_write_
// We must check if the same sector is already in memory if we don't keep all metadata in memory all the time.
// And yet another option is to use LSM trees for metadata, but it sophisticates everything a lot,
// so I'll avoid it as long as I can.
wr.submitted = false;
wr.sector = ((meta_loc >> bs->block_order) / (bs->meta_block_size / bs->clean_entry_size)) * bs->meta_block_size;
wr.pos = ((meta_loc >> bs->block_order) % (bs->meta_block_size / bs->clean_entry_size));
if (bs->inmemory_meta)
@ -573,7 +574,6 @@ bool journal_flusher_co::modify_meta_read(uint64_t meta_loc, flusher_meta_write_
}
else
{
wr.submitted = false;
wr.buf = wr.it->second.buf;
wr.it->second.usage_count++;
}

2
blockstore_impl.cpp

@ -420,7 +420,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op)
// Allocate memory
op->version = stable_count;
op->retval = total_count;
op->buf = memalign(MEM_ALIGNMENT, sizeof(obj_ver_id) * total_count);
op->buf = malloc(sizeof(obj_ver_id) * total_count);
if (!op->buf)
{
op->retval = -ENOMEM;

5
blockstore_sync.cpp

@ -171,9 +171,12 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op)
else
{
PRIV(op)->sync_state = SYNC_DONE;
ack_sync(op);
}
}
if (PRIV(op)->sync_state == SYNC_DONE)
{
ack_sync(op);
}
return 1;
}

25
fio_sec_osd.cpp

@ -50,7 +50,8 @@ struct sec_options
int __pad;
char *host = NULL;
int port = 0;
bool single_primary = false;
int single_primary = 0;
int trace = 0;
};
static struct fio_option options[] = {
@ -82,6 +83,16 @@ static struct fio_option options[] = {
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = "osd_trace",
.lname = "OSD trace",
.type = FIO_OPT_BOOL,
.off1 = offsetof(struct sec_options, trace),
.help = "Trace OSD operations",
.def = "0",
.category = FIO_OPT_C_ENGINE,
.group = FIO_OPT_G_FILENAME,
},
{
.name = NULL,
},
@ -239,6 +250,12 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
return FIO_Q_COMPLETED;
}
if (opt->trace)
{
printf("+++ %s # %d\n", io->ddir == DDIR_READ ? "READ" :
(io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), n);
}
io->error = 0;
bsd->inflight++;
bsd->op_n++;
@ -262,6 +279,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io)
static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t)
{
sec_options *opt = (sec_options*)td->eo;
sec_data *bsd = (sec_data*)td->io_ops_data;
// FIXME timeout, at least poll. Now it's the stupidest implementation possible
osd_any_reply_t reply;
@ -305,6 +323,11 @@ static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int
exit(1);
}
}
if (opt->trace)
{
printf("--- %s # %ld\n", io->ddir == DDIR_READ ? "READ" :
(io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), reply.hdr.id);
}
bsd->completed.push_back(io);
}
return bsd->completed.size();

17
osd.cpp

@ -249,11 +249,18 @@ void osd_t::cancel_osd_ops(osd_client_t & cl)
void osd_t::cancel_op(osd_op_t *op)
{
op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
op->reply.hdr.id = op->req.hdr.id;
op->reply.hdr.opcode = op->req.hdr.opcode;
op->reply.hdr.retval = -EPIPE;
op->callback(op);
if (op->op_type == OSD_OP_OUT)
{
op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
op->reply.hdr.id = op->req.hdr.id;
op->reply.hdr.opcode = op->req.hdr.opcode;
op->reply.hdr.retval = -EPIPE;
op->callback(op);
}
else
{
delete op;
}
}
void osd_t::stop_client(int peer_fd)

2
osd.h

@ -96,7 +96,7 @@ struct osd_primary_op_data_t;
struct osd_op_t
{
int op_type;
int op_type = OSD_OP_IN;
int peer_fd;
osd_any_op_t req;
osd_any_reply_t reply;

35
osd_primary.cpp

@ -38,12 +38,20 @@ struct osd_primary_op_data_t
void osd_t::finish_primary_op(osd_op_t *cur_op, int retval)
{
// FIXME add separate magics
cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
cur_op->reply.hdr.id = cur_op->req.hdr.id;
cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode;
cur_op->reply.hdr.retval = retval;
outbox_push(this->clients[cur_op->peer_fd], cur_op);
// FIXME add separate magic number
auto cl_it = clients.find(cur_op->peer_fd);
if (cl_it != clients.end())
{
cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC;
cur_op->reply.hdr.id = cur_op->req.hdr.id;
cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode;
cur_op->reply.hdr.retval = retval;
outbox_push(cl_it->second, cur_op);
}
else
{
delete cur_op;
}
}
bool osd_t::prepare_primary_rw(osd_op_t *cur_op)
@ -272,6 +280,7 @@ void osd_t::handle_primary_subop(osd_op_t *cur_op, int ok, uint64_t version)
op_data->fact_ver = version;
if (!ok)
{
// FIXME: Handle errors
op_data->errors++;
}
else
@ -295,6 +304,10 @@ void osd_t::handle_primary_subop(osd_op_t *cur_op, int ok, uint64_t version)
{
continue_primary_sync(cur_op);
}
else
{
throw std::runtime_error("BUG: unknown opcode");
}
}
}
@ -312,6 +325,7 @@ void osd_t::continue_primary_write(osd_op_t *cur_op)
else if (op_data->st == 3) goto resume_3;
else if (op_data->st == 4) goto resume_4;
else if (op_data->st == 5) goto resume_5;
assert(op_data->st == 0);
// Check if actions are pending for this object
{
auto act_it = pg.obj_stab_actions.lower_bound((obj_piece_id_t){
@ -408,6 +422,7 @@ void osd_t::continue_primary_sync(osd_op_t *cur_op)
else if (cur_op->op_data->st == 4) goto resume_4;
else if (cur_op->op_data->st == 5) goto resume_5;
else if (cur_op->op_data->st == 6) goto resume_6;
assert(cur_op->op_data->st == 0);
if (syncs_in_progress.size() > 0)
{
// Wait for previous syncs, if any
@ -481,7 +496,7 @@ resume_5:
resume_6:
// FIXME: Free them correctly (via a destructor or so)
delete cur_op->op_data->unstable_write_osds;
delete cur_op->op_data->unstable_writes;
delete[] cur_op->op_data->unstable_writes;
cur_op->op_data->unstable_writes = NULL;
cur_op->op_data->unstable_write_osds = NULL;
finish:
@ -490,9 +505,9 @@ finish:
finish_primary_op(cur_op, 0);
if (syncs_in_progress.size() > 0)
{
osd_op_t *next_op = syncs_in_progress.front();
next_op->op_data->st++;
continue_primary_sync(next_op);
cur_op = syncs_in_progress.front();
cur_op->op_data->st++;
goto resume_2;
}
}

32
osd_secondary.cpp

@ -12,11 +12,7 @@ void osd_t::secondary_op_callback(osd_op_t *op)
op->reply.hdr.id = op->req.hdr.id;
op->reply.hdr.opcode = op->req.hdr.opcode;
op->reply.hdr.retval = op->bs_op->retval;
if (op->req.hdr.opcode == OSD_OP_SECONDARY_LIST)
{
op->reply.sec_list.stable_count = op->bs_op->version;
}
else if (op->req.hdr.opcode == OSD_OP_SECONDARY_READ ||
if (op->req.hdr.opcode == OSD_OP_SECONDARY_READ ||
op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE)
{
op->reply.sec_rw.version = op->bs_op->version;
@ -30,11 +26,15 @@ void osd_t::secondary_op_callback(osd_op_t *op)
{
op->send_list.push_back(op->buf, op->reply.hdr.retval);
}
else if (op->req.hdr.opcode == OSD_OP_SECONDARY_LIST &&
op->reply.hdr.retval > 0)
else if (op->req.hdr.opcode == OSD_OP_SECONDARY_LIST)
{
op->buf = op->bs_op->buf; // allocated by blockstore
op->send_list.push_back(op->buf, op->reply.hdr.retval * sizeof(obj_ver_id));
// allocated by blockstore
op->buf = op->bs_op->buf;
if (op->reply.hdr.retval > 0)
{
op->send_list.push_back(op->buf, op->reply.hdr.retval * sizeof(obj_ver_id));
}
op->reply.sec_list.stable_count = op->bs_op->version;
}
auto & cl = cl_it->second;
outbox_push(cl, op);
@ -65,17 +65,26 @@ void osd_t::exec_secondary(osd_op_t *cur_op)
cur_op->bs_op->offset = cur_op->req.sec_rw.offset;
cur_op->bs_op->len = cur_op->req.sec_rw.len;
cur_op->bs_op->buf = cur_op->buf;
#ifdef OSD_STUB
cur_op->bs_op->retval = cur_op->bs_op->len;
#endif
}
else if (cur_op->req.hdr.opcode == OSD_OP_SECONDARY_DELETE)
{
cur_op->bs_op->oid = cur_op->req.sec_del.oid;
cur_op->bs_op->version = cur_op->req.sec_del.version;
#ifdef OSD_STUB
cur_op->bs_op->retval = 0;
#endif
}
else if (cur_op->req.hdr.opcode == OSD_OP_SECONDARY_STABILIZE ||
cur_op->req.hdr.opcode == OSD_OP_SECONDARY_ROLLBACK)
{
cur_op->bs_op->len = cur_op->req.sec_stab.len/sizeof(obj_ver_id);
cur_op->bs_op->buf = cur_op->buf;
#ifdef OSD_STUB
cur_op->bs_op->retval = 0;
#endif
}
else if (cur_op->req.hdr.opcode == OSD_OP_SECONDARY_LIST)
{
@ -89,9 +98,12 @@ void osd_t::exec_secondary(osd_op_t *cur_op)
cur_op->bs_op->oid.stripe = cur_op->req.sec_list.parity_block_size;
cur_op->bs_op->len = cur_op->req.sec_list.pg_count;
cur_op->bs_op->offset = cur_op->req.sec_list.list_pg - 1;
#ifdef OSD_STUB
cur_op->bs_op->retval = 0;
cur_op->bs_op->buf = NULL;
#endif
}
#ifdef OSD_STUB
cur_op->bs_op->retval = cur_op->bs_op->len;
secondary_op_callback(cur_op);
#else
bs->enqueue_op(cur_op->bs_op);

Loading…
Cancel
Save