diff --git a/src/osd.cpp b/src/osd.cpp index 5ddb508f..356405e3 100644 --- a/src/osd.cpp +++ b/src/osd.cpp @@ -45,11 +45,11 @@ osd_t::osd_t(blockstore_config_t & config, ring_loop_t *ringloop) print_slow(); }); - c_cli.tfd = this->tfd; - c_cli.ringloop = this->ringloop; - c_cli.exec_op = [this](osd_op_t *op) { exec_op(op); }; - c_cli.repeer_pgs = [this](osd_num_t peer_osd) { repeer_pgs(peer_osd); }; - c_cli.init(); + msgr.tfd = this->tfd; + msgr.ringloop = this->ringloop; + msgr.exec_op = [this](osd_op_t *op) { exec_op(op); }; + msgr.repeer_pgs = [this](osd_num_t peer_osd) { repeer_pgs(peer_osd); }; + msgr.init(); init_cluster(); @@ -80,7 +80,7 @@ void osd_t::parse_config(blockstore_config_t & config) osd_num = strtoull(config["osd_num"].c_str(), NULL, 10); if (!osd_num) throw std::runtime_error("osd_num is required in the configuration"); - c_cli.osd_num = osd_num; + msgr.osd_num = osd_num; run_primary = config["run_primary"] != "false" && config["run_primary"] != "0" && config["run_primary"] != "no"; no_rebalance = config["no_rebalance"] == "true" || config["no_rebalance"] == "1" || config["no_rebalance"] == "yes"; no_recovery = config["no_recovery"] == "true" || config["no_recovery"] == "1" || config["no_recovery"] == "yes"; @@ -121,7 +121,7 @@ void osd_t::parse_config(blockstore_config_t & config) slow_log_interval = strtoull(config["slow_log_interval"].c_str(), NULL, 10); if (!slow_log_interval) slow_log_interval = 10; - c_cli.parse_config(json_config); + msgr.parse_config(json_config); } void osd_t::bind_socket() @@ -174,7 +174,7 @@ void osd_t::bind_socket() epmgr->set_fd_handler(listen_fd, false, [this](int fd, int events) { - c_cli.accept_connections(listen_fd); + msgr.accept_connections(listen_fd); }); } @@ -191,8 +191,8 @@ bool osd_t::shutdown() void osd_t::loop() { handle_peers(); - c_cli.read_requests(); - c_cli.send_replies(); + msgr.read_requests(); + msgr.send_replies(); ringloop->submit(); } @@ -276,7 +276,7 @@ void osd_t::exec_op(osd_op_t *cur_op) void osd_t::reset_stats() { - c_cli.stats = { 0 }; + msgr.stats = { 0 }; prev_stats = { 0 }; memset(recovery_stat_count, 0, sizeof(recovery_stat_count)); memset(recovery_stat_bytes, 0, sizeof(recovery_stat_bytes)); @@ -286,11 +286,11 @@ void osd_t::print_stats() { for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++) { - if (c_cli.stats.op_stat_count[i] != prev_stats.op_stat_count[i] && i != OSD_OP_PING) + if (msgr.stats.op_stat_count[i] != prev_stats.op_stat_count[i] && i != OSD_OP_PING) { - uint64_t avg = (c_cli.stats.op_stat_sum[i] - prev_stats.op_stat_sum[i])/(c_cli.stats.op_stat_count[i] - prev_stats.op_stat_count[i]); - uint64_t bw = (c_cli.stats.op_stat_bytes[i] - prev_stats.op_stat_bytes[i]) / print_stats_interval; - if (c_cli.stats.op_stat_bytes[i] != 0) + uint64_t avg = (msgr.stats.op_stat_sum[i] - prev_stats.op_stat_sum[i])/(msgr.stats.op_stat_count[i] - prev_stats.op_stat_count[i]); + uint64_t bw = (msgr.stats.op_stat_bytes[i] - prev_stats.op_stat_bytes[i]) / print_stats_interval; + if (msgr.stats.op_stat_bytes[i] != 0) { printf( "[OSD %lu] avg latency for op %d (%s): %lu us, B/W: %.2f %s\n", osd_num, i, osd_op_names[i], avg, @@ -302,19 +302,19 @@ void osd_t::print_stats() { printf("[OSD %lu] avg latency for op %d (%s): %lu us\n", osd_num, i, osd_op_names[i], avg); } - prev_stats.op_stat_count[i] = c_cli.stats.op_stat_count[i]; - prev_stats.op_stat_sum[i] = c_cli.stats.op_stat_sum[i]; - prev_stats.op_stat_bytes[i] = c_cli.stats.op_stat_bytes[i]; + prev_stats.op_stat_count[i] = msgr.stats.op_stat_count[i]; + prev_stats.op_stat_sum[i] = msgr.stats.op_stat_sum[i]; + prev_stats.op_stat_bytes[i] = msgr.stats.op_stat_bytes[i]; } } for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++) { - if (c_cli.stats.subop_stat_count[i] != prev_stats.subop_stat_count[i]) + if (msgr.stats.subop_stat_count[i] != prev_stats.subop_stat_count[i]) { - uint64_t avg = (c_cli.stats.subop_stat_sum[i] - prev_stats.subop_stat_sum[i])/(c_cli.stats.subop_stat_count[i] - prev_stats.subop_stat_count[i]); + uint64_t avg = (msgr.stats.subop_stat_sum[i] - prev_stats.subop_stat_sum[i])/(msgr.stats.subop_stat_count[i] - prev_stats.subop_stat_count[i]); printf("[OSD %lu] avg latency for subop %d (%s): %ld us\n", osd_num, i, osd_op_names[i], avg); - prev_stats.subop_stat_count[i] = c_cli.stats.subop_stat_count[i]; - prev_stats.subop_stat_sum[i] = c_cli.stats.subop_stat_sum[i]; + prev_stats.subop_stat_count[i] = msgr.stats.subop_stat_count[i]; + prev_stats.subop_stat_sum[i] = msgr.stats.subop_stat_sum[i]; } } for (int i = 0; i < 2; i++) @@ -351,7 +351,7 @@ void osd_t::print_slow() char alloc[1024]; timespec now; clock_gettime(CLOCK_REALTIME, &now); - for (auto & kv: c_cli.clients) + for (auto & kv: msgr.clients) { for (auto op: kv.second->received_ops) { diff --git a/src/osd.h b/src/osd.h index 7a67c6e9..96146e51 100644 --- a/src/osd.h +++ b/src/osd.h @@ -116,7 +116,7 @@ class osd_t // cluster state etcd_state_client_t st_cli; - osd_messenger_t c_cli; + osd_messenger_t msgr; int etcd_failed_attempts = 0; std::string etcd_lease_id; json11::Json self_state; diff --git a/src/osd_cluster.cpp b/src/osd_cluster.cpp index 7fc5500f..57637d7a 100644 --- a/src/osd_cluster.cpp +++ b/src/osd_cluster.cpp @@ -104,7 +104,7 @@ void osd_t::parse_test_peer(std::string peer) { "addresses", json11::Json::array { addr } }, { "port", port }, }; - c_cli.connect_peer(peer_osd, st_cli.peer_states[peer_osd]); + msgr.connect_peer(peer_osd, st_cli.peer_states[peer_osd]); } json11::Json osd_t::get_osd_state() @@ -146,16 +146,16 @@ json11::Json osd_t::get_statistics() for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++) { op_stats[osd_op_names[i]] = json11::Json::object { - { "count", c_cli.stats.op_stat_count[i] }, - { "usec", c_cli.stats.op_stat_sum[i] }, - { "bytes", c_cli.stats.op_stat_bytes[i] }, + { "count", msgr.stats.op_stat_count[i] }, + { "usec", msgr.stats.op_stat_sum[i] }, + { "bytes", msgr.stats.op_stat_bytes[i] }, }; } for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++) { subop_stats[osd_op_names[i]] = json11::Json::object { - { "count", c_cli.stats.subop_stat_count[i] }, - { "usec", c_cli.stats.subop_stat_sum[i] }, + { "count", msgr.stats.subop_stat_count[i] }, + { "usec", msgr.stats.subop_stat_sum[i] }, }; } st["op_stats"] = op_stats; @@ -298,9 +298,9 @@ void osd_t::report_statistics() void osd_t::on_change_osd_state_hook(osd_num_t peer_osd) { - if (c_cli.wanted_peers.find(peer_osd) != c_cli.wanted_peers.end()) + if (msgr.wanted_peers.find(peer_osd) != msgr.wanted_peers.end()) { - c_cli.connect_peer(peer_osd, st_cli.peer_states[peer_osd]); + msgr.connect_peer(peer_osd, st_cli.peer_states[peer_osd]); } } @@ -695,9 +695,9 @@ void osd_t::apply_pg_config() // Add peers for (auto pg_osd: all_peers) { - if (pg_osd != this->osd_num && c_cli.osd_peer_fds.find(pg_osd) == c_cli.osd_peer_fds.end()) + if (pg_osd != this->osd_num && msgr.osd_peer_fds.find(pg_osd) == msgr.osd_peer_fds.end()) { - c_cli.connect_peer(pg_osd, st_cli.peer_states[pg_osd]); + msgr.connect_peer(pg_osd, st_cli.peer_states[pg_osd]); } } start_pg_peering(pg); diff --git a/src/osd_flush.cpp b/src/osd_flush.cpp index 62ab408f..2217c666 100644 --- a/src/osd_flush.cpp +++ b/src/osd_flush.cpp @@ -82,10 +82,10 @@ void osd_t::handle_flush_op(bool rollback, pool_id_t pool_id, pg_num_t pg_num, p else { printf("Error while doing flush on OSD %lu: %d (%s)\n", osd_num, retval, strerror(-retval)); - auto fd_it = c_cli.osd_peer_fds.find(peer_osd); - if (fd_it != c_cli.osd_peer_fds.end()) + auto fd_it = msgr.osd_peer_fds.find(peer_osd); + if (fd_it != msgr.osd_peer_fds.end()) { - c_cli.stop_client(fd_it->second); + msgr.stop_client(fd_it->second); } return; } @@ -188,7 +188,7 @@ void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t else { // Peer - int peer_fd = c_cli.osd_peer_fds[peer_osd]; + int peer_fd = msgr.osd_peer_fds[peer_osd]; op->op_type = OSD_OP_OUT; op->iov.push_back(op->buf, count * sizeof(obj_ver_id)); op->peer_fd = peer_fd; @@ -196,7 +196,7 @@ void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t .sec_stab = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = c_cli.next_subop_id++, + .id = msgr.next_subop_id++, .opcode = (uint64_t)(rollback ? OSD_OP_SEC_ROLLBACK : OSD_OP_SEC_STABILIZE), }, .len = count * sizeof(obj_ver_id), @@ -207,7 +207,7 @@ void osd_t::submit_flush_op(pool_id_t pool_id, pg_num_t pg_num, pg_flush_batch_t handle_flush_op(op->req.hdr.opcode == OSD_OP_SEC_ROLLBACK, pool_id, pg_num, fb, peer_osd, op->reply.hdr.retval); delete op; }; - c_cli.outbox_push(op); + msgr.outbox_push(op); } } diff --git a/src/osd_peering.cpp b/src/osd_peering.cpp index d8fac206..1d2c14ae 100644 --- a/src/osd_peering.cpp +++ b/src/osd_peering.cpp @@ -156,7 +156,7 @@ void osd_t::start_pg_peering(pg_t & pg) if (immediate_commit != IMMEDIATE_ALL) { std::vector to_stop; - for (auto & cp: c_cli.clients) + for (auto & cp: msgr.clients) { if (cp.second->dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) != cp.second->dirty_pgs.end()) { @@ -165,7 +165,7 @@ void osd_t::start_pg_peering(pg_t & pg) } for (auto peer_fd: to_stop) { - c_cli.stop_client(peer_fd); + msgr.stop_client(peer_fd); } } // Calculate current write OSD set @@ -175,7 +175,7 @@ void osd_t::start_pg_peering(pg_t & pg) for (int role = 0; role < pg.target_set.size(); role++) { pg.cur_set[role] = pg.target_set[role] == this->osd_num || - c_cli.osd_peer_fds.find(pg.target_set[role]) != c_cli.osd_peer_fds.end() ? pg.target_set[role] : 0; + msgr.osd_peer_fds.find(pg.target_set[role]) != msgr.osd_peer_fds.end() ? pg.target_set[role] : 0; if (pg.cur_set[role] != 0) { pg.pg_cursize++; @@ -199,7 +199,7 @@ void osd_t::start_pg_peering(pg_t & pg) { found = false; if (history_osd == this->osd_num || - c_cli.osd_peer_fds.find(history_osd) != c_cli.osd_peer_fds.end()) + msgr.osd_peer_fds.find(history_osd) != msgr.osd_peer_fds.end()) { found = true; break; @@ -223,13 +223,13 @@ void osd_t::start_pg_peering(pg_t & pg) std::set cur_peers; for (auto pg_osd: pg.all_peers) { - if (pg_osd == this->osd_num || c_cli.osd_peer_fds.find(pg_osd) != c_cli.osd_peer_fds.end()) + if (pg_osd == this->osd_num || msgr.osd_peer_fds.find(pg_osd) != msgr.osd_peer_fds.end()) { cur_peers.insert(pg_osd); } - else if (c_cli.wanted_peers.find(pg_osd) == c_cli.wanted_peers.end()) + else if (msgr.wanted_peers.find(pg_osd) == msgr.wanted_peers.end()) { - c_cli.connect_peer(pg_osd, st_cli.peer_states[pg_osd]); + msgr.connect_peer(pg_osd, st_cli.peer_states[pg_osd]); } } pg.cur_peers.insert(pg.cur_peers.begin(), cur_peers.begin(), cur_peers.end()); @@ -325,7 +325,7 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p else { // Peer - auto & cl = c_cli.clients.at(c_cli.osd_peer_fds[role_osd]); + auto & cl = msgr.clients.at(msgr.osd_peer_fds[role_osd]); osd_op_t *op = new osd_op_t(); op->op_type = OSD_OP_OUT; op->peer_fd = cl->peer_fd; @@ -333,7 +333,7 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p .sec_sync = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = c_cli.next_subop_id++, + .id = msgr.next_subop_id++, .opcode = OSD_OP_SEC_SYNC, }, }, @@ -347,14 +347,14 @@ void osd_t::submit_sync_and_list_subop(osd_num_t role_osd, pg_peering_state_t *p int fail_fd = op->peer_fd; ps->list_ops.erase(role_osd); delete op; - c_cli.stop_client(fail_fd); + msgr.stop_client(fail_fd); return; } delete op; ps->list_ops.erase(role_osd); submit_list_subop(role_osd, ps); }; - c_cli.outbox_push(op); + msgr.outbox_push(op); ps->list_ops[role_osd] = op; } } @@ -404,12 +404,12 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps) // Peer osd_op_t *op = new osd_op_t(); op->op_type = OSD_OP_OUT; - op->peer_fd = c_cli.osd_peer_fds[role_osd]; + op->peer_fd = msgr.osd_peer_fds[role_osd]; op->req = (osd_any_op_t){ .sec_list = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = c_cli.next_subop_id++, + .id = msgr.next_subop_id++, .opcode = OSD_OP_SEC_LIST, }, .list_pg = ps->pg_num, @@ -427,7 +427,7 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps) int fail_fd = op->peer_fd; ps->list_ops.erase(role_osd); delete op; - c_cli.stop_client(fail_fd); + msgr.stop_client(fail_fd); return; } printf( @@ -444,7 +444,7 @@ void osd_t::submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps) ps->list_ops.erase(role_osd); delete op; }; - c_cli.outbox_push(op); + msgr.outbox_push(op); ps->list_ops[role_osd] = op; } } diff --git a/src/osd_primary_chain.cpp b/src/osd_primary_chain.cpp index 7d0e6912..e9a2fbfb 100644 --- a/src/osd_primary_chain.cpp +++ b/src/osd_primary_chain.cpp @@ -236,14 +236,14 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg) // Send to a remote OSD osd_op_t *subop = op_data->subops+subop_idx; subop->op_type = OSD_OP_OUT; - subop->peer_fd = c_cli.osd_peer_fds.at(subop_osd_num); + subop->peer_fd = msgr.osd_peer_fds.at(subop_osd_num); // FIXME: Use the pre-allocated buffer subop->buf = malloc_or_die(sizeof(obj_ver_id)*(i+1-prev)); subop->req = (osd_any_op_t){ .sec_read_bmp = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = c_cli.next_subop_id++, + .id = msgr.next_subop_id++, .opcode = OSD_OP_SEC_READ_BMP, }, .len = sizeof(obj_ver_id)*(i+1-prev), @@ -273,7 +273,7 @@ int osd_t::submit_bitmap_subops(osd_op_t *cur_op, pg_t & pg) } handle_primary_subop(subop, cur_op); }; - c_cli.outbox_push(subop); + msgr.outbox_push(subop); subop_idx++; } prev = i+1; diff --git a/src/osd_primary_subops.cpp b/src/osd_primary_subops.cpp index 68f8ae73..9b8eff08 100644 --- a/src/osd_primary_subops.cpp +++ b/src/osd_primary_subops.cpp @@ -87,14 +87,14 @@ void osd_t::finish_op(osd_op_t *cur_op, int retval) else { // FIXME add separate magic number for primary ops - auto cl_it = c_cli.clients.find(cur_op->peer_fd); - if (cl_it != c_cli.clients.end()) + auto cl_it = msgr.clients.find(cur_op->peer_fd); + if (cl_it != msgr.clients.end()) { cur_op->reply.hdr.magic = SECONDARY_OSD_REPLY_MAGIC; cur_op->reply.hdr.id = cur_op->req.hdr.id; cur_op->reply.hdr.opcode = cur_op->req.hdr.opcode; cur_op->reply.hdr.retval = retval; - c_cli.outbox_push(cur_op); + msgr.outbox_push(cur_op); } else { @@ -184,13 +184,13 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o else { subop->op_type = OSD_OP_OUT; - subop->peer_fd = c_cli.osd_peer_fds.at(role_osd_num); + subop->peer_fd = msgr.osd_peer_fds.at(role_osd_num); subop->bitmap = stripes[stripe_num].bmp_buf; subop->bitmap_len = clean_entry_bitmap_size; subop->req.sec_rw = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = c_cli.next_subop_id++, + .id = msgr.next_subop_id++, .opcode = (uint64_t)(wr ? (rep ? OSD_OP_SEC_WRITE_STABLE : OSD_OP_SEC_WRITE) : OSD_OP_SEC_READ), }, .oid = { @@ -227,7 +227,7 @@ int osd_t::submit_primary_subop_batch(int submit_type, inode_t inode, uint64_t o { handle_primary_subop(subop, cur_op); }; - c_cli.outbox_push(subop); + msgr.outbox_push(subop); } i++; } @@ -282,20 +282,20 @@ void osd_t::add_bs_subop_stats(osd_op_t *subop) uint64_t opcode = bs_op_to_osd_op[subop->bs_op->opcode]; timespec tv_end; clock_gettime(CLOCK_REALTIME, &tv_end); - c_cli.stats.op_stat_count[opcode]++; - if (!c_cli.stats.op_stat_count[opcode]) + msgr.stats.op_stat_count[opcode]++; + if (!msgr.stats.op_stat_count[opcode]) { - c_cli.stats.op_stat_count[opcode] = 1; - c_cli.stats.op_stat_sum[opcode] = 0; - c_cli.stats.op_stat_bytes[opcode] = 0; + msgr.stats.op_stat_count[opcode] = 1; + msgr.stats.op_stat_sum[opcode] = 0; + msgr.stats.op_stat_bytes[opcode] = 0; } - c_cli.stats.op_stat_sum[opcode] += ( + msgr.stats.op_stat_sum[opcode] += ( (tv_end.tv_sec - subop->tv_begin.tv_sec)*1000000 + (tv_end.tv_nsec - subop->tv_begin.tv_nsec)/1000 ); if (opcode == OSD_OP_SEC_READ || opcode == OSD_OP_SEC_WRITE) { - c_cli.stats.op_stat_bytes[opcode] += subop->bs_op->len; + msgr.stats.op_stat_bytes[opcode] += subop->bs_op->len; } } @@ -322,7 +322,7 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op) if (subop->peer_fd >= 0) { // Drop connection on any error - c_cli.stop_client(subop->peer_fd); + msgr.stop_client(subop->peer_fd); } } else @@ -332,8 +332,8 @@ void osd_t::handle_primary_subop(osd_op_t *subop, osd_op_t *cur_op) { uint64_t version = subop->reply.sec_rw.version; #ifdef OSD_DEBUG - uint64_t peer_osd = c_cli.clients.find(subop->peer_fd) != c_cli.clients.end() - ? c_cli.clients[subop->peer_fd]->osd_num : osd_num; + uint64_t peer_osd = msgr.clients.find(subop->peer_fd) != msgr.clients.end() + ? msgr.clients[subop->peer_fd]->osd_num : osd_num; printf("subop %lu from osd %lu: version = %lu\n", opcode, peer_osd, version); #endif if (op_data->fact_ver != UINT64_MAX) @@ -465,11 +465,11 @@ void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_ else { subops[i].op_type = OSD_OP_OUT; - subops[i].peer_fd = c_cli.osd_peer_fds.at(chunk.osd_num); + subops[i].peer_fd = msgr.osd_peer_fds.at(chunk.osd_num); subops[i].req = (osd_any_op_t){ .sec_del = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = c_cli.next_subop_id++, + .id = msgr.next_subop_id++, .opcode = OSD_OP_SEC_DELETE, }, .oid = chunk.oid, @@ -479,7 +479,7 @@ void osd_t::submit_primary_del_batch(osd_op_t *cur_op, obj_ver_osd_t *chunks_to_ { handle_primary_subop(subop, cur_op); }; - c_cli.outbox_push(&subops[i]); + msgr.outbox_push(&subops[i]); } } } @@ -509,14 +509,14 @@ int osd_t::submit_primary_sync_subops(osd_op_t *cur_op) }); bs->enqueue_op(subops[i].bs_op); } - else if ((peer_it = c_cli.osd_peer_fds.find(sync_osd)) != c_cli.osd_peer_fds.end()) + else if ((peer_it = msgr.osd_peer_fds.find(sync_osd)) != msgr.osd_peer_fds.end()) { subops[i].op_type = OSD_OP_OUT; subops[i].peer_fd = peer_it->second; subops[i].req = (osd_any_op_t){ .sec_sync = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = c_cli.next_subop_id++, + .id = msgr.next_subop_id++, .opcode = OSD_OP_SEC_SYNC, }, } }; @@ -524,7 +524,7 @@ int osd_t::submit_primary_sync_subops(osd_op_t *cur_op) { handle_primary_subop(subop, cur_op); }; - c_cli.outbox_push(&subops[i]); + msgr.outbox_push(&subops[i]); } else { @@ -569,11 +569,11 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op) else { subops[i].op_type = OSD_OP_OUT; - subops[i].peer_fd = c_cli.osd_peer_fds.at(stab_osd.osd_num); + subops[i].peer_fd = msgr.osd_peer_fds.at(stab_osd.osd_num); subops[i].req = (osd_any_op_t){ .sec_stab = { .header = { .magic = SECONDARY_OSD_OP_MAGIC, - .id = c_cli.next_subop_id++, + .id = msgr.next_subop_id++, .opcode = OSD_OP_SEC_STABILIZE, }, .len = (uint64_t)(stab_osd.len * sizeof(obj_ver_id)), @@ -583,7 +583,7 @@ void osd_t::submit_primary_stab_subops(osd_op_t *cur_op) { handle_primary_subop(subop, cur_op); }; - c_cli.outbox_push(&subops[i]); + msgr.outbox_push(&subops[i]); } } } diff --git a/src/osd_primary_sync.cpp b/src/osd_primary_sync.cpp index 376181eb..3bd06919 100644 --- a/src/osd_primary_sync.cpp +++ b/src/osd_primary_sync.cpp @@ -247,8 +247,8 @@ resume_8: finish: if (cur_op->peer_fd) { - auto it = c_cli.clients.find(cur_op->peer_fd); - if (it != c_cli.clients.end()) + auto it = msgr.clients.find(cur_op->peer_fd); + if (it != msgr.clients.end()) it->second->dirty_pgs.clear(); } finish_op(cur_op, 0); diff --git a/src/osd_primary_write.cpp b/src/osd_primary_write.cpp index 4b76ca9c..d1e08f81 100644 --- a/src/osd_primary_write.cpp +++ b/src/osd_primary_write.cpp @@ -370,8 +370,8 @@ lazy: } // Remember PG as dirty to drop the connection when PG goes offline // (this is required because of the "lazy sync") - auto cl_it = c_cli.clients.find(cur_op->peer_fd); - if (cl_it != c_cli.clients.end()) + auto cl_it = msgr.clients.find(cur_op->peer_fd); + if (cl_it != msgr.clients.end()) { cl_it->second->dirty_pgs.insert({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }); }