diff --git a/src/blockstore_impl.cpp b/src/blockstore_impl.cpp index 5a4e9a9b..29c8cd29 100644 --- a/src/blockstore_impl.cpp +++ b/src/blockstore_impl.cpp @@ -583,7 +583,7 @@ void blockstore_impl_t::process_list(blockstore_op_t *op) replace_stable(dirty_it->first.oid, 0, clean_stable_count, stable_count, stable); } } - else if (IS_STABLE(dirty_it->second.state)) + else if (IS_STABLE(dirty_it->second.state) || (dirty_it->second.state & BS_ST_INSTANT)) { // First try to replace a clean stable version in the first part of the list if (!replace_stable(dirty_it->first.oid, dirty_it->first.version, 0, clean_stable_count, stable)) diff --git a/src/blockstore_write.cpp b/src/blockstore_write.cpp index ad714208..fb86a62d 100644 --- a/src/blockstore_write.cpp +++ b/src/blockstore_write.cpp @@ -115,8 +115,8 @@ bool blockstore_impl_t::enqueue_write(blockstore_op_t *op) else if (!wait_del) printf("Write %lx:%lx v%lu offset=%u len=%u\n", op->oid.inode, op->oid.stripe, op->version, op->offset, op->len); #endif - // FIXME No strict need to add it into dirty_db here, it's just left - // from the previous implementation where reads waited for writes + // No strict need to add it into dirty_db here except maybe for listings to return + // correct data when there are inflight operations in the queue uint32_t state; if (is_del) state = BS_ST_DELETE | BS_ST_IN_FLIGHT; diff --git a/src/osd.h b/src/osd.h index 37810216..bb765754 100644 --- a/src/osd.h +++ b/src/osd.h @@ -205,7 +205,6 @@ class osd_t bool check_peer_config(osd_client_t *cl, json11::Json conf); void repeer_pgs(osd_num_t osd_num); void start_pg_peering(pg_t & pg); - void submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *ps); void submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps); void discard_list_subop(osd_op_t *list_op); bool stop_pg(pg_t & pg); diff --git a/src/osd_peering.cpp b/src/osd_peering.cpp index b51cc9bc..2167baa3 100644 --- a/src/osd_peering.cpp +++ b/src/osd_peering.cpp @@ -311,82 +311,11 @@ void osd_t::start_pg_peering(pg_t & pg) { continue; } - submit_sync_and_list_subop(peer_osd, pg.peering_state); + submit_list_subop(peer_osd, pg.peering_state); } ringloop->wakeup(); } -void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *ps) -{ - // Sync before listing, if not readonly - if (readonly) - { - submit_list_subop(role_osd, ps); - } - else if (role_osd == this->osd_num) - { - // Self - osd_op_t *op = new osd_op_t(); - op->op_type = 0; - op->peer_fd = SELF_FD; - clock_gettime(CLOCK_REALTIME, &op->tv_begin); - op->bs_op = new blockstore_op_t(); - op->bs_op->opcode = BS_OP_SYNC; - op->bs_op->callback = [this, ps, op, role_osd](blockstore_op_t *bs_op) - { - if (bs_op->retval < 0) - { - printf("Local OP_SYNC failed: %d (%s)\n", bs_op->retval, strerror(-bs_op->retval)); - force_stop(1); - return; - } - add_bs_subop_stats(op); - delete op->bs_op; - op->bs_op = NULL; - delete op; - ps->list_ops.erase(role_osd); - submit_list_subop(role_osd, ps); - }; - ps->list_ops[role_osd] = op; - bs->enqueue_op(op->bs_op); - } - else - { - // Peer - auto & cl = msgr.clients.at(msgr.osd_peer_fds.at(role_osd)); - osd_op_t *op = new osd_op_t(); - op->op_type = OSD_OP_OUT; - op->peer_fd = cl->peer_fd; - op->req = (osd_any_op_t){ - .sec_sync = { - .header = { - .magic = SECONDARY_OSD_OP_MAGIC, - .id = msgr.next_subop_id++, - .opcode = OSD_OP_SEC_SYNC, - }, - }, - }; - op->callback = [this, ps, role_osd](osd_op_t *op) - { - if (op->reply.hdr.retval < 0) - { - // FIXME: Mark peer as failed and don't reconnect immediately after dropping the connection - printf("Failed to sync OSD %lu: %ld (%s), disconnecting peer\n", role_osd, op->reply.hdr.retval, strerror(-op->reply.hdr.retval)); - int fail_fd = op->peer_fd; - ps->list_ops.erase(role_osd); - delete op; - msgr.stop_client(fail_fd); - return; - } - delete op; - ps->list_ops.erase(role_osd); - submit_list_subop(role_osd, ps); - }; - ps->list_ops[role_osd] = op; - msgr.outbox_push(op); - } -} - void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps) { if (role_osd == this->osd_num)