diff --git a/src/cluster_client.cpp b/src/cluster_client.cpp index dd523166..050f55be 100644 --- a/src/cluster_client.cpp +++ b/src/cluster_client.cpp @@ -58,7 +58,7 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd st_cli.tfd = tfd; st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); }; st_cli.on_change_osd_state_hook = [this](uint64_t peer_osd) { on_change_osd_state_hook(peer_osd); }; - st_cli.on_change_hook = [this](json11::Json::object & changes) { on_change_hook(changes); }; + st_cli.on_change_hook = [this](std::map & changes) { on_change_hook(changes); }; st_cli.on_load_pgs_hook = [this](bool success) { on_load_pgs_hook(success); }; st_cli.parse_config(config); @@ -273,7 +273,7 @@ void cluster_client_t::on_load_pgs_hook(bool success) continue_ops(); } -void cluster_client_t::on_change_hook(json11::Json::object & changes) +void cluster_client_t::on_change_hook(std::map & changes) { for (auto pool_item: st_cli.pool_config) { diff --git a/src/cluster_client.h b/src/cluster_client.h index 70439d56..14b0d474 100644 --- a/src/cluster_client.h +++ b/src/cluster_client.h @@ -108,7 +108,7 @@ protected: void flush_buffer(const object_id & oid, cluster_buffer_t *wr); void on_load_config_hook(json11::Json::object & config); void on_load_pgs_hook(bool success); - void on_change_hook(json11::Json::object & changes); + void on_change_hook(std::map & changes); void on_change_osd_state_hook(uint64_t peer_osd); int continue_rw(cluster_op_t *op); void slice_rw(cluster_op_t *op); diff --git a/src/etcd_state_client.cpp b/src/etcd_state_client.cpp index bd9fbb79..8d9cc5d4 100644 --- a/src/etcd_state_client.cpp +++ b/src/etcd_state_client.cpp @@ -27,9 +27,9 @@ etcd_state_client_t::~etcd_state_client_t() } #ifndef __MOCK__ -json_kv_t etcd_state_client_t::parse_etcd_kv(const json11::Json & kv_json) +etcd_kv_t etcd_state_client_t::parse_etcd_kv(const json11::Json & kv_json) { - json_kv_t kv; + etcd_kv_t kv; kv.key = base64_decode(kv_json["key"].string_value()); std::string json_err, json_text = base64_decode(kv_json["value"].string_value()); kv.value = json_text == "" ? json11::Json() : json11::Json::parse(json_text, json_err); @@ -38,6 +38,8 @@ json_kv_t etcd_state_client_t::parse_etcd_kv(const json11::Json & kv_json) printf("Bad JSON in etcd key %s: %s (value: %s)\n", kv.key.c_str(), json_err.c_str(), json_text.c_str()); kv.key = ""; } + else + kv.mod_revision = kv_json["mod_revision"].uint64_value(); return kv; } @@ -150,22 +152,22 @@ void etcd_state_client_t::start_etcd_watcher() etcd_watch_revision = data["result"]["header"]["revision"].uint64_value(); } // First gather all changes into a hash to remove multiple overwrites - json11::Json::object changes; + std::map changes; for (auto & ev: data["result"]["events"].array_items()) { auto kv = parse_etcd_kv(ev["kv"]); if (kv.key != "") { - changes[kv.key] = kv.value; + changes[kv.key] = kv; } } for (auto & kv: changes) { if (this->log_level > 3) { - printf("Incoming event: %s -> %s\n", kv.first.c_str(), kv.second.dump().c_str()); + printf("Incoming event: %s -> %s\n", kv.first.c_str(), kv.second.value.dump().c_str()); } - parse_state(kv.first, kv.second); + parse_state(kv.second); } // React to changes if (on_change_hook != NULL) @@ -332,7 +334,7 @@ void etcd_state_client_t::load_pgs() for (auto & kv_json: res["response_range"]["kvs"].array_items()) { auto kv = parse_etcd_kv(kv_json); - parse_state(kv.key, kv.value); + parse_state(kv); } } on_load_pgs_hook(true); @@ -355,13 +357,10 @@ void etcd_state_client_t::load_pgs() } #endif -void etcd_state_client_t::parse_state(const json_kv_t & kv) -{ - parse_state(kv.key, kv.value); -} - -void etcd_state_client_t::parse_state(const std::string & key, const json11::Json & value) +void etcd_state_client_t::parse_state(const etcd_kv_t & kv) { + const std::string & key = kv.key; + const json11::Json & value = kv.value; if (key == etcd_prefix+"/config/pools") { for (auto & pool_item: this->pool_config) @@ -712,6 +711,7 @@ void etcd_state_client_t::parse_state(const std::string & key, const json11::Jso .size = value["size"].uint64_value(), .parent_id = parent_inode_num, .readonly = value["readonly"].bool_value(), + .mod_revision = kv.mod_revision, }; this->inode_config[inode_num] = cfg; if (cfg.name != "") diff --git a/src/etcd_state_client.h b/src/etcd_state_client.h index b554993c..7e0a9570 100644 --- a/src/etcd_state_client.h +++ b/src/etcd_state_client.h @@ -18,10 +18,11 @@ #define DEFAULT_BLOCK_SIZE 128*1024 -struct json_kv_t +struct etcd_kv_t { std::string key; json11::Json value; + uint64_t mod_revision; }; struct pg_config_t @@ -59,6 +60,8 @@ struct inode_config_t uint64_t size; inode_t parent_id; bool readonly; + // Change revision of the metadata in etcd + uint64_t mod_revision; }; struct inode_watch_t @@ -89,21 +92,20 @@ public: std::map inode_config; std::map inode_by_name; - std::function on_change_hook; + std::function &)> on_change_hook; std::function on_load_config_hook; std::function load_pgs_checks_hook; std::function on_load_pgs_hook; std::function on_change_pg_history_hook; std::function on_change_osd_state_hook; - json_kv_t parse_etcd_kv(const json11::Json & kv_json); + etcd_kv_t parse_etcd_kv(const json11::Json & kv_json); void etcd_call(std::string api, json11::Json payload, int timeout, std::function callback); void etcd_txn(json11::Json txn, int timeout, std::function callback); void start_etcd_watcher(); void load_global_config(); void load_pgs(); - void parse_state(const json_kv_t & kv); - void parse_state(const std::string & key, const json11::Json & value); + void parse_state(const etcd_kv_t & kv); void parse_config(json11::Json & config); inode_watch_t* watch_inode(std::string name); void close_watch(inode_watch_t* watch); diff --git a/src/osd.h b/src/osd.h index 817b721e..749ab0cd 100644 --- a/src/osd.h +++ b/src/osd.h @@ -147,7 +147,7 @@ class osd_t void init_cluster(); void on_change_osd_state_hook(osd_num_t peer_osd); void on_change_pg_history_hook(pool_id_t pool_id, pg_num_t pg_num); - void on_change_etcd_state_hook(json11::Json::object & changes); + void on_change_etcd_state_hook(std::map & changes); void on_load_config_hook(json11::Json::object & changes); json11::Json on_load_pgs_checks_hook(); void on_load_pgs_hook(bool success); diff --git a/src/osd_cluster.cpp b/src/osd_cluster.cpp index ec8c0140..a2fc4c8f 100644 --- a/src/osd_cluster.cpp +++ b/src/osd_cluster.cpp @@ -65,7 +65,7 @@ void osd_t::init_cluster() st_cli.log_level = log_level; st_cli.on_change_osd_state_hook = [this](osd_num_t peer_osd) { on_change_osd_state_hook(peer_osd); }; st_cli.on_change_pg_history_hook = [this](pool_id_t pool_id, pg_num_t pg_num) { on_change_pg_history_hook(pool_id, pg_num); }; - st_cli.on_change_hook = [this](json11::Json::object & changes) { on_change_etcd_state_hook(changes); }; + st_cli.on_change_hook = [this](std::map & changes) { on_change_etcd_state_hook(changes); }; st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); }; st_cli.load_pgs_checks_hook = [this]() { return on_load_pgs_checks_hook(); }; st_cli.on_load_pgs_hook = [this](bool success) { on_load_pgs_hook(success); }; @@ -272,7 +272,7 @@ void osd_t::on_change_osd_state_hook(osd_num_t peer_osd) } } -void osd_t::on_change_etcd_state_hook(json11::Json::object & changes) +void osd_t::on_change_etcd_state_hook(std::map & changes) { // FIXME apply config changes in runtime (maybe, some) if (run_primary) diff --git a/src/test_cluster_client.cpp b/src/test_cluster_client.cpp index 5541634c..61ab20dc 100644 --- a/src/test_cluster_client.cpp +++ b/src/test_cluster_client.cpp @@ -9,7 +9,7 @@ void configure_single_pg_pool(cluster_client_t *cli) { cli->st_cli.on_load_pgs_hook(true); - cli->st_cli.parse_state((json_kv_t){ + cli->st_cli.parse_state((etcd_kv_t){ .key = "/config/pools", .value = json11::Json::object { { "1", json11::Json::object { @@ -22,7 +22,7 @@ void configure_single_pg_pool(cluster_client_t *cli) } } }, }); - cli->st_cli.parse_state((json_kv_t){ + cli->st_cli.parse_state((etcd_kv_t){ .key = "/config/pgs", .value = json11::Json::object { { "items", json11::Json::object { @@ -35,7 +35,7 @@ void configure_single_pg_pool(cluster_client_t *cli) } } }, }); - cli->st_cli.parse_state((json_kv_t){ + cli->st_cli.parse_state((etcd_kv_t){ .key = "/pg/state/1/1", .value = json11::Json::object { { "peers", json11::Json::array { 1, 2 } }, @@ -43,7 +43,7 @@ void configure_single_pg_pool(cluster_client_t *cli) { "state", json11::Json::array { "active" } }, }, }); - json11::Json::object changes; + std::map changes; cli->st_cli.on_change_hook(changes); }