From 2be4824a7a36525bdd1c792c050a33e53dfd32e1 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 28 Feb 2020 01:46:39 +0300 Subject: [PATCH] Fix a small memory leak and BS_OP_SYNC mishandling, now fio does not hang during primary-osd test --- Makefile | 1 + blockstore_flush.cpp | 2 +- blockstore_impl.cpp | 2 +- blockstore_sync.cpp | 5 ++++- fio_sec_osd.cpp | 25 ++++++++++++++++++++++++- osd.cpp | 17 ++++++++++++----- osd.h | 2 +- osd_primary.cpp | 35 +++++++++++++++++++++++++---------- osd_secondary.cpp | 32 ++++++++++++++++++++++---------- 9 files changed, 91 insertions(+), 30 deletions(-) diff --git a/Makefile b/Makefile index fae435d11..6e8223508 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,6 @@ BLOCKSTORE_OBJS := allocator.o blockstore.o blockstore_impl.o blockstore_init.o blockstore_open.o blockstore_journal.o blockstore_read.o \ blockstore_write.o blockstore_sync.o blockstore_stable.o blockstore_rollback.o blockstore_flush.o crc32c.o ringloop.o timerfd_interval.o +# -fsanitize=address CXXFLAGS := -g -O3 -Wall -Wno-sign-compare -Wno-comment -Wno-parentheses -Wno-pointer-arith -fPIC -fdiagnostics-color=always all: $(BLOCKSTORE_OBJS) libfio_blockstore.so osd libfio_sec_osd.so test_blockstore stub_osd osd_test clean: diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 1a76789ea..1d53158ed 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -543,6 +543,7 @@ bool journal_flusher_co::modify_meta_read(uint64_t meta_loc, flusher_meta_write_ // We must check if the same sector is already in memory if we don't keep all metadata in memory all the time. // And yet another option is to use LSM trees for metadata, but it sophisticates everything a lot, // so I'll avoid it as long as I can. + wr.submitted = false; wr.sector = ((meta_loc >> bs->block_order) / (bs->meta_block_size / bs->clean_entry_size)) * bs->meta_block_size; wr.pos = ((meta_loc >> bs->block_order) % (bs->meta_block_size / bs->clean_entry_size)); if (bs->inmemory_meta) @@ -573,7 +574,6 @@ bool journal_flusher_co::modify_meta_read(uint64_t meta_loc, flusher_meta_write_ } else { - wr.submitted = false; wr.buf = wr.it->second.buf; wr.it->second.usage_count++; } diff --git a/blockstore_impl.cpp b/blockstore_impl.cpp index 6af26a8c5..7b7a43e5d 100644 --- a/blockstore_impl.cpp +++ b/blockstore_impl.cpp @@ -420,7 +420,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op) // Allocate memory op->version = stable_count; op->retval = total_count; - op->buf = memalign(MEM_ALIGNMENT, sizeof(obj_ver_id) * total_count); + op->buf = malloc(sizeof(obj_ver_id) * total_count); if (!op->buf) { op->retval = -ENOMEM; diff --git a/blockstore_sync.cpp b/blockstore_sync.cpp index 5f781ca89..2b30b899e 100644 --- a/blockstore_sync.cpp +++ b/blockstore_sync.cpp @@ -171,9 +171,12 @@ int blockstore_impl_t::continue_sync(blockstore_op_t *op) else { PRIV(op)->sync_state = SYNC_DONE; - ack_sync(op); } } + if (PRIV(op)->sync_state == SYNC_DONE) + { + ack_sync(op); + } return 1; } diff --git a/fio_sec_osd.cpp b/fio_sec_osd.cpp index d32bb5c3a..45404dd50 100644 --- a/fio_sec_osd.cpp +++ b/fio_sec_osd.cpp @@ -50,7 +50,8 @@ struct sec_options int __pad; char *host = NULL; int port = 0; - bool single_primary = false; + int single_primary = 0; + int trace = 0; }; static struct fio_option options[] = { @@ -82,6 +83,16 @@ static struct fio_option options[] = { .category = FIO_OPT_C_ENGINE, .group = FIO_OPT_G_FILENAME, }, + { + .name = "osd_trace", + .lname = "OSD trace", + .type = FIO_OPT_BOOL, + .off1 = offsetof(struct sec_options, trace), + .help = "Trace OSD operations", + .def = "0", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_FILENAME, + }, { .name = NULL, }, @@ -239,6 +250,12 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) return FIO_Q_COMPLETED; } + if (opt->trace) + { + printf("+++ %s # %d\n", io->ddir == DDIR_READ ? "READ" : + (io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), n); + } + io->error = 0; bsd->inflight++; bsd->op_n++; @@ -262,6 +279,7 @@ static enum fio_q_status sec_queue(struct thread_data *td, struct io_u *io) static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int max, const struct timespec *t) { + sec_options *opt = (sec_options*)td->eo; sec_data *bsd = (sec_data*)td->io_ops_data; // FIXME timeout, at least poll. Now it's the stupidest implementation possible osd_any_reply_t reply; @@ -305,6 +323,11 @@ static int sec_getevents(struct thread_data *td, unsigned int min, unsigned int exit(1); } } + if (opt->trace) + { + printf("--- %s # %ld\n", io->ddir == DDIR_READ ? "READ" : + (io->ddir == DDIR_WRITE ? "WRITE" : "SYNC"), reply.hdr.id); + } bsd->completed.push_back(io); } return bsd->completed.size(); diff --git a/osd.cpp b/osd.cpp index 6062eed83..4ccc39c99 100644 --- a/osd.cpp +++ b/osd.cpp @@ -249,11 +249,18 @@ void osd_t::cancel_osd_ops(osd_client_t & cl) void osd_t::cancel_op(osd_op_t *op) { - op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; - op->reply.hdr.id = op->req.hdr.id; - op->reply.hdr.opcode = op->req.hdr.opcode; - op->reply.hdr.retval = -EPIPE; - op->callback(op); + if (op->op_type == OSD_OP_OUT) + { + op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; + op->reply.hdr.id = op->req.hdr.id; + op->reply.hdr.opcode = op->req.hdr.opcode; + op->reply.hdr.retval = -EPIPE; + op->callback(op); + } + else + { + delete op; + } } void osd_t::stop_client(int peer_fd) diff --git a/osd.h b/osd.h index 2f296c130..5ca62e696 100644 --- a/osd.h +++ b/osd.h @@ -96,7 +96,7 @@ struct osd_primary_op_data_t; struct osd_op_t { - int op_type; + int op_type = OSD_OP_IN; int peer_fd; osd_any_op_t req; osd_any_reply_t reply; diff --git a/osd_primary.cpp b/osd_primary.cpp index 511444894..2d1c306ce 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -38,12 +38,20 @@ struct osd_primary_op_data_t void osd_t::finish_primary_op(osd_op_t *cur_op, int retval) { - // FIXME add separate magics - cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; - cur_op->reply.hdr.id = cur_op->req.hdr.id; - cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode; - cur_op->reply.hdr.retval = retval; - outbox_push(this->clients[cur_op->peer_fd], cur_op); + // FIXME add separate magic number + auto cl_it = clients.find(cur_op->peer_fd); + if (cl_it != clients.end()) + { + cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; + cur_op->reply.hdr.id = cur_op->req.hdr.id; + cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode; + cur_op->reply.hdr.retval = retval; + outbox_push(cl_it->second, cur_op); + } + else + { + delete cur_op; + } } bool osd_t::prepare_primary_rw(osd_op_t *cur_op) @@ -272,6 +280,7 @@ void osd_t::handle_primary_subop(osd_op_t *cur_op, int ok, uint64_t version) op_data->fact_ver = version; if (!ok) { + // FIXME: Handle errors op_data->errors++; } else @@ -295,6 +304,10 @@ void osd_t::handle_primary_subop(osd_op_t *cur_op, int ok, uint64_t version) { continue_primary_sync(cur_op); } + else + { + throw std::runtime_error("BUG: unknown opcode"); + } } } @@ -312,6 +325,7 @@ void osd_t::continue_primary_write(osd_op_t *cur_op) else if (op_data->st == 3) goto resume_3; else if (op_data->st == 4) goto resume_4; else if (op_data->st == 5) goto resume_5; + assert(op_data->st == 0); // Check if actions are pending for this object { auto act_it = pg.obj_stab_actions.lower_bound((obj_piece_id_t){ @@ -408,6 +422,7 @@ void osd_t::continue_primary_sync(osd_op_t *cur_op) else if (cur_op->op_data->st == 4) goto resume_4; else if (cur_op->op_data->st == 5) goto resume_5; else if (cur_op->op_data->st == 6) goto resume_6; + assert(cur_op->op_data->st == 0); if (syncs_in_progress.size() > 0) { // Wait for previous syncs, if any @@ -481,7 +496,7 @@ resume_5: resume_6: // FIXME: Free them correctly (via a destructor or so) delete cur_op->op_data->unstable_write_osds; - delete cur_op->op_data->unstable_writes; + delete[] cur_op->op_data->unstable_writes; cur_op->op_data->unstable_writes = NULL; cur_op->op_data->unstable_write_osds = NULL; finish: @@ -490,9 +505,9 @@ finish: finish_primary_op(cur_op, 0); if (syncs_in_progress.size() > 0) { - osd_op_t *next_op = syncs_in_progress.front(); - next_op->op_data->st++; - continue_primary_sync(next_op); + cur_op = syncs_in_progress.front(); + cur_op->op_data->st++; + goto resume_2; } } diff --git a/osd_secondary.cpp b/osd_secondary.cpp index 98d88078d..e3b174680 100644 --- a/osd_secondary.cpp +++ b/osd_secondary.cpp @@ -12,11 +12,7 @@ void osd_t::secondary_op_callback(osd_op_t *op) op->reply.hdr.id = op->req.hdr.id; op->reply.hdr.opcode = op->req.hdr.opcode; op->reply.hdr.retval = op->bs_op->retval; - if (op->req.hdr.opcode == OSD_OP_SECONDARY_LIST) - { - op->reply.sec_list.stable_count = op->bs_op->version; - } - else if (op->req.hdr.opcode == OSD_OP_SECONDARY_READ || + if (op->req.hdr.opcode == OSD_OP_SECONDARY_READ || op->req.hdr.opcode == OSD_OP_SECONDARY_WRITE) { op->reply.sec_rw.version = op->bs_op->version; @@ -30,11 +26,15 @@ void osd_t::secondary_op_callback(osd_op_t *op) { op->send_list.push_back(op->buf, op->reply.hdr.retval); } - else if (op->req.hdr.opcode == OSD_OP_SECONDARY_LIST && - op->reply.hdr.retval > 0) + else if (op->req.hdr.opcode == OSD_OP_SECONDARY_LIST) { - op->buf = op->bs_op->buf; // allocated by blockstore - op->send_list.push_back(op->buf, op->reply.hdr.retval * sizeof(obj_ver_id)); + // allocated by blockstore + op->buf = op->bs_op->buf; + if (op->reply.hdr.retval > 0) + { + op->send_list.push_back(op->buf, op->reply.hdr.retval * sizeof(obj_ver_id)); + } + op->reply.sec_list.stable_count = op->bs_op->version; } auto & cl = cl_it->second; outbox_push(cl, op); @@ -65,17 +65,26 @@ void osd_t::exec_secondary(osd_op_t *cur_op) cur_op->bs_op->offset = cur_op->req.sec_rw.offset; cur_op->bs_op->len = cur_op->req.sec_rw.len; cur_op->bs_op->buf = cur_op->buf; +#ifdef OSD_STUB + cur_op->bs_op->retval = cur_op->bs_op->len; +#endif } else if (cur_op->req.hdr.opcode == OSD_OP_SECONDARY_DELETE) { cur_op->bs_op->oid = cur_op->req.sec_del.oid; cur_op->bs_op->version = cur_op->req.sec_del.version; +#ifdef OSD_STUB + cur_op->bs_op->retval = 0; +#endif } else if (cur_op->req.hdr.opcode == OSD_OP_SECONDARY_STABILIZE || cur_op->req.hdr.opcode == OSD_OP_SECONDARY_ROLLBACK) { cur_op->bs_op->len = cur_op->req.sec_stab.len/sizeof(obj_ver_id); cur_op->bs_op->buf = cur_op->buf; +#ifdef OSD_STUB + cur_op->bs_op->retval = 0; +#endif } else if (cur_op->req.hdr.opcode == OSD_OP_SECONDARY_LIST) { @@ -89,9 +98,12 @@ void osd_t::exec_secondary(osd_op_t *cur_op) cur_op->bs_op->oid.stripe = cur_op->req.sec_list.parity_block_size; cur_op->bs_op->len = cur_op->req.sec_list.pg_count; cur_op->bs_op->offset = cur_op->req.sec_list.list_pg - 1; +#ifdef OSD_STUB + cur_op->bs_op->retval = 0; + cur_op->bs_op->buf = NULL; +#endif } #ifdef OSD_STUB - cur_op->bs_op->retval = cur_op->bs_op->len; secondary_op_callback(cur_op); #else bs->enqueue_op(cur_op->bs_op);