From c573bc6bb3d8b8c10cda30171a979c550b250f51 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sat, 6 Jun 2020 01:39:58 +0300 Subject: [PATCH] (Probably almost) implement cluster client --- Makefile | 30 ++-- cluster_client.cpp | 309 ++++++++++++++++++++++++++++++++++++++++++ cluster_client.h | 74 ++++++++++ etcd_state_client.cpp | 7 +- etcd_state_client.h | 1 + messenger.cpp | 10 +- messenger.h | 9 +- msgr_receive.cpp | 8 +- osd.cpp | 4 +- osd_client.cpp | 40 ------ osd_cluster.cpp | 6 +- osd_peering.cpp | 2 +- osd_primary.cpp | 18 ++- 13 files changed, 432 insertions(+), 86 deletions(-) create mode 100644 cluster_client.cpp create mode 100644 cluster_client.h delete mode 100644 osd_client.cpp diff --git a/Makefile b/Makefile index 89361150..fb411ecd 100644 --- a/Makefile +++ b/Makefile @@ -72,7 +72,7 @@ blockstore_sync.o: blockstore_sync.cpp allocator.h blockstore.h blockstore_flush g++ $(CXXFLAGS) -c -o $@ $< blockstore_write.o: blockstore_write.cpp allocator.h blockstore.h blockstore_flush.h blockstore_impl.h blockstore_init.h blockstore_journal.h cpp-btree/btree_map.h crc32c.h object_id.h ringloop.h g++ $(CXXFLAGS) -c -o $@ $< -messenger.o: messenger.cpp messenger.h json11/json11.hpp object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h +cluster_client.o: cluster_client.cpp cluster_client.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h g++ $(CXXFLAGS) -c -o $@ $< dump_journal.o: dump_journal.cpp allocator.h blockstore.h blockstore_flush.h blockstore_impl.h blockstore_init.h blockstore_journal.h cpp-btree/btree_map.h crc32c.h object_id.h ringloop.h g++ $(CXXFLAGS) -c -o $@ $< @@ -84,33 +84,35 @@ fio_sec_osd.o: fio_sec_osd.cpp fio/fio.h fio/optgroup.h object_id.h osd_id.h osd g++ $(CXXFLAGS) -c -o $@ $< http_client.o: http_client.cpp http_client.h json11/json11.hpp timerfd_manager.h g++ $(CXXFLAGS) -c -o $@ $< -osd.o: osd.cpp blockstore.h messenger.h cpp-btree/btree_map.h 
etcd_state_client.h http_client.h json11/json11.hpp object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h +messenger.o: messenger.cpp json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h g++ $(CXXFLAGS) -c -o $@ $< -osd_cluster.o: osd_cluster.cpp base64.h blockstore.h messenger.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h +msgr_receive.o: msgr_receive.cpp json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h g++ $(CXXFLAGS) -c -o $@ $< -osd_flush.o: osd_flush.cpp blockstore.h messenger.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h +msgr_send.o: msgr_send.cpp json11/json11.hpp messenger.h object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h g++ $(CXXFLAGS) -c -o $@ $< -osd_main.o: osd_main.cpp blockstore.h messenger.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h +osd.o: osd.cpp blockstore.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h g++ $(CXXFLAGS) -c -o $@ $< -osd_peering.o: osd_peering.cpp base64.h blockstore.h messenger.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h +osd_cluster.o: osd_cluster.cpp base64.h blockstore.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h + g++ $(CXXFLAGS) -c -o 
$@ $< +osd_flush.o: osd_flush.cpp blockstore.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h + g++ $(CXXFLAGS) -c -o $@ $< +osd_main.o: osd_main.cpp blockstore.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h + g++ $(CXXFLAGS) -c -o $@ $< +osd_peering.o: osd_peering.cpp base64.h blockstore.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h g++ $(CXXFLAGS) -c -o $@ $< osd_peering_pg.o: osd_peering_pg.cpp cpp-btree/btree_map.h object_id.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h g++ $(CXXFLAGS) -c -o $@ $< osd_peering_pg_test.o: osd_peering_pg_test.cpp cpp-btree/btree_map.h object_id.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h g++ $(CXXFLAGS) -c -o $@ $< -osd_primary.o: osd_primary.cpp blockstore.h messenger.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h osd_primary.h osd_rmw.h pg_states.h ringloop.h timerfd_manager.h +osd_primary.o: osd_primary.cpp blockstore.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h osd_primary.h osd_rmw.h pg_states.h ringloop.h timerfd_manager.h g++ $(CXXFLAGS) -c -o $@ $< -osd_primary_subops.o: osd_primary_subops.cpp blockstore.h messenger.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h osd_primary.h osd_rmw.h pg_states.h ringloop.h timerfd_manager.h - g++ $(CXXFLAGS) -c -o $@ $< -msgr_receive.o: msgr_receive.cpp messenger.h json11/json11.hpp object_id.h osd_id.h osd_ops.h 
ringloop.h timerfd_manager.h +osd_primary_subops.o: osd_primary_subops.cpp blockstore.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h osd_primary.h osd_rmw.h pg_states.h ringloop.h timerfd_manager.h g++ $(CXXFLAGS) -c -o $@ $< osd_rmw.o: osd_rmw.cpp object_id.h osd_id.h osd_rmw.h xor.h g++ $(CXXFLAGS) -c -o $@ $< osd_rmw_test.o: osd_rmw_test.cpp object_id.h osd_id.h osd_rmw.cpp osd_rmw.h test_pattern.h xor.h g++ $(CXXFLAGS) -c -o $@ $< -osd_secondary.o: osd_secondary.cpp blockstore.h messenger.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h - g++ $(CXXFLAGS) -c -o $@ $< -msgr_send.o: msgr_send.cpp messenger.h json11/json11.hpp object_id.h osd_id.h osd_ops.h ringloop.h timerfd_manager.h +osd_secondary.o: osd_secondary.cpp blockstore.h cpp-btree/btree_map.h etcd_state_client.h http_client.h json11/json11.hpp messenger.h object_id.h osd.h osd_id.h osd_ops.h osd_peering_pg.h pg_states.h ringloop.h timerfd_manager.h g++ $(CXXFLAGS) -c -o $@ $< osd_test.o: osd_test.cpp object_id.h osd_id.h osd_ops.h rw_blocking.h test_pattern.h g++ $(CXXFLAGS) -c -o $@ $< @@ -130,8 +132,6 @@ test_allocator.o: test_allocator.cpp allocator.h g++ $(CXXFLAGS) -c -o $@ $< test_blockstore.o: test_blockstore.cpp blockstore.h object_id.h ringloop.h timerfd_interval.h g++ $(CXXFLAGS) -c -o $@ $< -test_crc.o: test_crc.cpp crc32c.h - g++ $(CXXFLAGS) -c -o $@ $< timerfd_interval.o: timerfd_interval.cpp ringloop.h timerfd_interval.h g++ $(CXXFLAGS) -c -o $@ $< timerfd_manager.o: timerfd_manager.cpp timerfd_manager.h diff --git a/cluster_client.cpp b/cluster_client.cpp new file mode 100644 index 00000000..8024855d --- /dev/null +++ b/cluster_client.cpp @@ -0,0 +1,309 @@ +#include "cluster_client.h" + +cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd) +{ + 
this->ringloop = ringloop; + this->tfd = tfd; + + msgr.tfd = tfd; + msgr.ringloop = ringloop; + msgr.repeer_pgs = [this](osd_num_t peer_osd) + { + // peer_osd just connected or dropped connection + if (msgr.osd_peer_fds.find(peer_osd) != msgr.osd_peer_fds.end()) + { + // really connected :) + continue_ops(); + } + }; + + st_cli.tfd = tfd; + st_cli.log_level = log_level; + st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); }; + st_cli.on_change_osd_state_hook = [this](uint64_t peer_osd) { on_change_osd_state_hook(peer_osd); }; + st_cli.on_change_hook = [this](json11::Json::object & changes) { on_change_hook(changes); }; + st_cli.on_load_pgs_hook = [this](bool success) { on_load_pgs_hook(success); }; + st_cli.load_global_config(); +} + +void cluster_client_t::continue_ops() +{ + for (auto op_it = unsent_ops.begin(); op_it != unsent_ops.end(); ) + { + cluster_op_t *op = *op_it; + if (op->needs_reslice && !op->sent_count) + { + op->parts.clear(); + op->done_count = 0; + op->needs_reslice = false; + } + if (!op->parts.size()) + { + unsent_ops.erase(op_it++); + execute(op); + continue; + } + if (!op->needs_reslice) + { + for (auto & op_part: op->parts) + { + if (!op_part.sent && !op_part.done) + { + try_send(op, &op_part); + } + } + if (op->sent_count == op->parts.size() - op->done_count) + { + unsent_ops.erase(op_it++); + sent_ops.insert(op); + } + else + op_it++; + } + else + op_it++; + } +} + +static uint32_t is_power_of_two(uint64_t value) +{ + uint32_t l = 0; + while (value > 1) + { + if (value & 1) + { + return 64; + } + value = value >> 1; + l++; + } + return l; +} + +void cluster_client_t::on_load_config_hook(json11::Json::object & config) +{ + bs_block_size = config["block_size"].uint64_value(); + bs_disk_alignment = config["disk_alignment"].uint64_value(); + bs_bitmap_granularity = config["bitmap_granularity"].uint64_value(); + if (!bs_block_size) + bs_block_size = DEFAULT_BLOCK_SIZE; + if (!bs_disk_alignment) + 
bs_disk_alignment = DEFAULT_DISK_ALIGNMENT; + if (!bs_bitmap_granularity) + bs_bitmap_granularity = DEFAULT_BITMAP_GRANULARITY; + { + uint32_t block_order; + if ((block_order = is_power_of_two(bs_block_size)) >= 64 || bs_block_size < MIN_BLOCK_SIZE || bs_block_size >= MAX_BLOCK_SIZE) + throw std::runtime_error("Bad block size"); + } + if (config.find("pg_stripe_size") != config.end()) + { + pg_stripe_size = config["pg_stripe_size"].uint64_value(); + if (!pg_stripe_size) + pg_stripe_size = DEFAULT_PG_STRIPE_SIZE; + } + msgr.peer_connect_interval = config["peer_connect_interval"].uint64_value(); + if (!msgr.peer_connect_interval) + msgr.peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL; + msgr.peer_connect_timeout = config["peer_connect_timeout"].uint64_value(); + if (!msgr.peer_connect_timeout) + msgr.peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT; +} + +void cluster_client_t::on_load_pgs_hook(bool success) +{ + if (success) + { + pg_count = st_cli.pg_config.size(); + continue_ops(); + } +} + +void cluster_client_t::on_change_hook(json11::Json::object & changes) +{ + if (pg_count != st_cli.pg_config.size()) + { + // At this point, all operations should be suspended + // And they need to be resliced! 
+ for (auto op: unsent_ops) + { + op->needs_reslice = true; + } + for (auto op: sent_ops) + { + op->needs_reslice = true; + } + pg_count = st_cli.pg_config.size(); + } + continue_ops(); +} + +void cluster_client_t::on_change_osd_state_hook(uint64_t peer_osd) +{ + if (msgr.wanted_peers.find(peer_osd) != msgr.wanted_peers.end()) + { + msgr.connect_peer(peer_osd, st_cli.peer_states[peer_osd]); + } +} + +void cluster_client_t::execute(cluster_op_t *op) +{ + if (op->opcode != OSD_OP_READ && op->opcode != OSD_OP_OUT || !op->inode || !op->len || + op->offset % bs_disk_alignment || op->len % bs_disk_alignment) + { + op->retval = -EINVAL; + std::function(op->callback)(op); + return; + } + if (!pg_stripe_size) + { + // Config is not loaded yet + unsent_ops.insert(op); + return; + } + // Slice the request into individual object stripe requests + // Primary OSDs still operate individual stripes, but their size is multiplied by PG minsize in case of EC + uint64_t pg_block_size = bs_block_size * pg_part_count; + uint64_t first_stripe = (op->offset / pg_block_size) * pg_block_size; + uint64_t last_stripe = ((op->offset + op->len + pg_block_size - 1) / pg_block_size - 1) * pg_block_size; + int part_count = 0; + for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size) + { + if (op->offset < (stripe+pg_block_size) && (op->offset+op->len) > stripe) + { + part_count++; + } + } + op->parts.resize(part_count); + bool resend = false; + int i = 0; + for (uint64_t stripe = first_stripe; stripe <= last_stripe; stripe += pg_block_size) + { + uint64_t stripe_end = stripe + pg_block_size; + if (op->offset < stripe_end && (op->offset+op->len) > stripe) + { + pg_num_t pg_num = (op->inode + stripe/pg_stripe_size) % pg_count + 1; + op->parts[i] = { + .parent = op, + .offset = op->offset < stripe ? stripe : op->offset, + .len = (uint32_t)((op->offset+op->len) > stripe_end ? 
pg_block_size : op->offset+op->len-stripe), + .pg_num = pg_num, + .buf = op->buf + (op->offset < stripe ? stripe-op->offset : 0), + .sent = false, + .done = false, + }; + if (!try_send(op, &op->parts[i])) + { + // Part needs to be sent later + resend = true; + } + i++; + } + } + if (resend) + { + unsent_ops.insert(op); + } + else + { + sent_ops.insert(op); + } +} + +bool cluster_client_t::try_send(cluster_op_t *op, cluster_op_part_t *part) +{ + auto pg_it = st_cli.pg_config.find(part->pg_num); + if (pg_it != st_cli.pg_config.end() && + !pg_it->second.pause && pg_it->second.cur_primary) + { + osd_num_t primary_osd = pg_it->second.cur_primary; + auto peer_it = msgr.osd_peer_fds.find(primary_osd); + if (peer_it != msgr.osd_peer_fds.end()) + { + int peer_fd = peer_it->second; + part->osd_num = primary_osd; + part->sent = true; + op->sent_count++; + part->op = { + .op_type = OSD_OP_OUT, + .peer_fd = peer_fd, + .req = { .rw = { + .header = { + .magic = SECONDARY_OSD_OP_MAGIC, + .id = op_id++, + .opcode = op->opcode, + }, + .inode = op->inode, + .offset = part->offset, + .len = part->len, + } }, + .callback = [this, part](osd_op_t *op_part) + { + handle_op_part(part); + }, + }; + part->op.send_list.push_back(part->op.req.buf, OSD_PACKET_SIZE); + if (op->opcode == OSD_OP_WRITE) + { + part->op.send_list.push_back(part->buf, part->len); + } + else + { + part->op.buf = part->buf; + } + msgr.outbox_push(&part->op); + return true; + } + else if (msgr.wanted_peers.find(primary_osd) == msgr.wanted_peers.end()) + { + msgr.connect_peer(primary_osd, st_cli.peer_states[primary_osd]); + } + } + return false; +} + +void cluster_client_t::handle_op_part(cluster_op_part_t *part) +{ + cluster_op_t *op = part->parent; + part->sent = false; + op->sent_count--; + part->op.buf = NULL; + if (part->op.reply.hdr.retval != part->op.req.rw.len) + { + // Operation failed, retry + printf( + "Operation part failed on OSD %lu: retval=%ld (expected %u), reconnecting\n", + part->osd_num, 
part->op.reply.hdr.retval, part->op.req.rw.len + ); + msgr.stop_client(part->op.peer_fd); + if (op->sent_count == op->parts.size() - op->done_count - 1) + { + // Resend later when OSDs come up + // FIXME: Check for different types of errors + // FIXME: Repeat operations after a small timeout, for the case when OSD is coming up + sent_ops.erase(op); + unsent_ops.insert(op); + } + if (op->sent_count == 0 && op->needs_reslice) + { + // PG count has changed, reslice the operation + unsent_ops.erase(op); + op->parts.clear(); + op->done_count = 0; + op->needs_reslice = false; + execute(op); + } + } + else + { + // OK + part->done = true; + op->done_count++; + if (op->done_count >= op->parts.size()) + { + // Finished! + op->retval = op->len; + std::function(op->callback)(op); + } + } +} diff --git a/cluster_client.h b/cluster_client.h new file mode 100644 index 00000000..a49c23e7 --- /dev/null +++ b/cluster_client.h @@ -0,0 +1,74 @@ +#pragma once + +#include "messenger.h" +#include "etcd_state_client.h" + +#define MIN_BLOCK_SIZE 4*1024 +#define MAX_BLOCK_SIZE 128*1024*1024 +#define DEFAULT_BLOCK_SIZE 128*1024 +#define DEFAULT_PG_STRIPE_SIZE 4*1024*1024 +#define DEFAULT_DISK_ALIGNMENT 4096 +#define DEFAULT_BITMAP_GRANULARITY 4096 + +struct cluster_op_t; + +struct cluster_op_part_t +{ + cluster_op_t *parent; + uint64_t offset; + uint32_t len; + pg_num_t pg_num; + osd_num_t osd_num; + void *buf; + bool sent; + bool done; + osd_op_t op; +}; + +struct cluster_op_t +{ + uint64_t opcode; // OSD_OP_READ, OSD_OP_WRITE + uint64_t inode; + uint64_t offset; + uint64_t len; + int retval; + void *buf; + std::function callback; +protected: + bool needs_reslice = false; + int sent_count = 0, done_count = 0; + std::vector parts; + friend class cluster_client_t; +}; + +class cluster_client_t +{ + timerfd_manager_t *tfd; + ring_loop_t *ringloop; + + uint64_t pg_part_count = 2; + uint64_t pg_stripe_size = 0; + uint64_t bs_block_size = 0; + uint64_t bs_disk_alignment = 0; + uint64_t 
bs_bitmap_granularity = 0; + uint64_t pg_count = 0; + int log_level; + + uint64_t op_id = 1; + etcd_state_client_t st_cli; + osd_messenger_t msgr; + std::set sent_ops, unsent_ops; + +public: + cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd); + void execute(cluster_op_t *op); + +protected: + void continue_ops(); + void on_load_config_hook(json11::Json::object & cfg); + void on_load_pgs_hook(bool success); + void on_change_hook(json11::Json::object & changes); + void on_change_osd_state_hook(uint64_t peer_osd); + bool try_send(cluster_op_t *op, cluster_op_part_t *part); + void handle_op_part(cluster_op_part_t *part); +}; diff --git a/etcd_state_client.cpp b/etcd_state_client.cpp index 82be8f6d..82f1cb91 100644 --- a/etcd_state_client.cpp +++ b/etcd_state_client.cpp @@ -93,7 +93,10 @@ void etcd_state_client_t::start_etcd_watcher() parse_state(kv.first, kv.second); } // React to changes - on_change_hook(changes); + if (on_change_hook != NULL) + { + on_change_hook(changes); + } } } if (msg->eof) @@ -208,7 +211,7 @@ void etcd_state_client_t::load_pgs() }, }; json11::Json::object req = { { "success", txn } }; - json11::Json checks = load_pgs_checks_hook(); + json11::Json checks = load_pgs_checks_hook != NULL ? 
load_pgs_checks_hook() : json11::Json(); if (checks.array_items().size() > 0) { req["compare"] = checks; diff --git a/etcd_state_client.h b/etcd_state_client.h index 91b35938..4d95e128 100644 --- a/etcd_state_client.h +++ b/etcd_state_client.h @@ -1,5 +1,6 @@ #pragma once +#include "osd_id.h" #include "http_client.h" #include "timerfd_manager.h" diff --git a/messenger.cpp b/messenger.cpp index 92d5f869..514dd83d 100644 --- a/messenger.cpp +++ b/messenger.cpp @@ -22,19 +22,19 @@ osd_op_t::~osd_op_t() } } -void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json address_list, int port) +void osd_messenger_t::connect_peer(uint64_t peer_osd, json11::Json peer_state) { if (wanted_peers.find(peer_osd) == wanted_peers.end()) { wanted_peers[peer_osd] = (osd_wanted_peer_t){ - .address_list = address_list, - .port = port, + .address_list = peer_state["addresses"], + .port = (int)peer_state["port"].int64_value(), }; } else { - wanted_peers[peer_osd].address_list = address_list; - wanted_peers[peer_osd].port = port; + wanted_peers[peer_osd].address_list = peer_state["addresses"]; + wanted_peers[peer_osd].port = (int)peer_state["port"].int64_value(); } wanted_peers[peer_osd].address_changed = true; if (!wanted_peers[peer_osd].connecting && diff --git a/messenger.h b/messenger.h index b57b0e20..3c6968a5 100644 --- a/messenger.h +++ b/messenger.h @@ -29,6 +29,9 @@ #define PEER_CONNECTING 1 #define PEER_CONNECTED 2 +#define DEFAULT_PEER_CONNECT_INTERVAL 5 +#define DEFAULT_PEER_CONNECT_TIMEOUT 5 + struct osd_op_buf_list_t { int count = 0, alloc = 0, sent = 0; @@ -166,8 +169,8 @@ struct osd_messenger_t // osd_num_t is only for logging and asserts osd_num_t osd_num; int receive_buffer_size = 9000; - int peer_connect_interval = 5; - int peer_connect_timeout = 5; + int peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL; + int peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT; int log_level = 0; std::map wanted_peers; @@ -182,7 +185,7 @@ struct osd_messenger_t 
osd_op_stats_t stats; public: - void connect_peer(uint64_t osd_num, json11::Json address_list, int port); + void connect_peer(uint64_t osd_num, json11::Json peer_state); void stop_client(int peer_fd); void outbox_push(osd_op_t *cur_op); std::function exec_op; diff --git a/msgr_receive.cpp b/msgr_receive.cpp index b2f22bbf..82fdfa70 100644 --- a/msgr_receive.cpp +++ b/msgr_receive.cpp @@ -222,7 +222,7 @@ void osd_messenger_t::handle_reply_hdr(osd_client_t *cl) } osd_op_t *op = req_it->second; memcpy(op->reply.buf, cur_op->req.buf, OSD_PACKET_SIZE); - if (op->reply.hdr.opcode == OSD_OP_SECONDARY_READ && + if ((op->reply.hdr.opcode == OSD_OP_SECONDARY_READ || op->reply.hdr.opcode == OSD_OP_READ) && op->reply.hdr.retval > 0) { // Read data. In this case we assume that the buffer is preallocated by the caller (!) @@ -232,8 +232,7 @@ void osd_messenger_t::handle_reply_hdr(osd_client_t *cl) cl->read_buf = op->buf; cl->read_remaining = op->reply.hdr.retval; } - else if (op->reply.hdr.opcode == OSD_OP_SECONDARY_LIST && - op->reply.hdr.retval > 0) + else if (op->reply.hdr.opcode == OSD_OP_SECONDARY_LIST && op->reply.hdr.retval > 0) { op->buf = memalign(MEM_ALIGNMENT, sizeof(obj_ver_id) * op->reply.hdr.retval); cl->read_state = CL_READ_REPLY_DATA; @@ -241,8 +240,7 @@ void osd_messenger_t::handle_reply_hdr(osd_client_t *cl) cl->read_buf = op->buf; cl->read_remaining = sizeof(obj_ver_id) * op->reply.hdr.retval; } - else if (op->reply.hdr.opcode == OSD_OP_SHOW_CONFIG && - op->reply.hdr.retval > 0) + else if (op->reply.hdr.opcode == OSD_OP_SHOW_CONFIG && op->reply.hdr.retval > 0) { op->buf = malloc(op->reply.hdr.retval); cl->read_state = CL_READ_REPLY_DATA; diff --git a/osd.cpp b/osd.cpp index 6829c045..093584a2 100644 --- a/osd.cpp +++ b/osd.cpp @@ -143,10 +143,10 @@ void osd_t::parse_config(blockstore_config_t & config) print_stats_interval = 3; c_cli.peer_connect_interval = strtoull(config["peer_connect_interval"].c_str(), NULL, 10); if (!c_cli.peer_connect_interval) - 
c_cli.peer_connect_interval = 5; + c_cli.peer_connect_interval = DEFAULT_PEER_CONNECT_INTERVAL; c_cli.peer_connect_timeout = strtoull(config["peer_connect_timeout"].c_str(), NULL, 10); if (!c_cli.peer_connect_timeout) - c_cli.peer_connect_timeout = 5; + c_cli.peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT; log_level = strtoull(config["log_level"].c_str(), NULL, 10); st_cli.log_level = log_level; c_cli.log_level = log_level; diff --git a/osd_client.cpp b/osd_client.cpp deleted file mode 100644 index c53b1306..00000000 --- a/osd_client.cpp +++ /dev/null @@ -1,40 +0,0 @@ -void slice() -{ - // Slice the request into blockstore requests to individual objects - // Primary OSD still operates individual stripes, except they're twice the size of the blockstore's stripe. - std::vector read_parts; - int block = bs->get_block_size(); - uint64_t stripe1 = cur_op->req.rw.offset / block / 2; - uint64_t stripe2 = (cur_op->req.rw.offset + cur_op->req.rw.len + block*2 - 1) / block / 2 - 1; - for (uint64_t s = stripe1; s <= stripe2; s++) - { - uint64_t start = s == stripe1 ? cur_op->req.rw.offset - stripe1*block*2 : 0; - uint64_t end = s == stripe2 ? cur_op->req.rw.offset + cur_op->req.rw.len - stripe2*block*2 : block*2; - if (start < block) - { - read_parts.push_back({ - .role = 1, - .oid = { - .inode = cur_op->req.rw.inode, - .stripe = (s << STRIPE_ROLE_BITS) | 1, - }, - .version = UINT64_MAX, - .offset = start, - .len = (block < end ? block : end) - start, - }); - } - if (end > block) - { - read_parts.push_back({ - .role = 2, - .oid = { - .inode = cur_op->req.rw.inode, - .stripe = (s << STRIPE_ROLE_BITS) | 2, - }, - .version = UINT64_MAX, - .offset = (start > block ? start-block : 0), - .len = end - (start > block ? 
start-block : 0), - }); - } - } -} diff --git a/osd_cluster.cpp b/osd_cluster.cpp index 0dba02f5..53a89de2 100644 --- a/osd_cluster.cpp +++ b/osd_cluster.cpp @@ -83,7 +83,7 @@ void osd_t::parse_test_peer(std::string peer) { "addresses", json11::Json::array { addr } }, { "port", port }, }; - c_cli.connect_peer(peer_osd, json11::Json::array { addr }, port); + c_cli.connect_peer(peer_osd, st_cli.peer_states[peer_osd]); } json11::Json osd_t::get_osd_state() @@ -211,7 +211,7 @@ void osd_t::on_change_osd_state_hook(uint64_t peer_osd) { if (c_cli.wanted_peers.find(peer_osd) != c_cli.wanted_peers.end()) { - c_cli.connect_peer(peer_osd, st_cli.peer_states[peer_osd]["addresses"], st_cli.peer_states[peer_osd]["port"].int64_value()); + c_cli.connect_peer(peer_osd, st_cli.peer_states[peer_osd]); } } @@ -556,7 +556,7 @@ void osd_t::apply_pg_config() { if (pg_osd != this->osd_num && c_cli.osd_peer_fds.find(pg_osd) == c_cli.osd_peer_fds.end()) { - c_cli.connect_peer(pg_osd, st_cli.peer_states[pg_osd]["addresses"], st_cli.peer_states[pg_osd]["port"].int64_value()); + c_cli.connect_peer(pg_osd, st_cli.peer_states[pg_osd]); } } start_pg_peering(pg_num); diff --git a/osd_peering.cpp b/osd_peering.cpp index be900d58..cf16cd53 100644 --- a/osd_peering.cpp +++ b/osd_peering.cpp @@ -187,7 +187,7 @@ void osd_t::start_pg_peering(pg_num_t pg_num) } else if (c_cli.wanted_peers.find(pg_osd) == c_cli.wanted_peers.end()) { - c_cli.connect_peer(pg_osd, st_cli.peer_states[pg_osd]["addresses"], st_cli.peer_states[pg_osd]["port"].int64_value()); + c_cli.connect_peer(pg_osd, st_cli.peer_states[pg_osd]); } } pg.cur_peers.insert(pg.cur_peers.begin(), cur_peers.begin(), cur_peers.end()); diff --git a/osd_primary.cpp b/osd_primary.cpp index d00fa3e7..dc8c4021 100644 --- a/osd_primary.cpp +++ b/osd_primary.cpp @@ -13,9 +13,14 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op) { // PG number is calculated from the offset // Our EC scheme stores data in fixed chunks equal to (K*block size) - // But we must 
not use K in the process of calculating the PG number - // So we calculate the PG number using a separate setting which should be per-inode (FIXME) - pg_num_t pg_num = (cur_op->req.rw.inode + cur_op->req.rw.offset / pg_stripe_size) % pg_count + 1; + // K = pg_minsize and will be a property of the inode. Now it's hardcoded (FIXME) + uint64_t pg_block_size = bs_block_size * 2; + object_id oid = { + .inode = cur_op->req.rw.inode, + // oid.stripe = starting offset of the parity stripe + .stripe = (cur_op->req.rw.offset/pg_block_size)*pg_block_size, + }; + pg_num_t pg_num = (cur_op->req.rw.inode + oid.stripe/pg_stripe_size) % pg_count + 1; auto pg_it = pgs.find(pg_num); if (pg_it == pgs.end() || !(pg_it->second.state & PG_ACTIVE)) { @@ -23,13 +28,6 @@ bool osd_t::prepare_primary_rw(osd_op_t *cur_op) finish_op(cur_op, -EPIPE); return false; } - uint64_t pg_block_size = bs_block_size * pg_it->second.pg_minsize; - object_id oid = { - .inode = cur_op->req.rw.inode, - // oid.stripe = starting offset of the parity stripe, so it can be mapped back to the PG - .stripe = (cur_op->req.rw.offset / pg_stripe_size) * pg_stripe_size + - ((cur_op->req.rw.offset % pg_stripe_size) / pg_block_size) * pg_block_size - }; if ((cur_op->req.rw.offset + cur_op->req.rw.len) > (oid.stripe + pg_block_size) || (cur_op->req.rw.offset % bs_disk_alignment) != 0 || (cur_op->req.rw.len % bs_disk_alignment) != 0)